[*] 视频分享

This commit is contained in:
acgist
2023-02-26 13:09:50 +08:00
parent e3ead16857
commit a0ebe8c842
10 changed files with 227 additions and 44 deletions

View File

@@ -1,6 +1,5 @@
package com.acgist.taoyao.signal.event;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.flute.media.Producer;
import com.acgist.taoyao.signal.flute.media.Room;
@@ -18,12 +17,13 @@ public class MediaProduceEvent extends RoomEventAdapter {
private static final long serialVersionUID = 1L;
private final Client client;
/**
* 生产者
*/
private final Producer producer;
public MediaProduceEvent(Room room, Client client, Producer producer) {
public MediaProduceEvent(Room room, Producer producer) {
super(room);
this.client = client;
this.producer = producer;
}

View File

@@ -10,7 +10,7 @@ import lombok.Getter;
import lombok.Setter;
/**
* Peer
* 终端包装器
*
* @author acgist
*/
@@ -20,7 +20,6 @@ public class ClientWrapper {
/**
* 订阅类型
* 如果需要订阅指定终端需要调用媒体消费信令
*
* @author acgist
*/
@@ -35,6 +34,24 @@ public class ClientWrapper {
// 没有订阅任何媒体
NONE;
public static final SubscribeType of(String value) {
for (SubscribeType type : SubscribeType.values()) {
if(type.name().equalsIgnoreCase(value)) {
return type;
}
}
return SubscribeType.ALL;
}
public boolean consume(Producer producer) {
return switch (this) {
case NONE -> false;
case ALL_AUDIO -> producer.getKind() == Kind.AUDIO;
case ALL_VIDEO -> producer.getKind() == Kind.VIDEO;
default -> true;
};
}
}
/**
@@ -53,6 +70,12 @@ public class ClientWrapper {
* 终端标识
*/
private final String clientId;
/**
* 订阅类型
* 指定订阅类型终端注册或者生成媒体后会自动进行媒体推流拉流
* 没有订阅任何媒体时需要用户自己对媒体进行消费控制
*/
private SubscribeType subscribeType;
private Object rtpCapabilities;
private Object sctpCapabilities;
/**
@@ -98,4 +121,15 @@ public class ClientWrapper {
.intValue();
}
/**
* 是否已经消费
*
* @param producer
* @return
*/
public boolean consume(Producer producer) {
return this.producers.values().stream()
.anyMatch(v -> v.getConsumers().values().stream().anyMatch(c -> c.getProducer() == producer));
}
}

View File

@@ -15,7 +15,7 @@ public class Consumer {
/**
* 消费者终端
*/
private final ClientWrapper client;
private final ClientWrapper consumeClient;
/**
* 生产者
*/
@@ -33,8 +33,8 @@ public class Consumer {
*/
private final String consumerId;
public Consumer(ClientWrapper client, Producer producer, String kind, String streamId, String consumerId) {
this.client = client;
public Consumer(ClientWrapper consumeClient, Producer producer, String kind, String streamId, String consumerId) {
this.consumeClient = consumeClient;
this.producer = producer;
this.kind = Kind.of(kind);
this.streamId = streamId;

View File

@@ -18,7 +18,7 @@ public class Producer {
/**
* 生产者终端
*/
private final ClientWrapper client;
private final ClientWrapper produceClient;
/**
* 媒体类型
*/
@@ -36,12 +36,12 @@ public class Producer {
*/
private final Map<String, Consumer> consumers;
public Producer(ClientWrapper client, String kind, String streamId, String producerId) {
this.client = client;
public Producer(ClientWrapper produceClient, String kind, String streamId, String producerId) {
this.produceClient = produceClient;
this.kind = Kind.of(kind);
this.streamId = streamId;
this.producerId = producerId;
this.consumers = new ConcurrentHashMap<>();
}
}

View File

@@ -47,12 +47,13 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
@Async
@Override
public void onApplicationEvent(MediaProduceEvent event) {
// TODO根据类型进行消费
final Room room = event.getRoom();
final Client client = event.getClient();
final Producer producer = event.getProducer();
room.getClients().keySet().stream()
.filter(v -> v != client)
final ClientWrapper clientWrapper = producer.getProduceClient();
final Client client = clientWrapper.getClient();
room.getClients().values().stream()
.filter(v -> v.getClient() != client)
.filter(v -> v.getSubscribeType().consume(producer))
.forEach(v -> this.consume(room, v, producer));
}
@@ -62,7 +63,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
final Producer producer = room.producer(producerId);
if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) {
// 请求消费
this.consume(room, client, producer);
this.consume(room, room.clientWrapper(client), producer);
} else if(clientType == ClientType.MEDIA) {
// 等待消费者准备完成
final String kind = MapUtils.get(body, Constant.KIND);
@@ -79,8 +80,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
}
final Client consumeClient = consumeClientWrapper.getClient();
consumers.put(consumerId, new Consumer(consumeClientWrapper, producer, kind, streamId, consumerId));
final Message response = consumeClient.request(message);
client.push(response);
consumeClient.push(message);
} else {
// TODOlog
}
@@ -93,12 +93,15 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
* @param consumeClient
* @param producer
*/
private void consume(Room room, Client consumeClient, Producer producer) {
private void consume(Room room, ClientWrapper consumeClientWrapper, Producer producer) {
if(producer.getProduceClient().consume(producer)) {
log.debug("已经消费:{}", consumeClientWrapper.getClientId());
return;
}
final Client mediaClient = room.getMediaClient();
final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClient);
final Transport recvTransport = consumeClientWrapper.getRecvTransport();
final Map<String, Object> body = new HashMap<>();
final String clientId = consumeClient.clientId();
final String clientId = consumeClientWrapper.getClientId();
final String streamId = producer.getStreamId() + "->" + clientId;
body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.CLIENT_ID, clientId);

View File

@@ -71,7 +71,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter {
// 全部不收:全部广播
room.broadcast(responseMessage);
log.info("{}生产媒体:{} - {} - {}", clientId, kind, streamId, producerId);
this.publishEvent(new MediaProduceEvent(room, client, producer));
this.publishEvent(new MediaProduceEvent(room, producer));
}
}

View File

@@ -14,6 +14,7 @@ import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.boot.utils.NetUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.MediaProduceEvent;
import com.acgist.taoyao.signal.flute.media.ClientWrapper;
import com.acgist.taoyao.signal.flute.media.Room;
import com.acgist.taoyao.signal.flute.media.Transport;
@@ -74,6 +75,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
}
// 拷贝属性
recvTransport.copy(responseBody);
this.produce(room, clientWrapper);
}
// 生产者
final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING);
@@ -112,4 +114,17 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
});
}
/**
* 生产数据
*
* @param room
* @param clientWrapper
*/
private void produce(Room room, ClientWrapper clientWrapper) {
room.getClients().values().stream()
.filter(v -> v != clientWrapper)
.flatMap(v -> v.getProducers().values().stream())
.forEach(producer -> this.publishEvent(new MediaProduceEvent(room, producer)));
}
}

View File

@@ -12,6 +12,7 @@ import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.flute.media.ClientWrapper;
import com.acgist.taoyao.signal.flute.media.ClientWrapper.SubscribeType;
import com.acgist.taoyao.signal.flute.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
@@ -47,10 +48,11 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter {
public RoomEnterProtocol() {
super("进入房间信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String password = MapUtils.get(body, Constant.PASSWORD);
final String subscribeType = MapUtils.get(body, Constant.SUBSCRIBE_TYPE);
final Object rtpCapabilities = MapUtils.get(body, Constant.RTP_CAPABILITIES);
final Object sctpCapabilities = MapUtils.get(body, Constant.SCTP_CAPABILITIES);
final String roomPassowrd = room.getPassword();
@@ -59,6 +61,7 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter {
}
// 进入房间
final ClientWrapper clientWrapper = room.enter(client);
clientWrapper.setSubscribeType(SubscribeType.of(subscribeType));
clientWrapper.setRtpCapabilities(rtpCapabilities);
clientWrapper.setSctpCapabilities(sctpCapabilities);
// 发送通知