databrickslabs / dlt-meta

This is metadata driven DLT based framework for bronze/silver pipelines
Other
125 stars 54 forks source link

Multiple select expression support #17

Open pavelilyushko opened 8 months ago

pavelilyushko commented 8 months ago

Hello,

I ingest JSON data into bronze layer and then try to apply some transformations on it to promote it to the silver layer.

here's the problem: when I try to explode the ingested nested json and then select all columns I get the following error:


from pyspark.sql.types import StructType, StructField, StringType, ArrayType

data = [("id1", [("a", "111"), ("b", "222")]),

        ("id2", [("c", "333"), ("d", "444")])]

schema = StructType([

    StructField("id", StringType()),

    StructField("payload", ArrayType(

        StructType([

            StructField("c1", StringType()),

            StructField("c2", StringType())

        ])

    ))

])

df = spark.createDataFrame(data, schema)

df2 = df.selectExpr(

"explode(payload) as temp",

"temp.*"

)

display(df2.schema)

AnalysisException: Cannot resolve 'temp.*' given input columns ''.; line 1 pos 0

However if I select all columns in a separate selectExpr, all is good:


df2 = df.selectExpr(

"explode(payload) as temp").selectExpr(

"temp.*"

)

display(df2.schema)

df2:pyspark.sql.dataframe.DataFrame

c1:string

c2:string

StructType([StructField('c1', StringType(), True), StructField('c2', StringType(), True)])

Now suppose I want to drop the unwanted columns from the result:


df2 = df.selectExpr(

"explode(payload) as temp").selectExpr(

"temp.*",

"except(c1)"

)

display(df2.schema)

Which gives the error:


AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `c1` cannot be resolved. Did you mean one of the following? [`temp`].; line 1 pos 0

However if add another selectExpr on top of a previous one, it works!


df2 = df.selectExpr(

"explode(payload) as temp").selectExpr(

"temp.*").selectExpr(

"* except(c1)"

)

display(df2.schema)

StructType([StructField('c2', StringType(), True)])

As far as I understood from the DLT Meta source code:


    def get_silver_schema(self):

        """Get Silver table Schema."""

        silver_dataflow_spec: SilverDataflowSpec = self.dataflowSpec

        # source_database = silver_dataflow_spec.sourceDetails["database"]

        # source_table = silver_dataflow_spec.sourceDetails["table"]

        select_exp = silver_dataflow_spec.selectExp

        where_clause = silver_dataflow_spec.whereClause

        raw_delta_table_stream = self.spark.read.load(

            path=silver_dataflow_spec.sourceDetails["path"],

            format="delta"

            # #f"{source_database}.{source_table}"

        ).selectExpr(*select_exp)

the selectExpr is applied once on an array of select expressions.

Can we apply it separately on each of the select expression so as to avoid the above errors and make it more flexible in transforming?

Thank you

ravi-databricks commented 8 months ago

You can use extract_select_exp function to generate select_exp array for silver_transformations.json file generation then feed to dlt-meta onboarding job which will generated bronze dataflowspec. Check below example

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

data = [("id1", [("a", "111"), ("b", "222")]),

        ("id2", [("c", "333"), ("d", "444")])]

def extract_select_exp(schema, element_name, view_name):
  list = schema.fields
  select_exp = []
  for element in list:
    if(element.name == element_name):
      tye = element.dataType
      fields = tye.elementType.fields
      for field in fields:
        select_exp.append(f"{view_name}.{field.name}")
  print(*select_exp)
  return select_exp

schema = StructType([

    StructField("id", StringType()),

    StructField("payload", ArrayType(

        StructType([

            StructField("c1", StringType()),

            StructField("c2", StringType())

        ])

    ))

])

df = spark.createDataFrame(data, schema)
df2 = df.selectExpr("explode(payload) as temp", *extract_select_exp(schema,"payload", "temp"))
display(df2)
pavelilyushko commented 8 months ago

Hi Ravi!

Thank you so much for the quick answer.

I got the approach you've suggested: basically instead of using

df2 = df.selectExpr(

"explode(payload) as temp",

"temp.*"

)

I'd generate the second string as an array of the named columns using the python function you provided and it would boil down to something like this in the end:

df2 = df.selectExpr(

"explode(payload) as temp",

"temp.c1",

"temp.c2"

)

However, I'd need to:

  1. add another notebook to read the schema from a bronze table and use that schema to generate an array of the concrete fields.

  2. replace the select expr placeholder in the silver transformation file with the newly generated array of columns

  3. do it for hundreds of other similar tables (which looks like a mess already)

  4. re-run the onboarding job each time (even if my schema does not change!)

  5. I still didn't get how this approach can solve the other issue - excluding certain columns from my final output (using the 'except(col)' command, for instance: "* except(temp.c1)" - this wouldn't work: AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `temp`.`c1` cannot be resolved. Did you mean one of the following? [`id`, `payload`]

The approach I'm suggesting would save me all those troubles - it's as simple as extending the function


def get_silver_schema(self):

        """Get Silver table Schema."""

        silver_dataflow_spec: SilverDataflowSpec = self.dataflowSpec

        # source_database = silver_dataflow_spec.sourceDetails["database"]

        # source_table = silver_dataflow_spec.sourceDetails["table"]

        select_exp = silver_dataflow_spec.selectExp

        where_clause = silver_dataflow_spec.whereClause

        raw_delta_table_stream = self.spark.read.load(

            path=silver_dataflow_spec.sourceDetails["path"],

            format="delta"

            # #f"{source_database}.{source_table}"

        ).selectExpr(*select_exp)

and chaining the calls to selectExpr() applied to each element of the select_exp array.

Please let me know what you think about this.

Thank you!

Pavel

ravi-databricks commented 8 months ago

@pavelilyushko , You need to do it once before running onboarding, generate your silver_transformations.jsonusing above function then load dataflowspecs using onboarding-job. Since you know the schema before in hand, you can use schema files or custom schema function which will generated silver transformation json files

pavelilyushko commented 8 months ago

@ravi-databricks my schema still can occasionally change, and I might even know about it - it should be transparent to me.

Besides as I mentioned, the solution does not address other issues, like excluding certains columns from output , or including only certain ones.

anilkulkarni87 commented 8 months ago

@ravi-databricks Can you please provide an example how the silver_transformations.json will look like for this example? I am dealing with two layers of nesting.