[*] 日常优化

This commit is contained in:
acgist
2023-08-06 07:44:37 +08:00
parent 3501308dbd
commit 0002f55424
7 changed files with 83 additions and 53 deletions

View File

@@ -25,4 +25,4 @@ MEDIASOUP_LISTEN_IP=0.0.0.0
# Mediasoup监听端口起始端口 # Mediasoup监听端口起始端口
MEDIASOUP_LISTEN_PORT=44444 MEDIASOUP_LISTEN_PORT=44444
# Mediasoup声明地址不能配置环回地址 # Mediasoup声明地址不能配置环回地址
MEDIASOUP_ANNOUNCED_IP=192.168.1.110 MEDIASOUP_ANNOUNCED_IP=192.168.1.100

View File

@@ -426,6 +426,9 @@ class Taoyao {
case "media::data::consumer::close": case "media::data::consumer::close":
me.mediaDataConsumerClose(message, body); me.mediaDataConsumerClose(message, body);
break; break;
case "media::data::consumer::status":
me.mediaDataConsumerStatus(message, body);
break;
case "media::data::produce": case "media::data::produce":
me.mediaDataProduce(message, body); me.mediaDataProduce(message, body);
break; break;
@@ -1413,6 +1416,32 @@ class Taoyao {
} }
} }
/**
* 查询数据消费者状态信令
*
* @param {*} message 信令消息
* @param {*} body 消息主体
*/
async mediaDataConsumerStatus(message, body) {
const me = this;
const {
roomId,
consumerId,
} = body;
const room = me.rooms.get(roomId);
const dataConsumer = room?.dataConsumers.get(consumerId);
if(dataConsumer) {
console.debug("查询消费者状态", consumerId);
message.body = {
...body,
status: await dataConsumer.getStats()
};
me.push(message);
} else {
console.debug("查询消费者状态(无效)", consumerId);
}
}
/** /**
* 生产数据信令 * 生产数据信令
* *

View File

@@ -1668,7 +1668,7 @@ class Taoyao extends RemoteClient {
*/ */
async mediaDataConsumerStatus(consumerId) { async mediaDataConsumerStatus(consumerId) {
const me = this; const me = this;
return await me.request(protocol.buildMessage('media::data::consumer::status', { return await me.request(protocol.buildMessage("media::data::consumer::status", {
roomId: me.roomId, roomId: me.roomId,
consumerId consumerId
})); }));
@@ -2384,7 +2384,7 @@ class Taoyao extends RemoteClient {
async mediaProduce(audioTrack, videoTrack) { async mediaProduce(audioTrack, videoTrack) {
const me = this; const me = this;
if(!audioTrack || !videoTrack) { if(!audioTrack || !videoTrack) {
me.checkDevice(); await me.checkDevice();
} }
await me.createSendTransport(); await me.createSendTransport();
await me.createRecvTransport(); await me.createRecvTransport();

View File

@@ -96,17 +96,17 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
this.consume(room, room.clientWrapper(client), producer, message); this.consume(room, room.clientWrapper(client), producer, message);
} else if(clientType.mediaServer()) { } else if(clientType.mediaServer()) {
// 媒体通道准备就绪 // 媒体通道准备就绪
final String kind = MapUtils.get(body, Constant.KIND); final String kind = MapUtils.get(body, Constant.KIND);
final String streamId = MapUtils.get(body, Constant.STREAM_ID); final String streamId = MapUtils.get(body, Constant.STREAM_ID);
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final String consumerClientId = MapUtils.get(body, Constant.CLIENT_ID); final String consumerClientId = MapUtils.get(body, Constant.CLIENT_ID);
final ClientWrapper consumerClientWrapper = room.clientWrapper(consumerClientId); final ClientWrapper consumerClientWrapper = room.clientWrapper(consumerClientId);
final Map<String, Consumer> roomConsumers = room.getConsumers(); final Map<String, Consumer> roomConsumers = room.getConsumers();
final Map<String, Consumer> clientConsumers = consumerClientWrapper.getConsumers(); final Map<String, Consumer> clientConsumers = consumerClientWrapper.getConsumers();
final Map<String, Consumer> producerConsumers = producer.getConsumers(); final Map<String, Consumer> producerConsumers = producer.getConsumers();
final Consumer consumer = new Consumer(kind, streamId, consumerId, room, producer, consumerClientWrapper); final Consumer consumer = new Consumer(kind, streamId, consumerId, room, producer, consumerClientWrapper);
final Consumer oldRoomConsumer = roomConsumers.put(consumerId, consumer); final Consumer oldRoomConsumer = roomConsumers.put(consumerId, consumer);
final Consumer oldClientConsumer = clientConsumers.put(consumerId, consumer); final Consumer oldClientConsumer = clientConsumers.put(consumerId, consumer);
final Consumer oldProducerConsumer = producerConsumers.put(consumerId, consumer); final Consumer oldProducerConsumer = producerConsumers.put(consumerId, consumer);
if(oldRoomConsumer != null || oldClientConsumer != null || oldProducerConsumer != null) { if(oldRoomConsumer != null || oldClientConsumer != null || oldProducerConsumer != null) {
log.warn("消费者已经存在:{}", consumerId); log.warn("消费者已经存在:{}", consumerId);
@@ -127,26 +127,26 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
* @param message 消息 * @param message 消息
*/ */
private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) { private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) {
final Client mediaClient = room.getMediaClient(); final Client mediaClient = room.getMediaClient();
final String consumerClientId = consumerClientWrapper.getClientId(); final String consumerClientId = consumerClientWrapper.getClientId();
final String streamId = Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), consumerClientId); final String streamId = Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), consumerClientId);
final ClientWrapper producerClientWrapper = producer.getProducerClient(); final ClientWrapper producerClientWrapper = producer.getProducerClient();
final String producerClientId = producerClientWrapper.getClientId(); final String producerClientId = producerClientWrapper.getClientId();
if(consumerClientWrapper.consumed(producer)) { if(consumerClientWrapper.consumed(producer)) {
// 消费通道就绪 // 消费通道就绪
mediaClient.push(message); mediaClient.push(message);
log.info("{}消费通道就绪:{}", consumerClientId, streamId); log.info("{}消费通道就绪:{}", consumerClientId, streamId);
} else { } else {
// 主动消费媒体 // 主动消费媒体
final Transport recvTransport = consumerClientWrapper.getRecvTransport(); final Transport recvTransport = consumerClientWrapper.getRecvTransport();
final Map<String, Object> body = new HashMap<>(); final Map<String, Object> body = new HashMap<>();
body.put(Constant.ROOM_ID, room.getRoomId()); body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.CLIENT_ID, consumerClientId); body.put(Constant.CLIENT_ID, consumerClientId);
body.put(Constant.SOURCE_ID, producerClientId); body.put(Constant.SOURCE_ID, producerClientId);
body.put(Constant.STREAM_ID, streamId); body.put(Constant.STREAM_ID, streamId);
body.put(Constant.PRODUCER_ID, producer.getProducerId()); body.put(Constant.PRODUCER_ID, producer.getProducerId());
body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId());
body.put(Constant.RTP_CAPABILITIES, consumerClientWrapper.getRtpCapabilities()); body.put(Constant.RTP_CAPABILITIES, consumerClientWrapper.getRtpCapabilities());
body.put(Constant.SCTP_CAPABILITIES, consumerClientWrapper.getSctpCapabilities()); body.put(Constant.SCTP_CAPABILITIES, consumerClientWrapper.getSctpCapabilities());
message.setBody(body); message.setBody(body);
mediaClient.push(message); mediaClient.push(message);

View File

@@ -34,12 +34,12 @@ import lombok.extern.slf4j.Slf4j;
""", """,
body = """ body = """
{ {
"roomId": "房间ID" "roomId" : "房间ID"
"producerId": "生产者ID", "producerId": "生产者ID",
} }
""", """,
flow = { flow = {
"终端=>信令服务->媒体服务->信令服务->媒体服务" "终端->信令服务->媒体服务->信令服务->终端"
} }
) )
public class MediaDataConsumeProtocol extends ProtocolRoomAdapter { public class MediaDataConsumeProtocol extends ProtocolRoomAdapter {
@@ -59,31 +59,31 @@ public class MediaDataConsumeProtocol extends ProtocolRoomAdapter {
} }
if(clientType.mediaClient()) { if(clientType.mediaClient()) {
final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(client); final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(client);
final String dataConsumerClientId = dataConsumerClientWrapper.getClientId(); final String dataConsumerClientId = dataConsumerClientWrapper.getClientId();
final ClientWrapper dataProducerClientWrapper = dataProducer.getProducerClient(); final ClientWrapper dataProducerClientWrapper = dataProducer.getProducerClient();
final String dataProducerClientId = dataProducerClientWrapper.getClientId(); final String dataProducerClientId = dataProducerClientWrapper.getClientId();
final Transport recvTransport = dataConsumerClientWrapper.getRecvTransport(); final Transport recvTransport = dataConsumerClientWrapper.getRecvTransport();
final String streamId = Constant.STREAM_ID_CONSUMER.apply(dataProducer.getStreamId(), dataConsumerClientId); final String streamId = Constant.STREAM_ID_CONSUMER.apply(dataProducer.getStreamId(), dataConsumerClientId);
body.put(Constant.ROOM_ID, room.getRoomId()); body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.CLIENT_ID, dataConsumerClientId); body.put(Constant.CLIENT_ID, dataConsumerClientId);
body.put(Constant.SOURCE_ID, dataProducerClientId); body.put(Constant.SOURCE_ID, dataProducerClientId);
body.put(Constant.STREAM_ID, streamId); body.put(Constant.STREAM_ID, streamId);
body.put(Constant.PRODUCER_ID, dataProducer.getProducerId()); body.put(Constant.PRODUCER_ID, dataProducer.getProducerId());
body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId());
body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities()); body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities());
body.put(Constant.SCTP_CAPABILITIES, dataConsumerClientWrapper.getSctpCapabilities()); body.put(Constant.SCTP_CAPABILITIES, dataConsumerClientWrapper.getSctpCapabilities());
mediaClient.push(message); mediaClient.push(message);
} else if(clientType.mediaServer()) { } else if(clientType.mediaServer()) {
final String streamId = MapUtils.get(body, Constant.STREAM_ID); final String streamId = MapUtils.get(body, Constant.STREAM_ID);
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final String dataConsumerClientId = MapUtils.get(body, Constant.CLIENT_ID); final String dataConsumerClientId = MapUtils.get(body, Constant.CLIENT_ID);
final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(dataConsumerClientId); final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(dataConsumerClientId);
final Map<String, DataConsumer> roomDataConsumers = room.getDataConsumers(); final Map<String, DataConsumer> roomDataConsumers = room.getDataConsumers();
final Map<String, DataConsumer> clientDataConsumers = dataConsumerClientWrapper.getDataConsumers(); final Map<String, DataConsumer> clientDataConsumers = dataConsumerClientWrapper.getDataConsumers();
final Map<String, DataConsumer> producerDataConsumers = dataProducer.getDataConsumers(); final Map<String, DataConsumer> producerDataConsumers = dataProducer.getDataConsumers();
final DataConsumer dataConsumer = new DataConsumer(streamId, consumerId, room, dataProducer, dataConsumerClientWrapper); final DataConsumer dataConsumer = new DataConsumer(streamId, consumerId, room, dataProducer, dataConsumerClientWrapper);
final DataConsumer oldDataRoomConsumer = roomDataConsumers.put(consumerId, dataConsumer); final DataConsumer oldDataRoomConsumer = roomDataConsumers.put(consumerId, dataConsumer);
final DataConsumer oldDataClientConsumer = clientDataConsumers.put(consumerId, dataConsumer); final DataConsumer oldDataClientConsumer = clientDataConsumers.put(consumerId, dataConsumer);
final DataConsumer oldDataProducerConsumer = producerDataConsumers.put(consumerId, dataConsumer); final DataConsumer oldDataProducerConsumer = producerDataConsumers.put(consumerId, dataConsumer);
if(oldDataRoomConsumer != null || oldDataClientConsumer != null || oldDataProducerConsumer != null) { if(oldDataRoomConsumer != null || oldDataClientConsumer != null || oldDataProducerConsumer != null) {
log.warn("消费者已经存在:{}", consumerId); log.warn("消费者已经存在:{}", consumerId);

View File

@@ -27,15 +27,17 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
@Protocol @Protocol
@Description( @Description(
memo = "关闭通过回调实现所以不能同步响应",
body = """ body = """
{ {
"roomId": "房间ID" "roomId" : "房间ID"
"consumerId": "数据消费者ID" "consumerId": "数据消费者ID"
} }
""", """,
flow = { flow = {
"媒体服务->信令服务-)终端", "媒体服务->信令服务->终端",
"终端->信令服务->媒体服务->信令服务+)终端" "信令服务->媒体服务->信令服务->终端",
"终端->信令服务->媒体服务->信令服务->终端"
} }
) )
public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaDataConsumerCloseEvent> { public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaDataConsumerCloseEvent> {
@@ -49,8 +51,8 @@ public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implemen
@Async @Async
@Override @Override
public void onApplicationEvent(MediaDataConsumerCloseEvent event) { public void onApplicationEvent(MediaDataConsumerCloseEvent event) {
final Room room = event.getRoom(); final Room room = event.getRoom();
final Client mediaClient = event.getMediaClient(); final Client mediaClient = event.getMediaClient();
final Map<String, Object> body = Map.of( final Map<String, Object> body = Map.of(
Constant.ROOM_ID, room.getRoomId(), Constant.ROOM_ID, room.getRoomId(),
Constant.CONSUMER_ID, event.getConsumerId() Constant.CONSUMER_ID, event.getConsumerId()
@@ -60,7 +62,7 @@ public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implemen
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final DataConsumer dataConsumer = room.dataConsumer(consumerId); final DataConsumer dataConsumer = room.dataConsumer(consumerId);
if(dataConsumer == null) { if(dataConsumer == null) {
log.debug("数据消费者无效:{} - {}", consumerId, clientType); log.debug("数据消费者无效:{} - {}", consumerId, clientType);
@@ -69,9 +71,8 @@ public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implemen
if(clientType.mediaClient()) { if(clientType.mediaClient()) {
dataConsumer.close(); dataConsumer.close();
} else if(clientType.mediaServer()) { } else if(clientType.mediaServer()) {
// TODO路由到真实消费者
dataConsumer.remove(); dataConsumer.remove();
room.broadcast(message); dataConsumer.getConsumerClient().push(message);
} else { } else {
this.logNoAdapter(clientType); this.logNoAdapter(clientType);
} }

View File

@@ -19,11 +19,11 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
@Description( @Description(
body = """ body = """
{ {
"roomId": "房间ID", "roomId" : "房间ID",
"consumerId": "数据消费者ID" "consumerId": "数据消费者ID"
} }
""", """,
flow = "终端=>信令服务->媒体服务->信令服务->终端" flow = "终端=>信令服务->媒体服务"
) )
public class MediaDataConsumerStatusProtocol extends ProtocolRoomAdapter { public class MediaDataConsumerStatusProtocol extends ProtocolRoomAdapter {