chip away more

This commit is contained in:
Paul Tagliamonte
2025-04-01 13:35:50 -04:00
parent 0ac9ac3896
commit 275c23f294
4 changed files with 89 additions and 61 deletions

View File

@ -29,7 +29,7 @@ use crate::{
args::{Arg, KwArgs}, args::{Arg, KwArgs},
FunctionKind, FunctionKind,
}, },
CompilationError, ExecutorSettings, CompilationError,
}; };
enum StatementKind<'a> { enum StatementKind<'a> {
@ -2227,9 +2227,10 @@ mod test {
use crate::{ use crate::{
execution::{memory::Stack, parse_execute, ContextType}, execution::{memory::Stack, parse_execute, ContextType},
parsing::ast::types::{DefaultParamVal, Identifier, Parameter}, parsing::ast::types::{DefaultParamVal, Identifier, Parameter},
ExecutorSettings,
}; };
use std::sync::Arc; use std::sync::Arc;
use tokio::{io::AsyncWriteExt, task::JoinSet}; use tokio::io::AsyncWriteExt;
#[tokio::test(flavor = "multi_thread")] #[tokio::test(flavor = "multi_thread")]
async fn test_assign_args_to_params() { async fn test_assign_args_to_params() {
@ -2449,35 +2450,40 @@ a = foo()
let programa_kcl = r#" let programa_kcl = r#"
export a = 1 export a = 1
"#; "#;
let programa = crate::parsing::parse_str(&programa_kcl, ModuleId::default())
.parse_errs_as_err()
.unwrap();
universe.insert("a.kcl".to_owned(), programa);
// program b.kcl // program b.kcl
let programb_kcl = r#" let programb_kcl = r#"
import a from 'a.kcl' import a from 'a.kcl'
export b = a + 1 export b = a + 1
"#; "#;
let programb = crate::parsing::parse_str(&programb_kcl, ModuleId::default())
.parse_errs_as_err()
.unwrap();
universe.insert("b.kcl".to_owned(), programb);
// program c.kcl // program c.kcl
let programc_kcl = r#" let programc_kcl = r#"
import a from 'a.kcl' import a from 'a.kcl'
export c = a + 2 export c = a + 2
"#; "#;
let programc = crate::parsing::parse_str(&programc_kcl, ModuleId::default())
// program main.kcl
let main_kcl = r#"
import b from 'b.kcl'
import c from 'c.kcl'
d = b + c
"#;
let main = crate::parsing::parse_str(&main_kcl, ModuleId::default())
.parse_errs_as_err() .parse_errs_as_err()
.unwrap(); .unwrap();
universe.insert("c.kcl".to_owned(), programc);
let tmpdir = tempdir::TempDir::new("zma_kcl_load_all_modules").unwrap(); let tmpdir = tempdir::TempDir::new("zma_kcl_load_all_modules").unwrap();
tokio::fs::File::create(tmpdir.path().join("main.kcl"))
.await
.unwrap()
.write_all(main_kcl.as_bytes())
.await
.unwrap();
tokio::fs::File::create(tmpdir.path().join("a.kcl")) tokio::fs::File::create(tmpdir.path().join("a.kcl"))
.await .await
.unwrap() .unwrap()
@ -2499,9 +2505,6 @@ export c = a + 2
.await .await
.unwrap(); .unwrap();
// ok we have all the "files" loaded, let's do a sort and concurrent
// run here.
let exec_ctxt = ExecutorContext { let exec_ctxt = ExecutorContext {
engine: Arc::new(Box::new( engine: Arc::new(Box::new(
crate::engine::conn_mock::EngineConnection::new() crate::engine::conn_mock::EngineConnection::new()
@ -2524,49 +2527,15 @@ export c = a + 2
}; };
let mut exec_state = ExecState::new(&exec_ctxt); let mut exec_state = ExecState::new(&exec_ctxt);
exec_ctxt.prepare_mem(&mut exec_state).await.unwrap(); exec_ctxt
.run_concurrent(
for modules in crate::walk::import_graph(&universe).unwrap().into_iter() { &crate::Program {
let mut set = JoinSet::new(); ast: main.clone(),
let (results_send, mut results_recv) = tokio::sync::mpsc::channel(1); original_file_contents: "".to_owned(),
},
for module in modules { &mut exec_state,
let program = universe.get(&module).unwrap().clone(); )
let module = module.clone(); .await
let mut exec_state = exec_state.clone(); .unwrap();
let exec_ctxt = exec_ctxt.clone();
let results_send = results_send.clone();
set.spawn(async move {
let module = module;
let mut exec_state = exec_state;
let exec_ctxt = exec_ctxt;
let program = program;
let results_send = results_send;
let result = exec_ctxt
.run(
&crate::Program {
ast: program.clone(),
original_file_contents: "".to_owned(),
},
&mut exec_state,
)
.await
.unwrap();
results_send.send((module, result)).await.unwrap();
});
}
drop(results_send);
eprintln!("Collecting results");
while let Some((module_name, module_result)) = results_recv.recv().await {
eprintln!("Got result for {}", module_name);
eprintln!("{:?}", module_result);
}
set.join_all().await;
}
} }
} }

View File

@ -27,6 +27,7 @@ pub use memory::EnvironmentRef;
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub use state::{ExecState, MetaSettings}; pub use state::{ExecState, MetaSettings};
use tokio::task::JoinSet;
use crate::{ use crate::{
engine::EngineManager, engine::EngineManager,
@ -721,6 +722,58 @@ impl ExecutorContext {
self.inner_run(program, exec_state, false).await self.inner_run(program, exec_state, false).await
} }
/// Perform the execution of a program using an (experimental!) concurrent
/// execution model. This has the same signature as [Self::run].
///
/// For now -- do not use this unless you're willing to accept some
/// breakage.
///
/// You can optionally pass in some initialization memory for partial
/// execution.
///
/// To access non-fatal errors and warnings, extract them from the `ExecState`.
pub async fn run_concurrent(
&self,
program: &crate::Program,
exec_state: &mut ExecState,
) -> Result<(EnvironmentRef, Option<ModelingSessionData>), KclErrorWithOutputs> {
self.prepare_mem(exec_state).await.unwrap();
let universe = std::collections::HashMap::new(); // crate::walk::import_universe(&self).await.unwrap();
for modules in crate::walk::import_graph(&universe).unwrap().into_iter() {
let mut set = JoinSet::new();
for module in modules {
let program = universe.get(&module).unwrap().clone();
let module = module.clone();
let mut exec_state = exec_state.clone();
let exec_ctxt = self.clone();
set.spawn(async move {
let module = module;
let mut exec_state = exec_state;
let exec_ctxt = exec_ctxt;
let program = program;
exec_ctxt
.run(
&crate::Program {
ast: program.clone(),
original_file_contents: "".to_owned(),
},
&mut exec_state,
)
.await
});
}
set.join_all().await;
}
self.run(&program, exec_state).await
}
/// Perform the execution of a program. Accept all possible parameters and /// Perform the execution of a program. Accept all possible parameters and
/// output everything. /// output everything.
async fn inner_run( async fn inner_run(

View File

@ -8,6 +8,7 @@ use anyhow::Result;
use crate::{ use crate::{
parsing::ast::types::{ImportPath, Node as AstNode, NodeRef, Program}, parsing::ast::types::{ImportPath, Node as AstNode, NodeRef, Program},
walk::{Node, Visitable}, walk::{Node, Visitable},
ExecutorContext,
}; };
/// Specific dependency between two modules. The 0th element of this tuple /// Specific dependency between two modules. The 0th element of this tuple
@ -125,6 +126,10 @@ pub(crate) fn import_dependencies(prog: NodeRef<Program>) -> Result<Vec<String>>
Ok(ret) Ok(ret)
} }
pub(crate) async fn import_universe(ctx: &ExecutorContext) -> Result<HashMap<String, Program>> {
panic!("ASF");
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@ -9,3 +9,4 @@ pub use ast_node::Node;
pub use ast_visitor::Visitable; pub use ast_visitor::Visitable;
pub use ast_walk::walk; pub use ast_walk::walk;
pub use import_graph::import_graph; pub use import_graph::import_graph;
pub(crate) use import_graph::import_universe;