[+] media::transport::webrtc::connect

This commit is contained in:
acgist
2023-02-19 11:57:09 +08:00
parent 9023883c5b
commit fd505dfbd2
105 changed files with 1511 additions and 2116 deletions

View File

@@ -0,0 +1,14 @@
{
"apps": [
{
"cwd": "./",
"name": "taoyao-media-server",
"script": "src/Server.js",
"instances": 2,
"exec_mode": "cluster",
"out_file": "./logs/out.log",
"error_file": "./logs/error.log",
"log_date_format": "YYYY-MM-DD HH:mm:ss"
}
]
}

View File

@@ -0,0 +1,12 @@
{
"apps": [
{
"cwd": "./",
"name": "taoyao-media-server",
"script": "src/Server.js",
"out_file": "./logs/out.log",
"error_file": "./logs/error.log",
"log_date_format": "YYYY-MM-DD HH:mm:ss"
}
]
}

View File

@@ -1,93 +0,0 @@
const moment = require("moment");
const config = require("./Config");
/**
* 日志
*/
class Logger {
// 名称
name = config.name;
// 级别
level = ["DEBUG", "INFO", "WARN", "ERROR", "OFF"];
// 级别索引
levelIndex = this.level.indexOf(config.logLevel.toUpperCase());
constructor(prefix) {
if (prefix) {
this.name = this.name + " : " + prefix;
}
}
/**
* debug
*
* @param {...any} args 参数
*
* @returns this
*/
debug(...args) {
return this.log(console.debug, "37m", "DEBUG", args);
}
/**
* info
*
* @param {...any} args 参数
*
* @returns this
*/
info(...args) {
return this.log(console.info, "32m", "INFO", args);
}
/**
* warn
*
* @param {...any} args 参数
*
* @returns this
*/
warn(...args) {
return this.log(console.warn, "33m", "WARN", args);
}
/**
* error
*
* @param {...any} args 参数
*
* @returns this
*/
error(...args) {
return this.log(console.error, "31m", "ERROR", args);
}
/**
* 日志
*
* @param {*} out 输出
* @param {*} color 颜色
* @param {*} level 级别
* @param {*} args 参数
*
* @returns this
*/
log(out, color, level, args) {
if (!args || this.level.indexOf(level) < this.levelIndex) {
return this;
}
if (args.length > 1 && args[0].length > 0) {
out(`\x1B[${color}${this.name} ${moment().format("yyyy-MM-DD HH:mm:ss")} : [${level.padEnd(5, " ")}] :\x1B[0m`, args);
} else if (args.length === 1 && args[0].length > 0) {
out(`\x1B[${color}${this.name} ${moment().format("yyyy-MM-DD HH:mm:ss")} : [${level.padEnd(5, " ")}] :\x1B[0m`, args);
} else {
// 其他情况直接输出换行
out("");
}
return this;
}
}
module.exports = Logger;

View File

@@ -5,9 +5,7 @@ const ws = require("ws");
const https = require("https");
const mediasoup = require("mediasoup");
const config = require("./Config");
const Logger = require("./Logger");
const Signal = require("./Signal");
const { fail } = require("assert");
// 线程名称
process.title = config.name;
@@ -21,8 +19,6 @@ const mediasoupWorkers = [];
let httpsServer;
// WebSocket server
let webSocketServer;
// 日志
const logger = new Logger();
// 信令
const signal = new Signal(mediasoupWorkers);
@@ -31,7 +27,7 @@ const signal = new Signal(mediasoupWorkers);
*/
async function buildMediasoupWorkers() {
const { workerSize } = config.mediasoup;
logger.info("启动Worker", workerSize);
console.info("启动Worker", workerSize);
for (let index = 0; index < workerSize; index++) {
// 新建Worker
const worker = await mediasoup.createWorker({
@@ -42,11 +38,11 @@ async function buildMediasoupWorkers() {
});
// 监听Worker事件
worker.on("died", () => {
logger.warn("Worker停止服务", worker.pid);
console.warn("Worker停止服务", worker.pid);
setTimeout(() => process.exit(1), 2000);
});
worker.observer.on("close", () => {
logger.warn("Worker关闭服务", worker.pid);
console.warn("Worker关闭服务", worker.pid);
});
// 配置WebRTC服务
const webRtcServerOptions = JSON.parse(
@@ -78,22 +74,22 @@ async function buildSignalServer() {
// 配置WebSocket Server
webSocketServer = new ws.Server({ server: httpsServer });
webSocketServer.on("connection", (session) => {
logger.info("打开信令通道", session._socket.remoteAddress);
console.info("打开信令通道", session._socket.remoteAddress);
session.datetime = Date.now();
session.authorize = false;
clients.push(session);
session.on("close", (code) => {
logger.info("关闭信令通道", session._socket.remoteAddress, code);
console.info("关闭信令通道", session._socket.remoteAddress, code);
});
session.on("error", (error) => {
logger.error("信令通道异常", session._socket.remoteAddress, error);
console.error("信令通道异常", session._socket.remoteAddress, error);
});
session.on("message", (message) => {
logger.debug("处理信令消息", message.toString());
console.debug("处理信令消息", message.toString());
try {
signal.on(JSON.parse(message), session);
} catch (error) {
logger.error("处理信令消息异常", message.toString(), error);
console.error("处理信令消息异常", message.toString(), error);
}
});
});
@@ -102,7 +98,7 @@ async function buildSignalServer() {
Number(config.https.listenPort),
config.https.listenIp,
() => {
logger.info("信令服务启动完成");
console.info("信令服务启动完成");
}
);
}
@@ -111,6 +107,10 @@ async function buildSignalServer() {
* 定时任务
*/
async function buildInterval() {
// 定时打印使用情况
setInterval(async () => {
signal.usage();
}, 300 * 1000);
// 定时清理过期无效终端
setInterval(() => {
let failSize = 0;
@@ -132,33 +132,21 @@ async function buildInterval() {
silentSize++;
}
}
logger.info("定时清理无效信令终端(无效|静默|成功|现存)", failSize, silentSize, successSize, clients.length);
console.info("定时清理无效信令终端(无效|静默|成功|现存)", failSize, silentSize, successSize, clients.length);
}, 60 * 1000);
// 定时打印Worker使用情况
setInterval(async () => {
for (const worker of mediasoupWorkers) {
const usage = await worker.getResourceUsage();
logger.info("Worker使用情况", worker.pid, usage);
}
}, 120 * 1000);
}
/**
* 启动方法
*/
async function main() {
logger
.info("桃之夭夭,灼灼其华。")
.info("之子于归,宜其室家。")
.debug("DEBUG")
.info("INFO")
.warn("WARN")
.error("ERROR");
logger.info("开始启动", config.name);
console.info("桃之夭夭,灼灼其华。")
console.info("之子于归,宜其室家。")
console.info("开始启动", config.name);
await buildMediasoupWorkers();
await buildSignalServer();
await buildInterval();
logger.info("启动完成", config.name);
console.info("启动完成", config.name);
}
// 启动服务

View File

@@ -1,10 +1,9 @@
const Logger = require("./Logger");
const config = require("./Config");
/**
* 信令协议
*/
const protocol = {
const protocol = {
// 当前索引
index: 100000,
// 最小索引
@@ -39,44 +38,21 @@ const config = require("./Config");
},
body: {
roomId,
...body
...body,
},
};
return message;
},
};
/**
* Peer
*/
class Peer {
peerId;
device;
displayName;
rtpCapabilities;
sctpCapabilities;
transports = new Map();
producers = new Map();
consumers = new Map();
dataProducers = new Map();
dataConsumers = new Map();
}
/**
* 房间
*/
class Room {
// 是否关闭
close = false;
// 终端
peers = new Map();
// 房间ID
roomId = null;
// 网络节流
networkThrottled = false;
// 信令
signal = null;
// WebRTCServer
@@ -87,6 +63,16 @@ class Room {
audioLevelObserver = null;
// 音频监控
activeSpeakerObserver = null;
// 通道
transports = new Map();
// 生产者
producers = new Map();
// 消费者
consumers = new Map();
// 数据通道生产者
dataProducers = new Map();
// 数据通道消费者
dataConsumers = new Map();
constructor({
roomId,
@@ -94,7 +80,7 @@ class Room {
webRtcServer,
mediasoupRouter,
audioLevelObserver,
activeSpeakerObserver
activeSpeakerObserver,
}) {
this.close = false;
this.roomId = roomId;
@@ -111,56 +97,57 @@ class Room {
handleAudioLevelObserver() {
// 声音
this.audioLevelObserver.on("volumes", (volumes) => {
for(const value of volumes) {
for (const value of volumes) {
const { producer, volume } = value;
this.signal.push(protocol.buildMessage(
"audio::active::speaker",
this.roomId,
{
this.signal.push(
protocol.buildMessage("audio::active::speaker", this.roomId, {
peerId: producer.appData.peerId,
volume: volume
}
));
volume: volume,
})
);
}
});
// 静音
this.audioLevelObserver.on("silence", () => {
this.signal.push(protocol.buildMessage(
"audio::active::speaker",
this.roomId,
{
peerId: null
}
));
this.signal.push(
protocol.buildMessage("audio::active::speaker", this.roomId, {
peerId: null,
})
);
});
}
handleActiveSpeakerObserver() {
this.activeSpeakerObserver.on("dominantspeaker", (dominantSpeaker) => {
logger.debug("dominantspeaker", dominantSpeaker.producer.id);
console.debug("dominantspeaker", dominantSpeaker.producer.id);
});
}
usage() {
console.info("房间标识", this.roomId);
console.info("房间通道数量", this.transports.size);
console.info("房间生产者数量", this.producers.size);
console.info("房间消费者数量", this.consumers.size);
console.info("房间数据生产者数量", this.dataProducers.size);
console.info("房间数据消费者数量", this.dataConsumers.size);
}
close() {
this.close = true;
if(this.mediasoupRouter) {
if (this.mediasoupRouter) {
this.mediasoupRouter.close();
}
}
}
/**
* 信令
*/
class Signal {
// 房间列表
rooms = new Map();
// 信令终端列表
clients = [];
// 日志
logger = new Logger();
// Worker列表
mediasoupWorkers = [];
// Worker索引
@@ -184,15 +171,15 @@ class Signal {
message?.body?.username === config.https.username &&
message?.body?.password === config.https.password
) {
this.logger.debug("授权成功", session._socket.remoteAddress);
console.debug("授权成功", session._socket.remoteAddress);
this.clients.push(session);
session.authorize = true;
message.code = "0000";
message.message = "授权成功";
message.body.username = null;
message.body.password = null;
message.body.username = undefined;
message.body.password = undefined;
} else {
this.logger.warn("授权失败", session._socket.remoteAddress);
console.warn("授权失败", session._socket.remoteAddress);
message.code = "3401";
message.message = "授权失败";
}
@@ -200,15 +187,19 @@ class Signal {
return;
}
// 处理信令
const body = message.body;
switch (message.header.signal) {
case "router::rtp::capabilities":
this.routerRtpCapabilities(session, message);
case "media::router::rtp::capabilities":
this.mediaRouterRtpCapabilities(session, message, body);
break;
case "media::transport::webrtc::connect":
this.mediaTransportWebrtcConnect(session, message, body);
break;
case "media::transport::webrtc::create":
this.mediaTransportWebrtcCreate(session, message, body);
break;
case "room::create":
this.roomCreate(session, message, message.body);
break;
case "transport:webrtc::create":
this.transportWebrtcCreate(session, message, message.body);
this.roomCreate(session, message, body);
break;
}
}
@@ -220,11 +211,11 @@ class Signal {
* @param {*} session 信令通道
*/
push(message, session) {
if(session) {
if (session) {
try {
session.send(JSON.stringify(message));
} catch (error) {
logger.error(
console.error(
"通知信令失败",
session._socket.remoteAddress,
message,
@@ -236,6 +227,19 @@ class Signal {
}
}
/**
* 打印日志
*/
async usage() {
for (const worker of this.mediasoupWorkers) {
const usage = await worker.getResourceUsage();
console.info("Worker使用情况", worker.pid, usage);
}
console.info("路由数量", this.mediasoupWorkers.length);
console.info("房间数量", this.rooms.size);
Array.from(this.rooms.values()).forEach(room => room.usage());
}
/**
* @returns 下个Meidasoup Worker
*/
@@ -247,40 +251,113 @@ class Signal {
return worker;
}
/**
* @param {*} message 消息
*
* @returns 房间
*/
selectRoom(message) {
return this.rooms.get(message.body.roomId);
}
/**
* 路由RTP能力信令
*
*
* @param {*} session 信令通道
* @param {*} message 消息
* @param {*} body 消息主体
*/
routerRtpCapabilities(session, message) {
const room = this.selectRoom(message);
mediaRouterRtpCapabilities(session, message, body) {
const { roomId } = body;
const room = this.rooms.get(roomId);
message.body = room.mediasoupRouter.rtpCapabilities;
this.push(message, session);
}
async mediaTransportWebrtcConnect(session, message, body) {
const { roomId, transportId, dtlsParameters } = body;
const room = this.rooms.get(roomId);
const transport = room.transports.get(transportId);
if (!transport) {
throw new Error(`transport with id "${transportId}" not found`);
}
await transport.connect({ dtlsParameters });
message.body = { roomId };
this.push(message, session);
}
/**
* 创建房间信令
*
* @param {*} session 信令通道
* @param {*} message 消息
* @param {*} body 消息主体
*
*/
async mediaTransportWebrtcCreate(session, message, body) {
const self = this;
const { roomId, forceTcp, producing, consuming, sctpCapabilities } = body;
const webRtcTransportOptions = {
...config.mediasoup.webRtcTransportOptions,
appData: { producing, consuming },
enableSctp: Boolean(sctpCapabilities),
numSctpStreams: (sctpCapabilities || {}).numStreams,
};
if (forceTcp) {
webRtcTransportOptions.enableUdp = false;
webRtcTransportOptions.enableTcp = true;
}
const room = this.rooms.get(roomId);
const transport = await room.mediasoupRouter.createWebRtcTransport({
...webRtcTransportOptions,
webRtcServer: room.webRtcServer,
});
transport.on("dtlsstatechange", (dtlsState) => {
console.debug(
'WebRtcTransport dtlsstatechange event',
dtlsState
);
});
transport.on("sctpstatechange", (sctpState) => {
console.debug(
'WebRtcTransport sctpstatechange event',
sctpState
);
});
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
await transport.enableTraceEvent(["bwe"]);
transport.on("trace", (trace) => {
console.debug(
'transport trace event',
trace,
trace.type,
transport.id,
);
});
// Store the WebRtcTransport into the protoo Peer data Object.
room.transports.set(transport.id, transport);
message.body = {
transportId: transport.id,
iceCandidates: transport.iceCandidates,
iceParameters: transport.iceParameters,
dtlsParameters: transport.dtlsParameters,
sctpParameters: transport.sctpParameters,
};
self.push(
message,
session
);
const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;
// If set, apply max incoming bitrate limit.
if (maxIncomingBitrate) {
try {
await transport.setMaxIncomingBitrate(maxIncomingBitrate);
} catch (error) {}
}
}
/**
* 创建房间信令
*
* @param {*} session 信令通道
* @param {*} message 消息
* @param {*} body 消息主体
*
* @returns 房间
*/
async roomCreate(session, message, body) {
const roomId = body.roomId;
let room = this.rooms.get(roomId);
if (room) {
this.push(message, session);
return room;
}
const mediasoupWorker = this.nextMediasoupWorker();
@@ -296,113 +373,24 @@ class Signal {
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({
maxEntries: 1,
threshold: -80,
interval: 2000
});
const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver({
interval: 500
interval: 2000,
});
const activeSpeakerObserver =
await mediasoupRouter.createActiveSpeakerObserver({
interval: 500,
});
room = new Room({
roomId,
webRtcServer: mediasoupWorker.appData.webRtcServer,
mediasoupRouter,
audioLevelObserver,
activeSpeakerObserver
activeSpeakerObserver,
});
this.rooms.set(roomId, room);
this.logger.info("创建房间", roomId, room);
console.info("创建房间", roomId);
this.push(message, session);
return room;
}
/**
* @param {*} session 信令通道
* @param {*} message 消息
* @param {*} body 消息主体
*/
transportWebrtcCreate(session, message, body) {
const {
roomId,
forceTcp,
producing,
consuming,
sctpCapabilities
} = body;
const webRtcTransportOptions =
{
...config.mediasoup.webRtcTransportOptions,
enableSctp : Boolean(sctpCapabilities),
numSctpStreams : (sctpCapabilities || {}).numStreams,
appData : { producing, consuming }
};
if (forceTcp)
{
webRtcTransportOptions.enableUdp = false;
webRtcTransportOptions.enableTcp = true;
}
const room = this.rooms.get(roomId);
const transport = await room.mediasoupRouter.createWebRtcTransport(
{
...webRtcTransportOptions,
webRtcServer : room.webRtcServer
});
transport.on('sctpstatechange', (sctpState) =>
{
logger.debug('WebRtcTransport "sctpstatechange" event [sctpState:%s]', sctpState);
});
transport.on('dtlsstatechange', (dtlsState) =>
{
if (dtlsState === 'failed' || dtlsState === 'closed')
logger.warn('WebRtcTransport "dtlsstatechange" event [dtlsState:%s]', dtlsState);
});
// NOTE: For testing.
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
await transport.enableTraceEvent([ 'bwe' ]);
transport.on('trace', (trace) =>
{
logger.debug(
'transport "trace" event [transportId:%s, trace.type:%s, trace:%o]',
transport.id, trace.type, trace);
if (trace.type === 'bwe' && trace.direction === 'out')
{
peer.notify(
'downlinkBwe',
{
desiredBitrate : trace.info.desiredBitrate,
effectiveDesiredBitrate : trace.info.effectiveDesiredBitrate,
availableBitrate : trace.info.availableBitrate
})
.catch(() => {});
}
});
// Store the WebRtcTransport into the protoo Peer data Object.
peer.data.transports.set(transport.id, transport);
self.push(
{
id : transport.id,
iceParameters : transport.iceParameters,
iceCandidates : transport.iceCandidates,
dtlsParameters : transport.dtlsParameters,
sctpParameters : transport.sctpParameters
}, session);
const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;
// If set, apply max incoming bitrate limit.
if (maxIncomingBitrate)
{
try { await transport.setMaxIncomingBitrate(maxIncomingBitrate); }
catch (error) {}
}
}
}
module.exports = Signal;