coiled / benchmarks

BSD 3-Clause "New" or "Revised" License
32 stars 17 forks source link

[TPC-H] PySpark workloads are vulnerable to undetected worker loss #1449

Closed hendrikmakait closed 8 months ago

hendrikmakait commented 8 months ago

In this run, the PySpark cluster lost three workers over time without us ever detecting let alone healing this. Things went wrong for PySpark and likely skewed test results afterward.

At the very least, we should be able to assert that a PySpark cluster still has the requested # of workers when beginning a new query.

Cluster: https://cloud.coiled.io/clusters/410048/ CI Run: https://github.com/coiled/benchmarks/actions/runs/8231277279/job/22506263406#step:8:745

hendrikmakait commented 8 months ago

@milesgranger: Any ideas if it's possible for us to assert the correct number of Spark workers?

milesgranger commented 8 months ago

Hrmm, I'm almost inclined to think that this is kinda on Spark for losing and not recovering workers. But otherwise, I'd think one can check with sc.getConf().get("spark.executor.instances") (as we looked into this config setting) but I don't know if that's dynamic?

Edit: actually that, and another way explained here: https://stackoverflow.com/questions/38660907/how-to-get-the-number-of-workersexecutors-in-pyspark

milesgranger commented 8 months ago

I can poke around at this tomorrow if that'd be helpful.

hendrikmakait commented 8 months ago

Hrmm, I'm almost inclined to think that this is kinda on Spark for losing and not recovering workers.

I'd generally side with you, but I'm not entirely sure if it's on us (our cluster configuration/setup) that those workers drop without healing.

I can poke around at this tomorrow if that'd be helpful.

If it's quick to do, that would be nice!

ntabris commented 8 months ago

From what I see, when the Spark worker exits, dask nanny/worker is also getting killed, so the VM shuts itself down. Is that what other folks are seeing?

(For example, https://cloud.coiled.io/clusters/410048/account/foobar/information?account=&scopes=%7B%22type%22%3A%22account%22%2C%22id%22%3A7568%2C%22name%22%3A%22foobar%22%2C%22organizationId%22%3A9991%7D&tab=Logs&filterPattern=10.0.21.218&daskLogs=1)

milesgranger commented 8 months ago

From what I see, when the Spark worker exits, dask nanny/worker is also getting killed, so the VM shuts itself down. Is that what other folks are seeing?

10.0.27.248 is interesting. It starts and seems perfectly happy and then scheduler kills it due to heartbeat timeout. While according to the (spark) scheduler, it's processing tasks fine less than 10sec before it's killed. Maybe the answer at least to this case is to bump/remove Dask heartbeat timeout for TPC-H/Spark clusters?

milesgranger commented 8 months ago

Checking back in for counting the number of executors, indeed conf.get("spark.executor.instances") will not change over the course of the cluster.

However, with the Spark REST API I've confirmed that /applications/[app-id]/executors works well to get the current count of active executors.

Only need to have port 4040 opened up by default or at least a flag as it's closed now closed. I opened in the linked PR along w/ the required other ports, but I'd guess platform has a better way to deal with this. @ntabris

For now I have this implementation here for suggestions: https://github.com/coiled/benchmarks/pull/1450

milesgranger commented 8 months ago

@hendrikmakait, I was curious if https://github.com/coiled/benchmarks/pull/1450 closes this, now that it tracks/warns about change in executors. But maybe we still want an assert somewhere?

hendrikmakait commented 8 months ago

But maybe we still want an assert somewhere?

I was asking myself the same question. Maybe an assertion makes more sense? Otherwise, I fear we won't really notice the warnings.