diff --git a/src/webrtc/events/Close.ts b/src/webrtc/events/Close.ts index ce03b9fd..ced364f8 100644 --- a/src/webrtc/events/Close.ts +++ b/src/webrtc/events/Close.ts @@ -18,33 +18,34 @@ import { WebSocket } from "@spacebar/gateway"; import { Session } from "@spacebar/util"; +import { getClients } from "../util"; export async function onClose(this: WebSocket, code: number, reason: string) { console.log("[WebRTC] closed", code, reason.toString()); if (this.session_id) await Session.delete({ session_id: this.session_id }); - // // we need to find all consumers on all clients that have a producer in our client - // const clients = getClients(this.client?.channel_id!); + // we need to find all consumers on all clients that have a producer in our client + const clients = getClients(this.client?.channel_id!); - // for (const client of clients) { - // if (client.websocket.user_id === this.user_id) continue; + for (const client of clients) { + if (client.websocket.user_id === this.user_id) continue; - // // if any consumer on this client has a producer id that is in our client, close it - // client.consumers.forEach((consumer) => { - // // check if any producers in our client have the same producer id - // this.client?.producers.forEach((producer) => { - // if (producer.id === consumer.producerId) { - // console.log("[WebRTC] closing consumer", consumer.id); - // consumer.close(); - // } - // }); - // }); - // } + // if any consumer on this client has a producer id that is in our client, close it + client.consumers.forEach((consumer) => { + if ( + client.producers.audio?.id === consumer.producerId || + client.producers.video?.id === consumer.producerId + ) { + console.log("[WebRTC] closing consumer", consumer.id); + consumer.close(); + } + }); + } - // this.client?.producers.forEach((producer) => producer.close()); - // this.client?.consumers.forEach((consumer) => consumer.close()); this.client?.transport?.close(); + this.client?.producers.audio?.close(); + this.client?.producers.video?.close(); this.removeAllListeners(); } diff --git a/src/webrtc/opcodes/Identify.ts b/src/webrtc/opcodes/Identify.ts index cb9e2b10..ae98133b 100644 --- a/src/webrtc/opcodes/Identify.ts +++ b/src/webrtc/opcodes/Identify.ts @@ -124,7 +124,7 @@ export async function onIdentify(this: WebSocket, data: IdentifyPayload) { channel_id: voiceState.channel_id, transport: producerTransport, producers: {}, - consumers: {}, + consumers: [], }; const clients = getClients(voiceState.channel_id)!; diff --git a/src/webrtc/opcodes/Video.ts b/src/webrtc/opcodes/Video.ts index 79e205dd..cdd82d09 100644 --- a/src/webrtc/opcodes/Video.ts +++ b/src/webrtc/opcodes/Video.ts @@ -18,6 +18,7 @@ import { Payload, Send, WebSocket } from "@spacebar/gateway"; import { validateSchema, VoiceVideoSchema } from "@spacebar/util"; +import { types as MediaSoupTypes } from "mediasoup"; import { getClients, getRouter, VoiceOPCodes } from "../util"; // request: @@ -52,8 +53,6 @@ export async function onVideo(this: WebSocket, payload: Payload) { await Send(this, { op: VoiceOPCodes.MEDIA_SINK_WANTS, d: { any: 100 } }); - if (d.audio_ssrc === 0) return; - const router = getRouter(channel_id); if (!router) { console.error(`router not found`); @@ -62,47 +61,122 @@ export async function onVideo(this: WebSocket, payload: Payload) { const transport = this.client.transport!; - const producer = await transport.produce({ - kind: "audio", - rtpParameters: { - codecs: [ - { - payloadType: 109, - mimeType: "audio/opus", - clockRate: 48000, - channels: 2, - rtcpFeedback: [{ type: "nack" }, { type: "transport-cc" }], + let audioProducer: MediaSoupTypes.Producer | undefined = + this.client.producers.audio; + + if (d.audio_ssrc !== 0) { + if (!audioProducer) { + audioProducer = await transport.produce({ + kind: "audio", + rtpParameters: { + codecs: [ + { + payloadType: 109, + mimeType: "audio/opus", + clockRate: 48000, + channels: 2, + rtcpFeedback: [ + { type: "nack" }, + { type: "transport-cc" }, + ], + }, + ], + encodings: [ + { + ssrc: d.audio_ssrc, + }, + ], + // headerExtensions: this.client + // .sdpOffer2!.media[0].ext?.filter((x) => + // SUPPORTED_EXTENTIONS.includes(x.uri), + // ) + // .map((x) => ({ + // uri: x.uri as NMediaSoupTypes.RtpHeaderExtensionUri, + // id: x.value, + // encrypt: false, + // })), }, - ], - encodings: [ - { - ssrc: d.audio_ssrc, - }, - ], - // headerExtensions: this.client - // .sdpOffer2!.media[0].ext?.filter((x) => - // SUPPORTED_EXTENTIONS.includes(x.uri), - // ) - // .map((x) => ({ - // uri: x.uri as NMediaSoupTypes.RtpHeaderExtensionUri, - // id: x.value, - // encrypt: false, - // })), - }, - }); + }); - await producer.enableTraceEvent(["rtp"]); + await audioProducer.enableTraceEvent(["rtp"]); - // producer.on("score", (score) => { - // console.debug(`audio producer score:`, score); - // }); + // producer.on("score", (score) => { + // console.debug(`audio producer score:`, score); + // }); - // producer.on("trace", (trace) => { - // console.debug(`audio producer trace:`, trace); - // }); + // producer.on("trace", (trace) => { + // console.debug(`audio producer trace:`, trace); + // }); - // this.client.producers.push(producer); - this.client.producers.audio = producer; + // this.client.producers.push(producer); + this.client.producers.audio = audioProducer; + } + } + + let videoProducer: MediaSoupTypes.Producer | undefined = + this.client.producers.video; + + if (d.video_ssrc !== 0) { + videoProducer = await transport.produce({ + kind: "video", + rtpParameters: { + codecs: [ + { + payloadType: 120, + mimeType: "video/VP8", + clockRate: 90000, + rtcpFeedback: [ + { type: "nack" }, + { type: "nack", parameter: "pli" }, + { type: "ccm", parameter: "fir" }, + { type: "goog-remb" }, + { type: "transport-cc" }, + ], + }, + { + payloadType: 126, + mimeType: "video/H264", + clockRate: 90000, + parameters: { + "level-asymmetry-allowed": 1, + }, + rtcpFeedback: [ + { type: "nack" }, + { type: "nack", parameter: "pli" }, + { type: "ccm", parameter: "fir" }, + { type: "goog-remb" }, + { type: "transport-cc" }, + ], + }, + ], + encodings: [ + { + ssrc: d.video_ssrc, + rtx: { ssrc: d.rtx_ssrc! }, + }, + ], + // headerExtensions: this.client + // .sdpOffer2!.media[1].ext?.filter((x) => + // SUPPORTED_EXTENTIONS.includes(x.uri), + // ) + // .map((x) => ({ + // uri: x.uri as NMediaSoupTypes.RtpHeaderExtensionUri, + // id: x.value, + // encrypt: false, + // })), + }, + }); + + await videoProducer.enableTraceEvent(["rtp"]); + + videoProducer.on("score", (score) => { + console.debug(`video producer score:`, score); + }); + + videoProducer.on("trace", (trace) => { + console.debug(`video producer trace:`, trace); + }); + } // loop the clients and add a consumer for each one const clients = getClients(channel_id); @@ -110,33 +184,28 @@ export async function onVideo(this: WebSocket, payload: Payload) { if (client.websocket.user_id === this.user_id) continue; if (!client.transport) continue; - const consumer = await client.transport.consume({ - producerId: producer.id, - rtpCapabilities: router.router.rtpCapabilities, - paused: false, - }); - - // listen to any events - for (const event of consumer.eventNames()) { - if (typeof event !== "string") continue; - consumer.on(event as any, (...args) => { - console.debug( - `consumer(producer of ${this.user_id}; ${event}):`, - args, - ); - }); - } - // listen to any events - for (const event of consumer.observer.eventNames()) { - if (typeof event !== "string") continue; - consumer.observer.on(event as any, (...args) => { - console.debug( - `consumer observer(producer of ${this.user_id}; ${event}):`, - args, - ); + if (d.audio_ssrc !== 0) { + // close the existing consumer if it exists + const a = client.consumers.find((x) => x.kind === "audio"); + await a?.close(); + const consumer = await client.transport.consume({ + producerId: audioProducer?.id!, + rtpCapabilities: router.router.rtpCapabilities, + paused: false, }); + client.consumers.push(consumer); } - client.consumers.audio = consumer; + if (d.video_ssrc !== 0) { + // close the existing consumer if it exists + const a = client.consumers.find((x) => x.kind === "video"); + await a?.close(); + const consumer = await client.transport.consume({ + producerId: videoProducer?.id!, + rtpCapabilities: router.router.rtpCapabilities, + paused: false, + }); + client.consumers.push(consumer); + } } } diff --git a/src/webrtc/util/MediaServer.ts b/src/webrtc/util/MediaServer.ts index e9423b20..4a8d98d9 100644 --- a/src/webrtc/util/MediaServer.ts +++ b/src/webrtc/util/MediaServer.ts @@ -78,10 +78,7 @@ export interface Client { audio?: MediaSoupTypes.Producer; video?: MediaSoupTypes.Producer; }; - consumers: { - audio?: MediaSoupTypes.Consumer; - video?: MediaSoupTypes.Consumer; - }; + consumers: MediaSoupTypes.Consumer[]; } export function getClients(channel_id: string) {