awslabs / aws-glue-libs

AWS Glue Libraries are additions and enhancements to Spark for ETL operations.

Filter.apply() does not retain order of columns #97

Open vfrank66 opened 3 years ago

vfrank66 commented 3 years ago

I am not sure where to file bugs for the AWS Glue libraries, so let me know if I am in the wrong location.

Filter.apply() does not retain the order of the columns, which is a problem for me since a few of my columns are partition columns. This seems like unexpected behavior. Full example:

    # glue_context, database, table, s3_storage_location, and partition_keys
    # are defined elsewhere in the job
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.transforms import Filter

    df: DynamicFrame = glue_context.create_dynamic_frame.from_catalog(
        database=database,
        table_name=table,
        additional_options={"useS3ListImplementation": True},
    )
    # drop every row whose hash_value appears in the list
    hash_values_list = ['1', '2']
    df = Filter.apply(frame=df, f=lambda x: x["hash_value"] not in hash_values_list)

    sink = glue_context.getSink(
        connection_type="s3",
        path=s3_storage_location,
        enableUpdateCatalog=True,
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=partition_keys,
    )
    sink.setFormat("glueparquet")
    sink.setCatalogInfo(
        catalogDatabase=database,
        catalogTableName=table,
    )
    errors = sink.writeFrame(df)

The write errors with a missing partition column. The column does exist, but all the fields are reordered. This seems like incorrect behavior, since the output of Filter.apply() is not compatible with the sink without additional work.
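One possible way to restore the original column order is to round-trip through a Spark DataFrame. This is only a sketch, assuming the df, glue_context, and hash_values_list names from the snippet above; toDF() and DynamicFrame.fromDF() are the standard conversion helpers:

    from awsglue.dynamicframe import DynamicFrame
    from awsglue.transforms import Filter

    # capture the column order before filtering
    original_columns = df.toDF().columns

    filtered = Filter.apply(frame=df, f=lambda x: x["hash_value"] not in hash_values_list)

    # reapply the original order on a plain DataFrame, then convert back;
    # the result can be written with the sink exactly as above
    reordered = DynamicFrame.fromDF(
        filtered.toDF().select(original_columns), glue_context, "reordered"
    )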

itk0492 commented 2 years ago

I have this same issue when I apply a Filter to a DynamicFrame. Have you found a workaround?

vfrank66 commented 2 years ago

I just stopped using DynamicFrames for what I needed to do. I noticed several things did not work as expected, and writing with DynamicFrames is much slower than writing with Spark DataFrames.
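For reference, a minimal sketch of the equivalent partitioned write using the plain DataFrame writer, assuming the s3_storage_location and partition_keys names from the original snippet; note that, unlike the Glue sink, this does not update the Glue Data Catalog:

    # convert the DynamicFrame to a Spark DataFrame and write parquet directly
    spark_df = df.toDF()
    spark_df.write.mode("append").partitionBy(*partition_keys).parquet(s3_storage_location)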

For this workaround I did a left anti join to delete the records:

    from pyspark.sql import functions as F

    # existing_df and incr_syndicated_data are defined elsewhere in the job;
    # df holds the rows to delete by hash_value
    current_column_order = existing_df.columns
    # broadcast the smaller frame when it is under ~10 MB to avoid a shuffle
    if incr_syndicated_data.prefix_size_bytes < 10_485_760:
        existing_df = existing_df.join(F.broadcast(df), "hash_value", "leftanti")
    else:
        existing_df = existing_df.join(df, "hash_value", "leftanti")
    # retain the column order, which is important for the partition update
    # comparison check
    existing_df = existing_df.select(current_column_order)
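The 10_485_760-byte cutoff matches Spark's default spark.sql.autoBroadcastJoinThreshold (10 MB), so the explicit F.broadcast hint mirrors the decision Spark would make on its own when it has size statistics for the smaller side.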