范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

SRS之RTMP推拉流分析

  SRS是一个简单高效的实时视频服务器,支持RTMP/WebRTC/HLS/HTTP-FLV/SRT/GB28181;本文以SRS4.0版本进行分析RTMP推拉流架构,SRS整体架构如下图(官网图片)所示:
  有图可知SRS支持多种客户端以不同的媒流体协议进行推流、拉流,内部还包括了不同协议的转换,同时还支持SRS的集群。 推荐视频:SRS 4.0 RTMP推流读取数据,拉流转发数据
  本文主要分析在SRS中RTMP的推流、拉流源码分析,其核心类如下: SrsServer SRS流媒体服务
  SrsBufferListener 监听器,主要是TCP的监听 SrsTcpListener TCP监听器 SrsRtmpConn RTMP连接,
  对应了SrsStSocket和SrsCoroutine SrsRtmpServer 提供与客户端之间的RTMP-命令-协议-消息的交互服务,使SrsRtmpConn 提供的socket读写数据 SrsLiveSource 描述路播放源,包括推流和拉流的描述 SrsLiveConsumer 拉流消费者,每路拉流客户端对应个SrsLiveConsumer SrsStSocket 经过封装的socket接SrsRecvThread 负责接收数据,但是要注意的是他这并不是从IO读取数据,从SrsRtmpServer类拉取数据,然后推送到SrsPublishRecvThread(推流 ),或者 SrsQueueRecvThread(拉流) SrsQueueRecvThread 主要于拉流,对应的是客户端-服务器的控制消息,和 视频消息没有关系。客 户端读取数据还是从consumer的queue去读取。
  SrsPublishRecvThread 主要于推流,内部封装了协程
  RTMP推拉流代码流程如下:
  C++音视频学习资料免费获取方法:关注音视频开发T哥  ,点击「链接」即可免费获取2023年最新 C++音视频开发进阶独家免费学习大礼包!
  SRS网络模型分析
  在主函数run_hybrid_server中开始于_srs_hybrid->run()轮询,通过流体服务SrsServer::listen()进入服务端监听,这里分别对不同的协议进行了不同的监听处理,代码如下: srs_error_t SrsServer::listen() {     srs_error_t err = srs_success;     //rtmp的listen     if ((err = listen_rtmp()) != srs_success) {         return srs_error_wrap(err, "rtmp listen");     }          if ((err = listen_http_api()) != srs_success) {         return srs_error_wrap(err, "http api listen");     }      if ((err = listen_https_api()) != srs_success) {         return srs_error_wrap(err, "https api listen");     }          if ((err = listen_http_stream()) != srs_success) {         return srs_error_wrap(err, "http stream listen");     }      if ((err = listen_https_stream()) != srs_success) {         return srs_error_wrap(err, "https stream listen");     }          if ((err = listen_stream_caster()) != srs_success) {         return srs_error_wrap(err, "stream caster listen");     }          if ((err = conn_manager->start()) != srs_success) {         return srs_error_wrap(err, "connection manager");     }      return err; }
  进入RTMP对应的listen,这里主要通过SrsBufferListener进一步封装了listen,包括http api、https api的监听都是用SrsBufferListener统一的封装类;
  C++音视频学习资料免费获取方法:关注音视频开发T哥  ,点击「链接」即可免费获取2023年最新 C++音视频开发进阶独家免费学习大礼包!
  srs_error_t SrsBufferListener::listen(string i, int p) {     srs_error_t err = srs_success;          ip = i;     port = p;          srs_freep(listener);     listener = new SrsTcpListener(this, ip, port);//new一个SrsTcpListener对象,传一个指针          if ((err = listener->listen()) != srs_success) {         return srs_error_wrap(err, "buffered tcp listen");     }          string v = srs_listener_type2string(type);     srs_trace("%s listen at tcp://%s:%d, fd=%d", v.c_str(), ip.c_str(), port, listener->fd());          return err; }
  在new SrsTcpListener 时传入了this,其实是在构造的时候给handler赋值,继续进入SrsTcpListener::listen() //每一个监听,对应一个协程 srs_error_t SrsTcpListener::listen() {     srs_error_t err = srs_success;     //rtmp使用的是tcp,开始listen     if ((err = srs_tcp_listen(ip, port, &lfd)) != srs_success) {         return srs_error_wrap(err, "listen at %s:%d", ip.c_str(), port);     }          srs_freep(trd);     trd = new SrsSTCoroutine("tcp", this);//创建一个协程,传一个用户(SrsTcpListener)指针,如果协程需要回调,可以通过指针找到对应的对象     if ((err = trd->start()) != srs_success) {//启动协程,执行SrsSTCoroutine::cycle(),即handle->cycle(),最终是SrsTcpListener::cycle()         return srs_error_wrap(err, "start coroutine");     }          return err; }
  启动协程进行监听,执行cycle(),代码如下: srs_error_t SrsTcpListener::cycle() {     srs_error_t err = srs_success;          while (true) {         if ((err = trd->pull()) != srs_success) {//读取错误码,判断协程是否结束,不为srs_success时,说明该协程要退出             return srs_error_wrap(err, "tcp listener");         }        //         srs_netfd_t fd = srs_accept(lfd, NULL, NULL, SRS_UTIME_NO_TIMEOUT);//检测新连接         if(fd == NULL){             return srs_error_new(ERROR_SOCKET_ACCEPT, "accept at fd=%d", srs_netfd_fileno(lfd));         }                  if ((err = srs_fd_closeexec(srs_netfd_fileno(fd))) != srs_success) {             return srs_error_wrap(err, "set closeexec");         }                  if ((err = handler->on_tcp_client(fd)) != srs_success) {//handle就是new一个SrsTcpListener对象时,传入的ISrsTcpHandler指针,即SrsBufferListener(SrsBufferListener继承了ISrsTcpHandler)             return srs_error_wrap(err, "handle fd=%d", srs_netfd_fileno(fd));         }     }          return err; }
  这里的on_tcp_client实际执行的就是构造函数时传入this,即SrsBufferListener的成员函数,代码如下: //监听新的连接 srs_error_t SrsBufferListener::on_tcp_client(srs_netfd_t stfd) {     srs_error_t err = server->accept_client(type, stfd);     if (err != srs_success) {         srs_warn("accept client failed, err is %s", srs_error_desc(err).c_str());         srs_freep(err);     }          return srs_success; }
  进入accept_client代码如下: //type传递了对应的连接类型 srs_error_t SrsServer::accept_client(SrsListenerType type, srs_netfd_t stfd) {     srs_error_t err = srs_success;          ISrsStartableConneciton* conn = NULL;     //将fd和一个conn绑定,并返回一个连接conn     if ((err = fd_to_resource(type, stfd, &conn)) != srs_success) {         if (srs_error_code(err) == ERROR_SOCKET_GET_PEER_IP && _srs_config->empty_ip_ok()) {             srs_close_stfd(stfd); srs_error_reset(err);             return srs_success;         }         return srs_error_wrap(err, "fd to resource");     }     srs_assert(conn);          // directly enqueue, the cycle thread will remove the client.     conn_manager->add(conn);//把连接添加到conn_manager进行管理     //启动类型对应的协程,比如启动rtmp连接对应的协程,每个SrsRtmpConn都有1:1对应的协程     if ((err = conn->start()) != srs_success) {         return srs_error_wrap(err, "start conn coroutine");     }          return err; }
  此处首先将fd和一个conn绑定,并返回一个连接conn,代码如下: srs_error_t SrsServer::fd_to_resource(SrsListenerType type, srs_netfd_t stfd, ISrsStartableConneciton** pr) {     srs_error_t err = srs_success;     int fd = srs_netfd_fileno(stfd);     string ip = srs_get_peer_ip(fd);     int port = srs_get_peer_port(fd);     .....     .....          // 最大连接数判断处理     .....     .....      // The context id may change during creating the bellow objects.     SrsContextRestore(_srs_context->get_id());     //new一个类型对应的连接     if (type == SrsListenerRtmpStream) {         *pr = new SrsRtmpConn(this, stfd, ip, port);     } else if (type == SrsListenerHttpApi) {         *pr = new SrsHttpApi(false, this, stfd, http_api_mux, ip, port);     } else if (type == SrsListenerHttpsApi) {         *pr = new SrsHttpApi(true, this, stfd, http_api_mux, ip, port);     } else if (type == SrsListenerHttpStream) {         *pr = new SrsResponseOnlyHttpConn(false, this, stfd, http_server, ip, port);     } else if (type == SrsListenerHttpsStream) {         *pr = new SrsResponseOnlyHttpConn(true, this, stfd, http_server, ip, port);     } else {         srs_warn("close for no service handler. fd=%d, ip=%s:%d", fd, ip.c_str(), port);         srs_close_stfd(stfd);         return err;     }          return err; }
  其次时将连接conn添加到conn_manager进行管理,最后conn->start()启动协程进行接收/发送数据的处理,这里每一个SrsRtmpConn连接都有1:1对应SrsCoroutine协程,启动后进入SrsRtmpConn::do_cycle()轮询,代码如下: // rtmp接收数据处理 srs_error_t SrsRtmpConn::do_cycle() {     srs_error_t err = srs_success;          srs_trace("RTMP client ip=%s:%d, fd=%d", ip.c_str(), port, srs_netfd_fileno(stfd));     //设置收发超时时间     rtmp->set_recv_timeout(SRS_CONSTS_RTMP_TIMEOUT);     rtmp->set_send_timeout(SRS_CONSTS_RTMP_TIMEOUT);     //rtmp 握手     if ((err = rtmp->handshake()) != srs_success) {         return srs_error_wrap(err, "rtmp handshake");     }     //rtmp代理相关     uint32_t rip = rtmp->proxy_real_ip();     if (rip > 0) {         srs_trace("RTMP proxy real client ip=%d.%d.%d.%d",             uint8_t(rip>>24), uint8_t(rip>>16), uint8_t(rip>>8), uint8_t(rip));     }          SrsRequest* req = info->req;     if ((err = rtmp->connect_app(req)) != srs_success) {//握手成功后,处理client发送的connect         return srs_error_wrap(err, "rtmp connect tcUrl");     }          // set client ip to request.     req->ip = ip;//保存客户端IP          srs_trace("connect app, tcUrl=%s, pageUrl=%s, swfUrl=%s, schema=%s, vhost=%s, port=%d, app=%s, args=%s",         req->tcUrl.c_str(), req->pageUrl.c_str(), req->swfUrl.c_str(),         req->schema.c_str(), req->vhost.c_str(), req->port,         req->app.c_str(), (req->args? "(obj)":"null"));          // show client identity     if(req->args) {         std::string srs_version;         std::string srs_server_ip;         int srs_pid = 0;         int srs_id = 0;                  SrsAmf0Any* prop = NULL;         if ((prop = req->args->ensure_property_string("srs_version")) != NULL) {             srs_version = prop->to_str();         }         if ((prop = req->args->ensure_property_string("srs_server_ip")) != NULL) {             srs_server_ip = prop->to_str();         }         if ((prop = req->args->ensure_property_number("srs_pid")) != NULL) {             srs_pid = (int)prop->to_number();         }         if ((prop = req->args->ensure_property_number("srs_id")) != NULL) {             srs_id = (int)prop->to_number();         }                  if (srs_pid > 0) {             srs_trace("edge-srs ip=%s, version=%s, pid=%d, id=%d",                 srs_server_ip.c_str(), srs_version.c_str(), srs_pid, srs_id);         }     }     //     if ((err = service_cycle()) != srs_success) {         err = srs_error_wrap(err, "service cycle");     }          srs_error_t r0 = srs_success;     if ((r0 = on_disconnect()) != srs_success) {         err = srs_error_wrap(err, "on disconnect %s", srs_error_desc(r0).c_str());         srs_freep(r0);     }          // If client is redirect to other servers, we already logged the event.     if (srs_error_code(err) == ERROR_CONTROL_REDIRECT) {         srs_error_reset(err);     }          return err; }
  开始进行RTMP正常的握手交互过程、设置收发超时、rtmp代理,握手成功(处理client发送的connect请求);进入service_cycle(),继续数据交互,设置窗口大小、带宽大小、chunk大小、连接成功响应客户端。 {     srs_error_t err = srs_success;          SrsRequest* req = info->req;     //窗口大小设置     int out_ack_size = _srs_config->get_out_ack_size(req->vhost);     if (out_ack_size && (err = rtmp->set_window_ack_size(out_ack_size)) != srs_success) {         return srs_error_wrap(err, "rtmp: set out window ack size");     }          int in_ack_size = _srs_config->get_in_ack_size(req->vhost);     if (in_ack_size && (err = rtmp->set_in_window_ack_size(in_ack_size)) != srs_success) {         return srs_error_wrap(err, "rtmp: set in window ack size");     }     //带宽设置     if ((err = rtmp->set_peer_bandwidth((int)(2.5 * 1000 * 1000), 2)) != srs_success) {         return srs_error_wrap(err, "rtmp: set peer bandwidth");     }          // get the ip which client connected.     std::string local_ip = srs_get_local_ip(srs_netfd_fileno(stfd));          // do bandwidth test if connect to the vhost which is for bandwidth check.     if (_srs_config->get_bw_check_enabled(req->vhost)) {         if ((err = bandwidth->bandwidth_check(rtmp, skt, req, local_ip)) != srs_success) {             return srs_error_wrap(err, "rtmp: bandwidth check");         }         return err;     }          // set chunk size to larger.     // set the chunk size before any larger response greater than 128,     // to make OBS happy, @see https://github.com/ossrs/srs/issues/454     int chunk_size = _srs_config->get_chunk_size(req->vhost); //从配置文件读取chunk size大小,进行设置,一般设置60k,如果太小就得拆分     if ((err = rtmp->set_chunk_size(chunk_size)) != srs_success) {         return srs_error_wrap(err, "rtmp: set chunk size %d", chunk_size);     }          // response the client connect ok.     if ((err = rtmp->response_connect_app(req, local_ip.c_str())) != srs_success) {//连接成功,响应客户端         return srs_error_wrap(err, "rtmp: response connect app");     }          if ((err = rtmp->on_bw_done()) != srs_success) {         return srs_error_wrap(err, "rtmp: on bw down");     }     //真正的循环     while (true) {         if ((err = trd->pull()) != srs_success) {             return srs_error_wrap(err, "rtmp: thread quit");         }                  err = stream_service_cycle();        .........    .........     }          return err; }
  来到stream_service_cycle(),才是真正推流、拉流处理,值得注意的是,还对cache gop是否开启的设置。 srs_error_t SrsRtmpConn::stream_service_cycle() {     srs_error_t err = srs_success;     ......     ......     // find a source to serve.     SrsLiveSource* source = NULL;//一个直播对应一个SrsLiveSource,一个推流,0 N个拉流     if ((err = _srs_sources->fetch_or_create(req, server, &source)) != srs_success) {//查找/创建一个source         return srs_error_wrap(err, "rtmp: fetch source");     }     srs_assert(source != NULL);     //读取配置文件,设置是否需要cache gop     bool enabled_cache = _srs_config->get_gop_cache(req->vhost);//默认是开的     srs_trace("source url=%s, ip=%s, cache=%d, is_edge=%d, source_id=%s/%s",         req->get_stream_url().c_str(), ip.c_str(), enabled_cache, info->edge, source->source_id().c_str(), source->pre_source_id().c_str());     source->set_cache(enabled_cache);//设置     //推流、拉流处理     switch (info->type) {         case SrsRtmpConnPlay: {             // response connection start play             if ((err = rtmp->start_play(info->res->stream_id)) != srs_success) {                 return srs_error_wrap(err, "rtmp: start play");             }             if ((err = http_hooks_on_play()) != srs_success) {                 return srs_error_wrap(err, "rtmp: callback on play");             }             //拉流             err = playing(source);             http_hooks_on_stop();                          return err;         }         case SrsRtmpConnFMLEPublish: {//RTMP基本走这里             if ((err = rtmp->start_fmle_publish(info->res->stream_id)) != srs_success) {//接收客户端相应的消息,并返回对应的响应                 return srs_error_wrap(err, "rtmp: start FMLE publish");             }                          return publishing(source);         }         case SrsRtmpConnHaivisionPublish: {             if ((err = rtmp->start_haivision_publish(info->res->stream_id)) != srs_success) {                 return srs_error_wrap(err, "rtmp: start HAIVISION publish");             }                          return publishing(source);         }         case SrsRtmpConnFlashPublish: {             if ((err = rtmp->start_flash_publish(info->res->stream_id)) != srs_success) {                 return srs_error_wrap(err, "rtmp: start FLASH publish");             }                          return publishing(source);         }         default: {             return srs_error_new(ERROR_SYSTEM_CLIENT_INVALID, "rtmp: unknown client type=%d", info->type);         }     }          return err; }
   推流流程
  推流流程主要是do_publishing,需要注意的是使用SrsPublishRecvThread封装好的协程与拉流使用的SrsQueueRecvThread区分开来,其代码如下: //推流流程 srs_error_t SrsRtmpConn::publishing(SrsLiveSource* source) {     srs_error_t err = srs_success;          SrsRequest* req = info->req;     ..............// TODO: FIXME: Should refine the state of publishing.     if ((err = acquire_publish(source)) == srs_success) {         // 协程实际是SrsPublishRecvThread内部封装的SrsRecvThread的SrsCoroutine成员变量trd,主要看do_cycle()的流程         // 参数:rtmp:在协程中有一些rtmp接收数据的处理,req:URL相关,         SrsPublishRecvThread rtrd(rtmp, req, srs_netfd_fileno(stfd), 0, this, source, _srs_context->get_id());         err = do_publishing(source, &rtrd);//实际推流流程,source就是直播对应的那个source         rtrd.stop();     }     ...........     return err; }  srs_error_t SrsRtmpConn::do_publishing(SrsLiveSource* source, SrsPublishRecvThread* rtrd) {     srs_error_t err = srs_success;          SrsRequest* req = info->req;     SrsPithyPrint* pprint = SrsPithyPrint::create_rtmp_publish();     SrsAutoFree(SrsPithyPrint, pprint);      // update the statistic when source disconveried.     SrsStatistic* stat = SrsStatistic::instance();     if ((err = stat->on_client(_srs_context->get_id().c_str(), req, this, info->type)) != srs_success) {         return srs_error_wrap(err, "rtmp: stat client");     }      // start isolate recv thread.     // TODO: FIXME: Pass the callback here.     if ((err = rtrd->start()) != srs_success) {//启动协程,SrsRecvThread::do_cycle()轮询读取数据         return srs_error_wrap(err, "rtmp: receive thread");     }          // initialize the publish timeout.     publish_1stpkt_timeout = _srs_config->get_publish_1stpkt_timeout(req->vhost);     publish_normal_timeout = _srs_config->get_publish_normal_timeout(req->vhost);          // set the sock options.     set_sock_options();          if (true) {         bool mr = _srs_config->get_mr_enabled(req->vhost);         srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);         srs_trace("start publish mr=%d/%d, p1stpt=%d, pnt=%d, tcp_nodelay=%d",             mr, srsu2msi(mr_sleep), srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout), tcp_nodelay);     }          int64_t nb_msgs = 0;     uint64_t nb_frames = 0;     while (true) {         if ((err = trd->pull()) != srs_success) {             return srs_error_wrap(err, "rtmp: thread quit");         }          pprint->elapse();          // cond wait for timeout.         if (nb_msgs == 0) {             // when not got msgs, wait for a larger timeout.             // @see https://github.com/ossrs/srs/issues/441             rtrd->wait(publish_1stpkt_timeout);         } else {             rtrd->wait(publish_normal_timeout);         }                  // check the thread error code.         if ((err = rtrd->error_code()) != srs_success) {             return srs_error_wrap(err, "rtmp: receive thread");         }                  // when not got any messages, timeout. 超时处理         if (rtrd->nb_msgs() <= nb_msgs) {             return srs_error_new(ERROR_SOCKET_TIMEOUT, "rtmp: publish timeout %dms, nb_msgs=%d",                 nb_msgs? srsu2msi(publish_normal_timeout) : srsu2msi(publish_1stpkt_timeout), (int)nb_msgs);         }         nb_msgs = rtrd->nb_msgs();//收到消息数量                  // Update the stat for video fps.         // @remark https://github.com/ossrs/srs/issues/851         SrsStatistic* stat = SrsStatistic::instance();         if ((err = stat->on_video_frames(req, (int)(rtrd->nb_video_frames() - nb_frames))) != srs_success) {             return srs_error_wrap(err, "rtmp: stat video frames");         }         nb_frames = rtrd->nb_video_frames();//视频帧数量          // reportable         if (pprint->can_print()) {             kbps->sample();             bool mr = _srs_config->get_mr_enabled(req->vhost);             srs_utime_t mr_sleep = _srs_config->get_mr_sleep(req->vhost);             srs_trace("<- " SRS_CONSTS_LOG_CLIENT_PUBLISH " time=%d, okbps=%d,%d,%d, ikbps=%d,%d,%d, mr=%d/%d, p1stpt=%d, pnt=%d",                 (int)pprint->age(), kbps->get_send_kbps(), kbps->get_send_kbps_30s(), kbps->get_send_kbps_5m(),                 kbps->get_recv_kbps(), kbps->get_recv_kbps_30s(), kbps->get_recv_kbps_5m(), mr, srsu2msi(mr_sleep),                 srsu2msi(publish_1stpkt_timeout), srsu2msi(publish_normal_timeout));//码率的计算,s,30s,5min的码率         }     }          return err; }
  看看SrsPublishRecvThread的成员SrsRecvThread trd 的do_cycle()的处理,这里主要是rtmp->recv_message(&msg)接收消息,pumper->consume(msg)把消息推送给消费者。 srs_error_t SrsRecvThread::do_cycle() {     srs_error_t err = srs_success;          while (true) {         if ((err = trd->pull()) != srs_success) {             return srs_error_wrap(err, "recv thread");         }                  // When the pumper is interrupted, wait then retry.         if (pumper->interrupted()) {             srs_usleep(timeout);             continue;         }                  SrsCommonMessage* msg = NULL;                  // Process the received message. 处理收到的消息,rtmp由SrsPublishRecvThread的构造函数传进来         if ((err = rtmp->recv_message(&msg)) == srs_success) {             err = pumper->consume(msg);//推送给消费者,pumper也是从SrsPublishRecvThread的SrsRecvThread成员变量trd的构造函数传进来的         }                  if (err != srs_success) {             // Interrupt the receive thread for any error.             trd->interrupt();                          // Notify the pumper to quit for error.             pumper->interrupt(err);                          return srs_error_wrap(err, "recv thread");         }     }          return err; }
  consume内部进行消息数量、视频帧数量的统计,然后 conn->handle_publish_message( source, msg)对消息的处理,最终执行函数process_publish_message()。 //audio、video、metaData处理 srs_error_t SrsRtmpConn::process_publish_message(SrsLiveSource* source, SrsCommonMessage* msg) {     srs_error_t err = srs_success;          // for edge, directly proxy message to origin.     if (info->edge) {         if ((err = source->on_edge_proxy_publish(msg)) != srs_success) {             return srs_error_wrap(err, "rtmp: proxy publish");         }         return err;     }          // process audio packet RTMP_MSG_AudioMessage  8      if (msg->header.is_audio()) {         if ((err = source->on_audio(msg)) != srs_success) {//audio的处理             return srs_error_wrap(err, "rtmp: consume audio");         }         return err;     }     // process video packet RTMP_MSG_VideoMessage 9     if (msg->header.is_video()) {         if ((err = source->on_video(msg)) != srs_success) {//video处理             return srs_error_wrap(err, "rtmp: consume video");         }         return err;     }          // process aggregate packet     if (msg->header.is_aggregate()) {         if ((err = source->on_aggregate(msg)) != srs_success) {             return srs_error_wrap(err, "rtmp: consume aggregate");         }         return err;     }          // process onMetaData MetaData处理  RTMP_MSG_AMF0DataMessage 18 或  RTMP_MSG_AMF3DataMessage 15     if (msg->header.is_amf0_data() || msg->header.is_amf3_data()) {         SrsPacket* pkt = NULL;         if ((err = rtmp->decode_message(msg, &pkt)) != srs_success) {             return srs_error_wrap(err, "rtmp: decode message");         }         SrsAutoFree(SrsPacket, pkt);                  if (dynamic_cast(pkt)) {             SrsOnMetaDataPacket* metadata = dynamic_cast(pkt);//将packet转成metaData             if ((err = source->on_meta_data(msg, metadata)) != srs_success) {                 return srs_error_wrap(err, "rtmp: consume metadata");             }             return err;         }         return err;     }          return err; }
  process_publish_message对音频、视频、metaData进行处理;先看看音频处理,把msg发送给每一个拉流端消费者,这里的consumers容器保存所有拉流端消费者,在拉流流程中,新建消费者时添加的。 //音频数据处理 srs_error_t SrsLiveSource::on_audio(SrsCommonMessage* shared_audio) {     srs_error_t err = srs_success;     .................          // convert shared_audio to msg, user should not use shared_audio again.     // 通过引用计数的方式,创建一个消息     SrsSharedPtrMessage msg;//类似智能指针,数据拷贝实际上是浅拷贝,通过引用计数方式,为0释放内存     if ((err = msg.create(shared_audio)) != srs_success) {         return srs_error_wrap(err, "create message");     }          // directly process the audio message.     if (!mix_correct) {//默认不做校正,就直接处理,就是不用放到map进行排序         return on_audio_imp(&msg);     }          // insert msg to the queue.      mix_queue->push(msg.copy());//把流消息都插入到队列中,内部并按时间戳做了排序          // fetch someone from mix queue. 从map中取出来     SrsSharedPtrMessage* m = mix_queue->pop();//pop时间戳最小的出来     if (!m) {         return err;     }          // consume the monotonically increase message.     if (m->is_audio()) {         err = on_audio_imp(m);     } else {         err = on_video_imp(m);     }     srs_freep(m);          return err; }  srs_error_t SrsLiveSource::on_audio_imp(SrsSharedPtrMessage* msg) {     srs_error_t err = srs_success;          ............................      // copy to all consumer 把msg拷贝到消费者对象的队列中,即把数据发给每个拉流端     if (!drop_for_reduce) {         for (int i = 0; i < (int)consumers.size(); i++) {             SrsLiveConsumer* consumer = consumers.at(i);             if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {//把消息放到消费者队列                 return srs_error_wrap(err, "consume message");             }         }     }          // cache the sequence header of aac, or first packet of mp3.     // for example, the mp3 is used for hls to write the "right" audio codec.     // TODO: FIXME: to refine the stream info system.     if (is_aac_sequence_header || !meta->ash()) {         if ((err = meta->update_ash(msg)) != srs_success) { //更新audio sequence             return srs_error_wrap(err, "meta consume audio");         }     }          // when sequence header, donot push to gop cache and adjust the timestamp.     if (is_sequence_header) {         return err;     }          // cache the last gop packets     if ((err = gop_cache->cache(msg)) != srs_success) {         return srs_error_wrap(err, "gop cache consume audio");     }  .............          return err; }
  类似的视频处理,如下: srs_error_t SrsLiveSource::on_video(SrsCommonMessage* shared_video) {     srs_error_t err = srs_success;          .........................................          // convert shared_video to msg, user should not use shared_video again.     // the payload is transfer to msg, and set to NULL in shared_video.     SrsSharedPtrMessage msg;//智能指针的封装     if ((err = msg.create(shared_video)) != srs_success) {         return srs_error_wrap(err, "create message");     }          // directly process the video message.     if (!mix_correct) {         return on_video_imp(&msg);     }          // insert msg to the queue.     mix_queue->push(msg.copy());//把流消息都插入到队列中,内部并按时间戳做了排序          // fetch someone from mix queue.     SrsSharedPtrMessage* m = mix_queue->pop();//pop时间戳最小的消息出来     if (!m) {         return err;     }          // consume the monotonically increase message.     if (m->is_audio()) {         err = on_audio_imp(m);     } else {         err = on_video_imp(m);     }     srs_freep(m);          return err; }  srs_error_t SrsLiveSource::on_video_imp(SrsSharedPtrMessage* msg) {     srs_error_t err = srs_success;     .....................     // cache the sequence header if h264     // donot cache the sequence header to gop_cache, return here.     if (is_sequence_header && (err = meta->update_vsh(msg)) != srs_success) { //更新video sequence         return srs_error_wrap(err, "meta update video");     }          // Copy to hub to all utilities.     if ((err = hub->on_video(msg, is_sequence_header)) != srs_success) {         return srs_error_wrap(err, "hub consume video");     }      // For bridger to consume the message.     if (bridger_ && (err = bridger_->on_video(msg)) != srs_success) {         return srs_error_wrap(err, "bridger consume video");     }      // copy to all consumer 把数据发给拉流端的消费者(队列中)     if (!drop_for_reduce) {         for (int i = 0; i < (int)consumers.size(); i++) {             SrsLiveConsumer* consumer = consumers.at(i);             if ((err = consumer->enqueue(msg, atc, jitter_algorithm)) != srs_success) {//把消息放到消费者队列中                 return srs_error_wrap(err, "consume video");             }         }     }          // when sequence header, donot push to gop cache and adjust the timestamp.     if (is_sequence_header) {         return err;     }          // cache the last gop packets       cache gop 如果是I帧,就会清空掉,重新push新的数据     if ((err = gop_cache->cache(msg)) != srs_success) {         return srs_error_wrap(err, "gop cache consume vdieo");     }     .........     return err; }
  metaData的处理如下: srs_error_t SrsLiveSource::on_meta_data(SrsCommonMessage* msg, SrsOnMetaDataPacket* metadata) {     srs_error_t err = srs_success;     ..............          // Update the meta cache.  更新metaData保存起来     bool updated = false;     if ((err = meta->update_data(&msg->header, metadata, updated)) != srs_success) {         return srs_error_wrap(err, "update metadata");     }     if (!updated) {         return err;     }          // when already got metadata, drop when reduce sequence header.     bool drop_for_reduce = false;     if (meta->data() && _srs_config->get_reduce_sequence_header(req->vhost)) {         drop_for_reduce = true;         srs_warn("drop for reduce sh metadata, size=%d", msg->size);     }          // copy to all consumer 把推流端发的metaData也插入消费队列中,便于拉流者知道     if (!drop_for_reduce) {         std::vector::iterator it;         for (it = consumers.begin(); it != consumers.end(); ++it) {             SrsLiveConsumer* consumer = *it;             if ((err = consumer->enqueue(meta->data(), atc, jitter_algorithm)) != srs_success) {                 return srs_error_wrap(err, "consume metadata");             }         }     }          // Copy to hub to all utilities.     return hub->on_meta_data(meta->data(), metadata); }
  拉流流程
  首先每一个拉流端都会绑定一个SrsConsumer消费者,每一个消费者对应一个SrsQueueRecvThread协程,执行do_playing srs_error_t SrsRtmpConn::playing(SrsLiveSource* source) {     srs_error_t err = srs_success;     ........................          // Create a consumer of source.     SrsLiveConsumer* consumer = NULL;//消费者,每个拉流都会绑定一个SrsConsumer     SrsAutoFree(SrsLiveConsumer, consumer);     if ((err = source->create_consumer(consumer)) != srs_success) {         return srs_error_wrap(err, "rtmp: create consumer");     }     if ((err = source->consumer_dumps(consumer)) != srs_success) {         return srs_error_wrap(err, "rtmp: dumps consumer");     }          // 每一个消费者独立一个协程     SrsQueueRecvThread trd(consumer, rtmp, SRS_PERF_MW_SLEEP, _srs_context->get_id());          if ((err = trd.start()) != srs_success) {         return srs_error_wrap(err, "rtmp: start receive thread");     }          // Deliver packets to peer.     wakable = consumer;     err = do_playing(source, consumer, &trd);//每个流source绑定一个消费者SrsConsumer     wakable = NULL;          trd.stop();          // Drop all packets in receiving thread.     if (!trd.empty()) {         srs_warn("drop the received %d messages", trd.size());     }          return err; }  srs_error_t SrsRtmpConn::do_playing(SrsLiveSource* source, SrsLiveConsumer* consumer, SrsQueueRecvThread* rtrd) {     srs_error_t err = srs_success;     ...................................          while (true) {         // when source is set to expired, disconnect it.         if ((err = trd->pull()) != srs_success) {//判断协程是否退出             return srs_error_wrap(err, "rtmp: thread quit");         }          // collect elapse for pithy print.         pprint->elapse();          // to use isolate thread to recv, can improve about 33% performance.         while (!rtrd->empty()) {             SrsCommonMessage* msg = rtrd->pump();             if ((err = process_play_control_msg(consumer, msg)) != srs_success) {//播放控制处理                 return srs_error_wrap(err, "rtmp: play control message");             }         }                  // quit when recv thread error.         if ((err = rtrd->error_code()) != srs_success) {             return srs_error_wrap(err, "rtmp: recv thread");         }          #ifdef SRS_PERF_QUEUE_COND_WAIT         // wait for message to incoming.         // @see https://github.com/ossrs/srs/issues/257         consumer->wait(mw_msgs, mw_sleep);//等数据累积一段时间攒一定数据,再发送 #endif                  // get messages from consumer.         // each msg in msgs.msgs must be free, for the SrsMessageArray never free them.         // @remark when enable send_min_interval, only fetch one message a time.         int count = (send_min_interval > 0)? 1 : 0;         if ((err = consumer->dump_packets(&msgs, count)) != srs_success) {//从消费队列中一次读取出来,数据从SrsConsumer queue来,实际是从source给过来的             return srs_error_wrap(err, "rtmp: consumer dump packets");         }         ...................................................         // sendout messages, all messages are freed by send_and_free_messages().         // no need to assert msg, for the rtmp will assert it.         if (count > 0 && (err = rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)) != srs_success) {//发送数据,给到客户端,最终调用protocol封装好的socket api             return srs_error_wrap(err, "rtmp: send %d messages", count);         }                  // if duration specified, and exceed it, stop play live.         // @see: https://github.com/ossrs/srs/issues/45         if (user_specified_duration_to_stop) {             if (duration >= req->duration) {                 return srs_error_new(ERROR_RTMP_DURATION_EXCEED, "rtmp: time %d up %d", srsu2msi(duration), srsu2msi(req->duration));             }         }                  // apply the minimal interval for delivery stream in srs_utime_t.         if (send_min_interval > 0) {             srs_usleep(send_min_interval);         }          // Yield to another coroutines.         // @see https://github.com/ossrs/srs/issues/2194#issuecomment-777437476         srs_thread_yield();//让出cpu,让其他协程继续运行     }          return err; }
  do_playing内部process_play_control_msg播放控制处理,consumer->dump_packets(&msgs, count)从消费队列读取数据,最终rtmp->send_and_free_messages(msgs.msgs, count, info->res->stream_id)发送到play客户端。 作者:Mr Ju Ju
  原文链接:https://www.cnblogs.com/juju-go/p/17039564.html
  #音视频开发# #流媒体# #程序员#

中国人买布洛芬竟然还要看印度人脸色商业认知东北制药卖的扑热息痛两元20片无人知,而一盒20元的布洛芬却遭14亿国人疯抢,虽然如今供不应求,但它真的能赚大钱吗?我们来算笔账,先看成本,像芬必得,一粒原料成本是七分钱,中国金融专题盘活存量资产扩大有效投资专题盘活存量资产扩大有效投资编者按有效盘活存量资产,形成存量资产和新增投资的良性循环,对于提升基础设施运营管理水平拓宽社会投资渠道等具有重要意义。2022年5月,国务院办公厅印发关以蝴蝶为灵感,中国空间太阳能电站获新突破!两年内进行对地传输我国正在开发一个空间太阳能发电站,该空间站可以收集太阳能,并将其以微波束的形式发送到地球用于发电。中国科学家表示,他们利用一种以蝴蝶为灵感的实用解决方案,解决了在太空中产生强大能量韦伯望远镜的新星系照片让人瞠目结舌阅读文章之前,请点击关注,方便您回来查看内容,以及参与大家的互动,感谢您给予我码字动力!让您大饱眼福充满星系的宇宙詹姆斯韦伯太空望远镜非常强大。欧洲航天局最新发布的一张星系图像显示前宁波台主持人夫妇实控企业利安科技上会通过(第489篇)吾攀升创业板2022。12。30上会通过(橡胶和塑料制品业)宁波利安科技股份有限公司全文共10849字,阅读时长20分钟参考说明注1以上数字以及相关信息均来自宁波利安科技股份有限公司最新无定形冰或可揭示宇宙生命奥秘据英国泰晤士报网站2月2日报道,金汤力鸡尾酒中的冰可能是其他行星上存在生命的线索。报道说,你的金汤力鸡尾酒也许再也不一样了。科学家研制了一种新型冰既不漂浮也不下沉。他们相信,这项研国安海口开练斯坦利人人要有危机感9日,北京国安全队在位于海口观澜湖的训练基地开始了球队冬训的首堂训练课,主帅斯坦利表示,即将开始的新赛季球队会有比去年更高的目标,而在他本人看来,纪律态度和团结是最为重要的事情。过以后排骨都要这样做才好吃!红烧排骨(川味)本来不是去买排骨的,但是看到它们太可爱,就想带回家了,以后排骨这样煮,都可以吃3碗米饭用料排骨2根胡萝卜1根姜4片蒜2颗料酒2勺老抽1勺干花椒干辣椒少许豆瓣酱1。5勺盐少许白醋半勺我妈教我的这个排骨做法,试做一次就爱上了,简单健康还下饭蒸煮是简单方便健康无油烟的烹调方法。将煮沸后产生的蒸汽作为蒸煮食品的传热介质,更符合现代健康饮食的要求。最近就喜欢上的这个排骨的做法蒸!这个做法最重要的一点是简单,而且可以变换成不42岁董洁带货火了!不仅没被骂穿搭还掀起种草风,网友直言想买hi大家好一边和前夫潘粤明上演世纪大和解,一边在无人在意的角落,逆袭成直播带货女王,估计42岁的董洁自己都没有想到,从老天爷赏饭吃的云端跌落谷底后,有一天还能重新获得认可和赞美。就任天堂(NTDOY。US)塞尔达传说王国之泪涨至70美元其他公司或跟进智通财经APP获悉,任天堂(NTDOY。US)将于5月12日发售塞尔达传说荒野之息的续集塞尔达传说王国之泪,美版价格为69。99美元,涨价后将成为首个定价70美元的Switch游戏
数字货币正成为大国竞逐场数字货币正成为大国竞逐场(冯兴元,中国社会科学院农村发展研究所研究员庄希威,国家开发银行风险管理部)中国经济报告提要政府也要为发展数字货币提供某种通道,因为,数字货币领域在未来会成基金出现这样的信号可以持有过节今天市场高开,三大指数都是高开,而且高开的幅度是适中的,这有利于最后一天的震荡收阳走势。今天的上证指数又留下了一个向上的跳空缺口,缺口位置是昨天的收盘点位3240。28点,上沿是3试管婴儿门诊促排卵监测在门诊促排卵周期中,可有一个或多个优势卵泡发育成熟排卵,从而增加受精机会,以达到最大限度地增加妊娠率,但也增加了多胎妊娠的风险。适应证门诊促排卵监测适用于月经不规律低促性腺激素性卵北京疾控提醒阳康后,适当增加优质蛋白,切勿暴饮暴食北京市疾病预防控制中心春节到吃出健康幸福年千门万户曈曈日总把新桃换旧符辞旧迎新之际,我们将洗去一年的尘埃,拥抱充满希望的新年。身体健康是承载希望的基石,春节期间怎么吃,才能让我们的社火闹新春秦腔唱大戏!兰州春节文化庙会正月初二隍庙鸣锣20日记者获悉,我们的中国梦文化进万家2023年兰州春节文化庙会活动将于正月初二至正月初六(1月23日1月27日),每日上午1000至下午300,在兰州市第一工人俱乐部(隍庙)和张除夕快乐丨青海文旅兔年限定红包封面来啦!快来抢,手慢无!春节是一年中最重要的节日漂泊在外的游子都会赶着回家去和家人团聚享受阖家团圆的幸福辞暮尔尔烟火年年除夕是辞旧,也是迎新在这个新旧交替,辞旧迎新的日子祝大家新年快乐,阖家幸福万事顺意所任正非寒意继续蔓延!三大芯片巨头顶不住了任正非寒意继续蔓延!三大芯片巨头顶不住了在华为被美国卡脖子后,中国芯片行业的日子不好过。据外媒报道,目前,中国在芯片设计领域的优势已经不再明显。除了中芯国际外,华虹半导体台积电联电春晚女星穿搭,随便1件就是五六位数,杨紫最豪气身上穿了一套房嘿,辣条陪你唠嗑春晚明星得穿搭向来都会成为一时的潮流标,每次同款也都是秒出秒爆仓。今年自然也是如此,明星身上的穿搭除了好看,就是一个字贵,两个字死贵。个人感觉,全场最佳的是赵丽颖的紧急提醒!这6种微信红包千万别抢今天是大年初一小编祝大家新年快乐!新年发红包抢红包想必也是大家的必选动作又到了拼手速的时候但是,小编提醒大家以下微信红包千万别领!一需输密码的红包微信好友发来一个红包,点开时却提醒春晚毛衣连续美上热搜!兔耳同款抢镜,绿开衫12分钟卖断货hi大家新年快乐呀,在兔年的第一天,先祝大家钱兔似锦老规矩,又到了一年一度春晚后的讨论环节,看看今年谁穿的衣服最好看,谁的发型口红色号火了,谁又是今年的带货女王!看了圈你们的关注,卫裤搭配指南之白色棉服篇卫裤是秋冬必备裤,你有想过这么搭配吗?卫裤搭配好了,整个人气质不一样。以下为个人几点搭配指南上衣指南,遵循三色原则。即上衣白色,裤子我们就灰色,这里的裤子不要西裤,我们上衣选择白色