dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io

dagster-databricks - incompatible URI format when depending on dynamic op outputs #5836

Open Phazure opened 2 years ago

Phazure commented 2 years ago

Summary

I am attempting to dynamically trigger pyspark ops to run on a Databricks cluster. However, it seems that when doing so, dagster-databricks's databricks_pyspark_step_launcher uploads files to DBFS in an incompatible URI form.

Reproduction

from pathlib import Path

from dagster_databricks import databricks_pyspark_step_launcher
from dagster import DynamicOut, DynamicOutput, In, Nothing, job, op
from dagster_aws.s3 import s3_pickle_io_manager, s3_resource
from dagster_pyspark import pyspark_resource

databricks_resource_defs = {
    "pyspark_step_launcher": databricks_pyspark_step_launcher.configured(
        {
            "run_config": {
                "run_name": "test_dagster",
                "cluster": {"existing": "my-cluster-id"},
                "libraries": [
                    {"pypi": {"package": "dagster-aws==0.13.8"}},
                    {"pypi": {"package": "dagster-databricks==0.13.8"}},
                ]
            },
            "databricks_host": "https://my.cloud.databricks.com",
            "databricks_token": {"env": "DATABRICKS_TOKEN"},
            "local_pipeline_package_path": str(Path(__file__).parent),
            "secrets_to_env_variables": [],
            "storage": {
                "s3": {
                    "access_key_key": "AWS_ACCESS_KEY_ID",
                    "secret_key_key":"AWS_SECRET_ACCESS_KEY",
                    "secret_scope": my_scope",
                }
            }
        }
    ),
    "pyspark": pyspark_resource.configured({"spark_conf": {"spark.executor.memory": "2g"}}),
    "s3": s3_resource,
    "io_manager": s3_pickle_io_manager.configured(
        {"s3_bucket": "my-bucket", "s3_prefix": "dagster-test"}
    )
}

@op(out=DynamicOut(int))
def gen_dynamic_range(context):
    context.log.info("In gen_dynamic_range.")
    for i in range(0, 3):
        yield DynamicOutput(value=i, mapping_key=f"num_{i}")

@op(required_resource_keys={"pyspark", "pyspark_step_launcher"})
def db_job(context, inputs):
    context.log.info("In db_job.")

@op(ins={"start": In(Nothing)})
def do_nothing(context):
    context.log.info("In do_nothing.")

@job(resource_defs=databricks_resource_defs)
def test_job():
    # Does not work
    dynamic_db_job_execution = gen_dynamic_range().map(db_job).collect()
    do_nothing(start=dynamic_db_job_execution)

Running this job yields the following error:

requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://my.cloud.databricks.com/api/2.0/jobs/runs/submit
Response from server:
{ 'error_code': 'INVALID_PARAMETER_VALUE',
'message': 'Invalid python file URI: '
'dbfs:///dagster_staging/9be8dc7d-f346-452a-b874-e20016540d70/db_job[num_2]/databricks_step_main.py. '
'Please visit Databricks user guide for supported URI schemes.'}

Most likely, "[]" characters are unsupported in dbfs URIs.
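
For context, the step launcher appears to embed the resolved step key directly in the DBFS staging path. A minimal sketch of the construction, inferred from the error message above (not the actual dagster-databricks source):

run_id = "9be8dc7d-f346-452a-b874-e20016540d70"
step_key = "db_job[num_2]"  # dynamic steps embed the mapping key in brackets
# Inferred path layout; the real template may differ:
main_uri = f"dbfs:///dagster_staging/{run_id}/{step_key}/databricks_step_main.py"
# The "[" and "]" in step_key make this an invalid URI for the Jobs API.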

Message from the maintainers:

Impacted by this bug? Give it a 👍. We factor engagement into prioritization.

Phazure commented 2 years ago

For what it's worth, I've also tried a naive fix locally, replacing step_key = step_run_ref.step_key with step_key = step_run_ref.step_key.replace("[", "__").replace("]", "__") in dagster_databricks/databricks_pyspark_step_launcher.py's launch_step function on line 155.
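
In context, the local patch looks like this (a sketch of the one-line edit described above, not the exact upstream source):

# In launch_step, before the step key is used in the DBFS staging path:
# step_key = step_run_ref.step_key
step_key = step_run_ref.step_key.replace("[", "__").replace("]", "__")  # sanitize brackets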

This eliminates the above URI error, but yields the following which may be a separate issue altogether:

---------------------------------------------------------------------------
DagsterExecutionStepNotFoundError         Traceback (most recent call last)
<command--1> in <module>
     12 
     13 with open(filename, "rb") as f:
---> 14   exec(compile(f.read(), filename, 'exec'))
     15 

/tmp/tmple6nsx9a.py in <module>
     61 
     62 if __name__ == "__main__":
---> 63     main(sys.argv[1], sys.argv[2], sys.argv[3])

/tmp/tmple6nsx9a.py in main(step_run_ref_filepath, setup_filepath, dagster_job_zip)
     53         print("Running dagster job")  # noqa pylint: disable=print-call
     54         with DagsterInstance.ephemeral() as instance:
---> 55             events = list(run_step_from_ref(step_run_ref, instance))
     56 
     57     events_filepath = os.path.dirname(step_run_ref_filepath) + "/" + PICKLED_EVENTS_FILE_NAME

/databricks/python/lib/python3.8/site-packages/dagster/core/execution/plan/external_step.py in run_step_from_ref(step_run_ref, instance)
    197 ) -> Iterator[DagsterEvent]:
    198     check.inst_param(instance, "instance", DagsterInstance)
--> 199     step_context = step_run_ref_to_step_context(step_run_ref, instance)
    200     return core_dagster_event_sequence_for_step(step_context)

/databricks/python/lib/python3.8/site-packages/dagster/core/execution/plan/external_step.py in step_run_ref_to_step_context(step_run_ref, instance)
    162     )
    163 
--> 164     execution_plan = create_execution_plan(
    165         pipeline,
    166         step_run_ref.run_config,

/databricks/python/lib/python3.8/site-packages/dagster/core/execution/api.py in create_execution_plan(pipeline, run_config, mode, step_keys_to_execute, known_state, instance, tags)
    742     resolved_run_config = ResolvedRunConfig.build(pipeline_def, run_config, mode=mode)
    743 
--> 744     return ExecutionPlan.build(
    745         pipeline,
    746         resolved_run_config,

/databricks/python/lib/python3.8/site-packages/dagster/core/execution/plan/plan.py in build(pipeline, resolved_run_config, step_keys_to_execute, known_state, instance, tags)
    907 
    908         # Finally, we build and return the execution plan
--> 909         return plan_builder.build()
    910 
    911     @staticmethod

/databricks/python/lib/python3.8/site-packages/dagster/core/execution/plan/plan.py in build(self)
    203 
    204         if self.step_keys_to_execute is not None:
--> 205             plan = plan.build_subset_plan(
    206                 self.step_keys_to_execute, pipeline_def, self.resolved_run_config
    207             )

/databricks/python/lib/python3.8/site-packages/dagster/core/execution/plan/plan.py in build_subset_plan(self, step_keys_to_execute, pipeline_def, resolved_run_config, step_output_versions)
    711 
    712         if bad_keys:
--> 713             raise DagsterExecutionStepNotFoundError(
    714                 f"Can not build subset plan from unknown step{'s' if len(bad_keys)> 1 else ''}: {', '.join(bad_keys)}",
    715                 step_keys=bad_keys,

DagsterExecutionStepNotFoundError: Can not build subset plan from unknown step: db_job[num_1]

Perhaps the Databricks-side subgraph is somehow unaware of the (dynamic) step outputs that preceded it? The traceback suggests the remote process rebuilds the execution plan via create_execution_plan and then subsets it by step key, at which point the resolved key db_job[num_1] is unknown.

yuhan commented 2 years ago

Thanks for reporting! @OwenKephart mind taking a look?

OwenKephart commented 2 years ago

Hi @Phazure , thanks again for reporting! I had a chance to look into this pretty deeply, and these are indeed two separate issues. Both should be fixable, so I'll roll these up into some other work I'm doing with the databricks_pyspark_step_launcher.

Phazure commented 2 years ago

Awesome, thank you for the quick responses! Any idea when the fixes could be released (perhaps in the next 0.13.* patch release)?

And while we're at it, I suppose: would it also be possible to make the use of Databricks secret scopes optional when defining the step launcher config?

            "storage": {
                "s3": {
                    "access_key_key": "AWS_ACCESS_KEY_ID",
                    "secret_key_key":"AWS_SECRET_ACCESS_KEY",
                    "secret_scope": my_scope",
                }
            }

Our Databricks cluster setup has no secret scopes defined, nor does boto3 need them for S3 access, as the cluster is configured with an instance_profile_arn instead. For now, I've simply added the following pass line to setup_storage, and it seems to work:

    def setup_storage(self, dbutils, sc):
        """Set up storage using either S3 or ADLS2."""
        if "s3" in self.storage:
            pass
            # self.setup_s3_storage(self.storage["s3"], dbutils, sc)
        elif "adls2" in self.storage:
            self.setup_adls2_storage(self.storage["adls2"], dbutils, sc)
        else:
            raise Exception("No valid storage found in Databricks configuration!")
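
(With the S3 credential setup skipped, boto3 on the cluster presumably falls back to its default credential chain, which resolves the instance profile automatically.)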

OwenKephart commented 2 years ago

Hi @Phazure -- I'm in the middle of a large refactor / rewrite of the step launcher at the moment, but I should be able to break it up so that fixes for the first two issues go out in next week's release (and possibly a fix for the third). Thanks again for reporting all of this, it's useful to have it all in one place :)

OwenKephart commented 2 years ago

Fixes for both of these issues will go out in this week's release (0.13.12)

Phazure commented 2 years ago

Hi @OwenKephart - many thanks for the release. Will hopefully have the time to test it out this week.

In other news, I've also come across another issue (a feature request, not a bug) that I think could be incredibly useful for any productionized Dagster deployment with Databricks ops. I've outlined the request / proposal at https://github.com/dagster-io/dagster/issues/6156 if you'd be interested in taking a look.

Phazure commented 2 years ago

Hi @OwenKephart - I finally had a chance to test out dagster-databricks 0.13.14. The invalid URI and dynamic mapping issues seem to be resolved now, but I have two additional comments: