GitHub issue (status: Open) — opened by DrPepper8888, 7 months ago
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, struct

# Initialize the SparkSession.
spark = SparkSession.builder.appName("PandasToSpark").getOrCreate()

# Load the training data; the binary label column is "is_high_freq".
spark_df = spark.read.csv('features.csv', header=True, inferSchema=True)

# Feature columns used for BOTH training and scoring — defined once so the
# two code paths cannot silently drift apart (the original duplicated this
# list in two places).
FEATURE_COLS = ['AGE', 'TRAN_AMT', 'HOLDING_TIME', 'NET_WORTH']

# Assemble the raw numeric columns into a single vector column "features".
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
assembled_df = assembler.transform(spark_df)

# Logistic-regression estimator over the assembled feature vector.
lr = LogisticRegression(featuresCol="features", labelCol="is_high_freq")

# Evaluator used for model selection (default metric: areaUnderROC).
evaluator = BinaryClassificationEvaluator(labelCol="is_high_freq")

# Hyper-parameter grid: regularization strength and L1/L2 mixing ratio.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.05, 0.08, 0.1, 0.5, 0.8, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.3, 0.4, 0.5, 0.8, 1.0]) \
    .build()

# Train/validation split (80% train) searching over the parameter grid.
tvs = TrainValidationSplit(estimator=lr,
                           evaluator=evaluator,
                           estimatorParamMaps=paramGrid,
                           trainRatio=0.8)

# Fit on the DataFrame that already went through feature assembly;
# tvs.fit returns the best model found on the validation split.
model = tvs.fit(assembled_df)

# Predictions on the training data (sanity check of the fitted model).
predictions = model.transform(assembled_df)

#################################################################
### Scoring a new dataset
# BUG FIX: the original code referenced `data_df` without ever defining it,
# which raises NameError at runtime. Load the scoring data explicitly here.
# Adjust the path to wherever the new data lives; it must contain the same
# feature columns (and preprocessing) as the training data.
data_df = spark.read.csv('new_data.csv', header=True, inferSchema=True)

# Reuse the SAME assembler instance so the feature layout exactly matches
# what the model was trained on (no second, duplicated column list).
transformed_data_df = assembler.transform(data_df)

# Score the new dataset with the tuned model and display the result.
predictions = model.transform(transformed_data_df)
predictions.show()