This commit is contained in:
acgist
2022-11-28 19:33:17 +08:00
parent 56dc649279
commit 7fe3babab0
28 changed files with 474 additions and 326 deletions

View File

@@ -373,10 +373,6 @@
取消订阅终端媒体流(终端取消拉流)
### 候选信令5004
IceCandidate
### 暂停信令5004
终端->服务端
@@ -397,6 +393,18 @@ MCU/SFU模式有效
配置订阅媒体:码率、帧率、分辨率等等
### Offer信令5997
Offer
### Answer信令5998
Answer
### 候选信令5999
IceCandidate
## 测试
```

View File

@@ -49,7 +49,7 @@ public abstract class ClientSessionAdapter<T extends AutoCloseable> implements C
@Override
public boolean timeout(long timeout) {
return !this.authorized && System.currentTimeMillis() - this.time > timeout;
return System.currentTimeMillis() - this.time > timeout;
}
@Override

View File

@@ -36,7 +36,7 @@ public class ClientSessionManager {
@Scheduled(cron = "${taoyao.scheduled.session:0 * * * * ?}")
public void scheduled() {
this.closeTimeoutSession();
this.closeTimeout();
}
/**
@@ -65,7 +65,9 @@ public class ClientSessionManager {
* @param message 消息
*/
public void unicast(String to, Message message) {
this.sessions.stream().filter(v -> v.filterSn(to)).forEach(v -> {
this.sessions().stream()
.filter(v -> v.filterSn(to))
.forEach(v -> {
message.getHeader().setSn(v.sn());
v.push(message);
});
@@ -77,7 +79,7 @@ public class ClientSessionManager {
* @param message 消息
*/
public void broadcast(Message message) {
this.sessions.forEach(v -> {
this.sessions().forEach(v -> {
message.getHeader().setSn(v.sn());
v.push(message);
});
@@ -90,7 +92,9 @@ public class ClientSessionManager {
* @param message 消息
*/
public void broadcast(String from, Message message) {
this.sessions.stream().filter(v -> v.filterNoneSn(from)).forEach(v -> {
this.sessions().stream().
filter(v -> v.filterNoneSn(from))
.forEach(v -> {
message.getHeader().setSn(v.sn());
v.push(message);
});
@@ -102,7 +106,7 @@ public class ClientSessionManager {
* @return 终端会话
*/
public ClientSession session(String sn) {
return this.sessions.stream()
return this.sessions().stream()
.filter(v -> StringUtils.equals(sn, v.sn()))
.findFirst()
.orElse(null);
@@ -122,14 +126,18 @@ public class ClientSessionManager {
* @return 所有终端会话
*/
public List<ClientSession> sessions() {
return this.sessions;
return this.sessions.stream()
.filter(ClientSession::authorized)
.toList();
}
/**
* @return 所有终端状态
*/
public List<ClientSessionStatus> status() {
return this.sessions().stream().map(ClientSession::status).toList();
return this.sessions().stream()
.map(ClientSession::status)
.toList();
}
/**
@@ -160,9 +168,10 @@ public class ClientSessionManager {
/**
* 定时关闭超时会话
*/
private void closeTimeoutSession() {
private void closeTimeout() {
log.debug("定时关闭超时会话");
this.sessions.stream()
.filter(v -> !v.authorized())
.filter(v -> v.timeout(this.taoyaoProperties.getTimeout()))
.forEach(v -> {
log.debug("关闭超时会话:{}", v);

View File

@@ -0,0 +1,34 @@
package com.acgist.taoyao.signal.event.media;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import lombok.Getter;
import lombok.Setter;
/**
* Answer事件
*
* @author acgist
*/
@Getter
@Setter
public class MediaAnswerEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public MediaAnswerEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
/**
* @return 接收终端标识
*/
public String getTo() {
return this.get("to");
}
}

View File

@@ -1,6 +1,5 @@
package com.acgist.taoyao.signal.event.media;
import java.util.List;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
@@ -26,10 +25,10 @@ public class MediaCandidateEvent extends ApplicationEventAdapter {
}
/**
* @return 终端列表
* @return 接收终端标识
*/
public List<String> getSns() {
return this.get("sns");
public String getTo() {
return this.get("to");
}
}

View File

@@ -0,0 +1,34 @@
package com.acgist.taoyao.signal.event.media;
import java.util.Map;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import lombok.Getter;
import lombok.Setter;
/**
* Offer事件
*
* @author acgist
*/
@Getter
@Setter
public class MediaOfferEvent extends ApplicationEventAdapter {
private static final long serialVersionUID = 1L;
public MediaOfferEvent(String sn, Map<?, ?> body, Message message, ClientSession session) {
super(sn, body, message, session);
}
/**
* @return 接收终端标识
*/
public String getTo() {
return this.get("to");
}
}

View File

@@ -25,7 +25,7 @@ public class MediaPublishEvent extends ApplicationEventAdapter {
}
/**
* @return 终端标识(发布给谁)
* @return 接收终端标识
*/
public String getTo() {
return this.get("to");

View File

@@ -25,7 +25,7 @@ public class MediaSubscribeEvent extends ApplicationEventAdapter {
}
/**
* @return 终端标识(订阅的谁)
* @return 接收终端标识
*/
public String getTo() {
return this.get("to");

View File

@@ -0,0 +1,30 @@
package com.acgist.taoyao.signal.protocol.media;
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.media.MediaAnswerEvent;
import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter;
/**
* Answer信令
*
* @author acgist
*/
@Protocol
public class MediaAnswerProtocol extends ProtocolMapAdapter {
public static final Integer PID = 5998;
public MediaAnswerProtocol() {
super(PID, "Answer信令");
}
@Override
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
this.publishEvent(new MediaAnswerEvent(sn, body, message, session));
}
}

View File

@@ -16,7 +16,7 @@ import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter;
@Protocol
public class MediaCandidateProtocol extends ProtocolMapAdapter {
public static final Integer PID = 5004;
public static final Integer PID = 5999;
public MediaCandidateProtocol() {
super(PID, "候选信令");

View File

@@ -0,0 +1,30 @@
package com.acgist.taoyao.signal.protocol.media;
import java.util.Map;
import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientSession;
import com.acgist.taoyao.signal.event.media.MediaOfferEvent;
import com.acgist.taoyao.signal.protocol.ProtocolMapAdapter;
/**
* Offer信令
*
* @author acgist
*/
@Protocol
public class MediaOfferProtocol extends ProtocolMapAdapter {
public static final Integer PID = 5997;
public MediaOfferProtocol() {
super(PID, "Offer信令");
}
@Override
public void execute(String sn, Map<?, ?> body, Message message, ClientSession session) {
this.publishEvent(new MediaOfferEvent(sn, body, message, session));
}
}