[+] 优化监控流程,提高稳定性。

This commit is contained in:
acgist
2023-04-22 14:08:28 +08:00
parent aadd59a51b
commit 4da70b5216
6 changed files with 229 additions and 142 deletions

View File

@@ -182,7 +182,8 @@ public class MainActivity extends AppCompatActivity implements Serializable {
}
this.threadHandler.post(() -> {
// 进入房间
Taoyao.taoyao.roomEnter("91f81c0a-0556-4087-b9a4-5889fac36fb6", null);
// Taoyao.taoyao.roomEnter("91f81c0a-0556-4087-b9a4-5889fac36fb6", null);
Taoyao.taoyao.sessionCall("taoyao");
});
}
@@ -253,9 +254,12 @@ public class MainActivity extends AppCompatActivity implements Serializable {
}
private void removeVideo(Message message) {
synchronized (this) {
final GridLayout video = this.binding.video;
final SurfaceView surfaceView = (SurfaceView) message.obj;
video.removeView(surfaceView);
final int index = video.indexOfChild(surfaceView);
video.removeViewAt(index);
}
}
}

View File

@@ -520,7 +520,13 @@ public final class Taoyao implements ITaoyao {
request.notifyAll();
}
} else {
this.executeHandler.post(() -> this.dispatch(content, header, message));
this.executeHandler.post(() -> {
try {
this.dispatch(content, header, message);
} catch (Exception e) {
Log.e(Taoyao.class.getSimpleName(), "处理信令异常:" + content, e);
}
});
}
}
@@ -725,8 +731,8 @@ public final class Taoyao implements ITaoyao {
resources.getBoolean(R.bool.dataConsume),
resources.getBoolean(R.bool.audioConsume),
resources.getBoolean(R.bool.videoConsume),
resources.getBoolean(R.bool.audioProduce),
resources.getBoolean(R.bool.dataProduce),
resources.getBoolean(R.bool.audioProduce),
resources.getBoolean(R.bool.videoProduce),
this.mediaManager.getMediaProperties(),
this.mediaManager.getWebrtcProperties()
@@ -775,7 +781,7 @@ public final class Taoyao implements ITaoyao {
room.closeRemoteClient(clientId);
}
private void sessionCall(String clientId) {
public void sessionCall(String clientId) {
this.requestFuture(
this.buildMessage(
"session::call",
@@ -794,14 +800,13 @@ public final class Taoyao implements ITaoyao {
resources.getBoolean(R.bool.dataConsume),
resources.getBoolean(R.bool.audioConsume),
resources.getBoolean(R.bool.videoConsume),
resources.getBoolean(R.bool.audioProduce),
resources.getBoolean(R.bool.dataProduce),
resources.getBoolean(R.bool.audioProduce),
resources.getBoolean(R.bool.videoProduce),
this.mediaManager.getMediaProperties(),
this.mediaManager.getWebrtcProperties()
);
sessionClient.init();
sessionClient.offer();
this.sessionClients.put(sessionId, sessionClient);
}
);
}
@@ -819,8 +824,8 @@ public final class Taoyao implements ITaoyao {
resources.getBoolean(R.bool.dataConsume),
resources.getBoolean(R.bool.audioConsume),
resources.getBoolean(R.bool.videoConsume),
resources.getBoolean(R.bool.audioProduce),
resources.getBoolean(R.bool.dataProduce),
resources.getBoolean(R.bool.audioProduce),
resources.getBoolean(R.bool.videoProduce),
this.mediaManager.getMediaProperties(),
this.mediaManager.getWebrtcProperties()

View File

@@ -4,6 +4,7 @@ import android.os.Handler;
import android.util.Log;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.JSONUtils;
import com.acgist.taoyao.boot.utils.ListUtils;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.media.VideoSourceType;
@@ -24,9 +25,11 @@ import org.webrtc.SdpObserver;
import org.webrtc.SessionDescription;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* P2P终端
@@ -53,6 +56,10 @@ public class SessionClient extends Client {
private final boolean videoProduce;
private final MediaProperties mediaProperties;
private final WebrtcProperties webrtcProperties;
/**
* 是否已经提供本地媒体
*/
private volatile boolean offerLocal;
/**
* 本地媒体
*/
@@ -65,14 +72,6 @@ public class SessionClient extends Client {
* 远程媒体
*/
private MediaStream remoteMediaStream;
/**
* OfferSdpObserver
*/
private SdpObserver offerSdpObserver;
/**
* AnswerSdpObserver
*/
private SdpObserver answerSdpObserver;
/**
* Peer连接
*/
@@ -140,8 +139,6 @@ public class SessionClient extends Client {
final PeerConnection.RTCConfiguration configuration = new PeerConnection.RTCConfiguration(iceServers);
this.observer = this.observer();
this.mediaStream = this.mediaManager.buildLocalMediaStream(this.audioProduce, this.videoProduce);
this.offerSdpObserver = this.offerSdpObserver();
this.answerSdpObserver = this.answerSdpObserver();
this.peerConnection = this.peerConnectionFactory.createPeerConnection(configuration, this.observer);
this.peerConnection.addStream(this.mediaStream);
// 设置streamId同步
@@ -170,31 +167,62 @@ public class SessionClient extends Client {
/**
* 提供媒体服务
*/
public void offer() {
public synchronized void offer() {
if(this.offerLocal) {
return;
}
this.offerLocal = true;
final MediaConstraints mediaConstraints = this.mediaManager.buildMediaConstraints();
this.peerConnection.createOffer(this.offerSdpObserver, mediaConstraints);
this.peerConnection.createOffer(this.sdpObserver(
"主动Offer",
sessionDescription -> {
this.peerConnection.setLocalDescription(this.sdpObserver(
"主动OfferExchange",
null,
() -> {
this.exchangeSessionDescription(sessionDescription);
}
), sessionDescription);
},
null
), mediaConstraints);
}
private void offer(Message message, Map<String, Object> body) {
this.init();
final String sdp = MapUtils.get(body, "sdp");
final String type = MapUtils.get(body, "type");
final SessionDescription.Type sdpType = SessionDescription.Type.valueOf(type.toUpperCase());
final SessionDescription sessionDescription = new SessionDescription(sdpType, sdp);
this.peerConnection.setRemoteDescription(this.offerSdpObserver, sessionDescription);
this.answer();
this.peerConnection.setRemoteDescription(this.sdpObserver(
"被动Offer",
null,
() -> this.peerConnection.createAnswer(this.sdpObserver(
"主动Answer",
sessionDescription -> {
this.peerConnection.setLocalDescription(this.sdpObserver(
"主动AnswerExchange",
null,
() -> {
this.exchangeSessionDescription(sessionDescription);
this.offer();
}
private void answer() {
final MediaConstraints mediaConstraints = this.mediaManager.buildMediaConstraints();
this.peerConnection.createAnswer(this.answerSdpObserver, mediaConstraints);
), sessionDescription);
},
null
), this.mediaManager.buildMediaConstraints())
), new SessionDescription(sdpType, sdp));
}
private void answer(Message message, Map<String, Object> body) {
final String sdp = MapUtils.get(body, "sdp");
final String type = MapUtils.get(body, "type");
final SessionDescription.Type sdpType = SessionDescription.Type.valueOf(type.toUpperCase());
final SessionDescription sessionDescription = new SessionDescription(sdpType, sdp);
this.peerConnection.setRemoteDescription(this.answerSdpObserver, sessionDescription);
this.peerConnection.setRemoteDescription(this.sdpObserver(
"被动Answer",
null,
null
// () -> this.offer()
), new SessionDescription(sdpType, sdp));
}
private void candidate(Message message, Map<String, Object> body) {
@@ -205,8 +233,7 @@ public class SessionClient extends Client {
if(sdp == null || sdpMid == null || sdpMLineIndex == null) {
Log.w(SessionClient.class.getSimpleName(), "无效媒体协商:" + body);
} else {
final IceCandidate iceCandidate = new IceCandidate(sdpMid, sdpMLineIndex, sdp);
this.peerConnection.addIceCandidate(iceCandidate);
this.peerConnection.addIceCandidate(new IceCandidate(sdpMid, sdpMLineIndex, sdp));
}
}
@@ -294,15 +321,19 @@ public class SessionClient extends Client {
return;
}
super.close();
try {
// if(this.mediaStream != null) {
// this.mediaStream.dispose();
// }
if(this.remoteMediaStream != null) {
this.remoteMediaStream.dispose();
}
// if(this.remoteMediaStream != null) {
// this.remoteMediaStream.dispose();
// }
if(this.peerConnection != null) {
this.peerConnection.dispose();
}
} catch (Exception e) {
Log.e(SessionClient.class.getSimpleName(), "释放资源异常", e);
}
this.mediaManager.closeClient();
}
}
@@ -315,17 +346,23 @@ public class SessionClient extends Client {
@Override
public void onSignalingChange(PeerConnection.SignalingState signalingState) {
Log.d(SessionClient.class.getSimpleName(), "SignalingState状态改变:" + signalingState);
Log.d(SessionClient.class.getSimpleName(), "PC信令状态改变:" + signalingState);
SessionClient.this.logState();
// TODO处理失败
}
@Override
public void onIceGatheringChange(PeerConnection.IceGatheringState iceGatheringState) {
Log.d(SessionClient.class.getSimpleName(), "IceGatheringState状态改变:" + iceGatheringState);
Log.d(SessionClient.class.getSimpleName(), "PCIce收集状态改变:" + iceGatheringState);
SessionClient.this.logState();
// TODO处理失败
}
@Override
public void onIceConnectionChange(PeerConnection.IceConnectionState iceConnectionState) {
Log.d(SessionClient.class.getSimpleName(), "IceConnectionState状态改变:" + iceConnectionState);
Log.d(SessionClient.class.getSimpleName(), "PCIce连接状态改变:" + iceConnectionState);
SessionClient.this.logState();
// TODO处理失败
}
@Override
@@ -335,10 +372,14 @@ public class SessionClient extends Client {
@Override
public void onIceCandidate(IceCandidate iceCandidate) {
Log.d(SessionClient.class.getSimpleName(), "发送媒体协商:" + SessionClient.this.sessionId);
final Map<String, Object> candidate = new HashMap<>();
candidate.put("sdpMid", iceCandidate.sdpMid);
candidate.put("candidate", iceCandidate.sdp);
candidate.put("sdpMLineIndex", iceCandidate.sdpMLineIndex);
SessionClient.this.taoyao.push(SessionClient.this.taoyao.buildMessage(
"session::exchange",
"type", "candidate",
"candidate", iceCandidate,
"candidate", candidate,
"sessionId", SessionClient.this.sessionId
));
}
@@ -376,7 +417,7 @@ public class SessionClient extends Client {
@Override
public void onRenegotiationNeeded() {
Log.d(SessionClient.class.getSimpleName(), "重新协商媒体:" + SessionClient.this.sessionId);
if(peerConnection.connectionState() == PeerConnection.PeerConnectionState.CONNECTED) {
if(SessionClient.this.peerConnection.connectionState() == PeerConnection.PeerConnectionState.CONNECTED) {
// TODO重新协商
// SessionClient.this.offer();
}
@@ -385,70 +426,75 @@ public class SessionClient extends Client {
};
}
private SdpObserver offerSdpObserver() {
/**
* @param tag 标记
* @param createSuccessConsumer 创建成功消费者
* @param setSuccessRunnable 设置成功执行者
*
* @return SDP观察者
*/
private SdpObserver sdpObserver(String tag, Consumer<SessionDescription> createSuccessConsumer, Runnable setSuccessRunnable) {
return new SdpObserver() {
@Override
public void onCreateSuccess(SessionDescription sessionDescription) {
Log.d(SessionClient.class.getSimpleName(), "创建OfferSDP成功" + SessionClient.this.sessionId);
SessionClient.this.peerConnection.setLocalDescription(this, sessionDescription);
SessionClient.this.taoyao.push(SessionClient.this.taoyao.buildMessage(
"session::exchange",
"sdp", sessionDescription.description,
"type", sessionDescription.type.toString().toLowerCase(),
"sessionId", SessionClient.this.sessionId
));
Log.d(SessionClient.class.getSimpleName(), "创建" + tag + "SDP成功" + SessionClient.this.sessionId);
SessionClient.this.logState();
if(createSuccessConsumer != null) {
createSuccessConsumer.accept(sessionDescription);
}
}
@Override
public void onSetSuccess() {
Log.d(SessionClient.class.getSimpleName(), "设置OfferSDP成功" + SessionClient.this.sessionId);
Log.d(SessionClient.class.getSimpleName(), "设置" + tag + "SDP成功" + SessionClient.this.sessionId);
SessionClient.this.logState();
if(setSuccessRunnable != null) {
setSuccessRunnable.run();
}
}
@Override
public void onCreateFailure(String message) {
Log.w(SessionClient.class.getSimpleName(), "创建OfferSDP失败" + message);
Log.w(SessionClient.class.getSimpleName(), "创建" + tag + "SDP失败" + message);
SessionClient.this.logState();
}
@Override
public void onSetFailure(String message) {
Log.w(SessionClient.class.getSimpleName(), "设置OfferSDP失败" + message);
Log.w(SessionClient.class.getSimpleName(), "设置" + tag + "SDP失败" + message);
SessionClient.this.logState();
}
};
}
private SdpObserver answerSdpObserver() {
return new SdpObserver() {
@Override
public void onCreateSuccess(SessionDescription sessionDescription) {
Log.d(SessionClient.class.getSimpleName(), "创建AnswerSDP成功" + SessionClient.this.sessionId);
SessionClient.this.peerConnection.setLocalDescription(this, sessionDescription);
private void exchangeSessionDescription(SessionDescription sessionDescription) {
if(sessionDescription == null) {
return;
}
final String type = sessionDescription.type.toString().toLowerCase();
SessionClient.this.taoyao.push(SessionClient.this.taoyao.buildMessage(
"session::exchange",
"sdp", sessionDescription.description,
"type", sessionDescription.type.toString().toLowerCase(),
"type", type,
"sessionId", SessionClient.this.sessionId
));
}
@Override
public void onSetSuccess() {
Log.d(SessionClient.class.getSimpleName(), "设置AnswerSDP成功" + SessionClient.this.sessionId);
}
@Override
public void onCreateFailure(String message) {
Log.w(SessionClient.class.getSimpleName(), "创建AnswerSDP失败" + message);
}
@Override
public void onSetFailure(String message) {
Log.w(SessionClient.class.getSimpleName(), "设置AnswerSDP失败" + message);
}
};
private void logState() {
Log.d(SessionClient.class.getSimpleName(), String.format(
"""
PC信令状态%s
PC连接状态%s
PCIce收集状态%s
PCIce连接状态%s
""",
this.peerConnection.signalingState().name(),
this.peerConnection.connectionState().name(),
this.peerConnection.iceGatheringState().name(),
this.peerConnection.iceConnectionState().name()
));
}
}

View File

@@ -1,5 +1,7 @@
package com.acgist.taoyao.media.config;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.commons.lang3.ArrayUtils;
import org.webrtc.PeerConnection;
@@ -20,6 +22,7 @@ public class WebrtcProperties {
private WebrtcStunProperties[] stun;
private WebrtcTurnProperties[] turn;
@JsonIgnore
public List<PeerConnection.IceServer> getIceServers() {
final List<PeerConnection.IceServer> iceServers = new ArrayList<>();
if(ArrayUtils.isNotEmpty(this.stun)) {

View File

@@ -235,8 +235,14 @@ class Session {
id;
// 远程终端名称
name;
// 是否关闭
closed;
// 远程终端ID
clientId;
// 会话ID
sessionId;
// 是否已经提供本地媒体
offerLocal;
// 本地媒体流
localStream;
// 本地音频
@@ -251,29 +257,30 @@ class Session {
peerConnection;
constructor({
id,
name,
clientId
clientId,
sessionId,
}) {
this.id = id;
this.id = sessionId;
this.name = name;
this.closed = false;
this.clientId = clientId;
this.sessionId = sessionId;
this.offerLocal = false;
}
async pause() {
this.localAudioTrack.enabled = false;
this.localVideoTrack.enabled = false;
this.localStream.active = false;
}
async resume() {
this.localAudioTrack.enabled = true;
this.localVideoTrack.enabled = true;
this.localStream.active = true;
}
async close() {
this.localStream.active = false;
this.closed = true;
this.localAudioTrack.stop();
this.localVideoTrack.stop();
this.remoteAudioTrack.stop();
@@ -281,6 +288,17 @@ class Session {
this.peerConnection.close();
}
async addIceCandidate(candidate) {
if(this.closed) {
return;
}
if(this.peerConnection) {
await this.peerConnection.addIceCandidate(new RTCIceCandidate(candidate));
} else {
setTimeout(() => this.addIceCandidate(candidate), 50);
}
}
}
/**
@@ -2122,40 +2140,18 @@ class Taoyao extends RemoteClient {
})
);
const { name, sessionId } = response.body;
const session = new Session(name, response.body.clientId, sessionId);
const session = new Session({name, clientId: response.body.clientId, sessionId});
this.sessionClients.set(sessionId, session);
session.peerConnection = await me.buildPeerConnection(session, sessionId);
const localStream = await me.getStream();
session.localAudioTrack = localStream.getAudioTracks()[0];
session.localVideoTrack = localStream.getVideoTracks()[0];
session.peerConnection.addTrack(session.localAudioTrack, localStream);
session.peerConnection.addTrack(session.localVideoTrack, localStream);
session.peerConnection.createOffer().then(async description => {
await session.peerConnection.setLocalDescription(description);
me.push(
protocol.buildMessage("session::exchange", {
sdp : description.sdp,
type : description.type,
sessionId: sessionId
})
);
});
}
async defaultSessionCall(message) {
const me = this;
const { name, clientId, sessionId } = message.body;
const session = new Session(name, clientId, sessionId);
const session = new Session({name, clientId, sessionId});
this.sessionClients.set(sessionId, session);
session.peerConnection = await me.buildPeerConnection(session, sessionId);
const localStream = await me.getStream();
session.localStream = localStream;
session.localAudioTrack = localStream.getAudioTracks()[0];
session.localVideoTrack = localStream.getVideoTracks()[0];
// 相同Stream音视频同步
session.peerConnection.addTrack(session.localAudioTrack, localStream);
session.peerConnection.addTrack(session.localVideoTrack, localStream);
await me.buildPeerConnection(session, sessionId);
session.peerConnection.createOffer().then(async description => {
session.offerLocal = true;
await session.peerConnection.setLocalDescription(description);
me.push(
protocol.buildMessage("session::exchange", {
@@ -2192,7 +2188,8 @@ class Taoyao extends RemoteClient {
const { type, candidate, sessionId } = message.body;
const session = this.sessionClients.get(sessionId);
if (type === "offer") {
session.peerConnection.setRemoteDescription(new RTCSessionDescription(message.body));
await me.buildPeerConnection(session, sessionId);
await session.peerConnection.setRemoteDescription(new RTCSessionDescription(message.body));
session.peerConnection.createAnswer().then(async description => {
await session.peerConnection.setLocalDescription(description);
me.push(
@@ -2202,13 +2199,27 @@ class Taoyao extends RemoteClient {
sessionId: sessionId
})
);
if(!session.offerLocal) {
session.peerConnection.createOffer().then(async description => {
await session.peerConnection.setLocalDescription(description);
me.push(
protocol.buildMessage("session::exchange", {
sdp : description.sdp,
type : description.type,
sessionId: sessionId
})
);
});
}
});
} else if (type === "answer") {
await session.peerConnection.setRemoteDescription(new RTCSessionDescription(message.body));
} else if (type === "candidate") {
if(candidate) {
await session.peerConnection.addIceCandidate(new RTCIceCandidate(candidate));
if(!candidate || candidate.sdpMid === undefined || candidate.sdpMLineIndex === undefined && candidate.candidate === undefined) {
return;
}
await session.addIceCandidate(candidate);
} else {
}
}
@@ -2231,6 +2242,9 @@ class Taoyao extends RemoteClient {
}
async buildPeerConnection(session, sessionId) {
if(session.peerConnection) {
return session.peerConnection;
}
const me = this;
const peerConnection = new RTCPeerConnection({"iceServers" : [{"url" : "stun:stun1.l.google.com:19302"}]});
peerConnection.ontrack = event => {
@@ -2262,6 +2276,13 @@ class Taoyao extends RemoteClient {
// TODO重连
}
}
const localStream = await me.getStream();
session.localStream = localStream;
session.peerConnection = peerConnection;
session.localAudioTrack = localStream.getAudioTracks()[0];
session.localVideoTrack = localStream.getVideoTracks()[0];
await session.peerConnection.addTrack(session.localAudioTrack, localStream);
await session.peerConnection.addTrack(session.localVideoTrack, localStream);
return peerConnection;
}

View File

@@ -174,20 +174,28 @@ taoyao:
encrypt: false
# STUN服务
stun:
- host: 192.168.1.110
port: 3478
- host: 192.168.8.110
port: 3478
# - host: 192.168.1.110
# port: 3478
# - host: 192.168.8.110
# port: 3478
- host: stun1.l.google.com
port: 19302
- host: stun2.l.google.com
port: 19302
- host: stun3.l.google.com
port: 19302
- host: stun4.l.google.com
port: 19302
# TURN服务
turn:
- host: 192.168.1.110
port: 3478
username: taoyao
password: taoyao
- host: 192.168.8.110
port: 3478
username: taoyao
password: taoyao
# - host: 192.168.1.110
# port: 3478
# username: taoyao
# password: taoyao
# - host: 192.168.8.110
# port: 3478
# username: taoyao
# password: taoyao
# 摄像头配置
camera:
# 混音