refactor: move IPC roundtrips to single-owner transport task
This commit is contained in:
parent
5f37c40aec
commit
46f5d8b731
|
|
@ -205,13 +205,7 @@ impl KiCadClient {
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
let request_bytes = envelope::encode_request(&token, &self.inner.client_name, command)?;
|
let request_bytes = envelope::encode_request(&token, &self.inner.client_name, command)?;
|
||||||
|
let response_bytes = self.inner.transport.roundtrip(request_bytes).await?;
|
||||||
let transport = self.inner.clone();
|
|
||||||
let response_bytes = tokio::task::spawn_blocking(move || {
|
|
||||||
transport.transport.roundtrip(request_bytes.as_slice())
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(|err| KiCadError::RuntimeJoin(err.to_string()))??;
|
|
||||||
|
|
||||||
let response = envelope::decode_response(&response_bytes)?;
|
let response = envelope::decode_response(&response_bytes)?;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,9 @@ pub enum KiCadError {
|
||||||
#[error("transport receive failed: {reason}")]
|
#[error("transport receive failed: {reason}")]
|
||||||
TransportReceive { reason: String },
|
TransportReceive { reason: String },
|
||||||
|
|
||||||
|
#[error("transport task is unavailable")]
|
||||||
|
TransportClosed,
|
||||||
|
|
||||||
#[error("request timed out after {timeout:?}")]
|
#[error("request timed out after {timeout:?}")]
|
||||||
Timeout { timeout: Duration },
|
Timeout { timeout: Duration },
|
||||||
|
|
||||||
|
|
|
||||||
107
src/transport.rs
107
src/transport.rs
|
|
@ -1,65 +1,104 @@
|
||||||
use std::sync::Mutex;
|
use std::thread;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use nng::options::{Options, RecvTimeout, SendTimeout};
|
use nng::options::{Options, RecvTimeout, SendTimeout};
|
||||||
use nng::{Error as NngError, Protocol, Socket};
|
use nng::{Error as NngError, Protocol, Socket};
|
||||||
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::error::KiCadError;
|
use crate::error::KiCadError;
|
||||||
|
|
||||||
|
const TRANSPORT_QUEUE_CAPACITY: usize = 64;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(crate) struct Transport {
|
pub(crate) struct Transport {
|
||||||
socket: Mutex<Socket>,
|
request_tx: mpsc::Sender<TransportRequest>,
|
||||||
timeout: Duration,
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct TransportRequest {
|
||||||
|
request_bytes: Vec<u8>,
|
||||||
|
response_tx: oneshot::Sender<Result<Vec<u8>, KiCadError>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Transport {
|
impl Transport {
|
||||||
pub(crate) fn connect(socket_uri: &str, timeout: Duration) -> Result<Self, KiCadError> {
|
pub(crate) fn connect(socket_uri: &str, timeout: Duration) -> Result<Self, KiCadError> {
|
||||||
let socket = Socket::new(Protocol::Req0).map_err(|err| KiCadError::Connection {
|
let socket = configured_socket(socket_uri, timeout)?;
|
||||||
socket_uri: socket_uri.to_string(),
|
let (request_tx, mut request_rx) = mpsc::channel::<TransportRequest>(TRANSPORT_QUEUE_CAPACITY);
|
||||||
reason: err.to_string(),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
socket
|
let worker_name = format!("kicad-ipc-transport-{}", std::process::id());
|
||||||
.set_opt::<SendTimeout>(Some(timeout))
|
thread::Builder::new()
|
||||||
|
.name(worker_name)
|
||||||
|
.spawn(move || {
|
||||||
|
while let Some(request) = request_rx.blocking_recv() {
|
||||||
|
let response = socket_roundtrip(&socket, request.request_bytes.as_slice(), timeout);
|
||||||
|
let _ = request.response_tx.send(response);
|
||||||
|
}
|
||||||
|
})
|
||||||
.map_err(|err| KiCadError::Connection {
|
.map_err(|err| KiCadError::Connection {
|
||||||
socket_uri: socket_uri.to_string(),
|
socket_uri: socket_uri.to_string(),
|
||||||
reason: err.to_string(),
|
reason: err.to_string(),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
socket
|
Ok(Self { request_tx })
|
||||||
.set_opt::<RecvTimeout>(Some(timeout))
|
}
|
||||||
.map_err(|err| KiCadError::Connection {
|
|
||||||
socket_uri: socket_uri.to_string(),
|
|
||||||
reason: err.to_string(),
|
|
||||||
})?;
|
|
||||||
|
|
||||||
socket.dial(socket_uri).map_err(|err| KiCadError::Connection {
|
pub(crate) async fn roundtrip(&self, request_bytes: Vec<u8>) -> Result<Vec<u8>, KiCadError> {
|
||||||
|
let (response_tx, response_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
self.request_tx
|
||||||
|
.send(TransportRequest {
|
||||||
|
request_bytes,
|
||||||
|
response_tx,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|_| KiCadError::TransportClosed)?;
|
||||||
|
|
||||||
|
response_rx.await.map_err(|_| KiCadError::TransportClosed)?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn configured_socket(socket_uri: &str, timeout: Duration) -> Result<Socket, KiCadError> {
|
||||||
|
let socket = Socket::new(Protocol::Req0).map_err(|err| KiCadError::Connection {
|
||||||
|
socket_uri: socket_uri.to_string(),
|
||||||
|
reason: err.to_string(),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
socket
|
||||||
|
.set_opt::<SendTimeout>(Some(timeout))
|
||||||
|
.map_err(|err| KiCadError::Connection {
|
||||||
socket_uri: socket_uri.to_string(),
|
socket_uri: socket_uri.to_string(),
|
||||||
reason: err.to_string(),
|
reason: err.to_string(),
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
Ok(Self {
|
socket
|
||||||
socket: Mutex::new(socket),
|
.set_opt::<RecvTimeout>(Some(timeout))
|
||||||
timeout,
|
.map_err(|err| KiCadError::Connection {
|
||||||
})
|
socket_uri: socket_uri.to_string(),
|
||||||
}
|
reason: err.to_string(),
|
||||||
|
})?;
|
||||||
|
|
||||||
pub(crate) fn roundtrip(&self, request_bytes: &[u8]) -> Result<Vec<u8>, KiCadError> {
|
socket.dial(socket_uri).map_err(|err| KiCadError::Connection {
|
||||||
let guard = self
|
socket_uri: socket_uri.to_string(),
|
||||||
.socket
|
reason: err.to_string(),
|
||||||
.lock()
|
})?;
|
||||||
.map_err(|_| KiCadError::InternalPoisoned)?;
|
|
||||||
|
|
||||||
guard
|
Ok(socket)
|
||||||
.send(request_bytes)
|
}
|
||||||
.map_err(|(_, err)| map_send_error(err, self.timeout))?;
|
|
||||||
|
|
||||||
let response = guard
|
fn socket_roundtrip(
|
||||||
.recv()
|
socket: &Socket,
|
||||||
.map_err(|err| map_receive_error(err, self.timeout))?;
|
request_bytes: &[u8],
|
||||||
|
timeout: Duration,
|
||||||
|
) -> Result<Vec<u8>, KiCadError> {
|
||||||
|
socket
|
||||||
|
.send(request_bytes)
|
||||||
|
.map_err(|(_, err)| map_send_error(err, timeout))?;
|
||||||
|
|
||||||
Ok(response.as_slice().to_vec())
|
let response = socket
|
||||||
}
|
.recv()
|
||||||
|
.map_err(|err| map_receive_error(err, timeout))?;
|
||||||
|
|
||||||
|
Ok(response.as_slice().to_vec())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn map_send_error(error: NngError, timeout: Duration) -> KiCadError {
|
fn map_send_error(error: NngError, timeout: Duration) -> KiCadError {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue