聊聊Kafka编译Kafka源码并搭建源码环境
一、前言
老周这里编译Kafka的版本是2。7,为啥采用这个版本来搭建源码的阅读环境呢?因为该版本相对来说比较新。而我为啥不用2。7后的版本呢?比如2。8,这是因为去掉了ZooKeeper,还不太稳定,生产环境也不太建议使用,所以以2。7版本进行源码搭建并研究。二、环境准备JDK:1。8。0241Scala:2。12。8Gradle:6。6Zookeeper:3。4。14三、环境搭建
3。1JDK环境搭建
这个就不用我说了吧,搞Java的本机都有JDK环境。
3。2Scala环境搭建
下载链接:https:www。scalalang。orgdownload2。12。8。html
这里老周是MacOS系统,这里大家看着自己的系统来下就好了哈。
3。2。1配置Scala环境变量
终端输入以下命令进行编辑:vim。bashprofile这里的路径是你安装SCALAHOMEUsersRiemannToolsscala2。12。8exportSCALAHOMEexportPATHPATH:SCALAHOMEbin使环境变量生效,在命令行执行。source。bashprofile
3。2。2验证
终端输入以下命令:scalaversion
出现以下提示,说明Scala环境搭建成功。
3。3Gradle环境搭建
首先来到Gradle官网:https:services。gradle。orgdistributions
如下图:
我们选择想要安装的发布版本,gradlex。xbin。zip是需要下载的安装发布版,gradlex。xsrc。zip是源码,gradlex。xall。zip则是下载全部的文件。我本地为gradle6。6。
Gradle下载的源码不需要安装,我们将下载的压缩包在本机的目录下直接解压即可,解压后的目录如下图所示。
3。3。1配置Gradle环境变量
终端输入以下命令进行编辑:vim。bashprofile这里的路径是你安装GRADLEHOMEUsersRiemannToolsgradle6。6exportGRADLEHOMEexportPATHPATH:GRADLEHOMEbin使环境变量生效,在命令行执行。source。bashprofile
3。3。2验证
终端输入以下命令:gradlev
出现以下提示,说明Gradle环境搭建成功。
3。4Zookeeper环境搭建
Zookeeper环境老周在Linux环境已经搭建好了的,直接用。这里我也给出搭建的步骤,不管你是啥系统,都是类似的
3。4。1下载wgethttp:mirrors。hust。edu。cnapachezookeeperzookeeper3。4。14zookeeper3。4。14。tar。gz
3。4。2解压tarzxvfzookeeper3。4。14。tar。gz
3。4。3进入zookeeper3。4。14目录,创建data文件夹cdzookeeper3。4。14mkdirdata
3。4。4修改配置文件cdconfmvzoosample。cfgzoo。cfg
3。4。5修改zoo。cfg中的data属性dataDirrootzookeeper3。4。14data
3。4。6zookeeper服务启动
进入bin目录,启动服务输入命令。zkServer。shstart
输出以下内容表示启动成功
3。5Kafka源码环境搭建
官网下载对应版本的源码包,网址:http:kafka。apache。orgdownloads
下载完后解压,这个源码文件还需要导入依赖jar包,个人使用IDEA来import导入项目,导入完后需使用前面配置好的gradle作为Gradlehome地址。
3。5。1导入Kafka源码至IDEA
3。5。2修改build。gradle
接下来还不能导jar包,需要把镜像文件下载服务器更换为国内的私服,否则会相当慢,直接导致timeout报错。
进入kafka源码包,修改build。gradle文件,在原来配置上,添加ali私服配置。buildscript{repositories{maven{urlhttp:maven。aliyun。comnexuscontentgroupspublic}maven{urlhttp:maven。aliyun。comnexuscontentrepositoriesjcenter}}}allprojects{repositories{maven{urlhttp:maven。aliyun。comnexuscontentgroupspublic}maven{urlhttp:maven。aliyun。comnexuscontentrepositoriesjcenter}}}
3。5。3代码构建
可以用命令来构建,也可以在idea图形界面的gradle来构建,这里肯定是idea图形化界面操作更简单,但这里也提供gradle的命令来构建。。gradlewcleanbuildxtest
去找一下直接下载Wrapper所需的Jar包,手动把这个Jar文件拷贝到kafka路径下的gradlewrapper子目录下,然后重新执行gradlewbuild命令去构建工程。
链接:https:pan。baidu。coms1W6EHysWY3ZWQZRWNdNZn3Q提取码:hpj5
gradle其它命令:构建jar包并运行。gradlewjar构建项目,看你是idea工具还是eclipse。gradlewidea。gradleweclipse构建源码包。gradlewsrcJar构建javadoc文档。gradlewaggregatedJavadoc清理并构建。gradlewclean四、代码结构
4。1代码安装包结构bin目录:保存Kafka工具行脚本,我们熟知的kafkaserverstart和kafkaconsoleproducer等脚本都存放在这里。checkstyle目录:代码规范,自动化检测。
Checkstyle是什么,关于格式化的讨论就不曾中断过,到底什么才是正确的,什么才是错误的,到现在也没有完整的定论。但随着时间发展,渐渐衍生出一套规范出来。没有什么绝对的正确和错误,关键在于规范的定义。最出名的就是googlestyleguide,Checkstyle就是以这种风格开发出的一个自动化插件,来辅助判断代码格式是否满足规范。
该目录下的文件定义了工程代码格式的规范,我们可以在build。gradle中看到相关checkstyle的配置和自动化代码格式化配置:
checkstyle配置:
scala自动化代码格式化配置:
clients目录:保存Kafka客户端代码,比如生产者和消费者的代码都在该目录下。config目录:保存Kafka的配置文件,其中比较重要的配置文件是server。properties。connect目录:保存Connect组件的源代码。KafkaConnect组件是用来实现Kafka与外部系统之间的实时数据传输的。core目录:保存Broker端代码。Kafka服务器端代码全部保存在该目录下。docs目录:Kafka设计文档以及组件相关结构图。examples目录:Kafka样例相关目录。generator目录:Kafka消息类处理模块,主要是根据clients模块下的messagejson文件生成对应的java类,在build。gradle文件中,可以看到定义了一个任务processMessages:
gradle目录:gradle的脚本和依赖包定义等相关文件。jmhbenchmarks目录:Kafka代码微基准测试相关类。
JMH,即JavaMicrobenchmarkHarness,是专门用于代码微基准测试的工具套件。何谓MicroBenchmark呢?简单的来说就是基于方法层面的基准测试,精度可以达到微秒级。当你定位到热点方法,希望进一步优化方法性能的时候,就可以使用JMH对优化的结果进行量化的分析。
JMH比较典型的应用场景有:想准确的知道某个方法需要执行多长时间,以及执行时间和输入之间的相关性;对比接口不同实现在给定条件下的吞吐量,找到最优实现。kafkalogs目录:server。properties文件中配置log。dirs生成的目录。log4jappender目录:
Alog4jappenderthatproduceslogmessagestoKafka这个目录里面就一个KafkaLog4jAppender类。raft目录:raft一致性协议相关。streams目录:
KafkaStreamsisaclientlibraryforbuildingapplicationsandmicroservices,wheretheinputandoutputdataarestoredinKafkaclusters。提供一个基于Kafka的流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。KafkaStreams是一个用来构建流处理程序的库,特别是其输入是一个Kafkatopic,输出是另一个Kafkatopic的程序(或者是调用外部服务,或者是更新数据库,或者其它)。它使得你以一种分布式以及容错的方式来做这件事情。tests目录:此目录的内容介绍如何进行Kafka系统集成和性能测试。tools目录:工具类模块。vagrant目录:介绍如何在Vagrant虚拟环境中运行Kafka,提供了相关的脚本文件和说明文档。
Vagrant是一个基于Ruby的工具,用于创建和部署虚拟化开发环境。它使用Oracle的开源VirtualBox虚拟化系统,使用Chef创建自动化虚拟环境。
4。2项目结构
项目结构的话主要关注core目录,core目录是Kafka核心包,有集群管理,分区管理,存储管理,副本管理,消费者组管理,网络通信,消费管理等核心类。
admin包:执行管理命令的功能;api包:封装请求和响应DTO对象;cluster包:集群对象,例如Replica类代表一个分区副本,Partition类代表一个分区;common包:通用jar包;controller包:和kafkaController(kc)相关的类,重点模块,一个kafka集群只有一个leaderkc,该kc负责分区管理,副本管理,并保证集群信息在集群中同步;coordinator包:保存了消费者端的GroupCoordinator代码和用于事务的TransactionCoordinator代码。对coordinator包进行分析,特别是对消费者端的GroupCoordinator代码进行分析,是Broker端协调者组件设计原理的关键。log包:保存了Kafka最核心的日志结构代码,包括日志、日志段、索引文件等,另外,该包下还封装了LogCompaction的实现机制,是非常重要的源码包。network包:封装了Kafka服务器端网络层的代码,特别是SocketServer。scala这个文件,是Kafka实现Reactor模式的具体操作类,非常值得一读。consumer包:后面会丢弃该包,用clients包下consumer相关类代替。server包:顾名思义,它是Kafka的服务器端主代码,里面的类非常多,很多关键的Kafka组件都存放在这里,比如状态机、Purgatory延时机制等。tools包:工具类。五、环境验证
下面我们来验证一下Kafka源码环境是否搭建成功。
5。1首先,我们在coresrcmain目录下新建resources目录,再将conf目录下的log4j。properties配置文件拷贝到resources目录下。
如下图所示:
5。2修改conf目录下的server。properties文件log。dirsUsersRiemannCodeframeworksourcecodeanalysiskafka2。7。0srckafkalogs
server。properties文件中的其他配置暂时不用修改。
5。3在IDEA中配置kafka。Kafka这个入口类
具体配置如下图所示:
5。4启动KafkaBroker
启动成功的话,控制台输出没有异常,且能看到如下输出:
5。5可能出现以下异常
5。5。1异常1log4j:WARNNoappenderscouldbefoundforlogger(kafka。utils。Log4jControllerRegistration)。log4j:WARNPleaseinitializethelog4jsystemproperly。log4j:WARNSeehttp:logging。apache。orglog4j1。2faq。htmlnoconfigformoreinfo。
在projectstructure中加入slf4jlog4j121。7。30。jar和log4j1。2。17。jar两个日志包,当然也可以在build。gradle中添加对应的配置来添加包。
方法1:
方法2:compilegroup:log4j,name:log4j,version:1。2。17compilegroup:org。slf4j,name:slf4japi,version:1。7。30compilegroup:org。slf4j,name:slf4jlog4j12,version:1。7。30
加到build。gradle文件中的core模块:
5。5。2异常2SLF4J:Failedtoloadclassorg。slf4j。impl。StaticLoggerBinder。
SLF4J:Defaultingtonooperation(NOP)loggerimplementation
SLF4J:Seehttp:www。slf4j。orgcodes。htmlStaticLoggerBinderforfurtherdetails。
在这里插入图片描述
5。6发送、消费message
我们这里使用Kafka自带的脚本工具来验证上面搭建的Kafka源码环境
首先,我们进入到{KAFKAHOME}bin目录,通过kafkatopics。sh命令来创建一个名为topictest的topic:
执行效果如下图所示:
然后我们通过kafkaconsoleconsumer。sh命令启动一个命令行的consumer来消费topictest这个topic,如下:。kafkaconsoleconsumer。shbootstrapserverlocalhost:9092topictopictest
接下来,我们通过kafkaconsoleproducer。sh命令启动一个命令行的producer向topictest这个topic中生成数据,如下:
当我们输入一条message并回车之后,message会发送到topictest这个topic中。
我们输入完message并回车之后,就可以在consumer处收到该message了,效果如下图所示:
大功告成,后续会陆续分析KafkaBroker端的源码,尽情期待
注意!化工大省沦陷!8万多家化工厂或被封控近期多地疫情再度抬头,全国单日新增感染病例已连续多日破万,作为化工大省的广东疫情形势升级,让不少化工人焦灼不已。疫情的侵袭给化工行业造成了影响。国家卫健委31省份昨增本土21452
铜曲折的转向本周观点上周铜价震荡回落,与我们周报中的观点完全吻合,我们认为目前下行动能释放接近尾声,后续可能转入区间震荡最近的回调主要在于此前推升铜价脉冲式上涨的因素均出现一定程度的反转。首先
在上海,普通人也能操作的经营性抵押贷款引言经营性抵押贷款其实不是一个新鲜词,但由于从2020年开始政府为保经济保就业保民生,相关政策大放水,使得该类贷款的热度近三年来一直居高不下。今年,经营性抵押贷出现年化利率低至3。
液晶电视200元卖不动,电视行业到底怎么了?电视行业的低价竞争越来越严重了。日前据奥维云网消费电子事业部研究总监刘飞向媒体透露电视行业的竞争还是比较严重,20182022年平均尺寸增长近10寸,但均价一直维持在3000元左右
家具建材经销商转型之路(三)行业八大痛点和转型之路的探索家具建材经销商经营中存在的八大痛点一行业内卷越来越严重当今这个经济寒冬中,各行各业都存在或多或少的内卷。家具建材行业尤其严重。改革开放这几十年来,随着生产力的提升和中国经济的腾飞,
高清组图再见!南博会再见!南博会11月22日,为期4天的第6届中国南亚博览会暨第26届中国昆明进出口商品交易会落下帷幕。共享新机遇,共谋新发展在会展期间,中外参展商积极洽谈交流,开展商务对接,携手合作
有合理诉求时,怎么投诉银行,才能得到最快速最有效的处理?文又吃了三碗饭入驻头条第233天,第99篇原创文章全文1364字,阅读时长约5分钟当前,很多客户在银行办理业务时遇到不合理的对待,或对银行服务不满意的时候,往往会选择投诉。干了10
11月LPR保持不变,办理贷款还需要等吗?快年底了,房贷一族们都翘首以盼,期待央妈能再发个红包,利率再降一降。虽然11月15日央行的MLF操作,已预示着11月LPR大概率维持不变11月21日,最新一期LPR报价出炉1年期L
九九归心病是怎么来的焦虑症是太要强要出来的抑郁症是钻牛角尖粘出来的失眠是爱操心操出来的胃不好是怨出来的肝不好是怒出来的肾不好是怕出来的结节和乳腺病是气出来的今天得的病不是昨天刚得的而是日积
直播14场只卖了2W,乐视开播两个月,为什么还能越播越快乐?前段时间,因为没有内卷和996,乐视员工引来全网羡慕。可是近期,画风变了,大家都在看乐视笑话。这是怎么回事呢?大概是受到新东方直播转型成功的影响,乐视也开始直播,还斗志满满地说努力
我们也要测试小行星防御,同时发射两个拦截器,2025年就能实现?还记得今年9月27日,美国DART拦截器在距离地球约1100万公里的深空,成功给某个双小行星系统做的重定向手术吗?当时震惊了很多人,从这一刻开始,人类掌握了逆天改命的技术,可以避免