kaiko-ai / typedspark

Column-wise type annotations for pyspark DataFrames

Add functionality to create a schema from an existing dataframe in a notebook #92

Closed nanne-aben closed 1 year ago

nanne-aben commented 1 year ago

This PR does two things:

  1. Allows you to generate a schema from a DataFrame you already have in memory, using create_schema().
  2. When generating a schema with load_table() or create_schema(), fills in the name of a nested StructType schema based on its column name (a minimal load_table() sketch follows this list).
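
For reference, load_table() follows the same pattern, but starts from a table registered in the Spark catalog rather than a DataFrame in memory. A minimal sketch, assuming a table already exists in the catalog (the table name person_table is a placeholder):

from pyspark.sql import SparkSession
from typedspark import load_table

spark = SparkSession.builder.getOrCreate()

# load the table and generate a typedspark Schema for it
df, Person = load_table(spark, "person_table")
Person  # displaying the schema in a notebook prints its class definition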

Example of case 1

from pyspark.sql import SparkSession
from pyspark.sql.functions import first
from pyspark.sql.types import IntegerType, StringType
from typedspark import Column, Schema, create_partially_filled_dataset, create_schema

spark = SparkSession.builder.getOrCreate()

class A(Schema):
    id: Column[IntegerType]
    key: Column[StringType]
    value: Column[StringType]

df = (
    create_partially_filled_dataset(
        spark, 
        A, 
        {
            A.id: [1, 1, 1, 2, 2, 2, 3, 3, 3],
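            # "b!!" is deliberately not a valid Python identifier; create_schema() will render it as b__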
            A.key: ["a", "b!!", "c", "a", "b!!", "c", "a", "b!!", "c"], 
            A.value: ["alpha", "beta", "gamma", "alpha", "beta", "gamma", "alpha", "beta", "gamma"]
        }
    )
    .groupby(A.id)
    .pivot(A.key.str)
    .agg(first(A.value))
)

df, MySchema = create_schema(df, "A")
MySchema

Which will return

from pyspark.sql.types import IntegerType, StringType

from typedspark import Column, Schema

class A(Schema):
    id: Column[IntegerType]
    a: Column[StringType]
    b__: Column[StringType]
    c: Column[StringType]
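
The printed definition can be copied into your codebase and used for type-checked transformations. A hypothetical downstream use, assuming the generated class is copied in under a new name (PivotedA and ids_and_a are illustrative, not part of this PR):

from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType, StringType
from typedspark import Column, DataSet, Schema

# the generated definition, copied into the codebase under a new name
class PivotedA(Schema):
    id: Column[IntegerType]
    a: Column[StringType]
    b__: Column[StringType]
    c: Column[StringType]

def ids_and_a(df: DataSet[PivotedA]) -> DataFrame:
    # the DataSet annotation gives autocompletion and type checks on PivotedA's columns
    return df.select(PivotedA.id, PivotedA.a)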

Example of case 2

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType
from typedspark import Column, Schema, StructType, create_schema, create_partially_filled_dataset

spark = SparkSession.builder.getOrCreate()

class A(Schema):
    a: Column[StringType]
    b: Column[IntegerType]
    c: Column[StringType]

class B(Schema):
    value_container: Column[StructType[A]]

df = (
    create_partially_filled_dataset(
        spark, 
        B, 
        {
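            # the struct values for value_container are built as Rows collected from a nested dataset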
            B.value_container: create_partially_filled_dataset(
                spark,
                B.value_container.dtype.schema,
                {
                    B.value_container.dtype.schema.a: ["a", "a", "b", "b"],
                }
            ).collect(),
        }
    )
)

df, MySchema = create_schema(df, "A")

MySchema.value_container.dtype.schema

Which will return

from pyspark.sql.types import IntegerType, StringType

from typedspark import Column, Schema

class ValueContainer(Schema):
    a: Column[StringType]
    b: Column[IntegerType]
    c: Column[StringType]
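
For completeness: the top-level generated schema would presumably reference this nested class under the name derived from the column, roughly along the following lines (not shown in the PR description, so treat it as a sketch):

from typedspark import Column, Schema, StructType

class A(Schema):
    value_container: Column[StructType[ValueContainer]]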