[*] 日常优化

This commit is contained in:
acgist
2023-08-03 08:17:14 +08:00
parent 2a1acabcba
commit 49805077ec
7 changed files with 70 additions and 26 deletions

View File

@@ -905,6 +905,7 @@ class Taoyao {
// })
// );
if (producer.kind === "audio") {
// TODO关闭生产者时移除监听
room.audioLevelObserver
.addProducer({ producerId: producer.id })
.catch((error) => {

View File

@@ -1439,7 +1439,10 @@ class Taoyao extends RemoteClient {
*/
defaultMediaConsumerClose(message) {
const me = this;
const { roomId, consumerId } = message.body;
const {
roomId,
consumerId
} = message.body;
const consumer = me.consumers.get(consumerId);
if (consumer) {
console.debug("关闭消费者", consumerId);
@@ -1455,7 +1458,7 @@ class Taoyao extends RemoteClient {
*
* @param {*} consumerId 消费者ID
*/
mediaConsumerPause(consumerId) {
async mediaConsumerPause(consumerId) {
const me = this;
const consumer = me.consumers.get(consumerId);
if(consumer) {
@@ -1463,7 +1466,7 @@ class Taoyao extends RemoteClient {
return;
}
console.debug("暂停消费者", consumerId);
me.push(protocol.buildMessage("media::consumer::pause", {
await me.request(protocol.buildMessage("media::consumer::pause", {
roomId : me.roomId,
consumerId: consumerId,
}));
@@ -1479,7 +1482,10 @@ class Taoyao extends RemoteClient {
*/
defaultMediaConsumerPause(message) {
const me = this;
const { roomId, consumerId } = message.body;
const {
roomId,
consumerId
} = message.body;
const consumer = me.consumers.get(consumerId);
if (consumer) {
console.debug("暂停消费者", consumerId);
@@ -1516,7 +1522,7 @@ class Taoyao extends RemoteClient {
*
* @param {*} consumerId 消费者ID
*/
mediaConsumerResume(consumerId) {
async mediaConsumerResume(consumerId) {
const me = this;
const consumer = me.consumers.get(consumerId);
if(consumer) {
@@ -1524,7 +1530,7 @@ class Taoyao extends RemoteClient {
return;
}
console.debug("恢复消费者", consumerId);
me.push(protocol.buildMessage("media::consumer::resume", {
await me.request(protocol.buildMessage("media::consumer::resume", {
roomId : me.roomId,
consumerId: consumerId,
}));
@@ -1540,7 +1546,10 @@ class Taoyao extends RemoteClient {
*/
defaultMediaConsumerResume(message) {
const me = this;
const { roomId, consumerId } = message.body;
const {
roomId,
consumerId
} = message.body;
const consumer = me.consumers.get(consumerId);
if (consumer) {
console.debug("恢复消费者", consumerId);

View File

@@ -3,6 +3,7 @@ package com.acgist.taoyao.signal.party.room;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.party.media.Consumer;
import com.acgist.taoyao.signal.party.media.DataConsumer;
@@ -118,6 +119,26 @@ public class ClientWrapper implements AutoCloseable {
.anyMatch(v -> v.getDataProducer() == dataProducer);
}
/**
* 推送消息
*
* @param message 消息
*/
public void push(Message message) {
this.client.push(message);
}
/**
* 请求消息
*
* @param message 请求
*
* @return 响应
*/
public Message request(Message message) {
return this.client.request(message);
}
@Override
public void close() {
// 注意:不要关闭终端(只是离开房间)

View File

@@ -27,15 +27,17 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@Protocol
@Description(
memo = "关闭通过回调实现所以不能同步响应",
body = """
{
"roomId": "房间ID"
"roomId" : "房间ID"
"consumerId": "消费者ID"
}
""",
flow = {
"媒体服务->信令服务-)终端",
"终端->信令服务->媒体服务->信令服务+)终端"
"媒体服务->信令服务->终端",
"信令服务->媒体服务->信令服务->终端",
"终端->信令服务->媒体服务->信令服务->终端"
}
)
public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaConsumerCloseEvent> {
@@ -69,9 +71,8 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A
if(clientType.mediaClient()) {
consumer.close();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
consumer.remove();
room.broadcast(message);
consumer.getConsumerClient().push(message);
} else {
this.logNoAdapter(clientType);
}

View File

@@ -17,20 +17,23 @@ import com.acgist.taoyao.signal.party.media.Consumer;
import com.acgist.taoyao.signal.party.room.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 暂停消费者信令
*
* @author acgist
*/
@Slf4j
@Protocol
@Description(
body = """
{
"roomId": "房间ID"
"roomId" : "房间ID"
"consumerId": "消费者ID"
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
flow = "终端=>信令服务->媒体服务"
)
public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaConsumerPauseEvent> {
@@ -54,13 +57,16 @@ public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter implements A
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final Consumer consumer = room.consumer(consumerId);
if(consumer == null) {
log.debug("消费者无效:{} - {}", consumerId, clientType);
return;
}
if(clientType.mediaClient()) {
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final Consumer consumer = room.consumer(consumerId);
consumer.pause();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
room.broadcast(message);
consumer.getConsumerClient().push(message);
} else {
this.logNoAdapter(clientType);
}

View File

@@ -21,11 +21,11 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
@Description(
body = """
{
"roomId": "房间ID",
"roomId" : "房间ID",
"consumerId": "消费者ID"
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
flow = "终端->信令服务->媒体服务"
)
public class MediaConsumerRequestKeyFrameProtocol extends ProtocolRoomAdapter {

View File

@@ -17,20 +17,23 @@ import com.acgist.taoyao.signal.party.media.Consumer;
import com.acgist.taoyao.signal.party.room.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 恢复消费者信令
*
* @author acgist
*/
@Slf4j
@Protocol
@Description(
body = """
{
"roomId": "房间ID"
"roomId" : "房间ID"
"consumerId": "消费者ID"
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
flow = "终端=>信令服务->媒体服务"
)
public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaConsumerResumeEvent> {
@@ -54,13 +57,16 @@ public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final Consumer consumer = room.consumer(consumerId);
if(consumer == null) {
log.debug("消费者无效:{} - {}", consumerId, clientType);
return;
}
if(clientType.mediaClient()) {
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final Consumer consumer = room.consumer(consumerId);
consumer.resume();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
room.broadcast(message);
consumer.getConsumerClient().push(message);
} else {
this.logNoAdapter(clientType);
}