范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

万字长文Flinkcdc源码精讲(推荐收藏)(四)

  // -------------------------   SnapshotSplitReader.submitSplit方法  ------------------------------------------ public void submitSplit(MySqlSplit mySqlSplit) {         this.currentSnapshotSplit = mySqlSplit.asSnapshotSplit();         statefulTaskContext.configure(currentSnapshotSplit);      // 拿到context的queue,在pollSplitSrecords的时候需要         this.queue = statefulTaskContext.getQueue();         this.nameAdjuster = statefulTaskContext.getSchemaNameAdjuster();         this.hasNextElement.set(true);         this.reachEnd.set(false);      // 主要读取逻辑在readTask中         this.splitSnapshotReadTask =                 new MySqlSnapshotSplitReadTask(                         statefulTaskContext.getConnectorConfig(),                         statefulTaskContext.getOffsetContext(),                         statefulTaskContext.getSnapshotChangeEventSourceMetrics(),                         statefulTaskContext.getDatabaseSchema(),                         statefulTaskContext.getConnection(),                         statefulTaskContext.getDispatcher(),                         statefulTaskContext.getTopicSelector(),                         StatefulTaskContext.getClock(),                         currentSnapshotSplit);      // 提交一个runnable到线程中,主要是执行readTask的execute方法         executor.submit(                 () -> {                     try {                         currentTaskRunning = true;                        // 自己实现的contextImpl 主要记录高水位和低水位用                         final SnapshotSplitChangeEventSourceContextImpl sourceContext =                                 new SnapshotSplitChangeEventSourceContextImpl();                        // 执行readTask                         SnapshotResult snapshotResult =                                 splitSnapshotReadTask.execute(sourceContext);                         final MySqlBinlogSplit backfillBinlogSplit =                                 createBackfillBinlogSplit(sourceContext);                         // optimization that skip the binlog read when the low watermark equals high                         // watermark                        // 如由于snapshot是并行读取的,所以当该读取该split的数据,低水位和高水位相同,说明在read数据中没有出现其他操作,所以可以退出binlog优化阶段,可以认为该split范围的数据没有变更,不需要在snapshot之后进行binlog的read                         final boolean binlogBackfillRequired =                                 backfillBinlogSplit                                         .getEndingOffset()                                         .isAfter(backfillBinlogSplit.getStartingOffset());                         if (!binlogBackfillRequired) {                             dispatchHighWatermark(backfillBinlogSplit);                             currentTaskRunning = false;                             return;                         }                         // snapshot执行完成后,开始binlogReadTask的读取操作                         if (snapshotResult.isCompletedOrSkipped()) {                            // 根据snapshot read task读取结束后,会记录高低水位,水位线作为参数构建binlog read task                             final MySqlBinlogSplitReadTask backfillBinlogReadTask =                                     createBackfillBinlogReadTask(backfillBinlogSplit);                            // 执行binlog read task,由于里面的处理逻辑太复杂了,我们就不直接进行阅读了                            // 我这里直接简单介绍一下流程,就是拿到snapshot的高水位,作为endOffset,在binlog read task中,会                            // 以endOffset作为结束条件,小宇endOffset的数据都会被read,并发送下游                             backfillBinlogReadTask.execute(                                     new SnapshotBinlogSplitChangeEventSourceContextImpl());                         } else {                             readException =                                     new IllegalStateException(                                             String.format(                                                     "Read snapshot for mysql split %s fail",                                                     currentSnapshotSplit));                         }                     } catch (Exception e) {                         currentTaskRunning = false;                         LOG.error(                                 String.format(                                         "Execute snapshot read task for mysql split %s fail",                                         currentSnapshotSplit),                                 e);                         readException = e;                     }                 });     } // -------------------------   MySqlSnapshotSplitReadTask.execute(sourceContext)方法  ------------------------------------------    @Override     public SnapshotResult execute(ChangeEventSourceContext context) throws InterruptedException {         SnapshottingTask snapshottingTask = getSnapshottingTask(previousOffset);//就是new了一个         final SnapshotContext ctx;         try {             ctx = prepare(context); //重新new了一个 context对象,比较无用         } catch (Exception e) {             LOG.error("Failed to initialize snapshot context.", e);             throw new RuntimeException(e);         }         try {            // 上面都是无用代码,这里直接调用了doExecute方法,我们进入该方法看主要逻辑即可             return doExecute(context, ctx, snapshottingTask);         } catch (InterruptedException e) {             LOG.warn("Snapshot was interrupted before completion");             throw e;         } catch (Exception t) {             throw new DebeziumException(t);         }     } // -------------------------   MySqlSnapshotSplitReadTask.doExecute(sourceContext)方法  ------------------------------------------  @Override     protected SnapshotResult doExecute(             ChangeEventSourceContext context,             SnapshotContext snapshotContext,             SnapshottingTask snapshottingTask)             throws Exception {         final RelationalSnapshotChangeEventSource.RelationalSnapshotContext ctx =                 (RelationalSnapshotChangeEventSource.RelationalSnapshotContext) snapshotContext;         ctx.offset = offsetContext;        // 一个dispatcher,用于记录水位线事件,后面会通过该dispatcher发射数据,当然是通过emitter发射了         final SignalEventDispatcher signalEventDispatcher =                 new SignalEventDispatcher(                         offsetContext.getPartition(),                         topicSelector.topicNameFor(snapshotSplit.getTableId()),                         dispatcher.getQueue());     // 其实log输出的日志就已经很清晰了        // 记录低水位         final BinlogOffset lowWatermark = currentBinlogOffset(jdbcConnection);         LOG.info(                 "Snapshot step 1 - Determining low watermark {} for split {}",                 lowWatermark,                 snapshotSplit);         ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))                 .setLowWatermark(lowWatermark);         signalEventDispatcher.dispatchWatermarkEvent(                 snapshotSplit, lowWatermark, SignalEventDispatcher.WatermarkKind.LOW);         LOG.info("Snapshot step 2 - Snapshotting data");        // 读取数据  主要方法重点介绍的地方         createDataEvents(ctx, snapshotSplit.getTableId());     // 记录高水位         final BinlogOffset highWatermark = currentBinlogOffset(jdbcConnection);         LOG.info(                 "Snapshot step 3 - Determining high watermark {} for split {}",                 highWatermark,                 snapshotSplit);         signalEventDispatcher.dispatchWatermarkEvent(                 snapshotSplit, highWatermark, SignalEventDispatcher.WatermarkKind.HIGH);         ((SnapshotSplitReader.SnapshotSplitChangeEventSourceContextImpl) (context))                 .setHighWatermark(highWatermark);         return SnapshotResult.completed(ctx.offset);     }   // 我们看看createDataEvents 调用过程 private void createDataEvents(             RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,             TableId tableId)             throws Exception {         EventDispatcher.SnapshotReceiver snapshotReceiver =                 dispatcher.getSnapshotChangeEventReceiver();         LOG.debug("Snapshotting table {}", tableId);         createDataEventsForTable(                 snapshotContext, snapshotReceiver, databaseSchema.tableFor(tableId));      // receiver的逻辑我们就不看了,我这里介绍一下就好      // receiver通过changeRecord方法接收到数据后,通过一个成员变量(bufferedEvent)控制,如果!=null加入队列,然后创建一个新的SourceRecord,直到所有的数据读取完成,所以说最后一条数据创建成功之后,如果没有新的数据了,则不会调用changeRecord该方法,也就是说成员变量记录了最后一个record      // 这里调用completeSnapshot方法的时候会对bufferedEvent变量进行判断,如果不等于null做一些complete相关的工作最后加入队列中,如果不调用该方法,则当前split的snapshot阶段读取的数据少了一条,嘻嘻嘻         snapshotReceiver.completeSnapshot();     } // createDataEvents中调用到本类的createDataEventsForTable,也就是开始了具体读取逻辑     private void createDataEventsForTable(             RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext,             EventDispatcher.SnapshotReceiver snapshotReceiver,             Table table)             throws InterruptedException {         long exportStart = clock.currentTimeInMillis();         LOG.info("Exporting data from split "{}" of table {}", snapshotSplit.splitId(), table.id());             // 构建sql         final String selectSql =                 StatementUtils.buildSplitScanQuery(                         snapshotSplit.getTableId(),                         snapshotSplit.getSplitKeyType(),                         snapshotSplit.getSplitStart() == null,                         snapshotSplit.getSplitEnd() == null);         LOG.info(                 "For split "{}" of table {} using select statement: "{}"",                 snapshotSplit.splitId(),                 table.id(),                 selectSql);              try (PreparedStatement selectStatement =                         StatementUtils.readTableSplitDataStatement( // 创建statement,然后查询sql                                 jdbcConnection,                                 selectSql,                                 snapshotSplit.getSplitStart() == null,                                 snapshotSplit.getSplitEnd() == null, snapshotSplit.getSplitStart(),                                 snapshotSplit.getSplitEnd(),                                 snapshotSplit.getSplitKeyType().getFieldCount(),                                 connectorConfig.getQueryFetchSize());              // 然后对查询出来的数据进行封装成sourceRecord发送下游                 ResultSet rs = selectStatement.executeQuery()) {             ColumnUtils.ColumnArray columnArray = ColumnUtils.toArray(rs, table);             long rows = 0;             Threads.Timer logTimer = getTableScanLogTimer();             while (rs.next()) {                 rows++;                 final Object[] row = new Object[columnArray.getGreatestColumnPosition()];                 for (int i = 0; i < columnArray.getColumns().length; i++) {                     Column actualColumn = table.columns().get(i);                     row[columnArray.getColumns()[i].position() - 1] =                             readField(rs, i + 1, actualColumn, table);                 }                 if (logTimer.expired()) {                     long stop = clock.currentTimeInMillis();                     LOG.info(                             "Exported {} records for split "{}" after {}",                             rows,                             snapshotSplit.splitId(),                             Strings.duration(stop - exportStart));                     snapshotProgressListener.rowsScanned(table.id(), rows);                     logTimer = getTableScanLogTimer();                 }                 // 这里会将数据放入队列,通过receiver接收数据,然后再将数据放入其队列的一个过程,其实不必深入,就是封装的比较好,难以理解                 dispatcher.dispatchSnapshotEvent(                         table.id(),                         getChangeRecordEmitter(snapshotContext, table.id(), row),// 就是new了一个                         snapshotReceiver);             }             LOG.info(                     "Finished exporting {} records for split "{}", total duration "{}"",                     rows,                     snapshotSplit.splitId(),                     Strings.duration(clock.currentTimeInMillis() - exportStart));         } catch (SQLException e) {             throw new ConnectException("Snapshotting of table " + table.id() + " failed", e);         }     } // -------------------------   dispatcher.dispatchSnapshotEvent方法之后的流程  ----------------------------------  // 进入evnentDisptcher.dispatchSnapshotEvent方法    public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {         DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);         if (dataCollectionSchema == null) {             errorOnMissingSchema(dataCollectionId, changeRecordEmitter);         }         changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new Receiver() {             @Override             public void changeRecord(DataCollectionSchema schema,                                      Operation operation,                                      Object key, Struct value,                                      OffsetContext offset,                                      ConnectHeaders headers)                     throws InterruptedException {                 eventListener.onEvent(dataCollectionSchema.id(), offset, key, value);                // 真正的放入队列的逻辑在这里调用                // receiver使我们传入的  对应BufferingSnapshotChangeRecordReceiver类                 receiver.changeRecord(dataCollectionSchema, operation, key, value, offset, headers);             }         });     }   // BufferingSnapshotChangeRecordReceiver的changeRecord方法  // 前面简单介绍过他的处理逻辑了,就不必多做介绍了   @Override         public void changeRecord(DataCollectionSchema dataCollectionSchema,                                  Operation operation,                                  Object key, Struct value,                                  OffsetContext offsetContext,                                  ConnectHeaders headers)                 throws InterruptedException {             Objects.requireNonNull(value, "value must not be null");             LOGGER.trace("Received change record for {} operation on key {}", operation, key);             if (bufferedEvent != null) {                 queue.enqueue(bufferedEvent.get());             }             Schema keySchema = dataCollectionSchema.keySchema();             String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());             // the record is produced lazily, so to have the correct offset as per the pre/post completion callbacks             bufferedEvent = () -> {                 SourceRecord record = new SourceRecord(                         offsetContext.getPartition(),                         offsetContext.getOffset(),                         topicName, null,                         keySchema, key,                         dataCollectionSchema.getEnvelopeSchema().schema(), value,                         null, headers);                 return changeEventCreator.createDataChangeEvent(record);             };         }

满含热泪写下陕西省内那些跟大秦帝国有关的历史遗迹(2)03。奋六世余烈,威震四海,横扫天下寻找心心念念的秦东陵,后埋葬君王的芷阳陵。大体位置好找,但也非顺风顺水。跟着导航,来到一片由钢丝网围起的荒地,就没路了。然后懵逼,没有任何标识,苏格兰人的前世今生(六)罗马帝国时期公元43年,罗马皇帝克劳狄乌斯御驾亲征带领罗马军团(另一种说法是派遣奥留斯普劳提乌斯统领4个军团和辅助部队)侵入不列颠。随后不列颠岛上那些不愿被降伏的当地人便都躲入了爱尔兰以及北方当世界进入足球时间,为何要讨论电视画质?新一届世界杯在卡塔尔开幕后,世界再一次进入到足球时间。早在四年前的俄罗斯世界杯上,卫星运营商Intelsat就与拉美广播商Globo合作,进行了世界杯8K实时视频传输演示,将远在俄葡萄牙20,C罗展现球王风度拒绝庆祝不贪功!第1时间拥抱B费2022年世界杯小组赛H组第2轮,最终,葡萄牙20乌拉圭,锁定胜局!小组出线了!有一说一,赛后评分C罗的6。8分,给的偏低了一点,分还可以再高一点。不说第一个球有他跳起晃了门将的功离2023年只有一个月的时间了,你年初的计划实现了吗?还记得年初的计划吗?乍一看,现在已经11月过了一大半了,马上就要12月份了。也就是说离2023年只有仅仅一个多月的时间了。回头看看你年初的计划实现了吗?实现了几个?我很遗憾地说我可送你一张专克类风湿性关节炎的方子,除陈寒,消疼痛开门见山,今天我们来聊一聊类风湿性关节炎。最近气温骤降,绵绵细雨一直不断,不少人的类风湿性关节炎已经憋不住,也想出来活动筋骨了吧?人们都说患有类风湿性关节炎的人就像行走的天气预报,手机出货量下滑,三大市场变化值得关注中新网12月1日电(中新财经记者吴涛)你有多久没买新手机了?中国信通院最新的数据显示,2022年9月,国内市场手机出货量为2092。2万部,同比下降2。4。1月至9月份,出货量为12023年半导体市场最高降幅3。6台积电明年产能将下降芯闻速递两分钟了解芯片大事2023年全球半导体市场规模最高降幅3。6根据市场调查机构Gartner公布的最新报告,全球半导体市场在2021年创造26。3增速之后,2022年增幅将缩小至4,英国买房出租5大市场(系列4)利兹作为英国NorthernPowerhouse(北部振兴计划)的主要参与者之一,利兹是英国最强大的经济体之一,也是英国令人印象深刻的房地产市场。该市是英国人口第三多的城市,有80多万如果你的宝宝有这6个表现,那就要多喝水了宝宝不爱喝水是很多家长都发愁的事情,有的宝宝是不爱喝水,有的是已经喝够了,如果你不知道该怎么判断宝宝喝没喝够的话,可以根据以下6个表现进行参考。如果你家宝宝有以下6个表现的话,那就创业新风向享帮米开拓万亿级本地生活服务市场本地生活服务是一个空间巨大的市场,前瞻产业研究院数据显示,2021年,中国互联网本地生活服务行业市场规模达到2。6万亿元,同比增速为15。1到2025年,其市场规模有望达到4万亿元
Brume2,搭载联发科MT7981BSoC的OpenWrt安全网关文章来源Brume2,搭载联发科MT7981BSoC的OpenWrt安全网关支持WireGuardVPNCNXSoftware中文站ZhihuWeChatSinaWeiboTwit国内最年轻的女院士,有颜值有才华的任咏华,是科研领域的明星文丨小辰小时候,她因温度计中的水银喜欢上了化学长大后,她总会在寂静的夜晚观赏城市五颜六色的光芒,审视自己研究的内容任咏华,出生于中国香港,无机化学家,中国科学院院士发展中国家科学院11月22日起,无锡通行政策再调整今天记者从无锡公安交警支队了解到继去年11月市区对新能源货车释放出友好信号之后今年货车通行政策将进一步调整11月22日起从车型路段和时段等多方面放宽城市配送货运车辆市区通行限制01专精特新埃斯顿国产工业机器人国家专精特新小巨人2022年1月25日,埃斯顿获得国家级专精特新小巨人称号,是第四批获得此称号的公司之一。公司成立于2002年,于2015年深交所成功上市。埃斯顿专注于高端智能装备及其核心控制和功能胡锡进收割流量,没有心理包袱一年一度的互联网大会如期进行,作为网络科技界的盛宴,以往都是大佬云集的场面,不过今年略显平淡,没有多少让大众熟悉的面孔,最让人熟悉的可能就是胡锡进了。胡锡进是谁?这个不用过多介绍了对歌尔股份感兴趣可以看看我总结的相关消息歌尔是最新款airportpro2的设计和制造者,有些配件还是独供,为什么只有3040份额呢?本来这种设计NPI是要拿大头的,苹果压价歌尔不肯,然后立讯跳出来以白菜价抢到了60的订等不了新机的朋友看过来,这3部轻松用到2027年,最低1988到手等不了新机的朋友看过来,这3部轻松用到2027年,最低1988到手一vivoX80vivo官宣将于11月20日发布新机X90系列,不过X90曝光后倒是引发很多热议,不少朋友说X90华为Mate50Pro从加价800到降500经历了什么?学苹果坑黄牛?或许华为也没有想到,时隔三年,又卷土重来,特别是在华为mate50pro上,考虑得并不充分,或者说根本想不到,华为mate50pro会如此热卖,导致发布后一直处于缺货状态。由于产品手机厂商修炼内功智能制造助力成本下降40可否杀出血路?每经记者王晶每经编辑杨夏全球智能手机市场已经从蓝海转化为红海,厂商的竞争在不断加剧。从IDC发布的2022年三季度市场份额数据来看,vivo(20)荣耀(18)OPPO(16)苹果OPPO双向扩展柔性屏专利获授权新京报贝壳财经讯11月16日,企查查APP显示,近日,OPPO广东移动通信有限公司柔性显示装置以及电子设备专利获授权。企查查专利摘要显示,本申请提供了一种柔性显示装置,包括柔性屏卷华为Mate40翻新版手机开售,搭载海思麒麟9000芯片,是否值得购买华为Mate40系列手机的翻新版本已经正式开售,这款手机保留了Mate40手机的原有配件,并且还搭载华为自主研发的海思麒麟9000芯片,但售价有些偏高,那么这款手机是否值得购买呢?