Start a refactor of the connection to the Engine (#284)

The front-end and the back-end communicate with three channels.  The
first is the WebSocket connection to the Engine API. Once that
connection is online, a WebRTC connection is negotiated, which contains
one video stream from the server to us for the GUI, and a second, which
is a binary data channel from us to the server, which we send JSON over
for real-time events like mouse positioning.

The lifecycle of the WebRTC connection and the WebSocket connection are
tied, since if the WebSocket connection breaks down, the WebRTC
connection must get restarted (to get a connection to the *same* backend
that we have an open WebSocket connection to).

This starts a move to split the WebRTC and WebSocket pair to be managed
by a new class (EngineConnection), which will only start and maintain
the WebSocket and WebRTC channels. Anything using the EngineConnection
will be able to communnicate commands without needing to add control
logic for the underlying data channels.

Signed-off-by: Paul R. Tagliamonte <paul@kittycad.io>
This commit is contained in:
Paul Tagliamonte
2023-08-18 16:16:16 -04:00
committed by GitHub
parent 3b9094e0dd
commit 01f95d4ace

View File

@ -38,14 +38,200 @@ type OkResponse = Models['OkModelingCmdResponse_type']
type WebSocketResponse = Models['WebSocketResponses_type']
// EngineConnection encapsulates the connection(s) to the Engine
// for the EngineCommandManager; namely, the underlying WebSocket
// and WebRTC connections.
export class EngineConnection {
websocket?: WebSocket
pc?: RTCPeerConnection
lossyDataChannel?: RTCDataChannel
onConnectionStarted: (conn: EngineConnection) => void = () => {}
waitForReady: Promise<void> = new Promise(() => {})
private resolveReady = () => {}
readonly url: string
private readonly token?: string
constructor({
url,
token,
onConnectionStarted,
}: {
url: string
token?: string
onConnectionStarted: (conn: EngineConnection) => void
}) {
this.url = url
this.token = token
this.onConnectionStarted = onConnectionStarted
// TODO(paultag): This isn't right; this should be when the
// connection is in a good place, and tied to the connect() method,
// but this is part of a larger refactor to untangle logic. Once the
// Connection is pulled apart, we can rework how ready is represented.
// This was just the easiest way to ensure some level of parity between
// the CommandManager and the Connection until I send a rework for
// retry logic.
this.waitForReady = new Promise((resolve) => {
this.resolveReady = resolve
})
}
connect() {
this.websocket = new WebSocket(this.url, [])
this.websocket.binaryType = 'arraybuffer'
this.pc = new RTCPeerConnection()
this.pc.createDataChannel('unreliable_modeling_cmds')
this.websocket.addEventListener('open', (event) => {
console.log('Connected to websocket, waiting for ICE servers')
if (this.token) {
this.websocket?.send(
JSON.stringify({ headers: { Authorization: `Bearer ${this.token}` } })
)
}
})
this.websocket.addEventListener('close', (event) => {
console.log('websocket connection closed', event)
})
this.websocket.addEventListener('error', (event) => {
console.log('websocket connection error', event)
})
this.websocket.addEventListener('message', (event) => {
// In the EngineConnection, we're looking for messages to/from
// the server that relate to the ICE handshake, or WebRTC
// negotiation. There may be other messages (including ArrayBuffer
// messages) that are intended for the GUI itself, so be careful
// when assuming we're the only consumer or that all messages will
// be carefully formatted here.
if (typeof event.data !== 'string') {
return
}
const message: WebSocketResponse = JSON.parse(event.data)
if (
message.type === 'sdp_answer' &&
message.answer.type !== 'unspecified'
) {
this.pc?.setRemoteDescription(
new RTCSessionDescription({
type: message.answer.type,
sdp: message.answer.sdp,
})
)
} else if (message.type === 'trickle_ice') {
this.pc?.addIceCandidate(message.candidate as RTCIceCandidateInit)
} else if (message.type === 'ice_server_info' && this.pc) {
console.log('received ice_server_info')
if (message.ice_servers.length > 0) {
// When we set the Configuration, we want to always force
// iceTransportPolicy to 'relay', since we know the topology
// of the ICE/STUN/TUN server and the engine. We don't wish to
// talk to the engine in any configuration /other/ than relay
// from a infra POV.
this.pc.setConfiguration({
iceServers: message.ice_servers,
iceTransportPolicy: 'relay',
})
} else {
this.pc?.setConfiguration({})
}
// We have an ICE Servers set now. We just setConfiguration, so let's
// start adding things we care about to the PeerConnection and let
// ICE negotiation happen in the background. Everything from here
// until the end of this function is setup of our end of the
// PeerConnection and waiting for events to fire our callbacks.
this.pc.addEventListener('connectionstatechange', (e) =>
console.log(this.pc?.iceConnectionState)
)
this.pc.addEventListener('icecandidate', (event) => {
if (!this.pc || !this.websocket) return
if (event.candidate === null) {
console.log('sent sdp_offer')
this.websocket.send(
JSON.stringify({
type: 'sdp_offer',
offer: this.pc.localDescription,
})
)
} else {
console.log('sending trickle ice candidate')
const { candidate } = event
this.websocket?.send(
JSON.stringify({
type: 'trickle_ice',
candidate: candidate.toJSON(),
})
)
}
})
// Offer to receive 1 video track
this.pc.addTransceiver('video', {})
// Finally (but actually firstly!), to kick things off, we're going to
// generate our SDP, set it on our PeerConnection, and let the server
// know about our capabilities.
this.pc
.createOffer()
.then(async (descriptionInit) => {
await this?.pc?.setLocalDescription(descriptionInit)
console.log('sent sdp_offer begin')
const msg = JSON.stringify({
type: 'sdp_offer',
offer: this.pc?.localDescription,
})
this.websocket?.send(msg)
})
.catch(console.log)
}
})
this.pc.addEventListener('datachannel', (event) => {
this.lossyDataChannel = event.channel
console.log('accepted lossy data channel', event.channel.label)
this.lossyDataChannel.addEventListener('open', (event) => {
this.resolveReady()
console.log('lossy data channel opened', event)
})
this.lossyDataChannel.addEventListener('close', (event) => {
console.log('lossy data channel closed')
})
this.lossyDataChannel.addEventListener('error', (event) => {
console.log('lossy data channel error')
})
})
if (this.onConnectionStarted) this.onConnectionStarted(this)
}
close() {
this.websocket?.close()
this.pc?.close()
this.lossyDataChannel?.close()
}
}
export class EngineCommandManager {
artifactMap: ArtifactMap = {}
sourceRangeMap: SourceRangeMap = {}
outSequence = 1
inSequence = 1
socket?: WebSocket
pc?: RTCPeerConnection
lossyDataChannel?: RTCDataChannel
engineConnection?: EngineConnection
waitForReady: Promise<void> = new Promise(() => {})
private resolveReady = () => {}
onHoverCallback: (id?: string) => void = () => {}
@ -69,183 +255,98 @@ export class EngineCommandManager {
this.resolveReady = resolve
})
const url = `${VITE_KC_API_WS_MODELING_URL}?video_res_width=${width}&video_res_height=${height}`
this.socket = new WebSocket(url, [])
this.engineConnection = new EngineConnection({
url,
token,
onConnectionStarted: (conn) => {
this.engineConnection?.pc?.addEventListener('track', (event) => {
console.log('received track', event)
const mediaStream = event.streams[0]
setMediaStream(mediaStream)
})
// Change binary type from "blob" to "arraybuffer"
this.socket.binaryType = 'arraybuffer'
this.pc = new RTCPeerConnection()
this.pc.createDataChannel('unreliable_modeling_cmds')
this.socket.addEventListener('open', (event) => {
console.log('Connected to websocket, waiting for ICE servers')
if (token) {
this.socket?.send(
JSON.stringify({ headers: { Authorization: `Bearer ${token}` } })
)
}
})
this.socket.addEventListener('close', (event) => {
console.log('websocket connection closed', event)
})
this.socket.addEventListener('error', (event) => {
console.log('websocket connection error', event)
})
this?.socket?.addEventListener('message', (event) => {
if (!this.socket || !this.pc) return
// console.log('Message from server ', event.data);
if (event.data instanceof ArrayBuffer) {
// If the data is an ArrayBuffer, it's the result of an export command,
// because in all other cases we send JSON strings. But in the case of
// export we send a binary blob.
// Pass this to our export function.
exportSave(event.data)
} else if (
typeof event.data === 'string' &&
event.data.toLocaleLowerCase().startsWith('error')
) {
console.warn('something went wrong: ', event.data)
} else {
const message: WebSocketResponse = JSON.parse(event.data)
if (
message.type === 'sdp_answer' &&
message.answer.type !== 'unspecified'
) {
this.pc?.setRemoteDescription(
new RTCSessionDescription({
type: message.answer.type,
sdp: message.answer.sdp,
})
)
} else if (message.type === 'trickle_ice') {
this.pc?.addIceCandidate(message.candidate as RTCIceCandidateInit)
} else if (message.type === 'ice_server_info' && this.pc) {
console.log('received ice_server_info')
if (message.ice_servers.length > 0) {
this.pc?.setConfiguration({
iceServers: message.ice_servers,
iceTransportPolicy: 'relay',
})
} else {
this.pc?.setConfiguration({})
}
this.pc.addEventListener('track', (event) => {
console.log('received track', event)
const mediaStream = event.streams[0]
setMediaStream(mediaStream)
})
this.pc.addEventListener('connectionstatechange', (e) =>
console.log(this?.pc?.iceConnectionState)
)
this.pc.addEventListener('icecandidate', (event) => {
if (!this.pc || !this.socket) return
if (event.candidate === null) {
console.log('sent sdp_offer')
this.socket.send(
JSON.stringify({
type: 'sdp_offer',
offer: this.pc.localDescription,
})
)
} else {
console.log('sending trickle ice candidate')
const { candidate } = event
this.socket?.send(
JSON.stringify({
type: 'trickle_ice',
candidate: candidate.toJSON(),
})
)
this.engineConnection?.pc?.addEventListener('datachannel', (event) => {
let lossyDataChannel = event.channel
lossyDataChannel.addEventListener('message', (event) => {
const result: OkResponse = JSON.parse(event.data)
if (
result.type === 'highlight_set_entity' &&
result.sequence &&
result.sequence > this.inSequence
) {
this.onHoverCallback(result.entity_id)
this.inSequence = result.sequence
}
})
})
// Offer to receive 1 video track
this.pc.addTransceiver('video', {
direction: 'sendrecv',
})
this.pc
.createOffer()
.then(async (descriptionInit) => {
await this?.pc?.setLocalDescription(descriptionInit)
console.log('sent sdp_offer begin')
const msg = JSON.stringify({
type: 'sdp_offer',
offer: this.pc?.localDescription,
})
this.socket?.send(msg)
})
.catch(console.log)
// When the EngineConnection starts a connection, we want to register
// callbacks into the WebSocket/PeerConnection.
conn.websocket?.addEventListener('message', (event) => {
if (event.data instanceof ArrayBuffer) {
// If the data is an ArrayBuffer, it's the result of an export command,
// because in all other cases we send JSON strings. But in the case of
// export we send a binary blob.
// Pass this to our export function.
exportSave(event.data)
} else if (
typeof event.data === 'string' &&
event.data.toLocaleLowerCase().startsWith('error')
) {
console.warn('something went wrong: ', event.data)
} else {
const message: WebSocketResponse = JSON.parse(event.data)
this.pc.addEventListener('datachannel', (event) => {
this.lossyDataChannel = event.channel
console.log('accepted lossy data channel', event.channel.label)
this.lossyDataChannel.addEventListener('open', (event) => {
setIsStreamReady(true)
this.resolveReady()
console.log('lossy data channel opened', event)
})
this.lossyDataChannel.addEventListener('close', (event) => {
console.log('lossy data channel closed')
})
this.lossyDataChannel.addEventListener('error', (event) => {
console.log('lossy data channel error')
})
this.lossyDataChannel.addEventListener('message', (event) => {
const result: OkResponse = JSON.parse(event.data)
if (
result.type === 'highlight_set_entity' &&
result.sequence &&
result.sequence > this.inSequence
) {
this.onHoverCallback(result.entity_id)
this.inSequence = result.sequence
if (message.type === 'modeling') {
const id = message.cmd_id
const command = this.artifactMap[id]
if ('ok' in message.result) {
const result: OkResponse = message.result.ok
if (result.type === 'select_with_point') {
if (result.entity_id) {
this.onClickCallback({
id: result.entity_id,
type: 'default',
})
} else {
this.onClickCallback()
}
}
}
})
})
} else if (message.type === 'modeling') {
const id = message.cmd_id
const command = this.artifactMap[id]
if ('ok' in message.result) {
const result: OkResponse = message.result.ok
if (result.type === 'select_with_point') {
if (result.entity_id) {
this.onClickCallback({
id: result.entity_id,
type: 'default',
if (command && command.type === 'pending') {
const resolve = command.resolve
this.artifactMap[id] = {
type: 'result',
data: message.result,
}
resolve({
id,
})
} else {
this.onClickCallback()
this.artifactMap[id] = {
type: 'result',
data: message.result,
}
}
}
}
if (command && command.type === 'pending') {
const resolve = command.resolve
this.artifactMap[id] = {
type: 'result',
data: message.result,
}
resolve({
id,
})
} else {
this.artifactMap[id] = {
type: 'result',
data: message.result,
}
}
}
}
})
},
})
// TODO(paultag): this isn't quite right, and the double promises is
// pretty grim.
this.engineConnection?.waitForReady.then(this.resolveReady)
this.waitForReady.then(() => {
setIsStreamReady(true)
})
this.engineConnection?.connect()
}
tearDown() {
// close all channels, sockets and WebRTC connections
this.lossyDataChannel?.close()
this.socket?.close()
this.pc?.close()
this.engineConnection?.close()
}
startNewSession() {
@ -253,7 +354,7 @@ export class EngineCommandManager {
this.sourceRangeMap = {}
}
endSession() {
// this.socket?.close()
// this.websocket?.close()
// socket.off('command')
}
onHover(callback: (id?: string) => void) {
@ -272,7 +373,7 @@ export class EngineCommandManager {
otherSelections: Selections['otherSelections']
idBasedSelections: { type: string; id: string }[]
}) {
if (this.socket?.readyState === 0) {
if (this.engineConnection?.websocket?.readyState === 0) {
console.log('socket not open')
return
}
@ -293,25 +394,31 @@ export class EngineCommandManager {
})
}
sendSceneCommand(command: EngineCommand) {
if (this.socket?.readyState === 0) {
if (this.engineConnection?.websocket?.readyState === 0) {
console.log('socket not ready')
return
}
if (command.type !== 'modeling_cmd_req') return
const cmd = command.cmd
if (cmd.type === 'camera_drag_move' && this.lossyDataChannel) {
if (
cmd.type === 'camera_drag_move' &&
this.engineConnection?.lossyDataChannel
) {
cmd.sequence = this.outSequence
this.outSequence++
this.lossyDataChannel.send(JSON.stringify(command))
this.engineConnection?.lossyDataChannel?.send(JSON.stringify(command))
return
} else if (cmd.type === 'highlight_set_entity' && this.lossyDataChannel) {
} else if (
cmd.type === 'highlight_set_entity' &&
this.engineConnection?.lossyDataChannel
) {
cmd.sequence = this.outSequence
this.outSequence++
this.lossyDataChannel.send(JSON.stringify(command))
this.engineConnection?.lossyDataChannel?.send(JSON.stringify(command))
return
}
console.log('sending command', command)
this.socket?.send(JSON.stringify(command))
this.engineConnection?.websocket?.send(JSON.stringify(command))
}
sendModellingCommand({
id,
@ -326,11 +433,11 @@ export class EngineCommandManager {
}): Promise<any> {
this.sourceRangeMap[id] = range
if (this.socket?.readyState === 0) {
if (this.engineConnection?.websocket?.readyState === 0) {
console.log('socket not ready')
return new Promise(() => {})
}
this.socket?.send(JSON.stringify(command))
this.engineConnection?.websocket?.send(JSON.stringify(command))
let resolve: (val: any) => void = () => {}
const promise = new Promise((_resolve, reject) => {
resolve = _resolve