[*] 优化媒体消费逻辑

This commit is contained in:
acgist
2023-03-05 16:02:46 +08:00
parent 38b7db4dfe
commit a50fb071d7
13 changed files with 173 additions and 83 deletions

View File

@@ -1,5 +1,6 @@
package com.acgist.taoyao.signal.event;
import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.Producer;
import com.acgist.taoyao.signal.party.media.Room;
@@ -7,7 +8,7 @@ import lombok.Getter;
import lombok.Setter;
/**
* 媒体生产事件
* 生产媒体事件
*
* @author acgist
*/
@@ -21,10 +22,21 @@ public class MediaProduceEvent extends RoomEventAdapter {
* 生产者
*/
private final Producer producer;
/**
* 消费者
*/
private final ClientWrapper clientWrapper;
public MediaProduceEvent(Room room, Producer producer) {
super(room);
this.producer = producer;
this.clientWrapper = null;
}
public MediaProduceEvent(Room room, ClientWrapper clientWrapper) {
super(room);
this.producer = null;
this.clientWrapper = clientWrapper;
}
}

View File

@@ -7,13 +7,13 @@ import lombok.Getter;
import lombok.Setter;
/**
* 房间终端列表事件
* 进入房间事件
*
* @author acgist
*/
@Getter
@Setter
public class RoomClientListEvent extends RoomEventAdapter {
public class RoomEnterEvent extends RoomEventAdapter {
private static final long serialVersionUID = 1L;
@@ -22,7 +22,7 @@ public class RoomClientListEvent extends RoomEventAdapter {
*/
private final Client client;
public RoomClientListEvent(Room room, Client client) {
public RoomEnterEvent(Room room, Client client) {
super(room);
this.client = client;
}

View File

@@ -96,10 +96,18 @@ public class ClientWrapper implements AutoCloseable {
* 生产者
*/
private final Map<String, Producer> producers;
/**
* 消费者
*/
private final Map<String, Consumer> consumers;
/**
* 数据通道生产者
*/
private final Map<String, DataProducer> dataProducers;
/**
* 数据通道消费者
*/
private final Map<String, DataProducer> dataConsumers;
public ClientWrapper(Room room, Client client) {
this.room = room;
@@ -107,7 +115,9 @@ public class ClientWrapper implements AutoCloseable {
this.roomId = room.getRoomId();
this.clientId = client.clientId();
this.producers = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.dataProducers = new ConcurrentHashMap<>();
this.dataConsumers = new ConcurrentHashMap<>();
}
/**
@@ -128,18 +138,32 @@ public class ClientWrapper implements AutoCloseable {
}
/**
* 是否已经消费
* @param producer 生产者
*
* @param producer
* @return
* @return 是否已经消费
*/
public boolean consume(Producer producer) {
return this.producers.values().stream()
.anyMatch(v -> v.getConsumers().values().stream().anyMatch(c -> c.getProducer() == producer));
return this.consumers.values().stream()
.anyMatch(v -> v.getProducer() == producer);
}
/**
* 删除消费者
*
* @param wrapper 消费者终端保证期
*/
public void remove(ClientWrapper wrapper) {
this.consumers.entrySet().stream()
.filter(v -> v.getValue().getConsumeClient() == wrapper)
.map(Map.Entry::getKey)
.forEach(this.consumers::remove);
// TODO资源释放
this.producers.values().forEach(v -> v.remove(wrapper));
}
@Override
public void close() throws Exception {
// TODO释放资源
}
}

View File

@@ -44,4 +44,17 @@ public class Producer {
this.consumers = new ConcurrentHashMap<>();
}
/**
* 删除消费者
*
* @param consumer 消费者
*/
public void remove(ClientWrapper consumer) {
this.consumers.entrySet().stream()
.filter(v -> v.getValue().getConsumeClient() == consumer)
.map(Map.Entry::getKey)
.forEach(this.consumers::remove);
// TODO资源释放
}
}

View File

@@ -6,9 +6,12 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.context.ApplicationContext;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientStatus;
import com.acgist.taoyao.signal.event.RoomLeaveEvent;
import lombok.Getter;
import lombok.Setter;
@@ -37,11 +40,19 @@ public class Room implements Closeable {
/**
* 状态
*/
private RoomStatus roomStatus;
private final RoomStatus roomStatus;
/**
* 媒体服务
*/
private Client mediaClient;
/**
* 房间管理
*/
private final RoomManager roomManager;
/**
* 系统上下文
*/
private final ApplicationContext applicationContext;
/**
* 终端
*/
@@ -50,9 +61,11 @@ public class Room implements Closeable {
/**
* @param mediaClient 媒体服务
*/
public Room(Client mediaClient) {
public Room(Client mediaClient, RoomManager roomManager, ApplicationContext applicationContext) {
this.roomStatus = new RoomStatus();
this.mediaClient = mediaClient;
this.roomManager = roomManager;
this.applicationContext = applicationContext;
this.clients = new ConcurrentHashMap<>();
}
@@ -96,13 +109,18 @@ public class Room implements Closeable {
synchronized (this.clients) {
final ClientWrapper wrapper = this.clients.remove(client);
if(wrapper != null) {
this.roomStatus.setClientSize(this.roomStatus.getClientSize() - 1);
// 删除消费者
this.clients.values().stream()
.filter(v -> v != wrapper)
.forEach(v -> v.remove(wrapper));
// TODO删除数据消费者
this.applicationContext.publishEvent(new RoomLeaveEvent(this, client));
try {
wrapper.close();
} catch (Exception e) {
log.error("终端关闭异常", e);
}
this.roomStatus.setClientSize(this.roomStatus.getClientSize() - 1);
// TODOleave事件
}
}
}
@@ -198,6 +216,8 @@ public class Room implements Closeable {
public void close() {
log.info("关闭房间:{}", this.roomId);
// TODO关闭房间
// TODO:媒体服务
this.roomManager.remove(this);
}
}

View File

@@ -5,6 +5,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.context.ApplicationContext;
import com.acgist.taoyao.boot.annotation.Manager;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
@@ -26,15 +28,17 @@ public class RoomManager {
private final IdService idService;
private final ClientManager clientManager;
private final ApplicationContext applicationContext;
/**
* 房间列表
*/
private final List<Room> rooms;
public RoomManager(IdService idService, ClientManager clientManager) {
public RoomManager(IdService idService, ClientManager clientManager, ApplicationContext applicationContext) {
this.idService = idService;
this.clientManager = clientManager;
this.applicationContext = applicationContext;
this.rooms = new CopyOnWriteArrayList<>();
}
@@ -117,7 +121,7 @@ public class RoomManager {
}
final String roomId = this.idService.buildUuid();
// 房间
final Room room = new Room(mediaClient);
final Room room = new Room(mediaClient, this, this.applicationContext);
room.setRoomId(roomId);
room.setPassword(password);
// 状态
@@ -133,22 +137,6 @@ public class RoomManager {
this.rooms.add(room);
return room;
}
/**
* 关闭房间
*
* @param roomId 房间标识
*/
public void close(String roomId) {
final Room room = this.room(roomId);
if(room == null) {
log.warn("关闭房间无效:{}", roomId);
return;
}
if(this.rooms.remove(room)) {
// TODO:媒体服务
}
}
/**
* 离开房间
@@ -159,4 +147,13 @@ public class RoomManager {
this.rooms.forEach(v -> v.leave(client));
}
/**
* 删除房间
*
* @param room 房间
*/
public void remove(Room room) {
this.rooms.remove(room);
}
}

View File

@@ -10,6 +10,7 @@ import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
@@ -31,9 +32,15 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
@Protocol
@Description(
memo = """
消费媒体主动消费、终端生成媒体、终端创建WebRTC消费通道
终端生产媒体当前房间所有终端根据订阅类型自动消费媒体
终端创建WebRTC消费通道根据订阅类型自动消费当前房间已有媒体
""",
flow = {
"信令服务->媒体服务=>信令服务=>终端->信令服务->信令服务",
"终端->信令服务->媒体服务=>信令服务=>终端->信令服务->信令服务"
"终端-[生产媒体]>信令服务-[其他终端消费])信令服务",
"终端-[创建WebRTC消费通道]>信令服务-[消费其他终端])信令服务",
"终端->信令服务->媒体服务=>信令服务->终端->信令服务->媒体服务"
}
)
public class MediaConsumeProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaProduceEvent> {
@@ -48,13 +55,25 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
@Override
public void onApplicationEvent(MediaProduceEvent event) {
final Room room = event.getRoom();
final Producer producer = event.getProducer();
final ClientWrapper clientWrapper = producer.getProduceClient();
final Client client = clientWrapper.getClient();
room.getClients().values().stream()
.filter(v -> v.getClient() != client)
.filter(v -> v.getSubscribeType().consume(producer))
.forEach(v -> this.consume(room, v, producer));
if(event.getProducer() != null) {
// 生产媒体:其他终端消费
final Producer producer = event.getProducer();
final ClientWrapper produceClientWrapper = producer.getProduceClient();
room.getClients().values().stream()
.filter(v -> v != produceClientWrapper)
.filter(v -> v.getSubscribeType().consume(producer))
.forEach(v -> this.consume(room, v, producer));
} else if(event.getClientWrapper() != null) {
// 创建WebRTC消费通道消费其他终端
final ClientWrapper consumeClientWrapper = event.getClientWrapper();
room.getClients().values().stream()
.filter(v -> v != consumeClientWrapper)
.flatMap(v -> v.getProducers().values().stream())
.filter(v -> consumeClientWrapper.getSubscribeType().consume(v))
.forEach(producer -> this.consume(room, consumeClientWrapper, producer));
} else {
throw MessageCodeException.of("消费媒体失败");
}
}
@Override
@@ -62,51 +81,55 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
final String producerId = MapUtils.get(body, Constant.PRODUCER_ID);
final Producer producer = room.producer(producerId);
if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) {
// 请求消费
// 请求消费媒体
this.consume(room, room.clientWrapper(client), producer);
} else if(clientType == ClientType.MEDIA) {
// 等待消费者准备完成
// 通知终端准备:准备完成再次请求消费媒体结束媒体服务等待
final String kind = MapUtils.get(body, Constant.KIND);
final String streamId = MapUtils.get(body, Constant.STREAM_ID);
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final String consumeClientId = MapUtils.get(body, Constant.CLIENT_ID);
final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClientId);
final Map<String, Consumer> consumers = producer.getConsumers();
final Consumer consumer = consumers.get(producerId);
if(consumer != null) {
final Map<String, Consumer> consumers = consumeClientWrapper.getConsumers();
final Map<String, Consumer> producerConsumers = producer.getConsumers();
final Consumer consumer = new Consumer(consumeClientWrapper, producer, kind, streamId, consumerId);
final Consumer oldConsumer = consumers.put(producerId, consumer);
final Consumer oldProducerConsumer = producerConsumers.put(consumerId, consumer);
if(oldConsumer != null || oldProducerConsumer != null) {
log.warn("消费者已经存在:{}", consumerId);
// TODO关闭旧的
// consumer.close();
// TODO关闭旧的
}
final Client consumeClient = consumeClientWrapper.getClient();
consumers.put(consumerId, new Consumer(consumeClientWrapper, producer, kind, streamId, consumerId));
consumeClient.push(message);
} else {
// TODOlog
this.logNoAdapter(clientType);
}
}
/**
* 消费媒体
*
* @param room
* @param consumeClient
* @param producer
* @param room 房间
* @param consumeClientWrapper 消费者终端包装器
* @param producer 生产者
*/
private void consume(Room room, ClientWrapper consumeClientWrapper, Producer producer) {
// TODO掉线删除
if(producer.getProduceClient().consume(producer)) {
log.debug("已经消费:{}", consumeClientWrapper.getClientId());
if(consumeClientWrapper.consume(producer)) {
// TODO回调媒体服务准备完成
if(log.isDebugEnabled()) {
log.debug("已经消费:{}", consumeClientWrapper.getClientId(), producer.getStreamId());
}
return;
}
final Client mediaClient = room.getMediaClient();
final Transport recvTransport = consumeClientWrapper.getRecvTransport();
final Map<String, Object> body = new HashMap<>();
final String clientId = consumeClientWrapper.getClientId();
final String streamId = producer.getStreamId() + "->" + clientId;
final Client mediaClient = room.getMediaClient();
final Transport recvTransport = consumeClientWrapper.getRecvTransport();
final ClientWrapper produceClientWrapper = producer.getProduceClient();
final Map<String, Object> body = new HashMap<>();
body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.CLIENT_ID, clientId);
body.put(Constant.SOURCE_ID, producer.getProduceClient().getClientId());
body.put(Constant.SOURCE_ID, produceClientWrapper.getClientId());
body.put(Constant.STREAM_ID, streamId);
body.put(Constant.PRODUCER_ID, producer.getProducerId());
body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId());

View File

@@ -75,7 +75,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
}
// 拷贝属性
recvTransport.copy(responseBody);
this.produce(room, clientWrapper);
this.publishEvent(new MediaProduceEvent(room, clientWrapper));
}
// 生产者
final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING);
@@ -114,17 +114,4 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
});
}
/**
* 生产数据
*
* @param room
* @param clientWrapper
*/
private void produce(Room room, ClientWrapper clientWrapper) {
room.getClients().values().stream()
.filter(v -> v != clientWrapper)
.flatMap(v -> v.getProducers().values().stream())
.forEach(producer -> this.publishEvent(new MediaProduceEvent(room, producer)));
}
}

View File

@@ -10,7 +10,7 @@ import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.RoomClientListEvent;
import com.acgist.taoyao.signal.event.RoomEnterEvent;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
@@ -53,7 +53,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
},
flow = "终端=>信令服务->终端"
)
public class RoomClientListProtocol extends ProtocolRoomAdapter implements ApplicationListener<RoomClientListEvent> {
public class RoomClientListProtocol extends ProtocolRoomAdapter implements ApplicationListener<RoomEnterEvent> {
public static final String SIGNAL = "room::client::list";
@@ -63,7 +63,7 @@ public class RoomClientListProtocol extends ProtocolRoomAdapter implements Appli
@Async
@Override
public void onApplicationEvent(RoomClientListEvent event) {
public void onApplicationEvent(RoomEnterEvent event) {
final Room room = event.getRoom();
final Client client = event.getClient();
client.push(this.build(room.clientStatus()));

View File

@@ -40,7 +40,6 @@ public class RoomCloseProtocol extends ProtocolRoomAdapter {
} else if(clientType == ClientType.MEDIA) {
room.close();
room.broadcast(message);
// TODO移除
} else {
this.logNoAdapter(clientType);
}

View File

@@ -13,7 +13,7 @@ import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.RoomClientListEvent;
import com.acgist.taoyao.signal.event.RoomEnterEvent;
import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.ClientWrapper.SubscribeType;
import com.acgist.taoyao.signal.party.media.Room;
@@ -90,8 +90,8 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter {
Constant.STATUS, client.status()
));
room.broadcast(message);
// 房间终端列表事件
this.publishEvent(new RoomClientListEvent(room, client));
// 进入房间事件
this.publishEvent(new RoomEnterEvent(room, client));
}
}