streamlit / streamlit

Streamlit — A faster way to build and share data apps.
https://streamlit.io
Apache License 2.0

Trigger a script re-run from another thread #2838

Open tconkling opened 3 years ago

tconkling commented 3 years ago

I'd love to have some API like st.request_rerun(user_id: str) that could be called outside of a user script. (Currently we have st.experimental_rerun, but it can only be called by a script that's currently executing).

The use case:

If an app script has some long asynchronous process (for example, uploading a user file to some endpoint that the client does not have access to), there's no good way to signal to Streamlit "please re-run the app for user X" (or: "please re-run the app for all users").

Here's an example from the forums: https://discuss.streamlit.io/t/streamlit-ui-freezes-on-long-api-call/9949


nthmost commented 3 years ago

Mega plus one. I could even see a future where Streamlit exposes a programmatic API for apps that essentially turns deployed Streamlit apps into de facto microservices.

schaumb commented 3 years ago

Hi,

We created a solution for that. Check here: https://github.com/streamlit/streamlit/issues/2927 https://github.com/FloWide/streamlit_callbacks

from streamlit.callbacks.callbacks import call, rerun

if condition:
    call(rerun)

exrhizo commented 1 year ago

Okay, I hacked into streamlit and got a solution!

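import asyncio
import gc
import threading

from streamlit.runtime import Runtime
from streamlit.runtime.app_session import AppSession
from streamlit.runtime.scriptrunner import get_script_run_ctx

def get_browser_session_id() -> str:
    # Get the session_id for the currently running script.
    try:
        ctx = get_script_run_ctx()
        return ctx.session_id
    except Exception as e:
        raise Exception("Could not get browser session id") from e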

def find_streamlit_main_loop() -> asyncio.BaseEventLoop:
    loops = []
    for obj in gc.get_objects():
        try:
            if isinstance(obj, asyncio.BaseEventLoop):
                loops.append(obj)
        except ReferenceError:
            ...

    main_thread = next((t for t in threading.enumerate() if t.name == 'MainThread'), None)
    if main_thread is None:
        raise Exception("No main thread")
    main_loop = next((lp for lp in loops if lp._thread_id == main_thread.ident), None) # type: ignore
    if main_loop is None:
        raise Exception("No event loop on 'MainThread'")

    return main_loop

def get_streamlit_session(session_id: str) -> AppSession:
    runtime: Runtime = Runtime.instance()
    session = next((
        s.session
        for s in runtime._session_mgr.list_sessions()
        if s.session.id == session_id
    ), None)
    if session is None:
        raise Exception(f"Streamlit session not found for {session_id}")
    return session

# This is it!
# get_browser_session_id needs to be run on the relevant script thread,
# then you can call the rest of this on other threads.
streamlit_loop = find_streamlit_main_loop()
streamlit_session = get_streamlit_session(get_browser_session_id())

def notify() -> None:
    # this didn't work when I passed it in directly, I didn't really think too much about why not
    streamlit_session._handle_rerun_script_request()

# This can be called on any thread you want.
streamlit_loop.call_soon_threadsafe(notify)
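
The last line above relies on asyncio's `loop.call_soon_threadsafe`, which schedules a callback onto an event loop from a different thread. Here is a minimal standard-library sketch of that handoff on its own, with no Streamlit involved; `run_demo`, `worker`, and `on_done` are illustrative names only:

```python
import asyncio
import threading

def run_demo() -> list[str]:
    """Schedule a callback on the loop's thread from a worker thread."""
    events: list[str] = []

    async def main() -> None:
        loop = asyncio.get_running_loop()
        done = asyncio.Event()
        loop_thread_id = threading.get_ident()

        def on_done(message: str) -> None:
            # Runs on the event loop's thread, not on the worker thread.
            assert threading.get_ident() == loop_thread_id
            events.append(message)
            done.set()

        def worker() -> None:
            # Calling on_done() directly here would run it on the worker thread.
            # call_soon_threadsafe hands it to the loop and wakes the loop up.
            loop.call_soon_threadsafe(on_done, "work finished")

        threading.Thread(target=worker).start()
        await asyncio.wait_for(done.wait(), timeout=5)

    asyncio.run(main())
    return events

print(run_demo())
```

In the snippet above, `notify` plays the role of `on_done` and Streamlit's main loop plays the role of the loop created by `asyncio.run`.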

CyprienRicque commented 1 year ago

Thank you @exrhizo! Here is a quick way to experiment with it:

import streamlit as st
from notifier import notify  # code from @exrhizo put in notifier file without the last line
import time
import threading

st.title("Hello World")

# counter that counts +1 each time the app is run
if "counter" not in st.session_state:
    st.session_state.counter = 0

st.session_state.counter += 1
st.write(f"Counter: {st.session_state.counter}")

def delay_notify():
    time.sleep(5)
    notify()

if st.button("Notify"):
    t = threading.Thread(target=delay_notify)
    t.start()

ahoereth commented 1 year ago

Given the code above, I adjusted it to notify all available sessions, not just one. This is useful if you have a cached resource (@st.cache_resource) receiving updates in a thread (e.g., from a subscription) that should be distributed to all active clients.

from streamlit.runtime import Runtime
from streamlit.runtime.app_session import AppSession

def get_streamlit_sessions() -> list[AppSession]:
    runtime: Runtime = Runtime.instance()
    return [s.session for s in runtime._session_mgr.list_sessions()]

def notify() -> None:
    for session in get_streamlit_sessions():
        session._handle_rerun_script_request()

Example usage

import threading
import time
import streamlit as st

class Runner:
    def __init__(self):
        self.shared_state = 0
        thread = threading.Thread(target=self.run, daemon=True)
        thread.start()

    def run(self):
        while True:
            self.shared_state += 1
            notify()
            time.sleep(1)

@st.cache_resource
def init_runner():
    return Runner()

runner = init_runner()
st.text(runner.shared_state)

callzhang commented 10 months ago

Is there a way to get Streamlit's main_loop elegantly?

lum4chi commented 8 months ago

Did you notice whether the solution also works for multipage apps? I tried it, and the rerun always brings me back to the home page. I solved this by passing the session's client state:

def notify() -> None:
    for session in get_streamlit_sessions():
        session._handle_rerun_script_request(session._client_state)

callzhang commented 8 months ago

I have adopted @exrhizo's solution and made a small modification to avoid endless reruns:

# script name: st_hack
streamlit_loop = find_streamlit_main_loop()
streamlit_session = get_streamlit_session(get_browser_session_id())

def rerun() -> None:
    # this code will BLOCK the current thread
    if streamlit_session._browser_queue.is_empty():
        # Without this check, UI state changes keep triggering UI-related functions, so the script reruns continuously.
        streamlit_session.request_rerun()
    else:
        print('browser queue not empty, rerun cancelled!')

def notify_rerun():
    streamlit_loop.call_soon_threadsafe(rerun)
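
The guard above skips a rerun while messages are still queued for the browser. A related but different idea is to coalesce requests so that at most one rerun is pending at a time. The sketch below shows that with the standard library only. `RerunGate` and its methods are hypothetical names, not part of Streamlit, and this is not the check used above:

```python
import threading
from typing import Callable

class RerunGate:
    """Hypothetical helper: allow at most one pending rerun at a time."""

    def __init__(self, trigger: Callable[[], None]) -> None:
        self._trigger = trigger
        self._lock = threading.Lock()
        self._pending = False

    def request(self) -> bool:
        """Fire the trigger unless a rerun is already pending; return whether it fired."""
        with self._lock:
            if self._pending:
                return False
            self._pending = True
        self._trigger()
        return True

    def mark_done(self) -> None:
        """Call once the rerun has started, so later requests can fire again."""
        with self._lock:
            self._pending = False

calls: list[str] = []
gate = RerunGate(lambda: calls.append("rerun"))
first = gate.request()   # fires
second = gate.request()  # skipped: one is already pending
gate.mark_done()
third = gate.request()   # fires again
print(first, second, third, len(calls))
```

In a real app, the trigger would be whatever schedules the rerun (for example `notify_rerun` above), and `mark_done` would have to be called from the script once it starts running again. That wiring is an assumption and is left out here.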

Usage:

## function wrapper
import threading
from functools import wraps

def run_in_thread(func=None, *, callback=None, timeout=20):
    '''Decorator to run a function in a thread.
    @param func: function to be wrapped
    @param callback: function to be called after func is executed; \
        it must accept exactly one argument. If callback is defined, \
            a rerun is scheduled on Streamlit's main thread.
    @param timeout: timeout in seconds when waiting on a Queue returned by func
    '''
    if func is None:
        # If run_in_thread is called with parentheses, return a decorator.
        return lambda func: run_in_thread(func, callback=callback, timeout=timeout)
    from streamlit.runtime.scriptrunner import add_script_run_ctx, get_script_run_ctx
    from queue import Queue
    from . import st_hack
    import inspect
    if callable(callback):
        sig = inspect.signature(callback)
        params = sig.parameters
        if len(params) != 1:
            raise ValueError(f"callback function ({callback.__name__}) must accept exactly one argument")

    @wraps(func)
    def wrapper(*args, **kwargs):
        result_queue = Queue()
        # use a wrapper to put result in queue
        def result_wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            if isinstance(result, Queue):
                result = result.get(timeout=timeout)
            if callable(callback):
                callback(result)
                st_hack.notify_rerun()
            result_queue.put(result)
        # create a thread to run the function
        thread = threading.Thread(target=result_wrapper, args=args, kwargs=kwargs)
        ctx = get_script_run_ctx()
        add_script_run_ctx(thread, ctx)
        thread.start()
        return result_queue
    return wrapper
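
For anyone who wants to try the decorator pattern in isolation, here is a trimmed, standard-library-only sketch of the same idea: the decorated function runs in a thread and the caller gets a Queue back. It leaves out the Streamlit script context and the rerun notification. `run_in_thread_sketch` and `slow_square` are illustrative names:

```python
import threading
from functools import wraps
from queue import Queue

def run_in_thread_sketch(func=None, *, callback=None):
    """Run the decorated function in a thread and return a Queue holding its result."""
    if func is None:
        # Called with parentheses, e.g. @run_in_thread_sketch(callback=...).
        return lambda f: run_in_thread_sketch(f, callback=callback)

    @wraps(func)
    def wrapper(*args, **kwargs):
        result_queue = Queue()

        def target():
            result = func(*args, **kwargs)
            if callable(callback):
                # The callback runs on the worker thread, before the result is queued.
                callback(result)
            result_queue.put(result)

        threading.Thread(target=target, daemon=True).start()
        return result_queue

    return wrapper

seen = []

@run_in_thread_sketch(callback=seen.append)
def slow_square(x):
    return x * x

result = slow_square(7).get(timeout=5)
print(result, seen)
```

Note that the callback runs on the worker thread. In the full version above, that is why a rerun has to be scheduled back onto Streamlit's loop afterwards.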

@tconkling, do you think this is the proper way to implement an off-thread rerun?