[+] 房间穿肩

This commit is contained in:
acgist
2023-02-12 13:13:17 +08:00
parent 5f85dfccca
commit e8ae344d11
55 changed files with 788 additions and 427 deletions

View File

@@ -6,6 +6,7 @@ const os = require("os");
module.exports = {
// 服务名称
name: "taoyao-media-server",
version: "1.0.0",
// 交互式命令行
command: true,
// 日志级别
@@ -30,7 +31,8 @@ module.exports = {
workerSize: Object.keys(os.cpus()).length,
// Workerhttps://mediasoup.org/documentation/v3/mediasoup/api/#WorkerSettings
workerSettings: {
logLevel: "info",
// 级别debug | warn | error | none
logLevel: "warn",
logTags: [
"bwe",
"ice",

View File

@@ -3,10 +3,19 @@
const fs = require("fs");
const ws = require("ws");
const https = require("https");
// const mediasoup = require("mediasoup");
const mediasoup = require("mediasoup");
const config = require("./Config");
const Logger = require("./Logger");
const Signal = require("./Signal");
const { fail } = require("assert");
// 线程名称
process.title = config.name;
// 无效信令终端列表
const clients = [];
// Mediasoup Worker列表
const mediasoupWorkers = [];
// HTTPS server
let httpsServer;
@@ -15,18 +24,12 @@ let webSocketServer;
// 日志
const logger = new Logger();
// 信令
const signal = new Signal();
// 无效信令终端列表
const client = [];
// Mediasoup Worker列表
const mediasoupWorker = [];
// 配置名称
process.title = config.name;
const signal = new Signal(mediasoupWorkers);
/**
* 启动Mediasoup Worker
*/
async function buildMediasoupWorker() {
async function buildMediasoupWorkers() {
const { workerSize } = config.mediasoup;
logger.info("启动Mediasoup Worker", workerSize);
for (let i = 0; i < workerSize; i++) {
@@ -43,11 +46,11 @@ async function buildMediasoupWorker() {
setTimeout(() => process.exit(1), 2000);
});
// 加入Worker队列
mediasoupWorker.push(worker);
mediasoupWorkers.push(worker);
// 配置WebRTC服务
if (process.env.MEDIASOUP_USE_WEBRTC_SERVER !== "false") {
// 配置Worker端口
const portIncrement = mediasoupWorker.length - 1;
const portIncrement = mediasoupWorkers.length - 1;
const webRtcServerOptions = JSON.parse(
JSON.stringify(config.mediasoup.webRtcServerOptions)
);
@@ -85,7 +88,7 @@ async function buildSignalServer() {
logger.info("打开信令通道", session._socket.remoteAddress);
session.datetime = Date.now();
session.authorize = false;
client.push(session);
clients.push(session);
session.on("close", (code) => {
logger.info("关闭信令通道", session._socket.remoteAddress, code);
});
@@ -93,11 +96,11 @@ async function buildSignalServer() {
logger.error("信令通道异常", session._socket.remoteAddress, error);
});
session.on("message", (message) => {
logger.debug("处理信令消息", message);
logger.debug("处理信令消息", message.toString());
try {
signal.on(JSON.parse(message), session);
} catch (error) {
logger.error("处理信令消息异常", message, error);
logger.error("处理信令消息异常", message.toString(), error);
}
});
});
@@ -117,18 +120,23 @@ async function buildSignalServer() {
async function buildClientInterval() {
setInterval(() => {
const datetime = Date.now();
const oldLength = client.length;
for (let i = 0; i < client.length; i++) {
const session = client[i];
let failSize = 0;
let successSize = 0;
for (let i = 0; i < clients.length; i++) {
const session = clients[i];
// 超过五秒自动关闭
if (datetime - session.datetime >= 5000) {
client.splice(i, 1);
session.close();
clients.splice(i, 1);
if(session.authorize) {
successSize++;
} else {
failSize++;
session.close();
}
i--;
}
}
const newLength = client.length;
logger.info("定时清理无效信令终端", oldLength - newLength);
logger.info("定时清理无效信令终端", failSize, successSize, clients.length);
}, 60 * 1000);
}
@@ -170,7 +178,7 @@ async function buildCommandConsole() {
async function main() {
logger.debug("DEBUG").info("INFO").warn("WARN").error("ERROR");
logger.info("开始启动", config.name);
// await buildMediasoupWorker();
await buildMediasoupWorkers();
await buildSignalServer();
await buildClientInterval();
await buildCommandConsole();

View File

@@ -1,21 +1,146 @@
const Logger = require("./Logger");
const config = require("./Config");
/**
* 信令协议
*/
const protocol = {
// 当前索引
index: 100000,
// 最小索引
minIndex: 100000,
// 最大索引
maxIndex: 999999,
/**
* @returns 索引
*/
buildId: function () {
if (this.index++ >= this.maxIndex) {
this.index = this.minIndex;
}
return Date.now() + "" + this.index;
},
/**
* 生成信令消息
*
* @param {*} signal 信令标识
* @param {*} roomId 房间标识
* @param {*} body 信令消息
* @param {*} id ID
*
* @returns 信令消息
*/
buildMessage: function (signal, roomId, body = {}, id) {
let message = {
header: {
v: config.version,
id: id || this.buildId(),
signal: signal,
},
body: {
roomId,
...body
},
};
return message;
},
};
/**
* 房间
*/
class Room {
// 是否关闭
close = false;
// 房间ID
roomId = null;
// 网络节流
networkThrottled = false;
// 信令
signal = null;
// WebRTCServer
webRtcServer = null;
// 路由
mediasoupRouter = null;
// 音频监控
audioLevelObserver = null;
// 音频监控
activeSpeakerObserver = null;
constructor({
roomId,
signal,
webRtcServer,
mediasoupRouter,
audioLevelObserver,
activeSpeakerObserver
}) {
this.close = false;
this.roomId = roomId;
this.networkThrottled = false;
this.signal = signal;
this.webRtcServer = webRtcServer;
this.mediasoupRouter = mediasoupRouter;
this.audioLevelObserver = audioLevelObserver;
this.activeSpeakerObserver = activeSpeakerObserver;
this.handleAudioLevelObserver();
this.handleActiveSpeakerObserver();
}
handleAudioLevelObserver() {
// 声音
this.audioLevelObserver.on("volumes", (volumes) => {
for(const value of volumes) {
const { producer, volume } = value;
this.signal.push(protocol.buildMessage(
"audio::active::speaker",
this.roomId,
{
peerId: producer.appData.peerId,
volume: volume
}
));
}
});
// 静音
this.audioLevelObserver.on('silence', () => {
this.signal.push(protocol.buildMessage(
"audio::active::speaker",
this.roomId,
{
peerId: null
}
));
});
}
handleActiveSpeakerObserver() {
this.activeSpeakerObserver.on('dominantspeaker', (dominantSpeaker) => {
logger.debug("dominantspeaker", dominantSpeaker.producer.id);
});
}
}
/**
* 信令
*/
class Signal {
// 房间列表
rooms = new Map();
// 信令终端列表
client = [];
clients = [];
// 日志
logger = new Logger();
// Mediasoup Worker列表
mediasoupWorker = [];
mediasoupWorkers = [];
// Mediasoup Worker索引
nextMediasoupWorkerIndex = 0;
constructor(mediasoupWorker) {
this.mediasoupWorker = mediasoupWorker;
constructor(mediasoupWorkers) {
this.mediasoupWorkers = mediasoupWorkers;
}
/**
@@ -33,7 +158,7 @@ class Signal {
message?.body?.password === config.https.password
) {
this.logger.debug("授权成功", session._socket.remoteAddress);
client.push(session);
this.clients.push(session);
session.authorize = true;
message.code = "0000";
message.message = "授权成功";
@@ -49,6 +174,12 @@ class Signal {
}
// 处理信令
switch (message.header.signal) {
case "router::rtp::capabilities":
this.routerRtpCapabilities(session, message);
break;
case "room::create":
this.roomCreate(session, message, message.body);
break;
}
}
@@ -56,30 +187,97 @@ class Signal {
* 通知信令
*
* @param {*} message 消息
* @param {*} session websocket
* @param {*} session 信令通道
*/
push(message, session) {
try {
session.send(JSON.stringify(message));
} catch (error) {
logger.error(
"通知信令失败",
session._socket.remoteAddress,
message,
error
);
if(session) {
try {
session.send(JSON.stringify(message));
} catch (error) {
logger.error(
"通知信令失败",
session._socket.remoteAddress,
message,
error
);
}
} else {
this.clients.forEach((session) => this.push(message, session));
}
}
/**
* 通知信令
*
* @returns 下个Meidasoup Worker
*/
nextMediasoupWorker() {
const worker = this.mediasoupWorkers[this.nextMediasoupWorkerIndex];
if (++this.nextMediasoupWorkerIndex === this.mediasoupWorkers.length) {
this.nextMediasoupWorkerIndex = 0;
}
return worker;
}
/**
* @param {*} message 消息
*
* @returns 房间
*/
selectRoom(message) {
return this.rooms.get(message.body.roomId);
}
/**
* 路由RTP能力信令
*
* @param {*} session 信令通道
* @param {*} message 消息
*/
push(message) {
this.client.forEach((session) => this.push(message, session));
routerRtpCapabilities(session, message) {
const room = this.selectRoom(message);
message.body = room.mediasoupRouter.rtpCapabilities;
this.push(message, session);
}
/**
* 创建房间信令
*
* @param {*} session 信令通道
* @param {*} message 消息
* @param {*} body 消息主体
*
* @returns 路由
*/
async roomCreate(session, message, body) {
const roomId = body.roomId;
let room = this.rooms.get(roomId);
if (room) {
return room;
}
const mediasoupWorker = this.nextMediasoupWorker();
const { mediaCodecs } = config.mediasoup.routerOptions;
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
// TODO下面两个监控改为配置启用
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({
maxEntries: 1,
threshold: -80,
interval: 2000
});
const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver({
interval: 500
});
room = new Room({
roomId,
webRtcServer: mediasoupWorker.appData.webRtcServer,
mediasoupRouter,
audioLevelObserver,
activeSpeakerObserver
});
this.rooms.set(roomId, room);
this.logger.info("创建房间", roomId, room);
this.push(message, session);
return room;
}
}
module.exports = Signal;