log4mongo / log4mongo-python

python logging handler for mongo database
http://log4mongo.org
Other
111 stars 37 forks source link

support pymongo connecting in background #51

Open chuckliu1979 opened 2 years ago

chuckliu1979 commented 2 years ago

hi,

The pymongo source (https://github.com/mongodb/mongo-python-driver/blob/master/pymongo/mongo_client.py) says:

"Starting with version 3.0 the MongoClient constructor no longer blocks while connecting
to the server or servers, and it no longer raises ConnectionFailure if they are unavailable.
Instead, the constructor returns immediately and launches the connection process on background
threads."



So the log4mongo handler could continue initializing even when MongoDB is not running. I made a patch for this:
> import threading
4,8c15
< try:
<     from pymongo import MongoClient as Connection
< except ImportError:
<     from pymongo import Connection
< 
---
> from pymongo import MongoClient
10,18c17,19
< from pymongo.errors import OperationFailure, PyMongoError
< import pymongo
< if pymongo.version_tuple[0] >= 3:
<     from pymongo.errors import ServerSelectionTimeoutError
<     write_method = 'insert_one'
<     write_many_method = 'insert_many'
< else:
<     write_method = 'save'
<     write_many_method = 'insert'
---
> from pymongo.errors import ConfigurationError, OperationFailure, ServerSelectionTimeoutError
> 
> # pylint: disable=pointless-string-statement,invalid-name
46a48
> # pylint: disable=too-many-instance-attributes,too-many-arguments,too-many-locals
49,50c51
<     DEFAULT_PROPERTIES = logging.LogRecord(
<         '', '', '', '', '', '', '', '').__dict__.keys()
---
>     DEFAULT_PROPERTIES = logging.LogRecord("", "", "", "", "", "", "", "").__dict__.keys()  # type: ignore
56,65c57,66
<             'timestamp': dt.datetime.utcnow(),
<             'level': record.levelname,
<             'thread': record.thread,
<             'threadName': record.threadName,
<             'message': record.getMessage(),
<             'loggerName': record.name,
<             'fileName': record.pathname,
<             'module': record.module,
<             'method': record.funcName,
<             'lineNumber': record.lineno
---
>             "timestamp": dt.datetime.utcnow(),
>             "level": record.levelname,
>             "thread": record.thread,
>             "threadName": record.threadName,
>             "message": record.getMessage(),
>             "loggerName": record.name,
>             "fileName": record.pathname,
>             "module": record.module,
>             "method": record.funcName,
>             "lineNumber": record.lineno,
69,75c70,72
<             document.update({
<                 'exception': {
<                     'message': str(record.exc_info[1]),
<                     'code': 0,
<                     'stackTrace': self.formatException(record.exc_info)
<                 }
<             })
---
>             document.update(
>                 {"exception": {"message": str(record.exc_info[1]), "code": 0, "stackTrace": self.formatException(record.exc_info)}}
>             )
78,79c75
<             contextual_extra = set(record.__dict__).difference(
<                 set(self.DEFAULT_PROPERTIES))
---
>             contextual_extra = set(record.__dict__).difference(set(self.DEFAULT_PROPERTIES))
87,92c83,101
< 
<     def __init__(self, level=logging.NOTSET, host='localhost', port=27017,
<                  database_name='logs', collection='logs',
<                  username=None, password=None, authentication_db='admin',
<                  fail_silently=False, formatter=None, capped=False,
<                  capped_max=1000, capped_size=1000000, reuse=True, **kwargs):
---
>     def __init__(
>         self,
>         level=logging.NOTSET,
>         host="localhost",
>         port=27017,
>         database_name="logs",
>         collection="logs",
>         username=None,
>         password=None,
>         authentication_db="admin",
>         fail_silently=False,
>         formatter=None,
>         capped=False,
>         capped_max=1000,
>         capped_size=1000000,
>         reuse=True,
>         ttl=3600,
>         **kwargs,
>     ):
122a132,133
>         self.ttl = ttl
>         self.ttl_index = ""
127c138
<         global _connection
---
>         global _connection  # pylint: disable=global-statement
131,150c142,148
<             if pymongo.version_tuple[0] < 3:
<                 try:
<                     self.connection = Connection(host=self.host,
<                                                  port=self.port, **kwargs)
<                 # pymongo >= 3.0 does not raise this error
<                 except PyMongoError:
<                     if self.fail_silently:
<                         return
<                     else:
<                         raise
<             else:
<                 self.connection = Connection(host=self.host, port=self.port,
<                                              **kwargs)
<                 try:
<                     self.connection.is_primary
<                 except ServerSelectionTimeoutError:
<                     if self.fail_silently:
<                         return
<                     else:
<                         raise
---
>             self.connection = MongoClient(host=self.host, port=self.port, **kwargs)
>             try:
>                 self.connection.is_primary
>             except ServerSelectionTimeoutError:
>                 if self.fail_silently:
>                     return
>                 raise
151a150
>         self._setup()
153c152,153
<         self.db = self.connection[self.database_name]
---
>     def _setup(self):
>         self.db = self.connection[self.database_name]  # type: ignore
155,157c155,156
<             auth_db = self.connection[self.authentication_database_name]
<             self.authenticated = auth_db.authenticate(self.username,
<                                                       self.password)
---
>             auth_db = self.connection[self.authentication_database_name]  # type: ignore
>             self.authenticated = auth_db.authenticate(self.username, self.password)
164,166c163
<                 self.collection = Collection(self.db, self.collection_name,
<                                              capped=True, max=self.capped_max,
<                                              size=self.capped_size)
---
>                 self.collection = Collection(self.db, self.collection_name, capped=True, max=self.capped_max, size=self.capped_size)
172a170,178
>     def _ensure_log4mongo_index(self):
>         if not self.ttl_index and self.ttl > 0:
>             try:
>                 self.ttl_index = self.collection.create_index(  # type: ignore
>                     "timestamp", expireAfterSeconds=self.ttl, background=True
>                 )
>             except (TypeError, ConfigurationError, ServerSelectionTimeoutError):
>                 pass
> 
178c184
<             self.db.logout()
---
>             self.db.logout()  # type: ignore
183a190,197
> 
>         if self.collection is None:
>             try:
>                 self._setup()
>             except ServerSelectionTimeoutError:
>                 pass
>         self._ensure_log4mongo_index()
> 
186,187c200,201
<                 getattr(self.collection, write_method)(self.format(record))
<             except Exception:
---
>                 self.collection.insert_one(self.format(record))
>             except Exception:  # pylint: disable=broad-except
191c205
<     def __exit__(self, type, value, traceback):
---
>     def __exit__(self, type, value, traceback):  # pylint: disable=redefined-builtin
196,203c210,231
< 
<     def __init__(self, level=logging.NOTSET, host='localhost', port=27017,
<                  database_name='logs', collection='logs',
<                  username=None, password=None, authentication_db='admin',
<                  fail_silently=False, formatter=None, capped=False,
<                  capped_max=1000, capped_size=1000000, reuse=True,
<                  buffer_size=100, buffer_periodical_flush_timing=5.0,
<                  buffer_early_flush_level=logging.CRITICAL, **kwargs):
---
>     def __init__(
>         self,
>         level=logging.NOTSET,
>         host="localhost",
>         port=27017,
>         database_name="logs",
>         collection="logs",
>         username=None,
>         password=None,
>         authentication_db="admin",
>         fail_silently=False,
>         formatter=None,
>         capped=False,
>         capped_max=1000,
>         capped_size=1000000,
>         reuse=True,
>         ttl=3600,
>         buffer_size=100,
>         buffer_periodical_flush_timing=5.0,
>         buffer_early_flush_level=logging.CRITICAL,
>         **kwargs,
>     ):
219,222c247,265
<         MongoHandler.__init__(self, level=level, host=host, port=port, database_name=database_name, collection=collection,
<                               username=username, password=password, authentication_db=authentication_db,
<                               fail_silently=fail_silently, formatter=formatter, capped=capped, capped_max=capped_max,
<                               capped_size=capped_size, reuse=reuse, **kwargs)
---
>         MongoHandler.__init__(
>             self,
>             level=level,
>             host=host,
>             port=port,
>             database_name=database_name,
>             collection=collection,
>             username=username,
>             password=password,
>             authentication_db=authentication_db,
>             fail_silently=fail_silently,
>             formatter=formatter,
>             capped=capped,
>             capped_max=capped_max,
>             capped_size=capped_size,
>             reuse=reuse,
>             ttl=ttl,
>             **kwargs,
>         )
227c270
<         self.last_record = None #kept for handling the error on flush
---
>         self.last_record = None  # kept for handling the error on flush
230c273,274
<         self._buffer_lock = None
---
>         self.buffer_lock = threading.RLock()
> 
237,238c281
<             import atexit
<             atexit.register(self.destroy)
---
>             import atexit  # pylint: disable=import-outside-toplevel
240,241c283
<             import threading
<             self._buffer_lock = threading.RLock()
---
>             atexit.register(self.destroy)
258c300,302
<             self._timer_stopper, self.buffer_timer_thread = call_repeatedly(self.buffer_periodical_flush_timing, self.flush_to_mongo)
---
>             self._timer_stopper, self.buffer_timer_thread = call_repeatedly(
>                 self.buffer_periodical_flush_timing, self.flush_to_mongo
>             )
262,263c306,315
< 
<         self.add_to_buffer(record)
---
>         if self.collection is None:
>             try:
>                 MongoHandler._setup(self)
>             except ServerSelectionTimeoutError:
>                 pass
>         MongoHandler._ensure_log4mongo_index(self)
> 
>         with self.buffer_lock:
>             self.last_record = record
>             self.buffer.append(self.format(record))
267,287d318
<         return
< 
<     def buffer_lock_acquire(self):
<         """Acquire lock on buffer (only if periodical flush is set)."""
<         if self._buffer_lock:
<             self._buffer_lock.acquire()
< 
<     def buffer_lock_release(self):
<         """Release lock on buffer (only if periodical flush is set)."""
<         if self._buffer_lock:
<             self._buffer_lock.release()
< 
<     def add_to_buffer(self, record):
<         """Add a formatted record to buffer."""
< 
<         self.buffer_lock_acquire()
< 
<         self.last_record = record
<         self.buffer.append(self.format(record))
< 
<         self.buffer_lock_release()
292,302c323,329
<             self.buffer_lock_acquire()
<             try:
< 
<                 getattr(self.collection, write_many_method)(self.buffer)
<                 self.empty_buffer()
< 
<             except Exception as e:
<                 if not self.fail_silently:
<                     self.handleError(self.last_record) #handling the error on flush
<             finally:
<                 self.buffer_lock_release()
---
>             with self.buffer_lock:
>                 try:
>                     self.collection.insert_many(self.buffer)
>                     self.empty_buffer()
>                 except Exception:  # pylint: disable=broad-except
>                     if not self.fail_silently:
>                         self.handleError(self.last_record)  # type: ignore
315,316d341
< 
<