dlt-hub / dlt

data load tool (dlt) is an open source Python library that makes data loading easy 🛠️
https://dlthub.com/docs
Apache License 2.0

Error sending Delta files to Azure Gen2 Storage if over a certain size. #2086

Open pwr-philarmstrong opened 2 days ago

pwr-philarmstrong commented 2 days ago

dlt version

1.3.0

Describe the problem

I have a pipeline that copies a table from SQL Server to Azure Gen2 storage. It creates Delta files and works fine while the parquet files are small; when they get larger, the upload fails and goes into a retry loop.

In the Azure storage logs I can see error details like this:

StatusCode         500
StatusText         InternalError
DurationMs         29870
ServerLatencyMs    379
Uri                https://playgrounddatapstorage.blob.core.windows.net:443/bronze/risk360_us_sql_server_test/elapsedtime/part-00001-151d1409-68ce-4e58-8065-f6c535dfdd38-c000.snappy.parquet
ServiceType        blob
ObjectKey          /playgrounddatapstorage/bronze/risk360_us_sql_server_test/elapsedtime/part-00001-151d1409-68ce-4e58-8065-f6c535dfdd38-c000.snappy.parquet
RequestHeaderSize  452
RequestBodySize    58179144
ResponseHeaderSize 173
Category           StorageWrite
TlsVersion         TLS 1.2
MetricResponseType NetworkError
SourceAccessTier   Invalid

The pipeline part looks like this:

# Create a dlt pipeline object
pipeline = dlt.pipeline(
    pipeline_name="client_tables_to_delta_az",
    destination=filesystem(bucket_url="az://bronze/"),  # abfss
    dataset_name=folder_dataset_name,
)

# Run the pipeline
load_info = pipeline.run(
    all_data(connection_string, database_item),
    write_disposition="append",
    table_format="delta",
)

And this is a chunk of the log file around where it fails, though it only shows that it's waiting and that it fails after 5 retries.

2024-11-21 17:34:11 - azure.core.pipeline.policies.http_logging_policy - INFO - Request URL: 'https://playgrounddatapstorage.blob.core.windows.net/bronze/risk360_us_sql_server_test/init'
Request method: 'DELETE'
Request headers:
    'x-ms-version': 'REDACTED'
    'Accept': 'application/xml'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'd22291ad-a82e-11ef-9aac-8c3b4a05b5bc'
    'User-Agent': 'azsdk-python-storage-blob/12.23.1 Python/3.12.6 (Windows-11-10.0.22631-SP0)'
    'Authorization': 'REDACTED'
No body was attached to the request
2024-11-21 17:34:12 - azure.core.pipeline.policies.http_logging_policy - INFO - Response status: 202
Response headers:
    'Content-Length': '0'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': '6b951afc-201e-0061-763b-3cd464000000'
    'x-ms-client-request-id': 'd22291ad-a82e-11ef-9aac-8c3b4a05b5bc'
    'x-ms-version': 'REDACTED'
    'x-ms-delete-type-permanent': 'REDACTED'
    'x-ms-deletion-id': 'REDACTED'
    'Date': 'Thu, 21 Nov 2024 17:34:11 GMT'
2024-11-21 17:34:12 - azure.core.pipeline.policies.http_logging_policy - INFO - Request URL: 'https://playgrounddatapstorage.blob.core.windows.net/bronze/risk360_us_sql_server_test/init?comp=REDACTED'
Request method: 'PUT'
Request headers:
    'Content-Length': '52'
    'x-ms-meta-is_directory': 'REDACTED'
    'x-ms-version': 'REDACTED'
    'Content-Type': 'application/xml'
    'Accept': 'application/xml'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'd237588e-a82e-11ef-9e2e-8c3b4a05b5bc'
    'User-Agent': 'azsdk-python-storage-blob/12.23.1 Python/3.12.6 (Windows-11-10.0.22631-SP0)'
    'Authorization': 'REDACTED'
A body is sent with the request
2024-11-21 17:34:12 - azure.core.pipeline.policies.http_logging_policy - INFO - Response status: 201
Response headers:
    'Content-Length': '0'
    'Last-Modified': 'Thu, 21 Nov 2024 17:34:12 GMT'
    'Etag': '"0x8DD0A52B69D7BFC"'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': '6b951b43-201e-0061-333b-3cd464000000'
    'x-ms-client-request-id': 'd237588e-a82e-11ef-9e2e-8c3b4a05b5bc'
    'x-ms-version': 'REDACTED'
    'x-ms-content-crc64': 'REDACTED'
    'x-ms-request-server-encrypted': 'REDACTED'
    'Date': 'Thu, 21 Nov 2024 17:34:11 GMT'
2024-11-21 17:34:12,193|[INFO]|54140|53952|dlt|load.py|resume_started_jobs:291|Found 0 that are already started and should be continued
2024-11-21 17:34:12,193|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 0 for 1732210401.2547312
2024-11-21 17:34:12,195|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 2, creating jobs
2024-11-21 17:34:12,196|[INFO]|54140|53952|dlt|load.py|submit_job:169|Will load file 1732210401.2547312\new_jobs\elapsedtime.ea8c3353ec.0.parquet with table name elapsedtime
2024-11-21 17:34:12,669|[INFO]|54140|53952|dlt|load.py|submit_job:169|Will load file 1732210401.2547312\new_jobs\_dlt_pipeline_state.4857574e24.0.jsonl with table name _dlt_pipeline_state
2024-11-21 17:34:13,686|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 2 for 1732210401.2547312
2024-11-21 17:34:13,687|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.parquet
2024-11-21 17:34:13,694|[INFO]|54140|53952|dlt|load.py|create_followup_jobs:369|Job elapsedtime.ea8c3353ec.parquet CREATED a new FOLLOWUP JOB C:\Users\PHILAR~1\AppData\Local\Temp\elapsedtime.ea8c3353ec.0.reference placed in new_jobs
2024-11-21 17:34:13,696|[INFO]|54140|53952|dlt|load.py|complete_jobs:459|Job for elapsedtime.ea8c3353ec.parquet completed in load 1732210401.2547312
--------------------- Load all_data in 1732210401.2547312 ----------------------
Jobs: 1/2 (50.0%) | Time: 4.19s | Rate: 0.24/s
Memory usage: 266.70 MB (83.70%) | CPU usage: 0.00%

2024-11-21 17:34:13,717|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job _dlt_pipeline_state.4857574e24.jsonl
2024-11-21 17:34:13,719|[INFO]|54140|53952|dlt|load.py|complete_jobs:459|Job for _dlt_pipeline_state.4857574e24.jsonl completed in load 1732210401.2547312
2024-11-21 17:34:13,720|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 1, creating jobs
2024-11-21 17:34:13,721|[INFO]|54140|53952|dlt|load.py|submit_job:169|Will load file 1732210401.2547312\new_jobs\elapsedtime.ea8c3353ec.0.reference with table name elapsedtime
2024-11-21 17:34:13,751|[INFO]|54140|53632|dlt|filesystem.py|run:138|Will copy file(s) ['C:\\Users\\PhilArmstrong\\.dlt\\pipelines\\client_tables_to_delta_az\\load\\normalized\\1732210401.2547312\\completed_jobs\\elapsedtime.ea8c3353ec.0.parquet'] to delta table az://bronze/risk360_us_sql_server_test/elapsedtime [arrow buffer: 0]
2024-11-21 17:34:14,764|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:34:14,765|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:34:14,766|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:34:14,768|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:34:15,777|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:34:15,778|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:34:15,778|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:34:15,779|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:34:16,790|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:34:16,791|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:34:16,792|[DEBUG]|54140|53952|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:34:16,793|[INFO]|54140|53952|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:34:17,802|[INFO]|54140|53952|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:44:33,778|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:44:33,780|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:44:33,781|[INFO]|36652|42288|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:44:34,788|[INFO]|36652|42288|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:44:34,789|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:44:34,790|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:44:34,792|[INFO]|36652|42288|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:44:35,586|[ERROR]|36652|17540|dlt|load.py|w_run_job:248|Transient exception in job elapsedtime.ea8c3353ec.reference in file C:\Users\PhilArmstrong\.dlt\pipelines\client_tables_to_delta_az\load\normalized\1732210401.2547312\started_jobs\elapsedtime.ea8c3353ec.1.reference
Traceback (most recent call last):
  File "C:\Users\PhilArmstrong\OneDrive - POWWR\Dev\venv\Lib\site-packages\dlt\common\destination\reference.py", line 420, in run_managed
    self.run()
  File "C:\Users\PhilArmstrong\OneDrive - POWWR\Dev\venv\Lib\site-packages\dlt\destinations\impl\filesystem\filesystem.py", line 161, in run
    write_delta_table(
  File "C:\Users\PhilArmstrong\OneDrive - POWWR\Dev\venv\Lib\site-packages\dlt\common\libs\deltalake.py", line 88, in write_delta_table
    write_deltalake(  # type: ignore[call-overload]
  File "C:\Users\PhilArmstrong\OneDrive - POWWR\Dev\venv\Lib\site-packages\deltalake\writer.py", line 323, in write_deltalake
    write_deltalake_rust(
OSError: Generic MicrosoftAzure error: Error after 5 retries in 180.8821565s, max_retries:10, retry_timeout:180s, source:error sending request for url (https://playgrounddatapstorage.blob.core.windows.net/bronze/risk360_us_sql_server_test/elapsedtime/part-00001-36118c25-ac6d-43b3-aad3-90a6131126f2-c000.snappy.parquet)
2024-11-21 17:44:35,801|[INFO]|36652|42288|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:44:35,802|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:44:35,805|[WARNING]|36652|42288|dlt|load.py|complete_jobs:430|Job for elapsedtime.ea8c3353ec.reference retried in load 1732210401.2547312 with message Generic MicrosoftAzure error: Error after 5 retries in 180.8821565s, max_retries:10, retry_timeout:180s, source:error sending request for url (https://playgrounddatapstorage.blob.core.windows.net/bronze/risk360_us_sql_server_test/elapsedtime/part-00001-36118c25-ac6d-43b3-aad3-90a6131126f2-c000.snappy.parquet)
2024-11-21 17:44:35,805|[INFO]|36652|42288|dlt|load.py|start_new_jobs:274|Will load additional 1, creating jobs
2024-11-21 17:44:35,806|[INFO]|36652|42288|dlt|load.py|submit_job:169|Will load file 1732210401.2547312\new_jobs\elapsedtime.ea8c3353ec.2.reference with table name elapsedtime
2024-11-21 17:44:35,852|[INFO]|36652|17540|dlt|filesystem.py|run:138|Will copy file(s) ['C:\\Users\\PhilArmstrong\\.dlt\\pipelines\\client_tables_to_delta_az\\load\\normalized\\1732210401.2547312\\completed_jobs\\elapsedtime.ea8c3353ec.0.parquet'] to delta table az://bronze/risk360_us_sql_server_test/elapsedtime [arrow buffer: 0]
2024-11-21 17:44:36,870|[INFO]|36652|42288|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312
2024-11-21 17:44:36,924|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:393|Checking state for job elapsedtime.ea8c3353ec.reference
2024-11-21 17:44:36,928|[DEBUG]|36652|42288|dlt|load.py|complete_jobs:397|job elapsedtime.ea8c3353ec.reference still running
2024-11-21 17:44:36,947|[INFO]|36652|42288|dlt|load.py|start_new_jobs:274|Will load additional 0, creating jobs
2024-11-21 17:44:37,962|[INFO]|36652|42288|dlt|load.py|complete_jobs:390|Will complete 1 for 1732210401.2547312

Expected behavior

I would expect the files either to be successfully sent to Azure storage as a single file (as with the smaller files) or to be broken into blocks.
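For context on the "broken into blocks" expectation: Azure block blobs are assembled from staged chunks (Put Block requests) that are then committed with a Put Block List. A minimal stdlib sketch of how the failing 58 MB body could be split into the 4 MiB chunks seen later in this report (the block size and ID scheme here are illustrative assumptions, not what dlt or deltalake actually do):

```python
import base64
import math

# Assumed chunk size, matching the RequestBodySize of 4194304 seen in the
# working azure.storage.blob upload logs further down in this report.
BLOCK_SIZE = 4 * 1024 * 1024

def plan_blocks(total_bytes: int, block_size: int = BLOCK_SIZE):
    """Return (block_id, offset, length) tuples for a chunked block-blob upload."""
    blocks = []
    for i in range(math.ceil(total_bytes / block_size)):
        offset = i * block_size
        length = min(block_size, total_bytes - offset)
        # Azure block IDs must be base64-encoded and the same length within a blob
        block_id = base64.b64encode(f"{i:032d}".encode()).decode()
        blocks.append((block_id, offset, length))
    return blocks

# RequestBodySize of the failing single-shot request above
blocks = plan_blocks(58179144)
print(len(blocks), blocks[-1][2])  # -> 14 3653192
```

The 58 MB body that fails as one request would fit comfortably in 14 staged blocks, well under Azure's 50,000-block limit per blob.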

Steps to reproduce

I've managed to create some code that reproduces the issue. It works if the file is sent as parquet, but if the format is delta it fails.

import dlt
from dlt.destinations import filesystem as fs_destination
import pandas as pd
import numpy as np

dlt.config["normalize.data_writer.file_max_bytes"] = 5000000
dlt.config["runtime.log_level"] = "DEBUG"
dlt.config["progress"] = "log"

# Generate a dataset of ~100MB
def generate_data(num_rows=4000000):
    data = {
        'id': np.arange(num_rows),
        'value': np.random.rand(num_rows),
        'timestamp': pd.date_range(start='1/1/2022', periods=num_rows, freq='min')
    }
    return pd.DataFrame(data)

@dlt.resource(table_name='data3')
def reader():
    yield generate_data()

# Define the destination (Azure Blob Storage)
mydestination = fs_destination(
    bucket_url='az://bronze/',
    #credentials={
    #    "account_name": "your-azure-account-name",
    #    "account_key": "your-azure-account-key",
    #},
)

# Create a pipeline
pipeline = dlt.pipeline(
    pipeline_name="dummy_to_delta",
    dataset_name="dummy_to_delta",
    destination=mydestination,
)

info = pipeline.run(
    reader,
    #loader_file_format="parquet",  # sending plain parquet works
    write_disposition="append",
    table_format="delta",
)
print(info)

Operating system

Windows

Runtime environment

Local

Python version

3.11

dlt data source

Microsoft SQL Server, but the problem also happens with a DataFrame data source

dlt destination

Filesystem & buckets

Other deployment details

No response

Additional information


Looking at the Azure logs for smaller files, they look like this:

StatusCode         201
StatusText         Success
DurationMs         10
ServerLatencyMs    10
Uri                https://playgrounddatapstorage2.blob.core.windows.net:443/bronze/risk360_us_sql_server/customercountlog/database_name=Dfsc_Clearview/year=2028/part-00001-38473a92-4ebc-4991-bb54-381028d88834-c000.snappy.parquet
ServiceType        blob
ObjectKey          /playgrounddatapstorage2/bronze/risk360_us_sql_server/customercountlog/database_name=Dfsc_Clearview/year=2028/part-00001-38473a92-4ebc-4991-bb54-381028d88834-c000.snappy.parquet
RequestHeaderSize  489
RequestBodySize    4581
ResponseHeaderSize 279
Category           StorageRead
TlsVersion         TLS 1.2
MetricResponseType Success
SourceAccessTier   Invalid
SourceSystem       Azure

I also tried sending the larger parquet file using a standalone Python script and the azure.storage.blob package:

from azure.storage.blob import BlobServiceClient

# Create a BlobServiceClient
blob_service_client = BlobServiceClient(
    account_url="https://playgrounddatapstorage.blob.core.windows.net/",
    credential="Mykey")

# Upload a large file
blob_client = blob_service_client.get_blob_client(container="bronze", blob="test_load_large_file.parquet")
with open("C:/data/dlt/load_data_delta/dbo_elapsedtime/risk360_us_sql_server/elapsedtime/database_name=PJM_Settlements/year=2018/part-00001-8d376a4b-848b-4f27-84bc-34b70f9999b7-c000.snappy.parquet", "rb") as data:
    blob_client.upload_blob(data, overwrite=True)

This worked fine and appeared to send the file in blocks; the logs for one block look like this:

StatusCode         201
StatusText         Success
DurationMs         6812
ServerLatencyMs    205
Uri                https://playgrounddatapstorage.blob.core.windows.net:443/bronze/test_load_large_file.parquet?comp=block&blockid=TURBd01EQXdNREF3TURBd01EQXdNREF3TURBd01EQXdNakE1TnpFMU1qQSUzRA%3D%3D
ServiceType        blob
ObjectKey          /playgrounddatapstorage/bronze/test_load_large_file.parquet
RequestHeaderSize  652
RequestBodySize    4194304
ResponseHeaderSize 237
Category           StorageWrite
TlsVersion         TLS 1.3
MetricResponseType Success
SourceAccessTier   Invalid
SourceSystem       Azure

I was also able to send plain parquet files to Azure without an issue; however, Delta seems to create larger parquet files.

I also tried adjusting a number of the dlt config items, e.g.:

# Set the DLT configuration
dlt.config["buffer_max_items"] = 25000
dlt.config["sources.sql_database.buffer_max_items"] = 25000
dlt.config["extract.workers"] = 1
dlt.config["normalize.workers"] = 1
dlt.config["load.workers"] = 1
dlt.config["normalize.data_writer.file_max_bytes"] = 5000000
dlt.config["data_writer.file_max_bytes"] = 10000
dlt.config["sources.data_writer.file_max_bytes"] = 10000
dlt.config["data_writer.file_max_items"] = 15000
dlt.config["file_max_items"] = 15000
dlt.config["sources.data_writer.file_max_items"] = 25000
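The same knobs can also live in dlt's `.dlt/config.toml` instead of being set in code. A hedged sketch of the equivalent file (section names follow dlt's config layout; the values are the ones tried above, not recommendations):

```toml
[extract]
workers = 1

[normalize]
workers = 1

[normalize.data_writer]
file_max_bytes = 5000000

[load]
workers = 1
```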
rudolfix commented 2 days ago

I think that your problem is related to this: https://github.com/dlt-hub/dlt/issues/2030 which in turn unfortunately waits for a delta "bug" to be fixed.

we'll probably merge #2030 to give our users a workaround (merge many files instead of a single big dataset)

pwr-philarmstrong commented 2 days ago

OK, I'm not sure of the details of that issue. It's worth pointing out that the file is created correctly in the normalized completed_jobs folder, but when it sends to Azure it tries to do it in a single block. If I use parquet and not delta, the file is sent using small blocks. Also, Delta files are created fine on a local filesystem; it's only when I target Azure that it fails. Not sure if other cloud services have a similar issue.