KCL test server starts a connection pool

This commit is contained in:
Adam Chalmers
2024-06-12 23:45:04 -05:00
parent 793e3cfa95
commit fb0def6797
3 changed files with 90 additions and 11 deletions

View File

@ -1441,6 +1441,7 @@ dependencies = [
"anyhow",
"hyper",
"kcl-lib",
"pico-args",
"serde",
"serde_json",
"tokio",
@ -1827,6 +1828,12 @@ dependencies = [
"thiserror",
]
[[package]]
name = "pico-args"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5be167a7af36ee22fe3115051bc51f6e6c7054c9348e28deb4f49bd6f705a315"
[[package]]
name = "pin-project"
version = "1.1.5"

View File

@ -7,6 +7,7 @@ edition = "2021"
anyhow = "1.0.86"
hyper = { version = "0.14.29", features = ["server"] }
kcl-lib = { path = "../kcl" }
pico-args = "0.5.0"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }

View File

@ -1,6 +1,8 @@
//! Executes KCL programs.
//! The server reuses the same engine session for each KCL program it receives.
use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use hyper::header::CONTENT_TYPE;
@ -9,24 +11,79 @@ use hyper::{Body, Error, Response, Server};
use kcl_lib::executor::ExecutorContext;
use kcl_lib::settings::types::UnitLength;
use kcl_lib::test_server::RequestBody;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::sleep;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Parse the CLI arguments.
let mut args: Vec<_> = std::env::args().collect();
args.reverse();
let _process_name = args.pop().unwrap();
let listen_on = args.pop().unwrap_or_else(|| "0.0.0.0:3333".to_owned()).parse()?;
let pargs = pico_args::Arguments::from_env();
let args = ServerArgs::parse(pargs)?;
// Run the actual server.
start(listen_on).await
start_server(args).await
}
pub async fn start(listen_on: SocketAddr) -> anyhow::Result<()> {
let state = ExecutorContext::new_for_unit_test(UnitLength::Mm).await?;
#[derive(Debug)]
struct ServerArgs {
listen_on: SocketAddr,
worker_threads: u8,
}
impl ServerArgs {
fn parse(mut pargs: pico_args::Arguments) -> Result<Self, pico_args::Error> {
let args = ServerArgs {
listen_on: pargs
.opt_value_from_str("--listen-on")?
.unwrap_or("0.0.0.0:3333".parse().unwrap()),
worker_threads: pargs.opt_value_from_str("--worker-threads")?.unwrap_or(1),
};
println!("Config is {args:?}");
Ok(args)
}
}
struct WorkerReq {
body: Vec<u8>,
resp: oneshot::Sender<Response<Body>>,
}
/// Each worker has a connection to the engine, and accepts
/// KCL programs. When it receives one (over the mpsc channel)
/// it executes it and returns the result via a oneshot channel.
fn start_worker(i: u8) -> mpsc::Sender<WorkerReq> {
println!("Starting worker {i}");
// Make a work queue for this worker.
let (tx, mut rx) = mpsc::channel(1);
tokio::task::spawn(async move {
let state = ExecutorContext::new_for_unit_test(UnitLength::Mm).await.unwrap();
println!("Worker {i} ready");
while let Some(req) = rx.recv().await {
let req: WorkerReq = req;
let resp = snapshot_endpoint(req.body, state.clone()).await;
if req.resp.send(resp).is_err() {
println!("\tWorker {i} exiting");
}
}
println!("\tWorker {i} exiting");
});
tx
}
async fn start_server(args: ServerArgs) -> anyhow::Result<()> {
let ServerArgs {
listen_on,
worker_threads,
} = args;
let workers: Vec<_> = (0..worker_threads).map(start_worker).collect();
struct State {
workers: Vec<mpsc::Sender<WorkerReq>>,
req_num: AtomicUsize,
}
let state = Arc::new(State {
workers,
req_num: 0.into(),
});
// In hyper, a `MakeService` is basically your server.
// It makes a `Service` for each connection, which manages the connection.
let make_service = make_service_fn(
@ -41,10 +98,24 @@ pub async fn start(listen_on: SocketAddr) -> anyhow::Result<()> {
Ok::<_, Error>(service_fn(move |req| {
// eprintln!("Received a request");
let state3 = state2.clone();
// TODO: Don't let multiple requests through at once.
async move {
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
Ok::<_, Error>(snapshot_endpoint(whole_body.into(), state3).await)
// Round robin requests between each available worker.
let req_num = state3.req_num.fetch_add(1, Ordering::Relaxed);
let worker_id = req_num % state3.workers.len();
// println!("Sending request {req_num} to worker {worker_id}");
let worker = state3.workers[worker_id].clone();
let (tx, rx) = oneshot::channel();
let req_sent = worker
.send(WorkerReq {
body: whole_body.into(),
resp: tx,
})
.await;
req_sent.unwrap();
let resp = rx.await.unwrap();
Ok::<_, Error>(resp)
}
}))
}