Files
hik-push/hik_push/read_event.py
quantulr 31b4ef217b update
2024-02-23 10:11:42 +08:00

117 lines
5.2 KiB
Python

import asyncio
import json
from pathlib import Path
import logging
import yaml
import requests
from redis import asyncio as aioredis
async def read_event():
logging.basicConfig(level=logging.INFO)
# 读取配置文件
home_dir = Path.home()
with open(home_dir / '.config' / 'hik-push' / 'config.yaml', 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
if "user_ids" not in config:
logging.error("请配置user_ids")
return
if "devices" not in config:
logging.error("请配置devices")
return
if "event_type" not in config:
logging.error("请配置event_type")
return
if "push_url" not in config:
logging.error("请配置push_url")
return
logging.info(config)
user_map = config['user_ids']
device_map = config['devices']
event_map = config['event_type']
push_url = config['push_url']
redis_client = await aioredis.Redis(host="127.0.0.1", port=7019, password="SMHdFrlK")
while True:
try:
data = await redis_client.brpop("hik-sub-event")
sub_json = json.loads(data[1].decode('utf-8'))
events = sub_json["params"]["events"]
for event in events:
# 如果是ai事件
if event["srcType"] == "eventRule":
event_details = event["eventDetails"]
for event_detail in event_details:
# 事件类型 id 替换为 str
event_type_str = event_map.get(event_detail['eventType'], "未知事件类型")
event_detail['eventType'] = event_type_str
# 添加设备名称
src_index = event_detail["srcIndex"]
device_name = device_map.get(src_index, "未知设备")
event_detail['deviceName'] = device_name
# 根据设备名称获取 user_ids
user_all = user_map.get("all", []) # 接受全部通知的用户
user_ids = user_map.get(device_name, [])
user_ids = user_ids + user_all
event_detail['userIds'] = user_ids
# 添加其他字段
event_detail["eventLvl"] = event['eventLvl']
event_detail["happenTime"] = event['happenTime']
# 如果存在 data 属性
if "data" in event_detail:
# 替换分析结果字段
if "eventType" in event_detail['data']:
detection_field_name = event_detail['data']['eventType']
if detection_field_name in event_detail['data']:
event_detail['data']["_detectionResult"] = event_detail['data'].pop(
detection_field_name)
logging.info(event_detail)
try:
push_resp = requests.post(push_url, json=event_detail).content.decode('utf-8')
logging.info(push_resp)
except Exception as e:
logging.error(f"网络错误推送失败: {e}")
else:
# 将 eventType ID 替换为中文字符串
event_type = event['eventType']
logging.info(event_type)
event_type_str = event_map.get(event_type, "未知事件类型")
event['eventType'] = event_type_str
# 添加设备名称
src_index = event["srcIndex"]
device_name = device_map.get(src_index, "未知设备")
# 根据设备名称获取 user_ids
user_all = user_map.get("all", []) # 接受全部通知的用户
user_ids = user_map.get(device_name, [])
user_ids = user_ids + user_all
event['deviceName'] = device_name
event['userIds'] = user_ids
if "data" in event:
# 如果存在 data 属性
# 替换分析结果字段
if "eventType" in event['data']:
detection_field_name = event['data']['eventType']
if detection_field_name in event['data']:
event['data']["_detectionResult"] = event['data'].pop(detection_field_name)
# 请求推送 api
logging.info(event)
try:
push_resp = requests.post(push_url, json=event).content.decode('utf-8')
logging.info(push_resp)
except Exception as e:
logging.error(f"网络错误推送失败: {e}")
except Exception as e:
logging.error("error: ", e)
continue
finally:
await asyncio.sleep(0.5)
def run_app():
asyncio.run(read_event())
if __name__ == "__main__":
asyncio.run(read_event())