diff --git a/rust/kcl-lib/src/execution/exec_ast.rs b/rust/kcl-lib/src/execution/exec_ast.rs index 3b5e7d6cd..2ed45f842 100644 --- a/rust/kcl-lib/src/execution/exec_ast.rs +++ b/rust/kcl-lib/src/execution/exec_ast.rs @@ -29,7 +29,7 @@ use crate::{ args::{Arg, KwArgs}, FunctionKind, }, - CompilationError, ExecutorSettings, + CompilationError, }; enum StatementKind<'a> { @@ -2227,9 +2227,10 @@ mod test { use crate::{ execution::{memory::Stack, parse_execute, ContextType}, parsing::ast::types::{DefaultParamVal, Identifier, Parameter}, + ExecutorSettings, }; use std::sync::Arc; - use tokio::{io::AsyncWriteExt, task::JoinSet}; + use tokio::io::AsyncWriteExt; #[tokio::test(flavor = "multi_thread")] async fn test_assign_args_to_params() { @@ -2449,35 +2450,40 @@ a = foo() let programa_kcl = r#" export a = 1 "#; - let programa = crate::parsing::parse_str(&programa_kcl, ModuleId::default()) - .parse_errs_as_err() - .unwrap(); - universe.insert("a.kcl".to_owned(), programa); - // program b.kcl let programb_kcl = r#" import a from 'a.kcl' export b = a + 1 "#; - let programb = crate::parsing::parse_str(&programb_kcl, ModuleId::default()) - .parse_errs_as_err() - .unwrap(); - universe.insert("b.kcl".to_owned(), programb); - // program c.kcl let programc_kcl = r#" import a from 'a.kcl' export c = a + 2 "#; - let programc = crate::parsing::parse_str(&programc_kcl, ModuleId::default()) + + // program main.kcl + let main_kcl = r#" +import b from 'b.kcl' +import c from 'c.kcl' + +d = b + c +"#; + + let main = crate::parsing::parse_str(&main_kcl, ModuleId::default()) .parse_errs_as_err() .unwrap(); - universe.insert("c.kcl".to_owned(), programc); let tmpdir = tempdir::TempDir::new("zma_kcl_load_all_modules").unwrap(); + tokio::fs::File::create(tmpdir.path().join("main.kcl")) + .await + .unwrap() + .write_all(main_kcl.as_bytes()) + .await + .unwrap(); + tokio::fs::File::create(tmpdir.path().join("a.kcl")) .await .unwrap() @@ -2499,9 +2505,6 @@ export c = a + 2 .await .unwrap(); - // ok we have all the "files" loaded, let's do a sort and concurrent - // run here. - let exec_ctxt = ExecutorContext { engine: Arc::new(Box::new( crate::engine::conn_mock::EngineConnection::new() @@ -2524,49 +2527,15 @@ export c = a + 2 }; let mut exec_state = ExecState::new(&exec_ctxt); - exec_ctxt.prepare_mem(&mut exec_state).await.unwrap(); - - for modules in crate::walk::import_graph(&universe).unwrap().into_iter() { - let mut set = JoinSet::new(); - let (results_send, mut results_recv) = tokio::sync::mpsc::channel(1); - - for module in modules { - let program = universe.get(&module).unwrap().clone(); - let module = module.clone(); - let mut exec_state = exec_state.clone(); - let exec_ctxt = exec_ctxt.clone(); - let results_send = results_send.clone(); - - set.spawn(async move { - let module = module; - let mut exec_state = exec_state; - let exec_ctxt = exec_ctxt; - let program = program; - let results_send = results_send; - - let result = exec_ctxt - .run( - &crate::Program { - ast: program.clone(), - original_file_contents: "".to_owned(), - }, - &mut exec_state, - ) - .await - .unwrap(); - - results_send.send((module, result)).await.unwrap(); - }); - } - drop(results_send); - - eprintln!("Collecting results"); - while let Some((module_name, module_result)) = results_recv.recv().await { - eprintln!("Got result for {}", module_name); - eprintln!("{:?}", module_result); - } - - set.join_all().await; - } + exec_ctxt + .run_concurrent( + &crate::Program { + ast: main.clone(), + original_file_contents: "".to_owned(), + }, + &mut exec_state, + ) + .await + .unwrap(); } } diff --git a/rust/kcl-lib/src/execution/mod.rs b/rust/kcl-lib/src/execution/mod.rs index 812e0a8c2..bd8a11b8c 100644 --- a/rust/kcl-lib/src/execution/mod.rs +++ b/rust/kcl-lib/src/execution/mod.rs @@ -27,6 +27,7 @@ pub use memory::EnvironmentRef; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; pub use state::{ExecState, MetaSettings}; +use tokio::task::JoinSet; use crate::{ engine::EngineManager, @@ -721,6 +722,58 @@ impl ExecutorContext { self.inner_run(program, exec_state, false).await } + /// Perform the execution of a program using an (experimental!) concurrent + /// execution model. This has the same signature as [Self::run]. + /// + /// For now -- do not use this unless you're willing to accept some + /// breakage. + /// + /// You can optionally pass in some initialization memory for partial + /// execution. + /// + /// To access non-fatal errors and warnings, extract them from the `ExecState`. + pub async fn run_concurrent( + &self, + program: &crate::Program, + exec_state: &mut ExecState, + ) -> Result<(EnvironmentRef, Option), KclErrorWithOutputs> { + self.prepare_mem(exec_state).await.unwrap(); + + let universe = std::collections::HashMap::new(); // crate::walk::import_universe(&self).await.unwrap(); + + for modules in crate::walk::import_graph(&universe).unwrap().into_iter() { + let mut set = JoinSet::new(); + + for module in modules { + let program = universe.get(&module).unwrap().clone(); + let module = module.clone(); + let mut exec_state = exec_state.clone(); + let exec_ctxt = self.clone(); + + set.spawn(async move { + let module = module; + let mut exec_state = exec_state; + let exec_ctxt = exec_ctxt; + let program = program; + + exec_ctxt + .run( + &crate::Program { + ast: program.clone(), + original_file_contents: "".to_owned(), + }, + &mut exec_state, + ) + .await + }); + } + + set.join_all().await; + } + + self.run(&program, exec_state).await + } + /// Perform the execution of a program. Accept all possible parameters and /// output everything. async fn inner_run( diff --git a/rust/kcl-lib/src/walk/import_graph.rs b/rust/kcl-lib/src/walk/import_graph.rs index 22418ad63..a45d5c61d 100644 --- a/rust/kcl-lib/src/walk/import_graph.rs +++ b/rust/kcl-lib/src/walk/import_graph.rs @@ -8,6 +8,7 @@ use anyhow::Result; use crate::{ parsing::ast::types::{ImportPath, Node as AstNode, NodeRef, Program}, walk::{Node, Visitable}, + ExecutorContext, }; /// Specific dependency between two modules. The 0th element of this tuple @@ -125,6 +126,10 @@ pub(crate) fn import_dependencies(prog: NodeRef) -> Result> Ok(ret) } +pub(crate) async fn import_universe(ctx: &ExecutorContext) -> Result> { + panic!("ASF"); +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/kcl-lib/src/walk/mod.rs b/rust/kcl-lib/src/walk/mod.rs index 5e04ca66d..99110eb93 100644 --- a/rust/kcl-lib/src/walk/mod.rs +++ b/rust/kcl-lib/src/walk/mod.rs @@ -9,3 +9,4 @@ pub use ast_node::Node; pub use ast_visitor::Visitable; pub use ast_walk::walk; pub use import_graph::import_graph; +pub(crate) use import_graph::import_universe;