From f36989e3f9c91b39245de9b9a54fd52e20c30afc Mon Sep 17 00:00:00 2001 From: jess Date: Tue, 31 Mar 2026 19:56:39 -0700 Subject: [PATCH] =?UTF-8?q?cue:=20add=20UDP=20transport=20=E2=80=94=20conn?= =?UTF-8?q?ect=20to=20ESP32=20WiFi=20AP=20as=20alternative=20to=20BLE=20MI?= =?UTF-8?q?DI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cue/src/app.rs | 154 +++++++++++++++++++++++++++++++++++------------- cue/src/main.rs | 1 + cue/src/udp.rs | 116 ++++++++++++++++++++++++++++++++++++ 3 files changed, 230 insertions(+), 41 deletions(-) create mode 100644 cue/src/udp.rs diff --git a/cue/src/app.rs b/cue/src/app.rs index 424837d..e9735a9 100644 --- a/cue/src/app.rs +++ b/cue/src/app.rs @@ -16,6 +16,13 @@ use crate::protocol::{ PhResult, Rcal, Rtia, }; use crate::storage::{self, Session, Storage}; +use crate::udp::UdpEvent; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TransportMode { + Midi, + Udp, +} #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum Tab { @@ -119,6 +126,7 @@ pub enum Message { /* Misc */ OpenMidiSetup, RefreshMidi, + ToggleTransport, } pub struct App { @@ -215,6 +223,7 @@ pub struct App { /* Global */ temp_c: f32, midi_gen: u64, + transport: TransportMode, } /* ---- data table formatting ---- */ @@ -408,6 +417,7 @@ impl App { temp_c: 25.0, midi_gen: 0, + transport: TransportMode::Midi, }, Task::none()) } @@ -1003,41 +1013,86 @@ impl App { self.ble_connected = false; self.status = "Looking for MIDI device...".into(); } + Message::ToggleTransport => { + self.transport = match self.transport { + TransportMode::Midi => TransportMode::Udp, + TransportMode::Udp => TransportMode::Midi, + }; + self.midi_gen += 1; + self.cmd_tx = None; + self.ble_connected = false; + self.status = match self.transport { + TransportMode::Midi => "Looking for MIDI device...".into(), + TransportMode::Udp => "Connecting UDP...".into(), + }; + } } Task::none() } pub fn subscription(&self) -> Subscription { - let ble = Subscription::run_with_id( + let use_udp = self.transport == TransportMode::Udp; + let transport = Subscription::run_with_id( self.midi_gen, - iced::stream::channel(100, |mut output| async move { - loop { - let (ble_tx, mut ble_rx) = mpsc::unbounded_channel::(); - let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::>(); + iced::stream::channel(100, move |mut output| async move { + if use_udp { + loop { + let (udp_tx, mut udp_rx) = mpsc::unbounded_channel::(); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::>(); - let tx = ble_tx.clone(); - tokio::spawn(async move { - if let Err(e) = crate::ble::connect_and_run(tx, cmd_rx).await { - eprintln!("BLE: {e}"); - } - }); - - let mut ready_sent = false; - while let Some(ev) = ble_rx.recv().await { - let msg = match ev { - BleEvent::Status(ref s) if s == "Connected" && !ready_sent => { - ready_sent = true; - let _ = output.send(Message::BleReady(cmd_tx.clone())).await; - Message::BleStatus(s.clone()) + let tx = udp_tx.clone(); + tokio::spawn(async move { + if let Err(e) = crate::udp::connect_and_run(tx, cmd_rx).await { + eprintln!("UDP: {e}"); } - BleEvent::Status(s) => Message::BleStatus(s), - BleEvent::Data(m) => Message::BleData(m), - }; - let _ = output.send(msg).await; - } + }); - let _ = output.send(Message::BleStatus("Reconnecting...".into())).await; - tokio::time::sleep(Duration::from_millis(500)).await; + let mut ready_sent = false; + while let Some(ev) = udp_rx.recv().await { + let msg = match ev { + UdpEvent::Status(ref s) if s == "Connected" && !ready_sent => { + ready_sent = true; + let _ = output.send(Message::BleReady(cmd_tx.clone())).await; + Message::BleStatus(s.clone()) + } + UdpEvent::Status(s) => Message::BleStatus(s), + UdpEvent::Data(m) => Message::BleData(m), + }; + let _ = output.send(msg).await; + } + + let _ = output.send(Message::BleStatus("Reconnecting UDP...".into())).await; + tokio::time::sleep(Duration::from_millis(500)).await; + } + } else { + loop { + let (ble_tx, mut ble_rx) = mpsc::unbounded_channel::(); + let (cmd_tx, cmd_rx) = mpsc::unbounded_channel::>(); + + let tx = ble_tx.clone(); + tokio::spawn(async move { + if let Err(e) = crate::ble::connect_and_run(tx, cmd_rx).await { + eprintln!("BLE: {e}"); + } + }); + + let mut ready_sent = false; + while let Some(ev) = ble_rx.recv().await { + let msg = match ev { + BleEvent::Status(ref s) if s == "Connected" && !ready_sent => { + ready_sent = true; + let _ = output.send(Message::BleReady(cmd_tx.clone())).await; + Message::BleStatus(s.clone()) + } + BleEvent::Status(s) => Message::BleStatus(s), + BleEvent::Data(m) => Message::BleData(m), + }; + let _ = output.send(msg).await; + } + + let _ = output.send(Message::BleStatus("Reconnecting...".into())).await; + tokio::time::sleep(Duration::from_millis(500)).await; + } } }), ); @@ -1047,7 +1102,7 @@ impl App { let menu_tick = iced::time::every(Duration::from_millis(50)) .map(|_| Message::NativeMenuTick); - Subscription::batch([ble, temp_poll, menu_tick]) + Subscription::batch([transport, temp_poll, menu_tick]) } pub fn view(&self) -> Element<'_, Message> { @@ -1058,28 +1113,35 @@ impl App { if active { b.into() } else { b.on_press(Message::TabSelected(t)).into() } }; - let tabs = row![ + let mut tabs = row![ tab_btn("EIS", Tab::Eis, self.tab == Tab::Eis), tab_btn("LSV", Tab::Lsv, self.tab == Tab::Lsv), tab_btn("Amperometry", Tab::Amp, self.tab == Tab::Amp), tab_btn("Chlorine", Tab::Chlorine, self.tab == Tab::Chlorine), tab_btn("pH", Tab::Ph, self.tab == Tab::Ph), tab_btn("Browse", Tab::Browse, self.tab == Tab::Browse), - button(text("MIDI Setup").size(13)) - .style(style_neutral()) - .padding([6, 14]) - .on_press(Message::OpenMidiSetup), - iced::widget::horizontal_space(), - text("Clean").size(12), - text_input("mV", &self.clean_v).on_input(Message::CleanVChanged).width(60), - text_input("s", &self.clean_dur).on_input(Message::CleanDurChanged).width(45), - button(text("Clean").size(13)) - .style(btn_style(Color::from_rgb(0.65, 0.55, 0.15), Color::WHITE)) - .padding([6, 14]) - .on_press(Message::StartClean), ] .spacing(4) .align_y(iced::Alignment::Center); + if self.transport == TransportMode::Midi { + tabs = tabs.push( + button(text("MIDI Setup").size(13)) + .style(style_neutral()) + .padding([6, 14]) + .on_press(Message::OpenMidiSetup), + ); + } + tabs = tabs + .push(iced::widget::horizontal_space()) + .push(text("Clean").size(12)) + .push(text_input("mV", &self.clean_v).on_input(Message::CleanVChanged).width(60)) + .push(text_input("s", &self.clean_dur).on_input(Message::CleanDurChanged).width(45)) + .push( + button(text("Clean").size(13)) + .style(btn_style(Color::from_rgb(0.65, 0.55, 0.15), Color::WHITE)) + .padding([6, 14]) + .on_press(Message::StartClean), + ); let has_ref = match self.tab { Tab::Eis => self.eis_ref.is_some(), @@ -1149,9 +1211,19 @@ impl App { } let connected = self.ble_connected; + let transport_label = match self.transport { + TransportMode::Midi => "MIDI", + TransportMode::Udp => "UDP", + }; let mut status_row = row![text(&self.status).size(16)].spacing(6) .align_y(iced::Alignment::Center); - if !connected { + status_row = status_row.push( + button(text(transport_label).size(11)) + .style(style_neutral()) + .padding([4, 10]) + .on_press(Message::ToggleTransport), + ); + if !connected && self.transport == TransportMode::Midi { status_row = status_row.push( button(text("Refresh MIDI").size(11)) .style(style_apply()) diff --git a/cue/src/main.rs b/cue/src/main.rs index f85c648..a062931 100644 --- a/cue/src/main.rs +++ b/cue/src/main.rs @@ -4,6 +4,7 @@ mod native_menu; mod plot; mod protocol; mod storage; +mod udp; fn main() -> iced::Result { iced::application(app::App::title, app::App::update, app::App::view) diff --git a/cue/src/udp.rs b/cue/src/udp.rs new file mode 100644 index 0000000..0ec9ee3 --- /dev/null +++ b/cue/src/udp.rs @@ -0,0 +1,116 @@ +use tokio::net::UdpSocket; +use tokio::sync::mpsc; +use std::time::{Duration, Instant}; + +use crate::protocol::{self, EisMessage}; + +const ESP_ADDR: &str = "192.168.4.1:5941"; +const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5); +const TIMEOUT: Duration = Duration::from_secs(10); + +#[derive(Debug, Clone)] +pub enum UdpEvent { + Status(String), + Data(EisMessage), +} + +fn extract_sysex_frames(buf: &[u8]) -> Vec> { + let mut frames = Vec::new(); + let mut i = 0; + while i < buf.len() { + if buf[i] == 0xF0 { + if let Some(end) = buf[i..].iter().position(|&b| b == 0xF7) { + frames.push(buf[i + 1..i + end].to_vec()); + i += end + 1; + continue; + } + } + i += 1; + } + frames +} + +pub async fn connect_and_run( + tx: mpsc::UnboundedSender, + mut cmd_rx: mpsc::UnboundedReceiver>, +) -> Result<(), Box> { + loop { + let _ = tx.send(UdpEvent::Status("Connecting UDP...".into())); + + let sock = match UdpSocket::bind("0.0.0.0:0").await { + Ok(s) => s, + Err(e) => { + let _ = tx.send(UdpEvent::Status(format!("Bind failed: {e}"))); + tokio::time::sleep(Duration::from_secs(2)).await; + continue; + } + }; + + if let Err(e) = sock.connect(ESP_ADDR).await { + let _ = tx.send(UdpEvent::Status(format!("Connect failed: {e}"))); + tokio::time::sleep(Duration::from_secs(2)).await; + continue; + } + + /* Initial keepalive to register with ESP */ + let keepalive = protocol::build_sysex_get_temp(); + let _ = sock.send(&keepalive).await; + + let mut last_rx = Instant::now(); + let mut last_keepalive = Instant::now(); + let mut connected = false; + let mut buf = [0u8; 4096]; + + loop { + let deadline = tokio::time::sleep(Duration::from_millis(50)); + tokio::pin!(deadline); + + tokio::select! { + result = sock.recv(&mut buf) => { + match result { + Ok(n) if n > 0 => { + last_rx = Instant::now(); + if !connected { + connected = true; + let _ = tx.send(UdpEvent::Status("Connected".into())); + } + for frame in extract_sysex_frames(&buf[..n]) { + if let Some(msg) = protocol::parse_sysex(&frame) { + let _ = tx.send(UdpEvent::Data(msg)); + } + } + } + Ok(_) => {} + Err(e) => { + let _ = tx.send(UdpEvent::Status(format!("Recv error: {e}"))); + break; + } + } + } + cmd = cmd_rx.recv() => { + match cmd { + Some(pkt) => { + if let Err(e) = sock.send(&pkt).await { + eprintln!("UDP send: {e}"); + } + } + None => return Ok(()), + } + } + _ = &mut deadline => {} + } + + if last_keepalive.elapsed() >= KEEPALIVE_INTERVAL { + let _ = sock.send(&keepalive).await; + last_keepalive = Instant::now(); + } + + if connected && last_rx.elapsed() >= TIMEOUT { + let _ = tx.send(UdpEvent::Status("Timeout — reconnecting...".into())); + break; + } + } + + tokio::time::sleep(Duration::from_millis(500)).await; + } +}