This package provides an interface to the Amazon Kinesis Client Library (KCL) MultiLangDaemon for the .NET Framework.
Developers can use the KCL to build distributed applications that process streaming data reliably at scale. The KCL takes care of many of the complex tasks associated with distributed computing, such as load balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to changes in stream volume.
This package wraps and manages the interaction with the MultiLangDaemon, which is provided as part of the Amazon KCL for Java, so that developers can focus on implementing their record processing logic.
A record processor in C# typically looks something like the following:
    using Amazon.Kinesis.ClientLibrary;

    namespace Sample
    {
        public class RecordProcessor : IShardRecordProcessor
        {
            public void Initialize(InitializationInput input)
            {
                //
                // Initialize the record processor
                //
            }

            public void ProcessRecords(ProcessRecordsInput input)
            {
                //
                // Process a batch of records from input.Records, and optionally checkpoint by calling
                // input.Checkpointer.Checkpoint()
                //
            }

            public void LeaseLost(LeaseLossInput leaseLossInput)
            {
                //
                // Perform any cleanup required.
                // This record processor has lost its lease, so checkpointing is not possible.
                // This is why LeaseLossInput does not provide a Checkpointer property.
                //
            }

            public void ShardEnded(ShardEndedInput shardEndedInput)
            {
                //
                // The record processor has processed all records in the shard, and will no longer receive records.
                // This method is required to call shardEndedInput.Checkpointer.Checkpoint() to inform the KCL
                // that the record processor has acknowledged the completion of the shard.
                //
            }

            public void ShutdownRequested(ShutdownRequestedInput shutdownRequestedInput)
            {
                //
                // This is called when the KCL is being shut down. If desired, the record processor can checkpoint here
                // by calling shutdownRequestedInput.Checkpointer.Checkpoint(...)
                //
            }
        }

        internal class MainClass
        {
            public static void Main(string[] args)
            {
                KclProcess.Create(new RecordProcessor()).Run();
            }
        }
    }
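ProcessRecords leaves it to you to decide how often to checkpoint. One common policy is to checkpoint on a timer rather than after every batch. The following is a minimal sketch of that policy; CheckpointSchedule is a hypothetical helper written for this example and is not part of the Amazon KCL for .NET.

```csharp
using System;

// Hypothetical helper (not part of the KCL): decides when a checkpoint is due.
public sealed class CheckpointSchedule
{
    private readonly TimeSpan _interval;
    private DateTime _nextCheckpoint;

    public CheckpointSchedule(TimeSpan interval, DateTime startUtc)
    {
        _interval = interval;
        _nextCheckpoint = startUtc + interval;
    }

    // Returns true at most once per interval and schedules the next checkpoint.
    public bool IsDue(DateTime nowUtc)
    {
        if (nowUtc < _nextCheckpoint)
        {
            return false;
        }
        _nextCheckpoint = nowUtc + _interval;
        return true;
    }
}
```

Inside ProcessRecords, you would call input.Checkpointer.Checkpoint() only when schedule.IsDue(DateTime.UtcNow) returns true. Checkpointing less often reduces writes to the checkpoint table, at the cost of reprocessing more records after a failure.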
For more information about Amazon Kinesis and the client libraries, see the official documentation as well as the Amazon Kinesis forums.
Before running a KCL application, make sure that your environment is configured to allow the MultiLangDaemon to access your AWS security credentials.
If you've installed the AWS SDK for .NET, you may have already configured your AWS credentials using the SDK credential store in Microsoft Visual Studio; however, because Amazon KCL for .NET applications never deal with your credentials directly but defer to the MultiLangDaemon, this store is not available to your KCL application.
Instead, you can provide your credentials through environment variables (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) or a credential profile in your home directory.
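As an illustration, a credential profile is a small text file, conventionally located at .aws/credentials under your home directory. The sketch below shows the usual layout of a default profile; the values in angle brackets are placeholders, not real keys.

```ini
[default]
aws_access_key_id = <your-access-key-id>
aws_secret_access_key = <your-secret-access-key>
```

The two entries correspond to the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables, so either mechanism supplies the same pair of values.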
In addition to source code for the Amazon KCL for .NET itself, this repository contains a sample application, which can serve as a starting point for your KCL application. To try out this sample application, you can download a ZIP of the latest sources, the contents of which can be opened as a solution in Microsoft Visual Studio.
The sample application consists of two projects:
A data producer (SampleProducer\SampleProducer.cs)
This program creates an Amazon Kinesis stream and continuously puts random records into it. There is commented-out code that deletes the created stream at the end; however, you should uncomment and use this code only if you do not intend to run SampleConsumer.
A data processor (SampleConsumer\SampleConsumer.cs)
A new instance of this program is invoked by the MultiLangDaemon for each shard in the stream. It consumes the data from the shard. If you no longer need to work with the stream after running SampleConsumer, remember to delete both the Amazon DynamoDB checkpoint table and the Kinesis stream in your AWS account.
The following defaults are used in the sample application:
To run the data producer, run the SampleProducer project.
Because the Amazon KCL for .NET requires the MultiLangDaemon, which is provided by the Amazon KCL for Java, a bootstrap program has been provided. This program downloads all required dependencies prior to invoking the MultiLangDaemon, which executes the processor as a subprocess.
To run the processor, first build the SampleConsumer project, then run the bootstrap project with the following configuration:
--properties kcl.properties --execute
Without the --execute argument, the bootstrap program outputs a command that can be used to start the KCL directly.
The kcl.properties file contains a few important settings.
... Bootstrap is able to find its path.
This sample application creates a few resources in the default region of your AWS account:
Each of these resources will continue to incur AWS service costs until they are deleted. After you are finished testing the sample application, you can delete these resources through the AWS Management Console.
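For reference, a kcl.properties file for the bootstrap program described above might look like the following sketch. The keys shown are ones commonly used with the MultiLangDaemon, but treat both keys and values as assumptions and compare them with the kcl.properties file shipped with the sample. All values in angle brackets are placeholders.

```properties
# Program the MultiLangDaemon starts as a subprocess for each shard (placeholder).
executableName = <path to your built SampleConsumer executable>
# Stream to consume and the KCL application name (placeholders).
streamName = <your stream name>
applicationName = <your KCL application name>
# How the daemon locates AWS credentials (environment variables, profiles, and so on).
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# Where to start reading when no checkpoint exists yet.
initialPositionInStream = TRIM_HORIZON
# AWS region of the stream (placeholder).
regionName = <your region>
```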
The Amazon KCL for .NET uses the Amazon KCL for Java internally. We have implemented a Java-based daemon, called the MultiLangDaemon, which handles all of the heavy lifting. Our approach has the daemon spawn the user-defined record processor program as a sub-process. The MultiLangDaemon communicates with this sub-process over standard input/output using a simple protocol, and therefore the record processor program can be written in any language.
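To make the idea of a line-oriented protocol over standard input/output concrete, here is a minimal sketch of a request/acknowledge loop. It is an illustration only: the JSON message shapes (an "action" field, answered by a "status" message with "responseFor") are assumptions made for this example, ProtocolSketch is a hypothetical name, and the sketch assumes the System.Text.Json package is available. In practice KclProcess handles this exchange for you.

```csharp
using System;
using System.IO;
using System.Text.Json;

// Hypothetical illustration of a stdin/stdout message loop; not the library's implementation.
public static class ProtocolSketch
{
    // Reads one JSON message per line and acknowledges each one on the output.
    public static void Run(TextReader input, TextWriter output)
    {
        string line;
        while ((line = input.ReadLine()) != null)
        {
            if (string.IsNullOrWhiteSpace(line))
            {
                continue; // skip blank lines between messages
            }

            using (JsonDocument message = JsonDocument.Parse(line))
            {
                JsonElement actionElement;
                if (!message.RootElement.TryGetProperty("action", out actionElement))
                {
                    continue; // ignore messages without an action (assumed field name)
                }

                string action = actionElement.GetString();

                // A real processor would dispatch on the action here
                // (for example, to Initialize or ProcessRecords).

                // Acknowledge the message so the sender can continue.
                output.WriteLine(JsonSerializer.Serialize(new { action = "status", responseFor = action }));
                output.Flush();
            }
        }
    }
}
```

A real entry point would call ProtocolSketch.Run(Console.In, Console.Out). Because standard output carries the protocol messages in a design like this, diagnostic logging from a record processor is better written to standard error.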
At runtime, there will always be a one-to-one correspondence between a record processor, a child process, and an Amazon Kinesis shard. The MultiLangDaemon ensures this without any developer intervention.
In this release, we have abstracted these implementation details and exposed an interface that enables you to focus on writing record processing logic in C#. This approach enables the Amazon KCL to be language-agnostic, while providing identical features and a similar parallel processing model across all languages.
RegisterStreamConsumer
SubscribeToShard
DescribeStreamConsumer
DescribeStreamSummary
IShardRecordProcessor: this interface closely matches the Java ShardRecordProcessor interface. The IRecordProcessor interface remains present and will continue to work, but it's recommended to upgrade to the newer interface.
The Shutdown method from IRecordProcessor has been replaced by LeaseLost and ShardEnded.
The LeaseLost method is invoked when a lease is lost. LeaseLost replaces Shutdown where ShutdownInput.Reason was ShutdownReason.ZOMBIE.
The ShardEnded method is invoked when all records from a split or merge have been processed. ShardEnded replaces Shutdown where ShutdownInput.Reason was ShutdownReason.TERMINATE.
ShutdownRequested provides the record processor a last chance to checkpoint during the Amazon Kinesis Client Library shutdown process before the lease is canceled.
... timeoutInSeconds=<seconds to wait> to your properties file.
... -l or --log-configuration to provide a Logback XML configuration file.
This library is licensed under the Apache 2.0 License.