aquamatthias closed this issue 6 years ago
Interesting, the shutdown scenarios definitely need thorough testing (we have a couple of outstanding issues). This is an example of why I've not closed those issues yet :) I've been slowly hardening these as I go, those issues will live on until I'm 100% happy.
Are you able to share any test code which simulates this?
@markglh I started to implement an akka stream source on top of reactive-kinesis. The Source looks like this:
import akka.Done
import akka.actor.{ActorRef, ActorSystem}
import akka.actor.Status.Failure
import akka.pattern.pipe
import akka.stream.{Attributes, BufferOverflowException, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import akka.stream.stage.GraphStageLogic.StageActorRef.Receive
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{ConsumerShutdown, ConsumerWorkerFailure, EventProcessed, ProcessEvent}
import com.weightwatchers.reactive.kinesis.models.ConsumerEvent
/**
  * The KinesisEvent is passed through the stream.
  * Every event has to be committed explicitly.
  */
sealed trait KinesisEvent {
  def event: ConsumerEvent
  def commit(successful: Boolean = true): KinesisEvent
}

/**
  * Actor based implementation of KinesisEvent.
  */
private[kinesis] case class KinesisActorEvent(event: ConsumerEvent)(implicit sender: ActorRef) extends KinesisEvent {
  def commit(successful: Boolean = true): KinesisEvent = {
    sender ! EventProcessed(event.sequenceNumber, successful)
    this
  }
}
/**
  * A KinesisSourceGraph will attach to a kinesis stream with the provided configuration and constitute a Source[KinesisEvent, NotUsed].
  * Usage:
  * {{{
  *   val config = ConfigFactory.load()
  *   val consumerConfig = ConsumerConf(config.getConfig("kinesis"), "some-consumer")
  *   val source = Source.fromGraph(new KinesisSourceGraph(consumerConfig, system))
  * }}}
  * Assuming a configuration file like this:
  * {{{
  * kinesis {
  *   application-name = "SampleService"
  *   some-consumer {
  *     stream-name = "sample-consumer"
  *   }
  * }
  * }}}
  * See reference.conf for a list of all available config options.
  * @param config the kinesis stream configuration.
  * @param actorSystem the actor system.
  */
class KinesisSourceGraph(config: ConsumerConf, actorSystem: ActorSystem) extends GraphStage[SourceShape[KinesisEvent]] with LazyLogging {

  private[this] val out: Outlet[KinesisEvent] = Outlet("KinesisSource.out")
  override val shape: SourceShape[KinesisEvent] = SourceShape.of(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
    // KCL will read events in batches. The stream should be able to buffer the whole batch.
    private[this] val bufferSize: Int = config.kclConfiguration.getMaxRecords
    // The queue to buffer events that cannot be pushed downstream.
    private[this] val messages = new java.util.ArrayDeque[KinesisEvent]()
    // The kinesis consumer to read from.
    private[this] var kinesisConsumer: Option[KinesisConsumer] = None

    setHandler(out, new OutHandler {
      override def onPull(): Unit = if (!messages.isEmpty) push(out, messages.poll())
    })

    override def preStart(): Unit = {
      // The underlying stage actor reference of this graph stage.
      val actor = getStageActor(receive).ref
      val consumer = KinesisConsumer(config, actor, actorSystem)
      // start() creates a long running future that completes when the consumer worker is done or has failed.
      import actorSystem.dispatcher
      consumer.start().map(_ => Done).pipeTo(actor)
      kinesisConsumer = Some(consumer)
    }

    override def postStop(): Unit = {
      logger.info(s"Stopping Source $out. ${messages.size()} messages are buffered unprocessed.")
    }

    def receive: Receive = {
      case (_, _: ProcessEvent) if messages.size > bufferSize =>
        // The messages are not processed fast enough, so the number of buffered messages exceeds the maximum buffer size.
        // Fail the stage to prevent message overflow.
        // Ideally we could control when the next batch is fetched.
        logger.warn(s"Buffer of size $bufferSize is full. Fail the stream.")
        failStage(BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!"))
      case (actorRef: ActorRef, ProcessEvent(event)) =>
        // A new event that needs to be processed.
        // Always use the queue to guarantee the correct order of messages.
        logger.info(s"Process event: $event")
        messages.offer(KinesisActorEvent(event)(actorRef))
        while (isAvailable(out) && !messages.isEmpty) push(out, messages.poll())
      case (_, ConsumerShutdown(shardId)) =>
        // A consumer shutdown occurs when another source is created and hence the Kinesis shards are rebalanced.
        // This message is received once the graceful shutdown is finalized.
        // Once is fixed, we should drop all buffered
        // messages of this shard.
        logger.info(s"Consumer shutdown for shard $shardId")
      case (_, ConsumerWorkerFailure(failedEvents, shardId)) =>
        // Sent for all events of a batch where the processing has failed (after the configured retries).
        // Since proceeding is not possible, the stream is failed.
        logger.error(s"Consumer worker failure for shard $shardId")
        failStage(new IllegalStateException(s"Failed Events: $failedEvents for shardId: $shardId"))
      case (_, Done) =>
        // The KinesisConsumer has finished, so the stage is completed.
        logger.info("Kinesis Consumer finished.")
        completeStage()
      case (_, Failure(ex)) =>
        // The KinesisConsumer failed. This should also fail the stage.
        logger.error("Kinesis Consumer failed", ex)
        failStage(ex)
    }
  }
}
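The buffering policy in `receive` can be tried out in isolation. The following is a minimal sketch using only the Scala standard library. `BoundedEventBuffer` is a hypothetical stand-in and not part of reactive-kinesis or Akka. It mirrors the `messages.size > bufferSize` guard from the stage, so it accepts one element more than `capacity` before it reports an overflow.

```scala
import scala.collection.mutable

// Hypothetical helper for illustration only; not part of reactive-kinesis or Akka.
final class BoundedEventBuffer[A](capacity: Int) {
  private val queue = mutable.Queue.empty[A]

  /** Enqueue an event. Left signals an overflow (the stage would call failStage here). */
  def offer(event: A): Either[String, Unit] =
    if (queue.size > capacity) Left(s"Buffer overflow (max capacity was: $capacity)!")
    else {
      queue.enqueue(event)
      Right(())
    }

  /** Hand out the oldest event, but only if downstream has signalled demand. */
  def poll(demand: Boolean): Option[A] =
    if (demand && queue.nonEmpty) Some(queue.dequeue()) else None

  def size: Int = queue.size
}

object BoundedEventBufferDemo {
  def main(args: Array[String]): Unit = {
    val buffer = new BoundedEventBuffer[Int](capacity = 2)
    // Events arrive faster than they are pulled.
    (1 to 4).foreach(i => println(s"offer($i) -> ${buffer.offer(i)}"))
    // Downstream demand drains the buffer in arrival order.
    println(buffer.poll(demand = true))
  }
}
```

Every event goes through the queue, even when demand is already available, which is what keeps the ordering stable when a batch arrives while older events are still buffered.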
A main class is needed which creates a simple stream and reads from the stream:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.typesafe.scalalogging.StrictLogging
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf
import scala.concurrent.Await
import scala.concurrent.duration.Duration

/**
  * Simple Test Consumer: This process will read from the configured stream.
  * Start as many consumers as wanted - each worker has a unique id.
  */
object TestConsumer extends StrictLogging {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem: ActorSystem = ActorSystem("kinesis")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val conf = ConsumerConf(actorSystem.settings.config.getConfig("kinesis"), "test-consumer")
    val result = Source.fromGraph(new KinesisSourceGraph(conf, actorSystem))
      .map { e =>
        e.commit()
        logger.info(s"Received: ${e.event.payload}")
      }
      .runWith(Sink.ignore)
    Await.ready(result, Duration.Inf)
  }
}
The log statements are derived from starting/stopping 2 processes of TestConsumer
from a kinesis stream with 2 shards.
BTW: I saw akka streams on your road map as well. Do you want me to create a PR with this implementation?
Great, thanks for sharing that!
I don't see why not - I might struggle to get enough time to really dig into the PR in the next couple of weeks though - I'd like to really think about the implementation before we merge, especially the failure and checkpoint scenarios. What I'd like to avoid is changing the interface massively after merging.
That said, let's get the ball rolling :D
In our setup we have a Kinesis stream with 2 shards and 2 consumers (C1 and C2 in my description). Both consumers will get a lease for one shard each and read from the stream as expected. If C2 fails (process dies), C1 will read from both shards. When C2 restarts, it takes over the lease for one shard. The
is not killed when a lease is lost. We see the following debug output in C1: From here on, we see this exception logged in the checkpoint interval (every 3 seconds):
Everything works as expected, since the lease is taken over by another consumer. Nevertheless I see this as a problem, since the logs emit warnings that are not real warnings. Not sure if this can lead to other unwanted behaviour?
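To make the sequence easier to follow, here is a toy model of the lease handover in plain Scala. All names (`LeaseTable`, `steal`, `checkpoint`) are hypothetical and none of this is KCL or reactive-kinesis API. It only assumes that a checkpoint is rejected once the lease belongs to another consumer, which would match the exception repeating at every checkpoint interval.

```scala
// Toy model for illustration only; not KCL or reactive-kinesis code.
final case class LeaseTable(owners: Map[String, String]) {

  /** Another consumer takes over the lease for a shard. */
  def steal(shard: String, newOwner: String): LeaseTable =
    copy(owners = owners.updated(shard, newOwner))

  /** Assumption: a checkpoint only succeeds for the current lease owner. */
  def checkpoint(shard: String, consumer: String): Either[String, Unit] =
    if (owners.get(shard).contains(consumer)) Right(())
    else Left(s"$consumer no longer holds the lease for $shard")
}

object LeaseDemo {
  def main(args: Array[String]): Unit = {
    // C2 has died, so C1 holds the leases for both shards.
    val afterFailure = LeaseTable(Map("shard-0" -> "C1", "shard-1" -> "C1"))
    // C2 restarts and takes over one shard.
    val afterRestart = afterFailure.steal("shard-1", "C2")
    // A worker in C1 that was never stopped keeps trying to checkpoint shard-1.
    println(afterRestart.checkpoint("shard-1", "C1"))
    // The new owner can checkpoint as usual.
    println(afterRestart.checkpoint("shard-1", "C2"))
  }
}
```

In this model the stale worker in C1 does no harm, but it fails on every checkpoint attempt until it is stopped, which is the noise described above.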