turn back on the test i tturned off (#6522)
* random other cahnges Signed-off-by: Jess Frazelle <github@jessfraz.com> * turn back on test Signed-off-by: Jess Frazelle <github@jessfraz.com> * docs Signed-off-by: Jess Frazelle <github@jessfraz.com> * lots of enhancements Signed-off-by: Jess Frazelle <github@jessfraz.com> * cleanup Signed-off-by: Jess Frazelle <github@jessfraz.com> * updates Signed-off-by: Jess Frazelle <github@jessfraz.com> * mesh test Signed-off-by: Jess Frazelle <github@jessfraz.com> * mesh test Signed-off-by: Jess Frazelle <github@jessfraz.com> * check panics Signed-off-by: Jess Frazelle <github@jessfraz.com> * updates Signed-off-by: Jess Frazelle <github@jessfraz.com> * check panics Signed-off-by: Jess Frazelle <github@jessfraz.com> * check panics Signed-off-by: Jess Frazelle <github@jessfraz.com> * cleanup Signed-off-by: Jess Frazelle <github@jessfraz.com> * if running in vitest make single threadedd Signed-off-by: Jess Frazelle <github@jessfraz.com> * check if running in vitest Signed-off-by: Jess Frazelle <github@jessfraz.com> * console logs Signed-off-by: Jess Frazelle <github@jessfraz.com> --------- Signed-off-by: Jess Frazelle <github@jessfraz.com>
This commit is contained in:
@ -1,89 +1,116 @@
|
||||
//! This module contains the `AsyncTasks` struct, which is used to manage a set of asynchronous
|
||||
//! This module contains the wasm-specific `AsyncTasks` struct, which is used to manage a set of asynchronous
|
||||
//! tasks.
|
||||
|
||||
use std::{ops::AddAssign, sync::Arc};
|
||||
use std::sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
};
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{mpsc, Notify};
|
||||
|
||||
use crate::errors::KclError;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct AsyncTasks {
|
||||
pub sender: Arc<RwLock<tokio::sync::mpsc::Sender<Result<(), KclError>>>>,
|
||||
pub receiver: Arc<RwLock<tokio::sync::mpsc::Receiver<Result<(), KclError>>>>,
|
||||
pub sent: Arc<RwLock<usize>>,
|
||||
// Results arrive here (unbounded = never blocks the producer)
|
||||
tx: mpsc::UnboundedSender<Result<(), KclError>>,
|
||||
rx: Arc<tokio::sync::Mutex<mpsc::UnboundedReceiver<Result<(), KclError>>>>,
|
||||
|
||||
// How many tasks we started since last clear()
|
||||
spawned: Arc<AtomicUsize>,
|
||||
|
||||
// Used to wake `join_all()` as soon as a task finishes.
|
||||
notifier: Arc<Notify>,
|
||||
}
|
||||
|
||||
impl AsyncTasks {
|
||||
pub fn new() -> Self {
|
||||
let (results_tx, results_rx) = tokio::sync::mpsc::channel(1);
|
||||
Self {
|
||||
sender: Arc::new(RwLock::new(results_tx)),
|
||||
receiver: Arc::new(RwLock::new(results_rx)),
|
||||
sent: Arc::new(RwLock::new(0)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn spawn<F>(&mut self, task: F)
|
||||
where
|
||||
F: std::future::Future<Output = anyhow::Result<(), KclError>>,
|
||||
F: Send + 'static,
|
||||
{
|
||||
// Add one to the sent counter.
|
||||
self.sent.write().await.add_assign(1);
|
||||
|
||||
// Spawn the task and send the result to the channel.
|
||||
let sender_clone = self.sender.clone();
|
||||
wasm_bindgen_futures::spawn_local(async move {
|
||||
let result = task.await;
|
||||
let sender = sender_clone.read().await;
|
||||
if let Err(_) = sender.send(result).await {
|
||||
web_sys::console::error_1(&"Failed to send result".into());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for all tasks to finish.
|
||||
// Return an error if any of them failed.
|
||||
pub async fn join_all(&mut self) -> anyhow::Result<(), KclError> {
|
||||
if *self.sent.read().await == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
let mut receiver = self.receiver.write().await;
|
||||
|
||||
// Wait for all tasks to finish.
|
||||
while let Some(result) = receiver.recv().await {
|
||||
results.push(result);
|
||||
|
||||
// Check if all tasks have finished.
|
||||
if results.len() == *self.sent.read().await {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if any of the tasks failed.
|
||||
for result in results {
|
||||
result?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn clear(&mut self) {
|
||||
// Clear the sent counter.
|
||||
*self.sent.write().await = 0;
|
||||
|
||||
// Clear the channel.
|
||||
let (results_tx, results_rx) = tokio::sync::mpsc::channel(1);
|
||||
*self.sender.write().await = results_tx;
|
||||
*self.receiver.write().await = results_rx;
|
||||
}
|
||||
}
|
||||
// Safety: single-threaded wasm ⇒ these are sound.
|
||||
unsafe impl Send for AsyncTasks {}
|
||||
unsafe impl Sync for AsyncTasks {}
|
||||
|
||||
impl Default for AsyncTasks {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncTasks {
|
||||
pub fn new() -> Self {
|
||||
console_error_panic_hook::set_once();
|
||||
|
||||
let (tx, rx) = mpsc::unbounded_channel();
|
||||
Self {
|
||||
tx,
|
||||
rx: Arc::new(tokio::sync::Mutex::new(rx)),
|
||||
spawned: Arc::new(AtomicUsize::new(0)),
|
||||
notifier: Arc::new(Notify::new()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn spawn<F>(&mut self, fut: F)
|
||||
where
|
||||
F: std::future::Future<Output = anyhow::Result<(), KclError>> + Send + 'static,
|
||||
{
|
||||
self.spawned.fetch_add(1, Ordering::Relaxed);
|
||||
let tx = self.tx.clone();
|
||||
let notify = self.notifier.clone();
|
||||
|
||||
wasm_bindgen_futures::spawn_local(async move {
|
||||
console_error_panic_hook::set_once();
|
||||
let _ = tx.send(fut.await); // ignore if receiver disappeared
|
||||
notify.notify_one(); // wake any join_all waiter
|
||||
});
|
||||
}
|
||||
|
||||
// Wait for all tasks to finish.
|
||||
// Return an error if any of them failed.
|
||||
pub async fn join_all(&mut self) -> anyhow::Result<(), KclError> {
|
||||
let total = self.spawned.load(Ordering::Acquire);
|
||||
if total == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut done = 0;
|
||||
while done < total {
|
||||
// 1) Drain whatever is already in the channel
|
||||
{
|
||||
let mut rx = self.rx.lock().await;
|
||||
while let Ok(res) = rx.try_recv() {
|
||||
done += 1;
|
||||
res?; // propagate first Err
|
||||
}
|
||||
}
|
||||
if done >= total {
|
||||
break;
|
||||
}
|
||||
// Yield to the event loop so that we don't block the UI thread.
|
||||
// No seriously WE DO NOT WANT TO PAUSE THE WHOLE APP ON THE JS SIDE.
|
||||
futures_lite::future::yield_now().await;
|
||||
// Check again before waiting to avoid missing notifications
|
||||
{
|
||||
let mut rx = self.rx.lock().await;
|
||||
while let Ok(res) = rx.try_recv() {
|
||||
done += 1;
|
||||
res?; // propagate first Err
|
||||
if done >= total {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Only wait for notification if we still need more tasks to complete
|
||||
if done < total {
|
||||
// 2) Nothing ready yet → wait for a notifier poke
|
||||
self.notifier.notified().await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn clear(&mut self) {
|
||||
self.spawned.store(0, Ordering::Release);
|
||||
|
||||
// Drain channel so old results don’t confuse the next join_all.
|
||||
let mut rx = self.rx.lock().await;
|
||||
while rx.try_recv().is_ok() {}
|
||||
}
|
||||
}
|
||||
|
@ -229,18 +229,20 @@ pub trait EngineManager: std::fmt::Debug + Send + Sync + 'static {
|
||||
while current_time.elapsed().as_secs() < 60 {
|
||||
let responses = self.responses().read().await.clone();
|
||||
let Some(resp) = responses.get(&id) else {
|
||||
// Sleep for a little so we don't hog the CPU.
|
||||
// Yield to the event loop so that we don’t block the UI thread.
|
||||
// No seriously WE DO NOT WANT TO PAUSE THE WHOLE APP ON THE JS SIDE.
|
||||
let duration = instant::Duration::from_millis(100);
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
wasm_timer::Delay::new(duration).await.map_err(|err| {
|
||||
KclError::Internal(KclErrorDetails {
|
||||
message: format!("Failed to sleep: {:?}", err),
|
||||
source_ranges: vec![source_range],
|
||||
})
|
||||
})?;
|
||||
{
|
||||
let duration = instant::Duration::from_millis(1);
|
||||
wasm_timer::Delay::new(duration).await.map_err(|err| {
|
||||
KclError::Internal(KclErrorDetails {
|
||||
message: format!("Failed to sleep: {:?}", err),
|
||||
source_ranges: vec![source_range],
|
||||
})
|
||||
})?;
|
||||
}
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
tokio::time::sleep(duration).await;
|
||||
tokio::task::yield_now().await;
|
||||
continue;
|
||||
};
|
||||
|
||||
|
Reference in New Issue
Block a user