dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

dagster doesn't properly release locked duckdb with partitioned asset #19068

Open Sage0614 opened 7 months ago

Sage0614 commented 7 months ago

Dagster version

1.5.14

What's the issue?

I use dagster_duckdb_polars for a partitioned asset with a time-window partition. When I try to backfill the whole history, some partitions fail with "Could not set lock on file "db.db": Resource temporarily unavailable".

I suspect the problem is that dagster doesn't release the table lock properly, because:

  1. not all partitions fail: I have around 120 partitions, around 20 fail if I backfill all of them, around 3 fail when I backfill the failed partitions again, and after 3-4 retries all partitions succeed;
  2. which partitions fail is quite random;
  3. I tried backfill_policy=BackfillPolicy(max_partitions_per_run=1) for the asset, and in the job definition I set {"execution": {"config": {"multiprocess": {"max_concurrent": 1}}}}; neither relieves the issue.
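
For reference, the executor setting from item 3 can be written out as a plain Python dict. This is only a sketch of the config shape; the helper `max_concurrent` is a hypothetical name used here to read the value back. It is an assumption, not something confirmed in this thread, that this setting limits step concurrency inside one run only, so separate backfill runs might still overlap.

```python
# Executor config limiting a single run to one step process at a time.
run_config = {
    "execution": {
        "config": {
            "multiprocess": {"max_concurrent": 1},
        }
    }
}


def max_concurrent(config: dict) -> int:
    """Read the multiprocess concurrency limit back out of a run config."""
    return config["execution"]["config"]["multiprocess"]["max_concurrent"]
```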

What did you expect to happen?

I should be able to write partitioned data to duckdb without the table becoming locked, at least in sequential mode (max_concurrent = 1).

How to reproduce?

Create some large upstream data and write it to a duckdb table partitioned by time. In my case I have 120 monthly partitions and the upstream data has around 100 million records.

Deployment type

Local

Deployment details

No response

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

basaran commented 3 months ago

I ran into the same issue in the sample project. If I rerun it, it passes.

2024-05-04 20:35:43 -0400 - dagster - ERROR - __ASSET_JOB - 2fb83681-ffb8-4cdb-bf98-7e43645a9525 - 1027389 - manhattan_stats - STEP_FAILURE - Execution of step "manhattan_stats" failed.

dagster._core.errors.DagsterExecutionStepExecutionError: Error occurred while executing op "manhattan_stats"::

duckdb.duckdb.IOException: IO Error: Could not set lock on file "/home/michael/projects/dagster/dagster_university/data/staging/data.duckdb": Conflicting lock is held in /usr/bin/python3.11 (PID 1027395) by user michael. See also https://duckdb.org/docs/connect/concurrency

Stack Trace:
  File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 54, in op_execution_error_boundary
    yield
  File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 463, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
  File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 131, in _coerce_op_compute_fn_to_iterator
    result = invoke_compute_fn(
             ^^^^^^^^^^^^^^^^^^
  File "/home/michael/projects/dagster/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/compute_generator.py", line 125, in invoke_compute_fn
    return fn(context, **args_to_pass) if context_arg_provided else fn(**args_to_pass)
                                                                    ^^^^^^^^^^^^^^^^^^
  File "/home/michael/projects/dagster/dagster_university/dagster_university/assets/metrics.py", line 29, in manhattan_stats
    conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
basaran commented 3 months ago

I was able to work around this with a custom connect function wrapping retry logic:

import logging
import time

import duckdb


def connect_to_duckdb(database, max_retries=10, retry_delay=2):
    """
    Attempts to connect to a DuckDB database with a retry mechanism.
    Args:
        database (str): Path to the DuckDB database file.
        max_retries (int): Maximum number of connection attempts before giving up.
        retry_delay (int): Time to wait between attempts (in seconds).

    Returns:
        duckdb.DuckDBPyConnection or None: Returns a DuckDB connection if successful, None otherwise.
    """
    retry_count = 0
    connection = None

    while retry_count < max_retries:
        try:
            connection = duckdb.connect(database)
            logging.info("Connected to DuckDB successfully!")
            break  # Exit the loop on successful connection
        except Exception as e:
            # Typically the "Could not set lock on file" IOException from another process
            logging.error(
                f"Connection attempt failed: {e}. Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)  # Wait before retrying
            retry_count += 1  # Increment the retry count

    if connection is None:
        logging.error("Failed to connect to DuckDB after maximum retries.")

    return connection  # Return the established connection or None if failed
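
A variation on the retry idea above, as a minimal sketch rather than a tested fix: it backs off exponentially between attempts and raises on failure instead of returning None, so callers cannot accidentally call `.execute` on None. The names `connect_with_retry`, `base_delay`, `max_delay`, and the injectable `connect` and `sleep` parameters are all hypothetical and not part of dagster or duckdb.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def connect_with_retry(
    connect: Callable[[], T],
    max_retries: int = 10,
    base_delay: float = 0.5,
    max_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call connect() until it succeeds, doubling the wait after each failure."""
    last_error: Optional[Exception] = None
    for attempt in range(max_retries):
        try:
            return connect()
        except Exception as exc:  # in real use, narrow this to the lock error
            last_error = exc
            if attempt < max_retries - 1:
                # 0.5s, 1s, 2s, ... capped at max_delay
                sleep(min(max_delay, base_delay * 2 ** attempt))
    raise RuntimeError(
        f"could not connect after {max_retries} attempts"
    ) from last_error
```

With duckdb installed, it could be called as `connect_with_retry(lambda: duckdb.connect(path))`, where `path` is the database file.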

It seems similar issues were reported in the past too, for example #18746.

nobelsmith commented 2 months ago

@basaran I am relatively new to both duckdb and dagster. I was wondering if the code that you shared is a helper function or some sort of custom dagster resource? Also, are you using this code snippet instead of importing DuckDBResource? Thanks for any help!

nobelsmith commented 2 months ago

I tried implementing your code snippet as part of a custom duckdb resource using the ConfigurableResource class, but it didn't solve it for me. I still ran into the same problem with partitions. Is this something that will potentially be patched soon, or should I look at using a technology other than duckdb?

garethbrickman commented 2 months ago

@nobelsmith There was an attempt in this PR https://github.com/dagster-io/dagster/pull/18873 but it seems to have stalled. If you'd like, you can try picking up the thread there.

basaran commented 2 months ago

@nobelsmith, I just use the connect_to_duckdb function above in an asset, such as:

@asset(
    deps=["taxi_zones_file"]
)
def taxi_zones(context):
    sql_query = f"""
        create or replace table zones as (
            select
                LocationID as zone_id,
                zone,
                borough,
                the_geom as geometry
        from '{constants.TAXI_ZONES_FILE_PATH}'
        );
    """
    # conn = duckdb.connect(os.getenv("DUCKDB_DATABASE"))
    # connect_to_duckdb takes the database path as its first argument
    conn = connect_to_duckdb(os.getenv("DUCKDB_DATABASE"))
    conn.execute(sql_query)
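
Neither snippet in this thread closes its connection. It is an assumption, not something confirmed here, that a lingering connection contributes to the lock conflicts, but closing the connection as soon as the statement finishes is cheap to try. A minimal sketch, where `run_and_release` and the injectable `connect` callable are hypothetical names:

```python
from contextlib import closing
from typing import Any, Callable


def run_and_release(connect: Callable[[], Any], sql: str) -> None:
    """Open a connection, run one statement, and always close the connection."""
    # closing() calls conn.close() on exit, even if execute() raises.
    with closing(connect()) as conn:
        conn.execute(sql)
```

Inside an asset this might look like `run_and_release(lambda: connect_to_duckdb(os.getenv("DUCKDB_DATABASE")), sql_query)`.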