[*] 日常优化

This commit is contained in:
acgist
2023-09-08 10:11:53 +08:00
parent 475bb71300
commit 587b3c99d5
5 changed files with 234 additions and 61 deletions

View File

@@ -465,8 +465,8 @@ class Taoyao {
case "media::transport::close":
me.mediaTransportClose(message, body);
break;
case "media::transport::plain":
me.mediaTransportPlain(message, body);
case "media::transport::plain::create":
me.mediaTransportPlainCreate(message, body);
break;
case "media::transport::status":
me.mediaTransportStatus(message, body);
@@ -1628,7 +1628,7 @@ class Taoyao {
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaTransportPlain(message, body) {
async mediaTransportPlainCreate(message, body) {
const me = this;
const {
roomId,

View File

@@ -5,7 +5,11 @@ import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.Timer;
import java.util.TimerTask;
import javax.crypto.Cipher;
@@ -13,22 +17,31 @@ import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Test;
import com.acgist.taoyao.boot.config.SocketProperties.Encrypt;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.JSONUtils;
import com.acgist.taoyao.boot.utils.ScriptUtils;
import com.acgist.taoyao.signal.utils.CipherUtils;
import lombok.extern.slf4j.Slf4j;
/**
* TODO验证不能生产两个媒体
*/
@Slf4j
public class RtpTest {
private String roomId = "79371adf-0f05-4852-a664-f00b1b77662d";
private Map<String, Message> response = new HashMap<>();
@Test
void testSocket() throws Exception {
final Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1", 9999));
final InputStream inputStream = socket.getInputStream();
final InputStream inputStream = socket.getInputStream();
final OutputStream outputStream = socket.getOutputStream();
// 随机密码https://localhost:8888/config/socket
final String secret = "2SPWy+TF1zM=".strip();
final String secret = "2SPWy+TF1zM=".strip();
final Cipher encrypt = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, Encrypt.DES, secret);
final Cipher decrypt = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, Encrypt.DES, secret);
// 接收
@@ -63,12 +76,15 @@ public class RtpTest {
buffer.flip();
buffer.get(message);
buffer.compact();
final String value = new String(decrypt.doFinal(message));
if(value.contains("media::audio::volume")) {
final String value = new String(decrypt.doFinal(message));
final Message response = JSONUtils.toJava(value, Message.class);
final String signal = response.getHeader().getSignal();
if("media::audio::volume".equals(signal)) {
log.debug("收到消息:{}", value);
} else {
log.info("收到消息:{}", value);
}
this.response.put(signal, response);
}
}
}
@@ -77,36 +93,24 @@ public class RtpTest {
log.error("读取异常", e);
}
}).start();
// 发送
String line = """
{
"header":{"v":"1.0.0","id":1215293599999001,"signal":"client::register"},
"body":{"clientId":"ffmpeg","name":"ffmpeg","clientType":"WEB","battery":100,"charging":true,"username":"taoyao","password":"taoyao"}
}
""";
// {"header":{"v":"1.0.0","id":1215310510002009,"signal":"room::enter"},"body":{"roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9"}}
// {"header":{"v":"1.0.0","id":1215310510002010,"signal":"media::transport::plain"},"body":{"roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9","rtcpMux":false,"comedia":true}}
// {"header":{"v":"1.0.0","id":1215375110006012,"signal":"media::produce"},"body":{"kind":"video","roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9","transportId":"14dc9307-bf9c-4442-a9ad-ce6a97623ef4","appData":{},"rtpParameters":{"codecs":[{"mimeType":"video/vp8","clockRate":90000,"payloadType":102,"rtcpFeedback":[]}],"encodings":[{"ssrc":123123}]}}}
// 音频转为PCM
// ffmpeg.exe -i .\a.m4a -f s16le a.pcm
// ffmpeg.exe -i .\a.m4a -f s16le -ac 2 -ar 8000 a.pcm
// ffplay.exe -ar 48000 -ac 2 -f s16le -i a.pcm
// ffmpeg不支持rtcpMux
// ffmpeg -re -i video.mp4 -c:v vp8 -map 0:0 -f tee "[select=v:f=rtp:ssrc=123123:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218"
// ffmpeg -re -i video.mp4 -c:v libvpx -map 0:0 -f tee "[select=v:f=rtp:ssrc=123123:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218"
// ffmpeg -re -i video.mp4 -c:v vp8 -map 0:0 -f tee "[select=v:f=rtp:ssrc=123456:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218"
// ffmpeg -re -i video.mp4 -c:v libvpx -map 0:0 -f tee "[select=v:f=rtp:ssrc=123456:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218"
// 音频视频同时传输
// ffmpeg -re -i video.mp4 -c:a libopus -vn -f rtp rtp://192.168.1.110:8888 -c:v libx264 -an -f rtp rtp://192.168.1.110:9999 -sdp_file taoyao.sdp
// ffplay -protocol_whitelist "file,rtp,udp" -i taoyao.sdp
// ffmpeg -protocol_whitelist "file,rtp,udp" -i taoyao.sdp taoyao.mp4
// 发送命令register/enter/create/audio/video
String command = "register";
final Scanner scanner = new Scanner(System.in);
do {
if(StringUtils.isEmpty(line)) {
if(StringUtils.isEmpty(command)) {
break;
}
try {
final byte[] bytes = line.getBytes();
final byte[] bytes = this.request(command).getBytes();
final byte[] encryptBytes = encrypt.doFinal(bytes);
final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + encryptBytes.length);
final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + encryptBytes.length);
buffer.putShort((short) encryptBytes.length);
buffer.put(encryptBytes);
buffer.flip();
@@ -116,9 +120,162 @@ public class RtpTest {
} catch (Exception e) {
log.error("发送异常", e);
}
} while((line = scanner.next()) != null);
} while((command = scanner.next()) != null);
socket.close();
scanner.close();
}
private String request(String command) {
return switch (command) {
case "register" -> this.register();
case "enter" -> this.enter();
case "transport" -> this.transport();
case "audio" -> this.produceAudio();
case "video" -> this.produceVideo();
default -> null;
};
}
private String register() {
return """
{
"header": {
"v" : "1.0.0",
"id" : 1,
"signal": "client::register"
},
"body" : {
"clientId" : "ffmpeg",
"name" : "ffmpeg",
"clientType": "WEB",
"battery" : 100,
"charging" : true,
"username" : "taoyao",
"password" : "taoyao"
}
}
""";
}
private String enter() {
return String.format("""
{
"header": {
"v" : "1.0.0",
"id" : 2,
"signal": "room::enter"
},
"body" : {
"roomId": "%s"
}
}
""", this.roomId);
}
private String transport() {
return String.format("""
{
"header": {
"v" : "1.0.0",
"id" : 3,
"signal": "media::transport::plain::create"},
"body" : {
"roomId" : "%s",
"rtcpMux": false,
"comedia": true
}
}
""", this.roomId);
}
private String produceAudio() {
final Message message = this.response.get("media::transport::plain::create");
final Map<String, String> body = message.body();
final String transportId = body.get("transportId");
new Timer().schedule(new TimerTask() {
@Override
public void run() {
final Message produce = RtpTest.this.response.get("media::transport::plain::create");
final Map<String, Object> map = produce.body();
final String ip = map.get("ip").toString();
final String port = map.get("port").toString();
final String rtcpPort = map.get("rtcpPort").toString();
final String command = String.format("ffmpeg -re -i D:\\tmp\\video.mp4 -c:a libopus -map 0:1 -f tee \"[select=a:f=rtp:ssrc=123456:payload_type=100]rtp://%s:%s?rtcpport=%s\"", ip, port, rtcpPort);
log.info("执行命令:{}", command);
ScriptUtils.execute(command);
}
}, 1000);
return String.format("""
{
"header": {
"v" : "1.0.0",
"id" : 4,
"signal": "media::produce"
},
"body" : {
"kind" : "audio",
"roomId" : "%s",
"transportId": "%s",
"appData" : {},
"rtpParameters":{
"codecs" : [{
"mimeType" : "audio/opus",
"channels" : 2,
"clockRate" : 48000,
"payloadType": 100
}],
"encodings": [{
"ssrc": 123456
}]
}
}
}
""", this.roomId, transportId);
}
private String produceVideo() {
final Message message = this.response.get("media::transport::plain::create");
final Map<String, String> body = message.body();
final String transportId = body.get("transportId");
new Timer().schedule(new TimerTask() {
@Override
public void run() {
final Message produce = RtpTest.this.response.get("media::transport::plain::create");
final Map<String, Object> map = produce.body();
final String ip = map.get("ip").toString();
final String port = map.get("port").toString();
final String rtcpPort = map.get("rtcpPort").toString();
final String command = String.format("ffmpeg -re -i D:\\tmp\\video.mp4 -c:v libvpx -map 0:0 -f tee \"[select=v:f=rtp:ssrc=654321:payload_type=102]rtp://%s:%s?rtcpport=%s\"", ip, port, rtcpPort);
log.info("执行命令:{}", command);
ScriptUtils.execute(command);
}
}, 1000);
return String.format("""
{
"header": {
"v" : "1.0.0",
"id" : 5,
"signal": "media::produce"
},
"body" : {
"kind" : "video",
"roomId" : "%s",
"transportId": "%s",
"appData" : {},
"rtpParameters":{
"codecs" : [{
"mimeType" : "video/vp8",
"clockRate" : 90000,
"payloadType" : 102,
"rtcpFeedback":[]
}],
"encodings": [{
"ssrc": 654321
}]
}
}
}
""", this.roomId, transportId);
}
}

View File

@@ -33,27 +33,34 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@Protocol
@Description(
memo = "用来接入RTP终端",
memo = "用来接入RTP协议终端",
body = """
{
"roomId" : "房间ID",
"rtcpMux" : RTPRTCP端口复用true|false,
"comedia" : 自动识别终端端口true|false,
"enableSctp" : 是否开启SCTPtrue|false,
"numSctpStreams": SCTP数量,
"enableSrtp" : 是否开启SRTPtrue|false,
"roomId" : "房间ID",
"rtcpMux" : RTP/RTCP端口复用true|false,
"comedia" : 自动识别终端端口true|false,
"enableSctp" : 是否开启SCTPtrue|false,
"numSctpStreams" : SCTP数量,
"enableSrtp" : 是否开启SRTPtrue|false,
"srtpCryptoSuite": {
"cryptoSuite": "算法AEAD_AES_256_GCM|AEAD_AES_128_GCM|AES_CM_128_HMAC_SHA1_80|AES_CM_128_HMAC_SHA1_32",
"keyBase64" : "密钥"
}
}
{
roomId : "房间ID",
transportId: "通道ID",
ip : "RTP监听IP",
port : "RTP媒体端口",
rtcpPort : "RTP媒体RTCP端口"
}
"""
)
public class MediaTransportPlainProtocol extends ProtocolRoomAdapter {
public class MediaTransportPlainCreateProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::transport::plain";
public static final String SIGNAL = "media::transport::plain::create";
public MediaTransportPlainProtocol() {
public MediaTransportPlainCreateProtocol() {
super("创建RTP输入通道信令", SIGNAL);
}
@@ -77,11 +84,20 @@ public class MediaTransportPlainProtocol extends ProtocolRoomAdapter {
log.warn("发送通道已经存在:{}", transportId);
}
clientWrapper.setSendTransport(sendTransport);
// TODO双向队列
// 拷贝属性
sendTransport.copy(responseBody);
// TODO需要测试
// 消费者
Transport recvTransport = clientWrapper.getRecvTransport();
if(recvTransport == null) {
recvTransport = new Transport(transportId, Direction.RECV, room, client);
// transports.put(transportId, recvTransport);
// 消费媒体
// this.publishEvent(new MediaConsumeEvent(room, clientWrapper));
} else {
log.warn("接收通道已经存在:{}", transportId);
}
clientWrapper.setRecvTransport(recvTransport);
client.push(response);
log.info("{}创建RTP信令通道:{}", clientId, transportId);
log.info("{}创建RTP输入通道:{}", clientId, transportId);
}
/**

View File

@@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j;
"dtlsParameters": "DTLS参数"
}
{
"roomId" : "房间标识",
"roomId" : "房间ID",
"transportId" : "传输通道标识"
}
"""
@@ -52,7 +52,7 @@ public class MediaTransportWebRtcConnectProtocol extends ProtocolRoomAdapter {
final Map<String, Object> responseBody = response.body();
client.push(response);
final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID);
log.info("{}连接WebRTC通道信令{}", clientId, transportId);
log.info("{}连接WebRTC通道{}", clientId, transportId);
} else {
this.logNoAdapter(clientType);
}

View File

@@ -34,15 +34,15 @@ import lombok.extern.slf4j.Slf4j;
body = {
"""
{
"roomId" : "房间标识",
"roomId" : "房间ID",
"forceTcp" : "强制使用TCP",
"producing" : "是否生产",
"consuming" : "是否消费",
"sctpCapabilities": "sctpCapabilities"
}
{
"roomId" : "房间标识",
"transportId" : "传输通道标识",
"roomId" : "房间ID",
"transportId" : "传输通道ID",
"iceCandidates" : "iceCandidates",
"iceParameters" : "iceParameters",
"dtlsParameters": "dtlsParameters",
@@ -72,6 +72,20 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
this.rewriteIP(client.getIP(), responseBody);
// 处理逻辑
final ClientWrapper clientWrapper = room.clientWrapper(client);
// 生产者
final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING);
if(Boolean.TRUE.equals(producing)) {
Transport sendTransport = clientWrapper.getSendTransport();
if(sendTransport == null) {
sendTransport = new Transport(transportId, Direction.SEND, room, client);
transports.put(transportId, sendTransport);
} else {
log.warn("发送通道已经存在:{}", transportId);
}
clientWrapper.setSendTransport(sendTransport);
// 拷贝属性
sendTransport.copy(responseBody);
}
// 消费者
final Boolean consuming = MapUtils.getBoolean(body, Constant.CONSUMING);
if(Boolean.TRUE.equals(consuming)) {
@@ -88,22 +102,8 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
// 消费媒体:不能在连接时调用
this.publishEvent(new MediaConsumeEvent(room, clientWrapper));
}
// 生产者
final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING);
if(Boolean.TRUE.equals(producing)) {
Transport sendTransport = clientWrapper.getSendTransport();
if(sendTransport == null) {
sendTransport = new Transport(transportId, Direction.SEND, room, client);
transports.put(transportId, sendTransport);
} else {
log.warn("发送通道已经存在:{}", transportId);
}
clientWrapper.setSendTransport(sendTransport);
// 拷贝属性
sendTransport.copy(responseBody);
}
client.push(response);
log.info("{}创建WebRTC通道信令{}", clientId, transportId);
log.info("{}创建WebRTC通道{}", clientId, transportId);
} else {
this.logNoAdapter(clientType);
}