axman6 / amazonka-s3-streaming

Provides a conduit based interface to uploading data to S3 using the Multipart API
MIT License

Amazonka-2.0: Better streaming of parts? #26

Open endgame opened 2 years ago

endgame commented 2 years ago

At a high level, amazonka-s3-streaming draws ByteStrings from a conduit and assembles them into the parts of an S3 Multi-Part Upload:

https://github.com/axman6/amazonka-s3-streaming/blob/b5e41dcf16141446b8d35175ef8e55c80287e2ce/src/Network/AWS/S3/StreamingUpload.hs#L88-L91
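The buffer-and-flush idea can be shown in miniature. This is an illustrative sketch only (`assembleParts` and its greedy flush rule are hypothetical, not the library's actual code): incoming ByteStrings accumulate until the part size is reached, then a part is emitted.

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS

-- Illustrative only: greedily accumulate incoming ByteStrings into a
-- buffer, flushing a part whenever the buffer reaches the part size.
-- The real streamUpload does this incrementally inside a conduit.
assembleParts :: Int -> [ByteString] -> [ByteString]
assembleParts partSize = go []
  where
    go acc []
      | null acc  = []
      | otherwise = [BS.concat (reverse acc)]  -- final (possibly short) part
    go acc (b:bs)
      | accLen + BS.length b >= partSize =
          let (takeN, rest) = BS.splitAt (partSize - accLen) b
          in  BS.concat (reverse (takeN : acc)) : go [] (keep rest bs)
      | otherwise = go (b : acc) bs
      where accLen = sum (map BS.length acc)
    keep r bs' = if BS.null r then bs' else r : bs'
```

Note that every part is a fresh `BS.concat`, which is exactly the copying the rest of this issue is trying to avoid.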

At the moment, chunked uploads are broken on the Hackage 1.6.1 release of amazonka (see e.g. https://github.com/brendanhay/amazonka/issues/596 and https://github.com/brendanhay/amazonka/issues/547), but once amazonka-2.0 comes out, better streaming becomes possible - but for some frustrating limitations:

ghost commented 2 years ago

I think I agree on all points, though in the branch that will target amazonka >= 1.6.1 I'll stick to the current implementation of building a buffer and writing it out when it's full (needing some of the changes we've spoken about regarding capping the chunk size, etc.).

For Amazonka 2.0, I wouldn't mind playing with concurrency for the chunk sending, but what I'm not too sure about is what constraints I'm happy to place on the user so that I can fork threads and still manage resources and exceptions. lifted-async seems useful, but I don't know how onerous MonadBaseControl would be for a user of the lib. I could potentially offer a concurrent-stream-in-and-send-to-s3 version with a MonadBaseControl constraint, and one with the current constraints that buffers and sends each chunk before moving on to the next one.

endgame commented 2 years ago

[I]n the branch which will target amazonka >= 1.6.1 I'll stick to the current implementation of building a buffer and writing it out when it's full (needing some of the changes we've spoken about regarding capping the chunk size etc.)

You will have to - chunked uploads are broken on 1.6.1.

For Amazonka-2.0, if you want streaming directly from the source conduit into amazonka (without assembling the buffer yourself), I can't see a way that doesn't involve some kind of rendezvous structure. I'd stay away from anything demanding MonadBaseControl - it's not seeing much use in Snoymanverse projects, and I think UnliftIO.Async provides similar functions. Also, resourcet (used by amazonka) already forces a dependency on unliftio-core via runResourceT :: MonadUnliftIO m => ResourceT m a -> m a, so you wouldn't be adding additional constraints to users of amazonka-s3-streaming.
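The rendezvous idea can be sketched with nothing but base (hypothetical names throughout; a real version would send parts via amazonka and handle exceptions properly, e.g. with UnliftIO.Async): the filling side hands each chunk through an empty MVar, so neither side runs ahead of the other.

```haskell
{-# LANGUAGE LambdaCase #-}

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- Hypothetical sketch: the producer hands each chunk to the consumer
-- through an empty MVar, so neither side runs ahead of the other
-- (a zero-capacity rendezvous, not a buffered queue).
rendezvous :: [chunk] -> (chunk -> IO ()) -> IO ()
rendezvous chunks send = do
  box  <- newEmptyMVar              -- holds Maybe chunk; Nothing = end of stream
  done <- newEmptyMVar              -- consumer signals completion
  _ <- forkIO $ do                  -- consumer: "upload" each chunk as it arrives
    let loop = takeMVar box >>= \case
          Nothing -> putMVar done ()
          Just c  -> send c >> loop
    loop
  mapM_ (putMVar box . Just) chunks -- producer: hand over chunks one by one
  putMVar box Nothing
  takeMVar done
```

The same shape works under MonadUnliftIO by running the forked action through withRunInIO, which is why no MonadBaseControl constraint is needed.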

endgame commented 1 year ago

For streamUpload, this library has to buffer each chunk in full before sending it to amazonka. To get perfect streaming, I have to "seal" the conduit, and then I can write a function that streams a single chunk's worth of data from a SealedConduitT:

{-# LANGUAGE BangPatterns, LambdaCase, ScopedTypeVariables #-}

import Control.Monad.IO.Class (MonadIO)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Conduit.Internal (Pipe (..), SealedConduitT (..))
import Network.AWS.S3.StreamingUpload (ChunkSize)

-- | Extract a single chunk from a stream, reallocating as little as possible.
streamChunk :: forall m a.
  MonadIO m
  MonadIO m
  => ChunkSize
  -> SealedConduitT () ByteString m a
  -> SealedConduitT () ByteString m (SealedConduitT () ByteString m a)
streamChunk size (SealedConduitT pipe) =
  SealedConduitT $ SealedConduitT <$> pipeChunk pipe
  where
    pipeChunk
      :: Pipe () () ByteString () m a
      -> Pipe () () ByteString () m (Pipe () () ByteString () m a)
    pipeChunk = loop size
      where
        loop !n = \case
          HaveOutput p b -> case compare n bLen of
            -- Emit 'n' bytes from 'b' and push the rest onto the return value
            LT ->
              let (bN, bRest) = BS.splitAt n b
              in  HaveOutput (pure $ HaveOutput p bRest) bN
            -- Emit 'b' and then we're done
            EQ -> HaveOutput (pure p) b
            -- 'b' fits entirely in the stream we want to emit
            GT -> HaveOutput (loop (n - bLen) p) b
            where
              bLen = BS.length b
          NeedInput f _ -> loop n $ f ()
          Done a -> pure $ Done a
          PipeM m -> PipeM $ loop n <$> m
          Leftover p () -> loop n p
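The three-way comparison in the loop has a simple pure analogue (illustrative only; `splitChunk` is a hypothetical list version of the same LT/EQ/GT logic, without the conduit plumbing):

```haskell
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS

-- Hypothetical pure counterpart of the loop above: take exactly 'n'
-- bytes from a list of ByteStrings, splitting the ByteString that
-- straddles the boundary, and return the leftover stream.
splitChunk :: Int -> [ByteString] -> ([ByteString], [ByteString])
splitChunk _ [] = ([], [])
splitChunk n (b:bs) = case compare n bLen of
  LT -> let (bN, bRest) = BS.splitAt n b   -- emit n bytes, keep the rest
        in  ([bN], bRest : bs)
  EQ -> ([b], bs)                          -- emit b and we're done
  GT -> let (chunk, rest) = splitChunk (n - bLen) bs
        in  (b : chunk, rest)              -- b fits entirely in this chunk
  where bLen = BS.length b
```

Only the ByteString on the chunk boundary is ever split; everything else is passed through by reference, which is the "reallocating as little as possible" property.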

I can't figure out how to plug this in, because streamUpload returns a conduit as a result, drawing input using the monadic action await provided by conduit. There should be some kind of streaming await that pulls up to N bytes and returns them as a sub-conduit.

I am increasingly of the opinion that having "input" in a streaming library's core transformer type is a misfeature, and you should instead consume an input stream as a function argument.
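That alternative style can be sketched in plain IO (hypothetical names; `pull` stands in for whatever source a streaming library would hand you): the consumer receives a pull action as an argument instead of living inside a transformer with a built-in await.

```haskell
{-# LANGUAGE BangPatterns, LambdaCase #-}

import Data.IORef

-- Hypothetical sketch of the "input as a function argument" style:
-- the consumer is handed a pull action instead of living inside a
-- transformer with a built-in 'await'.
consumeAll :: Monoid a => IO (Maybe a) -> IO a
consumeAll pull = go mempty
  where
    go !acc = pull >>= \case
      Nothing -> pure acc
      Just a  -> go (acc <> a)

-- Turn a list into such a pull action, for demonstration.
pullFromList :: [a] -> IO (IO (Maybe a))
pullFromList xs = do
  ref <- newIORef xs
  pure $ atomicModifyIORef' ref $ \case
    []     -> ([], Nothing)
    (y:ys) -> (ys, Just y)
```

With this shape, "pull up to N bytes as a sub-stream" is just an ordinary function over the pull action, with no need to thread an input type through the core transformer.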