[*] 日常优化

This commit is contained in:
acgist
2023-08-07 07:41:09 +08:00
parent 0002f55424
commit f356e72476
6 changed files with 49 additions and 22 deletions

View File

@@ -435,6 +435,9 @@ class Taoyao {
case "media::data::producer::close": case "media::data::producer::close":
me.mediaDataProducerClose(message, body); me.mediaDataProducerClose(message, body);
break; break;
case "media::data::producer::status":
me.mediaDataProducerStatus(message, body);
break;
case "media::ice::restart": case "media::ice::restart":
me.mediaIceRestart(message, body); me.mediaIceRestart(message, body);
break; break;
@@ -1431,14 +1434,14 @@ class Taoyao {
const room = me.rooms.get(roomId); const room = me.rooms.get(roomId);
const dataConsumer = room?.dataConsumers.get(consumerId); const dataConsumer = room?.dataConsumers.get(consumerId);
if(dataConsumer) { if(dataConsumer) {
console.debug("查询消费者状态", consumerId); console.debug("查询数据消费者状态", consumerId);
message.body = { message.body = {
...body, ...body,
status: await dataConsumer.getStats() status: await dataConsumer.getStats()
}; };
me.push(message); me.push(message);
} else { } else {
console.debug("查询消费者状态(无效)", consumerId); console.debug("查询数据消费者状态(无效)", consumerId);
} }
} }
@@ -1483,12 +1486,10 @@ class Taoyao {
dataProducer.observer.on("close", () => { dataProducer.observer.on("close", () => {
if(room.dataProducers.delete(dataProducer.id)) { if(room.dataProducers.delete(dataProducer.id)) {
console.info("数据生产者关闭", dataProducer.id, streamId); console.info("数据生产者关闭", dataProducer.id, streamId);
me.push( me.push(taoyaoProtocol.buildMessage("media::data::producer::close", {
taoyaoProtocol.buildMessage("media::data::producer::close", { roomId : roomId,
roomId : roomId, producerId: dataProducer.id,
producerId: dataProducer.id, }));
})
);
} else { } else {
console.debug("数据生产者关闭(无效)", dataProducer.id, streamId); console.debug("数据生产者关闭(无效)", dataProducer.id, streamId);
} }
@@ -1519,6 +1520,32 @@ class Taoyao {
} }
} }
/**
* 查询数据生产者状态信令
*
* @param {*} message 信令消息
* @param {*} body 消息主体
*/
async mediaDataProducerStatus(message, body) {
const me = this;
const {
roomId,
producerId,
} = body;
const room = me.rooms.get(roomId);
const dataProducer = room?.dataProducers.get(producerId);
if(dataProducer) {
console.debug("查询数据生产者状态", producerId);
message.body = {
...body,
status: await dataProducer.getStats()
};
me.push(message);
} else {
console.debug("查询数据生产者状态(无效)", producerId);
}
}
/** /**
* 路由RTP协商信令 * 路由RTP协商信令
* *

View File

@@ -1713,7 +1713,7 @@ class Taoyao extends RemoteClient {
*/ */
async mediaDataProducerStatus(producerId) { async mediaDataProducerStatus(producerId) {
const me = this; const me = this;
return await me.request(protocol.buildMessage('media::data::producer::status', { return await me.request(protocol.buildMessage("media::data::producer::status", {
roomId: me.roomId, roomId: me.roomId,
producerId producerId
})); }));

View File

@@ -27,12 +27,12 @@ import lombok.extern.slf4j.Slf4j;
body = { body = {
""" """
{ {
"roomId": "房间标识", "roomId" : "房间标识",
"transportId": "通道标识" "transportId": "通道标识"
} }
""" """
}, },
flow = "终端->信令服务->媒体服务->信令服务->终端" flow = "终端=>信令服务->媒体服务"
) )
public class MediaDataProduceProtocol extends ProtocolRoomAdapter { public class MediaDataProduceProtocol extends ProtocolRoomAdapter {
@@ -65,7 +65,7 @@ public class MediaDataProduceProtocol extends ProtocolRoomAdapter {
Constant.STREAM_ID, streamId, Constant.STREAM_ID, streamId,
Constant.PRODUCER_ID, producerId Constant.PRODUCER_ID, producerId
)); ));
room.broadcast(responseMessage); client.push(responseMessage);
log.info("{}生产数据:{} - {}", clientId, streamId, producerId); log.info("{}生产数据:{} - {}", clientId, streamId, producerId);
} else { } else {
this.logNoAdapter(clientType); this.logNoAdapter(clientType);

View File

@@ -27,13 +27,14 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
@Protocol @Protocol
@Description( @Description(
memo = "关闭通过回调实现所以不能同步响应",
body = """ body = """
{ {
"roomId": "房间ID" "roomId" : "房间ID"
"consumerId": "数据生产者ID" "consumerId": "数据生产者ID"
} }
""", """,
flow = "终端->信令服务->媒体服务->信令服务+)终端" flow = "终端->信令服务->媒体服务->信令服务->终端"
) )
public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaDataProducerCloseEvent> { public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaDataProducerCloseEvent> {
@@ -46,10 +47,10 @@ public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implemen
@Async @Async
@Override @Override
public void onApplicationEvent(MediaDataProducerCloseEvent event) { public void onApplicationEvent(MediaDataProducerCloseEvent event) {
final Room room = event.getRoom(); final Room room = event.getRoom();
final Client mediaClient = event.getMediaClient(); final Client mediaClient = event.getMediaClient();
final Map<String, Object> body = Map.of( final Map<String, Object> body = Map.of(
Constant.ROOM_ID, room.getRoomId(), Constant.ROOM_ID, room.getRoomId(),
Constant.PRODUCER_ID, event.getProducerId() Constant.PRODUCER_ID, event.getProducerId()
); );
mediaClient.push(this.build(body)); mediaClient.push(this.build(body));
@@ -57,7 +58,7 @@ public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implemen
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final String producerId = MapUtils.get(body, Constant.PRODUCER_ID);
final DataProducer dataProducer = room.dataProducer(producerId); final DataProducer dataProducer = room.dataProducer(producerId);
if(dataProducer == null) { if(dataProducer == null) {
log.debug("数据生产者无效:{} - {}", producerId, clientType); log.debug("数据生产者无效:{} - {}", producerId, clientType);
@@ -66,9 +67,8 @@ public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implemen
if(clientType.mediaClient()) { if(clientType.mediaClient()) {
dataProducer.close(); dataProducer.close();
} else if(clientType.mediaServer()) { } else if(clientType.mediaServer()) {
// TODO路由到真实消费者
dataProducer.remove(); dataProducer.remove();
room.broadcast(message); dataProducer.getProducerClient().push(message);
} else { } else {
this.logNoAdapter(clientType); this.logNoAdapter(clientType);
} }

View File

@@ -19,11 +19,11 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
@Description( @Description(
body = """ body = """
{ {
"roomId": "房间ID", "roomId" : "房间ID",
"producerId": "数据生产者ID" "producerId": "数据生产者ID"
} }
""", """,
flow = "终端=>信令服务->媒体服务->信令服务->终端" flow = "终端=>信令服务->媒体服务"
) )
public class MediaDataProducerStatusProtocol extends ProtocolRoomAdapter { public class MediaDataProducerStatusProtocol extends ProtocolRoomAdapter {

View File

@@ -70,7 +70,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter {
Constant.STREAM_ID, streamId, Constant.STREAM_ID, streamId,
Constant.PRODUCER_ID, producerId Constant.PRODUCER_ID, producerId
)); ));
room.broadcast(responseMessage); client.push(responseMessage);
log.info("{}生产媒体:{} - {}", clientId, streamId, producerId); log.info("{}生产媒体:{} - {}", clientId, streamId, producerId);
this.publishEvent(new MediaConsumeEvent(room, producer)); this.publishEvent(new MediaConsumeEvent(room, producer));
} else { } else {