范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

scalaAkka并发编程定时任务简易通信框架

  1. Akka并发编程框架简介1.1 Akka概述
  Akka是一个用于构建高并发、分布式和可扩展的基于事件驱动的应用工具包。Akka是使用scala开发的库,同时可以使用scala和Java语言来开发基于Akka的应用程序。 1.2 Akka特性提供基于异步非阻塞、高性能的事件驱动编程模型 内置容错机制,允许Actor在出错时进行恢复或者重置操作 超级轻量级的事件处理(每GB堆内存几百万Actor) 使用Akka可以在单机上构建高并发程序,也可以在网络中构建分布式程序。 1.3 Akka通信过程
  以下图片说明了Akka Actor的并发编程模型的基本流程: 学生创建一个ActorSystem 通过ActorSystem来创建一个ActorRef(老师的引用),并将消息发送给ActorRef ActorRef将消息发送给Message Dispatcher(消息分发器) Message Dispatcher将消息按照顺序保存到目标Actor的MailBox中 Message Dispatcher将MailBox放到一个线程中 MailBox按照顺序取出消息,最终将它递给TeacherActor接受的方法中 2. 创建Actor
  Akka中,也是基于Actor来进行编程的。类似于之前学习过的Actor。但是Akka的Actor的编写、创建方法和之前有一些不一样。 2.1 API介绍ActorSystem: 它负责创建和监督Actor 1.  在Akka中,ActorSystem是一个重量级的结构,它需要分配多个线程. 2.  在实际应用中, ActorSystem通常是一个单例对象, 可以使用它创建很多Actor. 3.  直接使用`context.system`就可以获取到管理该Actor的ActorSystem的引用实现Actor类 1. 定义类或者单例对象继承Actor(注意:要导入akka.actor包下的Actor) 2. 实现receive方法,receive方法中直接处理消息**即可,不需要添加loop和react方法调用. Akka会自动调用receive来接收消息. 3. 【可选】还可以实现preStart()方法, 该方法在Actor对象构建后执行,在Actor生命周期中仅执行一次.加载Actor 1. 要创建Akka的Actor,必须要先获取创建一个ActorSystem。需要给ActorSystem指定一个名称,并可以去加载一些配置项(后面会使用到) 2. 调用ActorSystem.actorOf(Props(Actor对象), "Actor名字")来加载Actor.2.2 Actor Path
  每一个Actor都有一个Path,这个路径可以被外部引用。路径的格式如下:
  Actor类型
  路径
  示例
  本地Actor
  akka://actorSystem名称/user/Actor名称
  akka://SimpleAkkaDemo/user/senderActor
  远程Actor
  akka.tcp://my-sys@ip地址:port/user/Actor名称
  akka.tcp://192.168.10.17:5678/user/service-b 2.3 入门案例2.3.1 需求
  基于Akka创建两个Actor,Actor之间可以互相发送消息。 2.3.2 实现步骤创建Maven模块 创建并加载Actor 发送/接收消息 2.3.3 创建Maven模块
  使用Akka需要导入Akka库,这里我们使用Maven来管理项目, 具体步骤如下: 创建Maven模块. 打开pom.xml文件,导入akka Maven依赖和插件.                       org.scala-lang             scala-library             ${scala.version}                                com.typesafe.akka             akka-actor_2.11             2.3.14                                com.typesafe.akka             akka-remote_2.11             2.3.14                                com.itheima             spark-demo-common             1.0-SNAPSHOT                              src/main/scala         src/test/scala                                       net.alchim31.maven                 scala-maven-plugin                 3.2.2                                                                                            compile                             testCompile                                                                                                                -dependencyfile                                 ${project.build.directory}/.scala_dependencies                                                                                                                                        org.apache.maven.plugins                 maven-shade-plugin                 2.4.3                                                               package                                                      shade                                                                                                                                                     *:*                                                                              META-INF/*.SF                                         META-INF/*.DSA                                         META-INF/*.RSA                                                                                                                                                                                                      reference.conf                                                                                                                                                                                                                                                                2.3.4 创建并加载Actor
  到这, 我们已经把Maven项目创建起来了, 后续我们都会采用Maven来管理我们的项目. 接下来, 我们来实现:
  创建并加载Actor, 这里, 我们要创建两个Actor: SenderActor:用来发送消息 ReceiverActor:用来接收,回复消息
  具体步骤在src/main/scala文件夹下创建包: com.itheima.akka.demo 在该包下创建两个Actor(注意: 用object修饰的单例对象).SenderActor: 表示发送消息的Actor对象.ReceiverActor: 表示接收消息的Actor对象. 在该包下创建单例对象Entrance, 并封装main方法, 表示整个程序的入口. 把程序启动起来, 如果不报错, 说明代码是没有问题的.
  参考代码object SenderActor extends Actor {     /*     细节:          在Actor并发编程模型中, 需要实现act方法, 想要持续接收消息, 可通过loop + react实现.         在Akka编程模型中, 需要实现receive方法, 直接在receive方法中编写偏函数处理消息即可.     */     //重写receive()方法     override def receive: Receive = {         case x => println(x)     } }   object ReceiverActor extends Actor{     //重写receive()方法     override def receive: Receive = {         case x => println(x)     } }  object Entrance {        def main(args:Array[String]) = {         //1. 实现一个Actor Trait, 其实就是创建两个Actor对象(上述步骤已经实现).          //2. 创建ActorSystem         //两个参数的意思分别是:ActorSystem的名字, 加载配置文件(此处先不设置)         val actorSystem = ActorSystem("actorSystem",ConfigFactory.load())          //3. 加载Actor         //actorOf方法的两个参数意思是: 1. 具体的Actor对象. 2.该Actor对象的名字         val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")         val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")     } }2.3.5 发送/接收消息
  思路分析使用样例类封装消息SubmitTaskMessage——提交任务消息SuccessSubmitTaskMessage——任务提交成功消息 使用!发送异步无返回消息.
  参考代码MessagePackage.scala文件中的代码 /**   * 记录发送消息的 样例类.   * @param msg  具体的要发送的信息.   */ case class SubmitTaskMessage(msg:String)  /**   * 记录 回执信息的 样例类.   * @param msg  具体的回执信息.   */ case class SuccessSubmitTaskMessage(msg:String)Entrance.scala文件中的代码 //程序主入口. object Entrance {   def main(args: Array[String]): Unit = {     //1. 创建ActorSystem, 用来管理所有用户自定义的Actor.     val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())     //2. 通过ActorSystem, 来管理我们自定义的Actor(SenderActor, ReceiverActor)     val senderActor = actorSystem.actorOf(Props(SenderActor), "senderActor")     val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")       //3. 由ActorSystem给 SenderActor发送一句话"start".     senderActor ! "start"   } }SenderActor.scala文件中的代码 object SenderActor extends Actor{   override def receive: Receive = {    //1. 接收Entrance发送过来的: start     case "start" => {       //2. 打印接收到的数据.       println("SenderActor接收到: Entrance发送过来的 start 信息.")        //3. 获取ReceiverActor的具体路径.       //参数: 要获取的Actor的具体路径.       //格式: akka://actorSystem的名字/user/要获取的Actor的名字.       val receiverActor = context.actorSelection("akka://actorSystem/user/receiverActor")        //4. 给ReceiverActor发送消息: 采用样例类SubmitTaskMessage       receiverActor ! SubmitTaskMessage("我是SenderActor, 我在给你发消息!...")     }        //5. 接收ReceiverActor发送过来的回执信息.     case SuccessSubmitTaskMessage(msg) => println(s"SenderActor接收到回执信息: ${msg} ")   } }ReceiverActor.scala文件中的代码 object ReceiverActor extends Actor {   override def receive: Receive = {     //1. 接收SenderActor发送过来的消息.     case SubmitTaskMessage(msg) => {       //2. 打印接收到的消息.       println(s"ReceiverActor接收到: ${msg}")        //3. 给出回执信息.       sender ! SuccessSubmitTaskMessage("接收任务成功!. 我是ReceiverActor")     }   } }
  输出结果SenderActor接收到: Entrance发送过来的 start 信息. ReceiverActor接收到: 我是SenderActor, 我在给你发消息!... SenderActor接收到回执信息: 接收任务成功!. 我是ReceiverActor3. Akka定时任务
  需求: 如果我们想要使用Akka框架定时的执行一些任务,该如何处理呢?
  答: 在Akka中,提供了一个 scheduler对象来实现定时调度功能。使用ActorSystem.scheduler.schedule()方法,就可以启动一个定时任务。3.1 schedule()方法的格式方式一: 采用发送消息的形式实现. def schedule(     initialDelay: FiniteDuration,		// 延迟多久后启动定时任务     interval: FiniteDuration,			// 每隔多久执行一次     receiver: ActorRef,					// 给哪个Actor发送消息     message: Any)						// 要发送的消息 (implicit executor: ExecutionContext)	// 隐式参数:需要手动导入方式二: 采用自定义方式实现. def schedule(     initialDelay: FiniteDuration,			// 延迟多久后启动定时任务     interval: FiniteDuration				// 每隔多久执行一次 )(f:   Unit)								// 定期要执行的函数,可以将逻辑写在这里 (implicit executor: ExecutionContext)		// 隐式参数:需要手动导入注意: 不管使用上述的哪种方式实现定时器, 都需要导入隐式转换和隐式参数, 具体如下:
  //导入隐式转换, 用来支持 定时器.
  import actorSystem.dispatcher
  //导入隐式参数, 用来给定时器设置默认参数.
  import scala.concurrent.duration._3.2 案例
  需求定义一个ReceiverActor, 用来循环接收消息, 并打印接收到的内容. 创建一个ActorSystem, 用来管理所有用户自定义的Actor. 关联ActorSystem和ReceiverActor. 导入隐式转换和隐式参数. 通过定时器, 定时(间隔1秒)给ReceiverActor发送一句话.方式一: 采用发送消息的形式实现.方式二: 采用自定义方式实现.
  参考代码//案例: 演示Akka中的定时器. object MainActor {   //1. 定义一个Actor, 用来循环接收消息, 并打印.   object ReceiverActor extends Actor {     override def receive: Receive = {       case x => println(x)      //不管接收到的是什么, 都打印.     }   }    def main(args: Array[String]): Unit = {     //2. 创建一个ActorSystem, 用来管理所有用户自定义的Actor.     val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())     //3. 关联ActorSystem和ReceiverActor.     val receiverActor = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")      //4. 导入隐式转换和隐式参数.     //导入隐式转换, 用来支持 定时器.     import actorSystem.dispatcher     //导入隐式参数, 用来给定时器设置默认参数.     import scala.concurrent.duration._      //5. 通过定时器, 定时(间隔1秒)给ReceiverActor发送一句话.     //方式一: 通过定时器的第一种方式实现, 传入四个参数.     //actorSystem.scheduler.schedule(3.seconds, 2.seconds, receiverActor, "你好, 我是种哥, 我有种子你买吗?...")      //方式二: 通过定时器的第二种方式实现, 传入两个时间, 和一个函数.     //actorSystem.scheduler.schedule(0 seconds, 2 seconds)(receiverActor ! "新上的种子哟, 你没见过! 嘿嘿嘿...")      //实际开发写法     actorSystem.scheduler.schedule(0 seconds, 2 seconds){       receiverActor ! "新上的种子哟, 你没见过! 嘿嘿嘿..."     }   } }4. 实现两个进程之间的通信4.1 案例介绍
  基于Akka实现在两个 进程间发送、接收消息。WorkerActor启动后去连接MasterActor,并发送消息给MasterActor. MasterActor接收到消息后,再回复消息给WorkerActor。
  4.2 Worker实现
  步骤创建一个Maven模块,导入依赖和配置文件.创建Maven模块 创建启动WorkerActor.在src/main/scala文件夹下创建包,在该包下创建 WorkerActor(单例对象的形式创建).在该包下创建Entrance单例对象, 里边定义main方法 发送"setup"消息给WorkerActor,WorkerActor接收打印消息. 启动测试.
  参考代码WorkerActor.scala文件中的代码 //1. 创建WorkActor, 用来接收和发送消息. object WorkerActor extends Actor{     override def receive: Receive = {         //2. 接收消息.         case x => println(x)     } }Entrance.scala文件中的代码 //程序入口. //当前ActorSystem对象的路径  akka.tcp://actorSystem@127.0.0.1:9999 object Entrance {     def main(args: Array[String]): Unit = {         //1. 创建ActorSystem.         val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())         //2. 通过ActorSystem, 加载自定义的WorkActor.         val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")         //3. 给WorkActor发送一句话.         workerActor ! "setup"     } }	  //启动测试: 右键, 执行, 如果打印结果出现"setup", 说明程序执行没有问题.4.3 Master实现
  步骤创建一个Maven模块,导入依赖和配置文件.创建Maven模块. 创建启动MasterActor.在src/main/scala文件夹下创建包,在该包下创建 MasterActor(单例对象的形式创建).在该包下创建Entrance单例对象, 里边定义main方法 WorkerActor发送"connect"消息给MasterActor MasterActor回复"success"消息给WorkerActor WorkerActor接收并打印接收到的消息 启动Master、Worker测试
  参考代码MasterActor.scala文件中的代码 //MasterActor: 用来接收WorkerActor发送的数据, 并给其返回 回执信息. //负责管理MasterActor的ActorSystem的地址:  akka.tcp://actorSystem@127.0.0.1:8888 object MasterActor extends Actor{   override def receive: Receive = {     //1. 接收WorkerActor发送的数据     case "connect" => {       println("MasterActor接收到: connect!...")        //2. 给WorkerActor回执一句话.       sender ! "success"     }   } }Entrance.scala文件中的代码 //Master模块的主入口 object Entrance {   def main(args: Array[String]): Unit = {     //1. 创建ActorSystem, 用来管理用户所有的自定义Actor.     val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())     //2. 关联ActorSystem和MasterActor.     val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")     //3. 给masterActor发送一句话: 测试数据, 用来测试.     //masterActor ! "测试数据"   } }WorkerActor.scala文件中的代码(就修改了第3步) //WorkerActor: 用来接收ActorSystem发送的消息, 并发送消息给MasterActor, 然后接收MasterActor的回执信息. //负责管理WorkerActor的ActorSystem的地址:  akka.tcp://actorSystem@127.0.0.1:9999 object WorkerActor extends Actor{   override def receive: Receive = {     //1. 接收Entrance发送过来的: setup.     case "setup" => {       println("WorkerActor接收到: Entrance发送过来的指令 setup!.")        //2. 获取MasterActor的引用.       val masterActor = context.system.actorSelection("akka.tcp://actorSystem@127.0.0.1:8888/user/masterActor")        //3. 给MasterActor发送一句话.       masterActor ! "connect"     }        //4. 接收MasterActor的回执信息.     case "success" => println("WorkerActor接收到: success!")   } }5. 案例: 简易版spark通信框架5.1 案例介绍
  模拟Spark的Master与Worker通信. 一个Master管理多个Worker 若干个Worker(Worker可以按需添加)向Master发送注册信息向Master定时发送心跳信息 5.2 实现思路构建Master、Worker阶段构建Master ActorSystem、Actor构建Worker ActorSystem、Actor Worker注册阶段Worker进程向Master注册(将自己的ID、CPU核数、内存大小(M)发送给Master) Worker定时发送心跳阶段Worker定期向Master发送心跳消息 Master定时心跳检测阶段Master定期检查Worker心跳,将一些超时的Worker移除,并对Worker按照内存进行倒序排序 多个Worker测试阶段启动多个Worker,查看是否能够注册成功,并停止某个Worker查看是否能够正确移除 5.3 工程搭建
  需求
  本项目使用Maven搭建工程.
  步骤分别搭建以下几个项目, Group ID统一都为: com.yueda, 具体工程名如下:
  工程名
  说明
  spark-demo-common
  存放公共的消息、实体类
  spark-demo-master
  Akka Master节点
  spark-demo-worker
  Akka Worker节点 导入依赖   分别在三个项目下的src/main, src/test下, 创建scala目录. 导入配置文件(资料包中的application.conf) 修改Master的端口为7000 修改Worker的端口为8000 5.4 构建Master和Worker
  需求
  分别构建Master和Worker,并启动测试
  步骤创建并加载Master Actor 创建并加载Worker Actor 测试是否能够启动成功
  参考代码完成master模块中的代码, 即: 在src/main/scala下创建包: com.itheima.spark.master, 包中代码如下: MasterActor.scala文件中的代码 //Master: 用来管理多个Worker的. //MasterActor的路径: akka.tcp://actorSystem@127.0.0.1:7000 object MasterActor extends Actor{     override def receive: Receive = {         case x => println(x)     } }Master.scala文件中的代码 //程序入口: 相当于我们以前写的MainActor object Master {     def main(args: Array[String]): Unit = {         //1. 创建ActorSystem.         val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())         //2. 通过ActorSystem, 关联MasterActor.         val masterActor = actorSystem.actorOf(Props(MasterActor), "masterActor")         //3. 启动程序, 如果不报错, 说明代码没有问题.     } }
  完成worker模块中的代码, 即: 在src/main/scala下创建包: com.itheima.spark.worker, 包中代码如下: WorkerActor.scala文件中的代码 //WorkerActor的地址: akka.tcp://actorSystem@127.0.0.1:7100 object WorkerActor extends Actor{     override def receive: Receive = {         case x => println(x)     } }
  Worker.scala文件中的代码 //程序入口 object Worker {     def main(args: Array[String]): Unit = {         //1. 创建ActorSystem.         val actorSystem = ActorSystem("actorSystem", ConfigFactory.load())         //2. 通过ActorSystem, 关联MasterActor.         val workerActor = actorSystem.actorOf(Props(WorkerActor), "workerActor")         //3. 启动程序, 如果不报错, 说明代码没有问题.         workerActor ! "hello"     } }5.5 Worker注册阶段实现
  需求
  在Worker启动时,发送注册消息给Master.
  思路分析Worker向Master发送注册消息(workerid、cpu核数、内存大小)随机生成CPU核(1、2、3、4、6、8)随机生成内存大小(512、1024、2048、4096)(单位M) Master保存Worker信息,并给Worker回复注册成功消息 启动测试
  具体步骤在spark-demo-common项目的src/main/scala文件夹下创建包 在WorkerActor单例对象中定义一些成员变量, 分别表示:masterActorRef: 表示MasterActor的引用.workerid: 表示当前WorkerActor对象的id.cpu: 表示当前WorkerActor对象的CPU核数.mem: 表示当前WorkerActor对象的内存大小.cup_list: 表示当前WorkerActor对象的CPU核心数的取值范围.mem_list: 表示当前WorkerActor对象的内存大小的取值范围. 在WorkerActor的preStart()方法中, 封装注册信息, 并发送给MasterActor. 在MasterActor中接收WorkerActor提交的注册信息, 并保存到双列集合中.. MasterActor给WorkerActor发送回执信息(注册成功信息.). 在WorkerActor中接收MasterActor回复的 注册成功信息.
  参考代码WorkerActor.scala文件中的代码 //WorkerActor的地址: akka.tcp://actorSystem@127.0.0.1:7100 object WorkerActor extends Actor {     //1 定义成员变量, 记录MasterActor的引用, 以及WorkerActor提交的注册参数信息.     private var masterActorRef: ActorSelection = _    //表示MasterActor的引用.     private var workerid:String = _                   //表示WorkerActor的id     private var cpu:Int = _                           //表示WorkerActor的CPU核数     private var mem:Int = _                           //表示WorkerActor的内存大小.     private val cpu_list = List(1, 2, 3, 4, 6, 8)  //CPU核心数的取值范围     private val mem_list = List(512, 1024, 2048, 4096) //内存大小取值范围       //2. 重写preStart()方法, 里边的内容: 在Actor启动之前就会执行.     override def preStart(): Unit = {         //3. 获取Master的引用.         masterActorRef = context.actorSelection("akka.tcp://actorSystem@127.0.0.1:7000/usre/masterActor")          //4. 构建注册消息.         workerid = UUID.randomUUID().toString     //设置workerActor的id         val r = new Random()         cpu = cpu_list(r.nextInt(cpu_list.length))         mem = mem_list(r.nextInt(mem_list.length))         //5. 将WorkerActor的提交信息封装成 WorkerRegisterMessage对象.         var registerMessage = WorkerRegisterMessage(workerid, cpu, mem)         //6. 发送消息给MasterActor.         masterActorRef ! registerMessage     }      override def receive: Receive = {         case x => println(x)     } }MasterActor.scala文件中的代码 //Master: 用来管理多个Worker的. //MasterActor的路径: akka.tcp://actorSystem@127.0.0.1:7000 object MasterActor extends Actor{     //1. 定义一个可变的Map集合, 用来保存注册成功好的Worker信息.     private val regWorkerMap = collection.mutable.Map[String, WorkerInfo]()      override def receive: Receive = {         case WorkerRegisterMessage(workId, cpu, mem) => {             //2. 打印接收到的注册信息             println(s"MasterActor: 接收到worker注册信息, ${workId}, ${cpu}, ${mem}")              //3. 把注册成功后的保存信息保存到: workInfo中.             regWorkerMap +=  workId -> WorkerInfo(workId, cpu, mem)              //4. 回复一个注册成功的消息.             sender ! RegisterSuccessMessage         }     } }修改WorkerActor.scala文件中receive()方法的代码 override def receive: Receive = {     case RegisterSuccessMessage => println("WorkerActor: 注册成功!") }5.6 Worker定时发送心跳阶段
  需求
  Worker接收到Master返回的注册成功信息后,定时给Master发送心跳消息。而Master收到Worker发送的心跳消息后,需要更新对应Worker的最后心跳时间。
  思路分析编写工具类读取心跳发送时间间隔 创建心跳消息 Worker接收到注册成功后,定时发送心跳消息 Master收到心跳消息,更新Worker最后心跳时间 启动测试
  具体步骤在worker的src/main/resources文件夹下的 application.conf文件中添加一个配置. worker.heartbeat.interval = 5 //配置worker发送心跳的周期(单位是 s) 在worker项目的com.itheima.spark.work包下创建一个新的单例对象: ConfigUtils, 用来读取配置文件信息. 在WorkerActor的receive()方法中, 定时给MasterActor发送心跳信息. Master接收到心跳消息, 更新Worker最后心跳时间. .
  参考代码worker项目的ConfigUtils.scala文件中的代码 object ConfigUtils {     //1. 获取配置信息对象.     private val config = ConfigFactory.load()     //2. 获取worker心跳的具体周期     val `worker.heartbeat.interval` = config.getInt("worker.heartbeat.interval") }修改WorkerActor.scala文件的receive()方法中的代码 override def receive: Receive = {     case RegisterSuccessMessage => {         //1. 打印接收到的 注册成功消息         println("WorkerActor: 接收到注册成功消息!")         //2. 导入时间单位隐式转换 和 隐式参数         import scala.concurrent.duration._         import context.dispatcher            //3. 定时给Master发送心跳消息.         context.system.scheduler.schedule(0 seconds, ConfigUtil.`worker.heartbeat.interval` seconds){             //3.1 采用自定义的消息的形式发送 心跳信息.             masterActorRef ! WorkerHeartBeatMessage(workerId, cpu, mem)         }     } }MasterActor.scala文件中的代码 object MasterActor extends Actor {     //1. 定义一个可变的Map集合, 用来保存注册成功好的Worker信息.     private val regWorkerMap = collection.mutable.Map[String, WorkerInfo]()      override def receive: Receive = {         //接收注册信息.         case WorkerRegisterMessage(workId, cpu, mem) => {             //2. 打印接收到的注册信息             println(s"MasterActor: 接收到worker注册信息, ${workId}, ${cpu}, ${mem}")              //3. 把注册成功后的保存信息保存到: workInfo中.             regWorkerMap += workId -> WorkerInfo(workId, cpu, mem, new Date().getTime)              //4. 回复一个注册成功的消息.             sender ! RegisterSuccessMessage         }          //接收心跳消息         case WorkerHeartBeatMessage(workId, cpu, mem) => {             //1. 打印接收到的心跳消息.             println(s"MasterActor: 接收到${workId}的心跳信息")              //2. 更新指定Worker的最后一次心跳时间.             regWorkerMap += workId -> WorkerInfo(workId, cpu, mem, new Date().getTime)             //3. 为了测试代码逻辑是否OK, 我们可以打印下 regWorkerMap的信息             println(regWorkerMap)         }     } }5.7 Master定时心跳检测阶段
  需求
  如果某个worker超过一段时间没有发送心跳,Master需要将该worker从当前的Worker集合中移除。可以通过Akka的定时任务,来实现心跳超时检查。
  思路分析编写工具类,读取检查心跳间隔时间间隔、超时时间 定时检查心跳,过滤出来大于超时时间的Worker 移除超时的Worker 对现有Worker按照内存进行降序排序,打印可用Worker
  具体步骤修改Master的application.conf配置文件, 添加两个配置 #配置检查Worker心跳的时间周期(单位: 秒) master.check.heartbeat.interval = 6 master.check.heartbeat.timeout = 15  在Master项目的com.itheima.spark.master包下创建: ConfigUtils工具类(单例对象), 用来读取配置文件信息. 在MasterActor中开始检查心跳(即: 修改MasterActor#preStart中的代码.). 开启Master, 然后开启Worker, 进行测试.
  参考代码Master项目的ConfigUtils.scala文件中的代码 //针对Master的工具类. object ConfigUtil {     //1. 获取到配置文件对象.     private val config: Config = ConfigFactory.load()     //2. 获取检查Worker心跳的时间周期(单位: 秒)     val `master.check.heartbeat.interval` = config.getInt("master.check.heartbeat.interval")     //3. 获取worker心跳超时的时间(秒)     val `master.check.heartbeat.timeout` = config.getInt("master.check.heartbeat.timeout") }MasterActor.scala文件的preStart()方法中的代码 //5. 定时检查worker的心跳信息 override def preStart(): Unit = {     //5.1 导入时间转换隐式类型 和 定时任务隐式变量     import scala.concurrent.duration._     import context.dispatcher      //5.2 启动定时任务.     context.system.scheduler.schedule(0 seconds, ConfigUtil.`master.check.heartbeat.interval` seconds) {         //5.3 过滤大于超时时间的Worker.         val timeOutWorkerMap = regWorkerMap.filter {             keyval =>             //5.3.1 获取最后一次心跳更新时间.             val lastHeatBeatTime = keyval._2.lastHeartBeatTime             //5.3.2 超时公式: 当前系统时间 - 最后一次心跳时间 > 超时时间(配置文件信息 * 1000)             if (new Date().getTime - lastHeatBeatTime > ConfigUtil.`master.check.heartbeat.timeout` * 1000) true else false         }         //5.4 移除超时的Worker         if(!timeOutWorkerMap.isEmpty) {             //如果要被移除的Worker集合不为空, 则移除此 timeOutWorkerMap             //注意: 双列集合是根据键移除元素的, 所以最后的 _._1是在获取键.             regWorkerMap --= timeOutWorkerMap.map(_._1)         }         //5.5 对worker按照内存大小进行降序排序, 打印Worker         //_._2 获取所有的WorkInfo对象.         val workerList = regWorkerMap.map(_._2).toList         //5.6 按照内存进行降序排序.         val sortedWorkerList = workerList.sortBy(_.mem).reverse         //5.7 打印结果         println("按照内存的大小降序排列的Worker列表: ")         println(sortedWorkerList)     } }5.8 多个Worker测试阶段
  需求
  修改配置文件,启动多个worker进行测试。
  大白话: 启动一个Worker, 就修改一次Worker项目下的application.conf文件中记录的端口号, 然后重新开启Worker即可.
  步骤测试启动新的Worker是否能够注册成功 停止Worker,测试是否能够从现有列表删除

2019年必买的6类高性价比Linux开发板Dragonboard和Firefly专场大家好,我是人见人爱的小月月,在之前的文章里,我们聊到了2019年必买的6类高性价比Linux开发板OlinuXino专场2019年必买的6类高性价比Linux开发板BPI专场20IT产业的三大定律,你都知道么?作为IT从业人员,如果你不知道IT产业的三大定律,肯定会有人嘲笑你。今天小月月就带大家了解一下三大定律。1摩尔定律这是由英特尔(Intel)创始人之一戈登摩尔(GordonMoor奇瑞新能源艾瑞泽e安全系数高,你还说国产车不靠谱吗?汽车最重要的是不是品质,越是配置,更加不是发动机和变速箱,最重要的不可否认的是安全。每辆汽车在出厂之前都会做安全方面的测试,并且会送去专业的检测机构去检验,符合一定的标准才会出厂销新款奇瑞小蚂蚁带着更大的诚意来啦随着消费者对新能源汽需求的提高,新能源汽车厂家不断在已有的汽车基础上进行升级,奇瑞小蚂蚁这款极受消费者欢迎的汽车也在前段时间迎来了它的新款汽车,即奇瑞小蚂蚁20万蚁粉款,这款汽车在Arduino系列新增四款Nano板,包含WiFiBLE传感器以及硬件加密大家好,我是小月月,MakerFaire2019正在美国加州的SanMateo举行,按照惯例,Arduino会在展会上宣布他们的新产品。今年也不例外,他们发布了新的Nano系列,共乘风破浪,吉利博越PRO搭载的车机系统GKUI实力强大必须承认的是,智能化生活早已渗透到了日常生活之中,它关系到人们生活的每时每刻。当汽车被注入了灵魂,它就不再是一件普通的出行代步工具,车机系统可以说是汽车的大脑。而这款吉利博越PRO车机系统GKUI强大丰富的功能让人期待车机系统,可以说是人们日常用车中常常被忽视的,但又是每天都在接触的配置。一套好的车机系统,不仅仅可以为人们提供丰富的娱乐体验,而且可以为驾驶提供便捷。这款被称为最黑的黑科技的吉克智多媒体车机GKUI,人车家互联,好看还好用汽车想要实现智能化,网联化,车机系统非常关键。正如当年智能手机的崛起并非单纯依靠硬件水平的提高,更多的是得益于系统上的优势。车载系统的好坏对于一些功能的实现和用户体验具有至关重要的矽速科技推出MaixHubAI模型平台,让AI应用不再难面对新兴的AI浪潮,你是否觉得无从入手?觉得算法太过艰涩,AI硬件太过昂贵?矽速科技继推出6美金的AI模组M1,和百元级的AI开发板系列MAIX后,又隆重推出MaixHub模型平台真正好用的GKUI车机系统功能丰富,非常吸引人汽车正在朝着智能化的方向不断前行,对汽车消费者来说,车不只是冰冷的金属机器,而是有头脑的出行伙伴。在汽车智能化道路上,智能车机系统在不断升级进化,带给用户更有趣智能的出行体验。其中高质感汽车奇瑞大蚂蚁很全面随着国内便宜实惠的大尺寸SUV新能源汽车越来越多,很多人开始接受了这类型汽车的存在。但是在新能源汽车市场,很多汽车在设计的时候哦,总是顾此失彼,很多新能源汽车只顾续航,却忽略了汽车
易烊千玺20岁获金像奖,宋文清19岁成博士00后不是被洗脑的一代01hr最近,易烊千玺喜提第39届香港电影金像奖最佳新演员,沸腾了热搜。而他出演的少年的你也包揽了8项大奖。可谓是赢了个大满贯。易烊千玺获得最佳新演员奖原本是一件喜大普奔的事情,毕江阴石塘湾公园系个人建造,免费开放,还有断桥,谭纶大司马牌坊难忘的2020年还有十多天就要过去了,3月份时做梦没有想到,今年还能到全国各地到处跑,5月下旬开始从常州出发,沿着山东半岛,渤海湾骑车到秦皇岛,26天骑行3千公里,把沿途知名旅游景志愿军老战士薛筱金,骑行天下找战友,委托骑友多多爷爷找孟小英为纪念抗美援朝出国作战70周年,国家为每一位参加过抗美援朝的革命战士颁发一枚中国人民志愿军抗美援朝出国作战70周年纪念章。常州市武进区中医院10月27日专门召开全院党员干部大会,医常州小黄山国庆未开放,封山改造,丹阳七峰山,变废为宝成网红地10月7日,一去丹阳七峰山。国庆中秋小长假还剩两天了,我随着常州市老年自行车骑游队,前往丹阳市七峰山景区游玩。65个人,65辆车,浩浩荡荡从龙城大道向西转上239省道,一直向北骑,常州三杰立体形象惊现孟河,龙城大道往机场高架路又延长了秋分时节,江南水稻成熟,又是一年丰收时,常州网红打卡地孟河稻田艺术园还没有去,赶紧去看看,再迟水稻收割就看不到啦。今天一早6点钟,骑着自行车出了常州市区,上了龙城大道一直向西骑,龙镇江五峰山,悬索桥上跑火车,连云港直达苏锡常沪,不用绕安徽难忘的2020年还有几天就要过去了,3月份时,宅在家里做梦没有想到今年还能到处旅游,5月份就从常州出发,沿江苏,山东,河北省沿海一个人一辆车骑到秦皇岛,看到祖国大地上复工复产生机勃苏锡常没有直达南通,盐城火车,要到上海换乘?这样走省100元钱多多爷爷老家苏北盐城滨海县,儿子家在常州,还有叔叔,内侄等亲戚家住苏州,无锡,多年来经常往来于苏南苏北,近几年来常住常州,回老家时只能坐客车。一直盼望着能乘坐火车跨越长江,快捷往来上海三价就低之后,楼市未来怎么走?上海的政策,三价就低,变相提高首套二套首付,减少买家杠杆率,目前首套基本需要6成以上,这个政策直接打住房地产的七寸,死死摁住不让房价大涨。事实上,房地产也好,股市也罢,一旦银根收紧注意!今天起,巴州地区这些地方计划停电!最多影响5000户尊敬的广大电力用户为您提供2021年8月1日8日的计划检修停电信息给您带来的不便敬请谅解!巴州这些地方计划停电你准备好了吗?在停电区域的小伙伴请奔走相告提前做好准备库尔勒计划停电停CoCo都可起诉同名企业侵害商标权,获赔5。5万天眼查App显示,近日,上海馥邑企业管理有限公司与杭州都可生物科技连锁有限公司北京微梦创科网络技术有限公司侵害商标权纠纷一审民事判决书公布,案号为(2019)沪0115民初8506为什么在商标申请前都会进行商标检索呢?商标检索,也就是商标注册申请前的商标查询,是商标申请人或代理人查询申请注册的商标是否存在在先相同或近似商标。虽然商标查询并不是商标申请的必须步骤,但是在商标市场的发展中,这一步已经