[*] 踢人 拉人 切换视频媒体

This commit is contained in:
acgist
2023-03-12 12:29:53 +08:00
parent 1bf8fbe415
commit ab8646a21f
39 changed files with 1779 additions and 352 deletions

View File

@@ -28,6 +28,7 @@ https://www.cnblogs.com/ssyfj/p/14843082.html
https://zhuanlan.zhihu.com/p/466172240
http://koca.szkingdom.com/forum/t/topic/218
https://blog.csdn.net/qw225967/article/details/121251305
http://www.manoner.com/post/音视频基础/WebRTC核心组件和协议栈/
https://blog.csdn.net/ababab12345/article/details/115585378
https://blog.csdn.net/jisuanji111111/article/details/121634199

View File

@@ -82,6 +82,8 @@ async function main() {
console.log(`
桃之夭夭,灼灼其华。
之子于归,宜其室家。
:: https://gitee.com/acgist/taoyao
`);
console.info("开始启动:", config.name);
await buildMediasoupWorkers();

View File

@@ -1,4 +1,4 @@
const config = require("./Config");
const config = require("./Config.js");
const process = require("child_process");
const WebSocket = require("ws");
@@ -33,13 +33,14 @@ const protocol = {
* @param {*} signal 信令标识
* @param {*} body 消息主体
* @param {*} id 消息ID
* @param {*} v 消息版本
*
* @returns 信令消息
*/
buildMessage(signal, body = {}, id) {
buildMessage(signal, body = {}, id, v) {
const message = {
header: {
v: config.signal.version,
v: v || config.signal.version,
id: id || this.buildId(),
signal: signal,
},
@@ -49,6 +50,11 @@ const protocol = {
},
};
/**
* 名称冲突
*/
const taoyaoProtocol = protocol;
/**
* 信令通道
*/
@@ -404,9 +410,30 @@ class Taoyao {
case "media::consumer::pause":
me.mediaConsumerPause(message, body);
break;
case "media::consumer::request::key::frame":
me.mediaConsumerRequestKeyFrame(message, body);
break;
case "media::consumer::resume":
me.mediaConsumerResume(message, body);
break;
case "media::consumer::set::preferred::layers":
me.mediaConsumerSetPreferredLayers(message, body);
break;
case "media::data::consume":
me.mediaDataConsume(message, body);
break;
case "media::data::consumer::close":
me.mediaDataConsumerClose(message, body);
break;
case "media::data::produce":
me.mediaDataProduce(message, body);
break;
case "media::data::producer::close":
me.mediaDataProducerClose(message, body);
break;
case "media::ice::restart":
me.mediaIceRestart(message, body);
break;
case "media::produce":
me.mediaProduce(message, body);
break;
@@ -576,6 +603,7 @@ class Taoyao {
kind,
appData,
rtpParameters,
// 关键帧延迟时间
// keyFrameRequestDelay: 5000
});
producer.clientId = clientId;
@@ -597,10 +625,15 @@ class Taoyao {
});
producer.on("videoorientationchange", (videoOrientation) => {
console.info("producer videoorientationchange", producer.id, videoOrientation);
self.push(protocol.buildMessage("media::video::orientation::change", {
roomId: roomId,
...videoOrientation
}));
});
producer.on("trace", (trace) => {
console.info("producer trace", producer.id, trace);
});
// await producer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
// producer.on("trace", (trace) => {
// console.info("producer trace", producer.id, trace);
// });
producer.observer.on("close", () => {
if(room.producers.delete(producer.id)) {
console.info("producer close", producer.id);
@@ -635,7 +668,7 @@ class Taoyao {
// producer.observer.on("score", fn(score));
// producer.observer.on("videoorientationchange", fn(videoOrientation));
// producer.observer.on("trace", fn(trace));
message.body = { kind: kind, producerId: producer.id };
message.body = { kind: kind, roomId: roomId, producerId: producer.id };
this.push(message);
if (producer.kind === "audio") {
room.audioLevelObserver
@@ -725,8 +758,8 @@ class Taoyao {
rtpCapabilities,
} = body;
const room = this.rooms.get(roomId);
const producer = room.producers.get(producerId);
const transport = room.transports.get(transportId);
const producer = room?.producers.get(producerId);
const transport = room?.transports.get(transportId);
if (
!room ||
!producer ||
@@ -822,9 +855,10 @@ class Taoyao {
})
);
});
consumer.on("trace", (trace) => {
console.info("consumer trace", consumer.id, trace);
});
// await consumer.enableTraceEvent([ 'rtp', 'keyframe', 'nack', 'pli', 'fir' ]);
// consumer.on("trace", (trace) => {
// console.info("consumer trace", consumer.id, trace);
// });
// consumer.on("rtp", (rtpPacket) => {
// console.info("consumer rtp", consumer.id, rtpPacket);
// });
@@ -863,18 +897,19 @@ class Taoyao {
// consumer.observer.on("layerschange", fn(layers));
// consumer.observer.on("trace", fn(trace));
// 等待终端准备就绪
this.request(
await this.request(
// this.push(
protocol.buildMessage("media::consume", {
kind: consumer.kind,
type: consumer.type,
roomId: roomId,
appData: producer.appData,
clientId: clientId,
sourceId: sourceId,
streamId: streamId,
producerId: producerId,
consumerId: consumer.id,
rtpParameters: consumer.rtpParameters,
appData: producer.appData,
producerPaused: consumer.producerPaused,
})
);
@@ -924,7 +959,7 @@ class Taoyao {
async mediaConsumerPause(message, body) {
const { roomId, consumerId } = body;
const room = this.rooms.get(roomId);
const consumer = room.consumers.get(consumerId);
const consumer = room?.consumers.get(consumerId);
if(consumer) {
console.info("暂停消费者:", consumerId);
await consumer.pause();
@@ -933,6 +968,27 @@ class Taoyao {
}
}
/**
* 请求关键帧信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaConsumerRequestKeyFrame(message, body) {
const me = this;
const { roomId, consumerId } = body;
const room = this.rooms.get(roomId);
const consumer = room?.consumers.get(consumerId);
if(consumer) {
console.info("mediaConsumerRequestKeyFrame", consumerId);
// 处理trace监听读取关键帧
await consumer.requestKeyFrame();
me.push(message);
} else {
console.info("mediaConsumerRequestKeyFrame non", consumerId);
}
}
/**
* 恢复消费者信令
*
@@ -951,6 +1007,220 @@ class Taoyao {
}
}
/**
* 修改最佳空间层和时间层信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaConsumerSetPreferredLayers(message, body) {
const me = this;
const { roomId, consumerId, spatialLayer, temporalLayer } = body;
const room = this.rooms.get(roomId);
const consumer = room?.consumers.get(consumerId);
if(consumer) {
console.info("mediaConsumerSetPreferredLayers", consumerId);
await consumer.setPreferredLayers({ spatialLayer, temporalLayer });
me.push(message);
} else {
console.info("mediaConsumerSetPreferredLayers non", consumerId);
}
}
/**
* 消费数据信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaDataConsume(message, body) {
const me = this;
const {
roomId,
clientId,
sourceId,
streamId,
producerId,
transportId,
rtpCapabilities,
} = body;
const room = this.rooms.get(roomId);
const transport = room?.transports.get(transportId);
const dataProducer = room?.dataProducers.get(producerId);
if (
!room ||
!transport ||
!dataProducer
) {
console.warn(
"不能消费数据:",
roomId,
clientId,
producerId,
transportId
);
return;
}
let dataConsumer;
try {
dataConsumer = await transport.consumeData({
dataProducerId : dataProducer.id
});
} catch (error) {
console.error("消费数据异常:", producerId, error);
return;
}
dataConsumer.clientId = clientId;
dataConsumer.streamId = streamId;
room.dataConsumers.set(dataConsumer.id, dataConsumer);
console.info("创建数据消费者:", dataProducer.id);
dataConsumer.on('transportclose', () => {
console.info("dataConsumer transportclose", dataConsumer.id);
dataConsumer.close();
});
dataConsumer.on('dataproducerclose', () => {
console.info("dataConsumer dataproducerclose", dataConsumer.id);
dataConsumer.close();
});
dataConsumer.observer.on("close", () => {
if(room.dataConsumers.delete(dataConsumer.id)) {
console.info("dataConsumer close", dataConsumer.id);
me.push(
protocol.buildMessage("media::data::consumer::close", {
roomId: roomId,
consumerId: dataConsumer.id,
})
);
} else {
console.info("dataConsumer close non", dataConsumer.id);
}
});
// dataConsumer.on("message", fn(message, ppid));
// dataConsumer.on("bufferedamountlow", fn(bufferedAmount));
// dataConsumer.on("sctpsendbufferfull", fn());
this.push(
protocol.buildMessage("media::data::consume", {
label: dataConsumer.label,
roomId: roomId,
appData: dataProducer.appData,
protocol: dataConsumer.protocol,
clientId: clientId,
sourceId: sourceId,
streamId: streamId,
producerId: producerId,
consumerId: dataConsumer.id,
sctpStreamParameters: dataConsumer.sctpStreamParameters,
})
);
}
/**
* 关闭数据消费者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaDataConsumerClose(message, body) {
const { roomId, consumerId } = body;
const room = this.rooms.get(roomId);
const dataConsumer = room?.dataConsumers.get(consumerId);
if(dataConsumer) {
console.info("关闭数据消费者:", consumerId);
await dataConsumer.close();
} else {
console.info("关闭数据消费者无效:", consumerId);
}
}
/**
* 生产数据信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaDataProduce(message, body) {
const me = this;
const {
label,
roomId,
appData,
clientId,
streamId,
protocol,
transportId,
sctpStreamParameters,
} = body;
const room = me.rooms.get(roomId);
const transport = room?.transports.get(transportId);
if(!transport) {
console.warn("生产数据生产者通道无效:", transportId);
return;
}
const dataProducer = await transport.produceData({
label,
appData,
protocol,
sctpStreamParameters,
});
dataProducer.clientId = clientId;
dataProducer.streamId = streamId;
room.dataProducers.set(dataProducer.id, dataProducer);
console.info("创建数据生产者:", dataProducer.id);
dataProducer.on("transportclose", () => {
console.info("dataProducer transportclose", dataProducer.id);
dataProducer.close();
});
dataProducer.observer.on("close", () => {
if(room.dataProducers.delete(dataProducer.id)) {
console.info("dataProducer close", dataProducer.id);
me.push(
taoyaoProtocol.buildMessage("media::data::producer::close", {
roomId: roomId,
producerId: dataProducer.id,
})
);
} else {
console.info("dataProducer close non", dataProducer.id);
}
})
message.body = { roomId: roomId, producerId: dataProducer.id };
this.push(message);
}
/**
* 关闭数据生产者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaDataProducerClose(message, body) {
const { roomId, producerId } = body;
const room = this.rooms.get(roomId);
const dataProducer = room?.dataProducers.get(producerId);
if(dataProducer) {
console.info("关闭数据生产者:", producerId);
await dataProducer.close();
} else {
console.info("关闭数据生产者无效:", producerId);
}
}
/**
* 重启ICE信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
async mediaIceRestart(message, body) {
const me = this;
const { roomId, transportId } = body;
const room = this.rooms.get(roomId);
const transport = room?.transports.get(transportId);
const iceParameters = await transport.restartIce();
message.body.iceParameters = iceParameters;
me.push(message);
}
/**
* 路由RTP协商信令
*
@@ -973,7 +1243,7 @@ class Taoyao {
async mediaTransportClose(message, body) {
const { roomId, transportId } = body;
const room = this.rooms.get(roomId);
const transport = room.transports.get(transportId);
const transport = room?.transports.get(transportId);
if(transport) {
console.info("关闭传输通道:", transportId);
transport.close();
@@ -991,10 +1261,15 @@ class Taoyao {
async mediaTransportWebrtcConnect(message, body) {
const { roomId, transportId, dtlsParameters } = body;
const room = this.rooms.get(roomId);
const transport = room.transports.get(transportId);
const transport = room?.transports.get(transportId);
if(transport) {
console.info("连接WebRTC通道", transportId);
await transport.connect({ dtlsParameters });
message.body = { roomId: roomId, transportId: transport.id };
this.push(message);
} else {
console.info("连接WebRTC通道无效", transportId);
}
}
/**
@@ -1038,11 +1313,10 @@ class Taoyao {
console.info("transport listenserverclose", transport.id);
transport.close();
});
await transport.enableTraceEvent(["bwe"]);
// await transport.enableTraceEvent([ 'probation', 'bwe' ]);
transport.on("trace", (trace) => {
console.debug("transport trace", transport.id, trace);
});
// transport.on("trace", (trace) => {
// console.debug("transport trace", transport.id, trace);
// });
transport.observer.on("close", () => {
if(room.transports.delete(transport.id)) {
console.info("transport close", transport.id);
@@ -1100,6 +1374,7 @@ class Taoyao {
// transport.on("rtcp", fn(rtcpPacket));
room.transports.set(transport.id, transport);
message.body = {
roomId: roomId,
transportId: transport.id,
iceCandidates: transport.iceCandidates,
iceParameters: transport.iceParameters,

View File

@@ -3,7 +3,7 @@
<div id="taoyao">
<!-- 信令 -->
<el-dialog center width="30%" title="终端设置" :show-close="false" v-model="signalVisible">
<el-form ref="SignalSetting" :model="config">
<el-form ref="SignalSetting">
<el-form-item label="终端标识">
<el-input v-model="config.clientId" placeholder="终端标识" />
</el-form-item>
@@ -29,7 +29,7 @@
</el-dialog>
<!-- 房间 -->
<el-dialog center width="30%" title="房间设置" :show-close="false" v-model="roomVisible" @open="loadList">
<el-form ref="RoomSetting" :model="room">
<el-form ref="RoomSetting">
<el-tabs v-model="roomActive">
<el-tab-pane label="进入房间" name="enter">
<el-form-item label="房间标识">
@@ -48,6 +48,13 @@
<el-input v-model="room.name" placeholder="房间名称" />
</el-form-item>
</el-tab-pane>
<el-tab-pane label="邀请终端" name="invite">
<el-form-item label="终端标识">
<el-select v-model="room.inviteClientId" placeholder="终端标识">
<el-option v-for="value in clients" :key="value.clientId" :label="value.name || value.clientId" :value="value.clientId" />
</el-select>
</el-form-item>
</el-tab-pane>
</el-tabs>
<el-form-item label="房间密码">
<el-input v-model="room.password" placeholder="房间密码" />
@@ -56,23 +63,24 @@
<template #footer>
<el-button type="primary" @click="roomEnter" v-if="roomActive === 'enter'">进入</el-button>
<el-button type="primary" @click="roomCreate" v-if="roomActive === 'create'">创建</el-button>
<el-button type="primary" @click="roomInvite" v-if="roomActive === 'invite'">邀请</el-button>
</template>
</el-dialog>
<!-- 菜单 -->
<div class="menus">
<el-button type="primary" :disabled="taoyao && taoyao.connect" @click="signalVisible = true">连接信令</el-button>
<el-button type="primary" :disabled="!taoyao" @click="roomActive = 'enter';roomVisible = true;">选择房间</el-button>
<el-button type="primary" :disabled="!taoyao" @click="roomActive = 'create';roomVisible = true;">创建房间</el-button>
<el-button :disabled="!taoyao || !room.roomId">邀请终端</el-button>
<el-button :disabled="!taoyao || !room.roomId">离开房间</el-button>
<el-button :disabled="!taoyao || !room.roomId" @click="roomClose()" type="danger">关闭房间</el-button>
<el-button @click="signalVisible = true" type="primary" :disabled="taoyao && taoyao.connect">连接信令</el-button>
<el-button @click="roomActive = 'enter'; roomVisible = true;" type="primary" :disabled="!taoyao">选择房间</el-button>
<el-button @click="roomActive = 'create'; roomVisible = true;" type="primary" :disabled="!taoyao">创建房间</el-button>
<el-button @click="roomActive = 'invite'; roomVisible = true;" :disabled="!taoyao || !taoyao.roomId">邀请终端</el-button>
<el-button @click="roomLeave" :disabled="!taoyao || !taoyao.roomId">离开房间</el-button>
<el-button @click="roomClose" :disabled="!taoyao || !taoyao.roomId" type="danger">关闭房间</el-button>
</div>
<!-- 终端 -->
<div class="clients">
<!-- 本地终端 -->
<LocalClient v-if="taoyao" ref="local-client" :client="taoyao" :taoyao="taoyao"></LocalClient>
<LocalClient v-if="taoyao && taoyao.roomId" ref="local-client" :client="taoyao" :taoyao="taoyao"></LocalClient>
<!-- 远程终端 -->
<RemoteClient v-for="(kv, index) in remoteClients" :key="index" :ref="'remote-client-' + kv[0]" :client="kv[1]" :taoyao="taoyao"></RemoteClient>
</div>
@@ -92,6 +100,7 @@ export default {
room: {},
rooms: null,
medias: null,
clients: null,
config: {
clientId: "taoyao",
name: "taoyao",
@@ -111,6 +120,8 @@ export default {
console.info(`
中庭地白树栖鸦,冷露无声湿桂花。
今夜月明人尽望,不知秋思落谁家。
:: https://gitee.com/acgist/taoyao
`);
},
methods: {
@@ -126,22 +137,27 @@ export default {
async loadList() {
this.rooms = await this.taoyao.roomList();
this.medias = await this.taoyao.mediaList();
this.clients = await this.taoyao.clientList();
},
async roomLeave() {
this.taoyao.roomLeave();
},
async roomClose() {
this.taoyao.roomClose();
},
async roomCreate() {
const room = await this.taoyao.roomCreate(this.room);
this.room.roomId = room.roomId;
await this.roomEnter();
},
async roomEnter() {
await this.taoyao.roomEnter(this.room.roomId, this.room.password);
await this.taoyao.produceMedia();
this.roomVisible = false;
},
audioVolume(message) {
async roomCreate() {
const room = await this.taoyao.roomCreate(this.room);
this.room.roomId = room.roomId;
await this.roomEnter();
},
async roomInvite() {
this.taoyao.roomInvite(this.room.inviteClientId);
this.roomVisible = false;
},
/**
* 信令回调
@@ -157,9 +173,6 @@ export default {
case "client::config":
me.roomVisible = true;
break;
case "media::audio::active::speaker":
me.audioVolume(message);
break;
case "platform::error":
if (error) {
console.error("发生异常:", message, error);

View File

@@ -5,11 +5,11 @@
<video ref="video"></video>
<p class="title">{{ client?.name || "" }}</p>
<div class="buttons" :style="{'--volume': client?.volume}">
<el-button v-show="!client.audioActive" type="primary" title="打开麦克风" :icon="Microphone" circle />
<el-button v-show="client.audioActive" type="danger" title="关闭麦克风" :icon="Mute" circle />
<el-button v-show="!client.videoActive" type="primary" title="打开摄像头" :icon="VideoPlay" circle />
<el-button v-show="client.videoActive" type="danger" title="关闭摄像头" :icon="VideoPause" circle />
<el-button title="交换媒体" :icon="Refresh" circle />
<el-button @click="taoyao.mediaProducerResume(audioProducer.id)" v-show="audioProducer && audioProducer.paused" type="primary" title="打开麦克风" :icon="Microphone" circle />
<el-button @click="taoyao.mediaProducerPause(audioProducer.id)" v-show="audioProducer && !audioProducer.paused" type="danger" title="关闭麦克风" :icon="Mute" circle />
<el-button @click="taoyao.mediaProducerResume(videoProducer.id)" v-show="videoProducer && videoProducer.paused" type="primary" title="打开摄像头" :icon="VideoPlay" circle />
<el-button @click="taoyao.mediaProducerPause(videoProducer.id)" v-show="videoProducer && !videoProducer.paused" type="danger" title="关闭摄像头" :icon="VideoPause" circle />
<el-button @click="exchangeVideoSource" title="交换媒体" :icon="Refresh" circle />
<el-button title="拍照" :icon="Camera" circle />
<el-button title="录像" :icon="VideoCamera" circle />
<el-button title="媒体信息" :icon="InfoFilled" circle />
@@ -104,15 +104,26 @@ export default {
}
},
methods: {
media(track) {
if (track.kind === "video") {
exchangeVideoSource() {
// TODO文件支持
this.taoyao.videoSource = this.taoyao.videoSource === "camera" ? "screen" : "camera";
this.taoyao.updateVideoProducer();
},
media(track, producer) {
if(track.kind === "audio") {
// 不用加载音频
this.audioProducer = producer;
} else if (track.kind === "video") {
this.videoProducer = producer;
if (this.videoStream) {
// TODO资源释放
} else {
this.videoStream.getVideoTracks().forEach(oldTrack => {
console.debug("关闭旧的媒体:", oldTrack);
oldTrack.stop();
});
}
this.videoStream = new MediaStream();
this.videoStream.addTrack(track);
this.video.srcObject = this.videoStream;
}
this.video.play().catch((error) => console.warn("视频播放失败", error));
} else {
console.debug("本地不支持的媒体类型:", track);

View File

@@ -5,14 +5,14 @@
<video ref="video"></video>
<p class="title">{{ client?.name || "" }}</p>
<div class="buttons" :style="{'--volume': client?.volume}">
<el-button v-show="!client.audioActive" type="primary" title="打开麦克风" :icon="Microphone" circle />
<el-button v-show="client.audioActive" type="danger" title="关闭麦克风" :icon="Mute" circle />
<el-button v-show="!client.videoActive" type="primary" title="打开摄像头" :icon="VideoPlay" circle />
<el-button v-show="client.videoActive" type="danger" title="关闭摄像头" :icon="VideoPause" circle />
<el-button @click="taoyao.mediaConsumerResume(audioConsumer.id)" v-show="audioConsumer && audioConsumer.paused" type="primary" title="打开麦克风" :icon="Microphone" circle />
<el-button @click="taoyao.mediaConsumerPause(audioConsumer.id)" v-show="audioConsumer && !audioConsumer.paused" type="danger" title="关闭麦克风" :icon="Mute" circle />
<el-button @click="taoyao.mediaConsumerResume(videoConsumer.id)" v-show="videoConsumer && videoConsumer.paused" type="primary" title="打开摄像头" :icon="VideoPlay" circle />
<el-button @click="taoyao.mediaConsumerPause(videoConsumer.id)" v-show="videoConsumer && !videoConsumer.paused" type="danger" title="关闭摄像头" :icon="VideoPause" circle />
<el-button title="拍照" :icon="Camera" circle />
<el-button title="录像" :icon="VideoCamera" circle />
<el-button title="媒体信息" :icon="InfoFilled" circle />
<el-button title="踢出" :icon="CircleClose" circle />
<el-button @click="roomExpel" title="踢出" :icon="CircleClose" circle />
</div>
</div>
</template>
@@ -69,8 +69,12 @@ export default {
}
},
methods: {
roomExpel() {
this.taoyao.roomExpel(this.client.clientId);
},
media(track, consumer) {
if(track.kind === 'audio') {
this.audioConsumer = consumer;
if (this.audioStream) {
// TODO资源释放
} else {
@@ -80,6 +84,7 @@ export default {
}
this.audio.play().catch((error) => console.warn("视频播放失败", error));
} else if(track.kind === 'video') {
this.videoConsumer = consumer;
if (this.videoStream) {
// TODO资源释放
} else {

File diff suppressed because it is too large Load Diff

View File

@@ -14,7 +14,6 @@ import java.lang.annotation.Target;
* =[消息类型]> 同步请求
* -[消息类型]) 全员广播:对所有的终端广播信令(排除自己)
* +[消息类型]) 全员广播:对所有的终端广播信令(包含自己)
* ...:其他自定义的透传内容
*
* @author acgist
*/
@@ -30,9 +29,6 @@ public @interface Description {
String[] body() default { "{}" };
/**
* 同步:需要等待服务端数据时使用
* 异步:不用等待服务端数据时使用(服务端能主动通知类型消息都能使用异步)
*
* @return 数据流向
*/
String[] flow() default { "终端->信令服务->终端" };

View File

@@ -1,25 +0,0 @@
package com.acgist.taoyao.boot.annotation;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
/**
* 模板:多例对象
*
* @author acgist
*/
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Target(ElementType.TYPE)
@Component
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Prototype {
}

View File

@@ -1,5 +1,7 @@
package com.acgist.taoyao.boot.config;
import java.util.function.BiFunction;
/**
* 常量
*
@@ -107,6 +109,10 @@ public interface Constant {
* 密码
*/
String PASSWORD = "password";
/**
* 数据
*/
String DATA = "data";
/**
* 名称
*/
@@ -228,4 +234,14 @@ public interface Constant {
*/
String SUBSCRIBE_TYPE = "subscribeType";
/**
* 生产者ID生成器
*/
public static final BiFunction<String, String, String> STREAM_ID_PRODUCER = (type, producerId) -> type + "::" + producerId;
/**
* 消费者ID生成器
*/
public static final BiFunction<String, String, String> STREAM_ID_CONSUMER = (producerStreamId, consumerId) -> producerStreamId + "->" + consumerId;
}

View File

@@ -203,6 +203,8 @@ taoyao:
password: taoyao
# 定时任务
scheduled:
# 清理房间无效资源
room: 0 0/5 * * * ?
# 清理无效终端连接
client: 0 * * * * ?
# 地址重写

View File

@@ -58,7 +58,7 @@ public class DataConsumer extends OperatorAdapter {
@Override
public void remove() {
log.info("移除数据消费者:{} - {}", this.streamId, this.consumerId);
this.room.getDataProducers().remove(this.consumerId);
this.room.getDataConsumers().remove(this.consumerId);
this.dataProducer.getDataConsumers().remove(this.consumerId);
this.consumerClient.getDataConsumers().remove(this.consumerId);
}

View File

@@ -170,6 +170,18 @@ public class Room extends OperatorAdapter {
return this.mediaClient.request(message);
}
/**
* 单播消息
*
* @param to 接收终端
* @param message 消息
*/
public void unicast(String to, Message message) {
this.clients.keySet().stream()
.filter(v -> Objects.equals(to, v.clientId()))
.forEach(v -> v.push(message));
}
/**
* 广播消息
*
@@ -306,4 +318,15 @@ public class Room extends OperatorAdapter {
this.clients.values().forEach(ClientWrapper::log);
}
/**
* 清理没有关联终端的资源
*/
public void releaseUnknowClient() {
this.transports.values().stream().filter(v -> !this.clients.containsKey(v.getClient())).forEach(Transport::close);
this.consumers.values().stream().filter(v -> !this.clients.containsValue(v.getConsumerClient())).forEach(Consumer::close);
this.producers.values().stream().filter(v -> !this.clients.containsValue(v.getProducerClient())).forEach(Producer::close);
this.dataConsumers.values().stream().filter(v -> !this.clients.containsValue(v.getConsumerClient())).forEach(DataConsumer::close);
this.dataProducers.values().stream().filter(v -> !this.clients.containsValue(v.getProducerClient())).forEach(DataProducer::close);
}
}

View File

@@ -5,6 +5,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.springframework.scheduling.annotation.Scheduled;
import com.acgist.taoyao.boot.annotation.Manager;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
@@ -38,6 +40,11 @@ public class RoomManager {
this.rooms = new CopyOnWriteArrayList<>();
}
@Scheduled(cron = "${taoyao.scheduled.room:0 0/5 * * * ?}")
public void scheduled() {
this.releaseUnknowClient();
}
/**
* @param roomId 房间标识
*
@@ -162,4 +169,11 @@ public class RoomManager {
this.rooms.forEach(Room::log);
}
/**
* 清理没有关联终端的资源
*/
private void releaseUnknowClient() {
this.rooms.forEach(Room::releaseUnknowClient);
}
}

View File

@@ -31,10 +31,8 @@ public abstract class ProtocolRoomAdapter extends ProtocolClientAdapter {
if(!this.authenticate(room, client)) {
throw MessageCodeException.of("终端没有房间权限:" + clientId);
}
synchronized (room) {
this.execute(clientId, clientType, room, client, room.getMediaClient(), message, body);
}
}
/**
* @param room 房间

View File

@@ -37,6 +37,12 @@ import lombok.extern.slf4j.Slf4j;
终端生产媒体当前房间所有终端根据订阅类型自动消费媒体
终端创建WebRTC消费通道根据订阅类型自动消费当前房间已有媒体
""",
body = """
{
"roomId": "房间ID"
"producerId": "生产者ID"
}
""",
flow = {
"终端-[生产媒体]>信令服务-[其他终端消费])信令服务",
"终端-[创建WebRTC消费通道]>信令服务-[消费其他终端])信令服务",
@@ -118,7 +124,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) {
final Client mediaClient = room.getMediaClient();
final String consumerClientId = consumerClientWrapper.getClientId();
final String streamId = producer.getStreamId() + "->" + consumerClientId;
final String streamId = Constant.STREAM_ID_CONSUMER.apply(producer.getStreamId(), consumerClientId);
final ClientWrapper producerClientWrapper = producer.getProducerClient();
final String producerClientId = producerClientWrapper.getClientId();
if(consumerClientWrapper.consumed(producer)) {

View File

@@ -69,6 +69,7 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A
if(clientType.mediaClient()) {
consumer.close();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
consumer.remove();
room.broadcast(message);
} else {

View File

@@ -59,6 +59,7 @@ public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter implements A
final Consumer consumer = room.consumer(consumerId);
consumer.pause();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
room.broadcast(message);
} else {
this.logNoAdapter(clientType);

View File

@@ -1,5 +1,52 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaConsumerRequestKeyFrameProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 请求关键帧信令
* 注意:视频才有
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"consumerId": "消费者ID"
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
)
public class MediaConsumerRequestKeyFrameProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::consumer::request::key::frame";
public MediaConsumerRequestKeyFrameProtocol() {
super("请求关键帧信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
body.put(Constant.CLIENT_ID, clientId);
mediaClient.push(message);
} else if(clientType.mediaServer()) {
final String requestClientId = MapUtils.remove(body, Constant.CLIENT_ID);
room.unicast(requestClientId, message);
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -34,7 +34,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
)
public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaConsumerResumeEvent> {
public static final String SIGNAL = "media::consumer::resumt";
public static final String SIGNAL = "media::consumer::resume";
public MediaConsumerResumeProtocol() {
super("恢复消费者信令", SIGNAL);
@@ -57,8 +57,9 @@ public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter implements
if(clientType.mediaClient()) {
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final Consumer consumer = room.consumer(consumerId);
consumer.pause();
consumer.resume();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
room.broadcast(message);
} else {
this.logNoAdapter(clientType);

View File

@@ -1,5 +1,57 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaConsumerSetPreferredLayersProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 修改最佳空间层和时间层信令
* 空间层spatialLayer分辨率
* 时间层temporalLayer帧率
* 码率:数据大小和时间的比值
* 注意只有simulcast和SVC消费者有效
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"consumerId": "消费者ID",
"spatialLayer": 最佳空间层,
"temporalLayer": 最佳时间层
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
)
public class MediaConsumerSetPreferredLayersProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::consumer::set::preferred::layers";
public MediaConsumerSetPreferredLayersProtocol() {
super("修改最佳空间层和时间层信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
body.put(Constant.CLIENT_ID, clientId);
mediaClient.push(message);
} else if(clientType.mediaServer()) {
final String requestClientId = MapUtils.remove(body, Constant.CLIENT_ID);
room.unicast(requestClientId, message);
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -1,5 +1,45 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaConsumerSetPriorityProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 设置消费者优先级信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"consumerId": "消费者ID",
"priority": 优先级1~255
}
"""
)
public class MediaConsumerSetPriorityProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::consumer::set::priority";
public MediaConsumerSetPriorityProtocol() {
super("设置消费者优先级信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
client.push(mediaClient.request(message));
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -1,5 +1,96 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaDataConsumeProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.DataConsumer;
import com.acgist.taoyao.signal.party.media.DataProducer;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.party.media.Transport;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 消费数据信令
*
* @author acgist
*/
@Slf4j
@Protocol
@Description(
memo = """
数据通道消费者不会自动创建,需要用户自己订阅生产者。
""",
body = """
{
"roomId": "房间ID"
"producerId": "生产者ID",
}
""",
flow = {
"终端=>信令服务->媒体服务->信令服务->媒体服务"
}
)
public class MediaDataConsumeProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::data::consume";
public MediaDataConsumeProtocol() {
super("消费数据信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String producerId = MapUtils.get(body, Constant.PRODUCER_ID);
final DataProducer dataProducer = room.dataProducer(producerId);
if(dataProducer == null) {
throw MessageCodeException.of("没有提供数据生产:" + producerId);
}
if(clientType.mediaClient()) {
final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(client);
final String dataConsumerClientId = dataConsumerClientWrapper.getClientId();
final ClientWrapper dataProducerClientWrapper = dataProducer.getProducerClient();
final String dataProducerClientId = dataProducerClientWrapper.getClientId();
final Transport recvTransport = dataConsumerClientWrapper.getRecvTransport();
final String streamId = Constant.STREAM_ID_CONSUMER.apply(dataProducer.getStreamId(), dataConsumerClientId);
body.put(Constant.ROOM_ID, room.getRoomId());
body.put(Constant.CLIENT_ID, dataConsumerClientId);
body.put(Constant.SOURCE_ID, dataProducerClientId);
body.put(Constant.STREAM_ID, streamId);
body.put(Constant.PRODUCER_ID, dataProducer.getProducerId());
body.put(Constant.TRANSPORT_ID, recvTransport.getTransportId());
body.put(Constant.RTP_CAPABILITIES, dataConsumerClientWrapper.getRtpCapabilities());
body.put(Constant.SCTP_CAPABILITIES, dataConsumerClientWrapper.getSctpCapabilities());
mediaClient.push(message);
} else if(clientType.mediaServer()) {
final String streamId = MapUtils.get(body, Constant.STREAM_ID);
final String consumerId = MapUtils.get(body, Constant.CONSUMER_ID);
final String dataConsumerClientId = MapUtils.get(body, Constant.CLIENT_ID);
final ClientWrapper dataConsumerClientWrapper = room.clientWrapper(dataConsumerClientId);
final Map<String, DataConsumer> roomDataConsumers = room.getDataConsumers();
final Map<String, DataConsumer> clientDataConsumers = dataConsumerClientWrapper.getDataConsumers();
final Map<String, DataConsumer> producerDataConsumers = dataProducer.getDataConsumers();
final DataConsumer dataConsumer = new DataConsumer(streamId, consumerId, room, dataProducer, dataConsumerClientWrapper);
final DataConsumer oldDataRoomConsumer = roomDataConsumers.put(consumerId, dataConsumer);
final DataConsumer oldDataClientConsumer = clientDataConsumers.put(consumerId, dataConsumer);
final DataConsumer oldDataProducerConsumer = producerDataConsumers.put(consumerId, dataConsumer);
if(oldDataRoomConsumer != null || oldDataClientConsumer != null || oldDataProducerConsumer != null) {
log.warn("消费者已经存在:{}", consumerId);
}
final Client consumeClient = dataConsumerClientWrapper.getClient();
consumeClient.push(message);
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -69,6 +69,7 @@ public class MediaDataConsumerCloseProtocol extends ProtocolRoomAdapter implemen
if(clientType.mediaClient()) {
dataConsumer.close();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
dataConsumer.remove();
room.broadcast(message);
} else {

View File

@@ -1,5 +1,45 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaDataConsumerStatusProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 查询数据消费者状态信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"consumerId": "数据消费者ID"
}
""",
flow = "终端=>信令服务->媒体服务->信令服务->终端"
)
public class MediaDataConsumerStatusProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::data::consumer::status";
public MediaDataConsumerStatusProtocol() {
super("查询数据消费者状态信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
client.push(mediaClient.request(message));
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -1,5 +1,75 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaDataProduceProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.DataProducer;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 生产数据信令
*
* @author acgist
*/
@Slf4j
@Protocol
@Description(
body = {
"""
{
"roomId": "房间标识",
"transportId": "通道标识"
}
"""
},
flow = "终端->信令服务->媒体服务->信令服务->终端"
)
public class MediaDataProduceProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::data::produce";
public MediaDataProduceProtocol() {
super("生产数据信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
final String streamId = Constant.STREAM_ID_PRODUCER.apply(Constant.DATA, clientId);
body.put(Constant.CLIENT_ID, clientId);
body.put(Constant.STREAM_ID, streamId);
final Message response = room.request(message);
final Map<String, Object> responseBody = response.body();
final String producerId = MapUtils.get(responseBody, Constant.PRODUCER_ID);
final ClientWrapper producerClientWrapper = room.clientWrapper(client);
final Map<String, DataProducer> roomDataProducers = room.getDataProducers();
final Map<String, DataProducer> clientDataProducers = producerClientWrapper.getDataProducers();
final DataProducer dataProducer = new DataProducer(streamId, producerId, room, producerClientWrapper);
final DataProducer oldRoomDataProducer = roomDataProducers.put(producerId, dataProducer);
final DataProducer oldClientDataProducer = clientDataProducers.put(producerId, dataProducer);
if(oldRoomDataProducer != null || oldClientDataProducer != null) {
log.warn("数据生产者已经存在:{}", producerId);
}
final Message responseMessage = response.cloneWithoutBody();
responseMessage.setBody(Map.of(
Constant.STREAM_ID, streamId,
Constant.PRODUCER_ID, producerId
));
room.broadcast(responseMessage);
log.info("{}生产数据:{} - {}", clientId, streamId, producerId);
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -66,6 +66,7 @@ public class MediaDataProducerCloseProtocol extends ProtocolRoomAdapter implemen
if(clientType.mediaClient()) {
dataProducer.close();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
dataProducer.remove();
room.broadcast(message);
} else {

View File

@@ -1,5 +1,45 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaDataProducerStatusProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 查询数据生产者状态信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"producerId": "数据生产者ID"
}
""",
flow = "终端=>信令服务->媒体服务->信令服务->终端"
)
public class MediaDataProducerStatusProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::data::producer::status";
public MediaDataProducerStatusProtocol() {
super("查询数据生产者状态信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
client.push(mediaClient.request(message));
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -1,10 +1,17 @@
package com.acgist.taoyao.signal.protocol.media;
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 媒体重启ICE信令
* 重启ICE信令
*
* @author acgist
*/
@@ -25,10 +32,23 @@ import com.acgist.taoyao.boot.annotation.Protocol;
}
"""
},
flow = "终端->信令服务->媒体服务->信令服务->终端"
flow = "终端=>信令服务->媒体服务->信令服务->终端"
)
public class MediaIceRestartProtocol {
public class MediaIceRestartProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::ice::restart";
public MediaIceRestartProtocol() {
super("重启ICE信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
client.push(mediaClient.request(message));
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -49,7 +49,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter {
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
final String kind = MapUtils.get(body, Constant.KIND);
final String streamId = kind + "::" + clientId;
final String streamId = Constant.STREAM_ID_PRODUCER.apply(kind, clientId);
body.put(Constant.CLIENT_ID, clientId);
body.put(Constant.STREAM_ID, streamId);
final Message response = room.request(message);

View File

@@ -66,6 +66,7 @@ public class MediaProducerCloseProtocol extends ProtocolRoomAdapter implements A
if(clientType.mediaClient()) {
producer.close();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
producer.remove();
room.broadcast(message);
} else {

View File

@@ -57,6 +57,7 @@ public class MediaProducerPauseProtocol extends ProtocolRoomAdapter implements A
final Producer producer = room.producer(producerId);
producer.pause();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
room.broadcast(message);
} else {
this.logNoAdapter(clientType);

View File

@@ -57,6 +57,7 @@ public class MediaProducerResumeProtocol extends ProtocolRoomAdapter implements
final Producer producer = room.producer(producerId);
producer.resume();
} else if(clientType.mediaServer()) {
// TODO路由到真实消费者
room.broadcast(message);
} else {
this.logNoAdapter(clientType);

View File

@@ -80,6 +80,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
clientWrapper.setRecvTransport(recvTransport);
// 拷贝属性
recvTransport.copy(responseBody);
// 消费媒体:不能在连接时调用
this.publishEvent(new MediaConsumeEvent(room, clientWrapper));
}
// 生产者

View File

@@ -1,5 +1,41 @@
package com.acgist.taoyao.signal.protocol.media;
public class MediaVideoOrientationChangeProtocol {
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 视频方向变化信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
""",
flow = "媒体服务->信令服务->终端"
)
public class MediaVideoOrientationChangeProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::video::orientation::change";
public MediaVideoOrientationChangeProtocol() {
super("视频方向变化信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaServer()) {
room.broadcast(message);
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -1,10 +1,48 @@
package com.acgist.taoyao.signal.protocol.room;
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 踢出房间信令
*
* @author acgist
*/
public class RoomExpelProtocol {
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"clientId": "终端ID"
}
""",
flow = "终端->信令服务->终端"
)
public class RoomExpelProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "room::expel";
public RoomExpelProtocol() {
super("踢出房间信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
final String expelClientId = MapUtils.get(body, Constant.CLIENT_ID);
room.unicast(expelClientId, message);
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -1,10 +1,50 @@
package com.acgist.taoyao.signal.protocol.room;
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 邀请房间信令
* 邀请终端信令
*
* @author acgist
*/
public class RoomInviteProtocol {
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"clientId": "终端ID",
"password": "密码(选填)"
}
""",
flow = "终端->信令服务->终端"
)
public class RoomInviteProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "room::invite";
public RoomInviteProtocol() {
super("邀请终端信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
final String inviteClientId = MapUtils.get(body, Constant.CLIENT_ID);
body.put(Constant.PASSWORD, room.getPassword());
this.clientManager.unicast(inviteClientId, message);
} else {
this.logNoAdapter(clientType);
}
}
}