dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

integration-embedded-elt - Asset Materialisation Fails To Complete #21051

Open tomwalls opened 3 months ago

tomwalls commented 3 months ago

Dagster version

dagster, version 1.6.13

What's the issue?

I have my Dagster project set up to move CSV files from a local folder on my machine into a MotherDuck instance, using integration-embedded-elt (the Sling integration).

When I configure Sling in sling_replication.yaml to process all files in the folder and merge them into one table in MotherDuck, the data from all my CSV files is loaded into MotherDuck, but the asset materialisation in Dagster never finishes; it just stays stuck "in progress".

When I configure Sling in sling_replication.yaml to process each file in the folder separately and create one table per file in MotherDuck, the data from all my CSV files is loaded (one table per file) and the asset materialisation completes successfully in Dagster.
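For comparison, the two layouts can be written as Python dictionaries that mirror the YAML. Dagster's `sling_assets` is expected to accept a mapping as `replication_config` as well as a file path (check this against your installed `dagster-embedded-elt` version). This is a sketch: the per-file config was not posted in this issue, so the wildcard stream `file://data/raw/*.csv`, which Sling is assumed to expand into one stream (and so one table) per file, is an assumption.

```python
from copy import deepcopy

# Settings shared by both layouts, copied from the sling_replication.yaml in this issue.
BASE = {
    "source": "local://",
    "target": "MOTHERDUCK",
    "defaults": {
        "mode": "full-refresh",
        "object": "raw_data.{stream_file_folder}_{stream_file_name}",
    },
}
SOURCE_OPTIONS = {"format": "csv", "transforms": ["remove_accents"]}


def merged_config() -> dict:
    """Folder stream: all CSVs under data/raw/ go into one table (the case that hangs)."""
    cfg = deepcopy(BASE)
    cfg["streams"] = {"file://data/raw/": {"source_options": deepcopy(SOURCE_OPTIONS)}}
    return cfg


def per_file_config() -> dict:
    """Wildcard stream (assumed syntax): one stream, and so one table, per matching file."""
    cfg = deepcopy(BASE)
    cfg["streams"] = {"file://data/raw/*.csv": {"source_options": deepcopy(SOURCE_OPTIONS)}}
    return cfg
```

Either dict could then be passed as `@sling_assets(replication_config=merged_config())`; only the stream key differs between the two.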

What did you expect to happen?

When sling_replication.yaml is configured to load all files into a single table in my warehouse, the Dagster job should finish and the asset should be materialised, instead of staying stuck in "Running".

How to reproduce?

Set up a single Sling asset that moves all files from a local folder into one warehouse table:

# sling_replication.yaml

source: 'local://'
target: MOTHERDUCK

# default config options which apply to all streams
defaults:
  mode: full-refresh
  object: 'raw_data.{stream_file_folder}_{stream_file_name}'

streams:
  # a folder stream: all files under it will be merged into one table
  "file://data/raw/":
    source_options:
      format: csv
      transforms: [remove_accents] # Apply transforms. Here we are removing diacritics (accents) from string values.

# assets.py

from dagster import file_relative_path
from dagster_embedded_elt.sling import (
    DagsterSlingTranslator,
    SlingResource,
    sling_assets,
)

replication_config = file_relative_path(__file__, "../sling_replication.yaml")

@sling_assets(replication_config=replication_config)
def my_assets(sling: SlingResource):
    yield from sling.replicate(
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
    )

# __init__.py

from dagster import Definitions, load_assets_from_modules, EnvVar
from dagster_embedded_elt.sling.resources import (
    SlingConnectionResource,
    SlingResource,
)
from . import assets

all_assets = load_assets_from_modules([assets])

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="MOTHERDUCK",
            type="motherduck",
            database="my_db",
            motherduck_token=EnvVar("MOTHERDUCK_TOKEN"),
            duckdb_version="0.9.2"
        ),
    ]
)

defs = Definitions(
    assets=all_assets,
    resources={
        "sling": sling_resource,
    },
)

Deployment type

Local

Deployment details

No response

Additional information

Using macOS as my local machine.
Files/data are very small: 8 CSV files, less than 1 MB in total.
The table is created in the MotherDuck warehouse with "_tmp" at the end of its name.
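The "_tmp" suffix may be a useful clue. A hedged reading, not confirmed anywhere in this thread: if Sling's full-refresh mode loads into a temporary `<table>_tmp` and swaps it into place at the end, a leftover `_tmp` table would mean the run stalled before that final step. The sketch below only works out which table names to look for. `resolve_object` and `temp_table_name` are hypothetical helpers; their substitution rules are inferred from the `main.data_raw` table in the maintainer's reproduction further down, not taken from Sling's source.

```python
from pathlib import PurePosixPath


def resolve_object(template: str, stream: str) -> str:
    """Fill {stream_file_folder} and {stream_file_name} for a local file stream.

    Hypothetical approximation of Sling's runtime variables: the folder is the
    parent directory's name and the file name drops any extension.
    """
    # "file://data/raw/" -> PurePosixPath("data/raw")
    path = PurePosixPath(stream.removeprefix("file://").rstrip("/"))
    return template.format(stream_file_folder=path.parent.name, stream_file_name=path.stem)


def temp_table_name(table: str) -> str:
    """Name of the staging table, going by the "_tmp" suffix reported in this issue."""
    return f"{table}_tmp"
```

With the YAML from this issue, `temp_table_name(resolve_object("raw_data.{stream_file_folder}_{stream_file_name}", "file://data/raw/"))` gives `raw_data.data_raw_tmp`.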

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

sotte commented 2 months ago

I have the same issue, but with a local DuckDB database instead of MotherDuck, and almost the same code.

duckdb 0.10, dagster 1.7.2

cmpadden commented 2 months ago

Hi @sotte, I have been trying to reproduce the hanging execution with a local DuckDB instance, with no luck. I'm curious: how does your project differ from the following?

# main.py

from dagster import (
    AssetExecutionContext,
    Definitions,
    file_relative_path,
)
from dagster_embedded_elt.sling import (
    DagsterSlingTranslator,
    SlingResource,
    sling_assets,
)
from dagster_embedded_elt.sling.resources import SlingConnectionResource

replication_config = file_relative_path(__file__, "sling_replication.yaml")

@sling_assets(replication_config=replication_config)
def my_assets(context: AssetExecutionContext, sling: SlingResource):
    yield from sling.replicate(
        context=context,
        replication_config=replication_config,
        dagster_sling_translator=DagsterSlingTranslator(),
    )

sling_resource = SlingResource(
    connections=[
        SlingConnectionResource(
            name="LOCAL_DUCKDB",
            type="duckdb",
            connection_string="duckdb://local.duckdb",
            duckdb_version="0.9.2",
        ),
    ]
)

defs = Definitions(
    assets=[my_assets],
    resources={
        "sling": sling_resource,
    },
)

# sling_replication.yaml

source: 'local://'
target: LOCAL_DUCKDB

# default config options which apply to all streams
defaults:
  mode: full-refresh
  object: 'main.{stream_file_folder}_{stream_file_name}'

streams:
  # a folder stream: all files under it will be merged into one table
  "file://data/raw/":
    source_options:
      format: csv
      transforms: [remove_accents] # Apply transforms. Here we are removing diacritics (accents) from string values.

Running it via dagster dev -f main.py, the materialization took about 2 seconds.

D select id, first_name, last_name, _sling_loaded_at from main.data_raw limit 5;
┌───────┬────────────┬───────────┬──────────────────┐
│  id   │ first_name │ last_name │ _sling_loaded_at │
│ int32 │  varchar   │  varchar  │      int64       │
├───────┼────────────┼───────────┼──────────────────┤
│     1 │ Sean       │ Meeson    │       1714579320 │
│     2 │ Magnum     │ Hedgeman  │       1714579320 │
│     3 │ Erna       │ Mont      │       1714579320 │
│     4 │ Meg        │ Gammack   │       1714579320 │
│     5 │ Dennison   │ Stickens  │       1714579320 │
└───────┴────────────┴───────────┴──────────────────┘

CSV was randomly generated from https://mockaroo.com/

$ head -n 5 data/raw/mock.csv
id,first_name,last_name,email,gender,ip_address
1,Sean,Meeson,smeeson0@wordpress.com,Male,236.13.124.217
2,Magnum,Hedgeman,mhedgeman1@soup.io,Male,231.0.108.109
3,Erna,Mont,emont2@people.com.cn,Female,73.213.34.234
4,Meg,Gammack,mgammack3@hibu.com,Female,168.246.236.186

$ wc -l data/raw/mock.csv
    1001 data/raw/mock.csv

Edit: when I duplicate the CSV into 3 files, it still works as expected. I also confirmed that it works when the files have differing schemas (one additional column).

$ cp mock.csv mock2.csv
$ cp mock.csv mock3.csv

D select count(*) from data_raw;
┌──────────────┐
│ count_star() │
│    int64     │
├──────────────┤
│         3000 │
└──────────────┘

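The 3,000-row result matches what the source files predict: `wc -l` reports 1,001 lines for mock.csv, i.e. one header plus 1,000 data rows, and there are three copies. A small standard-library sketch for making the same check against any folder before comparing with `select count(*)`; `expected_row_count` is a hypothetical helper, and it assumes every file has exactly one header row.

```python
import csv
from pathlib import Path


def expected_row_count(folder: str, pattern: str = "*.csv") -> int:
    """Sum data rows (excluding each file's header) across matching CSVs in a folder."""
    total = 0
    for path in sorted(Path(folder).glob(pattern)):
        with path.open(newline="") as fh:
            reader = csv.reader(fh)
            next(reader, None)  # skip the single header row
            # csv.reader yields [] for blank lines, so skip empty rows
            total += sum(1 for row in reader if row)
    return total
```

For the folder above, `expected_row_count("data/raw")` should equal the `count_star()` value if every file was loaded once.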

sotte commented 2 months ago

Sadly, it was a temporary project that I have since wiped, @cmpadden.

My setup was very similar to your code, except that instead of the YAML file it used the Python equivalent. Given that your code works on my machine, I think my config must have been wrong. That is still a bit surprising, given that the process (materializing the asset) kept running without stopping and without throwing any errors.
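Since the run kept going without any error, one way to narrow things down is to run the same replication outside Dagster under a hard timeout: if it also hangs there, the problem is more likely on the Sling side than in the Dagster integration. A minimal standard-library sketch; `run_with_timeout` is a hypothetical helper, and the Sling command in the usage note is an assumption to check against `sling run --help`.

```python
import subprocess
import sys


def _to_text(data) -> str:
    """Normalise captured output (None, bytes or str) to str."""
    if data is None:
        return ""
    return data.decode(errors="replace") if isinstance(data, bytes) else data


def run_with_timeout(cmd: list[str], timeout_s: float) -> tuple[bool, str]:
    """Run cmd; return (finished, stdout + stderr). finished=False means it was killed."""
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout_s)
    except subprocess.TimeoutExpired as exc:
        # The child has been killed; output captured so far is bytes even with text=True.
        return False, _to_text(exc.stdout) + _to_text(exc.stderr)
    return True, result.stdout + result.stderr


if __name__ == "__main__":
    # Self-check with a command that always finishes quickly.
    print(run_with_timeout([sys.executable, "-c", "print('ok')"], timeout_s=10))
```

Usage, assuming the Sling CLI is on PATH and the `MOTHERDUCK` connection is made available to it separately (for example through an environment variable), since outside Dagster the `SlingResource` is not there to supply it: `finished, output = run_with_timeout(["sling", "run", "-r", "sling_replication.yaml"], timeout_s=120)`. If `finished` is False, the tail of `output` shows how far the run got.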

If I find out more, I'll post it here.