[*] 日常优化

This commit is contained in:
acgist
2023-09-10 08:52:22 +08:00
parent eb4510bf3d
commit dc2334e8b6
3 changed files with 193 additions and 168 deletions

View File

@@ -480,12 +480,12 @@ class Taoyao {
case "platform::error": case "platform::error":
me.platformError(message, body); me.platformError(message, body);
break; break;
case "room::create":
me.roomCreate(message, body);
break;
case "room::close": case "room::close":
me.roomClose(message, body); me.roomClose(message, body);
break; break;
case "room::create":
me.roomCreate(message, body);
break;
} }
} }
@@ -1005,32 +1005,6 @@ class Taoyao {
} }
} }
/**
* 查询生产者状态信令
*
* @param {*} message 信令消息
* @param {*} body 消息主体
*/
async mediaProducerStatus(message, body) {
const me = this;
const {
roomId,
producerId,
} = body;
const room = me.rooms.get(roomId);
const producer = room?.producers.get(producerId);
if(producer) {
console.debug("查询生产者状态", producerId);
message.body = {
...body,
status: await producer.getStats()
};
me.push(message);
} else {
console.debug("查询生产者状态(无效)", producerId);
}
}
/** /**
* 消费媒体信令 * 消费媒体信令
* *
@@ -1589,54 +1563,86 @@ class Taoyao {
} }
} }
/**
* 查询生产者状态信令
*
* @param {*} message 信令消息
* @param {*} body 消息主体
*/
async mediaProducerStatus(message, body) {
const {
roomId,
producerId,
} = body;
const room = this.rooms.get(roomId);
const producer = room?.producers.get(producerId);
if(!producer) {
console.warn("查询生产者状态(无效生产者)", roomId, producerId);
return;
}
console.debug("查询生产者状态", producerId);
message.body = {
...body,
status: await producer.getStats()
};
this.push(message);
}
/** /**
* 路由RTP协商信令 * 路由RTP协商信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
mediaRouterRtpCapabilities(message, body) { mediaRouterRtpCapabilities(message, body) {
const me = this; const {
const { roomId } = body; roomId
const room = me.rooms.get(roomId); } = body;
message.body.rtpCapabilities = room?.mediasoupRouter.rtpCapabilities; const room = this.rooms.get(roomId);
message.body = {
...message.body,
rtpCapabilities: room?.mediasoupRouter.rtpCapabilities
};
this.push(message); this.push(message);
} }
/** /**
* 关闭传输通道信令 * 关闭传输通道信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
async mediaTransportClose(message, body) { async mediaTransportClose(message, body) {
const me = this; const {
const { roomId, transportId } = body; roomId,
const room = me.rooms.get(roomId); transportId
} = body;
const room = this.rooms.get(roomId);
const transport = room?.transports.get(transportId); const transport = room?.transports.get(transportId);
if(transport) { if(!transport) {
console.info("关闭传输通道", transportId); console.info("关闭传输通道(无效通道)", roomId, transportId);
await transport.close(); return;
} else {
console.debug("关闭传输通道(无效)", transportId);
} }
console.debug("关闭传输通道", transportId);
await transport.close();
} }
/** /**
* 创建RTP输入通道信令 * 创建RTP输入通道信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
async mediaTransportPlainCreate(message, body) { async mediaTransportPlainCreate(message, body) {
const me = this;
const { const {
roomId, roomId,
clientId, clientId,
rtcpMux, rtcpMux,
comedia, comedia,
enableSctp, numSctpStreams, enableSctp,
enableSrtp, srtpCryptoSuite numSctpStreams,
enableSrtp,
srtpCryptoSuite
} = body; } = body;
const plainTransportOptions = { const plainTransportOptions = {
...config.mediasoup.plainTransportOptions, ...config.mediasoup.plainTransportOptions,
@@ -1647,10 +1653,14 @@ class Taoyao {
enableSrtp : enableSrtp, enableSrtp : enableSrtp,
srtpCryptoSuite : srtpCryptoSuite, srtpCryptoSuite : srtpCryptoSuite,
}; };
const room = me.rooms.get(roomId); const room = this.rooms.get(roomId);
if(!room) {
console.warn("创建RTP输入通道无效房间", roomId);
return;
}
const transport = await room?.mediasoupRouter.createPlainTransport(plainTransportOptions); const transport = await room?.mediasoupRouter.createPlainTransport(plainTransportOptions);
console.info("创建RTP输入通道", transport.id); console.debug("创建RTP输入通道", transport.id);
me.transportEvent("plain", roomId, transport); this.transportEvent("plain", roomId, transport);
transport.clientId = clientId; transport.clientId = clientId;
room.transports.set(transport.id, transport); room.transports.set(transport.id, transport);
message.body = { message.body = {
@@ -1660,39 +1670,38 @@ class Taoyao {
port : transport.tuple.localPort, port : transport.tuple.localPort,
rtcpPort : transport.rtcpTuple?.localPort, rtcpPort : transport.rtcpTuple?.localPort,
}; };
me.push(message); this.push(message);
} }
/** /**
* 查询通道状态信令 * 查询通道状态信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
async mediaTransportStatus(message, body) { async mediaTransportStatus(message, body) {
const me = this;
const { const {
roomId, roomId,
transportId, transportId,
} = body; } = body;
const room = me.rooms.get(roomId); const room = this.rooms.get(roomId);
const transport = room?.transports.get(transportId); const transport = room?.transports.get(transportId);
if(transport) { if(!transport) {
console.debug("查询通道状态", transportId); console.warn("查询通道状态(无效通道)", roomId, transportId);
message.body = { return;
...body,
status: await transport.getStats()
};
me.push(message);
} else {
console.debug("查询通道状态(无效)", transportId);
} }
console.debug("查询通道状态", transportId);
message.body = {
...body,
status: await transport.getStats()
};
this.push(message);
} }
/** /**
* 连接WebRTC通道信令 * 连接WebRTC通道信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
async mediaTransportWebrtcConnect(message, body) { async mediaTransportWebrtcConnect(message, body) {
@@ -1701,29 +1710,30 @@ class Taoyao {
transportId, transportId,
dtlsParameters dtlsParameters
} = body; } = body;
const room = this.rooms.get(roomId); const room = this.rooms.get(roomId);
const transport = room?.transports.get(transportId); const transport = room?.transports.get(transportId);
if(transport) { if(!transport) {
await transport.connect({ dtlsParameters }); console.warn("连接WebRTC通道无效通道", roomId, transportId);
console.info("连接WebRTC通道", transportId); return;
message.body = {
roomId : roomId,
transportId: transport.id
};
this.push(message);
} else {
console.warn("连接WebRTC通道无效", transportId);
} }
await transport.connect({
dtlsParameters
});
console.debug("连接WebRTC通道", transportId);
message.body = {
roomId : roomId,
transportId: transport.id
};
this.push(message);
} }
/** /**
* 创建WebRTC通道信令 * 创建WebRTC通道信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
async mediaTransportWebrtcCreate(message, body) { async mediaTransportWebrtcCreate(message, body) {
const me = this;
const { const {
roomId, roomId,
clientId, clientId,
@@ -1734,7 +1744,10 @@ class Taoyao {
} = body; } = body;
const webRtcTransportOptions = { const webRtcTransportOptions = {
...config.mediasoup.webRtcTransportOptions, ...config.mediasoup.webRtcTransportOptions,
appData : { producing, consuming }, appData: {
producing,
consuming
},
enableSctp : Boolean(sctpCapabilities), enableSctp : Boolean(sctpCapabilities),
numSctpStreams: (sctpCapabilities || {}).numStreams, numSctpStreams: (sctpCapabilities || {}).numStreams,
}; };
@@ -1742,13 +1755,17 @@ class Taoyao {
webRtcTransportOptions.enableUdp = false; webRtcTransportOptions.enableUdp = false;
webRtcTransportOptions.enableTcp = true; webRtcTransportOptions.enableTcp = true;
} }
const room = me.rooms.get(roomId); const room = this.rooms.get(roomId);
if(!room) {
console.warn("创建WebRTC通道无效房间", roomId);
return;
}
const transport = await room.mediasoupRouter.createWebRtcTransport({ const transport = await room.mediasoupRouter.createWebRtcTransport({
...webRtcTransportOptions, ...webRtcTransportOptions,
webRtcServer: room.webRtcServer, webRtcServer: room.webRtcServer,
}); });
console.info("创建WebRTC通道", transport.id); console.debug("创建WebRTC通道", transport.id);
me.transportEvent("webrtc", roomId, transport); this.transportEvent("webrtc", roomId, transport);
transport.clientId = clientId; transport.clientId = clientId;
room.transports.set(transport.id, transport); room.transports.set(transport.id, transport);
message.body = { message.body = {
@@ -1759,7 +1776,7 @@ class Taoyao {
dtlsParameters: transport.dtlsParameters, dtlsParameters: transport.dtlsParameters,
sctpParameters: transport.sctpParameters, sctpParameters: transport.sctpParameters,
}; };
me.push(message); this.push(message);
const { const {
maxOutgoingBitrate, maxOutgoingBitrate,
maxIncomingBitrate, maxIncomingBitrate,
@@ -1775,83 +1792,82 @@ class Taoyao {
/** /**
* 通道事件 * 通道事件
* *
* @param {*} type 类型:webrtc|plain|pipe|direct * @param {*} type 类型:pipe|plain|direct|webrtc
* @param {*} roomId 房间ID * @param {*} roomId 房间ID
* @param {*} transport 通道 * @param {*} transport 通道
*/ */
transportEvent(type, roomId, transport) { transportEvent(type, roomId, transport) {
const me = this; const room = this.rooms.get(roomId);
const room = me.rooms.get(roomId); const transportId = transport.id;
/********************* 通用通道事件 *********************/ /********************* 通用通道事件 *********************/
transport.on("routerclose", () => { transport.on("routerclose", () => {
console.info("通道关闭(路由关闭)", transport.id); console.debug("通道关闭(路由关闭)", roomId, transportId);
transport.close(); transport.close();
}); });
transport.on("listenserverclose", () => { transport.on("listenserverclose", () => {
console.info("通道关闭(监听服务关闭)", transport.id); console.debug("通道关闭(监听服务关闭)", roomId, transportId);
transport.close(); transport.close();
}); });
transport.observer.on("close", () => { transport.observer.on("close", () => {
if(room.transports.delete(transport.id)) { if(room.transports.delete(transportId)) {
console.info("通道关闭", transport.id); console.debug("通道关闭", roomId, transportId);
me.push( this.push(protocol.buildMessage("media::transport::close", {
protocol.buildMessage("media::transport::close", { roomId,
roomId : roomId, transportId,
transportId: transport.id, }));
})
);
} else { } else {
console.info("通道关闭(无效)", transport.id); console.info("通道关闭(无效通道", roomId, transportId);
} }
}); });
// transport.observer.on("newproducer", (producer) => {}); // transport.observer.on("newproducer", (producer) => {});
// transport.observer.on("newconsumer", (consumer) => {}); // transport.observer.on("newconsumer", (consumer) => {});
// transport.observer.on("newdataproducer", (dataProducer) => {}); // transport.observer.on("newdataproducer", (dataProducer) => {});
// transport.observer.on("newdataconsumer", (dataConsumer) => {}); // transport.observer.on("newdataconsumer", (dataConsumer) => {});
// transport.observer.on("trace", fn(trace)); // 设置追踪信息
// await transport.enableTraceEvent([ 'bwe', 'probation' ]); // await transport.enableTraceEvent([ 'bwe', 'probation' ]);
// transport.on("trace", (trace) => { // transport.on("trace", (trace) => {});
// console.debug("通道跟踪事件(trace", transport.id, trace); // transport.observer.on("trace", fn(trace));
// }); /********************* pipeTransport通道事件 *********************/
/********************* webRtcTransport通道事件 *********************/ if("pipe" === type) {
if("webrtc" === type) { // transport.on("sctpstatechange", fn(sctpState));
// transport.on("icestatechange", (iceState) => {});
// transport.on("iceselectedtuplechange", (iceSelectedTuple) => {});
// transport.on("dtlsstatechange", (dtlsState) => {});
// transport.on("sctpstatechange", (sctpState) => {});
// transport.observer.on("icestatechange", fn(iceState));
// transport.observer.on("iceselectedtuplechange", fn(iceSelectedTuple));
// transport.observer.on("dtlsstatechange", fn(dtlsState));
// transport.observer.on("sctpstatechange", fn(sctpState)); // transport.observer.on("sctpstatechange", fn(sctpState));
} }
/********************* plainTransport通道事件 *********************/ /********************* plainTransport通道事件 *********************/
if("plain" === type) { if("plain" === type) {
// transport.on("tuple", fn(tuple)); // transport.on("tuple", fn(tuple));
// transport.on("rtcptuple", fn(rtcpTuple)); // transport.on("rtcptuple", fn(rtcpTuple));
// transport.on("sctpstatechange", fn(sctpState)); // transport.on("sctpstatechange", fn(sctpState));
// transport.observer.on("tuple", fn(tuple)); // transport.observer.on("tuple", fn(tuple));
// transport.observer.on("rtcptuple", fn(rtcpTuple)); // transport.observer.on("rtcptuple", fn(rtcpTuple));
// transport.observer.on("sctpstatechange", fn(sctpState));
}
/********************* pipeTransport通道事件 *********************/
if("pipe" === type) {
// transport.on("sctpstatechange", fn(sctpState));
// transport.observer.on("sctpstatechange", fn(sctpState)); // transport.observer.on("sctpstatechange", fn(sctpState));
} }
/********************* directTransport通道事件 *********************/ /********************* directTransport通道事件 *********************/
if("rtcp" === type) { if("direct" === type) {
// transport.on("rtcp", fn(rtcpPacket)); // transport.on("rtcp", fn(rtcpPacket));
} }
/********************* webRtcTransport通道事件 *********************/
if("webrtc" === type) {
// transport.on("icestatechange", (iceState) => {});
// transport.on("iceselectedtuplechange", (iceSelectedTuple) => {});
// transport.on("dtlsstatechange", (dtlsState) => {});
// transport.on("sctpstatechange", (sctpState) => {});
// transport.observer.on("icestatechange", fn(iceState));
// transport.observer.on("iceselectedtuplechange", fn(iceSelectedTuple));
// transport.observer.on("dtlsstatechange", fn(dtlsState));
// transport.observer.on("sctpstatechange", fn(sctpState));
}
} }
/** /**
* 平台异常信令 * 平台异常信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
platformError(message, body) { platformError(message, body) {
const { code } = message; const {
code
} = message;
if(code === "3401") { if(code === "3401") {
signalChannel.close(); signalChannel.close();
console.warn("授权异常(关闭信令)", message); console.warn("授权异常(关闭信令)", message);
@@ -1863,15 +1879,16 @@ class Taoyao {
/** /**
* 关闭房间信令 * 关闭房间信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
async roomClose(message, body) { async roomClose(message, body) {
const me = this; const {
const { roomId } = body; roomId
const room = me.rooms.get(roomId); } = body;
const room = this.rooms.get(roomId);
if(!room) { if(!room) {
console.debug("关闭房间(无效)", roomId); console.info("关闭房间(无效房间", roomId);
return; return;
} }
room.closeAll(); room.closeAll();
@@ -1880,21 +1897,25 @@ class Taoyao {
/** /**
* 创建房间信令 * 创建房间信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
async roomCreate(message, body) { async roomCreate(message, body) {
const me = this; const {
const { roomId } = body; roomId
let room = me.rooms.get(roomId); } = body;
if (room) { if (this.rooms.has(roomId)) {
console.debug("创建房间已经存在", room); console.warn("创建房间已经存在", roomId);
me.push(message); this.push(message);
return; return;
} }
const mediasoupWorker = me.nextMediasoupWorker(); const {
const { mediaCodecs } = config.mediasoup.routerOptions; mediaCodecs
const mediasoupRouter = await mediasoupWorker.createRouter({ mediaCodecs }); } = config.mediasoup.routerOptions;
const mediasoupWorker = this.nextMediasoupWorker();
const mediasoupRouter = await mediasoupWorker.createRouter({
mediaCodecs
});
// 音量监控 // 音量监控
const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({ const audioLevelObserver = await mediasoupRouter.createAudioLevelObserver({
// 监控周期 // 监控周期
@@ -1908,35 +1929,33 @@ class Taoyao {
const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver({ const activeSpeakerObserver = await mediasoupRouter.createActiveSpeakerObserver({
interval: 500, interval: 500,
}); });
room = new Room({ const room = new Room({
roomId, roomId,
taoyao: me,
webRtcServer: mediasoupWorker.appData.webRtcServer,
mediasoupRouter, mediasoupRouter,
audioLevelObserver, audioLevelObserver,
activeSpeakerObserver, activeSpeakerObserver,
taoyao : this,
webRtcServer: mediasoupWorker.appData.webRtcServer,
}); });
console.info("创建房间", roomId, mediasoupRouter.id); console.debug("创建房间", roomId, mediasoupRouter.id);
me.rooms.set(roomId, room); this.rooms.set(roomId, room);
me.push(message); this.push(message);
mediasoupRouter.on("workerclose", () => { mediasoupRouter.on("workerclose", () => {
console.info("路由关闭(工作线程关闭)", roomId, mediasoupRouter.id); console.debug("路由关闭(工作线程关闭)", roomId, mediasoupRouter.id);
mediasoupRouter.close(); mediasoupRouter.close();
}); });
mediasoupRouter.observer.on("close", () => { mediasoupRouter.observer.on("close", () => {
if(me.rooms.delete(roomId)) { if(this.rooms.delete(roomId)) {
console.info("路由关闭", roomId, mediasoupRouter.id); console.debug("路由关闭", roomId, mediasoupRouter.id);
room.closeAll(); room.closeAll();
me.push( this.push(protocol.buildMessage("room::close", {
protocol.buildMessage("room::close", { roomId
roomId: roomId }));
})
);
} else { } else {
console.info("路由关闭(无效)", roomId, mediasoupRouter.id); console.info("路由关闭(无效房间", roomId, mediasoupRouter.id);
} }
}); });
// mediasoupRouter.observer.on("newtransport", (transport) => {}); // mediasoupRouter.observer.on("newtransport", (transport) => {});
// mediasoupRouter.observer.on("newrtpobserver", (rtpObserver) => {}); // mediasoupRouter.observer.on("newrtpobserver", (rtpObserver) => {});
} }
}; };

View File

@@ -1905,19 +1905,6 @@ class Taoyao extends RemoteClient {
console.debug("生产者评分", message); console.debug("生产者评分", message);
} }
/**
* 查询生产者状态信令
*
* @param {*} producerId 生产者ID
*/
async mediaProducerStatus(producerId) {
const me = this;
return await me.request(protocol.buildMessage('media::producer::status', {
roomId: me.roomId,
producerId
}));
}
/** /**
* 消费媒体信令 * 消费媒体信令
* *
@@ -2481,6 +2468,18 @@ class Taoyao extends RemoteClient {
} }
} }
/**
* 查询生产者状态信令
*
* @param {*} producerId 生产者ID
*/
async mediaProducerStatus(producerId) {
return await this.request(protocol.buildMessage("media::producer::status", {
producerId,
roomId: this.roomId,
}));
}
/** /**
* 关闭通道信令 * 关闭通道信令
* *

View File

@@ -22,6 +22,13 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
"roomId" : "房间ID", "roomId" : "房间ID",
"producerId": "生产者ID" "producerId": "生产者ID"
} }
{
"roomId" : "房间ID",
"producerId": "生产者ID",
"status" : [
...状态信息
]
}
""", """,
flow = "终端=>信令服务->媒体服务" flow = "终端=>信令服务->媒体服务"
) )