diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index a55cfac..bd0cf62 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -804,7 +804,7 @@ class Taoyao { /** * 消费媒体信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async mediaConsume(message, body) { @@ -818,8 +818,7 @@ class Taoyao { appData, rtpCapabilities, } = body; - const me = this; - const room = me.rooms.get(roomId); + const room = this.rooms.get(roomId); const producer = room?.producers.get(producerId); const transport = room?.transports.get(transportId); if ( @@ -828,8 +827,8 @@ class Taoyao { !transport || !rtpCapabilities || !room.mediasoupRouter.canConsume({ - producerId : producerId, - rtpCapabilities: rtpCapabilities, + producerId, + rtpCapabilities, }) ) { console.warn("不能消费媒体", body); @@ -840,28 +839,21 @@ class Taoyao { for (let i = 0; i < consumerCount; i++) { promises.push( (async () => { - let consumer; - try { - consumer = await transport.consume({ - // 默认暂停 - paused : true, - producerId : producerId, - rtpCapabilities: rtpCapabilities, - }); - } catch (error) { - console.error("创建消费者异常", body, error); - return; - } + const consumer = await transport.consume({ + paused: true, + producerId, + rtpCapabilities, + }); consumer.clientId = clientId; consumer.streamId = streamId; room.consumers.set(consumer.id, consumer); console.debug("创建消费者", consumer.id, streamId); consumer.on("transportclose", () => { - console.info("消费者关闭(通道关闭)", consumer.id, streamId); + console.debug("消费者关闭(通道关闭)", consumer.id, streamId); consumer.close(); }); consumer.on("producerclose", () => { - console.info("消费者关闭(生产者关闭)", consumer.id, streamId); + console.debug("消费者关闭(生产者关闭)", consumer.id, streamId); consumer.close(); }); consumer.on("producerpause", () => { @@ -883,7 +875,7 @@ class Taoyao { // consumer.observer.on("score", fn(score)); consumer.on("score", (score) => { console.debug("消费者评分", consumer.id, streamId, score); - me.push(protocol.buildMessage("media::consumer::score", { + this.push(protocol.buildMessage("media::consumer::score", { score, roomId, consumerId: consumer.id, @@ -892,17 +884,17 @@ class Taoyao { // consumer.observer.on("layerschange", fn(layers)); consumer.on("layerschange", (layers) => { console.debug("消费者空间层和时间层改变", consumer.id, streamId, layers); - me.push(protocol.buildMessage("media::consumer::layers::change", { + this.push(protocol.buildMessage("media::consumer::layers::change", { roomId, consumerId : consumer.id, - spatialLayer : layers ? layers.spatialLayer : null, - temporalLayer: layers ? layers.temporalLayer : null, + spatialLayer : layers?.spatialLayer, + temporalLayer: layers?.temporalLayer, })); }); consumer.observer.on("close", () => { if(room.consumers.delete(consumer.id)) { console.debug("消费者关闭", consumer.id, streamId); - me.push(protocol.buildMessage("media::consumer::close", { + this.push(protocol.buildMessage("media::consumer::close", { roomId, consumerId: consumer.id })); @@ -912,14 +904,14 @@ class Taoyao { }); consumer.observer.on("pause", () => { console.debug("消费者暂停", consumer.id, streamId); - me.push(protocol.buildMessage("media::consumer::pause", { + this.push(protocol.buildMessage("media::consumer::pause", { roomId, consumerId: consumer.id })); }); consumer.observer.on("resume", () => { console.debug("消费者恢复", consumer.id, streamId); - me.push(protocol.buildMessage("media::consumer::resume", { + this.push(protocol.buildMessage("media::consumer::resume", { roomId, consumerId: consumer.id })); @@ -930,7 +922,7 @@ class Taoyao { // console.debug("消费者跟踪事件(trace)", consumer.id, streamId, trace); // }); // 等待终端准备就绪:可以不用等待直接使用push方法 - await me.request(protocol.buildMessage("media::consume", { + await this.request(protocol.buildMessage("media::consume", { roomId, clientId, sourceId, diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index 6a7c964..689ea89 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -838,6 +838,9 @@ class Taoyao extends RemoteClient { case "media::consumer::close": me.defaultMediaConsumerClose(message, body); break; + case "media::consumer::layers::change": + this.defaultMediaConsumerLayersChange(message, body); + break; case "media::consumer::pause": me.defaultMediaConsumerPause(message, body); break; @@ -1527,14 +1530,13 @@ class Taoyao extends RemoteClient { * @param {*} producerId 生产者ID */ mediaConsume(producerId) { - const me = this; - if(!me.recvTransport) { - me.platformError("没有连接接收通道"); + if(!this.recvTransport) { + this.platformError("没有连接接收通道"); return; } - me.push(protocol.buildMessage("media::consume", { - roomId : me.roomId, - producerId: producerId, + this.push(protocol.buildMessage("media::consume", { + producerId, + roomId: this.roomId, })); } @@ -1550,8 +1552,7 @@ class Taoyao extends RemoteClient { * @param {*} body 消息主体 */ async defaultMediaConsume(message, body) { - const me = this; - if (!me.audioConsume && !me.videoConsume) { + if (!this.audioConsume && !this.videoConsume) { console.debug("没有消费媒体"); return; } @@ -1569,10 +1570,15 @@ class Taoyao extends RemoteClient { producerPaused, } = body; try { - const consumer = await me.recvTransport.consume({ + const consumer = await this.recvTransport.consume({ id: consumerId, - appData: { ...appData, clientId, sourceId, streamId }, - // 让libwebrtc同步相同来源媒体 + appData: { + ...appData, + clientId, + sourceId, + streamId + }, + // libwebrtc同步相同来源媒体 streamId: `${clientId}-${appData.videoSource || "taoyao"}`, kind, producerId, @@ -1581,16 +1587,16 @@ class Taoyao extends RemoteClient { consumer.clientId = clientId; consumer.sourceId = sourceId; consumer.streamId = streamId; - me.consumers.set(consumer.id, consumer); + this.consumers.set(consumer.id, consumer); consumer.on("transportclose", () => { console.debug("消费者关闭(通道关闭)", consumer.id, streamId); consumer.close(); }); consumer.observer.on("close", () => { - if(me.consumers.delete(consumer.id)) { + if(this.consumers.delete(consumer.id)) { console.debug("消费者关闭", consumer.id, streamId); } else { - console.debug("消费者关闭(无效)", consumer.id, streamId); + console.debug("消费者关闭(消费者无效)", consumer.id, streamId); } }); const { @@ -1599,12 +1605,11 @@ class Taoyao extends RemoteClient { } = mediasoupClient.parseScalabilityMode( consumer.rtpParameters.encodings[0].scalabilityMode ); - console.debug("时间层空间层", spatialLayers, temporalLayers); - me.push(message); - console.debug("远程媒体消费者", consumer); + this.push(message); + console.debug("添加远程媒体消费者", consumer, spatialLayers, temporalLayers); const track = consumer.track; - const remoteClient = me.remoteClients.get(consumer.sourceId); - me.callbackTrack(sourceId, track); + const remoteClient = this.remoteClients.get(consumer.sourceId); + this.callbackTrack(sourceId, track); if ( remoteClient && remoteClient.proxy && @@ -1621,10 +1626,10 @@ class Taoyao extends RemoteClient { } remoteClient.proxy.media(track, consumer); } else { - console.warn("远程终端没有实现代理", remoteClient); + console.warn("远程终端没有实现代理", consumer.sourceId, remoteClient); } } catch (error) { - me.platformError("消费媒体异常", error); + this.platformError("消费媒体异常", error); } } @@ -1659,6 +1664,16 @@ class Taoyao extends RemoteClient { consumer.close(); } + /** + * 消费者空间层和时间层改变信令 + * + * @param {*} message 信令消息 + * @param {*} body 信令主体 + */ + defaultMediaConsumerLayersChange(message, body) { + console.debug("消费者空间层和时间层改变", body); + } + /** * 暂停消费者信令 * @@ -1903,7 +1918,7 @@ class Taoyao extends RemoteClient { console.debug("数据消费者消息", dataConsumer.id, streamId, message.toString("UTF-8"), ppid); }); } catch (error) { - me.platformError("消费数据异常", error); + this.platformError("消费数据异常", error); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index d757648..1e5a9c6 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -39,14 +39,27 @@ import lombok.extern.slf4j.Slf4j; """, body = """ { - "roomId" : "房间ID" + "roomId" : "房间ID", "producerId": "生产者ID" } + { + "roomId" : "房间ID", + "clientId" : "消费者ID", + "sourceId" : "生产者ID", + "streamId" : "媒体ID", + "producerId" : "生产者ID", + "consumerId" : "消费者ID", + "kind" : "消费者媒体类型", + "type" : "消费者类型", + "appData" : "APP数据", + "rtpParameters" : "RTP参数", + "producerPaused": "生产者是否暂停", + } """, flow = { + "终端->信令服务->媒体服务=>信令服务->终端", "终端-[生产媒体]>信令服务-[消费媒体])信令服务=>信令服务->终端", "终端-[创建WebRTC通道]>信令服务-[消费媒体])信令服务=>信令服务->终端", - "终端->信令服务->媒体服务=>信令服务->终端" } ) public class MediaConsumeProtocol extends ProtocolRoomAdapter implements ApplicationListener { @@ -63,13 +76,13 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica final Room room = event.getRoom(); if(event.getProducer() != null) { // 生产媒体:其他终端消费 - final Producer producer = event.getProducer(); + final Producer producer = event.getProducer(); final ClientWrapper produceClientWrapper = producer.getProducerClient(); room.getClients().values().stream() .filter(v -> v != produceClientWrapper) .filter(v -> v.getRecvTransport() != null) .filter(v -> v.getSubscribeType().canConsume(producer)) - .forEach(v -> this.consume(room, v, producer, this.build())); + .forEach(consumeClientWrapper -> this.consume(room, consumeClientWrapper, producer, this.build())); } else if(event.getClientWrapper() != null) { // 创建WebRTC消费通道:消费其他终端 final ClientWrapper consumeClientWrapper = event.getClientWrapper(); @@ -138,7 +151,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica if(consumerClientWrapper.consumed(producer)) { // 消费通道就绪 mediaClient.push(message); - log.info("{}消费通道就绪:{}", consumerClientId, streamId); + log.debug("{}消费通道就绪:{}", consumerClientId, streamId); } else { // 主动消费媒体 final Transport recvTransport = consumerClientWrapper.getRecvTransport();