专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

聊聊KafkaProducer源码解析

  一、前言
  前面几篇我们讲了关于Kafka的基础架构以及搭建,从这篇开始我们就来源码分析一波。我们这用的Kafka版本是2。7。0,其Client端是由Java实现,Server端是由Scala来实现的,在使用Kafka时,Client是用户最先接触到的部分,因此,我们从Client端开始,会先从Producer端开始,今天我们就来对Producer源码解析一番。二、Producer使用
  首先我们先通过一段代码来展示KafkaProducer的使用方法。在下面的示例中,我们使用KafkaProducer实现向Kafka发送消息的功能。在示例程序中,首先将KafkaProduce使用的配置写入
  到Properties中,每项配置的具体含义在注释中进行解释。之后以此Properties对象为参数构造KafkaProducer对象,最后通过send方法完成发送,代码中包含同步发送、异步发送两种情况。
  从上面的代码可以看出Kafka为用户提供了非常简洁方便的API,在使用时,只需要如下两步:初始化KafkaProducer实例调用send接口发送数据
  本文主要是围绕着初始化KafkaProducer实例与如何实现send接口发送数据而展开的。三、KafkaProducer实例化
  了解了KafkaProducer的基本使用,然后我们来深入了解下方法核心逻辑:publicKafkaProducer(Propertiesproperties){this(Utils。propsToMap(properties),(Serializer)null,(Serializer)null,(ProducerMetadata)null,(KafkaClient)null,(ProducerInterceptors)null,Time。SYSTEM);}
  四、消息发送过程
  用户是直接使用producer。send()发送的数据,先看一下send()接口的实现异步向一个topic发送数据publicFutureRecordMetadatasend(ProducerRecordK,Vrecord){returnthis。send(record,(Callback)null);}向topic异步地发送数据,当发送确认后唤起回调函数publicFutureRecordMetadatasend(ProducerRecordK,Vrecord,Callbackcallback){ProducerRecordK,VinterceptedRecordthis。interceptors。onSend(record);returnthis。doSend(interceptedRecord,callback);}
  数据发送的最终实现还是调用了Producer的doSend()接口。
  4。1拦截器
  首先方法会先进入拦截器集合ProducerInterceptors,onSend方法是遍历拦截器onSend方法,拦截器的目的是将数据处理加工,Kafka本身并没有给出默认的拦截器的实现。如果需要使用拦截器功能,必须自己实现接口。
  4。1。1拦截器代码
  4。1。2拦截器核心逻辑
  ProducerInterceptor接口包括三个方法:onSend(ProducerRecordvar1):该方法封装进KafkaProducer。send方法中,即它运行在用户主线程中的。确保在消息被序列化以计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算。onAcknowledgement(RecordMetadatavar1,Exceptionvar2):该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率。close():关闭interceptor,主要用于执行一些资源清理工作。
  拦截器可能被运行在多个线程中,因此在具体实现时用户需要自行确保线程安全。另外倘若指定了多个interceptor,则producer将按照指定顺序调用它们,并仅仅是捕获每个interceptor可能抛出的异常记录到错误日志中而非在向上传递。
  4。2Producer的doSend实现
  下面是doSend()的具体实现:
  在doSend()方法的实现上,一条Record数据的发送,主要分为以下五步:确认数据要发送到的topic的metadata是可用的(如果该partition的leader存在则是可用的,如果开启权限时,client有相应的权限),如果没有topic的metadata信息,就需要获取相应的metadata;序列化record的key和value;获取该record要发送到的partition(可以指定,也可以根据算法计算);向accumulator中追加record数据,数据会先进行缓存;如果追加完数据后,对应的RecordBatch已经达到了batch。size的大小(或者batch的剩余空间不足以添加下一条Record),则唤醒sender线程发送数据。
  数据的发送过程,可以简单总结为以上五点,下面会这几部分的具体实现进行详细分析。五、消息发送过程
  5。1获取topic的metadata信息
  Producer通过waitOnMetadata()方法来获取对应topic的metadata信息,这块内容我下一篇再来讲。
  5。2key和value的序列化
  Producer端对record的key和value值进行序列化操作,在Consumer端再进行相应的反序列化,Kafka内部提供的序列化和反序列化算法如下图所示:
  当然我们也是可以自定义序列化的具体实现,不过一般情况下,Kafka内部提供的这些方法已经足够使用。
  5。3获取该record要发送到的partition
  获取partition值,具体分为下面三种情况:指明partition的情况下,直接将指明的值直接作为partiton值;没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;既没有partition值又没有key值的情况下,第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与topic可用的partition总数取余得到partition值,也就是常说的roundrobin算法。
  具体实现如下:当record中有partition值时,直接返回,没有的情况下调用partitioner的类的partition方法去计算(KafkaProducer。class)privateintpartition(ProducerRecordK,Vrecord,byte〔〕serializedKey,byte〔〕serializedValue,Clustercluster){Integerpartitionrecord。partition();returnpartition!null?partition:this。partitioner。partition(record。topic(),record。key(),serializedKey,record。value(),serializedValue,cluster);}
  Producer默认使用的partitioner是org。apache。kafka。clients。producer。internals。DefaultPartitioner,用户也可以自定义partition的策略,下面是默认分区策略具体实现:publicintpartition(Stringtopic,Objectkey,byte〔〕keyBytes,Objectvalue,byte〔〕valueBytes,Clustercluster){returnthis。partition(topic,key,keyBytes,value,valueBytes,cluster,cluster。partitionsForTopic(topic)。size());}publicintpartition(Stringtopic,Objectkey,byte〔〕keyBytes,Objectvalue,byte〔〕valueBytes,Clustercluster,intnumPartitions){returnkeyBytesnull?this。stickyPartitionCache。partition(topic,cluster):Utils。toPositive(Utils。murmur2(keyBytes))numPartitions;}
  上面这个默认算法核心就是粘着分区缓存
  5。4向RecordAccmulator中追加record数据
  我们讲RecordAccumulator之前先看这张图,这样的话会对整个发送流程有个大局观。
  RecordAccmulator承担了缓冲区的角色。默认是32MB。
  在KafkaProducer中,消息不是一条一条发给broker的,而是多条消息组成一个ProducerBatch,然后由Sender一次性发出去,这里的batch。size并不是消息的条数(凑满多少条即发送),而是一个大小。默认是16KB,可以根据具体情况来进行优化。
  在RecordAccumulator中,最核心的参数就是:privatefinalConcurrentMapTopicPartition,DequeProducerBatchbatches;
  它是一个ConcurrentMap,key是TopicPartition类,代表一个topic的一个partition。value是一个包含ProducerBatch的双端队列。等待Sender线程发送给broker。画张图来看下:
  上面的代码不知道大家有没有疑问?分配内存的代码为啥不在synchronized同步块中分配?导致下面的synchronized同步块中还要tryAppend一下。
  因为这时候可能其他线程已经创建好ProducerBatch了,造成多余的内存申请。
  如果把分配内存放在synchronized同步块会有什么问题?
  内存申请不到线程会一直等待,如果放在同步块中会造成一直不释放Deque队列的锁,那其他线程将无法对Deque队列进行线程安全的同步操作。
  再跟下tryAppend()方法,这就比较简单了。
  以上代码见图解:
  5。5唤醒sender线程发送ProducerBatch
  当record写入成功后,如果发现ProducerBatch已满足发送的条件(通常是queue中有多个batch,那么最先添加的那些batch肯定是可以发送了),那么就会唤醒sender线程,发送ProducerBatch。
  sender线程对ProducerBatch的处理是在run()方法中进行的,该方法具体实现如下:
  其中比较核心的方法是run()方法中的org。apache。kafka。clients。producer。internals。SendersendProducerData
  其中pollTimeout意思是最长阻塞到至少有一个通道在你注册的事件就绪了。返回0则表示走起发车了。
  我们继续跟下:org。apache。kafka。clients。producer。internals。RecordAccumulatorready
  最后再来看下里面这个方法org。apache。kafka。clients。producer。internals。RecordAccumulatordrain,从accumulator缓冲区获取要发送的数据,最大一次性发max。request。size大小的数据。
  六、总结
  最后为了让你对KafkaProducer有个宏观的架构理解,请看下图:
  简要说明:newKafkaProducer()后创建一个后台线程KafkaThread(实际运行线程是Sender,KafkaThread是对Sender的封装)扫描RecordAccumulator中是否有消息。调用KafkaProducer。send()发送消息,实际是将消息保存到RecordAccumulator中,实际上就是保存到一个Map中(ConcurrentMap),这条消息会被记录到同一个记录批次(相同主题相同分区算同一个批次)里面,这个批次的所有消息会被发送到相同的主题和分区上。后台的独立线程扫描到RecordAccumulator中有消息后,会将消息发送到Kafka集群中(不是一有消息就发送,而是要看消息是否ready)如果发送成功(消息成功写入Kafka),就返回一个RecordMetaData对象,它包括了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,就会返回一个错误,生产者在收到错误之后会尝试重新发送消息(如果允许的话,此时会将消息在保存到RecordAccumulator中),几次之后如果还是失败就返回错误消息。
  好了,本文对KafkaProducer源码进行了解析,下一篇文章将会详细介绍metadata的内容以及在Producer端metadata的更新机制。敬请期待

每一口茶都不白喝,每一步路都不白走一个人要付出多少代价,才能看起来毫不费力地过好这一生?事实上,除了天生运气好,投胎就在罗马的那些人,大部分人出身平平资质平平运气平平,需要靠着自己的双脚,一步一个脚印,踏踏实实认认谁的心里没有伤有的说,我受伤了,心里。我想说的是,谁的心里没有伤。从我们出生,到我们熬完这几十年,谁的心里不是伤痕累累,血迹斑斑。不成年前的还不算,只说成年后。从您上大学,到您大学毕业后找工作。中国空间站第三次太空授课开讲新晋太空教师首次在问天舱授课10月12日,在云南省大理新世纪中学,收看天宫课堂第三课的学生介绍地面种植的拟南芥生长情况。10月12日16时01分,天宫课堂第三课在中国空间站开讲,新晋太空教师陈冬刘洋蔡旭哲为广欢迎归来!中国女篮今日启程回国,世界杯亚军收获1800万重奖北京时间10月12日,根据国内媒体的报道,中国女篮结束澳大利亚的世界杯之后,今天正式踏上回国的征程,相对于接下来的安排,除了部分球员养伤之外,剩下就是好好休息为主,备战新赛季的WC2513!中国女排扳回一城,蔡斌变阵,奇兵盘活全队!主攻尖兵6分10月11日23点开始,女排世锦赛第一场淘汰赛,中国女排迎来苦主意大利女排。蔡斌派出最强阵容,中国队开局不利,竟然29落后多达7分。此后中国队不断调整,但是面对埃格努犀利的进攻,还床上红人中国对芯片的狂热,从出口电热毯开始一个奇怪的副作用是中国向欧洲出口电热毯的小幅增长。中国分析师和咨询机构JWInsights(Jiwei)报告称,销售额从7万英镑增至8万英镑,而前几年约为3万英镑。这些毯子中有大量美债务高达31万亿,中国9万多亿资产安全吗?美国会冻结吗美国虽然是全球第一经济体,但是你知道它借了多少钱吗?据日前美国财务部公布的数据表示,美国债务目前高达31万亿美元。因为这数字太大,所以很多人没有这个概念。如果拿美国GDP来比较的话为什么有的酒店住一晚要几千元,隐藏的5项服务,让人很过瘾酒店有很多的档次,同一间酒店的房间也分不同的档次,那么为什么有些房间一晚上就需要几千块的费用呢?(此处已添加小程序,请到今日头条客户端查看)首先,这样的房间会为客人提供优质的管家服如何浸沉式穿越青海?趁着这种玩法还未大火,赶紧走起头条创作挑战赛巅峰之作探险级旅行历时二年精心筹备,倾力打造,诚邀你一同前往探寻人迹罕至的极致美景,揭开生命禁区的神秘的面纱。可可西里中国四大无人区之首这里是远离人类侵扰地球上最寂静深圳遛娃好去处全国接待游客最多的主题公园里感受中华神韵深圳遛娃好去处,今天小编为大家推荐的是深圳必打卡地锦绣中华民俗村。深圳锦绣中华民俗村是中国第一个现代主题公园,国家首批5A级景区,也是中国待游客最多的主题公园,截至目前已累计接待游乡村游?不要再黑黑朱庄了不太相信这是真的,这是在调侃吧?每天早晨都路过黑朱庄,人流量超大,出租车排队是常态,听说晚上排队更长。作为城中村改造的样板,同时处在城市东扩后的核心位置,交通便利,房源众多,为周边
广西东兴等一场中越旅行在东兴,上山下海出国,都不难。这座位于西南边陲的小城,隶属于广西防城港市,因兴起于北仑河东岸而得名。走过111米长的中越友谊大桥,便能抵达越南芒街市,观赏错落有致的法式建筑,喝上一周知!2023年尖山桃花会将于3月3日盛大开幕自贡网记者周姝桃花春色暖先开明媚谁人不看来自贡市第23届自流井尖山桃花会将于3月3日开幕并一直持续到5月8日记者了解到,本届尖山桃花会设有古风体验亲子互动情侣打卡桃花集市四大游玩区我,2014年来到三亚闯世界,现在却要下决心离开这里,真的很无奈头条创作挑战赛海南三亚我叫吴艳娟,安徽人,今年42岁。2014年,我从原单位辞职,独自一个人到三亚来闯世界。9年来在事业上坎坎坷坷起起伏伏,现在我终于要下定决心离开这里。但要真的去日本签证办理你符合条件了吗?日本三年五年签证办理基础资料1。护照原件(6个月以上有效期)2。两寸白底彩色证件照(半年内拍摄)3。个人资料表(赴日申请表个人处理同意书)4。身份证明(身份证户口本结婚证等复印件)广安邻水提档升级旅游景点带动旅游热起来春回大地,随着气温回暖,正是踏青出游的好时节。邻水县积极筹备,精心打造旅游打卡点,推出精品春季出游线路,迎接春季旅游旺季。近日,记者来到棕阁山生态度假区,在景区野生杜鹃生长区域,工西双版纳景洪市出境游升温迎来出入境证照办理热潮近日,在云南省西双版纳傣族自治州景洪市政务服务大厅出入境窗口,等待办理证件的人已经排起队伍。随着有关部门宣布出境游重启,景洪市出境游市场迅速升温,政务服务大厅出入境窗口时隔三年再度2023暖春限定!80万亩花海,漫山绽放,这里藏着国内最早的春天!赴一场春日风物诗说起云南从耳熟详闻的丽江大理,玉龙雪山到西北边境的香格里拉,梅里雪山还有南部的热带雨林,西双版纳有哪个不是在朋友圈让人羡慕的坐标当提起滇东南时很多人很疑惑有这个地方当前最顶端的手机,预算足够建议直接拿下如果您喜欢,可以点击上面的关注二字。后续会为您提供更多有价值的内容。今天分享当前最顶端的手机,预算足够建议直接拿下第一款vivoX90Pro参考价格5499元(12256G)viv2023换机,小米和vivo谁更具性价比?这4款高水准如果您喜欢,可以点击上面的关注二字。后续会为您提供更多有价值的内容。今天分享,2023换机,小米和vivo谁更具性价比?这4款高水准。第一款iQOONeo7参考价格2799元(12手机用的久,还在小米和OPPO间迟疑吗?这4款很优秀如果您喜欢,可以点击上面的关注二字。后续会为您提供更多有价值的内容。今天分享,手机用的久,还在小米和OPPO间迟疑吗?这4款很优秀,性价比高。第一款红米K60参考价格3299元(1手机新增智感扫码,手机对准二维码,不打开应用自动读取鸿蒙系统3。0最新版本,新增了智感扫码你知道吗?大家好,欢迎来到科技熊,今天给大家分享的是如何使用手机的一个智感扫码功能,所谓的智感扫码就是比如说你的手机直接对准二维码,无论是收款
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网