当前位置: 首页>编程语言>正文

flinksql sink 并行度设置 flink并行度设置规则


FLink-10-Flink相关概念-并行度/task/subtask/taskslot

  • Flink相关概念-并行度/task/subtask/taskslot
  • 1.并行度
  • 2.task 与算子链(operator chain)相关概念:
  • 3.相关API介绍
  • 1.设置并行度的算子
  • 2.设置槽位共享组的算子
  • 3.主动隔离算子链的算子
  • 4.分区partition算子 - 数据分发策略


Flink相关概念-并行度/task/subtask/taskslot

flink Job运行在集群上,最小单位是TaskSlot槽位。

1.并行度

  • 一个job可以有一个或者多个task,这主要取决于代码中每个算子的并行度以及代码逻辑的设置,每个task可以单独设置并行度,并行度>1的话,此task就会产生多个并行实例(subTask)并行运行,subtask会被分配到不同的taskSlot中运行。
  • 基本概念解析:
  • 用户通过算子 api 所开发的代码,会被 flink 任务提交客户端解析成 jobGraph
  • 然后,jobGraph 提交到集群 JobManager,转化成 ExecutionGraph(并行化后的执行图)
  • 然后,ExecutionGraph 中的各个 task 会以多并行实例(subTask)部署到 taskmanager 上执行;
  • subTask 运行的位置是 taskmanager 所提供的槽位(task slot),槽位简单理解就是线程;
  • 同一个Task的多个并行示例,不能放在同一个taskslot上执行。
    同时,一个taskSlot,可以运行多个不同Task的各自一个并行实例。
  • taskSlot实际上就是线程运行资源的隔离,可以简单理解为线程。
  • job中只要有一个task的并行度 > 集群可用的总槽位数,这个job直接就提交失败。
  • 不同flink-job的不同task的并行实例不能放在同一个taskslot中。每个taskslot只能服务于一个flink-job。
  • taskslot的数量取决于服务器集群的配置,一般建议同cpu核数一致,一旦涉及到taskslot的flink配置定好了后,taskslot这个数值就是固定的。
  • 基于代码维度来进行分析,task与taskSlot之间的关系,如下图所示:
  • 当前job作业中task 的最大并行度 <= 集群中展示的taskSlot数量。

2.task 与算子链(operator chain)相关概念:

  • 上下游算子,能否 chain 在一起,放在一个 Task 中,取决于如下 3 个条件:
  • 1.上下游算子实例间是 oneToOne 数据传输(forward模式传输),如果上下游之间不能OneToOne,就不能组成算子链;
  • 2.上下游算子并行度相同;
  • 3.上下游算子属于相同的 slotSharingGroup(槽位共享组)
  • 关于slotSharingGroup(槽位共享组),槽位共享组是由开发人员在设计代码时决定的;
  • 代码API:op().slotSharingGroup(“share1”);后面的算子设置不同的槽位组,后面如果不设置槽位组的话,就跟前面的槽位组一样,这样前面有压力的算子对应的task就会分配到与其他槽位组不同的taskSlot上面了。
  • 属于同一个共享组的算子,允许共享槽位
  • 属于不同共享组的算子,决不允许共享操作。
  • 之所以设计槽位共享组的底层思考:因为在真实作业中,有两个task运行任务很重,基于刚刚的两个条件,这两个task完全可以分配到一个taskSlot中,但是分配到一起的话,此taskSlot会负担很重,造成运算缓慢等问题,所以就设计出来“槽位共享组”的概念,避免资源分配不均衡的情况出现。
  • 逻辑解释如下图:

  • 问题:map().map() 这两个算子,在默认情况下,是会被绑定成一个算子链,但是,实际开发中,这两个map算子运算量很大,不想让其绑定在一个算子链上,应该怎么做?然后甚至不想让他们放在相同的taskSlot槽位内运行,应该怎么做?
  • 针对问题1:可以设置不同的并行度,两者直接并行度不同,即可将其分为两个task;也可以在算子后面使用 disableChaining(),手动控制断开链。则两个不同的task不会形成算子链。
  • 针对问题2:可以设置不同的槽位共享组,即可实现两个task的同时也可实现不在一个taskSlot槽位上。
  • 每一个算子的逻辑都可以成为一个独立的task,但是这样处理,不同task之间有可能进行网络的传输来传递数据结果,造成了不必要的计算时间的损失;
  • 同时,多个算子的逻辑可以串行在一个Task中调用,这叫做OperatorChain ,即把多个算子的计算逻辑放在一个task中调用,这样就避免了刚刚说的网络传输的缺点。
  • task其实是一个类,subtask其实是一个对象,task的一个并行实例在flink中叫做subTask,task是由一个或者多个算子组成,subtask是task的一个运行实例,task里面的算子等相关逻辑被subtask来实际运行;
  • subtask可以理解为是真正干活的,task可以理解为组织干活的领导者,如果task的并行度为1,那么task的运行实例就是此subtask,两者一致,概念不一致。

flinksql sink 并行度设置 flink并行度设置规则,flinksql sink 并行度设置 flink并行度设置规则_flinksql sink 并行度设置,第1张

flinksql sink 并行度设置 flink并行度设置规则,flinksql sink 并行度设置 flink并行度设置规则_数据传输_02,第2张

  • 上图跟下图相比较,减少了网络传输,并减少了线程数据,flink尽量多的将不同算子放在同一个task中,但是涉及到keyby这种类型的算子需要按照维度分组的,则不能跟上游算子放在一起计算,因为会基于上游不同的节点产生的数据需要汇总按不同维度来进行计算。
  • 即不产生shuffle的算子,可以放在同一个task中,也就是算子绑定

3.相关API介绍

即使满足刚刚三个条件,也不一定就非要把上下游算子绑定成算子链; flink 提供了相关的 api,来让用户可以根据自己的需求,进行灵活的算子链合并或拆分;

1.设置并行度的算子

setParallelism 设置算子的并行度

2.设置槽位共享组的算子

slotSharingGroup 设置算子的槽位共享组

3.主动隔离算子链的算子

disableChaining 对算子禁用前后链合并
startNewChain 对算子开启新链(即禁用算子前链合并)

  • startNewChain()和disableChain()的区别:

4.分区partition算子 - 数据分发策略

  • 上游数据发往下游的时候,会出现数据传输,上下游在同一节点上的时候,直接传输即可,但是实际中,上下游传输数据的时候,会跨节点传输,如果跨节点传输的话,需要注意选择使用什么样的传输策略。
  • 分区算子:用于指定上游 task 的各并行 subtask 与下游 task 的 subtask 之间如何传输数据;
  • Flink 中,对于上下游 subTask 之间的数据传输控制,由 ChannelSelector 策略来控制,而且 Flink 内针 对各种场景,开发了众多 ChannelSelector 的具体实现:
  • 设置数据传输策略时,不需要显式指定 partitioner,而是调用封装好的算子即可:
  • 默认情况下,flink 会优先使用 REBALANCE 分发策略
  • global模式
    上游多个,下游只有一个
  • recale模式
    下游并行度是上游并行度的倍数或者反之,形成了固定的链,然后轮询写入
  • shuffle模式
    如果算子与算子之间使用shuffle机制进行数据传输,那就打破了算子链的第一条oneToOne模式,那就不会形成一个算子链。



https://www.xamrdz.com/lan/55k1935554.html

相关文章: