KafkaKraft模式教程(一)
说明
Kafka版本:3。3。1
Kraft模式!准备三台服务器
我是通过vagrant搭配virtualbox创建的虚拟机。vagrant的Vagrantfile文件内容如下:Vagrant。configure(2)doconfig(1。。3)。eachdoiconfig。vm。definekraft{i}donode设置虚拟机的Box。指定本地的box文件node。vm。boxboxomaticcentosstream9设置虚拟机的主机名node。vm。hostnamekraft{i}设置虚拟机的IPnode。vm。networkprivatenetwork,ip:192。168。10。1{i}VirtualBox相关配置node。vm。providervirtualboxdov设置虚拟机的名称v。namekraft{i}设置虚拟机的内存大小v。memory2048设置虚拟机的CPU个数v。cpus1endendendend
然后执行vagrantup执行创建虚拟机。创建完成的虚拟机IP和HOSTNAME如下:
IP
主机名
192。168。10。11
kraft1
192。168。10。12
kraft2
192。168。10。13
kraft3Host配置
修改etchosts,配置hosts,保证服务器之间能够通过hostname通信。192。168。10。11kraft1192。168。10。12kraft2192。168。10。13kraft3JDK安装
kafka的运行依赖JDK(要求JDK8),三个服务器都安装JDK〔vagrantkraft1〕sudoyuminstalljava17openjdky〔vagrantkraft2〕sudoyuminstalljava17openjdky〔vagrantkraft3〕sudoyuminstalljava17openjdky下载kafka
kafka3。3。1下载地址下载完毕后,上传到三台服务器上,然后解压。配置修改
修改kafka目录下的configkraftserver。properties文件。三个服务器都需要修改。特别注意:每个服务器(broker)上的配置里的node。id必须是数字,并且不能重复。LicensedtotheApacheSoftwareFoundation(ASF)underoneormorecontributorlicenseagreements。SeetheNOTICEfiledistributedwiththisworkforadditionalinformationregardingcopyrightownership。TheASFlicensesthisfiletoYouundertheApacheLicense,Version2。0(theLicense);youmaynotusethisfileexceptincompliancewiththeLicense。YoumayobtainacopyoftheLicenseathttp:www。apache。orglicensesLICENSE2。0Unlessrequiredbyapplicablelaworagreedtoinwriting,softwaredistributedundertheLicenseisdistributedonanASISBASIS,WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied。SeetheLicenseforthespecificlanguagegoverningpermissionsandlimitationsundertheLicense。ThisconfigurationfileisintendedforuseinKRaftmode,whereApacheZooKeeperisnotpresent。SeeconfigkraftREADME。mdfordetails。ServerBasicsTheroleofthisserver。SettingthisputsusinKRaftmodeprocess。rolesbroker,controllerThenodeidassociatedwiththisinstancesrolesnode。id1Theconnectstringforthecontrollerquorumcontroller。quorum。voters1kraft1:9093,2kraft2:9093,3kraft3:9093SocketServerSettingsTheaddressthesocketserverlistenson。Combinednodes(i。e。thosewithprocess。rolesbroker,controller)mustlistthecontrollerlistenerhereataminimum。Ifthebrokerlistenerisnotdefined,thedefaultlistenerwilluseahostnamethatisequaltothevalueofjava。net。InetAddress。getCanonicalHostName(),withPLAINTEXTlistenername,andport9092。FORMAT:listenerslistenername:hostname:portEXAMPLE:listenersPLAINTEXT:your。host。name:9092listenersPLAINTEXT::9092,CONTROLLER::9093Nameoflistenerusedforcommunicationbetweenbrokers。inter。broker。listener。namePLAINTEXTListenername,hostnameandportthebrokerwilladvertisetoclients。Ifnotset,itusesthevalueforlisteners。advertised。listenersPLAINTEXT::9092Acommaseparatedlistofthenamesofthelistenersusedbythecontroller。Ifnoexplicitmappingsetinlistener。security。protocol。map,defaultwillbeusingPLAINTEXTprotocolThisisrequiredifrunninginKRaftmode。controller。listener。namesCONTROLLERMapslistenernamestosecurityprotocols,thedefaultisforthemtobethesame。Seetheconfigdocumentationformoredetailslistener。security。protocol。mapCONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASLPLAINTEXT:SASLPLAINTEXT,SASLSSL:SASLSSLThenumberofthreadsthattheserverusesforreceivingrequestsfromthenetworkandsendingresponsestothenetworknum。network。threads3Thenumberofthreadsthattheserverusesforprocessingrequests,whichmayincludediskIOnum。io。threads8Thesendbuffer(SOSNDBUF)usedbythesocketserversocket。send。buffer。bytes102400Thereceivebuffer(SORCVBUF)usedbythesocketserversocket。receive。buffer。bytes102400Themaximumsizeofarequestthatthesocketserverwillaccept(protectionagainstOOM)socket。request。max。bytes104857600LogBasicsAcommaseparatedlistofdirectoriesunderwhichtostorelogfileslog。dirshomevagrantkraftcombinedlogsThedefaultnumberoflogpartitionspertopic。Morepartitionsallowgreaterparallelismforconsumption,butthiswillalsoresultinmorefilesacrossthebrokers。num。partitions1Thenumberofthreadsperdatadirectorytobeusedforlogrecoveryatstartupandflushingatshutdown。ThisvalueisrecommendedtobeincreasedforinstallationswithdatadirslocatedinRAIDarray。num。recovery。threads。per。data。dir1InternalTopicSettingsThereplicationfactorforthegroupmetadatainternaltopicsconsumeroffsetsandtransactionstateForanythingotherthandevelopmenttesting,avaluegreaterthan1isrecommendedtoensureavailabilitysuchas3。offsets。topic。replication。factor1transaction。state。log。replication。factor1transaction。state。log。min。isr1LogFlushPolicyMessagesareimmediatelywrittentothefilesystembutbydefaultweonlyfsync()tosynctheOScachelazily。Thefollowingconfigurationscontroltheflushofdatatodisk。Thereareafewimportanttradeoffshere:1。Durability:Unflusheddatamaybelostifyouarenotusingreplication。2。Latency:Verylargeflushintervalsmayleadtolatencyspikeswhentheflushdoesoccurastherewillbealotofdatatoflush。3。Throughput:Theflushisgenerallythemostexpensiveoperation,andasmallflushintervalmayleadtoexcessiveseeks。ThesettingsbelowallowonetoconfiguretheflushpolicytoflushdataafteraperiodoftimeoreveryNmessages(orboth)。Thiscanbedonegloballyandoverriddenonapertopicbasis。Thenumberofmessagestoacceptbeforeforcingaflushofdatatodisklog。flush。interval。messages10000Themaximumamountoftimeamessagecansitinalogbeforeweforceaflushlog。flush。interval。ms1000LogRetentionPolicyThefollowingconfigurationscontrolthedisposaloflogsegments。Thepolicycanbesettodeletesegmentsafteraperiodoftime,orafteragivensizehasaccumulated。Asegmentwillbedeletedwhenevereitherofthesecriteriaaremet。Deletionalwayshappensfromtheendofthelog。Theminimumageofalogfiletobeeligiblefordeletionduetoagelog。retention。hours168Asizebasedretentionpolicyforlogs。Segmentsareprunedfromthelogunlesstheremainingsegmentsdropbelowlog。retention。bytes。Functionsindependentlyoflog。retention。hours。log。retention。bytes1073741824Themaximumsizeofalogsegmentfile。Whenthissizeisreachedanewlogsegmentwillbecreated。log。segment。bytes1073741824Theintervalatwhichlogsegmentsarecheckedtoseeiftheycanbedeletedaccordingtotheretentionpolicieslog。retention。check。interval。ms300000
三个broker的配置基本都和上面的配置一样,不同的地方就是node。id。
kraft1node。id1
kraft2node。id2
kraft3node。id3
另外还有两处需要修改。controller。quorum。voters1kraft1:9093,2kraft2:9093,3kraft3:9093【以逗号分隔的{id}{host}:{port}投票者列表。例如:1localhost:9092,2localhost:9093,3localhost:9094】log。dirshomevagrantkraftcombinedlogs【日志路径,默认是temp下的文件下,生产环境不要使用,因为linux会清理tmp目录下的文件,会造成数据丢失】生成集群ID
随便找一个服务器,进入kafka目录,使用kafkastorage。sh生成一个uuid,一个集群只能有一个uuid!!!〔vagrantkraft1kafka2。133。3。1〕KAFKACLUSTERID(binkafkastorage。shrandomuuid)〔vagrantkraft1kafka2。133。3。1〕echoKAFKACLUSTERIDt6vWCV2iRneJB62NXxO19g
这个ID就可以作为集群的ID格式化存储目录
三个机器上都需要执行kraft1服务器〔vagrantkraft1kafka2。133。3。1〕binkafkastorage。shformattt6vWCV2iRneJB62NXxO19gcconfigkraftserver。propertiesFormattinghomevagrantkraftcombinedlogswithmetadata。version3。3IV3。kraft2服务器〔vagrantkraft2kafka2。133。3。1〕binkafkastorage。shformattt6vWCV2iRneJB62NXxO19gcconfigkraftserver。propertiesFormattinghomevagrantkraftcombinedlogswithmetadata。version3。3IV3。kraft3服务器〔vagrantkraft3kafka2。133。3。1〕binkafkastorage。shformattt6vWCV2iRneJB62NXxO19gcconfigkraftserver。propertiesFormattinghomevagrantkraftcombinedlogswithmetadata。version3。3IV3。启动服务器
三个机器都需要执行kraft1服务器〔vagrantkraft1kafka2。133。3。1〕binkafkaserverstart。shdaemonconfigkraftserver。propertieskraft2服务器〔vagrantkraft2kafka2。133。3。1〕binkafkaserverstart。shdaemonconfigkraftserver。propertieskraft3服务器〔vagrantkraft3kafka2。133。3。1〕binkafkaserverstart。shdaemonconfigkraftserver。properties查看元数据(Metadata)〔vagrantkraft1kafka2。133。3。1〕binkafkametadatashell。shsnapshothomevagrantkraftcombinedlogsclustermetadata000000000000000000000。logLoading。。。Starting。。。〔2022122811:12:45,455〕WARN〔snapshotReaderQueue〕eventhandlerthreadexitingwithexception(org。apache。kafka。queue。KafkaEventQueue)java。nio。channels。NonWritableChannelExceptionatjava。basesun。nio。ch。FileChannelImpl。truncate(FileChannelImpl。java:406)atorg。apache。kafka。common。record。FileRecords。truncateTo(FileRecords。java:270)atorg。apache。kafka。common。record。FileRecords。trim(FileRecords。java:231)atorg。apache。kafka。common。record。FileRecords。close(FileRecords。java:205)atorg。apache。kafka。metadata。util。SnapshotFileReader3。run(SnapshotFileReader。java:182)atorg。apache。kafka。queue。KafkaEventQueueEventHandler。run(KafkaEventQueue。java:174)atjava。basejava。lang。Thread。run(Thread。java:833)〔KafkaMetadataShell〕lsbrokersfeatureslocalmetadataQuorumlsbrokers123lsfeaturesmetadata。versionlslocalcommitIdversionlsmetadataQuorumleaderoffset
集群搭建完毕后,metadata中的一级目录只有brokers,features,local,metadataQuorum。创建主题,消费的时候,会增加一些其他的一级目录。比如topics,topicIds等。
这里报了个错,不知道具体原因,目前不影响使用,暂时忽略(之后确定下)!创建主题
我创建一个3副本、3分区的主题(itlab1024topic1)。〔vagrantkraft1kafka2。133。3。1〕binkafkatopics。shcreatetopicitlab1024topic1partitions3replicationfactor3bootstrapserverkraft1:9092,kraft2:9092,kraft3:9092Createdtopicitlab1024topic1。查看主题〔vagrantkraft1kafka2。133。3。1〕binkafkatopics。shdescribetopicitlab1024topic1bootstrapserverkraft1:9092,kraft2:9092,kraft3:9092Topic:itlab1024topic1TopicId:li8JnUSOeFHIAZdDZKAPartitionCount:3ReplicationFactor:3Configs:segment。bytes1073741824Topic:itlab1024topic1Partition:0Leader:2Replicas:2,3,1Isr:2,3,1Topic:itlab1024topic1Partition:1Leader:3Replicas:3,1,2Isr:3,1,2Topic:itlab1024topic1Partition:2Leader:1Replicas:1,2,3Isr:1,2,3再次查看元数据(Metadata)〔vagrantkraft1kafka2。133。3。1〕binkafkametadatashell。shsnapshothomevagrantkraftcombinedlogsclustermetadata000000000000000000000。logLoading。。。Starting。。。〔2022122811:24:58,958〕WARN〔snapshotReaderQueue〕eventhandlerthreadexitingwithexception(org。apache。kafka。queue。KafkaEventQueue)java。nio。channels。NonWritableChannelExceptionatjava。basesun。nio。ch。FileChannelImpl。truncate(FileChannelImpl。java:406)atorg。apache。kafka。common。record。FileRecords。truncateTo(FileRecords。java:270)atorg。apache。kafka。common。record。FileRecords。trim(FileRecords。java:231)atorg。apache。kafka。common。record。FileRecords。close(FileRecords。java:205)atorg。apache。kafka。metadata。util。SnapshotFileReader3。run(SnapshotFileReader。java:182)atorg。apache。kafka。queue。KafkaEventQueueEventHandler。run(KafkaEventQueue。java:174)atjava。basejava。lang。Thread。run(Thread。java:833)〔KafkaMetadataShell〕lsbrokersfeatureslocalmetadataQuorumtopicIdstopics
可以看到,一级目录多了topicIds和topics。生产消息〔vagrantkraft1kafka2。133。3。1〕binkafkaconsoleproducer。shtopicitlab1024topic1bootstrapserverkraft1:9092,kraft2:9092,kraft3:909212消费消息
上面发送了1和2两个消息,看看能否消费到。〔vagrantkraft1kafka2。133。3。1〕binkafkaconsoleconsumer。shtopicitlab1024topic1bootstrapserverkraft1:9092,kraft2:9092,kraft3:9092frombeginning12
没问题,正常消费。Api使用
使用Java程序,来生产和消费消息。建立Maven项目并引入依赖dependencygroupIdorg。apache。kafkagroupIdkafkaclientsartifactIdversion3。3。1versiondependency创建消费者类packagecom。itlab1024。kafka;importorg。apache。kafka。clients。consumer。Consumer;importorg。apache。kafka。clients。consumer。ConsumerRecord;importorg。apache。kafka。clients。consumer。ConsumerRecords;importorg。apache。kafka。clients。consumer。KafkaConsumer;importjava。time。Duration;importjava。util。List;importjava。util。Properties;publicclassConsumerClient{SuppressWarnings(InfiniteLoopStatement)publicstaticvoidmain(String〔〕args){PropertiespropsnewProperties();props。setProperty(bootstrap。servers,kraft1:9092,kraft2:9092,kraft3:9092);props。setProperty(group。id,test);props。setProperty(enable。auto。commit,true);props。setProperty(auto。commit。interval。ms,1000);props。setProperty(key。deserializer,org。apache。kafka。common。serialization。StringDeserializer);props。setProperty(value。deserializer,org。apache。kafka。common。serialization。StringDeserializer);try(ConsumerString,StringconsumernewKafkaConsumer(props)){consumer。subscribe(List。of(itlab1024topic1));while(true){ConsumerRecordsString,Stringrecordsconsumer。poll(Duration。ofMillis(100));for(ConsumerRecordString,Stringrecord:records)System。out。printf(offsetd,keys,valuesn,record。offset(),record。key(),record。value());}}}}创建生产者类packagecom。itlab1024。kafka;importorg。apache。kafka。clients。producer。KafkaProducer;importorg。apache。kafka。clients。producer。Producer;importorg。apache。kafka。clients。producer。ProducerRecord;importjava。util。Properties;publicclassProducerClient{publicstaticvoidmain(String〔〕args){PropertiespropsnewProperties();props。put(bootstrap。servers,kraft1:9092,kraft2:9092,kraft3:9092);props。put(linger。ms,1);props。put(key。serializer,org。apache。kafka。common。serialization。StringSerializer);props。put(value。serializer,org。apache。kafka。common。serialization。StringSerializer);try(ProducerString,StringproducernewKafkaProducer(props)){for(inti0;i10;i){producer。send(newProducerRecord(itlab1024topic1,itlabi,itlabi));}}}}
运行消费者类,再执行生产者类,观察消费者类的控制台会输出如下内容:offset0,keyitlab1,valueitlab1offset1,keyitlab2,valueitlab2offset2,keyitlab5,valueitlab5offset3,keyitlab7,valueitlab7offset4,keyitlab8,valueitlab8offset0,keyitlab0,valueitlab0offset1,keyitlab3,valueitlab3offset2,keyitlab4,valueitlab4offset3,keyitlab6,valueitlab6offset4,keyitlab9,valueitlab9
说明也是能够正常生产和消费消息的。
上面基本介绍了KafkaRaft模式集群的搭建方式,并没有具体讲解配置含义(还有很多配置)。下一版会介绍!Kafka〔Kraft模式〕教程(二)基本解释
在教程一中创建了一个基础的KafkaRaft模式集群,但是并没有细讲该模式的具体细节,本文章来讲解下,我尽可能讲解的很清晰。
在kafka中节点服务器主要有两种角色,一种是controller,一种是broker,zookeeper和raft模式下都是这两种角色,不同的是zookeeper模式下的controller强依赖于zookeeper,zookeeper中存储了集群的元数据信息。
但是依赖于zookeeper有很多问题:首先使用zookeeper则多了一个组件,运维成本高zookeeper符合CAP悖论中的CP,也就是说zookeeper是强一致性的组件。那么如果集群中某个节点数据变更,就得通知其他节点同步,并且要超过半数完成才行,当节点较多的时候,性能下降明显。zookeeper的设计决定了它只适用于存储一些简单的配置或者是集群的元数据,数据量大的时候性能和稳定性就会下降,一些监听器也会延时甚至丢失。zookeeper本身也是分布式系统,主从结构,如果主节点挂掉,也会选举出来主节点,他的选举并不快,并且选举的时候是不能提供服务的。
那么Raft模式,弃用zookeeper后,controller中的信息就不会存储到zookeeper中了(zookeeper都没了),而是存储到了kafka自己的服务器上。
通过一张图来看下变化前后的区别(图片来源网络):
用QuorumController代替之前的Controller,Quorum中每个Controller节点都会保存所有元数据,通过KRaft协议保证副本的一致性。这样即使QuorumController节点出故障了,新的Controller迁移也会非常快。
在Kraft模式下,只有一小组专门选择的服务器可以充当控制器(设置process。roles包含controller),controller服务器的作用是参与元数据的仲裁。
多个controller服务器只有一个是active状态的,其他的都是standby状态的(也就是备用服务器)。
controler服务器的数量遵循Quorum原则(过半原则),也就是说要奇数个,比如3个服务器允许1个故障,5个服务器允许2个故障。配置说明
kraft模式下的配置文件在configkraft目录下。〔vagrantkraft3kraft〕pwdhomevagrantkafka2。133。3。1configkraft〔vagrantkraft3kraft〕lsbroker。propertiescontroller。propertiesREADME。mdserver。properties
这里有三个properties文件,三个文件中内容基本相同,唯一不同的是process。roles的配置。broker。properties:process。rolesbroker,代表该服务器只是broker角色。controller。properties:process。rolescontroller,代表该服务器只是controller角色。server。properties:process。rolesbroker,controller,代表该服务器既是broker角色也是controller角色。
kafka只是给我们提供了三个不同角色的配置文件,方便我们使用而已。
文件中的具体配置内容,才是我们应该重视的,接下来一个一个说明,并尝试修改默认配置进行试验!process。roles
用于配置服务器的角色,可以有如下配置。process。rolesbroker,代表该服务器只是broker角色。process。rolescontroller,代表该服务器只是controller角色。process。rolesbroker,controller,代表该服务器既是broker角色也是controller角色。
也可以不配置,如果不配置,则说明当前集群不是kraft模式,而是zookeeper模式。
说明:目前还不支持两种模式自由切换(以后是否支持也不清楚),如果要切换模式,比如重新使用binkafkastorage。sh重新格式化(重新格式化数据肯定会丢失的,特别注意!)
同时具有broker和controller两种角色的服务器(也叫组合服务器)在开发环境中是很好的(服务器可能较少,搭建方便),但是在生产环境中是不推荐的,因为这样做会导致broker和controller的隔离性差,不可能在组合模式下单独滚动或缩放controller与broker。
试验:
考虑我之前搭建的集群,三台机器都是配置的process。rolesbroker,controller,这是不好的!博主信息
个人主页:https:itlab1024。com
Github:https:github。comitlab1024