pywebio / PyWebIO

Write interactive web app in script way.
https://pywebio.readthedocs.io
MIT License

Question: How to communicate with all sessions? #565

Closed WolfDWyc closed 1 year ago

WolfDWyc commented 1 year ago

I want to create a multiplayer game that updates in real time.

From within one session, I want to be able to run a function in all connected sessions.

Example:


def update():
    put_text("Updated")

# Called when clicking a button from put_button
def on_click():
    # The function I want to exist
    run_in_all_sessions(update)

In this example, when the button is clicked in any session, every session should display the text.

The equivalent in a more traditional client-server model would probably be a WebSocket broadcast (something like this: https://flask-socketio.readthedocs.io/en/latest/getting_started.html#broadcasting), but I couldn't find a way to do that in PyWebIO.

Is there a way to do this?

FlyHighest commented 1 year ago

What about a while-True loop in every session that polls the server for new information?
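
A minimal sketch of that polling idea, using only the standard library. `MessageLog`, `publish`, and `poll` are hypothetical names chosen for illustration and are not part of PyWebIO. Each session would keep its own cursor into a shared log and periodically ask for anything newer:

```python
import threading


class MessageLog:
    """Append-only log shared by all sessions (hypothetical helper)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = []

    def publish(self, message):
        # Any session may append; the lock keeps concurrent appends safe.
        with self._lock:
            self._messages.append(message)

    def poll(self, cursor):
        # Return the messages added since `cursor` and the new cursor position.
        with self._lock:
            new = self._messages[cursor:]
            return new, cursor + len(new)


log = MessageLog()

# Inside a PyWebIO session function, the loop could look roughly like this:
#     cursor = 0
#     while True:
#         new, cursor = log.poll(cursor)
#         for text in new:
#             put_text(text)
#         time.sleep(0.5)
```

The trade-off is latency against load: a shorter sleep makes updates appear sooner but wakes every session more often. A session that starts with `cursor = 0` replays the whole history, which may or may not be wanted for a game.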

ll606 commented 1 year ago

We can create a DataManager class using the singleton pattern and have it send data to all sessions through queues.

Maybe you can try this small demo:

from queue import Queue
from pywebio.output import put_button, put_text
from pywebio import start_server
from typing import Callable

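class DataManager:
    """Singleton that keeps one queue per connected session."""

    def __new__(cls) -> 'DataManager':
        # Create the shared instance and the queue list only once.
        if not hasattr(cls, 'instance'):
            cls.instance = object.__new__(cls)
            cls.session_queues = []
        return cls.instance

    def run_in_all_sessions(self, func: Callable):
        # Hand the function to every registered session.
        for queue in self.session_queues:
            queue.put(func)

    def register(self, queue: Queue):
        self.session_queues.append(queue)

def update():
    put_text('updated')

def on_click():
    DataManager().run_in_all_sessions(update)

def game():
    queue = Queue()
    DataManager().register(queue)
    put_button('update', on_click)
    # Block until some session broadcasts, then run the function here
    # so that its output goes to this session.
    while True:
        func: Callable = queue.get()
        func()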

if __name__ == '__main__':
    start_server(game)
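
One thing the demo above does not do is remove a queue once its session is gone, so the list only grows. A possible variant, sketched with the standard library only: `Broadcaster` and `unregister` are hypothetical names, and the lock is an added assumption to guard against sessions registering while a broadcast is in progress.

```python
import threading
from queue import Queue
from typing import Callable, List


class Broadcaster:
    """Fan a callable out to one queue per session (hypothetical helper)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._queues: List[Queue] = []

    def register(self) -> Queue:
        # Create a fresh queue for the calling session and track it.
        queue: Queue = Queue()
        with self._lock:
            self._queues.append(queue)
        return queue

    def unregister(self, queue: Queue) -> None:
        # Safe to call twice; an unknown queue is simply ignored.
        with self._lock:
            if queue in self._queues:
                self._queues.remove(queue)

    def run_in_all_sessions(self, func: Callable[[], None]) -> int:
        # Copy under the lock so delivery never iterates a list being mutated.
        with self._lock:
            targets = list(self._queues)
        for queue in targets:
            queue.put(func)
        return len(targets)
```

In a PyWebIO app, `unregister` would need to run when the session ends; registering it with `pywebio.session.defer_call` looks like a natural place for that, though this sketch does not exercise it.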

We can also run the update loop in a separate thread and use the data manager as a decorator, so that it registers the session and handles the updates itself. This is much easier to use.

from queue import Queue
from pywebio.output import put_button, put_text
from pywebio import start_server
from pywebio.session import register_thread
from threading import Thread
from typing import Callable

class DataManager:

    def __new__(cls) -> 'DataManager':
        if not hasattr(cls, 'instance'):
            cls.instance = object.__new__(cls)
            cls.session_queues = []
        return cls.instance 

    def run_in_all_sessions(self, func: Callable):
        for queue in self.session_queues:
            queue.put(func)

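    def register(self, func):

        def decorator(*args, **kwargs):
            # Start a worker thread for this session and register it so that
            # PyWebIO functions called inside it target this session.
            update_thread = Thread(target=self.update_session, daemon=True)
            register_thread(update_thread)
            update_thread.start()

            res = func(*args, **kwargs)
            return res
        return decorator

    def update_session(self):
        # Runs in the worker thread: wait for broadcast functions and run them.
        queue = Queue()
        self.session_queues.append(queue)
        while True:
            func = queue.get()
            func()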

def update():
    put_text('updated')

def on_click():
    DataManager().run_in_all_sessions(update)

@DataManager().register
def game():
    put_button('update', on_click)

if __name__ == '__main__':
    start_server(game)

WolfDWyc commented 1 year ago

I ended up coming up with my own solution after digging into the source code a few days after I asked this. (Sorry I didn't close this earlier!)

This is a really cool solution though! It seems to work, and the second option is even better since the app function itself doesn't need an infinite loop listening for events. It does, however, require quite a lot of code.

So in case anyone finds it useful, below is the solution I used. It relies on implementation details of ThreadBasedSession, so it may be less stable, but it's a bit less code since it leaves the thread management to PyWebIO.

from functools import wraps
from typing import Callable
from uuid import uuid4

from pywebio.output import put_button, put_text
from pywebio import start_server
from pywebio.session import ThreadBasedSession

sessions = []

def for_all_sessions(func: Callable) -> Callable:

    @wraps(func)
    def wrapper(*args, **kwargs):
        for session in sessions:
            func_id = str(uuid4())

            def inner(_):
                func(*args, **kwargs)

            # `callbacks` and `callback_mq` are ThreadBasedSession internals:
            # register a one-off callback, then queue a task that triggers it.
            session.callbacks[func_id] = (inner, False)
            session.callback_mq.put({"task_id": func_id, "data": None})

    return wrapper

def register_session():
    sessions.append(ThreadBasedSession.get_current_session())

@for_all_sessions
def on_click():
    put_text('updated')

def game():
    register_session()
    put_button('update', on_click)

if __name__ == '__main__':
    start_server(game)
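
The `sessions` list in this approach also keeps references to sessions that have already ended. A small sketch of how they could be dropped before each broadcast, assuming the session objects expose a `closed()` method as PyWebIO's session classes appear to. `prune_closed` and `HasClosed` are hypothetical names:

```python
from typing import List, Protocol


class HasClosed(Protocol):
    """Anything with a closed() -> bool method (assumed session interface)."""

    def closed(self) -> bool: ...


def prune_closed(sessions: List[HasClosed]) -> int:
    """Remove closed sessions from the list in place; return how many were dropped."""
    before = len(sessions)
    sessions[:] = [s for s in sessions if not s.closed()]
    return before - len(sessions)
```

Calling `prune_closed(sessions)` at the top of `wrapper` would keep the broadcast from touching dead sessions. The helper is not synchronized, so if several threads can broadcast at once it would need a lock around the list.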