dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org

Data loss during p2p shuffle #8426

Open nickto opened 10 months ago

nickto commented 10 months ago

Describe the issue: After performing actions that involve (p2p?) shuffling, e.g., shuffling or merging, the number of rows in a data frame decreases slightly (which is different from issue https://github.com/dask/distributed/issues/4386).

Minimal Complete Verifiable Example:

I could not reproduce this with a synthetic example, but I have a data frame with a single column "id" of type int64. The minimum value is 23 and the maximum is 301,870,552 (so, no integer overflows or anything like that), with no missing values. It is partitioned into 32 partitions.

import dask.dataframe as dd

ddf = dd.read_parquet(url).persist()
print(ddf.shape[0].compute())
# 301870552

for i in range(5):
    n_rows = ddf.shuffle(on="id", npartitions=32).shape[0].compute()
    print(n_rows)
# 301870552
# 301870552
# 301870544 <-- smaller
# 301870552
# 301870552

for i in range(5):
    n_rows = ddf.shuffle(on="id", npartitions=32, shuffle="p2p").shape[0].compute()
    print(n_rows)
# 301870552
# 301870552
# 301870552
# 301870552
# 301870545 <-- smaller

for i in range(10):
    n_rows = ddf.shuffle(on="id", npartitions=32, shuffle="tasks").shape[0].compute()
    print(n_rows)
# 301870552
# ...
# 301870552

Anything else we need to know?: I am running it in a k8s cluster with 32 workers with 8 GiB of RAM each, scattered across 7 nodes.

The problem seems to disappear when

Originally discovered while merging: merging leads to silent loss of data. Joining on the index with .join() results in an exception:

TypeError: cannot do slice indexing on Index with these indexers [1937738] of type int64

although items with ID 1937738 are present in both tables.
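
For illustration, the failing pattern looks roughly like this (a hypothetical sketch; left_ddf and right_ddf stand in for the real data frames, each with an int64 "id" column):

left = left_ddf.set_index("id")
right = right_ddf.set_index("id")

# Computing anything on the joined frame fails with:
# TypeError: cannot do slice indexing on Index with these indexers [1937738] of type int64
joined = left.join(right)
print(joined.shape[0].compute())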

Environment:

nickto commented 10 months ago

Managed to create a synthetic example

import dask
import dask.dataframe as dd
import pandas as pd

# Create a cluster with 32 workers, 8Gi and 2 CPU each, based on ghcr.io/dask/dask:2023.11.0-py3.10 image,
# distributed across 7 physical nodes. 
client, cluster = ...

# Generate a data frame with random integers using delayed execution
@dask.delayed
def generate_df(n_rows):
    from numpy.random import randint

    return pd.DataFrame({"id": randint(23, 3_985_265, n_rows)})

ddf = dd.from_delayed([generate_df(10_000_000) for _ in range(32)]).persist()

n_rows = ddf.shape[0].compute()
print(n_rows)
# 320000000

for i in range(20):
    n_rows = ddf.shuffle(on="id", npartitions=32, shuffle="p2p").shape[0].compute()
    print(n_rows)
# 319999992 <-- wrong
# 320000000
# 320000000
# 320000000
# 319999979 <-- wrong
# 320000000
# 319999992 <-- wrong
# 319999992 <-- wrong
# 320000000
# 320000000
# 320000000
# 320000000
# 320000000
# 319999985 <-- wrong
# 320000000
# 319999992 <-- wrong
# 319999984 <-- wrong
# 320000000
# 320000000
# 320000000

client.close()
cluster.close()

hendrikmakait commented 10 months ago

@nickto, thanks for raising this issue. Silent data loss is indeed serious. I'll take a deeper look at this. Could you re-run your reproducer with the latest release (or, even better, the current main)? This will help us rule out that this problem has been fixed in the meantime. Also, are there any relevant logs that might point us toward the culprit?

hendrikmakait commented 10 months ago

Unfortunately, I cannot reproduce this error even when running the shuffle 100 times. I've tried replicating your deployment, running 32 workers on an m5.large (2 CPU, 8 GiB memory) instance each, using the ghcr.io/dask/dask:2023.11.0-py3.10 Docker image on the cluster and a matching local environment created with mamba/pip.

To identify the issue, let's try to reduce the number of unknowns:

dbalabka commented 10 months ago

@hendrikmakait, I'm able to reproduce it on 2023.11.0 with a smaller number of workers (16). I cannot reproduce it with 8 workers. It might be server/network/load specific, especially in our environment. However, it should not silently drop rows.

I can confirm that the Docker/client/server versions match. It is not reproducible with LocalCluster, which suggests that it might be server/network specific. We use dask-kubernetes to deploy on our on-prem Kubernetes.
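
For reference, a quick way to double-check this from the client is distributed's built-in version check (a minimal sketch, assuming client is already connected to the cluster):

# Collects package versions from the client, scheduler, and all workers;
# check=True raises an error if they mismatch.
client.get_versions(check=True)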

I'm going to:

  1. Enable debug logging.
  2. Use the latest versions.
  3. Provide our Dask cluster settings, since the issue might also be related to them.

hendrikmakait commented 10 months ago

~Out of curiosity, can you try reproducing without relying on np.random.randint? For example, create IDs based on a range or use dask.array to generate the random data instead (preferably the former).~

Forget it, I missed the .persist() call.

hendrikmakait commented 10 months ago

On second thought, it would still be valuable to see if you can reproduce the issue without randomizing the IDs. This might help reduce the number of moving parts.

hendrikmakait commented 10 months ago

However, it should not silently drop rows.

Agreed! Unfortunately, I have very little to work with without a general reproducer that doesn't depend on your specific deployment environment. Let's try to boil this down further to something that also fails for me.

dbalabka commented 10 months ago

@hendrikmakait, still reproducible on dask==2023.12.1 & pandas==2.1.4. Are you using Kubernetes to deploy the environment? I don't believe that Kubernetes itself is the cause, but it might reveal the problem through network instability or other factors.

hendrikmakait commented 10 months ago

Thanks for the update. As a Coiled employee, I almost exclusively use Coiled to deploy Dask in the cloud (which is not based on k8s).

I'm not yet convinced that network instability is the problem here, just given the symptoms of your issue. If the network were to blame, I'd expect you to lose much larger chunks of data, somewhere between thousands and hundreds of thousands of rows. I might be wrong, of course.

There are a few things we can do here:

dbalabka commented 10 months ago

@hendrikmakait , thanks a lot for your support.

by removing randomization or finding ways to eliminate or amplify the problem

The problem is reproducible w/o the random:

import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def generate_df():
    from numpy import arange

    df = pd.DataFrame({"id": arange(23, 3_985_265)})
    return df

ddf = dd.from_delayed([generate_df() for _ in range(WORKERS)]).persist()

n_rows = ddf.shape[0].compute()
print(n_rows)

from dask.distributed import print
for i in range(20):
    n_rows = ddf.shuffle(on="id", npartitions=WORKERS, shuffle="p2p").shape[0].compute()
    print(n_rows)

Also, I tried to enable debug logging, but, unfortunately, I didn't notice any suspicious messages:

DASK_DISTRIBUTED__LOGGING__DISTRIBUTED=debug
DASK_DISTRIBUTED__LOGGING__DISTRIBUTED_CLIENT=debug
DASK_DISTRIBUTED__LOGGING__DISTRIBUTED_SCHEDULER=debug
DASK_DISTRIBUTED__LOGGING__DISTRIBUTED_SHUFFLE=debug

I'm currently looking into additional consistency checks within P2P, which might help us see where things are going wrong.

I spent some time reviewing the code of the shuffling module. I want to add some extra logging to it to narrow down the issue, as you suggested. If you have any specific suggestions, I would be happy to try them.

I have a hunch that it might be connected to the memory/buffer code (https://github.com/dask/distributed/blob/main/distributed/shuffle/_memory.py, https://github.com/dask/distributed/blob/main/distributed/shuffle/_buffer.py) or to the closed-connection handling logic (https://github.com/dask/distributed/blob/main/distributed/shuffle/_core.py#L478-L496).
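
In the meantime, a crude user-level check on top of the reproducer above (just a sketch, not part of the P2P internals) is to compare row counts before and after the shuffle so the loss fails loudly instead of silently:

# Sketch: fail loudly if the p2p shuffle changes the row count.
expected = ddf.shape[0].compute()
shuffled = ddf.shuffle(on="id", npartitions=WORKERS, shuffle="p2p")
actual = shuffled.shape[0].compute()
assert actual == expected, f"p2p shuffle dropped rows: {expected} -> {actual}"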

We can have a quick synchronous chat about your problem and your deployment situation, maybe that will help us find a clue. Feel free to hit me up on https://dask.slack.com/ to schedule something after the holidays.

Starting tomorrow, I'm out of the office, so it seems we'll have to continue in the new year.

Merry Christmas and Happy New Year! 🎊🥂🎉🎅

dbalabka commented 10 months ago

@hendrikmakait, is there any quick way to update the code of the dask library on schedulers and workers? We use client.upload_file to upload our packages as an egg file. However, it does not seem to reload the library the way Jupyter's %load_ext autoreload does.

dbalabka commented 9 months ago

@hendrikmakait I tried on GKE but cannot reproduce it.

hendrikmakait commented 9 months ago

@dbalabka: Happy New Year!

The problem is reproducible w/o the random:

Thanks for confirming. How often does the problem pop up without randomness involved?

@hendrikmakait, is there any quick way to update the code of the dask library on schedulers and workers?

There are a few ways to update third-party packages on the fly, but I'm not sure how well (if at all) they work when used to update Dask itself. When using client.upload_file, you could try client.restart() to restart workers, which should refresh the worker subprocesses. There's also the PipInstall plugin, which you could use to install from a public/private fork (https://distributed.dask.org/en/latest/plugins.html#distributed.diagnostics.plugin.PipInstall). Note that none of these mechanisms perform a hot reload on an existing scheduler.
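
A rough sketch of the PipInstall route (the fork URL and branch are placeholders; the plugin installs the package on the workers):

from distributed.diagnostics.plugin import PipInstall

# Install a patched distributed from a fork on every worker, then restart the
# workers so the new code is actually imported.
plugin = PipInstall(
    packages=["git+https://github.com/<your-fork>/distributed.git@<branch>"],
    pip_options=["--upgrade"],
)
client.register_plugin(plugin)
client.restart()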

@hendrikmakait I tried on GKE but cannot reproduce it.

Thanks for trying this out! This sounds like there might be something in your current deployment that breaks pre-conditions required by Dask or the P2P mechanism.

dbalabka commented 9 months ago

Thanks for confirming. How often does the problem pop up without randomness involved?

I don't see much difference between the random data and the simple arange example mentioned above.

Note that none of these mechanisms perform a hot reload on an existing scheduler.

It seems that hot reload is already implemented but does not work for me: https://github.com/dask/distributed/blob/81774d41cb2a0b4258b36b29f2448b27cf62c363/distributed/utils.py#L1170

hendrikmakait commented 9 months ago

It seems that hot reload is already implemented but does not work for me:

https://github.com/dask/distributed/blob/81774d41cb2a0b4258b36b29f2448b27cf62c363/distributed/utils.py#L1170

Let me rephrase my earlier statement: Given that the scheduler code has already been imported and the scheduler is already running when you run the upload, I'd be surprised if hot reloading worked reliably for the scheduler code itself.

dbalabka commented 9 months ago

@hendrikmakait, we have narrowed down the issue far enough to be able to work around the problem. It is connected to the fact that workers are distributed across multiple regions; using only one region solves the issue. I haven't yet attempted to reproduce the issue in GKE with nodes located in multiple regions, such as the US and EU, but I reckon it would result in the same behavior.

mrocklin commented 9 months ago

FWIW, Dask clusters spread across regions are likely to be very expensive due to how cloud providers price cross-region data transfer. Even within the same region, you'd want to make sure you're in the same availability zone. Setting up distributed compute systems in the cloud in a low-cost way can be surprisingly complex.


dbalabka commented 9 months ago

@mrocklin, I fully share your concerns about running Dask in clouds such as AWS, GCP, etc. In our case, we are using on-prem Kubernetes, and data transfer cost isn't a concern. However, we plan to avoid distributing Dask workers across multiple regions in order to avoid the issue described above.

While we do not fully understand the exact cause of the problem, there is a chance that data loss might happen even within a single region/zone of Dask workers. I believe there is an unhandled edge case that, hopefully, only manifests itself when network latency is very high. It would be great if Dask could handle this case or at least throw an error.

mrocklin commented 9 months ago

Makes sense.

If it's helpful for folks trying to create a reproducer, the Linux command tc can be used to add artificial network latency to a network interface. I found it useful in early testing of Dask.
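
For example, something along these lines on each worker node (interface name and delay values are only illustrative):

# Add 200 ms of artificial latency (with 50 ms jitter) to eth0, then remove it again.
tc qdisc add dev eth0 root netem delay 200ms 50ms
tc qdisc del dev eth0 root netem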
