[+] 完善socket信令

This commit is contained in:
acgist
2023-02-14 08:14:23 +08:00
parent 78fa48c35c
commit 778b933000
59 changed files with 751 additions and 679 deletions

View File

@@ -164,6 +164,21 @@ public class ClientManager {
.map(Client::status)
.toList();
}
/**
* 发送消息
*
* @param instance 会话实例
* @param message 消息
*/
public void send(AutoCloseable instance, Message message) {
final Client client = this.client(instance);
if(client == null) {
log.warn("发送消息终端无效:{}-{}", instance, message);
return;
}
client.push(message);
}
/**
* 关闭会话
@@ -186,7 +201,7 @@ public class ClientManager {
// 移除管理
this.clients.remove(client);
// 关闭事件
this.applicationContext.publishEvent(new ClientCloseEvent(client, null));
this.applicationContext.publishEvent(new ClientCloseEvent(null, client));
}
}
}

View File

@@ -1,42 +0,0 @@
package com.acgist.taoyao.signal.client.socket;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import lombok.extern.slf4j.Slf4j;
/**
* Socket信令接收
*
* @author acgist
*/
@Slf4j
public final class SocketAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
private ClientManager clientManager;
private ProtocolManager protocolManager;
public SocketAcceptHandler(ClientManager clientManager, ProtocolManager protocolManager) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
}
@Override
public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel server) {
log.debug("Socket信令连接成功{}", channel);
this.clientManager.open(new SocketClient(channel));
final SocketMessageHandler socketMessageHandler = new SocketMessageHandler(this.clientManager, this.protocolManager);
socketMessageHandler.handle(channel);
server.accept(server, this);
}
@Override
public void failed(Throwable throwable, AsynchronousServerSocketChannel server) {
log.error("Socket信令连接异常{}", server, throwable);
}
}

View File

@@ -1,19 +1,22 @@
package com.acgist.taoyao.signal.client.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientAdapter;
import com.acgist.taoyao.signal.protocol.Constant;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* Socket会话
* Socket终端
*
* @author acgist
*/
@@ -22,24 +25,49 @@ import lombok.extern.slf4j.Slf4j;
@Setter
public class SocketClient extends ClientAdapter<AsynchronousSocketChannel> {
public SocketClient(AsynchronousSocketChannel instance) {
/**
* 发送超时时间
*/
private final long timeout;
/**
* 换行符号
*/
private final byte[] line;
/**
* 换行符号长度
*/
private final int lineLength;
public SocketClient(Integer timeout, AsynchronousSocketChannel instance) {
super(instance);
this.timeout = timeout;
this.line = Constant.LINE.getBytes();
this.lineLength = this.line.length;
try {
this.ip = ((InetSocketAddress) instance.getRemoteAddress()).getHostString();
} catch (IOException e) {
log.error("Socket信令获取远程IP异常", e);
}
}
@Override
public void push(Message message) {
try {
synchronized (this.instance) {
if(this.instance.isOpen()) {
final byte[] bytes = message.toString().getBytes();
final ByteBuffer buffer = ByteBuffer.allocateDirect(bytes.length + this.lineLength);
buffer.put(bytes);
buffer.put(this.line);
buffer.flip();
final Future<Integer> future = this.instance.write(buffer);
future.get(this.timeout, TimeUnit.MILLISECONDS);
} else {
log.error("Socket信令已经关闭{}", this.instance);
}
}
if(this.instance.isOpen()) {
final Future<Integer> future = this.instance.write(ByteBuffer.wrap(message.toString().getBytes()));
future.get();
// TODO超时
} else {
log.error("会话已经关闭:{}", this.instance);
}
} catch (InterruptedException | ExecutionException e) {
log.error("Socket发送消息异常{}", message, e);
} catch (Exception e) {
log.error("Socket信令发送消息异常{}", message, e);
}
}

View File

@@ -1,76 +0,0 @@
package com.acgist.taoyao.signal.client.socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.acgist.taoyao.boot.utils.CloseableUtils;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import lombok.extern.slf4j.Slf4j;
/**
* Socket信令消息
*
* @author acgist
*/
@Slf4j
public final class SocketMessageHandler implements CompletionHandler<Integer, ByteBuffer> {
private ClientManager clientManager;
private ProtocolManager protocolManager;
private AsynchronousSocketChannel channel;
public SocketMessageHandler(ClientManager clientManager, ProtocolManager protocolManager) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
}
public void handle(AsynchronousSocketChannel channel) {
this.channel = channel;
this.waitMessage();
}
/**
* 消息轮询
*/
private void waitMessage() {
if(this.channel.isOpen()) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(2048);
this.channel.read(buffer, buffer, this);
} else {
log.debug("Socket信令消息退出消息轮询");
}
}
private void close() {
CloseableUtils.close(this.channel);
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (result == null) {
this.close();
} else if(result == -1) {
// 服务端关闭
this.close();
} else if(result == 0) {
// 消息空轮询
log.debug("Socket信令消息接收失败长度{}", result);
} else {
final byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
this.protocolManager.execute(new String(bytes), this.channel);
}
this.waitMessage();
}
@Override
public void failed(Throwable throwable, ByteBuffer buffer) {
log.error("Socket信令消息处理异常{}", this.channel, throwable);
this.close();
}
}

View File

@@ -10,12 +10,13 @@ import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.property.SocketProperties;
import com.acgist.taoyao.boot.utils.CloseableUtils;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.platform.PlatformErrorProtocol;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
/**
@@ -25,22 +26,42 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class SocketSignal {
private int index = 0;
private ClientManager clientManager;
private AsynchronousChannelGroup group;
private ProtocolManager protocolManager;
private SocketProperties socketProperties;
private PlatformErrorProtocol platformErrorProtocol;
/**
* 线程序号
*/
private int index = 0;
/**
* 通道线程池
*/
private AsynchronousChannelGroup group;
/**
* 服务端通道
*/
private AsynchronousServerSocketChannel channel;
public SocketSignal(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties
SocketProperties socketProperties,
PlatformErrorProtocol platformErrorProtocol
) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.socketProperties = socketProperties;
this.platformErrorProtocol = platformErrorProtocol;
}
/**
* 初始化服务端
*/
public void init() {
boolean success = true;
try {
final ExecutorService executor = new ThreadPoolExecutor(
this.socketProperties.getThreadMin(),
@@ -48,11 +69,26 @@ public class SocketSignal {
this.socketProperties.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(this.socketProperties.getQueueSize()),
newThreadFactory()
this.newThreadFactory()
);
this.group = AsynchronousChannelGroup.withThreadPool(executor);
this.channel = AsynchronousServerSocketChannel.open(this.group);
this.channel.bind(new InetSocketAddress(this.socketProperties.getHost(), this.socketProperties.getPort()));
this.channel.accept(this.channel, new SocketSignalAcceptHandler(
this.clientManager,
this.protocolManager,
this.socketProperties,
this.platformErrorProtocol
));
} catch (IOException e) {
throw MessageCodeException.of(e, "创建Socket信令失败");
log.error("启动Socket信令服务异常", e);
success = false;
} finally {
if(success) {
log.info("启动Socket信令服务{}-{}", this.socketProperties.getHost(), this.socketProperties.getPort());
} else {
this.destroy();
}
}
}
@@ -74,35 +110,9 @@ public class SocketSignal {
return thread;
};
}
/**
* 开启监听
*
* @return 是否成功
*/
public boolean listen() {
boolean success = true;
try {
this.channel = AsynchronousServerSocketChannel.open(this.group);
this.channel.bind(new InetSocketAddress(this.socketProperties.getHost(), this.socketProperties.getPort()));
this.channel.accept(this.channel, new SocketAcceptHandler(this.clientManager, this.protocolManager));
} catch (IOException e) {
log.error("启动Socket信令服务异常", e);
success = false;
} finally {
if(success) {
log.info("启动Socket信令服务{}-{}", this.socketProperties.getHost(), this.socketProperties.getPort());
} else {
this.shutdown();
}
}
return success;
}
/**
* 关闭Socket信令服务
*/
public void shutdown() {
@PreDestroy
public void destroy() {
log.debug("关闭Socket信令服务");
CloseableUtils.close(this.channel);
this.group.shutdown();

View File

@@ -0,0 +1,67 @@
package com.acgist.taoyao.signal.client.socket;
import java.io.IOException;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.acgist.taoyao.boot.property.SocketProperties;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.platform.PlatformErrorProtocol;
import lombok.extern.slf4j.Slf4j;
/**
* Socket信令接收处理器
*
* @author acgist
*/
@Slf4j
public final class SocketSignalAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
private ClientManager clientManager;
private ProtocolManager protocolManager;
private SocketProperties socketProperties;
private PlatformErrorProtocol platformErrorProtocol;
public SocketSignalAcceptHandler(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties,
PlatformErrorProtocol platformErrorProtocol
) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.socketProperties = socketProperties;
this.platformErrorProtocol = platformErrorProtocol;
}
@Override
public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel server) {
try {
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, Boolean.TRUE);
this.clientManager.open(new SocketClient(this.socketProperties.getTimeout(), channel));
final SocketSignalMessageHandler messageHandler = new SocketSignalMessageHandler(
this.socketProperties.getBufferSize(),
this.clientManager,
this.protocolManager,
channel,
this.platformErrorProtocol
);
messageHandler.loopMessage();
log.debug("Socket信令连接成功{}", channel);
} catch (IOException e) {
log.error("Socket信令连接异常", e);
} finally {
server.accept(server, this);
}
}
@Override
public void failed(Throwable throwable, AsynchronousServerSocketChannel server) {
log.error("Socket信令连接异常{}", server, throwable);
}
}

View File

@@ -0,0 +1,118 @@
package com.acgist.taoyao.signal.client.socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.acgist.taoyao.boot.utils.CloseableUtils;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.Constant;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.platform.PlatformErrorProtocol;
import lombok.extern.slf4j.Slf4j;
/**
* Socket信令消息处理器
*
* @author acgist
*/
@Slf4j
public final class SocketSignalMessageHandler implements CompletionHandler<Integer, ByteBuffer> {
private ClientManager clientManager;
private ProtocolManager protocolManager;
private AsynchronousSocketChannel channel;
private PlatformErrorProtocol platformErrorProtocol;
/**
* 换行符号
*/
private String line;
/**
* 换行符号长度
*/
private int lineLength;
/**
* 缓冲大小
*/
private int bufferSize;
/**
* 消息缓存
*/
private StringBuilder builder;
public SocketSignalMessageHandler(
int bufferSize,
ClientManager clientManager,
ProtocolManager protocolManager,
AsynchronousSocketChannel channel,
PlatformErrorProtocol platformErrorProtocol
) {
this.line = Constant.LINE;
this.lineLength = this.line.length();
this.builder = new StringBuilder();
this.channel = channel;
this.bufferSize = bufferSize;
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.platformErrorProtocol = platformErrorProtocol;
}
/**
* 消息轮询
*/
public void loopMessage() {
if(this.channel.isOpen()) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(this.bufferSize);
this.channel.read(buffer, buffer, this);
} else {
log.debug("Socket信令退出消息轮询");
}
}
/**
* 关闭通道
*/
private void close() {
CloseableUtils.close(this.channel);
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (result == null) {
this.close();
} else if(result < 0) {
// 服务端关闭
this.close();
} else if(result == 0) {
// 消息空轮询
log.debug("Socket信令接收消息失败长度{}", result);
} else {
buffer.flip();
final byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
this.builder.append(new String(bytes));
int index = 0;
while((index = this.builder.indexOf(this.line)) >= 0) {
final String message = this.builder.substring(0, index);
this.builder.delete(0, index + this.lineLength);
log.debug("Socket信令消息{}-{}", this.channel, message);
try {
this.protocolManager.execute(message.strip(), this.channel);
} catch (Exception e) {
log.error("处理Socket信令消息异常{}", message, e);
this.clientManager.send(this.channel, this.platformErrorProtocol.build(e));
}
}
}
this.loopMessage();
}
@Override
public void failed(Throwable throwable, ByteBuffer buffer) {
log.error("Socket信令异常{}", this.channel, throwable);
this.close();
}
}

View File

@@ -4,14 +4,13 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.WebSocketUtils;
import com.acgist.taoyao.signal.client.ClientAdapter;
import jakarta.websocket.RemoteEndpoint;
import jakarta.websocket.Session;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* WebSocket会话
* WebSocket终端
*
* @author acgist
*/
@@ -20,15 +19,9 @@ import lombok.extern.slf4j.Slf4j;
@Setter
public class WebSocketClient extends ClientAdapter<Session> {
/**
* 输出
*/
private RemoteEndpoint.Basic basic;
public WebSocketClient(Session instance) {
super(instance);
this.ip = WebSocketUtils.getRemoteAddress(instance);
this.basic = instance.getBasicRemote();
}
@Override
@@ -36,12 +29,12 @@ public class WebSocketClient extends ClientAdapter<Session> {
synchronized (this.instance) {
try {
if(this.instance.isOpen()) {
this.basic.sendText(message.toString(), true);
this.instance.getBasicRemote().sendText(message.toString(), true);
} else {
log.error("会话已经关闭:{}", this.instance);
log.error("WebSocket信令已经关闭:{}", this.instance);
}
} catch (Exception e) {
log.error("WebSocket发送消息异常{}", message, e);
log.error("WebSocket信令发送消息异常:{}", message, e);
}
}
}

View File

@@ -2,8 +2,6 @@ package com.acgist.taoyao.signal.client.websocket;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.platform.PlatformErrorProtocol;
@@ -31,58 +29,33 @@ public class WebSocketSignal {
@OnOpen
public void open(Session session) {
log.debug("会话连接{}", session);
log.debug("WebSocket信令连接成功{}", session);
WebSocketSignal.clientManager.open(new WebSocketClient(session));
}
@OnMessage
public void message(Session session, String message) {
log.debug("会话消息:{}-{}", session, message);
log.debug("WebSocket信令消息:{}-{}", session, message);
try {
WebSocketSignal.protocolManager.execute(message.strip(), session);
} catch (Exception e) {
log.error("处理会话消息异常", e);
final Message errorMessage = WebSocketSignal.platformErrorProtocol.build();
if(e instanceof MessageCodeException code) {
errorMessage.setCode(code.getCode(), code.getMessage());
}
errorMessage.setBody(e.getMessage());
this.push(session, errorMessage);
log.error("处理WebSocket信令消息异常{}", message, e);
WebSocketSignal.clientManager.send(session, WebSocketSignal.platformErrorProtocol.build(e));
}
}
@OnClose
public void close(Session session) {
log.debug("会话关闭:{}", session);
log.debug("WebSocket信令关闭:{}", session);
WebSocketSignal.clientManager.close(session);
}
@OnError
public void error(Session session, Throwable e) {
log.error("会话异常:{}", session, e);
log.error("WebSocket信令异常:{}", session, e);
this.close(session);
}
/**
* 推送消息
*
* @param session 会话
* @param message 消息
*/
private void push(Session session, Message message) {
synchronized (session) {
try {
if(session.isOpen()) {
session.getBasicRemote().sendText(message.toString());
} else {
log.error("会话已经关闭:{}", session);
}
} catch (Exception e) {
log.error("推送消息异常:{}", message, e);
}
}
}
@Autowired
public void setClientManager(ClientManager clientManager) {
WebSocketSignal.clientManager = clientManager;

View File

@@ -0,0 +1,67 @@
package com.acgist.taoyao.signal.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.acgist.taoyao.signal.protocol.media.MediaRebootProtocol;
import com.acgist.taoyao.signal.protocol.media.MediaShutdownProtocol;
import com.acgist.taoyao.signal.protocol.platform.PlatformRebootProtocol;
import com.acgist.taoyao.signal.protocol.platform.PlatformScriptProtocol;
import com.acgist.taoyao.signal.protocol.platform.PlatformShutdownProtocol;
import com.acgist.taoyao.signal.protocol.system.SystemRebootProtocol;
import com.acgist.taoyao.signal.protocol.system.SystemShutdownProtocol;
/**
* 脚本自动配置
*
* @author acgist
*/
@Configuration
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
public class ScriptAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public MediaRebootProtocol mediaRebootProtocol() {
return new MediaRebootProtocol();
}
@Bean
@ConditionalOnMissingBean
public MediaShutdownProtocol mediaShutdownProtocol() {
return new MediaShutdownProtocol();
}
@Bean
@ConditionalOnMissingBean
public SystemRebootProtocol systemRebootProtocol() {
return new SystemRebootProtocol();
}
@Bean
@ConditionalOnMissingBean
public SystemShutdownProtocol systemShutdownProtocol() {
return new SystemShutdownProtocol();
}
@Bean
@ConditionalOnMissingBean
public PlatformScriptProtocol platformScriptProtocol() {
return new PlatformScriptProtocol();
}
@Bean
@ConditionalOnMissingBean
public PlatformRebootProtocol platformRebootProtocol() {
return new PlatformRebootProtocol();
}
@Bean
@ConditionalOnMissingBean
public PlatformShutdownProtocol platformShutdownProtocol() {
return new PlatformShutdownProtocol();
}
}

View File

@@ -3,25 +3,13 @@ package com.acgist.taoyao.signal.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import com.acgist.taoyao.boot.property.SocketProperties;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.client.socket.SocketSignal;
import com.acgist.taoyao.signal.client.websocket.WebSocketSignal;
import com.acgist.taoyao.signal.media.MediaClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.media.MediaRebootProtocol;
import com.acgist.taoyao.signal.protocol.media.MediaShutdownProtocol;
import com.acgist.taoyao.signal.protocol.platform.PlatformRebootProtocol;
import com.acgist.taoyao.signal.protocol.platform.PlatformScriptProtocol;
import com.acgist.taoyao.signal.protocol.platform.PlatformShutdownProtocol;
import com.acgist.taoyao.signal.protocol.system.SystemRebootProtocol;
import com.acgist.taoyao.signal.protocol.system.SystemShutdownProtocol;
import com.acgist.taoyao.signal.service.SecurityService;
import com.acgist.taoyao.signal.service.impl.SecurityServiceImpl;
@@ -34,21 +22,12 @@ import com.acgist.taoyao.signal.service.impl.SecurityServiceImpl;
@EnableWebSocket
public class SignalAutoConfiguration {
@Autowired
private MediaClientManager mediaClientManager;
@Bean
@ConditionalOnMissingBean
public WebSocketSignal webSocketSignal() {
return new WebSocketSignal();
}
@Bean
@ConditionalOnMissingBean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
@ConditionalOnMissingBean
public SecurityService securityService() {
@@ -56,88 +35,20 @@ public class SignalAutoConfiguration {
}
@Bean
public CommandLineRunner mediaCommandLineRunner() {
return new CommandLineRunner() {
@Override
public void run(String ... args) throws Exception {
SignalAutoConfiguration.this.mediaClientManager.init();
}
};
@ConditionalOnMissingBean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Bean
@Autowired
@ConditionalOnProperty(prefix = "taoyao.socket", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public SocketSignal socketSignal(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties
) {
return new SocketSignal(clientManager, protocolManager, socketProperties);
}
@Bean
@Autowired
@ConditionalOnProperty(prefix = "taoyao.socket", name = "enabled", havingValue = "true", matchIfMissing = true)
public CommandLineRunner socketSignalCommandLineRunner(
SocketSignal socketSignal
) {
public CommandLineRunner mediaCommandLineRunner(MediaClientManager mediaClientManager) {
return new CommandLineRunner() {
@Override
public void run(String ... args) throws Exception {
socketSignal.listen();
mediaClientManager.init();
}
};
}
@Bean
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public MediaRebootProtocol mediaRebootProtocol() {
return new MediaRebootProtocol();
}
@Bean
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public MediaShutdownProtocol mediaShutdownProtocol() {
return new MediaShutdownProtocol();
}
@Bean
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public SystemRebootProtocol systemRebootProtocol() {
return new SystemRebootProtocol();
}
@Bean
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public SystemShutdownProtocol systemShutdownProtocol() {
return new SystemShutdownProtocol();
}
@Bean
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public PlatformRebootProtocol platformRebootProtocol() {
return new PlatformRebootProtocol();
}
@Bean
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public PlatformScriptProtocol platformScriptProtocol() {
return new PlatformScriptProtocol();
}
@Bean
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public PlatformShutdownProtocol platformShutdownProtocol() {
return new PlatformShutdownProtocol();
}
}

View File

@@ -0,0 +1,50 @@
package com.acgist.taoyao.signal.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.acgist.taoyao.boot.property.SocketProperties;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.client.socket.SocketSignal;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.platform.PlatformErrorProtocol;
/**
* Socket信令自动配置
*
* @author acgist
*/
@Configuration
@ConditionalOnProperty(prefix = "taoyao.socket", name = "enabled", havingValue = "true", matchIfMissing = true)
public class SocketAutoConfigruation {
@Bean
@Autowired
@ConditionalOnMissingBean
public SocketSignal socketSignal(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties,
PlatformErrorProtocol platformErrorProtocol
) {
return new SocketSignal(clientManager, protocolManager, socketProperties, platformErrorProtocol);
}
@Bean
@Autowired
@ConditionalOnBean(SocketSignal.class)
public CommandLineRunner socketSignalCommandLineRunner(SocketSignal socketSignal) {
return new CommandLineRunner() {
@Override
public void run(String ... args) throws Exception {
socketSignal.init();
}
};
}
}

View File

@@ -6,7 +6,6 @@ import org.springframework.context.ApplicationEvent;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.MapBodyGetter;
import com.acgist.taoyao.signal.client.Client;
import lombok.Getter;
import lombok.Setter;
@@ -18,65 +17,51 @@ import lombok.Setter;
*/
@Getter
@Setter
public abstract class ApplicationEventAdapter extends ApplicationEvent implements MapBodyGetter {
private static final long serialVersionUID = 1L;
public class ApplicationEventAdapter extends ApplicationEvent implements MapBodyGetter {
/**
* 终端标识
*/
private String sn;
/**
* 主体
*/
private final Map<?, ?> body;
/**
* 终端
*/
private final Client client;
/**
* 消息
*/
private final Message message;
public ApplicationEventAdapter(Client client, Message message) {
this(null, client, message);
}
public ApplicationEventAdapter(Map<?, ?> body, Client client, Message message) {
super(client);
this.sn = client.sn();
this.body = body;
this.client = client;
this.message = message;
}
/**
* @see #get(Map, String)
*/
public <T> T get(String key) {
return this.get(this.body, key);
}
private static final long serialVersionUID = 1L;
/**
* @see #get(Map, String, Object)
*/
public <T> T get(String key, T defaultValue) {
return this.get(body, key, defaultValue);
}
/**
* @see #getLong(Map, String)
*/
public Long getLong(String key) {
return this.getLong(body, key);
}
/**
* @see #getInteger(Map, String)
*/
public Integer getInteger(String key) {
return this.getInteger(body, key);
}
/**
* 主体
*/
private final Map<?, ?> body;
/**
* 消息
*/
private final Message message;
protected ApplicationEventAdapter(Map<?, ?> body, Message message, Object source) {
super(source);
this.body = body;
this.message = message;
}
/**
* @see #get(Map, String)
*/
public <T> T get(String key) {
return this.get(this.body, key);
}
/**
* @see #get(Map, String, Object)
*/
public <T> T get(String key, T defaultValue) {
return this.get(body, key, defaultValue);
}
/**
* @see #getLong(Map, String)
*/
public Long getLong(String key) {
return this.getLong(body, key);
}
/**
* @see #getInteger(Map, String)
*/
public Integer getInteger(String key) {
return this.getInteger(body, key);
}
}

View File

@@ -0,0 +1,41 @@
package com.acgist.taoyao.signal.event;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import lombok.Getter;
import lombok.Setter;
/**
* 终端事件适配器
*
* @author acgist
*/
@Getter
@Setter
public abstract class ClientEventAdapter extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
/**
* 终端标识
*/
private String sn;
/**
* 终端
*/
private final Client client;
public ClientEventAdapter(Message message, Client client) {
this(Map.of(), message, client);
}
public ClientEventAdapter(Map<?, ?> body, Message message, Client client) {
super(body, message, client);
this.sn = client.sn();
this.client = client;
}
}

View File

@@ -0,0 +1,36 @@
package com.acgist.taoyao.signal.event;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.media.MediaClient;
import lombok.Getter;
import lombok.Setter;
/**
* 媒体事件适配器
*
* @author acgist
*/
@Getter
@Setter
public class MediaEventAdapter extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
/**
* 终端
*/
private final MediaClient mediaClient;
public MediaEventAdapter(Message message, MediaClient mediaClient) {
this(Map.of(), message, mediaClient);
}
public MediaEventAdapter(Map<?, ?> body, Message message, MediaClient mediaClient) {
super(body, message, mediaClient);
this.mediaClient = mediaClient;
}
}

View File

@@ -2,7 +2,7 @@ package com.acgist.taoyao.signal.event.client;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import lombok.Getter;
import lombok.Setter;
@@ -14,12 +14,12 @@ import lombok.Setter;
*/
@Getter
@Setter
public class ClientCloseEvent extends ApplicationEventAdapter {
public class ClientCloseEvent extends ClientEventAdapter {
private static final long serialVersionUID = 1L;
public ClientCloseEvent(Client client, Message message) {
super(client, message);
public ClientCloseEvent(Message message, Client client) {
super(message, client);
}
}

View File

@@ -4,7 +4,7 @@ import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import com.acgist.taoyao.signal.protocol.Constant;
import lombok.Getter;
@@ -17,12 +17,12 @@ import lombok.Setter;
*/
@Getter
@Setter
public class ClientRegisterEvent extends ApplicationEventAdapter {
public class ClientRegisterEvent extends ClientEventAdapter {
private static final long serialVersionUID = 1L;
public ClientRegisterEvent(Map<?, ?> body, Client client, Message message) {
super(body, client, message);
public ClientRegisterEvent(Map<?, ?> body, Message message, Client client) {
super(body, message, client);
}
/**

View File

@@ -0,0 +1,12 @@
package com.acgist.taoyao.signal.event.media;
/**
* 媒体服务注册事件
*
* TODO成功以后注册已有房间
*
* @author acgist
*/
public class MediaRegisterEvent {
}

View File

@@ -4,7 +4,7 @@ import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import com.acgist.taoyao.signal.protocol.Constant;
import lombok.Getter;
@@ -17,16 +17,16 @@ import lombok.Setter;
*/
@Getter
@Setter
public class PlatformScriptEvent extends ApplicationEventAdapter {
public class PlatformScriptEvent extends ClientEventAdapter {
private static final long serialVersionUID = 1L;
public PlatformScriptEvent(String script, Client client, Message message) {
this(Map.of(Constant.SCRIPT, script), client, message);
public PlatformScriptEvent(String script, Message message, Client client) {
this(Map.of(Constant.SCRIPT, script), message, client);
}
public PlatformScriptEvent(Map<?, ?> body, Client client, Message message) {
super(body, client, message);
public PlatformScriptEvent(Map<?, ?> body, Message message, Client client) {
super(body, message, client);
}
/**

View File

@@ -2,7 +2,7 @@ package com.acgist.taoyao.signal.event.platform;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import lombok.Getter;
import lombok.Setter;
@@ -14,12 +14,12 @@ import lombok.Setter;
*/
@Getter
@Setter
public class PlatformShutdownEvent extends ApplicationEventAdapter {
public class PlatformShutdownEvent extends ClientEventAdapter {
private static final long serialVersionUID = 1L;
public PlatformShutdownEvent(Client client, Message message) {
super(client, message);
public PlatformShutdownEvent(Message message, Client client) {
super(message, client);
}
}

View File

@@ -4,7 +4,7 @@ import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import com.acgist.taoyao.signal.protocol.Constant;
import lombok.Getter;
@@ -17,12 +17,12 @@ import lombok.Setter;
*/
@Getter
@Setter
public class RoomCreateEvent extends ApplicationEventAdapter {
public class RoomCreateEvent extends ClientEventAdapter {
private static final long serialVersionUID = 1L;
public RoomCreateEvent(Map<?, ?> body, Client client, Message message) {
super(body, client, message);
public RoomCreateEvent(Map<?, ?> body, Message message, Client client) {
super(body, message, client);
}
/**

View File

@@ -4,7 +4,7 @@ import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import com.acgist.taoyao.signal.protocol.Constant;
import lombok.Getter;
@@ -17,12 +17,12 @@ import lombok.Setter;
*/
@Getter
@Setter
public class RoomEnterEvent extends ApplicationEventAdapter {
public class RoomEnterEvent extends ClientEventAdapter {
private static final long serialVersionUID = 1L;
public RoomEnterEvent(Map<?, ?> body, Client client, Message message) {
super(body, client, message);
public RoomEnterEvent(Map<?, ?> body, Message message, Client client) {
super(body, message, client);
}
/**

View File

@@ -5,7 +5,7 @@ import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import com.acgist.taoyao.signal.media.MediaClientManager;
import com.acgist.taoyao.signal.media.RoomManager;
@@ -16,7 +16,7 @@ import com.acgist.taoyao.signal.media.RoomManager;
*
* @author acgist
*/
public abstract class ApplicationListenerAdapter<E extends ApplicationEventAdapter> implements ApplicationListener<E> {
public abstract class ApplicationListenerAdapter<E extends ClientEventAdapter> implements ApplicationListener<E> {
@Autowired
protected RoomManager roomManager;

View File

@@ -8,6 +8,8 @@ import org.springframework.scheduling.annotation.Async;
import com.acgist.taoyao.boot.annotation.EventListener;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCode;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.platform.PlatformScriptEvent;
import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter;
@@ -46,8 +48,7 @@ public class PlatformScriptListener extends ApplicationListenerAdapter<PlatformS
*/
private String execute(String script) {
if(StringUtils.isEmpty(script)) {
log.warn("执行命令失败:{}", script);
return "命令为空";
throw MessageCodeException.of(MessageCode.CODE_1002, "无效命令:" + script);
}
String result = null;
Process process = null;

View File

@@ -36,8 +36,8 @@ public class PlatformShutdownListener extends ApplicationListenerAdapter<Platfor
// 命令关闭
this.applicationContext.publishEvent(new PlatformScriptEvent(
this.scriptProperties.getPlatformShutdown(),
event.getClient(),
event.getMessage()
event.getMessage(),
event.getClient()
));
}
}

View File

@@ -59,6 +59,11 @@ public class MediaClient {
private TaoyaoProperties taoyaoProperties;
@Autowired
private MediaRegisterProtocol mediaRegisterProtocol;
/**
* 最长重试周期
*/
private static final long MAX_DURATION = 60L * 1000;
/**
* 名称
@@ -184,7 +189,7 @@ public class MediaClient {
* @return 重试周期
*/
private long retryDuration() {
return this.duration = Math.min(this.duration + this.taoyaoProperties.getTimeout(), this.taoyaoProperties.getMaxTimeout());
return this.duration = Math.min(this.duration + this.taoyaoProperties.getTimeout(), MAX_DURATION);
}
/**

View File

@@ -91,5 +91,9 @@ public interface Constant {
* 终端ID
*/
String STREAM_ID = "streamId";
/**
* 换行
*/
String LINE = "\n";
}

View File

@@ -3,7 +3,7 @@ package com.acgist.taoyao.signal.protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCode;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
/**
* 信令协议
@@ -55,7 +55,7 @@ public interface Protocol {
*
* @param event 事件
*/
<E extends ApplicationEventAdapter> void publishEvent(E event);
<E extends ClientEventAdapter> void publishEvent(E event);
/**
* 创建信令消息

View File

@@ -10,7 +10,7 @@ import com.acgist.taoyao.boot.model.MessageCode;
import com.acgist.taoyao.boot.property.TaoyaoProperties;
import com.acgist.taoyao.boot.service.IdService;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import com.acgist.taoyao.signal.media.MediaClientManager;
import com.acgist.taoyao.signal.media.RoomManager;
@@ -59,7 +59,7 @@ public abstract class ProtocolAdapter implements Protocol {
}
@Override
public <E extends ApplicationEventAdapter> void publishEvent(E event) {
public <E extends ClientEventAdapter> void publishEvent(E event) {
this.applicationContext.publishEvent(event);
}

View File

@@ -49,7 +49,7 @@ public class ClientRegisterProtocol extends ProtocolMapAdapter {
// 推送消息
client.push(message.cloneWidthoutBody());
// 发送事件
this.publishEvent(new ClientRegisterEvent(body, client, message));
this.publishEvent(new ClientRegisterEvent(body, message, client));
}
}

View File

@@ -33,7 +33,7 @@ public class MediaRebootProtocol extends ProtocolAdapter {
// 全员广播
this.clientManager.broadcast(message);
// 推送事件
this.publishEvent(new PlatformScriptEvent(this.scriptProperties.getMediaReboot(), client, message));
this.publishEvent(new PlatformScriptEvent(this.scriptProperties.getMediaReboot(), message, client));
}
}

View File

@@ -3,6 +3,7 @@ package com.acgist.taoyao.signal.protocol.platform;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCode;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
@@ -35,6 +36,20 @@ public class PlatformErrorProtocol extends ProtocolAdapter {
@Override
public void execute(String sn, Client client, Message message) {
}
/**
* @param e 异常
*
* @return 异常消息
*/
public Message build(Exception e) {
final Message message = super.build();
if(e instanceof MessageCodeException code) {
message.setCode(code.getCode(), code.getMessage());
}
message.setBody(e.getMessage());
return message;
}
@Override
public Message build(String id, MessageCode code, String message, Object body) {

View File

@@ -33,7 +33,7 @@ public class PlatformRebootProtocol extends ProtocolAdapter {
// 全员广播
this.clientManager.broadcast(message);
// 推送事件
this.publishEvent(new PlatformScriptEvent(this.scriptProperties.getPlatformReboot(), client, message));
this.publishEvent(new PlatformScriptEvent(this.scriptProperties.getPlatformReboot(), message, client));
}
}

View File

@@ -22,7 +22,7 @@ public class PlatformScriptProtocol extends ProtocolMapAdapter {
@Override
public void execute(String sn, Map<?, ?> body, Client client, Message message) {
this.publishEvent(new PlatformScriptEvent(body, client, message));
this.publishEvent(new PlatformScriptEvent(body, message, client));
}
}

View File

@@ -27,7 +27,7 @@ public class PlatformShutdownProtocol extends ProtocolAdapter {
// 全员广播
this.clientManager.broadcast(message);
// 推送事件
this.publishEvent(new PlatformShutdownEvent(client, message));
this.publishEvent(new PlatformShutdownEvent(message, client));
}
}

View File

@@ -24,7 +24,7 @@ public class RoomCreateProtocol extends ProtocolMapAdapter {
@Override
public void execute(String sn, Map<?, ?> body, Client client, Message message) {
this.publishEvent(new RoomCreateEvent(body, client, message));
this.publishEvent(new RoomCreateEvent(body, message, client));
}
}

View File

@@ -24,7 +24,7 @@ public class RoomEnterProtocol extends ProtocolMapAdapter {
@Override
public void execute(String sn, Map<?, ?> body, Client client, Message message) {
this.publishEvent(new RoomEnterEvent(body, client, message));
this.publishEvent(new RoomEnterEvent(body, message, client));
}
}

View File

@@ -31,7 +31,7 @@ public class SystemRebootProtocol extends ProtocolAdapter {
public void execute(String sn, Client client, Message message) {
log.info("重启系统:{}", sn);
this.clientManager.broadcast(message);
this.publishEvent(new PlatformScriptEvent(this.scriptProperties.getSystemReboot(), client, message));
this.publishEvent(new PlatformScriptEvent(this.scriptProperties.getSystemReboot(), message, client));
}
}

View File

@@ -31,7 +31,7 @@ public class SystemShutdownProtocol extends ProtocolAdapter {
public void execute(String sn, Client client, Message message) {
log.info("关闭系统:{}", sn);
this.clientManager.broadcast(message);
this.publishEvent(new PlatformScriptEvent(this.scriptProperties.getSystemShutdown(), client, message));
this.publishEvent(new PlatformScriptEvent(this.scriptProperties.getSystemShutdown(), message, client));
}
}

View File

@@ -1 +1,3 @@
com.acgist.taoyao.signal.config.ScriptAutoConfiguration
com.acgist.taoyao.signal.config.SignalAutoConfiguration
com.acgist.taoyao.signal.config.SocketAutoConfigruation