当前位置: 首页>后端>正文

Pyspark3-API介绍和实例

PySpark是一种分布式计算框架,它使用Python代码编写。而PySpark3是PySpark的3.0版本。这篇文章将介绍PySpark3的API并提供实例。

1.SparkSession:SparkSession是PySpark3的入口点,用于与Spark集群交互。它可以配置Spark应用程序和数据源,并创建DataFrame。以下是用于创建SparkSession的代码:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("example-app") \
    .master("local[*]") \
    .getOrCreate()

2.DataFrame:DataFrame是一种分布式数据集,类似于表格或关系型数据库的概念。DataFrame提供了广泛的操作,包括集成几乎所有SQL操作和MLlib算法。以下是读取CSV文件并创建DataFrame的代码:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("example-app") \
    .master("local[*]") \
    .getOrCreate()

df = spark.read.csv("path/to/file.csv", header=True, inferSchema=True)

3.RDD:RDD是一种弹性分布式数据集,是PySpark的基本数据结构。RDD提供了并行化和容错处理功能,这使得数据处理非常快速。以下是创建RDD的代码:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("example-app") \
    .master("local[*]") \
    .getOrCreate()

rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])

RDD操作分为Transformations和Actions两类。Transformations是RDD的转换操作,通常会返回新的RDD。Actions是RDD的计算操作,通常会触发计算并返回计算结果。下面将详细介绍Pyspark3中常用的RDD操作方法及其实例。

1、map
map是RDD中最常用的转换操作之一。它接收一个函数作为参数,并将该函数应用于RDD中的每个元素。然后返回一个新的RDD,其元素为函数的输出。

例如,下面的代码展示了如何使用map函数来将一个包含数字的RDD转换为字符串类型的RDD:

numbers = sc.parallelize([1, 2, 3, 4, 5])
strings = numbers.map(lambda x: str(x))

2、filter
filter是另一种常用的转换操作。它接收一个函数作为参数,并从RDD中返回所有满足该函数条件的元素。

例如,下面的代码展示了如何使用filter函数来从一个包含数字的RDD中筛选出所有偶数:

numbers = sc.parallelize([1, 2, 3, 4, 5])
even_numbers = numbers.filter(lambda x: x % 2 == 0)

3、flatMap
flatMap和map类似,但返回的是一个扁平化的列表。它接收一个函数作为参数,并将该函数应用于RDD中的每个元素。该函数的输出应该是一个列表或迭代器。然后flatMap会将所有的输出合并成一个新的RDD。

例如,下面的代码展示了如何使用flatMap函数将一个包含字符串的RDD转换为包含单词的RDD:

strings = sc.parallelize(["hello world", "how are you"])
words = strings.flatMap(lambda x: x.split(" "))

4、union
union是将两个RDD合并为一个的操作。它返回一个新的RDD,其中包含原始RDD的所有元素和新的RDD的所有元素。

例如,下面的代码展示了如何使用union函数将两个包含数字的RDD合并为一个RDD:

numbers1 = sc.parallelize([1, 2, 3])
numbers2 = sc.parallelize([4, 5, 6])
all_numbers = numbers1.union(numbers2)

5、distinct
distinct是一个去重操作。它返回一个新的RDD,其中包含原始RDD中的所有不重复元素。

例如,下面的代码展示了如何使用distinct函数将一个包含重复元素的RDD转换为一个没有重复元素的RDD:

numbers = sc.parallelize([1, 2, 3, 3, 4, 5, 5])
unique_numbers = numbers.distinct()

6、reduce
reduce是一个计算操作。它接收一个函数作为参数,并对RDD中的所有元素进行聚合计算。该函数应该是一个二元运算,并且可以将两个元素合并为一个元素。reduce从RDD中取出前两个元素,并将它们传递给函数进行计算。然后将结果与下一个元素继续传递给函数进行计算,直到整个RDD被聚合为一个元素。最终结果是一个单独的元素。

例如,下面的代码展示了如何使用reduce函数计算一个包含数字的RDD的和:

numbers = sc.parallelize([1, 2, 3, 4, 5])
sum = numbers.reduce(lambda x, y: x + y)

7、aggregate
aggregate也是一个计算操作。它接收一个初始值和两个函数作为参数。初始值是一个与计算相关的空值。第一个函数将初始值与RDD中的第一个元素进行聚合计算。第二个函数将元素与上一次聚合计算的结果进行聚合计算。最终结果是一个单独的元素。

例如,下面的代码展示了如何使用aggregate函数计算一个包含数字的RDD的平均值:

numbers = sc.parallelize([1, 2, 3, 4, 5])
sum_count = numbers.aggregate((0, 0), lambda acc, value: (acc[0] + value, acc[1] + 1), lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1]))
average = sum_count[0] / sum_count[1]

以上是Pyspark3针对RDD操作的方法介绍和实例。RDD是Spark的重要数据结构,常见的转换和计算操作可以轻松实现。通过Pyspark3提供的Python API,可以方便地在Python中使用Spark的功能。

希望本文对大家有所帮助。


https://www.xamrdz.com/backend/3m31933837.html

相关文章: