[*] 统一消息模型

This commit is contained in:
acgist
2022-11-11 07:59:36 +08:00
parent ad76cf7fc4
commit e668670da8
57 changed files with 1542 additions and 764 deletions

View File

@@ -1,14 +1,11 @@
package com.acgist.taoyao.signal.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.session.SessionManager;
import com.acgist.taoyao.signal.session.websocket.TaoyaoWebSocket;
import com.acgist.taoyao.signal.session.websocket.WebSocketSignal;
/**
* 信令配置
@@ -18,14 +15,10 @@ import com.acgist.taoyao.signal.session.websocket.TaoyaoWebSocket;
@Configuration
public class SignalAutoConfiguration {
@Autowired
private ProtocolManager eventManager;
@Autowired
private SessionManager sessionManager;
@Autowired
public TaoyaoWebSocket taoyaoWebSocket() {
return new TaoyaoWebSocket(this.eventManager, this.sessionManager);
@Bean
@ConditionalOnMissingBean
public WebSocketSignal webSocketSignal() {
return new WebSocketSignal();
}
@Bean

View File

@@ -1,15 +0,0 @@
package com.acgist.taoyao.signal.event;
import com.acgist.taoyao.signal.protocol.Protocol;
/**
* 事件
* 事件主要负责执行信令
*
* @author acgist
*
* @see Protocol
*/
public interface Event {
}

View File

@@ -1,10 +0,0 @@
package com.acgist.taoyao.signal.event;
/**
* 注册事件
*
* @author acgist
*/
public class EventAdapter {
}

View File

@@ -1,10 +1,31 @@
package com.acgist.taoyao.signal.event.client;
import org.springframework.context.ApplicationEvent;
import com.acgist.taoyao.signal.session.ClientSession;
import lombok.Getter;
import lombok.Setter;
/**
* 注册事件
* 终端注册事件
*
* @author acgist
*/
public class RegisterEvent {
@Getter
@Setter
public class RegisterEvent extends ApplicationEvent {
private static final long serialVersionUID = 1L;
/**
* 会话
*/
private ClientSession session;
public RegisterEvent(ClientSession session) {
super(session);
this.session = session;
}
}

View File

@@ -0,0 +1,21 @@
package com.acgist.taoyao.signal.listener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import com.acgist.taoyao.signal.session.ClientSessionManager;
/**
* 事件监听
*
* @param <E> 事件泛型
*
* @author acgist
*/
public abstract class ApplicationListenerAdapter<E extends ApplicationEvent> implements ApplicationListener<E> {
@Autowired
protected ClientSessionManager clientSessionManager;
}

View File

@@ -0,0 +1,38 @@
package com.acgist.taoyao.signal.listener.client;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.event.client.RegisterEvent;
import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter;
import com.acgist.taoyao.signal.protocol.client.OnlineProtocol;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 终端注册监听
*
* @author acgist
*/
@Component
public class RegisterListener extends ApplicationListenerAdapter<RegisterEvent> {
@Autowired
private OnlineProtocol onlineProtocol;
@Async
@Override
public void onApplicationEvent(RegisterEvent event) {
final ClientSession session = event.getSession();
if(!session.authorized()) {
return;
}
final Message message = this.onlineProtocol.build();
message.setBody(Map.of("sn", session.sn()));
this.clientSessionManager.broadcast(message);
}
}

View File

@@ -1,38 +0,0 @@
package com.acgist.taoyao.signal.message;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* 信令头部
*
* @author acgist
*/
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Header implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 信令来源
*/
private String sn;
/**
* 事件标识
*/
private String event;
/**
* 信令版本
*/
private String version;
}

View File

@@ -1,41 +0,0 @@
package com.acgist.taoyao.signal.message;
import java.io.Serializable;
import com.acgist.taoyao.boot.utils.JSONUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* 信令消息
*
* @author acgist
*/
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 信令头部
*/
private Header header;
/**
* 信令主体
*/
private Object body;
@Override
public String toString() {
return JSONUtils.toJSON(this);
}
}

View File

@@ -1,5 +1,10 @@
package com.acgist.taoyao.signal.protocol;
import org.springframework.context.ApplicationEvent;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 信令协议
*
@@ -7,9 +12,33 @@ package com.acgist.taoyao.signal.protocol;
* 2000~2999终端信令注册、注销、终端列表
* 3000~3999直播信令
* 4000~4999会议信令
* 9999信令异常
*
* @author acgist
*/
public interface Protocol {
/**
* @return 信令协议标识
*/
Integer protocol();
/**
* 处理信令消息
*
* @param sn 终端标识
* @param message 信令消息
* @param session 会话
*
* @return 事件
*/
ApplicationEvent execute(String sn, Message message, ClientSession session);
/**
* 创建信令消息
*
* @return 信令消息
*/
Message build();
}

View File

@@ -1,5 +1,49 @@
package com.acgist.taoyao.signal.protocol;
public class ProtocolAdapter {
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.boot.config.TaoyaoProperties;
import com.acgist.taoyao.boot.model.Header;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.service.IdService;
/**
* 信令协议适配器
*
* @author acgist
*/
public abstract class ProtocolAdapter implements Protocol {
@Autowired
private IdService idService;
@Autowired
protected TaoyaoProperties taoyaoProperties;
/**
* 信令协议标识
*/
protected final Integer protocol;
protected ProtocolAdapter(Integer protocol) {
this.protocol = protocol;
}
@Override
public Integer protocol() {
return this.protocol;
}
@Override
public Message build() {
final Header header = Header.builder()
.v(this.taoyaoProperties.getVersion())
.id(this.idService.id())
.pid(this.protocol)
.build();
final Message message = Message.builder()
.header(header)
.build();
return message;
}
}

View File

@@ -0,0 +1,44 @@
package com.acgist.taoyao.signal.protocol;
import java.util.Map;
import org.springframework.context.ApplicationEvent;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 信令协议Map主体适配器
*
* @author acgist
*/
public abstract class ProtocolBodyMapAdapter extends ProtocolAdapter {
protected ProtocolBodyMapAdapter(Integer protocol) {
super(protocol);
}
@Override
public ApplicationEvent execute(String sn, Message message, ClientSession session) {
final Object body = message.getBody();
if(body instanceof Map<?, ?> map) {
return this.execute(sn, map, message, session);
} else {
throw MessageCodeException.of("信令主体类型错误:" + message);
}
}
/**
* 处理信令消息
*
* @param sn 终端标识
* @param body 消息主体
* @param message 信令消息
* @param session 会话
*
* @return 事件
*/
public abstract ApplicationEvent execute(String sn, Map<?, ?> body, Message message, ClientSession session);
}

View File

@@ -0,0 +1,55 @@
package com.acgist.taoyao.signal.protocol;
import java.util.Map;
import org.springframework.cglib.beans.BeanMap;
import org.springframework.context.ApplicationEvent;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.BeanUtils;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 信令协议对象主体适配器
*
* @author acgist
*/
public abstract class ProtocolBodyObjectAdapter<T> extends ProtocolAdapter {
/**
* 对象类型
*/
private final Class<T> clazz;
protected ProtocolBodyObjectAdapter(Integer protocol, Class<T> clazz) {
super(protocol);
this.clazz = clazz;
}
@Override
public ApplicationEvent execute(String sn, Message message, ClientSession session) {
final Object body = message.getBody();
if(body instanceof Map<?, ?> map) {
final T t = BeanUtils.newInstance(this.clazz);
final BeanMap beanMap = BeanMap.create(t);
beanMap.putAll(map);
return this.execute(sn, t, message, session);
} else {
throw MessageCodeException.of("信令主体类型错误:" + message);
}
}
/**
* 处理信令消息
*
* @param sn 终端标识
* @param body 消息主体
* @param message 信令消息
* @param session 会话
*
* @return 事件
*/
public abstract ApplicationEvent execute(String sn, T body, Message message, ClientSession session);
}

View File

@@ -1,20 +1,98 @@
package com.acgist.taoyao.signal.protocol;
import javax.websocket.Session;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Service;
import com.acgist.taoyao.boot.model.Header;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.JSONUtils;
import com.acgist.taoyao.signal.protocol.client.RegisterProtocol;
import com.acgist.taoyao.signal.session.ClientSession;
import com.acgist.taoyao.signal.session.ClientSessionManager;
import lombok.extern.slf4j.Slf4j;
/**
* 协议管理
*
* @author acgist
*/
@Slf4j
@Service
public class ProtocolManager {
public void execute(Session session, String message) {
// TODO Auto-generated method stub
/**
* 协议映射
*/
private Map<Integer, Protocol> protocolMapping = new ConcurrentHashMap<>();
@Autowired
private ApplicationContext context;
@Autowired
private ClientSessionManager clientSessionManager;
@PostConstruct
public void init() {
final Map<String, Protocol> map = this.context.getBeansOfType(Protocol.class);
map.forEach((k, v) -> {
final Integer protocol = v.protocol();
if(this.protocolMapping.containsKey(protocol)) {
throw MessageCodeException.of("存在重复信令协议:" + protocol);
}
log.info("注册信令协议:{}-{}", protocol, k);
this.protocolMapping.put(protocol, v);
});
}
/**
* 执行信令消息
*
* @param message 信令消息
* @param instance 会话实例
*/
public void execute(String message, AutoCloseable instance) {
if(StringUtils.isEmpty(message)) {
log.warn("消息为空:{}", message);
return;
}
final Message value = JSONUtils.toJava(message, Message.class);
final Header header = value.getHeader();
if(header == null) {
log.warn("消息格式错误(没有头部):{}", message);
return;
}
final String sn = header.getSn();
final Integer pid = header.getPid();
if(sn == null || pid == null) {
log.warn("消息格式错误没有SN或者PID{}", message);
return;
}
final Protocol protocol = this.protocolMapping.get(pid);
if(protocol == null) {
log.warn("不支持的信令协议:{}", message);
return;
}
ApplicationEvent event = null;
final ClientSession session = this.clientSessionManager.session(instance);
if(session != null && protocol instanceof RegisterProtocol) {
event = protocol.execute(sn, value, session);
} else if(session != null) {
event = protocol.execute(sn, value, session);
} else {
log.warn("会话没有权限:{}", message);
}
if(event != null) {
this.context.publishEvent(event);
}
}
}

View File

@@ -0,0 +1,33 @@
package com.acgist.taoyao.signal.protocol.client;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Component;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 关闭信令协议
*
* @author acgist
*/
@Component
public class CloseProtocol extends ProtocolAdapter {
/**
* 信令协议标识
*/
public static final Integer PID = 2001;
public CloseProtocol() {
super(PID);
}
@Override
public ApplicationEvent execute(String sn, Message message, ClientSession session) {
// TODO
return null;
}
}

View File

@@ -0,0 +1,10 @@
package com.acgist.taoyao.signal.protocol.client;
public class OfflineProtocol {
/**
* 信令协议标识
*/
public static final Integer PID = 2003;
}

View File

@@ -0,0 +1,32 @@
package com.acgist.taoyao.signal.protocol.client;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Component;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 上线信令协议
*
* @author acgist
*/
@Component
public class OnlineProtocol extends ProtocolAdapter {
/**
* 信令协议标识
*/
public static final Integer PID = 2002;
public OnlineProtocol() {
super(PID);
}
@Override
public ApplicationEvent execute(String sn, Message message, ClientSession session) {
return null;
}
}

View File

@@ -0,0 +1,54 @@
package com.acgist.taoyao.signal.protocol.client;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Component;
import com.acgist.taoyao.boot.config.SecurityProperties;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCode;
import com.acgist.taoyao.signal.event.client.RegisterEvent;
import com.acgist.taoyao.signal.protocol.ProtocolBodyMapAdapter;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 注册信令协议
*
* @author acgist
*/
@Component
public class RegisterProtocol extends ProtocolBodyMapAdapter {
/**
* 信令协议标识
*/
public static final Integer PID = 2000;
@Autowired
private SecurityProperties securityProperties;
public RegisterProtocol() {
super(PID);
}
@Override
public ApplicationEvent execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
final String username = (String) body.get("username");
final String password = (String) body.get("password");
if(
StringUtils.equals(this.securityProperties.getUsername(), username) &&
StringUtils.equals(this.securityProperties.getPassword(), password)
) {
session.authorize(sn);
message.setCode(MessageCode.CODE_0000);
} else {
message.setCode(MessageCode.CODE_3401);
}
session.push(message);
return new RegisterEvent(session);
}
}

View File

@@ -0,0 +1,32 @@
package com.acgist.taoyao.signal.protocol.system;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Component;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 异常信令协议
*
* @author acgist
*/
@Component
public class ErrorProtocol extends ProtocolAdapter {
/**
* 信令协议标识
*/
public static final Integer PID = 9999;
public ErrorProtocol() {
super(PID);
}
@Override
public ApplicationEvent execute(String sn, Message message, ClientSession session) {
return null;
}
}

View File

@@ -0,0 +1,66 @@
package com.acgist.taoyao.signal.session;
import com.acgist.taoyao.boot.model.Message;
/**
* 会话
*
* @author acgist
*
* @param <T> 会话类型
*/
public interface ClientSession extends AutoCloseable {
/**
* @return 终端标识
*/
String sn();
/**
* 推送消息
*
* @param message 消息
*/
void push(Message message);
/**
* @param timeout 超时时间
*
* @return 是否超时会话
*/
boolean timeout(long timeout);
/**
* 设置授权
*
* @param sn 重点标识
*/
void authorize(String sn);
/**
* @return 是否授权
*/
boolean authorized();
/**
* @param sn 终端标识
*
* @return 终端标识是否匹配
*/
boolean matchSn(String sn);
/**
* @param sn 终端标识
*
* @return 终端标识是否匹配失败
*/
boolean matchNoneSn(String sn);
/**
* @param instance 会话实例
*
* @return 会话实例是否匹配
*/
<M extends AutoCloseable> boolean matchInstance(M instance);
}

View File

@@ -0,0 +1,88 @@
package com.acgist.taoyao.signal.session;
import org.apache.commons.lang3.StringUtils;
/**
* 会话适配器
*
* @author acgist
*/
public abstract class ClientSessionAdapter<T extends AutoCloseable> implements ClientSession {
/**
* 终端标识
*/
protected String sn;
/**
* 进入时间
*/
protected final long time;
/**
* 会话实例
*/
protected final T instance;
/**
* 是否授权
*/
protected boolean authorized;
protected ClientSessionAdapter(T instance) {
this.time = System.currentTimeMillis();
this.instance = instance;
this.authorized = false;
}
@Override
public String sn() {
return this.sn;
}
@Override
public boolean timeout(long timeout) {
return !(this.authorized && System.currentTimeMillis() - this.time <= timeout);
}
@Override
public void authorize(String sn) {
this.sn = sn;
this.authorized = true;
}
@Override
public boolean authorized() {
return this.authorized;
}
@Override
public boolean matchSn(String sn) {
return StringUtils.equals(sn, this.sn);
}
@Override
public boolean matchNoneSn(String sn) {
return !StringUtils.equals(sn, this.sn);
}
@Override
public <I extends AutoCloseable> boolean matchInstance(I instance) {
return instance == this.instance;
}
@Override
public void close() throws Exception {
try {
this.instance.close();
} finally {
// TODO退出房间
// TODO退出帐号
}
}
/**
* @return 会话实例
*/
public T instance() {
return this.instance;
}
}

View File

@@ -0,0 +1,129 @@
package com.acgist.taoyao.signal.session;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.acgist.taoyao.boot.config.TaoyaoProperties;
import com.acgist.taoyao.boot.model.Message;
import lombok.extern.slf4j.Slf4j;
/**
* 会话管理
*
* @author acgist
*/
@Slf4j
@Service
public class ClientSessionManager {
@Autowired
private TaoyaoProperties taoyaoProperties;
/**
* 会话列表
*/
private List<ClientSession> sessions = new CopyOnWriteArrayList<>();
@Scheduled(cron = "${taoyao.scheduled.session:0 * * * * ?}")
public void scheduled() {
this.closeTimeoutSession();
}
/**
* @param session 会话
*/
public void open(ClientSession session) {
this.sessions.add(session);
}
/**
* @param instance 会话实例
*
* @return 会话
*/
public ClientSession session(AutoCloseable instance) {
return this.sessions.stream()
.filter(v -> v.matchInstance(instance))
.findFirst()
.orElse(null);
}
/**
* 单播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(String to, Message message) {
this.sessions.stream().filter(v -> v.matchSn(to)).forEach(v -> {
message.getHeader().setSn(v.sn());
v.push(message);
});
}
/**
* 广播消息
*
* @param message 消息
*/
public void broadcast(Message message) {
this.sessions.forEach(v -> {
message.getHeader().setSn(v.sn());
v.push(message);
});
}
/**
* 广播消息
*
* @param from 发送终端
* @param message 消息
*/
public void broadcast(String from, Message message) {
this.sessions.stream().filter(v -> v.matchNoneSn(from)).forEach(v -> {
message.getHeader().setSn(v.sn());
v.push(message);
});
}
/**
* 关闭会话
*
* @param instance 会话实例
*/
public void close(AutoCloseable instance) {
final ClientSession session = this.session(instance);
try {
if(session != null) {
session.close();
} else {
instance.close();
}
} catch (Exception e) {
log.error("关闭会话异常", e);
} finally {
if(session != null) {
this.sessions.remove(session);
}
}
}
/**
* 定时关闭超时会话
*/
private void closeTimeoutSession() {
log.debug("定时关闭超时会话");
this.sessions.stream()
.filter(v -> v.timeout(this.taoyaoProperties.getTimeout()))
.forEach(v -> {
log.debug("关闭超时会话:{}", v);
this.close(v);
});
}
}

View File

@@ -0,0 +1,12 @@
package com.acgist.taoyao.signal.session;
/**
* 终端状态
*
* @author acgist
*/
public class ClientSessionStatus {
}

View File

@@ -1,5 +0,0 @@
package com.acgist.taoyao.signal.session;
public interface Session {
}

View File

@@ -1,5 +0,0 @@
package com.acgist.taoyao.signal.session;
public class SessionAdapter {
}

View File

@@ -1,162 +0,0 @@
package com.acgist.taoyao.signal.session;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.websocket.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.acgist.taoyao.boot.config.TaoyaoProperties;
import com.acgist.taoyao.signal.message.Message;
import com.acgist.taoyao.signal.session.websocket.SessionWrapper;
import lombok.extern.slf4j.Slf4j;
/**
* 会话管理
*
* @author acgist
*/
@Slf4j
@Service
public class SessionManager {
@Autowired
private TaoyaoProperties taoyaoProperties;
/**
* 没有授权会话
*/
private Map<Session, Long> unauthorized = new ConcurrentHashMap<>();
/**
* 授权会话列表
*/
private List<SessionWrapper> sessions = new CopyOnWriteArrayList<>();
@Scheduled(cron = "${taoyao.scheduled.session:0 * * * * ?}")
public void scheduled() {
this.closeTimeoutSession();
}
/**
* 存入没有授权会话,定时清除没有授权会话。
*
* @param session 会话
*/
public void open(Session session) {
this.unauthorized.put(session, System.currentTimeMillis());
}
/**
* @param session 会话
*
* @return 会话包装器
*/
public SessionWrapper getWrapper(Session session) {
return this.sessions.stream()
.filter(v -> v.matchSession(session))
.findFirst()
.orElse(null);
}
/**
* 认证会话
*
* @param sn 终端标识
* @param session 会话
*/
public void authorized(String sn, Session session) {
this.unauthorized.remove(session);
final SessionWrapper wrapper = new SessionWrapper();
wrapper.setSn(sn);
wrapper.setSession(session);
this.sessions.add(wrapper);
}
/**
* 单播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(String to, Message message) {
this.sessions.stream().filter(v -> v.matchSn(to)).forEach(v -> {
message.getHeader().setSn(v.getSn());
message.getHeader().setVersion(this.taoyaoProperties.getVersion());
v.send(message);
});
}
/**
* 广播消息
*
* @param message 消息
*/
public void broadcast(Message message) {
this.sessions.forEach(v -> {
message.getHeader().setSn(v.getSn());
message.getHeader().setVersion(this.taoyaoProperties.getVersion());
v.send(message);
});
}
/**
* 广播消息
*
* @param from 发送终端
* @param message 消息
*/
public void broadcast(String from, Message message) {
this.sessions.stream().filter(v -> v.matchNoneSn(from)).forEach(v -> {
message.getHeader().setSn(v.getSn());
message.getHeader().setVersion(this.taoyaoProperties.getVersion());
v.send(message);
});
}
/**
* 关闭会话
*
* @param session 会话
*/
public void close(Session session) {
final SessionWrapper wrapper = this.getWrapper(session);
if(wrapper != null) {
// TODO退出房间
// TODO退出帐号
// 移除
this.sessions.remove(wrapper);
}
try {
session.close();
} catch (IOException e) {
log.error("关闭会话异常", e);
}
}
/**
* 定时关闭超时会话
*/
private void closeTimeoutSession() {
log.debug("定时关闭超时会话");
final Iterator<Entry<Session, Long>> iterator = this.unauthorized.entrySet().iterator();
while(iterator.hasNext()) {
final Entry<Session, Long> next = iterator.next();
final Long last = next.getValue();
final Session session = next.getKey();
if(System.currentTimeMillis() - last > this.taoyaoProperties.getTimeout()) {
log.debug("关闭超时会话:{}", session);
this.close(session);
}
}
}
}

View File

@@ -5,6 +5,6 @@ package com.acgist.taoyao.signal.session.socket;
*
* @author acgist
*/
public class TaoyaoSocket {
public class SocketSignal {
}

View File

@@ -1,72 +0,0 @@
package com.acgist.taoyao.signal.session.websocket;
import java.io.IOException;
import javax.websocket.Session;
import com.acgist.taoyao.signal.message.Message;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* 会话包装器
*
* @author acgist
*/
@Slf4j
@Getter
@Setter
public class SessionWrapper {
/**
* 终端帐号
*/
private String sn;
/**
* 会话
*/
private Session session;
/**
* 发送消息
*
* @param message 消息
*/
public void send(Message message) {
try {
this.session.getBasicRemote().sendText(message.toString());
} catch (IOException e) {
log.error("WebSocket发送消息异常{}", message, e);
}
}
/**
* @param sn 终端编号
*
* @return 是否匹配成功
*/
public boolean matchSn(String sn) {
return this.sn != null && this.sn.equals(sn);
}
/**
* @param sn 终端编号
*
* @return 是否匹配失败
*/
public boolean matchNoneSn(String sn) {
return this.sn != null && !this.sn.equals(sn);
}
/**
* @param session 会话
*
* @return 是否匹配成功
*/
public boolean matchSession(Session session) {
return this.session != null && this.session.equals(session);
}
}

View File

@@ -1,60 +0,0 @@
package com.acgist.taoyao.signal.session.websocket;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.session.SessionManager;
import lombok.extern.slf4j.Slf4j;
/**
* WebSocket信令
*
* @author acgist
*/
@Slf4j
@ServerEndpoint(value = "/taoyao/websocket")
public class TaoyaoWebSocket {
private ProtocolManager eventManager;
private SessionManager sessionManager;
public TaoyaoWebSocket(ProtocolManager eventManager, SessionManager sessionManager) {
this.eventManager = eventManager;
this.sessionManager = sessionManager;
}
@OnOpen
public void open(Session session) {
log.debug("会话连接:{}", session);
this.sessionManager.open(session);
}
@OnMessage
public void message(Session session, String message) {
log.debug("会话消息:{}-{}", session, message);
try {
this.eventManager.execute(session, message);
} catch (Exception e) {
log.error("处理会话消息异常", e);
}
}
@OnClose
public void close(Session session) {
log.debug("会话关闭:{}", session);
this.sessionManager.close(session);
}
@OnError
public void error(Session session, Throwable e) {
log.error("会话异常:{}", session, e);
this.close(session);
}
}

View File

@@ -0,0 +1,37 @@
package com.acgist.taoyao.signal.session.websocket;
import java.io.IOException;
import javax.websocket.Session;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.session.ClientSessionAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* WebSocket会话
*
* @author acgist
*/
@Slf4j
@Getter
@Setter
public class WebSocketSession extends ClientSessionAdapter<Session> {
public WebSocketSession(Session instance) {
super(instance);
}
@Override
public void push(Message message) {
try {
this.instance.getBasicRemote().sendText(message.toString());
} catch (IOException e) {
log.error("WebSocket发送消息异常{}", message, e);
}
}
}

View File

@@ -0,0 +1,94 @@
package com.acgist.taoyao.signal.session.websocket;
import java.io.IOException;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.system.ErrorProtocol;
import com.acgist.taoyao.signal.session.ClientSessionManager;
import lombok.extern.slf4j.Slf4j;
/**
* WebSocket信令
*
* @author acgist
*/
@Slf4j
@ServerEndpoint(value = "/websocket.signal")
public class WebSocketSignal {
private static ErrorProtocol errorProtocol;
private static ProtocolManager protocolManager;
private static ClientSessionManager clientSessionManager;
@OnOpen
public void open(Session session) {
log.debug("会话连接:{}", session);
WebSocketSignal.clientSessionManager.open(new WebSocketSession(session));
}
@OnMessage
public void message(Session session, String message) {
log.debug("会话消息:{}-{}", session, message);
try {
WebSocketSignal.protocolManager.execute(message, session);
} catch (Exception e) {
log.error("处理会话消息异常", e);
final Message errorMessage = WebSocketSignal.errorProtocol.build();
errorMessage.setBody(e.getMessage());
this.push(session, errorMessage);
}
}
@OnClose
public void close(Session session) {
log.debug("会话关闭:{}", session);
WebSocketSignal.clientSessionManager.close(session);
}
@OnError
public void error(Session session, Throwable e) {
log.error("会话异常:{}", session, e);
this.close(session);
}
/**
* 推送消息
*
* @param session 会话
* @param message 消息
*/
private void push(Session session, Message message) {
try {
session.getBasicRemote().sendText(message.toString());
} catch (IOException e) {
log.error("推送消息异常:{}", message, e);
}
}
@Autowired
public void setErrorProtocol(ErrorProtocol errorProtocol) {
WebSocketSignal.errorProtocol = errorProtocol;
}
@Autowired
public void setProtocolManager(ProtocolManager protocolManager) {
WebSocketSignal.protocolManager = protocolManager;
}
@Autowired
public void setClientSessionManager(ClientSessionManager clientSessionManager) {
WebSocketSignal.clientSessionManager = clientSessionManager;
}
}