From dc2334e8b6186a93f7b3f071dc314ef96fbc8b1c Mon Sep 17 00:00:00 2001 From: acgist <289547414@qq.com> Date: Sun, 10 Sep 2023 08:52:22 +0800 Subject: [PATCH] =?UTF-8?q?[*]=20=E6=97=A5=E5=B8=B8=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- taoyao-client-media/src/Taoyao.js | 329 +++++++++--------- taoyao-client-web/src/components/Taoyao.js | 25 +- .../media/MediaProducerStatusProtocol.java | 7 + 3 files changed, 193 insertions(+), 168 deletions(-) diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 29c85df..a722eb1 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -480,12 +480,12 @@ class Taoyao { case "platform::error": me.platformError(message, body); break; - case "room::create": - me.roomCreate(message, body); - break; case "room::close": me.roomClose(message, body); break; + case "room::create": + me.roomCreate(message, body); + break; } } @@ -1005,32 +1005,6 @@ class Taoyao { } } - /** - * 查询生产者状态信令 - * - * @param {*} message 信令消息 - * @param {*} body 消息主体 - */ - async mediaProducerStatus(message, body) { - const me = this; - const { - roomId, - producerId, - } = body; - const room = me.rooms.get(roomId); - const producer = room?.producers.get(producerId); - if(producer) { - console.debug("查询生产者状态", producerId); - message.body = { - ...body, - status: await producer.getStats() - }; - me.push(message); - } else { - console.debug("查询生产者状态(无效)", producerId); - } - } - /** * 消费媒体信令 * @@ -1589,54 +1563,86 @@ class Taoyao { } } + /** + * 查询生产者状态信令 + * + * @param {*} message 信令消息 + * @param {*} body 消息主体 + */ + async mediaProducerStatus(message, body) { + const { + roomId, + producerId, + } = body; + const room = this.rooms.get(roomId); + const producer = room?.producers.get(producerId); + if(!producer) { + console.warn("查询生产者状态(无效生产者)", roomId, producerId); + return; + } + console.debug("查询生产者状态", producerId); + message.body = { + ...body, + status: await producer.getStats() + }; + this.push(message); + } + /** * 路由RTP协商信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ mediaRouterRtpCapabilities(message, body) { - const me = this; - const { roomId } = body; - const room = me.rooms.get(roomId); - message.body.rtpCapabilities = room?.mediasoupRouter.rtpCapabilities; + const { + roomId + } = body; + const room = this.rooms.get(roomId); + message.body = { + ...message.body, + rtpCapabilities: room?.mediasoupRouter.rtpCapabilities + }; this.push(message); } /** * 关闭传输通道信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async mediaTransportClose(message, body) { - const me = this; - const { roomId, transportId } = body; - const room = me.rooms.get(roomId); + const { + roomId, + transportId + } = body; + const room = this.rooms.get(roomId); const transport = room?.transports.get(transportId); - if(transport) { - console.info("关闭传输通道", transportId); - await transport.close(); - } else { - console.debug("关闭传输通道(无效)", transportId); + if(!transport) { + console.info("关闭传输通道(无效通道)", roomId, transportId); + return; } + console.debug("关闭传输通道", transportId); + await transport.close(); } /** * 创建RTP输入通道信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async mediaTransportPlainCreate(message, body) { - const me = this; const { roomId, clientId, rtcpMux, comedia, - enableSctp, numSctpStreams, - enableSrtp, srtpCryptoSuite + enableSctp, + numSctpStreams, + enableSrtp, + srtpCryptoSuite } = body; const plainTransportOptions = { ...config.mediasoup.plainTransportOptions, @@ -1647,10 +1653,14 @@ class Taoyao { enableSrtp : enableSrtp, srtpCryptoSuite : srtpCryptoSuite, }; - const room = me.rooms.get(roomId); + const room = this.rooms.get(roomId); + if(!room) { + console.warn("创建RTP输入通道(无效房间)", roomId); + return; + } const transport = await room?.mediasoupRouter.createPlainTransport(plainTransportOptions); - console.info("创建RTP输入通道", transport.id); - me.transportEvent("plain", roomId, transport); + console.debug("创建RTP输入通道", transport.id); + this.transportEvent("plain", roomId, transport); transport.clientId = clientId; room.transports.set(transport.id, transport); message.body = { @@ -1660,39 +1670,38 @@ class Taoyao { port : transport.tuple.localPort, rtcpPort : transport.rtcpTuple?.localPort, }; - me.push(message); + this.push(message); } /** * 查询通道状态信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async mediaTransportStatus(message, body) { - const me = this; const { roomId, transportId, } = body; - const room = me.rooms.get(roomId); + const room = this.rooms.get(roomId); const transport = room?.transports.get(transportId); - if(transport) { - console.debug("查询通道状态", transportId); - message.body = { - ...body, - status: await transport.getStats() - }; - me.push(message); - } else { - console.debug("查询通道状态(无效)", transportId); + if(!transport) { + console.warn("查询通道状态(无效通道)", roomId, transportId); + return; } + console.debug("查询通道状态", transportId); + message.body = { + ...body, + status: await transport.getStats() + }; + this.push(message); } /** * 连接WebRTC通道信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async mediaTransportWebrtcConnect(message, body) { @@ -1701,29 +1710,30 @@ class Taoyao { transportId, dtlsParameters } = body; - const room = this.rooms.get(roomId); + const room = this.rooms.get(roomId); const transport = room?.transports.get(transportId); - if(transport) { - await transport.connect({ dtlsParameters }); - console.info("连接WebRTC通道", transportId); - message.body = { - roomId : roomId, - transportId: transport.id - }; - this.push(message); - } else { - console.warn("连接WebRTC通道(无效)", transportId); + if(!transport) { + console.warn("连接WebRTC通道(无效通道)", roomId, transportId); + return; } + await transport.connect({ + dtlsParameters + }); + console.debug("连接WebRTC通道", transportId); + message.body = { + roomId : roomId, + transportId: transport.id + }; + this.push(message); } /** * 创建WebRTC通道信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async mediaTransportWebrtcCreate(message, body) { - const me = this; const { roomId, clientId, @@ -1734,7 +1744,10 @@ class Taoyao { } = body; const webRtcTransportOptions = { ...config.mediasoup.webRtcTransportOptions, - appData : { producing, consuming }, + appData: { + producing, + consuming + }, enableSctp : Boolean(sctpCapabilities), numSctpStreams: (sctpCapabilities || {}).numStreams, }; @@ -1742,13 +1755,17 @@ class Taoyao { webRtcTransportOptions.enableUdp = false; webRtcTransportOptions.enableTcp = true; } - const room = me.rooms.get(roomId); + const room = this.rooms.get(roomId); + if(!room) { + console.warn("创建WebRTC通道(无效房间)", roomId); + return; + } const transport = await room.mediasoupRouter.createWebRtcTransport({ ...webRtcTransportOptions, webRtcServer: room.webRtcServer, }); - console.info("创建WebRTC通道", transport.id); - me.transportEvent("webrtc", roomId, transport); + console.debug("创建WebRTC通道", transport.id); + this.transportEvent("webrtc", roomId, transport); transport.clientId = clientId; room.transports.set(transport.id, transport); message.body = { @@ -1759,7 +1776,7 @@ class Taoyao { dtlsParameters: transport.dtlsParameters, sctpParameters: transport.sctpParameters, }; - me.push(message); + this.push(message); const { maxOutgoingBitrate, maxIncomingBitrate, @@ -1775,83 +1792,82 @@ class Taoyao { /** * 通道事件 * - * @param {*} type 类型:webrtc|plain|pipe|direct + * @param {*} type 类型:pipe|plain|direct|webrtc * @param {*} roomId 房间ID * @param {*} transport 通道 */ transportEvent(type, roomId, transport) { - const me = this; - const room = me.rooms.get(roomId); + const room = this.rooms.get(roomId); + const transportId = transport.id; /********************* 通用通道事件 *********************/ transport.on("routerclose", () => { - console.info("通道关闭(路由关闭)", transport.id); + console.debug("通道关闭(路由关闭)", roomId, transportId); transport.close(); }); transport.on("listenserverclose", () => { - console.info("通道关闭(监听服务关闭)", transport.id); + console.debug("通道关闭(监听服务关闭)", roomId, transportId); transport.close(); }); transport.observer.on("close", () => { - if(room.transports.delete(transport.id)) { - console.info("通道关闭", transport.id); - me.push( - protocol.buildMessage("media::transport::close", { - roomId : roomId, - transportId: transport.id, - }) - ); + if(room.transports.delete(transportId)) { + console.debug("通道关闭", roomId, transportId); + this.push(protocol.buildMessage("media::transport::close", { + roomId, + transportId, + })); } else { - console.info("通道关闭(无效)", transport.id); + console.info("通道关闭(无效通道)", roomId, transportId); } }); - // transport.observer.on("newproducer", (producer) => {}); - // transport.observer.on("newconsumer", (consumer) => {}); + // transport.observer.on("newproducer", (producer) => {}); + // transport.observer.on("newconsumer", (consumer) => {}); // transport.observer.on("newdataproducer", (dataProducer) => {}); // transport.observer.on("newdataconsumer", (dataConsumer) => {}); - // transport.observer.on("trace", fn(trace)); + // 设置追踪信息 // await transport.enableTraceEvent([ 'bwe', 'probation' ]); - // transport.on("trace", (trace) => { - // console.debug("通道跟踪事件(trace)", transport.id, trace); - // }); - /********************* webRtcTransport通道事件 *********************/ - if("webrtc" === type) { - // transport.on("icestatechange", (iceState) => {}); - // transport.on("iceselectedtuplechange", (iceSelectedTuple) => {}); - // transport.on("dtlsstatechange", (dtlsState) => {}); - // transport.on("sctpstatechange", (sctpState) => {}); - // transport.observer.on("icestatechange", fn(iceState)); - // transport.observer.on("iceselectedtuplechange", fn(iceSelectedTuple)); - // transport.observer.on("dtlsstatechange", fn(dtlsState)); + // transport.on("trace", (trace) => {}); + // transport.observer.on("trace", fn(trace)); + /********************* pipeTransport通道事件 *********************/ + if("pipe" === type) { + // transport.on("sctpstatechange", fn(sctpState)); // transport.observer.on("sctpstatechange", fn(sctpState)); } /********************* plainTransport通道事件 *********************/ if("plain" === type) { - // transport.on("tuple", fn(tuple)); - // transport.on("rtcptuple", fn(rtcpTuple)); - // transport.on("sctpstatechange", fn(sctpState)); - // transport.observer.on("tuple", fn(tuple)); - // transport.observer.on("rtcptuple", fn(rtcpTuple)); - // transport.observer.on("sctpstatechange", fn(sctpState)); - } - /********************* pipeTransport通道事件 *********************/ - if("pipe" === type) { - // transport.on("sctpstatechange", fn(sctpState)); + // transport.on("tuple", fn(tuple)); + // transport.on("rtcptuple", fn(rtcpTuple)); + // transport.on("sctpstatechange", fn(sctpState)); + // transport.observer.on("tuple", fn(tuple)); + // transport.observer.on("rtcptuple", fn(rtcpTuple)); // transport.observer.on("sctpstatechange", fn(sctpState)); } /********************* directTransport通道事件 *********************/ - if("rtcp" === type) { + if("direct" === type) { // transport.on("rtcp", fn(rtcpPacket)); } + /********************* webRtcTransport通道事件 *********************/ + if("webrtc" === type) { + // transport.on("icestatechange", (iceState) => {}); + // transport.on("iceselectedtuplechange", (iceSelectedTuple) => {}); + // transport.on("dtlsstatechange", (dtlsState) => {}); + // transport.on("sctpstatechange", (sctpState) => {}); + // transport.observer.on("icestatechange", fn(iceState)); + // transport.observer.on("iceselectedtuplechange", fn(iceSelectedTuple)); + // transport.observer.on("dtlsstatechange", fn(dtlsState)); + // transport.observer.on("sctpstatechange", fn(sctpState)); + } } /** * 平台异常信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ platformError(message, body) { - const { code } = message; + const { + code + } = message; if(code === "3401") { signalChannel.close(); console.warn("授权异常(关闭信令)", message); @@ -1863,15 +1879,16 @@ class Taoyao { /** * 关闭房间信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async roomClose(message, body) { - const me = this; - const { roomId } = body; - const room = me.rooms.get(roomId); + const { + roomId + } = body; + const room = this.rooms.get(roomId); if(!room) { - console.debug("关闭房间(无效)", roomId); + console.info("关闭房间(无效房间)", roomId); return; } room.closeAll(); @@ -1880,21 +1897,25 @@ class Taoyao { /** * 创建房间信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async roomCreate(message, body) { - const me = this; - const { roomId } = body; - let room = me.rooms.get(roomId); - if (room) { - console.debug("创建房间已经存在", room); - me.push(message); + const { + roomId + } = body; + if (this.rooms.has(roomId)) { + console.warn("创建房间(已经存在)", roomId); + this.push(message); return; } - const mediasoupWorker = me.nextMediasoupWorker(); - const { mediaCodecs } = config.mediasoup.routerOptions; - const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs }); + const { + mediaCodecs + } = config.mediasoup.routerOptions; + const mediasoupWorker = this.nextMediasoupWorker(); + const mediasoupRouter = await mediasoupWorker.createRouter({ + mediaCodecs + }); // 音量监控 const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({ // 监控周期 @@ -1908,35 +1929,33 @@ class Taoyao { const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver({ interval: 500, }); - room = new Room({ + const room = new Room({ roomId, - taoyao: me, - webRtcServer: mediasoupWorker.appData.webRtcServer, mediasoupRouter, audioLevelObserver, activeSpeakerObserver, + taoyao : this, + webRtcServer: mediasoupWorker.appData.webRtcServer, }); - console.info("创建房间", roomId, mediasoupRouter.id); - me.rooms.set(roomId, room); - me.push(message); + console.debug("创建房间", roomId, mediasoupRouter.id); + this.rooms.set(roomId, room); + this.push(message); mediasoupRouter.on("workerclose", () => { - console.info("路由关闭(工作线程关闭)", roomId, mediasoupRouter.id); + console.debug("路由关闭(工作线程关闭)", roomId, mediasoupRouter.id); mediasoupRouter.close(); }); mediasoupRouter.observer.on("close", () => { - if(me.rooms.delete(roomId)) { - console.info("路由关闭", roomId, mediasoupRouter.id); + if(this.rooms.delete(roomId)) { + console.debug("路由关闭", roomId, mediasoupRouter.id); room.closeAll(); - me.push( - protocol.buildMessage("room::close", { - roomId: roomId - }) - ); + this.push(protocol.buildMessage("room::close", { + roomId + })); } else { - console.info("路由关闭(无效)", roomId, mediasoupRouter.id); + console.info("路由关闭(无效房间)", roomId, mediasoupRouter.id); } }); - // mediasoupRouter.observer.on("newtransport", (transport) => {}); + // mediasoupRouter.observer.on("newtransport", (transport) => {}); // mediasoupRouter.observer.on("newrtpobserver", (rtpObserver) => {}); } }; diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index c9bbd6e..2f3d2a8 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -1905,19 +1905,6 @@ class Taoyao extends RemoteClient { console.debug("生产者评分", message); } - /** - * 查询生产者状态信令 - * - * @param {*} producerId 生产者ID - */ - async mediaProducerStatus(producerId) { - const me = this; - return await me.request(protocol.buildMessage('media::producer::status', { - roomId: me.roomId, - producerId - })); - } - /** * 消费媒体信令 * @@ -2481,6 +2468,18 @@ class Taoyao extends RemoteClient { } } + /** + * 查询生产者状态信令 + * + * @param {*} producerId 生产者ID + */ + async mediaProducerStatus(producerId) { + return await this.request(protocol.buildMessage("media::producer::status", { + producerId, + roomId: this.roomId, + })); + } + /** * 关闭通道信令 * diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerStatusProtocol.java index a6f61d8..72c623c 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerStatusProtocol.java @@ -22,6 +22,13 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; "roomId" : "房间ID", "producerId": "生产者ID" } + { + "roomId" : "房间ID", + "producerId": "生产者ID", + "status" : [ + ...状态信息 + ] + } """, flow = "终端=>信令服务->媒体服务" )