diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 244ac11..01f5900 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -435,6 +435,9 @@ class Taoyao { case "media::data::producer::close": me.mediaDataProducerClose(message, body); break; + case "media::data::producer::status": + me.mediaDataProducerStatus(message, body); + break; case "media::ice::restart": me.mediaIceRestart(message, body); break; @@ -1431,14 +1434,14 @@ class Taoyao { const room = me.rooms.get(roomId); const dataConsumer = room?.dataConsumers.get(consumerId); if(dataConsumer) { - console.debug("查询消费者状态", consumerId); + console.debug("查询数据消费者状态", consumerId); message.body = { ...body, status: await dataConsumer.getStats() }; me.push(message); } else { - console.debug("查询消费者状态(无效)", consumerId); + console.debug("查询数据消费者状态(无效)", consumerId); } } @@ -1483,12 +1486,10 @@ class Taoyao { dataProducer.observer.on("close", () => { if(room.dataProducers.delete(dataProducer.id)) { console.info("数据生产者关闭", dataProducer.id, streamId); - me.push( - taoyaoProtocol.buildMessage("media::data::producer::close", { - roomId : roomId, - producerId: dataProducer.id, - }) - ); + me.push(taoyaoProtocol.buildMessage("media::data::producer::close", { + roomId : roomId, + producerId: dataProducer.id, + })); } else { console.debug("数据生产者关闭(无效)", dataProducer.id, streamId); } @@ -1519,6 +1520,32 @@ class Taoyao { } } + /** + * 查询数据生产者状态信令 + * + * @param {*} message 信令消息 + * @param {*} body 消息主体 + */ + async mediaDataProducerStatus(message, body) { + const me = this; + const { + roomId, + producerId, + } = body; + const room = me.rooms.get(roomId); + const dataProducer = room?.dataProducers.get(producerId); + if(dataProducer) { + console.debug("查询数据生产者状态", producerId); + message.body = { + ...body, + status: await dataProducer.getStats() + }; + me.push(message); + } else { + console.debug("查询数据生产者状态(无效)", producerId); + } + } + /** * 路由RTP协商信令 * diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index 84f8a88..3f13e12 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -1713,7 +1713,7 @@ class Taoyao extends RemoteClient { */ async mediaDataProducerStatus(producerId) { const me = this; - return await me.request(protocol.buildMessage('media::data::producer::status', { + return await me.request(protocol.buildMessage("media::data::producer::status", { roomId: me.roomId, producerId })); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java index 05b7b0a..6eb893d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java @@ -27,12 +27,12 @@ import lombok.extern.slf4j.Slf4j; body = { """ { - "roomId": "房间标识", + "roomId" : "房间标识", "transportId": "通道标识" } """ }, - flow = "终端->信令服务->媒体服务->信令服务->终端" + flow = "终端=>信令服务->媒体服务" ) public class MediaDataProduceProtocol extends ProtocolRoomAdapter { @@ -65,7 +65,7 @@ public class MediaDataProduceProtocol extends ProtocolRoomAdapter { Constant.STREAM_ID, streamId, Constant.PRODUCER_ID, producerId )); - room.broadcast(responseMessage); + client.push(responseMessage); log.info("{}生产数据:{} - {}", clientId, streamId, producerId); } else { this.logNoAdapter(clientType); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java index 6f74ec3..b0f849b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java @@ -27,13 +27,14 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Protocol @Description( + memo = "关闭通过回调实现所以不能同步响应", body = """ { - "roomId": "房间ID" + "roomId" : "房间ID" "consumerId": "数据生产者ID" } """, - flow = "终端->信令服务->媒体服务->信令服务+)终端" + flow = "终端->信令服务->媒体服务->信令服务->终端" ) public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { @@ -46,10 +47,10 @@ public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implemen @Async @Override public void onApplicationEvent(MediaDataProducerCloseEvent event) { - final Room room = event.getRoom(); + final Room room = event.getRoom(); final Client mediaClient = event.getMediaClient(); final Map body = Map.of( - Constant.ROOM_ID, room.getRoomId(), + Constant.ROOM_ID, room.getRoomId(), Constant.PRODUCER_ID, event.getProducerId() ); mediaClient.push(this.build(body)); @@ -57,7 +58,7 @@ public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implemen @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); + final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final DataProducer dataProducer = room.dataProducer(producerId); if(dataProducer == null) { log.debug("数据生产者无效:{} - {}", producerId, clientType); @@ -66,9 +67,8 @@ public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implemen if(clientType.mediaClient()) { dataProducer.close(); } else if(clientType.mediaServer()) { - // TODO:路由到真实消费者 dataProducer.remove(); - room.broadcast(message); + dataProducer.getProducerClient().push(message); } else { this.logNoAdapter(clientType); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java index 12d779f..c636de8 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java @@ -19,11 +19,11 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @Description( body = """ { - "roomId": "房间ID", + "roomId" : "房间ID", "producerId": "数据生产者ID" } """, - flow = "终端=>信令服务->媒体服务->信令服务->终端" + flow = "终端=>信令服务->媒体服务" ) public class MediaDataProducerStatusProtocol extends ProtocolRoomAdapter { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java index 888c794..2122afd 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java @@ -70,7 +70,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter { Constant.STREAM_ID, streamId, Constant.PRODUCER_ID, producerId )); - room.broadcast(responseMessage); + client.push(responseMessage); log.info("{}生产媒体:{} - {}", clientId, streamId, producerId); this.publishEvent(new MediaConsumeEvent(room, producer)); } else {