Files
hik-push/hik_push/push_notification.py
2024-04-11 09:57:27 +08:00

183 lines
6.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import json
import logging
from pathlib import Path
import requests
import yaml
from redis import asyncio as aioredis
from redis.asyncio import Redis
def read_config():
    """Load and validate the hik-push configuration file.

    Reads ``~/.config/hik-push/config.yaml`` (UTF-8, parsed with
    ``yaml.safe_load``), verifies that every required top-level key is
    present, logs the parsed configuration at INFO level and returns it.

    Returns:
        dict: the parsed configuration mapping.

    Raises:
        OSError: if the configuration file cannot be opened.
        Exception: with the message ``请配置<key>`` for the first required key
            that is missing. A document that is not a mapping (for example an
            empty file, which parses to ``None``) is reported as missing the
            first required key.
    """
    # Checked in this order so the first missing key is the one reported.
    required_keys = ("user_ids", "devices", "event_type", "push_url", "event_level")

    config_path = Path.home() / ".config" / "hik-push" / "config.yaml"
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    # An empty file yields None; membership tests on None would raise an
    # unhelpful TypeError, so treat any non-mapping as "nothing configured".
    if not isinstance(config, dict):
        raise Exception(f"请配置{required_keys[0]}")

    for key in required_keys:
        if key not in config:
            raise Exception(f"请配置{key}")

    logging.info(config)
    return config
def replace_image_host(
    detection_data,
    *,
    old_host="192.168.1.250",
    new_host="192.168.11.180",
):
    """Rewrite the host part of the image URLs in ``detection_data`` in place.

    Only the ``imageUrl`` and ``visiblePicUrl`` entries are touched; every
    occurrence of ``old_host`` inside them is replaced with ``new_host``.
    Entries that are absent, or whose value is not a string, are left as-is.

    Args:
        detection_data: mapping holding the detection result fields.
        old_host: host substring to look for (defaults to the address that
            was previously hard-coded).
        new_host: replacement host substring.

    Returns:
        None. ``detection_data`` is modified in place.
    """
    for key in ("imageUrl", "visiblePicUrl"):
        if key not in detection_data:
            continue
        url = detection_data[key]
        # A null / non-string URL cannot be rewritten; skip it instead of
        # failing on a missing ``replace`` method.
        if isinstance(url, str):
            detection_data[key] = url.replace(old_host, new_host)
def push_notification(push_url: str, notify_content: dict, timeout: float = 10.0):
    """POST ``notify_content`` as JSON to ``push_url`` and log the response body.

    Delivery is best-effort: any exception raised while sending or decoding
    the response is logged at ERROR level and swallowed, so the caller is
    never interrupted by a failed push.

    Args:
        push_url: URL of the push endpoint.
        notify_content: JSON-serialisable payload.
        timeout: seconds to wait for the server before giving up. Without a
            timeout ``requests`` waits indefinitely, which would stall the
            caller if the endpoint stops responding.

    Returns:
        None.
    """
    try:
        response = requests.post(push_url, json=notify_content, timeout=timeout)
        push_resp = response.content.decode("utf-8")
        logging.info(push_resp)
    except Exception as e:
        logging.error(f"网络错误推送失败: {e}")
# Seconds during which a repeat of the same event type on the same device is
# suppressed, keyed by numeric event type id. The original comment describes
# the 80-second group as "area intrusion or smoking / phone call"; the exact
# id-to-name mapping is not visible in this file.
_PUSH_INTERVAL_SECONDS = {
    131588: 80,
    422400002: 80,
    192518: 60,
    192517: 60,
}


def _collect_user_ids(user_map, device_name, is_high):
    """Return the recipients for an event: device-level users, then global ones."""
    # High-level events go to the "high" and "all" groups; everything else
    # only to "all". A key that exists but is empty in YAML parses to None,
    # hence the ``or`` fallbacks.
    groups = ("high", "all") if is_high else ("all",)
    user_ids = []
    for scope_name in (device_name, "all"):
        scope = user_map.get(scope_name) or {}
        for group in groups:
            user_ids.extend(scope.get(group) or [])
    return user_ids


def _normalize_detection_result(data, event_type_str):
    """Move the event-specific analysis field of ``data`` to ``_detectionResult``."""
    if "eventType" not in data:
        return
    # The value of data["eventType"] names the field holding the analysis
    # result; that field name differs per event, so it is renamed to a fixed key.
    field_name = data["eventType"]
    if field_name not in data:
        return
    result = data.pop(field_name)
    # A non-empty list is collapsed to its first element.
    if isinstance(result, list) and result:
        result = result[0]
    data["_detectionResult"] = result
    # Only a mapping can carry typeName / image URLs; an empty list or scalar
    # is passed through untouched instead of raising.
    if isinstance(result, dict):
        result.setdefault("typeName", event_type_str)
        replace_image_host(result)


async def handle_event_detail(event_detail: dict, config: dict, redis: Redis):
    """Enrich one event for pushing, or suppress it if it fired recently.

    ``event_detail`` is modified in place:

    * ``eventType`` is replaced by its display name from ``config["event_type"]``;
    * ``deviceName`` is looked up from ``srcIndex`` in ``config["devices"]``;
    * ``regionName`` is ``srcName`` when present, otherwise the device name;
    * ``eventLvl`` is forced to 3 for types listed in
      ``config["event_level"]["high"]`` and defaults to 2 when absent;
    * ``userIds`` lists the recipients taken from ``config["user_ids"]``;
    * inside ``data``, the event-specific analysis field is renamed to
      ``_detectionResult``.

    Args:
        event_detail: the raw event mapping; must contain ``eventType`` and
            ``srcIndex`` (and ``happenTime`` for throttled event types).
        config: configuration mapping as returned by ``read_config``.
        redis: async Redis client used to remember recently pushed events.

    Returns:
        The enriched ``event_detail``, or ``None`` when a throttle key for the
        same device and event type still exists in Redis.

    Raises:
        KeyError: if a required key is missing from ``event_detail`` or ``config``.
    """
    user_map = config["user_ids"]
    device_map = config["devices"]
    event_map = config["event_type"]
    event_level = config["event_level"]

    # Replace the event type id with its display name.
    event_type = event_detail["eventType"]
    event_type_str = event_map.get(event_type, "未知事件类型")
    event_detail["eventType"] = event_type_str

    # Attach the device name.
    src_index = event_detail["srcIndex"]
    device_name = device_map.get(src_index, "未知设备")
    event_detail["deviceName"] = device_name

    # Do not push when the same event was recorded for this device recently.
    interval_key = f"hik-push-interval:{device_name}:{event_type}"
    if await redis.get(name=interval_key) is not None:
        return None

    # Throttled event types leave a marker that expires after their interval.
    ttl = _PUSH_INTERVAL_SECONDS.get(event_type)
    if ttl is not None:
        await redis.set(name=interval_key, value=event_detail["happenTime"], ex=ttl)

    event_detail["regionName"] = event_detail.get("srcName", device_name)

    # Default event level: configured "high" types are 3, anything else
    # without an explicit level is 2.
    if event_type in event_level["high"]:
        event_detail["eventLvl"] = 3
    elif "eventLvl" not in event_detail:
        event_detail["eventLvl"] = 2

    event_detail["userIds"] = _collect_user_ids(
        user_map, device_name, event_detail["eventLvl"] == 3
    )

    data = event_detail.get("data")
    if isinstance(data, dict):
        _normalize_detection_result(data, event_type_str)

    logging.info(event_detail)
    return event_detail
async def push_loop():
    """Consume subscription messages from Redis and push them, forever.

    Each iteration blocks on ``BRPOP hik-sub-event``, decodes the payload as
    UTF-8 JSON and walks ``params.events``. Events whose ``srcType`` is
    ``eventRule`` are handled through their first ``eventDetails`` entry
    (with ``eventLvl`` and ``happenTime`` copied from the parent event); all
    other events are handled directly. Whatever ``handle_event_detail``
    returns is sent with ``push_notification``; a ``None`` result is logged
    and skipped.

    Any exception raised while processing a message is logged with its
    traceback and the loop continues. The loop sleeps 0.5 s after every
    iteration, successful or not.
    """
    logging.basicConfig(level=logging.INFO)
    config = read_config()
    # NOTE(review): the Redis host, port and password are hard-coded here;
    # consider moving them to the configuration file or the environment.
    redis_client = await aioredis.Redis(
        host="127.0.0.1",
        port=7019,
        password="SMHdFrlK",
    )
    push_url = config["push_url"]
    while True:
        try:
            data = await redis_client.brpop("hik-sub-event")
            # brpop returns (list name, value); the value is the raw payload.
            sub_json = json.loads(data[1].decode("utf-8"))
            events = sub_json["params"]["events"]
            for event in events:
                notify_json = None
                # "AI event" per the original comment: details are nested.
                if event["srcType"] == "eventRule":
                    event_details = event["eventDetails"]
                    if len(event_details):
                        event_detail = event_details[0]
                        event_detail["eventLvl"] = event["eventLvl"]
                        event_detail["happenTime"] = event["happenTime"]
                        notify_json = await handle_event_detail(
                            event_detail, config, redis_client
                        )
                else:
                    notify_json = await handle_event_detail(
                        event, config, redis_client
                    )
                if notify_json is not None:
                    push_notification(push_url, notify_json)
                else:
                    logging.error("事件间隔小于80秒")
        except Exception as e:
            # The message needs a placeholder for the argument; without one
            # the logging module reports a formatting error instead of the
            # exception being handled.
            logging.exception("error: %s", e)
        finally:
            await asyncio.sleep(0.5)
def run_app():
    """Start ``push_loop`` under ``asyncio.run``."""
    loop_coroutine = push_loop()
    asyncio.run(loop_coroutine)
# Script entry point: start the push loop when this file is executed directly.
if __name__ == "__main__":
    run_app()