diff --git a/taoyao-signal-server/README.md b/taoyao-signal-server/README.md index 5f62132..6f9c815 100644 --- a/taoyao-signal-server/README.md +++ b/taoyao-signal-server/README.md @@ -12,4 +12,3 @@ ## 信令格式 [信令格式](https://localhost:8888/protocol/list) - diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Description.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Description.java index 76e9129..55d7645 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Description.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Description.java @@ -10,6 +10,12 @@ import java.lang.annotation.Target; /** * 信令描述 * + * -[消息类型]> 异步请求 | 单播 + * =[消息类型]> 同步请求 + * -[消息类型]) 全员广播:对所有的终端广播信令(排除自己) + * +[消息类型]) 全员广播:对所有的终端广播信令(包含自己) + * ...:其他自定义的透传内容 + * * @author acgist */ @Target(ElementType.TYPE) @@ -24,6 +30,9 @@ public @interface Description { String[] body() default { "{}" }; /** + * 同步:需要等待服务端数据时使用 + * 异步:不用等待服务端数据时使用(服务端能主动通知类型消息都能使用异步) + * * @return 数据流向 */ String[] flow() default { "终端->信令服务->终端" }; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientType.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientType.java index c41d3ff..5073740 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientType.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientType.java @@ -25,6 +25,41 @@ public enum ClientType { private ClientType(String name) { this.name = name; } + + /** + * @return 是否是Web + */ + public boolean web() { + return this == WEB; + } + + /** + * @return 是否是媒体服务 + */ + public boolean media() { + return this == MEDIA; + } + + /** + * @return 是否是摄像头 + */ + public boolean camera() { + return this == CAMERA; + } + + /** + * @return 是否是媒体终端 + */ + public boolean mediaClient() { + return this == WEB || this == CAMERA; + } + + /** + * @return 是否是媒体服务 + */ + public boolean mediaServer() { + return this == MEDIA; + } /** * @param value 类型 diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/configuration/SignalAutoConfiguration.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/configuration/SignalAutoConfiguration.java index 459f6f1..ec3a656 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/configuration/SignalAutoConfiguration.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/configuration/SignalAutoConfiguration.java @@ -1,12 +1,17 @@ package com.acgist.taoyao.signal.configuration; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import com.acgist.taoyao.boot.runner.OrderedCommandLineRunner; +import com.acgist.taoyao.signal.event.EventPublisher; import com.acgist.taoyao.signal.protocol.ProtocolManager; +import jakarta.annotation.PostConstruct; + /** * 信令自动配置 * @@ -15,6 +20,14 @@ import com.acgist.taoyao.signal.protocol.ProtocolManager; @AutoConfiguration public class SignalAutoConfiguration { + @Autowired + private ApplicationContext applicationContext; + + @PostConstruct + public void init() { + EventPublisher.setApplicationContext(this.applicationContext); + } + @Bean public CommandLineRunner signalCommandLineRunner(ProtocolManager protocolManager) { return new OrderedCommandLineRunner() { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ConsumerCloseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ConsumerCloseEvent.java new file mode 100644 index 0000000..24d05b3 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ConsumerCloseEvent.java @@ -0,0 +1,29 @@ +package com.acgist.taoyao.signal.event; + +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 关闭消费者事件 + * + * @author acgist + */ +@Getter +@Setter +public class ConsumerCloseEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 消费者ID + */ + private final String consumerId; + + public ConsumerCloseEvent(String consumerId, Room room) { + super(room); + this.consumerId = consumerId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/EventPublisher.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/EventPublisher.java new file mode 100644 index 0000000..8498800 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/EventPublisher.java @@ -0,0 +1,34 @@ +package com.acgist.taoyao.signal.event; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationEvent; + +/** + * 事件法布者 + * + * @author acgist + */ +public class EventPublisher { + + /** + * 上下文 + */ + private static ApplicationContext applicationContext; + + /** + * @param applicationContext 上下文 + */ + public static final void setApplicationContext(ApplicationContext applicationContext) { + EventPublisher.applicationContext = applicationContext; + } + + /** + * 发布事件 + * + * @param event 事件 + */ + public static final void publishEvent(ApplicationEvent event) { + EventPublisher.applicationContext.publishEvent(event); + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ProducerCloseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ProducerCloseEvent.java new file mode 100644 index 0000000..3e0d1aa --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ProducerCloseEvent.java @@ -0,0 +1,29 @@ +package com.acgist.taoyao.signal.event; + +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 关闭生产者事件 + * + * @author acgist + */ +@Getter +@Setter +public class ProducerCloseEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 生产者ID + */ + private final String producerId; + + public ProducerCloseEvent(String producerId, Room room) { + super(room); + this.producerId = producerId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java index 19d88dd..72d5d8d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java @@ -10,7 +10,7 @@ import lombok.Getter; import lombok.Setter; /** - * 终端包装器 + * 终端包装器:Peer * * @author acgist */ @@ -43,7 +43,7 @@ public class ClientWrapper implements AutoCloseable { return SubscribeType.ALL; } - public boolean consume(Producer producer) { + public boolean canConsume(Producer producer) { return switch (this) { case NONE -> false; case ALL_AUDIO -> producer.getKind() == Kind.AUDIO; @@ -94,7 +94,7 @@ public class ClientWrapper implements AutoCloseable { private Transport recvTransport; /** * 生产者 - * 生产者里面的消费者是其他终端消费当前终端的消费者 + * 其他终端消费当前终端的消费者 */ private final Map producers; /** @@ -144,28 +144,19 @@ public class ClientWrapper implements AutoCloseable { * * @return 是否已经消费 */ - public boolean consume(Producer producer) { + public boolean consumed(Producer producer) { return this.consumers.values().stream() .anyMatch(v -> v.getProducer() == producer); } - /** - * 删除消费者 - * - * @param wrapper 消费者终端保证期 - */ - public void remove(ClientWrapper wrapper) { - this.consumers.entrySet().stream() - .filter(v -> v.getValue().getProducer().getProduceClient() == wrapper) - .map(Map.Entry::getKey) - .forEach(this.consumers::remove); - // TODO:资源释放 - this.producers.values().forEach(v -> v.remove(wrapper)); - } - @Override public void close() throws Exception { - // TODO:释放资源 + // TODO:释放资源:通道、消费者、生产者 + this.consumers.forEach((k, v) -> v.close()); + this.producers.forEach((k, v) -> v.close()); + // TODO:实现 + this.recvTransport.close(); + this.sendTransport.close(); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java index 735c58c..462261b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java @@ -1,25 +1,28 @@ package com.acgist.taoyao.signal.party.media; +import java.io.Closeable; + +import com.acgist.taoyao.signal.event.ConsumerCloseEvent; +import com.acgist.taoyao.signal.event.EventPublisher; + import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; /** * 消费者 * * @author acgist */ +@Slf4j @Getter @Setter -public class Consumer { +public class Consumer implements Closeable { /** - * 消费者终端 + * 是否关闭 */ - private final ClientWrapper consumeClient; - /** - * 生产者 - */ - private final Producer producer; + private volatile boolean close = false; /** * 媒体类型 */ @@ -32,13 +35,38 @@ public class Consumer { * 消费者标识 */ private final String consumerId; + /** + * 房间 + */ + private final Room room; + /** + * 生产者 + */ + private final Producer producer; + /** + * 消费者终端 + */ + private final ClientWrapper consumerClient; - public Consumer(ClientWrapper consumeClient, Producer producer, String kind, String streamId, String consumerId) { - this.consumeClient = consumeClient; - this.producer = producer; + public Consumer(String kind, String streamId, String consumerId, Room room, Producer producer, ClientWrapper consumerClient) { this.kind = Kind.of(kind); this.streamId = streamId; this.consumerId = consumerId; + this.room = room; + this.producer = producer; + this.consumerClient = consumerClient; + } + + @Override + public void close() { + if(this.close) { + return; + } + this.close = true; + log.info("关闭消费者:{}", this.consumerId); + this.getProducer().remove(this.consumerId); + this.consumerClient.getConsumers().remove(this.consumerId); + EventPublisher.publishEvent(new ConsumerCloseEvent(this.consumerId, this.room)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java index 1d0fbf3..fe41618 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java @@ -1,24 +1,30 @@ package com.acgist.taoyao.signal.party.media; +import java.io.Closeable; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.acgist.taoyao.signal.event.EventPublisher; +import com.acgist.taoyao.signal.event.ProducerCloseEvent; + import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; /** * 生产者 * * @author acgist */ +@Slf4j @Getter @Setter -public class Producer { - +public class Producer implements Closeable { + /** - * 生产者终端 + * 是否关闭 */ - private final ClientWrapper produceClient; + private volatile boolean close = false; /** * 媒体类型 */ @@ -31,30 +37,48 @@ public class Producer { * 生产者标识 */ private final String producerId; + /** + * 房间 + */ + private final Room room; + /** + * 生产者终端 + */ + private final ClientWrapper producerClient; /** * 消费者 + * 其他终端消费当前终端的消费者 */ private final Map consumers; - public Producer(ClientWrapper produceClient, String kind, String streamId, String producerId) { - this.produceClient = produceClient; + public Producer(String kind, String streamId, String producerId, Room room, ClientWrapper produceClient) { this.kind = Kind.of(kind); this.streamId = streamId; this.producerId = producerId; + this.room = room; + this.producerClient = produceClient; this.consumers = new ConcurrentHashMap<>(); } - + /** * 删除消费者 * - * @param consumer 消费者 + * @param consumerId 消费者ID */ - public void remove(ClientWrapper consumer) { - this.consumers.entrySet().stream() - .filter(v -> v.getValue().getConsumeClient() == consumer) - .map(Map.Entry::getKey) - .forEach(this.consumers::remove); - // TODO:资源释放 + public void remove(String consumerId) { + this.consumers.remove(consumerId); + } + + @Override + public void close() { + if(this.close) { + return; + } + this.close = true; + log.info("关闭生产者:{}", this.producerId); + this.consumers.forEach((k, v) -> v.close()); + this.producerClient.getProducers().remove(this.producerId); + EventPublisher.publishEvent(new ProducerCloseEvent(this.producerId, this.room)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java index a522c9d..cd4e99b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java @@ -6,11 +6,10 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import org.springframework.context.ApplicationContext; - import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientStatus; +import com.acgist.taoyao.signal.event.EventPublisher; import com.acgist.taoyao.signal.event.RoomLeaveEvent; import lombok.Getter; @@ -28,6 +27,10 @@ import lombok.extern.slf4j.Slf4j; @Setter public class Room implements Closeable { + /** + * 是否关闭 + */ + private volatile boolean close = false; /** * 房间标识 */ @@ -49,10 +52,6 @@ public class Room implements Closeable { * 房间管理 */ private final RoomManager roomManager; - /** - * 系统上下文 - */ - private final ApplicationContext applicationContext; /** * 终端 */ @@ -61,11 +60,10 @@ public class Room implements Closeable { /** * @param mediaClient 媒体服务 */ - public Room(Client mediaClient, RoomManager roomManager, ApplicationContext applicationContext) { + public Room(Client mediaClient, RoomManager roomManager) { this.roomStatus = new RoomStatus(); this.mediaClient = mediaClient; this.roomManager = roomManager; - this.applicationContext = applicationContext; this.clients = new ConcurrentHashMap<>(); } @@ -101,7 +99,6 @@ public class Room implements Closeable { /** * 终端离开 - * 立即释放所有资源 * * @param client 终端 */ @@ -109,18 +106,13 @@ public class Room implements Closeable { synchronized (this.clients) { final ClientWrapper wrapper = this.clients.remove(client); if(wrapper != null) { - this.roomStatus.setClientSize(this.roomStatus.getClientSize() - 1); - // 删除消费者 - this.clients.values().stream() - .filter(v -> v != wrapper) - .forEach(v -> v.remove(wrapper)); - // TODO:删除数据消费者 - this.applicationContext.publishEvent(new RoomLeaveEvent(this, client)); try { wrapper.close(); } catch (Exception e) { - log.error("终端关闭异常", e); + log.error("关闭终端代理异常:{}", wrapper.getClientId(), e); } + this.roomStatus.setClientSize(this.roomStatus.getClientSize() - 1); + EventPublisher.publishEvent(new RoomLeaveEvent(this, client)); } } } @@ -145,6 +137,17 @@ public class Room implements Closeable { return this.mediaClient.request(message); } + /** + * 广播消息 + * 所有终端以及媒体服务 + * + * @param message 消息 + */ + public void broadcastAll(Message message) { + this.broadcast(message); + this.mediaClient.push(message); + } + /** * 广播消息 * @@ -200,9 +203,9 @@ public class Room implements Closeable { } /** + * @param producerId 生产者ID * - * @param producerId - * @return + * @return 生产者 */ public Producer producer(String producerId) { return this.clients.values().stream() @@ -212,11 +215,28 @@ public class Room implements Closeable { .orElse(null); } + /** + * @param consumerId 消费者ID + * + * @return 消费者 + */ + public Consumer consumer(String consumerId) { + return this.clients.values().stream() + .map(wrapper -> wrapper.getConsumers().get(consumerId)) + .filter(Objects::nonNull) + .findFirst() + .orElse(null); + } + @Override public void close() { + if(this.close) { + return; + } + this.close = true; log.info("关闭房间:{}", this.roomId); // TODO:关闭房间 - // TODO:媒体服务 + // TODO:媒体服务:直接没提服务关闭所有资源(通道、生产者、消费者) this.roomManager.remove(this); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RoomManager.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RoomManager.java index cfa7192..06c516e 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RoomManager.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RoomManager.java @@ -5,8 +5,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; -import org.springframework.context.ApplicationContext; - import com.acgist.taoyao.boot.annotation.Manager; import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.model.Message; @@ -28,17 +26,15 @@ public class RoomManager { private final IdService idService; private final ClientManager clientManager; - private final ApplicationContext applicationContext; /** * 房间列表 */ private final List rooms; - public RoomManager(IdService idService, ClientManager clientManager, ApplicationContext applicationContext) { + public RoomManager(IdService idService, ClientManager clientManager) { this.idService = idService; this.clientManager = clientManager; - this.applicationContext = applicationContext; this.rooms = new CopyOnWriteArrayList<>(); } @@ -121,7 +117,7 @@ public class RoomManager { } final String roomId = this.idService.buildUuid(); // 房间 - final Room room = new Room(mediaClient, this, this.applicationContext); + final Room room = new Room(mediaClient, this); room.setRoomId(roomId); room.setPassword(password); // 状态 diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Transport.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Transport.java index 8dab6a9..2618a4b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Transport.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Transport.java @@ -39,9 +39,21 @@ public class Transport implements Closeable { * 通道标识 */ private final String transportId; + /** + * ICE协商 + */ private Object iceCandidates; + /** + * ICE参数 + */ private Object iceParameters; + /** + * DTLS参数 + */ private Object dtlsParameters; + /** + * SCTP参数 + */ private Object sctpParameters; public Transport(String transportId, Room room, Client client) { @@ -66,6 +78,7 @@ public class Transport implements Closeable { @Override public void close() { + // TODO:发送事件 } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java index 229530b..38220e3 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java @@ -73,7 +73,7 @@ public class ClientConfigProtocol extends ProtocolClientAdapter implements Appli // 日期时间 config.put(Constant.DATETIME, DateUtils.format(LocalDateTime.now(), DateTimeStyle.YYYYMMDDHH24MMSS)); // Web、摄像头:媒体配置 - if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { + if(clientType.mediaClient()) { config.put(Constant.MEDIA, this.mediaProperties); config.put(Constant.WEBRTC, this.webrtcProperties); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java index 6ff96a6..fee11fb 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java @@ -96,7 +96,7 @@ public class ClientRegisterProtocol extends ProtocolClientAdapter { // 终端上线事件 this.publishEvent(new ClientOnlineEvent(client)); // 媒体服务注册事件 - if(clientType == ClientType.MEDIA) { + if(clientType.mediaServer()) { this.publishEvent(new MediaClientRegisterEvent(client)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaAudioVolumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaAudioVolumeProtocol.java index 3620006..fc4dea4 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaAudioVolumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaAudioVolumeProtocol.java @@ -41,7 +41,7 @@ public class MediaAudioVolumeProtocol extends ProtocolRoomAdapter { @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - if(clientType == ClientType.MEDIA) { + if(clientType.mediaServer()) { room.broadcast(message); } else { this.logNoAdapter(clientType); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index 2adbb25..10d209f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -33,7 +33,7 @@ import lombok.extern.slf4j.Slf4j; @Protocol @Description( memo = """ - 消费媒体:主动消费、终端生成媒体、终端创建WebRTC消费通道 + 消费媒体:主动消费、终端生产媒体、终端创建WebRTC消费通道 终端生产媒体当前房间所有终端根据订阅类型自动消费媒体 终端创建WebRTC消费通道根据订阅类型自动消费当前房间已有媒体 """, @@ -58,19 +58,19 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica if(event.getProducer() != null) { // 生产媒体:其他终端消费 final Producer producer = event.getProducer(); - final ClientWrapper produceClientWrapper = producer.getProduceClient(); + final ClientWrapper produceClientWrapper = producer.getProducerClient(); room.getClients().values().stream() .filter(v -> v != produceClientWrapper) - .filter(v -> v.getSubscribeType().consume(producer)) - .forEach(v -> this.consume(room, v, producer)); + .filter(v -> v.getSubscribeType().canConsume(producer)) + .forEach(v -> this.consume(room, v, producer, this.build())); } else if(event.getClientWrapper() != null) { // 创建WebRTC消费通道:消费其他终端 final ClientWrapper consumeClientWrapper = event.getClientWrapper(); room.getClients().values().stream() .filter(v -> v != consumeClientWrapper) .flatMap(v -> v.getProducers().values().stream()) - .filter(v -> consumeClientWrapper.getSubscribeType().consume(v)) - .forEach(producer -> this.consume(room, consumeClientWrapper, producer)); + .filter(v -> consumeClientWrapper.getSubscribeType().canConsume(v)) + .forEach(producer -> this.consume(room, consumeClientWrapper, producer, this.build())); } else { throw MessageCodeException.of("消费媒体失败"); } @@ -80,26 +80,25 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final Producer producer = room.producer(producerId); - if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { - // 请求消费媒体 - this.consume(room, room.clientWrapper(client), producer); - } else if(clientType == ClientType.MEDIA) { - // 通知终端准备:准备完成再次请求消费媒体结束媒体服务等待 + if(clientType.mediaClient()) { + // 主动请求消费 || 消费通道准备就绪 + this.consume(room, room.clientWrapper(client), producer, message); + } else if(clientType.mediaServer()) { + // 媒体通道准备就绪 final String kind = MapUtils.get(body, Constant.KIND); final String streamId = MapUtils.get(body, Constant.STREAM_ID); final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); - final String consumeClientId = MapUtils.get(body, Constant.CLIENT_ID); - final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClientId); - final Map consumers = consumeClientWrapper.getConsumers(); + final String consumerClientId = MapUtils.get(body, Constant.CLIENT_ID); + final ClientWrapper consumerClientWrapper = room.clientWrapper(consumerClientId); + final Map consumers = consumerClientWrapper.getConsumers(); final Map producerConsumers = producer.getConsumers(); - final Consumer consumer = new Consumer(consumeClientWrapper, producer, kind, streamId, consumerId); + final Consumer consumer = new Consumer(kind, streamId, consumerId, room, producer, consumerClientWrapper); final Consumer oldConsumer = consumers.put(producerId, consumer); final Consumer oldProducerConsumer = producerConsumers.put(consumerId, consumer); if(oldConsumer != null || oldProducerConsumer != null) { log.warn("消费者已经存在:{}", consumerId); - // TODO:关闭旧的? } - final Client consumeClient = consumeClientWrapper.getClient(); + final Client consumeClient = consumerClientWrapper.getClient(); consumeClient.push(message); } else { this.logNoAdapter(clientType); @@ -110,36 +109,39 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica * 消费媒体 * * @param room 房间 - * @param consumeClientWrapper 消费者终端包装器 + * @param consumerClientWrapper 消费者终端包装器 * @param producer 生产者 + * @param message 消息 */ - private void consume(Room room, ClientWrapper consumeClientWrapper, Producer producer) { - if(consumeClientWrapper.consume(producer)) { - // TODO:回调媒体服务准备完成 - if(log.isDebugEnabled()) { - log.debug("已经消费媒体:{} - {}", consumeClientWrapper.getClientId(), producer.getStreamId()); - } - return; - } else { - if(log.isDebugEnabled()) { - log.debug("消费媒体:{} - {}", consumeClientWrapper.getClientId(), producer.getStreamId()); - } - } - final String clientId = consumeClientWrapper.getClientId(); - final String streamId = producer.getStreamId() + "->" + clientId; + private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) { final Client mediaClient = room.getMediaClient(); - final Transport recvTransport = consumeClientWrapper.getRecvTransport(); - final ClientWrapper produceClientWrapper = producer.getProduceClient(); - final Map body = new HashMap<>(); - body.put(Constant.ROOM_ID, room.getRoomId()); - body.put(Constant.CLIENT_ID, clientId); - body.put(Constant.SOURCE_ID, produceClientWrapper.getClientId()); - body.put(Constant.STREAM_ID, streamId); - body.put(Constant.PRODUCER_ID, producer.getProducerId()); - body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); - body.put(Constant.RTP_CAPABILITIES, consumeClientWrapper.getRtpCapabilities()); - body.put(Constant.SCTP_CAPABILITIES, consumeClientWrapper.getSctpCapabilities()); - mediaClient.push(this.build(body)); + if(consumerClientWrapper.consumed(producer)) { + // 消费通道准备就绪 + if(log.isDebugEnabled()) { + log.debug("消费通道准备就绪:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId()); + } + mediaClient.push(message); + } else { + // 主动消费媒体 + if(log.isDebugEnabled()) { + log.debug("消费媒体:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId()); + } + final String clientId = consumerClientWrapper.getClientId(); + final String streamId = producer.getStreamId() + "->" + clientId; + final Transport recvTransport = consumerClientWrapper.getRecvTransport(); + final ClientWrapper produceClientWrapper = producer.getProducerClient(); + final Map body = new HashMap<>(); + body.put(Constant.ROOM_ID, room.getRoomId()); + body.put(Constant.CLIENT_ID, clientId); + body.put(Constant.SOURCE_ID, produceClientWrapper.getClientId()); + body.put(Constant.STREAM_ID, streamId); + body.put(Constant.PRODUCER_ID, producer.getProducerId()); + body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); + body.put(Constant.RTP_CAPABILITIES, consumerClientWrapper.getRtpCapabilities()); + body.put(Constant.SCTP_CAPABILITIES, consumerClientWrapper.getSctpCapabilities()); + message.setBody(body); + mediaClient.push(message); + } } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java index 04fc463..f6d6914 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java @@ -1,5 +1,75 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaConsumerCloseProtocol { +import java.util.Map; + +import org.springframework.context.ApplicationListener; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.ConsumerCloseEvent; +import com.acgist.taoyao.signal.party.media.Consumer; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * 关闭消费者信令 + * + * @author acgist + */ +@Slf4j +@Protocol +@Description( + body = """ + { + "consumerId": "消费者ID" + } + """, + flow = "终端->信令服务+)终端" +) +public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { + + public static final String SIGNAL = "media::consumer::close"; + + public MediaConsumerCloseProtocol() { + super("关闭消费者信令", SIGNAL); + } + + @Override + public void onApplicationEvent(ConsumerCloseEvent event) { + final Room room = event.getRoom(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.CONSUMER_ID, event.getConsumerId() + ); + this.close(room, this.build(body)); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final Consumer consumer = room.consumer(consumerId); + if(consumer == null) { + log.warn("关闭消费者无效:{}", consumerId); + } else { + consumer.close(); + } + } + + /** + * 关闭消费者 + * + * @param room 房间 + * @param message 消息 + */ + private void close(Room room, Message message) { + room.broadcastAll(message); + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java index a640602..ad237ce 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java @@ -57,7 +57,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter { final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID); final ClientWrapper clientWrapper = room.clientWrapper(client); final Map producers = clientWrapper.getProducers(); - final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(clientWrapper, kind, streamId, producerId)); + final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(kind, streamId, producerId, room, clientWrapper)); final Message responseMessage = response.cloneWithoutBody(); responseMessage.setBody(Map.of( Constant.KIND, kind, diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRouterRtpCapabilitiesProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRouterRtpCapabilitiesProtocol.java index 2b69fbb..dc1ff24 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRouterRtpCapabilitiesProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRouterRtpCapabilitiesProtocol.java @@ -42,7 +42,7 @@ public class MediaRouterRtpCapabilitiesProtocol extends ProtocolRoomAdapter { @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { + if(clientType.mediaClient()) { client.push(room.request(message)); } else { this.logNoAdapter(clientType); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java index a617564..d764c88 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java @@ -38,7 +38,7 @@ public class MediaTransportWebRtcConnectProtocol extends ProtocolRoomAdapter { @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { + if(clientType.mediaClient()) { final Message response = room.request(message); final Map responseBody = response.body(); client.push(response); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCloseProtocol.java index d7332b3..29c621b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCloseProtocol.java @@ -23,7 +23,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; "roomId": "房间ID" } """, - flow = "终端->信令服务->媒体服务->信令服务+)终端" + flow = "终端->信令服务+)终端" ) public class RoomCloseProtocol extends ProtocolRoomAdapter { @@ -35,9 +35,10 @@ public class RoomCloseProtocol extends ProtocolRoomAdapter { @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - if(clientType == ClientType.WEB) { + // TODO:改为星型 + if(clientType.web()) { mediaClient.push(this.build(Map.of(Constant.ROOM_ID, room.getRoomId()))); - } else if(clientType == ClientType.MEDIA) { + } else if(clientType.mediaServer()) { room.close(); room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java index 840d345..13d5490 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java @@ -57,7 +57,7 @@ public class RoomCreateProtocol extends ProtocolClientAdapter implements Applica @Override public void execute(String clientId, ClientType clientType, Client client, Message message, Map body) { - if(clientType == ClientType.WEB) { + if(clientType.web()) { // WEB同步创建房间 final Room room = this.roomManager.create( MapUtils.get(body, Constant.NAME), diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java index b80756e..353a282 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java @@ -52,7 +52,7 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter { @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { + if(clientType.mediaClient()) { this.enter(clientId, room, client, message, body); } else { this.logNoAdapter(clientType);