ddeboer opened 3 months ago
What is the bottleneck? If it is Fuseki's memory consumption, perhaps tdb2.tdbloader should be run after mapping.
The first bottleneck is an OOM (out-of-memory error) when running ./map.sh, which relies on https://github.com/RMLio/rmlmapper-java:
The RMLMapper loads all data in memory, so be aware when working with big datasets.
So why not try CARML, which is supposed to be streaming: 😄
$ java -jar $BIN_DIR/carml.jar map -m $CONFIG_DIR/geonames.ttl -o $OUTPUT_DIR/geonames.nt
Running the RMLmapper to convert geonames data to RDF, be patient...
io.carml.rdfmapper.impl.CarmlMapperException: could not find a java type corresponding to rdf type [http://www.w3.org/ns/csvw#Table]
So unfortunately, for our mapping file, it isn't fully compatible with https://github.com/RMLio/rmlmapper-java. To be honest, I don't know much about RML. @pmaria Any ideas?
Yeah, CARML does not yet support CSVW, because it was never officially part of the RML spec. The new specs incorporate it, but I'm still in the process of implementing them.
You could try the mapping without CSVW, so something like:
:GeonamesSource
    a rml:LogicalSource ;
    rml:source "geonamesplus.txt";
    rml:referenceFormulation ql:CSV .
for all the logical sources should work. Let me know if I can help.
Thanks @pmaria. That throws:
io.carml.engine.RmlMapperException: Could not resolve source for logical source: resource <http://example.org/rules/GeonamesSource>
Even with an absolute file path as the rml:source.
io.carml.engine.RmlMapperException: Could not resolve source for logical source: resource <http://example.org/rules/GeonamesSource>
at io.carml.engine.RmlMapper.lambda$resolveSource$3(RmlMapper.java:133)
at java.base/java.util.Optional.orElseThrow(Optional.java:403)
at io.carml.engine.RmlMapper.resolveSource(RmlMapper.java:133)
at io.carml.engine.RmlMapper.lambda$getSources$2(RmlMapper.java:113)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:212)
at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
at io.carml.engine.RmlMapper.getSources(RmlMapper.java:114)
at io.carml.engine.RmlMapper.map(RmlMapper.java:86)
at io.carml.engine.RmlMapper.map(RmlMapper.java:75)
at io.carml.engine.RmlMapper.map(RmlMapper.java:71)
at io.carml.engine.RmlMapper.map(RmlMapper.java:63)
at io.carml.jar.runner.CarmlMapCommand.map(CarmlMapCommand.java:174)
at io.carml.jar.runner.CarmlMapCommand.call(CarmlMapCommand.java:107)
at io.carml.jar.runner.CarmlMapCommand.call(CarmlMapCommand.java:48)
at picocli.CommandLine.executeUserObject(CommandLine.java:2041)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2461)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2453)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2415)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2273)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2417)
at io.carml.jar.runner.option.LoggingOptions.executionStrategy(LoggingOptions.java:42)
at picocli.CommandLine.execute(CommandLine.java:2170)
at io.carml.jar.runner.CarmlRunner.run(CarmlRunner.java:33)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:768)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:752)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292)
at io.carml.jar.app.CarmlJarJenaApplication.main(CarmlJarJenaApplication.java:12)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.springframework.boot.loader.MainMethodRunner.run(MainMethodRunner.java:49)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:108)
at org.springframework.boot.loader.Launcher.launch(Launcher.java:58)
at org.springframework.boot.loader.JarLauncher.main(JarLauncher.java:65)
@pmaria Using https://github.com/carml/carml-jar/tree/nde throws:
io.carml.engine.RmlMapperException: error executing function
at io.carml.engine.function.Functions$1.execute(Functions.java:87)
at io.carml.engine.rdf.RdfTermGeneratorFactory.lambda$mapExecution$24(RdfTermGeneratorFactory.java:376)
at java.base/java.util.Optional.map(Optional.java:260)
at io.carml.engine.rdf.RdfTermGeneratorFactory.mapExecution(RdfTermGeneratorFactory.java:369)
at io.carml.engine.rdf.RdfTermGeneratorFactory.mapFunctionExecution(RdfTermGeneratorFactory.java:338)
at io.carml.engine.rdf.RdfTermGeneratorFactory.lambda$getFunctionValueGenerator$18(RdfTermGeneratorFactory.java:319)
at io.carml.engine.rdf.RdfTermGeneratorFactory.generateValues(RdfTermGeneratorFactory.java:196)
at io.carml.engine.rdf.RdfTermGeneratorFactory.lambda$getGenerator$4(RdfTermGeneratorFactory.java:153)
at io.carml.engine.rdf.RdfPredicateObjectMapper.lambda$map$9(RdfPredicateObjectMapper.java:185)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:212)
at java.base/java.util.Iterator.forEachRemaining(Iterator.java:133)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1939)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:556)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:702)
at io.carml.engine.rdf.RdfPredicateObjectMapper.map(RdfPredicateObjectMapper.java:187)
at io.carml.engine.rdf.RdfTriplesMapper.lambda$mapEvaluation$8(RdfTriplesMapper.java:183)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:388)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:335)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:294)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:373)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8777)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)
at reactor.core.publisher.FluxArray$ArraySubscription.slowPath(FluxArray.java:126)
at reactor.core.publisher.FluxArray$ArraySubscription.request(FluxArray.java:99)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:373)
at reactor.core.publisher.FluxMerge.subscribe(FluxMerge.java:73)
at reactor.core.publisher.Flux.subscribe(Flux.java:8777)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:335)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:294)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:373)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8777)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:335)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:294)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:373)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8777)
at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:202)
at reactor.core.publisher.FluxFlatMap.subscribeOrReturn(FluxFlatMap.java:94)
at reactor.core.publisher.Flux.subscribe(Flux.java:8762)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:430)
at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:335)
at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:294)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:373)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:201)
at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:83)
at reactor.core.publisher.Flux.subscribe(Flux.java:8777)
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:260)
at reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:79)
at reactor.core.publisher.Flux.subscribe(Flux.java:8777)
at reactor.core.publisher.Flux.blockLast(Flux.java:2755)
at io.carml.jar.runner.output.JenaOutputHandler.outputStreaming(JenaOutputHandler.java:83)
at io.carml.jar.runner.CarmlMapCommand.outputRdf(CarmlMapCommand.java:218)
at io.carml.jar.runner.CarmlMapCommand.outputWithoutPath(CarmlMapCommand.java:212)
at io.carml.jar.runner.CarmlMapCommand.lambda$handleOutput$6(CarmlMapCommand.java:188)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at io.carml.jar.runner.CarmlMapCommand.handleOutput(CarmlMapCommand.java:188)
at io.carml.jar.runner.CarmlMapCommand.call(CarmlMapCommand.java:108)
at io.carml.jar.runner.CarmlMapCommand.call(CarmlMapCommand.java:48)
at picocli.CommandLine.executeUserObject(CommandLine.java:2045)
at picocli.CommandLine.access$1500(CommandLine.java:148)
at picocli.CommandLine$RunLast.executeUserObjectOfLastSubcommandWithSameParent(CommandLine.java:2465)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2457)
at picocli.CommandLine$RunLast.handle(CommandLine.java:2419)
at picocli.CommandLine$AbstractParseResultHandler.execute(CommandLine.java:2277)
at picocli.CommandLine$RunLast.execute(CommandLine.java:2421)
at io.carml.jar.runner.option.LoggingOptions.executionStrategy(LoggingOptions.java:42)
at picocli.CommandLine.execute(CommandLine.java:2174)
at io.carml.jar.runner.CarmlRunner.run(CarmlRunner.java:33)
at org.springframework.boot.SpringApplication.lambda$callRunner$5(SpringApplication.java:790)
at org.springframework.util.function.ThrowingConsumer$1.acceptWithException(ThrowingConsumer.java:83)
at org.springframework.util.function.ThrowingConsumer.accept(ThrowingConsumer.java:60)
at org.springframework.util.function.ThrowingConsumer$1.accept(ThrowingConsumer.java:88)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:798)
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789)
at org.springframework.boot.SpringApplication.lambda$callRunners$3(SpringApplication.java:774)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.base/java.util.stream.SortedOps$SizedRefSortingSink.end(SortedOps.java:357)
at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:557)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:546)
at java.base/java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:265)
at java.base/java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:611)
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:774)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:341)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1354)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1343)
at io.carml.jar.app.CarmlJarJenaApplication.main(CarmlJarJenaApplication.java:12)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:91)
at org.springframework.boot.loader.launch.Launcher.launch(Launcher.java:53)
at org.springframework.boot.loader.launch.JarLauncher.main(JarLauncher.java:58)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:103)
at reactor.core.publisher.Flux.blockLast(Flux.java:2756)
... 43 more
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:115)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at io.carml.engine.function.Functions$1.execute(Functions.java:79)
... 105 more
Caused by: java.lang.NullPointerException: Cannot invoke "String.split(String)" because "s" is null
at io.carml.jar.runner.GrelFunctions.split(GrelFunctions.java:15)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
... 107 more
That's probably due to the data containing null values. Would you prefer to handle that in your implementation, or should I filter out null values in the config (and if so, how)?
@ddeboer I pushed a fix
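The fix itself isn't shown in the thread. As a rough sketch of the kind of guard that avoids the NullPointerException in the trace above (String.split called on a null "s"), a split function can treat a null or empty input as "no values" instead of failing. The class and method names are hypothetical, this is not the actual io.carml.jar.runner.GrelFunctions code, and it assumes the separator should be matched literally rather than as a regular expression.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical sketch; not the actual io.carml.jar.runner.GrelFunctions implementation.
public final class NullSafeSplit {

    private NullSafeSplit() {
    }

    /**
     * Splits s on the literal separator sep.
     * A null or empty input yields an empty list instead of throwing,
     * so rows with a missing value simply produce no terms.
     */
    public static List<String> split(String s, String sep) {
        if (s == null || s.isEmpty()) {
            return Collections.emptyList();
        }
        if (sep == null || sep.isEmpty()) {
            return Collections.singletonList(s); // nothing to split on
        }
        // String.split takes a regex, so quote the separator to match it literally.
        return Arrays.stream(s.split(Pattern.quote(sep)))
                .filter(part -> !part.isEmpty()) // drop empty items, e.g. in "a,,b"
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(split("Limudian,Limudiancun", ",")); // [Limudian, Limudiancun]
        System.out.println(split(null, ","));                   // []
    }
}
```

Returning an empty list (rather than a list with one empty string) means a missing alternatenames value generates no triple at all, which is usually what a mapping wants.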
Thanks, that helps! Got 12 GB of .nt output without any OOMs.
@pmaria However, with this config the output for the predicate in question (alternateName) is now a set of URIs instead of literals:
<https://sws.geonames.org/2036085/> <https://www.geonames.org/ontology#name> "Limudiancun" .
<https://sws.geonames.org/2036085/> <http://www.w3.org/2003/01/geo/wgs84_pos#longitude> "126.20836" .
<https://sws.geonames.org/2036085/> <https://www.geonames.org/ontology#countryCode> "CN" .
<https://sws.geonames.org/2036085/> <http://www.w3.org/2003/01/geo/wgs84_pos#latitude> "45.96321" .
<https://sws.geonames.org/2036085/> <https://www.geonames.org/ontology#alternateName> <http://example.com/base/里木店村> .
<https://sws.geonames.org/2036085/> <https://www.geonames.org/ontology#alternateName> <http://example.com/base/Limudian> .
<https://sws.geonames.org/2036085/> <https://www.geonames.org/ontology#alternateName> <http://example.com/base/li%20mu%20dian%20cun> .
<https://sws.geonames.org/2036085/> <https://www.geonames.org/ontology#alternateName> <http://example.com/base/Limudiancun> .
@pmaria Any ideas?
Yes, this is another consequence of behavior the current spec leaves unspecified, which the new spec will fix. CARML defaults to generating IRIs for function values, while RMLMapper defaults to literals.
You can add rr:termType rr:Literal to the object map that calls the function.
So in this case:
:AlternateNamesSplit
    rr:termType rr:Literal ;
    fnml:functionValue [
        rml:logicalSource :LogicalSource;
        rr:predicateObjectMap [
            rr:predicate fno:executes;
            rr:objectMap [ rr:constant grel:string_split ];
        ];
        rr:predicateObjectMap [
            rr:predicate grel:valueParameter;
            rr:objectMap [ rml:reference "alternatenames" ];
        ];
        rr:predicateObjectMap [
            rr:predicate grel:p_string_sep;
            rr:objectMap [ rr:constant "," ];
        ];
    ].
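Why the two engines disagree can be sketched as a default term-type rule. In R2RML, a term map without rr:termType is a literal only if it is an object map that is column-based (reference-based in RML) or has rr:language or rr:datatype; otherwise it is an IRI. Function-valued maps fall outside that rule. The model below is an illustration with hypothetical names, not code from either engine, and the literalForFunctions switch simply encodes the CARML and RMLMapper behavior described above.

```java
import java.util.Optional;

// Hypothetical model of an RML term map; names are illustrative, not from CARML or RMLMapper.
public final class TermTypeDefaults {

    public enum TermType { IRI, BLANK_NODE, LITERAL }

    public enum Position { SUBJECT, PREDICATE, OBJECT, GRAPH }

    public record TermMap(Position position,
                          Optional<TermType> explicitTermType, // rr:termType, if present
                          boolean referenceBased,              // rml:reference
                          boolean hasLanguage,                 // rr:language
                          boolean hasDatatype,                 // rr:datatype
                          boolean functionValued) {            // fnml:functionValue
    }

    /**
     * R2RML-style default: literal only for object maps that are reference-based or carry
     * a language tag or datatype, IRI otherwise. Function-valued maps are not covered by
     * that rule; literalForFunctions = false models CARML, true models RMLMapper.
     */
    public static TermType effectiveTermType(TermMap map, boolean literalForFunctions) {
        Optional<TermType> explicit = map.explicitTermType();
        if (explicit.isPresent()) {
            return explicit.get(); // an explicit rr:termType always wins
        }
        if (map.position() != Position.OBJECT) {
            return TermType.IRI;
        }
        if (map.referenceBased() || map.hasLanguage() || map.hasDatatype()) {
            return TermType.LITERAL;
        }
        if (map.functionValued() && literalForFunctions) {
            return TermType.LITERAL;
        }
        return TermType.IRI;
    }

    public static void main(String[] args) {
        TermMap split = new TermMap(Position.OBJECT, Optional.empty(), false, false, false, true);
        System.out.println(effectiveTermType(split, false)); // IRI
        System.out.println(effectiveTermType(split, true));  // LITERAL
        TermMap fixed = new TermMap(Position.OBJECT, Optional.of(TermType.LITERAL), false, false, false, true);
        System.out.println(effectiveTermType(fixed, false)); // LITERAL
    }
}
```

The last case is the rr:termType rr:Literal workaround: because an explicit term type is checked first, it produces the same output regardless of which default an engine picks.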
Thanks, I get literals now. As I said before, I was able to generate 12 GB of N-Triples. However, the mapping now starts slowing down at around 4.1 GB (at 800% CPU, which is perhaps to be expected).
Interesting, I will see if I can reproduce this locally.
I am able to reproduce this.
The high CPU usage is a side effect of the heap filling up, which in turn is caused by the joins in the mapping. CARML still keeps intermediate results for joins with join conditions in memory. In theory these could be handled with some intermediate persistence, but that is not implemented yet. It does surprise me, however, that it takes as much memory as it does. I will see if I can investigate that further.
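To illustrate why a join with a condition cannot stream, here is a minimal sketch of a generic in-memory hash join. It assumes the common build/probe approach: every parent row is indexed by its join key before any child row can be matched, so memory grows with the size of the parent source. CARML's actual data structures are not described in this thread; the class, column names and rows below are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Generic in-memory hash join for illustration only; not CARML's implementation.
public final class InMemoryJoin {

    /** Rows are modelled as column-name to value maps; matches are passed to emit(child, parent). */
    public static void join(Iterable<Map<String, String>> parentRows, String parentKey,
                            String parentSubject,
                            Iterable<Map<String, String>> childRows, String childKey,
                            String childSubject,
                            BiConsumer<String, String> emit) {
        // Build phase: index every parent row by its join key. This map holds the
        // whole parent side at once, so it grows with the data instead of streaming.
        Map<String, List<String>> index = new HashMap<>();
        for (Map<String, String> row : parentRows) {
            String key = row.get(parentKey);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(row.get(parentSubject));
            }
        }
        // Probe phase: child rows are matched one at a time and results are emitted
        // immediately, so only this side could be streamed.
        for (Map<String, String> row : childRows) {
            String key = row.get(childKey);
            if (key == null) {
                continue; // a missing key never joins
            }
            for (String parent : index.getOrDefault(key, List.of())) {
                emit.accept(row.get(childSubject), parent);
            }
        }
    }

    public static void main(String[] args) {
        // Hypothetical rows: places referencing an administrative unit by code.
        List<Map<String, String>> units = List.of(Map.of("code", "A1", "id", "unit-1"));
        List<Map<String, String>> places = List.of(
                Map.of("code", "A1", "id", "place-1"),
                Map.of("code", "B2", "id", "place-2"));
        join(units, "code", "id", places, "code", "id",
                (child, parent) -> System.out.println(child + " -> " + parent)); // place-1 -> unit-1
    }
}
```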
It is interesting that you were able to run it before, though. Did anything change in the mapping?
Perhaps that was when I had disabled the joins. I can confirm that disabling them makes the process run at ~100% CPU with more reasonable memory consumption, outputting ~11 GB of data.
Do you see any possible solutions to the join problem? I could of course offload the joins to some (shell) script, but that rather defeats the purpose of using RML.
Keeping the current selection of administrative units. Is this feasible?
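If the joins were moved out of RML, one option, assuming the lookup side (for example the selected administrative units) is small enough to fit in memory, is to stream the large file once and append the joined value as an extra column, so the mapping only needs a plain rml:reference instead of a join condition. The sketch below assumes tab-separated input with the join key in a known column; the class name, column index and sample rows are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical pre-processing step; file layout and column index are assumptions.
public final class PreJoin {

    /** Reads "key<TAB>value" lines (the small side, e.g. selected admin units) into memory. */
    public static Map<String, String> loadLookup(Reader lookup) throws IOException {
        Map<String, String> map = new HashMap<>();
        try (BufferedReader in = new BufferedReader(lookup)) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] cols = line.split("\t", -1);
                if (cols.length >= 2) {
                    map.put(cols[0], cols[1]);
                }
            }
        }
        return map;
    }

    /**
     * Streams the large tab-separated input one line at a time and appends the value
     * looked up by the key in column keyColumn (empty if there is no match).
     * Only the lookup map is held in memory; the input reader is closed at the end.
     */
    public static void enrich(Reader data, Writer out, Map<String, String> lookup, int keyColumn)
            throws IOException {
        try (BufferedReader in = new BufferedReader(data)) {
            String line;
            while ((line = in.readLine()) != null) {
                String[] cols = line.split("\t", -1);
                String key = keyColumn < cols.length ? cols[keyColumn] : "";
                out.write(line);
                out.write('\t');
                out.write(lookup.getOrDefault(key, ""));
                out.write('\n');
            }
        }
        out.flush();
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample data standing in for the real files.
        Map<String, String> units = loadLookup(new StringReader("K1\tRegion A\n"));
        StringWriter out = new StringWriter();
        enrich(new StringReader("1\tPlace one\tK1\n2\tPlace two\tK9\n"), out, units, 2);
        System.out.print(out);
    }
}
```

For real files, the readers and writer would come from java.nio.file.Files.newBufferedReader and Files.newBufferedWriter. This is the usual choice for a lopsided hash join: only the small side is indexed, so memory stays bounded by the lookup table however large the main file is.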