Financial Risk Control Data Analysis with Spark SQL (Python)
- Import required libraries
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
- Create the SparkSession
# Create the Spark session with the modern builder API (Spark 2.x+).
# The legacy pattern — building a SparkConf/SparkContext by hand and calling
# SparkSession(sc) directly — bypasses getOrCreate() and can fail or create
# a duplicate context if a session is already active (e.g. in a notebook).
spark = (
    SparkSession.builder
    .appName("Financial Risk Control Data Analysis")
    .getOrCreate()
)
# Keep `sc` bound for any code that still needs the low-level SparkContext.
sc = spark.sparkContext
- Load the data
# Load the raw CSV inputs; the first row of each file is treated as the header.
# Customer master data.
customer_info = spark.read.csv("customer_info.csv", header="true")
# Transaction history data.
transaction_record = spark.read.csv("transaction_record.csv", header="true")
- Data cleaning and preprocessing
# Clean the customer data: drop exact duplicate rows, then any row containing
# a null, and expose the result to Spark SQL as a temporary view.
customer_info = customer_info.dropDuplicates().dropna()
customer_info.createOrReplaceTempView("customer_info")
# Apply the same cleaning to the transaction data and register its view.
transaction_record = transaction_record.dropDuplicates().dropna()
transaction_record.createOrReplaceTempView("transaction_record")
# Re-project the transaction columns, casting the string `amount` column to
# DOUBLE so it can be summed later. NOTE(review): values that fail the CAST
# become NULL (SUM then silently skips them) — confirm the source data is
# numeric or add an explicit NULL filter if that matters.
transaction_record = spark.sql("""
SELECT customer_id,
transaction_type,
CAST(amount AS DOUBLE) AS amount
FROM transaction_record
""")
- Data analysis
# Use Spark SQL for the analysis step.
# Re-register both views: `transaction_record` was reassigned above, so its
# previous view would otherwise point at the pre-cast DataFrame.
customer_info.createOrReplaceTempView("customer_info")
transaction_record.createOrReplaceTempView("transaction_record")
# Per customer, total the amounts by transaction type (transfer in / transfer
# out / consumption). The HAVING clause keeps only customers whose total
# inflow is LESS than their total outflow — a simple risk signal. The SUM
# expressions are repeated in HAVING rather than referencing the SELECT
# aliases, which is the portable form across Spark SQL versions.
result = spark.sql("""
SELECT ci.customer_id,
SUM(CASE WHEN tr.transaction_type = 'transfer_in' THEN tr.amount ELSE 0 END) AS transfer_in_amount,
SUM(CASE WHEN tr.transaction_type = 'transfer_out' THEN tr.amount ELSE 0 END) AS transfer_out_amount,
SUM(CASE WHEN tr.transaction_type = 'consume' THEN tr.amount ELSE 0 END) AS consume_amount
FROM customer_info ci
JOIN transaction_record tr ON ci.customer_id = tr.customer_id
GROUP BY ci.customer_id
HAVING SUM(CASE WHEN tr.transaction_type = 'transfer_in' THEN tr.amount ELSE 0 END) < SUM(CASE WHEN tr.transaction_type = 'transfer_out' THEN tr.amount ELSE 0 END)
""")
# Display the flagged customers.
result.show()
