Open DrPepper8888 opened 3 months ago
from pyspark.sql import SparkSession from pyspark.ml.feature import VectorAssembler, PCA from pyspark.ml.clustering import KMeans from pyspark.sql.functions import col, # 初始化SparkSession spark = SparkSession.builder.appName("KMeansExample").getOrCreate() # 读取数据 df = spark.read.csv('features.csv', header=True, inferSchema=True) # 数据预处理 # 将交易日期转换为日期时间 df = df.withColumn('TRAN_DT', df['TRAN_DT'].cast('timestamp')) # 特征选择 # 选择交易金额、持有时间和净资产作为特征 assembler = VectorAssembler(inputCols=['TRAN_AMT', 'HOLDING_TIME', 'NET_WORTH'], outputCol='features') df = assembler.transform(df) # 聚类分析 # 创建KMeans模型 kmeans = KMeans().setK(5).setSeed(0) # 训练模型 model = kmeans.fit(df) # 将聚类结果添加到原始数据框中,并将新列重命名为'Cluster' df = model.transform(df).withColumnRenamed('prediction', 'Cluster') # 注册DataFrame作为临时视图 df.createOrReplaceTempView("customer_data") # 使用PCA进行降维,设置k为2,以便在二维平面上可视化 pca = PCA(inputCol='features', outputCol='pcaFeatures', k=2) pca_model = pca.fit(df) # 应用PCA转换 df_pca = pca_model.transform(df) # 可视化聚类结果 limited_data = df_pca.select('Cluster', 'pcaFeatures').limit(100).toPandas() # 由于数据量可能很大,这里取前100个点进行可视化 plt.scatter(limited_data['pcaFeatures'].apply(lambda x: x[0]), limited_data['pcaFeatures'].apply(lambda x: x[1]), c=limited_data['Cluster'], cmap='rainbow') plt.title('Customer Clusters (PCA)') plt.xlabel('PCA Feature 1') plt.ylabel('PCA Feature 2') plt.show() # 分析每个聚类的特征 summary = df.groupBy('Cluster').agg( col('TRAN_AMT').mean().alias('AverageTransactionAmount'), col('HOLDING_TIME').mean().alias('AverageHoldingTime'), col('NET_WORTH').mean().alias('AverageNetWorth') ) summary.show()