diff --git a/Cargo.lock b/Cargo.lock index 361eded..ec55b20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -149,6 +149,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -266,6 +280,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", + "futures-sink", "futures-task", "pin-project-lite", "pin-utils", @@ -333,6 +348,8 @@ version = "0.1.0" dependencies = [ "axum", "dotenvy", + "log", + "redis", "reqwest", "serde", "serde_json", @@ -425,7 +442,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.5", "tokio", "tower-service", "tracing", @@ -477,7 +494,7 @@ dependencies = [ "http-body 1.0.0", "hyper 1.1.0", "pin-project-lite", - "socket2", + "socket2 0.5.5", "tokio", "tracing", ] @@ -767,6 +784,27 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.4.10", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -936,6 +974,12 @@ dependencies = [ "serde", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "signal-hook-registry" version = "1.4.1" @@ -960,6 +1004,16 @@ version = "1.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2593d31f82ead8df961d8bd23a64c2ccf2eb5dd34b0a34bfb4dd54011c72009e" +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "socket2" version = "0.5.5" @@ -1050,7 +1104,7 @@ dependencies = [ "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2", + "socket2 0.5.5", "tokio-macros", "windows-sys 0.48.0", ] @@ -1273,6 +1327,28 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-sys" version = "0.48.0" diff --git a/Cargo.toml b/Cargo.toml index 299928c..615d734 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,5 @@ tower = "0.4.13" serde = { version = "1.0.195", features = ["derive"] } serde_json = "1.0.111" dotenvy = "0.15.7" +log = "0.4.20" +redis = { version = "0.24.0", features = ["aio", "tokio-comp", "streams"] } diff --git a/src/main.rs b/src/main.rs index 19b7c2f..00f9655 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,13 +5,62 @@ use axum::{ Json, Router, }; use dotenvy::dotenv; -use serde_json::Value; +use redis::{AsyncCommands, Value}; +use redis::streams::{StreamReadOptions, StreamReadReply}; +use tokio::sync::mpsc; use crate::event_type::get_event_type_map; use typing::EventMessage; +use crate::typing::area_invasion::AreaInvasion; +use crate::typing::smoke_detection::SmokeDetection; +use crate::typing::temperature_alarm::TemperatureAlarm; mod event_type; mod typing; +mod test; + + +async fn consume_messages(mut redis_client: redis::aio::Connection, tx: mpsc::Sender) { + // 假设使用的是消费者组名为 "my_consumer_group" + let consumer_group = "my_consumer_group"; + // redis_client.xgroup_create((), (), ()); + // 消费者从 Stream 中获取消息 + let stream_name = "my_stream"; + let consumer_name = "my_consumer"; + let opts = StreamReadOptions::default().group(consumer_group, consumer_name).block(0).count(1); + loop { + // 使用 XREAD 命令从 Redis Stream 获取消息 + let result: Option = redis_client + .xread_options(&["my_stream"], &["0"], &opts) + .await + .expect("Failed to read from stream"); + + if let Some(reply) = result { + for stream_key in reply.keys { + println!("->> xread block: {}", stream_key.key); + for stream_id in stream_key.ids { + println!(" ->> StreamId: {:?}", stream_id); + // redis_client.xack::<&[&str; 1], &str, i32, RV>(&["my_stream"], consumer_group, &[0]); + } + } + println!(); + } + // for (_, messages) in result { + // println!("hello"); + // } + // for (_, messages) in result { + // for message in messages { + // for (field, value) in message { + // if field == "message" { + // // 将消息发送到通道 + // tx.send(value).await.expect("Failed to send message"); + // } + // } + // } + // } + } +} + async fn event_rcv_handle( Json(form_data): Json, @@ -24,22 +73,32 @@ async fn event_rcv_handle( None => String::from("未知事件类型"), Some(name) => name.to_string(), }; + + if event.data.is_some() { + // 区域入侵 + if event.event_type.eq(&131588) { + let area_invasion_data: AreaInvasion = serde_json::from_value(event.data.clone().unwrap()).unwrap(); + println!("{}", area_invasion_data.ip_address); + } + // 温度报警 + else if event.event_type.eq(&192517) { + let temperature_alarm_data: TemperatureAlarm = serde_json::from_value(event.data.clone().unwrap()).unwrap(); + println!("{}", temperature_alarm_data.ip_address); + } + // 烟雾检测 + else if event.event_type.eq(&192513) { + let smoke_detection: SmokeDetection = serde_json::from_value(event.data.clone().unwrap()).unwrap(); + println!("{}", smoke_detection.ip_address); + } + } + + println!("{}", event_type); let _ = reqwest::get(format!("https://www.feishu.cn/flow/api/trigger-webhook/31f13dead0bf78fc4bdb51ba23abba9f?title={}&content=发生时间:{}", event_type, event.happen_time)).await; } }); Ok(String::from("success")) } -async fn event_rcv_handle_v2( - Json(form_data): Json, -) -> Result { - println!("{}", form_data.to_string()); - tokio::spawn(async move { - println!("收到告警,向飞书推送消息"); - let _ = reqwest::get(format!("https://www.feishu.cn/flow/api/trigger-webhook/31f13dead0bf78fc4bdb51ba23abba9f?title={}&content={}", "收到告警", "告警内容")).await; - }); - Ok(String::from("success")) -} #[tokio::main] async fn main() { @@ -47,10 +106,17 @@ async fn main() { dotenv().expect(".env file not found"); let port = env::var("PORT").expect("PORT is not set in .env file"); + let redis_client = redis::Client::open("redis://127.0.0.1/").expect("Failed to create Redis client"); + let redis_connection = redis_client.get_async_connection().await.expect(""); + // 创建消息通道 + let (tx, rx) = mpsc::channel::(100); + + let consumer = tokio::spawn(consume_messages(redis_connection, tx.clone())); + // build our application with a single route let app = Router::new() .route("/", get(|| async { "Hello, World!" })) - .route("/eventRcv", post(event_rcv_handle_v2)); + .route("/eventRcv", post(event_rcv_handle)); // run our app with hyper, listening globally on port 3000 let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", port)) diff --git a/src/test.rs b/src/test.rs new file mode 100644 index 0000000..a89b748 --- /dev/null +++ b/src/test.rs @@ -0,0 +1,14 @@ +// [ +// bulk( +// string-data('"my_stream"'), +// bulk( +// bulk( +// string-data('"1705904296739-0"'), +// bulk( +// string-data('"json"'), +// string-data('"{}"') +// ) +// ) +// ) +// ) +// ] \ No newline at end of file diff --git a/src/typing.rs b/src/typing.rs index e44e24d..320099b 100644 --- a/src/typing.rs +++ b/src/typing.rs @@ -1,7 +1,8 @@ use serde::{Deserialize, Serialize}; - -#[derive(Serialize, Deserialize)] -pub struct EventData {} +use serde_json::Value; +pub mod area_invasion; +pub mod temperature_alarm; +pub mod smoke_detection; #[derive(Serialize, Deserialize)] pub struct Event { @@ -23,11 +24,11 @@ pub struct Event { #[serde(rename = "eventLvl")] pub event_lvl: Option, pub timeout: i64, - pub data: Option, + pub data: Option, } #[derive(Serialize, Deserialize)] -pub struct Params { +pub struct EventParams { #[serde(rename = "sendTime")] pub send_time: String, pub ability: String, @@ -38,5 +39,5 @@ pub struct Params { #[derive(Serialize, Deserialize)] pub struct EventMessage { pub method: String, - pub params: Params, + pub params: EventParams, } diff --git a/src/typing/area_invasion.rs b/src/typing/area_invasion.rs new file mode 100644 index 0000000..e1a72de --- /dev/null +++ b/src/typing/area_invasion.rs @@ -0,0 +1,66 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct RegionCoordinatesList { + #[serde(rename = "positionX")] + pub position_x: Option, + #[serde(rename = "positionY")] + pub position_y: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct TargetAttrs { + #[serde(rename = "imageServerCode")] + pub image_server_code: Option, + #[serde(rename = "deviceIndexCode")] + pub device_index_code: String, + #[serde(rename = "cameraIndexCode")] + pub camera_index_code: String, + #[serde(rename = "channelName")] + pub channel_name: String, + #[serde(rename = "cameraAddress")] + pub camera_address: Option, + pub longitude: Option, + pub latitude: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct FieldDetection { + #[serde(rename = "targetAttrs")] + pub target_attrs: Option, + #[serde(rename = "imageUrl")] + pub image_url: Option, + pub duration: Option, + #[serde(rename = "sensitivityLevel")] + pub sensitivity_level: Option, + pub rate: Option, + #[serde(rename = "detectionTarget")] + pub detection_target: Option, + #[serde(rename = "regionCoordinatesList")] + pub region_coordinates_list: Vec, +} + + +// 区域入侵事件data +#[derive(Serialize, Deserialize)] +pub struct AreaInvasion { + #[serde(rename = "dataType")] + pub data_type: String, + #[serde(rename = "recvTime")] + pub recv_time: String, + #[serde(rename = "sendTime")] + pub send_time: String, + #[serde(rename = "dateTime")] + pub date_time: String, + #[serde(rename = "ipAddress")] + pub ip_address: String, + #[serde(rename = "portNo")] + pub port_no: i64, + #[serde(rename = "channelID")] + pub channel_id: i64, + #[serde(rename = "eventType")] + pub event_type: String, + #[serde(rename = "eventDescription")] + pub event_description: String, + pub fielddetection: Vec, +} diff --git a/src/typing/smoke_detection.rs b/src/typing/smoke_detection.rs new file mode 100644 index 0000000..5e28636 --- /dev/null +++ b/src/typing/smoke_detection.rs @@ -0,0 +1,91 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct PtzInfo { + pub pan: String, + pub tilt: String, + pub zoom: String, + pub focus: String, +} + +#[derive(Serialize, Deserialize)] +pub struct AbsoluteHigh { + pub elevation: String, + pub azimuth: String, + #[serde(rename = "absoluteZoom")] + pub absolute_zoom: String, +} + +#[derive(Serialize, Deserialize)] +pub struct SmokeRegion { + pub x: String, + pub y: String, + pub width: String, + pub height: String, +} + +#[derive(Serialize, Deserialize)] +pub struct RegionCoordinates { + #[serde(rename = "positionX")] + pub position_x: String, + #[serde(rename = "positionY")] + pub position_y: String, +} + +#[derive(Serialize, Deserialize)] +pub struct TargetAttrs { + #[serde(rename = "imageServerCode")] + pub image_server_code: String, + #[serde(rename = "deviceIndexCode")] + pub device_index_code: String, + #[serde(rename = "cameraIndexCode")] + pub camera_index_code: String, + #[serde(rename = "cameraAddress")] + pub camera_address: String, + pub longitude: f64, + pub latitude: f64, +} + +#[derive(Serialize, Deserialize)] +pub struct Detection { + #[serde(rename = "targetAttrs")] + pub target_attrs: TargetAttrs, + #[serde(rename = "regionID")] + pub region_id: i64, + #[serde(rename = "regionCoordinatesList")] + pub region_coordinates_list: Vec, + #[serde(rename = "smokeRegion")] + pub smoke_region: SmokeRegion, + #[serde(rename = "absoluteHigh")] + pub absolute_high: AbsoluteHigh, + #[serde(rename = "ptzInfo")] + pub ptz_info: PtzInfo, + #[serde(rename = "imageUrl")] + pub image_url: String, + #[serde(rename = "visiblePicUrl")] + pub visible_pic_url: String, +} + +#[derive(Serialize, Deserialize)] +pub struct SmokeDetection { + #[serde(rename = "dataType")] + pub data_type: String, + #[serde(rename = "recvTime")] + pub recv_time: String, + #[serde(rename = "sendTime")] + pub send_time: String, + #[serde(rename = "dateTime")] + pub date_time: String, + #[serde(rename = "ipAddress")] + pub ip_address: String, + #[serde(rename = "portNo")] + pub port_no: i64, + #[serde(rename = "channelID")] + pub channel_id: i64, + #[serde(rename = "eventType")] + pub event_type: String, + #[serde(rename = "eventDescription")] + pub event_description: String, + #[serde(rename = "smokeDetection")] + pub smoke_detection: Vec, +} \ No newline at end of file diff --git a/src/typing/temperature_alarm.rs b/src/typing/temperature_alarm.rs new file mode 100644 index 0000000..2c2ac49 --- /dev/null +++ b/src/typing/temperature_alarm.rs @@ -0,0 +1,92 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize)] +pub struct HighesPoint { + pub x: String, + pub y: String, +} + +#[derive(Serialize, Deserialize)] +pub struct PtzInfo { + pub pan: String, + pub tilt: String, + pub zoom: String, + pub focus: String, +} + +#[derive(Serialize, Deserialize)] +pub struct TargetAttrs { + #[serde(rename = "imageServerCode")] + pub image_server_code: Option, + #[serde(rename = "deviceIndexCode")] + pub device_index_code: String, + #[serde(rename = "cameraIndexCode")] + pub camera_index_code: String, + #[serde(rename = "cameraAddress")] + pub camera_address: Option, + pub longitude: Option, + pub latitude: Option, +} + +#[derive(Serialize, Deserialize)] +pub struct Thermometry { + #[serde(rename = "targetAttrs")] + pub target_attrs: TargetAttrs, + #[serde(rename = "presetNo")] + pub preset_no: i64, + #[serde(rename = "alarmLevel")] + pub alarm_level: i64, + #[serde(rename = "alarmType")] + pub alarm_type: i64, + #[serde(rename = "alarmRule")] + pub alarm_rule: i64, + #[serde(rename = "ruleCalibType")] + pub rule_calib_type: i64, + #[serde(rename = "ruleTemperature")] + pub rule_temperature: String, + #[serde(rename = "curTemperature")] + pub cur_temperature: String, + #[serde(rename = "ptzInfo")] + pub ptz_info: PtzInfo, + #[serde(rename = "imageUrl")] + pub image_url: String, + #[serde(rename = "visiblePicUrl")] + pub visible_pic_url: String, + #[serde(rename = "thermometryUnit")] + pub thermometry_unit: i64, + #[serde(rename = "toleranceTemperature")] + pub tolerance_temperature: String, + #[serde(rename = "alertFilterTime")] + pub alert_filter_time: i64, + #[serde(rename = "alarmFilterTime")] + pub alarm_filter_time: i64, + #[serde(rename = "ruleId")] + pub rule_id: i64, + pub point: Vec, + #[serde(rename = "highesPoint")] + pub highes_point: HighesPoint, +} + +#[derive(Serialize, Deserialize)] +pub struct TemperatureAlarm { + #[serde(rename = "dataType")] + pub data_type: String, + #[serde(rename = "recvTime")] + pub recv_time: String, + #[serde(rename = "sendTime")] + pub send_time: String, + #[serde(rename = "dateTime")] + pub date_time: String, + #[serde(rename = "ipAddress")] + pub ip_address: String, + #[serde(rename = "portNo")] + pub port_no: i64, + #[serde(rename = "channelID")] + pub channel_id: i64, + #[serde(rename = "eventType")] + pub event_type: String, + #[serde(rename = "eventDescription")] + pub event_description: String, + #[serde(rename = " thermometry")] + pub _thermometry: Vec, +} \ No newline at end of file