Feat: switch to rumqtt for mqtt
This commit is contained in:
parent
ee53708636
commit
8a907ed401
4 changed files with 2561 additions and 113 deletions
2428
receiver/Cargo.lock
generated
2428
receiver/Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -8,12 +8,12 @@ edition = "2021"
|
||||||
[dependencies]
|
[dependencies]
|
||||||
base64 = "0.21.0"
|
base64 = "0.21.0"
|
||||||
hex = "0.4.3"
|
hex = "0.4.3"
|
||||||
paho-mqtt = { version = "0.12.0", default-features = false, features = ["bundled"] }
|
|
||||||
pbkdf2 = "0.11.0"
|
|
||||||
crypto_helper = {path = "../crypto_helper"}
|
crypto_helper = {path = "../crypto_helper"}
|
||||||
uuid = { version = "1.3.0", features = ["serde", "v4"] }
|
uuid = { version = "1.3.0", features = ["serde", "v4"] }
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
iced = "0.7.0"
|
||||||
|
rumqttc = { version = "0.20.0", default-features = false, features = ["use-rustls"] }
|
||||||
|
|
||||||
[target.'cfg(unix)'.dependencies]
|
[target.'cfg(unix)'.dependencies]
|
||||||
enigo = "0.0.14"
|
enigo = "0.0.14"
|
||||||
|
|
112
receiver/src/comm.rs
Normal file
112
receiver/src/comm.rs
Normal file
|
@ -0,0 +1,112 @@
|
||||||
|
use crypto_helper::crypto::Crypto;
|
||||||
|
use std::{time::Duration, cell::RefCell};
|
||||||
|
extern crate rumqttc as mqtt;
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
pub type TimeCallback = fn(u32) -> ();
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize, PartialEq)]
|
||||||
|
enum MqttMessageKind {
|
||||||
|
Time,
|
||||||
|
Confirm,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct MqttMessage {
|
||||||
|
id: uuid::Uuid,
|
||||||
|
kind: MqttMessageKind,
|
||||||
|
time: u32,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Comm {
|
||||||
|
client: RefCell<mqtt::Client>,
|
||||||
|
connection: RefCell<mqtt::Connection>,
|
||||||
|
handler: TimeCallback,
|
||||||
|
crypto: Crypto,
|
||||||
|
topic: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Comm {
|
||||||
|
pub fn new(
|
||||||
|
broker_domain: &str,
|
||||||
|
password: &str,
|
||||||
|
handler: TimeCallback,
|
||||||
|
) -> Result<Self, mqtt::ClientError> {
|
||||||
|
let mut create_opts =
|
||||||
|
mqtt::MqttOptions::new(uuid::Uuid::new_v4().to_string(), broker_domain, 8883);
|
||||||
|
create_opts.set_keep_alive(Duration::from_secs(5));
|
||||||
|
let transport = mqtt::Transport::Tls(mqtt::TlsConfiguration::default());
|
||||||
|
create_opts.set_transport(transport);
|
||||||
|
let (mut cli, conn) = mqtt::Client::new(create_opts, 10);
|
||||||
|
|
||||||
|
let crypto = Crypto::new(password, broker_domain);
|
||||||
|
|
||||||
|
let topic = format!("org.speedclimbing.ok-ready-go.{password}");
|
||||||
|
let topic = Crypto::sha256(&crypto.encrypt(&topic));
|
||||||
|
|
||||||
|
cli.subscribe(&topic, mqtt::QoS::AtMostOnce)?;
|
||||||
|
|
||||||
|
Ok(Comm {
|
||||||
|
client: RefCell::new(cli),
|
||||||
|
connection: RefCell::new(conn),
|
||||||
|
handler,
|
||||||
|
crypto,
|
||||||
|
topic,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn listen(&self) {
|
||||||
|
for (_, notification) in self.connection.borrow_mut().iter().enumerate() {
|
||||||
|
if let Err(e) = notification {
|
||||||
|
println!("Error: {:?}", e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let notification = notification.unwrap();
|
||||||
|
if let mqtt::Event::Outgoing(_) = notification {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let mqtt::Event::Incoming(notification) = notification {
|
||||||
|
self._handle_incoming_packet(notification);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _handle_incoming_packet(&self, packet: mqtt::Packet) {
|
||||||
|
match packet {
|
||||||
|
mqtt::Packet::Publish(publish) => self._handle_publish_packet(publish),
|
||||||
|
_ => return,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _handle_publish_packet(&self, packet: mqtt::Publish) {
|
||||||
|
if packet.topic != self.topic {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let msg = String::from_utf8(packet.payload.to_vec()).unwrap();
|
||||||
|
|
||||||
|
let msg = self.crypto.decrypt(&msg);
|
||||||
|
println!("Got: {}", msg);
|
||||||
|
let msg: MqttMessage = serde_json::from_str(&msg).unwrap();
|
||||||
|
if msg.kind != MqttMessageKind::Time {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
(self.handler)(msg.time);
|
||||||
|
|
||||||
|
let reply = MqttMessage {
|
||||||
|
id: msg.id,
|
||||||
|
kind: MqttMessageKind::Confirm,
|
||||||
|
time: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
let reply = serde_json::to_string(&reply).unwrap();
|
||||||
|
let reply = self.crypto.encrypt(&reply);
|
||||||
|
self.client.borrow_mut()
|
||||||
|
.publish(&self.topic, mqtt::QoS::AtMostOnce, false, reply.as_bytes())
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,46 +1,63 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use iced::theme;
|
||||||
use std::time::Duration;
|
use iced::widget::{
|
||||||
|
checkbox, column, container, horizontal_space, image, radio, row, scrollable, slider, text,
|
||||||
extern crate paho_mqtt as mqtt;
|
text_input, toggler, vertical_space,
|
||||||
|
};
|
||||||
|
use iced::widget::{Button, Column, Container, Slider};
|
||||||
|
use iced::{alignment, Element, Length, Sandbox, Settings};
|
||||||
|
|
||||||
|
mod comm;
|
||||||
mod keyboard;
|
mod keyboard;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, PartialEq)]
|
struct Gui {
|
||||||
enum MqttMessageKind {
|
password: String,
|
||||||
Time,
|
|
||||||
Confirm,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Debug, Clone)]
|
||||||
struct MqttMessage {
|
enum Message {
|
||||||
id: uuid::Uuid,
|
Pressed,
|
||||||
kind: MqttMessageKind,
|
PasswordChanged(String),
|
||||||
time: u32,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn init_mqtt(
|
impl Sandbox for Gui {
|
||||||
uri: &str,
|
type Message = Message;
|
||||||
topic: &str,
|
|
||||||
) -> mqtt::Result<(mqtt::Client, mqtt::Receiver<Option<mqtt::Message>>)> {
|
|
||||||
let create_opts = mqtt::CreateOptionsBuilder::new().server_uri(uri).finalize();
|
|
||||||
|
|
||||||
// Create a client.
|
fn new() -> Self {
|
||||||
let cli = mqtt::Client::new(create_opts)?;
|
Gui {password: "".to_owned()}
|
||||||
|
}
|
||||||
|
|
||||||
// Define the set of options for the connection.
|
fn update(&mut self, event: Message) {
|
||||||
let conn_opts = mqtt::ConnectOptionsBuilder::new()
|
match event {
|
||||||
.keep_alive_interval(Duration::from_secs(20))
|
Message::PasswordChanged(p) => self.password = p,
|
||||||
.clean_session(true)
|
Message::Pressed => println!("Pressed")
|
||||||
.finalize();
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let rx = cli.start_consuming();
|
fn view(&self) -> Element<Message> {
|
||||||
|
let content: Element<_> = column![
|
||||||
|
text_input("Enter password", &self.password, Message::PasswordChanged)
|
||||||
|
.password()
|
||||||
|
.size(30),
|
||||||
|
button("Connect")
|
||||||
|
.style(theme::Button::Primary)
|
||||||
|
.on_press(Message::Pressed),
|
||||||
|
]
|
||||||
|
.max_width(540)
|
||||||
|
.spacing(20)
|
||||||
|
.padding(20)
|
||||||
|
.into();
|
||||||
|
|
||||||
// Connect and wait for it to complete or fail.
|
content
|
||||||
cli.connect(conn_opts)?;
|
}
|
||||||
|
|
||||||
cli.subscribe(topic, 1)?;
|
fn title(&self) -> String {
|
||||||
|
"gui".to_owned()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok((cli, rx))
|
fn button<'a, Message: Clone>(label: &str) -> Button<'a, Message> {
|
||||||
|
iced::widget::button(text(label).horizontal_alignment(alignment::Horizontal::Center))
|
||||||
|
.padding(12)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn millis_to_string(millis: u32) -> String {
|
fn millis_to_string(millis: u32) -> String {
|
||||||
|
@ -49,51 +66,24 @@ fn millis_to_string(millis: u32) -> String {
|
||||||
formatted.replace(".", ",")
|
formatted.replace(".", ",")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn handle_time(time: u32) {
|
||||||
let password = "test";
|
println!("Got time: {}", time);
|
||||||
let broker_domain = "broker.emqx.io";
|
let time_with_comma = millis_to_string(time);
|
||||||
let broker_url = format!("tcp://{}:1883", broker_domain);
|
|
||||||
|
|
||||||
let c = crypto_helper::crypto::Crypto::new(password, broker_domain);
|
|
||||||
|
|
||||||
let topic = format!("org.speedclimbing.ok-ready-go.{password}");
|
|
||||||
let topic = crypto_helper::crypto::Crypto::sha256(&c.encrypt(&topic));
|
|
||||||
|
|
||||||
let (cli, rx) = init_mqtt(&broker_url, &topic).unwrap();
|
|
||||||
|
|
||||||
println!("Processing requests...");
|
|
||||||
for msg in rx.iter() {
|
|
||||||
if let Some(msg) = msg {
|
|
||||||
let msg = msg.payload_str();
|
|
||||||
|
|
||||||
let msg = c.decrypt(&msg);
|
|
||||||
println!("Got: {}", msg);
|
|
||||||
let msg: MqttMessage = serde_json::from_str(&msg).unwrap();
|
|
||||||
if msg.kind != MqttMessageKind::Time {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
println!("Got time: {} with id {}", msg.time, msg.id);
|
|
||||||
let time_with_comma = millis_to_string(msg.time);
|
|
||||||
|
|
||||||
println!("Trying to type {time_with_comma}");
|
println!("Trying to type {time_with_comma}");
|
||||||
keyboard::type_text(&time_with_comma);
|
keyboard::type_text(&time_with_comma);
|
||||||
keyboard::click_tab();
|
keyboard::click_tab();
|
||||||
keyboard::click_tab();
|
keyboard::click_tab();
|
||||||
|
}
|
||||||
let reply = MqttMessage {
|
|
||||||
id: msg.id,
|
fn main() {
|
||||||
kind: MqttMessageKind::Confirm,
|
let password = "test";
|
||||||
time: 0,
|
let broker_domain = "broker.emqx.io";
|
||||||
};
|
|
||||||
|
let mut c = comm::Comm::new(&broker_domain, &password, handle_time).unwrap();
|
||||||
let reply = serde_json::to_string(&reply).unwrap();
|
|
||||||
let reply = c.encrypt(&reply);
|
//Gui::run(Settings::default());
|
||||||
//println!("Sending reply: {reply}");
|
|
||||||
let reply = mqtt::Message::new(&topic, reply, 1);
|
println!("Processing requests...");
|
||||||
cli.publish(reply).expect("publish");
|
c.listen();
|
||||||
} else if !cli.is_connected() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue