diff --git a/taoyao-client-media/.env b/taoyao-client-media/.env index 8db2994..1766951 100644 --- a/taoyao-client-media/.env +++ b/taoyao-client-media/.env @@ -25,4 +25,4 @@ MEDIASOUP_LISTEN_IP=0.0.0.0 # Mediasoup监听端口(起始端口) MEDIASOUP_LISTEN_PORT=44444 # Mediasoup声明地址(不能配置环回地址) -MEDIASOUP_ANNOUNCED_IP=192.168.1.110 \ No newline at end of file +MEDIASOUP_ANNOUNCED_IP=192.168.1.100 \ No newline at end of file diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index b6c7ede..244ac11 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -426,6 +426,9 @@ class Taoyao { case "media::data::consumer::close": me.mediaDataConsumerClose(message, body); break; + case "media::data::consumer::status": + me.mediaDataConsumerStatus(message, body); + break; case "media::data::produce": me.mediaDataProduce(message, body); break; @@ -1413,6 +1416,32 @@ class Taoyao { } } + /** + * 查询数据消费者状态信令 + * + * @param {*} message 信令消息 + * @param {*} body 消息主体 + */ + async mediaDataConsumerStatus(message, body) { + const me = this; + const { + roomId, + consumerId, + } = body; + const room = me.rooms.get(roomId); + const dataConsumer = room?.dataConsumers.get(consumerId); + if(dataConsumer) { + console.debug("查询消费者状态", consumerId); + message.body = { + ...body, + status: await dataConsumer.getStats() + }; + me.push(message); + } else { + console.debug("查询消费者状态(无效)", consumerId); + } + } + /** * 生产数据信令 * diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index c35af9a..84f8a88 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -1668,7 +1668,7 @@ class Taoyao extends RemoteClient { */ async mediaDataConsumerStatus(consumerId) { const me = this; - return await me.request(protocol.buildMessage('media::data::consumer::status', { + return await me.request(protocol.buildMessage("media::data::consumer::status", { roomId: me.roomId, consumerId })); @@ -2384,7 +2384,7 @@ class Taoyao extends RemoteClient { async mediaProduce(audioTrack, videoTrack) { const me = this; if(!audioTrack || !videoTrack) { - me.checkDevice(); + await me.checkDevice(); } await me.createSendTransport(); await me.createRecvTransport(); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index 45c1bcb..d7485e9 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -96,17 +96,17 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica this.consume(room, room.clientWrapper(client), producer, message); } else if(clientType.mediaServer()) { // 媒体通道准备就绪 - final String kind = MapUtils.get(body, Constant.KIND); - final String streamId = MapUtils.get(body, Constant.STREAM_ID); - final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final String kind = MapUtils.get(body, Constant.KIND); + final String streamId = MapUtils.get(body, Constant.STREAM_ID); + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final String consumerClientId = MapUtils.get(body, Constant.CLIENT_ID); - final ClientWrapper consumerClientWrapper = room.clientWrapper(consumerClientId); - final Map roomConsumers = room.getConsumers(); - final Map clientConsumers = consumerClientWrapper.getConsumers(); + final ClientWrapper consumerClientWrapper = room.clientWrapper(consumerClientId); + final Map roomConsumers = room.getConsumers(); + final Map clientConsumers = consumerClientWrapper.getConsumers(); final Map producerConsumers = producer.getConsumers(); - final Consumer consumer = new Consumer(kind, streamId, consumerId, room, producer, consumerClientWrapper); - final Consumer oldRoomConsumer = roomConsumers.put(consumerId, consumer); - final Consumer oldClientConsumer = clientConsumers.put(consumerId, consumer); + final Consumer consumer = new Consumer(kind, streamId, consumerId, room, producer, consumerClientWrapper); + final Consumer oldRoomConsumer = roomConsumers.put(consumerId, consumer); + final Consumer oldClientConsumer = clientConsumers.put(consumerId, consumer); final Consumer oldProducerConsumer = producerConsumers.put(consumerId, consumer); if(oldRoomConsumer != null || oldClientConsumer != null || oldProducerConsumer != null) { log.warn("消费者已经存在:{}", consumerId); @@ -127,26 +127,26 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica * @param message 消息 */ private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) { - final Client mediaClient = room.getMediaClient(); + final Client mediaClient = room.getMediaClient(); final String consumerClientId = consumerClientWrapper.getClientId(); - final String streamId = Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), consumerClientId); + final String streamId = Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), consumerClientId); final ClientWrapper producerClientWrapper = producer.getProducerClient(); - final String producerClientId = producerClientWrapper.getClientId(); + final String producerClientId = producerClientWrapper.getClientId(); if(consumerClientWrapper.consumed(producer)) { // 消费通道就绪 mediaClient.push(message); log.info("{}消费通道就绪:{}", consumerClientId, streamId); } else { // 主动消费媒体 - final Transport recvTransport = consumerClientWrapper.getRecvTransport(); + final Transport recvTransport = consumerClientWrapper.getRecvTransport(); final Map body = new HashMap<>(); - body.put(Constant.ROOM_ID, room.getRoomId()); - body.put(Constant.CLIENT_ID, consumerClientId); - body.put(Constant.SOURCE_ID, producerClientId); - body.put(Constant.STREAM_ID, streamId); - body.put(Constant.PRODUCER_ID, producer.getProducerId()); - body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); - body.put(Constant.RTP_CAPABILITIES, consumerClientWrapper.getRtpCapabilities()); + body.put(Constant.ROOM_ID, room.getRoomId()); + body.put(Constant.CLIENT_ID, consumerClientId); + body.put(Constant.SOURCE_ID, producerClientId); + body.put(Constant.STREAM_ID, streamId); + body.put(Constant.PRODUCER_ID, producer.getProducerId()); + body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); + body.put(Constant.RTP_CAPABILITIES, consumerClientWrapper.getRtpCapabilities()); body.put(Constant.SCTP_CAPABILITIES, consumerClientWrapper.getSctpCapabilities()); message.setBody(body); mediaClient.push(message); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java index 907fba8..9290bb5 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java @@ -34,12 +34,12 @@ import lombok.extern.slf4j.Slf4j; """, body = """ { - "roomId": "房间ID" + "roomId" : "房间ID" "producerId": "生产者ID", } """, flow = { - "终端=>信令服务->媒体服务->信令服务->媒体服务" + "终端->信令服务->媒体服务->信令服务->终端" } ) public class MediaDataConsumeProtocol extends ProtocolRoomAdapter { @@ -59,31 +59,31 @@ public class MediaDataConsumeProtocol extends ProtocolRoomAdapter { } if(clientType.mediaClient()) { final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(client); - final String dataConsumerClientId = dataConsumerClientWrapper.getClientId(); + final String dataConsumerClientId = dataConsumerClientWrapper.getClientId(); final ClientWrapper dataProducerClientWrapper = dataProducer.getProducerClient(); - final String dataProducerClientId = dataProducerClientWrapper.getClientId(); - final Transport recvTransport = dataConsumerClientWrapper.getRecvTransport(); - final String streamId = Constant.STREAM_ID_CONSUMER.apply(dataProducer.getStreamId(), dataConsumerClientId); - body.put(Constant.ROOM_ID, room.getRoomId()); - body.put(Constant.CLIENT_ID, dataConsumerClientId); - body.put(Constant.SOURCE_ID, dataProducerClientId); - body.put(Constant.STREAM_ID, streamId); - body.put(Constant.PRODUCER_ID, dataProducer.getProducerId()); - body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); - body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities()); + final String dataProducerClientId = dataProducerClientWrapper.getClientId(); + final Transport recvTransport = dataConsumerClientWrapper.getRecvTransport(); + final String streamId = Constant.STREAM_ID_CONSUMER.apply(dataProducer.getStreamId(), dataConsumerClientId); + body.put(Constant.ROOM_ID, room.getRoomId()); + body.put(Constant.CLIENT_ID, dataConsumerClientId); + body.put(Constant.SOURCE_ID, dataProducerClientId); + body.put(Constant.STREAM_ID, streamId); + body.put(Constant.PRODUCER_ID, dataProducer.getProducerId()); + body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); + body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities()); body.put(Constant.SCTP_CAPABILITIES, dataConsumerClientWrapper.getSctpCapabilities()); mediaClient.push(message); } else if(clientType.mediaServer()) { - final String streamId = MapUtils.get(body, Constant.STREAM_ID); + final String streamId = MapUtils.get(body, Constant.STREAM_ID); final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); - final String dataConsumerClientId = MapUtils.get(body, Constant.CLIENT_ID); + final String dataConsumerClientId = MapUtils.get(body, Constant.CLIENT_ID); final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(dataConsumerClientId); - final Map roomDataConsumers = room.getDataConsumers(); - final Map clientDataConsumers = dataConsumerClientWrapper.getDataConsumers(); + final Map roomDataConsumers = room.getDataConsumers(); + final Map clientDataConsumers = dataConsumerClientWrapper.getDataConsumers(); final Map producerDataConsumers = dataProducer.getDataConsumers(); - final DataConsumer dataConsumer = new DataConsumer(streamId, consumerId, room, dataProducer, dataConsumerClientWrapper); - final DataConsumer oldDataRoomConsumer = roomDataConsumers.put(consumerId, dataConsumer); - final DataConsumer oldDataClientConsumer = clientDataConsumers.put(consumerId, dataConsumer); + final DataConsumer dataConsumer = new DataConsumer(streamId, consumerId, room, dataProducer, dataConsumerClientWrapper); + final DataConsumer oldDataRoomConsumer = roomDataConsumers.put(consumerId, dataConsumer); + final DataConsumer oldDataClientConsumer = clientDataConsumers.put(consumerId, dataConsumer); final DataConsumer oldDataProducerConsumer = producerDataConsumers.put(consumerId, dataConsumer); if(oldDataRoomConsumer != null || oldDataClientConsumer != null || oldDataProducerConsumer != null) { log.warn("消费者已经存在:{}", consumerId); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java index 6c3884f..6656aab 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java @@ -27,15 +27,17 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Protocol @Description( + memo = "关闭通过回调实现所以不能同步响应", body = """ { - "roomId": "房间ID" + "roomId" : "房间ID" "consumerId": "数据消费者ID" } """, flow = { - "媒体服务->信令服务-)终端", - "终端->信令服务->媒体服务->信令服务+)终端" + "媒体服务->信令服务->终端", + "信令服务->媒体服务->信令服务->终端", + "终端->信令服务->媒体服务->信令服务->终端" } ) public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { @@ -49,8 +51,8 @@ public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implemen @Async @Override public void onApplicationEvent(MediaDataConsumerCloseEvent event) { - final Room room = event.getRoom(); - final Client mediaClient = event.getMediaClient(); + final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); final Map body = Map.of( Constant.ROOM_ID, room.getRoomId(), Constant.CONSUMER_ID, event.getConsumerId() @@ -60,7 +62,7 @@ public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implemen @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final DataConsumer dataConsumer = room.dataConsumer(consumerId); if(dataConsumer == null) { log.debug("数据消费者无效:{} - {}", consumerId, clientType); @@ -69,9 +71,8 @@ public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implemen if(clientType.mediaClient()) { dataConsumer.close(); } else if(clientType.mediaServer()) { - // TODO:路由到真实消费者 dataConsumer.remove(); - room.broadcast(message); + dataConsumer.getConsumerClient().push(message); } else { this.logNoAdapter(clientType); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java index 9b6cb1a..627b805 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java @@ -19,11 +19,11 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @Description( body = """ { - "roomId": "房间ID", + "roomId" : "房间ID", "consumerId": "数据消费者ID" } """, - flow = "终端=>信令服务->媒体服务->信令服务->终端" + flow = "终端=>信令服务->媒体服务" ) public class MediaDataConsumerStatusProtocol extends ProtocolRoomAdapter {