SRS之RTMP推拉流分析
SRS是一个简单高效的实时视频服务器,支持RTMPWebRTCHLSHTTPFLVSRTGB28181;本文以SRS4。0版本进行分析RTMP推拉流架构,SRS整体架构如下图(官网图片)所示:
有图可知SRS支持多种客户端以不同的媒流体协议进行推流、拉流,内部还包括了不同协议的转换,同时还支持SRS的集群。推荐视频:SRS4。0RTMP推流读取数据,拉流转发数据
本文主要分析在SRS中RTMP的推流、拉流源码分析,其核心类如下:SrsServerSRS流媒体服务
SrsBufferListener监听器,主要是TCP的监听SrsTcpListenerTCP监听器SrsRtmpConnRTMP连接,
对应了SrsStSocket和SrsCoroutineSrsRtmpServer提供与客户端之间的RTMP命令协议消息的交互服务,使SrsRtmpConn提供的socket读写数据SrsLiveSource描述路播放源,包括推流和拉流的描述SrsLiveConsumer拉流消费者,每路拉流客户端对应个SrsLiveConsumerSrsStSocket经过封装的socket接SrsRecvThread负责接收数据,但是要注意的是他这并不是从IO读取数据,从SrsRtmpServer类拉取数据,然后推送到SrsPublishRecvThread(推流),或者SrsQueueRecvThread(拉流)SrsQueueRecvThread主要于拉流,对应的是客户端服务器的控制消息,和视频消息没有关系。客户端读取数据还是从consumer的queue去读取。
SrsPublishRecvThread主要于推流,内部封装了协程
RTMP推拉流代码流程如下:
C音视频学习资料免费获取方法:关注音视频开发T哥,点击链接即可免费获取2023年最新C音视频开发进阶独家免费学习大礼包!
SRS网络模型分析
在主函数runhybridserver中开始于srshybridrun()轮询,通过流体服务SrsServer::listen()进入服务端监听,这里分别对不同的协议进行了不同的监听处理,代码如下:srserrortSrsServer::listen(){srserrorterrsrssuccess;rtmp的listenif((errlistenrtmp())!srssuccess){returnsrserrorwrap(err,rtmplisten);}if((errlistenhttpapi())!srssuccess){returnsrserrorwrap(err,httpapilisten);}if((errlistenhttpsapi())!srssuccess){returnsrserrorwrap(err,httpsapilisten);}if((errlistenhttpstream())!srssuccess){returnsrserrorwrap(err,httpstreamlisten);}if((errlistenhttpsstream())!srssuccess){returnsrserrorwrap(err,httpsstreamlisten);}if((errlistenstreamcaster())!srssuccess){returnsrserrorwrap(err,streamcasterlisten);}if((errconnmanagerstart())!srssuccess){returnsrserrorwrap(err,connectionmanager);}returnerr;}
进入RTMP对应的listen,这里主要通过SrsBufferListener进一步封装了listen,包括httpapi、httpsapi的监听都是用SrsBufferListener统一的封装类;
C音视频学习资料免费获取方法:关注音视频开发T哥,点击链接即可免费获取2023年最新C音视频开发进阶独家免费学习大礼包!
srserrortSrsBufferListener::listen(stringi,intp){srserrorterrsrssuccess;ipi;portp;srsfreep(listener);listenernewSrsTcpListener(this,ip,port);new一个SrsTcpListener对象,传一个指针if((errlistenerlisten())!srssuccess){returnsrserrorwrap(err,bufferedtcplisten);}stringvsrslistenertype2string(type);srstrace(slistenattcp:s:d,fdd,v。cstr(),ip。cstr(),port,listenerfd());returnerr;}
在newSrsTcpListener时传入了this,其实是在构造的时候给handler赋值,继续进入SrsTcpListener::listen()每一个监听,对应一个协程srserrortSrsTcpListener::listen(){srserrorterrsrssuccess;rtmp使用的是tcp,开始listenif((errsrstcplisten(ip,port,lfd))!srssuccess){returnsrserrorwrap(err,listenats:d,ip。cstr(),port);}srsfreep(trd);trdnewSrsSTCoroutine(tcp,this);创建一个协程,传一个用户(SrsTcpListener)指针,如果协程需要回调,可以通过指针找到对应的对象if((errtrdstart())!srssuccess){启动协程,执行SrsSTCoroutine::cycle(),即handlecycle(),最终是SrsTcpListener::cycle()returnsrserrorwrap(err,startcoroutine);}returnerr;}
启动协程进行监听,执行cycle(),代码如下:srserrortSrsTcpListener::cycle(){srserrorterrsrssuccess;while(true){if((errtrdpull())!srssuccess){读取错误码,判断协程是否结束,不为srssuccess时,说明该协程要退出returnsrserrorwrap(err,tcplistener);}srsnetfdtfdsrsaccept(lfd,NULL,NULL,SRSUTIMENOTIMEOUT);检测新连接if(fdNULL){returnsrserrornew(ERRORSOCKETACCEPT,acceptatfdd,srsnetfdfileno(lfd));}if((errsrsfdcloseexec(srsnetfdfileno(fd)))!srssuccess){returnsrserrorwrap(err,setcloseexec);}if((errhandlerontcpclient(fd))!srssuccess){handle就是new一个SrsTcpListener对象时,传入的ISrsTcpHandler指针,即SrsBufferListener(SrsBufferListener继承了ISrsTcpHandler)returnsrserrorwrap(err,handlefdd,srsnetfdfileno(fd));}}returnerr;}
这里的ontcpclient实际执行的就是构造函数时传入this,即SrsBufferListener的成员函数,代码如下:监听新的连接srserrortSrsBufferListener::ontcpclient(srsnetfdtstfd){srserrorterrserveracceptclient(type,stfd);if(err!srssuccess){srswarn(acceptclientfailed,erriss,srserrordesc(err)。cstr());srsfreep(err);}returnsrssuccess;}
进入acceptclient代码如下:type传递了对应的连接类型srserrortSrsServer::acceptclient(SrsListenerTypetype,srsnetfdtstfd){srserrorterrsrssuccess;ISrsStartableConnecitonconnNULL;将fd和一个conn绑定,并返回一个连接connif((errfdtoresource(type,stfd,conn))!srssuccess){if(srserrorcode(err)ERRORSOCKETGETPEERIPsrsconfigemptyipok()){srsclosestfd(stfd);srserrorreset(err);returnsrssuccess;}returnsrserrorwrap(err,fdtoresource);}srsassert(conn);directlyenqueue,thecyclethreadwillremovetheclient。connmanageradd(conn);把连接添加到connmanager进行管理启动类型对应的协程,比如启动rtmp连接对应的协程,每个SrsRtmpConn都有1:1对应的协程if((errconnstart())!srssuccess){returnsrserrorwrap(err,startconncoroutine);}returnerr;}
此处首先将fd和一个conn绑定,并返回一个连接conn,代码如下:srserrortSrsServer::fdtoresource(SrsListenerTypetype,srsnetfdtstfd,ISrsStartableConnecitonpr){srserrorterrsrssuccess;intfdsrsnetfdfileno(stfd);stringipsrsgetpeerip(fd);intportsrsgetpeerport(fd);。。。。。。。。。。最大连接数判断处理。。。。。。。。。。Thecontextidmaychangeduringcreatingthebellowobjects。SrsContextRestore(srscontextgetid());new一个类型对应的连接if(typeSrsListenerRtmpStream){prnewSrsRtmpConn(this,stfd,ip,port);}elseif(typeSrsListenerHttpApi){prnewSrsHttpApi(false,this,stfd,httpapimux,ip,port);}elseif(typeSrsListenerHttpsApi){prnewSrsHttpApi(true,this,stfd,httpapimux,ip,port);}elseif(typeSrsListenerHttpStream){prnewSrsResponseOnlyHttpConn(false,this,stfd,httpserver,ip,port);}elseif(typeSrsListenerHttpsStream){prnewSrsResponseOnlyHttpConn(true,this,stfd,httpserver,ip,port);}else{srswarn(closefornoservicehandler。fdd,ips:d,fd,ip。cstr(),port);srsclosestfd(stfd);returnerr;}returnerr;}
其次时将连接conn添加到connmanager进行管理,最后connstart()启动协程进行接收发送数据的处理,这里每一个SrsRtmpConn连接都有1:1对应SrsCoroutine协程,启动后进入SrsRtmpConn::docycle()轮询,代码如下:rtmp接收数据处理srserrortSrsRtmpConn::docycle(){srserrorterrsrssuccess;srstrace(RTMPclientips:d,fdd,ip。cstr(),port,srsnetfdfileno(stfd));设置收发超时时间rtmpsetrecvtimeout(SRSCONSTSRTMPTIMEOUT);rtmpsetsendtimeout(SRSCONSTSRTMPTIMEOUT);rtmp握手if((errrtmphandshake())!srssuccess){returnsrserrorwrap(err,rtmphandshake);}rtmp代理相关uint32triprtmpproxyrealip();if(rip0){srstrace(RTMPproxyrealclientipd。d。d。d,uint8t(rip24),uint8t(rip16),uint8t(rip8),uint8t(rip));}SrsRequestreqinforeq;if((errrtmpconnectapp(req))!srssuccess){握手成功后,处理client发送的connectreturnsrserrorwrap(err,rtmpconnecttcUrl);}setclientiptorequest。reqipip;保存客户端IPsrstrace(connectapp,tcUrls,pageUrls,swfUrls,schemas,vhosts,portd,apps,argss,reqtcUrl。cstr(),reqpageUrl。cstr(),reqswfUrl。cstr(),reqschema。cstr(),reqvhost。cstr(),reqport,reqapp。cstr(),(reqargs?(obj):null));showclientidentityif(reqargs){std::stringsrsversion;std::stringsrsserverip;intsrspid0;intsrsid0;SrsAmf0AnypropNULL;if((propreqargsensurepropertystring(srsversion))!NULL){srsversionproptostr();}if((propreqargsensurepropertystring(srsserverip))!NULL){srsserveripproptostr();}if((propreqargsensurepropertynumber(srspid))!NULL){srspid(int)proptonumber();}if((propreqargsensurepropertynumber(srsid))!NULL){srsid(int)proptonumber();}if(srspid0){srstrace(edgesrsips,versions,pidd,idd,srsserverip。cstr(),srsversion。cstr(),srspid,srsid);}}if((errservicecycle())!srssuccess){errsrserrorwrap(err,servicecycle);}srserrortr0srssuccess;if((r0ondisconnect())!srssuccess){errsrserrorwrap(err,ondisconnects,srserrordesc(r0)。cstr());srsfreep(r0);}Ifclientisredirecttootherservers,wealreadyloggedtheevent。if(srserrorcode(err)ERRORCONTROLREDIRECT){srserrorreset(err);}returnerr;}
开始进行RTMP正常的握手交互过程、设置收发超时、rtmp代理,握手成功(处理client发送的connect请求);进入servicecycle(),继续数据交互,设置窗口大小、带宽大小、chunk大小、连接成功响应客户端。{srserrorterrsrssuccess;SrsRequestreqinforeq;窗口大小设置intoutacksizesrsconfiggetoutacksize(reqvhost);if(outacksize(errrtmpsetwindowacksize(outacksize))!srssuccess){returnsrserrorwrap(err,rtmp:setoutwindowacksize);}intinacksizesrsconfiggetinacksize(reqvhost);if(inacksize(errrtmpsetinwindowacksize(inacksize))!srssuccess){returnsrserrorwrap(err,rtmp:setinwindowacksize);}带宽设置if((errrtmpsetpeerbandwidth((int)(2。510001000),2))!srssuccess){returnsrserrorwrap(err,rtmp:setpeerbandwidth);}gettheipwhichclientconnected。std::stringlocalipsrsgetlocalip(srsnetfdfileno(stfd));dobandwidthtestifconnecttothevhostwhichisforbandwidthcheck。if(srsconfiggetbwcheckenabled(reqvhost)){if((errbandwidthbandwidthcheck(rtmp,skt,req,localip))!srssuccess){returnsrserrorwrap(err,rtmp:bandwidthcheck);}returnerr;}setchunksizetolarger。setthechunksizebeforeanylargerresponsegreaterthan128,tomakeOBShappy,seehttps:github。comossrssrsissues454intchunksizesrsconfiggetchunksize(reqvhost);从配置文件读取chunksize大小,进行设置,一般设置60k,如果太小就得拆分if((errrtmpsetchunksize(chunksize))!srssuccess){returnsrserrorwrap(err,rtmp:setchunksized,chunksize);}responsetheclientconnectok。if((errrtmpresponseconnectapp(req,localip。cstr()))!srssuccess){连接成功,响应客户端returnsrserrorwrap(err,rtmp:responseconnectapp);}if((errrtmponbwdone())!srssuccess){returnsrserrorwrap(err,rtmp:onbwdown);}真正的循环while(true){if((errtrdpull())!srssuccess){returnsrserrorwrap(err,rtmp:threadquit);}errstreamservicecycle();。。。。。。。。。。。。。。。。。。}returnerr;}
来到streamservicecycle(),才是真正推流、拉流处理,值得注意的是,还对cachegop是否开启的设置。srserrortSrsRtmpConn::streamservicecycle(){srserrorterrsrssuccess;。。。。。。。。。。。。findasourcetoserve。SrsLiveSourcesourceNULL;一个直播对应一个SrsLiveSource,一个推流,0~N个拉流if((errsrssourcesfetchorcreate(req,server,source))!srssuccess){查找创建一个sourcereturnsrserrorwrap(err,rtmp:fetchsource);}srsassert(source!NULL);读取配置文件,设置是否需要cachegopboolenabledcachesrsconfiggetgopcache(reqvhost);默认是开的srstrace(sourceurls,ips,cached,isedged,sourceidss,reqgetstreamurl()。cstr(),ip。cstr(),enabledcache,infoedge,sourcesourceid()。cstr(),sourcepresourceid()。cstr());sourcesetcache(enabledcache);设置推流、拉流处理switch(infotype){caseSrsRtmpConnPlay:{responseconnectionstartplayif((errrtmpstartplay(inforesstreamid))!srssuccess){returnsrserrorwrap(err,rtmp:startplay);}if((errhttphooksonplay())!srssuccess){returnsrserrorwrap(err,rtmp:callbackonplay);}拉流errplaying(source);httphooksonstop();returnerr;}caseSrsRtmpConnFMLEPublish:{RTMP基本走这里if((errrtmpstartfmlepublish(inforesstreamid))!srssuccess){接收客户端相应的消息,并返回对应的响应returnsrserrorwrap(err,rtmp:startFMLEpublish);}returnpublishing(source);}caseSrsRtmpConnHaivisionPublish:{if((errrtmpstarthaivisionpublish(inforesstreamid))!srssuccess){returnsrserrorwrap(err,rtmp:startHAIVISIONpublish);}returnpublishing(source);}caseSrsRtmpConnFlashPublish:{if((errrtmpstartflashpublish(inforesstreamid))!srssuccess){returnsrserrorwrap(err,rtmp:startFLASHpublish);}returnpublishing(source);}default:{returnsrserrornew(ERRORSYSTEMCLIENTINVALID,rtmp:unknownclienttyped,infotype);}}returnerr;}
推流流程
推流流程主要是dopublishing,需要注意的是使用SrsPublishRecvThread封装好的协程与拉流使用的SrsQueueRecvThread区分开来,其代码如下:推流流程srserrortSrsRtmpConn::publishing(SrsLiveSourcesource){srserrorterrsrssuccess;SrsRequestreqinforeq;。。。。。。。。。。。。。。TODO:FIXME:Shouldrefinethestateofpublishing。if((erracquirepublish(source))srssuccess){协程实际是SrsPublishRecvThread内部封装的SrsRecvThread的SrsCoroutine成员变量trd,主要看docycle()的流程参数:rtmp:在协程中有一些rtmp接收数据的处理,req:URL相关,SrsPublishRecvThreadrtrd(rtmp,req,srsnetfdfileno(stfd),0,this,source,srscontextgetid());errdopublishing(source,rtrd);实际推流流程,source就是直播对应的那个sourcertrd。stop();}。。。。。。。。。。。returnerr;}srserrortSrsRtmpConn::dopublishing(SrsLiveSourcesource,SrsPublishRecvThreadrtrd){srserrorterrsrssuccess;SrsRequestreqinforeq;SrsPithyPrintpprintSrsPithyPrint::creatertmppublish();SrsAutoFree(SrsPithyPrint,pprint);updatethestatisticwhensourcedisconveried。SrsStatisticstatSrsStatistic::instance();if((errstatonclient(srscontextgetid()。cstr(),req,this,infotype))!srssuccess){returnsrserrorwrap(err,rtmp:statclient);}startisolaterecvthread。TODO:FIXME:Passthecallbackhere。if((errrtrdstart())!srssuccess){启动协程,SrsRecvThread::docycle()轮询读取数据returnsrserrorwrap(err,rtmp:receivethread);}initializethepublishtimeout。publish1stpkttimeoutsrsconfiggetpublish1stpkttimeout(reqvhost);publishnormaltimeoutsrsconfiggetpublishnormaltimeout(reqvhost);setthesockoptions。setsockoptions();if(true){boolmrsrsconfiggetmrenabled(reqvhost);srsutimetmrsleepsrsconfiggetmrsleep(reqvhost);srstrace(startpublishmrdd,p1stptd,pntd,tcpnodelayd,mr,srsu2msi(mrsleep),srsu2msi(publish1stpkttimeout),srsu2msi(publishnormaltimeout),tcpnodelay);}int64tnbmsgs0;uint64tnbframes0;while(true){if((errtrdpull())!srssuccess){returnsrserrorwrap(err,rtmp:threadquit);}pprintelapse();condwaitfortimeout。if(nbmsgs0){whennotgotmsgs,waitforalargertimeout。seehttps:github。comossrssrsissues441rtrdwait(publish1stpkttimeout);}else{rtrdwait(publishnormaltimeout);}checkthethreaderrorcode。if((errrtrderrorcode())!srssuccess){returnsrserrorwrap(err,rtmp:receivethread);}whennotgotanymessages,timeout。超时处理if(rtrdnbmsgs()nbmsgs){returnsrserrornew(ERRORSOCKETTIMEOUT,rtmp:publishtimeoutdms,nbmsgsd,nbmsgs?srsu2msi(publishnormaltimeout):srsu2msi(publish1stpkttimeout),(int)nbmsgs);}nbmsgsrtrdnbmsgs();收到消息数量Updatethestatforvideofps。remarkhttps:github。comossrssrsissues851SrsStatisticstatSrsStatistic::instance();if((errstatonvideoframes(req,(int)(rtrdnbvideoframes()nbframes)))!srssuccess){returnsrserrorwrap(err,rtmp:statvideoframes);}nbframesrtrdnbvideoframes();视频帧数量reportableif(pprintcanprint()){kbpssample();boolmrsrsconfiggetmrenabled(reqvhost);srsutimetmrsleepsrsconfiggetmrsleep(reqvhost);srstrace(SRSCONSTSLOGCLIENTPUBLISHtimed,okbpsd,d,d,ikbpsd,d,d,mrdd,p1stptd,pntd,(int)pprintage(),kbpsgetsendkbps(),kbpsgetsendkbps30s(),kbpsgetsendkbps5m(),kbpsgetrecvkbps(),kbpsgetrecvkbps30s(),kbpsgetrecvkbps5m(),mr,srsu2msi(mrsleep),srsu2msi(publish1stpkttimeout),srsu2msi(publishnormaltimeout));码率的计算,s,30s,5min的码率}}returnerr;}
看看SrsPublishRecvThread的成员SrsRecvThreadtrd的docycle()的处理,这里主要是rtmprecvmessage(msg)接收消息,pumperconsume(msg)把消息推送给消费者。srserrortSrsRecvThread::docycle(){srserrorterrsrssuccess;while(true){if((errtrdpull())!srssuccess){returnsrserrorwrap(err,recvthread);}Whenthepumperisinterrupted,waitthenretry。if(pumperinterrupted()){srsusleep(timeout);continue;}SrsCommonMessagemsgNULL;Processthereceivedmessage。处理收到的消息,rtmp由SrsPublishRecvThread的构造函数传进来if((errrtmprecvmessage(msg))srssuccess){errpumperconsume(msg);推送给消费者,pumper也是从SrsPublishRecvThread的SrsRecvThread成员变量trd的构造函数传进来的}if(err!srssuccess){Interruptthereceivethreadforanyerror。trdinterrupt();Notifythepumpertoquitforerror。pumperinterrupt(err);returnsrserrorwrap(err,recvthread);}}returnerr;}
consume内部进行消息数量、视频帧数量的统计,然后connhandlepublishmessage(source,msg)对消息的处理,最终执行函数processpublishmessage()。audio、video、metaData处理srserrortSrsRtmpConn::processpublishmessage(SrsLiveSourcesource,SrsCommonMessagemsg){srserrorterrsrssuccess;foredge,directlyproxymessagetoorigin。if(infoedge){if((errsourceonedgeproxypublish(msg))!srssuccess){returnsrserrorwrap(err,rtmp:proxypublish);}returnerr;}processaudiopacketRTMPMSGAudioMessage8if(msgheader。isaudio()){if((errsourceonaudio(msg))!srssuccess){audio的处理returnsrserrorwrap(err,rtmp:consumeaudio);}returnerr;}processvideopacketRTMPMSGVideoMessage9if(msgheader。isvideo()){if((errsourceonvideo(msg))!srssuccess){video处理returnsrserrorwrap(err,rtmp:consumevideo);}returnerr;}processaggregatepacketif(msgheader。isaggregate()){if((errsourceonaggregate(msg))!srssuccess){returnsrserrorwrap(err,rtmp:consumeaggregate);}returnerr;}processonMetaDataMetaData处理RTMPMSGAMF0DataMessage18或RTMPMSGAMF3DataMessage15if(msgheader。isamf0data()msgheader。isamf3data()){SrsPacketpktNULL;if((errrtmpdecodemessage(msg,pkt))!srssuccess){returnsrserrorwrap(err,rtmp:decodemessage);}SrsAutoFree(SrsPacket,pkt);if(dynamiccastSrsOnMetaDataPacket(pkt)){SrsOnMetaDataPacketmetadatadynamiccastSrsOnMetaDataPacket(pkt);将packet转成metaDataif((errsourceonmetadata(msg,metadata))!srssuccess){returnsrserrorwrap(err,rtmp:consumemetadata);}returnerr;}returnerr;}returnerr;}
processpublishmessage对音频、视频、metaData进行处理;先看看音频处理,把msg发送给每一个拉流端消费者,这里的consumers容器保存所有拉流端消费者,在拉流流程中,新建消费者时添加的。音频数据处理srserrortSrsLiveSource::onaudio(SrsCommonMessagesharedaudio){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。convertsharedaudiotomsg,usershouldnotusesharedaudioagain。通过引用计数的方式,创建一个消息SrsSharedPtrMessagemsg;类似智能指针,数据拷贝实际上是浅拷贝,通过引用计数方式,为0释放内存if((errmsg。create(sharedaudio))!srssuccess){returnsrserrorwrap(err,createmessage);}directlyprocesstheaudiomessage。if(!mixcorrect){默认不做校正,就直接处理,就是不用放到map进行排序returnonaudioimp(msg);}insertmsgtothequeue。mixqueuepush(msg。copy());把流消息都插入到队列中,内部并按时间戳做了排序fetchsomeonefrommixqueue。从map中取出来SrsSharedPtrMessagemmixqueuepop();pop时间戳最小的出来if(!m){returnerr;}consumethemonotonicallyincreasemessage。if(misaudio()){erronaudioimp(m);}else{erronvideoimp(m);}srsfreep(m);returnerr;}srserrortSrsLiveSource::onaudioimp(SrsSharedPtrMessagemsg){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。。。。。。。。copytoallconsumer把msg拷贝到消费者对象的队列中,即把数据发给每个拉流端if(!dropforreduce){for(inti0;i(int)consumers。size();i){SrsLiveConsumerconsumerconsumers。at(i);if((errconsumerenqueue(msg,atc,jitteralgorithm))!srssuccess){把消息放到消费者队列returnsrserrorwrap(err,consumemessage);}}}cachethesequenceheaderofaac,orfirstpacketofmp3。forexample,themp3isusedforhlstowritetherightaudiocodec。TODO:FIXME:torefinethestreaminfosystem。if(isaacsequenceheader!metaash()){if((errmetaupdateash(msg))!srssuccess){更新audiosequencereturnsrserrorwrap(err,metaconsumeaudio);}}whensequenceheader,donotpushtogopcacheandadjustthetimestamp。if(issequenceheader){returnerr;}cachethelastgoppacketsif((errgopcachecache(msg))!srssuccess){returnsrserrorwrap(err,gopcacheconsumeaudio);}。。。。。。。。。。。。。returnerr;}
类似的视频处理,如下:srserrortSrsLiveSource::onvideo(SrsCommonMessagesharedvideo){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。convertsharedvideotomsg,usershouldnotusesharedvideoagain。thepayloadistransfertomsg,andsettoNULLinsharedvideo。SrsSharedPtrMessagemsg;智能指针的封装if((errmsg。create(sharedvideo))!srssuccess){returnsrserrorwrap(err,createmessage);}directlyprocessthevideomessage。if(!mixcorrect){returnonvideoimp(msg);}insertmsgtothequeue。mixqueuepush(msg。copy());把流消息都插入到队列中,内部并按时间戳做了排序fetchsomeonefrommixqueue。SrsSharedPtrMessagemmixqueuepop();pop时间戳最小的消息出来if(!m){returnerr;}consumethemonotonicallyincreasemessage。if(misaudio()){erronaudioimp(m);}else{erronvideoimp(m);}srsfreep(m);returnerr;}srserrortSrsLiveSource::onvideoimp(SrsSharedPtrMessagemsg){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。cachethesequenceheaderifh264donotcachethesequenceheadertogopcache,returnhere。if(issequenceheader(errmetaupdatevsh(msg))!srssuccess){更新videosequencereturnsrserrorwrap(err,metaupdatevideo);}Copytohubtoallutilities。if((errhubonvideo(msg,issequenceheader))!srssuccess){returnsrserrorwrap(err,hubconsumevideo);}Forbridgertoconsumethemessage。if(bridger(errbridgeronvideo(msg))!srssuccess){returnsrserrorwrap(err,bridgerconsumevideo);}copytoallconsumer把数据发给拉流端的消费者(队列中)if(!dropforreduce){for(inti0;i(int)consumers。size();i){SrsLiveConsumerconsumerconsumers。at(i);if((errconsumerenqueue(msg,atc,jitteralgorithm))!srssuccess){把消息放到消费者队列中returnsrserrorwrap(err,consumevideo);}}}whensequenceheader,donotpushtogopcacheandadjustthetimestamp。if(issequenceheader){returnerr;}cachethelastgoppacketscachegop如果是I帧,就会清空掉,重新push新的数据if((errgopcachecache(msg))!srssuccess){returnsrserrorwrap(err,gopcacheconsumevdieo);}。。。。。。。。。returnerr;}
metaData的处理如下:srserrortSrsLiveSource::onmetadata(SrsCommonMessagemsg,SrsOnMetaDataPacketmetadata){srserrorterrsrssuccess;。。。。。。。。。。。。。。Updatethemetacache。更新metaData保存起来boolupdatedfalse;if((errmetaupdatedata(msgheader,metadata,updated))!srssuccess){returnsrserrorwrap(err,updatemetadata);}if(!updated){returnerr;}whenalreadygotmetadata,dropwhenreducesequenceheader。booldropforreducefalse;if(metadata()srsconfiggetreducesequenceheader(reqvhost)){dropforreducetrue;srswarn(dropforreduceshmetadata,sized,msgsize);}copytoallconsumer把推流端发的metaData也插入消费队列中,便于拉流者知道if(!dropforreduce){std::vectorSrsLiveConsumer::iteratorit;for(itconsumers。begin();it!consumers。end();it){SrsLiveConsumerconsumerit;if((errconsumerenqueue(metadata(),atc,jitteralgorithm))!srssuccess){returnsrserrorwrap(err,consumemetadata);}}}Copytohubtoallutilities。returnhubonmetadata(metadata(),metadata);}
拉流流程
首先每一个拉流端都会绑定一个SrsConsumer消费者,每一个消费者对应一个SrsQueueRecvThread协程,执行doplayingsrserrortSrsRtmpConn::playing(SrsLiveSourcesource){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。。。。Createaconsumerofsource。SrsLiveConsumerconsumerNULL;消费者,每个拉流都会绑定一个SrsConsumerSrsAutoFree(SrsLiveConsumer,consumer);if((errsourcecreateconsumer(consumer))!srssuccess){returnsrserrorwrap(err,rtmp:createconsumer);}if((errsourceconsumerdumps(consumer))!srssuccess){returnsrserrorwrap(err,rtmp:dumpsconsumer);}每一个消费者独立一个协程SrsQueueRecvThreadtrd(consumer,rtmp,SRSPERFMWSLEEP,srscontextgetid());if((errtrd。start())!srssuccess){returnsrserrorwrap(err,rtmp:startreceivethread);}Deliverpacketstopeer。wakableconsumer;errdoplaying(source,consumer,trd);每个流source绑定一个消费者SrsConsumerwakableNULL;trd。stop();Dropallpacketsinreceivingthread。if(!trd。empty()){srswarn(dropthereceiveddmessages,trd。size());}returnerr;}srserrortSrsRtmpConn::doplaying(SrsLiveSourcesource,SrsLiveConsumerconsumer,SrsQueueRecvThreadrtrd){srserrorterrsrssuccess;。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。while(true){whensourceissettoexpired,disconnectit。if((errtrdpull())!srssuccess){判断协程是否退出returnsrserrorwrap(err,rtmp:threadquit);}collectelapseforpithyprint。pprintelapse();touseisolatethreadtorecv,canimproveabout33performance。while(!rtrdempty()){SrsCommonMessagemsgrtrdpump();if((errprocessplaycontrolmsg(consumer,msg))!srssuccess){播放控制处理returnsrserrorwrap(err,rtmp:playcontrolmessage);}}quitwhenrecvthreaderror。if((errrtrderrorcode())!srssuccess){returnsrserrorwrap(err,rtmp:recvthread);}ifdefSRSPERFQUEUECONDWAITwaitformessagetoincoming。seehttps:github。comossrssrsissues257consumerwait(mwmsgs,mwsleep);等数据累积一段时间攒一定数据,再发送endifgetmessagesfromconsumer。eachmsginmsgs。msgsmustbefree,fortheSrsMessageArrayneverfreethem。remarkwhenenablesendmininterval,onlyfetchonemessageatime。intcount(sendmininterval0)?1:0;if((errconsumerdumppackets(msgs,count))!srssuccess){从消费队列中一次读取出来,数据从SrsConsumerqueue来,实际是从source给过来的returnsrserrorwrap(err,rtmp:consumerdumppackets);}。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。sendoutmessages,allmessagesarefreedbysendandfreemessages()。noneedtoassertmsg,forthertmpwillassertit。if(count0(errrtmpsendandfreemessages(msgs。msgs,count,inforesstreamid))!srssuccess){发送数据,给到客户端,最终调用protocol封装好的socketapireturnsrserrorwrap(err,rtmp:senddmessages,count);}ifdurationspecified,andexceedit,stopplaylive。see:https:github。comossrssrsissues45if(userspecifieddurationtostop){if(durationreqduration){returnsrserrornew(ERRORRTMPDURATIONEXCEED,rtmp:timedupd,srsu2msi(duration),srsu2msi(reqduration));}}applytheminimalintervalfordeliverystreaminsrsutimet。if(sendmininterval0){srsusleep(sendmininterval);}Yieldtoanothercoroutines。seehttps:github。comossrssrsissues2194issuecomment777437476srsthreadyield();让出cpu,让其他协程继续运行}returnerr;}
doplaying内部processplaycontrolmsg播放控制处理,consumerdumppackets(msgs,count)从消费队列读取数据,最终rtmpsendandfreemessages(msgs。msgs,count,inforesstreamid)发送到play客户端。作者:MrJuJu
原文链接:https:www。cnblogs。comjujugop17039564。html
音视频开发流媒体程序员
期待!就在2月14日2月14日晚上,浩茫苍穹将出现由恒星和行星联手排列的三星一线奇趣天象。御夫座最明亮的恒星五车二距离地球最近的外行星火星和金牛座最明亮的恒星毕宿五,三星由北至南依次排成一条直线,十分
神舟十五号航天员乘组圆满完成首次出舱活动全部既定任务新华社北京2月10日电(记者李国利邓孟)记者从中国载人航天工程办公室获悉,2月10日0时16分,经过约7小时的出舱活动,神舟十五号航天员费俊龙邓清明张陆密切协同,圆满完成出舱活动全
Web安全头号大敌XSS漏洞解决最佳实践写在前面虽然今天是周末,但职位每日推荐不可少,万一有努力找工作的小伙伴因为我一天偷懒导致没找到满意工作那就是我的罪过了,今日职位已放置在文章末尾,请注意查看!引言XSS是目前最普遍
曝郎平或再度出山执教,球迷欢呼,中国女排有望重回巅峰众所周知,目前中国女排已经在宁波北仑训练基地开启了新一轮的国家队集训。毫无疑问,在过去的一年,中国女排在国际大赛当中所取得的成绩并不算太好,因此这一次的国家队集训,我们可以看到有不
墨菲打趣莺歌不打得有人站出来于是我拿出了我的莺歌技能包直播吧2月6日讯今日NBA常规赛,鹈鹕136104大胜国王。赛后,本场比赛创下个人得分赛季新高的鹈鹕球员特雷墨菲接受采访,墨菲打趣道今天英格拉姆不打,所以有人得站出来成为英格拉姆,
单欢欢终于走了,大连人4将出走,谢晖成光杆司令底线留住林良铭单欢欢终于走了,大连人4将出走,谢晖成光杆司令底线留住林良铭山东泰山队成都蓉城浙江队和天津津门虎都引援了,深圳队广州城生存出现了问题,悲欢是不能相同的。大连人则是一个中间位置,球队
神奇主席托尼布鲁姆格拉汉姆波特凯塞多麦克利斯特三沾薰,近期英超赛场炙手可热的球员和教练,都和英超球队布莱顿有关。但你能想象吗,很多球员都是俱乐部主席用系统筛选出来的?现在布莱顿在竞争激烈的英超赛场排
南华大学2022年十大教书育人案例1。杨宏发用生命诠释医者仁心杨宏发,中共党员,附属第二医院心血管内科主治医师第二临床学院教师,是湖南省第一批援鄂医疗队员,获全国五一劳动奖章湖南省优秀共产党员湖南省抗疫先进个人20
王雨馨承认和黄景瑜结过婚,醉酒脸红眼神迷离,结婚证曝光带钢印王雨馨承认和黄景瑜结过婚近期,迪丽热巴和黄景瑜陷入隐婚怀孕的风波中,尽管迪丽热巴多次否认,并起诉相关人士,还不及黄景瑜前任王雨馨的一句话管用,王雨馨为热巴澄清,别再编了,热巴人挺可
缅甸赛马争霸赛即将开办,全国赛马好手集聚让我们红尘作伴活得潇潇洒洒策马奔腾共享人世繁华对酒当歌唱出心中喜悦轰轰烈烈把握青春年华歌声响起,我们的记忆也随之回到了当年看还珠格格的时候。当电视里的阿哥和格格们策马奔腾,您是否也
景致记录在徐霞客眼中的广西南丹州,建筑美而且荒凉,驿卒刁蛮头条创作挑战赛头条带你游广西在前一篇旅游文里,介绍到徐霞客眼里的广西南丹州,建筑美丽山水也美丽,只是看见荒凉的寺庙,倍感无奈。谁曾想,南丹州的刁蛮驿卒,不仅轻慢于徐霞客,还无故克扣
台湾考生免试进985,好事?坏事?最新消息凡是台湾高中毕业生可凭当年学测成绩,在内地(祖国大陆)高校面向港澳台招生信息网,报考大陆高校。2023年开放在线申请的学校包括清华大学北京大学浙江大学中山大学厦门大学等26
花4万多白转黑没效果原理像种菜?湖州德清的朱女士向我们反映,她在当地一家阿芳理发店做了白转黑,结果白头发反而变多了。理发店做白转黑原理就像种菜?朱女士这个项目就是头发白的把你变黑,她说头发白了是需要营养,缺少营养
韩国总统纪念抗日却带头向日本跪了不是军国主义是亲密伙伴下面这张图片,是韩国总统带着他的网红脸老婆出席三一运动纪念活动,正在向韩国的抗日先辈致敬。然而,在致敬之后,却发生了让人意想不到的一幕这名韩国总统带头向日本跪了。一三一运动本来是韩
合理分担生育成本消除隐孕之忧来源中国经济网新闻背景当下,隐孕成为一些职场女性不得已的生存策略。担忧被降薪辞退,影响试用期转正升职加薪等,是她们选择隐孕的重要原因。在隐孕的另一端,则是一些企业对于孕产期女职工可
985研究生提醒学弟不要尽信HR秋招时不要把鸡蛋放在一个篮子里栗子(化名)同学是某985高校的一名研二学生,近日他在B站上说自己在秋招时遭受HR欺骗性对待。栗子表示HR的口头承诺不可信,海量HC更不可信,求人不如求己,尽可能地不要把鸡蛋放在一
宝宝到厌奶期了吗?一般来说厌奶期可能持续几周到一个月左右,甚至也有持续两个月的。厌奶的起因有很多种,主要可以分为病理性厌奶和生理性厌奶两种1。生理性厌奶生理性厌奶是由于宝宝生长发育到了一定阶段后,肠
生育登记取消结婚限制鼓励当小三?扯淡2023年1月,四川出台新的四川省生育登记服务管理办法,其中一条是取消是否结婚的限制。这个问题引发全民热议。首先我们要明确生育登记的目的是什么。生育登记有助于准确统计出生人口数量生
从计划生育到鼓励生育经历了什么?自从1982年把计划生育定位基本国策到如今的放开生育鼓励生育,从当时超生罚款到如今奖励钱,这其中到底经历了什么?为什么现在这么多人都不肯多要孩子甚至不生孩子,最根本的原因就是钱!!
可别小看了它!宝宝口臭肚子胀不消化,我都用它来解决!红薯燕麦蛋挞推荐月龄12M配餐用途主食功效锻炼抓握防便秘锻炼咀嚼营养特点过年啦,我们都带着宝宝走亲戚,到哪里都是各种美食伺候着,春节就是吃货的狂欢节,不仅妈妈们每逢佳节胖三斤,宝宝
人类史最有影响100人(之82)杨坚第82,隋文帝杨坚(公元541公元604)隋朝上承魏晋南北朝的纷争,下启大唐盛世的序幕,开国皇帝杨坚功不可没。公元541年杨坚出生在北方的名门望族,相传他是西汉太尉杨震后代,北周名
萧涛涛年薪原本8万,恒大直接给1400万!退籍前狂挣超4000万萧涛涛目前已经完成了退籍程序,重新恢复了秘鲁国籍。根据国内媒体的报道,萧涛涛原本在秘鲁国内效力时年薪只有8万,但恒大一下子开出200万美元的年薪(1400万人民币左右),在退籍前,