[*] 服务端录制

This commit is contained in:
acgist
2023-06-01 07:32:35 +08:00
parent 441e99483b
commit a06f6a251f
11 changed files with 521 additions and 167 deletions

View File

@@ -54,7 +54,10 @@
|混音|支持|完成|多路混音|
|水印|支持|完成|视频水印|
> 注意Web终端不支持同时进入多个视频房间安卓终端支持同时进入多个视频房间。
### 注意事项
* Web终端不支持同时进入多个视频房间安卓终端支持同时进入多个视频房间。
* 服务端录制只支持视频房间(会议)模式,视频会话(监控)模式不支持服务器录制。
## Docker

View File

@@ -778,25 +778,39 @@ class Taoyao {
*/
async mediaRecord(message, body) {
const me = this;
const { roomId, rtcpMux, comedia, clientId, host, audioPort, videoProt, rtpCapabilities, audioProducerId, audioStreamId, videoProducerId, videoStreamId } = body;
const { enabled, roomId } = body;
const room = this.rooms.get(roomId);
if(enabled) {
await me.mediaRecordStart(message, body, room);
} else {
await me.mediaRecordStop(message, body, room);
}
}
async mediaRecordStart(message, body, room) {
const { roomId, clientId, host, audioPort, videoPort, rtpCapabilities, audioStreamId, videoStreamId, audioProducerId, videoProducerId } = body;
const plainTransportOptions = {
...config.mediasoup.plainTransportOptions,
rtcpMux: rtcpMux,
comedia: comedia
rtcpMux : true,
comedia : true
};
const room = this.rooms.get(roomId);
const transport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions);
me.transportEvent("plain", roomId, transport);
transport.clientId = clientId;
room.transports.set(transport.id, transport);
let audioConsumerId;
let videoConsumerId;
await transport.connect({
ip: '127.0.0.1',
port: remoteRtpPort,
rtcpPort: remoteRtcpPort
});
let audioConsumerId;
let audioTransportId;
let videoTransportId;
if(audioProducerId) {
const audioTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions);
me.transportEvent("plain", roomId, audioTransport);
audioTransport.clientId = clientId;
room.transports.set(audioTransport.id, audioTransport);
audioTransport.observer.on("close", () => {
room.transports.delete(audioTransport.id)
});
await audioTransport.connect({
ip : host,
port : audioPort,
rtcpPort : audioPort
});
const audioConsumer = await transport.consume({
producerId: audioProducerId,
rtpCapabilities,
@@ -805,10 +819,25 @@ class Taoyao {
audioConsumerId = audioConsumer.id;
await audioConsumer.resume();
audioConsumer.clientId = clientId;
audioConsumer.streamId = videoStreamId;
audioConsumer.streamId = audioStreamId;
room.consumers.set(audioConsumer.id, audioConsumer);
audioConsumer.observer.on("close", () => {
room.consumers.delete(audioConsumer.id);
});
}
if(videoProducerId) {
const videoTransport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions);
me.transportEvent("plain", roomId, videoTransport);
videoTransport.clientId = clientId;
room.transports.set(videoTransport.id, videoTransport);
videoTransport.observer.on("close", () => {
room.transports.delete(videoTransport.id)
});
await videoTransport.connect({
ip : host,
port : videoPort,
rtcpPort : videoPort
});
const videoConsumer = await transport.consume({
producerId: videoProducerId,
rtpCapabilities,
@@ -819,21 +848,44 @@ class Taoyao {
videoConsumer.clientId = clientId;
videoConsumer.streamId = videoStreamId;
room.consumers.set(videoConsumer.id, videoConsumer);
videoConsumer.observer.on("close", () => {
room.consumers.delete(videoConsumer.id);
});
}
console.info(transport.tuple)
console.info(transport.rtcpTuple)
message.body = {
ip : transport.tuple.localIp,
port : transport.tuple.localPort,
roomId : roomId,
rtcpPort : transport.rtcpTuple ? transport.rtcpTuple.localPort : undefined,
transportId : transport.id,
audioConsumerId : audioConsumerId,
videoConsumerId : videoConsumerId,
roomId : roomId,
audioConsumerId : audioConsumerId,
videoConsumerId : videoConsumerId,
audioTransportId : audioTransportId,
videoTransportId : videoTransportId,
};
me.push(message);
}
async mediaRecordStart(message, body, room) {
const { audioStreamId, videoStreamId, audioConsumerId, videoConsumerId, audioTransportId, videoTransportId } = body;
const audioConsumer = room.consumers.get(audioConsumerId);
if(audioConsumer) {
audioConsumer.close();
room.consumers.delete(audioConsumerId);
}
const videoConsumer = room.consumers.get(videoConsumerId);
if(videoConsumer) {
videoConsumer.close();
room.consumers.delete(videoConsumerId);
}
const audioTransport = room.transports.get(audioTransportId);
if(audioTransport) {
audioTransport.close();
room.transports.delete(audioTransportId);
}
const videoTransport = room.transports.get(videoTransportId);
if(videoTransport) {
videoTransport.close();
room.transports.delete(videoTransportId);
}
}
async mediaConsume(message, body) {
const {
roomId,
@@ -1363,20 +1415,18 @@ class Taoyao {
const { roomId, rtcpMux, comedia, clientId, enableSctp, numSctpStreams, enableSrtp, srtpCryptoSuite } = body;
const plainTransportOptions = {
...config.mediasoup.plainTransportOptions,
rtcpMux: rtcpMux,
comedia: comedia,
enableSctp: enableSctp || Boolean(numSctpStreams),
numSctpStreams: numSctpStreams || 0,
enableSrtp: enableSrtp,
srtpCryptoSuite: srtpCryptoSuite,
rtcpMux : rtcpMux,
comedia : comedia,
enableSctp : enableSctp || Boolean(numSctpStreams),
numSctpStreams : numSctpStreams || 0,
enableSrtp : enableSrtp,
srtpCryptoSuite : srtpCryptoSuite,
};
const room = this.rooms.get(roomId);
const transport = await room.mediasoupRouter.createPlainTransport(plainTransportOptions);
me.transportEvent("plain", roomId, transport);
transport.clientId = clientId;
room.transports.set(transport.id, transport);
console.info(transport.tuple)
console.info(transport.rtcpTuple)
message.body = {
ip : transport.tuple.localIp,
port : transport.tuple.localPort,

View File

@@ -109,6 +109,14 @@ public interface Constant {
* 密码
*/
String PASSWORD = "password";
/**
* 端口
*/
String PORT = "port";
/**
* 地址
*/
String HOST = "host";
/**
* 数据
*/
@@ -233,6 +241,10 @@ public interface Constant {
* 状态
*/
String ENABLED = "enabled";
/**
* 文件路径
*/
String FILEPATH = "filepath";
/**
* 是否是消费者
*/

View File

@@ -31,6 +31,8 @@ public class FfmpegProperties {
private String storageImagePath;
@Schema(title = "视频存储目录", description = "视频存储目录")
private String storageVideoPath;
@Schema(title = "录像录像地址", description = "录像录像地址")
private String host;
@Schema(title = "录像最小端口", description = "录像最小端口")
private Integer minPort;
@Schema(title = "录像最大端口", description = "录像最大端口")

View File

@@ -1,7 +1,9 @@
package com.acgist.taoyao.boot.utils;
import java.math.BigInteger;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.BitSet;
@@ -209,5 +211,25 @@ public final class NetUtils {
// 本地地址A/B/C类本地地址
inetAddress.isSiteLocalAddress();
}
/**
* 扫描端口
*
* @param min 最小端口
* @param max 最大端口
*
* @return 端口
*/
public static final int scanPort(int min, int max) {
for (int port = min; port < max; port++) {
try (final DatagramSocket socket = new DatagramSocket(port)) {
socket.disconnect();
return port;
} catch (SocketException e) {
// 忽略
}
}
return 0;
}
}

View File

@@ -1,16 +1,20 @@
package com.acgist.taoyao.boot.utils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.commons.lang3.StringUtils;
import com.acgist.taoyao.boot.model.MessageCode;
import com.acgist.taoyao.boot.model.MessageCodeException;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
* 脚本工具
* 命令工具
*
* @author acgist
*/
@@ -27,36 +31,176 @@ public final class ScriptUtils {
*
* @return 执行结果
*/
public static final String execute(String script) {
public static final ScriptExecutor execute(String script) {
if(StringUtils.isEmpty(script)) {
throw MessageCodeException.of(MessageCode.CODE_1002, "无效命令:" + script);
}
String result = null;
Process process = null;
final ScriptExecutor executor = new ScriptExecutor(script);
try {
process = Runtime.getRuntime().exec(script);
try(
final InputStream input = process.getInputStream();
final InputStream error = process.getErrorStream();
) {
final String inputValue = new String(input.readAllBytes());
final String errorValue = new String(input.readAllBytes());
log.info("""
执行命令:{}
执行结果:{}
失败结果:{}
""", script, inputValue, errorValue);
result = StringUtils.isEmpty(inputValue) ? errorValue : inputValue;
}
executor.execute();
} catch (Exception e) {
log.error("执行命令异常:{}", script, e);
result = e.getMessage();
} finally {
if(process != null) {
process.destroy();
}
return executor;
}
/**
* 命令执行器
*
* @author acgist
*/
@Getter
@Setter
public static final class ScriptExecutor {
/**
* 执行结果
*/
private int code;
/**
* 是否正在运行
*/
private boolean running;
/**
* 命令进程
*/
private Process process;
/**
* 命令进程Builder
*/
private ProcessBuilder processBuilder;
/**
* 执行命令
*/
private final String script;
/**
* 日志输出
*/
private final StringBuilder input;
/**
* 错误输出
*/
private final StringBuilder error;
/**
* @param script 执行命令
*/
public ScriptExecutor(String script) {
this.script = script;
this.input = new StringBuilder();
this.error = new StringBuilder();
}
/**
* 执行命令
*
* @throws IOException IO异常
* @throws InterruptedException 线程异常
*/
public void execute() throws InterruptedException, IOException {
final boolean linux = FileUtils.linux();
if(linux) {
this.processBuilder = new ProcessBuilder("/bin/bash", "-c", this.script);
this.process = this.processBuilder.start();
} else {
this.processBuilder = new ProcessBuilder("cmd", "/c", this.script);
this.process = this.processBuilder.start();
}
log.debug("开始执行命令:{}", this.script);
this.running = true;
try (
final InputStream input = this.process.getInputStream();
final InputStream error = this.process.getErrorStream();
) {
this.streamThread(linux, "TaoyaoScriptInput", this.input, input);
this.streamThread(linux, "TaoyaoScriptError", this.error, error);
this.code = this.process.waitFor();
}
this.running = false;
log.debug("""
结束执行命令:{}
执行状态:{}
执行日志:{}
错误日志:{}
""", this.script, this.code, this.input, this.error);
}
/**
* @param linux 是否Linux
* @param name 线程名称
* @param builder 日志记录
* @param input 日志输入流
*/
private void streamThread(boolean linux, String name, StringBuilder builder, InputStream input) {
final Thread streamThread = new Thread(() -> {
try {
int length;
final byte[] bytes = new byte[1024];
while(this.running && (length = input.read(bytes)) >= 0) {
builder.append(linux ? new String(bytes, 0, length) : new String(bytes, 0, length, "GBK"));
}
} catch (Exception e) {
log.error("读取执行命令日志异常", e);
}
});
streamThread.setName(name);
streamThread.setDaemon(true);
streamThread.start();
}
/**
* 结束命令
*/
public void stop() {
this.stop(null);
}
/**
* 结束命令
*
* @param script 结束命令
*/
public void stop(String script) {
// 等待时间
long wait = 0;
// 使用按键结束
if(StringUtils.isNotEmpty(script)) {
try (final OutputStream output = this.process.getOutputStream();) {
output.write(script.getBytes());
} catch (Exception e) {
log.error("结束命令异常:{}", this.script, e);
}
wait = 5000;
}
// 等待正常结束
while(this.process.isAlive() && wait >= 0) {
wait -= 10;
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.yield();
}
}
if(this.process.isAlive()) {
log.info("强制结束命令:{}", this.script);
// 所有子进程
this.process.children().forEach(process -> {
process.destroy();
});
// 当前父进程
this.process.destroy();
} else {
log.debug("正常结束命令:{}", this.script);
}
}
return result;
/**
* @return 执行结果
*/
public String getResult() {
return this.input.isEmpty() ? this.error.toString() : this.input.toString();
}
}
}

View File

@@ -228,21 +228,21 @@ taoyao:
# SDPVP8 | H264
sdp: |
v=0
o=- 0 0 IN IP4 127.0.0.1
o=- 0 0 IN IP4 %s
s=TaoyaoRecord
t=0 0
m=audio %d RTP/AVP 97
c=IN IP4 127.0.0.1
a=rtpmap:97 opus/48000/2
c=IN IP4 %s
a=rtpmap:97 OPUS/48000/2
a=fmtp:97 sprop-stereo=1
m=video %d RTP/AVP 96
c=IN IP4 127.0.0.1
c=IN IP4 %s
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1
# 录像命令
record: ffmpeg -y -protocol_whitelist "file,rtp,udp" -i %s %s
record: ffmpeg -protocol_whitelist "file,rtp,udp" -y -i %s %s
# 截图命令
preview: ffmpeg -y -i %s -t %d -f image2 %s
preview: ffmpeg -y -i %s -ss %d -vframes 1 -f image2 %s
# 时长命令
duration: ffprobe -i %s -show_entries format=duration
# 存储目录
@@ -251,6 +251,8 @@ taoyao:
storage-image-path: /data/taoyao/storage/image
# 视频存储目录
storage-video-path: /data/taoyao/storage/video
# 录像地址
host: 127.0.0.1
# 端口范围
min-port: 50000
max-port: 59999

View File

@@ -11,22 +11,27 @@ public class RecorderTest {
public void testStart() throws InterruptedException {
final FfmpegProperties ffmpegProperties = new FfmpegProperties();
ffmpegProperties.setStorageVideoPath("D:\\tmp\\video");
ffmpegProperties.setMinPort(50000);
ffmpegProperties.setMaxPort(59999);
ffmpegProperties.setHost("127.0.0.1");
ffmpegProperties.setSdp("""
v=0
o=- 0 0 IN IP4 127.0.0.1
o=- 0 0 IN IP4 %s
s=TaoyaoRecord
t=0 0
m=audio %d RTP/AVP 97
c=IN IP4 127.0.0.1
a=rtpmap:97 opus/48000/2
c=IN IP4 %s
a=rtpmap:97 OPUS/48000/2
a=fmtp:97 sprop-stereo=1
m=video %d RTP/AVP 96
c=IN IP4 127.0.0.1
c=IN IP4 %s
a=rtpmap:96 H264/90000
a=fmtp:96 packetization-mode=1
""");
ffmpegProperties.setRecord("ffmpeg -y -protocol_whitelist \"file,rtp,udp\" -i %s %s");
final Recorder recorder = new Recorder(ffmpegProperties);
ffmpegProperties.setRecord("ffmpeg -protocol_whitelist \"file,rtp,udp\" -y -i %s %s");
ffmpegProperties.setPreview("ffmpeg -y -i %s -ss %d -vframes 1 -f image2 %s");
ffmpegProperties.setDuration("ffprobe -i %s -show_entries format=duration");
final Recorder recorder = new Recorder("taoyao", ffmpegProperties);
recorder.start();
Thread.sleep(20 * 1000);
recorder.stop();

View File

@@ -1,14 +1,20 @@
package com.acgist.taoyao.signal.party.media;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.lang3.math.NumberUtils;
import com.acgist.taoyao.boot.config.FfmpegProperties;
import com.acgist.taoyao.boot.utils.FileUtils;
import com.acgist.taoyao.boot.utils.NetUtils;
import com.acgist.taoyao.boot.utils.ScriptUtils;
import com.acgist.taoyao.boot.utils.ScriptUtils.ScriptExecutor;
import lombok.Getter;
import lombok.Setter;
@@ -41,41 +47,49 @@ public class Recorder {
*/
private Integer videoPort;
/**
* 传输通道
* 音频流ID
*/
private Transport transport;
private String audioStreamId;
/**
* 音频消费者
* 音频生产者ID
*/
private Consumer audioConsumer;
private String audioProducerId;
/**
* 频消费者
* 频消费者ID
*/
private Consumer videoConsumer;
private String audioConsumerId;
/**
* 录像进程
* 音频通道ID
*/
private Process process;
private String audioTransportId;
/**
* 进程Builder
* 视频流ID
*/
private ProcessBuilder processBuilder;
private String videoStreamId;
/**
* 视频生产者ID
*/
private String videoProducerId;
/**
* 视频消费者ID
*/
private String videoConsumerId;
/**
* 视频通道ID
*/
private String videoTransportId;
/**
* 录制线程
*/
private Thread thread;
/**
* 日志线程
* 视频时长
*/
private Thread inputThread;
private Double duration;
/**
* 异常线程
* 命令执行器
*/
private Thread errorThread;
/**
* 命令
*/
private String command;
private ScriptExecutor scriptExecutor;
/**
* 文件路径
*/
@@ -84,6 +98,10 @@ public class Recorder {
* SDP路径
*/
private final String sdpfile;
/**
* 预览图片
*/
private final String preview;
/**
* 文件路径
*/
@@ -93,15 +111,18 @@ public class Recorder {
*/
private final FfmpegProperties ffmpegProperties;
public Recorder(FfmpegProperties ffmpegProperties) {
/**
* @param name 录像名称
* @param ffmpegProperties FFmpeg配置
*/
public Recorder(String name, FfmpegProperties ffmpegProperties) {
this.close = false;
this.running = false;
this.ffmpegProperties = ffmpegProperties;
final String id = UUID.randomUUID().toString();
this.folder = Paths.get(ffmpegProperties.getStorageVideoPath(), id).toAbsolutePath().toString();
this.sdpfile = Paths.get(this.folder, "taoyao.sdp").toAbsolutePath().toString();
this.filepath = Paths.get(this.folder, "taoyao.mp4").toAbsolutePath().toString();
this.command = String.format(this.ffmpegProperties.getRecord(), this.sdpfile, this.filepath);
this.folder = Paths.get(ffmpegProperties.getStorageVideoPath(), name).toAbsolutePath().toString();
this.sdpfile = Paths.get(this.folder, "taoyao.sdp").toAbsolutePath().toString();
this.preview = Paths.get(this.folder, "taoyao.jpg").toAbsolutePath().toString();
this.filepath = Paths.get(this.folder, "taoyao.mp4").toAbsolutePath().toString();
FileUtils.mkdirs(this.folder);
}
@@ -114,11 +135,11 @@ public class Recorder {
return;
}
this.running = true;
this.thread = new Thread(this::record);
this.thread.setDaemon(true);
this.thread.setName("TaoyaoRecord");
this.thread.start();
}
this.thread = new Thread(this::record);
this.thread.setDaemon(true);
this.thread.setName("TaoyaoRecord");
this.thread.start();
}
/**
@@ -126,62 +147,19 @@ public class Recorder {
*/
private void record() {
this.buildSdpfile();
int status = 0;
final StringBuilder input = new StringBuilder();
final StringBuilder error = new StringBuilder();
final String recordScript = String.format(this.ffmpegProperties.getRecord(), this.sdpfile, this.filepath);
this.scriptExecutor = new ScriptExecutor(recordScript);
try {
final boolean linux = FileUtils.linux();
if(linux) {
this.processBuilder = new ProcessBuilder("/bin/bash", "-c", this.command);
this.process = processBuilder.start();
} else {
this.processBuilder = new ProcessBuilder("cmd", "/c", this.command);
this.process = processBuilder.start();
}
log.debug("""
开始录像:{}
录像命令{}
""", this.filepath, this.command);
this.inputThread = new Thread(() -> {
try (final InputStream inputStream = this.process.getInputStream()) {
int length;
final byte[] bytes = new byte[1024];
while(this.running && !this.close && (length = inputStream.read(bytes)) >= 0) {
input.append(linux ? new String(bytes, 0, length) : new String(bytes, 0, length, "GBK"));
}
} catch (Exception e) {
log.error("读取录像日志异常", e);
}
});
this.inputThread.setDaemon(true);
this.inputThread.setName("TaoyaoRecordInput");
this.inputThread.start();
this.errorThread = new Thread(() -> {
try (final InputStream inputStream = this.process.getErrorStream();) {
int length;
final byte[] bytes = new byte[1024];
while(this.running && !this.close && (length = inputStream.read(bytes)) >= 0) {
error.append(linux ? new String(bytes, 0, length) : new String(bytes, 0, length, "GBK"));
}
} catch (Exception e) {
log.error("读取录像错误异常", e);
}
});
this.errorThread.setDaemon(true);
this.errorThread.setName("TaoyaoRecordError");
this.errorThread.start();
status = this.process.waitFor();
录像端口{} - {}
""", this.folder, this.audioPort, this.videoPort);
this.scriptExecutor.execute();
} catch (Exception e) {
log.error("录像异常:{}", this.command, e);
log.error("录像异常:{}", recordScript, e);
} finally {
this.stop();
}
log.debug("""
结束录像:{}
结束状态:{}
录像日志:{}
异常日志:{}
""", this.filepath, status, input, error);
}
/**
@@ -189,9 +167,20 @@ public class Recorder {
*/
private void buildSdpfile() {
try {
this.audioPort = NetUtils.scanPort(this.ffmpegProperties.getMinPort(), this.ffmpegProperties.getMaxPort());
// 预留控制端口
this.videoPort = NetUtils.scanPort(this.audioPort + 16, this.ffmpegProperties.getMaxPort());
final String sdp = String.format(
this.ffmpegProperties.getSdp(),
this.ffmpegProperties.getHost(),
this.audioPort,
this.ffmpegProperties.getHost(),
this.videoPort,
this.ffmpegProperties.getHost()
);
Files.write(
Paths.get(this.sdpfile),
String.format(this.ffmpegProperties.getSdp(), 8888, 9999).getBytes(),
sdp.getBytes(),
StandardOpenOption.WRITE, StandardOpenOption.CREATE
);
} catch (IOException e) {
@@ -199,6 +188,41 @@ public class Recorder {
}
}
/**
* 视频预览截图
*/
private void preview() {
int time = 2;
final File file = Paths.get(this.preview).toFile();
while(time > 0 && !(file.exists() && file.length() > 0L)) {
log.debug("视频预览截图:{}", this.preview);
final String previewScript = String.format(this.ffmpegProperties.getPreview(), this.filepath, time, this.preview);
ScriptUtils.execute(previewScript);
time /= 2;
}
}
/**
* 视频时长
*/
private void duration() {
log.debug("视频时长:{}", this.filepath);
final String durationScript = String.format(this.ffmpegProperties.getDuration(), this.filepath);
final ScriptExecutor executor = ScriptUtils.execute(durationScript);
final Pattern pattern = Pattern.compile(".*duration\\=([0-9\\.]+).*");
final Matcher matcher = pattern.matcher(executor.getResult());
String duration = null;
if(matcher.find()) {
duration = matcher.group(matcher.groupCount()).strip();
}
if(NumberUtils.isCreatable(duration)) {
this.duration = Double.parseDouble(duration);
} else {
this.duration = 0D;
}
}
/**
* 结束录像
*/
@@ -209,16 +233,13 @@ public class Recorder {
}
this.close = true;
}
if(this.process == null) {
if(this.scriptExecutor == null) {
return;
}
log.debug("结束媒体录像:{}", this.filepath);
// 所有子进程
this.process.children().forEach(process -> {
process.destroy();
});
// 当前父进程
this.process.destroy();
log.debug("结束媒体录像:{}", this.folder);
this.scriptExecutor.stop("q");
this.preview();
this.duration();
}
}

View File

@@ -1,14 +1,19 @@
package com.acgist.taoyao.signal.protocol.media;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.config.FfmpegProperties;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.Kind;
import com.acgist.taoyao.signal.party.media.Recorder;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
@@ -47,7 +52,15 @@ public class MediaRecordProtocol extends ProtocolRoomAdapter {
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final Boolean enabled = MapUtils.get(body, Constant.ENABLED, Boolean.TRUE);
String filepath;
if(enabled) {
filepath = this.start(room, client, mediaClient);
} else {
filepath = this.stop(room, client, mediaClient);
}
body.put(Constant.FILEPATH, filepath);
client.push(message);
}
/**
@@ -60,25 +73,103 @@ public class MediaRecordProtocol extends ProtocolRoomAdapter {
public Message execute(String roomId, String clientId, Boolean enabled) {
final Room room = this.roomManager.room(roomId);
final Client client = this.clientManager.clients(clientId);
final Client mediaClient = room.getMediaClient();
String filepath;
if(enabled) {
this.record(room, client);
filepath = this.start(room, client, mediaClient);
} else {
filepath = this.stop(room, client, mediaClient);
}
return null;
return Message.success(Map.of(
Constant.ENABLED, enabled,
Constant.FILEPATH, filepath
));
}
/**
* 开始录制
*
* @param room 房间
* @param client 终端
* @param mediaClient 媒体终端
*
* @return 文件地址
*/
private void record(Room room, Client client) {
private String start(Room room, Client client, Client mediaClient) {
final ClientWrapper clientWrapper = room.clientWrapper(client);
synchronized (clientWrapper) {
if(clientWrapper.getRecorder() != null) {
return;
final Recorder recorder = clientWrapper.getRecorder();
if(recorder != null) {
return recorder.getFilepath();
}
final Recorder recorder = new Recorder(this.ffmpegProperties);
recorder.start();
clientWrapper.setRecorder(recorder);
}
// 打开录制线程
final Recorder recorder = new Recorder(UUID.randomUUID().toString(), this.ffmpegProperties);
recorder.start();
clientWrapper.setRecorder(recorder);
// 打开媒体录制
final Message message = this.build();
final Map<String, Object> body = new HashMap<>();
body.put("audioPort", recorder.getAudioPort());
body.put("videoPort", recorder.getVideoPort());
body.put(Constant.HOST, this.ffmpegProperties.getHost());
body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.ENABLED, true);
body.put(Constant.CLIENT_ID, client.clientId());
body.put(Constant.RTP_CAPABILITIES, clientWrapper.getRtpCapabilities());
clientWrapper.getProducers().values().forEach(producer -> {
if(producer.getKind() == Kind.AUDIO) {
recorder.setAudioStreamId(Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), client.clientId()));
body.put("audioStreamId", recorder.getAudioStreamId());
body.put("audioProducerId", producer.getProducerId());
} else if(producer.getKind() == Kind.VIDEO) {
recorder.setAudioStreamId(Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), client.clientId()));
body.put("videoStreamId", recorder.getVideoStreamId());
body.put("videoProducerId", producer.getProducerId());
} else {
// 忽略
}
});
message.setBody(body);
mediaClient.request(message);
return recorder.getFilepath();
}
/**
* 关闭录像
*
* @param room 房间
* @param client 终端
* @param mediaClient 媒体终端
*
* @return 文件地址
*/
private String stop(Room room, Client client, Client mediaClient) {
final Recorder recorder;
final ClientWrapper clientWrapper = room.clientWrapper(client);
synchronized (clientWrapper) {
recorder = clientWrapper.getRecorder();
if(recorder == null) {
return null;
}
}
// 关闭录制线程
recorder.stop();
clientWrapper.setRecorder(null);
// 关闭媒体录制
final Message message = this.build();
final Map<String, Object> body = new HashMap<>();
body.put("audioStreamId", recorder.getAudioStreamId());
body.put("videoStreamId", recorder.getVideoStreamId());
body.put("audioConsumerId", recorder.getAudioConsumerId());
body.put("videoConsumerId", recorder.getVideoConsumerId());
body.put("audioTransportId", recorder.getAudioTransportId());
body.put("videoTransportId", recorder.getVideoConsumerId());
body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.ENABLED, false);
message.setBody(body);
mediaClient.request(message);
return recorder.getFilepath();
}
}

View File

@@ -7,6 +7,7 @@ import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.boot.utils.ScriptUtils;
import com.acgist.taoyao.boot.utils.ScriptUtils.ScriptExecutor;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter;
@@ -45,7 +46,8 @@ public class PlatformScriptProtocol extends ProtocolClientAdapter {
@Override
public void execute(String clientId, ClientType clientType, Client client, Message message, Map<String, Object> body) {
final String script = MapUtils.get(body, Constant.SCRIPT);
final String result = ScriptUtils.execute(script);
final ScriptExecutor executor = ScriptUtils.execute(script);
final String result = executor.getResult();
log.info("""
执行终端:{}
执行命令:{}