MSKazemi opened this issue 4 years ago
version:
{'scheduler': {'host': (('python', '3.7.4.final.0'),
                        ('python-bits', 64),
                        ('OS', 'Linux'),
                        ('OS-release', '3.10.0-693.21.1.el7.x86_64'),
                        ('machine', 'x86_64'),
                        ('processor', 'x86_64'),
                        ('byteorder', 'little'),
                        ('LC_ALL', 'None'),
                        ('LANG', 'en_US.UTF-8')),
               'packages': {'dask': '2.11.0',
                            'distributed': '2.11.0',
                            'msgpack': '0.6.2',
                            'cloudpickle': '1.2.2',
                            'tornado': '6.0.3',
                            'toolz': '0.10.0',
                            'numpy': '1.18.1',
                            'lz4': None,
                            'blosc': None}},
 'workers': {},
 'client': {'host': [('python', '3.7.4.final.0'),
                     ('python-bits', 64),
                     ('OS', 'Linux'),
                     ('OS-release', '3.10.0-693.21.1.el7.x86_64'),
                     ('machine', 'x86_64'),
                     ('processor', 'x86_64'),
                     ('byteorder', 'little'),
                     ('LC_ALL', 'None'),
                     ('LANG', 'en_US.UTF-8')],
            'packages': {'dask': '2.11.0',
                         'distributed': '2.11.0',
                         'msgpack': '0.6.2',
                         'cloudpickle': '1.2.2',
                         'tornado': '6.0.3',
                         'toolz': '0.10.0',
                         'numpy': '1.18.1',
                         'lz4': None,
                         'blosc': None}}}
Hard to tell what is going on ...
Googling "Connection to scheduler broken. Reconnecting" dask got me to https://github.com/dask/distributed/issues/2124#issuecomment-405915108. You can try this suggestion: client.get_versions(check=True).
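For reference, a minimal sketch of that check, assuming the client object created in the scripts further down:

# With check=True this raises an error if the scheduler, workers and client
# report mismatched package versions; otherwise it returns the versions dict
# shown at the top of this issue.
versions = client.get_versions(check=True)
print(versions['client']['packages'])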
I know it may be some effort, but I would encourage you to put together a stand-alone snippet reproducing the problem so that other people can try to debug it. See https://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports
Different directions you can take:
- Is it related to dask-jobqueue? Can you reproduce with a distributed.LocalCluster? If that is the case, this may be a distributed issue.
- Can you create a simpler script (with part of your data, or randomly generated data) that recreates the issue?
Try upgrading to Tornado 6.0.4, which was released this week. It fixed a StreamClosedError bug. I'm not sure if that's what you're experiencing or not, but it's worth a quick try.
Thank you. The installed version was 6.0.3, which I updated to 6.0.4, but there is still the same error.
The versions on the different nodes are the same.
I do not have that error with a LocalCluster; it arises only on the SLURM cluster.
A simplified version of my script:
import pandas as pd, numpy as np
from fastparquet import ParquetFile
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import dask.dataframe as ds
import dask

cluster = SLURMCluster(cores=36,
                       processes=12,
                       memory='115 GB',
                       project='YYY',
                       queue='ZZZ',
                       shebang='#!/bin/bash',
                       local_directory='X/X/X',
                       walltime='02:00:00',
                       interface='ib0',
                       death_timeout=36000)
dask.config.set({'distributed.admin.tick.limit': '3h'})
# cluster.start_workers(36)
cluster.scale(36)
client = Client(cluster)

def q95(x):
    return x.quantile(0.95)

path_name = 'ABC'
data = ds.read_parquet(path_name, columns=['node', 'value'])
data['value'] = data['value'].astype('float')
node = data.groupby('node').apply(q95, meta={'value': 'float'})
node.to_csv('Result.csv')
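To follow up on the LocalCluster question above, a minimal sketch of the same analysis on a single machine, which can help separate a dask-jobqueue problem from a distributed one. Worker counts, memory limit and output path are illustrative:

import dask.dataframe as ds
from dask.distributed import Client, LocalCluster

def q95(x):
    return x.quantile(0.95)

if __name__ == '__main__':
    # Local worker processes instead of SLURM jobs; everything else stays the same.
    cluster = LocalCluster(n_workers=4, threads_per_worker=3, memory_limit='8GB')
    client = Client(cluster)

    data = ds.read_parquet('ABC', columns=['node', 'value'])
    data['value'] = data['value'].astype('float')
    node = data.groupby('node').apply(q95, meta={'value': 'float'})
    node.to_csv('Result-local-*.csv')  # '*' writes one CSV per partition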
Thanks for your code! It would be great if you could provide the data so that someone can try to reproduce this. If the data is something you cannot share, please try to reproduce it with randomly generated data. This is explained in more detail in https://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports.
Maybe https://github.com/dask/distributed/issues/2801#issuecomment-536187211 is relevant (I googled "full garbage collections took" dask).
Script to generate an artificial data set. The original dataset is 100 GB, split into 9000 parts.
import pandas as pd
import numpy as np
import dask.dataframe as ds

row_num = 10000000
data = ds.from_pandas(pd.DataFrame({'value': np.random.randint(25, 50, size=(row_num)),
                                    'node': ['x' + str(np.random.randint(0, 50)) + 'y' + str(np.random.randint(25, 50)) + 'z' + str(np.random.randint(25, 50)) for i in range(row_num)],
                                    'timestamp': 1000*np.random.randint(1550000000, 1583692360, size=(row_num))}),
                      npartitions=100)
data.to_parquet('testData')
Please, if you want to maximise the likelihood of people helping you, you need to make it easy to run your code.
In this particular instance, it would help a lot if you could put all your code in a single snippet that someone can copy and paste (OK, in this case it won't work right away because nobody has the same cluster as yours, but hopefully you see my point).
Something else you could try is posting on the Dask Gitter channel to see if people have suggestions off the top of their head, at least on how to debug your problem.
I personally don't have much experience with either Parquet files or the dask DataFrame API, sorry!
In the following code I first define the parameters, then create the artificial random dataset, and finally run a very simple analysis. The script works for a small dataset, for example data_row_num = 1000000. The problem is that when I increase the size of the data by raising data_row_num, it seems to stop working. Even though I increase the computing resources, I see no CPU or memory usage; the job is then killed automatically, and the log file contains the errors below. Previously I got the same error on a big dataset with more cluster resources, but at least in that run there was memory and CPU usage. In this smaller version with fewer compute resources there is no memory or CPU usage at all. Maybe the resources are not enough, but then why doesn't it use the resources that are available?
import os, datetime, sys, time, pandas as pd, numpy as np
from fastparquet import ParquetFile
from dask_jobqueue import SLURMCluster
from dask.distributed import Client
import dask.dataframe as ds
import dask
#---------------------------------------------------------
# Parameters:
cores = 12
processes = 12
memory = str(cores*3.5) + ' GB'
walltime = '01:00:00'
cluster_scale = 1
path_server = '/X/Y/'
np.random.seed(120)
data_row_num = 100000000
data_npartitions = 100
file_name = 'testData'
resultName = 'result'
#---------------------------------------------------------
# Cluster Definition
cluster = SLURMCluster(cores=cores,
                       processes=processes,
                       memory=memory,
                       project='X',
                       queue='X',
                       shebang='#!/bin/bash',
                       local_directory=path_server,
                       walltime=walltime,
                       interface='ib0',
                       death_timeout=36000)
# extra=['--memory-limit 0']
dask.config.set({'distributed.admin.tick.limit': '3h'})
cluster.scale(cluster_scale)
client = Client(cluster)
#---------------------------------------------------------
# Create the Artificial Dataset
data = ds.from_pandas(pd.DataFrame({'value': np.random.randint(25, 50, size=(data_row_num)),
                                    'node': ['r' + str(np.random.randint(1, 51)) + 'c' + str(np.random.randint(1, 19)) + 's' + str(np.random.randint(1, 5)) for i in range(data_row_num)],
                                    'timestamp': 1000*np.random.randint(1550000000, 1583692360, size=(data_row_num))}),
                      npartitions=data_npartitions)
data.to_parquet(path_server + file_name)
print('Artificial Dataset Created.')
#---------------------------------------------------------
# Data Analysis
def q95(x):
    return x.quantile(0.95)

path_name = path_server + file_name
print('start to read')
data = ds.read_parquet(path_name, columns=['node', 'value'])
# data['node'] = data['node'].astype(str)
data['value'] = data['value'].astype('float')
node95 = data.groupby('node').apply(q95, meta={'value': 'float'})
node95.to_csv(path_server + resultName)
Log:
distributed.nanny - INFO - Start Nanny at: 'tcp://10.27.50.45:26400'
distributed.worker - INFO - Start worker at: tcp://10.27.50.45:17107
distributed.worker - INFO - Listening to: tcp://10.27.50.45:17107
distributed.worker - INFO - dashboard at: 10.27.50.45:18175
distributed.worker - INFO - Waiting to connect to: tcp://10.27.33.5:14348
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 12
distributed.worker - INFO - Memory: 42.00 GB
distributed.worker - INFO - Local Directory: /gpfs/scratch/userexternal/mseyedka/worker-bvx8pg8x
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://10.27.33.5:14348
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.worker - INFO - Connection to scheduler broken. Reconnecting...
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x2af57c583990>>, <Task finished coro=<Worker.heartbeat() done, defined at /galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/worker.py:883> exception=CommClosedError('in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer')>)
Traceback (most recent call last):
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/comm/tcp.py", line 188, in read
n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
ret = callback()
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
future.result()
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/worker.py", line 920, in heartbeat
raise e
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/worker.py", line 893, in heartbeat
metrics=await self.get_metrics(),
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/utils_comm.py", line 391, in retry_operation
operation=operation,
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/utils_comm.py", line 379, in retry
return await coro()
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 757, in send_recv_from_rpc
result = await send_recv(comm=comm, op=key, **kwargs)
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/core.py", line 540, in send_recv
response = await comm.read(deserializers=deserializers)
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/comm/tcp.py", line 208, in read
convert_stream_closed_error(self, e)
File "/galileo/home/userexternal/mseyedka/miniconda3/lib/python3.7/site-packages/distributed/comm/tcp.py", line 121, in convert_stream_closed_error
raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: ConnectionResetError: [Errno 104] Connection reset by peer
distributed.worker - INFO - Stopping worker at tcp://10.27.50.45:17107
distributed.nanny - INFO - Worker closed
distributed.nanny - INFO - Closing Nanny at 'tcp://10.27.50.45:26400'
distributed.dask_worker - INFO - End worker
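The log above shows the worker registering and then losing its connection to the scheduler. One small debugging step for the "no CPU or memory usage" symptom is to verify, before launching the computation, how many workers have actually connected. A sketch assuming the client object from the script above; the expected count and polling loop are illustrative:

import time

expected_workers = 12   # illustrative: processes per job * number of SLURM jobs requested
while len(client.scheduler_info()['workers']) < expected_workers:
    print('connected workers:', len(client.scheduler_info()['workers']))
    time.sleep(10)
print('all requested workers connected')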
Did you find the solution?
No, I did not find a solution.
Any solution to the above problem? I am also facing a similar problem: it fails if I try to iterate my function over 10k iterations, but it works perfectly for 200 iterations.
Hi @vishnuk94, are you getting this exception with the same code as @MSKazemi?
I finally took some time to try @MSKazemi's code. I'm still at the data generation part; it first generates the dataset in memory using pandas, which takes a long time. It would be better to generate the dataset directly as a Dask DataFrame.
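A possible sketch of that suggestion: build each partition on the workers with dask.delayed and dd.from_delayed instead of materializing the whole 100M-row frame in pandas on the client. The make_partition helper and the sizes are illustrative:

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd

nrows = 100000000
npartitions = 100
rows_per_part = nrows // npartitions

def make_partition(seed, n):
    # Runs on a worker: builds one partition of random data.
    rng = np.random.RandomState(seed)
    return pd.DataFrame({
        'value': rng.randint(25, 50, size=n),
        'node': ['r%d' % rng.randint(1, 51) + 'c%d' % rng.randint(1, 19) + 's%d' % rng.randint(1, 5)
                 for _ in range(n)],
        'timestamp': 1000 * rng.randint(1550000000, 1583692360, size=n),
    })

parts = [dask.delayed(make_partition)(i, rows_per_part) for i in range(npartitions)]
meta = make_partition(0, 0)  # empty frame, only used for column names and dtypes
data = dd.from_delayed(parts, meta=meta)
data.to_parquet('testData')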
Okay, so I finally executed all the code above using a PBSCluster on an HPC facility and it finished without error.
What I noticed is that in the dashboard I see a lot of red bars, meaning a lot of shuffling and data exchange between processes, but nothing other than that.
No, my code uses dask.delayed to parallelize a for loop that runs a model over a large number of iterations (say >20k). However, when I run the model with a for loop over a small number of iterations (say <200), it works perfectly. Presuming this could be a memory error, and since initially there were some warnings saying memory was at 95% of capacity, I added the following lines to the Slurm script:
ulimit -s unlimited
ulimit -v unlimited
But it did not solve the problem.
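Not a fix for the disconnects, but one way to keep a dask.delayed loop over ~20k iterations from flooding the scheduler and accumulating every result at once is to compute in batches. A minimal sketch with a placeholder run_model; the model, batch size and cluster are illustrative:

import numpy as np
import dask
from dask.distributed import Client, LocalCluster

def run_model(seed):
    # Placeholder for the real model; here just a cheap random computation.
    rng = np.random.RandomState(seed)
    return rng.random_sample(1000).mean()

if __name__ == '__main__':
    client = Client(LocalCluster(n_workers=4))  # swap in the SLURM-backed client

    n_iterations = 20000
    batch_size = 500
    results = []
    for start in range(0, n_iterations, batch_size):
        tasks = [dask.delayed(run_model)(i) for i in range(start, min(start + batch_size, n_iterations))]
        # Compute one batch at a time so only ~batch_size tasks and results
        # are in flight on the scheduler instead of 20k.
        results.extend(dask.compute(*tasks))
    print(len(results))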
I am trying to do data analysis on 9900 parquet files that are 100 GB in total.
After roughly 70K garbage collection warnings of the form
distributed.utils_perf - WARNING - full garbage collections took 60% CPU time recently (threshold: 10%)
my job was killed and there is the following error.