fugue-project / fugue

A unified interface for distributed computing. Fugue executes SQL, Python, Pandas, and Polars code on Spark, Dask and Ray without any rewrites.
https://fugue-tutorials.readthedocs.io/
Apache License 2.0

[BUG] #519

Closed guilhermedelyra closed 8 months ago

guilhermedelyra commented 9 months ago

Minimal Code To Reproduce

import pandas as pd
from prefect import task, flow
from prefect_fugue import fugue_engine
from fugue import fsql

@task
def load_data():
    return pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet")

@flow
def run_sql(top, engine):
    data = load_data()
    with fugue_engine(engine):
        fsql("""
        SELECT PULocationID, COUNT(*) AS ct FROM df
        GROUP BY 1 ORDER BY 2 DESC LIMIT 2
        PRINT
        """, data)

run_sql(1, "spark")

Describe the bug

"ImportError: ord() expected string of length 1, but int found. Please try to install the package by pip install fugue[sql]."
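The `TypeError` wrapped inside this `ImportError` can be reproduced in isolation. In the full log, the deserializer calls `ord(c)` on every element of the serialized parser data, which works for a string of characters and fails for a list of integers. The sketch below only illustrates that mechanism: `adjust_all` is a hypothetical stand-in, not the ANTLR runtime's code, and the idea that the generated parser and the installed runtime disagree about the serialization format is an assumption that this thread does not confirm.

```python
def adjust_all(data):
    """Hypothetical stand-in for a deserializer that expects characters."""
    return [ord(c) for c in data]


# Character data: every element is a one-character string, so ord() works.
as_text = "\x03\x04"
print(adjust_all(as_text))

# Integer data: ord() rejects ints, which is the TypeError seen in this issue.
as_ints = [3, 4]
error_message = None
try:
    adjust_all(as_ints)
except TypeError as exc:
    error_message = str(exc)
    print(error_message)
```

If this is what happens here, reinstalling `fugue[sql]` alone would not change the outcome as long as the same runtime copy is still the one being imported.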

Full log:

```console
/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql/__init__.py:8: UserWarning: fsql and FugueSQLWorkflow now should be imported directly from fugue, fugue_sql will be removed in 0.9.0
  warnings.warn(
02:17:10.612 | INFO | prefect.engine - Created flow run 'finicky-cat' for flow 'run-sql'
02:17:10.752 | INFO | Flow run 'finicky-cat' - Created task run 'load_data-0' for task 'load_data'
02:17:10.754 | INFO | Flow run 'finicky-cat' - Executing 'load_data-0' immediately...
02:17:12.186 | INFO | Task run 'load_data-0' - Finished in state Completed()
23/10/03 02:17:15 WARN Utils: Your hostname, WDX5CG01992LV0 resolves to a loopback address: 127.0.1.1; using 172.30.70.255 instead (on interface eth0)
23/10/03 02:17:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/03 02:17:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
02:17:19.667 | ERROR | Flow run 'finicky-cat' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/_utils/misc.py", line 20, in import_or_throw
    return __import__(package_name)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/__init__.py", line 2, in <module>
    from fugue_sql_antlr.constants import (
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/constants.py", line 1, in <module>
    from fugue_sql_antlr._parser.sa_fugue_sql import USE_CPP_IMPLEMENTATION
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/_parser/sa_fugue_sql.py", line 12, in <module>
    from .fugue_sqlParser import fugue_sqlParser
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/_parser/fugue_sqlParser.py", line 1720, in <module>
    class fugue_sqlParser ( Parser ):
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/_parser/fugue_sqlParser.py", line 1724, in fugue_sqlParser
    atn = ATNDeserializer().deserialize(serializedATN())
  File "/home/user/.local/lib/python3.10/site-packages/antlr4/atn/ATNDeserializer.py", line 61, in deserialize
    self.reset(data)
  File "/home/user/.local/lib/python3.10/site-packages/antlr4/atn/ATNDeserializer.py", line 91, in reset
    temp = [ adjust(c) for c in data ]
  File "/home/user/.local/lib/python3.10/site-packages/antlr4/atn/ATNDeserializer.py", line 91, in <listcomp>
    temp = [ adjust(c) for c in data ]
  File "/home/user/.local/lib/python3.10/site-packages/antlr4/atn/ATNDeserializer.py", line 89, in adjust
    v = ord(c)
TypeError: ord() expected string of length 1, but int found

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/engine.py", line 829, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 315, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/user/fugue_ibis_experiment/fuguecreator.py", line 16, in run_sql
    fsql("""
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/sql/api.py", line 252, in fugue_sql_flow
    dag = _build_dag(
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/sql/api.py", line 284, in _build_dag
    dag._sql(query, global_vars, local_vars, *args, **kwargs)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/sql/workflow.py", line 39, in _sql
    FugueSQLParser = import_fsql_dependency("fugue_sql_antlr").FugueSQLParser
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/_utils/misc.py", line 26, in import_fsql_dependency
    return import_or_throw(
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/_utils/misc.py", line 22, in import_or_throw
    raise ImportError(str(e) + ". " + message)
ImportError: ord() expected string of length 1, but int found. Please try to install the package by `pip install fugue[sql]`.
02:17:19.757 | ERROR | Flow run 'finicky-cat' - Finished in state Failed('Flow run encountered an exception. ImportError: ord() expected string of length 1, but int found. Please try to install the package by `pip install fugue[sql]`.')
Traceback (most recent call last):
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/_utils/misc.py", line 20, in import_or_throw
    return __import__(package_name)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/__init__.py", line 2, in <module>
    from fugue_sql_antlr.constants import (
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/constants.py", line 1, in <module>
    from fugue_sql_antlr._parser.sa_fugue_sql import USE_CPP_IMPLEMENTATION
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/_parser/sa_fugue_sql.py", line 12, in <module>
    from .fugue_sqlParser import fugue_sqlParser
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/_parser/fugue_sqlParser.py", line 1720, in <module>
    class fugue_sqlParser ( Parser ):
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue_sql_antlr/_parser/fugue_sqlParser.py", line 1724, in fugue_sqlParser
    atn = ATNDeserializer().deserialize(serializedATN())
  File "/home/user/.local/lib/python3.10/site-packages/antlr4/atn/ATNDeserializer.py", line 61, in deserialize
    self.reset(data)
  File "/home/user/.local/lib/python3.10/site-packages/antlr4/atn/ATNDeserializer.py", line 91, in reset
    temp = [ adjust(c) for c in data ]
  File "/home/user/.local/lib/python3.10/site-packages/antlr4/atn/ATNDeserializer.py", line 91, in <listcomp>
    temp = [ adjust(c) for c in data ]
  File "/home/user/.local/lib/python3.10/site-packages/antlr4/atn/ATNDeserializer.py", line 89, in adjust
    v = ord(c)
TypeError: ord() expected string of length 1, but int found

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/user/fugue_ibis_experiment/fuguecreator.py", line 23, in <module>
    run_sql(1, "spark")
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/flows.py", line 758, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/engine.py", line 271, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 243, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 282, in result
    return self.future.result(timeout=timeout)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 168, in result
    return self.__get_result()
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 345, in _run_async
    result = await coro
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/client/utilities.py", line 51, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/engine.py", line 374, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/engine.py", line 829, in orchestrate_flow_run
    result = await flow_call.aresult()
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 291, in aresult
    return await asyncio.wrap_future(self.future)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 315, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
  File "/home/user/fugue_ibis_experiment/fuguecreator.py", line 16, in run_sql
    fsql("""
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/sql/api.py", line 252, in fugue_sql_flow
    dag = _build_dag(
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/sql/api.py", line 284, in _build_dag
    dag._sql(query, global_vars, local_vars, *args, **kwargs)
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/sql/workflow.py", line 39, in _sql
    FugueSQLParser = import_fsql_dependency("fugue_sql_antlr").FugueSQLParser
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/_utils/misc.py", line 26, in import_fsql_dependency
    return import_or_throw(
  File "/home/user/fugue_ibis_experiment/.venv/lib/python3.10/site-packages/fugue/_utils/misc.py", line 22, in import_or_throw
    raise ImportError(str(e) + ". " + message)
ImportError: ord() expected string of length 1, but int found. Please try to install the package by `pip install fugue[sql]`.
```

I have already reinstalled the package, but the error keeps happening.
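Since reinstalling did not help, one thing that may be worth checking is which copy of the ANTLR runtime Python actually imports: in the full log, `antlr4` is loaded from `/home/user/.local/lib/python3.10/site-packages` rather than from the project's `.venv`. The sketch below uses only the standard library. `describe_package` is a hypothetical helper name, and whether a user-level install shadowing the virtual environment is the cause here is an assumption, not something confirmed in this thread.

```python
import importlib.util
from importlib import metadata


def describe_package(import_name, dist_name):
    """Report where a top-level module would be imported from and its version."""
    # find_spec locates the module without importing it (for top-level names).
    spec = importlib.util.find_spec(import_name)
    origin = spec.origin if spec is not None else None
    try:
        version = metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        version = None
    return {"origin": origin, "version": version}


if __name__ == "__main__":
    # Module and distribution names taken from the traceback and pip freeze
    # in this issue.
    print(describe_package("antlr4", "antlr4-python3-runtime"))
    print(describe_package("fugue_sql_antlr", "fugue-sql-antlr"))
```

Running it with the same interpreter that runs the flow shows whether both packages resolve inside the virtual environment.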

Expected behavior

Basically the same as the tutorial: https://fugue-tutorials.readthedocs.io/tutorials/integrations/ecosystem/prefect.html#running-sql-on-any-spark-dask-and-duckdb

Environment (please complete the following information):

packages installed:

[project]
dependencies = [
    "prefect-fugue>=0.0.1",
    "pandas>=2.1.0",
    "pyspark>=3.4.0",
    "fugue[sql]>=0.8.6",
]
Pip freeze:

```console
adagio==0.2.4
aiohttp==3.8.5
aiosignal==1.3.1
aiosqlite==0.19.0
alembic==1.12.0
antlr4-python3-runtime==4.9.3
anyconfig==0.13.0
anyio==3.7.1
appdirs==1.4.4
apprise==1.5.0
arrow==1.2.3
asgi-lifespan==2.1.0
astroid==2.13.5
asttokens==2.4.0
async-timeout==4.0.3
asyncpg==0.28.0
atpublic==4.0
attrs==23.1.0
backcall==0.2.0
binaryornot==0.4.4
black==23.3.0
bleach==6.0.0
blinker==1.6.2
boto3==1.26.32
botocore==1.29.165
build==1.0.3
CacheControl==0.13.1
cachetools==5.3.1
certifi==2023.7.22
cffi==1.16.0
cfgv==3.3.1
charset-normalizer==2.1.1
click==8.1.7
cloudpickle==2.2.1
colorama==0.4.6
contourpy==1.1.1
cookiecutter==2.1.1
coolname==2.2.0
coverage==7.2.7
croniter==1.4.1
cryptography==41.0.4
cx-Oracle==8.3.0
cycler==0.11.0
databricks-cli==0.17.8
dateparser==1.1.8
decorator==5.1.1
delta-spark==2.1.0
diff-cover==7.7.0
dill==0.3.7
distlib==0.3.7
docker==6.1.3
dynaconf==3.2.3
entrypoints==0.4
et-xmlfile==1.1.0
exceptiongroup==1.1.2
executing==1.2.0
fastapi==0.103.2
filelock==3.12.2
findpython==0.4.0
findspark==2.0.1
flake8==5.0.4
Flask==2.3.3
fonttools==4.42.1
fs==2.4.16
fsspec==2023.9.2
fugue==0.8.6
fugue-sql-antlr==0.1.7
gitdb==4.0.10
GitPython==3.1.37
google-auth==2.23.2
graphviz==0.20.1
greenlet==2.0.2
griffe==0.36.4
gunicorn==21.2.0
h11==0.14.0
h2==4.1.0
hpack==4.0.0
httpcore==0.18.0
httpx==0.25.0
hyperframe==6.0.1
identify==2.5.26
idna==3.4
importlib-resources==6.1.0
iniconfig==2.0.0
installer==0.7.0
ipython==8.15.0
isort==5.12.0
itsdangerous==2.1.2
JayDeBeApi==1.2.3
jedi==0.19.0
Jinja2==3.1.2
jinja2-time==0.2.0
jmespath==1.0.1
joblib==1.3.2
JPype1==1.4.0
jsonpatch==1.33
jsonpointer==2.4
jsonschema==4.19.1
jsonschema-specifications==2023.7.1
kedro==0.18.13
kiwisolver==1.4.5
kubernetes==28.1.0
lazy-object-proxy==1.9.0
Mako==1.2.4
Markdown==3.4.4
markdown-it-py==3.0.0
MarkupSafe==2.1.3
matplotlib==3.8.0
matplotlib-inline==0.1.6
mccabe==0.7.0
mdurl==0.1.2
mlflow==2.7.1
more-itertools==10.1.0
msgpack==1.0.7
mypy-extensions==1.0.0
nodeenv==1.8.0
numpy==1.23.0
oauthlib==3.2.2
omegaconf==2.3.0
openpyxl==3.0.10
oracledb==1.4.1
orjson==3.9.7
packaging==23.1
pandas==1.4.3
parse==1.19.1
parso==0.8.3
pathspec==0.11.2
pdm==2.9.3
pendulum==2.1.2
pickleshare==0.7.5
Pillow==10.0.1
pip-tools==7.3.0
platformdirs==3.10.0
pluggy==1.2.0
pre-commit==2.21.0
prefect==2.13.4
prefect-fugue==0.0.1
prompt-toolkit==3.0.39
protobuf==4.24.3
pure-eval==0.2.2
py4j==0.10.9.7
pyarrow==13.0.0
pyasn1==0.5.0
pyasn1-modules==0.3.0
pycodestyle==2.9.1
pycparser==2.21
pydantic==1.10.13
pyflakes==2.5.0
Pygments==2.16.1
pylint==2.15.9
pyproject_hooks==1.0.0
pyspark==3.5.0
pytest==7.4.0
pytest-cov==3.0.0
pytest-mock==3.8.2
pytest-randomly==3.12.0
pytest-spark==0.6.0
python-benedict==0.32.0
python-dateutil==2.8.2
python-dotenv==1.0.0
python-fsutil==0.10.0
python-slugify==8.0.1
pytoolconfig==1.2.5
pytz==2023.3.post1
pytzdata==2020.1
PyYAML==6.0
qpd==0.4.4
querystring-parser==1.2.4
readchar==4.0.5
referencing==0.30.2
regex==2023.8.8
requests==2.28.1
requests-oauthlib==1.3.1
requests-toolbelt==1.0.0
resolvelib==1.0.1
rich==13.5.3
rope==1.9.0
rpds-py==0.10.3
rsa==4.9
ruamel.yaml==0.17.33
ruamel.yaml.clib==0.2.7
s3transfer==0.6.1
scikit-learn==1.3.1
scipy==1.11.2
shellingham==1.5.3
six==1.16.0
smmap==5.0.1
sniffio==1.3.0
SQLAlchemy==2.0.21
sqlfluff==2.3.2
sqlglot==18.7.0
sqlparse==0.4.2
sqlparser==0.0.9
stack-data==0.6.2
starlette==0.27.0
tabulate==0.9.0
tblib==2.0.0
text-unidecode==1.3
threadpoolctl==3.2.0
toml==0.10.2
tomli==2.0.1
tomlkit==0.12.1
toposort==1.10
tqdm==4.66.0
traitlets==5.10.0
triad==0.9.1
truststore==0.8.0
typer==0.9.0
typing_extensions==4.7.1
tzdata==2023.3
tzlocal==5.0.1
unearth==0.11.0
urllib3==1.26.16
uvicorn==0.23.2
virtualenv==20.24.2
wcwidth==0.2.6
webencodings==0.5.1
websocket-client==1.6.3
websockets==11.0.3
Werkzeug==2.3.7
wrapt==1.15.0
```
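The declared dependencies and the frozen versions above can be compared mechanically. The sketch below hard-codes the four minimum versions from the `[project]` table and the matching pins from the pip freeze, and uses a simplified numeric comparison. `parse_version` and `unmet_minimums` are hypothetical helpers, not part of pip or `packaging`. It flags `pandas`, whose frozen version 1.4.3 is below the declared `>=2.1.0`. One possible reading, which is an assumption, is that the freeze was taken from a different environment than the project's `.venv`.

```python
def parse_version(text):
    """Simplified parser: handles plain dotted numeric versions only."""
    return tuple(int(part) for part in text.split("."))


def unmet_minimums(minimums, installed):
    """Return the packages whose installed version is below the declared minimum."""
    return sorted(
        name
        for name, minimum in minimums.items()
        if parse_version(installed[name]) < parse_version(minimum)
    )


# Minimum versions from the [project] dependencies in this issue.
minimums = {
    "prefect-fugue": "0.0.1",
    "pandas": "2.1.0",
    "pyspark": "3.4.0",
    "fugue": "0.8.6",
}

# Matching pins from the pip freeze in this issue.
installed = {
    "prefect-fugue": "0.0.1",
    "pandas": "1.4.3",
    "pyspark": "3.5.0",
    "fugue": "0.8.6",
}

print(unmet_minimums(minimums, installed))
```

For real projects, a proper specifier parser is the safer choice; the tuple comparison here is only enough for the plain `>=` constraints listed in this issue.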
goodwanghan commented 9 months ago

@guilhermedelyra thanks so much for reporting!

Actually, prefect-fugue needs refactoring because it was designed before a major refactoring of Fugue.

After the fix on the prefect-fugue side, the experience should be even simpler:

import pandas as pd
import fugue.api as fa
from prefect import task, flow

@task
def load_data():
    return pd.read_parquet("https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet")

@flow
def run_sql(top, engine):
    data = load_data()
    with fa.engine_context(engine):
        fa.fugue_sql_flow("""
        SELECT PULocationID, COUNT(*) AS ct FROM df
        GROUP BY 1 ORDER BY 2 DESC LIMIT 2
        PRINT
        """, data)

run_sql(1, "spark") # your current spark session (not recommended)
run_sql(1, "prefect_fugue/<spark_block>") # using the config in your prefect block to connect to remote spark/databricks
guilhermedelyra commented 9 months ago

that's really nice to know 🙂

Is there anything I can do to help?

goodwanghan commented 9 months ago

@guilhermedelyra this was a long-overdue change, and I have just made it. Thanks for raising this problem.

https://github.com/fugue-project/prefect-fugue/pull/9

If you are interested in contributing to either the prefect-fugue or the fugue project, please let us know. We can discuss it on our Slack channel.

Thanks!

goodwanghan commented 8 months ago

@guilhermedelyra we have released prefect-fugue 0.0.3, and you can find instructions here: https://github.com/fugue-project/prefect-fugue/pull/10

Please let us know if you have any questions or feedback.

Thanks

guilhermedelyra commented 8 months ago

Thank you very much! 🙂 I'll be in touch soon.