amazon-archives / kinesis-storm-spout

Kinesis spout for Storm

Apache Storm - KinesisSpout throwing AmazonClientException backing off #28

Open sunny1978 opened 8 years ago

sunny1978 commented 8 years ago

2016-02-02 16:15:18 c.a.s.k.s.u.InfiniteConstantBackoffRetry [DEBUG] Caught exception of type com.amazonaws.AmazonClientException, backing off for 1000 ms.

I tested GET and PUT requests against the stream, and both worked flawlessly. I have three variants: Batch, Storm and Spark.
Spark: uses KinesisStreams; working.
Batch: plain Get and Put; working.
Storm: planning to use the KinesisSpout library from Kinesis. It fails with no clue as to why.

Code:
final KinesisSpoutConfig config = new KinesisSpoutConfig(streamname, zookeeperurl);
config.withInitialPositionInStream(ipis);
config.withRegion(Regions.fromName(regionName));
config.withCheckpointIntervalMillis(Integer.parseInt(checkinterval));
config.withZookeeperPrefix("kinesis-zooprefix-" + name);

System.setProperty("aws.accessKeyId", key);
System.setProperty("aws.secretKey", keysecret);
SystemPropertiesCredentialsProvider scp = new SystemPropertiesCredentialsProvider();
final KinesisSpout spout = new KinesisSpoutConflux(config, scp, new ClientConfiguration());

What am I doing wrong?

Storm Logs:
2016-02-02 16:15:17 c.a.s.k.s.KinesisSpout [INFO] KinesisSpoutConflux[taskIndex=0] open() called with topoConfig task index 0 for processing stream Kinesis-Conflux
...
2016-02-02 16:15:17 c.a.s.k.s.KinesisSpout [DEBUG] KinesisSpoutConflux[taskIndex=0] activating. Starting to process stream Kinesis-Test
...
2016-02-02 16:15:17 c.a.s.k.s.KinesisHelper [INFO] Using us-east-1 region

I don't see "nextTuple" getting called.

My Versions:

<groupId>org.apache.storm</groupId>
<artifactId>storm-hdfs</artifactId>
<version>0.9.3</version>

<groupId>com.amazonaws</groupId>
<artifactId>kinesis-storm-spout</artifactId>
<version>1.1.1</version>

sunny1978 commented 8 years ago

Temp Fix:
1: I subclassed the spout (public class KinesisSpoutMine extends KinesisSpout) and overrode "open" to set the credential properties before delegating to the parent:
System.setProperty("aws.accessKeyId", "...key...");
System.setProperty("aws.secretKey", "...keysecret...");
super.open(..)
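
A plausible explanation for why this override helps (an inference, not something confirmed in this thread): System.setProperty in the code that builds and submits the topology only affects the submitting JVM, while the spout's open() runs later inside a Storm worker JVM. If the properties are absent there, a provider that reads them has nothing to return. The JDK-only sketch below mimics that lookup; `lookupCredentials` is a hypothetical stand-in written for illustration, not the AWS SDK's own method.

```java
import java.util.Optional;

final class SystemPropertyCredentialsSketch {
    // Property names taken from the post above.
    static final String ACCESS_KEY_PROP = "aws.accessKeyId";
    static final String SECRET_KEY_PROP = "aws.secretKey";

    /**
     * Hypothetical stand-in for a system-property credentials lookup.
     * Returns {accessKey, secretKey} only if both properties are set
     * in the JVM that executes this call.
     */
    static Optional<String[]> lookupCredentials() {
        String key = System.getProperty(ACCESS_KEY_PROP);
        String secret = System.getProperty(SECRET_KEY_PROP);
        if (key == null || secret == null) {
            return Optional.empty();
        }
        return Optional.of(new String[] {key, secret});
    }

    public static void main(String[] args) {
        // In a fresh JVM (like a worker process) nothing has been set yet.
        System.out.println("before set: " + lookupCredentials().isPresent());

        // Setting the properties in the same JVM that performs the lookup,
        // which is what doing it inside open() achieves.
        System.setProperty(ACCESS_KEY_PROP, "...key...");
        System.setProperty(SECRET_KEY_PROP, "...keysecret...");
        System.out.println("after set: " + lookupCredentials().isPresent());
    }
}
```

If this is indeed the cause, passing the properties to the worker JVMs through the topology's worker options, or using a credentials provider that does not depend on JVM-local state, would be alternatives to overriding open().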

Next Issue: The spout cannot get messages from the stream. It is able to fetch records but cannot convert them.

PUT Logic:
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(myStreamName);
putRecordRequest.setData(ByteBuffer.wrap(String.format("Kinesis-Test:testData-%d", createTime).getBytes()));
putRecordRequest.setPartitionKey(String.format("partitionKey-%d", createTime));
PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
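
The PUT logic wraps a formatted string in a ByteBuffer using getBytes() with the platform default charset, so a consumer has to decode those bytes back into text before a text-oriented bolt can use them. The following is a minimal JDK-only round trip, assuming UTF-8 is used on both sides; the `encode` and `decode` helper names are hypothetical and not part of the spout library.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class PayloadRoundTripSketch {
    /** Producer side: encode text with an explicit charset instead of the platform default. */
    static ByteBuffer encode(String text) {
        return ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
    }

    /** Consumer side: decode the record payload back into text. */
    static String decode(ByteBuffer data) {
        // duplicate() shares the bytes but has its own position,
        // so the caller's buffer is left unread.
        return StandardCharsets.UTF_8.decode(data.duplicate()).toString();
    }

    public static void main(String[] args) {
        long createTime = System.currentTimeMillis();
        ByteBuffer payload = encode(String.format("Kinesis-Test:testData-%d", createTime));
        System.out.println(decode(payload));
    }
}
```

Naming the charset explicitly avoids a mismatch when the producer and the Storm workers run with different default encodings.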

Outstanding Issues:
1: 0 byte files in HDFS
2: java.lang.IllegalArgumentException: message does not exist

Storm-KinesisSpout-getNext... (error in storm cluster)
Result: All files being written to HDFS are 0 bytes.
[root@sandbox logs]# hadoop fs -ls /user/conflux/storm/kinesishdfs
Found 7 items
-rw-r--r-- 3 storm hdfs 0 2016-02-03 01:22 /user/conflux/storm/kinesishdfs/HDFS-1-1-0-1454462371619.txt
-rw-r--r-- 3 storm hdfs 0 2016-02-03 01:27 /user/conflux/storm/kinesishdfs/HDFS-1-1-0-1454462605731.txt
-rw-r--r-- 3 storm hdfs 0 2016-02-03 01:33 /user/conflux/storm/kinesishdfs/HDFS-1-1-0-1454462874499.txt

2016-02-03 02:24:07 c.a.s.k.s.KinesisShardGetter [INFO] Seeking to com.amazonaws.services.kinesis.stormspout.ShardPosition@edf529e[pos=LATEST,sequenceNum=]
2016-02-03 02:24:07 c.a.s.k.s.s.z.ZookeeperStateManager [INFO] ZookeeperStateManager[taskIndex=0] got getter assignment. Handling [BufferedGetter[getter=KinesisShardGetter[shardId=shardId-000000000000]]].
2016-02-03 02:24:07 c.b.p.s.KinesisSpoutConflux [INFO] activate:end
2016-02-03 02:24:07 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:start
2016-02-03 02:24:08 c.a.s.k.s.KinesisShardGetter [DEBUG] KinesisShardGetter[shardId=shardId-000000000000] fetched 0 records from Kinesis (requested 10000).
2016-02-03 02:24:08 c.a.s.k.s.KinesisSpout [DEBUG] KinesisSpoutConflux[taskIndex=0] Not committing to ZooKeeper.
2016-02-03 02:24:08 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:end
2016-02-03 02:24:08 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:start
.... ....
2016-02-03 02:24:12 c.a.s.k.s.KinesisShardGetter [DEBUG] KinesisShardGetter[shardId=shardId-000000000000] fetched 0 records from Kinesis (requested 10000).
2016-02-03 02:24:12 c.a.s.k.s.KinesisSpout [DEBUG] KinesisSpoutConflux[taskIndex=0] committing local shard states to ZooKeeper.
2016-02-03 02:24:12 c.a.s.k.s.s.z.ZookeeperStateManager [DEBUG] ZookeeperStateManager[taskIndex=0]Local shard state for shardId-000000000000 was not dirty - not doing anything
..... ....
2016-02-03 02:25:01 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:end
2016-02-03 02:25:01 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:start
2016-02-03 02:25:02 c.a.s.k.s.KinesisShardGetter [DEBUG] KinesisShardGetter[shardId=shardId-000000000000] fetched 1 records from Kinesis (requested 10000).
2016-02-03 02:25:02 c.a.s.k.s.KinesisSpout [DEBUG] KinesisSpoutConflux[taskIndex=0] emitting record with seqnum 49558606788284125675005869972919011606055542508908183554 from shard shardId-000000000000.
2016-02-03 02:25:02 STDIO [ERROR] java.lang.IllegalArgumentException: message does not exist
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.tuple.Fields.fieldIndex(Fields.java:78)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.tuple.TupleImpl.fieldIndex(TupleImpl.java:100)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.tuple.TupleImpl.getValueByField(TupleImpl.java:149)
2016-02-03 02:25:02 STDIO [ERROR] at org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat.format(DelimitedRecordFormat.java:80)
2016-02-03 02:25:02 STDIO [ERROR] at org.apache.storm.hdfs.bolt.HdfsBolt.execute(HdfsBolt.java:92)
2016-02-03 02:25:02 STDIO [ERROR] at com.bigdlabs.paso.sink.PasoHdfsBolt.execute(PasoHdfsBolt.java:37)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.daemon.executor$fn5697$tuple_action_fn5699.invoke(executor.clj:659)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.daemon.executor$mk_task_receiver$fn5620.invoke(executor.clj:415)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.disruptor$clojure_handler$reify__1741.onEvent(disruptor.clj:58)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.daemon.executor$fn5697$fn5710$fn5761.invoke(executor.clj:794)
2016-02-03 02:25:02 STDIO [ERROR] at backtype.storm.util$async_loop$fn__452.invoke(util.clj:465)
2016-02-03 02:25:02 STDIO [ERROR] at clojure.lang.AFn.run(AFn.java:24)
2016-02-03 02:25:02 STDIO [ERROR] at java.lang.Thread.run(Thread.java:745)
2016-02-03 02:25:02 c.a.s.k.s.s.z.InflightRecordTracker [DEBUG] Shard shardId-000000000000: Recorded emit for seq num 49558606788284125675005869972919011606055542508908183554, isRetry = false, retryNum = 0
2016-02-03 02:25:02 c.a.s.k.s.KinesisSpout [DEBUG] KinesisSpoutConflux[taskIndex=0] Not committing to ZooKeeper.
2016-02-03 02:25:02 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:end
2016-02-03 02:25:02 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:start
2016-02-03 02:25:02 c.a.s.k.s.KinesisShardGetter [DEBUG] KinesisShardGetter[shardId=shardId-000000000000] fetched 0 records from Kinesis (requested 10000).
2016-02-03 02:25:02 c.a.s.k.s.KinesisSpout [DEBUG] KinesisSpoutConflux[taskIndex=0] Not committing to ZooKeeper.
2016-02-03 02:25:02 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:end
2016-02-03 02:25:02 c.b.p.s.KinesisSpoutConflux [INFO] nextTuple:start
2016-02-03 02:25:02 c.a.s.k.s.KinesisSpout [DEBUG] KinesisSpoutConflux[taskIndex=0] Not committing to ZooKeeper.

sunny1978 commented 8 years ago

Fixed

1: 0 byte files in HDFS
2: java.lang.IllegalArgumentException: message does not exist

Fix:
1: A custom record scheme whose payload field is named "message", the field the HDFS bolt's record format looks up:
public class MyKinesisRecordScheme implements IKinesisRecordScheme {
    public static final String FIELD_RECORD_DATA = "message";
    public static final String FIELD_SEQUENCE_NUMBER = "sequenceNumber";
    public static final String FIELD_PARTITION_KEY = "partitionKey";

    public Fields getOutputFields() {
        return new Fields(FIELD_PARTITION_KEY, FIELD_SEQUENCE_NUMBER, FIELD_RECORD_DATA);
    }

    // Any other methods of the scheme are not shown in this post.
}

2: Register the scheme on the spout config:
final KinesisSpoutConfig config = new KinesisSpoutConfig(streamname, zookeeperurl);
....
config.withKinesisRecordScheme(new KinesisRecordSchemeConflux());
...
final KinesisSpout spout = new KinesisSpoutConflux(config, new CustomCredentialsProviderChain(), new ClientConfiguration());
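
Why renaming the field resolves "message does not exist" can be sketched without Storm on the classpath. The stack trace shows the HDFS bolt's record format asking the tuple for a field by name; if the spout never declared that name, the lookup throws. The stand-in below is a simplified illustration, not Storm's actual Fields class, and the default payload field name "recordData" is an assumption about the library's default scheme rather than something stated in this thread.

```java
import java.util.Arrays;
import java.util.List;

final class FieldLookupSketch {
    /**
     * Simplified stand-in for a by-name field lookup over the
     * output fields a spout has declared.
     */
    static int fieldIndex(List<String> declaredFields, String name) {
        int index = declaredFields.indexOf(name);
        if (index < 0) {
            throw new IllegalArgumentException(name + " does not exist");
        }
        return index;
    }

    public static void main(String[] args) {
        // Assumed default field names (not confirmed in the thread).
        List<String> defaults = Arrays.asList("partitionKey", "sequenceNumber", "recordData");
        // Field names declared by the custom scheme in the fix.
        List<String> renamed = Arrays.asList("partitionKey", "sequenceNumber", "message");

        try {
            fieldIndex(defaults, "message");
        } catch (IllegalArgumentException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
        System.out.println("index with custom scheme: " + fieldIndex(renamed, "message"));
    }
}
```

The same mismatch could presumably be fixed from the other side, by configuring the bolt's record format to read the field name the spout already emits.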