Open fjetter opened 5 years ago
Hi @fjetter ,
My apologies for the delay in response. You raised this when I was fairly busy, and I hadn't gone through old issues until now.
Everything you say makes a lot of sense to me. I think that Dask could really use an active memory manager that thinks about replication like how you describe. It's unlikely that I personally will build such a thing, but I think that you and your group could handle it easily.
Here are some links to prior discussion that might be helpful in informing design.
replicate
method with a more active system like how you describe: https://github.com/dask/distributed/issues/1002But neither is exactly what you are talking about. I think that ideally this would be a Scheduler extension that would keep a desired replication of every piece of data (informed either by heuristic, or explicit request from the user) and would send out requests to workers to duplicate or remove data as necessary to keep things on target. This sounds like an interesting problem to solve, and highly valuable for many applications.
Another approach would be to have workers retire themselves gracefully as they leave the cluster. My guess is that when your nodes die there is some way to have them give you a bit of warning. In these cases you could have them call self.close_gracefully()
and move their data to other peers before leaving the network.
This is already quite helpful. The memory manager sounds quite promising but I will need to talk to my team first and will come back to you if we intend to pursue this further.
Another approach would be to have workers retire themselves gracefully
We were also discussing graceful downscaling but there are still some issues for us
Regarding 2. a thought just popped into my mind. Not sure if this is feasible, yet. Did you ever think about signal handlers for the worker/nanny? Something like SIGTERM -> stop calculation immediately and retire gracefully if possible.
Yes. There was some early attempt at this here I think: https://github.com/dask/distributed/pull/2844
On Wed, Nov 13, 2019 at 2:30 AM fjetter notifications@github.com wrote:
This is already quite helpful. The memory manager sounds quite promising but I will need to talk to my team first and will come back to you if we intend to pursue this further.
Another approach would be to have workers retire themselves gracefully
We were also discussing graceful downscaling but there are still some issues for us
- Ideally the solution should handle ungraceful downscaling since we can not necessarily give the guarantees that the worker gets enough time to handle its shutdown gracefully. We realise that this is a big ask, though.
- Mostly related to our setup but we are not quite sure how we would even trigger the graceful downscaling. We wouldn't want to rely on Client connections since we don't want to couple our cluster manager to any particular distributed version (Protocol/API stability, etc.).
Regarding 2. a thought just popped into my mind. Not sure if this is feasible, yet. Did you ever think about signal handlers for the worker/nanny? Something like SIGTERM -> stop calculation immediately and retire gracefully if possible.
— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/3184?email_source=notifications&email_token=AACKZTGLX6WFMLFY3FDXK5LQTPJLDA5CNFSM4JG2YGZ2YY3PNVWWK3TUL52HS4DFVREXG43VMVBW63LNMVXHJKTDN5WW2ZLOORPWSZGOED5VA5Q#issuecomment-553341046, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTHGZZKINUGEHKVSU33QTPJLDANCNFSM4JG2YGZQ .
The memory manager sounds quite promising but I will need to talk to my team first and will come back to you if we intend to pursue this further.
I am also quite happy to engage here. I think that actve memory management is important for many workloads. I have some rough thoughts on how to design this, but I don't currently have enough continuous time to devote to it. I would be very happy to engage with regular meetings, review, and so on.
Another approach would be to have workers retire themselves gracefully as they leave the cluster. My guess is that when your nodes die there is some way to have them give you a bit of warning. In these cases you could have them call self.close_gracefully() and move their data to other peers before leaving the network.
For anyone coming here with a similar issue, we will now try the following: Teach our container scheduler to notify up-front if a worker is taken down. This does not help in cases of node failures but with regular maintenance and auto-scaling it should be effective.
We cannot solve that easily via SIGTERM as we also run reverse proxies / side-cars and need a coordinated shutdown. In our case having a simple HTTP method is the simplest solution for now. I am not sure if such a thing will be to general interest upstream, and we therefore take the following live for now as a tactical fix:
import logging
import click
logger = logging.getLogger("distributed.custom")
@click.command()
@click.option("--graceful-shutdown-endpoint")
def dask_setup(worker, graceful_shutdown_endpoint):
""" Loaded at Dask worker startup via --preload """
try:
from distributed.dashboard.utils import RequestHandler
from distributed.dashboard.worker import routes
class TerminationHandler(RequestHandler):
"""
Custom HTTP handler to trigger a graceful shutdown via Aurora's HTTP Lifecycle
"""
def post(self):
logger.info(
"HTTP lifecycle triggered. Initiating graceful shutdown."
)
self.server.io_loop.add_callback(self.server.close_gracefully)
self.write("shutting down")
self.set_header("Content-Type", "text/plain")
routes.append((graceful_shutdown_endpoint, TerminationHandler))
except Exception:
logger.exception(
"Dask integration failed. Continuing without graceful worker termination"
)
else:
logger.info(
"Dask integration succeeded. Graceful worker termination activated"
)
This module is then pre-loaded into the dask-worker
process.
We are operating our distributed clusters in a cloud environment where we need to deal with frequently failing nodes. We usually dispatch jobs automatically and are bound to certain SLAs and therefore expect our jobs to finish in a more or less well defined time. While distributed offers resilience in terms of graph recalculation we're facing the issue that the recalculation introduces severe performance issues for us.
We are looking for something which would allow us to recover faster in scenarios where individual workers die such that we do not need to recalculate large, expensive chunks of the graph, e.g. by persisting or replicating valuable, small intermediate results.
Ideally the solution would be handled by the scheduler itself, s.t. many different applications can benefit of it (e.g. via a scheduler plugin/extension). We were thinking about milestone/snapshotting where the user can label certain results to be worthy to be repliacated (and later forgotten once another milestone passes/completes). We also discussed some kind of automatic replication based on heuristics (e.g.
bytes_result < x and runtime of task > Y
-> replicate result) to soften the blow in case of failures.My questions would be:
Researching existing github issues, I only found #2748 which discusses this scenario briefly but is ultimately closed without a proper resolution to this topic. The only solution which is suggested is a caching library but persisting every single result is most likely not an option.