cloudify / scalazon

Idiomatic, opinionated Scala library for AWS
MIT License

Incomplete data retrieved. #5

Open bamos opened 10 years ago

bamos commented 10 years ago

Hi Federico,

I've modified your example so that it adds 10 items to Kinesis and then attempts to retrieve them 10 separate times. This illustrates a bug I'm seeing in some of my other projects, where not all of the records are retrieved.

Do you know if this is expected behavior from Kinesis? Am I doing something subtly wrong when retrieving the data?

In the output below, some attempts retrieve all 10 records, but others, such as the last one, read fewer than 10 records.

stream created
stream active
ACTIVE
true
data stored: shardId-000000000000, 49535162123327736836226361226397834013393393790669029377
data stored: shardId-000000000000, 49535162123327736836226361326022690278750787438945239041
data stored: shardId-000000000000, 49535162123327736836226361368991565008786413418608328705
data stored: shardId-000000000000, 49535162123327736836226362473531453190127716825632342017
data stored: shardId-000000000000, 49535162123327736836226362521162196476315335329660796929
data stored: shardId-000000000000, 49535162123327736836226362486934026452529592727012114433
data stored: shardId-000000000000, 49535162123327736836226362545355462718347236243547357185
data stored: shardId-000000000000, 49535162123327736836226362496631199651551572522497474561
data stored: shardId-000000000000, 49535162123327736836226362425675822156409032816724017153
data stored: shardId-000000000000, 49535162123327736836226363990199997461027351869823909889
Retrieving data, attempt 1
data retrieved
==Record chunk.
Read 0 records.

Retrieving data, attempt 2
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
Read 3 records.

Retrieving data, attempt 3
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362486934026452529592727012114433
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362496631199651551572522497474561
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362521162196476315335329660796929
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362545355462718347236243547357185
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226363990199997461027351869823909889
data: hello
partitionKey: k1
Read 10 records.

Retrieving data, attempt 4
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
Read 5 records.

Retrieving data, attempt 5
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
Read 5 records.

Retrieving data, attempt 6
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
Read 5 records.

Retrieving data, attempt 7
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362486934026452529592727012114433
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362496631199651551572522497474561
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362521162196476315335329660796929
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362545355462718347236243547357185
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226363990199997461027351869823909889
data: hello
partitionKey: k1
Read 10 records.

Retrieving data, attempt 8
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362486934026452529592727012114433
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362496631199651551572522497474561
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362521162196476315335329660796929
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362545355462718347236243547357185
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226363990199997461027351869823909889
data: hello
partitionKey: k1
Read 10 records.

Retrieving data, attempt 9
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362425675822156409032816724017153
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362473531453190127716825632342017
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362486934026452529592727012114433
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362496631199651551572522497474561
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362521162196476315335329660796929
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226362545355462718347236243547357185
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226363990199997461027351869823909889
data: hello
partitionKey: k1
Read 10 records.

Retrieving data, attempt 10
data retrieved
==Record chunk.
sequenceNumber: 49535162123327736836226361226397834013393393790669029377
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361326022690278750787438945239041
data: hello
partitionKey: k1
sequenceNumber: 49535162123327736836226361368991565008786413418608328705
data: hello
partitionKey: k1
Read 3 records.

stream deleted
cloudify commented 10 years ago

Hmm, this is weird; I haven't seen this issue yet. Have you tried using different horizons? I'm not using Kinesis in production yet, so I haven't done any proper testing. The whole shard-iterator mechanism for fetching messages isn't very clear in the docs, so I may have made a mistake in the fetch logic.
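Kinesis lets a new shard iterator start from different positions, selected by the `ShardIteratorType` values `TRIM_HORIZON` (the oldest record still retained), `LATEST` (just after the newest record), `AT_SEQUENCE_NUMBER` and `AFTER_SEQUENCE_NUMBER`; "horizons" above presumably refers to this choice. The sketch below is a simplified in-memory model, not the scalazon or AWS SDK API: `IteratorStart`, `HorizonModel` and `startIndex` are hypothetical names, and it treats `AT_SEQUENCE_NUMBER` as "first record at or after" the given number, whereas the real API expects an existing sequence number.

```scala
// Hypothetical, simplified model of where each Kinesis shard-iterator type
// starts reading. Not the scalazon or AWS SDK API.
sealed trait IteratorStart
case object TrimHorizon extends IteratorStart
case object Latest extends IteratorStart
final case class AtSequenceNumber(seq: BigInt) extends IteratorStart
final case class AfterSequenceNumber(seq: BigInt) extends IteratorStart

object HorizonModel {
  // `retained` holds the sequence numbers still in the shard, ascending.
  // Returns the index of the first record a fresh iterator would read;
  // retained.length means "nothing to read until new records arrive".
  def startIndex(retained: Vector[BigInt], start: IteratorStart): Int = {
    def firstWhere(p: BigInt => Boolean): Int = {
      val i = retained.indexWhere(p)
      if (i < 0) retained.length else i
    }
    start match {
      case TrimHorizon            => 0
      case Latest                 => retained.length
      case AtSequenceNumber(s)    => firstWhere(_ >= s)
      case AfterSequenceNumber(s) => firstWhere(_ > s)
    }
  }

  def main(args: Array[String]): Unit = {
    val retained = Vector(BigInt(10), BigInt(20), BigInt(30))
    println(startIndex(retained, TrimHorizon))             // 0
    println(startIndex(retained, Latest))                  // 3
    println(startIndex(retained, AfterSequenceNumber(20))) // 2
  }
}
```

The starting position only decides where the first read begins; it does not guarantee that a single read returns everything after that point.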

bamos commented 10 years ago

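Hi Federico, I've found a solution to this. A single GetRecords call can return only some of the available records, or none at all, even when more data is in the shard, so reading once from the first shard iterator is not enough. Instead, an application should keep polling for new records, each time using the nextIterator returned with the previous chunk. The following code is working well for me.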

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    // Assumes the scalazon Kinesis DSL imports, an implicit client, an
    // implicit ExecutionContext and `stream` are already in scope, as in
    // the original example.

    // Get one initial shard iterator per shard.
    val initialShardIteratorsRequest = for {
      shards <- stream.get.shards.list
      initialShardIterators <- Future.sequence(shards.map {
        shard => implicitExecute(shard.iterator)
      })
    } yield initialShardIterators.toList
    var shardIterators: List[ShardIterator] =
      Await.result(initialShardIteratorsRequest, 30.seconds)

    // Continuously poll for records. Each call returns a chunk of records
    // together with the iterator to use for the next call on that shard.
    while (shardIterators.nonEmpty) {
      val recordChunksRequest = Future.sequence(shardIterators.map {
        iterator => implicitExecute(iterator.nextRecords)
      })
      val recordChunks = Await.result(recordChunksRequest, 30.seconds)
      for (recordChunk <- recordChunks; record <- recordChunk.records) {
        println("sequenceNumber: " + record.sequenceNumber)
        //printData(record.data.array())
        println("partitionKey: " + record.partitionKey)
      }
      // Continue each shard from where its previous chunk ended.
      shardIterators = recordChunks.map(_.nextIterator)
      // Pause between polls; Kinesis limits GetRecords calls per shard.
      Thread.sleep(1000)
    }
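To see why following `nextIterator` fixes the problem without needing an AWS account, here is a self-contained simulation. `FakeShard`, `Chunk` and `PollingDemo` are hypothetical stand-ins, not scalazon or AWS SDK types, and an iterator is modeled as a plain `Int` position. The shard returns records in batches whose sizes (0, then 3, then 5, then the rest) were chosen arbitrarily to mimic a read that returns only part of what is available. Reading once from the starting position can miss records, while feeding each chunk's `nextIterator` into the next read eventually returns all of them.

```scala
// Hypothetical stand-ins for illustration only; not scalazon or AWS SDK types.
// An "iterator" is modeled as a plain Int position in the shard.
final case class Chunk(records: Vector[String], nextIterator: Int)

final class FakeShard(data: Vector[String], batchSizes: Iterator[Int]) {
  // Returns up to the next scripted batch size of records starting at
  // `iterator` (possibly none), plus the position to resume from. Once the
  // scripted batch sizes run out, everything remaining is returned.
  def nextRecords(iterator: Int): Chunk = {
    val n = if (batchSizes.hasNext) batchSizes.next() else data.length
    val end = math.min(iterator + n, data.length)
    Chunk(data.slice(iterator, end), end)
  }
}

object PollingDemo {
  // Naive approach: a single read from the starting iterator.
  def readOnce(shard: FakeShard): Vector[String] =
    shard.nextRecords(0).records

  // Polling approach: feed each chunk's nextIterator into the next read.
  def poll(shard: FakeShard, polls: Int): Vector[String] = {
    @annotation.tailrec
    def loop(iterator: Int, remaining: Int, acc: Vector[String]): Vector[String] =
      if (remaining == 0) acc
      else {
        val chunk = shard.nextRecords(iterator)
        loop(chunk.nextIterator, remaining - 1, acc ++ chunk.records)
      }
    loop(0, polls, Vector.empty)
  }

  def main(args: Array[String]): Unit = {
    val data = Vector.tabulate(10)(i => s"hello-$i")
    println(readOnce(new FakeShard(data, Iterator(0, 3, 5))).size)        // 0
    println(poll(new FakeShard(data, Iterator(0, 3, 5)), polls = 4).size) // 10
  }
}
```

A real consumer would keep polling indefinitely, as the loop above does, rather than stopping after a fixed number of reads, since new records can arrive at any time.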
bamos commented 10 years ago

CC @alexanderdean