Environment
- Flink 1.13.6
- Hudi 0.11.0
- merge-on-read table
Code example
```java
tEnv.executeSql("CREATE TABLE tb_person_hudi ("
    + "  id BIGINT,"
    + "  age INT,"
    + "  name STRING,"
    + "  create_time TIMESTAMP(3),"
    + "  time_stamp TIMESTAMP(3),"
    + "  PRIMARY KEY (id) NOT ENFORCED"
    + ") WITH ("
    + "  'connector' = 'hudi',"
    + "  'table.type' = 'MERGE_ON_READ',"
    + "  'path' = 'file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi',"
    + "  'read.start-commit' = '20220722103000',"
    // + "  'read.end-commit' = '20220722104000',"
    + "  'read.tasks' = '1',"
    + "  'read.streaming.enabled' = 'true',"
    + "  'read.streaming.check-interval' = '30'"
    + ")");
Table table = tEnv.sqlQuery("select * from tb_person_hudi");
tEnv.toChangelogStream(table).print().setParallelism(1);
env.execute("test");
```
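The snippet assumes the streaming and table environments (env, tEnv) were created beforehand. A minimal setup sketch could look like the following; the checkpoint interval is an illustrative value, not taken from the original code:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Minimal environment setup assumed by the snippet above (a sketch, not the original author's code).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing so that operator state such as issuedInstant (discussed below) is snapshotted
// and survives restarts; the 10s interval here is only an example.
env.enableCheckpointing(10_000L);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
```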
Flow analysis
Hudi source entry point (HoodieTableSource)
HoodieTableSource implements the ScanTableSource, SupportsPartitionPushDown, SupportsProjectionPushDown, SupportsLimitPushDown, and SupportsFilterPushDown interfaces. The latter four mainly support query-plan optimizations; ScanTableSource provides the actual implementation for reading the Hudi table, and its core method is org.apache.hudi.table.HoodieTableSource#getScanRuntimeProvider:
```java
if (conf.getBoolean(FlinkOptions.READ_AS_STREAMING)) { // streaming read is enabled (read.streaming.enabled)
  StreamReadMonitoringFunction monitoringFunction = new StreamReadMonitoringFunction(
      conf, FilePathUtils.toFlinkPath(path), maxCompactionMemoryInBytes, getRequiredPartitionPaths());
  InputFormat<RowData, ?> inputFormat = getInputFormat(true);
  OneInputStreamOperatorFactory<MergeOnReadInputSplit, RowData> factory =
      StreamReadOperator.factory((MergeOnReadInputFormat) inputFormat);
  SingleOutputStreamOperator<RowData> source = execEnv
      .addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
      .setParallelism(1)
      .transform("split_reader", typeInfo, factory)
      .setParallelism(conf.getInteger(FlinkOptions.READ_TASKS));
  return new DataStreamSource<>(source);
}
```
The code above creates, in the streaming environment, a SourceFunction (StreamReadMonitoringFunction) and a custom transformation (StreamReadOperator):
- StreamReadMonitoringFunction: monitors the Hudi table's metadata directory (.hoodie) to discover the file splits that need to be read (MergeOnReadInputSplit: one base parquet file plus a set of log files), then hands those splits to the downstream StreamReadOperator, which does the actual file reading. Monitoring runs in a single fixed task whose operator name is split_monitorxxxxx.
- StreamReadOperator: reads the splits one by one, in the ascending timeline order in which the MergeOnReadInputSplits arrive. Its operator name is split_reader->xxxxx, and its parallelism can be configured via read.tasks.
Periodically monitoring metadata for incremental splits (StreamReadMonitoringFunction)
StreamReadMonitoringFunction periodically (every read.streaming.check-interval seconds) scans the Hudi table's metadata directory .hoodie. When it finds new instants on the active timeline [action = commit, deltacommit, compaction, replace && state = completed], it derives from those instants which files (parquet, log) the data changes were written to and builds split objects (MergeOnReadInputSplit) from them.
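As a rough illustration of this selection rule, here is a plain-Java sketch with hypothetical names rather than Hudi's HoodieInstant API; the action strings follow Hudi's timeline action names:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch of the instant-selection rule described above (illustrative only, not Hudi's implementation).
public class InstantFilterSketch {

  // Actions whose completed instants may contain new data for the streaming read.
  private static final Set<String> RELEVANT_ACTIONS =
      new HashSet<>(Arrays.asList("commit", "deltacommit", "compaction", "replacecommit"));

  /** Returns true if a completed instant newer than issuedInstant should produce input splits. */
  static boolean shouldForward(String action, boolean completed, String instantTime, String issuedInstant) {
    return completed
        && RELEVANT_ACTIONS.contains(action)
        // only instants after the last issued one count as "incremental"
        && (issuedInstant == null || instantTime.compareTo(issuedInstant) > 0);
  }
}
```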
- Core field: issuedInstant. This is the basis of the incremental read; it records the latest instant whose data has already been consumed, much like a Kafka offset, except that Hudi's is based on the timeline. The value is stateful and kept in a ListState, so the Flink job keeps reading incrementally even after a restart (a standalone sketch of this mechanism follows the code snippet below).
- Core method: StreamReadMonitoringFunction#monitorDirAndForwardSplits. It is very simple and does only two things: call IncrementalInputSplits#inputSplits to obtain the incremental splits (ordered), then forward them to the downstream operator (StreamReadOperator):
```java
public void monitorDirAndForwardSplits(SourceContext<MergeOnReadInputSplit> context) {
  HoodieTableMetaClient metaClient = getOrCreateMetaClient();
  IncrementalInputSplits.Result result =
      incrementalInputSplits.inputSplits(metaClient, this.hadoopConf, this.issuedInstant);
  for (MergeOnReadInputSplit split : result.getInputSplits()) {
    context.collect(split);
  }
}
```
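The issuedInstant bookkeeping relies on Flink operator state. As a standalone sketch (not Hudi's actual class), an offset-like string can be checkpointed and restored like this:

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Minimal sketch of keeping an "offset"-like value (such as issuedInstant) in operator ListState
// so it survives restarts. Names are illustrative; this is not Hudi's StreamReadMonitoringFunction.
public abstract class OffsetTrackingSource implements SourceFunction<String>, CheckpointedFunction {

  private transient ListState<String> instantState; // checkpointed operator state
  protected volatile String issuedInstant;          // latest consumed instant, like a Kafka offset

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    instantState = context.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("issued-instant", String.class));
    // On restore, pick up the previously issued instant so reading stays incremental.
    for (String instant : instantState.get()) {
      issuedInstant = instant;
    }
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    instantState.clear();
    if (issuedInstant != null) {
      instantState.add(issuedInstant);
    }
  }
}
```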
Obtaining incremental splits (IncrementalInputSplits)
The main logic lives in IncrementalInputSplits#inputSplits(metaClient, hadoopConf, issuedInstant). It helps to first understand Hudi's basic concepts of the timeline and instants; the detailed flow is illustrated in the accompanying flow chart.
If, on its first run, the Flink job specifies read.start-commit and read.end-commit but that range lies far enough in the past that its instants have already been archived, the streaming job will never consume any data; see:
https://github.com/apache/hudi/issues/6167
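One way to avoid this pitfall is to point read.start-commit at an instant that is still on the active timeline, or to omit the option so the streaming read starts from the latest instant. A hedged example reusing the table definition above; the timestamp is hypothetical and must be replaced with a non-archived instant:

```java
// Sketch: restart the streaming read from an instant that is still on the active timeline.
tEnv.executeSql("CREATE TABLE tb_person_hudi_restart ("
    + "  id BIGINT, age INT, name STRING, create_time TIMESTAMP(3), time_stamp TIMESTAMP(3),"
    + "  PRIMARY KEY (id) NOT ENFORCED"
    + ") WITH ("
    + "  'connector' = 'hudi',"
    + "  'table.type' = 'MERGE_ON_READ',"
    + "  'path' = 'file:///D:/data/hadoop3.2.1/warehouse/tb_person_hudi',"
    + "  'read.streaming.enabled' = 'true',"
    + "  'read.start-commit' = '20220801120000'" // hypothetical instant that has not been archived yet
    + ")");
```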
Reading the data files (StreamReadOperator)
After receiving the splits, the StreamReadOperator operator buffers them in an internal queue (the splits field), then keeps polling splits off the queue and handing them to an executor to read.
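Before looking at Hudi's simplified processSplits, here is a standalone sketch of that buffer-and-drain pattern; all names below are illustrative and not Hudi's actual operator:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Standalone sketch of "queue the splits, drain them one at a time" as described above.
// Types are illustrative; Hudi's StreamReadOperator works on MergeOnReadInputSplit.
public class SplitQueueSketch {

  private final Queue<String> splits = new ArrayDeque<>();          // buffered splits
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private boolean scheduled = false;                                // is a drain task already pending?

  /** Called for every split handed over by the monitoring source. */
  public synchronized void onSplitReceived(String split) {
    splits.add(split);
    enqueueProcessSplits();
  }

  private synchronized void enqueueProcessSplits() {
    if (!scheduled && !splits.isEmpty()) {
      scheduled = true;
      executor.submit(this::processSplits);                         // drain asynchronously
    }
  }

  private void processSplits() {
    String split;
    synchronized (this) {
      split = splits.poll();
      scheduled = false;
    }
    if (split != null) {
      System.out.println("reading records from " + split);          // stands in for open + consume
      enqueueProcessSplits();                                       // keep going until the queue is empty
    }
  }
}
```

Hudi's own (simplified) processSplits then looks like this: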
```java
private void processSplits() throws IOException {
  format.open(split);          // open the split (base parquet file + log files) with MergeOnReadInputFormat
  consumeAsMiniBatch(split);   // read and emit one mini-batch of records from the split
  enqueueProcessSplits();      // schedule itself again to keep draining the queue
}
```
This involves three main steps: