专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

聊聊KafkaConsumer源码解析之ConsumerNe

  一、Consumer的使用
  Consumer的源码解析主要来看KafkaConsumer,KafkaConsumer是Consumer接口的实现类。KafkaConsumer提供了一套封装良好的API,开发人员可以基于这套API轻松实现从Kafka服务端拉取消息的功能,这样开发人员根本不用关心与Kafka服务端之间网络连接的管理、心跳检测、请求超时重试等底层操作,也不必关心订阅Topic的分区数量、分区副本的网络拓扑以及ConsumerGroup的Rebalance等Kafka具体细节,KafkaConsumer中还提供了自动提交offset的功能,使的开发人员更加关注业务逻辑,提高了开发效率。
  下面我们来看一个KafkaConsumer的示例程序:author:微信公众号【老周聊架构】publicclassKafkaConsumerTest{publicstaticvoidmain(String〔〕args){PropertiespropsnewProperties();kafka地址,列表格式为host1:port1,host2:port2,。。。,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(建议多提供几个,以防提供的服务器关闭)必须设置props。put(bootstrap。servers,localhost:9092);key序列化方式必须设置props。put(key。serializer,org。apache。kafka。common。serialization。StringSerializer);value序列化方式必须设置props。put(value。serializer,org。apache。kafka。common。serialization。StringSerializer);props。put(group。id,consumerriemanntest);KafkaConsumerString,StringconsumernewKafkaConsumer(props);可消费多个topic,组成一个listStringtopicriemannkafkatest;consumer。subscribe(Arrays。asList(topic));while(true){ConsumerRecordsString,Stringrecordsconsumer。poll(Duration。ofMillis(100));for(ConsumerRecordString,Stringrecord:records){System。out。printf(offsetd,keys,values,record。offset(),record。key(),record。value());try{Thread。sleep(100);}catch(InterruptedExceptione){e。printStackTrace();}}}}}
  从示例中可以看出KafkaConsumer的核心方法是poll(),它负责从Kafka服务端拉取消息。核心方法的具体细节我想放在下一篇再细讲,关乎消费侧的客户端与Kafka服务端的通信模型。这一篇我们主要从宏观的角度来剖析下Consumer消费端的源码。二、KafkaConsumer分析
  我们先来看下Consumer接口,该接口定义了KafkaConsumer对外的API,其核心方法可以分为以下六类:subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。assign()方法:用户手动订阅指定的Topic,并且指定消费的分区,此方法subscribe()方法互斥。poll()方法:负责从服务端获取消息。commit()方法:提交消费者已经消费完成的offset。seek()方法:指定消费者起始消费的位置。pause()、resume()方法:暂停、继续Consumer,暂停后poll()方法会返回空。
  我们先来看下KafkaConsumer的重要属性以及UML结构图。
  clientId:Consumer的唯一标识。groupId:消费者组的唯一标识。coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,读者可以理解为Consumer与服务端GroupCoordinator通信的门面。keyDeserializer、valueDeserializer:key和value的反序列化器。fetcher:负责从服务端获取消息。interceptors:ConsumerInterceptors集合,ConsumerInterceptors。onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptors。onCommit()方法也可以在服务端返回提交offset成功的响应进行拦截或修改。client:ConsumerNetworkClient负责消费者与Kafka服务端的网络通信。subscriptions:SubscriptionState维护了消费者的消费状态。metadata:ConsumerMetadata记录了整个Kafka集群的元信息。currentThread、refcount:分别记录的KafkaConsumer的线程id和重入次数三、ConsumerNetworkClient
  ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。
  我们先来看下ConsumerNetworkClient的重要属性以及UML结构图。
  client:NetworkClient对象。unsent:缓冲队列。UnsentRequests对象,该对象内部维护了一个unsent属性,该属性是ConcurrentMap,key是Node节点,value是ConcurrentLinkedQueue。metadata:用于管理Kafka集群元数据。retryBackoffMs:在尝试重试对给定主题分区的失败请求之前等待的时间量,这避免了在某些故障情况下在紧密循环中重复发送请求。对应retry。backoff。ms配置,默认100ms。maxPollTimeoutMs:使用Kafka的组管理工具时,消费者协调器的心跳之间的预期时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session。timeout。ms,但通常不应设置为高于该值的13。它可以调整得更低,以控制正常重新平衡的预期时间。对应heartbeat。interval。ms配置,默认3000ms。构造函数中,maxPollTimeoutMs取的是maxPollTimeoutMs与MAXPOLLTIMEOUTMS的最小值,MAXPOLLTIMEOUTMS默认为5000ms。requestTimeoutMs:配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽,则请求失败。对应request。timeout。ms配置,默认305000ms。wakeupDisabled:由调用KafkaConsumer对象的消费者线程之外的其它线程设置,表示要中断KafkaConsumer线程。lock:我们不需要高吞吐量,所以使用公平锁来尽量避免饥饿。pendingCompletion:当请求完成时,它们在调用之前被转移到这个队列。目的是避免在持有此对象的监视器时调用它们,这可能会为死锁打开门。pendingDisconnects:断开与协调器连接节点的队列。wakeup:这个标志允许客户端被安全唤醒而无需等待上面的锁。为了同时启用它,避免需要获取上面的锁是原子的。
  ConsumerNetworkClient的核心方法是poll()方法,poll()方法有很多重载方法,最终会调用poll(Timertimer,PollConditionpollCondition,booleandisableWakeup)方法,这三个参数含义是:timer表示定时器限制此方法可以阻塞多长时间;pollCondition表示可空阻塞条件;disableWakeup表示如果true禁用触发唤醒。
  我们来简单回顾下ConsumerNetworkClient的功能:3。1org。apache。kafka。clients。consumer。internals。ConsumerNetworkClienttrySend
  循环处理unsent中缓存的请求,对每个Node节点,循环遍历其ClientRequest链表,每次循环都调用NetworkClient。ready()方法检测消费者与此节点之间的连接,以及发送请求的条件。若符合条件,则调用NetworkClient。send()方法将请求放入InFlightRequest中等待响应,也放入KafkaChannel中的send字段等待发送,并将消息从列表中删除。代码如下:longtrySend(longnow){longpollDelayMsmaxPollTimeoutMs;sendanyrequeststhatcanbesentnow遍历unsent集合for(Nodenode:unsent。nodes()){IteratorClientRequestiteratorunsent。requestIterator(node);if(iterator。hasNext())pollDelayMsMath。min(pollDelayMs,client。pollDelayMs(node,now));while(iterator。hasNext()){ClientRequestrequestiterator。next();调用NetworkClient。ready()检查是否可以发送请求if(client。ready(node,now)){调用NetworkClient。send()方法,等待发送请求。client。send(request,now);从unsent集合中删除此请求iterator。remove();}else{trynextnodewhencurrentnodeisnotreadybreak;}}}returnpollDelayMs;}3。2计算超时时间
  如果没有请求在进行中,则阻塞时间不要超过重试退避时间。3。3org。apache。kafka。clients。NetworkClientpoll判断是否需要更新metadata元数据调用Selector。poll()进行socket相关的IO操作处理完成后的操作(处理一系列handle()方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数)3。4调用checkDisconnects()方法检测连接状态
  调用checkDisconnects()方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。privatevoidcheckDisconnects(longnow){anydisconnectsaffectingrequeststhathavealreadybeentransmittedwillbehandledbyNetworkClient,sowejustneedtocheckwhetherconnectionsforanyoftheunsentrequestshavebeendisconnected;iftheyhave,thenwecompletethecorrespondingfutureandsetthedisconnectflagintheClientResponsefor(Nodenode:unsent。nodes()){检测消费者与每个Node之间的连接状态if(client。connectionFailed(node)){Removeentrybeforeinvokingrequestcallbacktoavoidcallbackshandlingcoordinatorfailurestraversingtheunsentlistagain。在调用请求回调之前删除条目以避免回调处理再次遍历未发送列表的协调器故障。CollectionClientRequestrequestsunsent。remove(node);for(ClientRequestrequest:requests){RequestFutureCompletionHandlerhandler(RequestFutureCompletionHandler)request。callback();AuthenticationExceptionauthenticationExceptionclient。authenticationException(node);调用ClientRequest的回调函数handler。onComplete(newClientResponse(request。makeHeader(request。requestBuilder()。latestAllowedVersion()),request。callback(),request。destination(),request。createdTimeMs(),now,true,null,authenticationException,null));}}}}3。5org。apache。kafka。clients。consumer。internals。ConsumerNetworkClientmaybeTriggerWakeup
  检查wakeupDisabled和wakeup,查看是否有其它线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient。poll()方法。publicvoidmaybeTriggerWakeup(){通过wakeupDisabled检测是否在执行不可中断的方法,通过wakeup检测是否有中断请求。if(!wakeupDisabled。get()wakeup。get()){log。debug(RaisingWakeupExceptioninresponsetouserwakeup);重置中断标志wakeup。set(false);thrownewWakeupException();}}3。6再次调用trySend()方法
  再次调用trySend()方法。在步骤2。1。3中调用了NetworkClient。poll()方法,在其中可能已经将KafkaChannel。send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend()方法。3。7org。apache。kafka。clients。consumer。internals。ConsumerNetworkClientfailExpiredRequests
  处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,将过期请求加入到expiredRequests集合,并将其从unsent集合中删除。调用超时ClientRequest的回调函数onFailure()。privatevoidfailExpiredRequests(longnow){clearallexpiredunsentrequestsandfailtheircorrespondingfutures清除所有过期的未发送请求并使其相应的futures失败CollectionClientRequestexpiredRequestsunsent。removeExpiredRequests(now);for(ClientRequestrequest:expiredRequests){RequestFutureCompletionHandlerhandler(RequestFutureCompletionHandler)request。callback();调用回调函数handler。onFailure(newTimeoutException(Failedtosendrequestafterrequest。requestTimeoutMs()ms。));}}privateCollectionClientRequestremoveExpiredRequests(longnow){ListClientRequestexpiredRequestsnewArrayList();for(ConcurrentLinkedQueueClientRequestrequests:unsent。values()){IteratorClientRequestrequestIteratorrequests。iterator();while(requestIterator。hasNext()){ClientRequestrequestrequestIterator。next();检查是否超时longelapsedMsMath。max(0,nowrequest。createdTimeMs());if(elapsedMsrequest。requestTimeoutMs()){将过期请求加入到expiredRequests集合expiredRequests。add(request);requestIterator。remove();}elsebreak;}}returnexpiredRequests;}四、RequestFutureCompletionHandler
  说RequestFutureCompletionHandler之前,我们先来看下ConsumerNetworkClient。send()方法。里面的逻辑会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,代码如下:publicRequestFutureClientResponsesend(Nodenode,AbstractRequest。Builderlt;?requestBuilder,intrequestTimeoutMs){longnowtime。milliseconds();RequestFutureCompletionHandlercompletionHandlernewRequestFutureCompletionHandler();ClientRequestclientRequestclient。newClientRequest(node。idString(),requestBuilder,now,true,requestTimeoutMs,completionHandler);创建clientRequest对象,并保存到unsent集合中。unsent。put(node,clientRequest);wakeuptheclientincaseitisblockinginpollsothatwecansendthequeuedrequest唤醒客户端以防它在轮询中阻塞,以便我们可以发送排队的请求。client。wakeup();returncompletionHandler。future;}
  我们重点来关注一下ConsumerNetworkClient中使用的回调对象RequestFutureCompletionHandler。其继承关系如下:
  从RequestFutureCompletionHandler继承关系图我们可以知道,它不仅实现了RequestCompletionHandler接口,还组合了RequestFuture类,RequestFuture是一个泛型类,其核心字段与方法如下:listeners:RequestFutureListener队列,用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。isDone():表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。value():记录请求正常完成时收到的响应,与exception()方法互斥。此字段非空表示正常完成,反之表示出现异常。exception():记录导致请求异常完成的异常类,与value()互斥。此字段非空则表示出现异常,反之则表示正常完成。
  我们之所以要分析源码,是因为源码中有很多设计模式可以借鉴,应用到你自己的工作中。RequestFuture中有两处典型的设计模式的使用,我们来看一下:compose()方法:使用了适配器模式。chain()方法:使用了责任链模式。4。1RequestFuture。compose()适配器Adaptfromarequestfutureofonetypetoanother。paramFTypetoadaptfromparamTTypetoadapttopublicabstractclassRequestFutureAdapterF,T{publicabstractvoidonSuccess(Fvalue,RequestFutureTfuture);publicvoidonFailure(RuntimeExceptione,RequestFutureTfuture){future。raise(e);}}RequestFutureT适配成RequestFutureSConvertfromarequestfutureofonetypetoanothertypeparamadapterTheadapterwhichdoestheconversionparamSThetypeofthefutureadaptedtoreturnThenewfuturepublicSRequestFutureScompose(finalRequestFutureAdapterT,Sadapter){适配之后的结果finalRequestFutureSadaptednewRequestFuture();在当前RequestFuture上添加监听器addListener(newRequestFutureListenerT(){OverridepublicvoidonSuccess(Tvalue){adapter。onSuccess(value,adapted);}OverridepublicvoidonFailure(RuntimeExceptione){adapter。onFailure(e,adapted);}});returnadapted;}
  使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapterT,S的对应方法,最终调用RequestFuture对象的对应方法。
  4。2RequestFuture。chain()
  chain()方法与compose()方法类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。代码如下:publicvoidchain(finalRequestFutureTfuture){添加监听器addListener(newRequestFutureListenerT(){OverridepublicvoidonSuccess(Tvalue){通过监听器将value传递给下一个RequestFuture对象future。complete(value);}OverridepublicvoidonFailure(RuntimeExceptione){通过监听器将异常传递给下一个RequestFuture对象future。raise(e);}});}
  好了,ConsumerNetworkClient的源码分析告一段落了,希望文章对你有帮助,我们下期再见。

西门子与矿冶集团中技公司等多家中国企业在进博会上达成合作协议11月6日,西门子中国在第五届中国国际进口博览会上与吉香居食品矿冶集团通用技术集团中国技术进出口有限公司等多家中国企业达成合作协议。西门子与吉香居食品股份有限公司(吉香居)签署战略假发之乡小提琴之乡中国产业集群的活力从哪里来中国用了短短40年时间实现了工业化,走过了欧美200多年工业化的历程,这是人类历史上前所未有的。传统经济理论认为工业化需要两大必要条件,完善的产权制度和金融制度,可是改革初期,这些西宁11月房价出炉,新房价格区域分化严重,有区暴涨也有区暴跌根据吉屋网统计数据,西宁11月新房均价9948元,环比下跌0。2,二手房均价10030元,环比下降0。2。从房价走势可知,一方面,西宁新房连续几个月横盘震荡,震荡幅度也很小,二手房全方位展现陆海空!第五届全球无人系统大会将首秀无人战法南方财经全媒体记者彭敏静珠海报道11月4日,记者从全球无人系统大会组委会获悉,2022第五届全球无人系统大会将于11月16日18日在珠海举行。本届大会将集论坛赛事展示演练战法推介考裁员破产股价暴跌,自动驾驶开向寒冬作者夏言近日,自动驾驶独角兽ArgoAI官宣关闭。ArgoAI曾先后获得福特10亿美元大众26亿美元的投资,估值一度达到73亿美元它描绘的前沿技术和飙涨的估值,也曾鼓舞了众多押注自西媒科学家解释恋爱为何会让人变胖西班牙世界报网站3日发表题为以公斤计算的爱我们在恋爱中这样变胖的文章,全文摘编如下西班牙肥胖症研究学会(SEEDO)进行的一项医学调查显示,恋爱带来的体重增长平均约4。5公斤。这一python获取索引位置的3种方式python获取列表中指定元素在列表(或字符串)中的位置(索引)方法总结一。find()注意只适用于字符串(只能返回第一次出现的字符索引!!!)stevenHelloword获取0元宇宙的商业奥秘为什么LVZNFT耐克等大牌都要挤进去?近两年,元宇宙(Metaverse)一词在媒体上频繁的出现。机构彭博分析后认为,到2024年元宇宙的市场规模将达到8000亿美元。麦肯锡发报告表示,到2030年全球元宇宙市场的规模双十一手机挑花眼?安兔兔10月好评榜公布,国产旗舰强势来袭许多朋友都想在双十一换新手机,但面对琳琅满目的产品不知道该怎么选,本篇文章跟大家分享安兔兔10好评榜机型,高口碑国产机不会有错。小米12SUltra再次以95。68分成绩夺冠,自发钱学森预言再次成真,VR技术迎来技术大变革上世纪90年代,钱学森曾预言VR将是计算机技术之后的,又一项技术革命,为了让国家重视这项技术,钱学森将它形容为一项可以震撼全世界的变革,是人类历史上的大事,甚至为了让我们更好理解V鄂西页岩气地质资源潜力达11。68万亿立方米湖北页岩气产业底气足页岩气,赋存于富有机质泥页岩及其夹层中,是以吸附和游离状态为主要存在方式的非常规天然气。与常规天然气相比,页岩气开发具有开采寿命长和生产周期长的优点,加快页岩气勘探开发和利用,对满
江苏宿迁新首富发家史,出身贫寒,抓住机遇,如今身价125亿元从上世纪80年代初改革开放起,有一大批有志青年放弃稳定安逸的工作,加入到创业大军的队伍当中,他们的出现,成为了国内民营经济高速发展的中流砥柱。我们历时数年时间,研究了上百位优秀民营前F1老板发话!乔丹汉密尔顿在梅奔已经不是主角,拉塞尔才是北京时间6月3日,据英国媒体透露,前F1乔丹车队的老板埃迪乔丹在接受采访的时候表示,目前的F1梅奔车队中,汉密尔顿已经不是boss,拉塞尔才是。尽管本赛季刘易斯汉密尔顿落后自己的队男生为何会嫖娼上瘾,难以自拔?别因为一时冲动,害了自己导语相信大家经常在报纸或者新闻上看到扫黄现场,然后看到某某,某某嫖娼被抓,拘留十天,罚款三千块等事情,!我国法律规定了,嫖娼是一种违法的行为,那么为何又有那么多人明知是违法还要去做天热别忘做这个吃,放碗中一蒸,浇上料一拌酸爽开胃,比凉粉好吃莫愁厨路无知己,谁人不识小面姨。大家好,我是小面姨。今天小面姨给大家分享一道荞麦碗托的,美食做法。今天是一年一度的端午节,按照我家的惯例,每年的端午节都要吃家庭团圆饭,就像过年一样北方中学生笔下和手中的端午节端午节到来之际,为弘扬传统文化,纪念屈原,保留民俗传统,让中学生更多地了解并传承中华文化,四平市第十七中学校组织系列活动通过亲自动手和绘画过好别样的端午节。端午节,又称端阳节龙舟节我70后河南人,科威特打工,月薪20000变2000,没钱回家绝处逢生这是我们讲述的第430位真人的故事我是老虎我的子弹會拐弯,70后河南人,现在在科威特生活。大学毕业那年,父亲突遭车祸去世。作为长子,我不得不承担起家庭的重担。曾做过销售,开过出租,热心老陈翻车了?斜杠青年也要注意了最近热心老陈翻车了,自从反诈老陈爆红到辞职,网上一直有太多解读。与其说每个人都觉得自己说得对,不如说大家都是来蹭话题流量更恰当。毕竟再多的你觉得,你认为都不是当事人的真相。而更多人纸质优惠券会梦见疯狂星期四吗当我们谈起肯德基和麦当劳,你会第一时间想起什么?相较于改了又改的门牌和永远都在限时返场的人气产品,人们现在似乎更愿意将目光停留在那些查重率100的疯狂星期四文案上。哪怕你只要看第一勇士末节崩盘两大金身告破绿军对攻不怂!还让你一个塔图姆NBA总决赛首场,勇士在主场108120惨遭凯尔特人逆转,总比分01落后。这场失败也打破了勇士的两大金身。赛前,勇士在季后赛主场9战全胜,而历史上没有哪支球队在季后赛主场取得超过1于晓光端午陪儿子抓鱼,4岁啪嗒罕出镜,秋瓷炫视角下父子超有爱6月3日端午节,演员于晓光罕见在个人社交平台分享了一组秋瓷炫视角下与儿子啪嗒的有爱合照秀幸福,虽然他没有配上任何文字,但父子俩久违同框,画面还是温馨又幸福。晒出的照片里,于晓光穿着42岁殷桃素颜现身街头,和男友人聊天被拍,二人乘豪车离开引猜测近日,有媒体拍到知名女星殷桃现身街头的画面,让大家一睹女神私下状态,只见她穿着一身黑色运动装,随意扎着丸子头,一脸素颜打扮低调,站在男友人面前颇显乖巧,瞬间化身为邻家女孩。当时,殷
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网