DrPepper8888 / IPMN

0 stars 0 forks source link

pyspark处理clickstream #21

Open DrPepper8888 opened 7 months ago

DrPepper8888 commented 7 months ago
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list

# 初始化SparkSession
spark = SparkSession.builder \
    .appName("Pivot Table-like Operation") \
    .getOrCreate()

# 假设df是你的DataFrame,并且它已经被创建并加载了数据
# df = spark.read.csv("path/to/your/csvfile.csv", header=True, inferSchema=True)

# 按照'datet'进行分组,并收集每个分组下的'cust'和'cd'值
# 这里我们使用collect_list来收集'cd'值,并将其作为一个列表存储在新的列中
grouped_df = df.groupBy("datet").agg(
    collect_list("cust").alias("cust_list"),
    collect_list("cd").alias("cd_list")
)

# 现在grouped_df包含了每个'datet'下的'cust'和'cd'值的列表
# 如果你想要将这些列表展平并创建一个类似于Pandas pivot_table的DataFrame
# 你可以使用explode方法将列表转换为独立的行,然后使用groupBy和agg再次进行操作
from pyspark.sql.functions import explode

# 使用explode方法将'cust_list'和'cd_list'转换为独立的行
exploded_df = grouped_df.withColumn("cust", explode(col("cust_list"))) \
                        .withColumn("cd", explode(col("cd_list")))

# 再次按照'datet'和'cust'进行分组,并计算每个组合的'cd'列表
pivoted_df = exploded_df.groupBy("datet", "cust").agg(
    collect_list("cd").alias("cd_list")
)

# 显示结果
pivoted_df.show(truncate=False)