范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

Spark(十六)SparkStreaming需求练习

  一.环境准备1.pom文件              org.apache.spark         spark-core_2.12         3.0.0                    org.apache.spark         spark-streaming_2.12         3.0.0                    org.apache.spark         spark-streaming-kafka-0-10_2.12         3.0.0                         com.alibaba         druid         1.1.10                    mysql         mysql-connector-java         5.1.27       com.fasterxml.jackson.core     jackson-core     2.10.1                                                            net.alchim31.maven                 scala-maven-plugin                 3.2.2                                                                                                                     compile                                                                                                          org.apache.maven.plugins                 maven-assembly-plugin                 3.0.0                                                               jar-with-dependencies                                                                                                     make-assembly                         package                                                      single                                                                                           2.beanimport java.text.SimpleDateFormat import java.util.Date //数据格式:1597148289569,华北,北京,102,4,2020-08-11,11:12 case class AdsInfo(ts: Long,         area: String,         city: String,         userId: String,         adsId: String,         var dayString: String = null, // yyyy-MM-dd         var hmString: String = null) { // hh:mm          val date = new Date(ts)         dayString = new SimpleDateFormat("yyyy-MM-dd").format(date)         hmString = new SimpleDateFormat("HH:mm").format(date) }  3.工具类JDBCUtilsobject JDBCUtil {      // 创建连接池对象     var dataSource:DataSource = init()      // 连接池的初始化     def init():DataSource = {          val paramMap = new java.util.HashMap[String, String]()         paramMap.put("driverClassName", PropertiesUtil.getValue("jdbc.driver.name"))         paramMap.put("url", PropertiesUtil.getValue("jdbc.url"))         paramMap.put("username", PropertiesUtil.getValue("jdbc.user"))         paramMap.put("password", PropertiesUtil.getValue("jdbc.password"))         paramMap.put("maxActive", PropertiesUtil.getValue("jdbc.datasource.size"))          // 使用Druid连接池对象         DruidDataSourceFactory.createDataSource(paramMap)     }      // 从连接池中获取连接对象     def getConnection(): Connection = {         dataSource.getConnection     }      def main(args: Array[String]): Unit = {          println(getConnection())      } }  Properties工具类/**  * project.properties文件  */ #jdbc配置 jdbc.datasource.size=10 jdbc.url=jdbc:mysql://hadoop102:3306/steamingproject?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true jdbc.user=root jdbc.password=root jdbc.driver.name=com.mysql.jdbc.Driver  # Kafka配置 kafka.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092 kafka.topic=mytest kafka.group.id=cg1  import java.util.ResourceBundle /**  * Properties文件工具类  */ object PropertiesUtil {      // 绑定配置文件     // ResourceBundle专门用于读取配置文件,所以读取时,不需要增加扩展名     // 国际化 = I18N => Properties     val summer: ResourceBundle = ResourceBundle.getBundle("project")      def getValue( key : String ): String = {         summer.getString(key)     }      def main(args: Array[String]): Unit = {          println(getValue("jdbc.user"))      } }  3.创建BaseApp/**  * @description: 基础类  * @author: HaoWu  * @create: 2020年08月11日  */ abstract class BaseApp {   val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("myAPP")   val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))   //设置消费kafka的参数,可以参考kafka.consumer.ConsumerConfig类中配置说明   val kafkaParams: Map[String, Object] = Map[String, Object](     "bootstrap.servers" -> "hadoop102:9092,hadoop103:9092,hadoop104:9092", //zookeeper的host,port     "group.id" -> "g3", //消费者组     "enable.auto.commit" -> "true", //是否自动提交     "auto.commit.interval.ms" -> "500", //500ms自动提交offset     "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",     "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",     "auto.offset.reset" -> "earliest" //第一次运行,从最初始偏移量开始消费数据   )    //消费kafka的mytest主题生成DStream   val ds: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](     ssc,     LocationStrategies.PreferConsistent,     //订阅主题     ConsumerStrategies.Subscribe[String, String](List("mytest"),       kafkaParams))     /**    *  将输入流InputDStream[ConsumerRecord[String, String]]=>stream[对象]    * @param ds    * @return    */   def getAllBeans(ds: InputDStream[ConsumerRecord[String, String]]): DStream[AdsInfo] = {     val result: DStream[AdsInfo] = ds.map(       record => {         val arr: Array[String] = record.value().split(",")         AdsInfo(arr(0).toLong, arr(1), arr(2), arr(3), arr(4))       }     )     result   }    /**    * 处理逻辑    * @param opt    */   def runApp(opt: => Unit): Unit = {     try {       //处理逻辑       opt       //执行程序       ssc.start()       ssc.awaitTermination()     } catch {       case e: Exception => e.getMessage     }   }  }  需求一:动态添加黑名单
  实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑。
  注:黑名单保存到MySQL中。
  思路分析
  1)读取Kafka数据之后,并对MySQL中存储的黑名单数据做校验;
  2)校验通过则对给用户点击广告次数累加一并存入MySQL;
  3)在存入MySQL之后对数据做校验,如果单日超过100次则将该用户加入黑名单。
  准备工作 1)存放黑名单用户的表 CREATE TABLE black_list (userid CHAR(2) PRIMARY KEY); 2)存放单日各用户点击每个广告的次数 CREATE TABLE user_ad_count ( 	dt date, 	userid CHAR (2), 	adid CHAR (2), 	count BIGINT, 	PRIMARY KEY (dt, userid, adid) ); /**  * @description: 需求一:动态添加黑名单  *               说明:实现实时的动态黑名单机制:将每天对某个广告点击超过 100 次的用户拉黑  *               (用户,广告id,时间,次数)  *               注:黑名单保存到MySQL中  * @author: HaoWu  * @create: 2020年08月12日  */ object ProjectDemo_1 extends BaseApp {   def main(args: Array[String]): Unit = {     runApp {       val asdInfo: DStream[AdsInfo] = getAllBeans(ds)        /**        * 校验数据是否在黑名单中        */       def isBlackList(userid: String, connection: Connection): Boolean = {         var flag: Boolean = true         val sql =           """             |select * from black_list where userid = ?             |""".stripMargin         val ps: PreparedStatement = connection.prepareStatement(sql)         ps.setString(1, userid)         val result: ResultSet = ps.executeQuery()         if (result != null) {           flag = false         }         flag       }        //1.聚合当前批次数据((timestamp,userid,adsid),count)       val countDS: DStream[((String, String, String), Long)] = asdInfo.map {         //((2020-08-11,102,1),1)         case adsInfo: AdsInfo => ((adsInfo.dayString, adsInfo.userId, adsInfo.adsId), 1L)       }.reduceByKey(_ + _)         countDS.foreachRDD(         rdd => rdd.foreachPartition {           iter => {             //2.向mysql插入数据,准备插入sql和连接             val connection: Connection = JDBCUtil.getConnection()             val sql =               """                 |insert into user_ad_count values(?,?,?,?)                 |ON DUPLICATE KEY UPDATE COUNT= count + ?                 |""".stripMargin             val ps: PreparedStatement = connection.prepareStatement(sql)             //2.过滤出在名单中的数据             iter.filter {               case ((_, userid, _), _) => val falg = isBlackList(userid, connection); falg             }               //往mysql重插入更新数据               .foreach {                 case ((date, userid, adsid), count) => {                   ps.setString(1, date)                   ps.setString(2, userid)                   ps.setString(3, adsid)                   ps.setLong(4, count)                   ps.setLong(5, count)                   ps.executeUpdate()                 }               }             //关闭             ps.close()              //3.插入成功之后,查询对应得userid点击广告此时是否 > 100?             val sql2 =               """                 |select userid from user_ad_count where count > 20                 |""".stripMargin             val ps2: PreparedStatement = connection.prepareStatement(sql2)             val resultSet: ResultSet = ps2.executeQuery()             //封装查询出的黑名单列表             val block_list = new mutable.HashSet[String]()             while (resultSet.next()) {               val userid: String = resultSet.getString("userid")               block_list + userid             }             //关闭resulteSet,PreparedStatement             resultSet.close()             ps2.close()              //4.将block_list数据依次插入黑名单表,没有就插入,有就更新             val sql3: String =               """                 |INSERT INTO black_list VALUES (?)                 |ON DUPLICATE KEY UPDATE userid=?                 |""".stripMargin             val ps3: PreparedStatement = connection.prepareStatement(sql3)             for (userid <- block_list) {               ps3.setString(1, userid)               ps3.setString(2, userid)               ps3.executeUpdate()             }             ps3.close()             connection.close()           }         }       )     }    } } 需求二:广告点击量实时统计
  描述 :实时统计每天各地区各城市各广告的点击总流量,并将其存入MySQL
  步骤 :①updateStateByKey有状态累加计算 ②向mysql执行插入更新操作
  Mysql表  CREATE TABLE area_city_ad_count ( 	dt date, 	area CHAR(4), 	city CHAR(4), 	adid CHAR(2),   count BIGINT, 	PRIMARY KEY (dt,area,city,adid)  --联合主键 );
  代码实现 import java.sql.{Connection, PreparedStatement} import com.spark.streaming_need.bean.AdsInfo import com.spark.streaming_need.utils.JDBCUtil import org.apache.spark.streaming.dstream.DStream  /**  * @description: 需求二:广告点击量实时统计  *               描述:实时统计每天各地区各城市各广告的点击总流量,并将其存入MySQL  * @author: HaoWu  * @create: 2020年08月11日  */ object ProjectDemo_2 extends BaseApp {   def main(args: Array[String]): Unit = {     runApp {       //updateStateByKey算子有状态,需要checkpoint       ssc.checkpoint("function2")        //1.单个批次内对数据进行按照天维度的聚合统计       //数据格式:1597148289569,华北,北京,102,4       val DsAds: DStream[AdsInfo] = getAllBeans(ds)       val kvDS: DStream[((String, String, String, String), Int)] = DsAds.map {         case (adsInfo) => {           ((adsInfo.dayString, adsInfo.area, adsInfo.city, adsInfo.adsId), 1)         }       }        //2.结合MySQL数据跟当前批次数据更新原有的数据       //计算当前批次和之前的数据累加结果       val result: DStream[((String, String, String, String), Int)] = kvDS.updateStateByKey {         case (seq, opt) => {           var sum: Int = seq.sum           val value = opt.getOrElse(0)           sum += value           Some(sum)         }       }       //3.将结果写入Mysql       result.foreachRDD(         rdd => {           rdd.foreachPartition {             iter => {               //每个分区创建一个Connection连接               val connection: Connection = JDBCUtil.getConnection()               //准备sql,实现mysql的upsert操作               val sql =                 """                   |insert into area_city_ad_count values (?,?,?,?,?)                   |on duplicate key update count=?                   |""".stripMargin               //PreparedStatement               val ps: PreparedStatement = connection.prepareStatement(sql)               //RDD分区中的每个数据都执行写出               iter.foreach {                 case ((dayString, area, city, adsId), count) => {                   //填充占位符                   ps.setString(1, dayString)                   ps.setString(2, area)                   ps.setString(3, city)                   ps.setString(4, adsId)                   ps.setInt(5, count)                   ps.setInt(6, count)                   //执行写入                   ps.executeUpdate()                 }               }               //关闭资源               ps.close()               connection.close()             }           }         }       )     }   } } 需求三:最近一小时广告点击量需求说明
  求最近1h的广告点击量,要求按照以下结果显示 结果展示: 1:List [15:50->10,15:51->25,15:52->30] 2:List [15:50->10,15:51->25,15:52->30] 3:List [15:50->10,15:51->25,15:52->30]
  思路分析
  1)开窗确定时间范围;
  2)在窗口内将数据转换数据结构为((adid,hm),count);
  3)按照广告id进行分组处理,组内按照时分排序。
  代码实现 import org.apache.spark.streaming.{Minutes, Seconds} import org.apache.spark.streaming.dstream.DStream  /**  * @description: 需求三:最近一小时广告点击量,3秒更新一次  * @author:  * 结果展示:  * 1:List [15:50->10,15:51->25,15:52->30]  * 2:List [15:50->10,15:51->25,15:52->30]  * 3:List [15:50->10,15:51->25,15:52->30]  * @create: 2020年08月12日  */ object ProjectDemo_3 extends BaseApp {   def main(args: Array[String]): Unit = {     //运行app     runApp {       val AdsDStream: DStream[((String, String), Int)] = getAllBeans(ds).map {         case adsInfo => ((adsInfo.adsId, adsInfo.hmString), 1)       }       val result: DStream[(String, List[(String, Int)])] = AdsDStream         //窗口内聚合         .reduceByKeyAndWindow((a: Int, b: Int) => {           a + b         }, Minutes(60), Seconds(3))         .map { case ((adsId, ahmString), count) => (adsId, (ahmString, count)) }         //按照广告id分组         .groupByKey()         //组内按时间升序         .mapValues {           case iter => iter.toList.sortBy(_._1)         }       result.print(10)     }   } }
  结果 ------------------------------------------- Time: 1597234032000 ms ------------------------------------------- (1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,13))) (2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,6))) (3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,22))) (4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,22))) (5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,10)))  ------------------------------------------- Time: 1597234035000 ms ------------------------------------------- (1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,20))) (2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,13))) (3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,26))) (4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,26))) (5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,15)))  ------------------------------------------- Time: 1597234038000 ms ------------------------------------------- (1,List((20:01,12), (20:02,112), (20:03,98), (20:04,95), (20:05,104), (20:06,96), (20:07,23))) (2,List((20:01,24), (20:02,97), (20:03,99), (20:04,103), (20:05,95), (20:06,105), (20:07,16))) (3,List((20:01,30), (20:02,87), (20:03,92), (20:04,108), (20:05,117), (20:06,88), (20:07,34))) (4,List((20:01,15), (20:02,101), (20:03,100), (20:04,99), (20:05,84), (20:06,112), (20:07,30))) (5,List((20:01,19), (20:02,103), (20:03,111), (20:04,95), (20:05,100), (20:06,99), (20:07,20)))

煮面条时,最忌水开下锅了,大厨教你正确做法,做出面条爽滑劲道很多朋友比较喜欢早上吃面条,因为面条的做法比较简单,只需要用水简单地一煮就可以了,并不会耗费很长时间。很多朋友在煮面条的时候都有这样的疑问,面条究竟是该凉水下锅还是开水下锅呢?其实扔掉牛仔裤吧?今春都在穿长腿裤,高级优雅,看着就气质春来是时候抛弃厚重的衣服了,看惯了牛仔裤的话,可以尝试一下长腿裤,显腿长还有气质感,而且搭配好的话超级的具有看点,拍照都会很好看。什么是长腿裤长腿裤其实大家可以从文字上去理解,就是孩子总是撒谎,有效的方法不是打骂,这三点才是关键文丨木棉妈妈没有哪个人喜欢撒谎的孩子。小时候我们学习课文放羊的孩子,故事告诉我们,要做个诚实的孩子,撒谎的孩子要承担相应的后果。现实生活中,撒谎这个坏毛病总是让家长们很头疼,也经常最高级的养生少(建议收藏)活了141岁的药王孙思邈,他在孙思邈方书中说过一句话口中言少,心中事少,腹里食少,自然睡少,依次四少,神仙诀了。寥寥数字,道出了最简单易行的养生道理,适合我们学习。口中言少少说话,根治生理性打鼾,做好这四点打呼噜在医学上称为打鼾,是人在睡眠状态下发出的粗重呼吸声。打鼾通常分为生理性打鼾和病理性打鼾。生理性打鼾声音较轻,一般由睡眠姿势不当枕头过高或过低身体过度疲劳引起病理性打鼾是受慢性袁泉你的后半生,就只做精英女人?袁泉又出演了职场女精英,近期在热播剧相逢时节里出演年薪200万,不管是气质还是穿搭都是妥妥的女强人形象。袁泉的演技各方面都十分在线,一眼看去就让人信服,走路的方式说话的语调智慧的神建议头发少的人用侧柏叶煮过的水洗头,头发又多又顺我的头发遗传我爸,头发少还细软,高中学习压力大掉了好多后来养头发养了好多年终于变成了别人羡慕的头发之前听别人说用侧柏叶洗头会长头发刚开始我还半信半疑,后来用了这个方法分分钟打脸!用如何在两个月内快速美白?这些知识你必须知道俗话说一白遮千丑,皮肤白皙真的太重要了。那么如何有效变白呢?ok。以下满满干货,记好笔记。妈妈再也不用担心我白不起来了。你属于哪种黑?判断自己属于哪种黑后,再对症下药!后天晒黑肤色这才是适合普通60岁奶奶的打扮九分裤平底鞋,耐看又高级上了年纪的女人,会随着时间流逝慢慢沉淀,变得越来越成熟稳重,愈发有迷人的韵味儿。有人觉得奶奶辈儿应该随波逐流,朴素低调,但是这世间并没有规定任何人应该如何活,女人越是到老越要大胆一微胖奶奶春季别总穿碎花裙了,这些裙装更实用得体,值得借鉴裙装是春季典型的气质单品,但并非所有裙装都能帮助普通人穿出好气质,甚至很多裙装穿在不同人身上,效果截然不同。对于上了年纪的微胖奶奶,如果想要借助裙装展现魅力,那么首先就要选对裙装款恭喜!姚明减肥成功,40岁叶莉高兴了表态二胎事宜姚明要加把劲目前CBA季后赛即将在南昌上演,目前入围季后赛席位的12支球队也将乘坐包机前往,也是为了避免让球员和外界有过多接触,确保季后赛的顺利进行。目前季后赛的赛程也是已经公布,其中附加赛和
给美女换脸,年赚16亿!最会耍大牌中国男人,还在狂飙阅读此文之前,麻烦您点击下关注,方便与您讨论分享,也能及时观看下一篇精彩文章。非常感谢您的关注!95版武则天中主要讲述了我国历史上也是唯一一位的女皇武则天的传奇一生。要饰演女皇武则如果你正在步入老年,建议花两分钟看看这篇文章如果你正在步入老年了,要花两分钟看看这篇文章,快乐自然来!1hr如果你正在步入老年了要懂得与过去做个了断人到老年,在心态上就会发生根本性变化,以后的岁月不再为别人活着,而是为自己活重装机兵昙花一现的特色系统,DIY涂装真的专为红狼而生吗?这是老男孩游戏盒的第554篇原创,作者小雷最近老男孩游戏盒在小破站直播重装机兵钢之季节,突然感慨这个系列其实存在好多昙花一现的烂尾系统,就比如钢之季节的厨师系统,玩家可以使用采集到大量上市!叶酸是包菜的17倍,中老年多吃这3样,手脚麻利精神好春耕进行时上了年纪之后,不仅身子骨不如从前,就连精气神也远不如前,久而久之,身体功能就会多出受损,这时候就急需补充营养来维持身体的健康。叶酸是体内一种不可或缺的营养物质,很多人认为春天怎么穿更洋气?这些搭配思路都为你总结好了,照着穿简单好看爱美是女人的天性,相信很多小姐姐们都想在这个春天打扮得更洋气好看,但不少人总会因为欠缺搭配头绪,花费了时间与财力却无法达成愿望。春天,对于这些不知道怎么穿更洋气的女生,建议你看看这NASA预计参宿四今年将发生超新星爆炸,人体的铁元素来自于超新星就在前不久NASA就严重警告,我们夜空最亮的星,参宿四很有可能在2023年发生超新星爆炸。参宿四是一颗红超巨星,位于猎户座,这是一颗巨大的红超巨星,是人类目前已知最大和最亮的恒星之巴黎时装周过于暴露的服装设计,网友直呼辣眼睛近日,巴黎时装周举办,又一次带来了时尚界的狂欢。然而,今年巴黎时装周过于暴露的服装设计也引起了网上一片冷嘲热讽的声音,有网友直呼辣眼睛惊掉下巴。衣服的主要功能是保暖御寒防护身体遮蔽马到成功,马元素创意的标志设计集锦在中国以及世界范围呢马文化比较普遍,今天分享几款马元素创意的标志设计,看看都是如何设计创意的。用马作为标志的企业比较多像保时捷法拉利还是奢侈品牌爱马仕都用马的元素,当然还有很多品牌粉色少女心。粉嫩少女心。春季穿搭必不可少的就是粉色mina与大家一起pick下女明星们粉色少女心的穿搭吧01hr迪丽热巴图片来源于迪丽热巴工作室微博httpsweibo。com626932990年代内娱多敢说?媒体评出不受欢迎的十大明星,巩俐刘晓庆上榜文乔乔编辑乔乔现今娱乐圈的评选活动繁花似锦,层出不穷,却也失去了评选最初的意义。然而在二十多年前,互联网还不发达,家家户户还处于通过报纸杂志电视等传统媒体了解信息的时代。这张保存完淮北相山区本周六,黄里杏花节邀您来赏!来源人民网人民网淮北3月10日电(吕欢欢)又是一年芳草绿,十里杏花分外红。人民网安徽频道从淮北市相山区文旅体局获悉,2023相山黄里杏花文化旅游节将于3月11日(本周六)在淮北盛大