sryza opened this issue 2 years ago
Best that we've got right now:
from dagster import (
    graph_asset,
    op,
    DynamicOut,
    DynamicOutput,
    Out,
    In,
    Nothing,
    DailyPartitionsDefinition,
)
from typing import List


@op(out=DynamicOut())
def find_files_to_process():
    files = ["a", "b", "c"]
    for file_name in files:
        # TODO: write files to somewhere
        yield DynamicOutput(mapping_key=file_name, value=file_name)


@op
def load_to_table(context, file_name):
    """Load the file into the table"""
    context.log.info(f"Loading to table for file {file_name}")
    return file_name
@op(ins={"all_files": In(Nothing)}, out=Out(Nothing))
def merge(all_file_names: List[str]):
"""Merge all the files"""
@graph_asset(partitions_def=DailyPartitionsDefinition(start_date="2022-06-01"))
def my_merged_file():
    return merge(find_files_to_process().map(load_to_table).collect())
As I was trying this example, it seems `AssetGroup` doesn't currently exist in Dagster. It works just by removing its mentions in the repo and the imports:
@repository
def repo():
    return [
        AssetsDefinition.from_graph(
            my_merged_file,
            partitions_def=DailyPartitionsDefinition(start_date="2022-06-01"),
        )
    ]
Thanks for catching that @NicolasPA - I just updated the comment
You forgot the import! :ship:
Oops you are right - fixed
Since I'd prefer my graph to show `source_files` -> `table`, to display the two data structures, I've added an asset `source_files` that is used as the input of the graph-backed asset `table`.
So visually it goes from a lonely asset in your example:
To a more expressive graph:
Exploded op graph view:
Additionally (it's a bit out of scope here but can be useful), I'm using a sensor that may modify the list of files that `source_files` outputs, via an optional config parameter and the newly added (🙏) ability to pass a `run_config` to `run_request_for_partition` (the use case is automated backfill of partial partitions).
import time
from typing import List

from dagster import (
    graph,
    AssetsDefinition,
    op,
    DynamicOut,
    DynamicOutput,
    repository,
    DailyPartitionsDefinition,
    Field,
    Array,
    asset,
    OpExecutionContext,
    Output,
    AssetSelection,
    define_asset_job,
    sensor,
)

DAILY_PARTITIONS = DailyPartitionsDefinition(start_date="2022-06-01")


@asset(
    description="Files to load",
    partitions_def=DAILY_PARTITIONS,
    config_schema={
        "selected_file_paths": Field(Array(str), is_required=False, default_value=[])
    },
)
def source_files(context: OpExecutionContext):
    selected_file_paths = context.op_config["selected_file_paths"]
    if selected_file_paths:
        context.log.info(f"Found selected file paths: {selected_file_paths}")
        file_paths = selected_file_paths
    else:
        context.log.info("Looking for paths matching the pattern.")
        file_paths = ["a", "b", "c"]
    return Output(file_paths)


@op(out=DynamicOut())
def output_files_dynamically(source_files):
    for file_name in source_files:
        yield DynamicOutput(mapping_key=file_name, value=file_name)


@op
def load_to_table(context, file_name):
    """Load the file into the table"""
    context.log.info(f"Loading to table for file {file_name}")
    return file_name


@op(description="Loaded table")
def merge(all_file_names: List[str]):
    """Merge all the files"""


@graph(name="table")
def load_files_to_table_graph(source_files):
    return merge(output_files_dynamically(source_files).map(load_to_table).collect())


table = AssetsDefinition.from_graph(
    load_files_to_table_graph,
    partitions_def=DAILY_PARTITIONS,
    keys_by_input_name={"source_files": source_files.asset_key},
)

load_files_to_table_job = define_asset_job(
    name="load_files_to_table_job",
    selection=AssetSelection.assets(source_files, table),
    partitions_def=DAILY_PARTITIONS,
)


@sensor(job=load_files_to_table_job)
def new_files_sensor():
    # some missing files detection logic, resulting in:
    new_files_partitions = [
        {"partition_date": "2022-11-05", "selected_file_paths": ["d", "e"]},
        {"partition_date": "2022-11-06", "selected_file_paths": ["f", "g"]},
    ]
    for new_files_partition in new_files_partitions:
        run_config = {
            "ops": {
                "source_files": {
                    "config": {
                        "selected_file_paths": new_files_partition[
                            "selected_file_paths"
                        ]
                    }
                }
            }
        }
        yield load_files_to_table_job.run_request_for_partition(
            partition_key=new_files_partition["partition_date"], run_config=run_config
        )


@repository
def repo():
    return [
        source_files,
        table,
        new_files_sensor,
    ]
A little quirk to note about the name and description that show up for the graph-backed asset in Dagit: while the asset's name comes from the name defined on the graph, the description that appears is the one from the last op of the graph, which is surprising; that's why my `merge` op has the "Loaded table" description.
I think both the name and the description should be picked up from the graph by default, and it should be possible to override that default with optional arguments to `AssetsDefinition.from_graph()`.
From @slopp:
The [user] had a fairly common request: parallelization. One of their assets was enriching existing data row-by-row by calling an API and they wanted flexibility in how those calls were run in parallel. This could be done with regular python parallelization code within an asset, but then dagster is not aware of the parallelization. Instead, I tried to use dagster to manage the parallelization, and while the results were cool, parallelizing an asset for the first time was complicated and undocumented AF. I ended up splitting the asset into a graph of ops: one op produced dynamic output, another represented the enrichment, and a third collected the results. A graph chained those ops together. But the hard part was figuring out where to put my asset stuff, like the asset key, prefix, and io manager. I expected those to go somewhere in my call to AssetDefinitions.from_graph but actually it was my op that coalesced the collected results?
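For concreteness, here is a sketch of the split described above using Dagster's dynamic outputs; the op names, the `warehouse_io_manager` key, and the `"analytics"` prefix are illustrative, and it shows how the asset-level settings end up divided between `AssetsDefinition.from_graph` and the output of the final collecting op:

```python
from typing import List

from dagster import AssetsDefinition, DynamicOut, DynamicOutput, Out, graph, op


@op(out=DynamicOut())
def fan_out_rows():
    # Illustrative: one dynamic output per row that needs enriching.
    for i, row in enumerate(["row_a", "row_b", "row_c"]):
        yield DynamicOutput(value=row, mapping_key=str(i))


@op
def enrich_row(row: str) -> str:
    # Illustrative stand-in for the per-row API call.
    return f"{row}_enriched"


# The IO manager key lives on the output of the op that coalesces the results,
# because that output is what gets stored as the asset's value.
@op(out=Out(io_manager_key="warehouse_io_manager"))
def collect_rows(rows: List[str]) -> List[str]:
    return rows


@graph
def enriched_rows_graph():
    return collect_rows(fan_out_rows().map(enrich_row).collect())


# Asset-level settings such as the key prefix go on from_graph.
enriched_rows = AssetsDefinition.from_graph(
    enriched_rows_graph,
    key_prefix="analytics",
)
```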
I'm back with additional complexity for late Christmas!
I'm adding to the above that I have 2 different types of files, and I want each file type to go through type-specific processing. So in terms of Dagster notions, I'm adding the "multiple outputs" concept.
This means that `output_files_dynamically` now yields two outputs and the `merge` op takes 2 inputs, one for each type, instead of one.
A limitation I encountered is that the list of inputs taken by the op is static: I can't use `*args` to get a dynamic number of inputs. So if I add a 3rd type, I'll have to add a third input to `merge`, and if I end up with 20 types, then `merge` will have 20 inputs.
But it works!
import time
from typing import List

from dagster import (
    graph,
    AssetsDefinition,
    op,
    DynamicOut,
    DynamicOutput,
    repository,
    DailyPartitionsDefinition,
    Field,
    Array,
    asset,
    OpExecutionContext,
    Output,
    AssetSelection,
    define_asset_job,
    sensor,
    asset_sensor,
    RunRequest,
)

DAILY_PARTITIONS = DailyPartitionsDefinition(start_date="2022-06-01")


@asset(
    description="Files to load",
    partitions_def=DAILY_PARTITIONS,
    key_prefix="source",
    config_schema={
        "selected_file_paths": Field(Array(str), is_required=False, default_value=[])
    },
)
def source_files(context: OpExecutionContext):
    selected_file_paths = context.op_config["selected_file_paths"]
    if selected_file_paths:
        context.log.info(f"Found selected file paths: {selected_file_paths}")
        file_paths = selected_file_paths
    else:
        context.log.info("Looking for paths matching the pattern.")
        file_paths = ["type_1_a", "type_1_b", "type_2_c"]
    return Output(file_paths)


@op(
    out={
        "type_1": DynamicOut(),
        "type_2": DynamicOut(),
    },
)
def output_files_dynamically(context: OpExecutionContext, source_files):
    for file_path in source_files:
        if "type_1" in file_path:
            yield DynamicOutput(
                value=file_path,
                output_name="type_1",
                mapping_key=file_path,
            )
        elif "type_2" in file_path:
            yield DynamicOutput(
                value=file_path,
                output_name="type_2",
                mapping_key=file_path,
            )
        else:
            context.log.warning(
                f"The path pattern of file: {file_path} is not among the supported file path patterns: type_1, type_2."
            )


@op
def load_to_table_type_1(context, file_name):
    """Load the file into the table"""
    context.log.info(f"Loading to table for file {file_name}")
    return file_name


@op
def load_to_table_type_2(context, file_name):
    """Load the file into the table"""
    context.log.info(f"Loading to table for file {file_name}")
    return file_name


@op(description="Loaded table")
def merge(type1_files: List[str], type2_files: List[str]):
    """Merge all the files"""


@graph(name="table", description="lala")
def load_files_to_table_graph(source_files):
    type_1_files, type_2_files = output_files_dynamically(source_files)
    processed_type_1_files = type_1_files.map(load_to_table_type_1).collect()
    processed_type_2_files = type_2_files.map(load_to_table_type_2).collect()
    return merge(processed_type_1_files, processed_type_2_files)


table = AssetsDefinition.from_graph(
    load_files_to_table_graph,
    partitions_def=DAILY_PARTITIONS,
    keys_by_input_name={"source_files": source_files.asset_key},
    key_prefix="source",
)

load_files_to_table_job = define_asset_job(
    name="load_files_to_table_job",
    selection=AssetSelection.assets(source_files, table),
    partitions_def=DAILY_PARTITIONS,
)


@sensor(job=load_files_to_table_job)
def new_files_sensor():
    # some files detection logic returns this:
    new_files_partitions = [
        {"partition_date": "2022-11-05", "selected_file_paths": ["d_type_1", "e_type_2", "x_type_2", "y_type_3"]},
        {"partition_date": "2022-11-06", "selected_file_paths": ["f_type_1", "g_type_2"]},
    ]
    source_files_asset_key = source_files.key.to_python_identifier()
    for new_files_partition in new_files_partitions:
        run_config = {
            "ops": {
                source_files_asset_key: {
                    "config": {
                        "selected_file_paths": new_files_partition[
                            "selected_file_paths"
                        ]
                    }
                }
            }
        }
        yield load_files_to_table_job.run_request_for_partition(
            partition_key=new_files_partition["partition_date"], run_config=run_config
        )


@repository
def repo():
    return [
        source_files,
        table,
        new_files_sensor,
    ]
A related user request from Axel Bock:
Hi all, after reading the docs and FAQ :wink: I still have questions. I can't find any "recommended" way to do this:
From what I've read, each "final CSV" file should be an asset, which is then processed. So my main trouble lies in the "transformation" from an "incomplete asset" (e.g. a ZIP file) to several "more fine-grained assets" (e.g. several CSV files) which are then, again, processed further. My initial thought was to have sensors which react on asset creations, but I was unable to connect the components:
https://dagster.slack.com/archives/C01U954MEER/p1677406576894829
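One way to express that request with the graph-backed asset pattern shown earlier in this thread is to unzip inside an op with a dynamic output and fan out over the extracted CSVs. This is only a sketch: `raw_zip_file`, `extract_csvs`, and the file names are hypothetical placeholders.

```python
from typing import List

from dagster import AssetsDefinition, DynamicOut, DynamicOutput, asset, graph, op


@asset
def raw_zip_file() -> str:
    # Hypothetical: download or locate the ZIP and return its path.
    return "/tmp/data.zip"


@op(out=DynamicOut())
def extract_csvs(raw_zip_file: str):
    # Hypothetical: unzip and yield one dynamic output per extracted CSV.
    for csv_name in ["a.csv", "b.csv"]:
        yield DynamicOutput(value=csv_name, mapping_key=csv_name.replace(".", "_"))


@op
def process_csv(csv_path: str) -> str:
    # Hypothetical per-CSV processing.
    return csv_path


@op
def collect_results(csv_paths: List[str]) -> List[str]:
    return csv_paths


@graph
def processed_csvs_graph(raw_zip_file):
    return collect_results(extract_csvs(raw_zip_file).map(process_csv).collect())


processed_csvs = AssetsDefinition.from_graph(
    processed_csvs_graph,
    keys_by_input_name={"raw_zip_file": raw_zip_file.key},
)
```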
@sryza I was hoping to achieve something similar: to dynamically define assets by looping through the content of an upstream asset. In your example above, the IO manager is also triggered to handle the output of `find_files_to_process`. In my case that doesn't make sense; I would only need the IO manager to handle the final output after the mapping. Is there a way to achieve that?
Asset1 (materialised) -> Op1 (dynamic output, non-materialized) -> Op2/Assets (multiple assets, materialised)
@aaaaahaaaaa you should be able to do something like
@op(out=DynamicOut())
def find_files_to_process():
    files = ["a", "b", "c"]
    for file_name in files:
        # TODO: write files to somewhere
        yield DynamicOutput(mapping_key=file_name, value=None)


@op(ins={"file_name": In(dagster_type=Nothing)})
def load_to_table(context):
    """Load the file into the table"""
    context.log.info(f"Loading to table for file {context.get_mapping_key()}")
    ...
@sryza Okay I see, thanks. It's not ideal because I'm losing the original value, since it has to be transformed to be a valid `mapping_key`. But I'll find a workaround if that's the only way to make this work.
Oh I see. You can always use the mem_io_manager if you're using the in-process executor. But if you're using a cross-process executor, then data needs to be stored somewhere to make it across the process boundary between steps.
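For reference, a minimal sketch of that combination using the built-in `in_process_executor` and `mem_io_manager` (the ops and job name here are placeholders):

```python
from dagster import graph, in_process_executor, mem_io_manager, op


@op
def produce_value():
    return 42


@op
def consume_value(value):
    return value + 1


@graph
def my_graph():
    consume_value(produce_value())


# Intermediate outputs stay in memory, so this only works with the in-process
# executor; nothing is persisted across a process boundary.
my_job = my_graph.to_job(
    executor_def=in_process_executor,
    resource_defs={"io_manager": mem_io_manager},
)
```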
@sryza We do use the in-process executor, so using the `mem_io_manager` was actually a good solution; it works nicely.
However, I have one last issue remaining in that scenario: how can I set the asset key dynamically for each asset generated by the second op, so it's then picked up by my main IO manager? Each key has to be unique because the value is used for materializing the data (e.g. the SQL table name).
Currently my main IO-manager is raising:
Attempting to access asset_key, but it was not provided when constructing the OutputContext
@aaaaahaaaaa - software-defined assets currently don't support dynamic asset keys, and it's unlikely we will in the future. However, we will likely add support for dynamic partition keys, which is the solution proposed in the issue description above.
This doesn't work yet, but the way it could work in your case once we support this could be something like:
@op(out=DynamicOut())
def find_files_to_process():
    files = ["a", "b", "c"]
    for file_name in files:
        data_in_file = read_file(file_name)
        yield DynamicOutput(mapping_key=file_name, value=data_in_file)


@op(out=Out(io_manager_key="main_io_manager"))
def load_to_table(data_in_file):
    return data_in_file


@graph_asset(partitions_from_dynamic_output=True, partitions_def=DynamicPartitionsDefinition(name="files"))
def files_asset():
    return find_files_to_process().map(load_to_table)
@sryza I see, thanks for the feedback, I very much appreciate the time and effort walking me through this. So if I follow correctly, this approach will most likely never be compatible with IO-managers that materialise assets based on asset keys (e.g. the BigQuery IO-manager in its current state)?
@aaaaahaaaaa I think we could make it compatible by adding a per-asset option on the bigquery IO manager to store each partition in a separate table.
This is something that would be helpful to me, at least in terms of being able to materialize an asset based on a brand new dynamic partition. Here is how I am currently accomplishing this:
@asset(partitions_def=dynamic_partitions_def)
def asset_1():
    pass


asset_1_job = define_asset_job('asset_1_job', selection=[asset_1])


@op
def get_partition_key():
    # some complex logic to get partition_key
    return "foo"


@op
def materialize_asset_1(context, partition_key):
    context.instance.add_dynamic_partitions(dynamic_partitions_def.name, partition_keys=[partition_key])
    graphql.client.submit_job_execution(
        'asset_1_job',
        tags={
            'dagster/partition': partition_key
        },
        repository_location_name=DAGSTER_CODE_LOCATION,
        repository_name='__repository__',
        run_config={ ... },
        instance=context.instance
    )


@job
def get_partition_key_and_materialize():
    partition_key = get_partition_key()
    materialize_asset_1(partition_key)
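A sensor-based variant of the same idea, assuming the same `dynamic_partitions_def` and `asset_1_job` as in the snippet above, is to register the partition and then yield a `RunRequest` for it:

```python
from dagster import RunRequest, sensor


@sensor(job=asset_1_job)
def new_partition_sensor(context):
    partition_key = "foo"  # stand-in for the same "complex logic" as get_partition_key
    # Register the dynamic partition before requesting a run for it.
    context.instance.add_dynamic_partitions(
        dynamic_partitions_def.name, partition_keys=[partition_key]
    )
    yield RunRequest(partition_key=partition_key)
```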
@sryza Do you mean dagster doesn't support dynamic asset keys, like:
@asset
def sample_df() -> pd.DataFrame:
    df = pd.DataFrame(
        np.arange(16).reshape(-1, 4),
        columns=list('abcd')
    )
    df.index = pd.date_range('2023', periods=df.shape[0])
    df.index.name = 'keep_index'
    return df


@asset
def add_one(sample_df: pd.DataFrame) -> pd.DataFrame:
    return sample_df + 1


@multi_asset
def multi_output(sample_df: pd.DataFrame) -> pd.DataFrame:
    lst_vola_window = [30, 60]
    for i in lst_vola_window:
        yield Output(value=sample_df + i, output_name=f"multi_output_{i}")
I was struggling to create dynamic assets; the code above raises an error. Do you have any advice?
@eromoe exactly. Some of the comments on this issue include workarounds.
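If the set of outputs is known at definition time (as with the [30, 60] windows above), one workaround is to declare the outputs up front on `@multi_asset`; a sketch reusing the names from the snippet above:

```python
import pandas as pd
from dagster import AssetOut, Output, multi_asset

lst_vola_window = [30, 60]  # known at definition time


@multi_asset(outs={f"multi_output_{i}": AssetOut() for i in lst_vola_window})
def multi_output(sample_df: pd.DataFrame):
    # One declared output per window; each yields its own asset.
    for i in lst_vola_window:
        yield Output(value=sample_df + i, output_name=f"multi_output_{i}")
```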
@sryza Sorry, I don't get it. I viewed them all, but the partition approach doesn't look like it matches my purpose.
My case is generating labels or features from some params: for example, using `itertools.product` over some lists to build a collection of params, then producing a corresponding asset (with a specific name) for each combination, like:
@asset
def create_labels(stock_hfq_df: pd.DataFrame) -> pd.DataFrame:
    label_types = ['LEXLB']
    lst_vola_window = [30, 60]
    lst_vola_omega = [2]
    for type, window, omega in product(label_types, lst_vola_window, lst_vola_omega):
        df = group_generate_labels(stock_hfq_df, type, window, omega)
        create_asset(value=df, output_name=f"label_{type}_{window}_{omega}")
This is easy in Airflow, where a for loop can create many operators. Could you give me an example?
@eromoe does this help? If not, would you mind creating a new Github discussion describing exactly what you're trying to accomplish?
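For the "for loop creates many operators" request specifically, an asset-factory pattern may cover it when the parameter grid is known at definition time; a sketch that reuses `group_generate_labels` and `stock_hfq_df` from the snippet above (the wiring is illustrative):

```python
from itertools import product

import pandas as pd
from dagster import asset

label_types = ["LEXLB"]
lst_vola_window = [30, 60]
lst_vola_omega = [2]


def build_label_asset(label_type, window, omega):
    # Each call returns a distinct asset definition with its own name/key.
    @asset(name=f"label_{label_type}_{window}_{omega}")
    def _label_asset(stock_hfq_df: pd.DataFrame) -> pd.DataFrame:
        # group_generate_labels is the user-defined helper from the snippet above.
        return group_generate_labels(stock_hfq_df, label_type, window, omega)

    return _label_asset


label_assets = [
    build_label_asset(t, w, o)
    for t, w, o in product(label_types, lst_vola_window, lst_vola_omega)
]
```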
@sryza Is this issue on your roadmap short term by any chance? This is a use case we keep bumping into on a regular basis in our team.
@aaaaahaaaaa alas it's not on our short term roadmap. I am hoping we can get to it in 2024.
@sryza Also adding that we have several use cases for this. Would be great to know when it is added to the short term roadmap!
I think this discussion I opened is related to this feature? https://github.com/dagster-io/dagster/discussions/23888
User story
During a run, I want to be able to create new partitions for a dynamically partitioned asset, and have a step run for each of those partitions.
Potential solution
It could look something like this:
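A rough sketch of what that could look like, inferred from the behavior described below; the decorator wiring (yielding `DynamicOutput`s from a partitioned `@asset`) is hypothetical, since none of this exists yet:

```python
from dagster import DynamicOut, DynamicOutput, DynamicPartitionsDefinition, asset

customers_partitions_def = DynamicPartitionsDefinition(name="customers")


# Hypothetical: a dynamically partitioned asset whose partitions are created at
# runtime, one per yielded mapping_key.
@asset(partitions_def=customers_partitions_def, out=DynamicOut())  # "out" on @asset is hypothetical
def customer_dataframes():
    for customer, df in fetch_customer_dataframes():  # hypothetical helper
        yield DynamicOutput(value=df, mapping_key=customer)


# One step would run per dynamic output of customer_dataframes, each
# materializing the matching partition of customer_stats.
@asset(partitions_def=customers_partitions_def)
def customer_stats(customer_dataframes):
    return customer_dataframes.describe()
```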
What's happening with this code snippet, if you click the button to materialize `customer_dataframes` and `customer_stats`:

It's not known ahead of time how many `AssetMaterialization` events will be produced, or what partitions those events will correspond to. Rather, that is determined at runtime: the body of the function determines a set of mapping_keys to yield, which determines the set of partitions that are materialized.

If a `DynamicOutput` with `mapping_key="pepsi"` is yielded, then Dagster will create a "pepsi" partition in the "customers" `DynamicPartitionsDefinition`. Then it will invoke the `IOManager`'s `handle_output` with an `OutputContext` that includes "pepsi" as its `asset_partition_key`. Then it will record an `AssetMaterialization` with `asset_key="customers_dataframe"` and `partition="pepsi"`.

For `customer_stats`, Dagster will launch a separate step for each of the `DynamicOutput`s yielded during the execution of `customer_dataframes`. Each of those steps will produce a single `AssetMaterialization` with `asset_key="customer_stats"`, and the partition equal to whatever the partition is for that step.

Considerations:
Relevant discussions and requests: