@@ -806,21 +806,12 @@ class Taoyao extends RemoteClient {
case "media::consumer::resume" :
this . defaultMediaConsumerResume ( message ) ;
break ;
case "media::consumer::status" :
this . defaultMediaConsumerStatus ( message ) ;
break ;
case "media::data::consumer::close" :
me . defaultMediaDataConsumerClose ( message ) ;
break ;
case "media::data::consumer::status" :
me . defaultMediaDataConsumerStatus ( message ) ;
break ;
case "media::data::producer::close" :
me . defaultMediaDataProducerClose ( message ) ;
break ;
case "media::data::producer::status" :
me . defaultMediaDataProducerStatus ( message ) ;
break ;
case "media::producer::close" :
me . defaultMediaProducerClose ( message ) ;
break ;
@@ -1315,12 +1306,29 @@ class Taoyao extends RemoteClient {
}
/**
* 查询消费者状态信令
*
* @param {*} message 消息
*/
defaultM ediaConsumerStatus( message ) {
console . debug ( "消费者状态" , message ) ;
* 查询消费者状态信令
*
* @param {*} consumerId 消费者ID
*/
async m ediaConsumerStatus( consumerId ) {
const me = this ;
return await me . request ( protocol . buildMessage ( 'media::consumer::status' , {
roomId : me . roomId ,
consumerId
} ) ) ;
}
/**
* 关闭数据消费者信令
*
* @param {*} consumerId 数据消费者ID
*/
mediaDataConsumerClose ( consumerId ) {
const me = this ;
me . push ( protocol . buildMessage ( "media::data::consumer::close" , {
roomId : me . roomId ,
consumerId : consumerId ,
} ) ) ;
}
/**
@@ -1344,10 +1352,28 @@ class Taoyao extends RemoteClient {
/**
* 查询数据消费者状态信令
*
* @param {*} message 信令消息
* @param {*} consumerId 消费者ID
*/
defaultM ediaDataConsumerStatus( message ) {
console . debug ( "数据消费者状态" , message ) ;
async m ediaDataConsumerStatus( consumerId ) {
const me = this ;
return await me . request ( protocol . buildMessage ( 'media::data::consumer::status' , {
roomId : me . roomId ,
consumerId
} ) ) ;
}
/**
* 关闭数据生产者信令
*
* @param {*} producerId 数据生产者ID
*/
mediaDataProducerClose ( producerId ) {
this . push (
protocol . buildMessage ( "media::data::producer::close" , {
roomId : this . roomId ,
producerId : producerId
} )
) ;
}
/**
@@ -1369,12 +1395,30 @@ class Taoyao extends RemoteClient {
}
/**
* 关闭数据消费者 信令
* 查询数据生产者状态 信令
*
* @param {*} message 消息
* @param {*} producerId 生产者ID
*/
defaultM ediaDataProducerStatus( message ) {
console . info ( "数据生产者状态" , message ) ;
async m ediaDataProducerStatus( producerId ) {
const me = this ;
return await me . request ( protocol . buildMessage ( 'media::data::producer::status' , {
roomId : me . roomId ,
producerId
} ) ) ;
}
/**
* 关闭生产者信令
*
* @param {*} producerId 生产者ID
*/
mediaProducerClose ( producerId ) {
this . push (
protocol . buildMessage ( "media::producer::close" , {
roomId : this . roomId ,
producerId : producerId
} )
) ;
}
/**
@@ -1478,6 +1522,19 @@ class Taoyao extends RemoteClient {
}
}
/**
* 查询生产者状态信令
*
* @param {*} producerId 生产者ID
*/
async mediaProducerStatus ( producerId ) {
const me = this ;
return await me . request ( protocol . buildMessage ( 'media::producer::status' , {
roomId : me . roomId ,
producerId
} ) ) ;
}
/**
* 重启ICE信令
*/
@@ -1508,7 +1565,22 @@ class Taoyao extends RemoteClient {
console . debug ( "视频方向变化信令" , message ) ;
}
// TODO: continue
/**
* 消费媒体信令
*
* @param {*} producerId 生产者ID
*/
mediaConsume ( producerId ) {
const me = this ;
if ( ! me . recvTransport ) {
me . callbackError ( "没有连接接收通道" ) ;
return ;
}
me . push ( protocol . buildMessage ( "media::consume" , {
roomId : me . roomId ,
producerId : producerId ,
} ) ) ;
}
/**
* 消费媒体信令
@@ -1520,76 +1592,85 @@ class Taoyao extends RemoteClient {
* @param {*} message 消息
*/
async defaultMediaConsume ( message ) {
const self = this ;
if ( ! self . audioConsume && ! self . videoConsume ) {
const me = this ;
if ( ! me . audioConsume && ! me . videoConsume ) {
console . debug ( "没有消费媒体" ) ;
return ;
}
const {
kind ,
type ,
roomId ,
appData ,
clientId ,
sourceId ,
streamId ,
producerId ,
consumerId ,
kind ,
type ,
appData ,
rtpParameters ,
producerPaused ,
} = message . body ;
try {
const consumer = await self . recvTransport . consume ( {
const consumer = await me . recvTransport . consume ( {
id : consumerId ,
appData : { ... appData , clientId , sourceId , streamId } ,
// 让libwebrtc同步相同来源媒体
streamId : ` ${ clientId } - ${ appData . videoSource || "taoyao" } ` ,
kind ,
producerId ,
rtpParameters ,
// 强制设置streamId, 让libwebrtc同步麦克风和摄像头, 屏幕共享不要求同步。
streamId : ` ${ clientId } - ${ appData ? . videoSource ? appData . videoSource : "unknown" } ` ,
appData , // Trick.
} ) ;
consumer . clientId = clientId ;
consumer . sourceId = sourceId ;
consumer . streamId = streamId ;
self . consumers . set ( consumer . id , consumer ) ;
me . consumers . set ( consumer . id , consumer ) ;
consumer . on ( "transportclose" , ( ) => {
self . consumers . delete ( consumer . id ) ;
console . debug ( "消费者关闭(通道关闭)" , consumer . id , streamId );
consumer . close ( ) ;
} ) ;
const { spatialLay ers , temporalLayers } =
mediasoupClient . parseScalabilityMode (
consumer . rtpParameters . encodings [ 0 ] . scalabilityMode
) ;
self . push ( message ) ;
console . debug ( "远程媒体:" , consumer ) ;
const remoteClient = self . remoteClients . get ( consumer . sourceId ) ;
if ( remoteClient && r emoteClient . proxy && remote Client. proxy . media ) {
const track = consumer . track ;
// TODO: 旧的媒体?
consumer . observ er. on ( "close" , ( ) => {
if ( me . consumers . delete ( consumer . id ) ) {
console . debug ( "消费者关闭" , consumer . id , streamId ) ;
} else {
console . debug ( "消费者关闭(无效)" , consumer . id , streamId ) ;
}
} ) ;
const { spatialLayers , t emporalLayers } = mediasoup Client. parseScalabilityMode (
consumer . rtpParameters . encodings [ 0 ] . scalabilityMode
) ;
console . debug ( "时间层空间层" , spatialLayers , temporalLayers ) ;
me . push ( message ) ;
console . debug ( "远程媒体消费者" , consumer ) ;
const track = consumer . track ;
const remoteClient = me . remoteClients . get ( consumer . sourceId ) ;
me . callbackTrack ( sourceId , track ) ;
if (
remoteClient &&
remoteClient . proxy &&
remoteClient . proxy . media
) {
if ( track . kind === "audio" ) {
remoteClient . audioTrack = track ;
remoteClient . audioTrack = track ;
remoteClient . audioConsumer = consumer ;
} else if ( track . kind === "video" ) {
remoteClient . videoTrack = track ;
remoteClient . videoTrack = track ;
remoteClient . videoConsumer = consumer ;
} else {
console . warn ( "不支持的媒体: " , track ) ;
console . warn ( "不支持的媒体类型 " , track ) ;
}
remoteClient . proxy . media ( consumer . track, consumer ) ;
remoteClient . proxy . media ( track , consumer ) ;
} else {
console . warn ( "远程终端没有实现服务 代理: " , remoteClient ) ;
}
// 实现进入自动暂停视频,注:必须订阅所有类型媒体,不然媒体服务直接不会转发视频媒体
if ( consumer . kind === "video" && ! self . videoProduce ) {
// this.pauseConsumer(consumer);
// TODO: 实现
console . warn ( "远程终端没有实现代理" , remoteClient ) ;
}
} catch ( error ) {
self . callbackError ( "消费媒体异常" , error ) ;
me . callbackError ( "消费媒体异常" , error ) ;
}
}
/**
* 消费数据信令
*
* @param {*} producerId
* @param {*} producerId 数据生产者ID
*/
mediaDataConsume ( producerId ) {
const me = this ;
@@ -1597,13 +1678,12 @@ class Taoyao extends RemoteClient {
me . callbackError ( "没有连接接收通道" ) ;
return ;
}
me . push (
p rotocol . buildMessage ( "media::data::consume" , {
roomId : me . room Id ,
producerId : producerId ,
} )
) ;
me . push ( protocol . buildMessage ( "media::data::consume" , {
roomId : me . roomId ,
producerId : producer Id,
} ) ) ;
}
/**
* 消费数据信令
*
@@ -1612,63 +1692,67 @@ class Taoyao extends RemoteClient {
async defaultMediaDataConsume ( message ) {
const me = this ;
const {
roomId ,
clientId ,
sourceId ,
streamId ,
producerId ,
consumerId ,
label ,
appData ,
protocol ,
consumerId ,
producerId ,
sctpStreamParameters ,
} = message . body ;
try {
const dataConsumer = await me . recvTransport . consumeData ( {
id : consumerId ,
dataProducerId : producerId ,
id : consumerId ,
dataProducerId : producerId ,
label ,
appData ,
protocol ,
sctpStreamParameters ,
} ) ;
me . dataConsumers . set ( dataConsumer . id , dataConsumer ) ;
dataConsumer. on ( 'transportclose' , ( ) => {
console. info ( "dataConsumer transportclose: " , dataConsumer . id ) ;
// dataConsumer.on("open", () => {
// console.info("数据消费者打开", dataConsumer.id);
// });
dataConsumer . observer . on ( "open" , ( ) => {
console . info ( "数据消费者打开" , dataConsumer . id ) ;
} ) ;
dataConsumer . on ( "transportclose" , ( ) => {
console . debug ( "数据消费者关闭(通道关闭)" , dataConsumer . id , streamId ) ;
dataConsumer . close ( ) ;
} ) ;
// TODO: 绑定remoteclient
dataConsumer . on ( 'open' , ( ) => {
console . info ( "dataConsumer open: " , dataConsumer . id ) ;
window . dataConsumer = dataConsumer ;
} ) ;
dataConsumer . on ( 'close' , ( ) => {
// dataConsumer.observer.on("close", fn())
dataConsumer . on ( "close" , ( ) => {
if ( me . dataConsumers . delete ( dataConsumer . id ) ) {
console . info ( "dataConsumer close: " , dataConsumer . id ) ;
me . push (
taoyaoProtocol . buildMessage ( "media::data::consumer::close" , {
roomId : roomId ,
consumerId : dataConsumer . id ,
} )
) ;
console . debug ( "数据消费者关闭 " , dataConsumer . id , streamId );
} else {
console . info ( "dataConsumer close non: " , dataConsumer . id ) ;
console . debug ( "数据消费者关闭(无效) " , dataConsumer . id , streamId );
}
} ) ;
dataConsumer . on ( ' error' , ( error ) => {
console . error ( "dataConsumer error: " , dataConsumer . id , error ) ;
dataConsumer . on ( " error" , ( error ) => {
console . error ( "数据消费者异常 " , dataConsumer . id , streamId , error ) ;
dataConsumer . close ( ) ;
} ) ;
// dataConsumer.on("bufferedamountlow", fn(bufferedAmount));
// dataConsumer.on("sctpsendbufferfull", fn());
dataConsumer . on ( 'message' , ( message , ppid ) => {
console . info ( "dataConsume message: " , dataConsumer . id , message ) ;
dataConsumer. on ( "message" , ( message , ppid ) => {
if ( ppid === 51 ) {
console . lo g( "文本" , message . toString ( "utf -8" ) ) ;
console . debu g( "数据消费者消息" , dataConsumer . id , streamId , message . toString ( "UTF -8" ) , ppid );
} else if ( ppid === 53 ) {
console . lo g( "二进制" ) ;
console . debu g( "数据消费者消息" , dataConsumer . id , streamId , message , ppid ) ;
} else {
console . debug ( "数据消费者消息" , dataConsumer . id , streamId , message , ppid ) ;
}
} ) ;
// dataConsumer.on("bufferedamountlow", fn(bufferedAmount));
// dataConsumer.on("sctpsendbufferfull", fn());
} catch ( error ) {
console . error ( "打开数据消费者异常" , error ) ;
}
}
// TODO: continue
/**
* 平台异常信令
*
@@ -1875,6 +1959,27 @@ class Taoyao extends RemoteClient {
// TODO: remoteclient.close
console . info ( "终端离开:" , clientId ) ;
}
/**
* 媒体回调
*
* @param {*} clientId 终端ID
* @param {*} track 媒体轨道
*/
callbackTrack ( clientId , track ) {
const me = this ;
const trackMessage = protocol . buildMessage (
"media::track" ,
{
clientId ,
track ,
} ,
) ;
trackMessage . code = "0000" ;
trackMessage . message = "媒体回调" ;
me . callback ( trackMessage ) ;
}
/**
* 错误回调
*/
@@ -1892,9 +1997,8 @@ class Taoyao extends RemoteClient {
const errorMessage = protocol . buildMessage (
"platform::error" ,
{ message } ,
- 9999
) ;
errorMessage . code = "- 9999" ;
errorMessage . code = "9999" ;
errorMessage . message = message ;
self . callback ( errorMessage , error ) ;
}
@@ -1996,7 +2100,11 @@ class Taoyao extends RemoteClient {
} ,
sctpParameters ,
proprietaryConstraints : {
optional : [ { googDscp : true } ] ,
optional : [ {
googDscp : true ,
// googIPv6 : true,
// DtlsSrtpKeyAgreement: true,
} ] ,
} ,
} ) ;
self . sendTransport . on (
@@ -2085,7 +2193,11 @@ class Taoyao extends RemoteClient {
} ,
sctpParameters ,
proprietaryConstraints : {
optional : [ { googDscp : true } ] ,
optional : [ {
googDscp : true ,
// googIPv6 : true,
// DtlsSrtpKeyAgreement: true,
} ] ,
} ,
} ) ;
self . recvTransport . on (
@@ -2134,6 +2246,9 @@ class Taoyao extends RemoteClient {
let track = await self . getAudioTrack ( ) ;
this . audioProducer = await this . sendTransport . produce ( {
track ,
appData : {
videoSource : this . videoSource
} ,
codecOptions : {
opusStereo : 1 ,
opusDtx : 1 ,
@@ -2150,24 +2265,24 @@ class Taoyao extends RemoteClient {
// codec : this._mediasoupDevice.rtpCapabilities.codecs
// .find((codec) => codec.mimeType.toLowerCase() === 'audio/pcma')
} ) ;
self . callbackTrack ( self . clientId , track ) ;
if ( self . proxy && self . proxy . media ) {
self . audioTrack = track ;
self . proxy . media ( track , this . audioProducer ) ;
} else {
console . warn ( "终端没有实现服务代理:" , self ) ;
}
// TODO: 加密解密
// if (this._e2eKey && e2e.isSupported()) {
// e2e.setupSenderTransform(this._micProducer.rtpSender);
// }
this . audioProducer . on ( "transportclose" , ( ) => {
this . audioProducer = null ;
console . debug ( "关闭音频生产者(通道关闭)" , this . audioProducer . id ) ;
this . audioProducer . close ( ) ;
} ) ;
this . audioProducer . on ( "trackended" , ( ) => {
console . warn ( "audio producer trackended " , this . audioProducer ) ;
this . closeA udioProducer( ) . catch ( ( ) => { } ) ;
console . debug ( "关闭音频生产者(媒体结束) " , this . audioProducer . id );
this . a udioProducer. close ( ) ;
} ) ;
this . audioProducer . observer . on ( "close" , ( ) => {
console . debug ( "关闭音频生产者" , this . audioProducer . id ) ;
this . audioProducer = null ;
} ) ;
} catch ( error ) {
self . callbackError ( "麦克风打开异常" , error ) ;
@@ -2181,20 +2296,7 @@ class Taoyao extends RemoteClient {
}
async closeAudioProducer ( ) {
console . debug ( "closeA udioProducer()" ) ;
if ( ! this . audioProducer ) {
return ;
}
try {
await this . request (
protocol . buildMessage ( "media::producer::close" , {
roomId : this . roomId ,
producerId : this . audioProducer . id ,
} )
) ;
} catch ( error ) {
console . error ( "关闭麦克风异常" , error ) ;
}
this . mediaProducerClose ( this . a udioProducer. id ) ;
}
async pauseAudioProducer ( ) {
@@ -2273,22 +2375,24 @@ class Taoyao extends RemoteClient {
track ,
encodings ,
codecOptions ,
appData : {
videoSource : this . videoSource
} ,
} ) ;
self . callbackTrack ( self . clientId , track ) ;
if ( self . proxy && self . proxy . media ) {
self . videoTrack = track ;
self . proxy . media ( track , this . videoProducer ) ;
} else {
console . warn ( "终端没有实现服务代理:" , self ) ;
}
// if (this._e2eKey && e2e.isSupported()) {
// e2e.setupSenderTransform(this.videoProducer.rtpSender);
// }
this . videoProducer . on ( "transportclose" , ( ) => {
this . videoProducer = null ;
console . debug ( "关闭视频生产者(通道关闭)" , this . videoProducer . id ) ;
this . videoProducer . close ( ) ;
} ) ;
this . videoProducer . on ( "trackended" , ( ) => {
console . warn ( "video producer trackended " , this . videoProducer ) ;
this . closeV ideoProducer( ) . catch ( ( ) => { } ) ;
console . debug ( "关闭视频生产者 " , this . videoProducer . id );
this . v ideoProducer = null ;
} ) ;
} catch ( error ) {
self . callbackError ( "摄像头打开异常" , error ) ;
@@ -2301,6 +2405,20 @@ class Taoyao extends RemoteClient {
}
}
async closeVideoProducer ( ) {
this . mediaProducerClose ( this . videoProducer . id ) ;
}
async pauseVideoProducer ( ) {
console . debug ( "关闭摄像头" ) ;
this . mediaProducerPause ( this . videoProducer . id ) ;
}
async resumeVideoProducer ( ) {
console . debug ( "恢复摄像头" ) ;
this . mediaProducerResume ( this . videoProducer . id ) ;
}
/**
* 生产数据
*/
@@ -2315,7 +2433,6 @@ class Taoyao extends RemoteClient {
me . dataProducer = dataProducer ;
me . dataProducer . on ( "open" , ( ) => {
console . debug ( "dataProducer open: " , me . dataProducer . id ) ;
window . dataProducer = me . dataProducer ;
} ) ;
me . dataProducer . on ( "close" , ( ) => {
console . debug ( "dataProducer close: " , me . dataProducer . id ) ;
@@ -2334,6 +2451,10 @@ class Taoyao extends RemoteClient {
}
}
async closeDataProducer ( ) {
this . mediaDataProducerClose ( this . dataProducer . id ) ;
}
/**
* 通过数据生产者发送数据
*
@@ -2348,33 +2469,6 @@ class Taoyao extends RemoteClient {
me . dataProducer . send ( data ) ;
}
async closeVideoProducer ( ) {
console . debug ( "disableWebcam()" ) ;
if ( ! this . videoProducer ) {
return ;
}
try {
await this . request (
protocol . buildMessage ( "media::producer::close" , {
roomId : this . roomId ,
producerId : this . videoProducer . id ,
} )
) ;
} catch ( error ) {
console . error ( error ) ;
}
}
async pauseVideoProducer ( ) {
console . debug ( "关闭摄像头" ) ;
this . mediaProducerPause ( this . videoProducer . id ) ;
}
async resumeVideoProducer ( ) {
console . debug ( "恢复摄像头" ) ;
this . mediaProducerResume ( this . videoProducer . id ) ;
}
/**
* 切换视频来源
*/
@@ -2406,6 +2500,7 @@ class Taoyao extends RemoteClient {
try {
const track = await me . getVideoTrack ( ) ;
await this . videoProducer . replaceTrack ( { track } ) ;
me . callbackTrack ( me . clientId , track ) ;
me . proxy . media ( track , this . videoProducer ) ;
} catch ( error ) {
console . error ( "changeWebcam() | failed: %o" , error ) ;
@@ -2625,6 +2720,7 @@ class Taoyao extends RemoteClient {
} else {
}
if ( session . proxy && session . proxy . media ) {
me . callbackTrack ( session . clientId , track ) ;
session . proxy . media ( track ) ;
}
} ;