modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

BUG: modin.pandas raises dask error #5510

Open kaijennissen opened 1 year ago

kaijennissen commented 1 year ago

Modin version checks

Reproducible Example

import modin.pandas as pd

df = pd.DataFrame({"Date": pd.date_range("2001-01-01", freq="d", periods=12)})
df["Year"] = df.Date.apply(lambda x: x.isocalendar().year)
df["Week"] = df.Date.apply(lambda x: x.isocalendar().week)
df["Yearweek"] = df.Year * 100 + df.Week

df["Monday"] = pd.to_datetime(
    df.Year.astype(str) + "-" + df.Week.astype(str) + "-" + "1", format="%Y-%W-%w"
)

Issue Description

In 2 out of 3 runs of the above code, the error message displayed below is raised. I first thought it was related to pd.to_datetime, but the message was also displayed after a call to df.Date.apply.

Expected Behavior

I'd expect the code to either work or raise an error explaining what I'm doing wrong.
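For comparison, the same operations run to completion under plain pandas (a sketch, assuming pandas 1.5 on Python 3.9, where Timestamp.isocalendar() returns a named IsoCalendarDate):

```python
import pandas as pd  # plain pandas here, not modin.pandas

df = pd.DataFrame({"Date": pd.date_range("2001-01-01", freq="d", periods=12)})
df["Year"] = df.Date.apply(lambda x: x.isocalendar().year)
df["Week"] = df.Date.apply(lambda x: x.isocalendar().week)
df["Yearweek"] = df.Year * 100 + df.Week
df["Monday"] = pd.to_datetime(
    df.Year.astype(str) + "-" + df.Week.astype(str) + "-" + "1", format="%Y-%W-%w"
)
print(df.head())  # completes without raising
```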

Error Logs

```python-traceback
2022-12-31 15:19:43,724 - distributed.worker - ERROR - Exception during execution of task lambda-3ade1fb0d35c6d6f924bbaaf93130a25.
Traceback (most recent call last):
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/distributed/worker.py", line 2364, in _prepare_args_for_execution
    data[k] = self.data[k]
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/distributed/spill.py", line 269, in __getitem__
    return super().__getitem__(key)
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/zict/buffer.py", line 108, in __getitem__
    raise KeyError(key)
KeyError: 'deploy_dask_func-eb9b23df-06bb-4b51-aef4-309397fb2b2b'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/distributed/worker.py", line 2246, in execute
    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/distributed/worker.py", line 2368, in _prepare_args_for_execution
    data[k] = Actor(type(self.state.actors[k]), self.address, k, self)
KeyError: 'deploy_dask_func-eb9b23df-06bb-4b51-aef4-309397fb2b2b'
```

Installed Versions

INSTALLED VERSIONS
------------------
commit : 5ebbac683321d23c975d059a995eb7cc1b6353e9
python : 3.9.15.final.0
python-bits : 64
OS : Darwin
OS-release : 22.2.0
Version : Darwin Kernel Version 22.2.0: Fri Nov 11 02:08:47 PST 2022; root:xnu-8792.61.2~4/RELEASE_X86_64
machine : x86_64
processor : i386
byteorder : little
LC_ALL : None
LANG : en_US.UTF-8
LOCALE : None.UTF-8

Modin dependencies
------------------
modin : 0.18.0+25.g5ebbac68
ray : 2.2.0
dask : 2022.12.1
distributed : 2022.12.1
hdk : None

pandas dependencies
-------------------
pandas : 1.5.2
numpy : 1.24.1
pytz : 2022.7
dateutil : 2.8.2
setuptools : 65.6.3
pip : 22.3.1
Cython : None
pytest : None
hypothesis : None
sphinx : None
blosc : None
feather : None
xlsxwriter : None
lxml.etree : None
html5lib : None
pymysql : None
psycopg2 : None
jinja2 : 3.1.2
IPython : 8.7.0
pandas_datareader: None
bs4 : 4.11.1
bottleneck : None
brotli :
fastparquet : None
fsspec : 2022.11.0
gcsfs : None
matplotlib : None
numba : None
numexpr : None
odfpy : None
openpyxl : None
pandas_gbq : None
pyarrow : 10.0.1
pyreadstat : None
pyxlsb : None
s3fs : None
scipy : None
snappy : None
sqlalchemy : None
tables : None
tabulate : None
xarray : None
xlrd : None
xlwt : None
zstandard : None
tzdata : None
RehanSD commented 1 year ago

Hi @kaijennissen! Thank you so much for opening this bug! I tried to reproduce it locally, but found that the script failed for a different reason: the output of isocalendar() was a plain tuple rather than a named tuple, so calling .year and .week failed. When I replaced them with the appropriate indices 0 and 1, as below, the whole script worked:

import modin.pandas as pd

df = pd.DataFrame({"Date": pd.date_range("2001-01-01", freq="d", periods=12)})
df["Year"] = df.Date.apply(lambda x: x.isocalendar()[0])
df["Week"] = df.Date.apply(lambda x: x.isocalendar()[1])
df["Yearweek"] = df.Year * 100 + df.Week

df["Monday"] = pd.to_datetime(
    df.Year.astype(str) + "-" + df.Week.astype(str) + "-" + "1", format="%Y-%W-%w"
)

Please let me know if that resolves the issue for you!

kaijennissen commented 1 year ago

For me it's a named tuple.

>>> df.Date[0].isocalendar()
datetime.IsoCalendarDate(year=2001, week=1, weekday=1)

But using your code still raises the same error message. I've also tried running the code as a script, which raises the following error message. I suspect that dask is broken despite having set up a new conda env.

```python-traceback
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Dask execution environment not yet initialized. Initializing... To remove this warning, run the following python code before doing dataframe operations: from distributed import Client client = Client()
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59053 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59054 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59055 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59056 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59062 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59063 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59064 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59066 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59068 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59069 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59070 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59073 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59074 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59079 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59082 instead
UserWarning: Port 8787 is already in use. Perhaps you already have a cluster running? Hosting the HTTP server on port 59083 instead
2023-01-03 12:26:30,982 - distributed.nanny - ERROR - Failed to start process
Traceback (most recent call last):
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/distributed/nanny.py", line 442, in instantiate
    result = await self.process.start()
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/distributed/nanny.py", line 707, in start
    await self.process.start()
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/distributed/process.py", line 38, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/site-packages/distributed/process.py", line 201, in _start
    process.start()
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 42, in _launch
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/multiprocessing/spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "/usr/local/Caskroom/miniconda/base/envs/modin/lib/python3.9/multiprocessing/spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
    An attempt has been made to start a new process before the
    current process has finished its bootstrapping phase.

    This probably means that you are not using fork to start your
    child processes and you have forgotten to use the proper idiom
    in the main module:

        if __name__ == '__main__':
            freeze_support()
            ...

    The "freeze_support()" line can be omitted if the program
    is not going to be frozen to produce an executable.
```
RehanSD commented 1 year ago

> For me it's a named tuple.
>
> >>> df.Date[0].isocalendar()
> datetime.IsoCalendarDate(year=2001, week=1, weekday=1)
>
> But using your code still raises the same error message. I've also tried running the code as a script, which raises the following error message. I suspect that dask is broken despite having set up a new conda env.
>
> [...]


That makes sense! It seems that the issue here is with starting up Dask. Even the original trace looks more like an internal Dask error, although I can't be certain without seeing the full trace. What happens if you kill all Dask processes and restart? Could you also try using Ray as the engine, to see whether the error persists (making it a Modin error) or goes away (implying it's an error internal to Dask)?
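For reference, a minimal sketch of switching the engine to Ray before doing any dataframe work; both the MODIN_ENGINE environment variable and the modin.config API select the engine, and the rest of the snippet just re-runs part of the reproducer:

```python
import os

# Select Ray before the first Modin operation (either mechanism works).
os.environ["MODIN_ENGINE"] = "ray"  # or "dask"

import modin.config as cfg
cfg.Engine.put("ray")  # config-API equivalent of the environment variable

import modin.pandas as pd

df = pd.DataFrame({"Date": pd.date_range("2001-01-01", freq="d", periods=12)})
df["Year"] = df.Date.apply(lambda x: x.isocalendar()[0])
print(df.head())
```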

YarShev commented 1 year ago

@kaijennissen, can you put your code under if __name__ == '__main__':? You can find more info about the issue here:

https://modin.readthedocs.io/en/stable/getting_started/troubleshooting.html#error-when-using-dask-engine-runtimeerror-if-name-main
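For clarity, a minimal sketch of the reproducer wrapped in the guard (the main function name is just illustrative):

```python
import modin.pandas as pd


def main():
    df = pd.DataFrame({"Date": pd.date_range("2001-01-01", freq="d", periods=12)})
    df["Year"] = df.Date.apply(lambda x: x.isocalendar().year)
    df["Week"] = df.Date.apply(lambda x: x.isocalendar().week)
    df["Yearweek"] = df.Year * 100 + df.Week
    df["Monday"] = pd.to_datetime(
        df.Year.astype(str) + "-" + df.Week.astype(str) + "-" + "1", format="%Y-%W-%w"
    )
    print(df)


if __name__ == "__main__":
    # The guard keeps spawned worker processes from re-executing the module
    # body, which is what triggers the RuntimeError shown above.
    main()
```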

kaijennissen commented 1 year ago

@RehanSD Running the script with ray works fine.

@YarShev
Placing the code inside if __name__ == '__main__': fixes the above error but does not fix the initial error.