add esp_timestamp to DB schema with v2 migration and dedup

This commit is contained in:
jess 2026-04-03 07:05:29 -07:00
parent 1ba6772738
commit dcde79cf08
2 changed files with 49 additions and 15 deletions

View File

@ -255,6 +255,9 @@ pub struct App {
ph_result: Option<PhResult>,
ph_stabilize: String,
/* measurement dedup */
current_esp_ts: Option<u32>,
/* Reference baselines */
eis_ref: Option<Vec<EisPoint>>,
lsv_ref: Option<Vec<LsvPoint>>,
@ -501,6 +504,8 @@ impl App {
ph_result: None,
ph_stabilize: "30".into(),
current_esp_ts: None,
eis_ref: None,
lsv_ref: None,
amp_ref: None,
@ -556,7 +561,7 @@ impl App {
"rcal": format!("{}", self.rcal),
"electrode": format!("{}", self.electrode),
});
if let Ok(mid) = self.storage.create_measurement(session_id, "eis", &params.to_string()) {
if let Ok(mid) = self.storage.create_measurement(session_id, "eis", &params.to_string(), self.current_esp_ts) {
let pts: Vec<(i32, String)> = self.eis_points.iter().enumerate()
.filter_map(|(i, p)| serde_json::to_string(p).ok().map(|j| (i as i32, j)))
.collect();
@ -571,7 +576,7 @@ impl App {
"scan_rate": self.lsv_scan_rate,
"rtia": format!("{}", self.lsv_rtia),
});
if let Ok(mid) = self.storage.create_measurement(session_id, "lsv", &params.to_string()) {
if let Ok(mid) = self.storage.create_measurement(session_id, "lsv", &params.to_string(), self.current_esp_ts) {
let pts: Vec<(i32, String)> = self.lsv_points.iter().enumerate()
.filter_map(|(i, p)| serde_json::to_string(p).ok().map(|j| (i as i32, j)))
.collect();
@ -586,7 +591,7 @@ impl App {
"duration_s": self.amp_duration,
"rtia": format!("{}", self.amp_rtia),
});
if let Ok(mid) = self.storage.create_measurement(session_id, "amp", &params.to_string()) {
if let Ok(mid) = self.storage.create_measurement(session_id, "amp", &params.to_string(), self.current_esp_ts) {
let pts: Vec<(i32, String)> = self.amp_points.iter().enumerate()
.filter_map(|(i, p)| serde_json::to_string(p).ok().map(|j| (i as i32, j)))
.collect();
@ -604,7 +609,7 @@ impl App {
"meas_t": self.cl_meas_t,
"rtia": format!("{}", self.cl_rtia),
});
if let Ok(mid) = self.storage.create_measurement(session_id, "chlorine", &params.to_string()) {
if let Ok(mid) = self.storage.create_measurement(session_id, "chlorine", &params.to_string(), self.current_esp_ts) {
let mut pts: Vec<(i32, String)> = self.cl_points.iter().enumerate()
.filter_map(|(i, p)| serde_json::to_string(p).ok().map(|j| (i as i32, j)))
.collect();
@ -621,7 +626,7 @@ impl App {
let params = serde_json::json!({
"stabilize_s": self.ph_stabilize,
});
if let Ok(mid) = self.storage.create_measurement(session_id, "ph", &params.to_string()) {
if let Ok(mid) = self.storage.create_measurement(session_id, "ph", &params.to_string(), self.current_esp_ts) {
if let Ok(j) = serde_json::to_string(result) {
let _ = self.storage.add_data_point(mid, 0, &j);
}
@ -646,9 +651,9 @@ impl App {
self.status = s;
}
Message::DeviceData(msg) => match msg {
EisMessage::SweepStart { num_points, freq_start, freq_stop, .. } => {
EisMessage::SweepStart { num_points, freq_start, freq_stop, esp_timestamp, .. } => {
self.current_esp_ts = esp_timestamp;
if self.collecting_refs {
/* ref collection: clear temp buffer */
self.eis_points.clear();
self.sweep_total = num_points;
} else {
@ -691,7 +696,8 @@ impl App {
self.electrode = cfg.electrode;
self.status = "Config received".into();
}
EisMessage::LsvStart { num_points, v_start, v_stop, .. } => {
EisMessage::LsvStart { num_points, v_start, v_stop, esp_timestamp, .. } => {
self.current_esp_ts = esp_timestamp;
self.lsv_points.clear();
self.lsv_total = num_points;
self.lsv_data = text_editor::Content::with_text(&fmt_lsv(&self.lsv_points));
@ -739,7 +745,8 @@ impl App {
);
}
}
EisMessage::AmpStart { v_hold, .. } => {
EisMessage::AmpStart { v_hold, esp_timestamp, .. } => {
self.current_esp_ts = esp_timestamp;
self.amp_points.clear();
self.amp_running = true;
self.amp_data = text_editor::Content::with_text(&fmt_amp(&self.amp_points));
@ -758,7 +765,8 @@ impl App {
}
self.status = format!("Amp complete: {} points", self.amp_points.len());
}
EisMessage::ClStart { num_points, .. } => {
EisMessage::ClStart { num_points, esp_timestamp, .. } => {
self.current_esp_ts = esp_timestamp;
self.cl_points.clear();
self.cl_result = None;
self.cl_total = num_points;
@ -798,11 +806,12 @@ impl App {
self.status = format!("Chlorine complete: {} points", self.cl_points.len());
}
}
EisMessage::PhResult(r, _, _) => {
EisMessage::PhResult(r, esp_ts, _) => {
if self.collecting_refs {
self.ph_ref = Some(r);
} else {
if let Some(sid) = self.current_session {
self.current_esp_ts = esp_ts;
self.save_ph(sid, &r);
}
self.status = format!("pH: {:.2} (OCP={:.1} mV, T={:.1}C)",

View File

@ -18,6 +18,7 @@ pub struct Measurement {
pub mtype: String,
pub params_json: String,
pub created_at: String,
pub esp_timestamp: Option<i64>,
}
#[derive(Debug, Clone)]
@ -41,9 +42,21 @@ impl Storage {
let conn = Connection::open(path)?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON;")?;
conn.execute_batch(SCHEMA)?;
Self::migrate_v2(&conn)?;
Ok(Self { conn })
}
fn migrate_v2(conn: &Connection) -> Result<(), rusqlite::Error> {
let has_col: bool = conn.prepare("SELECT esp_timestamp FROM measurements LIMIT 0")
.is_ok();
if !has_col {
conn.execute_batch(
"ALTER TABLE measurements ADD COLUMN esp_timestamp INTEGER;"
)?;
}
Ok(())
}
pub fn create_session(&self, name: &str, notes: &str) -> Result<i64, rusqlite::Error> {
self.conn.execute(
"INSERT INTO sessions (name, notes) VALUES (?1, ?2)",
@ -74,10 +87,21 @@ impl Storage {
pub fn create_measurement(
&self, session_id: i64, mtype: &str, params_json: &str,
esp_timestamp: Option<u32>,
) -> Result<i64, rusqlite::Error> {
if let Some(ts) = esp_timestamp {
let exists: bool = self.conn.query_row(
"SELECT EXISTS(SELECT 1 FROM measurements WHERE session_id = ?1 AND esp_timestamp = ?2)",
params![session_id, ts as i64],
|row| row.get(0),
)?;
if exists {
return Err(rusqlite::Error::StatementChangedRows(0));
}
}
self.conn.execute(
"INSERT INTO measurements (session_id, type, params_json) VALUES (?1, ?2, ?3)",
params![session_id, mtype, params_json],
"INSERT INTO measurements (session_id, type, params_json, esp_timestamp) VALUES (?1, ?2, ?3, ?4)",
params![session_id, mtype, params_json, esp_timestamp.map(|t| t as i64)],
)?;
Ok(self.conn.last_insert_rowid())
}
@ -109,7 +133,7 @@ impl Storage {
pub fn get_measurements(&self, session_id: i64) -> Result<Vec<Measurement>, rusqlite::Error> {
let mut stmt = self.conn.prepare(
"SELECT id, session_id, type, params_json, created_at \
"SELECT id, session_id, type, params_json, created_at, esp_timestamp \
FROM measurements WHERE session_id = ?1 ORDER BY created_at DESC",
)?;
let rows = stmt.query_map(params![session_id], |row| {
@ -119,6 +143,7 @@ impl Storage {
mtype: row.get(2)?,
params_json: row.get(3)?,
created_at: row.get(4)?,
esp_timestamp: row.get(5)?,
})
})?;
rows.collect()
@ -244,7 +269,7 @@ impl Storage {
Some(t) => serde_json::to_string(&toml_table_to_json(t))?,
None => "{}".to_string(),
};
let mid = self.create_measurement(session_id, mtype, &params_json)?;
let mid = self.create_measurement(session_id, mtype, &params_json, None)?;
if let Some(toml::Value::Array(data)) = mt.get("data") {
let pts: Vec<(i32, String)> = data.iter().enumerate()