[*] 优化

This commit is contained in:
acgist
2024-05-12 15:42:23 +08:00
parent 3d2f8e08f9
commit 90e32fa2e3
8 changed files with 160 additions and 79 deletions

View File

@@ -17,6 +17,7 @@
#include <future>
#include <thread>
#include <chrono>
#include <functional>
#include <uv.h>
@@ -42,21 +43,23 @@ static std::recursive_mutex taoyaoMutex;
// 读取JSON
#ifndef TAOYAO_JSON_BODY
#define TAOYAO_JSON_BODY(size) \
napi_value ret; \
size_t argc = size; \
napi_value args[size] = { nullptr }; \
napi_get_cb_info(env, info, &argc, args, nullptr, nullptr); \
size_t length; \
char chars[2048] = { 0 }; \
napi_get_value_string_utf8(env, args[0], chars, sizeof(chars), &length); \
if(length <= 0) { \
OH_LOG_WARN(LOG_APP, "TAOYAO ERROR JSON: %{public}s", chars); \
napi_create_int32(env, -1, &ret); \
return ret; \
} \
OH_LOG_DEBUG(LOG_APP, "TAOYAO JSON: %{public}s", chars); \
nlohmann::json json = nlohmann::json::parse(chars, chars + length); \
#define TAOYAO_JSON_BODY(size) \
napi_value ret; \
size_t argc = size; \
napi_value args[size] = { nullptr }; \
napi_get_cb_info(env, info, &argc, args, nullptr, nullptr); \
size_t length; \
char* chars = new char[16 * 1024] { 0 }; \
napi_get_value_string_utf8(env, args[0], chars, 16 * 1024, &length); \
if(length <= 0) { \
OH_LOG_WARN(LOG_APP, "TAOYAO ERROR JSON: %{public}d %{public}s", length, chars); \
napi_create_int32(env, -1, &ret); \
delete[] chars; \
return ret; \
} \
OH_LOG_DEBUG(LOG_APP, "TAOYAO JSON: %{public}d %{public}s", length, chars); \
nlohmann::json json = nlohmann::json::parse(chars, chars + length); \
delete[] chars; \
nlohmann::json body = json["body"];
#endif
@@ -108,7 +111,7 @@ static acgist::MediaManager* mediaManager = nullptr;
// 房间管理
static std::map<std::string, acgist::Room*> roomMap;
// 异步回调
static std::map<uint64_t, std::promise<std::string>*> promiseMap;
static std::map<uint64_t, std::promise<nlohmann::json>*> promiseMap;
/**
* 支持的编解码
@@ -141,7 +144,6 @@ struct Message {
};
static void pushCallback(uv_work_t* work) {
// 不能调用ETS函数
}
static void afterPushCallback(uv_work_t* work, int status) {
@@ -181,7 +183,6 @@ void push(const std::string& signal, const std::string& body, uint64_t id) {
}
static void requestCallback(uv_work_t* work) {
// 不能调用ETS函数
}
static void afterRequestCallback(uv_work_t* work, int status) {
@@ -212,7 +213,7 @@ static void afterRequestCallback(uv_work_t* work, int status) {
/**
* 发送请求
*/
std::string request(const std::string& signal, const std::string& body, uint64_t id) {
nlohmann::json request(const std::string& signal, const std::string& body, uint64_t id) {
uv_loop_s* loop = nullptr;
napi_get_uv_event_loop(acgist::env, &loop);
uv_work_t* work = new uv_work_t{};
@@ -231,23 +232,56 @@ std::string request(const std::string& signal, const std::string& body, uint64_t
1000 * acgist::clientIndex +
acgist::index;
}
std::promise<std::string>* promise = new std::promise<std::string>{};
std::promise<nlohmann::json>* promise = new std::promise<nlohmann::json>{};
acgist::promiseMap.insert({ id, promise });
work->data = new Message{ id, signal, body };
uv_queue_work(loop, work, requestCallback, afterRequestCallback);
std::future<std::string> future = promise->get_future();
std::future<nlohmann::json> future = promise->get_future();
if(future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) {
OH_LOG_WARN(LOG_APP, "请求超时:%{public}s %{public}s", signal.data(), body.data());
acgist::promiseMap.erase(id);
delete promise;
return "{}";
return nlohmann::json{};
} else {
acgist::promiseMap.erase(id);
delete promise;
return future.get();
return std::move(future.get());
}
}
struct Adync {
std::function<void()>* function;
};
static void asyncCallback(uv_work_t* work) {
Adync* async = (Adync*) work->data;
(*async->function)();
delete async;
}
static void afterAsyncCallback(uv_work_t* work, int status) {
delete work;
}
/**
* 异步执行
* 注意此处不能使用promise-future等待
*
* @param function 方法
*
* @return 结果
*/
static int asyncExecute(std::function<void()> function) {
// uv_loop_s* loop = nullptr;
// napi_get_uv_event_loop(acgist::env, &loop);
// uv_work_t* work = new uv_work_t{};
// work->data = new Adync { &function };
// uv_queue_work(loop, work, asyncCallback, afterAsyncCallback);
std::thread thread(function);
thread.detach();
return 0;
}
/**
* 加载系统
*/
@@ -326,9 +360,11 @@ static napi_value callback(napi_env env, napi_callback_info info) {
auto promise = acgist::promiseMap.find(id);
if(promise == acgist::promiseMap.end()) {
napi_create_int32(env, -1, &ret);
OH_LOG_DEBUG(LOG_APP, "Promise回调无效%{public}lld", id);
} else {
napi_create_int32(env, 0, &ret);
promise->second->set_value(chars);
promise->second->set_value(std::move(json));
OH_LOG_DEBUG(LOG_APP, "Promise回调成功%{public}lld", id);
}
return ret;
}
@@ -404,20 +440,29 @@ static napi_value roomInvite(napi_env env, napi_callback_info info) {
TAOYAO_JSON_BODY(1);
{
std::lock_guard<std::recursive_mutex> roomLock(roomMutex);
// TODO: 试试引用
std::string roomId = body["roomId"];
std::string password = body["password"];
std::string roomId = body["roomId"];
std::string password;
if(body.find("password") != body.end()) {
password = body["password"];
}
auto oldRoom = acgist::roomMap.find(roomId);
if(oldRoom == acgist::roomMap.end()) {
OH_LOG_INFO(LOG_APP, "进入房间:%s", roomId.data());
OH_LOG_INFO(LOG_APP, "进入房间:%{public}s", roomId.data());
auto room = new acgist::Room(roomId, mediaManager);
int result = room->enter(password);
if(result == acgist::SUCCESS_CODE) {
acgist::roomMap.insert({ roomId, room });
room->produceMedia();
} else {
delete room;
}
acgist::roomMap.insert({ roomId, room });
int result = asyncExecute([room, roomId, password]() {
int code = room->enter(password);
if(code == acgist::SUCCESS_CODE) {
try {
room->produceMedia();
} catch(const std::exception& e) {
OH_LOG_ERROR(LOG_APP, "进入房间异常:%{public}s %{public}s", roomId.data(), e.what());
}
} else {
acgist::roomMap.erase(roomId);
delete room;
}
});
napi_create_int32(env, result, &ret);
} else {
OH_LOG_INFO(LOG_APP, "已经进入房间:%s", roomId.data());

View File

@@ -54,7 +54,7 @@ extern void push(const std::string& signal, const std::string& body, uint64_t id
*
* @return 响应
*/
extern std::string request(const std::string& signal, const std::string& body, uint64_t id = 0L);
extern nlohmann::json request(const std::string& signal, const std::string& body, uint64_t id = 0L);
}

View File

@@ -13,7 +13,7 @@ static int32_t OnStreamEvent(OH_AudioCapturer* capturer, void* userData, OH_Audi
static int32_t OnInterruptEvent(OH_AudioCapturer* capturer, void* userData, OH_AudioInterrupt_ForceType type, OH_AudioInterrupt_Hint hint);
acgist::AudioCapturer::AudioCapturer() {
OH_AudioStream_Result ret = OH_AudioStreamBuilder_Create(&this->builder, AUDIOSTREAM_TYPE_RENDERER);
OH_AudioStream_Result ret = OH_AudioStreamBuilder_Create(&this->builder, AUDIOSTREAM_TYPE_CAPTURER);
OH_LOG_INFO(LOG_APP, "配置音频构造器:%o", ret);
// 配置音频采集参数
OH_AudioStreamBuilder_SetSamplingRate(this->builder, acgist::samplingRate);

View File

@@ -3,7 +3,7 @@
acgist::LocalClient::LocalClient(acgist::MediaManager* mediaManager) : acgist::RoomClient(mediaManager) {
this->mediaManager->newLocalClient();
this->audioTrack = this->mediaManager->getAudioTrack();
this->videoTrack = this->mediaManager->getVideoTrack();
// this->videoTrack = this->mediaManager->getVideoTrack();
}
acgist::LocalClient::~LocalClient() {

View File

@@ -110,7 +110,7 @@ int acgist::MediaManager::releaseLocalClient() {
bool acgist::MediaManager::startCapture() {
this->startAudioCapture();
this->startVideoCapture();
// this->startVideoCapture();
return true;
}

View File

@@ -1,6 +1,7 @@
#include "../include/Room.hpp"
#include <mutex>
#include <thread>
#include "hilog/log.h"
@@ -65,7 +66,7 @@ int acgist::Room::enter(const std::string& password) {
}
this->enterd = true;
if (this->device->IsLoaded()) {
OH_LOG_WARN(LOG_APP, "Device配置已经加载%s", this->roomId.data());
OH_LOG_WARN(LOG_APP, "Device配置已经加载%{public}s", this->roomId.data());
return -1;
}
// 本地终端
@@ -80,30 +81,34 @@ int acgist::Room::enter(const std::string& password) {
nlohmann::json requestBody = {
{ "roomId", this->roomId }
};
std::string response = acgist::request("media::router::rtp::capabilities", requestBody.dump());
nlohmann::json json = nlohmann::json::parse(response);
nlohmann::json json = acgist::request("media::router::rtp::capabilities", requestBody.dump());
if(json.find("body") == json.end()) {
OH_LOG_WARN(LOG_APP, "进入房间失败:%{public}s", this->roomId.data());
return -1;
}
nlohmann::json responseBody = json["body"];
nlohmann::json rtpCapabilities = responseBody["rtpCapabilities"];
// 加载设备
OH_LOG_INFO(LOG_APP, "加载设备:%{public}s", this->roomId.data());
mediasoupclient::PeerConnection::Options options;
options.config = *this->rtcConfiguration;
options.factory = this->mediaManager->peerConnectionFactory.get();
this->device->Load(rtpCapabilities, &options);
// 进入房间
OH_LOG_INFO(LOG_APP, "进入房间:%{public}s", this->roomId.data());
requestBody = {
{ "roomId", this->roomId },
{ "password", password },
{ "roomId", this->roomId },
{ "password", password },
{ "rtpCapabilities", this->device->GetRtpCapabilities() },
{ "sctpCapabilities", this->device->GetSctpCapabilities() }
};
response = acgist::request("room::enter", requestBody.dump());
OH_LOG_INFO(LOG_APP, "进入房间:%s", this->roomId.data());
acgist::request("room::enter", requestBody.dump());
return 0;
}
int acgist::Room::produceMedia() {
std::lock_guard<std::recursive_mutex> lockRoom(roomMutex);
OH_LOG_INFO(LOG_APP, "生成媒体:%s", this->roomId.data());
OH_LOG_INFO(LOG_APP, "生成媒体:%{public}s", this->roomId.data());
if(this->audioProduce || this->videoProduce) {
this->createSendTransport();
}
@@ -114,17 +119,16 @@ int acgist::Room::produceMedia() {
this->produceAudio();
}
if(this->videoProduce) {
this->produceVideo();
// this->produceVideo();
}
}
int acgist::Room::createSendTransport() {
std::lock_guard<std::recursive_mutex> lockRoom(roomMutex);
if(this->sendTransport != nullptr) {
OH_LOG_INFO(LOG_APP, "发送通道已经存在:%s", this->roomId.data());
OH_LOG_INFO(LOG_APP, "发送通道已经存在:%{public}s", this->roomId.data());
return -1;
}
OH_LOG_INFO(LOG_APP, "创建发送通道:%s", this->roomId.data());
nlohmann::json requestBody = {
{ "roomId", this->roomId },
{ "forceTcp", false },
@@ -132,8 +136,12 @@ int acgist::Room::createSendTransport() {
{ "consuming", false },
// { "sctpCapabilities", sctpCapabilities },
};
std::string response = acgist::request("media::transport::webrtc::create", requestBody.dump());
nlohmann::json json = nlohmann::json::parse(response);
nlohmann::json json = acgist::request("media::transport::webrtc::create", requestBody.dump());
if(json.find("body") == json.end()) {
OH_LOG_WARN(LOG_APP, "创建发送通道失败:%{public}s", this->roomId.data());
return -1;
}
OH_LOG_INFO(LOG_APP, "创建发送通道:%{public}s", this->roomId.data());
nlohmann::json responseBody = json["body"];
mediasoupclient::PeerConnection::Options options;
options.config = *this->rtcConfiguration;
@@ -153,10 +161,9 @@ int acgist::Room::createSendTransport() {
int acgist::Room::createRecvTransport() {
std::lock_guard<std::recursive_mutex> lockRoom(roomMutex);
if(this->recvTransport != nullptr) {
OH_LOG_INFO(LOG_APP, "接收通道已经存在:%s", this->roomId.data());
OH_LOG_INFO(LOG_APP, "接收通道已经存在:%{public}s", this->roomId.data());
return -1;
}
OH_LOG_INFO(LOG_APP, "创建接收通道:%s", this->roomId.data());
nlohmann::json requestBody = {
{ "roomId", this->roomId },
{ "forceTcp", false },
@@ -164,19 +171,23 @@ int acgist::Room::createRecvTransport() {
{ "consuming", true },
// { "sctpCapabilities", sctpCapabilities },
};
std::string response = acgist::request("media::transport::webrtc::create", requestBody.dump());
nlohmann::json json = nlohmann::json::parse(response);
nlohmann::json json = acgist::request("media::transport::webrtc::create", requestBody.dump());
if(json.find("body") == json.end()) {
OH_LOG_WARN(LOG_APP, "创建接收通道失败:%{public}s", this->roomId.data());
return -1;
}
OH_LOG_INFO(LOG_APP, "创建接收通道:%{public}s", this->roomId.data());
nlohmann::json responseBody = json["body"];
mediasoupclient::PeerConnection::Options options;
options.config = *this->rtcConfiguration;
options.factory = this->mediaManager->peerConnectionFactory.get();
this->recvTransport = this->device->CreateRecvTransport(
this->recvListener,
json["transportId"],
json["iceParameters"],
json["iceCandidates"],
json["dtlsParameters"],
json["sctpParameters"],
responseBody["transportId"],
responseBody["iceParameters"],
responseBody["iceCandidates"],
responseBody["dtlsParameters"],
responseBody["sctpParameters"],
&options
);
return 0;
@@ -185,15 +196,15 @@ int acgist::Room::createRecvTransport() {
int acgist::Room::produceAudio() {
std::lock_guard<std::recursive_mutex> lockRoom(roomMutex);
if(this->audioProducer != nullptr) {
OH_LOG_INFO(LOG_APP, "音频媒体已经生产:%s", this->roomId.data());
OH_LOG_INFO(LOG_APP, "音频媒体已经生产:%{public}s", this->roomId.data());
return -1;
}
if(!this->device->CanProduce("audio") || this->audioProducer == nullptr) {
OH_LOG_INFO(LOG_APP, "不能生产音频媒体:%s", this->roomId.data());
if(!this->device->CanProduce("audio")) {
OH_LOG_INFO(LOG_APP, "不能生产音频媒体:%{public}s", this->roomId.data());
return -1;
}
if(this->client->audioTrack->state() == webrtc::MediaStreamTrackInterface::TrackState::kEnded) {
OH_LOG_INFO(LOG_APP, "音频媒体状态错误:%s", this->roomId.data());
OH_LOG_INFO(LOG_APP, "音频媒体状态错误:%{public}s", this->roomId.data());
return -2;
}
OH_LOG_INFO(LOG_APP, "生产音频媒体:%s", this->roomId.data());
@@ -217,7 +228,7 @@ int acgist::Room::produceVideo() {
OH_LOG_INFO(LOG_APP, "视频媒体已经生产:%s", this->roomId.data());
return -1;
}
if(!this->device->CanProduce("video") || this->videoProducer == nullptr) {
if(!this->device->CanProduce("video")) {
OH_LOG_INFO(LOG_APP, "不能生产视频媒体:%s", this->roomId.data());
return -1;
}
@@ -543,12 +554,16 @@ std::future<std::string> acgist::SendListener::OnProduce(mediasoupclient::SendTr
{ "transportId", transport->GetId() },
{ "rtpParameters", rtpParameters },
};
std::string response = acgist::request("media::produce", requestBody.dump());
nlohmann::json json = nlohmann::json::parse(response);
nlohmann::json responseBody = json["body"];
std::string producerId = responseBody["producerId"];
std::promise<std::string> promise;
promise.set_value(producerId);
nlohmann::json json = acgist::request("media::produce", requestBody.dump());
if(json.find("body") == json.end()) {
OH_LOG_WARN(LOG_APP, "生产媒体失败:%{public}s", this->room->roomId.data());
promise.set_value("");
} else {
nlohmann::json responseBody = json["body"];
std::string producerId = responseBody["producerId"];
promise.set_value(producerId);
}
return promise.get_future();
}

View File

@@ -1,10 +1,13 @@
/**
* 信令
*
* 线程模型设计真的是个小丑🤡🤡🤡🤡
*
* @author acgist
*/
import hilog from "@ohos.hilog";
import List from '@ohos.util.List';
import { BusinessError } from '@ohos.base';
import webSocket from "@ohos.net.webSocket";
@@ -22,6 +25,8 @@ class TaoyaoSignal {
connected: boolean = false;
// 心跳定时
heartbeatTimer: number = 0;
// 本地回调
nativeCallback = new List<number>();
// 同步请求
callbackMapping = new Map<number, Function>();
// 当前消息索引0-666
@@ -265,8 +270,17 @@ class TaoyaoSignal {
const header: Record<string, Object> = json.header as Record<string, Object>;
const id : number = header.id as number;
const signal: string = header.signal as string;
if(this.nativeCallback.has(id)) {
hilog.debug(0x0000, "TaoyaoSignal", "处理同步消息(NAPI)%{public}s", message);
try {
taoyaoModule.callback(message);
} finally {
this.nativeCallback.remove(id);
}
return;
}
if (this.callbackMapping.has(id)) {
hilog.debug(0x0000, "TaoyaoSignal", "处理同步消息:%{public}s", message);
hilog.debug(0x0000, "TaoyaoSignal", "处理同步消息(ETS)%{public}s", message);
try {
const callback = this.callbackMapping.get(id) as Function;
if(callback(json)) {
@@ -294,10 +308,10 @@ class TaoyaoSignal {
ret = taoyaoModule.roomLeave(message);
break;
case "room::client::list":
ret = taoyaoModule.roomClientList(message);
// ret = taoyaoModule.roomClientList(message);
break;
case "media::consume":
ret = taoyaoModule.mediaConsume(message);
// ret = taoyaoModule.mediaConsume(message);
break;
case "media::consumer::close":
ret = taoyaoModule.mediaConsumerClose(message);
@@ -344,9 +358,11 @@ class TaoyaoSignal {
* @param id ID
*/
nativeRequest(signal: string, body: string, id: number = 0) {
taoyaoSignal.request(signal, JSON.parse(body), id).then(response => {
taoyaoModule.callback(JSON.stringify(response));
});
taoyaoSignal.nativeCallback.add(id);
taoyaoSignal.push(signal, JSON.parse(body), id);
setTimeout(() => {
taoyaoSignal.nativeCallback.remove(id);
}, 5000);
}
}

View File

@@ -36,30 +36,35 @@
"requestPermissions": [
{
"name": "ohos.permission.CAMERA",
"reason": "$string:app_name",
"usedScene": {
"when": "always"
"when": "inuse"
}
},
{
"name": "ohos.permission.INTERNET",
"reason": "$string:app_name",
"usedScene": {
"when": "always"
}
},
{
"name": "ohos.permission.MICROPHONE",
"reason": "$string:app_name",
"usedScene": {
"when": "always"
"when": "inuse"
}
},
{
"name": "ohos.permission.READ_MEDIA",
"reason": "$string:app_name",
"usedScene": {
"when": "always"
}
},
{
"name": "ohos.permission.WRITE_MEDIA",
"reason": "$string:app_name",
"usedScene": {
"when": "always"
}