Neutrino3316 / rss_spider

A spider to grab and store data from RSS sources.

Update logging for multiprocessing #33

Open · Neutrino3316 opened 4 years ago

Neutrino3316 commented 4 years ago

Use a queue handler to log across different processes.

The official Python logging documentation is important here.

Here's an example adapted from Stack Overflow and jruere/multiprocessing-logging. Each worker process pushes its records into a shared multiprocessing.Queue through a QueueHandler, and a QueueListener thread in the main process pulls them off and writes them through a single StreamHandler.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random

def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker process.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i

def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger("worker_i")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)

def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    # handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(filename)s:%(lineno)d PID:%(process)d %(name)s \t %(message)s"
    ))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q

def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
Neutrino3316 commented 4 years ago

From the Python logging documentation, we know we could use the naming behaviour of the getLogger() function to log across different processes.

getLogger() returns a reference to a logger instance with the specified name if it is provided, or root if not. The names are period-separated hierarchical structures. Multiple calls to getLogger() with the same name will return a reference to the same logger object. Loggers that are further down in the hierarchical list are children of loggers higher up in the list. For example, given a logger with a name of foo, loggers with names of foo.bar, foo.bar.baz, and foo.bam are all descendants of foo.

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random

def f(i):
    time.sleep(random.uniform(.01, .05))
    logger = logging.getLogger("main.worker")
    logger.info('function called with {} in worker process.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i

def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger("main")
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)

def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    # handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(filename)s:%(lineno)d PID:%(process)d %(name)s \t %(message)s"
    ))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger("main")
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q

def main():
    q_listener, q = logger_init()

    logger = logging.getLogger("main.root")
    logger.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
Neutrino3316 commented 4 years ago

I think we need to reconsider how we use multiprocessing. The program initializes every worker in one process, then uses a multiprocessing pool to run each worker.show_id function, and this goes wrong with the QueueListener. Perhaps we should start a new process for each worker and initialize the worker inside its own process rather than in the main process? A rough sketch of that idea follows; the two experiments after it show what goes wrong with the current pool-based version.


Without the QueueListener

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random

class Worker:

    def __init__(self, worker_id: int):
        self.worker_id = worker_id
        # self.queue_handler = queue_handler
        # self.logger = logging.getLogger("main.worker_%d" % self.worker_id)
        # self.logger.addHandler(self.queue_handler)
        # self.logger.info("message from worker %d" % self.worker_id)
        print("worker %d is created" % self.worker_id)

    def show_id(self):
        time.sleep(random.uniform(.01, .05))
        print("show_id called, %d" % self.worker_id)
        time.sleep(random.uniform(.01, .05))

def logger_init():
    queue = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    # handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(filename)s:%(lineno)d PID:%(process)d %(name)s \t %(message)s"
    ))

    # queue_listener gets records from the queue and sends them to the handler
    queue_listener = QueueListener(queue, handler)
    queue_listener.start()

    logger = logging.getLogger("main")
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return queue_listener, queue

if __name__ == '__main__':

    queue_listener, queue = logger_init()

    logger = logging.getLogger("main.root")
    logger.info("logging from main, starting.")

    worker_num = 7
    worker_list = []
    for i in range(worker_num):
        worker_list.append(Worker(i))

    # show in one process
    # for worker in worker_list:
    #     worker.show_id()

    # show in multiprocess
    pool = multiprocessing.Pool(worker_num)
    for worker in worker_list:
        pool.apply_async(worker.show_id)
    pool.close()
    pool.join()
    queue_listener.stop()

    logger.info("logging from main, everything ended.")

Output:

2020-02-06 17:24:08,157 INFO test.py:50 PID:27780 main.root logging from main, starting.
worker 0 is created
worker 1 is created
worker 2 is created
worker 3 is created
worker 4 is created
worker 5 is created
worker 6 is created
show_id called, 0
show_id called, 1
show_id called, 3
show_id called, 5
show_id called, 2
show_id called, 4
show_id called, 6
2020-02-06 17:24:09,096 INFO test.py:69 PID:27780 main.root logging from main, everything ended.


With the QueueListener

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random

class Worker:

    def __init__(self, worker_id: int, queue):
        self.worker_id = worker_id
        self.queue_handler = QueueHandler(queue)
        self.logger = logging.getLogger("main.worker_%d" % self.worker_id)
        self.logger.addHandler(self.queue_handler)
        self.logger.info("message from worker %d" % self.worker_id)
        print("worker %d is created" % self.worker_id)

    def show_id(self):
        time.sleep(random.uniform(.01, .05))
        print("show_id called, %d" % self.worker_id)
        self.logger.info("message from show_id %d" % self.worker_id)
        time.sleep(random.uniform(.01, .05))

def logger_init():
    queue = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    # handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s %(filename)s:%(lineno)d PID:%(process)d %(name)s \t %(message)s"
    ))

    # queue_listener gets records from the queue and sends them to the handler
    queue_listener = QueueListener(queue, handler)
    queue_listener.start()

    logger = logging.getLogger("main")
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return queue_listener, queue

if __name__ == '__main__':

    queue_listener, queue = logger_init()

    logger = logging.getLogger("main.root")
    logger.info("logging from main, starting.")

    worker_num = 7
    worker_list = []
    for i in range(worker_num):
        worker_list.append(Worker(i, queue))

    # show in one process
    # for worker in worker_list:
    #     worker.show_id()

    # show in multiprocess
    pool = multiprocessing.Pool(worker_num)
    for worker in worker_list:
        pool.apply_async(worker.show_id)
    pool.close()
    pool.join()
    queue_listener.stop()

    logger.info("logging from main, everything ended.")

The output:

2020-02-06 17:33:18,392 INFO test.py:51 PID:27812 main.root logging from main, starting.
2020-02-06 17:33:18,392 INFO test.py:15 PID:27812 main.worker_0 message from worker 0
2020-02-06 17:33:18,393 INFO test.py:15 PID:27812 main.worker_1 message from worker 1
2020-02-06 17:33:18,393 INFO test.py:15 PID:27812 main.worker_2 message from worker 2
2020-02-06 17:33:18,393 INFO test.py:15 PID:27812 main.worker_3 message from worker 3
2020-02-06 17:33:18,392 INFO test.py:15 PID:27812 main.worker_0 message from worker 0
2020-02-06 17:33:18,394 INFO test.py:15 PID:27812 main.worker_4 message from worker 4
2020-02-06 17:33:18,393 INFO test.py:15 PID:27812 main.worker_1 message from worker 1
2020-02-06 17:33:18,394 INFO test.py:15 PID:27812 main.worker_5 message from worker 5
2020-02-06 17:33:18,395 INFO test.py:15 PID:27812 main.worker_6 message from worker 6
2020-02-06 17:33:18,393 INFO test.py:15 PID:27812 main.worker_2 message from worker 2
2020-02-06 17:33:18,393 INFO test.py:15 PID:27812 main.worker_3 message from worker 3
2020-02-06 17:33:18,394 INFO test.py:15 PID:27812 main.worker_4 message from worker 4
2020-02-06 17:33:18,394 INFO test.py:15 PID:27812 main.worker_5 message from worker 5
2020-02-06 17:33:18,395 INFO test.py:15 PID:27812 main.worker_6 message from worker 6
worker 0 is created
worker 1 is created
worker 2 is created
worker 3 is created
worker 4 is created
worker 5 is created
worker 6 is created
2020-02-06 17:33:19,260 INFO test.py:70 PID:27812 main.root logging from main, everything ended.

The show_id function is never called!

The likely cause: pool.apply_async(worker.show_id) has to pickle the bound method, and with it the whole Worker instance, including self.queue_handler. Logging handlers hold a threading lock, which cannot be pickled, so dispatching the task fails; and because the AsyncResult returned by apply_async is never checked, the error is silently discarded. The first version works because its Worker only holds an int. Note also that each __init__ message appears twice in the output above: the record reaches the StreamHandler once directly, by propagation from "main.worker_N" to the handler attached to "main", and a second time through the queue and the listener.