Parallelize the artifact graph only time suck (#6482)

* parallelize the artifact only time suck

Signed-off-by: Jess Frazelle <github@jessfraz.com>

updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

make wasm safe

Signed-off-by: Jess Frazelle <github@jessfraz.com>

updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* artifact graph things

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

---------

Signed-off-by: Jess Frazelle <github@jessfraz.com>
This commit is contained in:
Jess Frazelle
2025-04-26 21:21:26 -07:00
committed by GitHub
parent d0b0365f75
commit 24465cf463
253 changed files with 73547 additions and 55353 deletions

View File

@ -18,11 +18,12 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use tokio_tungstenite::tungstenite::Message as WsMsg;
use uuid::Uuid;
use super::EngineStats;
#[cfg(feature = "artifact-graph")]
use crate::execution::ArtifactCommand;
use crate::{
engine::EngineManager,
engine::{AsyncTasks, EngineManager, EngineStats},
errors::{KclError, KclErrorDetails},
execution::{ArtifactCommand, DefaultPlanes, IdGenerator},
execution::{DefaultPlanes, IdGenerator},
SourceRange,
};
@ -37,13 +38,14 @@ type WebSocketTcpWrite = futures::stream::SplitSink<tokio_tungstenite::WebSocket
pub struct EngineConnection {
engine_req_tx: mpsc::Sender<ToEngineReq>,
shutdown_tx: mpsc::Sender<()>,
responses: Arc<RwLock<IndexMap<uuid::Uuid, WebSocketResponse>>>,
responses: ResponseInformation,
pending_errors: Arc<RwLock<Vec<String>>>,
#[allow(dead_code)]
tcp_read_handle: Arc<TcpReadHandle>,
socket_health: Arc<RwLock<SocketHealth>>,
batch: Arc<RwLock<Vec<(WebSocketRequest, SourceRange)>>>,
batch_end: Arc<RwLock<IndexMap<uuid::Uuid, (WebSocketRequest, SourceRange)>>>,
#[cfg(feature = "artifact-graph")]
artifact_commands: Arc<RwLock<Vec<ArtifactCommand>>>,
ids_of_async_commands: Arc<RwLock<IndexMap<Uuid, SourceRange>>>,
@ -53,6 +55,8 @@ pub struct EngineConnection {
session_data: Arc<RwLock<Option<ModelingSessionData>>>,
stats: EngineStats,
async_tasks: AsyncTasks,
}
pub struct TcpRead {
@ -116,12 +120,14 @@ impl Drop for TcpReadHandle {
}
}
struct ResponsesInformation {
/// Information about the responses from the engine.
#[derive(Clone, Debug)]
struct ResponseInformation {
/// The responses from the engine.
responses: Arc<RwLock<IndexMap<uuid::Uuid, WebSocketResponse>>>,
}
impl ResponsesInformation {
impl ResponseInformation {
pub async fn add(&self, id: Uuid, response: WebSocketResponse) {
self.responses.write().await.insert(id, response);
}
@ -238,14 +244,14 @@ impl EngineConnection {
let session_data: Arc<RwLock<Option<ModelingSessionData>>> = Arc::new(RwLock::new(None));
let session_data2 = session_data.clone();
let responses: Arc<RwLock<IndexMap<uuid::Uuid, WebSocketResponse>>> = Arc::new(RwLock::new(IndexMap::new()));
let ids_of_async_commands: Arc<RwLock<IndexMap<Uuid, SourceRange>>> = Arc::new(RwLock::new(IndexMap::new()));
let socket_health = Arc::new(RwLock::new(SocketHealth::Active));
let pending_errors = Arc::new(RwLock::new(Vec::new()));
let pending_errors_clone = pending_errors.clone();
let responses_information = ResponsesInformation {
responses: responses.clone(),
let response_information = ResponseInformation {
responses: Arc::new(RwLock::new(IndexMap::new())),
};
let response_information_cloned = response_information.clone();
let socket_health_tcp_read = socket_health.clone();
let tcp_read_handle = tokio::spawn(async move {
@ -270,7 +276,7 @@ impl EngineConnection {
BatchResponse::Success { response } => {
// If the id is in our ids of async commands, remove
// it.
responses_information
response_information_cloned
.add(
id,
WebSocketResponse::Success(SuccessWebSocketResponse {
@ -284,7 +290,7 @@ impl EngineConnection {
.await;
}
BatchResponse::Failure { errors } => {
responses_information
response_information_cloned
.add(
id,
WebSocketResponse::Failure(FailureWebSocketResponse {
@ -311,7 +317,7 @@ impl EngineConnection {
errors,
}) => {
if let Some(id) = request_id {
responses_information
response_information_cloned
.add(
*id,
WebSocketResponse::Failure(FailureWebSocketResponse {
@ -336,7 +342,7 @@ impl EngineConnection {
}
if let Some(id) = id {
responses_information.add(id, ws_resp.clone()).await;
response_information_cloned.add(id, ws_resp.clone()).await;
}
}
Err(e) => {
@ -357,16 +363,18 @@ impl EngineConnection {
tcp_read_handle: Arc::new(TcpReadHandle {
handle: Arc::new(tcp_read_handle),
}),
responses,
responses: response_information,
pending_errors,
socket_health,
batch: Arc::new(RwLock::new(Vec::new())),
batch_end: Arc::new(RwLock::new(IndexMap::new())),
#[cfg(feature = "artifact-graph")]
artifact_commands: Arc::new(RwLock::new(Vec::new())),
ids_of_async_commands,
default_planes: Default::default(),
session_data,
stats: Default::default(),
async_tasks: AsyncTasks::new(),
})
}
}
@ -382,9 +390,10 @@ impl EngineManager for EngineConnection {
}
fn responses(&self) -> Arc<RwLock<IndexMap<Uuid, WebSocketResponse>>> {
self.responses.clone()
self.responses.responses.clone()
}
#[cfg(feature = "artifact-graph")]
fn artifact_commands(&self) -> Arc<RwLock<Vec<ArtifactCommand>>> {
self.artifact_commands.clone()
}
@ -393,6 +402,10 @@ impl EngineManager for EngineConnection {
self.ids_of_async_commands.clone()
}
fn async_tasks(&self) -> AsyncTasks {
self.async_tasks.clone()
}
fn stats(&self) -> &EngineStats {
&self.stats
}
@ -483,9 +496,19 @@ impl EngineManager for EngineConnection {
}));
}
}
// We pop off the responses to cleanup our mappings.
if let Some(resp) = self.responses.write().await.shift_remove(&id) {
return Ok(resp);
#[cfg(feature = "artifact-graph")]
{
// We cannot pop here or it will break the artifact graph.
if let Some(resp) = self.responses.responses.read().await.get(&id) {
return Ok(resp.clone());
}
}
#[cfg(not(feature = "artifact-graph"))]
{
if let Some(resp) = self.responses.responses.write().await.shift_remove(&id) {
return Ok(resp);
}
}
}