"""Consume Hikvision subscription events from Redis and forward them to a push API.

Each message popped from the Redis list is enriched with human-readable
event-type and device names plus the recipients configured for that device,
then POSTed as JSON to the configured push URL.
"""
import asyncio
import json
import logging
from pathlib import Path

import yaml
import requests
from redis import asyncio as aioredis

# Keys that must be present in config.yaml, checked in this order.
REQUIRED_CONFIG_KEYS = ("user_ids", "devices", "event_type", "push_url")

# Redis connection settings and the list that events are popped from.
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 7019
# NOTE(review): credential is hard-coded in source; consider loading it from
# config.yaml or an environment variable instead.
REDIS_PASSWORD = "SMHdFrlK"
EVENT_QUEUE = "hik-sub-event"

# Upper bound for a single push request so that a stalled endpoint cannot
# hang the consumer loop forever.
PUSH_TIMEOUT_SECONDS = 10

# Host rewrite applied to picture URLs inside detection results.
IMAGE_HOST_FROM = "192.168.1.250"
IMAGE_HOST_TO = "192.168.11.180"
IMAGE_URL_FIELDS = ("imageUrl", "visiblePicUrl")


def _enrich(record, user_map, device_map, event_map):
    """Replace the event-type id with its name and attach device name and recipients.

    Mutates ``record`` in place. Raises ``KeyError`` when ``eventType`` or
    ``srcIndex`` is missing; the caller's loop logs and skips the message.
    """
    record["eventType"] = event_map.get(record["eventType"], "未知事件类型")
    device_name = device_map.get(record["srcIndex"], "未知设备")
    record["deviceName"] = device_name
    # Recipients for this device followed by the users subscribed to "all".
    record["userIds"] = user_map.get(device_name, []) + user_map.get("all", [])


def _normalize_detection(record):
    """Move the analysis result under a fixed ``_detectionResult`` key.

    ``data["eventType"]`` names the key inside ``data`` that holds the result.
    That key is renamed to ``_detectionResult`` and its picture URLs are
    rewritten. Records without ``data``, without ``data["eventType"]``, or
    without the named key are left untouched.
    """
    if "data" not in record:
        return
    data = record["data"]
    if "eventType" not in data:
        return
    field_name = data["eventType"]
    if field_name in data:
        data["_detectionResult"] = data.pop(field_name)
        # Only rewrite when the result key was actually created above.
        replace_image_host(data["_detectionResult"])


def _push(push_url, payload):
    """Log ``payload`` and POST it as JSON; failures are logged, never raised."""
    logging.info(payload)
    try:
        # NOTE: this is a blocking call inside the event loop; acceptable
        # while read_event() is the only task, and bounded by the timeout.
        response = requests.post(push_url, json=payload, timeout=PUSH_TIMEOUT_SECONDS)
        logging.info(response.content.decode("utf-8"))
    except Exception as e:
        logging.error(f"网络错误推送失败: {e}")


async def read_event():
    """Run the consumer loop: pop events from Redis, enrich them, push them.

    Reads ``~/.config/hik-push/config.yaml`` and returns early (after logging
    which key is missing) if any of ``REQUIRED_CONFIG_KEYS`` is absent.
    Otherwise loops forever. Any error while handling one message is logged
    and the loop continues after a short pause.
    """
    logging.basicConfig(level=logging.INFO)

    # Load the configuration file.
    config_path = Path.home() / ".config" / "hik-push" / "config.yaml"
    with open(config_path, "r", encoding="utf-8") as f:
        # An empty file parses to None; treat it as an empty mapping so the
        # checks below report the missing key instead of crashing.
        config = yaml.safe_load(f) or {}

    for key in REQUIRED_CONFIG_KEYS:
        if key not in config:
            logging.error("请配置%s", key)
            return
    logging.info(config)

    user_map = config["user_ids"]
    device_map = config["devices"]
    event_map = config["event_type"]
    push_url = config["push_url"]

    redis_client = await aioredis.Redis(
        host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD
    )

    while True:
        try:
            data = await redis_client.brpop(EVENT_QUEUE)
            # brpop yields a (list name, value) pair; the value is the JSON payload.
            sub_json = json.loads(data[1].decode("utf-8"))
            for event in sub_json["params"]["events"]:
                if event["srcType"] == "eventRule":
                    # Rule events (described as AI events by the original
                    # author) carry one or more details; each is pushed
                    # separately.
                    for event_detail in event["eventDetails"]:
                        _enrich(event_detail, user_map, device_map, event_map)
                        # Copy fields that only exist on the parent event.
                        event_detail["eventLvl"] = event["eventLvl"]
                        event_detail["happenTime"] = event["happenTime"]
                        _normalize_detection(event_detail)
                        _push(push_url, event_detail)
                else:
                    logging.info(event["eventType"])
                    _enrich(event, user_map, device_map, event_map)
                    _normalize_detection(event)
                    _push(push_url, event)
        except Exception as e:
            # The message needs a %s placeholder for the argument; without it
            # logging fails to format the record and the real error is lost.
            logging.exception("error: %s", e)
        finally:
            # Pause after every iteration, successful or not.
            await asyncio.sleep(0.5)


def replace_image_host(detection_data):
    """Rewrite the host of picture URLs in ``detection_data`` in place.

    Only the ``imageUrl`` and ``visiblePicUrl`` keys of a dict are touched.
    Non-dict input and non-string values are left unchanged.
    """
    if not isinstance(detection_data, dict):
        return
    for field in IMAGE_URL_FIELDS:
        url = detection_data.get(field)
        if isinstance(url, str):
            detection_data[field] = url.replace(IMAGE_HOST_FROM, IMAGE_HOST_TO)


def run_app():
    """Synchronous entry point: run ``read_event`` until it returns."""
    asyncio.run(read_event())


if __name__ == "__main__":
    run_app()