diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 5fdf4bd..7f35109 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -1129,13 +1129,10 @@ class Taoyao { /** * 消费数据信令 * - * TODO:重复 - * - * @param {*} message 消息 + * @param {*} message 信令消息 * @param {*} body 消息主体 */ async mediaDataConsume(message, body) { - const me = this; const { roomId, clientId, @@ -1145,7 +1142,7 @@ class Taoyao { transportId, rtpCapabilities, } = body; - const room = me.rooms.get(roomId); + const room = this.rooms.get(roomId); const transport = room?.transports.get(transportId); const dataProducer = room?.dataProducers.get(producerId); if ( @@ -1169,12 +1166,12 @@ class Taoyao { dataConsumer.streamId = streamId; room.dataConsumers.set(dataConsumer.id, dataConsumer); console.debug("创建数据消费者", dataProducer.id, streamId); - dataConsumer.on('transportclose', () => { - console.info("数据消费者关闭(通道关闭)", dataConsumer.id, streamId); + dataConsumer.on("transportclose", () => { + console.debug("数据消费者关闭(通道关闭)", dataConsumer.id, streamId); dataConsumer.close(); }); - dataConsumer.on('dataproducerclose', () => { - console.info("数据消费者关闭(生产者关闭)", dataConsumer.id, streamId); + dataConsumer.on("dataproducerclose", () => { + console.debug("数据消费者关闭(生产者关闭)", dataConsumer.id, streamId); dataConsumer.close(); }); // dataConsumer.on("bufferedamountlow", fn(bufferedAmount)); @@ -1182,7 +1179,7 @@ class Taoyao { dataConsumer.observer.on("close", () => { if(room.dataConsumers.delete(dataConsumer.id)) { console.debug("数据消费者关闭", dataConsumer.id, streamId); - me.push(protocol.buildMessage("media::data::consumer::close", { + this.push(protocol.buildMessage("media::data::consumer::close", { roomId, consumerId: dataConsumer.id, })); @@ -1190,20 +1187,18 @@ class Taoyao { console.debug("数据消费者关闭(数据消费者无效)", dataConsumer.id, streamId); } }); - me.push( - protocol.buildMessage("media::data::consume", { - roomId : roomId, - clientId : clientId, - sourceId : sourceId, - streamId : streamId, - producerId : producerId, - consumerId : dataConsumer.id, - label : dataConsumer.label, - appData : dataProducer.appData, - protocol : dataConsumer.protocol, - sctpStreamParameters: dataConsumer.sctpStreamParameters, - }) - ); + this.push(protocol.buildMessage("media::data::consume", { + roomId : roomId, + clientId : clientId, + sourceId : sourceId, + streamId : streamId, + producerId : producerId, + consumerId : dataConsumer.id, + label : dataConsumer.label, + appData : dataProducer.appData, + protocol : dataConsumer.protocol, + sctpStreamParameters: dataConsumer.sctpStreamParameters, + })); } /** diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index d9f4a54..74440d1 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -1822,24 +1822,22 @@ class Taoyao extends RemoteClient { * @param {*} producerId 数据生产者ID */ mediaDataConsume(producerId) { - const me = this; - if(!me.recvTransport) { - me.platformError("没有连接接收通道"); + if(!this.recvTransport) { + this.platformError("没有连接接收通道"); return; } - me.push(protocol.buildMessage("media::data::consume", { - roomId : me.roomId, - producerId: producerId, + this.push(protocol.buildMessage("media::data::consume", { + producerId, + roomId: this.roomId, })); } /** * 消费数据信令 * - * @param {*} message 消息 + * @param {*} message 信令消息 */ async defaultMediaDataConsume(message) { - const me = this; const { roomId, clientId, @@ -1853,7 +1851,7 @@ class Taoyao extends RemoteClient { sctpStreamParameters, } = message.body; try { - const dataConsumer = await me.recvTransport.consumeData({ + const dataConsumer = await this.recvTransport.consumeData({ id : consumerId, dataProducerId: producerId, label, @@ -1861,9 +1859,9 @@ class Taoyao extends RemoteClient { protocol, sctpStreamParameters, }); - me.dataConsumers.set(dataConsumer.id, dataConsumer); + this.dataConsumers.set(dataConsumer.id, dataConsumer); dataConsumer.on("open", () => { - console.debug("数据消费者打开", dataConsumer.id); + console.debug("数据消费者打开", dataConsumer.id, streamId); }); dataConsumer.on("transportclose", () => { console.debug("数据消费者关闭(通道关闭)", dataConsumer.id, streamId); @@ -1871,10 +1869,10 @@ class Taoyao extends RemoteClient { }); // dataConsumer.observer.on("close", fn()) dataConsumer.on("close", () => { - if(me.dataConsumers.delete(dataConsumer.id)) { + if(this.dataConsumers.delete(dataConsumer.id)) { console.debug("数据消费者关闭", dataConsumer.id, streamId); } else { - console.debug("数据消费者关闭(无效)", dataConsumer.id, streamId); + console.debug("数据消费者关闭(数据消费者无效)", dataConsumer.id, streamId); } }); dataConsumer.on("error", (error) => { @@ -1884,7 +1882,7 @@ class Taoyao extends RemoteClient { console.debug("数据消费者消息", dataConsumer.id, streamId, message.toString("UTF-8"), ppid); }); } catch (error) { - console.error("打开数据消费者异常", error); + me.platformError("消费数据异常", error); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index aa10e63..d757648 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -91,6 +91,9 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final Producer producer = room.producer(producerId); + if(producer == null) { + throw MessageCodeException.of("媒体生产者无效:" + producerId); + } if(clientType.isClient()) { // 主动请求消费 || 消费通道准备就绪 this.consume(room, room.clientWrapper(client), producer, message); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java index b9c2ec8..72ea3f3 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java @@ -22,8 +22,6 @@ import lombok.extern.slf4j.Slf4j; /** * 消费数据信令 * - * TODO:防止重复消费 - * * @author acgist */ @Slf4j @@ -55,7 +53,7 @@ public class MediaDataConsumeProtocol extends ProtocolRoomAdapter { final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final DataProducer dataProducer = room.dataProducer(producerId); if(dataProducer == null) { - throw MessageCodeException.of("没有提供数据生产:" + producerId); + throw MessageCodeException.of("数据生产者无效:" + producerId); } if(clientType.isClient()) { final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(client); @@ -73,6 +71,7 @@ public class MediaDataConsumeProtocol extends ProtocolRoomAdapter { body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities()); body.put(Constant.SCTP_CAPABILITIES, dataConsumerClientWrapper.getSctpCapabilities()); mediaClient.push(message); + log.info("{}主动消费数据:{} - {}", dataConsumerClientId, dataProducerClientId, streamId); } else if(clientType.isMedia()) { final String streamId = MapUtils.get(body, Constant.STREAM_ID); final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);