diff --git a/src/components/NetworkHealthIndicator.test.tsx b/src/components/NetworkHealthIndicator.test.tsx index 3dfeafaf2..9761d102a 100644 --- a/src/components/NetworkHealthIndicator.test.tsx +++ b/src/components/NetworkHealthIndicator.test.tsx @@ -30,7 +30,7 @@ describe('NetworkHealthIndicator tests', () => { fireEvent.click(screen.getByTestId('network-toggle')) expect(screen.getByTestId('network')).toHaveTextContent( - NETWORK_HEALTH_TEXT[NetworkHealthState.Ok] + NETWORK_HEALTH_TEXT[NetworkHealthState.Issue] ) }) diff --git a/src/components/NetworkHealthIndicator.tsx b/src/components/NetworkHealthIndicator.tsx index 0c9f496d8..01b2eee7f 100644 --- a/src/components/NetworkHealthIndicator.tsx +++ b/src/components/NetworkHealthIndicator.tsx @@ -6,7 +6,8 @@ import { ConnectingTypeGroup, DisconnectingType, engineCommandManager, - EngineConnectionState, + EngineCommandManagerEvents, + EngineConnectionEvents, EngineConnectionStateType, ErrorType, initialConnectingTypeGroupState, @@ -81,37 +82,35 @@ const overallConnectionStateIcon: Record< } export function useNetworkStatus() { - const [steps, setSteps] = useState(initialConnectingTypeGroupState) + const [steps, setSteps] = useState( + structuredClone(initialConnectingTypeGroupState) + ) const [internetConnected, setInternetConnected] = useState(true) const [overallState, setOverallState] = useState( - NetworkHealthState.Ok + NetworkHealthState.Disconnected ) + const [pingPongHealth, setPingPongHealth] = useState<'OK' | 'BAD'>('BAD') const [hasCopied, setHasCopied] = useState(false) const [error, setError] = useState(undefined) - const issues: Record = { - [ConnectingTypeGroup.WebSocket]: steps[ConnectingTypeGroup.WebSocket].some( - (a: [ConnectingType, boolean | undefined]) => a[1] === false - ), - [ConnectingTypeGroup.ICE]: steps[ConnectingTypeGroup.ICE].some( - (a: [ConnectingType, boolean | undefined]) => a[1] === false - ), - [ConnectingTypeGroup.WebRTC]: steps[ConnectingTypeGroup.WebRTC].some( - (a: [ConnectingType, boolean | undefined]) => a[1] === false - ), - } + const hasIssue = (i: [ConnectingType, boolean | undefined]) => + i[1] === undefined ? i[1] : !i[1] - const hasIssues: boolean = - issues[ConnectingTypeGroup.WebSocket] || - issues[ConnectingTypeGroup.ICE] || - issues[ConnectingTypeGroup.WebRTC] + const [issues, setIssues] = useState< + Record + >({ + [ConnectingTypeGroup.WebSocket]: undefined, + [ConnectingTypeGroup.ICE]: undefined, + [ConnectingTypeGroup.WebRTC]: undefined, + }) + const [hasIssues, setHasIssues] = useState(undefined) useEffect(() => { setOverallState( !internetConnected ? NetworkHealthState.Disconnected - : hasIssues + : hasIssues || hasIssues === undefined ? NetworkHealthState.Issue : NetworkHealthState.Ok ) @@ -134,19 +133,59 @@ export function useNetworkStatus() { }, []) useEffect(() => { - engineCommandManager.onConnectionStateChange( - (engineConnectionState: EngineConnectionState) => { - let hasSetAStep = false + console.log(pingPongHealth) + }, [pingPongHealth]) + + useEffect(() => { + const issues = { + [ConnectingTypeGroup.WebSocket]: steps[ + ConnectingTypeGroup.WebSocket + ].reduce( + (acc: boolean | undefined, a) => + acc === true || acc === undefined ? acc : hasIssue(a), + false + ), + [ConnectingTypeGroup.ICE]: steps[ConnectingTypeGroup.ICE].reduce( + (acc: boolean | undefined, a) => + acc === true || acc === undefined ? acc : hasIssue(a), + false + ), + [ConnectingTypeGroup.WebRTC]: steps[ConnectingTypeGroup.WebRTC].reduce( + (acc: boolean | undefined, a) => + acc === true || acc === undefined ? acc : hasIssue(a), + false + ), + } + setIssues(issues) + }, [steps]) + + useEffect(() => { + setHasIssues( + issues[ConnectingTypeGroup.WebSocket] || + issues[ConnectingTypeGroup.ICE] || + issues[ConnectingTypeGroup.WebRTC] + ) + }, [issues]) + + useEffect(() => { + const onPingPongChange = ({ detail: state }: CustomEvent) => { + setPingPongHealth(state) + } + + const onConnectionStateChange = ({ + detail: engineConnectionState, + }: CustomEvent) => { + setSteps((steps) => { + let nextSteps = structuredClone(steps) if ( engineConnectionState.type === EngineConnectionStateType.Connecting ) { - const groups = Object.values(steps) + const groups = Object.values(nextSteps) for (let group of groups) { for (let step of group) { if (step[0] !== engineConnectionState.value.type) continue step[1] = true - hasSetAStep = true } } } @@ -154,7 +193,7 @@ export function useNetworkStatus() { if ( engineConnectionState.type === EngineConnectionStateType.Disconnecting ) { - const groups = Object.values(steps) + const groups = Object.values(nextSteps) for (let group of groups) { for (let step of group) { if ( @@ -165,7 +204,6 @@ export function useNetworkStatus() { ?.type === step[0] ) { step[1] = false - hasSetAStep = true } } } @@ -176,11 +214,50 @@ export function useNetworkStatus() { } } - if (hasSetAStep) { - setSteps(steps) + // Reset the state of all steps if we have disconnected. + if ( + engineConnectionState.type === EngineConnectionStateType.Disconnected + ) { + return structuredClone(initialConnectingTypeGroupState) } - } + + return nextSteps + }) + } + + const onEngineAvailable = ({ detail: engineConnection }: CustomEvent) => { + engineConnection.addEventListener( + EngineConnectionEvents.PingPongChanged, + onPingPongChange as EventListener + ) + engineConnection.addEventListener( + EngineConnectionEvents.ConnectionStateChanged, + onConnectionStateChange as EventListener + ) + } + + engineCommandManager.addEventListener( + EngineCommandManagerEvents.EngineAvailable, + onEngineAvailable as EventListener ) + + return () => { + engineCommandManager.removeEventListener( + EngineCommandManagerEvents.EngineAvailable, + onEngineAvailable as EventListener + ) + + // When the component is unmounted these should be assigned, but it's possible + // the component mounts and unmounts before engine is available. + engineCommandManager.engineConnection?.addEventListener( + EngineConnectionEvents.PingPongChanged, + onPingPongChange as EventListener + ) + engineCommandManager.engineConnection?.addEventListener( + EngineConnectionEvents.ConnectionStateChanged, + onConnectionStateChange as EventListener + ) + } }, []) return { @@ -192,6 +269,7 @@ export function useNetworkStatus() { error, setHasCopied, hasCopied, + pingPongHealth, } } @@ -256,18 +334,18 @@ export const NetworkHealthIndicator = () => { size="lg" icon={ hasIssueToIcon[ - issues[name as ConnectingTypeGroup].toString() + String(issues[name as ConnectingTypeGroup]) ] } iconClassName={ hasIssueToIconColors[ - issues[name as ConnectingTypeGroup].toString() + String(issues[name as ConnectingTypeGroup]) ].icon } bgClassName={ 'rounded-sm ' + hasIssueToIconColors[ - issues[name as ConnectingTypeGroup].toString() + String(issues[name as ConnectingTypeGroup]) ].bg } /> diff --git a/src/components/Stream.tsx b/src/components/Stream.tsx index e7108db84..05970fb6e 100644 --- a/src/components/Stream.tsx +++ b/src/components/Stream.tsx @@ -32,6 +32,7 @@ export const Stream = ({ className = '' }: { className?: string }) => { const { state } = useModelingContext() const { isExecuting } = useKclContext() const { overallState } = useNetworkStatus() + const isNetworkOkay = overallState === NetworkHealthState.Ok useEffect(() => { diff --git a/src/lang/std/engineConnection.ts b/src/lang/std/engineConnection.ts index df1572b1e..95b264577 100644 --- a/src/lang/std/engineConnection.ts +++ b/src/lang/std/engineConnection.ts @@ -1,5 +1,5 @@ import { PathToNode, Program, SourceRange } from 'lang/wasm' -import { VITE_KC_API_WS_MODELING_URL, VITE_KC_CONNECTION_TIMEOUT_MS } from 'env' +import { VITE_KC_API_WS_MODELING_URL } from 'env' import { Models } from '@kittycad/lib' import { exportSave } from 'lib/exportSave' import { v4 as uuidv4 } from 'uuid' @@ -8,6 +8,9 @@ import { sceneInfra } from 'clientSideScene/sceneInfra' let lastMessage = '' +// TODO(paultag): This ought to be tweakable. +const pingIntervalMs = 10000 + interface CommandInfo { commandType: CommandTypes range: SourceRange @@ -37,11 +40,6 @@ export interface ArtifactMap { [key: string]: ResultCommand | PendingCommand | FailedCommand } -interface NewTrackArgs { - conn: EngineConnection - mediaStream: MediaStream -} - // This looks funny, I know. This is needed because node and the browser // disagree as to the type. In a browser it's a number, but in node it's a // "Timeout". @@ -158,10 +156,28 @@ export type EngineConnectionState = | State | State +export type PingPongState = 'OK' | 'BAD' + +export enum EngineConnectionEvents { + // Fires for each ping-pong success or failure. + PingPongChanged = 'ping-pong-changed', // (state: PingPongState) => void + + // For now, this is only used by the NetworkHealthIndicator. + // We can eventually use it for more, but one step at a time. + ConnectionStateChanged = 'connection-state-changed', // (state: EngineConnectionState) => void + + // These are used for the EngineCommandManager and were created + // before onConnectionStateChange existed. + ConnectionStarted = 'connection-started', // (engineConnection: EngineConnection) => void + Opened = 'opened', // (engineConnection: EngineConnection) => void + Closed = 'closed', // (engineConnection: EngineConnection) => void + NewTrack = 'new-track', // (track: NewTrackArgs) => void +} + // EngineConnection encapsulates the connection(s) to the Engine // for the EngineCommandManager; namely, the underlying WebSocket // and WebRTC connections. -class EngineConnection { +class EngineConnection extends EventTarget { websocket?: WebSocket pc?: RTCPeerConnection unreliableDataChannel?: RTCDataChannel @@ -195,7 +211,12 @@ class EngineConnection { } } this._state = next - this.onConnectionStateChange(this._state) + + this.dispatchEvent( + new CustomEvent(EngineConnectionEvents.ConnectionStateChanged, { + detail: this._state, + }) + ) } private failedConnTimeout: Timeout | null @@ -203,74 +224,39 @@ class EngineConnection { readonly url: string private readonly token?: string - // For now, this is only used by the NetworkHealthIndicator. - // We can eventually use it for more, but one step at a time. - private onConnectionStateChange: (state: EngineConnectionState) => void - - // These are used for the EngineCommandManager and were created - // before onConnectionStateChange existed. - private onEngineConnectionOpen: (engineConnection: EngineConnection) => void - private onConnectionStarted: (engineConnection: EngineConnection) => void - private onClose: (engineConnection: EngineConnection) => void - private onNewTrack: (track: NewTrackArgs) => void - // TODO: actual type is ClientMetrics private webrtcStatsCollector?: () => Promise - constructor({ - url, - token, - onConnectionStateChange = () => {}, - onNewTrack = () => {}, - onEngineConnectionOpen = () => {}, - onConnectionStarted = () => {}, - onClose = () => {}, - }: { - url: string - token?: string - onConnectionStateChange?: (state: EngineConnectionState) => void - onEngineConnectionOpen?: (engineConnection: EngineConnection) => void - onConnectionStarted?: (engineConnection: EngineConnection) => void - onClose?: (engineConnection: EngineConnection) => void - onNewTrack?: (track: NewTrackArgs) => void - }) { + private pingPongSpan: { ping?: Date; pong?: Date } + + constructor({ url, token }: { url: string; token?: string }) { + super() + this.url = url this.token = token this.failedConnTimeout = null - this.onConnectionStateChange = onConnectionStateChange - this.onEngineConnectionOpen = onEngineConnectionOpen - this.onConnectionStarted = onConnectionStarted - this.onClose = onClose - this.onNewTrack = onNewTrack - - // TODO(paultag): This ought to be tweakable. - const pingIntervalMs = 10000 + this.pingPongSpan = { ping: undefined, pong: undefined } // Without an interval ping, our connection will timeout. - let pingInterval = setInterval(() => { + setInterval(() => { switch (this.state.type as EngineConnectionStateType) { case EngineConnectionStateType.ConnectionEstablished: this.send({ type: 'ping' }) + this.pingPongSpan.ping = new Date() break case EngineConnectionStateType.Disconnecting: case EngineConnectionStateType.Disconnected: - clearInterval(pingInterval) + // Reconnect if we have disconnected. + if (!this.isConnecting()) this.connect() break default: + if (this.isConnecting()) break + // Means we never could do an initial connection. Reconnect everything. + if (!this.pingPongSpan.ping) this.connect() break } }, pingIntervalMs) - - const connectionTimeoutMs = VITE_KC_CONNECTION_TIMEOUT_MS - let connectRetryInterval = setInterval(() => { - if (this.state.type !== EngineConnectionStateType.Disconnected) return - - // Only try reconnecting when completely disconnected. - clearInterval(connectRetryInterval) - console.log('Trying to reconnect') - this.connect() - }, connectionTimeoutMs) } isConnecting() { @@ -352,7 +338,11 @@ class EngineConnection { // dance is it safest to connect the video tracks / stream case 'connected': // Let the browser attach to the video stream now - this.onNewTrack({ conn: this, mediaStream: this.mediaStream! }) + this.dispatchEvent( + new CustomEvent(EngineConnectionEvents.NewTrack, { + detail: { conn: this, mediaStream: this.mediaStream! }, + }) + ) break case 'failed': this.disconnectAll() @@ -468,7 +458,9 @@ class EngineConnection { // Everything is now connected. this.state = { type: EngineConnectionStateType.ConnectionEstablished } - this.onEngineConnectionOpen(this) + this.dispatchEvent( + new CustomEvent(EngineConnectionEvents.Opened, { detail: this }) + ) }) this.unreliableDataChannel.addEventListener('close', (event) => { @@ -510,6 +502,10 @@ class EngineConnection { }, } + // Send an initial ping + this.send({ type: 'ping' }) + this.pingPongSpan.ping = new Date() + // This is required for when KCMA is running stand-alone / within Tauri. // Otherwise when run in a browser, the token is sent implicitly via // the Cookie header. @@ -575,12 +571,34 @@ failed cmd type was ${artifactThatFailed?.commandType}` let resp = message.resp // If there's no body to the response, we can bail here. - // !resp.type is usually "pong" response for our "ping" if (!resp || !resp.type) { return } switch (resp.type) { + case 'pong': + this.pingPongSpan.pong = new Date() + if (this.pingPongSpan.ping && this.pingPongSpan.pong) { + if ( + Math.abs( + this.pingPongSpan.pong.valueOf() - + this.pingPongSpan.ping.valueOf() + ) >= pingIntervalMs + ) { + this.dispatchEvent( + new CustomEvent(EngineConnectionEvents.PingPongChanged, { + detail: 'BAD', + }) + ) + } else { + this.dispatchEvent( + new CustomEvent(EngineConnectionEvents.PingPongChanged, { + detail: 'OK', + }) + ) + } + } + break case 'ice_server_info': let ice_servers = resp.data?.ice_servers @@ -727,27 +745,11 @@ failed cmd type was ${artifactThatFailed?.commandType}` } }) - const connectionTimeoutMs = VITE_KC_CONNECTION_TIMEOUT_MS - if (this.failedConnTimeout) { - clearTimeout(this.failedConnTimeout) - this.failedConnTimeout = null - } - this.failedConnTimeout = setTimeout(() => { - if (this.isReady()) { - return - } - this.failedConnTimeout = null - this.state = { - type: EngineConnectionStateType.Disconnecting, - value: { - type: DisconnectingType.Timeout, - }, - } - this.disconnectAll() - this.finalizeIfAllConnectionsClosed() - }, connectionTimeoutMs) - - this.onConnectionStarted(this) + this.dispatchEvent( + new CustomEvent(EngineConnectionEvents.ConnectionStarted, { + detail: this, + }) + ) } unreliableSend(message: object | string) { // TODO(paultag): Add in logic to determine the connection state and @@ -796,6 +798,8 @@ interface UnreliableSubscription { callback: (data: Extract) => void } +// TODO: Should eventually be replaced with native EventTarget event system, +// as it manages events in a more familiar way to other developers. export interface Subscription { event: T callback: ( @@ -823,7 +827,11 @@ export type CommandLog = data: null } -export class EngineCommandManager { +export enum EngineCommandManagerEvents { + EngineAvailable = 'engine-available', +} + +export class EngineCommandManager extends EventTarget { artifactMap: ArtifactMap = {} lastArtifactMap: ArtifactMap = {} sceneCommandArtifacts: ArtifactMap = {} @@ -857,10 +865,9 @@ export class EngineCommandManager { } } = {} as any - callbacksEngineStateConnection: ((state: EngineConnectionState) => void)[] = - [] - constructor() { + super() + this.engineConnection = undefined ;(async () => { // circular dependency needs one to be lazy loaded @@ -901,12 +908,17 @@ export class EngineCommandManager { this.engineConnection = new EngineConnection({ url, token, - onConnectionStateChange: (state: EngineConnectionState) => { - for (let cb of this.callbacksEngineStateConnection) { - cb(state) - } - }, - onEngineConnectionOpen: () => { + }) + + this.dispatchEvent( + new CustomEvent(EngineCommandManagerEvents.EngineAvailable, { + detail: this.engineConnection, + }) + ) + + this.engineConnection.addEventListener( + EngineConnectionEvents.Opened, + () => { // Make the axis gizmo. // We do this after the connection opened to avoid a race condition. // Connected opened is the last thing that happens when the stream @@ -941,78 +953,98 @@ export class EngineCommandManager { setIsStreamReady(true) executeCode(undefined, true) }) - }, - onClose: () => { - setIsStreamReady(false) - }, - onConnectionStarted: (engineConnection) => { - engineConnection?.pc?.addEventListener('datachannel', (event) => { - let unreliableDataChannel = event.channel + } + ) - unreliableDataChannel.addEventListener('message', (event) => { - const result: UnreliableResponses = JSON.parse(event.data) - Object.values( - this.unreliableSubscriptions[result.type] || {} - ).forEach( - // TODO: There is only one response that uses the unreliable channel atm, - // highlight_set_entity, if there are more it's likely they will all have the same - // sequence logic, but I'm not sure if we use a single global sequence or a sequence - // per unreliable subscription. - (callback) => { - if ( - result?.data?.sequence && - result?.data.sequence > this.inSequence && - result.type === 'highlight_set_entity' - ) { - this.inSequence = result.data.sequence - callback(result) - } + this.engineConnection.addEventListener( + EngineConnectionEvents.Closed, + () => { + setIsStreamReady(false) + } + ) + + this.engineConnection.addEventListener( + EngineConnectionEvents.ConnectionStarted, + (({ detail: engineConnection }: CustomEvent) => { + engineConnection?.pc?.addEventListener( + 'datachannel', + (event: RTCDataChannelEvent) => { + let unreliableDataChannel = event.channel + + unreliableDataChannel.addEventListener( + 'message', + (event: MessageEvent) => { + const result: UnreliableResponses = JSON.parse(event.data) + Object.values( + this.unreliableSubscriptions[result.type] || {} + ).forEach( + // TODO: There is only one response that uses the unreliable channel atm, + // highlight_set_entity, if there are more it's likely they will all have the same + // sequence logic, but I'm not sure if we use a single global sequence or a sequence + // per unreliable subscription. + (callback) => { + if ( + result?.data?.sequence && + result?.data.sequence > this.inSequence && + result.type === 'highlight_set_entity' + ) { + this.inSequence = result.data.sequence + callback(result) + } + } + ) } ) - }) - }) + } + ) // When the EngineConnection starts a connection, we want to register // callbacks into the WebSocket/PeerConnection. - engineConnection.websocket?.addEventListener('message', (event) => { - if (event.data instanceof ArrayBuffer) { - // If the data is an ArrayBuffer, it's the result of an export command, - // because in all other cases we send JSON strings. But in the case of - // export we send a binary blob. - // Pass this to our export function. - void exportSave(event.data) - } else { - const message: Models['WebSocketResponse_type'] = JSON.parse( - event.data - ) - if ( - message.success && - message.resp.type === 'modeling' && - message.request_id - ) { - this.handleModelingCommand(message.resp, message.request_id) - } else if ( - !message.success && - message.request_id && - this.artifactMap[message.request_id] - ) { - this.handleFailedModelingCommand(message) + engineConnection.websocket?.addEventListener( + 'message', + (event: MessageEvent) => { + if (event.data instanceof ArrayBuffer) { + // If the data is an ArrayBuffer, it's the result of an export command, + // because in all other cases we send JSON strings. But in the case of + // export we send a binary blob. + // Pass this to our export function. + void exportSave(event.data) + } else { + const message: Models['WebSocketResponse_type'] = JSON.parse( + event.data + ) + if ( + message.success && + message.resp.type === 'modeling' && + message.request_id + ) { + this.handleModelingCommand(message.resp, message.request_id) + } else if ( + !message.success && + message.request_id && + this.artifactMap[message.request_id] + ) { + this.handleFailedModelingCommand(message) + } } } - }) - }, - onNewTrack: ({ mediaStream }) => { - console.log('received track', mediaStream) + ) + }) as EventListener + ) - mediaStream.getVideoTracks()[0].addEventListener('mute', () => { - console.log('peer is not sending video to us') - // this.engineConnection?.close() - // this.engineConnection?.connect() - }) + this.engineConnection.addEventListener(EngineConnectionEvents.NewTrack, (({ + detail: { mediaStream }, + }: CustomEvent) => { + console.log('received track', mediaStream) - setMediaStream(mediaStream) - }, - }) + mediaStream.getVideoTracks()[0].addEventListener('mute', () => { + console.log('peer is not sending video to us') + // this.engineConnection?.close() + // this.engineConnection?.connect() + }) + + setMediaStream(mediaStream) + }) as EventListener) this.engineConnection?.connect() } @@ -1202,9 +1234,6 @@ export class EngineCommandManager { ) { delete this.unreliableSubscriptions[event][id] } - onConnectionStateChange(callback: (state: EngineConnectionState) => void) { - this.callbacksEngineStateConnection.push(callback) - } endSession() { // TODO: instead of sending a single command with `object_ids: Object.keys(this.artifactMap)` // we need to loop over them each individually because if the engine doesn't recognise a single