Comments and rename vars

This commit is contained in:
Adam Chalmers
2024-06-13 00:09:58 -05:00
parent 70023a31ef
commit f297e0827e

View File

@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use hyper::body::Bytes;
use hyper::header::CONTENT_TYPE;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Error, Response, Server};
@ -26,8 +27,10 @@ async fn main() -> anyhow::Result<()> {
#[derive(Debug)]
struct ServerArgs {
/// What port this server should listen on.
listen_on: SocketAddr,
worker_threads: u8,
/// How many connections to establish with the engine.
num_engine_conns: u8,
}
impl ServerArgs {
@ -36,15 +39,18 @@ impl ServerArgs {
listen_on: pargs
.opt_value_from_str("--listen-on")?
.unwrap_or("0.0.0.0:3333".parse().unwrap()),
worker_threads: pargs.opt_value_from_str("--worker-threads")?.unwrap_or(1),
num_engine_conns: pargs.opt_value_from_str("--num-engine-conns")?.unwrap_or(1),
};
println!("Config is {args:?}");
Ok(args)
}
}
/// Sent from the server to each worker.
struct WorkerReq {
body: Vec<u8>,
/// A KCL program, in UTF-8.
body: Bytes,
/// A channel to send the HTTP response back.
resp: oneshot::Sender<Response<Body>>,
}
@ -78,9 +84,9 @@ struct ServerState {
async fn start_server(args: ServerArgs) -> anyhow::Result<()> {
let ServerArgs {
listen_on,
worker_threads,
num_engine_conns,
} = args;
let workers: Vec<_> = (0..worker_threads).map(start_worker).collect();
let workers: Vec<_> = (0..num_engine_conns).map(start_worker).collect();
let state = Arc::new(ServerState {
workers,
req_num: 0.into(),
@ -115,7 +121,7 @@ async fn start_server(args: ServerArgs) -> anyhow::Result<()> {
}
async fn handle_request(req: hyper::Request<Body>, state3: Arc<ServerState>) -> Result<Response<Body>, Error> {
let whole_body = hyper::body::to_bytes(req.into_body()).await?;
let body = hyper::body::to_bytes(req.into_body()).await?;
// Round robin requests between each available worker.
let req_num = state3.req_num.fetch_add(1, Ordering::Relaxed);
@ -123,23 +129,18 @@ async fn handle_request(req: hyper::Request<Body>, state3: Arc<ServerState>) ->
// println!("Sending request {req_num} to worker {worker_id}");
let worker = state3.workers[worker_id].clone();
let (tx, rx) = oneshot::channel();
let req_sent = worker
.send(WorkerReq {
body: whole_body.into(),
resp: tx,
})
.await;
let req_sent = worker.send(WorkerReq { body, resp: tx }).await;
req_sent.unwrap();
let resp = rx.await.unwrap();
Ok::<_, Error>(resp)
Ok(resp)
}
/// Execute a KCL program, then respond with a PNG snapshot.
/// KCL errors (from engine or the executor) respond with HTTP Bad Gateway.
/// Malformed requests are HTTP Bad Request.
/// Successful requests contain a PNG as the body.
async fn snapshot_endpoint(body: Vec<u8>, state: ExecutorContext) -> Response<Body> {
let body = match serde_json::from_slice::<RequestBody>(&body) {
async fn snapshot_endpoint(body: Bytes, state: ExecutorContext) -> Response<Body> {
let body = match serde_json::from_slice::<RequestBody>(body.as_ref()) {
Ok(bd) => bd,
Err(e) => return bad_request(format!("Invalid request JSON: {e}")),
};