Closed glangford closed 10 years ago
It's an interesting idea, but KinesisClient doesn't provide a callback based api, so I'm not sure how you would use get-records with core.async. And the Kinesis Client Library already uses an unbounded thread pool under the hood...
Maybe I am misunderstanding - the intent is not to use get-records. The idea is to use IRecordProcessor
almost the same as today. i.e. adapt processRecords
in processor-factory
as outlined in Step 1.
I guess I'd be interested to see more specifics about the actual use case. core.async is useful to let you program synchronously while still working with a callback based api. A good example would be where you're making an async http call, with something like http-kit, and you want to do something with the response. But the processing didn't originate with that http call, there was some thread of execution that existed before, a request for a stock quote, whatever, that needs to make the http call and do something with the response. So it's nice to trade that callback api for core.async channels. But with Kinesis, you've just got this stream, the whole application begins with the function you provide. There's technically a callback api with the worker, but you don't have to return any value to some other thread of execution looking to do something with the result.
The other benefit of core.async is optimization, as you're tying up fewer threads than if you were using futures for everything. But the KCL is already invoking every call to processRecords in its own thread, so you're not saving anything there. (Technically, you'd actually be tying up more threads as soon as you introduce core.async.)
So basically, the only advantage could be the api, which I guess I'm just not seeing what the advantage to <!'ing off a channel would be as compared to a plain old function, when every value that you take off the channel would have been put there on its own thread from the KCL pool.
(Technically, you could supply your own ExecutorService to KCL, but I think that'll prove to be non-trivial.)
I agree that a guiding use case would be very helpful. I don't have a specific one, but with Directed Acyclic Graphs (referred to in the Kinesis docs for more complex stream processing) it would be much better to create a DAG with channels than to emit from one Kinesis stream to another Kinesis stream.
A major advantage is in the API as you say. Also the benefit of separating stream processing components from each other (and from Kinesis). The result might be similar to Apache Storm but lacking the exactly-once semantics.
Food for thought anyway as core.async evolves.
I am still learning the details of Kinesis and other AWS services, but here is one generic use case to consider: taking Kinesis data, processing it, and storing it reliably in DynamoDB, Redshift, or S3. Apologies for the length.
KinesisConnectorRecordProcessor
is the base class for any KinesisConnector. It implements IRecordProcessor
.
https://github.com/awslabs/amazon-kinesis-connectors/blob/master/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorRecordProcessor.javaprocessRecords
method uses application defined filter and transformer callbacks, and stores the new records in a bufferKey design issue: records cannot be checkpointed until they are stored (in DynamoDB, Redshift, S3, etc) otherwise data can be lost
IRecordProcessor
-based approach works without risk of data loss; but often it is better to aggregate records and store them in chunks according to a defined policy. KinesisConnectorRecordProcessor
scheme was used and the app supplied the appropriate callback methods. Or, channels could be used to decouple the processing pipeline and checkpointing from record ingestion. For this use case, the main benefits of using core.async channels might be:
@mcohen01 I am almost finished a complete implementation, including a test framework that mocks Kinesis so that tests can be done outside of AWS. It won't be merge-worthy yet, but I will send a pull request soon anyway to show the general direction and get any feedback.
This is a relevant discussion thread, just fyi: https://forums.aws.amazon.com/message.jspa?messageID=531052
Background
There would be a number of benefits for some Clojure apps if a Kinesis shard could be presented as a Clojure core.async channel. Delivering shards as channels would create new options for Clojure stream consumers, beyond the limited Kinesis notion of worker and record processor and the bandwidth and other limits applied to shards.
Here is an idea for one method of doing this in Amazonica, in case it is useful.
(Disclaimer - This is a rough sketch based on an inexpert read of the AWS documentation and what I understand so far of core.async.)
Step 1 - Basic Kinesis shard->core.async channel
worker
in kinesis.clj).processRecords
inprocessor-factory
) performs blocking writes to the channel for each Kinesis record (rather than calling aprocessor
function for each record)This of course is insufficient on its own - by simply dumping shard records on a channel, we have lost the ability to know when each record is "done". We don't know when the records will be read from the channel, or when they will be processed. So a new mechanism is required to restore the ability to checkpoint sequence numbers in the shard.
Step 2 - Checkpointing
Only the core.async app knows when a record is really done; one method to communicate "doneness" back to Amazonica is by using another channel. The app can write to this channel to send completed sequence numbers back to Amazonica.
(chan (sliding-buffer 1))
. A sliding buffer channel will drop oldest values; in this case only the latest put survives.Step 3 - Complete version with checkpointing
To combine the previous steps, a variant of
processor-factory
does the following:checkpoint(String sequenceNumber)
variant ofIRecordProcessorCheckpointer
For step 3: note that
checkpoint()
(no arguments) currently used in kinesis.clj checkpoints the progress at the last record that was delivered to the record processor; with channels, we want to checkpoint a specific sequence number. This capability is added in the Kinesis Client Library version 1.1.One way (not sure if this is idiomatic) to get the latest value from the checkpoint channel, without waiting if nothing is available:
(alts!! [checkpoint-channel (timeout 0)] :priority true)
If not nil, the returned value is checkpointed.
See also:
https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/interfaces/IRecordProcessorCheckpointer.java
https://forums.aws.amazon.com/message.jspa?messageID=531052