From 78fa48c35c01be72cb54242f7d15b3646b7e5cf9 Mon Sep 17 00:00:00 2001 From: acgist <289547414@qq.com> Date: Sun, 12 Feb 2023 21:16:08 +0800 Subject: [PATCH] [+] socket signal --- taoyao-client-web/vite.config.js | 1 + .../boot/config/BootAutoConfiguration.java | 2 + .../boot/property/SocketProperties.java | 52 +++++++++ .../boot/property/TaoyaoProperties.java | 20 ++-- .../taoyao/boot/utils/CloseableUtils.java | 44 ++++++++ .../src/main/resources/application.yml | 18 +++- .../taoyao/signal/SocketSignalTest.java | 22 ++++ ...gnalTest.java => WebSocketSignalTest.java} | 2 +- .../client/socket/SocketAcceptHandler.java | 42 ++++++++ .../signal/client/socket/SocketClient.java | 35 +++--- .../client/socket/SocketMessageHandler.java | 76 +++++++++++++ .../signal/client/socket/SocketSignal.java | 101 ++++++++++++++++++ .../config/SignalAutoConfiguration.java | 30 ++++++ .../protocol/ProtocolMediaRoomAdapter.java | 2 + 14 files changed, 412 insertions(+), 35 deletions(-) create mode 100644 taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/SocketProperties.java create mode 100644 taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/CloseableUtils.java create mode 100644 taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java rename taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/{SignalTest.java => WebSocketSignalTest.java} (98%) create mode 100644 taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketAcceptHandler.java create mode 100644 taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketMessageHandler.java diff --git a/taoyao-client-web/vite.config.js b/taoyao-client-web/vite.config.js index ad59863..f8f923a 100644 --- a/taoyao-client-web/vite.config.js +++ b/taoyao-client-web/vite.config.js @@ -5,6 +5,7 @@ import { fileURLToPath, URL } from "node:url"; export default defineConfig({ plugins: [vue()], server: { + port: 8443, host: "0.0.0.0", }, resolve: { diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/BootAutoConfiguration.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/BootAutoConfiguration.java index 2b8f93d..421d129 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/BootAutoConfiguration.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/BootAutoConfiguration.java @@ -52,6 +52,7 @@ import com.acgist.taoyao.boot.property.IdProperties; import com.acgist.taoyao.boot.property.MediaProperties; import com.acgist.taoyao.boot.property.ScriptProperties; import com.acgist.taoyao.boot.property.SecurityProperties; +import com.acgist.taoyao.boot.property.SocketProperties; import com.acgist.taoyao.boot.property.TaoyaoProperties; import com.acgist.taoyao.boot.property.WebrtcProperties; import com.acgist.taoyao.boot.service.IdService; @@ -82,6 +83,7 @@ import lombok.extern.slf4j.Slf4j; IdProperties.class, MediaProperties.class, ScriptProperties.class, + SocketProperties.class, TaoyaoProperties.class, WebrtcProperties.class, SecurityProperties.class diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/SocketProperties.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/SocketProperties.java new file mode 100644 index 0000000..9b8e936 --- /dev/null +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/SocketProperties.java @@ -0,0 +1,52 @@ +package com.acgist.taoyao.boot.property; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import lombok.Getter; +import lombok.Setter; + +/** + * Socket信令配置 + * + * @author acgist + */ +@Getter +@Setter +@ConfigurationProperties(prefix = "taoyao.socket") +public class SocketProperties { + + /** + * 是否启用 + */ + private Boolean enabled; + /** + * 监听地址 + */ + private String host; + /** + * 监听端口 + */ + private Integer port; + /** + * 线程队列长度 + */ + private Integer queueSize; + /** + * 最小线程数量 + */ + private Integer threadMin; + /** + * 最大线程数量 + */ + private Integer threadMax; + /** + * 线程池的前缀 + */ + private String threadNamePrefix; + /** + * 线程销毁时间 + */ + private Integer keepAliveTime; + + +} diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/TaoyaoProperties.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/TaoyaoProperties.java index 01feab7..9afceb6 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/TaoyaoProperties.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/TaoyaoProperties.java @@ -26,16 +26,6 @@ public class TaoyaoProperties { */ @Schema(title = "项目名称", description = "项目名称") private String name; - /** - * 超时时间 - */ - @Schema(title = "超时时间", description = "超时时间") - private Long timeout; - /** - * 最大超时时间 - */ - @Schema(title = "最大超时时间", description = "最大超时时间") - private Long maxTimeout; /** * 项目版本 */ @@ -46,5 +36,15 @@ public class TaoyaoProperties { */ @Schema(title = "项目描述", description = "项目描述") private String description; + /** + * 超时时间 + */ + @Schema(title = "超时时间", description = "超时时间") + private Long timeout; + /** + * 最大超时时间 + */ + @Schema(title = "最大超时时间", description = "最大超时时间") + private Long maxTimeout; } diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/CloseableUtils.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/CloseableUtils.java new file mode 100644 index 0000000..a5f0caf --- /dev/null +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/CloseableUtils.java @@ -0,0 +1,44 @@ +package com.acgist.taoyao.boot.utils; + +import java.io.Closeable; + +import lombok.extern.slf4j.Slf4j; + +/** + * 关闭资源工具 + * + * @author acgist + */ +@Slf4j +public class CloseableUtils { + + private CloseableUtils() { + } + + /** + * 关闭资源 + * + * @param closeable 资源 + */ + public static final void close(Closeable closeable) { + try { + closeable.close(); + } catch (Exception e) { + log.error("关闭资源异常", e); + } + } + + /** + * 关闭资源 + * + * @param closeable 资源 + */ + public static final void close(AutoCloseable closeable) { + try { + closeable.close(); + } catch (Exception e) { + log.error("关闭资源异常", e); + } + } + +} diff --git a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml index 995951f..90b6498 100644 --- a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml +++ b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml @@ -111,6 +111,16 @@ taoyao: password: taoyao # 录像配置 record-storage-path: /data/record + # Socket信令 + socket: + enabled: true + host: 0.0.0.0 + port: 9999 + queue-size: 100000 + thread-min: 4 + thread-max: 128 + thread-name-prefix: ${spring.application.name}-signal- + keep-alive-time: 60 # WebRTC配置 webrtc: # STUN服务 @@ -121,10 +131,10 @@ taoyao: - stun:stun4.l.google.com:19302 # TURN服务 turn: - - turn:127.0.0.1:8888 - - turn:127.0.0.1:8888 - - turn:127.0.0.1:8888 - - turn:127.0.0.1:8888 + - turn:192.168.8.110:3478 + - turn:192.168.8.111:3478 + - turn:192.168.8.112:3478 + - turn:192.168.8.113:3478 # 安全配置 security: enabled: true diff --git a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java new file mode 100644 index 0000000..8562d75 --- /dev/null +++ b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java @@ -0,0 +1,22 @@ +package com.acgist.taoyao.signal; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.UnknownHostException; + +import org.junit.jupiter.api.Test; + +public class SocketSignalTest { + + @Test + void test() throws UnknownHostException, IOException { + final Socket socket = new Socket(); + socket.connect(new InetSocketAddress("127.0.0.1", 9999)); + final OutputStream outputStream = socket.getOutputStream(); + outputStream.write("{}".getBytes()); + socket.close(); + } + +} diff --git a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SignalTest.java b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/WebSocketSignalTest.java similarity index 98% rename from taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SignalTest.java rename to taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/WebSocketSignalTest.java index 1bd5ddc..aa6af9a 100644 --- a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SignalTest.java +++ b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/WebSocketSignalTest.java @@ -18,7 +18,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @TaoyaoTest(classes = TaoyaoApplication.class) -class SignalTest { +class WebSocketSignalTest { /** * 防止GC diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketAcceptHandler.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketAcceptHandler.java new file mode 100644 index 0000000..ab2ad20 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketAcceptHandler.java @@ -0,0 +1,42 @@ +package com.acgist.taoyao.signal.client.socket; + +import java.nio.channels.AsynchronousServerSocketChannel; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; + +import com.acgist.taoyao.signal.client.ClientManager; +import com.acgist.taoyao.signal.protocol.ProtocolManager; + +import lombok.extern.slf4j.Slf4j; + +/** + * Socket信令接收 + * + * @author acgist + */ +@Slf4j +public final class SocketAcceptHandler implements CompletionHandler { + + private ClientManager clientManager; + private ProtocolManager protocolManager; + + public SocketAcceptHandler(ClientManager clientManager, ProtocolManager protocolManager) { + this.clientManager = clientManager; + this.protocolManager = protocolManager; + } + + @Override + public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel server) { + log.debug("Socket信令连接成功:{}", channel); + this.clientManager.open(new SocketClient(channel)); + final SocketMessageHandler socketMessageHandler = new SocketMessageHandler(this.clientManager, this.protocolManager); + socketMessageHandler.handle(channel); + server.accept(server, this); + } + + @Override + public void failed(Throwable throwable, AsynchronousServerSocketChannel server) { + log.error("Socket信令连接异常:{}", server, throwable); + } + +} \ No newline at end of file diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java index 5330f8a..9492178 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java @@ -1,8 +1,9 @@ package com.acgist.taoyao.signal.client.socket; -import java.io.IOException; -import java.io.OutputStream; -import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientAdapter; @@ -19,31 +20,25 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Getter @Setter -public class SocketClient extends ClientAdapter { +public class SocketClient extends ClientAdapter { - /** - * 输出 - */ - private OutputStream outputStream; - - public SocketClient(Socket instance) { + public SocketClient(AsynchronousSocketChannel instance) { super(instance); - try { - this.outputStream = instance.getOutputStream(); - } catch (IOException e) { - log.error("Socket终端输出异常:{}", instance, e); - } } @Override public void push(Message message) { try { - if(this.instance.isClosed()) { - log.error("会话已经关闭:{}", this.instance); - } else { - this.outputStream.write(message.toString().getBytes()); + synchronized (this.instance) { } - } catch (IOException e) { + if(this.instance.isOpen()) { + final Future future = this.instance.write(ByteBuffer.wrap(message.toString().getBytes())); + future.get(); + // TODO:超时 + } else { + log.error("会话已经关闭:{}", this.instance); + } + } catch (InterruptedException | ExecutionException e) { log.error("Socket发送消息异常:{}", message, e); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketMessageHandler.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketMessageHandler.java new file mode 100644 index 0000000..781ee6b --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketMessageHandler.java @@ -0,0 +1,76 @@ +package com.acgist.taoyao.signal.client.socket; + +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousSocketChannel; +import java.nio.channels.CompletionHandler; + +import com.acgist.taoyao.boot.utils.CloseableUtils; +import com.acgist.taoyao.signal.client.ClientManager; +import com.acgist.taoyao.signal.protocol.ProtocolManager; + +import lombok.extern.slf4j.Slf4j; + +/** + * Socket信令消息 + * + * @author acgist + */ +@Slf4j +public final class SocketMessageHandler implements CompletionHandler { + + private ClientManager clientManager; + private ProtocolManager protocolManager; + private AsynchronousSocketChannel channel; + + public SocketMessageHandler(ClientManager clientManager, ProtocolManager protocolManager) { + this.clientManager = clientManager; + this.protocolManager = protocolManager; + } + + public void handle(AsynchronousSocketChannel channel) { + this.channel = channel; + this.waitMessage(); + } + + /** + * 消息轮询 + */ + private void waitMessage() { + if(this.channel.isOpen()) { + final ByteBuffer buffer = ByteBuffer.allocateDirect(2048); + this.channel.read(buffer, buffer, this); + } else { + log.debug("Socket信令消息退出消息轮询"); + } + } + + private void close() { + CloseableUtils.close(this.channel); + } + + @Override + public void completed(Integer result, ByteBuffer buffer) { + if (result == null) { + this.close(); + } else if(result == -1) { + // 服务端关闭 + this.close(); + } else if(result == 0) { + // 消息空轮询 + log.debug("Socket信令消息接收失败(长度):{}", result); + } else { + final byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + this.protocolManager.execute(new String(bytes), this.channel); + } + this.waitMessage(); + } + + @Override + public void failed(Throwable throwable, ByteBuffer buffer) { + log.error("Socket信令消息处理异常:{}", this.channel, throwable); + this.close(); + } + + +} \ No newline at end of file diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignal.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignal.java index 10e638a..dfed25b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignal.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignal.java @@ -1,10 +1,111 @@ package com.acgist.taoyao.signal.client.socket; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.channels.AsynchronousChannelGroup; +import java.nio.channels.AsynchronousServerSocketChannel; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import com.acgist.taoyao.boot.model.MessageCodeException; +import com.acgist.taoyao.boot.property.SocketProperties; +import com.acgist.taoyao.boot.utils.CloseableUtils; +import com.acgist.taoyao.signal.client.ClientManager; +import com.acgist.taoyao.signal.protocol.ProtocolManager; + +import lombok.extern.slf4j.Slf4j; + /** * Socket信令 * * @author acgist */ +@Slf4j public class SocketSignal { + private int index = 0; + private ClientManager clientManager; + private AsynchronousChannelGroup group; + private ProtocolManager protocolManager; + private SocketProperties socketProperties; + private AsynchronousServerSocketChannel channel; + + public SocketSignal( + ClientManager clientManager, + ProtocolManager protocolManager, + SocketProperties socketProperties + ) { + this.clientManager = clientManager; + this.protocolManager = protocolManager; + this.socketProperties = socketProperties; + try { + final ExecutorService executor = new ThreadPoolExecutor( + this.socketProperties.getThreadMin(), + this.socketProperties.getThreadMax(), + this.socketProperties.getKeepAliveTime(), + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(this.socketProperties.getQueueSize()), + newThreadFactory() + ); + this.group = AsynchronousChannelGroup.withThreadPool(executor); + } catch (IOException e) { + throw MessageCodeException.of(e, "创建Socket信令失败"); + } + } + + /** + * @return 线程池工厂 + */ + private ThreadFactory newThreadFactory() { + return (runnable) -> { + final Thread thread = new Thread(runnable); + // 线程名称 + synchronized(this) { + if(++this.index > this.socketProperties.getThreadMax()) { + this.index = 0; + } + thread.setName(this.socketProperties.getThreadNamePrefix() + this.index); + } + // 守护线程 + thread.setDaemon(true); + return thread; + }; + } + + /** + * 开启监听 + * + * @return 是否成功 + */ + public boolean listen() { + boolean success = true; + try { + this.channel = AsynchronousServerSocketChannel.open(this.group); + this.channel.bind(new InetSocketAddress(this.socketProperties.getHost(), this.socketProperties.getPort())); + this.channel.accept(this.channel, new SocketAcceptHandler(this.clientManager, this.protocolManager)); + } catch (IOException e) { + log.error("启动Socket信令服务异常", e); + success = false; + } finally { + if(success) { + log.info("启动Socket信令服务:{}-{}", this.socketProperties.getHost(), this.socketProperties.getPort()); + } else { + this.shutdown(); + } + } + return success; + } + + /** + * 关闭Socket信令服务 + */ + public void shutdown() { + log.debug("关闭Socket信令服务"); + CloseableUtils.close(this.channel); + this.group.shutdown(); + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/config/SignalAutoConfiguration.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/config/SignalAutoConfiguration.java index 642ad84..9110f85 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/config/SignalAutoConfiguration.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/config/SignalAutoConfiguration.java @@ -9,8 +9,12 @@ import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.server.standard.ServerEndpointExporter; +import com.acgist.taoyao.boot.property.SocketProperties; +import com.acgist.taoyao.signal.client.ClientManager; +import com.acgist.taoyao.signal.client.socket.SocketSignal; import com.acgist.taoyao.signal.client.websocket.WebSocketSignal; import com.acgist.taoyao.signal.media.MediaClientManager; +import com.acgist.taoyao.signal.protocol.ProtocolManager; import com.acgist.taoyao.signal.protocol.media.MediaRebootProtocol; import com.acgist.taoyao.signal.protocol.media.MediaShutdownProtocol; import com.acgist.taoyao.signal.protocol.platform.PlatformRebootProtocol; @@ -61,6 +65,32 @@ public class SignalAutoConfiguration { }; } + @Bean + @Autowired + @ConditionalOnProperty(prefix = "taoyao.socket", name = "enabled", havingValue = "true", matchIfMissing = true) + @ConditionalOnMissingBean + public SocketSignal socketSignal( + ClientManager clientManager, + ProtocolManager protocolManager, + SocketProperties socketProperties + ) { + return new SocketSignal(clientManager, protocolManager, socketProperties); + } + + @Bean + @Autowired + @ConditionalOnProperty(prefix = "taoyao.socket", name = "enabled", havingValue = "true", matchIfMissing = true) + public CommandLineRunner socketSignalCommandLineRunner( + SocketSignal socketSignal + ) { + return new CommandLineRunner() { + @Override + public void run(String ... args) throws Exception { + socketSignal.listen(); + } + }; + } + @Bean @ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true) @ConditionalOnMissingBean diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolMediaRoomAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolMediaRoomAdapter.java index 29546c8..82d3a5d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolMediaRoomAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolMediaRoomAdapter.java @@ -12,6 +12,8 @@ import com.acgist.taoyao.signal.media.Room; /** * 房间媒体服务信令适配器 * + * TODO:校验是否是房间内的用户权限 + * * @author acgist */ public abstract class ProtocolMediaRoomAdapter extends ProtocolMediaAdapter implements MapBodyGetter {