diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index dc32cbe..904b39a 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -471,7 +471,7 @@ class Taoyao { me.mediaRouterRtpCapabilities(message, body); break; case "media::transport::close": - this.mediaTransportClose(message, body); + me.mediaTransportClose(message, body); break; case "media::transport::plain": me.mediaTransportPlain(message, body); @@ -539,54 +539,97 @@ class Taoyao { }); } - // TODO:continue - /** * 打印日志 */ async usage() { - for (const worker of this.mediasoupWorkers) { + const me = this; + for (const worker of me.mediasoupWorkers) { const usage = await worker.getResourceUsage(); - console.info("Worker使用情况:", worker.pid, usage); + console.info("工作线程使用情况", worker.pid, usage); } - console.info("工作线程数量:", this.mediasoupWorkers.length); - console.info("房间数量:", this.rooms.size); - Array.from(this.rooms.values()).forEach((room) => room.usage()); + console.info("工作线程数量", me.mediasoupWorkers.length); + console.info("现存房间数量", me.rooms.size); + Array.from(me.rooms.values()).forEach((room) => room.usage()); } /** - * @returns 下个Meidasoup Worker + * @returns 下个工作线程 */ nextMediasoupWorker() { - const worker = this.mediasoupWorkers[this.nextMediasoupWorkerIndex]; - if (++this.nextMediasoupWorkerIndex === this.mediasoupWorkers.length) { - this.nextMediasoupWorkerIndex = 0; - } + const me = this; + const worker = me.mediasoupWorkers[me.nextMediasoupWorkerIndex]; + me.nextMediasoupWorkerIndex = ++me.nextMediasoupWorkerIndex % me.mediasoupWorkers.length; return worker; } + /** + * 关闭所有房间 + */ closeAllRoom() { - console.info("关闭所有房间:", this.rooms.size); - this.rooms.forEach((room, roomId) => { - room.closeAll(); - }); + console.info("关闭所有房间", this.rooms.size); + this.rooms.forEach((room, roomId) => room.closeAll()); this.rooms.clear(); } + /** + * 校验房间 + * + * @param {*} room 房间 + */ + checkRoom(room) { + if(!room) { + throw new Error("无效房间"); + } + } + + /** + * 校验通道 + * + * @param {*} transport 通道 + */ + checkTransport(transport) { + if(!transport) { + throw new Error("无效通道"); + } + } + + /** + * 校验生产者 + * + * @param {*} producer 生产者 + */ + checkProducer(producer) { + if(!producer) { + throw new Error("无效生产者"); + } + } + + /** + * 校验消费者 + * + * @param {*} consumer 消费者 + */ + checkConsumer(consumer) { + if(!consumer) { + throw new Error("无效消费者"); + } + } + /** * 重启终端信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ clientReboot(message, body) { + const { clientId } = config.signal; process.exec( - `pm2 restart ${config.signal.clientId}`, - function (error, stdout, stderr) { - console.info("重启媒体服务:", error, stdout, stderr); + `pm2 restart ${clientId}`, + (error, stdout, stderr) => { + console.info("重启媒体服务", clientId, error, stdout, stderr); } ); - // this.push(message); } /** @@ -597,38 +640,217 @@ class Taoyao { */ clientRegister(message, body) { protocol.clientIndex = body.index; - console.debug("终端序号", protocol.clientIndex); + console.debug("终端注册成功", protocol.clientIndex); } /** * 关闭终端信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ clientShutdown(message, body) { + const { clientId } = config.signal; process.exec( - `pm2 stop ${config.signal.clientId}`, - function (error, stdout, stderr) { - console.info("关闭媒体服务:", error, stdout, stderr); + `pm2 stop ${clientId}`, + (error, stdout, stderr) => { + console.info("关闭媒体服务", clientId, error, stdout, stderr); } ); - // this.push(message); } /** - * 媒体重启ICE信令 + * 服务端录像信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async controlServerRecord(message, body) { + const me = this; + const { enabled, roomId } = body; + const room = me.rooms.get(roomId); + me.checkRoom(room); + if(enabled) { + await me.controlServerRecordStart(message, body, room); + } else { + await me.controlServerRecordStop(message, body, room); + } + } + + // TODO:continue + + async controlServerRecordStart(message, body, room) { + const me = this; + const { + roomId, clientId, host, filepath, audioPort, audioRtcpPort, videoPort, videoRtcpPort, + rtpCapabilities, audioStreamId, videoStreamId, audioProducerId, videoProducerId + } = body; + const plainTransportOptions = { + ...config.mediasoup.plainTransportOptions, + rtcpMux: false, + comedia: false + }; + let audioConsumer; + let videoConsumer; + let audioConsumerId; + let videoConsumerId; + let audioTransportId; + let videoTransportId; + let audioRtpParameters; + let videoRtpParameters; + if(audioProducerId) { + const audioTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); + audioTransportId = audioTransport.id; + me.transportEvent("plain", roomId, audioTransport); + audioTransport.clientId = clientId; + room.transports.set(audioTransport.id, audioTransport); + audioTransport.observer.on("close", () => { + console.debug("controlServerRecord audioTransport close:", audioTransport.id); + room.transports.delete(audioTransport.id) + }); + await audioTransport.connect({ + ip : host, + port : audioPort, + rtcpPort: audioRtcpPort + }); + audioConsumer = await audioTransport.consume({ + producerId: audioProducerId, + rtpCapabilities, + paused: true + }); + audioConsumerId = audioConsumer.id; + audioRtpParameters = audioConsumer.rtpParameters; + audioConsumer.clientId = clientId; + audioConsumer.streamId = audioStreamId; + room.consumers.set(audioConsumer.id, audioConsumer); + audioConsumer.observer.on("close", () => { + console.debug("controlServerRecord audioConsumer close:", audioConsumer.id); + room.consumers.delete(audioConsumer.id); + }); + console.debug("controlServerRecord audio", audioTransportId, audioConsumerId, audioTransport.tuple, audioRtpParameters); + } + if(videoProducerId) { + const videoTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); + videoTransportId = videoTransport.id; + me.transportEvent("plain", roomId, videoTransport); + videoTransport.clientId = clientId; + room.transports.set(videoTransport.id, videoTransport); + videoTransport.observer.on("close", () => { + console.debug("controlServerRecord videoTransport close:", videoTransport.id); + room.transports.delete(videoTransport.id) + }); + await videoTransport.connect({ + ip : host, + port : videoPort, + rtcpPort: videoRtcpPort + }); + videoConsumer = await videoTransport.consume({ + producerId: videoProducerId, + rtpCapabilities, + paused: true + }); + videoConsumerId = videoConsumer.id; + videoRtpParameters = videoConsumer.rtpParameters; + videoConsumer.clientId = clientId; + videoConsumer.streamId = videoStreamId; + room.consumers.set(videoConsumer.id, videoConsumer); + videoConsumer.observer.on("close", () => { + console.debug("controlServerRecord videoConsumer close:", videoConsumer.id); + room.consumers.delete(videoConsumer.id); + }); + console.debug("controlServerRecord video:", videoTransportId, videoConsumerId, videoTransport.tuple, videoRtpParameters); + } + if(audioConsumer) { + await audioConsumer.resume(); + } + if(videoConsumer) { + await videoConsumer.resume(); + } + this.requestKeyFrameForRecord(0, filepath, videoConsumer); + message.body = { + roomId : roomId, + audioConsumerId : audioConsumerId, + videoConsumerId : videoConsumerId, + audioTransportId : audioTransportId, + videoTransportId : videoTransportId, + audioRtpParameters: audioRtpParameters, + videoRtpParameters: videoRtpParameters, + }; + me.push(message); + } + + /** + * 请求录像关键帧 + * 视频录像需要通过关键帧解析视频信息,关键帧数据太慢会丢弃视频数据包,导致录像文件只有音频没有视频。 + * + * @param {*} index 重试次数 + * @param {*} filepath 文件路径 + * @param {*} videoConsumer 视频消费者 + */ + requestKeyFrameForRecord(index, filepath, videoConsumer) { + if(!filepath || !videoConsumer) { + return; + } + if(++index > config.record.requestKeyFrameMaxIndex) { + console.warn("请求录像关键帧次数超限", filepath, index); + return; + } + if(videoConsumer.closed) { + console.warn("请求录像关键帧视频关闭", filepath); + return; + } + // 判断文件大小验证是否已经开始录像:创建文件 -> 视频信息 -> 视频数据 -> 封装视频 + if(fs.existsSync(filepath) && fs.statSync(filepath).size >= config.record.requestKeyFrameFileSize) { + console.debug("请求录像关键帧已经开始录像", filepath); + return; + } + console.debug("请求录像关键帧", filepath); + videoConsumer.requestKeyFrame(); + setTimeout(() => { + this.requestKeyFrameForRecord(index, filepath, videoConsumer); + }, 1000); + } + + async controlServerRecordStop(message, body, room) { + const me = this; + const { audioStreamId, videoStreamId, audioConsumerId, videoConsumerId, audioTransportId, videoTransportId } = body; + const audioConsumer = room.consumers.get(audioConsumerId); + if(audioConsumer) { + audioConsumer.close(); + room.consumers.delete(audioConsumerId); + } + const videoConsumer = room.consumers.get(videoConsumerId); + if(videoConsumer) { + videoConsumer.close(); + room.consumers.delete(videoConsumerId); + } + const audioTransport = room.transports.get(audioTransportId); + if(audioTransport) { + audioTransport.close(); + room.transports.delete(audioTransportId); + } + const videoTransport = room.transports.get(videoTransportId); + if(videoTransport) { + videoTransport.close(); + room.transports.delete(videoTransportId); + } + me.push(message); + } + + /** + * 重启ICE信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaIceRestart(message, body) { + const me = this; const { roomId, transportId } = body; - const room = this.rooms.get(roomId); - const transport = room.transports.get(transportId); + const room = me.rooms.get(roomId); + const transport = room.transports.get(transportId); const iceParameters = await transport.restartIce(); message.body.iceParameters = iceParameters; - this.push(message); + me.push(message); } async mediaProduce(message, body) { @@ -792,181 +1014,6 @@ class Taoyao { } } - /** - * 媒体录像 - * - * @param {*} message 消息 - * @param {*} body 消息主体 - */ - async controlServerRecord(message, body) { - const me = this; - const { enabled, roomId } = body; - const room = this.rooms.get(roomId); - if(enabled) { - await me.controlServerRecordStart(message, body, room); - } else { - await me.controlServerRecordStop(message, body, room); - } - } - - async controlServerRecordStart(message, body, room) { - const me = this; - const { - roomId, clientId, host, filepath, audioPort, audioRtcpPort, videoPort, videoRtcpPort, - rtpCapabilities, audioStreamId, videoStreamId, audioProducerId, videoProducerId - } = body; - const plainTransportOptions = { - ...config.mediasoup.plainTransportOptions, - rtcpMux: false, - comedia: false - }; - let audioConsumer; - let videoConsumer; - let audioConsumerId; - let videoConsumerId; - let audioTransportId; - let videoTransportId; - let audioRtpParameters; - let videoRtpParameters; - if(audioProducerId) { - const audioTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); - audioTransportId = audioTransport.id; - me.transportEvent("plain", roomId, audioTransport); - audioTransport.clientId = clientId; - room.transports.set(audioTransport.id, audioTransport); - audioTransport.observer.on("close", () => { - console.debug("controlServerRecord audioTransport close:", audioTransport.id); - room.transports.delete(audioTransport.id) - }); - await audioTransport.connect({ - ip : host, - port : audioPort, - rtcpPort: audioRtcpPort - }); - audioConsumer = await audioTransport.consume({ - producerId: audioProducerId, - rtpCapabilities, - paused: true - }); - audioConsumerId = audioConsumer.id; - audioRtpParameters = audioConsumer.rtpParameters; - audioConsumer.clientId = clientId; - audioConsumer.streamId = audioStreamId; - room.consumers.set(audioConsumer.id, audioConsumer); - audioConsumer.observer.on("close", () => { - console.debug("controlServerRecord audioConsumer close:", audioConsumer.id); - room.consumers.delete(audioConsumer.id); - }); - console.debug("controlServerRecord audio", audioTransportId, audioConsumerId, audioTransport.tuple, audioRtpParameters); - } - if(videoProducerId) { - const videoTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); - videoTransportId = videoTransport.id; - me.transportEvent("plain", roomId, videoTransport); - videoTransport.clientId = clientId; - room.transports.set(videoTransport.id, videoTransport); - videoTransport.observer.on("close", () => { - console.debug("controlServerRecord videoTransport close:", videoTransport.id); - room.transports.delete(videoTransport.id) - }); - await videoTransport.connect({ - ip : host, - port : videoPort, - rtcpPort: videoRtcpPort - }); - videoConsumer = await videoTransport.consume({ - producerId: videoProducerId, - rtpCapabilities, - paused: true - }); - videoConsumerId = videoConsumer.id; - videoRtpParameters = videoConsumer.rtpParameters; - videoConsumer.clientId = clientId; - videoConsumer.streamId = videoStreamId; - room.consumers.set(videoConsumer.id, videoConsumer); - videoConsumer.observer.on("close", () => { - console.debug("controlServerRecord videoConsumer close:", videoConsumer.id); - room.consumers.delete(videoConsumer.id); - }); - console.debug("controlServerRecord video:", videoTransportId, videoConsumerId, videoTransport.tuple, videoRtpParameters); - } - if(audioConsumer) { - await audioConsumer.resume(); - } - if(videoConsumer) { - await videoConsumer.resume(); - } - this.requestKeyFrameForRecord(0, filepath, videoConsumer); - message.body = { - roomId : roomId, - audioConsumerId : audioConsumerId, - videoConsumerId : videoConsumerId, - audioTransportId : audioTransportId, - videoTransportId : videoTransportId, - audioRtpParameters: audioRtpParameters, - videoRtpParameters: videoRtpParameters, - }; - me.push(message); - } - - /** - * 请求录像关键帧 - * 视频录像需要通过关键帧解析视频信息,关键帧数据太慢会丢弃视频数据包,导致录像文件只有音频没有视频。 - * - * @param {*} index 重试次数 - * @param {*} filepath 文件路径 - * @param {*} videoConsumer 视频消费者 - */ - requestKeyFrameForRecord(index, filepath, videoConsumer) { - if(!filepath || !videoConsumer) { - return; - } - if(++index > config.record.requestKeyFrameMaxIndex) { - console.warn("请求录像关键帧次数超限", filepath, index); - return; - } - if(videoConsumer.closed) { - console.warn("请求录像关键帧视频关闭", filepath); - return; - } - // 判断文件大小验证是否已经开始录像:创建文件 -> 视频信息 -> 视频数据 -> 封装视频 - if(fs.existsSync(filepath) && fs.statSync(filepath).size >= config.record.requestKeyFrameFileSize) { - console.debug("请求录像关键帧已经开始录像", filepath); - return; - } - console.debug("请求录像关键帧", filepath); - videoConsumer.requestKeyFrame(); - setTimeout(() => { - this.requestKeyFrameForRecord(index, filepath, videoConsumer); - }, 1000); - } - - async controlServerRecordStop(message, body, room) { - const me = this; - const { audioStreamId, videoStreamId, audioConsumerId, videoConsumerId, audioTransportId, videoTransportId } = body; - const audioConsumer = room.consumers.get(audioConsumerId); - if(audioConsumer) { - audioConsumer.close(); - room.consumers.delete(audioConsumerId); - } - const videoConsumer = room.consumers.get(videoConsumerId); - if(videoConsumer) { - videoConsumer.close(); - room.consumers.delete(videoConsumerId); - } - const audioTransport = room.transports.get(audioTransportId); - if(audioTransport) { - audioTransport.close(); - room.transports.delete(audioTransportId); - } - const videoTransport = room.transports.get(videoTransportId); - if(videoTransport) { - videoTransport.close(); - room.transports.delete(videoTransportId); - } - me.push(message); - } - async mediaConsume(message, body) { const { roomId,