This commit is contained in:
acgist
2022-11-10 07:24:15 +08:00
parent ec9a0ff37a
commit 20e5114f2e
70 changed files with 3096 additions and 75 deletions

View File

@@ -0,0 +1,37 @@
package com.acgist.taoyao.signal.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.session.SessionManager;
import com.acgist.taoyao.signal.session.websocket.TaoyaoWebSocket;
/**
* 信令配置
*
* @author acgist
*/
@Configuration
public class SignalAutoConfiguration {
@Autowired
private ProtocolManager eventManager;
@Autowired
private SessionManager sessionManager;
@Autowired
public TaoyaoWebSocket taoyaoWebSocket() {
return new TaoyaoWebSocket(this.eventManager, this.sessionManager);
}
@Bean
@ConditionalOnMissingBean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

View File

@@ -0,0 +1,15 @@
package com.acgist.taoyao.signal.event;
import com.acgist.taoyao.signal.protocol.Protocol;
/**
* 事件
* 事件主要负责执行信令
*
* @author acgist
*
* @see Protocol
*/
public interface Event {
}

View File

@@ -0,0 +1,10 @@
package com.acgist.taoyao.signal.event;
/**
* 注册事件
*
* @author acgist
*/
public class EventAdapter {
}

View File

@@ -0,0 +1,10 @@
package com.acgist.taoyao.signal.event.client;
/**
* 注册事件
*
* @author acgist
*/
public class RegisterEvent {
}

View File

@@ -0,0 +1,38 @@
package com.acgist.taoyao.signal.message;
import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* 信令头部
*
* @author acgist
*/
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Header implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 信令来源
*/
private String sn;
/**
* 事件标识
*/
private String event;
/**
* 信令版本
*/
private String version;
}

View File

@@ -0,0 +1,41 @@
package com.acgist.taoyao.signal.message;
import java.io.Serializable;
import com.acgist.taoyao.boot.utils.JSONUtils;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
/**
* 信令消息
*
* @author acgist
*/
@Getter
@Setter
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Message implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 信令头部
*/
private Header header;
/**
* 信令主体
*/
private Object body;
@Override
public String toString() {
return JSONUtils.toJSON(this);
}
}

View File

@@ -0,0 +1,15 @@
package com.acgist.taoyao.signal.protocol;
/**
* 信令协议
*
* 1000~1999系统信令关机
* 2000~2999终端信令注册、注销、终端列表
* 3000~3999直播信令
* 4000~4999会议信令
*
* @author acgist
*/
public interface Protocol {
}

View File

@@ -0,0 +1,5 @@
package com.acgist.taoyao.signal.protocol;
public class ProtocolAdapter {
}

View File

@@ -0,0 +1,20 @@
package com.acgist.taoyao.signal.protocol;
import javax.websocket.Session;
import org.springframework.stereotype.Service;
/**
* 协议管理
*
* @author acgist
*/
@Service
public class ProtocolManager {
public void execute(Session session, String message) {
// TODO Auto-generated method stub
}
}

View File

@@ -0,0 +1,5 @@
package com.acgist.taoyao.signal.session;
public interface Session {
}

View File

@@ -0,0 +1,5 @@
package com.acgist.taoyao.signal.session;
public class SessionAdapter {
}

View File

@@ -0,0 +1,162 @@
package com.acgist.taoyao.signal.session;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.websocket.Session;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import com.acgist.taoyao.boot.config.TaoyaoProperties;
import com.acgist.taoyao.signal.message.Message;
import com.acgist.taoyao.signal.session.websocket.SessionWrapper;
import lombok.extern.slf4j.Slf4j;
/**
* 会话管理
*
* @author acgist
*/
@Slf4j
@Service
public class SessionManager {
@Autowired
private TaoyaoProperties taoyaoProperties;
/**
* 没有授权会话
*/
private Map<Session, Long> unauthorized = new ConcurrentHashMap<>();
/**
* 授权会话列表
*/
private List<SessionWrapper> sessions = new CopyOnWriteArrayList<>();
@Scheduled(cron = "${taoyao.scheduled.session:0 * * * * ?}")
public void scheduled() {
this.closeTimeoutSession();
}
/**
* 存入没有授权会话,定时清除没有授权会话。
*
* @param session 会话
*/
public void open(Session session) {
this.unauthorized.put(session, System.currentTimeMillis());
}
/**
* @param session 会话
*
* @return 会话包装器
*/
public SessionWrapper getWrapper(Session session) {
return this.sessions.stream()
.filter(v -> v.matchSession(session))
.findFirst()
.orElse(null);
}
/**
* 认证会话
*
* @param sn 终端标识
* @param session 会话
*/
public void authorized(String sn, Session session) {
this.unauthorized.remove(session);
final SessionWrapper wrapper = new SessionWrapper();
wrapper.setSn(sn);
wrapper.setSession(session);
this.sessions.add(wrapper);
}
/**
* 单播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(String to, Message message) {
this.sessions.stream().filter(v -> v.matchSn(to)).forEach(v -> {
message.getHeader().setSn(v.getSn());
message.getHeader().setVersion(this.taoyaoProperties.getVersion());
v.send(message);
});
}
/**
* 广播消息
*
* @param message 消息
*/
public void broadcast(Message message) {
this.sessions.forEach(v -> {
message.getHeader().setSn(v.getSn());
message.getHeader().setVersion(this.taoyaoProperties.getVersion());
v.send(message);
});
}
/**
* 广播消息
*
* @param from 发送终端
* @param message 消息
*/
public void broadcast(String from, Message message) {
this.sessions.stream().filter(v -> v.matchNoneSn(from)).forEach(v -> {
message.getHeader().setSn(v.getSn());
message.getHeader().setVersion(this.taoyaoProperties.getVersion());
v.send(message);
});
}
/**
* 关闭会话
*
* @param session 会话
*/
public void close(Session session) {
final SessionWrapper wrapper = this.getWrapper(session);
if(wrapper != null) {
// TODO退出房间
// TODO退出帐号
// 移除
this.sessions.remove(wrapper);
}
try {
session.close();
} catch (IOException e) {
log.error("关闭会话异常", e);
}
}
/**
* 定时关闭超时会话
*/
private void closeTimeoutSession() {
log.debug("定时关闭超时会话");
final Iterator<Entry<Session, Long>> iterator = this.unauthorized.entrySet().iterator();
while(iterator.hasNext()) {
final Entry<Session, Long> next = iterator.next();
final Long last = next.getValue();
final Session session = next.getKey();
if(System.currentTimeMillis() - last > this.taoyaoProperties.getTimeout()) {
log.debug("关闭超时会话:{}", session);
this.close(session);
}
}
}
}

View File

@@ -0,0 +1,10 @@
package com.acgist.taoyao.signal.session.socket;
/**
* Socket信令
*
* @author acgist
*/
public class TaoyaoSocket {
}

View File

@@ -0,0 +1,72 @@
package com.acgist.taoyao.signal.session.websocket;
import java.io.IOException;
import javax.websocket.Session;
import com.acgist.taoyao.signal.message.Message;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* 会话包装器
*
* @author acgist
*/
@Slf4j
@Getter
@Setter
public class SessionWrapper {
/**
* 终端帐号
*/
private String sn;
/**
* 会话
*/
private Session session;
/**
* 发送消息
*
* @param message 消息
*/
public void send(Message message) {
try {
this.session.getBasicRemote().sendText(message.toString());
} catch (IOException e) {
log.error("WebSocket发送消息异常{}", message, e);
}
}
/**
* @param sn 终端编号
*
* @return 是否匹配成功
*/
public boolean matchSn(String sn) {
return this.sn != null && this.sn.equals(sn);
}
/**
* @param sn 终端编号
*
* @return 是否匹配失败
*/
public boolean matchNoneSn(String sn) {
return this.sn != null && !this.sn.equals(sn);
}
/**
* @param session 会话
*
* @return 是否匹配成功
*/
public boolean matchSession(Session session) {
return this.session != null && this.session.equals(session);
}
}

View File

@@ -0,0 +1,60 @@
package com.acgist.taoyao.signal.session.websocket;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.session.SessionManager;
import lombok.extern.slf4j.Slf4j;
/**
* WebSocket信令
*
* @author acgist
*/
@Slf4j
@ServerEndpoint(value = "/taoyao/websocket")
public class TaoyaoWebSocket {
private ProtocolManager eventManager;
private SessionManager sessionManager;
public TaoyaoWebSocket(ProtocolManager eventManager, SessionManager sessionManager) {
this.eventManager = eventManager;
this.sessionManager = sessionManager;
}
@OnOpen
public void open(Session session) {
log.debug("会话连接:{}", session);
this.sessionManager.open(session);
}
@OnMessage
public void message(Session session, String message) {
log.debug("会话消息:{}-{}", session, message);
try {
this.eventManager.execute(session, message);
} catch (Exception e) {
log.error("处理会话消息异常", e);
}
}
@OnClose
public void close(Session session) {
log.debug("会话关闭:{}", session);
this.sessionManager.close(session);
}
@OnError
public void error(Session session, Throwable e) {
log.error("会话异常:{}", session, e);
this.close(session);
}
}

View File

@@ -0,0 +1,2 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.acgist.taoyao.signal.config.SignalAutoConfiguration