Delgan / loguru

Python logging made (stupidly) simple
MIT License

Using loguru with Celery and gevent to log request behaviour #1035

Open mrkmcnamee opened 12 months ago

mrkmcnamee commented 12 months ago

Hi! I wanted to share my experience with using loguru with Celery and gevent. Any feedback appreciated.

We use Celery workers to do the heavy lifting, which includes REST API calls to external services that may not always succeed. We needed a way to gracefully handle these failures and log what was happening for further analysis. For example:

response = requests.get(url=url)

can throw many different exceptions which can be caught and logged as appropriate.

import requests
from loguru import logger

try:
    response = requests.get(url=url)
    response.raise_for_status()
    logger.info("URL reachable.", url=url)
except requests.exceptions.HTTPError:
    logger.error("HTTP error occurred.", url=url)
except requests.exceptions.ConnectionError:
    logger.error("Connection error occurred.", url=url)
except requests.exceptions.RequestException:
    logger.exception("Other error occurred.", url=url)
except Exception:
    logger.exception("An exception occurred.", url=url)

A request can also time out. Most of the time you wouldn't want to wait forever for the server to return a response and would instead set a time limit on the request. The requests.get() function does support timeouts, but its timeout parameter bounds the connect time and each individual socket read rather than the total response time, so enforcing a hard time limit is non-trivial, as discussed here: https://stackoverflow.com/questions/21965484/timeout-for-python-requests-get-entire-response
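One stdlib-only workaround along the lines of that Stack Overflow discussion is to run the blocking call in a worker thread and bound how long the caller waits. This is a rough sketch (the helper name call_with_deadline is mine, not from the thread), analogous to what gevent.Timeout provides:

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout

def call_with_deadline(fn, deadline, *args, **kwargs):
    """Run fn(*args, **kwargs) but wait at most `deadline` seconds.

    Caveat: the worker thread keeps running in the background after the
    timeout fires; only the caller's wait is bounded.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=deadline)
    except FutureTimeout:
        raise TimeoutError(f"call exceeded {deadline}s deadline") from None
    finally:
        # Don't block waiting for the (possibly still running) worker thread.
        pool.shutdown(wait=False)

result = call_with_deadline(lambda: "fast", 1.0)
```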

Enter gevent, which has built-in support for timeouts that can act as a hard limit on response times. In our case gevent is also useful for making multiple requests concurrently in a non-blocking fashion, improving performance. Some bootstrapping via monkey patching is needed, but Celery does this automatically when using its gevent pool, which we are. The remaining challenge was to implement logging that captures the timeout. Here is the code I used to figure out how to log the different events. Each worker fails in a different way, but I wanted a consistent way of logging the outcomes.

import logging
import sys
from time import sleep
import requests
from loguru import logger
import gevent
from celery import Celery

#
# Define a handler that intercepts logs from other loggers.
# See https://loguru.readthedocs.io/en/stable/overview.html#entirely-compatible-with-standard-logging
#
class InterceptHandler(logging.Handler):
    def emit(self, record):
        # get corresponding Loguru level if it exists
        try:
            level = logger.level(record.levelname).name
        except ValueError:
            level = record.levelno

        # find caller from where originated the logged message
        frame, depth = sys._getframe(6), 6
        while frame and frame.f_code.co_filename == logging.__file__:
            frame = frame.f_back
            depth += 1

        logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())

logging.getLogger("celery.app").handlers = [InterceptHandler()]
logging.getLogger("celery.worker").handlers = [InterceptHandler()]
logging.getLogger("celery.task").handlers = [InterceptHandler()]

app = Celery('myapp', broker='redis://localhost:6379')

def worker_ok():
    try:
        print("worker_ok starting.")
        r = requests.get("http://www.google.com")
        r.raise_for_status()
    except Exception:
        logger.exception("Exception occurred while running worker_ok")
        return None
    else:
        return r.status_code

def worker_exception():
    try:
        print("worker_exception starting.")
        a = 1 / 0
    except Exception:
        logger.exception("Exception occurred while running worker_exception")
        return None
    else:
        return a

# Note: logger.catch() only logs a gevent.Timeout if one is actually raised
# inside the function; the joinall() timeout below does not raise it here.
@logger.catch(exception=gevent.Timeout, message="Exception occurred while running worker_timeout1")
def worker_timeout1():
    print("worker_timeout1 starting.")
    sleep(5)
    print("worker_timeout1 finishing.")
    return None

def worker_timeout2():
    timeout = gevent.Timeout(2)
    timeout.start()
    try:
        print("worker_timeout2 starting.")
        sleep(3)
    except gevent.Timeout:
        logger.exception("Exception occurred while running worker_timeout2")
        return None
    finally:
        timeout.close()

@app.task
def spawn_requests():
    logger.info("Starting workers.")
    greenlet1 = gevent.spawn(worker_ok)
    greenlet2 = gevent.spawn(worker_exception)
    greenlet3 = gevent.spawn(worker_timeout1)
    greenlet4 = gevent.spawn(worker_timeout2)

    greenlets = [greenlet1, greenlet2, greenlet3, greenlet4]

    logger.info("Waiting for workers to finish.")
    gevent.joinall(greenlets, raise_error=True, timeout=4)

    print("Successful?")
    print(f"greenlet1: {greenlet1.successful()}")
    print(f"greenlet2: {greenlet2.successful()}")
    print(f"greenlet3: {greenlet3.successful()}")
    print(f"greenlet4: {greenlet4.successful()}")

    app.control.broadcast('shutdown')

spawn_requests.delay()

To run this you will need Redis running and the following command (assuming the module is saved as gevent_loguru_test.py; note that -A takes the module name without the .py extension):

celery -A gevent_loguru_test worker -P gevent

No surprises with workers 1 and 2. Workers 3 and 4 attempt to trigger and log timeouts in different ways. First, the gevent.joinall() function takes a timeout parameter which works as expected; however, no exception is thrown in the greenlet and no value is set when the timeout occurs, making it impossible to log the timeout from inside the worker. A substitute could be to check the successful() status afterwards (which is False for worker 3, as expected).

Worker 4 implements a timeout inside the function itself, which throws the gevent.Timeout exception. This must be caught explicitly, since gevent.Timeout derives from BaseException rather than Exception, and catching it lets us log the event. The function then returns None and successful() returns True. Instead of returning None, we could raise a new exception, in which case successful() would return False.
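For worker 3's case, it may help that gevent.joinall(greenlets, timeout=...) is documented to return the greenlets that finished before the timeout, so the timed-out ones can be recovered by set difference. A small helper could look like this (the name and shape are my own sketch, not part of the code above):

```python
def partition_timed_out(greenlets, finished):
    """Split `greenlets` into (done, timed_out) after a joinall() call.

    gevent.joinall(greenlets, timeout=t) returns the greenlets that
    completed before the timeout; anything not in that list was still
    running when the deadline hit and can be logged as timed out.
    """
    finished_set = set(finished)
    timed_out = [g for g in greenlets if g not in finished_set]
    return list(finished), timed_out

# Intended usage (requires gevent):
#   done = gevent.joinall(greenlets, timeout=4)
#   done, timed_out = partition_timed_out(greenlets, done)
#   for g in timed_out:
#       logger.error("Greenlet timed out before the joinall() deadline.")
```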

Delgan commented 12 months ago

Hi @mrkmcnamee.

Thanks for the detailed breakdown. I'm not a regular user of gevent but I learned quite a few things thanks to your clear explanations. It's also very insightful to have concrete use cases like this one; it helps to better appreciate how Loguru integrates with other frameworks.

Concerning the challenges you faced, it appears there is a notable technical and semantic distinction between a typical requests failure and the scenario where a greenlet coroutine fails to complete within the allotted time. As highlighted in one of the links you provided, the issue is not necessarily a request timeout but rather a lack of adequate time given for completion. Consequently, it seems prudent to address this at a higher level, specifically within the process responsible for managing the various coroutines, as this is where the decision to abort is made.

Regrettably, gevent does not directly expose the list of greenlets that timed out after joinall() returns. Additionally, the global timeout of joinall() might be considered "unfair", given that the latest-started greenlet will have less time to complete. Consequently, I think a per-greenlet timeout would likely be more desirable.

I would implement it roughly like this:

import gevent
from loguru import logger

def do_request(*, succeed: bool = True, fake_duration: int = 0):
    gevent.sleep(fake_duration)
    if not succeed:
        raise Exception("Request failed")

def worker(*args, **kwargs):
    try:
        with gevent.Timeout(1):
            do_request(*args, **kwargs)
    except gevent.Timeout:
        # gevent.Timeout derives from BaseException, so it must be caught
        # explicitly; handle it before the generic Exception clause.
        logger.error("Greenlet timed out")
    except Exception:
        logger.error("Request failed")
    else:
        logger.success("Request succeeded")

if __name__ == "__main__":
    greenlet1 = gevent.Greenlet(worker, succeed=True)
    greenlet2 = gevent.Greenlet(worker, succeed=False)
    greenlet3 = gevent.Greenlet(worker, succeed=True, fake_duration=2)

    greenlet1.start()
    greenlet2.start()
    greenlet3.start()

    gevent.joinall([greenlet1, greenlet2, greenlet3])

Still, I'm not the most qualified to give advice regarding gevent usage.

mrkmcnamee commented 12 months ago

Nice, thanks @Delgan!

To fill in some more context: starting with an ID, we request data from a service and then use the result to send more requests to drill down - a typical REST approach to discovering resources. The request activity resembles a tree structure with a depth of, say, 3. Any one of the requests could a) fail, b) take a long time, or c) time out. If that happens, the whole result is invalid. I have identified these constraints on the solution:

  1. Requests should start as soon as possible.
  2. No one request should take longer than 5 seconds (Done as above).
  3. The total time shouldn't exceed 30 seconds.
  4. If one request fails or times out, then all greenlets should be terminated.
  5. We want to log the duration of every request as well as log every failure (exceptions, timeouts, etc.)

This isn't totally relevant to a discussion of loguru, but I will share an implementation of the first constraint based on your solution above.

import time
import gevent
from gevent.event import AsyncResult
from loguru import logger

def do_request(*, id: str, succeed: bool = True, fake_duration: int = 0, output: AsyncResult = None, blocking_input: AsyncResult = None):
    if blocking_input:
        logger.debug(f"ID: {id}: Getting input...")
        print(blocking_input.get())

    start_time = time.perf_counter()
    gevent.sleep(fake_duration)
    execution_time = round((time.perf_counter() - start_time) * 1000)
    logger.info(f"ID {id}: Request took {execution_time} milliseconds.")

    if not succeed:
        raise Exception(f"ID: {id}: Request failed")

    if output:
        logger.debug(f"ID: {id}: Setting output")
        output.set("Hello world!")

def worker(*args, id: str, **kwargs):
    try:
        with gevent.Timeout(3):
            do_request(*args, id=id, **kwargs)
    except gevent.Timeout:
        # gevent.Timeout derives from BaseException; catch it explicitly
        # and before the generic Exception clause.
        logger.error(f"ID: {id}: Greenlet timed out")
    except Exception:
        logger.error(f"ID: {id}: Request failed")
    else:
        logger.success(f"ID: {id}: Request succeeded")

if __name__ == "__main__":
    greenlet1 = gevent.Greenlet(worker, id="1", succeed=True)
    greenlet2 = gevent.Greenlet(worker, id="2", succeed=False)
    greenlet3 = gevent.Greenlet(worker, id="3", succeed=True, fake_duration=4)
    a = AsyncResult()
    greenlet4 = gevent.Greenlet(worker, id="4", succeed=True, fake_duration=2, output=a)
    greenlet5 = gevent.Greenlet(worker, id="5", succeed=True, blocking_input=a)

    greenlets = [greenlet1, greenlet2, greenlet3, greenlet4, greenlet5]
    for greenlet in greenlets:
        greenlet.start()
    gevent.joinall(greenlets)
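Constraint 4 (abort everything as soon as one request fails) isn't covered above. One way to sketch it with gevent's own primitives is link_exception() plus killall(); note this only fires if the worker re-raises instead of swallowing its exceptions, and the fail_fast helper below is my own sketch, not part of the code above:

```python
import time
import gevent

def fail_fast(greenlets):
    # If any greenlet dies with an exception, kill all the others so the
    # batch aborts early instead of waiting out the slow workers.
    def on_failure(failed):
        gevent.killall([g for g in greenlets if g is not failed], block=False)
    for g in greenlets:
        g.link_exception(on_failure)

def slow():
    gevent.sleep(10)

def boom():
    gevent.sleep(0.1)
    raise RuntimeError("request failed")

workers = [gevent.spawn(slow), gevent.spawn(slow), gevent.spawn(boom)]
fail_fast(workers)
start = time.monotonic()
gevent.joinall(workers)  # returns early: boom's failure kills the slow workers
elapsed = time.monotonic() - start
```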

Delgan commented 12 months ago

Thanks for sharing the updated snippet.

I suppose you can achieve some of the mentioned constraints using built-in gevent features. For example, I'm thinking of gevent.joinall(greenlets, timeout=30) for the 3rd constraint. However, these considerations are indeed outside Loguru's scope; I don't think the library should have anything to do with the program's control flow.
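Concretely, the two timeout levels can be combined: gevent.Timeout(5) inside each worker enforces the per-request limit (constraint 2), while joinall(..., timeout=30) bounds the whole batch (constraint 3). A minimal sketch with the durations shortened so it runs fast:

```python
import gevent

def worker(duration):
    # Per-request limit (constraint 2): no single request over 5 seconds.
    try:
        with gevent.Timeout(5):
            gevent.sleep(duration)  # stand-in for the real HTTP request
            return "ok"
    except gevent.Timeout:
        return "timed out"

greenlets = [gevent.spawn(worker, d) for d in (0.1, 0.2)]
# Whole-batch budget (constraint 3): joinall() returns after at most 30 s
# and gives back the greenlets that finished within that budget.
finished = gevent.joinall(greenlets, timeout=30)
results = [g.value for g in finished]
```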