diff --git a/src/client.rs b/src/client.rs index 63db39e..1898090 100644 --- a/src/client.rs +++ b/src/client.rs @@ -205,13 +205,7 @@ impl KiCadClient { .clone(); let request_bytes = envelope::encode_request(&token, &self.inner.client_name, command)?; - - let transport = self.inner.clone(); - let response_bytes = tokio::task::spawn_blocking(move || { - transport.transport.roundtrip(request_bytes.as_slice()) - }) - .await - .map_err(|err| KiCadError::RuntimeJoin(err.to_string()))??; + let response_bytes = self.inner.transport.roundtrip(request_bytes).await?; let response = envelope::decode_response(&response_bytes)?; diff --git a/src/error.rs b/src/error.rs index 52b06bb..8c5d48f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -19,6 +19,9 @@ pub enum KiCadError { #[error("transport receive failed: {reason}")] TransportReceive { reason: String }, + #[error("transport task is unavailable")] + TransportClosed, + #[error("request timed out after {timeout:?}")] Timeout { timeout: Duration }, diff --git a/src/transport.rs b/src/transport.rs index 3ad8e2c..ba3828b 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -1,65 +1,104 @@ -use std::sync::Mutex; +use std::thread; use std::time::Duration; use nng::options::{Options, RecvTimeout, SendTimeout}; use nng::{Error as NngError, Protocol, Socket}; +use tokio::sync::{mpsc, oneshot}; use crate::error::KiCadError; +const TRANSPORT_QUEUE_CAPACITY: usize = 64; + #[derive(Debug)] pub(crate) struct Transport { - socket: Mutex, - timeout: Duration, + request_tx: mpsc::Sender, +} + +#[derive(Debug)] +struct TransportRequest { + request_bytes: Vec, + response_tx: oneshot::Sender, KiCadError>>, } impl Transport { pub(crate) fn connect(socket_uri: &str, timeout: Duration) -> Result { - let socket = Socket::new(Protocol::Req0).map_err(|err| KiCadError::Connection { - socket_uri: socket_uri.to_string(), - reason: err.to_string(), - })?; + let socket = configured_socket(socket_uri, timeout)?; + let (request_tx, mut request_rx) = mpsc::channel::(TRANSPORT_QUEUE_CAPACITY); - socket - .set_opt::(Some(timeout)) + let worker_name = format!("kicad-ipc-transport-{}", std::process::id()); + thread::Builder::new() + .name(worker_name) + .spawn(move || { + while let Some(request) = request_rx.blocking_recv() { + let response = socket_roundtrip(&socket, request.request_bytes.as_slice(), timeout); + let _ = request.response_tx.send(response); + } + }) .map_err(|err| KiCadError::Connection { socket_uri: socket_uri.to_string(), reason: err.to_string(), })?; - socket - .set_opt::(Some(timeout)) - .map_err(|err| KiCadError::Connection { - socket_uri: socket_uri.to_string(), - reason: err.to_string(), - })?; + Ok(Self { request_tx }) + } - socket.dial(socket_uri).map_err(|err| KiCadError::Connection { + pub(crate) async fn roundtrip(&self, request_bytes: Vec) -> Result, KiCadError> { + let (response_tx, response_rx) = oneshot::channel(); + + self.request_tx + .send(TransportRequest { + request_bytes, + response_tx, + }) + .await + .map_err(|_| KiCadError::TransportClosed)?; + + response_rx.await.map_err(|_| KiCadError::TransportClosed)? + } +} + +fn configured_socket(socket_uri: &str, timeout: Duration) -> Result { + let socket = Socket::new(Protocol::Req0).map_err(|err| KiCadError::Connection { + socket_uri: socket_uri.to_string(), + reason: err.to_string(), + })?; + + socket + .set_opt::(Some(timeout)) + .map_err(|err| KiCadError::Connection { socket_uri: socket_uri.to_string(), reason: err.to_string(), })?; - Ok(Self { - socket: Mutex::new(socket), - timeout, - }) - } + socket + .set_opt::(Some(timeout)) + .map_err(|err| KiCadError::Connection { + socket_uri: socket_uri.to_string(), + reason: err.to_string(), + })?; - pub(crate) fn roundtrip(&self, request_bytes: &[u8]) -> Result, KiCadError> { - let guard = self - .socket - .lock() - .map_err(|_| KiCadError::InternalPoisoned)?; + socket.dial(socket_uri).map_err(|err| KiCadError::Connection { + socket_uri: socket_uri.to_string(), + reason: err.to_string(), + })?; - guard - .send(request_bytes) - .map_err(|(_, err)| map_send_error(err, self.timeout))?; + Ok(socket) +} - let response = guard - .recv() - .map_err(|err| map_receive_error(err, self.timeout))?; +fn socket_roundtrip( + socket: &Socket, + request_bytes: &[u8], + timeout: Duration, +) -> Result, KiCadError> { + socket + .send(request_bytes) + .map_err(|(_, err)| map_send_error(err, timeout))?; - Ok(response.as_slice().to_vec()) - } + let response = socket + .recv() + .map_err(|err| map_receive_error(err, timeout))?; + + Ok(response.as_slice().to_vec()) } fn map_send_error(error: NngError, timeout: Duration) -> KiCadError {