kolmodin / binary

Efficient, pure binary serialisation using ByteStrings in Haskell.

Decoder prematurely reports "not enough bytes" #160

Closed: QuRyu closed this issue 5 years ago.

QuRyu commented 5 years ago

I am streaming a pcap file with the conduit library and parsing it with binary's incremental `Decoder`. My parser is defined as follows:

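```haskell
import Data.Binary.Get
import Data.IORef (IORef, readIORef, writeIORef)
import Data.Word (Word32)
import System.IO.Unsafe (unsafePerformIO)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as Char8
-- MarketData and mkMarketData are not shown here.

parsePPacket :: (BS.ByteString -> Bool) -- keep the packet? (tested on its 5-byte code)
             -> (Word32 -> Get a)       -- packet parser
             -> IORef Int               -- packet counter, for debugging only
             -> Get (Maybe a)
parsePPacket f p ref = do
    pacc <- getWord32le   -- first record-header field (ts_sec)
    skip 8                -- skip ts_usec and incl_len
    plen <- getWord32le   -- length of pcap packet (orig_len)
    -- Debug trace; unsafePerformIO is only here to print a packet counter.
    ignored <- return $ unsafePerformIO $ do
                    count <- readIORef ref
                    writeIORef ref (count+1)
                    putStrLn $ "packet " ++ show count
                                ++ ", length: " ++ show plen
    if (ignored `seq` plen) /= 257
        then do skip' plen
                return Nothing
        else do skip 42 -- skip the IP/UDP header
                code <- getByteString 5
                if f code
                    then do r <- p pacc
                            return (Just r)
                    else do skip' (plen - 47)  -- 42 header bytes + 5 code bytes already read
                            return Nothing
  where
    skip' = skip . fromIntegral

parseB6034 :: IORef Int -> Get (Maybe MarketData)
parseB6034 ref = parsePPacket (BS.isPrefixOf quote) parseB6034' ref
    where
      quote :: BS.ByteString
      quote = Char8.pack "B6034"

      parseB6034' :: Word32 -> Get MarketData
      parseB6034' pacc = do
        _readB <- bytesRead            -- debugging only
        content <- getByteString 209   -- this is where "not enough bytes" is reported
        skip 1
        return (mkMarketData pacc content)
```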

`parseB6034` builds on `parsePPacket` to read one specific message type (`B6034`). The `unsafePerformIO` and `IORef` parts are only there for debugging. I also have a `parseGHeader` function that parses the first chunk of the file (the pcap global header). It works fine, so I have not pasted it here.
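For reference, the sketch below parses the classic libpcap per-packet record header that `parsePPacket` walks through with `getWord32le`, `skip 8`, `getWord32le`. It assumes a little-endian capture file. `RecordHeader` and `getRecordHeader` are hypothetical names used only for illustration; they are not part of binary.

```haskell
import Data.Binary.Get (Get, getWord32le, runGet)
import qualified Data.ByteString.Lazy as BL
import Data.Word (Word32)

-- Classic pcap record header: four 32-bit fields, read little-endian here
-- (assumption: the file was written on a little-endian machine).
data RecordHeader = RecordHeader
  { tsSec   :: !Word32  -- timestamp, seconds
  , tsUsec  :: !Word32  -- timestamp, microseconds
  , inclLen :: !Word32  -- bytes of this packet actually stored in the file
  , origLen :: !Word32  -- length of the packet on the wire
  } deriving (Show, Eq)

getRecordHeader :: Get RecordHeader
getRecordHeader =
  RecordHeader <$> getWord32le <*> getWord32le <*> getWord32le <*> getWord32le

-- Example: a header for a 257-byte packet (0x0101 little-endian is 1,1,0,0).
exampleHeader :: RecordHeader
exampleHeader =
  runGet getRecordHeader (BL.pack [1,0,0,0, 2,0,0,0, 1,1,0,0, 1,1,0,0])
```

Mapped onto `parsePPacket`, `pacc` is `tsSec`, `skip 8` skips `tsUsec` and `inclLen`, and `plen` is `origLen`. The number of bytes actually stored for a packet is `inclLen`. If a snapshot length truncated the capture, `inclLen` is smaller than `origLen`, and skipping `origLen` bytes would misalign every later record. That case would break the lazy `readFile` version in the same way, though.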

I use the `Decoder` in the following code:

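```haskell
-- | Run the given `Get` monad as many times as possible until there is
-- no input remaining from upstream.
-- (Requires {-# LANGUAGE BangPatterns #-}; DecodeError is not shown here.)
conduitParseMany :: (MonadThrow m, MonadResource m, Show o) =>
                        Get o -> ConduitT BS.ByteString o m ()
conduitParseMany g = go0 0
    where
      -- Start a fresh decoder on the next upstream chunk; stop at end of input.
      go0 !n = do x <- await
                  case x of
                    Nothing -> return ()
                    Just bs -> go n (runGetIncremental g `pushChunk` bs)

      go !n (Fail i o e)  = throwM (DecodeError i n e "conduitParseMany")
      -- The decoder needs more input: feed it the next chunk (Nothing at EOF).
      go !n (Partial f)   = await >>= (go n . f)
      -- One value decoded: emit it, then continue with any unconsumed bytes.
      go !n (Done bs o v) = do yield v
                               if BS.null bs
                                  then go0 (n+o)
                                  else go (n+o)
                                          (runGetIncremental g `pushChunk` bs)
```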

`conduitParseMany` turns a `Get` into a `Decoder` and runs it repeatedly until upstream has no more input. There is also a function `conduitParseOnce` that does the same conversion but runs the `Decoder` only once and returns (rather than yields) the result. I then combine these two functions with the two parsers above to parse the file.

```haskell
-- | Run the first `Get` only once and run the second `Get`
-- as many times as possible until there is no input from upstream left.
-- Then collect the results from the second `Get` into a vector and
-- return it with the result from the first `Get`.
fuseGet :: (MonadThrow m, PrimMonad m, MonadResource m, VG.Vector v b, Show b)
           => Get a
           -> Get b
           -> ConduitT BS.ByteString Void m (a, v b)
fuseGet gOnce gMany =
    -- fuseBoth is (.|) that keeps both return values: conduitParseMany
    -- consumes the output of conduitParseOnce, and C.sinkVector collects
    -- the output of conduitParseMany.
    (fmap fst $ (conduitParseOnce gOnce) `fuseBoth` (conduitParseMany gMany))
        `fuseBoth` C.sinkVector
```

```haskell
pipeline :: (VG.Vector v (Maybe MarketData)) =>
            FilePath
         -> IORef Int
         -> IO (PGlobalHeader, v (Maybe MarketData))
pipeline fp ref =
    -- `ref` is the caller's debug counter; the `where` clause passes it on.
    runResourceT . runConduit $ C.sourceFile fp .| parser
    where
      parser = fuseGet parseGHeader (parseB6034 ref)
```

`pipeline` returns the parsed data for the given file path.
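The thread does not say what the composition fix was, so what follows is a hedged guess. `fuseBoth` is vertical composition (`.|` that keeps both results). In `fuseGet`, therefore, `conduitParseMany` consumes whatever `conduitParseOnce` yields, not the bytes of the file. Parsing a header and then a run of packets from one byte stream is sequential instead. You run the first decoder to `Done` and then hand its unconsumed bytes (the first field of `Done`) to the second decoder. In conduit, this would plausibly mean binding the two stages in `do` and pushing the remainder back with `leftover`.

The pure sketch below shows the same idea over a list of strict chunks, using only binary and bytestring. `runOnceThenMany` is a hypothetical name.

```haskell
import Data.Binary.Get
import qualified Data.ByteString as BS

-- Hypothetical pure analogue of what fuseGet is meant to do: run `gOnce` a
-- single time, then run `gMany` until the chunks run out. The list of strict
-- chunks stands in for what a streaming source would deliver.
runOnceThenMany :: Get a -> Get b -> [BS.ByteString] -> Either String (a, [b])
runOnceThenMany gOnce gMany = once (runGetIncremental gOnce)
  where
    -- Drive the header decoder to completion, feeding chunks on demand.
    once (Fail _ _ e) _      = Left e
    once (Partial k) (c:cs)  = once (k (Just c)) cs
    once (Partial k) []      = once (k Nothing) []
    -- Header done: its unconsumed bytes belong to the first record.
    once (Done rest _ a) cs  = fmap (\bs -> (a, bs)) (records (prepend rest cs))

    -- Start a fresh record decoder only if some input remains.
    records cs = case dropWhile BS.null cs of
      []       -> Right []
      (c:cs')  -> step (runGetIncremental gMany `pushChunk` c) cs'

    step (Fail _ _ e) _      = Left e
    step (Partial k) (c:cs)  = step (k (Just c)) cs
    -- Out of chunks mid-record: signal end of input (this yields a Fail).
    step (Partial k) []      = step (k Nothing) []
    step (Done rest _ b) cs  = fmap (b :) (records (prepend rest cs))

    -- Keep leftover bytes at the front of the stream instead of dropping them.
    prepend rest cs = if BS.null rest then cs else rest : cs
```

With a one-byte header (`getWord8`) and 16-bit records (`getWord16le`), the chunks `[7,1]`, `[0,2,0]`, `[3,0]` decode to header `7` and records `[1,2,3]`. The records straddle chunk boundaries without any special handling.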

My parser functions work fine with `readFile` from `Data.ByteString.Lazy`. With the incremental code above, however, the decoder reports "not enough bytes" after parsing exactly 115 packets, even though unconsumed bytes remain in the file. The error is raised at the line `content <- getByteString 209`.

My guess is that the `Decoder` tries to read all 209 bytes at once, but conduit hands it a chunk shorter than 209 bytes, so the `Decoder` reports the error. Is that guess correct?
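A short chunk alone should not cause this error. When `getByteString` needs more bytes than the current chunk holds, binary's incremental decoder returns `Partial` and waits for the next chunk. It reports "not enough bytes" only after it has been told that input has ended. The sketch below checks both cases. `splitRead` and `shortRead` are illustrative names, and the 100/109 split is arbitrary.

```haskell
import Data.Binary.Get
  (Decoder (..), getByteString, pushChunk, pushEndOfInput, runGetIncremental)
import qualified Data.ByteString as BS

-- Ask for 209 bytes, but deliver them in two chunks of 100 and 109 bytes.
-- After the first chunk the decoder is Partial; after the second it is Done.
splitRead :: Decoder BS.ByteString
splitRead =
  runGetIncremental (getByteString 209)
    `pushChunk` BS.replicate 100 0xAB
    `pushChunk` BS.replicate 109 0xCD

-- Same request, but the stream ends after only 150 bytes. Only the explicit
-- end of input turns the Partial into a Fail ("not enough bytes").
shortRead :: Decoder BS.ByteString
shortRead =
  pushEndOfInput (runGetIncremental (getByteString 209) `pushChunk` BS.replicate 150 0)
```

So if the decoder fails inside `getByteString 209` while the file still has bytes left, the likelier cause is that the decoder was given end of input too early, or was fed a stream other than the raw file bytes. The chunk size is unlikely to be the problem.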

QuRyu commented 5 years ago

I have not pushed the code above to my GitHub because it still contains bugs, but I am happy to do so if anyone needs it for reference.

QuRyu commented 5 years ago

I realized this is actually a problem with my conduit composition, not with the binary library itself.

kolmodin commented 5 years ago

Thanks for looking into this!