lightbend / cloudflow

Cloudflow enables users to quickly develop, orchestrate, and operate distributed streaming applications on Kubernetes.
https://cloudflow.io
Apache License 2.0

More powerful and flexible API for sending messages to outlets #982

Open vkorenev opened 3 years ago

vkorenev commented 3 years ago

Is your feature request related to a problem? Please describe.

Background: I'm working on an integration project where I'm using Cloudflow together with various other Akka Streams sinks and sources.

There are some scenarios which do not seem to be well supported by the Cloudflow API:

  1. After a message is sent successfully to an outlet, some actions need to be performed other than committing a Kafka offset.
  2. For each message received from a Kafka topic, a collection of outbound messages is generated. The offset for the inbound message should be committed after all outbound messages generated from it have been successfully written to the outlet.
  3. Same as case 2, but an Akka stream of outbound messages is generated instead of a strict collection. This stream can potentially be very large.

There could also be combinations of case 1 with case 2 or 3, when an element received from a non-Kafka source is transformed into multiple Kafka messages and some action needs to be performed after sending has finished.

Some less abstract examples:

It's not hard to support these scenarios with Alpakka Kafka:

  1. Producer.flexiFlow is useful for scenario 1. It allows passing some data through and performing some actions after messages are sent. However, Cloudflow does not seem to have anything similar to it.
  2. ProducerMessage.MultiMessage used together with a committableSink provides a solution for scenario 2 (see the sketch after this list). However, Cloudflow's committableSink takes only one outbound message paired with a Committable.
  3. One solution for scenario 3 is to spawn a nested Akka stream inside mapAsync, run it to completion, and then perform the necessary actions. To help with that, both Producer.plainSink and Producer.committableSink return a Future as a materialized value, allowing stream termination to be monitored. However, the Cloudflow counterparts are not designed to be used in a stream whose lifetime is shorter than that of the streamlet.
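For reference, here is a minimal sketch of scenario 2 written directly against Alpakka Kafka (topic names, bootstrap servers, group id and the fan-out logic are placeholders):

import akka.actor.ActorSystem
import akka.kafka.{CommitterSettings, ConsumerSettings, ProducerMessage, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

object MultiMessageSketch extends App {
  implicit val system: ActorSystem = ActorSystem("multi-message-sketch")

  val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("sketch-group")
  val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
    .withBootstrapServers("localhost:9092")

  Consumer
    .committableSource(consumerSettings, Subscriptions.topics("in"))
    .map { msg =>
      // One inbound record fans out into several outbound records; the inbound offset
      // travels along as the pass-through and is committed only after all of them are produced.
      val records = msg.record.value.split(",").toList.map(v => new ProducerRecord("out", msg.record.key, v))
      ProducerMessage.multi(records, msg.committableOffset)
    }
    .runWith(Producer.committableSink(producerSettings, CommitterSettings(system)))
}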

Is your feature request related to a specific runtime of cloudflow or applicable for all runtimes?

It's related to Akka Streamlets.

Describe the solution you'd like

One option would be adding more sinks and flows to the Cloudflow API.
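Purely for illustration, a sketch of what such additions could look like; the names and signatures below are my assumptions, not a concrete API proposal:

import akka.NotUsed
import akka.kafka.ConsumerMessage.Committable
import akka.stream.scaladsl.{Flow, Sink}
import cloudflow.streamlets.CodecOutlet

import scala.collection.immutable

// Hypothetical additions to the Akka streamlet API (names are made up).
trait ProposedOutletOps {
  // Counterpart of Alpakka Kafka's Producer.flexiFlow: emits the pass-through element
  // once the corresponding record has been accepted by the Kafka producer.
  def flexiFlow[T, PassThrough](outlet: CodecOutlet[T]): Flow[(T, PassThrough), PassThrough, NotUsed]

  // committableSink variant that takes several outbound messages per Committable and
  // commits only after all of them have been written to the outlet.
  def multiCommittableSink[T](outlet: CodecOutlet[T]): Sink[(immutable.Seq[T], Committable), NotUsed]
}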

Another option might be providing some API for adapting Alpakka Kafka sinks and flows to work with Cloudflow CodecOutlet.

Describe alternatives you've considered

I tried to add a Flow similar to flexiFlow from Alpakka Kafka. For that I used some of AkkaStreamletContext's public methods and copy-pasted some code from private ones:

import akka.NotUsed
import akka.kafka.{ProducerMessage, ProducerSettings}
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.Flow
import cloudflow.akkastream.AkkaStreamletContext
import cloudflow.streamlets.{CodecOutlet, Topic}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.ByteArraySerializer

import java.nio.charset.StandardCharsets

object AkkaStreamletContextExt {
  def flexiFlow[T, PassThrough](
      outlet: CodecOutlet[T]
  )(implicit context: AkkaStreamletContext): Flow[(T, PassThrough), PassThrough, NotUsed] = {
    val topic = context.findTopicForPort(outlet)
    val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new ByteArraySerializer)
      .withBootstrapServers(context.runtimeBootstrapServers(topic))
      .withProperties(topic.kafkaProducerProperties)

    Flow[(T, PassThrough)]
      .map { case (value, committable) =>
        ProducerMessage.Message(producerRecord(outlet, topic, value), committable)
      }
      .via(Producer.flexiFlow(producerSettings))
      .map(results => results.passThrough)
  }

  private def producerRecord[T](outlet: CodecOutlet[T], topic: Topic, value: T) = {
    val key = outlet.partitioner(value)
    val bytesKey = keyBytes(key)
    val bytesValue = outlet.codec.encode(value)
    new ProducerRecord(topic.name, bytesKey, bytesValue)
  }

  private def keyBytes(key: String) = if (key != null) key.getBytes(StandardCharsets.UTF_8) else null
}

However, in this case I can no longer test my streamlet with AkkaStreamletTestKit.

RayRoestenburg commented 3 years ago

Hi @vkorenev thanks for the feedback!

In general, there is absolutely nothing wrong with using Alpakka Kafka directly. I agree it would be nice, as you say:

Another option might be providing some API for adapting Alpakka Kafka sinks and flows to work with Cloudflow CodecOutlet.

We can check what we can do in this regard. Right now (as you show in your example), you can use the codec directly on CodecOutlet / CodecInlet; it is available via the codec method. The StreamletContext provides findTopicForPort(port), where port is an inlet or an outlet you pass in; it returns a Topic, which you can use to get the Kafka bootstrap servers with the method runtimeBootstrapServers(topic). This provides you with everything you need to create an Alpakka Kafka Source / Sink from ConsumerSettings or ProducerSettings. But I agree this is not the most user-friendly approach right now, which we can improve upon. (Sorry, I just noticed that this is exactly what you did.)
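For example, a rough sketch of building a plain Alpakka Kafka source for a CodecInlet from the context; the group id is a placeholder, and topic.kafkaConsumerProperties is assumed to exist as the counterpart of the kafkaProducerProperties used in the snippet above:

import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Source
import cloudflow.akkastream.AkkaStreamletContext
import cloudflow.streamlets.CodecInlet
import org.apache.kafka.common.serialization.ByteArrayDeserializer

object AlpakkaInletAdapter {
  // Builds a plain (non-committable) Alpakka Kafka source for a Cloudflow inlet,
  // decoding each record with the inlet's codec.
  def plainSource[T](inlet: CodecInlet[T], groupId: String)(
      implicit context: AkkaStreamletContext): Source[T, Consumer.Control] = {
    val topic = context.findTopicForPort(inlet)
    val consumerSettings = ConsumerSettings(context.system, new ByteArrayDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers(context.runtimeBootstrapServers(topic))
      .withGroupId(groupId) // placeholder: a real streamlet would derive this
      .withProperties(topic.kafkaConsumerProperties) // assumed counterpart of kafkaProducerProperties

    Consumer
      .plainSource(consumerSettings, Subscriptions.topics(topic.name))
      .map(record => inlet.codec.decode(record.value))
  }
}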

I'm thinking maybe we can add some convenience methods to create a ConsumerSettings or ProducerSettings from an inlet or an outlet, respectively. Another convenience would be a method to create a ProducerRecord, as you have in your example. What do you think?
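Something along these lines, perhaps; the signatures below are only a guess at what such convenience methods could look like:

import akka.kafka.{ConsumerSettings, ProducerSettings}
import cloudflow.streamlets.{CodecInlet, CodecOutlet}
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical convenience methods on the streamlet context.
trait KafkaSettingsOps {
  def consumerSettings[T](inlet: CodecInlet[T]): ConsumerSettings[Array[Byte], Array[Byte]]
  def producerSettings[T](outlet: CodecOutlet[T]): ProducerSettings[Array[Byte], Array[Byte]]
  def producerRecord[T](outlet: CodecOutlet[T], value: T): ProducerRecord[Array[Byte], Array[Byte]]
}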

If Alpakka Kafka supported the idea of a topic-id, a way to configure topics by id in the configuration, and methods to create a source / sink from a particular topic-id, we could leverage this in Cloudflow. All that would be left then is to plug in the codec, which could possibly also be added in some way. This is just me brainstorming, let me know if this makes sense.

Some more comments, and questions for clarification:

For:

  1. After a message is sent successfully to an outlet, some actions need to be performed other than committing a Kafka offset.

I assume when you say "other than committing a Kafka offset", you mean you want to commit the offset, and do something after the commit is successful? This assumes that commits are completed per message, and that the next operation only occurs after the commit has occurred? (Please note that this has performance consequences.)

For points 2 and 3:

  2. For each message received from a Kafka topic, a collection of outbound messages is generated. The offset for the inbound message should be committed after all outbound messages generated from it have been successfully written to the outlet.
  3. Same as case 2, but an Akka stream of outbound messages is generated instead of a strict collection. This stream can potentially be very large.

In general, the committableSink(outlet) can be used to write messages to the outlet and acknowledge reading from the inlet at the same time. The committableSink commits in batches. You can configure the committing / acknowledging behaviour with CommitterSettings. Batching acknowledgements is good for performance, but it obviously results in more duplicate messages on restart. So I at least wanted to point out that you can configure these CommitterSettings differently.

If you create many messages for every message in the source, commits happen after a change of the offset is observed. In CommitterSettings this is set by default to NextOffsetObserved.

So if you use, say, mapConcat on a source, this already works; for this to work you need to use sourceWithCommittableContext. If you generate more messages per incoming message, or even streams of messages per message, the committer will commit on the next offset observed, as long as the context is retained.
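A rough sketch of that pattern (Order and OrderLine are made-up Avro classes here; the point is only the shape of the graph):

import cloudflow.akkastream.AkkaStreamlet
import cloudflow.akkastream.scaladsl.RunnableGraphStreamletLogic
import cloudflow.streamlets.StreamletShape
import cloudflow.streamlets.avro.{AvroInlet, AvroOutlet}

// One inbound Order fans out into many OrderLines; mapConcat carries the committable
// context along, so the inbound offset is committed once the next offset is observed.
class FanOutStreamlet extends AkkaStreamlet {
  val in = AvroInlet[Order]("in")
  val out = AvroOutlet[OrderLine]("out")
  val shape = StreamletShape(in).withOutlets(out)

  override def createLogic = new RunnableGraphStreamletLogic() {
    def runnableGraph =
      sourceWithCommittableContext(in)
        .mapConcat(order => order.lines) // many outbound messages per inbound message
        .to(committableSink(out))
  }
}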

Let me know if I am misunderstanding your need, or if there is something you need specifically in this case that is not supported by the ...WithCommittableContext methods and the CommitterSettings defaulting to NextOffsetObserved.

RayRoestenburg commented 3 years ago

Please also note that the AkkaStreamletTestKit does not provide features for acknowledging / committing offsets.

vkorenev commented 3 years ago

Hi @RayRoestenburg, thank you for your reply!

In general, there is absolutely nothing wrong with using Alpakka Kafka directly. I agree it would be nice, as you say:

Another option might be providing some API for adapting Alpakka Kafka sinks and flows to work with Cloudflow CodecOutlet.

We can check what we can do in this regard. Right now (as you show in your example), you can use the codec directly on CodecOutlet / CodecInlet; it is available via the codec method. The StreamletContext provides findTopicForPort(port), where port is an inlet or an outlet you pass in; it returns a Topic, which you can use to get the Kafka bootstrap servers with the method runtimeBootstrapServers(topic). This provides you with everything you need to create an Alpakka Kafka Source / Sink from ConsumerSettings or ProducerSettings. But I agree this is not the most user-friendly approach right now, which we can improve upon. (Sorry, I just noticed that this is exactly what you did.)

I'm thinking maybe we can add some convenience methods to create a ConsumerSettings or ProducerSettings from an inlet or an outlet, respectively. Another convenience would be a method to create a ProducerRecord, as you have in your example. What do you think?

That would be nice. The only possible problem I see here is whether this can be made to work with the test kit. For example, the code I provided throws at val topic = context.findTopicForPort(outlet) in tests. That's because the AkkaStreamletTestKit sets the context to cloudflow.akkastream.testkit.TestContext, which overrides everything related to Kafka with its own implementation.

  1. After a message is sent successfully to an outlet, some actions need to be performed other than committing a Kafka offset.

I assume when you say "other than committing a Kafka offset", you mean you want to commit the offset, and do something after the commit is successful? This assumes that commits are completed per message, and that the next operation only occurs after the commit has occurred? (Please note that this has performance consequences.)

I mean some non-Kafka source and a Kafka sink. Since the source is not Kafka, the "commit" action is not committing a Kafka offset but some other source-specific action. For example, if the source is the Alpakka adapter for JMS, the action may be calling commit() on the source element. If the source is a directory with files, the action could be deleting or moving the file. What is important is that the action needs to be performed after the message(s) generated for the source element have been successfully written to the Kafka producer.
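For illustration, this is roughly how the flexiFlow helper from my earlier snippet could be combined with such a source; modelling the source-specific action as a () => Future[Done] carried next to the payload is just an assumption:

import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import cloudflow.akkastream.AkkaStreamletContext
import cloudflow.streamlets.CodecOutlet

import scala.concurrent.Future

object WriteThenAck {
  // Each element carries the payload to write plus a source-specific acknowledgement
  // (a JMS commit, a file deletion, ...). The acknowledgement runs only after
  // Producer.flexiFlow has emitted the pass-through, i.e. after the record was
  // accepted by the Kafka producer.
  def graph[T](elements: Source[(T, () => Future[Done]), NotUsed], outlet: CodecOutlet[T])(
      implicit context: AkkaStreamletContext): RunnableGraph[Future[Done]] =
    elements
      .via(AkkaStreamletContextExt.flexiFlow(outlet)) // helper from the snippet above
      .mapAsync(parallelism = 1)(ack => ack()) // run the non-Kafka "commit"
      .toMat(Sink.ignore)(Keep.right)
}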

The most straightforward approach would probably be implementing a custom Committable and passing it to Cloudflow's committableSink(outlet). However, Committable is not supposed to be extended, as it is declared with the @DoNotInherit annotation. Also, my understanding is that the default behaviour of the committableSink is to commit once in a while for performance reasons, which is good for a Kafka source, but other sources may require every element to be "committed". This might be solved by providing CommitterSettings configured to commit every single message.
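For example, a sketch of CommitterSettings tuned to commit every single offset (at a throughput cost), which could then be passed to committableSink, assuming the variant that takes CommitterSettings mentioned above:

import akka.actor.ActorSystem
import akka.kafka.CommitterSettings

object PerMessageCommits {
  // Commit every single offset instead of batching; `system` would be the streamlet's ActorSystem.
  def settings(system: ActorSystem): CommitterSettings =
    CommitterSettings(system)
      .withMaxBatch(1) // flush after every single offset
      .withParallelism(1) // keep commits strictly sequential
}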

  2. For each message received from a Kafka topic, a collection of outbound messages is generated. The offset for the inbound message should be committed after all outbound messages generated from it have been successfully written to the outlet.
  3. Same as case 2, but an Akka stream of outbound messages is generated instead of a strict collection. This stream can potentially be very large.

In general, the committableSink(outlet) can be used to write messages to the outlet and acknowledge reading from the inlet at the same time. The committableSink commits in batches. You can configure the committing / acknowledging behaviour with CommitterSettings. Batching acknowledgements is good for performance, but it obviously results in more duplicate messages on restart. So I at least wanted to point out that you can configure these CommitterSettings differently.

If you create many messages for every message in the source, commits happen after a change of the offset is observed. In CommitterSettings this is set by default to NextOffsetObserved.

So if you use, say, mapConcat on a source, this already works; for this to work you need to use sourceWithCommittableContext. If you generate more messages per incoming message, or even streams of messages per message, the committer will commit on the next offset observed, as long as the context is retained.

Nice! Thanks for explaining! I didn't know about that. Yes, this is a solution for one input Kafka message being mapped to many output Kafka messages. However, there is still the case of a non-Kafka source element being mapped to multiple Kafka output elements.