ray-project / ray

Ray is an AI compute engine. Ray consists of a core distributed runtime and a set of AI Libraries for accelerating ML workloads.
https://ray.io
Apache License 2.0

[CORE] importing ray closes logging handlers, breaking custom logging #48846

Open Liquidmasl opened 5 days ago

Liquidmasl commented 5 days ago

What happened + What you expected to happen

I have been debugging for two days now trying to figure out why our Loki logging handler will not log some lines.

I think I understand now: ray removes all existing logging handlers and replaces them with its own:

Handler.__init__ <class 'ray._private.log.PlainRayHandler'>
Handler.__init__ <class 'logging._StderrHandler'>
Handler.__init__ <class 'logging.StreamHandler'>

This means the handler responsible for sending our logs to our logging server simply disappears. I have not figured out why this happens.

I added a debug print in logging.Handler.__init__ and logging.Handler.close; this is what I get:

Handler.close <class 'processing.intern_depend.cirq_logger.loki_logger.CustomLokiHandler'>
Handler.close <class 'processing.intern_depend.cirq_logger.loki_logger.CustomLokiQueueHandler'>
Handler.close <class 'logging.StreamHandler'>
Handler.close <class 'logging.NullHandler'>
Handler.close <class 'logging.NullHandler'>
Handler.close <class 'logging.StreamHandler'>
Handler.close <class 'logging.NullHandler'>
Handler.close <class 'logging._StderrHandler'>
Handler.__init__ <class 'ray._private.log.PlainRayHandler'>
Handler.__init__ <class 'logging._StderrHandler'>
Handler.__init__ <class 'logging.StreamHandler'>

So my Loki handler is just... gone, without any notice. I only noticed because logs were missing on the server.

When I debug into the close method, I see in the call stack that it is being triggered by generate_logging_config() in ray._private.log, and it seems it is always called from ray/__init__.py at line 7.

The only thing I can do is ensure that I ALWAYS import ray before I initialize my logging. A small change in linter configuration can move the ray import below the logger initialization, silently shutting off our custom logger.
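A sketch of that ordering workaround (the ray import is wrapped in a try/except so the example also runs where ray is not installed; the StreamHandler stands in for the custom Loki handler):

```python
import logging

# Workaround: make sure ray is imported BEFORE any custom handlers are
# created, so its one-time logging reconfiguration cannot close them.
try:
    import ray  # noqa: F401  -- triggers ray's logging setup on import
except ImportError:
    pass  # the sketch still illustrates the ordering without ray installed

root_logger = logging.getLogger()
handler = logging.StreamHandler()  # stand-in for the custom Loki handler
root_logger.addHandler(handler)

# Because the handler was attached after ray's setup ran, it survives.
root_logger.warning("still handled")
print(handler in root_logger.handlers)  # True
```

The fragility the report describes is exactly this: nothing enforces the ordering, so an import re-sort can silently break it.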

I think this is quite a severe issue! And it was a very hard and strange one to track down.

(might be related to https://github.com/ray-project/ray/issues/22312 ?)

After more trial and error I am even more confused. With a plain queue handler, or another simple custom handler, logging still works even though the handlers are apparently closed; but when the custom handler has custom shutdown or close() code, things break. The 'minimal' reproducer below is not so minimal, but it reproduces the problem without any third-party libraries.
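The closing behaviour can be demonstrated without ray at all, assuming (which I cannot confirm from the report alone) that generate_logging_config() goes through logging.config.dictConfig(), which closes and detaches every already-registered handler before installing its own. A minimal stdlib-only sketch:

```python
# Standalone sketch of the suspected mechanism: logging.config.dictConfig()
# shuts down every handler registered so far, then strips them from the
# root logger -- presumably what ray triggers on import.
import logging
import logging.config

closed = []

class TrackingHandler(logging.Handler):
    """Records when the logging machinery calls close() on it."""
    def close(self):
        closed.append(type(self).__name__)
        super().close()

root_logger = logging.getLogger()
handler = TrackingHandler()
root_logger.addHandler(handler)

logging.config.dictConfig({"version": 1, "root": {"handlers": []}})

print(closed)                           # ['TrackingHandler']
print(handler in root_logger.handlers)  # False: silently detached
```

This also explains why simple handlers appear to keep working while handlers with real close() logic break: a closed StreamHandler can still write to its stream, but a handler whose close() stops a listener thread is permanently dead afterwards.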

Versions / Dependencies

Windows 11, Ray 2.39, Python 3.11

Reproduction script

import logging
import queue
import threading
import time
from logging.handlers import QueueHandler
from logging.handlers import QueueListener

root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)

class CustomQueueHandler(QueueHandler):

    def __init__(self, queue, **kwargs):
        super().__init__(queue)
        self.custom_handler = BatchHandler(**kwargs)  # noqa: WPS110
        self.listener = QueueListener(self.queue, self.custom_handler)
        self.listener.start()

    def close(self):
        self.listener.stop()
        self.custom_handler.stop()
        super().close()

    def flush(self):
        self.custom_handler.flush()
        super().flush()

class BatchHandler(logging.Handler):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.lock = threading.Lock()
        self.last_handle_time = time.time()
        self.stop_flag = False
        self.batch = []
        self.thread = threading.Thread(target=self._monitor)
        self.thread.daemon = True
        self.thread.start()

    def handle(self, record):
        with self.lock:
            self.batch.append(record)
            print(self.batch)

    def _monitor(self):
        while True:
            try:
                if len(self.batch) > 3:
                    self.handle_batch()

                if time.time() - self.last_handle_time > 2:
                    self.handle_batch()

                if self.stop_flag:
                    self.handle_batch()
                    break

                time.sleep(1)
            except queue.Empty:
                pass

    def handle_batch(self):

        with self.lock:
            b = self.batch
            self.batch = []

        for i, record in enumerate(b):
            print(f'{i}: {record.msg}')

    def stop(self):
        self.stop_flag = True
        self.thread.join()

q = queue.Queue()
root_logger.addHandler(CustomQueueHandler(q))

root_logger.info('I will be printed 1')

import ray # everything AFTER this import will not be handled/printed. If the import is moved before the handler is set up, everything is printed.

root_logger.info('I will be missing when this is below the ray import')
logging.getLogger('some.other.logger').info('I will also be missing if I am below the ray import.')

Issue Severity

~Medium: It is a significant difficulty but I can work around it.~ Severe: it blocks me and also brings me to the edge of insanity

Liquidmasl commented 4 days ago

Actually, now deeper down in my software, when I thought it finally worked, it seems some modin DataFrame initialisation re-imports ray, or uses some other import, which leads to the same method being called, again destroying my handlers...

So now I have to put from ray.data import from_pandas_refs into my logging module as well to make it work.
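A defensive guard for that situation might look like this (a sketch, not an official API; ensure_handler is a hypothetical helper, and since the old handler object may already have been close()d, passing a freshly constructed one is safer):

```python
import logging

def ensure_handler(handler: logging.Handler) -> None:
    """Hypothetical guard: re-attach `handler` to the root logger if a
    later import that pulls in ray (e.g. via modin) stripped it again."""
    root_logger = logging.getLogger()
    if handler not in root_logger.handlers:
        root_logger.addHandler(handler)

# usage sketch: call after any import suspected of re-triggering
# ray's logging configuration
# import modin.pandas as pd
# ensure_handler(my_loki_handler)
```

This does not fix the underlying problem (ray closing handlers it does not own), but it limits the blast radius to a missed window of log records instead of a permanently dead handler.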

alexeykudinkin commented 15 hours ago

This is a duplicate of https://github.com/ray-project/ray/issues/48732