Use an actor to manage the Tokio engine connection (#669)

* Use an actor to manage the Tokio engine connection

This means EngineManager trait's methods take &self not &mut self, and the tokio implementation can be cloned.

* Clean up code
This commit is contained in:
Adam Chalmers
2023-09-20 16:59:03 -05:00
committed by GitHub
parent b67c16cc9d
commit 18dbbad244
4 changed files with 75 additions and 27 deletions

View File

@ -3,10 +3,11 @@
use std::sync::Arc;
use anyhow::Result;
use anyhow::{anyhow, Result};
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use kittycad::types::{OkWebSocketResponseData, WebSocketRequest, WebSocketResponse};
use tokio::sync::{mpsc, oneshot};
use tokio_tungstenite::tungstenite::Message as WsMsg;
use crate::{
@ -14,10 +15,11 @@ use crate::{
errors::{KclError, KclErrorDetails},
};
#[derive(Debug)]
type WebSocketTcpWrite = futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>, WsMsg>;
#[derive(Debug, Clone)]
pub struct EngineConnection {
tcp_write: futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>, WsMsg>,
tcp_read_handle: tokio::task::JoinHandle<Result<()>>,
engine_req_tx: mpsc::Sender<ToEngineReq>,
tcp_read_handle: Arc<tokio::task::JoinHandle<Result<()>>>,
responses: Arc<DashMap<uuid::Uuid, WebSocketResponse>>,
}
@ -46,7 +48,36 @@ impl TcpRead {
}
}
/// Requests to send to the engine, and a way to await a response.
struct ToEngineReq {
/// The request to send
req: WebSocketRequest,
/// If this resolves to Ok, the request was sent.
/// If this resolves to Err, the request could not be sent.
/// If this has not yet resolved, the request has not been sent yet.
request_sent: oneshot::Sender<Result<()>>,
}
impl EngineConnection {
/// Start waiting for incoming engine requests, and send each one over the WebSocket to the engine.
async fn start_write_actor(mut tcp_write: WebSocketTcpWrite, mut engine_req_rx: mpsc::Receiver<ToEngineReq>) {
while let Some(req) = engine_req_rx.recv().await {
let ToEngineReq { req, request_sent } = req;
let res = Self::inner_send_to_engine(req, &mut tcp_write).await;
let _ = request_sent.send(res);
}
}
/// Send the given `request` to the engine via the WebSocket connection `tcp_write`.
async fn inner_send_to_engine(request: WebSocketRequest, tcp_write: &mut WebSocketTcpWrite) -> Result<()> {
let msg = serde_json::to_string(&request).map_err(|e| anyhow!("could not serialize json: {e}"))?;
tcp_write
.send(WsMsg::Text(msg))
.await
.map_err(|e| anyhow!("could not send json over websocket: {e}"))?;
Ok(())
}
pub async fn new(ws: reqwest::Upgraded) -> Result<EngineConnection> {
let ws_stream = tokio_tungstenite::WebSocketStream::from_raw_socket(
ws,
@ -56,6 +87,8 @@ impl EngineConnection {
.await;
let (tcp_write, tcp_read) = ws_stream.split();
let (engine_req_tx, engine_req_rx) = mpsc::channel(10);
tokio::task::spawn(Self::start_write_actor(tcp_write, engine_req_rx));
let mut tcp_read = TcpRead { stream: tcp_read };
@ -80,18 +113,11 @@ impl EngineConnection {
});
Ok(EngineConnection {
tcp_write,
tcp_read_handle,
engine_req_tx,
tcp_read_handle: Arc::new(tcp_read_handle),
responses,
})
}
pub async fn tcp_send(&mut self, msg: WebSocketRequest) -> Result<()> {
let msg = serde_json::to_string(&msg)?;
self.tcp_write.send(WsMsg::Text(msg)).await?;
Ok(())
}
}
#[async_trait::async_trait(?Send)]
@ -99,7 +125,7 @@ impl EngineManager for EngineConnection {
/// Send a modeling command.
/// Do not wait for the response message.
fn send_modeling_cmd(
&mut self,
&self,
id: uuid::Uuid,
source_range: crate::executor::SourceRange,
cmd: kittycad::types::ModelingCmd,
@ -110,12 +136,19 @@ impl EngineManager for EngineConnection {
/// Send a modeling command and wait for the response message.
async fn send_modeling_cmd_get_response(
&mut self,
&self,
id: uuid::Uuid,
source_range: crate::executor::SourceRange,
cmd: kittycad::types::ModelingCmd,
) -> Result<OkWebSocketResponseData, KclError> {
self.tcp_send(WebSocketRequest::ModelingCmdReq { cmd, cmd_id: id })
let (tx, rx) = oneshot::channel();
// Send the request to the engine, via the actor.
self.engine_req_tx
.send(ToEngineReq {
req: WebSocketRequest::ModelingCmdReq { cmd, cmd_id: id },
request_sent: tx,
})
.await
.map_err(|e| {
KclError::Engine(KclErrorDetails {
@ -124,17 +157,32 @@ impl EngineManager for EngineConnection {
})
})?;
// Wait for the request to be sent.
rx.await
.map_err(|e| {
KclError::Engine(KclErrorDetails {
message: format!("could not send request to the engine actor: {e}"),
source_ranges: vec![source_range],
})
})?
.map_err(|e| {
KclError::Engine(KclErrorDetails {
message: format!("could not send request to the engine: {e}"),
source_ranges: vec![source_range],
})
})?;
// Wait for the response.
loop {
if let Some(resp) = self.responses.get(&id) {
if let Some(data) = &resp.resp {
return Ok(data.clone());
return if let Some(data) = &resp.resp {
Ok(data.clone())
} else {
return Err(KclError::Engine(KclErrorDetails {
Err(KclError::Engine(KclErrorDetails {
message: format!("Modeling command failed: {:?}", resp.errors),
source_ranges: vec![source_range],
}));
}
}))
};
}
}
}

View File

@ -18,7 +18,7 @@ impl EngineConnection {
#[async_trait::async_trait(?Send)]
impl crate::engine::EngineManager for EngineConnection {
fn send_modeling_cmd(
&mut self,
&self,
_id: uuid::Uuid,
_source_range: crate::executor::SourceRange,
_cmd: kittycad::types::ModelingCmd,
@ -27,7 +27,7 @@ impl crate::engine::EngineManager for EngineConnection {
}
async fn send_modeling_cmd_get_response(
&mut self,
&self,
_id: uuid::Uuid,
_source_range: crate::executor::SourceRange,
_cmd: kittycad::types::ModelingCmd,

View File

@ -35,7 +35,7 @@ impl EngineConnection {
#[async_trait::async_trait(?Send)]
impl crate::engine::EngineManager for EngineConnection {
fn send_modeling_cmd(
&mut self,
&self,
id: uuid::Uuid,
source_range: crate::executor::SourceRange,
cmd: kittycad::types::ModelingCmd,
@ -60,7 +60,7 @@ impl crate::engine::EngineManager for EngineConnection {
}
async fn send_modeling_cmd_get_response(
&mut self,
&self,
id: uuid::Uuid,
source_range: crate::executor::SourceRange,
cmd: kittycad::types::ModelingCmd,

View File

@ -36,7 +36,7 @@ pub trait EngineManager {
/// Send a modeling command.
/// Do not wait for the response message.
fn send_modeling_cmd(
&mut self,
&self,
id: uuid::Uuid,
source_range: crate::executor::SourceRange,
cmd: kittycad::types::ModelingCmd,
@ -44,7 +44,7 @@ pub trait EngineManager {
/// Send a modeling command and wait for the response message.
async fn send_modeling_cmd_get_response(
&mut self,
&self,
id: uuid::Uuid,
source_range: crate::executor::SourceRange,
cmd: kittycad::types::ModelingCmd,