diff --git a/taoyao-client-web/src/components/Taoyao.js b/taoyao-client-web/src/components/Taoyao.js index fe19d8d..4e6aef7 100644 --- a/taoyao-client-web/src/components/Taoyao.js +++ b/taoyao-client-web/src/components/Taoyao.js @@ -10,6 +10,20 @@ import { defaultRTCPeerConnectionConfig, } from "./Config.js"; +// Used for simulcast webcam video. +const WEBCAM_SIMULCAST_ENCODINGS = +[ + { scaleResolutionDownBy: 4, maxBitrate: 500000, scalabilityMode: 'S1T2' }, + { scaleResolutionDownBy: 2, maxBitrate: 1000000, scalabilityMode: 'S1T2' }, + { scaleResolutionDownBy: 1, maxBitrate: 5000000, scalabilityMode: 'S1T2' } +]; + +// Used for VP9 webcam video. +const WEBCAM_KSVC_ENCODINGS = +[ + { scalabilityMode: 'S3T3_KEY' } +]; + /** * 信令通道 */ @@ -366,6 +380,13 @@ class Taoyao { consume; // 是否生产 produce; + // 视频来源:file | camera | screen + videoSource = "screen"; + // 强制使用VP9 + forceVP9; + // 强制使用H264 + forceH264; + useSimulcast; // 是否生产音频 audioProduce; // 是否生成视频 @@ -450,18 +471,17 @@ class Taoyao { } } // 错误回调 - const errorMessage = protocol.buildMessage("platform::error", { message }, -9999); + const errorMessage = protocol.buildMessage( + "platform::error", + { message }, + -9999 + ); errorMessage.code = "-9999"; errorMessage.message = message; - self.callback( - errorMessage, - error - ); + self.callback(errorMessage, error); } async roomList() { - const response = await this.request( - protocol.buildMessage("room::list") - ); + const response = await this.request(protocol.buildMessage("room::list")); return response.body; } async mediaList() { @@ -711,7 +731,11 @@ class Taoyao { const stream = await navigator.mediaDevices.getUserMedia({ audio: true, }); - track = stream.getAudioTracks()[0]; + const tracks = stream.getAudioTracks(); + if (tracks.length > 1) { + console.log("多个音频轨道"); + } + track = tracks[0]; this.audioProducer = await this.sendTransport.produce({ track, codecOptions: { @@ -797,7 +821,108 @@ class Taoyao { * 生产视频 * TODO:重复点击 */ - async produceVideo() {} + async produceVideo() { + console.debug("打开摄像头"); + const self = this; + if (self.videoProduce && self.mediasoupDevice.canProduce("video")) { + if (self.videoProducer) { + return; + } + let track; + try { + if (self.videoSource === "file") { + // TODO:实现文件分享 + // const stream = await this._getExternalVideoStream(); + // track = stream.getVideoTracks()[0].clone(); + } else if (self.videoSource === "camera") { + console.debug("enableWebcam() | calling getUserMedia()"); + // TODO:参数 + const stream = await navigator.mediaDevices.getUserMedia({ + video: true, + }); + track = stream.getVideoTracks()[0]; + } else if (self.videoSource === "screen") { + const stream = await navigator.mediaDevices.getDisplayMedia({ + audio: false, + video: { + displaySurface: "monitor", + logicalSurface: true, + cursor: true, + width: { max: 1920 }, + height: { max: 1080 }, + frameRate: { max: 30 }, + }, + }); + track = stream.getVideoTracks()[0]; + } else { + // TODO:异常 + } + let codec; + let encodings; + const codecOptions = { + videoGoogleStartBitrate: 1000, + }; + if (self.forceH264) { + codec = self.mediasoupDevice.rtpCapabilities.codecs.find( + (c) => c.mimeType.toLowerCase() === "video/h264" + ); + if (!codec) { + throw new Error( + "desired H264 codec+configuration is not supported" + ); + } + } else if (self.forceVP9) { + codec = self.mediasoupDevice.rtpCapabilities.codecs.find( + (c) => c.mimeType.toLowerCase() === "video/vp9" + ); + if (!codec) { + throw new Error("desired VP9 codec+configuration is not supported"); + } + } + if (this.useSimulcast) { + // If VP9 is the only available video codec then use SVC. + const firstVideoCodec = + this.mediasoupDevice.rtpCapabilities.codecs.find( + (c) => c.kind === "video" + ); + if ( + (this.forceVP9 && codec) || + firstVideoCodec.mimeType.toLowerCase() === "video/vp9" + ) { + encodings = WEBCAM_KSVC_ENCODINGS; + } else { + encodings = WEBCAM_SIMULCAST_ENCODINGS; + } + } + this.videoProducer = await this.sendTransport.produce({ + codec, + track, + encodings, + codecOptions, + }); + + // if (this._e2eKey && e2e.isSupported()) { + // e2e.setupSenderTransform(this.videoProducer.rtpSender); + // } + + this.videoProducer.on("transportclose", () => { + this.videoProducer = null; + }); + + this.videoProducer.on("trackended", () => { + console.warn("video producer trackended", this.audioProducer); + this.closeVideoProducer().catch(() => {}); + }); + } catch (error) { + self.callbackError("摄像头打开异常", error); + if (track) { + track.stop(); + } + } + } else { + console.error("打开摄像头失败"); + } + } /** * 消费媒体 @@ -866,18 +991,17 @@ class Taoyao { // ) // ); self.push(message); - console.log(consumer) + console.log(consumer); - - const audioElem = document.createElement('video'); - document.getElementsByTagName('body')[0].appendChild(audioElem) + const audioElem = document.createElement("video"); + document.getElementsByTagName("body")[0].appendChild(audioElem); const stream = new MediaStream(); - stream.addTrack(consumer.track); - audioElem.srcObject = stream; - audioElem.play() - .catch((error) => logger.warn('audioElem.play() failed:%o', error)); - + stream.addTrack(consumer.track); + audioElem.srcObject = stream; + audioElem + .play() + .catch((error) => console.warn("audioElem.play() failed:%o", error)); // If audio-only mode is enabled, pause it. if (consumer.kind === "video" && !self.videoProduce) { diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java index a65d158..1b84f70 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java @@ -223,5 +223,9 @@ public interface Constant { * 消费者 */ String PRODUCING = "producing"; + /** + * 订阅类型 + */ + String SUBSCRIBE_TYPE = "subscribeType"; } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaProduceEvent.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaProduceEvent.java index 6c7b719..02923b5 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaProduceEvent.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/event/MediaProduceEvent.java @@ -1,6 +1,5 @@ package com.acgist.taoyao.signal.event; -import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.flute.media.Producer; import com.acgist.taoyao.signal.flute.media.Room; @@ -18,12 +17,13 @@ public class MediaProduceEvent extends RoomEventAdapter { private static final long serialVersionUID = 1L; - private final Client client; + /** + * 生产者 + */ private final Producer producer; - public MediaProduceEvent(Room room, Client client, Producer producer) { + public MediaProduceEvent(Room room, Producer producer) { super(room); - this.client = client; this.producer = producer; } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/ClientWrapper.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/ClientWrapper.java index eda9d17..74a79f1 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/ClientWrapper.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/ClientWrapper.java @@ -10,7 +10,7 @@ import lombok.Getter; import lombok.Setter; /** - * Peer + * 终端包装器 * * @author acgist */ @@ -20,7 +20,6 @@ public class ClientWrapper { /** * 订阅类型 - * 如果需要订阅指定终端需要调用媒体消费信令 * * @author acgist */ @@ -35,6 +34,24 @@ public class ClientWrapper { // 没有订阅任何媒体 NONE; + public static final SubscribeType of(String value) { + for (SubscribeType type : SubscribeType.values()) { + if(type.name().equalsIgnoreCase(value)) { + return type; + } + } + return SubscribeType.ALL; + } + + public boolean consume(Producer producer) { + return switch (this) { + case NONE -> false; + case ALL_AUDIO -> producer.getKind() == Kind.AUDIO; + case ALL_VIDEO -> producer.getKind() == Kind.VIDEO; + default -> true; + }; + } + } /** @@ -53,6 +70,12 @@ public class ClientWrapper { * 终端标识 */ private final String clientId; + /** + * 订阅类型 + * 指定订阅类型终端注册或者生成媒体后会自动进行媒体推流拉流 + * 没有订阅任何媒体时需要用户自己对媒体进行消费控制 + */ + private SubscribeType subscribeType; private Object rtpCapabilities; private Object sctpCapabilities; /** @@ -98,4 +121,15 @@ public class ClientWrapper { .intValue(); } + /** + * 是否已经消费 + * + * @param producer + * @return + */ + public boolean consume(Producer producer) { + return this.producers.values().stream() + .anyMatch(v -> v.getConsumers().values().stream().anyMatch(c -> c.getProducer() == producer)); + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Consumer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Consumer.java index 627bbb4..99ba53d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Consumer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Consumer.java @@ -15,7 +15,7 @@ public class Consumer { /** * 消费者终端 */ - private final ClientWrapper client; + private final ClientWrapper consumeClient; /** * 生产者 */ @@ -33,8 +33,8 @@ public class Consumer { */ private final String consumerId; - public Consumer(ClientWrapper client, Producer producer, String kind, String streamId, String consumerId) { - this.client = client; + public Consumer(ClientWrapper consumeClient, Producer producer, String kind, String streamId, String consumerId) { + this.consumeClient = consumeClient; this.producer = producer; this.kind = Kind.of(kind); this.streamId = streamId; diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Producer.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Producer.java index f7108a3..7f82c2d 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Producer.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/flute/media/Producer.java @@ -18,7 +18,7 @@ public class Producer { /** * 生产者终端 */ - private final ClientWrapper client; + private final ClientWrapper produceClient; /** * 媒体类型 */ @@ -36,12 +36,12 @@ public class Producer { */ private final Map consumers; - public Producer(ClientWrapper client, String kind, String streamId, String producerId) { - this.client = client; + public Producer(ClientWrapper produceClient, String kind, String streamId, String producerId) { + this.produceClient = produceClient; this.kind = Kind.of(kind); this.streamId = streamId; this.producerId = producerId; this.consumers = new ConcurrentHashMap<>(); } - + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java index cf263c5..ea5cd41 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaConsumeProtocol.java @@ -47,12 +47,13 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica @Async @Override public void onApplicationEvent(MediaProduceEvent event) { - // TODO:根据类型进行消费 final Room room = event.getRoom(); - final Client client = event.getClient(); final Producer producer = event.getProducer(); - room.getClients().keySet().stream() - .filter(v -> v != client) + final ClientWrapper clientWrapper = producer.getProduceClient(); + final Client client = clientWrapper.getClient(); + room.getClients().values().stream() + .filter(v -> v.getClient() != client) + .filter(v -> v.getSubscribeType().consume(producer)) .forEach(v -> this.consume(room, v, producer)); } @@ -62,7 +63,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica final Producer producer = room.producer(producerId); if(clientType == ClientType.WEB || clientType == ClientType.CAMERA) { // 请求消费 - this.consume(room, client, producer); + this.consume(room, room.clientWrapper(client), producer); } else if(clientType == ClientType.MEDIA) { // 等待消费者准备完成 final String kind = MapUtils.get(body, Constant.KIND); @@ -79,8 +80,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica } final Client consumeClient = consumeClientWrapper.getClient(); consumers.put(consumerId, new Consumer(consumeClientWrapper, producer, kind, streamId, consumerId)); - final Message response = consumeClient.request(message); - client.push(response); + consumeClient.push(message); } else { // TODO:log } @@ -93,12 +93,15 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica * @param consumeClient * @param producer */ - private void consume(Room room, Client consumeClient, Producer producer) { + private void consume(Room room, ClientWrapper consumeClientWrapper, Producer producer) { + if(producer.getProduceClient().consume(producer)) { + log.debug("已经消费:{}", consumeClientWrapper.getClientId()); + return; + } final Client mediaClient = room.getMediaClient(); - final ClientWrapper consumeClientWrapper = room.clientWrapper(consumeClient); final Transport recvTransport = consumeClientWrapper.getRecvTransport(); final Map body = new HashMap<>(); - final String clientId = consumeClient.clientId(); + final String clientId = consumeClientWrapper.getClientId(); final String streamId = producer.getStreamId() + "->" + clientId; body.put(Constant.ROOM_ID, room.getRoomId()); body.put(Constant.CLIENT_ID, clientId); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java index aa12d52..d35832c 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaProduceProtocol.java @@ -71,7 +71,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter { // 全部不收:全部广播 room.broadcast(responseMessage); log.info("{}生产媒体:{} - {} - {}", clientId, kind, streamId, producerId); - this.publishEvent(new MediaProduceEvent(room, client, producer)); + this.publishEvent(new MediaProduceEvent(room, producer)); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java index 6a6a4d6..c60a5dd 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaTransportWebRtcCreateProtocol.java @@ -14,6 +14,7 @@ import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.boot.utils.NetUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; +import com.acgist.taoyao.signal.event.MediaProduceEvent; import com.acgist.taoyao.signal.flute.media.ClientWrapper; import com.acgist.taoyao.signal.flute.media.Room; import com.acgist.taoyao.signal.flute.media.Transport; @@ -74,6 +75,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { } // 拷贝属性 recvTransport.copy(responseBody); + this.produce(room, clientWrapper); } // 生产者 final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING); @@ -112,4 +114,17 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter { }); } + /** + * 生产数据 + * + * @param room + * @param clientWrapper + */ + private void produce(Room room, ClientWrapper clientWrapper) { + room.getClients().values().stream() + .filter(v -> v != clientWrapper) + .flatMap(v -> v.getProducers().values().stream()) + .forEach(producer -> this.publishEvent(new MediaProduceEvent(room, producer))); + } + } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java index e7686dc..86c9b90 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/room/RoomEnterProtocol.java @@ -12,6 +12,7 @@ import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.flute.media.ClientWrapper; +import com.acgist.taoyao.signal.flute.media.ClientWrapper.SubscribeType; import com.acgist.taoyao.signal.flute.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @@ -47,10 +48,11 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter { public RoomEnterProtocol() { super("进入房间信令", SIGNAL); } - + @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { final String password = MapUtils.get(body, Constant.PASSWORD); + final String subscribeType = MapUtils.get(body, Constant.SUBSCRIBE_TYPE); final Object rtpCapabilities = MapUtils.get(body, Constant.RTP_CAPABILITIES); final Object sctpCapabilities = MapUtils.get(body, Constant.SCTP_CAPABILITIES); final String roomPassowrd = room.getPassword(); @@ -59,6 +61,7 @@ public class RoomEnterProtocol extends ProtocolRoomAdapter { } // 进入房间 final ClientWrapper clientWrapper = room.enter(client); + clientWrapper.setSubscribeType(SubscribeType.of(subscribeType)); clientWrapper.setRtpCapabilities(rtpCapabilities); clientWrapper.setSctpCapabilities(sctpCapabilities); // 发送通知