faust-streaming / faust

Python Stream Processing. A Faust fork
https://faust-streaming.github.io/faust/

Concurrency issue with topics #458

Closed kpister closed 1 year ago

kpister commented 1 year ago

I have a task listening on a topic, and I send messages to that topic from the app's web endpoint. The task is declared with concurrency=2 but cannot handle multiple records at the same time.

Below I have provided an example app (run with faust -A example_app worker -l INFO -p 5001) with a /count/ endpoint you can hit to get a counter. For the purposes of the example, I have put a time.sleep(5) in the task handler, but this could be any slow operation.

Workflow:

Hit the count endpoint twice, in quick succession.

curl -X "POST" "http://localhost:5001/count/"
curl -X "POST" "http://localhost:5001/count/"

Expected outcome:

The logs should look like:

Endpoint hit
Added message queue
Example Task: {'message': 'example', 'count': 2}
Endpoint hit
Added message queue
Done with Example Task:  {'message': 'example', 'count': 2}
Example Task: {'message': 'example', 'count': 3}
Done with Example Task:  {'message': 'example', 'count': 3}

Actual output:

Notice the timestamps: even though the two curls were sent back to back, the second message is not picked up until the first handler's five-second sleep has finished.

[2023-02-28 13:54:41,620] [30988] [INFO] Endpoint hit
[2023-02-28 13:54:41,620] [30988] [INFO] Added message queue
[2023-02-28 13:54:41,666] [30988] [INFO] Example Task: {'message': 'example', 'count': 2}
[2023-02-28 13:54:46,678] [30988] [INFO] Endpoint hit
[2023-02-28 13:54:46,680] [30988] [INFO] Added message queue
[2023-02-28 13:54:46,764] [30988] [INFO] Example Task: {'message': 'example', 'count': 3}

Versions

Code

import logging
import time
import faust
from aiokafka.helpers import create_ssl_context
from faust.sensors.datadog import Monitor
from faust.web import Request, Response, View
from faust.types.auth import AuthProtocol

logger = logging.getLogger(__name__)
broker_credentials = faust.SASLCredentials(...)
broker_credentials.protocol = AuthProtocol.SASL_SSL

app = faust.App(
    "streaming",
    version=1,
    broker="...",
    value_serializer="json",
    broker_credentials=broker_credentials,
    partition_assignment_strategy="CooperativeStickyAssignor",
    monitor=Monitor(),
)

TOPIC_NAME = "example"
example_topic = app.topic(TOPIC_NAME, value_type=str, partitions=None)

@app.agent(example_topic, concurrency=2)
async def example_task(records):
    async for key, record in records.items():
        logger.info(f"Example Task: {record}")

        # Simulated slow network IO (a blocking call)
        time.sleep(5)
        logger.info(f"Done with Example Task: {record}")

@app.page("/count/")
class CountPage(View):
    count: int = 0

    async def post(self, request: Request) -> Response:
        self.count += 1
        msg = {"message": "example", "count": self.count }

        logger.info(f"Endpoint hit")
        await example_topic.send(key=TOPIC_NAME, value=msg)
        logger.info(f"Added message queue")

        return self.json({"count": self.count})

if __name__ == "__main__":
    app.main()
Roman1us commented 1 year ago

I think you are blocking the event loop with the sync sleep.

kpister commented 1 year ago

I see what you mean. If we use await asyncio.sleep(5) instead of time.sleep(5), the results have the expected ordering.
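
For reference, here is the agent with just that one line changed (a minimal sketch; everything else in the app above is assumed unchanged):

import asyncio

@app.agent(example_topic, concurrency=2)
async def example_task(records):
    async for key, record in records.items():
        logger.info(f"Example Task: {record}")

        # Non-blocking sleep: suspends only this handler, so the event loop
        # can run the second concurrent actor in the meantime.
        await asyncio.sleep(5)
        logger.info(f"Done with Example Task: {record}")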

benjwpy commented 1 year ago

When time.sleep(5) is called, it blocks the entire execution of the script: everything is put on hold, frozen, doing nothing. But when you call await asyncio.sleep(5), the event loop is asked to run something else while your await statement finishes.
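
To see the difference outside of Faust, here is a small standalone demo (purely illustrative, not from the app above):

import asyncio
import time

async def worker(n: int, blocking: bool) -> None:
    print(f"worker {n} start")
    if blocking:
        time.sleep(2)           # freezes the whole event loop
    else:
        await asyncio.sleep(2)  # suspends only this coroutine
    print(f"worker {n} done")

async def main() -> None:
    # With blocking=True the two workers run back to back (~4s total);
    # with blocking=False they overlap and finish together (~2s total).
    await asyncio.gather(worker(1, blocking=False), worker(2, blocking=False))

asyncio.run(main())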

kpister commented 1 year ago

Okay, so to solve my problem I'll need to transition the network call to an async call instead of whatever it is doing currently. This is very helpful, thank you!
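
For anyone landing here later, one way to do that is with an async HTTP client (a sketch, assuming aiohttp; the URL and the sync helper named in the comment are placeholders, not from this thread):

import aiohttp

@app.agent(example_topic, concurrency=2)
async def example_task(records):
    async for key, record in records.items():
        logger.info(f"Example Task: {record}")

        # Async network IO keeps the event loop free for the other actor.
        async with aiohttp.ClientSession() as session:
            async with session.get("https://example.invalid/api") as resp:
                await resp.text()

        # If the client library is sync-only, push it onto a thread instead
        # (Python 3.9+), e.g.:
        # await asyncio.to_thread(some_sync_call, record)

        logger.info(f"Done with Example Task: {record}")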