parallelized modules from kcl (#6206)

* wip

* sketch a bit more; going to pull this out of tests next

* wip

* lock start things

* this was a bad idea

* Revert "this was a bad idea"

This reverts commit a2092e7ed6.

* prepare prelude before spawning

* error

* poop

* yike

* :(

* ok

* Reapply "this was a bad idea"

This reverts commit fafdf41093.

* chip away more

* man this is bad

* fix rebase add feature flag

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* get rid of execution kind

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* clippy

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* logs

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* no extra executes

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* race w batch

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* cluppy

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* no printlns

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* no printlns

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fix source ranges

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* batch shit

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fixes

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fix

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fix some bugs

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* fix error

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* cut 1

* preserve mem

* re-ad deep_clone

the helper we were calling was pushing a new call, which was hanging
out. we can skip the middleman since we already have something properly
prepared, just without a stdlib in some cases.

* skip non-kcl

* clean up source range bug

* error message changed

the uuids also changed because the error is hit before execute even
starts.

* typo

* rensnapshot a few

* order things

* MAYBE REVERT LATER:

attempt at an ordering

* snapsnap

* Revert "snapsnap"

This reverts commit 7350b32c7d.

* Revert "MAYBE REVERT LATER:"

This reverts commit ab49f3e85f.

* ugh

* poop

* poop2

* lint

* tranche 1

* more

* more snaps

* snap

* more

* update

* MAYBE REVERT THIS

* cache multi-file

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* addd tests

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* set to false

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* add test outputs

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* clippy

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* kcl-py-bindings uses carwheel

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* update snapshots

Signed-off-by: Jess Frazelle <github@jessfraz.com>

* updates

Signed-off-by: Jess Frazelle <github@jessfraz.com>

---------

Signed-off-by: Jess Frazelle <github@jessfraz.com>
Co-authored-by: Paul R. Tagliamonte <paul@zoo.dev>
Co-authored-by: Paul Tagliamonte <paultag@gmail.com>
This commit is contained in:
Jess Frazelle
2025-04-16 11:52:14 -07:00
committed by GitHub
parent 5586646a60
commit d9fe78171f
195 changed files with 15585 additions and 8887 deletions

View File

@ -9,7 +9,6 @@ use super::{
types::{PrimitiveType, CHECK_NUMERIC_TYPES},
};
use crate::{
engine::ExecutionKind,
errors::{KclError, KclErrorDetails},
execution::{
annotations,
@ -66,15 +65,13 @@ impl ExecutorContext {
exec_state.mod_local.explicit_length_units = true;
}
let new_units = exec_state.length_unit();
if !self.engine.execution_kind().await.is_isolated() {
self.engine
.set_units(
new_units.into(),
annotation.as_source_range(),
exec_state.id_generator(),
)
.await?;
}
self.engine
.set_units(
new_units.into(),
annotation.as_source_range(),
exec_state.id_generator(),
)
.await?;
} else {
exec_state.err(CompilationError::err(
annotation.as_source_range(),
@ -104,15 +101,11 @@ impl ExecutorContext {
&self,
program: &Node<Program>,
exec_state: &mut ExecState,
exec_kind: ExecutionKind,
preserve_mem: bool,
module_id: ModuleId,
path: &ModulePath,
) -> Result<(Option<KclValue>, EnvironmentRef, Vec<String>), KclError> {
crate::log::log(format!("enter module {path} {} {exec_kind:?}", exec_state.stack()));
let old_units = exec_state.length_unit();
let original_execution = self.engine.replace_execution_kind(exec_kind).await;
crate::log::log(format!("enter module {path} {}", exec_state.stack()));
let mut local_state = ModuleState::new(path.std_path(), exec_state.stack().memory.clone(), Some(module_id));
if !preserve_mem {
@ -131,7 +124,6 @@ impl ExecutorContext {
.exec_block(program, exec_state, crate::execution::BodyType::Root)
.await;
let new_units = exec_state.length_unit();
let env_ref = if preserve_mem {
exec_state.mut_stack().pop_and_preserve_env()
} else {
@ -141,17 +133,6 @@ impl ExecutorContext {
std::mem::swap(&mut exec_state.mod_local, &mut local_state);
}
// We only need to reset the units if we are not on the Main path.
// If we reset at the end of the main path, then we just add on an extra
// command and we'd need to flush the batch again.
// This avoids that.
if !exec_kind.is_isolated() && new_units != old_units && *path != ModulePath::Main {
self.engine
.set_units(old_units.into(), Default::default(), exec_state.id_generator())
.await?;
}
self.engine.replace_execution_kind(original_execution).await;
crate::log::log(format!("leave {path}"));
result.map(|result| (result, env_ref, local_state.module_exports))
@ -161,7 +142,7 @@ impl ExecutorContext {
#[async_recursion]
pub(super) async fn exec_block<'a>(
&'a self,
program: NodeRef<'a, crate::parsing::ast::types::Program>,
program: NodeRef<'a, Program>,
exec_state: &mut ExecState,
body_type: BodyType,
) -> Result<Option<KclValue>, KclError> {
@ -185,9 +166,8 @@ impl ExecutorContext {
match &import_stmt.selector {
ImportSelector::List { items } => {
let (env_ref, module_exports) = self
.exec_module_for_items(module_id, exec_state, ExecutionKind::Isolated, source_range)
.await?;
let (env_ref, module_exports) =
self.exec_module_for_items(module_id, exec_state, source_range).await?;
for import_item in items {
// Extract the item from the module.
let item = exec_state
@ -228,9 +208,8 @@ impl ExecutorContext {
}
}
ImportSelector::Glob(_) => {
let (env_ref, module_exports) = self
.exec_module_for_items(module_id, exec_state, ExecutionKind::Isolated, source_range)
.await?;
let (env_ref, module_exports) =
self.exec_module_for_items(module_id, exec_state, source_range).await?;
for name in module_exports.iter() {
let item = exec_state
.stack()
@ -421,7 +400,7 @@ impl ExecutorContext {
Ok(last_expr)
}
pub(super) async fn open_module(
pub async fn open_module(
&self,
path: &ImportPath,
attrs: &[Node<Annotation>],
@ -429,6 +408,7 @@ impl ExecutorContext {
source_range: SourceRange,
) -> Result<ModuleId, KclError> {
let resolved_path = ModulePath::from_import_path(path, &self.settings.project_directory);
match path {
ImportPath::Kcl { .. } => {
exec_state.global.mod_loader.cycle_check(&resolved_path, source_range)?;
@ -485,7 +465,6 @@ impl ExecutorContext {
&self,
module_id: ModuleId,
exec_state: &mut ExecState,
exec_kind: ExecutionKind,
source_range: SourceRange,
) -> Result<(EnvironmentRef, Vec<String>), KclError> {
let path = exec_state.global.module_infos[&module_id].path.clone();
@ -494,12 +473,12 @@ impl ExecutorContext {
let result = match &mut repr {
ModuleRepr::Root => Err(exec_state.circular_import_error(&path, source_range)),
ModuleRepr::Kcl(_, Some((env_ref, items))) => Ok((*env_ref, items.clone())),
ModuleRepr::Kcl(_, Some((_, env_ref, items))) => Ok((*env_ref, items.clone())),
ModuleRepr::Kcl(program, cache) => self
.exec_module_from_ast(program, module_id, &path, exec_state, exec_kind, source_range)
.exec_module_from_ast(program, module_id, &path, exec_state, source_range, false)
.await
.map(|(_, er, items)| {
*cache = Some((er, items.clone()));
.map(|(val, er, items)| {
*cache = Some((val, er, items.clone()));
(er, items)
}),
ModuleRepr::Foreign(geom) => Err(KclError::Semantic(KclErrorDetails {
@ -518,7 +497,6 @@ impl ExecutorContext {
module_id: ModuleId,
module_name: &BoxNode<Name>,
exec_state: &mut ExecState,
exec_kind: ExecutionKind,
source_range: SourceRange,
) -> Result<Option<KclValue>, KclError> {
exec_state.global.operations.push(Operation::GroupBegin {
@ -535,13 +513,14 @@ impl ExecutorContext {
let result = match &mut repr {
ModuleRepr::Root => Err(exec_state.circular_import_error(&path, source_range)),
ModuleRepr::Kcl(_, Some((val, _, _))) => Ok(val.clone()),
ModuleRepr::Kcl(program, cached_items) => {
let result = self
.exec_module_from_ast(program, module_id, &path, exec_state, exec_kind, source_range)
.exec_module_from_ast(program, module_id, &path, exec_state, source_range, false)
.await;
match result {
Ok((val, env, items)) => {
*cached_items = Some((env, items));
*cached_items = Some((val.clone(), env, items));
Ok(val)
}
Err(e) => Err(e),
@ -560,18 +539,18 @@ impl ExecutorContext {
result
}
async fn exec_module_from_ast(
pub async fn exec_module_from_ast(
&self,
program: &Node<Program>,
module_id: ModuleId,
path: &ModulePath,
exec_state: &mut ExecState,
exec_kind: ExecutionKind,
source_range: SourceRange,
preserve_mem: bool,
) -> Result<(Option<KclValue>, EnvironmentRef, Vec<String>), KclError> {
exec_state.global.mod_loader.enter_module(path);
let result = self
.exec_module_body(program, exec_state, exec_kind, false, module_id, path)
.exec_module_body(program, exec_state, preserve_mem, module_id, path)
.await;
exec_state.global.mod_loader.leave_module(path);
@ -608,7 +587,7 @@ impl ExecutorContext {
Expr::Name(name) => {
let value = name.get_result(exec_state, self).await?.clone();
if let KclValue::Module { value: module_id, meta } = value {
self.exec_module_for_result(module_id, name, exec_state, ExecutionKind::Normal, metadata.source_range)
self.exec_module_for_result(module_id, name, exec_state, metadata.source_range)
.await?
.unwrap_or_else(|| {
exec_state.warn(CompilationError::err(
@ -808,7 +787,7 @@ impl Node<Name> {
};
mem_spec = Some(
ctx.exec_module_for_items(*module_id, exec_state, ExecutionKind::Normal, p.as_source_range())
ctx.exec_module_for_items(*module_id, exec_state, p.as_source_range())
.await?,
);
}
@ -1320,7 +1299,7 @@ impl Node<CallExpressionKw> {
));
}
let op = if func.feature_tree_operation() && !ctx.is_isolated_execution().await {
let op = if func.feature_tree_operation() {
let op_labeled_args = args
.kw_args
.labeled
@ -1406,7 +1385,7 @@ impl Node<CallExpressionKw> {
e.add_source_ranges(vec![callsite])
})?;
if matches!(fn_src, FunctionSource::User { .. }) && !ctx.is_isolated_execution().await {
if matches!(fn_src, FunctionSource::User { .. }) {
// Track return operation.
exec_state.global.operations.push(Operation::GroupEnd);
}
@ -1458,7 +1437,7 @@ impl Node<CallExpression> {
));
}
let op = if func.feature_tree_operation() && !ctx.is_isolated_execution().await {
let op = if func.feature_tree_operation() {
let op_labeled_args = func
.args(false)
.iter()
@ -1516,19 +1495,17 @@ impl Node<CallExpression> {
// exec_state.
let func = fn_name.get_result(exec_state, ctx).await?.clone();
if !ctx.is_isolated_execution().await {
// Track call operation.
exec_state.global.operations.push(Operation::GroupBegin {
group: Group::FunctionCall {
name: Some(fn_name.to_string()),
function_source_range: func.function_def_source_range().unwrap_or_default(),
unlabeled_arg: None,
// TODO: Add the arguments for legacy positional parameters.
labeled_args: Default::default(),
},
source_range: callsite,
});
}
// Track call operation.
exec_state.global.operations.push(Operation::GroupBegin {
group: Group::FunctionCall {
name: Some(fn_name.to_string()),
function_source_range: func.function_def_source_range().unwrap_or_default(),
unlabeled_arg: None,
// TODO: Add the arguments for legacy positional parameters.
labeled_args: Default::default(),
},
source_range: callsite,
});
let Some(fn_src) = func.as_fn() else {
return Err(KclError::Semantic(KclErrorDetails {
@ -1557,10 +1534,8 @@ impl Node<CallExpression> {
})
})?;
if !ctx.is_isolated_execution().await {
// Track return operation.
exec_state.global.operations.push(Operation::GroupEnd);
}
// Track return operation.
exec_state.global.operations.push(Operation::GroupEnd);
Ok(result)
}
@ -2313,7 +2288,7 @@ impl FunctionSource {
}
}
let op = if props.include_in_feature_tree && !ctx.is_isolated_execution().await {
let op = if props.include_in_feature_tree {
let op_labeled_args = args
.kw_args
.labeled
@ -2357,28 +2332,26 @@ impl FunctionSource {
Ok(Some(result))
}
FunctionSource::User { ast, memory, .. } => {
if !ctx.is_isolated_execution().await {
// Track call operation.
let op_labeled_args = args
.kw_args
.labeled
.iter()
.map(|(k, arg)| (k.clone(), OpArg::new(OpKclValue::from(&arg.value), arg.source_range)))
.collect();
exec_state.global.operations.push(Operation::GroupBegin {
group: Group::FunctionCall {
name: fn_name.clone(),
function_source_range: ast.as_source_range(),
unlabeled_arg: args
.kw_args
.unlabeled
.as_ref()
.map(|arg| OpArg::new(OpKclValue::from(&arg.value), arg.source_range)),
labeled_args: op_labeled_args,
},
source_range: callsite,
});
}
// Track call operation.
let op_labeled_args = args
.kw_args
.labeled
.iter()
.map(|(k, arg)| (k.clone(), OpArg::new(OpKclValue::from(&arg.value), arg.source_range)))
.collect();
exec_state.global.operations.push(Operation::GroupBegin {
group: Group::FunctionCall {
name: fn_name.clone(),
function_source_range: ast.as_source_range(),
unlabeled_arg: args
.kw_args
.unlabeled
.as_ref()
.map(|arg| OpArg::new(OpKclValue::from(&arg.value), arg.source_range)),
labeled_args: op_labeled_args,
},
source_range: callsite,
});
call_user_defined_function_kw(fn_name.as_deref(), args.kw_args, *memory, ast, exec_state, ctx).await
}
@ -2391,10 +2364,13 @@ impl FunctionSource {
mod test {
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use super::*;
use crate::{
execution::{memory::Stack, parse_execute, ContextType},
parsing::ast::types::{DefaultParamVal, Identifier, Parameter},
ExecutorSettings,
};
#[tokio::test(flavor = "multi_thread")]
@ -2504,7 +2480,7 @@ mod test {
// Run each test.
let func_expr = &Node::no_src(FunctionExpression {
params,
body: crate::parsing::ast::types::Program::empty(),
body: Program::empty(),
return_type: None,
digest: None,
});
@ -2607,6 +2583,102 @@ a = foo()
assert!(result.unwrap_err().to_string().contains("return"));
}
#[tokio::test(flavor = "multi_thread")]
async fn load_all_modules() {
// program a.kcl
let programa_kcl = r#"
export a = 1
"#;
// program b.kcl
let programb_kcl = r#"
import a from 'a.kcl'
export b = a + 1
"#;
// program c.kcl
let programc_kcl = r#"
import a from 'a.kcl'
export c = a + 2
"#;
// program main.kcl
let main_kcl = r#"
import b from 'b.kcl'
import c from 'c.kcl'
d = b + c
"#;
let main = crate::parsing::parse_str(main_kcl, ModuleId::default())
.parse_errs_as_err()
.unwrap();
let tmpdir = tempdir::TempDir::new("zma_kcl_load_all_modules").unwrap();
tokio::fs::File::create(tmpdir.path().join("main.kcl"))
.await
.unwrap()
.write_all(main_kcl.as_bytes())
.await
.unwrap();
tokio::fs::File::create(tmpdir.path().join("a.kcl"))
.await
.unwrap()
.write_all(programa_kcl.as_bytes())
.await
.unwrap();
tokio::fs::File::create(tmpdir.path().join("b.kcl"))
.await
.unwrap()
.write_all(programb_kcl.as_bytes())
.await
.unwrap();
tokio::fs::File::create(tmpdir.path().join("c.kcl"))
.await
.unwrap()
.write_all(programc_kcl.as_bytes())
.await
.unwrap();
let exec_ctxt = ExecutorContext {
engine: Arc::new(Box::new(
crate::engine::conn_mock::EngineConnection::new()
.await
.map_err(|err| {
KclError::Internal(crate::errors::KclErrorDetails {
message: format!("Failed to create mock engine connection: {}", err),
source_ranges: vec![SourceRange::default()],
})
})
.unwrap(),
)),
fs: Arc::new(crate::fs::FileManager::new()),
settings: ExecutorSettings {
project_directory: Some(tmpdir.path().into()),
..Default::default()
},
stdlib: Arc::new(crate::std::StdLib::new()),
context_type: ContextType::Mock,
};
let mut exec_state = ExecState::new(&exec_ctxt);
exec_ctxt
.run_concurrent(
&crate::Program {
ast: main.clone(),
original_file_contents: "".to_owned(),
},
&mut exec_state,
false,
)
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread")]
async fn user_coercion() {
let program = r#"fn foo(x: Axis2d) {