diff --git a/src/wasm-lib/Cargo.lock b/src/wasm-lib/Cargo.lock index 6c63514cf..a608b0166 100644 --- a/src/wasm-lib/Cargo.lock +++ b/src/wasm-lib/Cargo.lock @@ -1434,6 +1434,19 @@ dependencies = [ "syn 2.0.66", ] +[[package]] +name = "kcl-test-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "hyper", + "kcl-lib", + "pico-args", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "kittycad" version = "0.3.5" @@ -1815,6 +1828,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "pico-args" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315" + [[package]] name = "pin-project" version = "1.1.5" @@ -2492,9 +2511,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.116" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ "indexmap 2.2.5", "itoa", diff --git a/src/wasm-lib/Cargo.toml b/src/wasm-lib/Cargo.toml index d625ac205..3d9f44eb2 100644 --- a/src/wasm-lib/Cargo.toml +++ b/src/wasm-lib/Cargo.toml @@ -65,6 +65,7 @@ members = [ "derive-docs", "kcl", "kcl-macros", + "kcl-test-server", ] [workspace.dependencies] diff --git a/src/wasm-lib/kcl-test-server/Cargo.toml b/src/wasm-lib/kcl-test-server/Cargo.toml new file mode 100644 index 000000000..35d87cfdf --- /dev/null +++ b/src/wasm-lib/kcl-test-server/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "kcl-test-server" +version = "0.1.0" +edition = "2021" + +[dependencies] +anyhow = "1.0.86" +hyper = { version = "0.14.29", features = ["server"] } +kcl-lib = { path = "../kcl" } +pico-args = "0.5.0" +serde = { version = "1.0.203", features = ["derive"] } +serde_json = "1.0.117" +tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] } diff --git a/src/wasm-lib/kcl-test-server/src/lib.rs b/src/wasm-lib/kcl-test-server/src/lib.rs new file mode 100644 index 000000000..ba6f5f6ef --- /dev/null +++ b/src/wasm-lib/kcl-test-server/src/lib.rs @@ -0,0 +1,201 @@ +//! Executes KCL programs. +//! The server reuses the same engine session for each KCL program it receives. +use std::net::SocketAddr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; + +use hyper::body::Bytes; +use hyper::header::CONTENT_TYPE; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Error, Response, Server}; +use kcl_lib::executor::ExecutorContext; +use kcl_lib::settings::types::UnitLength; +use kcl_lib::test_server::RequestBody; +use tokio::sync::{mpsc, oneshot}; +use tokio::task::JoinHandle; +use tokio::time::sleep; + +#[derive(Debug)] +pub struct ServerArgs { + /// What port this server should listen on. + pub listen_on: SocketAddr, + /// How many connections to establish with the engine. + pub num_engine_conns: u8, +} + +impl ServerArgs { + pub fn parse(mut pargs: pico_args::Arguments) -> Result { + let args = ServerArgs { + listen_on: pargs + .opt_value_from_str("--listen-on")? + .unwrap_or("0.0.0.0:3333".parse().unwrap()), + num_engine_conns: pargs.opt_value_from_str("--num-engine-conns")?.unwrap_or(1), + }; + println!("Config is {args:?}"); + Ok(args) + } +} + +/// Sent from the server to each worker. +struct WorkerReq { + /// A KCL program, in UTF-8. + body: Bytes, + /// A channel to send the HTTP response back. + resp: oneshot::Sender>, +} + +/// Each worker has a connection to the engine, and accepts +/// KCL programs. When it receives one (over the mpsc channel) +/// it executes it and returns the result via a oneshot channel. +fn start_worker(i: u8) -> mpsc::Sender { + println!("Starting worker {i}"); + // Make a work queue for this worker. + let (tx, mut rx) = mpsc::channel(1); + tokio::task::spawn(async move { + let state = ExecutorContext::new_for_unit_test(UnitLength::Mm).await.unwrap(); + println!("Worker {i} ready"); + while let Some(req) = rx.recv().await { + let req: WorkerReq = req; + let resp = snapshot_endpoint(req.body, state.clone()).await; + if req.resp.send(resp).is_err() { + println!("\tWorker {i} exiting"); + } + } + println!("\tWorker {i} exiting"); + }); + tx +} + +struct ServerState { + workers: Vec>, + req_num: AtomicUsize, +} + +pub async fn start_server(args: ServerArgs) -> anyhow::Result<()> { + let ServerArgs { + listen_on, + num_engine_conns, + } = args; + let workers: Vec<_> = (0..num_engine_conns).map(start_worker).collect(); + let state = Arc::new(ServerState { + workers, + req_num: 0.into(), + }); + // In hyper, a `MakeService` is basically your server. + // It makes a `Service` for each connection, which manages the connection. + let make_service = make_service_fn( + // This closure is run for each connection. + move |_conn_info| { + // eprintln!("Connected to a client"); + let state = state.clone(); + async move { + // This is the `Service` which handles the connection. + // `service_fn` converts a function which returns a Response + // into a `Service`. + Ok::<_, Error>(service_fn(move |req| { + // eprintln!("Received a request"); + let state = state.clone(); + async move { handle_request(req, state).await } + })) + } + }, + ); + let server = Server::bind(&listen_on).serve(make_service); + println!("Listening on {listen_on}"); + println!("PID is {}", std::process::id()); + if let Err(e) = server.await { + eprintln!("Server error: {e}"); + return Err(e.into()); + } + Ok(()) +} + +async fn handle_request(req: hyper::Request, state3: Arc) -> Result, Error> { + let body = hyper::body::to_bytes(req.into_body()).await?; + + // Round robin requests between each available worker. + let req_num = state3.req_num.fetch_add(1, Ordering::Relaxed); + let worker_id = req_num % state3.workers.len(); + // println!("Sending request {req_num} to worker {worker_id}"); + let worker = state3.workers[worker_id].clone(); + let (tx, rx) = oneshot::channel(); + let req_sent = worker.send(WorkerReq { body, resp: tx }).await; + req_sent.unwrap(); + let resp = rx.await.unwrap(); + Ok(resp) +} + +/// Execute a KCL program, then respond with a PNG snapshot. +/// KCL errors (from engine or the executor) respond with HTTP Bad Gateway. +/// Malformed requests are HTTP Bad Request. +/// Successful requests contain a PNG as the body. +async fn snapshot_endpoint(body: Bytes, state: ExecutorContext) -> Response { + let body = match serde_json::from_slice::(body.as_ref()) { + Ok(bd) => bd, + Err(e) => return bad_request(format!("Invalid request JSON: {e}")), + }; + let RequestBody { kcl_program, test_name } = body; + let parser = match kcl_lib::token::lexer(&kcl_program) { + Ok(ts) => kcl_lib::parser::Parser::new(ts), + Err(e) => return bad_request(format!("tokenization error: {e}")), + }; + let program = match parser.ast() { + Ok(pr) => pr, + Err(e) => return bad_request(format!("Parse error: {e}")), + }; + eprintln!("Executing {test_name}"); + if let Err(e) = state.reset_scene().await { + return kcl_err(e); + } + // Let users know if the test is taking a long time. + let (done_tx, done_rx) = oneshot::channel::<()>(); + let timer = time_until(done_rx); + let snapshot = match state.execute_and_prepare_snapshot(program).await { + Ok(sn) => sn, + Err(e) => return kcl_err(e), + }; + let _ = done_tx.send(()); + timer.abort(); + eprintln!("\tServing response"); + let png_bytes = snapshot.contents.0; + let mut resp = Response::new(Body::from(png_bytes)); + resp.headers_mut().insert(CONTENT_TYPE, "image/png".parse().unwrap()); + resp +} + +fn bad_request(msg: String) -> Response { + eprintln!("\tBad request"); + let mut resp = Response::new(Body::from(msg)); + *resp.status_mut() = hyper::StatusCode::BAD_REQUEST; + resp +} + +fn bad_gateway(msg: String) -> Response { + eprintln!("\tBad gateway"); + let mut resp = Response::new(Body::from(msg)); + *resp.status_mut() = hyper::StatusCode::BAD_GATEWAY; + resp +} + +fn kcl_err(err: anyhow::Error) -> Response { + eprintln!("\tBad KCL"); + bad_gateway(format!("{err}")) +} + +fn time_until(done: oneshot::Receiver<()>) -> JoinHandle<()> { + tokio::task::spawn(async move { + let period = 10; + tokio::pin!(done); + for i in 1..=3 { + tokio::select! { + biased; + // If the test is done, no need for this timer anymore. + _ = &mut done => return, + _ = sleep(Duration::from_secs(period)) => { + eprintln!("\tTest has taken {}s", period * i); + }, + }; + } + }) +} diff --git a/src/wasm-lib/kcl/src/engine/conn.rs b/src/wasm-lib/kcl/src/engine/conn.rs index 6d6aa8c62..37426a9b2 100644 --- a/src/wasm-lib/kcl/src/engine/conn.rs +++ b/src/wasm-lib/kcl/src/engine/conn.rs @@ -40,23 +40,54 @@ pub struct TcpRead { stream: futures::stream::SplitStream>, } +/// Occurs when client couldn't read from the WebSocket to the engine. +// #[derive(Debug)] +pub enum WebSocketReadError { + /// Could not read a message due to WebSocket errors. + Read(tokio_tungstenite::tungstenite::Error), + /// WebSocket message didn't contain a valid message that the KCL Executor could parse. + Deser(anyhow::Error), +} + +impl From for WebSocketReadError { + fn from(e: anyhow::Error) -> Self { + Self::Deser(e) + } +} + impl TcpRead { - pub async fn read(&mut self) -> Result { + pub async fn read(&mut self) -> std::result::Result { let Some(msg) = self.stream.next().await else { - anyhow::bail!("Failed to read from websocket"); + return Err(anyhow::anyhow!("Failed to read from WebSocket").into()); }; - let msg: WebSocketResponse = match msg? { - WsMsg::Text(text) => serde_json::from_str(&text)?, - WsMsg::Binary(bin) => bson::from_slice(&bin)?, - other => anyhow::bail!("Unexpected websocket message from server: {}", other), + let msg = match msg { + Ok(msg) => msg, + Err(e) if matches!(e, tokio_tungstenite::tungstenite::Error::Protocol(_)) => { + return Err(WebSocketReadError::Read(e)) + } + Err(e) => return Err(anyhow::anyhow!("Error reading from engine's WebSocket: {e}").into()), + }; + let msg: WebSocketResponse = match msg { + WsMsg::Text(text) => serde_json::from_str(&text) + .map_err(anyhow::Error::from) + .map_err(WebSocketReadError::from)?, + WsMsg::Binary(bin) => bson::from_slice(&bin) + .map_err(anyhow::Error::from) + .map_err(WebSocketReadError::from)?, + other => return Err(anyhow::anyhow!("Unexpected WebSocket message from engine API: {other}").into()), }; Ok(msg) } } -#[derive(Debug)] pub struct TcpReadHandle { - handle: Arc>>, + handle: Arc>>, +} + +impl std::fmt::Debug for TcpReadHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "TcpReadHandle") + } } impl Drop for TcpReadHandle { @@ -150,14 +181,17 @@ impl EngineConnection { match tcp_read.read().await { Ok(ws_resp) => { for e in ws_resp.errors.iter().flatten() { - println!("got error message: {e}"); + println!("got error message: {} {}", e.error_code, e.message); } if let Some(id) = ws_resp.request_id { responses_clone.insert(id, ws_resp.clone()); } } Err(e) => { - println!("got ws error: {:?}", e); + match &e { + WebSocketReadError::Read(e) => eprintln!("could not read from WS: {:?}", e), + WebSocketReadError::Deser(e) => eprintln!("could not deserialize msg from WS: {:?}", e), + } *socket_health_tcp_read.lock().unwrap() = SocketHealth::Inactive; return Err(e); } diff --git a/src/wasm-lib/kcl/src/executor.rs b/src/wasm-lib/kcl/src/executor.rs index 959d93417..64a10b109 100644 --- a/src/wasm-lib/kcl/src/executor.rs +++ b/src/wasm-lib/kcl/src/executor.rs @@ -15,6 +15,7 @@ use crate::{ engine::EngineManager, errors::{KclError, KclErrorDetails}, fs::FileManager, + settings::types::UnitLength, std::{FunctionKind, StdLib}, }; @@ -992,7 +993,7 @@ pub struct ExecutorContext { #[derive(Debug, Clone)] pub struct ExecutorSettings { /// The unit to use in modeling dimensions. - pub units: crate::settings::types::UnitLength, + pub units: UnitLength, /// Highlight edges of 3D objects? pub highlight_edges: bool, /// Whether or not Screen Space Ambient Occlusion (SSAO) is enabled. @@ -1083,6 +1084,57 @@ impl ExecutorContext { }) } + /// For executing unit tests. + #[cfg(not(target_arch = "wasm32"))] + pub async fn new_for_unit_test(units: UnitLength) -> Result { + let user_agent = concat!(env!("CARGO_PKG_NAME"), ".rs/", env!("CARGO_PKG_VERSION"),); + let http_client = reqwest::Client::builder() + .user_agent(user_agent) + // For file conversions we need this to be long. + .timeout(std::time::Duration::from_secs(600)) + .connect_timeout(std::time::Duration::from_secs(60)); + let ws_client = reqwest::Client::builder() + .user_agent(user_agent) + // For file conversions we need this to be long. + .timeout(std::time::Duration::from_secs(600)) + .connect_timeout(std::time::Duration::from_secs(60)) + .connection_verbose(true) + .tcp_keepalive(std::time::Duration::from_secs(600)) + .http1_only(); + + let token = std::env::var("KITTYCAD_API_TOKEN").expect("KITTYCAD_API_TOKEN not set"); + + // Create the client. + let mut client = kittycad::Client::new_from_reqwest(token, http_client, ws_client); + // Set a local engine address if it's set. + if let Ok(addr) = std::env::var("LOCAL_ENGINE_ADDR") { + client.set_base_url(addr); + } + + let ctx = ExecutorContext::new( + &client, + ExecutorSettings { + units, + highlight_edges: true, + enable_ssao: false, + }, + ) + .await?; + Ok(ctx) + } + + /// Clear everything in the scene. + pub async fn reset_scene(&self) -> Result<()> { + self.engine + .send_modeling_cmd( + uuid::Uuid::new_v4(), + SourceRange::default(), + kittycad::types::ModelingCmd::SceneClearAll {}, + ) + .await?; + Ok(()) + } + /// Perform the execution of a program. /// You can optionally pass in some initialization memory. /// Kurt uses this for partial execution. @@ -1309,7 +1361,7 @@ impl ExecutorContext { } /// Update the units for the executor. - pub fn update_units(&mut self, units: crate::settings::types::UnitLength) { + pub fn update_units(&mut self, units: UnitLength) { self.settings.units = units; } diff --git a/src/wasm-lib/kcl/src/lib.rs b/src/wasm-lib/kcl/src/lib.rs index 16e472fa1..fd61ecdcf 100644 --- a/src/wasm-lib/kcl/src/lib.rs +++ b/src/wasm-lib/kcl/src/lib.rs @@ -16,6 +16,7 @@ pub mod lsp; pub mod parser; pub mod settings; pub mod std; +pub mod test_server; pub mod thread; pub mod token; #[cfg(target_arch = "wasm32")] diff --git a/src/wasm-lib/kcl/src/test_server.rs b/src/wasm-lib/kcl/src/test_server.rs new file mode 100644 index 000000000..7e2a1b1e2 --- /dev/null +++ b/src/wasm-lib/kcl/src/test_server.rs @@ -0,0 +1,8 @@ +//! Types used to send data to the test server. + +#[derive(serde::Deserialize, serde::Serialize)] +pub struct RequestBody { + pub kcl_program: String, + #[serde(default)] + pub test_name: String, +}