fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

[BUG] v0.8.4 does not work on databricks without changes #479

Closed: alunap closed this issue 1 year ago.

alunap commented 1 year ago

Minimal Code To Reproduce

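import pandas as pd
from fugue import transform
from sklearn.pipeline import Pipeline  # assumed: the model is a scikit-learn Pipeline

# `spark` is the SparkSession that Databricks notebooks provide; `policies` and
# `model` (a fitted Pipeline) are defined earlier in the reporter's notebook and
# are not shown in the report.
captures_table = spark.sql(f"""SELECT id, keyword_context, model_prediction
                               FROM bronze.captures
                               WHERE model_prediction = -1
                                 AND policy IN ({policies})""")

def predict(df: pd.DataFrame, model: Pipeline) -> pd.DataFrame:
    # Flatten newlines so each text is on a single line before prediction
    df["keyword_context"] = df["keyword_context"].str.replace("\n", " ", regex=False)
    df["model_prediction"] = model.predict(df["keyword_context"])
    return df

# "*+model_prediction:int8" keeps all input columns and overwrites the type of
# model_prediction; params passes the fitted model into predict(); engine=spark
# runs the transformation on the Spark session.
result = transform(
    captures_table,
    predict,
    schema="*+model_prediction:int8",
    params=dict(model=model),
    engine=spark,
)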

Describe the bug

v0.8.4 stopped working on Databricks Spark:

FugueInterfacelessError: ('DataFrame[id: bigint, keyword_context: string, model_prediction: smallint] is not a valid creator', AttributeError('DataFrame[id: bigint, keyword_context: string, model_prediction: smallint] is not a function'))

Thanks to Han Wang, the cause was identified as a missing prerequisite: cloudpickle. In addition, adding plain 'fugue' as a PyPI library on a Databricks cluster used to be enough, but it now also gives an error, so the library has to be specified as 'fugue[spark]'. With both changes, Fugue works on Databricks again. Note that cloudpickle is included by default in the ML runtime versions of Databricks clusters but not in standard clusters (the GPU ML version is not required, though).
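A quick pre-flight check in the notebook can show whether the prerequisites from this thread are importable before calling transform. The following is a minimal sketch, not part of Fugue: the helper name `missing_prerequisites` is hypothetical, and the module list is an assumption (fugue, pyspark for the spark extra, and cloudpickle); it does not cover everything the 'fugue[spark]' extra installs.

```python
import importlib.util

# Assumed module list: fugue itself, pyspark (what the "fugue[spark]" extra is
# meant to provide), and cloudpickle (missing on standard Databricks clusters).
REQUIRED_MODULES = ("fugue", "pyspark", "cloudpickle")


def missing_prerequisites(modules=REQUIRED_MODULES):
    """Return the names of top-level modules that cannot be found."""
    return [name for name in modules if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_prerequisites()
    if missing:
        print("Missing:", ", ".join(missing))
    else:
        print("All prerequisites found")
```

If the check reports anything missing, the fix described above applies: install 'fugue[spark]' instead of 'fugue' and, on non-ML runtimes, add cloudpickle as a cluster library.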

Expected behavior

The function is called and the model's predictions are written to the model_prediction column of the result.


goodwanghan commented 1 year ago

This has been resolved by https://github.com/fugue-project/fugue/pull/474

Please try fugue==0.8.5.dev1
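To confirm that a cluster is actually running a release at or after the one suggested here, the installed version can be compared with 0.8.5.dev1. This is a minimal sketch under stated assumptions: `includes_fix` and `installed_fugue_version` are hypothetical helpers, the threshold comes only from the comment above, and the comparison handles just plain releases ("0.8.4") and dev releases ("0.8.5.dev1") rather than full PEP 440.

```python
import re
from importlib.metadata import PackageNotFoundError, version

# Only "X.Y.Z" and "X.Y.Z.devN" forms are handled; a full PEP 440 parser
# (e.g. packaging.version) would be needed for rc/post/local versions.
_VERSION_RE = re.compile(r"^(\d+(?:\.\d+)*)(?:\.dev(\d+))?$")


def _sort_key(text):
    match = _VERSION_RE.match(text)
    if match is None:
        raise ValueError(f"unsupported version string: {text!r}")
    release = [int(part) for part in match.group(1).split(".")]
    # Drop trailing zeros so "0.9" and "0.9.0" compare equal
    while len(release) > 1 and release[-1] == 0:
        release.pop()
    # A dev release sorts before the final release with the same number
    dev = int(match.group(2)) if match.group(2) is not None else float("inf")
    return tuple(release), dev


def includes_fix(installed, minimum="0.8.5.dev1"):
    """True if `installed` is at or after `minimum` (subset of PEP 440 only)."""
    return _sort_key(installed) >= _sort_key(minimum)


def installed_fugue_version():
    """The installed fugue version string, or None if fugue is not installed."""
    try:
        return version("fugue")
    except PackageNotFoundError:
        return None
```

For example, `includes_fix("0.8.4")` is False and `includes_fix("0.8.5.dev1")` is True, so a notebook could call `installed_fugue_version()` and warn when the result is older than the suggested dev release.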