Build out EngineConnection's retry and timeout logic (#299)

* Build out EngineConnection's retry and timeout logic

 * Migrate the EngineConnection to be an EventTarget that other parts of the
   code (mostly the EngineCommandManager, but maybe others?) can listen to,
   rather than exposing a one-shot 'waitForReady' promise, and remove
   callbacks in favor of event listeners.

 * When a WebRTC connection is online, send a 'ping' command every 10 seconds.
   The UDP stream likely needs something similar, but the connection is
   maintained by the WebRTC video stream for now.

 * Begin to migrate code to use a more generic object "send" helper
   which can handle the JSON encoding, as well as connection retry logic
   in the future.

 * Add a watchdog that fires 5 seconds after a connection is initiated and
   cancels and retries the connection if it has not become ready by then.
   This won't watch an established connection yet.

Signed-off-by: Paul R. Tagliamonte <paul@kittycad.io>
This commit is contained in:
Paul Tagliamonte
2023-08-21 16:53:31 -04:00
committed by GitHub
parent 6809a46b6a
commit 75bb91c7e1

View File

@ -32,56 +32,82 @@ interface CursorSelectionsArgs {
idBasedSelections: { type: string; id: string }[]
}
// NewTrackArgs is the `detail` payload carried by the CustomEvent that an
// EngineConnection dispatches for EngineConnectionEvents.NewTrack.
interface NewTrackArgs {
// The EngineConnection that dispatched the event.
conn: EngineConnection
// The first stream (`event.streams[0]`) of the peer connection's 'track' event.
mediaStream: MediaStream
}
export type EngineCommand = Models['WebSocketMessages_type']
type OkResponse = Models['OkModelingCmdResponse_type']
type WebSocketResponse = Models['WebSocketResponses_type']
// EngineConnectionEvents names the events an EngineConnection dispatches on
// itself (it is an EventTarget). Each is delivered as a CustomEvent; unless
// noted otherwise, `detail` is the EngineConnection instance.
enum EngineConnectionEvents {
// Dispatched by connect() after it has attached its WebSocket and
// RTCPeerConnection listeners.
ConnectionStarted = 'connectionStarted',
// Dispatched when the underlying WebSocket fires 'open'.
WebsocketOpen = 'websocketOpen',
// Dispatched when the RTCPeerConnection fires 'track'; `detail` is
// `{ conn, mediaStream }`.
NewTrack = 'newTrack',
// Dispatched when the lossy data channel fires 'open'.
DataChannelOpen = 'dataChannelOpen',
// Dispatched immediately after DataChannelOpen; the connection marks itself
// ready in response to this event.
Open = 'open',
// Dispatched by close(); the connection marks itself not ready in response.
Close = 'close',
}
// EngineConnection encapsulates the connection(s) to the Engine
// for the EngineCommandManager; namely, the underlying WebSocket
// and WebRTC connections.
export class EngineConnection {
export class EngineConnection extends EventTarget {
websocket?: WebSocket
pc?: RTCPeerConnection
lossyDataChannel?: RTCDataChannel
onConnectionStarted: (conn: EngineConnection) => void = () => {}
waitForReady: Promise<void> = new Promise(() => {})
private resolveReady = () => {}
private ready: boolean
readonly url: string
private readonly token?: string
constructor({
url,
token,
onConnectionStarted,
}: {
url: string
token?: string
onConnectionStarted: (conn: EngineConnection) => void
}) {
constructor({ url, token }: { url: string; token?: string }) {
super()
this.url = url
this.token = token
this.onConnectionStarted = onConnectionStarted
this.ready = false
// TODO(paultag): This isn't right; this should be when the
// connection is in a good place, and tied to the connect() method,
// but this is part of a larger refactor to untangle logic. Once the
// Connection is pulled apart, we can rework how ready is represented.
// This was just the easiest way to ensure some level of parity between
// the CommandManager and the Connection until I send a rework for
// retry logic.
this.waitForReady = new Promise((resolve) => {
this.resolveReady = resolve
this.addEventListener(EngineConnectionEvents.Open, () => {
this.ready = true
})
this.addEventListener(EngineConnectionEvents.Close, () => {
this.ready = false
})
// TODO(paultag): This ought to be tweakable.
const pingIntervalMs = 10000
setInterval(() => {
if (this.isReady()) {
// When we're online, every 10 seconds, we'll attempt to put a 'ping'
// command through the WebSocket connection. This will help both ends
// of the connection maintain the TCP connection without hitting a
// timeout condition.
this.send({ type: 'ping' })
}
}, pingIntervalMs)
}
// isReady will return true only when the WebRTC *and* WebSocket connection
// are connected. During setup, the WebSocket connection comes online first,
// which is used to establish the WebRTC connection. The EngineConnection
// is not "Ready" until both are connected.
//
// The underlying flag is flipped by this connection's own listeners: it
// becomes true on EngineConnectionEvents.Open (dispatched once the lossy data
// channel opens) and false again on EngineConnectionEvents.Close.
isReady() {
return this.ready
}
// connect will attempt to connect to the Engine over a WebSocket, and
// establish the WebRTC connections.
//
// This will attempt the full handshake, and retry if the connection
// did not establish.
connect() {
this.websocket = new WebSocket(this.url, [])
// TODO(paultag): make this safe to call multiple times, and figure out
// when a connection is in progress (state: connecting or something).
this.websocket = new WebSocket(this.url, [])
this.websocket.binaryType = 'arraybuffer'
this.pc = new RTCPeerConnection()
@ -89,18 +115,26 @@ export class EngineConnection {
this.websocket.addEventListener('open', (event) => {
console.log('Connected to websocket, waiting for ICE servers')
if (this.token) {
this.websocket?.send(
JSON.stringify({ headers: { Authorization: `Bearer ${this.token}` } })
)
this.send({ headers: { Authorization: `Bearer ${this.token}` } })
}
})
this.websocket.addEventListener('open', (event) => {
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.WebsocketOpen, {
detail: this,
})
)
})
this.websocket.addEventListener('close', (event) => {
console.log('websocket connection closed', event)
this.close()
})
this.websocket.addEventListener('error', (event) => {
console.log('websocket connection error', event)
this.close()
})
this.websocket.addEventListener('message', (event) => {
@ -115,18 +149,29 @@ export class EngineConnection {
return
}
if (event.data.toLocaleLowerCase().startsWith('error')) {
console.error('something went wrong: ', event.data)
return
}
const message: WebSocketResponse = JSON.parse(event.data)
if (
message.type === 'sdp_answer' &&
message.answer.type !== 'unspecified'
) {
this.pc?.setRemoteDescription(
new RTCSessionDescription({
type: message.answer.type,
sdp: message.answer.sdp,
})
)
if (this.pc?.signalingState !== 'stable') {
// If the connection is stable, we shouldn't bother updating the
// SDP, since we have a stable connection to the backend. If we
// need to renegotiate, the whole PeerConnection needs to get
// torn down.
this.pc?.setRemoteDescription(
new RTCSessionDescription({
type: message.answer.type,
sdp: message.answer.sdp,
})
)
}
} else if (message.type === 'trickle_ice') {
this.pc?.addIceCandidate(message.candidate as RTCIceCandidateInit)
} else if (message.type === 'ice_server_info' && this.pc) {
@ -152,29 +197,27 @@ export class EngineConnection {
// until the end of this function is setup of our end of the
// PeerConnection and waiting for events to fire our callbacks.
this.pc.addEventListener('connectionstatechange', (e) =>
console.log(this.pc?.iceConnectionState)
)
this.pc.addEventListener('connectionstatechange', (event) => {
// if (this.pc?.iceConnectionState === 'disconnected') {
// this.close()
// }
})
this.pc.addEventListener('icecandidate', (event) => {
if (!this.pc || !this.websocket) return
if (event.candidate === null) {
console.log('sent sdp_offer')
this.websocket.send(
JSON.stringify({
type: 'sdp_offer',
offer: this.pc.localDescription,
})
)
this.send({
type: 'sdp_offer',
offer: this.pc.localDescription,
})
} else {
console.log('sending trickle ice candidate')
const { candidate } = event
this.websocket?.send(
JSON.stringify({
type: 'trickle_ice',
candidate: candidate.toJSON(),
})
)
this.send({
type: 'trickle_ice',
candidate: candidate.toJSON(),
})
}
})
@ -189,14 +232,40 @@ export class EngineConnection {
.then(async (descriptionInit) => {
await this?.pc?.setLocalDescription(descriptionInit)
console.log('sent sdp_offer begin')
const msg = JSON.stringify({
this.send({
type: 'sdp_offer',
offer: this.pc?.localDescription,
})
this.websocket?.send(msg)
})
.catch(console.log)
}
// TODO(paultag): This ought to be both controllable, as well as something
// like exponential backoff to have some grace on the backend, as well as
// fix responsiveness for clients that had a weird network hiccup.
const connectionTimeoutMs = 5000
setTimeout(() => {
if (this.isReady()) {
return
}
console.log('engine connection timeout on connection, retrying')
this.close()
this.connect()
}, connectionTimeoutMs)
})
this.pc.addEventListener('track', (event) => {
console.log('received track', event)
const mediaStream = event.streams[0]
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.NewTrack, {
detail: {
conn: this,
mediaStream: mediaStream,
},
})
)
})
this.pc.addEventListener('datachannel', (event) => {
@ -204,25 +273,53 @@ export class EngineConnection {
console.log('accepted lossy data channel', event.channel.label)
this.lossyDataChannel.addEventListener('open', (event) => {
this.resolveReady()
console.log('lossy data channel opened', event)
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.DataChannelOpen, {
detail: this,
})
)
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.Open, {
detail: this,
})
)
})
this.lossyDataChannel.addEventListener('close', (event) => {
console.log('lossy data channel closed')
this.close()
})
this.lossyDataChannel.addEventListener('error', (event) => {
console.log('lossy data channel error')
this.close()
})
})
if (this.onConnectionStarted) this.onConnectionStarted(this)
this.dispatchEvent(
new CustomEvent(EngineConnectionEvents.ConnectionStarted, {
detail: this,
})
)
}
// send JSON-encodes `message` and writes it to the WebSocket. When no
// WebSocket has been created yet this is a no-op, and the message is not
// even serialized.
send(message: object) {
  // TODO(paultag): Add in logic to determine the connection state and
  // take actions if needed?
  const socket = this.websocket
  if (socket == null) {
    return
  }
  const encoded = JSON.stringify(message)
  socket.send(encoded)
}
// close shuts down the WebSocket, the RTCPeerConnection and the lossy data
// channel (whichever of them exist), then announces the shutdown by
// dispatching EngineConnectionEvents.Close with this connection as `detail`.
// The event is dispatched on every call, including repeated calls.
close() {
  if (this.websocket) {
    this.websocket.close()
  }
  if (this.pc) {
    this.pc.close()
  }
  if (this.lossyDataChannel) {
    this.lossyDataChannel.close()
  }

  const closed = new CustomEvent(EngineConnectionEvents.Close, {
    detail: this,
  })
  this.dispatchEvent(closed)
}
}
@ -258,15 +355,32 @@ export class EngineCommandManager {
this.engineConnection = new EngineConnection({
url,
token,
onConnectionStarted: (conn) => {
this.engineConnection?.pc?.addEventListener('track', (event) => {
console.log('received track', event)
const mediaStream = event.streams[0]
setMediaStream(mediaStream)
})
})
this.engineConnection.addEventListener(
EngineConnectionEvents.Open,
(event) => {
this.resolveReady()
setIsStreamReady(true)
}
)
this.engineConnection.addEventListener(
EngineConnectionEvents.Close,
(event) => {
setIsStreamReady(false)
}
)
this.engineConnection.addEventListener(
EngineConnectionEvents.ConnectionStarted,
(event: Event) => {
let customEvent = <CustomEvent<EngineConnection>>event
let conn = customEvent.detail
this.engineConnection?.pc?.addEventListener('datachannel', (event) => {
let lossyDataChannel = event.channel
lossyDataChannel.addEventListener('message', (event) => {
const result: OkResponse = JSON.parse(event.data)
if (
@ -289,66 +403,72 @@ export class EngineCommandManager {
// export we send a binary blob.
// Pass this to our export function.
exportSave(event.data)
} else if (
typeof event.data === 'string' &&
event.data.toLocaleLowerCase().startsWith('error')
) {
console.warn('something went wrong: ', event.data)
} else {
if (event.data.toLocaleLowerCase().startsWith('error')) {
// Errors are not JSON encoded; if we have an error we can bail
// here; debugging the error to the console happens in the core
// engine code.
return
}
const message: WebSocketResponse = JSON.parse(event.data)
if (message.type === 'modeling') {
const id = message.cmd_id
const command = this.artifactMap[id]
if ('ok' in message.result) {
const result: OkResponse = message.result.ok
if (result.type === 'select_with_point') {
if (result.entity_id) {
this.onClickCallback({
id: result.entity_id,
type: 'default',
})
} else {
this.onClickCallback()
}
}
}
if (command && command.type === 'pending') {
const resolve = command.resolve
this.artifactMap[id] = {
type: 'result',
data: message.result,
}
resolve({
id,
})
} else {
this.artifactMap[id] = {
type: 'result',
data: message.result,
}
}
this.handleModelingCommand(message)
}
}
})
},
})
// TODO(paultag): this isn't quite right, and the double promises is
// pretty grim.
this.engineConnection?.waitForReady.then(this.resolveReady)
this.waitForReady.then(() => {
setIsStreamReady(true)
})
}
)
this.engineConnection.addEventListener(
EngineConnectionEvents.NewTrack,
(event: Event) => {
let customEvent = <CustomEvent<NewTrackArgs>>event
let mediaStream = customEvent.detail.mediaStream
console.log('received track', mediaStream)
setMediaStream(mediaStream)
}
)
this.engineConnection?.connect()
}
// handleModelingCommand processes a 'modeling' response; any other message
// type is ignored. A successful 'select_with_point' result is forwarded to
// onClickCallback (with the selected entity, or with no argument when nothing
// was hit). The response is then stored in artifactMap under its cmd_id, and
// if that entry was still pending its resolver is called with `{ id }`.
handleModelingCommand(message: WebSocketResponse) {
  if (message.type !== 'modeling') {
    return
  }

  const id = message.cmd_id
  const outcome = message.result
  const existing = this.artifactMap[id]

  if ('ok' in outcome) {
    const ok: OkResponse = outcome.ok
    if (ok.type === 'select_with_point') {
      if (ok.entity_id) {
        this.onClickCallback({
          id: ok.entity_id,
          type: 'default',
        })
      } else {
        this.onClickCallback()
      }
    }
  }

  // Grab the resolver (if the command was pending) before the map entry is
  // overwritten with the final result.
  const notify = existing?.type === 'pending' ? existing.resolve : null

  this.artifactMap[id] = {
    type: 'result',
    data: outcome,
  }

  if (notify !== null) {
    notify({
      id,
    })
  }
}
// tearDown closes the engine connection, if one has been created; closing it
// shuts down all of its channels, sockets and WebRTC connections.
tearDown() {
  const connection = this.engineConnection
  if (connection) {
    connection.close()
  }
}
startNewSession() {
this.artifactMap = {}
this.sourceRangeMap = {}
@ -372,8 +492,8 @@ export class EngineCommandManager {
otherSelections: Selections['otherSelections']
idBasedSelections: { type: string; id: string }[]
}) {
if (this.engineConnection?.websocket?.readyState === 0) {
console.log('socket not open')
if (!this.engineConnection?.isReady()) {
console.log('engine connection isnt ready')
return
}
this.sendSceneCommand({
@ -393,7 +513,7 @@ export class EngineCommandManager {
})
}
sendSceneCommand(command: EngineCommand) {
if (this.engineConnection?.websocket?.readyState === 0) {
if (!this.engineConnection?.isReady()) {
console.log('socket not ready')
return
}
@ -417,7 +537,7 @@ export class EngineCommandManager {
return
}
console.log('sending command', command)
this.engineConnection?.websocket?.send(JSON.stringify(command))
this.engineConnection?.send(command)
}
sendModellingCommand({
id,
@ -432,11 +552,11 @@ export class EngineCommandManager {
}): Promise<any> {
this.sourceRangeMap[id] = range
if (this.engineConnection?.websocket?.readyState === 0) {
if (!this.engineConnection?.isReady()) {
console.log('socket not ready')
return new Promise(() => {})
}
this.engineConnection?.websocket?.send(JSON.stringify(command))
this.engineConnection?.send(command)
let resolve: (val: any) => void = () => {}
const promise = new Promise((_resolve, reject) => {
resolve = _resolve