Chore: use diffrent topics for time and confirmation

This commit is contained in:
Dorian Zedler 2023-02-17 14:13:23 +01:00
parent 5b2aefe659
commit 1a1320c09c
Signed by: dorian
GPG key ID: 989DE36109AFA354
2 changed files with 83 additions and 44 deletions

View file

@ -1,10 +1,11 @@
use crypto_helper::crypto::Crypto;
use mqtt::{Event, ConnectionError};
use std::{time::Duration, cell::RefCell};
extern crate rumqttc as mqtt;
use serde::{Deserialize, Serialize};
pub type TimeCallback = fn(u32) -> Result<(),()>;
pub type TimeCallback<T> = fn(u32, &T) -> Result<(),()>;
#[derive(Serialize, Deserialize, PartialEq)]
enum MqttMessageKind {
@ -19,20 +20,25 @@ struct MqttMessage {
time: u32,
}
pub struct Comm {
struct Topics {
time: String,
confirmation: String
}
pub struct Comm<T> {
client: RefCell<mqtt::Client>,
connection: RefCell<mqtt::Connection>,
handler: TimeCallback,
handler: TimeCallback<T>,
crypto: Crypto,
topic: String,
topics: Topics,
last_uuids: RefCell<Vec<uuid::Uuid>>
}
impl Comm {
impl<T> Comm<T> {
pub fn new(
broker_domain: &str,
password: &str,
handler: TimeCallback,
handler: TimeCallback<T>,
) -> Result<Self, mqtt::ClientError> {
let mut create_opts =
mqtt::MqttOptions::new(uuid::Uuid::new_v4().to_string(), broker_domain, 8883);
@ -45,48 +51,66 @@ impl Comm {
let crypto = Crypto::new(password, broker_domain);
let topic = format!("org.speedclimbing.ok-ready-go.{password}");
let topic = Crypto::sha256(&crypto.encrypt(&topic));
let time_topic = format!("org.speedclimbing.ok-ready-go.{password}.time");
let time_topic = Crypto::sha256(&crypto.encrypt(&time_topic));
cli.subscribe(&topic, mqtt::QoS::AtMostOnce)?;
let confirmation_topic = format!("org.speedclimbing.ok-ready-go.{password}.confirmation");
let confirmation_topic = Crypto::sha256(&crypto.encrypt(&confirmation_topic));
cli.subscribe(&time_topic, mqtt::QoS::AtMostOnce)?;
Ok(Comm {
client: RefCell::new(cli),
connection: RefCell::new(conn),
handler,
crypto,
topic,
topics: Topics {
time: time_topic,
confirmation: confirmation_topic
},
last_uuids: RefCell::new(Vec::new())
})
}
pub fn listen(&self) {
for (_, notification) in self.connection.borrow_mut().iter().enumerate() {
if let Err(e) = notification {
println!("Error: {:?}", e);
continue;
}
pub fn handle_next_event(&self, handler_arg: &T) {
let notification = self.connection.borrow_mut().iter().next();
if notification.is_none() {
return;
}
let notification = notification.unwrap();
if let mqtt::Event::Outgoing(_) = notification {
continue;
}
let notification = notification.unwrap();
self._handle_notification(notification, handler_arg);
}
if let mqtt::Event::Incoming(notification) = notification {
self._handle_incoming_packet(notification);
}
pub fn disconnect(&self) {
self.client.borrow_mut().disconnect().unwrap();
}
fn _handle_notification(&self, notification: Result<Event, ConnectionError>, handler_arg: &T) {
if let Err(e) = notification {
println!("Error: {:?}", e);
return;
}
let notification = notification.unwrap();
if let mqtt::Event::Outgoing(_) = notification {
return;
}
if let mqtt::Event::Incoming(notification) = notification {
self._handle_incoming_packet(notification, handler_arg);
}
}
fn _handle_incoming_packet(&self, packet: mqtt::Packet) {
fn _handle_incoming_packet(&self, packet: mqtt::Packet, handler_arg: &T) {
match packet {
mqtt::Packet::Publish(publish) => self._handle_publish_packet(publish),
mqtt::Packet::Publish(publish) => self._handle_publish_packet(publish, handler_arg),
_ => return,
}
}
fn _handle_publish_packet(&self, packet: mqtt::Publish) {
if packet.topic != self.topic {
fn _handle_publish_packet(&self, packet: mqtt::Publish, handler_arg: &T) {
if packet.topic != self.topics.time {
return;
}
@ -105,7 +129,7 @@ impl Comm {
self.last_uuids.borrow_mut().push(msg.id);
if (self.handler)(msg.time).is_err() {
if (self.handler)(msg.time, handler_arg).is_err() {
println!("[WARN] error in time handler -> not sending confirmation");
return;
}
@ -119,7 +143,7 @@ impl Comm {
let reply = serde_json::to_string(&reply).unwrap();
let reply = self.crypto.encrypt(&reply);
self.client.borrow_mut()
.publish(&self.topic, mqtt::QoS::AtMostOnce, false, reply.as_bytes())
.publish(&self.topics.confirmation, mqtt::QoS::AtMostOnce, false, reply.as_bytes())
.unwrap();
}
}

View file

@ -1,15 +1,17 @@
let wasm_inited_resolve;
const wasm_inited = new Promise((resolve) => {wasm_inited_resolve = resolve;});
const wasm_inited = new Promise((resolve) => {
wasm_inited_resolve = resolve;
});
document.addEventListener("wasm-loaded", () => {
wasm_inited_resolve(wasm_bindgen)
wasm_inited_resolve(wasm_bindgen);
});
document.addEventListener("alpine:init", () => {
Alpine.store("mqtt", {
connected: false,
_client: null,
_topic: null,
_topics: null,
_c: null,
_pendingPromises: {},
@ -18,7 +20,9 @@ document.addEventListener("alpine:init", () => {
if (!this.connected) return null;
const id = uuidv4();
const promise = new Promise((resolve, reject) => {this._pendingPromises[id] = [resolve, reject]});
const promise = new Promise((resolve, reject) => {
this._pendingPromises[id] = [resolve, reject];
});
this._publish({
id: id, // used to prevent replay attacks and to identify confirm messages
kind: "Time", // can be "time" or "confirm"
@ -30,7 +34,7 @@ document.addEventListener("alpine:init", () => {
async connect() {
if (this.connected) return;
const password = Alpine.store("localState").password;
const that = this;
const brokerDomain = "broker.emqx.io";
@ -38,7 +42,7 @@ document.addEventListener("alpine:init", () => {
if (!password) return false;
const {Crypto} = await wasm_inited;
const { Crypto } = await wasm_inited;
Alpine.store("localState")._state = 5;
@ -47,12 +51,18 @@ document.addEventListener("alpine:init", () => {
console.log("Test", this._encrypt("test"));
this._topic = Crypto.sha256(
this._encrypt(`org.speedclimbing.ok-ready-go.${password}`)
).toString();
this._topics = {
time: Crypto.sha256(
this._encrypt(`org.speedclimbing.ok-ready-go.${password}.time`)
).toString(),
confirmation: Crypto.sha256(
this._encrypt(
`org.speedclimbing.ok-ready-go.${password}.confirmation`
)
).toString(),
};
console.log("Connecting to MQTT broker...");
console.log("topic:", this._topic);
const options = {
// Clean session
@ -65,7 +75,7 @@ document.addEventListener("alpine:init", () => {
this._client.on("connect", () => {
Alpine.store("localState")._state = 0;
that._client.subscribe(that._topic);
that._client.subscribe(that._topics.confirmation);
this.connected = true;
});
@ -74,7 +84,12 @@ document.addEventListener("alpine:init", () => {
message = that._decrypt(message.toString());
const data = JSON.parse(message);
if (topic !== that._topic || data.kind !== "Confirm" || Object.keys(this._pendingPromises).indexOf(data.id) === -1) return;
if (
topic !== that._topics.confirmation ||
data.kind !== "Confirm" ||
Object.keys(this._pendingPromises).indexOf(data.id) === -1
)
return;
console.log("<<< ", data);
this._pendingPromises[data.id][0]();
@ -82,13 +97,13 @@ document.addEventListener("alpine:init", () => {
},
disconnect() {
if(!this.connected) return;
if (!this.connected) return;
this._client.end(true);
this._client = null;
this.connected = false;
this._topic = null;
this._topics = null;
for (const promiseId in this._pendingPromises) {
this._pendingPromises[promiseId][1]();
@ -100,7 +115,7 @@ document.addEventListener("alpine:init", () => {
_publish(data) {
const encryptedData = this._encrypt(JSON.stringify(data));
console.log(">>> ", data);
this._client.publish(this._topic, encryptedData, {
this._client.publish(this._topics.time, encryptedData, {
qos: 1,
retain: false,
});