delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

"Invalid argument error: arguments need to have the same data type" Merging to modified struct #2530

Open rob-harrison opened 1 month ago

rob-harrison commented 1 month ago

Environment

Delta-rs version: deltalake==0.17.4

Binding: python

Environment:


Bug

What happened:

  1. Table with struct column 'gps_extended_signal' created via spark
  2. Data merged successfully using python DeltaTable .merge() with pyarrow RecordBatch
  3. Modified struct column, adding 3 extra fields via spark
  4. Modified python code to include struct's new fields in pyarrow RecordBatch
  5. Attempts to subsequently merge new or existing data fail with:

  File "/usr/local/lib/python3.11/site-packages/deltalake/table.py", line 1800, in execute
    metrics = self.table._table.merge_execute(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
_internal.DeltaError: Generic DeltaTable error: External error: Arrow error: Invalid argument error: arguments need to have the same data type

What you expected to happen: Data merged successfully after modifying table struct

How to reproduce it:

Table:

CREATE TABLE dwh.frame_test (
    key STRING,
    gps_extended_signal STRUCT<
        pitch: DOUBLE,
        roll: DOUBLE,
        course_error: DOUBLE,
        pitch_error: DOUBLE,
        roll_error: DOUBLE,
        velocity_forward: DOUBLE,
        velocity_left: DOUBLE,
        velocity_up: DOUBLE,
        velocity_forward_error: DOUBLE,
        velocity_left_error: DOUBLE,
        velocity_up_error: DOUBLE,
        location_3d_error: DOUBLE,
        solution_type: STRING,
        undulation: DOUBLE
    >,
    ...
    h3_id_res9 BIGINT
)
USING delta
PARTITIONED BY (h3_id_res9)

Alter table:

ALTER TABLE dwh.frame_test ADD COLUMNS (
    gps_extended_signal.position_std_up DOUBLE AFTER undulation,
    gps_extended_signal.position_std_north DOUBLE AFTER undulation,
    gps_extended_signal.position_std_east DOUBLE AFTER undulation
)

Merge code (after modifying the gps_extended_signal struct to add the position_std_* fields):

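from typing import List

import pyarrow as pa

# Aliases used in the merge and update predicates below
SOURCE_ALIAS = 's'
TARGET_ALIAS = 't'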

def upsert_frames(self, frames: List[ExtractedVideoFrame]):
    batch = build_pyarrow_batch(frames=frames)
    if batch.num_rows > 0:
        dt = get_delta_table(table_path=self.delta_table_path, dynamo_table_name=self.dynamo_table_name)
        partition_key_values = self._get_partition_keys(frames=frames)
        merge_predicate = f'{TARGET_ALIAS}.{HD_MAP_FRAME_PARTITION_BY_COLUMN} in ({partition_key_values}) and {TARGET_ALIAS}.{HD_MAP_FRAME_KEY_COLUMN} = {SOURCE_ALIAS}.{HD_MAP_FRAME_KEY_COLUMN}'
        update_predicate = f'{SOURCE_ALIAS}.{HD_MAP_FRAME_EXTRACTED_AT_MS_COLUMN} >= {TARGET_ALIAS}.{HD_MAP_FRAME_EXTRACTED_AT_MS_COLUMN}'
        metrics = (
            dt.merge(
                source=batch,
                predicate=merge_predicate,
                source_alias=SOURCE_ALIAS,
                target_alias=TARGET_ALIAS,
                large_dtypes=False
            )
            .when_matched_update_all(predicate=update_predicate)
            .when_not_matched_insert_all()
            .execute()
        )

def build_pyarrow_batch(frames: List[ExtractedVideoFrame]) -> pa.RecordBatch:
    key = pa.array([frame.frame_id for frame in frames], type=pa.string())
    gps_extended_signal = pa.array([(
        frame.gps_extended_signal.pitch,
        frame.gps_extended_signal.roll,
        frame.gps_extended_signal.course_error,
        frame.gps_extended_signal.pitch_error,
        frame.gps_extended_signal.roll_error,
        frame.gps_extended_signal.velocity_forward,
        frame.gps_extended_signal.velocity_left,
        frame.gps_extended_signal.velocity_up,
        frame.gps_extended_signal.velocity_forward_error,
        frame.gps_extended_signal.velocity_left_error,
        frame.gps_extended_signal.velocity_up_error,
        frame.gps_extended_signal.location_3d_error,
        frame.gps_extended_signal.solution_type,
        frame.gps_extended_signal.undulation,
        frame.gps_extended_signal.position_std_east,
        frame.gps_extended_signal.position_std_north,
        frame.gps_extended_signal.position_std_up
    ) for frame in frames], type=GPS_EXTENDED_SIGNAL_TYPE)
    ...
    h3_id_partition = pa.array([int(getattr(frame.h3_indices, HD_MAP_FRAME_INDICES_PARTITION_BY_COLUMN)) for frame in frames], type=pa.int64())

    batch = pa.RecordBatch.from_arrays([key, gps_extended_signal, snapped_gps_point, ... h3_id_partition], names=HD_MAP_FRAME_COLUMNS)
    return batch

GPS_EXTENDED_SIGNAL_TYPE = pa.struct([
    pa.field('pitch', pa.float64()),
    pa.field('roll', pa.float64()),
    pa.field('course_error', pa.float64()),
    pa.field('pitch_error', pa.float64()),
    pa.field('roll_error', pa.float64()),
    pa.field('velocity_forward', pa.float64()),
    pa.field('velocity_left', pa.float64()),
    pa.field('velocity_up', pa.float64()),
    pa.field('velocity_forward_error', pa.float64()),
    pa.field('velocity_left_error', pa.float64()),
    pa.field('velocity_up_error', pa.float64()),
    pa.field('location_3d_error', pa.float64()),
    pa.field('solution_type', pa.string()),
    pa.field('undulation', pa.float64()),
    pa.field('position_std_east', pa.float64()),
    pa.field('position_std_north', pa.float64()),
    pa.field('position_std_up', pa.float64())
])
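
To narrow down where the two sides disagree, the fields of the struct in the batch can be compared position by position against the fields of the same struct in the table schema. The following is a minimal sketch, not a confirmed diagnosis: `diff_struct_fields` is a hypothetical helper, the sample field lists are illustrative and not taken from the table above, and the commented pyarrow/deltalake usage assumes `dt.schema().to_pyarrow()` returns a `pyarrow.Schema` in this deltalake version.

```python
from itertools import zip_longest
from typing import List, Sequence, Tuple

Field = Tuple[str, str]  # (field name, type rendered as a string)


def diff_struct_fields(source: Sequence[Field], target: Sequence[Field]) -> List[str]:
    """Return human-readable differences between two ordered field lists."""
    problems = []
    for pos, (src, tgt) in enumerate(zip_longest(source, target)):
        if src is None:
            problems.append(f"position {pos}: missing in source, target has {tgt[0]}: {tgt[1]}")
        elif tgt is None:
            problems.append(f"position {pos}: missing in target, source has {src[0]}: {src[1]}")
        elif src[0] != tgt[0]:
            problems.append(f"position {pos}: name differs, source {src[0]} vs target {tgt[0]}")
        elif src[1] != tgt[1]:
            problems.append(f"position {pos}: type of {src[0]} differs, source {src[1]} vs target {tgt[1]}")
    return problems


# Hypothetical usage against the real objects (assumed API, not run here):
#   def fields_of(struct_type):
#       return [(struct_type.field(i).name, str(struct_type.field(i).type))
#               for i in range(struct_type.num_fields)]
#   source = fields_of(GPS_EXTENDED_SIGNAL_TYPE)
#   target = fields_of(dt.schema().to_pyarrow().field("gps_extended_signal").type)

# Illustrative data only: a type mismatch and a field missing from the source.
source_fields = [("a", "double"), ("b", "string")]
target_fields = [("a", "double"), ("b", "large_string"), ("c", "double")]
for line in diff_struct_fields(source_fields, target_fields):
    print(line)
```

An empty result would suggest the struct itself lines up by name, order, and type, in which case the mismatch may lie in another column or in how the merge casts the source.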

More details:

rob-harrison commented 3 weeks ago

Bumping this. Effectively, it seems we're forced to create a new table in order to modify the schema, which is not ideal.