[*] 日常优化

This commit is contained in:
acgist
2023-06-28 08:22:15 +08:00
parent 8245d3446a
commit 989ecefeac

View File

@@ -471,7 +471,7 @@ class Taoyao {
me.mediaRouterRtpCapabilities(message, body); me.mediaRouterRtpCapabilities(message, body);
break; break;
case "media::transport::close": case "media::transport::close":
this.mediaTransportClose(message, body); me.mediaTransportClose(message, body);
break; break;
case "media::transport::plain": case "media::transport::plain":
me.mediaTransportPlain(message, body); me.mediaTransportPlain(message, body);
@@ -539,54 +539,97 @@ class Taoyao {
}); });
} }
// TODO:continue
/** /**
* 打印日志 * 打印日志
*/ */
async usage() { async usage() {
for (const worker of this.mediasoupWorkers) { const me = this;
for (const worker of me.mediasoupWorkers) {
const usage = await worker.getResourceUsage(); const usage = await worker.getResourceUsage();
console.info("Worker使用情况", worker.pid, usage); console.info("工作线程使用情况", worker.pid, usage);
} }
console.info("工作线程数量", this.mediasoupWorkers.length); console.info("工作线程数量", me.mediasoupWorkers.length);
console.info("房间数量", this.rooms.size); console.info("现存房间数量", me.rooms.size);
Array.from(this.rooms.values()).forEach((room) => room.usage()); Array.from(me.rooms.values()).forEach((room) => room.usage());
} }
/** /**
* @returns 下个Meidasoup Worker * @returns 下个工作线程
*/ */
nextMediasoupWorker() { nextMediasoupWorker() {
const worker = this.mediasoupWorkers[this.nextMediasoupWorkerIndex]; const me = this;
if (++this.nextMediasoupWorkerIndex === this.mediasoupWorkers.length) { const worker = me.mediasoupWorkers[me.nextMediasoupWorkerIndex];
this.nextMediasoupWorkerIndex = 0; me.nextMediasoupWorkerIndex = ++me.nextMediasoupWorkerIndex % me.mediasoupWorkers.length;
}
return worker; return worker;
} }
/**
* 关闭所有房间
*/
closeAllRoom() { closeAllRoom() {
console.info("关闭所有房间", this.rooms.size); console.info("关闭所有房间", this.rooms.size);
this.rooms.forEach((room, roomId) => { this.rooms.forEach((room, roomId) => room.closeAll());
room.closeAll();
});
this.rooms.clear(); this.rooms.clear();
} }
/**
* 校验房间
*
* @param {*} room 房间
*/
checkRoom(room) {
if(!room) {
throw new Error("无效房间");
}
}
/**
* 校验通道
*
* @param {*} transport 通道
*/
checkTransport(transport) {
if(!transport) {
throw new Error("无效通道");
}
}
/**
* 校验生产者
*
* @param {*} producer 生产者
*/
checkProducer(producer) {
if(!producer) {
throw new Error("无效生产者");
}
}
/**
* 校验消费者
*
* @param {*} consumer 消费者
*/
checkConsumer(consumer) {
if(!consumer) {
throw new Error("无效消费者");
}
}
/** /**
* 重启终端信令 * 重启终端信令
* *
* @param {*} message 消息 * @param {*} message 消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
clientReboot(message, body) { clientReboot(message, body) {
const { clientId } = config.signal;
process.exec( process.exec(
`pm2 restart ${config.signal.clientId}`, `pm2 restart ${clientId}`,
function (error, stdout, stderr) { (error, stdout, stderr) => {
console.info("重启媒体服务", error, stdout, stderr); console.info("重启媒体服务", clientId, error, stdout, stderr);
} }
); );
// this.push(message);
} }
/** /**
@@ -597,38 +640,217 @@ class Taoyao {
*/ */
clientRegister(message, body) { clientRegister(message, body) {
protocol.clientIndex = body.index; protocol.clientIndex = body.index;
console.debug("终端序号", protocol.clientIndex); console.debug("终端注册成功", protocol.clientIndex);
} }
/** /**
* 关闭终端信令 * 关闭终端信令
* *
* @param {*} message 消息 * @param {*} message 消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
clientShutdown(message, body) { clientShutdown(message, body) {
const { clientId } = config.signal;
process.exec( process.exec(
`pm2 stop ${config.signal.clientId}`, `pm2 stop ${clientId}`,
function (error, stdout, stderr) { (error, stdout, stderr) => {
console.info("关闭媒体服务", error, stdout, stderr); console.info("关闭媒体服务", clientId, error, stdout, stderr);
} }
); );
// this.push(message);
} }
/** /**
* 媒体重启ICE信令 * 服务端录像信令
* *
* @param {*} message 消息 * @param {*} message 消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/
async controlServerRecord(message, body) {
const me = this;
const { enabled, roomId } = body;
const room = me.rooms.get(roomId);
me.checkRoom(room);
if(enabled) {
await me.controlServerRecordStart(message, body, room);
} else {
await me.controlServerRecordStop(message, body, room);
}
}
// TODO:continue
async controlServerRecordStart(message, body, room) {
const me = this;
const {
roomId, clientId, host, filepath, audioPort, audioRtcpPort, videoPort, videoRtcpPort,
rtpCapabilities, audioStreamId, videoStreamId, audioProducerId, videoProducerId
} = body;
const plainTransportOptions = {
...config.mediasoup.plainTransportOptions,
rtcpMux: false,
comedia: false
};
let audioConsumer;
let videoConsumer;
let audioConsumerId;
let videoConsumerId;
let audioTransportId;
let videoTransportId;
let audioRtpParameters;
let videoRtpParameters;
if(audioProducerId) {
const audioTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions);
audioTransportId = audioTransport.id;
me.transportEvent("plain", roomId, audioTransport);
audioTransport.clientId = clientId;
room.transports.set(audioTransport.id, audioTransport);
audioTransport.observer.on("close", () => {
console.debug("controlServerRecord audioTransport close", audioTransport.id);
room.transports.delete(audioTransport.id)
});
await audioTransport.connect({
ip : host,
port : audioPort,
rtcpPort: audioRtcpPort
});
audioConsumer = await audioTransport.consume({
producerId: audioProducerId,
rtpCapabilities,
paused: true
});
audioConsumerId = audioConsumer.id;
audioRtpParameters = audioConsumer.rtpParameters;
audioConsumer.clientId = clientId;
audioConsumer.streamId = audioStreamId;
room.consumers.set(audioConsumer.id, audioConsumer);
audioConsumer.observer.on("close", () => {
console.debug("controlServerRecord audioConsumer close", audioConsumer.id);
room.consumers.delete(audioConsumer.id);
});
console.debug("controlServerRecord audio", audioTransportId, audioConsumerId, audioTransport.tuple, audioRtpParameters);
}
if(videoProducerId) {
const videoTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions);
videoTransportId = videoTransport.id;
me.transportEvent("plain", roomId, videoTransport);
videoTransport.clientId = clientId;
room.transports.set(videoTransport.id, videoTransport);
videoTransport.observer.on("close", () => {
console.debug("controlServerRecord videoTransport close", videoTransport.id);
room.transports.delete(videoTransport.id)
});
await videoTransport.connect({
ip : host,
port : videoPort,
rtcpPort: videoRtcpPort
});
videoConsumer = await videoTransport.consume({
producerId: videoProducerId,
rtpCapabilities,
paused: true
});
videoConsumerId = videoConsumer.id;
videoRtpParameters = videoConsumer.rtpParameters;
videoConsumer.clientId = clientId;
videoConsumer.streamId = videoStreamId;
room.consumers.set(videoConsumer.id, videoConsumer);
videoConsumer.observer.on("close", () => {
console.debug("controlServerRecord videoConsumer close", videoConsumer.id);
room.consumers.delete(videoConsumer.id);
});
console.debug("controlServerRecord video", videoTransportId, videoConsumerId, videoTransport.tuple, videoRtpParameters);
}
if(audioConsumer) {
await audioConsumer.resume();
}
if(videoConsumer) {
await videoConsumer.resume();
}
this.requestKeyFrameForRecord(0, filepath, videoConsumer);
message.body = {
roomId : roomId,
audioConsumerId : audioConsumerId,
videoConsumerId : videoConsumerId,
audioTransportId : audioTransportId,
videoTransportId : videoTransportId,
audioRtpParameters: audioRtpParameters,
videoRtpParameters: videoRtpParameters,
};
me.push(message);
}
/**
* 请求录像关键帧
* 视频录像需要通过关键帧解析视频信息,关键帧数据太慢会丢弃视频数据包,导致录像文件只有音频没有视频。
*
* @param {*} index 重试次数
* @param {*} filepath 文件路径
* @param {*} videoConsumer 视频消费者
*/
requestKeyFrameForRecord(index, filepath, videoConsumer) {
if(!filepath || !videoConsumer) {
return;
}
if(++index > config.record.requestKeyFrameMaxIndex) {
console.warn("请求录像关键帧次数超限", filepath, index);
return;
}
if(videoConsumer.closed) {
console.warn("请求录像关键帧视频关闭", filepath);
return;
}
// 判断文件大小验证是否已经开始录像:创建文件 -> 视频信息 -> 视频数据 -> 封装视频
if(fs.existsSync(filepath) && fs.statSync(filepath).size >= config.record.requestKeyFrameFileSize) {
console.debug("请求录像关键帧已经开始录像", filepath);
return;
}
console.debug("请求录像关键帧", filepath);
videoConsumer.requestKeyFrame();
setTimeout(() => {
this.requestKeyFrameForRecord(index, filepath, videoConsumer);
}, 1000);
}
async controlServerRecordStop(message, body, room) {
const me = this;
const { audioStreamId, videoStreamId, audioConsumerId, videoConsumerId, audioTransportId, videoTransportId } = body;
const audioConsumer = room.consumers.get(audioConsumerId);
if(audioConsumer) {
audioConsumer.close();
room.consumers.delete(audioConsumerId);
}
const videoConsumer = room.consumers.get(videoConsumerId);
if(videoConsumer) {
videoConsumer.close();
room.consumers.delete(videoConsumerId);
}
const audioTransport = room.transports.get(audioTransportId);
if(audioTransport) {
audioTransport.close();
room.transports.delete(audioTransportId);
}
const videoTransport = room.transports.get(videoTransportId);
if(videoTransport) {
videoTransport.close();
room.transports.delete(videoTransportId);
}
me.push(message);
}
/**
* 重启ICE信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/ */
async mediaIceRestart(message, body) { async mediaIceRestart(message, body) {
const me = this;
const { roomId, transportId } = body; const { roomId, transportId } = body;
const room = this.rooms.get(roomId); const room = me.rooms.get(roomId);
const transport = room.transports.get(transportId); const transport = room.transports.get(transportId);
const iceParameters = await transport.restartIce(); const iceParameters = await transport.restartIce();
message.body.iceParameters = iceParameters; message.body.iceParameters = iceParameters;
this.push(message); me.push(message);
} }
async mediaProduce(message, body) { async mediaProduce(message, body) {
@@ -792,181 +1014,6 @@ class Taoyao {
} }
} }
/**
* 媒体录像
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async controlServerRecord(message, body) {
const me = this;
const { enabled, roomId } = body;
const room = this.rooms.get(roomId);
if(enabled) {
await me.controlServerRecordStart(message, body, room);
} else {
await me.controlServerRecordStop(message, body, room);
}
}
async controlServerRecordStart(message, body, room) {
const me = this;
const {
roomId, clientId, host, filepath, audioPort, audioRtcpPort, videoPort, videoRtcpPort,
rtpCapabilities, audioStreamId, videoStreamId, audioProducerId, videoProducerId
} = body;
const plainTransportOptions = {
...config.mediasoup.plainTransportOptions,
rtcpMux: false,
comedia: false
};
let audioConsumer;
let videoConsumer;
let audioConsumerId;
let videoConsumerId;
let audioTransportId;
let videoTransportId;
let audioRtpParameters;
let videoRtpParameters;
if(audioProducerId) {
const audioTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions);
audioTransportId = audioTransport.id;
me.transportEvent("plain", roomId, audioTransport);
audioTransport.clientId = clientId;
room.transports.set(audioTransport.id, audioTransport);
audioTransport.observer.on("close", () => {
console.debug("controlServerRecord audioTransport close", audioTransport.id);
room.transports.delete(audioTransport.id)
});
await audioTransport.connect({
ip : host,
port : audioPort,
rtcpPort: audioRtcpPort
});
audioConsumer = await audioTransport.consume({
producerId: audioProducerId,
rtpCapabilities,
paused: true
});
audioConsumerId = audioConsumer.id;
audioRtpParameters = audioConsumer.rtpParameters;
audioConsumer.clientId = clientId;
audioConsumer.streamId = audioStreamId;
room.consumers.set(audioConsumer.id, audioConsumer);
audioConsumer.observer.on("close", () => {
console.debug("controlServerRecord audioConsumer close", audioConsumer.id);
room.consumers.delete(audioConsumer.id);
});
console.debug("controlServerRecord audio", audioTransportId, audioConsumerId, audioTransport.tuple, audioRtpParameters);
}
if(videoProducerId) {
const videoTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions);
videoTransportId = videoTransport.id;
me.transportEvent("plain", roomId, videoTransport);
videoTransport.clientId = clientId;
room.transports.set(videoTransport.id, videoTransport);
videoTransport.observer.on("close", () => {
console.debug("controlServerRecord videoTransport close", videoTransport.id);
room.transports.delete(videoTransport.id)
});
await videoTransport.connect({
ip : host,
port : videoPort,
rtcpPort: videoRtcpPort
});
videoConsumer = await videoTransport.consume({
producerId: videoProducerId,
rtpCapabilities,
paused: true
});
videoConsumerId = videoConsumer.id;
videoRtpParameters = videoConsumer.rtpParameters;
videoConsumer.clientId = clientId;
videoConsumer.streamId = videoStreamId;
room.consumers.set(videoConsumer.id, videoConsumer);
videoConsumer.observer.on("close", () => {
console.debug("controlServerRecord videoConsumer close", videoConsumer.id);
room.consumers.delete(videoConsumer.id);
});
console.debug("controlServerRecord video", videoTransportId, videoConsumerId, videoTransport.tuple, videoRtpParameters);
}
if(audioConsumer) {
await audioConsumer.resume();
}
if(videoConsumer) {
await videoConsumer.resume();
}
this.requestKeyFrameForRecord(0, filepath, videoConsumer);
message.body = {
roomId : roomId,
audioConsumerId : audioConsumerId,
videoConsumerId : videoConsumerId,
audioTransportId : audioTransportId,
videoTransportId : videoTransportId,
audioRtpParameters: audioRtpParameters,
videoRtpParameters: videoRtpParameters,
};
me.push(message);
}
/**
* 请求录像关键帧
* 视频录像需要通过关键帧解析视频信息,关键帧数据太慢会丢弃视频数据包,导致录像文件只有音频没有视频。
*
* @param {*} index 重试次数
* @param {*} filepath 文件路径
* @param {*} videoConsumer 视频消费者
*/
requestKeyFrameForRecord(index, filepath, videoConsumer) {
if(!filepath || !videoConsumer) {
return;
}
if(++index > config.record.requestKeyFrameMaxIndex) {
console.warn("请求录像关键帧次数超限", filepath, index);
return;
}
if(videoConsumer.closed) {
console.warn("请求录像关键帧视频关闭", filepath);
return;
}
// 判断文件大小验证是否已经开始录像:创建文件 -> 视频信息 -> 视频数据 -> 封装视频
if(fs.existsSync(filepath) && fs.statSync(filepath).size >= config.record.requestKeyFrameFileSize) {
console.debug("请求录像关键帧已经开始录像", filepath);
return;
}
console.debug("请求录像关键帧", filepath);
videoConsumer.requestKeyFrame();
setTimeout(() => {
this.requestKeyFrameForRecord(index, filepath, videoConsumer);
}, 1000);
}
async controlServerRecordStop(message, body, room) {
const me = this;
const { audioStreamId, videoStreamId, audioConsumerId, videoConsumerId, audioTransportId, videoTransportId } = body;
const audioConsumer = room.consumers.get(audioConsumerId);
if(audioConsumer) {
audioConsumer.close();
room.consumers.delete(audioConsumerId);
}
const videoConsumer = room.consumers.get(videoConsumerId);
if(videoConsumer) {
videoConsumer.close();
room.consumers.delete(videoConsumerId);
}
const audioTransport = room.transports.get(audioTransportId);
if(audioTransport) {
audioTransport.close();
room.transports.delete(audioTransportId);
}
const videoTransport = room.transports.get(videoTransportId);
if(videoTransport) {
videoTransport.close();
room.transports.delete(videoTransportId);
}
me.push(message);
}
async mediaConsume(message, body) { async mediaConsume(message, body) {
const { const {
roomId, roomId,