Open Frank-Krick opened 5 years ago
Hmm.. Interesting problem.
I’ll start with some clarifications regarding the changes made in 3.0 which have brought us here. For Orleans persistent streams to integrate with other systems, including legacy systems and data feeds not generated by streams themselves, there was a strong need to be able to specialize how data was converted to and from the underlying queuing system. This necessitated a customizable translation between the data in the queue and the data in the cache. Layers already existed for these translations, but were quite cumbersome to use.
To expose this layer as a pluggable component, we needed both the cache and the queue sides of the translation to be formalized. The queue side is defined by the underlying persistent queue so we need conform to that, but the cache side could be standardized.
Previously the cache was quite generic, too generic to easily support a uniform surface. This was by design, as we wanted to support the caching of a wide variety of data, as not to unnecessarily limit the utility of these systems. To support the pluggable data adapter, we elected to simplify the cache by formalizing it around what we felt was a reasonable set of information which we believed most queuing systems could be conformed to. This was an understood tradeoff, simplicity at the cost of flexibility, made on the suspicion that most queues could be conformed to the pattern, even if it wasn’t an exact match. This pattern was vetted against all of the current persistent stream providers we support (not all from azure) as well as a couple other queues out there we were aware of (like kafka), and the pattern seemed to hold. We unfortunately didn’t vet that it would work for Kinesis. This was an oversight; Kinesis is a major queuing service we’d prefer work well with our features.
With the above in mind, the first thing that comes to mind is whether Kinesis data can be conformed to the new patterns. I want to be clear, the purpose of a beta is to vet changes, so I’m not ruling out further changes to streams to support Kinesis prior to 3.0 final, but I wish to vet that code changes are necessary.
To explore the specific issue raised regarding stream sequence token, and the sequence number and EventIndex: These values are used to locate data in the cache. What’s critical is that the translation from the queue data into the cache and to the sequence token generate the same values and that the ordering is consistent between the queue ordering information and these values. Do these 128 digit strings have any formal constraints other than ordering? For instance, are they numeric or date time values encoded in string form?
Looks like sequence numbers are numerics in string form..
From: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SequenceNumberRange.html
Pattern: 0|([1-9]\d{0,128})
If this is the case, we should be able to safely convert them from/to the sequence token's sequence number.
@Frank-Krick, would you please vet this, as time allows?
Hi @jason-bragg,
thanks for the detailed response.
Kinesis does indeed use strings as sequence numbers. But wouldn't a number with 128 digits still be much bigger than 64 bits? The string converted to a number takes 186 bits and a long is 64 bits and an int 32 bits if I'm not mistaken.
We can probably drop 4 Bits, which encode the version in the sequence number but that still would leave us with 182 bits. Maybe we can also drop the shard id which would save 32 bits more. But we'd need a way to reconstruct the shard id in some other way. And even then we'd end up with 150 bits.
Example for a sequence number: 49594381653722342349602542076569927730095950034565595138
The undocumented sequence number setup is here btw.: https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/checkpoint/SequenceNumberValidator.java
I'll have a look at that tomorrow, but my current feeling is that the storage mechanism has to allow for longer sequence numbers.
Thanks,
Frank
Btw. we'd also be interested in getting the kinesis provider upstream at some point in the future once we know how we can integrate it better in the orleans streaming framework.
The sequence number in the token (and cache) need only express ordering, so any portions of the kinesis sequence number not related to that (like version and shard) need not be accounted for. The full kinesis sequence number string, including version and shard, would probably be needed by checkpointing logic, but that should be orthogonal to the token changes.
Having said that, I tend to agree with you that the kinesis sequence number is not well defined enough to safely and reliably pull out ordering information that could be conformed to 'long'.
The reason 'long' was selected is primarily because we wanted to use a value type in the cache for ordering, both to reduce GC issues, and for high performance comparisons.
We could keep ordering information in a byte array obtained from the block pool, which avoids the GC hit, but we'd still take a perf hit from comparing byte arrays rather than 'long'.
Alternatively we could role back the cache to be generic and align specialized sequence tokens to specialized caches, allowing users with their own needs to specialize the cache and sequence tokens as needed, as I presume is the case for the 2.x Kinesis stream provider.
I'll discuss these options with the team. Open to suggestions :)
Update:
Discussed with team. Current plan is to investigate option:
We could keep ordering information in a byte array obtained from the block pool, which avoids the GC hit, but we'd still take a perf hit from comparing byte arrays rather than 'long'.
We think we can keep the perf hit down, and this will allow an open ended set of ordering information. We've not yet committed to having this done by 3.0 release, as this will depend on how the prototyping goes and release pressures.
@Frank-Krick, Do you see any issues with this approach?
That approach sounds good. It should allow us to encode all the possible kinesis sequence numbers. Thanks.
Hi @jason-bragg,
I'm finishing up a kinesis stream consumer implementation for orleans. This includes writing xml documentation for the classes, making the implementation better configurable and writing test cases.
Currently I'm just storing the sequence number as string in my implementation but would like to try and implement a pooled byte array to store the sequence numbers. What pool implementation are you thinking about using? ArrayPool<T>
or some custom implementation?
Also how would you want me to include the implementation into the orleans source tree? Currently I plan to create a new project Orleans.Streaming.Kinesis
in the folder /src/AWS
for the implementation and I'd put the test cases in /test/Extensions/AWSUtils.Tests/Streaming
.
I assume I'd have to add the projects to OrleansCrossPlatform.sln
and Orleans.sln
but I'm not sure how you handle the Nuget packages.
Thanks for your help and let me know how to best approach this.
That's great! Excited to see what you come up with.
Regarding projects, I agree with /src/AWS/Orleans.Streaming.Kinesis for the main package but suggest /test/Extensions/AWS/Orleans.Streaming.Kinesis.Tests for the test project. You can shove them into AWSUtils.Test if it's too much work to have them in their own project, but we're trying to move to a more modular system, wherein each package has it's own test package.
Regarding the pooling, I've been (poorly) prototyping an approach wherein I store the streamId, sequence info, and payload all in a segment from the block pool. Struct looks something like:
public struct CachedMessage
{
public (int Offset, int Count) SequenceTokenWindow;
public (int Offset, int Count) StreamIdWindow;
public (int Offset, int Count) PayloadWindow;
public DateTime EnqueueTimeUtc;
public DateTime DequeueTimeUtc;
public ArraySegment<byte> Segment;
}
In this approach, I'm just reusing the existing pool, and indexing into it for comparisons. A sad and incomplete prototype of this can be found in https://github.com/jason-bragg/orleans/tree/streams/OpaqueSequenceNumber. Be warned, this code doesn't even compile and is going in a direction I'm not yet convinced of.
Regarding how to wire up the new projects for nuget builds, I'm not familiar with how we do that. Maybe @sergeybykov can advise?
@Frank-Krick, A bit further along, core mostly builds now, just a few tests to update, and some known issues with the SegmentBuilder usage, plus cleanup, and per testing. Hope to get it far enough along to push WorkInProgress PR tomorrow, to get more eyes on it. Some parts still a bit hacky, but much of that is related to backwards compatibility limitations.. shouldn't effect a new provider like what you're working on.
Thanks, I'll have a look.
The kinesis receiver is running in our staging environment since this morning and I'll be working on unit tests, cleanup and documentation later today.
We've moved this issue to the Backlog. This means that it is not going to be worked on for the coming release. We review items in the backlog at the end of each milestone/release and depending on the team's priority we may reconsider this issue for the following milestone.
@Frank-Krick Do you have repository committed anywhere which implements PersistentStreamProvider for Kinesis Steams.
🦵🏈
We have an implementation of a persistent stream adapter to consume Amazon Kinesis streams which we are using in production for several month now.
Amazon Kinesis streams have a sequence number consisting of 128 digits (186 Bits) which is much bigger than the attributes SequenceNumber and EventIndex defined in StreamSequenceToken. The changes in #5580 therefore break our stream provider implementation.
StreamSequenceToken seems to be very much specific to Azure streams and it is not apparent how an implementation of StreamSequenceToken should be mapped to other persistent stream providers and it seems to be not a very good choice to tie the StreamSequenceToken to closely with stream implementations from a given vendor.