Files
hik-push/hik_push/push_notification.py
2024-03-21 15:53:55 +08:00

171 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
from pathlib import Path
import requests
import yaml
from redis import asyncio as aioredis
def read_config():
# 读取配置文件
home_dir = Path.home()
with open(home_dir / ".config" / "hik-push" / "config.yaml", "r", encoding="utf-8") as f:
config = yaml.safe_load(f)
if "user_ids" not in config:
raise Exception("请配置user_ids")
if "devices" not in config:
raise Exception("请配置devices")
if "event_type" not in config:
raise Exception("请配置event_type")
if "push_url" not in config:
raise Exception("请配置push_url")
if "event_level" not in config:
raise Exception("请配置event_level")
logging.info(config)
return config
def replace_image_host(detection_data):
if "imageUrl" in detection_data:
detection_data["imageUrl"] = detection_data["imageUrl"].replace(
"192.168.1.250", "192.168.11.180"
)
if "visiblePicUrl" in detection_data:
detection_data["visiblePicUrl"] = detection_data["visiblePicUrl"].replace(
"192.168.1.250", "192.168.11.180"
)
def push_notification(push_url: str, notify_content: dict):
try:
push_resp = requests.post(push_url, json=notify_content).content.decode(
"utf-8"
)
logging.info(push_resp)
except Exception as e:
logging.error(f"网络错误推送失败: {e}")
def handle_event_detail(event_detail: dict, config: dict):
user_map = config["user_ids"]
device_map = config["devices"]
event_map = config["event_type"]
event_level = config["event_level"]
# 事件类型 id 替换为 事件名称字符串
event_type = event_detail["eventType"]
event_type_str = event_map.get(
event_type, "未知事件类型"
)
event_detail["eventType"] = event_type_str
# 添加设备名称
src_index = event_detail["srcIndex"]
device_name = device_map.get(src_index, "未知设备")
event_detail["deviceName"] = device_name
# 添加regionName
if "srcName" in event_detail:
event_detail["regionName"] = event_detail["srcName"]
else:
event_detail["regionName"] = device_name
# 设置默认的事件等级,高级-3其他为中级-2
if event_type in event_level["high"]:
event_detail["eventLvl"] = 3
elif "eventLvl" not in event_detail:
event_detail["eventLvl"] = 2
# 根据设备名称获取 user_ids
# 高级事件
if event_detail["eventLvl"] == 3:
user_all = user_map.get("all", {}).get("high", [])
# 接受全部通知的用户
user_ids = user_map.get(device_name, {}).get("high", [])
user_ids = user_ids + user_all
# 除高级外均为中级
else:
user_all = user_map.get("all", {}).get("all", [])
# 接受全部通知的用户
user_ids = user_map.get(device_name, {}).get("all", [])
user_ids = user_ids + user_all
event_detail["userIds"] = user_ids
# 如果存在 data 属性
if "data" in event_detail:
# 替换分析结果字段
if "eventType" in event_detail["data"]:
# 事件的分析结果字段名称
detection_field_name = event_detail["data"]["eventType"]
logging.info(f"DFF{detection_field_name}")
if detection_field_name in event_detail["data"]:
# 不同事件的分析结果字段名称不同将字段名称替换为_detectionResult
event_detail["data"]["_detectionResult"] = (
event_detail["data"].pop(detection_field_name)
)
# 如果_detectionResult类型为数组则将该数组第一项赋值给_detectionResult
if isinstance(
event_detail["data"]["_detectionResult"], list
) and len(event_detail["data"]["_detectionResult"]):
event_detail["data"]["_detectionResult"] = (
event_detail["data"]["_detectionResult"][0]
)
# 如果typeName字段不存在则将事件类型名称赋值给typeName字段
if "typeName" not in event_detail["data"]["_detectionResult"]:
event_detail["data"]["_detectionResult"]["typeName"] = event_type_str
# 替换图片链接url的host
replace_image_host(
event_detail["data"]["_detectionResult"]
)
logging.info(event_detail)
return event_detail
async def push_loop():
logging.basicConfig(level=logging.INFO)
config = read_config()
redis_client = await aioredis.Redis(
host="127.0.0.1", port=7019, password="SMHdFrlK"
)
push_url = config["push_url"]
while True:
try:
data = await redis_client.brpop("hik-sub-event")
sub_json = json.loads(data[1].decode("utf-8"))
events = sub_json["params"]["events"]
for event in events:
notify_json = None
# AI事件
if event["srcType"] == "eventRule":
event_details = event["eventDetails"]
if len(event_details):
event_detail = event_details[0]
event_detail["eventLvl"] = event["eventLvl"]
event_detail["happenTime"] = event["happenTime"]
notify_json = handle_event_detail(event_detail, config)
else:
notify_json = handle_event_detail(event, config)
if notify_json is not None:
push_notification(push_url, notify_json)
else:
logging.error("No event detected")
except Exception as e:
logging.error("error: ", e)
continue
finally:
await asyncio.sleep(0.5)
def run_app():
asyncio.run(push_loop())
if __name__ == "__main__":
asyncio.run(push_loop())