NOAA-OWP / wres

Code and scripts for the Water Resources Evaluation Service

As a developer, if the broker goes down and then comes back up, I want the worker-shims to re-establish a connection to the broker #87

Open epag opened 2 months ago

epag commented 2 months ago

Author Name: Hank (Hank) Original Redmine Issue: 108367, https://vlab.noaa.gov/redmine/issues/108367 Original Date: 2022-09-16 Original Assignee: Hank


Evidence that the worker-shims are not communicating with the broker when it comes back up is provided in #108269-71 or thereabouts. We see the worker-shim continuing to monitor any on-going evaluations, but, when that evaluation completes, it encounters problems messaging the broker and then does not pick up a new job. Instead, I need to @docker container restart deploymentworker?@ to get the worker to start picking up jobs.

Since it is not overly likely that the broker will go down while the tasker and workers stay up, I view this as a normal priority issue. However, if I ever see this in the wild, it will move to high or urgent.

Thanks,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-09-22T17:09:52Z


This is a retry issue. The retry period needs to be long enough to wait for a broker to come back. I would check to see how retries are currently handled. We could implement a heartbeat that reacts when the broker goes down, forcing a connection retry.
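As a rough illustration of the "appropriately long" retry idea, here is a minimal, stdlib-only sketch of a fixed-interval retry loop. The class `RetrySketch`, its method names, and the attempt and delay parameters are all hypothetical; nothing here is taken from the worker-shim or the RabbitMQ client.

```java
import java.util.function.Supplier;

/** Hypothetical helper: retries an action at a fixed interval, up to a maximum number of attempts. */
final class RetrySketch
{
    private RetrySketch()
    {
    }

    static <T> T retry( Supplier<T> action, int maxAttempts, long delayMillis )
    {
        if ( maxAttempts < 1 )
        {
            throw new IllegalArgumentException( "maxAttempts must be at least 1." );
        }

        RuntimeException last = null;

        for ( int attempt = 1; attempt <= maxAttempts; attempt++ )
        {
            try
            {
                // For example, an attempt to open a broker connection
                return action.get();
            }
            catch ( RuntimeException e )
            {
                last = e;
            }

            // Wait before the next attempt, but not after the final one
            if ( attempt < maxAttempts )
            {
                RetrySketch.sleep( delayMillis );
            }
        }

        throw new IllegalStateException( "Gave up after " + maxAttempts + " attempts.", last );
    }

    private static void sleep( long millis )
    {
        try
        {
            Thread.sleep( millis );
        }
        catch ( InterruptedException e )
        {
            // Preserve the interrupt status and stop retrying
            Thread.currentThread().interrupt();
            throw new IllegalStateException( "Interrupted while waiting to retry.", e );
        }
    }
}
```

The total wait is roughly `( maxAttempts - 1 ) * delayMillis`, so covering a broker outage of minutes or hours is a matter of choosing those two numbers.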

Need to examine the worker logs to see what it is seeing on its end. I know it creates a @Channel@, but when the broker goes down, what does that @Channel@ see? How does the tasker react?

Check best practices for RabbitMQ as well.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-09-23T19:12:50Z


Looking at the code...

Here is the code in the worker-shim that pulls messages from the broker job queue:

        // Get work from the queue
        try ( Connection connection = factory.newConnection();
              Channel receiveChannel = connection.createChannel() )
        {
            // Take precisely one job at a time:
            receiveChannel.basicQos( 1 );

            receiveChannel.queueDeclare( RECV_QUEUE_NAME, true, false, false, null );

            BlockingQueue<WresProcess> processToLaunch = new ArrayBlockingQueue<>( 1 );

            JobReceiver receiver = new JobReceiver( receiveChannel,
                                                    wresExecutable,
                                                    processToLaunch );

            receiveChannel.basicConsume( RECV_QUEUE_NAME, false, receiver );

            while ( !Worker.killed )
            {
                LOGGER.info( "Waiting for work..." );
                WresProcess wresProcess = processToLaunch.poll( 2, TimeUnit.MINUTES );

                if ( wresProcess != null )
                {
                    // Launch WRES if the consumer found a message saying so.
                    wresProcess.call();
                    // Tell broker it is OK to get more messages by acknowledging
                    receiveChannel.basicAck( wresProcess.getDeliveryTag(), false );
                }
            }
        }

The @Channel@ is created once and used indefinitely as it asks the queue for more work.

The same thing is noticed in the three @*Messenger@ classes: the channel is created when the messaging thread is initialized. It's never checked again. So, if the broker goes down and comes back up while an evaluation is on-going, that evaluation will stop sending messages. One question that needs to be answered: Does the worker create exception messages when the @Channel@ tries to communicate with the down broker? I think I recall seeing exceptions (could be wrong), but it would be easy enough to double check when the time comes. Just stop the broker during an evaluation and check the logs of the worker-shim.

Regardless, this points to some sort of check of the @Channel@ to determine the status of the broker, and recreating or otherwise instructing the @Channel@ to reestablish a connection if needed. Identifying the best approach will require some research.
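One way to picture the "check, then recreate" idea is a small holder that hands out the current resource and replaces it when it is no longer open. The sketch below is stdlib-only and purely illustrative: `ReconnectingHolder` is a hypothetical class, and the factory and open-check are supplied by the caller. With the RabbitMQ Java client the check could plausibly be `Channel::isOpen`, but whether that is the best approach for the worker-shim is exactly the research mentioned above.

```java
import java.util.Objects;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical holder that recreates a resource (e.g., a channel) when it is found closed. */
final class ReconnectingHolder<T>
{
    private final Supplier<T> factory;
    private final Predicate<T> isOpen;
    private T current;

    ReconnectingHolder( Supplier<T> factory, Predicate<T> isOpen )
    {
        this.factory = Objects.requireNonNull( factory );
        this.isOpen = Objects.requireNonNull( isOpen );
    }

    /** Returns the current resource, creating a new one if none exists or the old one is closed. */
    synchronized T get()
    {
        if ( this.current == null || !this.isOpen.test( this.current ) )
        {
            // The factory may throw if the broker is still down; the caller decides whether to retry
            this.current = this.factory.get();
        }

        return this.current;
    }
}
```

Callers would ask the holder for the resource before each use instead of keeping a reference that was created once at start-up.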

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-09-30T17:22:41Z


I think I might try to tackle this for 6.8, because I think it might be possible to complete it in time for a deployment of 6.8 next week (possibly). #97182 will probably take longer to finalize.

I think resolving this is just a matter of reorganizing how the worker uses the @Channel@ so that it's possible to check on the connection and recreate it if it's disconnected.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-09-30T17:56:01Z


Surely, it can't be that simple:

@factory.setAutomaticRecoveryEnabled(true);@

I added that to the connection factory in @Worker.java@, deployed to -ti, stopped the broker, started the broker, posted a smoketest, and it picked up the job. I need to compare this with what I noticed before (i.e., what caused me to write this ticket).

I've posted below what is reported in the Docker logs for the worker during this experiment.

Hank

===============================================================

[Hank@nwcal-wres-ti02 deployment]$ docker logs deployment_worker_1
2022-09-30T17:49:47.669+0000 [main] INFO wres.worker.Worker - Using broker at host 'broker', vhost 'wres', port '5671'
2022-09-30T17:49:48.069+0000 [main] INFO wres.worker.Worker - Waiting for work...
2022-09-30T17:50:56.609+0000 [RabbitMQ Error On Write Thread] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occurred
java.net.SocketException: Connection reset by peer (Write failed)
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
        at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1304)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
        at java.base/java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:197)
        at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:636)
        at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:134)
        at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:455)
        at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:434)
        at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:915)
        at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:868)
        at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:265)
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
        at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
        at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
        at java.base/java.lang.Thread.run(Thread.java:829)
2022-09-30T17:51:29.650+0000 [AMQP Connection 172.19.254.66:5671] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Caught an exception during connection recovery!
java.net.NoRouteToHostException: No route to host (Host unreachable)
        at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
        at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
        at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
        at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.base/java.net.Socket.connect(Socket.java:609)
        at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:305)
        at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:63)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:623)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:584)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.lambda$addAutomaticRecoveryListener$3(AutorecoveringConnection.java:519)
        at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:817)
        at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:794)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:678)
        at java.base/java.lang.Thread.run(Thread.java:829)
2022-09-30T17:51:37.656+0000 [AMQP Connection 172.19.254.66:5671] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Caught an exception during connection recovery!
java.net.NoRouteToHostException: No route to host (Host unreachable)
        at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
        at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
        at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
        at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.base/java.net.Socket.connect(Socket.java:609)
        at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:305)
        at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:63)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:623)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:584)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.lambda$addAutomaticRecoveryListener$3(AutorecoveringConnection.java:519)
        at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:817)
        at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:794)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:678)
        at java.base/java.lang.Thread.run(Thread.java:829)
2022-09-30T17:51:48.069+0000 [main] INFO wres.worker.Worker - Waiting for work...
2022-09-30T17:52:15.956+0000 [main] INFO wres.worker.WresProcess - Started subprocess Process[pid=52, exitValue="not exited"]
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Started recording 1.

Use jcmd 52 JFR.dump name=1 to copy recording data to file.
2022-09-30T17:52:17.135+0000 INFO Main WRES version 20220930-7930863-dev
... SNIP (the evaluation stdout)

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-09-30T17:56:44Z


I also need to look into the retry rules governing that reconnect process.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-09-30T17:58:58Z


From the description, it refers to #108269-71 and states this:

We see the worker-shim continuing to monitor any on-going evaluations, but, when that evaluation completes, it encounters problems messaging the broker and then does not pick up a new job.

I've already seen it pick up a job after the broker is stopped and restarted.

What I haven't seen is what happens when the broker is stopped during an evaluation. I'll try that now. I also want to see what happens if the broker is down for a while; i.e., minutes or hours instead of seconds.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-09-30T18:32:30Z


Well, it's not quite that simple.

I posted 5 HEFS jobs. While the first two were processing (this is entry machine only), I stopped the broker. I waited a couple minutes and restarted it.

Here is what I see in the worker side:

2022-09-30T18:06:16.680+0000 [RabbitMQ Error On Write Thread] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - An unexpected connection driver error occurred
java.net.SocketException: Connection reset by peer (Write failed)
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:345)
        at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1304)
        at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
        at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
        at java.base/java.io.DataOutputStream.flush(DataOutputStream.java:123)
        at com.rabbitmq.client.impl.SocketFrameHandler.flush(SocketFrameHandler.java:197)
        at com.rabbitmq.client.impl.AMQConnection.flush(AMQConnection.java:636)
        at com.rabbitmq.client.impl.AMQCommand.transmit(AMQCommand.java:134)
        at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:455)
        at com.rabbitmq.client.impl.AMQChannel.quiescingTransmit(AMQChannel.java:434)
        at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:915)
        at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:868)
        at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:265)
        at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
        at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
        at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:719)
        at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:666)
        at java.base/java.lang.Thread.run(Thread.java:829)
2022-09-30T18:07:05.763+0000 [AMQP Connection 172.19.254.66:5671] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Caught an exception during connection recovery!
java.net.NoRouteToHostException: No route to host (Host unreachable)
        at java.base/java.net.PlainSocketImpl.socketConnect(Native Method)
        at java.base/java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:412)
        at java.base/java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:255)
        at java.base/java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:237)
        at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.base/java.net.Socket.connect(Socket.java:609)
        at java.base/sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:305)
        at com.rabbitmq.client.impl.SocketFrameHandlerFactory.create(SocketFrameHandlerFactory.java:60)
        at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:63)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.recoverConnection(AutorecoveringConnection.java:623)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.beginAutomaticRecovery(AutorecoveringConnection.java:584)
        at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.lambda$addAutomaticRecoveryListener$3(AutorecoveringConnection.java:519)
        at com.rabbitmq.client.impl.AMQConnection.notifyRecoveryCanBeginListeners(AMQConnection.java:817)
        at com.rabbitmq.client.impl.AMQConnection.doFinalShutdown(AMQConnection.java:794)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:678)
        at java.base/java.lang.Thread.run(Thread.java:829)
2022-09-30T18:07:10.773+0000 [AMQP Connection 172.19.254.66:5671] ERROR com.rabbitmq.client.impl.ForgivingExceptionHandler - Caught an exception during connection recovery!
java.net.UnknownHostException: broker
...

The worker reports an initial socket exception followed by failed reconnect events every 5 seconds.

I then see evidence that it connected, immediately picked up a job, and noted that the job was already being processed. This entire time, the evaluation was still on-going by the @WresProcess@ that had been created for the original HEFS job.

2022-09-30T18:07:40.891+0000 [pool-1-thread-25] WARN wres.worker.JobReceiver - Job directory /mnt/wres_share/evaluations/wres_job_4741016284156534466 already existed indicating another process started working here.
java.nio.file.FileAlreadyExistsException: /mnt/wres_share/evaluations/wres_job_4741016284156534466
        at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:94)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
        at java.base/sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:389)
        at java.base/java.nio.file.Files.createDirectory(Files.java:690)
        at wres.worker.JobReceiver.handleDelivery(JobReceiver.java:113)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
2022-09-30T18:08:41.418+0000 [pool-5-thread-1] INFO wres.worker.JobOutputMessenger - Found a file related to output /mnt/wres_share/evaluations/wres_job_4741016284156534466/wres_evaluation_ieZI41MHkkuANdavRaJ5EVPJ_DY/evaluation.csv.gz

The on-going evaluation then completes and the worker fails to try to obtain another job:

2022-09-30T18:12:57.745+0000 [main] INFO wres.worker.WresProcess - Subprocess Process[pid=225, exitValue=0] exited 0
2022-09-30T18:12:57.750+0000 [main] ERROR wres.worker.Worker - Unchecked exception while talking to the broker
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
        at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
        at wres.worker.WresProcess.sendResponse(WresProcess.java:250)
        at wres.worker.WresProcess.call(WresProcess.java:181)
        at wres.worker.Worker.main(Worker.java:112)
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
        at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
        at wres.worker.WresProcess.sendResponse(WresProcess.java:250)
        at wres.worker.WresProcess.call(WresProcess.java:181)
        at wres.worker.Worker.main(Worker.java:112)

I'm not sure why it didn't pick up another job, other than it never really reconnected. Or it could be due to exceptions related to the other Channels being created without the recovery flag.

I'm going to modify the code so that all of the Channels created by the worker-shim code include the recovery flag.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-09-30T18:48:05Z


So the recovery setting goes into the first @Channel@ created in @Worker.java@. That @Channel@ is passed down to the @JobReceiver@, which grabs the @Connection@ from it and reuses it to create @Channel@ instances to communicate with the other queues that are created for the WRES process. I guess that means that all of the different @Channel@ instances are going through the same broker @Connection@. Hence, if that recovers, then communication should be initiated again.

However, for the running job, I see no more stdout in the worker log once the broker goes down. I can only assume that wherever the exception is being detected, the worker is sort of abandoning its position and no longer watching the @WresProcess@ that is still running. Perhaps the different messengers are also failing.

On Monday, I think I'll insert some debug lines into the worker code so that I can better follow what happens to the underlying processes when the broker connection is lost.

As before, when I restart the two workers, the evaluations are picked up and processed using the same job id.

That's it for today. I'm going to estimate a total time of 16 hours to resolve this ticket, but it all depends on how quickly I can figure out what happens in the worker when the broker connection is closed, because it's not clear to me at the moment.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T13:33:10Z


Taking a closer look at what happens under the hood...

There is an on-going evaluation that typically takes about an hour and a half to complete. I can see the queues in the broker, each with an appropriate routing key linking to it. That's all typical.

On Friday, when I stopped the broker and restarted it, I saw evidence that it at least attempted to create another process to handle the job. The evidence was that an attempt was made to create the directory, again. This implies that another call to @handleDelivery@ in @JobReceiver@ was made. This further implies that the job message, which had been delivered but not yet acknowledged, was requeued when the broker stopped and restarted, so the broker thought it had to deliver the job again.
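The redelivery is consistent with manual acknowledgements: in the worker loop quoted earlier, @basicAck@ is only called after @wresProcess.call()@ returns, so the job message is still unacknowledged for the whole evaluation. The toy, in-memory model below (plain Java, not RabbitMQ code; `ToyAckQueue` and its methods are hypothetical) sketches that behavior: a message that was delivered but never acknowledged becomes ready again when the consumer's connection is lost.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model of a queue with manual acknowledgements. Illustrative only. */
final class ToyAckQueue
{
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    void publish( String message )
    {
        this.ready.addLast( message );
    }

    /** Delivers the next ready message and returns its delivery tag, or -1 if nothing is ready. */
    long deliver()
    {
        String message = this.ready.pollFirst();

        if ( message == null )
        {
            return -1;
        }

        long tag = this.nextTag++;
        this.unacked.put( tag, message );
        return tag;
    }

    /** Acknowledges a delivery, so it will never be redelivered. */
    void ack( long tag )
    {
        this.unacked.remove( tag );
    }

    /** Models a lost consumer connection: every unacknowledged message becomes ready again. */
    void connectionLost()
    {
        this.ready.addAll( this.unacked.values() );
        this.unacked.clear();
    }

    int readyCount()
    {
        return this.ready.size();
    }
}
```

In this model, a job that is delivered, still running, and then hit by `connectionLost()` is handed out a second time on the next `deliver()`, which matches the second @handleDelivery@ call inferred above.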

The @handleDelivery@ method not only tries to create the output directory, but also constructs the @WresProcess@. The @WresProcess@ is then placed in a queue in the worker-shim and is picked up when the active process completes. At least, I think. I might need to experiment with that. Essentially, what I need to see is if the evaluation is completed a second time after the first run completes.

As for that first run, I don't see any further messaging from the WRES process, which implies that the job's queues in the broker are not receiving the posted messages for the ongoing job when the broker is stopped/started. Either that or the tasker is getting the messages from the queue. Hmmm...

I'm going to post an HEFS evaluation, stop the broker, and then restart it.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T13:49:30Z


When the broker comes up, the workers clearly receive re-delivery of the jobs that are already running. Again, this message is proof of that:

2022-10-03T13:36:33.270+0000 [pool-1-thread-17] WARN wres.worker.JobReceiver - Job directory /mnt/wres_share/evaluations/wres_job_2475706965009917405 already existed indicating another process started working here.
java.nio.file.FileAlreadyExistsException: /mnt/wres_share/evaluations/wres_job_2475706965009917405
        at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:94)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
        at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
        at java.base/sun.nio.fs.UnixFileSystemProvider.createDirectory(UnixFileSystemProvider.java:389)
        at java.base/java.nio.file.Files.createDirectory(Files.java:690)
        at wres.worker.JobReceiver.handleDelivery(JobReceiver.java:113)
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:111)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

In fact, the worker-shim processing the HEFS evaluation, in this case, received the delivery of the long AHPS evaluation that is being processed by the other worker-shim. As can be seen by this code snippet, the @WresProcess@ is still being created even though the directory was found to exist, which is expected and reasonable behavior:

        try
        {
            Files.createDirectory( outputPath, fileAttribute );
            LOGGER.debug( "Created job directory {}.", outputPath );
        }
        catch ( FileAlreadyExistsException faee )
        {
            LOGGER.warn( "Job directory {} already existed indicating another process started working here.",
                         outputPath, faee );
        }
        catch ( IOException ioe )
        {
            throw new IllegalStateException( "Failed to create directory for job "
                                             + jobId, ioe );
        }

        // Translate the message into a command
        ProcessBuilder processBuilder = createBuilderFromMessage( body,
                                                                  outputPath,
                                                                  jobIdString );
        // Set up the information needed to launch process and send info back
        WresProcess wresProcess = new WresProcess( processBuilder,
                                                   properties.getReplyTo(),
                                                   properties.getCorrelationId(),
                                                   this.getChannel().getConnection(),
                                                   envelope,
                                                   outputPath );
        // Share the process information with the caller
        this.getProcessToLaunch().offer( wresProcess );

However, I note that the @getProcessToLaunch()@ returns a @BlockingQueue@ declared with a capacity of 1. Since the currently running process is executing, that queue has nothing in it at the time the new evaluation is delivered. Thus, I think the evaluation delivered will be processed once the current evaluation completes.
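To make the capacity-1 reasoning concrete, here is a small stdlib-only sketch of how `ArrayBlockingQueue.offer` and `poll` behave at that capacity. The class name `HandOffSketch` and the job names are made up for illustration; only the `java.util.concurrent` calls are real.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Illustrates offer/poll on a capacity-1 queue, like the one returned by getProcessToLaunch(). */
final class HandOffSketch
{
    private HandOffSketch()
    {
    }

    static List<String> demo()
    {
        List<String> log = new ArrayList<>();
        BlockingQueue<String> processToLaunch = new ArrayBlockingQueue<>( 1 );

        // The queue is empty while a job runs, so a redelivered job is accepted
        log.add( "offer redelivered-job: " + processToLaunch.offer( "redelivered-job" ) );

        // A further offer is rejected without blocking or throwing; offer simply returns false
        log.add( "offer another-job: " + processToLaunch.offer( "another-job" ) );

        // The consumer later picks up the queued job; a second poll finds nothing
        log.add( "poll: " + processToLaunch.poll() );
        log.add( "poll: " + processToLaunch.poll() );

        return log;
    }
}
```

Calling `HandOffSketch.demo()` returns the four entries `offer redelivered-job: true`, `offer another-job: false`, `poll: redelivered-job` and `poll: null`. Since the quoted @handleDelivery@ code ignores the return value of @offer@, any delivery that arrived while the queue was already full would be dropped silently.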

Let's see if that happens. The HEFS evaluation should be the first to complete,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T13:59:28Z


That HEFS evaluation had this process information:

98      106876  25153 99 13:35 ?        00:11:18 java -Xms2560m -Xmx2560m -XX:MaxDirectMemorySize=512m -XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC -Duser.timezone=UTC -Djava.util.logging.config.class=wres.system.logging.JavaUtilLoggingRedirector -Ducar.unidata.io.http.maxReadCacheSize=200000 ...

That was several minutes ago. The evaluation should have completed by now. What do I see in that worker that was handling it? Communication with the broker fails:

2022-10-03T13:40:26.626+0000 [pool-15-thread-1] INFO wres.worker.JobOutputMessenger - Found a file related to output /mnt/wres_share/evaluations/wres_job_2475706965009917405/wres_evaluation_L-oESR8dPYG8lm95PPwC1GVfGAA/evaluation.csv.gz
2022-10-03T13:44:43.528+0000 [main] INFO wres.worker.WresProcess - Subprocess Process[pid=2643, exitValue=0] exited 0
2022-10-03T13:44:43.533+0000 [main] ERROR wres.worker.Worker - Unchecked exception while talking to the broker
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
        at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
        at wres.worker.WresProcess.sendResponse(WresProcess.java:250)
        at wres.worker.WresProcess.call(WresProcess.java:181)
        at wres.worker.Worker.main(Worker.java:112)

The @WresProcess@ attempts to send a message to the broker, fails, and does not catch the exception, which kicks it up to the @Worker@ @main@. What I can't figure out is, why does the @Worker@ process still show up in a @ps -ef | grep java@? That exception is at the end of the @main@ method, implying that the method left the eternal loop within it. Shouldn't that result in that process exiting? What I do know is that that process isn't generating any activity or log messages anymore. Maybe it's some Docker magic that keeps it running? Let me see,

Hank

So

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T14:12:35Z


I'm at a bit of a loss on the behavior. The exception, posted in my previous comment, is clear evidence that the process is at the end of the @main@:

    public static void main( String[] args )
            throws IOException, TimeoutException, InterruptedException
    {
        if ( args.length != 1 )
        {
            throw new IllegalArgumentException( "First arg must be an executable wres path." );
        }
...
        // Get work from the queue
        try ( Connection connection = factory.newConnection();
              Channel receiveChannel = connection.createChannel() )
        {
...
        }
        catch ( IOException | TimeoutException | InterruptedException e )
        {
            String message = "Checked exception while talking to the broker";
            LOGGER.error( message, e );
            throw e;
        }
        catch ( RuntimeException re )
        {
            String message = "Unchecked exception while talking to the broker";
            LOGGER.error( message, re );
            throw re;
        }
    }
}

Yet, I still see the original @Worker@ executing in the container:

UID         PID   PPID  C STIME TTY          TIME CMD
wres_do+      1      0  0 Sep30 ?        00:02:14 java -Xms64m -Xmx64m -XX:+HeapDumpOnOutOfMemoryError -Dwres.broker=broker -Dcom.redhat.fips=false -Djava.io.tmpdir=/mnt/wres_share/evaluations -XX:HeapDumpPath=/mnt/wres_share/heap_dumps/worker-shim -XX:OnOutOfMemoryError=mv /mnt/wres_share/heap_dumps/worker-shim/java_pid%p.hprof /mnt/wres_share/heap_dumps/worker-shim/java_pid%p_`hostname`.hprof; chmod 775 /mnt/wres_share/heap_dumps/worker-shim/java_pid%p_`hostname`.hprof -classpath /opt/wres-worker-20220916-0025af9-dev/lib/wres-worker-20220916-0025af9-dev.jar:/opt/wres-worker-20220916-0025af9-dev/lib/wres-messages-20220907-e31c553-dev.jar:/opt/wres-worker-20220916-0025af9-dev/lib/amqp-client-5.15.0.jar:/opt/wres-worker-20220916-0025af9-dev/lib/logback-classic-1.4.0.jar:/opt/wres-worker-20220916-0025af9-dev/lib/slf4j-api-2.0.0.jar:/opt/wres-worker-20220916-0025af9-dev/lib/protobuf-java-3.20.1.jar:/opt/wres-worker-20220916-0025af9-dev/lib/logback-core-1.4.0.jar:/opt/wres-worker-20220916-0025af9-dev/lib/conf wres.worker.Worker /usr/bin/wres

Note the @STIME@. How can the process remain running when its @main@ is done? Maybe I'm not understanding multithreading in Java. If there are multiple threads running, but the @main@ is done, what happens to the process?

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T14:14:51Z


Oh, I see this in a Google search:

"A standard program (with only one thread) stops running when the main thread finishes its work. The main thread finishes executing, the program terminates, and the JVM frees its memory." "If we launch a child thread, the program keeps running, even if the main thread finishes."

So there are still threads, hence the @Worker@ still runs even if the @main@ is done.
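To make that concrete, here is a minimal, self-contained sketch (not the worker-shim code; the class and thread names are made up for illustration). The @main@ returns right away, yet the process lingers for about two seconds because a non-daemon thread is still alive. In the worker-shim, the equivalent would presumably be one of the messaging or executor threads, though that is an assumption until a thread dump confirms it.

```java
class LingeringThreadDemo
{
    /** Starts a thread that sleeps for a while, standing in for a consumer or pool thread. */
    static Thread startBackgroundThread( long sleepMillis, boolean daemon )
    {
        Thread thread = new Thread( () -> {
            try
            {
                Thread.sleep( sleepMillis );
                System.out.println( "Background thread finished." );
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
            }
        }, "lingering-thread" );

        // A non-daemon thread keeps the JVM alive; a daemon thread does not.
        thread.setDaemon( daemon );
        thread.start();
        return thread;
    }

    /** Waits for the thread to end; returns true if it ended within the timeout. */
    static boolean waitFor( Thread thread, long timeoutMillis )
    {
        try
        {
            thread.join( timeoutMillis );
        }
        catch ( InterruptedException e )
        {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    public static void main( String[] args )
    {
        startBackgroundThread( 2000, false );
        // main ends here, but the process only exits once the thread above has finished.
        System.out.println( "main is done." );
    }
}
```

Passing @true@ for the daemon flag instead lets the JVM exit as soon as the @main@ returns, without waiting for the background thread.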

This is definitely an ugly situation. So the question is, what is the best way to handle this?

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T14:30:08Z


I think what I actually want is for the workers to just stop while the broker is down and restart after the broker comes back. The broker going down without the workers also going down, and while evaluations are on-going (I confirmed it can handle it if no evaluations are on-going) should be a rare event. Thus, I'm fine with it behaving in a manner identical to the entire service being restarted. Furthermore, the solution will likely be much simpler than attempting to make it so that all of the @Connection@ and @Channel@ instances used by the worker-shim are properly handled while the broker is down.

Would it be possible to handle this via Docker? That is, tell Docker to look for the broker, and, if it does not exist, stop the worker. On the workers-only machine, it may need to check for broker connectivity, in general. Is that possible?

Let me investigate,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-03T14:40:06Z


There should not be any non-daemon threads running at the end of the main thread. If there are, that is a bug to fix. Only non-daemon threads will prevent the jvm from exiting on completion.

You should perform a thread dump to see which non-daemon threads are still running. It could be an open resource or something like that. edit: or a thread pool that is not shut down cleanly.

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T14:50:17Z


There are several threads that could be still running. I'll see what a thread dump tells me.

Regardless, that will only allow the Worker to cleanly stop when the exception occurs. It won't prevent the exception. That would require code changes to harden the worker-shim against a broker failure, or getting the worker-shims to stop when the broker stops... which is likely not possible.

I need to prepare an agenda for today's WRDS call, first, and I also need to report on the experiment in #95867, so I'll probably revisit this later this afternoon.

Thanks,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-03T14:52:57Z


We don't want the worker shim to stop when the broker becomes unavailable. We want the worker shim to retry connections until it succeeds.

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T16:41:41Z


From #95867, I need to move my experiments with the tasker/worker/broker to the -dev COWRES. First, let me get a thread dump from the worker containers,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T17:00:53Z


James,

I was hoping to use @jstack@ to get a thread dump, but it's not available on the staging machines. It is available on the -ti01, but I don't think that will do me any good.

Is there another way to get a thread stack for the worker-shim process?

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T17:27:02Z


@kill -3@ is supposed to generate a thread dump, but I appear to not be able to kill the process; the command does nothing from within the container.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T17:29:41Z


Presumably, docker wouldn't react well to me killing the process at the heart of a container within a container, so it doesn't let me. Makes sense. :)

Still trying to find a reasonable way to get the thread dump,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T17:37:49Z


On-line, most websites refer to either @jstack@ or @kill -3@, neither of which appears to be an option. The java on the -ti02 and in the container does not include @jstack@, and, in fact, appears very stripped down. When I attempted to run a version of @jstack@ copied from the -ti01, it failed due to some missing shared libraries. The COWRES machines aren't set up to run @jstack@.

I think I'm out of luck getting a thread dump on the worker shim processes that are still up despite the main threads ending.
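One fallback that needs neither @jstack@ nor signals would be for the process to report its own threads, for example by logging them just before the @main@ rethrows. The following is only a sketch under that assumption (it requires a code change and a redeploy); the @ThreadDumper@ class and its method name are hypothetical, while @Thread.getAllStackTraces()@ is standard Java.

```java
import java.util.Map;
import java.util.StringJoiner;

class ThreadDumper
{
    /** Describes every live non-daemon thread other than the calling thread. */
    static String dumpNonDaemonThreads()
    {
        StringJoiner dump = new StringJoiner( System.lineSeparator() );

        for ( Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet() )
        {
            Thread thread = entry.getKey();

            // Daemon threads cannot keep the JVM alive, so they are skipped.
            if ( thread.isDaemon() || thread == Thread.currentThread() )
            {
                continue;
            }

            dump.add( "\"" + thread.getName() + "\" state=" + thread.getState() );

            for ( StackTraceElement frame : entry.getValue() )
            {
                dump.add( "        at " + frame );
            }
        }

        return dump.toString();
    }

    public static void main( String[] args ) throws InterruptedException
    {
        Thread lingering = new Thread( () -> {
            try
            {
                Thread.sleep( 1000 );
            }
            catch ( InterruptedException e )
            {
                Thread.currentThread().interrupt();
            }
        }, "lingering-thread" );
        lingering.start();

        System.out.println( dumpNonDaemonThreads() );
        lingering.join();
    }
}
```

In the worker-shim, the natural place to call something like this would be the two @catch@ blocks at the end of the @main@, passing the result to @LOGGER.error@ before the rethrow.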

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T17:56:34Z


I can always recreate the issue later. For now, I'm going to restart the worker containers so that they can finish up the evaluations that were on-going. I'll also look at the code to see if I can spot how to make it so that the Channels attempt to reconnect after the broker comes up.

What I find curious is that the worker-shim is able to receive delivery of jobs from the broker, which means that there is an open line of communication between the broker and worker-shim after restarting the broker. We just need to make sure other lines of communication are re-established, as well.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-03T18:21:56Z


Sorry, I was in a meeting. I generally use VisualVM to get a thread dump. The @docker kill@ should relay a kill signal to the contained process. I think you can even send a specific signal, like @docker kill --signal=3 my_container@, but I'm not sure whether that will do what you want. Fortunately, I don't have these issues with software availability etc. There is an option to create thread dumps within a jfr too, but we don't have jfr running on the worker shim, only the worker.

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-03T19:04:42Z


Got it. Thanks.

I was able to deploy the version of the WRES used in this ticket to the -dev. Good.

All that's left is to wait for the COWRES to complete its on-going evaluations. Or I could stop it, remove the .aof, and clear out the broker queues. Think I'd rather wait, since I only have an hour of work remaining.

Tomorrow, I'll come back to this issue using the -dev COWRES, and let #95867 experiments begin in staging.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T12:36:36Z


Using the -dev COWRES, I posted one HEFS job, so that only one worker is busy, and then stopped the broker. Upon restarting, I can now see that the previously idle worker picked up that one HEFS job again, so that both workers are working on it. Not good. I assume the worker that picked up the original run will bomb once that evaluation process completes, as it did in staging, yesterday, but it's not done yet.

To summarize the problems:

  1. After the broker goes down, when it restarts, it delivers the queued messages, again, forgetting that a message has already been pushed out. We want that behavior for a system restart, but we don't want it when only the broker goes down. Perhaps there is a way for the broker to detect that a message is already being processed after the restart?

  2. If an evaluation is on-going when a broker goes down and comes up, then that worker-shim fails to auto-recover the connection and crashes when a @Channel@ is opened to send the evaluation complete message. Or perhaps it is auto-recovering, but the @Channel@ is just not being created. I don't know which. By crash, I mean what I reported yesterday, that an unchecked exception is thrown causing the worker-shim main to exit, essentially leaving it dead in the water.

Lastly, the second attempt at the HEFS evaluation appears to be stepping on the toes of the first attempt, because I'm seeing messages like this:

2022-10-04T12:32:48.749+0000 WARN DatabaseTimeSeriesIngester Failed to ingest a time-series from (Disposition: XML_PI_TIMESERIES; URI: file:///home/ISED/wres/wresTestData/issue92087/inputs/ASEN6HUD_Precipitation_HEFS_Baseline_Validation.tgz/null/clone_hefs_hud85/EVSdata/198501161200_ASEN6HUD_MEFP_FMAP.xml; Type: ENSEMBLE_FORECASTS; Orientation: RIGHT) on attempt 1 of 10. Continuing to retry until the maximum retry count of 10 is reached. There are 9 attempts remaining.
2022-10-04T12:32:48.762+0000 INFO DatabaseTimeSeriesIngester Succesfully ingested a time-series from (Disposition: XML_PI_TIMESERIES; URI: file:///home/ISED/wres/wresTestData/issue92087/inputs/ASEN6HUD_Precipitation_HEFS_Baseline_Validation.tgz/null/clone_hefs_hud85/EVSdata/198501161200_ASEN6HUD_MEFP_FMAP.xml; Type: ENSEMBLE_FORECASTS; Orientation: RIGHT) after 1 retries.
2022-10-04T12:32:48.807+0000 WARN DatabaseTimeSeriesIngester Failed to ingest a time-series from (Disposition: XML_PI_TIMESERIES; URI: file:///home/ISED/wres/wresTestData/issue92087/inputs/ASEN6HUD_Precipitation_HEFS_Baseline_Validation.tgz/null/clone_hefs_hud85/EVSdata/198501171200_ASEN6HUD_MEFP_FMAP.xml; Type: ENSEMBLE_FORECASTS; Orientation: RIGHT) on attempt 1 of 10. Continuing to retry until the maximum retry count of 10 is reached. There are 9 attempts remaining.
2022-10-04T12:32:48.819+0000 INFO DatabaseTimeSeriesIngester Succesfully ingested a time-series from (Disposition: XML_PI_TIMESERIES; URI: file:///home/ISED/wres/wresTestData/issue92087/inputs/ASEN6HUD_Precipitation_HEFS_Baseline_Validation.tgz/null/clone_hefs_hud85/EVSdata/198501171200_ASEN6HUD_MEFP_FMAP.xml; Type: ENSEMBLE_FORECASTS; Orientation: RIGHT) after 1 retries.

Both worker processes are reading data from the same .tgz file using the new ingest mechanism, not that that necessarily has anything to do with it. Ingest is taking a long time, so neither process is complete yet.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T12:46:52Z


Right, that ingest behavior is expected when multiple clients are attempting to ingest the same thing at the same time. What you're witnessing there is in-band retries, which is one of the improvements associated with #99811 (previously, it would delay the retries until completion, potentially leading to an oome as the series to retry wait in main memory). edit: and, yes, it will be significantly slower to ingest the same things at the same time. edit2: and you shouldn't think of it as "stepping on toes", it is an orderly behavior that will eventually see both evaluations succeed and you will see two sub-directories, with separate evaluation ids, inside the outer job dir.

Regarding (1), that's a tricky one and will require some thought. Essentially, a worker shim that is processing a job when the broker dies needs to reclaim that job from the broker. The mechanism used with the graphics clients is that a graphics client claims a job and no graphics client can claim a claimed job. A job remains claimed until it reports completion, whether success or fail (including timed out).

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T12:56:55Z


You may need to adopt something similar to the way the stats messaging works, although that is undoubtedly more complicated than the way the tasker/workers are currently set-up.

The way it works for format subscribers is that a little dance takes place, initially. The core app (read: tasker in your case) advertises jobs and associated work (formats). The available graphics clients offer to do jobs (read: worker shim in your case). A little dance takes place and the core app decides on one client and then advertises that it has accepted that client. Then all clients know, including the one that accepted. Until that client has reported success or fail, it owns that job. One failure mode is a client timeout relative to regular progress updates, so no job lasts forever if the client becomes unresponsive.
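The ownership rule at the heart of that dance can be sketched in a few lines. This is not the stats messaging code; @JobClaims@ and its method names are hypothetical, and the sketch keeps claims in memory only. Where such a registry would live for the tasker/workers (and how it would survive a restart or enforce the progress timeout) is left open.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class JobClaims
{
    // Maps a job identifier to the identifier of the worker that owns it.
    private final ConcurrentMap<String, String> ownerByJob = new ConcurrentHashMap<>();

    /** Claims a job; succeeds only if the job is unclaimed or already owned by this worker. */
    boolean claim( String jobId, String workerId )
    {
        String existing = this.ownerByJob.putIfAbsent( jobId, workerId );
        return existing == null || existing.equals( workerId );
    }

    /** Releases a job on success, failure or timeout; only the owner can release it. */
    boolean release( String jobId, String workerId )
    {
        return this.ownerByJob.remove( jobId, workerId );
    }

    /** Returns the current owner of the job, if any. */
    Optional<String> owner( String jobId )
    {
        return Optional.ofNullable( this.ownerByJob.get( jobId ) );
    }

    public static void main( String[] args )
    {
        JobClaims claims = new JobClaims();
        System.out.println( claims.claim( "job-1", "worker-A" ) );   // first claim wins
        System.out.println( claims.claim( "job-1", "worker-B" ) );   // redelivery to another worker is refused
        System.out.println( claims.claim( "job-1", "worker-A" ) );   // the owner can reclaim after a broker restart
    }
}
```

With a rule like this, a job redelivered after a broker restart would be refused by the idle worker and reclaimed by the worker that is already processing it.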

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T13:15:24Z


Thanks for the tips on (1). I'm investigating (2), first, but am not optimistic this is going to be an easy solution.

The worker that was processing the evaluation when the broker stopped/started was left in a bad state, as expected. Here are the messages at the end of the docker logs:

2022-10-04T12:34:02.160+0000 [pool-3-thread-1] WARN wres.worker.JobOutputMessenger - Stopped watching for output early because sun.nio.fs.LinuxWatchService$LinuxWatchKey@3b1b9a3 reset was invalid.
2022-10-04T12:34:05.873+0000 [main] INFO wres.worker.WresProcess - Subprocess Process[pid=40, exitValue=5] exited 5
2022-10-04T12:34:05.882+0000 [main] ERROR wres.worker.Worker - Unchecked exception while talking to the broker
com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
        at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
        at wres.worker.WresProcess.sendResponse(WresProcess.java:250)
        at wres.worker.WresProcess.call(WresProcess.java:181)
        at wres.worker.Worker.main(Worker.java:112)
Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
        at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
        at wres.worker.WresProcess.sendResponse(WresProcess.java:250)
        at wres.worker.WresProcess.call(WresProcess.java:181)
        at wres.worker.Worker.main(Worker.java:112)

The other worker, which picked up the evaluation after the restart, looks to be in a good state and is waiting for work.

Bottom line, even though I see the @Connection@ recovery attempts in the worker's logs, it does not appear as though the recovery is successful. Either that or we are not initializing our @Channel@ instances using the @Connection@ in a way that allows for recovery. I'll do some web searching to see if there is something specific we have to do with a @Channel@ to recover it.

Upping the estimated time. Thanks,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T13:31:12Z


Hank, I am pretty sure a channel is a transient thing. A channel is like a lightweight connection within a connection used for multiplexing over a single tcp connection. When the connection dies, all channels die and they should not recover. I think you want tcp connection retries and new channels when a connection is established.

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T13:38:10Z


The documentation says it's supposed to re-open channels:

https://www.rabbitmq.com/api-guide.html#:~:text=RabbitMQ%20Java%20client%20supports%20automatic,%2C%20bindings%2C%20and%20consumers).

If it doesn't re-open channels, then our current strategy virtually everywhere of creating a @Channel@ in a @try@ and leaving it open until the evaluation completes, potentially for hours, needs to be overhauled. That's why I'm hoping it will just re-open per the documentation. Otherwise, this is a pretty big change.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T13:45:46Z


I think this may be semantics. As I say, I'm pretty sure a channel is supposed to be a transient thing and hence "re-opening" means opening anew.

You should also read the limitations of the strategy in your docs above as they are probably relevant here, notably:

When a connection is closed by the application via the Connection.Close method, connection recovery will not be initiated.

In other words, if you have a try-with-resources and this close is an @AutoCloseable::close@, then I guess you won't get retries. That's what it sounds like. There is also this, but it is not relevant here, I think:

If initial client connection to a RabbitMQ node fails, automatic connection recovery won't kick in. Applications developers are responsible for retrying such connections, logging failed attempts, implementing a limit to the number of retries and so on.
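To illustrate the first limitation: a try-with-resources always calls @close@ on its resource when the block is left, including when an exception propagates out of it. The sketch below uses a hypothetical @FakeConnection@ rather than the RabbitMQ @Connection@, so it only shows the language behavior, not what the client library does afterwards.

```java
class TryWithResourcesDemo
{
    /** Hypothetical stand-in for a connection; it only records whether close() was called. */
    static class FakeConnection implements AutoCloseable
    {
        private boolean closed = false;

        boolean isClosed()
        {
            return this.closed;
        }

        @Override
        public void close()
        {
            this.closed = true;
        }
    }

    /** Uses the connection in a try-with-resources and fails part-way through. */
    static void useAndFail( FakeConnection connection )
    {
        try ( FakeConnection resource = connection )
        {
            throw new IllegalStateException( "Simulated failure while talking to the broker" );
        }
    }

    public static void main( String[] args )
    {
        FakeConnection connection = new FakeConnection();

        try
        {
            useAndFail( connection );
        }
        catch ( IllegalStateException e )
        {
            System.out.println( "Caught: " + e.getMessage() );
        }

        // The application itself closed the connection on the way out of the try block.
        System.out.println( "Closed by the application: " + connection.isClosed() );
    }
}
```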

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T13:57:02Z


When a connection is closed by the application via the Connection.Close method, connection recovery will not be initiated.

Saw that too. That's one of the things I have to figure out. However, there is evidence that it is attempting to recover, which is why I don't think this is coming into play.

The first failure appears to consistently be when the evaluation process completes. I'll focus my attention there and see if I can fix that failure,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T14:08:40Z


Hank wrote:

However, there is evidence that it is attempting to recover, which is why I don't think this is coming into play.

If there is evidence of recovery attempts, then perhaps the recovery settings are not adequate for the downtime experienced. I would expect something like reconnections with exponential back-off for a cycle lasting for several tens of minutes at least. However, I think what your exception here points to:

Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: connection is already closed due to connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
        at com.rabbitmq.client.impl.AMQConnection.ensureIsOpen(AMQConnection.java:175)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:615)
        at wres.worker.WresProcess.sendResponse(WresProcess.java:250)
        at wres.worker.WresProcess.call(WresProcess.java:181)
        at wres.worker.Worker.main(Worker.java:112)

is that the broker terminated all client connections and that rabbitmq perhaps cannot recover easily from broker failures, only network failures. In other words, the recovery is network-failure-oriented and assumes that the broker is still running.

This thread seems to point to the same thing and provides a suggestion:

https://stackoverflow.com/questions/29760670/rabbitmq-connection-recovery-mechanism
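For reference, the kind of reconnection cycle with exponential back-off mentioned above could look something like the sketch below. It is generic Java and not tied to the RabbitMQ client: @BackoffRetry@ and its parameters are hypothetical, and the action passed in would be whatever opens the connection. The delays double on each attempt up to a cap, so a few dozen attempts can span tens of minutes.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

class BackoffRetry
{
    /** Delay before retrying after the given attempt (1-based): initial * 2^(attempt-1), capped at max. */
    static Duration delayFor( int attempt, Duration initial, Duration max )
    {
        if ( attempt < 1 )
        {
            throw new IllegalArgumentException( "The attempt number must be at least 1." );
        }

        long initialMillis = initial.toMillis();
        long maxMillis = max.toMillis();
        long factor = 1L << Math.min( attempt - 1, 30 );

        // Comparing by division applies the cap and avoids overflow in the multiplication.
        if ( initialMillis > maxMillis / factor )
        {
            return max;
        }

        return Duration.ofMillis( initialMillis * factor );
    }

    /** Calls the action until it succeeds or the attempts run out, sleeping between attempts. */
    static <T> T retry( Callable<T> action, int maxAttempts, Duration initial, Duration max )
            throws Exception
    {
        if ( maxAttempts < 1 )
        {
            throw new IllegalArgumentException( "At least one attempt is required." );
        }

        Exception last = null;

        for ( int attempt = 1; attempt <= maxAttempts; attempt++ )
        {
            try
            {
                return action.call();
            }
            catch ( InterruptedException e )
            {
                // Do not retry when asked to stop.
                throw e;
            }
            catch ( Exception e )
            {
                last = e;

                if ( attempt < maxAttempts )
                {
                    Thread.sleep( delayFor( attempt, initial, max ).toMillis() );
                }
            }
        }

        throw last;
    }
}
```

With an initial delay of one second and a cap of one minute, the waits would run 1s, 2s, 4s, 8s, 16s, 32s and then 60s for every later attempt.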

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T14:11:21Z


Actually, that thread basically sounds identical to your problem, even the misleading connection retries.

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T14:18:07Z


Good find. Thanks!

Someone recommended a "shutdown-listener approach", which I'll look up, but it does sound like a pretty big change to fix this. Perhaps rather than starting the channels in multiple different places, I should have a single central point of control for the connection and associated channels.

First, let me read about the shutdown-listener approach,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T14:23:08Z


So from the stack overflow,

The only way to work around this problem is to register a ShutDownListener and re-initialize the rabbit mq connection factory, connection, and the channels.

I need to listen for the ShutDownListener, start retrying a connection repeatedly, and then, once established, re-establish the connection and recreate the channels. Something like that.

If messages are attempted in the mean time, the default behavior of RabbitMQ when a Channel can't talk to the broker is to just dispose of such messages, I believe. I can implement that easily in a central control point by just logging that a message was attempted and discarded because the broker is not available. Or I can queue up the messages with the risk of running out of memory. Hmmm...
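A middle ground between discarding everything and queueing without limit would be a bounded buffer that drops the oldest message once it is full and keeps count of what was dropped. The sketch below is only that idea in isolation: @BoundedOutbox@ is a hypothetical name, messages are plain strings for simplicity, and the sender passed to @flush@ stands in for whatever publishes to the broker once the connection is back.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

class BoundedOutbox
{
    private final Deque<String> pending = new ArrayDeque<>();
    private final int capacity;
    private long discarded = 0;

    BoundedOutbox( int capacity )
    {
        if ( capacity < 1 )
        {
            throw new IllegalArgumentException( "The capacity must be at least 1." );
        }

        this.capacity = capacity;
    }

    /** Buffers a message while the broker is down, dropping the oldest message when full. */
    synchronized void add( String message )
    {
        if ( this.pending.size() == this.capacity )
        {
            this.pending.removeFirst();
            this.discarded++;
        }

        this.pending.addLast( message );
    }

    /** Sends the buffered messages in order; a message is removed only after the sender returns. */
    synchronized int flush( Consumer<String> sender )
    {
        int sent = 0;

        while ( !this.pending.isEmpty() )
        {
            sender.accept( this.pending.peekFirst() );
            this.pending.removeFirst();
            sent++;
        }

        return sent;
    }

    /** Returns the number of messages dropped because the buffer was full. */
    synchronized long discarded()
    {
        return this.discarded;
    }
}
```

The capacity bounds the memory used, and the discard count gives something concrete to log when the broker returns.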

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T14:25:00Z


I think the upshot is that the wrapped client (worker shim) is responsible for reconnection attempts when the broker shuts down, not the rabbitmq client wrapper. In other words, you can register a shutdown listener on the client side and when that hook is engaged, you can retry connections. That way, the client doesn't except/terminate when the broker terminates. A bit fugly, tbh. This is handled better by an activemq client.

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T14:28:43Z


Hank wrote:

I need to listen for the ShutDownListener, start retrying a connection repeatedly, and then, once established, re-establish the connection and recreate the channels. Something like that.

Sounds right.

Hank wrote:

If messages are attempted in the mean time, the default behavior of RabbitMQ when a Channel can't talk to the broker is to just dispose of such messages, I believe. I can implement that easily in a central control point by just logging that a message was attempted and discarded because the broker is not available. Or I can queue up the messages with the risk of running out of memory. Hmmm...

Hmmm, that sounds weird. I would expect blocking behavior on the client side until a connection is re-established, so no messages sent or disposed of. On the broker side, I would expect all messages to be persisted and available on recovery. There shouldn't be any need for persistence on the client side until we need to worry about recovery of evaluations-in-progress, which may never happen (much, much harder and requires significant work in the core app too, plus local cluster server mode on each worker).

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T14:38:13Z


Hmmm, that sounds weird. I would expect blocking behavior on the client side until a connection is re-established, so no messages sent or disposed of. On the broker side, I would expect all messages to be persisted and available on recovery. There shouldn't be any need for persistence on the client side until we need to worry about recovery of evaluations-in-progress, which may never happen (much, much harder and requires significant work in the core app too, plus local cluster server mode on each worker).

The "until we need to worry about recovery of evaluations-in-progress" is the problem I am attempting to solve. I may not have stated that clearly.

If no evaluations are on-going, a broker going down and coming up is not a problem. The worker-shim seemingly reconnects fine, receives delivery of new jobs, and processes those new jobs correctly. Everything is hunky dory. I've seen this both when no worker is active at the time of restart, and one worker is active but the other isn't: the inactive worker receives jobs correctly.

I only have problems when an evaluation is in progress and the broker stops (but not the workers). When the broker comes back up, I see the evidence of reconnects, the broker starts delivering jobs to the worker-shim, even though the worker is already processing a job, and the existing @WresProcess@ keeps processing. When that process completes, that's when the errors happen that lead to the main thread ending.

The unlikelihood of this happening in the wild is why I was seeking a simple solution, but there does not appear to be one.

And so I wonder if we should just note the proposed solution and leave it for later until we actually see it happen in the wild.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T14:41:28Z


Actually, I may not be understanding your comment, "recovery of evaluations-in-progress", correctly, since your solution points to something much more complicated than what I'm proposing. I don't see any need for changes in the core app.

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T14:51:13Z


You cannot recover an evaluation that started in the core app unless the core app has a recovery mode for evaluations in progress. It doesn't. When something goes wrong, it dies. But that is wasteful. Imagine an evaluation that takes 12 hours and you are at 11h30m and the app dies. Yuck. But that is a hard problem to solve and will need long-running workers aka cluster server mode on the worker, not a separate worker per evaluation. edit: and also a way to determine the cause of death, otherwise you may get into a doom loop of retried evaluations that are doomed to fail.

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T14:51:49Z


In other words, you're talking about restarting evaluations that died, I am talking about recovering them, which is much harder. edit: which is why I simply mentioned this for context - to be clear, I am not proposing we address that one any time soon.

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T14:54:59Z


Hank wrote:

And so I wonder if we should just note the proposed solution and leave it for later until we actually see it happen in the wild.

It's up to you, but perhaps the shutdown hook is not that hard. But I am not looking at the code, so I cannot see how connections/channels are exposed. Unless you have a centralized abstraction for connections, like a pool, then it may be much harder to achieve quickly.

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T15:16:21Z


Got it. Thanks for clarifying.

What I have is a top level main that is creating a factory and creating a top level @Connection@ and receiving @Channel@; i.e., the @Channel@ that receives the job. That @Channel@ is passed into a job receiver which is responsible for initializing a @WresProcess@ when a job is received. The @Connection@ pulled from that @Channel@ is then handed off to the @WresProcess@ to set up the messengers needed to communicate job information back to the tasker; i.e., job completion, output, stdout, stderr.

I think I'll need some sort of top-level @Connection@ and messaging control class. That way, the @Channel@ instances to support a process can be recreated when a @Connection@ is re-established. Messages would all be pushed through the control class, so that it can queue them up while the broker is down, and send them once the broker @Connection@ is re-established.

This would be a relatively small job, perhaps a half-day of coding, if I had access to an IDE to code and test it. Unfortunately, while I have an IDE I can use for the first draft, testing and fixing it needs to be done through @vi@ on the -ti01. Ugly. Anyway, before I can start, I really need to look at the messaging classes more closely to gain a clearer understanding of what the messaging control class will look like. I can then update the estimated time and decide if it's worth it.

I had selected this change because I was hoping it would be easier than #97182 and would be doable for 6.8. Now I'm not as sure... though with the AAR swallowing my time soon, maybe 6.8 deployment will be delayed. Tomorrow's ISED call may clarify that. Anyway, I have lunch to eat and then some more code to review and a decision to make this afternoon.

Thanks,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T16:29:26Z


I have a good idea of how to do it using the existing messenger classes. I'll start on it after the WRDS meeting,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-04T20:39:25Z


OHRFC's issue is taking precedence, and I think I need to turn my attention to evaluation failures when posting data not removing the data ticket, if I can find it.

My idea did evolve as I started coding it. Instead of having some central resource for the connection and channels, I was thinking about wrapping @Connection@ inside of a @WresConnection@ that can handle shutdowns, etc., and then having it rebuild the channels as necessary. All requests for a channel would still be obtained via @connection.getChannel()@, but would call the @WresConnection@ version, which would take arguments that identify the channel (need to think about that), and the @Channel@ would not be kept locally. Rather, whenever the @Channel@ is needed, the @WresConnection.getChannel@ is called to obtain it.

Anyway, I only started the coding and then stopped due to meetings and #103897. On hold for now,

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: James (James) Original Date: 2022-10-04T21:21:55Z


Ordinarily, you would have a connection pool that manages connections and eliminates stale connections. We use Hikari for this behavior when managing database connections. However, in this case, connection pooling is more of an anti-pattern because channels are cheaper than connections, so having one connection with a pool of channels sounds more appropriate, but with heartbeating and retries on the connection (edit: expiring/refreshing the channels as needed). So I think having some kind of wrapper could work. But please don't call it a @WresConnection@ :-) Call it a @ConnectionWithRetries@ or a @ResilientConnection@ or something descriptive like that. But it sounds like a viable strategy.
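As a rough illustration of the shape such a wrapper might take, here is a sketch that deliberately avoids the RabbitMQ types. The @BrokerConnection@ interface is a hypothetical stand-in for the one thing the wrapper needs from a connection (whether it is open), the channel type is left generic, and the connector and channel opener are supplied by the caller. Retries, heartbeats and shutdown listeners are left out; the connector could wrap whatever retry policy is chosen.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for the small part of a broker connection that this sketch needs. */
interface BrokerConnection
{
    boolean isOpen();
}

/** Keeps one connection and hands out named channels, rebuilding both after a connection loss. */
class ResilientConnection<H>
{
    private final Supplier<BrokerConnection> connector;
    private final Function<BrokerConnection, H> channelOpener;
    private final Map<String, H> channels = new HashMap<>();
    private BrokerConnection connection;

    ResilientConnection( Supplier<BrokerConnection> connector,
                         Function<BrokerConnection, H> channelOpener )
    {
        this.connector = connector;
        this.channelOpener = channelOpener;
    }

    /** Returns the channel with the given name, reconnecting first if the connection is gone. */
    synchronized H getChannel( String name )
    {
        if ( this.connection == null || !this.connection.isOpen() )
        {
            // Channels die with their connection, so none of the old ones are reused.
            this.channels.clear();
            this.connection = this.connector.get();
        }

        return this.channels.computeIfAbsent( name,
                                              n -> this.channelOpener.apply( this.connection ) );
    }
}
```

Callers would ask for a channel by name each time they need one rather than holding a reference, so a channel created on a dead connection is never used again.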

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-05T11:22:07Z


How about @HankConnection@? :)

Hank

epag commented 2 months ago

Original Redmine Comment Author Name: Hank (Hank) Original Date: 2022-10-05T15:32:58Z


James,

Quick question because my Java is rusty...

If I need to map two @String@ instances to a single value, what can I use as a key that contains those two string instances? I suppose I could combine them into one string, but that feels hokey.
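For what it's worth, one common answer is a small immutable key class with @equals@ and @hashCode@ defined over both strings. The sketch below uses hypothetical names (@PairKey@, and the job/stream values in the demo are made up). It also avoids a pitfall of concatenation, where "ab" + "c" and "a" + "bc" produce the same combined string.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical two-part key; the field names are placeholders. */
final class PairKey
{
    private final String first;
    private final String second;

    PairKey( String first, String second )
    {
        this.first = Objects.requireNonNull( first );
        this.second = Objects.requireNonNull( second );
    }

    @Override
    public boolean equals( Object other )
    {
        if ( this == other )
        {
            return true;
        }

        if ( !( other instanceof PairKey ) )
        {
            return false;
        }

        PairKey that = (PairKey) other;
        return this.first.equals( that.first ) && this.second.equals( that.second );
    }

    @Override
    public int hashCode()
    {
        return Objects.hash( this.first, this.second );
    }

    @Override
    public String toString()
    {
        return "(" + this.first + ", " + this.second + ")";
    }
}

class PairKeyDemo
{
    public static void main( String[] args )
    {
        Map<PairKey, String> channels = new HashMap<>();
        channels.put( new PairKey( "job-123", "stdout" ), "channel A" );

        // A new, equal key finds the same entry.
        System.out.println( channels.get( new PairKey( "job-123", "stdout" ) ) );
    }
}
```

On Java 16 or later, a record with two @String@ components gets equivalent @equals@ and @hashCode@ methods generated automatically.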

Hank