dagster-io / dagster

An orchestration platform for the development, production, and observation of data assets.
https://dagster.io
Apache License 2.0

Allow dagster-ge solids to work with additional input types #2910

Closed: sd2k closed this issue 4 years ago

sd2k commented 4 years ago

Currently the solid returned by ge_validation_solid_factory only works for pandas DataFrames. It should be fairly easy to add a few extra parameters to the factory so that the input type can be customised. It would also be good if the batch_kwargs could be customised, since GE's SparkDataFrame often needs additional options to be passed (e.g. reader_options).

Happy to submit a PR for review if work hasn't already been done here. The diff I had in mind was something like:

```diff
-def ge_validation_solid_factory(datasource_name, suite_name, validation_operator_name=None):
+def ge_validation_solid_factory(
+    datasource_name, suite_name, validation_operator_name=None, input_dagster_type=DataFrame,
+):
     """
         Generates solids for interacting with GE, currently only works on pandas dataframes
     Args:
@@ -46,7 +50,7 @@ def ge_validation_solid_factory(datasource_name, suite_name, validation_operator
     """

     @solid(
-        input_defs=[InputDefinition("pandas_df", dagster_type=DataFrame)],
+        input_defs=[InputDefinition("df")],
         output_defs=[
             OutputDefinition(
                 dagster_type=dict,
@@ -58,10 +62,11 @@ def ge_validation_solid_factory(datasource_name, suite_name, validation_operator
         """,
             )
         ],
+        config_schema={"extra_batch_kwargs": Field(Permissive()),},
         required_resource_keys={"ge_data_context"},
         tags={"kind": "ge"},
     )
-    def ge_validation_solid(context, pandas_df):
+    def ge_validation_solid(context, df):
         data_context = context.resources.ge_data_context
         if validation_operator_name is not None:
             validation_operator = validation_operator_name
@@ -73,8 +78,9 @@ def ge_validation_solid_factory(datasource_name, suite_name, validation_operator
             validation_operator = "ephemeral_validation"
         suite = data_context.get_expectation_suite(suite_name)
         batch_kwargs = {
-            "dataset": pandas_df,
+            "dataset": df,
             "datasource": datasource_name,
+            **context.solid_config["extra_batch_kwargs"],
         }
         batch = data_context.get_batch(batch_kwargs, suite)
         run_id = {
```
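The batch_kwargs merge in the proposed diff can be illustrated in plain Python, with no dagster or Great Expectations imports. This is a minimal sketch, assuming the extra kwargs arrive as an ordinary mapping; `build_batch_kwargs`, the datasource name and the `reader_options` value are hypothetical examples, not part of either library. One consequence of unpacking the extra kwargs last is that they override `"dataset"` or `"datasource"` if a caller happens to pass those keys.

```python
from typing import Any, Dict, Mapping, Optional


def build_batch_kwargs(
    dataset: Any,
    datasource_name: str,
    extra_batch_kwargs: Optional[Mapping[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper mirroring the batch_kwargs dict in the diff.

    Keys from extra_batch_kwargs are unpacked last, so they win over
    "dataset" and "datasource" if the caller supplies those keys.
    """
    return {
        "dataset": dataset,
        "datasource": datasource_name,
        **(extra_batch_kwargs or {}),
    }


if __name__ == "__main__":
    # A placeholder object stands in for a Spark DataFrame here.
    fake_spark_df = object()
    kwargs = build_batch_kwargs(
        fake_spark_df,
        "my_spark_datasource",  # hypothetical datasource name
        {"reader_options": {"header": True}},  # hypothetical extra option
    )
    print(sorted(kwargs))  # ['dataset', 'datasource', 'reader_options']
```

If overriding `"dataset"` is not wanted, the literal keys could be placed after the unpacked mapping instead, so the solid's own values always take precedence.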

I think this would work for Spark DataFrames, but probably not for other datasources which don't expect a dataset batch_kwarg; those would need to be handled differently somehow.

schrockn commented 4 years ago

Instead of getting rid of the type information, I think we could just parameterize it, e.g. add a dagster_type and a parameter_name parameter to ge_validation_solid_factory. Re: batch_kwargs, I'm not sure what the differences are on the GE side, but I would guess the right place to put extra_batch_kwargs would be as a parameter to the solid factory, not in config.

sd2k commented 4 years ago

Hmm, yes, it looks like my diff was pretty far from what I intended :smile:. I did include the dagster_type (as input_dagster_type) as a parameter to ge_validation_solid_factory, but forgot to add it to the InputDefinition. And roger that re: the extra_batch_kwargs.
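The two suggestions from this thread (keep the input typed but make the type and input name factory parameters, and bind extra_batch_kwargs when the factory is called rather than through run config) can be sketched as a plain-Python closure. This is an illustrative analogue only, assuming no dagster or GE imports: `validation_fn_factory`, `input_type` and `input_name` are hypothetical stand-ins for the `dagster_type` and `parameter_name` parameters discussed above, and the `isinstance` check stands in for the type check dagster would derive from the InputDefinition.

```python
from typing import Any, Callable, Dict, Mapping, Optional


def validation_fn_factory(
    datasource_name: str,
    suite_name: str,
    input_type: type = object,  # hypothetical stand-in for a dagster_type parameter
    input_name: str = "dataset",  # hypothetical stand-in for a parameter_name parameter
    extra_batch_kwargs: Optional[Mapping[str, Any]] = None,
) -> Callable[..., Dict[str, Any]]:
    """Build a validation function whose input type, input name and extra
    batch kwargs are all fixed at factory time (hypothetical sketch)."""
    # Copy once so later mutation of the caller's mapping has no effect.
    extra = dict(extra_batch_kwargs or {})

    def validate(**inputs: Any) -> Dict[str, Any]:
        # Only the single, configured input name is accepted.
        if set(inputs) != {input_name}:
            raise TypeError(f"expected a single input named {input_name!r}")
        value = inputs[input_name]
        # Stand-in for the type check dagster would run from the InputDefinition.
        if not isinstance(value, input_type):
            raise TypeError(
                f"input {input_name!r} must be {input_type.__name__}, "
                f"got {type(value).__name__}"
            )
        batch_kwargs = {"dataset": value, "datasource": datasource_name, **extra}
        # A real solid would call data_context.get_batch(batch_kwargs, suite) here.
        return {"suite_name": suite_name, "batch_kwargs": batch_kwargs}

    validate.__name__ = f"ge_validation_{input_name}"
    return validate


if __name__ == "__main__":
    validate_rows = validation_fn_factory(
        "my_datasource", "my_suite", input_type=list, input_name="rows",
        extra_batch_kwargs={"reader_options": {"header": True}},
    )
    print(validate_rows(rows=[1, 2, 3])["batch_kwargs"]["datasource"])
```

Binding the extra kwargs in the closure means each generated solid carries its own datasource-specific options, while the typed input keeps dagster's type checking instead of discarding it.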