[*] 日常优化

This commit is contained in:
acgist
2023-07-25 08:05:34 +08:00
parent c0657770e8
commit 4fbfd6a29e
31 changed files with 521 additions and 487 deletions

View File

@@ -56,7 +56,7 @@ public class ClientManager {
* @param message 消息
*/
public void unicast(String to, Message message) {
this.clients().stream()
this.getClients().stream()
.filter(v -> Objects.equals(to, v.getClientId()))
.forEach(v -> v.push(message));
}
@@ -68,7 +68,7 @@ public class ClientManager {
* @param message 消息
*/
public void unicast(Client to, Message message) {
this.clients().stream()
this.getClients().stream()
.filter(v -> v == to)
.forEach(v -> v.push(message));
}
@@ -80,7 +80,7 @@ public class ClientManager {
* @param clientTypes 终端类型
*/
public void broadcast(Message message, ClientType ... clientTypes) {
this.clients(clientTypes).forEach(v -> v.push(message));
this.getClients(clientTypes).forEach(v -> v.push(message));
}
/**
@@ -91,7 +91,7 @@ public class ClientManager {
* @param clientTypes 终端类型
*/
public void broadcast(String from, Message message, ClientType ... clientTypes) {
this.clients(clientTypes).stream()
this.getClients(clientTypes).stream()
.filter(v -> !Objects.equals(from, v.getClientId()))
.forEach(v -> v.push(message));
}
@@ -104,7 +104,7 @@ public class ClientManager {
* @param clientTypes 终端类型
*/
public void broadcast(Client from, Message message, ClientType ... clientTypes) {
this.clients(clientTypes).stream()
this.getClients(clientTypes).stream()
.filter(v -> v != from)
.forEach(v -> v.push(message));
}
@@ -114,7 +114,7 @@ public class ClientManager {
*
* @return 终端(包含授权和未授权)
*/
public Client clients(AutoCloseable instance) {
public Client getClients(AutoCloseable instance) {
return this.clients.stream()
.filter(v -> v.getInstance() == instance)
.findFirst()
@@ -126,8 +126,8 @@ public class ClientManager {
*
* @return 授权终端
*/
public Client clients(String clientId) {
return this.clients().stream()
public Client getClients(String clientId) {
return this.getClients().stream()
.filter(v -> Objects.equals(clientId, v.getClientId()))
.findFirst()
.orElse(null);
@@ -138,7 +138,7 @@ public class ClientManager {
*
* @return 授权终端列表
*/
public List<Client> clients(ClientType ... clientTypes) {
public List<Client> getClients(ClientType ... clientTypes) {
return this.clients.stream()
.filter(Client::authorized)
.filter(client -> ArrayUtils.isEmpty(clientTypes) || ArrayUtils.contains(clientTypes, client.getClientType()))
@@ -150,8 +150,8 @@ public class ClientManager {
*
* @return 终端状态
*/
public ClientStatus status(AutoCloseable instance) {
final Client client = this.clients(instance);
public ClientStatus getStatus(AutoCloseable instance) {
final Client client = this.getClients(instance);
return client == null ? null : client.getStatus();
}
@@ -160,8 +160,8 @@ public class ClientManager {
*
* @return 授权终端状态
*/
public ClientStatus status(String clientId) {
final Client client = this.clients(clientId);
public ClientStatus getStatus(String clientId) {
final Client client = this.getClients(clientId);
return client == null ? null : client.getStatus();
}
@@ -170,8 +170,8 @@ public class ClientManager {
*
* @return 授权终端状态列表
*/
public List<ClientStatus> status(ClientType ... clientTypes) {
return this.clients(clientTypes).stream()
public List<ClientStatus> getStatus(ClientType ... clientTypes) {
return this.getClients(clientTypes).stream()
.map(Client::getStatus)
.toList();
}
@@ -183,7 +183,7 @@ public class ClientManager {
* @param message 消息
*/
public void push(AutoCloseable instance, Message message) {
final Client client = this.clients(instance);
final Client client = this.getClients(instance);
if(client == null) {
log.warn("推送消息终端无效:{} - {}", instance, message);
return;
@@ -197,7 +197,7 @@ public class ClientManager {
* @param instance 终端实例
*/
public void close(AutoCloseable instance) {
final Client client = this.clients(instance);
final Client client = this.getClients(instance);
try {
if(client != null) {
client.close();

View File

@@ -170,7 +170,7 @@ public final class SocketSignalMessageHandler implements CompletionHandler<Integ
try {
this.protocolManager.execute(message, this.channel);
} catch (Exception e) {
log.error("处理Socket信令消息异常{} - {}", this.clientManager.clients(this.channel), message, e);
log.error("处理Socket信令消息异常{} - {}", this.clientManager.getClients(this.channel), message, e);
this.clientManager.push(this.channel, this.platformErrorProtocol.build(e));
}
}

View File

@@ -41,7 +41,7 @@ public class WebSocketSignal {
try {
WebSocketSignal.protocolManager.execute(message, session);
} catch (Exception e) {
log.error("处理WebSocket信令消息异常{} - {}", WebSocketSignal.clientManager.clients(session), message, e);
log.error("处理WebSocket信令消息异常{} - {}", WebSocketSignal.clientManager.getClients(session), message, e);
WebSocketSignal.clientManager.push(session, WebSocketSignal.platformErrorProtocol.build(e));
}
}

View File

@@ -39,14 +39,14 @@ public class ClientController {
@GetMapping("/list")
@ApiResponse(content = @Content(schema = @Schema(implementation = ClientStatus.class)))
public Message list() {
return Message.success(this.clientManager.status());
return Message.success(this.clientManager.getStatus());
}
@Operation(summary = "终端状态", description = "终端状态")
@GetMapping("/status/{clientId}")
@ApiResponse(content = @Content(schema = @Schema(implementation = ClientStatus.class)))
public Message status(@PathVariable String clientId) {
return Message.success(this.clientManager.status(clientId));
return Message.success(this.clientManager.getStatus(clientId));
}
@Operation(summary = "唤醒终端", description = "唤醒终端")

View File

@@ -46,22 +46,22 @@ public class RoomController {
@GetMapping("/list")
@ApiResponse(content = @Content(schema = @Schema(implementation = RoomStatus.class)))
public Message list() {
return Message.success(this.roomManager.status());
return Message.success(this.roomManager.getStatus());
}
@Operation(summary = "房间状态", description = "房间状态")
@GetMapping("/status/{roomId}")
@ApiResponse(content = @Content(schema = @Schema(implementation = RoomStatus.class)))
public Message status(@PathVariable String roomId) {
return Message.success(this.roomManager.status(roomId));
return Message.success(this.roomManager.getStatus(roomId));
}
@Operation(summary = "房间终端列表", description = "房间终端列表")
@GetMapping("/list/client/{roomId}")
@ApiResponse(content = @Content(schema = @Schema(implementation = ClientStatus.class)))
public Message listClient(@PathVariable String roomId) {
final Room room = this.roomManager.room(roomId);
return Message.success(room == null ? List.of() : room.clientStatus());
final Room room = this.roomManager.getRoom(roomId);
return Message.success(room == null ? List.of() : room.getClientStatus());
}
}

View File

@@ -27,36 +27,37 @@ import lombok.extern.slf4j.Slf4j;
@Getter
@Setter
public class Room extends OperatorAdapter {
/**
* 房间标识
*/
private String roomId;
/**
* 密码
* 设置密码之后进入房间需要验证密码
*/
private String password;
/**
* 状态
*/
private final RoomStatus roomStatus;
/**
* 媒体服务
*/
private Client mediaClient;
/**
* 房间管理
*/
private final RoomManager roomManager;
/**
* 终端
*/
private final Map<Client, ClientWrapper> clients;
/**
* 通道
*/
private final Map<String, Transport> transports;
/**
* 房间ID
*/
private final String roomId;
/**
* 密码
* 设置密码之后进入房间需要验证密码
*/
private final String password;
/**
* 房间状态
*/
private final RoomStatus roomStatus;
/**
* 媒体服务
* 可以切换
*/
private Client mediaClient;
/**
* 房间管理
*/
private final RoomManager roomManager;
/**
* 终端
*/
private final Map<Client, ClientWrapper> clients;
/**
* 通道
*/
private final Map<String, Transport> transports;
/**
* 生产者
*/
@@ -73,90 +74,97 @@ public class Room extends OperatorAdapter {
* 数据消费者
*/
private final Map<String, DataConsumer> dataConsumers;
/**
* @param mediaClient 媒体服务
*/
public Room(Client mediaClient, RoomManager roomManager) {
this.roomStatus = new RoomStatus();
this.mediaClient = mediaClient;
this.roomManager = roomManager;
this.clients = new ConcurrentHashMap<>();
this.transports = new ConcurrentHashMap<>();
this.producers = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.dataProducers = new ConcurrentHashMap<>();
this.dataConsumers = new ConcurrentHashMap<>();
}
/**
* @param client 终端
*
* @return 是否授权
*/
public boolean authenticate(Client client) {
return
this.mediaClient == client ||
this.clients.containsKey(client);
}
/**
* @return 终端状态列表
*/
public List<ClientStatus> clientStatus() {
return this.clients.keySet().stream()
.map(Client::getStatus)
.toList();
}
/**
* 终端进入
*
* @param client 终端
*
* @return 终端封装器
*/
public ClientWrapper enter(Client client) {
synchronized (this.clients) {
ClientWrapper clientWrapper = this.clients.get(client);
if(clientWrapper != null) {
return clientWrapper;
}
log.info("终端进入房间:{} - {}", this.roomId, client.getClientId());
clientWrapper = new ClientWrapper(this, client);
this.clients.put(client, clientWrapper);
this.roomStatus.setClientSize(this.roomStatus.getClientSize() + 1);
return clientWrapper;
}
}
/**
* 终端离开
*
* @param client 终端
*/
public void leave(Client client) {
synchronized (this.clients) {
final ClientWrapper wrapper = this.clients.remove(client);
if(wrapper != null) {
log.info("终端离开房间:{} - {}", this.roomId, client.getClientId());
try {
/**
* @param roomId 房间ID
* @param password 房间密码
* @param mediaClient 媒体服务
* @param roomManager 房间管理
*/
public Room(String roomId, String password, Client mediaClient, RoomManager roomManager) {
this.roomId = roomId;
this.password = password;
this.roomStatus = new RoomStatus();
this.mediaClient = mediaClient;
this.roomManager = roomManager;
this.clients = new ConcurrentHashMap<>();
this.transports = new ConcurrentHashMap<>();
this.producers = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.dataProducers = new ConcurrentHashMap<>();
this.dataConsumers = new ConcurrentHashMap<>();
}
/**
* 已经进入房间才能使用房间信令
*
* @param client 终端
*
* @return 是否认证
*/
public boolean authenticate(Client client) {
return
this.mediaClient == client ||
this.clients.containsKey(client);
}
/**
* @return 终端状态列表
*/
public List<ClientStatus> getClientStatus() {
return this.clients.keySet().stream()
.map(Client::getStatus)
.toList();
}
/**
* 终端进入
*
* @param client 终端
*
* @return 终端封装器
*/
public ClientWrapper enter(Client client) {
synchronized (this.clients) {
ClientWrapper clientWrapper = this.clients.get(client);
if(clientWrapper != null) {
return clientWrapper;
}
log.info("终端进入房间:{} - {}", this.roomId, client.getClientId());
clientWrapper = new ClientWrapper(this, client);
this.clients.put(client, clientWrapper);
this.roomStatus.setClientSize(this.roomStatus.getClientSize() + 1);
return clientWrapper;
}
}
/**
* 终端离开
*
* @param client 终端
*/
public void leave(Client client) {
synchronized (this.clients) {
final ClientWrapper wrapper = this.clients.remove(client);
if(wrapper != null) {
log.info("终端离开房间:{} - {}", this.roomId, client.getClientId());
try {
wrapper.close();
} catch (Exception e) {
log.error("关闭终端代理异常:{}", wrapper.getClientId(), e);
}
this.roomStatus.setClientSize(this.roomStatus.getClientSize() - 1);
EventPublisher.publishEvent(new RoomLeaveEvent(this, client));
}
}
}
/**
* 媒体服务推送消息
*
* @param message 消息
*/
public void push(Message message) {
this.roomStatus.setClientSize(this.roomStatus.getClientSize() - 1);
EventPublisher.publishEvent(new RoomLeaveEvent(this, client));
}
}
}
/**
* 媒体服务推送消息
*
* @param message 消息
*/
public void pushMedia(Message message) {
this.mediaClient.push(message);
}
@@ -167,14 +175,14 @@ public class Room extends OperatorAdapter {
*
* @return 响应
*/
public Message request(Message message) {
public Message requestMedia(Message message) {
return this.mediaClient.request(message);
}
/**
* 单播消息
*
* @param to 接收终端
* @param to 接收终端
* @param message 消息
*/
public void unicast(String to, Message message) {
@@ -183,151 +191,163 @@ public class Room extends OperatorAdapter {
.forEach(v -> v.push(message));
}
/**
* 广播消息
*
* @param message 消息
*/
public void broadcast(Message message) {
this.clients.keySet().forEach(v -> v.push(message));
}
/**
* 广播消息
*
* @param from 发送终端
* @param message 消息
*/
public void broadcast(String from, Message message) {
this.clients.keySet().stream()
.filter(v -> !Objects.equals(from, v.getClientId()))
.forEach(v -> v.push(message));
}
/**
* 广播消息
*
* @param from 发送终端
* @param message 消息
*/
public void broadcast(Client from, Message message) {
this.clients.keySet().stream()
.filter(v -> v != from)
.forEach(v -> v.push(message));
}
/**
*
* @param client
* @return
*/
public ClientWrapper clientWrapper(Client client) {
return this.clients.get(client);
}
/**
*
* @param client
* @return
*/
public ClientWrapper clientWrapper(String clientId) {
return this.clients.values().stream()
.filter(v -> clientId.equals(v.getClientId()))
.findFirst()
.orElse(null);
}
/**
* @param transportId 通道ID
*
* @return 通道
*/
public Transport transport(String transportId) {
return this.transports.get(transportId);
}
/**
* @param producerId 生产者ID
*
* @return 生产者
*/
public Producer producer(String producerId) {
return this.producers.get(producerId);
}
/**
* @param consumerId 消费者ID
*
* @return 消费者
*/
public Consumer consumer(String consumerId) {
return this.consumers.get(consumerId);
}
/**
* @param producerId 数据生产者ID
*
* @return 数据生产者
*/
public DataProducer dataProducer(String producerId) {
return this.dataProducers.get(producerId);
}
/**
* @param consumerId 数据消费者ID
*
* @return 数据消费者
*/
public DataConsumer dataConsumer(String consumerId) {
return this.dataConsumers.get(consumerId);
}
@Override
public void close() {
if(this.markClose()) {
return;
}
log.info("关闭房间:{}", this.roomId);
this.clients.values().forEach(ClientWrapper::close);
EventPublisher.publishEvent(new RoomCloseEvent(this));
}
@Override
public void remove() {
/**
* 播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(Client to, Message message) {
this.clients.keySet().stream()
.filter(v -> v == to)
.forEach(v -> v.push(message));
}
/**
* 广播消息
*
* @param message 消息
*/
public void broadcast(Message message) {
this.clients.keySet().forEach(v -> v.push(message));
}
/**
* 广播消息
*
* @param from 发送终端
* @param message 消息
*/
public void broadcast(String from, Message message) {
this.clients.keySet().stream()
.filter(v -> !Objects.equals(from, v.getClientId()))
.forEach(v -> v.push(message));
}
/**
* 广播消息
*
* @param from 发送终端
* @param message 消息
*/
public void broadcast(Client from, Message message) {
this.clients.keySet().stream()
.filter(v -> v != from)
.forEach(v -> v.push(message));
}
/**
* @param client 终端
*
* @return 终端包装器
*/
public ClientWrapper clientWrapper(Client client) {
return this.clients.get(client);
}
/**
* @param clientId 终端ID
*
* @return 终端包装器
*/
public ClientWrapper clientWrapper(String clientId) {
return this.clients.values().stream()
.filter(v -> Objects.equals(clientId, v.getClientId()))
.findFirst()
.orElse(null);
}
/**
* @param transportId 通道ID
*
* @return 通道
*/
public Transport transport(String transportId) {
return this.transports.get(transportId);
}
/**
* @param producerId 生产者ID
*
* @return 生产者
*/
public Producer producer(String producerId) {
return this.producers.get(producerId);
}
/**
* @param consumerId 消费者ID
*
* @return 消费者
*/
public Consumer consumer(String consumerId) {
return this.consumers.get(consumerId);
}
/**
* @param producerId 数据生产者ID
*
* @return 数据生产者
*/
public DataProducer dataProducer(String producerId) {
return this.dataProducers.get(producerId);
}
/**
* @param consumerId 数据消费者ID
*
* @return 数据消费者
*/
public DataConsumer dataConsumer(String consumerId) {
return this.dataConsumers.get(consumerId);
}
@Override
public void close() {
if(this.markClose()) {
return;
}
log.info("关闭房间:{}", this.roomId);
this.clients.values().forEach(ClientWrapper::close);
EventPublisher.publishEvent(new RoomCloseEvent(this));
}
@Override
public void remove() {
log.info("移除房间:{}", this.roomId);
this.roomManager.remove(this);
}
}
@Override
public void log() {
log.info("""
当前房间:{}
终端数量:{}
通道数量:{}
消费者数量:{}
生产者数量:{}
数据消费者数量:{}
数据生产者数量:{}""",
this.roomId,
this.clients.size(),
this.transports.size(),
this.consumers.size(),
this.producers.size(),
this.dataConsumers.size(),
this.dataProducers.size()
);
this.clients.values().forEach(ClientWrapper::log);
}
@Override
public void log() {
log.info("""
当前房间:{}
终端数量:{}
通道数量:{}
消费者数量:{}
生产者数量:{}
数据消费者数量:{}
数据生产者数量:{}""",
this.roomId,
this.clients.size(),
this.transports.size(),
this.consumers.size(),
this.producers.size(),
this.dataConsumers.size(),
this.dataProducers.size()
);
this.clients.values().forEach(ClientWrapper::log);
}
/**
* 清理没有关联终端的资源
*/
public void releaseUnknowClient() {
this.transports.values().stream().filter(v -> !this.clients.containsKey(v.getClient())).forEach(Transport::close);
this.consumers.values().stream().filter(v -> !this.clients.containsValue(v.getConsumerClient())).forEach(Consumer::close);
this.producers.values().stream().filter(v -> !this.clients.containsValue(v.getProducerClient())).forEach(Producer::close);
this.dataConsumers.values().stream().filter(v -> !this.clients.containsValue(v.getConsumerClient())).forEach(DataConsumer::close);
this.dataProducers.values().stream().filter(v -> !this.clients.containsValue(v.getProducerClient())).forEach(DataProducer::close);
}
/**
* 清理没有关联终端的资源
*/
public void releaseUnknowClient() {
this.consumers.values().stream().filter(v -> !this.clients.containsValue(v.getConsumerClient())).forEach(Consumer::close);
this.producers.values().stream().filter(v -> !this.clients.containsValue(v.getProducerClient())).forEach(Producer::close);
this.dataConsumers.values().stream().filter(v -> !this.clients.containsValue(v.getConsumerClient())).forEach(DataConsumer::close);
this.dataProducers.values().stream().filter(v -> !this.clients.containsValue(v.getProducerClient())).forEach(DataProducer::close);
this.transports.values().stream().filter(v -> !this.clients.containsKey(v.getClient())).forEach(Transport::close);
}
}

View File

@@ -8,30 +8,30 @@ import lombok.Getter;
import lombok.Setter;
/**
* 房间终端ID
* 房间终端ID集合
*
* @author acgist
*/
@Getter
@Setter
@Schema(title = "房间终端ID", description = "房间终端ID")
@Schema(title = "房间终端ID集合", description = "房间终端ID集合")
public class RoomClientId {
@Schema(title = "房间ID", description = "房间ID")
private String roomId;
@Schema(title = "终端ID", description = "终端ID")
private String clientId;
@Schema(title = "数据生产者ID", description = "数据生产者ID")
@Schema(title = "数据生产者ID集合", description = "数据生产者ID集合")
private List<String> dataProducers;
@Schema(title = "数据消费者ID", description = "数据消费者ID")
@Schema(title = "数据消费者ID集合", description = "数据消费者ID集合")
private List<String> dataConsumers;
@Schema(title = "音频生产者ID", description = "音频生产者ID")
@Schema(title = "音频生产者ID集合", description = "音频生产者ID集合")
private List<String> audioProducers;
@Schema(title = "视频生产者ID", description = "视频生产者ID")
@Schema(title = "视频生产者ID集合", description = "视频生产者ID集合")
private List<String> videoProducers;
@Schema(title = "音频消费者ID", description = "音频消费者ID")
@Schema(title = "音频消费者ID集合", description = "音频消费者ID集合")
private List<String> audioConsumers;
@Schema(title = "视频消费者ID", description = "视频消费者ID")
@Schema(title = "视频消费者ID集合", description = "视频消费者ID集合")
private List<String> videoConsumers;
public RoomClientId() {

View File

@@ -26,155 +26,144 @@ import lombok.extern.slf4j.Slf4j;
@Manager
public class RoomManager {
private final IdService idService;
private final ClientManager clientManager;
/**
* 房间列表
*/
private final List<Room> rooms;
public RoomManager(IdService idService, ClientManager clientManager) {
this.idService = idService;
private final IdService idService;
private final ClientManager clientManager;
/**
* 房间列表
*/
private final List<Room> rooms;
public RoomManager(IdService idService, ClientManager clientManager) {
this.idService = idService;
this.clientManager = clientManager;
this.rooms = new CopyOnWriteArrayList<>();
this.rooms = new CopyOnWriteArrayList<>();
}
@Scheduled(cron = "${taoyao.scheduled.room:0 0/5 * * * ?}")
public void scheduled() {
this.releaseUnknowClient();
}
/**
* @param roomId 房间标识
*
* @return 房间
*/
public Room room(String roomId) {
return this.rooms.stream()
.filter(v -> Objects.equals(roomId, v.getRoomId()))
.findFirst()
.orElse(null);
}
/**
* @return 所有房间列表
*/
public List<Room> rooms() {
return this.rooms;
}
/**
* @param roomId 房间标识
*
* @return 房间状态
*/
public RoomStatus status(String roomId) {
final Room room = this.room(roomId);
return room == null ? null : room.getRoomStatus();
}
/**
* @return 所有房间状态列表
*/
public List<RoomStatus> status() {
return this.rooms().stream()
.map(Room::getRoomStatus)
.toList();
}
/**
* @param roomId 房间ID
*
* @return 房间
*/
public Room getRoom(String roomId) {
return this.rooms.stream()
.filter(v -> Objects.equals(roomId, v.getRoomId()))
.findFirst()
.orElse(null);
}
/**
* @param roomId 房间ID
*
* @return 房间状态
*/
public RoomStatus getStatus(String roomId) {
final Room room = this.getRoom(roomId);
return room == null ? null : room.getRoomStatus();
}
/**
* @return 所有房间状态列表
*/
public List<RoomStatus> getStatus() {
return this.rooms.stream()
.map(Room::getRoomStatus)
.toList();
}
/**
* 重建房间
*
* @param mediaClient 媒体服务终端
* @param message 消息
*/
public void recreate(Client mediaClient, Message message) {
this.rooms.stream()
.filter(room -> mediaClient.getClientId().equals(room.getMediaClient().getClientId()))
.forEach(room -> {
log.info("重建房间:{}", room.getRoomId());
final Message clone = message.cloneWithoutBody();
clone.getHeader().setId(this.idService.buildId());
clone.setBody(Map.of(Constant.ROOM_ID, room.getRoomId()));
// 异步发送防止线程卡死
mediaClient.push(clone);
// 同步需要添加异步注解
// mediaClient.request(clone);
// 更新媒体服务
room.setMediaClient(mediaClient);
// TODO通知重建房间
});
}
/**
* 重建房间
*
* @param mediaClient 媒体服务终端
* @param message 消息
*/
public void recreate(Client mediaClient, Message message) {
this.rooms.stream()
.filter(room -> mediaClient.getClientId().equals(room.getMediaClient().getClientId()))
.forEach(room -> {
log.info("重建房间:{}", room.getRoomId());
final Message clone = message.cloneWithoutBody();
clone.getHeader().setId(this.idService.buildId());
clone.setBody(Map.of(Constant.ROOM_ID, room.getRoomId()));
// 异步发送防止线程卡死
mediaClient.push(clone);
// 同步需要添加异步注解
// mediaClient.request(clone);
// 更新媒体服务
room.setMediaClient(mediaClient);
// TODO通知重建房间
});
}
/**
* 创建房间
*
* @param name 名称
* @param password 密码
* @param mediaClientId 媒体服务终端标识
* @param message 消息
*
* @return 房间信息
*/
public Room create(String name, String password, String mediaClientId, Message message) {
final Client mediaClient = this.clientManager.clients(mediaClientId);
if(mediaClient == null) {
throw MessageCodeException.of("无效媒体服务:" + mediaClientId);
}
final String roomId = this.idService.buildUuid();
// 房间
final Room room = new Room(mediaClient, this);
room.setRoomId(roomId);
room.setPassword(password);
// 状态
final RoomStatus roomStatus = room.getRoomStatus();
roomStatus.setRoomId(roomId);
roomStatus.setName(name);
roomStatus.setMediaClientId(mediaClientId);
roomStatus.setClientSize(0L);
// 创建媒体服务房间
message.setBody(Map.of(Constant.ROOM_ID, roomId));
mediaClient.request(message);
log.info("创建房间:{}-{}", roomId, name);
this.rooms.add(room);
return room;
}
/**
* 创建房间
*
* @param name 名称
* @param password 密码
* @param mediaClientId 媒体服务终端标识
* @param message 消息
*
* @return 房间信息
*/
public Room create(String name, String password, String mediaClientId, Message message) {
final Client mediaClient = this.clientManager.getClients(mediaClientId);
if(mediaClient == null) {
throw MessageCodeException.of("无效媒体服务:" + mediaClientId);
}
final String roomId = this.idService.buildUuid();
final Room room = new Room(roomId, password, mediaClient, this);
final RoomStatus roomStatus = room.getRoomStatus();
roomStatus.setName(name);
roomStatus.setRoomId(roomId);
roomStatus.setClientSize(0L);
roomStatus.setMediaClientId(mediaClientId);
// 创建媒体服务房间
message.setBody(Map.of(Constant.ROOM_ID, roomId));
mediaClient.request(message);
log.info("创建房间:{} - {}", roomId, name);
this.rooms.add(room);
return room;
}
/**
* 离开房间
*
* @param client 终端
*/
public void leave(Client client) {
this.rooms.forEach(v -> v.leave(client));
}
/**
* 删除房间
*
* @param room 房间
*/
public void remove(Room room) {
this.rooms.remove(room);
}
/**
* 记录日志
*/
public void log() {
log.info("""
当前房间数量:{}""",
this.rooms.size()
);
this.rooms.forEach(Room::log);
}
/**
* 离开房间
*
* @param client 终端
*/
public void leave(Client client) {
this.rooms.forEach(v -> v.leave(client));
}
/**
* 删除房间
*
* @param room 房间
*/
public void remove(Room room) {
this.rooms.remove(room);
}
/**
* 记录日志
*/
public void log() {
log.info("""
当前房间数量:{}""",
this.rooms.size()
);
this.rooms.forEach(Room::log);
}
/**
* 清理没有关联终端的资源
*/
private void releaseUnknowClient() {
this.rooms.forEach(Room::releaseUnknowClient);
}
/**
* 清理没有关联终端的资源
*/
private void releaseUnknowClient() {
this.rooms.forEach(Room::releaseUnknowClient);
}
}

View File

@@ -13,14 +13,14 @@ import lombok.Setter;
@Setter
@Schema(title = "房间状态", description = "房间状态")
public class RoomStatus {
@Schema(title = "房间标识", description = "房间标识")
private String roomId;
@Schema(title = "房间名称", description = "房间名称")
private String name;
@Schema(title = "终端数量", description = "终端数量")
private Long clientSize;
@Schema(title = "媒体服务标识", description = "媒体服务标识")
private String mediaClientId;
@Schema(title = "房间ID", description = "房间ID")
private String roomId;
@Schema(title = "房间名称", description = "房间名称")
private String name;
@Schema(title = "终端数量", description = "终端数量")
private Long clientSize;
@Schema(title = "媒体服务标识", description = "媒体服务标识")
private String mediaClientId;
}

View File

@@ -7,11 +7,17 @@ package com.acgist.taoyao.signal.party.media;
*/
public enum RouterType {
// 对讲:只有两个人之间的媒体相互路由
/**
* 对讲:只有两个人之间的媒体相互路由
*/
ONE_TO_ONE,
// 广播:只有一个人的媒体路由到其他人
/**
* 广播:只有一个人的媒体路由到其他人
*/
ONE_TO_ALL,
// 网播:所有人的媒体相互路由
/**
* 网播:所有人的媒体相互路由
*/
ALL_TO_ALL,
}

View File

@@ -7,17 +7,31 @@ package com.acgist.taoyao.signal.party.media;
*/
public enum SubscribeType {
// 订阅所有媒体
/**
* 订阅所有媒体
*/
ALL,
// 订阅所有音频媒体
/**
* 订阅所有音频媒体
*/
ALL_AUDIO,
// 订阅所有视频媒体
/**
* 订阅所有视频媒体
*/
ALL_VIDEO,
// 没有订阅任何媒体
/**
* 没有订阅任何媒体
*/
NONE;
/**
* @param value 名称
*
* @return 类型
*/
public static final SubscribeType of(String value) {
for (SubscribeType type : SubscribeType.values()) {
final SubscribeType[] values = SubscribeType.values();
for (SubscribeType type : values) {
if(type.name().equalsIgnoreCase(value)) {
return type;
}
@@ -28,14 +42,15 @@ public enum SubscribeType {
/**
* @param producer 生产者
*
* @return 是否可以消
* @return 是否可以消
*/
public boolean canConsume(Producer producer) {
return switch (this) {
case NONE -> false;
case NONE -> false;
case ALL_AUDIO -> producer.getKind() == Kind.AUDIO;
case ALL_VIDEO -> producer.getKind() == Kind.VIDEO;
default -> true;
case ALL -> true;
default -> true;
};
}

View File

@@ -29,10 +29,14 @@ public class Transport extends OperatorAdapter {
* @author acgist
*/
public enum Direction {
// 接收
/**
* 接收
*/
RECV,
// 发送
/**
* 发送
*/
SEND;
}
@@ -46,15 +50,15 @@ public class Transport extends OperatorAdapter {
*/
private final Client client;
/**
* 房间标识
* 房间ID
*/
private final String roomId;
/**
* 终端标识
* 终端ID
*/
private final String clientId;
/**
* 通道标识
* 通道ID
*/
private final String transportId;
/**
@@ -79,12 +83,12 @@ public class Transport extends OperatorAdapter {
private Object sctpParameters;
public Transport(String transportId, Direction direction, Room room, Client client) {
this.room = room;
this.roomId = room.getRoomId();
this.client = client;
this.clientId = client.getClientId();
this.transportId = transportId;
this.direction = direction;
this.room = room;
this.client = client;
this.roomId = room.getRoomId();
this.clientId = client.getClientId();
this.direction = direction;
}
/**
@@ -93,8 +97,8 @@ public class Transport extends OperatorAdapter {
* @param body 消息主体
*/
public void copy(Map<String, Object> body) {
this.iceCandidates = MapUtils.get(body, Constant.ICE_CANDIDATES);
this.iceParameters = MapUtils.get(body, Constant.ICE_PARAMETERS);
this.iceCandidates = MapUtils.get(body, Constant.ICE_CANDIDATES);
this.iceParameters = MapUtils.get(body, Constant.ICE_PARAMETERS);
this.dtlsParameters = MapUtils.get(body, Constant.DTLS_PARAMETERS);
this.sctpParameters = MapUtils.get(body, Constant.SCTP_PARAMETERS);
}

View File

@@ -23,7 +23,7 @@ public abstract class ProtocolControlAdapter extends ProtocolClientAdapter {
@Override
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
final String to = MapUtils.remove(body, Constant.TO);
final Client targetClient = this.clientManager.clients(to);
final Client targetClient = this.clientManager.getClients(to);
if(targetClient == null) {
throw MessageCodeException.of("目标终端无效:" + to);
}
@@ -53,7 +53,7 @@ public abstract class ProtocolControlAdapter extends ProtocolClientAdapter {
* @return 响应
*/
protected Message request(String clientId, Message request) {
final Client client = this.clientManager.clients(clientId);
final Client client = this.clientManager.getClients(clientId);
if(client == null) {
return Message.fail("无效终端:" + clientId);
} else {

View File

@@ -84,7 +84,7 @@ public class ProtocolManager {
* @param instance 终端实例
*/
public void execute(String content, AutoCloseable instance) {
final Client client = this.clientManager.clients(instance);
final Client client = this.clientManager.getClients(instance);
if(client == null) {
log.warn("信令终端无效:{}-{}", instance, content);
return;

View File

@@ -24,7 +24,7 @@ public abstract class ProtocolRoomAdapter extends ProtocolClientAdapter {
@Override
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, Constant.ROOM_ID);
final Room room = this.roomManager.room(roomId);
final Room room = this.roomManager.getRoom(roomId);
if(room == null) {
throw MessageCodeException.of("无效房间:" + roomId);
}
@@ -38,7 +38,7 @@ public abstract class ProtocolRoomAdapter extends ProtocolClientAdapter {
* @param room 房间
* @param client 终端
*
* @return 是否授权
* @return 是否认证
*/
protected boolean authenticate(Room room, Client client) {
return room.authenticate(client);

View File

@@ -65,9 +65,9 @@ public class ClientListProtocol extends ProtocolClientAdapter {
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
final String queryClientType = MapUtils.get(body, Constant.CLIENT_TYPE);
if(StringUtils.isEmpty(queryClientType)) {
message.setBody(this.clientManager.status());
message.setBody(this.clientManager.getStatus());
} else {
message.setBody(this.clientManager.status(ClientType.of(queryClientType)));
message.setBody(this.clientManager.getStatus(ClientType.of(queryClientType)));
}
client.push(message);
}

View File

@@ -73,7 +73,7 @@ public class ClientRegisterProtocol extends ProtocolClientAdapter {
final String username = MapUtils.get(body, Constant.USERNAME);
final String password = MapUtils.get(body, Constant.PASSWORD);
if(this.securityService.authenticate(username, password)) {
final Client oldClient = this.clientManager.clients(clientId);
final Client oldClient = this.clientManager.getClients(clientId);
if(oldClient != null) {
log.debug("终端已经存在(注销旧的终端):{}", clientId);
CloseableUtils.close(oldClient);

View File

@@ -59,7 +59,7 @@ public class ClientStatusProtocol extends ProtocolClientAdapter {
@Override
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
final String queryClientId = MapUtils.get(body, Constant.CLIENT_ID, clientId);
message.setBody(this.clientManager.status(queryClientId));
message.setBody(this.clientManager.getStatus(queryClientId));
client.push(message);
}

View File

@@ -53,7 +53,7 @@ public class ControlClientRecordProtocol extends ProtocolControlAdapter implemen
@Override
public Message execute(String clientId, Boolean enabled) {
this.updateRecordStatus(this.clientManager.clients(clientId), enabled);
this.updateRecordStatus(this.clientManager.getClients(clientId), enabled);
return this.request(clientId, this.build(Map.of(Constant.ENABLED, enabled)));
}

View File

@@ -72,7 +72,7 @@ public class ControlServerRecordProtocol extends ProtocolControlAdapter implemen
String filepath;
final String roomId = MapUtils.get(body, Constant.ROOM_ID);
final Boolean enabled = MapUtils.get(body, Constant.ENABLED, Boolean.TRUE);
final Room room = this.roomManager.room(roomId);
final Room room = this.roomManager.getRoom(roomId);
if(enabled) {
filepath = this.start(room, room.clientWrapper(targetClient));
} else {
@@ -86,8 +86,8 @@ public class ControlServerRecordProtocol extends ProtocolControlAdapter implemen
@Override
public Message execute(String roomId, String clientId, Boolean enabled) {
String filepath;
final Room room = this.roomManager.room(roomId);
final Client client = this.clientManager.clients(clientId);
final Room room = this.roomManager.getRoom(roomId);
final Client client = this.clientManager.getClients(clientId);
if(enabled) {
filepath = this.start(room, room.clientWrapper(client));
} else {

View File

@@ -48,7 +48,7 @@ public class MediaDataProduceProtocol extends ProtocolRoomAdapter {
final String streamId = Constant.STREAM_ID_PRODUCER.apply(Constant.DATA, clientId);
body.put(Constant.CLIENT_ID, clientId);
body.put(Constant.STREAM_ID, streamId);
final Message response = room.request(message);
final Message response = room.requestMedia(message);
final Map<String, Object> responseBody = response.body();
final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID);
final ClientWrapper producerClientWrapper = room.clientWrapper(client);

View File

@@ -52,7 +52,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter {
final String streamId = Constant.STREAM_ID_PRODUCER.apply(kind, clientId);
body.put(Constant.CLIENT_ID, clientId);
body.put(Constant.STREAM_ID, streamId);
final Message response = room.request(message);
final Message response = room.requestMedia(message);
final Map<String, Object> responseBody = response.body();
final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID);
final ClientWrapper producerClientWrapper = room.clientWrapper(client);

View File

@@ -48,7 +48,7 @@ public class MediaRouterRtpCapabilitiesProtocol extends ProtocolRoomAdapter {
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
client.push(room.request(message));
client.push(room.requestMedia(message));
} else {
this.logNoAdapter(clientType);
}

View File

@@ -58,7 +58,7 @@ public class MediaTransportPlainProtocol extends ProtocolRoomAdapter {
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
body.put(Constant.CLIENT_ID, clientId);
final Message response = room.request(message);
final Message response = room.requestMedia(message);
final Map<String, Object> responseBody = response.body();
final Map<String, Transport> transports = room.getTransports();
final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID);

View File

@@ -39,7 +39,7 @@ public class MediaTransportWebRtcConnectProtocol extends ProtocolRoomAdapter {
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
final Message response = room.request(message);
final Message response = room.requestMedia(message);
final Map<String, Object> responseBody = response.body();
client.push(response);
final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID);

View File

@@ -59,7 +59,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
body.put(Constant.CLIENT_ID, clientId);
final Message response = room.request(message);
final Message response = room.requestMedia(message);
final Map<String, Object> responseBody = response.body();
final Map<String, Transport> transports = room.getTransports();
final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID);

View File

@@ -18,7 +18,7 @@ import com.acgist.taoyao.signal.party.media.RoomClientId;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 房间终端ID信令
* 房间终端ID集合信令
*
* @author acgist
*/

View File

@@ -69,7 +69,7 @@ public class RoomClientListProtocol extends ProtocolRoomAdapter implements Appli
final Client client = event.getClient();
client.push(this.build(Map.of(
Constant.ROOM_ID, room.getRoomId(),
Constant.CLIENTS, room.clientStatus()
Constant.CLIENTS, room.getClientStatus()
)));
}
@@ -77,7 +77,7 @@ public class RoomClientListProtocol extends ProtocolRoomAdapter implements Appli
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
message.setBody(Map.of(
Constant.ROOM_ID, room.getRoomId(),
Constant.CLIENTS, room.clientStatus()
Constant.CLIENTS, room.getClientStatus()
));
client.push(message);
}

View File

@@ -55,7 +55,7 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter {
final Map<String, Object> body = message.body();
final String roomId = MapUtils.get(body, Constant.ROOM_ID);
final String password = MapUtils.get(body, Constant.PASSWORD);
final Room room = this.roomManager.room(roomId);
final Room room = this.roomManager.getRoom(roomId);
if(room == null) {
throw MessageCodeException.of("无效房间:" + roomId);
}

View File

@@ -38,7 +38,7 @@ public class RoomListProtocol extends ProtocolClientAdapter {
@Override
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
message.setBody(this.roomManager.status());
message.setBody(this.roomManager.getStatus());
client.push(message);
}

View File

@@ -45,7 +45,7 @@ public class SessionCallProtocol extends ProtocolSessionAdapter {
@Override
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
final String targetId = MapUtils.get(body, Constant.CLIENT_ID);
final Client target = this.clientManager.clients(targetId);
final Client target = this.clientManager.getClients(targetId);
if(target == null) {
log.warn("邀请对象无效:{}", clientId);
return;