[*] 日常优化

This commit is contained in:
acgist
2023-07-20 08:32:14 +08:00
parent 2c2449fd3a
commit eb84cc3d8d
11 changed files with 627 additions and 585 deletions

View File

@@ -58,6 +58,7 @@ public abstract class ClientAdapter<T extends AutoCloseable> implements Client {
* @param instance 终端实例
*/
protected ClientAdapter(long timeout, T instance) {
this.ip = this.getClientIP(instance);
this.time = System.currentTimeMillis();
this.timeout = timeout;
this.instance = instance;
@@ -151,9 +152,19 @@ public abstract class ClientAdapter<T extends AutoCloseable> implements Client {
@Override
public void close() throws Exception {
log.info("关闭终端实例:{} - {}", this.ip, this.clientId);
this.instance.close();
}
/**
* 解析终端IP
*
* @param instance 终端实例
*
* @return 终端IP
*/
protected abstract String getClientIP(T instance);
@Override
public String toString() {
return this.getClass().getSimpleName() + " - " + this.ip + " - " + this.clientId;

View File

@@ -22,214 +22,212 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@Manager
public class ClientManager {
private final ApplicationContext applicationContext;
/**
* 终端列表
*/
private final List<Client> clients;
public ClientManager(ApplicationContext applicationContext) {
private final ApplicationContext applicationContext;
/**
* 终端列表
*/
private final List<Client> clients;
public ClientManager(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
this.clients = new CopyOnWriteArrayList<>();
this.clients = new CopyOnWriteArrayList<>();
}
@Scheduled(cron = "${taoyao.scheduled.client:0 * * * * ?}")
public void scheduled() {
this.closeTimeout();
}
/**
* 终端打开加入管理
*
* @param client 终端
*/
public void open(Client client) {
this.clients.add(client);
}
/**
* 授权终端单播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(String to, Message message) {
this.clients().stream()
.filter(v -> Objects.equals(to, v.getClientId()))
.forEach(v -> v.push(message));
}
/**
* 授权终端单播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(Client to, Message message) {
this.clients().stream()
.filter(v -> v == to)
.forEach(v -> v.push(message));
}
/**
* 授权终端广播消息
*
* @param message 消息
* @param clientTypes 终端类型
*/
public void broadcast(Message message, ClientType ... clientTypes) {
this.clients(clientTypes).forEach(v -> v.push(message));
}
/**
* 授权终端广播消息
*
* @param from 发送终端
* @param message 消息
* @param clientTypes 终端类型
*/
public void broadcast(String from, Message message, ClientType ... clientTypes) {
this.clients(clientTypes).stream()
.filter(v -> !Objects.equals(from, v.getClientId()))
.forEach(v -> v.push(message));
}
/**
* 授权终端广播消息
*
* @param from 发送终端
* @param message 消息
* @param clientTypes 终端类型
*/
public void broadcast(Client from, Message message, ClientType ... clientTypes) {
this.clients(clientTypes).stream()
.filter(v -> v != from)
.forEach(v -> v.push(message));
}
/**
* @param instance 终端实例
*
* @return 终端(包含授权和未授权)
*/
public Client clients(AutoCloseable instance) {
return this.clients.stream()
.filter(v -> v.getInstance() == instance)
.findFirst()
.orElse(null);
}
/**
* @param clientId 终端ID
*
* @return 授权终端
*/
public Client clients(String clientId) {
return this.clients().stream()
.filter(v -> Objects.equals(clientId, v.getClientId()))
.findFirst()
.orElse(null);
}
/**
* @param clientTypes 终端类型
*
* @return 授权终端列表
*/
public List<Client> clients(ClientType ... clientTypes) {
return this.clients.stream()
.filter(Client::authorized)
.filter(client -> ArrayUtils.isEmpty(clientTypes) || ArrayUtils.contains(clientTypes, client.getClientType()))
.toList();
}
/**
* @param instance 终端实例
*
* @return 终端状态
*/
public ClientStatus status(AutoCloseable instance) {
final Client client = this.clients(instance);
return client == null ? null : client.getStatus();
}
/**
* @param clientId 终端ID
*
* @return 授权终端状态
*/
public ClientStatus status(String clientId) {
final Client client = this.clients(clientId);
return client == null ? null : client.getStatus();
}
@Scheduled(cron = "${taoyao.scheduled.client:0 * * * * ?}")
public void scheduled() {
this.closeTimeout();
}
/**
* 终端打开加入管理
*
* @param client 终端
*/
public void open(Client client) {
this.clients.add(client);
}
/**
* 授权终端单播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(String to, Message message) {
this.clients().stream()
.filter(v -> Objects.equals(to, v.getClientId()))
.forEach(v -> v.push(message));
}
/**
* 授权终端单播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(Client to, Message message) {
this.clients().stream()
.filter(v -> v.getInstance() == to)
.forEach(v -> v.push(message));
}
/**
* 授权终端广播消息
*
* @param message 消息
* @param clientTypes 终端类型
*/
public void broadcast(Message message, ClientType ... clientTypes) {
this.clients(clientTypes).forEach(v -> v.push(message));
}
/**
* 授权终端广播消息
*
* @param from 发送终端
* @param message 消息
* @param clientTypes 终端类型
*/
public void broadcast(String from, Message message, ClientType ... clientTypes) {
this.clients(clientTypes).stream()
.filter(v -> !Objects.equals(from, v.getClientId()))
.forEach(v -> v.push(message));
}
/**
* 授权终端广播消息
*
* @param from 发送终端
* @param message 消息
* @param clientTypes 终端类型
*/
public void broadcast(Client from, Message message, ClientType ... clientTypes) {
this.clients(clientTypes).stream()
.filter(v -> v.getInstance() != from)
.forEach(v -> v.push(message));
}
/**
* @param instance 终端实例
*
* @return 终端
*/
public Client clients(AutoCloseable instance) {
return this.clients.stream()
.filter(v -> v.getInstance() == instance)
.findFirst()
.orElse(null);
}
/**
* @param clientId 终端标识
*
* @return 授权终端
*/
public Client clients(String clientId) {
return this.clients().stream()
.filter(v -> Objects.equals(clientId, v.getClientId()))
.findFirst()
.orElse(null);
}
/**
* @param clientTypes 终端类型
*
* @return 所有授权终端列表
*/
public List<Client> clients(ClientType ... clientTypes) {
return this.clients.stream()
.filter(Client::authorized)
.filter(client -> ArrayUtils.isEmpty(clientTypes) || ArrayUtils.contains(clientTypes, client.getClientType()))
.toList();
}
/**
* @param instance 终端实例
*
* @return 终端状态
*/
public ClientStatus status(AutoCloseable instance) {
final Client client = this.clients(instance);
return client == null ? null : client.getStatus();
}
/**
* @param clientId 终端标识
*
* @return 授权终端状态
*/
public ClientStatus status(String clientId) {
final Client client = this.clients(clientId);
return client == null ? null : client.getStatus();
}
/**
* @param clientTypes 终端类型
*
* @return 所有授权终端状态列表
*/
public List<ClientStatus> status(ClientType ... clientTypes) {
return this.clients(clientTypes).stream()
.map(Client::getStatus)
.toList();
}
/**
* @param clientTypes 终端类型
*
* @return 授权终端状态列表
*/
public List<ClientStatus> status(ClientType ... clientTypes) {
return this.clients(clientTypes).stream()
.map(Client::getStatus)
.toList();
}
/**
* 推送消息
*
* @param instance 终端实例
* @param message 消息
*/
public void push(AutoCloseable instance, Message message) {
final Client client = this.clients(instance);
if(client == null) {
log.warn("推送消息终端无效:{}-{}", instance, message);
return;
}
client.push(message);
}
/**
* 关闭终端
*
* @param instance 终端实例
*/
public void close(AutoCloseable instance) {
final Client client = this.clients(instance);
try {
if(client != null) {
client.close();
} else {
instance.close();
}
} catch (Exception e) {
log.error("关闭终端异常:{}", instance, e);
} finally {
if(client != null) {
// 移除管理
this.clients.remove(client);
// 关闭事件
this.applicationContext.publishEvent(new ClientCloseEvent(client));
}
}
}
/**
* 定时关闭超时终端
*/
private void closeTimeout() {
final int oldSize = this.clients.size();
this.clients.stream()
.filter(v -> v.unauthorized())
.filter(v -> v.timeout())
.forEach(v -> {
log.debug("关闭超时终端:{}", v);
this.close(v);
});
final int newSize = this.clients.size();
log.debug("定时关闭超时终端:{}", newSize - oldSize);
}
/**
* 推送消息
*
* @param instance 终端实例
* @param message 消息
*/
public void push(AutoCloseable instance, Message message) {
final Client client = this.clients(instance);
if(client == null) {
log.warn("推送消息终端无效:{} - {}", instance, message);
return;
}
client.push(message);
}
/**
* 关闭终端
*
* @param instance 终端实例
*/
public void close(AutoCloseable instance) {
final Client client = this.clients(instance);
try {
if(client != null) {
client.close();
} else {
instance.close();
}
} catch (Exception e) {
log.error("关闭终端异常:{}", instance, e);
} finally {
if(client != null) {
this.clients.remove(client);
this.applicationContext.publishEvent(new ClientCloseEvent(client));
}
}
}
/**
* 定时关闭超时终端
*/
private void closeTimeout() {
final int oldSize = this.clients.size();
this.clients.stream()
.filter(v -> v.unauthorized())
.filter(v -> v.timeout())
.forEach(v -> {
log.debug("关闭超时终端:{}", v);
this.close(v);
});
final int newSize = this.clients.size();
log.debug("定时关闭超时终端:{}", newSize - oldSize);
}
}

View File

@@ -37,31 +37,31 @@ public class ClientStatus {
private Double humidity;
@Schema(title = "温度", description = "温度")
private Double temperature;
@Schema(title = "信号强度0~100", description = "信号强度0~100")
private Integer signal;
@Schema(title = "电池电量0~100", description = "电池电量0~100")
private Integer battery;
@Schema(title = "是否发生告警", description = "是否发生告警")
private Boolean alarming;
@Schema(title = "是否正在充电", description = "是否正在充电")
private Boolean charging;
@Schema(title = "终端是否正在录像", description = "终端是否正在录像")
private Boolean clientRecording;
@Schema(title = "服务端是否正在录像", description = "服务端是否正在录像")
private Boolean serverRecording;
@Schema(title = "最后心跳时间", description = "最后心跳时间")
private LocalDateTime lastHeartbeat;
@Schema(title = "终端状态", description = "其他扩展终端状态")
private Map<String, Object> status = new HashMap<>();
@Schema(title = "终端配置", description = "其他扩展终端配置")
private Map<String, Object> config = new HashMap<>();
/**
* 拷贝属性
*
* @param body 消息主体
*/
public void copy(Map<String, Object> body) {
@Schema(title = "信号强度0~100", description = "信号强度0~100")
private Integer signal;
@Schema(title = "电池电量0~100", description = "电池电量0~100")
private Integer battery;
@Schema(title = "是否发生告警", description = "是否发生告警")
private Boolean alarming;
@Schema(title = "是否正在充电", description = "是否正在充电")
private Boolean charging;
@Schema(title = "终端是否正在录像", description = "终端是否正在录像")
private Boolean clientRecording;
@Schema(title = "服务端是否正在录像", description = "服务端是否正在录像")
private Boolean serverRecording;
@Schema(title = "终端状态", description = "其他扩展终端状态")
private Map<String, Object> status = new HashMap<>();
@Schema(title = "终端配置", description = "其他扩展终端配置")
private Map<String, Object> config = new HashMap<>();
@Schema(title = "最后心跳时间", description = "最后心跳时间")
private LocalDateTime lastHeartbeat;
/**
* 拷贝属性
*
* @param body 消息主体
*/
public void copy(Map<String, Object> body) {
this.setLatitude(MapUtils.getDouble(body, Constant.LATITUDE));
this.setLongitude(MapUtils.getDouble(body, Constant.LONGITUDE));
this.setHumidity(MapUtils.getDouble(body, Constant.HUMIDITY));
@@ -74,30 +74,30 @@ public class ClientStatus {
this.status(MapUtils.get(body, Constant.STATUS));
this.config(MapUtils.get(body, Constant.CONFIG));
this.setLastHeartbeat(LocalDateTime.now());
}
/**
* 拷贝状态
*
* @param map 状态
*/
public void status(Map<String, Object> map) {
if(map == null) {
return;
}
map.forEach(this.status::put);
}
/**
* 拷贝配置
*
* @param map 配置
*/
public void config(Map<String, Object> map) {
if(map == null) {
return;
}
map.forEach(this.config::put);
}
}
/**
* 拷贝状态
*
* @param map 状态
*/
public void status(Map<String, Object> map) {
if(map == null) {
return;
}
map.forEach(this.status::put);
}
/**
* 拷贝配置
*
* @param map 配置
*/
public void config(Map<String, Object> map) {
if(map == null) {
return;
}
map.forEach(this.config::put);
}
}

View File

@@ -14,10 +14,25 @@ import lombok.Getter;
@Getter
public enum ClientType {
/**
* 通过浏览器接入的终端
*/
WEB("Web"),
/**
* 媒体服务终端
*/
MEDIA("媒体服务"),
/**
* 没有界面的摄像头
*/
CAMERA("摄像头"),
/**
* 手机APP、平板APP
*/
MOBILE("移动端"),
/**
* 其他智能终端
*/
OTHER("其他终端");
/**
@@ -33,7 +48,10 @@ public enum ClientType {
* @return 是否是媒体终端
*/
public boolean mediaClient() {
return this == WEB || this == CAMERA || this == MOBILE;
return
this == WEB ||
this == CAMERA ||
this == MOBILE;
}
/**
@@ -49,7 +67,8 @@ public enum ClientType {
* @return 类型
*/
public static final ClientType of(String value) {
for (ClientType type : ClientType.values()) {
final ClientType[] types = ClientType.values();
for (ClientType type : types) {
if(type.name().equalsIgnoreCase(value)) {
return type;
}
@@ -58,12 +77,17 @@ public enum ClientType {
}
/**
* 媒体终端
* 媒体终端类型列表
*/
public static final ClientType[] MEDIA_CLIENT = Stream.of(ClientType.values()).filter(ClientType::mediaClient).toArray(ClientType[]::new);
public static final ClientType[] MEDIA_CLIENT_TYPE
=
Stream.of(ClientType.values()).filter(ClientType::mediaClient).toArray(ClientType[]::new);
/**
* 媒体服务
* 媒体服务类型列表
*/
public static final ClientType[] MEDIA_SERVER = Stream.of(ClientType.values()).filter(ClientType::mediaServer).toArray(ClientType[]::new);
public static final ClientType[] MEDIA_SERVER_TYPE
=
Stream.of(ClientType.values()).filter(ClientType::mediaServer).toArray(ClientType[]::new);
}

View File

@@ -36,55 +36,50 @@ public class SocketClient extends ClientAdapter<AsynchronousSocketChannel> {
*/
private final Cipher cipher;
public SocketClient(SocketProperties socketProperties, AsynchronousSocketChannel instance) {
super(socketProperties.getTimeout(), instance);
this.ip = this.clientIp(instance);
this.cipher = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptSecret());
}
public SocketClient(SocketProperties socketProperties, AsynchronousSocketChannel instance) {
super(socketProperties.getTimeout(), instance);
this.cipher = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptSecret());
}
@Override
public void push(Message message) {
synchronized (this.instance) {
try {
if(this.instance.isOpen()) {
// 加密
final byte[] bytes = this.encrypt(message);
// 发送
final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + bytes.length);
buffer.putShort((short) bytes.length);
buffer.put(bytes);
buffer.flip();
final Future<Integer> future = this.instance.write(buffer);
future.get(this.timeout, TimeUnit.MILLISECONDS);
} else {
log.error("Socket终端已经关闭{}", this.instance);
}
} catch (Exception e) {
log.error("Socket终端发送消息异常{}", message, e);
}
}
}
/**
* @param instance 终端实例
*
* @return 终端IP
*/
private String clientIp(AsynchronousSocketChannel instance) {
try {
@Override
public void push(Message message) {
synchronized (this.instance) {
try {
if(this.instance.isOpen()) {
// 加密
final byte[] bytes = this.encrypt(message);
// 发送
final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + bytes.length);
buffer.putShort((short) bytes.length);
buffer.put(bytes);
buffer.flip();
final Future<Integer> future = this.instance.write(buffer);
future.get(this.timeout, TimeUnit.MILLISECONDS);
} else {
log.error("Socket终端已经关闭{}", this.instance);
}
} catch (Exception e) {
log.error("Socket终端发送消息异常{}", message, e);
}
}
}
@Override
protected String getClientIP(AsynchronousSocketChannel instance) {
try {
return ((InetSocketAddress) instance.getRemoteAddress()).getHostString();
} catch (IOException e) {
throw MessageCodeException.of(e, "无效终端IP" + instance);
throw MessageCodeException.of(e, "无效终端IP" + instance);
}
}
/**
* @param message 消息
*
* @return 加密消息
*/
private byte[] encrypt(Message message) {
final byte[] bytes = message.toString().getBytes();
}
/**
* @param message 消息
*
* @return 加密消息
*/
private byte[] encrypt(Message message) {
final byte[] bytes = message.toString().getBytes();
if(this.cipher != null) {
try {
return this.cipher.doFinal(bytes);
@@ -93,6 +88,6 @@ public class SocketClient extends ClientAdapter<AsynchronousSocketChannel> {
}
}
return bytes;
}
}
}

View File

@@ -22,106 +22,102 @@ import lombok.extern.slf4j.Slf4j;
/**
* Socket信令
*
* TODO加密
*
* @author acgist
*/
@Slf4j
public class SocketSignal {
private ClientManager clientManager;
private ProtocolManager protocolManager;
private SocketProperties socketProperties;
private PlatformErrorProtocol platformErrorProtocol;
private final ClientManager clientManager;
private final ProtocolManager protocolManager;
private final SocketProperties socketProperties;
private final PlatformErrorProtocol platformErrorProtocol;
/**
* 线程序号
*/
private int index = 0;
/**
* 通道线程池
*/
private AsynchronousChannelGroup group;
/**
* 服务端通道
*/
private AsynchronousServerSocketChannel channel;
public SocketSignal(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties,
PlatformErrorProtocol platformErrorProtocol
) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.socketProperties = socketProperties;
this.platformErrorProtocol = platformErrorProtocol;
}
/**
* 初始化服务端
*/
public void init() {
boolean success = true;
final String host = this.socketProperties.getHost();
final Integer port = this.socketProperties.getPort();
try {
final ExecutorService executor = new ThreadPoolExecutor(
this.socketProperties.getMinThread(),
this.socketProperties.getMaxThread(),
this.socketProperties.getKeepAliveTime(),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(this.socketProperties.getQueueSize()),
this.newThreadFactory()
);
this.group = AsynchronousChannelGroup.withThreadPool(executor);
this.channel = AsynchronousServerSocketChannel.open(this.group);
this.channel.bind(new InetSocketAddress(host, port));
this.channel.accept(this.channel, new SocketSignalAcceptHandler(
this.clientManager,
this.protocolManager,
this.socketProperties,
this.platformErrorProtocol
));
} catch (IOException e) {
log.error("启动Socket信令服务异常", e);
success = false;
} finally {
if(success) {
log.info("启动Socket信令服务成功{}-{}", host, port);
} else {
this.destroy();
}
}
}
/**
* @return 线程池工厂
*/
private ThreadFactory newThreadFactory() {
return (runnable) -> {
final Thread thread = new Thread(runnable);
// 线程名称
synchronized(this) {
if(++this.index > this.socketProperties.getMaxThread()) {
this.index = 0;
}
thread.setName(this.socketProperties.getThreadNamePrefix() + this.index);
}
// 守护线程
thread.setDaemon(true);
return thread;
};
}
@PreDestroy
public void destroy() {
log.debug("关闭Socket信令服务{}", this.channel);
CloseableUtils.close(this.channel);
if(this.group != null) {
this.group.shutdown();
}
}
/**
* 线程序号
*/
private int index = 0;
/**
* 通道线程池
*/
private AsynchronousChannelGroup group;
/**
* 服务端通道
*/
private AsynchronousServerSocketChannel server;
public SocketSignal(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties,
PlatformErrorProtocol platformErrorProtocol
) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.socketProperties = socketProperties;
this.platformErrorProtocol = platformErrorProtocol;
}
/**
* 初始化服务端
*/
public void init() {
boolean success = true;
final String host = this.socketProperties.getHost();
final Integer port = this.socketProperties.getPort();
try {
final ExecutorService executor = new ThreadPoolExecutor(
this.socketProperties.getMinThread(),
this.socketProperties.getMaxThread(),
this.socketProperties.getKeepAliveTime(),
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(this.socketProperties.getQueueSize()),
this.newThreadFactory()
);
this.group = AsynchronousChannelGroup.withThreadPool(executor);
this.server = AsynchronousServerSocketChannel.open(this.group);
this.server.bind(new InetSocketAddress(host, port));
this.server.accept(this.server, new SocketSignalAcceptHandler(
this.clientManager,
this.protocolManager,
this.socketProperties,
this.platformErrorProtocol
));
} catch (IOException e) {
log.error("启动Socket信令服务异常", e);
success = false;
} finally {
if(success) {
log.info("启动Socket信令服务成功{} - {}", host, port);
} else {
this.destroy();
}
}
}
/**
* @return 线程池工厂
*/
private ThreadFactory newThreadFactory() {
return (runnable) -> {
final Thread thread = new Thread(runnable);
// 线程名称
synchronized(this) {
if(++this.index > this.socketProperties.getMaxThread()) {
this.index = 0;
}
thread.setName(this.socketProperties.getThreadNamePrefix() + this.index);
}
// 守护线程
thread.setDaemon(true);
return thread;
};
}
@PreDestroy
public void destroy() {
log.debug("关闭Socket信令服务{}", this.server);
CloseableUtils.close(this.server);
CloseableUtils.shutdown(this.group);
}
}

View File

@@ -21,47 +21,47 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class SocketSignalAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
private ClientManager clientManager;
private ProtocolManager protocolManager;
private SocketProperties socketProperties;
private PlatformErrorProtocol platformErrorProtocol;
public SocketSignalAcceptHandler(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties,
PlatformErrorProtocol platformErrorProtocol
) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.socketProperties = socketProperties;
this.platformErrorProtocol = platformErrorProtocol;
}
private final ClientManager clientManager;
private final ProtocolManager protocolManager;
private final SocketProperties socketProperties;
private final PlatformErrorProtocol platformErrorProtocol;
public SocketSignalAcceptHandler(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties,
PlatformErrorProtocol platformErrorProtocol
) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.socketProperties = socketProperties;
this.platformErrorProtocol = platformErrorProtocol;
}
@Override
public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel server) {
try {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE);
this.clientManager.open(new SocketClient(this.socketProperties, channel));
final SocketSignalMessageHandler messageHandler = new SocketSignalMessageHandler(
this.clientManager,
this.protocolManager,
this.socketProperties,
channel,
this.platformErrorProtocol
);
messageHandler.loopMessage();
log.debug("Socket信令终端连接成功{}", channel);
} catch (IOException e) {
log.error("Socket信令终端连接异常", channel, e);
} finally {
server.accept(server, this);
}
}
@Override
public void failed(Throwable throwable, AsynchronousServerSocketChannel server) {
log.error("Socket信令终端连接异常{}", server, throwable);
}
@Override
public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel server) {
try {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE);
this.clientManager.open(new SocketClient(this.socketProperties, channel));
final SocketSignalMessageHandler messageHandler = new SocketSignalMessageHandler(
this.clientManager,
this.protocolManager,
this.socketProperties,
this.platformErrorProtocol,
channel
);
messageHandler.loopMessage();
log.debug("Socket信令终端连接成功{}", channel);
} catch (IOException e) {
log.error("Socket信令终端连接异常", channel, e);
} finally {
server.accept(server, this);
}
}
@Override
public void failed(Throwable throwable, AsynchronousServerSocketChannel server) {
log.error("Socket信令终端连接异常{}", server, throwable);
}
}

View File

@@ -10,7 +10,6 @@ import javax.crypto.IllegalBlockSizeException;
import com.acgist.taoyao.boot.config.SocketProperties;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.CloseableUtils;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.platform.PlatformErrorProtocol;
@@ -27,134 +26,133 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public final class SocketSignalMessageHandler implements CompletionHandler<Integer, ByteBuffer> {
private ClientManager clientManager;
private ProtocolManager protocolManager;
private PlatformErrorProtocol platformErrorProtocol;
/**
* 消息长度
*/
private short messageLength;
/**
* 缓冲大小
*/
private final int bufferSize;
/**
* 最大缓存大小
*/
private final int maxBufferSize;
/**
* 加密工具
*/
private final Cipher cipher;
/**
* 消息处理
*/
private final ByteBuffer buffer;
/**
* 终端通道
*/
private final AsynchronousSocketChannel channel;
public SocketSignalMessageHandler(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties,
AsynchronousSocketChannel channel,
PlatformErrorProtocol platformErrorProtocol
) {
this.messageLength = 0;
this.bufferSize = socketProperties.getBufferSize();
this.maxBufferSize = socketProperties.getMaxBufferSize();
this.cipher = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptSecret());
this.buffer = ByteBuffer.allocateDirect(maxBufferSize);
this.channel = channel;
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.platformErrorProtocol = platformErrorProtocol;
}
private final ClientManager clientManager;
private final ProtocolManager protocolManager;
private final PlatformErrorProtocol platformErrorProtocol;
/**
* 消息长度
*/
private short messageLength;
/**
* 缓冲大小
*/
private final int bufferSize;
/**
* 最大缓存大小
*/
private final int maxBufferSize;
/**
* 加密工具
*/
private final Cipher cipher;
/**
* 消息处理
*/
private final ByteBuffer buffer;
/**
* 终端通道
*/
private final AsynchronousSocketChannel channel;
public SocketSignalMessageHandler(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties,
PlatformErrorProtocol platformErrorProtocol,
AsynchronousSocketChannel channel
) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.platformErrorProtocol = platformErrorProtocol;
this.channel = channel;
this.messageLength = 0;
this.bufferSize = socketProperties.getBufferSize();
this.maxBufferSize = socketProperties.getMaxBufferSize();
this.cipher = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, socketProperties.getEncrypt(), socketProperties.getEncryptSecret());
this.buffer = ByteBuffer.allocateDirect(maxBufferSize);
}
/**
* 消息轮询
*/
public void loopMessage() {
if(this.channel.isOpen()) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(this.bufferSize);
this.channel.read(buffer, buffer, this);
} else {
log.debug("Socket信令消息轮询退出通道已经关闭");
this.close();
}
}
/**
* 消息轮询
*/
public void loopMessage() {
if(this.channel.isOpen()) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(this.bufferSize);
this.channel.read(buffer, buffer, this);
} else {
log.debug("Socket信令消息轮询退出通道已经关闭");
this.close();
}
}
/**
* 关闭通道
*/
private void close() {
log.debug("Socket信令终端关闭{}", this.channel);
CloseableUtils.close(this.channel);
this.clientManager.close(this.channel);
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (result == null || result < 0) {
log.warn("Socket信令接收消息失败关闭通道{}", result);
this.close();
} else if(result == 0) {
// 消息空轮询
log.debug("Socket信令接收消息失败长度{}", result);
} else {
buffer.flip();
this.buffer.put(buffer);
while(this.buffer.position() > 0) {
if(this.messageLength <= 0) {
if(this.buffer.position() < Short.BYTES) {
// 不够消息长度
break;
} else {
this.buffer.flip();
this.messageLength = this.buffer.getShort();
this.buffer.compact();
if(this.messageLength < 0 || this.messageLength > this.maxBufferSize) {
throw MessageCodeException.of("信令消息长度错误:" + this.messageLength);
}
}
} else {
if(this.buffer.position() < this.messageLength) {
// 不够消息长度
break;
} else {
// 拆包
final byte[] bytes = new byte[this.messageLength];
this.messageLength = 0;
this.buffer.flip();
this.buffer.get(bytes);
this.buffer.compact();
// 解密
final String message = this.decrypt(bytes);
log.debug("Socket信令消息{} - {}", this.channel, message);
// 处理
this.execute(message.strip());
}
}
}
}
this.loopMessage();
}
@Override
public void failed(Throwable throwable, ByteBuffer buffer) {
log.error("Socket信令终端异常{}", this.channel, throwable);
this.close();
}
/**
* @param bytes 加密消息
*
* @return 消息
*/
private String decrypt(byte[] bytes) {
/**
* 关闭通道
*/
private void close() {
log.debug("Socket信令终端关闭{}", this.channel);
this.clientManager.close(this.channel);
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (result == null || result < 0) {
log.warn("Socket信令接收消息失败关闭通道{}", result);
this.close();
} else if(result == 0) {
// 消息空轮询
log.debug("Socket信令接收消息失败长度{}", result);
} else {
buffer.flip();
this.buffer.put(buffer);
while(this.buffer.position() > 0) {
if(this.messageLength <= 0) {
if(this.buffer.position() < Short.BYTES) {
// 不够消息长度
break;
} else {
this.buffer.flip();
this.messageLength = this.buffer.getShort();
this.buffer.compact();
if(this.messageLength < 0 || this.messageLength > this.maxBufferSize) {
throw MessageCodeException.of("信令消息长度错误:" + this.messageLength);
}
}
} else {
if(this.buffer.position() < this.messageLength) {
// 不够消息长度
break;
} else {
// 拆包
final byte[] bytes = new byte[this.messageLength];
this.messageLength = 0;
this.buffer.flip();
this.buffer.get(bytes);
this.buffer.compact();
// 解密
final String message = this.decrypt(bytes);
log.debug("Socket信令消息{} - {}", this.channel, message);
// 处理
this.execute(message.strip());
}
}
}
}
this.loopMessage();
}
@Override
public void failed(Throwable throwable, ByteBuffer buffer) {
log.error("Socket信令终端异常{}", this.channel, throwable);
this.close();
}
/**
* @param bytes 加密消息
*
* @return 消息
*/
private String decrypt(byte[] bytes) {
if(this.cipher != null) {
try {
return new String(this.cipher.doFinal(bytes));
@@ -163,18 +161,18 @@ public final class SocketSignalMessageHandler implements CompletionHandler<Integ
}
}
return new String(bytes);
}
/**
* @param message 消息
*/
private void execute(String message) {
}
/**
* @param message 消息
*/
private void execute(String message) {
try {
this.protocolManager.execute(message, this.channel);
} catch (Exception e) {
log.error("处理Socket信令消息异常{} - {}", this.clientManager.clients(this.channel), message, e);
this.clientManager.push(this.channel, this.platformErrorProtocol.build(e));
}
}
}
}

View File

@@ -21,7 +21,6 @@ public class WebSocketClient extends ClientAdapter<Session> {
public WebSocketClient(long timeout, Session instance) {
super(timeout, instance);
this.ip = (String) instance.getUserProperties().get(Constant.IP);
}
@Override
@@ -38,5 +37,10 @@ public class WebSocketClient extends ClientAdapter<Session> {
}
}
}
@Override
protected String getClientIP(Session instance) {
return (String) instance.getUserProperties().get(Constant.IP);
}
}

View File

@@ -67,7 +67,7 @@ public class RoomCreateProtocol extends ProtocolClientAdapter implements Applica
);
message.setBody(room.getRoomStatus());
// 通知媒体终端
this.clientManager.broadcast(message, ClientType.MEDIA_CLIENT);
this.clientManager.broadcast(message, ClientType.MEDIA_CLIENT_TYPE);
} else {
this.logNoAdapter(clientType);
}