[*] map body

This commit is contained in:
acgist
2023-02-12 15:33:35 +08:00
parent e8ae344d11
commit 61c93b2614
34 changed files with 223 additions and 189 deletions

View File

@@ -26,6 +26,7 @@ public class WebSocketUtils {
if (session == null) { if (session == null) {
return null; return null;
} }
// 远程IP地址
return (String) getField(session.getAsyncRemote(), "base.socketWrapper.remoteAddr"); return (String) getField(session.getAsyncRemote(), "base.socketWrapper.remoteAddr");
} }

View File

@@ -2,7 +2,7 @@ taoyao:
media: media:
media-server-list: media-server-list:
- name: media-local-a - name: media-local-a
enabled: true enabled: false
host: 192.168.1.110 host: 192.168.1.110
port: 4443 port: 4443
schema: wss schema: wss

View File

@@ -9,8 +9,8 @@ server:
key-password: 123456 key-password: 123456
tomcat: tomcat:
thread: thread:
max: 128 max: 256
min-spare: 4 min-spare: 8
remoteip: remoteip:
host-header: X-Forwarded-Host host-header: X-Forwarded-Host
port-header: X-Forwarded-Port port-header: X-Forwarded-Port

View File

@@ -32,7 +32,12 @@ public @interface CostedTest {
/** /**
* @return 超时时间 * @return 超时时间
*/ */
long timeout() default 1000; long timeout() default 1000L;
/**
* @return 等待资源释放时间
*/
long waitRelease() default 0L;
/** /**
* @return 超时时间单位 * @return 超时时间单位

View File

@@ -49,10 +49,15 @@ public class CostedTestTestExecutionListener implements TestExecutionListener {
}); });
} }
countDownLatch.await(timeout, timeUnit); countDownLatch.await(timeout, timeUnit);
executor.shutdown();
} }
final long zTime = System.currentTimeMillis(); final long zTime = System.currentTimeMillis();
final long costed = zTime - aTime; final long costed = zTime - aTime;
log.info("多线程测试消耗时间:{}", costed); log.info("多线程测试消耗时间:{}", costed);
final long waitRelease = costedTest.waitRelease();
if(waitRelease > 0) {
Thread.sleep(waitRelease);
}
} }
} }

View File

@@ -1,12 +1,16 @@
package com.acgist.taoyao.signal; package com.acgist.taoyao.signal;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.net.http.WebSocket; import java.net.http.WebSocket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import com.acgist.taoyao.annotation.CostedTest;
import com.acgist.taoyao.annotation.TaoyaoTest; import com.acgist.taoyao.annotation.TaoyaoTest;
import com.acgist.taoyao.main.TaoyaoApplication; import com.acgist.taoyao.main.TaoyaoApplication;
@@ -16,34 +20,41 @@ import lombok.extern.slf4j.Slf4j;
@TaoyaoTest(classes = TaoyaoApplication.class) @TaoyaoTest(classes = TaoyaoApplication.class)
class SignalTest { class SignalTest {
/**
* 防止GC
*/
private List<WebSocket> list = new ArrayList<>();
@Test @Test
void testSignal() throws InterruptedException { void testSignal() throws InterruptedException {
final WebSocket clientA = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientA"); final WebSocket clientA = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientA");
final WebSocket clientB = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientB"); final WebSocket clientB = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientB");
clientA.sendText(""" clientA.sendText("""
{"header":{"pid":1000,"v":"1.0.0","id":"1","sn":"clientA"},"body":{}} {"header":{"signal":"client::heartbeat","v":"1.0.0","id":"1"},"body":{}}
""", true).join(); """, true).join();
assertNotNull(clientA); assertNotNull(clientA);
assertNotNull(clientB); assertNotNull(clientB);
} }
@Test @Test
@CostedTest(thread = 10, count = 100, waitRelease = 5000L)
void testThread() throws InterruptedException { void testThread() throws InterruptedException {
final int total = 1000; final int total = 100;
final CountDownLatch count = new CountDownLatch(total); final CountDownLatch count = new CountDownLatch(total);
final WebSocket clientA = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientA", count); final WebSocket clientA = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientA", count);
final long aTime = System.currentTimeMillis(); final long aTime = System.currentTimeMillis();
for (int index = 0; index < total; index++) { for (int index = 0; index < total; index++) {
clientA.sendText(""" clientA.sendText("""
{"header":{"pid":2999,"v":"1.0.0","id":"1","sn":"clientA"},"body":{}} {"header":{"signal":"client::status","v":"1.0.0","id":"1"},"body":{}}
""", true).join(); """, true).join();
} }
this.list.add(clientA);
// final ExecutorService executor = Executors.newFixedThreadPool(10); // final ExecutorService executor = Executors.newFixedThreadPool(10);
// for (int index = 0; index < total; index++) { // for (int index = 0; index < total; index++) {
// executor.execute(() -> { // executor.execute(() -> {
// synchronized (clientA) { // synchronized (clientA) {
// clientA.sendText(""" // clientA.sendText("""
// {"header":{"pid":2999,"v":"1.0.0","id":"1","sn":"clientA"},"body":{}} // {"header":{"signal":"client::status","v":"1.0.0","id":"1"},"body":{}}
// """, true).join(); // """, true).join();
// } // }
// }); // });
@@ -51,8 +62,20 @@ class SignalTest {
count.await(); count.await();
final long zTime = System.currentTimeMillis(); final long zTime = System.currentTimeMillis();
log.info("执行时间:{}", zTime - aTime); log.info("执行时间:{}", zTime - aTime);
Thread.sleep(1000); log.info("当前连接数量:{}", this.list.size());
assertNotNull(clientA); assertNotNull(clientA);
} }
@Test
void testMax() throws InterruptedException {
final int size = 1024;
final CountDownLatch count = new CountDownLatch(size);
for (int index = 0; index < size; index++) {
final WebSocket clientA = WebSocketClient.build("wss://localhost:8888/websocket.signal", "clientA", count);
assertNotNull(clientA);
assertTrue(!(clientA.isInputClosed() || clientA.isOutputClosed()));
}
count.await();
}
} }

View File

@@ -37,7 +37,7 @@ public class WebSocketClient {
@Override @Override
public void onOpen(WebSocket webSocket) { public void onOpen(WebSocket webSocket) {
webSocket.sendText(String.format(""" webSocket.sendText(String.format("""
{"header":{"pid":2000,"v":"1.0.0","id":"1","sn":"%s"},"body":{"username":"taoyao","password":"taoyao","ip":"127.0.0.1"}} {"header":{"signal":"client::register","v":"1.0.0","id":"1"},"body":{"username":"taoyao","password":"taoyao","ip":"127.0.0.1","sn":"%s"}}
""", sn), true); """, sn), true);
Listener.super.onOpen(webSocket); Listener.super.onOpen(webSocket);
} }

View File

@@ -1,29 +0,0 @@
package com.acgist.taoyao.signal.protocol;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.annotation.TaoyaoTest;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.main.TaoyaoApplication;
import com.acgist.taoyao.signal.protocol.platform.PlatformScriptProtocol;
@TaoyaoTest(classes = TaoyaoApplication.class)
class PlatformScriptProtocolTest {
@Autowired
private PlatformScriptProtocol platformScriptProtocol;
@Test
void testScript() {
assertDoesNotThrow(() -> {
this.platformScriptProtocol.execute("taoyao", Map.of("script", "netstat -ano"), null, Message.success());
Thread.sleep(1000);
});
}
}

View File

@@ -1,27 +0,0 @@
package com.acgist.taoyao.signal.protocol;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.annotation.TaoyaoTest;
import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.main.TaoyaoApplication;
import com.acgist.taoyao.signal.protocol.platform.PlatformShutdownProtocol;
@TaoyaoTest(classes = TaoyaoApplication.class)
class PlatformShutdownProtocolTest {
@Autowired
private PlatformShutdownProtocol platformShutdownProtocol;
@Test
void testShutdown() {
assertDoesNotThrow(() -> {
this.platformShutdownProtocol.execute("taoyao", null, Message.success());
Thread.sleep(1000);
});
}
}

View File

@@ -393,6 +393,6 @@
``` ```
let socket = new WebSocket("wss://localhost:8888/websocket.signal"); let socket = new WebSocket("wss://localhost:8888/websocket.signal");
socket.send('{"header":{"pid":2000,"v":"1.0.0","id":"1","sn":"taoyao"},"body":{"username":"taoyao","password":"taoyao"}}'); socket.send('{"header":{"signal":"client::register","v":"1.0.0","id":"1"},"body":{"username":"taoyao","password":"taoyao","sn":"taoyao"}}');
socket.send('{"header":{"pid":1000,"v":"1.0.0","id":"1","sn":"taoyao"},"body":{}}'); socket.send('{"header":{"signal":"client::heartbeat","v":"1.0.0","id":"1"},"body":{}}');
``` ```

View File

@@ -0,0 +1,104 @@
package com.acgist.taoyao.signal;
import java.util.Map;
/**
* Map参数
*
* @author acgist
*/
public interface MapBodyGetter {
/**
* @param <T> 参数泛型
*
* @param body 消息主体
* @param key 参数名称
*
* @return 值
*/
@SuppressWarnings("unchecked")
default <T> T get(Map<?, ?> body, String key) {
if(body == null) {
return null;
}
return (T) body.get(key);
}
/**
* @param <T> 参数泛型
*
* @param body 消息主体
* @param key 参数名称
* @param defaultValue 默认值
*
* @return 值
*/
@SuppressWarnings("unchecked")
default <T> T get(Map<?, ?> body, String key, T defaultValue) {
if(body == null) {
return defaultValue;
}
final T t = (T) body.get(key);
return t == null ? defaultValue : t;
}
/**
* @param body 消息主体
* @param key 参数名称
*
* @return 值
*/
default Long getLong(Map<?, ?> body, String key) {
if(body == null) {
return null;
}
final Object object = body.get(key);
if(object == null) {
return null;
} else if(object instanceof Long value) {
return value;
}
return Long.valueOf(object.toString());
}
/**
* @param body 消息主体
* @param key 参数名称
*
* @return 值
*/
default Integer getInteger(Map<?, ?> body, String key) {
if(body == null) {
return null;
}
final Object object = body.get(key);
if(object == null) {
return null;
} else if(object instanceof Integer value) {
return value;
}
return Integer.valueOf(object.toString());
}
/**
* @param body 消息主体
* @param key 参数名称
*
* @return 值
*/
default Boolean getBoolean(Map<?, ?> body, String key) {
if(body == null) {
return null;
}
final Object object = body.get(key);
if(object == null) {
return null;
} else if(object instanceof Boolean value) {
return value;
}
return Boolean.valueOf(object.toString());
}
}

View File

@@ -104,19 +104,7 @@ public class ClientManager {
} }
/** /**
* @param sn 终端标识 * @param instance 终端实例
*
* @return 终端会话
*/
public Client client(String sn) {
return this.clients().stream()
.filter(v -> Objects.equals(sn, v.sn()))
.findFirst()
.orElse(null);
}
/**
* @param instance 终端示例
* *
* @return 终端 * @return 终端
*/ */
@@ -127,6 +115,17 @@ public class ClientManager {
.orElse(null); .orElse(null);
} }
/**
* @param sn 终端标识
*
* @return 终端会话
*/
public List<Client> clients(String sn) {
return this.clients().stream()
.filter(v -> Objects.equals(sn, v.sn()))
.toList();
}
/** /**
* @return 所有终端会话 * @return 所有终端会话
*/ */
@@ -136,14 +135,25 @@ public class ClientManager {
.toList(); .toList();
} }
/**
* @param instance 终端实例
*
* @return 终端状态
*/
public ClientStatus status(AutoCloseable instance) {
final Client client = this.client(instance);
return client == null ? null : client.status();
}
/** /**
* @param sn 终端标识 * @param sn 终端标识
* *
* @return 终端状态 * @return 终端状态
*/ */
public ClientStatus status(String sn) { public List<ClientStatus> status(String sn) {
final Client client = this.client(sn); return this.clients(sn).stream()
return client == null ? null : client.status(); .map(Client::status)
.toList();
} }
/** /**

View File

@@ -36,7 +36,7 @@ public class WebSocketClient extends ClientAdapter<Session> {
synchronized (this.instance) { synchronized (this.instance) {
try { try {
if(this.instance.isOpen()) { if(this.instance.isOpen()) {
this.basic.sendText(message.toString()); this.basic.sendText(message.toString(), true);
} else { } else {
log.error("会话已经关闭:{}", this.instance); log.error("会话已经关闭:{}", this.instance);
} }

View File

@@ -10,9 +10,9 @@ import org.springframework.web.bind.annotation.RestController;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.ClientStatus; import com.acgist.taoyao.signal.client.ClientStatus;
import com.acgist.taoyao.signal.room.Room; import com.acgist.taoyao.signal.media.Room;
import com.acgist.taoyao.signal.room.RoomManager; import com.acgist.taoyao.signal.media.RoomManager;
import com.acgist.taoyao.signal.room.RoomStatus; import com.acgist.taoyao.signal.media.RoomStatus;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content; import io.swagger.v3.oas.annotations.media.Content;

View File

@@ -1,12 +1,11 @@
package com.acgist.taoyao.signal.event; package com.acgist.taoyao.signal.event;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationEvent;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.MapBodyGetter;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import lombok.Getter; import lombok.Getter;
@@ -19,7 +18,7 @@ import lombok.Setter;
*/ */
@Getter @Getter
@Setter @Setter
public abstract class ApplicationEventAdapter extends ApplicationEvent { public abstract class ApplicationEventAdapter extends ApplicationEvent implements MapBodyGetter {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
@@ -53,65 +52,31 @@ public abstract class ApplicationEventAdapter extends ApplicationEvent {
} }
/** /**
* @param <T> 参数泛型 * @see #get(Map, String)
*
* @param key 参数名称
*
* @return 值
*/ */
@SuppressWarnings("unchecked")
public <T> T get(String key) { public <T> T get(String key) {
if(this.body == null) { return this.get(this.body, key);
return null;
}
return (T) this.body.get(key);
} }
/** /**
* @param <T> 参数泛型 * @see #get(Map, String, Object)
*
* @param key 参数名称
* @param defaultValue 默认值
*
* @return 值
*/ */
@SuppressWarnings("unchecked")
public <T> T get(String key, T defaultValue) { public <T> T get(String key, T defaultValue) {
if(this.body == null) { return this.get(body, key, defaultValue);
return defaultValue;
}
final T t = (T) this.body.get(key);
return t == null ? defaultValue : t;
} }
/** /**
* @param key 参数名称 * @see #getLong(Map, String)
*
* @return 值
*/ */
public Long getLong(String key) { public Long getLong(String key) {
if(this.body == null) { return this.getLong(body, key);
return null;
}
final Object object = this.body.get(key);
if(object == null) {
return null;
} else if(object instanceof Long value) {
return value;
}
return Long.valueOf(object.toString());
} }
/** /**
* @return 新的主体 * @see #getInteger(Map, String)
*/ */
public Map<String, Object> mergeBody() { public Integer getInteger(String key) {
final Map<String, Object> body = new HashMap<>(); return this.getInteger(body, key);
if(this.body != null) {
this.body.forEach((k, v) -> body.put(Objects.toString(k), v));
}
this.message.setBody(body);
return body;
} }
} }

View File

@@ -7,7 +7,7 @@ import org.springframework.context.ApplicationListener;
import com.acgist.taoyao.signal.client.ClientManager; import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter; import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.media.MediaClientManager; import com.acgist.taoyao.signal.media.MediaClientManager;
import com.acgist.taoyao.signal.room.RoomManager; import com.acgist.taoyao.signal.media.RoomManager;
/** /**
* 事件监听适配器 * 事件监听适配器

View File

@@ -5,7 +5,7 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException; import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.event.room.RoomCreateEvent; import com.acgist.taoyao.signal.event.room.RoomCreateEvent;
import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter; import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter;
import com.acgist.taoyao.signal.room.Room; import com.acgist.taoyao.signal.media.Room;
/** /**
* 创建房间监听 * 创建房间监听

View File

@@ -10,8 +10,8 @@ import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.event.room.RoomEnterEvent; import com.acgist.taoyao.signal.event.room.RoomEnterEvent;
import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter; import com.acgist.taoyao.signal.listener.ApplicationListenerAdapter;
import com.acgist.taoyao.signal.media.MediaClient; import com.acgist.taoyao.signal.media.MediaClient;
import com.acgist.taoyao.signal.media.Room;
import com.acgist.taoyao.signal.protocol.Constant; import com.acgist.taoyao.signal.protocol.Constant;
import com.acgist.taoyao.signal.room.Room;
/** /**
* 进入房间监听 * 进入房间监听

View File

@@ -38,7 +38,6 @@ import com.acgist.taoyao.signal.protocol.Protocol;
import com.acgist.taoyao.signal.protocol.ProtocolManager; import com.acgist.taoyao.signal.protocol.ProtocolManager;
import com.acgist.taoyao.signal.protocol.ProtocolMediaAdapter; import com.acgist.taoyao.signal.protocol.ProtocolMediaAdapter;
import com.acgist.taoyao.signal.protocol.media.MediaRegisterProtocol; import com.acgist.taoyao.signal.protocol.media.MediaRegisterProtocol;
import com.acgist.taoyao.signal.protocol.platform.PlatformErrorProtocol;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@@ -60,8 +59,6 @@ public class MediaClient {
private TaoyaoProperties taoyaoProperties; private TaoyaoProperties taoyaoProperties;
@Autowired @Autowired
private MediaRegisterProtocol mediaRegisterProtocol; private MediaRegisterProtocol mediaRegisterProtocol;
@Autowired
private PlatformErrorProtocol platformErrorProtocol;
/** /**
* 名称 * 名称

View File

@@ -1,4 +1,4 @@
package com.acgist.taoyao.signal.room; package com.acgist.taoyao.signal.media;
import java.io.Closeable; import java.io.Closeable;
import java.util.List; import java.util.List;
@@ -7,8 +7,6 @@ import java.util.Objects;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.client.ClientStatus; import com.acgist.taoyao.signal.client.ClientStatus;
import com.acgist.taoyao.signal.media.MediaClient;
import com.acgist.taoyao.signal.media.Transport;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;

View File

@@ -1,4 +1,4 @@
package com.acgist.taoyao.signal.room; package com.acgist.taoyao.signal.media;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@@ -12,8 +12,6 @@ import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException; import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.boot.service.IdService; import com.acgist.taoyao.boot.service.IdService;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.media.MediaClient;
import com.acgist.taoyao.signal.media.MediaClientManager;
import com.acgist.taoyao.signal.protocol.Constant; import com.acgist.taoyao.signal.protocol.Constant;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;

View File

@@ -1,4 +1,4 @@
package com.acgist.taoyao.signal.room; package com.acgist.taoyao.signal.media;
import io.swagger.v3.oas.annotations.media.Schema; import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Getter; import lombok.Getter;

View File

@@ -12,7 +12,7 @@ import com.acgist.taoyao.boot.service.IdService;
import com.acgist.taoyao.signal.client.ClientManager; import com.acgist.taoyao.signal.client.ClientManager;
import com.acgist.taoyao.signal.event.ApplicationEventAdapter; import com.acgist.taoyao.signal.event.ApplicationEventAdapter;
import com.acgist.taoyao.signal.media.MediaClientManager; import com.acgist.taoyao.signal.media.MediaClientManager;
import com.acgist.taoyao.signal.room.RoomManager; import com.acgist.taoyao.signal.media.RoomManager;
/** /**
* 信令适配器 * 信令适配器

View File

@@ -80,6 +80,10 @@ public class ProtocolManager {
public void execute(String content, AutoCloseable instance) { public void execute(String content, AutoCloseable instance) {
log.debug("执行信令消息:{}", content); log.debug("执行信令消息:{}", content);
final Client client = this.clientManager.client(instance); final Client client = this.clientManager.client(instance);
if(client == null) {
log.warn("信令终端无效:{}-{}", instance, content);
return;
}
// 验证请求 // 验证请求
final Message message = JSONUtils.toJava(content, Message.class); final Message message = JSONUtils.toJava(content, Message.class);
if(message == null) { if(message == null) {

View File

@@ -4,6 +4,7 @@ import java.util.Map;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException; import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.MapBodyGetter;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
/** /**
@@ -11,7 +12,7 @@ import com.acgist.taoyao.signal.client.Client;
* *
* @author acgist * @author acgist
*/ */
public abstract class ProtocolMapAdapter extends ProtocolAdapter { public abstract class ProtocolMapAdapter extends ProtocolAdapter implements MapBodyGetter {
protected ProtocolMapAdapter(String name, String signal) { protected ProtocolMapAdapter(String name, String signal) {
super(name, signal); super(name, signal);

View File

@@ -5,15 +5,16 @@ import java.util.Map;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.boot.model.MessageCodeException; import com.acgist.taoyao.boot.model.MessageCodeException;
import com.acgist.taoyao.signal.MapBodyGetter;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.room.Room; import com.acgist.taoyao.signal.media.Room;
/** /**
* 房间媒体服务信令适配器 * 房间媒体服务信令适配器
* *
* @author acgist * @author acgist
*/ */
public abstract class ProtocolMediaRoomAdapter extends ProtocolMediaAdapter { public abstract class ProtocolMediaRoomAdapter extends ProtocolMediaAdapter implements MapBodyGetter {
protected ProtocolMediaRoomAdapter(String name, String signal) { protected ProtocolMediaRoomAdapter(String name, String signal) {
super(name, signal); super(name, signal);
@@ -45,7 +46,7 @@ public abstract class ProtocolMediaRoomAdapter extends ProtocolMediaAdapter {
* @return 房间 * @return 房间
*/ */
protected Room room(Map<?, ?> map) { protected Room room(Map<?, ?> map) {
final Long roomId = this.roomId(map); final Long roomId = this.getLong(map, Constant.ROOM_ID);
final Room room = this.roomManager.room(roomId); final Room room = this.roomManager.room(roomId);
if(room == null) { if(room == null) {
throw MessageCodeException.of("房间无效:" + roomId); throw MessageCodeException.of("房间无效:" + roomId);
@@ -53,21 +54,6 @@ public abstract class ProtocolMediaRoomAdapter extends ProtocolMediaAdapter {
return room; return room;
} }
/**
* @param map 参数
*
* @return 房间ID
*/
protected Long roomId(Map<?, ?> map) {
final Object object = map.get(Constant.ROOM_ID);
if(object == null) {
return null;
} else if(object instanceof Long value) {
return value;
}
return Long.valueOf(object.toString());
}
/** /**
* 处理房间信令 * 处理房间信令
* *

View File

@@ -30,9 +30,9 @@ public class ClientHeartbeatProtocol extends ProtocolMapAdapter {
client.push(message.cloneWidthoutBody()); client.push(message.cloneWidthoutBody());
// 设置状态 // 设置状态
final ClientStatus status = client.status(); final ClientStatus status = client.status();
status.setSignal((Integer) body.get(Constant.SIGNAL)); status.setSignal(this.get(body, Constant.SIGNAL));
status.setBattery((Integer) body.get(Constant.BATTERY)); status.setBattery(this.get(body, Constant.BATTERY));
status.setCharging((Boolean) body.get(Constant.CHARGING)); status.setCharging(this.get(body, Constant.CHARGING));
status.setLastHeartbeat(LocalDateTime.now()); status.setLastHeartbeat(LocalDateTime.now());
} }

View File

@@ -35,9 +35,9 @@ public class ClientRegisterProtocol extends ProtocolMapAdapter {
@Override @Override
public void execute(String sn, Map<?, ?> body, Client client, Message message) { public void execute(String sn, Map<?, ?> body, Client client, Message message) {
final String clientSn = (String) body.get(Constant.SN); final String clientSn = this.get(body, Constant.SN);
final String username = (String) body.get(Constant.USERNAME); final String username = this.get(body, Constant.USERNAME);
final String password = (String) body.get(Constant.PASSWORD); final String password = this.get(body, Constant.PASSWORD);
// 如果需要终端鉴权在此实现 // 如果需要终端鉴权在此实现
if(this.securityService.authenticate(username, password)) { if(this.securityService.authenticate(username, password)) {
log.info("终端注册:{}", clientSn); log.info("终端注册:{}", clientSn);

View File

@@ -1,9 +1,6 @@
package com.acgist.taoyao.signal.protocol.client; package com.acgist.taoyao.signal.protocol.client;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
@@ -28,11 +25,7 @@ public class ClientStatusProtocol extends ProtocolMapAdapter {
@Override @Override
public void execute(String sn, Map<?, ?> body, Client client, Message message) { public void execute(String sn, Map<?, ?> body, Client client, Message message) {
// 如果没有指定终端标识默认查询自己 // 如果没有指定终端标识默认查询自己
String querySn = (String) body.get(Constant.SN); message.setBody(this.clientManager.status(this.get(body, Constant.SN, sn)));
if(StringUtils.isEmpty(querySn)) {
querySn = sn;
}
message.setBody(this.clientManager.status(querySn));
client.push(message); client.push(message);
} }

View File

@@ -6,8 +6,8 @@ import java.util.Map;
import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolMediaRoomAdapter; import com.acgist.taoyao.signal.protocol.ProtocolMediaRoomAdapter;
import com.acgist.taoyao.signal.room.Room;
/** /**
* 当前讲话终端信令 * 当前讲话终端信令

View File

@@ -6,8 +6,8 @@ import java.util.Map;
import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.media.Room;
import com.acgist.taoyao.signal.protocol.ProtocolMediaRoomAdapter; import com.acgist.taoyao.signal.protocol.ProtocolMediaRoomAdapter;
import com.acgist.taoyao.signal.room.Room;
/** /**
* 路由RTP能力信令 * 路由RTP能力信令

View File

@@ -5,8 +5,8 @@ import org.springframework.beans.factory.annotation.Autowired;
import com.acgist.taoyao.boot.annotation.Protocol; import com.acgist.taoyao.boot.annotation.Protocol;
import com.acgist.taoyao.boot.model.Message; import com.acgist.taoyao.boot.model.Message;
import com.acgist.taoyao.signal.client.Client; import com.acgist.taoyao.signal.client.Client;
import com.acgist.taoyao.signal.media.RoomManager;
import com.acgist.taoyao.signal.protocol.ProtocolAdapter; import com.acgist.taoyao.signal.protocol.ProtocolAdapter;
import com.acgist.taoyao.signal.room.RoomManager;
/** /**
* 房间列表信令 * 房间列表信令

View File

@@ -25,11 +25,11 @@ public interface SecurityService {
* 鉴权 * 鉴权
* *
* @param message 信令 * @param message 信令
* @param session 会话 * @param client 会话
* @param protocol 协议 * @param protocol 协议
* *
* @return 是否成功 * @return 是否成功
*/ */
boolean authenticate(Message message, Client session, Protocol protocol); boolean authenticate(Message message, Client client, Protocol protocol);
} }

View File

@@ -27,8 +27,8 @@ public class SecurityServiceImpl implements SecurityService {
} }
@Override @Override
public boolean authenticate(Message message, Client session, Protocol protocol) { public boolean authenticate(Message message, Client client, Protocol protocol) {
if(!session.authorized()) { if(!client.authorized()) {
return false; return false;
} }
// 信令权限鉴定 // 信令权限鉴定