Wow this shouldn't be happening.
Is this a known limitation that code with before(All) should be run in a sequential context, even if the associated resource allocations are independent?
Nope, that should all just work.
The tests run with sequential because they use MVars to test behaviour. It looks like the tests have missed this problem as a result.
Do you have an obvious idea of where the problem may be? If not, tell me and I'll start investigating. (We care about sydtest now and this behavior is really annoying, so I can justify using my company time to fix it.)
I only see one MVar in the async runner: the failFastVar. The --fail-fast option doesn't work very well with the async runner IIRC (I don't use --fail-fast with the async runner, but use --debug instead), so we may be able to fix multiple problems at once.
If not, tell me and I'll start investigating
Make sure to look at the development branch, because the sydtest test suite has been refactored out of the sydtest package. It's now in a separate sydtest-test package.
I only see one MVar in the async runner: the failFastVar.
You are right! But actually, I think the problem is elsewhere and that's only a symptom.
Having a look at jobQueueWorker:
jobQueueWorker :: JobQueue -> Int -> IO ()
jobQueueWorker jobQueue workerIx = do
  let workingSem = jobQueueWorking jobQueue V.! workerIx
  forever $ do
    job <- dequeueJob jobQueue
    print "dequeue"
    bracket_
      (print "waitQem" >> waitQSem workingSem >> print "gotSem")
      (signalQSem workingSem)
      (job workerIx)
(the print calls are mine)
And blockUntilDone:
-- | Block until all workers are done (waiting to dequeue a job).
blockUntilDone :: JobQueue -> IO ()
blockUntilDone JobQueue {..} = do
  -- Wait until the queue is empty.
  print "wait empty"
  atomically $ isEmptyTBQueue jobQueueTBQueue >>= STM.check
  print "wait all"
  -- No new work can be started now, because the queue is empty.
  -- That means that all workers are either waiting for another job or still
  -- doing a job.
  -- Wait for all workers to stop working.
  -- That means that they're all just done working now or waiting for another job.
  -- Both are fine.
  V.forM_ jobQueueWorking waitQSem
  -- The workers are now all done and the queue is empty, and this is the only
  -- thread enqueueing jobs, so no work is happening.
  -- Release all the workers so they can work again after this function.
  print "signal all"
  V.forM_ jobQueueWorking signalQSem
The semantics are that the complete tree of jobs to be run is walked, pushing jobs to the queue, and then blockUntilDone is called.
Now suppose the workers haven't had time to take the jobs from the queue yet. The queue then contains 1 item (if our spec had only one it). blockUntilDone will wait for the queue to be empty, which happens immediately after jobQueueWorker picks the job with dequeueJob. However, at that point the job is no longer in the queue, so V.forM_ jobQueueWorking waitQSem runs, consuming all the worker tokens. Then signalQSem is called, restoring all the worker tokens, blockUntilDone terminates, and the main loop terminates. BUT in jobQueueWorker, we have only just entered the bracket_.
The race window is where I print "dequeue": if the job stays there too long because of scheduling, blockUntilDone can terminate before the job has even started.
The solution would be to ensure that dequeuing the job AND taking a semaphore token happen in an atomic way.
I guess this output confirms the scenario:
"wait empty"
"dequeue"
"wait all"
"waitQem"
"signal all"
"gotSem"
"failure of failFastVar"
"failure of var"
"failure of race"
"end of printer"
diff --git a/sydtest/src/Test/Syd/Runner/Asynchronous.hs b/sydtest/src/Test/Syd/Runner/Asynchronous.hs
index 413c354..c90ade0 100644
--- a/sydtest/src/Test/Syd/Runner/Asynchronous.hs
+++ b/sydtest/src/Test/Syd/Runner/Asynchronous.hs
@@ -73,14 +73,14 @@ data JobQueue = JobQueue
     jobQueueTBQueue :: !(TBQueue Job),
     -- | One semaphore per worker, which needs to be awaited before the worker
     -- can start doing a job.
-    jobQueueWorking :: !(Vector QSem)
+    jobQueueWorkingCount :: !(TVar Int)
   }
 
 -- | Make a new job queue with a given number of workers and capacity
 newJobQueue :: Word -> Word -> IO JobQueue
 newJobQueue nbWorkers spots = do
   jobQueueTBQueue <- newTBQueueIO (fromIntegral spots)
-  jobQueueWorking <- V.replicateM (fromIntegral nbWorkers) (newQSem 1)
+  jobQueueWorkingCount <- newTVarIO (fromIntegral 0)
   pure JobQueue {..}
 
 -- | Enqueue a job, block until that's possible.
@@ -105,11 +105,11 @@ blockUntilDone JobQueue {..} = do
   -- Wait for all workers to stop working.
   -- That means that they're all just done working now or waiting for another job.
   -- Both are fine.
-  V.forM_ jobQueueWorking waitQSem
-  -- The workers are now all done and the queue is empty, and this is the only
-  -- thread enqueueing jobs, so no work is happening.
-  -- Release all the workers so they can work again after this function.
-  V.forM_ jobQueueWorking signalQSem
+  atomically $ do
+    c <- readTVar jobQueueWorkingCount
+    if c > 0
+      then retry
+      else pure ()
 
 withJobQueueWorkers :: Word -> JobQueue -> IO a -> IO a
 withJobQueueWorkers nbWorkers jobQueue func =
@@ -121,14 +121,16 @@ withJobQueueWorkers nbWorkers jobQueue func =
     (\_ -> func)
 
 jobQueueWorker :: JobQueue -> Int -> IO ()
-jobQueueWorker jobQueue workerIx = do
-  let workingSem = jobQueueWorking jobQueue V.! workerIx
+jobQueueWorker JobQueue{..} workerIx = do
   forever $ do
-    job <- dequeueJob jobQueue
-    bracket_
-      (waitQSem workingSem)
-      (signalQSem workingSem)
-      (job workerIx)
+    bracket
+      (atomically $ do
+          job <- readTBQueue jobQueueTBQueue
+          modifyTVar' jobQueueWorkingCount (+1)
+          pure job
+      )
+      (\_ -> atomically $ modifyTVar jobQueueWorkingCount (subtract 1))
+      (\job -> job workerIx)
 
 -- The plan is as follows:
 --
This looks like it works. I'll open an MR tomorrow with comments and more tests.
The idea is that instead of the semaphores, I use a TVar counting the number of running jobs. The bracket around a job atomically dequeues it and increments the count, then decrements the count when the job is done.
The "wait for all jobs" step ensures that the queue is empty (as before) and then waits until the job count is 0 (maybe both operations need to happen in the same transaction, I need to think).
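A minimal sketch of what a single-transaction blockUntilDone could look like with the counter (assuming the JobQueue fields from the patch above and the same qualified STM import; not necessarily the final code):

-- Wait until the queue is empty AND no job is running, observed in a single
-- STM transaction so nothing can slip in between the two checks.
blockUntilDone :: JobQueue -> IO ()
blockUntilDone JobQueue {..} = atomically $ do
  empty <- isEmptyTBQueue jobQueueTBQueue
  STM.check empty
  working <- readTVar jobQueueWorkingCount
  STM.check (working == 0)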
edit: Actually, the race condition ALWAYS exists, and is not related to the usage of before in any way. The problem appears even with this simple codebase:
import Test.Syd

main = sydTest $ do
  describe "test" $ do
    it "cheval" $ do
      1 `shouldBe` 1
However, the more tests the test group contains, the smaller the chance that the race condition appears: while blockUntilDone is waiting for the first i-1 semaphores in V.forM_ jobQueueWorking waitQSem, the i..N other workers get "enough" time to do their waitQSem after picking a job.
So this explains why the bug could stay hidden for so long: it is really rare to have a test suite with very few AND slow items.
I'll write the MR.
Context
edit: the race condition happens even with a simple it; I've updated the title, but not the rest of the description.
A really simple test suite with beforeAll_ can either fail the test with AsyncCancelled or crash the complete test suite with "Main: thread blocked indefinitely in an MVar operation". Note that I've read the "footgun" documentation section for around, but I don't think I'm in this context.
Reproduction
I do have the following test suite:
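A minimal sketch of a suite of that shape (illustrative only, assuming a beforeAll_ with an independent, trivial allocation; not necessarily the exact original code):

import Test.Syd

main :: IO ()
main = sydTest $ do
  -- The outer action runs once before the whole group and is independent of
  -- the tests themselves.
  beforeAll_ (pure ()) $ do
    describe "test" $ do
      it "cheval" $ do
        1 `shouldBe` 1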
Built with ghc -threaded Main.hs and run with ./Main --continuous +RTS -N (in my context -N means -N8). After a while:
Note that sometimes I also have something like:
This is with the following test suite, which is basically the same as before, but the code in the it (the one which will be run in a "job" by the scheduler) takes longer.
Discussion
My tests are run in parallel, but I'm surprised to see that most tests in the sydtest test suite using beforeAll are run with sequential.
My current guess is that there may be a race condition in the asynchronous runner. Either the thread producing on an MVar is killed and the complete runner crashes, or the opposite: async's race (or concurrently, ...) raises AsyncCancelled in the job running the current content. My second example, which takes more time in the "job" part, seems to trigger this more often.
edit: I've also tried with before and I'm observing the same problem. There is no problem if this part of the suite is run with sequential.
Questions
Is this a known limitation that code with before(All) should be run in a sequential context, even if the associated resource allocations are independent?