当前位置: 首页>后端>正文

Flink 从指定checkpoint启动任务命令 flink任务自动重启

  Flink的checkpoint机制可以保证exactly-once特性,让Flink流计算结果具备可信赖性,checkpoint会异步保存遇到检查点时当时数据流记录的位置以及算子的中间状态,在Flink任务发生故障时,可以从最新保存成功的checkpoint恢复任务。看似很健壮的机制,但为什么有时候运行好好的Flink任务,总是会自己重启呢?尽管有checkpoint保证exactly-once,但对于实时性要求高的业务场景,每次重启所消耗的时间都可能会导致业务不可用。也许你也经常遇到这样的情况,checkpoint又失败了?连续失败?task manager 内存爆了? 这些情况都很容易导致Flink任务down了,这时候需要思考下你所处的业务场景下,选用的Flink State Backends是否合理?

  我们先来深入思考下,我们设置的checkpoint将这些状态快照保存到哪?Flink支持哪些保存方式?这些保存方式适用什么场景?状态快照以什么形式保存的? 搞清楚了这些,我们就能知道针对我们现有的业务场景应该选择什么样的State Backends.

  Flink支持的 State Backends

  MemoryStateBackend 默认,小状态,本地调试使用

  FsStateBackend 大状态,长窗口,高可用场景

  RocksDBStateBackend 超大状态,长窗口,高可用场景,增量checkpoint

  MemoryStateBackend

  MemoryStateBackend将key-value的中间状态和窗口算子中间结果保存在hash table,在数据流遇到检查点屏障时,MemoryStateBackend将状态快照发送给JobManager(master),并存储在Java堆中。

  我们来看下MemoryStateBackend的四个构造函数


  可知,Flink默认使用异步保存(避免阻塞数据流计算)快照的MemoryStateBackend,且默认每个状态快照大小最大为5MB,状态快照大小不能超过JobManager和TaskManagers之间可发送msg的最大size,这个配置可以通过akka.framesize(默认10M)来设置(文献2),同时总的快照大小不能超过 JobManager 的内存。

  总的来说,MemoryStateBackend使用起来简单快捷,但使用场景有限,可以用来本地小数据开发调试用,一般在大数据生产环境下,MemoryStateBackend无法满足要求。

  FsStateBackend

  FsStateBackend需要配置存储的文件系统,可以是hdfs路径:hdfs://namenode:40010/flink/checkpoints,也可以是文件系统路径:file:///data/flink/checkpoints.

  FsStateBackend将流计算数据状态存储在TaskManager的内存中,在数据流遇到检查点屏障时,再将数据快照存储在配置好的文件系统中,在JobManager内存中会存储少量的checkpoint元数据。

  我们来看下FsStateBackend的三种构造函数

 

public FsStateBackend(String checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
  this(new Path(checkpointDataUri), asynchronousSnapshots);
  }
  public FsStateBackend(Path checkpointDataUri, boolean asynchronousSnapshots) throws IOException {
  this(checkpointDataUri.toUri(), asynchronousSnapshots);
  }
  //fileStateSizeThreshold默认1024
  public FsStateBackend(URI checkpointDataUri, int fileStateSizeThreshold, boolean asynchronousSnapshots) throws IOException {
  Preconditions.checkArgument(fileStateSizeThreshold = 0, The threshold for file state size must be zero or larger.);
  Preconditions.checkArgument(fileStateSizeThreshold = 1048576, The threshold for file state size cannot be larger than %s, new Object[]{1048576});
  this.fileStateThreshold = fileStateSizeThreshold;
  this.basePath = validateAndNormalizeUri(checkpointDataUri);
  this.asynchronousSnapshots = asynchronousSnapshots;
  }

 

  可知,FsStateBackend默认使用异步快照,对每个快照文件大小有要求:[0, 1048576],且状态快照大小不能超过 TaskManager 的内存。但状态快照最终保存在文件系统中,所以FsStateBackend适用于大数据的生产环境,可处理长窗口,大状态或大key-value状态任务。

 

  RocksDBStateBackend

 

  和FsStateBackend一样,RocksDBStateBackend也需要配置存储的文件系统,但RocksDBStateBackend并不受限于TaskManager 的内存,其使用嵌入式的本地数据库RocksDB将流计算数据状态存储在本地磁盘中,在数据流遇到检查点屏障时,再将整个RocksDB 数据库数据或者在超大状态流计算时只将增量的数据快照存储到配置的文件系统中,在JobManager内存中会存储少量的checkpoint元数据。

 

  我们来看下RocksDBStateBackend的构造函数

 

 

public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCheckpointing) throws IOException {
 
  this(new Path(checkpointDataUri).toUri(), enableIncrementalCheckpointing);
 
  }

 

  不同于MemoryStateBackend和FsStateBackend可以通过设置asynchronousSnapshots来设置是否异步快照,RocksDBStateBackend固定使用异步快照方式,但可以通过enableIncrementalCheckpointing来设置是否使用增量保存。

 

  增量保存是指每次checkpoint只保存对前一个已完成checkpoint的所有更改,而不是State Backend完整和独立的备份。对于每次checkpoint都保存完整的状态快照,增量保存可以显著减少每次checkpoint的时间,但也可能导致任务重启时恢复时间变长。增量保存checkpoint不会无限增长,旧检查点会自动合并和裁剪。增量保存不会默认开启,需要将enableIncrementalCheckpointing=true来开启。

 

  由于RocksDB的JNI API是基于 byte[] 的,对于key-value状态,单key和单value的大小最大为每个 2^31 字节。RocksDBStateBackend保存的状态快照大小只受限于磁盘可用空间的大小,所以非常适合用于处理大状态,长窗口,或大键值状态的有状态流计算任务,且RocksDBStateBackend目前唯一支持增量 checkpoint 的State Backends。增量 checkpoint 非常适用于超大状态的场景,但RocksDBStateBackend对状态快照相关操作需要序列化和反序列化,可能会影响程序吞吐量。


https://www.xamrdz.com/backend/3te1944666.html

相关文章: