[*] 日常优化

This commit is contained in:
acgist
2023-09-21 07:55:34 +08:00
parent c76d32cc58
commit 107a9e301d
4 changed files with 36 additions and 41 deletions

View File

@@ -1129,13 +1129,10 @@ class Taoyao {
/** /**
* 消费数据信令 * 消费数据信令
* *
* TODO重复 * @param {*} message 信令消息
*
* @param {*} message 消息
* @param {*} body 消息主体 * @param {*} body 消息主体
*/ */
async mediaDataConsume(message, body) { async mediaDataConsume(message, body) {
const me = this;
const { const {
roomId, roomId,
clientId, clientId,
@@ -1145,7 +1142,7 @@ class Taoyao {
transportId, transportId,
rtpCapabilities, rtpCapabilities,
} = body; } = body;
const room = me.rooms.get(roomId); const room = this.rooms.get(roomId);
const transport = room?.transports.get(transportId); const transport = room?.transports.get(transportId);
const dataProducer = room?.dataProducers.get(producerId); const dataProducer = room?.dataProducers.get(producerId);
if ( if (
@@ -1169,12 +1166,12 @@ class Taoyao {
dataConsumer.streamId = streamId; dataConsumer.streamId = streamId;
room.dataConsumers.set(dataConsumer.id, dataConsumer); room.dataConsumers.set(dataConsumer.id, dataConsumer);
console.debug("创建数据消费者", dataProducer.id, streamId); console.debug("创建数据消费者", dataProducer.id, streamId);
dataConsumer.on('transportclose', () => { dataConsumer.on("transportclose", () => {
console.info("数据消费者关闭(通道关闭)", dataConsumer.id, streamId); console.debug("数据消费者关闭(通道关闭)", dataConsumer.id, streamId);
dataConsumer.close(); dataConsumer.close();
}); });
dataConsumer.on('dataproducerclose', () => { dataConsumer.on("dataproducerclose", () => {
console.info("数据消费者关闭(生产者关闭)", dataConsumer.id, streamId); console.debug("数据消费者关闭(生产者关闭)", dataConsumer.id, streamId);
dataConsumer.close(); dataConsumer.close();
}); });
// dataConsumer.on("bufferedamountlow", fn(bufferedAmount)); // dataConsumer.on("bufferedamountlow", fn(bufferedAmount));
@@ -1182,7 +1179,7 @@ class Taoyao {
dataConsumer.observer.on("close", () => { dataConsumer.observer.on("close", () => {
if(room.dataConsumers.delete(dataConsumer.id)) { if(room.dataConsumers.delete(dataConsumer.id)) {
console.debug("数据消费者关闭", dataConsumer.id, streamId); console.debug("数据消费者关闭", dataConsumer.id, streamId);
me.push(protocol.buildMessage("media::data::consumer::close", { this.push(protocol.buildMessage("media::data::consumer::close", {
roomId, roomId,
consumerId: dataConsumer.id, consumerId: dataConsumer.id,
})); }));
@@ -1190,20 +1187,18 @@ class Taoyao {
console.debug("数据消费者关闭(数据消费者无效)", dataConsumer.id, streamId); console.debug("数据消费者关闭(数据消费者无效)", dataConsumer.id, streamId);
} }
}); });
me.push( this.push(protocol.buildMessage("media::data::consume", {
protocol.buildMessage("media::data::consume", { roomId : roomId,
roomId : roomId, clientId : clientId,
clientId : clientId, sourceId : sourceId,
sourceId : sourceId, streamId : streamId,
streamId : streamId, producerId : producerId,
producerId : producerId, consumerId : dataConsumer.id,
consumerId : dataConsumer.id, label : dataConsumer.label,
label : dataConsumer.label, appData : dataProducer.appData,
appData : dataProducer.appData, protocol : dataConsumer.protocol,
protocol : dataConsumer.protocol, sctpStreamParameters: dataConsumer.sctpStreamParameters,
sctpStreamParameters: dataConsumer.sctpStreamParameters, }));
})
);
} }
/** /**

View File

@@ -1822,24 +1822,22 @@ class Taoyao extends RemoteClient {
* @param {*} producerId 数据生产者ID * @param {*} producerId 数据生产者ID
*/ */
mediaDataConsume(producerId) { mediaDataConsume(producerId) {
const me = this; if(!this.recvTransport) {
if(!me.recvTransport) { this.platformError("没有连接接收通道");
me.platformError("没有连接接收通道");
return; return;
} }
me.push(protocol.buildMessage("media::data::consume", { this.push(protocol.buildMessage("media::data::consume", {
roomId : me.roomId, producerId,
producerId: producerId, roomId: this.roomId,
})); }));
} }
/** /**
* 消费数据信令 * 消费数据信令
* *
* @param {*} message 消息 * @param {*} message 信令消息
*/ */
async defaultMediaDataConsume(message) { async defaultMediaDataConsume(message) {
const me = this;
const { const {
roomId, roomId,
clientId, clientId,
@@ -1853,7 +1851,7 @@ class Taoyao extends RemoteClient {
sctpStreamParameters, sctpStreamParameters,
} = message.body; } = message.body;
try { try {
const dataConsumer = await me.recvTransport.consumeData({ const dataConsumer = await this.recvTransport.consumeData({
id : consumerId, id : consumerId,
dataProducerId: producerId, dataProducerId: producerId,
label, label,
@@ -1861,9 +1859,9 @@ class Taoyao extends RemoteClient {
protocol, protocol,
sctpStreamParameters, sctpStreamParameters,
}); });
me.dataConsumers.set(dataConsumer.id, dataConsumer); this.dataConsumers.set(dataConsumer.id, dataConsumer);
dataConsumer.on("open", () => { dataConsumer.on("open", () => {
console.debug("数据消费者打开", dataConsumer.id); console.debug("数据消费者打开", dataConsumer.id, streamId);
}); });
dataConsumer.on("transportclose", () => { dataConsumer.on("transportclose", () => {
console.debug("数据消费者关闭(通道关闭)", dataConsumer.id, streamId); console.debug("数据消费者关闭(通道关闭)", dataConsumer.id, streamId);
@@ -1871,10 +1869,10 @@ class Taoyao extends RemoteClient {
}); });
// dataConsumer.observer.on("close", fn()) // dataConsumer.observer.on("close", fn())
dataConsumer.on("close", () => { dataConsumer.on("close", () => {
if(me.dataConsumers.delete(dataConsumer.id)) { if(this.dataConsumers.delete(dataConsumer.id)) {
console.debug("数据消费者关闭", dataConsumer.id, streamId); console.debug("数据消费者关闭", dataConsumer.id, streamId);
} else { } else {
console.debug("数据消费者关闭(无效)", dataConsumer.id, streamId); console.debug("数据消费者关闭(数据消费者无效)", dataConsumer.id, streamId);
} }
}); });
dataConsumer.on("error", (error) => { dataConsumer.on("error", (error) => {
@@ -1884,7 +1882,7 @@ class Taoyao extends RemoteClient {
console.debug("数据消费者消息", dataConsumer.id, streamId, message.toString("UTF-8"), ppid); console.debug("数据消费者消息", dataConsumer.id, streamId, message.toString("UTF-8"), ppid);
}); });
} catch (error) { } catch (error) {
console.error("打开数据消费者异常", error); me.platformError("消费数据异常", error);
} }
} }

View File

@@ -91,6 +91,9 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final String producerId = MapUtils.get(body, Constant.PRODUCER_ID);
final Producer producer = room.producer(producerId); final Producer producer = room.producer(producerId);
if(producer == null) {
throw MessageCodeException.of("媒体生产者无效:" + producerId);
}
if(clientType.isClient()) { if(clientType.isClient()) {
// 主动请求消费 || 消费通道准备就绪 // 主动请求消费 || 消费通道准备就绪
this.consume(room, room.clientWrapper(client), producer, message); this.consume(room, room.clientWrapper(client), producer, message);

View File

@@ -22,8 +22,6 @@ import lombok.extern.slf4j.Slf4j;
/** /**
* 消费数据信令 * 消费数据信令
* *
* TODO防止重复消费
*
* @author acgist * @author acgist
*/ */
@Slf4j @Slf4j
@@ -55,7 +53,7 @@ public class MediaDataConsumeProtocol extends ProtocolRoomAdapter {
final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final String producerId = MapUtils.get(body, Constant.PRODUCER_ID);
final DataProducer dataProducer = room.dataProducer(producerId); final DataProducer dataProducer = room.dataProducer(producerId);
if(dataProducer == null) { if(dataProducer == null) {
throw MessageCodeException.of("没有提供数据生产:" + producerId); throw MessageCodeException.of("数据生产者无效" + producerId);
} }
if(clientType.isClient()) { if(clientType.isClient()) {
final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(client); final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(client);
@@ -73,6 +71,7 @@ public class MediaDataConsumeProtocol extends ProtocolRoomAdapter {
body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities()); body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities());
body.put(Constant.SCTP_CAPABILITIES, dataConsumerClientWrapper.getSctpCapabilities()); body.put(Constant.SCTP_CAPABILITIES, dataConsumerClientWrapper.getSctpCapabilities());
mediaClient.push(message); mediaClient.push(message);
log.info("{}主动消费数据:{} - {}", dataConsumerClientId, dataProducerClientId, streamId);
} else if(clientType.isMedia()) { } else if(clientType.isMedia()) {
final String streamId = MapUtils.get(body, Constant.STREAM_ID); final String streamId = MapUtils.get(body, Constant.STREAM_ID);
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);