scodec / scodec-akka

Provides interop between scodec and akka
BSD 3-Clause "New" or "Revised" License
14 stars 3 forks source link

Add CodecStage for akka-streams #3

Open danielwegener opened 8 years ago

danielwegener commented 8 years ago

Given scodec-streams, It would be great to have a similar mechanism for akka-streams that allows one to lift a

A dependency on scalaz-streams should not be required.

danielwegener commented 8 years ago

After some first experiments I'd like to share my observations and ideas.

An akka-stream is potentially infinite, demand-driven sequence of elements that can be terminated (completed/cancelled) from the upstream or downstream side either successfully or with a failure at any time.

Akka uses ByteString as representation for a bunch of bytes while scodec uses BitVector/ByteVector. ByteString and ByteVector are isomorphic while a BitVector can only be converted into a ByteVector by possibly introducing some padding.

Akka-streams have a single type T on each port such that each element emitted or consumed must be of the same type.

I see the following use-cases for StreamCodec[T]s:

Decoding a continious potentially huge (or infinite) blob Source[BitVector] into a Source[T]

Encoding a potentially infinite Source[T] into a Source[BitVector]

Decoding a framed, possibly infinite, Source[BitVector]

It turns out that this use case does not need a StreamCodec but can simply be done by mapping a Decoder over a Source[BitVector] to get a Attempt[DecodingResult] that can act propertly on remaning bits and decoding failures.

Encoding a framed Source[T] where each T encodes into zero or more BitVectors

Turns out that this use case also does not need a StreamCodec since a Source[T] can be brought into a shape Source[T1] such that each T1 encodes in to zero or one Attempt[BitVector].

Further ideas

Closing the gap between BitVector/ByteVector and ByteString The actual conversion mechanism is already solved by scodec-akka. We could additionally provide the following stages:

API

The following API proposal is inspired by scodecs own api and the scodec-stream API. The general idea is to describe a StreamCodec by combining different strict Codecs into a sequence of StreamCodecs that describe a lazily runnable chain of decoding computations that have safe exit points (StreamCodec becomes Complete) where upstream is allowed to complete without producing an Err.

I have yet looked merely at the decoding side of the problem since it seemed more complicated. I doubt that all of these combinators have useful/possible counterparts in the encoding domain. I tend to follow the scodec-streams example and leave the construction of StreamEncoders and StreamDecoders seperate beside the most simple combinators (see https://github.com/scodec/scodec-stream/blob/series/1.0.x/src/main/scala/scodec/stream/codec/StreamCodec.scala).

trait StreamDecoder[+A] extends StreamDecoderOps[A] { self =>
  /**
   * This is probably a very clunky attempt. I am happy to hear better ideas!
   * @returns stream decoding state after running this decoder with the remainding BitVector.
   * Could be: Emitting, NextDecoder, Fail, Complete
   * - Emitting signals that this decoder emitted an element ready to be pushed downstream.
   * - NextDecoder signals that this decoder is complete and has a StreamDecoder that will decode the remainding BitVector
   * - Complete signals that this decoder does not expect any futher elements.
   * - Fail signals that this decoder  (remainder=bits)
   * Turns out that a StreamDecoder can signal Emitting and Complete/Fail/NextDecoder at the same time.
   * An implementing stage can control when to run the next step of this continuation like structure.
   */
  def decode(bits:BitVector): ???
}
trait StreamDecoderOps[+A] {
  /** Decode `this` and then decodes `sd` after `this` completes. Completes when `sd` completes. 
   * (should support A>:B too) */
  def ++:[B>:A](sd:StreamDecoder[B]):StreamDecoder[B]

  /** alias for [[StreamDecoder$.or]] */
  def |[B>:A](d:StreamDecoder[B]):StreamDecoder[B]

}
object StreamDecoder {
  /** Completes instantly.
   * This is a neutral element for `++:` and `or` */
  val complete: StreamCodec[Nothing]

  /** Instantly fails with the given Err. */
  def fail(err: =>Err): StreamDecoder[Nothing]

  /** Decodes a single `A` using `d` and then completes. */
  def once[A](d: Decoder[A]): StreamDecoder[A]

  /** Like [[once]], but halts normally and leaves the
   * input unconsumed in the event of a decoding error. */
  def tryOnce[A](dec: Decoder[A]): StreamDecoder[A]

/** Decodes an `Iterable[A]` using `d`, emits each of its elements and then completes. */
def all[A](d:Decoder[Iterable[A]]):StreamDecoder[A]

  /** Repeatedly runs `d` until upstream completes. Fails if upstream completes while the current 
   *  decoding of `d` is incomplete. Or if the  */
  def many[A](d: StreamDecoder[A]): StreamDecoder[A]

  /** Decodes `countCodec` to `n` and then decodes `valueCodec` `n` times.
   * Completes after `n` values have been decoded successfully. */
  def streamOfN[A](countCodec: Decoder[Int], valueCodec: StreamDecoder[A]):StreamDecoder[A]

  /** Wait until `numberOfBits` bits are available, then runs `dec`. 
   *  Runs `dec` if upstream completes even if the buffer has not yet reached `numberOfBits`.
   *  Should have an `chunkedBytes` equivalent. */
  def chunked[A](dec:StreamDecoder[A])(numberOfBits: Long):StreamDecoder[A]

  /** Runs `d` but discards all its emitted values */
  def drop(d:StreamDecoder[_]): StreamDecoder[Nothing]

  /** Instantly emits `a` without consuming any bits */
  def provide[A](a:A):StreamDecoder[A]

  /** Consumes `numberOfBits` and completes. Completes or fails if upstream terminates and the 
   * given `numberOfBits` are not yet consumed. Should have an ignoreBytes equivalent.  */
  def ignore(numberOfBits: Long, failIfUpstreamTerminates:Boolean = false):StreamDecoder[Nothing]

  /** Run `d`` using only the first `numberOfBits` bits of
   * the current stream, then advance the cursor by that many bits on completion. */
  def isolate[A](numberOfBits: Long)(d: StreamDecoder[A]): StreamDecoder[A]

  /** Runs `s1`, then runs `s2` if `s1` emits no elements.
   * Fails if `s1` or `s2` fails.
   * Completes if either `s1` completes after emitting an element or `d` completes. */
  def or[A](s1: StreamDecoder[A], s2: StreamDecoder[A]):StreamDecoder[A]
}

StreamDecoder API should look similar but is TBD

Stage Implementation considerations

The stage has to deal with possible states:

I came up with an experimental (and not very elegant) prototype for decoding (ongoing): https://github.com/danielwegener/scodec-akka/blob/wip-stream-codecs/src/test/scala/scodec/interop/akka/stream/StreamDecoderTest.scala

WDYT? /cc @mpilquist

mpilquist commented 8 years ago

@danielwegener This is fantastic! I really like this proposal and it would be great to get this implemented.

aloiscochard commented 8 years ago

:+1:

eirirlar commented 7 years ago

This would be a nice feature for the project I'm currently on, which uses bot scodec and akka-streams. Are there any plans to wrap this up and release it?

danielwegener commented 7 years ago

I'd really love to but I do not find the time to work on that. My paid projects took me too far away from the scala land unfortunately :( I wasn't really happy with the decoder dsl experiments mentioned above. It was like I could not nail down the proper abstraction (or internal api) for a really composable StreamDecoder. So everything felt a bit ad hoc. If anyone want's to give it a try, feel free to take it as a base or as a learned lessen how not so solve it ;)

eirirlar commented 7 years ago

Totally understand :) Going for a custom framing protocol instead as this is a bit out of scope for me at the moment.

unoexperto commented 6 years ago

I just spent 2 days debugging parsing of 20GB PSQL COPY binary dump to find out that bottleneck was in scodec-stream (or fs2). Simple implementation of akka.stream.Source.unfold around BitVector.fromNmap is ~300 times faster. So something is fishy there, @mpilquist

mpilquist commented 6 years ago

@unoexperto Please open an issue on fs2 and describe what you encountered more.