import asyncio import json from pathlib import Path import yaml from hik_push.mapping.event_map import event_map from redis import asyncio as aioredis async def read_event(): # 读取配置文件 home_dir = Path.home() with open(home_dir / '.config' / 'hik-push' / 'config.yaml', 'r', encoding='utf-8') as f: config = yaml.safe_load(f) if "user_ids" not in config: print("请配置user_ids") return if "devices" not in config: print("请配置devices") return print(config) user_map = config['user_ids'] device_map = config['devices'] redis_client = await aioredis.Redis(host="127.0.0.1", port=6379) while True: data = await redis_client.brpop("hik-sub-event") sub_json = json.loads(data[1].decode('utf-8')) try: events = sub_json["params"]["events"] for event in events: # 将eventType ID 替换为中文字符串 event_type = event['eventType'] event_type_str = event_map.get(event_type, "未知事件类型") event['eventType'] = event_type_str if "data" in event: # 如果存在 data 属性 # ip_address ip_address = event["data"]["ipAddress"] # 根据 ip_address 获取对应设备名称 device_name = device_map.get(ip_address, "未知设备") event['data']['deviceName'] = device_name # 根据 ip_address 获取对应 userid 数组 user_ids = user_map.get(ip_address, []) event['data']['userIds'] = user_ids # 替换分析结果字段 if "eventType" in event['data']: detection_field_name = event['data']['eventType'] if detection_field_name in event['data']: event['data']["_detectionResult"] = event['data'].pop(detection_field_name) # TODO: 请求推送 api # print(event) except Exception as e: print("error: ", e) continue finally: await asyncio.sleep(0.5) def run_app(): asyncio.run(read_event()) if __name__ == "__main__": asyncio.run(read_event())