const Logger = require("./Logger"); const config = require("./Config"); /** * 信令协议 */ const protocol = { // 当前索引 index: 100000, // 最小索引 minIndex: 100000, // 最大索引 maxIndex: 999999, /** * @returns 索引 */ buildId: function () { if (this.index++ >= this.maxIndex) { this.index = this.minIndex; } return Date.now() + "" + this.index; }, /** * 生成信令消息 * * @param {*} signal 信令标识 * @param {*} roomId 房间标识 * @param {*} body 信令消息 * @param {*} id ID * * @returns 信令消息 */ buildMessage: function (signal, roomId, body = {}, id) { const message = { header: { v: config.version, id: id || this.buildId(), signal: signal, }, body: { roomId, ...body }, }; return message; }, }; /** * Peer */ class Peer { peerId; device; displayName; rtpCapabilities; sctpCapabilities; transports = new Map(); producers = new Map(); consumers = new Map(); dataProducers = new Map(); dataConsumers = new Map(); } /** * 房间 */ class Room { // 是否关闭 close = false; // 终端 peers = new Map(); // 房间ID roomId = null; // 网络节流 networkThrottled = false; // 信令 signal = null; // WebRTCServer webRtcServer = null; // 路由 mediasoupRouter = null; // 音频监控 audioLevelObserver = null; // 音频监控 activeSpeakerObserver = null; constructor({ roomId, signal, webRtcServer, mediasoupRouter, audioLevelObserver, activeSpeakerObserver }) { this.close = false; this.roomId = roomId; this.networkThrottled = false; this.signal = signal; this.webRtcServer = webRtcServer; this.mediasoupRouter = mediasoupRouter; this.audioLevelObserver = audioLevelObserver; this.activeSpeakerObserver = activeSpeakerObserver; this.handleAudioLevelObserver(); this.handleActiveSpeakerObserver(); } handleAudioLevelObserver() { // 声音 this.audioLevelObserver.on("volumes", (volumes) => { for(const value of volumes) { const { producer, volume } = value; this.signal.push(protocol.buildMessage( "audio::active::speaker", this.roomId, { peerId: producer.appData.peerId, volume: volume } )); } }); // 静音 this.audioLevelObserver.on("silence", () => { this.signal.push(protocol.buildMessage( "audio::active::speaker", this.roomId, { peerId: null } )); }); } handleActiveSpeakerObserver() { this.activeSpeakerObserver.on("dominantspeaker", (dominantSpeaker) => { logger.debug("dominantspeaker", dominantSpeaker.producer.id); }); } close() { this.close = true; if(this.mediasoupRouter) { this.mediasoupRouter.close(); } } } /** * 信令 */ class Signal { // 房间列表 rooms = new Map(); // 信令终端列表 clients = []; // 日志 logger = new Logger(); // Worker列表 mediasoupWorkers = []; // Worker索引 nextMediasoupWorkerIndex = 0; constructor(mediasoupWorkers) { this.mediasoupWorkers = mediasoupWorkers; } /** * 处理事件 * * @param {*} message 消息 * @param {*} session websocket */ on(message, session) { // 授权验证 if (!session.authorize) { if ( message?.header?.signal === "media::register" && message?.body?.username === config.https.username && message?.body?.password === config.https.password ) { this.logger.debug("授权成功", session._socket.remoteAddress); this.clients.push(session); session.authorize = true; message.code = "0000"; message.message = "授权成功"; message.body.username = null; message.body.password = null; } else { this.logger.warn("授权失败", session._socket.remoteAddress); message.code = "3401"; message.message = "授权失败"; } this.push(message, session); return; } // 处理信令 switch (message.header.signal) { case "router::rtp::capabilities": this.routerRtpCapabilities(session, message); break; case "room::create": this.roomCreate(session, message, message.body); break; case "transport:webrtc::create": this.transportWebrtcCreate(session, message, message.body); break; } } /** * 通知信令 * * @param {*} message 消息 * @param {*} session 信令通道 */ push(message, session) { if(session) { try { session.send(JSON.stringify(message)); } catch (error) { logger.error( "通知信令失败", session._socket.remoteAddress, message, error ); } } else { this.clients.forEach((session) => this.push(message, session)); } } /** * @returns 下个Meidasoup Worker */ nextMediasoupWorker() { const worker = this.mediasoupWorkers[this.nextMediasoupWorkerIndex]; if (++this.nextMediasoupWorkerIndex === this.mediasoupWorkers.length) { this.nextMediasoupWorkerIndex = 0; } return worker; } /** * @param {*} message 消息 * * @returns 房间 */ selectRoom(message) { return this.rooms.get(message.body.roomId); } /** * 路由RTP能力信令 * * @param {*} session 信令通道 * @param {*} message 消息 */ routerRtpCapabilities(session, message) { const room = this.selectRoom(message); message.body = room.mediasoupRouter.rtpCapabilities; this.push(message, session); } /** * 创建房间信令 * * @param {*} session 信令通道 * @param {*} message 消息 * @param {*} body 消息主体 * * @returns 房间 */ async roomCreate(session, message, body) { const roomId = body.roomId; let room = this.rooms.get(roomId); if (room) { return room; } const mediasoupWorker = this.nextMediasoupWorker(); const { mediaCodecs } = config.mediasoup.routerOptions; const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs }); mediasoupRouter.on("workerclose", () => { // TODO:通知房间关闭 }); mediasoupRouter.observer.on("close", () => { // TODO:通知房间关闭 }); // TODO:下面两个监控改为配置启用 const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({ maxEntries: 1, threshold: -80, interval: 2000 }); const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver({ interval: 500 }); room = new Room({ roomId, webRtcServer: mediasoupWorker.appData.webRtcServer, mediasoupRouter, audioLevelObserver, activeSpeakerObserver }); this.rooms.set(roomId, room); this.logger.info("创建房间", roomId, room); this.push(message, session); return room; } /** * @param {*} session 信令通道 * @param {*} message 消息 * @param {*} body 消息主体 */ transportWebrtcCreate(session, message, body) { const { roomId, forceTcp, producing, consuming, sctpCapabilities } = body; const webRtcTransportOptions = { ...config.mediasoup.webRtcTransportOptions, enableSctp : Boolean(sctpCapabilities), numSctpStreams : (sctpCapabilities || {}).numStreams, appData : { producing, consuming } }; if (forceTcp) { webRtcTransportOptions.enableUdp = false; webRtcTransportOptions.enableTcp = true; } const room = this.rooms.get(roomId); const transport = await room.mediasoupRouter.createWebRtcTransport( { ...webRtcTransportOptions, webRtcServer : room.webRtcServer }); transport.on('sctpstatechange', (sctpState) => { logger.debug('WebRtcTransport "sctpstatechange" event [sctpState:%s]', sctpState); }); transport.on('dtlsstatechange', (dtlsState) => { if (dtlsState === 'failed' || dtlsState === 'closed') logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState); }); // NOTE: For testing. // await transport.enableTraceEvent([ 'probation', 'bwe' ]); await transport.enableTraceEvent([ 'bwe' ]); transport.on('trace', (trace) => { logger.debug( 'transport "trace" event [transportId:%s, trace.type:%s, trace:%o]', transport.id, trace.type, trace); if (trace.type === 'bwe' && trace.direction === 'out') { peer.notify( 'downlinkBwe', { desiredBitrate : trace.info.desiredBitrate, effectiveDesiredBitrate : trace.info.effectiveDesiredBitrate, availableBitrate : trace.info.availableBitrate }) .catch(() => {}); } }); // Store the WebRtcTransport into the protoo Peer data Object. peer.data.transports.set(transport.id, transport); self.push( { id : transport.id, iceParameters : transport.iceParameters, iceCandidates : transport.iceCandidates, dtlsParameters : transport.dtlsParameters, sctpParameters : transport.sctpParameters }, session); const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions; // If set, apply max incoming bitrate limit. if (maxIncomingBitrate) { try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); } catch (error) {} } } } module.exports = Signal;