diff --git a/README.md b/README.md index 2846ea1..6718330 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,10 @@ |混音|支持|完成|多路混音| |水印|支持|完成|视频水印| -> 注意:Web终端不支持同时进入多个视频房间,安卓终端支持同时进入多个视频房间。 +### 注意事项 + +* Web终端不支持同时进入多个视频房间,安卓终端支持同时进入多个视频房间。 +* 服务端录制只支持视频房间(会议)模式,视频会话(监控)模式不支持服务器录制。 ## Docker diff --git a/taoyao-client-media/src/Taoyao.js b/taoyao-client-media/src/Taoyao.js index 45e4d32..7b86045 100644 --- a/taoyao-client-media/src/Taoyao.js +++ b/taoyao-client-media/src/Taoyao.js @@ -778,25 +778,39 @@ class Taoyao { */ async mediaRecord(message, body) { const me = this; - const { roomId, rtcpMux, comedia, clientId, host, audioPort, videoProt, rtpCapabilities, audioProducerId, audioStreamId, videoProducerId, videoStreamId } = body; + const { enabled, roomId } = body; + const room = this.rooms.get(roomId); + if(enabled) { + await me.mediaRecordStart(message, body, room); + } else { + await me.mediaRecordStop(message, body, room); + } + } + + async mediaRecordStart(message, body, room) { + const { roomId, clientId, host, audioPort, videoPort, rtpCapabilities, audioStreamId, videoStreamId, audioProducerId, videoProducerId } = body; const plainTransportOptions = { ...config.mediasoup.plainTransportOptions, - rtcpMux: rtcpMux, - comedia: comedia + rtcpMux : true, + comedia : true }; - const room = this.rooms.get(roomId); - const transport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); - me.transportEvent("plain", roomId, transport); - transport.clientId = clientId; - room.transports.set(transport.id, transport); - let audioConsumerId; let videoConsumerId; - await transport.connect({ - ip: '127.0.0.1', - port: remoteRtpPort, - rtcpPort: remoteRtcpPort - }); + let audioConsumerId; + let audioTransportId; + let videoTransportId; if(audioProducerId) { + const audioTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); + me.transportEvent("plain", roomId, audioTransport); + audioTransport.clientId = clientId; + room.transports.set(audioTransport.id, audioTransport); + audioTransport.observer.on("close", () => { + room.transports.delete(audioTransport.id) + }); + await audioTransport.connect({ + ip : host, + port : audioPort, + rtcpPort : audioPort + }); const audioConsumer = await transport.consume({ producerId: audioProducerId, rtpCapabilities, @@ -805,10 +819,25 @@ class Taoyao { audioConsumerId = audioConsumer.id; await audioConsumer.resume(); audioConsumer.clientId = clientId; - audioConsumer.streamId = videoStreamId; + audioConsumer.streamId = audioStreamId; room.consumers.set(audioConsumer.id, audioConsumer); + audioConsumer.observer.on("close", () => { + room.consumers.delete(audioConsumer.id); + }); } if(videoProducerId) { + const videoTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); + me.transportEvent("plain", roomId, videoTransport); + videoTransport.clientId = clientId; + room.transports.set(videoTransport.id, videoTransport); + videoTransport.observer.on("close", () => { + room.transports.delete(videoTransport.id) + }); + await videoTransport.connect({ + ip : host, + port : videoPort, + rtcpPort : videoPort + }); const videoConsumer = await transport.consume({ producerId: videoProducerId, rtpCapabilities, @@ -819,21 +848,44 @@ class Taoyao { videoConsumer.clientId = clientId; videoConsumer.streamId = videoStreamId; room.consumers.set(videoConsumer.id, videoConsumer); + videoConsumer.observer.on("close", () => { + room.consumers.delete(videoConsumer.id); + }); } - console.info(transport.tuple) - console.info(transport.rtcpTuple) message.body = { - ip : transport.tuple.localIp, - port : transport.tuple.localPort, - roomId : roomId, - rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined, - transportId : transport.id, - audioConsumerId : audioConsumerId, - videoConsumerId : videoConsumerId, + roomId : roomId, + audioConsumerId : audioConsumerId, + videoConsumerId : videoConsumerId, + audioTransportId : audioTransportId, + videoTransportId : videoTransportId, }; me.push(message); } + async mediaRecordStart(message, body, room) { + const { audioStreamId, videoStreamId, audioConsumerId, videoConsumerId, audioTransportId, videoTransportId } = body; + const audioConsumer = room.consumers.get(audioConsumerId); + if(audioConsumer) { + audioConsumer.close(); + room.consumers.delete(audioConsumerId); + } + const videoConsumer = room.consumers.get(videoConsumerId); + if(videoConsumer) { + videoConsumer.close(); + room.consumers.delete(videoConsumerId); + } + const audioTransport = room.transports.get(audioTransportId); + if(audioTransport) { + audioTransport.close(); + room.transports.delete(audioTransportId); + } + const videoTransport = room.transports.get(videoTransportId); + if(videoTransport) { + videoTransport.close(); + room.transports.delete(videoTransportId); + } + } + async mediaConsume(message, body) { const { roomId, @@ -1363,20 +1415,18 @@ class Taoyao { const { roomId, rtcpMux, comedia, clientId, enableSctp, numSctpStreams, enableSrtp, srtpCryptoSuite } = body; const plainTransportOptions = { ...config.mediasoup.plainTransportOptions, - rtcpMux: rtcpMux, - comedia: comedia, - enableSctp: enableSctp || Boolean(numSctpStreams), - numSctpStreams: numSctpStreams || 0, - enableSrtp: enableSrtp, - srtpCryptoSuite: srtpCryptoSuite, + rtcpMux : rtcpMux, + comedia : comedia, + enableSctp : enableSctp || Boolean(numSctpStreams), + numSctpStreams : numSctpStreams || 0, + enableSrtp : enableSrtp, + srtpCryptoSuite : srtpCryptoSuite, }; const room = this.rooms.get(roomId); const transport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions); me.transportEvent("plain", roomId, transport); transport.clientId = clientId; room.transports.set(transport.id, transport); - console.info(transport.tuple) - console.info(transport.rtcpTuple) message.body = { ip : transport.tuple.localIp, port : transport.tuple.localPort, diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java index 92b84fa..e3890f5 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/Constant.java @@ -109,6 +109,14 @@ public interface Constant { * 密码 */ String PASSWORD = "password"; + /** + * 端口 + */ + String PORT = "port"; + /** + * 地址 + */ + String HOST = "host"; /** * 数据 */ @@ -233,6 +241,10 @@ public interface Constant { * 状态 */ String ENABLED = "enabled"; + /** + * 文件路径 + */ + String FILEPATH = "filepath"; /** * 是否是消费者 */ diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/FfmpegProperties.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/FfmpegProperties.java index 0a776f6..cce65af 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/FfmpegProperties.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/config/FfmpegProperties.java @@ -31,6 +31,8 @@ public class FfmpegProperties { private String storageImagePath; @Schema(title = "视频存储目录", description = "视频存储目录") private String storageVideoPath; + @Schema(title = "录像录像地址", description = "录像录像地址") + private String host; @Schema(title = "录像最小端口", description = "录像最小端口") private Integer minPort; @Schema(title = "录像最大端口", description = "录像最大端口") diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/NetUtils.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/NetUtils.java index 2d8981a..01d1d04 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/NetUtils.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/NetUtils.java @@ -1,7 +1,9 @@ package com.acgist.taoyao.boot.utils; import java.math.BigInteger; +import java.net.DatagramSocket; import java.net.InetAddress; +import java.net.SocketException; import java.net.UnknownHostException; import java.util.BitSet; @@ -209,5 +211,25 @@ public final class NetUtils { // 本地地址:A/B/C类本地地址 inetAddress.isSiteLocalAddress(); } - + + /** + * 扫描端口 + * + * @param min 最小端口 + * @param max 最大端口 + * + * @return 端口 + */ + public static final int scanPort(int min, int max) { + for (int port = min; port < max; port++) { + try (final DatagramSocket socket = new DatagramSocket(port)) { + socket.disconnect(); + return port; + } catch (SocketException e) { + // 忽略 + } + } + return 0; + } + } diff --git a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/ScriptUtils.java b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/ScriptUtils.java index c15b346..2f70de5 100644 --- a/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/ScriptUtils.java +++ b/taoyao-signal-server/taoyao-boot/src/main/java/com/acgist/taoyao/boot/utils/ScriptUtils.java @@ -1,16 +1,20 @@ package com.acgist.taoyao.boot.utils; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import org.apache.commons.lang3.StringUtils; import com.acgist.taoyao.boot.model.MessageCode; import com.acgist.taoyao.boot.model.MessageCodeException; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; /** - * 脚本工具 + * 命令工具 * * @author acgist */ @@ -27,36 +31,176 @@ public final class ScriptUtils { * * @return 执行结果 */ - public static final String execute(String script) { + public static final ScriptExecutor execute(String script) { if(StringUtils.isEmpty(script)) { throw MessageCodeException.of(MessageCode.CODE_1002, "无效命令:" + script); } - String result = null; - Process process = null; + final ScriptExecutor executor = new ScriptExecutor(script); try { - process = Runtime.getRuntime().exec(script); - try( - final InputStream input = process.getInputStream(); - final InputStream error = process.getErrorStream(); - ) { - final String inputValue = new String(input.readAllBytes()); - final String errorValue = new String(input.readAllBytes()); - log.info(""" - 执行命令:{} - 执行结果:{} - 失败结果:{} - """, script, inputValue, errorValue); - result = StringUtils.isEmpty(inputValue) ? errorValue : inputValue; - } + executor.execute(); } catch (Exception e) { log.error("执行命令异常:{}", script, e); - result = e.getMessage(); - } finally { - if(process != null) { - process.destroy(); + } + return executor; + } + + /** + * 命令执行器 + * + * @author acgist + */ + @Getter + @Setter + public static final class ScriptExecutor { + + /** + * 执行结果 + */ + private int code; + /** + * 是否正在运行 + */ + private boolean running; + /** + * 命令进程 + */ + private Process process; + /** + * 命令进程Builder + */ + private ProcessBuilder processBuilder; + /** + * 执行命令 + */ + private final String script; + /** + * 日志输出 + */ + private final StringBuilder input; + /** + * 错误输出 + */ + private final StringBuilder error; + + /** + * @param script 执行命令 + */ + public ScriptExecutor(String script) { + this.script = script; + this.input = new StringBuilder(); + this.error = new StringBuilder(); + } + + /** + * 执行命令 + * + * @throws IOException IO异常 + * @throws InterruptedException 线程异常 + */ + public void execute() throws InterruptedException, IOException { + final boolean linux = FileUtils.linux(); + if(linux) { + this.processBuilder = new ProcessBuilder("/bin/bash", "-c", this.script); + this.process = this.processBuilder.start(); + } else { + this.processBuilder = new ProcessBuilder("cmd", "/c", this.script); + this.process = this.processBuilder.start(); + } + log.debug("开始执行命令:{}", this.script); + this.running = true; + try ( + final InputStream input = this.process.getInputStream(); + final InputStream error = this.process.getErrorStream(); + ) { + this.streamThread(linux, "TaoyaoScriptInput", this.input, input); + this.streamThread(linux, "TaoyaoScriptError", this.error, error); + this.code = this.process.waitFor(); + } + this.running = false; + log.debug(""" + 结束执行命令:{} + 执行状态:{} + 执行日志:{} + 错误日志:{} + """, this.script, this.code, this.input, this.error); + } + + /** + * @param linux 是否Linux + * @param name 线程名称 + * @param builder 日志记录 + * @param input 日志输入流 + */ + private void streamThread(boolean linux, String name, StringBuilder builder, InputStream input) { + final Thread streamThread = new Thread(() -> { + try { + int length; + final byte[] bytes = new byte[1024]; + while(this.running && (length = input.read(bytes)) >= 0) { + builder.append(linux ? new String(bytes, 0, length) : new String(bytes, 0, length, "GBK")); + } + } catch (Exception e) { + log.error("读取执行命令日志异常", e); + } + }); + streamThread.setName(name); + streamThread.setDaemon(true); + streamThread.start(); + } + + /** + * 结束命令 + */ + public void stop() { + this.stop(null); + } + + /** + * 结束命令 + * + * @param script 结束命令 + */ + public void stop(String script) { + // 等待时间 + long wait = 0; + // 使用按键结束 + if(StringUtils.isNotEmpty(script)) { + try (final OutputStream output = this.process.getOutputStream();) { + output.write(script.getBytes()); + } catch (Exception e) { + log.error("结束命令异常:{}", this.script, e); + } + wait = 5000; + } + // 等待正常结束 + while(this.process.isAlive() && wait >= 0) { + wait -= 10; + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.yield(); + } + } + if(this.process.isAlive()) { + log.info("强制结束命令:{}", this.script); + // 所有子进程 + this.process.children().forEach(process -> { + process.destroy(); + }); + // 当前父进程 + this.process.destroy(); + } else { + log.debug("正常结束命令:{}", this.script); } } - return result; + + /** + * @return 执行结果 + */ + public String getResult() { + return this.input.isEmpty() ? this.error.toString() : this.input.toString(); + } + } } diff --git a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml index d66f6d2..2fd26d1 100644 --- a/taoyao-signal-server/taoyao-server/src/main/resources/application.yml +++ b/taoyao-signal-server/taoyao-server/src/main/resources/application.yml @@ -228,21 +228,21 @@ taoyao: # SDP:VP8 | H264 sdp: | v=0 - o=- 0 0 IN IP4 127.0.0.1 + o=- 0 0 IN IP4 %s s=TaoyaoRecord t=0 0 m=audio %d RTP/AVP 97 - c=IN IP4 127.0.0.1 - a=rtpmap:97 opus/48000/2 + c=IN IP4 %s + a=rtpmap:97 OPUS/48000/2 a=fmtp:97 sprop-stereo=1 m=video %d RTP/AVP 96 - c=IN IP4 127.0.0.1 + c=IN IP4 %s a=rtpmap:96 H264/90000 a=fmtp:96 packetization-mode=1 # 录像命令 - record: ffmpeg -y -protocol_whitelist "file,rtp,udp" -i %s %s + record: ffmpeg -protocol_whitelist "file,rtp,udp" -y -i %s %s # 截图命令 - preview: ffmpeg -y -i %s -t %d -f image2 %s + preview: ffmpeg -y -i %s -ss %d -vframes 1 -f image2 %s # 时长命令 duration: ffprobe -i %s -show_entries format=duration # 存储目录 @@ -251,6 +251,8 @@ taoyao: storage-image-path: /data/taoyao/storage/image # 视频存储目录 storage-video-path: /data/taoyao/storage/video + # 录像地址 + host: 127.0.0.1 # 端口范围 min-port: 50000 max-port: 59999 diff --git a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/RecorderTest.java b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/RecorderTest.java index 7f7f732..1ae673e 100644 --- a/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/RecorderTest.java +++ b/taoyao-signal-server/taoyao-server/src/test/java/com/acgist/taoyao/RecorderTest.java @@ -11,22 +11,27 @@ public class RecorderTest { public void testStart() throws InterruptedException { final FfmpegProperties ffmpegProperties = new FfmpegProperties(); ffmpegProperties.setStorageVideoPath("D:\\tmp\\video"); + ffmpegProperties.setMinPort(50000); + ffmpegProperties.setMaxPort(59999); + ffmpegProperties.setHost("127.0.0.1"); ffmpegProperties.setSdp(""" v=0 - o=- 0 0 IN IP4 127.0.0.1 + o=- 0 0 IN IP4 %s s=TaoyaoRecord t=0 0 m=audio %d RTP/AVP 97 - c=IN IP4 127.0.0.1 - a=rtpmap:97 opus/48000/2 + c=IN IP4 %s + a=rtpmap:97 OPUS/48000/2 a=fmtp:97 sprop-stereo=1 m=video %d RTP/AVP 96 - c=IN IP4 127.0.0.1 + c=IN IP4 %s a=rtpmap:96 H264/90000 a=fmtp:96 packetization-mode=1 """); - ffmpegProperties.setRecord("ffmpeg -y -protocol_whitelist \"file,rtp,udp\" -i %s %s"); - final Recorder recorder = new Recorder(ffmpegProperties); + ffmpegProperties.setRecord("ffmpeg -protocol_whitelist \"file,rtp,udp\" -y -i %s %s"); + ffmpegProperties.setPreview("ffmpeg -y -i %s -ss %d -vframes 1 -f image2 %s"); + ffmpegProperties.setDuration("ffprobe -i %s -show_entries format=duration"); + final Recorder recorder = new Recorder("taoyao", ffmpegProperties); recorder.start(); Thread.sleep(20 * 1000); recorder.stop(); diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Recorder.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Recorder.java index bbe90ce..aa1c82f 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Recorder.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/party/media/Recorder.java @@ -1,14 +1,20 @@ package com.acgist.taoyao.signal.party.media; +import java.io.File; import java.io.IOException; -import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.math.NumberUtils; import com.acgist.taoyao.boot.config.FfmpegProperties; import com.acgist.taoyao.boot.utils.FileUtils; +import com.acgist.taoyao.boot.utils.NetUtils; +import com.acgist.taoyao.boot.utils.ScriptUtils; +import com.acgist.taoyao.boot.utils.ScriptUtils.ScriptExecutor; import lombok.Getter; import lombok.Setter; @@ -41,41 +47,49 @@ public class Recorder { */ private Integer videoPort; /** - * 传输通道 + * 音频流ID */ - private Transport transport; + private String audioStreamId; /** - * 音频消费者 + * 音频生产者ID */ - private Consumer audioConsumer; + private String audioProducerId; /** - * 视频消费者 + * 音频消费者ID */ - private Consumer videoConsumer; + private String audioConsumerId; /** - * 录像进程 + * 音频通道ID */ - private Process process; + private String audioTransportId; /** - * 进程Builder + * 视频流ID */ - private ProcessBuilder processBuilder; + private String videoStreamId; + /** + * 视频生产者ID + */ + private String videoProducerId; + /** + * 视频消费者ID + */ + private String videoConsumerId; + /** + * 视频通道ID + */ + private String videoTransportId; /** * 录制线程 */ private Thread thread; /** - * 日志线程 + * 视频时长 */ - private Thread inputThread; + private Double duration; /** - * 异常线程 + * 命令执行器 */ - private Thread errorThread; - /** - * 命令 - */ - private String command; + private ScriptExecutor scriptExecutor; /** * 文件路径 */ @@ -84,6 +98,10 @@ public class Recorder { * SDP路径 */ private final String sdpfile; + /** + * 预览图片 + */ + private final String preview; /** * 文件路径 */ @@ -93,15 +111,18 @@ public class Recorder { */ private final FfmpegProperties ffmpegProperties; - public Recorder(FfmpegProperties ffmpegProperties) { + /** + * @param name 录像名称 + * @param ffmpegProperties FFmpeg配置 + */ + public Recorder(String name, FfmpegProperties ffmpegProperties) { this.close = false; this.running = false; this.ffmpegProperties = ffmpegProperties; - final String id = UUID.randomUUID().toString(); - this.folder = Paths.get(ffmpegProperties.getStorageVideoPath(), id).toAbsolutePath().toString(); - this.sdpfile = Paths.get(this.folder, "taoyao.sdp").toAbsolutePath().toString(); - this.filepath = Paths.get(this.folder, "taoyao.mp4").toAbsolutePath().toString(); - this.command = String.format(this.ffmpegProperties.getRecord(), this.sdpfile, this.filepath); + this.folder = Paths.get(ffmpegProperties.getStorageVideoPath(), name).toAbsolutePath().toString(); + this.sdpfile = Paths.get(this.folder, "taoyao.sdp").toAbsolutePath().toString(); + this.preview = Paths.get(this.folder, "taoyao.jpg").toAbsolutePath().toString(); + this.filepath = Paths.get(this.folder, "taoyao.mp4").toAbsolutePath().toString(); FileUtils.mkdirs(this.folder); } @@ -114,11 +135,11 @@ public class Recorder { return; } this.running = true; - this.thread = new Thread(this::record); - this.thread.setDaemon(true); - this.thread.setName("TaoyaoRecord"); - this.thread.start(); } + this.thread = new Thread(this::record); + this.thread.setDaemon(true); + this.thread.setName("TaoyaoRecord"); + this.thread.start(); } /** @@ -126,62 +147,19 @@ public class Recorder { */ private void record() { this.buildSdpfile(); - int status = 0; - final StringBuilder input = new StringBuilder(); - final StringBuilder error = new StringBuilder(); + final String recordScript = String.format(this.ffmpegProperties.getRecord(), this.sdpfile, this.filepath); + this.scriptExecutor = new ScriptExecutor(recordScript); try { - final boolean linux = FileUtils.linux(); - if(linux) { - this.processBuilder = new ProcessBuilder("/bin/bash", "-c", this.command); - this.process = processBuilder.start(); - } else { - this.processBuilder = new ProcessBuilder("cmd", "/c", this.command); - this.process = processBuilder.start(); - } log.debug(""" 开始录像:{} - 录像命令:{} - """, this.filepath, this.command); - this.inputThread = new Thread(() -> { - try (final InputStream inputStream = this.process.getInputStream()) { - int length; - final byte[] bytes = new byte[1024]; - while(this.running && !this.close && (length = inputStream.read(bytes)) >= 0) { - input.append(linux ? new String(bytes, 0, length) : new String(bytes, 0, length, "GBK")); - } - } catch (Exception e) { - log.error("读取录像日志异常", e); - } - }); - this.inputThread.setDaemon(true); - this.inputThread.setName("TaoyaoRecordInput"); - this.inputThread.start(); - this.errorThread = new Thread(() -> { - try (final InputStream inputStream = this.process.getErrorStream();) { - int length; - final byte[] bytes = new byte[1024]; - while(this.running && !this.close && (length = inputStream.read(bytes)) >= 0) { - error.append(linux ? new String(bytes, 0, length) : new String(bytes, 0, length, "GBK")); - } - } catch (Exception e) { - log.error("读取录像错误异常", e); - } - }); - this.errorThread.setDaemon(true); - this.errorThread.setName("TaoyaoRecordError"); - this.errorThread.start(); - status = this.process.waitFor(); + 录像端口:{} - {} + """, this.folder, this.audioPort, this.videoPort); + this.scriptExecutor.execute(); } catch (Exception e) { - log.error("录像异常:{}", this.command, e); + log.error("录像异常:{}", recordScript, e); } finally { this.stop(); } - log.debug(""" - 结束录像:{} - 结束状态:{} - 录像日志:{} - 异常日志:{} - """, this.filepath, status, input, error); } /** @@ -189,9 +167,20 @@ public class Recorder { */ private void buildSdpfile() { try { + this.audioPort = NetUtils.scanPort(this.ffmpegProperties.getMinPort(), this.ffmpegProperties.getMaxPort()); + // 预留控制端口 + this.videoPort = NetUtils.scanPort(this.audioPort + 16, this.ffmpegProperties.getMaxPort()); + final String sdp = String.format( + this.ffmpegProperties.getSdp(), + this.ffmpegProperties.getHost(), + this.audioPort, + this.ffmpegProperties.getHost(), + this.videoPort, + this.ffmpegProperties.getHost() + ); Files.write( Paths.get(this.sdpfile), - String.format(this.ffmpegProperties.getSdp(), 8888, 9999).getBytes(), + sdp.getBytes(), StandardOpenOption.WRITE, StandardOpenOption.CREATE ); } catch (IOException e) { @@ -199,6 +188,41 @@ public class Recorder { } } + /** + * 视频预览截图 + */ + private void preview() { + int time = 2; + final File file = Paths.get(this.preview).toFile(); + while(time > 0 && !(file.exists() && file.length() > 0L)) { + log.debug("视频预览截图:{}", this.preview); + final String previewScript = String.format(this.ffmpegProperties.getPreview(), this.filepath, time, this.preview); + ScriptUtils.execute(previewScript); + time /= 2; + } + } + + /** + * 视频时长 + */ + private void duration() { + log.debug("视频时长:{}", this.filepath); + final String durationScript = String.format(this.ffmpegProperties.getDuration(), this.filepath); + final ScriptExecutor executor = ScriptUtils.execute(durationScript); + final Pattern pattern = Pattern.compile(".*duration\\=([0-9\\.]+).*"); + final Matcher matcher = pattern.matcher(executor.getResult()); + String duration = null; + if(matcher.find()) { + duration = matcher.group(matcher.groupCount()).strip(); + } + if(NumberUtils.isCreatable(duration)) { + this.duration = Double.parseDouble(duration); + } else { + this.duration = 0D; + } + + } + /** * 结束录像 */ @@ -209,16 +233,13 @@ public class Recorder { } this.close = true; } - if(this.process == null) { + if(this.scriptExecutor == null) { return; } - log.debug("结束媒体录像:{}", this.filepath); - // 所有子进程 - this.process.children().forEach(process -> { - process.destroy(); - }); - // 当前父进程 - this.process.destroy(); + log.debug("结束媒体录像:{}", this.folder); + this.scriptExecutor.stop("q"); + this.preview(); + this.duration(); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRecordProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRecordProtocol.java index 107e7cc..5eec23a 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRecordProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/media/MediaRecordProtocol.java @@ -1,14 +1,19 @@ package com.acgist.taoyao.signal.protocol.media; +import java.util.HashMap; import java.util.Map; +import java.util.UUID; import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Protocol; +import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.config.FfmpegProperties; import com.acgist.taoyao.boot.model.Message; +import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.party.media.ClientWrapper; +import com.acgist.taoyao.signal.party.media.Kind; import com.acgist.taoyao.signal.party.media.Recorder; import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; @@ -47,7 +52,15 @@ public class MediaRecordProtocol extends ProtocolRoomAdapter { @Override public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map body) { - + final Boolean enabled = MapUtils.get(body, Constant.ENABLED, Boolean.TRUE); + String filepath; + if(enabled) { + filepath = this.start(room, client, mediaClient); + } else { + filepath = this.stop(room, client, mediaClient); + } + body.put(Constant.FILEPATH, filepath); + client.push(message); } /** @@ -60,25 +73,103 @@ public class MediaRecordProtocol extends ProtocolRoomAdapter { public Message execute(String roomId, String clientId, Boolean enabled) { final Room room = this.roomManager.room(roomId); final Client client = this.clientManager.clients(clientId); + final Client mediaClient = room.getMediaClient(); + String filepath; if(enabled) { - this.record(room, client); + filepath = this.start(room, client, mediaClient); + } else { + filepath = this.stop(room, client, mediaClient); } - return null; + return Message.success(Map.of( + Constant.ENABLED, enabled, + Constant.FILEPATH, filepath + )); } /** * 开始录制 + * + * @param room 房间 + * @param client 终端 + * @param mediaClient 媒体终端 + * + * @return 文件地址 */ - private void record(Room room, Client client) { + private String start(Room room, Client client, Client mediaClient) { final ClientWrapper clientWrapper = room.clientWrapper(client); synchronized (clientWrapper) { - if(clientWrapper.getRecorder() != null) { - return; + final Recorder recorder = clientWrapper.getRecorder(); + if(recorder != null) { + return recorder.getFilepath(); } - final Recorder recorder = new Recorder(this.ffmpegProperties); - recorder.start(); - clientWrapper.setRecorder(recorder); } + // 打开录制线程 + final Recorder recorder = new Recorder(UUID.randomUUID().toString(), this.ffmpegProperties); + recorder.start(); + clientWrapper.setRecorder(recorder); + // 打开媒体录制 + final Message message = this.build(); + final Map body = new HashMap<>(); + body.put("audioPort", recorder.getAudioPort()); + body.put("videoPort", recorder.getVideoPort()); + body.put(Constant.HOST, this.ffmpegProperties.getHost()); + body.put(Constant.ROOM_ID, room.getRoomId()); + body.put(Constant.ENABLED, true); + body.put(Constant.CLIENT_ID, client.clientId()); + body.put(Constant.RTP_CAPABILITIES, clientWrapper.getRtpCapabilities()); + clientWrapper.getProducers().values().forEach(producer -> { + if(producer.getKind() == Kind.AUDIO) { + recorder.setAudioStreamId(Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), client.clientId())); + body.put("audioStreamId", recorder.getAudioStreamId()); + body.put("audioProducerId", producer.getProducerId()); + } else if(producer.getKind() == Kind.VIDEO) { + recorder.setAudioStreamId(Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), client.clientId())); + body.put("videoStreamId", recorder.getVideoStreamId()); + body.put("videoProducerId", producer.getProducerId()); + } else { + // 忽略 + } + }); + message.setBody(body); + mediaClient.request(message); + return recorder.getFilepath(); + } + + /** + * 关闭录像 + * + * @param room 房间 + * @param client 终端 + * @param mediaClient 媒体终端 + * + * @return 文件地址 + */ + private String stop(Room room, Client client, Client mediaClient) { + final Recorder recorder; + final ClientWrapper clientWrapper = room.clientWrapper(client); + synchronized (clientWrapper) { + recorder = clientWrapper.getRecorder(); + if(recorder == null) { + return null; + } + } + // 关闭录制线程 + recorder.stop(); + clientWrapper.setRecorder(null); + // 关闭媒体录制 + final Message message = this.build(); + final Map body = new HashMap<>(); + body.put("audioStreamId", recorder.getAudioStreamId()); + body.put("videoStreamId", recorder.getVideoStreamId()); + body.put("audioConsumerId", recorder.getAudioConsumerId()); + body.put("videoConsumerId", recorder.getVideoConsumerId()); + body.put("audioTransportId", recorder.getAudioTransportId()); + body.put("videoTransportId", recorder.getVideoConsumerId()); + body.put(Constant.ROOM_ID, room.getRoomId()); + body.put(Constant.ENABLED, false); + message.setBody(body); + mediaClient.request(message); + return recorder.getFilepath(); } } diff --git a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/platform/PlatformScriptProtocol.java b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/platform/PlatformScriptProtocol.java index af978d5..14e1185 100644 --- a/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/platform/PlatformScriptProtocol.java +++ b/taoyao-signal-server/taoyao-signal/src/main/java/com/acgist/taoyao/signal/protocol/platform/PlatformScriptProtocol.java @@ -7,6 +7,7 @@ import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.boot.utils.ScriptUtils; +import com.acgist.taoyao.boot.utils.ScriptUtils.ScriptExecutor; import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; @@ -45,7 +46,8 @@ public class PlatformScriptProtocol extends ProtocolClientAdapter { @Override public void execute(String clientId, ClientType clientType, Client client, Message message, Map body) { final String script = MapUtils.get(body, Constant.SCRIPT); - final String result = ScriptUtils.execute(script); + final ScriptExecutor executor = ScriptUtils.execute(script); + final String result = executor.getResult(); log.info(""" 执行终端:{} 执行命令:{}