From 1a1320c09c50bd5e44962419191bfbf63e547194 Mon Sep 17 00:00:00 2001 From: Dorian Zedler Date: Fri, 17 Feb 2023 14:13:23 +0100 Subject: [PATCH] Chore: use diffrent topics for time and confirmation --- receiver/src/comm.rs | 82 ++++++++++++++++++++++++++++---------------- web/js/mqtt.js | 45 ++++++++++++++++-------- 2 files changed, 83 insertions(+), 44 deletions(-) diff --git a/receiver/src/comm.rs b/receiver/src/comm.rs index 00ff9a9..7e9a764 100644 --- a/receiver/src/comm.rs +++ b/receiver/src/comm.rs @@ -1,10 +1,11 @@ use crypto_helper::crypto::Crypto; +use mqtt::{Event, ConnectionError}; use std::{time::Duration, cell::RefCell}; extern crate rumqttc as mqtt; use serde::{Deserialize, Serialize}; -pub type TimeCallback = fn(u32) -> Result<(),()>; +pub type TimeCallback = fn(u32, &T) -> Result<(),()>; #[derive(Serialize, Deserialize, PartialEq)] enum MqttMessageKind { @@ -19,20 +20,25 @@ struct MqttMessage { time: u32, } -pub struct Comm { +struct Topics { + time: String, + confirmation: String +} + +pub struct Comm { client: RefCell, connection: RefCell, - handler: TimeCallback, + handler: TimeCallback, crypto: Crypto, - topic: String, + topics: Topics, last_uuids: RefCell> } -impl Comm { +impl Comm { pub fn new( broker_domain: &str, password: &str, - handler: TimeCallback, + handler: TimeCallback, ) -> Result { let mut create_opts = mqtt::MqttOptions::new(uuid::Uuid::new_v4().to_string(), broker_domain, 8883); @@ -45,48 +51,66 @@ impl Comm { let crypto = Crypto::new(password, broker_domain); - let topic = format!("org.speedclimbing.ok-ready-go.{password}"); - let topic = Crypto::sha256(&crypto.encrypt(&topic)); + let time_topic = format!("org.speedclimbing.ok-ready-go.{password}.time"); + let time_topic = Crypto::sha256(&crypto.encrypt(&time_topic)); - cli.subscribe(&topic, mqtt::QoS::AtMostOnce)?; + let confirmation_topic = format!("org.speedclimbing.ok-ready-go.{password}.confirmation"); + let confirmation_topic = Crypto::sha256(&crypto.encrypt(&confirmation_topic)); + + cli.subscribe(&time_topic, mqtt::QoS::AtMostOnce)?; Ok(Comm { client: RefCell::new(cli), connection: RefCell::new(conn), handler, crypto, - topic, + topics: Topics { + time: time_topic, + confirmation: confirmation_topic + }, last_uuids: RefCell::new(Vec::new()) }) } - pub fn listen(&self) { - for (_, notification) in self.connection.borrow_mut().iter().enumerate() { - if let Err(e) = notification { - println!("Error: {:?}", e); - continue; - } + pub fn handle_next_event(&self, handler_arg: &T) { + let notification = self.connection.borrow_mut().iter().next(); + if notification.is_none() { + return; + } - let notification = notification.unwrap(); - if let mqtt::Event::Outgoing(_) = notification { - continue; - } + let notification = notification.unwrap(); + self._handle_notification(notification, handler_arg); + } - if let mqtt::Event::Incoming(notification) = notification { - self._handle_incoming_packet(notification); - } + pub fn disconnect(&self) { + self.client.borrow_mut().disconnect().unwrap(); + } + + fn _handle_notification(&self, notification: Result, handler_arg: &T) { + if let Err(e) = notification { + println!("Error: {:?}", e); + return; + } + + let notification = notification.unwrap(); + if let mqtt::Event::Outgoing(_) = notification { + return; + } + + if let mqtt::Event::Incoming(notification) = notification { + self._handle_incoming_packet(notification, handler_arg); } } - fn _handle_incoming_packet(&self, packet: mqtt::Packet) { + fn _handle_incoming_packet(&self, packet: mqtt::Packet, handler_arg: &T) { match packet { - mqtt::Packet::Publish(publish) => self._handle_publish_packet(publish), + mqtt::Packet::Publish(publish) => self._handle_publish_packet(publish, handler_arg), _ => return, } } - fn _handle_publish_packet(&self, packet: mqtt::Publish) { - if packet.topic != self.topic { + fn _handle_publish_packet(&self, packet: mqtt::Publish, handler_arg: &T) { + if packet.topic != self.topics.time { return; } @@ -105,7 +129,7 @@ impl Comm { self.last_uuids.borrow_mut().push(msg.id); - if (self.handler)(msg.time).is_err() { + if (self.handler)(msg.time, handler_arg).is_err() { println!("[WARN] error in time handler -> not sending confirmation"); return; } @@ -119,7 +143,7 @@ impl Comm { let reply = serde_json::to_string(&reply).unwrap(); let reply = self.crypto.encrypt(&reply); self.client.borrow_mut() - .publish(&self.topic, mqtt::QoS::AtMostOnce, false, reply.as_bytes()) + .publish(&self.topics.confirmation, mqtt::QoS::AtMostOnce, false, reply.as_bytes()) .unwrap(); } } diff --git a/web/js/mqtt.js b/web/js/mqtt.js index 0c0b6ea..e354282 100644 --- a/web/js/mqtt.js +++ b/web/js/mqtt.js @@ -1,15 +1,17 @@ let wasm_inited_resolve; -const wasm_inited = new Promise((resolve) => {wasm_inited_resolve = resolve;}); +const wasm_inited = new Promise((resolve) => { + wasm_inited_resolve = resolve; +}); document.addEventListener("wasm-loaded", () => { - wasm_inited_resolve(wasm_bindgen) + wasm_inited_resolve(wasm_bindgen); }); document.addEventListener("alpine:init", () => { Alpine.store("mqtt", { connected: false, _client: null, - _topic: null, + _topics: null, _c: null, _pendingPromises: {}, @@ -18,7 +20,9 @@ document.addEventListener("alpine:init", () => { if (!this.connected) return null; const id = uuidv4(); - const promise = new Promise((resolve, reject) => {this._pendingPromises[id] = [resolve, reject]}); + const promise = new Promise((resolve, reject) => { + this._pendingPromises[id] = [resolve, reject]; + }); this._publish({ id: id, // used to prevent replay attacks and to identify confirm messages kind: "Time", // can be "time" or "confirm" @@ -30,7 +34,7 @@ document.addEventListener("alpine:init", () => { async connect() { if (this.connected) return; - + const password = Alpine.store("localState").password; const that = this; const brokerDomain = "broker.emqx.io"; @@ -38,7 +42,7 @@ document.addEventListener("alpine:init", () => { if (!password) return false; - const {Crypto} = await wasm_inited; + const { Crypto } = await wasm_inited; Alpine.store("localState")._state = 5; @@ -47,12 +51,18 @@ document.addEventListener("alpine:init", () => { console.log("Test", this._encrypt("test")); - this._topic = Crypto.sha256( - this._encrypt(`org.speedclimbing.ok-ready-go.${password}`) - ).toString(); + this._topics = { + time: Crypto.sha256( + this._encrypt(`org.speedclimbing.ok-ready-go.${password}.time`) + ).toString(), + confirmation: Crypto.sha256( + this._encrypt( + `org.speedclimbing.ok-ready-go.${password}.confirmation` + ) + ).toString(), + }; console.log("Connecting to MQTT broker..."); - console.log("topic:", this._topic); const options = { // Clean session @@ -65,7 +75,7 @@ document.addEventListener("alpine:init", () => { this._client.on("connect", () => { Alpine.store("localState")._state = 0; - that._client.subscribe(that._topic); + that._client.subscribe(that._topics.confirmation); this.connected = true; }); @@ -74,7 +84,12 @@ document.addEventListener("alpine:init", () => { message = that._decrypt(message.toString()); const data = JSON.parse(message); - if (topic !== that._topic || data.kind !== "Confirm" || Object.keys(this._pendingPromises).indexOf(data.id) === -1) return; + if ( + topic !== that._topics.confirmation || + data.kind !== "Confirm" || + Object.keys(this._pendingPromises).indexOf(data.id) === -1 + ) + return; console.log("<<< ", data); this._pendingPromises[data.id][0](); @@ -82,13 +97,13 @@ document.addEventListener("alpine:init", () => { }, disconnect() { - if(!this.connected) return; + if (!this.connected) return; this._client.end(true); this._client = null; this.connected = false; - this._topic = null; + this._topics = null; for (const promiseId in this._pendingPromises) { this._pendingPromises[promiseId][1](); @@ -100,7 +115,7 @@ document.addEventListener("alpine:init", () => { _publish(data) { const encryptedData = this._encrypt(JSON.stringify(data)); console.log(">>> ", data); - this._client.publish(this._topic, encryptedData, { + this._client.publish(this._topics.time, encryptedData, { qos: 1, retain: false, });