[*] 拉取bug

This commit is contained in:
acgist
2023-03-09 08:10:21 +08:00
parent 4b709f2944
commit 2a7be453f3
12 changed files with 81 additions and 90 deletions

View File

@@ -63,7 +63,7 @@ public class Consumer implements Closeable {
return;
}
this.close = true;
log.info("关闭消费者:{}", this.consumerId);
log.info("关闭消费者:{} - {}", this.streamId, this.consumerId);
this.getProducer().remove(this.consumerId);
this.consumerClient.getConsumers().remove(this.consumerId);
EventPublisher.publishEvent(new MediaConsumerCloseEvent(this.consumerId, this.room));

View File

@@ -75,7 +75,7 @@ public class Producer implements Closeable {
return;
}
this.close = true;
log.info("关闭生产者:{}", this.producerId);
log.info("关闭生产者:{} - {}", this.streamId, this.producerId);
this.consumers.forEach((k, v) -> v.close());
this.producerClient.getProducers().remove(this.producerId);
EventPublisher.publishEvent(new MediaProducerCloseEvent(this.producerId, this.room));

View File

@@ -115,26 +115,21 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
*/
private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) {
final Client mediaClient = room.getMediaClient();
final String consumerClientId = consumerClientWrapper.getClientId();
final String streamId = producer.getStreamId() + "->" + consumerClientId;
final ClientWrapper producerClientWrapper = producer.getProducerClient();
final String producerClientId = producerClientWrapper.getClientId();
if(consumerClientWrapper.consumed(producer)) {
// TODO没有清理干净
// 消费通道准备就绪
if(log.isDebugEnabled()) {
log.debug("消费通道准备就绪:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId());
}
// 消费通道就绪
mediaClient.push(message);
log.info("{}消费通道就绪:{}", consumerClientId, streamId);
} else {
// 主动消费媒体
if(log.isDebugEnabled()) {
log.debug("消费媒体:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId());
}
final String clientId = consumerClientWrapper.getClientId();
final String streamId = producer.getStreamId() + "->" + clientId;
final Transport recvTransport = consumerClientWrapper.getRecvTransport();
final ClientWrapper produceClientWrapper = producer.getProducerClient();
final Map<String, Object> body = new HashMap<>();
body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.CLIENT_ID, clientId);
body.put(Constant.SOURCE_ID, produceClientWrapper.getClientId());
body.put(Constant.CLIENT_ID, consumerClientId);
body.put(Constant.SOURCE_ID, producerClientId);
body.put(Constant.STREAM_ID, streamId);
body.put(Constant.PRODUCER_ID, producer.getProducerId());
body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId());
@@ -142,6 +137,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
body.put(Constant.SCTP_CAPABILITIES, consumerClientWrapper.getSctpCapabilities());
message.setBody(body);
mediaClient.push(message);
log.info("{}主动消费媒体:{} - {}", consumerClientId, producerClientId, streamId);
}
}

View File

@@ -60,7 +60,7 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final Consumer consumer = room.consumer(consumerId);
if(consumer == null) {
log.warn("关闭消费者无效:{}", consumerId);
log.debug("关闭消费者无效:{}", consumerId);
} else {
consumer.close();
}

View File

@@ -70,7 +70,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter {
// 视频全收:广播音频
// 全部不收:全部广播
room.broadcast(responseMessage);
log.info("{}生产媒体:{} - {} - {}", clientId, kind, streamId, producerId);
log.info("{}生产媒体:{} - {}", clientId, streamId, producerId);
this.publishEvent(new MediaConsumeEvent(room, producer));
}