flink流式增量查询hudi表流程分析

flink流式增量查询hudi表流程分析

环境

  • flink 1.13.6
  • hudi 0.11.0
  • merge on read 表

代码示例

tEnv.executeSql(“CREATE TABLE tb_person_hudi ( id BIGINT, age INT, name STRING,create_time TIMESTAMP ( 3 ), time_stamp TIMESTAMP(3),PRIMARY KEY ( id ) NOT ENFORCED ) WITH (” + “‘connector’ = ‘hudi’,” + “‘table.type’ = ‘MERGE_ON_READ’,” + “‘path’ = ‘file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi’,” + “‘read.start-commit’ = ‘20220722103000’,” + // “‘read.end-commit’ = ‘20220722104000’,” + “‘read.task’ = ‘1’,” + “‘read.streaming.enabled’ = ‘true’,” + “‘read.streaming.check-interval’ = ’30’ ” + “)”);Table table = tEnv.sqlQuery(“select * from tb_person_hudi “);tEnv.toChangelogStream(table).print().setParallelism(1);env.execute(“test”);

流程分析

hudi源入口(HoodieTableSource)

HoodieTableSource实现ScanTableSource,SupportsPartitionPushDown,SupportsProjectionPushDown,SupportsLimitPushDown,SupportsFilterPushDown接口,后4个接口主要是支持对查询计划的优化。ScanTableSource则提供了读取hudi表的具体实现,核心方法为org.apache.hudi.table.HoodieTableSource#getScanRuntimeProvider:

if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { //开启了流式读(read.streaming.enabled) StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction( conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths()); InputFormat inputFormat = getInputFormat(true); OneInputStreamOperatorFactory factory = StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat); SingleOutputStreamOperator source = execEnv.addSource(monitoringFunction, getSourceOperatorName(“split_monitor”)) .setParallelism(1) .transform(“split_reader”, typeInfo, factory) .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS)); return new DataStreamSource(source);}

上面代码在流环境中创建了一个SourceFunction(StreamReadMonitoringFunction)和一个自定义的转换(StreamReadOperator)

  • StreamReadMonitoringFunction: 监控hudi表元数据目录(.hoodie)获取需要被读取的文件分片(MergeOnReadInputSplit,一个base parquet文件和一组log文件),然后把分片递给下游的转换算子StreamReadOperator进行文件读取;固定一个线程去监控,名称为split_monitorxxxxx.
  • StreamReadOperator:将按timeline升序收到的MergeOnReadInputSplit一个一个地读取分片数据;算子名称为split_reader->xxxxx,可以通过设置read.tasks进行设置并行度

定时监控元数据获得增量分片(StreamReadMonitoringFunction)

StreamReadMonitoringFunction负责定时(read.streaming.check-interval)扫描hudi表的元数据目录.hoodie,如果发现在active timeline上有新增的instant[action=commit,deltacommit,compaction,replace && active=completed],从这些instant信息中可以知道数据变更写到了哪些文件(parquet,log),然后构建成分片对象(MergeOnReadInputSplit)。

  • 核心属性:issuedInstant,这个是增量查询的依据,记录着当前已经消费的数据的最新instant,类似于kafka的offset,但是hudi是基于timeline.该值是有状态的,维护在ListState中,所以flink job重启依然可以做到增量。
  • 核心方法:StreamReadMonitoringFunction#monitorDirAndForwardSplits,很简单,就做了两件事,调用IncrementalInputSplits#inputSplits获取到增量分片(有序),然后传递给下游的算子(StreamReadOperator)

public void monitorDirAndForwardSplits(SourceContext context) { HoodieTableMetaClient metaClient = getOrCreateMetaClient(); IncrementalInputSplits.Result result = incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant); for (MergeOnReadInputSplit split : result.getInputSplits()) { context.collect(split); }}

获取增量分片(IncrementalInputSplits)

主要逻辑在方法IncrementalInputSplits#inputSplits(metaClient, hadoopConf, issuedInstant),需要先了解hudi关于timeline和instant的一些基本概念,详细的流程如下图所示:

如果flink job首次运行指定了read.start-commit和read.end-commit,但是该范围是比较久以前,instant已经被归档,那么流作业将永远不能消费到数据

https://github.com/apache/hudi/issues/6167

读取数据文件(StreamReadOperator)

StreamReadOperator算子接收分片后会缓存在队列Queue splits,然后不停从队列中poll分片放到线程池中执行

private void processSplits() throws IOException { format.open(split); consumeAsMiniBatch(split); enqueueProcessSplits(); }

主要有三个步骤

  • 从队列中peek分片,调用MergeOnReadInputFormat.open构建迭代器,迭代器是用来进行文件的数据读取,一个迭代器对应一个分片(多个物理文件,base+log),对应不同读取的场景,有几种迭代器:BaseFileOnlyFilteringIterator,BaseFileOnlyIterator,LogFileOnlyIterator,MergeIterator,SkipMergeIterator
  • 微批量消费,每批只读2048记录,将把记录传递给下游的算子消费同时标记消费的总数,如果该分片读到了尾,则将该分片从队列中弹出,并关闭MergeOnReadInputFormat
  • 继续处理队列中的分片,回到步骤1,如果上一次的分片没消费完,那么本次循环将继续消费,只不过是由另一个线程处理。
  • 郑重声明:本文内容及图片均整理自互联网,不代表本站立场,版权归原作者所有,如有侵权请联系管理员(admin#wlmqw.com)删除。
    (0)
    用户投稿
    上一篇 2022年7月24日
    下一篇 2022年7月24日

    相关推荐

    • 经济日报携手京东发布数据-水饮市场 健康是关键

      数据来源 京东消费及产业发展研究院 “喝水”日益精细化 “喝水”看似是件小事,背后却隐藏着巨大的消费市场。目前,水饮行业延续了蓬勃发展态势,由消费需求带动的产品创新驱动整个行业持续…

      2022年8月14日
    • 直播背后的原理是?初识视频流协议 HLS 和 RTMP

      HTTP Live Streaming (HLS) HTTP Live Streaming 简称为 HLS, 是一个基于 HTTP 的视频流协议,由 APPLE 公司提出和实现。苹…

      2022年6月25日
    • 核心产业规模超4000亿企业数量逾3000家

      在广州某行业展会,厂商展出人工智能设备。 IC供图 工信部日前公布的数据显示,我国智能产业规模持续壮大。据测算,我国人工智能核心产业规模超过4000亿元,企业数量超过3000家,初…

      2022年8月16日
    • 最小二乘法公式推导过程

      假设现在有n对坐标系中的点 现在要做k阶多项式拟合,多项式函数如下 将已知的观测点数据代入上述公式得到如下n组等式: …… 最小二乘法(又称最小平方法)是一…

      2022年6月28日
    • 纯干货 – 巨详细化妆步骤,手残党赶紧进来抄功课

      Hi~美少女们大家好呀~ 后台总是有新手宝宝给我们留言说,学化妆的时候总是会遇到各种各样的问题: 新手学化妆怎么入门?要买哪些化妆品?有些化妆教程步骤太复杂,对新手来说不友好,适合…

      2022年8月17日
    • 经济环境回暖 数字经济将迎来更快发展

      来源:【消费日报】 本报讯 在2022全球数字经济大会上,国家工业信息安全发展研究中心发布《2022年数字经济形势研判》报告(以下简称“报告”),报告指出,2022年以来,受疫情影…

      2022年8月18日
    • 扬杰科技:上半年净利同比预增50%-80%

      【扬杰科技:上半年净利同比预增50%-80%】财联社6月30日电,扬杰科技公告,预计上半年净利润5.16亿元-6.20亿元,同比增长50%-80%。报告期内,公司聚焦主业发展方向,…

      2022年7月1日
    • 可变数据软件如何生成16进制的序列号

      可变数据软件中除了可以生成常见到的流水号序列号、奇数流水号序列号、偶数流水号序列号、跳号流水号序列号之外,还可以生成2进制、10进制、16进制、33进制的序列号,本文中就主要讲的是…

      2022年8月12日
    • 周鹏离队、郭艾伦转会!辽粤成难兄难弟,CBA究竟怎么了?

      CBA休赛期劲爆消息不断,前有郭艾伦向辽宁男篮申请交易,后有篮球媒体人爆料周鹏将离开广东男篮加盟深圳男篮,一时之间,CBA休赛期打破了原来的寂静状态,而无论是郭艾伦还是周鹏,都是广…

      2022年8月5日
    • 「双语太空播报」NASA的飞马号将携阿尔忒弥斯一号火箭硬件

      NASA的Pegasus驳船为该机构位于佛罗里达州的肯尼迪航天中心绘制了航向图,其中载有航天发射系统(SLS)火箭的Artemis I核心级。核心级将提供200多万磅的推力,帮助发…

      2022年8月16日

    联系我们

    联系邮箱:admin#wlmqw.com
    工作时间:周一至周五,10:30-18:30,节假日休息