kuujo / vertigo

Flow-based programming for the Vert.x application platform.
Apache License 2.0

executor -> scatter/gather aggregator? #27

Open ghost opened 10 years ago

ghost commented 10 years ago

Especially given Executor already aggregates multiple results internally since #17, how about a variant that batches all those results together and passes them to a handler instead of repeatedly calling the result handler? Otherwise it's hard to tell when you've collected the last result arising from a given source message (or is it? vertx and vertigo newbie here...) even though that is effectively already known.

It seems like it would be very handy for a simple scatter/gather aggregation use case. I'm not sure whether it should be a whole separate component type ("Aggregator" or whatever) given it's so very similar to the existing Executor, but otoh the type signature of the handler may have to be slightly different to take a collection of messages (although I suppose Java also allows overloaded methods...).
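
For concreteness, something like this is what I have in mind - a sketch of a hypothetical aggregated signature, not the existing API:

// Hypothetical aggregated variant: the result handler fires once, with every
// result descended from the emitted message, instead of once per result.
executor.execute(new JsonObject().putString("foo", "bar"),
    new Handler<AsyncResult<Collection<JsonMessage>>>() {
      public void handle(AsyncResult<Collection<JsonMessage>> result) {
        if (result.succeeded()) {
          // At this point all results for this source message are known to be in.
          for (JsonMessage message : result.result()) {
            // process each aggregated result
          }
        }
      }
    });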

kuujo commented 10 years ago

I completely agree that executors should aggregate messages into a collection before the result handler is called. This is certainly feasible too.

I've been thinking a lot about the executor, and truthfully, I'm not really too fond of it. For one thing, I think it would be ideal to be able to deploy a network and then perform a remote procedure call on the network from the context from which it was deployed. This is probably the ideal behavior in most situations. For instance, I might want to make a REST server that performs a distributed RPC on a Vertigo network. I would want to create the HTTP server and deploy the network from the same context and then be able to perform the RPC from that context. Currently, the only way to accomplish this is by creating an HTTP server within an executor component, but I'm not sure this is ideal.

So, I'm considering whether Vertigo should just provide utilities for collecting and aggregating messages or perhaps provide aggregation as a separate component as you mentioned.

The way I see it there are a few options:

1. Make the Executor (or a renamed Aggregator) collect results into a collection before calling the result handler.
2. Provide utilities for collecting and aggregating messages rather than a dedicated component type.
3. Provide a separate API for performing RPC on a network from outside of it, e.g. from the context in which it was deployed.

The only reason I am hesitant to deprecate and remove the Executor component type altogether is the simplicity of the API. That said, I think the network configuration requirements for executors are a significant drawback. Also, I think the executor leaves a lot to be desired for the reason you mentioned (you don't know when the call is complete).

Since the Executor receives feedback on when a message tree has been completely processed, it's certainly feasible to aggregate the results and pass a Collection to the result handler. I think adding a Collection to the interface would break the API, but perhaps this could be an opportunity to rename the poorly named Executor component at a minimum.

I honestly think I would prefer to maintain only two types of components - feeders and workers - and implement operations on top of those types (splitters, aggregators, filters, etc). A while back I actually experimented with some new component implementations for splitters, aggregators, et al., but I decided to stop development of a higher level API until some more structural changes have been completed. Specifically, I would like to provide options for strong ordering and exactly-once processing. Once those features are complete I would like to see higher level APIs and operations.

What are your thoughts? I'm not totally prepared to remove Executor, but that's only because I don't have a definite plan for what will replace it (and something must obviously replace it).

ghost commented 10 years ago

Well, there's something to be said for simplicity (and option 1 does happen to be a sufficient solution for my immediate need, cough). Other options seem more involved. Just a minor release with an Aggregator / aggregation-capable Executor might be a nice interim thing, with the cool stuff coming later.

Indeed option 2 (collector/aggregator utility classes) might be desirable longer-term. And strong ordering and exactly-once are both also attractive features, of course, and option 2 seems to be in the same space, sort of. (I was also thinking of some ordering stuff, actually: there's a particular partial ordering we might want preserved at ingress and exit, but out-of-order execution in between would probably mean more performance. That has implementation overlap with correlation and aggregation - basically I was considering naively aggregating then ordering within each correlated result group, but maybe ordering could be amortised across aggregation if correlators/aggregators/etc. are pluggable entities in themselves that are delegated to, which saves having a different component type for every possible variation. I was going to try the naive pattern first to see if anything more complex was worth it, though; TBH it probably won't be in our case.) I'm personally vague on what such an api should/would look like, though (well, storm+trident exists too, but its streams-of-batches model is a bit different and we kinda want to avoid trident-like batching); you probably already have a much stronger idea given the 0.7 roadmap that's now up.

I'm not at all sure I'm "getting" Option 3 (separate API for "rpc") though - what if you want an executor/aggregator-type component (/functionality-of-some-sort) to work inside some larger network? Then you still end up needing something from option 1 or 2 anyway (unless I've misunderstood (edit - actually you may not have meant them as mutually exclusive)). The thing outside the network will still need something to call for rpc, so the network "as a whole" still needs to have a spot for input to go in and output to come back.

kuujo commented 10 years ago

Okay, that sounds good to me too.

I've already implemented this in a new branch. Since the Collection async handler would break the Executor API, and since I don't like the Executor anyways, I've implemented it in a separate extension of the Feeder interface. I realized that much of the Executor interface already replicates the Feeder interface with different method names, so it makes sense to just make this feature an extension of the feeder.

The RPCFeeder simply adds the execute() methods that aggregate any descendant messages that make it back to the feeder during the processing of a given message. As expected, it was pretty simple to provide the collection as the handler result rather than calling the handler for each result, since the data was already aggregated internally. Indeed, this should have been done in the first place, but aggregation was actually a feature that wasn't added until later (originally the Executor only supported a single result).

I'm going to commit this tomorrow and try to push a minor release for you. Currently, there are a lot of changes in master but it's definitely not ready for a release, so I'm going to create a release from a branch off the previous release. The release will likely contain this feature as well as support for deployment of worker verticles. All the other newer features will be pushed to the next big release since a lot of complementary features are still missing. It's likely that the Executor will be deprecated in favor of the RPCFeeder once the next big release comes.

Edit: Alternatively, I guess an additional method could be added to the Executor, e.g. executeMany or something, but that seems sloppy. The truth is that the original design of the execute method was flawed, and there probably aren't many use cases for requiring single-message results (an implementation detail), so I think it just needs to be replaced.

Also, what I meant by providing an API outside of network components in option 3 was something like this:

vertigo.deployLocalNetwork(network, new Handler<AsyncResult<NetworkContext>>() {
  public void handle(AsyncResult<NetworkContext> result) {
    // Once the network is deployed, create an RPC client from its context.
    RPCClient client = new RPCClient(result.result(), vertx.eventBus());
    client.execute(new JsonObject().putString("foo", "bar"), new Handler<AsyncResult<Collection<JsonMessage>>>() {
      ...
    });
  }
});

That is, the ability to deploy a network and then send messages to it from the context in which it was deployed (rather than from within a component, e.g. Executor).

Thanks for the input. I'll post back here when I get it committed. Scratch that, I'll wait for your input :-)

kuujo commented 10 years ago

By the way, the RPCFeeder or something like that is actually more in line with the direction in which Vertigo is going. That is, the goal is to provide only two basic component types - a data source and a data processor (Feeder and Worker) - and build a higher level API on top of that. So ultimately I planned to replace the executor with something.

Also, an external RPC client may go against plans for Vertigo anyways. One of the features being planned is runtime network configuration changes (so networks don't have to be stopped to be updated, but instead Vertigo will coordinate to update components without losing messages) and any sort of external object that understands the internal details of a Vertigo network could be difficult to manage through that process.

Perhaps this is a good move.

ghost commented 10 years ago

Thanks! If you're willing to do a minor release shortly with an aggregation facility (one way or another given your edit), well, that'd be cool.

Having RPCFeeder be an extended Feeder also seems quite sensible (and Storm DRPC does have a DRPCSpout, heh)

I'll split the rest of my reply into two, X/Y, as the second bit sure turned out more speculative/rambling.

X. the execute one vs many api question.

executeMany() did initially sound messy to me too.

But consider a request to just some long pipeline - a network one might build purely for reliability (and perhaps for easy composition of predefined components during design). That could often result in a 1:1 request:response ratio. So one-reply might actually be quite a common use case, if maybe not for you or me in particular.

So perhaps you might want to provide both an executeSingle() and an executeMany(), with executeMany() the fundamental operation and executeSingle() a convenience method that actively considers it an error (→ something in the AsyncResult exception, I guess) if more than one result comes back.

executeSingle()/execute() might be nicer clean-slate method naming if not constrained to Executor's current interface, but note the current Executor.execute() is not quite executeSingle(), as it doesn't actually consider it an error to get more than one result either.

So a bit like the filter()/all() methods vs. the get() convenience method in a python ORM layer, say.
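
For illustration, a minimal sketch of that layering in Vert.x 2 style - executeSingle()/executeMany() are hypothetical names, and DefaultFutureResult is just one way to build the AsyncResult:

// Sketch: executeSingle() as a convenience wrapper over the fundamental
// executeMany(), treating anything other than exactly one result as an error.
public void executeSingle(JsonObject body, final Handler<AsyncResult<JsonMessage>> handler) {
  executeMany(body, new Handler<AsyncResult<Collection<JsonMessage>>>() {
    public void handle(AsyncResult<Collection<JsonMessage>> result) {
      if (result.failed()) {
        handler.handle(new DefaultFutureResult<JsonMessage>(result.cause()));
      } else if (result.result().size() != 1) {
        handler.handle(new DefaultFutureResult<JsonMessage>(new IllegalStateException(
            "expected exactly one result, got " + result.result().size())));
      } else {
        handler.handle(new DefaultFutureResult<JsonMessage>(result.result().iterator().next()));
      }
    }
  });
}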

ghost commented 10 years ago

Part 2. Bearing in mind I haven't actually tried anything below, so take it with a big pinch of salt; I've just been trying to think about it. I drafted most of the below before seeing your latest comment regarding runtime hot reconfiguration concerns etc., but taking that into consideration, maybe the half-formed thoughts below could still help spark some ideas for stopping external objects from knowing any internals of the network, if "external rpcs to the network" remains a goal - so I may as well still inflict it upon you.

Y. Regarding option 3 / network-external RPC
Y.1 RPCFeeders as RPC targets

I'm slightly wary of the name RPCFeeder insofar as you've probably got your option 3 in mind (which I think I understand better now, and presume overlaps with existing #19). A useful feature perhaps, but if such things had to be declared as whole networks / were only usable from outside the network, that would have its own disadvantages too. And possibly advantages, admittedly - if you wanted to enforce acyclicity of individual networks as declared by users, say, and/or hide the need to do the explicit feedback/circular wiring.

In the option 3 case, my understanding is that you would still have some specific component(s) within a network that are the ultimate targets for the external RPCs from outside the network - i.e. an RPCFeeder (even if option 2 does mean they are implemented differently later).

When sending an external RPC "to the network", though, it may be best not to address it to a particular component within the network, just "to the network". That would allow information-hiding / reimplementation of a different network providing the same rpc api (and maybe facilitate hot reconfiguration). But then either you allow only one RPCFeeder per Network, or you need a way to select rpc targets.

Feeders have eventbus addresses, so I suppose those addresses are already designators that an RPCClient might be able to nominate in a network-level rpc call. But rather than using the feeder address directly as an rpc target designator when doing an external rpc to the network, you might instead want to consider something like Network.addRPCFeeder('rpctargetname', ...) - i.e. a named target facility like the named streams (and of course a default for the common case of a network with exactly one RPCFeeder) - and an RPCClient.execute('rpctargetname', ...).

Though the feedback/circular-connection wiring needed to make Executor work seemed somewhat intuitive to me, if you're also thinking of hiding that connection as mentioned above, then you need a way to say which outputs are to be fed back. As an alternative to such explicit wiring, you might have some Network.addRPCWorker('rpctargetname', ...) that paints certain workers as rpc outputs for a given rpc target, again with a default, so that they can be implicitly wired back to the correct RPCFeeder in the deployed network during deployment (a rough sketch of how that might read follows).
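
All of the add/execute methods here are hypothetical - just illustrating the named-target idea:

// Hypothetical named rpc targets, analogous to named streams:
Network network = vertigo.createNetwork("example");
network.addRPCFeeder("lookup", "com.example.LookupFeeder");  // entry point for target "lookup"
network.addRPCWorker("lookup", "com.example.LookupWorker");  // outputs implicitly wired back to "lookup"

// Client side, nominating the target by name rather than by feeder address:
client.execute("lookup", new JsonObject().putString("key", "value"), resultHandler);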

Y.2. Identifying which feeders are external rpc targets

So, Network currently has its addExecutor() method, of course, distinct from addFeeder(), which could (be considered to) paint any Executors added as designated external RPC input targets, conceptually (I did suspect that might be part of why it was decided to make Feeder and Executor separate things).

However, if an RPCFeeder is to be just added in with an addFeeder() call (rather than still having an addRPCFeeder() somewhat like addExecutor() currently), then you might need some other way to indicate which RPCFeeders are true external RPC targets. So you might want a subclass/interface XRPCFeeder to paint RPCFeeders that are for such external use, perhaps enforcing that each XRPCFeeder is, within a particular network, either the default external RPC target or a uniquely designated alternate external RPC target.

It may help to rename RPCFeeder → CollectingFeeder (or ExecutorFeeder, or...) and XRPCFeeder → RPCFeeder when trying to decipher the above.

Y.3 re external RPC abstraction/layering concerns.

API-details wise, rather than what you just showed - passing in non-JsonMessages and receiving a collection of JsonMessages back - isn't such an "external RPC to a vertigo network" sort of both an abstraction boundary and an unreliability/reliability boundary? So maybe RPCClient.execute() shouldn't expose JsonMessage structure at all and should just be RPCClient.execute(JsonObject body, Handler<AsyncResult<Collection<JsonObject>>> handler)?

Sure, it'll presumably ultimately hit the real RPCFeeder (in the XRPCFeeder sense), but it can supply a handler to that which unwraps the JsonMessage body.

In contrast, "network internal" uses of an RPCFeeder (as in a CollectingFeeder sense) might want to do hairier things, so RPCFeeder's own execute() might well deal in full vertigo JsonMessage structure.

Y.4 implicit provision of an eventbus-based rpc api, no RPCClient?

Hmm. Maybe a bit heavy, but could it be arranged that, when a Network is deployed, a special RPCServerVerticle is auto-prepared (I don't mean quite another normal component - think a bit like how an AuditorVerticle is auto-prepared by the AbstractCluster) that directly listens on an eventbus address, accepts external vertx Messages sent to it, relays them to the relevant in-network RPCFeeders, and responds with the RPCFeeder's results via Message.reply - rather than even having an exposed RPCClient at all?

Yes, the RPCServerVerticle might be a bottleneck, except it itself could be multiply-instantiated. And yes, it involves another hop "outside" the network (but clients doing RPC from outside the network have "no right" to expect reliability). But then other vertx modules/components expecting vertx semantics could treat the vertigo network as a black box (though I suppose you may then feel obliged to check dynamically that the Message payloads are json for correctness in RPCServerVerticle, whereas the RPCClient.execute() type signature enforces that).

(Not sure how that would tie in with named rpc targets also proposed above - just more eventbus addresses maybe).
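
To make that concrete, a very rough sketch of the idea using Vert.x 2 APIs; RPCServerVerticle and the rpcFeeder handle are hypothetical, not anything that exists:

// Hypothetical auto-prepared verticle exposing a network over the raw event
// bus: accept a Message, relay it to an in-network rpc feeder, and reply with
// the aggregated results on Message.reply.
public class RPCServerVerticle extends Verticle {
  private RPCFeeder rpcFeeder; // assumed to be wired up during network deployment

  @Override
  public void start() {
    String address = container.config().getString("address");
    vertx.eventBus().registerHandler(address, new Handler<Message<JsonObject>>() {
      public void handle(final Message<JsonObject> message) {
        rpcFeeder.execute(message.body(), new Handler<AsyncResult<Collection<JsonMessage>>>() {
          public void handle(AsyncResult<Collection<JsonMessage>> result) {
            if (result.succeeded()) {
              JsonArray results = new JsonArray();
              for (JsonMessage m : result.result()) {
                results.addObject(m.body()); // unwrap to plain json at the boundary
              }
              message.reply(new JsonObject().putString("status", "ok").putArray("results", results));
            } else {
              message.reply(new JsonObject().putString("status", "error")
                  .putString("message", result.cause().getMessage()));
            }
          }
        });
      }
    });
  }
}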

Or maybe that's not sane, and we're back to Y.3, where, to eventbus-expose the rpc api, at least the deployer of the network would just have to be responsible for sending to the network with the help of an RPCClient instance, relaying from a vertx message receiver that the deployer prepared.

But either way, that does sort of relate to the plans mentioned in the 0.7 roadmap for being able to deploy a json-specified network from the command line directly - that may well be a generic network-deployer-verticle/module you're planning, and maybe that could do with such a raw eventbus <-> network rpc-type interaction being auto-provided by the Y.3 or Y.4 methods, or equally likely some other way I haven't considered.

Phew, hope all that wasn't complete rubbish.

kuujo commented 10 years ago

I love the long convo :-)

I'm actually glad this issue is being tackled right now because it's raising a lot of questions about proper design. You seem to be very much on the right page.

I think you're very close to the model that will need to be used to provide external access to networks over the event bus. Essentially, provide a newer version of the executor which can be used as an entry point to the network. So, RPCClient or whatever would communicate with whatever RPCFeeder instances exist in the network to essentially perform RPC externally.

The reason I say I'm glad this issue is being tackled is because some of the factories in Vertigo need to be refactored in order for this to work. As you mentioned - and I agree - we don't want to keep adding methods like addRPCFeeder and addFilter and addFunction and all that stuff as the API becomes more specialized. I prefer that the API only visibly support two component types - Feeder and Worker - and that the component implementation be up to the user.

So, that means Vertigo needs to be able to set up the correct components without knowing the implementation details. This is fine in Java. It's easy to abstract the feeder or worker implementation because users can extend RPCFeederVerticle or whatever, which will internally construct the correct component class. But the issue arises with language bindings: neither the Javascript nor the Python API has any way of providing identifying information like a verticle class, yet Vertigo needs to be able to provide Javascript and Python scripts with access to the correct type of components.

I spent a lot of time on component factories and providing the right component types to language modules. So you're right, there was a reason the addExecutor method was added instead of using addFeeder, and there's a reason types are stored in component definitions. Essentially, the Javascript and Python APIs work like this:

1. The network configuration stores each component's type (feeder, worker, executor) in the component definition.
2. When the network is deployed, each component instance is given its type along with its configuration.
3. When the component script loads the vertigo module, the module reads the component type and constructs the appropriate component object automatically.

The last part is the important step. Originally, Vertigo required users to create their own components within component implementations. So, in Javascript, if you created a network like:

network.addFeeder("foo", "foo.js");

Then you would still have to actually create the feeder instance within foo.js:

var vertigo = require('vertigo');
vertigo.createFeeder().start(function(feeder) {
  feeder.emit({foo: 'bar'});
});

I didn't like this because it meant feeders could be deployed as workers and workers could be deployed as executors. I like type safety, and that was just not proper, nor was it as easy to use as it could have been. So, with the addition of component typing I was able to clean up that Javascript feeder code like so:

var vertigo = require('vertigo');
vertigo.feeder.emit({foo: 'bar'});

Essentially, if foo.js was deployed as a feeder, then a feeder will be available in the vertigo module. But if you try to access worker, it will be undefined.

So, I'm sure you can see how this is beneficial. But I'm also sure you can see why a separate addExecutor method has to be provided, and why this is an issue in general for polymorphic types. This API simply doesn't support more than one implementation, and that's an issue.

Indeed, this is an issue that will need to be resolved at some point as there have long been plans for higher level APIs such as filters, aggregators, etc.

So I'm sort of brainstorming on how to fix this deficiency right now. I've written the RPCFeeder or whatever it will be called, and it's easy to make it available in an RPCFeederVerticle verticle class, but as for language implementations like Javascript and Python, I can't see any good way to ensure the correct feeder implementation is created for any given script without that information being provided in the network configuration.

The other alternative that I've often considered is separating tools like these - tools other than the bare minimum Feeder and Worker - into a separate higher level API. That would allow the current simple feeder/worker API to remain unchanged, while allowing users to use a separate API within component implementations to create specialized feeders or workers. I'm sure an API could be made to play nicely with Vertigo in language modules. For instance:

var vertigo = require('vertigo');
var higherLevelAPI = require('higherLevelAPI');
var rpc = higherLevelAPI.createRPC(vertigo.feeder);
rpc.execute({foo: 'bar'}, function(error, result) {
  ...
});

By the way, I do probably favor the idea of exposing a raw event bus API for externally communicating with a network's feeders or executors. I'm just not sure it's wise to add another verticle to all networks, most of which may not even need/use it. Perhaps it could be a network option though.

Awesome input!

Like I said, I'm working on fixing the executor. Hopefully I'll have something useful tonight. Obviously providing the Collection to the result handler is not the hard part, just need to explore options for any required structural changes and ensure changes work with future planning. I definitely feel that the executor needs to become a Feeder implementation in some way, as I extended the BasicFeeder to re-implement the executor and only needed to override two methods :-/ I think I knew that at the time it was created, but as I was mentioning a lot of the current design decisions were based around compatibility with other languages, so hopefully we can just find an elegant way around those design restrictions.

kuujo commented 10 years ago

a649286e4ef59803926daeef7252bb1f8ea6edff

kuujo commented 10 years ago

Alright, I think I came up with a way that we can provide custom feeder/worker implementations that are compatible with the current API for dynamic languages. Basically, the implementations will have to be dynamically constructed in dynamic languages, with the specific method for doing that obviously dependent upon the language.

For instance, in Python, the current API in a feeder component works like so:

from vertigo import feeder
feeder.emit({'foo': 'bar'})

Internally, the Python module constructs the feeder when the vertigo module is imported. But that process simply has to be moved to separate feeder implementation modules in order to support different feeder types:

from vertigo.feeder import drpc_feeder as feeder
def result_handler(error, result):
  print result.body
feeder.emit({'foo': 'bar'}, result_handler)

So, considering this, I think it would be fine to commit the DRPCFeeder to master and release it whenever we can finalize the naming and the implementation.

The current implementation that I referenced above actually doesn't change the feeder interface at all. It simply changes the MessageId in the original Feeder interface to a generic type, with the BasicFeeder returning a MessageId and the DRPCFeeder returning a Collection<JsonMessage>. But I'm not so sure I like that. Perhaps it should extend the feeder and just use the execute() methods.
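
Roughly, that parameterization reads something like this - a simplified sketch of the shape described above, not the exact interfaces:

// Sketch: parameterizing the feeder's result type rather than changing the
// interface shape. BasicFeeder acks with the emitted message's id, while the
// DRPCFeeder hands back all aggregated descendant results.
public interface Feeder<T> {
  Feeder<T> emit(JsonObject body, Handler<AsyncResult<T>> resultHandler);
}

public interface BasicFeeder extends Feeder<MessageId> {
}

public interface DRPCFeeder extends Feeder<Collection<JsonMessage>> {
}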

As for an executeSingle or executeOne method, that naming doesn't seem accurate to me. The count really applies to the number of results, and those names make me think of input, not output. Maybe a simple execute method can be added to a DRPCFeeder for now and then perhaps another single-result method can be added later. Alternatively, separate implementations could be implemented for each, or an option could be implemented for the result type.

As for networks and supporting external calls to a network via the event bus: rather than adding a single static entry point to networks, perhaps a new abstraction could be used specifically for this purpose. I wonder if there are enough use cases to warrant it, but something like a Service or whatever to make networks accessible over the event bus or remotely. I'm not totally sure it's necessary since it can be done inside of feeders anyways, but perhaps it would mean less work for the user.

network.addService(new RestService('http://localhost:8080'));

Rather than

public class RestFeeder extends FeederVerticle {

  @Override
  public void start(Feeder feeder) {
    // Bind the HTTP server and feed incoming requests into the network.
    vertx.createHttpServer().listen(8080, new Handler<AsyncResult<HttpServer>>() {
      ...
    });
  }

  @Override
  protected void nextMessage(Feeder feeder) {

  }

}

or

network.addService(new EventBusService("some.address"));

rather than

public class EventBusFeeder extends FeederVerticle {

  @Override
  public void start(final Feeder feeder) {
    // Emit each incoming event bus message into the network,
    // skipping messages that arrive while the feed queue is full.
    vertx.eventBus().registerHandler("some.address", new Handler<Message<JsonObject>>() {
      public void handle(Message<JsonObject> message) {
        if (!feeder.feedQueueFull()) {
          feeder.emit(message.body());
        }
      }
    });
  }

}

Basically, those services are just common feeder implementations, but they are also likely very common use cases, especially in the context of Vert.x. Workers could still subscribe to input from specific services, the difference being just that services don't require specific implementations by the user.

ghost commented 10 years ago

I confess I hadn't really thought much about other language needs (we are actually python users locally, both cpython and jython, but not in conjunction with vertx, at least not to date)

The module import solution certainly sounds smart and fits in with the style of the existing vertx/vertigo python api. That api seems geared towards short "script-y" python; unlike the groovy language module, there doesn't seem to be support for "class-y" usage - i.e. I can't do a python:baz.bar:Foo where Foo is a python-side subclass of a python Verticle class, say (or even a python:baz.bar somewhat analogous to python -m baz.bar; there's only foo.py). I know a class-y api would be a little more complicated to implement with jython than it sounds, but perhaps not impossible. In any case it's quite out of immediate scope here; you presumably need to fit in with the current established apis, not some hypothetical extended one.

ghost commented 10 years ago

Regarding a separate "Service" API, I reckon you mightn't need it if such services are just Feeders too (and you don't want the complexities of the information hiding and implicit wiring and verticles previously discussed). Since you already allow for supply of config objs to FeederVerticles at Network.addFeeder(), that config would just supply the relevant extra args anyway - like what eventbus address the EventBusFeeder should bind or what url the RestFeeder should bind. Supplying such ready-made feeders would presumably be handy though.
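
E.g., assuming the addFeeder overload that takes a config JsonObject, and a hypothetical ready-made EventBusFeeder main:

// A ready-made feeder configured purely through the component config
// supplied at addFeeder() time - no separate Service API needed.
network.addFeeder("events", "com.example.EventBusFeeder",
    new JsonObject().putString("address", "some.address"));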

ghost commented 10 years ago

Regarding the DRPCFeeder impl, you say:

But I'm not so sure I like that. Perhaps it should extend the feeder and just use the execute() methods.

[SNIP - Nonsense largely arising from dgolden forgetting about Java's irritating type erasure, apologies]

ghost commented 10 years ago

I know 0.7.0 is now going to bring a lot of interesting changes, have been quietly following the developments and group discussions - but sounds like it's now going to be (quite reasonably) a couple more months coming as a result.

And unfortunately the drpc-feeder branch now also pre-dates various 0.6.4-era improvements and bugfixes at this stage.

For our use I have just (admittedly less than elegantly) locally extended the 0.6.4 Executor and BasicExecutor (rather than forward-porting from the drpc-feeder branch, sorry...). (Modulo Java 8 interface default methods) it's of course dubious to add methods to interfaces like that instead of making an extended child interface, though I'd wonder how likely it is that there are Executor impls other than BasicExecutor out there.

(With the 0.7.0 api changing so significantly as per the readme etc., not sure it's worth worrying too much about 0.6.x api's long-term design..)

ghost commented 10 years ago

Vertigo 0.7 is in beta and quite different. Not in a bad way, but this ticket is pretty irrelevant by now; it probably should just be closed...