diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Consumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Consumer.java index 0982ca6..627bbb4 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Consumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Consumer.java @@ -1,20 +1,44 @@ package com.acgist.taoyao.signal.flute.media; -import com.acgist.taoyao.signal.client.Client; +import lombok.Getter; +import lombok.Setter; /** * 消费者 * * @author acgist */ +@Getter +@Setter public class Consumer { - private Client client; - private Room room; /** - * 终端ID - * 来源终端ID-> + * 消费者终端 */ - private String clientId; + private final ClientWrapper client; + /** + * 生产者 + */ + private final Producer producer; + /** + * 媒体类型 + */ + private final Kind kind; + /** + * 媒体流ID + */ + private final String streamId; + /** + * 消费者标识 + */ + private final String consumerId; + + public Consumer(ClientWrapper client, Producer producer, String kind, String streamId, String consumerId) { + this.client = client; + this.producer = producer; + this.kind = Kind.of(kind); + this.streamId = streamId; + this.consumerId = consumerId; + } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Producer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Producer.java index 1dadc71..f7108a3 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Producer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Producer.java @@ -1,8 +1,7 @@ package com.acgist.taoyao.signal.flute.media; import java.util.Map; - -import com.acgist.taoyao.signal.client.Client; +import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; import lombok.Setter; @@ -17,9 +16,9 @@ import lombok.Setter; public class Producer { /** - * 终端 + * 生产者终端 */ - private final Client client; + private final ClientWrapper client; /** * 媒体类型 */ @@ -29,19 +28,20 @@ public class Producer { */ private final String streamId; /** - * 消费者标识 + * 生产者标识 */ private final String producerId; /** * 消费者 */ - private Map consumers; + private final Map consumers; - public Producer(Client client, String kind, String streamId, String producerId) { + public Producer(ClientWrapper client, String kind, String streamId, String producerId) { this.client = client; this.kind = Kind.of(kind); this.streamId = streamId; this.producerId = producerId; + this.consumers = new ConcurrentHashMap<>(); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Room.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Room.java index 9714137..6d6ebc5 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Room.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Room.java @@ -156,10 +156,22 @@ public class Room implements Closeable { * @param client * @return */ - public ClientWrapper client(Client client) { + public ClientWrapper clientWrapper(Client client) { return this.clients.get(client); } + /** + * + * @param client + * @return + */ + public ClientWrapper clientWrapper(String clientId) { + return this.clients.values().stream() + .filter(v -> clientId.equals(v.getClientId())) + .findFirst() + .orElse(null); + } + /** * * @param producerId diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index 1263a34..cf263c5 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -15,16 +15,20 @@ import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.event.MediaProduceEvent; import com.acgist.taoyao.signal.flute.media.ClientWrapper; +import com.acgist.taoyao.signal.flute.media.Consumer; import com.acgist.taoyao.signal.flute.media.Producer; import com.acgist.taoyao.signal.flute.media.Room; import com.acgist.taoyao.signal.flute.media.Transport; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; +import lombok.extern.slf4j.Slf4j; + /** * 消费媒体信令 * * @author acgist */ +@Slf4j @Protocol @Description( flow = { @@ -54,29 +58,55 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - final String kind = MapUtils.get(body, Constant.KIND); final String producerId = MapUtils.get(body, Constant.PRODUCER_ID); final Producer producer = room.producer(producerId); - final String streamId = producer.getStreamId() + "->" + clientId; if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { // 请求消费 + this.consume(room, client, producer); } else if(clientType == ClientType.MEDIA) { // 等待消费者准备完成 - + final String kind = MapUtils.get(body, Constant.KIND); + final String streamId = MapUtils.get(body, Constant.STREAM_ID); + final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID); + final String consumeClientId = MapUtils.get(body, Constant.CLIENT_ID); + final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClientId); + final Map consumers = producer.getConsumers(); + final Consumer consumer = consumers.get(producerId); + if(consumer != null) { + log.warn("消费者已经存在:{}", consumerId); + // TODO:关闭旧的 +// consumer.close(); + } + final Client consumeClient = consumeClientWrapper.getClient(); + consumers.put(consumerId, new Consumer(consumeClientWrapper, producer, kind, streamId, consumerId)); + final Message response = consumeClient.request(message); + client.push(response); } else { + // TODO:log } } - private void consume(Room room, Client client, Producer producer) { + /** + * 消费媒体 + * + * @param room + * @param consumeClient + * @param producer + */ + private void consume(Room room, Client consumeClient, Producer producer) { final Client mediaClient = room.getMediaClient(); - final ClientWrapper clientWrapper = room.client(client); - final Transport recvTransport = clientWrapper.getRecvTransport(); + final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClient); + final Transport recvTransport = consumeClientWrapper.getRecvTransport(); final Map body = new HashMap<>(); + final String clientId = consumeClient.clientId(); + final String streamId = producer.getStreamId() + "->" + clientId; body.put(Constant.ROOM_ID, room.getRoomId()); + body.put(Constant.CLIENT_ID, clientId); + body.put(Constant.STREAM_ID, streamId); body.put(Constant.PRODUCER_ID, producer.getProducerId()); body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId()); - body.put(Constant.RTP_CAPABILITIES, clientWrapper.getRtpCapabilities()); - body.put(Constant.SCTP_CAPABILITIES, clientWrapper.getSctpCapabilities()); + body.put(Constant.RTP_CAPABILITIES, consumeClientWrapper.getRtpCapabilities()); + body.put(Constant.SCTP_CAPABILITIES, consumeClientWrapper.getSctpCapabilities()); mediaClient.push(this.build(body)); } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java index d32a964..aa12d52 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java @@ -55,9 +55,9 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter { final Message response = room.request(message); final Map responseBody = response.mapBody(); final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID); - final ClientWrapper clientWrapper = room.client(client); + final ClientWrapper clientWrapper = room.clientWrapper(client); final Map producers = clientWrapper.getProducers(); - final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(client, kind, streamId, producerId)); + final Producer producer = producers.computeIfAbsent(producerId, key -> new Producer(clientWrapper, kind, streamId, producerId)); final Message responseMessage = response.cloneWithoutBody(); responseMessage.setBody(Map.of( Constant.KIND, kind, diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java index c6c8817..6a6a4d6 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java @@ -63,7 +63,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { // 重写地址 this.rewriteIp(client.ip(), responseBody); // 处理逻辑 - final ClientWrapper clientWrapper = room.client(client); + final ClientWrapper clientWrapper = room.clientWrapper(client); // 消费者 final Boolean consuming = MapUtils.getBoolean(body, Constant.CONSUMING); if(Boolean.TRUE.equals(consuming)) {