[*] 流程

This commit is contained in:
acgist
2023-04-09 14:33:08 +08:00
parent 46f21e3a43
commit 124b161644
19 changed files with 477 additions and 162 deletions

View File

@@ -505,24 +505,26 @@ public final class Taoyao implements ITaoyao {
case "client::shutdown" -> this.clientShutdown(message, body); case "client::shutdown" -> this.clientShutdown(message, body);
case "media::consume" -> this.mediaConsume(message, body); case "media::consume" -> this.mediaConsume(message, body);
// case "media::audio::volume" -> this.mediaAudioVolume(message, body); // case "media::audio::volume" -> this.mediaAudioVolume(message, body);
// case "media::consumer::close" -> this.mediaConsumerClose(message, body); case "media::consumer::close" -> this.mediaConsumerClose(message, body);
// case "media::consumer::pause" -> this.mediaConsumerPause(message, body); case "media::consumer::pause" -> this.mediaConsumerPause(message, body);
// case "media::consumer::request::key::frame" -> this.mediaConsumerRequestKeyFrame(message, body); // case "media::consumer::request::key::frame" -> this.mediaConsumerRequestKeyFrame(message, body);
// case "media::consumer::resume" -> this.mediaConsumerResume(message, body); case "media::consumer::resume" -> this.mediaConsumerResume(message, body);
// case "media::consumer::set::preferred::layers" -> this.mediaConsumerSetPreferredLayers(message, body); // case "media::consumer::set::preferred::layers" -> this.mediaConsumerSetPreferredLayers(message, body);
// case "media::consumer::status" -> this.mediaConsumerStatus(message, body); // case "media::consumer::status" -> this.mediaConsumerStatus(message, body);
// case "media::producer::close" -> this.mediaProducerClose(message, body); case "media::producer::close" -> this.mediaProducerClose(message, body);
// case "media::producer::pause" -> this.mediaProducerPause(message, body); case "media::producer::pause" -> this.mediaProducerPause(message, body);
// case "media::producer::resume" -> this.mediaProducerResume(message, body); case "media::producer::resume" -> this.mediaProducerResume(message, body);
// case "media::producer::video::orientation:change" -> this.mediaVideoOrientationChange(message, body); // case "media::producer::video::orientation:change" -> this.mediaVideoOrientationChange(message, body);
case "room::close" -> this.roomClose(message, body); case "room::close" -> this.roomClose(message, body);
case "room::enter" -> this.roomEnter(message, body); case "room::enter" -> this.roomEnter(message, body);
// case "room::expel" -> this.roomExpel(message, body); case "room::expel" -> this.roomExpel(message, body);
case "room::invite" -> this.roomInivte(message, body); case "room::invite" -> this.roomInivte(message, body);
// case "room::leave" -> this.roomLeave(message, body); case "room::leave" -> this.roomLeave(message, body);
case "session::call" -> this.sessionCall(message, body); case "session::call" -> this.sessionCall(message, body);
case "session::close" -> this.sessionClose(message, body); case "session::close" -> this.sessionClose(message, body);
case "session::exchange" -> this.sessionExchange(message, body); case "session::exchange" -> this.sessionExchange(message, body);
case "session::pause" -> this.sessionPause(message, body);
case "session::resume" -> this.sessionResume(message, body);
default -> Log.d(Taoyao.class.getSimpleName(), "没有适配信令:" + content); default -> Log.d(Taoyao.class.getSimpleName(), "没有适配信令:" + content);
} }
} }
@@ -590,6 +592,60 @@ public final class Taoyao implements ITaoyao {
room.mediaConsume(message, body); room.mediaConsume(message, body);
} }
private void mediaConsumerClose(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final Room room = this.rooms.get(roomId);
if(room == null) {
return;
}
room.mediaConsumerClose(body);
}
private void mediaConsumerPause(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final Room room = this.rooms.get(roomId);
if(room == null) {
return;
}
room.mediaConsumerPause(body);
}
private void mediaConsumerResume(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final Room room = this.rooms.get(roomId);
if(room == null) {
return;
}
room.mediaConsumerResume(body);
}
private void mediaProducerClose(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final Room room = this.rooms.get(roomId);
if(room == null) {
return;
}
room.mediaProducerClose(body);
}
private void mediaProducerPause(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final Room room = this.rooms.get(roomId);
if(room == null) {
return;
}
room.mediaProducerPause(body);
}
private void mediaProducerResume(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final Room room = this.rooms.get(roomId);
if(room == null) {
return;
}
room.mediaProducerResume(body);
}
private void roomClose(Message message, Map<String, Object> body) { private void roomClose(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId"); final String roomId = MapUtils.get(body, "roomId");
final Room room = this.rooms.remove(roomId); final Room room = this.rooms.remove(roomId);
@@ -608,12 +664,6 @@ public final class Taoyao implements ITaoyao {
room.newRemoteClient(body); room.newRemoteClient(body);
} }
private void roomInivte(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final String password = MapUtils.get(body, "password");
this.roomEnter(roomId, password);
}
public Room roomEnter(String roomId, String password) { public Room roomEnter(String roomId, String password) {
final Resources resources = this.context.getResources(); final Resources resources = this.context.getResources();
final Room room = this.rooms.computeIfAbsent( final Room room = this.rooms.computeIfAbsent(
@@ -621,7 +671,7 @@ public final class Taoyao implements ITaoyao {
key -> new Room( key -> new Room(
this.name, this.clientId, this.name, this.clientId,
key, password, key, password,
this.mainHandler, this, this, this.mainHandler,
resources.getBoolean(R.bool.dataConsume), resources.getBoolean(R.bool.dataConsume),
resources.getBoolean(R.bool.audioConsume), resources.getBoolean(R.bool.audioConsume),
resources.getBoolean(R.bool.videoConsume), resources.getBoolean(R.bool.videoConsume),
@@ -630,22 +680,66 @@ public final class Taoyao implements ITaoyao {
resources.getBoolean(R.bool.videoProduce) resources.getBoolean(R.bool.videoProduce)
) )
); );
room.enter(); final boolean success = room.enter();
if(success) {
room.mediaProduce(); room.mediaProduce();
return room; return room;
} else {
this.rooms.remove(roomId);
return null;
}
}
private void roomExpel(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
this.roomLeave(roomId);
}
private void roomInivte(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final String password = MapUtils.get(body, "password");
this.roomEnter(roomId, password);
}
public void roomLeave(String roomId) {
final Room room = this.rooms.remove(roomId);
if(room == null) {
return;
}
this.push(this.buildMessage(
"room::leave",
"roomId", roomId
));
room.close();
}
private void roomLeave(Message message, Map<String, Object> body) {
final String roomId = MapUtils.get(body, "roomId");
final String clientId = MapUtils.get(body, "clientId");
final Room room = this.rooms.get(roomId);
if(room == null) {
return;
}
room.closeRemoteClient(clientId);
} }
private void sessionCall(Message message, Map<String, Object> body) { private void sessionCall(Message message, Map<String, Object> body) {
final String name = MapUtils.get(body, "name"); final String name = MapUtils.get(body, "name");
final String clientId = MapUtils.get(body, "clientId"); final String clientId = MapUtils.get(body, "clientId");
final String sessionId = MapUtils.get(body, "sessionId"); final String sessionId = MapUtils.get(body, "sessionId");
final SessionClient sessionClient = new SessionClient(sessionId, name, clientId, this.mainHandler, this); final SessionClient sessionClient = new SessionClient(sessionId, name, clientId, this, this.mainHandler);
this.sessionClients.put(sessionId, sessionClient); this.sessionClients.put(sessionId, sessionClient);
sessionClient.init(); sessionClient.init();
sessionClient.offer(); sessionClient.offer();
} }
private void sessionClose(Message message, Map<String, Object> body) { private void sessionClose(Message message, Map<String, Object> body) {
final String sessionId = MapUtils.get(body, "sessionId");
final SessionClient sessionClient = this.sessionClients.remove(sessionId);
if(sessionClient == null) {
return;
}
sessionClient.close();
} }
private void sessionExchange(Message message, Map<String, Object> body) { private void sessionExchange(Message message, Map<String, Object> body) {
@@ -658,6 +752,24 @@ public final class Taoyao implements ITaoyao {
sessionClient.exchange(message, body); sessionClient.exchange(message, body);
} }
private void sessionPause(Message message, Map<String, Object> body) {
final String sessionId = MapUtils.get(body, "sessionId");
final SessionClient sessionClient = this.sessionClients.get(sessionId);
if(sessionClient == null) {
return;
}
sessionClient.pause();
}
private void sessionResume(Message message, Map<String, Object> body) {
final String sessionId = MapUtils.get(body, "sessionId");
final SessionClient sessionClient = this.sessionClients.get(sessionId);
if(sessionClient == null) {
return;
}
sessionClient.resume();
}
/** /**
* 心跳 * 心跳
*/ */

View File

@@ -278,6 +278,7 @@ public final class MediaManager {
/** /**
* 关闭一个终端 * 关闭一个终端
* 最后一个终端关闭时,释放所有资源。 * 最后一个终端关闭时,释放所有资源。
* 注意:所有本地媒体关闭调用,不要直接关闭本地媒体流。
* *
* @return 剩余终端数量 * @return 剩余终端数量
*/ */
@@ -298,7 +299,7 @@ public final class MediaManager {
public RecordClient startRecord(String path, String filename) { public RecordClient startRecord(String path, String filename) {
synchronized (this) { synchronized (this) {
this.recordClient = new RecordClient(path, filename, this.handler, this.taoyao); this.recordClient = new RecordClient(path, filename, this.taoyao, this.handler);
this.recordClient.start(); this.recordClient.start();
return this.recordClient; return this.recordClient;
} }

View File

@@ -20,7 +20,7 @@ import java.io.Closeable;
* *
* @author acgist * @author acgist
*/ */
public abstract class Client implements Closeable { public abstract class Client extends CloseableClient {
/** /**
* 终端名称 * 终端名称
@@ -30,29 +30,15 @@ public abstract class Client implements Closeable {
* 终端ID * 终端ID
*/ */
protected final String clientId; protected final String clientId;
/**
* Handler
*/
protected final Handler handler;
/**
* 信令通道
*/
protected final ITaoyao taoyao;
/**
* 媒体服务
*/
protected final MediaManager mediaManager;
/** /**
* 视频预览 * 视频预览
*/ */
protected SurfaceViewRenderer surfaceViewRenderer; protected SurfaceViewRenderer surfaceViewRenderer;
public Client(String name, String clientId, Handler handler, ITaoyao taoyao) { public Client(String name, String clientId, ITaoyao taoyao, Handler handler) {
super(taoyao, handler);
this.name = name; this.name = name;
this.clientId = clientId; this.clientId = clientId;
this.taoyao = taoyao;
this.handler = handler;
this.mediaManager = MediaManager.getInstance();
} }
/** /**
@@ -86,8 +72,15 @@ public abstract class Client implements Closeable {
} }
} }
public void pause() {
}
public void resume() {
}
@Override @Override
public void close() { public void close() {
super.close();
Log.i(this.getClass().getSimpleName(), "关闭终端:" + this.clientId); Log.i(this.getClass().getSimpleName(), "关闭终端:" + this.clientId);
if(this.surfaceViewRenderer != null) { if(this.surfaceViewRenderer != null) {
this.surfaceViewRenderer.release(); this.surfaceViewRenderer.release();

View File

@@ -0,0 +1,61 @@
package com.acgist.taoyao.media.client;
import android.os.Handler;
import com.acgist.taoyao.media.MediaManager;
import com.acgist.taoyao.media.signal.ITaoyao;
import org.webrtc.PeerConnectionFactory;
import java.io.Closeable;
/**
* 需要关闭终端
*
* @author acgist
*/
public abstract class CloseableClient implements Closeable {
/**
* 是否加载
* 防止重复加载
*/
protected volatile boolean init;
/**
* 是否关闭
*/
protected volatile boolean close;
/**
* 信令通道
*/
protected final ITaoyao taoyao;
/**
* Handler
*/
protected final Handler handler;
/**
* 媒体服务
*/
protected final MediaManager mediaManager;
public CloseableClient(ITaoyao taoyao, Handler handler) {
this.init = false;
this.close = false;
this.taoyao = taoyao;
this.handler = handler;
this.mediaManager = MediaManager.getInstance();
}
/**
* 加载
*/
protected void init() {
this.init = true;
}
@Override
public void close() {
this.close = true;
}
}

View File

@@ -20,8 +20,8 @@ public class LocalClient extends RoomClient {
*/ */
protected MediaStream mediaStream; protected MediaStream mediaStream;
public LocalClient(String name, String clientId, Handler handler, ITaoyao taoyao) { public LocalClient(String name, String clientId, ITaoyao taoyao, Handler handler) {
super(name, clientId, handler, taoyao); super(name, clientId, taoyao, handler);
} }
public MediaStream getMediaStream() { public MediaStream getMediaStream() {
@@ -58,9 +58,6 @@ public class LocalClient extends RoomClient {
@Override @Override
public void close() { public void close() {
super.close(); super.close();
if(this.mediaStream != null) {
this.mediaStream.dispose();
}
} }
} }

View File

@@ -35,10 +35,6 @@ import java.util.concurrent.Executors;
*/ */
public class RecordClient extends Client { public class RecordClient extends Client {
/**
* 是否正在录像
*/
private volatile boolean active;
/** /**
* 音频准备录制 * 音频准备录制
*/ */
@@ -92,8 +88,8 @@ public class RecordClient extends Client {
*/ */
private final ExecutorService executorService; private final ExecutorService executorService;
public RecordClient(String path, String filename, Handler handler, ITaoyao taoyao) { public RecordClient(String path, String filename, ITaoyao taoyao, Handler handler) {
super("本地录像", "LocalRecordClient", handler, taoyao); super("本地录像", "LocalRecordClient", taoyao, handler);
this.filename = filename; this.filename = filename;
final Path filePath = Paths.get(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOCUMENTS).getAbsolutePath(), path, filename); final Path filePath = Paths.get(Environment.getExternalStoragePublicDirectory(Environment.DIRECTORY_DOCUMENTS).getAbsolutePath(), path, filename);
final File parentFile = filePath.getParent().toFile(); final File parentFile = filePath.getParent().toFile();
@@ -105,19 +101,13 @@ public class RecordClient extends Client {
this.executorService = Executors.newFixedThreadPool(2); this.executorService = Executors.newFixedThreadPool(2);
} }
/**
* @return 是否正在录像
*/
public boolean isActive() {
return this.active;
}
public void start() { public void start() {
synchronized (this) { synchronized (this) {
if(this.active) { if(this.init) {
return; return;
} }
this.active = true; super.init();
this.mediaManager.newClient(MediaManager.Type.BACK);
this.record(null, null, 1, 1); this.record(null, null, 1, 1);
} }
} }
@@ -161,7 +151,7 @@ public class RecordClient extends Client {
int outputIndex; int outputIndex;
this.audioCodec.start(); this.audioCodec.start();
this.audioActive = true; this.audioActive = true;
while (this.active) { while (!this.close) {
outputIndex = this.audioCodec.dequeueOutputBuffer(info, 1000L * 1000); outputIndex = this.audioCodec.dequeueOutputBuffer(info, 1000L * 1000);
if (outputIndex == MediaCodec.INFO_TRY_AGAIN_LATER) { if (outputIndex == MediaCodec.INFO_TRY_AGAIN_LATER) {
} else if (outputIndex == MediaCodec.INFO_OUTPUT_FORMAT_CHANGED) { } else if (outputIndex == MediaCodec.INFO_OUTPUT_FORMAT_CHANGED) {
@@ -173,7 +163,7 @@ public class RecordClient extends Client {
this.pts = System.currentTimeMillis(); this.pts = System.currentTimeMillis();
this.mediaMuxer.start(); this.mediaMuxer.start();
this.notifyAll(); this.notifyAll();
} else if (this.active) { } else if (!this.close) {
try { try {
this.wait(); this.wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@@ -208,12 +198,12 @@ public class RecordClient extends Client {
} }
public void putAudio(JavaAudioDeviceModule.AudioSamples audioSamples) { public void putAudio(JavaAudioDeviceModule.AudioSamples audioSamples) {
if(this.active && this.audioActive) { if(!this.close && this.audioActive) {
} }
} }
public void putVideo(VideoFrame videoFrame) { public void putVideo(VideoFrame videoFrame) {
if (this.active && this.videoActive) { if (!this.close && this.videoActive) {
this.executorService.submit(() -> { this.executorService.submit(() -> {
// TextureBufferImpl // TextureBufferImpl
// videoFrame.retain(); // videoFrame.retain();
@@ -281,7 +271,7 @@ public class RecordClient extends Client {
int outputIndex; int outputIndex;
this.videoCodec.start(); this.videoCodec.start();
this.videoActive = true; this.videoActive = true;
while (this.active) { while (!this.close) {
outputIndex = this.videoCodec.dequeueOutputBuffer(info, 1000L * 1000); outputIndex = this.videoCodec.dequeueOutputBuffer(info, 1000L * 1000);
if (outputIndex == MediaCodec.INFO_TRY_AGAIN_LATER) { if (outputIndex == MediaCodec.INFO_TRY_AGAIN_LATER) {
} else if (outputIndex == MediaCodec.INFO_OUTPUT_FORMAT_CHANGED) { } else if (outputIndex == MediaCodec.INFO_OUTPUT_FORMAT_CHANGED) {
@@ -293,7 +283,7 @@ public class RecordClient extends Client {
this.pts = System.currentTimeMillis(); this.pts = System.currentTimeMillis();
this.mediaMuxer.start(); this.mediaMuxer.start();
this.notifyAll(); this.notifyAll();
} else if (this.active) { } else if (!this.close) {
try { try {
this.wait(); this.wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
@@ -346,9 +336,14 @@ public class RecordClient extends Client {
} }
} }
private void stop() { @Override
public void close() {
synchronized (this) {
if(this.close) {
return;
}
super.close();
Log.i(RecordClient.class.getSimpleName(), "结束录制:" + this.filepath); Log.i(RecordClient.class.getSimpleName(), "结束录制:" + this.filepath);
this.active = false;
if (audioThread != null) { if (audioThread != null) {
this.audioThread.quitSafely(); this.audioThread.quitSafely();
} }
@@ -358,16 +353,8 @@ public class RecordClient extends Client {
if(this.executorService != null) { if(this.executorService != null) {
this.executorService.shutdown(); this.executorService.shutdown();
} }
synchronized (this) {
this.notifyAll(); this.notifyAll();
} this.mediaManager.closeClient();
}
@Override
public void close() {
synchronized (this) {
super.close();
this.stop();
} }
} }

View File

@@ -25,8 +25,8 @@ public class RemoteClient extends RoomClient {
*/ */
protected final Map<String, MediaStreamTrack> tracks; protected final Map<String, MediaStreamTrack> tracks;
public RemoteClient(String name, String clientId, Handler handler, ITaoyao taoyao) { public RemoteClient(String name, String clientId, ITaoyao taoyao, Handler handler) {
super(name, clientId, handler, taoyao); super(name, clientId, taoyao, handler);
this.tracks = new ConcurrentHashMap<>(); this.tracks = new ConcurrentHashMap<>();
} }

View File

@@ -30,14 +30,12 @@ import java.util.concurrent.ConcurrentHashMap;
* *
* @author acgist * @author acgist
*/ */
public class Room implements Closeable, RouterCallback { public class Room extends CloseableClient implements RouterCallback {
private final String name; private final String name;
private final String clientId; private final String clientId;
private final String roomId; private final String roomId;
private final String password; private final String password;
private final Handler handler;
private final ITaoyao taoyao;
private final boolean dataConsume; private final boolean dataConsume;
private final boolean audioConsume; private final boolean audioConsume;
private final boolean videoConsume; private final boolean videoConsume;
@@ -45,8 +43,6 @@ public class Room implements Closeable, RouterCallback {
private final boolean audioProduce; private final boolean audioProduce;
private final boolean videoProduce; private final boolean videoProduce;
private final long nativeRoomPointer; private final long nativeRoomPointer;
private final MediaManager mediaManager;
private volatile boolean enter;
private LocalClient localClient; private LocalClient localClient;
private Map<String, RemoteClient> remoteClients; private Map<String, RemoteClient> remoteClients;
private PeerConnection.RTCConfiguration rtcConfiguration; private PeerConnection.RTCConfiguration rtcConfiguration;
@@ -56,16 +52,15 @@ public class Room implements Closeable, RouterCallback {
public Room( public Room(
String name, String clientId, String name, String clientId,
String roomId, String password, String roomId, String password,
Handler handler, ITaoyao taoyao, ITaoyao taoyao, Handler handler,
boolean dataConsume, boolean audioConsume, boolean videoConsume, boolean dataConsume, boolean audioConsume, boolean videoConsume,
boolean dataProduce, boolean audioProduce, boolean videoProduce boolean dataProduce, boolean audioProduce, boolean videoProduce
) { ) {
super(taoyao, handler);
this.name = name; this.name = name;
this.clientId = clientId; this.clientId = clientId;
this.roomId = roomId; this.roomId = roomId;
this.password = password; this.password = password;
this.handler = handler;
this.taoyao = taoyao;
this.dataConsume = dataConsume; this.dataConsume = dataConsume;
this.audioConsume = audioConsume; this.audioConsume = audioConsume;
this.videoConsume = videoConsume; this.videoConsume = videoConsume;
@@ -73,21 +68,17 @@ public class Room implements Closeable, RouterCallback {
this.audioProduce = audioProduce; this.audioProduce = audioProduce;
this.videoProduce = videoProduce; this.videoProduce = videoProduce;
this.nativeRoomPointer = this.nativeNewRoom(roomId, this); this.nativeRoomPointer = this.nativeNewRoom(roomId, this);
this.mediaManager = MediaManager.getInstance();
this.remoteClients = new ConcurrentHashMap<>(); this.remoteClients = new ConcurrentHashMap<>();
this.enter = false;
} }
public synchronized void enter() { public boolean enter() {
if (this.enter) { synchronized (this) {
return; if (this.init) {
return true;
} }
final Message response = this.taoyao.request(this.taoyao.buildMessage("media::router::rtp::capabilities", "roomId", this.roomId)); super.init();
if (response == null) { this.peerConnectionFactory = this.mediaManager.newClient(MediaManager.Type.BACK);
Log.w(Room.class.getSimpleName(), "获取通道能力失败"); this.localClient = new LocalClient(this.name, this.clientId, this.taoyao, this.handler);
return;
}
this.localClient = new LocalClient(this.name, this.clientId, this.handler, this.taoyao);
this.localClient.setMediaStream(this.mediaManager.getMediaStream()); this.localClient.setMediaStream(this.mediaManager.getMediaStream());
// STUN | TURN // STUN | TURN
final List<PeerConnection.IceServer> iceServers = new ArrayList<>(); final List<PeerConnection.IceServer> iceServers = new ArrayList<>();
@@ -95,9 +86,15 @@ public class Room implements Closeable, RouterCallback {
final PeerConnection.IceServer iceServer = PeerConnection.IceServer.builder("stun:stun1.l.google.com:19302").createIceServer(); final PeerConnection.IceServer iceServer = PeerConnection.IceServer.builder("stun:stun1.l.google.com:19302").createIceServer();
iceServers.add(iceServer); iceServers.add(iceServer);
this.rtcConfiguration = new PeerConnection.RTCConfiguration(iceServers); this.rtcConfiguration = new PeerConnection.RTCConfiguration(iceServers);
this.peerConnectionFactory = this.mediaManager.newClient(MediaManager.Type.BACK); final Message response = this.taoyao.request(this.taoyao.buildMessage("media::router::rtp::capabilities", "roomId", this.roomId));
if(response == null) {
this.close();
return false;
}
final Object rtpCapabilities = MapUtils.get(response.body(), "rtpCapabilities"); final Object rtpCapabilities = MapUtils.get(response.body(), "rtpCapabilities");
this.nativeEnter(this.nativeRoomPointer, JSONUtils.toJSON(rtpCapabilities), this.peerConnectionFactory.getNativePeerConnectionFactory(), this.rtcConfiguration); this.nativeEnter(this.nativeRoomPointer, JSONUtils.toJSON(rtpCapabilities), this.peerConnectionFactory.getNativePeerConnectionFactory(), this.rtcConfiguration);
return true;
}
} }
public void mediaProduce() { public void mediaProduce() {
@@ -162,7 +159,7 @@ public class Room implements Closeable, RouterCallback {
final String clientId = MapUtils.get(body, "clientId"); final String clientId = MapUtils.get(body, "clientId");
final Map<String, Object> status = MapUtils.get(body, "status"); final Map<String, Object> status = MapUtils.get(body, "status");
final String name = MapUtils.get(status, "name"); final String name = MapUtils.get(status, "name");
final RemoteClient remoteClient = new RemoteClient(name, clientId, this.handler, this.taoyao); final RemoteClient remoteClient = new RemoteClient(name, clientId, this.taoyao, this.handler);
final RemoteClient old = this.remoteClients.put(clientId, remoteClient); final RemoteClient old = this.remoteClients.put(clientId, remoteClient);
if(old != null) { if(old != null) {
// 关闭旧的资源 // 关闭旧的资源
@@ -183,11 +180,18 @@ public class Room implements Closeable, RouterCallback {
@Override @Override
public void close() { public void close() {
synchronized (this) {
if(this.close) {
return;
}
Log.i(Room.class.getSimpleName(), "关闭房间:" + this.roomId); Log.i(Room.class.getSimpleName(), "关闭房间:" + this.roomId);
this.localClient.close(); super.close();
this.remoteClients.values().forEach(v -> this.closeRemoteClient(v.clientId));
this.mediaManager.closeClient();
this.nativeCloseRoom(this.nativeRoomPointer); this.nativeCloseRoom(this.nativeRoomPointer);
this.remoteClients.values().forEach(v -> this.closeRemoteClient(v.clientId));
this.remoteClients.clear();
this.localClient.close();
this.mediaManager.closeClient();
}
} }
public void mediaConsumerClose(String consumerId) { public void mediaConsumerClose(String consumerId) {
@@ -198,6 +202,10 @@ public class Room implements Closeable, RouterCallback {
)); ));
} }
public void mediaConsumerClose(Map<String, Object> body) {
}
public void mediaConsumerPause(String consumerId) { public void mediaConsumerPause(String consumerId) {
this.taoyao.push(this.taoyao.buildMessage( this.taoyao.push(this.taoyao.buildMessage(
"media::consumer::pause", "media::consumer::pause",
@@ -206,6 +214,10 @@ public class Room implements Closeable, RouterCallback {
)); ));
} }
public void mediaConsumerPause(Map<String, Object> body) {
}
public void mediaConsumerResume(String consumerId) { public void mediaConsumerResume(String consumerId) {
this.taoyao.push(this.taoyao.buildMessage( this.taoyao.push(this.taoyao.buildMessage(
"media::consumer::resume", "media::consumer::resume",
@@ -214,6 +226,10 @@ public class Room implements Closeable, RouterCallback {
)); ));
} }
public void mediaConsumerResume(Map<String, Object> body) {
}
public void mediaProducerClose(String producerId) { public void mediaProducerClose(String producerId) {
this.taoyao.push(this.taoyao.buildMessage( this.taoyao.push(this.taoyao.buildMessage(
"media::producer::close", "media::producer::close",
@@ -222,6 +238,10 @@ public class Room implements Closeable, RouterCallback {
)); ));
} }
public void mediaProducerClose(Map<String, Object> body) {
}
public void mediaProducerPause(String producerId) { public void mediaProducerPause(String producerId) {
this.taoyao.push(this.taoyao.buildMessage( this.taoyao.push(this.taoyao.buildMessage(
"media::producer::pause", "media::producer::pause",
@@ -230,6 +250,10 @@ public class Room implements Closeable, RouterCallback {
)); ));
} }
public void mediaProducerPause(Map<String, Object> body) {
}
public void mediaProducerResume(String producerId) { public void mediaProducerResume(String producerId) {
this.taoyao.push(this.taoyao.buildMessage( this.taoyao.push(this.taoyao.buildMessage(
"media::producer::resume", "media::producer::resume",
@@ -238,6 +262,10 @@ public class Room implements Closeable, RouterCallback {
)); ));
} }
public void mediaProducerResume(Map<String, Object> body) {
}
@Override @Override
public void enterCallback(String rtpCapabilities, String sctpCapabilities) { public void enterCallback(String rtpCapabilities, String sctpCapabilities) {
this.taoyao.request(this.taoyao.buildMessage( this.taoyao.request(this.taoyao.buildMessage(
@@ -247,7 +275,6 @@ public class Room implements Closeable, RouterCallback {
"rtpCapabilities", rtpCapabilities, "rtpCapabilities", rtpCapabilities,
"sctpCapabilities", sctpCapabilities "sctpCapabilities", sctpCapabilities
)); ));
this.enter = true;
} }
@Override @Override

View File

@@ -14,8 +14,8 @@ import org.webrtc.MediaStreamTrack;
*/ */
public class RoomClient extends Client { public class RoomClient extends Client {
public RoomClient(String name, String clientId, Handler handler, ITaoyao taoyao) { public RoomClient(String name, String clientId, ITaoyao taoyao, Handler handler) {
super(name, clientId, handler, taoyao); super(name, clientId, taoyao, handler);
} }
@Override @Override

View File

@@ -1,7 +1,6 @@
package com.acgist.taoyao.media.client; package com.acgist.taoyao.media.client;
import android.os.Handler; import android.os.Handler;
import android.se.omapi.Session;
import android.util.Log; import android.util.Log;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
@@ -23,7 +22,6 @@ import org.webrtc.VideoTrack;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
/** /**
* P2P终端 * P2P终端
@@ -64,12 +62,18 @@ public class SessionClient extends Client {
*/ */
private PeerConnectionFactory peerConnectionFactory; private PeerConnectionFactory peerConnectionFactory;
public SessionClient(String sessionId, String name, String clientId, Handler handler, ITaoyao taoyao) { public SessionClient(String sessionId, String name, String clientId, ITaoyao taoyao, Handler handler) {
super(name, clientId, handler, taoyao); super(name, clientId, taoyao, handler);
this.sessionId = sessionId; this.sessionId = sessionId;
} }
@Override
public void init() { public void init() {
synchronized (this) {
if(this.init) {
return;
}
super.init();
this.peerConnectionFactory = this.mediaManager.newClient(MediaManager.Type.BACK); this.peerConnectionFactory = this.mediaManager.newClient(MediaManager.Type.BACK);
// STUN | TURN // STUN | TURN
final List<PeerConnection.IceServer> iceServers = new ArrayList<>(); final List<PeerConnection.IceServer> iceServers = new ArrayList<>();
@@ -83,6 +87,7 @@ public class SessionClient extends Client {
this.peerConnection = this.peerConnectionFactory.createPeerConnection(configuration, this.observer); this.peerConnection = this.peerConnectionFactory.createPeerConnection(configuration, this.observer);
this.peerConnection.addStream(this.mediaStream); this.peerConnection.addStream(this.mediaStream);
} }
}
public void exchange(Message message, Map<String, Object> body) { public void exchange(Message message, Map<String, Object> body) {
final String type = MapUtils.get(body, "type"); final String type = MapUtils.get(body, "type");
@@ -153,6 +158,27 @@ public class SessionClient extends Client {
} }
} }
@Override
public void playAudio() {
super.playAudio();
if(this.remoteMediaStream == null) {
return;
}
this.remoteMediaStream.audioTracks.forEach(v -> v.setEnabled(true));
}
@Override
public void pauseAudio() {
super.pauseAudio();
this.remoteMediaStream.audioTracks.forEach(v -> v.setEnabled(false));
}
@Override
public void resumeAudio() {
super.resumeAudio();
this.remoteMediaStream.audioTracks.forEach(v -> v.setEnabled(true));
}
@Override @Override
public void playVideo() { public void playVideo() {
super.playVideo(); super.playVideo();
@@ -168,21 +194,41 @@ public class SessionClient extends Client {
} }
@Override @Override
public void playAudio() { public void pauseVideo() {
super.playAudio(); super.pauseVideo();
if(this.remoteMediaStream == null) { this.mediaStream.videoTracks.forEach(v -> v.setEnabled(false));
return;
} }
this.remoteMediaStream.audioTracks.forEach(v -> v.setEnabled(true));
@Override
public void resumeVideo() {
super.resumeVideo();
this.mediaStream.videoTracks.forEach(v -> v.setEnabled(true));
}
@Override
public void pause() {
super.pause();
this.pauseAudio();
this.pauseVideo();
}
@Override
public void resume() {
super.resume();
this.resumeAudio();
this.resumeVideo();
} }
@Override @Override
public void close() { public void close() {
synchronized (this) {
if(this.close) {
return;
}
super.close(); super.close();
// 本地资源释放不要直接关闭MediaStream共享资源
this.mediaManager.closeClient();
// 远程资源释放
this.remoteMediaStream.dispose(); this.remoteMediaStream.dispose();
this.mediaManager.closeClient();
}
} }
/** /**

View File

@@ -237,6 +237,8 @@ class Session {
name; name;
// 远程终端ID // 远程终端ID
clientId; clientId;
// 本地媒体流
localStream;
// 本地音频 // 本地音频
localAudioTrack; localAudioTrack;
// 本地视频 // 本地视频
@@ -258,6 +260,27 @@ class Session {
this.clientId = clientId; this.clientId = clientId;
} }
async pause() {
this.localAudioTrack.enabled = false;
this.localVideoTrack.enabled = false;
this.localStream.active = false;
}
async resume() {
this.localAudioTrack.enabled = true;
this.localVideoTrack.enabled = true;
this.localStream.active = true;
}
async close() {
this.localStream.active = false;
this.localAudioTrack.stop();
this.localVideoTrack.stop();
this.remoteAudioTrack.stop();
this.remoteVideoTrack.stop();
this.peerConnection.close();
}
} }
/** /**
@@ -605,6 +628,12 @@ class Taoyao extends RemoteClient {
case "session::exchange": case "session::exchange":
me.defaultSessionExchange(message); me.defaultSessionExchange(message);
break; break;
case "session::pause":
me.defaultSessionPause(message);
break;
case "session::resume":
me.defaultSessionResume(message);
break;
case "room::client::list": case "room::client::list":
me.defaultRoomClientList(message); me.defaultRoomClientList(message);
break; break;
@@ -2122,6 +2151,7 @@ class Taoyao extends RemoteClient {
this.sessionClients.set(sessionId, session); this.sessionClients.set(sessionId, session);
session.peerConnection = await me.buildPeerConnection(session, sessionId); session.peerConnection = await me.buildPeerConnection(session, sessionId);
const localStream = await me.getStream(); const localStream = await me.getStream();
session.localStream = localStream;
session.localAudioTrack = localStream.getAudioTracks()[0]; session.localAudioTrack = localStream.getAudioTracks()[0];
session.localVideoTrack = localStream.getVideoTracks()[0]; session.localVideoTrack = localStream.getVideoTracks()[0];
// 相同Stream音视频同步 // 相同Stream音视频同步
@@ -2139,10 +2169,24 @@ class Taoyao extends RemoteClient {
}); });
} }
async sessionClose() { async sessionClose(sessionId) {
const me = this;
me.push(protocol.buildMessage("session::close", {
sessionId
}));
} }
async defaultSessionClose(message) { async defaultSessionClose(message) {
const me = this;
const { sessionId } = message.body;
const session = me.sessionClients.get(sessionId);
if(session) {
console.debug("会话关闭", sessionId);
session.close();
me.sessionClients.delete(sessionId);
} else {
console.debug("关闭会话无效", sessionId);
}
} }
async defaultSessionExchange(message) { async defaultSessionExchange(message) {
@@ -2170,6 +2214,24 @@ class Taoyao extends RemoteClient {
} }
} }
async defaultSessionPause(message) {
const me = this;
const { sessionId } = message.body;
const session = me.sessionClients.get(sessionId);
if(session) {
session.pause();
}
}
async defaultSessionResume(message) {
const me = this;
const { sessionId } = message.body;
const session = me.sessionClients.get(sessionId);
if(session) {
session.resume();
}
}
async buildPeerConnection(session, sessionId) { async buildPeerConnection(session, sessionId) {
const me = this; const me = this;
const peerConnection = new RTCPeerConnection({"iceServers" : [{"url" : "stun:stun1.l.google.com:19302"}]}); const peerConnection = new RTCPeerConnection({"iceServers" : [{"url" : "stun:stun1.l.google.com:19302"}]});

View File

@@ -1,6 +1,6 @@
import fs from "node:fs"; import fs from "node:fs";
import { defineConfig } from "vite";
import vue from "@vitejs/plugin-vue"; import vue from "@vitejs/plugin-vue";
import { defineConfig } from "vite";
import { fileURLToPath, URL } from "node:url"; import { fileURLToPath, URL } from "node:url";
export default defineConfig({ export default defineConfig({

View File

@@ -27,11 +27,26 @@ public class Session implements Closeable {
* 接收者 * 接收者
*/ */
private final Client target; private final Client target;
/**
* P2P会话管理器
*/
private final SessionManager sessionManager;
public Session(String id, Client source, Client target) { public Session(String id, Client source, Client target, SessionManager sessionManager) {
this.id = id; this.id = id;
this.source = source; this.source = source;
this.target = target; this.target = target;
this.sessionManager = sessionManager;
}
/**
* 推送消息
*
* @param message 消息
*/
public void push(Message message) {
this.source.push(message);
this.target.push(message);
} }
/** /**
@@ -40,7 +55,7 @@ public class Session implements Closeable {
* @param clientId 当前终端ID * @param clientId 当前终端ID
* @param message 消息 * @param message 消息
*/ */
public void pushRemote(String clientId, Message message) { public void pushOther(String clientId, Message message) {
if(this.source.clientId().equals(clientId)) { if(this.source.clientId().equals(clientId)) {
this.target.push(message); this.target.push(message);
} else { } else {
@@ -50,7 +65,7 @@ public class Session implements Closeable {
@Override @Override
public void close() { public void close() {
this.sessionManager.remove(this.id);
} }

View File

@@ -30,7 +30,7 @@ public class SessionManager {
* @return 会话 * @return 会话
*/ */
public Session call(Client source, Client target) { public Session call(Client source, Client target) {
final Session session = new Session(this.idService.buildUuid(), source, target); final Session session = new Session(this.idService.buildUuid(), source, target, this);
this.sessions.put(session.getId(), session); this.sessions.put(session.getId(), session);
return session; return session;
} }
@@ -44,4 +44,13 @@ public class SessionManager {
return this.sessions.get(sessionId); return this.sessions.get(sessionId);
} }
/**
* @param sessionId 会话ID
*
* @return 会话
*/
public Session remove(String sessionId) {
return this.sessions.remove(sessionId);
}
} }

View File

@@ -45,7 +45,10 @@ public class RoomLeaveProtocol extends ProtocolRoomAdapter implements Applicatio
public void onApplicationEvent(RoomLeaveEvent event) { public void onApplicationEvent(RoomLeaveEvent event) {
final Room room = event.getRoom(); final Room room = event.getRoom();
final Client client = event.getClient(); final Client client = event.getClient();
final Map<String, String> body = Map.of(Constant.CLIENT_ID, client.clientId()); final Map<String, String> body = Map.of(
Constant.ROOM_ID, room.getRoomId(),
Constant.CLIENT_ID, client.clientId()
);
room.broadcast(client, this.build(body)); room.broadcast(client, this.build(body));
} }

View File

@@ -21,7 +21,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolSessionAdapter;
{ {
} }
""", """,
flow = "终端->信令服务->终端" flow = "终端->信令服务+)终端"
) )
public class SessionCloseProtocol extends ProtocolSessionAdapter { public class SessionCloseProtocol extends ProtocolSessionAdapter {
@@ -33,6 +33,8 @@ public class SessionCloseProtocol extends ProtocolSessionAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Session session, Client client, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Session session, Client client, Message message, Map<String, Object> body) {
session.push(message);
session.close();
} }
} }

View File

@@ -34,7 +34,7 @@ public class SessionExchangeProtocol extends ProtocolSessionAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Session session, Client client, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Session session, Client client, Message message, Map<String, Object> body) {
session.pushRemote(clientId, message); session.pushOther(clientId, message);
} }
} }

View File

@@ -35,7 +35,7 @@ public class SessionPauseProtocol extends ProtocolSessionAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Session session, Client client, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Session session, Client client, Message message, Map<String, Object> body) {
session.pushRemote(clientId, message); session.pushOther(clientId, message);
} }
} }

View File

@@ -35,7 +35,7 @@ public class SessionResumeProtocol extends ProtocolSessionAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Session session, Client client, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Session session, Client client, Message message, Map<String, Object> body) {
session.pushRemote(clientId, message); session.pushOther(clientId, message);
} }
} }