diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java index 477fc30..39f17ab 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java @@ -2,7 +2,6 @@ package com.acgist.taoyao.signal.party.media; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; import com.acgist.taoyao.signal.client.Client; @@ -11,8 +10,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** - * 终端包装器:Peer - * 视频房间使用 + * 终端包装器 * * @author acgist */ @@ -25,103 +23,96 @@ public class ClientWrapper implements AutoCloseable { * 房间 */ private final Room room; - /** - * 终端 - */ - private final Client client; - /** - * 房间标识 - */ - private final String roomId; - /** - * 终端标识 - */ - private final String clientId; - /** - * 媒体订阅类型 - * 指定订阅类型终端注册或者生成媒体后会自动进行媒体推流拉流 - * 没有订阅任何媒体时需要用户自己对媒体进行消费控制 - */ - private SubscribeType subscribeType; - /** - * RTP协商 - */ - private Object rtpCapabilities; - /** - * SCTP协商 - */ - private Object sctpCapabilities; - /** - * 媒体录像 - */ - private Recorder recorder; - /** - * 发送通道 - */ - private Transport sendTransport; - /** - * 接收通道 - */ - private Transport recvTransport; - /** - * 生产者 - * 其他终端消费当前终端的消费者 - */ - private final Map producers; + /** + * 终端 + */ + private final Client client; + /** + * 房间ID + */ + private final String roomId; + /** + * 终端ID + */ + private final String clientId; + /** + * 媒体订阅类型 + * 指定订阅类型终端注册或者生成媒体后会自动进行媒体推流拉流 + * 没有订阅任何媒体时需要用户自己对媒体进行消费控制 + */ + private SubscribeType subscribeType; + /** + * RTP协商 + */ + private Object rtpCapabilities; + /** + * SCTP协商 + */ + private Object sctpCapabilities; + /** + * 服务端媒体录像机 + */ + private Recorder recorder; + /** + * 发送通道 + */ + private Transport sendTransport; + /** + * 接收通道 + */ + private Transport recvTransport; + /** + * 生产者 + * 其他终端消费当前终端的生产者 + */ + private final Map producers; /** * 消费者 * 当前终端消费其他终端的消费者 */ private final Map consumers; - /** - * 数据生产者 - * 其他终端消费当前终端的消费者 - */ - private final Map dataProducers; - /** - * 数据消费者 - * 当前终端消费其他终端的消费者 - */ - private final Map dataConsumers; - + /** + * 数据生产者 + * 其他终端消费当前终端的生产者 + */ + private final Map dataProducers; + /** + * 数据消费者 + * 当前终端消费其他终端的消费者 + */ + private final Map dataConsumers; + public ClientWrapper(Room room, Client client) { - this.room = room; - this.client = client; - this.roomId = room.getRoomId(); - this.clientId = client.getClientId(); - this.producers = new ConcurrentHashMap<>(); - this.consumers = new ConcurrentHashMap<>(); + this.room = room; + this.client = client; + this.roomId = room.getRoomId(); + this.clientId = client.getClientId(); + this.producers = new ConcurrentHashMap<>(); + this.consumers = new ConcurrentHashMap<>(); this.dataProducers = new ConcurrentHashMap<>(); this.dataConsumers = new ConcurrentHashMap<>(); } - - /** - * @return 生产者数量 - */ - public Integer producerSize() { - return this.producers.size(); - } - - /** - * @return 消费者数量 - */ - public Integer consumerSize() { - return this.producers.values().stream() - .map(producer -> producer.getConsumers().size()) - .collect(Collectors.counting()) - .intValue(); - } /** * @param producer 生产者 * - * @return 是否已经消费 + * @return 是否已经消费生产者 */ public boolean consumed(Producer producer) { return this.consumers.values().stream() .anyMatch(v -> v.getProducer() == producer); } + /** + * @param dataProducer 数据生产者 + * + * @return 是否已经消费数据生产者 + */ + public boolean consumedData(DataProducer dataProducer) { + return this.dataConsumers.values().stream() + .anyMatch(v -> v.getDataProducer() == dataProducer); + } + @Override public void close() { // 注意:不要关闭终端(只是离开房间) diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java index 56a6fa4..ac3770a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java @@ -28,7 +28,7 @@ public class Consumer extends OperatorAdapter { */ private final String streamId; /** - * 消费者标识 + * 消费者ID */ private final String consumerId; /** @@ -45,11 +45,11 @@ public class Consumer extends OperatorAdapter { private final ClientWrapper consumerClient; public Consumer(String kind, String streamId, String consumerId, Room room, Producer producer, ClientWrapper consumerClient) { - this.kind = Kind.of(kind); - this.streamId = streamId; - this.consumerId = consumerId; - this.room = room; - this.producer = producer; + this.kind = Kind.of(kind); + this.streamId = streamId; + this.consumerId = consumerId; + this.room = room; + this.producer = producer; this.consumerClient = consumerClient; } @@ -72,19 +72,19 @@ public class Consumer extends OperatorAdapter { @Override public void pause() { - log.info("暂停消费者:{} - {}", this.streamId, this.consumerId); + log.debug("暂停消费者:{} - {}", this.streamId, this.consumerId); EventPublisher.publishEvent(new MediaConsumerPauseEvent(this.consumerId, this.room)); } @Override public void resume() { - log.info("恢复消费者:{} - {}", this.streamId, this.consumerId); + log.debug("恢复消费者:{} - {}", this.streamId, this.consumerId); EventPublisher.publishEvent(new MediaConsumerResumeEvent(this.consumerId, this.room)); } @Override public void log() { - log.debug("当前消费者:{} - {} - {}", this.consumerId, this.kind, this.streamId); + log.info("当前消费者:{} - {} - {}", this.streamId, this.consumerId, this.kind); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java index 00ed093..c619cd6 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java @@ -22,7 +22,7 @@ public class DataConsumer extends OperatorAdapter { */ private final String streamId; /** - * 消费者标识 + * 消费者ID */ private final String consumerId; /** @@ -39,10 +39,10 @@ public class DataConsumer extends OperatorAdapter { private final ClientWrapper consumerClient; public DataConsumer(String streamId, String consumerId, Room room, DataProducer dataProducer, ClientWrapper consumerClient) { - this.streamId = streamId; - this.consumerId = consumerId; - this.room = room; - this.dataProducer = dataProducer; + this.streamId = streamId; + this.consumerId = consumerId; + this.room = room; + this.dataProducer = dataProducer; this.consumerClient = consumerClient; } @@ -65,7 +65,7 @@ public class DataConsumer extends OperatorAdapter { @Override public void log() { - log.debug("当前数据消费者:{} - {}", this.consumerId, this.streamId); + log.info("当前数据消费者:{} - {}", this.streamId, this.consumerId); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataProducer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataProducer.java index 8fa9c3d..10180ae 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataProducer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataProducer.java @@ -25,7 +25,7 @@ public class DataProducer extends OperatorAdapter { */ private final String streamId; /** - * 生产者标识 + * 生产者ID */ private final String producerId; /** @@ -43,11 +43,11 @@ public class DataProducer extends OperatorAdapter { private final Map dataConsumers; public DataProducer(String streamId, String producerId, Room room, ClientWrapper producerClient) { - this.streamId = streamId; - this.producerId = producerId; - this.room = room; + this.streamId = streamId; + this.producerId = producerId; + this.room = room; this.producerClient = producerClient; - this.dataConsumers = new ConcurrentHashMap<>(); + this.dataConsumers = new ConcurrentHashMap<>(); } @Override @@ -69,7 +69,7 @@ public class DataProducer extends OperatorAdapter { @Override public void log() { - log.debug("当前数据生产者:{} - {}", this.producerId, this.streamId); + log.info("当前数据生产者:{} - {}", this.streamId, this.producerId); this.dataConsumers.values().forEach(DataConsumer::log); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Kind.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Kind.java index a78f099..d7f2b26 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Kind.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Kind.java @@ -24,7 +24,8 @@ public enum Kind { * @return 类型 */ public static final Kind of(String value) { - for (Kind kind : Kind.values()) { + final Kind[] values = Kind.values(); + for (Kind kind : values) { if(kind.name().equalsIgnoreCase(value)) { return kind; } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/OperatorAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/OperatorAdapter.java index a164e5f..27acbf2 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/OperatorAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/OperatorAdapter.java @@ -1,7 +1,7 @@ package com.acgist.taoyao.signal.party.media; /** - * 关闭移除接口适配器 + * 操作接口适配器 * * @author acgist */ diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java index 3a6c525..4e1acae 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java @@ -31,7 +31,7 @@ public class Producer extends OperatorAdapter { */ private final String streamId; /** - * 生产者标识 + * 生产者ID */ private final String producerId; /** @@ -49,12 +49,12 @@ public class Producer extends OperatorAdapter { private final Map consumers; public Producer(String kind, String streamId, String producerId, Room room, ClientWrapper producerClient) { - this.kind = Kind.of(kind); - this.streamId = streamId; - this.producerId = producerId; - this.room = room; + this.kind = Kind.of(kind); + this.streamId = streamId; + this.producerId = producerId; + this.room = room; this.producerClient = producerClient; - this.consumers = new ConcurrentHashMap<>(); + this.consumers = new ConcurrentHashMap<>(); } @Override @@ -76,19 +76,19 @@ public class Producer extends OperatorAdapter { @Override public void pause() { - log.info("暂停生产者:{} - {}", this.streamId, this.producerId); + log.debug("暂停生产者:{} - {}", this.streamId, this.producerId); EventPublisher.publishEvent(new MediaProducerPauseEvent(this.producerId, this.room)); } @Override public void resume() { - log.info("恢复生产者:{} - {}", this.streamId, this.producerId); + log.debug("恢复生产者:{} - {}", this.streamId, this.producerId); EventPublisher.publishEvent(new MediaProducerResumeEvent(this.producerId, this.room)); } @Override public void log() { - log.info("当前生产者:{} - {} - {}", this.producerId, this.kind, this.streamId); + log.info("当前生产者:{} - {} - {}", this.streamId, this.producerId, this.kind); this.consumers.values().forEach(Consumer::log); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Recorder.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Recorder.java index e28e2b6..e2d513f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Recorder.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Recorder.java @@ -23,7 +23,7 @@ import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** - * 媒体录像机 + * 服务端媒体录像机 * * OPUS = 100 * VP8 = 101 diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java index 696c90f..95f8d2a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java @@ -22,6 +22,8 @@ import lombok.extern.slf4j.Slf4j; /** * 消费数据信令 * + * TODO:防止重复消费 + * * @author acgist */ @Slf4j