[+] 信令接入:status、score

This commit is contained in:
acgist
2023-03-08 08:12:49 +08:00
parent f53fe0eb20
commit 21f55f2759
41 changed files with 623 additions and 119 deletions

View File

@@ -393,6 +393,9 @@ class Taoyao {
case "media::consume": case "media::consume":
this.mediaConsume(message, body); this.mediaConsume(message, body);
break; break;
case "media::consumer::close":
me.mediaConsumerClose(message, body);
break;
case "media::produce": case "media::produce":
this.mediaProduce(message, body); this.mediaProduce(message, body);
break; break;
@@ -470,7 +473,7 @@ class Taoyao {
const usage = await worker.getResourceUsage(); const usage = await worker.getResourceUsage();
console.info("Worker使用情况", worker.pid, usage); console.info("Worker使用情况", worker.pid, usage);
} }
console.info("路由数量:", this.mediasoupWorkers.length); console.info("工作线程数量:", this.mediasoupWorkers.length);
console.info("房间数量:", this.rooms.size); console.info("房间数量:", this.rooms.size);
Array.from(this.rooms.values()).forEach((room) => room.usage()); Array.from(this.rooms.values()).forEach((room) => room.usage());
} }
@@ -656,15 +659,16 @@ class Taoyao {
consumer.streamId = streamId; consumer.streamId = streamId;
room.consumers.set(consumer.id, consumer); room.consumers.set(consumer.id, consumer);
consumer.on("transportclose", () => { consumer.on("transportclose", () => {
room.consumers.delete(consumer.id); console.info("通道关闭同时关闭消费者:", consumer.id);
// 信令服务统一调度关闭
// consumer.close();
// room.consumers.delete(consumer.id);
}); });
consumer.on("producerclose", () => { consumer.on("producerclose", () => {
room.consumers.delete(consumer.id); console.info("生产者关闭同时关闭消费者:", consumer.id);
this.push( // 信令服务统一调度关闭
protocol.buildMessage("media::consumer::close", { // consumer.close();
consumerId: consumer.id, // room.consumers.delete(consumer.id);
})
);
}); });
consumer.on("producerpause", () => { consumer.on("producerpause", () => {
this.push( this.push(
@@ -683,8 +687,9 @@ class Taoyao {
consumer.on("score", (score) => { consumer.on("score", (score) => {
this.push( this.push(
protocol.buildMessage("media::consumer::score", { protocol.buildMessage("media::consumer::score", {
score: score,
roomId: roomId,
consumerId: consumer.id, consumerId: consumer.id,
score,
}) })
); );
}); });
@@ -705,10 +710,18 @@ class Taoyao {
trace trace
); );
}); });
consumer.observer.on("close", () => {
this.push(
protocol.buildMessage("media::consumer::close", {
roomId: roomId,
consumerId: consumer.id
})
);
});
// TODO改为同步 // TODO改为同步
//await this.request("media::consume", {
this.push( this.push(
protocol.buildMessage("media::consume", { protocol.buildMessage("media::consume", {
//await this.request("media::consume", {
kind: consumer.kind, kind: consumer.kind,
type: consumer.type, type: consumer.type,
roomId: roomId, roomId: roomId,
@@ -725,8 +738,9 @@ class Taoyao {
await consumer.resume(); await consumer.resume();
this.push( this.push(
protocol.buildMessage("media::consumer::score", { protocol.buildMessage("media::consumer::score", {
consumerId: consumer.id,
score: consumer.score, score: consumer.score,
roomId: roomId,
consumerId: consumer.id,
}) })
); );
})() })()
@@ -740,6 +754,25 @@ class Taoyao {
} }
} }
/**
* 关闭消费者信令
*
* @param {*} message 消息
* @param {*} body 消息主体
*/
mediaConsumerClose(message, body) {
const { roomId, consumerId } = body;
const room = this.rooms.get(roomId);
const consumer = room.consumers.get(consumerId);
if(consumer) {
console.info("关闭消费者:", consumerId);
consumer.close();
room.consumers.delete(consumerId);
} else {
console.debug("关闭消费者无效:", consumerId);
}
}
/** /**
* 路由RTP协商信令 * 路由RTP协商信令
* *

View File

@@ -5,10 +5,10 @@
<video ref="video"></video> <video ref="video"></video>
<p class="title">{{ client?.name || "" }}</p> <p class="title">{{ client?.name || "" }}</p>
<div class="buttons" :style="{'--volume': client?.volume}"> <div class="buttons" :style="{'--volume': client?.volume}">
<el-button type="danger" title="打开麦克风" :icon="Mute" circle /> <el-button v-show="!client.audioActive" type="danger" title="打开麦克风" :icon="Mute" circle />
<el-button type="primary" title="关闭麦克风" :icon="Microphone" circle /> <el-button v-show="client.audioActive" type="primary" title="关闭麦克风" :icon="Microphone" circle />
<el-button type="danger" title="打开摄像头" :icon="VideoPause" circle /> <el-button v-show="!client.videoActive" type="danger" title="打开摄像头" :icon="VideoPause" circle />
<el-button type="primary" title="关闭摄像头" :icon="VideoPlay" circle /> <el-button v-show="client.videoActive" type="primary" title="关闭摄像头" :icon="VideoPlay" circle />
<el-button title="拍照" :icon="Camera" circle /> <el-button title="拍照" :icon="Camera" circle />
<el-button title="录像" :icon="VideoCamera" circle /> <el-button title="录像" :icon="VideoCamera" circle />
<el-button title="媒体信息" :icon="InfoFilled" circle /> <el-button title="媒体信息" :icon="InfoFilled" circle />

View File

@@ -236,6 +236,22 @@ class RemoteClient {
volume = 0; volume = 0;
// 代理对象 // 代理对象
proxy; proxy;
// 数据可用
dataActive = false;
// 音频可用
audioActive = false;
// 视频可用
videoActive = false;
// 数据消费者
dataConsumer;
// 音频消费者
audioConsumer;
// 视频消费者
videoConsumer;
// 音频Track
audioTrack;
// 视频Track
videoTrack;
constructor({ constructor({
name, name,
@@ -292,10 +308,6 @@ class Taoyao extends RemoteClient {
recvTransport; recvTransport;
// 媒体设备 // 媒体设备
mediasoupDevice; mediasoupDevice;
// 是否消费
consume;
// 是否生产
produce;
// 视频来源file | camera | screen // 视频来源file | camera | screen
videoSource = "camera"; videoSource = "camera";
// 强制使用TCP // 强制使用TCP
@@ -306,6 +318,12 @@ class Taoyao extends RemoteClient {
forceH264; forceH264;
// 同时上送多种质量媒体 // 同时上送多种质量媒体
useSimulcast; useSimulcast;
// 是否消费数据
dataConsume;
// 是否消费音频
audioConsume;
// 是否消费视频
videoConsume;
// 是否生产数据 // 是否生产数据
dataProduce; dataProduce;
// 是否生产音频 // 是否生产音频
@@ -331,12 +349,13 @@ class Taoyao extends RemoteClient {
username, username,
password, password,
roomId, roomId,
consume = true, dataConsume = true,
produce = true, audioConsume = true,
videoConsume = true,
dataProduce = true,
audioProduce = true, audioProduce = true,
videoProduce = true, videoProduce = true,
forceTcp = false, forceTcp = false,
dataProduce = true,
}) { }) {
super({ name, clientId }); super({ name, clientId });
this.name = name; this.name = name;
@@ -346,11 +365,12 @@ class Taoyao extends RemoteClient {
this.username = username; this.username = username;
this.password = password; this.password = password;
this.roomId = roomId; this.roomId = roomId;
this.consume = consume; this.dataConsume = dataConsume;
this.produce = produce; this.audioConsume = audioConsume;
this.dataProduce = produce && dataProduce; this.videoConsume = videoConsume;
this.audioProduce = produce && audioProduce; this.dataProduce = dataProduce;
this.videoProduce = produce && videoProduce; this.audioProduce = audioProduce;
this.videoProduce = videoProduce;
this.forceTcp = forceTcp; this.forceTcp = forceTcp;
} }
@@ -494,6 +514,9 @@ class Taoyao extends RemoteClient {
case "media::audio::volume": case "media::audio::volume":
me.defaultMediaAudioVolume(message); me.defaultMediaAudioVolume(message);
break; break;
case "media::consumer::close":
me.defaultMediaConsumerClose(message);
break;
case "room::client::list": case "room::client::list":
me.defaultRoomClientList(message); me.defaultRoomClientList(message);
break; break;
@@ -584,6 +607,35 @@ class Taoyao extends RemoteClient {
} }
}); });
} }
/**
* 关闭消费者信令
*
* @param {*} consumerId 消费者ID
*/
mediaConsumerClose(consumerId) {
const me = this;
me.push(protocol.buildMessage("media::consumer::close", {
roomId: me.roomId,
consumerId: consumerId
}));
}
/**
* 关闭消费者信令
*
* @param {*} message 消息
*/
defaultMediaConsumerClose(message) {
const me = this;
const { roomId, consumerId } = message.body;
const consumer = me.consumers.get(consumerId);
if(consumer) {
console.info("关闭消费者:", consumerId);
consumer.close();
me.consumers.delete(consumerId);
} else {
console.debug("关闭消费者无效:", consumerId);
}
}
/** /**
* 消费媒体信令 * 消费媒体信令
* *
@@ -591,8 +643,8 @@ class Taoyao extends RemoteClient {
*/ */
async defaultMediaConsume(message) { async defaultMediaConsume(message) {
const self = this; const self = this;
if (!self.consume) { if (!self.audioConsume && !self.videoConsume) {
console.log("没有消费媒体"); console.debug("没有消费媒体");
return; return;
} }
const { const {
@@ -655,6 +707,19 @@ class Taoyao extends RemoteClient {
console.debug("远程媒体:", consumer); console.debug("远程媒体:", consumer);
const remoteClient = self.remoteClients.get(consumer.sourceId); const remoteClient = self.remoteClients.get(consumer.sourceId);
if(remoteClient && remoteClient.proxy && remoteClient.proxy.media) { if(remoteClient && remoteClient.proxy && remoteClient.proxy.media) {
const track = consumer.track;
// TODO旧的媒体
if(track.kind === 'audio') {
remoteClient.audioActive = true;
remoteClient.audioTrack = track;
remoteClient.audioConsumer = consumer;
} else if(track.kind === 'video') {
remoteClient.audioActive = false;
remoteClient.videoTrack = track;
remoteClient.videoconsumer = consumer;
} else {
console.warn("不支持的媒体:", track);
}
remoteClient.proxy.media(consumer.track, consumer); remoteClient.proxy.media(consumer.track, consumer);
} else { } else {
console.warn("远程终端没有实现服务代理:", remoteClient); console.warn("远程终端没有实现服务代理:", remoteClient);
@@ -858,7 +923,7 @@ class Taoyao extends RemoteClient {
setTimeout(() => audioTrack.stop(), 30000); setTimeout(() => audioTrack.stop(), 30000);
}); });
} }
if (self.produce) { if (self.audioProduce || self.videoProduce) {
const response = await self.request( const response = await self.request(
protocol.buildMessage("media::transport::webrtc::create", { protocol.buildMessage("media::transport::webrtc::create", {
roomId: self.roomId, roomId: self.roomId,
@@ -954,7 +1019,7 @@ class Taoyao extends RemoteClient {
} }
); );
} }
if (this.consume) { if (self.audioConsume || self.videoConsume) {
const self = this; const self = this;
const response = await self.request( const response = await self.request(
protocol.buildMessage("media::transport::webrtc::create", { protocol.buildMessage("media::transport::webrtc::create", {
@@ -1163,6 +1228,7 @@ class Taoyao extends RemoteClient {
// TODO异常 // TODO异常
} }
if(self.proxy && self.proxy.media) { if(self.proxy && self.proxy.media) {
self.videoTrack = track;
self.proxy.media(track); self.proxy.media(track);
} else { } else {
console.warn("终端没有实现服务代理:", self); console.warn("终端没有实现服务代理:", self);
@@ -1323,7 +1389,6 @@ class Taoyao extends RemoteClient {
async checkDevice() { async checkDevice() {
const self = this; const self = this;
if ( if (
self.produce &&
navigator.mediaDevices && navigator.mediaDevices &&
navigator.mediaDevices.getUserMedia && navigator.mediaDevices.getUserMedia &&
navigator.mediaDevices.enumerateDevices navigator.mediaDevices.enumerateDevices

View File

@@ -11,7 +11,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import com.acgist.taoyao.boot.annotation.Manager; import com.acgist.taoyao.boot.annotation.Manager;
import com.acgist.taoyao.boot.config.TaoyaoProperties; import com.acgist.taoyao.boot.config.TaoyaoProperties;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.event.ClientCloseEvent; import com.acgist.taoyao.signal.event.client.ClientCloseEvent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

View File

@@ -1,18 +0,0 @@
package com.acgist.taoyao.signal.event;
import com.acgist.taoyao.signal.client.Client;
/**
* 媒体服务终端注册事件
*
* @author acgist
*/
public class MediaClientRegisterEvent extends ClientEventAdapter {
private static final long serialVersionUID = 1L;
public MediaClientRegisterEvent(Client client) {
super(client);
}
}

View File

@@ -1,6 +1,7 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.client;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
/** /**
* 终端关闭事件 * 终端关闭事件

View File

@@ -1,6 +1,7 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.client;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;

View File

@@ -1,6 +1,7 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.client;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;

View File

@@ -1,6 +1,7 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.client;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;

View File

@@ -1,5 +1,6 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.media;
import com.acgist.taoyao.signal.event.RoomEventAdapter;
import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.Producer; import com.acgist.taoyao.signal.party.media.Producer;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
@@ -8,13 +9,13 @@ import lombok.Getter;
import lombok.Setter; import lombok.Setter;
/** /**
* 生产媒体事件 * 消费媒体事件
* *
* @author acgist * @author acgist
*/ */
@Getter @Getter
@Setter @Setter
public class MediaProduceEvent extends RoomEventAdapter { public class MediaConsumeEvent extends RoomEventAdapter {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@@ -27,13 +28,13 @@ public class MediaProduceEvent extends RoomEventAdapter {
*/ */
private final ClientWrapper clientWrapper; private final ClientWrapper clientWrapper;
public MediaProduceEvent(Room room, Producer producer) { public MediaConsumeEvent(Room room, Producer producer) {
super(room); super(room);
this.producer = producer; this.producer = producer;
this.clientWrapper = null; this.clientWrapper = null;
} }
public MediaProduceEvent(Room room, ClientWrapper clientWrapper) { public MediaConsumeEvent(Room room, ClientWrapper clientWrapper) {
super(room); super(room);
this.producer = null; this.producer = null;
this.clientWrapper = clientWrapper; this.clientWrapper = clientWrapper;

View File

@@ -1,5 +1,6 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.media;
import com.acgist.taoyao.signal.event.RoomEventAdapter;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import lombok.Getter; import lombok.Getter;
@@ -12,7 +13,7 @@ import lombok.Setter;
*/ */
@Getter @Getter
@Setter @Setter
public class ConsumerCloseEvent extends RoomEventAdapter { public class MediaConsumerCloseEvent extends RoomEventAdapter {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@@ -21,7 +22,7 @@ public class ConsumerCloseEvent extends RoomEventAdapter {
*/ */
private final String consumerId; private final String consumerId;
public ConsumerCloseEvent(String consumerId, Room room) { public MediaConsumerCloseEvent(String consumerId, Room room) {
super(room); super(room);
this.consumerId = consumerId; this.consumerId = consumerId;
} }

View File

@@ -1,5 +1,6 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.media;
import com.acgist.taoyao.signal.event.RoomEventAdapter;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import lombok.Getter; import lombok.Getter;
@@ -12,7 +13,7 @@ import lombok.Setter;
*/ */
@Getter @Getter
@Setter @Setter
public class ProducerCloseEvent extends RoomEventAdapter { public class MediaProducerCloseEvent extends RoomEventAdapter {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@@ -21,7 +22,7 @@ public class ProducerCloseEvent extends RoomEventAdapter {
*/ */
private final String producerId; private final String producerId;
public ProducerCloseEvent(String producerId, Room room) { public MediaProducerCloseEvent(String producerId, Room room) {
super(room); super(room);
this.producerId = producerId; this.producerId = producerId;
} }

View File

@@ -0,0 +1,19 @@
package com.acgist.taoyao.signal.event.room;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ClientEventAdapter;
/**
* 创建房间事件
*
* @author acgist
*/
public class RoomCreateEvent extends ClientEventAdapter {
private static final long serialVersionUID = 1L;
public RoomCreateEvent(Client client) {
super(client);
}
}

View File

@@ -1,6 +1,7 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.room;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.RoomEventAdapter;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import lombok.Getter; import lombok.Getter;

View File

@@ -1,6 +1,7 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event.room;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.RoomEventAdapter;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import lombok.Getter; import lombok.Getter;

View File

@@ -2,8 +2,8 @@ package com.acgist.taoyao.signal.party.media;
import java.io.Closeable; import java.io.Closeable;
import com.acgist.taoyao.signal.event.ConsumerCloseEvent;
import com.acgist.taoyao.signal.event.EventPublisher; import com.acgist.taoyao.signal.event.EventPublisher;
import com.acgist.taoyao.signal.event.media.MediaConsumerCloseEvent;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@@ -66,7 +66,7 @@ public class Consumer implements Closeable {
log.info("关闭消费者:{}", this.consumerId); log.info("关闭消费者:{}", this.consumerId);
this.getProducer().remove(this.consumerId); this.getProducer().remove(this.consumerId);
this.consumerClient.getConsumers().remove(this.consumerId); this.consumerClient.getConsumers().remove(this.consumerId);
EventPublisher.publishEvent(new ConsumerCloseEvent(this.consumerId, this.room)); EventPublisher.publishEvent(new MediaConsumerCloseEvent(this.consumerId, this.room));
} }
} }

View File

@@ -5,7 +5,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import com.acgist.taoyao.signal.event.EventPublisher; import com.acgist.taoyao.signal.event.EventPublisher;
import com.acgist.taoyao.signal.event.ProducerCloseEvent; import com.acgist.taoyao.signal.event.media.MediaProducerCloseEvent;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@@ -78,7 +78,7 @@ public class Producer implements Closeable {
log.info("关闭生产者:{}", this.producerId); log.info("关闭生产者:{}", this.producerId);
this.consumers.forEach((k, v) -> v.close()); this.consumers.forEach((k, v) -> v.close());
this.producerClient.getProducers().remove(this.producerId); this.producerClient.getProducers().remove(this.producerId);
EventPublisher.publishEvent(new ProducerCloseEvent(this.producerId, this.room)); EventPublisher.publishEvent(new MediaProducerCloseEvent(this.producerId, this.room));
} }
} }

View File

@@ -10,7 +10,7 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientStatus; import com.acgist.taoyao.signal.client.ClientStatus;
import com.acgist.taoyao.signal.event.EventPublisher; import com.acgist.taoyao.signal.event.EventPublisher;
import com.acgist.taoyao.signal.event.RoomLeaveEvent; import com.acgist.taoyao.signal.event.room.RoomLeaveEvent;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;

View File

@@ -10,8 +10,8 @@ import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.ClientCloseEvent; import com.acgist.taoyao.signal.event.client.ClientCloseEvent;
import com.acgist.taoyao.signal.event.ClientOfflineEvent; import com.acgist.taoyao.signal.event.client.ClientOfflineEvent;
import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

View File

@@ -17,7 +17,7 @@ import com.acgist.taoyao.boot.utils.DateUtils;
import com.acgist.taoyao.boot.utils.DateUtils.DateTimeStyle; import com.acgist.taoyao.boot.utils.DateUtils.DateTimeStyle;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.ClientConfigEvent; import com.acgist.taoyao.signal.event.client.ClientConfigEvent;
import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter;
/** /**

View File

@@ -8,7 +8,7 @@ import org.springframework.scheduling.annotation.Async;
import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant; import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.signal.event.ClientOfflineEvent; import com.acgist.taoyao.signal.event.client.ClientOfflineEvent;
import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter;
/** /**

View File

@@ -6,7 +6,7 @@ import org.springframework.scheduling.annotation.Async;
import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.ClientOnlineEvent; import com.acgist.taoyao.signal.event.client.ClientOnlineEvent;
import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter;
/** /**

View File

@@ -13,9 +13,9 @@ import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientStatus; import com.acgist.taoyao.signal.client.ClientStatus;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.ClientConfigEvent; import com.acgist.taoyao.signal.event.client.ClientConfigEvent;
import com.acgist.taoyao.signal.event.ClientOnlineEvent; import com.acgist.taoyao.signal.event.client.ClientOnlineEvent;
import com.acgist.taoyao.signal.event.MediaClientRegisterEvent; import com.acgist.taoyao.signal.event.room.RoomCreateEvent;
import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter;
import com.acgist.taoyao.signal.service.SecurityService; import com.acgist.taoyao.signal.service.SecurityService;
@@ -95,9 +95,9 @@ public class ClientRegisterProtocol extends ProtocolClientAdapter {
this.publishEvent(new ClientConfigEvent(client)); this.publishEvent(new ClientConfigEvent(client));
// 终端上线事件 // 终端上线事件
this.publishEvent(new ClientOnlineEvent(client)); this.publishEvent(new ClientOnlineEvent(client));
// 媒体服务注册事件 // 媒体服务注册:创建房间事件
if(clientType.mediaServer()) { if(clientType.mediaServer()) {
this.publishEvent(new MediaClientRegisterEvent(client)); this.publishEvent(new RoomCreateEvent(client));
} }
} }

View File

@@ -14,7 +14,7 @@ import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.MediaProduceEvent; import com.acgist.taoyao.signal.event.media.MediaConsumeEvent;
import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.media.Consumer;
import com.acgist.taoyao.signal.party.media.Producer; import com.acgist.taoyao.signal.party.media.Producer;
@@ -43,7 +43,7 @@ import lombok.extern.slf4j.Slf4j;
"终端->信令服务->媒体服务=>信令服务->终端->信令服务->媒体服务" "终端->信令服务->媒体服务=>信令服务->终端->信令服务->媒体服务"
} }
) )
public class MediaConsumeProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaProduceEvent> { public class MediaConsumeProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaConsumeEvent> {
public static final String SIGNAL = "media::consume"; public static final String SIGNAL = "media::consume";
@@ -53,7 +53,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
@Async @Async
@Override @Override
public void onApplicationEvent(MediaProduceEvent event) { public void onApplicationEvent(MediaConsumeEvent event) {
final Room room = event.getRoom(); final Room room = event.getRoom();
if(event.getProducer() != null) { if(event.getProducer() != null) {
// 生产媒体:其他终端消费 // 生产媒体:其他终端消费
@@ -116,6 +116,7 @@ public class MediaConsumeProtocol extends ProtocolRoomAdapter implements Applica
private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) { private void consume(Room room, ClientWrapper consumerClientWrapper, Producer producer, Message message) {
final Client mediaClient = room.getMediaClient(); final Client mediaClient = room.getMediaClient();
if(consumerClientWrapper.consumed(producer)) { if(consumerClientWrapper.consumed(producer)) {
// TODO没有清理干净
// 消费通道准备就绪 // 消费通道准备就绪
if(log.isDebugEnabled()) { if(log.isDebugEnabled()) {
log.debug("消费通道准备就绪:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId()); log.debug("消费通道准备就绪:{} - {}", consumerClientWrapper.getClientId(), producer.getStreamId());

View File

@@ -3,6 +3,7 @@ package com.acgist.taoyao.signal.protocol.media;
import java.util.Map; import java.util.Map;
import org.springframework.context.ApplicationListener; import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import com.acgist.taoyao.boot.annotation.Description; import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.annotation.Protocol;
@@ -11,7 +12,7 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.ConsumerCloseEvent; import com.acgist.taoyao.signal.event.media.MediaConsumerCloseEvent;
import com.acgist.taoyao.signal.party.media.Consumer; import com.acgist.taoyao.signal.party.media.Consumer;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
@@ -20,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
/** /**
* 关闭消费者信令 * 关闭消费者信令
* 注意:正常情况不会存在关闭消费者的情况,所以一般不用处理关闭消费者信令。
* *
* @author acgist * @author acgist
*/ */
@@ -28,12 +30,13 @@ import lombok.extern.slf4j.Slf4j;
@Description( @Description(
body = """ body = """
{ {
"roomId": "房间ID"
"consumerId": "消费者ID" "consumerId": "消费者ID"
} }
""", """,
flow = "终端->信令服务+)终端" flow = "终端->信令服务+)终端"
) )
public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<ConsumerCloseEvent> { public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaConsumerCloseEvent> {
public static final String SIGNAL = "media::consumer::close"; public static final String SIGNAL = "media::consumer::close";
@@ -41,14 +44,15 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A
super("关闭消费者信令", SIGNAL); super("关闭消费者信令", SIGNAL);
} }
@Async
@Override @Override
public void onApplicationEvent(ConsumerCloseEvent event) { public void onApplicationEvent(MediaConsumerCloseEvent event) {
final Room room = event.getRoom(); final Room room = event.getRoom();
final Map<String, Object> body = Map.of( final Map<String, Object> body = Map.of(
Constant.ROOM_ID, room.getRoomId(), Constant.ROOM_ID, room.getRoomId(),
Constant.CONSUMER_ID, event.getConsumerId() Constant.CONSUMER_ID, event.getConsumerId()
); );
this.close(room, this.build(body)); room.broadcastAll(this.build(body));
} }
@Override @Override
@@ -62,14 +66,4 @@ public class MediaConsumerCloseProtocol extends ProtocolRoomAdapter implements A
} }
} }
/**
* 关闭消费者
*
* @param room 房间
* @param message 消息
*/
private void close(Room room, Message message) {
room.broadcastAll(message);
}
} }

View File

@@ -1,5 +1,47 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaConsumerPauseProtocol { import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 暂停消费者信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID"
"consumerId": "消费者ID"
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
)
public class MediaConsumerPauseProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::consumer::pause";
public MediaConsumerPauseProtocol() {
super("暂停消费者信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
mediaClient.push(message);
} else if(clientType.mediaServer()) {
room.broadcast(message);
} else {
this.logNoAdapter(clientType);
}
}
} }

View File

@@ -1,5 +1,47 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaConsumerResumeProtocol { import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 恢复消费者信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID"
"consumerId": "消费者ID"
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
)
public class MediaConsumerResumeProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::consumer::resumt";
public MediaConsumerResumeProtocol() {
super("恢复消费者信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
mediaClient.push(message);
} else if(clientType.mediaServer()) {
room.broadcast(message);
} else {
this.logNoAdapter(clientType);
}
}
} }

View File

@@ -0,0 +1,46 @@
package com.acgist.taoyao.signal.protocol.media;
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 媒体消费者评分信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"score": "消费者RTP流得分表示传输质量0~10",
"producerScore": "生产者RTP流得分表示传输质量0~10",
"producerScores": [所有生产者RTP流得分]
}
""",
flow = "媒体服务->信令服务->终端"
)
public class MediaConsumerScoreProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::consumer::score";
public MediaConsumerScoreProtocol() {
super("媒体消费者评分信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaServer()) {
room.broadcast(message);
} else {
this.logNoAdapter(clientType);
}
}
}

View File

@@ -1,5 +1,45 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaConsumerStatusProtocol { import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 查询消费者状态信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"consumerId": "消费者ID"
}
""",
flow = "终端=>信令服务->媒体服务->信令服务->终端"
)
public class MediaConsumerStatusProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::consumer::status";
public MediaConsumerStatusProtocol() {
super("查询消费者状态信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
client.push(mediaClient.request(message));
} else {
this.logNoAdapter(clientType);
}
}
} }

View File

@@ -9,7 +9,7 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.MediaProduceEvent; import com.acgist.taoyao.signal.event.media.MediaConsumeEvent;
import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.Producer; import com.acgist.taoyao.signal.party.media.Producer;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
@@ -71,7 +71,7 @@ public class MediaProduceProtocol extends ProtocolRoomAdapter {
// 全部不收:全部广播 // 全部不收:全部广播
room.broadcast(responseMessage); room.broadcast(responseMessage);
log.info("{}生产媒体:{} - {} - {}", clientId, kind, streamId, producerId); log.info("{}生产媒体:{} - {} - {}", clientId, kind, streamId, producerId);
this.publishEvent(new MediaProduceEvent(room, producer)); this.publishEvent(new MediaConsumeEvent(room, producer));
} }
} }

View File

@@ -1,5 +1,68 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaProducerCloseProtocol { import java.util.Map;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.media.MediaProducerCloseEvent;
import com.acgist.taoyao.signal.party.media.Producer;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 关闭生产者信令
*
* @author acgist
*/
@Slf4j
@Protocol
@Description(
body = """
{
"roomId": "房间ID"
"consumerId": "消费者ID"
}
""",
flow = "终端->信令服务+)终端"
)
public class MediaProducerCloseProtocol extends ProtocolRoomAdapter implements ApplicationListener<MediaProducerCloseEvent> {
public static final String SIGNAL = "media::producer::close";
public MediaProducerCloseProtocol() {
super("关闭生产者信令", SIGNAL);
}
@Async
@Override
public void onApplicationEvent(MediaProducerCloseEvent event) {
final Room room = event.getRoom();
final Map<String, Object> body = Map.of(
Constant.ROOM_ID, room.getRoomId(),
Constant.PRODUCER_ID, event.getProducerId()
);
room.broadcastAll(this.build(body));
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
final String producerId = MapUtils.get(body, Constant.PRODUCER_ID);
final Producer producer = room.producer(producerId);
if(producer == null) {
log.warn("关闭生产者无效:{}", producerId);
} else {
producer.close();
}
}
} }

View File

@@ -1,5 +1,47 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaProducerPauseProtocol { import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 暂停生产者信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID"
"producerId": "消费者ID"
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
)
public class MediaProducerPauseProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::producer::pause";
public MediaProducerPauseProtocol() {
super("暂停生产者信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
mediaClient.push(message);
} else if(clientType.mediaServer()) {
room.broadcast(message);
} else {
this.logNoAdapter(clientType);
}
}
} }

View File

@@ -1,5 +1,47 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaProducerResumeProtocol { import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 恢复生产者信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID"
"producerId": "消费者ID"
}
""",
flow = "终端->信令服务->媒体服务->信令服务->终端"
)
public class MediaProducerResumeProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::producer::resume";
public MediaProducerResumeProtocol() {
super("恢复生产者信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
mediaClient.push(message);
} else if(clientType.mediaServer()) {
room.broadcast(message);
} else {
this.logNoAdapter(clientType);
}
}
} }

View File

@@ -29,7 +29,11 @@ public class MediaProducerScoreProtocol extends ProtocolRoomAdapter {
@Override @Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) { public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaServer()) {
room.broadcast(message); room.broadcast(message);
} else {
this.logNoAdapter(clientType);
}
} }
} }

View File

@@ -1,5 +1,45 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaProducerStatusProtocol { import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 查询生产者状态信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"producerId": "生产者ID"
}
""",
flow = "终端=>信令服务->媒体服务->信令服务->终端"
)
public class MediaProducerStatusProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::producer::status";
public MediaProducerStatusProtocol() {
super("查询生产者状态信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
client.push(mediaClient.request(message));
} else {
this.logNoAdapter(clientType);
}
}
} }

View File

@@ -1,5 +1,45 @@
package com.acgist.taoyao.signal.protocol.media; package com.acgist.taoyao.signal.protocol.media;
public class MediaTransportStatusProtocol { import java.util.Map;
import com.acgist.taoyao.boot.annotation.Description;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;
/**
* 查询通道状态信令
*
* @author acgist
*/
@Protocol
@Description(
body = """
{
"roomId": "房间ID",
"transportId": "通道ID"
}
""",
flow = "终端=>信令服务->媒体服务->信令服务->终端"
)
public class MediaTransportStatusProtocol extends ProtocolRoomAdapter {
public static final String SIGNAL = "media::transport::status";
public MediaTransportStatusProtocol() {
super("查询通道状态信令", SIGNAL);
}
@Override
public void execute(String clientId, ClientType clientType, Room room, Client client, Client mediaClient, Message message, Map<String, Object> body) {
if(clientType.mediaClient()) {
client.push(mediaClient.request(message));
} else {
this.logNoAdapter(clientType);
}
}
} }

View File

@@ -14,7 +14,7 @@ import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.boot.utils.NetUtils; import com.acgist.taoyao.boot.utils.NetUtils;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.MediaProduceEvent; import com.acgist.taoyao.signal.event.media.MediaConsumeEvent;
import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.party.media.Transport; import com.acgist.taoyao.signal.party.media.Transport;
@@ -75,7 +75,7 @@ public class MediaTransportWebRtcCreateProtocol extends ProtocolRoomAdapter {
} }
// 拷贝属性 // 拷贝属性
recvTransport.copy(responseBody); recvTransport.copy(responseBody);
this.publishEvent(new MediaProduceEvent(room, clientWrapper)); this.publishEvent(new MediaConsumeEvent(room, clientWrapper));
} }
// 生产者 // 生产者
final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING); final Boolean producing = MapUtils.getBoolean(body, Constant.PRODUCING);

View File

@@ -10,7 +10,7 @@ import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.RoomEnterEvent; import com.acgist.taoyao.signal.event.room.RoomEnterEvent;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;

View File

@@ -12,7 +12,7 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.MediaClientRegisterEvent; import com.acgist.taoyao.signal.event.room.RoomCreateEvent;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter; import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter;
@@ -41,7 +41,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolClientAdapter;
}, },
flow = "终端->信令服务->媒体服务->信令服务+)终端" flow = "终端->信令服务->媒体服务->信令服务+)终端"
) )
public class RoomCreateProtocol extends ProtocolClientAdapter implements ApplicationListener<MediaClientRegisterEvent> { public class RoomCreateProtocol extends ProtocolClientAdapter implements ApplicationListener<RoomCreateEvent> {
public static final String SIGNAL = "room::create"; public static final String SIGNAL = "room::create";
@@ -51,7 +51,7 @@ public class RoomCreateProtocol extends ProtocolClientAdapter implements Applica
@Async @Async
@Override @Override
public void onApplicationEvent(MediaClientRegisterEvent event) { public void onApplicationEvent(RoomCreateEvent event) {
this.roomManager.recreate(event.getClient(), this.build()); this.roomManager.recreate(event.getClient(), this.build());
} }

View File

@@ -13,7 +13,7 @@ import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.utils.MapUtils; import com.acgist.taoyao.boot.utils.MapUtils;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.RoomEnterEvent; import com.acgist.taoyao.signal.event.room.RoomEnterEvent;
import com.acgist.taoyao.signal.party.media.ClientWrapper; import com.acgist.taoyao.signal.party.media.ClientWrapper;
import com.acgist.taoyao.signal.party.media.ClientWrapper.SubscribeType; import com.acgist.taoyao.signal.party.media.ClientWrapper.SubscribeType;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;

View File

@@ -11,7 +11,7 @@ import com.acgist.taoyao.boot.config.Constant;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientType; import com.acgist.taoyao.signal.client.ClientType;
import com.acgist.taoyao.signal.event.RoomLeaveEvent; import com.acgist.taoyao.signal.event.room.RoomLeaveEvent;
import com.acgist.taoyao.signal.party.media.Room; import com.acgist.taoyao.signal.party.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter; import com.acgist.taoyao.signal.protocol.ProtocolRoomAdapter;