MetPX / sarracenia

https://MetPX.github.io/sarracenia
GNU General Public License v2.0

Review amqp.py to consider publish_confirms, consumers, etc... #457

Open petersilva opened 2 years ago

petersilva commented 2 years ago

Seeing message loss in flakey_broker (sessions looking at logs with @reidsunderland), and we think one way to address it is to add publisher confirms to the amqp.py module.

Looking at: https://docs.celeryproject.org/projects/amqp/en/latest/index.html

We need to add a callback to amqp.py and have the channel refer to it. The ack will remove messages from the ack_wait worklist (basic publish would need to put messages on the "pending_messages" waitlist). Channel.events['basic_ack'].append(my_callback) <-- search for that...

So far, the way the amqp library is used is completely synchronous: we perform an operation, we wait for it, then the next operation. I think this requires having the library do some async stuff... There is mention of: Connection.heartbeat_tick(rate=2) must be called at regular intervals (half of the heartbeat value if rate is 2).

if you look at mqtt.py... it is already implemented that way. so we have a model.
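A minimal sketch of the bookkeeping described above, in plain Python with no broker involved. The class name PendingConfirms and its methods are hypothetical, not part of amqp.py. It assumes the ack callback receives (delivery_tag, multiple) and that delivery tags count up from 1 on a channel in confirm mode; how the callback gets registered with the amqp library still needs to be checked against its documentation.

```python
import threading


class PendingConfirms:
    """Hypothetical helper: track published messages until the broker acks them."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = {}    # delivery_tag -> message awaiting confirmation
        self._next_tag = 0    # assumption: tags start at 1 in confirm mode

    def published(self, message):
        """Record a message just handed to basic_publish and return its tag."""
        with self._lock:
            self._next_tag += 1
            self._pending[self._next_tag] = message
            return self._next_tag

    def on_ack(self, delivery_tag, multiple=False):
        """Ack callback: multiple=True confirms every tag <= delivery_tag."""
        with self._lock:
            if multiple:
                for tag in [t for t in self._pending if t <= delivery_tag]:
                    del self._pending[tag]
            else:
                self._pending.pop(delivery_tag, None)

    def unconfirmed(self):
        """Messages still waiting for an ack, oldest first."""
        with self._lock:
            return [self._pending[t] for t in sorted(self._pending)]


# Simulated session: three publishes, then one ack covering the first two.
confirms = PendingConfirms()
for name in ("a", "b", "c"):
    confirms.published(name)
confirms.on_ack(2, multiple=True)
remaining = confirms.unconfirmed()
```

Anything left in unconfirmed() when the connection drops would be a candidate for re-publishing, which is the message loss case seen in flakey_broker.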

petersilva commented 2 years ago

side effect of #407

petersilva commented 2 years ago

I think it means putSetup launching a thread.... and putCleanup likely needs to reap it?

reidsunderland commented 2 years ago

While reading the mqtt.py code, I noticed a possible bug on line 379?

https://github.com/MetPX/sarracenia/blob/bee974e50b2236b883ed3449dd82bee1d0b5cf47/sarracenia/moth/mqtt.py#L377-L379

petersilva commented 2 years ago

yeah... err should be ex, I guess.

petersilva commented 2 years ago

based on @reidsunderland's work on #407, this does not look like a priority for now.

If we come back to this, we should also study whether use of consumers (basic_consume) would be helpful in some way. note that in brokers we never register as "consumers." It is unclear what effect the lack of registration as consumers has on message flow.

petersilva commented 2 years ago

more reading: https://www.rabbitmq.com/tutorials/tutorial-seven-java.html

reidsunderland commented 7 months ago

I made an attempt at using basic_consume instead of basic_get. It seems to work, and the flow tests pass. On branch issue_457_amqp_consumer

petersilva commented 7 months ago

very cool! Use "amqp_consumer" as a flag to use that AMQP implementation instead of the standard one, so I guess we can do A:B testing that way... That makes it super low risk to merge, and try it out in various situations to be more certain about it.

I guess you put the flag in default.conf when you run the flow tests, and it's OK?

Yeah, the lack of registered consumers has always bothered me (The C one uses a consumer btw.) I don't know why you need the findAllSubclasses() entry point, feels like it was fine before (would have found this new one, no?)

This is a super safe easy way to introduce the code without changing behaviour, so we can kick the tires for a while. After a while we could just drop the flag (or set it to always true) or merge the classes... or something, I'm struggling to know why we would keep both once we are confident that basic_consume stuff works.

reidsunderland commented 7 months ago

Yeah, I ran the flow tests with amqp_consumer True in the default.conf.

One of the indirect benefits of using consumers is that it's much easier to find out which IP addresses are consuming from a queue, it shows up directly in the GUI or CLI. That will be nice if we need to track down where a queue is coming from.

__subclasses__() was only finding the classes that have Moth as the parent. AMQPConsumer is a subclass of AMQP and it wasn't in the list of Moth subclasses, so I had to make it recursive to find the subclasses' subclasses.
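A small sketch of that recursion, using stand-in classes rather than the real sarracenia.moth ones. The function name find_all_subclasses is illustrative and may not match the actual findAllSubclasses() code.

```python
def find_all_subclasses(cls):
    """Return every direct and indirect subclass of cls as a set."""
    found = set()
    for sub in cls.__subclasses__():
        found.add(sub)
        found.update(find_all_subclasses(sub))  # descend into grandchildren
    return found


# Stand-in hierarchy mirroring the one described in this thread.
class Moth:
    pass


class AMQP(Moth):
    pass


class MQTT(Moth):
    pass


class AMQPConsumer(AMQP):
    pass


direct = {c.__name__ for c in Moth.__subclasses__()}
everything = {c.__name__ for c in find_all_subclasses(Moth)}
```

Here direct contains only AMQP and MQTT, while everything also picks up AMQPConsumer.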

I was thinking eventually the amqp_consumer default could be changed to True or just merge the classes like you said. I'm also not sure if there's any benefit to keeping basic_get around if this is working.

More testing is needed, like having multiple nodes (IPs) consume from the same queue. It should work fine but I haven't tested that. I also want to see if there's any performance improvement with consumers. In theory consuming should be faster, but I don't know if this implementation is or not.

I'll clean up the forced log level debug in the code, and try to get it working in GitHub Actions, then create a PR?

petersilva commented 7 months ago

yeah, that sounds great!

reidsunderland commented 7 months ago

Having this code

# docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html#amqp.channel.Channel.basic_consume
self.channel.basic_consume(self.o['queueName'], no_ack=False, callback=self.__get_on_message)
self.__stop_thread()  # make sure it's not already running
self._consumer_thread = threading.Thread(target=self.__drain_events)
self._consumer_thread.start()

in getSetup causes it to execute even when it's just doing things like declaring queues or exchanges. I moved it into its own function that gets called by getNewMessage (same place getSetup is called).
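Roughly, the change looks like the sketch below. FakeChannel, ConsumerSketch and _ensure_consuming are hypothetical names used only to illustrate the idea; the real code lives on the issue_457_amqp_consumer branch and may differ.

```python
class FakeChannel:
    """Stand-in for an AMQP channel; it only counts basic_consume calls."""

    def __init__(self):
        self.consume_calls = 0

    def basic_consume(self, queue, no_ack=False, callback=None):
        self.consume_calls += 1
        return "ctag-%d" % self.consume_calls


class ConsumerSketch:
    def __init__(self, channel, queue_name):
        self.channel = channel
        self.queue_name = queue_name
        self._consumer_tag = None  # stays None until a message is first requested

    def getSetup(self):
        """Declare queues and bindings only; deliberately does not consume."""
        pass

    def _on_message(self, message):
        pass  # message handling omitted in this sketch

    def _ensure_consuming(self):
        """Register the consumer once, the first time it is needed."""
        if self._consumer_tag is None:
            self._consumer_tag = self.channel.basic_consume(
                self.queue_name, no_ack=False, callback=self._on_message)

    def getNewMessage(self):
        self._ensure_consuming()
        return None  # message retrieval omitted in this sketch


channel = FakeChannel()
consumer = ConsumerSketch(channel, "q_example")
consumer.getSetup()
calls_after_setup = channel.consume_calls
consumer.getNewMessage()
consumer.getNewMessage()
calls_after_gets = channel.consume_calls
```

With this shape, something like sr3 declare can run getSetup without ever registering a consumer.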

Unscientifically testing on my computer, the static_flow test using the consumer version (4 min 55 sec) seems to be about 10 seconds faster than the basic_get version (5 min 5 sec).

petersilva commented 7 months ago

fwiw, I did a few test runs.. I'm actually surprised at how consistent it is. I don't see any performance difference. I didn't monitor load though... This is on my home PC.

basic_get (baseline)

/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 08:21:33 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 08:24:36 PM EST       3:03
/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 08:26:17 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 08:29:20 PM EST       3:03
/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 08:30:00 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 08:32:14 PM EST       2:14
/home/peter/Sarracenia/insects/flakey_broker start Tue 14 Nov 2023 08:34:27 PM EST
/home/peter/Sarracenia/insects/flakey_broker end Tue 14 Nov 2023 08:44:39 PM EST    10:12
/home/peter/Sarracenia/insects/flakey_broker start Tue 14 Nov 2023 08:46:42 PM EST
/home/peter/Sarracenia/insects/flakey_broker end Tue 14 Nov 2023 08:56:19 PM EST    10:07
/home/peter/Sarracenia/insects/flakey_broker start Tue 14 Nov 2023 08:57:57 PM EST
/home/peter/Sarracenia/insects/flakey_broker end Tue 14 Nov 2023 09:07:18 PM EST     9:21
/home/peter/Sarracenia/insects/dynamic_flow start Tue 14 Nov 2023 10:56:37 PM EST
/home/peter/Sarracenia/insects/dynamic_flow end Tue 14 Nov 2023 11:20:58 PM EST     24:19
/home/peter/Sarracenia/insects/no_mirror start Tue 14 Nov 2023 11:25:55 PM EST
/home/peter/Sarracenia/insects/no_mirror end Tue 14 Nov 2023 11:31:42 PM EST         5:47

amqp_consumer

/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 11:34:37 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 11:37:40 PM EST      3:03
/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 11:41:50 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 11:44:54 PM EST      3:04
/home/peter/Sarracenia/insects/static_flow start Tue 14 Nov 2023 11:46:18 PM EST
/home/peter/Sarracenia/insects/static_flow end Tue 14 Nov 2023 11:49:11 PM EST      2:53
/home/peter/Sarracenia/insects/flakey_broker start Tue 14 Nov 2023 11:51:03 PM EST
/home/peter/Sarracenia/insects/flakey_broker end Wed 15 Nov 2023 12:01:10 AM EST   10:07
/home/peter/Sarracenia/insects/flakey_broker start Wed 15 Nov 2023 12:03:09 AM EST
/home/peter/Sarracenia/insects/flakey_broker end Wed 15 Nov 2023 12:12:30 AM EST    9:21
/home/peter/Sarracenia/insects/flakey_broker start Wed 15 Nov 2023 12:15:18 AM EST
/home/peter/Sarracenia/insects/flakey_broker end Wed 15 Nov 2023 12:24:55 AM EST   10:13
/home/peter/Sarracenia/insects/dynamic_flow start Wed 15 Nov 2023 12:26:57 AM EST
/home/peter/Sarracenia/insects/dynamic_flow end Wed 15 Nov 2023 12:52:21 AM EST    27:18
/home/peter/Sarracenia/insects/no_mirror start Wed 15 Nov 2023 12:54:47 AM EST
/home/peter/Sarracenia/insects/no_mirror end Wed 15 Nov 2023 01:00:36 AM EST        7:23

The later runs are slower, but I think we need more samples, and I don't think that's real yet. Using consumer is still great for many other reasons... but I'm not sure we can claim performance improvements yet.

petersilva commented 7 months ago

The dynamic flow has started consistently crashing/hanging at the start, with the initial shovel dying (not sure if it's hung) with "malloc(): unsorted double linked list corrupted"

2023-11-16 08:37:09,339 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
2023-11-16 08:37:09,339 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
2023-11-16 08:37:09,340 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
2023-11-16 08:37:09,340 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
2023-11-16 08:37:09,340 [ERROR] 178007 sarracenia.moth.amqpconsumer __drain_events exception occurred: 'SSLTransport' object has no attribute '_quick_recv'
malloc(): unsorted double linked list corrupted
2023-11-16 08:37:09,457 177976 [CRITICAL] root run_command subprocess.run failed err=Command '['/usr/bin/python3', '/home/peter/Sarracenia/sr3/sarracenia/instance.py', '--no', '0', 'foreground', 'shovel/t_dd1_f00']' died with <Signals.SIGABRT: 6>.

no idea what happened, but I can't get it to complete any more. If I turn amqp_consumer off it becomes functional again.

reidsunderland commented 7 months ago

I am just guessing, I think it might be related to the changes introduced by adding the new queue declares for metrics: https://github.com/MetPX/sarracenia/pull/818

Two threads might be trying to do different things with the connection at the same time?

petersilva commented 7 months ago

yup. I commented out the periodic queue declares, and it does not get stuck anymore. hmm... how to fix?

reidsunderland commented 7 months ago

From a bit of reading, connections are supposed to be thread safe, and we're using two different channels. That seems to be the right way to do this... But the drain_events is called on the connection, not the channel, and something is crashing.

That came from these examples: https://github.com/celery/py-amqp#quick-overview

petersilva commented 7 months ago

I tried removing the periodic queue declares (leaving the initial one), and that is broken also. If someone instead turns the existing declare_queue setting off, then it should work... but all the flow tests require declaring of queues. So that's a bit of engineering... (set it up so sr3 declare does the right thing, then add something to turn the option off before any configs are started). It's a bit convoluted.

reidsunderland commented 7 months ago

I found the answer: connections are not thread safe. I don't know why I thought they were. So it's almost more surprising that it seemed to work okay before.

https://github.com/celery/py-amqp/issues/267#issuecomment-477416118

petersilva commented 7 months ago

do we need a thread? maybe a single flow is fine?

reidsunderland commented 7 months ago

We could probably call drain_events in getNewMessage and not use a thread, but if there are no events to drain we'd have to wait for it to time out.

petersilva commented 7 months ago

drain_events has a timeout... just set it to something like a microsecond... that's what I did on the C side. With a 2 GHz clock, that means it waits 2 thousand cycles, but there's still a million waits in a second... so likely fine for all purposes. In the C I started doing exponential back off... wait 1 microsecond at first, then double each time there is nothing... until some upper bound, like a second... I guess.

petersilva commented 7 months ago

the only point of the exponential back off is to reduce cpu load.
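A thread-free sketch of that back-off, assuming the drain call takes a timeout in seconds and raises socket.timeout when nothing arrives (as far as I know that is how py-amqp's Connection.drain_events behaves, but it should be verified). BackoffDrainer and make_fake_drain are hypothetical names; the fake drain stands in for the broker connection.

```python
import socket


class BackoffDrainer:
    """Poll for events, doubling the wait after each empty poll."""

    def __init__(self, drain, initial=1e-6, maximum=1.0):
        self._drain = drain        # callable accepting timeout=<seconds>
        self._initial = initial    # first wait: one microsecond
        self._maximum = maximum    # upper bound: one second
        self.timeout = initial

    def poll(self):
        """Return True if an event was drained, False if the wait timed out."""
        try:
            self._drain(timeout=self.timeout)
        except socket.timeout:
            self.timeout = min(self.timeout * 2, self._maximum)
            return False
        self.timeout = self._initial  # traffic seen: go back to short waits
        return True


def make_fake_drain(outcomes):
    """Build a fake drain: each False outcome times out, each True succeeds."""
    remaining = iter(outcomes)

    def drain(timeout):
        if not next(remaining):
            raise socket.timeout()

    return drain


drainer = BackoffDrainer(make_fake_drain([False, False, False, True]))
timeouts_used = []
for _ in range(4):
    timeouts_used.append(drainer.timeout)
    drainer.poll()
```

getNewMessage could call poll() once per invocation, so an idle flow settles at the upper bound instead of spinning at a microsecond.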

petersilva commented 7 months ago

perhaps try it without threads? I don't think we need it, and KISS principle applies.

reidsunderland commented 3 months ago

We have the ability to set a consumer tag with a consumer. It might be useful to have these things in the tag:

petersilva commented 3 months ago

yes. I think that's great. We already have the ip-address in the rabbitmq management gui, so the hostname is useful information. We discussed privacy concerns... perhaps someone would object to divulging their hostname to the broker... hmm... We could use the HOSTNAME environment variable, which the user can override in their config file if they object...? Is that a reasonable mechanism? It gives people an out if they object, but the most useful setting is the default.

oh... just noticed that HOSTNAME to override self.hostname is not explicitly implemented in sr3... might have to check that that actually works. ( #967 )

reidsunderland commented 3 months ago

I'd like to add an option to let the user define the tag like we have for queueName, and make it work with all variables that are supported for queueName
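One possible shape for this, purely as a sketch: resolve_hostname and build_consumer_tag are hypothetical helpers, the ${...} template syntax is borrowed from Python's string.Template rather than from sr3's own variable expansion, and PROGRAM is an illustrative variable name. The HOSTNAME override follows the idea discussed above.

```python
import os
import socket
import string


def resolve_hostname(environ=os.environ):
    """Prefer a user-supplied HOSTNAME, else fall back to the system hostname."""
    return environ.get("HOSTNAME") or socket.gethostname()


def build_consumer_tag(template, environ=os.environ, **extra):
    """Expand ${NAME} placeholders; unknown names are left untouched."""
    values = {"HOSTNAME": resolve_hostname(environ)}
    values.update(extra)
    return string.Template(template).safe_substitute(values)


# A user who does not want to reveal the real hostname overrides HOSTNAME.
tag = build_consumer_tag(
    "${HOSTNAME}_${PROGRAM}",
    environ={"HOSTNAME": "private"},
    PROGRAM="subscribe",
)
```

The resulting string would then be passed as the consumer tag when registering the consumer.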

reidsunderland commented 2 months ago

The flakey broker test is consistently failing with the AMQP consumer, it seems to be a real problem with the code, not a test issue. I'll try to figure out the problem when I have time.