[+] 结构调整

This commit is contained in:
acgist
2022-11-11 19:28:02 +08:00
parent e668670da8
commit 605e0fbbe7
46 changed files with 841 additions and 215 deletions

View File

@@ -22,17 +22,19 @@ public class RegisterListener extends ApplicationListenerAdapter<RegisterEvent>
@Autowired
private OnlineProtocol onlineProtocol;
@Async
@Override
public void onApplicationEvent(RegisterEvent event) {
final ClientSession session = event.getSession();
if(!session.authorized()) {
if (!session.authorized()) {
return;
}
final Message message = this.onlineProtocol.build();
message.setBody(Map.of("sn", session.sn()));
this.clientSessionManager.broadcast(message);
this.clientSessionManager.broadcast(session.sn(), message);
// TODOip等等
// TODO重新注册上来需要掉线重连
}
}

View File

@@ -0,0 +1,30 @@
package com.acgist.taoyao.signal.media;
/**
* 终端媒体操作
*
* @author acgist
*/
public interface ClientMediaHandler {
/**
* 打开
*/
void open(String id);
/**
* 暂停
*/
void pause();
/**
* 恢复
*/
void resume();
/**
* 关闭
*/
void close();
}

View File

@@ -0,0 +1,14 @@
package com.acgist.taoyao.signal.media;
/**
* 终端推流
*/
public class ClientMediaPublisher {
public void publish(String id) {
}
public void unpublish(String id) {
}
}

View File

@@ -0,0 +1,40 @@
package com.acgist.taoyao.signal.media;
/**
* 终端媒体订阅者(终端拉流)
*
* @author acgist
*/
public class ClientMediaSubscriber implements ClientMediaHandler {
public void subscribe(String id) {
}
public void unsubscribe(String id) {
}
@Override
public void open(String id) {
// TODO Auto-generated method stub
}
@Override
public void pause() {
// TODO Auto-generated method stub
}
@Override
public void resume() {
// TODO Auto-generated method stub
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}

View File

@@ -0,0 +1,10 @@
package com.acgist.taoyao.signal.media.router;
/**
* 终端媒体流路由(推流->拉流)
*
* @author acgist
*/
public interface ClientMediaStreamRouter {
}

View File

@@ -0,0 +1,10 @@
package com.acgist.taoyao.signal.media.router;
import com.acgist.taoyao.signal.media.stream.ClientMediaStream;
public class ClientMediaStreamRouterAdapter {
ClientMediaStream source;
ClientMediaStream target;
}

View File

@@ -0,0 +1,89 @@
package com.acgist.taoyao.signal.media.stream;
/**
* 终端媒体流
*
* @author acgist
*/
public interface ClientMediaStream {
/**
* 终端媒体类型
*
* @author acgist
*/
public enum Type {
/**
* 音频
*/
AUDIO,
/**
* 视频
*/
VIDEO;
}
/**
* 终端媒体流状态
*
* @author acgist
*/
public enum Status {
/**
* 没有激活
*/
IDLE,
/**
* 已经激活
*/
BUSY,
/**
* 已经暂停
*/
PAUSE,
/**
* 已经关闭
*/
CLOSE;
}
/**
* @return 终端媒体流ID
*/
String id();
/**
* 打开终端媒体流
*/
void open();
/**
* 暂停终端媒体流
*/
void pause();
/**
* 恢复终端媒体流
*/
void resume();
/**
* 关闭终端媒体流
*/
void close();
/**
* @return 终端媒体流类型
*/
Type type();
/**
* @return 终端媒体流状态
*/
Status status();
}

View File

@@ -0,0 +1,19 @@
package com.acgist.taoyao.signal.media.stream;
/**
* 终端媒体流适配器
*
* @author acgist
*/
public abstract class ClientMediaStreamAdapter<T> implements ClientMediaStream {
/**
* 媒体标识
*/
private String id;
/**
* 真实流
*/
protected T stream;
}

View File

@@ -60,6 +60,7 @@ public class ProtocolManager {
* @param instance 会话实例
*/
public void execute(String message, AutoCloseable instance) {
log.debug("执行信令消息:{}", message);
if(StringUtils.isEmpty(message)) {
log.warn("消息为空:{}", message);
return;

View File

@@ -1,6 +1,8 @@
package com.acgist.taoyao.signal.session;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.media.ClientMediaPublisher;
import com.acgist.taoyao.signal.media.ClientMediaSubscriber;
/**
* 会话
@@ -15,6 +17,21 @@ public interface ClientSession extends AutoCloseable {
* @return 终端标识
*/
String sn();
/**
* @return 终端状态
*/
ClientSessionStatus status();
/**
* @return 终端媒体发布者
*/
ClientMediaPublisher publisher();
/**
* @return 终端媒体订阅者
*/
ClientMediaSubscriber subscriber();
/**
* 推送消息

View File

@@ -2,6 +2,9 @@ package com.acgist.taoyao.signal.session;
import org.apache.commons.lang3.StringUtils;
import com.acgist.taoyao.signal.media.ClientMediaPublisher;
import com.acgist.taoyao.signal.media.ClientMediaSubscriber;
/**
* 会话适配器
*
@@ -25,11 +28,26 @@ public abstract class ClientSessionAdapter<T extends AutoCloseable> implements C
* 是否授权
*/
protected boolean authorized;
/**
* 终端状态
*/
protected ClientSessionStatus status;
/**
* 终端媒体发布者
*/
protected ClientMediaPublisher publisher;
/**
* 终端媒体订阅者
*/
protected ClientMediaSubscriber subscriber;
protected ClientSessionAdapter(T instance) {
this.time = System.currentTimeMillis();
this.instance = instance;
this.authorized = false;
this.status = new ClientSessionStatus();
this.publisher = new ClientMediaPublisher();
this.subscriber = new ClientMediaSubscriber();
}
@Override
@@ -37,6 +55,21 @@ public abstract class ClientSessionAdapter<T extends AutoCloseable> implements C
return this.sn;
}
@Override
public ClientSessionStatus status() {
return this.status;
}
@Override
public ClientMediaPublisher publisher() {
return this.publisher;
}
@Override
public ClientMediaSubscriber subscriber() {
return this.subscriber;
}
@Override
public boolean timeout(long timeout) {
return !(this.authorized && System.currentTimeMillis() - this.time <= timeout);

View File

@@ -101,9 +101,8 @@ public class ClientSessionManager {
try {
if(session != null) {
session.close();
} else {
instance.close();
}
instance.close();
} catch (Exception e) {
log.error("关闭会话异常", e);
} finally {

View File

@@ -1,12 +1,36 @@
package com.acgist.taoyao.signal.session;
import lombok.Getter;
import lombok.Setter;
/**
* 终端状态
*
* @author acgist
*/
@Getter
@Setter
public class ClientSessionStatus {
/**
* 终端标识
*/
private String sn;
/**
* IP
*/
private String ip;
/**
* MAC
*/
private String mac;
/**
* 信号强度0~100
*/
private Integer signal = 0;
/**
* 电量0~100
*/
private Integer battery = 0;
}

View File

@@ -0,0 +1,36 @@
package com.acgist.taoyao.signal.session.socket;
import java.io.IOException;
import java.net.Socket;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.session.ClientSessionAdapter;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* Socket会话
*
* @author acgist
*/
@Slf4j
@Getter
@Setter
public class SocketSession extends ClientSessionAdapter<Socket> {
public SocketSession(Socket instance) {
super(instance);
}
@Override
public void push(Message message) {
try {
this.instance.getOutputStream().write(message.toString().getBytes());
} catch (IOException e) {
log.error("Socket发送消息异常{}", message, e);
}
}
}

View File

@@ -1,7 +1,5 @@
package com.acgist.taoyao.signal.session.websocket;
import java.io.IOException;
import javax.websocket.Session;
import com.acgist.taoyao.boot.model.Message;
@@ -28,8 +26,12 @@ public class WebSocketSession extends ClientSessionAdapter<Session> {
@Override
public void push(Message message) {
try {
this.instance.getBasicRemote().sendText(message.toString());
} catch (IOException e) {
if(this.instance.isOpen()) {
this.instance.getBasicRemote().sendText(message.toString());
} else {
log.error("会话已经关闭:{}", this.instance);
}
} catch (Exception e) {
log.error("WebSocket发送消息异常{}", message, e);
}
}

View File

@@ -1,7 +1,5 @@
package com.acgist.taoyao.signal.session.websocket;
import java.io.IOException;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
@@ -70,8 +68,12 @@ public class WebSocketSignal {
*/
private void push(Session session, Message message) {
try {
session.getBasicRemote().sendText(message.toString());
} catch (IOException e) {
if(session.isOpen()) {
session.getBasicRemote().sendText(message.toString());
} else {
log.error("会话已经关闭:{}", session);
}
} catch (Exception e) {
log.error("推送消息异常:{}", message, e);
}
}