Revert "Add ping pong health, remove a timeout interval, fix up netwo… (#1771)

Revert "Add ping pong health, remove a timeout interval, fix up network events (#1555)"

This reverts commit 61d7950ca3.

Co-authored-by: Kurt Hutten <k.hutten@protonmail.ch>
This commit is contained in:
Frank Noirot
2024-03-19 20:39:49 -04:00
committed by GitHub
parent 4b9d4fd45b
commit 1913519f68
4 changed files with 187 additions and 295 deletions

View File

@ -1,5 +1,5 @@
import { PathToNode, Program, SourceRange } from 'lang/wasm'
import { VITE_KC_API_WS_MODELING_URL } from 'env'
import { VITE_KC_API_WS_MODELING_URL, VITE_KC_CONNECTION_TIMEOUT_MS } from 'env'
import { Models } from '@kittycad/lib'
import { exportSave } from 'lib/exportSave'
import { v4 as uuidv4 } from 'uuid'
@ -8,9 +8,6 @@ import { sceneInfra } from 'clientSideScene/sceneInfra'
let lastMessage = ''
// TODO(paultag): This ought to be tweakable.
const pingIntervalMs = 10000
interface CommandInfo {
commandType: CommandTypes
range: SourceRange
@ -40,6 +37,11 @@ export interface ArtifactMap {
[key: string]: ResultCommand | PendingCommand | FailedCommand
}
interface NewTrackArgs {
conn: EngineConnection
mediaStream: MediaStream
}
// This looks funny, I know. This is needed because node and the browser
// disagree as to the type. In a browser it's a number, but in node it's a
// "Timeout".
@ -156,28 +158,10 @@ export type EngineConnectionState =
| State<EngineConnectionStateType.Disconnecting, DisconnectingValue>
| State<EngineConnectionStateType.Disconnected, void>
export type PingPongState = 'OK' | 'BAD'
export enum EngineConnectionEvents {
// Fires for each ping-pong success or failure.
PingPongChanged = 'ping-pong-changed', // (state: PingPongState) => void
// For now, this is only used by the NetworkHealthIndicator.
// We can eventually use it for more, but one step at a time.
ConnectionStateChanged = 'connection-state-changed', // (state: EngineConnectionState) => void
// These are used for the EngineCommandManager and were created
// before onConnectionStateChange existed.
ConnectionStarted = 'connection-started', // (engineConnection: EngineConnection) => void
Opened = 'opened', // (engineConnection: EngineConnection) => void
Closed = 'closed', // (engineConnection: EngineConnection) => void
NewTrack = 'new-track', // (track: NewTrackArgs) => void
}
// EngineConnection encapsulates the connection(s) to the Engine
// for the EngineCommandManager; namely, the underlying WebSocket
// and WebRTC connections.
class EngineConnection extends EventTarget {
class EngineConnection {
websocket?: WebSocket
pc?: RTCPeerConnection
unreliableDataChannel?: RTCDataChannel
@ -211,12 +195,7 @@ class EngineConnection extends EventTarget {
}
}
this._state = next
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.ConnectionStateChanged, {
detail: this._state,
})
)
this.onConnectionStateChange(this._state)
}
private failedConnTimeout: Timeout | null
@ -224,39 +203,74 @@ class EngineConnection extends EventTarget {
readonly url: string
private readonly token?: string
// For now, this is only used by the NetworkHealthIndicator.
// We can eventually use it for more, but one step at a time.
private onConnectionStateChange: (state: EngineConnectionState) => void
// These are used for the EngineCommandManager and were created
// before onConnectionStateChange existed.
private onEngineConnectionOpen: (engineConnection: EngineConnection) => void
private onConnectionStarted: (engineConnection: EngineConnection) => void
private onClose: (engineConnection: EngineConnection) => void
private onNewTrack: (track: NewTrackArgs) => void
// TODO: actual type is ClientMetrics
private webrtcStatsCollector?: () => Promise<ClientMetrics>
private pingPongSpan: { ping?: Date; pong?: Date }
constructor({ url, token }: { url: string; token?: string }) {
super()
constructor({
url,
token,
onConnectionStateChange = () => {},
onNewTrack = () => {},
onEngineConnectionOpen = () => {},
onConnectionStarted = () => {},
onClose = () => {},
}: {
url: string
token?: string
onConnectionStateChange?: (state: EngineConnectionState) => void
onEngineConnectionOpen?: (engineConnection: EngineConnection) => void
onConnectionStarted?: (engineConnection: EngineConnection) => void
onClose?: (engineConnection: EngineConnection) => void
onNewTrack?: (track: NewTrackArgs) => void
}) {
this.url = url
this.token = token
this.failedConnTimeout = null
this.onConnectionStateChange = onConnectionStateChange
this.onEngineConnectionOpen = onEngineConnectionOpen
this.onConnectionStarted = onConnectionStarted
this.pingPongSpan = { ping: undefined, pong: undefined }
this.onClose = onClose
this.onNewTrack = onNewTrack
// TODO(paultag): This ought to be tweakable.
const pingIntervalMs = 10000
// Without an interval ping, our connection will timeout.
setInterval(() => {
let pingInterval = setInterval(() => {
switch (this.state.type as EngineConnectionStateType) {
case EngineConnectionStateType.ConnectionEstablished:
this.send({ type: 'ping' })
this.pingPongSpan.ping = new Date()
break
case EngineConnectionStateType.Disconnecting:
case EngineConnectionStateType.Disconnected:
// Reconnect if we have disconnected.
if (!this.isConnecting()) this.connect()
clearInterval(pingInterval)
break
default:
if (this.isConnecting()) break
// Means we never could do an initial connection. Reconnect everything.
if (!this.pingPongSpan.ping) this.connect()
break
}
}, pingIntervalMs)
const connectionTimeoutMs = VITE_KC_CONNECTION_TIMEOUT_MS
let connectRetryInterval = setInterval(() => {
if (this.state.type !== EngineConnectionStateType.Disconnected) return
// Only try reconnecting when completely disconnected.
clearInterval(connectRetryInterval)
console.log('Trying to reconnect')
this.connect()
}, connectionTimeoutMs)
}
isConnecting() {
@ -338,11 +352,7 @@ class EngineConnection extends EventTarget {
// dance is it safest to connect the video tracks / stream
case 'connected':
// Let the browser attach to the video stream now
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.NewTrack, {
detail: { conn: this, mediaStream: this.mediaStream! },
})
)
this.onNewTrack({ conn: this, mediaStream: this.mediaStream! })
break
case 'failed':
this.disconnectAll()
@ -458,9 +468,7 @@ class EngineConnection extends EventTarget {
// Everything is now connected.
this.state = { type: EngineConnectionStateType.ConnectionEstablished }
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.Opened, { detail: this })
)
this.onEngineConnectionOpen(this)
})
this.unreliableDataChannel.addEventListener('close', (event) => {
@ -502,10 +510,6 @@ class EngineConnection extends EventTarget {
},
}
// Send an initial ping
this.send({ type: 'ping' })
this.pingPongSpan.ping = new Date()
// This is required for when KCMA is running stand-alone / within Tauri.
// Otherwise when run in a browser, the token is sent implicitly via
// the Cookie header.
@ -571,34 +575,12 @@ failed cmd type was ${artifactThatFailed?.commandType}`
let resp = message.resp
// If there's no body to the response, we can bail here.
// !resp.type is usually "pong" response for our "ping"
if (!resp || !resp.type) {
return
}
switch (resp.type) {
case 'pong':
this.pingPongSpan.pong = new Date()
if (this.pingPongSpan.ping && this.pingPongSpan.pong) {
if (
Math.abs(
this.pingPongSpan.pong.valueOf() -
this.pingPongSpan.ping.valueOf()
) >= pingIntervalMs
) {
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.PingPongChanged, {
detail: 'BAD',
})
)
} else {
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.PingPongChanged, {
detail: 'OK',
})
)
}
}
break
case 'ice_server_info':
let ice_servers = resp.data?.ice_servers
@ -745,11 +727,27 @@ failed cmd type was ${artifactThatFailed?.commandType}`
}
})
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.ConnectionStarted, {
detail: this,
})
)
const connectionTimeoutMs = VITE_KC_CONNECTION_TIMEOUT_MS
if (this.failedConnTimeout) {
clearTimeout(this.failedConnTimeout)
this.failedConnTimeout = null
}
this.failedConnTimeout = setTimeout(() => {
if (this.isReady()) {
return
}
this.failedConnTimeout = null
this.state = {
type: EngineConnectionStateType.Disconnecting,
value: {
type: DisconnectingType.Timeout,
},
}
this.disconnectAll()
this.finalizeIfAllConnectionsClosed()
}, connectionTimeoutMs)
this.onConnectionStarted(this)
}
unreliableSend(message: object | string) {
// TODO(paultag): Add in logic to determine the connection state and
@ -798,8 +796,6 @@ interface UnreliableSubscription<T extends UnreliableResponses['type']> {
callback: (data: Extract<UnreliableResponses, { type: T }>) => void
}
// TODO: Should eventually be replaced with native EventTarget event system,
// as it manages events in a more familiar way to other developers.
export interface Subscription<T extends ModelTypes> {
event: T
callback: (
@ -827,11 +823,7 @@ export type CommandLog =
data: null
}
export enum EngineCommandManagerEvents {
EngineAvailable = 'engine-available',
}
export class EngineCommandManager extends EventTarget {
export class EngineCommandManager {
artifactMap: ArtifactMap = {}
lastArtifactMap: ArtifactMap = {}
sceneCommandArtifacts: ArtifactMap = {}
@ -865,9 +857,10 @@ export class EngineCommandManager extends EventTarget {
}
} = {} as any
constructor() {
super()
callbacksEngineStateConnection: ((state: EngineConnectionState) => void)[] =
[]
constructor() {
this.engineConnection = undefined
;(async () => {
// circular dependency needs one to be lazy loaded
@ -908,17 +901,12 @@ export class EngineCommandManager extends EventTarget {
this.engineConnection = new EngineConnection({
url,
token,
})
this.dispatchEvent(
new CustomEvent(EngineCommandManagerEvents.EngineAvailable, {
detail: this.engineConnection,
})
)
this.engineConnection.addEventListener(
EngineConnectionEvents.Opened,
() => {
onConnectionStateChange: (state: EngineConnectionState) => {
for (let cb of this.callbacksEngineStateConnection) {
cb(state)
}
},
onEngineConnectionOpen: () => {
// Make the axis gizmo.
// We do this after the connection opened to avoid a race condition.
// Connected opened is the last thing that happens when the stream
@ -953,98 +941,78 @@ export class EngineCommandManager extends EventTarget {
setIsStreamReady(true)
executeCode(undefined, true)
})
}
)
this.engineConnection.addEventListener(
EngineConnectionEvents.Closed,
() => {
},
onClose: () => {
setIsStreamReady(false)
}
)
},
onConnectionStarted: (engineConnection) => {
engineConnection?.pc?.addEventListener('datachannel', (event) => {
let unreliableDataChannel = event.channel
this.engineConnection.addEventListener(
EngineConnectionEvents.ConnectionStarted,
(({ detail: engineConnection }: CustomEvent) => {
engineConnection?.pc?.addEventListener(
'datachannel',
(event: RTCDataChannelEvent) => {
let unreliableDataChannel = event.channel
unreliableDataChannel.addEventListener(
'message',
(event: MessageEvent) => {
const result: UnreliableResponses = JSON.parse(event.data)
Object.values(
this.unreliableSubscriptions[result.type] || {}
).forEach(
// TODO: There is only one response that uses the unreliable channel atm,
// highlight_set_entity, if there are more it's likely they will all have the same
// sequence logic, but I'm not sure if we use a single global sequence or a sequence
// per unreliable subscription.
(callback) => {
if (
result?.data?.sequence &&
result?.data.sequence > this.inSequence &&
result.type === 'highlight_set_entity'
) {
this.inSequence = result.data.sequence
callback(result)
}
}
)
unreliableDataChannel.addEventListener('message', (event) => {
const result: UnreliableResponses = JSON.parse(event.data)
Object.values(
this.unreliableSubscriptions[result.type] || {}
).forEach(
// TODO: There is only one response that uses the unreliable channel atm,
// highlight_set_entity, if there are more it's likely they will all have the same
// sequence logic, but I'm not sure if we use a single global sequence or a sequence
// per unreliable subscription.
(callback) => {
if (
result?.data?.sequence &&
result?.data.sequence > this.inSequence &&
result.type === 'highlight_set_entity'
) {
this.inSequence = result.data.sequence
callback(result)
}
}
)
}
)
})
})
// When the EngineConnection starts a connection, we want to register
// callbacks into the WebSocket/PeerConnection.
engineConnection.websocket?.addEventListener(
'message',
(event: MessageEvent) => {
if (event.data instanceof ArrayBuffer) {
// If the data is an ArrayBuffer, it's the result of an export command,
// because in all other cases we send JSON strings. But in the case of
// export we send a binary blob.
// Pass this to our export function.
void exportSave(event.data)
} else {
const message: Models['WebSocketResponse_type'] = JSON.parse(
event.data
)
if (
message.success &&
message.resp.type === 'modeling' &&
message.request_id
) {
this.handleModelingCommand(message.resp, message.request_id)
} else if (
!message.success &&
message.request_id &&
this.artifactMap[message.request_id]
) {
this.handleFailedModelingCommand(message)
}
engineConnection.websocket?.addEventListener('message', (event) => {
if (event.data instanceof ArrayBuffer) {
// If the data is an ArrayBuffer, it's the result of an export command,
// because in all other cases we send JSON strings. But in the case of
// export we send a binary blob.
// Pass this to our export function.
void exportSave(event.data)
} else {
const message: Models['WebSocketResponse_type'] = JSON.parse(
event.data
)
if (
message.success &&
message.resp.type === 'modeling' &&
message.request_id
) {
this.handleModelingCommand(message.resp, message.request_id)
} else if (
!message.success &&
message.request_id &&
this.artifactMap[message.request_id]
) {
this.handleFailedModelingCommand(message)
}
}
)
}) as EventListener
)
})
},
onNewTrack: ({ mediaStream }) => {
console.log('received track', mediaStream)
this.engineConnection.addEventListener(EngineConnectionEvents.NewTrack, (({
detail: { mediaStream },
}: CustomEvent) => {
console.log('received track', mediaStream)
mediaStream.getVideoTracks()[0].addEventListener('mute', () => {
console.log('peer is not sending video to us')
// this.engineConnection?.close()
// this.engineConnection?.connect()
})
mediaStream.getVideoTracks()[0].addEventListener('mute', () => {
console.log('peer is not sending video to us')
// this.engineConnection?.close()
// this.engineConnection?.connect()
})
setMediaStream(mediaStream)
}) as EventListener)
setMediaStream(mediaStream)
},
})
this.engineConnection?.connect()
}
@ -1234,6 +1202,9 @@ export class EngineCommandManager extends EventTarget {
) {
delete this.unreliableSubscriptions[event][id]
}
onConnectionStateChange(callback: (state: EngineConnectionState) => void) {
this.callbacksEngineStateConnection.push(callback)
}
endSession() {
// TODO: instead of sending a single command with `object_ids: Object.keys(this.artifactMap)`
// we need to loop over them each individually because if the engine doesn't recognise a single