snoyberg / conduit

A streaming data library
895 stars 194 forks source link

Chunk bytestring conduit #438

Open domenkozar opened 4 years ago

domenkozar commented 4 years ago

I'd propose adding chunk :: MonadIO m => ChunkSize -> ConduitT ByteString ByteString m () to conduit as it seems quite common use case to chunk up a bytestring stream (for example when uploading to some service like amazonka-s3-streaming).

Implementation from https://github.com/blitzcode/conduit-chunked


-- Chunking with a raw buffer

data S = S (ForeignPtr Word8) (Ptr Word8) {-# UNPACK #-} !Int

newS :: ChunkSize -> IO S
newS chunkSize = do
    fptr <- mallocByteString chunkSize
    return (S fptr (unsafeForeignPtrToPtr fptr) 0)

processChunk :: ChunkSize -> ByteString -> S -> IO ([ByteString], S)
processChunk chunkSize input =
    loop id 0
  where
    loop front idxIn s@(S fptr ptr idxOut)
        | idxIn >= B.length input = return (front [], s)
        | otherwise = do
            pokeByteOff ptr idxOut (unsafeIndex input idxIn)
            let idxOut' = idxOut + 1
                idxIn' = idxIn + 1
            if idxOut' >= chunkSize
                then do
                    let bs = PS fptr 0 idxOut'
                    s' <- newS chunkSize
                    loop (front . (bs:)) idxIn' s'
                else loop front idxIn' (S fptr ptr idxOut')

chunk :: MonadIO m => ChunkSize -> ConduitT ByteString ByteString m ()
chunk chunkSize =
    liftIO (newS chunkSize) >>= loop
  where
    loop s@(S fptr _ len) = do
        mbs <- await
        case mbs of
            Nothing -> yield $ PS fptr 0 len
            Just bs -> do
                (bss, s') <- liftIO $ processChunk chunkSize bs s
                mapM_ yield bss
                loop s'
snoyberg commented 4 years ago

I typically use builders for chunking. Are there cases where this kind of approach is demonstrably faster?

domenkozar commented 4 years ago

According to the https://github.com/blitzcode/conduit-chunked#benchmark, it's about 10x slower, but that benchmark should probably be updated and reran since it's quite old.

snoyberg commented 4 years ago

I wouldn't be surprised to hear that, it seems reasonable that rechunking could get some performance improvements. In any event: I'd accept a PR adding some kind of functionality like this.