[*] 十年饮冰 难凉热血

This commit is contained in:
acgist
2023-03-10 23:35:24 +08:00
parent 63a37be492
commit 1bf8fbe415
57 changed files with 1593 additions and 393 deletions

View File

@@ -5,6 +5,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.acgist.taoyao.boot.config.MediaProperties;
import com.acgist.taoyao.boot.config.SocketProperties;
import com.acgist.taoyao.boot.config.WebrtcProperties;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.config.camera.CameraProperties;
@@ -27,11 +28,18 @@ public class ConfigController {
private final MediaProperties mediaProperties;
private final CameraProperties cameraProperties;
private final SocketProperties socketProperties;
private final WebrtcProperties webrtcProperties;
public ConfigController(MediaProperties mediaProperties, CameraProperties cameraProperties, WebrtcProperties webrtcProperties) {
public ConfigController(
MediaProperties mediaProperties,
CameraProperties cameraProperties,
SocketProperties socketProperties,
WebrtcProperties webrtcProperties
) {
this.mediaProperties = mediaProperties;
this.cameraProperties = cameraProperties;
this.socketProperties = socketProperties;
this.webrtcProperties = webrtcProperties;
}
@@ -48,6 +56,13 @@ public class ConfigController {
private Message camera() {
return Message.success(this.cameraProperties);
}
@Operation(summary = "Socket配置", description = "Socket配置")
@GetMapping("/socket")
@ApiResponse(content = @Content(schema = @Schema(implementation = SocketProperties.class)))
public Message socket() {
return Message.success(this.socketProperties);
}
@Operation(summary = "WebRTC配置", description = "WebRTC配置")
@GetMapping("/webrtc")

View File

@@ -131,6 +131,8 @@ taoyao:
enabled: true
host: 0.0.0.0
port: 9999
encrypt: DES
encrypt-key:
timeout: ${taoyao.timeout}
queue-size: 100000
min-thread: 4
@@ -138,9 +140,10 @@ taoyao:
thread-name-prefix: ${spring.application.name}-signal-
keep-alive-time: 60000
buffer-size: 2048
max-buffer-size: 32768
# WebRTC配置
webrtc:
# 是否加密
# 是否加密E2E
encrypt: false
# STUN服务
stun:

View File

@@ -1,16 +1,21 @@
package com.acgist.taoyao.signal;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import javax.crypto.Cipher;
import org.junit.jupiter.api.Test;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.config.SocketProperties.Encrypt;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.utils.CipherUtils;
import lombok.extern.slf4j.Slf4j;
@@ -21,38 +26,81 @@ public class SocketSignalTest {
void testSocket() throws Exception {
final Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1", 9999));
final OutputStream outputStream = socket.getOutputStream();
final AtomicInteger recvIndex = new AtomicInteger();
final InputStream inputStream = socket.getInputStream();
final String line = Constant.LINE;
final int lineLength = line.length();
final OutputStream outputStream = socket.getOutputStream();
// 随机密码https://localhost:8888/config/socket
final String secret = """
Oi7ZvxZEcOU=
""".strip();
final Cipher encrypt = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, Encrypt.DES, secret);
final Cipher decrypt = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, Encrypt.DES, secret);
// 接收
new Thread(() -> {
int index = 0;
int length = 0;
short messageLength = 0;
final byte[] bytes = new byte[1024];
final StringBuilder builder = new StringBuilder();
final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
try {
while((length = inputStream.read(bytes)) >= 0) {
builder.append(new String(bytes, 0, length));
while((index = builder.indexOf(line)) >= 0) {
log.info("收到消息:{}", builder.substring(0, index));
builder.delete(0, index + lineLength);
}
buffer.put(bytes, 0, length);
while(buffer.position() > 0) {
if(messageLength <= 0) {
if(buffer.position() < Short.BYTES) {
// 不够消息长度
break;
} else {
buffer.flip();
messageLength = buffer.getShort();
buffer.compact();
if(messageLength > 16 * 1024) {
throw MessageCodeException.of("超过最大数据大小:" + messageLength);
}
}
} else {
if(buffer.position() < messageLength) {
// 不够消息长度
break;
} else {
final byte[] message = new byte[messageLength];
messageLength = 0;
buffer.flip();
buffer.get(message);
buffer.compact();
log.debug("收到消息:{}", new String(decrypt.doFinal(message)));
recvIndex.incrementAndGet();
}
}
}
}
} catch (IOException e) {
} catch (Exception e) {
log.error("读取异常", e);
}
}).start();
// 发送
final AtomicInteger sendIndex = new AtomicInteger();
final Executor executor = Executors.newFixedThreadPool(10);
for (int index = 0; index < 100; index++) {
executor.execute(() -> {
try {
outputStream.write(("{}" + line).getBytes());
} catch (IOException e) {
final byte[] bytes = ("{\"time\":" + System.nanoTime() + "}").getBytes();
final byte[] encryptBytes = encrypt.doFinal(bytes);
final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + encryptBytes.length);
buffer.putShort((short) encryptBytes.length);
buffer.put(encryptBytes);
buffer.flip();
final byte[] message = new byte[buffer.capacity()];
buffer.get(message);
outputStream.write(message);
sendIndex.incrementAndGet();
} catch (Exception e) {
log.error("发送异常", e);
}
});
}
Thread.sleep(5000);
log.info("发送数据:{}", sendIndex.get());
log.info("接收数据:{}", recvIndex.get());
socket.close();
}