聊聊KafkaProducer源码解析
一、前言
前面几篇我们讲了关于Kafka的基础架构以及搭建,从这篇开始我们就来源码分析一波。我们这用的Kafka版本是2。7。0,其Client端是由Java实现,Server端是由Scala来实现的,在使用Kafka时,Client是用户最先接触到的部分,因此,我们从Client端开始,会先从Producer端开始,今天我们就来对Producer源码解析一番。二、Producer使用
首先我们先通过一段代码来展示KafkaProducer的使用方法。在下面的示例中,我们使用KafkaProducer实现向Kafka发送消息的功能。在示例程序中,首先将KafkaProduce使用的配置写入
到Properties中,每项配置的具体含义在注释中进行解释。之后以此Properties对象为参数构造KafkaProducer对象,最后通过send方法完成发送,代码中包含同步发送、异步发送两种情况。
从上面的代码可以看出Kafka为用户提供了非常简洁方便的API,在使用时,只需要如下两步:初始化KafkaProducer实例调用send接口发送数据
本文主要是围绕着初始化KafkaProducer实例与如何实现send接口发送数据而展开的。三、KafkaProducer实例化
了解了KafkaProducer的基本使用,然后我们来深入了解下方法核心逻辑:publicKafkaProducer(Propertiesproperties){this(Utils。propsToMap(properties),(Serializer)null,(Serializer)null,(ProducerMetadata)null,(KafkaClient)null,(ProducerInterceptors)null,Time。SYSTEM);}
四、消息发送过程
用户是直接使用producer。send()发送的数据,先看一下send()接口的实现异步向一个topic发送数据publicFutureRecordMetadatasend(ProducerRecordK,Vrecord){returnthis。send(record,(Callback)null);}向topic异步地发送数据,当发送确认后唤起回调函数publicFutureRecordMetadatasend(ProducerRecordK,Vrecord,Callbackcallback){ProducerRecordK,VinterceptedRecordthis。interceptors。onSend(record);returnthis。doSend(interceptedRecord,callback);}
数据发送的最终实现还是调用了Producer的doSend()接口。
4。1拦截器
首先方法会先进入拦截器集合ProducerInterceptors,onSend方法是遍历拦截器onSend方法,拦截器的目的是将数据处理加工,Kafka本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现接口。
4。1。1拦截器代码
4。1。2拦截器核心逻辑
ProducerInterceptor接口包括三个方法:onSend(ProducerRecordvar1):该方法封装进KafkaProducer。send方法中,即它运行在用户主线程中的。确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。onAcknowledgement(RecordMetadatavar1,Exceptionvar2):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。close():关闭interceptor,主要用于执行一些资源清理工作。
拦截器可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。
4。2Producer的doSend实现
下面是doSend()的具体实现:
在doSend()方法的实现上,一条Record数据的发送,主要分为以下五步:确认数据要发送到的topic的metadata是可用的(如果该partition的leader存在则是可用的,如果开启权限时,client有相应的权限),如果没有topic的metadata信息,就需要获取相应的metadata;序列化record的key和value;获取该record要发送到的partition(可以指定,也可以根据算法计算);向accumulator中追加record数据,数据会先进行缓存;如果追加完数据后,对应的RecordBatch已经达到了batch。size的大小(或者batch的剩余空间不足以添加下一条Record),则唤醒sender线程发送数据。
数据的发送过程,可以简单总结为以上五点,下面会这几部分的具体实现进行详细分析。五、消息发送过程
5。1获取topic的metadata信息
Producer通过waitOnMetadata()方法来获取对应topic的metadata信息,这块内容我下一篇再来讲。
5。2key和value的序列化
Producer端对record的key和value值进行序列化操作,在Consumer端再进行相应的反序列化,Kafka内部提供的序列化和反序列化算法如下图所示:
当然我们也是可以自定义序列化的具体实现,不过一般情况下,Kafka内部提供的这些方法已经足够使用。
5。3获取该record要发送到的partition
获取partition值,具体分为下面三种情况:指明partition的情况下,直接将指明的值直接作为partiton值;没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的roundrobin算法。
具体实现如下:当record中有partition值时,直接返回,没有的情况下调用partitioner的类的partition方法去计算(KafkaProducer。class)privateintpartition(ProducerRecordK,Vrecord,byte〔〕serializedKey,byte〔〕serializedValue,Clustercluster){Integerpartitionrecord。partition();returnpartition!null?partition:this。partitioner。partition(record。topic(),record。key(),serializedKey,record。value(),serializedValue,cluster);}
Producer默认使用的partitioner是org。apache。kafka。clients。producer。internals。DefaultPartitioner,用户也可以自定义partition的策略,下面是默认分区策略具体实现:publicintpartition(Stringtopic,Objectkey,byte〔〕keyBytes,Objectvalue,byte〔〕valueBytes,Clustercluster){returnthis。partition(topic,key,keyBytes,value,valueBytes,cluster,cluster。partitionsForTopic(topic)。size());}publicintpartition(Stringtopic,Objectkey,byte〔〕keyBytes,Objectvalue,byte〔〕valueBytes,Clustercluster,intnumPartitions){returnkeyBytesnull?this。stickyPartitionCache。partition(topic,cluster):Utils。toPositive(Utils。murmur2(keyBytes))numPartitions;}
上面这个默认算法核心就是粘着分区缓存
5。4向RecordAccmulator中追加record数据
我们讲RecordAccumulator之前先看这张图,这样的话会对整个发送流程有个大局观。
RecordAccmulator承担了缓冲区的角色。默认是32MB。
在KafkaProducer中,消息不是一条一条发给broker的,而是多条消息组成一个ProducerBatch,然后由Sender一次性发出去,这里的batch。size并不是消息的条数(凑满多少条即发送),而是一个大小。默认是16KB,可以根据具体情况来进行优化。
在RecordAccumulator中,最核心的参数就是:privatefinalConcurrentMapTopicPartition,DequeProducerBatchbatches;
它是一个ConcurrentMap,key是TopicPartition类,代表一个topic的一个partition。value是一个包含ProducerBatch的双端队列。等待Sender线程发送给broker。画张图来看下:
上面的代码不知道大家有没有疑问?分配内存的代码为啥不在synchronized同步块中分配?导致下面的synchronized同步块中还要tryAppend一下。
因为这时候可能其他线程已经创建好ProducerBatch了,造成多余的内存申请。
如果把分配内存放在synchronized同步块会有什么问题?
内存申请不到线程会一直等待,如果放在同步块中会造成一直不释放Deque队列的锁,那其他线程将无法对Deque队列进行线程安全的同步操作。
再跟下tryAppend()方法,这就比较简单了。
以上代码见图解:
5。5唤醒sender线程发送ProducerBatch
当record写入成功后,如果发现ProducerBatch已满足发送的条件(通常是queue中有多个batch,那么最先添加的那些batch肯定是可以发送了),那么就会唤醒sender线程,发送ProducerBatch。
sender线程对ProducerBatch的处理是在run()方法中进行的,该方法具体实现如下:
其中比较核心的方法是run()方法中的org。apache。kafka。clients。producer。internals。SendersendProducerData
其中pollTimeout意思是最长阻塞到至少有一个通道在你注册的事件就绪了。返回0则表示走起发车了。
我们继续跟下:org。apache。kafka。clients。producer。internals。RecordAccumulatorready
最后再来看下里面这个方法org。apache。kafka。clients。producer。internals。RecordAccumulatordrain,从accumulator缓冲区获取要发送的数据,最大一次性发max。request。size大小的数据。
六、总结
最后为了让你对KafkaProducer有个宏观的架构理解,请看下图:
简要说明:newKafkaProducer()后创建一个后台线程KafkaThread(实际运行线程是Sender,KafkaThread是对Sender的封装)扫描RecordAccumulator中是否有消息。调用KafkaProducer。send()发送消息,实际是将消息保存到RecordAccumulator中,实际上就是保存到一个Map中(ConcurrentMap),这条消息会被记录到同一个记录批次(相同主题相同分区算同一个批次)里面,这个批次的所有消息会被发送到相同的主题和分区上。后台的独立线程扫描到RecordAccumulator中有消息后,会将消息发送到Kafka集群中(不是一有消息就发送,而是要看消息是否ready)如果发送成功(消息成功写入Kafka),就返回一个RecordMetaData对象,它包括了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,就会返回一个错误,生产者在收到错误之后会尝试重新发送消息(如果允许的话,此时会将消息在保存到RecordAccumulator中),几次之后如果还是失败就返回错误消息。
好了,本文对KafkaProducer源码进行了解析,下一篇文章将会详细介绍metadata的内容以及在Producer端metadata的更新机制。敬请期待