[+] 信令调整

This commit is contained in:
acgist
2023-03-03 08:09:35 +08:00
parent b53f50adf7
commit c3dbe52d5c
36 changed files with 588 additions and 306 deletions

View File

@@ -0,0 +1,893 @@
const config = require("./Config");
const process = require("child_process");
const WebSocket = require("ws");
/**
* 信令协议
*/
const protocol = {
// 当前索引
index: 0,
// 最大索引
maxIndex: 999,
// 终端索引
clientIndex: 99999,
/**
* @returns 索引
*/
buildId() {
if (++this.index > this.maxIndex) {
this.index = 0;
}
const date = new Date();
return (
100000000000000 * date.getDate() +
1000000000000 * date.getHours() +
10000000000 * date.getMinutes() +
100000000 * date.getSeconds() +
1000 * this.clientIndex +
this.index
);
},
/**
* @param {*} signal 信令标识
* @param {*} body 消息主体
* @param {*} id 消息ID
*
* @returns 信令消息
*/
buildMessage(signal, body = {}, id) {
const message = {
header: {
v: config.signal.version,
id: id || this.buildId(),
signal: signal,
},
body: body,
};
return message;
},
};
/**
* 信令通道
*/
const signalChannel = {
// 桃夭
taoyao: null,
// 通道
channel: null,
// 地址
address: null,
// 心跳时间
heartbeatTime: 30 * 1000,
// 心跳定时器
heartbeatTimer: null,
// 是否重连
reconnection: true,
// 重连定时器
reconnectTimer: null,
// 防止重复重连
lockReconnect: false,
// 当前重连时间
reconnectionTimeout: 5 * 1000,
// 最小重连时间
minReconnectionDelay: 5 * 1000,
// 最大重连时间
maxReconnectionDelay: 60 * 1000,
/**
* 心跳
*/
heartbeat() {
const me = this;
if (me.heartbeatTimer) {
clearTimeout(me.heartbeatTimer);
}
me.heartbeatTimer = setTimeout(async function () {
if (me.channel && me.channel.readyState === WebSocket.OPEN) {
me.push(
// TODO电池信息
protocol.buildMessage("client::heartbeat", {
battery: 100,
charging: true,
})
);
me.heartbeat();
} else {
console.warn("心跳失败:", me.address);
}
}, me.heartbeatTime);
},
/**
* 连接
*
* @param {*} address 地址
* @param {*} reconnection 是否重连
*
* @returns Promise
*/
async connect(address, reconnection = true) {
const me = this;
if (me.channel && me.channel.readyState === WebSocket.OPEN) {
return new Promise((resolve, reject) => {
resolve(me.channel);
});
}
me.address = address;
me.reconnection = reconnection;
return new Promise((resolve, reject) => {
console.debug("连接信令通道:", me.address);
me.channel = new WebSocket(me.address, { handshakeTimeout: 5000 });
me.channel.on("open", async function () {
console.info("打开信令通道:", me.address);
// TODO电池信息
me.push(
protocol.buildMessage("client::register", {
clientId: config.signal.clientId,
name: config.signal.name,
clientType: "MEDIA",
battery: 100,
charging: true,
username: config.signal.username,
password: config.signal.password,
})
);
me.reconnectionTimeout = me.minReconnectionDelay;
me.taoyao.connect = true;
me.heartbeat();
resolve(me.channel);
});
me.channel.on("close", async function () {
console.warn("信令通道关闭:", me.address);
me.taoyao.connect = false;
if (me.reconnection) {
me.reconnect();
}
// 不要失败回调
});
me.channel.on("error", async function (e) {
console.error("信令通道异常:", me.address, e);
me.taoyao.connect = false;
if (me.reconnection) {
me.reconnect();
}
// 不要失败回调
});
me.channel.on("message", async function (data) {
try {
const content = data.toString();
console.debug("信令通道消息:", content);
me.taoyao.on(JSON.parse(content));
} catch (error) {
console.error("处理信令消息异常:", data, error);
}
});
});
},
/**
* 重连
*/
reconnect() {
const me = this;
if (
me.lockReconnect ||
(me.channel && me.channel.readyState === WebSocket.OPEN)
) {
return;
}
me.lockReconnect = true;
if (me.reconnectTimer) {
clearTimeout(me.reconnectTimer);
}
// 定时重连
me.reconnectTimer = setTimeout(function () {
console.info("重连信令通道:", me.address);
me.connect(me.address, me.reconnection);
me.lockReconnect = false;
}, me.reconnectionTimeout);
me.reconnectionTimeout = Math.min(
me.reconnectionTimeout + me.minReconnectionDelay,
me.maxReconnectionDelay
);
},
/**
* 异步请求
*
* @param {*} message 消息
*/
push(message) {
try {
this.channel.send(JSON.stringify(message));
} catch (error) {
console.error("异步请求异常:", message, error);
}
},
/**
* 关闭通道
*/
close() {
const me = this;
clearTimeout(me.heartbeatTimer);
clearTimeout(me.reconnectTimer);
me.reconnection = false;
me.channel.close();
},
};
/**
* 房间
*/
class Room {
// 是否关闭
close = false;
// 房间ID
roomId = null;
// 桃夭
taoyao = null;
// WebRTCServer
webRtcServer = null;
// 路由
mediasoupRouter = null;
// 音频监控
audioLevelObserver = null;
// 音频监控
activeSpeakerObserver = null;
// 消费者复制数量
consumerReplicas = 0;
// 通道
transports = new Map();
// 生产者
producers = new Map();
// 消费者
consumers = new Map();
// 数据通道生产者
dataProducers = new Map();
// 数据通道消费者
dataConsumers = new Map();
constructor({
roomId,
taoyao,
webRtcServer,
mediasoupRouter,
audioLevelObserver,
activeSpeakerObserver,
}) {
this.close = false;
this.roomId = roomId;
this.networkThrottled = false;
this.taoyao = taoyao;
this.webRtcServer = webRtcServer;
this.mediasoupRouter = mediasoupRouter;
this.audioLevelObserver = audioLevelObserver;
this.activeSpeakerObserver = activeSpeakerObserver;
this.handleAudioLevelObserver();
this.handleActiveSpeakerObserver();
}
/**
* 声音监控
*/
handleAudioLevelObserver() {
const self = this;
// 声音
self.audioLevelObserver.on("volumes", (volumes) => {
for (const value of volumes) {
const { producer, volume } = value;
signalChannel.push(
protocol.buildMessage("media::audio::active::speaker", {
roomId: self.roomId,
clientId: producer.clientId,
volume: volume,
})
);
}
});
// 静音
self.audioLevelObserver.on("silence", () => {
signalChannel.push(
protocol.buildMessage("media::audio::active::speaker", {
roomId: self.roomId,
})
);
});
}
/**
* 说话监控
*/
handleActiveSpeakerObserver() {
const self = this;
self.activeSpeakerObserver.on("dominantspeaker", (dominantSpeaker) => {
console.debug(
"dominantspeaker",
dominantSpeaker.producer.id,
dominantSpeaker.producer.clientId
);
});
}
/**
* 使用情况
*/
usage() {
console.info("房间标识:", this.roomId);
console.info("房间通道数量:", this.transports.size);
console.info("房间生产者数量:", this.producers.size);
console.info("房间消费者数量:", this.consumers.size);
console.info("房间数据生产者数量:", this.dataProducers.size);
console.info("房间数据消费者数量:", this.dataConsumers.size);
}
/**
* 关闭资源
*/
close() {
const self = this;
if (self.close) {
return;
}
self.close = true;
if (self.mediasoupRouter) {
self.mediasoupRouter.close();
}
}
}
/**
* 桃夭
*/
class Taoyao {
// 是否连接
connect = false;
// 房间列表
rooms = new Map();
// 回调事件
callbackMapping = new Map();
// Worker列表
mediasoupWorkers = [];
// Worker索引
nextMediasoupWorkerIndex = 0;
constructor(mediasoupWorkers) {
this.mediasoupWorkers = mediasoupWorkers;
// 定时打印使用情况
setInterval(async () => {
this.usage();
}, 300 * 1000);
}
/**
* 处理信令消息
*
* @param {*} message 消息
*/
on(message) {
// 请求回调
if (this.callbackMapping.has(message.header.id)) {
try {
this.callbackMapping.get(message.header.id)(message);
} finally {
this.callbackMapping.delete(message.header.id);
}
return;
}
const body = message.body;
switch (message.header.signal) {
case "client::reboot":
this.clientReboot(message, body);
break;
case "client::shutdown":
this.clientShutdown(message, body);
break;
case "client::register":
protocol.clientIndex = body.index;
break;
case "media::ice::restart":
this.mediaIceRestart(message, body);
break;
case "media::consume":
this.mediaConsume(message, body);
break;
case "media::produce":
this.mediaProduce(message, body);
break;
case "media::router::rtp::capabilities":
this.mediaRouterRtpCapabilities(message, body);
break;
case "media::transport::webrtc::connect":
this.mediaTransportWebrtcConnect(message, body);
break;
case "media::transport::webrtc::create":
this.mediaTransportWebrtcCreate(message, body);
break;
case "room::create":
this.roomCreate(message, body);
break;
}
}
/**
* 异步请求
*
* @param {*} message 消息
*/
push(message) {
try {
signalChannel.channel.send(JSON.stringify(message));
} catch (error) {
console.error("异步请求异常:", message, error);
}
}
/**
* 同步请求
*
* @param {*} message 消息
*
* @returns Promise
*/
async request(message) {
const me = this;
return new Promise((resolve, reject) => {
let done = false;
// 注册回调
me.callbackMapping.set(message.header.id, (response) => {
resolve(response);
done = true;
});
// 发送消息
try {
signalChannel.channel.send(JSON.stringify(message));
} catch (error) {
reject("同步请求异常", error);
}
// 设置超时
setTimeout(() => {
if (!done) {
me.callbackMapping.delete(message.header.id);
reject("请求超时", message);
}
}, 5000);
});
}
/**
* 打印日志
*/
async usage() {
for (const worker of this.mediasoupWorkers) {
const usage = await worker.getResourceUsage();
console.info("Worker使用情况", worker.pid, usage);
}
console.info("路由数量:", this.mediasoupWorkers.length);
console.info("房间数量:", this.rooms.size);
Array.from(this.rooms.values()).forEach((room) => room.usage());
}
/**
* @returns 下个Meidasoup Worker
*/
nextMediasoupWorker() {
const worker = this.mediasoupWorkers[this.nextMediasoupWorkerIndex];
if (++this.nextMediasoupWorkerIndex === this.mediasoupWorkers.length) {
this.nextMediasoupWorkerIndex = 0;
}
return worker;
}
/**
* 重启终端信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
clientReboot(message, body) {
process.exec(
`pm2 restart ${config.signal.clientId}`,
function (error, stdout, stderr) {
console.info("重启媒体服务:", error, stdout, stderr);
}
);
// this.push(message);
}
/**
* 关闭终端信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
clientShutdown(message, body) {
process.exec(
`pm2 stop ${config.signal.clientId}`,
function (error, stdout, stderr) {
console.info("关闭媒体服务:", error, stdout, stderr);
}
);
// this.push(message);
}
/**
* 媒体重启ICE信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaIceRestart(message, body) {
const { roomId, transportId } = body;
const room = this.rooms.get(roomId);
const transport = room.transports.get(transportId);
const iceParameters = await transport.restartIce();
message.body.iceParameters = iceParameters;
this.push(message);
}
async mediaProduce(message, body) {
const self = this;
const {
kind,
roomId,
clientId,
streamId,
appData,
transportId,
rtpParameters,
} = body;
const room = self.rooms.get(roomId);
const transport = room.transports.get(transportId);
const producer = await transport.produce({
kind,
appData,
rtpParameters,
// keyFrameRequestDelay: 5000
});
producer.clientId = clientId;
producer.streamId = streamId;
room.producers.set(producer.id, producer);
// 打分
producer.on("score", (score) => {
self.push(
protocol.buildMessage("media::producer::score", {
roomId: roomId,
producerId: producer.id,
score,
})
);
});
producer.on("videoorientationchange", (videoOrientation) => {
logger.debug(
'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
producer.id,
videoOrientation
);
});
producer.on("trace", (trace) => {
logger.debug(
'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
producer.id,
trace.type,
trace
);
});
message.body = { kind: kind, producerId: producer.id };
this.push(message);
if (producer.kind === "audio") {
room.audioLevelObserver
.addProducer({ producerId: producer.id })
.catch(() => {});
room.activeSpeakerObserver
.addProducer({ producerId: producer.id })
.catch(() => {});
}
}
async mediaConsume(message, body) {
const {
roomId,
clientId,
sourceId,
streamId,
producerId,
transportId,
rtpCapabilities,
} = body;
const room = this.rooms.get(roomId);
const producer = room.producers.get(producerId);
const transport = room.transports.get(transportId);
if (
!room ||
!producer ||
!transport ||
!rtpCapabilities ||
!room.mediasoupRouter.canConsume({
producerId: producerId,
rtpCapabilities: rtpCapabilities,
})
) {
console.warn(
"不能消费媒体:",
roomId,
clientId,
producerId,
transportId,
rtpCapabilities
);
return;
}
const promises = [];
const consumerCount = 1 + room.consumerReplicas;
for (let i = 0; i < consumerCount; i++) {
promises.push(
(async () => {
let consumer;
try {
consumer = await transport.consume({
producerId: producerId,
rtpCapabilities: rtpCapabilities,
// 暂停
paused: true,
});
} catch (error) {
console.error(
"创建消费者异常:",
roomId,
clientId,
producerId,
transportId,
rtpCapabilities,
error
);
return;
}
consumer.clientId = clientId;
consumer.streamId = streamId;
room.consumers.set(consumer.id, consumer);
consumer.on("transportclose", () => {
room.consumers.delete(consumer.id);
});
consumer.on("producerclose", () => {
room.consumers.delete(consumer.id);
this.push(
protocol.buildMessage("media::consumer::close", {
consumerId: consumer.id,
})
);
});
consumer.on("producerpause", () => {
this.push(
protocol.buildMessage("media::consumer::pause", {
consumerId: consumer.id,
})
);
});
consumer.on("producerresume", () => {
this.push(
protocol.buildMessage("media::consumer::resume", {
consumerId: consumer.id,
})
);
});
consumer.on("score", (score) => {
this.push(
protocol.buildMessage("media::consumer::score", {
consumerId: consumer.id,
score,
})
);
});
consumer.on("layerschange", (layers) => {
this.push(
protocol.buildMessage("media::consumer::layers::change", {
consumerId: consumer.id,
spatialLayer: layers ? layers.spatialLayer : null,
temporalLayer: layers ? layers.temporalLayer : null,
})
);
});
consumer.on("trace", (trace) => {
logger.debug(
'consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
consumer.id,
trace.type,
trace
);
});
// TODO改为同步
this.push(
protocol.buildMessage("media::consume", {
//await this.request("media::consume", {
kind: consumer.kind,
type: consumer.type,
roomId: roomId,
clientId: clientId,
sourceId: sourceId,
streamId: streamId,
producerId: producerId,
consumerId: consumer.id,
rtpParameters: consumer.rtpParameters,
appData: producer.appData,
producerPaused: consumer.producerPaused,
})
);
await consumer.resume();
this.push(
protocol.buildMessage("media::consumer::score", {
consumerId: consumer.id,
score: consumer.score,
})
);
})()
);
}
try {
await Promise.all(promises);
} catch (error) {
console.warn("_createConsumer() | failed:%o", error);
}
}
/**
* 路由RTP能力信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
mediaRouterRtpCapabilities(message, body) {
const { roomId } = body;
const room = this.rooms.get(roomId);
message.body.rtpCapabilities = room.mediasoupRouter.rtpCapabilities;
this.push(message);
}
/**
* 连接WebRTC通道信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaTransportWebrtcConnect(message, body) {
const { roomId, transportId, dtlsParameters } = body;
const room = this.rooms.get(roomId);
const transport = room.transports.get(transportId);
await transport.connect({ dtlsParameters });
message.body = { roomId: roomId, transportId: transport.id };
this.push(message);
}
/**
* 创建WebRTC通道信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaTransportWebrtcCreate(message, body) {
const self = this;
const {
roomId,
clientId,
forceTcp,
producing,
consuming,
sctpCapabilities,
} = body;
const webRtcTransportOptions = {
...config.mediasoup.webRtcTransportOptions,
appData: { producing, consuming },
enableSctp: Boolean(sctpCapabilities),
numSctpStreams: (sctpCapabilities || {}).numStreams,
};
if (forceTcp) {
webRtcTransportOptions.enableUdp = false;
webRtcTransportOptions.enableTcp = true;
}
const room = this.rooms.get(roomId);
const transport = await room.mediasoupRouter.createWebRtcTransport({
...webRtcTransportOptions,
webRtcServer: room.webRtcServer,
});
transport.clientId = clientId;
transport.on("icestatechange", (iceState) => {
console.debug(
"WebRtcTransport icestatechange event",
iceState,
transport.id
);
});
transport.on("dtlsstatechange", (dtlsState) => {
console.debug(
"WebRtcTransport dtlsstatechange event",
dtlsState,
transport.id
);
});
transport.on("sctpstatechange", (sctpState) => {
console.debug(
"WebRtcTransport sctpstatechange event",
sctpState,
transport.id
);
});
await transport.enableTraceEvent(["bwe"]);
transport.on("trace", (trace) => {
console.debug("transport trace event", trace, trace.type, transport.id);
});
// 可配置的事件
// transport.on("routerclose", fn());
// transport.on("listenserverclose", fn());
// transport.observer.on("close", fn());
// transport.observer.on("newproducer", fn(producer));
// transport.observer.on("newconsumer", fn(consumer));
// transport.observer.on("newdataproducer", fn(dataProducer));
// transport.observer.on("newdataconsumer", fn(dataConsumer));
// transport.observer.on("trace", fn(trace));
room.transports.set(transport.id, transport);
message.body = {
transportId: transport.id,
iceCandidates: transport.iceCandidates,
iceParameters: transport.iceParameters,
dtlsParameters: transport.dtlsParameters,
sctpParameters: transport.sctpParameters,
};
self.push(message);
const { maxIncomingBitrate } = config.mediasoup.webRtcTransportOptions;
// If set, apply max incoming bitrate limit.
if (maxIncomingBitrate) {
try {
await transport.setMaxIncomingBitrate(maxIncomingBitrate);
} catch (error) {}
}
}
/**
* 创建房间信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*
* @returns 房间
*/
async roomCreate(message, body) {
const roomId = body.roomId;
let room = this.rooms.get(roomId);
if (room) {
this.push(message);
return room;
}
const mediasoupWorker = this.nextMediasoupWorker();
const { mediaCodecs } = config.mediasoup.routerOptions;
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs });
mediasoupRouter.observer.on("close", () => {
// TODO通知房间关闭
});
// 可配置的事件
// mediasoupRouter.on("workerclose", () => {});
// mediasoupRouter.observer.on("newtransport", fn(transport));
// TODO下面两个监控改为配置启用
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({
maxEntries: 1,
threshold: -80,
interval: 2000,
});
const activeSpeakerObserver =
await mediasoupRouter.createActiveSpeakerObserver({
interval: 500,
});
room = new Room({
roomId,
webRtcServer: mediasoupWorker.appData.webRtcServer,
mediasoupRouter,
audioLevelObserver,
activeSpeakerObserver,
});
this.rooms.set(roomId, room);
console.info("创建房间", roomId);
this.push(message);
return room;
}
}
module.exports = { Taoyao, signalChannel };