diff --git a/flake.lock b/flake.lock index dff03418b..20c3e4946 100644 --- a/flake.lock +++ b/flake.lock @@ -2,11 +2,11 @@ "nodes": { "nixpkgs": { "locked": { - "lastModified": 1721933792, - "narHash": "sha256-zYVwABlQnxpbaHMfX6Wt9jhyQstFYwN2XjleOJV3VVg=", + "lastModified": 1736320768, + "narHash": "sha256-nIYdTAiKIGnFNugbomgBJR+Xv5F1ZQU+HfaBqJKroC0=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "2122a9b35b35719ad9a395fe783eabb092df01b1", + "rev": "4bc9c909d9ac828a039f288cf872d16d38185db8", "type": "github" }, "original": { @@ -18,11 +18,11 @@ }, "nixpkgs_2": { "locked": { - "lastModified": 1718428119, - "narHash": "sha256-WdWDpNaq6u1IPtxtYHHWpl5BmabtpmLnMAx0RdJ/vo8=", + "lastModified": 1728538411, + "narHash": "sha256-f0SBJz1eZ2yOuKUr5CA9BHULGXVSn6miBuUWdTyhUhU=", "owner": "NixOS", "repo": "nixpkgs", - "rev": "e6cea36f83499eb4e9cd184c8a8e823296b50ad5", + "rev": "b69de56fac8c2b6f8fd27f2eca01dcda8e0a4221", "type": "github" }, "original": { @@ -43,11 +43,11 @@ "nixpkgs": "nixpkgs_2" }, "locked": { - "lastModified": 1721960387, - "narHash": "sha256-o21ax+745ETGXrcgc/yUuLw1SI77ymp3xEpJt+w/kks=", + "lastModified": 1736476219, + "narHash": "sha256-+qyv3QqdZCdZ3cSO/cbpEY6tntyYjfe1bB12mdpNFaY=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "9cbf831c5b20a53354fc12758abd05966f9f1699", + "rev": "de30cc5963da22e9742bbbbb9a3344570ed237b9", "type": "github" }, "original": { diff --git a/src/wasm-lib/kcl-to-core/src/conn_mock_core.rs b/src/wasm-lib/kcl-to-core/src/conn_mock_core.rs index 9fc98b775..e3fd9c798 100644 --- a/src/wasm-lib/kcl-to-core/src/conn_mock_core.rs +++ b/src/wasm-lib/kcl-to-core/src/conn_mock_core.rs @@ -502,4 +502,6 @@ impl kcl_lib::EngineManager for EngineConnection { })), } } + + async fn close(&self) {} } diff --git a/src/wasm-lib/kcl/src/engine/conn.rs b/src/wasm-lib/kcl/src/engine/conn.rs index 013707013..fe0039471 100644 --- a/src/wasm-lib/kcl/src/engine/conn.rs +++ b/src/wasm-lib/kcl/src/engine/conn.rs @@ -37,9 +37,10 @@ enum SocketHealth { } type WebSocketTcpWrite = futures::stream::SplitSink, WsMsg>; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct EngineConnection { engine_req_tx: mpsc::Sender, + shutdown_tx: mpsc::Sender<()>, responses: Arc>, pending_errors: Arc>>, #[allow(dead_code)] @@ -130,21 +131,49 @@ struct ToEngineReq { impl EngineConnection { /// Start waiting for incoming engine requests, and send each one over the WebSocket to the engine. - async fn start_write_actor(mut tcp_write: WebSocketTcpWrite, mut engine_req_rx: mpsc::Receiver) { - while let Some(req) = engine_req_rx.recv().await { - let ToEngineReq { req, request_sent } = req; - let res = if let WebSocketRequest::ModelingCmdReq(ModelingCmdReq { - cmd: ModelingCmd::ImportFiles { .. }, - cmd_id: _, - }) = &req - { - // Send it as binary. - Self::inner_send_to_engine_binary(req, &mut tcp_write).await - } else { - Self::inner_send_to_engine(req, &mut tcp_write).await - }; - let _ = request_sent.send(res); + async fn start_write_actor( + mut tcp_write: WebSocketTcpWrite, + mut engine_req_rx: mpsc::Receiver, + mut shutdown_rx: mpsc::Receiver<()>, + ) { + loop { + tokio::select! { + maybe_req = engine_req_rx.recv() => { + match maybe_req { + Some(ToEngineReq { req, request_sent }) => { + // Decide whether to send as binary or text, + // then send to the engine. + let res = if let WebSocketRequest::ModelingCmdReq(ModelingCmdReq { + cmd: ModelingCmd::ImportFiles { .. }, + cmd_id: _, + }) = &req + { + Self::inner_send_to_engine_binary(req, &mut tcp_write).await + } else { + Self::inner_send_to_engine(req, &mut tcp_write).await + }; + + // Let the caller know we’ve sent the request (ok or error). + let _ = request_sent.send(res); + } + None => { + // The engine_req_rx channel has closed, so no more requests. + // We'll gracefully exit the loop and close the engine. + break; + } + } + }, + + // If we get a shutdown signal, close the engine immediately and return. + _ = shutdown_rx.recv() => { + let _ = Self::inner_close_engine(&mut tcp_write).await; + return; + } + } } + + // If we exit the loop (e.g. engine_req_rx was closed), + // still gracefully close the engine before returning. let _ = Self::inner_close_engine(&mut tcp_write).await; } @@ -194,7 +223,8 @@ impl EngineConnection { let (tcp_write, tcp_read) = ws_stream.split(); let (engine_req_tx, engine_req_rx) = mpsc::channel(10); - tokio::task::spawn(Self::start_write_actor(tcp_write, engine_req_rx)); + let (shutdown_tx, shutdown_rx) = mpsc::channel(1); + tokio::task::spawn(Self::start_write_actor(tcp_write, engine_req_rx, shutdown_rx)); let mut tcp_read = TcpRead { stream: tcp_read }; @@ -304,6 +334,7 @@ impl EngineConnection { Ok(EngineConnection { engine_req_tx, + shutdown_tx, tcp_read_handle: Arc::new(TcpReadHandle { handle: Arc::new(tcp_read_handle), }), @@ -484,4 +515,15 @@ impl EngineManager for EngineConnection { fn get_session_data(&self) -> Option { self.session_data.lock().unwrap().clone() } + + async fn close(&self) { + let _ = self.shutdown_tx.send(()).await; + loop { + if let Ok(guard) = self.socket_health.lock() { + if *guard == SocketHealth::Inactive { + return; + } + } + } + } } diff --git a/src/wasm-lib/kcl/src/engine/conn_mock.rs b/src/wasm-lib/kcl/src/engine/conn_mock.rs index 8e5dc2cfe..3b868f48c 100644 --- a/src/wasm-lib/kcl/src/engine/conn_mock.rs +++ b/src/wasm-lib/kcl/src/engine/conn_mock.rs @@ -160,4 +160,6 @@ impl crate::engine::EngineManager for EngineConnection { })), } } + + async fn close(&self) {} } diff --git a/src/wasm-lib/kcl/src/engine/conn_wasm.rs b/src/wasm-lib/kcl/src/engine/conn_wasm.rs index cd5360409..367f75ba1 100644 --- a/src/wasm-lib/kcl/src/engine/conn_wasm.rs +++ b/src/wasm-lib/kcl/src/engine/conn_wasm.rs @@ -267,4 +267,7 @@ impl crate::engine::EngineManager for EngineConnection { Ok(ws_result) } + + // maybe we can actually impl this here? not sure how atm. + async fn close(&self) {} } diff --git a/src/wasm-lib/kcl/src/engine/mod.rs b/src/wasm-lib/kcl/src/engine/mod.rs index 929611ac6..ec7350771 100644 --- a/src/wasm-lib/kcl/src/engine/mod.rs +++ b/src/wasm-lib/kcl/src/engine/mod.rs @@ -600,6 +600,9 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static { fn get_session_data(&self) -> Option { None } + + /// Close the engine connection and wait for it to finish. + async fn close(&self); } #[derive(Debug, Hash, Eq, Clone, Deserialize, Serialize, PartialEq, ts_rs::TS, JsonSchema)] diff --git a/src/wasm-lib/kcl/src/execution/mod.rs b/src/wasm-lib/kcl/src/execution/mod.rs index cf9f5fb3e..b66a35641 100644 --- a/src/wasm-lib/kcl/src/execution/mod.rs +++ b/src/wasm-lib/kcl/src/execution/mod.rs @@ -2626,6 +2626,10 @@ impl ExecutorContext { self.prepare_snapshot().await } + + pub async fn close(&self) { + self.engine.close().await; + } } /// For each argument given, diff --git a/src/wasm-lib/kcl/src/test_server.rs b/src/wasm-lib/kcl/src/test_server.rs index 4bfe5d516..e768c938c 100644 --- a/src/wasm-lib/kcl/src/test_server.rs +++ b/src/wasm-lib/kcl/src/test_server.rs @@ -25,10 +25,12 @@ pub async fn execute_and_snapshot( ) -> Result { let ctx = new_context(units, true, project_directory).await?; let program = Program::parse_no_errs(code).map_err(KclErrorWithOutputs::no_outputs)?; - do_execute_and_snapshot(&ctx, program) + let res = do_execute_and_snapshot(&ctx, program) .await .map(|(_state, snap)| snap) - .map_err(|err| err.error) + .map_err(|err| err.error); + ctx.close().await; + res } /// Executes a kcl program and takes a snapshot of the result. @@ -39,14 +41,16 @@ pub async fn execute_and_snapshot_ast( project_directory: Option, ) -> Result<(ProgramMemory, Vec, Vec, image::DynamicImage), ExecErrorWithState> { let ctx = new_context(units, true, project_directory).await?; - do_execute_and_snapshot(&ctx, ast).await.map(|(state, snap)| { + let res = do_execute_and_snapshot(&ctx, ast).await.map(|(state, snap)| { ( state.mod_local.memory, state.mod_local.operations, state.global.artifact_commands, snap, ) - }) + }); + ctx.close().await; + res } pub async fn execute_and_snapshot_no_auth( @@ -56,10 +60,12 @@ pub async fn execute_and_snapshot_no_auth( ) -> Result { let ctx = new_context(units, false, project_directory).await?; let program = Program::parse_no_errs(code).map_err(KclErrorWithOutputs::no_outputs)?; - do_execute_and_snapshot(&ctx, program) + let res = do_execute_and_snapshot(&ctx, program) .await .map(|(_state, snap)| snap) - .map_err(|err| err.error) + .map_err(|err| err.error); + ctx.close().await; + res } async fn do_execute_and_snapshot( @@ -80,6 +86,9 @@ async fn do_execute_and_snapshot( .map_err(|e| ExecError::BadPng(e.to_string())) .and_then(|x| x.decode().map_err(|e| ExecError::BadPng(e.to_string()))) .map_err(|err| ExecErrorWithState::new(err, exec_state.clone()))?; + + ctx.close().await; + Ok((exec_state, img)) } diff --git a/src/wasm-lib/tests/executor/cache.rs b/src/wasm-lib/tests/executor/cache.rs index 6c6a76174..579a19aed 100644 --- a/src/wasm-lib/tests/executor/cache.rs +++ b/src/wasm-lib/tests/executor/cache.rs @@ -52,6 +52,8 @@ async fn cache_test(test_name: &str, variations: Vec>) -> Result