purescript-contrib / purescript-aff-bus

Many-to-many broadcasting
Apache License 2.0

Attempting to write to a bus during an existing write is ignored #5

Closed: garyb closed this issue 6 years ago

garyb commented 6 years ago

test_writeInWrite ∷ ∀ eff. Aff (Effects eff) Unit
test_writeInWrite = do
  bus ← Bus.make
  ref ← liftEff $ newRef false

  let
    proc = do
      res ← Bus.read bus
      case res of
        1 → Bus.write 2 bus
        2 → liftEff (writeRef ref true)
        _ → pure unit
      proc

  f ← forkAff proc

  Bus.write 1 bus

  assert =<< liftEff (readRef ref)
  killFiber (error "Done") f
  log "OK"

garyb commented 6 years ago

Hmm, this might be an AVar problem in general actually, since Bus is a very thin layer over AVar.

garyb commented 6 years ago

Forking and introducing some async makes it work:

test_writeInWrite ∷ ∀ eff. Aff (Effects eff) Unit
test_writeInWrite = do
  bus ← Bus.make
  ref ← liftEff $ newRef false

  let
    proc = do
      res ← Bus.read bus
      traceAnyA { "proc-in": res }
      case res of
        1 → void $ forkAff do
          delay (Milliseconds 10.0)
          Bus.write 2 bus
        2 → liftEff (writeRef ref true)
        _ → pure unit
      traceAnyA "proc-out"
      proc

  f ← forkAff proc

  Bus.write 1 bus

  delay (Milliseconds 100.0)

  assert =<< liftEff (readRef ref)
  killFiber (error "Done") f
  log "OK"

But interestingly, if you change the definition of write to:

write a (Bus avar) = putVar a avar *> void (takeVar avar)

(putVar rather than tryPutVar), you get this crazy nonsense from the traces in the test above:

{ 'proc-in': 1 }
'proc-out'
{ 'proc-in': 1 }
'proc-out'
{ 'proc-in': 2 }
'proc-out'
{ 'proc-in': 2 }
'proc-out'
{ 'proc-in': 2 }
'proc-out'
{ 'proc-in': 2 }
'proc-out'

Again perhaps pointing at AVar problems in general.

garyb commented 6 years ago

I tried changing the tryPutVar to putVar because I thought that might be why the write of 2 was being ignored, but it shouldn't matter: if you print the result of tryTakeVar just before the put, it's always Nothing:

  traceAnyA =<< tryTakeVar avar
  tryPutVar a avar *> void (takeVar avar)

That's in the first example (the failing test), so whatever is causing writes to go missing, it's not the AVar already being populated.

natefaubion commented 6 years ago

It could be that the implementation is invoking the write callback before emptying the avar.

natefaubion commented 6 years ago

I would not expect your original example to work, even with the old Bus. In particular:

    proc = do
      res ← Bus.read bus
      case res of
        1 → Bus.write 2 bus
        2 → liftEff (writeRef ref true)
        _ → pure unit
      proc

Upon receiving 1, you write 2, but you are not reading from the Bus again yet, so you would never receive the message you sent yourself. Bus has always been ephemeral. If you wanted to make sure you got the message you sent yourself, you could use parallelism:

    proc = do
      res ← Bus.read bus
      case res of
        1 → sequential (parallel proc <|> parallel (Bus.write 2 bus))
        2 → liftEff (writeRef ref true) *> proc
        _ → proc

natefaubion commented 6 years ago

Hmm, that one might leak. Recursing forever in parallelism is not a good idea. I think we need an unsafe fork combinator which is essentially liftEff <<< launchAff, to ensure we don't hold onto these references.
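
A minimal sketch of the combinator described above, assuming the purescript-aff 4.x API used elsewhere in this thread (effect rows, with launchAff returning a Fiber in Eff). The module name and the name unsafeFork are hypothetical, not something the library is confirmed to export:

```purescript
module UnsafeFork where

import Prelude

import Control.Monad.Aff (Aff, Fiber, launchAff)
import Control.Monad.Eff.Class (liftEff)

-- | Hypothetical helper: start an Aff action with launchAff (rather than
-- | forkAff) from inside another Aff, and hand back the resulting Fiber.
-- | This is just the `liftEff <<< launchAff` composition from the comment.
unsafeFork ∷ ∀ eff a. Aff eff a → Aff eff (Fiber eff a)
unsafeFork = liftEff <<< launchAff
```

In the proc example this would stand in for the parallel/sequential pair, e.g. `1 → unsafeFork (Bus.write 2 bus) *> proc`, though whether the self-sent message is then received still depends on scheduling order.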

natefaubion commented 6 years ago

This still relies on the order Aff evaluates forks, which is supposed to be opaque to an end user. I don't think it's possible with Bus as is to guarantee a self message without additional buffering.

garyb commented 6 years ago

Yeah, after our discussion yesterday I realised the above would never work due to the read not being ready again yet. Using old bus fixed the real problem we were having rather than this example, but we also managed to fix it a different way by using a queued-read-forever thing. I'd like to provide a readForever or something in this library to avoid the various footguns we're encountering just now, so will open a PR when we're sure we have something that works.