[+] 新增client
This commit is contained in:
@@ -379,6 +379,18 @@ Moon模式有效
|
||||
|
||||
配置订阅媒体:码率、帧率、分辨率等等
|
||||
|
||||
### 终端
|
||||
|
||||
#### 授权信息(6000)
|
||||
|
||||
```
|
||||
```
|
||||
|
||||
### 路由
|
||||
|
||||
### 传输
|
||||
|
||||
|
||||
## 测试
|
||||
|
||||
```
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.acgist.taoyao.media.listener;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.EventListener;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.signal.event.media.MediaAnswerEvent;
|
||||
import com.acgist.taoyao.signal.listener.MediaListenerAdapter;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Answer监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
@EventListener
|
||||
public class MediaAnswerListener extends MediaListenerAdapter<MediaAnswerEvent> {
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MediaAnswerEvent event) {
|
||||
final String sn = event.getSn();
|
||||
final String to = event.getTo();
|
||||
if(sn.equals(to)) {
|
||||
log.debug("忽略Answer消息(相同终端):{}-{}", sn, to);
|
||||
return;
|
||||
}
|
||||
final Message message = event.getMessage();
|
||||
final Map<String, Object> mergeBody = event.mergeBody();
|
||||
mergeBody.put("from", sn);
|
||||
this.clientSessionManager.unicast(to, message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.acgist.taoyao.media.listener;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.EventListener;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.signal.event.media.MediaCandidateEvent;
|
||||
import com.acgist.taoyao.signal.listener.MediaListenerAdapter;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 候选监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
@EventListener
|
||||
public class MediaCandidateListener extends MediaListenerAdapter<MediaCandidateEvent> {
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MediaCandidateEvent event) {
|
||||
final String sn = event.getSn();
|
||||
final String to = event.getTo();
|
||||
if(sn.equals(to)) {
|
||||
log.debug("忽略候选消息(相同终端):{}-{}", sn, to);
|
||||
return;
|
||||
}
|
||||
final Message message = event.getMessage();
|
||||
final Map<String, Object> mergeBody = event.mergeBody();
|
||||
mergeBody.put("from", sn);
|
||||
this.clientSessionManager.unicast(to, message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.acgist.taoyao.media.listener;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.EventListener;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.signal.event.media.MediaOfferEvent;
|
||||
import com.acgist.taoyao.signal.listener.MediaListenerAdapter;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Offer监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
@EventListener
|
||||
public class MediaOfferListener extends MediaListenerAdapter<MediaOfferEvent> {
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MediaOfferEvent event) {
|
||||
final String sn = event.getSn();
|
||||
final String to = event.getTo();
|
||||
if(sn.equals(to)) {
|
||||
log.debug("忽略Offer消息(相同终端):{}-{}", sn, to);
|
||||
return;
|
||||
}
|
||||
final Message message = event.getMessage();
|
||||
final Map<String, Object> mergeBody = event.mergeBody();
|
||||
mergeBody.put("from", sn);
|
||||
this.clientSessionManager.unicast(to, message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.acgist.taoyao.media.listener;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.EventListener;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.signal.event.media.MediaPublishEvent;
|
||||
import com.acgist.taoyao.signal.listener.MediaListenerAdapter;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 发布监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
@EventListener
|
||||
public class MediaPublishListener extends MediaListenerAdapter<MediaPublishEvent> {
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MediaPublishEvent event) {
|
||||
final String sn = event.getSn();
|
||||
final String to = event.getTo();
|
||||
if(sn.equals(to)) {
|
||||
log.debug("忽略发布消息(相同终端):{}-{}", sn, to);
|
||||
return;
|
||||
}
|
||||
final Message message = event.getMessage();
|
||||
final Map<String, Object> mergeBody = event.mergeBody();
|
||||
mergeBody.put("from", sn);
|
||||
this.clientSessionManager.unicast(to, message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.acgist.taoyao.media.listener;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.EventListener;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.signal.event.media.MediaSubscribeEvent;
|
||||
import com.acgist.taoyao.signal.listener.MediaListenerAdapter;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 订阅监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
@EventListener
|
||||
public class MediaSubscribeListener extends MediaListenerAdapter<MediaSubscribeEvent> {
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MediaSubscribeEvent event) {
|
||||
final String sn = event.getSn();
|
||||
final String to = event.getTo();
|
||||
if(sn.equals(to)) {
|
||||
log.debug("忽略订阅消息(相同终端):{}-{}", sn, to);
|
||||
return;
|
||||
}
|
||||
final Message message = event.getMessage();
|
||||
final Map<String, Object> mergeBody = event.mergeBody();
|
||||
mergeBody.put("from", sn);
|
||||
this.clientSessionManager.unicast(to, message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
package com.acgist.taoyao.media.meeting;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* 会议
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
@Schema(title = "会议", description = "会议")
|
||||
public class Meeting {
|
||||
|
||||
/**
|
||||
* 会议标识
|
||||
*/
|
||||
@Schema(title = "会议标识", description = "会议标识")
|
||||
private String id;
|
||||
/**
|
||||
* 会议名称
|
||||
*/
|
||||
@Schema(title = "会议名称", description = "会议名称")
|
||||
private String name;
|
||||
/**
|
||||
* 会议密码
|
||||
*/
|
||||
@Schema(title = "会议密码", description = "会议密码")
|
||||
private String password;
|
||||
/**
|
||||
* 终端会话标识列表
|
||||
*/
|
||||
@Schema(title = "终端会话标识列表", description = "终端会话标识列表")
|
||||
private List<String> sns;
|
||||
/**
|
||||
* 创建终端标识
|
||||
*/
|
||||
@Schema(title = "创建终端标识", description = "创建终端标识")
|
||||
private String creator;
|
||||
|
||||
/**
|
||||
* 新增终端会话标识
|
||||
*
|
||||
* @param sn 终端会话标识
|
||||
*/
|
||||
public void addSn(String sn) {
|
||||
synchronized (this.sns) {
|
||||
if(this.sns.contains(sn)) {
|
||||
return;
|
||||
}
|
||||
this.sns.add(sn);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
package com.acgist.taoyao.media.meeting;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
|
||||
import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter;
|
||||
|
||||
/**
|
||||
* 会议事件监听适配器
|
||||
*
|
||||
* @param <E> 事件泛型
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
public abstract class MeetingListenerAdapter<E extends ApplicationEventAdapter> extends ApplicationListenerAdapter<E> {
|
||||
|
||||
@Autowired
|
||||
protected MeetingManager meetingManager;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,77 @@
|
||||
package com.acgist.taoyao.media.meeting;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.Manager;
|
||||
import com.acgist.taoyao.boot.service.IdService;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* 会议管理
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
@Manager
|
||||
public class MeetingManager {
|
||||
|
||||
@Autowired
|
||||
private IdService idService;
|
||||
|
||||
/**
|
||||
* 会议列表
|
||||
*/
|
||||
private List<Meeting> meetings = new CopyOnWriteArrayList<>();
|
||||
|
||||
/**
|
||||
* @return 所有会议列表
|
||||
*/
|
||||
public List<Meeting> meetings() {
|
||||
return this.meetings;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param id 会议标识
|
||||
*
|
||||
* @return 会议信息
|
||||
*/
|
||||
public Meeting meeting(String id) {
|
||||
return this.meetings.stream()
|
||||
.filter(v -> v.getId().equals(id))
|
||||
.findFirst()
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param id 会议标识
|
||||
*
|
||||
* @return 会议所有终端标识
|
||||
*/
|
||||
public List<String> sns(String id) {
|
||||
final Meeting meeting = this.meeting(id);
|
||||
return meeting == null ? List.of() : meeting.getSns();
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建会议
|
||||
*
|
||||
* @param sn 创建会议终端标识
|
||||
*
|
||||
* @return 会议信息
|
||||
*/
|
||||
public Meeting create(String sn) {
|
||||
final Meeting meeting = new Meeting();
|
||||
meeting.setId(this.idService.buildIdToString());
|
||||
meeting.setSns(new CopyOnWriteArrayList<>());
|
||||
meeting.setCreator(sn);
|
||||
meeting.addSn(sn);
|
||||
this.meetings.add(meeting);
|
||||
log.info("创建会议:{}", meeting.getId());
|
||||
return meeting;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package com.acgist.taoyao.media.meeting.controller;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.PathVariable;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.media.meeting.Meeting;
|
||||
import com.acgist.taoyao.media.meeting.MeetingManager;
|
||||
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.media.Content;
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
|
||||
/**
|
||||
* 会议
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Tag(name = "会议", description = "会议管理")
|
||||
@RestController
|
||||
@RequestMapping("/meeting")
|
||||
public class MeetingController {
|
||||
|
||||
@Autowired
|
||||
private MeetingManager meetingManager;
|
||||
|
||||
@Operation(summary = "会议列表", description = "会议列表")
|
||||
@GetMapping("/list")
|
||||
@ApiResponse(content = @Content(schema = @Schema(implementation = Meeting.class)))
|
||||
public Message list() {
|
||||
return Message.success(this.meetingManager.meetings());
|
||||
}
|
||||
|
||||
@Operation(summary = "会议状态", description = "会议状态")
|
||||
@GetMapping("/status/{id}")
|
||||
public Message status(@PathVariable String id) {
|
||||
return Message.success(this.meetingManager.meeting(id));
|
||||
}
|
||||
|
||||
@Operation(summary = "会议终端列表", description = "会议终端列表")
|
||||
@GetMapping("/list/client/{id}")
|
||||
public Message listClient(@PathVariable String id) {
|
||||
return Message.success(this.meetingManager.sns(id));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.acgist.taoyao.media.meeting.listener;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.EventListener;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.media.meeting.Meeting;
|
||||
import com.acgist.taoyao.media.meeting.MeetingListenerAdapter;
|
||||
import com.acgist.taoyao.signal.event.meeting.MeetingCreateEvent;
|
||||
|
||||
/**
|
||||
* 创建会议监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@EventListener
|
||||
public class MeetingCreateListener extends MeetingListenerAdapter<MeetingCreateEvent> {
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MeetingCreateEvent event) {
|
||||
final Meeting meeting = this.meetingManager.create(event.getSn());
|
||||
final Message message = event.getMessage();
|
||||
message.setBody(Map.of("id", meeting.getId()));
|
||||
this.clientSessionManager.broadcast(message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package com.acgist.taoyao.media.meeting.listener;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.EventListener;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.boot.model.MessageCode;
|
||||
import com.acgist.taoyao.boot.model.MessageCodeException;
|
||||
import com.acgist.taoyao.media.meeting.Meeting;
|
||||
import com.acgist.taoyao.media.meeting.MeetingListenerAdapter;
|
||||
import com.acgist.taoyao.signal.event.meeting.MeetingEnterEvent;
|
||||
|
||||
/**
|
||||
* 进入会议监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@EventListener
|
||||
public class MeetingEnterListener extends MeetingListenerAdapter<MeetingEnterEvent> {
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(MeetingEnterEvent event) {
|
||||
final String sn = event.getSn();
|
||||
final String id = event.get("id");
|
||||
final Meeting meeting = this.meetingManager.meeting(id);
|
||||
if(meeting == null) {
|
||||
throw MessageCodeException.of(MessageCode.CODE_3400, "无效会议");
|
||||
}
|
||||
meeting.addSn(sn);
|
||||
final Message message = event.getMessage();
|
||||
message.setBody(Map.of(
|
||||
"id", meeting.getId(),
|
||||
"sn", sn
|
||||
));
|
||||
// TODO:返回房间列表
|
||||
meeting.getSns().stream()
|
||||
.filter(v -> !sn.equals(v))
|
||||
.forEach(v -> this.clientSessionManager.unicast(v, message));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,312 @@
|
||||
package com.acgist.taoyao.mediasoup;
|
||||
|
||||
import java.net.URI;
|
||||
import java.net.http.HttpClient;
|
||||
import java.net.http.WebSocket;
|
||||
import java.net.http.WebSocket.Listener;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.security.KeyManagementException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.cert.CertificateException;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CompletionStage;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.X509TrustManager;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.TaskScheduler;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.acgist.taoyao.boot.model.Header;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.boot.property.MediasoupProperties;
|
||||
import com.acgist.taoyao.boot.property.TaoyaoProperties;
|
||||
import com.acgist.taoyao.boot.property.WebrtcProperties;
|
||||
import com.acgist.taoyao.boot.utils.JSONUtils;
|
||||
import com.acgist.taoyao.mediasoup.protocol.ProtocolMediasoupAdapter;
|
||||
import com.acgist.taoyao.mediasoup.protocol.client.AuthorizeProtocol;
|
||||
import com.acgist.taoyao.signal.protocol.Protocol;
|
||||
import com.acgist.taoyao.signal.protocol.ProtocolManager;
|
||||
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Mediasoup客户端
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
public class MediasoupClient {
|
||||
|
||||
@Autowired
|
||||
private TaskScheduler taskSchedulerl;
|
||||
@Autowired
|
||||
private ProtocolManager protocolManager;
|
||||
@Autowired
|
||||
private TaoyaoProperties taoyaoProperties;
|
||||
@Autowired
|
||||
private WebrtcProperties webrtcProperties;
|
||||
@Autowired
|
||||
private AuthorizeProtocol authorizeProtocol;
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
private static final int MAX_RETRY = 12;
|
||||
|
||||
/**
|
||||
* 重试次数
|
||||
*/
|
||||
private int retry = 1;
|
||||
/**
|
||||
* Mediasoup WebSocket通道
|
||||
*/
|
||||
private WebSocket webSocket;
|
||||
/**
|
||||
* Mediasoup配置
|
||||
*/
|
||||
private MediasoupProperties mediasoupProperties;
|
||||
/**
|
||||
* 同步消息
|
||||
*/
|
||||
private Map<String, Message> syncMessage = new ConcurrentHashMap<>();
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
this.mediasoupProperties = this.webrtcProperties.getMediasoup();
|
||||
this.buildClient();
|
||||
}
|
||||
|
||||
/**
|
||||
* 连接Mediasoup WebSocket通道
|
||||
*/
|
||||
public void buildClient() {
|
||||
final URI uri = URI.create(this.mediasoupProperties.getAddress());
|
||||
log.info("开始连接Mediasoup:{}", uri);
|
||||
try {
|
||||
HttpClient
|
||||
.newBuilder()
|
||||
.sslContext(buildSSLContext())
|
||||
.build()
|
||||
.newWebSocketBuilder()
|
||||
.connectTimeout(Duration.ofMillis(this.taoyaoProperties.getTimeout()))
|
||||
.buildAsync(uri, new MessageListener())
|
||||
.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error("连接Mediasoup异常:{}", uri, e);
|
||||
this.taskSchedulerl.schedule(
|
||||
this::buildClient,
|
||||
Instant.now().plusSeconds(Math.min(this.retry++, MAX_RETRY) * 5)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送消息
|
||||
*
|
||||
* @param message 消息
|
||||
*/
|
||||
public void send(Message message) {
|
||||
while(this.webSocket == null) {
|
||||
Thread.yield();
|
||||
}
|
||||
this.webSocket.sendText(JSONUtils.toJSON(message), true);
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步发送消息
|
||||
*
|
||||
* @param message 消息
|
||||
*
|
||||
* @return 响应
|
||||
*/
|
||||
public Message sendSync(Message message) {
|
||||
final String id = message.getHeader().getId();
|
||||
this.syncMessage.put(id, message);
|
||||
synchronized (message) {
|
||||
try {
|
||||
message.wait(this.taoyaoProperties.getTimeout());
|
||||
} catch (InterruptedException e) {
|
||||
log.error("等待同步消息异常:{}", message, e);
|
||||
}
|
||||
}
|
||||
final Message response = this.syncMessage.remove(id);
|
||||
if(response == null || message.equals(response)) {
|
||||
log.warn("消息没有响应:{}", message);
|
||||
}
|
||||
return response;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理消息
|
||||
*
|
||||
* @param data 消息
|
||||
*/
|
||||
private void execute(String data) {
|
||||
if(StringUtils.isNotEmpty(data)) {
|
||||
final Message message = JSONUtils.toJava(data, Message.class);
|
||||
final Header header = message.getHeader();
|
||||
final String id = header.getId();
|
||||
final Integer pid = header.getPid();
|
||||
final Message request = this.syncMessage.get(id);
|
||||
// 存在同步响应
|
||||
if(request != null) {
|
||||
// 重新设置消息
|
||||
this.syncMessage.put(id, message);
|
||||
// 唤醒等待现场
|
||||
synchronized (request) {
|
||||
request.notifyAll();
|
||||
}
|
||||
} else {
|
||||
final Protocol protocol = this.protocolManager.protocol(pid);
|
||||
if(protocol instanceof ProtocolMediasoupAdapter mediasoupProtocol) {
|
||||
mediasoupProtocol.execute(message, this.webSocket);
|
||||
} else {
|
||||
log.warn("未知Mediasoup信令:{}", data);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 消息监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
public class MessageListener implements Listener {
|
||||
|
||||
@Override
|
||||
public void onOpen(WebSocket webSocket) {
|
||||
log.info("Mediasoup通道打开:{}", webSocket);
|
||||
Listener.super.onOpen(webSocket);
|
||||
// 关闭旧的通道
|
||||
if(MediasoupClient.this.webSocket != null && !(MediasoupClient.this.webSocket.isInputClosed() && MediasoupClient.this.webSocket.isOutputClosed())) {
|
||||
MediasoupClient.this.webSocket.abort();
|
||||
}
|
||||
// 重置重试次数
|
||||
MediasoupClient.this.retry = 1;
|
||||
// 设置新的通道
|
||||
MediasoupClient.this.webSocket = webSocket;
|
||||
// 发送授权消息
|
||||
MediasoupClient.this.send(MediasoupClient.this.authorizeProtocol.build());
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer data, boolean last) {
|
||||
log.debug("Mediasoup收到消息(binary):{}", webSocket);
|
||||
return Listener.super.onBinary(webSocket, data, last);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
|
||||
log.debug("Mediasoup收到消息(text):{}-{}", webSocket, data);
|
||||
MediasoupClient.this.execute(data.toString());
|
||||
return Listener.super.onText(webSocket, data, last);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<?> onClose(WebSocket webSocket, int statusCode, String reason) {
|
||||
log.warn("Mediasoup通道关闭:{}-{}-{}", webSocket, statusCode, reason);
|
||||
try {
|
||||
return Listener.super.onClose(webSocket, statusCode, reason);
|
||||
} finally {
|
||||
MediasoupClient.this.taskSchedulerl.schedule(
|
||||
MediasoupClient.this::buildClient,
|
||||
Instant.now().plusSeconds(Math.min(MediasoupClient.this.retry++, MAX_RETRY) * 5)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(WebSocket webSocket, Throwable error) {
|
||||
log.error("Mediasoup通道异常:{}", webSocket, error);
|
||||
try {
|
||||
Listener.super.onError(webSocket, error);
|
||||
} finally {
|
||||
MediasoupClient.this.taskSchedulerl.schedule(
|
||||
MediasoupClient.this::buildClient,
|
||||
Instant.now().plusSeconds(Math.min(MediasoupClient.this.retry++, MAX_RETRY) * 5)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<?> onPing(WebSocket webSocket, ByteBuffer message) {
|
||||
log.debug("Mediasoup收到消息(ping):{}", webSocket);
|
||||
return Listener.super.onPing(webSocket, message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionStage<?> onPong(WebSocket webSocket, ByteBuffer message) {
|
||||
log.debug("Mediasoup收到消息(pong):{}", webSocket);
|
||||
return Listener.super.onPong(webSocket, message);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* SSLContext
|
||||
*
|
||||
* @return {@link SSLContext}
|
||||
*/
|
||||
private static final SSLContext buildSSLContext() {
|
||||
try {
|
||||
// SSL协议:SSL、SSLv2、SSLv3、TLS、TLSv1、TLSv1.1、TLSv1.2、TLSv1.3
|
||||
final SSLContext sslContext = SSLContext.getInstance("TLSv1.2");
|
||||
sslContext.init(null, new X509TrustManager[] { TaoyaoTrustManager.INSTANCE }, new SecureRandom());
|
||||
return sslContext;
|
||||
} catch (KeyManagementException | NoSuchAlgorithmException e) {
|
||||
log.error("新建SSLContext异常", e);
|
||||
}
|
||||
try {
|
||||
return SSLContext.getDefault();
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
log.error("新建SSLContext异常", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 证书验证
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
public static class TaoyaoTrustManager implements X509TrustManager {
|
||||
|
||||
private static final TaoyaoTrustManager INSTANCE = new TaoyaoTrustManager();
|
||||
|
||||
private TaoyaoTrustManager() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public X509Certificate[] getAcceptedIssuers() {
|
||||
return new X509Certificate[0];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException {
|
||||
if(chain == null) {
|
||||
throw new CertificateException("证书验证失败");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException {
|
||||
if(chain == null) {
|
||||
throw new CertificateException("证书验证失败");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.acgist.taoyao.mediasoup.client;
|
||||
|
||||
/**
|
||||
* 终端媒体:producer、consumer
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
public class ClientStream {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,10 @@
|
||||
package com.acgist.taoyao.mediasoup.listener;
|
||||
|
||||
/**
|
||||
* Mediasoup事件监听
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
public abstract class Listener {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol;
|
||||
|
||||
import java.net.http.WebSocket;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.mediasoup.MediasoupClient;
|
||||
import com.acgist.taoyao.signal.client.ClientSession;
|
||||
import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
|
||||
|
||||
/**
|
||||
* Mediasoup信令适配器
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
public abstract class ProtocolMediasoupAdapter extends ProtocolAdapter {
|
||||
|
||||
@Lazy
|
||||
@Autowired
|
||||
protected MediasoupClient mediasoupClient;
|
||||
|
||||
protected ProtocolMediasoupAdapter(Integer pid, String name) {
|
||||
super(pid, name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(String sn, Message message, ClientSession session) {
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理Mediasoup信令
|
||||
*
|
||||
* @param message 信令消息
|
||||
* @param webSocket WebSocket
|
||||
*/
|
||||
public abstract void execute(Message message, WebSocket webSocket);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.client;
|
||||
|
||||
import java.net.http.WebSocket;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import com.acgist.taoyao.boot.annotation.Protocol;
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.boot.property.MediasoupProperties;
|
||||
import com.acgist.taoyao.boot.property.WebrtcProperties;
|
||||
import com.acgist.taoyao.mediasoup.protocol.ProtocolMediasoupAdapter;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Mediasoup终端授权信令
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
@Protocol
|
||||
public class AuthorizeProtocol extends ProtocolMediasoupAdapter {
|
||||
|
||||
public static final Integer PID = 6000;
|
||||
|
||||
@Autowired
|
||||
private WebrtcProperties webrtcProperties;
|
||||
|
||||
public AuthorizeProtocol() {
|
||||
super(PID, "Mediasoup终端授权信令");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message build() {
|
||||
final MediasoupProperties mediasoup = this.webrtcProperties.getMediasoup();
|
||||
return super.build(Map.of(
|
||||
"username", mediasoup.getUsername(),
|
||||
"password", mediasoup.getPassword()
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Message message, WebSocket webSocket) {
|
||||
log.info("Mediasoup终端授权结果:{}", message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ApplyNetworkThrottleProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ChangeDisplayNameProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class CloseProducerProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ConnectWebRtcTransportProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class CreateWebRtcTransportProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class GetConsumerStatsProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class GetDataConsumerStatsProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class GetDataProducerStatsProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class GetProducerStatsProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class GetTransportStatsProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class NewPeerProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class PauseConsumerProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class PauseProducerProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ProduceDataProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ProduceProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ProducerScoreProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class RequestConsumerKeyFrameProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ResetNetworkThrottleProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class RestartIceProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ResumeConsumerProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class ResumeProducerProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class SetConsumerPreferredLayersProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class SetConsumerPriorityProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
package com.acgist.taoyao.mediasoup.protocol.media;
|
||||
|
||||
public class VideoorientationchangeProtocol {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.acgist.taoyao.mediasoup.router;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.acgist.taoyao.mediasoup.transport.Transport;
|
||||
|
||||
/**
|
||||
* 路由
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
public final class Router {
|
||||
|
||||
/**
|
||||
* 传输通道列表
|
||||
*/
|
||||
private List<Transport> transportList;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package com.acgist.taoyao.mediasoup.transport;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import com.acgist.taoyao.mediasoup.client.ClientStream;
|
||||
import com.acgist.taoyao.signal.client.ClientSession;
|
||||
|
||||
/**
|
||||
* 传输通道
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
public final class Transport {
|
||||
|
||||
/**
|
||||
* 终端
|
||||
*/
|
||||
private ClientSession clientSession;
|
||||
/**
|
||||
* 生产者列表
|
||||
*/
|
||||
private List<ClientStream> producerList;
|
||||
/**
|
||||
* 消费者列表
|
||||
*/
|
||||
private List<ClientStream> consumerList;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package com.acgist.taoyao.signal.event.platform;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.signal.client.ClientSession;
|
||||
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* 关闭服务事件
|
||||
*
|
||||
* @author acgist
|
||||
*/
|
||||
@Getter
|
||||
@Setter
|
||||
public class ShutdownEvent extends ApplicationEventAdapter {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public ShutdownEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
|
||||
super(sn, body, message, session);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -32,6 +32,11 @@ public abstract class ProtocolAdapter implements Protocol {
|
||||
* 信令标识
|
||||
*/
|
||||
protected final Integer pid;
|
||||
/**
|
||||
* 信令标识
|
||||
* TODO:
|
||||
*/
|
||||
protected final String signal = "";
|
||||
/**
|
||||
* 信令名称
|
||||
*/
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.acgist.taoyao.signal.protocol;
|
||||
|
||||
import java.net.http.WebSocket;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@@ -58,7 +57,7 @@ public class ProtocolManager {
|
||||
if(this.protocolMapping.containsKey(pid)) {
|
||||
throw MessageCodeException.of("存在重复信令协议:" + pid);
|
||||
}
|
||||
log.info("注册信令协议:{}-{}-{}", pid, name, k);
|
||||
log.info("注册信令协议:{}-{}-{}", pid, String.format("%32s", k), name);
|
||||
this.protocolMapping.put(pid, v);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1,10 +1,13 @@
|
||||
package com.acgist.taoyao.signal.protocol.platform;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
|
||||
import com.acgist.taoyao.boot.model.Message;
|
||||
import com.acgist.taoyao.signal.client.ClientSession;
|
||||
import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
|
||||
import com.acgist.taoyao.signal.event.platform.ShutdownEvent;
|
||||
import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@@ -14,7 +17,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
* @author acgist
|
||||
*/
|
||||
@Slf4j
|
||||
public class ShutdownProtocol extends ProtocolAdapter {
|
||||
public class ShutdownProtocol extends ProtocolMapAdapter {
|
||||
|
||||
public static final Integer PID = 1000;
|
||||
|
||||
@@ -23,7 +26,9 @@ public class ShutdownProtocol extends ProtocolAdapter {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(String sn, Message message, ClientSession session) {
|
||||
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
|
||||
// 推送事件
|
||||
this.publishEvent(new ShutdownEvent(sn, body, message, session));
|
||||
// 全员广播
|
||||
this.clientSessionManager.broadcast(message);
|
||||
// 关闭信令服务
|
||||
|
||||
Reference in New Issue
Block a user