diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 4b195ff..2bb9499 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -851,7 +851,7 @@ class Taoyao { const room = me.rooms.get(roomId); const transport = room?.transports.get(transportId); if(!transport) { - console.warn("生成媒体通道无效", roomId, transportId); + console.warn("生产媒体通道无效", roomId, transportId); return; } const producer = await transport.produce({ @@ -1097,8 +1097,8 @@ class Taoyao { console.debug("消费者空间层和时间层改变", consumer.id, streamId, layers); me.push( protocol.buildMessage("media::consumer::layers::change", { - roomId : roomId, - consumerId: consumer.id, + roomId : roomId, + consumerId : consumer.id, spatialLayer : layers ? layers.spatialLayer : null, temporalLayer: layers ? layers.temporalLayer : null, }) @@ -1143,14 +1143,14 @@ class Taoyao { // 等待终端准备就绪:可以不用等待直接使用push方法 await me.request( protocol.buildMessage("media::consume", { - kind : consumer.kind, - type : consumer.type, roomId : roomId, clientId : clientId, sourceId : sourceId, streamId : streamId, producerId : producerId, consumerId : consumer.id, + kind : consumer.kind, + type : consumer.type, appData : producer.appData, rtpParameters : consumer.rtpParameters, producerPaused: consumer.producerPaused, @@ -1278,13 +1278,11 @@ class Taoyao { } } - // TODO:continue - /** * 消费数据信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaDataConsume(message, body) { const me = this; @@ -1297,21 +1295,15 @@ class Taoyao { transportId, rtpCapabilities, } = body; - const room = this.rooms.get(roomId); - const transport = room?.transports.get(transportId); + const room = me.rooms.get(roomId); + const transport = room?.transports.get(transportId); const dataProducer = room?.dataProducers.get(producerId); if ( - !room || + !room || !transport || !dataProducer ) { - console.warn( - "不能消费数据:", - roomId, - clientId, - producerId, - transportId - ); + console.warn("不能消费数据", body); return; } let dataConsumer; @@ -1320,48 +1312,45 @@ class Taoyao { dataProducerId : dataProducer.id }); } catch (error) { - console.error("消费数据异常:", producerId, error); + console.error("创建数据消费者异常", body, error); return; } dataConsumer.clientId = clientId; dataConsumer.streamId = streamId; room.dataConsumers.set(dataConsumer.id, dataConsumer); - console.info("创建数据消费者:", dataProducer.id); + console.debug("创建数据消费者", dataProducer.id, streamId); dataConsumer.on('transportclose', () => { - console.info("dataConsumer transportclose:", dataConsumer.id); + console.info("数据消费者关闭(通道关闭)", dataConsumer.id, streamId); dataConsumer.close(); }); dataConsumer.on('dataproducerclose', () => { - console.info("dataConsumer dataproducerclose:", dataConsumer.id); + console.info("数据消费者关闭(生产者关闭)", dataConsumer.id, streamId); dataConsumer.close(); }); dataConsumer.observer.on("close", () => { if(room.dataConsumers.delete(dataConsumer.id)) { - console.info("dataConsumer close:", dataConsumer.id); + console.info("数据消费者关闭", dataConsumer.id, streamId); me.push( protocol.buildMessage("media::data::consumer::close", { - roomId: roomId, + roomId : roomId, consumerId: dataConsumer.id, }) ); } else { - console.info("dataConsumer close non:", dataConsumer.id); + console.debug("数据消费者关闭(无效)", dataConsumer.id, streamId); } }); - // dataConsumer.on("message", fn(message, ppid)); - // dataConsumer.on("bufferedamountlow", fn(bufferedAmount)); - // dataConsumer.on("sctpsendbufferfull", fn()); - this.push( + me.push( protocol.buildMessage("media::data::consume", { - label: dataConsumer.label, - roomId: roomId, - appData: dataProducer.appData, - protocol: dataConsumer.protocol, - clientId: clientId, - sourceId: sourceId, - streamId: streamId, - producerId: producerId, - consumerId: dataConsumer.id, + roomId : roomId, + clientId : clientId, + sourceId : sourceId, + streamId : streamId, + producerId : producerId, + consumerId : dataConsumer.id, + label : dataConsumer.label, + appData : dataProducer.appData, + protocol : dataConsumer.protocol, sctpStreamParameters: dataConsumer.sctpStreamParameters, }) ); @@ -1371,17 +1360,18 @@ class Taoyao { * 关闭数据消费者信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaDataConsumerClose(message, body) { + const me = this; const { roomId, consumerId } = body; - const room = this.rooms.get(roomId); + const room = me.rooms.get(roomId); const dataConsumer = room?.dataConsumers.get(consumerId); if(dataConsumer) { - console.info("关闭数据消费者:", consumerId); + console.info("关闭数据消费者", consumerId); await dataConsumer.close(); } else { - console.info("关闭数据消费者无效:", consumerId); + console.info("关闭数据消费者(无效)", consumerId); } } @@ -1389,7 +1379,7 @@ class Taoyao { * 生产数据信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaDataProduce(message, body) { const me = this; @@ -1403,10 +1393,10 @@ class Taoyao { transportId, sctpStreamParameters, } = body; - const room = me.rooms.get(roomId); + const room = me.rooms.get(roomId); const transport = room?.transports.get(transportId); if(!transport) { - console.warn("生产数据生产者通道无效:", transportId); + console.warn("生产数据通道无效", roomId, transportId); return; } const dataProducer = await transport.produceData({ @@ -1418,46 +1408,52 @@ class Taoyao { dataProducer.clientId = clientId; dataProducer.streamId = streamId; room.dataProducers.set(dataProducer.id, dataProducer); - console.info("创建数据生产者:", dataProducer.id); + console.info("创建数据生产者", dataProducer.id, streamId); dataProducer.on("transportclose", () => { - console.info("dataProducer transportclose:", dataProducer.id); + console.info("数据生产者关闭(通道关闭)", dataProducer.id, streamId); dataProducer.close(); }); dataProducer.observer.on("close", () => { if(room.dataProducers.delete(dataProducer.id)) { - console.info("dataProducer close:", dataProducer.id); + console.info("数据生产者关闭", dataProducer.id, streamId); me.push( taoyaoProtocol.buildMessage("media::data::producer::close", { - roomId: roomId, + roomId : roomId, producerId: dataProducer.id, }) ); } else { - console.info("dataProducer close non:", dataProducer.id); + console.debug("数据生产者关闭(无效)", dataProducer.id, streamId); } }) - message.body = { roomId: roomId, producerId: dataProducer.id }; - this.push(message); + message.body = { + roomId : roomId, + producerId: dataProducer.id + }; + me.push(message); } /** * 关闭数据生产者信令 * * @param {*} message 消息 - * @param {*} body 消息主体 + * @param {*} body 消息主体 */ async mediaDataProducerClose(message, body) { + const me = this; const { roomId, producerId } = body; - const room = this.rooms.get(roomId); + const room = me.rooms.get(roomId); const dataProducer = room?.dataProducers.get(producerId); if(dataProducer) { - console.info("关闭数据生产者:", producerId); + console.info("关闭数据生产者", producerId); await dataProducer.close(); } else { - console.info("关闭数据生产者无效:", producerId); + console.info("关闭数据生产者(无效)", producerId); } } + // TODO:continue + /** * 路由RTP协商信令 * diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index 2a967b8..cdcce6d 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -1510,8 +1510,15 @@ class Taoyao extends RemoteClient { console.error("dataConsumer error:", dataConsumer.id, error); dataConsumer.close(); }); - dataConsumer.on('message', (message) => { + // dataConsumer.on("bufferedamountlow", fn(bufferedAmount)); + // dataConsumer.on("sctpsendbufferfull", fn()); + dataConsumer.on('message', (message, ppid) => { console.info("dataConsume message:", dataConsumer.id, message); + if (ppid === 51) { + console.log("文本", message.toString("utf-8")); + } else if (ppid === 53) { + console.log("二进制"); + } }); } catch (error) { console.error("打开数据消费者异常", error); @@ -2141,6 +2148,7 @@ class Taoyao extends RemoteClient { */ async produceData() { const me = this; + // TODO:判断dataProduce try { const dataProducer = await me.sendTransport.produceData({ ordered: false, @@ -2163,9 +2171,6 @@ class Taoyao extends RemoteClient { console.debug("dataProducer transportclose:", me.dataProducer.id); me.dataProducer.close(); }); - me.dataProducer.on("bufferedamountlow", () => { - console.debug("dataProducer bufferedamountlow:", me.dataProducer.id); - }); } catch (error) { me.callbackError("生产数据异常", error); }