This commit is contained in:
acgist
2023-02-25 16:01:02 +08:00
parent 523913c2d3
commit 3990d8f2d3
3 changed files with 145 additions and 25 deletions

View File

@@ -573,7 +573,7 @@ class Signal {
}
async mediaConsume(message, body) {
const { roomId, producerId, transportId, rtpCapabilities } = body;
const { roomId, clientId, streamId, producerId, transportId, rtpCapabilities } = body;
const room = this.rooms.get(roomId);
const producer = room.producers.get(producerId);
const transport = room.transports.get(transportId);
@@ -590,6 +590,7 @@ class Signal {
console.warn(
"不能消费媒体:",
roomId,
clientId,
producerId,
transportId,
rtpCapabilities
@@ -613,6 +614,7 @@ class Signal {
console.error(
"创建消费者异常:",
roomId,
clientId,
producerId,
transportId,
rtpCapabilities,
@@ -620,6 +622,8 @@ class Signal {
);
return;
}
consumer.clientId = clientId;
consumer.streamId = streamId;
room.consumers.set(consumer.id, consumer);
consumer.on("transportclose", () => {
room.consumers.delete(consumer.id);
@@ -677,6 +681,8 @@ class Signal {
kind: consumer.kind,
type: consumer.type,
roomId: roomId,
clientId: clientId,
streamId: streamId,
producerId: producerId,
consumerId: consumer.id,
rtpParameters: consumer.rtpParameters,

View File

@@ -49,21 +49,22 @@ export default {
*
* @return 是否继续执行
*/
callback(data, error) {
async callback(data, error) {
let self = this;
if(data.header.signal === "platform::error") {
console.error("发生异常:", data, error);
return false;
}
switch (data.header.signal) {
case "client::config":
self.roomVisible = true;
break;
case "client::register":
if(data.code === '3401') {
self.signalVisible = true;
}
self.signalVisible = data.code !== '0000';
return true;
case "platform::error":
if(error) {
console.error("发生异常:", data, error);
} else {
console.warn("发生错误:", data);
}
break;
}
return false;
},

View File

@@ -128,10 +128,11 @@ const signalChannel = {
/**
* 回调策略:
* 1. 如果注册请求回调同时执行结果返回true不再执行后面所有回调。
* 2. 如果注册全局回调同时执行结果返回true不再执行后面所有回调
* 3. 如果前面所有回调没有返回true执行默认回调。
* 2. 执行前置回调
* 3. 如果注册全局回调同时执行结果返回true不再执行后面所有回调。
* 3. 执行后置回调
*/
self.channel.onmessage = function (e) {
self.channel.onmessage = async function (e) {
console.debug("信令通道消息:", e.data);
let done = false;
const message = JSON.parse(e.data);
@@ -143,13 +144,17 @@ const signalChannel = {
self.callbackMapping.delete(message.header.id);
}
}
// 前置回调
if (!done) {
await self.preCallback(message);
}
// 全局回调
if (!done && self.callback) {
done = self.callback(message);
done = await self.callback(message);
}
// 默认回调
// 后置回调
if (!done) {
self.defaultCallback(message);
await self.postCallback(message);
}
};
});
@@ -243,26 +248,38 @@ const signalChannel = {
clearTimeout(self.reconnectTimer);
},
/**
* 默认回调
* 前置回调
*
* @param {*} message 消息
* @param {*} message
*/
defaultCallback(message) {
let self = this;
console.debug("没有适配信令消息执行默认处理", message);
async preCallback(message) {
const self = this;
switch (message.header.signal) {
case "client::config":
self.defaultClientConfig(message);
break;
case "client::register":
console.info("终端注册成功");
break;
case "media::consume":
await self.taoyao.consumeMedia(message);
break;
}
},
/**
* 后置回调
*
* @param {*} message 消息
*/
async postCallback(message) {
const self = this;
switch (message.header.signal) {
case "client::reboot":
self.defaultClientReboot(message);
break;
case "client::shutdown":
self.defaultClientShutdown(message);
break;
case "client::register":
console.info("终端注册成功");
break;
case "platform::error":
self.callbackError(message);
break;
@@ -426,7 +443,11 @@ class Taoyao {
callbackError(message, error) {
const self = this;
if (!self.callback) {
console.warn("没有注册回调:", message, error);
if (error) {
console.error("没有注册回调:", message, error);
} else {
console.warn("没有注册回调:", message);
}
}
// 错误回调
self.callback(
@@ -641,7 +662,8 @@ class Taoyao {
self
.request(
protocol.buildMessage("media::transport::webrtc::connect", {
transportId: this.recvTransport.id,
roomId: self.roomId,
transportId: self.recvTransport.id,
dtlsParameters,
})
)
@@ -757,6 +779,97 @@ class Taoyao {
* TODO重复点击
*/
async produceVideo() {}
/**
* 消费媒体
*
* @param {*} message
* @returns
*/
async consumeMedia(message) {
const self = this;
if (!self.consume) {
console.log("没有消费媒体");
return;
}
const {
kind,
type,
roomId,
clientId,
streamId,
producerId,
consumerId,
rtpParameters,
appData,
producerPaused,
} = message.body;
try {
const consumer = await self.recvTransport.consume({
id: consumerId,
kind,
producerId,
rtpParameters,
// NOTE: Force streamId to be same in mic and webcam and different
// in screen sharing so libwebrtc will just try to sync mic and
// webcam streams from the same remote peer.
//streamId: `${peerId}-${appData.share ? "share" : "mic-webcam"}`,
streamId: `${clientId}-${appData.share ? "share" : "mic-webcam"}`,
appData, // Trick.
});
consumer.clientId = clientId;
consumer.streamId = streamId;
self.consumers.set(consumer.id, consumer);
consumer.on("transportclose", () => {
self.consumers.delete(consumer.id);
});
const { spatialLayers, temporalLayers } =
mediasoupClient.parseScalabilityMode(
consumer.rtpParameters.encodings[0].scalabilityMode
);
// store.dispatch(
// stateActions.addConsumer(
// {
// id: consumer.id,
// type: type,
// locallyPaused: false,
// remotelyPaused: producerPaused,
// rtpParameters: consumer.rtpParameters,
// spatialLayers: spatialLayers,
// temporalLayers: temporalLayers,
// preferredSpatialLayer: spatialLayers - 1,
// preferredTemporalLayer: temporalLayers - 1,
// priority: 1,
// codec: consumer.rtpParameters.codecs[0].mimeType.split("/")[1],
// track: consumer.track,
// },
// peerId
// )
// );
self.push(message);
console.log(consumer)
const audioElem = document.createElement('video');
document.getElementsByTagName('body')[0].appendChild(audioElem)
const stream = new MediaStream();
stream.addTrack(consumer.track);
audioElem.srcObject = stream;
audioElem.play()
.catch((error) => logger.warn('audioElem.play() failed:%o', error));
// If audio-only mode is enabled, pause it.
if (consumer.kind === "video" && !self.videoProduce) {
// this.pauseConsumer(consumer);
// TODO实现
}
} catch (error) {
self.callbackError("消费媒体异常", error);
}
}
/**
* 验证设备
*/