This commit is contained in:
acgist
2022-11-26 12:20:05 +08:00
parent 0f339f4aea
commit 474be08cc9
21 changed files with 493 additions and 362 deletions

View File

@@ -4,6 +4,8 @@ import java.util.Map;
import com.acgist.taoyao.boot.annotation.EventListener;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCode;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.meeting.Meeting;
import com.acgist.taoyao.meeting.MeetingListenerAdapter;
import com.acgist.taoyao.signal.event.meeting.MeetingEnterEvent;
@@ -21,13 +23,18 @@ public class MeetingEnterListener extends MeetingListenerAdapter<MeetingEnterEve
final String sn = event.getSn();
final String id = event.get("id");
final Meeting meeting = this.meetingManager.meeting(id);
if(meeting == null) {
throw MessageCodeException.of(MessageCode.CODE_3400, "无效会议");
}
meeting.addSn(sn);
final Message message = event.getMessage();
message.setBody(Map.of(
"id", meeting.getId(),
"sn", sn
));
this.clientSessionManager.broadcast(sn, message);
meeting.getSns().stream()
.filter(v -> !sn.equals(v))
.forEach(v -> this.clientSessionManager.unicast(v, message));
}
}

View File

@@ -84,7 +84,7 @@ taoyao:
- stun:stun4.l.google.com:19302
- stun:stun.stunprotocol.org:3478
# 信令服务配置
host: localhost
host: 192.168.1.100
port: ${server.port:8888}
schema: wss
websocket: /websocket.signal

View File

@@ -1,5 +1,6 @@
/** 桃夭WebRTC终端核心功能 */
/** 兼容 */
const RTCIceCandidate = window.RTCIceCandidate || window.mozRTCIceCandidate || window.webkitRTCIceCandidate;
const RTCPeerConnection = window.RTCPeerConnection || window.mozRTCPeerConnection || window.webkitRTCPeerConnection;
const RTCSessionDescription = window.RTCSessionDescription || window.mozRTCSessionDescription || window.webkitRTCSessionDescription;
/** 默认音频配置 */
@@ -70,6 +71,12 @@ const signalProtocol = {
},
/** 媒体信令 */
media: {
/** 发布 */
publish: 5000,
/** 订阅 */
subscribe: 5002,
/** 候选 */
candidate: 5004
},
/** 终端信令 */
client: {
@@ -108,12 +115,12 @@ const signalProtocol = {
return Date.now() + '' + this.index;
},
/** 生成信令消息 */
buildProtocol: function(sn, pid, body, id) {
buildProtocol: function(pid, body, id) {
let message = {
header: {
v: signalConfig.version,
id: id || this.buildId(),
sn: sn,
sn: signalConfig.sn,
pid: pid,
},
'body': body
@@ -156,9 +163,8 @@ const signalChannel = {
clearTimeout(self.heartbeatTimer);
}
self.heartbeatTimer = setTimeout(function() {
if (self.channel && self.channel.readyState == WebSocket.OPEN) {
if (self.channel && self.channel.readyState === WebSocket.OPEN) {
self.push(signalProtocol.buildProtocol(
signalConfig.sn,
signalProtocol.client.heartbeat,
{
signal: 100,
@@ -183,7 +189,6 @@ const signalChannel = {
console.debug('打开信令通道', e);
// 注册终端
self.push(signalProtocol.buildProtocol(
signalConfig.sn,
signalProtocol.client.register,
{
ip: null,
@@ -252,7 +257,7 @@ const signalChannel = {
}
self.lockReconnect = true;
// 关闭旧的通道
if(self.channel && self.channel.readyState == WebSocket.OPEN) {
if(self.channel && self.channel.readyState === WebSocket.OPEN) {
self.channel.close();
self.channel = null;
}
@@ -292,6 +297,15 @@ const signalChannel = {
defaultCallback: function(data) {
console.debug('没有适配信令消息默认处理', data);
switch(data.header.pid) {
case signalProtocol.media.publish:
this.defaultMediaPublish(data);
break;
case signalProtocol.media.subscribe:
this.defaultMediaSubscribe(data);
break;
case signalProtocol.media.candidate:
this.defaultMediaCandidate(data);
break;
case signalProtocol.client.register:
break;
case signalProtocol.client.config:
@@ -305,6 +319,7 @@ const signalChannel = {
case signalProtocol.meeting.create:
break;
case signalProtocol.meeting.enter:
this.defaultMeetingEnter(data);
break;
case signalProtocol.platform.error:
console.error('信令发生错误', data);
@@ -322,8 +337,43 @@ const signalChannel = {
location.reload();
},
/** 默认媒体回调 */
defaultMediaPublish: function(data) {
this.taoyao.localMediaChannel.setRemoteDescription(new RTCSessionDescription(data.body));
},
defaultMediaSubscribe: function(data) {
let self = this;
const from = data.body.from;
let remote = this.taoyao.remoteClientFilter(from);
if(!remote) {
remote = new TaoyaoClient(from);
this.taoyao.remoteClient.push(remote);
}
self.taoyao.remoteMediaChannel.setRemoteDescription(new RTCSessionDescription(data.body));
self.taoyao.remoteMediaChannel.createAnswer().then(description => {
console.debug('Local Create Answer', description);
self.taoyao.remoteMediaChannel.setLocalDescription(description);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.publish,
{
to: from,
sdp: description.sdp,
type: description.type
}
));
});
},
defaultMediaCandidate: function(data) {
if(
!data.body.candidate ||
!data.body.candidate.candidate ||
!data.body.candidate.sdpMid ||
!data.body.candidate.sdpMLineIndex
) {
console.warn('候选缺失要素', data);
return;
}
let candidate = new RTCIceCandidate(data.body.candidate);
this.taoyao.remoteMediaChannel.addIceCandidate(candidate);
},
/** 会议默认回调 */
defaultMeetingEnter: function(data) {
@@ -347,6 +397,9 @@ function TaoyaoClient(
this.audioStatus = false;
this.videoStatus = false;
this.recordStatus = false;
/** 重置 */
this.reset = function() {
}
/** 播放视频 */
this.play = async function() {
await this.video.play();
@@ -368,16 +421,32 @@ function TaoyaoClient(
return this;
};
/** 设置视频对象 */
this.buildVideo = async function(videoId, stream) {
this.buildVideo = async function(videoId, stream, track) {
if(!this.video) {
this.video = document.getElementById(videoId);
}
await this.buildStream(stream);
await this.buildStream(stream, track);
return this;
};
/** 设置媒体流 */
this.buildStream = async function(stream) {
this.buildStream = async function(stream, track) {
if(stream) {
if(track) {
if(!this.stream) {
this.stream = stream;
this.video.srcObject = this.stream;
}
// TODO删除旧的
this.stream.addTrack(track);
if(track.kind === 'audio') {
this.audioTrack = track;
this.audioStatus = true;
} else if(track.kind === 'video') {
this.videoTrack = track;
this.videoStatus = true;
}
await this.video.load();
} else {
this.stream = stream;
this.video.srcObject = stream;
let audioTrack = stream.getAudioTracks();
@@ -390,7 +459,9 @@ function TaoyaoClient(
this.videoTrack = videoTrack;
this.videoStatus = true;
}
console.debug('设置媒体流', this.stream, this.audioTrack, this.videoTrack);
await this.video.load();
}
console.debug('设置媒体流', this.video, this.stream, this.audioTrack, this.videoTrack);
await this.play();
}
return this;
@@ -430,51 +501,6 @@ function Taoyao(
this.remoteMediaChannel = null;
/** 信令通道 */
this.signalChannel = null;
/** 检查设备 */
this.checkDevice = function() {
let self = this;
if(
navigator.mediaDevices &&
navigator.mediaDevices.getUserMedia &&
navigator.mediaDevices.enumerateDevices
) {
navigator.mediaDevices.enumerateDevices()
.then(list => {
let audioDevice = false;
let videoDevice = false;
list.forEach(v => {
console.debug('终端媒体设备', v.kind, v.label);
switch(v.kind) {
case 'audioinput':
audioDevice = true;
break;
case 'videoinput':
videoDevice = true;
break;
default:
console.debug('没有适配设备', v.kind, v.label);
break;
}
});
if(!audioDevice) {
console.warn('终端没有音频输入设备');
self.audioEnabled = false;
}
if(!videoDevice) {
console.warn('终端没有视频输入设备');
self.videoEnabled = false;
}
})
.catch(e => {
console.error('检查终端设备异常', e);
self.videoEnabled = false;
self.videoEnabled = false;
});
} else {
throw new Error('不支持的终端设备');
}
return this;
};
/** 媒体配置 */
this.configMedia = function(audio = {}, video = {}) {
this.audioConfig = {...this.audioConfig, ...audio};
@@ -502,13 +528,43 @@ function Taoyao(
};
/** 打开本地媒体 */
this.buildLocalMedia = function() {
console.debug('打开终端媒体', this.audioConfig, this.videoConfig);
let self = this;
this.checkDevice();
return new Promise((resolve, reject) => {
if(
navigator.mediaDevices &&
navigator.mediaDevices.getUserMedia &&
navigator.mediaDevices.enumerateDevices
) {
navigator.mediaDevices.enumerateDevices()
.then(list => {
let audioEnabled = false;
let videoEnabled = false;
list.forEach(v => {
console.debug('终端媒体设备', v, v.kind, v.label);
switch(v.kind) {
case 'audioinput':
audioEnabled = true;
break;
case 'videoinput':
videoEnabled = true;
break;
default:
console.debug('没有适配设备', v.kind, v.label);
break;
}
});
if(!audioEnabled) {
console.warn('终端没有音频输入设备');
self.audioEnabled = false;
}
if(!videoEnabled) {
console.warn('终端没有视频输入设备');
self.videoEnabled = false;
}
console.debug('打开终端媒体', self.audioEnabled, self.videoEnabled, self.audioConfig, self.videoConfig);
navigator.mediaDevices.getUserMedia({
audio: self.audioConfig,
video: self.videoConfig
audio: self.audioEnabled ? self.audioConfig : false,
video: self.videoEnabled ? self.videoConfig : false
})
.then(resolve)
.catch(reject);
@@ -517,6 +573,15 @@ function Taoyao(
// audio: self.audioConfig,
// video: self.videoConfig
// }, resolve, reject);
})
.catch(e => {
console.error('检查终端设备异常', e);
self.videoEnabled = false;
self.videoEnabled = false;
});
} else {
throw new Error('不支持的终端设备');
}
});
};
/** 远程终端过滤 */
@@ -537,6 +602,7 @@ function Taoyao(
};
/** 打开媒体通道 */
this.buildMediaChannel = async function(localVideoId, stream) {
let self = this;
// 本地视频
this.localClient = new TaoyaoClient(signalConfig.sn);
await this.localClient.buildVideo(localVideoId, stream);
@@ -548,61 +614,92 @@ function Taoyao(
if(this.localClient.videoTrack) {
this.localClient.videoTrack.forEach(v => this.localMediaChannel.addTrack(v, this.localClient.stream));
}
this.localMediaChannel.ontrack = this.localMediaChannelTrack;
this.localMediaChannel.ondatachannel = this.localMediaChannelDataChannel;
this.localMediaChannel.onicecandidate = this.localMediaChannelIceCandidate;
this.localMediaChannel.ontrack = function(e) {
console.debug('Local Media Track', e);
};
this.localMediaChannel.ondatachannel = function(channel) {
channel.onopen = function() {
console.debug('Local DataChannel Open');
}
channel.onmessage = function(data) {
console.debug('Local DataChannel Message', data);
}
channel.onclose = function() {
console.debug('Local DataChannel Close');
}
channel.onerror = function(e) {
console.debug('Local DataChannel Error', e);
}
};
this.localMediaChannel.onicecandidate = function(e) {
let sns = self.remoteClient.filter(v => v.video === null).map(v => v.sn);
console.debug('Local ICE Candidate', sns, e);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.candidate,
{
sns: sns,
candidate: e.candidate
}
));
};
// 远程通道
this.remoteMediaChannel = new RTCPeerConnection(defaultRPCConfig);
this.remoteMediaChannel.ontrack = this.remoteMediaChannelTrack;
this.remoteMediaChannel.ondatachannel = this.remoteMediaChannelDataChannel;
this.remoteMediaChannel.onicecandidate = this.remoteMediaChannelIceCandidate;
this.remoteMediaChannel.ontrack = function(e) {
console.debug('Remote Media Track', e);
// TODO匹配
let remote = self.remoteClient[0];
remote.buildVideo(remote.sn, e.streams[0], e.track);
};
this.remoteMediaChannel.ondatachannel = function(channel) {
channel.onopen = function() {
console.debug('Remote DataChannel Open');
}
channel.onmessage = function(data) {
console.debug('Remote DataChannel Message', data);
}
channel.onclose = function() {
console.debug('Remote DataChannel Close');
}
channel.onerror = function(e) {
console.debug('Remote DataChannel Error', e);
}
};
this.remoteMediaChannel.onicecandidate = function(e) {
let sns = self.remoteClient.filter(v => v.video === null).map(v => v.sn);
console.debug('Remote ICE Candidate', sns, e);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.candidate,
{
sns: sns,
candidate: e.candidate
}
));
};
console.debug('打开媒体通道', this.localMediaChannel, this.remoteMediaChannel);
return this;
};
/** 本地 */
this.localMediaChannelTrack = function() {
};
this.localMediaChannelDataChannel = function(channel) {
channel.onopen = function() {
console.debug('DataChannel Open');
}
channel.onmessage = function(data) {
console.debug('DataChannel Message', data);
}
channel.onclose = function() {
console.debug('DataChannel Close');
}
channel.onerror = function(e) {
console.debug('DataChannel Error', e);
}
};
this.localMediaChannelIceCandidate = function() {
};
/** 远程 */
this.localMediaChannelTrack = function() {
};
this.localMediaChannelDataChannel = function(channel) {
channel.onopen = function() {
console.debug('DataChannel Open');
}
channel.onmessage = function(data) {
console.debug('DataChannel Message', data);
}
channel.onclose = function() {
console.debug('DataChannel Close');
}
channel.onerror = function(e) {
console.debug('DataChannel Error', e);
}
};
this.localMediaChannelIceCandidate = function() {
};
/** 媒体信令 */
this.mediaSubscribe = function(sn, callback) {
let self = this;
let remote = self.remoteClientFilter(sn);
if(remote) {
remote.reset();
} else {
remote = new TaoyaoClient(sn);
this.remoteClient.push(remote);
}
if(self.webrtc.model === 'MESH') {
self.localMediaChannel.createOffer().then(description => {
console.debug('Local Create Offer', description);
self.localMediaChannel.setLocalDescription(description);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.subscribe,
{
to: sn,
sdp: description.sdp,
type: description.type
}
), callback);
});
}
};
@@ -610,14 +707,12 @@ function Taoyao(
this.meetingCreate = function(callback) {
let self = this;
self.push(signalProtocol.buildProtocol(
signalConfig.sn,
signalProtocol.meeting.create,
signalProtocol.meeting.create
), callback);
}
this.meetingEnter = function(id, callback) {
let self = this;
self.push(signalProtocol.buildProtocol(
signalConfig.sn,
signalProtocol.meeting.enter,
{
id: id
@@ -625,206 +720,3 @@ function Taoyao(
), callback);
};
};
/*
var peer;
var socket; // WebSocket
var supportStream = false; // 是否支持使用数据流
var localVideo; // 本地视频
var localVideoStream; // 本地视频流
var remoteVideo; // 远程视频
var remoteVideoStream; // 远程视频流
var initiator = false; // 是否已经有人在等待
var started = false; // 是否开始
var channelReady = false; // 是否打开WebSocket通道
// 初始
function initialize() {
console.log("初始聊天");
// 获取视频
localVideo = document.getElementById("localVideo");
remoteVideo = document.getElementById("remoteVideo");
supportStream = "srcObject" in localVideo;
// 显示状态
if (initiator) {
setNotice("开始连接");
} else {
setNotice("加入聊天https://www.acgist.com/demo/video/?oid=FFB85D84AC56DAF88B7E22AFFA7533D3");
}
// 打开WebSocket
openChannel();
// 创建终端媒体
buildUserMedia();
}
function openChannel() {
console.log("打开WebSocket");
socket = new WebSocket("wss://www.acgist.com/video.ws/FFB85D84AC56DAF88B7E22AFFA7533D3");
socket.onopen = channelOpened;
socket.onmessage = channelMessage;
socket.onclose = channelClosed;
socket.onerror = channelError;
}
function channelOpened() {
console.log("打开WebSocket成功");
channelReady = true;
}
function channelMessage(message) {
console.log("收到消息:" + message.data);
var msg = JSON.parse(message.data);
if (msg.type === "offer") { // 处理Offer消息
if (!initiator && !started) {
connectPeer();
}
peer.setRemoteDescription(new RTCSessionDescription(msg));
peer.createAnswer().then(buildLocalDescription);
} else if (msg.type === "answer" && started) { // 处理Answer消息
peer.setRemoteDescription(new RTCSessionDescription(msg));
} else if (msg.type === "candidate" && started) {
var candidate = new RTCIceCandidate({
sdpMLineIndex : msg.label,
candidate : msg.candidate
});
peer.addIceCandidate(candidate);
} else if (msg.type === "bye" && started) {
onRemoteClose();
setNotice("对方已断开!");
} else if(msg.type === "nowaiting") {
onRemoteClose();
setNotice("对方已离开!");
}
}
function channelClosed() {
console.log("关闭WebSocket");
openChannel(); // 重新打开WebSocket
}
function channelError(event) {
console.log("WebSocket异常" + event);
}
function buildUserMedia() {
console.log("获取终端媒体");
if(navigator.mediaDevices && navigator.mediaDevices.getUserMedia) {
navigator.mediaDevices.getUserMedia({
"audio" : true,
"video" : true
})
.then(onUserMediaSuccess)
.catch(onUserMediaError);
} else {
navigator.getUserMedia({
"audio" : true,
"video" : true
}, onUserMediaSuccess, onUserMediaError);
}
}
function onUserMediaSuccess(stream) {
localVideoStream = stream;
if (supportStream) {
localVideo.srcObject = localVideoStream;
} else {
localVideo.src = URL.createObjectURL(localVideoStream);
}
if (initiator) {
connectPeer();
}
}
function onUserMediaError(error) {
alert("请打开摄像头!");
}
function connectPeer() {
if (!started && localVideoStream && channelReady) {
console.log("开始连接Peer");
started = true;
buildPeerConnection();
peer.addStream(localVideoStream);
if (initiator) {
peer.createOffer().then(buildLocalDescription);
}
}
}
function buildPeerConnection() {
//var server = {"iceServers" : [{"url" : "stun:stun.l.google.com:19302"}]};
var server = {"iceServers" : [{"url" : "stun:stun1.l.google.com:19302"}]};
peer = new PeerConnection(server);
peer.onicecandidate = peerIceCandidate;
peer.onconnecting = peerConnecting;
peer.onopen = peerOpened;
peer.onaddstream = peerAddStream;
peer.onremovestream = peerRemoveStream;
}
function peerIceCandidate(event) {
if (event.candidate) {
sendMessage({
type : "candidate",
id : event.candidate.sdpMid,
label : event.candidate.sdpMLineIndex,
candidate : event.candidate.candidate
});
} else {
console.log("不支持的candidate");
}
}
function peerConnecting(message) {
console.log("Peer连接");
}
function peerOpened(message) {
console.log("Peer打开");
}
function peerAddStream(event) {
console.log("远程视频添加");
remoteVideoStream = event.stream;
if(supportStream) {
remoteVideo.srcObject = remoteVideoStream;
} else {
remoteVideo.src = URL.createObjectURL(remoteVideoStream);
}
setNotice("连接成功");
waitForRemoteVideo();
}
function peerRemoveStream(event) {
console.log("远程视频移除");
}
function buildLocalDescription(description) {
peer.setLocalDescription(description);
sendMessage(description);
}
function sendMessage(message) {
var msgJson = JSON.stringify(message);
socket.send(msgJson);
console.log("发送信息:" + msgJson);
}
function setNotice(msg) {
document.getElementById("footer").innerHTML = msg;
}
function onRemoteClose() {
started = false;
initiator = false;
if(supportStream) {
remoteVideo.srcObject = null;
} else {
remoteVideo.src = null;
}
peer.close();
}
function waitForRemoteVideo() {
if (remoteVideo.currentTime > 0) { // 判断远程视频长度
setNotice("连接成功!");
} else {
setTimeout(waitForRemoteVideo, 100);
}
}
window.onbeforeunload = function() {
sendMessage({type : "bye"});
if(peer) {
peer.close();
}
socket.close();
}
if(!WebSocket) {
alert("你的浏览器不支持WebSocket");
} else if(!PeerConnection) {
alert("你的浏览器不支持RTCPeerConnection");
} else {
setTimeout(initialize, 100); // 加载完成调用初始化方法
}
window.onbeforeunload = function() {
socket.close();
}
*/

View File

@@ -28,14 +28,14 @@
<a class="record icon-radio-checked" title="录制视频" @click="recordSelf"></a>
</div>
</div>
<div class="meeting" v-for="client in this.clients" :key="client.sn">
<div class="meeting" v-for="client in this.remoteClient" :key="client.sn">
<div class="video">
<video v-bind:id="client.sn"></video>
</div>
<div class="handler">
<a class="audio" title="音频状态" v-bind:class="client.audio?'icon-volume-medium':'icon-volume-mute2'" @click="audio(client.sn)"></a>
<a class="video" title="视频状态" v-bind:class="client.video?'icon-play2':'icon-stop'" @click="video(client.sn)"></a>
<a class="record icon-radio-checked" title="录制视频" v-bind:class="client.record?'active':''" @click="record(client.sn)"></a>
<a class="audio" title="音频状态" v-bind:class="client.audioStatus?'icon-volume-medium':'icon-volume-mute2'" @click="audio(client.sn)"></a>
<a class="video" title="视频状态" v-bind:class="client.videoStatus?'icon-play2':'icon-stop'" @click="video(client.sn)"></a>
<a class="record icon-radio-checked" title="录制视频" v-bind:class="client.recordStatus?'active':''" @click="record(client.sn)"></a>
<a class="expel icon-cancel-circle" title="踢出会议" @click="expel(client.sn)"></a>
</div>
</div>
@@ -45,30 +45,36 @@
const vue = new Vue({
el: "#app",
data: {
clients: [
{sn:"1", audio: true, video: true, record: false},
{sn:"2", audio: true, video: true, record: false},
{sn:"3", audio: true, video: true, record: false}
],
taoyao: null,
remoteClient: [],
meetingId: null
},
mounted() {
if(signalConfig.sn) {
// TODO修改sn
// 随机终端标识
if(signalConfig.sn === 'taoyao') {
let sn = localStorage.getItem('taoyao.sn');
if(sn) {
signalConfig.sn = sn;
}
console.debug('终端标识', sn);
}
let self = this;
this.taoyao = new Taoyao("wss://localhost:8888/websocket.signal");
this.taoyao = new Taoyao("wss://192.168.1.100:8888/websocket.signal");
this.remoteClient = this.taoyao.remoteClient;
// 打开信令通道
this.taoyao
.buildChannel(self.callback)
.then(e => console.debug('连接成功'));
.then(e => console.debug('信令通道连接成功'));
// 打开媒体通道
this.taoyao.buildLocalMedia()
.then(stream => {
self.taoyao.buildMediaChannel('local', stream);
})
.catch(e => console.error('打开终端媒体失败', e));
.catch(e => {
console.error('打开终端媒体失败', e);
// 方便相同电脑测试
self.taoyao.buildMediaChannel('local', null);
});
},
beforeDestroy() {
},
@@ -86,7 +92,6 @@
create: function(event) {
let self = this;
this.taoyao.meetingCreate(data => {
console.log(data)
self.taoyao.meetingEnter(data.body.id);
return true;
});

View File

@@ -359,8 +359,6 @@
### 发布信令5000
Offer/Answer
控制终端推流(服务端拉流)
### 取消发布指令5001
@@ -369,8 +367,6 @@ Offer/Answer
### 订阅指令5002
Offer/Answer
订阅终端媒体流(终端拉流)
### 取消订阅指令5003

View File

@@ -52,14 +52,14 @@ public interface ClientSession extends AutoCloseable {
*
* @return 终端标识是否匹配
*/
boolean matchSn(String sn);
boolean filterSn(String sn);
/**
* @param sn 终端标识
*
* @return 终端标识是否匹配失败
*/
boolean matchNoneSn(String sn);
boolean filterNoneSn(String sn);
/**
* @param instance 会话实例

View File

@@ -64,12 +64,12 @@ public abstract class ClientSessionAdapter<T extends AutoCloseable> implements C
}
@Override
public boolean matchSn(String sn) {
public boolean filterSn(String sn) {
return StringUtils.equals(sn, this.sn);
}
@Override
public boolean matchNoneSn(String sn) {
public boolean filterNoneSn(String sn) {
return !StringUtils.equals(sn, this.sn);
}

View File

@@ -65,7 +65,7 @@ public class ClientSessionManager {
* @param message 消息
*/
public void unicast(String to, Message message) {
this.sessions.stream().filter(v -> v.matchSn(to)).forEach(v -> {
this.sessions.stream().filter(v -> v.filterSn(to)).forEach(v -> {
message.getHeader().setSn(v.sn());
v.push(message);
});
@@ -90,7 +90,7 @@ public class ClientSessionManager {
* @param message 消息
*/
public void broadcast(String from, Message message) {
this.sessions.stream().filter(v -> v.matchNoneSn(from)).forEach(v -> {
this.sessions.stream().filter(v -> v.filterNoneSn(from)).forEach(v -> {
message.getHeader().setSn(v.sn());
v.push(message);
});

View File

@@ -3,6 +3,7 @@ package com.acgist.taoyao.signal.client.websocket;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.client.ClientSessionManager;
import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.platform.ErrorProtocol;
@@ -42,6 +43,9 @@ public class WebSocketSignal {
} catch (Exception e) {
log.error("处理会话消息异常", e);
final Message errorMessage = WebSocketSignal.errorProtocol.build();
if(e instanceof MessageCodeException code) {
errorMessage.setCode(code.getCode(), code.getMessage());
}
errorMessage.setBody(e.getMessage());
this.push(session, errorMessage);
}

View File

@@ -1,6 +1,8 @@
package com.acgist.taoyao.signal.event;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.springframework.context.ApplicationEvent;
@@ -86,4 +88,16 @@ public abstract class ApplicationEventAdapter extends ApplicationEvent {
return t == null ? defaultValue : t;
}
/**
* @return 新的主体
*/
public Map<String, Object> mergeBody() {
final Map<String, Object> body = new HashMap<>();
if(this.body != null) {
this.body.forEach((k, v) -> body.put(Objects.toString(k), v));
}
this.message.setBody(body);
return body;
}
}

View File

@@ -0,0 +1,35 @@
package com.acgist.taoyao.signal.event.media;
import java.util.List;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import lombok.Getter;
import lombok.Setter;
/**
* 候选事件
*
* @author acgist
*/
@Getter
@Setter
public class MediaCandidateEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public MediaCandidateEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
/**
* @return 终端列表
*/
public List<String> getSns() {
return this.get("sns");
}
}

View File

@@ -0,0 +1,34 @@
package com.acgist.taoyao.signal.event.media;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import lombok.Getter;
import lombok.Setter;
/**
* 发布事件
*
* @author acgist
*/
@Getter
@Setter
public class MediaPublishEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public MediaPublishEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
/**
* @return 终端标识(发布给谁)
*/
public String getTo() {
return this.get("to");
}
}

View File

@@ -24,4 +24,11 @@ public class MediaSubscribeEvent extends ApplicationEventAdapter {
super(sn, body, message, session);
}
/**
* @return 终端标识(订阅的谁)
*/
public String getTo() {
return this.get("to");
}
}

View File

@@ -99,7 +99,7 @@ public class ProtocolManager {
final Protocol protocol = this.protocolMapping.get(pid);
if(protocol == null) {
log.warn("不支持的信令协议:{}", message);
session.push(this.errorProtocol.build("不支持的信令协议"));
session.push(this.errorProtocol.build("不支持的信令协议" + pid));
return;
}
if(protocol instanceof ClientRegisterProtocol) {

View File

@@ -0,0 +1,30 @@
package com.acgist.taoyao.signal.protocol.media;
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.media.MediaCandidateEvent;
import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter;
/**
* 候选信令
*
* @author acgist
*/
@Protocol
public class MediaCandidateProtocol extends ProtocolMapAdapter {
public static final Integer PID = 5004;
public MediaCandidateProtocol() {
super(PID, "候选信令");
}
@Override
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
this.publishEvent(new MediaCandidateEvent(sn, body, message, session));
}
}

View File

@@ -1,5 +1,30 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaPublishProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.media.MediaPublishEvent;
import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter;
/**
* 发布信令
*
* @author acgist
*/
@Protocol
public class MediaPublishProtocol extends ProtocolMapAdapter {
public static final Integer PID = 5000;
public MediaPublishProtocol() {
super(PID, "发布信令");
}
@Override
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
this.publishEvent(new MediaPublishEvent(sn, body, message, session));
}
}

View File

@@ -2,6 +2,7 @@ package com.acgist.taoyao.signal.protocol.media;
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.media.MediaSubscribeEvent;
@@ -12,6 +13,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter;
*
* @author acgist
*/
@Protocol
public class MediaSubscribeProtocol extends ProtocolMapAdapter {
public static final Integer PID = 5002;

View File

@@ -5,6 +5,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.acgist.taoyao.webrtc.mesh.listener.MediaCandidateListener;
import com.acgist.taoyao.webrtc.mesh.listener.MediaPublishListener;
import com.acgist.taoyao.webrtc.mesh.listener.MediaSubscribeListener;
/**
@@ -16,10 +18,22 @@ import com.acgist.taoyao.webrtc.mesh.listener.MediaSubscribeListener;
@ConditionalOnProperty(prefix = "taoyao.webrtc", name = "model", havingValue = "MESH", matchIfMissing = false)
public class MeshAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public MediaPublishListener mediaPublishListener() {
return new MediaPublishListener();
}
@Bean
@ConditionalOnMissingBean
public MediaSubscribeListener mediaSubscribeListener() {
return new MediaSubscribeListener();
}
@Bean
@ConditionalOnMissingBean
public MediaCandidateListener mediaCandidateListener() {
return new MediaCandidateListener();
}
}

View File

@@ -0,0 +1,32 @@
package com.acgist.taoyao.webrtc.mesh.listener;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.event.media.MediaCandidateEvent;
import com.acgist.taoyao.signal.listener.MediaListenerAdapter;
/**
* 候选监听
*
* @author acgist
*/
public class MediaCandidateListener extends MediaListenerAdapter<MediaCandidateEvent> {
@Override
public void onApplicationEvent(MediaCandidateEvent event) {
final String sn = event.getSn();
final List<String> sns = event.getSns();
if(CollectionUtils.isEmpty(sns)) {
return;
}
final Message message = event.getMessage();
final Map<String, Object> mergeBody = event.mergeBody();
mergeBody.put("from", sn);
sns.forEach(to -> this.clientSessionManager.unicast(to, message));
}
}

View File

@@ -0,0 +1,26 @@
package com.acgist.taoyao.webrtc.mesh.listener;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.event.media.MediaPublishEvent;
import com.acgist.taoyao.signal.listener.MediaListenerAdapter;
/**
* 发布监听
*
* @author acgist
*/
public class MediaPublishListener extends MediaListenerAdapter<MediaPublishEvent> {
@Override
public void onApplicationEvent(MediaPublishEvent event) {
final String sn = event.getSn();
final String to = event.getTo();
final Message message = event.getMessage();
final Map<String, Object> mergeBody = event.mergeBody();
mergeBody.put("from", sn);
this.clientSessionManager.unicast(to, message);
}
}

View File

@@ -1,10 +1,13 @@
package com.acgist.taoyao.webrtc.mesh.listener;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.event.media.MediaSubscribeEvent;
import com.acgist.taoyao.signal.listener.MediaListenerAdapter;
/**
* 媒体订阅监听
* 订阅监听
*
* @author acgist
*/
@@ -12,7 +15,12 @@ public class MediaSubscribeListener extends MediaListenerAdapter<MediaSubscribeE
@Override
public void onApplicationEvent(MediaSubscribeEvent event) {
final String sn = event.getSn();
final String to = event.getTo();
final Message message = event.getMessage();
final Map<String, Object> mergeBody = event.mergeBody();
mergeBody.put("from", sn);
this.clientSessionManager.unicast(to, message);
}
}