amazon-archives / kinesis-storm-spout

Kinesis spout for Storm
Other
106 stars 64 forks source link

KinesisSpout throwing IllegalArgumentException #27

Open rzachariah opened 8 years ago

rzachariah commented 8 years ago

I'm encountering an exception when I submit the SampleTopology with a KinesisSpout to a cluster.

com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Can not set java.util.List field com.amazonaws.auth.AWSCredentialsProviderChain.credentialsProviders to KinesisStormClickstreamApp.CustomCredentialsProviderChain Serialization trace: credentialsProviders (KinesisStormClickstreamApp.CustomCredentialsProviderChain) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729) at com.amazonaws.services.kinesis.stormspout.SerializationHelper.kryoDeserializeObject(SerializationHelper.java:74) at com.amazonaws.services.kinesis.stormspout.KinesisHelper.getKinesisCredsProvider(KinesisHelper.java:123) at com.amazonaws.services.kinesis.stormspout.KinesisHelper.makeNewKinesisClient(KinesisHelper.java:108) at com.amazonaws.services.kinesis.stormspout.KinesisHelper.getSharedkinesisClient(KinesisHelper.java:116) at com.amazonaws.services.kinesis.stormspout.KinesisHelper.getShardList(KinesisHelper.java:85) at com.amazonaws.services.kinesis.stormspout.state.zookeeper.ZookeeperStateManager.activate(ZookeeperStateManager.java:109) at com.amazonaws.services.kinesis.stormspout.KinesisSpout.activate(KinesisSpout.java:125) at backtype.storm.daemon.executor$fn__6579$fn__6594$fn__6623.invoke(executor.clj:563) at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463) at clojure.lang.AFn.run(AFn.java:24) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.IllegalArgumentException: Can not set java.util.List field com.amazonaws.auth.AWSCredentialsProviderChain.credentialsProviders to KinesisStormClickstreamApp.CustomCredentialsProviderChain at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:164) at sun.reflect.UnsafeFieldAccessorImpl.throwSetIllegalArgumentException(UnsafeFieldAccessorImpl.java:168) at sun.reflect.UnsafeObjectFieldAccessorImpl.set(UnsafeObjectFieldAccessorImpl.java:81) at java.lang.reflect.Field.set(Field.java:741) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:619)

The sample is here. https://github.com/awslabs/aws-big-data-blog/tree/master/aws-blog-kinesis-storm-clickstream-app

rzachariah commented 8 years ago

Here's the code that creates and sets the spout.

        final KinesisSpoutConfig config =
                new KinesisSpoutConfig(streamName, zookeeperEndpoint).withZookeeperPrefix(zookeeperPrefix)
                        .withInitialPositionInStream(initialPositionInStream)
                        .withRegion(Regions.fromName(regionName));

        final KinesisSpout spout = new KinesisSpout(config, new CustomCredentialsProviderChain(), new ClientConfiguration());
        TopologyBuilder builder = new TopologyBuilder();
        LOG.info("Using Kinesis stream: " + config.getStreamName());

        // Using number of shards as the parallelism hint for the spout.
        builder.setSpout("Kinesis", spout, 2);

Here's the CustomCredentialsProviderChain

/*
 * Copyright 2013-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Amazon Software License (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 * http://aws.amazon.com/asl/
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package KinesisStormClickstreamApp;

import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;

/**
 * Credentials provider chain that is similar to DefaultAWSCredentialsProviderChain,
 * but also includes the ClasspathPropertiesFileCredentialsProvider.
 */
public class CustomCredentialsProviderChain extends AWSCredentialsProviderChain {

    public CustomCredentialsProviderChain() {
        super(new EnvironmentVariableCredentialsProvider(),
                new SystemPropertiesCredentialsProvider(),
                new ClasspathPropertiesFileCredentialsProvider("AwsCredentials.properties"),
                new InstanceProfileCredentialsProvider());
    }
}