[*] 日常优化

This commit is contained in:
acgist
2023-07-01 09:30:44 +08:00
parent 02f266dbc9
commit ac96910bf0
2 changed files with 119 additions and 141 deletions

View File

@@ -838,7 +838,6 @@ class Taoyao {
* @param {*} body 消息主体
*/
async mediaProduce(message, body) {
const me = this;
const {
kind,
roomId,
@@ -848,6 +847,7 @@ class Taoyao {
appData,
rtpParameters
} = body;
const me = this;
const room = me.rooms.get(roomId);
const transport = room?.transports.get(transportId);
if(!transport) {
@@ -863,6 +863,7 @@ class Taoyao {
producer.clientId = clientId;
producer.streamId = streamId;
room.producers.set(producer.id, producer);
console.debug("创建生产者", producer.id, streamId);
producer.on("transportclose", () => {
console.info("生产者关闭(通道关闭)", producer.id, streamId);
producer.close();
@@ -870,7 +871,7 @@ class Taoyao {
producer.observer.on("close", () => {
if(room.producers.delete(producer.id)) {
console.info("生产者关闭", producer.id, streamId);
this.push(
me.push(
protocol.buildMessage("media::producer::close", {
roomId : roomId,
producerId: producer.id
@@ -882,7 +883,7 @@ class Taoyao {
});
producer.observer.on("pause", () => {
console.debug("生产者暂停", producer.id, streamId);
this.push(
me.push(
protocol.buildMessage("media::producer::pause", {
roomId : roomId,
producerId: producer.id
@@ -891,7 +892,7 @@ class Taoyao {
});
producer.observer.on("resume", () => {
console.debug("生产者恢复", producer.id, streamId);
this.push(
me.push(
protocol.buildMessage("media::producer::resume", {
roomId : roomId,
producerId: producer.id
@@ -922,14 +923,14 @@ class Taoyao {
// await producer.enableTraceEvent([ 'pli', 'fir', 'rtp', 'nack', 'keyframe' ]);
// producer.observer.on("trace", fn(trace));
// producer.on("trace", (trace) => {
// console.debug("生产者跟踪", producer.id, trace);
// console.debug("生产者跟踪事件trace", producer.id, streamId, trace);
// });
message.body = {
kind : kind,
roomId : roomId,
producerId: producer.id
};
this.push(message);
me.push(message);
if (producer.kind === "audio") {
room.audioLevelObserver
.addProducer({ producerId: producer.id })
@@ -944,23 +945,22 @@ class Taoyao {
}
}
// TODO:continue
/**
* 关闭生产者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
* @param {*} body 消息主体
*/
async mediaProducerClose(message, body) {
const me = this;
const { roomId, producerId } = body;
const room = this.rooms.get(roomId);
const room = me.rooms.get(roomId);
const producer = room?.producers.get(producerId);
if(producer) {
console.info("关闭生产者", producerId);
console.info("关闭生产者", producerId);
await producer.close();
} else {
console.info("关闭生产者无效", producerId);
console.debug("关闭生产者无效", producerId);
}
}
@@ -968,17 +968,18 @@ class Taoyao {
* 暂停生产者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
* @param {*} body 消息主体
*/
async mediaProducerPause(message, body) {
const me = this;
const { roomId, producerId } = body;
const room = this.rooms.get(roomId);
const producer = room.producers.get(producerId);
const room = me.rooms.get(roomId);
const producer = room?.producers.get(producerId);
if(producer) {
console.info("暂停生产者", producerId);
console.debug("暂停生产者", producerId);
await producer.pause();
} else {
console.info("暂停生产者无效", producerId);
console.debug("暂停生产者无效(无效)", producerId);
}
}
@@ -986,20 +987,27 @@ class Taoyao {
* 恢复生产者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
* @param {*} body 消息主体
*/
async mediaProducerResume(message, body) {
const me = this;
const { roomId, producerId } = body;
const room = this.rooms.get(roomId);
const producer = room.producers.get(producerId);
const room = me.rooms.get(roomId);
const producer = room?.producers.get(producerId);
if(producer) {
console.info("恢复生产者", producerId);
console.debug("恢复生产者", producerId);
await producer.resume();
} else {
console.info("恢复生产者无效", producerId);
console.debug("恢复生产者无效", producerId);
}
}
/**
* 消费媒体信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaConsume(message, body) {
const {
roomId,
@@ -1011,63 +1019,50 @@ class Taoyao {
appData,
rtpCapabilities,
} = body;
const room = this.rooms.get(roomId);
const producer = room?.producers.get(producerId);
const me = this;
const room = me.rooms.get(roomId);
const producer = room?.producers.get(producerId);
const transport = room?.transports.get(transportId);
if (
!room ||
!producer ||
!transport ||
!room ||
!producer ||
!transport ||
!rtpCapabilities ||
!room.mediasoupRouter.canConsume({
producerId: producerId,
producerId : producerId,
rtpCapabilities: rtpCapabilities,
})
) {
console.warn(
"不能消费媒体:",
roomId,
clientId,
producerId,
transportId,
rtpCapabilities
);
console.warn("不能消费媒体", body);
return;
}
const promises = [];
const consumerCount = 1 + room.consumerReplicas;
const promises = [];
const consumerCount = room.consumerReplicas + 1;
for (let i = 0; i < consumerCount; i++) {
promises.push(
(async () => {
let consumer;
try {
consumer = await transport.consume({
producerId: producerId,
// 默认暂停
paused : true,
producerId : producerId,
rtpCapabilities: rtpCapabilities,
// 暂停
paused: true,
});
} catch (error) {
console.error(
"创建消费者异常:",
roomId,
clientId,
producerId,
transportId,
rtpCapabilities,
error
);
console.error("创建消费者异常", body, error);
return;
}
consumer.clientId = clientId;
consumer.streamId = streamId;
room.consumers.set(consumer.id, consumer);
console.debug("创建消费者", consumer.id, streamId);
consumer.on("transportclose", () => {
console.info("consumer transportclose", consumer.id);
console.info("消费者关闭(通道关闭)", consumer.id, streamId);
consumer.close();
});
consumer.on("producerclose", () => {
console.info("consumer producerclose", consumer.id);
console.info("消费者关闭(生产者关闭)", consumer.id, streamId);
consumer.close();
});
consumer.on("producerpause", () => {
@@ -1075,114 +1070,98 @@ class Taoyao {
if(consumer.localPaused) {
return;
}
console.info("consumer producerpause", consumer.id);
console.debug("消费者暂停(生产者暂停)", consumer.id, streamId);
consumer.pause();
this.push(
protocol.buildMessage("media::consumer::pause", {
roomId: roomId,
consumerId: consumer.id,
})
);
});
consumer.on("producerresume", () => {
// 本地暂停不要操作
if(consumer.localPaused) {
return;
}
console.info("consumer producerresume", consumer.id);
console.debug("消费者恢复(生产者恢复)", consumer.id, streamId);
consumer.resume();
this.push(
protocol.buildMessage("media::consumer::resume", {
roomId: roomId,
consumerId: consumer.id,
})
);
});
// consumer.observer.on("score", fn(score));
consumer.on("score", (score) => {
console.info("consumer score", consumer.id, score);
this.push(
console.debug("消费者评分", consumer.id, streamId, score);
me.push(
protocol.buildMessage("media::consumer::score", {
score: score,
roomId: roomId,
score : score,
roomId : roomId,
consumerId: consumer.id,
})
);
});
// consumer.observer.on("layerschange", fn(layers));
consumer.on("layerschange", (layers) => {
console.info("consumer layerschange", consumer.id, layers);
this.push(
console.debug("消费者空间层和时间层改变", consumer.id, streamId, layers);
me.push(
protocol.buildMessage("media::consumer::layers::change", {
roomId: roomId,
roomId : roomId,
consumerId: consumer.id,
spatialLayer: layers ? layers.spatialLayer : null,
spatialLayer : layers ? layers.spatialLayer : null,
temporalLayer: layers ? layers.temporalLayer : null,
})
);
});
// await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
// consumer.on("trace", (trace) => {
// console.info("consumer trace", consumer.id, trace);
// });
// consumer.on("rtp", (rtpPacket) => {
// console.info("consumer rtp", consumer.id, rtpPacket);
// });
consumer.observer.on("close", () => {
if(room.consumers.delete(consumer.id)) {
console.info("consumer close", consumer.id);
this.push(
console.info("消费者关闭", consumer.id, streamId);
me.push(
protocol.buildMessage("media::consumer::close", {
roomId: roomId,
roomId : roomId,
consumerId: consumer.id
})
);
} else {
console.debug("consumer close non", consumer.id);
console.debug("消费者关闭(无效)", consumer.id, streamId);
}
});
consumer.observer.on("pause", () => {
console.info("consumer pause", consumer.id);
this.push(
console.debug("消费者暂停", consumer.id, streamId);
me.push(
protocol.buildMessage("media::consumer::pause", {
roomId: roomId,
roomId : roomId,
consumerId: consumer.id
})
);
});
consumer.observer.on("resume", () => {
console.info("consumer resume", consumer.id);
this.push(
console.debug("消费者恢复", consumer.id, streamId);
me.push(
protocol.buildMessage("media::consumer::resume", {
roomId: roomId,
roomId : roomId,
consumerId: consumer.id
})
);
});
// consumer.observer.on("score", fn(score));
// consumer.observer.on("layerschange", fn(layers));
// await consumer.enableTraceEvent([ 'pli', 'fir', 'rtp', 'nack', 'keyframe' ]);
// consumer.observer.on("trace", fn(trace));
// 等待终端准备就绪
await this.request(
// this.push(
// consumer.on("trace", (trace) => {
// console.info("消费者跟踪事件trace", consumer.id, streamId, trace);
// });
// 等待终端准备就绪可以不用等待直接使用push方法
await me.request(
protocol.buildMessage("media::consume", {
kind: consumer.kind,
type: consumer.type,
roomId: roomId,
appData: producer.appData,
clientId: clientId,
sourceId: sourceId,
streamId: streamId,
producerId: producerId,
consumerId: consumer.id,
rtpParameters: consumer.rtpParameters,
kind : consumer.kind,
type : consumer.type,
roomId : roomId,
clientId : clientId,
sourceId : sourceId,
streamId : streamId,
producerId : producerId,
consumerId : consumer.id,
appData : producer.appData,
rtpParameters : consumer.rtpParameters,
producerPaused: consumer.producerPaused,
})
);
await consumer.resume();
consumer.localPaused = false;
this.push(
me.push(
protocol.buildMessage("media::consumer::score", {
score: consumer.score,
roomId: roomId,
score : consumer.score,
roomId : roomId,
consumerId: consumer.id,
})
);
@@ -1192,7 +1171,7 @@ class Taoyao {
try {
await Promise.all(promises);
} catch (error) {
console.warn("_createConsumer() | failed:%o", error);
console.error("消费媒体异常", error);
}
}
@@ -1200,17 +1179,18 @@ class Taoyao {
* 关闭消费者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
* @param {*} body 消息主体
*/
async mediaConsumerClose(message, body) {
const me = this;
const { roomId, consumerId } = body;
const room = this.rooms.get(roomId);
const room = me.rooms.get(roomId);
const consumer = room?.consumers.get(consumerId);
if(consumer) {
console.info("关闭消费者", consumerId);
console.info("关闭消费者", consumerId);
await consumer.close();
} else {
console.info("关闭消费者无效", consumerId);
console.debug("关闭消费者无效(无效)", consumerId);
}
}
@@ -1218,18 +1198,19 @@ class Taoyao {
* 暂停消费者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
* @param {*} body 消息主体
*/
async mediaConsumerPause(message, body) {
const me = this;
const { roomId, consumerId } = body;
const room = this.rooms.get(roomId);
const room = me.rooms.get(roomId);
const consumer = room?.consumers.get(consumerId);
if(consumer) {
consumer.localPaused = true;
console.info("暂停消费者", consumerId);
console.debug("暂停消费者", consumerId);
await consumer.pause();
} else {
console.info("暂停消费者无效", consumerId);
console.debug("暂停消费者无效", consumerId);
}
}
@@ -1237,20 +1218,19 @@ class Taoyao {
* 请求关键帧信令
*
* @param {*} message 消息
* @param {*} body 消息主体
* @param {*} body 消息主体
*/
async mediaConsumerRequestKeyFrame(message, body) {
const me = this;
const { roomId, consumerId } = body;
const room = this.rooms.get(roomId);
const room = me.rooms.get(roomId);
const consumer = room?.consumers.get(consumerId);
if(consumer) {
console.info("mediaConsumerRequestKeyFrame", consumerId);
// 处理trace监听读取关键帧
console.debug("请求关键帧", consumerId);
// 通过trace事件监听关键帧的信息
await consumer.requestKeyFrame();
me.push(message);
} else {
console.info("mediaConsumerRequestKeyFrame non", consumerId);
console.debug("请求关键帧(无效)", consumerId);
}
}
@@ -1258,18 +1238,19 @@ class Taoyao {
* 恢复消费者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
* @param {*} body 消息主体
*/
async mediaConsumerResume(message, body) {
const me = this;
const { roomId, consumerId } = body;
const room = this.rooms.get(roomId);
const consumer = room.consumers.get(consumerId);
const room = me.rooms.get(roomId);
const consumer = room?.consumers.get(consumerId);
if(consumer) {
consumer.localPaused = false;
console.info("恢复消费者", consumerId);
console.debug("恢复消费者", consumerId);
await consumer.resume();
} else {
console.info("恢复消费者无效", consumerId);
console.debug("恢复消费者无效", consumerId);
}
}
@@ -1277,7 +1258,7 @@ class Taoyao {
* 修改最佳空间层和时间层信令
*
* @param {*} message 消息
* @param {*} body 消息主体
* @param {*} body 消息主体
*/
async mediaConsumerSetPreferredLayers(message, body) {
const me = this;
@@ -1287,17 +1268,18 @@ class Taoyao {
spatialLayer,
temporalLayer
} = body;
const room = this.rooms.get(roomId);
const room = me.rooms.get(roomId);
const consumer = room?.consumers.get(consumerId);
if(consumer) {
console.info("mediaConsumerSetPreferredLayers", consumerId);
console.debug("修改最佳空间层和时间层", consumerId);
await consumer.setPreferredLayers({ spatialLayer, temporalLayer });
me.push(message);
} else {
console.info("mediaConsumerSetPreferredLayers non", consumerId);
console.debug("修改最佳空间层和时间层(无效)", consumerId);
}
}
// TODOcontinue
/**
* 消费数据信令
*
@@ -1641,14 +1623,9 @@ class Taoyao {
console.info("transport listenserverclose", transport.id);
transport.close();
});
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
// await transport.enableTraceEvent([ 'bwe', 'probation' ]);
// transport.on("trace", (trace) => {
// // 网络评估
// if (trace.type === "bwe" && trace.direction === "out") {
// logger.debug("transport downlinkBwe", trace);
// } else {
// console.debug("transport trace", transport.id, trace);
// }
// console.debug("通道跟踪事件trace", transport.id, trace);
// });
transport.observer.on("close", () => {
if(room.transports.delete(transport.id)) {