After using the handler, my team and I realized that even the async version becomes sync and blocks the execution whenever the queue is full. This was a huge problem for us because we didn't know where the massive latency came from. Following investigation we realized it was this library. Here's a truly threaded handler that will never block the execution:
import time
import traceback
from logging import Handler, LogRecord
from queue import PriorityQueue, Full
from threading import Thread
from fluent.sender import FluentSender
class FluentJob(Thread):
def __init__(self, queue: PriorityQueue):
super().__init__(daemon=True)
self.queue = queue
self.sender = None
def set_sender(self, name: str, host: str, port: int):
self.sender = FluentSender(name, host, port)
def run(self):
while True:
time.sleep(0.1)
while not self.queue.empty():
try:
self.send(*self.queue.get()[3])
except Exception:
traceback.print_exc()
def send(self, time: int, data: dict):
self.sender.emit_with_time(None, time, data)
class FluentDHandler(Handler):
queue = PriorityQueue(maxsize=1000)
fluent_job = FluentJob(queue)
def __init__(self, name: str, host: str = '127.0.0.1', port: int = 24224):
super().__init__()
if self.fluent_job.sender is None:
self.fluent_job.set_sender(name, host, port)
self.fluent_job.start()
def emit(self, record: LogRecord):
try:
data = self.format(record)
time = int(record.created)
self.queue.put_nowait((100 - record.levelno, record.created, id(data), [time, data]))
except Full:
pass # TODO: Find a way to handle a full queue, discard the log for now
After using the handler, my team and I realized that even the async version becomes sync and blocks the execution whenever the queue is full. This was a huge problem for us because we didn't know where the massive latency came from. Following investigation we realized it was this library. Here's a truly threaded handler that will never block the execution: