kuujo / vertigo

Flow-based programming for the Vert.x application platform.
Apache License 2.0

Support for message transformation #13

Open ramukima opened 10 years ago

ramukima commented 10 years ago

Do you think there is a need to support message transformation before a message passes from one component in the network to another?

Also, it appears to me that even though you specify the input source for a component externally, some components may dictate the schema of the messages they want to receive, e.g. they could dictate a JSON Schema for the messages they can work on. The framework could allow plugging in different schema contracts from different components while chaining two components in the network. Just thinking out loud here; this doesn't necessarily mean it's critical for Vertigo to provide this feature.

kuujo commented 10 years ago

I do really like the idea of supporting schemas for messages, and I think this is an area that is lacking. Message schemas could help drastically reduce errors and would make component implementations much simpler to write. Currently, users would have to explicitly fail a message if its schema is not appropriate, so that means checking for fields in every component implementation. So, ideally the responsibility for checking something like a schema could be offloaded to another mechanism, and this mechanism could fail any invalid messages before the component even receives them.
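
For illustration, a rough sketch of the per-component checking described above, using the worker API that appears later in this thread (BasicWorker, messageHandler, JsonMessage); the "foo" field and the fail() call are assumptions for the example, not the confirmed Vertigo API:

final BasicWorker worker = new Vertigo(this).createBasicWorker();

worker.messageHandler(new Handler<JsonMessage>() {
  public void handle(JsonMessage message) {
    // Guard against a missing field by hand -- the boilerplate every
    // component currently has to repeat.
    if (!message.body().containsField("foo")) {
      worker.fail(message);  // assumed: explicitly fail the invalid message
      return;
    }
    // ... actual processing ...
  }
}).start();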

On a related note, if we are going to add mechanisms that automatically fail a message for being improperly structured, it may also be prudent to provide feedback on why a message was failed. The challenge with this is that exceptions can't be sent over the event bus, but we would need some way to communicate the cause of a failure. Providing failure-specific feedback is just an idea though, perhaps it's not really necessary.

I'm definitely in support of schemas though. I think it could provide a lot more stability. However, the schema definition would still have to occur within a component's implementation rather than externally, since only the component knows the schema it requires. I'm curious to see what we can come up with for implementing this. One of the challenges with Vertigo is providing an API that can easily be supported in all languages. In Java it would be easy to add some sort of schema declaration method to a component interface, but the cross-language API requirements make interface-based schema declarations impossible (the Java WorkerVerticle could still provide a declareSchema method). I guess you would provide a declareSchema() method on the Worker interface or something:

worker.declareSchema(new MessageSchema().addField("foo", String.class).addField(...));

This could be easily supported in all languages:

JavaScript

var vertigo = require('vertigo');
var worker = vertigo.createBasicWorker();
worker.declareSchema(vertigo.createSchema().addField('foo'));

Python

import vertigo
worker = vertigo.create_basic_worker()
schema = Schema()
schema.add_field('foo')
worker.declare_schema(schema)

This API is just off the top of my head, but I suppose it would be something like that. I should be able to come up with something workable. How would you imagine such an API looking?

ramukima commented 10 years ago

Thanks for your comments Jordan. The way I thought this would work is that, when declaring the network (chaining components), you pretty much know which two components in the network you plan to connect, along with the directionality. Is it possible to tap into the chaining description (similar to how you declare an Input) and indicate the schema compliance required by the receiving component? With the payload for all components being JSON, there is a high likelihood we could use something similar to this - https://github.com/EqualExperts/json-schema-validator (Apache 2.0). There is a better JSON schema validator implementation, https://github.com/fge/json-schema-validator (but LGPL licensed). See the online demo - http://json-schema-validator.herokuapp.com/
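
For illustration, a minimal sketch of validating a message body with the fge/json-schema-validator library linked above (package names can vary slightly between library versions, and the schema and field names here are made up for the example):

import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jackson.JsonLoader;
import com.github.fge.jsonschema.main.JsonSchema;
import com.github.fge.jsonschema.main.JsonSchemaFactory;

public class SchemaCheckSketch {
  public static void main(String[] args) throws Exception {
    // A made-up draft-04 schema requiring two integer fields.
    JsonNode schemaNode = JsonLoader.fromString(
        "{\"type\": \"object\", \"required\": [\"num1\", \"num2\"],"
        + " \"properties\": {\"num1\": {\"type\": \"integer\"}, \"num2\": {\"type\": \"integer\"}}}");
    JsonNode message = JsonLoader.fromString("{\"num1\": 4, \"num2\": 6}");

    JsonSchema schema = JsonSchemaFactory.byDefault().getJsonSchema(schemaNode);
    // isSuccess() is false for an invalid message -- the point at which a
    // framework could fail the message before a component ever sees it.
    System.out.println(schema.validate(message).isSuccess());
  }
}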

I agree that the constraints are only known by the components. However, it would be nice if this could be done externally without the component indicating it during registration.

network.addVerticle("some.address", "some_verticle.py", 2)
  .addInput("other.address")
  .addSchema("file://other.address.schema.json")
  .groupBy(new FieldsGrouping("type"));

Where addSchema() could take an inline JSON string or a resource on the file system.

The error messages could be part of the schema definition itself.

Not sure if this makes sense to you.

orolle commented 10 years ago

I suggest adding the schema definition on the network connections. If the schema is added to components, it's not clear whether the schema is for receiving or emitting events. Maybe the changes to Vertigo are minor because of filterBy(). It just needs a SchemaFilter implementation, and the usage would be the following:

myNetwork.addVerticle("helloworld").addInput("foobar").filterBy(new SchemaFilter().addString("foo", /* optional */ true))

kuujo commented 10 years ago

Right. Filters are one way that a schema could be added via a network definition, and it may be useful to do that.

Perhaps we can consider some existing schema validation tools and provide an API to support different methods of schema validation. Alternatively, we could choose one and use it as the sole validation framework.

There are obviously a number of ways to accomplish this: a single framework, or an API that supports multiple frameworks. One of the important questions is to what extent schemas get validated. For instance, it would be easy to implement simple validation of the existence of appropriate fields, in which case using a framework would be pointless, but with more complex JSON schema validation a framework could save a lot of time.

Something else that's important to remember is this has to be able to translate across the event bus. The way many Vertigo objects do that currently is by implementing the Serializable interface, which "serializes" an object's state to a JsonObject so it can be reconstructed in some component implementation. This is why it might be necessary to create an API as has been done with filters and groupings.
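
To make that concrete, here's a rough sketch of how a schema object could follow the same JsonObject-serialization pattern; the class and method names (MessageSchema, getState, fromState, validate) are hypothetical, not the actual Vertigo API:

import org.vertx.java.core.json.JsonObject;

public class MessageSchema {
  private JsonObject fields = new JsonObject();

  public MessageSchema addField(String name, Class<?> type) {
    fields.putObject(name, new JsonObject().putString("type", type.getName()));
    return this;
  }

  // "Serialize" the schema definition to a JsonObject so it can cross the event bus.
  public JsonObject getState() {
    return new JsonObject().putObject("fields", fields);
  }

  // Reconstruct the schema from its serialized state on the other side.
  public static MessageSchema fromState(JsonObject state) {
    MessageSchema schema = new MessageSchema();
    schema.fields = state.getObject("fields");
    return schema;
  }

  // Minimal field-existence validation against an incoming message body.
  public boolean validate(JsonObject body) {
    for (String name : fields.getFieldNames()) {
      if (!body.containsField(name)) {
        return false;
      }
    }
    return true;
  }
}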

I think the final question is whether schemas should be declared within component implementations or at the time the network is defined, or both. The problem I have with defining a schema when the network is defined is that the schema required by the code in any given component implementation is completely dependent upon the implementation. But the networks API exposes features that are generally abstracted from component implementations, such as filters and the network structure itself. Consider this:

final BasicWorker worker = new Vertigo(this).createBasicWorker();

worker.messageHandler(new Handler<JsonMessage>() {
  public void handle(JsonMessage message) {
    int sum = message.body().getInteger("num1") + message.body().getInteger("num2");
    worker.emit(new JsonObject().putNumber("sum", sum), message);
  }
}).start();

In this case, the worker implementation itself defines that it requires messages with a num1 and num2 field. To me, since the implementation defines the structure of the message it requires, it should be the one to define the schema. Defining the schema received or emitted by any given component outside the component would require that the code defining the network understands quite a lot about each component implementation. Does anyone think I'm off base? What arguments/use cases do you have for declaring schemas when the network is constructed?
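
For example, the same worker using the hypothetical declareSchema()/MessageSchema API from earlier in this thread would simply make explicit the requirement its handler already implies:

final BasicWorker worker = new Vertigo(this).createBasicWorker();

// Hypothetical: declare the schema the implementation below depends on.
worker.declareSchema(new MessageSchema()
    .addField("num1", Integer.class)
    .addField("num2", Integer.class));

worker.messageHandler(new Handler<JsonMessage>() {
  public void handle(JsonMessage message) {
    // With the schema declared, messages missing num1 or num2 could be
    // failed before they ever reach this handler.
    int sum = message.body().getInteger("num1") + message.body().getInteger("num2");
    worker.emit(new JsonObject().putNumber("sum", sum), message);
  }
}).start();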

I think there probably are some good reasons for declaring schemas outside of components, so maybe I'm wrong, but I'd like to debate it a little. There are other things to remember with schemas too. For instance, I don't think schemas should be made too strict by treating messages with extra fields as invalid. There is potential that extra fields could be used further down the line. Also, like I said, to what extent should messages be validated? Should it be field existence validation? Or should it include data types and even the schema of sub-objects?

On a side note, schema support could be extended to a lot of useful features for Vertigo. For instance, if we add both input and output message schemas, that information could be used to automatically filter out messages between components that would otherwise be invalid anyway (since that can easily be determined before the message is even sent), thus reducing the flow of invalid messages.

kuujo commented 10 years ago

To explain this better:

orolle commented 10 years ago

Reading your thoughts brings a whole new perspective to my mind. My thoughts are vague at the moment, but I'll try to explain. The new perspective (at least for me) is that we have two different layers to model a network and its components. Layer 1: the network models the connections between components. Layer 2: the communication model of a single component (what a component's input requires and what its output emits). The communication model can be static behavior (I can receive a message of this type and I emit messages of another type) or dynamic (if I receive a message of this type then I emit a message of another type).

An external schema definition is needed if a component can do more than is needed by the network. Think of Vert.x's MongoDB client: the client processes messages like select, insert, update and delete. In my use case, my financial trader bot crawls tweets of newspapers and corporations, computes the sentiment about companies and trades on it. For logging and back-testing I need a logging component like the MongoDB client to store the sentiment and the trades (this is critical). To ensure that the process is working correctly I need an external schema to ensure that the MongoDB component only receives insert messages.

kuujo commented 10 years ago

That PR is just some basic initial work towards some form of schema validation within components.

If I receive a message of this type then I emit a message of another type

In theory this sounds like a very interesting concept. Actually, the example that I put in the PR above demonstrates exactly this type of problem: a component may emit different output formats based on input data, so we should consider factoring that into the API.

But after digging into the implementation, I'm kind of wondering whether output schemas are even necessary at all. Input schemas are useful because the InputCollector has a chance to fail an invalid message before the component ever even sees it. This is helpful because any given component does not know who will be sending it what messages and in what format, so it gives the component an additional layer of protection by ensuring any messages that make it to the component are properly formatted. But conversely, the component doesn't know to whom it is emitting output. Certainly, it knows its own output schema, and declaring a schema even inside the component won't change anything. Even with the component's output schema defined, it may have been connected to another component that requires some different schema, so emitted messages will be failed anyways. I think any schema declarations need to result in some change - some effect on the behavior of the network - and I'm not really seeing it with output schemas at this point. Perhaps it should just be up to each component to declare the schema it requires and allow any failures to bubble back up to the message source.

For schemas defined outside of a component, to me it sounds like perhaps the existing filters API should simply be extended to support more complex filters that allow validation of message structures. The current filter implementations are obviously very basic. How different is the filtering mechanism from what you're requesting? Is there any fundamental difference that would require an additional API instead of a new/better filter implementation?

With all this schema talk, I've been considering another feature that I think would be useful. With components declaring their required input schemas there may be cases where the data matches up but the field names differ. For instance, say component1 emits messages with an operator field and component2 requires messages with an operation field. In practice, the operator and operation fields both contain the same data, some operation; it's just that different developers implemented each component and it would be a pain in the ass to change either one of them. This type of issue could be resolved by providing processors (or translators or transformers or something) which could operate on messages between components. Rather than placing a mapping component in between the two (which may be a lot of unnecessary overhead), the user could just add a processor to component2's input, just as groupings and filters are currently added.

Something simple like this:

network.addComponent("component2").addInput("component1")
  .addProcessor(new FieldsMapper("operator", "operation"));
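
A rough sketch of what such a FieldsMapper might do internally; the class itself is hypothetical, and only the Vert.x JsonObject calls are existing API:

import org.vertx.java.core.json.JsonObject;

public class FieldsMapper {
  private final String from;
  private final String to;

  public FieldsMapper(String from, String to) {
    this.from = from;
    this.to = to;
  }

  // Move the value of one field to the name the receiving component expects.
  public JsonObject map(JsonObject body) {
    if (body.containsField(from)) {
      body.putValue(to, body.getField(from));
      body.removeField(from);
    }
    return body;
  }
}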

Did I just state your request a different way? (the title is Support for message transformation, which is what I'm proposing)

:-)

orolle commented 10 years ago

I think any schema declarations need to result in some change - some effect on the behavior of the network - and I'm not really seeing it with output schemas at this point.

Output schemas are relevant for constructing a network, but they are not necessary for the operation of a network. They provide some kind of type safety for component interoperation: what is the output of the first component is the input of the second component, so the input and output schemas are overlapping or duplicates.

How different is the filtering mechanism from what you're requesting? Is there any fundamental difference that would require an additional API instead of a new/better filter implementation?

Filtering is one way to achieve type safety during runtime, while mapping provides interoperability. Input and output schemas seem to have an advantage during network definition, as they can check the interoperability between network components before runtime, making development easier. I think the input and output schemas should be defined by the component, while transformations like the FieldsMapper should be in the network (or component) definition (other transformations: projections, context switching (e.g. $ to €), ...). I prefer to add the transformation logic into a worker, because transformations could be really complex (like time-varying $ to € transformations). I think adding transformations to the input of a component adds optionality for the programmer, which I do not think is good for long-term code maintenance and ease of use. We trade off more complex software against one more acking at runtime per transformation.

kuujo commented 10 years ago

I think the Input and Output schemas should be defined by the component, while transformations like the FieldsMapper should be in the network (or component) definition

If mapping is what you want, mapping is what you'll get :-)

Okay, I think I was misunderstanding you a bit. I completely agree that adding transformations to inputs would be incredibly useful for ensuring compatibility between components. Schema declarations in components. Schema transformations outside of components. This has my full support and surely some development time :-) Watch for some more references in commits :-)

kuujo commented 10 years ago

Here's an important question before something gets worked up:

Should message transformation happen before or after filtering? Transformation of messages can have a major impact on filtering. A message that would be filtered out before transformation might not be after transformation. What do you imagine being the appropriate behavior?

orolle commented 10 years ago

Should message transformation happen before or after filtering? Transformation of messages can have a major impact on filtering. A message that would be filtered out before transformation might not be after transformation. What do you imagine being the appropriate behavior?

I do not think there is a single right approach for the order of filtering and mapping. I think the order should be customizable, like a processing pipeline. I found a categorization of operations on events in my lecture notes:

Event Processing Agent
- Filter
- Transform
  - Translate
    - Enrich (add external data to event)
    - Project (similar to select in SQL)
  - Aggregate
  - Split (split 1 event into n)
  - Compose (merge n events into 1)
- Detect Pattern (e.g. pattern detection of an event sequence)

I thought heavily about the problem of where to apply the event algebra, and the solution is quite simple. Filtering and mapping (= projection) are only 2 of the operations above, so why limit ourselves to these 2? If I argue that all of the operations above should be supported as operations on connections, I would need something for storing intermediate results (obviously for Compose and Aggregate). For Filter, Projection and Split no intermediate results are necessary (these ops are applied to exactly one event at a time). So what is this something to store intermediate results? It's obvious, I think: it's a Vertigo Component.

If I argue a Component is the solution, then filtering and mapping should not happen on connections; they must happen in Components to be consistent with the other operations. Therefore I would introduce special workers for every operation above, which are easy to configure or are dummies (and then extended by programmers). For ease of use there could be component.addInput(foobar).addFilter(myFilter), which is internally translated to a component and a connection. The advantage is that every filter or mapping operation on a link, or a filter or mapping defined explicitly as a Component, is a Component, reducing the code to maintain for equal operations and increasing reusability.

This seems like a minor advantage now, but with a community and a module registry, everyone can define event algebra operations as Components and share them with the community or inside a company. Think of an outlier detection component which could be shared. This gets really interesting if components and networks are shared the same way - so networks are Component-like. Then someone can integrate a shared clustering algorithm as a component in his own network without knowing whether the clustering is a simple component or a complex network.

orolle commented 10 years ago

Schema declarations in components. Schema transformations outside of components. This has my full support and surely some development time :-) Watch for some more references in commits :-)

I am looking forward to it! I do not know how Vertigo distributes its components, but maybe schema declarations could be added to mod.json? It would be nice to reuse the module registry of Vert.x for distributing Vertigo components :-)

{
  // Vert.x mod.json fields, plus the following
  "input-schema": {
    "some-field-name": {
      "type": "String",
      "optional": false
    }
  },
  "output-schema": { ... },
  "processing-description": { some ontology stuff for better searchability and automation }
}

kuujo commented 10 years ago

Excellent points! I was beginning to question myself whether filtering and transformation should happen on inputs, considering that aggregation, splitting, etc. are also desired features that seem to be on the same level as filtering and schema transformations. I think I do agree that these should be represented as standalone components. The only element that actually needs to be associated with inputs is groupings, solely because they handle the actual routing of messages between components. Indeed, it would be nice to continue the current API for special operations like the filterBy() etc. methods. This way, the API doesn't even have to change but can simply be added to.

I think I'll take a stab at refactoring how filters work right now. Essentially, this will mean providing a simple, custom set of "filter" style workers along with an API for configuring each worker. Then this new method can be followed for the other stream operations.
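
As a sketch of what one of those "filter"-style workers could look like (the field name is illustrative, and acking/failing is omitted since the exact acking API isn't shown in this thread):

final BasicWorker filter = new Vertigo(this).createBasicWorker();

filter.messageHandler(new Handler<JsonMessage>() {
  public void handle(JsonMessage message) {
    // Re-emit only messages that carry a "foo" field; everything else stops here.
    if (message.body().containsField("foo")) {
      filter.emit(message.body(), message);
    }
  }
}).start();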

I know people have asked about custom module configurations in mod.json files on the Google group, but I don't think there was much interest in it. I think they want module configurations to remain separate from mod.json files, and I'm not really sure if I agree or disagree with that requirement. But in order to co-opt the module system for configurations I think it might require a little hacking as it stands now.

kuujo commented 10 years ago

Also, maybe this is a good opportunity to make a more creative/higher level API for operating on streams. I consider the current Network API to be pretty low level, and there are tons of cool things you can do with an API once you start integrating things like filtering, aggregation, splitting, and whatever to make it incredibly easy to define operations on a network. It has to be well thought out though; a good start is creating operations as components.

orolle commented 10 years ago

The only element that actually needs to be associated with inputs is groupings solely because they handle the actual routing of message between components.

I agree. Message routing is the sole purpose of connections. Are there other types of routing possible besides grouping? Storm only supports grouping. The intention is to support the most useful routing mechanisms in the basic API, making it easy for future apps and higher-level APIs. Grouping is hash based; maybe some tree-based routing for value ranges (20 <= person-age < 30)? Might be interesting for aggregating filters to reduce messages sent over the physical network - in practice this is very complicated and not practical (the research topic is "event or message broker").
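
For context, a tiny sketch of the hash-based routing a fields grouping implies (the field value and instance count are illustrative):

// Equal values of the grouped field always map to the same worker instance.
String groupFieldValue = "tweet";  // value of the grouped field in some message
int numInstances = 4;
int target = (groupFieldValue.hashCode() & Integer.MAX_VALUE) % numInstances;
// dispatch the message to worker instance `target`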

Also, maybe this is a good opportunity to make a more creative/higher level API for operating on streams. I consider the current Network API to be pretty low level, and there are tons of cool things you can do with an API once you start integrating things like filtering, aggregation, splitting, and whatever to make it incredibly easy to define operations on a network.

My personal opinion: Vertigo is general-purpose software for distributed event processing. Being general purpose is a hindrance for development at the moment, because we cannot prioritize features and issues and attract resources. In my opinion we need a vision here. In what application environment is Vertigo the most useful? Pros imo are tight Vert.x integration, JSON based, graph-oriented processing and lightweight. We could ship Vertigo as an event processing framework for clouds, or for easy integration in Vert.x applications. Maybe it is possible to build a business model around Vertigo - making a little money.

I need a high-level API for some business process modelling and workflow execution. Vertigo has nearly all the requirements for that. What is missing is transactions (feature request is coming ;-), web-server integration (dynamic deployment of networks and their web UI), a module registry for processes (= networks) and process steps (= components), and some UI stuff for easy development and monitoring. This is a high-risk project, because it approaches business processes very differently from what is industry standard. I would not use it as the flagship high-level API.

It has to be well thought out though, but a good start is creating operations as components.

I agree :-)