holoviz / panel

Panel: The powerful data exploration & web app framework for Python
https://panel.holoviz.org
BSD 3-Clause "New" or "Revised" License

Dynamically scaling Panel #4183

Open govinda18 opened 1 year ago

govinda18 commented 1 year ago

Is your feature request related to a problem? Please describe.

Currently, two users using a Panel app at the same time block each other. One workaround is --num-threads, but it is not the most elegant solution because it requires the developer to be careful about the GIL. I am logging this request to explore ways of scaling a Panel server dynamically as more users join.

Describe the solution you'd like

One idea is to have several workers at the server's disposal and redirect events to them. This requires a mechanism for storing state so that each task assigned to a worker can be traced back to the client it belongs to.
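
A minimal sketch of that idea, using only the standard library: a thread pool stands in for the workers, and a dictionary maps each task back to the session that requested it. The names `Dispatcher`, `session_id` and `slow_task` are hypothetical and are not part of Panel or Bokeh.

```python
import time
import uuid
from concurrent.futures import Future, ThreadPoolExecutor


def slow_task(m: int) -> str:
    """Stand-in for a blocking callback."""
    time.sleep(0.1)
    return f"# Hello {m}"


class Dispatcher:
    """Hypothetical helper: routes work to a pool and remembers the owner."""

    def __init__(self, max_workers: int = 4) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._owners = {}   # task id -> session id
        self._futures = {}  # task id -> pending Future

    def submit(self, session_id: str, func, *args) -> str:
        task_id = uuid.uuid4().hex
        self._owners[task_id] = session_id
        self._futures[task_id] = self._pool.submit(func, *args)
        return task_id

    def collect(self, task_id: str):
        """Block until the task is done; return (session id, result)."""
        future: Future = self._futures.pop(task_id)
        result = future.result()
        return self._owners.pop(task_id), result

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)


dispatcher = Dispatcher()
first = dispatcher.submit("session-a", slow_task, 1)
second = dispatcher.submit("session-b", slow_task, 2)
print(dispatcher.collect(first))   # ('session-a', '# Hello 1')
print(dispatcher.collect(second))  # ('session-b', '# Hello 2')
dispatcher.shutdown()
```

Both tasks run concurrently in the pool, and each result comes back tagged with the session that asked for it. A real server would also need to deliver the result to the right browser session, which this sketch leaves out.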

Describe alternatives you've considered

One of the solutions using static scaling is to make multiple panel servers sit behind an nginx load balancer. This is what we are currently using.

A simple demo for the issue:

import os
import time

import panel as pn

def f(m=10):
    # Simulate a slow, blocking callback.
    time.sleep(5)
    print(time.time(), os.getpid())
    return f"# Hello {m}"

def get_app():
    # time.sleep(5)
    return pn.interact(f)

MarcSkovMadsen commented 1 year ago

Have you tried --num-procs @govinda18? This will start the number of workers you provide (or an optimal number of workers if you provide 0).

govinda18 commented 1 year ago

Have you tried --num-procs @govinda18? This will start the number of workers you provide

It does not actually work the way one would want, because of https://stackoverflow.com/a/63849068/7398517. A better load-balancing solution is therefore https://docs.bokeh.org/en/latest/docs/user_guide/server/deploy.html#load-balancing.

However, both options limit how well resources can be allocated. With too many workers, you consume a lot of resources that sit mostly idle; with too few, the load is not balanced. Ideally, I would want to scale dynamically as the load increases, perhaps using a standard request-response design or something of that sort.
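
One way to picture "scale as the load increases" is a sizing rule that a supervisor process could evaluate periodically. The function below is a hypothetical sketch: `desired_workers`, `tasks_per_worker` and the bounds are illustrative names and values, not options of Panel, Bokeh or any load balancer.

```python
import math


def desired_workers(
    pending_tasks: int,
    tasks_per_worker: int = 2,
    min_workers: int = 1,
    max_workers: int = 8,
) -> int:
    """Return how many workers a simple queue-length rule would ask for."""
    if tasks_per_worker < 1:
        raise ValueError("tasks_per_worker must be at least 1")
    wanted = math.ceil(pending_tasks / tasks_per_worker)
    # Clamp so an idle app keeps one warm worker and a busy one
    # cannot exhaust the machine.
    return max(min_workers, min(max_workers, wanted))


for load in (0, 3, 40):
    print(load, "->", desired_workers(load))
# 0 -> 1
# 3 -> 2
# 40 -> 8
```

The lower bound avoids cold starts and the upper bound caps resource use, which addresses the "too many" and "too few" cases described above.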

TheoMathurin commented 1 year ago

I would also very much like a way to scale panel apps dynamically, for the same reasons mentioned by @govinda18. It would dramatically improve both the developer and user experience for production apps, which are likely to be accessed by several clients simultaneously.

Threading using --num-threads helps, but not that much for callbacks with CPU-bound tasks which make your process busy for some time. Also, the fact that bokeh and panel apps are stateful means a client is connected to the same python process throughout. This has advantages but it makes scaling less straightforward compared to a stateless server. It may be a lot of work or even not really feasible, but if the said process could indeed hand tasks to a pool of independent workers so that it's still able to handle further requests, that would be a game-changer IMHO.
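
The "hand tasks to a pool" idea can be sketched with the standard library alone. Here a single asyncio event loop stays responsive because the blocking call runs in an executor. A thread pool keeps the sketch portable; for CPU-bound work that holds the GIL, a `ProcessPoolExecutor` would be the closer analogue. `blocking_job` and `heartbeat` are hypothetical names, and nothing here is Panel or Bokeh API.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job(seconds: float) -> str:
    """Stand-in for a slow callback."""
    time.sleep(seconds)
    return "done"


async def heartbeat(stop: asyncio.Event) -> int:
    """Count how often the event loop gets a chance to run."""
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.02)
    return ticks


async def main():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(stop))
    with ThreadPoolExecutor(max_workers=1) as pool:
        # The loop keeps serving `heartbeat` while the job runs elsewhere.
        result = await loop.run_in_executor(pool, blocking_job, 0.3)
    stop.set()
    return result, await beat


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If `blocking_job` were called directly inside `main`, the heartbeat would not tick at all until the job returned.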

By the way the panel docs section mentioning --num-procs is a bit misleading as it makes it look like you get something equivalent to a standard load balancer, while it's clear that requests are not balanced.

philippjfr commented 1 year ago

By the way the panel docs section mentioning --num-procs is a bit misleading as it makes it look like you get something equivalent to a standard load balancer, while it's clear that requests are not balanced.

Could you propose some language to clarify this?

TheoMathurin commented 1 year ago

@philippjfr Sure, I actually thought I would submit a PR.

philippjfr commented 1 year ago

That would be very much appreciated!

ndmlny-qs commented 1 year ago

@govinda18 & @TheoMathurin I have cobbled together a solution that uses dask and panel together. It does not scale the dask workers, but at the very least the "blocking" computation can be scheduled on dask so that one user is not blocked by another, as long as dask workers are available. It uses dask's LocalCluster object with the default number of workers for my machine. Adaptively scaling workers will require a custom implementation of dask's Adaptive object; see https://github.com/dask/dask-jobqueue/issues/122 for a discussion related to job queues using a SLURM cluster.

Below is the output, which has a computation that takes roughly 10 seconds to compute (see the code at the end). The top cell output begins its execution and 10s later produces the result. The bottom cell starts a few seconds after the top cell, but still returns the result 10s later.

panel-scaling-example.webm

I was struggling to get the callbacks right until I stumbled across this discourse post: https://discourse.holoviz.org/t/panel-webapp-with-dask-bokeh-tornado-and-asynchronous-tasks/2388/5. The post explains that periodic callbacks are needed so that panel can check whether the future objects (the computations running on dask) held in a futures list have completed. This makes sense, because something has to prompt panel to check the futures list for changes (see the code). If we make param watch the list itself, the watcher will never fire, because the underlying Python object (a list) stays the same object even when items are added or removed; see https://param.holoviz.org/user_guide/Dependencies_and_Watchers.html#param-trigger. I'll have to think about it more, but there might be a way to use a deque object.
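
The reason a watcher on a list stays silent can be shown without param at all. The toy class below is not how param is implemented; it only assumes that notification happens on attribute assignment. `Watched` and `events` are hypothetical names.

```python
class Watched:
    """Toy stand-in for a watched parameter (not param's implementation)."""

    def __init__(self) -> None:
        self._items = []
        self.events = []

    @property
    def items(self) -> list:
        return self._items

    @items.setter
    def items(self, value: list) -> None:
        self._items = value
        self.events.append("items set")  # a real watcher would run here


w = Watched()
w.items.append(1)        # in-place mutation: the setter never runs
print(w.events)          # []
w.items = [*w.items, 2]  # assigning a new list does run the setter
print(w.events)          # ['items set']
```

In param itself, the page linked above describes `self.param.trigger(...)` as the way to force a notification after such an in-place change.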

To run the code, have both files in the same directory and run panel serve dask_example.py.

code

# cluster.py
#
# NOTE: However you start your cluster, it needs to have access to the
#       `blocking_computation` function. If you get an error that says `dask` cannot
#       pickle the blocking computation, then change your `PYTHONPATH` to include the
#       directory where this file lives.
#
# Ensure this file is in the same place as the `dask_example.py` file.
import time
import numpy as np
from dask.distributed import LocalCluster

def blocking_computation(x: float) -> float:
    samples = []
    for _ in range(1000):
        time.sleep(0.01)
        samples.append(np.random.normal(loc=1.0, scale=1.0))
    return x + int(np.ceil(np.mean(samples)))

if __name__ == "__main__":
    # The scheduler port is passed as an integer.
    cluster = LocalCluster(scheduler_port=8786)
    print(cluster.scheduler_address)
    input()  # keep the cluster alive until Enter is pressed

# dask_example.py
from __future__ import annotations
from datetime import datetime as dt
import panel as pn
import param
from dask.distributed import Client
from cluster import blocking_computation
pn.extension()

class DaskExample(param.Parameterized):

    input_ = param.Parameter(default=0.0, label="Input")
    output = param.Parameter(default=1, label="Input + 1")
    start = param.String(default="", label="Start time")
    end = param.String(default="", label="End time")
    compute = param.Event(label="Compute")

    def __init__(
        self: DaskExample,
        refresh_rate: int = 500,
        **params,
    ) -> None:
        super().__init__(**params)
        self.futures = []
        self.client = Client()
        self.refresh_rate = refresh_rate
        self.computing = pn.indicators.LoadingSpinner(
            value=False,
            width=100,
            height=100,
        )
        self.param.watch(self.create_futures, ["compute"])

    def start_refreshing(self: DaskExample) -> None:
        pn.state.add_periodic_callback(self.compute_futures, self.refresh_rate)

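    def compute_futures(self: DaskExample, *args) -> None:
        # Iterate over a copy: removing items from the list being looped
        # over would skip the entry that follows each removed one.
        for future, callback in list(self.futures):
            if future.status == "finished":
                callback(future)
                self.futures.remove((future, callback))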

    def callback(self: DaskExample, future) -> None:
        self.output = future.result()
        self.update_end_time()
        self.computing.value = False

    def create_futures(self: DaskExample, *args):
        self.update_start_time()
        self.computing.value = True
        future = self.client.submit(blocking_computation, self.input_)
        self.futures.append((future, self.callback))

    def update_start_time(self: DaskExample) -> None:
        start = dt.now()
        self.start = start.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
        self.end = ""

    def update_end_time(self: DaskExample) -> None:
        end = dt.now()
        self.end = end.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]

example = DaskExample()
pn.state.onload(example.start_refreshing)
pn.Row(example.param, example.computing).servable()

MarcSkovMadsen commented 1 year ago

I am not experienced with Dask or async, but it seems to me that it should be possible to combine the two, and that doing so would make the client code simpler.

I really want to learn about using Panel and Dask in combination so I've made a feature request here https://github.com/holoviz/panel/issues/4233

MarcSkovMadsen commented 1 year ago

For example, I believe this is simpler and closer to something I would use. From my experiments, it adds roughly 2 × 0.1 s for transferring data between the Panel server and the Dask cluster.

cluster.py

# cluster.py
from dask.distributed import LocalCluster

SCHEDULER_PORT = 64719

if __name__ == '__main__':
    cluster = LocalCluster(scheduler_port=SCHEDULER_PORT, n_workers=4)
    print(cluster.scheduler_address)
    input()

tasks.py

import time

from datetime import datetime as dt

import numpy as np

def blocking_computation(x: float) -> float:
    start = dt.now()
    print(start.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3])
    samples = []
    for _ in range(1000):
        time.sleep(0.01)
        samples.append(np.random.normal(loc=1.0, scale=1.0))
    end = dt.now()
    print(end.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3])
    return x + int(np.ceil(np.mean(samples)))

app.py

# app.py
from __future__ import annotations

import asyncio
from datetime import datetime as dt

import panel as pn
import param
from dask.distributed import Client

from tasks import blocking_computation

pn.extension(sizing_mode="stretch_width")

DASK_ADDRESS = "tcp://127.0.0.1:64719"

async def get_client():
    if "dask-client" not in pn.state.cache:
        pn.state.cache["dask-client"] = await Client(DASK_ADDRESS, asynchronous=True)
    return pn.state.cache["dask-client"]

async def submit(func, *args, **kwargs):
    client = await get_client()
    return await client.submit(func, *args, **kwargs)

class DaskExample(param.Parameterized):

    input_ = param.Parameter(default=0.0, label="Input")
    output = param.Parameter(default=1, label="Input + 1")
    start = param.String(default="", label="Start time")
    end = param.String(default="", label="End time")
    compute = param.Event(label="Compute")

    def __init__(
        self: DaskExample,
        refresh_rate: int = 500,
        **params,
    ) -> None:
        super().__init__(**params)
        self.computing = pn.indicators.LoadingSpinner(
            value=False,
            width=100,
            height=100,
        )

    @pn.depends("compute", watch=True)
    async def create_futures(self: DaskExample, *args):
        self.update_start_time()
        self.output = await submit(blocking_computation, self.input_)
        self.update_end_time()

    def update_start_time(self: DaskExample) -> None:
        self.param.compute.constant = True
        self.computing.value = True
        start = dt.now()
        self.start = start.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
        print(self.start)
        self.end = ""

    def update_end_time(self: DaskExample) -> None:
        end = dt.now()
        self.end = end.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
        print(self.end)
        self.computing.value = False
        self.param.compute.constant = False

slider = pn.widgets.IntSlider(value=0, start=0, end=10)

example = DaskExample()

pn.template.FastListTemplate(
    main = [
        pn.Column("## Offload asynchronously to Dask", example.param, example.computing),
        pn.Column("## Not blocked", slider, slider.param.value),
    ],
    site="Awesome Panel",
    title="Offload compute intensive tasks to Dask cluster",
).servable()

python cluster.py
panel serve app.py --autoreload

https://user-images.githubusercontent.com/42288570/209424815-7c47a9eb-4006-481a-b1c9-8802ec64aed7.mp4

MarcSkovMadsen commented 1 year ago

I've updated the example above. After some experiments it turned out that it will avoid a lot of issues if you start the Dask cluster separately from the client. That is why I created the cluster.py file.

I don't know how to set self.client = await get_client() in the __init__ method. So I cached the client inside the get_client function and, for simplicity, added the submit function.
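
A general Python pattern for this situation is an async factory: `__init__` cannot `await`, so the awaiting is moved into a classmethod that builds the instance afterwards. The sketch below is an assumption about how that could look, not something from the Panel or Dask docs; `FakeClient`, `Worker` and `create` are hypothetical names, and how well the pattern fits a `param.Parameterized` class served by Panel is untested here.

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in for a client whose setup must be awaited."""

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> "FakeClient":
        await asyncio.sleep(0.01)  # pretend to do network I/O
        self.connected = True
        return self


class Worker:
    def __init__(self, client: FakeClient) -> None:
        # __init__ stays synchronous and just stores the ready client.
        self.client = client

    @classmethod
    async def create(cls) -> "Worker":
        # All awaiting happens here, before the instance is built.
        return cls(await FakeClient().connect())


async def main() -> bool:
    worker = await Worker.create()
    return worker.client.connected


if __name__ == "__main__":
    print(asyncio.run(main()))  # True
```

The cached `get_client` approach has the advantage that the client is shared across sessions, whereas a factory like this creates one client per instance.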

TheoMathurin commented 1 year ago

Thanks @MarcSkovMadsen, this really looks like a good lead.

Just a comment: the sleep operation is not CPU intensive and does not hold the GIL. Therefore the slider value update would work just by having a threaded panel server. Presumably it's not enabled though, so what we are seeing is really the effect of having a separate dask cluster.
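
The point about `time.sleep` can be checked with plain threads. In the sketch below, `sleepy` and `run_in_threads` are hypothetical helpers. Because sleeping releases the GIL, several sleeping threads overlap instead of queueing up.

```python
import threading
import time


def sleepy(seconds: float) -> None:
    time.sleep(seconds)  # releases the GIL while waiting


def run_in_threads(func, arg, n: int = 2) -> float:
    """Run func(arg) in n threads and return the elapsed wall time."""
    threads = [threading.Thread(target=func, args=(arg,)) for _ in range(n)]
    started = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - started


elapsed = run_in_threads(sleepy, 0.2)
print(f"two 0.2 s sleeps in threads took {elapsed:.2f} s")
```

The elapsed time should be close to a single sleep rather than the sum. With a CPU-bound function in place of `sleepy`, a standard CPython build with the GIL would be expected to take closer to the sum of the individual run times, which is the case where separate processes or a cluster help.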

A tenth of a second is acceptable overhead I'd say although we would have to see how this scales. In any case, that's nice!

MarcSkovMadsen commented 1 year ago

After some reading and experimentation, I've written up what I found in https://github.com/holoviz/panel/issues/4239.

My reflection is that if the overhead were close to 1 ms, you could, and probably should, almost always use Dask as the execution engine. That would potentially make Panel scale really, really well.
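
Per-task overhead of this kind can be estimated by timing a trivial task. The sketch below uses a local thread pool as a stand-in for a remote cluster, so the number it prints says nothing about Dask itself; `measure_overhead` and `noop` are hypothetical helpers.

```python
import statistics
import time
from concurrent.futures import ThreadPoolExecutor


def noop(x: int) -> int:
    return x


def measure_overhead(repeats: int = 200) -> float:
    """Median round-trip time, in milliseconds, of submitting a no-op."""
    timings = []
    with ThreadPoolExecutor(max_workers=1) as pool:
        for i in range(repeats):
            started = time.perf_counter()
            pool.submit(noop, i).result()
            timings.append((time.perf_counter() - started) * 1000)
    return statistics.median(timings)


print(f"median overhead: {measure_overhead():.3f} ms")
```

The median is used rather than the mean so that a few slow outliers, such as the first submission, do not dominate the estimate.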

govinda18 commented 1 year ago

Thanks a lot @ndmlny-qs and @MarcSkovMadsen for the work here. Apologies for the delayed response as I was on a vacation last week.

Some thoughts from the first read:

  1. I feel the approach here can be generalized a bit more to reduce the effort required from the developer. I will spend some time on the design I have in mind and get back to you.
  2. On overhead, I feel that even 7 ms is reasonable (maybe I am wrong, but for most of my use cases this is probably acceptable). I will play around with this too.

Adaptively scaling workers will require a custom implementation of dask's Adaptive object, see https://github.com/dask/dask-jobqueue/issues/122 for a discussion related to job queues using a SLURM cluster.

From what I could gather, this is a Dask-specific limitation. I have not worked closely with Dask, but I know that Celery, for one, provides a way to add queues dynamically. What I mean is that dynamically scaling workers should probably be implemented on the developer's side, as long as we have a solution or guideline flexible enough to plug in other technologies as well.
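
One way to keep such a guideline technology-neutral is to have the app depend on a small interface rather than on a specific backend. The sketch below is a hypothetical illustration using only the standard library: `TaskBackend`, `ThreadBackend` and `run_callback` are invented names, and a Dask- or Celery-based class would have to be written separately to match the same shape.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Protocol


class TaskBackend(Protocol):
    """Hypothetical interface an app could code against."""

    def submit(self, func: Callable[..., Any], *args: Any) -> Future: ...


class ThreadBackend:
    """Local backend; another implementation could take its place."""

    def __init__(self, max_workers: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, func: Callable[..., Any], *args: Any) -> Future:
        return self._pool.submit(func, *args)


def add_one(x: float) -> float:
    return x + 1


def run_callback(backend: TaskBackend, x: float) -> float:
    # The app only sees the interface, never the concrete backend.
    return backend.submit(add_one, x).result()


print(run_callback(ThreadBackend(), 1.0))  # 2.0
```

How workers are added or removed then becomes a detail of each backend class, which matches the suggestion that scaling be handled on the developer's side.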

Will spend some time playing around as well. If you guys have any thoughts, feel free to add.

MarcSkovMadsen commented 1 year ago

Hi @govinda18

I'm looking forward to seeing the results of the Celery experimentation. I think we need more examples and knowledge on making Panel really scale by outsourcing the hard work to Dask, Celery, and databases such as Postgres, SQLite, DuckDB, etc.