Building off of #89, this PR adds support for Symfony Messenger. Primarily this involves listening for a WorkerMessageFailedEvent (caused by unhandled errors in a job) and notifying when this happens. However, this doesn't quite work because of batch sending mode , where we flush the queue of events on shutdown. Worker processes are generally long lived and therefore won't trigger this flush often, which means events would end up not being delivered
To fix this, we flush the queue after a WorkerMessageFailedEvent, which will send the event from the failure and any manual notify calls. We also flush the queue after each job finishes by listening for WorkerMessageHandledEvent in order to send manual notify calls when jobs succeed
We will report jobs that will be retried, because a failed job could indicate a problem even if it eventually succeeds. This is also what we do on other platforms, e.g. Sidekiq in Ruby. The willRetry flag is added as metadata under the Messenger key, so jobs that will be retried can be ignored with a simple callback:
Goal
Building off of #89, this PR adds support for Symfony Messenger. Primarily this involves listening for a
WorkerMessageFailedEvent
(caused by unhandled errors in a job) and notifying when this happens. However, this doesn't quite work because of batch sending mode , where we flush the queue of events on shutdown. Worker processes are generally long lived and therefore won't trigger this flush often, which means events would end up not being deliveredTo fix this, we flush the queue after a
WorkerMessageFailedEvent
, which will send the event from the failure and any manual notify calls. We also flush the queue after each job finishes by listening forWorkerMessageHandledEvent
in order to send manual notify calls when jobs succeedWe will report jobs that will be retried, because a failed job could indicate a problem even if it eventually succeeds. This is also what we do on other platforms, e.g. Sidekiq in Ruby. The
willRetry
flag is added as metadata under theMessenger
key, so jobs that will be retried can be ignored with a simple callback:Changeset
symfony/messenger
to CIWorkerMessageFailedEvent
to notify on failureWorkerMessageHandledEvent
to flush on success