dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Create multiple assets within a Jupyter notebook #10557

Open jamiedemaria opened 2 years ago

jamiedemaria commented 2 years ago

What's the use case?

Currently, when creating an asset from a Jupyter notebook with dagstermill.define_dagstermill_asset, only a single asset is created: the executed notebook.

There are use cases where some of the notebook's contents should also be assets. Supporting this would mean having define_dagstermill_asset create a multi_asset instead of a single asset, so that users could yield outputs within the notebook and have each of those outputs treated as an asset.

There are a couple of things to consider, mainly: how does this affect dagit (do we need to display each asset within the notebook as its own asset block in the graph view)? And should sub-selection work?

Ideas of implementation

A minimal implementation would be to replace the asset in define_dagstermill_asset with a multi_asset, but provide no additional features in dagit. We also wouldn't support sub-selection.

Message from the maintainers

Impacted by this issue? Give it a 👍! We factor engagement into prioritization.

nickvazz commented 2 years ago

Would you be able to use AssetMaterialization within a notebook as a workaround? I assume you would lose the asset lineage visualization then, correct?

jamiedemaria commented 2 years ago

That would be a good incremental step, but right now the dagstermill asset also doesn't support events (it wouldn't be challenging to add, though).

jamiedemaria commented 2 years ago

This PR will support yielding events in asset notebooks, but we'll still need to convert to using a multi-asset to have more rich support for multiple output assets. https://github.com/dagster-io/dagster/pull/10593

jslorrma commented 8 months ago

Any update on this? We are currently exploring using Dagster (assets) to refactor, restructure, and orchestrate the organically grown system of Jupyter notebooks that processes data in our company. Some notebooks contain closely related (business) logic but produce multiple assets. In these cases, being able to yield multiple assets would be great.

yuhan commented 8 months ago

Exploration note - we might be able to model papermill execution as an external execution using Pipes which supports multiple assets and other asset capabilities out of the box. We need to prototype it in order to assess the feasibility, but this may be a viable path to address this issue and many other notebook frictions in the system.

jslorrma commented 8 months ago

Great, thanks for the pointer to Pipes. We will explore it.

motin commented 6 months ago

Idea: use jupytext to pair your notebooks with a light-format Python notebook (a plain .py version of the notebook). This way you can define as many assets as you like (using the @asset decorator as usual), import from other notebooks, create Definitions, etc., just as you ordinarily would.

Working example based on the Dagster quickstart project:

# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:light
#     text_representation:
#       extension: .py
#       format_name: light
#       format_version: '1.5'
#       jupytext_version: 1.16.1
# ---

# +
import base64
import json
import os
from io import BytesIO

import matplotlib.pyplot as plt
import pandas as pd
import requests
from dagster import AssetExecutionContext, MaterializeResult, MetadataValue, asset

@asset(group_name="hackernews", compute_kind="HackerNews API")
def topstory_ids() -> None:
    """Get up to 100 top stories from the HackerNews topstories endpoint.

    API Docs: https://github.com/HackerNews/API#new-top-and-best-stories
    """
    newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json"
    top_new_story_ids = requests.get(newstories_url).json()[:100]

    os.makedirs("data", exist_ok=True)
    with open("data/topstory_ids.json", "w") as f:
        json.dump(top_new_story_ids, f)

@asset(deps=[topstory_ids], group_name="hackernews", compute_kind="HackerNews API")
def topstories(context: AssetExecutionContext) -> MaterializeResult:
    """Get items based on story ids from the HackerNews items endpoint. It may take 30 seconds to fetch all 100 items.

    API Docs: https://github.com/HackerNews/API#items
    """
    with open("data/topstory_ids.json", "r") as f:
        topstory_ids = json.load(f)

    results = []
    for item_id in topstory_ids:
        item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json()
        results.append(item)

        if len(results) % 20 == 0:
            context.log.info(f"Got {len(results)} items so far.")

    df = pd.DataFrame(results)
    df.to_csv("data/topstories.csv")

    return MaterializeResult(
        metadata={
            "num_records": len(df),  # Metadata can be any key-value pair
            "preview": MetadataValue.md(df.head().to_markdown()),
            # The `MetadataValue` class has useful static methods to build Metadata
        }
    )

@asset(deps=[topstories], group_name="hackernews", compute_kind="Plot")
def most_frequent_words(context: AssetExecutionContext) -> MaterializeResult:
    """Get the top 25 most frequent words in the titles of the top 100 HackerNews stories."""
    stopwords = ["a", "the", "an", "of", "to", "in", "for", "and", "with", "on", "is"]

    topstories = pd.read_csv("data/topstories.csv")

    # loop through the titles and count the frequency of each word
    word_counts = {}
    for raw_title in topstories["title"]:
        title = raw_title.lower()
        for word in title.split():
            cleaned_word = word.strip(".,-!?:;()[]'\"-")
            if cleaned_word not in stopwords and len(cleaned_word) > 0:
                word_counts[cleaned_word] = word_counts.get(cleaned_word, 0) + 1

    # Get the top 25 most frequent words
    top_words = {
        pair[0]: pair[1]
        for pair in sorted(word_counts.items(), key=lambda x: x[1], reverse=True)[:25]
    }

    # Make a bar chart of the top 25 words
    plt.figure(figsize=(10, 6))
    plt.bar(list(top_words.keys()), list(top_words.values()))
    plt.xticks(rotation=45, ha="right")
    plt.title("Top 25 Words in Hacker News Titles")
    plt.tight_layout()

    # Convert the image to a saveable format
    buffer = BytesIO()
    plt.savefig(buffer, format="png")
    image_data = base64.b64encode(buffer.getvalue())

    # Convert the image to Markdown to preview it within Dagster
    md_content = f"![img](data:image/png;base64,{image_data.decode()})"

    with open("data/most_frequent_words.json", "w") as f:
        json.dump(top_words, f)

    # Attach the Markdown content as metadata to the asset
    return MaterializeResult(metadata={"plot": MetadataValue.md(md_content)})

# +
from dagster import (
    Definitions,
    ScheduleDefinition,
    define_asset_job,
)

daily_refresh_schedule = ScheduleDefinition(
    job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *"
)

defs = Definitions(
    assets=[topstory_ids, topstories, most_frequent_words], schedules=[daily_refresh_schedule]
)

# -

Save it as dagster_notebook_example.py and run using:

dagster dev -f dagster_notebook_example.py
ydennisy commented 5 months ago

@motin this looks interesting!

How do you handle pairing with a regular .ipynb file that you run and store in Dagster?