范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

RocketMQStreams1。1。0轻量级流处理再出发

  一、背景
  流处理是数据集成领域一个重要话题,他能显著减少数据输入和结果输出之间延迟,在对时间延迟敏感的商业场景,例如安全、智能运维、实时推荐,有大量的需求。RocketMQ作为一款消息中间件,已经在业务集成领域展现出巨大价值,但是在数据集成领域还有较大拓展空间。通过支持流处理可以带动RocketMQ进入数据集成领域,拓展RocketMQ的使用范围。
  RocketMQ Streams是一款基于RocketMQ为基础的轻量级流计算引擎,具有资源消耗少、部署简单、功能全面的特点,目前已经在社区开源。RocketMQ Streams在阿里云内部被使用在对资源比较敏感,同时又强烈需要流计算的场景,比如在自建机房的云安全场景下,对处理结果的实时性要求非常高,同时考虑到部署和运维成本,轻量级计算引擎就成为一种可行的新选择。
  自RocketMQ Streams开源以来,吸引了大量用户调研和试用。但是也存在一些问题,在RocketMQ Streams 1.1.0中,主要针对以下问题做出了改进和优化。 面向用户API不够友好,不能使用泛型,不支持自定义序列化/反序列化; 代码冗余,在RocketMQ Streams中存在序列化反序列化流处理拓扑模块,RocketMQ Streams作为轻量级流处理SDK,构建好流处理节点之后应该可以直接处理数据,不存在本地保存和网络传输需求。 流处理过程不清晰,含有大量缓存逻辑; 存在大量支持SQL的逻辑,这部分和SDK方式运行流处理任务的逻辑无关;
  在RocketMQ Streams 1.1.0中,对上述问题做出了改进,期望能带来更好的使用体验。同时,重新设计了流处理拓扑构建过程、去掉冗余代码,使得代码更容易被理解。
  二、典型使用示例
  本地运行下列示例步骤: 部署RocketMQ 5.0; 使用mqAdmin创建topic; 构建示例工程,添加依赖,启动示例。RocketMQ Streams 坐标:      org.apache.rocketmq     rocketmq-streams     1.1.0 向topic中写入相应数据,并观察结果。
  更详细文档请参考:RocketMQ Streams quick start WordCountpublic class WordCount {     public static void main(String[] args) {         StreamBuilder builder = new StreamBuilder("wordCount");          builder.source("sourceTopic", total -> {                     String value = new String(total, StandardCharsets.UTF_8);                     return new Pair<>(null, value);                 })                 .flatMap((ValueMapperAction>) value -> {                     String[] splits = value.toLowerCase().split("W+");                     return Arrays.asList(splits);                 })                 .keyBy(value -> value)                 .count()                 .toRStream()                 .print();          TopologyBuilder topologyBuilder = builder.build();          Properties properties = new Properties();         properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");          RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);          final CountDownLatch latch = new CountDownLatch(1);          Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {             @Override             public void run() {                 rocketMQStream.stop();                 latch.countDown();             }         });          try {             rocketMQStream.start();             latch.await();         } catch (final Throwable e) {             System.exit(1);         }         System.exit(0);     } }窗口聚合public class WindowCount {     public static void main(String[] args) {         StreamBuilder builder = new StreamBuilder("windowCountUser");          AggregateAction aggregateAction = (key, value, accumulator) -> new Num(value.getName(), 100);          builder.source("user", source -> {                     User user1 = JSON.parseObject(source, User.class);                     return new Pair<>(null, user1);                 })                 .selectTimestamp(User::getTimestamp)                 .filter(value -> value.getAge() > 0)                 .keyBy(value -> "key")                 .window(WindowBuilder.tumblingWindow(Time.seconds(15)))                 .aggregate(aggregateAction)                 .toRStream()                 .print();          TopologyBuilder topologyBuilder = builder.build();          Properties properties = new Properties();         properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");         properties.put(Constant.TIME_TYPE, TimeType.EVENT_TIME);         properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 2000);          RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);          rocketMQStream.start();     } }双流JOIN
  下例是窗口JOIN,也支持无窗口类型的双流JOIN。 public class JoinWindow {     public static void main(String[] args) {         StreamBuilder builder = new StreamBuilder("joinWindow");          //左流         RStream user = builder.source("user", total -> {             User user1 = JSON.parseObject(total, User.class);             return new Pair<>(null, user1);         });          //右流         RStream num = builder.source("num", source -> {             Num user12 = JSON.parseObject(source, Num.class);             return new Pair<>(null, user12);         });          //自定义join后的运算         ValueJoinAction action = new ValueJoinAction() {             @Override             public Union apply(User value1, Num value2) {                 ...             }         };          user.join(num)                 .where(User::getName)                 .equalTo(Num::getName)                 .window(WindowBuilder.tumblingWindow(Time.seconds(30)))                 .apply(action)                 .print();          TopologyBuilder topologyBuilder = builder.build();          Properties properties = new Properties();         properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");          RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);          rocketMQStream.start();     } }Java对象支持public class Demo {     public static void main(String[] args) {         StreamBuilder builder = new StreamBuilder("demo");          builder.source("user", new KeyValueDeserializer() {                     @Override                     public Pair deserialize(byte[] total) throws Throwable {                          //自定义反序列化                     }                 })                 .keyBy(User::getAge)                 .count(User::getName)                 .toRStream()                 .print();          TopologyBuilder topologyBuilder = builder.build();          Properties properties = new Properties();         properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");          RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);          rocketMQStream.start();     } }
  三、方案设计
  RocketMQ Streams作为客户端SDK直接被使用方依赖,从而获得流处理能力。他从RocketMQ的source topic中读取数据,经过处理后将结果写回到RocketMQ的sink topic中。这种架构的好处是业务无需添加任务第三方依赖,直接从数据源读取数据即可获得流处理能力。 流处理拓扑构建过程
  在使用者书写上述及连表达式时,发生第一次构建,即逻辑节点的添加,前后算子具有父子关系,构建后形成逻辑节点,多个逻辑节点形成链表。
  逻辑构建结束后,调用StreamBuilder#build()方法进行第二次构建,将逻辑节点中可能包含的多个真实节点添加拓扑,形成处理拓扑图。
  经过两次两次构建后,处理拓扑已经完整。但是为了区分不同topic使用不同拓扑处理,需要在数据来临前的重平衡阶段,创建真实数据处理节点,这是第三次构建。 逻辑节点构建(第一次构建)
  逻辑节点本身不包括实际操作,但是可由逻辑节点继续构建出实际节点,一个逻辑节点可能包含一实际节点,也可能包含多个实际节点,例如count逻辑算子不仅仅包含累加这个实际操作,累加前需要对相同key的数据路由到同一计算实例上,因此还需要包含sink、source两个实际节点,但是这些只会在构建实际节点时体现出来,不会在添加逻辑节点阶段体现。
  每个逻辑节点都是GraphNode的子类,构建时,将子节点算子加入父节点child集合中,将父节点加入子节点parent集合中。这个构建过程中使用Pipeline均为同一个实例。随着构建过程,将逻辑节点加入到pipeline中,父子节点形成以root节点为根节点的链表。
  添加逻辑节点逻辑: @Override public  GroupedStream map(ValueMapperAction mapperAction) {     //1、确定节点名称      //2、实现Supplier类,实现数据处理逻辑      //3、实例化逻辑节点类GraphNode      //4、将逻辑节点GraphNode添加到pipeline中形成链表 }
  可以看到逻辑节点的添加非常通用,实现不同功能的算子,只需要实现算子对应的数据实际处理逻辑即可,如果将新增算子形成拓扑图等等后续工作完全不用关心,降低了新算子开发的门槛。
  在逻辑节点的构建过程中,有两类比较特殊的算子,一个是实现数据分组的shuffle算子,一个是实现双流聚合的Join算子。
  shuffle逻辑算子的功能是将含有相同key的数据发送到同一个队列中,方便后续算子对相同key的数据进行统计。他通常是keyBy后面紧跟的算子,例如keyBy("年纪").count(),那么count就是一个shuffle算子类型。shuffle逻辑算子包含三个实际处理过程: 将数据按照Key的hash%queueNum发送到对应队列; 从RocketMQ中拉取上述数据到本地; 按照shuffle节点中定义的逻辑进行处理,例如累加。
  Join算子的功能是实现双流聚合,将两个数据流聚合成一个。
  Join拓扑图
  在左流和右流上添加KeyBy算子,对左流和右流进行分别过滤;之后在左流和右流上分别添加标签节点,在数据中添加此数据是左流还是有流,之后将两个标签节点,指向一个共同的Join节点,数据在此完成汇聚,按照使用者给定的ValueJoinAction节点处理。
  Join使用方式: StreamBuilder builder = new StreamBuilder(jobId);  RStream leftStream = builder.source(...); RStream rightStream = builder.source(...);  ValueJoinAction action = new ValueJoinAction(){...};  leftStream.join(rightStream)           .where(左流字段)           .equalTo(右流字段)           .apply(action)           .print();
  Join实现伪代码: //左右流按照各自字段分组,含有相同key的字段会被回写到同一个队列里面; GroupedStream leftGroupedStream = leftStream.keyBy(左流字段); //因为后面左右流数据会在一起处理,为了区分数据来源,在数据中添加标记是左流还是右流 leftGroupedStream.addGraphNode(addTag); //获取leftGroupedStream最后的逻辑节点 GraphNode leftLast = leftGroupedStream.getLast();      GroupedStream rightGroupedStream = leftStream.keyBy(右流字段); rightGroupedStream.addGraphNode(addTag); GraphNode rightLast = rightGroupedStream.getLast();  //数据汇聚节点 ProcessorNode commChild = new ProcessorNode(name, temp, "聚合数据实际操作"); commChild.addParent(leftLast); commChild.addParent(rightLast);  //统一数据流 RStreamImpl commRStream = new RStreamImpl<>(Pipeline, commChild); //继续在统一数据流上操作 commRStream...物理构建(第二次构建)
  构建逻辑节点完毕后,从ROOT节点开始遍历,调用GraphNode逻辑节点 addRealNode 方法,构建真实节点构建工厂类
  在第二次构建实际节点过程中,会对逻辑节点进行拆解,对于大多数逻辑节点,只需要构建一个实际节点,但是对于某些特殊的逻辑节点需要构建多个实际节点才能与之对应,例如shuffle类型逻辑节点,他需要包含三个实际节点:发送数据节点、消费数据节点、处理数据节点。
  shuffle类型逻辑节点父节点必须是GroupBy,例如上图所示的count是shuffle节点,Window节点也可以是逻辑节点。 第二次构建并不会直接生成处理数据的 Processor,而是产生ProcessorFactory对象。为什么不生成直接能处理数据的Processor对象呢?因为一个RocketMQ Streams实例需要同时拉取不同队列进行流计算,为了能将不同队列的流计算过程区别开,针对每个队列会由独立的Processor实例进行处理,因此第二次构建仅仅构建出ProcessorFactory,在重平衡确定流处理实例要拉去哪些队列后,在由ProcessorFactory实例化Processor。第三次构建
  客户程序依赖RocketMQ Streams获得流计算能力,因此客户程序本质上是就是一个RocketMQ Client(见6.1.16方案架构图)。在RocketMQ Client发生重平衡时,会将RocketMQ Server所包含的队列在客户端中重新分配,第三次构建,也就是右ProcessorFactory实例化Processor,就发生在重平衡发生后,拉取数据前。第三次真实的构建出了处理数据的Processor,并将子节点Processor添加进入父节点Processor中。 数据处理过程状态恢复
  流处理过程中产生的计算状态保存、恢复涉及到流处理过程的正确性。在流处理实例宕机的情况下,该流处理实例上消费的队列会被重平衡到其他流处理实例上。如果对该队列进行了有状态计算,那么产生的状态也需要在新的流计算实例上恢复。如上图中,Instance1宕机,他消费的MQ2和MQ3被分别迁移到Instance2和Instance3上,MQ2和MQ3对应的状态(紫色和蓝色)也需要在Instance2和Instance3上恢复出来。 存储介质
  使用本地RocksDB,远程RocketMQ的组合,作为状态存储介质。流计算在计算状态时,RocksDB在使用有限内存情况作为状态的临时本地存储于算子交互,在计算结束后提交消费位点时将本次计算产生的状态一并写入RocketMQ中。消费位点提交、计算结果写出、状态保存需要保持原子状态,这一内容在后面流计算正确性中讨论。 状态持久化存储
  RocketMQ作为消息临时存储,存在数据最大过期时间,一旦过期后,数据会被删除。但是状态存储介质本质上是以KV方式存储数据,不希望KV数据随着时间过期而被删除。因此,使用Compact topic作为状态存储,他会对同一队列的数据 按照Key对数据进行压缩,相同Key的数据只保留offset最大的一条。 //key如果决定数据被发送到某个Broker的哪个队列 int queueId = hash(key) % queueNum
  但是在RocketMQ中队列数会随着Broker扩缩容而增加或者减少,扩缩Broker数量前后,相同的Key可能被发送到不同的队列,那么按照上述规则进行Compact后得到的最新值就是错误的,使用Compact topic作为KV存储就失去了意义。
  因此在状态topic是Compact topic的基础上,再将状态topic创建为Static topic(Logic Queue),即状态topic即是Compact topic也是Static topic。这样才能解耦队列数量与Broker数量,使队列数量在扩缩Broker情况下仍然不变,保证含有相同Key的数据能被发送到同一队列中。
  点击查看原文,获取更多福利!
  https://developer.aliyun.com/article/1141069?utm_content=g_1000367577
  版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

小胖一出,花海靠边?融一其随狼队输的不是eStar,而是清融时光飞逝,转眼间KPL夏季赛常规赛第九周就画上了完美的句号,先给大家浅谈一下第九周结束后S组和A组的情况1。S组目前只有北京WB这一支战队杀出了重围,进入到了季后赛胜者组,所以他们九重试炼强力武器组合万剑归宗流本文首发九重攻略组公众号,未经授权禁止抄袭文章转载至其它平台。哈喽大家好我是飞鹤,本期飞鹤给大家带来的是九重试炼高强度武器组合推荐,万剑归宗流,各位小伙伴一起来跟我看看吧。开局飞鹤不怕物理刺客的射手,s28赛季提高胜率的虞姬普攻流出装射手虞姬是很多物理英雄的克星,因为二技能的免疫效果,可以避免很多的致命伤,更是可以免疫防御塔的伤害。首先要了解一下虞姬这个英雄被动树神护佑虞姬的被动有机率减速敌人,还可以在被敌人追中医养生这个部位是耳鸣耳聋的死穴嗡嗡嗡耳朵里像养了一大群蜜蜂,吵得头昏脑涨,听不清别人说话,晚上睡不好觉,听力越来越差,行动变得迟缓有耳鸣的人,才知道这其中苦楚,其实耳鸣耳聋有个死穴,只需点通一处,就能防治耳鸣耳三伏天的饮食营养三伏天的持续高温使人体(尤其是室外工作人员),在生理生化以及代谢等方面均出现明显改变,营养素代谢与营养素需要也发生变化,如饮食营养调理跟不上将直接影响到人的健康。一高温对能量和营养解散模式!全队大清洗!10人离开!3人退役!再见,扣篮王CBA近期发生了众多变革的事情,先是郭艾伦主动申请离队,随后湾区翼龙的异军突起,收刮了众多CBA旧将,一时CBA有点风云突变的感觉,但是要说球员流动,还属青岛队。要说本赛季调整力度中国研发女性机器人,比日本的要先进!尤其是这项功能中国研发的女性机器人比日本先进太多了,尤其是这项服务,在单身男性的圈子炸开了锅。(此处已添加小程序,请到今日头条客户端查看)众所周知,各国都在不断突破智能领域,研发了很多智能机器人液体黄金橄榄油,这五类人群尤其需要它,来看看它的五大功能为什么明星超模营养师养生达人都超级爱吃橄榄油橄榄油富含抗氧化物质,可以消除活性氧自由基这个美容大敌只要把食用油换成橄榄油,每日使用1。52大匙的量,就能找回苗条的S曲线不易复胖,让4个不忍心卸载的国产软件,免费又实用,功能强大到离谱闲话少说,实干见真章。1轻启动(去广告神器)打开手机APP经常遇到启动页广告,不仅拖慢打开速度,误点击广告后,还会跳转到别的APP,让人防不胜防,十分烦人。为了提升APP运行速度,中国科研人员首次证明环境噪音会干扰动物利用视觉线索选择配偶锯腿原指树蛙。中科院成都生物所供图中新网成都8月8日电(记者贺劭清)记者8日从中国科学院成都生物研究所(简称中科院成都生物所)获悉,中科院成都生物所动物行为与仿生项目组研究发现,噪联想预热小新PadPro2022搭载全功能USBC接口IT之家8月8日消息,据联想官方消息,即将发布的小新PadPro2022将搭载全功能USBTypeC接口。据官方介绍,小新PadPro2022的USBC接口支持USB3。2Gen1
重庆市委台办召开渝台经贸交流合作联席会助力台企高质量发展渝台经贸交流合作联席会现场(图片来源重庆市台办)中国台湾网9月30日讯9月29日,重庆市委台办组织召开渝台经贸交流合作联席会暨重点台资企业恳谈会,听取台资企业当前生产经营情况在渝发别让原则失了人情味在这个时代,你会发现程序正义,还有包括准则,都在大量地被提到,因为人们都希望获得公平获得正义。但是反过来,就是因为对于这些原则的追寻,有的时候我们往往忽略了人,他其实不是一个集合名该提前做准备了?科学家发出预警我们或许很快就会找到外星人我们自古以来就在寻找外星人,但一直没有找到突破口。然而,最近有科学家指出外星人离我们越来越近,意味着,我们即将找到外星人,这是怎么回事呢?外星生物就好像会隐身术一样,消失于人们视线地球上曾有过上千亿人,这是否意味着泥土就是由腐尸构成的?不是地球上的泥土是怎么来的,难道真的是零落成泥碾作尘,是由腐尸构成的?太阳系一共有4颗岩石星球,形成了泥土的,只有地球,其他三颗上面都是尘土。人们与泥土泥土的组成部分为矿物质有机质水和地质年代的划分看关于史前的事,比如生物进化恐龙冰河等,常会遇到寒武纪侏罗纪三叠纪等名词,这涉及地质年代的问题。所谓地质年代,是指地球上的岩石和地层的形成时间。一般认为原始地球形成于46亿年前,以折叠屏真香,但为啥你还不买?关于折叠屏手机(仅限横向折叠),Michael其实从三星的Fold一代(于2019年发布)已经开始关注了,并持续关注了4年(没办法,疫情影响,穷了足足4年捂脸),期间都或多或少地接无墨打印更省心,喵喵机学习打印机F2S让学生又爱又恨的礼物在大部分人的印象中,打印机作为计算机的主要输出设备之一,一直存在体积庞大功能单一且操作繁琐等问题。但伴随消费升级以及技术的成熟,如今市面上也出现了一些独特的产品,比如今天本文的主角别让原则失了人情味在这个时代,你会发现程序正义,还有包括准则,都在大量地被提到,因为人们都希望获得公平获得正义。但是反过来,就是因为对于这些原则的追寻,有的时候我们往往忽略了人,他其实不是一个集合名该提前做准备了?科学家发出预警我们或许很快就会找到外星人我们自古以来就在寻找外星人,但一直没有找到突破口。然而,最近有科学家指出外星人离我们越来越近,意味着,我们即将找到外星人,这是怎么回事呢?外星生物就好像会隐身术一样,消失于人们视线地球上曾有过上千亿人,这是否意味着泥土就是由腐尸构成的?不是地球上的泥土是怎么来的,难道真的是零落成泥碾作尘,是由腐尸构成的?太阳系一共有4颗岩石星球,形成了泥土的,只有地球,其他三颗上面都是尘土。人们与泥土泥土的组成部分为矿物质有机质水和地质年代的划分看关于史前的事,比如生物进化恐龙冰河等,常会遇到寒武纪侏罗纪三叠纪等名词,这涉及地质年代的问题。所谓地质年代,是指地球上的岩石和地层的形成时间。一般认为原始地球形成于46亿年前,以