diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 0a49fc7..94246c3 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -452,6 +452,9 @@ class Taoyao { case "media::transport::close": this.mediaTransportClose(message, body); break; + case "media::transport::plain::in": + me.mediaTransportPlainIn(message, body); + break; case "media::transport::webrtc::connect": me.mediaTransportWebrtcConnect(message, body); break; @@ -1252,6 +1255,36 @@ class Taoyao { } } + /** + * 创建RTP输入通道信令 + * + * @param {*} message 消息 + * @param {*} body 消息主体 + */ + async mediaTransportPlainIn(message, body) { + const me = this; + const { roomId, rtcpMux, comedia, clientId } = body; + const plainTransportOptions = { + rtcpMux: rtcpMux, + comedia: comedia, + ...config.mediasoup.plainTransportOptions, + }; + const room = this.rooms.get(roomId); + const transport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); + transport.clientId = clientId; + room.transports.set(transport.id, transport); + console.info(transport.tuple) + console.info(transport.rtcpTuple) + message.body = { + ip : transport.tuple.localIp, + port : transport.tuple.localPort, + roomId : roomId, + rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined, + transportId : transport.id, + }; + me.push(message); + } + /** * 连接WebRTC通道信令 * diff --git a/taoyao-signal-server/README.md b/taoyao-signal-server/README.md index 6f9c815..5f62132 100644 --- a/taoyao-signal-server/README.md +++ b/taoyao-signal-server/README.md @@ -12,3 +12,4 @@ ## 信令格式 [信令格式](https://localhost:8888/protocol/list) + diff --git a/taoyao-signal-server/taoyao-server/src/main/java/com/acgist/taoyao/controller/ControlController.java b/taoyao-signal-server/taoyao-server/src/main/java/com/acgist/taoyao/controller/ControlController.java index 4d2f887..5e059ad 100644 --- a/taoyao-signal-server/taoyao-server/src/main/java/com/acgist/taoyao/controller/ControlController.java +++ b/taoyao-signal-server/taoyao-server/src/main/java/com/acgist/taoyao/controller/ControlController.java @@ -12,7 +12,7 @@ import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.config.camera.AiProperties; import com.acgist.taoyao.signal.config.camera.BeautyProperties; import com.acgist.taoyao.signal.config.camera.WatermarkProperties; -import com.acgist.taoyao.signal.model.control.PtzControl; +import com.acgist.taoyao.signal.model.control.PtzModel; import com.acgist.taoyao.signal.protocol.control.ControlAiProtocol; import com.acgist.taoyao.signal.protocol.control.ControlBeautyProtocol; import com.acgist.taoyao.signal.protocol.control.ControlBellProtocol; @@ -79,8 +79,8 @@ public class ControlController { @Operation(summary = "PTZ", description = "PTZ控制") @GetMapping("/ptz/{clientId}") - public Message ptz(@PathVariable String clientId, @Valid PtzControl ptzControl) { - return Message.success(this.controlPtzProtocol.execute(clientId, ptzControl)); + public Message ptz(@PathVariable String clientId, @Valid PtzModel ptzModel) { + return Message.success(this.controlPtzProtocol.execute(clientId, ptzModel)); } @Operation(summary = "响铃", description = "响铃控制") diff --git a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java index 732a733..6c62f7f 100644 --- a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java +++ b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/signal/SocketSignalTest.java @@ -5,12 +5,11 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.nio.ByteBuffer; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.Scanner; import javax.crypto.Cipher; +import org.apache.commons.lang3.StringUtils; import org.junit.jupiter.api.Test; import com.acgist.taoyao.boot.config.SocketProperties.Encrypt; @@ -26,13 +25,10 @@ public class SocketSignalTest { void testSocket() throws Exception { final Socket socket = new Socket(); socket.connect(new InetSocketAddress("127.0.0.1", 9999)); - final AtomicInteger recvIndex = new AtomicInteger(); final InputStream inputStream = socket.getInputStream(); final OutputStream outputStream = socket.getOutputStream(); // 随机密码:https://localhost:8888/config/socket - final String secret = """ - Oi7ZvxZEcOU= - """.strip(); + final String secret = "TSFXzB7hcfE=".strip(); final Cipher encrypt = CipherUtils.buildCipher(Cipher.ENCRYPT_MODE, Encrypt.DES, secret); final Cipher decrypt = CipherUtils.buildCipher(Cipher.DECRYPT_MODE, Encrypt.DES, secret); // 接收 @@ -68,7 +64,6 @@ public class SocketSignalTest { buffer.get(message); buffer.compact(); log.debug("收到消息:{}", new String(decrypt.doFinal(message))); - recvIndex.incrementAndGet(); } } } @@ -78,30 +73,41 @@ public class SocketSignalTest { } }).start(); // 发送 - final AtomicInteger sendIndex = new AtomicInteger(); - final Executor executor = Executors.newFixedThreadPool(10); - for (int index = 0; index < 100; index++) { - executor.execute(() -> { - try { - final byte[] bytes = ("{\"time\":" + System.nanoTime() + "}").getBytes(); - final byte[] encryptBytes = encrypt.doFinal(bytes); - final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + encryptBytes.length); - buffer.putShort((short) encryptBytes.length); - buffer.put(encryptBytes); - buffer.flip(); - final byte[] message = new byte[buffer.capacity()]; - buffer.get(message); - outputStream.write(message); - sendIndex.incrementAndGet(); - } catch (Exception e) { - log.error("发送异常", e); - } - }); - } - Thread.sleep(5000); - log.info("发送数据:{}", sendIndex.get()); - log.info("接收数据:{}", recvIndex.get()); + String line = """ + { + "header":{"v":"1.0.0","id":1215293599999001,"signal":"client::register"}, + "body":{"clientId":"ffmpeg","name":"ffmpeg","clientType":"WEB","battery":100,"charging":true,"username":"taoyao","password":"taoyao"} + } + """; + // {"header":{"v":"1.0.0","id":1215310510002009,"signal":"room::enter"},"body":{"roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9"}} + // {"header":{"v":"1.0.0","id":1215310510002010,"signal":"media::transport::plain::in"},"body":{"roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9","rtcpMux":false,"comedia":true}} + // {"header":{"v":"1.0.0","id":1215375110006012,"signal":"media::produce"},"body":{"kind":"video","roomId":"8260e615-3081-4bfc-96a8-574f4dd780d9","transportId":"14dc9307-bf9c-4442-a9ad-ce6a97623ef4","appData":{},"rtpParameters":{"codecs":[{"mimeType":"video/vp8","clockRate":90000,"payloadType":102,"rtcpFeedback":[]}],"encodings":[{"ssrc":123123}]}}} +// ffplay -protocol_whitelist "file,udp,rtp" taoyao.sdp +// ffmpeg -re -i video.mp4 -vcodec copy -map 0:0 -f rtp rtp://localhost:6666 > taoyao.sdp + // ffmpeg不支持rtcpMux +// ffmpeg -re -i video.mp4 -c:v libvpx -map 0:0 -f tee "[select=v:f=rtp:ssrc=123123:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218" +// ffmpeg -re -i video.mp4 -vcodec vp8 -map 0:0 -f tee "[select=v:f=rtp:ssrc=123123:payload_type=102]rtp://192.168.1.110:40793?rtcpport=47218" + final Scanner scanner = new Scanner(System.in); + do { + if(StringUtils.isEmpty(line)) { + break; + } + try { + final byte[] bytes = line.getBytes(); + final byte[] encryptBytes = encrypt.doFinal(bytes); + final ByteBuffer buffer = ByteBuffer.allocateDirect(Short.BYTES + encryptBytes.length); + buffer.putShort((short) encryptBytes.length); + buffer.put(encryptBytes); + buffer.flip(); + final byte[] message = new byte[buffer.capacity()]; + buffer.get(message); + outputStream.write(message); + } catch (Exception e) { + log.error("发送异常", e); + } + } while((line = scanner.next()) != null); socket.close(); + scanner.close(); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/RoomController.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/RoomController.java index 7259927..219c9a9 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/RoomController.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/RoomController.java @@ -2,6 +2,7 @@ package com.acgist.taoyao.signal.controller; import java.util.List; +import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; @@ -25,6 +26,7 @@ import io.swagger.v3.oas.annotations.tags.Tag; * @author acgist */ @Tag(name = "房间", description = "房间管理") +@Validated @RestController @RequestMapping("/room") public class RoomController { diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/model/control/PtzControl.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/model/control/PtzModel.java similarity index 96% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/model/control/PtzControl.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/model/control/PtzModel.java index ec33df2..ac39b96 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/model/control/PtzControl.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/model/control/PtzModel.java @@ -9,7 +9,7 @@ import jakarta.validation.constraints.NotNull; * @author acgist */ @Schema(title = "PTZ控制参数", description = "PTZ控制参数") -public class PtzControl { +public class PtzModel { /** * PTZ类型 diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/control/ControlPtzProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/control/ControlPtzProtocol.java index c6fa41c..1b5d8fc 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/control/ControlPtzProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/control/ControlPtzProtocol.java @@ -7,7 +7,7 @@ import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; -import com.acgist.taoyao.signal.model.control.PtzControl; +import com.acgist.taoyao.signal.model.control.PtzModel; import com.acgist.taoyao.signal.protocol.ProtocolControlAdapter; /** @@ -43,12 +43,12 @@ public class ControlPtzProtocol extends ProtocolControlAdapter { /** * @param clientId 终端标识 - * @param ptzControl PTZ控制参数 + * @param ptzModel PTZ控制参数 * * @return 执行结果 */ - public Message execute(String clientId, PtzControl ptzControl) { - return this.request(clientId, this.build(ptzControl)); + public Message execute(String clientId, PtzModel ptzModel) { + return this.request(clientId, this.build(ptzModel)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainInProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainInProtocol.java new file mode 100644 index 0000000..f60e677 --- /dev/null +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainInProtocol.java @@ -0,0 +1,90 @@ +package com.acgist.taoyao.signal.protocol.media; + +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; + +import com.acgist.taoyao.boot.annotation.Description; +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; +import com.acgist.taoyao.boot.utils.NetUtils; +import com.acgist.taoyao.signal.client.Client; +import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.party.media.ClientWrapper; +import com.acgist.taoyao.signal.party.media.Room; +import com.acgist.taoyao.signal.party.media.Transport; +import com.acgist.taoyao.signal.party.media.Transport.Direction; +import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * 创建RTP输入通道信令 + * TODO:srtp + * + * @author acgist + */ +@Slf4j +@Protocol +@Description( + body = """ + { + "roomId": "房间ID", + "rtcpMux": RTP和RTCP端口复用(true|false), + "comedia": 自动终端端口(true|false), + } + """ +) +public class MediaTransportPlainInProtocol extends ProtocolRoomAdapter { + + public static final String SIGNAL = "media::transport::plain::in"; + + public MediaTransportPlainInProtocol() { + super("创建RTP输入通道信令", SIGNAL); + } + + @Override + public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { + body.put(Constant.CLIENT_ID, clientId); + final Message response = room.request(message); + final Map responseBody = response.body(); + final Map transports = room.getTransports(); + final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID); + // 重写地址 + this.rewriteIp(client.ip(), responseBody); + // 处理逻辑 + final ClientWrapper clientWrapper = room.clientWrapper(client); + // 生产者 + Transport sendTransport = clientWrapper.getSendTransport(); + if(sendTransport == null) { + sendTransport = new Transport(transportId, Direction.SEND, room, client); + transports.put(transportId, sendTransport); + } else { + log.warn("发送通道已经存在:{}", transportId); + } + clientWrapper.setSendTransport(sendTransport); + // 拷贝属性 + sendTransport.copy(responseBody); + client.push(response); + log.info("{}创建RTP信令通道:{}", clientId, transportId); + } + + /** + * 重写IP地址 + * + * @param clientIp 终端IP + * @param body 消息主体 + */ + private void rewriteIp(String clientIp, Map body) { + // 媒体服务返回IP + final String mediaIp = (String) body.get(Constant.IP); + if(StringUtils.isNotEmpty(mediaIp)) { + final String rewriteIp = NetUtils.rewriteIp(mediaIp, clientIp); + log.debug("重写地址:{} + {} -> {}", mediaIp, clientIp, rewriteIp); + body.put(Constant.IP, rewriteIp); + } + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainOutProtocol.java similarity index 50% rename from taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java rename to taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainOutProtocol.java index 13cab55..ba36b6b 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportPlainOutProtocol.java @@ -1,10 +1,10 @@ package com.acgist.taoyao.signal.protocol.media; /** - * 创建RTP通道信令 + * 创建RTP输出通道信令 * * @author acgist */ -public class MediaTransportPlainCreateProtocol { - +public class MediaTransportPlainOutProtocol { + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java index 24ae279..e11cb20 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcConnectProtocol.java @@ -43,7 +43,7 @@ public class MediaTransportWebRtcConnectProtocol extends ProtocolRoomAdapter { final Map responseBody = response.body(); client.push(response); final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID); - log.info("{}连接WebRTC信令通道:{}", clientId, transportId); + log.info("{}连接WebRTC通道信令:{}", clientId, transportId); } else { // 忽略其他情况 } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java index cb04b53..28a7b46 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java @@ -98,7 +98,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { sendTransport.copy(responseBody); } client.push(response); - log.info("{}创建WebRTC信令通道:{}", clientId, transportId); + log.info("{}创建WebRTC通道信令:{}", clientId, transportId); } /**