Closed original-brownbear closed 4 years ago
BTW:
Even the asymptotic case of INT_MAX for the interval looks problematic:
```
Result "org.logstash.benchmark.QueueBenchmark.pushToPersistedQueue":
  345.335 ±(99.9%) 29.489 ops/ms [Average]
  (min, avg, max) = (283.847, 345.335, 393.920), stdev = 33.959
  CI (99.9%): [315.846, 374.823] (assumes normal distribution)

# Run complete. Total time: 00:00:42

Benchmark                            Mode  Cnt    Score    Error   Units
QueueBenchmark.pushToPersistedQueue  thrpt  20   345.335 ± 29.489  ops/ms
```

~ 46MB/s => this is less than 10% of what my SSD is capable of. The problem goes away for very large event sizes, so it's clearly an issue with the overhead per write and not I/O throughput.
<3 this amazing work.
This is awesome analysis. I'd love to see a POC of append-based checkpoints and how the numbers contrast with this design.
@suyograo the thing is, https://github.com/elastic/logstash/issues/7162#issuecomment-302723324 already comes close to that, since you still have some sync on page creation. But since I only had 5 minutes to try this out, the numbers for larger page sizes are a little troubling ...
If I run 1GB files and INT_MAX for the checkpointing interval I get this:
```
# Run complete. Total time: 00:06:49

Benchmark                            Mode  Cnt   Score   Error   Units
QueueBenchmark.pushToPersistedQueue  thrpt  20  32.183 ± 2.485  ops/ms
```

~ 4-5MB/s
So I'm actually slower with larger page sizes by a wide margin (this is kind of obvious if you think about the way mmap() works, because it has to map a much wider address space, incurring a lot more page faults).

=> Not sure it's worth improving the checkpointing; I think we rather need to improve the writing itself and design checkpointing accordingly. We simply need to move to standard appends to a FileChannel / DirectByteBuffer combination and call sync on the underlying fd periodically. That allows the OS to sync a much smaller buffer, not driving the paging logic wild :)
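The append-and-periodic-sync write path described here could look roughly like this. This is a sketch, not Logstash code: the class name, record framing, and sync interval are all made up for illustration.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: plain appends to a FileChannel, with an fsync
// (FileChannel.force) only every SYNC_INTERVAL writes instead of per event.
public final class AppendingWriter implements AutoCloseable {
    private static final int SYNC_INTERVAL = 1024; // illustrative value
    private final FileChannel channel;
    private long writesSinceSync = 0;

    public AppendingWriter(Path file) throws IOException {
        this.channel = FileChannel.open(
            file, StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }

    public void append(byte[] event) throws IOException {
        // Length-prefixed record so a reader can scan the file sequentially.
        ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + event.length);
        buf.putInt(event.length).put(event).flip();
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
        if (++writesSinceSync >= SYNC_INTERVAL) {
            channel.force(false); // sync data on the underlying fd
            writesSinceSync = 0;
        }
    }

    @Override
    public void close() throws IOException {
        channel.force(true);
        channel.close();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("pq-append", ".log");
        try (AppendingWriter w = new AppendingWriter(tmp)) {
            w.append("hello".getBytes());
            w.append("world!".getBytes());
        }
        System.out.println(Files.size(tmp)); // two 4-byte prefixes + 11 payload bytes = 19
        Files.deleteIfExists(tmp);
    }
}
```

The point of the sketch is that the OS only has to flush the channel's dirty pages on each `force`, rather than tracking a whole mapped region.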
I already mentioned this in https://github.com/elastic/logstash/issues/6961#issuecomment-300481437 a little.
From what I remember of the PQ design, the separate checkpoint file (being atomically written, and under the block/page size of the file system) was a design point we chose deliberately. If we change that, we should make sure whatever replaces it considers the reasons we designed the checkpoint file the way we did.
We can talk about this more next week I think, I'm in favor of speeding up checkpointing (or redesigning it to be durable in another mechanism than the checkpoint strategy today)
@jordansissel I think my comment above https://github.com/elastic/logstash/issues/7162#issuecomment-302796372 probably shows that just looking into the checkpointing makes little sense in isolation. Even if we fix the checkpointing to be faster, we don't really get to a place all that exciting.
I think my points for a discussion next week would be these:

- With the current checkpointing, only events up to C - C % ACK_INTERVAL are physically persisted? (that's how I understand it)
- We shouldn't be using mmapped files for small writes. It would be massively faster to simply append to a single file and call sync() on its fd periodically, wouldn't it? You could write out some checkpointing/indexing async and in a non-blocking fashion to make recovery easier/faster, but reading the file until you hit a broken record would be fine as well.

It seems to me that the fact that we're using mmap kind of leads to all the other issues. Especially considering that we map whole page files which are then limited to 2GB in size (created and written to sequentially) => under pressure we're starting to create a lot of files and consume a ton of memory when doing so (as compared to say Kafka where you're pushing to multiple files on disk).
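A tiny sketch of the C - C % ACK_INTERVAL arithmetic mentioned above, just to make the watermark idea concrete; the concrete numbers are made up:

```java
// Sketch of the ack/checkpoint watermark arithmetic: with an ack interval,
// only events up to C - C % ACK_INTERVAL are covered by the last checkpoint.
public class WatermarkExample {
    static long persistedWatermark(long c, long ackInterval) {
        return c - c % ackInterval;
    }

    public static void main(String[] args) {
        long c = 5000;           // events written so far (illustrative)
        long ackInterval = 1024; // illustrative interval
        // 5000 - (5000 % 1024) = 5000 - 904 = 4096
        System.out.println(persistedWatermark(c, ackInterval)); // 4096
    }
}
```

So up to ACK_INTERVAL - 1 of the most recent events can sit outside the last checkpoint at any moment.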
I think we are mixing 2 concerns here:
As @jordansissel said, the checkpoint strategy was chosen for its durability characteristics: a very small file can be written atomically. The checkpoints are necessary to keep track of the acking for at-least-once delivery at the persistence layer. There are certainly optimizations we could do in the checkpointing strategy, but there will always be a relatively high cost to doing a checkpoint, and this is why it is a configurable behaviour.
For the mmaped page files, we can definitely test alternate IO strategies to see if we can improve throughput.
Let's not forget that these tests run in isolation and I believe only test one side of the equation which is the write.
Also, the dev process we try to follow is to first make it "right" then optimize it. I am all for finding potential optimizations but we have to keep the larger context in mind, there is more to it than just the raw IO write throughput.
@colinsurprenant
I think we are mixing 2 concerns here
Yea, this devolved a little as admitted above, but in any case the checkpointing strategy and I/O approach go hand in hand when it comes to planning a fast solution. I don't think it makes much sense to plan these independently if one gates the performance of the other.
As @jordansissel said, the checkpoint strategy was chosen for its durability characteristics, a very small file can be written atomically.
But so is a FileChannel.write to an index file holding the same information for all checkpoints, followed by a sync on its fd (this is probably even safer, since it's much faster to append to an existing file than to create a file and subsequently write to it, which is most likely an order of magnitude+ slower).
There are certainly optimizations we could do in the checkpointing strategy but there will always be a relatively high cost in doing a checkpoint and this is why it is a configurable behaviour.
But see https://github.com/elastic/logstash/issues/7162#issuecomment-302796372: even without checkpointing we're running into a serious upper bound (I'm not sure, but single-digit MB/s looks like it may actually cause functional issues for users in terms of unexpectedly low throughput).
Let's not forget that these tests run in isolation and I believe only test one side of the equation which is the write.
But the performance bound of this write is the performance bound of the overall system in this case, right? Even if your reads are 100x this fast it doesn't matter, does it?
Also, the dev process we try to follow is to first make it "right" then optimize it. I am all for finding potential optimizations but we have to keep the larger context in mind, there is more to it than just the raw IO write throughput.
In general I agree (obviously), but when it comes to optimizing these I/O issues it's a different story in my experience. You need to have a plan and interfaces that are compatible with Java I/O best practices. One issue I see with the current approach is that it exposes each file as a ByteBuffer from the outside. This is not an issue for the in-memory pages, but handling a file like this basically requires tracking memory -> disk offset mappings all over the place without ever really using them, for example. I think some early discussion/action is well worth it here, considering that fixing these issues gets harder and harder the more code is piled on top of things?
(e.g. https://github.com/elastic/logstash/pull/6998, which would be a non-issue in a more standard append-to-file I/O approach, but requires a lot of thought that would be invalidated by changing the I/O approach)
Maybe you could create new design issue(s) for your persistence re-implementation ideas? these are certainly worth discussing in details.
I've been sick (or family has been) the past 5 days so I've been unable to respond fully. I'm really backlogged, but wanted to get you a rough answer about where I was going with my comment:
Our choice of checkpoint mechanism was made to ensure durability of commits to the queue (both writes and acks)
In general, for any given feature of our software, if we can improve the performance without breaking the feature, I will not discourage the effort :)
If I show any worry or concern about the effort, it is mainly due to us not having tooling available to verify durability properties of the queue, so a performance-focused approach might reduce durability without us knowing it. Something I am keeping in mind, anyway..
because it has to map a much wider address space incurring a lot more pagefaults
I am not seeing this in the data in your description, so it is hard to evaluate and make suggestions. Do you have vmstat or perf stat (Linux only) results during these tests? These commands would show page faults (and other nice activity) during your benchmarks and we could compare the two.
We shouldn't be using MMaped files for small writes.
I'm not seeing data for this in the issue description. Can you provide details?
It would be massively faster to simply append to a single file and call sync() on its fd periodically, wouldn't it
This might be the case, but we should test to verify. As a historical example, ext3 filesystem had this fun property that calling fsync() had some unexpected (to users) bad performance under default configuration.
It seems to me that the fact that we're using mmap kind of leads to all the other issues
Which is this a problem for? checkpoints? page files? both? Can we demonstrate the problems?
consume a ton of memory when doing so
My understanding of mmap may be wrong, but it does not consume physical memory.
we map whole page files which are then limited to 2GB in size
Where does this 2gb limit come from? On my laptop (64-bit Linux), mmap takes a size_t for length, and size_t is a 64-bit value:

```c
size_t x;
printf("%zu\n", sizeof(x)); // prints 8 on 64-bit Linux
```
Looking at reads/writes, we will need lseek to jump to the appropriate head position of a page (or to the first un-acked event on restart), and lseek takes an off_t, which is a 64-bit value on my 64-bit Linux laptop. In Java, MappedByteBuffer comes from FileChannel.map, which takes a long for a length.

I might be missing something, but both mmap and read/write approaches appear to me to have the same maximum file size limit (2^63 - 1). Can you help me understand where the "2gb limit" is?
@jordansissel
Thanks for looking into this :)
If I show any worry or concern about the effort, it is mainly due to us not having tooling available to verify durability properties of the queue, so a performance-focused approach might reduce durability without us knowing it.
I think there is no way to actually verify durability, as you yourself pointed out the other day when talking about NFS. IMO we can't really do better than guarantee sync is called on whatever file/fd we handle; the rest is up to the OS/FS/user, isn't it?
Do you have vmstat or perf stat (linux only) results during these tests?
Urgh I'd love to :( but as you know I'm travelling right now and will be travelling again next week which limits me to my Mac. But, I'll try to get a run on GCP/AWS in whenever I find a quiet moment for that :) That said, anecdotal evidence: https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java#L63
Still, the data turns out to be:

1GB pagesize:

```
# Run complete. Total time: 00:06:49

Benchmark                            Mode  Cnt   Score   Error   Units
QueueBenchmark.pushToPersistedQueue  thrpt  20  32.183 ± 2.485  ops/ms
```

~ 4-5MB/s

128MB pagesize:

```
Result "org.logstash.benchmark.QueueBenchmark.pushToPersistedQueue":
  345.335 ±(99.9%) 29.489 ops/ms [Average]
  (min, avg, max) = (283.847, 345.335, 393.920), stdev = 33.959
  CI (99.9%): [315.846, 374.823] (assumes normal distribution)

# Run complete. Total time: 00:00:42

Benchmark                            Mode  Cnt    Score    Error   Units
QueueBenchmark.pushToPersistedQueue  thrpt  20   345.335 ± 29.489  ops/ms
```

~ 40+MB/s
Which is this a problem for? checkpoints? page files? both? Can we demonstrate the problems?
Yes very easily, just run the jmh benchmark with standard settings and 1GB page file size back to back. The difference in throughput is about an order of magnitude on ext4 Linux and OSX for me. (see above, sorry typed this first)
My understanding of mmap may be wrong, but it does not consume physical memory.
We're calling .load on the memory-mapped buffers, which makes this thing consume memory. Your understanding is not wrong, but Java caches more than a pure mmap in C does (well, the latter caches nothing, but you get the point :)). Maybe we can remove those .load calls though, but still, Java is a tricky beast here; that's why we're having this hacky discussion #6961 :)
Where does this 2gb limit come from?
This is just a Java implementation detail: it hides mmap behind the ByteBuffer interface, which only allows you int32 addressing :(
In Java, MappedByteBuffer comes from FileChannel.map which takes a long for a length.
Jup ... not sure why that is, even, given that that interface actually throws if you pass in more than INT32_MAX for the length (probably some backwards-compatibility thing, but the actual values are limited to int in that API).
The code in JDK8 reads:

```java
public MappedByteBuffer map(MapMode var1, long var2, long var4) throws IOException {
    this.ensureOpen();
    if (var1 == null) {
        throw new NullPointerException("Mode is null");
    } else if (var2 < 0L) {
        throw new IllegalArgumentException("Negative position");
    } else if (var4 < 0L) {
        throw new IllegalArgumentException("Negative size");
    } else if (var2 + var4 < 0L) {
        throw new IllegalArgumentException("Position + size overflow");
    } else if (var4 > 2147483647L) {
        throw new IllegalArgumentException("Size exceeds Integer.MAX_VALUE");
    } else {
        // ...
```
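That guard is easy to trigger from user code. A minimal demonstration (the scratch file name is hypothetical); note the size check fires before any mapping or file extension happens, so no huge file is needed:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Shows the Java-level limit: FileChannel.map takes a long length but
// rejects anything above Integer.MAX_VALUE, because the resulting
// MappedByteBuffer uses int (32-bit) addressing.
public class MapLimitDemo {
    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("maplimit", ".bin");
        try (FileChannel ch = FileChannel.open(tmp,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            ch.map(FileChannel.MapMode.READ_WRITE, 0, (long) Integer.MAX_VALUE + 1);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Size exceeds Integer.MAX_VALUE
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```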
IMO we can't really do better than guarantee sync is called on whatever file/fd we handle
We have multiple files right now (multiple pages, multiple checkpoints), and doing things in order is important. It is possible (in my past experience) to create a file, write, sync it, then lose power before the file metadata in the directory inode has been committed correctly; you technically wrote the data to disk, but no directory knows the file exists, so the data is lost :\
In our multiple-page-file queue, we can create a new page, write events, sync the page, but not the metadata.
Looking at the source code for MappedByteBuffer's force method (the 'solaris' target; couldn't find linux), it appears to only call msync (which is probably correct in the general case), so we would need to also fsync separately when we create a new page (and also whenever we create whatever other files). Syncing just the buffer isn't enough, I think.
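A sketch of what "also fsync when we create a new page" could look like in Java. Everything here is illustrative, and syncing the parent directory's fd via a read-only FileChannel is a Linux-specific trick (it is not guaranteed to work on every platform, hence the best-effort catch):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Create a file durably: fsync the file itself, then fsync the directory
// entry so the file survives a power loss right after creation.
public class DurableCreate {
    public static void createDurably(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file,
                StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            ch.force(true); // fsync the new (empty) file itself
        }
        Path dir = file.toAbsolutePath().getParent();
        try (FileChannel dirCh = FileChannel.open(dir, StandardOpenOption.READ)) {
            dirCh.force(true); // fsync the directory entry (works on Linux)
        } catch (IOException e) {
            // Platform does not allow syncing a directory fd; best effort only.
        }
    }

    public static void main(String[] args) throws IOException {
        Path page = Files.createTempDirectory("durable").resolve("page.0");
        createDurably(page);
        System.out.println(Files.exists(page)); // true
    }
}
```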
ByteBuffer interface which only allows you int32 addressing
Ahh, then maybe we can use this as an opportunity to remove the 'page size' setting and just hard-code it to 2gb? :)
1GB pagesize: ~ 4-5MB/s .... 128MB pagesize: 40+MB/s
It seems odd that with page sizes only a factor of 8 apart, throughput is also roughly a factor of 8 inversely related to the page size (1gb:128mb is 1/8th the size, 5mb:40mb is 8x). This is confusing behavior. Is this what you mean about physical memory usage being a problem?
} else if (var4 > 2147483647L) {
(╯°□°)╯︵ ┻━┻
Ok I guess this makes sense given your other pointers to the java source.
@jordansissel
Syncing just the buffer isn't enough, I think.
Jup, me and @robbavey came to the same conclusion talking about this; it's def not the same as a sync on a fd.
Ahh, then maybe we can use this as an opportunity to remove the 'page size' setting and just hard-code it to 2gb? :)
I think the current max queue size defaults to 1GB right? :D Not sure I agree with this right now though, 2GB is hell performance wise for the current code. Sadly enough smaller pages are faster with what we have :(
It this what you mean about physical memory usage being a problem?
Yes (memory usage may not be the 100% correct term; address space usage may be better). The issue is simply that you start forcing the OS to address and sync a huge address space, while writing chunks on the order of a few hundred bytes tops per write. Your writes stay as fast as they always are, but the size of the address space blows up the cost of all sync operations => jup, this is a hyperbola I think :)
2GB is hell performance wise for the current code.
And if we remove the mmap-memory-overhead you are describing, does 2gb page size sound reasonable as a default?
@jordansissel I like 2 GB, especially with us not running any compactions (yet); larger == faster imo. Kafka has a 1GB default too, which seems like reasonable validation also :)
checkpointing should probably go to some append mode index file.
I'm open to this as an implementation model, if I can try to restate this specific checkpointing proposal:
1. Replace the current checkpoint model (small file written atomically) with a log-style append model. This would remove the cost of the atomic checkpoint file creation.
2. We can reuse/keep the checkpoint serialization.
3. Reading a checkpoint means seeking to the last correctly-written checkpoint in the file and starting from there.
4. At creation of any file, immediately call fsync() before using it.
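A rough sketch of points 1 and 3 above: checkpoints as fixed-size records appended to one log file, each protected by a CRC32, with recovery scanning forward and keeping the last record whose checksum matches. The 16-byte payload (pageNum + firstUnacked) is a made-up stand-in for the real checkpoint serialization, not the actual PQ format:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.zip.CRC32;

public final class CheckpointLog {
    static final int RECORD_SIZE = 16 + 8; // payload + CRC32 stored as a long

    public static void append(FileChannel ch, long pageNum, long firstUnacked)
            throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(RECORD_SIZE);
        buf.putLong(pageNum).putLong(firstUnacked);
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 0, 16);
        buf.putLong(crc.getValue()).flip();
        while (buf.hasRemaining()) {
            ch.write(buf);
        }
        ch.force(false); // one fsync per checkpoint, no file creation cost
    }

    /** Returns {pageNum, firstUnacked} of the last intact record, or null. */
    public static long[] recoverLast(FileChannel ch) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(RECORD_SIZE);
        long[] last = null;
        long pos = 0;
        while (ch.read(buf, pos) == RECORD_SIZE) {
            buf.flip();
            long pageNum = buf.getLong();
            long firstUnacked = buf.getLong();
            long stored = buf.getLong();
            CRC32 crc = new CRC32();
            crc.update(buf.array(), 0, 16);
            if (crc.getValue() != stored) break; // torn write: stop here
            last = new long[] {pageNum, firstUnacked};
            pos += RECORD_SIZE;
            buf.clear();
        }
        return last;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("ckpt", ".log");
        try (FileChannel ch = FileChannel.open(tmp,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            append(ch, 1, 100);
            append(ch, 2, 200);
            long[] last = recoverLast(ch);
            System.out.println(last[0] + " " + last[1]); // 2 200
        }
        Files.deleteIfExists(tmp);
    }
}
```

Fixed-size records make the scan in point 3 trivial; a torn trailing record simply fails its checksum and recovery falls back to the previous one.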
(Also a given requirement that we need to support users upgrading with an older checkpoint/queue disk format to coexist with the new one, assuming we change anything)
@original-brownbear Can you write up a design proposal for your checkpoint, and, at your option, your page file (switch from mmap to use normal seek/read/write) strategy changes?
Once we have that, I think it'll be easier to digest than this longer issue, and we can get some feedback on that. There we can also discuss and address compatibility and upgrade scenarios for existing PQ users.
@jordansissel will write it up soon then :)
I would like us to take a step back here and reflect on the larger problem this issue is creating. Performance and benchmarks don’t mean much without looking at them in their proper context. We were/are very aware of the performance tradeoffs and consciously decided to first focus on coming up with a model that works with the required characteristics and optimize after. That is how we work: first make it right, then optimize, rinse, repeat. Some work has been done to identify performance characteristics of the PQ but we decided to focus on other, what we deemed more important, issues because performance-wise it seemed “good enough”, for now.
It is totally OK that we play with this and identify and document what we think might be performance bottleneck. These are all good findings and yes, let’s make sure we keep track and continue to analyse.
But what is not OK here is that a single write throughput benchmark leads almost directly into engaging into a persistence model redesign.
While we definitely want to embrace initiatives, there is a high cost associated with such a thread. We are going very rapidly into a performance rabbit hole involving a lot of investigation, discussion, analysis but for what exactly? Are we conscious about the cost of going down that rabbit hole and the associated potential benefits? Let’s be clear here, we are in fact revisiting the whole persistence/durability model, we are not just talking about low hanging fruits optimizations. I for one, don't know that the first phase of "make it right" is done.
If we agree on starting a project for PQ 2.0 with a redesigned model for the sake of improving some performance characteristics while 1.0 is barely out and we haven’t even really started to get appreciable usage feedback from our users then let’s make sure this is a conscious decision.
It seems to me that we are re-designing the racing car after its first few laps - let's gather telemetry data for a bit.
Let's not forget how the PQ feature unfolded. Here is some of what happened:

- We knew we were not designing a low-latency, lock-free, high-performance queue.
- We worked with @bleskes to build version 1 using battle-tested mechanisms shamelessly thieved from Lucene - with simplicity as the motivator.
- We acknowledged that the queue is rarely the bottleneck in a LS pipeline.
- We acknowledged that a refactor/redesign was inevitable at some future time.
But what is not OK here is that a single write throughput benchmark leads almost directly into engaging into a persistence model redesign.
I think I understand your feelings here. I want to note strongly that we haven't made any decisions to change anything.
Maybe there are things to improve, maybe there aren't. It's not harmful to ruminate or discuss these ideas. There is no requirement to change the code. The only expected outcome of this discussion, for me, is that by engaging with this process, I will learn something. Maybe we will learn new things about Java IO (like mmap limited to 2gb, as I learned yesterday, or that it's occupying JVM heap?). Maybe we will find reinforcement that our current design is pretty solid!
There's a lot of exciting possible outcomes of this scenario, and none of them (for me) are code changes. It is an opportunity to explore, build teamwork, share knowledge, and practice communication.
Here's an example where no decision is made, no code changed, and yet I feel happy with the outcome so far -- in responding/reading this issue, it reminded me of why I personally have anxiety about changing the PQ -- because we can't easily verify durability properties. Then I remembered two things -- durability is given by fsync/msync/fdatasync, and I remembered that I've used FUSE in the past. We could simulate power failures with FUSE by forgetting any data that is not sync'd (so close would forget it, etc). This is not a solution I had thought of before, and it may not even satisfy the durability testing needs, but it is at least a line of thinking towards providing programmatic validation of our design, and I am happy for that.
I totally agree with @jordansissel above. I think the discoveries here shouldn't force any changes right away.
What they should result in is a realization of existing issues (imo) and limitations of how far we can go with the current approach in general. We're planning further features based on the PQ (replay mode, move to distributed logic) that are, as far as I can see, impossible to achieve in a scalable manner using the existing approach due to its performance characteristics.
So if the current design precludes these features, isn't all effort invested toward it from this point forward just wasted? This doesn't mean that a redesign has to happen right now, but that it has to happen before moving to feature X, to avoid the overhead of having to refactor both X and the PQ then?
@jordansissel Obviously, I do agree with what you are saying and in essence this is what I tried to convey in
It is totally OK that we play with this and identify and document what we think might be performance bottleneck. These are all good findings and yes, let’s make sure we keep track and continue to analyse.
And this is not what I tried to raise as the problem. I'll try to recap what I see:
There are a few problems here. First, we green-light a POC for something that seems trivial but is in fact pretty much a rearchitecting of the design, all of this based on a micro-benchmark and without taking into account the larger context.
What I would like to see is that we can scope this discussions and not just shoot in all directions "just because".
@original-brownbear
We're planning further features based on PQ (replay mode, move to distributed logic) that are, as far as I can see, impossible to achieve in a scalable manner using the existing approach due to it's performance characteristics.
This is what I mean when I say "shooting in all directions". How can you jump to such a conclusion, «impossible to achieve in a scalable manner using the existing approach due to it's performance characteristics», right here, right now? You may be right, may be not, but this is not the point.
We are jumping the gun and not correctly focusing here by not scoping the discussion. Not all problem can be solved all-at-once, this is why we iterate.
One must also bear in mind here is that via the WriteClient/ReadClient pairs the Queue is designed to be drop-in replaceable. This means that we could work on PQ vN without needing more than bug fixes for vN-1.
Below I list a few meaty concepts I think we might sink our collective gnashers into.
/cc @jsvd - the above two bullet points
@original-brownbear, @suyograo, @colinsurprenant, and I met to discuss this on video.
We worked to figure out next steps, and agreed (I think?) on the following:
Since changing the checkpointing strategy will probably involve re-thinking the overall design I would strongly suggest any new design ideas are first discussed in separate design issue(s).
To that effect I will try to improve the current design issue #5636 and clarify the details of the checkpointing strategy. In a nutshell, so this can be kept in mind in further discussions, the basic idea is this:
With mmap, a written byte in the mmap buffer will be written to the file even if the process crashes before a .force() call. This allows us to provide our optimistic recovery feature to balance durability vs throughput in a process-crash situation (not pull-the-plug). This is what the queue.checkpoint.acks=X setting is for: it guarantees that a maximum of X already-acked events might be replayed as duplicates in a crash recovery scenario.

Other notes:
Closing - no followup is planned about this in the foreseeable future. We will reboot if need be.
Running the benchmark org.logstash.benchmark.QueueBenchmark#pushToPersistedQueue with .checkpointMaxWrites(1024) produces ~9MB/s, while running with 50_000 writes before ack comes in at ~40MB/s.

This is measured on EXT4 and HFS with relatively similar results. Especially on EXT4 backed by an SSD this is not reasonable, since it has local fsync (HFS probably does too, but I'm not sure about the specifics). Basically, you could argue that the bulk of the runtime goes towards checkpointing. Moreover, the overheads we generally have when writing are pretty high, given that I can ramp the I/O throughput up to the disk limit of ~600MB/s by using absurdly large objects.
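For reference, converting the JMH ops/ms numbers into MB/s is simple arithmetic once you assume a serialized event size; the event size below is made up purely to show the conversion, not a measured value:

```java
// Back-of-the-envelope: ops/ms * bytes-per-event -> MB/s.
public class ThroughputMath {
    static double mbPerSec(double opsPerMs, int eventBytes) {
        return opsPerMs * 1000.0 * eventBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        // e.g. 345.335 ops/ms at an assumed ~128 bytes per serialized event:
        System.out.println(String.format(java.util.Locale.ROOT,
            "%.1f MB/s", mbPerSec(345.335, 128))); // 42.2 MB/s
    }
}
```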
We should make checkpointing more transparent. I'll add/edit in a few suggestions soon, but long story short: instead of checkpointing at 1024-write intervals ... checkpointing should probably go to some append-mode index file.