akka / akka-meta

This repository is dedicated to high-level feature discussions and for persisting design decisions.
Apache License 2.0

How shall we surface acknowledged Sources in Akka Streams? #13

Open rkuhn opened 8 years ago

rkuhn commented 8 years ago

These are some initial thoughts, exemplified on a hypothetical streaming Kafka connector API. Use-cases that need to be supported:

Here we use in-stream data elements exclusively.

class KafkaSource extends
  GraphStage[FanOutShape2[Confirmation, Message, ConfirmationAck]]

The primary output is the Message, which has an associated Confirmation member that will eventually be fed back. After confirmation has successfully moved the cursor, a ConfirmationAck is produced.

Pros

Here we provide multiple independent pieces in order to allow usage in normal Flow syntax.

class KafkaSource extends
  GraphStageWithMaterializedValue[SourceShape[Message], KafkaReader]
class KafkaConfirmation extends
  GraphStageWithMaterializedValue[FlowShape[Message, Confirmation], KafkaConfirmer]
trait KafkaReader {
  def wireUp(c: KafkaConfirmer): Unit
}

Here the idea is that users can use the linear Source & Flow DSL as in:

KafkaSource().map(...).viaMat(KafkaConfirmation())(_ wireUp _)

This only works well if the whole setup is created in a place that has this kind of overview. The KafkaReader will forward the confirmations to the KafkaSource using the GraphStage’s asyncInput facility; all Kafka interaction is owned by the KafkaSource.

Pros

While the correct message and confirmation types need to travel through streams to the right places in all solutions, this one encapsulates the concern of confirmation within the data elements themselves.

class KafkaSource extends GraphStage[SourceShape[Message]]
trait Message {
  def confirm(): Future[Done]
}

This allows usage as a bog-standard Source.
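A minimal sketch of what such a self-confirming element could look like, using only the Scala standard library. `InMemoryLog`, `payload`, and the local `Done` stand-in (in place of `akka.Done`) are hypothetical names introduced for illustration; a real connector would talk to Kafka rather than to an in-memory cursor.

```scala
import scala.concurrent.Future

// Stand-in for akka.Done so that the sketch has no dependencies (assumption)
sealed trait Done
case object Done extends Done

trait Message {
  def payload: String
  def confirm(): Future[Done]
}

// Hypothetical in-memory "partition" that tracks a committed cursor
final class InMemoryLog(records: Vector[String]) {
  private var cursor = -1L

  def committed: Long = synchronized(cursor)

  // Confirmations are cumulative: the cursor only ever moves forward
  private def advanceTo(offset: Long): Unit =
    synchronized { if (offset > cursor) cursor = offset }

  // Each message carries its own way back to the cursor
  def messages: Vector[Message] =
    records.zipWithIndex.map { case (data, offset) =>
      new Message {
        val payload: String = data
        def confirm(): Future[Done] = {
          advanceTo(offset.toLong)
          Future.successful(Done)
        }
      }
    }
}
```

The consumer of the stream only sees `Message` and calls `confirm()` when it is done with an element; it never needs a reference to the source itself.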

Pros

* easy to set up

Cons

Backpressure of confirmations is a potential problem which probably can be handled by conflating them (since a confirmation moves a cursor and therefore confirmations are cumulative).
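To illustrate why conflation is safe here, the following sketch collapses a backlog of confirmations into the single furthest one. `Confirmation(offset)` and `latest` are hypothetical names, and the sketch assumes a single partition; in a stream, a function of this shape could presumably be handed to a conflating operator such as `conflate`.

```scala
object ConflateSketch {
  // Hypothetical confirmation type: confirms everything up to `offset`
  final case class Confirmation(offset: Long)

  // Because confirmations are cumulative, only the furthest one matters
  def latest(a: Confirmation, b: Confirmation): Confirmation =
    if (b.offset > a.offset) b else a

  // Collapse everything that piled up while downstream was busy
  def conflateAll(pending: Seq[Confirmation]): Option[Confirmation] =
    pending.reduceOption(latest)
}
```

No information is lost by dropping the intermediate confirmations, since moving the cursor to the furthest offset implies all earlier ones.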

Other API Options / Considerations

Factory Methods

Any of the above can be combined with factory methods that turn a Flow[Message, Confirmation] into a Source[ConfirmationAck], but this has several downsides:

It would be possible to turn a stream of confirmable messages (Option 3) into a “Flow” that forwards immutable messages and wants a stream of confirmations back. This adapter would make it possible to get around the serialization limitations.

Fan-Out of confirmable messages

Broadcasting confirmable messages to multiple destinations is troublesome: which of them shall be routed back to the source in order to move the cursor? There is no general solution to this problem, so we may simply not do anything about it. If we do something, the drawback will be that the messages need to become mutable and thread-safe because the required confirmation count should match the fan-out factor.
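The counting approach mentioned above might look roughly like this. `FanOutConfirmation`, `fanOut`, and `commit` are hypothetical names, and the sketch assumes the fan-out factor is known when the message is created.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}
import scala.util.Try

// Mutable, thread-safe confirmation that fires once all branches have confirmed
final class FanOutConfirmation(fanOut: Int, commit: () => Unit) {
  require(fanOut > 0, "fan-out factor must be positive")

  private val remaining = new AtomicInteger(fanOut)
  private val done = Promise[Unit]()

  // Called once by each downstream branch, possibly from different threads
  def confirm(): Future[Unit] = {
    // Only the call that brings the counter to exactly zero triggers the commit
    if (remaining.decrementAndGet() == 0)
      done.complete(Try(commit()))
    done.future
  }
}
```

This is exactly the shared mutable state the paragraph warns about: every copy of the message delivered to a branch must refer to the same counter.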

13h3r commented 8 years ago

Option 1

complex wiring required, does not work without GraphDSL

Let's provide simple wrappers that expose the complex shape as a Source or as a Flow[Confirmation, Msg]. Why would this not work?

Option 2

This only works well if the whole setup is created in a place that has this kind of overview. The KafkaReader will forward the confirmations to the KafkaSource using the GraphStage’s asyncInput facility; all Kafka interaction is owned by the KafkaSource.

There is a problem with shutdown. If the Kafka client is owned by the source, at which moment can it be closed? If it is closed at the moment the source completes, the client is unable to commit messages that are still being processed.

Option 3

Same problem with shutdown. When can we close the Kafka client? Should we track the status of emitted messages somehow?

rkuhn commented 8 years ago

The “simple wrappers” proposal does not work because the complexity is essential, it cannot be hidden. In particular it is impossible to break up the complex shape and offer it as a Source plus Flow, that is exactly what does not work—the kafka2 code right now does something like this, but it cheats by pre-materializing and using RS interfaces, which is undesirable because the resulting pieces are not reusable.

rkuhn commented 8 years ago

Concerning termination: that would need to be communicated back to the Source when it hits the confirmation input. This is of course trivial in Option 1, but the equivalent thing can be done in the other options as well.

13h3r commented 8 years ago

The “simple wrappers” proposal does not work because the complexity is essential, it cannot be hidden. In particular it is impossible to break up the complex shape and offer it as a Source plus Flow,

Sure, a complex shape cannot be split into a Flow and a Source. But most of the time you do not care about the commit confirmation, and a Flow[Messages, Commit] will be enough. I am talking about providing easy ways to get just a Flow instead of the complex shape. If you need full-featured processing, then use GraphDSL and/or factory methods (I already have the same idea in my user-level code).

that is exactly what does not work—the kafka2 code right now does something like this, but it cheats by pre-materializing and using RS interfaces, which is undesirable because the resulting pieces are not reusable

Could you please point me to where kafka2 uses RS and which parts are not reusable?

13h3r commented 8 years ago

Concerning termination: that would need to be communicated back to the Source when it hits the confirmation input. This is of course trivial in Option 1, but the equivalent thing can be done in the other options as well.

Could you explain what kind of communication you are talking about? Communication on graph level? On akka-stream implementation level? Or some other way?

I am asking because I tried to implement it in such a way and failed because of shape reuse, lifecycles, and the absence of a way to communicate.

ktoso commented 8 years ago

Could you please point me to where kafka2 uses RS and which parts are not reusable?

Highlighting the prematerialization bit, which we think is very undesirable because Akka Streams are "reusable blueprints, which one materializes". The stream below allocates resources before it is actually run:

val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties) // <<< already "alive!"

Source.fromPublisher(consumerWithOffsetSink.publisher)
  .map(processMessage(_))
  .to(consumerWithOffsetSink.offsetCommitSink) // the pre-created value above is needed to inject it into the fluent API in 2 places
  .run() // we want resources to be allocated only when run() is called

13h3r commented 8 years ago

@ktoso got it. I will try to explain the current situation in more detail to be clear.

Right now there are 3 APIs in reactive-kafka's master:

All these APIs provide Kafka reading/writing capabilities. I started kafka2 because the current graph stage API does not fully conform to akka-stream design principles. The problems that you pointed out are problems of the current graph stage API, and because of these problems I started to design the kafka2 API. We agreed with @kciesielski that the kafka2 API will replace the current graph stage API in the near future.

As far as I can see, Option 1 references the kafka2 API, which should not have problems with akka-stream design and stage reuse. If it still does, could you please point them out?

Thanks for reading. I hope I explained the current state of reactive-kafka :)

13h3r commented 8 years ago

@rkuhn There are also two (optional) additional aspects we may warn about:

These cases are less important, but we may want to check solutions against them.

patriknw commented 8 years ago

I think manually setting topic, partitions assignment, and offset should be done before materialization, i.e. user can do it in the ConsumerProvider. Changing it afterwards should not be necessary.

ConsumerRebalanceListener should perhaps be part of the materialized value, possibly also the KafkaConsumer (or a subset of its api) to be able to grab current position, which is a typical use case for the ConsumerRebalanceListener according to the docs

13h3r commented 8 years ago

@patriknw I did not get how ConsumerRebalanceListener can be used to get the position. It is invoked when a partition is assigned to or revoked from the client. The client may set the offset of a partition or save the current offset, but that can be done without ConsumerRebalanceListener at all.

Also, we can expose a subset of the KafkaConsumer API, but it should be done via callbacks and in an asynchronous manner, because the Kafka client is not thread-safe.

Something like this:

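  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    import scala.concurrent.Promise
    import scala.util.Try

    val consumer = consumerProvider()
    // skipped
    val logic = new TimerGraphStageLogic(shape) {
      // existing logic
      // Async callback: the body runs inside the stage, so the non-thread-safe
      // consumer is only ever touched from the stage itself
      val committedOffset = getAsyncCallback[(TopicPartition, Promise[OffsetAndMetadata])] {
        case (partition, promise) =>
          // Try(...) passes a failed lookup on to the caller's Future
          promise.complete(Try(consumer.committed(partition)))
          ()
      }
    }
    // Handle exposed as the materialized value; safe to call from any thread
    val control = new Control {
      def committedOffset(partition: TopicPartition) = {
        val p = Promise[OffsetAndMetadata]()
        logic.committedOffset.invoke((partition, p))
        p.future
      }
      override def stop(): Unit = ??? // skipped
    }
    (logic, control)
  }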