diff --git a/docs/Learning.md b/docs/Learning.md index cb9df3c..fde19af 100644 --- a/docs/Learning.md +++ b/docs/Learning.md @@ -28,6 +28,7 @@ https://www.cnblogs.com/ssyfj/p/14843082.html https://zhuanlan.zhihu.com/p/466172240 http://koca.szkingdom.com/forum/t/topic/218 +https://blog.csdn.net/qw225967/article/details/121251305 http://www.manoner.com/post/音视频基础/WebRTC核心组件和协议栈/ https://blog.csdn.net/ababab12345/article/details/115585378 https://blog.csdn.net/jisuanji111111/article/details/121634199 diff --git a/taoyao-client-media/src/Server.js b/taoyao-client-media/src/Server.js index db96572..a7ffc0d 100644 --- a/taoyao-client-media/src/Server.js +++ b/taoyao-client-media/src/Server.js @@ -82,6 +82,8 @@ async function main() { console.log(` 桃之夭夭,灼灼其华。 之子于归,宜其室家。 + + :: https://gitee.com/acgist/taoyao `); console.info("开始启动:", config.name); await buildMediasoupWorkers(); diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 81de5b2..0a49fc7 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -1,5 +1,5 @@ -const config = require("./Config"); -const process = require("child_process"); +const config = require("./Config.js"); +const process = require("child_process"); const WebSocket = require("ws"); /** @@ -21,34 +21,40 @@ const protocol = { } const date = new Date(); return ( - 100000000000000 * date.getDate() + - 1000000000000 * date.getHours() + - 10000000000 * date.getMinutes() + - 100000000 * date.getSeconds() + - 1000 * this.clientIndex + + 100000000000000 * date.getDate() + + 1000000000000 * date.getHours() + + 10000000000 * date.getMinutes() + + 100000000 * date.getSeconds() + + 1000 * this.clientIndex + this.index ); }, /** * @param {*} signal 信令标识 - * @param {*} body 消息主体 - * @param {*} id 消息ID + * @param {*} body 消息主体 + * @param {*} id 消息ID + * @param {*} v 消息版本 * * @returns 信令消息 */ - buildMessage(signal, body = {}, id) { + buildMessage(signal, body = {}, id, v) { const message = { header: { - v: config.signal.version, - id: id || this.buildId(), + v: v || config.signal.version, + id: id || this.buildId(), signal: signal, }, - body: body, + body: body, }; return message; }, }; +/** + * 名称冲突 + */ + const taoyaoProtocol = protocol; + /** * 信令通道 */ @@ -404,9 +410,30 @@ class Taoyao { case "media::consumer::pause": me.mediaConsumerPause(message, body); break; + case "media::consumer::request::key::frame": + me.mediaConsumerRequestKeyFrame(message, body); + break; case "media::consumer::resume": me.mediaConsumerResume(message, body); break; + case "media::consumer::set::preferred::layers": + me.mediaConsumerSetPreferredLayers(message, body); + break; + case "media::data::consume": + me.mediaDataConsume(message, body); + break; + case "media::data::consumer::close": + me.mediaDataConsumerClose(message, body); + break; + case "media::data::produce": + me.mediaDataProduce(message, body); + break; + case "media::data::producer::close": + me.mediaDataProducerClose(message, body); + break; + case "media::ice::restart": + me.mediaIceRestart(message, body); + break; case "media::produce": me.mediaProduce(message, body); break; @@ -576,6 +603,7 @@ class Taoyao { kind, appData, rtpParameters, + // 关键帧延迟时间 // keyFrameRequestDelay: 5000 }); producer.clientId = clientId; @@ -597,10 +625,15 @@ class Taoyao { }); producer.on("videoorientationchange", (videoOrientation) => { console.info("producer videoorientationchange:", producer.id, videoOrientation); + self.push(protocol.buildMessage("media::video::orientation::change", { + roomId: roomId, + ...videoOrientation + })); }); - producer.on("trace", (trace) => { - console.info("producer trace:", producer.id, trace); - }); + // await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]); + // producer.on("trace", (trace) => { + // console.info("producer trace:", producer.id, trace); + // }); producer.observer.on("close", () => { if(room.producers.delete(producer.id)) { console.info("producer close:", producer.id); @@ -635,7 +668,7 @@ class Taoyao { // producer.observer.on("score", fn(score)); // producer.observer.on("videoorientationchange", fn(videoOrientation)); // producer.observer.on("trace", fn(trace)); - message.body = { kind: kind, producerId: producer.id }; + message.body = { kind: kind, roomId: roomId, producerId: producer.id }; this.push(message); if (producer.kind === "audio") { room.audioLevelObserver @@ -725,8 +758,8 @@ class Taoyao { rtpCapabilities, } = body; const room = this.rooms.get(roomId); - const producer = room.producers.get(producerId); - const transport = room.transports.get(transportId); + const producer = room?.producers.get(producerId); + const transport = room?.transports.get(transportId); if ( !room || !producer || @@ -822,9 +855,10 @@ class Taoyao { }) ); }); - consumer.on("trace", (trace) => { - console.info("consumer trace:", consumer.id, trace); - }); + // await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]); + // consumer.on("trace", (trace) => { + // console.info("consumer trace:", consumer.id, trace); + // }); // consumer.on("rtp", (rtpPacket) => { // console.info("consumer rtp:", consumer.id, rtpPacket); // }); @@ -863,18 +897,19 @@ class Taoyao { // consumer.observer.on("layerschange", fn(layers)); // consumer.observer.on("trace", fn(trace)); // 等待终端准备就绪 - this.request( + await this.request( + // this.push( protocol.buildMessage("media::consume", { kind: consumer.kind, type: consumer.type, roomId: roomId, + appData: producer.appData, clientId: clientId, sourceId: sourceId, streamId: streamId, producerId: producerId, consumerId: consumer.id, rtpParameters: consumer.rtpParameters, - appData: producer.appData, producerPaused: consumer.producerPaused, }) ); @@ -924,7 +959,7 @@ class Taoyao { async mediaConsumerPause(message, body) { const { roomId, consumerId } = body; const room = this.rooms.get(roomId); - const consumer = room.consumers.get(consumerId); + const consumer = room?.consumers.get(consumerId); if(consumer) { console.info("暂停消费者:", consumerId); await consumer.pause(); @@ -933,6 +968,27 @@ class Taoyao { } } + /** + * 请求关键帧信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaConsumerRequestKeyFrame(message, body) { + const me = this; + const { roomId, consumerId } = body; + const room = this.rooms.get(roomId); + const consumer = room?.consumers.get(consumerId); + if(consumer) { + console.info("mediaConsumerRequestKeyFrame:", consumerId); + // 处理trace监听读取关键帧 + await consumer.requestKeyFrame(); + me.push(message); + } else { + console.info("mediaConsumerRequestKeyFrame non:", consumerId); + } + } + /** * 恢复消费者信令 * @@ -951,6 +1007,220 @@ class Taoyao { } } + /** + * 修改最佳空间层和时间层信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaConsumerSetPreferredLayers(message, body) { + const me = this; + const { roomId, consumerId, spatialLayer, temporalLayer } = body; + const room = this.rooms.get(roomId); + const consumer = room?.consumers.get(consumerId); + if(consumer) { + console.info("mediaConsumerSetPreferredLayers:", consumerId); + await consumer.setPreferredLayers({ spatialLayer, temporalLayer }); + me.push(message); + } else { + console.info("mediaConsumerSetPreferredLayers non:", consumerId); + } + } + + /** + * 消费数据信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaDataConsume(message, body) { + const me = this; + const { + roomId, + clientId, + sourceId, + streamId, + producerId, + transportId, + rtpCapabilities, + } = body; + const room = this.rooms.get(roomId); + const transport = room?.transports.get(transportId); + const dataProducer = room?.dataProducers.get(producerId); + if ( + !room || + !transport || + !dataProducer + ) { + console.warn( + "不能消费数据:", + roomId, + clientId, + producerId, + transportId + ); + return; + } + let dataConsumer; + try { + dataConsumer = await transport.consumeData({ + dataProducerId : dataProducer.id + }); + } catch (error) { + console.error("消费数据异常:", producerId, error); + return; + } + dataConsumer.clientId = clientId; + dataConsumer.streamId = streamId; + room.dataConsumers.set(dataConsumer.id, dataConsumer); + console.info("创建数据消费者:", dataProducer.id); + dataConsumer.on('transportclose', () => { + console.info("dataConsumer transportclose:", dataConsumer.id); + dataConsumer.close(); + }); + dataConsumer.on('dataproducerclose', () => { + console.info("dataConsumer dataproducerclose:", dataConsumer.id); + dataConsumer.close(); + }); + dataConsumer.observer.on("close", () => { + if(room.dataConsumers.delete(dataConsumer.id)) { + console.info("dataConsumer close:", dataConsumer.id); + me.push( + protocol.buildMessage("media::data::consumer::close", { + roomId: roomId, + consumerId: dataConsumer.id, + }) + ); + } else { + console.info("dataConsumer close non:", dataConsumer.id); + } + }); + // dataConsumer.on("message", fn(message, ppid)); + // dataConsumer.on("bufferedamountlow", fn(bufferedAmount)); + // dataConsumer.on("sctpsendbufferfull", fn()); + this.push( + protocol.buildMessage("media::data::consume", { + label: dataConsumer.label, + roomId: roomId, + appData: dataProducer.appData, + protocol: dataConsumer.protocol, + clientId: clientId, + sourceId: sourceId, + streamId: streamId, + producerId: producerId, + consumerId: dataConsumer.id, + sctpStreamParameters: dataConsumer.sctpStreamParameters, + }) + ); + } + + /** + * 关闭数据消费者信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaDataConsumerClose(message, body) { + const { roomId, consumerId } = body; + const room = this.rooms.get(roomId); + const dataConsumer = room?.dataConsumers.get(consumerId); + if(dataConsumer) { + console.info("关闭数据消费者:", consumerId); + await dataConsumer.close(); + } else { + console.info("关闭数据消费者无效:", consumerId); + } + } + + /** + * 生产数据信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaDataProduce(message, body) { + const me = this; + const { + label, + roomId, + appData, + clientId, + streamId, + protocol, + transportId, + sctpStreamParameters, + } = body; + const room = me.rooms.get(roomId); + const transport = room?.transports.get(transportId); + if(!transport) { + console.warn("生产数据生产者通道无效:", transportId); + return; + } + const dataProducer = await transport.produceData({ + label, + appData, + protocol, + sctpStreamParameters, + }); + dataProducer.clientId = clientId; + dataProducer.streamId = streamId; + room.dataProducers.set(dataProducer.id, dataProducer); + console.info("创建数据生产者:", dataProducer.id); + dataProducer.on("transportclose", () => { + console.info("dataProducer transportclose:", dataProducer.id); + dataProducer.close(); + }); + dataProducer.observer.on("close", () => { + if(room.dataProducers.delete(dataProducer.id)) { + console.info("dataProducer close:", dataProducer.id); + me.push( + taoyaoProtocol.buildMessage("media::data::producer::close", { + roomId: roomId, + producerId: dataProducer.id, + }) + ); + } else { + console.info("dataProducer close non:", dataProducer.id); + } + }) + message.body = { roomId: roomId, producerId: dataProducer.id }; + this.push(message); + } + + /** + * 关闭数据生产者信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaDataProducerClose(message, body) { + const { roomId, producerId } = body; + const room = this.rooms.get(roomId); + const dataProducer = room?.dataProducers.get(producerId); + if(dataProducer) { + console.info("关闭数据生产者:", producerId); + await dataProducer.close(); + } else { + console.info("关闭数据生产者无效:", producerId); + } + } + + /** + * 重启ICE信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaIceRestart(message, body) { + const me = this; + const { roomId, transportId } = body; + const room = this.rooms.get(roomId); + const transport = room?.transports.get(transportId); + const iceParameters = await transport.restartIce(); + message.body.iceParameters = iceParameters; + me.push(message); + } + /** * 路由RTP协商信令 * @@ -973,7 +1243,7 @@ class Taoyao { async mediaTransportClose(message, body) { const { roomId, transportId } = body; const room = this.rooms.get(roomId); - const transport = room.transports.get(transportId); + const transport = room?.transports.get(transportId); if(transport) { console.info("关闭传输通道:", transportId); transport.close(); @@ -991,10 +1261,15 @@ class Taoyao { async mediaTransportWebrtcConnect(message, body) { const { roomId, transportId, dtlsParameters } = body; const room = this.rooms.get(roomId); - const transport = room.transports.get(transportId); - await transport.connect({ dtlsParameters }); - message.body = { roomId: roomId, transportId: transport.id }; - this.push(message); + const transport = room?.transports.get(transportId); + if(transport) { + console.info("连接WebRTC通道:", transportId); + await transport.connect({ dtlsParameters }); + message.body = { roomId: roomId, transportId: transport.id }; + this.push(message); + } else { + console.info("连接WebRTC通道无效:", transportId); + } } /** @@ -1038,11 +1313,10 @@ class Taoyao { console.info("transport listenserverclose:", transport.id); transport.close(); }); - await transport.enableTraceEvent(["bwe"]); // await transport.enableTraceEvent([ 'probation', 'bwe' ]); - transport.on("trace", (trace) => { - console.debug("transport trace:", transport.id, trace); - }); + // transport.on("trace", (trace) => { + // console.debug("transport trace:", transport.id, trace); + // }); transport.observer.on("close", () => { if(room.transports.delete(transport.id)) { console.info("transport close:", transport.id); @@ -1100,6 +1374,7 @@ class Taoyao { // transport.on("rtcp", fn(rtcpPacket)); room.transports.set(transport.id, transport); message.body = { + roomId: roomId, transportId: transport.id, iceCandidates: transport.iceCandidates, iceParameters: transport.iceParameters, diff --git a/taoyao-client-web/src/App.vue b/taoyao-client-web/src/App.vue index 7807e53..b82ede7 100644 --- a/taoyao-client-web/src/App.vue +++ b/taoyao-client-web/src/App.vue @@ -3,7 +3,7 @@
- + @@ -29,7 +29,7 @@ - + @@ -48,6 +48,13 @@ + + + + + + + @@ -56,23 +63,24 @@
- +
@@ -92,6 +100,7 @@ export default { room: {}, rooms: null, medias: null, + clients: null, config: { clientId: "taoyao", name: "taoyao", @@ -111,6 +120,8 @@ export default { console.info(` 中庭地白树栖鸦,冷露无声湿桂花。 今夜月明人尽望,不知秋思落谁家。 + + :: https://gitee.com/acgist/taoyao `); }, methods: { @@ -126,22 +137,27 @@ export default { async loadList() { this.rooms = await this.taoyao.roomList(); this.medias = await this.taoyao.mediaList(); + this.clients = await this.taoyao.clientList(); + }, + async roomLeave() { + this.taoyao.roomLeave(); }, async roomClose() { this.taoyao.roomClose(); }, - async roomCreate() { - const room = await this.taoyao.roomCreate(this.room); - this.room.roomId = room.roomId; - await this.roomEnter(); - }, async roomEnter() { await this.taoyao.roomEnter(this.room.roomId, this.room.password); await this.taoyao.produceMedia(); this.roomVisible = false; }, - audioVolume(message) { - + async roomCreate() { + const room = await this.taoyao.roomCreate(this.room); + this.room.roomId = room.roomId; + await this.roomEnter(); + }, + async roomInvite() { + this.taoyao.roomInvite(this.room.inviteClientId); + this.roomVisible = false; }, /** * 信令回调 @@ -157,9 +173,6 @@ export default { case "client::config": me.roomVisible = true; break; - case "media::audio::active::speaker": - me.audioVolume(message); - break; case "platform::error": if (error) { console.error("发生异常:", message, error); diff --git a/taoyao-client-web/src/components/LocalClient.vue b/taoyao-client-web/src/components/LocalClient.vue index d848e11..a7a4b04 100644 --- a/taoyao-client-web/src/components/LocalClient.vue +++ b/taoyao-client-web/src/components/LocalClient.vue @@ -5,11 +5,11 @@

{{ client?.name || "" }}

- - - - - + + + + + @@ -104,15 +104,26 @@ export default { } }, methods: { - media(track) { - if (track.kind === "video") { + exchangeVideoSource() { + // TODO:文件支持 + this.taoyao.videoSource = this.taoyao.videoSource === "camera" ? "screen" : "camera"; + this.taoyao.updateVideoProducer(); + }, + media(track, producer) { + if(track.kind === "audio") { + // 不用加载音频 + this.audioProducer = producer; + } else if (track.kind === "video") { + this.videoProducer = producer; if (this.videoStream) { - // TODO:资源释放 - } else { - this.videoStream = new MediaStream(); - this.videoStream.addTrack(track); - this.video.srcObject = this.videoStream; + this.videoStream.getVideoTracks().forEach(oldTrack => { + console.debug("关闭旧的媒体:", oldTrack); + oldTrack.stop(); + }); } + this.videoStream = new MediaStream(); + this.videoStream.addTrack(track); + this.video.srcObject = this.videoStream; this.video.play().catch((error) => console.warn("视频播放失败", error)); } else { console.debug("本地不支持的媒体类型:", track); diff --git a/taoyao-client-web/src/components/RemoteClient.vue b/taoyao-client-web/src/components/RemoteClient.vue index d5a5fbf..79b00f1 100644 --- a/taoyao-client-web/src/components/RemoteClient.vue +++ b/taoyao-client-web/src/components/RemoteClient.vue @@ -5,14 +5,14 @@

{{ client?.name || "" }}

- - - - + + + + - +
@@ -69,8 +69,12 @@ export default { } }, methods: { + roomExpel() { + this.taoyao.roomExpel(this.client.clientId); + }, media(track, consumer) { if(track.kind === 'audio') { + this.audioConsumer = consumer; if (this.audioStream) { // TODO:资源释放 } else { @@ -80,6 +84,7 @@ export default { } this.audio.play().catch((error) => console.warn("视频播放失败", error)); } else if(track.kind === 'video') { + this.videoConsumer = consumer; if (this.videoStream) { // TODO:资源释放 } else { diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index 8af96e1..b52bad0 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -26,38 +26,40 @@ const protocol = { } const date = new Date(); return ( - 100000000000000 * date.getDate() + - 1000000000000 * date.getHours() + - 10000000000 * date.getMinutes() + - 100000000 * date.getSeconds() + - 1000 * this.clientIndex + + 100000000000000 * date.getDate() + + 1000000000000 * date.getHours() + + 10000000000 * date.getMinutes() + + 100000000 * date.getSeconds() + + 1000 * this.clientIndex + this.index ); }, /** * @param {*} signal 信令标识 - * @param {*} body 消息主体 - * @param {*} id 消息ID - * @param {*} v 消息版本 + * @param {*} body 消息主体 + * @param {*} id 消息ID + * @param {*} v 消息版本 * * @returns 信令消息 */ buildMessage(signal, body = {}, id, v) { - if (!signal) { - throw new Error("信令标识缺失"); - } const message = { header: { - v: v || "1.0.0", - id: id || this.buildId(), + v: v || "1.0.0", + id: id || this.buildId(), signal: signal, }, - body: body, + body: body, }; return message; }, }; +/** + * 名称冲突 + */ +const taoyaoProtocol = protocol; + /** * 信令通道 */ @@ -236,12 +238,6 @@ class RemoteClient { volume = 0; // 代理对象 proxy; - // 数据可用 - dataActive = false; - // 音频可用 - audioActive = false; - // 视频可用 - videoActive = false; // 数据消费者 dataConsumer; // 音频消费者 @@ -328,7 +324,7 @@ class Taoyao extends RemoteClient { dataProduce; // 是否生产音频 audioProduce; - // 是否生成视频 + // 是否生产视频 videoProduce; // 数据生产者 dataProducer; @@ -336,8 +332,10 @@ class Taoyao extends RemoteClient { audioProducer; // 视频生产者 videoProducer; - // 消费者:音频、视频、数据 + // 消费者:音频、视频 consumers = new Map(); + // 消费者:数据 + dataConsumers = new Map(); // 远程终端 remoteClients = new Map(); @@ -450,7 +448,7 @@ class Taoyao extends RemoteClient { * 2. 执行前置回调 * 3. 如果注册全局回调,同时执行结果返回true不再执行后面所有回调。 * 4. 执行后置回调 - * + * * @param {*} message 消息 */ async on(message) { @@ -494,6 +492,9 @@ class Taoyao extends RemoteClient { case "media::consume": await me.defaultMediaConsume(message); break; + case "media::data::consume": + me.defaultMediaDataConsume(message); + break; case "platform::error": me.defaultPlatformError(message); break; @@ -519,6 +520,45 @@ class Taoyao extends RemoteClient { case "media::consumer::close": me.defaultMediaConsumerClose(message); break; + case "media::consumer::pause": + this.defaultMediaConsumerPause(message); + break; + case "media::consumer::request::key::frame": + me.defaultMediaConsumerRequestKeyFrame(message); + break; + case "media::consumer::resume": + this.defaultMediaConsumerResume(message); + break; + case "media::consumer::set::preferred::layers": + me.defaultMediaConsumerSetPreferredLayers(message); + break; + case "media::consumer::status": + this.defaultMediaConsumerStatus(message); + break; + case "media::data::consumer::close": + me.defaultMediaDataConsumerClose(message); + break; + case "media::data::consumer::status": + me.defaultMediaDataConsumerStatus(message); + break; + case "media::data::producer::close": + me.defaultMediaDataProducerClose(message); + break; + case "media::data::producer::status": + me.defaultMediaDataProducerStatus(message); + break; + case "media::producer::close": + me.defaultMediaProducerClose(message); + break; + case "media::producer::pause": + me.defaultMediaProducerPause(message); + break; + case "media::producer::resume": + me.defaultMediaProducerResume(message); + break; + case "media::video::orientation::change": + me.defaultMediaVideoOrientationChange(message); + break; case "room::client::list": me.defaultRoomClientList(message); break; @@ -528,6 +568,12 @@ class Taoyao extends RemoteClient { case "room::enter": me.defaultRoomEnter(message); break; + case "room::expel": + me.defaultRoomExpel(message); + break; + case "room::invite": + me.defaultRoomInvite(message); + break; case "room::leave": me.defaultRoomLeave(message); break; @@ -536,6 +582,58 @@ class Taoyao extends RemoteClient { break; } } + /************************ 管理 ************************/ + getProducer(producerId) { + const me = this; + if(me?.audioProducer.id === producerId) { + return me.audioProducer; + } else if(me?.videoProducer.id === producerId) { + return me.videoProducer; + } else if(me?.dataProducer.id === producerId) { + return me.dataProducer; + } else { + return null; + } + } + async getVideoTrack() { + let track; + const self = this; + if (self.videoSource === "file") { + // TODO:实现文件分享 + // const stream = await this._getExternalVideoStream(); + // track = stream.getVideoTracks()[0].clone(); + } else if (self.videoSource === "camera") { + console.debug("enableWebcam() | calling getUserMedia()"); + // TODO:参数 + const stream = await navigator.mediaDevices.getUserMedia({ + video: self.videoConfig, + }); + track = stream.getVideoTracks()[0]; + // TODO:验证修改API videoTrack.applyCapabilities + console.debug( + "视频信息:", + track.getSettings(), + track.getCapabilities() + ); + } else if (self.videoSource === "screen") { + const stream = await navigator.mediaDevices.getDisplayMedia({ + // 如果需要共享声音 + audio: false, + video: { + cursor: true, + width: { max: 1920 }, + height: { max: 1080 }, + frameRate: { max: 30 }, + logicalSurface: true, + displaySurface: "monitor", + }, + }); + track = stream.getVideoTracks()[0]; + } else { + // TODO:异常 + } + return track; + } /************************ 信令 ************************/ /** * 终端配置信令 @@ -546,14 +644,40 @@ class Taoyao extends RemoteClient { const me = this; const { media, webrtc } = message.body; const { audio, video } = media; - me.audioConfig.sampleSize = { min: media.minSampleSize, ideal: audio.sampleSize, max: media.maxSampleSize }; - me.audioConfig.sampleRate = { min: media.minSampleRate, ideal: audio.sampleRate, max: media.maxSampleRate }; - me.videoConfig.width = { min: media.minWidth, ideal: video.width, max: media.maxWidth }; - me.videoConfig.height = { min: media.minHeight, ideal: video.height, max: media.maxHeight }; - me.videoConfig.frameRate = { min: media.minFrameRate, ideal: video.frameRate, max: media.maxFrameRate }; + me.audioConfig.sampleSize = { + min: media.minSampleSize, + ideal: audio.sampleSize, + max: media.maxSampleSize, + }; + me.audioConfig.sampleRate = { + min: media.minSampleRate, + ideal: audio.sampleRate, + max: media.maxSampleRate, + }; + me.videoConfig.width = { + min: media.minWidth, + ideal: video.width, + max: media.maxWidth, + }; + me.videoConfig.height = { + min: media.minHeight, + ideal: video.height, + max: media.maxHeight, + }; + me.videoConfig.frameRate = { + min: media.minFrameRate, + ideal: video.frameRate, + max: media.maxFrameRate, + }; me.mediaConfig = media; me.webrtcConfig = webrtc; - console.debug("终端配置:", me.audioConfig, me.videoConfig, me.mediaConfig, me.webrtcConfig); + console.debug( + "终端配置:", + me.audioConfig, + me.videoConfig, + me.mediaConfig, + me.webrtcConfig + ); } /** * 重启终端信令 @@ -566,10 +690,10 @@ class Taoyao extends RemoteClient { } /** * 终端注册信令 - * + * * @param {*} message 消息 */ - defaultClientRegister(message) { + defaultClientRegister(message) { const { index } = message.body; protocol.clientIndex = index; } @@ -584,26 +708,26 @@ class Taoyao extends RemoteClient { } /** * 终端音量信令 - * + * * @param {*} message 消息 */ defaultMediaAudioVolume(message) { const me = this; const { roomId, volumes } = message.body; // 静音 - if(!volumes || !volumes.length) { + if (!volumes || !volumes.length) { me.volume = 0; - me.remoteClients.forEach(v => v.volume = 0); + me.remoteClients.forEach((v) => (v.volume = 0)); return; } // 声音 - volumes.forEach(v => { + volumes.forEach((v) => { const { volume, clientId } = v; - if(me.clientId === clientId) { + if (me.clientId === clientId) { me.setVolume(volume); } else { const remoteClient = me.remoteClients.get(clientId); - if(remoteClient) { + if (remoteClient) { remoteClient.setVolume(volume); } } @@ -611,26 +735,28 @@ class Taoyao extends RemoteClient { } /** * 关闭消费者信令 - * + * * @param {*} consumerId 消费者ID */ mediaConsumerClose(consumerId) { const me = this; - me.push(protocol.buildMessage("media::consumer::close", { - roomId: me.roomId, - consumerId: consumerId - })); + me.push( + protocol.buildMessage("media::consumer::close", { + roomId: me.roomId, + consumerId: consumerId, + }) + ); } /** * 关闭消费者信令 - * + * * @param {*} message 消息 */ defaultMediaConsumerClose(message) { const me = this; const { roomId, consumerId } = message.body; const consumer = me.consumers.get(consumerId); - if(consumer) { + if (consumer) { console.info("关闭消费者:", consumerId); consumer.close(); me.consumers.delete(consumerId); @@ -638,12 +764,329 @@ class Taoyao extends RemoteClient { console.debug("关闭消费者无效:", consumerId); } } + /** + * 暂停消费者 + * + * @param {*} consumerId 消费者ID + */ + mediaConsumerPause(consumerId) { + const me = this; + const consumer = me.consumers.get(consumerId); + if(consumer) { + if(consumer.paused) { + return; + } + console.debug("mediaConsumerPause:", consumerId); + me.push(protocol.buildMessage("media::consumer::pause", { + roomId: me.roomId, + consumerId: consumerId, + })); + } else { + console.debug("mediaConsumerPause non:", consumerId); + } + } + /** + * 暂停消费者信令 + * + * @param {*} message 消息 + */ + defaultMediaConsumerPause(message) { + const me = this; + const { roomId, consumerId } = message.body; + const consumer = me.consumers.get(consumerId); + if (consumer) { + console.debug("暂停消费者:", consumerId); + consumer.pause(); + } else { + console.debug("暂停消费者无效:", consumerId); + } + } + /** + * 请求关键帧 + * + * @param {*} consumerId 消费者ID + */ + mediaConsumerRequestKeyFrame(consumerId) { + const me = this; + const consumer = me.consumers.get(consumerId); + if(!consumer) { + me.callbackError("消费者无效:" + consumerId); + return; + } + if(consumer.kind !== "video") { + me.callbackError("消费者不是视频媒体:" + consumerId); + return; + } + me.push(protocol.buildMessage("media::consumer::request::key::frame", { + roomId: me.roomId, + consumerId: consumerId, + })); + } + /** + * 请求关键帧信令 + * + * @param {*} message 消息 + */ + defaultMediaConsumerRequestKeyFrame(message) { + console.debug("defaultMediaConsumerRequestKeyFrame:", message); + } + /** + * 恢复消费者 + * + * @param {*} consumerId 消费者ID + */ + mediaConsumerResume(consumerId) { + const me = this; + const consumer = me.consumers.get(consumerId); + if(consumer) { + if(!consumer.paused) { + return; + } + console.debug("mediaConsumerResume:", consumerId); + me.push(protocol.buildMessage("media::consumer::resume", { + roomId: me.roomId, + consumerId: consumerId, + })); + } else { + console.debug("mediaConsumerResume non:", consumerId); + } + } + /** + * 恢复消费者信令 + * + * @param {*} message 消息 + */ + defaultMediaConsumerResume(message) { + const me = this; + const { roomId, consumerId } = message.body; + const consumer = me.consumers.get(consumerId); + if (consumer) { + console.info("恢复消费者:", consumerId); + consumer.resume(); + } else { + console.debug("恢复消费者无效:", consumerId); + } + } + /** + * 修改最佳空间层和时间层 + * + * @param {*} consumerId 消费者ID + * @param {*} spatialLayer 空间层 + * @param {*} temporalLayer 时间层 + */ + mediaConsumerSetPreferredLayers(consumerId, spatialLayer, temporalLayer) { + const me = this; + const consumer = me.consumers.get(consumerId); + if(!consumer) { + me.callbackError("消费者无效:" + consumerId); + return; + } + if(consumer.kind !== "video") { + me.callbackError("消费者不是视频媒体:" + consumerId); + return; + } + me.push(protocol.buildMessage("media::consumer::set::preferred::layers", { + roomId: me.roomId, + consumerId, + spatialLayer, + temporalLayer, + })); + } + /** + * 修改最佳空间层和时间层信令 + * + * @param {*} message 消息 + */ + defaultMediaConsumerSetPreferredLayers(message) { + console.debug("defaultMediaConsumerSetPreferredLayers:", message); + } + /** + * 查询消费者状态信令 + * + * @param {*} message 消息 + */ + defaultMediaConsumerStatus(message) { + console.info("defaultMediaConsumerStatus:", message); + } + /** + * 关闭数据消费者信令 + * + * @param {*} message 消息 + */ + defaultMediaDataConsumerClose(message) { + const me = this; + const { roomId, consumerId } = message.body; + const dataConsumer = me.dataConsumers.get(consumerId); + if (dataConsumer) { + console.info("关闭数据消费者:", consumerId); + dataConsumer.close(); + me.dataConsumers.delete(consumerId); + } else { + console.debug("关闭数据消费者无效:", consumerId); + } + } + /** + * 查询数据消费者状态信令 + * + * @param {*} message 消息 + */ + defaultMediaDataConsumerStatus(message) { + console.info("defaultMediaDataConsumerStatus:", message); + } + /** + * 关闭数据生产者信令 + * + * @param {*} message 消息 + */ + defaultMediaDataProducerClose(message) { + const me = this; + const { roomId, producerId } = message.body; + const producer = me.dataProducer; + if (producer) { + console.info("关闭数据生产者:", producerId); + producer.close(); + // TODO:类型判断设置为空 + } else { + console.debug("关闭数据生产者无效:", producerId); + } + } + /** + * 关闭数据消费者信令 + * + * @param {*} message 消息 + */ + defaultMediaDataProducerStatus(message) { + console.info("defaultMediaDataProducerStatus:", message); + } + /** + * 关闭生产者信令 + * + * @param {*} message 消息 + */ + async defaultMediaProducerClose(message) { + const me = this; + const { roomId, producerId } = message.body; + const producer = me.getProducer(producerId); + if (producer) { + console.info("关闭生产者:", producerId); + producer.close(); + // TODO:类型判断设置为空 + } else { + console.debug("关闭生产者无效:", producerId); + } + } + /** + * 暂停生产者 + * + * @param {*} producerId 生产者ID + */ + mediaProducerPause(producerId) { + const me = this; + const producer = me.getProducer(producerId); + if(producer) { + if(producer.paused) { + return; + } + console.debug("mediaProducerPause:", producerId); + me.push(protocol.buildMessage("media::producer::pause", { + roomId: me.roomId, + producerId: producerId, + })); + } else { + console.debug("mediaProducerPause non:", producerId); + } + } + /** + * 暂停生产者信令 + * + * @param {*} message 消息 + */ + async defaultMediaProducerPause(message) { + const me = this; + const { roomId, producerId } = message.body; + const producer = me.getProducer(producerId); + if (producer) { + console.debug("暂停生产者:", producerId); + producer.pause(); + } else { + console.debug("暂停生产者无效:", producerId); + } + } + /** + * 恢复生产者 + * + * @param {*} producerId 生产者ID + */ + mediaProducerResume(producerId) { + const me = this; + const producer = me.getProducer(producerId); + if(producer) { + if(!producer.paused) { + return; + } + console.debug("mediaProducerResume:", producerId); + me.push(protocol.buildMessage("media::producer::resume", { + roomId: me.roomId, + producerId: producerId, + })); + } else { + console.debug("mediaProducerResume non:", producerId); + } + } + /** + * 恢复生产者信令 + * + * @param {*} message 消息 + */ + async defaultMediaProducerResume(message) { + const me = this; + const { roomId, producerId } = message.body; + const producer = me.getProducer(producerId); + if (producer) { + console.debug("恢复生产者:", producerId); + producer.resume(); + } else { + console.debug("恢复生产者无效:", producerId); + } + } + /** + * 重启ICE + */ + async mediaIceRestart() { + const me = this; + if (me.sendTransport) { + const response = await me.request(protocol.buildMessage( + 'media::ice::restart', + { transportId: me.sendTransport.id } + )); + const { iceParameters } = response.body; + await me.sendTransport.restartIce({ iceParameters }); + } + if (me.recvTransport) + { + const response = await me.request(protocol.buildMessage( + 'media::ice::restart', + { transportId: me.recvTransport.id } + )); + const { iceParameters } = response; + await me.recvTransport.restartIce({ iceParameters }); + } + + } + /** + * 视频方向变化信令 + * + * @param {*} message 消息 + */ + defaultMediaVideoOrientationChange(message) { + console.debug("视频方向变化信令:", message); + } /** * 消费媒体信令 * * @param {*} message 消息 */ - async defaultMediaConsume(message) { + async defaultMediaConsume(message) { const self = this; if (!self.audioConsume && !self.videoConsume) { console.debug("没有消费媒体"); @@ -708,15 +1151,13 @@ class Taoyao extends RemoteClient { self.push(message); console.debug("远程媒体:", consumer); const remoteClient = self.remoteClients.get(consumer.sourceId); - if(remoteClient && remoteClient.proxy && remoteClient.proxy.media) { + if (remoteClient && remoteClient.proxy && remoteClient.proxy.media) { const track = consumer.track; // TODO:旧的媒体? - if(track.kind === 'audio') { - remoteClient.audioActive = true; + if (track.kind === "audio") { remoteClient.audioTrack = track; remoteClient.audioConsumer = consumer; - } else if(track.kind === 'video') { - remoteClient.videoActive = true; + } else if (track.kind === "video") { remoteClient.videoTrack = track; remoteClient.videoconsumer = consumer; } else { @@ -736,24 +1177,98 @@ class Taoyao extends RemoteClient { } } /** - * 平台异常信令 * + * @param {*} producerId + */ + mediaDataConsume(producerId) { + const me = this; + if(!me.recvTransport) { + me.callbackError("没有连接接收通道"); + return; + } + me.push( + protocol.buildMessage("media::data::consume", { + roomId: me.roomId, + producerId: producerId, + }) + ); + } + /** + * 消费数据信令 + * + * @param {*} message 消息 + */ + async defaultMediaDataConsume(message) { + const me = this; + const { + label, + appData, + protocol, + consumerId, + producerId, + sctpStreamParameters, + } = message.body; + try { + const dataConsumer = await me.recvTransport.consumeData({ + id : consumerId, + dataProducerId : producerId, + label, + appData, + protocol, + sctpStreamParameters, + }); + me.dataConsumers.set(dataConsumer.id, dataConsumer); + dataConsumer.on('transportclose', () => { + console.info("dataConsumer transportclose:", dataConsumer.id); + dataConsumer.close(); + }); + dataConsumer.on('open', () => { + console.info("dataConsumer open:", dataConsumer.id); + window.dataConsumer = dataConsumer; + }); + dataConsumer.on('close', () => { + if(me.dataConsumers.delete(dataConsumer.id)) { + console.info("dataConsumer close:", dataConsumer.id); + me.push( + taoyaoProtocol.buildMessage("media::data::consumer::close", { + roomId: roomId, + consumerId: dataConsumer.id, + }) + ); + } else { + console.info("dataConsumer close non:", dataConsumer.id); + } + }); + dataConsumer.on('error', (error) => { + console.error("dataConsumer error:", dataConsumer.id, error); + dataConsumer.close(); + }); + dataConsumer.on('message', (message) => { + console.info("dataConsume message:", dataConsumer.id, message); + }); + } catch (error) { + console.error("打开数据消费者异常", error); + } + } + /** + * 平台异常信令 + * * @param {*} message 消息 */ defaultPlatformError(message) { const { code } = message; - if(code === "3401") { + if (code === "3401") { signalChannel.close(); } } /** * 房间终端列表信令 - * + * * @param {*} message 消息 */ defaultRoomClientList(message) { const me = this; - message.body.forEach(v => { + message.body.forEach((v) => { if (v.clientId === me.clientId) { // 忽略自己 } else { @@ -764,35 +1279,39 @@ class Taoyao extends RemoteClient { /** * 关闭房间信令 */ - async roomClose() { + async roomClose() { const me = this; - if(!me.roomId) { + if (!me.roomId) { console.warn("房间无效:", me.roomId); return; } - me.push(protocol.buildMessage("room::close", { - roomId: me.roomId - })); + me.push( + protocol.buildMessage("room::close", { + roomId: me.roomId, + }) + ); } /** * 关闭房间信令 - * + * * @param {*} message 消息 */ defaultRoomClose(message) { const me = this; const { roomId } = message.body; - if(me.roomId !== roomId) { + if (me.roomId !== roomId) { return; } console.info("关闭房间:", roomId); me.closeMedia(); + me.roomId = null; + me.remoteClients.clear(); } /** * 创建房间信令 - * + * * @param {*} room 房间 - * + * * @returns 房间 */ async roomCreate(room) { @@ -808,7 +1327,7 @@ class Taoyao extends RemoteClient { } /** * 进入房间信令 - * + * * @param {*} roomId 房间ID * @param {*} password 房间密码 */ @@ -822,7 +1341,7 @@ class Taoyao extends RemoteClient { me.mediasoupDevice = new mediasoupClient.Device(); const response = await me.request( protocol.buildMessage("media::router::rtp::capabilities", { - roomId: me.roomId + roomId: me.roomId, }) ); const routerRtpCapabilities = response.body.rtpCapabilities; @@ -831,14 +1350,14 @@ class Taoyao extends RemoteClient { protocol.buildMessage("room::enter", { roomId: roomId, password: password, - rtpCapabilities: (me.audioConsume || me.videoConsume || me.audioProduce || me.videoProduce) ? me.mediasoupDevice.rtpCapabilities : undefined, - sctpCapabilities: (me.dataConsume || me.dataProduce) ? me.mediasoupDevice.sctpCapabilities : undefined, + rtpCapabilities: me.audioConsume || me.videoConsume || me.audioProduce || me.videoProduce ? me.mediasoupDevice.rtpCapabilities : undefined, + sctpCapabilities: me.dataConsume || me.dataProduce ? me.mediasoupDevice.sctpCapabilities : undefined, }) ); } /** * 进入房间信令 - * + * * @param {*} message 消息 */ defaultRoomEnter(message) { @@ -851,16 +1370,84 @@ class Taoyao extends RemoteClient { } } /** - * 离开房间信令 + * 踢出终端 * - * @param {*} message + * @param {*} clientId 终端ID + */ + roomExpel(clientId) { + const me = this; + if(!me.roomId) { + this.callbackError("没有进入房间"); + return; + } + me.push(protocol.buildMessage("room::expel", { + roomId: this.roomId, + clientId, + })); + } + /** + * 踢出终端信令 + * + * @param {*} message 消息 + */ + async defaultRoomExpel(message) { + const me = this; + me.roomLeave(); + } + /** + * 邀请终端 + * + * @param {*} clientId 终端ID + */ + roomInvite(clientId) { + const me = this; + if(!me.roomId) { + this.callbackError("没有进入房间"); + return; + } + me.push(protocol.buildMessage("room::invite", { + roomId: this.roomId, + clientId, + })); + } + /** + * 邀请终端信令 + * + * @param {*} message 消息 + */ + async defaultRoomInvite(message) { + const me = this; + // 默认进入,如果需要确认使用回调函数重写。 + const { roomId, password } = message.body; + // if(me.roomId) { + // this.callbackError(); + // return; + // } + await me.roomEnter(roomId, password); + await me.produceMedia(); + } + /** + * 离开房间 + */ + roomLeave() { + const me = this; + me.push(protocol.buildMessage("room::leave", { + roomId: me.roomId + })); + me.closeMedia(); + me.roomId = null; + me.remoteClients.clear(); + } + /** + * 离开房间信令 + * + * @param {*} message */ defaultRoomLeave(message) { const me = this; const { clientId } = message.body; me.remoteClients.delete(clientId); console.info("终端离开:", clientId); - // TODO:资源释放 } /** * 错误回调 @@ -895,6 +1482,12 @@ class Taoyao extends RemoteClient { ); return response.body; } + async clientList() { + const response = await this.request( + protocol.buildMessage("client::list", {}) + ); + return response.body; + } async clientList() { const response = await this.request( protocol.buildMessage("client::list", { roomId: self.roomId }) @@ -920,21 +1513,19 @@ class Taoyao extends RemoteClient { */ { const stream = await navigator.mediaDevices.getUserMedia({ audio: true }); - stream.getAudioTracks().forEach(audioTrack => { + stream.getAudioTracks().forEach((audioTrack) => { audioTrack.enabled = false; setTimeout(() => audioTrack.stop(), 30000); }); } - if (self.audioProduce || self.videoProduce) { + if (self.dataProduce || self.audioProduce || self.videoProduce) { const response = await self.request( protocol.buildMessage("media::transport::webrtc::create", { roomId: self.roomId, forceTcp: self.forceTcp, producing: true, consuming: false, - sctpCapabilities: self.dataProduce - ? self.mediasoupDevice.sctpCapabilities - : undefined, + sctpCapabilities: self.dataProduce ? self.mediasoupDevice.sctpCapabilities : undefined, }) ); const { @@ -982,7 +1573,7 @@ class Taoyao extends RemoteClient { "produce", async ({ kind, appData, rtpParameters }, callback, errback) => { try { - const { producerId } = await self.request( + const response = await self.request( protocol.buildMessage("media::produce", { kind, roomId: self.roomId, @@ -991,37 +1582,37 @@ class Taoyao extends RemoteClient { rtpParameters, }) ); + const { streamId, producerId } = response.body; callback({ id: producerId }); } catch (error) { errback(error); } } ); + // 生产数据 self.sendTransport.on( "producedata", - async ( - { label, protocol, appData, sctpStreamParameters }, - callback, - errback - ) => { + async ({ label, appData, protocol, sctpStreamParameters }, callback, errback) => { try { - const { id } = await self.request( - protocol.buildMessage("media::produceData", { + const response = await self.request( + taoyaoProtocol.buildMessage("media::data::produce", { label, + roomId: self.roomId, appData, protocol, transportId: self.sendTransport.id, sctpStreamParameters, }) ); - callback({ id }); + const { treamId, producerId } = response.body; + callback({ id: producerId }); } catch (error) { errback(error); } } ); } - if (self.audioConsume || self.videoConsume) { + if (self.dataConsume || self.audioConsume || self.videoConsume) { const self = this; const response = await self.request( protocol.buildMessage("media::transport::webrtc::create", { @@ -1029,9 +1620,7 @@ class Taoyao extends RemoteClient { forceTcp: self.forceTcp, producing: false, consuming: true, - sctpCapabilities: self.dataProduce - ? self.mediasoupDevice.sctpCapabilities - : undefined, + sctpCapabilities: self.dataProduce ? self.mediasoupDevice.sctpCapabilities : undefined, }) ); const { @@ -1079,8 +1668,14 @@ class Taoyao extends RemoteClient { } ); } + // 快速响应 this.produceAudio(); this.produceVideo(); + this.produceData(); + // 等待响应 + // await this.produceAudio(); + // await this.produceVideo(); + // await this.produceData(); } /** * 生产音频 @@ -1104,7 +1699,11 @@ class Taoyao extends RemoteClient { } track = tracks[0]; // TODO:验证修改API audioTrack.applyCapabilities - console.debug("音频信息:", track.getSettings(), track.getCapabilities()); + console.debug( + "音频信息:", + track.getSettings(), + track.getCapabilities() + ); this.audioProducer = await this.sendTransport.produce({ track, codecOptions: { @@ -1115,8 +1714,12 @@ class Taoyao extends RemoteClient { // codec : this._mediasoupDevice.rtpCapabilities.codecs // .find((codec) => codec.mimeType.toLowerCase() === 'audio/pcma') }); - self.audioActive = true; - self.track = track; + if (self.proxy && self.proxy.media) { + self.audioTrack = track; + self.proxy.media(track, this.audioProducer); + } else { + console.warn("终端没有实现服务代理:", self); + } // TODO:加密解密 // if (this._e2eKey && e2e.isSupported()) { // e2e.setupSenderTransform(this._micProducer.rtpSender); @@ -1145,46 +1748,26 @@ class Taoyao extends RemoteClient { if (!this.audioProducer) { return; } - this.audioProducer.close(); try { await this.request( protocol.buildMessage("media::producer::close", { + roomId: this.roomId, producerId: this.audioProducer.id, }) ); } catch (error) { console.error("关闭麦克风异常", error); } - this.audioProducer = null; } async pauseAudioProducer() { console.debug("静音麦克风"); - this.audioProducer.pause(); - try { - await this.request( - protocol.buildMessage("media::producer::pause", { - producerId: this.audioProducer.id, - }) - ); - } catch (error) { - console.error("静音麦克风异常", error); - // TODO:异常调用回调 - } + this.mediaProducerPause(this.audioProducer.id); } async resumeAudioProducer() { console.debug("恢复麦克风"); - this.audioProducer.resume(); - try { - await this.request( - protocol.buildMessage("media::producer::resume", { - producerId: this.audioProducer.id, - }) - ); - } catch (error) { - console.error("恢复麦克风异常", error); - } + this.mediaProducerResume(this.audioProducer.id); } /** @@ -1198,46 +1781,8 @@ class Taoyao extends RemoteClient { if (self.videoProducer) { return; } - let track; try { - if (self.videoSource === "file") { - // TODO:实现文件分享 - // const stream = await this._getExternalVideoStream(); - // track = stream.getVideoTracks()[0].clone(); - } else if (self.videoSource === "camera") { - console.debug("enableWebcam() | calling getUserMedia()"); - // TODO:参数 - const stream = await navigator.mediaDevices.getUserMedia({ - video: self.videoConfig, - }); - track = stream.getVideoTracks()[0]; - // TODO:验证修改API videoTrack.applyCapabilities - console.debug("视频信息:", track.getSettings(), track.getCapabilities()); - } else if (self.videoSource === "screen") { - const stream = await navigator.mediaDevices.getDisplayMedia({ - // 如果需要共享声音 - audio: false, - video: { - cursor: true, - width: { max: 1920 }, - height: { max: 1080 }, - frameRate: { max: 30 }, - logicalSurface: true, - displaySurface: "monitor", - }, - }); - track = stream.getVideoTracks()[0]; - } else { - // TODO:异常 - } - if(self.proxy && self.proxy.media) { - self.videoTrack = track; - self.proxy.media(track); - } else { - console.warn("终端没有实现服务代理:", self); - } - self.videoActive = true; - self.track = track; + let track = await self.getVideoTrack(); let codec; let encodings; const codecOptions = { @@ -1281,15 +1826,18 @@ class Taoyao extends RemoteClient { encodings, codecOptions, }); - + if (self.proxy && self.proxy.media) { + self.videoTrack = track; + self.proxy.media(track, this.videoProducer); + } else { + console.warn("终端没有实现服务代理:", self); + } // if (this._e2eKey && e2e.isSupported()) { // e2e.setupSenderTransform(this.videoProducer.rtpSender); // } - this.videoProducer.on("transportclose", () => { this.videoProducer = null; }); - this.videoProducer.on("trackended", () => { console.warn("video producer trackended", this.audioProducer); this.closeVideoProducer().catch(() => {}); @@ -1305,89 +1853,97 @@ class Taoyao extends RemoteClient { } } + /** + * 生产数据 + */ + async produceData() { + const me = this; + try { + const dataProducer = await me.sendTransport.produceData({ + ordered: false, + maxPacketLifeTime: 2000, + }); + me.dataProducer = dataProducer; + me.dataProducer.on("open", () => { + console.debug("dataProducer open:", me.dataProducer.id); + window.dataProducer = me.dataProducer; + }); + me.dataProducer.on("close", () => { + console.debug("dataProducer close:", me.dataProducer.id); + me.dataProducer = null; + }); + me.dataProducer.on("error", (error) => { + console.debug("dataProducer error:", me.dataProducer.id, error); + me.dataProducer.close(); + }); + me.dataProducer.on("transportclose", () => { + console.debug("dataProducer transportclose:", me.dataProducer.id); + me.dataProducer.close(); + }); + me.dataProducer.on("bufferedamountlow", () => { + console.debug("dataProducer bufferedamountlow:", me.dataProducer.id); + }); + } catch (error) { + me.callbackError("生产数据异常", error); + } + } + + /** + * 通过数据生产者发送数据 + * + * @param {*} data 数据 + */ + async sendDataProducer(data) { + const me = this; + if(!me.dataProducer) { + me.callbackError("数据生产者无效"); + return; + } + me.dataProducer.send(data); + } + async closeVideoProducer() { console.debug("disableWebcam()"); if (!this.videoProducer) { return; } - this.videoProducer.close(); try { await this.request( protocol.buildMessage("media::producer::close", { + roomId: this.roomId, producerId: this.videoProducer.id, }) - ); - } catch (error) { - console.error(error); - } - - this._webcamProducer = null; + ); + } catch (error) { + console.error(error); + } } async pauseVideoProducer() { console.debug("关闭摄像头"); - this.videoProducer.pause(); - try { - await this.request( - protocol.buildMessage("media::producer::pause", { - producerId: this.videoProducer.id, - }) - ); - } catch (error) { - console.error("关闭摄像头异常", error); - // TODO:异常调用回调 - } + this.mediaProducerPause(this.videoProducer.id); } async resumeVideoProducer() { console.debug("恢复摄像头"); - this.videoProducer.resume(); - try { - await this.request( - protocol.buildMessage("media::producer::resume", { - producerId: this.videoProducer.id, - }) - ); - } catch (error) { - console.error("恢复摄像头异常", error); - } + this.mediaProducerResume(this.videoProducer.id); } - async updateVideoConfig(config) { + /** + * 更新视频生产者 + */ + async updateVideoProducer() { + const me = this; console.debug("更新摄像头参数"); try { - this.videoProducer.track.stop(); - // TODO:screen、参数配置 - const stream = await navigator.mediaDevices.getUserMedia({ - video: true, - }); - const track = stream.getVideoTracks()[0]; + const track = await me.getVideoTrack(); await this.videoProducer.replaceTrack({ track }); + me.proxy.media(track, this.videoProducer); } catch (error) { console.error("changeWebcam() | failed: %o", error); } } - async pauseConsumer(consumer) { - if (consumer.paused) return; - try { - await this._protoo.request("pauseConsumer", { consumerId: consumer.id }); - consumer.pause(); - } catch (error) { - logger.error("_pauseConsumer() | failed:%o", error); - } - } - - async resumeConsumer(consumer) { - if (!consumer.paused) return; - try { - await this._protoo.request("resumeConsumer", { consumerId: consumer.id }); - consumer.resume(); - } catch (error) { - logger.error("_resumeConsumer() | failed:%o", error); - } - } - /** * 验证设备 */ @@ -1453,14 +2009,25 @@ class Taoyao extends RemoteClient { * 关闭媒体 */ closeMedia() { - let self = this; - if (self.sendTransport) { - self.sendTransport.close(); + let me = this; + if (me.sendTransport) { + me.sendTransport.close(); } - if (self.recvTransport) { - self.recvTransport.close(); + if (me.recvTransport) { + me.recvTransport.close(); } - }; + if(me.audioTrack) { + me.audioTrack.stop(); + } + if(me.videoTrack) { + me.videoTrack.stop(); + } + me.sendTransport = null; + me.recvTransport = null; + me.audioProducer = null; + me.videoProducer = null; + me.dataProducer = null; + } /** * 关闭资源 */ @@ -1470,7 +2037,7 @@ class Taoyao extends RemoteClient { if (me.signalChannel) { me.signalChannel.close(); } - }; + } } export { Taoyao }; diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Description.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Description.java index 55d7645..8f7ccf8 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Description.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Description.java @@ -14,7 +14,6 @@ import java.lang.annotation.Target; * =[消息类型]> 同步请求 * -[消息类型]) 全员广播:对所有的终端广播信令(排除自己) * +[消息类型]) 全员广播:对所有的终端广播信令(包含自己) - * ...:其他自定义的透传内容 * * @author acgist */ @@ -30,9 +29,6 @@ public @interface Description { String[] body() default { "{}" }; /** - * 同步:需要等待服务端数据时使用 - * 异步:不用等待服务端数据时使用(服务端能主动通知类型消息都能使用异步) - * * @return 数据流向 */ String[] flow() default { "终端->信令服务->终端" }; diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Prototype.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Prototype.java deleted file mode 100644 index 2e0a563..0000000 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Prototype.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.acgist.taoyao.boot.annotation; - -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -import org.springframework.beans.factory.config.ConfigurableBeanFactory; -import org.springframework.context.annotation.Scope; -import org.springframework.stereotype.Component; - -/** - * 模板:多例对象 - * - * @author acgist - */ -@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) -@Target(ElementType.TYPE) -@Component -@Retention(RetentionPolicy.RUNTIME) -@Documented -public @interface Prototype { - -} diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java index 0279e86..d298eaf 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java @@ -1,5 +1,7 @@ package com.acgist.taoyao.boot.config; +import java.util.function.BiFunction; + /** * 常量 * @@ -107,6 +109,10 @@ public interface Constant { * 密码 */ String PASSWORD = "password"; + /** + * 数据 + */ + String DATA = "data"; /** * 名称 */ @@ -228,4 +234,14 @@ public interface Constant { */ String SUBSCRIBE_TYPE = "subscribeType"; + /** + * 生产者ID生成器 + */ + public static final BiFunction STREAM_ID_PRODUCER = (type, producerId) -> type + "::" + producerId; + + /** + * 消费者ID生成器 + */ + public static final BiFunction STREAM_ID_CONSUMER = (producerStreamId, consumerId) -> producerStreamId + "->" + consumerId; + } diff --git a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml index 6a35078..27341ad 100644 --- a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml +++ b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml @@ -203,6 +203,8 @@ taoyao: password: taoyao # 定时任务 scheduled: + # 清理房间无效资源 + room: 0 0/5 * * * ? # 清理无效终端连接 client: 0 * * * * ? # 地址重写 diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java index 3c7d003..00ed093 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java @@ -58,7 +58,7 @@ public class DataConsumer extends OperatorAdapter { @Override public void remove() { log.info("移除数据消费者:{} - {}", this.streamId, this.consumerId); - this.room.getDataProducers().remove(this.consumerId); + this.room.getDataConsumers().remove(this.consumerId); this.dataProducer.getDataConsumers().remove(this.consumerId); this.consumerClient.getDataConsumers().remove(this.consumerId); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java index 72e9ecc..115e2d9 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java @@ -170,6 +170,18 @@ public class Room extends OperatorAdapter { return this.mediaClient.request(message); } + /** + * 单播消息 + * + * @param to 接收终端 + * @param message 消息 + */ + public void unicast(String to, Message message) { + this.clients.keySet().stream() + .filter(v -> Objects.equals(to, v.clientId())) + .forEach(v -> v.push(message)); + } + /** * 广播消息 * @@ -305,5 +317,16 @@ public class Room extends OperatorAdapter { ); this.clients.values().forEach(ClientWrapper::log); } + + /** + * 清理没有关联终端的资源 + */ + public void releaseUnknowClient() { + this.transports.values().stream().filter(v -> !this.clients.containsKey(v.getClient())).forEach(Transport::close); + this.consumers.values().stream().filter(v -> !this.clients.containsValue(v.getConsumerClient())).forEach(Consumer::close); + this.producers.values().stream().filter(v -> !this.clients.containsValue(v.getProducerClient())).forEach(Producer::close); + this.dataConsumers.values().stream().filter(v -> !this.clients.containsValue(v.getConsumerClient())).forEach(DataConsumer::close); + this.dataProducers.values().stream().filter(v -> !this.clients.containsValue(v.getProducerClient())).forEach(DataProducer::close); + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RoomManager.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RoomManager.java index ccaad4a..f9404bd 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RoomManager.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RoomManager.java @@ -5,6 +5,8 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CopyOnWriteArrayList; +import org.springframework.scheduling.annotation.Scheduled; + import com.acgist.taoyao.boot.annotation.Manager; import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.model.Message; @@ -37,6 +39,11 @@ public class RoomManager { this.clientManager = clientManager; this.rooms = new CopyOnWriteArrayList<>(); } + + @Scheduled(cron = "${taoyao.scheduled.room:0 0/5 * * * ?}") + public void scheduled() { + this.releaseUnknowClient(); + } /** * @param roomId 房间标识 @@ -161,5 +168,12 @@ public class RoomManager { ); this.rooms.forEach(Room::log); } + + /** + * 清理没有关联终端的资源 + */ + private void releaseUnknowClient() { + this.rooms.forEach(Room::releaseUnknowClient); + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolClientAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolClientAdapter.java index f85eecf..f13c733 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolClientAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolClientAdapter.java @@ -25,11 +25,11 @@ public abstract class ProtocolClientAdapter extends ProtocolAdapter { /** * 处理终端信令 * - * @param clientId 终端标识 + * @param clientId 终端标识 * @param clientType 终端类型 - * @param client 终端 - * @param message 消息 - * @param body 消息主体 + * @param client 终端 + * @param message 消息 + * @param body 消息主体 */ public void execute(String clientId, ClientType clientType, Client client, Message message, Map body) { } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolControlAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolControlAdapter.java index d771ceb..3b1a021 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolControlAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolControlAdapter.java @@ -33,13 +33,13 @@ public abstract class ProtocolControlAdapter extends ProtocolClientAdapter { /** * 处理终端控制信令 * - * @param clientId 终端标识 - * @param clientType 终端类型 - * @param room 房间 - * @param client 终端 + * @param clientId 终端标识 + * @param clientType 终端类型 + * @param room 房间 + * @param client 终端 * @param targetClient 目标 - * @param message 消息 - * @param body 消息主体 + * @param message 消息 + * @param body 消息主体 */ public void execute(String clientId, ClientType clientType, Client client, Client targetClient, Message message, Map body) { } @@ -48,7 +48,7 @@ public abstract class ProtocolControlAdapter extends ProtocolClientAdapter { * 请求终端执行控制信令 * * @param clientId 终端ID - * @param request 请求 + * @param request 请求 * * @return 响应 */ diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolRoomAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolRoomAdapter.java index 2fec447..5fe22de 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolRoomAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolRoomAdapter.java @@ -31,9 +31,7 @@ public abstract class ProtocolRoomAdapter extends ProtocolClientAdapter { if(!this.authenticate(room, client)) { throw MessageCodeException.of("终端没有房间权限:" + clientId); } - synchronized (room) { - this.execute(clientId, clientType, room, client, room.getMediaClient(), message, body); - } + this.execute(clientId, clientType, room, client, room.getMediaClient(), message, body); } /** diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index d0aa676..5a08e02 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -37,6 +37,12 @@ import lombok.extern.slf4j.Slf4j; 终端生产媒体当前房间所有终端根据订阅类型自动消费媒体 终端创建WebRTC消费通道根据订阅类型自动消费当前房间已有媒体 """, + body = """ + { + "roomId": "房间ID" + "producerId": "生产者ID" + } + """, flow = { "终端-[生产媒体]>信令服务-[其他终端消费])信令服务", "终端-[创建WebRTC消费通道]>信令服务-[消费其他终端])信令服务", @@ -118,7 +124,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) { final Client mediaClient = room.getMediaClient(); final String consumerClientId = consumerClientWrapper.getClientId(); - final String streamId = producer.getStreamId() + "->" + consumerClientId; + final String streamId = Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), consumerClientId); final ClientWrapper producerClientWrapper = producer.getProducerClient(); final String producerClientId = producerClientWrapper.getClientId(); if(consumerClientWrapper.consumed(producer)) { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java index e2b1c13..59bf80b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java @@ -69,6 +69,7 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A if(clientType.mediaClient()) { consumer.close(); } else if(clientType.mediaServer()) { + // TODO:路由到真实消费者 consumer.remove(); room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java index 01cd60f..19a37af 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java @@ -59,6 +59,7 @@ public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter implements A final Consumer consumer = room.consumer(consumerId); consumer.pause(); } else if(clientType.mediaServer()) { + // TODO:路由到真实消费者 room.broadcast(message); } else { this.logNoAdapter(clientType); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerRequestKeyFrameProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerRequestKeyFrameProtocol.java index 0badf9a..3f3ee86 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerRequestKeyFrameProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerRequestKeyFrameProtocol.java @@ -1,5 +1,52 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaConsumerRequestKeyFrameProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 请求关键帧信令 + * 注意:视频才有 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "consumerId": "消费者ID" + } + """, + flow = "终端->信令服务->媒体服务->信令服务->终端" +) +public class MediaConsumerRequestKeyFrameProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::consumer::request::key::frame"; + + public MediaConsumerRequestKeyFrameProtocol() { + super("请求关键帧信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + body.put(Constant.CLIENT_ID, clientId); + mediaClient.push(message); + } else if(clientType.mediaServer()) { + final String requestClientId = MapUtils.remove(body, Constant.CLIENT_ID); + room.unicast(requestClientId, message); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java index b8cfa61..5768b8a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java @@ -34,7 +34,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; ) public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements ApplicationListener { - public static final String SIGNAL = "media::consumer::resumt"; + public static final String SIGNAL = "media::consumer::resume"; public MediaConsumerResumeProtocol() { super("恢复消费者信令", SIGNAL); @@ -57,8 +57,9 @@ public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements if(clientType.mediaClient()) { final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final Consumer consumer = room.consumer(consumerId); - consumer.pause(); + consumer.resume(); } else if(clientType.mediaServer()) { + // TODO:路由到真实消费者 room.broadcast(message); } else { this.logNoAdapter(clientType); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerSetPreferredLayersProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerSetPreferredLayersProtocol.java index cddae89..75fbe35 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerSetPreferredLayersProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerSetPreferredLayersProtocol.java @@ -1,5 +1,57 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaConsumerSetPreferredLayersProtocol { +import java.util.Map; +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 修改最佳空间层和时间层信令 + * 空间层(spatialLayer):分辨率 + * 时间层(temporalLayer):帧率 + * 码率:数据大小和时间的比值 + * 注意:只有simulcast和SVC消费者有效 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "consumerId": "消费者ID", + "spatialLayer": 最佳空间层, + "temporalLayer": 最佳时间层 + } + """, + flow = "终端->信令服务->媒体服务->信令服务->终端" +) +public class MediaConsumerSetPreferredLayersProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::consumer::set::preferred::layers"; + + public MediaConsumerSetPreferredLayersProtocol() { + super("修改最佳空间层和时间层信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + body.put(Constant.CLIENT_ID, clientId); + mediaClient.push(message); + } else if(clientType.mediaServer()) { + final String requestClientId = MapUtils.remove(body, Constant.CLIENT_ID); + room.unicast(requestClientId, message); + } else { + this.logNoAdapter(clientType); + } + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerSetPriorityProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerSetPriorityProtocol.java index 64a82b9..6873293 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerSetPriorityProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerSetPriorityProtocol.java @@ -1,5 +1,45 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaConsumerSetPriorityProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 设置消费者优先级信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "consumerId": "消费者ID", + "priority": 优先级(1~255) + } + """ +) +public class MediaConsumerSetPriorityProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::consumer::set::priority"; + + public MediaConsumerSetPriorityProtocol() { + super("设置消费者优先级信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + client.push(mediaClient.request(message)); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java index f6d34d8..696c90f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java @@ -1,5 +1,96 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaDataConsumeProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.model.MessageCodeException; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.ClientWrapper; +import com.acgist.taoyao.signal.party.media.DataConsumer; +import com.acgist.taoyao.signal.party.media.DataProducer; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.party.media.Transport; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * 消费数据信令 + * + * @author acgist + */ +@Slf4j +@Protocol +@Description( + memo = """ + 数据通道消费者不会自动创建,需要用户自己订阅生产者。 + """, + body = """ + { + "roomId": "房间ID" + "producerId": "生产者ID", + } + """, + flow = { + "终端=>信令服务->媒体服务->信令服务->媒体服务" + } +) +public class MediaDataConsumeProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::data::consume"; + + public MediaDataConsumeProtocol() { + super("消费数据信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); + final DataProducer dataProducer = room.dataProducer(producerId); + if(dataProducer == null) { + throw MessageCodeException.of("没有提供数据生产:" + producerId); + } + if(clientType.mediaClient()) { + final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(client); + final String dataConsumerClientId = dataConsumerClientWrapper.getClientId(); + final ClientWrapper dataProducerClientWrapper = dataProducer.getProducerClient(); + final String dataProducerClientId = dataProducerClientWrapper.getClientId(); + final Transport recvTransport = dataConsumerClientWrapper.getRecvTransport(); + final String streamId = Constant.STREAM_ID_CONSUMER.apply(dataProducer.getStreamId(), dataConsumerClientId); + body.put(Constant.ROOM_ID, room.getRoomId()); + body.put(Constant.CLIENT_ID, dataConsumerClientId); + body.put(Constant.SOURCE_ID, dataProducerClientId); + body.put(Constant.STREAM_ID, streamId); + body.put(Constant.PRODUCER_ID, dataProducer.getProducerId()); + body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); + body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities()); + body.put(Constant.SCTP_CAPABILITIES, dataConsumerClientWrapper.getSctpCapabilities()); + mediaClient.push(message); + } else if(clientType.mediaServer()) { + final String streamId = MapUtils.get(body, Constant.STREAM_ID); + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final String dataConsumerClientId = MapUtils.get(body, Constant.CLIENT_ID); + final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(dataConsumerClientId); + final Map roomDataConsumers = room.getDataConsumers(); + final Map clientDataConsumers = dataConsumerClientWrapper.getDataConsumers(); + final Map producerDataConsumers = dataProducer.getDataConsumers(); + final DataConsumer dataConsumer = new DataConsumer(streamId, consumerId, room, dataProducer, dataConsumerClientWrapper); + final DataConsumer oldDataRoomConsumer = roomDataConsumers.put(consumerId, dataConsumer); + final DataConsumer oldDataClientConsumer = clientDataConsumers.put(consumerId, dataConsumer); + final DataConsumer oldDataProducerConsumer = producerDataConsumers.put(consumerId, dataConsumer); + if(oldDataRoomConsumer != null || oldDataClientConsumer != null || oldDataProducerConsumer != null) { + log.warn("消费者已经存在:{}", consumerId); + } + final Client consumeClient = dataConsumerClientWrapper.getClient(); + consumeClient.push(message); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java index 84ebb36..7fadb0e 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java @@ -69,6 +69,7 @@ public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implemen if(clientType.mediaClient()) { dataConsumer.close(); } else if(clientType.mediaServer()) { + // TODO:路由到真实消费者 dataConsumer.remove(); room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java index 2ec4c88..fe51cb9 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java @@ -1,5 +1,45 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaDataConsumerStatusProtocol { +import java.util.Map; +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 查询数据消费者状态信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "consumerId": "数据消费者ID" + } + """, + flow = "终端=>信令服务->媒体服务->信令服务->终端" +) +public class MediaDataConsumerStatusProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::data::consumer::status"; + + public MediaDataConsumerStatusProtocol() { + super("查询数据消费者状态信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + client.push(mediaClient.request(message)); + } else { + this.logNoAdapter(clientType); + } + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java index a25e6cf..d2b1b2b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java @@ -1,5 +1,75 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaDataProduceProtocol { +import java.util.Map; +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.ClientWrapper; +import com.acgist.taoyao.signal.party.media.DataProducer; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * 生产数据信令 + * + * @author acgist + */ +@Slf4j +@Protocol +@Description( + body = { + """ + { + "roomId": "房间标识", + "transportId": "通道标识" + } + """ + }, + flow = "终端->信令服务->媒体服务->信令服务->终端" +) +public class MediaDataProduceProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::data::produce"; + + public MediaDataProduceProtocol() { + super("生产数据信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + final String streamId = Constant.STREAM_ID_PRODUCER.apply(Constant.DATA, clientId); + body.put(Constant.CLIENT_ID, clientId); + body.put(Constant.STREAM_ID, streamId); + final Message response = room.request(message); + final Map responseBody = response.body(); + final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID); + final ClientWrapper producerClientWrapper = room.clientWrapper(client); + final Map roomDataProducers = room.getDataProducers(); + final Map clientDataProducers = producerClientWrapper.getDataProducers(); + final DataProducer dataProducer = new DataProducer(streamId, producerId, room, producerClientWrapper); + final DataProducer oldRoomDataProducer = roomDataProducers.put(producerId, dataProducer); + final DataProducer oldClientDataProducer = clientDataProducers.put(producerId, dataProducer); + if(oldRoomDataProducer != null || oldClientDataProducer != null) { + log.warn("数据生产者已经存在:{}", producerId); + } + final Message responseMessage = response.cloneWithoutBody(); + responseMessage.setBody(Map.of( + Constant.STREAM_ID, streamId, + Constant.PRODUCER_ID, producerId + )); + room.broadcast(responseMessage); + log.info("{}生产数据:{} - {}", clientId, streamId, producerId); + } else { + this.logNoAdapter(clientType); + } + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java index bb8d2ff..32f6c5c 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java @@ -66,6 +66,7 @@ public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implemen if(clientType.mediaClient()) { dataProducer.close(); } else if(clientType.mediaServer()) { + // TODO:路由到真实消费者 dataProducer.remove(); room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java index 142e096..201e949 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java @@ -1,5 +1,45 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaDataProducerStatusProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 查询数据生产者状态信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "producerId": "数据生产者ID" + } + """, + flow = "终端=>信令服务->媒体服务->信令服务->终端" +) +public class MediaDataProducerStatusProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::data::producer::status"; + + public MediaDataProducerStatusProtocol() { + super("查询数据生产者状态信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + client.push(mediaClient.request(message)); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaIceRestartProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaIceRestartProtocol.java index d44ad85..9d10d46 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaIceRestartProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaIceRestartProtocol.java @@ -1,10 +1,17 @@ package com.acgist.taoyao.signal.protocol.media; +import java.util.Map; + import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; /** - * 媒体重启ICE信令 + * 重启ICE信令 * * @author acgist */ @@ -25,10 +32,23 @@ import com.acgist.taoyao.boot.annotation.Protocol; } """ }, - flow = "终端->信令服务->媒体服务->信令服务->终端" + flow = "终端=>信令服务->媒体服务->信令服务->终端" ) -public class MediaIceRestartProtocol { +public class MediaIceRestartProtocol extends ProtocolRoomAdapter { public static final String SIGNAL = "media::ice::restart"; + public MediaIceRestartProtocol() { + super("重启ICE信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + client.push(mediaClient.request(message)); + } else { + this.logNoAdapter(clientType); + } + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java index 880f762..3d921cc 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java @@ -49,7 +49,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { if(clientType.mediaClient()) { final String kind = MapUtils.get(body, Constant.KIND); - final String streamId = kind + "::" + clientId; + final String streamId = Constant.STREAM_ID_PRODUCER.apply(kind, clientId); body.put(Constant.CLIENT_ID, clientId); body.put(Constant.STREAM_ID, streamId); final Message response = room.request(message); @@ -69,7 +69,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter { Constant.KIND, kind, Constant.STREAM_ID, streamId, Constant.PRODUCER_ID, producerId - )); + )); room.broadcast(responseMessage); log.info("{}生产媒体:{} - {}", clientId, streamId, producerId); this.publishEvent(new MediaConsumeEvent(room, producer)); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java index a6ab655..4f43882 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java @@ -66,6 +66,7 @@ public class MediaProducerCloseProtocol extends ProtocolRoomAdapter implements A if(clientType.mediaClient()) { producer.close(); } else if(clientType.mediaServer()) { + // TODO:路由到真实消费者 producer.remove(); room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java index f6f6a37..1bfeb54 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java @@ -57,6 +57,7 @@ public class MediaProducerPauseProtocol extends ProtocolRoomAdapter implements A final Producer producer = room.producer(producerId); producer.pause(); } else if(clientType.mediaServer()) { + // TODO:路由到真实消费者 room.broadcast(message); } else { this.logNoAdapter(clientType); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java index 2085f11..691cb5e 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java @@ -57,6 +57,7 @@ public class MediaProducerResumeProtocol extends ProtocolRoomAdapter implements final Producer producer = room.producer(producerId); producer.resume(); } else if(clientType.mediaServer()) { + // TODO:路由到真实消费者 room.broadcast(message); } else { this.logNoAdapter(clientType); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java index 40a036f..cb04b53 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java @@ -80,6 +80,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { clientWrapper.setRecvTransport(recvTransport); // 拷贝属性 recvTransport.copy(responseBody); + // 消费媒体:不能在连接时调用 this.publishEvent(new MediaConsumeEvent(room, clientWrapper)); } // 生产者 diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaVideoOrientationChangeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaVideoOrientationChangeProtocol.java index 3a2ee1b..a3d6f9d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaVideoOrientationChangeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaVideoOrientationChangeProtocol.java @@ -1,5 +1,41 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaVideoOrientationChangeProtocol { +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +/** + * 视频方向变化信令 + * + * @author acgist + */ +@Protocol +@Description( + body = """ + """, + flow = "媒体服务->信令服务->终端" +) +public class MediaVideoOrientationChangeProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::video::orientation::change"; + + public MediaVideoOrientationChangeProtocol() { + super("视频方向变化信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaServer()) { + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomExpelProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomExpelProtocol.java index 5f012ca..b24e882 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomExpelProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomExpelProtocol.java @@ -1,10 +1,48 @@ package com.acgist.taoyao.signal.protocol.room; +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + /** * 踢出房间信令 * * @author acgist */ -public class RoomExpelProtocol { +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "clientId": "终端ID" + } + """, + flow = "终端->信令服务->终端" +) +public class RoomExpelProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "room::expel"; + + public RoomExpelProtocol() { + super("踢出房间信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + final String expelClientId = MapUtils.get(body, Constant.CLIENT_ID); + room.unicast(expelClientId, message); + } else { + this.logNoAdapter(clientType); + } + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomInviteProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomInviteProtocol.java index 51de3f9..b4c6b04 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomInviteProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomInviteProtocol.java @@ -1,10 +1,50 @@ package com.acgist.taoyao.signal.protocol.room; +import java.util.Map; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + /** - * 邀请房间信令 + * 邀请终端信令 * * @author acgist */ -public class RoomInviteProtocol { +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "clientId": "终端ID", + "password": "密码(选填)" + } + """, + flow = "终端->信令服务->终端" +) +public class RoomInviteProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "room::invite"; + + public RoomInviteProtocol() { + super("邀请终端信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + if(clientType.mediaClient()) { + final String inviteClientId = MapUtils.get(body, Constant.CLIENT_ID); + body.put(Constant.PASSWORD, room.getPassword()); + this.clientManager.unicast(inviteClientId, message); + } else { + this.logNoAdapter(clientType); + } + } }