[+] 终端信令连接

This commit is contained in:
acgist
2022-11-13 12:14:10 +08:00
parent b8f243c64c
commit 8d1392a6da
21 changed files with 412 additions and 70 deletions

View File

@@ -54,16 +54,6 @@
> 只有公网Mesh架构才需要真正的内网穿透
### STUN/TURN公共服务地址
```
stun:stun1.l.google.com:19302
stun:stun2.l.google.com:19302
stun:stun3.l.google.com:19302
stun:stun4.l.google.com:19302
stun:stun.stunprotocol.org:3478
```
## 信令
|功能|描述|标识|响应|

View File

@@ -50,8 +50,6 @@ import org.springframework.web.servlet.NoHandlerFoundException;
import com.acgist.taoyao.boot.controller.TaoyaoControllerAdvice;
import com.acgist.taoyao.boot.controller.TaoyaoErrorController;
import com.acgist.taoyao.boot.interceptor.SecurityInterceptor;
import com.acgist.taoyao.boot.interceptor.SlowInterceptor;
import com.acgist.taoyao.boot.model.MessageCode;
import com.acgist.taoyao.boot.service.IdService;
import com.acgist.taoyao.boot.service.impl.IdServiceImpl;
@@ -121,18 +119,6 @@ public class BootAutoConfiguration {
};
}
@Bean
@ConditionalOnMissingBean
public SlowInterceptor slowInterceptor() {
return new SlowInterceptor();
}
@Bean
@ConditionalOnMissingBean
public SecurityInterceptor securityInterceptor() {
return new SecurityInterceptor();
}
@Bean
@ConditionalOnMissingBean
public TaoyaoErrorController taoyaoErrorController() {

View File

@@ -77,5 +77,33 @@ public class WebrtcProperties {
*/
@Schema(title = "turn服务器", description = "turn服务器")
private String[] turn;
/**
* 信令主机
*/
@Schema(title = "信令主机", description = "信令主机")
private String host;
/**
* 信令端口
*/
@Schema(title = "信令端口", description = "信令端口")
private Integer port;
/**
* 信令协议
*/
@Schema(title = "信令协议", description = "信令协议")
private String schema;
/**
* 信令地址
*/
@Schema(title = "信令地址", description = "信令地址")
private String websocket;
/**
* 完整信令地址
*/
@Schema(title = "完整信令地址", description = "完整信令地址")
public String getSignalAddress() {
return this.schema + "://" + this.host + ":" + this.port + this.websocket;
}
}

View File

@@ -33,7 +33,7 @@ public class Header implements Serializable {
* 请求响应标识
*/
@Schema(title = "请求响应标识", description = "请求响应标识")
private Long id;
private String id;
/**
* 终端标识
*/

View File

@@ -12,6 +12,13 @@ public interface IdService {
*
* @return ID
*/
long id();
long buildId();
/**
* @see #buildId()
*
* @return ID
*/
String buildIdToString();
}

View File

@@ -18,7 +18,7 @@ public class IdServiceImpl implements IdService {
private IdProperties idProperties;
@Override
public long id() {
public long buildId() {
synchronized (this) {
if (++this.index > this.idProperties.getMaxIndex()) {
this.index = 0;
@@ -38,4 +38,9 @@ public class IdServiceImpl implements IdService {
this.index;
}
@Override
public String buildIdToString() {
return String.valueOf(this.buildId());
}
}

View File

@@ -0,0 +1,30 @@
package com.acgist.taoyao.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.acgist.taoyao.interceptor.SecurityInterceptor;
import com.acgist.taoyao.interceptor.SlowInterceptor;
/**
* 配置
*
* @author acgist
*/
@Configuration
public class TaoyaoAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public SlowInterceptor slowInterceptor() {
return new SlowInterceptor();
}
@Bean
@ConditionalOnMissingBean
public SecurityInterceptor securityInterceptor() {
return new SecurityInterceptor();
}
}

View File

@@ -1,4 +1,4 @@
package com.acgist.taoyao.boot.interceptor;
package com.acgist.taoyao.interceptor;
import java.util.Base64;
@@ -12,6 +12,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import com.acgist.taoyao.boot.config.SecurityProperties;
import com.acgist.taoyao.boot.interceptor.InterceptorAdapter;
import lombok.extern.slf4j.Slf4j;

View File

@@ -1,4 +1,4 @@
package com.acgist.taoyao.boot.interceptor;
package com.acgist.taoyao.interceptor;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -6,6 +6,7 @@ import javax.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.boot.config.TaoyaoProperties;
import com.acgist.taoyao.boot.interceptor.InterceptorAdapter;
import lombok.extern.slf4j.Slf4j;

View File

@@ -74,6 +74,10 @@ taoyao:
- stun:stun4.l.google.com:19302
- stun:stun.stunprotocol.org:3478
turn:
host: localhost
port: ${server.port:8888}
schema: wss
websocket: /websocket.signal
record:
storage: /data/record
security:

View File

@@ -1,6 +1,14 @@
/**
* 桃夭WebRTC终端示例
*/
/** 配置 */
const config = {
// 当前终端SN
sn: 'taoyao',
// 信令授权
username: 'taoyao',
password: 'taoyao'
};
/** 音频配置 */
const defaultAudioConfig = {
// 音量0~1
@@ -33,29 +41,45 @@ const defaultVideoConfig = {
// 设备
// deviceId: '',
// 帧率
frameRate: 30,
frameRate: 24,
// 裁切
// resizeMode: '',
// 选摄像头user|left|right|environment
facingMode: 'environment'
}
/** 兼容 */
const XMLHttpRequest = window.XMLHttpRequest;
const PeerConnection = window.RTCPeerConnection || window.mozRTCPeerConnection || window.webkitRTCPeerConnection;
/** 桃夭 */
function Taoyao(
webSocket,
iceServer
iceServer,
audioConfig,
videoConfig
) {
this.webSocket = webSocket;
this.iceServer = iceServer;
/** 媒体状态 */
this.audioStatus = true;
this.videoStatus = true;
/** 设备状态 */
this.audioEnabled = true;
this.videoEnabled = true;
/** 媒体信息 */
this.audioStreamId = null;
this.videoStreamId = null;
this.audioConfig = defaultAudioConfig;
this.videoConfig = defaultVideoConfig;
/** 媒体配置 */
this.audioConfig = audioConfig || defaultAudioConfig;
this.videoConfig = videoConfig || defaultVideoConfig;
/** 本地视频 */
this.localVideo = null;
/** 终端媒体 */
this.clientMedia = {};
/** 信令通道 */
this.signalChannel = null;
/** 初始 */
this.init = function() {
let self = this;
if(navigator.mediaDevices && navigator.mediaDevices.enumerateDevices) {
navigator.mediaDevices.enumerateDevices()
.then(list => {
@@ -71,57 +95,247 @@ function Taoyao(
});
if(!audioDevice) {
console.log('终端没有音频输入设备');
this.audioConfig = false;
self.audioEnabled = false;
}
if(!videoDevice) {
console.log('终端没有视频输入设备');
this.videoConfig = false;
self.videoEnabled = false;
}
})
.catch(e => console.log('获取终端设备失败', e));
.catch(e => {
console.log('获取终端设备失败', e);
self.videoEnabled = false;
self.videoEnabled = false;
});
}
return this;
};
/** 媒体 */
this.buildUserMedia = function() {
/** 请求 */
this.request = function(url, data = {}, method = 'GET', async = true, timeout = 5000, mime = 'json') {
return new Promise((resolve, reject) => {
let xhr = new XMLHttpRequest();
xhr.open(method, url, async);
if(async) {
xhr.timeout = timeout;
xhr.responseType = mime;
xhr.send(data);
xhr.onload = function() {
if(xhr.readyState === 4 && xhr.status === 200) {
resolve(xhr.response);
} else {
reject(xhr.response);
}
}
xhr.onerror = reject;
} else {
xhr.send(data);
if(xhr.readyState === 4 && xhr.status === 200) {
resolve(JSON.parse(xhr.response));
} else {
reject(JSON.parse(xhr.response));
}
}
});
};
/** 媒体配置 */
this.configMedia = function(audio = {}, video = {}) {
this.audioConfig = {...this.audioConfig, ...audio};
this.videoCofnig = {...this.videoCofnig, ...video};
console.log('终端媒体配置', this.audioConfig, this.videoConfig);
};
/** WebRTC配置 */
this.configWebrtc = function(config = {}) {
this.webSocket = config.signalAddress;
this.iceServer = config.stun;
console.log('WebRTC配置', this.webSocket, this.iceServer);
};
/** 信令通道 */
this.buildChannel = function(callback) {
this.signalChannel = signalChannel;
this.signalChannel.connect(this.webSocket, callback);
};
/** 本地媒体 */
this.buildLocalMedia = function() {
console.log("获取终端媒体:", this.audioConfig, this.videoConfig);
let self = this;
return new Promise((resolve, reject) => {
console.log("获取终端媒体:", this.audioConfig, this.videoConfig);
if(navigator.mediaDevices && navigator.mediaDevices.getUserMedia) {
navigator.mediaDevices.getUserMedia({
audio: this.audioConfig,
video: this.videoConfig
audio: self.audioConfig,
video: self.videoConfig
})
.then(resolve)
.catch(reject);
} else if(navigator.getUserMedia) {
navigator.getUserMedia({
audio: this.audioConfig,
video: this.videoConfig
audio: self.audioConfig,
video: self.videoConfig
}, resolve, reject);
} else {
reject("获取终端媒体失败");
}
});
};
/** 本地 */
this.local = async function(localVideoId, stream) {
const localVideo = document.getElementById(localVideoId);
if ('srcObject' in localVideo) {
localVideo.srcObject = stream;
/** 本地媒体 */
this.localMedia = async function(localVideoId, stream) {
this.localVideo = document.getElementById(localVideoId);
if ('srcObject' in this.localVideo) {
this.localVideo.srcObject = stream;
} else {
localVideo.src = URL.createObjectURL(stream);;
this.localVideo.src = URL.createObjectURL(stream);;
}
await localVideo.play();
await this.localVideo.play();
};
/** 连接 */
this.connect = function() {
};
/** 重连 */
/** 定时 */
/** 媒体 */
/** 视频 */
};
/** 信令协议 */
const protocol = {
pid: {
/** 心跳 */
heartbeat: 1000,
/** 注册 */
register: 2000
},
/** 当前索引 */
index: 100000,
/** 最小索引 */
minIndex: 100000,
/** 最大索引 */
maxIndex: 999999,
/** 生成ID */
buildId: function() {
if(this.index++ >= this.maxIndex) {
this.index = this.minIndex;
}
return Date.now() + '' + this.index;
},
/** 生成协议 */
buildProtocol: function(sn, pid, body) {
let message = {
header: {
v: '1.0.0',
id: this.buildId(),
sn: sn,
pid: pid,
},
"body": body
};
return JSON.stringify(message);
}
};
/** 信令消息 */
/** 信令通道 */
const signalChannel = {
/** 通道 */
channel: null,
/** 地址 */
address: null,
/** 回调 */
callback: null,
/** 心跳时间 */
heartbeatTime: 10 * 1000,
/** 心跳定时器 */
heartbeatTimer: null,
/** 防止重连 */
lockReconnect: false,
/** 重连时间 */
connectionTimeout: 5 * 1000,
/** 最小重连时间 */
minReconnectionDelay: 5 * 1000,
/** 最大重连时间 */
maxReconnectionDelay: 5 * 60 * 1000,
/** 自动重连失败后重连时间增长倍数 */
reconnectionDelayGrowFactor: 1.5,
/** 关闭 */
close: function() {
clearTimeout(this.heartbeatTimer);
},
/** 心跳 */
}
heartbeat: function() {
let self = this;
self.heartbeatTimer = setTimeout(function() {
if (self.channel && self.channel.readyState == WebSocket.OPEN) {
self.channel.send(protocol.buildProtocol(config.sn, protocol.pid.heartbeat));
self.heartbeat();
} else {
console.log('发送心跳失败', self.channel);
}
}, self.heartbeatTime);
},
/** 重连 */
reconnect: function() {
let self = this;
if (self.lockReconnect) {
return;
}
self.lockReconnect = true;
// 关闭旧的通道
if(self.channel && self.channel.readyState == WebSocket.OPEN) {
self.channel.close();
self.channel = null;
}
// 打开定时重连
setTimeout(function() {
console.log('信令通道重连', self.address);
self.connect(self.address, self.callback, true);
self.lockReconnect = false;
}, self.connectionTimeout);
if (self.connectionTimeout >= self.maxReconnectionDelay) {
self.connectionTimeout = self.maxReconnectionDelay;
} else {
self.connectionTimeout = self.connectionTimeout * self.reconnectionDelayGrowFactor
}
},
/** 连接 */
connect: function(address, callback, reconnection = true) {
let self = this;
this.address = address;
this.callback = callback;
console.log("连接信令通道", address);
return new Promise((resolve, reject) => {
self.channel = new WebSocket(address);
self.channel.onopen = function(e) {
console.log('信令通道打开', e);
self.channel.send(protocol.buildProtocol(
config.sn,
protocol.pid.register,
{
ip: null,
mac: null,
signal: 100,
battery: 100,
username: config.username,
password: config.password
}
));
self.connectionTimeout = self.minReconnectionDelay
self.heartbeat();
resolve(e);
};
self.channel.onclose = function(e) {
console.log('信令通道关闭', self.channel, e);
if(reconnection) {
self.reconnect();
}
reject(e);
};
self.channel.onerror = function(e) {
console.error('信令通道异常', self.channel, e);
if(reconnection) {
self.reconnect();
}
reject(e);
};
self.channel.onmessage = function(e) {
console.log('信令消息', e.data);
if(callback) {
callback(JSON.parse(e.data));
}
};
});
}
};
/*
var peer;
var socket; // WebSocket

View File

@@ -50,11 +50,35 @@
list.appendChild(child);
}
const taoyao = new Taoyao();
taoyao
.init()
.buildUserMedia()
.then(stream => taoyao.local('local', stream))
.catch((e) => alert('获取终端媒体失败:' + e));
// 初始
taoyao.init();
// 配置媒体
taoyao.request('/config/media', {}, 'GET', false)
.then(response => {
taoyao.configMedia(response.audio, response.video);
})
.catch(e => console.error('获取媒体配置失败', e));
// 配置WebRTC
taoyao.request('/config/webrtc', {}, 'GET', false)
.then(response => {
taoyao.configWebrtc(response);
taoyao.buildChannel(callback);
})
.catch(e => console.error('获取WebRTC配置失败', e));
// 信令回调
function callback(data) {
switch(data.header.pid) {
case 1000:
// 心跳
break;
}
}
// 信令通道
/*
taoyao.buildLocalMedia()
.then(stream => taoyao.localMedia('local', stream))
.catch((e) => alert('获取终端媒体失败:' + e));
*/
</script>
</body>
</html>

View File

@@ -22,14 +22,14 @@ class IdServiceTest {
// @Rollback()
// @RepeatedTest(10)
void testId() {
final long id = this.idService.id();
final long id = this.idService.buildId();
log.info("生成ID{}", id);
}
@Test
@CostedTest(count = 100000, thread = 10)
void testIdCosted() {
this.idService.id();
this.idService.buildId();
}
}

View File

@@ -20,6 +20,12 @@
## 系统信令1000~1999|9999
### 心跳信令1000
```
{}
```
### 异常信令9999
## 设备信令2000~2999

View File

@@ -1,5 +1,7 @@
package com.acgist.taoyao.signal.event.client;
import java.util.Map;
import org.springframework.context.ApplicationEvent;
import com.acgist.taoyao.signal.session.ClientSession;
@@ -22,10 +24,15 @@ public class RegisterEvent extends ApplicationEvent {
* 会话
*/
private ClientSession session;
/**
* 参数
*/
private Map<?, ?> data;
public RegisterEvent(ClientSession session) {
public RegisterEvent(ClientSession session, Map<?, ?> data) {
super(session);
this.session = session;
this.data = data;
}
}

View File

@@ -11,6 +11,7 @@ import com.acgist.taoyao.signal.event.client.RegisterEvent;
import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter;
import com.acgist.taoyao.signal.protocol.client.OnlineProtocol;
import com.acgist.taoyao.signal.session.ClientSession;
import com.acgist.taoyao.signal.session.ClientSessionStatus;
/**
* 终端注册监听
@@ -33,8 +34,13 @@ public class RegisterListener extends ApplicationListenerAdapter<RegisterEvent>
final Message message = this.onlineProtocol.build();
message.setBody(Map.of("sn", session.sn()));
this.clientSessionManager.broadcast(session.sn(), message);
// TODOip等等
// TODO重新注册上来需要掉线重连
final Map<?, ?> data = event.getData();
final ClientSessionStatus status = session.status();
status.setSn(session.sn());
status.setIp((String) data.get("ip"));
status.setMac((String) data.get("mac"));
status.setSignal((Integer) data.get("signal"));
status.setBattery((Integer) data.get("battery"));
}
}

View File

@@ -37,7 +37,7 @@ public abstract class ProtocolAdapter implements Protocol {
public Message build() {
final Header header = Header.builder()
.v(this.taoyaoProperties.getVersion())
.id(this.idService.id())
.id(this.idService.buildIdToString())
.pid(this.protocol)
.build();
final Message message = Message.builder()

View File

@@ -86,7 +86,7 @@ public class ProtocolManager {
final ClientSession session = this.clientSessionManager.session(instance);
if(session != null && protocol instanceof RegisterProtocol) {
event = protocol.execute(sn, value, session);
} else if(session != null) {
} else if(session != null && session.authorized()) {
event = protocol.execute(sn, value, session);
} else {
log.warn("会话没有权限:{}", message);

View File

@@ -48,7 +48,7 @@ public class RegisterProtocol extends ProtocolBodyMapAdapter {
message.setCode(MessageCode.CODE_3401);
}
session.push(message);
return new RegisterEvent(session);
return new RegisterEvent(session, body);
}
}

View File

@@ -0,0 +1,33 @@
package com.acgist.taoyao.signal.protocol.system;
import org.springframework.context.ApplicationEvent;
import org.springframework.stereotype.Component;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
import com.acgist.taoyao.signal.session.ClientSession;
/**
* 心跳信令协议
*
* @author acgist
*/
@Component
public class HeartbeatProtocol extends ProtocolAdapter {
/**
* 信令协议标识
*/
public static final Integer PID = 1000;
public HeartbeatProtocol() {
super(PID);
}
@Override
public ApplicationEvent execute(String sn, Message message, ClientSession session) {
session.push(message);
return null;
}
}

View File

@@ -72,7 +72,7 @@ public abstract class ClientSessionAdapter<T extends AutoCloseable> implements C
@Override
public boolean timeout(long timeout) {
return !(this.authorized && System.currentTimeMillis() - this.time <= timeout);
return !this.authorized && System.currentTimeMillis() - this.time > timeout;
}
@Override