范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

(3)sparkstreaming从kafka接入实时数据流最实现数据可视化

  (1)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示,我们先看下整体方案架构:
  (2)方案说明:
  1)我们通过kafka与各个业务系统的数据对接,将各系统中的数据实时接到kafka;
  2)通过sparkstreaming接入kafka数据流,定义时间窗口和计算窗口大小,业务计算逻辑处理;
  3)将结果数据写入到mysql;
  4)通过可视化平台接入mysql数据库,这里使用的是NBI大数据可视化构建平台;
  5)在平台上通过拖拽式构建各种数据应用,数据展示;
  (3)代码演示:
  定义一个kafka生产者,模拟数据源 package com.producers;  import com.alibaba.fastjson.JSONObject; import com.pojo.WaterSensor; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata;  import java.util.Properties; import java.util.Random;  /**  * Created by lj on 2022-07-18.  */ public class Kafaka_Producer {     public final static String bootstrapServers = "127.0.0.1:9092";      public static void main(String[] args) {         Properties props = new Properties();         //设置Kafka服务器地址         props.put("bootstrap.servers", bootstrapServers);         //设置数据key的序列化处理类         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");         //设置数据value的序列化处理类         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");         KafkaProducer producer = new KafkaProducer<>(props);          try {             int i = 0;             Random r=new Random();               String[] lang = {"flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};              while(true) {                 Thread.sleep(2000);                 WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)]+"_kafka",i,i);                 i++;                  String msg = JSONObject.toJSONString(waterSensor);                 System.out.println(msg);                 RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("kafka_data_waterSensor", null, null,  msg)).get(); //                System.out.println("recordMetadata: {"+ recordMetadata +"}");             }          } catch (Exception e) {             System.out.println(e.getMessage());         }     } }
  根据业务需要,定义各种消息对象 package com.pojo;  import java.io.Serializable; import java.util.Date;  /**  * Created by lj on 2022-07-13.  */ public class WaterSensor implements Serializable {     public String id;     public long ts;     public int vc;      public WaterSensor(){      }      public WaterSensor(String id,long ts,int vc){         this.id = id;         this.ts = ts;         this.vc = vc;     }      public int getVc() {         return vc;     }      public void setVc(int vc) {         this.vc = vc;     }      public String getId() {         return id;     }      public void setId(String id) {         this.id = id;     }      public long getTs() {         return ts;     }      public void setTs(long ts) {         this.ts = ts;     } }
  sparkstreaming数据流计算 package com.examples;  import com.alibaba.fastjson.JSONObject; import com.pojo.WaterSensor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies;  import java.util.*;  /**  * Created by lj on 2022-07-18.  */ public class SparkSql_Kafka {     private static String appName = "spark.streaming.demo";     private static String master = "local[*]";     private static String topics = "kafka_data_waterSensor";     private static String brokers = "127.0.0.1:9092";      public static void main(String[] args) {         //初始化sparkConf         SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);          //获得JavaStreamingContext         JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(3));          /**          * 设置日志的级别: 避免日志重复          */         ssc.sparkContext().setLogLevel("ERROR");          Collection topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));         //kafka相关参数,必要!缺了会报错         Map kafkaParams = new HashMap<>();         kafkaParams.put("metadata.broker.list", brokers) ;         kafkaParams.put("bootstrap.servers", brokers);         kafkaParams.put("group.id", "group1");         kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");         kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");         kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");                  //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定         JavaInputDStream> lines = KafkaUtils.createDirectStream(                 ssc,                 LocationStrategies.PreferConsistent(),                 ConsumerStrategies.Subscribe(topicsSet, kafkaParams)         );          JavaDStream mapDStream = lines.map(new Function, WaterSensor>() {             @Override             public WaterSensor call(ConsumerRecord s) throws Exception {                 WaterSensor waterSensor = JSONObject.parseObject(s.value().toString(),WaterSensor.class);                 return waterSensor;             }         }).window(Durations.minutes(9), Durations.minutes(6));      //指定窗口大小 和 滑动频率 必须是批处理时间的整数倍;          mapDStream.foreachRDD(new VoidFunction2, Time>() {             @Override             public void call(JavaRDD waterSensorJavaRDD, Time time) throws Exception {                 SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());                  Dataset dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);                 // 创建临时表                 dataFrame.createOrReplaceTempView("log");                 Dataset result = spark.sql("select * from log");                 System.out.println("========= " + time + "=========");                 //输出前20条数据                 result.show();                                  //数据写入mysql                 writeDataToMysql(result);             }         });          //开始作业         ssc.start();         try {             ssc.awaitTermination();         } catch (Exception e) {             e.printStackTrace();         } finally {             ssc.close();         }     } }
  NBI大数据可视化构建平台对接mysql,构建数据应用:
  NBI可视化

3年3考北大获奖200万?考生被质疑是职业考生,校方招生老师称尚未录取,教育局发提醒函通报高考喜报违规热烈祝贺2022年我校全某某同学高考总分694分勇夺茂名市第一名。6月26日,广东茂名某实验学校一则高考喜报在网上热传。同日,网传一份提醒函,点名批评这所实验学校炒作高分高考喜报。为什么俄罗斯会拥有一小部分德国领土?作为世界上最大的国家,俄罗斯是一个相当难以错过的地方,在地图上从西边的波罗的海延伸到东边的太平洋,俄罗斯联邦的连续领土几乎与南美洲一样大,它是如此之大,以至于与相距甚远的国家如挪威美专家发现不对劲台湾问题拖得越久,就越对中国有利?任何试图与中国玩战略玩智慧的国家,都必将成为笑话。美国一直非常自信地认为,只要紧紧抓住台湾牌,就可以把中国拿捏得死死的,令中国投鼠忌器,束手束脚,从而达到遏制中国的目的。美国的政客福建的朋友们,你们的养老保险上云啦提起养老保险打工人都再熟悉不过了全国有4。2亿企业职工每月从薪资单里划走这份神秘的支出很多人好奇它们最后都去了哪儿了?实际上,它们都进入了一个巨大的蓄水池企业职工养老保险基金,基金喝水后一直小便的人,与半天不去厕所的人,谁的身体更健康?俗话说,水是生命之源。人的生理极限是能七天不吃饭,但不能三天不喝水。水占我们体重的70以上。充足的水分是维持我们身体机能正常运行的重要原因之一。每天人体都需要摄入大约1000ml233岁一女子,长时间喝蒲公英泡水,身体有收获哪些好处吗?在农村,随处可见蒲公英,这不只是普普通通的植物,而且营养价值和药用价值极高,不少人习惯把蒲公英放在杯中代茶饮用。来自于杭州的李女士也是如此的,李女士今年33岁,坚持每天早晨喝杯蒲公G7国家自身难保,金砖国家成色更足,地缘博弈无法撼动合作发展6月26日,为期三天的2022年七国集团峰会正式开幕。在俄乌冲突不断持续,全球经济下行压力增大的背景下,外界对七国集团解决问题的能力产生了普遍质疑。而前几天刚刚结束的金砖国家第十四太丢人了!韩国女歌手唱歌太嗨,比基尼险些滑落走光,场面很尴尬太尴尬了,这回丢人丢大发了。近日,韩国创作歌手BIBI(金亨瑞)在湿身音乐节WATERBOMB演唱时,本想脱掉上衣秀一下性感身材,没想到比基尼的带子却散掉,险些滑落,这一幕也被现场老G7已成历史!西方对俄制裁升级后,普京拉拢伊阿筹建新G7最近几日,全球又都进入多边外交时间。6月26日至28日,七大工业国集团(G7)在德国巴伐利亚州举行本年度领导人峰会。而后,北约各国政要又将出席在马德里的北约领导人峰会。作为西方资本历史上的三个假太监有多奇葩?谋害皇帝,欺辱皇后,让太后生子在历史的长流中,出现过这么一个群体,他们是封建王朝的畸形产物,那就是太监。他们之中有的指鹿为马发动政变,有的扰乱朝政疯狂敛财,有的改进造纸术造福世界。很多人都知道太监身体上都有缺陷再看康熙王朝才懂周培公被贬11年,为何临终前还愿意献皇图孝庄知道康熙得了一张地图,她激动地前去的看个究竟。孝庄知道,这张图是周培公用多年的精力和心血才完成的,看完之后,她生气地责怪康熙,像这样的臣子,居然被他贬去冰天雪地的地方待了十一年
中产家庭有标准了!3320万户满足条件,你拖后腿了吗?在被人均可支配收入平均之后,中产家庭也有标准了,国内已有3320万户满足条件,快来看看你是不是拖了后腿?根据胡润研究院发布的中国新中产圈层白皮书,资产达到300万,就可以称为中产,6位香港女神嫁内地人,高龄生三胎,给公婆买房,一人养全家随着内地发展得越来越好,越来越多港星陆续来内地发展甚至定居安家,其中不少老牌港星娶了内地妻子,也有不少香港女星嫁给了内地人。1,张晋和蔡少芬在遇到蔡少芬的时候,张晋刚刚从一个武术指刘美希受邀参加庆祝加蓬独立62周年暨加中建交48周年国际晚会8月17日晚,加蓬驻华大使馆举行庆祝加蓬独立62周年暨加中建交48周年晚会。三十多国大使及夫人,包括美国驻华大使法国驻华大使非洲等国驻华大使以及中国外交部商务部中联部文旅部国家发改1965年,毛主席邀请蒋介石回大陆,蒋介石听后,一连开出六个条件1945年抗日战争胜利,在四年后的10月1日毛主席在北京天安门城楼上向全世界宣布了新中国政府正式成立的消息。国民党残余势力逃到了一海之隔的台湾岛上,台湾也成了郁结在每一个中国人民心大陆最后的3名军统特务,隐姓埋名64年后,在2013年留下一张合影在阅读此文之前,麻烦您点击一下关注,既方便您进行讨论与分享,又给您带来不一样的参与感,感谢您的支持!1949年,在国民党大部队败退台湾之际,有三个人却说什么都不愿离开故土。64年之一个人认知度越来越高的两种表现苹果掉在牛顿头上,如果碰巧掉在你头上,你会怎么做?有人回答如果我是果农,那我就知道果子熟了,要安排采摘。如果我是行人,我会俯身捡起来,一口吃掉。还有人的回答是这样的如果苹果落在我面刘威葳从央视跨界做演员,38岁嫁圈外富商,今46岁依然优雅美丽文Di编辑小情书往往有独立思想,知道自己人生目标的女性,无论在生活中经历了什么磨难,她总能活得随性洒脱。刘威葳可能就是很早就明白其中奥义,所以尽管她不是最红的演员,但要说有个性的演吴京为避嫌,两次拒绝女嘉宾主动挽手,仍被一些人批评不尊重女性近日,第十二届北京国际电影节落下帷幕,一个跟电影无关的话题登上热搜。在主竞赛单元的评委媒体见面会上,李雪健郭帆柯文思秦海璐吴京纷纷亮相,媒体捕捉到了一个非常有意思的镜头。有一名女嘉喊话寻找涉事男孩家长请主动站出来承担责任都说每个熊孩子背后都有一个熊家长,今天我们要喊话寻找广东深圳的这位家长,如果您看到网上刷屏的这个视频,赶紧带着自己的孩子啊,转出来好好认错赔偿吧!看看你家熊孩子干的这事儿啊,没有酿什么事情是你考上公务员才知道的?1。考上公务员不能让你大富大贵,但绝对能一直养你到骨灰盒里。2。常识部分疯狂叠刷题BUFF,往死里刷,最后真他妈就能全部都做出来(别杠,杠就是你对)3。你认真备考的样子真的很帅,就卢本伟复出失败?全网封禁3年,这个男人偷偷做了哪些神奇的事情作为曾经的直播一哥,又是突然被封禁的大冤种,五五开卢本伟虽然告别江湖,但是总有他要复出的传言,近日,卢姥爷突然跑到UZI直播间刷礼物并放出语音,本尊现身让粉丝又燃起了全新的希望。前