Apache Spark的常用算子和函数
一、RDD 常用算子
1. 转换算子(Transformations)
map(func)
对 RDD 中的每个元素应用函数func,返回新的 RDD。
python
1. rdd = sc.parallelize([1, 2, 3])
2. rdd.map(lambda x: x * 2) # 结果:[2, 4, 6]
filter(func)
过滤出满足函数func条件的元素,返回新的 RDD。
python
rdd.filter(lambda x: x > 2) # 结果:[3]
flatMap(func)
先对元素应用func,再将结果展平,常用于分词。
python
1. rdd = sc.parallelize(["Hello world", "Spark is fast"])
2. rdd.flatMap(lambda x: x.split(" ")) # 结果:["Hello", "world", "Spark", "is", "fast"]
union(otherRDD)
合并两个 RDD,不去重。
python
1. rdd1 = sc.parallelize([1, 2])
2. rdd2 = sc.parallelize([3, 4])
3. rdd1.union(rdd2) # 结果:[1, 2, 3, 4]
distinct()
去重,返回元素唯一的 RDD(可能触发 Shuffle)。
python
1. rdd = sc.parallelize([1, 2, 2, 3])
2. rdd.distinct() # 结果:[1, 2, 3]
groupByKey()
按 key 分组,返回(K, Iterable[V])对。
python
1. rdd = sc.parallelize([(1, "a"), (2, "b"), (1, "c")])
2. rdd.groupByKey().collect() # 结果:[(1, ["a", "c"]), (2, ["b"])]
reduceByKey(func)
按 key 分组后对值进行聚合,性能优于groupByKey。
python
rdd.reduceByKey(lambda x, y: x + y) # 结果:[(1, "ac"), (2, "b")]
join(otherRDD)
对两个 KV 格式的 RDD 进行内连接。
python
1. rdd1 = sc.parallelize([(1, "a"), (2, "b")])
2. rdd2 = sc.parallelize([(1, "x"), (2, "y")])
3. rdd1.join(rdd2) # 结果:[(1, ("a", "x")), (2, ("b", "y"))]
sortByKey(ascending=True)
按 key 排序。
python
1. rdd = sc.parallelize([(3, "c"), (1, "a"), (2, "b")])
2. rdd.sortByKey() # 结果:[(1, "a"), (2, "b"), (3, "c")]
2. 行动算子(Actions)
collect()
将 RDD 的所有元素收集到 Driver 端,慎用(可能导致 OOM)。
python
rdd.collect() # 返回Python列表
count()
返回 RDD 的元素个数。
python
rdd.count() # 结果:3
reduce(func)
聚合 RDD 中的元素。
python
1. rdd = sc.parallelize([1, 2, 3])
2. rdd.reduce(lambda x, y: x + y) # 结果:6
take(n)
返回前 n 个元素。
python
rdd.take(2) # 结果:[1, 2]
first()
返回第一个元素。
python
rdd.first() # 结果:1
saveAsTextFile(path)
将 RDD 保存为文本文件。
python
rdd.saveAsTextFile("hdfs://path/to/output")
foreach(func)
对 RDD 的每个元素执行函数func(常用于输出到外部系统)。
python
rdd.foreach(lambda x: print(x))
二、DataFrame/Dataset 常用函数
1. 转换操作
select(cols)
选择列。
python
df.select("name", "age")
filter(condition)
过滤行。
python
df.filter(df["age"] > 20)
groupBy(cols)
分组后进行聚合。
python
df.groupBy("gender").avg("age")
orderBy(cols)
排序。
python
df.orderBy("age", ascending=False)
join(other_df, on, how)
连接两个 DataFrame。
python
df.join(other_df, on="id", how="inner")
withColumn(colName, col)
添加列或修改现有列。
python
df.withColumn("age_squared", df["age"] ** 2)
drop(col)
删除列。
python
df.drop("age")
2. 行动操作
show(n)
显示前 n 行。
python
df.show(5)
count()
统计行数。
python
df.count()
collect()
收集所有行到 Driver 端。
python
df.collect()
write.save(path)
保存 DataFrame 到文件。
python
df.write.parquet("hdfs://path/to/parquet")
三、Spark SQL 常用函数
聚合函数
sum(), avg(), count(), min(), max(), stddev(), variance()等。
python
1. from pyspark.sql.functions import sum
2. df.select(sum("salary"))
字符串函数
concat(), substring(), lower(), upper(), trim(), regexp_replace()等。
python
1. from pyspark.sql.functions import concat
2. df.select(concat(df["first_name"], " ", df["last_name"]))
日期时间函数
current_date(), date_format(), unix_timestamp(), datediff()等。
python
1. from pyspark.sql.functions import current_date
2. df.withColumn("today", current_date())
窗口函数
row_number(), rank(), dense_rank(), lag(), lead()等。
python
1. from pyspark.sql.window import Window
2. from pyspark.sql.functions import row_number
3. window_spec = Window.partitionBy("dept").orderBy("salary")
4. df.withColumn("rank", row_number().over(window_spec))
四、Spark Streaming 常用算子
transform(func)
对 DStream 的每个 RDD 应用转换函数。
python
stream.transform(lambda rdd: rdd.filter(lambda x: x > 0))
window(windowDuration, slideDuration)
创建滑动窗口。
python
stream.window(windowDuration=30, slideDuration=10) # 30秒窗口,每10秒滑动一次
reduceByKeyAndWindow(func, windowDuration, slideDuration)
窗口内按 key 聚合。
python
pairs.reduceByKeyAndWindow(lambda x, y: x + y, 30, 10)
updateStateByKey(func)
维护状态(需启用检查点)。
python
1. def updateFunc(new_values, last_sum):
2. return sum(new_values) + (last_sum or 0)
3. pairs.updateStateByKey(updateFunc)
五、MLlib 常用函数
特征处理
VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder, Tokenizer等。
python
1. from pyspark.ml.feature import VectorAssembler
2. assembler = VectorAssembler(inputCols=["age", "height"], outputCol="features")
算法模型 * 分类 :LogisticRegression, RandomForestClassifier, NaiveBayes等。
* 回归 :LinearRegression, DecisionTreeRegressor等。
* 聚类 :KMeans, DBSCAN, GaussianMixture等。
* 推荐系统 :ALS(协同过滤)。
python
1. from pyspark.ml.classification import LogisticRegression
2. lr = LogisticRegression(featuresCol="features", labelCol="label")
总结
- RDD 算子 :适用于低级别操作,灵活性高但需手动管理数据分区和序列化。
- DataFrame/Dataset 函数 :结构化数据处理更高效,支持 SQL 语法和优化执行计划。
- Spark SQL 函数 :提供丰富的内置函数,简化数据处理逻辑。
- Spark Streaming 算子 :专为流数据处理设计,支持窗口操作和状态维护。
- MLlib 函数 :封装了常见机器学习算法,支持分布式训练
