The Concept of Clean
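A Hudi table has a timeline, which can be thought of as the table's change log. For every change, Hudi records when it happened and what kind of change it was, and also which data files it produced. Older data files may hold data that has since been superseded; we call this historical data. After a table has been operated on for a long time, a large amount of historical data accumulates and occupies a lot of storage. Clean addresses this problem: following the configured cleaning policy, it deletes historical data files, freeing storage space and reducing disk pressure.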
Configuring and Running Clean
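The core Clean setting is the cleaning policy, hoodie.cleaner.policy, which takes one of three values:
- KEEP_LATEST_COMMITS. The default. Retains the latest N commits; N defaults to 10 and is set with hoodie.cleaner.commits.retained.
- KEEP_LATEST_BY_HOURS. Retains the commits made within the last N hours; N defaults to 24 and is set with hoodie.cleaner.hours.retained.
- KEEP_LATEST_FILE_VERSIONS. Retains the latest N versions (file slices) of each file group; N defaults to 3 and is set with hoodie.cleaner.fileversions.retained.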
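Other related options:
- hoodie.clean.automatic. Whether to clean automatically after each commit. Defaults to true.
- hoodie.clean.async. Runs clean asynchronously, so writes and clean can proceed at the same time. Takes effect only when hoodie.clean.automatic is enabled. Defaults to false.
- hoodie.clean.trigger.strategy. The criterion used to schedule the next clean. Currently the only value is NUM_COMMITS, i.e. scheduling by number of commits.
- hoodie.clean.max.commits. How many commits made since the last clean trigger scheduling the next clean. Defaults to 1.
- hoodie.cleaner.incremental.mode. Incremental clean mode, enabled by default. When enabled, each clean plan only examines files belonging to the instants between the earliest instant retained by the previous clean and the earliest instant this clean must retain, which shortens the planning phase.
- hoodie.cleaner.policy.failed.writes. How clean treats earlier failed writes. Defaults to EAGER. EAGER is for single-writer setups: before each write, failed writes are found and rolled back. LAZY supports multiple writers: failed writes are rolled back during clean.
- hoodie.cleaner.parallelism. Parallelism of the clean operation. Defaults to 200. Applies to Spark; during planning, the work for one partition is assigned to one Spark task.
- hoodie.clean.allow.multiple. Whether multiple cleans may be scheduled or executed. Defaults to true.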
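As a minimal sketch, the options above can be collected into a java.util.Properties object before being handed to a writer. The class CleanConfigSketch and its method are hypothetical; only the option keys and the policy and strategy values come from the lists above, and how the properties reach Hudi depends on the engine and is not shown.

```java
import java.util.Properties;

// Hypothetical helper that gathers the clean-related options described above.
class CleanConfigSketch {
  static Properties keepLatestCommits(int commitsRetained) {
    Properties props = new Properties();
    // Cleaning policy and how many commits it retains
    props.setProperty("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
    props.setProperty("hoodie.cleaner.commits.retained", String.valueOf(commitsRetained));
    // Clean after commits; async only takes effect because automatic is also true
    props.setProperty("hoodie.clean.automatic", "true");
    props.setProperty("hoodie.clean.async", "true");
    // Schedule a clean after every commit (the NUM_COMMITS trigger)
    props.setProperty("hoodie.clean.trigger.strategy", "NUM_COMMITS");
    props.setProperty("hoodie.clean.max.commits", "1");
    // Plan incrementally instead of scanning every partition
    props.setProperty("hoodie.cleaner.incremental.mode", "true");
    return props;
  }

  public static void main(String[] args) {
    keepLatestCommits(10).forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```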
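By default, Clean runs automatically as part of data writes. It can also be run offline as a dedicated Spark job; see the official documentation: https://hudi.apache.org/cn/docs/hoodie_cleaner/#run-independently.
For example: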
spark-submit --master local --class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--target-base-path /path/to/hoodie_table \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_COMMITS \
--hoodie-conf hoodie.cleaner.commits.retained=10 \
--hoodie-conf hoodie.cleaner.parallelism=200
This runs the cleaner and retains the latest 10 commits.
spark-submit --master local --class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--target-base-path /path/to/hoodie_table \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_FILE_VERSIONS \
--hoodie-conf hoodie.cleaner.fileversions.retained=3 \
--hoodie-conf hoodie.cleaner.parallelism=200
This runs the cleaner and retains the latest 3 file versions.
spark-submit --master local --class org.apache.hudi.utilities.HoodieCleaner `ls packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar` \
--target-base-path /path/to/hoodie_table \
--hoodie-conf hoodie.cleaner.policy=KEEP_LATEST_BY_HOURS \
--hoodie-conf hoodie.cleaner.hours.retained=24 \
--hoodie-conf hoodie.cleaner.parallelism=200
This runs the cleaner and retains the commits from the last 24 hours.
Clean Source Code Analysis
BaseHoodieTableServiceClient::clean
The entry point of clean is BaseHoodieTableServiceClient::clean:
/**
* Clean up any stale/old files/data lying around (either on file storage or index storage) based on the
* configurations and CleaningPolicy used. (typically files that no longer can be used by a running query can be
* cleaned). This API provides the flexibility to schedule clean instant asynchronously via
* {@link BaseHoodieTableServiceClient#scheduleTableService(String, Option, TableServiceType)} and disable inline scheduling
* of clean.
*
* @param cleanInstantTime instant time for clean.
* @param scheduleInline true if needs to be scheduled inline. false otherwise.
*/
@Nullable
public HoodieCleanMetadata clean(String cleanInstantTime, boolean scheduleInline) throws HoodieIOException {
// hoodie.table.services.enabled must be turned on
// This option controls whether any table service runs, e.g. archive, clean, compaction and clustering
if (!tableServicesEnabled(config)) {
return null;
}
// Get the clean timer context (for metrics)
final Timer.Context timerContext = metrics.getCleanCtx();
// Roll back failed writes
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
// Create the Hudi table; the implementation class depends on the engine (Spark/Flink) and the table type (MOR/COW)
HoodieTable table = createTable(config, hadoopConf);
// Proceed if multiple cleans are allowed (a new clean may be scheduled while the previous one is unfinished; hoodie.clean.allow.multiple),
// or if the timeline has no inflight or requested clean instant
if (config.allowMultipleCleans() || !table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()) {
LOG.info("Cleaner started");
// proceed only if multiple clean schedules are enabled or if there are no pending cleans.
// If the clean should be scheduled inline
if (scheduleInline) {
// Schedule the clean operation:
// write a clean instant in the requested state
scheduleTableServiceInternal(cleanInstantTime, Option.empty(), TableServiceType.CLEAN);
// Reload the timeline
table.getMetaClient().reloadActiveTimeline();
}
// Delegating clean to the table service manager is not supported yet
if (shouldDelegateToTableServiceManager(config, ActionType.clean)) {
LOG.warn("Cleaning is not yet supported with Table Service Manager.");
return null;
}
}
// Proceeds to execute any requested or inflight clean instances in the timeline
HoodieCleanMetadata metadata = table.clean(context, cleanInstantTime);
// Record the clean duration, the total number of deleted files, etc.
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
+ " Earliest Retained Instant :" + metadata.getEarliestCommitToRetain()
+ " cleanerElapsedMs" + durationMs);
}
return metadata;
}
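The concrete logic for creating the cleaner plan and for performing the clean lives in the table implementation: scheduleCleaning creates the plan and table.clean executes it. Flink's implementation is in HoodieFlinkCopyOnWriteTable and Spark's in HoodieSparkCopyOnWriteTable. The following uses Flink as the example to analyze how a clean plan is created and executed.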
How Flink Creates the Cleaner Plan
Flink's logic for creating the cleaner plan is in HoodieFlinkCopyOnWriteTable:
@Override
public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
}
Planning is carried out by CleanPlanActionExecutor, which we analyze next.
CleanPlanActionExecutor
The entry point of CleanPlanActionExecutor is its execute method:
@Override
public Option<HoodieCleanerPlan> execute() {
// Decide whether a clean is needed:
// it is needed when the number of commits since the last clean is >= hoodie.clean.max.commits
if (!needsCleaning(config.getCleaningTriggerStrategy())) {
return Option.empty();
}
// Plan a new clean action
return requestClean(instantTime);
}
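The trigger check in execute can be sketched in isolation. This is a simplified model, not Hudi's code: the class CleanTriggerSketch is hypothetical, instant times are plain strings, and the reference point for "since the last clean" is simply passed in by the caller (how Hudi derives it from the previous clean is not shown in this article).

```java
import java.util.List;

// Hypothetical sketch of the NUM_COMMITS trigger checked by execute():
// count completed commits newer than a reference point and compare with hoodie.clean.max.commits.
class CleanTriggerSketch {
  /**
   * @param commitTimes  completed commit instant times
   * @param lastCleanRef reference instant time of the previous clean, or null if there was none
   * @param maxCommits   value of hoodie.clean.max.commits
   */
  static boolean needsCleaning(List<String> commitTimes, String lastCleanRef, int maxCommits) {
    long commitsSinceLastClean = commitTimes.stream()
        // Instant times are fixed-width digit strings, so string order matches time order
        .filter(t -> lastCleanRef == null || t.compareTo(lastCleanRef) > 0)
        .count();
    return commitsSinceLastClean >= maxCommits;
  }

  public static void main(String[] args) {
    List<String> commits = List.of("20240101000000000", "20240102000000000", "20240103000000000");
    System.out.println(needsCleaning(commits, "20240102120000000", 1)); // true
    System.out.println(needsCleaning(commits, "20240102120000000", 2)); // false
  }
}
```

With the default hoodie.clean.max.commits of 1, a single new commit is enough to trigger planning; larger values trade storage for fewer planning runs.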
execute first checks whether a clean is needed. If so, it calls requestClean to generate the Hudi clean plan:
protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
// Build the cleaner plan (the key step, analyzed below)
final HoodieCleanerPlan cleanerPlan = requestClean(context);
Option<HoodieCleanerPlan> option = Option.empty();
// If the plan contains files to delete
if (nonEmpty(cleanerPlan.getFilePathsToBeDeletedPerPartition())
&& cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum() > 0) {
// Only create cleaner plan which does some work
// Create a requested clean instant
final HoodieInstant cleanInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.CLEAN_ACTION, startCleanTime);
// Save to both aux and timeline folder
try {
// Write the requested clean instant file (under the .hoodie directory)
table.getActiveTimeline().saveToCleanRequested(cleanInstant, TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
LOG.info("Requesting Cleaning with instant time " + cleanInstant);
} catch (IOException e) {
LOG.error("Got exception when saving cleaner requested file", e);
throw new HoodieIOException(e.getMessage(), e);
}
option = Option.of(cleanerPlan);
}
return option;
}
That method in turn calls an overload of requestClean, which produces the list of files to delete:
HoodieCleanerPlan requestClean(HoodieEngineContext context) {
try {
// CleanPlanner is responsible for generating the clean plan
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
// Based on the configured cleaning policy, get the earliest instant to retain (older data becomes eligible for cleaning)
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned: " + config.getTableName());
// Get the partition paths that contain files to clean
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);
// If empty, there is nothing to clean
if (partitionsToClean.isEmpty()) {
LOG.info("Nothing to clean here. It is already clean");
return HoodieCleanerPlan.newBuilder().setPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name()).build();
}
LOG.info("Total Partitions to clean : " + partitionsToClean.size() + ", with policy " + config.getCleanerPolicy());
// Cleaning parallelism: the minimum of the number of partitions to clean and the configured cleaner parallelism
// (the latter is hoodie.cleaner.parallelism)
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());
// Map each partition path to the files to delete in it
// The Boolean in the Pair indicates whether the whole partition should be deleted
// The List holds information about the files to delete in the partition
Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
.stream()
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Convert CleanFileInfo to HoodieCleanFileInfo
Map<String, List<HoodieCleanFileInfo>> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey,
e -> CleanerUtils.convertToHoodieCleanFileInfoList(e.getValue().getValue())));
// Collect the partition paths whose Pair key is true,
// i.e. the partitions to delete
List<String> partitionsToDelete = cleanOpsWithPartitionMeta.entrySet().stream().filter(entry -> entry.getValue().getKey()).map(Map.Entry::getKey)
.collect(Collectors.toList());
// Build and return the cleaner plan
return new HoodieCleanerPlan(earliestInstant
.map(x -> new HoodieActionInstant(x.getTimestamp(), x.getAction(), x.getState().name())).orElse(null),
planner.getLastCompletedCommitTimestamp(),
config.getCleanerPolicy().name(), CollectionUtils.createImmutableMap(),
CleanPlanner.LATEST_CLEAN_PLAN_VERSION, cleanOps, partitionsToDelete);
} catch (IOException e) {
throw new HoodieIOException("Failed to schedule clean operation", e);
}
}
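This method obtains the earliest instant to retain, the partitions to clean and the files to clean in each of them, and wraps them into a HoodieCleanerPlan. The actual computation is delegated to CleanPlanner, which we analyze next.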
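The assembly step of requestClean can be modeled with plain collections. This is a sketch under simplifying assumptions: the class CleanPlanSketch and its nested PartitionResult are hypothetical stand-ins for Pair<Boolean, List<CleanFileInfo>>, file information is reduced to path strings, and no engine context is involved.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical, simplified model of how requestClean(context) assembles a cleaner plan.
class CleanPlanSketch {
  // Stand-in for Pair<Boolean, List<CleanFileInfo>>: (delete the whole partition?, files to delete)
  static final class PartitionResult {
    final boolean deletePartition;
    final List<String> files;

    PartitionResult(boolean deletePartition, List<String> files) {
      this.deletePartition = deletePartition;
      this.files = files;
    }
  }

  // Planning never uses more tasks than there are partitions
  static int parallelism(int partitionsToClean, int configuredParallelism) {
    return Math.min(partitionsToClean, configuredParallelism);
  }

  // Partition path -> files to delete (the filePathsToBeDeletedPerPartition part of the plan)
  static Map<String, List<String>> filesToDelete(Map<String, PartitionResult> results) {
    return results.entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().files));
  }

  // Partitions whose Boolean flag is true (the partitionsToBeDeleted part of the plan)
  static List<String> partitionsToDelete(Map<String, PartitionResult> results) {
    return results.entrySet().stream()
        .filter(e -> e.getValue().deletePartition)
        .map(Map.Entry::getKey)
        .sorted()
        .collect(Collectors.toList());
  }

  // requestClean(startCleanTime) only saves a plan that deletes at least one file
  static boolean worthScheduling(Map<String, List<String>> filesToDelete) {
    return filesToDelete.values().stream().mapToInt(List::size).sum() > 0;
  }

  public static void main(String[] args) {
    Map<String, PartitionResult> results = new HashMap<>();
    results.put("2024/01/01", new PartitionResult(false, List.of("f1_001.parquet")));
    results.put("2024/01/02", new PartitionResult(true, List.of()));
    System.out.println(parallelism(results.size(), 200));
    System.out.println(partitionsToDelete(results));
    System.out.println(worthScheduling(filesToDelete(results)));
  }
}
```

The last check reflects the file-count test in requestClean(startCleanTime): in this model, a plan whose only effect would be deleting empty partitions is not saved.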
CleanPlanner
CleanPlanner's getEarliestCommitToRetain computes, according to the cleaning policy, the earliest commit that must be retained; data needed only by commits before it becomes eligible for cleaning:
public Option<HoodieInstant> getEarliestCommitToRetain() {
Option<HoodieInstant> earliestCommitToRetain = Option.empty();
// Number of commits to retain (hoodie.cleaner.commits.retained)
int commitsRetained = config.getCleanerCommitsRetained();
// Number of hours of commits to retain (hoodie.cleaner.hours.retained)
int hoursRetained = config.getCleanerHoursRetained();
// If the policy keeps the latest N commits
// and the number of completed instants exceeds commitsRetained
if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS
&& commitTimeline.countInstants() > commitsRetained) {
// Find the earliest pending commit
Option<HoodieInstant> earliestPendingCommits = hoodieTable.getMetaClient()
.getActiveTimeline()
.getCommitsTimeline()
.filter(s -> !s.isCompleted()).firstInstant();
// If there is one
if (earliestPendingCommits.isPresent()) {
// Earliest commit to retain must not be later than the earliest pending commit
// (pending commits must not be cleaned)
// nthInstant(n) returns the instant at 0-based index n, where
// n = commitTimeline.countInstants() (total) - commitsRetained (number to keep)
earliestCommitToRetain =
commitTimeline.nthInstant(commitTimeline.countInstants() - commitsRetained).map(nthInstant -> {
// If the nth commit is not later than the earliest pending commit, return it
if (nthInstant.compareTo(earliestPendingCommits.get()) <= 0) {
return Option.of(nthInstant);
} else {
// Otherwise return the last completed commit before the earliest pending commit
return commitTimeline.findInstantsBefore(earliestPendingCommits.get().getTimestamp()).lastInstant();
}
}).orElse(Option.empty());
} else {
// No pending commit: take the nth instant directly, as above
earliestCommitToRetain = commitTimeline.nthInstant(commitTimeline.countInstants()
- commitsRetained); //15 instants total, 10 commits to retain, this gives 6th instant in the list
}
} else if (config.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
// The policy keeps the commits of the last N hours
// Get the current time
Instant instant = Instant.now();
ZonedDateTime currentDateTime = ZonedDateTime.ofInstant(instant, hoodieTable.getMetaClient().getTableConfig().getTimelineTimezone().getZoneId());
// Compute the earliest time to retain:
// current time - hoursRetained
String earliestTimeToRetain = HoodieActiveTimeline.formatDate(Date.from(currentDateTime.minusHours(hoursRetained).toInstant()));
// Take the first commit whose timestamp is >= earliestTimeToRetain
earliestCommitToRetain = Option.fromJavaOptional(commitTimeline.getInstantsAsStream().filter(i -> HoodieTimeline.compareTimestamps(i.getTimestamp(),
HoodieTimeline.GREATER_THAN_OR_EQUALS, earliestTimeToRetain)).findFirst());
}
return earliestCommitToRetain;
}
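The two policy branches can be reproduced on plain strings to check the index arithmetic. This sketch assumes that instant times are fixed-width yyyyMMddHHmmssSSS strings, so lexicographic order equals chronological order; the class EarliestCommitSketch is hypothetical, and the table's timeline timezone is represented only by the ZonedDateTime the caller passes in.

```java
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.Optional;

// Hypothetical model of CleanPlanner#getEarliestCommitToRetain over plain instant-time strings.
class EarliestCommitSketch {
  // Assumed instant-time layout with millisecond precision
  static final DateTimeFormatter INSTANT_FORMAT = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");

  // KEEP_LATEST_COMMITS; completed holds completed commit times in ascending order
  static Optional<String> keepLatestCommits(List<String> completed, Optional<String> earliestPending,
                                            int commitsRetained) {
    int count = completed.size();
    if (count <= commitsRetained) {
      return Optional.empty(); // not enough commits to start cleaning
    }
    // 0-based index: 15 commits with 10 retained gives index 5, the 6th commit
    String nth = completed.get(count - commitsRetained);
    if (!earliestPending.isPresent() || nth.compareTo(earliestPending.get()) <= 0) {
      return Optional.of(nth);
    }
    // The earliest commit to retain must not be later than the earliest pending commit
    String pending = earliestPending.get();
    return completed.stream().filter(t -> t.compareTo(pending) < 0).reduce((first, second) -> second);
  }

  // KEEP_LATEST_BY_HOURS: first completed commit at or after (now - hoursRetained)
  static Optional<String> keepLatestByHours(List<String> completed, ZonedDateTime now, int hoursRetained) {
    String cutoff = now.minusHours(hoursRetained).format(INSTANT_FORMAT);
    return completed.stream().filter(t -> t.compareTo(cutoff) >= 0).findFirst();
  }

  public static void main(String[] args) {
    List<String> commits = List.of("20240101000000000", "20240102000000000", "20240103000000000");
    System.out.println(keepLatestCommits(commits, Optional.empty(), 2));
    System.out.println(keepLatestByHours(commits, ZonedDateTime.now(), 24));
  }
}
```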
CleanPlanner's getPartitionPathsToClean finds the partitions that need cleaning:
public List<String> getPartitionPathsToClean(Option<HoodieInstant> earliestRetainedInstant) throws IOException {
switch (config.getCleanerPolicy()) {
case KEEP_LATEST_COMMITS:
case KEEP_LATEST_BY_HOURS:
return getPartitionPathsForCleanByCommits(earliestRetainedInstant);
case KEEP_LATEST_FILE_VERSIONS:
// The file-versions policy must examine every file, so list all partition paths (analyzed below)
return getPartitionPathsForFullCleaning();
default:
throw new IllegalStateException("Unknown Cleaner Policy");
}
}
getPartitionPathsForCleanByCommits determines whether the partition paths to clean can be obtained incrementally. If not, all partitions must be scanned:
private List<String> getPartitionPathsForCleanByCommits(Option<HoodieInstant> instantToRetain) throws IOException {
// Argument check
if (!instantToRetain.isPresent()) {
LOG.info("No earliest commit to retain. No need to scan partitions !!");
return Collections.emptyList();
}
// Whether incremental clean mode is enabled (hoodie.cleaner.incremental.mode): only consider what happened since the last clean
// This improves performance and is enabled by default
if (config.incrementalCleanerModeEnabled()) {
// Get the last completed clean instant
Option<HoodieInstant> lastClean = hoodieTable.getCleanTimeline().filterCompletedInstants().lastInstant();
if (lastClean.isPresent()) {
// If one exists
if (hoodieTable.getActiveTimeline().isEmpty(lastClean.get())) {
// If its details are empty, delete the instant
hoodieTable.getActiveTimeline().deleteEmptyInstantIfExists(lastClean.get());
} else {
// Load the details of the last clean
HoodieCleanMetadata cleanMetadata = TimelineMetadataUtils
.deserializeHoodieCleanMetadata(hoodieTable.getActiveTimeline().getInstantDetails(lastClean.get()).get());
if ((cleanMetadata.getEarliestCommitToRetain() != null)
&& (cleanMetadata.getEarliestCommitToRetain().length() > 0)
&& !hoodieTable.getActiveTimeline().isBeforeTimelineStarts(cleanMetadata.getEarliestCommitToRetain())) {
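// If the last clean recorded an earliest commit to retain,
// and that commit is not before the start of the active timeline (the first non-savepoint commit),
// collect the partition paths of the instants between the commit retained by the last clean and the one this clean must retain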
return getPartitionPathsForIncrementalCleaning(cleanMetadata, instantToRetain);
}
}
}
}
// Otherwise every partition must be scanned before cleaning
return getPartitionPathsForFullCleaning();
}
getPartitionPathsForIncrementalCleaning finds the partition paths with files to clean between the last clean and earliestRetainedInstant. It is an optimization that avoids scanning every partition:
private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata cleanMetadata,
Option<HoodieInstant> newInstantToRetain) {
LOG.info("Incremental Cleaning mode is enabled. Looking up partition-paths that have since changed "
+ "since last cleaned at " + cleanMetadata.getEarliestCommitToRetain()
+ ". New Instant to retain : " + newInstantToRetain);
// Select the instants at or after cleanMetadata.getEarliestCommitToRetain() and before newInstantToRetain
return hoodieTable.getCompletedCommitsTimeline().getInstantsAsStream().filter(
instant -> HoodieTimeline.compareTimestamps(instant.getTimestamp(), HoodieTimeline.GREATER_THAN_OR_EQUALS,
cleanMetadata.getEarliestCommitToRetain()) && HoodieTimeline.compareTimestamps(instant.getTimestamp(),
HoodieTimeline.LESSER_THAN, newInstantToRetain.get().getTimestamp())).flatMap(instant -> {
try {
if (HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction())) {
// The instant is a replace commit
HoodieReplaceCommitMetadata replaceCommitMetadata = HoodieReplaceCommitMetadata.fromBytes(
hoodieTable.getActiveTimeline().getInstantDetails(instant).get(), HoodieReplaceCommitMetadata.class);
// Include the partitions of both the replaced file groups and the newly written files
return Stream.concat(replaceCommitMetadata.getPartitionToReplaceFileIds().keySet().stream(), replaceCommitMetadata.getPartitionToWriteStats().keySet().stream());
} else {
// For a regular commit, take the partitions it wrote to
HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
.fromBytes(hoodieTable.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
return commitMetadata.getPartitionToWriteStats().keySet().stream();
}
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}).distinct().collect(Collectors.toList());
// The partition paths are deduplicated before being returned
}
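The selection logic reduces to a time-range filter followed by a union of partition paths. In this hypothetical sketch, commit metadata is simplified to the class CommitInfo, which holds only the partition keys the method reads, and short instant times such as "001" stand in for real ones.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical model of getPartitionPathsForIncrementalCleaning.
class IncrementalPartitionsSketch {
  // Simplified commit metadata: a regular commit has no replaced partitions
  static final class CommitInfo {
    final String time;
    final List<String> writtenPartitions;  // keys of partitionToWriteStats
    final List<String> replacedPartitions; // keys of partitionToReplaceFileIds

    CommitInfo(String time, List<String> writtenPartitions, List<String> replacedPartitions) {
      this.time = time;
      this.writtenPartitions = writtenPartitions;
      this.replacedPartitions = replacedPartitions;
    }
  }

  static List<String> partitionsToScan(List<CommitInfo> completedCommits,
                                       String previousEarliestRetained, String newEarliestRetained) {
    return completedCommits.stream()
        // Keep instants in [previousEarliestRetained, newEarliestRetained)
        .filter(c -> c.time.compareTo(previousEarliestRetained) >= 0
            && c.time.compareTo(newEarliestRetained) < 0)
        // Partitions with replaced file groups plus partitions written by the commit
        .flatMap(c -> Stream.concat(c.replacedPartitions.stream(), c.writtenPartitions.stream()))
        .distinct()
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<CommitInfo> commits = List.of(
        new CommitInfo("001", List.of("p1"), List.of()),
        new CommitInfo("002", List.of("p2"), List.of("p1")),
        new CommitInfo("003", List.of("p3"), List.of()));
    System.out.println(partitionsToScan(commits, "001", "003")); // [p1, p2]
  }
}
```

Partition p3 is skipped because commit 003 is the new earliest commit to retain, so nothing it wrote can be cleaned yet.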
If incremental mode cannot be used, we fall back to getPartitionPathsForFullCleaning, which scans all partitions by brute force and returns them:
private List<String> getPartitionPathsForFullCleaning() {
// Go to brute force mode of scanning all partitions
return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
}
Back in CleanPlanner, getDeletePaths returns the files to delete in a partition:
/**
* Returns files to be cleaned for the given partitionPath based on cleaning policy.
*/
// First element: whether the partition itself should be deleted (a partition with no file groups need not exist)
// Second element: information about the files to delete
public Pair<Boolean, List<CleanFileInfo>> getDeletePaths(String partitionPath) {
HoodieCleaningPolicy policy = config.getCleanerPolicy();
Pair<Boolean, List<CleanFileInfo>> deletePaths;
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath);
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) {
deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath);
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
deletePaths = getFilesToCleanKeepingLatestHours(partitionPath);
} else {
throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name());
}
LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath);
if (deletePaths.getKey()) {
LOG.info("Partition " + partitionPath + " to be deleted");
}
return deletePaths;
}
This method calls a different underlying method for each cleaning policy. KEEP_LATEST_COMMITS and KEEP_LATEST_BY_HOURS share the same one, which we analyze first:
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath) {
return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS);
}
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestHours(String partitionPath) {
return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS);
}
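getFilesToCleanKeepingLatestCommits implements the keep-latest-N-commits policy. For KEEP_LATEST_BY_HOURS, the instant to retain can likewise be derived from the retention period (commitsRetained is passed as 0), so the logic is shared: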
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. ");
List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
// These files must not be deleted
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());
// determine if we have enough commits, to start cleaning.
// Whether the whole partition should be deleted
boolean toDeletePartition = false;
// Clean only when there are more instants than the number to retain
if (commitTimeline.countInstants() > commitsRetained) {
// Get the earliest instant to retain
Option<HoodieInstant> earliestCommitToRetainOption = getEarliestCommitToRetain();
HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get();
// all replaced file groups before earliestCommitToRetain are eligible to clean
// (files referenced by savepoints are excluded)
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption));
// add active files
// i.e. the file slices of the active file groups
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
List<FileSlice> fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList());
if (fileSliceList.isEmpty()) {
continue;
}
// Base instant time of the latest version
String lastVersion = fileSliceList.get(0).getBaseInstantTime();
// Latest version before earliestCommitToRetain
String lastVersionBeforeEarliestCommitToRetain =
getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain);
// Ensure there are more than 1 version of the file (we only clean old files from updates)
// i.e always spare the last commit.
// For each file slice: skip savepointed files and the versions that must be kept, clean the rest
for (FileSlice aSlice : fileSliceList) {
Option<HoodieBaseFile> aFile = aSlice.getBaseFile();
String fileCommitTime = aSlice.getBaseInstantTime();
if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) {
// do not clean up a savepoint data file
continue;
}
// Versions that must never be deleted
if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
// Dont delete the latest commit and also the last commit before the earliest commit we
// are retaining
// The window of commit retain == max query run time. So a query could be running which
// still
// uses this file.
if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) {
// move on to the next file
continue;
}
} else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) {
// This block corresponds to KEEP_LATEST_BY_HOURS policy
// Do not delete the latest commit.
if (fileCommitTime.equals(lastVersion)) {
// move on to the next file
continue;
}
}
// Always keep the last commit
// Clean the slice if no pending compaction needs it
// and its base instant time is earlier than earliestCommitToRetain
if (!isFileSliceNeededForPendingMajorOrMinorCompaction(aSlice) && HoodieTimeline
.compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) {
// this is a commit, that should be cleaned.
// Delete the base file (and the bootstrap base file, if configured)
aFile.ifPresent(hoodieDataFile -> {
deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false));
if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) {
deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
}
});
// clean the log files for the commits, which contain cdc log files in cdc scenario
// and normal log files for mor tables.
// i.e. delete every log file of this slice
deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
.collect(Collectors.toList()));
}
}
}
// if there are no valid file groups for the partition, mark it to be deleted
if (fileGroups.isEmpty()) {
toDeletePartition = true;
}
}
return Pair.of(toDeletePartition, deletePaths);
}
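The per-file-group decision can be replayed on one file group with plain strings. The class KeepLatestCommitsSketch is hypothetical, and the sketch rests on two assumptions: getLatestVersionBeforeCommit (whose body is not shown above) returns the newest slice strictly older than earliestCommitToRetain, and savepoints and pending compactions are identified by slice instant time rather than by file name or compaction operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of the per-file-group loop in getFilesToCleanKeepingLatestCommits.
class KeepLatestCommitsSketch {
  enum Policy { KEEP_LATEST_COMMITS, KEEP_LATEST_BY_HOURS }

  // sliceTimes: base instant times of one file group's slices, newest first
  static List<String> slicesToClean(List<String> sliceTimes, String earliestCommitToRetain, Policy policy,
                                    Set<String> savepointed, Set<String> neededByPendingCompaction) {
    List<String> toClean = new ArrayList<>();
    if (sliceTimes.isEmpty()) {
      return toClean;
    }
    String lastVersion = sliceTimes.get(0);
    // Assumed behavior of getLatestVersionBeforeCommit: newest slice strictly older than the boundary
    String lastVersionBeforeRetain = sliceTimes.stream()
        .filter(t -> t.compareTo(earliestCommitToRetain) < 0)
        .findFirst()
        .orElse(null);
    for (String t : sliceTimes) {
      if (savepointed.contains(t) || t.equals(lastVersion)) {
        continue; // savepointed data and the latest version are never cleaned
      }
      if (policy == Policy.KEEP_LATEST_COMMITS && t.equals(lastVersionBeforeRetain)) {
        continue; // a query started within the retention window may still read this version
      }
      if (!neededByPendingCompaction.contains(t) && t.compareTo(earliestCommitToRetain) < 0) {
        toClean.add(t);
      }
    }
    return toClean;
  }

  public static void main(String[] args) {
    List<String> slices = List.of("005", "004", "003", "002", "001");
    System.out.println(slicesToClean(slices, "004", Policy.KEEP_LATEST_COMMITS, Set.of(), Set.of()));  // [002, 001]
    System.out.println(slicesToClean(slices, "004", Policy.KEEP_LATEST_BY_HOURS, Set.of(), Set.of())); // [003, 002, 001]
  }
}
```

The two outputs show the one difference between the policies: KEEP_LATEST_COMMITS additionally spares slice 003, the last version before the retention boundary.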
The keep-latest-N-versions policy works differently. Next we analyze getFilesToCleanKeepingLatestVersions:
private Pair<Boolean, List<CleanFileInfo>> getFilesToCleanKeepingLatestVersions(String partitionPath) {
LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained()
+ " file versions. ");
List<CleanFileInfo> deletePaths = new ArrayList<>();
// Collect all the datafiles savepointed by all the savepoints
// These files must not be deleted
List<String> savepointedFiles = hoodieTable.getSavepointTimestamps().stream()
.flatMap(this::getSavepointedDataFiles)
.collect(Collectors.toList());
// In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely
// In other words, the file versions only apply to the active file groups.
deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty()));
boolean toDeletePartition = false;
List<HoodieFileGroup> fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList());
for (HoodieFileGroup fileGroup : fileGroups) {
// Number of latest file slice versions to keep (hoodie.cleaner.fileversions.retained)
int keepVersions = config.getCleanerFileVersionsRetained();
// do not cleanup slice required for pending compaction
Iterator<FileSlice> fileSliceIterator =
fileGroup.getAllFileSlices()
.filter(fs -> !isFileSliceNeededForPendingMajorOrMinorCompaction(fs))
.iterator();
if (isFileGroupInPendingMajorOrMinorCompaction(fileGroup)) {
// We have already saved the last version of file-groups for pending compaction Id
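// If a pending compaction involves this file group: the slices it needs were filtered out above
// and are kept regardless, so they already count as one retained version.
// Decrement keepVersions so the loop below keeps one fewer older slice,
// and the total number of retained versions stays at keepVersions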
keepVersions--;
}
while (fileSliceIterator.hasNext() && keepVersions > 0) {
// Skip this most recent version
// Step over keepVersions slices (newest first) and clean only what follows,
// which retains the latest keepVersions versions
fileSliceIterator.next();
keepVersions--;
}
// Delete the remaining files
while (fileSliceIterator.hasNext()) {
FileSlice nextSlice = fileSliceIterator.next();
Option<HoodieBaseFile> dataFile = nextSlice.getBaseFile();
if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) {
// do not clean up a savepoint data file
continue;
}
deletePaths.addAll(getCleanFileInfoForSlice(nextSlice));
}
}
// if there are no valid file groups for the partition, mark it to be deleted
if (fileGroups.isEmpty()) {
toDeletePartition = true;
}
return Pair.of(toDeletePartition, deletePaths);
}
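The iterator-skipping logic can be modeled on one file group. The class KeepLatestVersionsSketch is hypothetical; it assumes that the slices needed by a pending compaction are those whose base instant is at or after the compaction's base instant, and identifies savepointed data by slice instant time instead of by file name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

// Hypothetical model of the per-file-group loop in getFilesToCleanKeepingLatestVersions.
class KeepLatestVersionsSketch {
  /**
   * @param sliceTimes            base instant times of one file group's slices, newest first
   * @param versionsRetained      value of hoodie.cleaner.fileversions.retained
   * @param savepointed           slice times protected by a savepoint
   * @param pendingCompactionBase base instant of a pending compaction on this group, or null
   */
  static List<String> slicesToClean(List<String> sliceTimes, int versionsRetained,
                                    Set<String> savepointed, String pendingCompactionBase) {
    int keepVersions = versionsRetained;
    // Assumed rule: slices at or after the pending compaction's base instant are needed and skipped
    Iterator<String> it = sliceTimes.stream()
        .filter(t -> pendingCompactionBase == null || t.compareTo(pendingCompactionBase) < 0)
        .iterator();
    if (pendingCompactionBase != null) {
      keepVersions--; // the skipped slices already count as one retained version
    }
    // Step over the newest keepVersions slices, i.e. keep them
    while (it.hasNext() && keepVersions > 0) {
      it.next();
      keepVersions--;
    }
    // Everything older is cleaned, except savepointed slices
    List<String> toClean = new ArrayList<>();
    while (it.hasNext()) {
      String t = it.next();
      if (!savepointed.contains(t)) {
        toClean.add(t);
      }
    }
    return toClean;
  }

  public static void main(String[] args) {
    List<String> slices = List.of("005", "004", "003", "002", "001");
    System.out.println(slicesToClean(slices, 3, Set.of(), null));  // [002, 001]
    System.out.println(slicesToClean(slices, 3, Set.of(), "005")); // [002, 001]
  }
}
```

In the second call slice 005 is held back for the pending compaction and counted as one kept version, so the result matches the first call.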
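This completes the analysis of clean planning: starting from the cleaning policy and the retention settings, we have computed the partitions to clean and the files to delete under them. The next step is executing the clean plan.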
Flink's Clean Execution Logic
HoodieFlinkCopyOnWriteTable::clean
The entry point is HoodieFlinkCopyOnWriteTable::clean:
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime) {
return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
Flink's clean is carried out by CleanActionExecutor. We start the analysis from its execute method.
CleanActionExecutor
The execute method cleans up old history schema files and then runs any pending clean plans:
@Override
public HoodieCleanMetadata execute() {
List<HoodieCleanMetadata> cleanMetadataList = new ArrayList<>();
// If there are inflight(failed) or previously requested clean operation, first perform them
List<HoodieInstant> pendingCleanInstants = table.getCleanTimeline()
.filterInflightsAndRequested().getInstants();
// Check for existing pending (requested or inflight) clean instants
if (pendingCleanInstants.size() > 0) {
// try to clean old history schema.
try {
// This class stores schemas, reads the schema history (used by schema evolution, under .hoodie/.schema) and cleans old schema files
FileBasedInternalSchemaStorageManager fss = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
// Clean old schema files
fss.cleanOldFiles(pendingCleanInstants.stream().map(is -> is.getTimestamp()).collect(Collectors.toList()));
} catch (Exception e) {
// we should not affect original clean logic. Swallow exception and log warn.
LOG.warn("failed to clean old history schema");
}
pendingCleanInstants.forEach(hoodieInstant -> {
// If the instant details are empty, delete the instant
if (table.getCleanTimeline().isEmpty(hoodieInstant)) {
table.getActiveTimeline().deleteEmptyInstantIfExists(hoodieInstant);
} else {
LOG.info("Finishing previously unfinished cleaner instant=" + hoodieInstant);
try {
// Run the pending clean
// and save its result in cleanMetadataList
cleanMetadataList.add(runPendingClean(table, hoodieInstant));
} catch (Exception e) {
LOG.warn("Failed to perform previous clean operation, instant: " + hoodieInstant, e);
}
}
// Reload the timeline
table.getMetaClient().reloadActiveTimeline();
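// If the metadata table is enabled, force a sync of the file system view
// The metadata table maintains the file listing, so listing files on large tables does not become a performance bottleneck
// Clean changes files, so the view must be synced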
if (config.isMetadataTableEnabled()) {
table.getHoodieView().sync();
}
});
}
// return the last clean metadata for now
// TODO (NA) : Clean only the earliest pending clean just like how we do for other table services
// This requires the CleanActionExecutor to be refactored as BaseCommitActionExecutor
return cleanMetadataList.size() > 0 ? cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
}
Executing a clean plan starts in runPendingClean:
HoodieCleanMetadata runPendingClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant) {
try {
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(table.getMetaClient(), cleanInstant);
return runClean(table, cleanInstant, cleanerPlan);
} catch (IOException e) {
throw new HoodieIOException(e.getMessage(), e);
}
}
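This method first loads the cleaner plan and then executes it in runClean. runClean transitions the instant to inflight, performs the clean, writes the clean metadata once it succeeds, and transitions the instant to completed: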
private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstant cleanInstant, HoodieCleanerPlan cleanerPlan) {
// The clean instant must be in the requested or inflight state
ValidationUtils.checkArgument(cleanInstant.getState().equals(HoodieInstant.State.REQUESTED)
|| cleanInstant.getState().equals(HoodieInstant.State.INFLIGHT));
HoodieInstant inflightInstant = null;
try {
final HoodieTimer timer = HoodieTimer.start();
// If requested, transition it to inflight
if (cleanInstant.isRequested()) {
inflightInstant = table.getActiveTimeline().transitionCleanRequestedToInflight(cleanInstant,
TimelineMetadataUtils.serializeCleanerPlan(cleanerPlan));
} else {
inflightInstant = cleanInstant;
}
// Execute the cleaner plan (analyzed below)
List<HoodieCleanStat> cleanStats = clean(context, cleanerPlan);
if (cleanStats.isEmpty()) {
return HoodieCleanMetadata.newBuilder().build();
}
table.getMetaClient().reloadActiveTimeline();
// Build the clean metadata: earliest retained instant, start time, duration, number of deleted files, per-partition metadata, etc.
HoodieCleanMetadata metadata = CleanerUtils.convertCleanMetadata(
inflightInstant.getTimestamp(),
Option.of(timer.endTimer()),
cleanStats
);
// Begin a transaction (acquire the lock)
if (!skipLocking) {
this.txnManager.beginTransaction(Option.of(inflightInstant), Option.empty());
}
// Write the clean metadata
writeTableMetadata(metadata, inflightInstant.getTimestamp());
// Transition the inflight instant to completed
table.getActiveTimeline().transitionCleanInflightToComplete(inflightInstant,
TimelineMetadataUtils.serializeCleanMetadata(metadata));
LOG.info("Marked clean started on " + inflightInstant.getTimestamp() + " as complete");
return metadata;
} catch (IOException e) {
throw new HoodieIOException("Failed to clean up after commit", e);
} finally {
// End the transaction (release the lock)
if (!skipLocking) {
this.txnManager.endTransaction(Option.of(inflightInstant));
}
}
}
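The instant's lifecycle in runClean is a small state machine. The sketch below is a hypothetical model (class CleanInstantSketch) that leaves out the timeline files, the metadata write and the transaction lock, and keeps only the order of the state transitions.

```java
// Hypothetical model of the instant state transitions performed by runClean.
class CleanInstantSketch {
  enum State { REQUESTED, INFLIGHT, COMPLETED }

  private State state;

  CleanInstantSketch(State initial) {
    // runClean only accepts requested or inflight instants
    if (initial == State.COMPLETED) {
      throw new IllegalArgumentException("clean instant must be requested or inflight");
    }
    this.state = initial;
  }

  State state() {
    return state;
  }

  // requested -> inflight, perform the clean, then inflight -> completed
  void run(Runnable performClean) {
    if (state == State.REQUESTED) {
      state = State.INFLIGHT;
    }
    performClean.run(); // if this throws, the instant stays INFLIGHT for a later execute() to pick up
    state = State.COMPLETED;
  }

  public static void main(String[] args) {
    CleanInstantSketch instant = new CleanInstantSketch(State.REQUESTED);
    instant.run(() -> System.out.println("deleting files..."));
    System.out.println(instant.state()); // COMPLETED
  }
}
```

Because execute collects both requested and inflight clean instants, an instant left inflight by a failure is finished on a later run rather than lost.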
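The plan itself is executed by the clean method. It reads from the cleaner plan each partition to clean and the files to clean in it, deletes those files, and deletes the partition itself when required. Finally, it assembles the statistics of the clean into HoodieCleanStat objects, one per partition: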
List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
int cleanerParallelism = Math.min(
cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);
context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName());
// From the cleaner plan, get the files to delete in each partition
Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
.flatMap(x -> x.getValue().stream().map(y -> new ImmutablePair<>(x.getKey(),
new CleanFileInfo(y.getFilePath(), y.getIsBootstrapBaseFile()))));
// Delete these files
Stream<ImmutablePair<String, PartitionCleanStat>> partitionCleanStats =
context.mapPartitionsToPairAndReduceByKey(filesToBeDeletedPerPartition,
iterator -> deleteFilesFunc(iterator, table), PartitionCleanStat::merge, cleanerParallelism);
Map<String, PartitionCleanStat> partitionCleanStatsMap = partitionCleanStats
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
// Get the partitions to delete, and delete them
List<String> partitionsToBeDeleted = cleanerPlan.getPartitionsToBeDeleted() != null ? cleanerPlan.getPartitionsToBeDeleted() : new ArrayList<>();
partitionsToBeDeleted.forEach(entry -> {
try {
deleteFileAndGetResult(table.getMetaClient().getFs(), table.getMetaClient().getBasePath() + "/" + entry);
} catch (IOException e) {
LOG.warn("Partition deletion failed " + entry);
}
});
// Return PartitionCleanStat for each partition passed.
// Build and return a HoodieCleanStat for each partition
return cleanerPlan.getFilePathsToBeDeletedPerPartition().keySet().stream().map(partitionPath -> {
PartitionCleanStat partitionCleanStat = partitionCleanStatsMap.containsKey(partitionPath)
? partitionCleanStatsMap.get(partitionPath)
: new PartitionCleanStat(partitionPath);
HoodieActionInstant actionInstant = cleanerPlan.getEarliestInstantToRetain();
return HoodieCleanStat.newBuilder().withPolicy(config.getCleanerPolicy()).withPartitionPath(partitionPath)
.withEarliestCommitRetained(Option.ofNullable(
actionInstant != null
? new HoodieInstant(HoodieInstant.State.valueOf(actionInstant.getState()),
actionInstant.getAction(), actionInstant.getTimestamp())
: null))
.withLastCompletedCommitTimestamp(cleanerPlan.getLastCompletedCommitTimestamp())
.withDeletePathPattern(partitionCleanStat.deletePathPatterns())
.withSuccessfulDeletes(partitionCleanStat.successDeleteFiles())
.withFailedDeletes(partitionCleanStat.failedDeleteFiles())
.withDeleteBootstrapBasePathPatterns(partitionCleanStat.getDeleteBootstrapBasePathPatterns())
.withSuccessfulDeleteBootstrapBaseFiles(partitionCleanStat.getSuccessfulDeleteBootstrapBaseFiles())
.withFailedDeleteBootstrapBaseFiles(partitionCleanStat.getFailedDeleteBootstrapBaseFiles())
.isPartitionDeleted(partitionsToBeDeleted.contains(partitionPath))
.build();
}).collect(Collectors.toList());
}
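The aggregation at the end of clean can be sketched with plain collections. In this hypothetical model (class PartitionStatsSketch), each file deletion yields a DeleteResult, the merge that mapPartitionsToPairAndReduceByKey performs with PartitionCleanStat::merge is replaced by a sequential TreeMap accumulation, and bootstrap base files are ignored.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of how clean() turns per-file delete results into one stat per partition.
class PartitionStatsSketch {
  static final class DeleteResult {
    final String partition;
    final String file;
    final boolean success;

    DeleteResult(String partition, String file, boolean success) {
      this.partition = partition;
      this.file = file;
      this.success = success;
    }
  }

  static final class PartitionStat {
    final List<String> successDeletes = new ArrayList<>();
    final List<String> failedDeletes = new ArrayList<>();
  }

  // Execution parallelism is bounded by the total number of files, not partitions
  static int parallelism(Map<String, List<String>> filesPerPartition, int configuredParallelism) {
    int totalFiles = filesPerPartition.values().stream().mapToInt(List::size).sum();
    return Math.min(totalFiles, configuredParallelism);
  }

  // Reduce by partition, the role PartitionCleanStat::merge plays in the real code
  static Map<String, PartitionStat> reduceByPartition(List<DeleteResult> results) {
    Map<String, PartitionStat> stats = new TreeMap<>();
    for (DeleteResult r : results) {
      PartitionStat stat = stats.computeIfAbsent(r.partition, p -> new PartitionStat());
      (r.success ? stat.successDeletes : stat.failedDeletes).add(r.file);
    }
    return stats;
  }

  // Every partition in the plan gets a stat, even one without delete results
  static Map<String, PartitionStat> statsForPlan(Set<String> planPartitions, Map<String, PartitionStat> reduced) {
    Map<String, PartitionStat> out = new TreeMap<>();
    for (String p : planPartitions) {
      out.put(p, reduced.getOrDefault(p, new PartitionStat()));
    }
    return out;
  }

  public static void main(String[] args) {
    List<DeleteResult> results = List.of(
        new DeleteResult("p1", "a.parquet", true),
        new DeleteResult("p1", "b.parquet", false));
    PartitionStat p1 = reduceByPartition(results).get("p1");
    System.out.println(p1.successDeletes + " " + p1.failedDeletes); // [a.parquet] [b.parquet]
  }
}
```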
This concludes the analysis of the Hudi table clean operation.
References
Cleaning | Apache Hudi