batch shit

Signed-off-by: Jess Frazelle <github@jessfraz.com>
This commit is contained in:
Jess Frazelle
2025-04-07 13:47:56 -07:00
parent f00a7059a2
commit 6d1035710f

View File

@ -272,6 +272,8 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
self.batch().write().await.push((req, source_range)); self.batch().write().await.push((req, source_range));
self.stats().commands_batched.fetch_add(1, Ordering::Relaxed); self.stats().commands_batched.fetch_add(1, Ordering::Relaxed);
self.flush_batch(false, source_range).await?;
Ok(()) Ok(())
} }
@ -324,36 +326,36 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
source_range: SourceRange, source_range: SourceRange,
cmd: &ModelingCmd, cmd: &ModelingCmd,
) -> Result<OkWebSocketResponseData, crate::errors::KclError> { ) -> Result<OkWebSocketResponseData, crate::errors::KclError> {
self.batch_modeling_cmd(id, source_range, cmd).await?; let mut requests = self.take_batch().await.clone();
// Add the command to the batch.
requests.push((
WebSocketRequest::ModelingCmdReq(ModelingCmdReq {
cmd: cmd.clone(),
cmd_id: id.into(),
}),
source_range,
));
self.stats().commands_batched.fetch_add(1, Ordering::Relaxed);
// Flush the batch queue. // Flush the batch queue.
self.flush_batch(false, source_range).await self.run_batch(requests, source_range).await
} }
/// Force flush the batch queue. /// Run the batch for the specifc commands.
async fn flush_batch( async fn run_batch(
&self, &self,
// Whether or not to flush the end commands as well. orig_requests: Vec<(WebSocketRequest, SourceRange)>,
// We only do this at the very end of the file.
batch_end: bool,
source_range: SourceRange, source_range: SourceRange,
) -> Result<OkWebSocketResponseData, crate::errors::KclError> { ) -> Result<OkWebSocketResponseData, crate::errors::KclError> {
let all_requests = if batch_end {
let mut requests = self.take_batch().await.clone();
requests.extend(self.take_batch_end().await.values().cloned());
requests
} else {
self.take_batch().await.clone()
};
// Return early if we have no commands to send. // Return early if we have no commands to send.
if all_requests.is_empty() { if orig_requests.is_empty() {
return Ok(OkWebSocketResponseData::Modeling { return Ok(OkWebSocketResponseData::Modeling {
modeling_response: OkModelingCmdResponse::Empty {}, modeling_response: OkModelingCmdResponse::Empty {},
}); });
} }
let requests: Vec<ModelingCmdReq> = all_requests let requests: Vec<ModelingCmdReq> = orig_requests
.iter() .iter()
.filter_map(|(val, _)| match val { .filter_map(|(val, _)| match val {
WebSocketRequest::ModelingCmdReq(ModelingCmdReq { cmd, cmd_id }) => Some(ModelingCmdReq { WebSocketRequest::ModelingCmdReq(ModelingCmdReq { cmd, cmd_id }) => Some(ModelingCmdReq {
@ -370,9 +372,9 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
responses: true, responses: true,
}); });
let final_req = if all_requests.len() == 1 { let final_req = if orig_requests.len() == 1 {
// We can unwrap here because we know the batch has only one element. // We can unwrap here because we know the batch has only one element.
all_requests.first().unwrap().0.clone() orig_requests.first().unwrap().0.clone()
} else { } else {
batched_requests batched_requests
}; };
@ -380,7 +382,7 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
// Create the map of original command IDs to source range. // Create the map of original command IDs to source range.
// This is for the wasm side, kurt needs it for selections. // This is for the wasm side, kurt needs it for selections.
let mut id_to_source_range = HashMap::new(); let mut id_to_source_range = HashMap::new();
for (req, range) in all_requests.iter() { for (req, range) in orig_requests.iter() {
match req { match req {
WebSocketRequest::ModelingCmdReq(ModelingCmdReq { cmd: _, cmd_id }) => { WebSocketRequest::ModelingCmdReq(ModelingCmdReq { cmd: _, cmd_id }) => {
id_to_source_range.insert(Uuid::from(*cmd_id), *range); id_to_source_range.insert(Uuid::from(*cmd_id), *range);
@ -395,7 +397,7 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
} }
// Do the artifact commands. // Do the artifact commands.
for (req, _) in all_requests.iter() { for (req, _) in orig_requests.iter() {
match &req { match &req {
WebSocketRequest::ModelingCmdBatchReq(ModelingBatch { requests, .. }) => { WebSocketRequest::ModelingCmdBatchReq(ModelingBatch { requests, .. }) => {
for request in requests { for request in requests {
@ -466,6 +468,25 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
} }
} }
/// Force flush the batch queue.
async fn flush_batch(
&self,
// Whether or not to flush the end commands as well.
// We only do this at the very end of the file.
batch_end: bool,
source_range: SourceRange,
) -> Result<OkWebSocketResponseData, crate::errors::KclError> {
let all_requests = if batch_end {
let mut requests = self.take_batch().await.clone();
requests.extend(self.take_batch_end().await.values().cloned());
requests
} else {
self.take_batch().await.clone()
};
self.run_batch(all_requests, source_range).await
}
async fn make_default_plane( async fn make_default_plane(
&self, &self,
plane_id: uuid::Uuid, plane_id: uuid::Uuid,