From 49805077ec96d7c8b69c27fcb47c710054a62525 Mon Sep 17 00:00:00 2001 From: acgist <289547414@qq.com> Date: Thu, 3 Aug 2023 08:17:14 +0800 Subject: [PATCH] =?UTF-8?q?[*]=20=E6=97=A5=E5=B8=B8=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- taoyao-client-media/src/Taoyao.js | 1 + taoyao-client-web/src/components/Taoyao.js | 23 +++++++++++++------ .../signal/party/room/ClientWrapper.java | 21 +++++++++++++++++ .../media/MediaConsumerCloseProtocol.java | 11 +++++---- .../media/MediaConsumerPauseProtocol.java | 18 ++++++++++----- .../MediaConsumerRequestKeyFrameProtocol.java | 4 ++-- .../media/MediaConsumerResumeProtocol.java | 18 ++++++++++----- 7 files changed, 70 insertions(+), 26 deletions(-) diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index a094f36..2d5c021 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -905,6 +905,7 @@ class Taoyao { // }) // ); if (producer.kind === "audio") { + // TODO:关闭生产者时移除监听 room.audioLevelObserver .addProducer({ producerId: producer.id }) .catch((error) => { diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index 2f02e5f..be4f94f 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -1439,7 +1439,10 @@ class Taoyao extends RemoteClient { */ defaultMediaConsumerClose(message) { const me = this; - const { roomId, consumerId } = message.body; + const { + roomId, + consumerId + } = message.body; const consumer = me.consumers.get(consumerId); if (consumer) { console.debug("关闭消费者", consumerId); @@ -1455,7 +1458,7 @@ class Taoyao extends RemoteClient { * * @param {*} consumerId 消费者ID */ - mediaConsumerPause(consumerId) { + async mediaConsumerPause(consumerId) { const me = this; const consumer = me.consumers.get(consumerId); if(consumer) { @@ -1463,7 +1466,7 @@ class Taoyao extends RemoteClient { return; } console.debug("暂停消费者", consumerId); - me.push(protocol.buildMessage("media::consumer::pause", { + await me.request(protocol.buildMessage("media::consumer::pause", { roomId : me.roomId, consumerId: consumerId, })); @@ -1479,7 +1482,10 @@ class Taoyao extends RemoteClient { */ defaultMediaConsumerPause(message) { const me = this; - const { roomId, consumerId } = message.body; + const { + roomId, + consumerId + } = message.body; const consumer = me.consumers.get(consumerId); if (consumer) { console.debug("暂停消费者", consumerId); @@ -1516,7 +1522,7 @@ class Taoyao extends RemoteClient { * * @param {*} consumerId 消费者ID */ - mediaConsumerResume(consumerId) { + async mediaConsumerResume(consumerId) { const me = this; const consumer = me.consumers.get(consumerId); if(consumer) { @@ -1524,7 +1530,7 @@ class Taoyao extends RemoteClient { return; } console.debug("恢复消费者", consumerId); - me.push(protocol.buildMessage("media::consumer::resume", { + await me.request(protocol.buildMessage("media::consumer::resume", { roomId : me.roomId, consumerId: consumerId, })); @@ -1540,7 +1546,10 @@ class Taoyao extends RemoteClient { */ defaultMediaConsumerResume(message) { const me = this; - const { roomId, consumerId } = message.body; + const { + roomId, + consumerId + } = message.body; const consumer = me.consumers.get(consumerId); if (consumer) { console.debug("恢复消费者", consumerId); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/room/ClientWrapper.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/room/ClientWrapper.java index 9d7aca1..2dc6a2f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/room/ClientWrapper.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/room/ClientWrapper.java @@ -3,6 +3,7 @@ package com.acgist.taoyao.signal.party.room; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.media.DataConsumer; @@ -118,6 +119,26 @@ public class ClientWrapper implements AutoCloseable { .anyMatch(v -> v.getDataProducer() == dataProducer); } + /** + * 推送消息 + * + * @param message 消息 + */ + public void push(Message message) { + this.client.push(message); + } + + /** + * 请求消息 + * + * @param message 请求 + * + * @return 响应 + */ + public Message request(Message message) { + return this.client.request(message); + } + @Override public void close() { // 注意:不要关闭终端(只是离开房间) diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java index 62409f9..d904f66 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java @@ -27,15 +27,17 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Protocol @Description( + memo = "关闭通过回调实现所以不能同步响应", body = """ { - "roomId": "房间ID" + "roomId" : "房间ID" "consumerId": "消费者ID" } """, flow = { - "媒体服务->信令服务-)终端", - "终端->信令服务->媒体服务->信令服务+)终端" + "媒体服务->信令服务->终端", + "信令服务->媒体服务->信令服务->终端", + "终端->信令服务->媒体服务->信令服务->终端" } ) public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { @@ -69,9 +71,8 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A if(clientType.mediaClient()) { consumer.close(); } else if(clientType.mediaServer()) { - // TODO:路由到真实消费者 consumer.remove(); - room.broadcast(message); + consumer.getConsumerClient().push(message); } else { this.logNoAdapter(clientType); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java index 20f2d63..41e05b2 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java @@ -17,20 +17,23 @@ import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.room.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; +import lombok.extern.slf4j.Slf4j; + /** * 暂停消费者信令 * * @author acgist */ +@Slf4j @Protocol @Description( body = """ { - "roomId": "房间ID" + "roomId" : "房间ID" "consumerId": "消费者ID" } """, - flow = "终端->信令服务->媒体服务->信令服务->终端" + flow = "终端=>信令服务->媒体服务" ) public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter implements ApplicationListener { @@ -54,13 +57,16 @@ public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter implements A @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final Consumer consumer = room.consumer(consumerId); + if(consumer == null) { + log.debug("消费者无效:{} - {}", consumerId, clientType); + return; + } if(clientType.mediaClient()) { - final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); - final Consumer consumer = room.consumer(consumerId); consumer.pause(); } else if(clientType.mediaServer()) { - // TODO:路由到真实消费者 - room.broadcast(message); + consumer.getConsumerClient().push(message); } else { this.logNoAdapter(clientType); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerRequestKeyFrameProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerRequestKeyFrameProtocol.java index 59bb1b0..d5d23cb 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerRequestKeyFrameProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerRequestKeyFrameProtocol.java @@ -21,11 +21,11 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @Description( body = """ { - "roomId": "房间ID", + "roomId" : "房间ID", "consumerId": "消费者ID" } """, - flow = "终端->信令服务->媒体服务->信令服务->终端" + flow = "终端->信令服务->媒体服务" ) public class MediaConsumerRequestKeyFrameProtocol extends ProtocolRoomAdapter { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java index 193f164..02d1179 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java @@ -17,20 +17,23 @@ import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.room.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; +import lombok.extern.slf4j.Slf4j; + /** * 恢复消费者信令 * * @author acgist */ +@Slf4j @Protocol @Description( body = """ { - "roomId": "房间ID" + "roomId" : "房间ID" "consumerId": "消费者ID" } """, - flow = "终端->信令服务->媒体服务->信令服务->终端" + flow = "终端=>信令服务->媒体服务" ) public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements ApplicationListener { @@ -54,13 +57,16 @@ public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final Consumer consumer = room.consumer(consumerId); + if(consumer == null) { + log.debug("消费者无效:{} - {}", consumerId, clientType); + return; + } if(clientType.mediaClient()) { - final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); - final Consumer consumer = room.consumer(consumerId); consumer.resume(); } else if(clientType.mediaServer()) { - // TODO:路由到真实消费者 - room.broadcast(message); + consumer.getConsumerClient().push(message); } else { this.logNoAdapter(clientType); }