dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Docs for partitioned backfills on sling runs #24234

Open nrsmac opened 2 months ago

nrsmac commented 2 months ago

What's the issue or suggestion?

There isn't a clearly documented way to use partitions with Sling. I see that I can provide a partitions_def, but how do those partition values get passed to Sling during a backfill? https://docs.dagster.io/_apidocs/libraries/dagster-embedded-elt#sling-dagster-embedded-elt-sling

My defined asset:

from dagster_embedded_elt.sling import SlingResource, sling_assets
from dagster import file_relative_path
from partitions import daily_partitions_def

replication_config = file_relative_path(__file__, "../resources/replication.yml")

@sling_assets(replication_config=replication_config, partitions_def=daily_partitions_def)
def my_sling_assets(context, sling: SlingResource):  # renamed so it doesn't shadow the sling_assets decorator
    yield from sling.replicate(context=context)  # Tried passing as kwargs here...
    for row in sling.stream_raw_logs():
        context.log.info(row)

In the Sling documentation, it gives an example of passing environment variables to Sling https://docs.slingdata.io/sling-cli/run/configuration/variables.

replication.yml:

source: SQLDB
target: DUCKDB

defaults:
  mode: backfill
  object: "{stream_schema}_{stream_table}"
  source_options:
    empty_as_null: false
  target_options:
    column_casing: snake
streams:
  example.stream:
    object: example.object
    primary_key: pk
    update_key: start_time
    source_options:
      limit: 1000
      #range: 2024-07-01,2024-07-02
      range: ${START_DATE},${END_DATE}   # How do I get partition keys to populate here?
env:
  SLING_LOADED_AT_COLUMN: true
  SLING_STREAM_URL_COLUMN: true
  start_date: '${START_DATE}'  # In the case of using envvars, but I want the partition keys from the execution context here.
  end_date: '${END_DATE}' 

Additional information

No response

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

cmpadden commented 2 months ago

Hi @nrsmac - @nicklausroach and I are going to explore this, and plan to update the documentation accordingly.

The replicate method passes the environment variables to the Sling subprocess. So one possible solution is to set the environment variables from the partition key. For example:

import os
from datetime import datetime, timedelta

@sling_assets(
    replication_config=config_dir / "example.yaml",
    dagster_sling_translator=CustomSlingTranslatorMain(),
    partitions_def=DailyPartitionsDefinition(start_date=datetime.now()),
)
def example_sling_assets(context, embedded_elt: SlingResource):
    # Daily partition keys are "YYYY-MM-DD" strings; environment variables must also be strings
    start_date = context.partition_key
    end_date = datetime.strptime(start_date, "%Y-%m-%d") + timedelta(days=1)
    os.environ['START_DATE'] = start_date
    os.environ['END_DATE'] = end_date.strftime("%Y-%m-%d")
    yield from embedded_elt.replicate(context=context)
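
For a time-window partitions_def, the end of the partition can also be taken from context.partition_time_window instead of doing date arithmetic on the key string. A minimal sketch of that variant, reusing the names and imports from the snippet above:

@sling_assets(
    replication_config=config_dir / "example.yaml",
    dagster_sling_translator=CustomSlingTranslatorMain(),
    partitions_def=DailyPartitionsDefinition(start_date=datetime.now()),
)
def example_sling_assets_window(context, embedded_elt: SlingResource):
    # partition_time_window spans exactly one day for a DailyPartitionsDefinition
    window = context.partition_time_window
    os.environ['START_DATE'] = window.start.strftime("%Y-%m-%d")
    os.environ['END_DATE'] = window.end.strftime("%Y-%m-%d")
    yield from embedded_elt.replicate(context=context)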

Will keep you posted as we update docs. Please let me know if you make any progress yourself. Thanks!

TCronino commented 6 days ago

I am also trying to figure this out and not having much luck!

I think my issue is that the START_DATE and END_DATE variables within the replication_config end up set to 'None': they're not being populated in the replication_config (or, I assume, the replication_config is read in before the replicate method is able to pass the environment variables in).

Either way, if anyone has been able to get this to work, I'd love to see an example replication_config and sling_asset!
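
One way to check that theory, as a debugging sketch only (it reuses replication_config and daily_partitions_def from the issue body): log the variables immediately before calling replicate, since whatever is in os.environ at that point is what gets handed to the Sling subprocess.

import os
from dagster_embedded_elt.sling import SlingResource, sling_assets

@sling_assets(replication_config=replication_config, partitions_def=daily_partitions_def)
def debug_sling_assets(context, sling: SlingResource):
    # If these log as None, the variables were never set in this process before replicate ran
    context.log.info(f"START_DATE={os.environ.get('START_DATE')!r}")
    context.log.info(f"END_DATE={os.environ.get('END_DATE')!r}")
    yield from sling.replicate(context=context)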

TCronino commented 4 days ago

For anyone looking for a solution in the meantime, I was able to solve my problem by dynamically creating a new YAML file for each execution.

It feels a little clunky, though, so if there's an easy way to pass the DailyPartition dates into the sling_asset decorator so that I can do a backfill with a defined range, I'd certainly be keen to hear about it!

But in case it helps anyone in the meantime, this is how I got Daily Partitions working in a Sling Backfill:

Note that the dagster asset must have the same name as your table name:

import os
import shutil
from datetime import datetime, timedelta

import yaml
from dagster import DailyPartitionsDefinition, asset
from dagster_embedded_elt.sling import SlingResource

stream_name = "Your Stream Name Here"
table_name = "Your Table Name Here"
group_name = "Your Group Name Here"

original_file = "Path to your original YAML file here"

def today_date():
    """Return today's date in YYYY-MM-DD format."""

    return datetime.today().strftime('%Y-%m-%d')

def get_next_day(date_string):
    """Given a date, return the following day as a string"""
    # Define the input date format
    date_format = "%Y-%m-%d"

    # Convert the string to a datetime object
    date_obj = datetime.strptime(date_string, date_format)

    # Add one day to the date
    next_day = date_obj + timedelta(days=1)

    # Return the next day's date as a string in the same format
    return next_day.strftime(date_format)

def SlingDailyPartitionYAMLBuilder(context, original_file, stream_name, table_name):
    """Function which edits the Sling replication YAML file to set the correct
        date range for a specific Daily Partition execution"""
    # Extract the relevant date from the Daily Partition context
    start_date = context.partition_key
    run_id = context.run_id

    # Create the relevant range to be fed into the Sling replication
    extract_range = f"{start_date},{get_next_day(start_date)}"
    tmp_table_name = f"{table_name}{extract_range.replace(',', '').replace('-', '')}"
    context.log.info(f"Now running {extract_range}")

    # Create the new file name (unique per run, written into the project's .temp directory)
    temp_dir = "neptune-dagster-prototype/assets/sling_assets/replication_configs/.temp"
    unique_file_name = os.path.basename(original_file).replace('.yaml', f'_{run_id}.yaml')
    unique_config_dir = os.path.normpath(os.path.join(temp_dir, unique_file_name))

    # Copy the original file to the new file
    shutil.copyfile(original_file, unique_config_dir)

    # Load the YAML file
    with open(unique_config_dir, 'r') as file:
        config = yaml.safe_load(file)

    # Update the specific field with the environment variable
    config['streams'][stream_name]['source_options']['range'] = extract_range
    config['streams'][stream_name]['target_options']['table_tmp'] = tmp_table_name

    # Save the updated YAML file
    with open(unique_config_dir, 'w') as file:
        yaml.safe_dump(config, file)

    return unique_config_dir

@asset(
    group_name=group_name,
    key_prefix="target",  # Don't change this - it is set by the SlingResource and I'm unsure how to change it...
    compute_kind="Sling",
    partitions_def=DailyPartitionsDefinition(start_date='2024-11-01', end_date=today_date()))
def your_table_name(context, sling: SlingResource):
    """Run a daily backfill of this table using Sling."""
    # Create a copy of the original config file with the relevant dates for this run added
    unique_config_dir=SlingDailyPartitionYAMLBuilder(context=context, original_file=original_file, stream_name=stream_name, table_name=table_name)
    context.log.info(unique_config_dir)

    # Run the sling job
    yield from sling.replicate(context=context, replication_config=unique_config_dir)

    # Delete the temporary config file
    os.remove(unique_config_dir)
    context.log.info((f"Run complete, Deleted old file: {unique_config_dir}"))