Proposed design for consuming from Kafka/Kinesis:

- `with_consumers(n)` starts `n` consumers;
- on Kafka, if `with_target_records_lag` is provided, then `records-lag-max` is used as the metric to auto-scale consumers to reach the specified acceptable lag;
- on Kinesis, if `with_target_millis_lag` is provided, then `MillisBehindLatest` is used as the metric to auto-scale consumers to reach the specified acceptable lag;
- auto-scaling never scales higher than the number of partitions/shards (see the sketch after the example below).

A simple auto-scaling Kafka example that starts with 10 consumers, growing and shrinking to reach the specified acceptable lag:
```rust
use amadeus::prelude::*;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let pool = ThreadPool::new()?;
    let messages = Kafka::new(vec!["localhost:9092".parse().unwrap()])
        .with_topic("my-topic".to_owned())
        .with_group("my-group".to_owned())
        .with_consumers(10)
        .with_target_records_lag(10_000)
        .build()
        .await?;
    messages
        .par_stream()
        .for_each(&pool, |messages: Result<KafkaMessageSet, _>| async {
            let messages = messages.expect("kafka error");
            for msg in &messages {
                println!("received message {}: {} {}", msg.offset, msg.key, msg.value);
            }
            messages.consume();
            messages.commit();
        })
        .await;
    panic!("All Kafka consumers have died");
}
```
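To make the scaling rule above concrete, here's a minimal sketch of the decision it implies: grow while the observed lag metric (`records-lag-max` on Kafka, `MillisBehindLatest` on Kinesis) exceeds the target, shrink when comfortably under it, and never exceed the partition/shard count. The function name and the shrink threshold are hypothetical, not part of the proposed API:

```rust
/// Hypothetical helper: pick the next consumer count from the observed lag.
/// `observed_lag` is `records-lag-max` (Kafka) or `MillisBehindLatest` (Kinesis).
fn desired_consumers(current: usize, observed_lag: u64, target_lag: u64, partitions: usize) -> usize {
    if observed_lag > target_lag {
        // Behind target: add a consumer, but never exceed the number of
        // partitions/shards, since extra consumers would sit idle.
        (current + 1).min(partitions)
    } else if observed_lag < target_lag / 2 {
        // Comfortably under target: shrink, keeping at least one consumer.
        current.saturating_sub(1).max(1)
    } else {
        current
    }
}
```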
Need clarification:

- Semantics of message consuming and committing: how much control should the API expose, or is it acceptable for Amadeus to handle one or both of those? How and why do applications rely on explicit consuming/committing? (See the rdkafka sketch after this list.)
- How does that interact with streaming transformations like tumbling windows/batching, where the user wants `n` messages, not `n` message sets?
- The Kinesis Client Library (KCL) creates a DynamoDB table to "track and maintain state information such as resharding events and sequence number checkpoints". It doesn't have a Rust version :'(
- Is there a de facto standard serialization format for messages?
- For Kinesis, I'm not sure how valuable a "raw" Kinesis `Source` would be, rather than Kinesis + DynamoDB like the KCL does. Unfortunately writing a KCL equivalent for Rust is a bit much for me atm.
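For reference on the committing question, this is roughly how explicit commits look with rust-rdkafka today (a sketch; the broker address, topic, and group mirror the example above). The application disables auto-commit and commits each offset only after it has finished processing the message, which is what gives at-least-once delivery; an API that commits automatically would take that control away:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};

#[tokio::main]
async fn main() {
    // Auto-commit is disabled so offsets advance only after processing succeeds.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("group.id", "my-group")
        .set("bootstrap.servers", "localhost:9092")
        .set("enable.auto.commit", "false")
        .create()
        .expect("consumer creation failed");
    consumer.subscribe(&["my-topic"]).expect("subscribe failed");

    loop {
        match consumer.recv().await {
            Ok(msg) => {
                // ... durably process `msg` here ...
                // Committing only after processing gives at-least-once semantics:
                // a crash before this line means the message is redelivered.
                consumer
                    .commit_message(&msg, CommitMode::Async)
                    .expect("commit failed");
            }
            Err(e) => eprintln!("kafka error: {}", e),
        }
    }
}
```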
Assumptions:

- If a Kafka topic/Kinesis data stream has 100 partitions/shards, the maximum parallelism is unbounded for producers but capped at 100 for consumers.
- The number of partitions/shards can change dynamically (I'm not sure how common this is in practice?).
- It's a valid use case to have, say, 1000 partitions/shards but only 50 consumers; this provides headroom to scale up the number of consumers dynamically based on data volume (a quick illustration follows).
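To illustrate the headroom arithmetic (the numbers come from the assumption above, not from any API):

```rust
fn main() {
    // 1000 partitions/shards shared by 50 consumers: each consumer reads
    // 1000 / 50 = 20 partitions on average.
    let (partitions, consumers) = (1000_usize, 50_usize);
    assert_eq!(partitions / consumers, 20);
    // Auto-scaling can grow to at most one consumer per partition/shard,
    // so there's room for 950 more consumers before hitting the cap.
    assert_eq!(partitions - consumers, 950);
}
```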