AlanConfluent opened this issue 4 years ago
There are two main solutions that we have thought of to lower the overhead of computing lags. At the moment, the main issue is that we call Kafka to get offsets every time we want to send lags, which, if done often, could overwhelm Kafka. If we can make this much cheaper, we can do it more often and therefore make the lags more real-time. Here are the current ideas:
1. Get offsets from Kafka consumers: Within the Streams implementation, there's a Kafka consumer reading from the changelog topic associated with each state store. This consumer is constantly updated as it reads records, and it could expose these offsets with no additional calls to Kafka. This would be a drop-in replacement for the current implementation: each host would still report current and end offsets, and we would take the max of all reported end offsets to determine the value to use in lag computation (as we currently do).
2. Active reports end offsets: Since the active is writing to the changelog, it determines what the end offset is. It could use the lag-reporting REST calls we currently make within the cluster to report the end offsets. Meanwhile, other hosts could report their current offsets (which, like option 1, need to be exposed in Streams), and combined with the active-reported end offset, lags could be computed for all hosts.
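For reference, the lag computation both options feed into looks roughly like the sketch below; the class and field names are purely illustrative, not the actual ksqlDB code.

```java
import java.util.List;

// Illustrative sketch only (not the actual ksqlDB classes): each host reports a current offset
// and an end offset per store partition; the router takes the max of all reported end offsets
// and computes each host's lag against that value, as described above.
public final class LagMath {
    public record OffsetReport(String hostId, long currentOffset, long endOffset) {}

    public static long lagFor(OffsetReport report, List<OffsetReport> allReports) {
        long maxEndOffset = allReports.stream()
                .mapToLong(OffsetReport::endOffset)
                .max()
                .orElse(report.endOffset());
        return Math.max(0L, maxEndOffset - report.currentOffset());
    }
}
```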
When considering the two choices, we need to consider the scenarios of a new ksqlDB node being added to the cluster, a node being removed from the cluster, a node getting bounced, and a partition from Kafka, as well as overall accuracy.
Addition
Removal
Bounce
Kafka Partition
Accuracy
My opinion is to go with option 1, given that option 2 is a little more complex in that it requires the active host to behave differently from the standbys. Meanwhile, the only thing you get is a slight bit of additional accuracy, since it special-cases getting the end offset from the active.
Option 2 also requires that the active be working to get new reports of the end offset (nothing would be writing to the changelog topic anyway if it were down, so the point is a bit moot). But there could be scenarios where nodes had no cached values and couldn't answer the question of lag until the active came back up.
In the comment above, do 1 and 2 under each section (Addition, Removal, ...) describe how option 1 and option 2 would behave?
> nothing would be writing to the changelog topic anyway if it were down, so the point is a bit moot.
The real issue would be when the active is still alive but just can't talk to the router. Here it's writing to the changelog and the real end offsets are moving, but the router only has a "stale" end offset from the last time the active reported it. If we compute lag based on that (time or message lag, it doesn't matter), then we could incorrectly allow pull queries to execute when they should have failed (i.e., standbys are reporting their consume positions, and the lag is acceptable w.r.t. the stale end offset we have, but not the real end offset). We could add a mechanism to fail pull queries if the end offset hasn't been refreshed for X seconds, but that still won't theoretically eliminate the scenario above. In short, to keep supporting the current semantics (i.e., fail my query if estimated lag > threshold), option 1 is the better approach IMO.
If what we build here is just an optimization to correctly pick the most caught-up replica each time (i.e., just route in order of greatest end offset) and leave the semantics of correctness to an upper-layer effort (e.g., end-to-end state freshness, which is probably what the user wants), we can simplify our implementation to just share the position.
Concretely, next steps could be
> In the comment above, do 1 and 2 under each section (Addition, Removal, ...) describe how option 1 and option 2 would behave?
That's what I was trying to do: find where each might break down under these common operations.
> The real issue would be when the active is still alive but just can't talk to the router. Here it's writing to the changelog and the real end offsets are moving, but the router only has a "stale" end offset from the last time the active reported it. If we compute lag based on that (time or message lag, it doesn't matter), then we could incorrectly allow pull queries to execute when they should have failed (i.e., standbys are reporting their consume positions, and the lag is acceptable w.r.t. the stale end offset we have, but not the real end offset). We could add a mechanism to fail pull queries if the end offset hasn't been refreshed for X seconds, but that still won't theoretically eliminate the scenario above.
Yeah, above I didn't consider a partition between hosts in the cluster, but that's possible. As you're saying, I think that's where expiring lag info comes in. That, coupled with failing to route when no data is available, will create the failing pull query we want in this partition scenario.
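A rough sketch of that expiring-lag-info idea, assuming a hypothetical cache of per-host lag reports (none of these names are real ksqlDB classes):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: cache each host's lag report with the time it was received, and treat
// entries older than a configured window as missing, so pull queries fail (or the host is
// skipped) rather than being served off stale lag information during a partition.
public final class ExpiringLagCache {
    private record TimestampedLag(long lagOffsets, Instant receivedAt) {}

    private final Map<String, TimestampedLag> lagByHost = new ConcurrentHashMap<>();
    private final Duration maxAge;

    public ExpiringLagCache(Duration maxAge) {
        this.maxAge = maxAge;
    }

    public void report(String hostId, long lagOffsets) {
        lagByHost.put(hostId, new TimestampedLag(lagOffsets, Instant.now()));
    }

    public Optional<Long> freshLag(String hostId) {
        TimestampedLag entry = lagByHost.get(hostId);
        if (entry == null || entry.receivedAt().isBefore(Instant.now().minus(maxAge))) {
            return Optional.empty();   // no fresh data: caller should fail or skip this host
        }
        return Optional.of(entry.lagOffsets());
    }
}
```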
> In short, to keep supporting the current semantics (i.e., fail my query if estimated lag > threshold), option 1 is the better approach IMO.
I agree. I might not have been clear in my previous comment, but I don't think option 2 gets much benefit for the additional complexity, plus it has a single point of failure for reporting end offsets.
> If what we build here is just an optimization to correctly pick the most caught-up replica each time (i.e., just route in order of greatest end offset) and leave the semantics of correctness to an upper-layer effort (e.g., end-to-end state freshness, which is probably what the user wants), we can simplify our implementation to just share the position.
I agree, though currently we don't order based on offsets; we should probably make that change. Also, I assume you mean greatest currentOffset, right?
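A minimal sketch of that ordering change, with an illustrative HostLagInfo stand-in rather than the actual ksqlDB types:

```java
import java.util.Comparator;
import java.util.List;

// Illustrative only: order candidate hosts so the one with the greatest reported
// currentOffset (i.e., the most caught-up replica) is tried first when routing.
public final class RoutingOrder {
    public interface HostLagInfo {          // stand-in for whatever ksqlDB tracks per host
        long currentOffset();
    }

    public static <T extends HostLagInfo> void orderByMostCaughtUp(List<T> candidates) {
        candidates.sort(Comparator.comparingLong(HostLagInfo::currentOffset).reversed());
    }
}
```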
> Concretely, next steps could be
> - We could explore option 1 feasibility (if it's not feasible, then it makes our options clear anyway)
Will explore this in Streams. @guozhangwang Do you have pointers on where I can find the consumer offsets in the Streams code?
> - As others scope out the higher-level consistency model / refine the semantics, we then take a look at what the requirements are for this lower-level infrastructure we have (i.e., pick the replicas that are live and most caught up)
I agree.
I can help out here with option 1. I did some digging before. Option 1 would be a change in KafkaStreams#allLocalStorePartitionLags. Today we do an adminClient.listOffsets(...) call to fetch the end offsets, while the positions/consume offsets are fetched from the stream threads' memory. Each stream thread contains a KafkaConsumer reading from the changelog topic, and the key idea here is to make a public API change in KafkaConsumer#endOffsets() and use that for end offsets, rather than relying on what we do today. If you trace the consumer code for endOffsets(), you will find that it delegates to fetcher.endOffsets(partitions, time.timer(timeout)), and this fetcher already has the end offsets for the topic partitions it's fetching in memory, in Fetcher#FetchManagerMetrics#recordPartitionLag (this is how it exposes a JMX metric for lag; you can hook up jconsole to a ksql server and see these mbeans). As the fetcher reads records for the partitions it's subscribed to, Fetcher#fetchRecords() maintains the lag using the code snippet below:
```java
// From Fetcher#fetchRecords(): the subscription state already tracks per-partition lag,
// which the fetcher records as a sensor backing the JMX metric mentioned above.
Long partitionLag = subscriptions.partitionLag(completedFetch.partition, isolationLevel);
if (partitionLag != null)
    this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
```
For purposes of exploring option 1, we can first try to understand this code path, make some PoC code changes to expose this end offset via a public API, and engage with AK devs to see if this can be supported.
P.S.: There could be races and caveats to this working reliably that I also have not fully explored. This is just a promising direction at this point.
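As a quick way to see that the fetcher already has this information locally, the per-partition lag it records is visible through the consumer's client metrics (the same values behind the JMX mbeans mentioned above). The snippet below only peeks at those metrics; it is not the proposed public API change, and the metric group/name may differ across client versions.

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Illustration only: read the consumer's locally tracked per-partition lag from its metrics,
// with no extra broker round trip. Metric group/name are as commonly exposed; verify per version.
public final class ConsumerLagPeek {
    public static void printRecordsLag(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> e : consumer.metrics().entrySet()) {
            MetricName name = e.getKey();
            if ("records-lag".equals(name.name())
                    && "consumer-fetch-manager-metrics".equals(name.group())) {
                System.out.printf("topic=%s partition=%s lag=%s%n",
                        name.tags().get("topic"), name.tags().get("partition"),
                        e.getValue().metricValue());
            }
        }
    }
}
```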
I'm wondering, in the long run, should we be collecting lags only between active and standby tasks? Also, should we consider giving the user a more flexible knob to control their trade-off between data staleness vs. availability?
1) Today the user specifies a single value as a "tolerance" in terms of number of messages, measured by comparing the active and standby tasks. Let's say a user specifies she's willing to tolerate N messages; then when a query response is returned, she only knows that this answer is no more than N messages stale compared with the active task. However, she does not know whether it is N-1 messages behind or just 1 message behind.
2) If the user is developing an app that inserts into the source topic and then queries the derived views, the "lag" would not only include the lag between the active and its corresponding standby, but also the lag incurred while the record is fetched from the source topic, processed, sent to repartition topics, and eventually applied to the active task. Today this lag is not exposed for users to trade off against.
3) In addition, quantifying lag by num.messages is less intuitive to me compared with quantifying by time difference: tolerating N messages could mean lagging by a few seconds or even sub-second under high traffic, while under low traffic it could mean lagging by minutes or hours.
So I think in the long run, instead of letting users specify a global config for their staleness tolerance quantified by num.messages, we should allow a user to specify, per query, how much lag in terms of time difference from the originate time (i.e., when the original record that derived the updates in the queried state was produced) she's willing to tolerate. Upon processing, we could hold the query for a while if necessary to satisfy her specific "freshness" requirement; whether we ask the active or a standby is an implementation detail abstracted away from the user. And in the short term, the first thing we could do is to associate each query result with the timestamp of this result's snapshot, so that a customer can basically infer herself whether this answer is good enough to be used or not.
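A tiny sketch of that short-term idea from the client's point of view, assuming the response carried a hypothetical snapshot timestamp field:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch only: if each pull-query response carried the timestamp of the state snapshot it was
// served from (field name hypothetical), the client could judge freshness in time units rather
// than message counts.
public final class FreshnessCheck {
    public static boolean isFreshEnough(Instant snapshotTimestamp, Duration maxStaleness) {
        // Staleness measured against client wall-clock time; a real implementation would
        // need to account for clock skew between client and server.
        Duration staleness = Duration.between(snapshotTimestamp, Instant.now());
        return staleness.compareTo(maxStaleness) <= 0;
    }
}
```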
I would agree with you on the need for improving the additional information returned on query results (1), lag w.r.t. the source (2), and (3).
But I think having more real-time lag information is an orthogonal problem to solve. It's a routing optimization that
A) lets us pick the replica that is furthest ahead (instead of broadcasting the query to all replicas, multiplying the amount of I/O on the cluster), and
B) lets us avoid wasteful network I/O by not even contacting replicas that are lagging by a lot.
I feel A & B are important at scale. A could be built using just the offset positions of active/standbys; B needs the end offsets to derive some lag metric.
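A sketch of B with illustrative types (not ksqlDB code): given a lag estimate per replica derived from the end offsets, drop the ones beyond a configured threshold before routing.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: B filters out replicas whose estimated lag (which requires end offsets
// to compute) exceeds a threshold, so we never spend network I/O contacting them at all.
// A alone only needs positions, for ordering.
public final class LagFilter {
    public record ReplicaLag(String hostId, long estimatedLagOffsets) {}

    public static List<ReplicaLag> withinThreshold(List<ReplicaLag> replicas, long maxAllowedLag) {
        return replicas.stream()
                .filter(r -> r.estimatedLagOffsets() <= maxAllowedLag)
                .collect(Collectors.toList());
    }
}
```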
> the first thing we could do is to associate each query result with the timestamp of this result's snapshot, so that a customer can basically infer herself whether this answer is good enough to be used or not.
This can be done. @AlanConfluent, @vpapavas we had an issue for this? (return additional information ...). We probably need an API from Streams that just hands out the current positions, e.g. allLocalStoreCurrentPositions(), that provides local stream time and offset position (ala allLocalStorePartitionLags()).
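A sketch of the shape such a Streams API might take; allLocalStoreCurrentPositions() is only the name suggested here, and the return type is a guess, not an existing API.

```java
// Hypothetical sketch of the proposed KafkaStreams#allLocalStoreCurrentPositions() return shape:
// per store name and partition, the locally consumed changelog position plus local stream time,
// mirroring allLocalStorePartitionLags() but requiring no broker call. None of this exists yet.
public final class CurrentPositionsSketch {
    public record StorePosition(long consumedOffsetPosition, long localStreamTimeMs) {}

    // What a caller (e.g. ksqlDB's lag reporting) might receive:
    // Map<String, Map<Integer, StorePosition>> positions =
    //     kafkaStreams.allLocalStoreCurrentPositions();   // hypothetical API
}
```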
That said, I think always forcing a user to do this post-filtering may not be ideal. We would then need to explain staleness etc. to all users, which raises the bar to understand the system, and so on. My guess is we will have to support this in some form within ksqlDB (server/client).
I agree with Guozhang that reasoning in offsets is hard, and reasoning about just one leg of the topology is not very meaningful to the end user. They want to compare the source timestamp (i.e., when they wrote to a source DB or Kafka) to the timestamp of the data they read, to know whether it contains the written data.
> But I think having more real-time lag information is an orthogonal problem to solve. It's a routing optimization that
The thing is, as Vinoth mentioned, this is more about doing coarse system optimizations (routing) than it is about giving the end user something to reason about.
A) can be accomplished with timestamps, but B) with timestamps makes less sense. If you said "lagging X seconds behind", what are you comparing as the end timestamp: now, or the timestamp of the last record? If now, that doesn't make sense when no records have been sent recently; if the last record, then the write times may determine the duration lag more than the system's freshness does. In this case, offsets make more sense for accomplishing B).
I can see the argument that in a future where we use timestamps everywhere, the filtering we do with B) just won't be required for queries with an "as of X time" tacked on. The issue is with queries that don't have it, where we have to choose a reasonable default. Then we're back to solving B), and timestamps don't really make sense.
The issue with doing B) with offsets is that it requires some internal systems knowledge to set a reasonable lag cutoff, and yet we're asking the end user to understand that. Maybe we just need staleness levels LOW, MED, HIGH that map to fixed lags so that it's easy to understand?
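For example, something like the following mapping, where the level names and cutoff numbers are made up purely for illustration:

```java
// Illustrative only: pre-baked staleness levels that map to fixed offset-lag cutoffs, so the
// end user picks LOW/MED/HIGH instead of reasoning about internal offsets directly.
public enum StalenessLevel {
    LOW(100),                 // only route to replicas at most 100 offsets behind
    MED(10_000),
    HIGH(Long.MAX_VALUE);     // effectively no lag-based filtering

    private final long maxAllowedOffsetLag;

    StalenessLevel(long maxAllowedOffsetLag) {
        this.maxAllowedOffsetLag = maxAllowedOffsetLag;
    }

    public long maxAllowedOffsetLag() {
        return maxAllowedOffsetLag;
    }
}
```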
> This can be done. @AlanConfluent, @vpapavas we had an issue for this? (return additional information ...). We probably need an API from Streams that just hands out the current positions, e.g. allLocalStoreCurrentPositions(), that provides local stream time and offset position (ala allLocalStorePartitionLags()).
I think this is as close as we get: #4472
That still doesn't allow the system to make decisions itself around routing, if it defers all decision making to the client, who is the only one who can interpret staleness.
That sounds great.
I've also revamped the original one-pager, which I hope has addressed your comments in the old doc; more comments are appreciated before we do another round-table deep-dive: https://confluentinc.atlassian.net/wiki/spaces/KSTREAMS/pages/1185023285/Streaming+Staleness+Completeness+and+Consistency
Is your feature request related to a problem? Please describe.
Currently, lag fetching requires a call to Kafka, which is fairly heavyweight and takes a while, so we only do it every so often. It would be better if these lags could somehow be known in a lower-overhead manner. Not sure if this would be different than asking for it and caching it at the application level, which is what we do now.
Describe the solution you'd like
Not sure what the solution is yet.
Describe alternatives you've considered
Asking for it and caching at the application level, the current solution.