diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 82e94d8..29c85df 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -465,8 +465,8 @@ class Taoyao { case "media::transport::close": me.mediaTransportClose(message, body); break; - case "media::transport::plain": - me.mediaTransportPlain(message, body); + case "media::transport::plain::create": + me.mediaTransportPlainCreate(message, body); break; case "media::transport::status": me.mediaTransportStatus(message, body); @@ -1628,7 +1628,7 @@ class Taoyao { * @param {*} message 消息 * @param {*} body 消息主体 */ - async mediaTransportPlain(message, body) { + async mediaTransportPlainCreate(message, body) { const me = this; const { roomId, diff --git a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/rtp/RtpTest.java b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/rtp/RtpTest.java index 88702c4..471cd85 100644 --- a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/rtp/RtpTest.java +++ b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/rtp/RtpTest.java @@ -5,7 +5,11 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import java.util.Scanner; +import java.util.Timer; +import java.util.TimerTask; import javax.crypto.Cipher; @@ -13,22 +17,31 @@ import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Test; import com.acgist.taoyao.boot.config.SocketProperties.Encrypt; +import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.MessageCodeException; +import com.acgist.taoyao.boot.utils.JSONUtils; +import com.acgist.taoyao.boot.utils.ScriptUtils; import com.acgist.taoyao.signal.utils.CipherUtils; import lombok.extern.slf4j.Slf4j; +/** + * TODO:验证不能生产两个媒体 + */ @Slf4j public class RtpTest { + private String roomId = "79371adf-0f05-4852-a664-f00b1b77662d"; + private Map response = new HashMap<>(); + @Test void testSocket() throws Exception { final Socket socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1", 9999)); - final InputStream inputStream = socket.getInputStream(); + final InputStream inputStream = socket.getInputStream(); final OutputStream outputStream = socket.getOutputStream(); // 随机密码:https://localhost:8888/config/socket - final String secret = "2SPWy+TF1zM=".strip(); + final String secret = "2SPWy+TF1zM=".strip(); final Cipher encrypt = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, Encrypt.DES, secret); final Cipher decrypt = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, Encrypt.DES, secret); // 接收 @@ -63,12 +76,15 @@ public class RtpTest { buffer.flip(); buffer.get(message); buffer.compact(); - final String value = new String(decrypt.doFinal(message)); - if(value.contains("media::audio::volume")) { + final String value = new String(decrypt.doFinal(message)); + final Message response = JSONUtils.toJava(value, Message.class); + final String signal = response.getHeader().getSignal(); + if("media::audio::volume".equals(signal)) { log.debug("收到消息:{}", value); } else { log.info("收到消息:{}", value); } + this.response.put(signal, response); } } } @@ -77,36 +93,24 @@ public class RtpTest { log.error("读取异常", e); } }).start(); - // 发送 - String line = """ - { - "header":{"v":"1.0.0","id":1215293599999001,"signal":"client::register"}, - "body":{"clientId":"ffmpeg","name":"ffmpeg","clientType":"WEB","battery":100,"charging":true,"username":"taoyao","password":"taoyao"} - } - """; - // {"header":{"v":"1.0.0","id":1215310510002009,"signal":"room::enter"},"body":{"roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9"}} - // {"header":{"v":"1.0.0","id":1215310510002010,"signal":"media::transport::plain"},"body":{"roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9","rtcpMux":false,"comedia":true}} - // {"header":{"v":"1.0.0","id":1215375110006012,"signal":"media::produce"},"body":{"kind":"video","roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9","transportId":"14dc9307-bf9c-4442-a9ad-ce6a97623ef4","appData":{},"rtpParameters":{"codecs":[{"mimeType":"video/vp8","clockRate":90000,"payloadType":102,"rtcpFeedback":[]}],"encodings":[{"ssrc":123123}]}}} - // 音频转为PCM -// ffmpeg.exe -i .\a.m4a -f s16le a.pcm -// ffmpeg.exe -i .\a.m4a -f s16le -ac 2 -ar 8000 a.pcm -// ffplay.exe -ar 48000 -ac 2 -f s16le -i a.pcm // ffmpeg不支持rtcpMux -// ffmpeg -re -i video.mp4 -c:v vp8 -map 0:0 -f tee "[select=v:f=rtp:ssrc=123123:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218" -// ffmpeg -re -i video.mp4 -c:v libvpx -map 0:0 -f tee "[select=v:f=rtp:ssrc=123123:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218" +// ffmpeg -re -i video.mp4 -c:v vp8 -map 0:0 -f tee "[select=v:f=rtp:ssrc=123456:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218" +// ffmpeg -re -i video.mp4 -c:v libvpx -map 0:0 -f tee "[select=v:f=rtp:ssrc=123456:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218" // 音频视频同时传输 // ffmpeg -re -i video.mp4 -c:a libopus -vn -f rtp rtp://192.168.1.110:8888 -c:v libx264 -an -f rtp rtp://192.168.1.110:9999 -sdp_file taoyao.sdp // ffplay -protocol_whitelist "file,rtp,udp" -i taoyao.sdp // ffmpeg -protocol_whitelist "file,rtp,udp" -i taoyao.sdp taoyao.mp4 + // 发送命令:register/enter/create/audio/video + String command = "register"; final Scanner scanner = new Scanner(System.in); do { - if(StringUtils.isEmpty(line)) { + if(StringUtils.isEmpty(command)) { break; } try { - final byte[] bytes = line.getBytes(); + final byte[] bytes = this.request(command).getBytes(); final byte[] encryptBytes = encrypt.doFinal(bytes); - final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + encryptBytes.length); + final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + encryptBytes.length); buffer.putShort((short) encryptBytes.length); buffer.put(encryptBytes); buffer.flip(); @@ -116,9 +120,162 @@ public class RtpTest { } catch (Exception e) { log.error("发送异常", e); } - } while((line = scanner.next()) != null); + } while((command = scanner.next()) != null); socket.close(); scanner.close(); } + private String request(String command) { + return switch (command) { + case "register" -> this.register(); + case "enter" -> this.enter(); + case "transport" -> this.transport(); + case "audio" -> this.produceAudio(); + case "video" -> this.produceVideo(); + default -> null; + }; + } + + private String register() { + return """ + { + "header": { + "v" : "1.0.0", + "id" : 1, + "signal": "client::register" + }, + "body" : { + "clientId" : "ffmpeg", + "name" : "ffmpeg", + "clientType": "WEB", + "battery" : 100, + "charging" : true, + "username" : "taoyao", + "password" : "taoyao" + } + } + """; + } + + private String enter() { + return String.format(""" + { + "header": { + "v" : "1.0.0", + "id" : 2, + "signal": "room::enter" + }, + "body" : { + "roomId": "%s" + } + } + """, this.roomId); + } + + private String transport() { + return String.format(""" + { + "header": { + "v" : "1.0.0", + "id" : 3, + "signal": "media::transport::plain::create"}, + "body" : { + "roomId" : "%s", + "rtcpMux": false, + "comedia": true + } + } + """, this.roomId); + } + + private String produceAudio() { + final Message message = this.response.get("media::transport::plain::create"); + final Map body = message.body(); + final String transportId = body.get("transportId"); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + final Message produce = RtpTest.this.response.get("media::transport::plain::create"); + final Map map = produce.body(); + final String ip = map.get("ip").toString(); + final String port = map.get("port").toString(); + final String rtcpPort = map.get("rtcpPort").toString(); + final String command = String.format("ffmpeg -re -i D:\\tmp\\video.mp4 -c:a libopus -map 0:1 -f tee \"[select=a:f=rtp:ssrc=123456:payload_type=100]rtp://%s:%s?rtcpport=%s\"", ip, port, rtcpPort); + log.info("执行命令:{}", command); + ScriptUtils.execute(command); + } + }, 1000); + return String.format(""" + { + "header": { + "v" : "1.0.0", + "id" : 4, + "signal": "media::produce" + }, + "body" : { + "kind" : "audio", + "roomId" : "%s", + "transportId": "%s", + "appData" : {}, + "rtpParameters":{ + "codecs" : [{ + "mimeType" : "audio/opus", + "channels" : 2, + "clockRate" : 48000, + "payloadType": 100 + }], + "encodings": [{ + "ssrc": 123456 + }] + } + } + } + """, this.roomId, transportId); + } + + private String produceVideo() { + final Message message = this.response.get("media::transport::plain::create"); + final Map body = message.body(); + final String transportId = body.get("transportId"); + new Timer().schedule(new TimerTask() { + @Override + public void run() { + final Message produce = RtpTest.this.response.get("media::transport::plain::create"); + final Map map = produce.body(); + final String ip = map.get("ip").toString(); + final String port = map.get("port").toString(); + final String rtcpPort = map.get("rtcpPort").toString(); + final String command = String.format("ffmpeg -re -i D:\\tmp\\video.mp4 -c:v libvpx -map 0:0 -f tee \"[select=v:f=rtp:ssrc=654321:payload_type=102]rtp://%s:%s?rtcpport=%s\"", ip, port, rtcpPort); + log.info("执行命令:{}", command); + ScriptUtils.execute(command); + } + }, 1000); + return String.format(""" + { + "header": { + "v" : "1.0.0", + "id" : 5, + "signal": "media::produce" + }, + "body" : { + "kind" : "video", + "roomId" : "%s", + "transportId": "%s", + "appData" : {}, + "rtpParameters":{ + "codecs" : [{ + "mimeType" : "video/vp8", + "clockRate" : 90000, + "payloadType" : 102, + "rtcpFeedback":[] + }], + "encodings": [{ + "ssrc": 654321 + }] + } + } + } + """, this.roomId, transportId); + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java similarity index 70% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainProtocol.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java index 9db277d..3655f4a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java @@ -33,27 +33,34 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Protocol @Description( - memo = "用来接入RTP终端", + memo = "用来接入RTP协议终端", body = """ { - "roomId" : "房间ID", - "rtcpMux" : RTP和RTCP端口复用(true|false), - "comedia" : 自动识别终端端口(true|false), - "enableSctp" : 是否开启SCTP(true|false), - "numSctpStreams": SCTP数量, - "enableSrtp" : 是否开启SRTP(true|false), + "roomId" : "房间ID", + "rtcpMux" : RTP/RTCP端口复用(true|false), + "comedia" : 自动识别终端端口(true|false), + "enableSctp" : 是否开启SCTP(true|false), + "numSctpStreams" : SCTP数量, + "enableSrtp" : 是否开启SRTP(true|false), "srtpCryptoSuite": { "cryptoSuite": "算法(AEAD_AES_256_GCM|AEAD_AES_128_GCM|AES_CM_128_HMAC_SHA1_80|AES_CM_128_HMAC_SHA1_32)", "keyBase64" : "密钥" } } + { + roomId : "房间ID", + transportId: "通道ID", + ip : "RTP监听IP", + port : "RTP媒体端口", + rtcpPort : "RTP媒体RTCP端口" + } """ ) -public class MediaTransportPlainProtocol extends ProtocolRoomAdapter { +public class MediaTransportPlainCreateProtocol extends ProtocolRoomAdapter { - public static final String SIGNAL = "media::transport::plain"; + public static final String SIGNAL = "media::transport::plain::create"; - public MediaTransportPlainProtocol() { + public MediaTransportPlainCreateProtocol() { super("创建RTP输入通道信令", SIGNAL); } @@ -77,11 +84,20 @@ public class MediaTransportPlainProtocol extends ProtocolRoomAdapter { log.warn("发送通道已经存在:{}", transportId); } clientWrapper.setSendTransport(sendTransport); - // TODO:双向队列 - // 拷贝属性 - sendTransport.copy(responseBody); + // TODO:需要测试 + // 消费者 + Transport recvTransport = clientWrapper.getRecvTransport(); + if(recvTransport == null) { + recvTransport = new Transport(transportId, Direction.RECV, room, client); + // transports.put(transportId, recvTransport); + // 消费媒体 + // this.publishEvent(new MediaConsumeEvent(room, clientWrapper)); + } else { + log.warn("接收通道已经存在:{}", transportId); + } + clientWrapper.setRecvTransport(recvTransport); client.push(response); - log.info("{}创建RTP信令通道:{}", clientId, transportId); + log.info("{}创建RTP输入通道:{}", clientId, transportId); } /** diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java index 9b59e66..0f76e01 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java @@ -30,7 +30,7 @@ import lombok.extern.slf4j.Slf4j; "dtlsParameters": "DTLS参数" } { - "roomId" : "房间标识", + "roomId" : "房间ID", "transportId" : "传输通道标识" } """ @@ -52,7 +52,7 @@ public class MediaTransportWebRtcConnectProtocol extends ProtocolRoomAdapter { final Map responseBody = response.body(); client.push(response); final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID); - log.info("{}连接WebRTC通道信令:{}", clientId, transportId); + log.info("{}连接WebRTC通道:{}", clientId, transportId); } else { this.logNoAdapter(clientType); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java index 5965e00..1baa56b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java @@ -34,15 +34,15 @@ import lombok.extern.slf4j.Slf4j; body = { """ { - "roomId" : "房间标识", + "roomId" : "房间ID", "forceTcp" : "强制使用TCP", "producing" : "是否生产", "consuming" : "是否消费", "sctpCapabilities": "sctpCapabilities" } { - "roomId" : "房间标识", - "transportId" : "传输通道标识", + "roomId" : "房间ID", + "transportId" : "传输通道ID", "iceCandidates" : "iceCandidates", "iceParameters" : "iceParameters", "dtlsParameters": "dtlsParameters", @@ -72,6 +72,20 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { this.rewriteIP(client.getIP(), responseBody); // 处理逻辑 final ClientWrapper clientWrapper = room.clientWrapper(client); + // 生产者 + final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING); + if(Boolean.TRUE.equals(producing)) { + Transport sendTransport = clientWrapper.getSendTransport(); + if(sendTransport == null) { + sendTransport = new Transport(transportId, Direction.SEND, room, client); + transports.put(transportId, sendTransport); + } else { + log.warn("发送通道已经存在:{}", transportId); + } + clientWrapper.setSendTransport(sendTransport); + // 拷贝属性 + sendTransport.copy(responseBody); + } // 消费者 final Boolean consuming = MapUtils.getBoolean(body, Constant.CONSUMING); if(Boolean.TRUE.equals(consuming)) { @@ -88,22 +102,8 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { // 消费媒体:不能在连接时调用 this.publishEvent(new MediaConsumeEvent(room, clientWrapper)); } - // 生产者 - final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING); - if(Boolean.TRUE.equals(producing)) { - Transport sendTransport = clientWrapper.getSendTransport(); - if(sendTransport == null) { - sendTransport = new Transport(transportId, Direction.SEND, room, client); - transports.put(transportId, sendTransport); - } else { - log.warn("发送通道已经存在:{}", transportId); - } - clientWrapper.setSendTransport(sendTransport); - // 拷贝属性 - sendTransport.copy(responseBody); - } client.push(response); - log.info("{}创建WebRTC通道信令:{}", clientId, transportId); + log.info("{}创建WebRTC通道:{}", clientId, transportId); } else { this.logNoAdapter(clientType); }