[+] 调整媒体关闭整体逻辑

This commit is contained in:
acgist
2023-03-07 08:13:18 +08:00
parent 01810fba20
commit f53fe0eb20
24 changed files with 419 additions and 126 deletions

View File

@@ -12,4 +12,3 @@
## 信令格式 ## 信令格式
[信令格式](https://localhost:8888/protocol/list) [信令格式](https://localhost:8888/protocol/list)

View File

@@ -10,6 +10,12 @@ import java.lang.annotation.Target;
/** /**
* 信令描述 * 信令描述
* *
* -[消息类型]> 异步请求 | 单播
* =[消息类型]> 同步请求
* -[消息类型]) 全员广播:对所有的终端广播信令(排除自己)
* +[消息类型]) 全员广播:对所有的终端广播信令(包含自己)
* ...:其他自定义的透传内容
*
* @author acgist * @author acgist
*/ */
@Target(ElementType.TYPE) @Target(ElementType.TYPE)
@@ -24,6 +30,9 @@ public @interface Description {
String[] body() default { "{}" }; String[] body() default { "{}" };
/** /**
* 同步:需要等待服务端数据时使用
* 异步:不用等待服务端数据时使用(服务端能主动通知类型消息都能使用异步)
*
* @return 数据流向 * @return 数据流向
*/ */
String[] flow() default { "终端->信令服务->终端" }; String[] flow() default { "终端->信令服务->终端" };

View File

@@ -26,6 +26,41 @@ public enum ClientType {
this.name = name; this.name = name;
} }
/**
* @return 是否是Web
*/
public boolean web() {
return this == WEB;
}
/**
* @return 是否是媒体服务
*/
public boolean media() {
return this == MEDIA;
}
/**
* @return 是否是摄像头
*/
public boolean camera() {
return this == CAMERA;
}
/**
* @return 是否是媒体终端
*/
public boolean mediaClient() {
return this == WEB || this == CAMERA;
}
/**
* @return 是否是媒体服务
*/
public boolean mediaServer() {
return this == MEDIA;
}
/** /**
* @param value 类型 * @param value 类型
* *

View File

@@ -1,12 +1,17 @@
package com.acgist.taoyao.signal.configuration; package com.acgist.taoyao.signal.configuration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner; import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import com.acgist.taoyao.boot.runner.OrderedCommandLineRunner; import com.acgist.taoyao.boot.runner.OrderedCommandLineRunner;
import com.acgist.taoyao.signal.event.EventPublisher;
import com.acgist.taoyao.signal.protocol.ProtocolManager; import com.acgist.taoyao.signal.protocol.ProtocolManager;
import jakarta.annotation.PostConstruct;
/** /**
* 信令自动配置 * 信令自动配置
* *
@@ -15,6 +20,14 @@ import com.acgist.taoyao.signal.protocol.ProtocolManager;
@AutoConfiguration @AutoConfiguration
public class SignalAutoConfiguration { public class SignalAutoConfiguration {
@Autowired
private ApplicationContext applicationContext;
@PostConstruct
public void init() {
EventPublisher.setApplicationContext(this.applicationContext);
}
@Bean @Bean
public CommandLineRunner signalCommandLineRunner(ProtocolManager protocolManager) { public CommandLineRunner signalCommandLineRunner(ProtocolManager protocolManager) {
return new OrderedCommandLineRunner() { return new OrderedCommandLineRunner() {

View File

@@ -0,0 +1,29 @@
package com.acgist.taoyao.signal.event;
import com.acgist.taoyao.signal.party.media.Room;
import lombok.Getter;
import lombok.Setter;
/**
* 关闭消费者事件
*
* @author acgist
*/
@Getter
@Setter
public class ConsumerCloseEvent extends RoomEventAdapter {
private static final long serialVersionUID = 1L;
/**
* 消费者ID
*/
private final String consumerId;
public ConsumerCloseEvent(String consumerId, Room room) {
super(room);
this.consumerId = consumerId;
}
}

View File

@@ -0,0 +1,34 @@
package com.acgist.taoyao.signal.event;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
/**
* 事件法布者
*
* @author acgist
*/
public class EventPublisher {
/**
* 上下文
*/
private static ApplicationContext applicationContext;
/**
* @param applicationContext 上下文
*/
public static final void setApplicationContext(ApplicationContext applicationContext) {
EventPublisher.applicationContext = applicationContext;
}
/**
* 发布事件
*
* @param event 事件
*/
public static final void publishEvent(ApplicationEvent event) {
EventPublisher.applicationContext.publishEvent(event);
}
}

View File

@@ -0,0 +1,29 @@
package com.acgist.taoyao.signal.event;
import com.acgist.taoyao.signal.party.media.Room;
import lombok.Getter;
import lombok.Setter;
/**
* 关闭生产者事件
*
* @author acgist
*/
@Getter
@Setter
public class ProducerCloseEvent extends RoomEventAdapter {
private static final long serialVersionUID = 1L;
/**
* 生产者ID
*/
private final String producerId;
public ProducerCloseEvent(String producerId, Room room) {
super(room);
this.producerId = producerId;
}
}

View File

@@ -10,7 +10,7 @@ import lombok.Getter;
import lombok.Setter; import lombok.Setter;
/** /**
* 终端包装器 * 终端包装器Peer
* *
* @author acgist * @author acgist
*/ */
@@ -43,7 +43,7 @@ public class ClientWrapper implements AutoCloseable {
return SubscribeType.ALL; return SubscribeType.ALL;
} }
public boolean consume(Producer producer) { public boolean canConsume(Producer producer) {
return switch (this) { return switch (this) {
case NONE -> false; case NONE -> false;
case ALL_AUDIO -> producer.getKind() == Kind.AUDIO; case ALL_AUDIO -> producer.getKind() == Kind.AUDIO;
@@ -94,7 +94,7 @@ public class ClientWrapper implements AutoCloseable {
private Transport recvTransport; private Transport recvTransport;
/** /**
* 生产者 * 生产者
* 生产者里面的消费者是其他终端消费当前终端的消费者 * 其他终端消费当前终端的消费者
*/ */
private final Map<String, Producer> producers; private final Map<String, Producer> producers;
/** /**
@@ -144,28 +144,19 @@ public class ClientWrapper implements AutoCloseable {
* *
* @return 是否已经消费 * @return 是否已经消费
*/ */
public boolean consume(Producer producer) { public boolean consumed(Producer producer) {
return this.consumers.values().stream() return this.consumers.values().stream()
.anyMatch(v -> v.getProducer() == producer); .anyMatch(v -> v.getProducer() == producer);
} }
/**
* 删除消费者
*
* @param wrapper 消费者终端保证期
*/
public void remove(ClientWrapper wrapper) {
this.consumers.entrySet().stream()
.filter(v -> v.getValue().getProducer().getProduceClient() == wrapper)
.map(Map.Entry::getKey)
.forEach(this.consumers::remove);
// TODO资源释放
this.producers.values().forEach(v -> v.remove(wrapper));
}
@Override @Override
public void close() throws Exception { public void close() throws Exception {
// TODO释放资源 // TODO释放资源:通道、消费者、生产者
this.consumers.forEach((k, v) -> v.close());
this.producers.forEach((k, v) -> v.close());
// TODO实现
this.recvTransport.close();
this.sendTransport.close();
} }
} }

View File

@@ -1,25 +1,28 @@
package com.acgist.taoyao.signal.party.media; package com.acgist.taoyao.signal.party.media;
import java.io.Closeable;
import com.acgist.taoyao.signal.event.ConsumerCloseEvent;
import com.acgist.taoyao.signal.event.EventPublisher;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/** /**
* 消费者 * 消费者
* *
* @author acgist * @author acgist
*/ */
@Slf4j
@Getter @Getter
@Setter @Setter
public class Consumer { public class Consumer implements Closeable {
/** /**
* 消费者终端 * 是否关闭
*/ */
private final ClientWrapper consumeClient; private volatile boolean close = false;
/**
* 生产者
*/
private final Producer producer;
/** /**
* 媒体类型 * 媒体类型
*/ */
@@ -32,13 +35,38 @@ public class Consumer {
* 消费者标识 * 消费者标识
*/ */
private final String consumerId; private final String consumerId;
/**
* 房间
*/
private final Room room;
/**
* 生产者
*/
private final Producer producer;
/**
* 消费者终端
*/
private final ClientWrapper consumerClient;
public Consumer(ClientWrapper consumeClient, Producer producer, String kind, String streamId, String consumerId) { public Consumer(String kind, String streamId, String consumerId, Room room, Producer producer, ClientWrapper consumerClient) {
this.consumeClient = consumeClient;
this.producer = producer;
this.kind = Kind.of(kind); this.kind = Kind.of(kind);
this.streamId = streamId; this.streamId = streamId;
this.consumerId = consumerId; this.consumerId = consumerId;
this.room = room;
this.producer = producer;
this.consumerClient = consumerClient;
}
@Override
public void close() {
if(this.close) {
return;
}
this.close = true;
log.info("关闭消费者:{}", this.consumerId);
this.getProducer().remove(this.consumerId);
this.consumerClient.getConsumers().remove(this.consumerId);
EventPublisher.publishEvent(new ConsumerCloseEvent(this.consumerId, this.room));
} }
} }

View File

@@ -1,24 +1,30 @@
package com.acgist.taoyao.signal.party.media; package com.acgist.taoyao.signal.party.media;
import java.io.Closeable;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import com.acgist.taoyao.signal.event.EventPublisher;
import com.acgist.taoyao.signal.event.ProducerCloseEvent;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/** /**
* 生产者 * 生产者
* *
* @author acgist * @author acgist
*/ */
@Slf4j
@Getter @Getter
@Setter @Setter
public class Producer { public class Producer implements Closeable {
/** /**
* 生产者终端 * 是否关闭
*/ */
private final ClientWrapper produceClient; private volatile boolean close = false;
/** /**
* 媒体类型 * 媒体类型
*/ */
@@ -31,30 +37,48 @@ public class Producer {
* 生产者标识 * 生产者标识
*/ */
private final String producerId; private final String producerId;
/**
* 房间
*/
private final Room room;
/**
* 生产者终端
*/
private final ClientWrapper producerClient;
/** /**
* 消费者 * 消费者
* 其他终端消费当前终端的消费者
*/ */
private final Map<String, Consumer> consumers; private final Map<String, Consumer> consumers;
public Producer(ClientWrapper produceClient, String kind, String streamId, String producerId) { public Producer(String kind, String streamId, String producerId, Room room, ClientWrapper produceClient) {
this.produceClient = produceClient;
this.kind = Kind.of(kind); this.kind = Kind.of(kind);
this.streamId = streamId; this.streamId = streamId;
this.producerId = producerId; this.producerId = producerId;
this.room = room;
this.producerClient = produceClient;
this.consumers = new ConcurrentHashMap<>(); this.consumers = new ConcurrentHashMap<>();
} }
/** /**
* 删除消费者 * 删除消费者
* *
* @param consumer 消费者 * @param consumerId 消费者ID
*/ */
public void remove(ClientWrapper consumer) { public void remove(String consumerId) {
this.consumers.entrySet().stream() this.consumers.remove(consumerId);
.filter(v -> v.getValue().getConsumeClient() == consumer) }
.map(Map.Entry::getKey)
.forEach(this.consumers::remove); @Override
// TODO资源释放 public void close() {
if(this.close) {
return;
}
this.close = true;
log.info("关闭生产者:{}", this.producerId);
this.consumers.forEach((k, v) -> v.close());
this.producerClient.getProducers().remove(this.producerId);
EventPublisher.publishEvent(new ProducerCloseEvent(this.producerId, this.room));
} }
} }

View File

@@ -6,11 +6,10 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.springframework.context.ApplicationContext;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientStatus; import com.acgist.taoyao.signal.client.ClientStatus;
import com.acgist.taoyao.signal.event.EventPublisher;
import com.acgist.taoyao.signal.event.RoomLeaveEvent; import com.acgist.taoyao.signal.event.RoomLeaveEvent;
import lombok.Getter; import lombok.Getter;
@@ -28,6 +27,10 @@ import lombok.extern.slf4j.Slf4j;
@Setter @Setter
public class Room implements Closeable { public class Room implements Closeable {
/**
* 是否关闭
*/
private volatile boolean close = false;
/** /**
* 房间标识 * 房间标识
*/ */
@@ -49,10 +52,6 @@ public class Room implements Closeable {
* 房间管理 * 房间管理
*/ */
private final RoomManager roomManager; private final RoomManager roomManager;
/**
* 系统上下文
*/
private final ApplicationContext applicationContext;
/** /**
* 终端 * 终端
*/ */
@@ -61,11 +60,10 @@ public class Room implements Closeable {
/** /**
* @param mediaClient 媒体服务 * @param mediaClient 媒体服务
*/ */
public Room(Client mediaClient, RoomManager roomManager, ApplicationContext applicationContext) { public Room(Client mediaClient, RoomManager roomManager) {
this.roomStatus = new RoomStatus(); this.roomStatus = new RoomStatus();
this.mediaClient = mediaClient; this.mediaClient = mediaClient;
this.roomManager = roomManager; this.roomManager = roomManager;
this.applicationContext = applicationContext;
this.clients = new ConcurrentHashMap<>(); this.clients = new ConcurrentHashMap<>();
} }
@@ -101,7 +99,6 @@ public class Room implements Closeable {
/** /**
* 终端离开 * 终端离开
* 立即释放所有资源
* *
* @param client 终端 * @param client 终端
*/ */
@@ -109,18 +106,13 @@ public class Room implements Closeable {
synchronized (this.clients) { synchronized (this.clients) {
final ClientWrapper wrapper = this.clients.remove(client); final ClientWrapper wrapper = this.clients.remove(client);
if(wrapper != null) { if(wrapper != null) {
this.roomStatus.setClientSize(this.roomStatus.getClientSize() - 1);
// 删除消费者
this.clients.values().stream()
.filter(v -> v != wrapper)
.forEach(v -> v.remove(wrapper));
// TODO删除数据消费者
this.applicationContext.publishEvent(new RoomLeaveEvent(this, client));
try { try {
wrapper.close(); wrapper.close();
} catch (Exception e) { } catch (Exception e) {
log.error("终端关闭异常", e); log.error("关闭终端代理异常:{}", wrapper.getClientId(), e);
} }
this.roomStatus.setClientSize(this.roomStatus.getClientSize() - 1);
EventPublisher.publishEvent(new RoomLeaveEvent(this, client));
} }
} }
} }
@@ -145,6 +137,17 @@ public class Room implements Closeable {
return this.mediaClient.request(message); return this.mediaClient.request(message);
} }
/**
* 广播消息
* 所有终端以及媒体服务
*
* @param message 消息
*/
public void broadcastAll(Message message) {
this.broadcast(message);
this.mediaClient.push(message);
}
/** /**
* 广播消息 * 广播消息
* *
@@ -200,9 +203,9 @@ public class Room implements Closeable {
} }
/** /**
* @param producerId 生产者ID
* *
* @param producerId * @return 生产者
* @return
*/ */
public Producer producer(String producerId) { public Producer producer(String producerId) {
return this.clients.values().stream() return this.clients.values().stream()
@@ -212,11 +215,28 @@ public class Room implements Closeable {
.orElse(null); .orElse(null);
} }
/**
* @param consumerId 消费者ID
*
* @return 消费者
*/
public Consumer consumer(String consumerId) {
return this.clients.values().stream()
.map(wrapper -> wrapper.getConsumers().get(consumerId))
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}
@Override @Override
public void close() { public void close() {
if(this.close) {
return;
}
this.close = true;
log.info("关闭房间:{}", this.roomId); log.info("关闭房间:{}", this.roomId);
// TODO关闭房间 // TODO关闭房间
// TODO:媒体服务 // TODO媒体服务:直接没提服务关闭所有资源(通道、生产者、消费者)
this.roomManager.remove(this); this.roomManager.remove(this);
} }

View File

@@ -5,8 +5,6 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.context.ApplicationContext;
import com.acgist.taoyao.boot.annotation.Manager; import com.acgist.taoyao.boot.annotation.Manager;
import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
@@ -28,17 +26,15 @@ public class RoomManager {
private final IdService idService; private final IdService idService;
private final ClientManager clientManager; private final ClientManager clientManager;
private final ApplicationContext applicationContext;
/** /**
* 房间列表 * 房间列表
*/ */
private final List<Room> rooms; private final List<Room> rooms;
public RoomManager(IdService idService, ClientManager clientManager, ApplicationContext applicationContext) { public RoomManager(IdService idService, ClientManager clientManager) {
this.idService = idService; this.idService = idService;
this.clientManager = clientManager; this.clientManager = clientManager;
this.applicationContext = applicationContext;
this.rooms = new CopyOnWriteArrayList<>(); this.rooms = new CopyOnWriteArrayList<>();
} }
@@ -121,7 +117,7 @@ public class RoomManager {
} }
final String roomId = this.idService.buildUuid(); final String roomId = this.idService.buildUuid();
// 房间 // 房间
final Room room = new Room(mediaClient, this, this.applicationContext); final Room room = new Room(mediaClient, this);
room.setRoomId(roomId); room.setRoomId(roomId);
room.setPassword(password); room.setPassword(password);
// 状态 // 状态

View File

@@ -39,9 +39,21 @@ public class Transport implements Closeable {
* 通道标识 * 通道标识
*/ */
private final String transportId; private final String transportId;
/**
* ICE协商
*/
private Object iceCandidates; private Object iceCandidates;
/**
* ICE参数
*/
private Object iceParameters; private Object iceParameters;
/**
* DTLS参数
*/
private Object dtlsParameters; private Object dtlsParameters;
/**
* SCTP参数
*/
private Object sctpParameters; private Object sctpParameters;
public Transport(String transportId, Room room, Client client) { public Transport(String transportId, Room room, Client client) {
@@ -66,6 +78,7 @@ public class Transport implements Closeable {
@Override @Override
public void close() { public void close() {
// TODO发送事件
} }
} }

View File

@@ -73,7 +73,7 @@ public class ClientConfigProtocol extends ProtocolClientAdapter implements Appli
// 日期时间 // 日期时间
config.put(Constant.DATETIME, DateUtils.format(LocalDateTime.now(), DateTimeStyle.YYYYMMDDHH24MMSS)); config.put(Constant.DATETIME, DateUtils.format(LocalDateTime.now(), DateTimeStyle.YYYYMMDDHH24MMSS));
// Web、摄像头媒体配置 // Web、摄像头媒体配置
if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { if(clientType.mediaClient()) {
config.put(Constant.MEDIA, this.mediaProperties); config.put(Constant.MEDIA, this.mediaProperties);
config.put(Constant.WEBRTC, this.webrtcProperties); config.put(Constant.WEBRTC, this.webrtcProperties);
} else { } else {

View File

@@ -96,7 +96,7 @@ public class ClientRegisterProtocol extends ProtocolClientAdapter {
// 终端上线事件 // 终端上线事件
this.publishEvent(new ClientOnlineEvent(client)); this.publishEvent(new ClientOnlineEvent(client));
// 媒体服务注册事件 // 媒体服务注册事件
if(clientType == ClientType.MEDIA) { if(clientType.mediaServer()) {
this.publishEvent(new MediaClientRegisterEvent(client)); this.publishEvent(new MediaClientRegisterEvent(client));
} }
} }

View File

@@ -41,7 +41,7 @@ public class MediaAudioVolumeProtocol extends ProtocolRoomAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType == ClientType.MEDIA) { if(clientType.mediaServer()) {
room.broadcast(message); room.broadcast(message);
} else { } else {
this.logNoAdapter(clientType); this.logNoAdapter(clientType);

View File

@@ -33,7 +33,7 @@ import lombok.extern.slf4j.Slf4j;
@Protocol @Protocol
@Description( @Description(
memo = """ memo = """
消费媒体:主动消费、终端生媒体、终端创建WebRTC消费通道 消费媒体:主动消费、终端生媒体、终端创建WebRTC消费通道
终端生产媒体当前房间所有终端根据订阅类型自动消费媒体 终端生产媒体当前房间所有终端根据订阅类型自动消费媒体
终端创建WebRTC消费通道根据订阅类型自动消费当前房间已有媒体 终端创建WebRTC消费通道根据订阅类型自动消费当前房间已有媒体
""", """,
@@ -58,19 +58,19 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
if(event.getProducer() != null) { if(event.getProducer() != null) {
// 生产媒体:其他终端消费 // 生产媒体:其他终端消费
final Producer producer = event.getProducer(); final Producer producer = event.getProducer();
final ClientWrapper produceClientWrapper = producer.getProduceClient(); final ClientWrapper produceClientWrapper = producer.getProducerClient();
room.getClients().values().stream() room.getClients().values().stream()
.filter(v -> v != produceClientWrapper) .filter(v -> v != produceClientWrapper)
.filter(v -> v.getSubscribeType().consume(producer)) .filter(v -> v.getSubscribeType().canConsume(producer))
.forEach(v -> this.consume(room, v, producer)); .forEach(v -> this.consume(room, v, producer, this.build()));
} else if(event.getClientWrapper() != null) { } else if(event.getClientWrapper() != null) {
// 创建WebRTC消费通道消费其他终端 // 创建WebRTC消费通道消费其他终端
final ClientWrapper consumeClientWrapper = event.getClientWrapper(); final ClientWrapper consumeClientWrapper = event.getClientWrapper();
room.getClients().values().stream() room.getClients().values().stream()
.filter(v -> v != consumeClientWrapper) .filter(v -> v != consumeClientWrapper)
.flatMap(v -> v.getProducers().values().stream()) .flatMap(v -> v.getProducers().values().stream())
.filter(v -> consumeClientWrapper.getSubscribeType().consume(v)) .filter(v -> consumeClientWrapper.getSubscribeType().canConsume(v))
.forEach(producer -> this.consume(room, consumeClientWrapper, producer)); .forEach(producer -> this.consume(room, consumeClientWrapper, producer, this.build()));
} else { } else {
throw MessageCodeException.of("消费媒体失败"); throw MessageCodeException.of("消费媒体失败");
} }
@@ -80,26 +80,25 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final String producerId = MapUtils.get(body, Constant.PRODUCER_ID);
final Producer producer = room.producer(producerId); final Producer producer = room.producer(producerId);
if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { if(clientType.mediaClient()) {
// 请求消费媒体 // 主动请求消费 || 消费通道准备就绪
this.consume(room, room.clientWrapper(client), producer); this.consume(room, room.clientWrapper(client), producer, message);
} else if(clientType == ClientType.MEDIA) { } else if(clientType.mediaServer()) {
// 通知终端准备:准备完成再次请求消费媒体结束媒体服务等待 // 媒体通道准备就绪
final String kind = MapUtils.get(body, Constant.KIND); final String kind = MapUtils.get(body, Constant.KIND);
final String streamId = MapUtils.get(body, Constant.STREAM_ID); final String streamId = MapUtils.get(body, Constant.STREAM_ID);
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final String consumeClientId = MapUtils.get(body, Constant.CLIENT_ID); final String consumerClientId = MapUtils.get(body, Constant.CLIENT_ID);
final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClientId); final ClientWrapper consumerClientWrapper = room.clientWrapper(consumerClientId);
final Map<String, Consumer> consumers = consumeClientWrapper.getConsumers(); final Map<String, Consumer> consumers = consumerClientWrapper.getConsumers();
final Map<String, Consumer> producerConsumers = producer.getConsumers(); final Map<String, Consumer> producerConsumers = producer.getConsumers();
final Consumer consumer = new Consumer(consumeClientWrapper, producer, kind, streamId, consumerId); final Consumer consumer = new Consumer(kind, streamId, consumerId, room, producer, consumerClientWrapper);
final Consumer oldConsumer = consumers.put(producerId, consumer); final Consumer oldConsumer = consumers.put(producerId, consumer);
final Consumer oldProducerConsumer = producerConsumers.put(consumerId, consumer); final Consumer oldProducerConsumer = producerConsumers.put(consumerId, consumer);
if(oldConsumer != null || oldProducerConsumer != null) { if(oldConsumer != null || oldProducerConsumer != null) {
log.warn("消费者已经存在:{}", consumerId); log.warn("消费者已经存在:{}", consumerId);
// TODO关闭旧的
} }
final Client consumeClient = consumeClientWrapper.getClient(); final Client consumeClient = consumerClientWrapper.getClient();
consumeClient.push(message); consumeClient.push(message);
} else { } else {
this.logNoAdapter(clientType); this.logNoAdapter(clientType);
@@ -110,26 +109,27 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
* 消费媒体 * 消费媒体
* *
* @param room 房间 * @param room 房间
* @param consumeClientWrapper 消费者终端包装器 * @param consumerClientWrapper 消费者终端包装器
* @param producer 生产者 * @param producer 生产者
* @param message 消息
*/ */
private void consume(Room room, ClientWrapper consumeClientWrapper, Producer producer) { private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) {
if(consumeClientWrapper.consume(producer)) {
// TODO回调媒体服务准备完成
if(log.isDebugEnabled()) {
log.debug("已经消费媒体:{} - {}", consumeClientWrapper.getClientId(), producer.getStreamId());
}
return;
} else {
if(log.isDebugEnabled()) {
log.debug("消费媒体:{} - {}", consumeClientWrapper.getClientId(), producer.getStreamId());
}
}
final String clientId = consumeClientWrapper.getClientId();
final String streamId = producer.getStreamId() + "->" + clientId;
final Client mediaClient = room.getMediaClient(); final Client mediaClient = room.getMediaClient();
final Transport recvTransport = consumeClientWrapper.getRecvTransport(); if(consumerClientWrapper.consumed(producer)) {
final ClientWrapper produceClientWrapper = producer.getProduceClient(); // 消费通道准备就绪
if(log.isDebugEnabled()) {
log.debug("消费通道准备就绪:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId());
}
mediaClient.push(message);
} else {
// 主动消费媒体
if(log.isDebugEnabled()) {
log.debug("消费媒体:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId());
}
final String clientId = consumerClientWrapper.getClientId();
final String streamId = producer.getStreamId() + "->" + clientId;
final Transport recvTransport = consumerClientWrapper.getRecvTransport();
final ClientWrapper produceClientWrapper = producer.getProducerClient();
final Map<String, Object> body = new HashMap<>(); final Map<String, Object> body = new HashMap<>();
body.put(Constant.ROOM_ID, room.getRoomId()); body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.CLIENT_ID, clientId); body.put(Constant.CLIENT_ID, clientId);
@@ -137,9 +137,11 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
body.put(Constant.STREAM_ID, streamId); body.put(Constant.STREAM_ID, streamId);
body.put(Constant.PRODUCER_ID, producer.getProducerId()); body.put(Constant.PRODUCER_ID, producer.getProducerId());
body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId());
body.put(Constant.RTP_CAPABILITIES, consumeClientWrapper.getRtpCapabilities()); body.put(Constant.RTP_CAPABILITIES, consumerClientWrapper.getRtpCapabilities());
body.put(Constant.SCTP_CAPABILITIES, consumeClientWrapper.getSctpCapabilities()); body.put(Constant.SCTP_CAPABILITIES, consumerClientWrapper.getSctpCapabilities());
mediaClient.push(this.build(body)); message.setBody(body);
mediaClient.push(message);
}
} }
} }

View File

@@ -1,5 +1,75 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaConsumerCloseProtocol { import java.util.Map;
import org.springframework.context.ApplicationListener;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.ConsumerCloseEvent;
import com.acgist.taoyao.signal.party.media.Consumer;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 关闭消费者信令
*
* @author acgist
*/
@Slf4j
@Protocol
@Description(
body = """
{
"consumerId": "消费者ID"
}
""",
flow = "终端->信令服务+)终端"
)
public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<ConsumerCloseEvent> {
public static final String SIGNAL = "media::consumer::close";
public MediaConsumerCloseProtocol() {
super("关闭消费者信令", SIGNAL);
}
@Override
public void onApplicationEvent(ConsumerCloseEvent event) {
final Room room = event.getRoom();
final Map<String, Object> body = Map.of(
Constant.ROOM_ID, room.getRoomId(),
Constant.CONSUMER_ID, event.getConsumerId()
);
this.close(room, this.build(body));
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final Consumer consumer = room.consumer(consumerId);
if(consumer == null) {
log.warn("关闭消费者无效:{}", consumerId);
} else {
consumer.close();
}
}
/**
* 关闭消费者
*
* @param room 房间
* @param message 消息
*/
private void close(Room room, Message message) {
room.broadcastAll(message);
}
} }

View File

@@ -57,7 +57,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter {
final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID); final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID);
final ClientWrapper clientWrapper = room.clientWrapper(client); final ClientWrapper clientWrapper = room.clientWrapper(client);
final Map<String, Producer> producers = clientWrapper.getProducers(); final Map<String, Producer> producers = clientWrapper.getProducers();
final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(clientWrapper, kind, streamId, producerId)); final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(kind, streamId, producerId, room, clientWrapper));
final Message responseMessage = response.cloneWithoutBody(); final Message responseMessage = response.cloneWithoutBody();
responseMessage.setBody(Map.of( responseMessage.setBody(Map.of(
Constant.KIND, kind, Constant.KIND, kind,

View File

@@ -42,7 +42,7 @@ public class MediaRouterRtpCapabilitiesProtocol extends ProtocolRoomAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { if(clientType.mediaClient()) {
client.push(room.request(message)); client.push(room.request(message));
} else { } else {
this.logNoAdapter(clientType); this.logNoAdapter(clientType);

View File

@@ -38,7 +38,7 @@ public class MediaTransportWebRtcConnectProtocol extends ProtocolRoomAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { if(clientType.mediaClient()) {
final Message response = room.request(message); final Message response = room.request(message);
final Map<String, Object> responseBody = response.body(); final Map<String, Object> responseBody = response.body();
client.push(response); client.push(response);

View File

@@ -23,7 +23,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
"roomId": "房间ID" "roomId": "房间ID"
} }
""", """,
flow = "终端->信令服务->媒体服务->信令服务+)终端" flow = "终端->信令服务+)终端"
) )
public class RoomCloseProtocol extends ProtocolRoomAdapter { public class RoomCloseProtocol extends ProtocolRoomAdapter {
@@ -35,9 +35,10 @@ public class RoomCloseProtocol extends ProtocolRoomAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType == ClientType.WEB) { // TODO改为星型
if(clientType.web()) {
mediaClient.push(this.build(Map.of(Constant.ROOM_ID, room.getRoomId()))); mediaClient.push(this.build(Map.of(Constant.ROOM_ID, room.getRoomId())));
} else if(clientType == ClientType.MEDIA) { } else if(clientType.mediaServer()) {
room.close(); room.close();
room.broadcast(message); room.broadcast(message);
} else { } else {

View File

@@ -57,7 +57,7 @@ public class RoomCreateProtocol extends ProtocolClientAdapter implements Applica
@Override @Override
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
if(clientType == ClientType.WEB) { if(clientType.web()) {
// WEB同步创建房间 // WEB同步创建房间
final Room room = this.roomManager.create( final Room room = this.roomManager.create(
MapUtils.get(body, Constant.NAME), MapUtils.get(body, Constant.NAME),

View File

@@ -52,7 +52,7 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { if(clientType.mediaClient()) {
this.enter(clientId, room, client, message, body); this.enter(clientId, room, client, message, body);
} else { } else {
this.logNoAdapter(clientType); this.logNoAdapter(clientType);