"""Forward events popped from the ``hik-sub-event`` Redis list to an HTTP push endpoint."""

import asyncio
import json
import logging
from pathlib import Path

import requests
import yaml
from redis import asyncio as aioredis
from redis.asyncio import Redis

# Top-level keys that must exist in config.yaml, in the order they are checked.
_REQUIRED_CONFIG_KEYS = (
    "user_ids",
    "devices",
    "event_type",
    "push_url",
    "event_level",
)

# Raw event-type id -> seconds during which a repeat of the same event on the
# same device is suppressed. The original comment describes these ids as
# region-intrusion / smoking / phone-call events; which id is which is not
# visible in this file.
_DEDUP_TTL_SECONDS = {
    131588: 80,
    422400002: 80,
    192518: 60,
    192517: 60,
}


def read_config():
    """Load and validate ``~/.config/hik-push/config.yaml``.

    Returns:
        The parsed configuration mapping.

    Raises:
        Exception: If a required top-level key is missing. An empty file or
            a document that is not a mapping is reported the same way.
        OSError: If the configuration file cannot be opened.
    """
    config_path = Path.home() / ".config" / "hik-push" / "config.yaml"
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # safe_load returns None for an empty file; treat anything that is not a
    # mapping as "no keys configured" so the caller gets the usual message.
    if not isinstance(config, dict):
        config = {}

    for key in _REQUIRED_CONFIG_KEYS:
        if key not in config:
            raise Exception(f"请配置{key}")

    logging.info(config)
    return config


def replace_image_host(
    detection_data, old_host="192.168.1.250", new_host="192.168.11.180"
):
    """Rewrite the host of the image URLs in *detection_data* in place.

    Args:
        detection_data: Mapping that may contain ``imageUrl`` and/or
            ``visiblePicUrl``. Anything that is not a dict is ignored.
        old_host: Substring to replace in each URL.
        new_host: Replacement substring.
    """
    if not isinstance(detection_data, dict):
        return
    for field in ("imageUrl", "visiblePicUrl"):
        url = detection_data.get(field)
        # Skip missing or non-string values (e.g. null) instead of raising.
        if isinstance(url, str):
            detection_data[field] = url.replace(old_host, new_host)


def push_notification(push_url: str, notify_content: dict, timeout: float = 10.0):
    """POST *notify_content* as JSON to *push_url* and log the response body.

    Failures are logged and swallowed so that one failed push does not stop
    the consumer loop.

    Args:
        push_url: Endpoint that receives the notification.
        notify_content: JSON-serialisable payload.
        timeout: Seconds to wait for the endpoint before giving up. Without
            a timeout a stalled endpoint would block the caller forever.
    """
    try:
        response = requests.post(push_url, json=notify_content, timeout=timeout)
        logging.info(response.content.decode("utf-8"))
    except Exception as e:
        logging.error(f"网络错误推送失败: {e}")


def _collect_user_ids(user_map, device_name, levels):
    """Return the device's recipients followed by the global ``all`` recipients."""
    user_ids = []
    for scope in (device_name, "all"):
        scope_users = user_map.get(scope) or {}
        for level in levels:
            user_ids.extend(scope_users.get(level) or [])
    return user_ids


def _normalize_detection_result(data, event_type_str):
    """Move the per-event analysis field of *data* to ``_detectionResult``."""
    if "eventType" not in data:
        return
    # data["eventType"] names the key that holds the analysis result; that key
    # differs between event types, so it is moved to one fixed name.
    field_name = data["eventType"]
    if field_name not in data:
        return

    result = data.pop(field_name)
    # A non-empty list is reduced to its first element.
    if isinstance(result, list) and result:
        result = result[0]
    data["_detectionResult"] = result

    # Only a dict result can carry typeName / image URLs; an empty list or
    # any other value is left untouched instead of raising TypeError.
    if isinstance(result, dict):
        result.setdefault("typeName", event_type_str)
        replace_image_host(result)


async def handle_event_detail(event_detail: dict, config: dict, redis: Redis):
    """Enrich *event_detail* in place and decide whether it should be pushed.

    Args:
        event_detail: Event payload; must contain ``eventType`` and
            ``srcIndex``, plus ``happenTime`` for the rate-limited event
            types.
        config: Configuration as returned by ``read_config``.
        redis: Async Redis client used for the per-device repeat suppression.

    Returns:
        The enriched ``event_detail``, or ``None`` when the same event type
        was already recorded for the same device and its key has not expired.
    """
    user_map = config["user_ids"]
    device_map = config["devices"]
    event_map = config["event_type"]
    event_level = config["event_level"]

    # Replace the event-type id with its display name.
    event_type = event_detail["eventType"]
    event_type_str = event_map.get(event_type, "未知事件类型")
    event_detail["eventType"] = event_type_str

    # Attach the device name.
    device_name = device_map.get(event_detail["srcIndex"], "未知设备")
    event_detail["deviceName"] = device_name

    # Do not push when the same event recently happened at the same place.
    dedup_key = f"hik-push-interval:{device_name}:{event_type}"
    if await redis.get(name=dedup_key) is not None:
        return None

    # Only the event types listed in _DEDUP_TTL_SECONDS are rate-limited.
    ttl = _DEDUP_TTL_SECONDS.get(event_type)
    if ttl is not None:
        await redis.set(name=dedup_key, value=event_detail["happenTime"], ex=ttl)

    # regionName falls back to the device name when srcName is absent.
    event_detail["regionName"] = event_detail.get("srcName", device_name)

    # Default event level: 3 for configured "high" types, otherwise 2 unless
    # the event already carries a level.
    high_types = (event_level or {}).get("high") or ()
    if event_type in high_types:
        event_detail["eventLvl"] = 3
    elif "eventLvl" not in event_detail:
        event_detail["eventLvl"] = 2

    # Level 3 notifies the "high" and "all" subscribers; every other level
    # notifies only the "all" subscribers.
    levels = ("high", "all") if event_detail["eventLvl"] == 3 else ("all",)
    event_detail["userIds"] = _collect_user_ids(user_map, device_name, levels)

    data = event_detail.get("data")
    if isinstance(data, dict):
        _normalize_detection_result(data, event_type_str)

    logging.info(event_detail)
    return event_detail


async def _process_event(event, config, redis_client, push_url):
    """Handle one entry of a message's ``events`` list and push it if due."""
    if event["srcType"] == "eventRule":
        # Rule ("AI") events wrap the payload in eventDetails; only the first
        # detail is forwarded.
        event_details = event["eventDetails"]
        if not event_details:
            logging.warning("eventRule事件缺少eventDetails,已跳过")
            return
        event_detail = event_details[0]
        # handle_event_detail supplies a default level, so copy only if set.
        if "eventLvl" in event:
            event_detail["eventLvl"] = event["eventLvl"]
        event_detail["happenTime"] = event["happenTime"]
        notify_json = await handle_event_detail(event_detail, config, redis_client)
    else:
        notify_json = await handle_event_detail(event, config, redis_client)

    if notify_json is not None:
        push_notification(push_url, notify_json)
    else:
        logging.error("事件间隔小于80秒")


async def push_loop():
    """Consume ``hik-sub-event`` from Redis forever and push each event."""
    logging.basicConfig(level=logging.INFO)
    config = read_config()
    # NOTE(review): the Redis host, port and password are hard-coded here;
    # consider moving them to config.yaml or the environment.
    redis_client = await aioredis.Redis(
        host="127.0.0.1", port=7019, password="SMHdFrlK"
    )
    push_url = config["push_url"]

    while True:
        try:
            data = await redis_client.brpop("hik-sub-event")
            sub_json = json.loads(data[1].decode("utf-8"))
            events = sub_json["params"]["events"]
            for event in events:
                # Isolate failures so one malformed event does not drop the
                # remaining events of the same message.
                try:
                    await _process_event(event, config, redis_client, push_url)
                except Exception as e:
                    logging.exception("error: %s", e)
        except Exception as e:
            logging.exception("error: %s", e)
        finally:
            await asyncio.sleep(0.5)


def run_app():
    """Run the push loop until the process is stopped."""
    asyncio.run(push_loop())


if __name__ == "__main__":
    run_app()