@edgBR are you able to build and compile the latest Python bindings from source to see whether that makes it run?
The main branch contains some improvements to the merge operation, which will be in the next Python release.
Hi @ion-elgreco
Where can I find the instructions for compilation?
Do I need Rust installed?
Also, would it be possible for you to reproduce this locally?
@edgBR in CONTRIBUTING.md you should be able to find instructions on how to compile it.
Hi @ion-elgreco
I have cloned the repo and tried to build from https://github.com/delta-io/delta-rs/commit/bca00ae0540a452cc465ab0c3d292fef90d8fda0 (the latest commit).
Unfortunately, I am not able to build it properly.
I am getting the following error:
error: could not compile `arrow-data` (lib)
Caused by:
process didn't exit successfully: `/home/azureuser/.rustup/toolchains/stable-x86_64-unknown-linux-gnu/bin/rustc --crate-name arrow_data --edition=2021 /home/azureuser/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-48.0.1/src/lib.rs --error-format=json --json=diagnostic-rendered-ansi,artifacts,future-incompat --diagnostic-width=225 --crate-type lib --emit=dep-info,metadata,link -C embed-bitcode=no -C debuginfo=2 --cfg 'feature="ffi"' -C metadata=3bd98a31750d62d1 -C extra-filename=-3bd98a31750d62d1 --out-dir /mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps -L dependency=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps --extern arrow_buffer=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps/libarrow_buffer-fa7d385f97e94f64.rmeta --extern arrow_schema=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps/libarrow_schema-75a9e732fa8d2b13.rmeta --extern half=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps/libhalf-c4c9d4c9e1f59e7e.rmeta --extern num=/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-rs/target/debug/deps/libnum-bde8b76108b30fd4.rmeta --cap-lints allow` (signal: 7, SIGBUS: access to undefined memory)
Is there a way you can try to reproduce it?
I will now try to run the same code but using local storage instead of ADLSGen2, to see whether the problem is also reproducible there.
BR E
Hi @ion-elgreco
I have reproduced locally and unfortunately the merge still freezes:
import logging
import os
from datetime import datetime
from io import BytesIO
from zipfile import ZipFile
import pdb

import polars as pl
from polars.io.delta import _convert_pa_schema_to_delta
import requests
from azure.identity import AzureCliCredential
from azure.storage.filedatalake import DataLakeServiceClient
from deltalake import DeltaTable, write_deltalake
from dotenv import load_dotenv

load_dotenv('../.env')

# Set the logging level for all azure-* libraries
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('azure')
logger.setLevel(logging.ERROR)
logger_normal = logging.getLogger(__name__)

DOWNLOAD_URIS = [
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2018_Q4.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q1.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q2.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q3.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2019_Q4.zip",
    "https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2020_Q1.zip",
]

LANDING_ZONE_PATH = os.getenv('LANDING_ZONE_PATH')
ACCOUNT_NAME = os.getenv('STORAGE_ACCOUNT_NAME')
BRONZE_CONTAINER = os.getenv('STAGING_PATH')
SILVER_CONTAINER = os.getenv('HISTORICAL_PATH')
GOLD_CONTAINER = os.getenv('DW_PATH')


class ETLPipeline:
    """
    Mock class that simulates a modulith processing data from landing to bronze, silver and gold.
    It assumes we process one file at a time for simplicity (mostly because
    I did not want to write asyncio calls and deal with the connections in the ADLSGen2 client).
    """

    def __init__(self) -> None:
        self.adlsgen2_client = DataLakeServiceClient(
            account_url=f"https://{ACCOUNT_NAME}.dfs.core.windows.net/", credential=AzureCliCredential()
        )
        self.bronze_client = self.adlsgen2_client.get_directory_client(file_system=LANDING_ZONE_PATH, directory='/')

    def upload_to_landing(self, uri: str):
        """Download a zipped CSV from the given URI and store it locally as parquet.

        Args:
            uri (str): URL of the zip archive to download.
        """
        try:
            response = requests.get(url=uri)
            response.raise_for_status()
            with ZipFile(BytesIO(response.content), 'r') as zip_ref:
                # Assuming there is only one file in the zip archive
                file_name = zip_ref.namelist()[0]
                # Read the CSV file into a polars dataframe
                raw_df = pl.read_csv(zip_ref.open(file_name).read())
                # Save the DataFrame as a parquet file in blob
                file_name = f"{file_name.lower().split('.csv')[0]}.parquet"
                path_name = f"../data/input/{file_name}"
                raw_df.write_parquet(path_name, use_pyarrow=True, compression='lz4')
                # file_client = self.bronze_client.get_file_client(file_name)
                # with open(file=path_name, mode="rb") as data:
                #     file_client.upload_data(data, overwrite=True)
                # logger_normal.info(f"{file_name} uploaded to adlsgen2")
                self.file_name = file_name
        except requests.exceptions.HTTPError as e:
            logger_normal.error(f"Failed to download {uri}. HTTP error occurred: {e}")
        except Exception as e:
            logger_normal.error(f"An error occurred processing {uri}: {e}")

    def raw_to_bronze(self):
        try:
            # storage_options_raw = {"account_name": ACCOUNT_NAME, "anon": False}
            # storage_options_raw_delta = {"account_name": ACCOUNT_NAME, "use_azure_cli": "True"}
            logger_normal.info(f"Reading {self.file_name}")
            # df = pl.read_parquet(
            #     source=f'abfss://{LANDING_ZONE_PATH}/{self.file_name}', storage_options=storage_options_raw
            # )
            df = pl.read_parquet(
                source=f'../data/input/{self.file_name}'
            )
            # df = df.with_columns(insertion_date=datetime.now())
            # df = df.with_columns([(pl.col("start_time").str.to_datetime().dt.strftime("%Y-%m").alias("monthdate"))])
            logger_normal.info(f"Converting {self.file_name} to delta")
            # df.write_delta(
            #     target=f'abfss://{BRONZE_CONTAINER}/',
            #     mode='append',
            #     storage_options=storage_options_raw_delta
            #     # delta_write_options={"partition_by": ['monthdate']}
            # )
            df.write_delta(target="/tmp/bronze",
                           mode='append')
            # bronze_df = DeltaTable(
            #     table_uri=f'abfs://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta)
            # bronze_df.optimize.z_order(['trip_id'])
            # bronze_df = DeltaTable(
            #     table_uri="tmp/bronze")
            # bronze_df.optimize.z_order(['trip_id'])
        except Exception as e:
            logger_normal.error(e)
            logger_normal.error(f"Failed to convert {self.file_name} to delta")

    def bronze_to_silver(self):
        try:
            # storage_options_raw_delta = {"account_name": ACCOUNT_NAME, "use_azure_cli": "True"}
            # source
            # bronze_df = pl.read_delta(source=f'abfss://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta)
            bronze_df = DeltaTable(
                table_uri="/tmp/bronze"
            ).to_pyarrow_table()
            # target
            # silver_check = self._table_checker(container=SILVER_CONTAINER, options=storage_options_raw_delta)
            silver_check = self._table_checker_local(container=SILVER_CONTAINER)
            if silver_check:
                # bronze_df = DeltaTable(
                #     table_uri=f'abfss://{BRONZE_CONTAINER}/', storage_options=storage_options_raw_delta
                # ).to_pyarrow_table()
                logger_normal.info("Merging new data into silver")
                silver_df = DeltaTable(
                    table_uri="/tmp/silver"
                )
                (
                    silver_df.merge(
                        source=bronze_df, predicate="s.trip_id = t.trip_id", source_alias="s", target_alias="t"
                    )
                    .when_matched_update_all()
                    .when_not_matched_insert_all()
                    .execute()
                )
                logger_normal.info("Optimizing by Z order")
                # silver_df.optimize.z_order(['trip_id'])
            else:
                logger_normal.info("Because the silver table is empty, we save the first bronze file as silver")
                # bronze_df.write_delta(
                #     target=f'abfss://{SILVER_CONTAINER}/', mode='append', storage_options=storage_options_raw_delta
                # )
                write_deltalake(
                    table_or_uri="/tmp/silver",
                    data=bronze_df,
                    mode='append')
        except Exception as e:
            logger_normal.error(e)
            logger_normal.error(f"Failed to merge {self.file_name}")

    def silver_to_gold(self):
        return True

    def _table_checker(self, container, options):
        try:
            delta_table = DeltaTable(table_uri=f"abfss://{container}/", storage_options=options)
            logger_normal.info(f"Delta table version is {delta_table.version()}")
            table_exist = True
            logger_normal.info(f"Delta Table Exists in {container}")
        except Exception as e:
            logger_normal.error(e)
            table_exist = False
        return table_exist

    def _table_checker_local(self, container):
        try:
            delta_table = DeltaTable(table_uri=f"/tmp/{container}")
            logger_normal.info(f"Delta table version is {delta_table.version()}")
            table_exist = True
            logger_normal.info(f"Delta Table Exists in {container}")
        except Exception as e:
            logger_normal.error(e)
            table_exist = False
        return table_exist


if __name__ == "__main__":
    etl_workflow = ETLPipeline()
    for uri in DOWNLOAD_URIS:
        etl_workflow.upload_to_landing(uri=uri)
        etl_workflow.raw_to_bronze()
        etl_workflow.bronze_to_silver()
@edgBR that's unfortunate, can you make a minimal reproducible example with fake data?
Hi @ion-elgreco
I am now using the examples from here: https://delta.io/blog/2023-10-22-delta-rs-python-v0.12.0/ and running:
from deltalake import DeltaTable, write_deltalake
from datetime import datetime
import polars as pl
from polars.io.delta import _convert_pa_schema_to_delta


def execute():
    df = pl.DataFrame(
        {
            "sales_order_id": ["1000", "1001", "1002", "1003"],
            "product": ["bike", "scooter", "car", "motorcycle"],
            "order_date": [
                datetime(2023, 1, 1),
                datetime(2023, 1, 5),
                datetime(2023, 1, 10),
                datetime(2023, 2, 1),
            ],
            "sales_price": [120.25, 2400, 32000, 9000],
            "paid_by_customer": [True, False, False, True],
        }
    )
    print(df)
    df.write_delta("sales_orders", mode="append")

    new_data = pl.DataFrame(
        {
            "sales_order_id": ["1002", "1004"],
            "product": ["car", "car"],
            "order_date": [datetime(2023, 1, 10), datetime(2023, 2, 5)],
            "sales_price": [30000.0, 40000.0],
            "paid_by_customer": [True, True],
        }
    )

    dt = DeltaTable("sales_orders")
    source = new_data.to_arrow()
    delta_schema = _convert_pa_schema_to_delta(source.schema)
    source = source.cast(delta_schema)

    (
        dt.merge(
            source=source,
            predicate="s.sales_order_id = t.sales_order_id",
            source_alias="s",
            target_alias="t",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )


if __name__ == "__main__":
    execute()
Error is now:
Traceback (most recent call last):
File "/anaconda/lib/python3.10/runpy.py", line 196, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/anaconda/lib/python3.10/runpy.py", line 86, in _run_code
exec(code, run_globals)
File "/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-lake-examples/code/minimal.py", line 56, in <module>
execute()
File "/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-lake-examples/code/minimal.py", line 25, in execute
df.write_delta("sales_orders", mode="append")
File "/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-lake-examples/.venv/lib/python3.10/site-packages/polars/dataframe/frame.py", line 3655, in write_delta
write_deltalake(
File "/mnt/batch/tasks/shared/LS_root/mounts/clusters/poetryebrhpc/code/Users/edgar.bahilo/delta-lake-examples/.venv/lib/python3.10/site-packages/deltalake/writer.py", line 372, in write_deltalake
_write_new_deltalake(
OSError: Generic DeltaLocalObjectStore error: Operation not supported (os error 95)
@edgBR where are you writing to? It looks like mounted storage. Ah, it is indeed mounted, judging by your file path, and it also seems to be Azure ML, right? :)
Writing to mounted storage is not supported. See here: https://github.com/delta-io/delta-rs/issues/1765
Someone has a fix open to allow these illegal write operations; however, I would still suggest writing directly to the cloud storage and not to the mounted storage, for example as in the sketch below.
Did you also run the merge operation against mounted storage?
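For what it's worth, here is a rough sketch of what writing straight to ADLS Gen2 could look like. The account name and container below are placeholders, and the Azure CLI credential option mirrors the storage_options already commented out in your script:

import polars as pl
from deltalake import DeltaTable

# Placeholders: swap in your real storage account and container names.
storage_options = {"account_name": "<storage-account>", "use_azure_cli": "true"}

df = pl.DataFrame({"trip_id": [1, 2], "duration": [300, 450]})

# Append directly to the cloud table URI instead of the /mnt/... mounted path.
df.write_delta(
    target="abfss://<container>/bronze",
    mode="append",
    storage_options=storage_options,
)

# The merge can also target the cloud URI directly.
dt = DeltaTable("abfss://<container>/bronze", storage_options=storage_options)
print(dt.version())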
Hi,
As I am using AML (Azure Machine Learning), the folders are mounted on something similar to NFS storage.
When switching the paths to /tmp/ I now get another error:
metrics = self.table._table.merge_execute(
_internal.DeltaError: Generic DeltaTable error: Execution error: Fail to build join indices in NestedLoopJoinExec, error:Arrow error: Invalid argument error: Invalid comparison operation: LargeUtf8 == Utf8
@edgBR with which deltalake version? With version 0.14, I can run this fine.
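If it helps to confirm which version is actually installed in the environment, a quick check could look like this (standard library only, nothing deltalake-specific):

from importlib.metadata import version

# Print the installed deltalake package version; per the discussion above,
# 0.13 hits this merge error while 0.14 does not.
print(version("deltalake"))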
Hi @ion-elgreco
As mentioned in the .toml, I was using 0.13.
I have upgraded to 0.14 and this seems to work. Just as a side note, version 0.14 was published on PyPI 7 hours ago and is not yet in the GitHub releases.
Now this seems to work with local storage. I will update tomorrow about the ADLSGen2 tests.
Yes, there were some build failures with 0.14, so not everything got released to PyPI and the docs are not updated yet. I am looking into that now.
Hi @ion-elgreco
It seems that everything is working now.
I think we can close this now.
Environment
Delta-rs version: 0.13.0
Binding: Python bindings
Environment:
Bug
What happened:
Hi,
I'm trying to ramp up the team's knowledge of transactional data lakes. I have used Delta before with PySpark with reasonable success, but in my current company we do not yet have a use case for Spark (we are not processing even a TB of data for our projects) and Polars/DuckDB seem a good compromise. I have created the following demo code, which performs the following:
The code is as follows:
When running the code, the merge does not seem to work.
I am aware that I am not using bloom filters or partitions, but I am running this on a pretty beefy machine (112 GB of RAM and 16 cores). Is this normal?
When looking at the storage account I am only able to see the first file, and no additional commits seem to be running.
What you expected to happen:
I would expect either a warning or something that indicates why this is working so slowly.
BR E