From eb84cc3d8d1d3ebda081c54e38a4db69ee084a74 Mon Sep 17 00:00:00 2001 From: acgist <289547414@qq.com> Date: Thu, 20 Jul 2023 08:32:14 +0800 Subject: [PATCH] =?UTF-8?q?[*]=20=E6=97=A5=E5=B8=B8=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../taoyao/boot/utils/CloseableUtils.java | 16 + .../taoyao/signal/client/ClientAdapter.java | 11 + .../taoyao/signal/client/ClientManager.java | 408 +++++++++--------- .../taoyao/signal/client/ClientStatus.java | 102 ++--- .../taoyao/signal/client/ClientType.java | 36 +- .../signal/client/socket/SocketClient.java | 89 ++-- .../signal/client/socket/SocketSignal.java | 190 ++++---- .../socket/SocketSignalAcceptHandler.java | 84 ++-- .../socket/SocketSignalMessageHandler.java | 268 ++++++------ .../client/websocket/WebSocketClient.java | 6 +- .../protocol/room/RoomCreateProtocol.java | 2 +- 11 files changed, 627 insertions(+), 585 deletions(-) diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/CloseableUtils.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/CloseableUtils.java index 82cea84..e1086eb 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/CloseableUtils.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/CloseableUtils.java @@ -1,6 +1,7 @@ package com.acgist.taoyao.boot.utils; import java.io.Closeable; +import java.nio.channels.AsynchronousChannelGroup; import lombok.extern.slf4j.Slf4j; @@ -45,4 +46,19 @@ public final class CloseableUtils { } } + /** + * 关闭通道线程池 + * + * @param group 通道线程池 + */ + public static final void shutdown(AsynchronousChannelGroup group) { + try { + if(group != null) { + group.shutdown(); + } + } catch (Exception e) { + log.error("关闭通道线程池异常", e); + } + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientAdapter.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientAdapter.java index e614ce3..26ca761 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientAdapter.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientAdapter.java @@ -58,6 +58,7 @@ public abstract class ClientAdapter implements Client { * @param instance 终端实例 */ protected ClientAdapter(long timeout, T instance) { + this.ip = this.getClientIP(instance); this.time = System.currentTimeMillis(); this.timeout = timeout; this.instance = instance; @@ -151,9 +152,19 @@ public abstract class ClientAdapter implements Client { @Override public void close() throws Exception { + log.info("关闭终端实例:{} - {}", this.ip, this.clientId); this.instance.close(); } + /** + * 解析终端IP + * + * @param instance 终端实例 + * + * @return 终端IP + */ + protected abstract String getClientIP(T instance); + @Override public String toString() { return this.getClass().getSimpleName() + " - " + this.ip + " - " + this.clientId; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientManager.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientManager.java index 89e220d..bb1cabe 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientManager.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientManager.java @@ -22,214 +22,212 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Manager public class ClientManager { - - private final ApplicationContext applicationContext; - - /** - * 终端列表 - */ - private final List clients; - - public ClientManager(ApplicationContext applicationContext) { + + private final ApplicationContext applicationContext; + + /** + * 终端列表 + */ + private final List clients; + + public ClientManager(ApplicationContext applicationContext) { this.applicationContext = applicationContext; - this.clients = new CopyOnWriteArrayList<>(); + this.clients = new CopyOnWriteArrayList<>(); + } + + @Scheduled(cron = "${taoyao.scheduled.client:0 * * * * ?}") + public void scheduled() { + this.closeTimeout(); + } + + /** + * 终端打开加入管理 + * + * @param client 终端 + */ + public void open(Client client) { + this.clients.add(client); + } + + /** + * 授权终端单播消息 + * + * @param to 接收终端 + * @param message 消息 + */ + public void unicast(String to, Message message) { + this.clients().stream() + .filter(v -> Objects.equals(to, v.getClientId())) + .forEach(v -> v.push(message)); + } + + /** + * 授权终端单播消息 + * + * @param to 接收终端 + * @param message 消息 + */ + public void unicast(Client to, Message message) { + this.clients().stream() + .filter(v -> v == to) + .forEach(v -> v.push(message)); + } + + /** + * 授权终端广播消息 + * + * @param message 消息 + * @param clientTypes 终端类型 + */ + public void broadcast(Message message, ClientType ... clientTypes) { + this.clients(clientTypes).forEach(v -> v.push(message)); + } + + /** + * 授权终端广播消息 + * + * @param from 发送终端 + * @param message 消息 + * @param clientTypes 终端类型 + */ + public void broadcast(String from, Message message, ClientType ... clientTypes) { + this.clients(clientTypes).stream() + .filter(v -> !Objects.equals(from, v.getClientId())) + .forEach(v -> v.push(message)); + } + + /** + * 授权终端广播消息 + * + * @param from 发送终端 + * @param message 消息 + * @param clientTypes 终端类型 + */ + public void broadcast(Client from, Message message, ClientType ... clientTypes) { + this.clients(clientTypes).stream() + .filter(v -> v != from) + .forEach(v -> v.push(message)); + } + + /** + * @param instance 终端实例 + * + * @return 终端(包含授权和未授权) + */ + public Client clients(AutoCloseable instance) { + return this.clients.stream() + .filter(v -> v.getInstance() == instance) + .findFirst() + .orElse(null); + } + + /** + * @param clientId 终端ID + * + * @return 授权终端 + */ + public Client clients(String clientId) { + return this.clients().stream() + .filter(v -> Objects.equals(clientId, v.getClientId())) + .findFirst() + .orElse(null); + } + + /** + * @param clientTypes 终端类型 + * + * @return 授权终端列表 + */ + public List clients(ClientType ... clientTypes) { + return this.clients.stream() + .filter(Client::authorized) + .filter(client -> ArrayUtils.isEmpty(clientTypes) || ArrayUtils.contains(clientTypes, client.getClientType())) + .toList(); + } + + /** + * @param instance 终端实例 + * + * @return 终端状态 + */ + public ClientStatus status(AutoCloseable instance) { + final Client client = this.clients(instance); + return client == null ? null : client.getStatus(); + } + + /** + * @param clientId 终端ID + * + * @return 授权终端状态 + */ + public ClientStatus status(String clientId) { + final Client client = this.clients(clientId); + return client == null ? null : client.getStatus(); } - - @Scheduled(cron = "${taoyao.scheduled.client:0 * * * * ?}") - public void scheduled() { - this.closeTimeout(); - } - - /** - * 终端打开加入管理 - * - * @param client 终端 - */ - public void open(Client client) { - this.clients.add(client); - } - - /** - * 授权终端单播消息 - * - * @param to 接收终端 - * @param message 消息 - */ - public void unicast(String to, Message message) { - this.clients().stream() - .filter(v -> Objects.equals(to, v.getClientId())) - .forEach(v -> v.push(message)); - } - - /** - * 授权终端单播消息 - * - * @param to 接收终端 - * @param message 消息 - */ - public void unicast(Client to, Message message) { - this.clients().stream() - .filter(v -> v.getInstance() == to) - .forEach(v -> v.push(message)); - } - - /** - * 授权终端广播消息 - * - * @param message 消息 - * @param clientTypes 终端类型 - */ - public void broadcast(Message message, ClientType ... clientTypes) { - this.clients(clientTypes).forEach(v -> v.push(message)); - } - - /** - * 授权终端广播消息 - * - * @param from 发送终端 - * @param message 消息 - * @param clientTypes 终端类型 - */ - public void broadcast(String from, Message message, ClientType ... clientTypes) { - this.clients(clientTypes).stream() - .filter(v -> !Objects.equals(from, v.getClientId())) - .forEach(v -> v.push(message)); - } - - /** - * 授权终端广播消息 - * - * @param from 发送终端 - * @param message 消息 - * @param clientTypes 终端类型 - */ - public void broadcast(Client from, Message message, ClientType ... clientTypes) { - this.clients(clientTypes).stream() - .filter(v -> v.getInstance() != from) - .forEach(v -> v.push(message)); - } - - /** - * @param instance 终端实例 - * - * @return 终端 - */ - public Client clients(AutoCloseable instance) { - return this.clients.stream() - .filter(v -> v.getInstance() == instance) - .findFirst() - .orElse(null); - } - - /** - * @param clientId 终端标识 - * - * @return 授权终端 - */ - public Client clients(String clientId) { - return this.clients().stream() - .filter(v -> Objects.equals(clientId, v.getClientId())) - .findFirst() - .orElse(null); - } - - /** - * @param clientTypes 终端类型 - * - * @return 所有授权终端列表 - */ - public List clients(ClientType ... clientTypes) { - return this.clients.stream() - .filter(Client::authorized) - .filter(client -> ArrayUtils.isEmpty(clientTypes) || ArrayUtils.contains(clientTypes, client.getClientType())) - .toList(); - } - - /** - * @param instance 终端实例 - * - * @return 终端状态 - */ - public ClientStatus status(AutoCloseable instance) { - final Client client = this.clients(instance); - return client == null ? null : client.getStatus(); - } - - /** - * @param clientId 终端标识 - * - * @return 授权终端状态 - */ - public ClientStatus status(String clientId) { - final Client client = this.clients(clientId); - return client == null ? null : client.getStatus(); - } - /** - * @param clientTypes 终端类型 - * - * @return 所有授权终端状态列表 - */ - public List status(ClientType ... clientTypes) { - return this.clients(clientTypes).stream() - .map(Client::getStatus) - .toList(); - } + /** + * @param clientTypes 终端类型 + * + * @return 授权终端状态列表 + */ + public List status(ClientType ... clientTypes) { + return this.clients(clientTypes).stream() + .map(Client::getStatus) + .toList(); + } - /** - * 推送消息 - * - * @param instance 终端实例 - * @param message 消息 - */ - public void push(AutoCloseable instance, Message message) { - final Client client = this.clients(instance); - if(client == null) { - log.warn("推送消息终端无效:{}-{}", instance, message); - return; - } - client.push(message); - } - - /** - * 关闭终端 - * - * @param instance 终端实例 - */ - public void close(AutoCloseable instance) { - final Client client = this.clients(instance); - try { - if(client != null) { - client.close(); - } else { - instance.close(); - } - } catch (Exception e) { - log.error("关闭终端异常:{}", instance, e); - } finally { - if(client != null) { - // 移除管理 - this.clients.remove(client); - // 关闭事件 - this.applicationContext.publishEvent(new ClientCloseEvent(client)); - } - } - } - - /** - * 定时关闭超时终端 - */ - private void closeTimeout() { - final int oldSize = this.clients.size(); - this.clients.stream() - .filter(v -> v.unauthorized()) - .filter(v -> v.timeout()) - .forEach(v -> { - log.debug("关闭超时终端:{}", v); - this.close(v); - }); - final int newSize = this.clients.size(); - log.debug("定时关闭超时终端:{}", newSize - oldSize); - } + /** + * 推送消息 + * + * @param instance 终端实例 + * @param message 消息 + */ + public void push(AutoCloseable instance, Message message) { + final Client client = this.clients(instance); + if(client == null) { + log.warn("推送消息终端无效:{} - {}", instance, message); + return; + } + client.push(message); + } + + /** + * 关闭终端 + * + * @param instance 终端实例 + */ + public void close(AutoCloseable instance) { + final Client client = this.clients(instance); + try { + if(client != null) { + client.close(); + } else { + instance.close(); + } + } catch (Exception e) { + log.error("关闭终端异常:{}", instance, e); + } finally { + if(client != null) { + this.clients.remove(client); + this.applicationContext.publishEvent(new ClientCloseEvent(client)); + } + } + } + + /** + * 定时关闭超时终端 + */ + private void closeTimeout() { + final int oldSize = this.clients.size(); + this.clients.stream() + .filter(v -> v.unauthorized()) + .filter(v -> v.timeout()) + .forEach(v -> { + log.debug("关闭超时终端:{}", v); + this.close(v); + }); + final int newSize = this.clients.size(); + log.debug("定时关闭超时终端:{}", newSize - oldSize); + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientStatus.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientStatus.java index d1fd1d0..55403d3 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientStatus.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientStatus.java @@ -37,31 +37,31 @@ public class ClientStatus { private Double humidity; @Schema(title = "温度", description = "温度") private Double temperature; - @Schema(title = "信号强度(0~100)", description = "信号强度(0~100)") - private Integer signal; - @Schema(title = "电池电量(0~100)", description = "电池电量(0~100)") - private Integer battery; - @Schema(title = "是否发生告警", description = "是否发生告警") - private Boolean alarming; - @Schema(title = "是否正在充电", description = "是否正在充电") - private Boolean charging; - @Schema(title = "终端是否正在录像", description = "终端是否正在录像") - private Boolean clientRecording; - @Schema(title = "服务端是否正在录像", description = "服务端是否正在录像") - private Boolean serverRecording; - @Schema(title = "最后心跳时间", description = "最后心跳时间") - private LocalDateTime lastHeartbeat; - @Schema(title = "终端状态", description = "其他扩展终端状态") - private Map status = new HashMap<>(); - @Schema(title = "终端配置", description = "其他扩展终端配置") - private Map config = new HashMap<>(); - - /** - * 拷贝属性 - * - * @param body 消息主体 - */ - public void copy(Map body) { + @Schema(title = "信号强度(0~100)", description = "信号强度(0~100)") + private Integer signal; + @Schema(title = "电池电量(0~100)", description = "电池电量(0~100)") + private Integer battery; + @Schema(title = "是否发生告警", description = "是否发生告警") + private Boolean alarming; + @Schema(title = "是否正在充电", description = "是否正在充电") + private Boolean charging; + @Schema(title = "终端是否正在录像", description = "终端是否正在录像") + private Boolean clientRecording; + @Schema(title = "服务端是否正在录像", description = "服务端是否正在录像") + private Boolean serverRecording; + @Schema(title = "终端状态", description = "其他扩展终端状态") + private Map status = new HashMap<>(); + @Schema(title = "终端配置", description = "其他扩展终端配置") + private Map config = new HashMap<>(); + @Schema(title = "最后心跳时间", description = "最后心跳时间") + private LocalDateTime lastHeartbeat; + + /** + * 拷贝属性 + * + * @param body 消息主体 + */ + public void copy(Map body) { this.setLatitude(MapUtils.getDouble(body, Constant.LATITUDE)); this.setLongitude(MapUtils.getDouble(body, Constant.LONGITUDE)); this.setHumidity(MapUtils.getDouble(body, Constant.HUMIDITY)); @@ -74,30 +74,30 @@ public class ClientStatus { this.status(MapUtils.get(body, Constant.STATUS)); this.config(MapUtils.get(body, Constant.CONFIG)); this.setLastHeartbeat(LocalDateTime.now()); - } - - /** - * 拷贝状态 - * - * @param map 状态 - */ - public void status(Map map) { - if(map == null) { - return; - } - map.forEach(this.status::put); - } - - /** - * 拷贝配置 - * - * @param map 配置 - */ - public void config(Map map) { - if(map == null) { - return; - } - map.forEach(this.config::put); - } - + } + + /** + * 拷贝状态 + * + * @param map 状态 + */ + public void status(Map map) { + if(map == null) { + return; + } + map.forEach(this.status::put); + } + + /** + * 拷贝配置 + * + * @param map 配置 + */ + public void config(Map map) { + if(map == null) { + return; + } + map.forEach(this.config::put); + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientType.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientType.java index 84b9239..9169ec0 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientType.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientType.java @@ -14,10 +14,25 @@ import lombok.Getter; @Getter public enum ClientType { + /** + * 通过浏览器接入的终端 + */ WEB("Web"), + /** + * 媒体服务终端 + */ MEDIA("媒体服务"), + /** + * 没有界面的摄像头 + */ CAMERA("摄像头"), + /** + * 手机APP、平板APP + */ MOBILE("移动端"), + /** + * 其他智能终端 + */ OTHER("其他终端"); /** @@ -33,7 +48,10 @@ public enum ClientType { * @return 是否是媒体终端 */ public boolean mediaClient() { - return this == WEB || this == CAMERA || this == MOBILE; + return + this == WEB || + this == CAMERA || + this == MOBILE; } /** @@ -49,7 +67,8 @@ public enum ClientType { * @return 类型 */ public static final ClientType of(String value) { - for (ClientType type : ClientType.values()) { + final ClientType[] types = ClientType.values(); + for (ClientType type : types) { if(type.name().equalsIgnoreCase(value)) { return type; } @@ -58,12 +77,17 @@ public enum ClientType { } /** - * 媒体终端 + * 媒体终端类型列表 */ - public static final ClientType[] MEDIA_CLIENT = Stream.of(ClientType.values()).filter(ClientType::mediaClient).toArray(ClientType[]::new); + public static final ClientType[] MEDIA_CLIENT_TYPE + = + Stream.of(ClientType.values()).filter(ClientType::mediaClient).toArray(ClientType[]::new); + /** - * 媒体服务 + * 媒体服务类型列表 */ - public static final ClientType[] MEDIA_SERVER = Stream.of(ClientType.values()).filter(ClientType::mediaServer).toArray(ClientType[]::new); + public static final ClientType[] MEDIA_SERVER_TYPE + = + Stream.of(ClientType.values()).filter(ClientType::mediaServer).toArray(ClientType[]::new); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java index 537496e..235fa4e 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketClient.java @@ -36,55 +36,50 @@ public class SocketClient extends ClientAdapter { */ private final Cipher cipher; - public SocketClient(SocketProperties socketProperties, AsynchronousSocketChannel instance) { - super(socketProperties.getTimeout(), instance); - this.ip = this.clientIp(instance); - this.cipher = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptSecret()); - } + public SocketClient(SocketProperties socketProperties, AsynchronousSocketChannel instance) { + super(socketProperties.getTimeout(), instance); + this.cipher = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptSecret()); + } - @Override - public void push(Message message) { - synchronized (this.instance) { - try { - if(this.instance.isOpen()) { - // 加密 - final byte[] bytes = this.encrypt(message); - // 发送 - final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + bytes.length); - buffer.putShort((short) bytes.length); - buffer.put(bytes); - buffer.flip(); - final Future future = this.instance.write(buffer); - future.get(this.timeout, TimeUnit.MILLISECONDS); - } else { - log.error("Socket终端已经关闭:{}", this.instance); - } - } catch (Exception e) { - log.error("Socket终端发送消息异常:{}", message, e); - } - } - } - - /** - * @param instance 终端实例 - * - * @return 终端IP - */ - private String clientIp(AsynchronousSocketChannel instance) { - try { + @Override + public void push(Message message) { + synchronized (this.instance) { + try { + if(this.instance.isOpen()) { + // 加密 + final byte[] bytes = this.encrypt(message); + // 发送 + final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + bytes.length); + buffer.putShort((short) bytes.length); + buffer.put(bytes); + buffer.flip(); + final Future future = this.instance.write(buffer); + future.get(this.timeout, TimeUnit.MILLISECONDS); + } else { + log.error("Socket终端已经关闭:{}", this.instance); + } + } catch (Exception e) { + log.error("Socket终端发送消息异常:{}", message, e); + } + } + } + + @Override + protected String getClientIP(AsynchronousSocketChannel instance) { + try { return ((InetSocketAddress) instance.getRemoteAddress()).getHostString(); } catch (IOException e) { - throw MessageCodeException.of(e, "无效终端(IP):" + instance); + throw MessageCodeException.of(e, "无效终端IP:" + instance); } - } - - /** - * @param message 消息 - * - * @return 加密消息 - */ - private byte[] encrypt(Message message) { - final byte[] bytes = message.toString().getBytes(); + } + + /** + * @param message 消息 + * + * @return 加密消息 + */ + private byte[] encrypt(Message message) { + final byte[] bytes = message.toString().getBytes(); if(this.cipher != null) { try { return this.cipher.doFinal(bytes); @@ -93,6 +88,6 @@ public class SocketClient extends ClientAdapter { } } return bytes; - } - + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignal.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignal.java index 88ce5f9..e82d6a1 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignal.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignal.java @@ -22,106 +22,102 @@ import lombok.extern.slf4j.Slf4j; /** * Socket信令 * - * TODO:加密 - * * @author acgist */ @Slf4j public class SocketSignal { - - private ClientManager clientManager; - private ProtocolManager protocolManager; - private SocketProperties socketProperties; - private PlatformErrorProtocol platformErrorProtocol; + + private final ClientManager clientManager; + private final ProtocolManager protocolManager; + private final SocketProperties socketProperties; + private final PlatformErrorProtocol platformErrorProtocol; - /** - * 线程序号 - */ - private int index = 0; - /** - * 通道线程池 - */ - private AsynchronousChannelGroup group; - /** - * 服务端通道 - */ - private AsynchronousServerSocketChannel channel; - - public SocketSignal( - ClientManager clientManager, - ProtocolManager protocolManager, - SocketProperties socketProperties, - PlatformErrorProtocol platformErrorProtocol - ) { - this.clientManager = clientManager; - this.protocolManager = protocolManager; - this.socketProperties = socketProperties; - this.platformErrorProtocol = platformErrorProtocol; - } - - /** - * 初始化服务端 - */ - public void init() { - boolean success = true; - final String host = this.socketProperties.getHost(); - final Integer port = this.socketProperties.getPort(); - try { - final ExecutorService executor = new ThreadPoolExecutor( - this.socketProperties.getMinThread(), - this.socketProperties.getMaxThread(), - this.socketProperties.getKeepAliveTime(), - TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<>(this.socketProperties.getQueueSize()), - this.newThreadFactory() - ); - this.group = AsynchronousChannelGroup.withThreadPool(executor); - this.channel = AsynchronousServerSocketChannel.open(this.group); - this.channel.bind(new InetSocketAddress(host, port)); - this.channel.accept(this.channel, new SocketSignalAcceptHandler( - this.clientManager, - this.protocolManager, - this.socketProperties, - this.platformErrorProtocol - )); - } catch (IOException e) { - log.error("启动Socket信令服务异常", e); - success = false; - } finally { - if(success) { - log.info("启动Socket信令服务成功:{}-{}", host, port); - } else { - this.destroy(); - } - } - } - - /** - * @return 线程池工厂 - */ - private ThreadFactory newThreadFactory() { - return (runnable) -> { - final Thread thread = new Thread(runnable); - // 线程名称 - synchronized(this) { - if(++this.index > this.socketProperties.getMaxThread()) { - this.index = 0; - } - thread.setName(this.socketProperties.getThreadNamePrefix() + this.index); - } - // 守护线程 - thread.setDaemon(true); - return thread; - }; - } - - @PreDestroy - public void destroy() { - log.debug("关闭Socket信令服务:{}", this.channel); - CloseableUtils.close(this.channel); - if(this.group != null) { - this.group.shutdown(); - } - } - + /** + * 线程序号 + */ + private int index = 0; + /** + * 通道线程池 + */ + private AsynchronousChannelGroup group; + /** + * 服务端通道 + */ + private AsynchronousServerSocketChannel server; + + public SocketSignal( + ClientManager clientManager, + ProtocolManager protocolManager, + SocketProperties socketProperties, + PlatformErrorProtocol platformErrorProtocol + ) { + this.clientManager = clientManager; + this.protocolManager = protocolManager; + this.socketProperties = socketProperties; + this.platformErrorProtocol = platformErrorProtocol; + } + + /** + * 初始化服务端 + */ + public void init() { + boolean success = true; + final String host = this.socketProperties.getHost(); + final Integer port = this.socketProperties.getPort(); + try { + final ExecutorService executor = new ThreadPoolExecutor( + this.socketProperties.getMinThread(), + this.socketProperties.getMaxThread(), + this.socketProperties.getKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>(this.socketProperties.getQueueSize()), + this.newThreadFactory() + ); + this.group = AsynchronousChannelGroup.withThreadPool(executor); + this.server = AsynchronousServerSocketChannel.open(this.group); + this.server.bind(new InetSocketAddress(host, port)); + this.server.accept(this.server, new SocketSignalAcceptHandler( + this.clientManager, + this.protocolManager, + this.socketProperties, + this.platformErrorProtocol + )); + } catch (IOException e) { + log.error("启动Socket信令服务异常", e); + success = false; + } finally { + if(success) { + log.info("启动Socket信令服务成功:{} - {}", host, port); + } else { + this.destroy(); + } + } + } + + /** + * @return 线程池工厂 + */ + private ThreadFactory newThreadFactory() { + return (runnable) -> { + final Thread thread = new Thread(runnable); + // 线程名称 + synchronized(this) { + if(++this.index > this.socketProperties.getMaxThread()) { + this.index = 0; + } + thread.setName(this.socketProperties.getThreadNamePrefix() + this.index); + } + // 守护线程 + thread.setDaemon(true); + return thread; + }; + } + + @PreDestroy + public void destroy() { + log.debug("关闭Socket信令服务:{}", this.server); + CloseableUtils.close(this.server); + CloseableUtils.shutdown(this.group); + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalAcceptHandler.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalAcceptHandler.java index df86d59..ba7d7ab 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalAcceptHandler.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalAcceptHandler.java @@ -21,47 +21,47 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public final class SocketSignalAcceptHandler implements CompletionHandler { - private ClientManager clientManager; - private ProtocolManager protocolManager; - private SocketProperties socketProperties; - private PlatformErrorProtocol platformErrorProtocol; - - public SocketSignalAcceptHandler( - ClientManager clientManager, - ProtocolManager protocolManager, - SocketProperties socketProperties, - PlatformErrorProtocol platformErrorProtocol - ) { - this.clientManager = clientManager; - this.protocolManager = protocolManager; - this.socketProperties = socketProperties; - this.platformErrorProtocol = platformErrorProtocol; - } + private final ClientManager clientManager; + private final ProtocolManager protocolManager; + private final SocketProperties socketProperties; + private final PlatformErrorProtocol platformErrorProtocol; + + public SocketSignalAcceptHandler( + ClientManager clientManager, + ProtocolManager protocolManager, + SocketProperties socketProperties, + PlatformErrorProtocol platformErrorProtocol + ) { + this.clientManager = clientManager; + this.protocolManager = protocolManager; + this.socketProperties = socketProperties; + this.platformErrorProtocol = platformErrorProtocol; + } - @Override - public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel server) { - try { - channel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE); - this.clientManager.open(new SocketClient(this.socketProperties, channel)); - final SocketSignalMessageHandler messageHandler = new SocketSignalMessageHandler( - this.clientManager, - this.protocolManager, - this.socketProperties, - channel, - this.platformErrorProtocol - ); - messageHandler.loopMessage(); - log.debug("Socket信令终端连接成功:{}", channel); - } catch (IOException e) { - log.error("Socket信令终端连接异常:", channel, e); - } finally { - server.accept(server, this); - } - } - - @Override - public void failed(Throwable throwable, AsynchronousServerSocketChannel server) { - log.error("Socket信令终端连接异常:{}", server, throwable); - } - + @Override + public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel server) { + try { + channel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE); + this.clientManager.open(new SocketClient(this.socketProperties, channel)); + final SocketSignalMessageHandler messageHandler = new SocketSignalMessageHandler( + this.clientManager, + this.protocolManager, + this.socketProperties, + this.platformErrorProtocol, + channel + ); + messageHandler.loopMessage(); + log.debug("Socket信令终端连接成功:{}", channel); + } catch (IOException e) { + log.error("Socket信令终端连接异常:", channel, e); + } finally { + server.accept(server, this); + } + } + + @Override + public void failed(Throwable throwable, AsynchronousServerSocketChannel server) { + log.error("Socket信令终端连接异常:{}", server, throwable); + } + } \ No newline at end of file diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalMessageHandler.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalMessageHandler.java index 8974ea8..1461260 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalMessageHandler.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/socket/SocketSignalMessageHandler.java @@ -10,7 +10,6 @@ import javax.crypto.IllegalBlockSizeException; import com.acgist.taoyao.boot.config.SocketProperties; import com.acgist.taoyao.boot.model.MessageCodeException; -import com.acgist.taoyao.boot.utils.CloseableUtils; import com.acgist.taoyao.signal.client.ClientManager; import com.acgist.taoyao.signal.protocol.ProtocolManager; import com.acgist.taoyao.signal.protocol.platform.PlatformErrorProtocol; @@ -27,134 +26,133 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public final class SocketSignalMessageHandler implements CompletionHandler { - private ClientManager clientManager; - private ProtocolManager protocolManager; - private PlatformErrorProtocol platformErrorProtocol; - - /** - * 消息长度 - */ - private short messageLength; - /** - * 缓冲大小 - */ - private final int bufferSize; - /** - * 最大缓存大小 - */ - private final int maxBufferSize; - /** - * 加密工具 - */ - private final Cipher cipher; - /** - * 消息处理 - */ - private final ByteBuffer buffer; - /** - * 终端通道 - */ - private final AsynchronousSocketChannel channel; - - public SocketSignalMessageHandler( - ClientManager clientManager, - ProtocolManager protocolManager, - SocketProperties socketProperties, - AsynchronousSocketChannel channel, - PlatformErrorProtocol platformErrorProtocol - ) { - this.messageLength = 0; - this.bufferSize = socketProperties.getBufferSize(); - this.maxBufferSize = socketProperties.getMaxBufferSize(); - this.cipher = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptSecret()); - this.buffer = ByteBuffer.allocateDirect(maxBufferSize); - this.channel = channel; - this.clientManager = clientManager; - this.protocolManager = protocolManager; - this.platformErrorProtocol = platformErrorProtocol; - } + private final ClientManager clientManager; + private final ProtocolManager protocolManager; + private final PlatformErrorProtocol platformErrorProtocol; + + /** + * 消息长度 + */ + private short messageLength; + /** + * 缓冲大小 + */ + private final int bufferSize; + /** + * 最大缓存大小 + */ + private final int maxBufferSize; + /** + * 加密工具 + */ + private final Cipher cipher; + /** + * 消息处理 + */ + private final ByteBuffer buffer; + /** + * 终端通道 + */ + private final AsynchronousSocketChannel channel; + + public SocketSignalMessageHandler( + ClientManager clientManager, + ProtocolManager protocolManager, + SocketProperties socketProperties, + PlatformErrorProtocol platformErrorProtocol, + AsynchronousSocketChannel channel + ) { + this.clientManager = clientManager; + this.protocolManager = protocolManager; + this.platformErrorProtocol = platformErrorProtocol; + this.channel = channel; + this.messageLength = 0; + this.bufferSize = socketProperties.getBufferSize(); + this.maxBufferSize = socketProperties.getMaxBufferSize(); + this.cipher = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptSecret()); + this.buffer = ByteBuffer.allocateDirect(maxBufferSize); + } - /** - * 消息轮询 - */ - public void loopMessage() { - if(this.channel.isOpen()) { - final ByteBuffer buffer = ByteBuffer.allocateDirect(this.bufferSize); - this.channel.read(buffer, buffer, this); - } else { - log.debug("Socket信令消息轮询退出(通道已经关闭)"); - this.close(); - } - } + /** + * 消息轮询 + */ + public void loopMessage() { + if(this.channel.isOpen()) { + final ByteBuffer buffer = ByteBuffer.allocateDirect(this.bufferSize); + this.channel.read(buffer, buffer, this); + } else { + log.debug("Socket信令消息轮询退出(通道已经关闭)"); + this.close(); + } + } - /** - * 关闭通道 - */ - private void close() { - log.debug("Socket信令终端关闭:{}", this.channel); - CloseableUtils.close(this.channel); - this.clientManager.close(this.channel); - } - - @Override - public void completed(Integer result, ByteBuffer buffer) { - if (result == null || result < 0) { - log.warn("Socket信令接收消息失败关闭通道:{}", result); - this.close(); - } else if(result == 0) { - // 消息空轮询 - log.debug("Socket信令接收消息失败(长度):{}", result); - } else { - buffer.flip(); - this.buffer.put(buffer); - while(this.buffer.position() > 0) { - if(this.messageLength <= 0) { - if(this.buffer.position() < Short.BYTES) { - // 不够消息长度 - break; - } else { - this.buffer.flip(); - this.messageLength = this.buffer.getShort(); - this.buffer.compact(); - if(this.messageLength < 0 || this.messageLength > this.maxBufferSize) { - throw MessageCodeException.of("信令消息长度错误:" + this.messageLength); - } - } - } else { - if(this.buffer.position() < this.messageLength) { - // 不够消息长度 - break; - } else { - // 拆包 - final byte[] bytes = new byte[this.messageLength]; - this.messageLength = 0; - this.buffer.flip(); - this.buffer.get(bytes); - this.buffer.compact(); - // 解密 - final String message = this.decrypt(bytes); - log.debug("Socket信令消息:{} - {}", this.channel, message); - // 处理 - this.execute(message.strip()); - } - } - } - } - this.loopMessage(); - } - - @Override - public void failed(Throwable throwable, ByteBuffer buffer) { - log.error("Socket信令终端异常:{}", this.channel, throwable); - this.close(); - } - - /** - * @param bytes 加密消息 - * - * @return 消息 - */ - private String decrypt(byte[] bytes) { + /** + * 关闭通道 + */ + private void close() { + log.debug("Socket信令终端关闭:{}", this.channel); + this.clientManager.close(this.channel); + } + + @Override + public void completed(Integer result, ByteBuffer buffer) { + if (result == null || result < 0) { + log.warn("Socket信令接收消息失败关闭通道:{}", result); + this.close(); + } else if(result == 0) { + // 消息空轮询 + log.debug("Socket信令接收消息失败(长度):{}", result); + } else { + buffer.flip(); + this.buffer.put(buffer); + while(this.buffer.position() > 0) { + if(this.messageLength <= 0) { + if(this.buffer.position() < Short.BYTES) { + // 不够消息长度 + break; + } else { + this.buffer.flip(); + this.messageLength = this.buffer.getShort(); + this.buffer.compact(); + if(this.messageLength < 0 || this.messageLength > this.maxBufferSize) { + throw MessageCodeException.of("信令消息长度错误:" + this.messageLength); + } + } + } else { + if(this.buffer.position() < this.messageLength) { + // 不够消息长度 + break; + } else { + // 拆包 + final byte[] bytes = new byte[this.messageLength]; + this.messageLength = 0; + this.buffer.flip(); + this.buffer.get(bytes); + this.buffer.compact(); + // 解密 + final String message = this.decrypt(bytes); + log.debug("Socket信令消息:{} - {}", this.channel, message); + // 处理 + this.execute(message.strip()); + } + } + } + } + this.loopMessage(); + } + + @Override + public void failed(Throwable throwable, ByteBuffer buffer) { + log.error("Socket信令终端异常:{}", this.channel, throwable); + this.close(); + } + + /** + * @param bytes 加密消息 + * + * @return 消息 + */ + private String decrypt(byte[] bytes) { if(this.cipher != null) { try { return new String(this.cipher.doFinal(bytes)); @@ -163,18 +161,18 @@ public final class SocketSignalMessageHandler implements CompletionHandler { public WebSocketClient(long timeout, Session instance) { super(timeout, instance); - this.ip = (String) instance.getUserProperties().get(Constant.IP); } @Override @@ -38,5 +37,10 @@ public class WebSocketClient extends ClientAdapter { } } } + + @Override + protected String getClientIP(Session instance) { + return (String) instance.getUserProperties().get(Constant.IP); + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java index 01b6ddf..afb7c82 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomCreateProtocol.java @@ -67,7 +67,7 @@ public class RoomCreateProtocol extends ProtocolClientAdapter implements Applica ); message.setBody(room.getRoomStatus()); // 通知媒体终端 - this.clientManager.broadcast(message, ClientType.MEDIA_CLIENT); + this.clientManager.broadcast(message, ClientType.MEDIA_CLIENT_TYPE); } else { this.logNoAdapter(clientType); }