150 lines
4.3 KiB
Rust
150 lines
4.3 KiB
Rust
use crypto_helper::crypto::Crypto;
|
|
use mqtt::{Event, ConnectionError};
|
|
use std::{time::Duration, cell::RefCell};
|
|
extern crate rumqttc as mqtt;
|
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
/// Callback invoked for each newly received time message.
///
/// It receives the message's `u32` time value and a shared reference to a
/// caller-supplied argument of type `T`. Returning `Err(())` makes the
/// receiver skip the confirmation reply for that message.
///
/// NOTE(review): the unit of the time value is not visible in this file —
/// confirm against the sender.
pub type TimeCallback<T> = fn(u32, &T) -> Result<(),()>;
|
|
|
|
#[derive(Serialize, Deserialize, PartialEq)]
|
|
enum MqttMessageKind {
|
|
Time,
|
|
Confirm,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
struct MqttMessage {
|
|
id: uuid::Uuid,
|
|
kind: MqttMessageKind,
|
|
time: u32,
|
|
}
|
|
|
|
/// Names of the two MQTT topics used by [`Comm`].
struct Topics {
    /// Topic that is subscribed to and on which time messages are accepted.
    time: String,
    /// Topic on which confirmation replies are published.
    confirmation: String,
}
|
|
|
|
pub struct Comm<T> {
|
|
client: RefCell<mqtt::Client>,
|
|
connection: RefCell<mqtt::Connection>,
|
|
handler: TimeCallback<T>,
|
|
crypto: Crypto,
|
|
topics: Topics,
|
|
last_uuids: RefCell<Vec<uuid::Uuid>>
|
|
}
|
|
|
|
impl<T> Comm<T> {
|
|
pub fn new(
|
|
broker_domain: &str,
|
|
password: &str,
|
|
handler: TimeCallback<T>,
|
|
) -> Result<Self, mqtt::ClientError> {
|
|
let mut create_opts =
|
|
mqtt::MqttOptions::new(uuid::Uuid::new_v4().to_string(), broker_domain, 8883);
|
|
create_opts.set_keep_alive(Duration::from_secs(5));
|
|
|
|
let transport = mqtt::Transport::Tls(mqtt::TlsConfiguration::default());
|
|
create_opts.set_transport(transport);
|
|
|
|
let (mut cli, conn) = mqtt::Client::new(create_opts, 10);
|
|
|
|
let crypto = Crypto::new(password, broker_domain);
|
|
|
|
let time_topic = format!("org.speedclimbing.ok-ready-go.{password}.time");
|
|
let time_topic = Crypto::sha256(&crypto.encrypt(&time_topic));
|
|
|
|
let confirmation_topic = format!("org.speedclimbing.ok-ready-go.{password}.confirmation");
|
|
let confirmation_topic = Crypto::sha256(&crypto.encrypt(&confirmation_topic));
|
|
|
|
cli.subscribe(&time_topic, mqtt::QoS::AtMostOnce)?;
|
|
|
|
Ok(Comm {
|
|
client: RefCell::new(cli),
|
|
connection: RefCell::new(conn),
|
|
handler,
|
|
crypto,
|
|
topics: Topics {
|
|
time: time_topic,
|
|
confirmation: confirmation_topic
|
|
},
|
|
last_uuids: RefCell::new(Vec::new())
|
|
})
|
|
}
|
|
|
|
pub fn handle_next_event(&self, handler_arg: &T) {
|
|
let notification = self.connection.borrow_mut().iter().next();
|
|
if notification.is_none() {
|
|
return;
|
|
}
|
|
|
|
let notification = notification.unwrap();
|
|
self._handle_notification(notification, handler_arg);
|
|
}
|
|
|
|
pub fn disconnect(&self) {
|
|
self.client.borrow_mut().disconnect().unwrap();
|
|
}
|
|
|
|
fn _handle_notification(&self, notification: Result<Event, ConnectionError>, handler_arg: &T) {
|
|
if let Err(e) = notification {
|
|
println!("Error: {:?}", e);
|
|
return;
|
|
}
|
|
|
|
let notification = notification.unwrap();
|
|
if let mqtt::Event::Outgoing(_) = notification {
|
|
return;
|
|
}
|
|
|
|
if let mqtt::Event::Incoming(notification) = notification {
|
|
self._handle_incoming_packet(notification, handler_arg);
|
|
}
|
|
}
|
|
|
|
fn _handle_incoming_packet(&self, packet: mqtt::Packet, handler_arg: &T) {
|
|
match packet {
|
|
mqtt::Packet::Publish(publish) => self._handle_publish_packet(publish, handler_arg),
|
|
_ => return,
|
|
}
|
|
}
|
|
|
|
fn _handle_publish_packet(&self, packet: mqtt::Publish, handler_arg: &T) {
|
|
if packet.topic != self.topics.time {
|
|
return;
|
|
}
|
|
|
|
let msg = String::from_utf8(packet.payload.to_vec()).unwrap();
|
|
|
|
let msg = self.crypto.decrypt(&msg);
|
|
println!("Got: {}", msg);
|
|
let msg: MqttMessage = serde_json::from_str(&msg).unwrap();
|
|
if msg.kind != MqttMessageKind::Time {
|
|
return;
|
|
}
|
|
|
|
if self.last_uuids.borrow().contains(&msg.id) {
|
|
return;
|
|
}
|
|
|
|
self.last_uuids.borrow_mut().push(msg.id);
|
|
|
|
if (self.handler)(msg.time, handler_arg).is_err() {
|
|
println!("[WARN] error in time handler -> not sending confirmation");
|
|
return;
|
|
}
|
|
|
|
let reply = MqttMessage {
|
|
id: msg.id,
|
|
kind: MqttMessageKind::Confirm,
|
|
time: 0,
|
|
};
|
|
|
|
let reply = serde_json::to_string(&reply).unwrap();
|
|
let reply = self.crypto.encrypt(&reply);
|
|
self.client.borrow_mut()
|
|
.publish(&self.topics.confirmation, mqtt::QoS::AtMostOnce, false, reply.as_bytes())
|
|
.unwrap();
|
|
}
|
|
}
|