This commit is contained in:
quantulr
2024-01-23 17:31:56 +08:00
parent 159d040089
commit e4c6a9230d
8 changed files with 429 additions and 21 deletions

82
Cargo.lock generated
View File

@ -149,6 +149,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "combine"
version = "4.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "core-foundation"
version = "0.9.4"
@ -266,6 +280,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-core",
"futures-sink",
"futures-task",
"pin-project-lite",
"pin-utils",
@ -333,6 +348,8 @@ version = "0.1.0"
dependencies = [
"axum",
"dotenvy",
"log",
"redis",
"reqwest",
"serde",
"serde_json",
@ -425,7 +442,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.5.5",
"tokio",
"tower-service",
"tracing",
@ -477,7 +494,7 @@ dependencies = [
"http-body 1.0.0",
"hyper 1.1.0",
"pin-project-lite",
"socket2",
"socket2 0.5.5",
"tokio",
"tracing",
]
@ -767,6 +784,27 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "redis"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd"
dependencies = [
"async-trait",
"bytes",
"combine",
"futures-util",
"itoa",
"percent-encoding",
"pin-project-lite",
"ryu",
"sha1_smol",
"socket2 0.4.10",
"tokio",
"tokio-util",
"url",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@ -936,6 +974,12 @@ dependencies = [
"serde",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "signal-hook-registry"
version = "1.4.1"
@ -960,6 +1004,16 @@ version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e"
[[package]]
name = "socket2"
version = "0.4.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "socket2"
version = "0.5.5"
@ -1050,7 +1104,7 @@ dependencies = [
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"socket2 0.5.5",
"tokio-macros",
"windows-sys 0.48.0",
]
@ -1273,6 +1327,28 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.48.0"

View File

@ -13,3 +13,5 @@ tower = "0.4.13"
serde = { version = "1.0.195", features = ["derive"] }
serde_json = "1.0.111"
dotenvy = "0.15.7"
log = "0.4.20"
redis = { version = "0.24.0", features = ["aio", "tokio-comp", "streams"] }

View File

@ -5,13 +5,62 @@ use axum::{
Json, Router,
};
use dotenvy::dotenv;
use serde_json::Value;
use redis::{AsyncCommands, Value};
use redis::streams::{StreamReadOptions, StreamReadReply};
use tokio::sync::mpsc;
use crate::event_type::get_event_type_map;
use typing::EventMessage;
use crate::typing::area_invasion::AreaInvasion;
use crate::typing::smoke_detection::SmokeDetection;
use crate::typing::temperature_alarm::TemperatureAlarm;
mod event_type;
mod typing;
mod test;
async fn consume_messages(mut redis_client: redis::aio::Connection, tx: mpsc::Sender<String>) {
// 假设使用的是消费者组名为 "my_consumer_group"
let consumer_group = "my_consumer_group";
// redis_client.xgroup_create((), (), ());
// 消费者从 Stream 中获取消息
let stream_name = "my_stream";
let consumer_name = "my_consumer";
let opts = StreamReadOptions::default().group(consumer_group, consumer_name).block(0).count(1);
loop {
// 使用 XREAD 命令从 Redis Stream 获取消息
let result: Option<StreamReadReply> = redis_client
.xread_options(&["my_stream"], &["0"], &opts)
.await
.expect("Failed to read from stream");
if let Some(reply) = result {
for stream_key in reply.keys {
println!("->> xread block: {}", stream_key.key);
for stream_id in stream_key.ids {
println!(" ->> StreamId: {:?}", stream_id);
// redis_client.xack::<&[&str; 1], &str, i32, RV>(&["my_stream"], consumer_group, &[0]);
}
}
println!();
}
// for (_, messages) in result {
// println!("hello");
// }
// for (_, messages) in result {
// for message in messages {
// for (field, value) in message {
// if field == "message" {
// // 将消息发送到通道
// tx.send(value).await.expect("Failed to send message");
// }
// }
// }
// }
}
}
async fn event_rcv_handle(
Json(form_data): Json<EventMessage>,
@ -24,22 +73,32 @@ async fn event_rcv_handle(
None => String::from("未知事件类型"),
Some(name) => name.to_string(),
};
if event.data.is_some() {
// 区域入侵
if event.event_type.eq(&131588) {
let area_invasion_data: AreaInvasion = serde_json::from_value(event.data.clone().unwrap()).unwrap();
println!("{}", area_invasion_data.ip_address);
}
// 温度报警
else if event.event_type.eq(&192517) {
let temperature_alarm_data: TemperatureAlarm = serde_json::from_value(event.data.clone().unwrap()).unwrap();
println!("{}", temperature_alarm_data.ip_address);
}
// 烟雾检测
else if event.event_type.eq(&192513) {
let smoke_detection: SmokeDetection = serde_json::from_value(event.data.clone().unwrap()).unwrap();
println!("{}", smoke_detection.ip_address);
}
}
println!("{}", event_type);
let _ = reqwest::get(format!("https://www.feishu.cn/flow/api/trigger-webhook/31f13dead0bf78fc4bdb51ba23abba9f?title={}&content=发生时间:{}", event_type, event.happen_time)).await;
}
});
Ok(String::from("success"))
}
async fn event_rcv_handle_v2(
Json(form_data): Json<Value>,
) -> Result<String, (StatusCode, String)> {
println!("{}", form_data.to_string());
tokio::spawn(async move {
println!("收到告警,向飞书推送消息");
let _ = reqwest::get(format!("https://www.feishu.cn/flow/api/trigger-webhook/31f13dead0bf78fc4bdb51ba23abba9f?title={}&content={}", "收到告警", "告警内容")).await;
});
Ok(String::from("success"))
}
#[tokio::main]
async fn main() {
@ -47,10 +106,17 @@ async fn main() {
dotenv().expect(".env file not found");
let port = env::var("PORT").expect("PORT is not set in .env file");
let redis_client = redis::Client::open("redis://127.0.0.1/").expect("Failed to create Redis client");
let redis_connection = redis_client.get_async_connection().await.expect("");
// 创建消息通道
let (tx, rx) = mpsc::channel::<String>(100);
let consumer = tokio::spawn(consume_messages(redis_connection, tx.clone()));
// build our application with a single route
let app = Router::new()
.route("/", get(|| async { "Hello, World!" }))
.route("/eventRcv", post(event_rcv_handle_v2));
.route("/eventRcv", post(event_rcv_handle));
// run our app with hyper, listening globally on port 3000
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port))

14
src/test.rs Normal file
View File

@ -0,0 +1,14 @@
// [
// bulk(
// string-data('"my_stream"'),
// bulk(
// bulk(
// string-data('"1705904296739-0"'),
// bulk(
// string-data('"json"'),
// string-data('"{}"')
// )
// )
// )
// )
// ]

View File

@ -1,7 +1,8 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct EventData {}
use serde_json::Value;
pub mod area_invasion;
pub mod temperature_alarm;
pub mod smoke_detection;
#[derive(Serialize, Deserialize)]
pub struct Event {
@ -23,11 +24,11 @@ pub struct Event {
#[serde(rename = "eventLvl")]
pub event_lvl: Option<i64>,
pub timeout: i64,
pub data: Option<EventData>,
pub data: Option<Value>,
}
#[derive(Serialize, Deserialize)]
pub struct Params {
pub struct EventParams {
#[serde(rename = "sendTime")]
pub send_time: String,
pub ability: String,
@ -38,5 +39,5 @@ pub struct Params {
#[derive(Serialize, Deserialize)]
pub struct EventMessage {
pub method: String,
pub params: Params,
pub params: EventParams,
}

View File

@ -0,0 +1,66 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct RegionCoordinatesList {
#[serde(rename = "positionX")]
pub position_x: Option<f64>,
#[serde(rename = "positionY")]
pub position_y: Option<f64>,
}
#[derive(Serialize, Deserialize)]
pub struct TargetAttrs {
#[serde(rename = "imageServerCode")]
pub image_server_code: Option<String>,
#[serde(rename = "deviceIndexCode")]
pub device_index_code: String,
#[serde(rename = "cameraIndexCode")]
pub camera_index_code: String,
#[serde(rename = "channelName")]
pub channel_name: String,
#[serde(rename = "cameraAddress")]
pub camera_address: Option<String>,
pub longitude: Option<f64>,
pub latitude: Option<f64>,
}
#[derive(Serialize, Deserialize)]
pub struct FieldDetection {
#[serde(rename = "targetAttrs")]
pub target_attrs: Option<TargetAttrs>,
#[serde(rename = "imageUrl")]
pub image_url: Option<String>,
pub duration: Option<i64>,
#[serde(rename = "sensitivityLevel")]
pub sensitivity_level: Option<i64>,
pub rate: Option<i64>,
#[serde(rename = "detectionTarget")]
pub detection_target: Option<i64>,
#[serde(rename = "regionCoordinatesList")]
pub region_coordinates_list: Vec<RegionCoordinatesList>,
}
// 区域入侵事件data
#[derive(Serialize, Deserialize)]
pub struct AreaInvasion {
#[serde(rename = "dataType")]
pub data_type: String,
#[serde(rename = "recvTime")]
pub recv_time: String,
#[serde(rename = "sendTime")]
pub send_time: String,
#[serde(rename = "dateTime")]
pub date_time: String,
#[serde(rename = "ipAddress")]
pub ip_address: String,
#[serde(rename = "portNo")]
pub port_no: i64,
#[serde(rename = "channelID")]
pub channel_id: i64,
#[serde(rename = "eventType")]
pub event_type: String,
#[serde(rename = "eventDescription")]
pub event_description: String,
pub fielddetection: Vec<FieldDetection>,
}

View File

@ -0,0 +1,91 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct PtzInfo {
pub pan: String,
pub tilt: String,
pub zoom: String,
pub focus: String,
}
#[derive(Serialize, Deserialize)]
pub struct AbsoluteHigh {
pub elevation: String,
pub azimuth: String,
#[serde(rename = "absoluteZoom")]
pub absolute_zoom: String,
}
#[derive(Serialize, Deserialize)]
pub struct SmokeRegion {
pub x: String,
pub y: String,
pub width: String,
pub height: String,
}
#[derive(Serialize, Deserialize)]
pub struct RegionCoordinates {
#[serde(rename = "positionX")]
pub position_x: String,
#[serde(rename = "positionY")]
pub position_y: String,
}
#[derive(Serialize, Deserialize)]
pub struct TargetAttrs {
#[serde(rename = "imageServerCode")]
pub image_server_code: String,
#[serde(rename = "deviceIndexCode")]
pub device_index_code: String,
#[serde(rename = "cameraIndexCode")]
pub camera_index_code: String,
#[serde(rename = "cameraAddress")]
pub camera_address: String,
pub longitude: f64,
pub latitude: f64,
}
#[derive(Serialize, Deserialize)]
pub struct Detection {
#[serde(rename = "targetAttrs")]
pub target_attrs: TargetAttrs,
#[serde(rename = "regionID")]
pub region_id: i64,
#[serde(rename = "regionCoordinatesList")]
pub region_coordinates_list: Vec<RegionCoordinates>,
#[serde(rename = "smokeRegion")]
pub smoke_region: SmokeRegion,
#[serde(rename = "absoluteHigh")]
pub absolute_high: AbsoluteHigh,
#[serde(rename = "ptzInfo")]
pub ptz_info: PtzInfo,
#[serde(rename = "imageUrl")]
pub image_url: String,
#[serde(rename = "visiblePicUrl")]
pub visible_pic_url: String,
}
#[derive(Serialize, Deserialize)]
pub struct SmokeDetection {
#[serde(rename = "dataType")]
pub data_type: String,
#[serde(rename = "recvTime")]
pub recv_time: String,
#[serde(rename = "sendTime")]
pub send_time: String,
#[serde(rename = "dateTime")]
pub date_time: String,
#[serde(rename = "ipAddress")]
pub ip_address: String,
#[serde(rename = "portNo")]
pub port_no: i64,
#[serde(rename = "channelID")]
pub channel_id: i64,
#[serde(rename = "eventType")]
pub event_type: String,
#[serde(rename = "eventDescription")]
pub event_description: String,
#[serde(rename = "smokeDetection")]
pub smoke_detection: Vec<Detection>,
}

View File

@ -0,0 +1,92 @@
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct HighesPoint {
pub x: String,
pub y: String,
}
#[derive(Serialize, Deserialize)]
pub struct PtzInfo {
pub pan: String,
pub tilt: String,
pub zoom: String,
pub focus: String,
}
#[derive(Serialize, Deserialize)]
pub struct TargetAttrs {
#[serde(rename = "imageServerCode")]
pub image_server_code: Option<String>,
#[serde(rename = "deviceIndexCode")]
pub device_index_code: String,
#[serde(rename = "cameraIndexCode")]
pub camera_index_code: String,
#[serde(rename = "cameraAddress")]
pub camera_address: Option<String>,
pub longitude: Option<f64>,
pub latitude: Option<f64>,
}
#[derive(Serialize, Deserialize)]
pub struct Thermometry {
#[serde(rename = "targetAttrs")]
pub target_attrs: TargetAttrs,
#[serde(rename = "presetNo")]
pub preset_no: i64,
#[serde(rename = "alarmLevel")]
pub alarm_level: i64,
#[serde(rename = "alarmType")]
pub alarm_type: i64,
#[serde(rename = "alarmRule")]
pub alarm_rule: i64,
#[serde(rename = "ruleCalibType")]
pub rule_calib_type: i64,
#[serde(rename = "ruleTemperature")]
pub rule_temperature: String,
#[serde(rename = "curTemperature")]
pub cur_temperature: String,
#[serde(rename = "ptzInfo")]
pub ptz_info: PtzInfo,
#[serde(rename = "imageUrl")]
pub image_url: String,
#[serde(rename = "visiblePicUrl")]
pub visible_pic_url: String,
#[serde(rename = "thermometryUnit")]
pub thermometry_unit: i64,
#[serde(rename = "toleranceTemperature")]
pub tolerance_temperature: String,
#[serde(rename = "alertFilterTime")]
pub alert_filter_time: i64,
#[serde(rename = "alarmFilterTime")]
pub alarm_filter_time: i64,
#[serde(rename = "ruleId")]
pub rule_id: i64,
pub point: Vec<HighesPoint>,
#[serde(rename = "highesPoint")]
pub highes_point: HighesPoint,
}
#[derive(Serialize, Deserialize)]
pub struct TemperatureAlarm {
#[serde(rename = "dataType")]
pub data_type: String,
#[serde(rename = "recvTime")]
pub recv_time: String,
#[serde(rename = "sendTime")]
pub send_time: String,
#[serde(rename = "dateTime")]
pub date_time: String,
#[serde(rename = "ipAddress")]
pub ip_address: String,
#[serde(rename = "portNo")]
pub port_no: i64,
#[serde(rename = "channelID")]
pub channel_id: i64,
#[serde(rename = "eventType")]
pub event_type: String,
#[serde(rename = "eventDescription")]
pub event_description: String,
#[serde(rename = " thermometry")]
pub _thermometry: Vec<Thermometry>,
}