Are there any plans to implement this? It would be nice to have access to the partition keys of the user records.
In the meantime, it would be helpful to document the current behavior; https://github.com/a8m/kinesis-producer/blob/master/aggregation-format.md is misleading as it stands.
It took some digging into the code and finding https://github.com/a8m/kinesis-producer/blob/master/aggregator.go#L34-L37 to realize why my partition keys weren't what I was expecting on the consumer side.
Hey @jawang35, thanks for bringing up this issue.
I actually started implementing this a few years ago, but it made the package much more complicated and no longer stateless, unlike today. So I abandoned it. There's a chance I'll come back to it in the future and add this, but I don't want to promise anything.
The general idea I used was to hold an aggregator per shard, and a ShardMap container for managing this logic. The producer interacted with the ShardMap instead of with a single aggregator (as it does today):
```go
type ShardMap struct {
	shards      []*kinesis.Shard
	aggregators []*Aggregator
	// internal state.
}

// The shard list is fetched periodically from the Kinesis stream and is
// assumed to be kept sorted by StartingHashKey.

// shardByPK returns the kinesis shard that owns the given partition key,
// using a binary search over the shards' hash-key ranges.
func (m *ShardMap) shardByPK(pk string) (*kinesis.Shard, bool) {
	hk := hashKey(pk)
	i, j := 0, len(m.shards)-1
	for i <= j {
		mid := (i + j) / 2
		shard := m.shards[mid]
		start, _ := new(big.Int).SetString(*shard.HashKeyRange.StartingHashKey, 10)
		end, _ := new(big.Int).SetString(*shard.HashKeyRange.EndingHashKey, 10)
		switch {
		case hk.Cmp(start) < 0:
			j = mid - 1
		case hk.Cmp(end) > 0:
			i = mid + 1
		default: // start <= hk <= end: the key falls in this shard's range.
			return shard, true
		}
	}
	return nil, false
}

// hashKey calculates a new explicit hash key based on the given partition
// key (following the algorithm from the original KPL): the MD5 digest of
// the partition key, interpreted as a big-endian 128-bit integer.
func hashKey(pk string) *big.Int {
	h := md5.New()
	h.Write([]byte(pk))
	sum := h.Sum(nil)
	hk := big.NewInt(int64(0))
	for i := 0; i < md5.Size; i++ {
		p := big.NewInt(int64(sum[i]))
		p = p.Lsh(p, uint((md5.Size-i-1)*8))
		hk = hk.Add(hk, p)
	}
	return hk
}
```
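(Side note: the loop in hashKey is just a big-endian interpretation of the MD5 digest, so the following shorter version should be equivalent:)

```go
// Equivalent, shorter form: interpret the 16-byte MD5 digest as a
// big-endian 128-bit integer, same result as the shift-and-add loop above.
func hashKey(pk string) *big.Int {
	sum := md5.Sum([]byte(pk))
	return new(big.Int).SetBytes(sum[:])
}
```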
I can also share that this change wasn't really needed for my use cases. I worked on a few projects that used Kinesis. Some of them had tens of shards, some of them had hundreds. In both cases the scale was high, and the KCLs were configured to "checkpoint" every 5K records (i.e. every few seconds).
The components that produced the data to Kinesis used some of the logic above. The producer code looked roughly like this:
```go
pk := goodPK()
producer.Put(buf, pk)

// ...

// goodPK picks a random partition key that maps to a shard that is not
// currently "hot". shardByPK wraps the ShardMap method from the example
// above; IsHot is an application-level helper, not part of the Kinesis API.
func goodPK() (pk string) {
	// The real code was much more optimized (it used a cache, etc.).
	for i := 0; i < tries; i++ {
		pk = uuid.V4()
		shard, ok := shardByPK(pk)
		if ok && !shard.IsHot() {
			return pk
		}
	}
	return
}
```
I hope this helps, and I'll try to document this better in the README. Thanks
Is keeping the architecture stateless a priority for this library? The single aggregator is still stateful (though significantly simpler), isn't it?
I'm a little new to Kinesis and streaming architectures. I read that the size of the partition key is included in the 1MiB of data per shard, and my data is organized as key/value pairs, so I was hoping to use the key as the partition key to save on ingress. But that means I'd need access to the partition keys of the user records. Is this an antipattern? I could easily use a UUID for the partition key and just include the key in the user record data.
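For what it's worth, that UUID workaround would look roughly like this (a rough sketch; the Envelope type, its field names, and putKV are made up for illustration, and it assumes github.com/google/uuid for random keys):

```go
// Envelope is a hypothetical wrapper that carries the application key
// inside the record payload instead of using it as the partition key.
type Envelope struct {
	Key   string `json:"key"`
	Value []byte `json:"value"`
}

// putKV marshals the key/value pair and sends it under a random partition
// key; the real key travels inside the payload.
func putKV(p *producer.Producer, key string, value []byte) error {
	buf, err := json.Marshal(Envelope{Key: key, Value: value})
	if err != nil {
		return err
	}
	return p.Put(buf, uuid.New().String())
}
```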
Alternatively, why not include the user partition keys even if we aren't mapping them to the correct shards? The shard-mapping behavior stays the same either way, but at least we'd have access to the user partition keys. If you're open to this as a temporary solution, I can open a PR for it.
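To make that concrete, here's an illustrative sketch using plain structs that only mirror the shape of the KPL aggregation format from aggregation-format.md (these are not the package's generated protobuf types, and the names are made up):

```go
// AggRecord and Aggregated are illustrative stand-ins for the
// aggregation-format message types (Record / AggregatedRecord).
type AggRecord struct {
	PartitionKeyIndex uint64
	Data              []byte
}

type Aggregated struct {
	PartitionKeyTable []string
	Records           []AggRecord
}

// put keeps each user record's own partition key in the key table, so
// consumers can recover it after deaggregation, even though the whole
// aggregated record is still sent under a single Kinesis partition key.
func (a *Aggregated) put(data []byte, pk string) {
	a.PartitionKeyTable = append(a.PartitionKeyTable, pk)
	a.Records = append(a.Records, AggRecord{
		PartitionKeyIndex: uint64(len(a.PartitionKeyTable) - 1),
		Data:              data,
	})
}
```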
Is there any plan to review and merge the above PR? I need this feature. I want to create an aggregated record that holds only the user records that are supposed to go to a specific shard.