[*] 测试

This commit is contained in:
acgist
2022-11-20 11:14:44 +08:00
parent 70db787618
commit f900bbb998
5 changed files with 126 additions and 5 deletions

View File

@@ -0,0 +1,26 @@
package com.acgist.taoyao.signal;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.net.http.WebSocket;
import org.junit.jupiter.api.Test;
import com.acgist.taoyao.main.TaoyaoApplication;
import com.acgist.taoyao.test.annotation.TaoyaoTest;
@TaoyaoTest(classes = TaoyaoApplication.class)
class SignalTest {
@Test
void testSignal() throws InterruptedException {
final WebSocket clientA = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientA");
final WebSocket clientB = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientB");
clientA.sendText("""
{"header":{"pid":1000,"v":"1.0.0","id":"1","sn":"clientA"},"body":{}}
""", true).join();
assertNotNull(clientA);
assertNotNull(clientB);
}
}

View File

@@ -0,0 +1,88 @@
package com.acgist.taoyao.signal;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.net.http.WebSocket.Listener;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.concurrent.CompletionStage;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class WebSocketClient {
public static final WebSocket build(String uri, String sn) throws InterruptedException {
final Object lock = new Object();
try {
return HttpClient
.newBuilder()
.sslContext(newSSLContext())
.build()
.newWebSocketBuilder()
.buildAsync(URI.create(uri), new Listener() {
@Override
public void onOpen(WebSocket webSocket) {
webSocket.sendText(String.format("""
{"header":{"pid":2000,"v":"1.0.0","id":"1","sn":"%s"},"body":{"username":"taoyao","password":"taoyao"}}
""", sn), true);
Listener.super.onOpen(webSocket);
}
@Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
synchronized (lock) {
lock.notifyAll();
}
log.info("收到WebSocket消息{}", data);
return Listener.super.onText(webSocket, data, last);
}
})
.join();
} finally {
synchronized (lock) {
lock.wait(1000);
}
}
}
private static final SSLContext newSSLContext() {
SSLContext sslContext = null;
try {
// SSL协议SSL、SSLv2、SSLv3、TLS、TLSv1、TLSv1.1、TLSv1.2、TLSv1.3
sslContext = SSLContext.getInstance("TLSv1.2");
sslContext.init(null, TRUST_ALL_CERT_MANAGER, new SecureRandom());
} catch (KeyManagementException | NoSuchAlgorithmException e) {
log.error("新建SSLContext异常", e);
try {
sslContext = SSLContext.getDefault();
} catch (NoSuchAlgorithmException ex) {
log.error("新建默认SSLContext异常", ex);
}
}
return sslContext;
}
private static final TrustManager[] TRUST_ALL_CERT_MANAGER = new TrustManager[] {
new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
}
@Override
public X509Certificate[] getAcceptedIssuers() {
return null;
}
}
};
}

View File

@@ -41,7 +41,7 @@
#### 消息流程:终端->服务端+)终端 #### 消息流程:终端->服务端+)终端
全员广播关闭服务信令,然后关闭信令服务。 全员广播[关闭服务信令](#关闭服务信令1000),然后关闭信令服务。
### 执行命令信令1001 ### 执行命令信令1001
@@ -60,7 +60,7 @@
#### 消息流程:终端->服务端->终端 #### 消息流程:终端->服务端->终端
在服务端执行终端命令并返回结果 执行命令同时响应结果
### 异常信令1999 ### 异常信令1999

View File

@@ -67,22 +67,27 @@ public class ProtocolManager {
*/ */
public void execute(String message, AutoCloseable instance) { public void execute(String message, AutoCloseable instance) {
log.debug("执行信令消息:{}", message); log.debug("执行信令消息:{}", message);
final ClientSession session = this.clientSessionManager.session(instance);
// 验证请求 // 验证请求
final Message value = JSONUtils.toJava(message, Message.class); final Message value = JSONUtils.toJava(message, Message.class);
if(value == null) { if(value == null) {
log.warn("消息格式错误(解析失败):{}", message); log.warn("消息格式错误(解析失败):{}", message);
session.push(this.errorProtocol.build("消息格式错误(解析失败)"));
return; return;
} }
final Header header = value.getHeader(); final Header header = value.getHeader();
if(header == null) { if(header == null) {
log.warn("消息格式错误(没有头部):{}", message); log.warn("消息格式错误(没有头部):{}", message);
session.push(this.errorProtocol.build("消息格式错误(没有头部)"));
return; return;
} }
final String v = header.getV();
final String id = header.getId(); final String id = header.getId();
final String sn = header.getSn(); final String sn = header.getSn();
final Integer pid = header.getPid(); final Integer pid = header.getPid();
if(id == null || sn == null || pid == null) { if(v == null || id == null || sn == null || pid == null) {
log.warn("消息格式错误(id|sn|pid{}", message); log.warn("消息格式错误(缺失头部关键参数{}", message);
session.push(this.errorProtocol.build("消息格式错误(缺失头部关键参数)"));
return; return;
} }
// 设置缓存ID // 设置缓存ID
@@ -91,14 +96,15 @@ public class ProtocolManager {
final Protocol protocol = this.protocolMapping.get(pid); final Protocol protocol = this.protocolMapping.get(pid);
if(protocol == null) { if(protocol == null) {
log.warn("不支持的信令协议:{}", message); log.warn("不支持的信令协议:{}", message);
session.push(this.errorProtocol.build("不支持的信令协议"));
return; return;
} }
final ClientSession session = this.clientSessionManager.session(instance);
if(protocol instanceof ClientRegisterProtocol) { if(protocol instanceof ClientRegisterProtocol) {
protocol.execute(sn, value, session); protocol.execute(sn, value, session);
} else if(session.authorized() && sn.equals(session.sn())) { } else if(session.authorized() && sn.equals(session.sn())) {
protocol.execute(sn, value, session); protocol.execute(sn, value, session);
} else { } else {
log.warn("终端会话没有授权:{}", message);
session.push(this.errorProtocol.build(MessageCode.CODE_3401, "终端会话没有授权")); session.push(this.errorProtocol.build(MessageCode.CODE_3401, "终端会话没有授权"));
} }
} }

View File

@@ -30,6 +30,7 @@ public class ShutdownProtocol extends ProtocolAdapter {
if(this.context instanceof ConfigurableApplicationContext context) { if(this.context instanceof ConfigurableApplicationContext context) {
log.info("关闭信令服务:{}", sn); log.info("关闭信令服务:{}", sn);
if(context.isActive()) { if(context.isActive()) {
// 如果需要完整广播可以设置延时
context.close(); context.close();
} }
} else { } else {