[+] 重写媒体事件模型

This commit is contained in:
acgist
2023-03-10 08:14:19 +08:00
parent fe7bc4baf9
commit 63a37be492
32 changed files with 558 additions and 162 deletions

View File

@@ -39,16 +39,31 @@ async function buildMediasoupWorkers() {
const webRtcServer = await worker.createWebRtcServer(webRtcServerOptions);
worker.appData.webRtcServer = webRtcServer;
mediasoupWorkers.push(worker);
// 监听事件
worker.on("died", (error) => {
console.warn("Mediasoup Worker停止服务", worker.pid, error);
console.warn("worker died", worker.pid, error);
setTimeout(() => process.exit(1), 2000);
});
worker.observer.on("close", () => {
console.warn("Mediasoup Worker关闭服务", worker.pid);
console.info("worker close", worker.pid);
});
// worker.observer.on("newrouter", fn(router));
// worker.observer.on("newwebrtcserver", fn(router));
// worker.observer.on("newrouter", (router) => {
// console.info("worker newrouter", worker.pid, router.id);
// });
// worker.observer.on("newwebrtcserver", (webRtcServer) => {
// console.info("worker newwebrtcserver", worker.pid, webRtcServer.id);
// });
// webRtcServer.on("workerclose", () => {
// console.info("webRtcServer workerclose", worker.pid, webRtcServer.id);
// });
// webRtcServer.observer.on("close", () => {
// console.info("webRtcServer close", worker.pid, webRtcServer.id);
// });
// webRtcServer.observer.on("webrtctransporthandled", (webRtcTransport) => {
// console.info("webRtcServer webrtctransporthandled", worker.pid, webRtcServer.id, webRtcTransport.id);
// });
// webRtcServer.observer.on("webrtctransportunhandled", (webRtcTransport) => {
// console.info("webRtcServer webrtctransportunhandled", worker.pid, webRtcServer.id, webRtcTransport.id);
// });
}
}

View File

@@ -278,6 +278,7 @@ class Room {
})
);
});
// me.audioLevelObserver.observer.on("silence", fn());
// 音量
me.audioLevelObserver.on("volumes", (volumes) => {
const volumeArray = [];
@@ -292,6 +293,7 @@ class Room {
})
);
});
// me.audioLevelObserver.observer.on("volumes", fn(volumes));
}
/**
* 采样监控
@@ -305,6 +307,7 @@ class Room {
dominantSpeaker.producer.clientId
);
});
// me.activeSpeakerObserver.observer.on("dominantspeaker", fn(dominantSpeaker));
}
/**
* 使用情况
@@ -327,6 +330,7 @@ class Room {
return;
}
me.close = true;
// TODO测试是否需要这里释放
// me.producers.forEach(v => v.close());
// me.consumers.forEach(v => v.close());
// me.dataProducers.forEach(v => v.close());
@@ -560,7 +564,10 @@ class Taoyao {
producer.clientId = clientId;
producer.streamId = streamId;
room.producers.set(producer.id, producer);
// 打分
producer.on("transportclose", () => {
console.info("producer transportclose", producer.id);
producer.close();
});
producer.on("score", (score) => {
self.push(
protocol.buildMessage("media::producer::score", {
@@ -570,23 +577,34 @@ class Taoyao {
})
);
});
producer.on("videoorientationchange", (videoOrientation) => {
logger.debug(
'producer "videoorientationchange" event [producerId:%s, videoOrientation:%o]',
producer.id,
videoOrientation
);
console.info("producer videoorientationchange", producer.id, videoOrientation);
});
producer.on("trace", (trace) => {
logger.debug(
'producer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
producer.id,
trace.type,
trace
);
console.info("producer trace", producer.id, trace);
});
producer.observer.on("close", () => {
if(me.producers.delete(producer.id)) {
console.info("producer close", producer.id);
this.push(
protocol.buildMessage("media::producer::close", {
roomId: roomId,
producerId: producer.id
})
);
} else {
console.info("producer close non", producer.id);
}
});
producer.observer.on("pause", () => {
console.info("producer pause", producer.id);
});
producer.observer.on("resume", () => {
console.info("producer resume", producer.id);
});
// producer.observer.on("score", fn(score));
// producer.observer.on("videoorientationchange", fn(videoOrientation));
// producer.observer.on("trace", fn(trace));
message.body = { kind: kind, producerId: producer.id };
this.push(message);
if (producer.kind === "audio") {
@@ -661,18 +679,19 @@ class Taoyao {
consumer.streamId = streamId;
room.consumers.set(consumer.id, consumer);
consumer.on("transportclose", () => {
console.info("通道关闭同时关闭消费者", consumer.id);
console.info("consumer transportclose", consumer.id);
// 信令服务统一调度关闭
// consumer.close();
// room.consumers.delete(consumer.id);
});
consumer.on("producerclose", () => {
console.info("生产者关闭同时关闭消费者", consumer.id);
console.info("consumer producerclose", consumer.id);
// 信令服务统一调度关闭
// consumer.close();
// room.consumers.delete(consumer.id);
});
consumer.on("producerpause", () => {
console.info("consumer producerpause", consumer.id);
this.push(
protocol.buildMessage("media::consumer::pause", {
consumerId: consumer.id,
@@ -680,6 +699,7 @@ class Taoyao {
);
});
consumer.on("producerresume", () => {
console.info("consumer producerresume", consumer.id);
this.push(
protocol.buildMessage("media::consumer::resume", {
consumerId: consumer.id,
@@ -687,6 +707,7 @@ class Taoyao {
);
});
consumer.on("score", (score) => {
console.info("consumer score", consumer.id, score);
this.push(
protocol.buildMessage("media::consumer::score", {
score: score,
@@ -696,6 +717,7 @@ class Taoyao {
);
});
consumer.on("layerschange", (layers) => {
console.info("consumer layerschange", consumer.id, layers);
this.push(
protocol.buildMessage("media::consumer::layers::change", {
consumerId: consumer.id,
@@ -705,21 +727,43 @@ class Taoyao {
);
});
consumer.on("trace", (trace) => {
logger.debug(
'consumer "trace" event [producerId:%s, trace.type:%s, trace:%o]',
consumer.id,
trace.type,
trace
);
console.info("consumer trace", consumer.id, trace);
});
// consumer.on("rtp", (rtpPacket) => {
// console.info("consumer rtp", consumer.id, rtpPacket);
// });
consumer.observer.on("close", () => {
if(room.consumers.delete(consumer.id)) {
console.debug("consumer close", consumer.id);
this.push(
protocol.buildMessage("media::consumer::close", {
roomId: roomId,
consumerId: consumer.id
})
);
} else {
console.debug("consumer close non", consumer.id);
}
});
consumer.observer.on("pause", () => {
this.push(
protocol.buildMessage("media::consumer::close", {
protocol.buildMessage("media::consumer::pause", {
roomId: roomId,
consumerId: consumer.id
})
);
});
consumer.observer.on("resume", () => {
this.push(
protocol.buildMessage("media::consumer::resume", {
roomId: roomId,
consumerId: consumer.id
})
);
});
// consumer.observer.on("score", fn(score));
// consumer.observer.on("layerschange", fn(layers));
// consumer.observer.on("trace", fn(trace));
// 等待终端准备就绪
this.request(
protocol.buildMessage("media::consume", {
@@ -768,7 +812,6 @@ class Taoyao {
if(consumer) {
console.info("关闭消费者:", consumerId);
consumer.close();
room.consumers.delete(consumerId);
} else {
console.debug("关闭消费者无效:", consumerId);
}
@@ -834,41 +877,65 @@ class Taoyao {
webRtcServer: room.webRtcServer,
});
transport.clientId = clientId;
transport.on("icestatechange", (iceState) => {
console.debug(
"WebRtcTransport icestatechange event",
iceState,
transport.id
);
// 通用事件
transport.on("routerclose", () => {
console.info("transport routerclose", transport.id);
transport.close();
});
transport.on("dtlsstatechange", (dtlsState) => {
console.debug(
"WebRtcTransport dtlsstatechange event",
dtlsState,
transport.id
);
});
transport.on("sctpstatechange", (sctpState) => {
console.debug(
"WebRtcTransport sctpstatechange event",
sctpState,
transport.id
);
transport.on("listenserverclose", () => {
console.info("transport listenserverclose", transport.id);
transport.close();
});
await transport.enableTraceEvent(["bwe"]);
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
transport.on("trace", (trace) => {
console.debug("transport trace event", transport.id, trace.type, trace);
console.debug("transport trace", transport.id, trace);
});
transport.observer.on("close", () => {
console.info("transport close", transport.id);
});
transport.observer.on("newproducer", (producer) => {
console.info("transport newproducer", transport.id, producer.id);
});
transport.observer.on("newconsumer", (consumer) => {
console.info("transport newconsumer", transport.id, consumer.id);
});
transport.observer.on("newdataproducer", (dataProducer) => {
console.info("transport newdataproducer", transport.id, dataProducer.id);
});
transport.observer.on("newdataconsumer", (dataConsumer) => {
console.info("transport newdataconsumer", transport.id, dataProducer.id);
});
// 可配置的事件
// transport.on("routerclose", fn());
// transport.on("listenserverclose", fn());
// transport.observer.on("close", fn());
// transport.observer.on("newproducer", fn(producer));
// transport.observer.on("newconsumer", fn(consumer));
// transport.observer.on("newdataproducer", fn(dataProducer));
// transport.observer.on("newdataconsumer", fn(dataConsumer));
// transport.observer.on("trace", fn(trace));
/********************* webRtcTransport通道事件 *********************/
// transport.on("icestatechange", (iceState) => {
// console.info("transport icestatechange", transport.id, iceState);
// });
// transport.on("iceselectedtuplechange", (iceSelectedTuple) => {
// console.info("transport iceselectedtuplechange", transport.id, iceSelectedTuple);
// });
// transport.on("dtlsstatechange", (dtlsState) => {
// console.info("transport dtlsstatechange", transport.id, dtlsState);
// });
// transport.on("sctpstatechange", (sctpState) => {
// console.info("transport sctpstatechange", transport.id, sctpState);
// });
// transport.observer.on("icestatechange", fn(iceState));
// transport.observer.on("iceselectedtuplechange", fn(iceSelectedTuple));
// transport.observer.on("dtlsstatechange", fn(dtlsState));
// transport.observer.on("sctpstatechange", fn(sctpState));
/********************* plainTransport通道事件 *********************/
// transport.on("tuple", fn(tuple));
// transport.on("rtcptuple", fn(rtcpTuple));
// transport.on("sctpstatechange", fn(sctpState));
// transport.observer.on("tuple", fn(tuple));
// transport.observer.on("rtcptuple", fn(rtcpTuple));
// transport.observer.on("sctpstatechange", fn(sctpState));
/********************* pipeTransport通道事件 *********************/
// transport.on("sctpstatechange", fn(sctpState));
// transport.observer.on("sctpstatechange", fn(sctpState));
/********************* directTransport通道事件 *********************/
// transport.on("rtcp", fn(rtcpPacket));
room.transports.set(transport.id, transport);
message.body = {
transportId: transport.id,
@@ -913,7 +980,6 @@ class Taoyao {
}
console.info("关闭房间:", roomId);
room.closeAll();
this.rooms.delete(roomId);
}
/**
@@ -954,21 +1020,30 @@ class Taoyao {
activeSpeakerObserver,
});
me.rooms.set(roomId, room);
console.info("创建房间", roomId);
console.info("roomCreate", roomId, mediasoupRouter.id);
me.push(message);
// 监听事件
mediasoupRouter.observer.on("close", () => {
console.info("房间路由关闭:", roomId, mediasoupRouter);
mediasoupRouter.on("workerclose", () => {
console.info("mediasoupRouter workerclose", roomId, mediasoupRouter.id);
room.closeAll();
me.rooms.delete(roomId);
me.push(
protocol.buildMessage("room::close", {
roomId: roomId
})
);
});
// mediasoupRouter.on("workerclose", () => {});
// mediasoupRouter.observer.on("newtransport", fn(transport));
mediasoupRouter.observer.on("close", () => {
if(me.rooms.delete(roomId)) {
console.info("mediasoupRouter close", roomId, mediasoupRouter.id);
me.push(
protocol.buildMessage("room::close", {
roomId: roomId
})
);
} else {
console.info("mediasoupRouter close non", roomId, mediasoupRouter.id);
}
});
// mediasoupRouter.observer.on("newtransport", (transport) => {
// console.info("mediasoupRouter newtransport", roomId, mediasoupRouter.id, transport.id);
// });
// mediasoupRouter.observer.on("newrtpobserver", (rtpObserver) => {
// console.info("mediasoupRouter newrtpobserver", roomId, mediasoupRouter.id, rtpObserver.id);
// });
}
}