Preface
Most of us, when first learning Flink SQL, have run into an error message like this:
org.apache.flink.table.api.TableException:
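X[operator name] doesn't support consuming update and delete changes which is produced by node Y[operator name]
Why can't some downstream operators accept the UPDATE and DELETE messages sent by upstream operators? This article gives a brief walkthrough based on Flink 1.13.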
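Recap: ChangelogMode
I previously wrote a brief tutorial on custom Flink SQL Connectors, which mentioned that when defining a DynamicTableSink (or a ScanTableSource), you must override the getChangelogMode() method to tell the Planner which kinds of data changes the Connector can accept or produce. Changes are tagged with one of four RowKinds: INSERT (+I), UPDATE_BEFORE (-U), UPDATE_AFTER (+U), and DELETE (-D):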
    /** Insertion operation. */
    INSERT("+I", (byte) 0),

    /**
     * Update operation with the previous content of the updated row.
     *
     * <p>This kind SHOULD occur together with {@link #UPDATE_AFTER} for modelling an update that
     * needs to retract the previous row first. It is useful in cases of a non-idempotent update,
     * i.e., an update of a row that is not uniquely identifiable by a key.
     */
    UPDATE_BEFORE("-U", (byte) 1),

    /**
     * Update operation with new content of the updated row.
     *
     * <p>This kind CAN occur together with {@link #UPDATE_BEFORE} for modelling an update that
     * needs to retract the previous row first. OR it describes an idempotent update, i.e., an
     * update of a row that is uniquely identifiable by a key.
     */
    UPDATE_AFTER("+U", (byte) 2),

    /** Deletion operation. */
    DELETE("-D", (byte) 3);
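However, ChangelogMode applies only to Connectors. How does the Planner internally define which kinds of changes the intermediate operators (and, in fact, Sources and Sinks too) produce and accept? Let's look at the two relevant RelTraits.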
ModifyKindSet / UpdateKind Trait
Recall the meaning of RelTrait and RelTraitDef covered earlier:
A set of physical properties & their definitions carried by a relational expression.
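From the perspective of a RelNode, ChangelogMode can indeed be treated as a physical property attached to it. The physical plan layer of the Blink Planner uses two RelTraits to carry change semantics. The first is ModifyKindSetTrait, which represents a set made up of INSERT (I), UPDATE (U), and DELETE (D). Part of its code is shown below and is fairly easy to follow: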
object ModifyKindSetTrait {
  /**
   * An empty [[ModifyKindSetTrait]] which doesn't contain any [[ModifyKind]].
   */
  val EMPTY = new ModifyKindSetTrait(ModifyKindSet.newBuilder().build())

  /**
   * Insert-only [[ModifyKindSetTrait]].
   */
  val INSERT_ONLY = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)

  /**
   * A modify [[ModifyKindSetTrait]] that contains all change operations.
   */
  val ALL_CHANGES = new ModifyKindSetTrait(ModifyKindSet.ALL_CHANGES)

  /**
   * Creates an instance of [[ModifyKindSetTrait]] from the given [[ChangelogMode]].
   */
  def fromChangelogMode(changelogMode: ChangelogMode): ModifyKindSetTrait = {
    val builder = ModifyKindSet.newBuilder
    changelogMode.getContainedKinds.foreach {
      case RowKind.INSERT => builder.addContainedKind(ModifyKind.INSERT)
      case RowKind.DELETE => builder.addContainedKind(ModifyKind.DELETE)
      case _ => builder.addContainedKind(ModifyKind.UPDATE) // otherwise updates
    }
    new ModifyKindSetTrait(builder.build)
  }
}
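To make the mapping in fromChangelogMode() concrete, here is a minimal, self-contained Java sketch of the same folding of four row kinds into three modify kinds. The enums and the class name ModifyKindMappingSketch are stand-ins invented for illustration; they are not Flink's actual classes.

```java
import java.util.EnumSet;
import java.util.Set;

class ModifyKindMappingSketch {
    // Hypothetical stand-ins for Flink's RowKind and the planner's ModifyKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }
    enum ModifyKind { INSERT, UPDATE, DELETE }

    // Folds row kinds into modify kinds: both -U and +U collapse into UPDATE.
    static EnumSet<ModifyKind> fromRowKinds(Set<RowKind> rowKinds) {
        EnumSet<ModifyKind> result = EnumSet.noneOf(ModifyKind.class);
        for (RowKind kind : rowKinds) {
            switch (kind) {
                case INSERT:
                    result.add(ModifyKind.INSERT);
                    break;
                case DELETE:
                    result.add(ModifyKind.DELETE);
                    break;
                default: // UPDATE_BEFORE or UPDATE_AFTER
                    result.add(ModifyKind.UPDATE);
                    break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // An upsert-style changelog (+I, +U, -D) covers all three modify kinds.
        System.out.println(fromRowKinds(
                EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE)));
    }
}
```

Note that the distinction between -U and +U is lost at this level, which is why a second trait is needed to carry it.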
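The second is UpdateKindTrait, which represents UPDATE_BEFORE (-U) and UPDATE_AFTER (+U); it is not hard either. Note that besides being converted from a ChangelogMode, it can also be derived from a ModifyKindSet: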
object UpdateKindTrait {
  /**
   * An [[UpdateKindTrait]] that describes the node doesn't provide any kind of updates
   * as a provided trait, or requires nothing about kind of updates as a required trait.
   *
   * <p>It also indicates that the [[ModifyKindSetTrait]] of current node doesn't contain
   * [[ModifyKind#UPDATE]] operation.
   */
  val NONE = new UpdateKindTrait(UpdateKind.NONE)

  /**
   * An [[UpdateKindTrait]] that describes the node produces update changes just as a
   * single row of [[org.apache.flink.types.RowKind#UPDATE_AFTER]]
   */
  val ONLY_UPDATE_AFTER = new UpdateKindTrait(UpdateKind.ONLY_UPDATE_AFTER)

  /**
   * An [[UpdateKindTrait]] that describes the node produces update changes consists of
   * a row of [[org.apache.flink.types.RowKind#UPDATE_BEFORE]] and
   * [[org.apache.flink.types.RowKind#UPDATE_AFTER]].
   */
  val BEFORE_AND_AFTER = new UpdateKindTrait(UpdateKind.BEFORE_AND_AFTER)

  /**
   * Returns ONLY_UPDATE_AFTER [[UpdateKindTrait]] if there is update changes.
   * Otherwise, returns NONE [[UpdateKindTrait]].
   */
  def onlyAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {
    val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {
      UpdateKind.ONLY_UPDATE_AFTER
    } else {
      UpdateKind.NONE
    }
    new UpdateKindTrait(updateKind)
  }

  /**
   * Returns BEFORE_AND_AFTER [[UpdateKindTrait]] if there is update changes.
   * Otherwise, returns NONE [[UpdateKindTrait]].
   */
  def beforeAfterOrNone(modifyKindSet: ModifyKindSet): UpdateKindTrait = {
    val updateKind = if (modifyKindSet.contains(ModifyKind.UPDATE)) {
      UpdateKind.BEFORE_AND_AFTER
    } else {
      UpdateKind.NONE
    }
    new UpdateKindTrait(updateKind)
  }

  /**
   * Creates an instance of [[UpdateKindTrait]] from the given [[ChangelogMode]].
   */
  def fromChangelogMode(changelogMode: ChangelogMode): UpdateKindTrait = {
    val hasUpdateBefore = changelogMode.contains(RowKind.UPDATE_BEFORE)
    val hasUpdateAfter = changelogMode.contains(RowKind.UPDATE_AFTER)
    (hasUpdateBefore, hasUpdateAfter) match {
      case (true, true) => BEFORE_AND_AFTER
      case (false, true) => ONLY_UPDATE_AFTER
      case (true, false) =>
        throw new IllegalArgumentException("Unsupported changelog mode: " +
          ChangelogPlanUtils.stringifyChangelogMode(Some(changelogMode)))
      case (false, false) => NONE
    }
  }
}
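The four-way match in fromChangelogMode() can be restated as a small, self-contained Java sketch. The enums and the class name UpdateKindSketch are hypothetical stand-ins rather than Flink's own types, and the exception text is simplified.

```java
import java.util.EnumSet;
import java.util.Set;

class UpdateKindSketch {
    // Hypothetical stand-ins for Flink's RowKind and the planner's UpdateKind.
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }
    enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    // Derives the update kind from the presence of -U and +U in a changelog.
    static UpdateKind fromRowKinds(Set<RowKind> rowKinds) {
        boolean hasBefore = rowKinds.contains(RowKind.UPDATE_BEFORE);
        boolean hasAfter = rowKinds.contains(RowKind.UPDATE_AFTER);
        if (hasBefore && hasAfter) {
            return UpdateKind.BEFORE_AND_AFTER;
        }
        if (hasAfter) {
            return UpdateKind.ONLY_UPDATE_AFTER;
        }
        if (hasBefore) {
            // -U without +U is not a meaningful changelog.
            throw new IllegalArgumentException("Unsupported changelog mode: " + rowKinds);
        }
        return UpdateKind.NONE;
    }

    public static void main(String[] args) {
        System.out.println(fromRowKinds(EnumSet.allOf(RowKind.class)));
        System.out.println(fromRowKinds(EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER)));
        System.out.println(fromRowKinds(EnumSet.of(RowKind.INSERT)));
    }
}
```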
RelTrait Compatibility
To fill in a point that my earlier introductory Calcite notes left out, here is the core method of RelTrait:
boolean satisfies(RelTrait trait);
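It determines whether this RelTrait is compatible with another RelTrait, that is, whether T1 satisfies the constraints of T2. Clearly, if T1 is the same as T2, or T1 is stricter than T2, the method returns true; otherwise it returns false. For example, for RelCollation, (ORDER BY a, b) satisfies (ORDER BY a) holds, but the reverse does not.
ModifyKindSetTrait#satisfies() is defined as follows. As the comment makes clear, T1 satisfies T2 when T1's set of modify kinds is a subset of T2's: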
override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
  case other: ModifyKindSetTrait =>
    // it's satisfied when modify kinds are included in the required set,
    // e.g. [I,U] satisfy [I,U,D]
    //      [I,U,D] not satisfy [I,D]
    this.modifyKindSet.getContainedKinds.forall(other.modifyKindSet.contains)
  case _ => false
}
UpdateKindTrait#satisfies(), on the other hand, requires the two to be exactly the same:
override def satisfies(relTrait: RelTrait): Boolean = relTrait match {
  case other: UpdateKindTrait =>
    // should totally match
    other.updateKind == this.updateKind
  case _ => false
}
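The two compatibility rules can be contrasted side by side in a minimal Java sketch: a subset test for modify kinds and strict equality for update kinds. The enums, method names, and the class TraitSatisfiesSketch are invented for illustration and only mirror the logic of the two satisfies() methods.

```java
import java.util.EnumSet;
import java.util.Set;

class TraitSatisfiesSketch {
    // Hypothetical stand-ins for the planner's ModifyKind and UpdateKind.
    enum ModifyKind { INSERT, UPDATE, DELETE }
    enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    // Subset rule: every provided kind must be allowed by the required set.
    static boolean modifyKindsSatisfy(Set<ModifyKind> provided, Set<ModifyKind> required) {
        return required.containsAll(provided);
    }

    // Exact-match rule: the provided update kind must equal the required one.
    static boolean updateKindSatisfies(UpdateKind provided, UpdateKind required) {
        return provided == required;
    }

    public static void main(String[] args) {
        Set<ModifyKind> iu = EnumSet.of(ModifyKind.INSERT, ModifyKind.UPDATE);
        Set<ModifyKind> id = EnumSet.of(ModifyKind.INSERT, ModifyKind.DELETE);
        Set<ModifyKind> iud = EnumSet.allOf(ModifyKind.class);

        System.out.println(modifyKindsSatisfy(iu, iud)); // [I,U] against [I,U,D]
        System.out.println(modifyKindsSatisfy(iud, id)); // [I,U,D] against [I,D]
        System.out.println(updateKindSatisfies(
                UpdateKind.ONLY_UPDATE_AFTER, UpdateKind.BEFORE_AND_AFTER));
    }
}
```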
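With that, we can move on to the relevant logic in the Blink Planner.
ChangelogMode Inference in the Physical Planning Phase
The Blink Planner uses an optimization program named FlinkChangelogModeInferenceProgram to infer ChangelogMode information for each StreamPhysicalRel, and to check the upstream/downstream compatibility of the resulting ModifyKindSetTrait and UpdateKindTrait. The main logic has two steps: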
// step1: satisfy ModifyKindSet trait
val physicalRoot = root.asInstanceOf[StreamPhysicalRel]
val rootWithModifyKindSet = SATISFY_MODIFY_KIND_SET_TRAIT_VISITOR.visit(
  physicalRoot,
  // we do not propagate the ModifyKindSet requirement and requester among blocks
  // set default ModifyKindSet requirement and requester for root
  ModifyKindSetTrait.ALL_CHANGES,
  "ROOT")

// step2: satisfy UpdateKind trait
val rootModifyKindSet = getModifyKindSet(rootWithModifyKindSet)
// use the required UpdateKindTrait from parent blocks
val requiredUpdateKindTraits = if (rootModifyKindSet.contains(ModifyKind.UPDATE)) {
  if (context.isUpdateBeforeRequired) {
    Seq(UpdateKindTrait.BEFORE_AND_AFTER)
  } else {
    // update_before is not required, and input contains updates
    // try ONLY_UPDATE_AFTER first, and then BEFORE_AND_AFTER
    Seq(UpdateKindTrait.ONLY_UPDATE_AFTER, UpdateKindTrait.BEFORE_AND_AFTER)
  }
} else {
  // there is no updates
  Seq(UpdateKindTrait.NONE)
}
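The choice of candidate update kinds in step 2 boils down to a small decision table. The following Java sketch restates it under the assumption that the two inputs are plain booleans; the class RequiredUpdateKindSketch and its method are hypothetical and do not exist in Flink.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class RequiredUpdateKindSketch {
    // Hypothetical stand-in for the planner's UpdateKind.
    enum UpdateKind { NONE, ONLY_UPDATE_AFTER, BEFORE_AND_AFTER }

    // Returns the update kinds to try for the root, in order of preference.
    static List<UpdateKind> candidates(boolean rootContainsUpdate, boolean updateBeforeRequired) {
        if (!rootContainsUpdate) {
            // No updates at all, so nothing can be required about them.
            return Collections.singletonList(UpdateKind.NONE);
        }
        if (updateBeforeRequired) {
            return Collections.singletonList(UpdateKind.BEFORE_AND_AFTER);
        }
        // Prefer the cheaper +U-only form, fall back to -U/+U pairs.
        return Arrays.asList(UpdateKind.ONLY_UPDATE_AFTER, UpdateKind.BEFORE_AND_AFTER);
    }

    public static void main(String[] args) {
        System.out.println(candidates(true, false));
        System.out.println(candidates(true, true));
        System.out.println(candidates(false, false));
    }
}
```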
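As shown, the physical plan tree is traversed and transformed by two specially defined visitors (see the Visitor pattern). Taking SatisfyModifyKindSetTraitVisitor as an example, the skeleton of its visit() method is as follows, which also shows off the power of Scala pattern matching.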
def visit(
    rel: StreamPhysicalRel,
    requiredTrait: ModifyKindSetTrait,
    requester: String): StreamPhysicalRel = rel match {
  case sink: StreamPhysicalSink =>
    val name = s"Table sink '${sink.tableIdentifier.asSummaryString()}'"
    val queryModifyKindSet = deriveQueryDefaultChangelogMode(sink.getInput, name)
    val sinkRequiredTrait = ModifyKindSetTrait.fromChangelogMode(
      sink.tableSink.getChangelogMode(queryModifyKindSet))
    val children = visitChildren(sink, sinkRequiredTrait, name)
    val sinkTrait = sink.getTraitSet.plus(ModifyKindSetTrait.EMPTY)
    // ignore required trait from context, because sink is the true root
    sink.copy(sinkTrait, children).asInstanceOf[StreamPhysicalRel]

  case sink: StreamPhysicalLegacySink[_] => // ......

  case deduplicate: StreamPhysicalDeduplicate =>
    // deduplicate only support insert only as input
    val children = visitChildren(deduplicate, ModifyKindSetTrait.INSERT_ONLY)
    val providedTrait = if (!deduplicate.keepLastRow && !deduplicate.isRowtime) {
      // only proctime first row deduplicate does not produce UPDATE changes
      ModifyKindSetTrait.INSERT_ONLY
    } else {
      // other deduplicate produce update changes
      ModifyKindSetTrait.ALL_CHANGES
    }
    createNewNode(deduplicate, children, providedTrait, requiredTrait, requester)

  case agg: StreamPhysicalGroupAggregate =>
    // agg support all changes in input
    val children = visitChildren(agg, ModifyKindSetTrait.ALL_CHANGES)
    val inputModifyKindSet = getModifyKindSet(children.head)
    val builder = ModifyKindSet.newBuilder()
      .addContainedKind(ModifyKind.INSERT)
      .addContainedKind(ModifyKind.UPDATE)
    if (inputModifyKindSet.contains(ModifyKind.UPDATE) ||
        inputModifyKindSet.contains(ModifyKind.DELETE)) {
      builder.addContainedKind(ModifyKind.DELETE)
    }
    val providedTrait = new ModifyKindSetTrait(builder.build())
    createNewNode(agg, children, providedTrait, requiredTrait, requester)

  case tagg: StreamPhysicalGroupTableAggregateBase => // ......

  case agg: StreamPhysicalPythonGroupAggregate => // ......

  case window: StreamPhysicalGroupWindowAggregateBase =>
    // WindowAggregate and WindowTableAggregate support insert-only in input
    val children = visitChildren(window, ModifyKindSetTrait.INSERT_ONLY)
    val builder = ModifyKindSet.newBuilder()
      .addContainedKind(ModifyKind.INSERT)
    if (window.emitStrategy.produceUpdates) {
      builder.addContainedKind(ModifyKind.UPDATE)
    }
    val providedTrait = new ModifyKindSetTrait(builder.build())
    createNewNode(window, children, providedTrait, requiredTrait, requester)

  case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank =>
    // WindowAggregate and WindowRank support insert-only in input
    val children = visitChildren(rel, ModifyKindSetTrait.INSERT_ONLY)
    val providedTrait = ModifyKindSetTrait.INSERT_ONLY
    createNewNode(rel, children, providedTrait, requiredTrait, requester)

  case limit: StreamPhysicalLimit => // ......

  case _: StreamPhysicalRank | _: StreamPhysicalSortLimit => // ......

  case sort: StreamPhysicalSort => // ......

  case cep: StreamPhysicalMatch => // ......

  case _: StreamPhysicalTemporalSort | _: StreamPhysicalIntervalJoin |
       _: StreamPhysicalOverAggregate | _: StreamPhysicalPythonOverAggregate => // ......

  case join: StreamPhysicalJoin => // ......

  case windowJoin: StreamPhysicalWindowJoin => // ......

  case temporalJoin: StreamPhysicalTemporalJoin => // ......

  case _: StreamPhysicalCalcBase | _: StreamPhysicalCorrelateBase |
       _: StreamPhysicalLookupJoin | _: StreamPhysicalExchange |
       _: StreamPhysicalExpand | _: StreamPhysicalMiniBatchAssigner |
       _: StreamPhysicalWatermarkAssigner | _: StreamPhysicalWindowTableFunction =>
    // transparent forward requiredTrait to children
    val children = visitChildren(rel, requiredTrait, requester)
    val childrenTrait = children.head.getTraitSet.getTrait(ModifyKindSetTraitDef.INSTANCE)
    // forward children mode
    createNewNode(rel, children, childrenTrait, requiredTrait, requester)

  case union: StreamPhysicalUnion => // ......

  case normalize: StreamPhysicalChangelogNormalize => // ......

  case ts: StreamPhysicalTableSourceScan =>
    // ScanTableSource supports produces updates and deletions
    val providedTrait = ModifyKindSetTrait.fromChangelogMode(ts.tableSource.getChangelogMode)
    createNewNode(ts, List(), providedTrait, requiredTrait, requester)

  case _: StreamPhysicalDataStreamScan | _: StreamPhysicalLegacyTableSourceScan |
       _: StreamPhysicalValues => // ......

  case scan: StreamPhysicalIntermediateTableScan => // ......

  case _ =>
    throw new UnsupportedOperationException(
      s"Unsupported visit for ${rel.getClass.getSimpleName}")
}
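As shown, this visitor starts inferring the ChangelogMode with the Sink as the root. The requiredTrait parameter is the ModifyKindSetTrait that a parent node requires its child to satisfy, and it is propagated all the way down to the Source. Nodes that do not affect change semantics (such as Calc and Exchange) simply forward requiredTrait to their children.
If a parent and child turn out to have incompatible traits along the way, the error message from the beginning of this article is thrown; see the createNewNode() method: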
private def createNewNode(
    node: StreamPhysicalRel,
    children: List[StreamPhysicalRel],
    providedTrait: ModifyKindSetTrait,
    requiredTrait: ModifyKindSetTrait,
    requestedOwner: String): StreamPhysicalRel = {
  if (!providedTrait.satisfies(requiredTrait)) {
    val diff = providedTrait.modifyKindSet.minus(requiredTrait.modifyKindSet)
    val diffString = diff.getContainedKinds
      .toList.sorted // for deterministic error message
      .map(_.toString.toLowerCase)
      .mkString(" and ")
    // creates a new node based on the new children, to have a more correct node description
    // e.g. description of GroupAggregate is based on the ModifyKindSetTrait of children
    val tempNode = node.copy(node.getTraitSet, children).asInstanceOf[StreamPhysicalRel]
    val nodeString = tempNode.getRelDetailedDescription
    throw new TableException(
      s"$requestedOwner doesn't support consuming $diffString changes " +
      s"which is produced by node $nodeString")
  }
  val newTraitSet = node.getTraitSet.plus(providedTrait)
  node.copy(newTraitSet, children).asInstanceOf[StreamPhysicalRel]
}
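To see where the wording "update and delete changes" comes from, here is a minimal Java sketch of the set difference and message formatting performed above. It returns the message instead of throwing, and the class ChangelogErrorSketch, its method, and the enum are hypothetical simplifications of the planner code.

```java
import java.util.EnumSet;
import java.util.Locale;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

class ChangelogErrorSketch {
    // Hypothetical stand-in for the planner's ModifyKind.
    enum ModifyKind { INSERT, UPDATE, DELETE }

    // Returns the error text if `provided` is not a subset of `required`.
    static Optional<String> describeViolation(
            String requester,
            String nodeDescription,
            Set<ModifyKind> provided,
            Set<ModifyKind> required) {
        if (required.containsAll(provided)) {
            return Optional.empty(); // traits are compatible
        }
        // diff = provided minus required; EnumSet iterates in declaration order,
        // which keeps the message deterministic.
        EnumSet<ModifyKind> diff = EnumSet.noneOf(ModifyKind.class);
        diff.addAll(provided);
        diff.removeAll(required);
        String diffString = diff.stream()
                .map(kind -> kind.name().toLowerCase(Locale.ROOT))
                .collect(Collectors.joining(" and "));
        return Optional.of(requester + " doesn't support consuming " + diffString
                + " changes which is produced by node " + nodeDescription);
    }

    public static void main(String[] args) {
        Optional<String> error = describeViolation(
                "StreamPhysicalGroupWindowAggregate",
                "Deduplicate(keep=[LastRow], key=[suborderid], order=[PROCTIME])",
                EnumSet.allOf(ModifyKind.class),
                EnumSet.of(ModifyKind.INSERT));
        System.out.println(error.orElse("compatible"));
    }
}
```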
To make this easier to understand, let's use a SQL statement that combines deduplication with window aggregation:
SELECT userId, COUNT(DISTINCT orderId)
FROM (
  SELECT * FROM (
    SELECT *, ROW_NUMBER() OVER(PARTITION BY suborderid ORDER BY procTime [ASC | DESC]) AS rn
    FROM rtdw_ods.kafka_order_done_log /*+ OPTIONS('scan.startup.mode'='latest-offset') */
  ) WHERE rn = 1
) GROUP BY userId, TUMBLE(procTime, INTERVAL '5' SECOND);
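Experimenting shows that if the deduplication keeps the first row (that is, ORDER BY procTime ASC), the statement runs normally. If it keeps the last row instead (that is, ORDER BY procTime DESC), the following exception is thrown: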
Exception in thread "main" org.apache.flink.table.api.TableException: StreamPhysicalGroupWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[LastRow], key=[suborderid], order=[PROCTIME])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:389)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:166)
......
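Referring back to the code, GroupWindowAggregate accepts only INSERT semantics from its child, whereas Deduplicate produces retraction semantics when it keeps the last row, so the query cannot run.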
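The difference between the two deduplication variants can be reproduced with a small Java sketch that encodes only the Deduplicate rule and the insert-only requirement of the group window aggregate. All names here (DedupWindowSketch and its methods) are hypothetical, and the model ignores every other operator in the plan.

```java
import java.util.EnumSet;
import java.util.Set;

class DedupWindowSketch {
    // Hypothetical stand-in for the planner's ModifyKind.
    enum ModifyKind { INSERT, UPDATE, DELETE }

    // Mirrors the Deduplicate branch: only proctime first-row dedup is insert-only.
    static EnumSet<ModifyKind> deduplicateProvides(boolean keepLastRow, boolean isRowtime) {
        if (!keepLastRow && !isRowtime) {
            return EnumSet.of(ModifyKind.INSERT);
        }
        return EnumSet.allOf(ModifyKind.class);
    }

    // Mirrors the group window aggregate branch: the child must be insert-only.
    static boolean groupWindowAggregateAccepts(Set<ModifyKind> childProvides) {
        return EnumSet.of(ModifyKind.INSERT).containsAll(childProvides);
    }

    public static void main(String[] args) {
        // ORDER BY procTime ASC keeps the first row.
        System.out.println(groupWindowAggregateAccepts(deduplicateProvides(false, false)));
        // ORDER BY procTime DESC keeps the last row.
        System.out.println(groupWindowAggregateAccepts(deduplicateProvides(true, false)));
    }
}
```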
SatisfyUpdateKindTraitVisitor works in a similar way, so I won't go over it again.
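A Quick Hack
A look at the runtime-layer GroupWindowAggregate code shows that it is in fact able to handle retract-stream input. We only need three simple changes to FlinkChangelogModeInferenceProgram to get there:
- SatisfyModifyKindSetTraitVisitor#visit() method: in the StreamPhysicalGroupWindowAggregateBase branch, change the requiredChildrenTrait argument of visitChildren from ModifyKindSetTrait.INSERT_ONLY to ModifyKindSetTrait.ALL_CHANGES, meaning that it accepts all change types.
- SatisfyUpdateKindTraitVisitor#visit() method:
  - Append | _: StreamPhysicalGroupWindowAggregateBase to the end of the condition of the 3rd branch, meaning that it accepts UpdateKindTrait.BEFORE_AND_AFTER (for retract streams) and UpdateKindTrait.NONE (for append-only streams).
  - Correspondingly, remove _: StreamPhysicalGroupWindowAggregate | _: StreamPhysicalGroupWindowTableAggregate from the condition of the 4th branch.
After rebuilding the flink-table-planner-blink module and resubmitting the keep-last-row deduplication + window aggregation SQL from the previous section, the job runs normally and the results are correct.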
The End
Flink Forward Asia 2021 Online takes place over the next two days.
Good night.