qubole / kinesis-sql

Kinesis Connector for Structured Streaming
http://www.qubole.com
Apache License 2.0

Kinesis Application State in DynamoDB #22

Open VikramBPurohit opened 6 years ago

VikramBPurohit commented 6 years ago

Hi @itsvikramagr,

Thanks again for writing this library and for the constant support you give the community.

Our Spark Structured Streaming code doesn't need to be stateful. We are OK with the cluster or executors going down, as long as Kinesis is managing the consumer's application state.

https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-ddb.html

Could you shed some light on the following?

  1. Why isn't Kinesis able to manage offset/application state in DynamoDB, even though kinesis-sql calls the DescribeStream API?
  2. I don't see anywhere in the kinesis-sql code that sets an application name for Kinesis.

Thanks in advance for your response!

itsvikramagr commented 6 years ago

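The kinesis-sql library does not use the KCL for most of its work. The DynamoDB lease/checkpoint table, and the application name that identifies it, are KCL features; calling DescribeStream on its own does not create or update any state in DynamoDB. The KCL and the Structured Streaming APIs do not fit together well, and the KCL also brings the extra cost of DynamoDB, which can be avoided for Structured Streaming.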

That said, while designing kinesis-sql, we knew users would want a more reliable shard-progress committer, so the module supports a pluggable committer. To use a different committer, add a case to the following code:

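// Excerpt from the kinesis-sql source: picks the shard-progress committer
// from the "executor.metadata.committer" source option.
import java.util.Locale

private def metadataCommitter: MetadataCommitter[ShardInfo] = {
  metaDataCommitterType.toLowerCase(Locale.ROOT) match {
    case "hdfs" =>
      // Default: persists shard progress under metaDataCommitterPath
      new HDFSMetadataCommitter[ShardInfo](metaDataCommitterPath, hadoopConf(sqlContext))
    case other =>
      // Add a new case here to plug in another committer
      throw new IllegalArgumentException(s"Unsupported metadata committer '$other': only HDFS is supported")
  }
}

// Committer type from the source options; defaults to "hdfs"
private def metaDataCommitterType: String = {
  sourceOptions.getOrElse("executor.metadata.committer", "hdfs").toString
}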
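As a minimal sketch of what an extra branch could look like, the Scala below mirrors the dispatch above with an added "dynamodb" case. Only the `executor.metadata.committer` key and its `hdfs` default come from the snippet; `ShardCommitter`, `HdfsShardCommitter`, `DynamoDbShardCommitter`, `CommitterFactory`, and the `executor.metadata.committer.dynamodb.table` option are hypothetical names, not part of kinesis-sql, and the committers are placeholders that persist nothing.

```scala
import java.util.Locale

// Hypothetical, simplified committer interface (NOT kinesis-sql's MetadataCommitter).
trait ShardCommitter {
  def describe: String
}

// Hypothetical placeholders; real versions would persist shard progress.
final class HdfsShardCommitter(path: String) extends ShardCommitter {
  def describe: String = s"hdfs:$path"
}

final class DynamoDbShardCommitter(table: String) extends ShardCommitter {
  def describe: String = s"dynamodb:$table"
}

object CommitterFactory {
  // Option key and default taken from the kinesis-sql snippet.
  val TypeKey = "executor.metadata.committer"
  // Hypothetical option naming the DynamoDB table.
  val TableKey = "executor.metadata.committer.dynamodb.table"

  def create(options: Map[String, String], path: String): ShardCommitter =
    options.getOrElse(TypeKey, "hdfs").toLowerCase(Locale.ROOT) match {
      case "hdfs" => new HdfsShardCommitter(path)
      case "dynamodb" =>
        // Fail fast if the table is not configured.
        val table = options.getOrElse(
          TableKey,
          throw new IllegalArgumentException(s"$TableKey is required for the dynamodb committer"))
        new DynamoDbShardCommitter(table)
      case other =>
        throw new IllegalArgumentException(s"Unsupported metadata committer: $other")
    }
}
```

Failing fast on a missing table name keeps a misconfigured query from silently falling back to HDFS.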

I am relying on the community to implement a DynamoDB committer. cc @VikramBPurohit
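A DynamoDB committer would need to record, per micro-batch, how far each shard was read, and keep those writes safe when Spark retries a task. The sketch below models that under stated assumptions: an in-memory `ConcurrentHashMap` stands in for a table keyed by (batchId, shardId), and `putIfAbsent` plays the role of a conditional PutItem with `attribute_not_exists`. All names are hypothetical, no AWS SDK calls are made, and it assumes Scala 2.13+ for `scala.jdk.CollectionConverters`.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical record of how far one shard was read in a micro-batch.
final case class ShardProgress(shardId: String, sequenceNumber: String)

// In-memory stand-in for a DynamoDB table whose key is (batchId, shardId).
// A real committer would call PutItem/Query/DeleteItem via the AWS SDK instead.
final class InMemoryDynamoStyleCommitter {
  private val table = new ConcurrentHashMap[(Long, String), ShardProgress]()

  // Models a conditional PutItem (attribute_not_exists): the first write for a
  // (batchId, shardId) key wins, so a retried task cannot overwrite it.
  def add(batchId: Long, progress: ShardProgress): Boolean =
    table.putIfAbsent((batchId, progress.shardId), progress) == null

  // Models a Query on the batch key: every shard record for one batch, sorted by shard.
  def get(batchId: Long): Seq[ShardProgress] =
    table.asScala.collect { case ((b, _), p) if b == batchId => p }.toSeq.sortBy(_.shardId)

  // Drops all but the newest `keep` batches, like purging old checkpoint files.
  def purge(keep: Int): Unit = {
    val batches = table.keySet.asScala.map(_._1).toSeq.sorted
    val stale = batches.dropRight(keep).toSet
    table.keySet.asScala.filter(k => stale.contains(k._1)).foreach(k => table.remove(k))
  }
}
```

A real implementation would also need to choose a key schema (for example, including the query's checkpoint location so several queries can share one table) and handle DynamoDB throttling; both are outside this sketch.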