From a8aea2548e4578152bd39f3d477a0c6aed6c01f5 Mon Sep 17 00:00:00 2001 From: acgist <289547414@qq.com> Date: Sat, 4 Feb 2023 22:29:46 +0800 Subject: [PATCH] =?UTF-8?q?[+]=20=E8=BF=9E=E6=8E=A5Mediasoup?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../taoyao/boot/property/KmsProperties.java | 60 ----- .../boot/property/MediasoupProperties.java | 60 +++++ .../taoyao/boot/property/MoonProperties.java | 23 -- .../boot/property/WebrtcProperties.java | 15 +- taoyao-signal-server/taoyao-media/README.md | 4 + .../taoyao/mediasoup/MediasoupClient.java | 211 ++++++++++++++++++ .../taoyao/mediasoup/listener/Listener.java | 10 + .../taoyao/mediasoup/transport/Transport.java | 5 + .../src/main/resources/application.yml | 24 +- 9 files changed, 306 insertions(+), 106 deletions(-) delete mode 100644 taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/KmsProperties.java create mode 100644 taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/MediasoupProperties.java delete mode 100644 taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/MoonProperties.java create mode 100644 taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/listener/Listener.java diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/KmsProperties.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/KmsProperties.java deleted file mode 100644 index a7f947c..0000000 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/KmsProperties.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.acgist.taoyao.boot.property; - -import com.fasterxml.jackson.annotation.JsonIgnore; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Getter; -import lombok.Setter; - -/** - * KMS配置 - * - * @author acgist - */ -@Getter -@Setter -@Schema(title = "KMS配置", description = "KMS配置") -public class KmsProperties { - - /** - * KMS主机 - */ - @Schema(title = "KMS主机", description = "KMS主机") - private String host; - /** - * KMS端口 - */ - @Schema(title = "KMS端口", description = "KMS端口") - private Integer port; - /** - * KMS协议 - */ - @Schema(title = "KMS协议", description = "KMS协议") - private String schema; - /** - * KMS地址 - */ - @Schema(title = "KMS地址", description = "KMS地址") - private String websocket; - /** - * KMS用户 - */ - @Schema(title = "KMS用户", description = "KMS用户") - @JsonIgnore - private String username; - /** - * KMS密码 - */ - @Schema(title = "KMS密码", description = "KMS密码") - @JsonIgnore - private String password; - - /** - * @return 完整KMS地址 - */ - @Schema(title = "完整KMS地址", description = "完整KMS地址") - public String getAddress() { - return this.schema + "://" + this.host + ":" + this.port + this.websocket; - } - -} diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/MediasoupProperties.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/MediasoupProperties.java new file mode 100644 index 0000000..d1bfcf8 --- /dev/null +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/MediasoupProperties.java @@ -0,0 +1,60 @@ +package com.acgist.taoyao.boot.property; + +import com.fasterxml.jackson.annotation.JsonIgnore; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Getter; +import lombok.Setter; + +/** + * Mediasoup配置 + * + * @author acgist + */ +@Getter +@Setter +@Schema(title = "Mediasoup配置", description = "Mediasoup配置") +public class MediasoupProperties { + + /** + * Mediasoup主机 + */ + @Schema(title = "Mediasoup主机", description = "Mediasoup主机") + private String host; + /** + * Mediasoup端口 + */ + @Schema(title = "Mediasoup端口", description = "Mediasoup端口") + private Integer port; + /** + * Mediasoup协议 + */ + @Schema(title = "Mediasoup协议", description = "Mediasoup协议") + private String schema; + /** + * Mediasoup地址 + */ + @Schema(title = "Mediasoup地址", description = "Mediasoup地址") + private String websocket; + /** + * Mediasoup用户 + */ + @Schema(title = "Mediasoup用户", description = "Mediasoup用户") + @JsonIgnore + private String username; + /** + * Mediasoup密码 + */ + @Schema(title = "Mediasoup密码", description = "Mediasoup密码") + @JsonIgnore + private String password; + + /** + * @return 完整Mediasoup地址 + */ + @Schema(title = "完整Mediasoup地址", description = "完整Mediasoup地址") + public String getAddress() { + return this.schema + "://" + this.host + ":" + this.port + this.websocket; + } + +} diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/MoonProperties.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/MoonProperties.java deleted file mode 100644 index 06d8dd2..0000000 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/MoonProperties.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.acgist.taoyao.boot.property; - -import io.swagger.v3.oas.annotations.media.Schema; -import lombok.Getter; -import lombok.Setter; - -/** - * Moon架构配置 - * - * @author acgist - */ -@Getter -@Setter -@Schema(title = "Moon架构配置", description = "Moon架构配置") -public class MoonProperties { - - /** - * 是否混音 - */ - @Schema(title = "是否混音", description = "是否混音") - private Boolean audioMix; - -} diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/WebrtcProperties.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/WebrtcProperties.java index 3b2fc12..50e5a21 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/WebrtcProperties.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/property/WebrtcProperties.java @@ -60,20 +60,15 @@ public class WebrtcProperties { */ @Schema(title = "turn服务器", description = "turn服务器") private String[] turn; - /** - * KMS配置 - */ - @Schema(title = "KMS配置", description = "KMS配置") - private KmsProperties kms; - /** - * Moon架构配置 - */ - @Schema(title = "Moon架构配置", description = "Moon架构配置") - private MoonProperties moon; /** * 信令配置 */ @Schema(title = "信令配置", description = "信令配置") private SignalProperties signal; + /** + * Mediasoup配置 + */ + @Schema(title = "Mediasoup配置", description = "Mediasoup配置") + private MediasoupProperties mediasoup; } diff --git a/taoyao-signal-server/taoyao-media/README.md b/taoyao-signal-server/taoyao-media/README.md index 0cece0c..eb234f7 100644 --- a/taoyao-signal-server/taoyao-media/README.md +++ b/taoyao-signal-server/taoyao-media/README.md @@ -1 +1,5 @@ # 媒体 + +## 媒体信令 + +### diff --git a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/MediasoupClient.java b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/MediasoupClient.java index 13209cf..9460d8c 100644 --- a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/MediasoupClient.java +++ b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/MediasoupClient.java @@ -1,12 +1,223 @@ package com.acgist.taoyao.mediasoup; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.WebSocket; +import java.net.http.WebSocket.Listener; +import java.nio.ByteBuffer; +import java.security.KeyManagementException; +import java.security.NoSuchAlgorithmException; +import java.security.SecureRandom; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ExecutionException; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.X509TrustManager; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.stereotype.Service; + +import com.acgist.taoyao.boot.property.MediasoupProperties; +import com.acgist.taoyao.boot.property.TaoyaoProperties; +import com.acgist.taoyao.boot.property.WebrtcProperties; +import com.acgist.taoyao.boot.utils.JSONUtils; + +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; + /** * Mediasoup客户端 * * @author acgist */ +@Slf4j +@Service public class MediasoupClient { + /** + * Mediasoup WebSocket通道 + */ + private WebSocket webSocket; + /** + * Mediasoup配置 + */ + private MediasoupProperties mediasoupProperties; + @Autowired + private TaskScheduler taskSchedulerl; + @Autowired + private TaoyaoProperties taoyaoProperties; + @Autowired + private WebrtcProperties webrtcProperties; + + @PostConstruct + public void init() { + this.mediasoupProperties = this.webrtcProperties.getMediasoup(); + this.buildClient(); + } + + /** + * 连接Mediasoup WebSocket通道 + */ + public void buildClient() { + final URI uri = URI.create(this.mediasoupProperties.getAddress()); + log.info("开始连接Mediasoup:{}", uri); + try { + HttpClient + .newBuilder() + .sslContext(buildSSLContext()) + .build() + .newWebSocketBuilder() + .connectTimeout(Duration.ofMillis(this.taoyaoProperties.getTimeout())) + .buildAsync(uri, new MessageListener()) + .get(); + } catch (InterruptedException | ExecutionException e) { + log.error("连接Mediasoup异常:{}", uri, e); + this.taskSchedulerl.schedule(this::buildClient, Instant.now().plusSeconds(5)); + } + } + + /** + * 发送消息 + * + * @param message 消息 + */ + public void send(Object message) { + while(this.webSocket == null) { + Thread.yield(); + } + this.webSocket.sendText(JSONUtils.toJSON(message), true); + } + + /** + * 消息监听 + * + * @author acgist + */ + public class MessageListener implements Listener { + + @Override + public void onOpen(WebSocket webSocket) { + log.info("Mediasoup通道打开:{}", webSocket); + Listener.super.onOpen(webSocket); + // 关闭旧的通道 + if(MediasoupClient.this.webSocket != null && !(MediasoupClient.this.webSocket.isInputClosed() && MediasoupClient.this.webSocket.isOutputClosed())) { + MediasoupClient.this.webSocket.abort(); + } + // 设置新的通道 + MediasoupClient.this.webSocket = webSocket; + // 发送授权消息 + MediasoupClient.this.send(Map.of( + "username", MediasoupClient.this.mediasoupProperties.getUsername(), + "password", MediasoupClient.this.mediasoupProperties.getPassword() + )); + } + + @Override + public CompletionStage onBinary(WebSocket webSocket, ByteBuffer data, boolean last) { + log.debug("Mediasoup收到消息(binary):{}", webSocket); + return Listener.super.onBinary(webSocket, data, last); + } + + @Override + public CompletionStage onText(WebSocket webSocket, CharSequence data, boolean last) { + log.debug("Mediasoup收到消息(text):{}-{}", webSocket, data); + return Listener.super.onText(webSocket, data, last); + } + + @Override + public CompletionStage onClose(WebSocket webSocket, int statusCode, String reason) { + log.warn("Mediasoup通道关闭:{}-{}-{}", webSocket, statusCode, reason); + try { + return Listener.super.onClose(webSocket, statusCode, reason); + } finally { + MediasoupClient.this.taskSchedulerl.schedule(MediasoupClient.this::buildClient, Instant.now().plusSeconds(5)); + } + } + + @Override + public void onError(WebSocket webSocket, Throwable error) { + log.error("Mediasoup通道异常:{}", webSocket, error); + try { + Listener.super.onError(webSocket, error); + } finally { + MediasoupClient.this.taskSchedulerl.schedule(MediasoupClient.this::buildClient, Instant.now().plusSeconds(5)); + } + } + + @Override + public CompletionStage onPing(WebSocket webSocket, ByteBuffer message) { + log.debug("Mediasoup收到消息(ping):{}", webSocket); + return Listener.super.onPing(webSocket, message); + } + + @Override + public CompletionStage onPong(WebSocket webSocket, ByteBuffer message) { + log.debug("Mediasoup收到消息(pong):{}", webSocket); + return Listener.super.onPong(webSocket, message); + } + + } + + /** + * SSLContext + * + * @return {@link SSLContext} + */ + private static final SSLContext buildSSLContext() { + try { + // SSL协议:SSL、SSLv2、SSLv3、TLS、TLSv1、TLSv1.1、TLSv1.2、TLSv1.3 + final SSLContext sslContext = SSLContext.getInstance("TLSv1.2"); + sslContext.init(null, new X509TrustManager[] { TaoyaoTrustManager.INSTANCE }, new SecureRandom()); + return sslContext; + } catch (KeyManagementException | NoSuchAlgorithmException e) { + log.error("新建SSLContext异常", e); + } + try { + return SSLContext.getDefault(); + } catch (NoSuchAlgorithmException e) { + log.error("新建SSLContext异常", e); + } + return null; + } + + /** + * 证书验证 + * + * @author acgist + */ + public static class TaoyaoTrustManager implements X509TrustManager { + + private static final TaoyaoTrustManager INSTANCE = new TaoyaoTrustManager(); + + private TaoyaoTrustManager() { + } + + @Override + public X509Certificate[] getAcceptedIssuers() { + return new X509Certificate[0]; + } + + @Override + public void checkClientTrusted(X509Certificate[] chain, String authType) throws CertificateException { + if(chain == null) { + throw new CertificateException("证书验证失败"); + } + } + + @Override + public void checkServerTrusted(X509Certificate[] chain, String authType) throws CertificateException { + if(chain == null) { + throw new CertificateException("证书验证失败"); + } + } + + } } diff --git a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/listener/Listener.java b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/listener/Listener.java new file mode 100644 index 0000000..6202f7c --- /dev/null +++ b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/listener/Listener.java @@ -0,0 +1,10 @@ +package com.acgist.taoyao.mediasoup.listener; + +/** + * Mediasoup事件监听 + * + * @author acgist + */ +public abstract class Listener { + +} diff --git a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/transport/Transport.java b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/transport/Transport.java index 593ccd9..ff27f13 100644 --- a/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/transport/Transport.java +++ b/taoyao-signal-server/taoyao-media/src/main/java/com/acgist/taoyao/mediasoup/transport/Transport.java @@ -3,6 +3,7 @@ package com.acgist.taoyao.mediasoup.transport; import java.util.List; import com.acgist.taoyao.mediasoup.client.ClientStream; +import com.acgist.taoyao.signal.client.ClientSession; /** * 传输通道 @@ -11,6 +12,10 @@ import com.acgist.taoyao.mediasoup.client.ClientStream; */ public final class Transport { + /** + * 终端 + */ + private ClientSession clientSession; /** * 生产者列表 */ diff --git a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml index 963cdca..6be3678 100644 --- a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml +++ b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml @@ -90,8 +90,8 @@ taoyao: # 架构模式 framework: MESH # 媒体端口范围 - min-port: 45535 - max-port: 65535 + min-port: 40000 + max-port: 49999 # 公共服务 stun: - stun:stun1.l.google.com:19302 @@ -104,23 +104,21 @@ taoyao: - turn:127.0.0.1:8888 - turn:127.0.0.1:8888 - turn:127.0.0.1:8888 - # KMS服务配置:可以部署多个简单实现负载均衡 - kms: - host: 192.168.1.100 - port: 18888 - schema: wss - websocket: /websocket.signal - username: taoyao - password: taoyao - # Moon架构配置 - moon: - audio-mix: true # 信令服务配置 signal: host: 192.168.1.100 port: ${server.port:8888} schema: wss websocket: /websocket.signal + # Mediasoup服务配置:可以部署多个简单实现负载均衡 + mediasoup: + host: 127.0.0.1 + #host: 192.168.8.110 + port: 4443 + schema: wss + websocket: /websocket.signal + username: taoyao + password: taoyao record: storage: /data/record security: