sinjax / squall

An implementation of Rete for Jena Rules and SPARQL on the Storm distributed stream processing framework

mqtt adapter #3

Open stmeissner opened 10 years ago

stmeissner commented 10 years ago

hi!

What needs to be done to receive RDF triples carried over the MQTT protocol (http://mqtt.org/)?

My gut feeling tells me it should be solved at the Storm level, right? https://github.com/shirou/storm-mqtt

Moreover, the C-SPARQL queries should then be issued as MQTT subscriptions.

cheers Stefan

sinjax commented 10 years ago

Hey Stefan,

Yes, you could make an MQTT spout. In the old version of the tool (ReteStormTool) you'd have to do this by programming a new tool "mode" backed by a new "ReteTopologyBuilder" that (like the KestrelStaticDataSPARQLReteTopologyBuilder) created a Storm spout consuming from MQTT.

In the new version of the tool (SquallTool), MQTT could be added as a new input protocol and could be handled in a much nicer and more generic way.

Now as to whether the C-SPARQL queries should be issued directly to MQTT, I'm not sure that makes sense. The whole point of squall is that you distribute the actual answering of the queries using Storm or whatever distributed stream framework. If you were using MQTT to answer the queries, then squall would have no purpose at all. HOWEVER, one cool thing might be to issue a very simple (but still very high volume) query to MQTT to do some basic, initial filtering of the data, and then run a more involved query on squall which filters FURTHER... does that make sense?

e.g. the MQTT query could just be a simple CONSTRUCT { FILTER }, where the squall query could be more like SELECT { ... many joins ... sub-queries ... etc. }.
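To make that two-stage split concrete, here is a sketch of what the two queries might look like (two separate queries shown together; all predicates, IRIs and the aggregation are purely illustrative, not taken from the codebase):

```sparql
# Stage 1, issued to MQTT: a cheap, high-volume pre-filter that just
# forwards triples with one predicate of interest (illustrative).
CONSTRUCT { ?s ?p ?o }
WHERE {
  ?s ?p ?o .
  FILTER (?p = <http://example.org/ns#temperature>)
}

# Stage 2, run on squall over the pre-filtered stream: the expensive
# part, with joins and aggregation.
SELECT ?sensor (AVG(?value) AS ?avg)
WHERE {
  ?sensor <http://example.org/ns#temperature> ?value .
  ?sensor <http://example.org/ns#locatedIn>   ?room .
  ?room   <http://example.org/ns#building>    <http://example.org/buildings/B1> .
}
GROUP BY ?sensor
```

The point is just the shape: a single triple-pattern-plus-FILTER on the MQTT side, and the multi-join/aggregation work left to the distributed Rete in squall.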

The benefit would be that the distributed rete in squall should be able to more efficiently handle the more complex query. In theory anyway ;)

stmeissner commented 10 years ago

Hi Sian,

you got me right with the idea of distributing the RETE algorithm. Of course it will be a trade-off between lower query-processing load and the added network latency plus serialisation/deserialisation effort.

Regarding SquallTool, you are speaking in the subjunctive, so at the moment it is not possible?

sinjax commented 10 years ago

The SquallTool currently doesn't support SPARQL. It was made to be extensible and is by far a better way to do things than the ReteStormTool, but yeah... currently it has no SPARQL support...

It will! We're actively developing it, and this is one of the things we want to do... but yeah... it's a work in progress :D

scalen commented 10 years ago

Hi, I'm one of the other Devs on Squall.

Having looked at MQTT for the first time just now, there are a number of ways it could be utilised in this project.

Firstly, I believe you meant that you were looking to use a ReteStorm/Squall instance as a black box within a MQTT network, right? i.e. MQTT subscription(s) provide an input to the queries implemented in ReteStorm, then the output of the implemented queries could be exposed as MQTT publication(s).

Another possibility (though one we do not personally intend to investigate at the moment) would be creating a new builder that deploys the network using MQTT rather than Storm. If you were interested in that, it would mean digging into the code far more extensively (but probably less so than doing what you proposed in the original ReteStorm!). The package hierarchy to look in, in that case, would be org.openimaj.squall.build.

I'll go into more detail about exactly how to get a Squall-based black box interacting with MQTT streams as input and output below.

Good Luck :-)


In ReteStorm, being tied to Storm on a fundamental level, this would be achieved by writing an MQTT Spout that subscribes to an MQTT publication and an MQTT "sink" (1) that publishes to a new one. Because of the heavy interlinking between all the functionality in ReteStorm, a new builder would also need to be written that makes use of these MQTT Spouts and "sinks", which is where things get really complicated.
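As a rough sketch of the spout side: the MQTT client's callback thread buffers incoming payloads in a queue, and Storm drains that queue via nextTuple(). Note this is a self-contained stand-in; a real spout would extend Storm's BaseRichSpout and use an actual MQTT client (e.g. Eclipse Paho), neither of which appears here:

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of the core of an MQTT spout for ReteStorm.
// The Storm and MQTT client APIs are stood in for so this compiles alone.
public class MQTTSpoutSketch {
    private final LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    // Would be invoked by the MQTT client's callback thread when a message
    // arrives on the subscribed topic (in Paho: MqttCallback.messageArrived).
    public void onMessage(byte[] payload) {
        buffer.offer(new String(payload));
    }

    // Called repeatedly by Storm; returns one buffered serialised triple, or
    // null if none is waiting. A real spout would instead call
    // collector.emit(...) with the parsed tuple here.
    public String nextTuple() {
        return buffer.poll();
    }

    public static void main(String[] args) {
        MQTTSpoutSketch spout = new MQTTSpoutSketch();
        spout.onMessage("<s> <p> <o> .".getBytes());
        System.out.println(spout.nextTuple()); // prints the buffered triple
    }
}
```

The queue decouples the MQTT client's delivery thread from Storm's polling loop, which is the essential job of any such spout; everything else is wiring.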

In the newer version of the project, Squall, this would become much simpler (though I will go into more detail), as we abstract the concepts of Spouts, Bolts, etc. into their core intended functionality, which can then be wrapped in various distribution-specific classes as desired.

Subscribing to an MQTT Publication

So instead of producing a new Spout and integrating it into the builder, you would write one or two new Functions: one for reading an MQTT IRI and producing an InputStream (Function<URI, InputStream>) (2), and maybe one for turning the provided InputStream into a Stream of Contexts (Function<InputStream, Stream>). If the InputStream produced by your MQTT SchemeFunction is in the form of Turtle or NTriples, then there are already ProfileFunctions for these in org.openimaj.squall.compile.data.source, so you need not write the second function. If the InputStream produced is in some other standard form, then you can write your own ProfileFunction to convert the InputStream into a Stream. However, if you can't produce an InputStream in some standard form, you can write a single PureSchemeFunction (Function<URI, Stream>) that performs both functions at once (e.g. the KestrelSchemeFunction (2)).
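A minimal sketch of the first of those functions, assuming the Function<URI, InputStream> shape described above (java.util.function.Function stands in for Squall's own Function interface, and the MQTT connection itself is stubbed out so the sketch is self-contained and the class name is hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.util.function.Function;

// Hypothetical MQTT SchemeFunction: maps an mqtt:// IRI to an InputStream
// of Turtle data, so the existing Turtle ProfileFunction can take over.
public class MQTTSchemeFunctionSketch implements Function<URI, InputStream> {
    @Override
    public InputStream apply(URI uri) {
        // A real implementation would connect to the broker at uri.getHost(),
        // subscribe to the topic in uri.getPath(), and pipe incoming message
        // payloads into the returned stream (e.g. via a PipedInputStream).
        // Here a canned Turtle triple keeps the sketch runnable.
        String turtle = "<http://example.org/s> <http://example.org/p> <http://example.org/o> .";
        return new ByteArrayInputStream(turtle.getBytes());
    }

    public static void main(String[] args) throws Exception {
        InputStream in = new MQTTSchemeFunctionSketch()
                .apply(URI.create("mqtt://broker.example.org/sensors/rdf"));
        System.out.println(new String(in.readAllBytes()));
    }
}
```

Because the output is Turtle, the second function (InputStream to Stream of Contexts) comes for free from the existing ProfileFunctions, as described above.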

Whatever you produce, the functions must be added to the URIProfileISourceFactory using the appropriate static methods.

After that, there is nothing more you need to do to subscribe to MQTT publications, other than reference them in your query source using your custom/standardised mqtt:// scheme.

The RIF compiler interprets "http://www.w3.org/ns/stream/" profile imports from the original RIF/XML document from which it receives the rule definitions (3), while the Jena Rule compiler receives its ISources ready-made by some preprocessing in the SquallTool. C-SPARQL data sources will likely assume the profile "http://www.w3.org/ns/stream/Turtle" for any source using a scheme that uses a SchemeFunction rather than a PureSchemeFunction, but will use the IRIs provided by FROM STREAM statements.

Publishing a New MQTT Publication

At the other end, to produce publication(s) from Squall, all you need to do is create a new class that implements the IOperation interface in org.openimaj.squall.compile.data, a new class (MQTTOperationMode) that extends OperationMode (org.openimaj.squall.tool.modes.operation), and extend the SquallTool to use your own extended version of the OperationModeOption enum (same package) that includes an enum value for your new MQTT-based operation mode.
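A sketch of the operation end, under the assumption that Squall hands the operation each serialised query result (the exact IOperation signature should be checked in org.openimaj.squall.compile.data; both that interface and the MQTT client are stood in for here, and the class name is hypothetical):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical MQTT-publishing operation. A real version would implement
// org.openimaj.squall.compile.data.IOperation and hand each result to an
// MQTT client's publish() call on the configured topic.
public class MQTTPublishOperationSketch {
    private final String topic;
    // Stand-in for the broker: records what a real implementation would
    // pass to e.g. MqttClient.publish(topic, payload).
    final List<String> published = new ArrayList<>();

    public MQTTPublishOperationSketch(String topic) {
        this.topic = topic;
    }

    // Squall would invoke this once per query result.
    public void perform(String serialisedBindings) {
        published.add(topic + " <- " + serialisedBindings);
    }

    public static void main(String[] args) {
        MQTTPublishOperationSketch op =
                new MQTTPublishOperationSketch("squall/results");
        op.perform("?s=<http://example.org/s>");
        System.out.println(op.published.get(0));
    }
}
```

The MQTTOperationMode and OperationModeOption additions mentioned above are then just the plumbing that lets the SquallTool construct this operation from its command-line options.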


(1) Declare a new sink as in the code snippet below. See package org.openimaj.rdf.storm.sparql.topology.bolt.sink for examples using files and kestrel queues.

class MQTTPublication extends org.openimaj.rdf.storm.sparql.topology.bolt.StormSPARQLReteConflictSetBolt.StormSPARQLReteConflictSetBoltSink {...}

(2) e.g. there are simple examples for the http:// protocol and the file:// protocol, as well as more complex functions for interpreting custom protocols such as kestrel:// for specifying Kestrel queues as input streams (org.openimaj.squall.compile.data.source.KestrelSchemeFunction). I imagine an MQTTSchemeFunction would fall somewhere between the two in terms of complexity.

(3) e.g.

<Import>
    <location>...</location>
    <profile>http://www.w3.org/ns/stream/...</profile>
</Import>
stmeissner commented 10 years ago

Thanks David,

this is what I call a detailed response :) I'll have a look at this.

For now it is important for me to read RDF from MQTT publications; don't bother about Squall-internal MQTT communication. That would make things too complicated ;)