ok-ready-go/receiver/src/comm.rs

116 lines
3.2 KiB
Rust

use crypto_helper::crypto::Crypto;
use std::{time::Duration, cell::RefCell};
extern crate rumqttc as mqtt;
use serde::{Deserialize, Serialize};
/// Callback invoked with the `time` field of every received `Time` message.
///
/// Returning `Err(())` makes `Comm` skip sending the confirmation reply for
/// that message; returning `Ok(())` lets the confirmation go out.
pub type TimeCallback = fn(u32) -> Result<(),()>;
/// Discriminates the two kinds of messages exchanged on the MQTT topic.
#[derive(Serialize, Deserialize, PartialEq)]
enum MqttMessageKind {
/// A message carrying a time value; these are forwarded to the `TimeCallback`.
Time,
/// The reply published by the receiver after the callback accepted a `Time` message.
Confirm,
}
/// Message payload exchanged over MQTT.
///
/// It is serialized to/from JSON with serde and passed through `Crypto`
/// before being published / after being received.
#[derive(Serialize, Deserialize)]
struct MqttMessage {
// Message identifier; a `Confirm` reply reuses the id of the `Time` message it answers.
id: uuid::Uuid,
// Whether this is a time report or a confirmation.
kind: MqttMessageKind,
// Value handed to the `TimeCallback` for `Time` messages; set to 0 in `Confirm` replies.
// NOTE(review): the unit of this value is not visible in this file — confirm with the sender.
time: u32,
}
/// MQTT receiver: listens on one derived topic, hands received times to a
/// callback and publishes a confirmation for each accepted message.
pub struct Comm {
// Client handle used to subscribe and to publish confirmations. Wrapped in a
// `RefCell` so that `&self` methods can obtain a mutable borrow.
client: RefCell<mqtt::Client>,
// Connection whose events are iterated in `listen`. Wrapped in a `RefCell`
// for the same reason as `client`.
connection: RefCell<mqtt::Connection>,
// Callback invoked with the time of each received `Time` message.
handler: TimeCallback,
// Encrypts outgoing and decrypts incoming payloads.
crypto: Crypto,
// Topic this instance subscribes and publishes to.
topic: String,
}
impl Comm {
    /// Creates a receiver for the broker at `broker_domain`.
    ///
    /// The MQTT client is configured with a random UUID as client id, port
    /// 8883, a 5 second keep-alive and the default TLS configuration. The
    /// topic is derived from `password`: the string
    /// `org.speedclimbing.ok-ready-go.{password}` is passed through
    /// `Crypto::encrypt` and then `Crypto::sha256`. A subscription to that
    /// topic is requested before returning.
    ///
    /// `handler` is called with the time of every received `Time` message.
    ///
    /// # Errors
    ///
    /// Returns the `ClientError` reported by `subscribe` if the subscription
    /// request could not be issued.
    pub fn new(
        broker_domain: &str,
        password: &str,
        handler: TimeCallback,
    ) -> Result<Self, mqtt::ClientError> {
        let mut create_opts =
            mqtt::MqttOptions::new(uuid::Uuid::new_v4().to_string(), broker_domain, 8883);
        create_opts.set_keep_alive(Duration::from_secs(5));
        let transport = mqtt::Transport::Tls(mqtt::TlsConfiguration::default());
        create_opts.set_transport(transport);
        let (mut cli, conn) = mqtt::Client::new(create_opts, 10);
        let crypto = Crypto::new(password, broker_domain);
        // Never use the password as the topic directly: only the hashed,
        // encrypted form is sent to the broker.
        let topic = format!("org.speedclimbing.ok-ready-go.{password}");
        let topic = Crypto::sha256(&crypto.encrypt(&topic));
        cli.subscribe(&topic, mqtt::QoS::AtMostOnce)?;
        Ok(Comm {
            client: RefCell::new(cli),
            connection: RefCell::new(conn),
            handler,
            crypto,
            topic,
        })
    }

    /// Processes connection events until the connection's event iterator ends.
    ///
    /// Incoming packets are dispatched to the packet handler, connection
    /// errors are printed and iteration continues, and every other event is
    /// ignored. The connection stays mutably borrowed for the whole loop.
    pub fn listen(&self) {
        // NOTE(review): errors are only logged and the loop immediately polls
        // again; if the broker is unreachable this may spin — consider a
        // back-off between attempts.
        for notification in self.connection.borrow_mut().iter() {
            match notification {
                Ok(mqtt::Event::Incoming(packet)) => self._handle_incoming_packet(packet),
                // Outgoing (and any other) events need no handling here.
                Ok(_) => {}
                Err(e) => println!("Error: {:?}", e),
            }
        }
    }

    /// Forwards `Publish` packets to the publish handler; ignores all others.
    fn _handle_incoming_packet(&self, packet: mqtt::Packet) {
        if let mqtt::Packet::Publish(publish) = packet {
            self._handle_publish_packet(publish);
        }
    }

    /// Handles one published message.
    ///
    /// Messages on other topics and messages that are not of kind `Time` are
    /// ignored. For a `Time` message the handler is called with its time; if
    /// the handler succeeds, an encrypted `Confirm` message carrying the same
    /// id is published back on the topic.
    ///
    /// Payloads come from the network, so anything that is not valid UTF-8 or
    /// does not decode to a valid message is logged and dropped instead of
    /// panicking. A failure to publish the confirmation is logged as well.
    fn _handle_publish_packet(&self, packet: mqtt::Publish) {
        if packet.topic != self.topic {
            return;
        }

        let msg = match String::from_utf8(packet.payload.to_vec()) {
            Ok(msg) => msg,
            Err(e) => {
                println!("[WARN] ignoring message with non-UTF-8 payload: {:?}", e);
                return;
            }
        };
        // NOTE(review): whether `decrypt` can panic on garbage input is not
        // visible from this file — verify in `crypto_helper`.
        let msg = self.crypto.decrypt(&msg);
        println!("Got: {}", msg);

        let msg: MqttMessage = match serde_json::from_str(&msg) {
            Ok(msg) => msg,
            Err(e) => {
                println!("[WARN] ignoring malformed message: {:?}", e);
                return;
            }
        };
        // This also filters out our own `Confirm` replies, which arrive on
        // the same topic we are subscribed to.
        if msg.kind != MqttMessageKind::Time {
            return;
        }

        if (self.handler)(msg.time).is_err() {
            println!("[WARN] error in time handler -> not sending confirmation");
            return;
        }

        let reply = MqttMessage {
            id: msg.id,
            kind: MqttMessageKind::Confirm,
            time: 0,
        };
        let reply = match serde_json::to_string(&reply) {
            Ok(reply) => reply,
            Err(e) => {
                println!("[WARN] could not serialize confirmation: {:?}", e);
                return;
            }
        };
        let reply = self.crypto.encrypt(&reply);

        let published = self.client.borrow_mut().publish(
            &self.topic,
            mqtt::QoS::AtMostOnce,
            false,
            reply.as_bytes(),
        );
        if let Err(e) = published {
            println!("[WARN] could not publish confirmation: {:?}", e);
        }
    }
}