kuujo / vertigo

Flow-based programming for the Vert.x application platform.
Apache License 2.0

Support multiple emissions per worker when replying to an Executor #17

Closed. andredasilvapinto closed 11 years ago

andredasilvapinto commented 11 years ago

I'm using an Executor to call a Worker and do something else with the reply. However, my Worker, besides emitting a response back to the Executor:

worker.emit(replyData, receivedMessage);

also needs to emit another message to a different target:

worker.emit(someOtherData);

This should happen before acknowledging the message:

worker.ack(message);

The problem is that when I emit that other message before the reply emission, the Executor that "called" the Worker is never notified of the ack, which leads to a timeout. If I remove the "someOtherData" emission, there is no timeout.

Am I doing something wrong or is this a known limitation of Vertigo?

andredasilvapinto commented 11 years ago

I'm not sure this is related to having more than one emission. Sometimes the reply works with two emissions, and sometimes it fails with just one. The failure does happen more often when I use multiple emissions in the worker.

Also, when I don't receive the reply I'm getting:

SEVERE: net.kuujo.vertigo.runtime.TimeoutException: Processing timed out.

30 seconds after the request.

I'm trying to make sense of all of this, but it has not been easy.

andredasilvapinto commented 11 years ago

Looks like DefaultOutputCollector.emitChild() is sending the fork message after publishing it (same for emitNew()). Are you sure that's what you want to do? Won't the Auditor handlers fail to find the child message in nodes (because it wasn't created yet)?

Anyway, even if this is a problem, it is not the only one: I modified the code to reverse the order of operations and I still had communication issues (e.g., reply handlers that are not triggered after replying, and consequently timeouts).

kuujo commented 11 years ago

I've refactored the auditor (again) actually. We'll have to see if this is still an issue.

I think the intended behavior for executors should indeed be to allow multiple emissions by some worker to result in multiple results to the initial executor. The problem may be that once the executor receives the first result the handler is removed?

https://github.com/kuujo/vertigo/blob/master/src/main/java/net/kuujo/vertigo/rpc/AbstractExecutor.java#L301

This is actually illustrative of the problem. When the executor's execute() method is called, the AbstractExecutor stores a result handler for the message result. The problem is that it has no way to know exactly how many results it should expect to receive since the executor doesn't inherently know anything about whatever worker it's receiving messages from. It has to remove the handler at some point, so it simply removes the handler once any result is received.

However, this should be able to be refactored to work with the intended behavior. Especially because of how the auditor was refactored (it uses some math that is much more reliable with a much smaller memory footprint), it should always be true that the calling executor receives all execution results before receiving an ack message. So, if we assume that then it's possible that we can simply remove the execution handler once the ack is received and not once the result is received. My only concern is that there could be a race condition if there is a bottleneck between the last worker and the executor but not between the auditor and the executor, but I'm not sure how real of a concern that is.

I will submit a PR and you can test it out :-)

kuujo commented 11 years ago

One more thought. I'm not sure if it is proper behavior to call the execution handler before the message is acked, which is why the current code waits for the ack before calling the handler. Perhaps the execution's results will have to be stored in a results list until the execution is acked before handlers are actually called. On the other hand, maybe it should just be a requirement that execution handlers be idempotent.

kuujo commented 11 years ago

Check out that fix. It uses the first approach of storing results in a list until the message is acked. Of course, this could have potential negative effects if the results list is massive, but I think it captures the expected behavior of the executor in only providing results if the execution is entirely successful.
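A minimal sketch of the buffering approach described above, for illustration only. It is not the code from the fix: `BufferingExecutor`, `Pending`, and the method names are hypothetical, and results are modeled as plain strings. It shows results being collected per execution and the handler being removed and called only when the ack arrives.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: none of these names are Vertigo's actual API.
class BufferingExecutor {
    // Per-execution state: buffered results plus the handler to call on ack.
    private static final class Pending {
        final List<String> results = new ArrayList<>();
        final Consumer<List<String>> handler;

        Pending(Consumer<List<String>> handler) {
            this.handler = handler;
        }
    }

    private final Map<String, Pending> pending = new HashMap<>();

    // Register a handler when an execution starts.
    void execute(String id, Consumer<List<String>> handler) {
        pending.put(id, new Pending(handler));
    }

    // A worker emitted a result: buffer it and keep the handler registered.
    void onResult(String id, String result) {
        Pending p = pending.get(id);
        if (p != null) {
            p.results.add(result);
        }
    }

    // The execution was acked: remove the handler and deliver every buffered result.
    void onAck(String id) {
        Pending p = pending.remove(id);
        if (p != null) {
            p.handler.accept(p.results);
        }
    }

    // The execution failed or timed out: drop buffered results without delivering them.
    void onFail(String id) {
        pending.remove(id);
    }

    int pendingCount() {
        return pending.size();
    }
}
```

The trade-off is the one noted above: the buffer grows with the number of results per execution, in exchange for the handler only ever seeing results from an execution that completed successfully.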

Also, maybe the executor should provide for unlimited timeouts. Currently, it always sets a reply timeout, but that may not necessarily be the desired behavior. Feeders and executors have the ability to re-emit failed messages automatically, and users may want to continue the execution until it's successfully completed (for timeouts).

On a related note, the executor timer system needs to be refactored. Currently, it starts a new timer for each execution. I don't believe in this behavior, as this can mean thousands of new timers per second. Perhaps it should take the auditor's approach of a single periodic timer that sweeps for expired executions and times them out.
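A rough sketch of the single-periodic-timer idea, under stated assumptions: `ExecutionTimeouts` and its methods are hypothetical names, not Vertigo's timer code, and the current time is passed in explicitly so the logic can be exercised without a real clock.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: not Vertigo's actual timer code.
class ExecutionTimeouts {
    // Execution ID -> absolute deadline in milliseconds.
    private final Map<String, Long> deadlines = new LinkedHashMap<>();
    private final long timeoutMillis;

    ExecutionTimeouts(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Record a deadline instead of starting a dedicated timer.
    void register(String id, long nowMillis) {
        deadlines.put(id, nowMillis + timeoutMillis);
    }

    // The execution finished in time: forget its deadline.
    void complete(String id) {
        deadlines.remove(id);
    }

    // Called from one periodic timer: remove and return every expired execution ID.
    List<String> sweep(long nowMillis) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = deadlines.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                expired.add(entry.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```

In a Vert.x component, one `vertx.setPeriodic(...)` callback could call `sweep(System.currentTimeMillis())` and fail each returned execution. Timeouts then fire up to one sweep interval late, which is the cost of not creating a timer per execution.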

In fact, feeders may need some sort of internal timer as well. I'm not sure. In theory, if an auditor failed then any feeders that registered messages in that auditor instance would never be notified of timeouts, and this is a problem. Feeders and executors could use their own timeouts with auditor-based timeouts being a fail-safe or vice versa. I just don't like the idea that timeouts only occur in auditors.

kuujo commented 11 years ago

By the way, if you're curious about how the auditor and message IDs have been changed (again) for development purposes:

kuujo commented 11 years ago

Also, hooks could be updated to reflect the API changes, with OutputCollector returning MessageId instances rather than String IDs, and each Hook type receiving MessageId as an argument rather than the String ID. The MessageId contains a lot more useful information about a message, such as where it came from, the root message in the tree, etc. Alternatively, the actual emitted/acked/failed/timed-out JsonMessage could be passed to hooks as well. The problem is, each of these would be a big API change. What do you think?

kuujo commented 11 years ago

Also:

https://github.com/kuujo/vertigo/blob/master/src/main/java/net/kuujo/vertigo/output/DefaultOutputCollector.java#L285

The way the OutputCollector builds and emits messages is done in a very specific way. I don't know if you still see the same problems as before, but it's actually just message ID/parent/root management.

When emitting a new message (without a parent), first a base message is created and then children are created for each message it emits. This is simply a result of how the auditor works. Creating child messages has the effect of giving those children the parent's ID as the root. This allows the auditor to track the messages correctly since new messages have no root and thus the auditor would not be able to track them properly. It also gives us a root ID to return to the user to use as a correlation ID.

Alternatively, when emitting a child message, the OutputCollector creates a single child of the parent and then creates a child of that child for each channel. Again, the first child simply gives us a correlation ID to return to the user, and each message emitted from the component has the returned ID as a parent. But as far as the auditor is concerned we're actually just creating children of the parent message. So, this is sort of managing both the return value for the user and create/fork messages for the auditor.
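The ID bookkeeping described in the last two paragraphs can be sketched roughly as follows. This is an illustration under assumptions, not the DefaultOutputCollector code: `SketchId`, `Emission`, and `SketchCollector` are hypothetical names, IDs are simple counters, and a "channel" is reduced to a count.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a message ID; not Vertigo's MessageId class.
class SketchId {
    private static long counter = 0;

    final String id;
    final String parent; // null for a base message
    final String root;   // null for a base message

    private SketchId(String id, String parent, String root) {
        this.id = id;
        this.parent = parent;
        this.root = root;
    }

    // A brand-new message with no parent and no root.
    static SketchId base() {
        return new SketchId("m" + (++counter), null, null);
    }

    // A child has this message as its parent. It inherits the root,
    // or uses this message's ID as the root if this message has none.
    SketchId child() {
        return new SketchId("m" + (++counter), id, root != null ? root : id);
    }
}

// What one emit call produces: the ID returned to the user plus the messages actually sent.
class Emission {
    final SketchId correlation;
    final List<SketchId> sent = new ArrayList<>();

    Emission(SketchId correlation) {
        this.correlation = correlation;
    }
}

class SketchCollector {
    // Emit without a parent: create a base message, then one child per channel.
    static Emission emitNew(int channels) {
        Emission emission = new Emission(SketchId.base());
        for (int i = 0; i < channels; i++) {
            emission.sent.add(emission.correlation.child());
        }
        return emission;
    }

    // Emit with a parent: create one child of the parent, then one child of that child per channel.
    static Emission emitChild(SketchId parent, int channels) {
        Emission emission = new Emission(parent.child());
        for (int i = 0; i < channels; i++) {
            emission.sent.add(emission.correlation.child());
        }
        return emission;
    }
}
```

In both cases every sent message has the returned correlation ID as its parent, while the root stays that of the original tree, which is what lets a tracker keyed on the root follow the whole tree.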

kuujo commented 10 years ago

Hey Andre,

I know this issue is closed, but I figured this is as good a place to ask as any. I'd like to recognize developers that contribute to Vertigo and related projects on the project page and in the Vertigo module's mod.json file (and thus the Vert.x module registry). So, I'd certainly like to add you. Is that okay with you?

Jordan

andredasilvapinto commented 10 years ago

Hi Jordan, it's perfectly fine for me, but my contribution was very very small. I'm not sure it's even worth mentioning.

P.S.: Sorry for not having replied to your comments, but there were several high priority issues I needed to work on at the company I work for (things like thread leaks in production environments and such), and now that I'm back to Vert.x we ended up using a simpler (but not so powerful) solution than Vertigo for our proof of concept.

Thanks for all your work and good luck for the EclipseCon, I saw your name there on the Vert.x day ;)

kuujo commented 10 years ago

It's all good, no apologies necessary. I actually took a break over the last month as well to study more about fault-tolerant distributed systems, and as a result I've begun work on some projects to provide genuine fault-tolerance for Vertigo using the Raft consensus algorithm: copycat

I'm glad you guys were able to find a good solution. I've actually been testing Vertigo in some smaller projects at my company for a few months now and I think with a few more core features (options for strong ordering and exactly once processing) I'll be pushing to get it into production as part of our larger Vert.x system.

Thanks for the update :-) I'll evaluate your contributions and perhaps offer credit somewhere. I thought you were very helpful, and I appreciate that.