diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 1e0f320..69ce75b 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -393,6 +393,9 @@ class Taoyao { case "media::consume": this.mediaConsume(message, body); break; + case "media::consumer::close": + me.mediaConsumerClose(message, body); + break; case "media::produce": this.mediaProduce(message, body); break; @@ -470,7 +473,7 @@ class Taoyao { const usage = await worker.getResourceUsage(); console.info("Worker使用情况:", worker.pid, usage); } - console.info("路由数量:", this.mediasoupWorkers.length); + console.info("工作线程数量:", this.mediasoupWorkers.length); console.info("房间数量:", this.rooms.size); Array.from(this.rooms.values()).forEach((room) => room.usage()); } @@ -656,15 +659,16 @@ class Taoyao { consumer.streamId = streamId; room.consumers.set(consumer.id, consumer); consumer.on("transportclose", () => { - room.consumers.delete(consumer.id); + console.info("通道关闭同时关闭消费者:", consumer.id); + // 信令服务统一调度关闭 + // consumer.close(); + // room.consumers.delete(consumer.id); }); consumer.on("producerclose", () => { - room.consumers.delete(consumer.id); - this.push( - protocol.buildMessage("media::consumer::close", { - consumerId: consumer.id, - }) - ); + console.info("生产者关闭同时关闭消费者:", consumer.id); + // 信令服务统一调度关闭 + // consumer.close(); + // room.consumers.delete(consumer.id); }); consumer.on("producerpause", () => { this.push( @@ -683,8 +687,9 @@ class Taoyao { consumer.on("score", (score) => { this.push( protocol.buildMessage("media::consumer::score", { + score: score, + roomId: roomId, consumerId: consumer.id, - score, }) ); }); @@ -705,10 +710,18 @@ class Taoyao { trace ); }); + consumer.observer.on("close", () => { + this.push( + protocol.buildMessage("media::consumer::close", { + roomId: roomId, + consumerId: consumer.id + }) + ); + }); // TODO:改为同步 + //await this.request("media::consume", { this.push( protocol.buildMessage("media::consume", { - //await this.request("media::consume", { kind: consumer.kind, type: consumer.type, roomId: roomId, @@ -725,8 +738,9 @@ class Taoyao { await consumer.resume(); this.push( protocol.buildMessage("media::consumer::score", { - consumerId: consumer.id, score: consumer.score, + roomId: roomId, + consumerId: consumer.id, }) ); })() @@ -740,6 +754,25 @@ class Taoyao { } } + /** + * 关闭消费者信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + mediaConsumerClose(message, body) { + const { roomId, consumerId } = body; + const room = this.rooms.get(roomId); + const consumer = room.consumers.get(consumerId); + if(consumer) { + console.info("关闭消费者:", consumerId); + consumer.close(); + room.consumers.delete(consumerId); + } else { + console.debug("关闭消费者无效:", consumerId); + } + } + /** * 路由RTP协商信令 * diff --git a/taoyao-client-web/src/components/RemoteClient.vue b/taoyao-client-web/src/components/RemoteClient.vue index 9452c74..f68da21 100644 --- a/taoyao-client-web/src/components/RemoteClient.vue +++ b/taoyao-client-web/src/components/RemoteClient.vue @@ -5,10 +5,10 @@

{{ client?.name || "" }}

- - - - + + + + diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index 4bcb8f6..4b97640 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -236,6 +236,22 @@ class RemoteClient { volume = 0; // 代理对象 proxy; + // 数据可用 + dataActive = false; + // 音频可用 + audioActive = false; + // 视频可用 + videoActive = false; + // 数据消费者 + dataConsumer; + // 音频消费者 + audioConsumer; + // 视频消费者 + videoConsumer; + // 音频Track + audioTrack; + // 视频Track + videoTrack; constructor({ name, @@ -292,10 +308,6 @@ class Taoyao extends RemoteClient { recvTransport; // 媒体设备 mediasoupDevice; - // 是否消费 - consume; - // 是否生产 - produce; // 视频来源:file | camera | screen videoSource = "camera"; // 强制使用TCP @@ -306,6 +318,12 @@ class Taoyao extends RemoteClient { forceH264; // 同时上送多种质量媒体 useSimulcast; + // 是否消费数据 + dataConsume; + // 是否消费音频 + audioConsume; + // 是否消费视频 + videoConsume; // 是否生产数据 dataProduce; // 是否生产音频 @@ -331,12 +349,13 @@ class Taoyao extends RemoteClient { username, password, roomId, - consume = true, - produce = true, + dataConsume = true, + audioConsume = true, + videoConsume = true, + dataProduce = true, audioProduce = true, videoProduce = true, forceTcp = false, - dataProduce = true, }) { super({ name, clientId }); this.name = name; @@ -346,11 +365,12 @@ class Taoyao extends RemoteClient { this.username = username; this.password = password; this.roomId = roomId; - this.consume = consume; - this.produce = produce; - this.dataProduce = produce && dataProduce; - this.audioProduce = produce && audioProduce; - this.videoProduce = produce && videoProduce; + this.dataConsume = dataConsume; + this.audioConsume = audioConsume; + this.videoConsume = videoConsume; + this.dataProduce = dataProduce; + this.audioProduce = audioProduce; + this.videoProduce = videoProduce; this.forceTcp = forceTcp; } @@ -494,6 +514,9 @@ class Taoyao extends RemoteClient { case "media::audio::volume": me.defaultMediaAudioVolume(message); break; + case "media::consumer::close": + me.defaultMediaConsumerClose(message); + break; case "room::client::list": me.defaultRoomClientList(message); break; @@ -584,6 +607,35 @@ class Taoyao extends RemoteClient { } }); } + /** + * 关闭消费者信令 + * + * @param {*} consumerId 消费者ID + */ + mediaConsumerClose(consumerId) { + const me = this; + me.push(protocol.buildMessage("media::consumer::close", { + roomId: me.roomId, + consumerId: consumerId + })); + } + /** + * 关闭消费者信令 + * + * @param {*} message 消息 + */ + defaultMediaConsumerClose(message) { + const me = this; + const { roomId, consumerId } = message.body; + const consumer = me.consumers.get(consumerId); + if(consumer) { + console.info("关闭消费者:", consumerId); + consumer.close(); + me.consumers.delete(consumerId); + } else { + console.debug("关闭消费者无效:", consumerId); + } + } /** * 消费媒体信令 * @@ -591,8 +643,8 @@ class Taoyao extends RemoteClient { */ async defaultMediaConsume(message) { const self = this; - if (!self.consume) { - console.log("没有消费媒体"); + if (!self.audioConsume && !self.videoConsume) { + console.debug("没有消费媒体"); return; } const { @@ -655,6 +707,19 @@ class Taoyao extends RemoteClient { console.debug("远程媒体:", consumer); const remoteClient = self.remoteClients.get(consumer.sourceId); if(remoteClient && remoteClient.proxy && remoteClient.proxy.media) { + const track = consumer.track; + // TODO:旧的媒体? + if(track.kind === 'audio') { + remoteClient.audioActive = true; + remoteClient.audioTrack = track; + remoteClient.audioConsumer = consumer; + } else if(track.kind === 'video') { + remoteClient.audioActive = false; + remoteClient.videoTrack = track; + remoteClient.videoconsumer = consumer; + } else { + console.warn("不支持的媒体:", track); + } remoteClient.proxy.media(consumer.track, consumer); } else { console.warn("远程终端没有实现服务代理:", remoteClient); @@ -858,7 +923,7 @@ class Taoyao extends RemoteClient { setTimeout(() => audioTrack.stop(), 30000); }); } - if (self.produce) { + if (self.audioProduce || self.videoProduce) { const response = await self.request( protocol.buildMessage("media::transport::webrtc::create", { roomId: self.roomId, @@ -954,7 +1019,7 @@ class Taoyao extends RemoteClient { } ); } - if (this.consume) { + if (self.audioConsume || self.videoConsume) { const self = this; const response = await self.request( protocol.buildMessage("media::transport::webrtc::create", { @@ -1163,6 +1228,7 @@ class Taoyao extends RemoteClient { // TODO:异常 } if(self.proxy && self.proxy.media) { + self.videoTrack = track; self.proxy.media(track); } else { console.warn("终端没有实现服务代理:", self); @@ -1323,7 +1389,6 @@ class Taoyao extends RemoteClient { async checkDevice() { const self = this; if ( - self.produce && navigator.mediaDevices && navigator.mediaDevices.getUserMedia && navigator.mediaDevices.enumerateDevices diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientManager.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientManager.java index 634e61a..aad7b9f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientManager.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientManager.java @@ -11,7 +11,7 @@ import org.springframework.scheduling.annotation.Scheduled; import com.acgist.taoyao.boot.annotation.Manager; import com.acgist.taoyao.boot.config.TaoyaoProperties; import com.acgist.taoyao.boot.model.Message; -import com.acgist.taoyao.signal.event.ClientCloseEvent; +import com.acgist.taoyao.signal.event.client.ClientCloseEvent; import lombok.extern.slf4j.Slf4j; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaClientRegisterEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaClientRegisterEvent.java deleted file mode 100644 index e85bdef..0000000 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaClientRegisterEvent.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.acgist.taoyao.signal.event; - -import com.acgist.taoyao.signal.client.Client; - -/** - * 媒体服务终端注册事件 - * - * @author acgist - */ -public class MediaClientRegisterEvent extends ClientEventAdapter { - - private static final long serialVersionUID = 1L; - - public MediaClientRegisterEvent(Client client) { - super(client); - } - -} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientCloseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientCloseEvent.java similarity index 73% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientCloseEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientCloseEvent.java index e50cb6a..23c9a81 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientCloseEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientCloseEvent.java @@ -1,6 +1,7 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.client; import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.event.ClientEventAdapter; /** * 终端关闭事件 diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientConfigEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientConfigEvent.java similarity index 77% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientConfigEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientConfigEvent.java index 8a547bb..ba678d5 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientConfigEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientConfigEvent.java @@ -1,6 +1,7 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.client; import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.event.ClientEventAdapter; import lombok.Getter; import lombok.Setter; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientOfflineEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientOfflineEvent.java similarity index 77% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientOfflineEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientOfflineEvent.java index 9435dd9..189c4ed 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientOfflineEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientOfflineEvent.java @@ -1,6 +1,7 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.client; import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.event.ClientEventAdapter; import lombok.Getter; import lombok.Setter; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientOnlineEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientOnlineEvent.java similarity index 77% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientOnlineEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientOnlineEvent.java index 4034ab6..e708e2a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ClientOnlineEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientOnlineEvent.java @@ -1,6 +1,7 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.client; import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.event.ClientEventAdapter; import lombok.Getter; import lombok.Setter; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaProduceEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumeEvent.java similarity index 69% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaProduceEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumeEvent.java index cec35f5..98373bd 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaProduceEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumeEvent.java @@ -1,5 +1,6 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.media; +import com.acgist.taoyao.signal.event.RoomEventAdapter; import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.Producer; import com.acgist.taoyao.signal.party.media.Room; @@ -8,13 +9,13 @@ import lombok.Getter; import lombok.Setter; /** - * 生产媒体事件 + * 消费媒体事件 * * @author acgist */ @Getter @Setter -public class MediaProduceEvent extends RoomEventAdapter { +public class MediaConsumeEvent extends RoomEventAdapter { private static final long serialVersionUID = 1L; @@ -27,13 +28,13 @@ public class MediaProduceEvent extends RoomEventAdapter { */ private final ClientWrapper clientWrapper; - public MediaProduceEvent(Room room, Producer producer) { + public MediaConsumeEvent(Room room, Producer producer) { super(room); this.producer = producer; this.clientWrapper = null; } - public MediaProduceEvent(Room room, ClientWrapper clientWrapper) { + public MediaConsumeEvent(Room room, ClientWrapper clientWrapper) { super(room); this.producer = null; this.clientWrapper = clientWrapper; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ConsumerCloseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerCloseEvent.java similarity index 61% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ConsumerCloseEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerCloseEvent.java index 24d05b3..937efcb 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ConsumerCloseEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerCloseEvent.java @@ -1,5 +1,6 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.media; +import com.acgist.taoyao.signal.event.RoomEventAdapter; import com.acgist.taoyao.signal.party.media.Room; import lombok.Getter; @@ -12,7 +13,7 @@ import lombok.Setter; */ @Getter @Setter -public class ConsumerCloseEvent extends RoomEventAdapter { +public class MediaConsumerCloseEvent extends RoomEventAdapter { private static final long serialVersionUID = 1L; @@ -21,7 +22,7 @@ public class ConsumerCloseEvent extends RoomEventAdapter { */ private final String consumerId; - public ConsumerCloseEvent(String consumerId, Room room) { + public MediaConsumerCloseEvent(String consumerId, Room room) { super(room); this.consumerId = consumerId; } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ProducerCloseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerCloseEvent.java similarity index 61% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ProducerCloseEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerCloseEvent.java index 3e0d1aa..f5c5f8d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ProducerCloseEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerCloseEvent.java @@ -1,5 +1,6 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.media; +import com.acgist.taoyao.signal.event.RoomEventAdapter; import com.acgist.taoyao.signal.party.media.Room; import lombok.Getter; @@ -12,7 +13,7 @@ import lombok.Setter; */ @Getter @Setter -public class ProducerCloseEvent extends RoomEventAdapter { +public class MediaProducerCloseEvent extends RoomEventAdapter { private static final long serialVersionUID = 1L; @@ -21,7 +22,7 @@ public class ProducerCloseEvent extends RoomEventAdapter { */ private final String producerId; - public ProducerCloseEvent(String producerId, Room room) { + public MediaProducerCloseEvent(String producerId, Room room) { super(room); this.producerId = producerId; } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomCreateEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomCreateEvent.java new file mode 100644 index 0000000..e62e991 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomCreateEvent.java @@ -0,0 +1,19 @@ +package com.acgist.taoyao.signal.event.room; + +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.event.ClientEventAdapter; + +/** + * 创建房间事件 + * + * @author acgist + */ +public class RoomCreateEvent extends ClientEventAdapter { + + private static final long serialVersionUID = 1L; + + public RoomCreateEvent(Client client) { + super(client); + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomEnterEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomEnterEvent.java similarity index 83% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomEnterEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomEnterEvent.java index b88c7e8..ef6a99a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomEnterEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomEnterEvent.java @@ -1,6 +1,7 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.room; import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.event.RoomEventAdapter; import com.acgist.taoyao.signal.party.media.Room; import lombok.Getter; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomLeaveEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomLeaveEvent.java similarity index 83% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomLeaveEvent.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomLeaveEvent.java index 266b62f..b02b00f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomLeaveEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/room/RoomLeaveEvent.java @@ -1,6 +1,7 @@ -package com.acgist.taoyao.signal.event; +package com.acgist.taoyao.signal.event.room; import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.event.RoomEventAdapter; import com.acgist.taoyao.signal.party.media.Room; import lombok.Getter; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java index 462261b..5d055c9 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java @@ -2,8 +2,8 @@ package com.acgist.taoyao.signal.party.media; import java.io.Closeable; -import com.acgist.taoyao.signal.event.ConsumerCloseEvent; import com.acgist.taoyao.signal.event.EventPublisher; +import com.acgist.taoyao.signal.event.media.MediaConsumerCloseEvent; import lombok.Getter; import lombok.Setter; @@ -66,7 +66,7 @@ public class Consumer implements Closeable { log.info("关闭消费者:{}", this.consumerId); this.getProducer().remove(this.consumerId); this.consumerClient.getConsumers().remove(this.consumerId); - EventPublisher.publishEvent(new ConsumerCloseEvent(this.consumerId, this.room)); + EventPublisher.publishEvent(new MediaConsumerCloseEvent(this.consumerId, this.room)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java index fe41618..1de9336 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java @@ -5,7 +5,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import com.acgist.taoyao.signal.event.EventPublisher; -import com.acgist.taoyao.signal.event.ProducerCloseEvent; +import com.acgist.taoyao.signal.event.media.MediaProducerCloseEvent; import lombok.Getter; import lombok.Setter; @@ -78,7 +78,7 @@ public class Producer implements Closeable { log.info("关闭生产者:{}", this.producerId); this.consumers.forEach((k, v) -> v.close()); this.producerClient.getProducers().remove(this.producerId); - EventPublisher.publishEvent(new ProducerCloseEvent(this.producerId, this.room)); + EventPublisher.publishEvent(new MediaProducerCloseEvent(this.producerId, this.room)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java index cd4e99b..af28658 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java @@ -10,7 +10,7 @@ import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientStatus; import com.acgist.taoyao.signal.event.EventPublisher; -import com.acgist.taoyao.signal.event.RoomLeaveEvent; +import com.acgist.taoyao.signal.event.room.RoomLeaveEvent; import lombok.Getter; import lombok.Setter; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientCloseProtocol.java index 14b4d4c..6649008 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientCloseProtocol.java @@ -10,8 +10,8 @@ import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.ClientCloseEvent; -import com.acgist.taoyao.signal.event.ClientOfflineEvent; +import com.acgist.taoyao.signal.event.client.ClientCloseEvent; +import com.acgist.taoyao.signal.event.client.ClientOfflineEvent; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; import lombok.extern.slf4j.Slf4j; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java index 38220e3..c30f306 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java @@ -17,7 +17,7 @@ import com.acgist.taoyao.boot.utils.DateUtils; import com.acgist.taoyao.boot.utils.DateUtils.DateTimeStyle; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.ClientConfigEvent; +import com.acgist.taoyao.signal.event.client.ClientConfigEvent; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; /** diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOfflineProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOfflineProtocol.java index 2c50ba4..d5e91ac 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOfflineProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOfflineProtocol.java @@ -8,7 +8,7 @@ import org.springframework.scheduling.annotation.Async; import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.config.Constant; -import com.acgist.taoyao.signal.event.ClientOfflineEvent; +import com.acgist.taoyao.signal.event.client.ClientOfflineEvent; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; /** diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOnlineProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOnlineProtocol.java index 1c5351f..ce6f480 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOnlineProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOnlineProtocol.java @@ -6,7 +6,7 @@ import org.springframework.scheduling.annotation.Async; import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.signal.client.Client; -import com.acgist.taoyao.signal.event.ClientOnlineEvent; +import com.acgist.taoyao.signal.event.client.ClientOnlineEvent; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; /** diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java index fee11fb..89ce33c 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java @@ -13,9 +13,9 @@ import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientStatus; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.ClientConfigEvent; -import com.acgist.taoyao.signal.event.ClientOnlineEvent; -import com.acgist.taoyao.signal.event.MediaClientRegisterEvent; +import com.acgist.taoyao.signal.event.client.ClientConfigEvent; +import com.acgist.taoyao.signal.event.client.ClientOnlineEvent; +import com.acgist.taoyao.signal.event.room.RoomCreateEvent; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; import com.acgist.taoyao.signal.service.SecurityService; @@ -95,9 +95,9 @@ public class ClientRegisterProtocol extends ProtocolClientAdapter { this.publishEvent(new ClientConfigEvent(client)); // 终端上线事件 this.publishEvent(new ClientOnlineEvent(client)); - // 媒体服务注册事件 + // 媒体服务注册:创建房间事件 if(clientType.mediaServer()) { - this.publishEvent(new MediaClientRegisterEvent(client)); + this.publishEvent(new RoomCreateEvent(client)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index 10d209f..7ea400b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -14,7 +14,7 @@ import com.acgist.taoyao.boot.model.MessageCodeException; import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.MediaProduceEvent; +import com.acgist.taoyao.signal.event.media.MediaConsumeEvent; import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.media.Producer; @@ -43,7 +43,7 @@ import lombok.extern.slf4j.Slf4j; "终端->信令服务->媒体服务=>信令服务->终端->信令服务->媒体服务" } ) -public class MediaConsumeProtocol extends ProtocolRoomAdapter implements ApplicationListener { +public class MediaConsumeProtocol extends ProtocolRoomAdapter implements ApplicationListener { public static final String SIGNAL = "media::consume"; @@ -53,7 +53,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica @Async @Override - public void onApplicationEvent(MediaProduceEvent event) { + public void onApplicationEvent(MediaConsumeEvent event) { final Room room = event.getRoom(); if(event.getProducer() != null) { // 生产媒体:其他终端消费 @@ -116,6 +116,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) { final Client mediaClient = room.getMediaClient(); if(consumerClientWrapper.consumed(producer)) { + // TODO:没有清理干净 // 消费通道准备就绪 if(log.isDebugEnabled()) { log.debug("消费通道准备就绪:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId()); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java index f6d6914..7ce5347 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java @@ -3,6 +3,7 @@ package com.acgist.taoyao.signal.protocol.media; import java.util.Map; import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.annotation.Async; import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; @@ -11,7 +12,7 @@ import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.ConsumerCloseEvent; +import com.acgist.taoyao.signal.event.media.MediaConsumerCloseEvent; import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @@ -20,6 +21,7 @@ import lombok.extern.slf4j.Slf4j; /** * 关闭消费者信令 + * 注意:正常情况不会存在关闭消费者的情况,所以一般不用处理关闭消费者信令。 * * @author acgist */ @@ -28,12 +30,13 @@ import lombok.extern.slf4j.Slf4j; @Description( body = """ { + "roomId": "房间ID" "consumerId": "消费者ID" } """, flow = "终端->信令服务+)终端" ) -public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { +public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { public static final String SIGNAL = "media::consumer::close"; @@ -41,14 +44,15 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A super("关闭消费者信令", SIGNAL); } + @Async @Override - public void onApplicationEvent(ConsumerCloseEvent event) { + public void onApplicationEvent(MediaConsumerCloseEvent event) { final Room room = event.getRoom(); final Map body = Map.of( Constant.ROOM_ID, room.getRoomId(), Constant.CONSUMER_ID, event.getConsumerId() ); - this.close(room, this.build(body)); + room.broadcastAll(this.build(body)); } @Override @@ -62,14 +66,4 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A } } - /** - * 关闭消费者 - * - * @param room 房间 - * @param message 消息 - */ - private void close(Room room, Message message) { - room.broadcastAll(message); - } - } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java index cc12913..93bb17f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java @@ -1,5 +1,47 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaConsumerPauseProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 暂停消费者信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID" + "consumerId": "消费者ID" + } + """, + flow = "终端->信令服务->媒体服务->信令服务->终端" +) +public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::consumer::pause"; + + public MediaConsumerPauseProtocol() { + super("暂停消费者信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + mediaClient.push(message); + } else if(clientType.mediaServer()) { + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java index 1257551..8f70e0e 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java @@ -1,5 +1,47 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaConsumerResumeProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 恢复消费者信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID" + "consumerId": "消费者ID" + } + """, + flow = "终端->信令服务->媒体服务->信令服务->终端" +) +public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::consumer::resumt"; + + public MediaConsumerResumeProtocol() { + super("恢复消费者信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + mediaClient.push(message); + } else if(clientType.mediaServer()) { + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerScoreProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerScoreProtocol.java new file mode 100644 index 0000000..497ef89 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerScoreProtocol.java @@ -0,0 +1,46 @@ +package com.acgist.taoyao.signal.protocol.media; + +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 媒体消费者评分信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "score": "消费者RTP流得分表示传输质量:0~10", + "producerScore": "生产者RTP流得分表示传输质量:0~10", + "producerScores": [所有生产者RTP流得分] + } + """, + flow = "媒体服务->信令服务->终端" +) +public class MediaConsumerScoreProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::consumer::score"; + + public MediaConsumerScoreProtocol() { + super("媒体消费者评分信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaServer()) { + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerStatusProtocol.java index e9892be..21fd855 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerStatusProtocol.java @@ -1,5 +1,45 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaConsumerStatusProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 查询消费者状态信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "consumerId": "消费者ID" + } + """, + flow = "终端=>信令服务->媒体服务->信令服务->终端" +) +public class MediaConsumerStatusProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::consumer::status"; + + public MediaConsumerStatusProtocol() { + super("查询消费者状态信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + client.push(mediaClient.request(message)); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java index ad237ce..bf771df 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java @@ -9,7 +9,7 @@ import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.MediaProduceEvent; +import com.acgist.taoyao.signal.event.media.MediaConsumeEvent; import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.Producer; import com.acgist.taoyao.signal.party.media.Room; @@ -71,7 +71,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter { // 全部不收:全部广播 room.broadcast(responseMessage); log.info("{}生产媒体:{} - {} - {}", clientId, kind, streamId, producerId); - this.publishEvent(new MediaProduceEvent(room, producer)); + this.publishEvent(new MediaConsumeEvent(room, producer)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java index 4c0fad8..5ef82a8 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java @@ -1,5 +1,68 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaProducerCloseProtocol { +import java.util.Map; + +import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.annotation.Async; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.media.MediaProducerCloseEvent; +import com.acgist.taoyao.signal.party.media.Producer; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * 关闭生产者信令 + * + * @author acgist + */ +@Slf4j +@Protocol +@Description( + body = """ + { + "roomId": "房间ID" + "consumerId": "消费者ID" + } + """, + flow = "终端->信令服务+)终端" +) +public class MediaProducerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { + + public static final String SIGNAL = "media::producer::close"; + + public MediaProducerCloseProtocol() { + super("关闭生产者信令", SIGNAL); + } + + @Async + @Override + public void onApplicationEvent(MediaProducerCloseEvent event) { + final Room room = event.getRoom(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.PRODUCER_ID, event.getProducerId() + ); + room.broadcastAll(this.build(body)); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); + final Producer producer = room.producer(producerId); + if(producer == null) { + log.warn("关闭生产者无效:{}", producerId); + } else { + producer.close(); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java index d3423a4..de4e752 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java @@ -1,5 +1,47 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaProducerPauseProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 暂停生产者信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID" + "producerId": "消费者ID" + } + """, + flow = "终端->信令服务->媒体服务->信令服务->终端" +) +public class MediaProducerPauseProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::producer::pause"; + + public MediaProducerPauseProtocol() { + super("暂停生产者信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + mediaClient.push(message); + } else if(clientType.mediaServer()) { + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java index 315c904..663fd32 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java @@ -1,5 +1,47 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaProducerResumeProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 恢复生产者信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID" + "producerId": "消费者ID" + } + """, + flow = "终端->信令服务->媒体服务->信令服务->终端" +) +public class MediaProducerResumeProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::producer::resume"; + + public MediaProducerResumeProtocol() { + super("恢复生产者信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + mediaClient.push(message); + } else if(clientType.mediaServer()) { + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerScoreProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerScoreProtocol.java index 1a04df8..ca80223 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerScoreProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerScoreProtocol.java @@ -29,7 +29,11 @@ public class MediaProducerScoreProtocol extends ProtocolRoomAdapter { @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - room.broadcast(message); + if(clientType.mediaServer()) { + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerStatusProtocol.java index 57275b9..46683d9 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerStatusProtocol.java @@ -1,5 +1,45 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaProducerStatusProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 查询生产者状态信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "producerId": "生产者ID" + } + """, + flow = "终端=>信令服务->媒体服务->信令服务->终端" +) +public class MediaProducerStatusProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::producer::status"; + + public MediaProducerStatusProtocol() { + super("查询生产者状态信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + client.push(mediaClient.request(message)); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportStatusProtocol.java index 7ba14fe..314b1ff 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportStatusProtocol.java @@ -1,5 +1,45 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaTransportStatusProtocol { +import java.util.Map; +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 查询通道状态信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "transportId": "通道ID" + } + """, + flow = "终端=>信令服务->媒体服务->信令服务->终端" +) +public class MediaTransportStatusProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::transport::status"; + + public MediaTransportStatusProtocol() { + super("查询通道状态信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + client.push(mediaClient.request(message)); + } else { + this.logNoAdapter(clientType); + } + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java index 887c6f6..8a91b43 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java @@ -14,7 +14,7 @@ import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.boot.utils.NetUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.MediaProduceEvent; +import com.acgist.taoyao.signal.event.media.MediaConsumeEvent; import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Transport; @@ -75,7 +75,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { } // 拷贝属性 recvTransport.copy(responseBody); - this.publishEvent(new MediaProduceEvent(room, clientWrapper)); + this.publishEvent(new MediaConsumeEvent(room, clientWrapper)); } // 生产者 final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomClientListProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomClientListProtocol.java index 541ad2a..dca257d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomClientListProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomClientListProtocol.java @@ -10,7 +10,7 @@ import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.RoomEnterEvent; +import com.acgist.taoyao.signal.event.room.RoomEnterEvent; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java index 13d5490..bb0004a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java @@ -12,7 +12,7 @@ import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.MediaClientRegisterEvent; +import com.acgist.taoyao.signal.event.room.RoomCreateEvent; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; @@ -41,7 +41,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; }, flow = "终端->信令服务->媒体服务->信令服务+)终端" ) -public class RoomCreateProtocol extends ProtocolClientAdapter implements ApplicationListener { +public class RoomCreateProtocol extends ProtocolClientAdapter implements ApplicationListener { public static final String SIGNAL = "room::create"; @@ -51,7 +51,7 @@ public class RoomCreateProtocol extends ProtocolClientAdapter implements Applica @Async @Override - public void onApplicationEvent(MediaClientRegisterEvent event) { + public void onApplicationEvent(RoomCreateEvent event) { this.roomManager.recreate(event.getClient(), this.build()); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java index 353a282..73dce98 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java @@ -13,7 +13,7 @@ import com.acgist.taoyao.boot.model.MessageCodeException; import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.RoomEnterEvent; +import com.acgist.taoyao.signal.event.room.RoomEnterEvent; import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.ClientWrapper.SubscribeType; import com.acgist.taoyao.signal.party.media.Room; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomLeaveProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomLeaveProtocol.java index d799fa8..d387f18 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomLeaveProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomLeaveProtocol.java @@ -11,7 +11,7 @@ import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.event.RoomLeaveEvent; +import com.acgist.taoyao.signal.event.room.RoomLeaveEvent; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;