范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Kafka系统与ELK的整合(八)

  我们使用Apache Flume来采集数据到Kafka中进行存储,最后在ELK中展示出来。到http://flume.apache.org/的地址下载Apache Flume,下载后部署在日志的服务器。下载后进行解压以及配置到环境变量中。整体思路是在拉勾网搜索"测试开发工程师",把获取到的结果信息存储到Kafka的系统中,最后展示在ELK中。下面具体配置这些信息。在conf的目录下编辑文件,文件内容为:#设置代理名agent.sources=s1 agent.channels=c1 agent.sinks=k1  #设置收集方式agent.sources.s1.type=exec agent.sources.s1.command=tail -F  /Applications/devOps/bigData/ELK/apache-flume/logs/apps.log agant.sources.s1.channels=c1 agent.channels.c1.type=memory agent.channels.c1.capacity=10000apage.channels.c1.transactionCapacity=100#设置kafka接收器agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink #设置kafka的broker和端口号agent.sinks.k1.brokerList=localhost:9092#设置kafka的topicagent.sinks.k1.topic=laGou #设置序列化agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder #指定管道名agent.sinks.k1.channel=c1
  这里使用的主题是laGou,切记此时需要启动Kafka。下来启动Apache Flume,在apache-flume/bin的执行如下命令来启动,命令为:flume-ng agent -n agent --conf conf --conf-file ../conf/flume-kafka.properties  -Dflume.root.logger=DEBUG,CONSOLE
  执行后,输出如下的信息:
  下来使用分流数据的方式来实现数据的展示,具体可以理解为把采集到的数据存储到Kafka系统中,然后使用LogStash来消费Kafka存储的数据,并将消费后的数据存储到ElasticSearch中。下来配置logstash.yml的文件,配置LogStash账户和密码,具体如下:
  配置kafka_laGou.conf,具体内容为:
  配置完成后,在控制台中LogStach来消费Kafka集群中主题为laGou的数据,到LogStash的bin目录下执行:./logstash -f ../config/kafka_laGou.conf
  执行后,LogStash的Agent将正常启动并消费Kafka集群中的数据,然后把消费后的数据存储到ElasticSearch集群中,执行后,输出如下信息:Sending Logstash"s logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties[2021-06-12T18:39:43,175][WARN ][logstash.config.source.multilocal] Ignoring the "pipelines.yml" file because modules or command line options are specified [2021-06-12T18:39:43,210][FATAL][logstash.runner          ] Logstash could not be started because there is already another instance using the configured data directory.  If you wish to run multiple instances, you must change the "path.data" setting. [2021-06-12T18:39:43,221][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exitlocalhost:bin liwangping$ clear localhost:bin liwangping$ ./logstash -f ../config/kafka_laGou.conf Sending Logstash"s logs to /Applications/devOps/bigData/ELK/logstash/logs which is now configured via log4j2.properties[2021-06-12T18:40:31,712][WARN ][logstash.config.source.multilocal] Ignoring the "pipelines.yml" file because modules or command line options are specified [2021-06-12T18:40:32,136][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.3.2"} [2021-06-12T18:40:33,674][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50} [2021-06-12T18:40:34,092][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://elastic:xxxxxx@localhost:9200/]}} [2021-06-12T18:40:34,111][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://elastic:xxxxxx@localhost:9200/, :path=>"/"} [2021-06-12T18:40:34,426][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://elastic:xxxxxx@localhost:9200/"} [2021-06-12T18:40:34,505][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6} [2021-06-12T18:40:34,508][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the `type` event field won"t be used to determine the document _type {:es_version=>6}[2021-06-12T18:40:34,528][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]} [2021-06-12T18:40:34,544][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil} [2021-06-12T18:40:34,561][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}} [2021-06-12T18:40:34,584][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#"} [2021-06-12T18:40:34,670][INFO ][logstash.outputs.elasticsearch] Installing elasticsearch template to _template/logstash [2021-06-12T18:40:34,676][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} [2021-06-12T18:40:34,691][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values: auto.commit.interval.ms = 5000auto.offset.reset = latest bootstrap.servers = [localhost:9092] check.crcs = true client.id = logstash-0connections.max.idle.ms = 540000enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800fetch.max.wait.ms = 500fetch.min.bytes = 1group.id = console-consumer-83756heartbeat.interval.ms = 3000interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576max.poll.interval.ms = 300000max.poll.records = 500metadata.max.age.ms = 300000metric.reporters = [] metrics.num.samples = 2metrics.recording.level = INFO metrics.sample.window.ms = 30000partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536reconnect.backoff.max.ms = 1000reconnect.backoff.ms = 50request.timeout.ms = 305000retry.backoff.ms = 100sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05sasl.kerberos.ticket.renew.window.factor = 0.8sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072session.timeout.ms = 10000ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer  [2021-06-12T18:40:34,797][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.1.0[2021-06-12T18:40:34,798][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : fdcf75ea326b8e07 [2021-06-12T18:40:35,011][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: E0qvXyu_T_Wr_vZgZUV80w [2021-06-12T18:40:35,024][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null) [2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Revoking previously assigned partitions [] [2021-06-12T18:40:35,029][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] (Re-)joining group [2021-06-12T18:40:35,047][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600} [2021-06-12T18:40:35,149][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Successfully joined group with generation 1[2021-06-12T18:40:35,151][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Setting newly assigned partitions [laGou-0, laGou-1, laGou-2, laGou-3, laGou-4, laGou-5] [2021-06-12T18:40:35,168][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-0 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-1 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-2 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-3 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-4 to offset 1.[2021-06-12T18:40:35,169][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=console-consumer-83756] Resetting offset for partition laGou-5 to offset 0.
  此时,在Kafka的监控系统中可以看到主题laGou消费的详细信息,如下所示:
  下来实现数据的可视化,把数据存储到ElasticSearch的集群后,就可以通过Kibana来查询和分析数据。在ManageMent里面创建索引后,点击Discover模块,然后就会展示消费到的拉勾网的测试开发职位的数据,如下所示:
  可以使用不同的索引来查询,比如使用message来查询,就会显示如下的信息:
  当然也可以点击查看完整的数据,点击向右的箭头,就可以使用table格式和JSON格式来展示具体的数据。
  感谢您的阅读和关注,后续会持续更新!

快讯!康文森与戴姆勒达成专利和解!Avanci汽车专利池再进一步作者黄莺4月23日,据国外媒体Juvepatent报道,专利运营公司康文森(Conversant)与德国汽车制造商戴姆勒的专利诉讼纠纷在上周达成双方和解。但并未有更多细节披露。此次携2件发明专利闯关科创板,用友汽车科创属性有点软6月28日,用友汽车科创板上市申请被正式受理。在吉利汽车终止科创板之后,谁会成为科创板汽车第一股,成为一个悬念。用友汽车的到来,并不会成为人们期待中的科创板汽车第一股,虽然挂着汽车极致产品力加持,登顶珠峰的荣耀30持续热销,3K到5K价位无敌要说最近比较火的新闻,一定有首登珠峰60周年纪念活动,中国登山队重登珠峰完成2020珠峰高程测量任务。值得一提的是,这登山队员执行这项任务时,还有一个特殊的伙伴,即荣耀30系列。登激光雷达,又一个被美国专利卡脖子的产品作者黄莺本周,两家激光雷达企业在中美两个资本市场同时表现不佳,一家上市首日以微跌收盘,另一家则终止上市进程。一直大步快跑的激光雷达,为何在资本面前突然变了天?美国东部时间3月12日拖拉机之王!247KMH,堪比法拉利,网友不想在公路上看到它说起拖拉机,鸡哥相信很多网友的脑海里会出现这么一个画面。锈迹斑斑,浑身土灰,开起来慢吞吞的,发动机一响整条街都是哐哐哐的笑声。而随着时代的发展,人们生活条件越来越好,别说在城市里见界读丨亿亿亿倍!中国成功研制祖冲之二号与九章二号欧界报道近日中国量子计算领域接连有两个好消息,这两个重大进展意味着我国成为了当前世界上唯一一个在两种物理体系上,达到了所谓量子计算优越性的国家。量子计算的屠龙刀和倚天剑,中国一下子界读丨卢伟冰狂吹的这颗芯片真的那么牛?果然是神机全靠吹欧界报道前几日红米Note11系列举行发布会,声势浩大,尤其是120W快充得到一众追捧。不仅如此,这次的外观设计也得到了很大的提升,Note11系列确实能靠脸吃饭。但经过几轮发烧友界读丨全球第4大芯片代工厂格芯登陆美股,却强调在中国之外欧界报道10月28日晚10点,名列全球第4位的芯片代工厂格芯(GlobalFoundries)正式登陆美股纳斯达克,IPO总价高达26亿美元。在这次登陆美股中,我们能发现格芯隐晦地界读丨反转又反转?!台积电声明拒绝向美移交客户机密数据欧界报道这个月关于台积电软硬的反转,真的太多了。9月末,美国要求台积电三星等芯片代工巨头在11月8日前提交相关的客户机密数据。网络上关于台积电是否屈服的传闻不绝于耳。讽刺的是,关于全网性价比最高的手机,价格竟然不到3000块,小米都怕了说到性价比手机,大家第一个想到的肯定都是小米,但是今天小编要给大家介绍的,是vivos10,号称全网最高性价比的手机!快拿好小本本,小编我要开始划重点了!我们可以直观的看到VIVO2021真香打脸旗舰华为Mate40pro绝对YYDS终于说服男朋友给我换手机了,虎视眈眈盯了好久iphone12,死都不肯给我买,硬生生要给我买华为Mate40pro,据说这个大猪蹄子说还托关系等了好久呢,呵呵,谁稀罕啊在用了半个月
广电5G运营加速!河南江苏宁夏纷纷注册中广电移动分公司DVBCN注意到,短短两日,就公开可查询信息中在中广电移动网络有限公司省级分公司已经注册了三家,中广电移动网络有限公司河南分公司中广电移动网络有限公司江苏分公司中广电移动网络有限公知网,一家资本独大的后果机哥首页的盆友们都在讨论,关于知网的消息。一听这个名字,就梦回大学知网,就是那个在毕业季,被无数学生心心念念的网站。无论是写论文期间查资料,还是写完论文后查重,无数年轻人在深夜,用消失10年的张朝阳突然上起物理课,现在的他居然这么接地气了最近,张朝阳的物理课突然上了热搜。这个消失在我们视野里快10年的互联网大佬突然又冒了出来,这难免让人好奇。打开视频,张朝阳正一手搭讲台一手插裤兜,很是放松地在解答着学员们的问题,而注意!这37款App被点名,速自查4月20日,工信部发布一批关于侵害用户权益行为的APP通报37款APP违规!通报称,依据个人信息保护法网络安全法电信条例电信和互联网用户个人信息保护规定等法律法规,工信部近期组织第硬核!北大教授给我的Java程序员面试题宝典,让我轻松进了大厂这套Java程序员面试题是从基础到入门再到进阶进行整理的,整体清晰条理,为了让大家能够轻松理解,整理成了电子版分享给大家。Java基础部分1。与区别?和都是逻辑运算符,都是判断两边预算2000元左右,入手哪一款手机最合适?预算2000元左右,入手哪一款手机最合适呢?首先要明确需求,手机要续航还是性能,这里推荐几款性价比高的手机供朋友们挑选。1Redmik40s红米K40S继承了红米K40的优秀焊门员最新显卡天梯图及简评最近等等党们终于等到显卡降价了,那些囤积炒显卡的商家可能要头疼一阵了。大家如果考虑入手显卡,不妨先参照一下这张图。看一下性能排名,再参照一下价格。前一段时间发布的两款甜品卡RTX3苹果13promax上手简评,8000多米大洋,是否符合你的预期?苹果13promax开篇老规矩,上配置图摄像头全新的传感器和镜头A15仿生芯片中的全新图像信号处理器驱动新版广角镜头采用的像素尺寸增加至1。9微米广角镜头低光拍摄时进光量提升高达2iPad选购2022完全指南丨最强游戏机mini升级,Air性价比不及当年春季发布会结束,一年一度的iPad完全选购指南又跟大家见面了。在这么长时间后,四个产品线也早已全员正式焕新,包括迟迟不见变化的mini也以全面屏新姿态登场。但更为重要的是,高端线的2022Q1手机销量排名出炉,OPPO稳居第四,国产品牌未来可期近日,知名数据机构Canalys发布了2022年Q1手机销量排名出炉。虽然由于经济条件不利和季节性需求低迷,智能手机的总出货量下降了11。但一众国产手机品牌的表现还是堪称可圈可点,畅销iPhone榜单出炉,iPhone13销量最好,最受大家欢迎近日,消费者数据研究机构CIRP公布了一份关于iPhone手机不同机型的销量情况,在2022年的3月季度iPhone手机销量中,iPhone13系列新机销量占据了全部iPhone销