Signed-off-by: Jess Frazelle <github@jessfraz.com>
This commit is contained in:
Jess Frazelle
2025-04-05 14:16:27 -07:00
parent e943303434
commit cb2368dc17
4 changed files with 210 additions and 143 deletions

View File

@ -31,14 +31,14 @@ use tokio::task::JoinSet;
use crate::{
engine::EngineManager,
errors::KclError,
errors::{KclError, KclErrorDetails},
execution::{
artifact::build_artifact_graph,
cache::{CacheInformation, CacheResult},
types::{UnitAngle, UnitLen},
},
fs::FileManager,
modules::{ModuleId, ModulePath},
modules::{ModuleId, ModulePath, ModuleRepr},
parsing::ast::types::{Expr, ImportPath, NodeRef},
source_range::SourceRange,
std::StdLib,
@ -671,7 +671,7 @@ impl ExecutorContext {
(program, exec_state, false)
};
let result = self.inner_run(&program, &mut exec_state, preserve_mem).await;
let result = self.run_concurrent(&program, &mut exec_state, preserve_mem).await;
if result.is_err() {
cache::bust_cache().await;
@ -704,7 +704,7 @@ impl ExecutorContext {
program: &crate::Program,
exec_state: &mut ExecState,
) -> Result<(EnvironmentRef, Option<ModelingSessionData>), KclErrorWithOutputs> {
self.run_concurrent(program, exec_state).await
self.run_concurrent(program, exec_state, false).await
}
/// Perform the execution of a program using an (experimental!) concurrent
@ -721,46 +721,112 @@ impl ExecutorContext {
&self,
program: &crate::Program,
exec_state: &mut ExecState,
preserve_mem: bool,
) -> Result<(EnvironmentRef, Option<ModelingSessionData>), KclErrorWithOutputs> {
self.prepare_mem(exec_state).await.unwrap();
let mut universe = std::collections::HashMap::new();
crate::walk::import_universe(self, &program.ast, &mut universe)
crate::walk::import_universe(self, &program.ast, &mut universe, exec_state)
.await
.unwrap();
.map_err(KclErrorWithOutputs::no_outputs)?;
for modules in crate::walk::import_graph(&universe).unwrap().into_iter() {
for modules in crate::walk::import_graph(&universe, self)
.map_err(KclErrorWithOutputs::no_outputs)?
.into_iter()
{
#[cfg(not(target_arch = "wasm32"))]
let mut set = JoinSet::new();
#[allow(clippy::type_complexity)]
let (results_tx, mut results_rx): (
tokio::sync::mpsc::Sender<(
ModuleId,
ModulePath,
Result<(Option<KclValue>, EnvironmentRef, Vec<String>), KclError>,
)>,
tokio::sync::mpsc::Receiver<_>,
) = tokio::sync::mpsc::channel(1);
for module in modules {
let program = universe.get(&module).unwrap().clone();
let Some((module_id, module_path, program)) = universe.get(&module) else {
return Err(KclErrorWithOutputs::no_outputs(KclError::Internal(KclErrorDetails {
message: format!("Module {module} not found in universe"),
source_ranges: Default::default(),
})));
};
let module_id = *module_id;
let module_path = module_path.clone();
let program = program.clone();
let exec_state = exec_state.clone();
let exec_ctxt = self.clone();
let results_tx = results_tx.clone();
set.spawn(async move {
println!("Running module {module} from run_concurrent");
let mut exec_state = exec_state;
let exec_ctxt = exec_ctxt;
let program = program;
#[cfg(target_arch = "wasm32")]
{
wasm_bindgen_futures::spawn_local(async move {
//set.spawn(async move {
println!("Running module {module} from run_concurrent");
let mut exec_state = exec_state;
let exec_ctxt = exec_ctxt;
exec_ctxt
.inner_run(
&crate::Program {
ast: program.clone(),
original_file_contents: "".to_owned(),
},
&mut exec_state,
false,
)
.await
});
let result = exec_ctxt
.exec_module_from_ast(
&program,
module_id,
&module_path,
&mut exec_state,
Default::default(),
)
.await;
results_tx.send((module_id, module_path, result)).await.unwrap();
});
}
#[cfg(not(target_arch = "wasm32"))]
{
set.spawn(async move {
println!("Running module {module} from run_concurrent");
let mut exec_state = exec_state;
let exec_ctxt = exec_ctxt;
let result = exec_ctxt
.exec_module_from_ast(
&program,
module_id,
&module_path,
&mut exec_state,
Default::default(),
)
.await;
results_tx.send((module_id, module_path, result)).await.unwrap();
});
}
}
set.join_all().await;
drop(results_tx);
while let Some((module_id, _, result)) = results_rx.recv().await {
match result {
Ok((_, session_data, variables)) => {
let mut repr = exec_state.global.module_infos[&module_id].take_repr();
let ModuleRepr::Kcl(_, cache) = &mut repr else {
continue;
};
*cache = Some((session_data, variables.clone()));
exec_state.global.module_infos[&module_id].restore_repr(repr);
}
Err(e) => {
return Err(KclErrorWithOutputs::no_outputs(e));
}
}
}
}
self.inner_run(program, exec_state, false).await
self.inner_run(program, exec_state, preserve_mem).await
}
/// Perform the execution of a program. Accept all possible parameters and
@ -811,11 +877,11 @@ impl ExecutorContext {
)
})?;
/* if !self.is_mock() {
if !self.is_mock() {
let mut mem = exec_state.stack().deep_clone();
mem.restore_env(env_ref);
cache::write_old_memory((mem, exec_state.global.module_infos.clone())).await;
}*/
}
let session_data = self.engine.get_session_data().await;
Ok((env_ref, session_data))
}