KCL execution server (#2686)

Adds a new library, the kcl-test-server. It lets you easily start a HTTP server with one endpoint, which accepts JSON. The JSON body contains a KCL program and a test name. The server has a pool of active engine sessions, and when it gets a KCL program, it executes it on one of those engine sessions.

This addresses part of #2580 but currently the sketch-on-face tests don't pass with this new test server yet.

This is a library, not a binary, because I want to use it in both the wasm-lib unit tests and in the zoo CLI.
This commit is contained in:
Adam Chalmers
2024-06-18 14:38:25 -05:00
committed by GitHub
parent a79e365c0f
commit c61273085f
8 changed files with 343 additions and 14 deletions

View File

@ -1434,6 +1434,19 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "kcl-test-server"
version = "0.1.0"
dependencies = [
"anyhow",
"hyper",
"kcl-lib",
"pico-args",
"serde",
"serde_json",
"tokio",
]
[[package]]
name = "kittycad"
version = "0.3.5"
@ -1815,6 +1828,12 @@ dependencies = [
"thiserror",
]
[[package]]
name = "pico-args"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315"
[[package]]
name = "pin-project"
version = "1.1.5"
@ -2492,9 +2511,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.116"
version = "1.0.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813"
checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3"
dependencies = [
"indexmap 2.2.5",
"itoa",

View File

@ -65,6 +65,7 @@ members = [
"derive-docs",
"kcl",
"kcl-macros",
"kcl-test-server",
]
[workspace.dependencies]

View File

@ -0,0 +1,13 @@
[package]
name = "kcl-test-server"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1.0.86"
hyper = { version = "0.14.29", features = ["server"] }
kcl-lib = { path = "../kcl" }
pico-args = "0.5.0"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }

View File

@ -0,0 +1,201 @@
//! Executes KCL programs.
//! The server reuses the same engine session for each KCL program it receives.
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use hyper::body::Bytes;
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Error, Response, Server};
use kcl_lib::executor::ExecutorContext;
use kcl_lib::settings::types::UnitLength;
use kcl_lib::test_server::RequestBody;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::sleep;
#[derive(Debug)]
pub struct ServerArgs {
/// What port this server should listen on.
pub listen_on: SocketAddr,
/// How many connections to establish with the engine.
pub num_engine_conns: u8,
}
impl ServerArgs {
pub fn parse(mut pargs: pico_args::Arguments) -> Result<Self, pico_args::Error> {
let args = ServerArgs {
listen_on: pargs
.opt_value_from_str("--listen-on")?
.unwrap_or("0.0.0.0:3333".parse().unwrap()),
num_engine_conns: pargs.opt_value_from_str("--num-engine-conns")?.unwrap_or(1),
};
println!("Config is {args:?}");
Ok(args)
}
}
/// Sent from the server to each worker.
struct WorkerReq {
/// A KCL program, in UTF-8.
body: Bytes,
/// A channel to send the HTTP response back.
resp: oneshot::Sender<Response<Body>>,
}
/// Each worker has a connection to the engine, and accepts
/// KCL programs. When it receives one (over the mpsc channel)
/// it executes it and returns the result via a oneshot channel.
fn start_worker(i: u8) -> mpsc::Sender<WorkerReq> {
println!("Starting worker {i}");
// Make a work queue for this worker.
let (tx, mut rx) = mpsc::channel(1);
tokio::task::spawn(async move {
let state = ExecutorContext::new_for_unit_test(UnitLength::Mm).await.unwrap();
println!("Worker {i} ready");
while let Some(req) = rx.recv().await {
let req: WorkerReq = req;
let resp = snapshot_endpoint(req.body, state.clone()).await;
if req.resp.send(resp).is_err() {
println!("\tWorker {i} exiting");
}
}
println!("\tWorker {i} exiting");
});
tx
}
struct ServerState {
workers: Vec<mpsc::Sender<WorkerReq>>,
req_num: AtomicUsize,
}
pub async fn start_server(args: ServerArgs) -> anyhow::Result<()> {
let ServerArgs {
listen_on,
num_engine_conns,
} = args;
let workers: Vec<_> = (0..num_engine_conns).map(start_worker).collect();
let state = Arc::new(ServerState {
workers,
req_num: 0.into(),
});
// In hyper, a `MakeService` is basically your server.
// It makes a `Service` for each connection, which manages the connection.
let make_service = make_service_fn(
// This closure is run for each connection.
move |_conn_info| {
// eprintln!("Connected to a client");
let state = state.clone();
async move {
// This is the `Service` which handles the connection.
// `service_fn` converts a function which returns a Response
// into a `Service`.
Ok::<_, Error>(service_fn(move |req| {
// eprintln!("Received a request");
let state = state.clone();
async move { handle_request(req, state).await }
}))
}
},
);
let server = Server::bind(&listen_on).serve(make_service);
println!("Listening on {listen_on}");
println!("PID is {}", std::process::id());
if let Err(e) = server.await {
eprintln!("Server error: {e}");
return Err(e.into());
}
Ok(())
}
async fn handle_request(req: hyper::Request<Body>, state3: Arc<ServerState>) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
// Round robin requests between each available worker.
let req_num = state3.req_num.fetch_add(1, Ordering::Relaxed);
let worker_id = req_num % state3.workers.len();
// println!("Sending request {req_num} to worker {worker_id}");
let worker = state3.workers[worker_id].clone();
let (tx, rx) = oneshot::channel();
let req_sent = worker.send(WorkerReq { body, resp: tx }).await;
req_sent.unwrap();
let resp = rx.await.unwrap();
Ok(resp)
}
/// Execute a KCL program, then respond with a PNG snapshot.
/// KCL errors (from engine or the executor) respond with HTTP Bad Gateway.
/// Malformed requests are HTTP Bad Request.
/// Successful requests contain a PNG as the body.
async fn snapshot_endpoint(body: Bytes, state: ExecutorContext) -> Response<Body> {
let body = match serde_json::from_slice::<RequestBody>(body.as_ref()) {
Ok(bd) => bd,
Err(e) => return bad_request(format!("Invalid request JSON: {e}")),
};
let RequestBody { kcl_program, test_name } = body;
let parser = match kcl_lib::token::lexer(&kcl_program) {
Ok(ts) => kcl_lib::parser::Parser::new(ts),
Err(e) => return bad_request(format!("tokenization error: {e}")),
};
let program = match parser.ast() {
Ok(pr) => pr,
Err(e) => return bad_request(format!("Parse error: {e}")),
};
eprintln!("Executing {test_name}");
if let Err(e) = state.reset_scene().await {
return kcl_err(e);
}
// Let users know if the test is taking a long time.
let (done_tx, done_rx) = oneshot::channel::<()>();
let timer = time_until(done_rx);
let snapshot = match state.execute_and_prepare_snapshot(program).await {
Ok(sn) => sn,
Err(e) => return kcl_err(e),
};
let _ = done_tx.send(());
timer.abort();
eprintln!("\tServing response");
let png_bytes = snapshot.contents.0;
let mut resp = Response::new(Body::from(png_bytes));
resp.headers_mut().insert(CONTENT_TYPE, "image/png".parse().unwrap());
resp
}
fn bad_request(msg: String) -> Response<Body> {
eprintln!("\tBad request");
let mut resp = Response::new(Body::from(msg));
*resp.status_mut() = hyper::StatusCode::BAD_REQUEST;
resp
}
fn bad_gateway(msg: String) -> Response<Body> {
eprintln!("\tBad gateway");
let mut resp = Response::new(Body::from(msg));
*resp.status_mut() = hyper::StatusCode::BAD_GATEWAY;
resp
}
fn kcl_err(err: anyhow::Error) -> Response<Body> {
eprintln!("\tBad KCL");
bad_gateway(format!("{err}"))
}
fn time_until(done: oneshot::Receiver<()>) -> JoinHandle<()> {
tokio::task::spawn(async move {
let period = 10;
tokio::pin!(done);
for i in 1..=3 {
tokio::select! {
biased;
// If the test is done, no need for this timer anymore.
_ = &mut done => return,
_ = sleep(Duration::from_secs(period)) => {
eprintln!("\tTest has taken {}s", period * i);
},
};
}
})
}

View File

@ -40,23 +40,54 @@ pub struct TcpRead {
stream: futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>>,
}
/// Occurs when client couldn't read from the WebSocket to the engine.
// #[derive(Debug)]
pub enum WebSocketReadError {
/// Could not read a message due to WebSocket errors.
Read(tokio_tungstenite::tungstenite::Error),
/// WebSocket message didn't contain a valid message that the KCL Executor could parse.
Deser(anyhow::Error),
}
impl From<anyhow::Error> for WebSocketReadError {
fn from(e: anyhow::Error) -> Self {
Self::Deser(e)
}
}
impl TcpRead {
pub async fn read(&mut self) -> Result<WebSocketResponse> {
pub async fn read(&mut self) -> std::result::Result<WebSocketResponse, WebSocketReadError> {
let Some(msg) = self.stream.next().await else {
anyhow::bail!("Failed to read from websocket");
return Err(anyhow::anyhow!("Failed to read from WebSocket").into());
};
let msg: WebSocketResponse = match msg? {
WsMsg::Text(text) => serde_json::from_str(&text)?,
WsMsg::Binary(bin) => bson::from_slice(&bin)?,
other => anyhow::bail!("Unexpected websocket message from server: {}", other),
let msg = match msg {
Ok(msg) => msg,
Err(e) if matches!(e, tokio_tungstenite::tungstenite::Error::Protocol(_)) => {
return Err(WebSocketReadError::Read(e))
}
Err(e) => return Err(anyhow::anyhow!("Error reading from engine's WebSocket: {e}").into()),
};
let msg: WebSocketResponse = match msg {
WsMsg::Text(text) => serde_json::from_str(&text)
.map_err(anyhow::Error::from)
.map_err(WebSocketReadError::from)?,
WsMsg::Binary(bin) => bson::from_slice(&bin)
.map_err(anyhow::Error::from)
.map_err(WebSocketReadError::from)?,
other => return Err(anyhow::anyhow!("Unexpected WebSocket message from engine API: {other}").into()),
};
Ok(msg)
}
}
#[derive(Debug)]
pub struct TcpReadHandle {
handle: Arc<tokio::task::JoinHandle<Result<()>>>,
handle: Arc<tokio::task::JoinHandle<Result<(), WebSocketReadError>>>,
}
impl std::fmt::Debug for TcpReadHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TcpReadHandle")
}
}
impl Drop for TcpReadHandle {
@ -150,14 +181,17 @@ impl EngineConnection {
match tcp_read.read().await {
Ok(ws_resp) => {
for e in ws_resp.errors.iter().flatten() {
println!("got error message: {e}");
println!("got error message: {} {}", e.error_code, e.message);
}
if let Some(id) = ws_resp.request_id {
responses_clone.insert(id, ws_resp.clone());
}
}
Err(e) => {
println!("got ws error: {:?}", e);
match &e {
WebSocketReadError::Read(e) => eprintln!("could not read from WS: {:?}", e),
WebSocketReadError::Deser(e) => eprintln!("could not deserialize msg from WS: {:?}", e),
}
*socket_health_tcp_read.lock().unwrap() = SocketHealth::Inactive;
return Err(e);
}

View File

@ -15,6 +15,7 @@ use crate::{
engine::EngineManager,
errors::{KclError, KclErrorDetails},
fs::FileManager,
settings::types::UnitLength,
std::{FunctionKind, StdLib},
};
@ -992,7 +993,7 @@ pub struct ExecutorContext {
#[derive(Debug, Clone)]
pub struct ExecutorSettings {
/// The unit to use in modeling dimensions.
pub units: crate::settings::types::UnitLength,
pub units: UnitLength,
/// Highlight edges of 3D objects?
pub highlight_edges: bool,
/// Whether or not Screen Space Ambient Occlusion (SSAO) is enabled.
@ -1083,6 +1084,57 @@ impl ExecutorContext {
})
}
/// For executing unit tests.
#[cfg(not(target_arch = "wasm32"))]
pub async fn new_for_unit_test(units: UnitLength) -> Result<Self> {
let user_agent = concat!(env!("CARGO_PKG_NAME"), ".rs/", env!("CARGO_PKG_VERSION"),);
let http_client = reqwest::Client::builder()
.user_agent(user_agent)
// For file conversions we need this to be long.
.timeout(std::time::Duration::from_secs(600))
.connect_timeout(std::time::Duration::from_secs(60));
let ws_client = reqwest::Client::builder()
.user_agent(user_agent)
// For file conversions we need this to be long.
.timeout(std::time::Duration::from_secs(600))
.connect_timeout(std::time::Duration::from_secs(60))
.connection_verbose(true)
.tcp_keepalive(std::time::Duration::from_secs(600))
.http1_only();
let token = std::env::var("KITTYCAD_API_TOKEN").expect("KITTYCAD_API_TOKEN not set");
// Create the client.
let mut client = kittycad::Client::new_from_reqwest(token, http_client, ws_client);
// Set a local engine address if it's set.
if let Ok(addr) = std::env::var("LOCAL_ENGINE_ADDR") {
client.set_base_url(addr);
}
let ctx = ExecutorContext::new(
&client,
ExecutorSettings {
units,
highlight_edges: true,
enable_ssao: false,
},
)
.await?;
Ok(ctx)
}
/// Clear everything in the scene.
pub async fn reset_scene(&self) -> Result<()> {
self.engine
.send_modeling_cmd(
uuid::Uuid::new_v4(),
SourceRange::default(),
kittycad::types::ModelingCmd::SceneClearAll {},
)
.await?;
Ok(())
}
/// Perform the execution of a program.
/// You can optionally pass in some initialization memory.
/// Kurt uses this for partial execution.
@ -1309,7 +1361,7 @@ impl ExecutorContext {
}
/// Update the units for the executor.
pub fn update_units(&mut self, units: crate::settings::types::UnitLength) {
pub fn update_units(&mut self, units: UnitLength) {
self.settings.units = units;
}

View File

@ -16,6 +16,7 @@ pub mod lsp;
pub mod parser;
pub mod settings;
pub mod std;
pub mod test_server;
pub mod thread;
pub mod token;
#[cfg(target_arch = "wasm32")]

View File

@ -0,0 +1,8 @@
//! Types used to send data to the test server.
#[derive(serde::Deserialize, serde::Serialize)]
pub struct RequestBody {
pub kcl_program: String,
#[serde(default)]
pub test_name: String,
}