kazu-yamamoto / http2

HTTP/2.0 library including HPACK
BSD 3-Clause "New" or "Revised" License

Slow or stuck stream consumer can cause memory to blow up #62

Closed akshaymankar closed 2 weeks ago

akshaymankar commented 1 year ago

Here is an example with a stuck stream consumer:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as Builder
import qualified Data.ByteString.Lazy as LBS
import Data.Streaming.Network
import Network.HTTP.Types
import qualified Network.HTTP2.Client as Client
import qualified Network.HTTP2.Server as Server
import Network.Socket

testServer :: Server.Request -> Server.Aux -> (Server.Response -> [Server.PushPromise] -> IO ()) -> IO ()
testServer req _ respWriter = do
  case Server.requestPath req of
    Just "/inifite" -> do
      let infiniteBSWriter :: (Builder.Builder -> IO ()) -> IO () -> IO ()
          infiniteBSWriter bsWriter flush = do
            bsWriter $ Builder.lazyByteString $ LBS.concat $ replicate 1000 "foo\n"
            flush
            infiniteBSWriter bsWriter flush
          infiniteResponse = Server.responseStreaming status200 [] infiniteBSWriter
      respWriter infiniteResponse []
    _ -> do
      respWriter (Server.responseNoBody status404 []) []

testClient :: Client.Client ()
testClient client = client (Client.requestNoBody "GET" "/inifite" []) $ \res ->
  if Client.responseStatus res == Just status200
    then do
      bs <- Client.getResponseBodyChunk res
      putStrLn $ "Got chunk of size: " <> show (BS.length bs)
      threadDelay maxBound
    else error $ "the response isn't 200, due to a typo?"

main :: IO ()
main = do
  bracket (bindRandomPortTCP "*") (close . snd) $ \(serverPort, listenSock) -> do
    listen listenSock 1024
    _ <- async $ forever $ do
      (sock, _) <- accept listenSock
      let cleanup cfg = do
            Server.freeSimpleConfig cfg
            close sock
      async $ bracket (Server.allocSimpleConfig sock 4096) cleanup $ \cfg -> do
        Server.run cfg testServer

    let clientConfig =
          Client.ClientConfig
            { Client.scheme = "http",
              Client.authority = "localhost",
              Client.cacheLimit = 20
            }
    bracket (fst <$> getSocketTCP "localhost" serverPort) close $ \sock ->
      bracket (Client.allocSimpleConfig sock 4096) Client.freeSimpleConfig $ \http2Cfg -> do
        Client.run clientConfig http2Cfg $ testClient
    pure ()

I can see the traffic in Wireshark, so the server is not stuck and buffering the data in its own memory. The memory growth therefore has to be on the client side.

kazu-yamamoto commented 1 year ago

This is probably because flow control is disabled in v4.1.0. Do you also see this bug in v4.0.0?
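
To illustrate why disabled flow control could matter here, below is a minimal sketch of receive-side window accounting. It is a hypothetical model, not the http2 library's implementation: the `Receiver` type and the `receive`, `consume`, and `stuckConsumer` functions are made up for illustration. The window of 65535 bytes is the HTTP/2 default initial window size, and 4000 bytes matches one `replicate 1000 "foo\n"` chunk from the example above.

```haskell
module Main where

-- Hypothetical model of receive-side flow control (not the library's code).
data Receiver = Receiver
  { buffered :: Int -- bytes received but not yet consumed by the application
  , credit   :: Int -- bytes the sender is still allowed to send
  } deriving (Show, Eq)

-- | The sender offers n bytes. With flow control enabled, only as many
-- bytes as the remaining credit allows are accepted.
receive :: Bool -> Int -> Receiver -> Receiver
receive flowControl n r
  | flowControl =
      let k = min n (credit r)
      in r { buffered = buffered r + k, credit = credit r - k }
  | otherwise = r { buffered = buffered r + n }

-- | The application consumes up to n bytes; the same amount of credit is
-- handed back to the sender (the role WINDOW_UPDATE plays in HTTP/2).
consume :: Int -> Receiver -> Receiver
consume n r =
  let k = min n (buffered r)
  in r { buffered = buffered r - k, credit = credit r + k }

-- | A stuck consumer: the sender offers `chunk` bytes `rounds` times and
-- the application never reads. Returns the bytes left sitting in memory.
stuckConsumer :: Bool -> Int -> Int -> Int -> Int
stuckConsumer flowControl window chunk rounds =
  buffered (iterate (receive flowControl chunk) (Receiver 0 window) !! rounds)

main :: IO ()
main = do
  putStrLn $ "with flow control:    " ++ show (stuckConsumer True 65535 4000 1000)
  putStrLn $ "without flow control: " ++ show (stuckConsumer False 65535 4000 1000)
```

In this model the buffered amount stops at 65535 bytes when flow control is on, and reaches 4000000 bytes after 1000 chunks when it is off, which would keep growing for as long as the server keeps sending.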

akshaymankar commented 1 year ago

I can confirm this doesn't happen with v4.0.0.

akshaymankar commented 1 year ago

However, with v4.0.0, the stuck stream causes other streams to also get stuck:

Here is an updated example:

{-# LANGUAGE OverloadedStrings #-}

module Main where

import Control.Concurrent
import Control.Concurrent.Async
import Control.Exception
import Control.Monad
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as Builder
import qualified Data.ByteString.Lazy as LBS
import Data.Streaming.Network
import Network.HTTP.Types
import qualified Network.HTTP2.Client as Client
import qualified Network.HTTP2.Server as Server
import Network.Socket

testServer :: Server.Request -> Server.Aux -> (Server.Response -> [Server.PushPromise] -> IO ()) -> IO ()
testServer req _ respWriter = do
  case Server.requestPath req of
    Just "/inifite" -> do
      let infiniteBSWriter :: (Builder.Builder -> IO ()) -> IO () -> IO ()
          infiniteBSWriter bsWriter flush = do
            bsWriter $ Builder.lazyByteString $ LBS.concat $ replicate 1000 "foo\n"
            flush
            infiniteBSWriter bsWriter flush
          infiniteResponse = Server.responseStreaming status200 [] infiniteBSWriter
      respWriter infiniteResponse []
    Just "/echo" -> do
      reqBody <- readRequestBody req
      respWriter (Server.responseBuilder status200 [] (Builder.lazyByteString reqBody)) []
    _ -> do
      respWriter (Server.responseNoBody status404 []) []

stuckClient :: Client.Client ()
stuckClient client = client (Client.requestNoBody "GET" "/inifite" []) $ \res ->
  if Client.responseStatus res == Just status200
    then do
      bs <- Client.getResponseBodyChunk res
      putStrLn $ "Got chunk of size: " <> show (BS.length bs)
      threadDelay maxBound
    else error $ "the response isn't 200, due to a typo?"

echoClient :: Client.Client ()
echoClient client = client (Client.requestBuilder "GET" "/echo" [] "Yo") $ \res ->
  if Client.responseStatus res == Just status200
    then do
      resp <- readResponseBody res
      putStrLn $ "Got echo response: " <> show resp
    else error $ "the response isn't 200, due to a typo?"

main :: IO ()
main = do
  bracket (bindRandomPortTCP "*") (close . snd) $ \(serverPort, listenSock) -> do
    listen listenSock 1024
    _ <- async $ forever $ do
      (sock, _) <- accept listenSock
      let cleanup cfg = do
            Server.freeSimpleConfig cfg
            close sock
      async $ bracket (Server.allocSimpleConfig sock 4096) cleanup $ \cfg -> do
        Server.run cfg testServer

    let clientConfig =
          Client.ClientConfig
            { Client.scheme = "http",
              Client.authority = "localhost",
              Client.cacheLimit = 20
            }
    bracket (fst <$> getSocketTCP "localhost" serverPort) close $ \sock ->
      bracket (Client.allocSimpleConfig sock 4096) Client.freeSimpleConfig $ \http2Cfg -> do
        Client.run clientConfig http2Cfg $ \sendReq -> do
          stuck <- async $ stuckClient sendReq
          threadDelay 100000
          echoClient sendReq
          putStrLn $ "!!!! echo request worked even if stuck request was stuck !!!!"
          wait stuck
    pure ()

readRequestBody :: Server.Request -> IO LBS.ByteString
readRequestBody req = readChunks (Server.getRequestBodyChunk req)

readResponseBody :: Client.Response -> IO LBS.ByteString
readResponseBody res = readChunks (Client.getResponseBodyChunk res)

readChunks :: IO BS.ByteString -> IO LBS.ByteString
readChunks action = LBS.fromChunks <$> go []
  where
    go chunks = do
      action >>= \c -> case c of
        "" -> pure (reverse chunks) -- chunks were accumulated in reverse order
        _ -> go (c : chunks)
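
One possible explanation for the second stream getting stuck, not confirmed in this thread, is that all streams share a connection-level flow-control window in HTTP/2. If credit is only returned when the application reads, a stream that is never read can use up the whole connection window. The sketch below models only that connection-level window; `Conn`, `send`, `readAll`, and `fillWith` are hypothetical names, not part of the http2 library.

```haskell
module Main where

-- Hypothetical model of a shared connection-level window (not the library's code).
data Conn = Conn
  { connCredit :: Int          -- send credit shared by all streams
  , unread     :: [(Int, Int)] -- (stream id, bytes delivered but not yet read)
  } deriving (Show, Eq)

-- | Try to send n bytes on stream sid. Returns the number of bytes that the
-- remaining connection credit actually allowed, and the updated state.
send :: Int -> Int -> Conn -> (Int, Conn)
send sid n c =
  let k = min n (connCredit c)
      bump (s, b) = if s == sid then (s, b + k) else (s, b)
      unread'
        | any ((== sid) . fst) (unread c) = map bump (unread c)
        | otherwise = (sid, k) : unread c
  in (k, c { connCredit = connCredit c - k, unread = unread' })

-- | The application reads everything pending on stream sid, which returns
-- that much credit to the connection.
readAll :: Int -> Conn -> Conn
readAll sid c =
  let freed = sum [b | (s, b) <- unread c, s == sid]
  in c { connCredit = connCredit c + freed
       , unread = [(s, b) | (s, b) <- unread c, s /= sid]
       }

-- | Keep sending chunks on stream sid until the connection credit is gone.
fillWith :: Int -> Int -> Conn -> Conn
fillWith sid chunk c
  | connCredit c == 0 = c
  | otherwise = fillWith sid chunk (snd (send sid chunk c))

main :: IO ()
main = do
  -- Stream 1 plays the never-read "/inifite" response.
  let stalled = fillWith 1 4000 (Conn 65535 [])
  -- Stream 3 plays the 2-byte "/echo" response.
  print (fst (send 3 2 stalled))             -- 0: no credit left
  print (fst (send 3 2 (readAll 1 stalled))) -- 2: credit was returned
```

In this model the echo response cannot make progress until something reads stream 1, which matches the symptom described above. Whether v4.0.0 behaves this way internally is an assumption.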

kazu-yamamoto commented 1 year ago

Thank you for your research and the code. I will look into it.

kazu-yamamoto commented 1 year ago

With your first example and http2 v4.1.0, I cannot reproduce the memory blow-up on macOS. I'm watching the process with ps.

Would you tell me your OS?

akshaymankar commented 1 year ago

I am using Linux, NixOS specifically.

kazu-yamamoto commented 1 year ago

@akshaymankar I would appreciate it if you could test this issue against v5.0.0.

kazu-yamamoto commented 4 months ago

The new server architecture has been merged. In the old model, a lot of threads were created for streaming. In the current model, only two threads are spawned for streaming.

kazu-yamamoto commented 2 weeks ago

Closing. Please reopen if necessary.