diff --git a/cue/src/app.rs b/cue/src/app.rs index e71b6c3..aabedec 100644 --- a/cue/src/app.rs +++ b/cue/src/app.rs @@ -255,6 +255,9 @@ pub struct App { ph_result: Option, ph_stabilize: String, + /* measurement dedup */ + current_esp_ts: Option, + /* Reference baselines */ eis_ref: Option>, lsv_ref: Option>, @@ -501,6 +504,8 @@ impl App { ph_result: None, ph_stabilize: "30".into(), + current_esp_ts: None, + eis_ref: None, lsv_ref: None, amp_ref: None, @@ -556,7 +561,7 @@ impl App { "rcal": format!("{}", self.rcal), "electrode": format!("{}", self.electrode), }); - if let Ok(mid) = self.storage.create_measurement(session_id, "eis", ¶ms.to_string()) { + if let Ok(mid) = self.storage.create_measurement(session_id, "eis", ¶ms.to_string(), self.current_esp_ts) { let pts: Vec<(i32, String)> = self.eis_points.iter().enumerate() .filter_map(|(i, p)| serde_json::to_string(p).ok().map(|j| (i as i32, j))) .collect(); @@ -571,7 +576,7 @@ impl App { "scan_rate": self.lsv_scan_rate, "rtia": format!("{}", self.lsv_rtia), }); - if let Ok(mid) = self.storage.create_measurement(session_id, "lsv", ¶ms.to_string()) { + if let Ok(mid) = self.storage.create_measurement(session_id, "lsv", ¶ms.to_string(), self.current_esp_ts) { let pts: Vec<(i32, String)> = self.lsv_points.iter().enumerate() .filter_map(|(i, p)| serde_json::to_string(p).ok().map(|j| (i as i32, j))) .collect(); @@ -586,7 +591,7 @@ impl App { "duration_s": self.amp_duration, "rtia": format!("{}", self.amp_rtia), }); - if let Ok(mid) = self.storage.create_measurement(session_id, "amp", ¶ms.to_string()) { + if let Ok(mid) = self.storage.create_measurement(session_id, "amp", ¶ms.to_string(), self.current_esp_ts) { let pts: Vec<(i32, String)> = self.amp_points.iter().enumerate() .filter_map(|(i, p)| serde_json::to_string(p).ok().map(|j| (i as i32, j))) .collect(); @@ -604,7 +609,7 @@ impl App { "meas_t": self.cl_meas_t, "rtia": format!("{}", self.cl_rtia), }); - if let Ok(mid) = self.storage.create_measurement(session_id, "chlorine", ¶ms.to_string()) { + if let Ok(mid) = self.storage.create_measurement(session_id, "chlorine", ¶ms.to_string(), self.current_esp_ts) { let mut pts: Vec<(i32, String)> = self.cl_points.iter().enumerate() .filter_map(|(i, p)| serde_json::to_string(p).ok().map(|j| (i as i32, j))) .collect(); @@ -621,7 +626,7 @@ impl App { let params = serde_json::json!({ "stabilize_s": self.ph_stabilize, }); - if let Ok(mid) = self.storage.create_measurement(session_id, "ph", ¶ms.to_string()) { + if let Ok(mid) = self.storage.create_measurement(session_id, "ph", ¶ms.to_string(), self.current_esp_ts) { if let Ok(j) = serde_json::to_string(result) { let _ = self.storage.add_data_point(mid, 0, &j); } @@ -646,9 +651,9 @@ impl App { self.status = s; } Message::DeviceData(msg) => match msg { - EisMessage::SweepStart { num_points, freq_start, freq_stop, .. } => { + EisMessage::SweepStart { num_points, freq_start, freq_stop, esp_timestamp, .. } => { + self.current_esp_ts = esp_timestamp; if self.collecting_refs { - /* ref collection: clear temp buffer */ self.eis_points.clear(); self.sweep_total = num_points; } else { @@ -691,7 +696,8 @@ impl App { self.electrode = cfg.electrode; self.status = "Config received".into(); } - EisMessage::LsvStart { num_points, v_start, v_stop, .. } => { + EisMessage::LsvStart { num_points, v_start, v_stop, esp_timestamp, .. } => { + self.current_esp_ts = esp_timestamp; self.lsv_points.clear(); self.lsv_total = num_points; self.lsv_data = text_editor::Content::with_text(&fmt_lsv(&self.lsv_points)); @@ -739,7 +745,8 @@ impl App { ); } } - EisMessage::AmpStart { v_hold, .. } => { + EisMessage::AmpStart { v_hold, esp_timestamp, .. } => { + self.current_esp_ts = esp_timestamp; self.amp_points.clear(); self.amp_running = true; self.amp_data = text_editor::Content::with_text(&fmt_amp(&self.amp_points)); @@ -758,7 +765,8 @@ impl App { } self.status = format!("Amp complete: {} points", self.amp_points.len()); } - EisMessage::ClStart { num_points, .. } => { + EisMessage::ClStart { num_points, esp_timestamp, .. } => { + self.current_esp_ts = esp_timestamp; self.cl_points.clear(); self.cl_result = None; self.cl_total = num_points; @@ -798,11 +806,12 @@ impl App { self.status = format!("Chlorine complete: {} points", self.cl_points.len()); } } - EisMessage::PhResult(r, _, _) => { + EisMessage::PhResult(r, esp_ts, _) => { if self.collecting_refs { self.ph_ref = Some(r); } else { if let Some(sid) = self.current_session { + self.current_esp_ts = esp_ts; self.save_ph(sid, &r); } self.status = format!("pH: {:.2} (OCP={:.1} mV, T={:.1}C)", diff --git a/cue/src/storage.rs b/cue/src/storage.rs index 4ae206e..65b28d2 100644 --- a/cue/src/storage.rs +++ b/cue/src/storage.rs @@ -18,6 +18,7 @@ pub struct Measurement { pub mtype: String, pub params_json: String, pub created_at: String, + pub esp_timestamp: Option, } #[derive(Debug, Clone)] @@ -41,9 +42,21 @@ impl Storage { let conn = Connection::open(path)?; conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?; conn.execute_batch(SCHEMA)?; + Self::migrate_v2(&conn)?; Ok(Self { conn }) } + fn migrate_v2(conn: &Connection) -> Result<(), rusqlite::Error> { + let has_col: bool = conn.prepare("SELECT esp_timestamp FROM measurements LIMIT 0") + .is_ok(); + if !has_col { + conn.execute_batch( + "ALTER TABLE measurements ADD COLUMN esp_timestamp INTEGER;" + )?; + } + Ok(()) + } + pub fn create_session(&self, name: &str, notes: &str) -> Result { self.conn.execute( "INSERT INTO sessions (name, notes) VALUES (?1, ?2)", @@ -74,10 +87,21 @@ impl Storage { pub fn create_measurement( &self, session_id: i64, mtype: &str, params_json: &str, + esp_timestamp: Option, ) -> Result { + if let Some(ts) = esp_timestamp { + let exists: bool = self.conn.query_row( + "SELECT EXISTS(SELECT 1 FROM measurements WHERE session_id = ?1 AND esp_timestamp = ?2)", + params![session_id, ts as i64], + |row| row.get(0), + )?; + if exists { + return Err(rusqlite::Error::StatementChangedRows(0)); + } + } self.conn.execute( - "INSERT INTO measurements (session_id, type, params_json) VALUES (?1, ?2, ?3)", - params![session_id, mtype, params_json], + "INSERT INTO measurements (session_id, type, params_json, esp_timestamp) VALUES (?1, ?2, ?3, ?4)", + params![session_id, mtype, params_json, esp_timestamp.map(|t| t as i64)], )?; Ok(self.conn.last_insert_rowid()) } @@ -109,7 +133,7 @@ impl Storage { pub fn get_measurements(&self, session_id: i64) -> Result, rusqlite::Error> { let mut stmt = self.conn.prepare( - "SELECT id, session_id, type, params_json, created_at \ + "SELECT id, session_id, type, params_json, created_at, esp_timestamp \ FROM measurements WHERE session_id = ?1 ORDER BY created_at DESC", )?; let rows = stmt.query_map(params![session_id], |row| { @@ -119,6 +143,7 @@ impl Storage { mtype: row.get(2)?, params_json: row.get(3)?, created_at: row.get(4)?, + esp_timestamp: row.get(5)?, }) })?; rows.collect() @@ -244,7 +269,7 @@ impl Storage { Some(t) => serde_json::to_string(&toml_table_to_json(t))?, None => "{}".to_string(), }; - let mid = self.create_measurement(session_id, mtype, ¶ms_json)?; + let mid = self.create_measurement(session_id, mtype, ¶ms_json, None)?; if let Some(toml::Value::Array(data)) = mt.get("data") { let pts: Vec<(i32, String)> = data.iter().enumerate()