allow sending async commands to engine (#6342)

* start of async

Signed-off-by: Jess Frazelle <github@jessfraz.com>

check at end if the async commands completed

Signed-off-by: Jess Frazelle <github@jessfraz.com>

run at the end of inner_run

Signed-off-by: Jess Frazelle <github@jessfraz.com>

set import as async

Signed-off-by: Jess Frazelle <github@jessfraz.com>

updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

add to the wasm side

Signed-off-by: Jess Frazelle <github@jessfraz.com>

updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

fmt

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fire

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* flake

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fixup for awaiting on import

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fix mock

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fix mock

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fixes

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* add a test where we import then do a bunch of other stuff

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fixup to see

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fixups

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fix tests

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* cross platform time

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fixes

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* another appearance test

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* new docs and tests

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* don't loop so tight

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fixes

Signed-off-by: Jess Frazelle <github@jessfraz.com>

---------

Signed-off-by: Jess Frazelle <github@jessfraz.com>
Jess Frazelle
2025-04-17 17:22:19 -07:00
committed by GitHub
parent 0b9889e313
commit bd4bad0020
40 changed files with 5681335 additions and 155 deletions
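The change in a nutshell: long-running engine commands (file import, in particular) can now be fired without blocking evaluation; their ids are recorded, and before a run finishes the interpreter waits until every outstanding id has a response. The snippet below is a minimal, self-contained sketch of that fire-then-reconcile pattern, not the kcl-lib API: `AsyncTracker`, `fire`, and `ensure_completed` are made-up stand-ins for `async_modeling_cmd`, `ids_of_async_commands`, and `ensure_async_commands_completed` in the diffs that follow, and it assumes the tokio and uuid crates (macros and v4 features).

// Minimal standalone sketch of the fire-then-reconcile pattern (not the kcl-lib API).
use std::{collections::HashMap, sync::Arc, time::Duration};

use tokio::sync::RwLock;
use uuid::Uuid;

#[derive(Default, Clone)]
struct AsyncTracker {
    /// Ids we fired but have not yet confirmed, keyed to where they came from.
    pending: Arc<RwLock<HashMap<Uuid, String>>>,
    /// Responses as they arrive from the "engine".
    responses: Arc<RwLock<HashMap<Uuid, String>>>,
}

impl AsyncTracker {
    /// Fire a command without waiting; the engine answers some time later.
    async fn fire(&self, id: Uuid, source: &str) {
        self.pending.write().await.insert(id, source.to_string());
        let responses = self.responses.clone();
        tokio::spawn(async move {
            // Stand-in for a slow engine operation such as a file import.
            tokio::time::sleep(Duration::from_millis(50)).await;
            responses.write().await.insert(id, "ok".to_string());
        });
    }

    /// At the end of a run, make sure every fired command actually completed.
    async fn ensure_completed(&self) -> Result<(), String> {
        let pending = std::mem::take(&mut *self.pending.write().await);
        for (id, source) in pending {
            let start = std::time::Instant::now();
            loop {
                if self.responses.read().await.contains_key(&id) {
                    break;
                }
                if start.elapsed() > Duration::from_secs(60) {
                    return Err(format!("async command {id} from `{source}` timed out"));
                }
                // Back off so the poll doesn't spin ("don't loop so tight").
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let tracker = AsyncTracker::default();
    tracker.fire(Uuid::new_v4(), "import of a STEP file").await;
    // ... evaluate the rest of the program without blocking on the import ...
    tracker.ensure_completed().await.unwrap();
}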

View File

@ -45,6 +45,7 @@ pub struct EngineConnection {
batch: Arc<RwLock<Vec<(WebSocketRequest, SourceRange)>>>,
batch_end: Arc<RwLock<IndexMap<uuid::Uuid, (WebSocketRequest, SourceRange)>>>,
artifact_commands: Arc<RwLock<Vec<ArtifactCommand>>>,
ids_of_async_commands: Arc<RwLock<IndexMap<Uuid, SourceRange>>>,
/// The default planes for the scene.
default_planes: Arc<RwLock<Option<DefaultPlanes>>>,
@ -115,6 +116,17 @@ impl Drop for TcpReadHandle {
}
}
struct ResponsesInformation {
/// The responses from the engine.
responses: Arc<RwLock<IndexMap<uuid::Uuid, WebSocketResponse>>>,
}
impl ResponsesInformation {
pub async fn add(&self, id: Uuid, response: WebSocketResponse) {
self.responses.write().await.insert(id, response);
}
}
/// Requests to send to the engine, and a way to await a response.
struct ToEngineReq {
/// The request to send
@ -227,10 +239,13 @@ impl EngineConnection {
let session_data: Arc<RwLock<Option<ModelingSessionData>>> = Arc::new(RwLock::new(None));
let session_data2 = session_data.clone();
let responses: Arc<RwLock<IndexMap<uuid::Uuid, WebSocketResponse>>> = Arc::new(RwLock::new(IndexMap::new()));
let responses_clone = responses.clone();
let ids_of_async_commands: Arc<RwLock<IndexMap<Uuid, SourceRange>>> = Arc::new(RwLock::new(IndexMap::new()));
let socket_health = Arc::new(RwLock::new(SocketHealth::Active));
let pending_errors = Arc::new(RwLock::new(Vec::new()));
let pending_errors_clone = pending_errors.clone();
let responses_information = ResponsesInformation {
responses: responses.clone(),
};
let socket_health_tcp_read = socket_health.clone();
let tcp_read_handle = tokio::spawn(async move {
@ -244,8 +259,7 @@ impl EngineConnection {
WebSocketResponse::Success(SuccessWebSocketResponse {
resp: OkWebSocketResponseData::ModelingBatch { responses },
..
}) =>
{
}) => {
#[expect(
clippy::iter_over_hash_type,
reason = "modeling command uses a HashMap and keys are random, so we don't really have a choice"
@ -254,26 +268,32 @@ impl EngineConnection {
let id: uuid::Uuid = (*resp_id).into();
match batch_response {
BatchResponse::Success { response } => {
responses_clone.write().await.insert(
id,
WebSocketResponse::Success(SuccessWebSocketResponse {
success: true,
request_id: Some(id),
resp: OkWebSocketResponseData::Modeling {
modeling_response: response.clone(),
},
}),
);
// If the id is in our ids of async commands, remove
// it.
responses_information
.add(
id,
WebSocketResponse::Success(SuccessWebSocketResponse {
success: true,
request_id: Some(id),
resp: OkWebSocketResponseData::Modeling {
modeling_response: response.clone(),
},
}),
)
.await;
}
BatchResponse::Failure { errors } => {
responses_clone.write().await.insert(
id,
WebSocketResponse::Failure(FailureWebSocketResponse {
success: false,
request_id: Some(id),
errors: errors.clone(),
}),
);
responses_information
.add(
id,
WebSocketResponse::Failure(FailureWebSocketResponse {
success: false,
request_id: Some(id),
errors: errors.clone(),
}),
)
.await;
}
}
}
@ -291,14 +311,16 @@ impl EngineConnection {
errors,
}) => {
if let Some(id) = request_id {
responses_clone.write().await.insert(
*id,
WebSocketResponse::Failure(FailureWebSocketResponse {
success: false,
request_id: *request_id,
errors: errors.clone(),
}),
);
responses_information
.add(
*id,
WebSocketResponse::Failure(FailureWebSocketResponse {
success: false,
request_id: *request_id,
errors: errors.clone(),
}),
)
.await;
} else {
// Add it to our pending errors.
let mut pe = pending_errors_clone.write().await;
@ -314,7 +336,7 @@ impl EngineConnection {
}
if let Some(id) = id {
responses_clone.write().await.insert(id, ws_resp.clone());
responses_information.add(id, ws_resp.clone()).await;
}
}
Err(e) => {
@ -341,6 +363,7 @@ impl EngineConnection {
batch: Arc::new(RwLock::new(Vec::new())),
batch_end: Arc::new(RwLock::new(IndexMap::new())),
artifact_commands: Arc::new(RwLock::new(Vec::new())),
ids_of_async_commands,
default_planes: Default::default(),
session_data,
stats: Default::default(),
@ -366,6 +389,10 @@ impl EngineManager for EngineConnection {
self.artifact_commands.clone()
}
fn ids_of_async_commands(&self) -> Arc<RwLock<IndexMap<Uuid, SourceRange>>> {
self.ids_of_async_commands.clone()
}
fn stats(&self) -> &EngineStats {
&self.stats
}
@ -386,13 +413,13 @@ impl EngineManager for EngineConnection {
Ok(())
}
async fn inner_send_modeling_cmd(
async fn inner_fire_modeling_cmd(
&self,
id: uuid::Uuid,
_id: uuid::Uuid,
source_range: SourceRange,
cmd: WebSocketRequest,
_id_to_source_range: HashMap<Uuid, SourceRange>,
) -> Result<WebSocketResponse, KclError> {
) -> Result<(), KclError> {
let (tx, rx) = oneshot::channel();
// Send the request to the engine, via the actor.
@ -424,6 +451,19 @@ impl EngineManager for EngineConnection {
})
})?;
Ok(())
}
async fn inner_send_modeling_cmd(
&self,
id: uuid::Uuid,
source_range: SourceRange,
cmd: WebSocketRequest,
id_to_source_range: HashMap<Uuid, SourceRange>,
) -> Result<WebSocketResponse, KclError> {
self.inner_fire_modeling_cmd(id, source_range, cmd, id_to_source_range)
.await?;
// Wait for the response.
let current_time = std::time::Instant::now();
while current_time.elapsed().as_secs() < 60 {
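What this first file's hunks do: the old blocking inner_send_modeling_cmd is split into inner_fire_modeling_cmd, which only hands the request to the websocket writer actor and waits for the write acknowledgement, plus a send wrapper that then polls the shared response map with a 60-second cap. Below is a rough, self-contained sketch of that actor hand-off, assuming tokio; ToEngineReq appears in the diff, but its field names here are illustrative, not the crate's definition.

use tokio::sync::{mpsc, oneshot};

/// Illustrative request envelope: the real ToEngineReq carries a
/// WebSocketRequest plus a oneshot for the write acknowledgement.
struct ToEngineReq {
    req: String,
    request_sent: oneshot::Sender<Result<(), String>>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<ToEngineReq>(32);

    // Writer actor: owns the (stubbed) socket and acknowledges each write.
    let writer = tokio::spawn(async move {
        while let Some(ToEngineReq { req, request_sent }) = rx.recv().await {
            // ... write `req` to the websocket here ...
            println!("wrote request: {req}");
            let _ = request_sent.send(Ok(()));
        }
    });

    // Fire-and-forget caller: waits only for the write ack, never the response.
    let (ack_tx, ack_rx) = oneshot::channel();
    tx.send(ToEngineReq {
        req: "import_files".to_string(),
        request_sent: ack_tx,
    })
    .await
    .map_err(|_| "writer task is gone")
    .unwrap();
    ack_rx.await.expect("writer dropped the ack").expect("write failed");

    drop(tx);
    writer.await.unwrap();
}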

View File

@ -12,7 +12,7 @@ use kcmc::{
WebSocketResponse,
},
};
use kittycad_modeling_cmds::{self as kcmc};
use kittycad_modeling_cmds::{self as kcmc, websocket::ModelingCmdReq, ImportFiles, ModelingCmd};
use tokio::sync::RwLock;
use uuid::Uuid;
@ -29,6 +29,8 @@ pub struct EngineConnection {
batch: Arc<RwLock<Vec<(WebSocketRequest, SourceRange)>>>,
batch_end: Arc<RwLock<IndexMap<uuid::Uuid, (WebSocketRequest, SourceRange)>>>,
artifact_commands: Arc<RwLock<Vec<ArtifactCommand>>>,
ids_of_async_commands: Arc<RwLock<IndexMap<Uuid, SourceRange>>>,
responses: Arc<RwLock<IndexMap<Uuid, WebSocketResponse>>>,
/// The default planes for the scene.
default_planes: Arc<RwLock<Option<DefaultPlanes>>>,
stats: EngineStats,
@ -40,6 +42,8 @@ impl EngineConnection {
batch: Arc::new(RwLock::new(Vec::new())),
batch_end: Arc::new(RwLock::new(IndexMap::new())),
artifact_commands: Arc::new(RwLock::new(Vec::new())),
ids_of_async_commands: Arc::new(RwLock::new(IndexMap::new())),
responses: Arc::new(RwLock::new(IndexMap::new())),
default_planes: Default::default(),
stats: Default::default(),
})
@ -57,7 +61,7 @@ impl crate::engine::EngineManager for EngineConnection {
}
fn responses(&self) -> Arc<RwLock<IndexMap<Uuid, WebSocketResponse>>> {
Arc::new(RwLock::new(IndexMap::new()))
self.responses.clone()
}
fn stats(&self) -> &EngineStats {
@ -68,6 +72,10 @@ impl crate::engine::EngineManager for EngineConnection {
self.artifact_commands.clone()
}
fn ids_of_async_commands(&self) -> Arc<RwLock<IndexMap<Uuid, SourceRange>>> {
self.ids_of_async_commands.clone()
}
fn get_default_planes(&self) -> Arc<RwLock<Option<DefaultPlanes>>> {
self.default_planes.clone()
}
@ -80,6 +88,25 @@ impl crate::engine::EngineManager for EngineConnection {
Ok(())
}
async fn inner_fire_modeling_cmd(
&self,
id: uuid::Uuid,
source_range: SourceRange,
cmd: WebSocketRequest,
id_to_source_range: HashMap<Uuid, SourceRange>,
) -> Result<(), KclError> {
// Pop off the id we care about.
self.ids_of_async_commands.write().await.swap_remove(&id);
// Add the response to our responses.
let response = self
.inner_send_modeling_cmd(id, source_range, cmd, id_to_source_range)
.await?;
self.responses().write().await.insert(id, response);
Ok(())
}
async fn inner_send_modeling_cmd(
&self,
id: uuid::Uuid,
@ -109,6 +136,20 @@ impl crate::engine::EngineManager for EngineConnection {
success: true,
}))
}
WebSocketRequest::ModelingCmdReq(ModelingCmdReq {
cmd: ModelingCmd::ImportFiles(ImportFiles { .. }),
cmd_id,
}) => Ok(WebSocketResponse::Success(SuccessWebSocketResponse {
request_id: Some(id),
resp: OkWebSocketResponseData::Modeling {
modeling_response: OkModelingCmdResponse::ImportFiles(
kittycad_modeling_cmds::output::ImportFiles {
object_id: cmd_id.into(),
},
),
},
success: true,
})),
WebSocketRequest::ModelingCmdReq(_) => Ok(WebSocketResponse::Success(SuccessWebSocketResponse {
request_id: Some(id),
resp: OkWebSocketResponseData::Modeling {
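The mock connection's one notable addition: an ImportFiles request now gets a canned success whose object_id is simply the command id, so the async-import code path can run in tests with no engine behind it. A toy version of that trick, with made-up enums standing in for the kittycad_modeling_cmds types:

use uuid::Uuid;

#[derive(Debug)]
enum Request {
    ImportFiles { cmd_id: Uuid },
    Other,
}

#[derive(Debug)]
enum Response {
    ImportFiles { object_id: Uuid },
    Empty,
}

fn mock_handle(req: Request) -> Response {
    match req {
        // Echo the command id back as the "created" object id, so code that
        // later references the imported object still gets a stable id.
        Request::ImportFiles { cmd_id } => Response::ImportFiles { object_id: cmd_id },
        Request::Other => Response::Empty,
    }
}

fn main() {
    let id = Uuid::new_v4();
    println!("{:?}", mock_handle(Request::ImportFiles { cmd_id: id }));
    println!("{:?}", mock_handle(Request::Other));
}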

View File

@ -22,6 +22,15 @@ extern "C" {
#[derive(Debug, Clone)]
pub type EngineCommandManager;
#[wasm_bindgen(method, js_name = fireModelingCommandFromWasm, catch)]
fn fire_modeling_cmd_from_wasm(
this: &EngineCommandManager,
id: String,
rangeStr: String,
cmdStr: String,
idToRangeStr: String,
) -> Result<(), js_sys::Error>;
#[wasm_bindgen(method, js_name = sendModelingCommandFromWasm, catch)]
fn send_modeling_cmd_from_wasm(
this: &EngineCommandManager,
@ -38,33 +47,128 @@ extern "C" {
#[derive(Debug, Clone)]
pub struct EngineConnection {
manager: Arc<EngineCommandManager>,
response_context: Arc<ResponseContext>,
batch: Arc<RwLock<Vec<(WebSocketRequest, SourceRange)>>>,
batch_end: Arc<RwLock<IndexMap<uuid::Uuid, (WebSocketRequest, SourceRange)>>>,
responses: Arc<RwLock<IndexMap<Uuid, WebSocketResponse>>>,
artifact_commands: Arc<RwLock<Vec<ArtifactCommand>>>,
ids_of_async_commands: Arc<RwLock<IndexMap<Uuid, SourceRange>>>,
/// The default planes for the scene.
default_planes: Arc<RwLock<Option<DefaultPlanes>>>,
stats: EngineStats,
}
#[wasm_bindgen]
#[derive(Debug, Clone)]
pub struct ResponseContext {
responses: Arc<RwLock<IndexMap<Uuid, WebSocketResponse>>>,
}
#[wasm_bindgen]
impl ResponseContext {
#[wasm_bindgen(constructor)]
pub fn new() -> Self {
Self {
responses: Arc::new(RwLock::new(IndexMap::new())),
}
}
// Add a response to the context.
pub async fn send_response(&self, data: js_sys::Uint8Array) -> Result<(), JsValue> {
let ws_result: WebSocketResponse = match bson::from_slice(&data.to_vec()) {
Ok(res) => res,
Err(_) => {
// We don't care about the error if we can't parse it.
return Ok(());
}
};
let id = match &ws_result {
WebSocketResponse::Success(res) => res.request_id,
WebSocketResponse::Failure(res) => res.request_id,
};
let Some(id) = id else {
// We only care if we have an id.
return Ok(());
};
// Add this response to our responses.
self.add(id, ws_result.clone()).await;
Ok(())
}
}
impl ResponseContext {
pub async fn add(&self, id: Uuid, response: WebSocketResponse) {
self.responses.write().await.insert(id, response);
}
pub fn responses(&self) -> Arc<RwLock<IndexMap<Uuid, WebSocketResponse>>> {
self.responses.clone()
}
}
// Safety: WebAssembly will only ever run in a single-threaded context.
unsafe impl Send for EngineConnection {}
unsafe impl Sync for EngineConnection {}
impl EngineConnection {
pub async fn new(manager: EngineCommandManager) -> Result<EngineConnection, JsValue> {
pub async fn new(
manager: EngineCommandManager,
response_context: Arc<ResponseContext>,
) -> Result<EngineConnection, JsValue> {
#[allow(clippy::arc_with_non_send_sync)]
Ok(EngineConnection {
manager: Arc::new(manager),
batch: Arc::new(RwLock::new(Vec::new())),
batch_end: Arc::new(RwLock::new(IndexMap::new())),
responses: Arc::new(RwLock::new(IndexMap::new())),
response_context,
artifact_commands: Arc::new(RwLock::new(Vec::new())),
ids_of_async_commands: Arc::new(RwLock::new(IndexMap::new())),
default_planes: Default::default(),
stats: Default::default(),
})
}
async fn do_fire_modeling_cmd(
&self,
id: uuid::Uuid,
source_range: SourceRange,
cmd: WebSocketRequest,
id_to_source_range: HashMap<uuid::Uuid, SourceRange>,
) -> Result<(), KclError> {
let source_range_str = serde_json::to_string(&source_range).map_err(|e| {
KclError::Engine(KclErrorDetails {
message: format!("Failed to serialize source range: {:?}", e),
source_ranges: vec![source_range],
})
})?;
let cmd_str = serde_json::to_string(&cmd).map_err(|e| {
KclError::Engine(KclErrorDetails {
message: format!("Failed to serialize modeling command: {:?}", e),
source_ranges: vec![source_range],
})
})?;
let id_to_source_range_str = serde_json::to_string(&id_to_source_range).map_err(|e| {
KclError::Engine(KclErrorDetails {
message: format!("Failed to serialize id to source range: {:?}", e),
source_ranges: vec![source_range],
})
})?;
self.manager
.fire_modeling_cmd_from_wasm(id.to_string(), source_range_str, cmd_str, id_to_source_range_str)
.map_err(|e| {
KclError::Engine(KclErrorDetails {
message: e.to_string().into(),
source_ranges: vec![source_range],
})
})?;
Ok(())
}
async fn do_send_modeling_cmd(
&self,
id: uuid::Uuid,
@ -151,7 +255,7 @@ impl crate::engine::EngineManager for EngineConnection {
}
fn responses(&self) -> Arc<RwLock<IndexMap<Uuid, WebSocketResponse>>> {
self.responses.clone()
self.response_context.responses.clone()
}
fn stats(&self) -> &EngineStats {
@ -162,6 +266,10 @@ impl crate::engine::EngineManager for EngineConnection {
self.artifact_commands.clone()
}
fn ids_of_async_commands(&self) -> Arc<RwLock<IndexMap<Uuid, SourceRange>>> {
self.ids_of_async_commands.clone()
}
fn get_default_planes(&self) -> Arc<RwLock<Option<DefaultPlanes>>> {
self.default_planes.clone()
}
@ -193,6 +301,19 @@ impl crate::engine::EngineManager for EngineConnection {
Ok(())
}
async fn inner_fire_modeling_cmd(
&self,
id: uuid::Uuid,
source_range: SourceRange,
cmd: WebSocketRequest,
id_to_source_range: HashMap<Uuid, SourceRange>,
) -> Result<(), KclError> {
self.do_fire_modeling_cmd(id, source_range, cmd, id_to_source_range)
.await?;
Ok(())
}
async fn inner_send_modeling_cmd(
&self,
id: uuid::Uuid,
@ -204,9 +325,7 @@ impl crate::engine::EngineManager for EngineConnection {
.do_send_modeling_cmd(id, source_range, cmd, id_to_source_range)
.await?;
let mut responses = self.responses.write().await;
responses.insert(id, ws_result.clone());
drop(responses);
self.response_context.add(id, ws_result.clone()).await;
Ok(ws_result)
}
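On the wasm side, collecting responses moves into a shared ResponseContext: the JS engine manager calls the exported send_response with BSON bytes, Rust decodes them, silently drops anything unparseable or missing a request id, and indexes the rest by that id for the polling loop to find. Below is a stripped-down, native-Rust sketch of that decode-and-index step, with serde_json standing in for the real bson/wasm-bindgen plumbing and the response reduced to two fields:

use std::{collections::HashMap, sync::Arc};

use serde::Deserialize;
use tokio::sync::RwLock;

#[derive(Debug, Clone, Deserialize)]
struct WsResponse {
    request_id: Option<String>,
    success: bool,
}

#[derive(Default, Clone)]
struct ResponseContext {
    responses: Arc<RwLock<HashMap<String, WsResponse>>>,
}

impl ResponseContext {
    /// Called by the host with a serialized response; drops anything
    /// unparseable or missing a request id, as the diff does.
    async fn send_response(&self, data: &[u8]) {
        let Ok(resp) = serde_json::from_slice::<WsResponse>(data) else {
            return; // can't parse it, nothing useful to store
        };
        let Some(id) = resp.request_id.clone() else {
            return; // no id means it can't be matched to a command
        };
        self.responses.write().await.insert(id, resp);
    }
}

#[tokio::main]
async fn main() {
    let ctx = ResponseContext::default();
    ctx.send_response(br#"{"request_id":"abc-123","success":true}"#)
        .await;
    println!("stored: {:?}", ctx.responses.read().await.get("abc-123"));
}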

View File

@ -76,6 +76,9 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
/// Get the artifact commands that have accumulated so far.
fn artifact_commands(&self) -> Arc<RwLock<Vec<ArtifactCommand>>>;
/// Get the ids of the async commands we are waiting for.
fn ids_of_async_commands(&self) -> Arc<RwLock<IndexMap<Uuid, SourceRange>>>;
/// Take the batch of commands that have accumulated so far and clear them.
async fn take_batch(&self) -> Vec<(WebSocketRequest, SourceRange)> {
std::mem::take(&mut *self.batch().write().await)
@ -96,6 +99,11 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
std::mem::take(&mut *self.artifact_commands().write().await)
}
/// Take the ids of async commands that have accumulated so far and clear them.
async fn take_ids_of_async_commands(&self) -> IndexMap<Uuid, SourceRange> {
std::mem::take(&mut *self.ids_of_async_commands().write().await)
}
/// Take the responses that have accumulated so far and clear them.
async fn take_responses(&self) -> IndexMap<Uuid, WebSocketResponse> {
std::mem::take(&mut *self.responses().write().await)
@ -136,8 +144,18 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
async fn clear_queues(&self) {
self.batch().write().await.clear();
self.batch_end().write().await.clear();
self.ids_of_async_commands().write().await.clear();
}
/// Send a modeling command and do not wait for the response message.
async fn inner_fire_modeling_cmd(
&self,
id: uuid::Uuid,
source_range: SourceRange,
cmd: WebSocketRequest,
id_to_source_range: HashMap<Uuid, SourceRange>,
) -> Result<(), crate::errors::KclError>;
/// Send a modeling command and wait for the response message.
async fn inner_send_modeling_cmd(
&self,
@ -180,6 +198,68 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
Ok(())
}
/// Ensure a specific async command has been completed.
async fn ensure_async_command_completed(
&self,
id: uuid::Uuid,
source_range: Option<SourceRange>,
) -> Result<OkWebSocketResponseData, KclError> {
let source_range = if let Some(source_range) = source_range {
source_range
} else {
// Look it up if we don't have it.
self.ids_of_async_commands()
.read()
.await
.get(&id)
.cloned()
.unwrap_or_default()
};
let current_time = instant::Instant::now();
while current_time.elapsed().as_secs() < 60 {
let responses = self.responses().read().await.clone();
let Some(resp) = responses.get(&id) else {
// Sleep for a little so we don't hog the CPU.
// No seriously WE DO NOT WANT TO PAUSE THE WHOLE APP ON THE JS SIDE.
let duration = instant::Duration::from_millis(100);
#[cfg(target_arch = "wasm32")]
wasm_timer::Delay::new(duration).await.map_err(|err| {
KclError::Internal(KclErrorDetails {
message: format!("Failed to sleep: {:?}", err),
source_ranges: vec![source_range],
})
})?;
#[cfg(not(target_arch = "wasm32"))]
tokio::time::sleep(duration).await;
continue;
};
// If the response is an error, parsing turns it into a KclError and `?`
// returns it; otherwise hand back the parsed response.
let response = self.parse_websocket_response(resp.clone(), source_range)?;
return Ok(response);
}
Err(KclError::Engine(KclErrorDetails {
message: "async command timed out".to_string(),
source_ranges: vec![source_range],
}))
}
/// Ensure ALL async commands have been completed.
async fn ensure_async_commands_completed(&self) -> Result<(), KclError> {
// Check if all async commands have been completed.
let ids = self.take_ids_of_async_commands().await;
// Try to get them from the responses.
for (id, source_range) in ids {
self.ensure_async_command_completed(id, Some(source_range)).await?;
}
Ok(())
}
/// Set the visibility of edges.
async fn set_edge_visibility(
&self,
@ -342,6 +422,36 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
self.run_batch(requests, source_range).await
}
/// Send the modeling cmd async and don't wait for the response.
/// Add it to our list of async commands.
async fn async_modeling_cmd(
&self,
id: uuid::Uuid,
source_range: SourceRange,
cmd: &ModelingCmd,
) -> Result<(), crate::errors::KclError> {
// Add the command ID to the list of async commands.
self.ids_of_async_commands().write().await.insert(id, source_range);
// Add to artifact commands.
self.handle_artifact_command(cmd, id.into(), &HashMap::from([(id, source_range)]))
.await?;
// Fire off the command now, but don't wait for the response, we don't care about it.
self.inner_fire_modeling_cmd(
id,
source_range,
WebSocketRequest::ModelingCmdReq(ModelingCmdReq {
cmd: cmd.clone(),
cmd_id: id.into(),
}),
HashMap::from([(id, source_range)]),
)
.await?;
Ok(())
}
/// Run the batch for the specific commands.
async fn run_batch(
&self,
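One detail behind the "cross platform time" fixup: the wait loop in ensure_async_command_completed has to sleep without stalling the browser, so wasm builds go through wasm_timer::Delay while native builds use tokio::time::sleep. A sketch of that split pulled out into a helper; the diff inlines it, and the function name here is made up.

use std::time::Duration;

/// Sleep without blocking the host: wasm builds go through wasm_timer,
/// native builds through tokio. (Illustrative helper, not from the diff.)
async fn sleep_compat(duration: Duration) -> Result<(), String> {
    #[cfg(target_arch = "wasm32")]
    {
        wasm_timer::Delay::new(duration)
            .await
            .map_err(|err| format!("failed to sleep: {err:?}"))?;
    }
    #[cfg(not(target_arch = "wasm32"))]
    {
        tokio::time::sleep(duration).await;
    }
    Ok(())
}

#[cfg(not(target_arch = "wasm32"))]
#[tokio::main]
async fn main() {
    sleep_compat(Duration::from_millis(100)).await.unwrap();
    println!("slept without blocking the runtime");
}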