This commit is contained in:
acgist
2022-11-23 08:43:57 +08:00
parent 8a19192f1d
commit 46130cc15b
69 changed files with 629 additions and 218 deletions

View File

@@ -345,6 +345,8 @@
### 发布信令5000
Offer/Answer
控制终端推流(服务端拉流)
### 取消发布指令5001
@@ -353,39 +355,38 @@
### 订阅指令5002
Offer/Answer
订阅终端媒体流(终端拉流)
### 取消订阅指令5003
取消订阅终端媒体流(终端取消拉流)
### 暂停指5004
### 候选信5004
IceCandidate
### 暂停信令5004
终端->服务端
暂停发布、订阅(不关媒体流通道)
MCU/SFU模式有效
### 恢复5005
### 恢复5005
终端->服务端
暂停发布、订阅(不关媒体流通道)
MCU/SFU模式有效
### 开启录像5006
### 开启录像信令5006
### 停止录像5007
### 停止录像信令5007
### 配置媒体5008
### 配置媒体信令5008
配置订阅媒体:码率、帧率、分辨率等等
### IceCandidate
### Offer
### Answer
## 测试
```

View File

@@ -18,6 +18,11 @@ import lombok.Setter;
@Schema(title = "终端状态", description = "终端状态")
public class ClientSessionStatus {
public static final String IP = "ip";
public static final String MAC = "mac";
public static final String SIGNAL = "signal";
public static final String BATTERY = "battery";
/**
* 终端标识
*/

View File

@@ -21,6 +21,10 @@ public abstract class ApplicationEventAdapter extends ApplicationEvent {
private static final long serialVersionUID = 1L;
/**
* 终端标识
*/
private String sn;
/**
* 主体
*/
@@ -35,11 +39,16 @@ public abstract class ApplicationEventAdapter extends ApplicationEvent {
private final ClientSession session;
public ApplicationEventAdapter(Message message, ClientSession session) {
this(null, message, session);
this(session.sn(), null, message, session);
}
public ApplicationEventAdapter(Map<?, ?> body, Message message, ClientSession session) {
this(session.sn(), body, message, session);
}
public ApplicationEventAdapter(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(session);
this.sn = sn;
this.body = body;
this.message = message;
this.session = session;

View File

@@ -20,8 +20,8 @@ public class ClientRegisterEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public ClientRegisterEvent(Map<?, ?> body, Message message, ClientSession session) {
super(body, message, session);
public ClientRegisterEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
}

View File

@@ -0,0 +1,27 @@
package com.acgist.taoyao.signal.event.media;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import lombok.Getter;
import lombok.Setter;
/**
* 订阅事件
*
* @author acgist
*/
@Getter
@Setter
public class MediaSubscribeEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public MediaSubscribeEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
}

View File

@@ -6,17 +6,22 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import lombok.Getter;
import lombok.Setter;
/**
* 创建会议事件
*
* @author acgist
*/
@Getter
@Setter
public class MeetingCreateEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public MeetingCreateEvent(Map<?, ?> body, Message message, ClientSession session) {
super(body, message, session);
public MeetingCreateEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
}

View File

@@ -6,17 +6,22 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import lombok.Getter;
import lombok.Setter;
/**
* 进入会议事件
*
* @author acgist
*/
@Getter
@Setter
public class MeetingEnterEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public MeetingEnterEvent(Map<?, ?> body, Message message, ClientSession session) {
super(body, message, session);
public MeetingEnterEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
}

View File

@@ -6,17 +6,22 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import lombok.Getter;
import lombok.Setter;
/**
* 执行命令事件
*
* @author acgist
*/
@Getter
@Setter
public class ScriptEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public ScriptEvent(Map<?, ?> body, Message message, ClientSession session) {
super(body, message, session);
public ScriptEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
}

View File

@@ -2,7 +2,6 @@ package com.acgist.taoyao.signal.listener.client;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -29,11 +28,11 @@ public class ClientCloseListener extends ApplicationListenerAdapter<ClientCloseE
@Override
public void onApplicationEvent(ClientCloseEvent event) {
final ClientSession session = event.getSession();
final String sn = session.sn();
if(StringUtils.isEmpty(sn)) {
if(!session.authorized()) {
// 没有授权终端
return;
}
final String sn = event.getSn();
log.info("关闭终端:{}", sn);
// 广播下线事件
final Message message = this.offlineProtocol.build(
@@ -41,8 +40,7 @@ public class ClientCloseListener extends ApplicationListenerAdapter<ClientCloseE
);
this.clientSessionManager.broadcast(sn, message);
// TODO释放连接
// TODO释放房间
// TODO退出房间
// TODO释放会议
// TODO退出帐号
// TODO注意释放是否考虑没有message非正常的关闭不要立即释放
}

View File

@@ -31,23 +31,24 @@ public class ClientRegisterListener extends ApplicationListenerAdapter<ClientReg
@Async
@Override
public void onApplicationEvent(ClientRegisterEvent event) {
final Map<?, ?> body = event.getBody();
final ClientSession session = event.getSession();
if (!session.authorized()) {
return;
}
final String sn = event.getSn();
final Map<?, ?> body = event.getBody();
// 下发配置
session.push(this.configProtocol.build());
// 修改终端状态
final ClientSessionStatus status = session.status();
status.setSn(session.sn());
status.setIp((String) body.get("ip"));
status.setMac((String) body.get("mac"));
status.setSignal((Integer) body.get("signal"));
status.setBattery((Integer) body.get("battery"));
status.setSn(sn);
status.setIp((String) body.get(ClientSessionStatus.IP));
status.setMac((String) body.get(ClientSessionStatus.MAC));
status.setSignal((Integer) body.get(ClientSessionStatus.SIGNAL));
status.setBattery((Integer) body.get(ClientSessionStatus.BATTERY));
// 广播上线事件
this.clientSessionManager.broadcast(
session.sn(),
sn,
this.onlineProtocol.build(status)
);
}

View File

@@ -24,11 +24,12 @@ public class ScriptListener extends ApplicationListenerAdapter<ScriptEvent> {
@Async
@Override
public void onApplicationEvent(ScriptEvent event) {
final String sn = event.getSn();
final Message message = event.getMessage();
final ClientSession session = event.getSession();
final Map<?, ?> body = event.getBody();
final String script = (String) body.get("script");
log.debug("执行命令:{}", script);
log.debug("执行命令:{}-{}", sn, script);
final String result = this.execute(script);
message.setBody(Map.of("result", result));
session.push(message);

View File

@@ -2,15 +2,15 @@ package com.acgist.taoyao.signal.media;
import java.util.List;
import com.acgist.taoyao.signal.media.router.Router;
import com.acgist.taoyao.signal.media.router.MediaRouter;
public interface RouterManager {
public interface MediaRouterManager {
void bindId();
List<Router> from();
List<MediaRouter> from();
List<Router> to();
List<MediaRouter> to();
void fromRouteTo(String from, String to);

View File

@@ -1,12 +1,12 @@
package com.acgist.taoyao.signal.media.processor;
/**
* 媒体处理混音美颜等等
* 媒体处理混音美颜等等
*
* 处理完成发送订阅者
*
* @author acgist
*/
public interface Processor {
public interface MediaProcessor {
}

View File

@@ -0,0 +1,5 @@
package com.acgist.taoyao.signal.media.processor;
public class ProcessorChain {
}

View File

@@ -0,0 +1,5 @@
package com.acgist.taoyao.signal.media.router;
public class MediaMixRouter {
}

View File

@@ -0,0 +1,26 @@
package com.acgist.taoyao.signal.media.router;
/**
* 媒体流路由器
*
* 发布者->订阅者
*
* @author acgist
*/
public interface MediaRouter {
void from();
void to();
void publisher();
void subscriber();
void stream(String fromOrTo);
void streamFrom(String from);
void streamTo(String to);
}

View File

@@ -1,16 +0,0 @@
package com.acgist.taoyao.signal.media.router;
/**
* 直播会议路由绑定
*
* 发布者->订阅者
*
* @author acgist
*/
public interface Router {
void from();
void to();
}

View File

@@ -7,7 +7,7 @@ package com.acgist.taoyao.signal.media.stream;
*
* @author acgist
*/
public interface ClientMediaHandler {
public interface MediaHandler {
/**
* 打开

View File

@@ -9,17 +9,15 @@ import lombok.extern.slf4j.Slf4j;
/**
* 终端媒体流发布者终端推流
*
* 通过处理器发送给订阅者
*
* @author acgist
*/
@Slf4j
public class ClientMediaPublisher implements ClientMediaHandler {
public class MediaPublisher implements MediaHandler {
/**
* 发布终端媒体流
*/
private Map<String, ClientMediaStream> streams = new ConcurrentHashMap<>();
private Map<String, MediaStream> streams = new ConcurrentHashMap<>();
/**
* 发布
@@ -57,7 +55,7 @@ public class ClientMediaPublisher implements ClientMediaHandler {
@Override
public void resume(String id) {
final ClientMediaStream stream = this.streams.get(id);
final MediaStream stream = this.streams.get(id);
if(stream != null) {
try {
stream.resume();
@@ -69,7 +67,7 @@ public class ClientMediaPublisher implements ClientMediaHandler {
@Override
public void close(String id) {
final ClientMediaStream stream = this.streams.get(id);
final MediaStream stream = this.streams.get(id);
try {
stream.close();
} catch (IOException e) {

View File

@@ -7,7 +7,7 @@ import java.io.IOException;
*
* @author acgist
*/
public interface ClientMediaStream {
public interface MediaStream {
/**
* 终端媒体类型

View File

@@ -5,7 +5,7 @@ package com.acgist.taoyao.signal.media.stream;
*
* @author acgist
*/
public abstract class ClientMediaStreamAdapter<T> implements ClientMediaStream {
public abstract class MediaStreamAdapter<T> implements MediaStream {
/**
* 媒体标识

View File

@@ -8,12 +8,12 @@ import java.util.concurrent.CopyOnWriteArrayList;
*
* @author acgist
*/
public class ClientMediaSubscriber implements ClientMediaHandler {
public class MediaSubscriber implements MediaHandler {
/**
* 订阅终端媒体流
*/
private List<ClientMediaStream> streams = new CopyOnWriteArrayList<>();
private List<MediaStream> streams = new CopyOnWriteArrayList<>();
/**
* 订阅

View File

@@ -30,8 +30,8 @@ public class ClientHeartbeatProtocol extends ProtocolMapAdapter {
session.push(message.cloneWidthoutBody());
// 设置状态
final ClientSessionStatus status = session.status();
status.setSignal((Integer) body.get("signal"));
status.setBattery((Integer) body.get("battery"));
status.setSignal((Integer) body.get(ClientSessionStatus.SIGNAL));
status.setBattery((Integer) body.get(ClientSessionStatus.BATTERY));
status.setLastHeartbeat(LocalDateTime.now());
}

View File

@@ -48,7 +48,7 @@ public class ClientRegisterProtocol extends ProtocolMapAdapter {
// 推送消息
session.push(message.cloneWidthoutBody());
// 发送事件
this.publishEvent(new ClientRegisterEvent(body, message, session));
this.publishEvent(new ClientRegisterEvent(sn, body, message, session));
}
}

View File

@@ -1,5 +1,28 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaSubscribeProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.media.MediaSubscribeEvent;
import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter;
/**
* 订阅指令
*
* @author acgist
*/
public class MediaSubscribeProtocol extends ProtocolMapAdapter {
public static final Integer PID = 5002;
public MediaSubscribeProtocol() {
super(PID, "订阅指令");
}
@Override
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
this.publishEvent(new MediaSubscribeEvent(sn, body, message, session));
}
}

View File

@@ -25,7 +25,7 @@ public class MeetingCreateProtocol extends ProtocolMapAdapter {
@Override
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
this.publishEvent(new MeetingCreateEvent(body, message, session));
this.publishEvent(new MeetingCreateEvent(sn, body, message, session));
}
}

View File

@@ -25,7 +25,7 @@ public class MeetingEnterProtocol extends ProtocolMapAdapter {
@Override
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
this.publishEvent(new MeetingEnterEvent(body, message, session));
this.publishEvent(new MeetingEnterEvent(sn, body, message, session));
}
}

View File

@@ -22,7 +22,7 @@ public class ScriptProtocol extends ProtocolMapAdapter {
@Override
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
this.publishEvent(new ScriptEvent(body, message, session));
this.publishEvent(new ScriptEvent(sn, body, message, session));
}
}