dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Closing dangling stream #2507

Open dl9900 opened 5 years ago

dl9900 commented 5 years ago

I'm creating temporary Clients to run custom task graphs on my remote cluster. I was having problems with lots of socket connections sticking around, and with results stuck on the cluster because exceptions meant the results of some Futures were never requested.

Explicitly calling Client.close() seemed to fix that: I no longer see anything stuck on the cluster after an exception. But now I'm seeing a TCP connection that doesn't get closed cleanly. Here's some code to reproduce the problem:

from dask.distributed import Client, as_completed
import uuid

def get_results():
    # Temporary client; not set as the default so it can be torn down per call
    c = Client('HOSTNAME:8786', set_as_default=False)

    def daskfn():
        return 'results'

    # Submit 100 one-task graphs, each under a unique key
    futs = []
    for i in range(100):
        key = f'testfn-{uuid.uuid4()}'
        futs.append(c.get({key: (daskfn,)}, key, sync=False))

    results = []
    try:
        for f in as_completed(futs):
            results.append(f.result())
    finally:
        c.close()
    return results

results = get_results()

After this, using the command-line program ss, I see a socket in the CLOSE-WAIT state. And if I do something that triggers garbage collection, or call gc.collect(), I see the following warning:

distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://IPADDR:41742 remote=tcp://HOSTNAME:8786>
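
A small sketch of checking for the leaked socket from Python (using the third-party psutil package instead of ss) and forcing the collection that triggers the warning:

import gc
import psutil  # third-party; used here only to inspect this process's sockets

results = get_results()

# Leaked, half-closed connections show up as CLOSE_WAIT on the client process
conns = psutil.Process().connections(kind='tcp')
print(sum(c.status == psutil.CONN_CLOSE_WAIT for c in conns), 'sockets in CLOSE-WAIT')

# The "Closing dangling stream" warning fires when the comm is garbage collected
gc.collect()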
dl9900 commented 5 years ago

After some more research, it looks like instantiating that single Client object creates two connections to the scheduler: one from Client._ensure_connected() and another from yield self.scheduler.identity(), which seems to create an rpc object that never gets cleaned up by Client.close() but then gets garbage collected, since there are no longer any references to it other than the WeakRef in rpc.available.

dl9900 commented 5 years ago

Commenting out this line in rpc.close_comms() solves my problem.

yield comm.write({'op': 'close', 'reply': False})

I'm guessing this comm.write() call either doesn't return before the TCPConnection gets garbage collected, or doesn't ever return? Anyone have any thoughts on what's going on and how to avoid it? Am I using the Client() in an unintended way?
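
For reference, a sketch of the same cleanup written with Client as a context manager, which calls close() on exit (equivalent to the explicit try/finally above):

from dask.distributed import Client, as_completed
import uuid

def get_results_ctx():
    # The with-block calls Client.close() on exit, even if an exception is raised
    with Client('HOSTNAME:8786', set_as_default=False) as c:

        def daskfn():
            return 'results'

        futs = []
        for i in range(100):
            key = f'testfn-{uuid.uuid4()}'
            futs.append(c.get({key: (daskfn,)}, key, sync=False))

        return [f.result() for f in as_completed(futs)]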

mrocklin commented 5 years ago

Thanks for the excellent bug report @dl9900

It looks like we do intend to close the Client.scheduler PooledRPC object

https://github.com/dask/distributed/blob/0a5b8da4041414e1e621c1b8d613730720c6e684/distributed/client.py#L1119-L1120

However it looks like when we migrated this object from a normal rpc object to a PooledRPC object we neglected to implement an explicit close mechanism.

https://github.com/dask/distributed/blob/0a5b8da4041414e1e621c1b8d613730720c6e684/distributed/core.py#L649-L650

Compared to the method for rpc

https://github.com/dask/distributed/blob/0a5b8da4041414e1e621c1b8d613730720c6e684/distributed/core.py#L589-L593

It sounds like maybe we should revisit this, and see if there is something we can do to more cleanly close things out.

Is this something that you have time to explore? If so, that would be very welcome.

CMCDragonkai commented 5 years ago

I use a context manager like this:

import contextlib
import dask.distributed

@contextlib.contextmanager
def scheduler_ctx(remote=None):
    cluster = None
    scheduler = None
    try:
        if remote is None:
            cluster = dask.distributed.LocalCluster(
                scheduler_port=0, diagnostics_port=0)
        else:
            cluster = remote
        scheduler = dask.distributed.Client(cluster, set_as_default=False)
        yield scheduler
    finally:
        # Guard against failures partway through setup
        if scheduler is not None:
            scheduler.close()
        if isinstance(cluster, dask.distributed.LocalCluster):
            cluster.close()

And I notice that I still get warnings even when I've run scheduler.close() and cluster.close(). These warnings make me think that I'm not "shutting down" the system properly, but I don't see any way to get rid of them.
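
A usage sketch for the context manager above (the remote address is a placeholder):

# Ephemeral LocalCluster, torn down on exit
with scheduler_ctx() as client:
    print(client.submit(sum, [1, 2, 3]).result())

# Existing cluster or scheduler address; only the Client is closed on exit
with scheduler_ctx(remote='tcp://HOSTNAME:8786') as client:
    print(client.submit(sum, [1, 2, 3]).result())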

dl9900 commented 5 years ago

Yes, the warnings appear when the connections get garbage collected because, as @mrocklin said above, the close methods haven't been implemented yet. I haven't had time to look at this, and the warnings have been benign so far, so it hasn't been a priority for my users.

hoangthienan95 commented 5 years ago

Does anyone know if this has been resolved? I'm seeing these warnings even when using LocalCluster, and it makes using Dask much slower than not using it...

marchezinixd commented 5 years ago

Any news on this issue?

mrocklin commented 5 years ago

If there is news then it will be reported here. If this issue is important to you @marchezinixd then you might consider taking on the work outlined above in https://github.com/dask/distributed/issues/2507#issuecomment-462058821

kylejn27 commented 5 years ago

I'll take a look at this

jrbourbeau commented 5 years ago

Great, thanks @kylejn27!

kylejn27 commented 5 years ago

I'm only able to reproduce this when writing or reading large files with dask dataframes; the original example produced no warnings for me.

Here's my reproduction using to_csv with the latest distributed version, commit ec1ffaa:

import dask
import dask.array as da
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client, LocalCluster
from sklearn.datasets import make_blobs, make_classification

cluster = LocalCluster()
client = Client(cluster)

n_centers = 12
n_features = 20
n_samples_per_block = 380000
n_blocks = 500
centers = np.zeros((n_centers, n_features))

X, y = make_classification(n_samples=10000, n_features=4, random_state=0)
X_small, y_small = make_blobs(n_samples=1000, centers=12, n_features=n_features, random_state=0)
for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)

# One delayed make_blobs call per block, concatenated into one large dask array
delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
X = da.concatenate([da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X.dtype)
                    for obj in delayeds])

df = dd.from_array(X)
df.to_csv('large_random_dataset.csv')

distributed.comm.tcp - WARNING - Closing dangling stream in <TCP  local=tcp://127.0.0.1:62036 remote=tcp://127.0.0.1:61745>
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP  local=tcp://127.0.0.1:62037 remote=tcp://127.0.0.1:61745>
...
...
kylejn27 commented 5 years ago

~Just got more time to take a look at this problem. Seems like it's been fixed already. My colleagues and I are getting a closing dangling stream warning when reading large files from s3 or disk which is why I was interested in this issue.~

~https://github.com/dask/distributed/blob/master/distributed/core.py#L688~

~@jrbourbeau I think this issue can be closed because of the code linked above and that I wasn't able to reproduce with the original example~

Seems that I missed the PooledRPC part of this; disregard the original request to close.

kylejn27 commented 4 years ago

Did a bit of digging into this problem the other day, specifically regarding dangling streams. I made a little progress, but I'm not sure it's worth my time to investigate further.

I implemented the close_rpc method by calling self.pool.remove(self.addr). This closed the connections when the cluster was closed but had no effect on the dangling streams warning.
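
Roughly what that looked like (a sketch against PooledRPCCall in distributed/core.py, not a submitted patch):

# Added to PooledRPCCall: an explicit close that hands this address's
# cached comms back to the owning ConnectionPool.
def close_rpc(self):
    return self.pool.remove(self.addr)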

Here's what I've found:

The connection between the client and the scheduler is getting broken while the scheduler is trying to read from the client. This can be seen in the Stream is closed exception on the scheduler.

When the connection gets broken, the client deletes the TCP connection object before it has been closed and then starts a new one. This shows up first as the dangling stream warning, which is logged when the TCP object is deleted, and then as the Setting TCP keepalive message, which is logged when a new TCP object is instantiated in the set_tcp_timeout method.

There's a Stream is closed exception occurring here:
https://github.com/dask/distributed/blob/master/distributed/core.py#L338
https://github.com/dask/distributed/blob/master/distributed/comm/tcp.py#L201

Scheduler Logs

distributed.comm.tcp - DEBUG - Incoming connection from 'tcp://[::1]:55311' to 'tcp://127.0.0.1:8786'
distributed.comm.tcp - DEBUG - Setting TCP keepalive: nprobes=10, idle=10, interval=2
distributed.core - DEBUG - Connection from 'tcp://[::1]:55311' to Scheduler
distributed.core - DEBUG - Lost connection to 'tcp://[::1]:55311' while reading message: in <closed TCP>: Stream is closed. Last operation: None

Client Logs

distributed.comm.tcp - WARNING - Closing dangling stream in <TCP  local=tcp://[::1]:55311 remote=tcp://localhost:8786>
distributed.comm.tcp - DEBUG - Setting TCP keepalive: nprobes=10, idle=1, interval=1

Next Steps

If I were to investigate this further, I'd try to figure out why the connection between the client and scheduler is being broken. It's probably not a huge problem, since the TCP object eventually gets destroyed on the client side, but closing these connections properly might prevent many unnecessary new connections from being created (a potential performance issue).

I'm thinking that the connection pool isn't behaving correctly on long-running operations like downloading large parquet files from s3. My guess is that while a dask worker is busy with an operation that takes a while, the connection between the client and scheduler times out and closes before the TCP object can be reused as the connection pool intends.
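
If that guess is right, one cheap experiment would be raising the comm timeouts in the dask config and seeing whether the warnings go away (a sketch; the 60s values are arbitrary, just larger than the defaults):

import dask

# Lengthen the connect / TCP comm timeouts to test whether the dangling
# stream warnings correlate with comm timeouts on long-running operations.
dask.config.set({
    'distributed.comm.timeouts.connect': '60s',
    'distributed.comm.timeouts.tcp': '60s',
})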

I also found this Stack Overflow post, which may or may not be relevant: https://stackoverflow.com/questions/11161626/tornado-server-throws-error-stream-is-closed