[+] 整体架构调整

生产者接入完成
媒体作为信令服务的终端注册
This commit is contained in:
acgist
2023-02-25 13:31:57 +08:00
parent 6358255458
commit 129c36ed80
156 changed files with 3659 additions and 2817 deletions

View File

@@ -1,4 +1,4 @@
# 媒体
# 媒体终端
只要负责媒体处理,不要添加任何业务逻辑,所有业务逻辑都由[taoyao-signal-server](../taoyao-signal-server)处理。
@@ -16,50 +16,9 @@ make
make -C worker
```
## 事件
## 节点配置
```
mediasoup.observer.on("newworker", fn(worker));
worker.on("died", fn(error));
worker.observer.on("close", fn());
worker.observer.on("newrouter", fn(router));
worker.observer.on("newwebrtcserver", fn(router));
router.on(“workerclose”, fn());
router.observer.on(“close”, fn());
router.observer.on(“newtransport”, fn(transport));
router.observer.on(“newrtpobserver”, fn(rtpObserver));
transport.on("trace", fn(trace));
transport.on(“routerclose”, fn());
transport.on(“listenserverclose”, fn());
transport.on(“trace”, fn(trace));
webRtcTransport.on(“icestatechange”, fn(iceState))
webRtcTransport.on(“iceselectedtuplechange”, fn(iceSelectedTuple))
webRtcTransport.on(“dtlsstatechange”, fn(dtlsState))
webRtcTransport.on(“sctpstatechange”, fn(sctpState))
plainTransport.on(“tuple”, fn(tuple))
plainTransport.on(“rtcptuple”, fn(rtcpTuple))
plainTransport.on(“sctpstatechange”, fn(sctpState))
pipeTransport.on(“sctpstatechange”, fn(sctpState))
directTransport.on(“rtcp”, fn(rtcpPacket))
transport.observer.on(“close”, fn())
transport.observer.on(“newproducer”, fn(producer))
transport.observer.on(“newconsumer”, fn(consumer))
transport.observer.on(“newdataproducer”, fn(dataProducer))
transport.observer.on(“newdataconsumer”, fn(dataConsumer))
transport.observer.on(“trace”, fn(trace))
webRtcTransport.observer.on(“icestatechange”, fn(iceState))
webRtcTransport.observer.on(“iceselectedtuplechange”, fn(iceSelectedTuple))
webRtcTransport.observer.on(“dtlsstatechange”, fn(dtlsState))
webRtcTransport.observer.on(“sctpstatechange”, fn(sctpState))
plainTransport.observer.on(“tuple”, fn(tuple))
plainTransport.observer.on(“rtcptuple”, fn(rtcpTuple))
plainTransport.observer.on(“sctpstatechange”, fn(sctpState))
pipeTransport.observer.on(“sctpstatechange”, fn(sctpState))
```
## 安全
默认媒体服务只要暴露媒体`UDP`端口,信令接口不用暴露,所以使用简单鉴权。
需要保证`src/Config.js`中的`clientId``ecosystem.config.json`中的`name`保持一致,否者重启和关闭信令无效。
## 动态调节码率

View File

@@ -10,7 +10,6 @@
},
"dependencies": {
"ws": "^8.12.0",
"moment": "^2.29.4",
"mediasoup": "file:./mediasoup"
}
}

View File

@@ -6,27 +6,28 @@ const os = require("os");
module.exports = {
// 服务名称
name: "taoyao-client-media",
// 服务版本
version: "1.0.0",
// 欢迎页面
welcome: `${__dirname}/index.html`,
// 日志级别
logLevel: "DEBUG",
// 录像目录
recordStoragePath: "/data/record",
// 信令服务
https: {
// 信令服务地址端口
listenIp: process.env.MEDIASOUP_LISTEN_IP || "0.0.0.0",
listenPort: process.env.HTTPS_LISTEN_PORT || 9443,
// 信令服务安全配置
// 保存目录
storagePath: "/data/storage",
// 图片目录
storageImagePath: "/data/storage/image",
// 视频目录
storageVideoPath: "/data/storage/video",
// 信令配置
signal: {
// 服务版本
version: "1.0.0",
// 终端标识
clientId: "taoyao-client-media",
// 地址
host: "127.0.0.1",
// 端口
port: 8888,
// 协议
scheme: "wss",
// 信令用户
username: "taoyao",
// 信令密码
password: "taoyao",
// 信令服务证书配置
tls: {
cert: process.env.HTTPS_CERT_PUBLIC_KEY || `${__dirname}/certs/publicKey.pem`,
key: process.env.HTTPS_CERT_PRIVATE_KEY || `${__dirname}/certs/privateKey.pem`,
},
},
// 水印
watermark: {
@@ -42,8 +43,7 @@ module.exports = {
workerSize: Object.keys(os.cpus()).length,
// Workerhttps://mediasoup.org/documentation/v3/mediasoup/api/#WorkerSettings
workerSettings: {
// 级别debug | warn | error | none
logLevel: "warn",
// 记录标记
logTags: [
"bwe",
"ice",
@@ -59,6 +59,8 @@ module.exports = {
"message",
"simulcast",
],
// 级别debug | warn | error | none
logLevel: "warn",
rtcMinPort: process.env.MEDIASOUP_MIN_PORT || 40000,
rtcMaxPort: process.env.MEDIASOUP_MAX_PORT || 49999,
},
@@ -120,14 +122,14 @@ module.exports = {
ip: process.env.MEDIASOUP_LISTEN_IP || "0.0.0.0",
port: 44444,
// 公网地址
announcedIp: process.env.MEDIASOUP_ANNOUNCED_IP,
announcedIp: process.env.MEDIASOUP_ANNOUNCED_IP || "192.168.1.110",
},
{
protocol: "tcp",
ip: process.env.MEDIASOUP_LISTEN_IP || "0.0.0.0",
port: 44444,
// 公网地址
announcedIp: process.env.MEDIASOUP_ANNOUNCED_IP,
announcedIp: process.env.MEDIASOUP_ANNOUNCED_IP || "192.168.1.110",
},
],
},
@@ -137,7 +139,7 @@ module.exports = {
{
ip: process.env.MEDIASOUP_LISTEN_IP || "0.0.0.0",
// 公网地址
announcedIp: process.env.MEDIASOUP_ANNOUNCED_IP,
announcedIp: process.env.MEDIASOUP_ANNOUNCED_IP || "192.168.1.110",
},
],
initialAvailableOutgoingBitrate: 1000000,
@@ -150,7 +152,7 @@ module.exports = {
listenIp: {
ip: process.env.MEDIASOUP_LISTEN_IP || "0.0.0.0",
// 公网地址
announcedIp: process.env.MEDIASOUP_ANNOUNCED_IP,
announcedIp: process.env.MEDIASOUP_ANNOUNCED_IP || "192.168.1.110",
},
maxSctpMessageSize: 262144,
},

View File

@@ -1,49 +1,44 @@
#!/usr/bin/env node
const fs = require("fs");
const ws = require("ws");
const https = require("https");
const mediasoup = require("mediasoup");
const config = require("./Config");
const Signal = require("./Signal");
const mediasoup = require("mediasoup");
const { Signal, signalChannel } = require("./Signal");
// 线程名称
process.title = config.name;
// 禁止校验无效证书
process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0";
// 无效信令终端列表
const clients = [];
// Mediasoup Worker列表
const mediasoupWorkers = [];
// HTTPS server
let httpsServer;
// WebSocket server
let webSocketServer;
// 信令
// 信令服务
const signal = new Signal(mediasoupWorkers);
/**
* 启动Mediasoup Worker
* 创建Mediasoup Worker列表
*/
async function buildMediasoupWorkers() {
// 可配置的事件
// mediasoup.observer.on("newworker", fn(worker));
const { workerSize } = config.mediasoup;
console.info("启动Worker", workerSize);
console.info("创建Mediasoup Worker数量:", workerSize);
for (let index = 0; index < workerSize; index++) {
// 新建Worker
const worker = await mediasoup.createWorker({
logLevel: config.mediasoup.workerSettings.logLevel,
logTags: config.mediasoup.workerSettings.logTags,
logLevel: config.mediasoup.workerSettings.logLevel,
rtcMinPort: Number(config.mediasoup.workerSettings.rtcMinPort),
rtcMaxPort: Number(config.mediasoup.workerSettings.rtcMaxPort),
});
// 监听Worker事件
worker.on("died", () => {
console.warn("Worker停止服务", worker.pid);
worker.on("died", (error) => {
console.warn("Mediasoup Worker停止服务", worker.pid, error);
setTimeout(() => process.exit(1), 2000);
});
worker.observer.on("close", () => {
console.warn("Worker关闭服务", worker.pid);
console.warn("Mediasoup Worker关闭服务", worker.pid);
});
// 可配置的事件
// worker.observer.on("newrouter", fn(router));
// worker.observer.on("newwebrtcserver", fn(router));
// 配置WebRTC服务
const webRtcServerOptions = JSON.parse(
JSON.stringify(config.mediasoup.webRtcServerOptions)
@@ -53,101 +48,34 @@ async function buildMediasoupWorkers() {
}
const webRtcServer = await worker.createWebRtcServer(webRtcServerOptions);
worker.appData.webRtcServer = webRtcServer;
// 加入Worker队列
mediasoupWorkers.push(worker);
}
}
/**
* 启动信令服务
* 连接信令服务
*/
async function buildSignalServer() {
const tls = {
cert: fs.readFileSync(config.https.tls.cert),
key: fs.readFileSync(config.https.tls.key),
};
// 配置HTTPS Server
httpsServer = https.createServer(tls, (request, response) => {
response.writeHead(200);
response.end(fs.readFileSync(config.welcome));
});
// 配置WebSocket Server
webSocketServer = new ws.Server({ server: httpsServer });
webSocketServer.on("connection", (session) => {
console.info("打开信令通道", session._socket.remoteAddress);
session.datetime = Date.now();
session.authorize = false;
clients.push(session);
session.on("close", (code) => {
console.info("关闭信令通道", session._socket.remoteAddress, code);
});
session.on("error", (error) => {
console.error("信令通道异常", session._socket.remoteAddress, error);
});
session.on("message", (message) => {
console.debug("处理信令消息", message.toString());
try {
signal.on(JSON.parse(message), session);
} catch (error) {
console.error("处理信令消息异常", message.toString(), error);
}
});
});
// 打开监听
httpsServer.listen(
Number(config.https.listenPort),
config.https.listenIp,
() => {
console.info("信令服务启动完成");
async function connectSignalServer() {
await signalChannel.connect(
`wss://${config.signal.host}:${config.signal.port}/websocket.signal`,
function (message) {
signal.on(message);
}
);
}
/**
* 定时任务
*/
async function buildInterval() {
// 定时打印使用情况
setInterval(async () => {
signal.usage();
}, 300 * 1000);
// 定时清理过期无效终端
setInterval(() => {
let failSize = 0;
let silentSize = 0;
let successSize = 0;
const datetime = Date.now();
for (let index = 0; index < clients.length; index++) {
const session = clients[index];
if (session.authorize) {
clients.splice(index, 1);
successSize++;
index--;
} else if (datetime - session.datetime >= 5000) {
clients.splice(index, 1);
session.close();
failSize++;
index--;
} else {
silentSize++;
}
}
console.info("定时清理无效信令终端(无效|静默|成功|现存)", failSize, silentSize, successSize, clients.length);
}, 60 * 1000);
}
/**
* 启动方法
*/
async function main() {
console.info("桃之夭夭,灼灼其华。")
console.info("之子于归,宜其室家。")
console.info("开始启动", config.name);
console.log(`
桃之夭夭,灼灼其华。
之子于归,宜其室家。
`);
console.info("开始启动:", config.name);
await buildMediasoupWorkers();
await buildSignalServer();
await buildInterval();
console.info("启动完成", config.name);
await connectSignalServer();
console.info("启动完成:", config.name);
}
// 启动服务
main();

View File

@@ -1,50 +1,216 @@
const config = require("./Config");
const process = require("child_process");
const WebSocket = require("ws");
/**
* 信令协议
*/
const protocol = {
// 当前索引
index: 100000,
// 最小索引
minIndex: 100000,
index: 0,
// 最大索引
maxIndex: 999999,
maxIndex: 1000,
/**
* @returns 索引
*/
buildId: function () {
if (this.index++ >= this.maxIndex) {
this.index = this.minIndex;
buildId() {
if (++this.index >= this.maxIndex) {
this.index = 0;
}
return Date.now() + "" + this.index;
return Date.now() * 1000 + this.index;
},
/**
* 生成信令消息
*
* @param {*} signal 信令标识
* @param {*} roomId 房间标识
* @param {*} body 信令消息
* @param {*} id ID
*
* @returns 信令消息
*/
buildMessage: function (signal, roomId, body = {}, id) {
buildMessage(signal, body = {}, id) {
const message = {
header: {
v: config.version,
v: config.signal.version,
id: id || this.buildId(),
signal: signal,
},
body: {
roomId,
...body,
},
body: body,
};
return message;
},
};
/**
* 信令通道
*/
const signalChannel = {
// 通道
channel: null,
// 地址
address: null,
// 回调
callback: null,
// 心跳时间
heartbeatTime: 30 * 1000,
// 心跳定时器
heartbeatTimer: null,
// 是否重连
reconnection: true,
// 重连定时器
reconnectTimer: null,
// 防止重复重连
lockReconnect: false,
// 当前重连时间
connectionTimeout: 5 * 1000,
// 最小重连时间
minReconnectionDelay: 5 * 1000,
// 最大重连时间
maxReconnectionDelay: 60 * 1000,
/**
* 心跳
*/
heartbeat() {
const self = this;
if (self.heartbeatTimer) {
clearTimeout(self.heartbeatTimer);
}
self.heartbeatTimer = setTimeout(async function () {
if (self.channel && self.channel.readyState === WebSocket.OPEN) {
// TODO信号强度、电池信息
self.push(
protocol.buildMessage("client::heartbeat", {
signal: 100,
battery: 100,
charging: true,
})
);
self.heartbeat();
} else {
console.warn("发送心跳失败:", self.address);
}
}, self.heartbeatTime);
},
/**
* 连接
*
* @param {*} address 地址
* @param {*} callback 回调
* @param {*} reconnection 是否重连
*
* @returns Promise
*/
async connect(address, callback, reconnection = true) {
const self = this;
if (self.channel && self.channel.readyState === WebSocket.OPEN) {
return new Promise((resolve, reject) => {
resolve(self.channel);
});
}
self.address = address;
self.callback = callback;
self.reconnection = reconnection;
return new Promise((resolve, reject) => {
console.debug("连接信令通道:", self.address);
self.channel = new WebSocket(self.address, { handshakeTimeout: 5000 });
self.channel.on("open", async function () {
console.info("打开信令通道:", self.address);
// 注册终端
// TODO信号强度、电池信息
self.push(
protocol.buildMessage("client::register", {
name: "桃夭媒体服务",
clientId: config.signal.clientId,
clientType: "MEDIA",
signal: 100,
battery: 100,
charging: true,
username: config.signal.username,
password: config.signal.password,
})
);
// 重置时间
self.connectionTimeout = self.minReconnectionDelay;
// 开始心跳
self.heartbeat();
// 成功回调
resolve(self.channel);
});
self.channel.on("close", async function () {
console.warn("信令通道关闭:", self.address);
if (self.reconnection) {
self.reconnect();
}
// 不要失败回调
});
self.channel.on("error", async function (e) {
console.error("信令通道异常:", self.address, e);
if (self.reconnection) {
self.reconnect();
}
// 不要失败回调
});
self.channel.on("message", async function (data) {
try {
const content = data.toString();
console.debug("信令通道消息:", content);
self.callback(JSON.parse(content));
} catch (error) {
console.error("处理信令消息异常:", content, error);
}
});
});
},
/**
* 重连
*/
reconnect() {
const self = this;
if (
self.lockReconnect ||
(self.channel && self.channel.readyState === WebSocket.OPEN)
) {
return;
}
self.lockReconnect = true;
if (self.reconnectTimer) {
clearTimeout(self.reconnectTimer);
}
// 定时重连
self.reconnectTimer = setTimeout(function () {
console.info("信令通道重连:", self.address);
self.connect(self.address, self.callback, self.reconnection);
self.lockReconnect = false;
}, self.connectionTimeout);
self.connectionTimeout = Math.min(
self.connectionTimeout + self.minReconnectionDelay,
self.maxReconnectionDelay
);
},
/**
* 推送消息
*
* @param {*} message 消息
*/
push(message) {
try {
this.channel.send(JSON.stringify(message));
} catch (error) {
console.error("推送消息异常:", message, error);
}
},
/**
* 关闭通道
*/
close() {
const self = this;
self.reconnection = false;
self.channel.close();
clearTimeout(self.heartbeatTimer);
clearTimeout(self.reconnectTimer);
},
};
/**
* 房间
*/
@@ -63,6 +229,8 @@ class Room {
audioLevelObserver = null;
// 音频监控
activeSpeakerObserver = null;
// 消费者复制数量
consumerReplicas = 0;
// 通道
transports = new Map();
// 生产者
@@ -94,60 +262,83 @@ class Room {
this.handleActiveSpeakerObserver();
}
/**
* 声音监控
*/
handleAudioLevelObserver() {
const self = this;
// 声音
this.audioLevelObserver.on("volumes", (volumes) => {
self.audioLevelObserver.on("volumes", (volumes) => {
for (const value of volumes) {
const { producer, volume } = value;
this.signal.push(
protocol.buildMessage("audio::active::speaker", this.roomId, {
peerId: producer.appData.peerId,
signalChannel.push(
protocol.buildMessage("media::audio::active::speaker", {
roomId: self.roomId,
clientId: producer.clientId,
volume: volume,
})
);
}
});
// 静音
this.audioLevelObserver.on("silence", () => {
this.signal.push(
protocol.buildMessage("audio::active::speaker", this.roomId, {
peerId: null,
self.audioLevelObserver.on("silence", () => {
signalChannel.push(
protocol.buildMessage("media::audio::active::speaker", {
roomId: self.roomId,
})
);
});
}
/**
* 说话监控
*/
handleActiveSpeakerObserver() {
this.activeSpeakerObserver.on("dominantspeaker", (dominantSpeaker) => {
console.debug("dominantspeaker", dominantSpeaker.producer.id);
const self = this;
self.activeSpeakerObserver.on("dominantspeaker", (dominantSpeaker) => {
console.debug(
"dominantspeaker",
dominantSpeaker.producer.id,
dominantSpeaker.producer.clientId
);
});
}
/**
* 使用情况
*/
usage() {
console.info("房间标识", this.roomId);
console.info("房间通道数量", this.transports.size);
console.info("房间生产者数量", this.producers.size);
console.info("房间消费者数量", this.consumers.size);
console.info("房间数据生产者数量", this.dataProducers.size);
console.info("房间数据消费者数量", this.dataConsumers.size);
console.info("房间标识", this.roomId);
console.info("房间通道数量", this.transports.size);
console.info("房间生产者数量", this.producers.size);
console.info("房间消费者数量", this.consumers.size);
console.info("房间数据生产者数量", this.dataProducers.size);
console.info("房间数据消费者数量", this.dataConsumers.size);
}
/**
* 关闭资源
*/
close() {
this.close = true;
if (this.mediasoupRouter) {
this.mediasoupRouter.close();
const self = this;
if (self.close) {
return;
}
self.close = true;
if (self.mediasoupRouter) {
self.mediasoupRouter.close();
}
}
}
/**
* 信令
* 信令服务
*/
class Signal {
// 房间列表
rooms = new Map();
// 信令终端列表
clients = [];
// 回调事件
callbackMapping = new Map();
// Worker列表
mediasoupWorkers = [];
// Worker索引
@@ -155,51 +346,55 @@ class Signal {
constructor(mediasoupWorkers) {
this.mediasoupWorkers = mediasoupWorkers;
// 定时打印使用情况
setInterval(async () => {
this.usage();
}, 300 * 1000);
}
/**
* 处理事件
* 处理信令消息
*
* @param {*} message 消息
* @param {*} session websocket
*/
on(message, session) {
// 授权验证
if (!session.authorize) {
if (
message?.header?.signal === "media::register" &&
message?.body?.username === config.https.username &&
message?.body?.password === config.https.password
) {
console.debug("授权成功", session._socket.remoteAddress);
this.clients.push(session);
session.authorize = true;
message.code = "0000";
message.message = "授权成功";
message.body.username = undefined;
message.body.password = undefined;
} else {
console.warn("授权失败", session._socket.remoteAddress);
message.code = "3401";
message.message = "授权失败";
on(message) {
// 请求回调
if (this.callbackMapping.has(message.header.id)) {
try {
this.callbackMapping.get(message.header.id)(message);
} finally {
this.callbackMapping.delete(message.header.id);
}
this.push(message, session);
return;
}
// 处理信令
const body = message.body;
switch (message.header.signal) {
case "client::reboot":
this.clientReboot(message, body);
break;
case "client::shutdown":
this.clientShutdown(message, body);
break;
case "media::ice::restart":
this.mediaIceRestart(message, body);
break;
case "media::consume":
this.mediaConsume(message, body);
break;
case "media::produce":
this.mediaProduce(message, body);
break;
case "media::router::rtp::capabilities":
this.mediaRouterRtpCapabilities(session, message, body);
this.mediaRouterRtpCapabilities(message, body);
break;
case "media::transport::webrtc::connect":
this.mediaTransportWebrtcConnect(session, message, body);
this.mediaTransportWebrtcConnect(message, body);
break;
case "media::transport::webrtc::create":
this.mediaTransportWebrtcCreate(session, message, body);
this.mediaTransportWebrtcCreate(message, body);
break;
case "room::create":
this.roomCreate(session, message, body);
this.roomCreate(message, body);
break;
}
}
@@ -208,23 +403,41 @@ class Signal {
* 通知信令
*
* @param {*} message 消息
* @param {*} session 信令通道
*/
push(message, session) {
if (session) {
push(message) {
signalChannel.push(message);
}
/**
* 同步请求
*
* @param {*} message 消息
*
* @returns Promise
*/
async request(message) {
const self = this;
return new Promise((resolve, reject) => {
let done = false;
// 注册回调
self.callbackMapping.set(message.header.id, (response) => {
resolve(response);
done = true;
});
// 发送消息
try {
session.send(JSON.stringify(message));
self.channel.send(JSON.stringify(message));
} catch (error) {
console.error(
"通知信令失败",
session._socket.remoteAddress,
message,
error
);
console.error("请求消息异常:", message, error);
}
} else {
this.clients.forEach((session) => this.push(message, session));
}
// 设置超时
setTimeout(() => {
if (!done) {
self.callbackMapping.delete(message.header.id);
reject("请求超时", message);
}
}, 5000);
});
}
/**
@@ -233,11 +446,11 @@ class Signal {
async usage() {
for (const worker of this.mediasoupWorkers) {
const usage = await worker.getResourceUsage();
console.info("Worker使用情况", worker.pid, usage);
console.info("Worker使用情况", worker.pid, usage);
}
console.info("路由数量", this.mediasoupWorkers.length);
console.info("房间数量", this.rooms.size);
Array.from(this.rooms.values()).forEach(room => room.usage());
console.info("路由数量", this.mediasoupWorkers.length);
console.info("房间数量", this.rooms.size);
Array.from(this.rooms.values()).forEach((room) => room.usage());
}
/**
@@ -252,39 +465,286 @@ class Signal {
}
/**
* 路由RTP能力信令
* 重启终端信令
*
* @param {*} session 信令通道
* @param {*} message 消息
* @param {*} body 消息主体
*/
mediaRouterRtpCapabilities(session, message, body) {
const { roomId } = body;
const room = this.rooms.get(roomId);
message.body = room.mediasoupRouter.rtpCapabilities;
this.push(message, session);
}
async mediaTransportWebrtcConnect(session, message, body) {
const { roomId, transportId, dtlsParameters } = body;
const room = this.rooms.get(roomId);
const transport = room.transports.get(transportId);
if (!transport) {
throw new Error(`transport with id "${transportId}" not found`);
}
await transport.connect({ dtlsParameters });
message.body = { roomId };
this.push(message, session);
clientReboot(message, body) {
process.exec(
`pm2 restart ${config.signal.clientId}`,
function (error, stdout, stderr) {
console.info("重启媒体服务:", error, stdout, stderr);
}
);
// this.push(message);
}
/**
* @param {*} session 信令通道
* 关闭终端信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaTransportWebrtcCreate(session, message, body) {
clientShutdown(message, body) {
process.exec(
`pm2 stop ${config.signal.clientId}`,
function (error, stdout, stderr) {
console.info("关闭媒体服务:", error, stdout, stderr);
}
);
// this.push(message);
}
/**
* 媒体重启ICE信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaIceRestart(message, body) {
const { roomId, transportId } = body;
const room = this.rooms.get(roomId);
const transport = room.transports.get(transportId);
const iceParameters = await transport.restartIce();
message.body.iceParameters = iceParameters;
this.push(message);
}
async mediaProduce(message, body) {
const self = this;
const { roomId, forceTcp, producing, consuming, sctpCapabilities } = body;
const {
kind,
roomId,
clientId,
streamId,
appData,
transportId,
rtpParameters,
} = body;
const room = self.rooms.get(roomId);
const transport = room.transports.get(transportId);
const producer = await transport.produce({
kind,
appData,
rtpParameters,
// keyFrameRequestDelay: 5000
});
producer.clientId = clientId;
producer.streamId = streamId;
room.producers.set(producer.id, producer);
// 打分
producer.on("score", (score) => {
self.push(
protocol.buildMessage("media::producer::score", {
roomId: roomId,
producerId: producer.id,
score,
})
);
});
producer.on("videoorientationchange", (videoOrientation) => {
logger.debug(
'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
producer.id,
videoOrientation
);
});
producer.on("trace", (trace) => {
logger.debug(
'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
producer.id,
trace.type,
trace
);
});
message.body = { kind: kind, producerId: producer.id };
this.push(message);
if (producer.kind === "audio") {
room.audioLevelObserver
.addProducer({ producerId: producer.id })
.catch(() => {});
room.activeSpeakerObserver
.addProducer({ producerId: producer.id })
.catch(() => {});
}
}
async mediaConsume(message, body) {
const { roomId, producerId, transportId, rtpCapabilities } = body;
const room = this.rooms.get(roomId);
const producer = room.producers.get(producerId);
const transport = room.transports.get(transportId);
if (
!room ||
!producer ||
!transport ||
!rtpCapabilities ||
!room.mediasoupRouter.canConsume({
producerId: producerId,
rtpCapabilities: rtpCapabilities,
})
) {
console.warn(
"不能消费媒体:",
roomId,
producerId,
transportId,
rtpCapabilities
);
return;
}
const promises = [];
const consumerCount = 1 + room.consumerReplicas;
for (let i = 0; i < consumerCount; i++) {
promises.push(
(async () => {
let consumer;
try {
consumer = await transport.consume({
producerId: producerId,
rtpCapabilities: rtpCapabilities,
// 暂停
paused: true,
});
} catch (error) {
console.error(
"创建消费者异常:",
roomId,
producerId,
transportId,
rtpCapabilities,
error
);
return;
}
room.consumers.set(consumer.id, consumer);
consumer.on("transportclose", () => {
room.consumers.delete(consumer.id);
});
consumer.on("producerclose", () => {
room.consumers.delete(consumer.id);
this.push(
protocol.buildMessage("media::consumer::close", {
consumerId: consumer.id,
})
);
});
consumer.on("producerpause", () => {
this.push(
protocol.buildMessage("media::consumer::pause", {
consumerId: consumer.id,
})
);
});
consumer.on("producerresume", () => {
this.push(
protocol.buildMessage("media::consumer::resume", {
consumerId: consumer.id,
})
);
});
consumer.on("score", (score) => {
this.push(
protocol.buildMessage("media::consumer::score", {
consumerId: consumer.id,
score,
})
);
});
consumer.on("layerschange", (layers) => {
this.push(
protocol.buildMessage("media::consumer::layers::change", {
consumerId: consumer.id,
spatialLayer: layers ? layers.spatialLayer : null,
temporalLayer: layers ? layers.temporalLayer : null,
})
);
});
consumer.on("trace", (trace) => {
logger.debug(
'consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
consumer.id,
trace.type,
trace
);
});
// TODO改为同步
this.push(protocol.buildMessage("media::consume", {
//await this.request("media::consume", {
kind: consumer.kind,
type: consumer.type,
roomId: roomId,
producerId: producerId,
consumerId: consumer.id,
rtpParameters: consumer.rtpParameters,
appData: producer.appData,
producerPaused: consumer.producerPaused,
}));
await consumer.resume();
this.push(
protocol.buildMessage("media::consumer::score", {
consumerId: consumer.id,
score: consumer.score,
})
);
})()
);
}
try {
await Promise.all(promises);
} catch (error) {
console.warn("_createConsumer() | failed:%o", error);
}
}
/**
* 路由RTP能力信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
mediaRouterRtpCapabilities(message, body) {
const { roomId } = body;
const room = this.rooms.get(roomId);
message.body.rtpCapabilities = room.mediasoupRouter.rtpCapabilities;
this.push(message);
}
/**
* 连接WebRTC通道信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaTransportWebrtcConnect(message, body) {
const { roomId, transportId, dtlsParameters } = body;
const room = this.rooms.get(roomId);
const transport = room.transports.get(transportId);
await transport.connect({ dtlsParameters });
message.body = { roomId: roomId, transportId: transport.id };
this.push(message);
}
/**
* 创建WebRTC通道信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaTransportWebrtcCreate(message, body) {
const self = this;
const {
roomId,
clientId,
forceTcp,
producing,
consuming,
sctpCapabilities,
} = body;
const webRtcTransportOptions = {
...config.mediasoup.webRtcTransportOptions,
appData: { producing, consuming },
@@ -300,29 +760,41 @@ class Signal {
...webRtcTransportOptions,
webRtcServer: room.webRtcServer,
});
transport.clientId = clientId;
transport.on("icestatechange", (iceState) => {
console.debug(
"WebRtcTransport icestatechange event",
iceState,
transport.id
);
});
transport.on("dtlsstatechange", (dtlsState) => {
console.debug(
'WebRtcTransport dtlsstatechange event',
dtlsState
"WebRtcTransport dtlsstatechange event",
dtlsState,
transport.id
);
});
transport.on("sctpstatechange", (sctpState) => {
console.debug(
'WebRtcTransport sctpstatechange event',
sctpState
"WebRtcTransport sctpstatechange event",
sctpState,
transport.id
);
});
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
await transport.enableTraceEvent(["bwe"]);
transport.on("trace", (trace) => {
console.debug(
'transport trace event',
trace,
trace.type,
transport.id,
);
console.debug("transport trace event", trace, trace.type, transport.id);
});
// Store the WebRtcTransport into the protoo Peer data Object.
// 可配置的事件
// transport.on("routerclose", fn());
// transport.on("listenserverclose", fn());
// transport.observer.on("close", fn());
// transport.observer.on("newproducer", fn(producer));
// transport.observer.on("newconsumer", fn(consumer));
// transport.observer.on("newdataproducer", fn(dataProducer));
// transport.observer.on("newdataconsumer", fn(dataConsumer));
// transport.observer.on("trace", fn(trace));
room.transports.set(transport.id, transport);
message.body = {
transportId: transport.id,
@@ -331,10 +803,7 @@ class Signal {
dtlsParameters: transport.dtlsParameters,
sctpParameters: transport.sctpParameters,
};
self.push(
message,
session
);
self.push(message);
const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;
// If set, apply max incoming bitrate limit.
if (maxIncomingBitrate) {
@@ -347,28 +816,27 @@ class Signal {
/**
* 创建房间信令
*
* @param {*} session 信令通道
* @param {*} message 消息
* @param {*} body 消息主体
*
* @returns 房间
*/
async roomCreate(session, message, body) {
async roomCreate(message, body) {
const roomId = body.roomId;
let room = this.rooms.get(roomId);
if (room) {
this.push(message, session);
this.push(message);
return room;
}
const mediasoupWorker = this.nextMediasoupWorker();
const { mediaCodecs } = config.mediasoup.routerOptions;
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
mediasoupRouter.on("workerclose", () => {
// TODO通知房间关闭
});
mediasoupRouter.observer.on("close", () => {
// TODO通知房间关闭
});
// 可配置的事件
// mediasoupRouter.on("workerclose", () => {});
// mediasoupRouter.observer.on("newtransport", fn(transport));
// TODO下面两个监控改为配置启用
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({
maxEntries: 1,
@@ -388,9 +856,9 @@ class Signal {
});
this.rooms.set(roomId, room);
console.info("创建房间", roomId);
this.push(message, session);
this.push(message);
return room;
}
}
module.exports = Signal;
module.exports = { Signal, signalChannel };

View File

@@ -1,28 +0,0 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCfmqQa2T/dGQIk
cXYiRE47csmhtOEmDL9e1uncyFOYaDLopqma+z0OWV71HrT9m5lEAbK3HZy56CWL
HNlFmnIztT84jTc11b3996cd6G1/5nffRj8D3FcSps9qVsRwAaEnKE6tGrCJhjWB
+/qKIZDP3MmGOkf1guQMTPeZBOEzo+JmL2MOwaLk8dXbb4KHbrye31UnvoXoANs3
1IbdAE3gRMDc40yBoVHeWOQ0h4c/wg1+rMhYPaUtgFkyqFr7zX1YvTa7BwZE4oVf
l7f5k+FRWO8OKn++kHIlE9H3pjdaGKWIuYFRYCxNC9IfGazhPSfYHVgg0q/8UUKg
jFqgbjIJAgMBAAECggEANs6ateGOjbUtyCfyQjgkiUOUu+PqQO+1s7KnYjqkgjyd
5sh8i4zk3Y2RDyl5S3FoQzM2FK2liS2P3uKMNdugheMij5/mqqT4dkLZ72pGV9pj
pZdwwjmi6PPBXCnpkPDuTw0HX2g/4SnmK/nEgjSejtKpnV9cIJHPD+5KRBCp6No+
JLgVFdhCCMEmyzTOU9ASxgRuw0sxGmPsdg3ZewUmoDb94mGDDot500rCB+wnYHue
ACegbaalrTWhY2DzYyNCfnR+F/mshIeqDjMVLsdPoj8MahdQbIoFcyNF+ts3pFVl
MXUUuO3bhZtrSVIT+4r07u82XoXaBufyPzK6Mo1vNQKBgQC9uebhZ1jCyYgGro5o
xuPuW+B5tHKGtlElJ0e7D/XjDvPWj3eGEkVCD5BH2L3qG9oqpKxX+hpmjc5ey21Q
zcnAGG2gC9o+uGQxxjix8sbZ0HEr2J6EdRlKzIZ/N2EnTmU+K8WWWI4g2PEqsId9
yt39EwU/D81bc0b5pcMGJTMk1wKBgQDXWxhywNFtPVreCeerXvvHLIQEtiZUMdhm
8zWtqHNhdojYoeaFPifVcajBo41Jl9qSVCot8cmdhIzmPwbRk1Ob7MtBC8t7OtzT
UmwT9nuxC5iSaGLJNBMtP3M0TzV7+qvvg+Az13kYtje9475LMEYdBLnCb4Mz1Y/R
QOR0mhWkHwKBgBxoepajl9nKvVBq0K4Fodlt7mWqzD85i1rpz8bFtAaklYQ6BSaR
E8e5dtwbKwyj0P3znE6sB0n1z8HH6f1gYuYdgkSloa8kgvQk/xY+COJSYK+1Br9E
nV3i0/y2eRiel3BAs5w4dEec1DeVKSR/vM+JCo8PuasIzsbQuCvyY/8PAoGBALWH
kT0xsZcej9j4inMXNq62pHYAQKDZ/2sQed/vTYsLSuEo39LTCOrPywum3LL7MQAF
uCRQWr3PfKGc4ReJ04FtAgvLcHNos7niET5ml+8uMia/nP2zSrLqeCbQ2emu7H2S
MUwhxm8BMk17iu2APKm7UQZHz1XDIF6oD6sGM1XLAoGALFpdCO486AbHYts2NHey
vQ39u3WDlrgzIM6hs8BI0FEZIdtuNWa/wpZYPaiXUvwTPsfQA1eCqXXuQGzH0fFn
g1M8RxsZ8XNjfQVpqSceZp+qFkTrR8zrbbQiZwBUm9WBdKfryMZHLhFraGLShFdM
ONu5qg3tXgMfFpNcMyiGgeA=
-----END PRIVATE KEY-----

View File

@@ -1,22 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIDnzCCAoegAwIBAgIJAIKYVI9RPbj+MA0GCSqGSIb3DQEBCwUAMF0xCzAJBgNV
BAYTAkNOMQswCQYDVQQIEwJHRDELMAkGA1UEBxMCR1oxDzANBgNVBAoTBnRhb3lh
bzEPMA0GA1UECxMGYWNnaXN0MRIwEAYDVQQDEwlsb2NhbGhvc3QwHhcNMjIxMTA5
MDEyMTM3WhcNMzIxMTA2MDEyMTM3WjBdMQswCQYDVQQGEwJDTjELMAkGA1UECBMC
R0QxCzAJBgNVBAcTAkdaMQ8wDQYDVQQKEwZ0YW95YW8xDzANBgNVBAsTBmFjZ2lz
dDESMBAGA1UEAxMJbG9jYWxob3N0MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
CgKCAQEAn5qkGtk/3RkCJHF2IkROO3LJobThJgy/Xtbp3MhTmGgy6Kapmvs9Dlle
9R60/ZuZRAGytx2cueglixzZRZpyM7U/OI03NdW9/fenHehtf+Z330Y/A9xXEqbP
albEcAGhJyhOrRqwiYY1gfv6iiGQz9zJhjpH9YLkDEz3mQThM6PiZi9jDsGi5PHV
22+Ch268nt9VJ76F6ADbN9SG3QBN4ETA3ONMgaFR3ljkNIeHP8INfqzIWD2lLYBZ
Mqha+819WL02uwcGROKFX5e3+ZPhUVjvDip/vpByJRPR96Y3WhiliLmBUWAsTQvS
Hxms4T0n2B1YINKv/FFCoIxaoG4yCQIDAQABo2IwYDAdBgNVHQ4EFgQUPpT59FzS
UUzHsxrKeGOQ/YeaqpswDgYDVR0PAQH/BAQDAgWgMBoGA1UdEQQTMBGCCWxvY2Fs
aG9zdIcEfwAAATATBgNVHSUEDDAKBggrBgEFBQcDATANBgkqhkiG9w0BAQsFAAOC
AQEAKsoYmBr9EmYev6sWVet8x+/YDZgXGZolZff3agPT+uFL/6mtyAqnFD/ncPOh
n206l4RimSChVlEVx3pE4r5sBiLzPaX/dcCRoZNxEQMtjfYCk+4iRfkxhIvpqLzf
ZsEGbJCh9JodG2xkYNViPF2AqR8OchEMRttYQa2dDkk2oDVMg0bmqgZgSD7vEjdk
ovBhEIQ1Rhgv0yi9IT+kXYa+nTpc+/9m/GmmejtaFaVdpj+WuNqPf/WyzR+3JQWZ
Y/O7ESF7tUcw0HSxNv/pk6Z13RQClUo6bPHzPF4JJw1tAIbkuyKZ6ZpErQePUEpk
dVZPy7rDD3N/BI//0vBZmy0v1w==
-----END CERTIFICATE-----

View File

@@ -1,15 +0,0 @@
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>桃夭媒体服务</title>
<style type="text/css">
p{text-align:center;}
a{text-decoration:none;}
</style>
</head>
<body>
<p><a href="https://gitee.com/acgist/taoyao">taoyao-client-media</a></p>
<p><a href="https://www.acgist.com">acgist</a></p>
</body>
</html>