[+] 媒体信令

This commit is contained in:
acgist
2023-02-04 23:20:09 +08:00
parent a8aea2548e
commit 790666b221
8 changed files with 233 additions and 27 deletions

View File

@@ -5,7 +5,7 @@
const fs = require("fs"); const fs = require("fs");
const ws = require("ws"); const ws = require("ws");
const https = require("https"); const https = require("https");
const mediasoup = require("mediasoup"); // const mediasoup = require("mediasoup");
const config = require("./Config"); const config = require("./Config");
const Logger = require("./Logger"); const Logger = require("./Logger");
const Signal = require("./Signal"); const Signal = require("./Signal");
@@ -174,21 +174,23 @@ async function onmessage(message, session) {
// 授权验证 // 授权验证
if (!session.authorize) { if (!session.authorize) {
if ( if (
data.username === config.https.username && data.body.username === config.https.username &&
data.password === config.https.password data.body.password === config.https.password
) { ) {
logger.debug("授权成功:%s", session._socket.remoteAddress); logger.debug("授权成功:%s", session._socket.remoteAddress);
session.authorize = true; session.authorize = true;
data.code = "0000";
data.message = "授权成功";
data.body.username = null;
data.body.password = null;
session.send(JSON.stringify(data));
} else { } else {
logger.warn("授权失败:%s", session._socket.remoteAddress); logger.warn("授权失败:%s", session._socket.remoteAddress);
session.close(); data.code = "3401";
} data.message = "授权失败";
for (let i = 0; i < client.length; i++) { session.send(JSON.stringify(data));
if (client[i] === session) {
client.splice(i, 1);
break;
}
} }
// 不要传递授权信息
return; return;
} }
// 处理信令 // 处理信令
@@ -211,12 +213,12 @@ async function onmessage(message, session) {
async function main() { async function main() {
logger.debug("DEBUG").info("INFO").warn("WARN").error("ERROR"); logger.debug("DEBUG").info("INFO").warn("WARN").error("ERROR");
logger.info("开始启动:%s", config.name); logger.info("开始启动:%s", config.name);
await buildMediasoupWorker(); // await buildMediasoupWorker();
await buildSignalServer(); await buildSignalServer();
await buildCommandConsole(); await buildCommandConsole();
await buildClientInterval(); await buildClientInterval();
logger.info("启动完成:%s", config.name); logger.info("启动完成:%s", config.name);
} }
// 启动 // 启动服务
main(); main();

View File

@@ -49,6 +49,15 @@ public class Message implements Cloneable, Serializable {
@Schema(title = "请求响应主体", description = "请求响应主体") @Schema(title = "请求响应主体", description = "请求响应主体")
private Object body; private Object body;
/**
* 覆盖
*
* @param code 状态编码
*/
public void setCode(String code) {
this.code = code;
}
/** /**
* @param code 状态编码 * @param code 状态编码
* *

View File

@@ -2,4 +2,31 @@
## 媒体信令 ## 媒体信令
### ### 信令格式
```
{
"header": {
"v": "版本",
"id": 请求标识,
"sn": "设备标识"
"pid": 信令标识,
},
"code": "响应编码",
"message": "响应描述",
"body": {
// 信令主体
}
}
```
### 终端
#### 授权信息
```
```
### 路由
### 传输

View File

@@ -14,19 +14,27 @@ import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLContext;
import javax.net.ssl.X509TrustManager; import javax.net.ssl.X509TrustManager;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.TaskScheduler;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import com.acgist.taoyao.boot.model.Header;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.property.MediasoupProperties; import com.acgist.taoyao.boot.property.MediasoupProperties;
import com.acgist.taoyao.boot.property.TaoyaoProperties; import com.acgist.taoyao.boot.property.TaoyaoProperties;
import com.acgist.taoyao.boot.property.WebrtcProperties; import com.acgist.taoyao.boot.property.WebrtcProperties;
import com.acgist.taoyao.boot.utils.JSONUtils; import com.acgist.taoyao.boot.utils.JSONUtils;
import com.acgist.taoyao.mediasoup.protocol.ProtocolMediasoupAdapter;
import com.acgist.taoyao.mediasoup.protocol.client.AuthorizeProtocol;
import com.acgist.taoyao.signal.protocol.Protocol;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -39,7 +47,18 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j @Slf4j
@Service @Service
public class MediasoupClient { public class MediasoupClient {
@Autowired
private TaskScheduler taskSchedulerl;
@Autowired
private ProtocolManager protocolManager;
@Autowired
private TaoyaoProperties taoyaoProperties;
@Autowired
private WebrtcProperties webrtcProperties;
@Autowired
private AuthorizeProtocol authorizeProtocol;
/** /**
* Mediasoup WebSocket通道 * Mediasoup WebSocket通道
*/ */
@@ -48,13 +67,10 @@ public class MediasoupClient {
* Mediasoup配置 * Mediasoup配置
*/ */
private MediasoupProperties mediasoupProperties; private MediasoupProperties mediasoupProperties;
/**
@Autowired * 同步消息
private TaskScheduler taskSchedulerl; */
@Autowired private Map<String, Message> syncMessage = new ConcurrentHashMap<>();
private TaoyaoProperties taoyaoProperties;
@Autowired
private WebrtcProperties webrtcProperties;
@PostConstruct @PostConstruct
public void init() { public void init() {
@@ -88,13 +104,68 @@ public class MediasoupClient {
* *
* @param message 消息 * @param message 消息
*/ */
public void send(Object message) { public void send(Message message) {
while(this.webSocket == null) { while(this.webSocket == null) {
Thread.yield(); Thread.yield();
} }
this.webSocket.sendText(JSONUtils.toJSON(message), true); this.webSocket.sendText(JSONUtils.toJSON(message), true);
} }
/**
* 同步发送消息
*
* @param message 消息
*
* @return 响应
*/
public Message sendSync(Message message) {
final String id = message.getHeader().getId();
this.syncMessage.put(id, message);
synchronized (message) {
try {
message.wait(this.taoyaoProperties.getTimeout());
} catch (InterruptedException e) {
log.error("等待同步消息异常:{}", message, e);
}
}
final Message response = this.syncMessage.remove(id);
if(response == null || message.equals(response)) {
log.warn("消息没有响应:{}", message);
}
return response;
}
/**
* 处理消息
*
* @param data 消息
*/
private void execute(String data) {
if(StringUtils.isNotEmpty(data)) {
final Message message = JSONUtils.toJava(data, Message.class);
final Header header = message.getHeader();
final String id = header.getId();
final Integer pid = header.getPid();
final Message request = this.syncMessage.get(id);
// 存在同步响应
if(request != null) {
// 重新设置消息
this.syncMessage.put(id, message);
// 唤醒等待现场
synchronized (request) {
request.notifyAll();
}
} else {
final Protocol protocol = this.protocolManager.protocol(pid);
if(protocol instanceof ProtocolMediasoupAdapter mediasoupProtocol) {
mediasoupProtocol.execute(message, this.webSocket);
} else {
log.warn("未知Mediasoup信令{}", data);
}
}
}
}
/** /**
* 消息监听 * 消息监听
* *
@@ -113,10 +184,7 @@ public class MediasoupClient {
// 设置新的通道 // 设置新的通道
MediasoupClient.this.webSocket = webSocket; MediasoupClient.this.webSocket = webSocket;
// 发送授权消息 // 发送授权消息
MediasoupClient.this.send(Map.of( MediasoupClient.this.send(MediasoupClient.this.authorizeProtocol.build());
"username", MediasoupClient.this.mediasoupProperties.getUsername(),
"password", MediasoupClient.this.mediasoupProperties.getPassword()
));
} }
@Override @Override
@@ -128,6 +196,7 @@ public class MediasoupClient {
@Override @Override
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) { public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
log.debug("Mediasoup收到消息text{}-{}", webSocket, data); log.debug("Mediasoup收到消息text{}-{}", webSocket, data);
MediasoupClient.this.execute(data.toString());
return Listener.super.onText(webSocket, data, last); return Listener.super.onText(webSocket, data, last);
} }

View File

@@ -0,0 +1,40 @@
package com.acgist.taoyao.mediasoup.protocol;
import java.net.http.WebSocket;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.mediasoup.MediasoupClient;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
/**
* Mediasoup信令适配器
*
* @author acgist
*/
public abstract class ProtocolMediasoupAdapter extends ProtocolAdapter {
@Lazy
@Autowired
protected MediasoupClient mediasoupClient;
protected ProtocolMediasoupAdapter(Integer pid, String name) {
super(pid, name);
}
@Override
public void execute(String sn, Message message, ClientSession session) {
}
/**
* 处理Mediasoup信令
*
* @param message 信令消息
* @param webSocket WebSocket
*/
public abstract void execute(Message message, WebSocket webSocket);
}

View File

@@ -0,0 +1,48 @@
package com.acgist.taoyao.mediasoup.protocol.client;
import java.net.http.WebSocket;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.property.MediasoupProperties;
import com.acgist.taoyao.boot.property.WebrtcProperties;
import com.acgist.taoyao.mediasoup.protocol.ProtocolMediasoupAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* Mediasoup终端授权信令
*
* @author acgist
*/
@Slf4j
@Protocol
public class AuthorizeProtocol extends ProtocolMediasoupAdapter {
public static final Integer PID = 6000;
@Autowired
private WebrtcProperties webrtcProperties;
public AuthorizeProtocol() {
super(PID, "Mediasoup终端授权信令");
}
@Override
public Message build() {
final MediasoupProperties mediasoup = this.webrtcProperties.getMediasoup();
return super.build(Map.of(
"username", mediasoup.getUsername(),
"password", mediasoup.getPassword()
));
}
@Override
public void execute(Message message, WebSocket webSocket) {
log.info("Mediasoup终端授权结果{}", message);
}
}

View File

@@ -13,6 +13,7 @@ import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
* 3000~3999会议信令 * 3000~3999会议信令
* 4000~4999直播信令 * 4000~4999直播信令
* 5000~5999媒体信令 * 5000~5999媒体信令
* 6000~6999媒体信令Mediasoup
* *
* @author acgist * @author acgist
*/ */

View File

@@ -1,5 +1,6 @@
package com.acgist.taoyao.signal.protocol; package com.acgist.taoyao.signal.protocol;
import java.net.http.WebSocket;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@@ -62,6 +63,15 @@ public class ProtocolManager {
}); });
} }
/**
* @param pid 信令标识
*
* @return 信令
*/
public Protocol protocol(Integer pid) {
return this.protocolMapping.get(pid);
}
/** /**
* 执行信令消息 * 执行信令消息
* *
@@ -111,5 +121,5 @@ public class ProtocolManager {
session.push(this.errorProtocol.build(MessageCode.CODE_3401, "终端会话没有授权")); session.push(this.errorProtocol.build(MessageCode.CODE_3401, "终端会话没有授权"));
} }
} }
} }