[*] 日常优化

This commit is contained in:
acgist
2023-07-19 21:05:42 +08:00
parent 5fcbce18f4
commit 2c2449fd3a
25 changed files with 211 additions and 195 deletions

View File

@@ -9,33 +9,43 @@ import com.acgist.taoyao.boot.model.Message;
*/
public interface Client extends AutoCloseable {
/**
* @return IP
*/
String ip();
/**
* @return 终端标识
*/
String clientId();
/**
* @return 终端类型
*/
ClientType clientType();
/**
* @return 终端状态
*/
ClientStatus status();
/**
* 推送消息
*
* @param message 消息
*/
void push(Message message);
/**
* @return IP
*/
String getIP();
/**
* @return 终端名称
*/
String getName();
/**
* @return 终端ID
*/
String getClientId();
/**
* @return 终端类型
*/
ClientType getClientType();
/**
* @return 终端状态
*/
ClientStatus getStatus();
/**
* @return 终端实例
*/
AutoCloseable getInstance();
/**
* 推送消息
*
* @param message 消息
*/
void push(Message message);
/**
* 请求消息
*
@@ -48,35 +58,35 @@ public interface Client extends AutoCloseable {
/**
* 响应消息
*
* @param id 消息标识
* @param id 消息ID
* @param message 消息
*
* @return 是否响应消息
*/
boolean response(Long id, Message message);
/**
* @param timeout 超时时间
*
* @return 授权是否超时
*/
boolean timeout(long timeout);
/**
* @return 终端实例
*/
AutoCloseable instance();
/**
* 设置授权
*
* @param clientId 终端标识
*/
void authorize(String clientId);
/**
* @return 是否授权
*/
boolean authorized();
/**
* @return 授权是否超时
*/
boolean timeout();
/**
* 设置授权
*
* @param clientId 终端ID
*/
void authorize(String clientId);
/**
* @return 是否授权
*/
boolean authorized();
/**
* @return 是否没有授权
*/
default boolean unauthorized() {
return !this.authorized();
}
}

View File

@@ -20,72 +20,86 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public abstract class ClientAdapter<T extends AutoCloseable> implements Client {
/**
* IP
*/
protected String ip;
/**
* 进入时间
*/
protected final long time;
/**
/**
* IP
*/
protected String ip;
/**
* 终端ID
*/
protected String clientId;
/**
* 进入时间
*/
protected final long time;
/**
* 超时时间
*/
protected final long timeout;
/**
* 终端标识
*/
protected String clientId;
/**
* 终端实例
*/
protected final T instance;
/**
* 是否授权
*/
protected boolean authorized;
/**
* 终端状态
*/
protected final ClientStatus status;
/**
/**
* 终端实例
*/
protected final T instance;
/**
* 是否授权
*/
protected boolean authorized;
/**
* 终端状态
*/
protected final ClientStatus status;
/**
* 同步消息
*/
protected final Map<Long, Message> requestMessage;
protected ClientAdapter(long timeout, T instance) {
this.time = System.currentTimeMillis();
this.timeout = timeout;
this.instance = instance;
this.authorized = false;
this.status = new ClientStatus();
this.requestMessage = new ConcurrentHashMap<>();
}
@Override
public String ip() {
return this.ip;
}
@Override
public String clientId() {
return this.clientId;
}
@Override
public ClientType clientType() {
return this.status.getClientType();
}
@Override
public ClientStatus status() {
return this.status;
}
@Override
/**
* @param timeout 超时时间
* @param instance 终端实例
*/
protected ClientAdapter(long timeout, T instance) {
this.time = System.currentTimeMillis();
this.timeout = timeout;
this.instance = instance;
this.authorized = false;
this.status = new ClientStatus();
this.requestMessage = new ConcurrentHashMap<>();
}
@Override
public String getIP() {
return this.ip;
}
@Override
public String getName() {
return this.status.getName();
}
@Override
public String getClientId() {
return this.clientId;
}
@Override
public ClientType getClientType() {
return this.status.getClientType();
}
@Override
public ClientStatus getStatus() {
return this.status;
}
@Override
public T getInstance() {
return this.instance;
}
@Override
public Message request(Message request) {
final Header header = request.getHeader();
final Long id = header.getId();
final Long id = header.getId();
this.requestMessage.put(id, request);
synchronized (request) {
this.push(request);
@@ -102,9 +116,9 @@ public abstract class ClientAdapter<T extends AutoCloseable> implements Client {
}
return response;
}
@Override
public boolean response(Long id, Message message) {
@Override
public boolean response(Long id, Message message) {
final Message request = this.requestMessage.get(id);
if (request != null) {
// 同步处理:重新设置响应消息
@@ -117,37 +131,32 @@ public abstract class ClientAdapter<T extends AutoCloseable> implements Client {
} else {
return false;
}
}
@Override
public boolean timeout(long timeout) {
return System.currentTimeMillis() - this.time > timeout;
}
@Override
public T instance() {
return this.instance;
}
@Override
public void authorize(String clientId) {
this.clientId = clientId;
this.authorized = true;
}
@Override
public boolean authorized() {
return this.authorized;
}
@Override
public void close() throws Exception {
this.instance.close();
}
@Override
public String toString() {
return this.getClass().getSimpleName() + " - " + this.ip + " - " + this.clientId;
}
}
@Override
public boolean timeout() {
return System.currentTimeMillis() - this.time > this.timeout;
}
@Override
public void authorize(String clientId) {
this.clientId = clientId;
this.authorized = true;
}
@Override
public boolean authorized() {
return this.authorized;
}
@Override
public void close() throws Exception {
this.instance.close();
}
@Override
public String toString() {
return this.getClass().getSimpleName() + " - " + this.ip + " - " + this.clientId;
}
}

View File

@@ -9,7 +9,6 @@ import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
import com.acgist.taoyao.boot.annotation.Manager;
import com.acgist.taoyao.boot.config.TaoyaoProperties;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.event.client.ClientCloseEvent;
@@ -24,7 +23,6 @@ import lombok.extern.slf4j.Slf4j;
@Manager
public class ClientManager {
private final TaoyaoProperties taoyaoProperties;
private final ApplicationContext applicationContext;
/**
@@ -32,8 +30,7 @@ public class ClientManager {
*/
private final List<Client> clients;
public ClientManager(TaoyaoProperties taoyaoProperties, ApplicationContext applicationContext) {
this.taoyaoProperties = taoyaoProperties;
public ClientManager(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
this.clients = new CopyOnWriteArrayList<>();
}
@@ -60,7 +57,7 @@ public class ClientManager {
*/
public void unicast(String to, Message message) {
this.clients().stream()
.filter(v -> Objects.equals(to, v.clientId()))
.filter(v -> Objects.equals(to, v.getClientId()))
.forEach(v -> v.push(message));
}
@@ -72,7 +69,7 @@ public class ClientManager {
*/
public void unicast(Client to, Message message) {
this.clients().stream()
.filter(v -> v.instance() == to)
.filter(v -> v.getInstance() == to)
.forEach(v -> v.push(message));
}
@@ -95,7 +92,7 @@ public class ClientManager {
*/
public void broadcast(String from, Message message, ClientType ... clientTypes) {
this.clients(clientTypes).stream()
.filter(v -> !Objects.equals(from, v.clientId()))
.filter(v -> !Objects.equals(from, v.getClientId()))
.forEach(v -> v.push(message));
}
@@ -108,7 +105,7 @@ public class ClientManager {
*/
public void broadcast(Client from, Message message, ClientType ... clientTypes) {
this.clients(clientTypes).stream()
.filter(v -> v.instance() != from)
.filter(v -> v.getInstance() != from)
.forEach(v -> v.push(message));
}
@@ -119,7 +116,7 @@ public class ClientManager {
*/
public Client clients(AutoCloseable instance) {
return this.clients.stream()
.filter(v -> v.instance() == instance)
.filter(v -> v.getInstance() == instance)
.findFirst()
.orElse(null);
}
@@ -131,7 +128,7 @@ public class ClientManager {
*/
public Client clients(String clientId) {
return this.clients().stream()
.filter(v -> Objects.equals(clientId, v.clientId()))
.filter(v -> Objects.equals(clientId, v.getClientId()))
.findFirst()
.orElse(null);
}
@@ -144,7 +141,7 @@ public class ClientManager {
public List<Client> clients(ClientType ... clientTypes) {
return this.clients.stream()
.filter(Client::authorized)
.filter(client -> ArrayUtils.isEmpty(clientTypes) || ArrayUtils.contains(clientTypes, client.clientType()))
.filter(client -> ArrayUtils.isEmpty(clientTypes) || ArrayUtils.contains(clientTypes, client.getClientType()))
.toList();
}
@@ -155,7 +152,7 @@ public class ClientManager {
*/
public ClientStatus status(AutoCloseable instance) {
final Client client = this.clients(instance);
return client == null ? null : client.status();
return client == null ? null : client.getStatus();
}
/**
@@ -165,7 +162,7 @@ public class ClientManager {
*/
public ClientStatus status(String clientId) {
final Client client = this.clients(clientId);
return client == null ? null : client.status();
return client == null ? null : client.getStatus();
}
/**
@@ -175,7 +172,7 @@ public class ClientManager {
*/
public List<ClientStatus> status(ClientType ... clientTypes) {
return this.clients(clientTypes).stream()
.map(Client::status)
.map(Client::getStatus)
.toList();
}
@@ -225,8 +222,8 @@ public class ClientManager {
private void closeTimeout() {
final int oldSize = this.clients.size();
this.clients.stream()
.filter(v -> !v.authorized())
.filter(v -> v.timeout(this.taoyaoProperties.getTimeout()))
.filter(v -> v.unauthorized())
.filter(v -> v.timeout())
.forEach(v -> {
log.debug("关闭超时终端:{}", v);
this.close(v);

View File

@@ -37,7 +37,7 @@ public abstract class ClientEventAdapter extends ApplicationEventAdapter {
public ClientEventAdapter(Client client, Message message, Map<String, Object> body) {
super(client, message, body);
this.client = client;
this.clientId = client.clientId();
this.clientId = client.getClientId();
}
}

View File

@@ -88,7 +88,7 @@ public class ClientWrapper implements AutoCloseable {
this.room = room;
this.client = client;
this.roomId = room.getRoomId();
this.clientId = client.clientId();
this.clientId = client.getClientId();
this.producers = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.dataProducers = new ConcurrentHashMap<>();

View File

@@ -293,7 +293,7 @@ public class Recorder {
* @param status 状态
*/
private void updateRecordStatus(boolean status) {
this.clientWrapper.getClient().status().setServerRecording(status);
this.clientWrapper.getClient().getStatus().setServerRecording(status);
}
}

View File

@@ -105,7 +105,7 @@ public class Room extends OperatorAdapter {
*/
public List<ClientStatus> clientStatus() {
return this.clients.keySet().stream()
.map(Client::status)
.map(Client::getStatus)
.toList();
}
@@ -122,7 +122,7 @@ public class Room extends OperatorAdapter {
if(clientWrapper != null) {
return clientWrapper;
}
log.info("终端进入房间:{} - {}", this.roomId, client.clientId());
log.info("终端进入房间:{} - {}", this.roomId, client.getClientId());
clientWrapper = new ClientWrapper(this, client);
this.clients.put(client, clientWrapper);
this.roomStatus.setClientSize(this.roomStatus.getClientSize() + 1);
@@ -139,7 +139,7 @@ public class Room extends OperatorAdapter {
synchronized (this.clients) {
final ClientWrapper wrapper = this.clients.remove(client);
if(wrapper != null) {
log.info("终端离开房间:{} - {}", this.roomId, client.clientId());
log.info("终端离开房间:{} - {}", this.roomId, client.getClientId());
try {
wrapper.close();
} catch (Exception e) {
@@ -179,7 +179,7 @@ public class Room extends OperatorAdapter {
*/
public void unicast(String to, Message message) {
this.clients.keySet().stream()
.filter(v -> Objects.equals(to, v.clientId()))
.filter(v -> Objects.equals(to, v.getClientId()))
.forEach(v -> v.push(message));
}
@@ -200,7 +200,7 @@ public class Room extends OperatorAdapter {
*/
public void broadcast(String from, Message message) {
this.clients.keySet().stream()
.filter(v -> !Objects.equals(from, v.clientId()))
.filter(v -> !Objects.equals(from, v.getClientId()))
.forEach(v -> v.push(message));
}

View File

@@ -91,7 +91,7 @@ public class RoomManager {
*/
public void recreate(Client mediaClient, Message message) {
this.rooms.stream()
.filter(room -> mediaClient.clientId().equals(room.getMediaClient().clientId()))
.filter(room -> mediaClient.getClientId().equals(room.getMediaClient().getClientId()))
.forEach(room -> {
log.info("重建房间:{}", room.getRoomId());
final Message clone = message.cloneWithoutBody();

View File

@@ -84,7 +84,7 @@ public class Transport extends OperatorAdapter {
this.room = room;
this.client = client;
this.roomId = room.getRoomId();
this.clientId = client.clientId();
this.clientId = client.getClientId();
}
/**

View File

@@ -53,7 +53,7 @@ public class Session implements Closeable {
* @param message 消息
*/
public void pushOther(String clientId, Message message) {
if(this.source.clientId().equals(clientId)) {
if(this.source.getClientId().equals(clientId)) {
this.target.push(message);
} else {
this.source.push(message);

View File

@@ -35,7 +35,7 @@ public class SessionManager {
public Session call(Client source, Client target) {
final Session session = new Session(this.idService.buildUuid(), source, target);
this.sessions.put(session.getId(), session);
log.info("创建视频会话:{} - {} - {}", session.getId(), session.getSource().clientId(), session.getTarget().clientId());
log.info("创建视频会话:{} - {} - {}", session.getId(), session.getSource().getClientId(), session.getTarget().getClientId());
return session;
}
@@ -56,7 +56,7 @@ public class SessionManager {
public Session remove(String sessionId) {
final Session session = this.sessions.remove(sessionId);
if(session != null) {
log.info("视频会话关闭:{} - {} - {}", sessionId, session.getSource().clientId(), session.getTarget().clientId());
log.info("视频会话关闭:{} - {} - {}", sessionId, session.getSource().getClientId(), session.getTarget().getClientId());
}
return session;
}

View File

@@ -19,7 +19,7 @@ public abstract class ProtocolClientAdapter extends ProtocolAdapter {
@Override
public void execute(Client client, Message message) {
this.execute(client.clientId(), client.clientType(), client, message, message.body());
this.execute(client.getClientId(), client.getClientType(), client, message, message.body());
}
/**

View File

@@ -120,7 +120,7 @@ public class ProtocolManager {
return;
}
if(log.isDebugEnabled()) {
log.debug("执行信令消息:{} - {}", client.clientId(), content);
log.debug("执行信令消息:{} - {}", client.getClientId(), content);
}
if(protocol instanceof ClientRegisterProtocol) {
protocol.execute(client, message);

View File

@@ -54,7 +54,7 @@ public class ClientAlarmProtocol extends ProtocolClientAdapter {
alarmMessage,
alarmDatetime
);
final ClientStatus status = client.status();
final ClientStatus status = client.getStatus();
status.setAlarming(Boolean.TRUE);
// 业务逻辑
}

View File

@@ -61,11 +61,11 @@ public class ClientCloseProtocol extends ProtocolClientAdapter implements Applic
* @param client 终端
*/
private void close(Client client) {
if(client == null || !client.authorized()) {
if(client == null || client.unauthorized()) {
// 没有授权终端
return;
}
final String clientId = client.clientId();
final String clientId = client.getClientId();
log.info("关闭终端:{}", clientId);
// 释放房间终端
this.roomManager.leave(client);

View File

@@ -53,7 +53,7 @@ public class ClientConfigProtocol extends ProtocolClientAdapter implements Appli
@Override
public void onApplicationEvent(ClientConfigEvent event) {
final Client client = event.getClient();
final ClientType clientType = client.clientType();
final ClientType clientType = client.getClientType();
client.push(this.build(clientType));
}

View File

@@ -46,7 +46,7 @@ public class ClientHeartbeatProtocol extends ProtocolClientAdapter {
@Override
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
client.push(message.cloneWithoutBody());
final ClientStatus status = client.status();
final ClientStatus status = client.getStatus();
status.copy(body);
}

View File

@@ -53,7 +53,7 @@ public class ClientOnlineProtocol extends ProtocolClientAdapter implements Appli
final String clientId = event.getClientId();
this.clientManager.broadcast(
clientId,
this.build(client.status())
this.build(client.getStatus())
);
}

View File

@@ -110,8 +110,8 @@ public class ClientRegisterProtocol extends ProtocolClientAdapter {
* @return 终端状态
*/
private ClientStatus buildStatus(String clientId, ClientType clientType, Client client, Map<String, Object> body) {
final ClientStatus status = client.status();
status.setIp(client.ip());
final ClientStatus status = client.getStatus();
status.setIp(client.getIP());
status.setName(MapUtils.get(body, Constant.NAME));
status.setClientId(clientId);
status.setClientType(clientType);

View File

@@ -64,7 +64,7 @@ public class ControlClientRecordProtocol extends ProtocolControlAdapter implemen
* @param enabled 录像状态
*/
private void updateRecordStatus(Client client, Boolean enabled) {
client.status().setClientRecording(enabled);
client.getStatus().setClientRecording(enabled);
}
}

View File

@@ -63,7 +63,7 @@ public class MediaTransportPlainProtocol extends ProtocolRoomAdapter {
final Map<String, Transport> transports = room.getTransports();
final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID);
// 重写地址
this.rewriteIp(client.ip(), responseBody);
this.rewriteIp(client.getIP(), responseBody);
// 处理逻辑
final ClientWrapper clientWrapper = room.clientWrapper(client);
// 生产者

View File

@@ -64,7 +64,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
final Map<String, Transport> transports = room.getTransports();
final String transportId = MapUtils.get(responseBody, Constant.TRANSPORT_ID);
// 重写地址
this.rewriteIp(client.ip(), responseBody);
this.rewriteIp(client.getIP(), responseBody);
// 处理逻辑
final ClientWrapper clientWrapper = room.clientWrapper(client);
// 消费者

View File

@@ -103,7 +103,7 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter {
message.setBody(Map.of(
Constant.ROOM_ID, room.getRoomId(),
Constant.CLIENT_ID, clientId,
Constant.STATUS, client.status()
Constant.STATUS, client.getStatus()
));
room.broadcast(message);
// 进入房间事件

View File

@@ -47,7 +47,7 @@ public class RoomLeaveProtocol extends ProtocolRoomAdapter implements Applicatio
final Client client = event.getClient();
final Map<String, String> body = Map.of(
Constant.ROOM_ID, room.getRoomId(),
Constant.CLIENT_ID, client.clientId()
Constant.CLIENT_ID, client.getClientId()
);
room.broadcast(client, this.build(body));
}

View File

@@ -52,15 +52,15 @@ public class SessionCallProtocol extends ProtocolSessionAdapter {
}
final Session session = this.sessionManager.call(client, target);
message.setBody(Map.of(
Constant.NAME, target.status().getName(),
Constant.CLIENT_ID, target.clientId(),
Constant.NAME, target.getName(),
Constant.CLIENT_ID, target.getClientId(),
Constant.SESSION_ID, session.getId()
));
client.push(message);
final Message callMessage = message.cloneWithoutBody();
callMessage.setBody(Map.of(
Constant.NAME, client.status().getName(),
Constant.CLIENT_ID, client.clientId(),
Constant.NAME, client.getName(),
Constant.CLIENT_ID, client.getClientId(),
Constant.SESSION_ID, session.getId(),
Constant.AUDIO, MapUtils.get(body, Constant.AUDIO, true),
Constant.VIDEO, MapUtils.get(body, Constant.VIDEO, true)