一、Consumer的使用 Consumer的源码解析主要来看KafkaConsumer,KafkaConsumer是Consumer接口的实现类。KafkaConsumer提供了一套封装良好的API,开发人员可以基于这套API轻松实现从Kafka服务端拉取消息的功能,这样开发人员根本不用关心与Kafka服务端之间网络连接的管理、心跳检测、请求超时重试等底层操作,也不必关心订阅Topic的分区数量、分区副本的网络拓扑以及ConsumerGroup的Rebalance等Kafka具体细节,KafkaConsumer中还提供了自动提交offset的功能,使的开发人员更加关注业务逻辑,提高了开发效率。 下面我们来看一个KafkaConsumer的示例程序:author:微信公众号【老周聊架构】publicclassKafkaConsumerTest{publicstaticvoidmain(String〔〕args){PropertiespropsnewProperties();kafka地址,列表格式为host1:port1,host2:port2,。。。,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(建议多提供几个,以防提供的服务器关闭)必须设置props。put(bootstrap。servers,localhost:9092);key序列化方式必须设置props。put(key。serializer,org。apache。kafka。common。serialization。StringSerializer);value序列化方式必须设置props。put(value。serializer,org。apache。kafka。common。serialization。StringSerializer);props。put(group。id,consumerriemanntest);KafkaConsumerString,StringconsumernewKafkaConsumer(props);可消费多个topic,组成一个listStringtopicriemannkafkatest;consumer。subscribe(Arrays。asList(topic));while(true){ConsumerRecordsString,Stringrecordsconsumer。poll(Duration。ofMillis(100));for(ConsumerRecordString,Stringrecord:records){System。out。printf(offsetd,keys,values,record。offset(),record。key(),record。value());try{Thread。sleep(100);}catch(InterruptedExceptione){e。printStackTrace();}}}}} 从示例中可以看出KafkaConsumer的核心方法是poll(),它负责从Kafka服务端拉取消息。核心方法的具体细节我想放在下一篇再细讲,关乎消费侧的客户端与Kafka服务端的通信模型。这一篇我们主要从宏观的角度来剖析下Consumer消费端的源码。二、KafkaConsumer分析 我们先来看下Consumer接口,该接口定义了KafkaConsumer对外的API,其核心方法可以分为以下六类:subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。assign()方法:用户手动订阅指定的Topic,并且指定消费的分区,此方法subscribe()方法互斥。poll()方法:负责从服务端获取消息。commit()方法:提交消费者已经消费完成的offset。seek()方法:指定消费者起始消费的位置。pause()、resume()方法:暂停、继续Consumer,暂停后poll()方法会返回空。 我们先来看下KafkaConsumer的重要属性以及UML结构图。 clientId:Consumer的唯一标识。groupId:消费者组的唯一标识。coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,读者可以理解为Consumer与服务端GroupCoordinator通信的门面。keyDeserializer、valueDeserializer:key和value的反序列化器。fetcher:负责从服务端获取消息。interceptors:ConsumerInterceptors集合,ConsumerInterceptors。onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptors。onCommit()方法也可以在服务端返回提交offset成功的响应进行拦截或修改。client:ConsumerNetworkClient负责消费者与Kafka服务端的网络通信。subscriptions:SubscriptionState维护了消费者的消费状态。metadata:ConsumerMetadata记录了整个Kafka集群的元信息。currentThread、refcount:分别记录的KafkaConsumer的线程id和重入次数三、ConsumerNetworkClient ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。 我们先来看下ConsumerNetworkClient的重要属性以及UML结构图。 client:NetworkClient对象。unsent:缓冲队列。UnsentRequests对象,该对象内部维护了一个unsent属性,该属性是ConcurrentMap,key是Node节点,value是ConcurrentLinkedQueue。metadata:用于管理Kafka集群元数据。retryBackoffMs:在尝试重试对给定主题分区的失败请求之前等待的时间量,这避免了在某些故障情况下在紧密循环中重复发送请求。对应retry。backoff。ms配置,默认100ms。maxPollTimeoutMs:使用Kafka的组管理工具时,消费者协调器的心跳之间的预期时间。心跳用于确保消费者的会话保持活跃,并在新消费者加入或离开组时促进重新平衡。该值必须设置为低于session。timeout。ms,但通常不应设置为高于该值的13。它可以调整得更低,以控制正常重新平衡的预期时间。对应heartbeat。interval。ms配置,默认3000ms。构造函数中,maxPollTimeoutMs取的是maxPollTimeoutMs与MAXPOLLTIMEOUTMS的最小值,MAXPOLLTIMEOUTMS默认为5000ms。requestTimeoutMs:配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试用尽,则请求失败。对应request。timeout。ms配置,默认305000ms。wakeupDisabled:由调用KafkaConsumer对象的消费者线程之外的其它线程设置,表示要中断KafkaConsumer线程。lock:我们不需要高吞吐量,所以使用公平锁来尽量避免饥饿。pendingCompletion:当请求完成时,它们在调用之前被转移到这个队列。目的是避免在持有此对象的监视器时调用它们,这可能会为死锁打开门。pendingDisconnects:断开与协调器连接节点的队列。wakeup:这个标志允许客户端被安全唤醒而无需等待上面的锁。为了同时启用它,避免需要获取上面的锁是原子的。 ConsumerNetworkClient的核心方法是poll()方法,poll()方法有很多重载方法,最终会调用poll(Timertimer,PollConditionpollCondition,booleandisableWakeup)方法,这三个参数含义是:timer表示定时器限制此方法可以阻塞多长时间;pollCondition表示可空阻塞条件;disableWakeup表示如果true禁用触发唤醒。 我们来简单回顾下ConsumerNetworkClient的功能:3。1org。apache。kafka。clients。consumer。internals。ConsumerNetworkClienttrySend 循环处理unsent中缓存的请求,对每个Node节点,循环遍历其ClientRequest链表,每次循环都调用NetworkClient。ready()方法检测消费者与此节点之间的连接,以及发送请求的条件。若符合条件,则调用NetworkClient。send()方法将请求放入InFlightRequest中等待响应,也放入KafkaChannel中的send字段等待发送,并将消息从列表中删除。代码如下:longtrySend(longnow){longpollDelayMsmaxPollTimeoutMs;sendanyrequeststhatcanbesentnow遍历unsent集合for(Nodenode:unsent。nodes()){IteratorClientRequestiteratorunsent。requestIterator(node);if(iterator。hasNext())pollDelayMsMath。min(pollDelayMs,client。pollDelayMs(node,now));while(iterator。hasNext()){ClientRequestrequestiterator。next();调用NetworkClient。ready()检查是否可以发送请求if(client。ready(node,now)){调用NetworkClient。send()方法,等待发送请求。client。send(request,now);从unsent集合中删除此请求iterator。remove();}else{trynextnodewhencurrentnodeisnotreadybreak;}}}returnpollDelayMs;}3。2计算超时时间 如果没有请求在进行中,则阻塞时间不要超过重试退避时间。3。3org。apache。kafka。clients。NetworkClientpoll判断是否需要更新metadata元数据调用Selector。poll()进行socket相关的IO操作处理完成后的操作(处理一系列handle()方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数)3。4调用checkDisconnects()方法检测连接状态 调用checkDisconnects()方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。privatevoidcheckDisconnects(longnow){anydisconnectsaffectingrequeststhathavealreadybeentransmittedwillbehandledbyNetworkClient,sowejustneedtocheckwhetherconnectionsforanyoftheunsentrequestshavebeendisconnected;iftheyhave,thenwecompletethecorrespondingfutureandsetthedisconnectflagintheClientResponsefor(Nodenode:unsent。nodes()){检测消费者与每个Node之间的连接状态if(client。connectionFailed(node)){Removeentrybeforeinvokingrequestcallbacktoavoidcallbackshandlingcoordinatorfailurestraversingtheunsentlistagain。在调用请求回调之前删除条目以避免回调处理再次遍历未发送列表的协调器故障。CollectionClientRequestrequestsunsent。remove(node);for(ClientRequestrequest:requests){RequestFutureCompletionHandlerhandler(RequestFutureCompletionHandler)request。callback();AuthenticationExceptionauthenticationExceptionclient。authenticationException(node);调用ClientRequest的回调函数handler。onComplete(newClientResponse(request。makeHeader(request。requestBuilder()。latestAllowedVersion()),request。callback(),request。destination(),request。createdTimeMs(),now,true,null,authenticationException,null));}}}}3。5org。apache。kafka。clients。consumer。internals。ConsumerNetworkClientmaybeTriggerWakeup 检查wakeupDisabled和wakeup,查看是否有其它线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient。poll()方法。publicvoidmaybeTriggerWakeup(){通过wakeupDisabled检测是否在执行不可中断的方法,通过wakeup检测是否有中断请求。if(!wakeupDisabled。get()wakeup。get()){log。debug(RaisingWakeupExceptioninresponsetouserwakeup);重置中断标志wakeup。set(false);thrownewWakeupException();}}3。6再次调用trySend()方法 再次调用trySend()方法。在步骤2。1。3中调用了NetworkClient。poll()方法,在其中可能已经将KafkaChannel。send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend()方法。3。7org。apache。kafka。clients。consumer。internals。ConsumerNetworkClientfailExpiredRequests 处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,将过期请求加入到expiredRequests集合,并将其从unsent集合中删除。调用超时ClientRequest的回调函数onFailure()。privatevoidfailExpiredRequests(longnow){clearallexpiredunsentrequestsandfailtheircorrespondingfutures清除所有过期的未发送请求并使其相应的futures失败CollectionClientRequestexpiredRequestsunsent。removeExpiredRequests(now);for(ClientRequestrequest:expiredRequests){RequestFutureCompletionHandlerhandler(RequestFutureCompletionHandler)request。callback();调用回调函数handler。onFailure(newTimeoutException(Failedtosendrequestafterrequest。requestTimeoutMs()ms。));}}privateCollectionClientRequestremoveExpiredRequests(longnow){ListClientRequestexpiredRequestsnewArrayList();for(ConcurrentLinkedQueueClientRequestrequests:unsent。values()){IteratorClientRequestrequestIteratorrequests。iterator();while(requestIterator。hasNext()){ClientRequestrequestrequestIterator。next();检查是否超时longelapsedMsMath。max(0,nowrequest。createdTimeMs());if(elapsedMsrequest。requestTimeoutMs()){将过期请求加入到expiredRequests集合expiredRequests。add(request);requestIterator。remove();}elsebreak;}}returnexpiredRequests;}四、RequestFutureCompletionHandler 说RequestFutureCompletionHandler之前,我们先来看下ConsumerNetworkClient。send()方法。里面的逻辑会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,代码如下:publicRequestFutureClientResponsesend(Nodenode,AbstractRequest。Builderlt;?requestBuilder,intrequestTimeoutMs){longnowtime。milliseconds();RequestFutureCompletionHandlercompletionHandlernewRequestFutureCompletionHandler();ClientRequestclientRequestclient。newClientRequest(node。idString(),requestBuilder,now,true,requestTimeoutMs,completionHandler);创建clientRequest对象,并保存到unsent集合中。unsent。put(node,clientRequest);wakeuptheclientincaseitisblockinginpollsothatwecansendthequeuedrequest唤醒客户端以防它在轮询中阻塞,以便我们可以发送排队的请求。client。wakeup();returncompletionHandler。future;} 我们重点来关注一下ConsumerNetworkClient中使用的回调对象RequestFutureCompletionHandler。其继承关系如下: 从RequestFutureCompletionHandler继承关系图我们可以知道,它不仅实现了RequestCompletionHandler接口,还组合了RequestFuture类,RequestFuture是一个泛型类,其核心字段与方法如下:listeners:RequestFutureListener队列,用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。isDone():表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。value():记录请求正常完成时收到的响应,与exception()方法互斥。此字段非空表示正常完成,反之表示出现异常。exception():记录导致请求异常完成的异常类,与value()互斥。此字段非空则表示出现异常,反之则表示正常完成。 我们之所以要分析源码,是因为源码中有很多设计模式可以借鉴,应用到你自己的工作中。RequestFuture中有两处典型的设计模式的使用,我们来看一下:compose()方法:使用了适配器模式。chain()方法:使用了责任链模式。4。1RequestFuture。compose()适配器Adaptfromarequestfutureofonetypetoanother。paramFTypetoadaptfromparamTTypetoadapttopublicabstractclassRequestFutureAdapterF,T{publicabstractvoidonSuccess(Fvalue,RequestFutureTfuture);publicvoidonFailure(RuntimeExceptione,RequestFutureTfuture){future。raise(e);}}RequestFutureT适配成RequestFutureSConvertfromarequestfutureofonetypetoanothertypeparamadapterTheadapterwhichdoestheconversionparamSThetypeofthefutureadaptedtoreturnThenewfuturepublicSRequestFutureScompose(finalRequestFutureAdapterT,Sadapter){适配之后的结果finalRequestFutureSadaptednewRequestFuture();在当前RequestFuture上添加监听器addListener(newRequestFutureListenerT(){OverridepublicvoidonSuccess(Tvalue){adapter。onSuccess(value,adapted);}OverridepublicvoidonFailure(RuntimeExceptione){adapter。onFailure(e,adapted);}});returnadapted;} 使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapterT,S的对应方法,最终调用RequestFuture对象的对应方法。 4。2RequestFuture。chain() chain()方法与compose()方法类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。代码如下:publicvoidchain(finalRequestFutureTfuture){添加监听器addListener(newRequestFutureListenerT(){OverridepublicvoidonSuccess(Tvalue){通过监听器将value传递给下一个RequestFuture对象future。complete(value);}OverridepublicvoidonFailure(RuntimeExceptione){通过监听器将异常传递给下一个RequestFuture对象future。raise(e);}});} 好了,ConsumerNetworkClient的源码分析告一段落了,希望文章对你有帮助,我们下期再见。