miguelgrinberg / Flask-SocketIO

Socket.IO integration for Flask applications.
MIT License

A callback isn't invoked when emitting from a Celery process. #1910

Closed: valerykustov closed this issue 1 year ago

valerykustov commented 1 year ago

Description

I have a SocketIO client and two HTTP entry points. When a request arrives at the first one, the server emits to the client. When a request arrives at the second one, the server creates a background Celery task, which then emits to the client. Both emits have a callback function assigned to them, and both reach the client. However, the callback is not invoked for the Celery emit, even though the acknowledgment is received.

Steps to reproduce:

You need a Redis server installed and available at localhost.

Versions:

amqp==5.1.1
async-timeout==4.0.2
bidict==0.22.0
billiard==3.6.4.0
celery==5.2.7
click==8.1.3
click-didyoumean==0.3.0
click-plugins==1.1.1
click-repl==0.2.0
Deprecated==1.2.13
Flask==2.2.2
Flask-SocketIO==5.3.2
flower==1.2.0
gevent==22.10.2
gevent-websocket==0.10.1
greenlet==2.0.1
hiredis==2.0.0
humanize==4.4.0
itsdangerous==2.1.2
Jinja2==3.1.2
kombu==5.2.4
MarkupSafe==2.1.1
packaging==21.3
prometheus-client==0.15.0
prompt-toolkit==3.0.32
pyparsing==3.0.9
python-engineio==4.3.4
python-socketio==5.7.2
pytz==2022.6
redis==4.3.4
six==1.16.0
tornado==6.2
vine==5.0.0
wcwidth==0.2.5
Werkzeug==2.2.2
wrapt==1.14.1
zope.event==4.5.0
zope.interface==5.5.2

File structure:

background/
├── celery.sh
├── main.py
├── tasks.py
└── templates
    └── user.html

background/main.py:

from gevent.monkey import patch_all
patch_all()

from background.tasks import send_message_with_celery, ack_callback

import logging

from flask import Flask, render_template, request
from flask_socketio import SocketIO, join_room

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
app.logger.setLevel(logging.INFO)
socketio = SocketIO(app, message_queue='redis://', logger=True, engineio_logger=True)

@app.route('/user/<int:user_id>')
def user(user_id):
    return render_template('user.html', user_id=user_id)

@socketio.on('join')
def on_join(room):
    join_room(room)
    socketio.emit(
        'roomEntrance', f'sid {request.sid} has entered the {room=}',
        to=room,
        callback=ack_callback
    )
    return f'entered {room=}'

@app.route('/online_message_to/<int:recipient_id>')
def online_message_to(recipient_id):
    socketio.emit(
        'onlineMessage', 'online message from hook',
        to=f'room_{recipient_id}',
        callback=ack_callback
    )
    return f'sent to room_{recipient_id}'

@app.route('/background_message_to/<int:recipient_id>')
def background_message_to(recipient_id):
    send_message_with_celery.delay(recipient_id=recipient_id)
    return 'the message was put to the queue'

if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=5002)

background/tasks.py:

from celery import Celery
from flask_socketio import SocketIO

celery_app = Celery(__name__, imports='background.tasks', broker_url='redis://localhost')

socketio_celery = SocketIO(message_queue='redis://', logger=True, engineio_logger=True)

def ack_callback(*args, **kwargs):
    res = f'ack!, {args=}, {kwargs=}'
    print(res)

@celery_app.task()
def send_message_with_celery(recipient_id):
    socketio_celery.emit(
        'backgroundMessage', 'background message from hook',
        to=f'room_{recipient_id}', namespace='/',
        callback=ack_callback
    )

background/templates/user.html:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>User {{ user_id }}</title>
</head>
<body>
<h1>Hello, user {{ user_id }}</h1>
<script src="https://cdn.socket.io/4.4.1/socket.io.min.js" integrity="sha384-fKnu0iswBIqkjxrhQCTZ7qlLHOFEgNkRmK2vaO/LbTZSXdJfAu6ewRBdwHPhBo/H" crossorigin="anonymous"></script>
    <script>
      const socket = io();
    </script>
    <script type="text/javascript">
      (() => {
        socket.onAny((eventName, data, callback) => {
          console.log(eventName+ ': ' + data);
          if (typeof callback !== "undefined") {
            callback("Ack from JS!" + " eventName: " + eventName + ", data: " + JSON.stringify(data));
          }
        });
      })();
    </script>
    <script>
        socket.emit('join', 'room_{{ user_id }}')
    </script>
  </body>
</html>

background/celery.sh:

#!/bin/bash
celery -A background.tasks.celery_app worker --loglevel=info

Start the Redis server and run Celery: /bin/bash background/celery.sh. Then start the application and visit http://localhost:5002/user/1.

Now, if you open http://localhost:5002/online_message_to/1, a message will be delivered to the client and you'll see something like this in the server logs:

emitting event "onlineMessage" to room_1 [/]
pubsub message: emit
LeWtgnRcpyHFvfVRAAAC: Sending packet MESSAGE data 25["onlineMessage","online message from hook"]
LeWtgnRcpyHFvfVRAAAC: Received packet MESSAGE data 35["Ack from JS! eventName: onlineMessage, data: \"online message from hook\""]
received ack from JUA_8xmKfrs5CgNOAAAD [/]
pubsub message: callback
ack!, args=('Ack from JS! eventName: onlineMessage, data: "online message from hook"',), kwargs={}

Notice the last line: it is printed by the ack_callback function.
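The packet prefixes in these logs follow the Socket.IO packet layout: the first digit is the packet type (2 is EVENT, 3 is ACK), and any digits after it, up to the JSON payload, are the acknowledgement id. So 25[...] is an event that asks for an ack with id 5, and 35[...] is the browser's acknowledgement of id 5; the Celery case below shows the same pair as 26/36 with id 6, which confirms that the ack does reach the server in both cases. The decoder below is a minimal sketch with a hypothetical name (decode_packet); it assumes the default namespace "/" and no binary attachments, and is not a full protocol implementation.

```python
import json
from typing import Any, Dict, Optional

# Socket.IO packet type codes (subset; binary packet types are not handled).
PACKET_TYPES = {0: "CONNECT", 1: "DISCONNECT", 2: "EVENT", 3: "ACK", 4: "CONNECT_ERROR"}


def decode_packet(encoded: str) -> Dict[str, Any]:
    """Split a Socket.IO packet such as '25["event","data"]' into its parts.

    Simplified sketch: assumes the default namespace "/" (which is omitted
    from the encoding) and no binary attachments, so the layout is
    <type digit><optional ack id digits><optional JSON payload>.
    """
    packet_type = PACKET_TYPES[int(encoded[0])]
    end = 1
    # The digits after the type, up to the payload, form the ack id.
    while end < len(encoded) and encoded[end].isdigit():
        end += 1
    ack_id: Optional[int] = int(encoded[1:end]) if end > 1 else None
    payload = json.loads(encoded[end:]) if end < len(encoded) else None
    return {"type": packet_type, "ack_id": ack_id, "payload": payload}


if __name__ == "__main__":
    # Packets taken from the server log above.
    print(decode_packet('25["onlineMessage","online message from hook"]'))
    print(decode_packet('35["Ack from JS! eventName: onlineMessage, data: \\"online message from hook\\""]'))
```

An event sent without a callback carries no ack id, so decode_packet('2["noAck"]') reports ack_id as None.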

If you open http://localhost:5002/background_message_to/1, a message will be delivered too, but the callback will not be invoked. The server logs will look like this:

pubsub message: emit
LeWtgnRcpyHFvfVRAAAC: Sending packet MESSAGE data 26["backgroundMessage","background message from hook"]
LeWtgnRcpyHFvfVRAAAC: Received packet MESSAGE data 36["Ack from JS! eventName: backgroundMessage, data: \"background message from hook\""]
received ack from JUA_8xmKfrs5CgNOAAAD [/]
pubsub message: callback

The Celery logs will look like this:

[2022-11-20 21:00:18,076: INFO/MainProcess] Task background.tasks.send_message_with_celery[4922492b-2739-4f39-a5fb-fbcfeccd2a2d] received
emitting event "backgroundMessage" to room_1 [/]
[2022-11-20 21:00:18,077: INFO/ForkPoolWorker-15] emitting event "backgroundMessage" to room_1 [/]
[2022-11-20 21:00:18,078: INFO/ForkPoolWorker-15] Task background.tasks.send_message_with_celery[4922492b-2739-4f39-a5fb-fbcfeccd2a2d] succeeded in 0.0007624930003657937s: None

Neither log contains the output printed by ack_callback.

The expected behaviour:

The callback should be invoked and the acknowledgment should be printed to the Celery logs. But I'm not entirely sure about this, as I am new to both Celery and SocketIO.

miguelgrinberg commented 1 year ago

Callbacks cannot be invoked in external workers. An external worker is a write-only instance; it cannot receive anything. If you want bidirectional support, connect to the server as a client.

valerykustov commented 1 year ago

Thank you for your suggestion!

But I thought that was the purpose of publishing callback messages to the pub/sub system, i.e. Redis in this example. The published message even contains the ack text:

{'args': ('Ack from JS! eventName: backgroundMessage, data: "background '
          'message from hook"',),
 'host_id': '69e34b8ccefa499e897885c431b46327',
 'id': 2,
 'method': 'callback',
 'namespace': '/',
 'sid': 'room_1'}

Then maybe this should be a feature request: make external workers execute the callbacks they receive?
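The callback message quoted above is addressed to a single instance through host_id, and the callback function itself only exists in the memory of the process that emitted. The sketch below is a simplified, hypothetical model of how a listening instance could route such a message; CallbackRegistry and its methods are invented for illustration and are not python-socketio's API. It keys each stored callback by the target (sid, which here holds the room name), the namespace and the ack id, and it ignores messages whose host_id belongs to another instance.

```python
from typing import Any, Callable, Dict, Tuple

# (target sid or room, namespace, ack id)
CallbackKey = Tuple[str, str, int]


class CallbackRegistry:
    """Hypothetical, simplified model of ack-callback routing in one instance."""

    def __init__(self, host_id: str) -> None:
        self.host_id = host_id
        self._callbacks: Dict[CallbackKey, Callable[..., Any]] = {}
        self._next_id = 1

    def register(self, sid: str, namespace: str, callback: Callable[..., Any]) -> int:
        """Store a callback for an outgoing emit and return its ack id."""
        ack_id = self._next_id
        self._next_id += 1
        self._callbacks[(sid, namespace, ack_id)] = callback
        return ack_id

    def handle_callback_message(self, message: Dict[str, Any]) -> bool:
        """Invoke the matching callback; return True if one was called."""
        if message.get("method") != "callback" or message.get("host_id") != self.host_id:
            return False  # not an ack, or an ack meant for another instance
        key = (message["sid"], message["namespace"], message["id"])
        callback = self._callbacks.pop(key, None)  # each ack fires at most once
        if callback is None:
            return False
        callback(*message["args"])
        return True


if __name__ == "__main__":
    registry = CallbackRegistry(host_id="69e34b8ccefa499e897885c431b46327")
    ack_id = registry.register("room_1", "/", lambda *args: print("ack!", args))
    message = {
        "args": ("Ack from JS!",),
        "host_id": "69e34b8ccefa499e897885c431b46327",
        "id": ack_id,
        "method": "callback",
        "namespace": "/",
        "sid": "room_1",
    }
    registry.handle_callback_message(message)  # calls the lambda once
```

The point of the model is that routing needs the in-memory callback table, so only the emitting process can complete it, and only if it is actually reading the queue.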

miguelgrinberg commented 1 year ago

External workers do not read from the queue, they only write. If you look at the code you will notice that the client manager instance is initialized with the write_only=True option.

The method that you are using to communicate with the server from the Celery worker is really not the most flexible; it was built long ago and has been superseded by the client in the python-socketio package, which has bidirectional support.

The payload that is written to the pub/sub queue does include the callback, but this was built for a completely different purpose, which is to support horizontally scaled servers. The support for external processes was never extended to take advantage of this, because at this point using a client is actually preferred.
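To illustrate the write_only=True point, here is a toy in-memory model. ToyQueue and ToyManager are hypothetical stand-ins for Redis and the client manager, not python-socketio classes. A write-only instance can publish, but it never subscribes, so the callback message the server publishes back is never delivered to it and its registered callback stays pending. An instance that does listen receives the same kind of message and fires its callback.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List


class ToyQueue:
    """Hypothetical in-memory stand-in for the Redis pub/sub channel."""

    def __init__(self) -> None:
        self.subscribers: List["ToyManager"] = []

    def publish(self, message: Dict[str, Any]) -> None:
        # Only instances that subscribed (i.e. are listening) get a copy.
        for manager in self.subscribers:
            manager.inbox.append(message)


class ToyManager:
    """Hypothetical client manager; write-only instances never subscribe."""

    def __init__(self, queue: ToyQueue, host_id: str,
                 write_only: bool = False, has_browser: bool = False) -> None:
        self.queue = queue
        self.host_id = host_id
        self.has_browser = has_browser  # True if a browser client is connected here
        self.inbox: Deque[Dict[str, Any]] = deque()
        self.callbacks: Dict[int, Callable[..., Any]] = {}
        if not write_only:
            queue.subscribers.append(self)

    def emit_with_callback(self, ack_id: int, callback: Callable[..., Any]) -> None:
        # Publishing works for every instance, write-only or not.
        self.callbacks[ack_id] = callback
        self.queue.publish({"method": "emit", "host_id": self.host_id, "id": ack_id})

    def process_inbox(self) -> None:
        while self.inbox:
            message = self.inbox.popleft()
            if message["method"] == "emit" and self.has_browser:
                # Simulate the browser acknowledging the event, then publish the
                # ack back, addressed to the instance that emitted it.
                self.queue.publish({"method": "callback", "host_id": message["host_id"],
                                    "id": message["id"], "args": ("Ack from JS!",)})
            elif message["method"] == "callback" and message["host_id"] == self.host_id:
                callback = self.callbacks.pop(message["id"], None)
                if callback is not None:
                    callback(*message["args"])


if __name__ == "__main__":
    queue = ToyQueue()
    server = ToyManager(queue, "server", has_browser=True)
    worker = ToyManager(queue, "celery-worker", write_only=True)
    listener = ToyManager(queue, "listening-client")

    acks = []
    worker.emit_with_callback(1, lambda *args: acks.append(("worker", args)))
    listener.emit_with_callback(1, lambda *args: acks.append(("listener", args)))
    server.process_inbox()
    worker.process_inbox()
    listener.process_inbox()
    print(acks)  # only the listening instance's callback ran
```

In this model the worker's callback is still sitting in worker.callbacks at the end, which mirrors the missing ack print in the Celery logs.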

valerykustov commented 1 year ago

Thank you for clarification! I'll try to switch to another mechanism.