From 90e32fa2e3b4aa5bd3a186e483b75185aee00b32 Mon Sep 17 00:00:00 2001 From: acgist <289547414@qq.com> Date: Sun, 12 May 2024 15:42:23 +0800 Subject: [PATCH] =?UTF-8?q?[*]=20=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../taoyao/media/src/main/cpp/bind.cpp | 115 ++++++++++++------ .../media/src/main/cpp/include/Signal.hpp | 2 +- .../src/main/cpp/media/AudioCapturer.cpp | 2 +- .../media/src/main/cpp/media/LocalClient.cpp | 2 +- .../media/src/main/cpp/media/MediaManager.cpp | 2 +- .../taoyao/media/src/main/cpp/media/Room.cpp | 79 +++++++----- .../src/main/ets/taoyao/TaoyaoSignal.ets | 28 ++++- .../taoyao/media/src/main/module.json5 | 9 +- 8 files changed, 160 insertions(+), 79 deletions(-) diff --git a/taoyao-client-openharmony/taoyao/media/src/main/cpp/bind.cpp b/taoyao-client-openharmony/taoyao/media/src/main/cpp/bind.cpp index 1727731..b4e5c3d 100644 --- a/taoyao-client-openharmony/taoyao/media/src/main/cpp/bind.cpp +++ b/taoyao-client-openharmony/taoyao/media/src/main/cpp/bind.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include @@ -42,21 +43,23 @@ static std::recursive_mutex taoyaoMutex; // 读取JSON #ifndef TAOYAO_JSON_BODY -#define TAOYAO_JSON_BODY(size) \ - napi_value ret; \ - size_t argc = size; \ - napi_value args[size] = { nullptr }; \ - napi_get_cb_info(env, info, &argc, args, nullptr, nullptr); \ - size_t length; \ - char chars[2048] = { 0 }; \ - napi_get_value_string_utf8(env, args[0], chars, sizeof(chars), &length); \ - if(length <= 0) { \ - OH_LOG_WARN(LOG_APP, "TAOYAO ERROR JSON: %{public}s", chars); \ - napi_create_int32(env, -1, &ret); \ - return ret; \ - } \ - OH_LOG_DEBUG(LOG_APP, "TAOYAO JSON: %{public}s", chars); \ - nlohmann::json json = nlohmann::json::parse(chars, chars + length); \ +#define TAOYAO_JSON_BODY(size) \ + napi_value ret; \ + size_t argc = size; \ + napi_value args[size] = { nullptr }; \ + napi_get_cb_info(env, info, &argc, args, nullptr, nullptr); \ + size_t length; \ + char* chars = new char[16 * 1024] { 0 }; \ + napi_get_value_string_utf8(env, args[0], chars, 16 * 1024, &length); \ + if(length <= 0) { \ + OH_LOG_WARN(LOG_APP, "TAOYAO ERROR JSON: %{public}d %{public}s", length, chars); \ + napi_create_int32(env, -1, &ret); \ + delete[] chars; \ + return ret; \ + } \ + OH_LOG_DEBUG(LOG_APP, "TAOYAO JSON: %{public}d %{public}s", length, chars); \ + nlohmann::json json = nlohmann::json::parse(chars, chars + length); \ + delete[] chars; \ nlohmann::json body = json["body"]; #endif @@ -108,7 +111,7 @@ static acgist::MediaManager* mediaManager = nullptr; // 房间管理 static std::map roomMap; // 异步回调 -static std::map*> promiseMap; +static std::map*> promiseMap; /** * 支持的编解码 @@ -141,7 +144,6 @@ struct Message { }; static void pushCallback(uv_work_t* work) { - // 不能调用ETS函数 } static void afterPushCallback(uv_work_t* work, int status) { @@ -181,7 +183,6 @@ void push(const std::string& signal, const std::string& body, uint64_t id) { } static void requestCallback(uv_work_t* work) { - // 不能调用ETS函数 } static void afterRequestCallback(uv_work_t* work, int status) { @@ -212,7 +213,7 @@ static void afterRequestCallback(uv_work_t* work, int status) { /** * 发送请求 */ -std::string request(const std::string& signal, const std::string& body, uint64_t id) { +nlohmann::json request(const std::string& signal, const std::string& body, uint64_t id) { uv_loop_s* loop = nullptr; napi_get_uv_event_loop(acgist::env, &loop); uv_work_t* work = new uv_work_t{}; @@ -231,23 +232,56 @@ std::string request(const std::string& signal, const std::string& body, uint64_t 1000 * acgist::clientIndex + acgist::index; } - std::promise* promise = new std::promise{}; + std::promise* promise = new std::promise{}; acgist::promiseMap.insert({ id, promise }); work->data = new Message{ id, signal, body }; uv_queue_work(loop, work, requestCallback, afterRequestCallback); - std::future future = promise->get_future(); + std::future future = promise->get_future(); if(future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) { OH_LOG_WARN(LOG_APP, "请求超时:%{public}s %{public}s", signal.data(), body.data()); acgist::promiseMap.erase(id); delete promise; - return "{}"; + return nlohmann::json{}; } else { acgist::promiseMap.erase(id); delete promise; - return future.get(); + return std::move(future.get()); } } +struct Adync { + std::function* function; +}; + +static void asyncCallback(uv_work_t* work) { + Adync* async = (Adync*) work->data; + (*async->function)(); + delete async; +} + +static void afterAsyncCallback(uv_work_t* work, int status) { + delete work; +} + +/** + * 异步执行 + * 注意:此处不能使用promise-future等待 + * + * @param function 方法 + * + * @return 结果 + */ +static int asyncExecute(std::function function) { +// uv_loop_s* loop = nullptr; +// napi_get_uv_event_loop(acgist::env, &loop); +// uv_work_t* work = new uv_work_t{}; +// work->data = new Adync { &function }; +// uv_queue_work(loop, work, asyncCallback, afterAsyncCallback); + std::thread thread(function); + thread.detach(); + return 0; +} + /** * 加载系统 */ @@ -326,9 +360,11 @@ static napi_value callback(napi_env env, napi_callback_info info) { auto promise = acgist::promiseMap.find(id); if(promise == acgist::promiseMap.end()) { napi_create_int32(env, -1, &ret); + OH_LOG_DEBUG(LOG_APP, "Promise回调无效:%{public}lld", id); } else { napi_create_int32(env, 0, &ret); - promise->second->set_value(chars); + promise->second->set_value(std::move(json)); + OH_LOG_DEBUG(LOG_APP, "Promise回调成功:%{public}lld", id); } return ret; } @@ -404,20 +440,29 @@ static napi_value roomInvite(napi_env env, napi_callback_info info) { TAOYAO_JSON_BODY(1); { std::lock_guard roomLock(roomMutex); - // TODO: 试试引用 - std::string roomId = body["roomId"]; - std::string password = body["password"]; + std::string roomId = body["roomId"]; + std::string password; + if(body.find("password") != body.end()) { + password = body["password"]; + } auto oldRoom = acgist::roomMap.find(roomId); if(oldRoom == acgist::roomMap.end()) { - OH_LOG_INFO(LOG_APP, "进入房间:%s", roomId.data()); + OH_LOG_INFO(LOG_APP, "进入房间:%{public}s", roomId.data()); auto room = new acgist::Room(roomId, mediaManager); - int result = room->enter(password); - if(result == acgist::SUCCESS_CODE) { - acgist::roomMap.insert({ roomId, room }); - room->produceMedia(); - } else { - delete room; - } + acgist::roomMap.insert({ roomId, room }); + int result = asyncExecute([room, roomId, password]() { + int code = room->enter(password); + if(code == acgist::SUCCESS_CODE) { + try { + room->produceMedia(); + } catch(const std::exception& e) { + OH_LOG_ERROR(LOG_APP, "进入房间异常:%{public}s %{public}s", roomId.data(), e.what()); + } + } else { + acgist::roomMap.erase(roomId); + delete room; + } + }); napi_create_int32(env, result, &ret); } else { OH_LOG_INFO(LOG_APP, "已经进入房间:%s", roomId.data()); diff --git a/taoyao-client-openharmony/taoyao/media/src/main/cpp/include/Signal.hpp b/taoyao-client-openharmony/taoyao/media/src/main/cpp/include/Signal.hpp index 0205f53..bee6939 100644 --- a/taoyao-client-openharmony/taoyao/media/src/main/cpp/include/Signal.hpp +++ b/taoyao-client-openharmony/taoyao/media/src/main/cpp/include/Signal.hpp @@ -54,7 +54,7 @@ extern void push(const std::string& signal, const std::string& body, uint64_t id * * @return 响应 */ -extern std::string request(const std::string& signal, const std::string& body, uint64_t id = 0L); +extern nlohmann::json request(const std::string& signal, const std::string& body, uint64_t id = 0L); } diff --git a/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/AudioCapturer.cpp b/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/AudioCapturer.cpp index 5b4c9d1..1fb840a 100644 --- a/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/AudioCapturer.cpp +++ b/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/AudioCapturer.cpp @@ -13,7 +13,7 @@ static int32_t OnStreamEvent(OH_AudioCapturer* capturer, void* userData, OH_Audi static int32_t OnInterruptEvent(OH_AudioCapturer* capturer, void* userData, OH_AudioInterrupt_ForceType type, OH_AudioInterrupt_Hint hint); acgist::AudioCapturer::AudioCapturer() { - OH_AudioStream_Result ret = OH_AudioStreamBuilder_Create(&this->builder, AUDIOSTREAM_TYPE_RENDERER); + OH_AudioStream_Result ret = OH_AudioStreamBuilder_Create(&this->builder, AUDIOSTREAM_TYPE_CAPTURER); OH_LOG_INFO(LOG_APP, "配置音频构造器:%o", ret); // 配置音频采集参数 OH_AudioStreamBuilder_SetSamplingRate(this->builder, acgist::samplingRate); diff --git a/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/LocalClient.cpp b/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/LocalClient.cpp index 2432679..dd5482b 100644 --- a/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/LocalClient.cpp +++ b/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/LocalClient.cpp @@ -3,7 +3,7 @@ acgist::LocalClient::LocalClient(acgist::MediaManager* mediaManager) : acgist::RoomClient(mediaManager) { this->mediaManager->newLocalClient(); this->audioTrack = this->mediaManager->getAudioTrack(); - this->videoTrack = this->mediaManager->getVideoTrack(); +// this->videoTrack = this->mediaManager->getVideoTrack(); } acgist::LocalClient::~LocalClient() { diff --git a/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/MediaManager.cpp b/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/MediaManager.cpp index 99b9181..a56a0a3 100644 --- a/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/MediaManager.cpp +++ b/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/MediaManager.cpp @@ -110,7 +110,7 @@ int acgist::MediaManager::releaseLocalClient() { bool acgist::MediaManager::startCapture() { this->startAudioCapture(); - this->startVideoCapture(); +// this->startVideoCapture(); return true; } diff --git a/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/Room.cpp b/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/Room.cpp index d19abb6..951e584 100644 --- a/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/Room.cpp +++ b/taoyao-client-openharmony/taoyao/media/src/main/cpp/media/Room.cpp @@ -1,6 +1,7 @@ #include "../include/Room.hpp" #include +#include #include "hilog/log.h" @@ -65,7 +66,7 @@ int acgist::Room::enter(const std::string& password) { } this->enterd = true; if (this->device->IsLoaded()) { - OH_LOG_WARN(LOG_APP, "Device配置已经加载:%s", this->roomId.data()); + OH_LOG_WARN(LOG_APP, "Device配置已经加载:%{public}s", this->roomId.data()); return -1; } // 本地终端 @@ -80,30 +81,34 @@ int acgist::Room::enter(const std::string& password) { nlohmann::json requestBody = { { "roomId", this->roomId } }; - std::string response = acgist::request("media::router::rtp::capabilities", requestBody.dump()); - nlohmann::json json = nlohmann::json::parse(response); + nlohmann::json json = acgist::request("media::router::rtp::capabilities", requestBody.dump()); + if(json.find("body") == json.end()) { + OH_LOG_WARN(LOG_APP, "进入房间失败:%{public}s", this->roomId.data()); + return -1; + } nlohmann::json responseBody = json["body"]; nlohmann::json rtpCapabilities = responseBody["rtpCapabilities"]; // 加载设备 + OH_LOG_INFO(LOG_APP, "加载设备:%{public}s", this->roomId.data()); mediasoupclient::PeerConnection::Options options; options.config = *this->rtcConfiguration; options.factory = this->mediaManager->peerConnectionFactory.get(); this->device->Load(rtpCapabilities, &options); // 进入房间 + OH_LOG_INFO(LOG_APP, "进入房间:%{public}s", this->roomId.data()); requestBody = { - { "roomId", this->roomId }, - { "password", password }, + { "roomId", this->roomId }, + { "password", password }, { "rtpCapabilities", this->device->GetRtpCapabilities() }, { "sctpCapabilities", this->device->GetSctpCapabilities() } }; - response = acgist::request("room::enter", requestBody.dump()); - OH_LOG_INFO(LOG_APP, "进入房间:%s", this->roomId.data()); + acgist::request("room::enter", requestBody.dump()); return 0; } int acgist::Room::produceMedia() { std::lock_guard lockRoom(roomMutex); - OH_LOG_INFO(LOG_APP, "生成媒体:%s", this->roomId.data()); + OH_LOG_INFO(LOG_APP, "生成媒体:%{public}s", this->roomId.data()); if(this->audioProduce || this->videoProduce) { this->createSendTransport(); } @@ -114,17 +119,16 @@ int acgist::Room::produceMedia() { this->produceAudio(); } if(this->videoProduce) { - this->produceVideo(); +// this->produceVideo(); } } int acgist::Room::createSendTransport() { std::lock_guard lockRoom(roomMutex); if(this->sendTransport != nullptr) { - OH_LOG_INFO(LOG_APP, "发送通道已经存在:%s", this->roomId.data()); + OH_LOG_INFO(LOG_APP, "发送通道已经存在:%{public}s", this->roomId.data()); return -1; } - OH_LOG_INFO(LOG_APP, "创建发送通道:%s", this->roomId.data()); nlohmann::json requestBody = { { "roomId", this->roomId }, { "forceTcp", false }, @@ -132,8 +136,12 @@ int acgist::Room::createSendTransport() { { "consuming", false }, // { "sctpCapabilities", sctpCapabilities }, }; - std::string response = acgist::request("media::transport::webrtc::create", requestBody.dump()); - nlohmann::json json = nlohmann::json::parse(response); + nlohmann::json json = acgist::request("media::transport::webrtc::create", requestBody.dump()); + if(json.find("body") == json.end()) { + OH_LOG_WARN(LOG_APP, "创建发送通道失败:%{public}s", this->roomId.data()); + return -1; + } + OH_LOG_INFO(LOG_APP, "创建发送通道:%{public}s", this->roomId.data()); nlohmann::json responseBody = json["body"]; mediasoupclient::PeerConnection::Options options; options.config = *this->rtcConfiguration; @@ -153,10 +161,9 @@ int acgist::Room::createSendTransport() { int acgist::Room::createRecvTransport() { std::lock_guard lockRoom(roomMutex); if(this->recvTransport != nullptr) { - OH_LOG_INFO(LOG_APP, "接收通道已经存在:%s", this->roomId.data()); + OH_LOG_INFO(LOG_APP, "接收通道已经存在:%{public}s", this->roomId.data()); return -1; } - OH_LOG_INFO(LOG_APP, "创建接收通道:%s", this->roomId.data()); nlohmann::json requestBody = { { "roomId", this->roomId }, { "forceTcp", false }, @@ -164,19 +171,23 @@ int acgist::Room::createRecvTransport() { { "consuming", true }, // { "sctpCapabilities", sctpCapabilities }, }; - std::string response = acgist::request("media::transport::webrtc::create", requestBody.dump()); - nlohmann::json json = nlohmann::json::parse(response); + nlohmann::json json = acgist::request("media::transport::webrtc::create", requestBody.dump()); + if(json.find("body") == json.end()) { + OH_LOG_WARN(LOG_APP, "创建接收通道失败:%{public}s", this->roomId.data()); + return -1; + } + OH_LOG_INFO(LOG_APP, "创建接收通道:%{public}s", this->roomId.data()); nlohmann::json responseBody = json["body"]; mediasoupclient::PeerConnection::Options options; options.config = *this->rtcConfiguration; options.factory = this->mediaManager->peerConnectionFactory.get(); this->recvTransport = this->device->CreateRecvTransport( this->recvListener, - json["transportId"], - json["iceParameters"], - json["iceCandidates"], - json["dtlsParameters"], - json["sctpParameters"], + responseBody["transportId"], + responseBody["iceParameters"], + responseBody["iceCandidates"], + responseBody["dtlsParameters"], + responseBody["sctpParameters"], &options ); return 0; @@ -185,15 +196,15 @@ int acgist::Room::createRecvTransport() { int acgist::Room::produceAudio() { std::lock_guard lockRoom(roomMutex); if(this->audioProducer != nullptr) { - OH_LOG_INFO(LOG_APP, "音频媒体已经生产:%s", this->roomId.data()); + OH_LOG_INFO(LOG_APP, "音频媒体已经生产:%{public}s", this->roomId.data()); return -1; } - if(!this->device->CanProduce("audio") || this->audioProducer == nullptr) { - OH_LOG_INFO(LOG_APP, "不能生产音频媒体:%s", this->roomId.data()); + if(!this->device->CanProduce("audio")) { + OH_LOG_INFO(LOG_APP, "不能生产音频媒体:%{public}s", this->roomId.data()); return -1; } if(this->client->audioTrack->state() == webrtc::MediaStreamTrackInterface::TrackState::kEnded) { - OH_LOG_INFO(LOG_APP, "音频媒体状态错误:%s", this->roomId.data()); + OH_LOG_INFO(LOG_APP, "音频媒体状态错误:%{public}s", this->roomId.data()); return -2; } OH_LOG_INFO(LOG_APP, "生产音频媒体:%s", this->roomId.data()); @@ -217,7 +228,7 @@ int acgist::Room::produceVideo() { OH_LOG_INFO(LOG_APP, "视频媒体已经生产:%s", this->roomId.data()); return -1; } - if(!this->device->CanProduce("video") || this->videoProducer == nullptr) { + if(!this->device->CanProduce("video")) { OH_LOG_INFO(LOG_APP, "不能生产视频媒体:%s", this->roomId.data()); return -1; } @@ -543,12 +554,16 @@ std::future acgist::SendListener::OnProduce(mediasoupclient::SendTr { "transportId", transport->GetId() }, { "rtpParameters", rtpParameters }, }; - std::string response = acgist::request("media::produce", requestBody.dump()); - nlohmann::json json = nlohmann::json::parse(response); - nlohmann::json responseBody = json["body"]; - std::string producerId = responseBody["producerId"]; std::promise promise; - promise.set_value(producerId); + nlohmann::json json = acgist::request("media::produce", requestBody.dump()); + if(json.find("body") == json.end()) { + OH_LOG_WARN(LOG_APP, "生产媒体失败:%{public}s", this->room->roomId.data()); + promise.set_value(""); + } else { + nlohmann::json responseBody = json["body"]; + std::string producerId = responseBody["producerId"]; + promise.set_value(producerId); + } return promise.get_future(); } diff --git a/taoyao-client-openharmony/taoyao/media/src/main/ets/taoyao/TaoyaoSignal.ets b/taoyao-client-openharmony/taoyao/media/src/main/ets/taoyao/TaoyaoSignal.ets index f06af0f..fd6cb0f 100644 --- a/taoyao-client-openharmony/taoyao/media/src/main/ets/taoyao/TaoyaoSignal.ets +++ b/taoyao-client-openharmony/taoyao/media/src/main/ets/taoyao/TaoyaoSignal.ets @@ -1,10 +1,13 @@ /** * 信令 * + * 线程模型设计真的是个小丑🤡🤡🤡🤡 + * * @author acgist */ import hilog from "@ohos.hilog"; +import List from '@ohos.util.List'; import { BusinessError } from '@ohos.base'; import webSocket from "@ohos.net.webSocket"; @@ -22,6 +25,8 @@ class TaoyaoSignal { connected: boolean = false; // 心跳定时 heartbeatTimer: number = 0; + // 本地回调 + nativeCallback = new List(); // 同步请求 callbackMapping = new Map(); // 当前消息索引:0-666 @@ -265,8 +270,17 @@ class TaoyaoSignal { const header: Record = json.header as Record; const id : number = header.id as number; const signal: string = header.signal as string; + if(this.nativeCallback.has(id)) { + hilog.debug(0x0000, "TaoyaoSignal", "处理同步消息(NAPI):%{public}s", message); + try { + taoyaoModule.callback(message); + } finally { + this.nativeCallback.remove(id); + } + return; + } if (this.callbackMapping.has(id)) { - hilog.debug(0x0000, "TaoyaoSignal", "处理同步消息:%{public}s", message); + hilog.debug(0x0000, "TaoyaoSignal", "处理同步消息(ETS):%{public}s", message); try { const callback = this.callbackMapping.get(id) as Function; if(callback(json)) { @@ -294,10 +308,10 @@ class TaoyaoSignal { ret = taoyaoModule.roomLeave(message); break; case "room::client::list": - ret = taoyaoModule.roomClientList(message); + // ret = taoyaoModule.roomClientList(message); break; case "media::consume": - ret = taoyaoModule.mediaConsume(message); + // ret = taoyaoModule.mediaConsume(message); break; case "media::consumer::close": ret = taoyaoModule.mediaConsumerClose(message); @@ -344,9 +358,11 @@ class TaoyaoSignal { * @param id ID */ nativeRequest(signal: string, body: string, id: number = 0) { - taoyaoSignal.request(signal, JSON.parse(body), id).then(response => { - taoyaoModule.callback(JSON.stringify(response)); - }); + taoyaoSignal.nativeCallback.add(id); + taoyaoSignal.push(signal, JSON.parse(body), id); + setTimeout(() => { + taoyaoSignal.nativeCallback.remove(id); + }, 5000); } } diff --git a/taoyao-client-openharmony/taoyao/media/src/main/module.json5 b/taoyao-client-openharmony/taoyao/media/src/main/module.json5 index 707ca0f..7f722c4 100644 --- a/taoyao-client-openharmony/taoyao/media/src/main/module.json5 +++ b/taoyao-client-openharmony/taoyao/media/src/main/module.json5 @@ -36,30 +36,35 @@ "requestPermissions": [ { "name": "ohos.permission.CAMERA", + "reason": "$string:app_name", "usedScene": { - "when": "always" + "when": "inuse" } }, { "name": "ohos.permission.INTERNET", + "reason": "$string:app_name", "usedScene": { "when": "always" } }, { "name": "ohos.permission.MICROPHONE", + "reason": "$string:app_name", "usedScene": { - "when": "always" + "when": "inuse" } }, { "name": "ohos.permission.READ_MEDIA", + "reason": "$string:app_name", "usedScene": { "when": "always" } }, { "name": "ohos.permission.WRITE_MEDIA", + "reason": "$string:app_name", "usedScene": { "when": "always" }