当前位置: 首页>编程语言>正文

请检查spark配置 Java gateway process exited before sending its port number 简述spark的检查点

一个Streaming应用程序要求7天24小时不间断运行,因此必须适应各种导致应用程序失败的场景。Spark Streaming的检查点具有容错机制,有足够的信息能够支持故障恢复。支持两种数据类型的检查点:元数据检查点和数据检查点。

(1)元数据检查点,在类似HDFS的容错存储上,保存Streaming计算信息。这种检查点用来恢复运行Streaming应用程序失败的Driver进程。

(2)数据检查点,在进行跨越多个批次合并数据的有状态操作时尤其重要。在这种转换操作情况下,依赖前一批次的RDD生成新的RDD,随着时间不断增加,RDD依赖链的长度也在增加,为了避免这种无限增加恢复时间的情况,通过周期检查将转换RDD的中间状态进行可靠存储,借以切断无限增加的依赖。使用有状态的转换,如果updateStateByKey或者reduceByKeyAndWindow在应用程序中使用,那么需要提供检查点路径,对RDD进行周期性检查。

元数据检查点主要用来恢复失败的Driver进程,而数据检查点主要用来恢复有状态的转换操作。无论是Driver失败,还是Worker失败,这种检查点机制都能快速恢复。许多Spark Streaming都是使用检查点方式。但是简单的Streaming应用程序,不包含状态转换操作不能运行检查点;从Driver程序故障中恢复可能会造成一些收到没有处理的数据丢失。

为了让一个Spark Streaming程序能够被恢复,需要启用检查点,必须设置一个容错的、可靠的文件系统(如HDFS、S3等)路径保存检查点信息,同时设置时间间隔。

streamingContext.checkpoint(checkpointDirectory)//checkpointDirectory是一个文件系统路径(最好是一个可靠的比如hdfs://....)

dstream.checkpoint(checkpointInterval)//设置时间间隔

当程序第一次启动时,创建一个新的StreamingContext,接着创建所有的数据流,然后再调用start()方法。

//定义一个创建并设置StreamingContext的函数

def functionToCreateContext(): StreamingContext = {

val ssc = new StreamingContext(...)               //创建StreamingContext实例

val DsSream = ssc.socketTextStream(...)      //创建DStream

...

ssc.checkpoint(checkpointDirectory)           //设置检查点机制

ssc

}

//从检查点数据重建或者新建一个StreamingContext

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreate-Context_)

//在context需要做额外的设置完成,不考虑是否被启动或重新启动

context. ...

//启动context

context.start()

context.awaitTermination()

通过使用getOrCreate创建StreamingContext。

当程序因为异常重启时,如果检查点路径存在,则context将从检查点数据中重建。如果检查点目录不存在(首次运行),将会调用functionToCreateContext函数新建context函数新建context,并设置DStream。

但是,Streaming需要保存中间数据到容错存储系统,这个策略会引入存储开销,进而可能会导致相应的批处理时间变长,因此,检查点的时间间隔需要精心设置。采取小批次时,每批次检查点可以显著减少操作的吞吐量;相反,检查点太少可能会导致每批次任务大小的增加。对于RDD检查点的有状态转换操作,其检查点间隔默认设置成DStream的滑动间隔的5~10倍。

故障恢复可以使用Spark的Standalone模式自动完成,该模式允许任何Spark应用程序的Driver在集群内启动,并在失败时重启。而对于YARN或Mesos这样的部署环境,则必须通过其他的机制重启Driver。


https://www.xamrdz.com/lan/5td1925731.html

相关文章: