snowplow-archive / kinesis-example-scala-consumer

Example Scala/SBT event consumer for Amazon Kinesis
http://snowplowanalytics.com

Potentially switch to implementing IRecordProcessor #6

Closed (bamos closed 10 years ago)

bamos commented 10 years ago

Hi @alexanderdean, the Amazon documentation for developing a consumer application recommends implementing an IRecordProcessor instead of using the AWS library directly. Do you think we should switch to using it?

From the documentation:

Although you can use the Amazon Kinesis service API to get data from an Amazon Kinesis stream, we recommend using the design patterns and code for Kinesis applications provided here.

alexanderdean commented 10 years ago

Hi Brandon - yep I think we should switch to using IRecordProcessor for the consumer, good idea. That should hopefully fix our issues?

Perhaps we should suggest cloudify puts a warning on his direct read API methods?

BTW, for some examples of AWS using IRecordProcessor, see https://github.com/awslabs/amazon-kinesis-connectors, e.g.:

https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorRecordProcessor.java
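
As a rough illustration of the pattern being discussed (not code from this repository or from the AWS library), here is a minimal Scala sketch of the record-processor shape: the library drives a processor through initialize, process and shutdown calls, and the processor checkpoints its position as it goes. The names Record, Checkpointer, RecordProcessor, LabellingProcessor and CountingCheckpointer are hypothetical stand-ins so the sketch runs without AWS dependencies; the real IRecordProcessor signatures may differ and should be checked against the amazon-kinesis-client version in use.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-ins for the Kinesis Client Library types.
// These are NOT the real library signatures; they only mirror the general shape.
final case class Record(sequenceNumber: String, data: Array[Byte])

trait Checkpointer {
  def checkpoint(): Unit
}

trait RecordProcessor {
  def initialize(shardId: String): Unit
  def processRecords(records: Seq[Record], checkpointer: Checkpointer): Unit
  def shutdown(checkpointer: Checkpointer): Unit
}

// Example processor: decodes each payload as UTF-8, labels it with the shard ID,
// and checkpoints once per batch.
class LabellingProcessor extends RecordProcessor {
  private var shard: String = ""
  val seen: ArrayBuffer[String] = ArrayBuffer.empty

  def initialize(shardId: String): Unit = shard = shardId

  def processRecords(records: Seq[Record], checkpointer: Checkpointer): Unit = {
    records.foreach { r =>
      seen += s"$shard:${new String(r.data, "UTF-8")}"
    }
    checkpointer.checkpoint()
  }

  // Checkpoint one last time so a successor can resume from this position.
  def shutdown(checkpointer: Checkpointer): Unit = checkpointer.checkpoint()
}

// In-memory stand-in for the position-tracking state the real library persists.
class CountingCheckpointer extends Checkpointer {
  var count = 0
  def checkpoint(): Unit = count += 1
}

object Demo {
  def main(args: Array[String]): Unit = {
    val checkpointer = new CountingCheckpointer
    val processor = new LabellingProcessor
    processor.initialize("shard-0")
    processor.processRecords(Seq(Record("1", "hello".getBytes("UTF-8"))), checkpointer)
    processor.shutdown(checkpointer)
    println(processor.seen.mkString(", "))
  }
}
```

The point of the design is that the consumer code never polls the stream itself: it only reacts to batches handed to it and decides when to checkpoint, which is what lets the library manage shard assignment and stream position on its behalf.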

bamos commented 10 years ago

Hi @alexanderdean,

On the IRecordProcessor, I'm starting by getting the Java example working, and then I'll convert everything to Scala and tweak it to what we need.

One problem, though: per the documentation, using IRecordProcessor requires DynamoDB access:

Each Kinesis application has a unique name and operates on one specific stream. The application name must be unique because it is used to create an Amazon DynamoDB table that maintains state for the application. Note that your account will be charged for the costs associated with this Amazon DynamoDB table in addition to the costs associated with Amazon Kinesis itself.

I don't think this can be circumvented, because the IRecordProcessor is passed into the amazon-kinesis-client library. The main entry point is the Worker class, which doesn't seem to provide an alternative to using DynamoDB.

Do you still want to switch to implementing IRecordProcessor?

If so, can you give me minimal DynamoDB permissions?

Running the sample code gives the following error:

AmazonServiceException: Status Code: 400, AWS Service: AmazonDynamoDBv2, AWS Request ID: XXXXX, AWS Error Code: AccessDeniedException, AWS Error Message: User: arn:aws:iam::NNNN:user/brandon is not authorized to perform: dynamodb:CreateTable on resource: arn:aws:dynamodb:us-east-1:NNNNN:table/SnowplowExampleConsumer

alexanderdean commented 10 years ago

Hi @bamos - yes I think DynamoDB is pretty tightly coupled into Kinesis to manage state (stream position etc), so can't be avoided really!

I will give you DynamoDB permissions and update this ticket when you have them...

alexanderdean commented 10 years ago

Hi Brandon, you should be good to go now. Let me know if there are any issues with your access.

bamos commented 10 years ago

Thanks, works great!

bamos commented 10 years ago

Hi @alexanderdean, I've merged everything into the feature/bamos-updates branch and everything's working smoothly for these issues. Please open (or reopen) an issue if you want me to change anything on the Kinesis consumer.

alexanderdean commented 10 years ago

Nice work!