官网:http:kafka。apachecn。orgdocumentation。html开始学习吧。 先启动zk,然后启动kafka。就行了,安装不做介绍了,都是简单修改下配置就行了。相信都会。不过我担忧下载的网速(真的坑,现在肯能下载不动,所以我找了个老版本)。前提环境就是有Java环境就可以了。 kafka版本号前面部分是scala版本,后面部分是kafka版本,我是用的是0。11。0。3版本,所以客户端最好跟着一致。〔adminhadoop1kafka〕lltotal0drwxrxrx7adminadmin94Nov1006:18kafka2。110。11。0。3 启动命令:1。测试环境推荐,可以实时看日志报错。。kafkaserverstart。sh。。configserver。properties2。线上推荐,后台模式。kafkaserverstart。shdaemon。。configserver。properties快速开始 最好加入日志框架,然后放入配置文件,看日志很重要。dependenciesdependencygroupIdorg。apache。kafkagroupIdkafkaclientsartifactIdversion0。11。0。3versiondependencyLOG4J配置。。。dependencies 服务器端, org。apache。kafka。clients。CommonClientConfigs下面就是客户端服务器的通用配置 org。apache。kafka。clients。producer。ProducerConfig是生产者的配置信息。 org。apache。kafka。clients。consumer。ConsumerConfig是消费者配置。 org。apache。kafka。common。config。TopicConfig是topic的通用配置信息。 org。apache。kafka。common。config。SslConfigsSSL配置 这些配置信息都是Config表示的配置的key,Doc表示解释。我表示不理解为啥呢。哈哈哈不浪费内存吗 一般只是用ConsumerConfig和ProducerConfig足矣了。 配置太多。但是有个技巧。我这里写了个程序反射获取他的成员变量。然后可以输出他的配置信息。FieldfieldConsumerConfig。class。getDeclaredField(CONFIG);field。setAccessible(true);ConfigDefdef(ConfigDef)field。get(null);StringhtmlTabledef。toHtmlTable();FileOutputStreamstreamnewFileOutputStream(consumer。html);stream。write(htmlTable。getBytes());stream。close(); 如果配置不懂的这个官方网站也可以看http:kafka。apachecn。orgdocumentation。htmlproducerconfigs 基本就是文档了。,很详细。那么开始吧。 服务端代码:publicclassProducer{publicstaticvoidmain(String〔〕args){finalStringurlhadoop1:9092;finalStringtopictopic1;配置。HashMapString,ObjectconfignewHashMap();连接地址config。put(ProducerConfig。BOOTSTRAPSERVERSCONFIG,url);ACKconfig。put(ProducerConfig。ACKSCONFIG,all);相应超时。config。put(ProducerConfig。TRANSACTIONTIMEOUTCONFIG,5000);缓冲区大小。(发送给服务器的)config。put(ProducerConfig。BUFFERMEMORYCONFIG,1024102410);每次最多发10Kconfig。put(ProducerConfig。MAXBLOCKMSCONFIG,102410);不重试,有些非幂等性可以。config。put(ProducerConfig。RETRIESCONFIG,0);snappy压缩。。config。put(ProducerConfig。COMPRESSIONTYPECONFIG,snappy);序列化config。put(ProducerConfig。KEYSERIALIZERCLASSCONFIG,StringSerializer。class);config。put(ProducerConfig。VALUESERIALIZERCLASSCONFIG,StringSerializer。class);ok了。KafkaProducerString,StringproducernewKafkaProducer(config);IntStream。range(0,10)。forEach(value{发送producer。send(newProducerRecord(topic,curtime,String。format(id:d,time:d。,value,System。currentTimeMillis())),(metadata,exception){});});最后记得刷新出去。producer。flush();}} 消费端:publicclassConsumer{publicstaticvoidmain(String〔〕args){finalStringtopictopic1;finalStringgroupconsumer1;finalStringurlhadoop1:9092;HashMapString,ObjectconfignewHashMap();config。put(ConsumerConfig。GROUPIDCONFIG,group);config。put(ConsumerConfig。BOOTSTRAPSERVERSCONFIG,url);config。put(ConsumerConfig。AUTOOFFSETRESETCONFIG,earliest);config。put(ConsumerConfig。ENABLEAUTOCOMMITCONFIG,false);config。put(ConsumerConfig。KEYDESERIALIZERCLASSCONFIG,StringDeserializer。class);config。put(ConsumerConfig。VALUEDESERIALIZERCLASSCONFIG,StringDeserializer。class);KafkaConsumerString,StringconsumernewKafkaConsumer(config);consumer。subscribe(Collections。singletonList(topic));while(true){ConsumerRecordsString,Stringpollconsumer。poll(500);poll。forEach(recordSystem。out。println(String。format(topic:s,key:s,value:s,offset:d。,record。topic(),record。key(),record。value(),record。offset())));提交偏移量。consumer。commitSync();}}} 结果:消费端输出:topic:topic1,key:curtime,value:id:0,time:1582971209662。,offset:0。topic:topic1,key:curtime,value:id:1,time:1582971209859。,offset:1。topic:topic1,key:curtime,value:id:2,time:1582971209859。,offset:2。topic:topic1,key:curtime,value:id:3,time:1582971209859。,offset:3。topic:topic1,key:curtime,value:id:4,time:1582971209859。,offset:4。topic:topic1,key:curtime,value:id:5,time:1582971209859。,offset:5。topic:topic1,key:curtime,value:id:6,time:1582971209860。,offset:6。topic:topic1,key:curtime,value:id:7,time:1582971209860。,offset:7。topic:topic1,key:curtime,value:id:8,time:1582971209866。,offset:8。topic:topic1,key:curtime,value:id:9,time:1582971209867。,offset:9。 我们可以发现kafka的偏移量是从0开始的。 我们发现服务端日志:先去创建一个topic,去zk中。〔2020030102:10:26,840〕INFOTopiccreation{version:1,partitions:{0:〔0〕}}(kafka。admin。AdminUtils)〔2020030102:10:26,845〕INFO〔KafkaApi0〕Autocreationoftopictopic1with1partitionsandreplicationfactor1issuccessful(kafka。server。KafkaApis)〔2020030102:10:26,891〕INFO〔ReplicaFetcherManageronbroker0〕Removedfetcherforpartitionstopic10(kafka。server。ReplicaFetcherManager)〔2020030102:10:26,896〕INFOLoadingproducerstatefromoffset0forpartitiontopic10withmessageformatversion2(kafka。log。Log)创建日志文件。分区号是0〔2020030102:10:26,896〕INFOCompletedloadoflogtopic10with1logsegments,logstartoffset0andlogendoffset0in1ms(kafka。log。Log)〔2020030102:10:26,897〕INFOCreatedlogforpartition〔topic1,0〕inhomeadminkafkakafka2。110。11。0。3logswithproperties{compression。typeproducer,message。format。version0。11。0IV2,file。delete。delay。ms60000,max。message。bytes1000012,min。compaction。lag。ms0,message。timestamp。typeCreateTime,min。insync。replicas1,segment。jitter。ms0,preallocatefalse,min。cleanable。dirty。ratio0。5,index。interval。bytes4096,unclean。leader。election。enablefalse,retention。bytes1,delete。retention。ms86400000,cleanup。policy〔delete〕,flush。ms9223372036854775807,segment。ms604800000,segment。bytes1073741824,retention。ms604800000,message。timestamp。difference。max。ms9223372036854775807,segment。index。bytes10485760,flush。messages9223372036854775807}。(kafka。log。LogManager)消费者推出〔2020030102:12:03,532〕INFO〔GroupCoordinator0〕:Memberconsumer1f17da2c971ad4035b36676bdebba5951ingroupconsumer1hasfailed,removingitfromthegroup(kafka。coordinator。group。GroupCoordinator)topic topic的概念就是最小的主题单位,不能比他在小了。最起码也要有一个topic。是消费的最小主题。如果你学过RocketMQ那么他可能还有很多方式。 我们要知道kafka是一个读写均衡的中间件,所以他所做的将top分区处理,让组分区究竟做什么 我们继续测试。我们将服务器端不断发送。 此时客户端同时有两个客户端在一个组内。就是都是consumer1 此时发现只有最先接入的那个组可以收到消息,第二个不可以,当第一个客户端退出的时候,第二个客户端才去消费。 我这里有一张图: 客户端二先去消费去了。然后挂掉了。 客户端一此时就终于可以收到消息了。 此时我们发现客户端2成功拉去到了397。 服务器日志是:〔2020030102:13:41,787〕INFO〔GroupCoordinator0〕:Preparingtorebalancegroupconsumer1witholdgeneration4(consumeroffsets6)(kafka。coordinator。group。GroupCoordinator) 我们发现一个组内,是不可以同时消费的,而且他限制组内只允许一个人访问。 kafka会帮助我们记住这个组的偏移量,以组名字做区分,所以一个新的组,必须从最开始开始读,不能设置为null偏移量longoffsetproducer。send(newProducerRecord(topic1,curtime,String。format(id:d,time:d。,value,System。currentTimeMillis())),(metadata,exception){})。get()。offset();logger。info(偏移量:{},offset); 我们可以通过以下方式来每次获取偏移量。 消费端偏移量,是通过这个参数设计的,第一初始化组的时候不能使用none(因为Broker没有记录此消费者组的offset),但是可以使用earliest。如果其组内已经有人启动了,那么此时就算你设置earliest,也会根据组内人士偏移量决定的。config。put(ConsumerConfig。AUTOOFFSETRESETCONFIG,earliest);分区和组的关系 我们单机上再拷贝一个kafka,改成partition为2,只需要改以下三个参数,根据情况改,(懒得开台虚拟机,外网测试根本连不上,不知道为啥哈哈哈。)第一台我的是0,第二台是1broker。id1listenersPLAINTEXT::9093log。dirshomeadminkafka2kafka2。110。11。0。3logsnum。partitions2 代码:finalStringurlhadoop1:9092,hadoop1:9093;finalStringtopictopic2;同时我们打印partition的位置服务端RecordMetadatafutureproducer。send(newProducerRecord(topic2,curtimevalue,String。format(id:d,time:d。,value,System。currentTimeMillis())),(metadata,exception){})。get();logger。info(偏移量:{},分区:{}。,future。offset(),future。partition());客户端ConsumerRecordsString,Stringpollconsumer。poll(500);poll。forEach(recordSystem。out。println(String。format(topic:s,key:s,value:s,offset:d,partition:d。,record。topic(),record。key(),record。value(),record。offset(),record。partition())));consumer。commitAsync(); 启动。一台生产者,三个消费者在同一个组内。 我们发现: 生产者:2020022920:40:25,590390057〔main〕INFOcom。example。producer。Producer偏移量:610,分区:1。2020022920:40:26,112390579〔main〕INFOcom。example。producer。Producer偏移量:611,分区:1。2020022920:40:26,629391096〔main〕INFOcom。example。producer。Producer偏移量:508,分区:0。2020022920:40:27,153391620〔main〕INFOcom。example。producer。Producer偏移量:612,分区:1。2020022920:40:27,657392124〔main〕INFOcom。example。producer。Producer偏移量:613,分区:1。2020022920:40:28,164392631〔main〕INFOcom。example。producer。Producer偏移量:509,分区:0。 消费者1:topic:topic2,key:curtime756,value:id:756,time:1582980020546。,offset:502,partition:0。topic:topic2,key:curtime757,value:id:757,time:1582980021048。,offset:503,partition:0。topic:topic2,key:curtime758,value:id:758,time:1582980021552。,offset:504,partition:0。topic:topic2,key:curtime759,value:id:759,time:1582980022056。,offset:505,partition:0。topic:topic2,key:curtime760,value:id:760,time:1582980022561。,offset:506,partition:0。 消费者2:topic:topic2,key:curtime804,value:id:804,time:1582980044811。,offset:633,partition:1。topic:topic2,key:curtime810,value:id:810,time:1582980047837。,offset:634,partition:1。topic:topic2,key:curtime812,value:id:812,time:1582980048844。,offset:635,partition:1。topic:topic2,key:curtime813,value:id:813,time:1582980049347。,offset:636,partition:1。topic:topic2,key:curtime815,value:id:815,time:1582980050352。,offset:637,partition:1。topic:topic2,key:curtime820,value:id:820,time:1582980052870。,offset:638,partition:1。 消费者3:显然是死的,卡着不动,此时当我们断掉消费者2,此时会打印如下日志。成功均衡。020022920:41:49,409454117〔main〕INFO。internals。AbstractCoordinator(Re)joininggroupconsumer12020022920:41:51,850456558〔main〕INFO。internals。AbstractCoordinatorSuccessfullyjoinedgroupconsumer1withgeneration412020022920:41:51,851456559〔main〕INFO。internals。ConsumerCoordinatorSettingnewlyassignedpartitions〔topic21〕forgroupconsumer1topic:topic2,key:curtime912,value:id:912,time:1582980099342。,offset:690,partition:1。模拟down机,保证可靠性 当我们把一台机器关闭,生产者消费者都会抛出一下异常。2020022920:56:11,733202726〔ucer1〕WARNhe。kafka。clients。NetworkClientConnectiontonode1couldnotbeestablished。Brokermaynotbeavailable。2020022920:56:13,785204778〔ucer1〕WARNhe。kafka。clients。NetworkClientConnectiontonode1couldnotbeestablished。Brokermaynotbeavailable。抛出一次。就是无法和topic20也就是第0个分区联系。会等待30S超时,这个属性我们可以自己设置。Causedby:org。apache。kafka。common。errors。TimeoutException:Expiring1record(s)fortopic20:30084mshaspassedsincebatchcreationpluslingertime2020022920:56:40,313231306〔main〕INFOcom。example。producer。Producer偏移量:836,分区:1。 这个会不断的重试。失败就放弃,继续重试,其实这个partition策略我们可以自己写。 由于我们模拟的单线程操作,也就是会阻塞。所以很正常。正常开发都是多线程。但是这个超时是逃避不了的。比如一个web请求,你这里超时30S,这个绝对不行。 所以我们调整参数,改成3S。config。put(ProducerConfig。REQUESTTIMEOUTMSCONFIG,3000); 此时报错就是,这个超时时间自己根据业务把握,不一定越小越好。Causedby:org。apache。kafka。common。errors。TimeoutException:Expiring1record(s)fortopic20:3058mshaspassedsincebatchcreationpluslingertime 所以kafka的可靠性是极高的,不会因为一个broker挂掉了,业务就无法进行了。此时连不上会将数据全部写到另外一个分区中,当重新启动又会恢复平衡,所以可靠性极高。分区策略 简单实现一个org。apache。kafka。clients。producer。Partitioner接口吧。publicclassOrderPartitionerimplementsPartitioner{topic计数器。每个topic都维护一个计数器。这里可以考虑把map设置为安全的,因为会出现并发问题。privateHashMapString,AtomicIntegermapnewHashMap();privatestaticfinalFunctionString,AtomicIntegerprovidersnewAtomicInteger(0);这里业务逻辑其实不对,如果写入失败,那么永远也是Overridepublicintpartition(Stringtopic,Objectkey,byte〔〕keyBytes,Objectvalue,byte〔〕valueBytes,Clustercluster){ListPartitionInfolistcluster。availablePartitionsForTopic(topic);AtomicIntegerintegermap。computeIfAbsent(topic,provider);returninteger。incrementAndGet()list。size();}Overridepublicvoidclose(){清空释放内存map。clear();}Overridepublicvoidconfigure(MapString,?configs){}} 简单的加入到生成者的配置中去,config。put(ProducerConfig。PARTITIONERCLASSCONFIG,OrderPartitioner。class); 结果就是:发现很均匀。2020022921:38:53,3091358〔main〕INFOcom。example。producer。Producer偏移量:1224,分区:1。2020022921:38:53,8131862〔main〕INFOcom。example。producer。Producer偏移量:1006,分区:0。2020022921:38:54,3182367〔main〕INFOcom。example。producer。Producer偏移量:1225,分区:1。2020022921:38:54,8202869〔main〕INFOcom。example。producer。Producer偏移量:1007,分区:0。2020022921:38:55,3233372〔main〕INFOcom。example。producer。Producer偏移量:1226,分区:1。拦截器功能拦截器,必须传入一个集合,config。put(ProducerConfig。INTERCEPTORCLASSESCONFIG,Collections。singletonList(MyProducerInterceptor。class)); 简单写一个吧,publicclassMyProducerInterceptorimplementsProducerInterceptor{调用send方法会回调到这里。OverridepublicProducerRecordonSend(ProducerRecordrecord){System。out。println(MyProducerInterceptoronSend);returnrecord;}当服务器返回数据会调用这里。OverridepublicvoidonAcknowledgement(RecordMetadatametadata,Exceptionexception){System。out。println(MyProducerInterceptoronAcknowledgement);}Overridepublicvoidclose(){}Overridepublicvoidconfigure(MapString,?configs){}} 我们再看看打印日志:MyProducerInterceptoronSendOrderPartitionerMyProducerInterceptoronAcknowledgement2020022921:52:29,2623371〔main〕INFOcom。example。producer。Producer偏移量:1238,分区:1。 就是先拦截。后给分区去处理,如果你把send方法返回一个null。我的程序反正直接退出了,哈哈哈哈。因为org。apache。kafka。clients。producer。KafkaProducersend这里往下走,有个sendon方法里需要处理这个结果。所以基本上这个是用作包装或者统计,或者实现回调,让主线程可以不使用get,使其阻塞。就这个。总结一下。 一。一个分区只能被同一个组内某一个人消费。 其实就是一个消费者组和分区是一一对应的,也就是上面那个问题。 两个分区,但是一个消费组确有三个人或者更多人消费。此时只会有两个人,俩人各连一个分区。 二。分区数,比如我第一个topic1,一开始是一个分区,就算我服务器配置改成了两个分区,此时需要修改元信息。 三。一个分区默认就是有序的。不用考虑顺序性。对于顺序性比较强的业务可以考虑将其设置为一个分区,获取通过接口编写你所需要的需求。分区可以解决down机等问题,所以并不推荐 四。高可靠性,可以保证一台服务器down机,其他仍然可以处理(主机从机一样,他会自行选举,我两台机器都没啥问题)。 五。分区策略,灵活性。相信我这些基本满足你开发,。 以上虽然有大量的日志,是让大家方便理解。谢谢。其实对于大多数概念来说,比如自己练习练习,概念毕竟是概念,如何修改分区副本数量 分区副本是两个不同的概念。分区属于leader,副本属于follower,所以这里就是一个防止leaderDown机的问题,follower只会做一件事就是同步leader。但是follower不对外提供读写服务的,这里是为了防止数据不一致问题,因为从机跟随会有延时的。但是有些场景我感觉是满足的,因为kafka这种一个分区对应一个组的一个消费者很好地捆绑并不会发生不一致问题,最多也就是一个读慢了,个人觉得。 在线修改配置。这个简单,其实有些小伙伴经常查看zk的话,发现他就是将信息保存在zk中,,其实修改zk就可以了, 比如查看topic2的主题信息〔adminhadoop1bin〕。kafkatopics。shzookeeperlocalhost:2181describetopictopic2Topic:topic2PartitionCount:2ReplicationFactor:1Configs:Topic:topic2Partition:0Leader:1Replicas:1Isr:1Topic:topic2Partition:1Leader:0Replicas:0Isr:0 Leader:分区0的leader是主机1上,分区1的leader是主机0。 Replicas:分区0的副本在主机1上,分区1的副本在主机0上 ISR:表示副本跟随的进度。如果和副本主机号一致,说明跟随一致。 如果我们想修改分区数量:。kafkatopics。shzookeeperlocalhost:2181alterpartitions2topictopic1 修改会有一个警告信息,就是分区可能影响你原来的业务逻辑。不过提示成功了。〔adminhadoop1bin〕。kafkatopics。shzookeeperlocalhost:2181alterpartitions2topictopic1WARNING:Ifpartitionsareincreasedforatopicthathasakey,thepartitionlogicororderingofthemessageswillbeaffectedAddingpartitionssucceeded! 修改完赶紧测试一下。。发现,其实不同分区的偏移量都是独立计算的。不过也无所谓,2020022922:32:59,8761456〔main〕INFOcom。example。producer。Producer偏移量:0,分区:1。2020022922:33:00,4081988〔main〕INFOcom。example。producer。Producer偏移量:638,分区:0。2020022922:33:00,9152495〔main〕INFOcom。example。producer。Producer偏移量:1,分区:1。 从机选举成主机的过程,两台机器无法实现,必须是至少一个leader和两个follower才可以选举成功。实在懒得测试请求理解。哈哈哈哈。其实这些都是运维做的。可以感兴趣再测试一下。很简单的。kafkaconsoleconsumer。shbootstrapserverlocalhost:9092topictopicNamefrombeginning下一期深入理解kafka 这一节只是了解了如何使用,这个根本不够我们需要学习kafka是如何做的。实现高读写,高可靠性,高拓展性,这是一个分布式设计必备的。