diff --git a/taoyao-media-server/src/Server.js b/taoyao-media-server/src/Server.js index 976a01a..b6e23a5 100644 --- a/taoyao-media-server/src/Server.js +++ b/taoyao-media-server/src/Server.js @@ -5,7 +5,7 @@ const fs = require("fs"); const ws = require("ws"); const https = require("https"); -const mediasoup = require("mediasoup"); +// const mediasoup = require("mediasoup"); const config = require("./Config"); const Logger = require("./Logger"); const Signal = require("./Signal"); @@ -174,21 +174,23 @@ async function onmessage(message, session) { // 授权验证 if (!session.authorize) { if ( - data.username === config.https.username && - data.password === config.https.password + data.body.username === config.https.username && + data.body.password === config.https.password ) { logger.debug("授权成功:%s", session._socket.remoteAddress); session.authorize = true; + data.code = "0000"; + data.message = "授权成功"; + data.body.username = null; + data.body.password = null; + session.send(JSON.stringify(data)); } else { logger.warn("授权失败:%s", session._socket.remoteAddress); - session.close(); - } - for (let i = 0; i < client.length; i++) { - if (client[i] === session) { - client.splice(i, 1); - break; - } + data.code = "3401"; + data.message = "授权失败"; + session.send(JSON.stringify(data)); } + // 不要传递授权信息 return; } // 处理信令 @@ -211,12 +213,12 @@ async function onmessage(message, session) { async function main() { logger.debug("DEBUG").info("INFO").warn("WARN").error("ERROR"); logger.info("开始启动:%s", config.name); - await buildMediasoupWorker(); + // await buildMediasoupWorker(); await buildSignalServer(); await buildCommandConsole(); await buildClientInterval(); logger.info("启动完成:%s", config.name); } -// 启动 +// 启动服务 main(); diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/model/Message.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/model/Message.java index 2060776..f4ada0c 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/model/Message.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/model/Message.java @@ -49,6 +49,15 @@ public class Message implements Cloneable, Serializable { @Schema(title = "请求响应主体", description = "请求响应主体") private Object body; + /** + * 覆盖 + * + * @param code 状态编码 + */ + public void setCode(String code) { + this.code = code; + } + /** * @param code 状态编码 * diff --git a/taoyao-signal-server/taoyao-media/README.md b/taoyao-signal-server/taoyao-media/README.md index eb234f7..5fcd8c5 100644 --- a/taoyao-signal-server/taoyao-media/README.md +++ b/taoyao-signal-server/taoyao-media/README.md @@ -2,4 +2,31 @@ ## 媒体信令 -### +### 信令格式 + +``` +{ + "header": { + "v": "版本", + "id": 请求标识, + "sn": "设备标识" + "pid": 信令标识, + }, + "code": "响应编码", + "message": "响应描述", + "body": { + // 信令主体 + } +} +``` + +### 终端 + +#### 授权信息 + +``` +``` + +### 路由 + +### 传输 diff --git a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/MediasoupClient.java b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/MediasoupClient.java index 9460d8c..c529f78 100644 --- a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/MediasoupClient.java +++ b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/MediasoupClient.java @@ -14,19 +14,27 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import javax.net.ssl.SSLContext; import javax.net.ssl.X509TrustManager; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.TaskScheduler; import org.springframework.stereotype.Service; +import com.acgist.taoyao.boot.model.Header; +import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.property.MediasoupProperties; import com.acgist.taoyao.boot.property.TaoyaoProperties; import com.acgist.taoyao.boot.property.WebrtcProperties; import com.acgist.taoyao.boot.utils.JSONUtils; +import com.acgist.taoyao.mediasoup.protocol.ProtocolMediasoupAdapter; +import com.acgist.taoyao.mediasoup.protocol.client.AuthorizeProtocol; +import com.acgist.taoyao.signal.protocol.Protocol; +import com.acgist.taoyao.signal.protocol.ProtocolManager; import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; @@ -39,7 +47,18 @@ import lombok.extern.slf4j.Slf4j; @Slf4j @Service public class MediasoupClient { - + + @Autowired + private TaskScheduler taskSchedulerl; + @Autowired + private ProtocolManager protocolManager; + @Autowired + private TaoyaoProperties taoyaoProperties; + @Autowired + private WebrtcProperties webrtcProperties; + @Autowired + private AuthorizeProtocol authorizeProtocol; + /** * Mediasoup WebSocket通道 */ @@ -48,13 +67,10 @@ public class MediasoupClient { * Mediasoup配置 */ private MediasoupProperties mediasoupProperties; - - @Autowired - private TaskScheduler taskSchedulerl; - @Autowired - private TaoyaoProperties taoyaoProperties; - @Autowired - private WebrtcProperties webrtcProperties; + /** + * 同步消息 + */ + private Map syncMessage = new ConcurrentHashMap<>(); @PostConstruct public void init() { @@ -88,13 +104,68 @@ public class MediasoupClient { * * @param message 消息 */ - public void send(Object message) { + public void send(Message message) { while(this.webSocket == null) { Thread.yield(); } this.webSocket.sendText(JSONUtils.toJSON(message), true); } + /** + * 同步发送消息 + * + * @param message 消息 + * + * @return 响应 + */ + public Message sendSync(Message message) { + final String id = message.getHeader().getId(); + this.syncMessage.put(id, message); + synchronized (message) { + try { + message.wait(this.taoyaoProperties.getTimeout()); + } catch (InterruptedException e) { + log.error("等待同步消息异常:{}", message, e); + } + } + final Message response = this.syncMessage.remove(id); + if(response == null || message.equals(response)) { + log.warn("消息没有响应:{}", message); + } + return response; + } + + /** + * 处理消息 + * + * @param data 消息 + */ + private void execute(String data) { + if(StringUtils.isNotEmpty(data)) { + final Message message = JSONUtils.toJava(data, Message.class); + final Header header = message.getHeader(); + final String id = header.getId(); + final Integer pid = header.getPid(); + final Message request = this.syncMessage.get(id); + // 存在同步响应 + if(request != null) { + // 重新设置消息 + this.syncMessage.put(id, message); + // 唤醒等待现场 + synchronized (request) { + request.notifyAll(); + } + } else { + final Protocol protocol = this.protocolManager.protocol(pid); + if(protocol instanceof ProtocolMediasoupAdapter mediasoupProtocol) { + mediasoupProtocol.execute(message, this.webSocket); + } else { + log.warn("未知Mediasoup信令:{}", data); + } + } + } + } + /** * 消息监听 * @@ -113,10 +184,7 @@ public class MediasoupClient { // 设置新的通道 MediasoupClient.this.webSocket = webSocket; // 发送授权消息 - MediasoupClient.this.send(Map.of( - "username", MediasoupClient.this.mediasoupProperties.getUsername(), - "password", MediasoupClient.this.mediasoupProperties.getPassword() - )); + MediasoupClient.this.send(MediasoupClient.this.authorizeProtocol.build()); } @Override @@ -128,6 +196,7 @@ public class MediasoupClient { @Override public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { log.debug("Mediasoup收到消息(text):{}-{}", webSocket, data); + MediasoupClient.this.execute(data.toString()); return Listener.super.onText(webSocket, data, last); } diff --git a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/protocol/ProtocolMediasoupAdapter.java b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/protocol/ProtocolMediasoupAdapter.java new file mode 100644 index 0000000..02291b4 --- /dev/null +++ b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/protocol/ProtocolMediasoupAdapter.java @@ -0,0 +1,40 @@ +package com.acgist.taoyao.mediasoup.protocol; + +import java.net.http.WebSocket; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Lazy; + +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.mediasoup.MediasoupClient; +import com.acgist.taoyao.signal.client.ClientSession; +import com.acgist.taoyao.signal.protocol.ProtocolAdapter; + +/** + * Mediasoup信令适配器 + * + * @author acgist + */ +public abstract class ProtocolMediasoupAdapter extends ProtocolAdapter { + + @Lazy + @Autowired + protected MediasoupClient mediasoupClient; + + protected ProtocolMediasoupAdapter(Integer pid, String name) { + super(pid, name); + } + + @Override + public void execute(String sn, Message message, ClientSession session) { + } + + /** + * 处理Mediasoup信令 + * + * @param message 信令消息 + * @param webSocket WebSocket + */ + public abstract void execute(Message message, WebSocket webSocket); + +} diff --git a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/protocol/client/AuthorizeProtocol.java b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/protocol/client/AuthorizeProtocol.java new file mode 100644 index 0000000..6a2744c --- /dev/null +++ b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/protocol/client/AuthorizeProtocol.java @@ -0,0 +1,48 @@ +package com.acgist.taoyao.mediasoup.protocol.client; + +import java.net.http.WebSocket; +import java.util.Map; + +import org.springframework.beans.factory.annotation.Autowired; + +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.property.MediasoupProperties; +import com.acgist.taoyao.boot.property.WebrtcProperties; +import com.acgist.taoyao.mediasoup.protocol.ProtocolMediasoupAdapter; + +import lombok.extern.slf4j.Slf4j; + +/** + * Mediasoup终端授权信令 + * + * @author acgist + */ +@Slf4j +@Protocol +public class AuthorizeProtocol extends ProtocolMediasoupAdapter { + + public static final Integer PID = 6000; + + @Autowired + private WebrtcProperties webrtcProperties; + + public AuthorizeProtocol() { + super(PID, "Mediasoup终端授权信令"); + } + + @Override + public Message build() { + final MediasoupProperties mediasoup = this.webrtcProperties.getMediasoup(); + return super.build(Map.of( + "username", mediasoup.getUsername(), + "password", mediasoup.getPassword() + )); + } + + @Override + public void execute(Message message, WebSocket webSocket) { + log.info("Mediasoup终端授权结果:{}", message); + } + +} diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/Protocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/Protocol.java index 8fea884..81cdab5 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/Protocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/Protocol.java @@ -13,6 +13,7 @@ import com.acgist.taoyao.signal.event.ApplicationEventAdapter; * 3000~3999:会议信令 * 4000~4999:直播信令 * 5000~5999:媒体信令 + * 6000~6999:媒体信令(Mediasoup) * * @author acgist */ diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java index 0150c43..34f4c09 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java @@ -1,5 +1,6 @@ package com.acgist.taoyao.signal.protocol; +import java.net.http.WebSocket; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -62,6 +63,15 @@ public class ProtocolManager { }); } + /** + * @param pid 信令标识 + * + * @return 信令 + */ + public Protocol protocol(Integer pid) { + return this.protocolMapping.get(pid); + } + /** * 执行信令消息 * @@ -111,5 +121,5 @@ public class ProtocolManager { session.push(this.errorProtocol.build(MessageCode.CODE_3401, "终端会话没有授权")); } } - + }