Closed Aklakan closed 2 years ago
No, not really; I'll come back to #JENA-2309 soon (ideally this week, or at least next one).
It's more related to my contribution of the ServiceExecutorRegistry and whether and how it would have to be revised. We discussed bulk requests in service clauses quite a long time ago (e.g. at the Lotico event back then) and it has been a recurring issue in general and in our group - so instead of always spending time on workarounds I thought maybe it's time to try to tackle it :)
Talking of finishing things ... ... documentation for custom service execution for Wikidata would be good ... the bearer auth should now work for you; it is naturally environment sensitive so the more validation the better
Updated the title because it turned out that the bulk retrieval has to be cache-aware if caching is desired. The PR draft already includes a fully streaming caching system that "de-partitions" bulk retrieval responses in order to cache ranges of data on a per-input-binding basis. Conversely, creation of the bulk request considers the ranges of data already present in the cache.
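As a rough illustration of the cache-aware request creation described above (not the code of the PR draft): the sketch below splits the input bindings into those that can be answered from the cache and batches that still need a remote request. The names `BulkPlanSketch`, `Plan` and `plan` are hypothetical, bindings are simplified to plain strings, and the cache is reduced to a set of keys; the real system caches ranges of data per binding, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: decide which input bindings are served from the cache
// and group the remaining ones into bulk requests of at most bulkSize bindings.
final class BulkPlanSketch {
    /** Outcome of planning: cache hits plus the batches to send to the remote service. */
    static final class Plan {
        final List<String> fromCache = new ArrayList<>();
        final List<List<String>> batches = new ArrayList<>();
    }

    static Plan plan(List<String> inputBindings, Set<String> cachedKeys, int bulkSize) {
        if (bulkSize < 1) {
            throw new IllegalArgumentException("bulkSize must be >= 1");
        }
        Plan plan = new Plan();
        List<String> current = new ArrayList<>();
        for (String binding : inputBindings) {
            if (cachedKeys.contains(binding)) {
                plan.fromCache.add(binding);      // already cached: no remote request needed
            } else {
                current.add(binding);             // must be fetched remotely
                if (current.size() == bulkSize) { // batch is full: close it and start a new one
                    plan.batches.add(current);
                    current = new ArrayList<>();
                }
            }
        }
        if (!current.isEmpty()) {
            plan.batches.add(current);            // trailing, partially filled batch
        }
        return plan;
    }
}
```

With a bulk size of 2 and one cached binding out of five, this yields one cache hit and two remote batches of two bindings each.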
In the process of implementing this bulk retrieval and caching I have extended the custom service executor system such that the main contribution of this PR (i.e. bulk processing and caching) can be added as a plugin with no strong ties to ARQ.
I suppose that these revisions could/should eventually be moved to their own separate issue and PR but because they happened in the context of this issue I am writing this here for now. I am proposing the following revisions and extensions:
The original ServiceExecutorFactory continues to exist with the same signature but is now called ServiceExecutor. The rationale is that an executor is already a factory for executions.
public interface ServiceExecutor {
    ServiceExecution createExecution(OpService opExecute, OpService original, Binding binding,
            ExecutionContext execCxt);
}
There is now a new ChainingServiceExecutor which enables implementing chains (much like Jetty servlet filters):
public interface ChainingServiceExecutor {
    ServiceExecution createExecution(OpService opExecute, OpService opOriginal, Binding binding,
            ExecutionContext execCxt, ServiceExecutor chain);
}
This allows for passing modified/rewritten ops down the chain - which is not possible with the ServiceExecutor interface.
The ServiceExecutorRegistry still exists but now internally holds ChainingServiceExecutor instances rather than ServiceExecutor.
There is ServiceExecutorOverRegistry which abstracts the existing ServiceExecutorRegistry.
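To give an impression of how a registry of chaining executors can be viewed as a single executor, here is a minimal sketch. It does not use the Jena types: `ChainSketch`, `Executor`, `ChainingExecutor` and `Registry` are hypothetical stand-ins, the request is reduced to a service IRI string, and returning `null` at the end of the chain is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the interfaces discussed above; not the Jena API.
final class ChainSketch {
    /** Stand-in for ServiceExecutor: maps a (simplified) request to a result. */
    interface Executor {
        String createExecution(String serviceIri);
    }

    /** Stand-in for ChainingServiceExecutor: may rewrite the request and delegate to the chain. */
    interface ChainingExecutor {
        String createExecution(String serviceIri, Executor chain);
    }

    /** Stand-in for a registry holding chaining executors in registration order. */
    static final class Registry {
        private final List<ChainingExecutor> executors = new ArrayList<>();

        Registry add(ChainingExecutor executor) {
            executors.add(executor);
            return this;
        }

        /** Views the registry, starting at position pos, as one plain executor. */
        Executor asExecutor(int pos) {
            return iri -> {
                if (pos >= executors.size()) {
                    return null; // end of chain: nobody handled the request
                }
                // The current link receives the rest of the registry as its 'chain' argument
                return executors.get(pos).createExecution(iri, asExecutor(pos + 1));
            };
        }
    }
}
```

A link that rewrites the request simply passes the rewritten value to `chain.createExecution(...)`, and every later link in the registry sees the rewritten request.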
public interface ServiceExecutorBulk {
    ServiceExecution createExecution(OpService opService, QueryIterator input, ExecutionContext execCxt);
}
public interface ChainingServiceExecutorBulk {
    ServiceExecution createExecution(OpService opService, QueryIterator input, ExecutionContext execCxt,
            ServiceExecutorBulk chain);
}
* There is now a new `ServiceExecutorRegistryBulk` which is essentially the same as `ServiceExecutorRegistry` but with an internal list of `ChainingServiceExecutorBulk` instances.
* By default, `ServiceExecutorRegistryBulk` is initialized with a `ChainingServiceExecutorBulkToSingle` instance which bridges to the non-bulk registry.
* `QueryIterService` is superseded by `QueryIterServiceBulk` which essentially only delegates to the bulk registry:
```java
@Override
protected QueryIterator nextStage(QueryIterator input) {
    ...
    ServiceExecutorRegistryBulk registry = ServiceExecutorRegistryBulk.get(cxt);
    // The three-argument createExecution below belongs to the bulk interface
    ServiceExecutorBulk serviceExecutor = new ServiceExecutorBulkOverRegistry(registry);
    ServiceExecution svcExec = serviceExecutor.createExecution(opService, input, execCxt);
    QueryIterator result = svcExec.exec();
    return result;
}
```
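The bridge from the bulk registry to the non-bulk one can be pictured as follows. This is only a sketch of the idea, assuming the bridge calls a per-binding executor once for each input binding and concatenates the results lazily; whether `ChainingServiceExecutorBulkToSingle` works exactly this way is not confirmed here. `BulkToSingleSketch` and `bridge` are hypothetical names, and bindings and result rows are simplified to strings.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Hypothetical sketch of bridging a bulk interface to a per-binding executor.
final class BulkToSingleSketch {
    /** Lazily concatenates the results of 'single' applied to each input binding. */
    static Iterator<String> bridge(Iterator<String> input,
                                   Function<String, Iterator<String>> single) {
        return new Iterator<String>() {
            private Iterator<String> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                // Move on to the next input binding until one yields a row
                // or the input is exhausted
                while (!current.hasNext() && input.hasNext()) {
                    current = single.apply(input.next());
                }
                return current.hasNext();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }
}
```

Because nothing is materialized, the per-binding executor is only invoked when the consumer actually asks for more rows.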
This way, the caching and bulk retrieval system can be plugged in using:
ServiceExecutorRegistryBulk.get().chain(new ChainingServiceExecutorBulkSpecial());
ChainingServiceExecutorBulkSpecial adds support for special service IRIs such as
SERVICE <cache:bulk+20:correlate:http://dbpedia.org/sparql> { }
or equivalently
SERVICE <cache:bulk+20:correlate> { SERVICE <http://dbpedia.org/sparql> { } }
where:
Rename.reverseVarRename
such that substitution with the outer variables (regardless of scoping) can be performed. It allows for
BIND(<s> AS ?s)
SERVICE <correlate:http://foo/sparql> { {
  SELECT ?x { { BIND(?s AS ?x) } UNION { BIND(?s AS ?y) } }
} }
to yield bindings where ?x and ?y are bound to <s>.
(There is a caveat that this only works reliably if none of the joining variables is projected because otherwise VarFinder may prevent conversion of OpJoin to OpSequence).
With the revised plugin system it is easy to add/remove this special functionality or add more custom ones.
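For readers wondering how such special service IRIs could be taken apart, here is a minimal sketch. It is an assumption-laden illustration, not the parser of the PR: the class `ServiceIriOptionsSketch`, the fixed set of option names, and the reading of `bulk+20` as "bulk size 20" are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: split "cache:bulk+20:correlate:http://..." into options and target IRI.
final class ServiceIriOptionsSketch {
    // Option names assumed for this sketch, taken from the examples in the thread
    static final Set<String> KNOWN = Set.of("cache", "bulk", "correlate");

    final List<String> options = new ArrayList<>(); // e.g. ["cache", "bulk+20", "correlate"]
    String targetIri = "";                          // remaining IRI; empty if none was given

    static ServiceIriOptionsSketch parse(String iri) {
        ServiceIriOptionsSketch result = new ServiceIriOptionsSketch();
        String rest = iri;
        while (!rest.isEmpty()) {
            int colon = rest.indexOf(':');
            String head = colon < 0 ? rest : rest.substring(0, colon);
            int plus = head.indexOf('+');
            String name = plus < 0 ? head : head.substring(0, plus);
            if (!KNOWN.contains(name)) {
                break; // the first non-option segment starts the target IRI
            }
            result.options.add(head);
            rest = colon < 0 ? "" : rest.substring(colon + 1);
        }
        result.targetIri = rest;
        return result;
    }

    /** Bulk size from an option such as "bulk+20", or the given default if absent. */
    int bulkSize(int defaultSize) {
        for (String option : options) {
            if (option.startsWith("bulk+")) {
                return Integer.parseInt(option.substring("bulk+".length()));
            }
        }
        return defaultSize;
    }
}
```

In this sketch, an IRI that consists only of options leaves `targetIri` empty, which corresponds to the nested form where the actual endpoint appears in an inner SERVICE clause.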
A screenshot with cache and correlate in action to give some impression about the current state of the implementation.
This allows for passing modified/rewritten ops down the chain - which is not possible with the ServiceExecutor interface.
Put any state the processing needs as an object in the context carried by the execution context.
This has per execution scope.
Yes, that's what I am already doing for the configuration of bulk sizes or caching, as in ChainingServiceExecutorBulkSpecial.java#L132.
What I meant is that the chaining API allows for passing arguments to whatever service executors remain in the two service executor registries (bulk and non-bulk).
I added a new example to the CustomServiceExecutor examples which uses the chaining API - just below the non-chaining version. In summary:
"Old" approach:
ServiceExecutorFactory relaySef = (opExecute, original, binding, execCxt) -> {
    if (opExecute.getService().equals(WIKIDATA)) {
        opExecute = new OpService(DBPEDIA, opExecute.getSubOp(), opExecute.getSilent());
        // ISSUE: We cannot easily forward the new opExecute to the remainder of the
        // registered service executors. We would have to check at which index 'relaySef'
        // is registered and then manually do the forwarding.
        // So for simplicity we just invoke the httpService directly - bypassing anything
        // in the executor registry
        return ServiceExecutorRegistry.httpService.createExecutor(opExecute, original, binding, execCxt);
    }
    return null;
};
Improved API:
ChainingServiceExecutor relaySef = (opExecute, original, binding, execCxt, chain) -> {
    if (opExecute.getService().equals(WIKIDATA)) {
        opExecute = new OpService(DBPEDIA, opExecute.getSubOp(), opExecute.getSilent());
    }
    // The remainder of the registry is nicely taken into account
    return chain.createExecution(opExecute, original, binding, execCxt);
};
Discussion about improving the SERVICE extension mechanism separately from the extension for bulk retrieval, caching, etc. is on PR #1315.
Related: correlated subquery / lateral join for subqueries: https://www.ontotext.com/blog/graphdb-users-ask-does-graphdb-support-reusing-values-in-sub-selects/
The reason why - at least at present - I like the SERVICE <correlate:> {} approach is that it gives full control to the user within the SPARQL 1.1 syntax. Caching is most useful with correlated joins so that the result of the right-hand side is cached for each input binding. In addition, with the special URN urn:arq:self (which is implied if no other service IRI is specified) to correlate back to the active dataset, it's a very powerful feature.
In case you'd prefer an alternative approach please let me know.
Either. We don't have to stay within strict SPARQL syntax because the meaning of the evaluation is engine specific so custom syntax is not a restriction except on 3rd party UI tools.
(First pass): In Stardog it looks like "SERVICE" is used only for a stored query, but it could be extended to "SERVICE { annotate { subquery } }". The meaning of variables in "annotate" is quoting.
Ontotext uses an inline query, possibly less clear to the user because of the use of VALUES.
We can have both! Nice syntax and a forced syntax.
LOOP (?v1 ?v2) { subquery }
which loops on the LHS.
(there are other kinds of correlated query - EXISTS is a correlated ASK; and like SQL, there could be a filter function "IN { }").
The advantage of extra LOOP syntax would be that the loop variables can be explicitly specified in an easy way. With the SERVICE approach I was thinking about the rule that any variable visible on the lhs and mentioned in the rhs would be implicit loop variables.
With regard to caching and bulk retrieval, I think those would remain as options on the SERVICE clause: The LOOP yields a QueryIterator of input bindings and the service extension would then take care of grouping as many input bindings as it sees fit into batch requests (as it is done right now) - so all options (except for correlate) would still be at the level of the service clause:
LOOP (?x) { SERVICE <cache:bulk+20:http://dbpedia.org/sparql> { } }
(The options may also be represented using your proposed syntax extension - something along the lines of SERVICE <http://dbpedia.org/sparql> ARGS bulk=20 cache)
Note that, IMHO, a clean solution would require making TransformScopeRename (and related classes) aware of correlated joins and the scoping of the loop variables. So this feature actually has implications for the core of ARQ.
ARGS bulk=20 cache -- no string -- is harder because SPARQL has fixed keywords that drive the grammar. The tokenizer knows the keywords.
A parser that has a keyword-neutral tokenizer moves the work into the grammar part.
Q: What are the variables in LOOP for? Is it so they are replaced even if not exposed in the projection results? (This changes how variable scoping is done - at the moment, some variables are renamed as ?/var to hide them from other, unrelated uses of ?var.)
Whatever the technology grand plan is, we need to make sure you're not blocked waiting for some deep change in the general query engine.
My first thought about how to interpret LOOP variables was as follows:
?s ?p ?o .
LOOP (?s) {
  ?s ?p ?o # becomes ?s ?/p ?/o
  LOOP (?p) {
    ?s ?p ?o # becomes ?/s ?/p ?//o
  }
  LOOP (?s ?p) {
    ?s ?p ?o # becomes ?s ?/p ?///o
  }
}
But this means that any 'output' would have to be declared in addition and I guess that would be quite odd to use.
So if I understand correctly then the purpose of loop variables is to specify which of the variables to substitute regardless of scoping:
?s ?p ?o
LOOP (?s) {
  SELECT (MIN(?o) AS ?x) {
    ?s ?p ?o # Substitute the ?s here for any input ?s - even if internally we have ?/s ?/p ?/o
  } GROUP BY ?p
}
This would mean that LOOP itself does not start a new scope - so ?x is visible to the outside as is.
So my remark about having to touch TransformScopeRename may thus be wrong and it's actually just a matter of getting the substitution right in the presence of scoped variables (and leaving the scope of unaffected variables untouched).
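A toy version of that substitution, to make the idea concrete. This is only a sketch on strings, not ARQ code: `ScopedSubstitutionSketch`, `baseName` and `substitute` are hypothetical names, a pattern is simplified to a list of terms, and scoped variables are written with the `?/var` convention mentioned earlier in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: substitute loop variables regardless of their scope markers.
final class ScopedSubstitutionSketch {
    /** Strips scope-hiding markers: "?/s" and "?//s" both map to "?s". */
    static String baseName(String term) {
        if (!term.startsWith("?")) {
            return term; // not a variable, e.g. an IRI
        }
        int i = 1;
        while (i < term.length() && term.charAt(i) == '/') {
            i++;
        }
        return "?" + term.substring(i);
    }

    /**
     * Replaces every variable whose base name is a loop variable bound in 'input'.
     * All other terms, including scoped variables, are left untouched.
     */
    static List<String> substitute(List<String> pattern, Set<String> loopVars,
                                   Map<String, String> input) {
        List<String> out = new ArrayList<>();
        for (String term : pattern) {
            String base = baseName(term);
            boolean isBoundLoopVar = term.startsWith("?")
                    && loopVars.contains(base)
                    && input.containsKey(base);
            out.add(isBoundLoopVar ? input.get(base) : term);
        }
        return out;
    }
}
```

Note that a variable that is bound in the input but not declared as a loop variable keeps its scoped name, which is the "leave unaffected variables untouched" part.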
For my work on the plugin I may need to revise it in light of my recent thoughts on this subject.
I think I am now working on the last problem for finishing the functionality of the PR: dealing with unknown result set limits in the cache. A service clause with caching and/or bulk enabled should yield the exact same result as if these modifiers were absent. This means I need to add some logic that considers the 'largest seen result set size on a service so far' and which avoids serving data from cache if it cannot be guaranteed that the invisible result set size limit will be adhered to.
The code of this PR is now under jena-extras/jena-serviceplugins (maybe singular?).
The code should now cover all functionality and some simple examples work - so my next step on this PR is writing tests, especially for the result set limit detection & handling logic.
Ref issue #1387.
There are now tests that compare the outcomes of queries with different parameters for bulk size and caching enabled/disabled. So far all succeed - whoever reads this and has interest, feel free to propose more tests.
The jena-serviceenhancer module (under jena-extras) now also has an assembler and a test case to check whether assembly works.
The pom includes an ext profile that creates a shaded jar for use with Fuseki.
Launching Fuseki with the serviceenhancer jar on the classpath and an appropriate assembler successfully gives access to the enhanced functionality (see the queries in the test suite).
Currently the core interfaces of the caching API are in our third party 'aksw-commons' project. I'll try to move this to the module to make it self-contained. The idea would be to have a basic in-memory cache provider part of the module and allow for support for other cache providers (I actually also have a disk based one, but that one could be loaded as a separate plugin via a maven dependency). The main issue I now have to work on is that the cache API lacks a method to suppress eviction of data ranges that have been scheduled for answering an active query. Other than that of course cleanup is needed.
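One conceivable shape for the missing "suppress eviction" method is a pin/unpin pair with reference counting. The following is purely a sketch of that idea and not the aksw-commons cache API: `PinnedCacheSketch` and all of its methods are hypothetical, and eviction is simplified to least-recently-used order over whole entries instead of data ranges.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: an LRU cache whose pinned entries are skipped during eviction.
final class PinnedCacheSketch<K, V> {
    private final int maxSize;
    // accessOrder = true: iteration goes from least to most recently accessed
    private final LinkedHashMap<K, V> entries = new LinkedHashMap<>(16, 0.75f, true);
    private final Map<K, Integer> pins = new HashMap<>(); // pin count per key

    PinnedCacheSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    void put(K key, V value) {
        entries.put(key, value);
        evict();
    }

    V get(K key) {
        return entries.get(key);
    }

    int size() {
        return entries.size();
    }

    /** Marks a key as needed by an active query; may be called several times. */
    void pin(K key) {
        pins.merge(key, 1, Integer::sum);
    }

    /** Releases one pin; the entry becomes evictable once its count drops to zero. */
    void unpin(K key) {
        pins.computeIfPresent(key, (k, n) -> n > 1 ? n - 1 : null);
        evict();
    }

    /** Removes unpinned entries, eldest first, while the cache is over capacity. */
    private void evict() {
        Iterator<K> it = entries.keySet().iterator();
        while (entries.size() > maxSize && it.hasNext()) {
            K key = it.next();
            if (!pins.containsKey(key)) {
                it.remove();
            }
        }
    }
}
```

A query would pin the keys it has scheduled before it starts streaming and unpin them when it is closed. If every entry is pinned, this sketch lets the cache temporarily exceed `maxSize`.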
One opinion I'm interested in is how the Fuseki integration should work - should it remain an extra module that can be shaded as it is currently done? And if so what would be an appropriate maven profile name?
I am quite excited about the progress - finally nice filtering over cached statistics. Once cached, it only took 0.128 seconds to get the triples per graph over half a billion quads in TDB2. The filter statement in the picture runs on the cached result, so changing it still makes the query execute instantly. Whether I can finally start with the cleanup still depends on how many more bugs @LorenzBuehmann finds, though :)
The PR is now overall ready for review.
There is a preliminary jena-extras/jena-serviceenhancer/README.md which I will extend in the next days and the javadocs will need another pass too. There are several test cases which may also be a good starting point. Due to vacation progress is a bit slow.
Documentation can come after the release, though to make an impact it is better to have enough for users when you have their attention.
The next Jena release cycle is in the vacation season for many, so it might be more about preparation/"experimental"/"beta", and then do more at the one after that (end of October maybe).
The choice is yours of how to get the most here.
@afs Some time ago you mentioned that you unsubscribed from the PR stream so I am just dropping a note here as well that from my side the requested changes were implemented and the PR should be complete for a release of the experimental module.
I tried to unsubscribe :-) Seems there are numerous ways GH will resubscribe people!
The associated PR #1315 has been merged.
Jena's current approach to fetching bindings for a SERVICE clause is to instantiate the service pattern for each binding. This issue is about adding support for bulk requests. Opening this issue to allocate an issue number for my corresponding PR draft.