This commit is contained in:
acgist
2023-02-25 16:00:19 +08:00
parent 7e693da12a
commit 523913c2d3
6 changed files with 91 additions and 25 deletions

View File

@@ -1,20 +1,44 @@
package com.acgist.taoyao.signal.flute.media; package com.acgist.taoyao.signal.flute.media;
import com.acgist.taoyao.signal.client.Client; import lombok.Getter;
import lombok.Setter;
/** /**
* 消费者 * 消费者
* *
* @author acgist * @author acgist
*/ */
@Getter
@Setter
public class Consumer { public class Consumer {
private Client client;
private Room room;
/** /**
* 终端ID * 消费者终端
* 来源终端ID->
*/ */
private String clientId; private final ClientWrapper client;
/**
* 生产者
*/
private final Producer producer;
/**
* 媒体类型
*/
private final Kind kind;
/**
* 媒体流ID
*/
private final String streamId;
/**
* 消费者标识
*/
private final String consumerId;
public Consumer(ClientWrapper client, Producer producer, String kind, String streamId, String consumerId) {
this.client = client;
this.producer = producer;
this.kind = Kind.of(kind);
this.streamId = streamId;
this.consumerId = consumerId;
}
} }

View File

@@ -1,8 +1,7 @@
package com.acgist.taoyao.signal.flute.media; package com.acgist.taoyao.signal.flute.media;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.acgist.taoyao.signal.client.Client;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@@ -17,9 +16,9 @@ import lombok.Setter;
public class Producer { public class Producer {
/** /**
* 终端 * 生产者终端
*/ */
private final Client client; private final ClientWrapper client;
/** /**
* 媒体类型 * 媒体类型
*/ */
@@ -29,19 +28,20 @@ public class Producer {
*/ */
private final String streamId; private final String streamId;
/** /**
* 消费者标识 * 生产者标识
*/ */
private final String producerId; private final String producerId;
/** /**
* 消费者 * 消费者
*/ */
private Map<String, Consumer> consumers; private final Map<String, Consumer> consumers;
public Producer(Client client, String kind, String streamId, String producerId) { public Producer(ClientWrapper client, String kind, String streamId, String producerId) {
this.client = client; this.client = client;
this.kind = Kind.of(kind); this.kind = Kind.of(kind);
this.streamId = streamId; this.streamId = streamId;
this.producerId = producerId; this.producerId = producerId;
this.consumers = new ConcurrentHashMap<>();
} }
} }

View File

@@ -156,10 +156,22 @@ public class Room implements Closeable {
* @param client * @param client
* @return * @return
*/ */
public ClientWrapper client(Client client) { public ClientWrapper clientWrapper(Client client) {
return this.clients.get(client); return this.clients.get(client);
} }
/**
*
* @param client
* @return
*/
public ClientWrapper clientWrapper(String clientId) {
return this.clients.values().stream()
.filter(v -> clientId.equals(v.getClientId()))
.findFirst()
.orElse(null);
}
/** /**
* *
* @param producerId * @param producerId

View File

@@ -15,16 +15,20 @@ import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.MediaProduceEvent; import com.acgist.taoyao.signal.event.MediaProduceEvent;
import com.acgist.taoyao.signal.flute.media.ClientWrapper; import com.acgist.taoyao.signal.flute.media.ClientWrapper;
import com.acgist.taoyao.signal.flute.media.Consumer;
import com.acgist.taoyao.signal.flute.media.Producer; import com.acgist.taoyao.signal.flute.media.Producer;
import com.acgist.taoyao.signal.flute.media.Room; import com.acgist.taoyao.signal.flute.media.Room;
import com.acgist.taoyao.signal.flute.media.Transport; import com.acgist.taoyao.signal.flute.media.Transport;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
import lombok.extern.slf4j.Slf4j;
/** /**
* 消费媒体信令 * 消费媒体信令
* *
* @author acgist * @author acgist
*/ */
@Slf4j
@Protocol @Protocol
@Description( @Description(
flow = { flow = {
@@ -54,29 +58,55 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String kind = MapUtils.get(body, Constant.KIND);
final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final String producerId = MapUtils.get(body, Constant.PRODUCER_ID);
final Producer producer = room.producer(producerId); final Producer producer = room.producer(producerId);
final String streamId = producer.getStreamId() + "->" + clientId;
if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) {
// 请求消费 // 请求消费
this.consume(room, client, producer);
} else if(clientType == ClientType.MEDIA) { } else if(clientType == ClientType.MEDIA) {
// 等待消费者准备完成 // 等待消费者准备完成
final String kind = MapUtils.get(body, Constant.KIND);
final String streamId = MapUtils.get(body, Constant.STREAM_ID);
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final String consumeClientId = MapUtils.get(body, Constant.CLIENT_ID);
final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClientId);
final Map<String, Consumer> consumers = producer.getConsumers();
final Consumer consumer = consumers.get(producerId);
if(consumer != null) {
log.warn("消费者已经存在:{}", consumerId);
// TODO关闭旧的
// consumer.close();
}
final Client consumeClient = consumeClientWrapper.getClient();
consumers.put(consumerId, new Consumer(consumeClientWrapper, producer, kind, streamId, consumerId));
final Message response = consumeClient.request(message);
client.push(response);
} else { } else {
// TODOlog
} }
} }
private void consume(Room room, Client client, Producer producer) { /**
* 消费媒体
*
* @param room
* @param consumeClient
* @param producer
*/
private void consume(Room room, Client consumeClient, Producer producer) {
final Client mediaClient = room.getMediaClient(); final Client mediaClient = room.getMediaClient();
final ClientWrapper clientWrapper = room.client(client); final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClient);
final Transport recvTransport = clientWrapper.getRecvTransport(); final Transport recvTransport = consumeClientWrapper.getRecvTransport();
final Map<String, Object> body = new HashMap<>(); final Map<String, Object> body = new HashMap<>();
final String clientId = consumeClient.clientId();
final String streamId = producer.getStreamId() + "->" + clientId;
body.put(Constant.ROOM_ID, room.getRoomId()); body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.CLIENT_ID, clientId);
body.put(Constant.STREAM_ID, streamId);
body.put(Constant.PRODUCER_ID, producer.getProducerId()); body.put(Constant.PRODUCER_ID, producer.getProducerId());
body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId());
body.put(Constant.RTP_CAPABILITIES, clientWrapper.getRtpCapabilities()); body.put(Constant.RTP_CAPABILITIES, consumeClientWrapper.getRtpCapabilities());
body.put(Constant.SCTP_CAPABILITIES, clientWrapper.getSctpCapabilities()); body.put(Constant.SCTP_CAPABILITIES, consumeClientWrapper.getSctpCapabilities());
mediaClient.push(this.build(body)); mediaClient.push(this.build(body));
} }

View File

@@ -55,9 +55,9 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter {
final Message response = room.request(message); final Message response = room.request(message);
final Map<String, Object> responseBody = response.mapBody(); final Map<String, Object> responseBody = response.mapBody();
final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID); final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID);
final ClientWrapper clientWrapper = room.client(client); final ClientWrapper clientWrapper = room.clientWrapper(client);
final Map<String, Producer> producers = clientWrapper.getProducers(); final Map<String, Producer> producers = clientWrapper.getProducers();
final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(client, kind, streamId, producerId)); final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(clientWrapper, kind, streamId, producerId));
final Message responseMessage = response.cloneWithoutBody(); final Message responseMessage = response.cloneWithoutBody();
responseMessage.setBody(Map.of( responseMessage.setBody(Map.of(
Constant.KIND, kind, Constant.KIND, kind,

View File

@@ -63,7 +63,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
// 重写地址 // 重写地址
this.rewriteIp(client.ip(), responseBody); this.rewriteIp(client.ip(), responseBody);
// 处理逻辑 // 处理逻辑
final ClientWrapper clientWrapper = room.client(client); final ClientWrapper clientWrapper = room.clientWrapper(client);
// 消费者 // 消费者
final Boolean consuming = MapUtils.getBoolean(body, Constant.CONSUMING); final Boolean consuming = MapUtils.getBoolean(body, Constant.CONSUMING);
if(Boolean.TRUE.equals(consuming)) { if(Boolean.TRUE.equals(consuming)) {