Open sohooo opened 1 year ago
I once created a proof of concept of this idea, however, it is currently not possible to see the schema of the taps. (The POC was done by using the Meltano python interface, which is not stable.)
We are possibly able to create assets for the taps themselves, would this help you?
Dagster recommends using SDAs for new pipelines, and I'd like to follow that mind shift in approaching a pipeline setup:
Dagster is mainly used to build data pipelines, and most data pipelines can be expressed in Dagster as sets of software-defined assets. If you’re a new Dagster user and your goal is to build a data pipeline, we recommend starting with software-defined assets and not worrying about ops or graphs. This is because most of the code you’ll be writing will directly relate to producing data assets.
It would be great if this library supported this new way of pipeline design. Please correct me if my reasoning is wrong or I'm missing something! I'm new to the field of DataOps, with a history of tooling like Jenkins, Rundeck, Gitlab, etc. :)
We are possibly able to create assets for the taps themselves, would this help you?
Yeah I think this would help!
I tried using @multi_assets to explicitly define the assets coming from Meltano tap-oracle:
@multi_asset(
    resource_defs={"meltano": meltano_resource},
    compute_kind="meltano",
    group_name="sources",
    outs={
        "departments": AssetOut(key_prefix=["hr"]),
        "employees": AssetOut(key_prefix=["hr"]),
        "jobs": AssetOut(key_prefix=["hr"]),
    }
)
def meltano_run_job():
    meltano_command_op("run tap-oracle target-postgres")()
Now there's a continuous lineage from Meltano-produced tables (dbt sources) to the following dbt assets:
However, when running meltano_run_job() (by materializing the assets in the sources group), the job fails with:
dagster._core.errors.DagsterInvalidInvocationError: Compute function of op 'run_tap_oracle_target_postgres' has context argument, but no context was provided when invoking.
Same error when using meltano_run_op(). What am I missing? How would you design this @JulesHuisman ? :)
I don't think you are able to call an op like that in the asset function. You might be able to achieve this by replicating this line: https://github.com/quantile-development/dagster-meltano/blob/0517c3c44836352071250235f0f95e6fa7eb67c4/dagster_meltano/ops.py#L94 inside the asset.
We are currently stuck at creating the meltano assets dynamically from a list. Our approach at the moment looks somewhat like this, but each iteration overwrites the prior asset, so only the last one is displayed. @JulesHuisman do you know of a way to achieve this?
names = ["a", "b", "c"]
for name in names:
    @multi_asset(
        compute_kind="meltano",
        group_name="sources",
        outs={
            name: AssetOut(key_prefix=["hr"])
        }
    )
    def meltano_run_single(context: OpExecutionContext):
        return "a"
You could do something like this: use a factory design to automatically create assets. This example creates individual assets, but you could do the same to dynamically create multi assets.
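from dagster import asset

def meltano_asset_factory(names: list) -> list:
    def build(name: str):
        # name= gives every asset its own key instead of the shared "compute";
        # building inside a helper binds each closure to its own name.
        @asset(name=name)
        def compute():
            return name
        return compute

    return [build(name) for name in names]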
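The overwriting problem and the factory fix can be reproduced without Dagster. The following is a minimal plain-Python sketch; `make_named` and `factory` are hypothetical helper names used only for illustration. A function re-defined in a loop leaves a single object behind under that name, while a factory that collects its results in a list keeps one object per entry.

```python
def make_named(name):
    """Build a function whose __name__ and return value are bound to `name`."""
    def compute():
        return name
    compute.__name__ = name
    return compute


# Loop version: each iteration rebinds the same name, so only the function
# from the last iteration is still reachable through it afterwards.
for n in ["a", "b", "c"]:
    def run_single():
        return n

print(run_single())  # c


# Factory version: every function is kept in a list and has its own name.
def factory(names):
    return [make_named(n) for n in names]


funcs = factory(["a", "b", "c"])
print([f.__name__ for f in funcs])  # ['a', 'b', 'c']
print([f() for f in funcs])         # ['a', 'b', 'c']
```

The same reasoning applies to decorated functions: whatever the decorator returns has to be stored somewhere other than the repeatedly rebound name.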
Thank you. With the help of the dagster slack channel we kinda figured it out. Code to run the meltano job
def meltano_run_job(context, table: str):
    context.log.info(context.selected_output_names)
    # Run the meltano job "import_hr" with logging
    execute_shell_command(
        f"NO_COLOR=1 TAP_ORACLE__HR_FILTER_TABLES={table} meltano run import_hr",
        output_logging="STREAM",
        log=context.log,
        cwd=MELTANO_PROJECT,
        # env={"FILTER_TABLES":table},
    )
Code to build the assets out of YAML spec
def build_asset(spec) -> AssetsDefinition:
    @asset(name=spec["name"], group_name="sources", key_prefix="hr", compute_kind="meltano")
    def _asset(context):
        meltano_run_job(context=context, table=spec["table"])
    return _asset

assets=[build_asset(spec) for spec in asset_list]
The last thing we are missing is how to correctly use the filter_table env variable.
@LeqitSebi I think the SELECT env var might help if you just want to run meltano to update a single table.
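As a rough sketch of the SELECT suggestion: the helper below builds an environment containing a `<TAP>__SELECT` override. It assumes Meltano's convention of deriving the variable prefix from the plugin name (upper-cased, hyphens replaced by underscores) and of accepting a JSON array of `stream.attribute` patterns. `select_env` is a hypothetical helper and `HR-EMPLOYEES` is a made-up stream name; check the stream names your tap actually reports.

```python
import json
import os


def select_env(tap_name: str, streams: list) -> dict:
    """Return a copy of the current environment with a <TAP>__SELECT override.

    Assumption: the variable prefix is the plugin name upper-cased with
    hyphens replaced by underscores (tap-oracle -> TAP_ORACLE).
    """
    prefix = tap_name.upper().replace("-", "_")
    env = dict(os.environ)
    # Select every attribute of each requested stream.
    env[f"{prefix}__SELECT"] = json.dumps([f"{s}.*" for s in streams])
    return env


env = select_env("tap-oracle", ["HR-EMPLOYEES"])  # hypothetical stream name
print(env["TAP_ORACLE__SELECT"])  # ["HR-EMPLOYEES.*"]
```

The resulting dict could then be passed as the `env` argument of the shell call in place of the commented-out `FILTER_TABLES` line, so that each asset only syncs its own table.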
Did you move away from the multi asset approach? Multi asset makes more sense if you have many child streams. With the single asset approach would you be reiterating your API calls?
Multi asset fits better with my intended outcome, that any request to any meltano table causes a run of the whole tap and target combo.
I figured out a workflow that more or less works and gives the correct lineage, assuming you want to run the whole tap, plus downstream tables. This goes into the dagster repository.py. You have to define your taps and streams in the Python code (although there is probably a way to get this from meltano.yml). My sources are defined with a raw_ prefix, so tap-freshdesk becomes the raw_freshdesk source in dbt. I have also defined my groups in dbt. I tend to materialise entire dbt groups at once, so I create a job and schedule for each group.
import os
from pathlib import Path
from dagster import ScheduleDefinition, DefaultScheduleStatus, Definitions, define_asset_job, AssetOut, multi_asset, OpExecutionContext, ConfigurableResource, AssetSelection
import enum
from dagster_meltano import meltano_resource
from dagster_dbt import load_assets_from_dbt_project, DbtCliResource

DBT_PROJECT_PATH = str(Path(__file__).parent.parent.parent.parent / "my_dbt_directory")
DBT_PROFILE = os.getenv('DBT_PROFILE')
DBT_TARGET = os.getenv('DBT_TARGET')

class MeltanoEnv(enum.Enum):
    dev = enum.auto()
    prod = enum.auto()

MELTANO_PROJECT_DIR = os.getenv("MELTANO_PROJECT_ROOT", os.getcwd())
MELTANO_BIN = os.getenv("MELTANO_BIN", "meltano")

resources= {
    "dbt": DbtCliResource(project_dir=DBT_PROJECT_PATH, target=DBT_TARGET, profile=DBT_PROFILE),
    # "meltano": meltano_resource,
}

# Tap name (without the "tap-" prefix) -> streams it produces
ALL_TAP_STREAMS = {
    "freshdesk": [
        "conversations",
        "ticket_fields",
        "tickets_detail",
    ],
    "mailchimp": [
        "campaigns",
        "lists",
        "lists_members",
        "reports_email_activity",
        "reports_sent_to",
        "reports_unsubscribes",
    ],
    "instagram": [
        "media",
        "media_children",
        "media_insights",
        "stories",
        "story_insights",
    ],
    "tiktok": [
        "accounts",
        "videos",
        "comments",
    ]
}

def meltano_asset_factory(all_tap_streams: dict) -> tuple:
    multi_assets = []
    jobs = []
    schedules = []
    for tap_name, tap_streams in all_tap_streams.items():
        # One multi asset per tap; each stream becomes an output keyed
        # raw_<tap>/<stream> so it lines up with the dbt source.
        @multi_asset(
            name=tap_name,
            resource_defs={'meltano': meltano_resource},
            compute_kind="meltano",
            group_name=tap_name,
            outs={
                stream: AssetOut(key_prefix=[f'raw_{tap_name}'])
                for stream
                in tap_streams
            }
        )
        def compute(context: OpExecutionContext, meltano: ConfigurableResource):
            # The op name equals the tap name, so the whole tap is run
            command = f"run tap-{context.op.name} target-postgres"
            meltano.execute_command(f"{command}", dict(), context.log)
            return tuple([None for _ in context.selected_output_names])

        multi_assets.append(compute)
        asset_job = define_asset_job(f"{tap_name}_assets", AssetSelection.groups(tap_name))
        basic_schedule = ScheduleDefinition(
            job=asset_job,
            cron_schedule="@hourly",
            default_status=DefaultScheduleStatus.RUNNING
        )
        jobs.append(asset_job)
        schedules.append(basic_schedule)
    return multi_assets, jobs, schedules

meltano_assets, jobs, schedules = meltano_asset_factory(ALL_TAP_STREAMS)
dbt_assets = load_assets_from_dbt_project(DBT_PROJECT_PATH, profiles_dir=DBT_PROJECT_PATH,)

defs = Definitions(
    assets= (dbt_assets + meltano_assets),
    resources= resources,
    jobs=jobs,
    schedules=schedules,
)
@jaceksan and I are working on an extension of the dagster-meltano plugin that includes functionality to automatically load tap streams into dagster as assets.
We're just getting started on it; collaborators are welcome!
If I understand correctly that would mean it's not necessary to keep a list of taps and streams in repository.py? That would be a nice improvement! One issue I've found is regularising the job names and dbt sources/groups to make sure the dependency chain in dagster works.
We are now struggling with how streams/attributes are defined in meltano.yml, or rather in meltano_manifest.json.
There may be globs.
I suggested running meltano select and identifying all selected attributes from the output.
Names in this output are equal to target table names and column names, at least in my demo (tap-github, tap-exchangeratehost, tap-s3-csv; many targets).
I loaded dbt assets to Dagster in my demo project and asset names are equal to underlying table names. So shouldn't the Meltano and Dagster asset names match?
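To illustrate the glob problem, here is a small sketch that expands select-style patterns against a list of known `stream.attribute` names. It uses Python's `fnmatch` as an approximation and treats a leading `!` as an exclusion; this is an assumption about the pattern semantics, not Meltano's actual implementation, and the catalog entries are invented for the example.

```python
from fnmatch import fnmatchcase


def selected(attributes, patterns):
    """Keep attributes matching an include pattern and no '!' exclude pattern."""
    includes = [p for p in patterns if not p.startswith("!")]
    excludes = [p[1:] for p in patterns if p.startswith("!")]
    return [
        a for a in attributes
        if any(fnmatchcase(a, p) for p in includes)
        and not any(fnmatchcase(a, p) for p in excludes)
    ]


def streams_of(attributes):
    """Collapse 'stream.attribute' names to unique stream names, keeping order."""
    return list(dict.fromkeys(a.split(".", 1)[0] for a in attributes))


# Hypothetical catalog entries, for illustration only.
catalog = ["commits.id", "commits.author_email", "issues.id", "issues.title"]
picked = selected(catalog, ["commits.*", "issues.id", "!commits.author_email"])
print(picked)              # ['commits.id', 'issues.id']
print(streams_of(picked))  # ['commits', 'issues']
```

The resulting stream names are the candidates for asset names, which is why reading the already-expanded selection is simpler than re-implementing the glob rules.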
The issue I've found is consistency in naming between meltano and dbt. I don't worry too much about the asset names in dagster really.
In my instance, every tap in meltano has an equivalent dbt source named raw_{TAP}, and I had to specify these as related downstream assets. Hence the AssetOut part in the code below.
for tap_name, tap_streams in all_tap_streams.items():
    @multi_asset(
        name=tap_name,
        resource_defs={'meltano': meltano_resource},
        compute_kind="meltano",
        group_name=tap_name,
        outs={
            stream: AssetOut(key_prefix=[f'raw_{tap_name}'])
            for stream
            in tap_streams
        }
    )
Without this info, dagster can't infer that the dbt source is a downstream of the associated meltano stream.
Does that explain a bit more?
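The naming rule described here can be written down as a small helper. This is only a sketch: `dbt_source_key` is a hypothetical function, and it assumes the dbt integration keys a source table by the path [source name, table name], which is what the raw_ prefix plus stream name in the snippet above is meant to line up with.

```python
def dbt_source_key(tap_name: str, stream: str, prefix: str = "raw_") -> list:
    """Map a Meltano tap and stream to the [source, table] path of the dbt source.

    Accepts the tap name with or without its "tap-" prefix.
    """
    source = prefix + tap_name.removeprefix("tap-").replace("-", "_")
    return [source, stream]


print(dbt_source_key("tap-freshdesk", "tickets_detail"))
# ['raw_freshdesk', 'tickets_detail']
```

If the Meltano side and the dbt sources both derive their names from one function like this, the two halves of the lineage cannot drift apart.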
Hi, I want to define dagster @assets for meltano runs, as the dagster SDA seems to be the recommended/most reasonable way to design new data flows. How could this be configured with dagster-meltano?