status-im / nim-chronos

Chronos - An efficient library for asynchronous programming
https://status-im.github.io/nim-chronos/docs/chronos
Apache License 2.0
353 stars 51 forks source link

Address #329. #330

Closed cheatfate closed 1 year ago

Menduist commented 1 year ago

This seems to address the Finished state, but not the Error state. Here is a modification to the test for Error state:

  # Reproduction from the PR discussion: write to a bounded stream writer
  # again after its underlying transport has been closed.
  test "AsyncStream(AsyncStream) write on error test":
    # Runs one server/client round trip on `address` and returns true once
    # the client connection and the server have been closed.
    proc testWriteEof(address: TransportAddress): Future[bool] {.async.} =
      let
        # `size` is used both as the message size argument and as the bound
        # of the bounded writer below.
        size = 10240
        message = createBigMessage("ABCDEFGHIJKLMNOP", size)

      # Server-side connection handler: stacks a bounded writer on top of a
      # plain stream writer over the accepted transport.
      proc processClient(server: StreamServer,
                         transp: StreamTransport) {.async.} =
        var wstream = newAsyncStreamWriter(transp)
        var wbstream = newBoundedStreamWriter(wstream, uint64(size))
        try:
          check wbstream.atEof() == false
          await wbstream.write(message)
          check wbstream.atEof() == false
          # Close the underlying transport while both writers are still open.
          await transp.closeWait()
          # Put the stream in error state
          expect BoundedStreamOverflowError:
            await wbstream.write(message)
          echo "before write"
          # This write is not wrapped in `expect`: if it raises or never
          # completes, the "AFTER WRITE" echo below is not reached.
          await wbstream.write(message)
          echo "AFTER WRITE"
          await wbstream.closeWait()
          check wbstream.atEof() == true
        finally:
          # Runs whether or not the body above raised.
          await wstream.closeWait()
          await transp.closeWait()

      let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}
      var server = createStreamServer(address, processClient, flags = flags)
      server.start()
      # Client side: connect to the server that was just started.
      var conn = await connect(server.localAddress())
      try:
        # NOTE(review): presumably drains incoming data until the server side
        # closes; confirm against `consume`'s documentation.
        discard await conn.consume()
      finally:
        await conn.closeWait()
        server.stop()
        await server.closeWait()
      return true

    check waitFor(testWriteEof(initTAddress("127.0.0.1:46001"))) == true

diff with your test:

@@ -497,10 +497,13 @@ suite "AsyncStream test suite":
           check wbstream.atEof() == false
           await wbstream.write(message)
           check wbstream.atEof() == false
-          await wbstream.finish()
-          check wbstream.atEof() == true
-          expect AsyncStreamWriteEOFError:
+          await transp.closeWait()
+          # Put the stream in error state
+          expect BoundedStreamOverflowError:
             await wbstream.write(message)
+          echo "before write"
+          await wbstream.write(message)
+          echo "AFTER WRITE"
           await wbstream.closeWait()
           check wbstream.atEof() == true
         finally:

If you run this, you'll see that "AFTER WRITE" is never reached

cheatfate commented 1 year ago

I'm not sure what you are trying to achieve with your additions to the test, but from what I see it works correctly. The second write call should raise the same exception as the first one, so your AFTER WRITE will never be printed.

A proper version of the test should look like this, and it works:

      # Server-side connection handler: writes one message through a bounded
      # writer, finishes the stream, then checks that every later write
      # raises AsyncStreamWriteEOFError.
      proc processClient(server: StreamServer,
                         transp: StreamTransport) {.async.} =
        var wstream = newAsyncStreamWriter(transp)
        var wbstream = newBoundedStreamWriter(wstream, uint64(size))
        try:
          check wbstream.atEof() == false
          await wbstream.write(message)
          check wbstream.atEof() == false
          # After `finish()` the writer is expected to report EOF.
          await wbstream.finish()
          check wbstream.atEof() == true
          # The same write is attempted three times; each attempt is expected
          # to raise the same error rather than only the first one.
          expect AsyncStreamWriteEOFError:
            await wbstream.write(message)
          expect AsyncStreamWriteEOFError:
            await wbstream.write(message)
          expect AsyncStreamWriteEOFError:
            await wbstream.write(message)
          await wbstream.closeWait()
          check wbstream.atEof() == true
        finally:
          # Runs whether or not the body above raised.
          await wstream.closeWait()
          await transp.closeWait()
Menduist commented 1 year ago

My test puts the stream into the Error state, which this PR does not handle. The same thing happens with TLS stream when the underlying connection is abruptly closed: https://github.com/status-im/nim-chronos/blob/189f6e390c4ed73921b8b847b65d4f1d331752f4/chronos/streams/tlsstream.nim#L151-L154

I slightly tweaked my test to show that the write is indeed hanging:

  # Reproduction from the PR discussion: after the underlying transport is
  # closed and one write has failed, a further write is bounded with a
  # timeout so that a hang shows up as a test failure.
  test "AsyncStream(AsyncStream) write on error test":
    # Runs one server/client round trip on `address` and returns true once
    # the client connection and the server have been closed.
    proc testWriteEof(address: TransportAddress): Future[bool] {.async.} = 
      let
        # `size` is used both as the message size argument and as the bound
        # of the bounded writer below.
        size = 10240
        message = createBigMessage("ABCDEFGHIJKLMNOP", size)
        # Completed in processClient's `finally` so the client side can wait
        # for the handler to finish before tearing the server down.
        finished = newFuture[void]()

      # Server-side connection handler: stacks a bounded writer on top of a
      # plain stream writer over the accepted transport.
      proc processClient(server: StreamServer,
                         transp: StreamTransport) {.async.} = 
        var wstream = newAsyncStreamWriter(transp)
        var wbstream = newBoundedStreamWriter(wstream, uint64(size))
        try:
          check wbstream.atEof() == false
          await wbstream.write(message)
          check wbstream.atEof() == false
          # Put the stream in error state by closing the underlying connection
          await transp.closeWait()
          expect BoundedStreamOverflowError:
            await wbstream.write(message)
          check wbstream.state == AsyncStreamState.Error # Now we are in error state
          expect AsyncStreamWriteEOFError:
            # hangs instead of popping error
            # `wait(5.seconds)` bounds the write, so a write that never
            # completes surfaces as a timeout error instead of blocking.
            await wbstream.write(message).wait(5.seconds)
          await wbstream.closeWait()
          check wbstream.atEof() == true
        finally:
          # Runs whether or not the body above raised.
          await wstream.closeWait()
          await transp.closeWait()
          finished.complete()

      let flags = {ServerFlags.ReuseAddr, ServerFlags.TcpNoDelay}
      var server = createStreamServer(address, processClient, flags = flags)
      server.start()
      # Client side: connect to the server that was just started.
      var conn = await connect(server.localAddress())
      try:
        # NOTE(review): presumably drains incoming data until the server side
        # closes; confirm against `consume`'s documentation.
        discard await conn.consume()
        # Wait for the server-side handler to reach its `finally` block.
        await finished
      finally:
        await conn.closeWait()
        server.stop()
        await server.closeWait()
      return true

    check waitFor(testWriteEof(initTAddress("127.0.0.1:46001"))) == true
    /home/tavon/Status/nim-chronos/tests/testasyncstream.nim(505, 17): Expect Failed, unexpected AsyncTimeoutError (Timeout exceeded!) was thrown.
/home/tavon/.nimble/pkgs/unittest2-0.0.5/unittest2.nim(848) testasyncstream
/home/tavon/.nimble/pkgs/unittest2-0.0.5/unittest2.nim(898) runTest`gensym1048
/home/tavon/Status/nim-chronos/chronos/asyncloop.nim(1203) waitFor
/home/tavon/Status/nim-chronos/chronos/asyncloop.nim(295) poll
/home/tavon/Status/nim-chronos/chronos/asyncmacro2.nim(288) processClient
/home/tavon/Status/nim-chronos/chronos/asyncfutures2.nim(505) internalCheckComplete

  [FAILED] AsyncStream(AsyncStream) write on error test

Instead of an AsyncTimeoutError caused by wait, the write should fail instantly

cheatfate commented 1 year ago

Your test is incorrect: it fails because the expectation is wrong. I'm not even sure why you expected those exceptions... Please check my previous test and you will understand which exceptions you can expect.

cheatfate commented 1 year ago

Also, this wstream.error is not a reliable source of information; it could be cleared or not even present.

Menduist commented 1 year ago

My test was just here to highlight that writing in the Error state was hanging, but it is now fixed by your latest commit, which checks that the future is still running, so thanks. It would be great if we could merge this ASAP, as this is still causing FD leaks in nwaku. Thanks.

Menduist commented 1 year ago

Seems to fix the issue correctly, anything missing here?