[*] mesh多人

This commit is contained in:
acgist
2022-12-03 14:02:11 +08:00
parent 18cc4e536d
commit 7f5ee58fbb
20 changed files with 197 additions and 218 deletions

View File

@@ -30,12 +30,6 @@
<groupId>com.acgist</groupId>
<artifactId>taoyao-meeting</artifactId>
</dependency>
<dependency>
<groupId>com.acgist</groupId>
<artifactId>taoyao-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@@ -91,19 +91,19 @@ taoyao:
# 媒体端口范围
min-port: 45535
max-port: 65535
# 公共服务
stun:
- stun:stun1.l.google.com:19302
- stun:stun2.l.google.com:19302
- stun:stun3.l.google.com:19302
- stun:stun4.l.google.com:19302
- stun:stun.stunprotocol.org:3478
# 自己搭建coturn
turn:
- stun:stun1.l.google.com:19302
- stun:stun2.l.google.com:19302
- stun:stun3.l.google.com:19302
- stun:stun4.l.google.com:19302
- stun:stun.stunprotocol.org:3478
# KMS服务配置
- turn:127.0.0.1:8888
- turn:127.0.0.1:8888
- turn:127.0.0.1:8888
- turn:127.0.0.1:8888
# KMS服务配置可以部署多个简单实现负载均衡
kms:
host: 192.168.1.100
port: 18888

View File

@@ -60,7 +60,7 @@ const defaultRPCConfig = {
/** 信令配置 */
const signalConfig = {
/** 当前终端SN */
sn: localStorage.getItem('taoyao.sn') || 'taoyao',
sn: 'taoyao',
/** 当前版本 */
version: '1.0.0',
// 信令授权
@@ -170,6 +170,7 @@ const signalChannel = {
clearTimeout(self.heartbeatTimer);
}
self.heartbeatTimer = setTimeout(function() {
// 电池navigator.getBattery()
if (self.channel && self.channel.readyState === WebSocket.OPEN) {
self.push(signalProtocol.buildProtocol(
signalProtocol.client.heartbeat,
@@ -341,9 +342,23 @@ const signalChannel = {
},
/** 终端默认回调 */
defaultClientConfig: function(data) {
this.taoyao
let self = this;
// 配置终端
self.taoyao
.configMedia(data.body.media.audio, data.body.media.video)
.configWebrtc(data.body.webrtc);
// 打开媒体通道
let videoId = self.taoyao.videoId;
if(videoId) {
self.taoyao.buildLocalMedia()
.then(stream => {
self.taoyao.buildMediaChannel(videoId, stream);
})
.catch(e => console.error('打开终端媒体失败', e));
console.debug('自动打开媒体通道', videoId);
} else {
console.debug('没有配置本地媒体信息跳过自动打开媒体通道');
}
},
defaultClientReboot: function(data) {
console.info('重启终端');
@@ -355,10 +370,10 @@ const signalChannel = {
defaultMediaSubscribe: function(data) {
let self = this;
const from = data.body.from;
this.taoyao.remoteClientFilter(from, true);
self.taoyao.localMediaChannel.createOffer().then(description => {
const remote = this.taoyao.remoteClientFilter(from, true);
remote.localMediaChannel.createOffer().then(description => {
console.debug('Local Create Offer', description);
self.taoyao.localMediaChannel.setLocalDescription(description);
remote.localMediaChannel.setLocalDescription(description);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.offer,
{
@@ -374,11 +389,11 @@ const signalChannel = {
defaultMediaOffer: function(data) {
let self = this;
const from = data.body.from;
this.taoyao.remoteClientFilter(from, true);
self.taoyao.remoteMediaChannel.setRemoteDescription(new RTCSessionDescription(data.body.sdp));
self.taoyao.remoteMediaChannel.createAnswer().then(description => {
const remote = this.taoyao.remoteClientFilter(from, true);
remote.remoteMediaChannel.setRemoteDescription(new RTCSessionDescription(data.body.sdp));
remote.remoteMediaChannel.createAnswer().then(description => {
console.debug('Remote Create Answer', description);
self.taoyao.remoteMediaChannel.setLocalDescription(description);
remote.remoteMediaChannel.setLocalDescription(description);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.answer,
{
@@ -392,18 +407,27 @@ const signalChannel = {
});
},
defaultMediaAnswer: function(data) {
this.taoyao.localMediaChannel.setRemoteDescription(new RTCSessionDescription(data.body.sdp));
const from = data.body.from;
const remote = this.taoyao.remoteClientFilter(from, true);
remote.localMediaChannel.setRemoteDescription(new RTCSessionDescription(data.body.sdp));
},
defaultMediaCandidate: function(data) {
if(!this.taoyao.checkCandidate(data.body.candidate)) {
console.debug('候选缺失要素', data);
return;
}
console.debug('Set ICE Candidate', this.taoyao.remoteMediaChannel);
console.debug('Set ICE Candidate', data.body);
const from = data.body.from;
const remote = this.taoyao.remoteClientFilter(from, true);
if(data.body.type === 'local') {
this.taoyao.remoteMediaChannel.addIceCandidate(new RTCIceCandidate(data.body.candidate));
remote.remoteMediaChannel.addIceCandidate(new RTCIceCandidate(data.body.candidate));
} else if(data.body.type === 'remote'){
remote.localMediaChannel.addIceCandidate(new RTCIceCandidate(data.body.candidate));
} else if(data.body.type === 'mesh') {
remote.localMediaChannel.addIceCandidate(new RTCIceCandidate(data.body.candidate));
// remote.remoteMediaChannel.addIceCandidate(new RTCIceCandidate(data.body.candidate));
} else {
this.taoyao.localMediaChannel.addIceCandidate(new RTCIceCandidate(data.body.candidate));
console.warn('不支持的候选类型', data.body.type);
}
},
/** 会议默认回调 */
@@ -413,10 +437,14 @@ const signalChannel = {
};
/** 终端 */
function TaoyaoClient(
taoyao,
sn,
shareMediaChannel,
audioEnabled,
videoEnabled
) {
/** 桃夭 */
this.taoyao = taoyao;
/** 终端标识 */
this.sn = sn;
/** 视频对象 */
@@ -429,6 +457,12 @@ function TaoyaoClient(
this.audioStatus = false;
this.videoStatus = false;
this.recordStatus = false;
/** 本地媒体通道 */
this.localMediaChannel = null;
/** 远程媒体通道 */
this.remoteMediaChannel = null;
/** 是否共享媒体通道 */
this.shareMediaChannel = shareMediaChannel;
/** 媒体状态:是否播放 */
this.audioEnabled = audioEnabled == undefined ? true : audioEnabled;
this.videoEnabled = videoEnabled == undefined ? true : videoEnabled;
@@ -453,6 +487,7 @@ function TaoyaoClient(
/** 关闭视频 */
this.close = async function() {
await this.video.close();
// TODO释放连接
return this;
};
/** 设置媒体 */
@@ -474,15 +509,21 @@ function TaoyaoClient(
if(track.kind === 'video') {
this.buildVideoTrack(track);
}
} else {
} else if(stream) {
let audioTrack = stream.getAudioTracks();
let videoTrack = stream.getVideoTracks();
// TODO验证API试试修改媒体
// audioTrack.getSettings
// audioTrack.getCapabilities
// audioTrack.applyCapabilities
if(audioTrack && audioTrack.length) {
audioTrack.forEach(v => this.buildAudioTrack(v));
}
if(videoTrack && videoTrack.length) {
videoTrack.forEach(v => this.buildVideoTrack(v));
}
} else {
throw new Error('无效媒体信息');
}
console.debug('设置媒体', this.video, this.stream, this.audioTrack, this.videoTrack);
await this.load();
@@ -509,13 +550,53 @@ function TaoyaoClient(
this.stream.addTrack(track);
}
};
/** 打开媒体通道 */
this.openMediaChannel = function() {
if(this.shareMediaChannel) {
this.localMediaChannel = this.taoyao.localMediaChannel;
this.remoteMediaChannel = this.taoyao.remoteMediaChannel;
} else {
let self = this;
// 本地通道
let mediaChannel = new RTCPeerConnection(defaultRPCConfig);
self.taoyao.localClient.audioTrack.forEach(v => mediaChannel.addTrack(v, self.taoyao.localClient.stream));
self.taoyao.localClient.videoTrack.forEach(v => mediaChannel.addTrack(v, self.taoyao.localClient.stream));
mediaChannel.ontrack = function(e) {
console.debug('Mesh Media Track', self.sn, e);
let remote = self.taoyao.remoteClientFilter(self.sn);
remote.buildStream(remote.sn, e.streams[0], e.track);
};
mediaChannel.onicecandidate = function(e) {
// TODO判断给谁
let to = self.taoyao.remoteClient.map(v => v.sn)[0];
if(!self.taoyao.checkCandidate(e.candidate)) {
console.debug('Send Mesh ICE Candidate Fail', e);
return;
}
console.debug('Send Mesh ICE Candidate', to, e);
self.taoyao.push(signalProtocol.buildProtocol(
signalProtocol.media.candidate,
{
to: to,
type: 'mesh',
candidate: e.candidate
}
));
};
this.localMediaChannel = mediaChannel;
this.remoteMediaChannel = mediaChannel;
}
};
}
/** 桃夭 */
function Taoyao(
videoId,
webSocket,
localClientAudioEnabled,
localClientVideoEnabled
) {
/** 本地视频ID */
this.videoId = videoId;
/** WebRTC配置 */
this.webrtc = null;
/** WebSocket地址 */
@@ -538,6 +619,8 @@ function Taoyao(
/** 远程终端 */
this.remoteClient = [];
this.remoteMediaChannel = null;
/** 是否共享媒体通道 */
this.shareMediaChannel = true;
/** 信令通道 */
this.signalChannel = null;
/** 媒体配置 */
@@ -551,6 +634,7 @@ function Taoyao(
this.configWebrtc = function(config = {}) {
this.webrtc = config;
this.webSocket = this.webrtc.signal.address;
this.shareMediaChannel = this.webrtc.framework === 'MOON';
defaultRPCConfig.iceServers = this.webrtc.stun.map(v => ({'urls': v}));
console.debug('WebRTC配置', this.webrtc, defaultRPCConfig);
return this;
@@ -560,8 +644,8 @@ function Taoyao(
signalChannel.taoyao = this;
this.signalChannel = signalChannel;
// 不能直接this.push = this.signalChannel.push这样导致this对象错误
this.push = function(data, callback) {
this.signalChannel.push(data, callback)
this.push = function(data, pushCallback) {
this.signalChannel.push(data, pushCallback);
};
return this.signalChannel.connect(this.webSocket, callback);
};
@@ -607,11 +691,6 @@ function Taoyao(
})
.then(resolve)
.catch(reject);
// 兼容旧版
// navigator.getUserMedia({
// audio: self.audioConfig,
// video: self.videoConfig
// }, resolve, reject);
})
.catch(e => {
console.error('检查终端设备异常', e);
@@ -625,12 +704,17 @@ function Taoyao(
};
/** 远程终端过滤 */
this.remoteClientFilter = function(sn, autoBuild) {
if(sn === signalConfig.sn) {
console.warn('远程终端等于本地终端');
return this.localClient;
}
let array = this.remoteClient.filter(v => v.sn === sn);
let remote = null;
if(array.length > 0) {
remote = array[0];
} else if(autoBuild) {
remote = new TaoyaoClient(sn);
remote = new TaoyaoClient(this, sn, this.shareMediaChannel);
remote.openMediaChannel();
this.remoteClient.push(remote);
}
return remote;
@@ -647,86 +731,60 @@ function Taoyao(
this.buildMediaChannel = async function(localVideoId, stream) {
let self = this;
// 本地视频
this.localClient = new TaoyaoClient(signalConfig.sn, this.localClientAudioEnabled, this.localClientVideoEnabled);
this.localClient = new TaoyaoClient(this, signalConfig.sn, this.shareMediaChannel, this.localClientAudioEnabled, this.localClientVideoEnabled);
await this.localClient.buildStream(localVideoId, stream);
// 本地通道
this.localMediaChannel = new RTCPeerConnection(defaultRPCConfig);
this.localClient.audioTrack.forEach(v => this.localMediaChannel.addTrack(v, this.localClient.stream));
this.localClient.videoTrack.forEach(v => this.localMediaChannel.addTrack(v, this.localClient.stream));
this.localMediaChannel.ontrack = function(e) {
console.debug('Local Media Track', e);
};
this.localMediaChannel.ondatachannel = function(channel) {
channel.onopen = function() {
console.debug('Local DataChannel Open');
}
channel.onmessage = function(data) {
console.debug('Local DataChannel Message', data);
}
channel.onclose = function() {
console.debug('Local DataChannel Close');
}
channel.onerror = function(e) {
console.debug('Local DataChannel Error', e);
}
};
this.localMediaChannel.onicecandidate = function(e) {
// TODO判断给谁
let to = self.remoteClient.map(v => v.sn)[0];
if(!self.checkCandidate(e.candidate)) {
console.debug('Send Local ICE Candidate Fail', e);
return;
}
console.debug('Send Local ICE Candidate', to, e);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.candidate,
{
to: to,
type: 'local',
candidate: e.candidate
if(this.shareMediaChannel) {
// 本地通道
this.localMediaChannel = new RTCPeerConnection(defaultRPCConfig);
this.localClient.audioTrack.forEach(v => this.localMediaChannel.addTrack(v, this.localClient.stream));
this.localClient.videoTrack.forEach(v => this.localMediaChannel.addTrack(v, this.localClient.stream));
this.localMediaChannel.ontrack = function(e) {
console.debug('Local Media Track', e);
};
this.localMediaChannel.onicecandidate = function(e) {
// TODO判断给谁
let to = self.remoteClient.map(v => v.sn)[0];
if(!self.checkCandidate(e.candidate)) {
console.debug('Send Local ICE Candidate Fail', e);
return;
}
));
};
// 远程通道
this.remoteMediaChannel = new RTCPeerConnection(defaultRPCConfig);
this.remoteMediaChannel.ontrack = function(e) {
console.debug('Remote Media Track', e);
// TODO匹配
let remote = self.remoteClient[0];
remote.buildStream(remote.sn, e.streams[0], e.track);
};
this.remoteMediaChannel.ondatachannel = function(channel) {
channel.onopen = function() {
console.debug('Remote DataChannel Open');
}
channel.onmessage = function(data) {
console.debug('Remote DataChannel Message', data);
}
channel.onclose = function() {
console.debug('Remote DataChannel Close');
}
channel.onerror = function(e) {
console.debug('Remote DataChannel Error', e);
}
};
this.remoteMediaChannel.onicecandidate = function(e) {
// TODO判断给谁
let to = self.remoteClient.map(v => v.sn)[0];
if(!self.checkCandidate(e.candidate)) {
console.debug('Send Remote ICE Candidate Fail', e);
return;
}
console.debug('Send Remote ICE Candidate', to, e);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.candidate,
{
to: to,
type: 'remote',
candidate: e.candidate
console.debug('Send Local ICE Candidate', to, e);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.candidate,
{
to: to,
type: 'local',
candidate: e.candidate
}
));
};
// 远程通道
this.remoteMediaChannel = new RTCPeerConnection(defaultRPCConfig);
this.remoteMediaChannel.ontrack = function(e) {
console.debug('Remote Media Track', e);
// TODO匹配
let remote = self.remoteClient[0];
remote.buildStream(remote.sn, e.streams[0], e.track);
};
this.remoteMediaChannel.onicecandidate = function(e) {
// TODO判断给谁
let to = self.remoteClient.map(v => v.sn)[0];
if(!self.checkCandidate(e.candidate)) {
console.debug('Send Remote ICE Candidate Fail', e);
return;
}
));
};
console.debug('打开媒体通道', this.localMediaChannel, this.remoteMediaChannel);
console.debug('Send Remote ICE Candidate', to, e);
self.push(signalProtocol.buildProtocol(
signalProtocol.media.candidate,
{
to: to,
type: 'remote',
candidate: e.candidate
}
));
};
console.debug('打开共享媒体通道', this.localMediaChannel, this.remoteMediaChannel);
}
return this;
};
/** 校验candidate */

View File

@@ -51,38 +51,27 @@
},
mounted() {
let self = this;
this.taoyao = new Taoyao();
this.taoyao = new Taoyao('local');
this.remoteClient = this.taoyao.remoteClient;
// 打开信令通道
this.taoyao
.buildChannel(self.callback)
.then(e => console.debug('信令通道连接成功'));
// 打开媒体通道
this.taoyao.buildLocalMedia()
.then(stream => {
self.taoyao.buildMediaChannel('local', stream);
})
.catch(e => {
console.error('打开终端媒体失败', e);
// 方便相同电脑测试
self.taoyao.buildMediaChannel('local', null);
});
},
beforeDestroy() {
},
methods: {
// 信令回调:返回true表示已经处理
// 信令回调true表示已经处理false表示没有处理
callback: function(data) {
let self = this;
switch(data.header.pid) {
case signalProtocol.client.config:
// 如果需要下发配置生效需要在此打开媒体通道
return false;
}
return false;
},
// 创建会议
create: function(event) {
let sn = prompt('你的账号', signalConfig.sn);
signalConfig.sn = sn;
let self = this;
this.taoyao.meetingCreate(data => {
self.taoyao.meetingEnter(data.body.id);

View File

@@ -0,0 +1,42 @@
package com.acgist.taoyao.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;
/**
* 多线程测试
*
* @author acgist
*/
@Target(ElementType.METHOD)
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface CostedTest {
/**
* @return 执行次数
*/
int count() default 1;
/**
* @return 线程数量
*/
int thread() default 1;
/**
* @return 超时时间
*/
long timeout() default 1000;
/**
* @return 超时时间单位
*/
TimeUnit timeUnit() default TimeUnit.MILLISECONDS;
}

View File

@@ -0,0 +1,58 @@
package com.acgist.taoyao.annotation;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.springframework.test.context.TestContext;
import org.springframework.test.context.TestExecutionListener;
import lombok.extern.slf4j.Slf4j;
/**
* 多线程测试监听
*
* @author acgist
*/
@Slf4j
public class CostedTestTestExecutionListener implements TestExecutionListener {
@Override
public void afterTestMethod(TestContext testContext) throws Exception {
final CostedTest costedTest = testContext.getTestMethod().getDeclaredAnnotation(CostedTest.class);
if(costedTest == null) {
return;
}
final int count = costedTest.count();
final int thread = costedTest.thread();
final long timeout = costedTest.timeout();
final TimeUnit timeUnit = costedTest.timeUnit();
final long aTime = System.currentTimeMillis();
if(thread == 1) {
for (int index = 0; index < count; index++) {
testContext.getTestMethod().invoke(testContext.getTestInstance());
}
} else {
final CountDownLatch countDownLatch = new CountDownLatch(count);
final ExecutorService executor = Executors.newFixedThreadPool(thread);
for (int index = 0; index < count; index++) {
executor.execute(() -> {
try {
testContext.getTestMethod().invoke(testContext.getTestInstance());
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
log.error("多线程测试异常", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await(timeout, timeUnit);
}
final long zTime = System.currentTimeMillis();
final long costed = zTime - aTime;
log.info("多线程测试消耗时间:{}", costed);
}
}

View File

@@ -0,0 +1,31 @@
package com.acgist.taoyao.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.annotation.AliasFor;
import org.springframework.test.context.TestExecutionListeners;
import org.springframework.test.context.TestExecutionListeners.MergeMode;
/**
* 测试启动
*
* @author acgist
*/
@Target(ElementType.TYPE)
@Inherited
@Retention(RetentionPolicy.RUNTIME)
@Documented
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@TestExecutionListeners(listeners = CostedTestTestExecutionListener.class, mergeMode = MergeMode.MERGE_WITH_DEFAULTS)
public @interface TaoyaoTest {
@AliasFor(annotation = SpringBootTest.class)
Class<?>[] classes() default {};
}

View File

@@ -3,9 +3,9 @@ package com.acgist.taoyao.boot.service;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.annotation.CostedTest;
import com.acgist.taoyao.annotation.TaoyaoTest;
import com.acgist.taoyao.main.TaoyaoApplication;
import com.acgist.taoyao.test.annotation.TaoyaoTest;
import com.acgist.taoyao.test.annotation.CostedTest;
import lombok.extern.slf4j.Slf4j;

View File

@@ -7,8 +7,8 @@ import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Test;
import com.acgist.taoyao.annotation.TaoyaoTest;
import com.acgist.taoyao.main.TaoyaoApplication;
import com.acgist.taoyao.test.annotation.TaoyaoTest;
import lombok.extern.slf4j.Slf4j;

View File

@@ -7,9 +7,9 @@ import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.annotation.TaoyaoTest;
import com.acgist.taoyao.main.TaoyaoApplication;
import com.acgist.taoyao.signal.protocol.platform.ScriptProtocol;
import com.acgist.taoyao.test.annotation.TaoyaoTest;
@TaoyaoTest(classes = TaoyaoApplication.class)
class ScriptProtocolTest {

View File

@@ -5,9 +5,9 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.annotation.TaoyaoTest;
import com.acgist.taoyao.main.TaoyaoApplication;
import com.acgist.taoyao.signal.protocol.platform.ShutdownProtocol;
import com.acgist.taoyao.test.annotation.TaoyaoTest;
@TaoyaoTest(classes = TaoyaoApplication.class)
class ShutdownProtocolTest {