前言:基于Windows系统下的Kafka环境搭建;以及使用。NET6环境进行开发简单的生产者与消费者的演示。 一、环境部署 Kafka是使用Java语言和Scala语言开发的,所以需要有对应的Java环境,以及Scala语言环境。 Java环境配置,如果不清楚的,可以查看鄙人的另一篇博客: https:www。cnblogs。comweskynetp14852471。html https:www。scalalang。orgdownloadscala2。html 要选择Binaries版本的环境,否则需要自己编译: 2、Kafka基于Zookeeper环境运行,zookeeper提供给kafka一系列的功能支持,所以还需要安装Zookeeper有关的环境。下载zookeeper地址: https:zookeeper。apache。orgreleases。htmldownload 3、同样,Zookeeper也需要下载带bin的链接,没有带bin的链接,可能是源码,需要自己编译: https:kafka。apache。orgdownloads。html 5、同样需要选择下载binary版本,然后根据scala的版本选择对应的版本。 6、下载的三个安装包,如图所示: 7、先安装Scala语言包环境: 8、验证Scala语言包是否安装成功: 控制台窗口,输入:scalaversion 如果提示类似如下有关版本信息,则代表安装成功。 9、然后是安装zookeeper环境。必须先启动zookeeper,才可以使用kafka。 安装zookeeper环境,先解压下载的包,然后在解压后的目录下新增data文件夹 10、然后复制data文件夹的绝对路径,备用。在conf文件夹下,编辑cfg文件 11、在cfg文件内,修改dataDir指定为上面新建的data文件夹的绝对路径。注意路径是斜杠,如果要使用反斜杆,需要写双反斜杠 12、也要更改cfg格式的文件名称为zoo。cfg否则zookeeper无法识别配置文件。Zoo。cfg文件是zookeeper启动时候自动关联的默认配置文件名称。 13、然后新建环境变量ZOOKEEPERHOME: 14、环境变量path新增:ZOOKEEPERHOMEbin 15、启动zookeeper,直接任意打开控制台,输入zkServer 16、如果都没有报错,一般是启动成功了的。再次验证下,可以任意开个控制台,输入JPS进行查看,如下图所示,有JPS、也有QuorumPeerMain,代表zookeeper启动成功了。 17、Kafka环境安装。先解压,然后在解压后的目录下,新增logs文件夹 18、然后在Config文件夹下,修改server。properties文件,修改log。dirs的值为新增的logs文件夹的绝对路径 19、进入到解压后的kafka目录下,在路径栏输入cmd,快速打开当前文件夹下的控制台窗口: 20、输入命令: 。binwindowskafkaserverstart。bat。configserver。properties 进行启动Kafka服务: 21、启动Kafka报错了,可能是版本问题,kafka一般新版本对windows环境不友好,所以降级一下。此处我把kafka3。0降级为2。8: 22、此处我下载的版本为2。132。8。1,各位大佬们可以按照自己意愿选择版本。可能2。x版本和3。x版本跨度比较大,所以3。0版本没法玩。 23、然后是重复以上配置kafka有关的动作,修改有关配置文件以及新增logs文件夹等。此处省略。 24、接着在低版本的kafka目录下,快速进入当前解压缩的目录下,再次输入有关命令尝试一下: 25、没有提示错误,根据提示信息,代表是启动成功了。任意打开控制台,再输入JPS查看下,可以看到Kafka,确认是启动OK了。 https:www。kafkatool。comdownload。html 27、安装可视化工具,默认可以一直下一步: 28、可以在安装目录下把可执行程序发送到桌面快捷方式,方便打开。 29、一些配置,包括名称、kafka版本、端口号、服务地址等 30、连接以后的效果图,如下。Topic是空的,接下来写点代码。 二、代码开发与测试 31、新建类库项目,当作kafka服务类库 32、此处选择标准库2。1,用于可以给多种。netcore版本使用,方便兼容。 33、引用Confluent。Kafka包。 34、此处新增发布服务类和订阅服务类: 35、新增的生产者发布服务方法代码如下: 代码: Description:Kafka生产者发布服务 CreateTime:202212119:35:27 Author:Wesky summary publicclassPublishService:IPublishService { publicasyncTaskPublishAsync(stringbroker,stringtopicName,TMessagemessage)whereTMessage:class { varconfignewProducerConfig { BootstrapServersbroker,kafka服务集群,例如192。168。0。1:9092,192。168。0。2:9092或者单机192。168。0。1:9092 AcksAcks。All, MessageSendMaxRetries3,发送失败重试的次数 }; using(varproducernewProducerBuilderspanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,string(config)。Build()) { try { stringdataNewtonsoft。Json。JsonConvert。SerializeObject(message); varsendDatanewMessagespanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,string{KeyGuid。NewGuid()。ToString(N),Valuedata}; varreportawaitproducer。ProduceAsync(topicName,sendData); Console。WriteLine(消息:{data}r发送到:{report。TopicPartitionOffset}); } catch(ProduceExceptionspanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,stringex) { Console。WriteLine(消息发送失败:rCode{ex。Error。Code}rError{ex。Message}); } } } } 36、新增的消费者接收服务方法代码如下: 代码: Description:kafka消费者订阅服务 CreateTime:202212119:36:25 Author:Wesky summary publicclassSubscribeService:ISubscribeService { 消费者服务核心代码 summary 消费者配置信息param 主题集合param param param publicasyncTaskSubscribeAsync(ConsumerConfigconfig,IEnumerablespanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspantopics,Actionfunc,CancellationTokencancellationToken)whereTMessage:class { constintcommitPeriod1; using(varconsumernewConsumerBuilderIgnore,string(config) 。SetErrorHandler((,e) { Console。WriteLine(消费错误:{e。Reason}); }) 。SetStatisticsHandler((,json) { Console。WriteLine(); }) 。SetPartitionsAssignedHandler((c,partitionList) { stringpartitionsstring。Join(,,partitionList); Console。WriteLine(分配的分区:{partitions}); }) 。SetPartitionsRevokedHandler((c,partitionList) { stringpartitionsstring。Join(,,partitionList); Console。WriteLine(回收的分区:{partitions}); }) 。Build()) { consumer。Subscribe(topics); try { while(true) { try { varconsumeResultconsumer。Consume(cancellationToken); if(consumeResult。IsPartitionEOF) { continue; } if(consumeResult?。OffsetcommitPeriod0){ try { varresultJsonConvert。DeserializeObject(consumeResult。Message?。Value); func(result);消费消息 } catch(Exceptionex) { Console。WriteLine(消费业务处理失败:{ex。Message}); } try { consumer。Commit(consumeResult);手动提交 Console。WriteLine(消费者消费完成,已提交); } catch(KafkaExceptione) { Console。WriteLine(提交错误:{e。Error。Reason}); } } } catch(ConsumeExceptione) { Console。WriteLine(消费错误:{e。Error。Reason}); } } } catch(Exceptione) { Console。WriteLine(其他错误:{e。Message}); consumer。Close(); } } awaitTask。CompletedTask; } }pre 37、并且提供对应的接口服务,用于开放给外部调用,或者提供依赖注入使用: 38、新建一个控制台项目,用来当作消费者端的测试,并且新增一个方法,用来当作消费者接收到消息以后的业务处理方法体。此处控制台环境版本为。NET6 39、消费客户端代码如下。其中,BootstrapServers也可以提供集群地址,例如ip1:port,ip2:port服务之间以半角逗号隔开。 40、再新增一个webapi项目,用来当作生产者的客户端进行发送数据。以及对kafka服务类部分进行依赖注入注册,此处使用单例。该webapi此处使用。NET6环境,带有控制器的模式。 41、新增的控制器里面,进行生产者的注入与实现。注意:topicName参数对应上边的topicwesky,通过主题绑定,否则消费者不认识就没办法消费到了。 控制器代码: 〔Route(api〔controller〕〔action〕)〕 〔ApiController〕 publicclassProducerController:ControllerBase { IPublishServiceservice; publicProducerController(IPublishServicepublishService) { servicepublishService; } 〔HttpPost〕 publicIActionResultSendMessage(stringbroker,stringtopicName,stringmessage) { service。PublishAsync(broker,topicName,message); returnOk(); } } 42、接下来是一些测试,如图所示: 43、最后,使用可视化管理工具Offset进行查看,可以看到对应的主题。选中主题,可以设置数据类型,这里我设置为字符串,就可以查看到对应的消息内容了。如果没有设置,默认是16进制的数据。 44、查看刚刚测试时候收发的消息队列里面的数据,如下所示: 45、一些额外补充: Kafka也是消息队列的一种,用于在高吞吐量场景下使用比较适合。如果是轻量级的,只需要用于削峰,可以使用RabbitMQ。 以上只是简单的操作演示,至于要用得溜,观众朋友们可以自行补充所需的相关理论知识。 可视化工具还有一款yahoo提供的开源的工具,叫kafkamanager,有兴趣的大佬们可以自行玩玩,开源地址: https:github。comyahooCMAK 还有一款滴滴平台做的开源的kafka运维管理平台,有兴趣的大佬们也可以自行了解,地址: https:github。comdidiLogiKM 以上就是该博客的全部内容,感谢各位大佬们的观看