Closed sakno closed 3 years ago
@potrusil-osi, you was right about SkipAsync
. The bug in StreamSegment.Position
property setter. Now it's fixed. Also, the second minor performance improved is also pushed to develop branch. Command identifier used by Interpreter Framework is now a part of log entry metadata and doesn't require serialization. However, there is another side of this change: log entry representing snapshot must not be passed to the interpreter. Snapshot doesn't have its own identifier.
@sakno, it is unfortunate that I cannot pass snapshot to the interpreter anymore. I currently use it to store the in-memory state and the snapshot command is how it is often initialized. It is easy and elegant. Isn't there a way how to work around the limitation?
Could you please provide a simple example of how you use custom command type for the snapshot? Do you serialize identifier manually inside of SnapshotBuilder
?
@potrusil-osi , support of snapshot entries returned back. CommandHandler
attribute has special parameter indicating that the marked method is a handler for snapshot log entry:
[CommandHandler(IsSnapshotHandler = true)]
public ValueTask HandleSnapshot(SnapshotCommand command, CancellationToken token)
{
}
Thanks for such a fast response! I guess you don't want me to give you the example anymore... But no, I was no serializing the identifier inside SnapshotBuilder
- it just used its own instance of CommandInterpreter
to apply individual entries (including the previous snapshot) and then wrote the SnapshotCommand-based entry at the very end. It worked.
After getting the latest changes (and updating the CommandHandler for snapshot) I'm consistently getting this exception in a follower:
Unhandled exception. DotNext.Net.Cluster.Consensus.Raft.PersistentState+MissingLogEntryException: Log entry with relative index 1 doesn't exist in partition ...\Logs\partition-0\sp-0\0
at DotNext.Net.Cluster.Consensus.Raft.PersistentState.Partition.ReadAsync(StreamSegment reader, Memory`1 buffer, Int32 index, Boolean refreshStream, CancellationToken token) in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs:line 86
at DotNext.Net.Cluster.Consensus.Raft.PersistentState.ReadAsync[TResult](LogEntryConsumer`2 reader, DataAccessSession session, Int64 startIndex, Int64 endIndex, CancellationToken token) in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs:line 170
at DotNext.Net.Cluster.Consensus.Raft.PersistentState.ReadAsync[TResult](LogEntryConsumer`2 reader, Int64 startIndex, Int64 endIndex, CancellationToken token) in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs:line 234
at DotNext.Net.Cluster.Consensus.Raft.RaftCluster`1.<DotNext.Net.Cluster.Consensus.Raft.IRaftStateMachine.MoveToCandidateState>g__PreVoteAsync|76_0(Int64 currentTerm) in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs:line 750
at DotNext.Net.Cluster.Consensus.Raft.RaftCluster`1.DotNext.Net.Cluster.Consensus.Raft.IRaftStateMachine.MoveToCandidateState() in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs:line 717
at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__139_1(Object state)
at System.Threading.QueueUserWorkItemCallbackDefaultContext.Execute()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
The 0
partition file mentioned in the error is completely empty (unlike the same file in the leader).
Physical layout of log entries on the disk in 3.1.0 is not compatible with previous versions. As a result, if new PersistentState
uses files generated by previous version then you may have unpredictable results. I'll include this fact in release notes. Currently, you need to delete the entire directory with log entries and snapshot.
I delete the logs every time before my tests. I'll keep digging myself what could be causing it...
Oh, binary form of data representation is efficient when transferring over the wire but very fragile and require a lot of testing... I fixed issue caused invalid calculation of Content-Length
header when transferring log entries over the wire. Probably, corrupted stream of log entries can cause error you described above. Fix is pushed.
The last fix seems to fix the error. But I'm getting new errors now.
This one occurs periodically on the node that is the very first leader:
System.Net.Http.HttpRequestException: Response status code does not indicate success: 500 (Internal Server Error).
at System.Net.Http.HttpResponseMessage.EnsureSuccessStatusCode()
at DotNext.Net.Cluster.Consensus.Raft.Http.RaftClusterMember.SendAsync[TResult,TMessage](TMessage message, CancellationToken token) in /_/src/cluster/DotNext.AspNetCore.Cluster/Net/Cluster/Consensus/Raft/Http/RaftClusterMember.cs:line 132
And this occurs on startup if I do not clear the logs from the previous run:
System.ArgumentException: Log entry must have associated command identifier (Parameter 'entry')
at ...SdsQueueAuditTrail.ApplyAsync(LogEntry entry) in .../SdsQueueAuditTrail.cs:line 32
at DotNext.Net.Cluster.Consensus.Raft.PersistentState.ApplyAsync(Int64 startIndex, CancellationToken token) in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs:line 812
at DotNext.Net.Cluster.Consensus.Raft.PersistentState.ReplayAsync(CancellationToken token) in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.cs:line 857
at DotNext.Net.Cluster.Consensus.Raft.RaftCluster`1.StartAsync(CancellationToken token) in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs:line 354
at DotNext.Net.Cluster.Consensus.Raft.Http.RaftHttpCluster.StartAsync(CancellationToken token) in /_/src/cluster/DotNext.AspNetCore.Cluster/Net/Cluster/Consensus/Raft/Http/RaftHttpCluster.cs:line 127
at DotNext.Net.Cluster.Consensus.Raft.Http.Hosting.RaftHostedCluster.StartAsync(CancellationToken token) in /_/src/cluster/DotNext.AspNetCore.Cluster/Net/Cluster/Consensus/Raft/Http/Hosting/RaftHostedCluster.cs:line 88
at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
Fixed. Command id was not passed over the wire correctly.
Everything works well now. Thanks!
@sakno, after all the optimizations I'm getting much better numbers... The suggestion to have a buffer with a different size during compaction (described in #56) still holds. I have modified dotNext locally to give me the option and writing to the snapshot file is much faster thanks to that. I set the buffer to approximately the size of snapshot.
Great news! Currently I'm working on buffered network I/O that allows to copy log entry content passed over the network before calling AppendAsync
. This will reduce lock contention time between replication process and user code calling AppendAsync
manually. Additionally, your suggestion has been added to the task list.
Did you change the buffer size passed to FileStream
constructor? There is may be another issue. I'm using FileOptions.WriteThrough
to bypass any intermediate buffers. For me, it's strange that bufferSize
has any sense. Could you please remove WriteThrough
flag and check performance again? Without this flag, I/O is less durable however it might be more performant. If so, I'll add this flag to configuration of PersistentState
.
I'm getting about 50% performance increase without that flag! I wonder if I need to test everything again to figure out the ideal buffer sizes.
I guess without WriteThrough flag the size of the buffer has small impact on performance.
Now WriteThrough
flag is disabled by default but can be enabled using Options.WriteThrough
property. Changes are pushed to develop branch.
Buffer sizes are now separated in Options
class.
@potrusil-osi , I'm finished buffering API for log entries. It has two use cases:
AppendAsync
from user codeAppendAsync
by transport infrastructureIn case of Interpreter Framework, you can use it easily as follows:
var entry = interpreter.CreateLogEntry<MyCommand>(...);
using var buffered = await BufferedRaftLogEntry.CopyAsync(entry, options, token);
log.AppendAsync(buffered, token);
This make sense only for large log entries. Also, you need to experiment with settings from RaftLogEntryBufferingOptions
class which instance must be passed to CopyAsync
method. It has MemoryThreshold
option. If the size of log entry is larger than this threshold then disk persistence will be used for buffering. This is relevant for transport-level buffering because read from the disk is faster than read over the network but probably make no sense for custom log entries acquired constructed by Interpreter. As a result, use buffered log entries for appropriate commands.
Now I'm in a halfway to a new version of PersistentState
with parallel compaction. It will have the following characteristics:
At the moment I implemented one innovative thing: concurrent sorted linked list for partitions. This hand-written data structure is much better than SortedDictionary<K, V>
from .NET that has O(log n) in comparison to O(1) average for search provided by a new structure. Moreover, new structure provides lock-free adding/removing operations.
The remaining half way is about writing special lock type on top from AsyncTrigger
.
@potrusil-osi , everything is ready for beta-testing. It was really challenging :sunglasses: There are few options for background compaction available:
Option # 1:
If you're using UsePersistenceEngine
extension method for registering custom WAL derived from PersistentState
and PersistentState.Options.CompactionMode
is set to Background
then the library automatically registers background compaction worker.
By default, the worker implements incremental compaction. It means that the only one partition will be compacted at a time. It gives you the best performance with minimal interference with writes. However, this approach has drawbacks: if there are too many writes then disk space will grow faster than compaction.
To replace incremental compaction, you need to implement DotNext.IO.Log.ILogCompactionSupport
interface in you custom WAL. It requires only one method. This method will be called by compaction worker on every commit. As a result, you are able to define how deep the compaction should be performed. PersistentState.ForceCompactionAsync(long count, CancellationToken)
now gives this opportunity. The first parameter accepts compaction factor. Zero means that no compaction should be performed. 1 means that compaction should be minimal (as in incremental approach). In other words, this factor means how many partitions you want to squash. The number of available partitions for squashing can be requested via PersistentState.CompactionCount
property.
Option # 2:
If you're not using UsePersistenceEngine
extension method then you need to write Hosted Service and register it in DI container. The service is responsible for running compaction in the background. You can use built-in background compaction worker as an example.
I would appreciate if you will able to contribute a good benchmark. Unfortunately, BenchmarkDotNet is not applicable for load testing. Also, it would be great if you'll share the final measurements of your application with background compaction.
As an option, EWMA can be used as a base for adaptive algorithm for background compaction. During application lifetime the algorithm can compute exponentially weighted average value of compaction factor and pass it to ForceCompactionAsync
method. It's really trivial. However, its application is limited to situation when application has floating workload. If not, EWMA will give the maximum compaction factor which leads to high interference with writes. Moreover, the existing incremental approach should work fine with floating workload.
hey @sakno, thank you for so much work! I'll try to take advantage of the most recent changes as soon as possible.
I have two questions:
ForceCompactionAsync
which helped in many areas. But to take it to the next level I'd like to optimize the clear operation so that:
SkipAsync
is very cheap if IAsyncBinaryReader
backed by the stream (which is true for PersistentState
). I cannot implement some app-specific optimizations because the current WAL is general-purpose. However, you can reorganize your binary format and place BLOBs in the beginning of the stream so they can be skipped easily with one call. And, of course, you don't need to save skipped content to the new snapshot during compaction.Another compaction mode has been added in the latest commit. Now there are three compaction modes:
Sequential
which is the only compaction mode available in 3.0.x or earlier. Offers the best optimization of disk space by the cost of the commit performance.Background
which allows to run compaction in parallel with writesForeground
which is running during commit process in parallel with applying of the committed entries. It requires no ceremony as Background
but much more efficient than Sequential
.Foreground
compaction mode tries to squash the same number of previously committed entries in parallel as the number of entries requested for the commit. For instance, you have 3 previously committed entries and you want to commit another 3 entries. In this case, compaction process squashes 3 previously committed entries in parallel with applying new 3 committed entries. If overridden ApplyAsync
method has approx the same performance as overridden SnapshotBuilder.ApplyAsync
then overhead will be close to zero.
Also, I have an idea about your second question. I can provide optional interface called IRandomAccessSnapshotSupport
with a single method. You can implement this interface in the class derived from SnapshotBuilder
. In this case I'll able to provide existing snapshot as read-only memory-mapped file and you will get random access to the snapshot contents at high speed instead of sequential reading and skipping. Of course, by the cost of your RAM (but without GC pressure). What do you think?
I think I didn't explain my use case very well. The biggest pain point is that when the compaction is executed on the background there are many partition files full of add operations (the snapshot file is quite small). When clear is committed and the compaction is triggered, almost all of the partitions (including the previous snapshot) can be dropped, except add entries in the last few partitions (which are squashed into a relatively small new snapshot).
To support that specific use case, the state implementation would probably need to be able to set custom metadata for the snapshot and all the partitions, access it during compaction, and influence the compaction algorithm to skip/delete certain partition files based on custom logic. But I'm not sure all this would make any sense for a general purpose WAL.
Metadata will not help, because replication relies on IAuditTrail
interface which may be represented by any custom WAL implementation. Log compaction and snapshotting is not the only strategy and the developer may choose to implements its own WAL based on different compaction algorithm, such as B-tree merging.
Compaction always starts from the first committed log entry to the last committed log entry. There is no way to look forward. This is the nature and main property of state machine: the state of the node can be reconstructed by the repeating of applied log entries.
However, I can add partition number to LogEntry
type and you can remember the partition (or absolute index, which is better) in which clear command sits. This can be captured in your ApplyAsync
method and saved into the snapshot. During the compaction, you can skip the entire entries without calling SkipAsync
using condition entry.Index <= firstCleanCommandOccurrence
.
UPD: Also I can add AdjustStartIndex(ref long index)
virtual method to SnapshotBuilder
that will help you to adjust the starting position of the log entry during compaction and skip unnecessary entries using the condition described above.
All this would be nice if the clear operation always cleared everything before. But that is not the case. It has a sequence number parameter which means that all items with sequence numbers less that the provided one should be cleared. Those sequence numbers are provided by the application. That's why I was thinking about custom metadata assigned to snapshot or partitions files - in my case it would be the highest sequence number in that file.
On the other hand, I could easily remember mapping between sequence numbers and entry indexes... In that case the AdjustStartIndex(ref long index)
would be really interesting. Especially if you optimized it even more - if the new start index is bigger than the index of the last entry in a file, then the file can be skipped...
Yes, it will allow to skip partition files without reading from them.
SnapshotBuilder.AdjustIndex
has been added to develop branch. In your case all skipped partitions remain unread and removed at the final stage of the compaction. But a little performance overhead still exists: partition table is now organized as sorted linked list. If your switch the index in a random way then finding the right partition cannot be done in O(1). Anyway, this overhead is very small in comparison to I/O operations.
@sakno, I'm looking how to create the mapping between sequence numbers and entry indexes and it is currently not that easy. I'm using the interpreter framework which deserializes an entry through the formatter. But because the formatter doesn't have access to the LogEntry object anymore, there is no way to create the <sequence number, entry index> pair (the sequence number is being deserialized). Well, I could work around it by setting the "latest entry index" property on the interpreter (by whoever uses the interpreter, i.e. the state or the snapshot builder), but it is not very elegant for multiple reasons. Could you rather make the current entry index available somehow (either in the formatter or the interpreter)?
Interpreter Framework is not tightly coupled with PersistentState
so there is now way to do that. However, you can use S.T.AsyncLocal<T>
to pass the index from PersistentState.ApplyAsync
to the formatter.
I'm trying to use the new version of ForceCompactionAsync(long count, CancellationToken token)
and as the first argument I'd like to provide only as many partitions as necessary for the clear command. In other words, I have a specific entry index and would like to translate it to the count argument. It seems it is currently not possible because I do not have the snapshot index. Any thoughts?
You can remember this index using ApplyAsync
method. Check IsSnapshot
property of LogEntry
type. But you need careful here, compaction can be executed in parallel with commit process. Therefore, ApplyAsync
can be called concurrently.
I have the latest version and have seen a new exception when the application was shutting down:
Unhandled exception. System.AggregateException: One or more errors occurred. (Cannot access a disposed object.
Object name: 'AsyncExclusiveLock'.)
---> System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'AsyncExclusiveLock'.
--- End of inner exception stack trace ---
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
at DotNext.Threading.Tasks.CompletedTask`2.WhenCanceled(Task`1 task) in /_/src/DotNext/Threading/Tasks/CompletedTask.cs:line 24
at System.Threading.Tasks.ContinuationResultTaskFromResultTask`2.InnerInvoke()
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state)
--- End of stack trace from previous location where exception was thrown ---
at System.Threading.Tasks.Task.ExecuteWithThreadLocal(Task& currentTaskSlot, Thread threadPoolThread)
--- End of stack trace from previous location where exception was thrown ---
at DotNext.Threading.AsyncLock.TryAcquireAsync(TimeSpan timeout, Boolean suppressCancellation, CancellationToken token) in /_/src/DotNext.Threading/Threading/AsyncLock.cs:line 252
at DotNext.Net.Cluster.Consensus.Raft.RaftCluster`1.DotNext.Net.Cluster.Consensus.Raft.IRaftStateMachine.MoveToCandidateState() in /_/src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs:line 717
at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__139_1(Object state)
at System.Threading.QueueUserWorkItemCallbackDefaultContext.Execute()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
It's an error that exist for a long time since 1.x. Usually, it has no effect on application and you can ignore it.
Hi @sakno, it was a bit tricky to figure out the snapshot index. First, I was struggling where exactly I should be waiting for the snapshot to capture its index. It cannot be PersistentState.ApplyAsync
because it is not called with the latest snapshot. It could theoretically be SnapshotBuilder.ApplyAsync
, but I should not be capturing the index of a snapshot entry but rather index of the last entry. The problem with this approach is that SnapshotBuilder.ApplyAsync
doesn't need to be called at all in case all partitions being compacted are skipped (thanks for the new AdjustIndex
). So what I ended up doing was to capture the second argument of AdjustIndex
, endIndex
. It is a bit tricky, but it works.
I was benchmarking compaction with the latest changes and it was quite slow in my specific case (a lot of partition files, all of them are skipped). In fact, it was so slow (hundreds of milliseconds) that it often caused the leader to lose its leadership because of the compaction lock being held the whole time. Almost all the compaction time was spent in RemovePartitionsAsync
. When I locally modified that method to execute the partition disposal on the background (the code after the partitions are detached) everything was way faster. After that change the compaction takes only couple of milliseconds and the leadership is stable. Would it be possible to do something about it officially?
PersistentState.ApplyAsync
is called with latest snapshot just once - at application startup (if ReplayOnInitialize
is configured). This is when you can remember it. Afterwards, you can catch it in SnapshotBuilder.ApplyAsync
and update easily.
I'm curious what exactly slow in RemovePartitionsAsync
? There are only two potentially heavy operations:
await partition.DisposeAsync().ConfigureAwait(false)
File.Delete(fileName)
It would be great if you have a chance to measure these two lines of code separately. DisposeAsync
should not be slow because all buffers at the time of the call are flushed already. File.Delete
probably yes, but usually OS just erases file metadata from the filesystem table without filling with zeroes of its content. Removing partitions out of the lock is very dangerous:
What I can do:
DisposeAsync
and File.Delete
without the lock.I think it was the DisposeAsync
that took the longest. But I will measure it again.
For my local testing I did what you describe as option 2). I understood that modifying the linked list without a lock was dangerous, but even though it worked to call DisposeAsync
and File.Delete
without the lock in my tests, I wasn't sure if it was ultimately safe or not.
DisposeAsync
takes a bit longer than File.Delete
but not much. They are both quite expensive.
This is because of tests. In the real world, some of your cluster nodes may be too far behind in terms of WAL state. It will force the leader to read from the beginning of the WAL. And this is the moment when the app can crash due to concurrent read/write from the list or even continue to work with damaged state. I think this is irreproducible in any synthetic unit/integration test as well as many situations in distributed/multi-threading scenarios, such as dead locks.
Anyway, I'm trying to implement option 2: I need to remove the head partitions in the linked list when running within the lock and detach them. Then, I can safely release the lock and continue to do heavy I/O operations with these detached partitions without worrying about potential concurrency.
One more thing: PersistentState.ApplyAsync
may receive a snapshot not only at startup. If your node too far behind of the leader then snapshot installation procedure may be triggered by it. In this case, the leader will send the snapshot instead of log entries to that node you'll get the snapshot entry in the method.
@potrusil-osi , I have moved partitions cleanup out of the lock for all compaction modes.
Do you have any additional suggestions? If no, I'll start preparing the release.
Not now. We just moved the testing to a testing cluster which might reveal some more things.
@potrusil-osi , there is some additional performance optimization has added recently: you can enable copy-on-read to reduce lock contention between writes and replication process. Replication requires read lock which cannot co-exist with write/compaction. As a result, the duration of read lock depends on network latency. It can be eliminated if we could have a copy of log entries to be replicated outside of the read lock. Now this behavior can be enabled via CopyOnReadOptions configuration property of PersistentState.Options
class and increase throughput of writes by the cost of RAM.
@potrusil-osi , final performance improvements now available. There are many options to achieve the best performance with persistent WAL:
Options.UseCaching
property is enabled. Moreover, you can control the eviction policy of the cache using Options.CacheEvictionPolicy
property.
See suggestions in #56 . Tasks:
SkipAsync
toIAsyncBinaryReader
PersistentState
in a way that allows to do the compaction in parallel with writes or readsfsync
for every write must be disabled by default (FileOptions.WriteThrough
flag)