NCEAS / metadig-engine

MetaDig Engine: multi-dialect metadata assessment engine

unclosed connections and channels in rabbitmq #327

Closed mbjones closed 1 year ago

mbjones commented 1 year ago

The current implementation seems to leave large numbers of connections and channels open. Generally we only want one or a few connections open for each producer and consumer, and each would typically have one channel per thread. See https://www.rabbitmq.com/connections.html#basics
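
For reference, the pattern the RabbitMQ docs describe looks roughly like this with the Java client (a minimal sketch only; the host and queue name here are placeholders, not our actual config):

```java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SingleConnectionSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // placeholder host

        // One long-lived connection per producer/consumer process...
        Connection connection = factory.newConnection();
        // ...and typically one channel per consuming thread.
        Channel channel = connection.createChannel();
        channel.queueDeclare("quality", true, false, false, null); // placeholder queue name

        // consume/publish on this channel for the life of the process, and
        // close the channel and connection only on shutdown
    }
}
```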

Looking at the current management interface, in production I see:

I also note that in the code, I see only 4 places where these are created:

However, it also seems like they are not closed properly when the class is destroyed. At a minimum this would happen when new Worker and Scorer consumer pods are created.

jeanetteclark commented 1 year ago

it looks like there is a shutdown method in the Controller class (635) but as far as I can tell it isn't actually called anywhere, only defined.

I think that this method should be called within a finally block at the end of the processQualityRequest (335) and processScorerRequest (424) methods.
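
Roughly something like this (a sketch only; the signatures and bodies are simplified and do not match the real Controller class exactly):

```java
// Sketch of the proposed change; names are simplified.
public class Controller {

    public void processQualityRequest(String metadataPid, String suiteId) {
        try {
            // existing logic: build the request and publish it to the quality queue
        } finally {
            // make sure the channel/connection opened for this request are released
            shutdown();
        }
    }

    public void shutdown() {
        // the existing (but currently uncalled) method that closes the
        // RabbitMQ channel and connection
    }
}
```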

other locations where it needs to be implemented:

let me know if this sounds right @mbjones. If so I can implement those changes. I'll need help with deployment most likely

jeanetteclark commented 1 year ago

okay, I pushed some things to bugfix-#327-rabbitmq-connections. @mbjones should I do a PR for review?

jeanetteclark commented 1 year ago

So, we determined that the fix above isn't gonna work. The connections have to stay open because they are all both sending and receiving messages, so if we close them they won't be able to receive.

I had a look at the k8s logs for a worker pod and see this:

20230125-21:13:16: [ERROR]: RabbitMQ connection error: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0) [edu.ucsb.nceas.mdqengine.Worker:360]
20230125-21:13:16: [INFO]: Resetting RabbitMQ queues and resending completed report...

This gets triggered in a catch block at Worker:362

similarly, I see in the rabbitmq pod logs:

2023-01-26 21:30:51.811911+00:00 [warn] <0.25981.47> Consumer 1 on channel 1 has timed out waiting for delivery acknowledgement.

From the docs:

If a consumer does not ack its delivery for more than the timeout value (30 minutes by default), its channel will be closed with a PRECONDITION_FAILED channel exception.

I'm not sure of the exact sequence of events, but I think messages are going unacknowledged, causing channels to get shut down by this timeout. When the callback method is run for a new report and hits the catch block (Worker:362), a new connection and channel are opened, probably because the channel was closed on the previous timeout. This catch block (unlike the try above it) does NOT call basicAck(), so the acknowledgement times out again. Since the connections stay live they pile up, while channels are less numerous because they are constantly timing out and getting shut down (currently ~800 connections and 33 channels).

I'm also not sure of the solution, but a basicAck definitely needs to be added somewhere, possibly in a finally block after the try/catch in Worker:302 (returnReport is the method containing the catch block that reopens channels if they are closed).
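
Something along these lines (a sketch only, with made-up names; the real Worker callback is more involved):

```java
import com.rabbitmq.client.Channel;

public class AckInFinallySketch {
    // Sketch: acknowledge the delivery however report handling ends, so the
    // channel can't hit the delivery acknowledgement timeout.
    static void handleReport(Channel channel, long deliveryTag, byte[] body) throws Exception {
        try {
            // process the report and send the result to the completed queue
        } catch (Exception e) {
            // current code reopens the connection/channel here but never acks
        } finally {
            channel.basicAck(deliveryTag, false);
        }
    }
}
```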

would love to discuss whether this makes sense @mbjones @iannesbitt. I will also see if I can recreate this fun behavior in the mini-program I made to try and mimic it.

mbjones commented 1 year ago

Nice writeup. You've definitely hit a core issue. As we discussed the other day, the whole design of using 2 queues is to get around these timeout problems. Clearly it was not sufficient. I think a sequence diagram will help conceptualize the flow so that it is clear what is happening. Similar issues might be latent in the new Metacat queue processing code, as illustrated in this sequence diagram. Let's discuss, maybe next week while we are in Santa Barbara -- a little whiteboard session might go a long way.

iannesbitt commented 1 year ago

Huh. Maybe this explains why the toy program always seems to have 1 unacked message whenever I feed the Consumer queue a bunch (10+) of things at a time...

iannesbitt commented 1 year ago

I agree, Jeanette, that the basicAck() at Worker:357 should be moved somewhere it will still run even in the case of an error. This seems like it could be causing a problem even if it's not causing this particular one.

iannesbitt commented 1 year ago

Now that I think about it, the 1 unacked message that I see when I fill up the toy program's queue may just be the message currently being processed by the Worker, so there will always be one as long as the queue has unprocessed items. I was confusing this with the behavior we saw in prod-metadig's queue, when the admin interface reported 3 unacked messages under no load.

I thought this from the tutorial was interesting as it also appears to describe a symptom of the issue:

Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

mbjones commented 1 year ago

@jeanetteclark @iannesbitt poking around, I realized Peter did generate some sequence diagrams -- the PlantUML is in the docs folder, and here are a couple of examples:

iannesbitt commented 1 year ago

Last night I removed the basicAck() call from Jeanette's toy program and watched how RabbitMQ handled messages. The messages would process but then be recycled from the Unacked queue to each new Worker that I started. I'm assuming this happens automatically whenever Kubernetes sees a connection marked dead, but while it knows how to start and kill Workers, maybe it does so in a way that leaves the dead connections open.

One thing we should consider is that there is a potential to lose jobs if the solution to this problem is simply to put a basicAck() in a finally clause. If a job fails to process but is positively acknowledged anyway, the failed job will be discarded from the queue (which may or may not be what we want). The docs say the way around this is to use a negative acknowledgement (in the catch, in our case) that requeues the job, such as channel.basicReject(deliveryTag, true), where the second argument is requeue; this causes the message to be requeued rather than discarded.

If we decide this is the solution, we will need some logic that removes the job from the queue if it fails continuously, but this should at least prevent unwarranted positive acks. We could also use this type of rejection with requeue = false to get rid of old jobs that might cause Kubernetes to spawn new connections/channels. That seems like it would be more in the spirit of RabbitMQ's acknowledgement system than our current setup.
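
A minimal sketch of that shape (illustrative only, not the real Worker code; the retry-counting logic is left out):

```java
import com.rabbitmq.client.Channel;

public class RejectAndRequeueSketch {
    // Sketch: only ack on success; on failure, reject with requeue = true so
    // the job goes back on the queue instead of being silently discarded.
    static void handleReport(Channel channel, long deliveryTag, byte[] body) throws Exception {
        try {
            // process the report
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // a second argument of false would discard the message instead
            // (or route it to a dead letter exchange, if one is configured)
            channel.basicReject(deliveryTag, true);
        }
    }
}
```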

iannesbitt commented 1 year ago

I just added jeanetteclark/rabbit-learning@9a6b691 to show an example of this. I think this type of fix would at least help keep the number of connections due to unacked messages from continuing to climb as happens at the moment. The three of us should discuss what should happen to queue messages that fail twice in a row (requeue or discard) because it's possible that this logic would just keep failures in a loop forever, which we obviously don't want. The other option is to keep track of failed message tags and discard the ones that go through the loop more than once (a total of 4 failures in the case of the toy logic).

jeanetteclark commented 1 year ago

okay the new fix is pushed @12d75eb

I do have a failing test when I run mvn install, but it also fails on main (the failure is in edu.ucsb.nceas.mdqengine.filestore.FilestoreTestIT:64).

I'm going to do some reading on the deploy process; @mbjones can you have a look at the fix in the meantime?

mbjones commented 1 year ago

@jeanetteclark I left comments on the commit https://github.com/NCEAS/metadig-engine/commit/12d75eb24d1186a91be15bb90a1d01b1546f43a6

The deploy process is basically a helm upgrade, but first we have to build and publish a new image. I think we should use this opportunity to change where images are hosted, as Peter was using Docker Hub, which is not our working model. So I could review that change with you so we use GHCR instead.

jeanetteclark commented 1 year ago

So initially, things are getting closed because of long-running processing jobs; see the logs:

20230113-04:11:34: [INFO]: Total elapsed time (seconds): 2479 for metadataPid: urn:uuid:5c0c1da6-ef0b-47a5-bf31-1a6fe503bad1, suiteId: FAIR-suite-0.3.1 [edu.ucsb.nceas.mdqengine.Worker:343]
20230113-04:11:34: [INFO]:  [x] Done [edu.ucsb.nceas.mdqengine.Worker:353]
20230113-04:11:34: [ERROR]: RabbitMQ connection error: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more, class-id=0, method-id=0) [edu.ucsb.nceas.mdqengine.Worker:360]

I think the solution might be to move the basicAck up so it is called sooner: when the message is received, it's acked, then the report is processed, and then the results are returned to the completed queue. This would keep the delivery from timing out and would use the two-stage queue system the way I think it was designed, so that really long jobs can be handled without resorting to something like just increasing the default timeout.
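
In outline, the ordering I mean (a sketch only, with made-up names; not the actual Worker code):

```java
import com.rabbitmq.client.Channel;

public class EarlyAckSketch {
    // Sketch: ack as soon as the message arrives, before the (potentially very
    // long) report generation, then publish the result to the completed queue.
    // The queue name and helper are placeholders.
    static void handleReport(Channel channel, long deliveryTag, byte[] body) throws Exception {
        channel.basicAck(deliveryTag, false);                 // 1. ack immediately, no timeout risk
        byte[] report = generateReport(body);                 // 2. long-running work
        channel.basicPublish("", "completed", null, report);  // 3. hand off to the completed queue
    }

    static byte[] generateReport(byte[] request) {
        return request; // stand-in for the real report generation
    }
}
```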

jeanetteclark commented 1 year ago

@mbjones pointed out on Slack that if we run things in this order:

  1. receive message
  2. ack it
  3. do work
  4. send message to next queue when work succeeds

we don't have a way of recovering if things fail in step 3.

In doing a little googling around, I found this SO thread, which has an interesting idea, though it really seems like it is (again) rewriting a task queue that should be handling all of that for us. There is also the workaround of just increasing the timeout to something really long, which is by far the easiest solution but also the least satisfying.

The only other solution would be figuring out how to make the controller anticipate a message from the completed queue for every message it sends to the quality queue; if it doesn't receive a completed message, it would re-queue its message to the quality queue. Again, though, isn't the whole point of a work queue that you don't have to write all of that yourself?
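
Just to make concrete how much bookkeeping that would be, a rough sketch (all names made up):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PendingRunTracker {
    // Sketch: the Controller would record every message sent to the quality
    // queue and clear it when the matching completed message arrives; a
    // periodic task would requeue anything that has gone stale.
    private final Map<String, Instant> pending = new ConcurrentHashMap<>();

    void recordSubmitted(String pidAndSuite) {
        pending.put(pidAndSuite, Instant.now());
    }

    void recordCompleted(String pidAndSuite) {
        pending.remove(pidAndSuite);
    }

    List<String> findStale(Duration maxAge) {
        Instant cutoff = Instant.now().minus(maxAge);
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Instant> entry : pending.entrySet()) {
            if (entry.getValue().isBefore(cutoff)) {
                stale.add(entry.getKey());
            }
        }
        return stale;
    }
}
```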

mbjones commented 1 year ago

I went through that exact same internal debate yesterday. That strategy is called "premature acknowledgement" or "preemptive acknowledgement". I also spent a while poking around the interwebs for solutions to this - it seems to be a common problem/complaint. While this will mostly be a rare concern for us, the one place it could happen somewhat regularly is if k8s moves a worker from one node to another in the middle of processing a job, causing it to never finish.

Our one saving grace is that report generation should be entirely idempotent. So, I think a periodic controller sweep to check for reports that were queued and acked but never reported back would be a decent solution - all it would have to do is resubmit the task to RMQ. If for some reason the first task did eventually complete, the second job should just produce identical output -- so no harm, no foul.

The only concern to watch here is infinite cycles, where we just keep regenerating the same task over and over and it never completes in the window between sweeps (possibly due to a very long process or a bug). I see two fairly simple ways to guard against that:

In either case, once a job hits a certain number of retries, we'll want to shunt it off into a dead letter exchange for manual handling. This all assumes the Controller is managing the submission of quality messages, and that there is a way for it to track final completion of those -- maybe that's just a 'success' status for the run in the runs table. From a cursory inspection of the db, I don't see a way that quality messages are being tracked in the controller -- I think they are fire-and-forget right now. Happy to discuss.
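
For what it's worth, wiring in a dead letter exchange is just a queue argument in the Java client (a sketch; the exchange and queue names here are placeholders, not our actual config):

```java
import com.rabbitmq.client.Channel;
import java.util.HashMap;
import java.util.Map;

public class DeadLetterSketch {
    // Sketch: declare the work queue with a dead letter exchange so messages
    // rejected with requeue = false end up somewhere we can inspect them.
    static void declareQueues(Channel channel) throws Exception {
        channel.exchangeDeclare("quality-dlx", "fanout", true);
        channel.queueDeclare("quality-dead-letters", true, false, false, null);
        channel.queueBind("quality-dead-letters", "quality-dlx", "");

        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "quality-dlx");
        channel.queueDeclare("quality", true, false, false, args);
    }
}
```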

jeanetteclark commented 1 year ago

With a preemptive ack, we will definitely need to do some kind of sweep to make sure we don't lose jobs. I was just able to recreate the initial timeout issue locally (in a toy program). I fixed it using the preemptive ack; however, if I kill the worker mid-process, that message (as expected) gets lost and the job never completes. Other jobs queued up behind it still complete and finish as normal.

You are correct that it is fire and forget on the scheduler side.

One way to implement a sweep could be to insert it into the RequestReportJob.java class (linked above). If we use the taskList.csv to get the cron schedule, we should be able to get the harvest date prior to the one that is currently running. Retrieving the list of pids between harvest0 and harvest1 (the current harvest), and checking that they are all present in the runs table, will tell us if any were missed. If any were missed, they can be inserted into the pid list for the current harvest, or be rerun in a separate try with some care to prevent it from repeating endlessly. We could also be more generous with the timeframe for the sweep, and just get a list of pids from the week prior to the current harvest and check that against the runs table.

This keeps RMQ out of it, but I think it might be the easiest way. Otherwise I think we would need to figure out a way to store RMQ message payloads temporarily in the Controller, and that doesn't really make sense to me since we already do that in the runs table.

Thoughts?

mbjones commented 1 year ago

Glad you were able to replicate the issue!

Your approach sounds reasonable, although possibly complicated in that you are recalculating the PID/SUITE combo for everything as you check it, which requires querying the DataONE servers, which change state frequently. So reconstructing things might not be simple.

Another way might be to insert an entry in the runs table when the first job ack comes back, with basic fields filled in, a status of 'running' or something like that, and a 'run_count' counter. When the completed queue fires and runs, the run row would be updated with full run info and status='success', as it is now. Then checking for dangling jobs is a matter of a simple SQL query for runs with status 'running' that started more than some delta ago (e.g., 24 hours). The info in the table would be sufficient to requeue the job and increment the run_count counter upon the first ack. If we did something like that, we should discuss the state changes in the 'status' lifecycle.
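
The sweep itself could then be very small; a sketch with JDBC, assuming a Postgres-style runs table (the column names here are guesses, not the actual schema):

```java
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class DanglingRunSweep {
    // Sketch: find runs that were acked and marked 'running' more than 24 hours
    // ago but never reached 'success', so they can be requeued.
    static void sweep(java.sql.Connection db) throws Exception {
        String sql = "SELECT metadata_id, suite_id, run_count FROM runs "
                   + "WHERE status = 'running' AND timestamp < now() - interval '24 hours'";
        try (PreparedStatement stmt = db.prepareStatement(sql);
             ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                // requeue the PID/SUITE combo and increment run_count
            }
        }
    }
}
```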

jeanetteclark commented 1 year ago

Why would I need to recalculate everything? As far as I can tell, an object only gets inserted into the runs table if it succeeds. So if the query to Solr says pid0, pid1, and pid2 were all updated between harvest0 and harvest1, and pid1 and pid2 are found in the runs table, then those pids are filtered out of the original list and pid0 is sent to RequestReportJob.

I do think your way is more robust though, since it actually tracks the status of the jobs. I think this may have been the original intent of the runs table, but it doesn't seem to do that.

mbjones commented 1 year ago

I think you need more than just the pid updates. The determination of which runs need to happen is the intersection of PID updates and SUITE schedule updates, and it is that intersection that you would have to recalculate each time to figure out what was missed. It may not even be possible: if the regex to match formatIds changes in the schedule, you will no longer know whether some PIDs are missing from a previously requested schedule. For example, imagine a schedule change from this:

quality,quality-dataone-fair,metadig,10 0/1 * * * ?,"^eml.*|^http.*eml.*|.*www.isotc211.org.*;FAIR-suite-0.3.1;urn:node:CN;2020-08-28T00:00:00.000Z;1;1000"

to this:

quality,quality-dataone-fair,metadig,10 0/1 * * * ?,"^eml.*|^http.*eml.*;FAIR-suite-0.3.1;urn:node:CN;2020-08-28T00:00:00.000Z;1;1000"

With that change, any ISO runs that would have been in the previous request are no longer calculable from the schedule. The approach I am outlining records the fact that a PID/SUITE combo was requested at some point in time, and makes sure that it eventually succeeds even if the schedule changes. While it's unlikely that we will reduce the scope of these schedule regexes, I always like it when systems behave consistently -- much easier to debug.

jeanetteclark commented 1 year ago

Ohhh, I see, that makes sense. Okay, I'll start wrapping my head around the modifications to the runs table and create a separate issue for it, since I think we are well beyond just unclosed connections at this point.

jeanetteclark commented 1 year ago

After lots of testing, the solution described above seems to be working correctly on the dev cluster using the snapshot release. To test, I submitted batches of 100 of the largest metadata records on the ADC a few times to the test server and observed both the logs and the RMQ dashboard. The only error I am seeing is https://github.com/NCEAS/metadig-engine/issues/360, which seems unrelated to any of the code changed here.