EngineConnection should fail fast if socket closes (#1600)
* EngineConnection should fail fast if socket closes * Fix clippy lint
This commit is contained in:
		@ -1,7 +1,7 @@
 | 
			
		||||
//! Functions for setting up our WebSocket and WebRTC connections for communications with the
 | 
			
		||||
//! engine.
 | 
			
		||||
 | 
			
		||||
use std::sync::Arc;
 | 
			
		||||
use std::sync::{Arc, Mutex};
 | 
			
		||||
 | 
			
		||||
use anyhow::{anyhow, Result};
 | 
			
		||||
use dashmap::DashMap;
 | 
			
		||||
@ -15,6 +15,12 @@ use crate::{
 | 
			
		||||
    errors::{KclError, KclErrorDetails},
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
#[derive(Debug, PartialEq)]
 | 
			
		||||
enum SocketHealth {
 | 
			
		||||
    Active,
 | 
			
		||||
    Inactive,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type WebSocketTcpWrite = futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>, WsMsg>;
 | 
			
		||||
#[derive(Debug, Clone)]
 | 
			
		||||
#[allow(dead_code)] // for the TcpReadHandle
 | 
			
		||||
@ -22,6 +28,7 @@ pub struct EngineConnection {
 | 
			
		||||
    engine_req_tx: mpsc::Sender<ToEngineReq>,
 | 
			
		||||
    responses: Arc<DashMap<uuid::Uuid, WebSocketResponse>>,
 | 
			
		||||
    tcp_read_handle: Arc<TcpReadHandle>,
 | 
			
		||||
    socket_health: Arc<Mutex<SocketHealth>>,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
pub struct TcpRead {
 | 
			
		||||
@ -119,7 +126,9 @@ impl EngineConnection {
 | 
			
		||||
 | 
			
		||||
        let responses: Arc<DashMap<uuid::Uuid, WebSocketResponse>> = Arc::new(DashMap::new());
 | 
			
		||||
        let responses_clone = responses.clone();
 | 
			
		||||
        let socket_health = Arc::new(Mutex::new(SocketHealth::Active));
 | 
			
		||||
 | 
			
		||||
        let socket_health_tcp_read = socket_health.clone();
 | 
			
		||||
        let tcp_read_handle = tokio::spawn(async move {
 | 
			
		||||
            // Get Websocket messages from API server
 | 
			
		||||
            loop {
 | 
			
		||||
@ -131,6 +140,7 @@ impl EngineConnection {
 | 
			
		||||
                    }
 | 
			
		||||
                    Err(e) => {
 | 
			
		||||
                        println!("got ws error: {:?}", e);
 | 
			
		||||
                        *socket_health_tcp_read.lock().unwrap() = SocketHealth::Inactive;
 | 
			
		||||
                        return Err(e);
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
@ -143,6 +153,7 @@ impl EngineConnection {
 | 
			
		||||
                handle: Arc::new(tcp_read_handle),
 | 
			
		||||
            }),
 | 
			
		||||
            responses,
 | 
			
		||||
            socket_health,
 | 
			
		||||
        })
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@ -192,6 +203,14 @@ impl EngineManager for EngineConnection {
 | 
			
		||||
        // Wait for the response.
 | 
			
		||||
        let current_time = std::time::Instant::now();
 | 
			
		||||
        while current_time.elapsed().as_secs() < 60 {
 | 
			
		||||
            if let Ok(guard) = self.socket_health.lock() {
 | 
			
		||||
                if *guard == SocketHealth::Inactive {
 | 
			
		||||
                    return Err(KclError::Engine(KclErrorDetails {
 | 
			
		||||
                        message: "Modeling command failed: websocket closed early".to_string(),
 | 
			
		||||
                        source_ranges: vec![source_range],
 | 
			
		||||
                    }));
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
            // We pop off the responses to cleanup our mappings.
 | 
			
		||||
            if let Some((_, resp)) = self.responses.remove(&id) {
 | 
			
		||||
                return if let Some(data) = &resp.resp {
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user