专栏电商日志财经减肥爱情
投稿投诉
爱情常识
搭配分娩
减肥两性
孕期塑形
财经教案
论文美文
日志体育
养生学堂
电商科学
头戴业界
专栏星座
用品音乐

Kafka基于Windows的Kafka有关环境搭建以及使用

  前言:基于Windows系统下的Kafka环境搭建;以及使用。NET6环境进行开发简单的生产者与消费者的演示。
  一、环境部署
  Kafka是使用Java语言和Scala语言开发的,所以需要有对应的Java环境,以及Scala语言环境。
  Java环境配置,如果不清楚的,可以查看鄙人的另一篇博客:
  https:www。cnblogs。comweskynetp14852471。html
  https:www。scalalang。orgdownloadscala2。html
  要选择Binaries版本的环境,否则需要自己编译:
  2、Kafka基于Zookeeper环境运行,zookeeper提供给kafka一系列的功能支持,所以还需要安装Zookeeper有关的环境。下载zookeeper地址:
  https:zookeeper。apache。orgreleases。htmldownload
  3、同样,Zookeeper也需要下载带bin的链接,没有带bin的链接,可能是源码,需要自己编译:
  https:kafka。apache。orgdownloads。html
  5、同样需要选择下载binary版本,然后根据scala的版本选择对应的版本。
  6、下载的三个安装包,如图所示:
  7、先安装Scala语言包环境:
  8、验证Scala语言包是否安装成功:
  控制台窗口,输入:scalaversion
  如果提示类似如下有关版本信息,则代表安装成功。
  9、然后是安装zookeeper环境。必须先启动zookeeper,才可以使用kafka。
  安装zookeeper环境,先解压下载的包,然后在解压后的目录下新增data文件夹
  10、然后复制data文件夹的绝对路径,备用。在conf文件夹下,编辑cfg文件
  11、在cfg文件内,修改dataDir指定为上面新建的data文件夹的绝对路径。注意路径是斜杠,如果要使用反斜杆,需要写双反斜杠
  12、也要更改cfg格式的文件名称为zoo。cfg否则zookeeper无法识别配置文件。Zoo。cfg文件是zookeeper启动时候自动关联的默认配置文件名称。
  13、然后新建环境变量ZOOKEEPERHOME:
  14、环境变量path新增:ZOOKEEPERHOMEbin
  15、启动zookeeper,直接任意打开控制台,输入zkServer
  16、如果都没有报错,一般是启动成功了的。再次验证下,可以任意开个控制台,输入JPS进行查看,如下图所示,有JPS、也有QuorumPeerMain,代表zookeeper启动成功了。
  17、Kafka环境安装。先解压,然后在解压后的目录下,新增logs文件夹
  18、然后在Config文件夹下,修改server。properties文件,修改log。dirs的值为新增的logs文件夹的绝对路径
  19、进入到解压后的kafka目录下,在路径栏输入cmd,快速打开当前文件夹下的控制台窗口:
  20、输入命令:
  。binwindowskafkaserverstart。bat。configserver。properties
  进行启动Kafka服务:
  21、启动Kafka报错了,可能是版本问题,kafka一般新版本对windows环境不友好,所以降级一下。此处我把kafka3。0降级为2。8:
  22、此处我下载的版本为2。132。8。1,各位大佬们可以按照自己意愿选择版本。可能2。x版本和3。x版本跨度比较大,所以3。0版本没法玩。
  23、然后是重复以上配置kafka有关的动作,修改有关配置文件以及新增logs文件夹等。此处省略。
  24、接着在低版本的kafka目录下,快速进入当前解压缩的目录下,再次输入有关命令尝试一下:
  25、没有提示错误,根据提示信息,代表是启动成功了。任意打开控制台,再输入JPS查看下,可以看到Kafka,确认是启动OK了。
  https:www。kafkatool。comdownload。html
  27、安装可视化工具,默认可以一直下一步:
  28、可以在安装目录下把可执行程序发送到桌面快捷方式,方便打开。
  29、一些配置,包括名称、kafka版本、端口号、服务地址等
  30、连接以后的效果图,如下。Topic是空的,接下来写点代码。
  二、代码开发与测试
  31、新建类库项目,当作kafka服务类库
  32、此处选择标准库2。1,用于可以给多种。netcore版本使用,方便兼容。
  33、引用Confluent。Kafka包。
  34、此处新增发布服务类和订阅服务类:
  35、新增的生产者发布服务方法代码如下:
  代码:
  Description:Kafka生产者发布服务
  CreateTime:202212119:35:27
  Author:Wesky
  summary
  publicclassPublishService:IPublishService
  {
  publicasyncTaskPublishAsync(stringbroker,stringtopicName,TMessagemessage)whereTMessage:class
  {
  varconfignewProducerConfig
  {
  BootstrapServersbroker,kafka服务集群,例如192。168。0。1:9092,192。168。0。2:9092或者单机192。168。0。1:9092
  AcksAcks。All,
  MessageSendMaxRetries3,发送失败重试的次数
  };
  using(varproducernewProducerBuilderspanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,string(config)。Build())
  {
  try
  {
  stringdataNewtonsoft。Json。JsonConvert。SerializeObject(message);
  varsendDatanewMessagespanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,string{KeyGuid。NewGuid()。ToString(N),Valuedata};
  varreportawaitproducer。ProduceAsync(topicName,sendData);
  Console。WriteLine(消息:{data}r发送到:{report。TopicPartitionOffset});
  }
  catch(ProduceExceptionspanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspan,stringex)
  {
  Console。WriteLine(消息发送失败:rCode{ex。Error。Code}rError{ex。Message});
  }
  }
  }
  }
  36、新增的消费者接收服务方法代码如下:
  代码:
  Description:kafka消费者订阅服务
  CreateTime:202212119:36:25
  Author:Wesky
  summary
  publicclassSubscribeService:ISubscribeService
  {
  消费者服务核心代码
  summary
  消费者配置信息param
  主题集合param
  param
  param
  publicasyncTaskSubscribeAsync(ConsumerConfigconfig,IEnumerablespanstylefontsize:inherit;lineheight:1。5;color:rgb(0,0,255);stringspantopics,Actionfunc,CancellationTokencancellationToken)whereTMessage:class
  {
  constintcommitPeriod1;
  using(varconsumernewConsumerBuilderIgnore,string(config)
  。SetErrorHandler((,e)
  {
  Console。WriteLine(消费错误:{e。Reason});
  })
  。SetStatisticsHandler((,json)
  {
  Console。WriteLine();
  })
  。SetPartitionsAssignedHandler((c,partitionList)
  {
  stringpartitionsstring。Join(,,partitionList);
  Console。WriteLine(分配的分区:{partitions});
  })
  。SetPartitionsRevokedHandler((c,partitionList)
  {
  stringpartitionsstring。Join(,,partitionList);
  Console。WriteLine(回收的分区:{partitions});
  })
  。Build())
  {
  consumer。Subscribe(topics);
  try
  {
  while(true)
  {
  try
  {
  varconsumeResultconsumer。Consume(cancellationToken);
  if(consumeResult。IsPartitionEOF)
  {
  continue;
  }
  if(consumeResult?。OffsetcommitPeriod0){
  try
  {
  varresultJsonConvert。DeserializeObject(consumeResult。Message?。Value);
  func(result);消费消息
  }
  catch(Exceptionex)
  {
  Console。WriteLine(消费业务处理失败:{ex。Message});
  }
  try
  {
  consumer。Commit(consumeResult);手动提交
  Console。WriteLine(消费者消费完成,已提交);
  }
  catch(KafkaExceptione)
  {
  Console。WriteLine(提交错误:{e。Error。Reason});
  }
  }
  }
  catch(ConsumeExceptione)
  {
  Console。WriteLine(消费错误:{e。Error。Reason});
  }
  }
  }
  catch(Exceptione)
  {
  Console。WriteLine(其他错误:{e。Message});
  consumer。Close();
  }
  }
  awaitTask。CompletedTask;
  }
  }pre
  37、并且提供对应的接口服务,用于开放给外部调用,或者提供依赖注入使用:
  38、新建一个控制台项目,用来当作消费者端的测试,并且新增一个方法,用来当作消费者接收到消息以后的业务处理方法体。此处控制台环境版本为。NET6
  39、消费客户端代码如下。其中,BootstrapServers也可以提供集群地址,例如ip1:port,ip2:port服务之间以半角逗号隔开。
  40、再新增一个webapi项目,用来当作生产者的客户端进行发送数据。以及对kafka服务类部分进行依赖注入注册,此处使用单例。该webapi此处使用。NET6环境,带有控制器的模式。
  41、新增的控制器里面,进行生产者的注入与实现。注意:topicName参数对应上边的topicwesky,通过主题绑定,否则消费者不认识就没办法消费到了。
  控制器代码:
  〔Route(api〔controller〕〔action〕)〕
  〔ApiController〕
  publicclassProducerController:ControllerBase
  {
  IPublishServiceservice;
  publicProducerController(IPublishServicepublishService)
  {
  servicepublishService;
  }
  〔HttpPost〕
  publicIActionResultSendMessage(stringbroker,stringtopicName,stringmessage)
  {
  service。PublishAsync(broker,topicName,message);
  returnOk();
  }
  }
  42、接下来是一些测试,如图所示:
  43、最后,使用可视化管理工具Offset进行查看,可以看到对应的主题。选中主题,可以设置数据类型,这里我设置为字符串,就可以查看到对应的消息内容了。如果没有设置,默认是16进制的数据。
  44、查看刚刚测试时候收发的消息队列里面的数据,如下所示:
  45、一些额外补充:
  Kafka也是消息队列的一种,用于在高吞吐量场景下使用比较适合。如果是轻量级的,只需要用于削峰,可以使用RabbitMQ。
  以上只是简单的操作演示,至于要用得溜,观众朋友们可以自行补充所需的相关理论知识。
  可视化工具还有一款yahoo提供的开源的工具,叫kafkamanager,有兴趣的大佬们可以自行玩玩,开源地址:
  https:github。comyahooCMAK
  还有一款滴滴平台做的开源的kafka运维管理平台,有兴趣的大佬们也可以自行了解,地址:
  https:github。comdidiLogiKM
  以上就是该博客的全部内容,感谢各位大佬们的观看

盘点吉利2月销量新能源占比超两成,高端车型成为主力军随着时间来到了三月初,各大车企陆陆续续公布了2月份的销量数据。由于今年春节假期在1月份的缘故,加上整个社会经济面持续向好,这些因素综合在一起,使得2月份各大车企取得了不错的销量数据两会丨全国人大代表骆驼股份董事长刘长来进一步规范退役动力电池回收利用点蓝字关注,不迷路资源循环利用,不仅可以解决我国资源约束性矛盾和环境污染问题,也有利于推动双碳战略目标落地。全国人大代表骆驼股份董事长刘长来表示,今年主要围绕资源循环利用与高质量发欣旺达的动力电池狂想欣旺达(300207。SZ)又要再融资了。根据3月9日公告,欣旺达拟定增募资不超过48亿元,用于总投资56。01亿元的SiP系统封测项目高性能消费类圆柱锂离子电池项目和补充流动资金一企业出售合肥地皮,获政府5。8亿补偿!近日,统一企业中国公布,出售合肥工业地皮予土地储备中心,获补偿约5。83亿元,预期自出售事项确认除税前未经审核收益净额约3。51亿元。据悉,由于近期安徽省合肥经济技术开发区进行城市财经聚焦2月份CPI和PPI同比涨幅回落物价持续平稳运行国家统计局9日发布数据,2月份,全国居民消费价格指数(CPI)同比上涨1,涨幅比上月回落1。1个百分点全国工业生产者出厂价格指数(PPI)同比下降1。4,降幅比上月扩大0。6个百分我型我秀奔七奶奶青春洋溢我今年67岁,网名叫红珊瑚,江苏南京人。虽然是位奔七奶奶,但我觉得自己还可以像年轻人一样穿的时尚,穿的有个性。(受访者供图)浅粉上衣加黑色休闲裤,配上斜挎小包和黑色墨镜,营造一种轻真是横的怕楞的!美威胁将击落朝鲜导弹,朝鲜立马就发射了一颗韩国军方证实称,朝鲜当地时间星期四(3月9日)向其西海岸水域发射了一枚短程弹道导弹。值得注意的是,美国此前曾公开威胁称将击落朝鲜导弹,对此朝鲜立即强硬回击警告美国若这样做将被视为对眼馋吗?7种塔吉克斯坦特色小食在塔吉克斯坦,糖果和甜点是节日宴会的一个重要组成部分,不仅大人和小孩喜欢吃,甚至减肥的人也无法抗拒甜食的魔力。帕什玛克酥糖这种酥糖也叫龙须糖,是一种用面粉和糖,在黄油中烘烤而成的细身边的大国工匠身边的大国工匠原标题15年来田得梅零失误完成巨型精密装置吊装(引题)解锁毫厘间的操作密码(主题)工人日报中工网记者邢生祥2月28日晚,2022年大国工匠年度人物在南京揭晓。颁奖晚会举报虽然乌龙警示切莫忽视来源台海网秋海棠近日厦门有市民举报,称位于湖里区的一家公司养了数只鸳鸯。湖里自然资源综合执法大队工作人员立即赶往现场调查取证,没想到鸳鸯却是绿头鸭。原来,绿头鸭和鸳鸯外貌有相似之处春天第一口吃什么?良品铺子一大波新品零食春季上新17年来,在寻找好原料打造高品质零食的路上,良品铺子始终没有停下脚步。阳春三月,良品铺子门店内的新品无核加州大西梅椰奶银耳蛋糕野山笋等纷纷上市,以满足消费者春季出游的需求。每一口都
拳王维尔德心态被揭露!知情者他如今矛盾重重,不知何去何从青铜轰炸机德昂泰维尔德如今职业生涯岌岌可危,这位前WBC重量级拳王至今没有进行训练,更没有确定下一场比赛。过去几个月间,维尔德的行为显得有些怪异,一会在推特上疯狂刷屏,一会又悄无声中国女排,改编电影夺冠2019年国庆节,当郎平率女排队员从天安门前经过时,一时间万众瞩目。这一幕让人不由得想起36年前的这张照片。这两个中国女排史上最辉煌的时刻,都有着同一个人,郎平。彼时的郎平不仅是当再过一个月,伊犁又将惊艳全国!美丽的伊犁欢迎您!春雷响,万物长,今日惊蛰,是二十四节气中的第三个节气,标志着仲春时节的开始。随着气温慢慢地回升,伊犁真正的春天正式登场,那拉提草原上的野百合将随着冰雪的消融,顶冰而出,悄然绽放!杏俄罗斯美女告诉你只要3000人民币,你就能享受到特殊服务出国旅游只有3000元的话,对我们中国的百姓来说,根本就不算什么,不过我国的游客觉得,钱这么少能游玩的好吗?(此处已添加小程序,请到今日头条客户端查看)不过,如果你的旅游目的地是俄加拿大境内旅游签转工签政策再次延期政策终于延续了,昨天还在讨论会怎么样,今天加拿大就宣布,加拿大境内旅游签转工签政策再次延期2年至2025年02月28日。加拿大旅游签转工作签证政策从2020年8月25日,加拿大移民乘坐中老铁路去老挝,最全通关秘籍和注意事项汇总老挝,旅游资源居于东南亚末流,无法与泰国马来西亚菲律宾相提并论。但在境外游还未全面放开的2023年,老挝凭借与我国接壤优势意外火了。如今到此不需要核酸报告和疫苗证明,出国和回国手续黑龙江牡丹江冰雪逐渐消融冰凌花破冰绽放2023年3月6日,黑龙江牡丹江,在牡丹峰冰雪尚未完全褪去的山坡上,一些花朵直径一厘米左右的嫩黄色小花在冰凌中悄悄绽放。图片来源视觉中国据了解,冰凌花即侧金盏花。侧金盏花植株矮小,第一二批开放出境团体游的国家名单2023年2月6日起,试点恢复团队出境旅游国家泰国印度尼西亚柬埔寨马尔代夫斯里兰卡菲律宾马来西亚新加坡老挝阿联酋埃及肯尼亚南非俄罗斯瑞士匈牙利新西兰斐济古巴阿根廷。出境游热度不断升土耳其联赛发生骚乱,球员在混乱中踢完比赛据土耳其媒体turkishminute6日报道,土耳其足球第三级别联赛当地时间上周日(5日)发生严重骚乱事件。阿梅德体育球员球迷遭到了布尔萨体育球迷的种族主义攻击。比赛期间,球场上三场比赛后,客观评价有杜兰特的太阳队的新首发阵容坦诚地讲,自从凯文杜兰特到来后,菲尼克斯太阳队刚开始就变得十分流畅,这让人有些震惊。他们在三个客场比赛中取得了30的成绩,首发球员贡献出了无与伦比的竞技状态。即使太阳的新首发阵容合火箭,22连胜时期的姚麦之外的火箭球员们,后来去哪了?火箭,22连胜时期的姚麦之外的火箭球员们,后来去哪了?20072008赛季,对于休斯敦火箭队的球迷们来说,绝对是难忘的一个赛季,当赛季,在阿德尔曼的率领下,球队打出了一波22连胜。
友情链接:快好找快生活快百科快传网中准网文好找聚热点快软网