Description
Adds condition support to the processor Applier interface and methods
Adds support to process streaming data using channels
Adds exported processor Streamer interface (with example in examples/streaming/)
Adds stream type to internal/transform
Refactors process/aggregate to use a private method for aggregating data
Fixes some tests and refactors some imports
Motivation and Context
This PR adds a third type of data transformation called "streaming" (or "stream") that uses channels to process data. This solves a couple problems:
Running processors on continuous streams of data, like data read from a network (socket, streaming RPC, etc.)
Processing data as a pipeline and not a batch
The second problem is the most impactful because it improves how users run our default ITL application. With stream support, the application creates a concurrency pipeline for data processing, so data is always sent to the next processor in the series whenever that processor is ready to accept more data. This is different from the batch transform, where data must always be sent to the next processor as a group; batching data can create unintentional bottlenecks in the system depending on the configuration. Eventually, stream processing should be the default setting for all non-transfer data processing use cases.
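For illustration, here is a minimal sketch of the channel-based idea; the stage and function names are made up for this example and are not the API added in this PR:

```go
package main

import (
	"fmt"
	"strings"
)

// upperStage reads records from the input channel and emits processed records
// on its own output channel as soon as each one is ready.
func upperStage(in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range in {
			out <- strings.ToUpper(s)
		}
	}()
	return out
}

func main() {
	in := make(chan string)

	// producer: simulates a continuous source, e.g. records read from a socket
	go func() {
		defer close(in)
		for _, s := range []string{"foo", "bar", "baz"} {
			in <- s
		}
	}()

	// sink: receives each record as soon as the stage finishes it,
	// instead of waiting for the whole batch to be processed
	for s := range upperStage(in) {
		fmt.Println(s)
	}
}
```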
This PR also adds condition support to the processor Applier interface, which means that the Apply method now checks whether the data passes a condition before processing. This simplifies some of the batching code and should reduce user confusion (e.g., "why is the configured condition not working?"), and it also solves a problem we have with meta-processors like process/pipeline not using configured conditions.
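To make the behavior concrete, here is a small, self-contained sketch of a condition-aware Apply method; the type names and signatures below are hypothetical and do not match the repo's interfaces exactly:

```go
package main

import (
	"context"
	"fmt"
	"strings"
)

// Condition and the processor below are illustrative only; the concrete
// interfaces in this repo may be named and typed differently.
type Condition interface {
	Passes(data []byte) bool
}

type prefixCondition struct{ prefix string }

func (c prefixCondition) Passes(data []byte) bool {
	return strings.HasPrefix(string(data), c.prefix)
}

type upperProcessor struct {
	condition Condition
}

// Apply evaluates the configured condition first and returns the data
// unchanged if it does not pass, so wrapping meta-processors inherit
// condition handling without extra code.
func (p upperProcessor) Apply(_ context.Context, data []byte) ([]byte, error) {
	if p.condition != nil && !p.condition.Passes(data) {
		return data, nil
	}
	return []byte(strings.ToUpper(string(data))), nil
}

func main() {
	p := upperProcessor{condition: prefixCondition{prefix: "keep:"}}
	for _, in := range []string{"keep:foo", "drop:bar"} {
		out, _ := p.Apply(context.Background(), []byte(in))
		fmt.Println(string(out))
	}
}
```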
How Has This Been Tested?
Added and updated unit tests, and integration tested on a high-volume production deployment. Here's some real-world evidence from AWS X-Ray showing how the streaming transformation differs from the batch transformation:
Streaming (AWS X-Ray screenshot)
Batch (AWS X-Ray screenshot)
Notice how in the streaming screencap the Kinesis PutRecord calls occur before any DynamoDB GetItem calls -- that's because the concurrency pipeline is continuously sending data to the sink instead of waiting for all data to be processed first. In the batching screencap, all data processing (i.e., calls to DynamoDB GetItem) must complete before sending data to the sink.
Types of changes
[ ] Bug fix (non-breaking change which fixes an issue)
[x] New feature (non-breaking change which adds functionality)
[ ] Breaking change (fix or feature that would cause existing functionality to change)
Checklist:
[x] My code follows the code style of this project.
[x] My change requires a change to the documentation.