Compare commits

...

1 Commits

View File

@ -40,23 +40,54 @@ pub struct TcpRead {
stream: futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>>, stream: futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<reqwest::Upgraded>>,
} }
/// Occurs when client couldn't read from the WebSocket to the engine.
// #[derive(Debug)]
pub enum WebSocketReadError {
/// Could not read a message due to WebSocket errors.
Read(tokio_tungstenite::tungstenite::Error),
/// WebSocket message didn't contain a valid message that the KCL Executor could parse.
Deser(anyhow::Error),
}
impl From<anyhow::Error> for WebSocketReadError {
fn from(e: anyhow::Error) -> Self {
Self::Deser(e)
}
}
impl TcpRead { impl TcpRead {
pub async fn read(&mut self) -> Result<WebSocketResponse> { pub async fn read(&mut self) -> std::result::Result<WebSocketResponse, WebSocketReadError> {
let Some(msg) = self.stream.next().await else { let Some(msg) = self.stream.next().await else {
anyhow::bail!("Failed to read from websocket"); return Err(anyhow::anyhow!("Failed to read from WebSocket").into());
}; };
let msg: WebSocketResponse = match msg? { let msg = match msg {
WsMsg::Text(text) => serde_json::from_str(&text)?, Ok(msg) => msg,
WsMsg::Binary(bin) => bson::from_slice(&bin)?, Err(e) if matches!(e, tokio_tungstenite::tungstenite::Error::Protocol(_)) => {
other => anyhow::bail!("Unexpected websocket message from server: {}", other), return Err(WebSocketReadError::Read(e))
}
Err(e) => return Err(anyhow::anyhow!("Error reading from engine's WebSocket: {e}").into()),
};
let msg: WebSocketResponse = match msg {
WsMsg::Text(text) => serde_json::from_str(&text)
.map_err(anyhow::Error::from)
.map_err(WebSocketReadError::from)?,
WsMsg::Binary(bin) => bson::from_slice(&bin)
.map_err(anyhow::Error::from)
.map_err(WebSocketReadError::from)?,
other => return Err(anyhow::anyhow!("Unexpected WebSocket message from engine API: {other}").into()),
}; };
Ok(msg) Ok(msg)
} }
} }
#[derive(Debug)]
pub struct TcpReadHandle { pub struct TcpReadHandle {
handle: Arc<tokio::task::JoinHandle<Result<()>>>, handle: Arc<tokio::task::JoinHandle<Result<(), WebSocketReadError>>>,
}
impl std::fmt::Debug for TcpReadHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "TcpReadHandle")
}
} }
impl Drop for TcpReadHandle { impl Drop for TcpReadHandle {
@ -150,14 +181,17 @@ impl EngineConnection {
match tcp_read.read().await { match tcp_read.read().await {
Ok(ws_resp) => { Ok(ws_resp) => {
for e in ws_resp.errors.iter().flatten() { for e in ws_resp.errors.iter().flatten() {
println!("got error message: {e}"); println!("got error message: {} {}", e.error_code, e.message);
} }
if let Some(id) = ws_resp.request_id { if let Some(id) = ws_resp.request_id {
responses_clone.insert(id, ws_resp.clone()); responses_clone.insert(id, ws_resp.clone());
} }
} }
Err(e) => { Err(e) => {
println!("got ws error: {:?}", e); match &e {
WebSocketReadError::Read(e) => eprintln!("could not read from WS: {:?}", e),
WebSocketReadError::Deser(e) => eprintln!("could not deserialize msg from WS: {:?}", e),
}
*socket_health_tcp_read.lock().unwrap() = SocketHealth::Inactive; *socket_health_tcp_read.lock().unwrap() = SocketHealth::Inactive;
return Err(e); return Err(e);
} }