Make the test executor a bit more patient (#5004)
This commit is contained in:
18
flake.lock
generated
18
flake.lock
generated
@ -2,11 +2,11 @@
|
|||||||
"nodes": {
|
"nodes": {
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1721933792,
|
"lastModified": 1736320768,
|
||||||
"narHash": "sha256-zYVwABlQnxpbaHMfX6Wt9jhyQstFYwN2XjleOJV3VVg=",
|
"narHash": "sha256-nIYdTAiKIGnFNugbomgBJR+Xv5F1ZQU+HfaBqJKroC0=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "2122a9b35b35719ad9a395fe783eabb092df01b1",
|
"rev": "4bc9c909d9ac828a039f288cf872d16d38185db8",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
@ -18,11 +18,11 @@
|
|||||||
},
|
},
|
||||||
"nixpkgs_2": {
|
"nixpkgs_2": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1718428119,
|
"lastModified": 1728538411,
|
||||||
"narHash": "sha256-WdWDpNaq6u1IPtxtYHHWpl5BmabtpmLnMAx0RdJ/vo8=",
|
"narHash": "sha256-f0SBJz1eZ2yOuKUr5CA9BHULGXVSn6miBuUWdTyhUhU=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "e6cea36f83499eb4e9cd184c8a8e823296b50ad5",
|
"rev": "b69de56fac8c2b6f8fd27f2eca01dcda8e0a4221",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
@ -43,11 +43,11 @@
|
|||||||
"nixpkgs": "nixpkgs_2"
|
"nixpkgs": "nixpkgs_2"
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1721960387,
|
"lastModified": 1736476219,
|
||||||
"narHash": "sha256-o21ax+745ETGXrcgc/yUuLw1SI77ymp3xEpJt+w/kks=",
|
"narHash": "sha256-+qyv3QqdZCdZ3cSO/cbpEY6tntyYjfe1bB12mdpNFaY=",
|
||||||
"owner": "oxalica",
|
"owner": "oxalica",
|
||||||
"repo": "rust-overlay",
|
"repo": "rust-overlay",
|
||||||
"rev": "9cbf831c5b20a53354fc12758abd05966f9f1699",
|
"rev": "de30cc5963da22e9742bbbbb9a3344570ed237b9",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
@ -502,4 +502,6 @@ impl kcl_lib::EngineManager for EngineConnection {
|
|||||||
})),
|
})),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn close(&self) {}
|
||||||
}
|
}
|
||||||
|
@ -37,9 +37,10 @@ enum SocketHealth {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type WebSocketTcpWrite = futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>, WsMsg>;
|
type WebSocketTcpWrite = futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>, WsMsg>;
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug)]
|
||||||
pub struct EngineConnection {
|
pub struct EngineConnection {
|
||||||
engine_req_tx: mpsc::Sender<ToEngineReq>,
|
engine_req_tx: mpsc::Sender<ToEngineReq>,
|
||||||
|
shutdown_tx: mpsc::Sender<()>,
|
||||||
responses: Arc<DashMap<uuid::Uuid, WebSocketResponse>>,
|
responses: Arc<DashMap<uuid::Uuid, WebSocketResponse>>,
|
||||||
pending_errors: Arc<Mutex<Vec<String>>>,
|
pending_errors: Arc<Mutex<Vec<String>>>,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
@ -130,21 +131,49 @@ struct ToEngineReq {
|
|||||||
|
|
||||||
impl EngineConnection {
|
impl EngineConnection {
|
||||||
/// Start waiting for incoming engine requests, and send each one over the WebSocket to the engine.
|
/// Start waiting for incoming engine requests, and send each one over the WebSocket to the engine.
|
||||||
async fn start_write_actor(mut tcp_write: WebSocketTcpWrite, mut engine_req_rx: mpsc::Receiver<ToEngineReq>) {
|
async fn start_write_actor(
|
||||||
while let Some(req) = engine_req_rx.recv().await {
|
mut tcp_write: WebSocketTcpWrite,
|
||||||
let ToEngineReq { req, request_sent } = req;
|
mut engine_req_rx: mpsc::Receiver<ToEngineReq>,
|
||||||
let res = if let WebSocketRequest::ModelingCmdReq(ModelingCmdReq {
|
mut shutdown_rx: mpsc::Receiver<()>,
|
||||||
cmd: ModelingCmd::ImportFiles { .. },
|
) {
|
||||||
cmd_id: _,
|
loop {
|
||||||
}) = &req
|
tokio::select! {
|
||||||
{
|
maybe_req = engine_req_rx.recv() => {
|
||||||
// Send it as binary.
|
match maybe_req {
|
||||||
Self::inner_send_to_engine_binary(req, &mut tcp_write).await
|
Some(ToEngineReq { req, request_sent }) => {
|
||||||
} else {
|
// Decide whether to send as binary or text,
|
||||||
Self::inner_send_to_engine(req, &mut tcp_write).await
|
// then send to the engine.
|
||||||
};
|
let res = if let WebSocketRequest::ModelingCmdReq(ModelingCmdReq {
|
||||||
let _ = request_sent.send(res);
|
cmd: ModelingCmd::ImportFiles { .. },
|
||||||
|
cmd_id: _,
|
||||||
|
}) = &req
|
||||||
|
{
|
||||||
|
Self::inner_send_to_engine_binary(req, &mut tcp_write).await
|
||||||
|
} else {
|
||||||
|
Self::inner_send_to_engine(req, &mut tcp_write).await
|
||||||
|
};
|
||||||
|
|
||||||
|
// Let the caller know we’ve sent the request (ok or error).
|
||||||
|
let _ = request_sent.send(res);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// The engine_req_rx channel has closed, so no more requests.
|
||||||
|
// We'll gracefully exit the loop and close the engine.
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
// If we get a shutdown signal, close the engine immediately and return.
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
let _ = Self::inner_close_engine(&mut tcp_write).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If we exit the loop (e.g. engine_req_rx was closed),
|
||||||
|
// still gracefully close the engine before returning.
|
||||||
let _ = Self::inner_close_engine(&mut tcp_write).await;
|
let _ = Self::inner_close_engine(&mut tcp_write).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -194,7 +223,8 @@ impl EngineConnection {
|
|||||||
|
|
||||||
let (tcp_write, tcp_read) = ws_stream.split();
|
let (tcp_write, tcp_read) = ws_stream.split();
|
||||||
let (engine_req_tx, engine_req_rx) = mpsc::channel(10);
|
let (engine_req_tx, engine_req_rx) = mpsc::channel(10);
|
||||||
tokio::task::spawn(Self::start_write_actor(tcp_write, engine_req_rx));
|
let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
|
||||||
|
tokio::task::spawn(Self::start_write_actor(tcp_write, engine_req_rx, shutdown_rx));
|
||||||
|
|
||||||
let mut tcp_read = TcpRead { stream: tcp_read };
|
let mut tcp_read = TcpRead { stream: tcp_read };
|
||||||
|
|
||||||
@ -304,6 +334,7 @@ impl EngineConnection {
|
|||||||
|
|
||||||
Ok(EngineConnection {
|
Ok(EngineConnection {
|
||||||
engine_req_tx,
|
engine_req_tx,
|
||||||
|
shutdown_tx,
|
||||||
tcp_read_handle: Arc::new(TcpReadHandle {
|
tcp_read_handle: Arc::new(TcpReadHandle {
|
||||||
handle: Arc::new(tcp_read_handle),
|
handle: Arc::new(tcp_read_handle),
|
||||||
}),
|
}),
|
||||||
@ -484,4 +515,15 @@ impl EngineManager for EngineConnection {
|
|||||||
fn get_session_data(&self) -> Option<ModelingSessionData> {
|
fn get_session_data(&self) -> Option<ModelingSessionData> {
|
||||||
self.session_data.lock().unwrap().clone()
|
self.session_data.lock().unwrap().clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn close(&self) {
|
||||||
|
let _ = self.shutdown_tx.send(()).await;
|
||||||
|
loop {
|
||||||
|
if let Ok(guard) = self.socket_health.lock() {
|
||||||
|
if *guard == SocketHealth::Inactive {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -160,4 +160,6 @@ impl crate::engine::EngineManager for EngineConnection {
|
|||||||
})),
|
})),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn close(&self) {}
|
||||||
}
|
}
|
||||||
|
@ -267,4 +267,7 @@ impl crate::engine::EngineManager for EngineConnection {
|
|||||||
|
|
||||||
Ok(ws_result)
|
Ok(ws_result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// maybe we can actually impl this here? not sure how atm.
|
||||||
|
async fn close(&self) {}
|
||||||
}
|
}
|
||||||
|
@ -600,6 +600,9 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
|
|||||||
fn get_session_data(&self) -> Option<ModelingSessionData> {
|
fn get_session_data(&self) -> Option<ModelingSessionData> {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Close the engine connection and wait for it to finish.
|
||||||
|
async fn close(&self);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Hash, Eq, Clone, Deserialize, Serialize, PartialEq, ts_rs::TS, JsonSchema)]
|
#[derive(Debug, Hash, Eq, Clone, Deserialize, Serialize, PartialEq, ts_rs::TS, JsonSchema)]
|
||||||
|
@ -2626,6 +2626,10 @@ impl ExecutorContext {
|
|||||||
|
|
||||||
self.prepare_snapshot().await
|
self.prepare_snapshot().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn close(&self) {
|
||||||
|
self.engine.close().await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// For each argument given,
|
/// For each argument given,
|
||||||
|
@ -25,10 +25,12 @@ pub async fn execute_and_snapshot(
|
|||||||
) -> Result<image::DynamicImage, ExecError> {
|
) -> Result<image::DynamicImage, ExecError> {
|
||||||
let ctx = new_context(units, true, project_directory).await?;
|
let ctx = new_context(units, true, project_directory).await?;
|
||||||
let program = Program::parse_no_errs(code).map_err(KclErrorWithOutputs::no_outputs)?;
|
let program = Program::parse_no_errs(code).map_err(KclErrorWithOutputs::no_outputs)?;
|
||||||
do_execute_and_snapshot(&ctx, program)
|
let res = do_execute_and_snapshot(&ctx, program)
|
||||||
.await
|
.await
|
||||||
.map(|(_state, snap)| snap)
|
.map(|(_state, snap)| snap)
|
||||||
.map_err(|err| err.error)
|
.map_err(|err| err.error);
|
||||||
|
ctx.close().await;
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Executes a kcl program and takes a snapshot of the result.
|
/// Executes a kcl program and takes a snapshot of the result.
|
||||||
@ -39,14 +41,16 @@ pub async fn execute_and_snapshot_ast(
|
|||||||
project_directory: Option<PathBuf>,
|
project_directory: Option<PathBuf>,
|
||||||
) -> Result<(ProgramMemory, Vec<Operation>, Vec<ArtifactCommand>, image::DynamicImage), ExecErrorWithState> {
|
) -> Result<(ProgramMemory, Vec<Operation>, Vec<ArtifactCommand>, image::DynamicImage), ExecErrorWithState> {
|
||||||
let ctx = new_context(units, true, project_directory).await?;
|
let ctx = new_context(units, true, project_directory).await?;
|
||||||
do_execute_and_snapshot(&ctx, ast).await.map(|(state, snap)| {
|
let res = do_execute_and_snapshot(&ctx, ast).await.map(|(state, snap)| {
|
||||||
(
|
(
|
||||||
state.mod_local.memory,
|
state.mod_local.memory,
|
||||||
state.mod_local.operations,
|
state.mod_local.operations,
|
||||||
state.global.artifact_commands,
|
state.global.artifact_commands,
|
||||||
snap,
|
snap,
|
||||||
)
|
)
|
||||||
})
|
});
|
||||||
|
ctx.close().await;
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn execute_and_snapshot_no_auth(
|
pub async fn execute_and_snapshot_no_auth(
|
||||||
@ -56,10 +60,12 @@ pub async fn execute_and_snapshot_no_auth(
|
|||||||
) -> Result<image::DynamicImage, ExecError> {
|
) -> Result<image::DynamicImage, ExecError> {
|
||||||
let ctx = new_context(units, false, project_directory).await?;
|
let ctx = new_context(units, false, project_directory).await?;
|
||||||
let program = Program::parse_no_errs(code).map_err(KclErrorWithOutputs::no_outputs)?;
|
let program = Program::parse_no_errs(code).map_err(KclErrorWithOutputs::no_outputs)?;
|
||||||
do_execute_and_snapshot(&ctx, program)
|
let res = do_execute_and_snapshot(&ctx, program)
|
||||||
.await
|
.await
|
||||||
.map(|(_state, snap)| snap)
|
.map(|(_state, snap)| snap)
|
||||||
.map_err(|err| err.error)
|
.map_err(|err| err.error);
|
||||||
|
ctx.close().await;
|
||||||
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_execute_and_snapshot(
|
async fn do_execute_and_snapshot(
|
||||||
@ -80,6 +86,9 @@ async fn do_execute_and_snapshot(
|
|||||||
.map_err(|e| ExecError::BadPng(e.to_string()))
|
.map_err(|e| ExecError::BadPng(e.to_string()))
|
||||||
.and_then(|x| x.decode().map_err(|e| ExecError::BadPng(e.to_string())))
|
.and_then(|x| x.decode().map_err(|e| ExecError::BadPng(e.to_string())))
|
||||||
.map_err(|err| ExecErrorWithState::new(err, exec_state.clone()))?;
|
.map_err(|err| ExecErrorWithState::new(err, exec_state.clone()))?;
|
||||||
|
|
||||||
|
ctx.close().await;
|
||||||
|
|
||||||
Ok((exec_state, img))
|
Ok((exec_state, img))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -52,6 +52,8 @@ async fn cache_test(test_name: &str, variations: Vec<Variation<'_>>) -> Result<V
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctx.close().await;
|
||||||
|
|
||||||
Ok(img_results)
|
Ok(img_results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user