Rework initial engine connection logic (#1205) (#1221)

Rework EngineConnection class (#1205)

Co-authored-by: lf94 <inbox@leefallat.ca>
This commit is contained in:
49fl
2024-01-23 13:13:43 -05:00
committed by GitHub
parent e04b09fcd8
commit 69f72d62e0

View File

@ -48,6 +48,72 @@ type Timeout = ReturnType<typeof setTimeout>
type ClientMetrics = Models['ClientMetrics_type']
type Value<T, U> = U extends undefined
? { type: T; value: U }
: U extends void
? { type: T }
: { type: T; value: U }
type State<T, U> = Value<T, U>
enum EngineConnectionStateType {
Fresh = 'fresh',
Connecting = 'connecting',
ConnectionEstablished = 'connection-established',
Disconnected = 'disconnected',
}
enum DisconnectedType {
Error = 'error',
Timeout = 'timeout',
Quit = 'quit',
}
type DisconnectedValue =
| State<DisconnectedType.Error, Error | undefined>
| State<DisconnectedType.Timeout, void>
| State<DisconnectedType.Quit, void>
// These are ordered by the expected sequence.
enum ConnectingType {
WebSocketConnecting = 'websocket-connecting',
WebSocketEstablished = 'websocket-established',
PeerConnectionCreated = 'peer-connection-created',
ICEServersSet = 'ice-servers-set',
SetLocalDescription = 'set-local-description',
OfferedSdp = 'offered-sdp',
ReceivedSdp = 'received-sdp',
SetRemoteDescription = 'set-remote-description',
WebRTCConnecting = 'webrtc-connecting',
ICECandidateReceived = 'ice-candidate-received',
TrackReceived = 'track-received',
DataChannelRequested = 'data-channel-requested',
DataChannelConnecting = 'data-channel-connecting',
DataChannelEstablished = 'data-channel-established',
}
type ConnectingValue =
| State<ConnectingType.WebSocketConnecting, void>
| State<ConnectingType.WebSocketEstablished, void>
| State<ConnectingType.PeerConnectionCreated, void>
| State<ConnectingType.ICEServersSet, void>
| State<ConnectingType.SetLocalDescription, void>
| State<ConnectingType.OfferedSdp, void>
| State<ConnectingType.ReceivedSdp, void>
| State<ConnectingType.SetRemoteDescription, void>
| State<ConnectingType.WebRTCConnecting, void>
| State<ConnectingType.TrackReceived, void>
| State<ConnectingType.ICECandidateReceived, void>
| State<ConnectingType.DataChannelRequested, string>
| State<ConnectingType.DataChannelConnecting, string>
| State<ConnectingType.DataChannelEstablished, void>
type EngineConnectionState =
| State<EngineConnectionStateType.Fresh, void>
| State<EngineConnectionStateType.Connecting, ConnectingValue>
| State<EngineConnectionStateType.ConnectionEstablished, void>
| State<EngineConnectionStateType.Disconnected, DisconnectedValue>
// EngineConnection encapsulates the connection(s) to the Engine
// for the EngineCommandManager; namely, the underlying WebSocket
// and WebRTC connections.
@ -55,10 +121,28 @@ class EngineConnection {
websocket?: WebSocket
pc?: RTCPeerConnection
unreliableDataChannel?: RTCDataChannel
mediaStream?: MediaStream
private _state: EngineConnectionState = {
type: EngineConnectionStateType.Fresh,
}
get state(): EngineConnectionState {
return this._state
}
set state(next: EngineConnectionState) {
console.log(`${JSON.stringify(this.state)}${JSON.stringify(next)}`)
if (next.type === EngineConnectionStateType.Disconnected) {
console.trace()
const sub = next.value
if (sub.type === DisconnectedType.Error) {
console.error(sub.value)
}
}
this._state = next
}
private ready: boolean
private connecting: boolean
private dead: boolean
private failedConnTimeout: Timeout | null
readonly url: string
@ -94,74 +178,77 @@ class EngineConnection {
}) {
this.url = url
this.token = token
this.ready = false
this.connecting = false
this.dead = false
this.failedConnTimeout = null
this.onWebsocketOpen = onWebsocketOpen
this.onDataChannelOpen = onDataChannelOpen
this.onEngineConnectionOpen = onEngineConnectionOpen
this.onConnectionStarted = onConnectionStarted
this.onClose = onClose
this.onNewTrack = onNewTrack
// TODO(paultag): This ought to be tweakable.
const pingIntervalMs = 10000
// Without an interval ping, our connection will timeout.
let pingInterval = setInterval(() => {
if (this.dead) {
clearInterval(pingInterval)
}
if (this.isReady()) {
// When we're online, every 10 seconds, we'll attempt to put a 'ping'
// command through the WebSocket connection. This will help both ends
// of the connection maintain the TCP connection without hitting a
// timeout condition.
this.send({ type: 'ping' })
switch (this.state.type as EngineConnectionStateType) {
case EngineConnectionStateType.ConnectionEstablished:
this.send({ type: 'ping' })
break
case EngineConnectionStateType.Disconnected:
clearInterval(pingInterval)
break
default:
break
}
}, pingIntervalMs)
const connectionTimeoutMs = VITE_KC_CONNECTION_TIMEOUT_MS
let connectInterval = setInterval(() => {
if (this.dead) {
clearInterval(connectInterval)
return
let connectRetryInterval = setInterval(() => {
if (this.state.type !== EngineConnectionStateType.Disconnected) return
switch (this.state.value.type) {
case DisconnectedType.Error:
clearInterval(connectRetryInterval)
break
case DisconnectedType.Timeout:
console.log('Trying to reconnect')
this.connect()
break
default:
break
}
if (this.isReady()) {
return
}
console.log('connecting via retry')
this.connect()
}, connectionTimeoutMs)
}
// isConnecting will return true when connect has been called, but the full
// WebRTC is not online.
isConnecting() {
return this.connecting
return this.state.type === EngineConnectionStateType.Connecting
}
// isReady will return true only when the WebRTC *and* WebSocket connection
// are connected. During setup, the WebSocket connection comes online first,
// which is used to establish the WebRTC connection. The EngineConnection
// is not "Ready" until both are connected.
isReady() {
return this.ready
return this.state.type === EngineConnectionStateType.ConnectionEstablished
}
tearDown() {
this.dead = true
this.close()
this.disconnectAll()
this.state = {
type: EngineConnectionStateType.Disconnected,
value: { type: DisconnectedType.Quit },
}
}
// shouldTrace will return true when Sentry should be used to instrument
// the Engine.
shouldTrace() {
return Sentry.getCurrentHub()?.getClient()?.getOptions()?.sendClientReports
}
// connect will attempt to connect to the Engine over a WebSocket, and
// establish the WebRTC connections.
//
// This will attempt the full handshake, and retry if the connection
// did not establish.
connect() {
console.log('connect was called')
if (this.isConnecting() || this.isReady()) {
return
}
@ -195,71 +282,269 @@ class EngineConnection {
let handshakeSpan: SpanPromise
let iceSpan: SpanPromise
const spanStart = (op: string) =>
new SpanPromise(webrtcMediaTransaction.startChild({ op }))
if (this.shouldTrace()) {
webrtcMediaTransaction = Sentry.startTransaction({
name: 'webrtc-media',
webrtcMediaTransaction = Sentry.startTransaction({ name: 'webrtc-media' })
websocketSpan = spanStart('websocket')
}
const createPeerConnection = () => {
this.pc = new RTCPeerConnection()
// Data channels MUST BE specified before SDP offers because requesting
// them affects what our needs are!
const DATACHANNEL_NAME_UMC = 'unreliable_modeling_cmds'
this.pc.createDataChannel(DATACHANNEL_NAME_UMC)
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.DataChannelRequested,
value: DATACHANNEL_NAME_UMC,
},
}
this.pc.addEventListener('icecandidate', (event) => {
if (event.candidate === null) {
return
}
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.ICECandidateReceived,
},
}
// Request a candidate to use
this.send({
type: 'trickle_ice',
candidate: event.candidate.toJSON(),
})
})
websocketSpan = new SpanPromise(
webrtcMediaTransaction.startChild({ op: 'websocket' })
)
this.pc.addEventListener('icecandidateerror', (_event: Event) => {
const event = _event as RTCPeerConnectionIceErrorEvent
console.warn(
`ICE candidate returned an error: ${event.errorCode}: ${event.errorText} for ${event.url}`
)
})
// https://developer.mozilla.org/en-US/docs/Web/API/RTCPeerConnection/connectionstatechange_event
// Event type: generic Event type...
this.pc.addEventListener('connectionstatechange', (event: any) => {
console.log('connectionstatechange: ' + event.target?.connectionState)
switch (event.target?.connectionState) {
// From what I understand, only after have we done the ICE song and
// dance is it safest to connect the video tracks / stream
case 'connected':
if (this.shouldTrace()) {
iceSpan.resolve?.()
}
// Let the browser attach to the video stream now
this.onNewTrack({ conn: this, mediaStream: this.mediaStream! })
break
case 'failed':
this.disconnectAll()
this.state = {
type: EngineConnectionStateType.Disconnected,
value: {
type: DisconnectedType.Error,
value: new Error(
'failed to negotiate ice connection; restarting'
),
},
}
break
default:
break
}
})
this.pc.addEventListener('track', (event) => {
const mediaStream = event.streams[0]
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.TrackReceived,
},
}
if (this.shouldTrace()) {
let mediaStreamTrack = mediaStream.getVideoTracks()[0]
mediaStreamTrack.addEventListener('unmute', () => {
// let settings = mediaStreamTrack.getSettings()
// mediaTrackSpan.span.setTag("fps", settings.frameRate)
// mediaTrackSpan.span.setTag("width", settings.width)
// mediaTrackSpan.span.setTag("height", settings.height)
mediaTrackSpan.resolve?.()
})
}
this.webrtcStatsCollector = (): Promise<ClientMetrics> => {
return new Promise((resolve, reject) => {
if (mediaStream.getVideoTracks().length !== 1) {
reject(new Error('too many video tracks to report'))
return
}
let videoTrack = mediaStream.getVideoTracks()[0]
void this.pc?.getStats(videoTrack).then((videoTrackStats) => {
let client_metrics: ClientMetrics = {
rtc_frames_decoded: 0,
rtc_frames_dropped: 0,
rtc_frames_received: 0,
rtc_frames_per_second: 0,
rtc_freeze_count: 0,
rtc_jitter_sec: 0.0,
rtc_keyframes_decoded: 0,
rtc_total_freezes_duration_sec: 0.0,
}
// TODO(paultag): Since we can technically have multiple WebRTC
// video tracks (even if the Server doesn't at the moment), we
// ought to send stats for every video track(?), and add the stream
// ID into it. This raises the cardinality of collected metrics
// when/if we do, but for now, just report the one stream.
videoTrackStats.forEach((videoTrackReport) => {
if (videoTrackReport.type === 'inbound-rtp') {
client_metrics.rtc_frames_decoded =
videoTrackReport.framesDecoded || 0
client_metrics.rtc_frames_dropped =
videoTrackReport.framesDropped || 0
client_metrics.rtc_frames_received =
videoTrackReport.framesReceived || 0
client_metrics.rtc_frames_per_second =
videoTrackReport.framesPerSecond || 0
client_metrics.rtc_freeze_count =
videoTrackReport.freezeCount || 0
client_metrics.rtc_jitter_sec = videoTrackReport.jitter || 0.0
client_metrics.rtc_keyframes_decoded =
videoTrackReport.keyFramesDecoded || 0
client_metrics.rtc_total_freezes_duration_sec =
videoTrackReport.totalFreezesDuration || 0
} else if (videoTrackReport.type === 'transport') {
// videoTrackReport.bytesReceived,
// videoTrackReport.bytesSent,
}
})
resolve(client_metrics)
})
})
}
// The app is eager to use the MediaStream; as soon as onNewTrack is
// called, the following sequence happens:
// EngineConnection.onNewTrack -> StoreState.setMediaStream ->
// Stream.tsx reacts to mediaStream change, setting a video element.
// We wait until connectionstatechange changes to "connected"
// to pass it to the rest of the application.
this.mediaStream = mediaStream
})
this.pc.addEventListener('datachannel', (event) => {
this.unreliableDataChannel = event.channel
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.DataChannelConnecting,
value: event.channel.label,
},
}
this.unreliableDataChannel.addEventListener('open', (event) => {
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.DataChannelEstablished,
},
}
if (this.shouldTrace()) {
dataChannelSpan.resolve?.()
}
this.onDataChannelOpen(this)
// Everything is now connected.
this.state = { type: EngineConnectionStateType.ConnectionEstablished }
this.onEngineConnectionOpen(this)
})
this.unreliableDataChannel.addEventListener('close', (event) => {
console.log(event)
console.log('unreliable data channel closed')
this.disconnectAll()
this.unreliableDataChannel = undefined
if (this.areAllConnectionsClosed()) {
this.state = {
type: EngineConnectionStateType.Disconnected,
value: { type: DisconnectedType.Quit },
}
}
})
this.unreliableDataChannel.addEventListener('error', (event) => {
this.disconnectAll()
this.state = {
type: EngineConnectionStateType.Disconnected,
value: {
type: DisconnectedType.Error,
value: new Error(event.toString()),
},
}
})
})
}
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.WebSocketConnecting,
},
}
this.websocket = new WebSocket(this.url, [])
this.websocket.binaryType = 'arraybuffer'
this.pc = new RTCPeerConnection()
this.pc.createDataChannel('unreliable_modeling_cmds')
this.websocket.addEventListener('open', (event) => {
console.log('Connected to websocket, waiting for ICE servers')
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.WebSocketEstablished,
},
}
this.onWebsocketOpen(this)
// This is required for when KCMA is running stand-alone / within Tauri.
// Otherwise when run in a browser, the token is sent implicitly via
// the Cookie header.
if (this.token) {
this.send({ headers: { Authorization: `Bearer ${this.token}` } })
}
})
this.pc.addEventListener('icecandidateerror', (_event) => {
const event = _event as RTCPeerConnectionIceErrorEvent
console.error(
`ICE candidate returned an error: ${event.errorCode}: ${event.errorText} for ${event.url}`
)
})
this.pc.addEventListener('connectionstatechange', (event) => {
if (this.pc?.iceConnectionState === 'connected') {
if (this.shouldTrace()) {
iceSpan.resolve?.()
}
} else if (this.pc?.iceConnectionState === 'failed') {
// failed is a terminal state; let's explicitly kill the
// connection to the server at this point.
console.log('failed to negotiate ice connection; restarting')
this.close()
}
})
this.websocket.addEventListener('open', (event) => {
if (this.shouldTrace()) {
websocketSpan.resolve?.()
handshakeSpan = new SpanPromise(
webrtcMediaTransaction.startChild({ op: 'handshake' })
)
iceSpan = new SpanPromise(
webrtcMediaTransaction.startChild({ op: 'ice' })
)
dataChannelSpan = new SpanPromise(
webrtcMediaTransaction.startChild({
op: 'data-channel',
})
)
mediaTrackSpan = new SpanPromise(
webrtcMediaTransaction.startChild({
op: 'media-track',
})
)
handshakeSpan = spanStart('handshake')
iceSpan = spanStart('ice')
dataChannelSpan = spanStart('data-channel')
mediaTrackSpan = spanStart('media-track')
}
if (this.shouldTrace()) {
Promise.all([
void Promise.all([
handshakeSpan.promise,
iceSpan.promise,
dataChannelSpan.promise,
@ -269,18 +554,30 @@ class EngineConnection {
webrtcMediaTransaction?.finish()
})
}
this.onWebsocketOpen(this)
})
this.websocket.addEventListener('close', (event) => {
console.log('websocket connection closed', event)
this.close()
this.disconnectAll()
this.websocket = undefined
if (this.areAllConnectionsClosed()) {
this.state = {
type: EngineConnectionStateType.Disconnected,
value: { type: DisconnectedType.Quit },
}
}
})
this.websocket.addEventListener('error', (event) => {
console.log('websocket connection error', event)
this.close()
this.disconnectAll()
this.state = {
type: EngineConnectionStateType.Disconnected,
value: {
type: DisconnectedType.Error,
value: new Error(event.toString()),
},
}
})
this.websocket.addEventListener('message', (event) => {
@ -314,28 +611,137 @@ class EngineConnection {
}
let resp = message.resp
if (!resp) {
// If there's no body to the response, we can bail here.
// If there's no body to the response, we can bail here.
// !resp.type is usually "pong" response for our "ping"
if (!resp || !resp.type) {
return
}
if (resp.type === 'sdp_answer') {
let answer = resp.data?.answer
if (!answer || answer.type === 'unspecified') {
return
}
console.log('received', resp)
if (this.pc?.signalingState !== 'stable') {
// If the connection is stable, we shouldn't bother updating the
// SDP, since we have a stable connection to the backend. If we
// need to renegotiate, the whole PeerConnection needs to get
// tore down.
this.pc?.setRemoteDescription(
new RTCSessionDescription({
type: answer.type,
sdp: answer.sdp,
switch (resp.type) {
case 'ice_server_info':
let ice_servers = resp.data?.ice_servers
// Now that we have some ICE servers it makes sense
// to start initializing the RTCPeerConnection. RTCPeerConnection
// will begin the ICE process.
createPeerConnection()
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.PeerConnectionCreated,
},
}
// No ICE servers can be valid in a local dev. env.
if (ice_servers?.length === 0) {
console.warn('No ICE servers')
this.pc?.setConfiguration({})
} else {
// When we set the Configuration, we want to always force
// iceTransportPolicy to 'relay', since we know the topology
// of the ICE/STUN/TUN server and the engine. We don't wish to
// talk to the engine in any configuration /other/ than relay
// from a infra POV.
this.pc?.setConfiguration({
iceServers: ice_servers,
iceTransportPolicy: 'relay',
})
)
}
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.ICEServersSet,
},
}
// We have an ICE Servers set now. We just setConfiguration, so let's
// start adding things we care about to the PeerConnection and let
// ICE negotiation happen in the background. Everything from here
// until the end of this function is setup of our end of the
// PeerConnection and waiting for events to fire our callbacks.
// Add a transceiver to our SDP offer
this.pc?.addTransceiver('video', {
direction: 'recvonly',
})
// Create a session description offer based on our local environment
// that we will send to the remote end. The remote will send back
// what it supports via sdp_answer.
this.pc
?.createOffer()
.then((offer: RTCSessionDescriptionInit) => {
console.log(offer)
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.SetLocalDescription,
},
}
return this.pc?.setLocalDescription(offer).then(() => {
this.send({
type: 'sdp_offer',
offer,
})
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.OfferedSdp,
},
}
})
})
.catch((error: Error) => {
console.error(error)
// The local description is invalid, so there's no point continuing.
this.disconnectAll()
this.state = {
type: EngineConnectionStateType.Disconnected,
value: {
type: DisconnectedType.Error,
value: error,
},
}
})
break
case 'sdp_answer':
let answer = resp.data?.answer
if (!answer || answer.type === 'unspecified') {
return
}
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.ReceivedSdp,
},
}
// As soon as this is set, RTCPeerConnection tries to
// establish a connection.
// @ts-ignore
// Have to ignore because dom.ts doesn't have the right type
void this.pc?.setRemoteDescription(answer)
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.SetRemoteDescription,
},
}
this.state = {
type: EngineConnectionStateType.Connecting,
value: {
type: ConnectingType.WebRTCConnecting,
},
}
if (this.shouldTrace()) {
// When both ends have a local and remote SDP, we've been able to
@ -343,194 +749,46 @@ class EngineConnection {
// servers, but this is hand-shook.
handshakeSpan.resolve?.()
}
}
} else if (resp.type === 'trickle_ice') {
let candidate = resp.data?.candidate
this.pc?.addIceCandidate(candidate as RTCIceCandidateInit)
} else if (resp.type === 'ice_server_info' && this.pc) {
console.log('received ice_server_info')
let ice_servers = resp.data?.ice_servers
break
if (ice_servers?.length > 0) {
// When we set the Configuration, we want to always force
// iceTransportPolicy to 'relay', since we know the topology
// of the ICE/STUN/TUN server and the engine. We don't wish to
// talk to the engine in any configuration /other/ than relay
// from a infra POV.
this.pc.setConfiguration({
iceServers: ice_servers,
iceTransportPolicy: 'relay',
})
} else {
this.pc?.setConfiguration({})
}
case 'trickle_ice':
let candidate = resp.data?.candidate
console.log('trickle_ice: using this candidate: ', candidate)
void this.pc?.addIceCandidate(candidate as RTCIceCandidateInit)
break
// We have an ICE Servers set now. We just setConfiguration, so let's
// start adding things we care about to the PeerConnection and let
// ICE negotiation happen in the background. Everything from here
// until the end of this function is setup of our end of the
// PeerConnection and waiting for events to fire our callbacks.
this.pc.addEventListener('icecandidate', (event) => {
if (!this.pc || !this.websocket) return
if (event.candidate !== null) {
console.log('sending trickle ice candidate')
const { candidate } = event
this.send({
type: 'trickle_ice',
candidate: candidate.toJSON(),
})
}
})
// Offer to receive 1 video track
this.pc.addTransceiver('video', {})
// Finally (but actually firstly!), to kick things off, we're going to
// generate our SDP, set it on our PeerConnection, and let the server
// know about our capabilities.
this.pc
.createOffer()
.then(async (descriptionInit) => {
await this?.pc?.setLocalDescription(descriptionInit)
console.log('sent sdp_offer begin')
this.send({
type: 'sdp_offer',
offer: this.pc?.localDescription,
})
})
.catch(console.log)
} else if (resp.type === 'metrics_request') {
if (this.webrtcStatsCollector === undefined) {
// TODO: Error message here?
return
}
this.webrtcStatsCollector().then((client_metrics) => {
this.send({
type: 'metrics_response',
metrics: client_metrics,
})
})
}
})
this.pc.addEventListener('track', (event) => {
const mediaStream = event.streams[0]
if (this.shouldTrace()) {
let mediaStreamTrack = mediaStream.getVideoTracks()[0]
mediaStreamTrack.addEventListener('unmute', () => {
// let settings = mediaStreamTrack.getSettings()
// mediaTrackSpan.span.setTag("fps", settings.frameRate)
// mediaTrackSpan.span.setTag("width", settings.width)
// mediaTrackSpan.span.setTag("height", settings.height)
mediaTrackSpan.resolve?.()
})
}
this.webrtcStatsCollector = (): Promise<ClientMetrics> => {
return new Promise((resolve, reject) => {
if (mediaStream.getVideoTracks().length !== 1) {
reject(new Error('too many video tracks to report'))
case 'metrics_request':
if (this.webrtcStatsCollector === undefined) {
// TODO: Error message here?
return
}
let videoTrack = mediaStream.getVideoTracks()[0]
this.pc?.getStats(videoTrack).then((videoTrackStats) => {
let client_metrics: ClientMetrics = {
rtc_frames_decoded: 0,
rtc_frames_dropped: 0,
rtc_frames_received: 0,
rtc_frames_per_second: 0,
rtc_freeze_count: 0,
rtc_jitter_sec: 0.0,
rtc_keyframes_decoded: 0,
rtc_total_freezes_duration_sec: 0.0,
}
// TODO(paultag): Since we can technically have multiple WebRTC
// video tracks (even if the Server doesn't at the moment), we
// ought to send stats for every video track(?), and add the stream
// ID into it. This raises the cardinality of collected metrics
// when/if we do, but for now, just report the one stream.
videoTrackStats.forEach((videoTrackReport) => {
if (videoTrackReport.type === 'inbound-rtp') {
client_metrics.rtc_frames_decoded =
videoTrackReport.framesDecoded || 0
client_metrics.rtc_frames_dropped =
videoTrackReport.framesDropped || 0
client_metrics.rtc_frames_received =
videoTrackReport.framesReceived || 0
client_metrics.rtc_frames_per_second =
videoTrackReport.framesPerSecond || 0
client_metrics.rtc_freeze_count =
videoTrackReport.freezeCount || 0
client_metrics.rtc_jitter_sec = videoTrackReport.jitter || 0.0
client_metrics.rtc_keyframes_decoded =
videoTrackReport.keyFramesDecoded || 0
client_metrics.rtc_total_freezes_duration_sec =
videoTrackReport.totalFreezesDuration || 0
} else if (videoTrackReport.type === 'transport') {
// videoTrackReport.bytesReceived,
// videoTrackReport.bytesSent,
}
void this.webrtcStatsCollector().then((client_metrics) => {
this.send({
type: 'metrics_response',
metrics: client_metrics,
})
resolve(client_metrics)
})
})
break
}
this.onNewTrack({
conn: this,
mediaStream: mediaStream,
})
})
this.pc.addEventListener('datachannel', (event) => {
this.unreliableDataChannel = event.channel
console.log('accepted unreliable data channel', event.channel.label)
this.unreliableDataChannel.addEventListener('open', (event) => {
console.log('unreliable data channel opened', event)
if (this.shouldTrace()) {
dataChannelSpan.resolve?.()
}
this.onDataChannelOpen(this)
this.ready = true
this.connecting = false
// Do this after we set the connection is ready to avoid errors when
// we try to send messages before the connection is ready.
this.onEngineConnectionOpen(this)
})
this.unreliableDataChannel.addEventListener('close', (event) => {
console.log('unreliable data channel closed')
this.close()
})
this.unreliableDataChannel.addEventListener('error', (event) => {
console.log('unreliable data channel error')
this.close()
})
})
const connectionTimeoutMs = VITE_KC_CONNECTION_TIMEOUT_MS
if (this.failedConnTimeout) {
console.log('clearing timeout before set')
clearTimeout(this.failedConnTimeout)
this.failedConnTimeout = null
}
console.log('timeout set')
this.failedConnTimeout = setTimeout(() => {
if (this.isReady()) {
return
}
console.log('engine connection timeout on connection, closing')
this.close()
this.failedConnTimeout = null
this.disconnectAll()
this.state = {
type: EngineConnectionStateType.Disconnected,
value: {
type: DisconnectedType.Timeout,
},
}
}, connectionTimeoutMs)
this.onConnectionStarted(this)
@ -549,23 +807,15 @@ class EngineConnection {
typeof message === 'string' ? message : JSON.stringify(message)
)
}
close() {
disconnectAll() {
this.websocket?.close()
this.pc?.close()
this.unreliableDataChannel?.close()
this.websocket = undefined
this.pc = undefined
this.unreliableDataChannel = undefined
this.pc?.close()
this.webrtcStatsCollector = undefined
if (this.failedConnTimeout) {
console.log('closed timeout in close')
clearTimeout(this.failedConnTimeout)
this.failedConnTimeout = null
}
this.onClose(this)
this.ready = false
this.connecting = false
}
areAllConnectionsClosed() {
console.log(this.websocket, this.pc, this.unreliableDataChannel)
return !this.websocket && !this.pc && !this.unreliableDataChannel
}
}
@ -685,7 +935,7 @@ export class EngineCommandManager {
// We also do this here because we want to ensure we create the gizmo
// and execute the code everytime the stream is restarted.
const gizmoId = uuidv4()
this.sendSceneCommand({
void this.sendSceneCommand({
type: 'modeling_cmd_req',
cmd_id: gizmoId,
cmd: {
@ -698,7 +948,7 @@ export class EngineCommandManager {
})
// Initialize the planes.
this.initPlanes().then(() => {
void this.initPlanes().then(() => {
// We execute the code here to make sure if the stream was to
// restart in a session, we want to make sure to execute the code.
// We force it to re-execute the code because we want to make sure
@ -745,7 +995,7 @@ export class EngineCommandManager {
// because in all other cases we send JSON strings. But in the case of
// export we send a binary blob.
// Pass this to our export function.
exportSave(event.data)
void exportSave(event.data)
} else {
const message: Models['WebSocketResponse_type'] = JSON.parse(
event.data