andredasilvapinto closed this issue 11 years ago.
Yep. Actually I was just working on this when you submitted the issue :-)
I refactored the auditor to use a single timer for all message timeouts. It used to set many timers, each of which could hold a number of IDs to expire when it fired. Now a single periodic timer simply checks the beginning of the nodes map for expired nodes. Nodes are added in the order in which they're created, so it should only ever have to check a few expired nodes at the beginning of the map. This seemed a lot more efficient to me.
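Here is a minimal sketch of the single-timer idea, not the actual auditor code. It assumes every message uses the same timeout, so creation order is also expiration order, and it uses a LinkedHashMap (which iterates in insertion order) inside a hypothetical ExpiryScanner class. In the real auditor the scan would run from a Vert.x periodic timer; here the current time is passed in so the logic is easy to follow.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: expiration times keyed by message ID, kept in creation order.
class ExpiryScanner {
  // LinkedHashMap iterates in insertion order, so the oldest entries come first.
  private final Map<String, Long> expirations = new LinkedHashMap<>();

  void add(String id, long expiresAt) {
    expirations.put(id, expiresAt);
  }

  // Called when a message is acked before it expires.
  void remove(String id) {
    expirations.remove(id);
  }

  // Called from a single periodic timer. Removes and returns the IDs whose
  // expiration time is <= now, stopping at the first entry that has not expired.
  List<String> checkExpired(long now) {
    List<String> expired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = expirations.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> entry = it.next();
      if (entry.getValue() > now) {
        break; // assuming a uniform timeout, everything after this expires later
      }
      it.remove(); // removal through the iterator is safe during iteration
      expired.add(entry.getKey());
    }
    return expired;
  }

  int size() {
    return expirations.size();
  }
}
```

Because the scan stops at the first unexpired entry, each tick only touches the expired prefix of the map plus one extra entry.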
As for the exception, the feeder timeout test was reliably reproducing it, and since I made the changes I haven't seen it again. That said, I'm still not a big fan of how the auditor removes items from that map. I think the code still needs to be refactored, because there seems to be a lot of potential for these types of exceptions. Maybe we need to add an expire method on the Node so that it can handle timeouts differently from other types of failures.
Oh, just saw your comment on the commit :-)
Actually, look at the current way the auditor removes the expired node:
...and then fails the node. I think that failure ultimately results in the node attempting to remove itself from the nodes map again as well. That may not be a big deal, but it seems sloppy. The node should recognize the difference between timing out and being explicitly failed, and removing the node from the nodes map should probably not be the node's own responsibility either (that can easily be moved out to the Root node's failHandler). These things are why I get the sense that it may need a little refactoring, though the code as it stands now may actually be stable (I fixed the memory leak and the iteration issues).
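To make the expire idea concrete, here is a rough sketch under my own assumptions: the class names ExpirableNode and SketchAuditor, and the separate expireHandler, are hypothetical. The auditor removes an expired node through the iterator and then calls expire(), which reports the timeout without touching the map, while an explicit fail() goes through a handler owned by the auditor that does the removal.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical node that distinguishes a timeout from an explicit failure
// and never touches the auditor's map itself.
class ExpirableNode {
  final String id;
  final long expiresAt;
  Consumer<ExpirableNode> failHandler = n -> {};
  Consumer<ExpirableNode> expireHandler = n -> {};

  ExpirableNode(String id, long expiresAt) {
    this.id = id;
    this.expiresAt = expiresAt;
  }

  void fail() { failHandler.accept(this); }     // explicit, user-driven failure
  void expire() { expireHandler.accept(this); } // system-driven timeout
}

class SketchAuditor {
  final Map<String, ExpirableNode> nodes = new LinkedHashMap<>();
  final List<String> events = new ArrayList<>();

  ExpirableNode create(String id, long expiresAt) {
    ExpirableNode node = new ExpirableNode(id, expiresAt);
    // Removal from the map lives in the auditor's handler, not in the node.
    node.failHandler = n -> {
      nodes.remove(n.id);
      events.add("fail:" + n.id);
    };
    // The expire handler only reports; the scan below already removed the node.
    node.expireHandler = n -> events.add("timeout:" + n.id);
    nodes.put(id, node);
    return node;
  }

  void checkExpire(long now) {
    Iterator<ExpirableNode> it = nodes.values().iterator();
    while (it.hasNext()) {
      ExpirableNode node = it.next();
      if (node.expiresAt > now) break;
      it.remove();   // the only removal for an expired node
      node.expire(); // does not remove again, so there is no double removal
    }
  }
}
```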
By the way, thanks for the help :-). I'm glad people are taking an interest, and I'm going to start submitting PRs myself to get some input on new features or fixes.
Right. I saw that you removed the clearNode calls from the handlers but hadn't noticed that you added them to checkAck. Nevertheless, as you said, this won't cause a ConcurrentModificationException: since the node was already removed from the nodes map through the iterator, the second remove call finds nothing to remove and leaves the collection unchanged. Therefore, no ConcurrentModificationException will be raised. Still, I agree with what you said. There are probably better ways to implement these behaviours, but my understanding of Vertigo's internals is only partial and not broad enough for me to propose a better solution.
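A small sketch of why the double removal is harmless, using a plain LinkedHashMap in place of the auditor's nodes map (DoubleRemoveDemo is a hypothetical name). Removing a key that is no longer present is not a structural modification, so the iterator stays valid. Note that Map.remove returns null in this case, while Collection.remove(Object) would return false.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class DoubleRemoveDemo {
  // Returns whatever the second remove call returned, to show it is a no-op.
  static Object removeTwiceDuringIteration() {
    Map<String, String> nodes = new LinkedHashMap<>();
    nodes.put("a", "node-a");
    nodes.put("b", "node-b");
    Object secondResult = "unset";
    Iterator<Map.Entry<String, String>> it = nodes.entrySet().iterator();
    while (it.hasNext()) { // keeps iterating to "b" without an exception
      Map.Entry<String, String> e = it.next();
      if (e.getKey().equals("a")) {
        it.remove();                      // first removal, through the iterator
        secondResult = nodes.remove("a"); // like a later clearNode call: key is gone
      }
    }
    return secondResult;
  }
}
```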
Thanks for your work.
I'm going to come up with something tonight :-)
Perhaps I should explain the internals of acking in Vertigo, and of the auditor in particular. I've provided some descriptions in the documentation, but more information could be useful to future contributors anyway. Plus, I'm certainly open to ideas for improving the process.
I'm writing this as a general-purpose description, so it may contain a lot of obvious details.
Obviously, when a network is deployed, one or more auditor instances are deployed with it. Auditors can be deployed on any Vert.x instance in a cluster; the coordinators do not specifically deploy them in the current Vert.x instance. (On a side note, in the future it may be prudent to better coordinate where modules/verticles and auditors are deployed in a cluster. Via certainly has the capability to assign deployments in this manner. This could allow network deployments to be arranged so as to maximize performance in some cases by reducing inter-node communication.)
The auditor's task is to track messages from creation to completion throughout an entire network. Each message emitted from any component can be emitted either as a new message or as the child of another message (child messages are created by defining relationships when messages are emitted from a component: https://github.com/kuujo/vertigo/blob/master/src/main/java/net/kuujo/vertigo/output/OutputCollector.java#L100). When a new message is created, the OutputCollector first creates a single message. Then, for each Connection to which the message is written (each with its own unique event bus address), the message is copied and thus receives another unique ID. Once the message has been written to the appropriate addresses, the OutputCollector notifies the auditor by sending it a create message.
The create message has the following keys:
id - the base message ID. This is the initial message created by the OutputCollector.
forks - an array of child message IDs, one for each Connection to which the message was sent.
Note that each message tree is assigned an auditor from the network. So, if a child message is created, it always inherits the parent's auditor address. This ensures that the same auditor always receives messages regarding the same message tree.
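A rough sketch of the OutputCollector side of this step, assuming messages are plain objects and the create message is a Map. Only the id and forks keys come from the description above; the action key, the Message class, and UUID-based IDs are assumptions. Note that copy() carries the auditor address along, which is how every message in a tree keeps reporting to the same auditor.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

class CreateMessageSketch {
  // Hypothetical message type; not Vertigo's JsonMessage.
  static final class Message {
    final String id;
    final String auditor; // inherited by every copy and descendant

    Message(String id, String auditor) {
      this.id = id;
      this.auditor = auditor;
    }

    // A copy gets a fresh unique ID but keeps the same auditor address.
    Message copy() {
      return new Message(UUID.randomUUID().toString(), auditor);
    }
  }

  // Copies the base message once per connection and builds the create notification.
  static Map<String, Object> buildCreate(Message base, int connections) {
    List<String> forks = new ArrayList<>();
    for (int i = 0; i < connections; i++) {
      forks.add(base.copy().id); // one uniquely identified copy per Connection
    }
    Map<String, Object> create = new LinkedHashMap<>();
    create.put("action", "create"); // assumed key, for illustration only
    create.put("id", base.id);
    create.put("forks", forks);
    return create;
  }
}
```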
When the auditor receives the create message, it creates a new internal Root instance. The root represents the root node of a message tree. Each Node instance can be assigned an ackHandler and a failHandler. Obviously, if the node is acked the ackHandler is called, and if the node is failed the failHandler is called (including for timeouts). In the case of a Root instance, the auditor assigns an external ackHandler and failHandler to the Root, which will publish() a message notifying any interested parties that the message was acked or failed, respectively. Also, when the new Root is created, it is given an expiration time. This is the actual time at which the message expires. The periodic timer I mentioned earlier iterates over the nodes map and calls fail() on any nodes whose expiration time is <= System.currentTimeMillis(). Previously, a new timer was created every 1/10 second, and nodes created within that 1/10 second were assigned to the current timer. When the timer fired, it would fail any nodes assigned to it that had not yet been acked.
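A minimal sketch of the Root described above, assuming the publish() call can be modeled as a Consumer<String> and that a root ignores further events once it has been acked or failed. RootSketch and its methods are hypothetical names, not Vertigo's classes, and the time is passed in explicitly instead of reading System.currentTimeMillis().

```java
import java.util.function.Consumer;

class RootSketch {
  final String id;
  final long expiresAt; // absolute time at which the message expires
  Runnable ackHandler = () -> {};
  Runnable failHandler = () -> {};
  private boolean done;

  RootSketch(String id, long expiresAt) {
    this.id = id;
    this.expiresAt = expiresAt;
  }

  // Acking or failing is terminal: only the first outcome is reported.
  void ack() { if (!done) { done = true; ackHandler.run(); } }
  void fail() { if (!done) { done = true; failHandler.run(); } }

  // The auditor wires external handlers that publish the outcome.
  static RootSketch create(String id, long timeoutMillis, long now, Consumer<String> publish) {
    RootSketch root = new RootSketch(id, now + timeoutMillis);
    root.ackHandler = () -> publish.accept("ack:" + id);
    root.failHandler = () -> publish.accept("fail:" + id);
    return root;
  }

  // What the periodic timer does for each root it visits.
  boolean expireIfDue(long now) {
    if (expiresAt <= now) {
      fail();
      return true;
    }
    return false;
  }
}
```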
The second command to the auditor is the fork message. The fork message informs the auditor of a new relationship between a parent node and any number of children. When the OutputCollector emits a child message, it notifies the appropriate auditor of the relationship with a message containing the following keys:
parent - the parent message ID
forks - an array of child message IDs
The auditor uses the parent ID to look up the parent Node instance. Then, for each new child node, it calls the addChild() method on the parent Node instance. It is up to the Node to add ackHandler and failHandler handlers to the new children.
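A sketch of fork handling, assuming the auditor keeps every node in a map keyed by message ID. ForkSketch and its nested Node are hypothetical, and silently ignoring a fork whose parent is already gone (acked, failed, or expired) is my assumption about sensible behavior, not necessarily what the auditor does. Wiring the ack and fail handlers onto each child is left out here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ForkSketch {
  static final class Node {
    final String id;
    final List<Node> children = new ArrayList<>();

    Node(String id) {
      this.id = id;
    }

    // The parent creates and registers its own child; in the real auditor this
    // is also where the child's handlers would be attached.
    Node addChild(String childId) {
      Node child = new Node(childId);
      children.add(child);
      return child;
    }
  }

  final Map<String, Node> nodes = new HashMap<>();

  // Handles a fork message with "parent" and "forks" values.
  void handleFork(String parentId, List<String> forks) {
    Node parent = nodes.get(parentId);
    if (parent == null) {
      return; // assumption: the parent already completed, failed, or expired
    }
    for (String childId : forks) {
      nodes.put(childId, parent.addChild(childId));
    }
  }
}
```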
All this leads us to the meat of the algorithm: ack and fail messages. I think the most important thing to note about how the ack/fail mechanisms work is the order in which events must occur for a message to be considered processed. This is demonstrated in the node ackHandler and failHandler handlers. Each node contains two important booleans, ready and acked. Before a node can call the ackHandler that was set on it, both of these booleans must be true. However, if a node is ever failed, it immediately calls the failHandler set on it. The ready boolean is set once all the children of a node have triggered their ackHandler handlers.
Here the node checks that all of its children have been acked simply by comparing a list of acked children with a list of all children. Once that condition evaluates to true, the node considers itself ready to be acked. But the node may already have been acked, so the checkAck() method determines whether the node can be considered completely processed (ready and acked). Similarly, when the node is acked via an ack message, it checks whether it can be considered completely processed. In this manner, a message has to meet two conditions in order to be considered completely processed: it must have been acked itself (acked), and all of its children must have been completely processed (ready).
The end result is that the message tree will be entirely constructed and then acked from the bottom up. All bottom-level nodes are acked, then their parents are notified, then their parents, and so on all the way up to the Root instance. Alternatively, if any node is failed, then its parent is immediately failed, and its parent, and so on all the way up to the Root instance. Then, the root calls its ackHandler or failHandler, which publishes the appropriate message.
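A compact, self-contained sketch of how completion and failure travel up a three-level tree. TreeNode is hypothetical, and the root's handlers stand in for the external handlers that publish the result. Completion moves up one level each time a node is both acked and has all of its children completed; a failure is passed to each ancestor immediately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TreeNode {
  private final List<TreeNode> children = new ArrayList<>();
  private int completedChildren;
  private boolean acked, done;
  Runnable onComplete = () -> {};
  Runnable onFail = () -> {};

  // Creates a child whose completion or failure is reported to this node.
  TreeNode child() {
    TreeNode c = new TreeNode();
    children.add(c);
    c.onComplete = () -> { completedChildren++; check(); };
    c.onFail = this::fail;
    return c;
  }

  void ack() {
    acked = true;
    check();
  }

  void fail() {
    if (!done) {
      done = true;
      onFail.run(); // passes the failure straight to the parent
    }
  }

  private void check() {
    if (!done && acked && completedChildren == children.size()) {
      done = true;
      onComplete.run(); // notifies the parent, which re-checks itself
    }
  }

  // The root's handlers publish the final outcome.
  static TreeNode root(String id, Consumer<String> publish) {
    TreeNode r = new TreeNode();
    r.onComplete = () -> publish.accept("ack:" + id);
    r.onFail = () -> publish.accept("fail:" + id);
    return r;
  }
}
```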
Finally, there is one more feature of the auditor (which, coincidentally, I believe needs some refactoring as well). The ack/fail/notification process depends in some ways on events occurring in a specific order. For instance, if a message is acked before any child messages are emitted, the auditor could consider the message completed and thus begin the chain of ack notifications. Alternatively, some users could require that a message be held in memory for some period of time before a child is created. Ideally, users should always ack a message only once they've finished creating its children. But in order to accommodate any timing differences that may be required, the auditor also provides a delay mechanism. The delay is simply the period of time (which defaults to 0) the auditor will wait after a message is acked to give components an opportunity to create additional child messages. If a child is created during this period, the node will not be considered ready and will instead wait for the new child messages to be completed.
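A sketch of the delay window, driven by an explicit clock instead of a timer so that the logic is easy to follow. DelayedAckNode and poll() are hypothetical names, and counting outstanding children is a simplification of the acked-children comparison described earlier.

```java
class DelayedAckNode {
  private final long delay; // milliseconds; 0 means complete as soon as possible
  private long ackedAt = -1; // -1 means not acked yet
  private int pendingChildren;
  private boolean completed;

  DelayedAckNode(long delay) {
    this.delay = delay;
  }

  void ack(long now) {
    ackedAt = now;
  }

  // A child created during the delay window keeps the node from being ready.
  void childCreated() {
    pendingChildren++;
  }

  void childCompleted() {
    pendingChildren--;
  }

  // Polled periodically: the node completes only after the delay has elapsed
  // and every child created in the meantime has completed.
  boolean poll(long now) {
    if (!completed && ackedAt >= 0 && now >= ackedAt + delay && pendingChildren == 0) {
      completed = true;
    }
    return completed;
  }
}
```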
I think one of the primary goals with the auditor is to ensure that it can reliably track complex message trees without knowledge of network structures. This is because Vertigo is intentionally designed to support very flexible network structures. For instance, nested networks can be created by deploying two networks, one with a component which receives input from a component in the other network. Also, users can arbitrarily tap into network components with listeners (https://github.com/kuujo/vertigo/blob/master/src/test/java/net/kuujo/vertigo/test/integration/ListenerTest.java). Even with these types of features, the acking system should continue to work, and the current structure of the auditor allows that to happen. Once a message tree is assigned an auditor, it will continue to use that auditor regardless of whether the tree is even in the original network anymore. So, if a message is created by network1 and its descendants eventually make their way into network2, the same auditor will continue to track the completion of the message, and thus acking will continue to behave as expected even though the messages are in a different network.
In fact, the way components communicate in general is modeled around the flexibility of network structures (see "The API has changed!" for more info), which is why different networks can receive input from each other in the first place. The Vertigo communication system essentially behaves like a complex publish/subscribe system, with destination components publishing heartbeats to source components rather than source components holding a fixed list of destination addresses. In this way, network definitions are essentially just used for setup purposes, and network structures can theoretically change while running (in fact, this is another feature that has been suggested by some people).
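A rough sketch of the source side of heartbeat-based routing: the source keeps no fixed list of destinations, only the addresses it has heard from recently. HeartbeatRegistry, the address strings, and the timeout value are assumptions for illustration, not Vertigo's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HeartbeatRegistry {
  private final long timeoutMillis;
  private final Map<String, Long> lastSeen = new HashMap<>();

  HeartbeatRegistry(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  // Called whenever a destination component publishes a heartbeat.
  void heartbeat(String address, long now) {
    lastSeen.put(address, now);
  }

  // Current destinations: every address heard from within the timeout.
  // Stale addresses are dropped, so the set can change while the network runs.
  List<String> destinations(long now) {
    lastSeen.values().removeIf(seen -> now - seen > timeoutMillis);
    List<String> result = new ArrayList<>(lastSeen.keySet());
    Collections.sort(result); // sorted only to make the output deterministic
    return result;
  }
}
```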
I think I came up with a pretty good solution for the auditor, but I'm open to criticism.
I've added an additional mechanism to separate timed-out roots from failed nodes. Essentially, the auditor maintains two maps, a roots map and a nodes map. Neither map ever contains objects from the other: Root instances are stored in roots and Node instances in nodes. So, when the auditor checks for expired Root instances, it has a much shorter map to iterate, since it doesn't have to iterate over additional nodes for which expiration is irrelevant. Additionally, the delay mechanism can be refactored to behave similarly to the expire mechanism, except that it would iterate over the nodes map.
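A sketch of the two-map layout, with hypothetical Root and Node classes. To keep it short, each Node records its root ID directly (the real nodes reference their parents), and dropping a timed-out root's descendants in one removeIf pass after the scan is my assumption about cleanup.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TwoMapAuditor {
  static final class Root {
    final String id;
    final long expiresAt;
    Root(String id, long expiresAt) { this.id = id; this.expiresAt = expiresAt; }
  }

  static final class Node {
    final String id;
    final String rootId; // simplification: real nodes point at their parent
    Node(String id, String rootId) { this.id = id; this.rootId = rootId; }
  }

  final Map<String, Root> roots = new LinkedHashMap<>(); // creation order
  final Map<String, Node> nodes = new LinkedHashMap<>(); // never scanned for expiry

  void create(String id, long expiresAt) {
    roots.put(id, new Root(id, expiresAt));
  }

  void fork(String rootId, String childId) {
    nodes.put(childId, new Node(childId, rootId));
  }

  // Scans only the roots map; returns the IDs of roots that timed out.
  List<String> checkExpire(long now) {
    List<String> timedOut = new ArrayList<>();
    Iterator<Root> it = roots.values().iterator();
    while (it.hasNext()) {
      Root root = it.next();
      if (root.expiresAt > now) break;
      it.remove();
      timedOut.add(root.id);
    }
    // removeIf removes through an internal iterator, so it is also safe.
    nodes.values().removeIf(n -> timedOut.contains(n.rootId));
    return timedOut;
  }
}
```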
When a Root times out, the auditor now publishes a new timeout message. The timeout message will be handled differently from current fail messages. Essentially, fail will act as a user-created failure (user-defined feedback), and timeout will act as a system-created failure. This can be very useful, as users can now distinguish between the two: timed-out messages can be re-emitted, and failed messages can be handled in other ways. So, failures can be used to provide feedback between components.
I think this makes the failure mechanism much more useful, and it allows users to enable autoRetry on feeders without having to worry about user-failed messages being repeatedly re-sent and re-failed.
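A sketch of how a feeder could treat the three outcomes once timeout is separate from fail. The Outcome enum and the RetryingFeeder class are hypothetical, not Vertigo's feeder API; the point is only that autoRetry reacts to TIMEOUT and leaves user-defined FAIL results to application code.

```java
import java.util.ArrayList;
import java.util.List;

class RetryingFeeder {
  enum Outcome { ACK, FAIL, TIMEOUT }

  private final boolean autoRetry;
  final List<String> emitted = new ArrayList<>();
  final List<String> failedByUser = new ArrayList<>();

  RetryingFeeder(boolean autoRetry) {
    this.autoRetry = autoRetry;
  }

  void emit(String body) {
    emitted.add(body); // stand-in for actually sending the message
  }

  void onResult(String body, Outcome outcome) {
    switch (outcome) {
      case ACK:
        break; // fully processed, nothing to do
      case TIMEOUT:
        if (autoRetry) emit(body); // system failure: safe to try again
        break;
      case FAIL:
        failedByUser.add(body); // user feedback: never resent automatically
        break;
    }
  }
}
```

Retries in this sketch are unbounded; a real feeder would probably want a retry limit.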
Moved this to #8 since the original issue has been fixed.
This last solution looks coherent and logical to me. Both the differentiation between timeouts and application errors and the performance optimization of splitting the node maps are nice improvements to the overall quality of the Auditor code.
Thanks for your work and the thorough description above. I still need to get a more global understanding of the way the different pieces work together, but your explanation certainly helped. I think I will be using it as a quick reference whenever I have doubts about that part of the code.
Nodes that timed out are being removed from the same collection that is being iterated over in a for-each loop (the iteration happens in checkExpire, the removal in clearNode). Doing it this way causes a ConcurrentModificationException ( http://docs.oracle.com/javase/7/docs/api/java/util/ConcurrentModificationException.html ). The following stack trace shows the situation:
I've committed a possible fix for this in my branch, but it is just a workaround. There are better solutions that could be implemented (that's why I'm not opening a pull request). One possible solution would be to use an explicit Iterator and remove the element through the iterator itself (e.g.: http://stackoverflow.com/questions/10218883/best-way-to-prevent-concurrent-modification-exception ). Anyway, you can see my workaround here: https://github.com/andredasilvapinto/vertigo/commit/03ccf95ea0be21c6510b71798cffaac8d5da1159
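A small reproduction of the problem and of the iterator-based fix, using a LinkedHashMap of node IDs to expiration times as a stand-in for the auditor's map (ExpireLoop is a hypothetical name). ConcurrentModificationException is only thrown on a best-effort basis, but with this data the for-each version hits it on the iteration after the first removal.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class ExpireLoop {
  static Map<String, Long> sample() {
    Map<String, Long> nodes = new LinkedHashMap<>();
    nodes.put("a", 10L);
    nodes.put("b", 20L);
    nodes.put("c", 30L);
    return nodes;
  }

  // Broken pattern: the map is structurally modified behind the for-each iterator.
  static boolean forEachRemoveThrows(long now) {
    Map<String, Long> nodes = sample();
    try {
      for (Map.Entry<String, Long> e : nodes.entrySet()) {
        if (e.getValue() <= now) {
          nodes.remove(e.getKey()); // like checkExpire calling clearNode()
        }
      }
      return false;
    } catch (ConcurrentModificationException ex) {
      return true;
    }
  }

  // Fixed pattern: remove through the iterator that is doing the traversal.
  static Map<String, Long> iteratorRemove(long now) {
    Map<String, Long> nodes = sample();
    Iterator<Map.Entry<String, Long>> it = nodes.entrySet().iterator();
    while (it.hasNext()) {
      if (it.next().getValue() <= now) {
        it.remove();
      }
    }
    return nodes;
  }
}
```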