范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

阿里面试,Flink向Kafka中写入数据,如何保证数据的一致性

  这是一个朋友半个月前,去阿里面试的面试题。关于这道题的答案,我已经录制成了视频,欢迎朋友们观看。不喜欢看视频的朋友,也可以看看这篇文章。
  阿里面试,Flink向Kafka中写数据,是如何保证数据一致性的FlinkKafkaProducer的事务一致性
  我们都知道Kafka是非常受欢迎的分布式消息系统,在Flink应用中,Kafka既可以作为Source,同时也可以作为Sink。从Kafka0.11.0及以上版本就提供了对事务的支持,而Flink从Flink1.7及以上版本,就通过FlinkKafkaProducer实现了两阶段事务提交的能力。
  FlinkKafkaProducer作为Kafka的Sink,之所以能够实现两阶段提交,主要是它继承了抽象类TwoPhaseCommitSinkFunction,根据这个名字就能知道这个抽象类实现了两阶段提交。 @PublicEvolving public class FlinkKafkaProducer         extends TwoPhaseCommitSinkFunction<                 IN,                 FlinkKafkaProducer.KafkaTransactionState,                 FlinkKafkaProducer.KafkaTransactionContext> { }
  FlinkKafkaProducer要实现一致性,必然离不开TwoPhaseCommitSinkFunction实现了两个接口,一个是 CheckpointedFunction,另外一个是CheckpointListener。@PublicEvolving public abstract class TwoPhaseCommitSinkFunction extends RichSinkFunction         implements CheckpointedFunction, CheckpointListener { }
  下面我们具体看看TwoPhaseCommitSinkFunction是如何工作的。TwoPhaseCommitSinkFunction
  TwoPhaseCommitSinkFunction保留了5个函数需要子类去实现, invoke()
  FlinkKafkaProducer在接收到每一条一条消息时,都会调用invoke()方法,在invoke()方法中,会将输入的消息(next)封装成ProducerRecord,然后调用Kafka客户端的KafkaProducer.send方法,将ProducerRecord 对象写入缓冲。public void invoke(             FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context)             throws FlinkKafkaException {         checkErroneous();          ProducerRecord record;         if (keyedSchema != null) {             byte[] serializedKey = keyedSchema.serializeKey(next);             byte[] serializedValue = keyedSchema.serializeValue(next);             String targetTopic = keyedSchema.getTargetTopic(next);             if (targetTopic == null) {                 targetTopic = defaultTopicId;             }              Long timestamp = null;             if (this.writeTimestampToKafka) {                 timestamp = context.timestamp();             }              int[] partitions = topicPartitionsMap.get(targetTopic);             if (null == partitions) {                 partitions = getPartitionsByTopic(targetTopic, transaction.producer);                 topicPartitionsMap.put(targetTopic, partitions);             }             if (flinkKafkaPartitioner != null) {                 record =                         new ProducerRecord<>(                                 targetTopic,                                 flinkKafkaPartitioner.partition(                                         next,                                         serializedKey,                                         serializedValue,                                         targetTopic,                                         partitions),                                 timestamp,                                 serializedKey,                                 serializedValue);             } else {                 record =                         new ProducerRecord<>(                                 targetTopic, null, timestamp, serializedKey, serializedValue);             }         } else if (kafkaSchema != null) {             if (kafkaSchema instanceof KafkaContextAware) {                 @SuppressWarnings("unchecked")                 KafkaContextAware contextAwareSchema = (KafkaContextAware) kafkaSchema;                  String targetTopic = contextAwareSchema.getTargetTopic(next);                 if (targetTopic == null) {                     targetTopic = defaultTopicId;                 }                 int[] partitions = topicPartitionsMap.get(targetTopic);                  if (null == partitions) {                     partitions = getPartitionsByTopic(targetTopic, transaction.producer);                     topicPartitionsMap.put(targetTopic, partitions);                 }                  contextAwareSchema.setPartitions(partitions);             }             record = kafkaSchema.serialize(next, context.timestamp());         } else {             throw new RuntimeException(                     "We have neither KafkaSerializationSchema nor KeyedSerializationSchema, this"                             + "is a bug.");         }          pendingRecords.incrementAndGet();         transaction.producer.send(record, callback);     }
  数据写入缓冲池beginTransaction()、preCommit()、commit()和abort()这四个方法,定义了两阶段提交协议的几个步骤。对于Kafka这种外部系统来说,就是通过调用两阶段提交协议对于的函数实现数据写入功能。状态快照处理
  一 在preCommit()函数中,调用 KafkaProducer 客户端 flush 方法,将缓冲区内全部记录发送到 Kafka, 但不提交 。
  TwoPhaseCommitSinkFunction里面 的preCommit()函数是在snapshotState方法中调用的,而snapshotState方法是在算子Checkpoint的时候触发的。这样就保证了算子在做Checkpoint时,所有该Checkpoint之前的数据都已经安全的发送到了下游(而不是在缓存中),如下图所示。 这些记录会写到Kafka的Topic-A中,此时数据的隔离级别是UNCOMMITED,如果有一个Topic-A的消费者程序,正在消费这个Topic中的数据,并且设置的 isolation.level=read_committed,那么此时这个消费端还无法 poll 到 flush 的数据,因为这些数据尚未 commit。具体什么时候commit呢? 在快照结束处理阶段会commit,我们稍后会讲到这一点。
  Flush数据到Kafka
  二 将本轮的CheckPointId与当前事务记录到一个 Map 表示的待提交事务集合中。
  在Flush完数据之后,Flink会将CheckPointID与当前事务记录到一个Map中,这个Map表示的待提交事务集合,key 是当前快照的 CheckpointId,value 是由 TransactionHolder 表示的事务对象。TransactionHolder 对象内部记录了 transactionalId、producerId、epoch 以及 Kafka 客户端 kafkaProducer 的引用。
  记录到Map中
  那么,这个TransactionID是怎么产生的呢?Flink用一个队列作为transactional id的Pool,新的Transaction开始时从队头拿出一个transactional id,Transaction结束时将transactional id放回队尾。 private FlinkKafkaInternalProducer createTransactionalProducer()             throws FlinkKafkaException {         String transactionalId = availableTransactionalIds.poll();         if (transactionalId == null) {             throw new FlinkKafkaException(                     FlinkKafkaErrorCode.PRODUCERS_POOL_EMPTY,                     "Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.");         }         FlinkKafkaInternalProducer producer =                 initTransactionalProducer(transactionalId, true);         producer.initTransactions();         return producer;     }  private void recycleTransactionalProducer(FlinkKafkaInternalProducer producer) {         availableTransactionalIds.add(producer.getTransactionalId());         producer.flush();         producer.close(Duration.ofSeconds(0));     }
  三 持久化当前事务处理状态,也就是将当前处理的事务详情存入状态后端,供应用恢复时使用
  快照持久化快照结束处理
  TwoPhaseCommitSinkFunction 实现了 CheckpointListener,应用中所有算子的快照处理成功后会收到基于某轮 Checkpoint 完成的通知。
  FlinkKafkaProducer#commit()方法,这个方法实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。 protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {         if (transaction.isTransactional()) {             try {                 transaction.producer.commitTransaction();             } finally {                 recycleTransactionalProducer(transaction.producer);             }         }     }
  该方法的调用点位于TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中。顾名思义,当所有检查点都成功完成之后,会回调这个方法。
  该方法每次从正在等待提交的事务句柄中取出一个,校验它的检查点ID,并调用commit()方法提交。
  根据CheckPointId的值,到"待提交事务中",获取到事务Id之后,就可以提交事务了。而具体要提交哪些事务,是从上一阶段生成的待提交事务集合中获取到的。
  在下图中的第⑫步执行成功后,flush 到 Kafka 的数据从 UNCOMMITTED 变更为 COMMITTED,这意味着此时消费端可以 poll 到这批数据了。
  提交事务总结
  Flink借用两阶段提交原理,在加上Kafka的事务特性实现了,Flink向Kafka发送数据的端到端Exactly_Once。
  而(2PC)两阶段提交的第一个阶段就是通过Kafka的事务初始化、事务开启、flush()等操作进行预提交事务;第二个阶段提交就是通过Kafka的commit操作来真正执行事务提交。
  在了解了两阶段提交原理之后,可以很快的将这种方案套用到,其他支持事务的外部存储或消息队列中。
  希望这篇文章能起到抛砖引玉的作用,欢迎大家评论、转发!

购买二手iPad如何验机,妈妈再也不用担心我买到炸弹机了购买二手iPad如何验机?预算不够,想入手一台二手的iPad,却又不知道如何才能买到放心的机器,到底该怎么办,宝宝们不要发愁,作为一名资深的验机侦探,简单五步教会大家,如何挑选到优宝宝语言发育规律语言是人类最重要的交际工具,是人们进行沟通交流的各种表达方式。那宝宝的语言发育有什么规律呢,一起来看看吧。儿童语言的发展2个月可发出几个单元音(aiO等),能与成人交流发音。4个月天使降临在人间早上陪儿子的朋友媳妇去了趟省妇幼保健院做了个产前检查,医院里面的人真多,大多数的人都是孕妇及其家属,大家都是怀着同样的心情,盼望着生一个健康聪明的宝宝!或许可能是因为,这里是孕育下为什么要把孩子送去上托育?托育泛指3岁以下的婴幼儿照护。这些年国家出台一系列文件,鼓励社会各界发展托育事业。托育也被明确列入十四五规划目标,属于重要的民生问题。各地区的托位数也成为当地政府绩效考核的重要指标宝宝不穿袜子和穿袜子,体质不一样,区别越大越明显关于孩子是不是应该光着脚丫,好多家长都是抱着矛盾的心态。年轻人这方面还挺心大的,也不觉得光着脚有什么不好。但长辈们一般都受不了,觉得让孩子光着脚丫会脚底着凉。由此产生的育儿矛盾还真川中日月长到了四川,就要有闲有心去欣赏,贴根贴地学坚强川中日月长说来很惭愧,我第一次去四川,竟然是2008年抗震时跟着去的。虽然之后还去过两三次,总是走马观花,浮光掠影,不敢说去过什么地方。蜀山蜀水天地幽,那里有太多的地方,等着我去好烧烤豆腐干烧烤系列的小吃现在的孩子都很喜欢吃,也很对大人的胃口,不然外面的烧烤也不会有那样多的人去吃了。我们出去耍都会买几包豆腐干,基本是旅行的必备零食,至少我家就是。其实自己在家完全能够做青海最著名的六大特色美食,每一种都让人垂涎欲滴,你吃过几种?青海地域辽阔,是中国面积第四大省份,生态多样,有雪山草原沙漠,还有中国第一大咸水湖三江源,自然风光雄奇壮美,物产丰富,加上民族文化的多样,孕育了多种多样的饮食文化,特色美食品种繁多红米K50Ultra再次曝光,天玑骁龙双版本,影像系统是看点红米可以说为性价比手机厂商鼻祖,在国内优先带来了性价比手机概念,凭借优秀的性能,亲民的价格,红米在中低端手机市场表现极佳。不过随着技术的进一步发展,红米也开启逐步冲击旗舰手机市场,想暑期换机畅玩游戏?除了散热背夹,这些系统设置也有效果毕业季正好碰上了夏天,相信有不少同学准备换台手机打游戏了吧?不过要想手机在炎炎夏天畅玩游戏的话,除非待在空调房里,不然没等玩几分钟,手机准会发烫。手机发烫不仅会严重影响握持手感,更华为Mate50再次被确认,称其搭载麒麟9000S,其真实情况如何?有博主爆料有可能有麒麟9000s,是麒麟9000的超频版本,功耗控制还可以,不过库存有限。华为Mate50系列会有三款机型,不过都是4G,除了可能有麒麟9000s外,肯定有骁龙8G
王林平随笔四方食事不过一碗人间烟火四方食事,不过一碗人间烟火,人间烟火,不过一处屋檐下的繁衍生息,繁衍生息,又幻化成了人间烟火,事实就是如此。家附近就有这样一处人间烟火的好去处,那就是南锣鼓巷。好久不去南锣了,因为白雪圣诞节白雪华灯,天地圣洁,人间美好,万民平安!白雪圣诞节哈瑟尔特圣诞街景海边的风毕竟湿润些,海上吹来的冬风虽然也很长,吹过的雪却是软软的。在那个飘着雪花和想象,空中有些莫名躁动的日子,我们在西欧走马观花式地过了一个圣诞节。奥斯小巧便携,露营装备好物,sanag塞那M13SPro音响体验现在,露营逐渐成为一种新的时尚,在繁忙的工作过后,利用闲暇之余去外面看看,享受享受大自然,也是非常不错的!而在这惬意的时间里面,怎么能少得了音乐的陪伴,音乐是治愈一切的良药,而在露内向型性格的人如何实现人生突破?我两个哥哥家都是男孩,同岁,可是性格差别很大。一个外向,擅长也乐于和别人交往,是通俗意义上的社会人类型。另一个比较内向,文文弱弱的,不太擅长与人交往。这让他妈妈很担心,害怕孩子这种吃鸡,把人生玩明白了1hr周末不卷,随手写点。最近吃鸡玩儿的比较多。越玩儿,越觉得吃鸡这种大逃杀游戏,不是一个简单的游戏,而是真实人生的模拟器。你在人生中遇到的大部分事情,决策,幸运,倒霉,在吃鸡里都岁末,半盏浊酒敬时光与往事时光荏苒,岁月如梭。透过指缝,昏暗的灯光折射在所剩无几且有些皱巴巴的日历上,犹如这平淡无奇的日子,有些单薄与无助,略显尴尬。起身,驻足而立,感慨岁月悠然,又好像一切是那样地顺其自然成功而美好的人生不是等靠要来的,是靠实干奋斗和打拼出来的人生在世,谁都想拥有成功而美好的人生。但成功而美好的人生不是等来的靠来的要来的,而是靠自己实干奋斗和打拼出来的。逆水行舟,不进则退。勤奋努力,好运自来。只有积极进取勤奋努力的人,才超光速时光机时间旅行我们可以在宇宙中弱相互作用力中产生始终以超光速运动的超光速粒子,原则上可以用在弱相互作用力中产生的超光速粒子制造进行时间旅行的超光速时光机。光速可以超越吗?超光速可以进行时穿越时光隧道,感受湘西凤凰古城神秘的夜色夜幕降临,华灯初上,古色古香的古城脱去白天青白灰色的外衣,换上五彩斑斓的盛装,迎接夜游客人的光临。图花滩河边图花滩河边我不顾白天走街串巷的辛劳,吃完晚餐,背着相机第一站就爬到客栈的3d云渲染网渲农场是什么?有哪些优点现在越来越多的设计师开始使用云渲染了,云渲染是什么呢?有哪些优点?今天我们就来看看吧。首先我们来了解下云渲染。对于设计师来说,渲染是所有工作的最后一环,在以前,大部分设计师都是用本科学家为什么要复活远古病毒?近日,一群科学家成功复活48500年前的史前巨型病毒的消息震惊了全世界。如今新冠肆虐全球,那么为何科学家们还要复活史前巨型病毒呢?这些病毒真的对人类没有伤害吗?据报道称,该团队成功