diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index e68eb9d..4b195ff 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -838,7 +838,6 @@ class Taoyao { * @param {*} body 消息主体 */ async mediaProduce(message, body) { - const me = this; const { kind, roomId, @@ -848,6 +847,7 @@ class Taoyao { appData, rtpParameters } = body; + const me = this; const room = me.rooms.get(roomId); const transport = room?.transports.get(transportId); if(!transport) { @@ -863,6 +863,7 @@ class Taoyao { producer.clientId = clientId; producer.streamId = streamId; room.producers.set(producer.id, producer); + console.debug("创建生产者", producer.id, streamId); producer.on("transportclose", () => { console.info("生产者关闭(通道关闭)", producer.id, streamId); producer.close(); @@ -870,7 +871,7 @@ class Taoyao { producer.observer.on("close", () => { if(room.producers.delete(producer.id)) { console.info("生产者关闭", producer.id, streamId); - this.push( + me.push( protocol.buildMessage("media::producer::close", { roomId : roomId, producerId: producer.id @@ -882,7 +883,7 @@ class Taoyao { }); producer.observer.on("pause", () => { console.debug("生产者暂停", producer.id, streamId); - this.push( + me.push( protocol.buildMessage("media::producer::pause", { roomId : roomId, producerId: producer.id @@ -891,7 +892,7 @@ class Taoyao { }); producer.observer.on("resume", () => { console.debug("生产者恢复", producer.id, streamId); - this.push( + me.push( protocol.buildMessage("media::producer::resume", { roomId : roomId, producerId: producer.id @@ -922,14 +923,14 @@ class Taoyao { // await producer.enableTraceEvent([ 'pli', 'fir', 'rtp', 'nack', 'keyframe' ]); // producer.observer.on("trace", fn(trace)); // producer.on("trace", (trace) => { - // console.debug("生产者跟踪", producer.id, trace); + // console.debug("生产者跟踪事件(trace)", producer.id, streamId, trace); // }); message.body = { kind : kind, roomId : roomId, producerId: producer.id }; - this.push(message); + me.push(message); if (producer.kind === "audio") { room.audioLevelObserver .addProducer({ producerId: producer.id }) @@ -944,23 +945,22 @@ class Taoyao { } } - // TODO:continue - /** * 关闭生产者信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaProducerClose(message, body) { + const me = this; const { roomId, producerId } = body; - const room = this.rooms.get(roomId); + const room = me.rooms.get(roomId); const producer = room?.producers.get(producerId); if(producer) { - console.info("关闭生产者:", producerId); + console.info("关闭生产者", producerId); await producer.close(); } else { - console.info("关闭生产者无效:", producerId); + console.debug("关闭生产者(无效)", producerId); } } @@ -968,17 +968,18 @@ class Taoyao { * 暂停生产者信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaProducerPause(message, body) { + const me = this; const { roomId, producerId } = body; - const room = this.rooms.get(roomId); - const producer = room.producers.get(producerId); + const room = me.rooms.get(roomId); + const producer = room?.producers.get(producerId); if(producer) { - console.info("暂停生产者:", producerId); + console.debug("暂停生产者", producerId); await producer.pause(); } else { - console.info("暂停生产者无效:", producerId); + console.debug("暂停生产者无效(无效)", producerId); } } @@ -986,20 +987,27 @@ class Taoyao { * 恢复生产者信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaProducerResume(message, body) { + const me = this; const { roomId, producerId } = body; - const room = this.rooms.get(roomId); - const producer = room.producers.get(producerId); + const room = me.rooms.get(roomId); + const producer = room?.producers.get(producerId); if(producer) { - console.info("恢复生产者:", producerId); + console.debug("恢复生产者", producerId); await producer.resume(); } else { - console.info("恢复生产者无效:", producerId); + console.debug("恢复生产者(无效)", producerId); } } + /** + * 消费媒体信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ async mediaConsume(message, body) { const { roomId, @@ -1011,63 +1019,50 @@ class Taoyao { appData, rtpCapabilities, } = body; - const room = this.rooms.get(roomId); - const producer = room?.producers.get(producerId); + const me = this; + const room = me.rooms.get(roomId); + const producer = room?.producers.get(producerId); const transport = room?.transports.get(transportId); if ( - !room || - !producer || - !transport || + !room || + !producer || + !transport || !rtpCapabilities || !room.mediasoupRouter.canConsume({ - producerId: producerId, + producerId : producerId, rtpCapabilities: rtpCapabilities, }) ) { - console.warn( - "不能消费媒体:", - roomId, - clientId, - producerId, - transportId, - rtpCapabilities - ); + console.warn("不能消费媒体", body); return; } - const promises = []; - const consumerCount = 1 + room.consumerReplicas; + const promises = []; + const consumerCount = room.consumerReplicas + 1; for (let i = 0; i < consumerCount; i++) { promises.push( (async () => { let consumer; try { consumer = await transport.consume({ - producerId: producerId, + // 默认暂停 + paused : true, + producerId : producerId, rtpCapabilities: rtpCapabilities, - // 暂停 - paused: true, }); } catch (error) { - console.error( - "创建消费者异常:", - roomId, - clientId, - producerId, - transportId, - rtpCapabilities, - error - ); + console.error("创建消费者异常", body, error); return; } consumer.clientId = clientId; consumer.streamId = streamId; room.consumers.set(consumer.id, consumer); + console.debug("创建消费者", consumer.id, streamId); consumer.on("transportclose", () => { - console.info("consumer transportclose:", consumer.id); + console.info("消费者关闭(通道关闭)", consumer.id, streamId); consumer.close(); }); consumer.on("producerclose", () => { - console.info("consumer producerclose:", consumer.id); + console.info("消费者关闭(生产者关闭)", consumer.id, streamId); consumer.close(); }); consumer.on("producerpause", () => { @@ -1075,114 +1070,98 @@ class Taoyao { if(consumer.localPaused) { return; } - console.info("consumer producerpause:", consumer.id); + console.debug("消费者暂停(生产者暂停)", consumer.id, streamId); consumer.pause(); - this.push( - protocol.buildMessage("media::consumer::pause", { - roomId: roomId, - consumerId: consumer.id, - }) - ); }); consumer.on("producerresume", () => { // 本地暂停不要操作 if(consumer.localPaused) { return; } - console.info("consumer producerresume:", consumer.id); + console.debug("消费者恢复(生产者恢复)", consumer.id, streamId); consumer.resume(); - this.push( - protocol.buildMessage("media::consumer::resume", { - roomId: roomId, - consumerId: consumer.id, - }) - ); }); + // consumer.observer.on("score", fn(score)); consumer.on("score", (score) => { - console.info("consumer score:", consumer.id, score); - this.push( + console.debug("消费者评分", consumer.id, streamId, score); + me.push( protocol.buildMessage("media::consumer::score", { - score: score, - roomId: roomId, + score : score, + roomId : roomId, consumerId: consumer.id, }) ); }); + // consumer.observer.on("layerschange", fn(layers)); consumer.on("layerschange", (layers) => { - console.info("consumer layerschange:", consumer.id, layers); - this.push( + console.debug("消费者空间层和时间层改变", consumer.id, streamId, layers); + me.push( protocol.buildMessage("media::consumer::layers::change", { - roomId: roomId, + roomId : roomId, consumerId: consumer.id, - spatialLayer: layers ? layers.spatialLayer : null, + spatialLayer : layers ? layers.spatialLayer : null, temporalLayer: layers ? layers.temporalLayer : null, }) ); }); - // await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]); - // consumer.on("trace", (trace) => { - // console.info("consumer trace:", consumer.id, trace); - // }); - // consumer.on("rtp", (rtpPacket) => { - // console.info("consumer rtp:", consumer.id, rtpPacket); - // }); consumer.observer.on("close", () => { if(room.consumers.delete(consumer.id)) { - console.info("consumer close:", consumer.id); - this.push( + console.info("消费者关闭", consumer.id, streamId); + me.push( protocol.buildMessage("media::consumer::close", { - roomId: roomId, + roomId : roomId, consumerId: consumer.id }) ); } else { - console.debug("consumer close non:", consumer.id); + console.debug("消费者关闭(无效)", consumer.id, streamId); } }); consumer.observer.on("pause", () => { - console.info("consumer pause:", consumer.id); - this.push( + console.debug("消费者暂停", consumer.id, streamId); + me.push( protocol.buildMessage("media::consumer::pause", { - roomId: roomId, + roomId : roomId, consumerId: consumer.id }) ); }); consumer.observer.on("resume", () => { - console.info("consumer resume:", consumer.id); - this.push( + console.debug("消费者恢复", consumer.id, streamId); + me.push( protocol.buildMessage("media::consumer::resume", { - roomId: roomId, + roomId : roomId, consumerId: consumer.id }) ); }); - // consumer.observer.on("score", fn(score)); - // consumer.observer.on("layerschange", fn(layers)); + // await consumer.enableTraceEvent([ 'pli', 'fir', 'rtp', 'nack', 'keyframe' ]); // consumer.observer.on("trace", fn(trace)); - // 等待终端准备就绪 - await this.request( - // this.push( + // consumer.on("trace", (trace) => { + // console.info("消费者跟踪事件(trace)", consumer.id, streamId, trace); + // }); + // 等待终端准备就绪:可以不用等待直接使用push方法 + await me.request( protocol.buildMessage("media::consume", { - kind: consumer.kind, - type: consumer.type, - roomId: roomId, - appData: producer.appData, - clientId: clientId, - sourceId: sourceId, - streamId: streamId, - producerId: producerId, - consumerId: consumer.id, - rtpParameters: consumer.rtpParameters, + kind : consumer.kind, + type : consumer.type, + roomId : roomId, + clientId : clientId, + sourceId : sourceId, + streamId : streamId, + producerId : producerId, + consumerId : consumer.id, + appData : producer.appData, + rtpParameters : consumer.rtpParameters, producerPaused: consumer.producerPaused, }) ); await consumer.resume(); consumer.localPaused = false; - this.push( + me.push( protocol.buildMessage("media::consumer::score", { - score: consumer.score, - roomId: roomId, + score : consumer.score, + roomId : roomId, consumerId: consumer.id, }) ); @@ -1192,7 +1171,7 @@ class Taoyao { try { await Promise.all(promises); } catch (error) { - console.warn("_createConsumer() | failed:%o", error); + console.error("消费媒体异常", error); } } @@ -1200,17 +1179,18 @@ class Taoyao { * 关闭消费者信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaConsumerClose(message, body) { + const me = this; const { roomId, consumerId } = body; - const room = this.rooms.get(roomId); + const room = me.rooms.get(roomId); const consumer = room?.consumers.get(consumerId); if(consumer) { - console.info("关闭消费者:", consumerId); + console.info("关闭消费者", consumerId); await consumer.close(); } else { - console.info("关闭消费者无效:", consumerId); + console.debug("关闭消费者无效(无效)", consumerId); } } @@ -1218,18 +1198,19 @@ class Taoyao { * 暂停消费者信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaConsumerPause(message, body) { + const me = this; const { roomId, consumerId } = body; - const room = this.rooms.get(roomId); + const room = me.rooms.get(roomId); const consumer = room?.consumers.get(consumerId); if(consumer) { consumer.localPaused = true; - console.info("暂停消费者:", consumerId); + console.debug("暂停消费者", consumerId); await consumer.pause(); } else { - console.info("暂停消费者无效:", consumerId); + console.debug("暂停消费者(无效)", consumerId); } } @@ -1237,20 +1218,19 @@ class Taoyao { * 请求关键帧信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaConsumerRequestKeyFrame(message, body) { const me = this; const { roomId, consumerId } = body; - const room = this.rooms.get(roomId); + const room = me.rooms.get(roomId); const consumer = room?.consumers.get(consumerId); if(consumer) { - console.info("mediaConsumerRequestKeyFrame:", consumerId); - // 处理trace监听读取关键帧 + console.debug("请求关键帧", consumerId); + // 通过trace事件监听关键帧的信息 await consumer.requestKeyFrame(); - me.push(message); } else { - console.info("mediaConsumerRequestKeyFrame non:", consumerId); + console.debug("请求关键帧(无效)", consumerId); } } @@ -1258,18 +1238,19 @@ class Taoyao { * 恢复消费者信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaConsumerResume(message, body) { + const me = this; const { roomId, consumerId } = body; - const room = this.rooms.get(roomId); - const consumer = room.consumers.get(consumerId); + const room = me.rooms.get(roomId); + const consumer = room?.consumers.get(consumerId); if(consumer) { consumer.localPaused = false; - console.info("恢复消费者:", consumerId); + console.debug("恢复消费者", consumerId); await consumer.resume(); } else { - console.info("恢复消费者无效:", consumerId); + console.debug("恢复消费者(无效)", consumerId); } } @@ -1277,7 +1258,7 @@ class Taoyao { * 修改最佳空间层和时间层信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaConsumerSetPreferredLayers(message, body) { const me = this; @@ -1287,17 +1268,18 @@ class Taoyao { spatialLayer, temporalLayer } = body; - const room = this.rooms.get(roomId); + const room = me.rooms.get(roomId); const consumer = room?.consumers.get(consumerId); if(consumer) { - console.info("mediaConsumerSetPreferredLayers:", consumerId); + console.debug("修改最佳空间层和时间层", consumerId); await consumer.setPreferredLayers({ spatialLayer, temporalLayer }); - me.push(message); } else { - console.info("mediaConsumerSetPreferredLayers non:", consumerId); + console.debug("修改最佳空间层和时间层(无效)", consumerId); } } + // TODO:continue + /** * 消费数据信令 * @@ -1641,14 +1623,9 @@ class Taoyao { console.info("transport listenserverclose:", transport.id); transport.close(); }); - // await transport.enableTraceEvent([ 'probation', 'bwe' ]); + // await transport.enableTraceEvent([ 'bwe', 'probation' ]); // transport.on("trace", (trace) => { - // // 网络评估 - // if (trace.type === "bwe" && trace.direction === "out") { - // logger.debug("transport downlinkBwe:", trace); - // } else { - // console.debug("transport trace:", transport.id, trace); - // } + // console.debug("通道跟踪事件(trace)", transport.id, trace); // }); transport.observer.on("close", () => { if(room.transports.delete(transport.id)) { diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index a61f97b..2a967b8 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -2300,6 +2300,7 @@ class Taoyao extends RemoteClient { async sessionCall(clientId) { const me = this; if (!clientId) { + // TODO:判断自己 this.callbackError("无效终端"); return; }