[*] 日常优化

This commit is contained in:
acgist
2023-10-08 08:00:43 +08:00
parent 993ed9ed1d
commit 03ab29a13f
3 changed files with 74 additions and 54 deletions

View File

@@ -804,7 +804,7 @@ class Taoyao {
/**
* 消费媒体信令
*
* @param {*} message 消息
* @param {*} message 信令消息
* @param {*} body 消息主体
*/
async mediaConsume(message, body) {
@@ -818,8 +818,7 @@ class Taoyao {
appData,
rtpCapabilities,
} = body;
const me = this;
const room = me.rooms.get(roomId);
const room = this.rooms.get(roomId);
const producer = room?.producers.get(producerId);
const transport = room?.transports.get(transportId);
if (
@@ -828,8 +827,8 @@ class Taoyao {
!transport ||
!rtpCapabilities ||
!room.mediasoupRouter.canConsume({
producerId : producerId,
rtpCapabilities: rtpCapabilities,
producerId,
rtpCapabilities,
})
) {
console.warn("不能消费媒体", body);
@@ -840,28 +839,21 @@ class Taoyao {
for (let i = 0; i < consumerCount; i++) {
promises.push(
(async () => {
let consumer;
try {
consumer = await transport.consume({
// 默认暂停
const consumer = await transport.consume({
paused: true,
producerId : producerId,
rtpCapabilities: rtpCapabilities,
producerId,
rtpCapabilities,
});
} catch (error) {
console.error("创建消费者异常", body, error);
return;
}
consumer.clientId = clientId;
consumer.streamId = streamId;
room.consumers.set(consumer.id, consumer);
console.debug("创建消费者", consumer.id, streamId);
consumer.on("transportclose", () => {
console.info("消费者关闭(通道关闭)", consumer.id, streamId);
console.debug("消费者关闭(通道关闭)", consumer.id, streamId);
consumer.close();
});
consumer.on("producerclose", () => {
console.info("消费者关闭(生产者关闭)", consumer.id, streamId);
console.debug("消费者关闭(生产者关闭)", consumer.id, streamId);
consumer.close();
});
consumer.on("producerpause", () => {
@@ -883,7 +875,7 @@ class Taoyao {
// consumer.observer.on("score", fn(score));
consumer.on("score", (score) => {
console.debug("消费者评分", consumer.id, streamId, score);
me.push(protocol.buildMessage("media::consumer::score", {
this.push(protocol.buildMessage("media::consumer::score", {
score,
roomId,
consumerId: consumer.id,
@@ -892,17 +884,17 @@ class Taoyao {
// consumer.observer.on("layerschange", fn(layers));
consumer.on("layerschange", (layers) => {
console.debug("消费者空间层和时间层改变", consumer.id, streamId, layers);
me.push(protocol.buildMessage("media::consumer::layers::change", {
this.push(protocol.buildMessage("media::consumer::layers::change", {
roomId,
consumerId : consumer.id,
spatialLayer : layers ? layers.spatialLayer : null,
temporalLayer: layers ? layers.temporalLayer : null,
spatialLayer : layers?.spatialLayer,
temporalLayer: layers?.temporalLayer,
}));
});
consumer.observer.on("close", () => {
if(room.consumers.delete(consumer.id)) {
console.debug("消费者关闭", consumer.id, streamId);
me.push(protocol.buildMessage("media::consumer::close", {
this.push(protocol.buildMessage("media::consumer::close", {
roomId,
consumerId: consumer.id
}));
@@ -912,14 +904,14 @@ class Taoyao {
});
consumer.observer.on("pause", () => {
console.debug("消费者暂停", consumer.id, streamId);
me.push(protocol.buildMessage("media::consumer::pause", {
this.push(protocol.buildMessage("media::consumer::pause", {
roomId,
consumerId: consumer.id
}));
});
consumer.observer.on("resume", () => {
console.debug("消费者恢复", consumer.id, streamId);
me.push(protocol.buildMessage("media::consumer::resume", {
this.push(protocol.buildMessage("media::consumer::resume", {
roomId,
consumerId: consumer.id
}));
@@ -930,7 +922,7 @@ class Taoyao {
// console.debug("消费者跟踪事件trace", consumer.id, streamId, trace);
// });
// 等待终端准备就绪可以不用等待直接使用push方法
await me.request(protocol.buildMessage("media::consume", {
await this.request(protocol.buildMessage("media::consume", {
roomId,
clientId,
sourceId,

View File

@@ -838,6 +838,9 @@ class Taoyao extends RemoteClient {
case "media::consumer::close":
me.defaultMediaConsumerClose(message, body);
break;
case "media::consumer::layers::change":
this.defaultMediaConsumerLayersChange(message, body);
break;
case "media::consumer::pause":
me.defaultMediaConsumerPause(message, body);
break;
@@ -1527,14 +1530,13 @@ class Taoyao extends RemoteClient {
* @param {*} producerId 生产者ID
*/
mediaConsume(producerId) {
const me = this;
if(!me.recvTransport) {
me.platformError("没有连接接收通道");
if(!this.recvTransport) {
this.platformError("没有连接接收通道");
return;
}
me.push(protocol.buildMessage("media::consume", {
roomId : me.roomId,
producerId: producerId,
this.push(protocol.buildMessage("media::consume", {
producerId,
roomId: this.roomId,
}));
}
@@ -1550,8 +1552,7 @@ class Taoyao extends RemoteClient {
* @param {*} body 消息主体
*/
async defaultMediaConsume(message, body) {
const me = this;
if (!me.audioConsume && !me.videoConsume) {
if (!this.audioConsume && !this.videoConsume) {
console.debug("没有消费媒体");
return;
}
@@ -1569,10 +1570,15 @@ class Taoyao extends RemoteClient {
producerPaused,
} = body;
try {
const consumer = await me.recvTransport.consume({
const consumer = await this.recvTransport.consume({
id: consumerId,
appData: { ...appData, clientId, sourceId, streamId },
// 让libwebrtc同步相同来源媒体
appData: {
...appData,
clientId,
sourceId,
streamId
},
// libwebrtc同步相同来源媒体
streamId: `${clientId}-${appData.videoSource || "taoyao"}`,
kind,
producerId,
@@ -1581,16 +1587,16 @@ class Taoyao extends RemoteClient {
consumer.clientId = clientId;
consumer.sourceId = sourceId;
consumer.streamId = streamId;
me.consumers.set(consumer.id, consumer);
this.consumers.set(consumer.id, consumer);
consumer.on("transportclose", () => {
console.debug("消费者关闭(通道关闭)", consumer.id, streamId);
consumer.close();
});
consumer.observer.on("close", () => {
if(me.consumers.delete(consumer.id)) {
if(this.consumers.delete(consumer.id)) {
console.debug("消费者关闭", consumer.id, streamId);
} else {
console.debug("消费者关闭(无效)", consumer.id, streamId);
console.debug("消费者关闭(消费者无效)", consumer.id, streamId);
}
});
const {
@@ -1599,12 +1605,11 @@ class Taoyao extends RemoteClient {
} = mediasoupClient.parseScalabilityMode(
consumer.rtpParameters.encodings[0].scalabilityMode
);
console.debug("时间层空间层", spatialLayers, temporalLayers);
me.push(message);
console.debug("远程媒体消费者", consumer);
this.push(message);
console.debug("添加远程媒体消费者", consumer, spatialLayers, temporalLayers);
const track = consumer.track;
const remoteClient = me.remoteClients.get(consumer.sourceId);
me.callbackTrack(sourceId, track);
const remoteClient = this.remoteClients.get(consumer.sourceId);
this.callbackTrack(sourceId, track);
if (
remoteClient &&
remoteClient.proxy &&
@@ -1621,10 +1626,10 @@ class Taoyao extends RemoteClient {
}
remoteClient.proxy.media(track, consumer);
} else {
console.warn("远程终端没有实现代理", remoteClient);
console.warn("远程终端没有实现代理", consumer.sourceId, remoteClient);
}
} catch (error) {
me.platformError("消费媒体异常", error);
this.platformError("消费媒体异常", error);
}
}
@@ -1659,6 +1664,16 @@ class Taoyao extends RemoteClient {
consumer.close();
}
/**
* 消费者空间层和时间层改变信令
*
* @param {*} message 信令消息
* @param {*} body 信令主体
*/
defaultMediaConsumerLayersChange(message, body) {
console.debug("消费者空间层和时间层改变", body);
}
/**
* 暂停消费者信令
*
@@ -1903,7 +1918,7 @@ class Taoyao extends RemoteClient {
console.debug("数据消费者消息", dataConsumer.id, streamId, message.toString("UTF-8"), ppid);
});
} catch (error) {
me.platformError("消费数据异常", error);
this.platformError("消费数据异常", error);
}
}

View File

@@ -39,14 +39,27 @@ import lombok.extern.slf4j.Slf4j;
""",
body = """
{
"roomId" : "房间ID"
"roomId" : "房间ID",
"producerId": "生产者ID"
}
{
"roomId" : "房间ID",
"clientId" : "消费者ID",
"sourceId" : "生产者ID",
"streamId" : "媒体ID",
"producerId" : "生产者ID",
"consumerId" : "消费者ID",
"kind" : "消费者媒体类型",
"type" : "消费者类型",
"appData" : "APP数据",
"rtpParameters" : "RTP参数",
"producerPaused": "生产者是否暂停",
}
""",
flow = {
"终端->信令服务->媒体服务=>信令服务->终端",
"终端-[生产媒体]>信令服务-[消费媒体])信令服务=>信令服务->终端",
"终端-[创建WebRTC通道]>信令服务-[消费媒体])信令服务=>信令服务->终端",
"终端->信令服务->媒体服务=>信令服务->终端"
}
)
public class MediaConsumeProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaConsumeEvent> {
@@ -69,7 +82,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
.filter(v -> v != produceClientWrapper)
.filter(v -> v.getRecvTransport() != null)
.filter(v -> v.getSubscribeType().canConsume(producer))
.forEach(v -> this.consume(room, v, producer, this.build()));
.forEach(consumeClientWrapper -> this.consume(room, consumeClientWrapper, producer, this.build()));
} else if(event.getClientWrapper() != null) {
// 创建WebRTC消费通道消费其他终端
final ClientWrapper consumeClientWrapper = event.getClientWrapper();
@@ -138,7 +151,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
if(consumerClientWrapper.consumed(producer)) {
// 消费通道就绪
mediaClient.push(message);
log.info("{}消费通道就绪:{}", consumerClientId, streamId);
log.debug("{}消费通道就绪:{}", consumerClientId, streamId);
} else {
// 主动消费媒体
final Transport recvTransport = consumerClientWrapper.getRecvTransport();