Advertisement

Apache Spark的常用算子和函数

阅读量:

一、RDD 常用算子

1. 转换算子(Transformations)

map(func)
对 RDD 中的每个元素应用函数func,返回新的 RDD。

python

复制代码
        1. rdd = sc.parallelize([1, 2, 3])

    
        2. rdd.map(lambda x: x * 2)  # 结果:[2, 4, 6]
    

filter(func)
过滤出满足函数func条件的元素,返回新的 RDD。

python

复制代码
    rdd.filter(lambda x: x > 2)  # 结果:[3]

    
    

flatMap(func)
先对元素应用func,再将结果展平,常用于分词。

python

复制代码
        1. rdd = sc.parallelize(["Hello world", "Spark is fast"])

    
        2. rdd.flatMap(lambda x: x.split(" "))  # 结果:["Hello", "world", "Spark", "is", "fast"]
    

union(otherRDD)
合并两个 RDD,不去重。

python

复制代码
        1. rdd1 = sc.parallelize([1, 2])

    
        2. rdd2 = sc.parallelize([3, 4])
    
        3. rdd1.union(rdd2)  # 结果:[1, 2, 3, 4]
    

distinct()
去重,返回元素唯一的 RDD(可能触发 Shuffle)。

python

复制代码
        1. rdd = sc.parallelize([1, 2, 2, 3])

    
        2. rdd.distinct()  # 结果:[1, 2, 3]
    

groupByKey()
按 key 分组,返回(K, Iterable[V])对。

python

复制代码
        1. rdd = sc.parallelize([(1, "a"), (2, "b"), (1, "c")])

    
        2. rdd.groupByKey().collect()  # 结果:[(1, ["a", "c"]), (2, ["b"])]
    

reduceByKey(func)
按 key 分组后对值进行聚合,性能优于groupByKey

python

复制代码
    rdd.reduceByKey(lambda x, y: x + y)  # 结果:[(1, "ac"), (2, "b")]

    
    

join(otherRDD)
对两个 KV 格式的 RDD 进行内连接。

python

复制代码
        1. rdd1 = sc.parallelize([(1, "a"), (2, "b")])

    
        2. rdd2 = sc.parallelize([(1, "x"), (2, "y")])
    
        3. rdd1.join(rdd2)  # 结果:[(1, ("a", "x")), (2, ("b", "y"))]
    

sortByKey(ascending=True)
按 key 排序。

python

复制代码
        1. rdd = sc.parallelize([(3, "c"), (1, "a"), (2, "b")])

    
        2. rdd.sortByKey()  # 结果:[(1, "a"), (2, "b"), (3, "c")]
    
2. 行动算子(Actions)

collect()
将 RDD 的所有元素收集到 Driver 端,慎用(可能导致 OOM)。

python

复制代码
    rdd.collect()  # 返回Python列表

    
    

count()
返回 RDD 的元素个数。

python

复制代码
    rdd.count()  # 结果:3

    
    

reduce(func)
聚合 RDD 中的元素。

python

复制代码
        1. rdd = sc.parallelize([1, 2, 3])

    
        2. rdd.reduce(lambda x, y: x + y)  # 结果:6
    

take(n)
返回前 n 个元素。

python

复制代码
    rdd.take(2)  # 结果:[1, 2]

    
    

first()
返回第一个元素。

python

复制代码
    rdd.first()  # 结果:1

    
    

saveAsTextFile(path)
将 RDD 保存为文本文件。

python

复制代码
    rdd.saveAsTextFile("hdfs://path/to/output")

    
    

foreach(func)
对 RDD 的每个元素执行函数func(常用于输出到外部系统)。

python

复制代码
    rdd.foreach(lambda x: print(x))

    
    

二、DataFrame/Dataset 常用函数

1. 转换操作

select(cols)
选择列。

python

复制代码
    df.select("name", "age")

    
    

filter(condition)
过滤行。

python

复制代码
    df.filter(df["age"] > 20)

    
    

groupBy(cols)
分组后进行聚合。

python

复制代码
    df.groupBy("gender").avg("age")

    
    

orderBy(cols)
排序。

python

复制代码
    df.orderBy("age", ascending=False)

    
    

join(other_df, on, how)
连接两个 DataFrame。

python

复制代码
    df.join(other_df, on="id", how="inner")

    
    

withColumn(colName, col)
添加列或修改现有列。

python

复制代码
    df.withColumn("age_squared", df["age"] ** 2)

    
    

drop(col)
删除列。

python

复制代码
    df.drop("age")

    
    
2. 行动操作

show(n)
显示前 n 行。

python

复制代码
    df.show(5)

    
    

count()
统计行数。

python

复制代码
    df.count()

    
    

collect()
收集所有行到 Driver 端。

python

复制代码
    df.collect()

    
    

write.save(path)
保存 DataFrame 到文件。

python

复制代码
    df.write.parquet("hdfs://path/to/parquet")

    
    

三、Spark SQL 常用函数

聚合函数
sum(), avg(), count(), min(), max(), stddev(), variance()等。

python

复制代码
        1. from pyspark.sql.functions import sum

    
        2. df.select(sum("salary"))
    

字符串函数
concat(), substring(), lower(), upper(), trim(), regexp_replace()等。

python

复制代码
        1. from pyspark.sql.functions import concat

    
        2. df.select(concat(df["first_name"], " ", df["last_name"]))
    

日期时间函数
current_date(), date_format(), unix_timestamp(), datediff()等。

python

复制代码
        1. from pyspark.sql.functions import current_date

    
        2. df.withColumn("today", current_date())
    

窗口函数
row_number(), rank(), dense_rank(), lag(), lead()等。

python

复制代码
        1. from pyspark.sql.window import Window

    
        2. from pyspark.sql.functions import row_number
    
        3. window_spec = Window.partitionBy("dept").orderBy("salary")
    
        4. df.withColumn("rank", row_number().over(window_spec))
    

四、Spark Streaming 常用算子

transform(func)
对 DStream 的每个 RDD 应用转换函数。

python

复制代码
    stream.transform(lambda rdd: rdd.filter(lambda x: x > 0))

    
    

window(windowDuration, slideDuration)
创建滑动窗口。

python

复制代码
    stream.window(windowDuration=30, slideDuration=10)  # 30秒窗口,每10秒滑动一次

    
    

reduceByKeyAndWindow(func, windowDuration, slideDuration)
窗口内按 key 聚合。

python

复制代码
    pairs.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10)

    
    

updateStateByKey(func)
维护状态(需启用检查点)。

python

复制代码
        1. def updateFunc(new_values, last_sum):

    
        2.     return sum(new_values) + (last_sum or 0)
    
        3. pairs.updateStateByKey(updateFunc)
    

五、MLlib 常用函数

特征处理
VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder, Tokenizer等。

python

复制代码
        1. from pyspark.ml.feature import VectorAssembler

    
        2. assembler = VectorAssembler(inputCols=["age", "height"], outputCol="features")
    

算法模型 * 分类LogisticRegression, RandomForestClassifier, NaiveBayes等。
* 回归LinearRegression, DecisionTreeRegressor等。
* 聚类KMeans, DBSCAN, GaussianMixture等。
* 推荐系统ALS(协同过滤)。

python

复制代码
        1. from pyspark.ml.classification import LogisticRegression

    
        2. lr = LogisticRegression(featuresCol="features", labelCol="label")
    

总结

  • RDD 算子 :适用于低级别操作,灵活性高但需手动管理数据分区和序列化。
  • DataFrame/Dataset 函数 :结构化数据处理更高效,支持 SQL 语法和优化执行计划。
  • Spark SQL 函数 :提供丰富的内置函数,简化数据处理逻辑。
  • Spark Streaming 算子 :专为流数据处理设计,支持窗口操作和状态维护。
  • MLlib 函数 :封装了常见机器学习算法,支持分布式训练

全部评论 (0)

还没有任何评论哟~