What is Clustering
To get straight to the point, clustering serves two main purposes: merging small data files and re-sorting data.
When data is written to a Hudi table, a large number of small files may be produced. Hudi's clustering mechanism can periodically merge these small files into larger ones in the background, which reduces storage fragmentation and metadata management overhead and improves query performance.
Clustering can also reorganize and sort the data by user-specified columns, which speeds up related queries such as range scans or joins: with pre-sorted data, the query engine can process requests more efficiently.
This post analyzes the clustering source code in two parts: creating the clustering execution plan, and executing clustering according to that plan.
Creating the clustering execution plan
Plan creation lives in the execute method of ClusteringPlanActionExecutor, shown below:
@Override
public Option<HoodieClusteringPlan> execute() {
  // Create the execution plan
  Option<HoodieClusteringPlan> planOption = createClusteringPlan();
  // Proceed only if a plan was created (there may be no file slices to cluster)
  if (planOption.isPresent()) {
    // Create the clustering instant
    // Its action type is replace commit, meaning the files produced by clustering replace the previous ones
    HoodieInstant clusteringInstant =
        new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
    try {
      HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
          .setOperationType(WriteOperationType.CLUSTER.name())
          .setExtraMetadata(extraMetadata.orElse(Collections.emptyMap()))
          .setClusteringPlan(planOption.get())
          .build();
      // Save it as a pending replace commit
      table.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant,
          TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
    } catch (IOException ioe) {
      throw new HoodieIOException("Exception scheduling clustering", ioe);
    }
  }
  return planOption;
}
This method creates the clustering plan and then records a pending replace commit. Because the data files produced by clustering replace the original ones, the corresponding commit type is replace.
Let's continue with the createClusteringPlan method. It first checks whether clustering is currently allowed to run, then loads the configured clustering strategy class to build the plan.
Clustering is not necessarily scheduled on every attempt. For efficiency, at least N commits must have completed since the last clustering before a new one can be scheduled. This threshold is controlled by hoodie.clustering.inline.max.commits or hoodie.clustering.async.max.commits (for inline and async clustering, respectively). If the condition is met, the plan is generated by the strategy class configured via hoodie.clustering.plan.strategy.class.
The code is as follows:
protected Option<HoodieClusteringPlan> createClusteringPlan() {
  LOG.info("Checking if clustering needs to be run on " + config.getBasePath());
  // Get the instant of the last clustering
  Option<HoodieInstant> lastClusteringInstant = table.getActiveTimeline().getLastClusterCommit();
  // Count the commits completed since the last clustering
  int commitsSinceLastClustering = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants()
      .findInstantsAfter(lastClusteringInstant.map(HoodieInstant::getTimestamp).orElse("0"), Integer.MAX_VALUE)
      .countInstants();
  // Read hoodie.clustering.inline.max.commits (default 4),
  // the minimum number of commits required since the last clustering
  // before the next clustering can be scheduled
  // This branch handles the inline clustering config
  if (config.inlineClusteringEnabled() && config.getInlineClusterMaxCommits() > commitsSinceLastClustering) {
    LOG.warn("Not scheduling inline clustering as only " + commitsSinceLastClustering
        + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
        + config.getInlineClusterMaxCommits());
    return Option.empty();
  }
  // Same as above, but for the async clustering config,
  // hoodie.clustering.async.max.commits (default 4)
  if ((config.isAsyncClusteringEnabled() || config.scheduleInlineClustering()) && config.getAsyncClusterMaxCommits() > commitsSinceLastClustering) {
    LOG.warn("Not scheduling async clustering as only " + commitsSinceLastClustering
        + " commits was found since last clustering " + lastClusteringInstant + ". Waiting for "
        + config.getAsyncClusterMaxCommits());
    return Option.empty();
  }
  LOG.info("Generating clustering plan for table " + config.getBasePath());
  // Load the strategy class configured by hoodie.clustering.plan.strategy.class
  // (SparkSizeBasedClusteringPlanStrategy by default)
  ClusteringPlanStrategy strategy = (ClusteringPlanStrategy) ReflectionUtils.loadClass(
      ClusteringPlanStrategy.checkAndGetClusteringPlanStrategy(config),
      new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config);
  // Generate the clustering plan
  return strategy.generateClusteringPlan();
}
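The scheduling gate above boils down to counting commits newer than the last clustering instant. A minimal Python sketch of the rule (an illustration only, not Hudi code; the commit timestamps and thresholds are made-up values):

```python
def should_schedule_clustering(completed_commits, last_clustering_ts, max_commits):
    """Return True if enough commits have landed since the last clustering."""
    # Count completed commits strictly after the last clustering instant
    # (or after "0", i.e. all of them, if clustering never ran).
    baseline = last_clustering_ts if last_clustering_ts is not None else "0"
    commits_since = sum(1 for ts in completed_commits if ts > baseline)
    return commits_since >= max_commits

# Four commits landed since clustering instant "0003"; with a threshold of 4,
# the next clustering can be scheduled.
commits = ["0001", "0002", "0004", "0005", "0006", "0007"]
print(should_schedule_clustering(commits, "0003", 4))  # True
```
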
Next we focus on the default strategy, SparkSizeBasedClusteringPlanStrategy, which decides whether a file participates in clustering based on its size. Let's walk through how the clustering plan is generated.
The generateClusteringPlan method is defined in PartitionAwareClusteringPlanStrategy, the parent class of SparkSizeBasedClusteringPlanStrategy. It filters the partition paths according to hoodie.clustering.plan.strategy.partition.selected, hoodie.clustering.plan.strategy.partition.regex.pattern and hoodie.clustering.plan.partition.filter.mode, then collects the file slices they contain. From these it selects the small files (those below hoodie.clustering.plan.strategy.small.file.limit) and packs them into groups bounded by the configured group size (hoodie.clustering.plan.strategy.max.bytes.per.group), with the number of groups capped at hoodie.clustering.plan.strategy.max.num.groups. This step implements the small-file merging feature.
The code is as follows:
@Override
public Option<HoodieClusteringPlan> generateClusteringPlan() {
  if (!checkPrecondition()) {
    return Option.empty();
  }
  // Get the meta client used to access table metadata
  HoodieTableMetaClient metaClient = getHoodieTable().getMetaClient();
  LOG.info("Scheduling clustering for " + metaClient.getBasePath());
  // Get the write config
  HoodieWriteConfig config = getWriteConfig();
  // Read hoodie.clustering.plan.strategy.partition.selected,
  // which pins clustering to specific partitions
  String partitionSelected = config.getClusteringPartitionSelected();
  LOG.info("Scheduling clustering partitionSelected: " + partitionSelected);
  List<String> partitionPaths;
  // If it is not configured
  if (StringUtils.isNullOrEmpty(partitionSelected)) {
    // get matched partitions if set
    // Read hoodie.clustering.plan.strategy.partition.regex.pattern
    // and collect the partition paths matching the regex
    partitionPaths = getRegexPatternMatchedPartitions(config, FSUtils.getAllPartitionPaths(getEngineContext(), config.getMetadataConfig(), metaClient.getBasePath()));
    // filter the partition paths if needed to reduce list status
  } else {
    // If partitionSelected is configured, it takes precedence
    partitionPaths = Arrays.asList(partitionSelected.split(","));
  }
  // Filter the partitions that need clustering
  // The filter mode is configured by hoodie.clustering.plan.partition.filter.mode
  // Available modes: NONE, RECENT_DAYS, SELECTED_PARTITIONS and DAY_ROLLING
  partitionPaths = filterPartitionPaths(partitionPaths);
  LOG.info("Scheduling clustering partitionPaths: " + partitionPaths);
  // If every partition was filtered out, return empty
  if (partitionPaths.isEmpty()) {
    // In case no partitions could be picked, return no clustering plan
    return Option.empty();
  }
  // Exclude file groups already scheduled for clustering (pending state),
  // select the small files
  // (the threshold is hoodie.clustering.plan.strategy.small.file.limit)
  // and map them to HoodieClusteringGroup
  // (the mapping logic is analyzed below)
  List<HoodieClusteringGroup> clusteringGroups = getEngineContext()
      .flatMap(
          partitionPaths,
          partitionPath -> {
            List<FileSlice> fileSlicesEligible = getFileSlicesEligibleForClustering(partitionPath).collect(Collectors.toList());
            return buildClusteringGroupsForPartition(partitionPath, fileSlicesEligible).limit(getWriteConfig().getClusteringMaxNumGroups());
          },
          partitionPaths.size())
      .stream()
      .limit(getWriteConfig().getClusteringMaxNumGroups())
      .collect(Collectors.toList());
  if (clusteringGroups.isEmpty()) {
    LOG.warn("No data available to cluster");
    return Option.empty();
  }
  // Build the clustering strategy
  HoodieClusteringStrategy strategy = HoodieClusteringStrategy.newBuilder()
      .setStrategyClassName(getWriteConfig().getClusteringExecutionStrategyClass())
      .setStrategyParams(getStrategyParams())
      .build();
  // Build the clustering plan
  return Option.of(HoodieClusteringPlan.newBuilder()
      .setStrategy(strategy)
      .setInputGroups(clusteringGroups)
      .setExtraMetadata(getExtraMetadata())
      .setVersion(getPlanVersion())
      .setPreserveHoodieMetadata(true)
      .build());
}
The filterPartitionPaths call above filters partitions according to hoodie.clustering.plan.partition.filter.mode, which supports the following options:
- NONE: no filtering; all partition paths are returned.
- RECENT_DAYS: sorts the partition paths in descending order, skips hoodie.clustering.plan.strategy.daybased.skipfromlatest.partitions partitions, then returns hoodie.clustering.plan.strategy.daybased.lookback.partitions partitions. If the partition paths are dates, this effectively selects the most recent N days of data.
- SELECTED_PARTITIONS: selects the partitions between hoodie.clustering.plan.strategy.cluster.begin.partition and hoodie.clustering.plan.strategy.cluster.end.partition.
- DAY_ROLLING: clusters a subset of partitions each time. A partition is selected if its index modulo 24 equals the current hour at scheduling time.
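To make the filter modes concrete, here is a small Python sketch of the RECENT_DAYS and DAY_ROLLING selection rules described above (an illustration of the rules only, not Hudi code; parameter names are invented):

```python
def recent_days(partitions, skip_from_latest, lookback):
    # Sort partition paths in descending order, skip the newest few,
    # then take the configured number of partitions.
    ordered = sorted(partitions, reverse=True)
    return ordered[skip_from_latest:skip_from_latest + lookback]

def day_rolling(partitions, current_hour):
    # A partition is picked when its index modulo 24 equals the current hour.
    return [p for i, p in enumerate(partitions) if i % 24 == current_hour]

parts = ["2024-01-0%d" % d for d in range(1, 8)]  # 2024-01-01 .. 2024-01-07
print(recent_days(parts, skip_from_latest=1, lookback=2))
# ['2024-01-06', '2024-01-05']
print(day_rolling(parts, current_hour=3))
# ['2024-01-04']
```
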
The buildClusteringGroupsForPartition method sorts the selected file slices by base file size (largest first), then packs them into clustering groups subject to the configured group size and group count limits.
protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
  // Get the write config
  HoodieWriteConfig writeConfig = getWriteConfig();
  List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
  List<FileSlice> currentGroup = new ArrayList<>();
  // Sort fileSlices before dividing, which makes dividing more compact
  // Sort file slices by base file size; slices without a base file are treated as max file size
  List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
  sortedFileSlices.sort((o1, o2) -> (int)
      ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
          - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
  long totalSizeSoFar = 0;
  for (FileSlice currentSlice : sortedFileSlices) {
    // Iterate over all file slices
    // Get the current slice's size; if there is no base file, use the max file size
    long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
    // check if max size is reached and create new group, if needed.
    // If the accumulated size exceeds hoodie.clustering.plan.strategy.max.bytes.per.group
    // and the current group is not empty
    if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
      // Divide totalSizeSoFar by hoodie.clustering.plan.strategy.target.file.max.bytes,
      // rounded up, to get the number of output file groups
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
      LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
      // Save the result into fileSliceGroups,
      // recording the input group and its output file group count
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
      // After saving, reset currentGroup and totalSizeSoFar
      currentGroup = new ArrayList<>();
      totalSizeSoFar = 0;
      // if fileSliceGroups's size reach the max group, stop loop
      // Check whether the group count reached hoodie.clustering.plan.strategy.max.num.groups;
      // if so, exit the loop and leave the remaining file slices for next time
      if (fileSliceGroups.size() >= writeConfig.getClusteringMaxNumGroups()) {
        LOG.info("Having generated the maximum number of groups : " + writeConfig.getClusteringMaxNumGroups());
        break;
      }
    }
    // Add to the current file-group
    currentGroup.add(currentSlice);
    // assume each file group size is ~= parquet.max.file.size
    // Accumulate the size into totalSizeSoFar
    totalSizeSoFar += currentSize;
  }
  if (!currentGroup.isEmpty()) {
    // Handle the last output group
    // shouldClusteringSingleGroup is true when either of the configs below is enabled,
    // meaning clustering runs even when there is only one output file group:
    // hoodie.clustering.plan.strategy.sort.columns
    // hoodie.clustering.plan.strategy.single.group.clustering.enabled
    if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
    }
  }
  // Build and return the HoodieClusteringGroup stream
  return fileSliceGroups.stream().map(fileSliceGroup ->
      HoodieClusteringGroup.newBuilder()
          .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
          .setNumOutputFileGroups(fileSliceGroup.getRight())
          .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
          .build());
}
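The grouping loop above is essentially size-based bin packing. A condensed Python sketch of the same idea (an illustration only; the sizes and limits are made-up values standing in for the hoodie.clustering.plan.strategy.* settings, and the final-group special case is omitted):

```python
import math

def build_groups(slice_sizes, max_bytes_per_group, target_file_bytes):
    """Pack file-slice sizes into groups; return (sizes, num_output_files) pairs."""
    groups, current, total = [], [], 0
    for size in sorted(slice_sizes, reverse=True):  # largest first, like Hudi
        if total + size > max_bytes_per_group and current:
            # Close the current group; its output file count is ceil(total / target)
            groups.append((current, math.ceil(total / target_file_bytes)))
            current, total = [], 0
        current.append(size)
        total += size
    if current:
        groups.append((current, math.ceil(total / target_file_bytes)))
    return groups

sizes = [90, 80, 40, 30, 10]  # pretend MB, with a 128 MB cap per group
print(build_groups(sizes, max_bytes_per_group=128, target_file_bytes=120))
# [([90], 1), ([80, 40], 1), ([30, 10], 1)]
```
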
This concludes the analysis of clustering plan generation.
Executing clustering according to the plan
Clustering execution starts in BaseHoodieWriteClient::cluster.
Before the clustering itself, the preWrite operation runs first.
public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {
  // Create the Hudi table; there are several implementations depending on
  // the engine (Spark/Flink) and the table type (MOR/COW)
  HoodieTable table = createTable(config, context.getHadoopConf().get());
  // Run the pre-write steps, which include:
  // removing this instant from the inflight and requested instants
  // starting the clean and archive services (if enabled)
  preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
  // Run the clustering
  return tableServiceClient.cluster(clusteringInstant, shouldComplete);
}
Next is BaseHoodieTableServiceClient::cluster. This method checks whether this clustering instant is already pending, sets up metrics, runs the clustering and returns the resulting metadata.
public HoodieWriteMetadata<O> cluster(String clusteringInstant, boolean shouldComplete) {
  // As in the previous method, get the table
  HoodieTable<?, I, ?, T> table = createTable(config, context.getHadoopConf().get());
  HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
  HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
  // Check whether this clustering is already pending; if so, it must be rolled back first
  if (pendingClusteringTimeline.containsInstant(inflightInstant)) {
    table.rollbackInflightClustering(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false));
    table.getMetaClient().reloadActiveTimeline();
  }
  // Timer metric for clustering duration
  clusteringTimer = metrics.getClusteringCtx();
  LOG.info("Starting clustering at " + clusteringInstant);
  // Invoke the table's cluster service
  HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant);
  // Convert the metadata into the engine-specific format
  HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(writeMetadata);
  // Validation has to be done after cloning. if not, it could result in referencing the write status twice which means clustering could get executed twice.
  // Verify that the clustering write status is not empty
  validateClusteringCommit(clusteringMetadata, clusteringInstant, table);
  // Publish file creation metrics for clustering.
  // Collect and publish the metrics
  if (config.isMetricsOn()) {
    clusteringMetadata.getWriteStats()
        .ifPresent(hoodieWriteStats -> hoodieWriteStats.stream()
            .filter(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats() != null)
            .map(hoodieWriteStat -> hoodieWriteStat.getRuntimeStats().getTotalCreateTime())
            .forEach(metrics::updateClusteringFileCreationMetrics));
  }
  // TODO : Where is shouldComplete used
  if (shouldComplete && clusteringMetadata.getCommitMetadata().isPresent()) {
    completeClustering((HoodieReplaceCommitMetadata) clusteringMetadata.getCommitMetadata().get(), table, clusteringInstant, Option.ofNullable(convertToWriteStatus(writeMetadata)));
  }
  return clusteringMetadata;
}
Taking Spark as an example, let's look at HoodieSparkCopyOnWriteTable::cluster for COW tables.
public HoodieWriteMetadata<HoodieData<WriteStatus>> cluster(HoodieEngineContext context,
                                                            String clusteringInstantTime) {
  return new SparkExecuteClusteringCommitActionExecutor<>(context, config, this, clusteringInstantTime).execute();
}
The work is delegated to SparkExecuteClusteringCommitActionExecutor. Its execute method simply calls BaseCommitActionExecutor::executeClustering:
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
  return executeClustering(clusteringPlan);
}
BaseCommitActionExecutor::executeClustering loads, via reflection, the clustering strategy configured by hoodie.clustering.execution.strategy.class (SparkSortAndSizeExecutionStrategy by default) and then runs the clustering.
protected HoodieWriteMetadata<HoodieData<WriteStatus>> executeClustering(HoodieClusteringPlan clusteringPlan) {
  // Create the instant
  HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
  // Mark instant as clustering inflight
  table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());
  table.getMetaClient().reloadActiveTimeline();
  // Disable auto commit. Strategy is only expected to write data in new files.
  config.setValue(HoodieWriteConfig.AUTO_COMMIT_ENABLE, Boolean.FALSE.toString());
  // Add the five metadata fields (_hoodie_commit_time etc.) to the schema
  final Schema schema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
  // Load the strategy class configured by hoodie.clustering.execution.strategy.class
  // and call its performClustering method
  // With the default configuration the strategy class is SparkSortAndSizeExecutionStrategy
  HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = (
      (ClusteringExecutionStrategy<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>>)
          ReflectionUtils.loadClass(config.getClusteringExecutionStrategyClass(),
              new Class<?>[] {HoodieTable.class, HoodieEngineContext.class, HoodieWriteConfig.class}, table, context, config))
      .performClustering(clusteringPlan, schema, instantTime);
  // Get the write statuses
  HoodieData<WriteStatus> writeStatusList = writeMetadata.getWriteStatuses();
  // Update the table index with the new data locations
  HoodieData<WriteStatus> statuses = updateIndex(writeStatusList, writeMetadata);
  // Persist the statuses
  statuses.persist(config.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE), context, HoodieData.HoodieDataCacheKey.of(config.getBasePath(), instantTime));
  // triggers clustering.
  // Update the write stats in writeMetadata
  writeMetadata.setWriteStats(statuses.map(WriteStatus::getStat).collectAsList());
  // Record the file ids and partition paths of the files replaced by clustering
  writeMetadata.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(clusteringPlan, writeMetadata));
  // Commit the updated writeMetadata so clustering takes effect for subsequent operations
  commitOnAutoCommit(writeMetadata);
  if (!writeMetadata.getCommitMetadata().isPresent()) {
    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeMetadata.getWriteStats().get(), writeMetadata.getPartitionToReplaceFileIds(),
        extraMetadata, operationType, getSchemaToStoreInCommit(), getCommitActionType());
    writeMetadata.setCommitMetadata(Option.of(commitMetadata));
  }
  return writeMetadata;
}
The execution details live in the strategy class. Here we analyze the default strategy's SparkSortAndSizeExecutionStrategy::performClustering method, which is actually defined in the parent class, MultipleSparkJobExecutionStrategy::performClustering. It uses a thread pool where each thread handles one input group (the clustering groups from the execution plan), with the thread count capped by configuration.
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> performClustering(final HoodieClusteringPlan clusteringPlan, final Schema schema, final String instantTime) {
  JavaSparkContext engineContext = HoodieSparkEngineContext.getSparkContext(getEngineContext());
  // Whether to preserve the Hoodie metadata fields, true by default
  boolean shouldPreserveMetadata = Option.ofNullable(clusteringPlan.getPreserveHoodieMetadata()).orElse(true);
  // Run clustering on dedicated threads: create a thread pool sized to
  // the number of input groups in the plan (the file groups clustering will produce),
  // capped by hoodie.clustering.max.parallelism (default 15)
  ExecutorService clusteringExecutorService = Executors.newFixedThreadPool(
      Math.min(clusteringPlan.getInputGroups().size(), writeConfig.getClusteringMaxParallelism()),
      new CustomizedThreadFactory("clustering-job-group", true));
  try {
    // execute clustering for each group async and collect WriteStatus
    // Run clustering in the thread pool and collect the results
    Stream<HoodieData<WriteStatus>> writeStatusesStream = FutureUtils.allOf(
        clusteringPlan.getInputGroups().stream()
            .map(inputGroup -> {
              // If hoodie.datasource.write.row.writer.enable is true, use Spark's native Row type
              // to avoid the extra cost of type conversion
              if (getWriteConfig().getBooleanOrDefault("hoodie.datasource.write.row.writer.enable", true)) {
                return runClusteringForGroupAsyncAsRow(inputGroup,
                    clusteringPlan.getStrategy().getStrategyParams(),
                    shouldPreserveMetadata,
                    instantTime,
                    clusteringExecutorService);
              }
              return runClusteringForGroupAsync(inputGroup,
                  clusteringPlan.getStrategy().getStrategyParams(),
                  shouldPreserveMetadata,
                  instantTime,
                  clusteringExecutorService);
            })
            .collect(Collectors.toList()))
        .join()
        .stream();
    JavaRDD<WriteStatus>[] writeStatuses = convertStreamToArray(writeStatusesStream.map(HoodieJavaRDD::getJavaRDD));
    JavaRDD<WriteStatus> writeStatusRDD = engineContext.union(writeStatuses);
    HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = new HoodieWriteMetadata<>();
    writeMetadata.setWriteStatuses(HoodieJavaRDD.of(writeStatusRDD));
    return writeMetadata;
  } finally {
    clusteringExecutorService.shutdown();
  }
}
We continue along the default execution path, MultipleSparkJobExecutionStrategy::runClusteringForGroupAsyncAsRow. This method loads all the data that needs clustering into a Spark Dataset, reads the table schema and the mapping between each file id and its partition path, and then performs the clustering.
private CompletableFuture<HoodieData<WriteStatus>> runClusteringForGroupAsyncAsRow(HoodieClusteringGroup clusteringGroup,
                                                                                   Map<String, String> strategyParams,
                                                                                   boolean shouldPreserveHoodieMetadata,
                                                                                   String instantTime,
                                                                                   ExecutorService clusteringExecutorService) {
  return CompletableFuture.supplyAsync(() -> {
    // Get the Spark context
    JavaSparkContext jsc = HoodieSparkEngineContext.getSparkContext(getEngineContext());
    // Load all the data involved in this clustering group as a Spark Dataset
    Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, clusteringGroup, instantTime);
    // Get the schema with the metadata fields added
    Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(getWriteConfig().getSchema()));
    // Convert the clustering file slices into HoodieFileGroupId,
    // which holds the partition path to file id mapping
    List<HoodieFileGroupId> inputFileIds = clusteringGroup.getSlices().stream()
        .map(info -> new HoodieFileGroupId(info.getPartitionPath(), info.getFileId()))
        .collect(Collectors.toList());
    // Run the clustering
    return performClusteringWithRecordsAsRow(inputRecords, clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, readerSchema, inputFileIds, shouldPreserveHoodieMetadata,
        clusteringGroup.getExtraMetadata());
  }, clusteringExecutorService);
}
The SparkSortAndSizeExecutionStrategy::performClusteringWithRecordsAsRow method obtains a partitioner, re-sorts the data with it, and finally bulk-inserts the result back into parquet files.
@Override
public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> inputRecords,
                                                                 int numOutputGroups,
                                                                 String instantTime, Map<String, String> strategyParams,
                                                                 Schema schema,
                                                                 List<HoodieFileGroupId> fileGroupIdList,
                                                                 boolean shouldPreserveHoodieMetadata,
                                                                 Map<String, String> extraMetadata) {
  LOG.info("Starting clustering for a group, parallelism:" + numOutputGroups + " commit:" + instantTime);
  // Build the write config: the bulk insert parallelism equals
  // the number of file groups clustering will output
  HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder()
      .withBulkInsertParallelism(numOutputGroups)
      .withProps(getWriteConfig().getProps()).build();
  // Set the max parquet file size to the clustering target file size,
  // i.e. hoodie.clustering.plan.strategy.target.file.max.bytes
  newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
  // Get the partitioner
  BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
  // Repartition (re-sort) the data with the partitioner
  Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);
  // Bulk insert the re-sorted data
  return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
      partitioner.arePartitionRecordsSorted(), shouldPreserveHoodieMetadata);
}
The remaining focus of our analysis is clustering's other feature: re-sorting the data. The key pieces are therefore the partitioner and its re-sorting logic. The partitioner is obtained via MultipleSparkJobExecutionStrategy::getRowPartitioner in the parent class. The code is as follows:
private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,
                                                    Schema schema,
                                                    boolean isRowPartitioner) {
  // Read the sort column config,
  // hoodie.clustering.plan.strategy.sort.columns,
  // a comma-separated list
  Option<String[]> orderByColumnsOpt =
      Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
          .map(listStr -> listStr.split(","));
  return orderByColumnsOpt.map(orderByColumns -> {
    // Read hoodie.layout.optimize.strategy: columns can be ordered along a Z-order curve,
    // a Hilbert curve, or with plain linear sorting
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy = getWriteConfig().getLayoutOptimizationStrategy();
    switch (layoutOptStrategy) {
      case ZORDER:
      case HILBERT:
        return isRowPartitioner
            ? new RowSpatialCurveSortPartitioner(getWriteConfig())
            : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
                getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
      case LINEAR:
        return isRowPartitioner
            ? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())
            : new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
      default:
        throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
    }
  }).orElseGet(() -> isRowPartitioner
      ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)
      : BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));
}
When Spark's native Row type is used, isRowPartitioner is true. With the ZORDER or HILBERT sort strategies the partitioner is RowSpatialCurveSortPartitioner, while the LINEAR strategy maps to RowCustomColumnsSortPartitioner.
Next we analyze how each of these two partitioners re-sorts the data.
First, RowSpatialCurveSortPartitioner::repartitionRecords. The code is as follows:
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputPartitions) {
  return reorder(records, outputPartitions);
}
repartitionRecords delegates to the reorder method.
protected Dataset<Row> reorder(Dataset<Row> dataset, int numOutputGroups) {
  // Check the sort column config
  if (orderByColumns.length == 0) {
    // No-op
    return dataset;
  }
  List<String> orderedCols = Arrays.asList(orderByColumns);
  // curveCompositionStrategyType defaults to DIRECT
  switch (curveCompositionStrategyType) {
    case DIRECT:
      return SpaceCurveSortingHelper.orderDataFrameByMappingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
    case SAMPLE:
      return SpaceCurveSortingHelper.orderDataFrameBySamplingValues(dataset, layoutOptStrategy, orderedCols, numOutputGroups);
    default:
      throw new UnsupportedOperationException(String.format("Unsupported space-curve curve building strategy (%s)", curveCompositionStrategyType));
  }
}
Of the SpatialCurveCompositionStrategyType options, SAMPLE yields a better sort distribution than DIRECT but runs slower. DIRECT is the default.
Next we analyze the DIRECT path, SpaceCurveSortingHelper::orderDataFrameByMappingValues. It first validates the configured sort columns, then re-sorts the data along a Z-order or Hilbert curve over those columns.
public static Dataset<Row> orderDataFrameByMappingValues(
    Dataset<Row> df,
    HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
    List<String> orderByCols,
    int targetPartitionCount
) {
  // Map field names to their StructFields
  Map<String, StructField> columnsMap =
      Arrays.stream(df.schema().fields())
          .collect(Collectors.toMap(StructField::name, Function.identity()));
  // Check that the sort columns exist in the schema
  List<String> checkCols =
      orderByCols.stream()
          .filter(columnsMap::containsKey)
          .collect(Collectors.toList());
  // If not, the sort column config is invalid; skip the ordering
  if (orderByCols.size() != checkCols.size()) {
    LOG.error(String.format("Trying to ordering over a column(s) not present in the schema (%s); skipping", CollectionUtils.diff(orderByCols, checkCols)));
    return df;
  }
  // In case when there's just one column to be ordered by, we can skip space-curve
  // ordering altogether (since it will match linear ordering anyway)
  if (orderByCols.size() == 1) {
    String orderByColName = orderByCols.get(0);
    LOG.debug(String.format("Single column to order by (%s), skipping space-curve ordering", orderByColName));
    // TODO validate if we need Spark to re-partition
    return df.repartitionByRange(targetPartitionCount, new Column(orderByColName));
  }
  // Number of fields
  int fieldNum = df.schema().fields().length;
  // Map each sort column's index to its field info
  Map<Integer, StructField> fieldMap =
      orderByCols.stream()
          .collect(
              Collectors.toMap(e -> Arrays.asList(df.schema().fields()).indexOf(columnsMap.get(e)), columnsMap::get));
  JavaRDD<Row> sortedRDD;
  // Sort the RDD according to the layout optimization strategy
  switch (layoutOptStrategy) {
    case ZORDER:
      sortedRDD = createZCurveSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
      break;
    case HILBERT:
      sortedRDD = createHilbertSortedRDD(df.toJavaRDD(), fieldMap, fieldNum, targetPartitionCount);
      break;
    default:
      throw new UnsupportedOperationException(String.format("Not supported layout-optimization strategy (%s)", layoutOptStrategy));
  }
  // Compose new {@code StructType} for ordered RDDs
  StructType newStructType = composeOrderedRDDStructType(df.schema());
  // Return the dataset
  return df.sparkSession()
      .createDataFrame(sortedRDD, newStructType)
      .drop("Index");
}
Let's look at the first case, Z-order sorting, implemented in SpaceCurveSortingHelper::createZCurveSortedRDD.
This method maps each sort column's value to 8 bytes (truncating longer values and padding shorter ones), then weaves the columns' bytes together bit by bit: one bit from the first column, one bit from the second, then the next bit of each, and so on. This step is called bit interleaving. The interleaved value is appended to each row as an extra field, and the rows are then sorted by that field.
private static JavaRDD<Row> createZCurveSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
  return originRDD.map(row -> {
    // Map each sort column's value in the row to 8 bytes,
    // truncating longer values and padding shorter ones
    byte[][] zBytes = fieldMap.entrySet().stream()
        .map(entry -> {
          int index = entry.getKey();
          StructField field = entry.getValue();
          return mapColumnValueTo8Bytes(row, index, field.dataType());
        })
        .toArray(byte[][]::new);
    // Interleave received bytes to produce Z-curve ordinal
    // e.g. with sort columns A and B: take one bit of A, then one bit of B,
    // then the next bit of A, the next bit of B, and so on
    byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8);
    // Append zOrdinalBytes to the row
    return appendToRow(row, zOrdinalBytes);
  })
  // Sort by that field (zOrdinalBytes sits at the end of the row, at index fieldNum)
  .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
The second case is the Hilbert curve, handled by SpaceCurveSortingHelper::createHilbertSortedRDD. The logic is essentially the same as Z-order sorting, except the Z-curve is replaced by a Hilbert curve.
private static JavaRDD<Row> createHilbertSortedRDD(JavaRDD<Row> originRDD, Map<Integer, StructField> fieldMap, int fieldNum, int fileNum) {
  // NOTE: Here {@code mapPartitions} is used to make sure Hilbert curve instance is initialized
  // only once per partition
  return originRDD.mapPartitions(rows -> {
    // Create a Hilbert curve with as many dimensions as there are sort columns
    HilbertCurve hilbertCurve = HilbertCurve.bits(63).dimensions(fieldMap.size());
    return new Iterator<Row>() {
      @Override
      public boolean hasNext() {
        return rows.hasNext();
      }

      @Override
      public Row next() {
        Row row = rows.next();
        // Map each sort column's value in the row to a long
        long[] longs = fieldMap.entrySet().stream()
            .mapToLong(entry -> {
              int index = entry.getKey();
              StructField field = entry.getValue();
              return mapColumnValueToLong(row, index, field.dataType());
            })
            .toArray();
        // Map N-dimensional coordinates into position on the Hilbert curve
        // The curve position is used as the sort key below
        byte[] hilbertCurvePosBytes = HilbertCurveUtils.indexBytes(hilbertCurve, longs, 63);
        return appendToRow(row, hilbertCurvePosBytes);
      }
    };
  })
  .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum);
}
Compared with the space-filling-curve approaches, LINEAR sorting is much simpler. The code lives in RowCustomColumnsSortPartitioner::repartitionRecords: it sorts by the configured columns using Spark's sort operator.
@Override
public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
  return records
      .sort(Arrays.stream(sortColumnNames).map(Column::new).toArray(Column[]::new))
      .coalesce(outputSparkPartitions);
}
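For comparison, linear ordering is plain lexicographic sorting on the configured columns. In Python terms (a trivial sketch; the column names and rows are invented):

```python
rows = [
    {"city": "SF", "ts": 3},
    {"city": "LA", "ts": 9},
    {"city": "LA", "ts": 1},
]
# Analogue of hoodie.clustering.plan.strategy.sort.columns
sort_columns = ["city", "ts"]
# Lexicographic sort: first by city, ties broken by ts
rows.sort(key=lambda r: tuple(r[c] for c in sort_columns))
print(rows)  # LA/1, LA/9, SF/3
```
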
This concludes the analysis of the partitioner logic.