Open zbentley opened 3 years ago
Could someone please point me at what code I should look at if I wanted to put up a PR to fix this? It's quite a hassle for environments with more than one Pulsar cluster (and thus more than one client). I'm happy to do the work, but I'm not familiar enough with C++ to find the code responsible here. I think it has something to do with `constexpr`, but I'm not positive.
In the C++ client, the logger factory is a global object and can be initialized only once, in the constructor of `Client`.
The `LogUtils::setLoggerFactory` method ensures that the logger factory can only be initialized once.
I think it's better not to change the current behavior. Each C++ source file uses the `DECLARE_LOG_OBJECT` macro to get the thread-local logger from the factory.
Once a logger of a thread is created, even if the global logger factory was reset, the logger won't be affected.
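The behavior described above can be modeled in a few lines of Python. This is only an illustrative sketch, not the C++ client's code: `GlobalLoggerRegistry`, `set_factory`, and `thread_logger` are hypothetical stand-ins for the global factory, `LogUtils::setLoggerFactory`, and `DECLARE_LOG_OBJECT`, and the sketch assumes that the first factory wins and that each thread caches the logger it first obtained.

```python
import threading


class GlobalLoggerRegistry:
    """Hypothetical model of a set-once factory with per-thread logger caching."""

    def __init__(self):
        self._factory = None
        self._lock = threading.Lock()
        self._local = threading.local()

    def set_factory(self, factory):
        """Install the factory only if none is installed yet."""
        with self._lock:
            if self._factory is not None:
                return False  # later factories are silently ignored
            self._factory = factory
            return True

    def thread_logger(self):
        """Return this thread's cached logger, creating it on first use."""
        logger = getattr(self._local, "logger", None)
        if logger is None:
            with self._lock:
                factory = self._factory
            # Strings stand in for real logger objects in this sketch.
            logger = factory() if factory is not None else "default"
            self._local.logger = logger
        return logger


def demo():
    registry = GlobalLoggerRegistry()
    first = registry.set_factory(lambda: "FOO")   # first client's logger
    second = registry.set_factory(lambda: "BAR")  # second client's logger
    return first, second, registry.thread_logger()


if __name__ == "__main__":
    print(demo())
```

In this model the second factory is never used, and a thread that already obtained a logger keeps it regardless of what happens to the factory afterwards.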
The issue had no activity for 30 days, mark with Stale label.
@BewareMyPower that's pretty unusual and bad behavior at least in the Python ecosystem. Client objects aren't singletons; they shouldn't store state once discarded. Most production code that uses clients for anything (databases, brokers, you name it) supports the total destruction and then reconnection/reconfiguration of clients.
That includes logging behavior; especially because there are some really weird interactions between Python clients with the logger set and the Python runtime itself (https://github.com/apache/pulsar/issues/16527), I think it's important to make sure that when a `Client` object is garbage collected in Python, all state is reset to what it was before that `Client` was created. That includes:
In general, the client shouldn't use process-global state, just instance-global state. Instances are not necessarily long-lived or singletons.
For example, our production use case maintains (and periodically disconnects/reconnects) a handful of clients to different brokers, each with different logging settings.
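As a rough illustration of what instance-scoped logger state could look like, here is a minimal Python sketch. `ScopedClient`, `ListHandler`, `make_logger`, and the broker URLs are all hypothetical and are not part of the Pulsar client; the sketch only shows a logger that lives and dies with the object that was given it.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted records in memory so the example needs no I/O."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(self.format(record))


class ScopedClient:
    """Hypothetical client that keeps its logger as instance state only."""

    def __init__(self, service_url, logger=None):
        self.service_url = service_url
        self._logger = logger or logging.getLogger("scoped-client.default")

    def connect(self):
        self._logger.warning("connecting to %s", self.service_url)


def make_logger(name, prefix):
    """Build a named logger with one in-memory handler and a prefix."""
    logger = logging.getLogger(name)
    logger.propagate = False  # keep records out of the root logger
    handler = ListHandler()
    handler.setFormatter(logging.Formatter(prefix + ": %(message)s"))
    logger.addHandler(handler)
    return logger, handler


def demo():
    foo_logger, foo_handler = make_logger("scoped-client.foo", "FOO")
    bar_logger, bar_handler = make_logger("scoped-client.bar", "BAR")

    first = ScopedClient("pulsar://broker-a:6650", logger=foo_logger)
    first.connect()
    del first  # nothing about FOO survives in process-global state

    second = ScopedClient("pulsar://broker-b:6650", logger=bar_logger)
    second.connect()
    return foo_handler.lines, bar_handler.lines


if __name__ == "__main__":
    print(demo())
```

Each client writes only to the logger it was constructed with, which is the behavior the use case above relies on.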
Yes. I think it's just because all of the Pulsar C++ client's source code shares the same logger factory. But the `ClientConfiguration` has a logger factory config.
I think we should add an independent logger config to the C++ client that manages the logger of the whole library; it should not be configured on a `Client`. And we should make it clear which classes can be affected by changing the `Client`'s logger.
I think that still may result in issues, though. What if the library-global logger factory is configured to wrap a logger object whose lifecycle is not bound to the library, like a Python logger, which may need to be mutated, created, destroyed etc., between constructions of `Client`? Wouldn't that result in issues like this one?
In other words, I think a library-global logger is not a good idea so long as we support passing in logger objects from Python. I think the two are mutually exclusive (if you don't want bugs/weird behavior that is).
IMO, the library-global logger is more for safety. Currently the C++ client uses a thread-local logger for all source files of the library so that the user-provided Logger implementation doesn't need to be thread-safe. If a logger binds to a specific class instance like `Client`, the logger could be used by multiple threads, so it has to be thread-safe.
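To make the thread-safety concern concrete, here is a small Python sketch of one possible approach: wrapping a user-provided logger in a lock so that a single instance-bound logger can be shared across threads. `UnsafeCollector` and `LockedLogger` are hypothetical names, and this is not how the C++ client works today; it only illustrates the trade-off being discussed.

```python
import threading


class UnsafeCollector:
    """Stand-in for a user-provided logger that is treated as not thread-safe."""

    def __init__(self):
        self.messages = []

    def log(self, message):
        self.messages.append(message)


class LockedLogger:
    """Serializes every call into the wrapped logger behind one lock."""

    def __init__(self, inner):
        self._inner = inner
        self._lock = threading.Lock()

    def log(self, message):
        with self._lock:
            self._inner.log(message)


def demo(thread_count=4, per_thread=100):
    collector = UnsafeCollector()
    shared = LockedLogger(collector)  # one logger shared by all threads

    def worker(index):
        for i in range(per_thread):
            shared.log(f"thread {index} message {i}")

    threads = [
        threading.Thread(target=worker, args=(n,)) for n in range(thread_count)
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return len(collector.messages)


if __name__ == "__main__":
    print(demo())
```

The cost of this design is lock contention on every log call, which the thread-local approach avoids.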
Do you have any better idea? @merlimat
To double down, I think the statement @merlimat made here:
> you're expected to keep 1 single instance of C++ Pulsar client for the duration of your application
is not going to be true in a lot of the Python world. In Python, the pulsar client is one library of many in potentially long-lived applications; it's not necessarily part of a global, nor is it expected (or desired) for it to manage a global pool of connection state. Python programs can and will attempt to briefly use pulsar clients and then fully dispose of them, potentially doing this many times throughout their life.
@BewareMyPower re: thread safety, some thoughts:
Moved to the new dedicated client repo.
Describe the bug
When `Client` instances are constructed with the Python driver, they seem to accumulate the value of the `logger` argument as global state. This means two things:
- [...] `logger`, all subsequent clients constructed with a `logger` will not use it.

These behaviors both occur whether or not previously-constructed clients still exist; even if previous clients have been disconnected and garbage collected, issues still occur.
To Reproduce
[...] `localhost:6650`.

```python
import logging

from pulsar import Client


def logger_with_prefix(prefix):
    # Attach a stream handler that prefixes every message.
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


if __name__ == '__main__':
    print("Creating first")
    # First client: no logger argument.
    first = Client(service_url='pulsar://localhost:6650/')
    print("Destroying first")
    del first
    print("Creating second")
    # Second client: custom logger with the FOO prefix.
    second = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("FOO"),
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
```
[...] `FOO` logger and once on the `BAR` logger.

```python
import logging

from pulsar import Client


def logger_with_prefix(prefix):
    # Attach a stream handler that prefixes every message.
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


if __name__ == '__main__':
    print("Creating first")
    # First client: custom logger with the FOO prefix.
    first = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("FOO"),
    )
    print("Destroying first")
    del first
    print("Creating second")
    # Second client: no logger argument.
    second = Client(
        service_url='pulsar://localhost:6650/',
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
```
```
∴ python tests/benchmark/scratch.py
Creating first
Destroying first
Creating second
2021-08-30 16:44:35.805 INFO [0x1178f2e00] Client:88 | Subscribing on Topic :sometopic
2021-08-30 16:44:35.806 INFO [0x1178f2e00] ConnectionPool:84 | Created connection for pulsar://localhost:6650/
2021-08-30 16:44:35.808 INFO [0x70000c911000] ClientConnection:372 | [127.0.0.1:57417 -> 127.0.0.1:6650] Connected to broker
2021-08-30 16:44:35.821 INFO [0x70000c911000] HandlerBase:55 | [persistent://public/default/sometopic, somesub, 0] Getting connection from pool
2021-08-30 16:44:35.822 INFO [0x70000c911000] ConnectionPool:84 | Created connection for pulsar://localhost:6650
2021-08-30 16:44:35.823 INFO [0x70000c911000] ClientConnection:374 | [127.0.0.1:57418 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
2021-08-30 16:44:35.839 INFO [0x70000c911000] ConsumerImpl:220 | [persistent://public/default/sometopic, somesub, 0] Created consumer on broker [127.0.0.1:57418 -> 127.0.0.1:6650]
2021-08-30 16:44:35.839 INFO [0x1178f2e00] ConsumerImpl:874 | [persistent://public/default/sometopic, somesub, 0] Closing consumer for topic persistent://public/default/sometopic
2021-08-30 16:44:35.840 INFO [0x70000c911000] ConsumerImpl:930 | [persistent://public/default/sometopic, somesub, 0] Closed consumer 0
2021-08-30 16:44:35.848 INFO [0x1178f2e00] ClientConnection:1446 | [127.0.0.1:57418 -> 127.0.0.1:6650] Connection closed
2021-08-30 16:44:35.848 ERROR [0x70000c911000] ClientConnection:531 | [127.0.0.1:57418 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-08-30 16:44:35.849 INFO [0x1178f2e00] ClientConnection:261 | [127.0.0.1:57418 -> 127.0.0.1:6650] Destroyed connection
2021-08-30 16:44:35.849 ERROR [0x70000c911000] ClientConnection:531 | [127.0.0.1:57417 -> 127.0.0.1:6650] Read failed: Operation canceled
2021-08-30 16:44:35.849 INFO [0x1178f2e00] ClientConnection:1446 | [127.0.0.1:57417 -> 127.0.0.1:6650] Connection closed
2021-08-30 16:44:35.849 INFO [0x1178f2e00] ClientConnection:261 | [127.0.0.1:57417 -> 127.0.0.1:6650] Destroyed connection
```
```python
import logging

from pulsar import Client


def logger_with_prefix(prefix):
    # Attach a stream handler that prefixes every message.
    logger = logging.getLogger('test')
    ch = logging.StreamHandler()
    formatter = logging.Formatter('{}: %(message)s'.format(prefix))
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger


if __name__ == '__main__':
    print("Creating first")
    # First client: custom logger with the FOO prefix.
    first = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("FOO"),
    )
    print("Destroying first")
    del first
    print("Creating second")
    # Second client: custom logger with the BAR prefix.
    second = Client(
        service_url='pulsar://localhost:6650/',
        logger=logger_with_prefix("BAR"),
    )
    consumer = second.subscribe('sometopic', 'somesub')
    consumer.close()
```
```
∴ python tests/benchmark/scratch.py
Creating first
Destroying first
Creating second
FOO: Subscribing on Topic :sometopic
FOO: Created connection for pulsar://localhost:6650/
FOO: [127.0.0.1:57427 -> 127.0.0.1:6650] Connected to broker
FOO: [persistent://public/default/sometopic, somesub, 0] Getting connection from pool
FOO: Created connection for pulsar://localhost:6650
FOO: [127.0.0.1:57428 -> 127.0.0.1:6650] Connected to broker through proxy. Logical broker: pulsar://localhost:6650
FOO: [persistent://public/default/sometopic, somesub, 0] Created consumer on broker [127.0.0.1:57428 -> 127.0.0.1:6650]
FOO: [persistent://public/default/sometopic, somesub, 0] Closing consumer for topic persistent://public/default/sometopic
FOO: [persistent://public/default/sometopic, somesub, 0] Closed consumer 0
FOO: [127.0.0.1:57428 -> 127.0.0.1:6650] Connection closed
FOO: [127.0.0.1:57427 -> 127.0.0.1:6650] Connection closed
```
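One Python-side detail of the scripts in this report: `logging.getLogger('test')` returns the same logger object on every call, so every call to `logger_with_prefix` adds another handler to that one logger. Whether that plays any part in the output shown here is not established. If you want to rule it out while reproducing, a variant of the helper along these lines may help. The `"test." + prefix` naming scheme and the `stream` parameter are assumptions added for this sketch.

```python
import io
import logging


def logger_with_prefix(prefix, stream=None):
    """Variant helper: one logger per prefix, with exactly one handler."""
    logger = logging.getLogger("test." + prefix)  # hypothetical naming scheme
    logger.setLevel(logging.INFO)
    logger.propagate = False  # do not forward records to parent loggers
    # Drop handlers left over from earlier calls with the same prefix.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
    handler = logging.StreamHandler(stream)  # stream=None means sys.stderr
    handler.setFormatter(logging.Formatter("{}: %(message)s".format(prefix)))
    logger.addHandler(handler)
    return logger


def demo():
    foo_out, bar_out = io.StringIO(), io.StringIO()
    foo = logger_with_prefix("FOO", foo_out)
    bar = logger_with_prefix("BAR", bar_out)
    foo.info("hello")
    bar.info("world")
    return foo_out.getvalue(), bar_out.getvalue()


if __name__ == "__main__":
    print(demo())
```

With distinct logger names, a message that shows up under the wrong prefix can only come from the client side, not from handlers piling up on a shared Python logger.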