[+] socket signal

This commit is contained in:
acgist
2023-02-12 21:16:08 +08:00
parent 61c93b2614
commit 78fa48c35c
14 changed files with 412 additions and 35 deletions

View File

@@ -0,0 +1,42 @@
package com.acgist.taoyao.signal.client.socket;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import lombok.extern.slf4j.Slf4j;
/**
* Socket信令接收
*
* @author acgist
*/
@Slf4j
public final class SocketAcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {
private ClientManager clientManager;
private ProtocolManager protocolManager;
public SocketAcceptHandler(ClientManager clientManager, ProtocolManager protocolManager) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
}
@Override
public void completed(AsynchronousSocketChannel channel, AsynchronousServerSocketChannel server) {
log.debug("Socket信令连接成功{}", channel);
this.clientManager.open(new SocketClient(channel));
final SocketMessageHandler socketMessageHandler = new SocketMessageHandler(this.clientManager, this.protocolManager);
socketMessageHandler.handle(channel);
server.accept(server, this);
}
@Override
public void failed(Throwable throwable, AsynchronousServerSocketChannel server) {
log.error("Socket信令连接异常{}", server, throwable);
}
}

View File

@@ -1,8 +1,9 @@
package com.acgist.taoyao.signal.client.socket;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientAdapter;
@@ -19,31 +20,25 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@Getter
@Setter
public class SocketClient extends ClientAdapter<Socket> {
public class SocketClient extends ClientAdapter<AsynchronousSocketChannel> {
/**
* 输出
*/
private OutputStream outputStream;
public SocketClient(Socket instance) {
public SocketClient(AsynchronousSocketChannel instance) {
super(instance);
try {
this.outputStream = instance.getOutputStream();
} catch (IOException e) {
log.error("Socket终端输出异常{}", instance, e);
}
}
@Override
public void push(Message message) {
try {
if(this.instance.isClosed()) {
log.error("会话已经关闭:{}", this.instance);
} else {
this.outputStream.write(message.toString().getBytes());
synchronized (this.instance) {
}
} catch (IOException e) {
if(this.instance.isOpen()) {
final Future<Integer> future = this.instance.write(ByteBuffer.wrap(message.toString().getBytes()));
future.get();
// TODO超时
} else {
log.error("会话已经关闭:{}", this.instance);
}
} catch (InterruptedException | ExecutionException e) {
log.error("Socket发送消息异常{}", message, e);
}
}

View File

@@ -0,0 +1,76 @@
package com.acgist.taoyao.signal.client.socket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import com.acgist.taoyao.boot.utils.CloseableUtils;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import lombok.extern.slf4j.Slf4j;
/**
* Socket信令消息
*
* @author acgist
*/
@Slf4j
public final class SocketMessageHandler implements CompletionHandler<Integer, ByteBuffer> {
private ClientManager clientManager;
private ProtocolManager protocolManager;
private AsynchronousSocketChannel channel;
public SocketMessageHandler(ClientManager clientManager, ProtocolManager protocolManager) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
}
public void handle(AsynchronousSocketChannel channel) {
this.channel = channel;
this.waitMessage();
}
/**
* 消息轮询
*/
private void waitMessage() {
if(this.channel.isOpen()) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(2048);
this.channel.read(buffer, buffer, this);
} else {
log.debug("Socket信令消息退出消息轮询");
}
}
private void close() {
CloseableUtils.close(this.channel);
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
if (result == null) {
this.close();
} else if(result == -1) {
// 服务端关闭
this.close();
} else if(result == 0) {
// 消息空轮询
log.debug("Socket信令消息接收失败长度{}", result);
} else {
final byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
this.protocolManager.execute(new String(bytes), this.channel);
}
this.waitMessage();
}
@Override
public void failed(Throwable throwable, ByteBuffer buffer) {
log.error("Socket信令消息处理异常{}", this.channel, throwable);
this.close();
}
}

View File

@@ -1,10 +1,111 @@
package com.acgist.taoyao.signal.client.socket;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.property.SocketProperties;
import com.acgist.taoyao.boot.utils.CloseableUtils;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import lombok.extern.slf4j.Slf4j;
/**
* Socket信令
*
* @author acgist
*/
@Slf4j
public class SocketSignal {
private int index = 0;
private ClientManager clientManager;
private AsynchronousChannelGroup group;
private ProtocolManager protocolManager;
private SocketProperties socketProperties;
private AsynchronousServerSocketChannel channel;
public SocketSignal(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties
) {
this.clientManager = clientManager;
this.protocolManager = protocolManager;
this.socketProperties = socketProperties;
try {
final ExecutorService executor = new ThreadPoolExecutor(
this.socketProperties.getThreadMin(),
this.socketProperties.getThreadMax(),
this.socketProperties.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(this.socketProperties.getQueueSize()),
newThreadFactory()
);
this.group = AsynchronousChannelGroup.withThreadPool(executor);
} catch (IOException e) {
throw MessageCodeException.of(e, "创建Socket信令失败");
}
}
/**
* @return 线程池工厂
*/
private ThreadFactory newThreadFactory() {
return (runnable) -> {
final Thread thread = new Thread(runnable);
// 线程名称
synchronized(this) {
if(++this.index > this.socketProperties.getThreadMax()) {
this.index = 0;
}
thread.setName(this.socketProperties.getThreadNamePrefix() + this.index);
}
// 守护线程
thread.setDaemon(true);
return thread;
};
}
/**
* 开启监听
*
* @return 是否成功
*/
public boolean listen() {
boolean success = true;
try {
this.channel = AsynchronousServerSocketChannel.open(this.group);
this.channel.bind(new InetSocketAddress(this.socketProperties.getHost(), this.socketProperties.getPort()));
this.channel.accept(this.channel, new SocketAcceptHandler(this.clientManager, this.protocolManager));
} catch (IOException e) {
log.error("启动Socket信令服务异常", e);
success = false;
} finally {
if(success) {
log.info("启动Socket信令服务{}-{}", this.socketProperties.getHost(), this.socketProperties.getPort());
} else {
this.shutdown();
}
}
return success;
}
/**
* 关闭Socket信令服务
*/
public void shutdown() {
log.debug("关闭Socket信令服务");
CloseableUtils.close(this.channel);
this.group.shutdown();
}
}

View File

@@ -9,8 +9,12 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import com.acgist.taoyao.boot.property.SocketProperties;
import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.client.socket.SocketSignal;
import com.acgist.taoyao.signal.client.websocket.WebSocketSignal;
import com.acgist.taoyao.signal.media.MediaClientManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.media.MediaRebootProtocol;
import com.acgist.taoyao.signal.protocol.media.MediaShutdownProtocol;
import com.acgist.taoyao.signal.protocol.platform.PlatformRebootProtocol;
@@ -61,6 +65,32 @@ public class SignalAutoConfiguration {
};
}
@Bean
@Autowired
@ConditionalOnProperty(prefix = "taoyao.socket", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean
public SocketSignal socketSignal(
ClientManager clientManager,
ProtocolManager protocolManager,
SocketProperties socketProperties
) {
return new SocketSignal(clientManager, protocolManager, socketProperties);
}
@Bean
@Autowired
@ConditionalOnProperty(prefix = "taoyao.socket", name = "enabled", havingValue = "true", matchIfMissing = true)
public CommandLineRunner socketSignalCommandLineRunner(
SocketSignal socketSignal
) {
return new CommandLineRunner() {
@Override
public void run(String ... args) throws Exception {
socketSignal.listen();
}
};
}
@Bean
@ConditionalOnProperty(prefix = "taoyao.script", name = "enabled", havingValue = "true", matchIfMissing = true)
@ConditionalOnMissingBean

View File

@@ -12,6 +12,8 @@ import com.acgist.taoyao.signal.media.Room;
/**
* 房间媒体服务信令适配器
*
* TODO校验是否是房间内的用户权限
*
* @author acgist
*/
public abstract class ProtocolMediaRoomAdapter extends ProtocolMediaAdapter implements MapBodyGetter {