diff --git a/README.md b/README.md index 9d06d29..160593a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # 桃夭 -基于WebRTC实现信令服务,实现Mesh、MCU和SFU三种媒体通信架构,支持直播会议两种场景。 +基于WebRTC实现信令服务,实现Mesh、MCU和SFU三种媒体通信架构,支持直播会议两种场景。
项目提供WebRTC服务信令,终端已有H5示例,其他终端需要自己实现。 ## 模块 @@ -54,14 +54,14 @@ ### Mesh -流媒体点对点连接,不经过服务端。 +流媒体点对点连接,不经过服务端。
Mesh架构声音视频控制部分功能均在终端实现,同时不会实现录制、美颜、变声、混音等等功能。 > 需要使用STUN/TURN实现内网穿透(可以自己搭建coturn服务) ### MCU -终端推流到服务端,由服务端混音分流。 +终端推流到服务端,由服务端处理分流。 ### SFU diff --git a/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/EventListener.java b/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/EventListener.java new file mode 100644 index 0000000..336536f --- /dev/null +++ b/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/EventListener.java @@ -0,0 +1,22 @@ +package com.acgist.taoyao.boot.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.stereotype.Component; + +/** + * 监听 + * + * @author acgist + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Component +@Documented +public @interface EventListener { + +} diff --git a/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Manager.java b/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Manager.java new file mode 100644 index 0000000..e4bed51 --- /dev/null +++ b/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Manager.java @@ -0,0 +1,22 @@ +package com.acgist.taoyao.boot.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.stereotype.Component; + +/** + * 管理 + * + * @author acgist + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Component +@Documented +public @interface Manager { + +} diff --git a/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Protocol.java b/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Protocol.java new file mode 100644 index 0000000..e6fa324 --- /dev/null +++ b/taoyao-boot/src/main/java/com/acgist/taoyao/boot/annotation/Protocol.java @@ -0,0 +1,22 @@ +package com.acgist.taoyao.boot.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.springframework.stereotype.Component; + +/** + * 协议 + * + * @author acgist + */ +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@Component +@Documented +public @interface Protocol { + +} diff --git a/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/ErrorUtils.java b/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/ErrorUtils.java index 071beb1..937405b 100644 --- a/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/ErrorUtils.java +++ b/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/ErrorUtils.java @@ -41,10 +41,6 @@ public final class ErrorUtils { * 错误地址 */ public static final String ERROR_PATH = "/error"; - /** - * Servlet错误异常 - */ - public static final String EXCEPTION_SERVLET = "javax.servlet.error.exception"; /** * Servlet错误编码 */ @@ -53,6 +49,10 @@ public final class ErrorUtils { * Servlet错误地址 */ public static final String SERVLET_REQUEST_URI = "javax.servlet.error.request_uri"; + /** + * Servlet错误异常 + */ + public static final String EXCEPTION_SERVLET = "javax.servlet.error.exception"; /** * SpringBoot异常 */ diff --git a/taoyao-live/src/main/java/com/acgist/taoyao/live/LiveListenerAdapter.java b/taoyao-live/src/main/java/com/acgist/taoyao/live/LiveListenerAdapter.java new file mode 100644 index 0000000..ada6f19 --- /dev/null +++ b/taoyao-live/src/main/java/com/acgist/taoyao/live/LiveListenerAdapter.java @@ -0,0 +1,20 @@ +package com.acgist.taoyao.live; + +import org.springframework.beans.factory.annotation.Autowired; + +import com.acgist.taoyao.signal.event.ApplicationEventAdapter; +import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter; + +/** + * 直播事件监听适配器 + * + * @param 事件泛型 + * + * @author acgist + */ +public abstract class LiveListenerAdapter extends ApplicationListenerAdapter { + + @Autowired + protected LiveManager liveManager; + +} diff --git a/taoyao-live/src/main/java/com/acgist/taoyao/live/LiveManager.java b/taoyao-live/src/main/java/com/acgist/taoyao/live/LiveManager.java index 9433055..5476064 100644 --- a/taoyao-live/src/main/java/com/acgist/taoyao/live/LiveManager.java +++ b/taoyao-live/src/main/java/com/acgist/taoyao/live/LiveManager.java @@ -1,5 +1,13 @@ package com.acgist.taoyao.live; +import com.acgist.taoyao.boot.annotation.Manager; + +/** + * 直播管理 + * + * @author acgist + */ +@Manager public class LiveManager { } diff --git a/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/MeetingListenerAdapter.java b/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/MeetingListenerAdapter.java new file mode 100644 index 0000000..af4db37 --- /dev/null +++ b/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/MeetingListenerAdapter.java @@ -0,0 +1,20 @@ +package com.acgist.taoyao.meeting; + +import org.springframework.beans.factory.annotation.Autowired; + +import com.acgist.taoyao.signal.event.ApplicationEventAdapter; +import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter; + +/** + * 会议事件监听适配器 + * + * @param 事件泛型 + * + * @author acgist + */ +public abstract class MeetingListenerAdapter extends ApplicationListenerAdapter { + + @Autowired + protected MeetingManager meetingManager; + +} diff --git a/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/MeetingManager.java b/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/MeetingManager.java index c933d43..233c5e3 100644 --- a/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/MeetingManager.java +++ b/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/MeetingManager.java @@ -4,8 +4,8 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; +import com.acgist.taoyao.boot.annotation.Manager; import com.acgist.taoyao.boot.service.IdService; import lombok.extern.slf4j.Slf4j; @@ -16,7 +16,7 @@ import lombok.extern.slf4j.Slf4j; * @author acgist */ @Slf4j -@Service +@Manager public class MeetingManager { @Autowired diff --git a/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/listener/MeetingCreateListener.java b/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/listener/MeetingCreateListener.java index 1622552..696f38b 100644 --- a/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/listener/MeetingCreateListener.java +++ b/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/listener/MeetingCreateListener.java @@ -2,26 +2,20 @@ package com.acgist.taoyao.meeting.listener; import java.util.Map; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.EventListener; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.meeting.Meeting; -import com.acgist.taoyao.meeting.MeetingManager; +import com.acgist.taoyao.meeting.MeetingListenerAdapter; import com.acgist.taoyao.signal.event.meeting.MeetingCreateEvent; -import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter; /** * 创建会议监听 * * @author acgist */ -@Component -public class MeetingCreateListener extends ApplicationListenerAdapter { +@EventListener +public class MeetingCreateListener extends MeetingListenerAdapter { - @Autowired - private MeetingManager meetingManager; - @Override public void onApplicationEvent(MeetingCreateEvent event) { final Meeting meeting = this.meetingManager.create(event.getSn()); diff --git a/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/listener/MeetingEnterListener.java b/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/listener/MeetingEnterListener.java index 60a0c70..c603ff5 100644 --- a/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/listener/MeetingEnterListener.java +++ b/taoyao-meeting/src/main/java/com/acgist/taoyao/meeting/listener/MeetingEnterListener.java @@ -2,31 +2,24 @@ package com.acgist.taoyao.meeting.listener; import java.util.Map; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.EventListener; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.meeting.Meeting; -import com.acgist.taoyao.meeting.MeetingManager; +import com.acgist.taoyao.meeting.MeetingListenerAdapter; import com.acgist.taoyao.signal.event.meeting.MeetingEnterEvent; -import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter; /** * 进入会议监听 * * @author acgist */ -@Component -public class MeetingEnterListener extends ApplicationListenerAdapter { +@EventListener +public class MeetingEnterListener extends MeetingListenerAdapter { - @Autowired - private MeetingManager meetingManager; - @Override public void onApplicationEvent(MeetingEnterEvent event) { final String sn = event.getSn(); - final Map body = event.getBody(); - final String id = (String) body.get("id"); + final String id = event.get("id"); final Meeting meeting = this.meetingManager.meeting(id); meeting.addSn(sn); final Message message = event.getMessage(); diff --git a/taoyao-server/src/main/resources/static/javascript/taoyao.js b/taoyao-server/src/main/resources/static/javascript/taoyao.js index 8a89d4e..580d299 100644 --- a/taoyao-server/src/main/resources/static/javascript/taoyao.js +++ b/taoyao-server/src/main/resources/static/javascript/taoyao.js @@ -1,6 +1,7 @@ /** 桃夭WebRTC终端核心功能 */ /** 兼容 */ const RTCPeerConnection = window.RTCPeerConnection || window.mozRTCPeerConnection || window.webkitRTCPeerConnection; +const RTCSessionDescription = window.RTCSessionDescription || window.mozRTCSessionDescription || window.webkitRTCSessionDescription; /** 默认音频配置 */ const defaultAudioConfig = { // 音量:0~1 @@ -42,7 +43,7 @@ const defaultVideoConfig = { /** 默认RTCPeerConnection配置 */ const defaultRPCConfig = { // ICE代理的服务器 - // iceServers: null, + iceServers: null, // 传输通道绑定策略:balanced|max-compat|max-bundle bundlePolicy: 'balanced', // RTCP多路复用策略:require|negotiate @@ -78,9 +79,15 @@ const signalProtocol = { config: 2004, /** 心跳 */ heartbeat: 2005, + /** 重启终端 */ + reboot: 2997, }, /** 会议信令 */ meeting: { + /** 创建会议信令 */ + create: 4000, + /** 进入会议信令 */ + enter: 4002, }, /** 平台信令 */ platform: { @@ -208,11 +215,25 @@ const signalChannel = { } reject(e); }; + /** + * 回调策略: + * 1. 如果注册请求回调,同时执行结果返回true不再执行后面所有回调。 + * 2. 如果注册全局回调,同时执行结果返回true不再执行后面所有回调。 + * 3. 如果前面所有回调没有返回true执行默认回调。 + */ self.channel.onmessage = function(e) { console.debug('信令通道消息', e.data); let done = false; let data = JSON.parse(e.data); - // 注册回调 + // 请求回调 + if(self.callbackMapping.has(data.header.id)) { + try { + done = self.callbackMapping.get(data.header.id)(data); + } finally { + self.callbackMapping.delete(data.header.id); + } + } + // 全局回调 if(self.callback) { done = self.callback(data); } @@ -220,11 +241,6 @@ const signalChannel = { if(!done) { self.defaultCallback(data); } - // 请求回调 - if(self.callbackMapping.has(data.header.id)) { - self.callbackMapping.get(data.header.id)(); - self.callbackMapping.delete(data.header.id); - } }; }); }, @@ -277,22 +293,42 @@ const signalChannel = { console.debug('没有适配信令消息默认处理', data); switch(data.header.pid) { case signalProtocol.client.register: - console.debug('终端注册成功'); break; case signalProtocol.client.config: - if(this.taoyao) { - this.taoyao - .configMedia(data.body.media) - .configWebrtc(data.body.webrtc); - } + this.defaultClientConfig(data); break; case signalProtocol.client.heartbeat: - console.debug('心跳'); + break; + case signalProtocol.client.reboot: + this.defaultClientReboot(data); + break; + case signalProtocol.meeting.create: + break; + case signalProtocol.meeting.enter: break; case signalProtocol.platform.error: console.error('信令发生错误', data); break; } + }, + /** 终端默认回调 */ + defaultClientConfig: function(data) { + this.taoyao + .configMedia(data.body.media) + .configWebrtc(data.body.webrtc); + }, + defaultClientReboot: function(data) { + console.info('重启终端'); + location.reload(); + }, + /** 默认媒体回调 */ + defaultMediaSubscribe: function(data) { + + }, + /** 会议默认回调 */ + defaultMeetingEnter: function(data) { + this.taoyao + .mediaSubscribe(data.body.sn); } }; /** 终端 */ @@ -304,16 +340,13 @@ function TaoyaoClient( /** 视频对象 */ this.video = null; /** 媒体信息 */ + this.stream = null; this.audioTrack = null; this.videoTrack = null; /** 媒体状态 */ - this.audioStatus = true; - this.videoStatus = true; - /** 录制状态 */ + this.audioStatus = false; + this.videoStatus = false; this.recordStatus = false; - /** 媒体信息 */ - this.audioStreamId = null; - this.videoStreamId = null; /** 播放视频 */ this.play = async function() { await this.video.play(); @@ -345,35 +378,48 @@ function TaoyaoClient( /** 设置媒体流 */ this.buildStream = async function(stream) { if(stream) { + this.stream = stream; this.video.srcObject = stream; + let audioTrack = stream.getAudioTracks(); + let videoTrack = stream.getVideoTracks(); + if(audioTrack && audioTrack.length) { + this.audioTrack = audioTrack; + this.audioStatus = true; + } + if(videoTrack && videoTrack.length) { + this.videoTrack = videoTrack; + this.videoStatus = true; + } + console.debug('设置媒体流', this.stream, this.audioTrack, this.videoTrack); await this.play(); } return this; }; /** 设置音频流 */ - this.buildAudioStream = function() { + this.buildAudioTrack = function() { + // 关闭旧的 + // 创建新的 }; /** 设置视频流 */ - this.buildVideoStream = function() { + this.buildVideoTrack = function() { + // 关闭旧的 + // 创建新的 }; } /** 桃夭 */ function Taoyao( - webSocket, - iceServer, - audioConfig, - videoConfig + webSocket ) { + /** WebRTC配置 */ + this.webrtc = null; /** WebSocket地址 */ this.webSocket = webSocket; - /** IceServer地址 */ - this.iceServer = iceServer; /** 设备状态 */ this.audioEnabled = true; this.videoEnabled = true; /** 媒体配置 */ - this.audioConfig = audioConfig || defaultAudioConfig; - this.videoConfig = videoConfig || defaultVideoConfig; + this.audioConfig = defaultAudioConfig; + this.videoConfig = defaultVideoConfig; /** 发送信令 */ this.push = null; /** 本地终端 */ @@ -438,26 +484,27 @@ function Taoyao( }; /** WebRTC配置 */ this.configWebrtc = function(config = {}) { - this.webSocket = config.signalAddress; - this.iceServer = config.stun; - console.debug('WebRTC配置', this.webSocket, this.iceServer); + this.webrtc = config; + this.webSocket = this.webrtc.signalAddress; + defaultRPCConfig.iceServers = this.webrtc.stun.map(v => ({'urls': v})); + console.debug('WebRTC配置', this.webrtc, defaultRPCConfig); return this; }; /** 打开信令通道 */ this.buildChannel = function(callback) { signalChannel.taoyao = this; this.signalChannel = signalChannel; - this.signalChannel.connect(this.webSocket, callback); // 不能直接this.push = this.signalChannel.push这样导致this对象错误 this.push = function(data, callback) { this.signalChannel.push(data, callback) }; - return this; + return this.signalChannel.connect(this.webSocket, callback); }; /** 打开本地媒体 */ this.buildLocalMedia = function() { console.debug('打开终端媒体', this.audioConfig, this.videoConfig); let self = this; + this.checkDevice(); return new Promise((resolve, reject) => { navigator.mediaDevices.getUserMedia({ audio: self.audioConfig, @@ -465,13 +512,21 @@ function Taoyao( }) .then(resolve) .catch(reject); + // 兼容旧版 + // navigator.getUserMedia({ + // audio: self.audioConfig, + // video: self.videoConfig + // }, resolve, reject); }); }; - /** 设置本地终端 */ - this.buildLocalClient = async function(localVideoId, stream) { - this.localClient = new TaoyaoClient(signalConfig.sn); - await this.localClient.buildVideo(localVideoId, stream); - }; + /** 远程终端过滤 */ + this.remoteClientFilter = function(sn) { + let array = this.remoteClient.filter(v => v.sn === sn); + if(array.length <= 0) { + return null; + } + return this.remoteClient.filter(v => v.sn === sn)[0]; + } /** 关闭:关闭媒体 */ this.close = function() { // TODO:释放资源 @@ -479,9 +534,96 @@ function Taoyao( /** 关机:关闭媒体、关闭信令 */ this.shutdown = function() { this.close(); + }; + /** 打开媒体通道 */ + this.buildMediaChannel = async function(localVideoId, stream) { + // 本地视频 + this.localClient = new TaoyaoClient(signalConfig.sn); + await this.localClient.buildVideo(localVideoId, stream); + // 本地通道 + this.localMediaChannel = new RTCPeerConnection(defaultRPCConfig); + if(this.localClient.audioTrack) { + this.localClient.audioTrack.forEach(v => this.localMediaChannel.addTrack(v, this.localClient.stream)); + } + if(this.localClient.videoTrack) { + this.localClient.videoTrack.forEach(v => this.localMediaChannel.addTrack(v, this.localClient.stream)); + } + this.localMediaChannel.ontrack = this.localMediaChannelTrack; + this.localMediaChannel.ondatachannel = this.localMediaChannelDataChannel; + this.localMediaChannel.onicecandidate = this.localMediaChannelIceCandidate; + // 远程通道 + this.remoteMediaChannel = new RTCPeerConnection(defaultRPCConfig); + this.remoteMediaChannel.ontrack = this.remoteMediaChannelTrack; + this.remoteMediaChannel.ondatachannel = this.remoteMediaChannelDataChannel; + this.remoteMediaChannel.onicecandidate = this.remoteMediaChannelIceCandidate; + return this; + }; + /** 本地 */ + this.localMediaChannelTrack = function() { + }; + this.localMediaChannelDataChannel = function(channel) { + channel.onopen = function() { + console.debug('DataChannel Open'); + } + channel.onmessage = function(data) { + console.debug('DataChannel Message', data); + } + channel.onclose = function() { + console.debug('DataChannel Close'); + } + channel.onerror = function(e) { + console.debug('DataChannel Error', e); + } + }; + this.localMediaChannelIceCandidate = function() { + }; + /** 远程 */ + this.localMediaChannelTrack = function() { + }; + this.localMediaChannelDataChannel = function(channel) { + channel.onopen = function() { + console.debug('DataChannel Open'); + } + channel.onmessage = function(data) { + console.debug('DataChannel Message', data); + } + channel.onclose = function() { + console.debug('DataChannel Close'); + } + channel.onerror = function(e) { + console.debug('DataChannel Error', e); + } + }; + this.localMediaChannelIceCandidate = function() { + }; + /** 媒体信令 */ + this.mediaSubscribe = function(sn, callback) { + let self = this; + if(self.webrtc.model === 'MESH') { + self.localMediaChannel.createOffer().then(description => { + console.debug('Local Create Offer', description); + self.localMediaChannel.setLocalDescription(description); + }); + } + }; + /** 会议信令 */ + this.meetingCreate = function(callback) { + let self = this; + self.push(signalProtocol.buildProtocol( + signalConfig.sn, + signalProtocol.meeting.create, + ), callback); } - /** 媒体 */ - /** 视频 */ + this.meetingEnter = function(id, callback) { + let self = this; + self.push(signalProtocol.buildProtocol( + signalConfig.sn, + signalProtocol.meeting.enter, + { + id: id + } + ), callback); + }; }; /* var peer; diff --git a/taoyao-server/src/main/resources/static/meeting.html b/taoyao-server/src/main/resources/static/meeting.html index e880553..4431bdf 100644 --- a/taoyao-server/src/main/resources/static/meeting.html +++ b/taoyao-server/src/main/resources/static/meeting.html @@ -11,11 +11,11 @@
- - - - - + + + + +
@@ -36,7 +36,7 @@ - +
@@ -57,14 +57,18 @@ if(signalConfig.sn) { // TODO:修改sn } + let self = this; this.taoyao = new Taoyao("wss://localhost:8888/websocket.signal"); - // 检查设备 + // 打开信令通道 this.taoyao - .checkDevice() - .buildChannel(this.callback) - .buildLocalMedia() - .then(stream => this.taoyao.buildLocalClient('local', stream)) - .catch((e) => console.error('打开终端媒体失败', e)); + .buildChannel(self.callback) + .then(e => console.debug('连接成功')); + // 打开媒体通道 + this.taoyao.buildLocalMedia() + .then(stream => { + self.taoyao.buildMediaChannel('local', stream); + }) + .catch(e => console.error('打开终端媒体失败', e)); }, beforeDestroy() { }, @@ -81,19 +85,21 @@ // 创建会议 create: function(event) { let self = this; - this.taoyao.createMeeting(data => { - self.meetingId = data.body.id; + this.taoyao.meetingCreate(data => { + console.log(data) + self.taoyao.meetingEnter(data.body.id); + return true; }); }, - // 返回终端 - client: function(sn) { - return this.clients.filter(v => v.sn === sn)[0]; - }, // 会议邀请 invite: function(sn) { }, // 进入会议 enter: function(sn) { + let id = prompt('房间标识'); + if(id) { + this.taoyao.meetingEnter(id); + } }, // 离开会议 leave: function(sn) { diff --git a/taoyao-server/src/test/java/com/acgist/taoyao/signal/SignalTest.java b/taoyao-server/src/test/java/com/acgist/taoyao/signal/SignalTest.java index 4ed4042..e1d445a 100644 --- a/taoyao-server/src/test/java/com/acgist/taoyao/signal/SignalTest.java +++ b/taoyao-server/src/test/java/com/acgist/taoyao/signal/SignalTest.java @@ -3,12 +3,16 @@ package com.acgist.taoyao.signal; import static org.junit.jupiter.api.Assertions.assertNotNull; import java.net.http.WebSocket; +import java.util.concurrent.CountDownLatch; import org.junit.jupiter.api.Test; import com.acgist.taoyao.main.TaoyaoApplication; import com.acgist.taoyao.test.annotation.TaoyaoTest; +import lombok.extern.slf4j.Slf4j; + +@Slf4j @TaoyaoTest(classes = TaoyaoApplication.class) class SignalTest { @@ -18,9 +22,37 @@ class SignalTest { final WebSocket clientB = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientB"); clientA.sendText(""" {"header":{"pid":1000,"v":"1.0.0","id":"1","sn":"clientA"},"body":{}} - """, true).join(); + """, true).join(); assertNotNull(clientA); assertNotNull(clientB); } + @Test + void testThread() throws InterruptedException { + final int total = 1000; + final CountDownLatch count = new CountDownLatch(total); + final WebSocket clientA = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientA", count); + final long aTime = System.currentTimeMillis(); + for (int index = 0; index < total; index++) { + clientA.sendText(""" + {"header":{"pid":2999,"v":"1.0.0","id":"1","sn":"clientA"},"body":{}} + """, true).join(); + } +// final ExecutorService executor = Executors.newFixedThreadPool(10); +// for (int index = 0; index < total; index++) { +// executor.execute(() -> { +// synchronized (clientA) { +// clientA.sendText(""" +// {"header":{"pid":2999,"v":"1.0.0","id":"1","sn":"clientA"},"body":{}} +// """, true).join(); +// } +// }); +// } + count.await(); + final long zTime = System.currentTimeMillis(); + log.info("执行时间:{}", zTime - aTime); + Thread.sleep(1000); + assertNotNull(clientA); + } + } diff --git a/taoyao-server/src/test/java/com/acgist/taoyao/signal/WebSocketClient.java b/taoyao-server/src/test/java/com/acgist/taoyao/signal/WebSocketClient.java index 668405f..c1f66dd 100644 --- a/taoyao-server/src/test/java/com/acgist/taoyao/signal/WebSocketClient.java +++ b/taoyao-server/src/test/java/com/acgist/taoyao/signal/WebSocketClient.java @@ -10,6 +10,7 @@ import java.security.SecureRandom; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.concurrent.CompletionStage; +import java.util.concurrent.CountDownLatch; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; @@ -21,6 +22,10 @@ import lombok.extern.slf4j.Slf4j; public class WebSocketClient { public static final WebSocket build(String uri, String sn) throws InterruptedException { + return build(uri, sn, null); + } + + public static final WebSocket build(String uri, String sn, CountDownLatch count) throws InterruptedException { final Object lock = new Object(); try { return HttpClient @@ -32,7 +37,7 @@ public class WebSocketClient { @Override public void onOpen(WebSocket webSocket) { webSocket.sendText(String.format(""" - {"header":{"pid":2000,"v":"1.0.0","id":"1","sn":"%s"},"body":{"username":"taoyao","password":"taoyao"}} + {"header":{"pid":2000,"v":"1.0.0","id":"1","sn":"%s"},"body":{"username":"taoyao","password":"taoyao","ip":"127.0.0.1","mac":"00:00:00:00:00:00"}} """, sn), true); Listener.super.onOpen(webSocket); } @@ -41,7 +46,12 @@ public class WebSocketClient { synchronized (lock) { lock.notifyAll(); } - log.info("收到WebSocket消息:{}", data); + if(count == null) { + log.debug("收到WebSocket消息:{}", data); + } else { + count.countDown(); + log.debug("收到WebSocket消息:{}-{}", count.getCount(), data); + } return Listener.super.onText(webSocket, data, last); } }) diff --git a/taoyao-signal/README.md b/taoyao-signal/README.md index 2b19fe6..7b3f9cf 100644 --- a/taoyao-signal/README.md +++ b/taoyao-signal/README.md @@ -220,6 +220,20 @@ 终端广播信令到所有的终端 +### 重启终端信令(2997) + +#### 消息主体 + +``` +# 请求 +{ +} +``` + +#### 消息流程:服务器->终端 + +重启终端 + ### 终端状态信令(2998) #### 消息主体 @@ -291,7 +305,7 @@ {} ---- { - "id": "会议标识", + "id": "会议标识" } ``` diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientSessionManager.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientSessionManager.java index c9e783b..c67d53c 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientSessionManager.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/ClientSessionManager.java @@ -7,8 +7,8 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; +import com.acgist.taoyao.boot.annotation.Manager; import com.acgist.taoyao.boot.config.TaoyaoProperties; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.event.client.ClientCloseEvent; @@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j; * @author acgist */ @Slf4j -@Service +@Manager public class ClientSessionManager { @Autowired @@ -169,5 +169,5 @@ public class ClientSessionManager { this.close(v); }); } - + } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/websocket/WebSocketSignal.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/websocket/WebSocketSignal.java index e883c00..cc824f7 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/websocket/WebSocketSignal.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/client/websocket/WebSocketSignal.java @@ -38,7 +38,7 @@ public class WebSocketSignal { public void message(Session session, String message) { log.debug("会话消息:{}-{}", session, message); try { - WebSocketSignal.protocolManager.execute(message, session); + WebSocketSignal.protocolManager.execute(message.strip(), session); } catch (Exception e) { log.error("处理会话消息异常", e); final Message errorMessage = WebSocketSignal.errorProtocol.build(); diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/config/SignalAutoConfiguration.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/config/SignalAutoConfiguration.java index d1f41f3..22f7850 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/config/SignalAutoConfiguration.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/config/SignalAutoConfiguration.java @@ -11,6 +11,8 @@ import com.acgist.taoyao.signal.client.websocket.WebSocketSignal; import com.acgist.taoyao.signal.listener.platform.ScriptListener; import com.acgist.taoyao.signal.protocol.platform.ScriptProtocol; import com.acgist.taoyao.signal.protocol.platform.ShutdownProtocol; +import com.acgist.taoyao.signal.service.SecurityService; +import com.acgist.taoyao.signal.service.impl.SecurityServiceImpl; /** * 信令配置 @@ -33,6 +35,12 @@ public class SignalAutoConfiguration { return new ServerEndpointExporter(); } + @Bean + @ConditionalOnMissingBean + public SecurityService securityService() { + return new SecurityServiceImpl(); + } + @Bean @ConditionalOnProperty(prefix = "taoyao.signal.platform.script", name = "enabled", havingValue = "true", matchIfMissing = true) @ConditionalOnMissingBean diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/ClientController.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/ClientController.java index c411cb2..523f41a 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/ClientController.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/controller/ClientController.java @@ -8,6 +8,7 @@ import org.springframework.web.bind.annotation.RestController; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSessionManager; +import com.acgist.taoyao.signal.protocol.client.ClientRebootProtocol; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -24,6 +25,8 @@ public class ClientController { @Autowired private ClientSessionManager clientSessionManager; + @Autowired + private ClientRebootProtocol clientRebootProtocol; @Operation(summary = "终端列表", description = "终端列表") @GetMapping("/list") @@ -37,4 +40,11 @@ public class ClientController { return Message.success(this.clientSessionManager.status(sn)); } + @Operation(summary = "重启终端", description = "重启终端") + @GetMapping("/reboot/{sn}") + public Message reboot(@PathVariable String sn) { + this.clientSessionManager.unicast(sn, this.clientRebootProtocol.build()); + return Message.success(); + } + } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ApplicationEventAdapter.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ApplicationEventAdapter.java index fc472e6..b6e9dd3 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ApplicationEventAdapter.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/ApplicationEventAdapter.java @@ -53,5 +53,37 @@ public abstract class ApplicationEventAdapter extends ApplicationEvent { this.message = message; this.session = session; } + + /** + * @param 参数泛型 + * + * @param key 参数名称 + * + * @return 值 + */ + @SuppressWarnings("unchecked") + public T get(String key) { + if(this.body == null) { + return null; + } + return (T) this.body.get(key); + } + + /** + * @param 参数泛型 + * + * @param key 参数名称 + * @param defaultValue 默认值 + * + * @return 值 + */ + @SuppressWarnings("unchecked") + public T get(String key, T defaultValue) { + if(this.body == null) { + return defaultValue; + } + final T t = (T) this.body.get(key); + return t == null ? defaultValue : t; + } } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientRegisterEvent.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientRegisterEvent.java index a70ebec..fc04c64 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientRegisterEvent.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/client/ClientRegisterEvent.java @@ -4,6 +4,7 @@ import java.util.Map; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; +import com.acgist.taoyao.signal.client.ClientSessionStatus; import com.acgist.taoyao.signal.event.ApplicationEventAdapter; import lombok.Getter; @@ -23,5 +24,33 @@ public class ClientRegisterEvent extends ApplicationEventAdapter { public ClientRegisterEvent(String sn, Map body, Message message, ClientSession session) { super(sn, body, message, session); } + + /** + * @return IP + */ + public String getIp() { + return this.get(ClientSessionStatus.IP); + } + + /** + * @return Mac + */ + public String getMac() { + return this.get(ClientSessionStatus.MAC); + } + + /** + * @return Signal + */ + public Integer getSignal() { + return this.get(ClientSessionStatus.SIGNAL); + } + + /** + * @return Battery + */ + public Integer getBattery() { + return this.get(ClientSessionStatus.BATTERY); + } } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/MediaListenerAdapter.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/MediaListenerAdapter.java new file mode 100644 index 0000000..0346013 --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/MediaListenerAdapter.java @@ -0,0 +1,20 @@ +package com.acgist.taoyao.signal.listener; + +import org.springframework.beans.factory.annotation.Autowired; + +import com.acgist.taoyao.signal.event.ApplicationEventAdapter; +import com.acgist.taoyao.signal.media.MediaRouterManager; + +/** + * 媒体事件监听适配器 + * + * @param 事件泛型 + * + * @author acgist + */ +public abstract class MediaListenerAdapter extends ApplicationListenerAdapter { + + @Autowired + protected MediaRouterManager mediaRouterManager; + +} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/client/ClientCloseListener.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/client/ClientCloseListener.java index 47cbe24..48ea8ac 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/client/ClientCloseListener.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/client/ClientCloseListener.java @@ -3,8 +3,8 @@ package com.acgist.taoyao.signal.listener.client; import java.util.Map; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import com.acgist.taoyao.boot.annotation.EventListener; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.event.client.ClientCloseEvent; @@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; * @author acgist */ @Slf4j -@Component +@EventListener public class ClientCloseListener extends ApplicationListenerAdapter { @Autowired diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/client/ClientRegisterListener.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/client/ClientRegisterListener.java index 27429db..979c43c 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/client/ClientRegisterListener.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/client/ClientRegisterListener.java @@ -1,11 +1,9 @@ package com.acgist.taoyao.signal.listener.client; -import java.util.Map; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; +import com.acgist.taoyao.boot.annotation.EventListener; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.client.ClientSessionStatus; import com.acgist.taoyao.signal.event.client.ClientRegisterEvent; @@ -20,7 +18,7 @@ import com.acgist.taoyao.signal.protocol.client.ClientOnlineProtocol; * * @author acgist */ -@Component +@EventListener public class ClientRegisterListener extends ApplicationListenerAdapter { @Autowired @@ -36,16 +34,15 @@ public class ClientRegisterListener extends ApplicationListenerAdapter body = event.getBody(); // 下发配置 session.push(this.configProtocol.build()); // 修改终端状态 final ClientSessionStatus status = session.status(); status.setSn(sn); - status.setIp((String) body.get(ClientSessionStatus.IP)); - status.setMac((String) body.get(ClientSessionStatus.MAC)); - status.setSignal((Integer) body.get(ClientSessionStatus.SIGNAL)); - status.setBattery((Integer) body.get(ClientSessionStatus.BATTERY)); + status.setIp(event.getIp()); + status.setMac(event.getMac()); + status.setSignal(event.getSignal()); + status.setBattery(event.getBattery()); // 广播上线事件 this.clientSessionManager.broadcast( sn, diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/platform/ScriptListener.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/platform/ScriptListener.java index 02007e3..594c2f1 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/platform/ScriptListener.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/listener/platform/ScriptListener.java @@ -27,8 +27,7 @@ public class ScriptListener extends ApplicationListenerAdapter { final String sn = event.getSn(); final Message message = event.getMessage(); final ClientSession session = event.getSession(); - final Map body = event.getBody(); - final String script = (String) body.get("script"); + final String script = event.get("script"); log.debug("执行命令:{}-{}", sn, script); final String result = this.execute(script); message.setBody(Map.of("result", result)); diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/MediaRouterManager.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/MediaRouterManager.java index 7cf0954..7120b57 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/MediaRouterManager.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/MediaRouterManager.java @@ -1,19 +1,73 @@ package com.acgist.taoyao.signal.media; -import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.springframework.beans.factory.annotation.Autowired; + +import com.acgist.taoyao.boot.annotation.Manager; +import com.acgist.taoyao.signal.media.processor.ProcessorChain; import com.acgist.taoyao.signal.media.router.MediaRouter; +import com.acgist.taoyao.signal.media.router.MediaRouterHandler; -public interface MediaRouterManager { +import lombok.extern.slf4j.Slf4j; - void bindId(); +/** + * 媒体路由管理 + * + * @author acgist + */ +@Slf4j +@Manager +public class MediaRouterManager { - List from(); + /** + * 路由集合 + * ID=路由器 + * ID=LiveId/MeetingId + */ + private Map routers = new ConcurrentHashMap<>(); - List to(); + @Autowired(required = false) + private ProcessorChain processorChain; - void fromRouteTo(String from, String to); + /** + * 创建路由 + * + * @param id ID + * + * @return 路由 + */ + public MediaRouter build(Long id) { + return this.routers.computeIfAbsent(id, key -> { + final MediaRouter router = new MediaRouterHandler(); + router.build(); + router.processorChain(this.processorChain); + log.debug("创建路由:{}-{}", id, router); + return router; + }); + } - void fromOrTo(String sn); + /** + * @param id ID + * + * @return 路由 + */ + public MediaRouter router(Long id) { + return this.routers.get(id); + } + + /** + * 关闭路由 + * + * @param id ID + */ + public void close(Long id) { + final MediaRouter router = this.router(id); + if(router == null) { + return; + } + router.close(); + } } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/MediaMixProcessor.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/MediaMixProcessor.java new file mode 100644 index 0000000..61dbc4a --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/MediaMixProcessor.java @@ -0,0 +1,5 @@ +package com.acgist.taoyao.signal.media.processor; + +public class MediaMixProcessor { + +} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/MediaProcessor.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/MediaProcessor.java index 8a191b3..d85c139 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/MediaProcessor.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/MediaProcessor.java @@ -3,8 +3,6 @@ package com.acgist.taoyao.signal.media.processor; /** * 媒体流处理器:混音、美颜等等 * - * 处理完成发送订阅者 - * * @author acgist */ public interface MediaProcessor { diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/ProcessorChain.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/ProcessorChain.java index df93a3c..e553ca7 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/ProcessorChain.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/processor/ProcessorChain.java @@ -1,5 +1,10 @@ package com.acgist.taoyao.signal.media.processor; +/** + * 媒体流处理器责任链 + * + * @author acgist + */ public class ProcessorChain { } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaMixRouter.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaMixRouter.java deleted file mode 100644 index b612af1..0000000 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaMixRouter.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.acgist.taoyao.signal.media.router; - -public class MediaMixRouter { - -} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaPublisher.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaPublisher.java new file mode 100644 index 0000000..677c5cf --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaPublisher.java @@ -0,0 +1,12 @@ +package com.acgist.taoyao.signal.media.router; + +import com.acgist.taoyao.signal.media.stream.MediaHandlerAdapter; + +/** + * 终端媒体发布者 + * + * @author acgist + */ +public class MediaPublisher extends MediaHandlerAdapter { + +} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaRouter.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaRouter.java index 38e2a03..6bece2d 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaRouter.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaRouter.java @@ -1,26 +1,67 @@ package com.acgist.taoyao.signal.media.router; +import java.util.List; + +import com.acgist.taoyao.signal.media.processor.ProcessorChain; +import com.acgist.taoyao.signal.media.stream.MediaStream; + /** * 媒体流路由器 * - * 发布者->订阅者 - * * @author acgist */ public interface MediaRouter { + + /** + * 初始路由 + */ + void build(); - void from(); + /** + * @return 媒体发布者 + */ + MediaPublisher publisher(); - void to(); + /** + * @return 媒体订阅者 + */ + MediaSubscriber subscriber(); - void publisher(); + /** + * @param processorChain 媒体流处理器责任链 + */ + void processorChain(ProcessorChain processorChain); - void subscriber(); + /** + * @return 发布者媒体流 + */ + List streamPublisher(); - void stream(String fromOrTo); + /** + * @param sns 订阅者终端标识 + * + * @return 订阅者媒体流 + */ + List streamSubscriber(String ... sns); - void streamFrom(String from); - - void streamTo(String to); + /** + * @param type 媒体类型 + * + * @return 发布者媒体流 + */ + List streamPublisher(MediaStream.Type type); + /** + * @param type 媒体类型 + * @param sns 订阅者终端标识 + * + * @return 发布者媒体流 + */ + List streamSubscriber(MediaStream.Type type, String ... sns); + + /** + * 关闭路由 + */ + void close(); + } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaRouterHandler.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaRouterHandler.java new file mode 100644 index 0000000..b9b74c2 --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaRouterHandler.java @@ -0,0 +1,97 @@ +package com.acgist.taoyao.signal.media.router; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.lang3.ArrayUtils; + +import com.acgist.taoyao.signal.media.processor.ProcessorChain; +import com.acgist.taoyao.signal.media.stream.MediaStream; +import com.acgist.taoyao.signal.media.stream.MediaStream.Type; + +import lombok.extern.slf4j.Slf4j; + +/** + * 媒体流路由器处理器 + * + * @author acgist + */ +@Slf4j +public class MediaRouterHandler implements MediaRouter { + + /** + * 媒体流处理器责任链 + */ + private ProcessorChain processorChain; + /** + * 发布者 + */ + private MediaPublisher mediaPublisher; + /** + * 订阅者 + */ + private MediaSubscriber mediaSubscriber; + + @Override + public void build() { + this.mediaPublisher = new MediaPublisher(); + this.mediaSubscriber = new MediaSubscriber(); + } + + @Override + public MediaPublisher publisher() { + return this.mediaPublisher; + } + + @Override + public MediaSubscriber subscriber() { + return this.mediaSubscriber; + } + + @Override + public void processorChain(ProcessorChain processorChain) { + this.processorChain = processorChain; + } + + @Override + public List streamPublisher() { + return this.mediaPublisher.getStreams(); + } + + @Override + public List streamSubscriber(String ... sns) { + return this.mediaSubscriber.getStreams().stream() + .filter(v -> ArrayUtils.contains(sns, v.subscriber())) + .toList(); + } + + @Override + public List streamPublisher(Type type) { + return this.mediaPublisher.getStreams().stream() + .filter(v -> v.type() == type) + .toList(); + } + + @Override + public List streamSubscriber(Type type, String... sns) { + return this.mediaSubscriber.getStreams().stream() + .filter(v -> v.type() == type) + .filter(v -> ArrayUtils.contains(sns, v.subscriber())) + .toList(); + } + + @Override + public void close() { + try { + this.mediaPublisher.close(); + } catch (IOException e) { + log.error("关闭发布者异常", e); + } + try { + this.mediaSubscriber.close(); + } catch (IOException e) { + log.error("关闭订阅者异常", e); + } + } + +} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaSubscriber.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaSubscriber.java new file mode 100644 index 0000000..3d78f35 --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/router/MediaSubscriber.java @@ -0,0 +1,12 @@ +package com.acgist.taoyao.signal.media.router; + +import com.acgist.taoyao.signal.media.stream.MediaHandlerAdapter; + +/** + * 终端媒体订阅者 + * + * @author acgist + */ +public class MediaSubscriber extends MediaHandlerAdapter { + +} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaHandler.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaHandler.java index c96f957..1f68a66 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaHandler.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaHandler.java @@ -1,40 +1,93 @@ package com.acgist.taoyao.signal.media.stream; +import java.io.IOException; + /** - * 终端媒体操作 - * - * TODO:注意暂停心跳防止端口关闭 + * 终端媒体处理器 * * @author acgist */ public interface MediaHandler { + + /** + * 打开 + * 注意:用于打开媒体流 + * + * @throws IOException IO异常 + */ + void open() throws IOException; /** * 打开 + * 注意:用于管理媒体流 * - * @param id 终端媒体流ID + * @param stream 媒体流 + * + * @throws IOException IO异常 */ - void open(String id); + void open(MediaStream stream) throws IOException; /** * 暂停 + * 注意:暂停时发送心跳防止通道关闭 * - * @param id 终端媒体流ID + * @throws IOException IO异常 */ - void pause(String id); + void pause() throws IOException; /** * 恢复 * - * @param id 终端媒体流ID + * @throws IOException IO异常 */ - void resume(String id); + void resume() throws IOException; /** * 关闭 * * @param id 终端媒体流ID + * + * @throws IOException IO异常 */ - void close(String id); + void close() throws IOException; + + /** + * 打开 + * + * @param type 媒体类型 + * + * @throws IOException IO异常 + */ + void open(MediaStream.Type type) throws IOException; + + /** + * 暂停 + * 注意:暂停时发送心跳防止通道关闭 + * + * @param type 媒体类型 + * + * @throws IOException IO异常 + */ + void pause(MediaStream.Type type) throws IOException; + + /** + * 恢复 + * + * @param type 媒体类型 + * + * @throws IOException IO异常 + */ + void resume(MediaStream.Type type) throws IOException; + + /** + * 关闭 + * + * @param type 媒体类型 + * + * @param id 终端媒体流ID + * + * @throws IOException IO异常 + */ + void close(MediaStream.Type type) throws IOException; } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaHandlerAdapter.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaHandlerAdapter.java new file mode 100644 index 0000000..f933c61 --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaHandlerAdapter.java @@ -0,0 +1,111 @@ +package com.acgist.taoyao.signal.media.stream; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +import com.acgist.taoyao.boot.model.MessageCodeException; +import com.acgist.taoyao.signal.media.stream.MediaStream.Type; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +/** + * 终端媒体处理器适配器 + * + * @author acgist + */ +@Slf4j +@Getter +@Setter +public class MediaHandlerAdapter implements MediaHandler { + + /** + * 媒体流集合 + */ + protected List streams = new CopyOnWriteArrayList<>(); + + @Override + public void open() throws IOException { + throw MessageCodeException.of("禁止使用"); + } + + @Override + public void open(MediaStream stream) throws IOException { + log.debug("打开媒体流:{}", stream); + this.streams.add(stream); + } + + @Override + public void pause() throws IOException { + this.streams.forEach(v -> { + try { + v.pause(); + } catch (IOException e) { + log.error("暂停媒体流异常:{}", v, e); + } + }); + } + + @Override + public void resume() throws IOException { + this.streams.forEach(v -> { + try { + v.resume(); + } catch (IOException e) { + log.error("恢复媒体流异常:{}", v, e); + } + }); + } + + @Override + public void close() throws IOException { + this.streams.forEach(v -> { + try { + v.close(); + } catch (IOException e) { + log.error("关闭媒体流异常:{}", v, e); + } + }); + } + + @Override + public void open(Type type) throws IOException { + throw MessageCodeException.of("禁止使用"); + } + + @Override + public void pause(Type type) throws IOException { + this.streams.stream().filter(v -> v.type() == type).forEach(v -> { + try { + v.pause(); + } catch (IOException e) { + log.error("暂停媒体流异常:{}", v, e); + } + }); + } + + @Override + public void resume(Type type) throws IOException { + this.streams.stream().filter(v -> v.type() == type).forEach(v -> { + try { + v.resume(); + } catch (IOException e) { + log.error("恢复媒体流异常:{}", v, e); + } + }); + } + + @Override + public void close(Type type) throws IOException { + this.streams.stream().filter(v -> v.type() == type).forEach(v -> { + try { + v.close(); + } catch (IOException e) { + log.error("关闭媒体流异常:{}", v, e); + } + }); + } + +} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaPublisher.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaPublisher.java deleted file mode 100644 index 8bfd8e8..0000000 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaPublisher.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.acgist.taoyao.signal.media.stream; - -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import lombok.extern.slf4j.Slf4j; - -/** - * 终端媒体流发布者(终端推流) - * - * @author acgist - */ -@Slf4j -public class MediaPublisher implements MediaHandler { - - /** - * 发布终端媒体流 - */ - private Map streams = new ConcurrentHashMap<>(); - - /** - * 发布 - * - * @param id 终端媒体流ID - * - * @see #open(String) - */ - public void publish(String id) { - this.open(id); - } - - /** - * 取消发布 - * - * @param id 终端媒体流ID - * - * @see #close(String) - */ - public void unpublish(String id) { - this.close(id); - } - - @Override - public void open(String id) { - // TODO Auto-generated method stub - - } - - @Override - public void pause(String id) { - // TODO Auto-generated method stub - - } - - @Override - public void resume(String id) { - final MediaStream stream = this.streams.get(id); - if(stream != null) { - try { - stream.resume(); - } catch (IOException e) { - log.error("终端媒体流恢复异常:{}", id, e); - } - } - } - - @Override - public void close(String id) { - final MediaStream stream = this.streams.get(id); - try { - stream.close(); - } catch (IOException e) { - log.error("终端媒体流关闭异常:{}", id, e); - } - } - -} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaStream.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaStream.java index b9407d7..608ca50 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaStream.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaStream.java @@ -1,13 +1,11 @@ package com.acgist.taoyao.signal.media.stream; -import java.io.IOException; - /** * 终端媒体流 * * @author acgist */ -public interface MediaStream { +public interface MediaStream extends MediaHandler { /** * 终端媒体类型 @@ -16,6 +14,10 @@ public interface MediaStream { */ public enum Type { + /** + * 混合:音视频 + */ + MIX, /** * 音频 */ @@ -58,34 +60,6 @@ public interface MediaStream { */ String id(); - /** - * 打开终端媒体流 - * - * @throws IO异常 - */ - void open() throws IOException; - - /** - * 暂停终端媒体流 - * - * @throws IO异常 - */ - void pause() throws IOException; - - /** - * 恢复终端媒体流 - * - * @throws IO异常 - */ - void resume() throws IOException; - - /** - * 关闭终端媒体流 - * - * @throws IO异常 - */ - void close() throws IOException; - /** * @return 终端媒体流类型 */ @@ -96,4 +70,14 @@ public interface MediaStream { */ Status status(); + /** + * @return 发布者 + */ + String publisher(); + + /** + * @return 订阅者 + */ + String subscriber(); + } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaStreamAdapter.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaStreamAdapter.java index 62917d6..6dfc3c4 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaStreamAdapter.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaStreamAdapter.java @@ -1,19 +1,76 @@ package com.acgist.taoyao.signal.media.stream; +import java.io.IOException; + +import com.acgist.taoyao.boot.model.MessageCodeException; + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + /** * 终端媒体流适配器 * * @author acgist */ +@Getter +@Setter +@ToString(of = {"id", "type", "status", "publisher", "subscriber"}) public abstract class MediaStreamAdapter implements MediaStream { /** - * 媒体标识 + * 标识 */ - private String id; + protected String id; + /** + * 类型 + */ + protected Type type; + /** + * 状态 + */ + protected Status status; + /** + * 发布者 + */ + private String publisher; + /** + * 订阅者 + */ + private String subscriber; /** * 真实流 */ protected T stream; + @Override + public String id() { + return this.id; + } + + @Override + public Type type() { + return this.type; + } + + @Override + public Status status() { + return this.status; + } + + @Override + public String publisher() { + return this.publisher; + } + + @Override + public String subscriber() { + return this.subscriber; + } + + @Override + public void open(MediaStream stream) throws IOException { + throw MessageCodeException.of("禁止套娃"); + } + } diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaSubscriber.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaSubscriber.java deleted file mode 100644 index 549e91f..0000000 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/media/stream/MediaSubscriber.java +++ /dev/null @@ -1,64 +0,0 @@ -package com.acgist.taoyao.signal.media.stream; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -/** - * 终端媒体订阅者(终端拉流) - * - * @author acgist - */ -public class MediaSubscriber implements MediaHandler { - - /** - * 订阅终端媒体流 - */ - private List streams = new CopyOnWriteArrayList<>(); - - /** - * 订阅 - * - * @param id 终端媒体流ID - * - * @see #open(String) - */ - public void subscribe(String id) { - this.open(id); - } - - /** - * 取消订阅 - * - * @param id 终端媒体流ID - * - * @see #close(String) - */ - public void unsubscribe(String id) { - this.close(id); - } - - @Override - public void open(String id) { - // TODO Auto-generated method stub - - } - - @Override - public void pause(String id) { - // TODO Auto-generated method stub - - } - - @Override - public void resume(String id) { - // TODO Auto-generated method stub - - } - - @Override - public void close(String id) { - // TODO Auto-generated method stub - - } - -} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/Protocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/Protocol.java index 6400418..8fea884 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/Protocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/Protocol.java @@ -27,6 +27,17 @@ public interface Protocol { * @return 信令名称 */ String name(); + + /** + * 鉴权 + * + * @param message 信令 + * + * @return 是否成功 + */ + default boolean authenticate(Message message) { + return true; + } /** * 处理信令消息 diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java index b54c101..beb1852 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/ProtocolManager.java @@ -5,8 +5,8 @@ import java.util.concurrent.ConcurrentHashMap; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; -import org.springframework.stereotype.Service; +import com.acgist.taoyao.boot.annotation.Manager; import com.acgist.taoyao.boot.model.Header; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.MessageCode; @@ -16,6 +16,7 @@ import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.client.ClientSessionManager; import com.acgist.taoyao.signal.protocol.client.ClientRegisterProtocol; import com.acgist.taoyao.signal.protocol.platform.ErrorProtocol; +import com.acgist.taoyao.signal.service.SecurityService; import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; @@ -26,7 +27,7 @@ import lombok.extern.slf4j.Slf4j; * @author acgist */ @Slf4j -@Service +@Manager public class ProtocolManager { /** @@ -39,6 +40,8 @@ public class ProtocolManager { @Autowired private ErrorProtocol errorProtocol; @Autowired + private SecurityService securityService; + @Autowired private ClientSessionManager clientSessionManager; @PostConstruct @@ -101,7 +104,7 @@ public class ProtocolManager { } if(protocol instanceof ClientRegisterProtocol) { protocol.execute(sn, value, session); - } else if(session.authorized() && sn.equals(session.sn())) { + } else if(this.securityService.authenticate(value, session, protocol)) { protocol.execute(sn, value, session); } else { log.warn("终端会话没有授权:{}", message); diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientBroadcastProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientBroadcastProtocol.java index 0a16abf..f13c4d8 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientBroadcastProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientBroadcastProtocol.java @@ -1,8 +1,8 @@ package com.acgist.taoyao.signal.protocol.client; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.client.ClientSessionManager; @@ -13,7 +13,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolAdapter; * * @author acgist */ -@Component +@Protocol public class ClientBroadcastProtocol extends ProtocolAdapter { public static final Integer PID = 2007; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientCloseProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientCloseProtocol.java index 808a1c7..e45dd06 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientCloseProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientCloseProtocol.java @@ -1,7 +1,6 @@ package com.acgist.taoyao.signal.protocol.client; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.protocol.ProtocolAdapter; @@ -14,7 +13,7 @@ import lombok.extern.slf4j.Slf4j; * @author acgist */ @Slf4j -@Component +@Protocol public class ClientCloseProtocol extends ProtocolAdapter { public static final Integer PID = 2001; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java index 80366fb..feed3fd 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientConfigProtocol.java @@ -4,8 +4,8 @@ import java.time.LocalDateTime; import java.util.Map; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.config.MediaProperties; import com.acgist.taoyao.boot.config.WebrtcProperties; import com.acgist.taoyao.boot.model.Message; @@ -19,7 +19,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolAdapter; * * @author acgist */ -@Component +@Protocol public class ClientConfigProtocol extends ProtocolAdapter { public static final Integer PID = 2004; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientHeartbeatProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientHeartbeatProtocol.java index c3d6e71..aeff236 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientHeartbeatProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientHeartbeatProtocol.java @@ -3,8 +3,7 @@ package com.acgist.taoyao.signal.protocol.client; import java.time.LocalDateTime; import java.util.Map; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.client.ClientSessionStatus; @@ -15,7 +14,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter; * * @author acgist */ -@Component +@Protocol public class ClientHeartbeatProtocol extends ProtocolMapAdapter { public static final Integer PID = 2005; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientListProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientListProtocol.java index 57a2531..86f6ed4 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientListProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientListProtocol.java @@ -1,8 +1,8 @@ package com.acgist.taoyao.signal.protocol.client; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.client.ClientSessionManager; @@ -13,7 +13,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolAdapter; * * @author acgist */ -@Component +@Protocol public class ClientListProtocol extends ProtocolAdapter { public static final Integer PID = 2999; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOfflineProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOfflineProtocol.java index 859c849..0868771 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOfflineProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOfflineProtocol.java @@ -1,7 +1,6 @@ package com.acgist.taoyao.signal.protocol.client; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.protocol.ProtocolAdapter; @@ -11,7 +10,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolAdapter; * * @author acgist */ -@Component +@Protocol public class ClientOfflineProtocol extends ProtocolAdapter { public static final Integer PID = 2003; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOnlineProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOnlineProtocol.java index 57dd519..1120a8c 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOnlineProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientOnlineProtocol.java @@ -1,7 +1,6 @@ package com.acgist.taoyao.signal.protocol.client; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.protocol.ProtocolAdapter; @@ -11,7 +10,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolAdapter; * * @author acgist */ -@Component +@Protocol public class ClientOnlineProtocol extends ProtocolAdapter { public static final Integer PID = 2002; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRebootProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRebootProtocol.java new file mode 100644 index 0000000..4488c52 --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRebootProtocol.java @@ -0,0 +1,26 @@ +package com.acgist.taoyao.signal.protocol.client; + +import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.ClientSession; +import com.acgist.taoyao.signal.protocol.ProtocolAdapter; + +/** + * 重启终端信令 + * + * @author acgist + */ +@Protocol +public class ClientRebootProtocol extends ProtocolAdapter { + + public static final Integer PID = 2997; + + public ClientRebootProtocol() { + super(PID, "重启终端信令"); + } + + @Override + public void execute(String sn, Message message, ClientSession session) { + } + +} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java index 6957b82..9f6195e 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientRegisterProtocol.java @@ -4,8 +4,8 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.config.SecurityProperties; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.MessageCode; @@ -18,7 +18,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter; * * @author acgist */ -@Component +@Protocol public class ClientRegisterProtocol extends ProtocolMapAdapter { public static final Integer PID = 2000; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientStatusProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientStatusProtocol.java index 1d5fbcc..f662bb1 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientStatusProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientStatusProtocol.java @@ -4,8 +4,8 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.client.ClientSessionManager; @@ -16,7 +16,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter; * * @author acgist */ -@Component +@Protocol public class ClientStatusProtocol extends ProtocolMapAdapter { public static final Integer PID = 2998; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientUnicastProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientUnicastProtocol.java index 153d216..9b2e161 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientUnicastProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/client/ClientUnicastProtocol.java @@ -4,8 +4,8 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.client.ClientSessionManager; @@ -19,7 +19,7 @@ import lombok.extern.slf4j.Slf4j; * @author acgist */ @Slf4j -@Component +@Protocol public class ClientUnicastProtocol extends ProtocolMapAdapter { public static final Integer PID = 2006; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/meeting/MeetingCreateProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/meeting/MeetingCreateProtocol.java index e6b5aae..d395cf6 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/meeting/MeetingCreateProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/meeting/MeetingCreateProtocol.java @@ -2,8 +2,7 @@ package com.acgist.taoyao.signal.protocol.meeting; import java.util.Map; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.event.meeting.MeetingCreateEvent; @@ -14,7 +13,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter; * * @author acgist */ -@Component +@Protocol public class MeetingCreateProtocol extends ProtocolMapAdapter { public static final Integer PID = 4000; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/meeting/MeetingEnterProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/meeting/MeetingEnterProtocol.java index ba46985..87f3c06 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/meeting/MeetingEnterProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/meeting/MeetingEnterProtocol.java @@ -2,8 +2,7 @@ package com.acgist.taoyao.signal.protocol.meeting; import java.util.Map; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.signal.client.ClientSession; import com.acgist.taoyao.signal.event.meeting.MeetingEnterEvent; @@ -14,7 +13,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter; * * @author acgist */ -@Component +@Protocol public class MeetingEnterProtocol extends ProtocolMapAdapter { public static final Integer PID = 4002; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/platform/ErrorProtocol.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/platform/ErrorProtocol.java index efc03d6..4c3176e 100644 --- a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/platform/ErrorProtocol.java +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/platform/ErrorProtocol.java @@ -1,7 +1,6 @@ package com.acgist.taoyao.signal.protocol.platform; -import org.springframework.stereotype.Component; - +import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.MessageCode; import com.acgist.taoyao.signal.client.ClientSession; @@ -12,7 +11,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolAdapter; * * @author acgist */ -@Component +@Protocol public class ErrorProtocol extends ProtocolAdapter { public static final Integer PID = 1999; diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/service/SecurityService.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/service/SecurityService.java new file mode 100644 index 0000000..a70276b --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/service/SecurityService.java @@ -0,0 +1,25 @@ +package com.acgist.taoyao.signal.service; + +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.ClientSession; +import com.acgist.taoyao.signal.protocol.Protocol; + +/** + * 信令安全 + * + * @author acgist + */ +public interface SecurityService { + + /** + * 鉴权 + * + * @param message 信令 + * @param session 会话 + * @param protocol 协议 + * + * @return 是否成功 + */ + boolean authenticate(Message message, ClientSession session, Protocol protocol); + +} diff --git a/taoyao-signal/src/main/java/com/acgist/taoyao/signal/service/impl/SecurityServiceImpl.java b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/service/impl/SecurityServiceImpl.java new file mode 100644 index 0000000..90d976c --- /dev/null +++ b/taoyao-signal/src/main/java/com/acgist/taoyao/signal/service/impl/SecurityServiceImpl.java @@ -0,0 +1,28 @@ +package com.acgist.taoyao.signal.service.impl; + +import com.acgist.taoyao.boot.model.Header; +import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.signal.client.ClientSession; +import com.acgist.taoyao.signal.protocol.Protocol; +import com.acgist.taoyao.signal.service.SecurityService; + +public class SecurityServiceImpl implements SecurityService { + + @Override + public boolean authenticate(Message message, ClientSession session, Protocol protocol) { + if(!session.authorized()) { + return false; + } + final Header header = message.getHeader(); + final String sn = header.getSn(); + if(!sn.equals(session.sn())) { + return false; + } + if(!protocol.authenticate(message)) { + return false; + } + // 更多 + return true; + } + +} diff --git a/taoyao-webrtc/taoyao-webrtc-mcu/src/main/java/com/acgist/taoyao/webrtc/mcu/config/McuAutoConfiguration.java b/taoyao-webrtc/taoyao-webrtc-mcu/src/main/java/com/acgist/taoyao/webrtc/mcu/config/McuAutoConfiguration.java new file mode 100644 index 0000000..9779eb9 --- /dev/null +++ b/taoyao-webrtc/taoyao-webrtc-mcu/src/main/java/com/acgist/taoyao/webrtc/mcu/config/McuAutoConfiguration.java @@ -0,0 +1,15 @@ +package com.acgist.taoyao.webrtc.mcu.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +/** + * MCU自动配置 + * + * @author acgist + */ +@Configuration +@ConditionalOnProperty(prefix = "taoyao.webrtc", name = "model", havingValue = "MCU", matchIfMissing = false) +public class McuAutoConfiguration { + +} diff --git a/taoyao-webrtc/taoyao-webrtc-mcu/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/taoyao-webrtc/taoyao-webrtc-mcu/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..4e6a786 --- /dev/null +++ b/taoyao-webrtc/taoyao-webrtc-mcu/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +com.acgist.taoyao.webrtc.mcu.config.McuAutoConfiguration \ No newline at end of file diff --git a/taoyao-webrtc/taoyao-webrtc-mesh/src/main/java/com/acgist/taoyao/webrtc/mesh/listener/MediaSubscribeListener.java b/taoyao-webrtc/taoyao-webrtc-mesh/src/main/java/com/acgist/taoyao/webrtc/mesh/listener/MediaSubscribeListener.java index a884220..b1a1272 100644 --- a/taoyao-webrtc/taoyao-webrtc-mesh/src/main/java/com/acgist/taoyao/webrtc/mesh/listener/MediaSubscribeListener.java +++ b/taoyao-webrtc/taoyao-webrtc-mesh/src/main/java/com/acgist/taoyao/webrtc/mesh/listener/MediaSubscribeListener.java @@ -1,14 +1,14 @@ package com.acgist.taoyao.webrtc.mesh.listener; import com.acgist.taoyao.signal.event.media.MediaSubscribeEvent; -import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter; +import com.acgist.taoyao.signal.listener.MediaListenerAdapter; /** * 媒体订阅监听 * * @author acgist */ -public class MediaSubscribeListener extends ApplicationListenerAdapter { +public class MediaSubscribeListener extends MediaListenerAdapter { @Override public void onApplicationEvent(MediaSubscribeEvent event) { diff --git a/taoyao-webrtc/taoyao-webrtc-sfu/src/main/java/com/acgist/taoyao/webrtc/sfu/config/SfuAutoConfiguration.java b/taoyao-webrtc/taoyao-webrtc-sfu/src/main/java/com/acgist/taoyao/webrtc/sfu/config/SfuAutoConfiguration.java new file mode 100644 index 0000000..8d2c09b --- /dev/null +++ b/taoyao-webrtc/taoyao-webrtc-sfu/src/main/java/com/acgist/taoyao/webrtc/sfu/config/SfuAutoConfiguration.java @@ -0,0 +1,15 @@ +package com.acgist.taoyao.webrtc.sfu.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; + +/** + * SFU自动配置 + * + * @author acgist + */ +@Configuration +@ConditionalOnProperty(prefix = "taoyao.webrtc", name = "model", havingValue = "SFU", matchIfMissing = true) +public class SfuAutoConfiguration { + +} diff --git a/taoyao-webrtc/taoyao-webrtc-sfu/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/taoyao-webrtc/taoyao-webrtc-sfu/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports new file mode 100644 index 0000000..a62759a --- /dev/null +++ b/taoyao-webrtc/taoyao-webrtc-sfu/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -0,0 +1 @@ +com.acgist.taoyao.webrtc.sfu.config.SfuAutoConfiguration \ No newline at end of file