WW-Digital / reactive-kinesis

A Scala & Akka based implementation for working with Amazon Kinesis Streams
Apache License 2.0

reactive-kinesis

Kinesis client built upon Amazon's KCL (Kinesis Client Library) & KPL (Kinesis Producer Library).

It's worth familiarising yourself with Kinesis sequence numbers and sub-sequence numbers.
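When the KPL aggregates several user records into one Kinesis record, those user records share the record's sequence number and are distinguished by their sub-sequence number. The sketch below orders events by sequence number first and sub-sequence number second. It assumes a hypothetical CompoundSeqNo type; the library's own CompoundSequenceNumber may be shaped differently. Sequence numbers are decimal strings that can exceed the range of a Long, so the sketch compares them as BigInt values.

```scala
// Hypothetical model of a compound sequence number; reactive-kinesis has its own
// CompoundSequenceNumber type, which may differ from this sketch.
final case class CompoundSeqNo(sequenceNumber: BigInt, subSequenceNumber: Long)

object CompoundSeqNo {
  // Sequence numbers arrive as decimal strings that can exceed Long.MaxValue.
  def apply(sequenceNumber: String, subSequenceNumber: Long): CompoundSeqNo =
    CompoundSeqNo(BigInt(sequenceNumber), subSequenceNumber)

  // Compare the Kinesis record's sequence number first, then the user record's
  // position inside an aggregated record (the sub-sequence number).
  implicit val ordering: Ordering[CompoundSeqNo] =
    Ordering.by[CompoundSeqNo, (BigInt, Long)](c => (c.sequenceNumber, c.subSequenceNumber))
}
```

Comparing the raw strings lexicographically would put "10" before "9". That is why the sketch parses the strings into numbers before comparing them.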


Dependency Resolution

SBT

"com.weightwatchers" %% "reactive-kinesis" % "0.5.5"

Maven 2.11

<dependency>
  <groupId>com.weightwatchers</groupId>
  <artifactId>reactive-kinesis_2.11</artifactId>
  <version>0.5.5</version>
</dependency>

Maven 2.12

<dependency>
  <groupId>com.weightwatchers</groupId>
  <artifactId>reactive-kinesis_2.12</artifactId>
  <version>0.5.5</version>
</dependency>

Snapshots will be published to the JFrog OSS repository.

You will need the following resolver for snapshots: https://oss.jfrog.org/artifactory/oss-snapshot-local
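With sbt, you can add that resolver to build.sbt roughly as follows. This is a sketch: the resolver label is an arbitrary choice, and this section does not specify a snapshot version.

```scala
// build.sbt: make snapshot builds resolvable ("oss-jfrog-snapshots" is an arbitrary label)
resolvers += "oss-jfrog-snapshots" at "https://oss.jfrog.org/artifactory/oss-snapshot-local"
```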

Considerations When Using Kinesis in a Distributed Environment

Required Minimum Sharding for Read-Based Applications

From http://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-scaling.html:

Typically, when you use the KCL, you should ensure that the number of instances does not exceed the number of shards (except for failure standby purposes). Each shard is processed by exactly one KCL worker and has exactly one corresponding record processor, so you never need multiple instances to process one shard. However, one worker can process any number of shards, so it's fine if the number of shards exceeds the number of instances.

For our purposes, this means any service reading from Kinesis should expect to have one shard per instance, as a minimum. Note that this is specifically for consuming events. Producers don't have the same shard restrictions.
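The hypothetical helper below makes this arithmetic concrete by applying the KCL rule quoted above. It is not part of reactive-kinesis. It assumes leases end up spread evenly across instances, which the KCL aims for but does not guarantee at every moment.

```scala
// Hypothetical helper (not part of reactive-kinesis) applying the KCL rule that
// each shard is processed by exactly one worker at a time.
object ShardPlanning {
  /** Instances that will actually hold at least one shard lease. */
  def activeInstances(shards: Int, instances: Int): Int = math.min(shards, instances)

  /** Instances with no shard at all; only useful as failover standbys. */
  def idleInstances(shards: Int, instances: Int): Int = math.max(0, instances - shards)

  /** Shards the busiest instance handles, assuming an even spread of leases. */
  def maxShardsPerInstance(shards: Int, instances: Int): Int = {
    require(shards >= 0 && instances > 0, "need shards >= 0 and at least one instance")
    (shards + instances - 1) / instances // integer ceiling of shards / instances
  }
}
```

For example, 2 shards and 3 instances leave one instance idle. With 5 shards across 2 instances, one instance processes 3 shards.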

DynamoDB Checkpoint Storage

Amazon's KCL uses DynamoDB to checkpoint progress through the stream. When the KCL provisions this DynamoDB table automatically, the table may be given a relatively high write throughput, which can incur additional cost.

You should make sure that the DynamoDB table used for checkpointing your stream

  1. Has a reasonable write throughput defined

  2. Is cleaned up when you're done with it -- KCL will not automatically delete it for you

The checkpointer will automatically throttle if the write throughput is not sufficient. Look out for the following info log:

Throttled by DynamoDB on checkpointing -- backing off...

Usage

Kinesis streams and auth

The stream you have configured must already exist in AWS, e.g.

aws kinesis create-stream --stream-name core-test-foo --shard-count 1

For local development, it's expected that you already have a file called ~/.aws/credentials, which contains an AWS access key and secret e.g.

[default]
aws_access_key_id=AKIAXXXXXXXXX999999X
aws_secret_access_key=AAAAAAAAAAAA000000000+AAAAAAAAAAAAAAAAAA

Both the producer and consumer use the DefaultAWSCredentialsProviderChain.

Defining a config file in the client application

The application that uses this library must provide some configuration values. As a minimum, you'll need:

kinesis {

   application-name = "SampleService"

   # The name of this producer; an application can have many producers.
   # MUST contain the stream-name as a minimum. Any additional settings defined will override
   # defaults in the kinesis.producer reference.conf for this producer only.
   some-producer {
      # The name of the producer stream
      stream-name = "sample-producer"

      kpl {
         Region = us-east-1
      }
   }

   # The name of this consumer; an application can have many consumers.
   some-consumer {
      # The name of the consumer stream, MUST be specified per consumer and MUST exist
      stream-name = "sample-consumer"
   }

   some-other-consumer {
      stream-name = "another-sample-consumer"
   }
}

These values will override the default reference.conf. See src/main/resources/reference.conf for a complete reference configuration, and src/it/resources/application.conf for a more detailed override example.

The name of the producer/consumer configuration block MUST match the name specified when instantiating the library. At runtime, this block is merged with the default-consumer/default-producer configuration from reference.conf.

Once these are defined, you can pass them into the Kinesis producer and consumer using a config object (see code examples below).

Note that the application-name is combined with the stream-name for each consumer to define the DynamoDB table for checkpointing. For example: SampleService-sample-consumer.
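Below is a simplified model of the two rules described above. It is an illustration, not the library's code. It assumes flat Map[String, String] values as stand-ins for the HOCON blocks, whereas the library works on Typesafe Config objects. The sketch shows that consumer-specific values win over the defaults, and how the checkpoint table name is derived.

```scala
// Simplified model (assumption): flat Maps stand in for HOCON blocks. The
// library itself merges Typesafe Config objects; only the precedence and the
// table-naming convention described above are illustrated here.
object ConsumerSettings {
  /** Consumer-specific values override the defaults; unset keys fall back. */
  def effective(defaults: Map[String, String],
                overrides: Map[String, String]): Map[String, String] =
    defaults ++ overrides // on duplicate keys the right-hand map wins

  /** DynamoDB checkpoint table: "<application-name>-<stream-name>". */
  def checkpointTableName(applicationName: String, settings: Map[String, String]): String = {
    val streamName = settings.getOrElse(
      "stream-name",
      throw new IllegalArgumentException("stream-name MUST be specified per consumer"))
    s"$applicationName-$streamName"
  }
}
```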

Notable Consumer Configuration Values

Notable Producer Configuration Values

Typed Configuration - Producer

If you don't want to depend on config files, there's a typed configuration class available: KinesisProducerConfig

You can construct it in a few ways:

// With default values
val defaultProducerConfig = KinesisProducerConfig()

// With a provided KinesisProducerConfiguration from the Java KPL library
val awsKinesisConfig: KinesisProducerConfiguration = ...
val producerConfigFromKpl = KinesisProducerConfig(awsKinesisConfig)

// With a typesafe-config object
val typesafeConfig: Config = ...
val producerConfigFromTypesafe = KinesisProducerConfig(typesafeConfig)

// With a typesafe-config object and an AWSCredentialsProvider
val credentialsProvider: AWSCredentialsProvider = ...
val producerConfigWithCredentials = KinesisProducerConfig(typesafeConfig, credentialsProvider)

These can be used to create a ProducerConf and ultimately a KinesisProducer, like so:

val producerConfig: KinesisProducerConfig = ...
val producerConf: ProducerConf = ProducerConf(producerConfig, "my-stream-name", None, None)
val producer = KinesisProducer(producerConf)

Usage: Consumer

reactive-kinesis provides two different ways to consume messages from Kinesis: Actor Based Consumer and Akka Stream Source.

Consumer Architecture

Actor Based Consumer

Implementing the consumer requires a simple actor which is responsible for processing messages sent to it by the library. We call this the Event Processor. Upon creating an instance of the KinesisConsumer, internally one ConsumerWorker (this is different from the KCL Worker) is created per shard (shards are distributed amongst consumers automatically). This consumer worker is what sends messages to the Event Processor. Note that the Event Processor is shared amongst ALL shards, so it is important not to cache the sender of previous messages. It is perfectly valid to use a router to spread the work amongst many Event Processor actors.

import akka.actor.{Actor, Props}
import com.typesafe.config.ConfigFactory
import com.typesafe.scalalogging.LazyLogging
import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{ConsumerShutdown, ConsumerWorkerFailure, EventProcessed, ProcessEvent}
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf

class TestEventProcessor() extends Actor with LazyLogging {

  import scala.concurrent.duration._

  implicit val timeout = akka.util.Timeout(5.minutes)

  // scalastyle:off method.length
  override def receive: Receive = {

    case ProcessEvent(event) => {
      //Do some processing here...
      sender ! EventProcessed(event.sequenceNumber)
    }
    case ConsumerWorkerFailure(failedEvents, shardId) =>
    // Consumer failure, no more messages will be consumed!! Depending on the purpose of this service this may be critical.

    case ConsumerShutdown(shardId) =>
    // The Consumer has shutdown all shards on this instance (gracefully, or as a result of a failure).
  }
}

object Consumer extends App {
  val system = akka.actor.ActorSystem.create("test-system")
  val config = ConfigFactory.load()
  val eventProcessor = system.actorOf(Props[TestEventProcessor], "test-processor")
  val consumer = KinesisConsumer(ConsumerConf(config.getConfig("kinesis"), "some-consumer"),
                                 eventProcessor, system)
  consumer.start()
}

The following types must be handled:

import com.weightwatchers.reactive.kinesis.consumer.ConsumerWorker.{ConsumerShutdown, ConsumerWorkerFailure, EventProcessed, ProcessEvent}

/**
  * Sent to the eventProcessor for each message in the batch.
  */
case class ProcessEvent(consumerEvent: ConsumerEvent)

/**
  * Expected in response to a [[ProcessEvent]] message after processing is complete
  *
  * @param compoundSeqNo This is a combination of the sequence and subsequence numbers
  * @param successful    Set this to false to skip this message.
  */
case class EventProcessed(compoundSeqNo: CompoundSequenceNumber, successful: Boolean = true)

/**
  * Sent to the eventProcessor if batch processing fails (above the tolerance after retrying)
  * before shutting down processing on this shard.
  * @param failedEvents The events that failed processing within the time.
  * @param shardId      The shardId of the worker causing the failure.
  */
case class ConsumerWorkerFailure(failedEvents: Seq[ConsumerEvent], shardId: String)

/**
  * Sent to the eventProcessor upon shutdown of this worker.
  */
case class ConsumerShutdown(shardId: String)

Important considerations when implementing the Event Processor

Checkpointing

The client will handle checkpointing asynchronously PER SHARD according to the configuration using a separate actor.

Akka Stream Source

An Akka Source is provided that can be used with streams. You can create a source from a ConsumerConf or directly from the consumer name defined in the configuration. Every message emitted to the stream is of type CommittableEvent[ConsumerEvent] and must be committed explicitly downstream with a call to event.commit(). You can map to a different type of CommittableEvent with the map and mapAsync functions. If no createConsumer function is passed to Kinesis.source, a KinesisConsumer is created internally.

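import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Sink
import com.weightwatchers.reactive.kinesis.stream._

implicit val sys: ActorSystem = ActorSystem("kinesis-consumer-system")
implicit val materializer: Materializer = ActorMaterializer()
import sys.dispatcher // ExecutionContext for the async mapping below

// Downloader.download stands in for your own asynchronous processing
Kinesis
  .source("consumer-name")
  .take(100)
  .map(event => event.map(_.payloadAsString())) // read and map payload as string
  .mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload))) // handle an async message
  .map(event => event.commit()) // mark the event as handled by calling commit
  .runWith(Sink.seq)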

Alternatively, you can explicitly pass a function that creates the KinesisConsumer. You can save a reference to this KinesisConsumer and use it to shut down your consumer manually when needed.

  import akka.actor.ActorSystem
  import akka.stream.{ActorMaterializer, Materializer}
  import akka.stream.scaladsl.Sink
  import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer
  import com.weightwatchers.reactive.kinesis.stream._

  implicit val sys = ActorSystem("kinesis-consumer-system")
  implicit val materializer: Materializer = ActorMaterializer()
  import sys.dispatcher

  // Set by createConsumer once the stream has been materialized
  var consumer = Option.empty[KinesisConsumer]

  Kinesis.source(
    consumerName = "some-consumer",
    createConsumer = (conf, ref) => {
      val c = KinesisConsumer(conf, ref, sys)
      consumer = Option(c)
      c
    })
    .take(100)
    .map(event => event.map(_.payloadAsString()))
    .mapAsync(10)(event => event.mapAsync(Downloader.download(event.payload)))
    .map(event => event.commit())
    .runWith(Sink.seq)

  // Later, when the consumer is no longer needed (e.g. on application shutdown):
  consumer.foreach { c =>
    c.stop()
  }

All rules described here for the KinesisConsumer also apply for the stream source.

Graceful Shutdown

Currently the KinesisConsumer Shutdown works as follows:

Usage: Producer

The KPL client sends messages in batches. Each message creates a Future, which completes when the send succeeds or fails.

See Amazon's documentation for more information: https://github.com/awslabs/amazon-kinesis-producer

Actor Based Implementation

This implementation will optionally throttle the number of futures currently in play, according to the max-outstanding-requests property.

Each future is handled within the actor and a message will be returned to the sender upon completion.
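A counting semaphore can illustrate the idea behind this throttling. This sketch is not how KinesisProducerActor is implemented. OutstandingLimiter and its send parameter are hypothetical names, and the limit plays the role of max-outstanding-requests. A send starts only when a permit is free. The permit is returned when the future completes, whether the send succeeded or failed.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

// Illustrative only: bounds the number of in-flight futures the way a
// max-outstanding-requests limit does. Not the KinesisProducerActor implementation.
final class OutstandingLimiter(maxOutstanding: Int)(implicit ec: ExecutionContext) {
  private val permits = new Semaphore(maxOutstanding)

  /** Starts `send` only if a permit is free; otherwise returns None. */
  def trySend[A](send: => Future[A]): Option[Future[A]] =
    if (!permits.tryAcquire()) None
    else {
      val started =
        try send
        catch { case NonFatal(e) => Future.failed(e) } // a throwing send still frees its permit
      // Release the permit once the send completes, whether it succeeded or failed.
      Some(started.andThen { case _ => permits.release() })
    }

  /** Number of sends currently in flight. */
  def outstanding: Int = maxOutstanding - permits.availablePermits()
}
```

Returning None instead of blocking leaves the choice to the caller: retry later, buffer the message, or drop it.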

The following messages are supported:

import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{Send, SendFailed, SendSuccessful, SendWithCallback}

/**
  * Send a message to Kinesis, registering a callback response of [[SendSuccessful]] or [[SendFailed]] accordingly.
  */
case class SendWithCallback(producerEvent: ProducerEvent, messageId: String = UUID_GENERATOR.generate().toString)

/**
  * Send a message to Kinesis without any callbacks. Fire and forget.
  */
case class Send(producerEvent: ProducerEvent)

/**
  * Sent to the sender in event of a successful completion.
  *
  * @param messageId        The id of the event that was sent.
  * @param userRecordResult The Kinesis data regarding the send.
  */
case class SendSuccessful(messageId: String, userRecordResult: UserRecordResult)

/**
  * Sent to the sender in event of a failed completion.
  *
  * @param event     The current event that failed.
  * @param messageId The id of the event that failed.
  * @param reason    The exception causing the failure.
  */
case class SendFailed(event: ProducerEvent, messageId: String, reason: Throwable)

Within an Actor (Strongly recommended)

import java.util.UUID

import akka.actor.Actor
import com.typesafe.config.Config
import com.weightwatchers.reactive.kinesis.models.ProducerEvent
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.{SendFailed, SendSuccessful, SendWithCallback}
import samples.SomeActor.DoSomething

object SomeActor {
  case object DoSomething
}

class SomeActor(kinesisConfig: Config) extends Actor {

  val kpa = context.actorOf(
    KinesisProducerActor.props(kinesisConfig, "some-producer"))

  override def receive: Receive = {
    case DoSomething =>
      //Do something exciting!
      val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")
      kpa ! SendWithCallback(producerEvent)

    //Callbacks from the KinesisProducerActor
    case SendSuccessful(messageId, _) =>
      println(s"Successfully sent $messageId")

    case SendFailed(event, messageId, reason) =>
      println(s"Failed to send event ${event.partitionKey} with $messageId, cause: ${reason.getMessage}")
  }
}

Scheme for kinesis retries in services

Kinesis Retry model with service DB:


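Algorithm steps to publish an event to the stream:

step 1: send the event
    if sendSuccessful
        continue
    if sendFailed
        save event in DB
step 2: background job, for each event saved in the DB
    if backgroundJob.connect(stream).isSuccessful
        go to step 1 with the saved event
        delete event from DB
    else
        do nothing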

[Figure: Kinesis retries diagram scheme]
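The following is a simplified, synchronous sketch of this scheme. Everything in it is hypothetical: Event, RetryingPublisher, the publish function (true stands for SendSuccessful, false for SendFailed) and the in-memory map standing in for the service DB. In a real service the send result arrives asynchronously as SendSuccessful or SendFailed, and the store must be durable.

```scala
import scala.collection.mutable

// Hypothetical, synchronous model of the retry scheme above; none of these
// names come from reactive-kinesis. `publish` returns true for SendSuccessful
// and false for SendFailed; the mutable map stands in for the service DB.
final case class Event(partitionKey: String, payload: String)

final class RetryingPublisher(publish: Event => Boolean) {
  private val failedEvents = mutable.LinkedHashMap.empty[Long, Event]
  private var nextId = 0L

  /** Step 1: send; if the send fails, save the event in the "DB". */
  def send(event: Event): Boolean = {
    val ok = publish(event)
    if (!ok) {
      failedEvents.put(nextId, event)
      nextId += 1
    }
    ok
  }

  /** Step 2 (background job): when the stream is reachable, re-run step 1 per saved event. */
  def retryPending(streamReachable: => Boolean): Int =
    if (!streamReachable) 0 // do nothing; try again on the next run
    else
      failedEvents.toList.count { case (id, event) =>
        failedEvents.remove(id) // delete from the "DB"; a failed re-send saves it again
        send(event)
      }

  def pendingCount: Int = failedEvents.size
}
```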

From outside of an Actor

import java.util.UUID
import com.typesafe.config._
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor
import com.weightwatchers.reactive.kinesis.producer.KinesisProducerActor.Send

implicit val system = akka.actor.ActorSystem.create()

val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")

// where some-producer is the name in the configuration
val kpa = system.actorOf(KinesisProducerActor.props(kinesisConfig, "some-producer"))

val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")
kpa ! Send(producerEvent) //Send without a callback confirmation

Pure Scala based implementation (simple wrapper around KPL)

Note that throttling of outstanding futures (max-outstanding-requests) is unavailable with this method.

import java.util.UUID
import com.amazonaws.services.kinesis.producer.{UserRecordFailedException, UserRecordResult}
import com.weightwatchers.reactive.kinesis.producer.KinesisProducer
import com.weightwatchers.reactive.kinesis.producer.ProducerConf
import com.typesafe.config._
import com.weightwatchers.reactive.kinesis.models._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global //Not for production
import scala.util.{Failure, Success}

val kinesisConfig: Config = ConfigFactory.load().getConfig("kinesis")

val kpl = KinesisProducer(ProducerConf(kinesisConfig, "some-producer"))

val producerEvent = ProducerEvent(UUID.randomUUID.toString, "{Some Payload}")

val callback: Future[UserRecordResult] = kpl.addUserRecord(producerEvent)

// onComplete handles both outcomes (onSuccess/onFailure are deprecated since Scala 2.12)
callback.onComplete {
  case Success(result) =>
    println("Success!!")
  case Failure(ex: UserRecordFailedException) =>
    println(s"Failure! ${ex.getMessage}")
  case Failure(ex) =>
    println(s"Critical Failure! ${ex.getMessage}")
}

Akka Stream Sink

An Akka Sink is provided which can be used to publish messages via streams. Every message is sent to the Sink as a ProducerEvent, which defines the partition key as well as the payload. The Sink is created from a ProducerConf or directly with a KinesisProducerActor. See the Kinesis object for the various options.

The Sink expects an acknowledgement for every message sent to Kinesis. You can configure how many unacknowledged messages are allowed before back pressure is applied. This throttling is controlled by the kinesis.{producer}.akka.max-outstanding-requests configuration value. Please note: if throttling is not configured, a default value of 1000 messages is applied.

The provided Sink produces a Future[Done] as its materialized value. This future succeeds if all messages from upstream are sent to Kinesis and acknowledged. It fails if a message could not be sent to Kinesis or if upstream fails.

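import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._
import scala.util.{Failure, Success}

implicit val system: ActorSystem = ActorSystem("kinesis-producer-system")
implicit val materializer: Materializer = ActorMaterializer()
import system.dispatcher // ExecutionContext for onComplete

Source(1.to(100).map(_.toString))
  .map(num => ProducerEvent(num, num))
  .runWith(Kinesis.sink("producer-name"))
  .onComplete {
    case Success(_) => println("All messages are published successfully.")
    case Failure(ex) => println(s"Failed to publish messages: ${ex.getMessage}")
  }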

A long-running flow can be achieved with a SourceQueue. In this case the flow stays open as long as needed, and new elements can be published via the materialized queue:

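import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Source}
import com.weightwatchers.reactive.kinesis.models._
import com.weightwatchers.reactive.kinesis.stream._

implicit val system: ActorSystem = ActorSystem("kinesis-producer-system")
implicit val materializer: Materializer = ActorMaterializer()

val sourceQueue = Source.queue[ProducerEvent](1000, OverflowStrategy.fail)
  .toMat(Kinesis.sink("producer-name"))(Keep.left)
  .run()

sourceQueue.offer(ProducerEvent("foo", "bar"))
sourceQueue.offer(ProducerEvent("foo", "baz"))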

The Sink uses a KinesisProducerActor under the hood. All rules regarding this actor also apply to the Sink.

Running the reliability test

Delete & recreate the Kinesis stream and DynamoDB table

Execute this command in a shell. You'll need to do this if you don't have access to the WW AWS resources:

aws kinesis delete-stream --stream-name test-kinesis-reliability && aws dynamodb delete-table --table-name KinesisReliabilitySpec && sleep 90 && aws kinesis create-stream --stream-name test-kinesis-reliability --shard-count 2

Running the producer-consumer test

Run the SimpleKinesisProducer using the App object.

Wait for the producer to publish all messages.

At the end the producer will print the number of unconfirmed and failed messages (both should be 0!).

Run the SimpleKinesisConsumer using the App object.

Now, wait for two messages that look like this to appear in the consumer window:

2016-06-14 20:09:24,938 c.w.c.e.KinesisRecordProcessingManager - Initializing record processor for shard: shardId-000000000001
2016-06-14 20:09:24,963 c.w.c.e.KinesisRecordProcessingManager - Initializing record processor for shard: shardId-000000000000

As the test progresses, watch the consumer window for a message of this format:

**** PIT STOP OK

Near that line, you'll see stats on the number of messages processed per second.

FAQ

Contributor Guide

Code Formatting

This project uses scalafmt and will automatically fail the build if any files do not match the expected formatting.

Please run sbt scalafmt before committing and pushing changes.

Integration tests

As part of the Travis build, integration tests run against a Kinesis localstack instance. You can run these locally as follows:

Tag Requirements

The build uses Git tags and sbt-git to determine the current version.

Version information

Valid Release Tag Examples:

v1.2.3 (version=1.2.3)
v1.2.3-M1 (version=1.2.3-M1)

Invalid Release Tag Examples:

v1.2.3-SNAPSHOT
v1.2.3-M1-SNAPSHOT
v1.2.3-X1
1.2.3
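Judging only from the examples above, a valid release tag is a v followed by MAJOR.MINOR.PATCH, with an optional -M<n> milestone suffix. The regex below encodes that reading. It is inferred from the examples and is not the build's actual rule.

```scala
// Inferred from the valid/invalid examples above (an assumption, not the build's
// actual rule): "v" + MAJOR.MINOR.PATCH + an optional "-M<n>" milestone suffix.
object ReleaseTag {
  private val Valid = """v(\d+\.\d+\.\d+(?:-M\d+)?)""".r

  /** The version a tag maps to, or None if it is not a valid release tag. */
  def version(tag: String): Option[String] = tag match {
    case Valid(v) => Some(v) // the regex extractor must match the whole tag
    case _        => None
  }
}
```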

If the current version on master is a snapshot (release tag + x commits), then the artifact will be deployed to the JFrog OSS repository:

Contribution policy

Contributions via GitHub pull requests are gladly accepted from their original author. Along with any pull requests, please state that the contribution is your original work and that you license the work to the project under the project's open source license. Whether or not you state this explicitly, by submitting any copyrighted material via pull request, email, or other means you agree to license the material under the project's open source license and warrant that you have the legal authority to do so.

Changelog

See the releases tab: https://github.com/WW-Digital/reactive-kinesis/releases

License

This code is open source software licensed under the Apache 2.0 license.