NOAA-OWP / wres

Code and scripts for the Water Resources Evaluation Service

As a developer, I want zombie queues to be removed from the broker on a regular basis #116

Open epag opened 4 weeks ago

epag commented 4 weeks ago

Author Name: Hank (Hank) Original Redmine Issue: 116452, https://vlab.noaa.gov/redmine/issues/116452 Original Date: 2023-05-19


I've defined two types of zombies in tickets:

108371 - Queues created by the tasker for evaluations long done, because the persister has them as @IN_PROGRESS@ or @IN_QUEUE@.

116364 - A queue for an evaluation that had been marked completed, but the queue was not removed by its watcher before the tasker container stopped.

The first one, above, is not really a zombie, since the queues do not survive the service going down. Rather, they are recreated when the service comes back up. The second one is a true zombie, because it lives as a durable queue forever or until someone manually removes it via the broker monitor. Or maybe the first one is a zombie that keeps rising from the grave and the second one is an immortal that only dies when its head is cut off... or something.

Anyway, the point is we need a way to get rid of these queues. I think with changes in #116364, the #108371 zombie is largely addressed, and, if they do arise, they will go away eventually due to the two week time limit, but I thought that before. This ticket is about the second kind of zombie, which has not been addressed at all. Again, they can be removed by hand via the monitor, but we'd rather not do that. It should be handled automatically.

This ticket can be resolved when the second type of zombie, #116364, is removed automatically on a regular basis.

Thanks,

Hank

epag commented 4 weeks ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2023-05-31T17:36:26Z


Make sure to review the EDIT at the bottom!

Noting this here, but not sure if I want to advertise it in our troubleshooting wiki. I was able to clear out a #108371 zombie where the tasker believes a job is still unprocessed, but is actually long done. I did so by going through the -ti broker monitor and publishing the message with payload "600" (all else empty) to the @job.[job id].exitCode@ queue:

!116452_monitor_queue_message_fields.png!

I pulled that constant from this in @WresProcess.java@:

    private static final int META_FAILURE_CODE = 600;

This method comment explains the purpose of the constant:

    /**
     * Helper to prepare a job that failed due to never getting the job to run
     * @param e the exception that occurred or null if none
     * @return raw message indicating meta failure
     */

    private static byte[] prepareMetaFailureResponse( Exception e )
    {
        return WresProcess.prepareResponse( META_FAILURE_CODE, e.toString() );
    }

The tasker properly processed the message as indicating a failure and all of the queues were removed. Note that if the tasker had not responded, then it would be a situation where the 5 queues for a job are actually #116364 type zombies, so posting a message won't work. In that case, they can be removed manually within the broker monitor.

To summarize, as a work-around +only+ until a proper fix for this ticket is implemented... To remove #108371 zombies, publish a 600 message to the @exitCode@ queue per the above. To remove a #116364 zombie, remove the queue directly within the monitor.

Thanks,

Hank

EDIT: It turns out the queue was removed because the message was badly formed. Specifically:

2023-05-31T17:22:24.757+0000 [pool-1-thread-7] INFO wres.tasker.JobResultConsumer - Heard a message, consumerTag: amq.ctag-lwQCY14QKVHj5AxmjUSRtQ, envelope: Envelope(deliveryTag=1, redeliver=false, exchange=, routingKey=job.6565187957353905807.exitCode), properties: #contentHeader<basic>(content-type=null, content-encoding=null, headers={}, delivery-mode=1, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null), message: [54, 48, 48]
2023-05-31T17:22:24.758+0000 [pool-1-thread-7] WARN wres.tasker.JobResultConsumer - Could not parse a job result message.
com.google.protobuf.InvalidProtocolBufferException$InvalidWireTypeException: Protocol message tag had invalid wire type.
    at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:142)
    at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:527)
    at com.google.protobuf.GeneratedMessageV3$Builder.parseUnknownField(GeneratedMessageV3.java:863)

Note that the job I removed was 6565187957353905807; the screenshot shows a different job only for illustration purposes.
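As a side note on the log above, the payload @[54, 48, 48]@ is just the ASCII text "600", not a serialized protobuf message. The sketch below decodes those bytes and reads the first byte as a protobuf tag would be read (low three bits are the wire type, remaining bits the field number). The class and method names are hypothetical and exist only for illustration; this is not tasker code. Protobuf defines wire types 0 through 5 only, so a wire type of 6 is consistent with the "invalid wire type" warning, although the exact parsing path in the tasker is an assumption here.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical illustration only: inspects the raw payload from the log above. */
final class ExitCodePayloadCheck
{
    /** The wire type is stored in the low three bits of a protobuf tag byte. */
    static int wireType( byte tag )
    {
        return tag & 0x07;
    }

    /** The field number is stored in the remaining bits (valid for single-byte tags, i.e. values below 128). */
    static int fieldNumber( byte tag )
    {
        return ( tag & 0xFF ) >>> 3;
    }

    /** Interprets the payload as plain ASCII text. */
    static String asText( byte[] payload )
    {
        return new String( payload, StandardCharsets.US_ASCII );
    }

    public static void main( String[] args )
    {
        byte[] payload = { 54, 48, 48 }; // The bytes reported by JobResultConsumer

        System.out.println( asText( payload ) );         // prints 600
        System.out.println( fieldNumber( payload[0] ) ); // prints 6
        System.out.println( wireType( payload[0] ) );    // prints 6, which is not a defined wire type
    }
}
```

In other words, typing "600" into the monitor publishes text, whereas the consumer appears to expect an encoded job result message, so the parse failure is what triggered the cleanup rather than the exit code itself.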

epag commented 4 weeks ago

Original Redmine Comment Author Name: Arvin (Arvin) Original Date: 2023-07-03T13:56:48Z


Trying to understand a bit more about how these zombie queues happen.

epag commented 4 weeks ago

Original Redmine Comment Author Name: Arvin (Arvin) Original Date: 2023-07-03T17:31:17Z


I have been reading [[#108371]] in detail to get a better understanding of zombie queues. It is a lot of information with many connected parts, so I'm trying to understand as much as I can.

epag commented 4 weeks ago

Original Redmine Comment Author Name: Arvin (Arvin) Original Date: 2023-07-03T20:06:35Z


    @Override
    public void run()
    {
        Consumer<GeneratedMessageV3> sharer = new JobStatusSharer( this.getJobMetadata() );

        BlockingQueue<JobStatus.job_status> jobStatusQueue
                = new ArrayBlockingQueue<>( LOCAL_Q_SIZE );

        String exchangeName = this.getJobStatusExchangeName();
        String exchangeType = "topic";
        String bindingKey = "job." + this.getJobId() + "." + TOPIC;

        try ( Channel channel = this.getConnection().createChannel() )
        {
            channel.exchangeDeclare( exchangeName, exchangeType );

            // As the consumer, I want an exclusive queue for me.
            String queueName = channel.queueDeclare(bindingKey, true, false, false, null).getQueue();
            channel.queueBind( queueName, exchangeName, bindingKey );

            JobStatusConsumer jobStatusConsumer =
                    new JobStatusConsumer( channel, jobStatusQueue );

            channel.basicConsume( queueName,
                                  true,
                                  jobStatusConsumer );
            this.getCountDownLatch().countDown();
            JobMessageHelper.waitForAllMessages( queueName,
                                                 this.getJobId(),
                                                 jobStatusQueue,
                                                 sharer,
                                                 TOPIC );

            LOGGER.info("Deleting the queue {}", queueName);
            AMQP.Queue.DeleteOk deleteOk = channel.queueDelete(queueName);
            if (deleteOk == null)
            {
                LOGGER.warn( "Delete queue with name '" + queueName + "' failed. There might be a zombie queue." );
            }
        }
        catch ( InterruptedException ie )
        {
            LOGGER.warn( "Interrupted while getting status for job {}",
                         this.getJobId(), ie );
            Thread.currentThread().interrupt();
        }
        catch ( IOException | TimeoutException e )
        {
            // Since we may or may not actually consume result, log exception here
            LOGGER.warn( "When attempting to get job status message using {}:",
                         this, e );
        }
    }

Tasker code that I think could help

epag commented 4 weeks ago

Original Redmine Comment Author Name: Arvin (Arvin) Original Date: 2023-07-05T14:13:39Z


On ticket #116364 response 45 Hank mentions this:

The zombie queues are not removed when the tasker restarts, because the status has already been received, in this case as a success. 
Hence, a watcher is never created, so the mechanism for deleting the zombies is never set up. So, I finally have a clear definition for a zombie queue:

A zombie is merely a queue for an evaluation that had been marked completed, but the queue was not removed by its watcher before the tasker container stopped.
epag commented 4 weeks ago

Original Redmine Comment Author Name: Arvin (Arvin) Original Date: 2023-07-05T17:30:22Z


So my understanding of this type of zombie queue is that an evaluation finishes and the deletion process begins, but the tasker gets interrupted or restarted (due to containers going down, etc.) before its watcher can remove the queue. Does that happen at this part of the code?

@JobStatusWatcher.java@

            JobMessageHelper.waitForAllMessages( queueName,
                                                 this.getJobId(),
                                                 jobStatusQueue,
                                                 sharer,
                                                 TOPIC );

            LOGGER.info("Deleting the queue {}", queueName);
            AMQP.Queue.DeleteOk deleteOk = channel.queueDelete(queueName);
            if (deleteOk == null)
            {
                LOGGER.warn( "Delete queue with name '" + queueName + "' failed. There might be a zombie queue." );
            }
        }

Is this correct?

epag commented 4 weeks ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2023-07-05T19:16:21Z


Yes, that sounds correct.

The path you describe (by quoting me) is one path to a "zombie" queue. I think the other path I identified was when an evaluation completes, but the persister does not hear about it, so it leaves the evaluation in-progress. Queues will then be created whenever the tasker is restarted and those queues will never be removed since the evaluation is no longer enqueued in the broker. Something like that.

Anyway, the main point here is to identify a means to remove the zombies. That is, the queues for which the corresponding job is either complete or cannot be found in the persister. Those queues need to be removed. I'm not sure that the broker will be able to let the tasker know the information it requires to identify those queues.

Anyway, I'm back tomorrow morning and can answer more questions then,

Hank

epag commented 4 weeks ago

Original Redmine Comment Author Name: James (James) Original Date: 2023-07-05T20:17:15Z


While durable queues are generally advisable, because they survive a broker restart, it sounds like we've replaced one problem - message/evaluation loss on node restart - with another problem - conflicting information about the state of an evaluation across different messaging components. Since the broker (with durable queues on a durable exchange and persistent messages) and persister both have a (potentially different) persistent view of an evaluation, there is inherently an opportunity for conflict.

With transient queues in AMQP 0.9.1, the queue will be deleted on node restart, so zombies are impossible. Essentially, the broker cleans up a queue after the consumer connection is dropped. On the other hand, messages will be lost on node restart and durable queues were probably introduced to fix that problem.

We should confirm that durable queues are needed. If they are, then we need a more sophisticated approach for tracking the construction and destruction of those queues, assuming we address this problem at all. The basic principle is that, whatever creates the queue should also destroy it and there would need to be extra safeguards to ensure that happens. For example, it may be necessary to map queues to jobs in the persister so that the tasker (the thing that creates and destroys the queues) can know when a completed job coincides with a queue that shouldn't exist.

That being said, we shouldn't lose sight of zombie queues being the least of many problems. Certainly, loss of evaluation jobs is a worse problem and durable queues were probably introduced to address that. Any attempt to handle zombie queues in an automated way should be weighed against the extra code complexity and whether it is just moving the problem elsewhere. A manual step isn't that hard or too terrible - there is always some admin involved with a web service.

epag commented 4 weeks ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2023-07-06T11:39:23Z


I would have to look through the ticket history to identify why all of the queues were made durable. I know that the message queue itself was durable in order to allow an evaluation to be picked back up after restart. I'm guessing the other queues were made durable to avoid losing information about the jobs, as you said, likely messaged when the tasker was down and unable to watch for those messages.

The manual step for zombie removal is quite easy: just go into the monitor and delete the queues. Given how rarely zombies occur (we haven't seen one in production in a long time), just adding a step during deployment for removing zombies should be sufficient. Having said that, if we can come up with a simple way to remove zombies in-code, that would be better; I'm just not sure how simple it will be.

Anyway, we'll see what Arvin can discover.

Hank

epag commented 4 weeks ago

Original Redmine Comment Author Name: Arvin (Arvin) Original Date: 2023-07-06T13:43:05Z


I have done some research into durable queues to get a better understanding of them. My understanding now is that, by using durable queues, we have addressed what James describes as the more critical issue: the loss of evaluation jobs.

I'm trying to see if we can remove all zombie queues in the code on application startup. Would this be a viable solution?

epag commented 4 weeks ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2023-07-06T13:48:45Z


I think so, yes. It would essentially be automating a step that we could perform manually through the broker monitor. I don't think zombies form often enough to worry about removing them on a routine basis; cleaning up when the tasker is brought up should be enough, in my opinion. And, if I'm proven wrong and we decide it needs to be done more often, we can make that happen later.

My two cents,

Hank

epag commented 4 weeks ago

Original Redmine Comment Author Name: James (James) Original Date: 2023-07-06T13:55:44Z


In principle, yes. In practice, it depends how much effort is involved as to whether it's worthwhile vs. doing manually at deployment time.

epag commented 4 weeks ago

Original Redmine Comment Author Name: Arvin (Arvin) Original Date: 2023-07-06T15:49:00Z


Hank,

I saw that we could check @if (queueDeclareOk.getMessageCount() == 0)@ and, if it is true, treat the queue as a zombie. You mentioned something on the call about using the persister, and I want to know more about that for our case.

epag commented 4 weeks ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2023-07-06T15:59:48Z


We can talk about this in the meeting, but I'm not sure if a message count can be relied on. For example, one zombie currently in staging has 26 unread messages.

What I was referring to is this... If you know the queue name (e.g., "job.5709879514538434097.status"), then you know the job id (e.g., "5709879514538434097"), and you can use that job id to determine if the job is still being tracked (i.e., check the job metadata map to see if it exists) and is incomplete (check the status if the job is found). At startup, any queue for a job that is complete or not being tracked is a zombie. Something like that should handle the cases I know about.
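The check described above (derive the job id from the queue name, then look the job up) could be sketched roughly as follows. This is only an illustration under stated assumptions: @ZombieQueueFinder@, @JobState@, and the plain @Map@ standing in for the job metadata map are hypothetical names, not tasker code. The sketch also assumes that the list of queue names has already been obtained from the broker by some other means, and that per-job queues follow the @job.[job id].[suffix]@ naming seen in this ticket.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of the tasker: flags queues whose job is complete or untracked. */
final class ZombieQueueFinder
{
    /** Hypothetical job states; the real job metadata may represent status differently. */
    enum JobState { IN_QUEUE, IN_PROGRESS, COMPLETED }

    // Assumed naming scheme, based on names such as "job.5709879514538434097.status"
    private static final Pattern JOB_QUEUE = Pattern.compile( "job\\.(\\d+)\\.[A-Za-z]+" );

    /** Extracts the job id from a per-job queue name, or returns empty for any other queue. */
    static Optional<String> parseJobId( String queueName )
    {
        Matcher matcher = JOB_QUEUE.matcher( queueName );
        return matcher.matches() ? Optional.of( matcher.group( 1 ) ) : Optional.empty();
    }

    /** Returns the per-job queues whose job is either not tracked or already complete. */
    static List<String> findZombies( List<String> queueNames, Map<String, JobState> trackedJobs )
    {
        List<String> zombies = new ArrayList<>();

        for ( String queueName : queueNames )
        {
            Optional<String> jobId = parseJobId( queueName );

            if ( !jobId.isPresent() )
            {
                continue; // Not a per-job queue, so leave it alone
            }

            JobState state = trackedJobs.get( jobId.get() );

            if ( state == null || state == JobState.COMPLETED )
            {
                zombies.add( queueName );
            }
        }

        return zombies;
    }

    public static void main( String[] args )
    {
        // Made-up job states for illustration: one completed job and one untracked job
        Map<String, JobState> tracked = new HashMap<>();
        tracked.put( "5709879514538434097", JobState.COMPLETED );

        List<String> queues = Arrays.asList( "job.5709879514538434097.status",
                                             "job.6565187957353905807.exitCode" );

        System.out.println( findZombies( queues, tracked ) );
    }
}
```

Queues that do not match the per-job pattern are skipped rather than flagged, which errs on the side of never deleting a queue the sketch does not understand. Whether a failed job should count as complete is left open here.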

Thanks,

Hank

epag commented 4 weeks ago

Original Redmine Comment Author Name: James (James) Original Date: 2023-07-06T16:21:20Z


Yeah, that's what I meant about mapping/persisting the relationship between jobs and queues. To me, that sounds more complicated than it's worth...