Closed yvesgoeleven closed 7 years ago
Hmm, the storage makes sure only one of them can go ahead and dispatch any given TM?
@yvesgoeleven still relevant?
@andreasohlund Yes
It becomes even more relevant now that we're replacing the distributor (which used a single TM for all workers) with client-side distribution: we're running multiple competing TMs, and the way the TM queries the database is not really optimized for competing consumers (although it should not cause any errors).
This means we're either losing timeouts on scale-in (cloud PaaS scenario) or dispatching the same timeouts multiple times if transactions are not available. Isn't that bad @andreasohlund @SzymonPobiega ?
Hmmm... The current timeout manager favors duplicates over lost messages. It means that in non-DTC scenarios, timeout messages might get duplicated. When DTC is on, we handle that (thanks to @timbussmann )
@SzymonPobiega doubt this is entirely up to TM to decide. In Azure PaaS scenario, if you have a scaled out endpoint that had a timeout and endpoint was scaled in, there's a good chance that timeout will not get invoked.
Why is that @SeanFeldman ? Is that a problem in the Azure persistence or a design flaw in the core?
BTW, @janovesk found that Consul can provide a reliable master election for any services that use Consul for service discovery.
When master election was taken out of the core, an alternative was implemented in the Azure timeout persister, since we don't have transactions to rely on like the other persisters do. As a result, when an endpoint instance that has created a timeout is scaled in, that timeout won't get executed. So it's a combination of Azure and core deficiencies @SzymonPobiega.
@SeanFeldman What does "scaled in" mean, stopping an instance to decrease the number of active instances?
@ramonsmits yes. The opposite of scaling out.
I am not sure I follow. If I understand correctly, the TimeoutPersister queries a table whose name is specified in the config, so all the instances query the same table. Or am I missing something, and each instance gets its own queue, and that's why when an instance is gone, so are its timeouts?
We could use leases as a master election algo. There's a problem when an operation is long-lasting (some pause, like GC or access to a remote resource), but we could assume a long enough period. That would mean that when the master role is killed we need to wait. Besides unplanned shutdowns, we can react to a role stepping down and remove the lease.
@SzymonPobiega If we considered Consul, it could provide so much more (configuration, etc.)
Still not following why scale-in would cause timeout loss. There will still be some remaining instances which run TMs and will keep processing the timeouts that are in the store.
As for master election, it would improve scale-out scenarios in terms of resource usage, but it is quite a complex problem to solve. Also, since it is a matter of efficiency rather than correctness, a weak master election would be enough: the algo ensures there is at least one master present (ideally one), but there might be times when there is more than one. A strong master election (at most one node acting as master) would be overkill here (despite the fact that I would love to take a stab at implementing Paxos)
+1 @SzymonPobiega I see no timeout losses either. It's independent.
One master - no collisions (in terms of transactions, locks etc.); many masters - possible collisions when dispatching timeouts. Whether that matters depends on a value I don't know: the average/mean number of timeouts being dispatched per second. (You know that Lamport and only a few other guys did it correctly; use Raft maybe, if needed?)
@SzymonPobiega I was wrong, no lost timeouts, but duplicates indeed. Lease based solution was in place in the past @Scooletz if not mistaken. /cc @yvesgoeleven
@SeanFeldman OK. Aren't users already prepared for duplicates when running on Azure, where there are no transactions?
Duplicate messages yes. Don't think they are expecting duplicate timeouts. Also, I'm oblivious to why this was removed in the past.
@SeanFeldman but duplicate timeouts can happen for any non-DTC transport even without scaling out (TryRemove might fail for some reason, and the dispatch message is going to be retried, which will result in a duplicate).
With regards to removing it, that must have happened before I joined. Do you know where I could find the version of the TM that still had master election? BTW, I believe some form of master election and cluster self-awareness would be beneficial not only for the TM but also for other features.
@SzymonPobiega pre-dates me as well. @yvesgoeleven do you recall when it was used?
It must date back to the time that azure persister was still part of the core, v2.X I guess... very long time ago
@mikeminutillo has an awesome idea for the master election using only a shared queue for signalling.
@SzymonPobiega @mikeminutillo care to explain? Some kind of heartbeat system that points to the currently active master?
@yvesgoeleven @mikeminutillo found the SWIM protocol specification. It seems like it can be used for weakly-consistent leader election (we want a single leader for performance reasons, but with regards to correctness we tolerate two leaders)
I was thinking about a simple approach where each instance sends a message to the queue with a randomly generated number. The one with the highest number wins. If an instance receives somebody else's message, it includes that other instance in the message it sends to the queue. After a while everybody should converge on the same view of the cluster.
@SzymonPobiega got it, that's actually very similar to how I do hosting distribution in MessageHandler's cloudservices' fabric, just used the other way around.
Here's an example of my algorithm with 4 instances of an endpoint A, B, C and D. Initially they know only about their ranks and hence each one considers itself to be a master
A - A:10 - master
B - B:7 - master
C - C:2 - master
D - D:5 - master
Each of them sends a message to the shared queue. Because it is equally likely for each of them to pick up a message from that queue, by sheer luck the messages get to:
A -> A
B -> C
C -> B
D -> C
After they are processed (1st round), here's what the nodes know (C knows it should not be the master)
A - A:10 - master
B - B:7, C:2 - master
C - C:2, B:7 - slave
D - D:5 - master
Now, randomly, the messages are sent to:
A -> D
B -> A
C -> A
D -> B
And after this round, they all know a bit more....
A - A:10, B:7, C:2 - master
B - B:7, C:2, D:5 - master
C - C:2, B:7
D - A:10, D:5
And they send what they know again...
A -> B
B -> D
C -> D
D -> B
This time, although not yet everybody knows everything, they know who's the boss
A - A:10, B:7, C:2 - master
B - B:7, C:2, D:5, A:10
C - C:2, B:7
D - B:7, C:2, D:5, A:10
I don't know yet what would be the expected time (number of rounds) required to converge. I guess I am too stupid to calculate that analytically, but I bet I can do some simulations.
BTW, as @mikeminutillo pointed out, the same algorithm can be used at the system scale to build the routing table.
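For what it's worth, the rounds above are easy to simulate. Here's a rough sketch of the algorithm as described (the names, structure, and round count are mine, not from any actual codebase):

```python
import random

def simulate(ranks, rounds=20, seed=42):
    """Simulate the shared-queue gossip: each round, every node sends its
    known view to the queue, and each message is picked up by a random
    competing consumer, which merges it into its own view."""
    rng = random.Random(seed)
    # each node starts out knowing only its own rank
    views = {node: {node: rank} for node, rank in ranks.items()}
    nodes = list(ranks)
    for _ in range(rounds):
        # every node sends a message containing everything it knows so far
        messages = [dict(views[sender]) for sender in nodes]
        # each message is consumed by a randomly chosen receiver
        for msg in messages:
            receiver = rng.choice(nodes)
            views[receiver].update(msg)
    # a node considers itself master if it knows no rank higher than its own
    return {n: max(v.values()) == ranks[n] for n, v in views.items()}

masters = simulate({"A": 10, "B": 7, "C": 2, "D": 5})
print(masters)
```

Note that A always believes it is the master (no higher rank exists for it to learn about), so the interesting question is how quickly the other nodes find out about A:10.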
The routing case is interesting because each node is the source of truth for its own data. No conflicts in the cluster. I do have a SWIM cluster that runs on top of MSMQ satellites that I want to try and build config-free routing on.
To achieve convergence faster @SzymonPobiega you'd have each node read a message, update its internal state, and write the whole state to the new message (but only if it changed). In a small cluster the data is small. When a new node joins, it adds a new message to the queue, which triggers the cluster off again.
Master election would only be relevant for MSMQ and RabbitMQ, right?
Since ASB isn't affected, and ASQ and SQL should have a native timeoutmanager instead, those wouldn't have this issue?
@andreasohlund it applies to all; ASB only does native deferral, not native timeout scheduling. There is a satellite in each instance of an endpoint, sending a copy of the scheduled timeout. Only 1 of them should send.
@mikeminutillo's comment https://github.com/Particular/NServiceBus/issues/2602#issuecomment-242461000 is extremely valuable, as this operation is commutative. It makes your proposal @SzymonPobiega a state-based CRDT.
Instead of ranks I would propose Guids and lexical ordering maybe? What do you think @SzymonPobiega
ASB only does native deferral, not native timeout scheduling
Not following, are you saying that ASB is using the TM?
@andreasohlund I'm mixing things up, sorry; it would occur with scheduling
But the NSB scheduling support is using the transport-provided IDeferMessages impl, so that should be fine as well?
@andreasohlund correct about IDeferMessages
@andreasohlund what do you mean by
ASQ and SQL should have a native timeoutmanager instead
in https://github.com/Particular/NServiceBus/issues/2602#issuecomment-242646264
There's no native capability for ASQ. Did you mean "to be implemented within the transport"?
Although it's a fancy solution, I wonder how it works when the master stops or when nodes enlist elastically. It seems this requires knowing which nodes there are; all nodes need to be cluster aware.
Why not just an atomic write for a lease that uses optimistic concurrency control?
All nodes read the item and check whether the timestamp is expired; if so, they set the lease duration and write the lease data. If the OCC write succeeds, you are the master. The master can extend the lease while it owns it, and at a graceful shutdown it can expire itself.
This only requires that clocks are in sync. It can be implemented in a few seconds and is supported by almost all storage providers, be it relational, document, blob, or for that fact even a file system.
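To make the idea concrete, here's a minimal sketch of that read-check-write cycle against an in-memory stand-in for the storage (the ETag-style compare-and-swap stands in for whatever optimistic concurrency mechanism the real storage provides; all names are made up):

```python
import threading
import time

class LeaseStore:
    """In-memory stand-in for a storage row/blob that supports optimistic
    concurrency: a write only succeeds if the ETag hasn't changed since read."""
    def __init__(self):
        self._lock = threading.Lock()
        self._etag = 0
        self._data = {"owner": None, "expires": 0.0}

    def read(self):
        with self._lock:
            return self._etag, dict(self._data)

    def try_write(self, expected_etag, data):
        with self._lock:
            if self._etag != expected_etag:
                return False  # somebody else wrote first; we lost the race
            self._etag += 1
            self._data = dict(data)
            return True

def try_become_master(store, me, lease_seconds=15.0, now=None):
    """One election attempt: read the lease, and if it's expired (or we
    already own it), try to (re)write it atomically."""
    now = time.time() if now is None else now
    etag, lease = store.read()
    if lease["owner"] != me and lease["expires"] > now:
        return False  # someone else holds a live lease
    return store.try_write(etag, {"owner": me, "expires": now + lease_seconds})

store = LeaseStore()
print(try_become_master(store, "node-1", now=100.0))  # True: acquires the lease
print(try_become_master(store, "node-2", now=105.0))  # False: lease still live
print(try_become_master(store, "node-2", now=120.0))  # True: expired, takes over
```

If two nodes race past the expiry check at the same time, only one `try_write` succeeds, which is exactly the "at least one master, ideally one" guarantee discussed above.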
@andreasohlund what do you mean by ASQ and SQL should have a native timeoutmanager instead in #2602 (comment)
A native SQL persister would have all instances connect to the same SQL Server, so it wouldn't have this issue.
And same would go for ASQ? (all instances would pull from the same storage account)
@andreasohlund for MSMQ and RabbitMQ, a single endpoint running SQL transport could be a really good TM replacement. We would only have to bundle it somehow so that it is easy to install.
@SzymonPobiega Nice algorithm, but...
If, for example, node A is really, really, awfully slow and nodes B, C & D are extremely fast (compared to A), then A might never get a single message. It will always think it is the master node, without ever learning about the others. Highly unlikely, but possible nonetheless.
I like the lease idea better. Shared storage, and the first to start up writes to that shared storage with a lease-expiration datetime. Others coming in try to do the same, but will fail and are not the master. The master keeps updating the lease-expiration datetime and the others keep polling it. The first to detect that the lease expired updates the row to become the master.
@dvdstelt I agree that shared storage based approach is way easier. Should we have something like NSB clustering package that encapsulates things like cluster node discovery and leader election? Then other higher-level features could take advantage of this.
Should we have something like NSB clustering package that encapsulates things like cluster node discovery and leader election
💯 I see it as a stack:
Node Discovery
Node Health
Data Exchange
Leader Election
The first 2 are covered by a heartbeating protocol (Heartbeats in SC) or a membership protocol (a la SWIM).
Once you have those Data Exchange can be handled via a centralized push (like SC) or an Infection Protocol (like SWIM)
For Leader Election you end up with something like RAFT.
@mikeminutillo Although I'd love to implement RAFT, as far as I understand replicated state machine algorithms, using RAFT just for leader election can be overkill. There are easier-to-implement algorithms that provide at most one running node.
Given that it looks like we've got viable approaches for native deferral for all transports other than MSMQ, the Timeout Manager could become an MSMQ-only thing.
In that scenario, do we still need something like this to coordinate timeout managers?
Agree with @bording , we should spend our efforts on native TM implementations instead. Closing this
When running multiple instances of an endpoint, multiple embedded timeout managers are running as well. This will cause timeouts to trigger on all of these at the same time, resulting in multiple messages being sent.
We should add a concept to the core that elects the master timeout manager and only sends from that one, and probably add specific implementations for certain environments (in Azure one would typically use blob storage leases to implement this)
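The gist of "only send from the elected one" could look something like the sketch below. The lease client here is a hypothetical in-memory stand-in whose acquire/renew/release shape mirrors how blob-style leases behave; it is not a real Azure API:

```python
class FakeLease:
    """Hypothetical stand-in for a lease client (blob leases expose a
    similar acquire/renew/release shape); a real one would talk to storage."""
    def __init__(self):
        self.owner = None

    def acquire(self, node):
        if self.owner not in (None, node):
            raise RuntimeError("lease already held")
        self.owner = node

    def renew(self, node):
        if self.owner != node:
            raise RuntimeError("lease lost")

    def release(self, node):
        if self.owner == node:
            self.owner = None

def timeout_manager_step(lease, node, dispatch_due_timeouts):
    """One iteration of a TM loop: only the lease holder dispatches."""
    try:
        lease.acquire(node)  # no-op if we already hold the lease
        lease.renew(node)    # keep the lease alive while we work
    except RuntimeError:
        return False         # someone else is the master TM; stay passive
    dispatch_due_timeouts()
    return True

lease = FakeLease()
dispatched = []
print(timeout_manager_step(lease, "instance-1", lambda: dispatched.append(1)))  # True
print(timeout_manager_step(lease, "instance-2", lambda: dispatched.append(2)))  # False
```

Passive instances keep attempting the acquire on every iteration, so when the master releases the lease on graceful shutdown (or lets it expire), another instance takes over and dispatching continues.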