awslabs / amazon-kinesis-client

Client library for Amazon Kinesis
Apache License 2.0

Make kinesis credential provider optional when kinesis client is already pre-configured with credential provider #168

Open Jokerobo opened 7 years ago

Jokerobo commented 7 years ago

Hi there,

I'm using the KCL and passing fully configured DynamoDB, CloudWatch, and Kinesis clients to the worker. I got it working without giving the worker DynamoDB and CloudWatch credential providers directly, by configuring them on the AWS clients instead. However, it seems a Kinesis credential provider is still required, even though my Kinesis client is preconfigured with one. I get a NullPointerException when I don't specify the Kinesis credential provider.

Can we make the Kinesis credential provider optional? It feels weird to pass the credential provider twice, to both the Kinesis client and the worker.

Related code can be found here in Worker.build():

return new Worker(config.getApplicationName(),
                    recordProcessorFactory,
                    // this is where the kinesis credential provider is required
                    new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
                            kinesisClient).getProxy(config.getStreamName()),
                            config.getMaxRecords(),
                            config.getIdleTimeBetweenReadsInMillis(),
                            config.shouldCallProcessRecordsEvenForEmptyRecordList(),
                            config.shouldValidateSequenceNumberBeforeCheckpointing(),
                            config.getInitialPositionInStreamExtended()),
                    config.getInitialPositionInStreamExtended(),
                    config.getParentShardPollIntervalMillis(),
                    config.getShardSyncIntervalMillis(),
                    config.shouldCleanupLeasesUponShardCompletion(),
                    null,
                    new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getTableName(),
                            dynamoDBClient),
                            config.getWorkerIdentifier(),
                            config.getFailoverTimeMillis(),
                            config.getEpsilonMillis(),
                            config.getMaxLeasesForWorker(),
                            config.getMaxLeasesToStealAtOneTime(),
                            metricsFactory)
                        .withInitialLeaseTableReadCapacity(config.getInitialLeaseTableReadCapacity())
                        .withInitialLeaseTableWriteCapacity(config.getInitialLeaseTableWriteCapacity()),
                    execService,
                    metricsFactory,
                    config.getTaskBackoffTimeMillis(),
                    config.getFailoverTimeMillis(),
                    config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
                    shardPrioritization);

And this is where the credential provider is invoked, throwing a NullPointerException:

public class KinesisProxy implements IKinesisProxyExtended {
    @Override
    public GetRecordsResult get(String shardIterator, int maxRecords)
        throws ResourceNotFoundException, InvalidArgumentException, ExpiredIteratorException {

        final GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setRequestCredentials(credentialsProvider.getCredentials());
        getRecordsRequest.setShardIterator(shardIterator);
        getRecordsRequest.setLimit(maxRecords);
        final GetRecordsResult response = client.getRecords(getRecordsRequest);
        return response;

    }
}
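
One way the method above could tolerate a missing provider is to set per-request credentials only when a provider was actually supplied, and otherwise leave them unset so the pre-configured client signs the request with its own credentials. The following is a minimal sketch of that idea, not the KCL's code: `Credentials`, `CredentialsProvider`, `Request`, and `buildGetRecordsRequest` are hypothetical stand-ins for the SDK types, used so the example compiles on its own.

```java
public class OptionalCredentialsSketch {

    /** Hypothetical stand-in for the SDK's credentials type. */
    public static final class Credentials {
        public final String accessKeyId;

        public Credentials(String accessKeyId) {
            this.accessKeyId = accessKeyId;
        }
    }

    /** Hypothetical stand-in for the SDK's credentials provider. */
    public interface CredentialsProvider {
        Credentials getCredentials();
    }

    /** Hypothetical stand-in for GetRecordsRequest. Null credentials mean "use the client's own". */
    public static final class Request {
        private Credentials requestCredentials;
        private String shardIterator;
        private int limit;

        public Credentials getRequestCredentials() { return requestCredentials; }
        public void setRequestCredentials(Credentials c) { this.requestCredentials = c; }
        public String getShardIterator() { return shardIterator; }
        public void setShardIterator(String s) { this.shardIterator = s; }
        public int getLimit() { return limit; }
        public void setLimit(int limit) { this.limit = limit; }
    }

    /** Builds a request, overriding credentials only if a provider was given. */
    public static Request buildGetRecordsRequest(CredentialsProvider provider,
                                                 String shardIterator,
                                                 int maxRecords) {
        Request request = new Request();
        if (provider != null) {
            // Explicit provider: keep the existing per-request override.
            request.setRequestCredentials(provider.getCredentials());
        }
        // No provider: leave credentials unset so the client's configuration applies.
        request.setShardIterator(shardIterator);
        request.setLimit(maxRecords);
        return request;
    }
}
```

With this shape, calling `buildGetRecordsRequest(null, iterator, 100)` no longer dereferences a null provider, while callers that do pass a provider see the same behavior as before.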

pfifer commented 7 years ago

Thanks for recommending this. The handling of clients in the KCL needs some updates to integrate better with the 1.11 and newer SDKs.

If anyone else is interested, please respond or provide a reaction.

ObviousDWest commented 6 years ago

Given that the KPL handles credentials using a series of default fallbacks, I'd suggest this work the same way (e.g. when no credentials are passed in). I already had a sample KPL setup working from ~/.aws/credentials, so it felt strange to have to set things up again "manually" to create a reader.
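
The fallback behavior suggested here can be sketched as "use the explicit value if one was passed in, otherwise try each default source in order". This is only an illustration of the pattern under assumed names: `CredentialsFallbackSketch` and `resolve` are hypothetical, plain strings stand in for credentials, and the suppliers stand in for sources such as environment variables or ~/.aws/credentials. In the 1.11 SDK the equivalent role is played by `DefaultAWSCredentialsProviderChain`.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

public class CredentialsFallbackSketch {

    /**
     * Returns the explicit value when one is given; otherwise returns the
     * result of the first fallback source that yields a value, in list order.
     */
    public static Optional<String> resolve(String explicit,
                                           List<Supplier<Optional<String>>> fallbacks) {
        if (explicit != null) {
            return Optional.of(explicit);
        }
        for (Supplier<Optional<String>> source : fallbacks) {
            Optional<String> found = source.get();
            if (found.isPresent()) {
                return found;
            }
        }
        // Nothing was passed in and no default source had credentials.
        return Optional.empty();
    }
}
```

The ordering of the list is the design choice that matters: earlier sources win, so a caller who passes credentials explicitly is never surprised by a value picked up from the environment.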