delta-io / delta-rs

A native Rust library for Delta Lake, with bindings into Python
https://delta-io.github.io/delta-rs/
Apache License 2.0

Delta table expects a "__index_level_0__" column #1698

Closed ThomasMargnac closed 9 months ago

ThomasMargnac commented 9 months ago

Environment

Delta-rs version: 0.10.2

Binding: Python 3.9.17

Environment: local


Bug

What happened:

I am trying to pull new data (which contains text) from a Delta table in my bucket A, apply some transformations to it (removing URLs, removing hashtags, …), and finally load the transformed data into a Delta table in my bucket B. The first time I ran this pipeline, it worked perfectly fine. Then I inserted new data into my Delta table (bucket A). The second time I ran it, the pipeline failed with the following error:

Traceback (most recent call last):
  File "//pipelines/silver_pipeline.py", line 23, in <module>
    main()
  File "//pipelines/silver_pipeline.py", line 18, in main
    load_into_delta_lake(
  File "/etl/load/load_into_silver.py", line 24, in load_into_delta_lake
    write_deltalake(
  File "/usr/local/lib/python3.9/site-packages/deltalake/writer.py", line 180, in write_deltalake
    raise ValueError(
ValueError: Schema of data does not match table schema
Table schema:
x: int64
y: string
z: timestamp[us]
__index_level_0__: int64
-- schema metadata --
pandas: '{"index_columns": ["__index_level_0__"], "column_indexes": [{"na' + 1781
Data Schema:
x: int64
y: string
z: timestamp[us]

Apparently, a column named "__index_level_0__" is required, but it is not a column that I defined.

What you expected to happen:

I expected my transformed data to be stored in my delta table (bucket B) without a problem.

How to reproduce it:

Here is my Python script to reproduce it:

from deltalake import DeltaTable
from deltalake.writer import write_deltalake
from datetime import datetime
import pandas as pd
import os

def extract():
    # Getting credentials
    access_id = os.environ.get("MINIO_ID")
    access_secret = os.environ.get("MINIO_SECRET")
    # Defining properties of Data Lakehouse
    storage_options = {
        "AWS_ACCESS_KEY_ID": str(access_id),
        "AWS_SECRET_ACCESS_KEY": str(access_secret),
        "AWS_REGION": "us-east-1",
        "AWS_ENDPOINT_URL": "http://minio:9000",
        "AWS_STORAGE_ALLOW_HTTP": "TRUE",
        "AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE"
    }
    s3_endpoint = "s3://bronze/data"
    # Reading data from Delta Lake bronze layer
    now = datetime.now()
    data = DeltaTable(
        table_uri=s3_endpoint,
        storage_options=storage_options
    ).to_pandas()
    data['z'] = pd.to_datetime(
        arg=data['z'],
        format="%Y-%m-%d %H:%M:%S.%f"
    )
    # Filtering data
    last_bronze_check_file = "last_bronze_check.txt"
    last_bronze_check = None
    # Getting last time bronze was pulled
    try:
        with open(last_bronze_check_file, "r") as file:
            last_bronze_check = datetime.strptime(file.read(), "%Y-%m-%d %H:%M:%S.%f")
    except FileNotFoundError as error:
        print(error)
    if last_bronze_check is not None:
        data = data[data['z'] > last_bronze_check].copy()
    # Updating last time data was pulled
    with open(last_bronze_check_file, "w") as file:
        file.write(str(now))
    # Returning data
    return data

def load(
    data: pd.DataFrame
):
    if len(data) > 0:
        # Getting credentials
        access_id = os.environ.get("MINIO_ID")
        access_secret = os.environ.get("MINIO_SECRET")
        # Defining properties of Data Lakehouse
        storage_options = {
            "AWS_ACCESS_KEY_ID": str(access_id),
            "AWS_SECRET_ACCESS_KEY": str(access_secret),
            "AWS_REGION": "us-east-1",
            "AWS_ENDPOINT_URL": "http://minio:9000",
            "AWS_STORAGE_ALLOW_HTTP": "TRUE",
            "AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE"
        }
        # Writing data to Delta Lake
        s3_endpoint = "s3://silver/data"
        write_deltalake(
            table_or_uri=s3_endpoint,
            data=data,
            mode='append',
            storage_options=storage_options
        )

if __name__ == "__main__":
    data = extract()
    load(data)

More details:

ion-elgreco commented 9 months ago

This is because pandas has a lovely index... The main issue is that on the first write the index column stayed in the data when it was written to Parquet. Which PyArrow version are you using?

I can see that pa.Table.from_pandas has a preserve_index parameter.

ThomasMargnac commented 9 months ago

I am using PyArrow 12.0.0

titowoche30 commented 9 months ago

Same issue here. @ion-elgreco's suggestion fixed it:

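import pyarrow as pa
from deltalake.writer import write_deltalake

# Drop the pandas index so no "__index_level_0__" column is written
data = pa.Table.from_pandas(data, preserve_index=False)

write_deltalake(
    table_or_uri=s3_endpoint,
    data=data,
    mode='append',
    storage_options=storage_options
)

ThomasMargnac commented 9 months ago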

Thanks a lot @ion-elgreco and @titowoche30, it worked! Afterwards, I had an issue on the first write related to the data schema, but I fixed my data schema in pa.Table.from_pandas and everything works fine now.