awakesecurity / gRPC-haskell

Haskell gRPC support
https://hackage.haskell.org/package/grpc-haskell
Apache License 2.0
236 stars 74 forks source link

Server crashes when using async with ServerStreaming #135

Closed lzszt closed 2 years ago

lzszt commented 2 years ago

I'm trying to implement a server, which spawns an asynchronous action. I then call send function from within this async block. This results in the server crashing with various errors:

E0511 07:02:51.828270196   10996 server.cc:201]              assertion failed: queue.Pop() == nullptr
Aborted (core dumped)
double free or corruption (out)
Aborted (core dumped)
double free or corruption (!prev)
Segmentation fault (core dumped)
corrupted size vs. prev_size while consolidating
Aborted (core dumped)

The server implementation looks like this:

startSubscriptionHandler ::
  ServerRequest
    'ServerStreaming
    Api.StartStreamRequest
    Api.StreamNotification ->
  IO
    (ServerResponse 'ServerStreaming Api.StreamNotification)
startSubscriptionHandler (ServerWriterRequest _metadata Api.StartStreamRequest send) = do
  reponseCode <- do
    async $
      forM_ [1 :: Word32 .. 1000000] $ \i -> do
        res <- send $ Api.StreamNotification i
        case res of
          Left err -> fail $ "error: " <> show err
          Right () -> pure ()
    pure StatusOk
  pure $ ServerWriterResponse mempty reponseCode mempty

main :: IO ()
main = do
  Api.dataApiServer
    ( Api.DataApi
        startSubscriptionHandler
    )
    defaultServiceOptions

Am I doing something wrong? Or is the use async in this context not supported?

gbaz commented 2 years ago

This looks wrong to me. My recollection is that sending the response finishes the interaction -- so you can only send the response after you finish sending the streaming results.

lzszt commented 2 years ago

Thank you, this seems to have fixed to problem.