I love this trade-off: It's enough for our use cases.
I think there is also an opportunity to take it even further and periodically create new blocks for data that falls entirely before the WAL's time range. 🤔 In that sense it does not make sense to tie this to the WAL cycle at all.
This is definitely meaningful! I am investigating using Prometheus as an IoT database, but since Prometheus does not support pushing old data, I am using VictoriaMetrics as the Prometheus remote write target. When Prometheus supports this feature, I will use Prometheus and Thanos.
I tried doing a dirty proof-of-concept implementation for this, and this is what I got: https://github.com/prometheus/prometheus/compare/main...colega:poc-out-of-order-series
The `OOOHead` implements the `BlockReader` interface, so it's compactable like any other block.

## Memseries & OOO chunks

Then, inside the memSeries there are two heads: the usual one and one collecting out-of-order samples.
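To make it concrete, this is roughly the shape of the idea (a simplified sketch, not the PoC's actual code: plain slices stand in for the XOR-encoded chunks and all names are illustrative):

```go
package sketch

import "sort"

// sample is a toy (timestamp, value) pair; the real code keeps samples in
// XOR-encoded chunks rather than a slice.
type sample struct {
	t int64
	v float64
}

// memSeries sketch: one in-order head chunk and one out-of-order head chunk
// side by side, so the series labels are tracked only once.
type memSeries struct {
	headSamples []sample // regular, in-order head chunk
	oooSamples  []sample // out-of-order head chunk, unsorted until mmapped
	maxTime     int64
}

// appendSample routes a sample either to the regular head or to the OOO head.
func (s *memSeries) appendSample(t int64, v float64) {
	if t >= s.maxTime {
		s.headSamples = append(s.headSamples, sample{t, v})
		s.maxTime = t
		return
	}
	// Out of order: keep the sample instead of rejecting it.
	s.oooSamples = append(s.oooSamples, sample{t, v})
}

// cutOOOHeadChunk sorts the collected OOO samples and hands them to a
// separate mmapper; only at that point do they form a well-ordered chunk.
func (s *memSeries) cutOOOHeadChunk(mmap func([]sample)) {
	sort.Slice(s.oooSamples, func(i, j int) bool {
		return s.oooSamples[i].t < s.oooSamples[j].t
	})
	mmap(s.oooSamples)
	s.oooSamples = nil
}
```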
When a sample arrives and it's OOO, we push it to the OOO head chunk, which uses the same XOR encoding as usual, but I had to make a small adjustment to support negative deltas. There's still room for improvement there, though. Whenever we decide that the head chunk has to be mmapped (question: when? at 120 samples? see questions at the end), we cut a new head chunk, sort the one we had, and write it to disk using a separate mmapper that writes into a separate folder. Notice that the head chunk doesn't have its samples sorted, so it can't be queried (at least not straightforwardly: we could, however, build a sorted head chunk on the fly when querying in the future), but the mmapped OOO chunks could be perfectly queryable. I didn't do this in the PoC.

## Compaction

Now we have two heads side by side: the usual Head and the OOOHead.
I've added a `WriteCombined` method to the compactor, which is like `Write` but can take multiple blocks, taking advantage of the functionality already present for compaction itself. We also rely on the `allow-overlapping-blocks` flag, which is needed for all of this, as OOO samples inevitably overlap existing blocks. Once compacted, the OOOHead block may overlap the existing huge blocks, and that would compact them (right now, see below) into one huge block.
And that's where the OOO data becomes queryable in the PoC implementation.
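For reference, the addition looks roughly like this (a trimmed-down sketch with placeholder types; the real `tsdb.Compactor` signatures carry more parameters than shown here):

```go
package sketch

// BlockReader and ULID stand in for the real tsdb.BlockReader and ulid.ULID
// types; they are placeholders for illustration only.
type BlockReader interface{}
type ULID [16]byte

// Compactor is a trimmed-down view of the tsdb.Compactor interface with the
// hypothetical WriteCombined addition.
type Compactor interface {
	// Write persists a single in-memory block (e.g. the Head) to disk.
	Write(dest string, b BlockReader, mint, maxt int64) (ULID, error)

	// WriteCombined is like Write, but merges several BlockReaders (the
	// regular Head plus the OOOHead) into one persisted block, reusing the
	// merge logic that compaction already has.
	WriteCombined(dest string, bs []BlockReader, mint, maxt int64) (ULID, error)
}
```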
I think that it should be done whenever one of these happens:
- `PrepareForCompaction()`
- one `OOOHead` vs `[]OOOHead`
When mmapping the chunks, we could split them into smaller chunks that properly align with existing blocks, and avoid overlapping several blocks, so their compaction doesn't merge existing blocks.
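As a sketch of that alignment idea (assuming fixed-size block ranges and non-negative timestamps, and reusing the toy `sample` type from the first sketch):

```go
package sketch

// sample is the same toy (timestamp, value) pair as in the first sketch.
type sample struct {
	t int64
	v float64
}

// splitByBlockRange splits already-sorted samples into groups that do not
// cross block boundaries, assuming fixed-size block ranges (2h by default).
// Each group could then be mmapped as its own chunk so that it overlaps
// exactly one existing block.
func splitByBlockRange(samples []sample, blockRangeMs int64) [][]sample {
	var out [][]sample
	var cur []sample
	prevBlock := int64(-1)
	for _, s := range samples {
		block := s.t / blockRangeMs // index of the block this sample falls into
		if block != prevBlock && len(cur) > 0 {
			out = append(out, cur)
			cur = nil
		}
		prevBlock = block
		cur = append(cur, s)
	}
	if len(cur) > 0 {
		out = append(out, cur)
	}
	return out
}
```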
We would aim for a scenario where each mmapped OOO chunk aligns with the existing block boundaries.
After each commit we could ask the head for the set of OOOHeads it has (each OOOHead would point to a set of mmapped chunks within a given block range) and start querying them and writing them into storage.
What happens if someone sends a sample that is 1 month old? Could that trigger a re-compaction of an already huge block? We could limit that by limiting the window within which we accept samples, like for instance 12h back from now.
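A guard along these lines would be enough to bound that (just a sketch; the 12h window is only an example value):

```go
package sketch

import (
	"errors"
	"time"
)

var errTooOld = errors.New("sample outside the accepted out-of-order window")

// checkOOOWindow sketches the proposed guard: accept out-of-order samples
// only within a configurable window behind "now", so that a month-old sample
// can never trigger a re-compaction of an old, already-huge block.
func checkOOOWindow(t int64, now time.Time, window time.Duration) error {
	if t < now.Add(-window).UnixMilli() {
		return errTooOld
	}
	return nil
}
```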
On the other hand, we persist the OOO chunks coming from the head as tiny OOO blocks on disk that overlap with existing bigger blocks, and mark them as coming from an OOO write: then we delay the merging of those for some amount of time (maybe more samples arrive?), or even delay it until the blocks have to be naturally merged because of their age. All of this would take advantage of the `allow-overlapping-blocks` feature, which already knows how to merge multiple blocks when querying.
Depending on the needs, we can compact earlier or later.
I haven't looked into isolation at all, but it should be even easier than with the usual head, since we assume the data isn't queryable for a while after ingestion. We just need to be careful not to persist uncommitted writes to disk.
We let our samples go through the WAL like all other samples, so we don't need extra work to ensure durability. Actually, we need the opposite: a way to clean up the leftovers after a crash.
The current solution relies on the disk mmapper simply removing chunks that are older than the head start; however, we can't rely on that anymore (that's one of the reasons why we need a separate disk mmapper pointing to a separate folder).
We need a way to tell that a given chunk has already been persisted into a block. Since blocks have metadata on their parents, we could have a unique ID within the chunk (if there isn't one yet, I haven't checked) and point to it from there. Then on TSDB start we could clean up the OOO chunks that already have a block created from them.
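Something along these lines on startup could do the cleanup (purely a sketch: the chunk ID and the way it would be recorded in the block metadata are assumptions I haven't verified against the current code):

```go
package sketch

import "os"

// oooChunkFile pairs an on-disk OOO chunk file with the (hypothetical)
// unique ID that would be stored in its header.
type oooChunkFile struct {
	path string
	id   string
}

// cleanupPersistedOOOChunks sketches the startup cleanup: delete every OOO
// chunk whose ID is already recorded as a source/parent of some block.
// blockSources would be collected from the blocks' meta.json files.
func cleanupPersistedOOOChunks(chunks []oooChunkFile, blockSources map[string]bool) error {
	for _, c := range chunks {
		if blockSources[c.id] {
			if err := os.Remove(c.path); err != nil {
				return err
			}
		}
	}
	return nil
}
```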
Honestly, I didn't go down the rabbit hole of chunk adapters etc.; I just assumed that chunks are completely rewritten from scratch when two blocks are vertically merged (I can't see how it would work otherwise), so that's something we would be taking advantage of here too.
I would like to hear opinions on this and see if there's actually any future in this approach. I've most likely skipped some topics that have been previously discussed in terms of OOO ingestion, and this is my first interaction with the TSDB code, so I could also have missed a lot of details; any feedback is appreciated.
@colega thank you for working on this! Your approach is definitely promising and worth exploring. I will take a look at the open questions in the coming days.
@gouthamve you had mentioned that you are interested in tackling this. The above approach is worth considering.
Nice, worth checking definitely 💪🏽
We at Grafana Labs are planning to invest some time in building a solution for this, something along the lines of what @colega described above. We will propose a design for this soon. cc @Dieterbe
As we speak, in the storage working group @Dieterbe and @colega are, I think, trying to solve a different problem: not ingestion of data from beyond the WAL, but short-term out-of-order appends 🤔
Last time I discussed it, the idea was to ingest parallel out-of-order streams, with each stream getting samples in order (maintained per series). So the data could be years old, as long as each stream of samples arrives from oldest to newest (and with up to some X OOO streams per series). It would not handle a stream of samples arriving in reversed order (because each sample would create a new OOO stream).
This is a small change over @colega's description here, which involved a single OOO chunk per series. The new approach above was chosen to be prototyped first because what we saw is that users need the samples to be queryable soon in most cases, and while the samples are coming out of order / out of bounds, they are usually in order within that brief period in the past. So optimising for multiple OOO streams with sorted chunks in each stream makes efficient use of compression, and the data is immediately available for efficient querying since the chunks are already sorted.
All this is still to be tested; the above are only heuristics.
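To make the multi-stream idea a bit more concrete, here is a rough sketch of one possible routing policy (everything here is an assumption about how it could work, not the actual prototype):

```go
package sketch

import "errors"

var errTooManyOOOStreams = errors.New("per-series out-of-order stream limit reached")

// oooStreams sketches the "up to X sorted streams per series" idea: each
// stream only ever receives increasing timestamps, so its chunks stay sorted,
// compress well, and are immediately queryable.
type oooStreams struct {
	lastTS     []int64 // last timestamp appended to each stream
	maxStreams int     // the "X" mentioned above
}

// route picks a stream that can take the sample in order, creating a new
// stream if needed and allowed. This is just one possible routing policy.
func (o *oooStreams) route(t int64) (int, error) {
	for i, last := range o.lastTS {
		if t >= last {
			o.lastTS[i] = t
			return i, nil
		}
	}
	if len(o.lastTS) < o.maxStreams {
		o.lastTS = append(o.lastTS, t)
		return len(o.lastTS) - 1, nil
	}
	// A strictly reversed stream would open a new stream per sample and hit
	// this limit quickly, which is the known weakness mentioned above.
	return 0, errTooManyOOOStreams
}
```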
@Dieterbe @colega could you please briefly describe the approach that you plan to prototype? (or if anything more to be added or fixed from what I said above?)
Since the prototype post from @colega we have been refining our thinking a bit (and this process is not quite complete), but at this time it looks like we can say:

1) We're more interested in solving "data within the last x minutes or y hours coming in out of order" (I plan to get more accurate numbers for x and y over the next few weeks, but possibly y<=1). For data that's "very old" (many hours or more) we can already import blocks.
2) Having data queryable is preferable for us rather than not queryable (even when that comes at a higher resource cost), which means we want to be able to have the samples sorted.
We have a couple of ideas that need some refinement, but basically we want to avoid duplicating structures/information (i.e. do what we need within memSeries, so we track the labels for a series only once, even if a series has both in-order and out-of-order data "simultaneously"), and then track the actual OOO samples using any of the following (or a combination thereof):
Note that there's a variety of ways to combine these approaches (e.g. a ROB (reorder buffer) that is sized large enough (or is dynamically resized) to cover all OOO samples, with a sorted chunk behind it; or have it cover the brunt, but not all, of the OOO samples and combine it with a set of sorted chunks "behind it"; or do infrequent decode-recodes when inserting OOO samples that don't fit within the ROB).
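As a toy illustration of the ROB variant (deliberately naive, again reusing the toy `sample` type; a real implementation would not sort on every insert):

```go
package sketch

import "sort"

// sample is the same toy (timestamp, value) pair as in the earlier sketches.
type sample struct {
	t int64
	v float64
}

// reorderBuffer is a naive ROB sketch: it keeps recent samples in memory and
// flushes them in sorted order once they fall behind the buffer's time
// horizon. Samples older than the horizon would need one of the fallbacks
// described above (sorted chunks behind the ROB, or an occasional
// decode-recode of an existing chunk).
type reorderBuffer struct {
	horizonMs int64
	buf       []sample
}

// add inserts a sample and returns, in sorted order, any samples that are
// now older than (newest timestamp - horizon).
func (r *reorderBuffer) add(s sample) []sample {
	r.buf = append(r.buf, s)
	sort.Slice(r.buf, func(i, j int) bool { return r.buf[i].t < r.buf[j].t })

	newest := r.buf[len(r.buf)-1].t
	cut := 0
	for cut < len(r.buf) && r.buf[cut].t < newest-r.horizonMs {
		cut++
	}
	flushed := append([]sample(nil), r.buf[:cut]...)
	r.buf = append(r.buf[:0], r.buf[cut:]...)
	return flushed
}
```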
I want to refine these ideas more. What I mean by that is:
I imagine this will require a few steps of prototyping and iteration based on what we observe with real data.
I'm very new to the TSDB codebase, but eager to learn, and I'll definitely appreciate help from both of you TSDB maintainers :-) (PS: it looks like @colega will work on something else and I will work on this project)
We (@jesusvazquez, @Dieterbe, @codesome) are currently working on an MVP for this as part of Mimir. Our ongoing work lives in the out-of-order branch of grafana/mimir-prometheus (still in early stages, nothing usable yet). We will share an outline of how we are designing the solution and the extensions made to the TSDB.
We took this route for a faster feedback loop; we will dogfood the implementation via Mimir in our production clusters in the coming months when the MVP is ready, and once we know that things are stable enough and the design is accepted here, we will open a PR against prometheus/prometheus to move our implementation upstream.
Hello everyone! We finally have something to show on the out of order topic.
We've been working on a design doc for the last couple of months; we would appreciate it if you had the time to review and comment on the proposal: https://docs.google.com/document/d/1Kppm7qL9C-BJB1j6yb6-9ObG3AbdZnFUBYPNNWwDBYM/edit?usp=sharing
As @codesome pointed out above, the work is public in our Prometheus fork mimir-prometheus (https://github.com/grafana/mimir-prometheus/compare/out-of-order) and it will eventually be upstreamed to Prometheus once we've successfully tested it in our Mimir production clusters.
Can you submit the doc to https://prometheus.io/docs/introduction/design-doc/ ?
An update: We have the design doc implemented and working in this branch. We have tested it in our dev env via Mimir for a few weeks now. We will be doing more larger scale tests soon.
Is there any update about isolation support?
We are not planning to support isolation in the first iteration, since out-of-order support without isolation is already a little complex by itself. We will take a stab at it once the first iteration is stable.
:wave: Quick update about our progress here
We've been running out-of-order support, accepting samples up to 2 hours old, with a customer for the past 3 weeks.
Some rough numbers: around 4000 out-of-order samples per second ingested at peak, across around 60k series with a 15-second scrape interval.
We plan to open PRs in the Prometheus repository in July once this is completely stable.
Proposal
Prometheus now accepts remote write data, but it can't reliably ingest data more than an hour old. This means that if there is an outage or network partition and the downstream Prometheus has issues pushing for more than an hour, there will likely be data loss after the issues are resolved. Cortex and Thanos, which consume TSDB, have the same limitation.
We should solve this issue in TSDB, and a very interesting trade-off was presented at the storage working group: when ingesting data that is outside the current head block, we don't need to make it immediately available for querying. We could write it to a log and compact it in the background before making it available for querying. I think it's a fair trade-off that wouldn't use too many extra resources. I'm curious what others think!
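To illustrate the proposed flow (a minimal sketch with made-up types, not an actual TSDB API):

```go
package sketch

// appender is a minimal stand-in for a TSDB appender.
type appender interface {
	Append(t int64, v float64) error
}

// ingest sketches the proposed flow: samples inside the current head block
// take the normal path and are immediately queryable; older samples go to a
// separate out-of-order log that a background job later compacts into
// blocks, and only then do they become queryable.
func ingest(t int64, v float64, headMinTime int64, head, oooLog appender) error {
	if t >= headMinTime {
		return head.Append(t, v)
	}
	// Durable right away (it's on the log), but queryable only after the
	// background compaction turns the log into a block.
	return oooLog.Append(t, v)
}
```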