dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Databricks integration #2458

Closed sd2k closed 4 years ago

sd2k commented 4 years ago

I've been following Dagster for a month or so as we're looking to revamp our data pipelines at $company. We'll be using Spark for the majority of our ETLs, but using Databricks to manage our infrastructure rather than manually managing Spark clusters.

I noticed that an EMR launcher for launching PySpark solids was added a week or so ago. I haven't used EMR much, but I think Databricks has a similar workflow, where jobs are submitted using its Jobs API (which I've used in the past through Airflow).
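For reference, submitting a one-off run through the Jobs API is basically a single POST to the runs/submit endpoint. A rough sketch (the env vars, cluster spec and DBFS paths here are just placeholders, not anything Dagster provides):

import os
import requests

# Submit a one-off PySpark run via the Databricks Jobs API (2.0, runs/submit).
# DATABRICKS_HOST / DATABRICKS_TOKEN are placeholder env vars holding the workspace URL and a token.
host = os.environ["DATABRICKS_HOST"]
token = os.environ["DATABRICKS_TOKEN"]

payload = {
    "run_name": "dagster-step",
    "new_cluster": {
        "spark_version": "6.4.x-scala2.11",  # illustrative cluster spec
        "node_type_id": "i3.xlarge",
        "num_workers": 2,
    },
    # The entrypoint script and its arguments live on DBFS.
    "spark_python_task": {
        "python_file": "dbfs:/dagster/main.py",
        "parameters": ["dbfs:/dagster/step_input.pkl"],
    },
}

resp = requests.post(
    "{}/api/2.0/jobs/runs/submit".format(host),
    headers={"Authorization": "Bearer {}".format(token)},
    json=payload,
)
resp.raise_for_status()
run_id = resp.json()["run_id"]  # poll /api/2.0/jobs/runs/get with this id to wait for completion
print(run_id)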

Does this sound like a good candidate for a dagster-databricks integration library? And if so, are there any plans to support Databricks already, or would you accept contributions there?

Thanks!

schrockn commented 4 years ago

yes please! that would be fantastic

sd2k commented 4 years ago

OK cool. I've made a start on this and have a working external step launcher, but I think I'll need to add a new system storage for DBFS (the Databricks filesystem) at the very least, since I'm not using AWS and unfortunately can't rely on the S3 storage. I'll also need to add an Azure storage system later, but that can be a separate PR :)

I've noticed something weird happening when trying to unpickle the events pickled by my main function (the entrypoint used by the remote PySpark job). It looks like when the event objects are pickled and unpickled, they come back as the parent namedtuple classes:

# print(events) on remote PySpark:
[DagsterEvent(event_type_value='STEP_START', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=None, message='Started execution of step "make_df_solid.compute".'), DagsterEvent(event_type_value='STEP_OUTPUT', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=StepOutputData(step_output_handle=StepOutputHandle(step_key='make_df_solid.compute', output_name='result'), intermediate_materialization=None, type_check_data=TypeCheckData(success=True, label='result', description=None, metadata_entries=[])), message='Yielded output "result" of type "PySparkDataFrame". (Type check passed).'), DagsterEvent(event_type_value='STEP_SUCCESS', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=StepSuccessData(duration_ms=285.62570000030973), message='Finished execution of step "make_df_solid.compute" in 285ms.')]

# contents of events.pkl, downloaded from DBFS, and unpickled locally:
In [1]: import pickle

In [2]: events = pickle.load(open("events.pkl", "rb"))

In [3]: events
Out[3]:
[_DagsterEvent(event_type_value='STEP_START', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=_SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=None, message='Started execution of step "make_df_solid.compute".'),
 _DagsterEvent(event_type_value='STEP_OUTPUT', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=_SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=_StepOutputData(step_output_handle=_StepOutputHandle(step_key='make_df_solid.compute', output_name='result'), intermediate_materialization=None, type_check_data=_TypeCheckData(success=True, label='result', description=None, metadata_entries=[])), message='Yielded output "result" of type "PySparkDataFrame". (Type check passed).'),
 _DagsterEvent(event_type_value='STEP_SUCCESS', pipeline_name='pyspark_pipe', step_key='make_df_solid.compute', solid_handle=_SolidHandle(name='make_df_solid', parent=None), step_kind_value='COMPUTE', logging_tags={'pipeline': 'pyspark_pipe', 'step_key': 'make_df_solid.compute', 'solid': 'make_df_solid', 'solid_definition': 'make_df_solid'}, event_specific_data=_StepSuccessData(duration_ms=285.62570000030973), message='Finished execution of step "make_df_solid.compute" in 285ms.')]

I've worked around it by using dagster.serdes.{de}serialize_value, but I can't tell why it happens here and not in the local_external_step_launcher. The Python versions are close if not identical (3.7.5 locally, 3.7.3 remotely). Has this behaviour been seen before?
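For completeness, the workaround looks roughly like this (the DBFS path is just a placeholder):

from dagster.serdes import deserialize_value, serialize_value

# On the Databricks side: write the events with serdes instead of pickle, so the
# whitelisted Dagster classes are reconstructed by name rather than by pickling the class.
with open("/dbfs/tmp/events.json", "w") as f:  # placeholder path
    f.write(serialize_value(events))

# Back on the host side, after downloading the file:
with open("events.json") as f:
    events = deserialize_value(f.read())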

schrockn commented 4 years ago

Wow, that's really strange. I've never seen that behavior before. Have you been able to get a repro in a more controlled environment, or is it only happening when you pickle in the remote execution environment, which differs from your local one?

schrockn commented 4 years ago

import pickle
from collections import namedtuple

from dagster.serdes import whitelist_for_serdes

@whitelist_for_serdes
class ADataClass(namedtuple('_ADataClass', 'some_data')):
    pass

inst = ADataClass('foo')
print(inst)
print(pickle.loads(pickle.dumps(inst)))

Works as expected

$ python repro_pickle.py
ADataClass(some_data='foo')
ADataClass(some_data='foo')

sd2k commented 4 years ago

Ah, it looks like it happens as soon as pyspark is imported; if you add import pyspark to the top of your repro script and rerun it, the same thing occurs:

$ PYENV_VERSION=dagster37 pyenv exec python repro_pickle.py
ADataClass(some_data='foo')
_ADataClass(some_data='foo')

A bit of digging implicates this: https://jira.apache.org/jira/browse/SPARK-22674. I can't tell how the remote EMR executor works, since that must also import pyspark, but that seems to be the issue.
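For anyone else hitting this: it reproduces without dagster at all, since importing pyspark patches namedtuple creation (which is what SPARK-22674 describes). A minimal, dagster-free sketch, assuming pyspark is installed:

import pickle
from collections import namedtuple

import pyspark  # noqa: F401 -- importing pyspark patches namedtuple creation (SPARK-22674)

# Same shape as the repro above, minus dagster: a subclass of a namedtuple.
class ADataClass(namedtuple('_ADataClass', 'some_data')):
    pass

inst = ADataClass('foo')
print(type(inst).__name__)                              # ADataClass
print(type(pickle.loads(pickle.dumps(inst))).__name__)  # _ADataClass - the subclass is lost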

schrockn commented 4 years ago

That pyspark bug is wild. Can you just use dagster.serdes instead?

natekupp commented 4 years ago

hey @sd2k - let me know when you start looking at implementing the system storage for DBFS, happy to help. You can check out this diff for an example of what's needed to add a new system storage: https://dagster.phacility.com/D2259

sd2k commented 4 years ago

> That pyspark bug is wild. Can you just use dagster.serdes instead?

Yep, I can do that - works fine.

> hey @sd2k - let me know when you start looking at implementing the system storage for DBFS, happy to help. You can check out this diff for an example of what's needed to add a new system storage: https://dagster.phacility.com/D2259

Cheers! I'll take a look today and let you know if I have any questions.

sd2k commented 4 years ago

Hmm, it does look like a much bigger undertaking than I expected :smile: I'm a bit unsure whether this is the best way to proceed!

The workflow I think Databricks recommends is not to use the DBFS root, and instead either to mount an object storage account or to access the object store directly (e.g. S3 or Azure). Either way, some configuration needs to run before the pipeline does, to mount the object store or set credentials for API calls.
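To make that concrete, the cluster-side setup would look something like the sketch below (bucket names, mount points and credential handling are placeholders, and dbutils only exists inside a Databricks notebook/job):

# Option 1: mount the bucket once; it then shows up under /mnt for both DBFS and local file APIs.
dbutils.fs.mount(
    source="s3a://my-company-data",       # placeholder bucket
    mount_point="/mnt/my-company-data",
)

# Option 2: skip the mount and talk to the object store directly by handing
# credentials to the Hadoop/Spark config before the pipeline runs.
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)  # placeholder credentials
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)
df = spark.read.parquet("s3a://my-company-data/some/table")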

Mounting the object store lets you access it via DBFS using both Spark APIs and local filesystem APIs, which is convenient for interactive/notebook use but makes little difference when running jobs. I get the feeling that a DBFSSystemStorage is the wrong approach, and that a Databricks job should instead require either an Azure or AWS storage account resource to handle persistence. To read and write data inside Spark we could then just rely on the SparkDataFrameS3StoragePlugin (and an equivalent Azure version).

If that sounds reasonable then I need to work on an Azure storage system!

sd2k commented 4 years ago

Another question is how to handle Delta Lake storage. For my purposes the basic idea is that Spark DataFrames would be saved using df.write.format('delta').save('<path>') rather than df.write.parquet('<path>'). My hunch is that this would currently require another TypeStoragePlugin and another system storage to indicate that the data is stored in a Delta Lake, but the actual content of the new subclasses should be minimal, since a Delta Lake is just a wrapper around Parquet files on a filesystem anyway.
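Concretely, the only difference on the write/read path would be something like this (paths are placeholders):

# Parquet, roughly what the existing S3 storage plugin does today:
df.write.parquet('s3a://bucket/intermediates/make_df_solid.compute/result')

# Delta Lake: the same Parquet files underneath plus a transaction log, selected via the format:
df.write.format('delta').save('s3a://bucket/intermediates/make_df_solid.compute/result')
df = spark.read.format('delta').load('s3a://bucket/intermediates/make_df_solid.compute/result')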

I also need to look into the lakehouse package and see if there's any overlap there, but I imagine it complements rather than overlaps!