m-barthelemy / vapor-queues-fluent-driver

A Fluent implementation for https://github.com/vapor/queues (Vapor4)
MIT License

Jobs sometimes executed multiple times. #27

Open sidepelican opened 1 year ago

sidepelican commented 1 year ago

Issue

I have found a concurrency safety issue: a dispatched job can be dequeued twice with the same job ID and payload.

Reproducing

Here is a repository that reproduces the problem: https://github.com/sidepelican/QueuesFluentDriverMultipleExecution It dispatches a simple job several times and automatically detects when the same job is launched more than once.

$ swift run
...
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
2022-12-19T19:08:24+0900 info main : job_id=940F558A-EC00-47B9-8935-45D884281708 p_id=51675C99-B581-4555-9072-A376D1E95770 [App] EchoJob!
p_id=51675C99-B581-4555-9072-A376D1E95770 is multiple executed!

Cause

  1. Queues calls Queue.set and then Queue.push in Queue.dispatch.

https://github.com/vapor/queues/blob/c95c891c3c04817eac1165587fb02457c749523a/Sources/Queues/Queue.swift#L84-L86

  2. FluentQueue.set saves a JobModel. JobModel.state has .pending as its initial value.

https://github.com/m-barthelemy/vapor-queues-fluent-driver/blob/b301371af7cbee57669da9b4acc329085699b7f1/Sources/QueuesFluentDriver/FluentQueue.swift#L31-L40

  3. FluentQueue.push sets the job's state to .pending. Since .pending is already the default value, this write appears to be redundant.

https://github.com/m-barthelemy/vapor-queues-fluent-driver/blob/b301371af7cbee57669da9b4acc329085699b7f1/Sources/QueuesFluentDriver/FluentQueue.swift#L60-L69

  4. A job saved in step 2 is immediately available for workers to dequeue. What happens if a worker dequeues it between steps 2 and 3? The worker sets the state to .processing, and step 3 then overwrites it with .pending.
  5. Because the state is .pending again, another worker can dequeue the same job, and it is executed a second time.
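
The interleaving above can be replayed deterministically with a small in-memory model. This is only a sketch: JobState, JobStore, and the job ID "job-1" are hypothetical stand-ins, not the driver's actual types, and the dictionary stands in for the jobs table.

```swift
// Hypothetical in-memory stand-in for the jobs table (not the driver's real code).
enum JobState { case pending, processing }

struct JobStore {
    private(set) var states: [String: JobState] = [:]

    // Models FluentQueue.set: the job is stored already in .pending.
    mutating func set(_ id: String) { states[id] = .pending }

    // Models FluentQueue.push: unconditionally writes .pending again.
    mutating func push(_ id: String) { states[id] = .pending }

    // Models a worker dequeuing: claim any pending job and mark it .processing.
    mutating func pop() -> String? {
        guard let id = states.first(where: { $0.value == .pending })?.key else {
            return nil
        }
        states[id] = .processing
        return id
    }
}

var store = JobStore()
store.set("job-1")
let firstClaim = store.pop()    // worker A dequeues between set and push
store.push("job-1")             // .processing is overwritten with .pending
let secondClaim = store.pop()   // worker B dequeues the same job

print(firstClaim == secondClaim) // prints "true": both workers got "job-1"
```

In the real driver the window between set and push is short, which would explain why the duplicate execution only shows up occasionally.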

How to fix?

I think there are two ways to fix this. One is to add an .initialized case to QueuesFluentJobState and use it as the initial value of JobModel.state, so that a job only becomes .pending in FluentQueue.push. The other is to do nothing in FluentQueue.push.
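
A minimal sketch of the first option, using a hypothetical in-memory model rather than the driver's real code. FixedJobState and FixedJobStore are illustrative names, and the guard in push is an assumption about how the transition could be restricted:

```swift
// Hypothetical model of the proposed fix (not the driver's real code).
enum FixedJobState { case initialized, pending, processing }

struct FixedJobStore {
    private(set) var states: [String: FixedJobState] = [:]

    // set() stores the job in .initialized, so workers cannot see it yet.
    mutating func set(_ id: String) { states[id] = .initialized }

    // push() only promotes .initialized to .pending and never
    // overwrites a state that a worker has already changed.
    mutating func push(_ id: String) {
        if states[id] == .initialized { states[id] = .pending }
    }

    // Workers only claim jobs that are .pending.
    mutating func pop() -> String? {
        guard let id = states.first(where: { $0.value == .pending })?.key else {
            return nil
        }
        states[id] = .processing
        return id
    }
}

var fixedStore = FixedJobStore()
fixedStore.set("job-1")
let earlyClaim = fixedStore.pop()   // nil: the job is not visible before push
fixedStore.push("job-1")
let onlyClaim = fixedStore.pop()    // "job-1"
fixedStore.push("job-1")            // a late or repeated push changes nothing
let repeatClaim = fixedStore.pop()  // nil: the job is not handed out again
```

With this ordering a worker cannot claim the job before push, and push can no longer reset a job that is already .processing.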