modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

BUG: FileNotFoundError on read_csv method with url argument #5532

Open TheWCKD opened 1 year ago

TheWCKD commented 1 year ago

Modin version checks

Reproducible Example

import modin.pandas as pd
import ray

ray.init(
    num_cpus=6,
    ignore_reinit_error=True,
    runtime_env={"pip": ["modin"], "env_vars": {"__MODIN_AUTOIMPORT_PANDAS__": "1"}},
)

url = 'http://127.0.0.1:9000/exp?query=SELECT+pair%2C+price%2C+ts+FROM+%27klines_1s%27+WHERE+ts+%3E%3D+%272022-01-01T00%3A00%3A00.000000Z%27+AND+ts+%3C+%272022-01-02T00%3A00%3A00.000000Z%27'
seconds = pd.read_csv(url)

Issue Description

BadHttpMessage: 400, message='Expected HTTP/'
The above exception was the direct cause of the following exception:

FileNotFoundError: http://127.0.0.1:9000/exp?query=SELECT+pair%2C+price%2C+ts+FROM+%27klines_1s%27+WHERE+ts+%3E%3D+%272022-01-01T00%3A00%3A00.000000Z%27+AND+ts+%3C+%272022-01-02T00%3A00%3A00.000000Z%27

[screenshot]

Expected Behavior

pandas' read_csv works perfectly for this example; Modin's does not.

Error Logs

See the full traceback posted in the comments below.

Installed Versions

```
INSTALLED VERSIONS
------------------
commit : ba7ab8eb80dd6412f275ffff806db37ff7ebdb01
python : 3.10.8.final.0
python-bits : 64
OS : Linux
OS-release : 5.15.0-58-generic
Version : #64-Ubuntu SMP Thu Jan 5 11:43:13 UTC 2023
machine : x86_64
processor : x86_64
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : en_US.UTF-8

Modin dependencies
------------------
modin : 0.18.0
ray : 2.2.0
dask : 2022.12.1
distributed : 2022.12.1
hdk : None

pandas dependencies
-------------------
pandas : 1.5.2
numpy : 1.23.4
pytz : 2022.5
dateutil : 2.8.2
setuptools : 65.6.3
pip : 22.3.1
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.7.0
pandas_datareader: None
bs4 : None
bottleneck : None
brotli : None
fastparquet : None
fsspec : 2022.11.0
gcsfs : None
matplotlib : 3.6.2
numba : 0.56.4
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 10.0.0
pyreadstat : None
pyxlsb : None
s3fs : None
scipy : None
snappy : None
sqlalchemy : None
tables : None
tabulate : None
xarray : None
xlrd : None
xlwt : None
zstandard : None
tzdata : None
```
vnlitvinov commented 1 year ago

Hi @TheWCKD, thanks for reporting!

Could you please provide the error message you're encountering? It would help with further diagnosis.

Also, could you provide more info on what is running at http://127.0.0.1:9000? I cannot reproduce the issue locally without that.

TheWCKD commented 1 year ago

Hi, @vnlitvinov! 127.0.0.1:9000 is my database; I'm running a query against it that returns the results as a downloadable CSV file. The errors I'm getting are these:

[screenshot]

YarShev commented 1 year ago

@TheWCKD, can you run ray.init() after declaring the url variable? I suspect the workers may not see the url.

TheWCKD commented 1 year ago

> @TheWCKD, can you run ray.init() after declaring the url variable? I suspect the workers may not see the url.

Same issue, even when I did what you mentioned.

import modin.pandas as pd
import ray

url = 'http://127.0.0.1:9000/exp?query=SELECT+pair%2C+price%2C+ts+FROM+%27klines_1s%27+WHERE+ts+%3E%3D+%272022-01-01T00%3A00%3A00.000000Z%27+AND+ts+%3C+%272022-01-02T00%3A00%3A00.000000Z%27'

ray.init(
    num_cpus=6,
    ignore_reinit_error=True,
    runtime_env={"pip": ["modin"], "env_vars": {"__MODIN_AUTOIMPORT_PANDAS__": "1"}},
)

seconds = pd.read_csv(url)
TheWCKD commented 1 year ago

It seems like the problem originates in the aiohttp package. Here's the full stack trace:

```log
2023-01-17 11:38:59,152 INFO worker.py:1370 -- Calling ray.init() again after it has already been called.
Traceback (most recent call last):
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/aiohttp/client_reqrep.py", line 899, in start
    message, payload = await protocol.read()  # type: ignore[union-attr]
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/aiohttp/streams.py", line 616, in read
    await self._waiter
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/aiohttp/client_proto.py", line 213, in data_received
    messages, upgraded, tail = self._parser.feed_data(data)
  File "aiohttp/_http_parser.pyx", line 551, in aiohttp._http_parser.HttpParser.feed_data
aiohttp.http_exceptions.BadHttpMessage: 400, message='Expected HTTP/'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/implementations/http.py", line 407, in _info
    await _file_info(
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/implementations/http.py", line 788, in _file_info
    r = await session.get(url, allow_redirects=ar, **kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/aiohttp/client.py", line 560, in _request
    await resp.start(conn)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/aiohttp/client_reqrep.py", line 901, in start
    raise ClientResponseError(
aiohttp.client_exceptions.ClientResponseError: 400, message='Expected HTTP/', url=URL('http://127.0.0.1:9000/exp?query=SELECT+pair,+price,+ts+FROM+'klines_1s'+WHERE+ts+%3E%3D+'2022-01-01T00:00:00.000000Z'+AND+ts+%3C+'2022-01-02T00:00:00.000000Z'')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/tmp/ipykernel_5635/2623319069.py", line 16, in <module>
    seconds = pd.read_csv(url)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/pandas/io.py", line 218, in read_csv
    return _read(**kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/pandas/io.py", line 89, in _read
    pd_obj = FactoryDispatcher.read_csv(**kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/core/execution/dispatching/factories/dispatcher.py", line 185, in read_csv
    return cls.get_factory()._read_csv(**kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/core/execution/dispatching/factories/factories.py", line 217, in _read_csv
    return cls.io_cls.read_csv(**kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/core/io/file_dispatcher.py", line 154, in read
    query_compiler = cls._read(*args, **kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/logging/logger_decorator.py", line 128, in run_and_log
    return obj(*args, **kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/core/io/text/text_file_dispatcher.py", line 1037, in _read
    with OpenFile(
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/modin/core/io/file_dispatcher.py", line 94, in __enter__
    return self.file.open()
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/core.py", line 135, in open
    return self.__enter__()
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/core.py", line 103, in __enter__
    f = self.fs.open(self.path, mode=mode)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/spec.py", line 1106, in open
    f = self._open(
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/implementations/http.py", line 346, in _open
    size = size or self.info(path, **kwargs)["size"]
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/asyn.py", line 113, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/asyn.py", line 98, in sync
    raise return_result
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/asyn.py", line 53, in _runner
    result[0] = await coro
  File "/home/tibi/miniconda3/envs/historical_prices/lib/python3.10/site-packages/fsspec/implementations/http.py", line 420, in _info
    raise FileNotFoundError(url) from exc
FileNotFoundError: http://127.0.0.1:9000/exp?query=SELECT+pair%2C+price%2C+ts+FROM+%27klines_1s%27+WHERE+ts+%3E%3D+%272022-01-01T00%3A00%3A00.000000Z%27+AND+ts+%3C+%272022-01-02T00%3A00%3A00.000000Z%27
```
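For what it's worth, the traceback never reaches Modin-specific parsing logic: fsspec's HTTP filesystem fails while sizing the file. A minimal sketch that exercises the same code path without Modin (assuming fsspec is installed; `read_head` is a helper name introduced here, and the URL would be the QuestDB endpoint):

```python
import fsspec


def read_head(url: str, n: int = 1024) -> bytes:
    """Open `url` through fsspec (the layer Modin's read_csv uses under the
    hood) and read the first `n` bytes. For http:// URLs this triggers the
    same HTTPFileSystem size lookup that raises FileNotFoundError above."""
    with fsspec.open(url, mode="rb") as f:
        return f.read(n)
```

If `read_head("http://127.0.0.1:9000/exp?query=...")` fails the same way, the problem can be reported against fsspec (or the server) directly, with Modin out of the picture.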
TheWCKD commented 1 year ago

But it's strange, because making an HTTP GET request with the aiohttp package alone works perfectly:

import aiohttp
import asyncio

url = "http://127.0.0.1:9000/exp?query=SELECT+pair%2C+price%2C+ts+FROM+%27klines_1s%27+WHERE+ts+%3E%3D+%272022-01-01T00%3A00%3A00.000000Z%27+AND+ts+%3C+%272022-01-02T00%3A00%3A00.000000Z%27"

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(resp.status)

asyncio.run(main()) # returns 200
TheWCKD commented 1 year ago

I had the same issue with the Dask engine too; however, their docs mention that Dask's read_csv only supports a few URL schemes, and I don't see plain http:// among them: [screenshot]

However, pandas' read_csv states that it accepts any valid http URL: [screenshot]

As you might imagine, downloading the CSV to disk every time I run a query on my database, just to have Modin read it as a .csv file, is really inefficient.

YarShev commented 1 year ago

@TheWCKD, can you try running your example with the Dask engine (MODIN_ENGINE=dask) to see if the issue persists?

TheWCKD commented 1 year ago

> @TheWCKD, can you try running your example with the Dask engine (MODIN_ENGINE=dask) to see if the issue persists?

Yup, same error. I ran the following:

import modin.pandas as pd
import modin.config as cfg
cfg.Engine.put('dask')

url = 'http://127.0.0.1:9000/exp?query=SELECT+pair%2C+price%2C+ts+FROM+%27klines_1s%27+WHERE+ts+%3E%3D+%272022-01-01T00%3A00%3A00.000000Z%27+AND+ts+%3C+%272022-01-02T00%3A00%3A00.000000Z%27'
seconds = pd.read_csv(url)

and I'm still getting the same error: [screenshot]

YarShev commented 1 year ago

Hmm, strange. @vnlitvinov, did you reproduce the error?

Meanwhile, @TheWCKD, I wonder if you have any proxy-related environment variables set (http_proxy, https_proxy)? Could you try unsetting them?

unset http_proxy
unset https_proxy
YarShev commented 1 year ago

Also, I wonder if one of the workers is listening on the same port (i.e., 9000)? Can you check that with the Dask engine, for example?

import modin.pandas as pd
import modin.config as cfg
from distributed.client import Client

client = Client(n_workers=<your_num_cpus>, threads_per_worker=1)
client.scheduler_info()["workers"]

cfg.Engine.put('dask')

url = 'http://127.0.0.1:9000/exp?query=SELECT+pair%2C+price%2C+ts+FROM+%27klines_1s%27+WHERE+ts+%3E%3D+%272022-01-01T00%3A00%3A00.000000Z%27+AND+ts+%3C+%272022-01-02T00%3A00%3A00.000000Z%27'
seconds = pd.read_csv(url)
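The port-collision hypothesis can also be checked directly, independent of the engine. A hedged sketch (`port_in_use` is a helper name introduced here, not a Modin or Dask API):

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if something is accepting TCP connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.settimeout(1.0)
        # connect_ex returns 0 on a successful connection, an errno otherwise
        return s.connect_ex((host, port)) == 0
```

Calling `port_in_use(9000)` before and after starting the workers would show whether anything besides the database ever claims that port.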
TheWCKD commented 1 year ago

Hello again, @YarShev, and thanks for sticking with this issue! I don't have any proxy settings, and as I said, pandas' read_csv works perfectly fine with the localhost url.

I have even tested it with a plain REST GET request, and this is what I got; maybe it helps: [screenshot]

I ran your code above, and as you can see below, no workers are listening on port 9000. I still got the same error. Really, really strange...

[screenshot]

YarShev commented 1 year ago

Yes, it is really confusing. I wonder whether the issue persists with every distributed engine Modin has. Can you try running Modin with unidist? Here is some info on running Modin with unidist: https://modin.readthedocs.io/en/stable/development/using_pandas_on_unidist.html.

vnlitvinov commented 1 year ago

@YarShev I cannot repro this, as I don't have that specific SQL-over-HTTP server running locally. My intuition says that it responds with a 404 to some repeated request (e.g., when a worker wants to download a piece of the file for parsing) or something similar.

@TheWCKD could you run this code piece and report back?

from modin.core.io.file_dispatcher import OpenFile

with OpenFile(your_url) as f:
    pos = f.tell()
    line = f.readline()
    f.seek(pos)
    blob = f.read(4096)
print(pos, line, blob.hex())  # bytes.hex(): the builtin hex() doesn't accept bytes
vnlitvinov commented 1 year ago

> As you might imagine, downloading the CSV to disk every time I run a query on my database, just to have Modin read it as a .csv file, is really inefficient.

Also, note that this is only a single piece of the puzzle: pandas and Modin work differently with such URLs. Modin will try to open your URL multiple times from different workers and read chunks from it. If your server does not cache responses, this will most likely make it execute the query multiple times. So, to make things efficient, one would have to cache the response on the server side to make sure the workers each get the exact same output (maybe by introducing some middleware in front of the HTTP server).

Having said that, the server seems to be fronting a SQL store; is there any particular reason you cannot tell Modin to read from SQL directly, without an intermediate HTTP server?
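The caching-middleware idea can be sketched with nothing but the standard library (a hedged illustration; `make_cache_handler` and the header handling are assumptions, not code from this thread). The proxy queries the upstream URL exactly once, then serves the cached bytes with Content-Length and Range support so parallel readers all see the same, seekable "file":

```python
import urllib.request
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer


def make_cache_handler(upstream_url: str):
    """Build a request handler that fetches `upstream_url` once and then
    serves the cached bytes, answering HEAD and byte-range GET requests."""
    cache = {}

    class CacheHandler(BaseHTTPRequestHandler):
        def _body(self) -> bytes:
            if "data" not in cache:  # the upstream query runs exactly once
                with urllib.request.urlopen(upstream_url) as resp:
                    cache["data"] = resp.read()
            return cache["data"]

        def do_HEAD(self):
            body = self._body()
            self.send_response(200)
            self.send_header("Content-Length", str(len(body)))
            self.send_header("Accept-Ranges", "bytes")
            self.end_headers()

        def do_GET(self):
            body = self._body()
            rng = self.headers.get("Range")
            if rng and rng.startswith("bytes="):
                start, _, end = rng[len("bytes="):].partition("-")
                lo = int(start or 0)
                hi = int(end) + 1 if end else len(body)
                chunk = body[lo:hi]
                self.send_response(206)
                self.send_header(
                    "Content-Range", f"bytes {lo}-{lo + len(chunk) - 1}/{len(body)}"
                )
            else:
                chunk = body
                self.send_response(200)
            self.send_header("Content-Length", str(len(chunk)))
            self.send_header("Accept-Ranges", "bytes")
            self.end_headers()
            self.wfile.write(chunk)

    return CacheHandler
```

One could then start `ThreadingHTTPServer(("127.0.0.1", 0), make_cache_handler(questdb_url))` in a background thread and point Modin's read_csv at the proxy's address instead of the database.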

TheWCKD commented 1 year ago

> @YarShev I cannot repro this, as I don't have that specific SQL-over-HTTP server running locally. My intuition says that it responds with a 404 to some repeated request (e.g., when a worker wants to download a piece of the file for parsing) or something similar.
>
> @TheWCKD could you run this code piece and report back?
>
> ```python
> from modin.core.io.file_dispatcher import OpenFile
>
> with OpenFile(your_url) as f:
>     pos = f.tell()
>     line = f.readline()
>     f.seek(pos)
>     blob = f.read(4096)
> print(pos, line, blob.hex())  # bytes.hex(): the builtin hex() doesn't accept bytes
> ```

I ran this exact piece of code and am still receiving the same errors:

BadHttpMessage: 400, message='Expected HTTP/'
FileNotFoundError: http://127.0.0.1:9000/exp?query=SELECT+pair%2C+price%2C+ts+FROM+%27klines_1s%27+WHERE+ts+%3E%3D+%272022-01-01T00%3A00%3A00.000000Z%27+AND+ts+%3C+%272022-01-02T00%3A00%3A00.000000Z%27
TheWCKD commented 1 year ago

> > As you might imagine, downloading the CSV to disk every time I run a query on my database, just to have Modin read it as a .csv file, is really inefficient.
>
> Also, note that this is only a single piece of the puzzle: pandas and Modin work differently with such URLs. Modin will try to open your URL multiple times from different workers and read chunks from it. If your server does not cache responses, this will most likely make it execute the query multiple times. So, to make things efficient, one would have to cache the response on the server side to make sure the workers each get the exact same output (maybe by introducing some middleware in front of the HTTP server).
>
> Having said that, the server seems to be fronting a SQL store; is there any particular reason you cannot tell Modin to read from SQL directly, without an intermediate HTTP server?

Thank you for the extensive review. I am using a fast time-series database called questdb, and the fastest way to query the enormous amount of data I have is through its HTTP REST API (https://questdb.io/docs/develop/query-data/#http-rest-api), which returns a CSV. I previously tried pandas' read_sql methods, but the database doesn't support Postgres server-side cursors, so I'm unable to read any data that way.

I guess I'll have to stick with pandas' read_csv then; perhaps you're right and the database only supports one request per query. Thank you for all your help!

vnlitvinov commented 1 year ago

I have no idea what questdb is, but their docs claim to support Postgres cursors: https://questdb.io/docs/develop/query-data/#postgresql-wire-protocol, so maybe using .read_sql_query() could work? Also, one could put a rather simple intermediate caching server written in Python between your questdb and your Modin code, or even just download that URL to a temporary file in your script, read that, and remove the file.
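The temporary-file suggestion can be sketched like this (a hedged example; `read_csv_via_tempfile` is a helper name introduced here, shown with plain pandas, since swapping in `modin.pandas` should behave the same once the workers are reading a local file):

```python
import os
import tempfile
import urllib.request

import pandas as pd  # or: import modin.pandas as pd


def read_csv_via_tempfile(url: str) -> pd.DataFrame:
    """Download `url` once into a temporary file, read it with read_csv,
    and remove the file afterwards. The server executes the query exactly
    once; any chunked/parallel reads then hit the local copy instead."""
    fd, path = tempfile.mkstemp(suffix=".csv")
    os.close(fd)
    try:
        urllib.request.urlretrieve(url, path)
        return pd.read_csv(path)
    finally:
        os.remove(path)
```

This trades disk space for correctness: the HTTP endpoint only ever sees one plain GET, which is the access pattern that already works for the reporter.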

With all that, my gut feeling tells me that the bug you're seeing is probably either a bug in fsspec or a problem in questdb (i.e., Modin itself is not directly to blame here). Reference: https://github.com/fsspec/filesystem_spec/pull/701 - if you look at that PR, you'll see that fsspec errors out if it cannot query the file size over HTTP, and I'm not sure questdb reports the size of a query result... it might not even support chunked download of the result, which would mean Modin won't be able to parse it in parallel.
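That hypothesis can be tested against the endpoint by inspecting the response headers fsspec depends on: a Content-Length (the file size) and, for parallel chunked reads, Accept-Ranges. A hedged sketch (`probe_http_capabilities` is a name introduced here; some servers may also reject HEAD outright, which is itself informative):

```python
import urllib.request


def probe_http_capabilities(url: str) -> dict:
    """Issue a HEAD request and report the headers fsspec's HTTP filesystem
    relies on: Content-Length for sizing and Accept-Ranges for chunked reads."""
    req = urllib.request.Request(url, method="HEAD")
    with urllib.request.urlopen(req) as resp:
        return {
            "status": resp.status,
            "content_length": resp.headers.get("Content-Length"),
            "accept_ranges": resp.headers.get("Accept-Ranges"),
        }
```

If `content_length` comes back as None, or the HEAD request itself fails, fsspec cannot size the file, which matches the FileNotFoundError in the traceback above.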

TheWCKD commented 1 year ago

> I have no idea what questdb is, but their docs claim to support Postgres cursors: https://questdb.io/docs/develop/query-data/#postgresql-wire-protocol, so maybe using .read_sql_query() could work? Also, one could put a rather simple intermediate caching server written in Python between your questdb and your Modin code, or even just download that URL to a temporary file in your script, read that, and remove the file.
>
> With all that, my gut feeling tells me that the bug you're seeing is probably either a bug in fsspec or a problem in questdb (i.e., Modin itself is not directly to blame here). Reference: https://github.com/fsspec/filesystem_spec/pull/701 - if you look at that PR, you'll see that fsspec errors out if it cannot query the file size over HTTP, and I'm not sure questdb reports the size of a query result... it might not even support chunked download of the result, which would mean Modin won't be able to parse it in parallel.

Yup, it supports cursors, but not server-side cursors (https://www.psycopg.org/psycopg3/docs/advanced/cursors.html).

You're probably right; questdb may not report the size of the query result. I'll make sure to ask them these specific questions on Slack. For now I'll stick with pandas, and if I manage to find the culprit I'll leave a comment on this issue. Thanks for your time and help! I'll close this for now.