diff --git a/docs/Deploy.md b/docs/Deploy.md index 1b82bd3..7d11b7e 100644 --- a/docs/Deploy.md +++ b/docs/Deploy.md @@ -97,15 +97,15 @@ g++ -v ``` # 下载 -mkdir -p /data/nodejs -cd /data/nodejs +mkdir -p /data/dev/nodejs +cd /data/dev/nodejs wget https://nodejs.org/dist/v16.19.0/node-v16.19.0-linux-x64.tar.xz xz -d node-v16.19.0-linux-x64.tar.xz tar -xf node-v16.19.0-linux-x64.tar # 连接 -ln -sf /data/nodejs/node-v16.19.0-linux-x64/bin/npm /usr/local/bin/ -ln -sf /data/nodejs/node-v16.19.0-linux-x64/bin/node /usr/local/bin/ +ln -sf /data/dev/nodejs/node-v16.19.0-linux-x64/bin/npm /usr/local/bin/ +ln -sf /data/dev/nodejs/node-v16.19.0-linux-x64/bin/node /usr/local/bin/ # 验证 npm -v @@ -119,7 +119,7 @@ node -v npm install -g pm2 # 连接 -ln -sf /data/nodejs/node-v16.19.0-linux-x64/bin/pm2 /usr/local/bin/ +ln -sf /data/dev/nodejs/node-v16.19.0-linux-x64/bin/pm2 /usr/local/bin/ # 日志 pm2 install pm2-logrotate @@ -139,8 +139,8 @@ pm2 save ``` # 下载 -mkdir -p /data/java -cd /data/java +mkdir -p /data/dev/java +cd /data/dev/java wget https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz tar -zxvf openjdk-17.0.2_linux-x64_bin.tar.gz @@ -148,12 +148,12 @@ tar -zxvf openjdk-17.0.2_linux-x64_bin.tar.gz vim ~/.bash_profile --- -JAVA_HOME=/data/java/jdk-17.0.2 +JAVA_HOME=/data/dev/java/jdk-17.0.2 PATH=$PATH:$JAVA_HOME/bin --- . ~/.bash_profile -ln -sf /data/java/jdk-17.0.2/bin/java /usr/local/bin/java +ln -sf /data/dev/java/jdk-17.0.2/bin/java /usr/local/bin/java # 验证 java -version @@ -163,8 +163,8 @@ java -version ``` # 下载 -mkdir -p /data/maven -cd /data/maven +mkdir -p /data/dev/maven +cd /data/dev/maven wget https://dlcdn.apache.org/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.tar.gz tar -zxvf apache-maven-3.8.6-bin.tar.gz @@ -172,7 +172,7 @@ tar -zxvf apache-maven-3.8.6-bin.tar.gz vim ~/.bash_profile --- -MAVEN_HOME=/data/maven/apache-maven-3.8.6 +MAVEN_HOME=/data/dev/maven/apache-maven-3.8.6 PATH=$PATH:$MAVEN_HOME/bin --- @@ -191,8 +191,8 @@ yum install libffi-devel yum install openssl-devel # 下载 -mkdir -p /data/python -cd /data/python +mkdir -p /data/dev/python +cd /data/dev/python #wget https://www.python.org/ftp/python/3.8.16/Python-3.8.16.tar.xz wget https://mirrors.huaweicloud.com/python/3.8.16/Python-3.8.16.tar.xz xz -d Python-3.8.16.tar.xz diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 7f5d276..81de5b2 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -330,14 +330,13 @@ class Room { return; } me.close = true; - // TODO:测试是否需要这里释放 - // me.producers.forEach(v => v.close()); - // me.consumers.forEach(v => v.close()); - // me.dataProducers.forEach(v => v.close()); - // me.dataConsumers.forEach(v => v.close()); + me.producers.forEach(v => v.close()); + me.consumers.forEach(v => v.close()); + me.dataProducers.forEach(v => v.close()); + me.dataConsumers.forEach(v => v.close()); + me.transports.forEach(v => v.close()); me.audioLevelObserver.close(); me.activeSpeakerObserver.close(); - me.transports.forEach(v => v.close()); me.mediasoupRouter.close(); } } @@ -402,12 +401,30 @@ class Taoyao { case "media::consumer::close": me.mediaConsumerClose(message, body); break; + case "media::consumer::pause": + me.mediaConsumerPause(message, body); + break; + case "media::consumer::resume": + me.mediaConsumerResume(message, body); + break; case "media::produce": me.mediaProduce(message, body); break; + case "media::producer::close": + me.mediaProducerClose(message, body); + break; + case "media::producer::pause": + me.mediaProducerPause(message, body); + break; + case "media::producer::resume": + me.mediaProducerResume(message, body); + break; case "media::router::rtp::capabilities": me.mediaRouterRtpCapabilities(message, body); break; + case "media::transport::close": + this.mediaTransportClose(message, body); + break; case "media::transport::webrtc::connect": me.mediaTransportWebrtcConnect(message, body); break; @@ -569,11 +586,12 @@ class Taoyao { producer.close(); }); producer.on("score", (score) => { + console.info("producer score:", producer.id, score); self.push( protocol.buildMessage("media::producer::score", { + score: score, roomId: roomId, producerId: producer.id, - score, }) ); }); @@ -584,7 +602,7 @@ class Taoyao { console.info("producer trace:", producer.id, trace); }); producer.observer.on("close", () => { - if(me.producers.delete(producer.id)) { + if(room.producers.delete(producer.id)) { console.info("producer close:", producer.id); this.push( protocol.buildMessage("media::producer::close", { @@ -598,9 +616,21 @@ class Taoyao { }); producer.observer.on("pause", () => { console.info("producer pause:", producer.id); + this.push( + protocol.buildMessage("media::producer::pause", { + roomId: roomId, + producerId: producer.id + }) + ); }); producer.observer.on("resume", () => { console.info("producer resume:", producer.id); + this.push( + protocol.buildMessage("media::producer::resume", { + roomId: roomId, + producerId: producer.id + }) + ); }); // producer.observer.on("score", fn(score)); // producer.observer.on("videoorientationchange", fn(videoOrientation)); @@ -610,10 +640,77 @@ class Taoyao { if (producer.kind === "audio") { room.audioLevelObserver .addProducer({ producerId: producer.id }) - .catch(() => {}); + .catch((error) => { + console.error("音量监听异常:", error); + }); room.activeSpeakerObserver .addProducer({ producerId: producer.id }) - .catch(() => {}); + .catch((error) => { + console.error("声音监听异常:", error); + }); + } + } + + /** + * 关闭生产者信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaProducerClose(message, body) { + const { + roomId, + producerId, + } = body; + const room = this.rooms.get(roomId); + const producer = room?.producers.get(producerId); + if(producer) { + console.info("关闭生产者:", producerId); + await producer.close(); + } else { + console.info("关闭生产者无效:", producerId); + } + } + + /** + * 暂停生产者信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaProducerPause(message, body) { + const { + roomId, + producerId, + } = body; + const room = this.rooms.get(roomId); + const producer = room.producers.get(producerId); + if(producer) { + console.info("暂停生产者:", producerId); + await producer.pause(); + } else { + console.info("暂停生产者无效:", producerId); + } + } + + /** + * 恢复生产者信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaProducerResume(message, body) { + const { + roomId, + producerId, + } = body; + const room = this.rooms.get(roomId); + const producer = room.producers.get(producerId); + if(producer) { + console.info("恢复生产者:", producerId); + await producer.resume(); + } else { + console.info("恢复生产者无效:", producerId); } } @@ -680,20 +777,17 @@ class Taoyao { room.consumers.set(consumer.id, consumer); consumer.on("transportclose", () => { console.info("consumer transportclose:", consumer.id); - // 信令服务统一调度关闭 - // consumer.close(); - // room.consumers.delete(consumer.id); + consumer.close(); }); consumer.on("producerclose", () => { console.info("consumer producerclose:", consumer.id); - // 信令服务统一调度关闭 - // consumer.close(); - // room.consumers.delete(consumer.id); + consumer.close(); }); consumer.on("producerpause", () => { console.info("consumer producerpause:", consumer.id); this.push( protocol.buildMessage("media::consumer::pause", { + roomId: roomId, consumerId: consumer.id, }) ); @@ -702,6 +796,7 @@ class Taoyao { console.info("consumer producerresume:", consumer.id); this.push( protocol.buildMessage("media::consumer::resume", { + roomId: roomId, consumerId: consumer.id, }) ); @@ -720,6 +815,7 @@ class Taoyao { console.info("consumer layerschange:", consumer.id, layers); this.push( protocol.buildMessage("media::consumer::layers::change", { + roomId: roomId, consumerId: consumer.id, spatialLayer: layers ? layers.spatialLayer : null, temporalLayer: layers ? layers.temporalLayer : null, @@ -734,7 +830,7 @@ class Taoyao { // }); consumer.observer.on("close", () => { if(room.consumers.delete(consumer.id)) { - console.debug("consumer close:", consumer.id); + console.info("consumer close:", consumer.id); this.push( protocol.buildMessage("media::consumer::close", { roomId: roomId, @@ -746,6 +842,7 @@ class Taoyao { } }); consumer.observer.on("pause", () => { + console.info("consumer pause:", consumer.id); this.push( protocol.buildMessage("media::consumer::pause", { roomId: roomId, @@ -754,6 +851,7 @@ class Taoyao { ); }); consumer.observer.on("resume", () => { + console.info("consumer resume:", consumer.id); this.push( protocol.buildMessage("media::consumer::resume", { roomId: roomId, @@ -805,15 +903,51 @@ class Taoyao { * @param {*} message 消息 * @param {*} body 消息主体 */ - mediaConsumerClose(message, body) { + async mediaConsumerClose(message, body) { + const { roomId, consumerId } = body; + const room = this.rooms.get(roomId); + const consumer = room?.consumers.get(consumerId); + if(consumer) { + console.info("关闭消费者:", consumerId); + await consumer.close(); + } else { + console.info("关闭消费者无效:", consumerId); + } + } + + /** + * 暂停消费者信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaConsumerPause(message, body) { const { roomId, consumerId } = body; const room = this.rooms.get(roomId); const consumer = room.consumers.get(consumerId); if(consumer) { - console.info("关闭消费者:", consumerId); - consumer.close(); + console.info("暂停消费者:", consumerId); + await consumer.pause(); } else { - console.debug("关闭消费者无效:", consumerId); + console.info("暂停消费者无效:", consumerId); + } + } + + /** + * 恢复消费者信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaConsumerResume(message, body) { + const { roomId, consumerId } = body; + const room = this.rooms.get(roomId); + const consumer = room.consumers.get(consumerId); + if(consumer) { + console.info("恢复消费者:", consumerId); + await consumer.resume(); + } else { + console.info("恢复消费者无效:", consumerId); } } @@ -830,6 +964,24 @@ class Taoyao { this.push(message); } + /** + * 关闭传输通道信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaTransportClose(message, body) { + const { roomId, transportId } = body; + const room = this.rooms.get(roomId); + const transport = room.transports.get(transportId); + if(transport) { + console.info("关闭传输通道:", transportId); + transport.close(); + } else { + console.info("关闭传输通道无效:", transportId); + } + } + /** * 连接WebRTC通道信令 * @@ -892,20 +1044,30 @@ class Taoyao { console.debug("transport trace:", transport.id, trace); }); transport.observer.on("close", () => { - console.info("transport close:", transport.id); - }); - transport.observer.on("newproducer", (producer) => { - console.info("transport newproducer:", transport.id, producer.id); - }); - transport.observer.on("newconsumer", (consumer) => { - console.info("transport newconsumer:", transport.id, consumer.id); - }); - transport.observer.on("newdataproducer", (dataProducer) => { - console.info("transport newdataproducer:", transport.id, dataProducer.id); - }); - transport.observer.on("newdataconsumer", (dataConsumer) => { - console.info("transport newdataconsumer:", transport.id, dataProducer.id); + if(room.transports.delete(transport.id)) { + console.info("transport close:", transport.id); + self.push( + protocol.buildMessage("media::transport::close", { + roomId: roomId, + transportId: transport.id, + }) + ); + } else { + console.info("transport close non:", transport.id); + } }); + // transport.observer.on("newproducer", (producer) => { + // console.info("transport newproducer:", transport.id, producer.id); + // }); + // transport.observer.on("newconsumer", (consumer) => { + // console.info("transport newconsumer:", transport.id, consumer.id); + // }); + // transport.observer.on("newdataproducer", (dataProducer) => { + // console.info("transport newdataproducer:", transport.id, dataProducer.id); + // }); + // transport.observer.on("newdataconsumer", (dataConsumer) => { + // console.info("transport newdataconsumer:", transport.id, dataProducer.id); + // }); // transport.observer.on("trace", fn(trace)); /********************* webRtcTransport通道事件 *********************/ // transport.on("icestatechange", (iceState) => { @@ -1020,8 +1182,8 @@ class Taoyao { activeSpeakerObserver, }); me.rooms.set(roomId, room); - console.info("roomCreate:", roomId, mediasoupRouter.id); me.push(message); + console.info("创建房间:", roomId, mediasoupRouter.id); mediasoupRouter.on("workerclose", () => { console.info("mediasoupRouter workerclose:", roomId, mediasoupRouter.id); room.closeAll(); diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index 15d8c63..8af96e1 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -349,9 +349,11 @@ class Taoyao extends RemoteClient { username, password, roomId, + // TODO:修改默认关闭 dataConsume = true, audioConsume = true, videoConsume = true, + // TODO:修改默认关闭 dataProduce = true, audioProduce = true, videoProduce = true, @@ -784,7 +786,7 @@ class Taoyao extends RemoteClient { return; } console.info("关闭房间:", roomId); - me.close(); + me.closeMedia(); } /** * 创建房间信令 @@ -1450,7 +1452,7 @@ class Taoyao extends RemoteClient { /** * 关闭媒体 */ - closeMedia = function () { + closeMedia() { let self = this; if (self.sendTransport) { self.sendTransport.close(); @@ -1460,13 +1462,13 @@ class Taoyao extends RemoteClient { } }; /** - * 关闭 + * 关闭资源 */ - close = function () { - let self = this; - self.closeMedia(); - if (self.signalChannel) { - self.signalChannel.close(); + close() { + let me = this; + me.closeMedia(); + if (me.signalChannel) { + me.signalChannel.close(); } }; } diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java index f47dbdd..0279e86 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java @@ -11,10 +11,6 @@ public interface Constant { * 接收方的终端标识 */ String TO = "to"; - /** - * 换行 - */ - String LINE = "\n"; /** * IP */ diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/SocketProperties.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/SocketProperties.java index feb1117..ee65f9d 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/SocketProperties.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/SocketProperties.java @@ -17,12 +17,42 @@ import lombok.Setter; @ConfigurationProperties(prefix = "taoyao.socket") public class SocketProperties { + /** + * 机密策略 + * + * @author acgist + */ + @Getter + public enum Encrypt { + + // AES + AES("AES/ECB/PKCS5Padding"), + // DES + DES("DES/ECB/PKCS5Padding"), + // 明文 + PLAINTEXT(null); + + /** + * 算法 + */ + private final String algo; + + private Encrypt(String algo) { + this.algo = algo; + } + + } + @Schema(title = "是否启用", description = "是否启用") private Boolean enabled; @Schema(title = "监听地址", description = "监听地址") private String host; @Schema(title = "监听端口", description = "监听端口") private Integer port; + @Schema(title = "加密策略", description = "加密策略") + private Encrypt encrypt; + @Schema(title = "加密密钥", description = "加密密钥:为空自动生成") + private String encryptKey; @Schema(title = "超时时间", description = "超时时间") private Long timeout; @Schema(title = "队列长度", description = "队列长度") @@ -37,5 +67,7 @@ public class SocketProperties { private Long keepAliveTime; @Schema(title = "缓冲大小", description = "缓冲大小") private Integer bufferSize; + @Schema(title = "最大缓冲大小", description = "最大缓冲大小") + private Integer maxBufferSize; } diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/configuration/WebMvcConfigurerAutoConfiguration.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/configuration/WebMvcConfigurerAutoConfiguration.java index f1d9f4f..94a35e5 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/configuration/WebMvcConfigurerAutoConfiguration.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/configuration/WebMvcConfigurerAutoConfiguration.java @@ -32,7 +32,9 @@ public class WebMvcConfigurerAutoConfiguration implements WebMvcConfigurer { .sorted((a, z) -> a.getValue().compareTo(z.getValue())) .forEach(entry -> { final InterceptorAdapter value = entry.getValue(); - log.info("注册MVC拦截器:{} - {}", String.format("%-32s", entry.getKey()), value.name()); + if(log.isDebugEnabled()) { + log.debug("注册MVC拦截器:{} - {}", String.format("%-32s", entry.getKey()), value.name()); + } registry.addInterceptor(value).addPathPatterns(value.pathPattern()); }); } diff --git a/taoyao-signal-server/taoyao-server/src/main/java/com/acgist/taoyao/controller/ConfigController.java b/taoyao-signal-server/taoyao-server/src/main/java/com/acgist/taoyao/controller/ConfigController.java index 6e45aee..3de4f12 100644 --- a/taoyao-signal-server/taoyao-server/src/main/java/com/acgist/taoyao/controller/ConfigController.java +++ b/taoyao-signal-server/taoyao-server/src/main/java/com/acgist/taoyao/controller/ConfigController.java @@ -5,6 +5,7 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.acgist.taoyao.boot.config.MediaProperties; +import com.acgist.taoyao.boot.config.SocketProperties; import com.acgist.taoyao.boot.config.WebrtcProperties; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.config.camera.CameraProperties; @@ -27,11 +28,18 @@ public class ConfigController { private final MediaProperties mediaProperties; private final CameraProperties cameraProperties; + private final SocketProperties socketProperties; private final WebrtcProperties webrtcProperties; - public ConfigController(MediaProperties mediaProperties, CameraProperties cameraProperties, WebrtcProperties webrtcProperties) { + public ConfigController( + MediaProperties mediaProperties, + CameraProperties cameraProperties, + SocketProperties socketProperties, + WebrtcProperties webrtcProperties + ) { this.mediaProperties = mediaProperties; this.cameraProperties = cameraProperties; + this.socketProperties = socketProperties; this.webrtcProperties = webrtcProperties; } @@ -48,6 +56,13 @@ public class ConfigController { private Message camera() { return Message.success(this.cameraProperties); } + + @Operation(summary = "Socket配置", description = "Socket配置") + @GetMapping("/socket") + @ApiResponse(content = @Content(schema = @Schema(implementation = SocketProperties.class))) + public Message socket() { + return Message.success(this.socketProperties); + } @Operation(summary = "WebRTC配置", description = "WebRTC配置") @GetMapping("/webrtc") diff --git a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml index 5325b1c..6a35078 100644 --- a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml +++ b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml @@ -131,6 +131,8 @@ taoyao: enabled: true host: 0.0.0.0 port: 9999 + encrypt: DES + encrypt-key: timeout: ${taoyao.timeout} queue-size: 100000 min-thread: 4 @@ -138,9 +140,10 @@ taoyao: thread-name-prefix: ${spring.application.name}-signal- keep-alive-time: 60000 buffer-size: 2048 + max-buffer-size: 32768 # WebRTC配置 webrtc: - # 是否加密 + # 是否加密:E2E encrypt: false # STUN服务 stun: diff --git a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java index 3f00c52..732a733 100644 --- a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java +++ b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java @@ -1,16 +1,21 @@ package com.acgist.taoyao.signal; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; +import java.nio.ByteBuffer; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.crypto.Cipher; import org.junit.jupiter.api.Test; -import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.config.SocketProperties.Encrypt; +import com.acgist.taoyao.boot.model.MessageCodeException; +import com.acgist.taoyao.signal.utils.CipherUtils; import lombok.extern.slf4j.Slf4j; @@ -21,38 +26,81 @@ public class SocketSignalTest { void testSocket() throws Exception { final Socket socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1", 9999)); - final OutputStream outputStream = socket.getOutputStream(); + final AtomicInteger recvIndex = new AtomicInteger(); final InputStream inputStream = socket.getInputStream(); - final String line = Constant.LINE; - final int lineLength = line.length(); + final OutputStream outputStream = socket.getOutputStream(); + // 随机密码:https://localhost:8888/config/socket + final String secret = """ + Oi7ZvxZEcOU= + """.strip(); + final Cipher encrypt = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, Encrypt.DES, secret); + final Cipher decrypt = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, Encrypt.DES, secret); + // 接收 new Thread(() -> { - int index = 0; int length = 0; + short messageLength = 0; final byte[] bytes = new byte[1024]; - final StringBuilder builder = new StringBuilder(); + final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024); try { while((length = inputStream.read(bytes)) >= 0) { - builder.append(new String(bytes, 0, length)); - while((index = builder.indexOf(line)) >= 0) { - log.info("收到消息:{}", builder.substring(0, index)); - builder.delete(0, index + lineLength); - } + buffer.put(bytes, 0, length); + while(buffer.position() > 0) { + if(messageLength <= 0) { + if(buffer.position() < Short.BYTES) { + // 不够消息长度 + break; + } else { + buffer.flip(); + messageLength = buffer.getShort(); + buffer.compact(); + if(messageLength > 16 * 1024) { + throw MessageCodeException.of("超过最大数据大小:" + messageLength); + } + } + } else { + if(buffer.position() < messageLength) { + // 不够消息长度 + break; + } else { + final byte[] message = new byte[messageLength]; + messageLength = 0; + buffer.flip(); + buffer.get(message); + buffer.compact(); + log.debug("收到消息:{}", new String(decrypt.doFinal(message))); + recvIndex.incrementAndGet(); + } + } + } } - } catch (IOException e) { + } catch (Exception e) { log.error("读取异常", e); } }).start(); + // 发送 + final AtomicInteger sendIndex = new AtomicInteger(); final Executor executor = Executors.newFixedThreadPool(10); for (int index = 0; index < 100; index++) { executor.execute(() -> { try { - outputStream.write(("{}" + line).getBytes()); - } catch (IOException e) { + final byte[] bytes = ("{\"time\":" + System.nanoTime() + "}").getBytes(); + final byte[] encryptBytes = encrypt.doFinal(bytes); + final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + encryptBytes.length); + buffer.putShort((short) encryptBytes.length); + buffer.put(encryptBytes); + buffer.flip(); + final byte[] message = new byte[buffer.capacity()]; + buffer.get(message); + outputStream.write(message); + sendIndex.incrementAndGet(); + } catch (Exception e) { log.error("发送异常", e); } }); } Thread.sleep(5000); + log.info("发送数据:{}", sendIndex.get()); + log.info("接收数据:{}", recvIndex.get()); socket.close(); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java index 49c1fa9..445b8a6 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java @@ -7,10 +7,15 @@ import java.nio.channels.AsynchronousSocketChannel; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import com.acgist.taoyao.boot.config.Constant; +import javax.crypto.BadPaddingException; +import javax.crypto.Cipher; +import javax.crypto.IllegalBlockSizeException; + +import com.acgist.taoyao.boot.config.SocketProperties; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.MessageCodeException; import com.acgist.taoyao.signal.client.ClientAdapter; +import com.acgist.taoyao.signal.utils.CipherUtils; import lombok.Getter; import lombok.Setter; @@ -26,20 +31,15 @@ import lombok.extern.slf4j.Slf4j; @Setter public class SocketClient extends ClientAdapter { - /** - * 换行符号 - */ - private final byte[] line; - /** - * 换行符号长度 - */ - private final int lineLength; - - public SocketClient(Long timeout, AsynchronousSocketChannel instance) { - super(timeout, instance); + /** + * 加密工具 + */ + private final Cipher cipher; + + public SocketClient(SocketProperties socketProperties, AsynchronousSocketChannel instance) { + super(socketProperties.getTimeout(), instance); this.ip = this.clientIp(instance); - this.line = Constant.LINE.getBytes(); - this.lineLength = this.line.length; + this.cipher = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptKey()); } @Override @@ -47,10 +47,12 @@ public class SocketClient extends ClientAdapter { synchronized (this.instance) { try { if(this.instance.isOpen()) { - final byte[] bytes = message.toString().getBytes(); - final ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length + this.lineLength); + // 加密 + final byte[] bytes = this.encrypt(message); + // 发送 + final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + bytes.length); + buffer.putShort((short) bytes.length); buffer.put(bytes); - buffer.put(this.line); buffer.flip(); final Future future = this.instance.write(buffer); future.get(this.timeout, TimeUnit.MILLISECONDS); @@ -76,4 +78,21 @@ public class SocketClient extends ClientAdapter { } } + /** + * @param message 消息 + * + * @return 加密消息 + */ + private byte[] encrypt(Message message) { + final byte[] bytes = message.toString().getBytes(); + if(this.cipher != null) { + try { + return this.cipher.doFinal(bytes); + } catch (IllegalBlockSizeException | BadPaddingException e) { + log.error("加密异常:{}", message); + } + } + return bytes; + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalAcceptHandler.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalAcceptHandler.java index 52d288d..df86d59 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalAcceptHandler.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalAcceptHandler.java @@ -42,11 +42,11 @@ public final class SocketSignalAcceptHandler implements CompletionHandler= 0) { - final String message = this.builder.substring(0, index); - this.builder.delete(0, index + this.lineLength); - log.debug("Socket信令消息:{}-{}", this.channel, message); - try { - this.protocolManager.execute(message.strip(), this.channel); - } catch (Exception e) { - log.error("处理Socket信令消息异常:{}-{}", this.clientManager.clients(this.channel), message, e); - this.clientManager.push(this.channel, this.platformErrorProtocol.build(e)); - } + this.buffer.put(buffer); + while(this.buffer.position() > 0) { + if(this.messageLength <= 0) { + if(this.buffer.position() < Short.BYTES) { + // 不够消息长度 + break; + } else { + this.buffer.flip(); + this.messageLength = this.buffer.getShort(); + this.buffer.compact(); + if(this.messageLength < 0 || this.messageLength > this.maxBufferSize) { + throw MessageCodeException.of("信令消息长度错误:" + this.messageLength); + } + } + } else { + if(this.buffer.position() < this.messageLength) { + // 不够消息长度 + break; + } else { + // 拆包 + final byte[] bytes = new byte[this.messageLength]; + this.messageLength = 0; + this.buffer.flip(); + this.buffer.get(bytes); + this.buffer.compact(); + // 解密 + final String message = this.decrypt(bytes); + log.debug("Socket信令消息:{} - {}", this.channel, message); + // 处理 + this.execute(message.strip()); + } + } } } this.loopMessage(); @@ -120,4 +149,32 @@ public final class SocketSignalMessageHandler implements CompletionHandler { + final byte[] bytes = new byte[16]; + random.nextBytes(bytes); + socketProperties.setEncryptKey(Base64.getMimeEncoder().encodeToString(bytes)); + } + case DES -> { + final byte[] bytes = new byte[8]; + random.nextBytes(bytes); + socketProperties.setEncryptKey(Base64.getMimeEncoder().encodeToString(bytes)); + } + default -> { + // 其他情况使用明文 + } } + log.info("Socket信令加密密码:{}", socketProperties.getEncryptKey()); + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/ProtocolController.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/ProtocolController.java index 94ca7e6..4a4d990 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/ProtocolController.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/ProtocolController.java @@ -11,7 +11,6 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.acgist.taoyao.boot.annotation.Description; -import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.signal.protocol.Protocol; import io.swagger.v3.oas.annotations.Operation; @@ -38,6 +37,7 @@ public class ProtocolController { @Operation(summary = "信令列表", description = "信令列表Markdown") @GetMapping("/list") public String list() { + final String newLine = System.lineSeparator(); final StringBuilder builder = new StringBuilder(""" ## 信令格式 @@ -94,21 +94,21 @@ public class ProtocolController { // 信令名称 builder.append("### ").append(name) .append("(").append(signal).append(")") - .append(Constant.LINE).append(Constant.LINE); + .append(newLine).append(newLine); // 描述信息 final String memo = description.memo().strip(); if(StringUtils.isNotEmpty(memo)) { - builder.append(memo).append(Constant.LINE).append(Constant.LINE); + builder.append(memo).append(newLine).append(newLine); } // 消息主体 builder - .append("```").append(Constant.LINE) - .append("# 消息主体").append(Constant.LINE); - Stream.of(description.body()).forEach(line -> builder.append(line.strip()).append(Constant.LINE)); + .append("```").append(newLine) + .append("# 消息主体").append(newLine); + Stream.of(description.body()).forEach(line -> builder.append(line.strip()).append(newLine)); // 数据流向 - builder.append("# 数据流向").append(Constant.LINE); - Stream.of(description.flow()).forEach(line -> builder.append(line.strip()).append(Constant.LINE)); - builder.append("```").append(Constant.LINE).append(Constant.LINE); + builder.append("# 数据流向").append(newLine); + Stream.of(description.flow()).forEach(line -> builder.append(line.strip()).append(newLine)); + builder.append("```").append(newLine).append(newLine); }); return builder.toString(); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomEventAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomEventAdapter.java index 60e271f..4621071 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomEventAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/RoomEventAdapter.java @@ -42,5 +42,5 @@ public class RoomEventAdapter extends ApplicationEventAdapter { public Client getMediaClient() { return this.room.getMediaClient(); } - + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerPauseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerPauseEvent.java new file mode 100644 index 0000000..7d00910 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerPauseEvent.java @@ -0,0 +1,30 @@ +package com.acgist.taoyao.signal.event.media; + +import com.acgist.taoyao.signal.event.RoomEventAdapter; +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 暂停消费者事件 + * + * @author acgist + */ +@Getter +@Setter +public class MediaConsumerPauseEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 消费者ID + */ + private final String consumerId; + + public MediaConsumerPauseEvent(String consumerId, Room room) { + super(room); + this.consumerId = consumerId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerResumeEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerResumeEvent.java new file mode 100644 index 0000000..0d29168 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaConsumerResumeEvent.java @@ -0,0 +1,30 @@ +package com.acgist.taoyao.signal.event.media; + +import com.acgist.taoyao.signal.event.RoomEventAdapter; +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 恢复消费者信令 + * + * @author acgist + */ +@Getter +@Setter +public class MediaConsumerResumeEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 消费者ID + */ + private final String consumerId; + + public MediaConsumerResumeEvent(String consumerId, Room room) { + super(room); + this.consumerId = consumerId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaDataConsumerCloseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaDataConsumerCloseEvent.java new file mode 100644 index 0000000..2576c25 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaDataConsumerCloseEvent.java @@ -0,0 +1,30 @@ +package com.acgist.taoyao.signal.event.media; + +import com.acgist.taoyao.signal.event.RoomEventAdapter; +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 关闭数据消费者事件 + * + * @author acgist + */ +@Getter +@Setter +public class MediaDataConsumerCloseEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 消费者ID + */ + private final String consumerId; + + public MediaDataConsumerCloseEvent(String consumerId, Room room) { + super(room); + this.consumerId = consumerId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaDataProducerCloseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaDataProducerCloseEvent.java new file mode 100644 index 0000000..5e59210 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaDataProducerCloseEvent.java @@ -0,0 +1,30 @@ +package com.acgist.taoyao.signal.event.media; + +import com.acgist.taoyao.signal.event.RoomEventAdapter; +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 关闭数据生产者事件 + * + * @author acgist + */ +@Getter +@Setter +public class MediaDataProducerCloseEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 生产者ID + */ + private final String producerId; + + public MediaDataProducerCloseEvent(String producerId, Room room) { + super(room); + this.producerId = producerId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerPauseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerPauseEvent.java new file mode 100644 index 0000000..ec8403b --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerPauseEvent.java @@ -0,0 +1,30 @@ +package com.acgist.taoyao.signal.event.media; + +import com.acgist.taoyao.signal.event.RoomEventAdapter; +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 暂停生产者事件 + * + * @author acgist + */ +@Getter +@Setter +public class MediaProducerPauseEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 生产者ID + */ + private final String producerId; + + public MediaProducerPauseEvent(String producerId, Room room) { + super(room); + this.producerId = producerId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerResumeEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerResumeEvent.java new file mode 100644 index 0000000..c1e5faa --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/MediaProducerResumeEvent.java @@ -0,0 +1,30 @@ +package com.acgist.taoyao.signal.event.media; + +import com.acgist.taoyao.signal.event.RoomEventAdapter; +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 恢复生产者事件 + * + * @author acgist + */ +@Getter +@Setter +public class MediaProducerResumeEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 生产者ID + */ + private final String producerId; + + public MediaProducerResumeEvent(String producerId, Room room) { + super(room); + this.producerId = producerId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/TransportCloseEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/TransportCloseEvent.java new file mode 100644 index 0000000..5da5875 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/media/TransportCloseEvent.java @@ -0,0 +1,30 @@ +package com.acgist.taoyao.signal.event.media; + +import com.acgist.taoyao.signal.event.RoomEventAdapter; +import com.acgist.taoyao.signal.party.media.Room; + +import lombok.Getter; +import lombok.Setter; + +/** + * 关闭通道事件 + * + * @author acgist + */ +@Getter +@Setter +public class TransportCloseEvent extends RoomEventAdapter { + + private static final long serialVersionUID = 1L; + + /** + * 通道ID + */ + private final String transportId; + + public TransportCloseEvent(String transportId, Room room) { + super(room); + this.transportId = transportId; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java index af0638e..2e78886 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/ClientWrapper.java @@ -20,42 +20,6 @@ import lombok.extern.slf4j.Slf4j; @Setter public class ClientWrapper implements AutoCloseable { - /** - * 媒体订阅类型 - * - * @author acgist - */ - public enum SubscribeType { - - // 订阅所有媒体 - ALL, - // 订阅所有音频媒体 - ALL_AUDIO, - // 订阅所有视频媒体 - ALL_VIDEO, - // 没有订阅任何媒体 - NONE; - - public static final SubscribeType of(String value) { - for (SubscribeType type : SubscribeType.values()) { - if(type.name().equalsIgnoreCase(value)) { - return type; - } - } - return SubscribeType.ALL; - } - - public boolean canConsume(Producer producer) { - return switch (this) { - case NONE -> false; - case ALL_AUDIO -> producer.getKind() == Kind.AUDIO; - case ALL_VIDEO -> producer.getKind() == Kind.VIDEO; - default -> true; - }; - } - - } - /** * 房间 */ @@ -105,11 +69,13 @@ public class ClientWrapper implements AutoCloseable { */ private final Map consumers; /** - * 数据通道生产者 + * 数据生产者 + * 其他终端消费当前终端的消费者 */ private final Map dataProducers; /** - * 数据通道消费者 + * 数据消费者 + * 当前终端消费其他终端的消费者 */ private final Map dataConsumers; @@ -153,19 +119,24 @@ public class ClientWrapper implements AutoCloseable { @Override public void close() { - // TODO:释放资源:通道、消费者、生产者 - this.consumers.forEach((k, v) -> v.close()); - this.producers.forEach((k, v) -> v.close()); - // TODO:实现 - this.recvTransport.close(); - this.sendTransport.close(); + // 注意:不要关闭终端 + this.consumers.values().forEach(Consumer::close); + this.producers.values().forEach(Producer::close); + this.dataConsumers.values().forEach(DataConsumer::close); + this.dataProducers.values().forEach(DataProducer::close); + if(this.recvTransport != null) { + this.recvTransport.close(); + } + if(this.sendTransport != null) { + this.sendTransport.close(); + } } /** * 记录日志 */ public void log() { - log.debug(""" + log.info(""" 当前终端:{} 消费者数量:{} 生产者数量:{} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java index 6ff4e4b..56a6fa4 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Consumer.java @@ -2,6 +2,8 @@ package com.acgist.taoyao.signal.party.media; import com.acgist.taoyao.signal.event.EventPublisher; import com.acgist.taoyao.signal.event.media.MediaConsumerCloseEvent; +import com.acgist.taoyao.signal.event.media.MediaConsumerPauseEvent; +import com.acgist.taoyao.signal.event.media.MediaConsumerResumeEvent; import lombok.Getter; import lombok.Setter; @@ -62,14 +64,25 @@ public class Consumer extends OperatorAdapter { @Override public void remove() { - this.getProducer().remove(this.consumerId); - this.consumerClient.getConsumers().remove(this.consumerId); log.info("移除消费者:{} - {}", this.streamId, this.consumerId); + this.room.getConsumers().remove(this.consumerId); + this.producer.getConsumers().remove(this.consumerId); + this.consumerClient.getConsumers().remove(this.consumerId); } - /** - * 记录日志 - */ + @Override + public void pause() { + log.info("暂停消费者:{} - {}", this.streamId, this.consumerId); + EventPublisher.publishEvent(new MediaConsumerPauseEvent(this.consumerId, this.room)); + } + + @Override + public void resume() { + log.info("恢复消费者:{} - {}", this.streamId, this.consumerId); + EventPublisher.publishEvent(new MediaConsumerResumeEvent(this.consumerId, this.room)); + } + + @Override public void log() { log.debug("当前消费者:{} - {} - {}", this.consumerId, this.kind, this.streamId); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java index a52b4f1..3c7d003 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataConsumer.java @@ -1,5 +1,8 @@ package com.acgist.taoyao.signal.party.media; +import com.acgist.taoyao.signal.event.EventPublisher; +import com.acgist.taoyao.signal.event.media.MediaDataConsumerCloseEvent; + import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -12,16 +15,8 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Getter @Setter -public class DataConsumer { +public class DataConsumer extends OperatorAdapter { - /** - * 消费者终端 - */ - private final ClientWrapper consumeClient; - /** - * 生产者 - */ - private final Producer producer; /** * 数据流ID */ @@ -30,17 +25,45 @@ public class DataConsumer { * 消费者标识 */ private final String consumerId; + /** + * 房间 + */ + private final Room room; + /** + * 生产者 + */ + private final DataProducer dataProducer; + /** + * 消费者终端 + */ + private final ClientWrapper consumerClient; - public DataConsumer(ClientWrapper consumeClient, Producer producer, String streamId, String consumerId) { - this.consumeClient = consumeClient; - this.producer = producer; + public DataConsumer(String streamId, String consumerId, Room room, DataProducer dataProducer, ClientWrapper consumerClient) { this.streamId = streamId; this.consumerId = consumerId; + this.room = room; + this.dataProducer = dataProducer; + this.consumerClient = consumerClient; } - /** - * 记录日志 - */ + @Override + public void close() { + if(this.markClose()) { + return; + } + log.info("关闭数据消费者:{} - {}", this.streamId, this.consumerId); + EventPublisher.publishEvent(new MediaDataConsumerCloseEvent(this.consumerId, this.room)); + } + + @Override + public void remove() { + log.info("移除数据消费者:{} - {}", this.streamId, this.consumerId); + this.room.getDataProducers().remove(this.consumerId); + this.dataProducer.getDataConsumers().remove(this.consumerId); + this.consumerClient.getDataConsumers().remove(this.consumerId); + } + + @Override public void log() { log.debug("当前数据消费者:{} - {}", this.consumerId, this.streamId); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataProducer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataProducer.java index 967bb91..8fa9c3d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataProducer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/DataProducer.java @@ -3,6 +3,9 @@ package com.acgist.taoyao.signal.party.media; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import com.acgist.taoyao.signal.event.EventPublisher; +import com.acgist.taoyao.signal.event.media.MediaDataProducerCloseEvent; + import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -15,12 +18,8 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Setter @Getter -public class DataProducer { +public class DataProducer extends OperatorAdapter { - /** - * 生产者终端 - */ - private final ClientWrapper produceClient; /** * 数据流ID */ @@ -29,21 +28,46 @@ public class DataProducer { * 生产者标识 */ private final String producerId; + /** + * 房间 + */ + private final Room room; + /** + * 生产者终端 + */ + private final ClientWrapper producerClient; /** * 消费者 + * 其他终端消费当前终端的消费者 */ private final Map dataConsumers; - public DataProducer(ClientWrapper produceClient, String streamId, String producerId) { - this.produceClient = produceClient; + public DataProducer(String streamId, String producerId, Room room, ClientWrapper producerClient) { this.streamId = streamId; this.producerId = producerId; + this.room = room; + this.producerClient = producerClient; this.dataConsumers = new ConcurrentHashMap<>(); } - /** - * 记录日志 - */ + @Override + public void close() { + if(this.markClose()) { + return; + } + log.info("关闭数据生产者:{} - {}", this.streamId, this.producerId); + this.dataConsumers.values().forEach(DataConsumer::close); + EventPublisher.publishEvent(new MediaDataProducerCloseEvent(this.producerId, this.room)); + } + + @Override + public void remove() { + log.info("移除数据生产者:{} - {}", this.streamId, this.producerId); + this.room.getDataProducers().remove(this.producerId); + this.producerClient.getDataProducers().remove(this.producerId); + } + + @Override public void log() { log.debug("当前数据生产者:{} - {}", this.producerId, this.streamId); this.dataConsumers.values().forEach(DataConsumer::log); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Operator.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Operator.java index 13a2cbc..e6eb1c5 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Operator.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Operator.java @@ -31,4 +31,9 @@ public interface Operator extends Closeable { */ void resume(); + /** + * 记录日志 + */ + void log(); + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/OperatorAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/OperatorAdapter.java index 4992dec..a164e5f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/OperatorAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/OperatorAdapter.java @@ -12,6 +12,14 @@ public abstract class OperatorAdapter implements Operator { */ protected volatile boolean close = false; + @Override + public void close() { + } + + @Override + public void remove() { + } + @Override public void pause() { } @@ -20,6 +28,10 @@ public abstract class OperatorAdapter implements Operator { public void resume() { } + @Override + public void log() { + } + /** * 标记关闭 * diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java index ed91ebd..3a6c525 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Producer.java @@ -1,11 +1,12 @@ package com.acgist.taoyao.signal.party.media; -import java.io.Closeable; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import com.acgist.taoyao.signal.event.EventPublisher; import com.acgist.taoyao.signal.event.media.MediaProducerCloseEvent; +import com.acgist.taoyao.signal.event.media.MediaProducerPauseEvent; +import com.acgist.taoyao.signal.event.media.MediaProducerResumeEvent; import lombok.Getter; import lombok.Setter; @@ -19,12 +20,8 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Getter @Setter -public class Producer implements Closeable { +public class Producer extends OperatorAdapter { - /** - * 是否关闭 - */ - private volatile boolean close = false; /** * 媒体类型 */ @@ -51,41 +48,47 @@ public class Producer implements Closeable { */ private final Map consumers; - public Producer(String kind, String streamId, String producerId, Room room, ClientWrapper produceClient) { + public Producer(String kind, String streamId, String producerId, Room room, ClientWrapper producerClient) { this.kind = Kind.of(kind); this.streamId = streamId; this.producerId = producerId; this.room = room; - this.producerClient = produceClient; + this.producerClient = producerClient; this.consumers = new ConcurrentHashMap<>(); } - /** - * 删除消费者 - * - * @param consumerId 消费者ID - */ - public void remove(String consumerId) { - this.consumers.remove(consumerId); - } - @Override public void close() { - if(this.close) { + if(this.markClose()) { return; } - this.close = true; log.info("关闭生产者:{} - {}", this.streamId, this.producerId); - this.consumers.forEach((k, v) -> v.close()); - this.producerClient.getProducers().remove(this.producerId); + this.consumers.values().forEach(Consumer::close); EventPublisher.publishEvent(new MediaProducerCloseEvent(this.producerId, this.room)); } - /** - * 记录日志 - */ + @Override + public void remove() { + log.info("移除生产者:{} - {}", this.streamId, this.producerId); + this.room.getProducers().remove(this.producerId); + this.producerClient.getProducers().remove(this.producerId); + } + + @Override + public void pause() { + log.info("暂停生产者:{} - {}", this.streamId, this.producerId); + EventPublisher.publishEvent(new MediaProducerPauseEvent(this.producerId, this.room)); + } + + @Override + public void resume() { + log.info("恢复生产者:{} - {}", this.streamId, this.producerId); + EventPublisher.publishEvent(new MediaProducerResumeEvent(this.producerId, this.room)); + } + + @Override public void log() { - log.debug("当前生产者:{} - {} - {}", this.producerId, this.kind, this.streamId); + log.info("当前生产者:{} - {} - {}", this.producerId, this.kind, this.streamId); this.consumers.values().forEach(Consumer::log); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java index 63ab00f..72e9ecc 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Room.java @@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j; /** * 房间 * 房间和媒体路由一对一关联 + * 房间媒体媒体路由规则:订阅类型 + 路由类型 * * @author acgist */ @@ -52,6 +53,26 @@ public class Room extends OperatorAdapter { * 终端 */ private final Map clients; + /** + * 通道 + */ + private final Map transports; + /** + * 生产者 + */ + private final Map producers; + /** + * 消费者 + */ + private final Map consumers; + /** + * 数据生产者 + */ + private final Map dataProducers; + /** + * 数据消费者 + */ + private final Map dataConsumers; /** * @param mediaClient 媒体服务 @@ -61,6 +82,22 @@ public class Room extends OperatorAdapter { this.mediaClient = mediaClient; this.roomManager = roomManager; this.clients = new ConcurrentHashMap<>(); + this.transports = new ConcurrentHashMap<>(); + this.producers = new ConcurrentHashMap<>(); + this.consumers = new ConcurrentHashMap<>(); + this.dataProducers = new ConcurrentHashMap<>(); + this.dataConsumers = new ConcurrentHashMap<>(); + } + + /** + * @param client 终端 + * + * @return 是否授权 + */ + public boolean authenticate(Client client) { + return + this.mediaClient == client || + this.clients.containsKey(client); } /** @@ -132,17 +169,6 @@ public class Room extends OperatorAdapter { public Message request(Message message) { return this.mediaClient.request(message); } - - /** - * 广播消息 - * 所有终端以及媒体服务 - * - * @param message 消息 - */ - public void broadcastAll(Message message) { - this.broadcast(message); - this.mediaClient.push(message); - } /** * 广播消息 @@ -198,17 +224,22 @@ public class Room extends OperatorAdapter { .orElse(null); } + /** + * @param transportId 通道ID + * + * @return 通道 + */ + public Transport transport(String transportId) { + return this.transports.get(transportId); + } + /** * @param producerId 生产者ID * * @return 生产者 */ public Producer producer(String producerId) { - return this.clients.values().stream() - .map(wrapper -> wrapper.getProducers().get(producerId)) - .filter(Objects::nonNull) - .findFirst() - .orElse(null); + return this.producers.get(producerId); } /** @@ -217,11 +248,25 @@ public class Room extends OperatorAdapter { * @return 消费者 */ public Consumer consumer(String consumerId) { - return this.clients.values().stream() - .map(wrapper -> wrapper.getConsumers().get(consumerId)) - .filter(Objects::nonNull) - .findFirst() - .orElse(null); + return this.consumers.get(consumerId); + } + + /** + * @param producerId 数据生产者ID + * + * @return 数据生产者 + */ + public DataProducer dataProducer(String producerId) { + return this.dataProducers.get(producerId); + } + + /** + * @param consumerId 数据消费者ID + * + * @return 数据消费者 + */ + public DataConsumer dataConsumer(String consumerId) { + return this.dataConsumers.get(consumerId); } @Override @@ -240,15 +285,23 @@ public class Room extends OperatorAdapter { this.roomManager.remove(this); } - /** - * 记录日志 - */ + @Override public void log() { log.info(""" 当前房间:{} - 终端数量:{}""", + 终端数量:{} + 通道数量:{} + 消费者数量:{} + 生产者数量:{} + 数据消费者数量:{} + 数据生产者数量:{}""", this.roomId, - this.clients.size() + this.clients.size(), + this.transports.size(), + this.consumers.size(), + this.producers.size(), + this.dataConsumers.size(), + this.dataProducers.size() ); this.clients.values().forEach(ClientWrapper::log); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RouteType.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RouteType.java new file mode 100644 index 0000000..9f05665 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/RouteType.java @@ -0,0 +1,17 @@ +package com.acgist.taoyao.signal.party.media; + +/** + * 媒体路由类型 + * + * @author acgist + */ +public enum RouteType { + + // 对讲:只有两个人之间的媒体相互路由 + ONE_TO_ONE, + // 广播:只有一个人的媒体路由到其他人 + ONE_TO_ALL, + // 网播:所有人的媒体相互路由 + ALL_TO_ALL, + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/SubscribeType.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/SubscribeType.java new file mode 100644 index 0000000..e360341 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/SubscribeType.java @@ -0,0 +1,42 @@ +package com.acgist.taoyao.signal.party.media; + +/** + * 媒体订阅类型 + * + * @author acgist + */ +public enum SubscribeType { + + // 订阅所有媒体 + ALL, + // 订阅所有音频媒体 + ALL_AUDIO, + // 订阅所有视频媒体 + ALL_VIDEO, + // 没有订阅任何媒体 + NONE; + + public static final SubscribeType of(String value) { + for (SubscribeType type : SubscribeType.values()) { + if(type.name().equalsIgnoreCase(value)) { + return type; + } + } + return SubscribeType.ALL; + } + + /** + * @param producer 生产者 + * + * @return 是否可以消防 + */ + public boolean canConsume(Producer producer) { + return switch (this) { + case NONE -> false; + case ALL_AUDIO -> producer.getKind() == Kind.AUDIO; + case ALL_VIDEO -> producer.getKind() == Kind.VIDEO; + default -> true; + }; + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Transport.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Transport.java index 34e8721..26d9a42 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Transport.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Transport.java @@ -1,24 +1,42 @@ package com.acgist.taoyao.signal.party.media; -import java.io.Closeable; import java.util.Map; import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.event.EventPublisher; +import com.acgist.taoyao.signal.event.media.TransportCloseEvent; import lombok.Getter; import lombok.Setter; +import lombok.extern.slf4j.Slf4j; /** * 传输通道 + * 注意:正常情况不会调用 * * @author acgist */ +@Slf4j @Getter @Setter -public class Transport implements Closeable { +public class Transport extends OperatorAdapter { + /** + * 方向 + * + * @author acgist + */ + public enum Direction { + + // 接收 + RECV, + // 发送 + SEND; + + } + /** * 房间 */ @@ -39,6 +57,10 @@ public class Transport implements Closeable { * 通道标识 */ private final String transportId; + /** + * 方向 + */ + private final Direction direction; /** * ICE协商 */ @@ -56,8 +78,9 @@ public class Transport implements Closeable { */ private Object sctpParameters; - public Transport(String transportId, Room room, Client client) { + public Transport(String transportId, Direction direction, Room room, Client client) { this.transportId = transportId; + this.direction = direction; this.room = room; this.client = client; this.roomId = room.getRoomId(); @@ -78,7 +101,14 @@ public class Transport implements Closeable { @Override public void close() { - // TODO:实现 + log.info("关闭传输通道:{} - {}", this.transportId, this.direction); + EventPublisher.publishEvent(new TransportCloseEvent(this.transportId, this.room)); + } + + @Override + public void remove() { + log.info("移除传输通道:{} - {}", this.transportId, this.direction); + this.room.getTransports().remove(this.transportId); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java index d907ab3..d7d7345 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java @@ -60,9 +60,12 @@ public class ProtocolManager { if(this.protocolMapping.containsKey(signal)) { throw MessageCodeException.of("存在重复信令协议:" + signal); } - log.info("注册信令协议:{} - {} - {}", String.format("%-36s", signal), String.format("%-36s", key), name); + if(log.isDebugEnabled()) { + log.debug("注册信令协议:{} - {} - {}", String.format("%-36s", signal), String.format("%-36s", key), name); + } this.protocolMapping.put(signal, value); }); + log.info("当前注册信令数量:{}", this.protocolMapping.size()); } /** diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolRoomAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolRoomAdapter.java index 3c29dce..2fec447 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolRoomAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolRoomAdapter.java @@ -28,8 +28,22 @@ public abstract class ProtocolRoomAdapter extends ProtocolClientAdapter { if(room == null) { throw MessageCodeException.of("无效房间:" + roomId); } - // TODO:验证用户是否在房间里面 - this.execute(clientId, clientType, room, client, room.getMediaClient(), message, body); + if(!this.authenticate(room, client)) { + throw MessageCodeException.of("终端没有房间权限:" + clientId); + } + synchronized (room) { + this.execute(clientId, clientType, room, client, room.getMediaClient(), message, body); + } + } + + /** + * @param room 房间 + * @param client 终端 + * + * @return 是否授权 + */ + protected boolean authenticate(Room room, Client client) { + return room.authenticate(client); } /** diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index 1f17d0e..d0aa676 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -90,12 +90,14 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final String consumerClientId = MapUtils.get(body, Constant.CLIENT_ID); final ClientWrapper consumerClientWrapper = room.clientWrapper(consumerClientId); - final Map consumers = consumerClientWrapper.getConsumers(); + final Map roomConsumers = room.getConsumers(); + final Map clientConsumers = consumerClientWrapper.getConsumers(); final Map producerConsumers = producer.getConsumers(); final Consumer consumer = new Consumer(kind, streamId, consumerId, room, producer, consumerClientWrapper); - final Consumer oldConsumer = consumers.put(producerId, consumer); + final Consumer oldRoomConsumer = roomConsumers.put(consumerId, consumer); + final Consumer oldClientConsumer = clientConsumers.put(consumerId, consumer); final Consumer oldProducerConsumer = producerConsumers.put(consumerId, consumer); - if(oldConsumer != null || oldProducerConsumer != null) { + if(oldRoomConsumer != null || oldClientConsumer != null || oldProducerConsumer != null) { log.warn("消费者已经存在:{}", consumerId); } final Client consumeClient = consumerClientWrapper.getClient(); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java index 240d730..e2b1c13 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerCloseProtocol.java @@ -17,12 +17,14 @@ import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; +import lombok.extern.slf4j.Slf4j; + /** * 关闭消费者信令 - * 注意:正常情况不会存在关闭消费者的情况,所以一般不用处理关闭消费者信令。 * * @author acgist */ +@Slf4j @Protocol @Description( body = """ @@ -60,6 +62,10 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final Consumer consumer = room.consumer(consumerId); + if(consumer == null) { + log.debug("消费者无效:{} - {}", consumerId, clientType); + return; + } if(clientType.mediaClient()) { consumer.close(); } else if(clientType.mediaServer()) { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java index 93bb17f..01cd60f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerPauseProtocol.java @@ -2,11 +2,18 @@ package com.acgist.taoyao.signal.protocol.media; import java.util.Map; +import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.annotation.Async; + import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.media.MediaConsumerPauseEvent; +import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @@ -25,7 +32,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; """, flow = "终端->信令服务->媒体服务->信令服务->终端" ) -public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter { +public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter implements ApplicationListener { public static final String SIGNAL = "media::consumer::pause"; @@ -33,10 +40,24 @@ public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter { super("暂停消费者信令", SIGNAL); } + @Async + @Override + public void onApplicationEvent(MediaConsumerPauseEvent event) { + final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.CONSUMER_ID, event.getConsumerId() + ); + mediaClient.push(this.build(body)); + } + @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { if(clientType.mediaClient()) { - mediaClient.push(message); + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final Consumer consumer = room.consumer(consumerId); + consumer.pause(); } else if(clientType.mediaServer()) { room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java index 8f70e0e..b8cfa61 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerResumeProtocol.java @@ -2,11 +2,18 @@ package com.acgist.taoyao.signal.protocol.media; import java.util.Map; +import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.annotation.Async; + import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.media.MediaConsumerResumeEvent; +import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @@ -25,7 +32,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; """, flow = "终端->信令服务->媒体服务->信令服务->终端" ) -public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter { +public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements ApplicationListener { public static final String SIGNAL = "media::consumer::resumt"; @@ -33,10 +40,24 @@ public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter { super("恢复消费者信令", SIGNAL); } + @Async + @Override + public void onApplicationEvent(MediaConsumerResumeEvent event) { + final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.CONSUMER_ID, event.getConsumerId() + ); + mediaClient.push(this.build(body)); + } + @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { if(clientType.mediaClient()) { - mediaClient.push(message); + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final Consumer consumer = room.consumer(consumerId); + consumer.pause(); } else if(clientType.mediaServer()) { room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceDataProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java similarity index 56% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceDataProtocol.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java index 7b1d0a1..f6d34d8 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceDataProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumeProtocol.java @@ -1,5 +1,5 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaProduceDataProtocol { +public class MediaDataConsumeProtocol { } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java new file mode 100644 index 0000000..84ebb36 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerCloseProtocol.java @@ -0,0 +1,79 @@ +package com.acgist.taoyao.signal.protocol.media; + +import java.util.Map; + +import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.annotation.Async; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.media.MediaDataConsumerCloseEvent; +import com.acgist.taoyao.signal.party.media.DataConsumer; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * 关闭数据消费者信令 + * + * @author acgist + */ +@Slf4j +@Protocol +@Description( + body = """ + { + "roomId": "房间ID" + "consumerId": "数据消费者ID" + } + """, + flow = { + "媒体服务->信令服务-)终端", + "终端->信令服务->媒体服务->信令服务+)终端" + } +) +public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { + + public static final String SIGNAL = "media::data::consumer::close"; + + public MediaDataConsumerCloseProtocol() { + super("关闭数据消费者信令", SIGNAL); + } + + @Async + @Override + public void onApplicationEvent(MediaDataConsumerCloseEvent event) { + final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.CONSUMER_ID, event.getConsumerId() + ); + mediaClient.push(this.build(body)); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final DataConsumer dataConsumer = room.dataConsumer(consumerId); + if(dataConsumer == null) { + log.debug("数据消费者无效:{} - {}", consumerId, clientType); + return; + } + if(clientType.mediaClient()) { + dataConsumer.close(); + } else if(clientType.mediaServer()) { + dataConsumer.remove(); + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerDataStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java similarity index 53% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerDataStatusProtocol.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java index b76c4d5..2ec4c88 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumerDataStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataConsumerStatusProtocol.java @@ -1,5 +1,5 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaConsumerDataStatusProtocol { +public class MediaDataConsumerStatusProtocol { } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerDataStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java similarity index 53% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerDataStatusProtocol.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java index 993e054..a25e6cf 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerDataStatusProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProduceProtocol.java @@ -1,5 +1,5 @@ package com.acgist.taoyao.signal.protocol.media; -public class MediaProducerDataStatusProtocol { +public class MediaDataProduceProtocol { } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java new file mode 100644 index 0000000..bb8d2ff --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerCloseProtocol.java @@ -0,0 +1,76 @@ +package com.acgist.taoyao.signal.protocol.media; + +import java.util.Map; + +import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.annotation.Async; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.media.MediaDataProducerCloseEvent; +import com.acgist.taoyao.signal.party.media.DataProducer; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * 关闭数据生产者信令 + * + * @author acgist + */ +@Slf4j +@Protocol +@Description( + body = """ + { + "roomId": "房间ID" + "consumerId": "数据生产者ID" + } + """, + flow = "终端->信令服务->媒体服务->信令服务+)终端" +) +public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { + + public static final String SIGNAL = "media::data::producer::close"; + + public MediaDataProducerCloseProtocol() { + super("关闭数据生产者信令", SIGNAL); + } + + @Async + @Override + public void onApplicationEvent(MediaDataProducerCloseEvent event) { + final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.PRODUCER_ID, event.getProducerId() + ); + mediaClient.push(this.build(body)); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); + final DataProducer dataProducer = room.dataProducer(producerId); + if(dataProducer == null) { + log.debug("数据生产者无效:{} - {}", producerId, clientType); + return; + } + if(clientType.mediaClient()) { + dataProducer.close(); + } else if(clientType.mediaServer()) { + dataProducer.remove(); + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java new file mode 100644 index 0000000..142e096 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaDataProducerStatusProtocol.java @@ -0,0 +1,5 @@ +package com.acgist.taoyao.signal.protocol.media; + +public class MediaDataProducerStatusProtocol { + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java index 8175dcf..880f762 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java @@ -47,31 +47,35 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter { @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - // TODO;类型判断? - final String kind = MapUtils.get(body, Constant.KIND); - final String streamId = kind + "::" + clientId; - body.put(Constant.CLIENT_ID, clientId); - body.put(Constant.STREAM_ID, streamId); - final Message response = room.request(message); - final Map responseBody = response.body(); - final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID); - final ClientWrapper clientWrapper = room.clientWrapper(client); - final Map producers = clientWrapper.getProducers(); - final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(kind, streamId, producerId, room, clientWrapper)); - final Message responseMessage = response.cloneWithoutBody(); - responseMessage.setBody(Map.of( - Constant.KIND, kind, - Constant.STREAM_ID, streamId, - Constant.PRODUCER_ID, producerId - )); - // 根据不同类型进行推送: - // 自动推送:不用广播 - // 音频全收:广播视频 - // 视频全收:广播音频 - // 全部不收:全部广播 - room.broadcast(responseMessage); - log.info("{}生产媒体:{} - {}", clientId, streamId, producerId); - this.publishEvent(new MediaConsumeEvent(room, producer)); + if(clientType.mediaClient()) { + final String kind = MapUtils.get(body, Constant.KIND); + final String streamId = kind + "::" + clientId; + body.put(Constant.CLIENT_ID, clientId); + body.put(Constant.STREAM_ID, streamId); + final Message response = room.request(message); + final Map responseBody = response.body(); + final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID); + final ClientWrapper producerClientWrapper = room.clientWrapper(client); + final Map roomProducers = room.getProducers(); + final Map clientProducers = producerClientWrapper.getProducers(); + final Producer producer = new Producer(kind, streamId, producerId, room, producerClientWrapper); + final Producer oldRoomProducer = roomProducers.put(producerId, producer); + final Producer oldClientProducer = clientProducers.put(producerId, producer); + if(oldRoomProducer != null || oldClientProducer != null) { + log.warn("生产者已经存在:{}", producerId); + } + final Message responseMessage = response.cloneWithoutBody(); + responseMessage.setBody(Map.of( + Constant.KIND, kind, + Constant.STREAM_ID, streamId, + Constant.PRODUCER_ID, producerId + )); + room.broadcast(responseMessage); + log.info("{}生产媒体:{} - {}", clientId, streamId, producerId); + this.publishEvent(new MediaConsumeEvent(room, producer)); + } else { + this.logNoAdapter(clientType); + } } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java index 5ef82a8..a6ab655 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerCloseProtocol.java @@ -30,10 +30,10 @@ import lombok.extern.slf4j.Slf4j; body = """ { "roomId": "房间ID" - "consumerId": "消费者ID" + "consumerId": "生产者ID" } """, - flow = "终端->信令服务+)终端" + flow = "终端->信令服务->媒体服务->信令服务+)终端" ) public class MediaProducerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { @@ -47,11 +47,12 @@ public class MediaProducerCloseProtocol extends ProtocolRoomAdapter implements A @Override public void onApplicationEvent(MediaProducerCloseEvent event) { final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); final Map body = Map.of( Constant.ROOM_ID, room.getRoomId(), Constant.PRODUCER_ID, event.getProducerId() ); - room.broadcastAll(this.build(body)); + mediaClient.push(this.build(body)); } @Override @@ -59,9 +60,16 @@ public class MediaProducerCloseProtocol extends ProtocolRoomAdapter implements A final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final Producer producer = room.producer(producerId); if(producer == null) { - log.warn("关闭生产者无效:{}", producerId); - } else { + log.debug("生产者无效:{} - {}", producerId, clientType); + return; + } + if(clientType.mediaClient()) { producer.close(); + } else if(clientType.mediaServer()) { + producer.remove(); + room.broadcast(message); + } else { + this.logNoAdapter(clientType); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java index de4e752..f6f6a37 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerPauseProtocol.java @@ -2,11 +2,17 @@ package com.acgist.taoyao.signal.protocol.media; import java.util.Map; +import org.springframework.context.ApplicationListener; + import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.media.MediaProducerPauseEvent; +import com.acgist.taoyao.signal.party.media.Producer; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @@ -25,7 +31,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; """, flow = "终端->信令服务->媒体服务->信令服务->终端" ) -public class MediaProducerPauseProtocol extends ProtocolRoomAdapter { +public class MediaProducerPauseProtocol extends ProtocolRoomAdapter implements ApplicationListener { public static final String SIGNAL = "media::producer::pause"; @@ -33,10 +39,23 @@ public class MediaProducerPauseProtocol extends ProtocolRoomAdapter { super("暂停生产者信令", SIGNAL); } + @Override + public void onApplicationEvent(MediaProducerPauseEvent event) { + final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.PRODUCER_ID, event.getProducerId() + ); + mediaClient.push(this.build(body)); + } + @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { if(clientType.mediaClient()) { - mediaClient.push(message); + final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); + final Producer producer = room.producer(producerId); + producer.pause(); } else if(clientType.mediaServer()) { room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java index 663fd32..2085f11 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProducerResumeProtocol.java @@ -2,11 +2,17 @@ package com.acgist.taoyao.signal.protocol.media; import java.util.Map; +import org.springframework.context.ApplicationListener; + import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.media.MediaProducerResumeEvent; +import com.acgist.taoyao.signal.party.media.Producer; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @@ -25,7 +31,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; """, flow = "终端->信令服务->媒体服务->信令服务->终端" ) -public class MediaProducerResumeProtocol extends ProtocolRoomAdapter { +public class MediaProducerResumeProtocol extends ProtocolRoomAdapter implements ApplicationListener { public static final String SIGNAL = "media::producer::resume"; @@ -33,10 +39,23 @@ public class MediaProducerResumeProtocol extends ProtocolRoomAdapter { super("恢复生产者信令", SIGNAL); } + @Override + public void onApplicationEvent(MediaProducerResumeEvent event) { + final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.PRODUCER_ID, event.getProducerId() + ); + mediaClient.push(this.build(body)); + } + @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { if(clientType.mediaClient()) { - mediaClient.push(message); + final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); + final Producer producer = room.producer(producerId); + producer.resume(); } else if(clientType.mediaServer()) { room.broadcast(message); } else { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRouterRtpCapabilitiesProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRouterRtpCapabilitiesProtocol.java index dc1ff24..13fc29a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRouterRtpCapabilitiesProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRouterRtpCapabilitiesProtocol.java @@ -39,6 +39,11 @@ public class MediaRouterRtpCapabilitiesProtocol extends ProtocolRoomAdapter { public MediaRouterRtpCapabilitiesProtocol() { super("路由RTP协商信令", SIGNAL); } + + @Override + protected boolean authenticate(Room room, Client client) { + return true; + } @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportCloseProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportCloseProtocol.java new file mode 100644 index 0000000..32195db --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportCloseProtocol.java @@ -0,0 +1,76 @@ +package com.acgist.taoyao.signal.protocol.media; + +import java.util.Map; + +import org.springframework.context.ApplicationListener; +import org.springframework.scheduling.annotation.Async; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.media.TransportCloseEvent; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.party.media.Transport; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * 关闭通道信令 + * + * @author acgist + */ +@Slf4j +@Protocol +@Description( + body = """ + { + "roomId": "房间ID" + "transportId": "通道ID" + } + """, + flow = "终端->信令服务->媒体服务->信令服务+)终端" +) +public class MediaTransportCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener { + + public static final String SIGNAL = "media::transport::close"; + + public MediaTransportCloseProtocol() { + super("关闭通道信令", SIGNAL); + } + + @Async + @Override + public void onApplicationEvent(TransportCloseEvent event) { + final Room room = event.getRoom(); + final Client mediaClient = event.getMediaClient(); + final Map body = Map.of( + Constant.ROOM_ID, room.getRoomId(), + Constant.TRANSPORT_ID, event.getTransportId() + ); + mediaClient.push(this.build(body)); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + final String transportId = MapUtils.get(body, Constant.TRANSPORT_ID); + final Transport transport = room.transport(transportId); + if(transport == null) { + log.debug("通道无效:{} - {}", transportId, clientType); + return; + } + if(clientType.mediaClient()) { + transport.close(); + } else if(clientType.mediaServer()) { + transport.remove(); + room.broadcast(message); + } else { + this.logNoAdapter(clientType); + } + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaPlainCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java similarity index 52% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaPlainCreateProtocol.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java index 54fdcb4..13cab55 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaPlainCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java @@ -1,10 +1,10 @@ package com.acgist.taoyao.signal.protocol.media; /** - * RTP + * 创建RTP通道信令 * * @author acgist */ -public class MediaPlainCreateProtocol { +public class MediaTransportPlainCreateProtocol { } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java index d764c88..24ae279 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java @@ -43,7 +43,7 @@ public class MediaTransportWebRtcConnectProtocol extends ProtocolRoomAdapter { final Map responseBody = response.body(); client.push(response); final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID); - log.info("{} 连接WebRTC信令通道:{}", clientId, transportId); + log.info("{}连接WebRTC信令通道:{}", clientId, transportId); } else { // 忽略其他情况 } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java index 8a91b43..40a036f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java @@ -18,6 +18,7 @@ import com.acgist.taoyao.signal.event.media.MediaConsumeEvent; import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Transport; +import com.acgist.taoyao.signal.party.media.Transport.Direction; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; import lombok.extern.slf4j.Slf4j; @@ -60,6 +61,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { body.put(Constant.CLIENT_ID, clientId); final Message response = room.request(message); final Map responseBody = response.body(); + final Map transports = room.getTransports(); final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID); // 重写地址 this.rewriteIp(client.ip(), responseBody); @@ -70,9 +72,12 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { if(Boolean.TRUE.equals(consuming)) { Transport recvTransport = clientWrapper.getRecvTransport(); if(recvTransport == null) { - recvTransport = new Transport(transportId, room, client); - clientWrapper.setRecvTransport(recvTransport); + recvTransport = new Transport(transportId, Direction.RECV, room, client); + transports.put(transportId, recvTransport); + } else { + log.warn("接收通道已经存在:{}", transportId); } + clientWrapper.setRecvTransport(recvTransport); // 拷贝属性 recvTransport.copy(responseBody); this.publishEvent(new MediaConsumeEvent(room, clientWrapper)); @@ -82,14 +87,17 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { if(Boolean.TRUE.equals(producing)) { Transport sendTransport = clientWrapper.getSendTransport(); if(sendTransport == null) { - sendTransport = new Transport(transportId, room, client); - clientWrapper.setSendTransport(sendTransport); + sendTransport = new Transport(transportId, Direction.SEND, room, client); + transports.put(transportId, sendTransport); + } else { + log.warn("发送通道已经存在:{}", transportId); } + clientWrapper.setSendTransport(sendTransport); // 拷贝属性 sendTransport.copy(responseBody); } client.push(response); - log.info("{} 创建WebRTC信令通道:{}", clientId, transportId); + log.info("{}创建WebRTC信令通道:{}", clientId, transportId); } /** diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java index 73dce98..217ce3b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java @@ -15,8 +15,8 @@ import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.event.room.RoomEnterEvent; import com.acgist.taoyao.signal.party.media.ClientWrapper; -import com.acgist.taoyao.signal.party.media.ClientWrapper.SubscribeType; import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.party.media.SubscribeType; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; /** @@ -50,6 +50,27 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter { super("进入房间信令", SIGNAL); } + @Override + public boolean authenticate(Message message) { + final Map body = message.body(); + final String roomId = MapUtils.get(body, Constant.ROOM_ID); + final String password = MapUtils.get(body, Constant.PASSWORD); + final Room room = this.roomManager.room(roomId); + if(room == null) { + throw MessageCodeException.of("无效房间:" + roomId); + } + final String roomPassowrd = room.getPassword(); + if(StringUtils.isEmpty(roomPassowrd) || roomPassowrd.equals(password)) { + return true; + } + throw MessageCodeException.of(MessageCode.CODE_3401, "密码错误"); + } + + @Override + public boolean authenticate(Room room, Client client) { + return true; + } + @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { if(clientType.mediaClient()) { @@ -69,11 +90,6 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter { * @param body 消息主体 */ private void enter(String clientId, Room room, Client client, Message message, Map body) { - final String password = MapUtils.get(body, Constant.PASSWORD); - final String roomPassowrd = room.getPassword(); - if(StringUtils.isNotEmpty(roomPassowrd) && !roomPassowrd.equals(password)) { - throw MessageCodeException.of(MessageCode.CODE_3401, "密码错误"); - } final String subscribeType = MapUtils.get(body, Constant.SUBSCRIBE_TYPE); final Object rtpCapabilities = MapUtils.get(body, Constant.RTP_CAPABILITIES); final Object sctpCapabilities = MapUtils.get(body, Constant.SCTP_CAPABILITIES); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomLeaveProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomLeaveProtocol.java index d387f18..fac12f8 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomLeaveProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomLeaveProtocol.java @@ -43,24 +43,19 @@ public class RoomLeaveProtocol extends ProtocolRoomAdapter implements Applicatio @Async @Override public void onApplicationEvent(RoomLeaveEvent event) { - this.leave(event.getRoom(), event.getClient()); + final Room room = event.getRoom(); + final Client client = event.getClient(); + final Map body = Map.of(Constant.CLIENT_ID, client.clientId()); + room.broadcast(client, this.build(body)); } @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - // 离开房间后会发布事件 - room.leave(client); - } - - /** - * 离开房间 - * - * @param room 房间 - * @param client 终端 - */ - private void leave(Room room, Client client) { - final Message leaveMessage = this.build(Map.of(Constant.CLIENT_ID, client.clientId())); - room.broadcast(client, leaveMessage); + if(clientType.mediaClient()) { + room.leave(client); + } else { + this.logNoAdapter(clientType); + } } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/utils/CipherUtils.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/utils/CipherUtils.java new file mode 100644 index 0000000..994c84e --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/utils/CipherUtils.java @@ -0,0 +1,51 @@ +package com.acgist.taoyao.signal.utils; + +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; + +import javax.crypto.Cipher; +import javax.crypto.NoSuchPaddingException; +import javax.crypto.spec.SecretKeySpec; + +import org.apache.commons.lang3.StringUtils; + +import com.acgist.taoyao.boot.config.SocketProperties.Encrypt; + +import lombok.extern.slf4j.Slf4j; + +/** + * 加密工具 + * + * @author acgist + */ +@Slf4j +public class CipherUtils { + + private CipherUtils() { + } + + /** + * @param mode 模式 + * @param encrypt 算法 + * @param key 密钥 + * + * @return 加密工具 + */ + public static final Cipher buildCipher(int mode, Encrypt encrypt, String key) { + if(encrypt == null || encrypt == Encrypt.PLAINTEXT || StringUtils.isEmpty(key)) { + return null; + } + try { + final String algo = encrypt.getAlgo(); + final String name = encrypt.name(); + final Cipher cipher = Cipher.getInstance(algo); + cipher.init(mode, new SecretKeySpec(Base64.getMimeDecoder().decode(key), name)); + return cipher; + } catch (NoSuchAlgorithmException | NoSuchPaddingException | InvalidKeyException e) { + log.error("创建加密工具异常:{} - {} - {}", mode, encrypt, key, e); + } + return null; + } + +}