[*] 日常优化

This commit is contained in:
acgist
2023-07-24 08:11:48 +08:00
parent 16ff2e7f34
commit c0657770e8
9 changed files with 108 additions and 114 deletions

View File

@@ -2,7 +2,6 @@ package com.acgist.taoyao.signal.party.media;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
@@ -11,8 +10,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
* 终端包装器Peer * 终端包装器
* 视频房间使用
* *
* @author acgist * @author acgist
*/ */
@@ -25,103 +23,96 @@ public class ClientWrapper implements AutoCloseable {
* 房间 * 房间
*/ */
private final Room room; private final Room room;
/** /**
* 终端 * 终端
*/ */
private final Client client; private final Client client;
/** /**
* 房间标识 * 房间ID
*/ */
private final String roomId; private final String roomId;
/** /**
* 终端标识 * 终端ID
*/ */
private final String clientId; private final String clientId;
/** /**
* 媒体订阅类型 * 媒体订阅类型
* 指定订阅类型终端注册或者生成媒体后会自动进行媒体推流拉流 * 指定订阅类型终端注册或者生成媒体后会自动进行媒体推流拉流
* 没有订阅任何媒体时需要用户自己对媒体进行消费控制 * 没有订阅任何媒体时需要用户自己对媒体进行消费控制
*/ */
private SubscribeType subscribeType; private SubscribeType subscribeType;
/** /**
* RTP协商 * RTP协商
*/ */
private Object rtpCapabilities; private Object rtpCapabilities;
/** /**
* SCTP协商 * SCTP协商
*/ */
private Object sctpCapabilities; private Object sctpCapabilities;
/** /**
* 媒体录像 * 服务端媒体录像
*/ */
private Recorder recorder; private Recorder recorder;
/** /**
* 发送通道 * 发送通道
*/ */
private Transport sendTransport; private Transport sendTransport;
/** /**
* 接收通道 * 接收通道
*/ */
private Transport recvTransport; private Transport recvTransport;
/** /**
* 生产者 * 生产者
* 其他终端消费当前终端的消费 * 其他终端消费当前终端的生产
*/ */
private final Map<String, Producer> producers; private final Map<String, Producer> producers;
/** /**
* 消费者 * 消费者
* 当前终端消费其他终端的消费者 * 当前终端消费其他终端的消费者
*/ */
private final Map<String, Consumer> consumers; private final Map<String, Consumer> consumers;
/** /**
* 数据生产者 * 数据生产者
* 其他终端消费当前终端的消费 * 其他终端消费当前终端的生产
*/ */
private final Map<String, DataProducer> dataProducers; private final Map<String, DataProducer> dataProducers;
/** /**
* 数据消费者 * 数据消费者
* 当前终端消费其他终端的消费者 * 当前终端消费其他终端的消费者
*/ */
private final Map<String, DataConsumer> dataConsumers; private final Map<String, DataConsumer> dataConsumers;
public ClientWrapper(Room room, Client client) { public ClientWrapper(Room room, Client client) {
this.room = room; this.room = room;
this.client = client; this.client = client;
this.roomId = room.getRoomId(); this.roomId = room.getRoomId();
this.clientId = client.getClientId(); this.clientId = client.getClientId();
this.producers = new ConcurrentHashMap<>(); this.producers = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>(); this.consumers = new ConcurrentHashMap<>();
this.dataProducers = new ConcurrentHashMap<>(); this.dataProducers = new ConcurrentHashMap<>();
this.dataConsumers = new ConcurrentHashMap<>(); this.dataConsumers = new ConcurrentHashMap<>();
} }
/**
* @return 生产者数量
*/
public Integer producerSize() {
return this.producers.size();
}
/**
* @return 消费者数量
*/
public Integer consumerSize() {
return this.producers.values().stream()
.map(producer -> producer.getConsumers().size())
.collect(Collectors.counting())
.intValue();
}
/** /**
* @param producer 生产者 * @param producer 生产者
* *
* @return 是否已经消费 * @return 是否已经消费生产者
*/ */
public boolean consumed(Producer producer) { public boolean consumed(Producer producer) {
return this.consumers.values().stream() return this.consumers.values().stream()
.anyMatch(v -> v.getProducer() == producer); .anyMatch(v -> v.getProducer() == producer);
} }
/**
* @param dataProducer 数据生产者
*
* @return 是否已经消费数据生产者
*/
public boolean consumedData(DataProducer dataProducer) {
return this.dataConsumers.values().stream()
.anyMatch(v -> v.getDataProducer() == dataProducer);
}
@Override @Override
public void close() { public void close() {
// 注意:不要关闭终端(只是离开房间) // 注意:不要关闭终端(只是离开房间)

View File

@@ -28,7 +28,7 @@ public class Consumer extends OperatorAdapter {
*/ */
private final String streamId; private final String streamId;
/** /**
* 消费者标识 * 消费者ID
*/ */
private final String consumerId; private final String consumerId;
/** /**
@@ -45,11 +45,11 @@ public class Consumer extends OperatorAdapter {
private final ClientWrapper consumerClient; private final ClientWrapper consumerClient;
public Consumer(String kind, String streamId, String consumerId, Room room, Producer producer, ClientWrapper consumerClient) { public Consumer(String kind, String streamId, String consumerId, Room room, Producer producer, ClientWrapper consumerClient) {
this.kind = Kind.of(kind); this.kind = Kind.of(kind);
this.streamId = streamId; this.streamId = streamId;
this.consumerId = consumerId; this.consumerId = consumerId;
this.room = room; this.room = room;
this.producer = producer; this.producer = producer;
this.consumerClient = consumerClient; this.consumerClient = consumerClient;
} }
@@ -72,19 +72,19 @@ public class Consumer extends OperatorAdapter {
@Override @Override
public void pause() { public void pause() {
log.info("暂停消费者:{} - {}", this.streamId, this.consumerId); log.debug("暂停消费者:{} - {}", this.streamId, this.consumerId);
EventPublisher.publishEvent(new MediaConsumerPauseEvent(this.consumerId, this.room)); EventPublisher.publishEvent(new MediaConsumerPauseEvent(this.consumerId, this.room));
} }
@Override @Override
public void resume() { public void resume() {
log.info("恢复消费者:{} - {}", this.streamId, this.consumerId); log.debug("恢复消费者:{} - {}", this.streamId, this.consumerId);
EventPublisher.publishEvent(new MediaConsumerResumeEvent(this.consumerId, this.room)); EventPublisher.publishEvent(new MediaConsumerResumeEvent(this.consumerId, this.room));
} }
@Override @Override
public void log() { public void log() {
log.debug("当前消费者:{} - {} - {}", this.consumerId, this.kind, this.streamId); log.info("当前消费者:{} - {} - {}", this.streamId, this.consumerId, this.kind);
} }
} }

View File

@@ -22,7 +22,7 @@ public class DataConsumer extends OperatorAdapter {
*/ */
private final String streamId; private final String streamId;
/** /**
* 消费者标识 * 消费者ID
*/ */
private final String consumerId; private final String consumerId;
/** /**
@@ -39,10 +39,10 @@ public class DataConsumer extends OperatorAdapter {
private final ClientWrapper consumerClient; private final ClientWrapper consumerClient;
public DataConsumer(String streamId, String consumerId, Room room, DataProducer dataProducer, ClientWrapper consumerClient) { public DataConsumer(String streamId, String consumerId, Room room, DataProducer dataProducer, ClientWrapper consumerClient) {
this.streamId = streamId; this.streamId = streamId;
this.consumerId = consumerId; this.consumerId = consumerId;
this.room = room; this.room = room;
this.dataProducer = dataProducer; this.dataProducer = dataProducer;
this.consumerClient = consumerClient; this.consumerClient = consumerClient;
} }
@@ -65,7 +65,7 @@ public class DataConsumer extends OperatorAdapter {
@Override @Override
public void log() { public void log() {
log.debug("当前数据消费者:{} - {}", this.consumerId, this.streamId); log.info("当前数据消费者:{} - {}", this.streamId, this.consumerId);
} }
} }

View File

@@ -25,7 +25,7 @@ public class DataProducer extends OperatorAdapter {
*/ */
private final String streamId; private final String streamId;
/** /**
* 生产者标识 * 生产者ID
*/ */
private final String producerId; private final String producerId;
/** /**
@@ -43,11 +43,11 @@ public class DataProducer extends OperatorAdapter {
private final Map<String, DataConsumer> dataConsumers; private final Map<String, DataConsumer> dataConsumers;
public DataProducer(String streamId, String producerId, Room room, ClientWrapper producerClient) { public DataProducer(String streamId, String producerId, Room room, ClientWrapper producerClient) {
this.streamId = streamId; this.streamId = streamId;
this.producerId = producerId; this.producerId = producerId;
this.room = room; this.room = room;
this.producerClient = producerClient; this.producerClient = producerClient;
this.dataConsumers = new ConcurrentHashMap<>(); this.dataConsumers = new ConcurrentHashMap<>();
} }
@Override @Override
@@ -69,7 +69,7 @@ public class DataProducer extends OperatorAdapter {
@Override @Override
public void log() { public void log() {
log.debug("当前数据生产者:{} - {}", this.producerId, this.streamId); log.info("当前数据生产者:{} - {}", this.streamId, this.producerId);
this.dataConsumers.values().forEach(DataConsumer::log); this.dataConsumers.values().forEach(DataConsumer::log);
} }

View File

@@ -24,7 +24,8 @@ public enum Kind {
* @return 类型 * @return 类型
*/ */
public static final Kind of(String value) { public static final Kind of(String value) {
for (Kind kind : Kind.values()) { final Kind[] values = Kind.values();
for (Kind kind : values) {
if(kind.name().equalsIgnoreCase(value)) { if(kind.name().equalsIgnoreCase(value)) {
return kind; return kind;
} }

View File

@@ -1,7 +1,7 @@
package com.acgist.taoyao.signal.party.media; package com.acgist.taoyao.signal.party.media;
/** /**
* 关闭移除接口适配器 * 操作接口适配器
* *
* @author acgist * @author acgist
*/ */

View File

@@ -31,7 +31,7 @@ public class Producer extends OperatorAdapter {
*/ */
private final String streamId; private final String streamId;
/** /**
* 生产者标识 * 生产者ID
*/ */
private final String producerId; private final String producerId;
/** /**
@@ -49,12 +49,12 @@ public class Producer extends OperatorAdapter {
private final Map<String, Consumer> consumers; private final Map<String, Consumer> consumers;
public Producer(String kind, String streamId, String producerId, Room room, ClientWrapper producerClient) { public Producer(String kind, String streamId, String producerId, Room room, ClientWrapper producerClient) {
this.kind = Kind.of(kind); this.kind = Kind.of(kind);
this.streamId = streamId; this.streamId = streamId;
this.producerId = producerId; this.producerId = producerId;
this.room = room; this.room = room;
this.producerClient = producerClient; this.producerClient = producerClient;
this.consumers = new ConcurrentHashMap<>(); this.consumers = new ConcurrentHashMap<>();
} }
@Override @Override
@@ -76,19 +76,19 @@ public class Producer extends OperatorAdapter {
@Override @Override
public void pause() { public void pause() {
log.info("暂停生产者:{} - {}", this.streamId, this.producerId); log.debug("暂停生产者:{} - {}", this.streamId, this.producerId);
EventPublisher.publishEvent(new MediaProducerPauseEvent(this.producerId, this.room)); EventPublisher.publishEvent(new MediaProducerPauseEvent(this.producerId, this.room));
} }
@Override @Override
public void resume() { public void resume() {
log.info("恢复生产者:{} - {}", this.streamId, this.producerId); log.debug("恢复生产者:{} - {}", this.streamId, this.producerId);
EventPublisher.publishEvent(new MediaProducerResumeEvent(this.producerId, this.room)); EventPublisher.publishEvent(new MediaProducerResumeEvent(this.producerId, this.room));
} }
@Override @Override
public void log() { public void log() {
log.info("当前生产者:{} - {} - {}", this.producerId, this.kind, this.streamId); log.info("当前生产者:{} - {} - {}", this.streamId, this.producerId, this.kind);
this.consumers.values().forEach(Consumer::log); this.consumers.values().forEach(Consumer::log);
} }

View File

@@ -23,7 +23,7 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
* 媒体录像机 * 服务端媒体录像机
* *
* OPUS = 100 * OPUS = 100
* VP8 = 101 * VP8 = 101

View File

@@ -22,6 +22,8 @@ import lombok.extern.slf4j.Slf4j;
/** /**
* 消费数据信令 * 消费数据信令
* *
* TODO防止重复消费
*
* @author acgist * @author acgist
*/ */
@Slf4j @Slf4j