fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

[BUG] Unable to use spark backend on High Concurrency Databricks Clusters #517

Closed · jstammers closed this 1 year ago

jstammers commented 1 year ago

Minimal Code To Reproduce

```python
import pandas as pd

from fugue import api as fa
from fugue.column import col, functions as f

df = pd.DataFrame({"a": [1, 2, 2], "b": [1, 2, 3]})

df_agg = fa.aggregate(df, partition_by=["a"], engine="spark", agg=f.sum(col("b")))
```

Describe the bug

I am trying to execute some code on a shared Databricks cluster and have encountered the following error:

Traceback

```
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/fugue/execution/api.py:1219, in aggregate(df, partition_by, engine, engine_conf, as_fugue, as_local, **agg_kwcols)
   1215 cols = [
   1216     v.alias(k) if isinstance(v, ColumnExpr) else lit(v).alias(k)
   1217     for k, v in agg_kwcols.items()
   1218 ]
-> 1219 return run_engine_function(
   1220     lambda e: e.aggregate(
   1221         as_fugue_df(df),
   1222         partition_spec=None
   1223         if partition_by is None
   1224         else PartitionSpec(by=partition_by),
   1225         agg_cols=cols,
   1226     ),
   1227     engine=engine,
   1228     engine_conf=engine_conf,
   1229     infer_by=[df],
   1230     as_fugue=as_fugue,
   1231     as_local=as_local,
   1232 )

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/fugue/execution/api.py:171, in run_engine_function(func, engine, engine_conf, as_fugue, as_local, infer_by)
--> 171 with engine_context(engine, engine_conf=engine_conf, infer_by=infer_by) as e:
    172     res = func(e)
    174 if isinstance(res, DataFrame):

File /usr/lib/python3.10/contextlib.py:281, in contextmanager.<locals>.helper(*args, **kwds)
    279 @wraps(func)
    280 def helper(*args, **kwds):
--> 281     return _GeneratorContextManager(func, args, kwds)

File /usr/lib/python3.10/contextlib.py:103, in _GeneratorContextManagerBase.__init__(self, func, args, kwds)
    102 def __init__(self, func, args, kwds):
--> 103     self.gen = func(*args, **kwds)
    104     self.func, self.args, self.kwds = func, args, kwds

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/fugue/execution/api.py:49, in engine_context(engine, engine_conf, infer_by)
---> 49 e = make_execution_engine(engine, engine_conf, infer_by=infer_by)
     50 return e._as_context()

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/fugue/execution/factory.py:334, in make_execution_engine(engine, conf, infer_by, **kwargs)
    332     result = engine
    333 else:
--> 334     result = parse_execution_engine(engine, conf, **kwargs)
    335 sql_engine = make_sql_engine(None, result)
    336 result.set_sql_engine(sql_engine)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/triad/utils/dispatcher.py:111, in conditional_dispatcher.<locals>._run.<locals>._Dispatcher.__call__(self, *args, **kwds)
    110 def __call__(self, *args: Any, **kwds: Any) -> Any:
--> 111     return self.run_top(*args, **kwds)

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/triad/utils/dispatcher.py:268, in ConditionalDispatcher.run_top(self, *args, **kwargs)
--> 268 return list(itertools.islice(self.run(*args, **kwargs), 1))[0]

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/triad/utils/dispatcher.py:258, in ConditionalDispatcher.run(self, *args, **kwargs)
    256 for f in self._funcs:
    257     if self._match(f[2], *args, **kwargs):
--> 258         yield f[3](*args, **kwargs)
    259         has_return = True

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/fugue/execution/factory.py:77, in register_execution_engine.<locals>.<lambda>(engine, conf, **kwargs)
     74 if isinstance(name_or_type, str):
     75     nm = name_or_type
     76     parse_execution_engine.register(  # type: ignore
---> 77         func=lambda engine, conf, **kwargs: func(conf, **kwargs),
     78         matcher=lambda engine, conf, **kwargs: isinstance(engine, str)
     79         and engine == nm,
     80         priority=_get_priority(on_dup),
     81     )

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/fugue_spark/registry.py:54, in _register_engines.<locals>.<lambda>(conf, **kwargs)
     52 register_execution_engine(
     53     "spark",
---> 54     lambda conf, **kwargs: SparkExecutionEngine(conf=conf),
     55     on_dup="ignore",
     56 )

File /local_disk0/.ephemeral_nfs/envs/pythonEnv-db41b783-be44-4fb2-9cf0-f9266ee90aee/lib/python3.10/site-packages/fugue_spark/execution_engine.py:340, in SparkExecutionEngine.__init__(self, spark_session, conf)
    337 cf = dict(FUGUE_SPARK_DEFAULT_CONF)
    338 if not self.is_spark_connect:
    339     cf.update(
--> 340         {x[0]: x[1] for x in spark_session.sparkContext.getConf().getAll()}
    341     )
    342 cf.update(ParamDict(conf))
    343 super().__init__(cf)

File /databricks/spark/python/pyspark/context.py:2538, in SparkContext.getConf(self)
-> 2538 conf = SparkConf()
   2539 conf.setAll(self._conf.getAll())
   2540 return conf

File /databricks/spark/python/pyspark/conf.py:132, in SparkConf.__init__(self, loadDefaults, _jvm, _jconf)
    130 if _jvm is not None:
    131     # JVM is created, so create self._jconf directly through JVM
--> 132     self._jconf = _jvm.SparkConf(loadDefaults)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1587, in JavaClass.__call__(self, *args)
-> 1587 return_value = get_return_value(
   1588     answer, self._gateway_client, None, self._fqn)

File /databricks/spark/python/pyspark/errors/exceptions/captured.py:188, in capture_sql_exception.<locals>.deco(*a, **kw)
--> 188 return f(*a, **kw)

File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py:330, in get_return_value(answer, gateway_client, target_id, name)
--> 330 raise Py4JError(
    331     "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332     format(target_id, ".", name, value))

Py4JError: An error occurred while calling None.org.apache.spark.SparkConf. Trace:
py4j.security.Py4JSecurityException: Constructor public org.apache.spark.SparkConf(boolean) is not whitelisted.
	at py4j.security.WhitelistingPy4JSecurityManager.checkConstructor(WhitelistingPy4JSecurityManager.java:451)
	at py4j.Gateway.invoke(Gateway.java:256)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:750)
```

From looking into this a little further, I think it's related to the fact that certain functions are disabled for security reasons on High Concurrency clusters. Is there another way I can configure Fugue to use the Spark execution engine in this instance?
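For what it's worth, the bottom of the traceback suggests this can be reproduced without Fugue at all: `SparkContext.getConf()` constructs a new `org.apache.spark.SparkConf` on the JVM side through py4j, and that constructor is blocked by the security manager. A minimal check along these lines (assuming the `spark` session Databricks provides in the notebook) should raise the same error:

```python
from py4j.protocol import Py4JError

# `spark` is the SparkSession Databricks injects into the notebook.
# On a High Concurrency cluster this call is expected to fail, because
# getConf() builds a new org.apache.spark.SparkConf via py4j and that
# constructor is not whitelisted by the Py4J security manager.
try:
    spark.sparkContext.getConf()
except Py4JError as e:
    print("Blocked by Py4J security:", e)
```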

Expected behavior

I would expect this aggregation to execute, as it does on the other Databricks clusters I have tried.
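For comparison, the same aggregation runs fine when no engine is specified, since it then executes locally on pandas; with the sample data above, the sums are 1 for a=1 and 5 for a=2. The output below is my sketch, including the `agg` column name inferred from the keyword argument:

```python
import pandas as pd

from fugue import api as fa
from fugue.column import col, functions as f

df = pd.DataFrame({"a": [1, 2, 2], "b": [1, 2, 3]})

# No engine given, so this runs on the local pandas backend.
res = fa.aggregate(df, partition_by=["a"], agg=f.sum(col("b")))
print(res)
# Expected (sketch):
#    a  agg
# 0  1    1
# 1  2    5
```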

Environment (please complete the following information):

goodwanghan commented 1 year ago

Ah, this is really interesting. I didn't even know SparkConf could be access controlled. Maybe we can ignore the error if SparkConf doesn't work. I will release a dev version if you want to try it.
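Just to illustrate the "ignore the error" idea: a standalone helper along these lines (my naming, not Fugue's actual patch) could fall back to an empty config when the JVM rejects the call:

```python
from typing import Any, Dict

from pyspark.sql import SparkSession


def try_get_spark_conf(session: SparkSession) -> Dict[str, Any]:
    """Best-effort read of the session's SparkConf entries.

    SparkContext.getConf() constructs a new SparkConf on the JVM side; on
    clusters that whitelist JVM calls (e.g. Databricks High Concurrency)
    this raises a Py4JError, so swallow it and return an empty dict.
    """
    try:
        return dict(session.sparkContext.getConf().getAll())
    except Exception:  # py4j.protocol.Py4JError on restricted clusters
        return {}
```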

jstammers commented 1 year ago

Thanks @goodwanghan - I wasn't aware of that either until I encountered this error. I'd be happy to try a dev version once you've made the necessary changes.

goodwanghan commented 1 year ago

@jstammers, please try 0.8.7.dev5; it may solve this issue.
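Assuming the dev build is published to PyPI, pinning the exact pre-release version should install it:

```bash
pip install fugue==0.8.7.dev5
```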

jstammers commented 1 year ago

That seems to work for me. Thanks for solving this so quickly!