thinkaurelius / titan

Distributed Graph Database
http://titandb.io
Apache License 2.0

Initial Spark support #1045

Closed: mbroecheler closed this issue 9 years ago

mbroecheler commented 9 years ago

As a first step toward #1021, it seems we can get Spark support in Titan by simply reusing the existing Hadoop InputFormats in Spark. While this may not be the most efficient approach, it would give us an easy first integration opportunity to investigate Spark support and get some feedback in the Titan 0.9M2 release.

It is unclear exactly what is needed here, but @okram and @dkuppitz can help.

dalaro commented 9 years ago

Here's an old repo that might be tangentially useful on the input side. It's the simplest possible proof of concept I could concoct that wraps a Titan 0.5 InputFormat in a Spark RDD: https://github.com/dalaro/titan-spark-test. I haven't touched it in a couple of months, though.

mbroecheler commented 9 years ago

From what I gathered, this wouldn't even be necessary, since all we need is the Hadoop InputFormat, right, @okram?

okram commented 9 years ago

I wrote an email to you guys, but here it is for issue record:

  1. Reading from Titan will be easy, as that is simply a matter of setting the InputFormat to TitanCassandraInputFormat in your Hadoop properties file, along with the various properties that Titan requires.
  2. Writing to Titan is a different story. TitanCassandraOutputFormat will not work, as it was something very specific to Faunus, where the OutputFormat could contain any number of MapReduce jobs. We are no longer doing that, since it is particular to Titan. Instead, we will have to use BulkLoaderVertexProgram. That hasn't been written yet (though I think Dan might still have a Titan-specific version that he and I wrote for TP3/Titan some time ago).

dalaro commented 9 years ago

Reading is indeed pretty simple. I ran into a thorny classpath conflict around Netty that affects Spark. I made some temporary hacks to get it working, but I still need to return to this to see what else the conflict affects.

Anyway, with the Netty classpath conflicts out of the way, I could read from Titan-Cassandra after preloading the Graph of the Gods (GotG) through OLTP:

gremlin> graph = GraphFactory.open('hg.prop')
==>hadoopgraph[cassandrainputformat->gryooutputformat]
gremlin> g = graph.traversal(computer(SparkGraphComputer))
==>graphtraversalsource[hadoopgraph[cassandrainputformat->gryooutputformat], sparkgraphcomputer]
gremlin> g.V().count()
... (spark progress messages) ...
==>12                                                                           
gremlin> g.V().valueMap()
... (spark progress messages) ...
==>[name:[hercules], age:[30]]                                                  
==>[name:[cerberus]]
==>[name:[saturn], age:[10000]]
==>[name:[sea]]
==>[name:[pluto], age:[4000]]
==>[name:[jupiter], age:[5000]]
==>[name:[nemean]]
==>[name:[hydra]]
==>[name:[neptune], age:[4500]]
==>[name:[sky]]
==>[name:[alcmene], age:[45]]
==>[name:[tartarus]]

# hg.prop
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
input.conf.storage.backend=cassandra
input.conf.storage.hostname=localhost
#####################################
# GiraphGraphComputer Configuration #
#####################################
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner
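
To illustrate how the input.conf.* keys in hg.prop relate to Titan's own configuration, here is a minimal Java sketch using only java.util.Properties. It assumes (this is not taken from CassandraInputFormat's source) that every key under the input.conf. prefix is handed to Titan with the prefix stripped, so input.conf.storage.backend becomes Titan's storage.backend, while the gremlin.*, giraph.* and spark.* keys stay with the GraphComputer. The PrefixedConfig class and its methods are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.TreeMap;

// Hypothetical helper: collects the Titan-specific keys that hg.prop nests
// under a prefix such as "input.conf." and strips that prefix.
final class PrefixedConfig {
    static TreeMap<String, String> subset(Properties props, String prefix) {
        TreeMap<String, String> out = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                out.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return out;
    }

    // Parses properties-file text; a StringReader should never actually throw.
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    public static void main(String[] args) {
        Properties hg = parse(String.join("\n",
            "gremlin.hadoop.inputLocation=none",
            "input.conf.storage.backend=cassandra",
            "input.conf.storage.hostname=localhost",
            "spark.master=local[4]"));
        // prints {storage.backend=cassandra, storage.hostname=localhost}
        System.out.println(subset(hg, "input.conf."));
    }
}
```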

I'm fuzzy on how the write side will work. I suspect the worst case is something like writing Gryo from Spark to disk/HDFS and then running a separate BulkLoaderVertexProgram computation that reads the Gryo files. The BulkLoader that Marko and I worked on is still in titan09, though I haven't touched it in a while.

dalaro commented 9 years ago

I also just noticed that vertices without relations do not appear in the Spark count or valueMap output. This could be due to CassandraInputFormat (i.e., the Titan bits involved here), but I'm not sure at the moment.

okram commented 9 years ago

@dalaro -- what do you mean by "vertices without relations"? You mean edge-less vertices? I don't have any test cases with edge-less vertices, so perhaps it's a Spark thing... eek. We may have to create a new @LoadGraphData with a toy graph that has "corner case topologies": edge-less vertices, self-loops, knows edges with zero properties, knows edges with a single property, etc. cc/ @spmallette

dalaro commented 9 years ago

@okram Right, a vertex created with g.addVertex(); g.tx().commit().

At the storage level, Titan doesn't really allow for a relationless vertex. There's a hidden system relation on every extant vertex, even if the vertex has no user-visible relations. I suspect that I'm not translating that into a TP3-compatible analog in the InputFormat (if one exists in TP3).
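
To make the suspected gap concrete, here is a hypothetical Java model of translating storage-level rows into TP3-style vertices; none of these types are Titan or TinkerPop classes, and the "~system" label is made up. Every stored vertex carries at least one hidden system relation. The sketch drops hidden relations from the visible edge list but still emits the vertex itself, so an edge-less vertex survives the translation instead of disappearing. Whether CassandraInputFormat currently does this is exactly the open question above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical storage-level model; not Titan's actual classes.
final class StoredRelation {
    final String label;
    final boolean hidden; // true for Titan-internal system relations
    StoredRelation(String label, boolean hidden) { this.label = label; this.hidden = hidden; }
}

final class StoredVertex {
    final long id;
    final List<StoredRelation> relations;
    StoredVertex(long id, List<StoredRelation> relations) { this.id = id; this.relations = relations; }
}

// What a TP3-side consumer would see: the vertex plus its user-visible edges.
final class VisibleVertex {
    final long id;
    final List<String> edgeLabels;
    VisibleVertex(long id, List<String> edgeLabels) { this.id = id; this.edgeLabels = edgeLabels; }
}

final class RelationFilter {
    // Keeps only user-visible relations, but always emits the vertex,
    // even when nothing visible remains (i.e. an edge-less vertex).
    static List<VisibleVertex> translate(List<StoredVertex> rows) {
        List<VisibleVertex> out = new ArrayList<>();
        for (StoredVertex row : rows) {
            List<String> visible = new ArrayList<>();
            for (StoredRelation r : row.relations) {
                if (!r.hidden) visible.add(r.label);
            }
            out.add(new VisibleVertex(row.id, visible));
        }
        return out;
    }

    public static void main(String[] args) {
        List<StoredVertex> rows = List.of(
            new StoredVertex(1L, List.of(new StoredRelation("~system", true))),
            new StoredVertex(2L, List.of(new StoredRelation("~system", true),
                                         new StoredRelation("knows", false))));
        // prints "1 []" then "2 [knows]"
        for (VisibleVertex v : translate(rows)) {
            System.out.println(v.id + " " + v.edgeLabels);
        }
    }
}
```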

okram commented 9 years ago

An edge-less vertex is simply a vertex with v.edges(BOTH).hasNext() == false. It is nothing "special" in TP3. Is there a way for you to test edge-less-ness at the TitanXXXInputFormat level? That is, without SparkGraphComputer. In other words, just get a RecordReader from your InputFormat and iterate to see if edge-less vertices come back. See the following "file-based" InputFormat test helper for inspiration.

https://github.com/apache/incubator-tinkerpop/blob/master/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TestFileReaderWriterHelper.java
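
The check suggested here, exercising the InputFormat directly without SparkGraphComputer, boils down to draining whatever the reader yields and confirming that every vertex known to be edge-less is present. The Java sketch below models the reader as a plain Iterator; ReaderVertex and missingEdgeless are hypothetical stand-ins, not the Hadoop RecordReader API or anything in TestFileReaderWriterHelper.

```java
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for a vertex produced by an InputFormat's RecordReader.
final class ReaderVertex {
    final long id;
    final int edgeCount; // number of IN + OUT edges, i.e. the size of edges(BOTH)
    ReaderVertex(long id, int edgeCount) { this.id = id; this.edgeCount = edgeCount; }
}

final class EdgelessCheck {
    // Drains the reader and returns the IDs of expected edge-less vertices
    // that never came back; an empty result means nothing was lost.
    static Set<Long> missingEdgeless(Iterator<ReaderVertex> reader, Set<Long> expectedEdgeless) {
        Set<Long> seenEdgeless = new TreeSet<>();
        while (reader.hasNext()) {
            ReaderVertex v = reader.next();
            if (v.edgeCount == 0) seenEdgeless.add(v.id); // edges(BOTH).hasNext() == false
        }
        Set<Long> missing = new TreeSet<>(expectedEdgeless);
        missing.removeAll(seenEdgeless);
        return missing;
    }

    public static void main(String[] args) {
        // Simulates a reader that silently drops vertex 3, which has no edges.
        List<ReaderVertex> records = List.of(new ReaderVertex(1, 2), new ReaderVertex(2, 0));
        System.out.println(missingEdgeless(records.iterator(), Set.of(2L, 3L))); // prints [3]
    }
}
```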

dalaro commented 9 years ago

Working with Marko, I did the following on this issue today:

This gives us two ways to write to Titan from GraphComputers:

This stuff is still pretty raw and experimental. Known problems:

But this is just the proof-of-concept stage. Here's what I can do now (b4e62ef44976ec858bd0f27b2b013fefcc5be835). Feedback in general, and from @dkuppitz in particular, would be welcome.

  1. Store Spark-computed PageRank on Titan vertices using the OutputFormat

    # Load GotG into Titan-Cassandra, run PageRank, save results using TitanH1OutputFormat
    # (Shell commands below)
    bin/titan.sh stop
    mvn clean install -DskipTests=true
    rm lib/netty-3.2.7.Final.jar
    bin/titan.sh start
    bin/gremlin.sh <<EOF
    t = TitanFactory.open('conf/titan-cassandra-es.properties')
    GraphOfTheGodsFactory.load(t)
    
    // Pre-create pagerank property keys to avoid lock contention in the outputformat
    m = t.openManagement()
    m.makePropertyKey('gremlin.pageRankVertexProgram.pageRank').dataType(Double.class).make()
    m.makePropertyKey('gremlin.pageRankVertexProgram.edgeCount').dataType(Double.class).make()
    m.commit()
    t.close()
    
    // Temporary hack -- avoids an annoying but harmless NPE in future.get() below
    InputOutputHelper.registerInputOutputPair(FileInputFormat.class, com.thinkaurelius.titan.hadoop.formats.TitanH1OutputFormat.class)
    
    // Run PageRank on Spark and write the element compute keys to the respective Titan vertices
    hadoopGraph = GraphFactory.open('hg.prop')
    future = hadoopGraph.compute(SparkGraphComputer.class).program(new PageRankVertexProgram()).submit()
    future.get()
    
    // Dump vertices in OLTP, showing the just-added PR props
    t = TitanFactory.open('conf/titan-cassandra-es.properties')
    t.traversal().V().valueMap()
    EOF

    Here is the hg.prop file referenced above:

    gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
    gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
    gremlin.hadoop.graphOutputFormat=com.thinkaurelius.titan.hadoop.formats.TitanH1OutputFormat
    gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
    gremlin.hadoop.deriveMemory=false
    gremlin.hadoop.jarsInDistributedCache=true
    gremlin.hadoop.inputLocation=none
    gremlin.hadoop.outputLocation=output
    input.conf.storage.backend=cassandrathrift
    input.conf.storage.hostname=localhost
    #####################################
    # GiraphGraphComputer Configuration #
    #####################################
    giraph.minWorkers=2
    giraph.maxWorkers=2
    giraph.useOutOfCoreGraph=true
    giraph.useOutOfCoreMessages=true
    mapred.map.child.java.opts=-Xmx1024m
    mapred.reduce.child.java.opts=-Xmx1024m
    giraph.numInputThreads=4
    giraph.numComputeThreads=4
    giraph.maxMessagesInMemory=100000
    ####################################
    # SparkGraphComputer Configuration #
    ####################################
    spark.master=local[4]
    spark.executor.memory=1g
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

    The last traversal should print something like this, irrespective of line order:

    ==>[gremlin.pageRankVertexProgram.pageRank:[0.23864803741939838], name:[cerberus], gremlin.pageRankVertexProgram.edgeCount:[1.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.2295501250550773], name:[sea], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.21761710958434682], name:[sky], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.17550000000000002], name:[nemean], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.29716723463942407], name:[pluto], gremlin.pageRankVertexProgram.edgeCount:[4.0], age:[4000]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.17550000000000002], name:[hydra], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.15000000000000002], name:[hercules], gremlin.pageRankVertexProgram.edgeCount:[5.0], age:[30]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.17550000000000002], name:[alcmene], gremlin.pageRankVertexProgram.edgeCount:[0.0], age:[45]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.31819816247447563], name:[jupiter], gremlin.pageRankVertexProgram.edgeCount:[4.0], age:[5000]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.2807651470037452], name:[neptune], gremlin.pageRankVertexProgram.edgeCount:[3.0], age:[4500]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.41599886933191116], name:[tartarus], gremlin.pageRankVertexProgram.edgeCount:[0.0]]
    ==>[gremlin.pageRankVertexProgram.pageRank:[0.21761710958434682], name:[saturn], gremlin.pageRankVertexProgram.edgeCount:[0.0], age:[10000]]

  2. Load Grateful Dead into Titan-Cassandra on Spark using BulkLoaderVertexProgram

    # Load Grateful Dead into Titan-Cassandra with Spark
    # (Shell commands below)
    bin/titan.sh stop
    mvn clean install -DskipTests=true
    rm lib/netty-3.2.7.Final.jar
    bin/titan.sh start
    bin/gremlin.sh <<EOF
    // Spark
    graph = GraphFactory.open('blvp.prop')
    
    // Doesn't work on Spark; debugging shows the VP config is empty at runtime
    // future = graph.compute(SparkGraphComputer.class).program(new BulkLoaderVertexProgram()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.EDGES).submit()
    
    apacheGraphConf = new org.apache.commons.configuration.BaseConfiguration()
    apacheGraphConf.setProperty('storage.backend', 'cassandrathrift')
    future = graph.compute(SparkGraphComputer.class).program(new BulkLoaderVertexProgram().useGraphConfig(apacheGraphConf)).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.EDGES).submit()
    
    future.get()
    
    t = TitanFactory.open('conf/titan-cassandra-es.properties')
    t.traversal().V().valueMap()
    t.close()
    EOF

    The valueMap() should print familiar Grateful Dead stuff, such as:

    ==>[name:[Tampa_Red]]
    ==>[name:[Lesh_Hart_Kreutzmann]]
    ==>[name:[Garcia]]
    ==>[name:[THE FROZEN LOGGER], songType:[cover], performances:[6]]
    ==>[name:[TASTEBUD], songType:[original], performances:[1]]
    ==>[name:[ROCKIN PNEUMONIA], songType:[], performances:[0]]
    ==>[name:[Greenwich_Barry_Spector]]
    ==>[name:[JAM], songType:[original], performances:[24]]
    [etc...]

    Here's blvp.prop referenced above:

    gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
    gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
    gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
    gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
    gremlin.hadoop.deriveMemory=false
    gremlin.hadoop.jarsInDistributedCache=true
    gremlin.hadoop.inputLocation=data/grateful-dead-vertices.gio
    gremlin.hadoop.outputLocation=output
    input.conf.storage.backend=cassandrathrift
    input.conf.storage.hostname=localhost
    titan.bulkload.graphconfig.storage.backend=cassandrathrift
    titan.bulkload.graphconfig.storage.hostname=localhost
    #####################################
    # GiraphGraphComputer Configuration #
    #####################################
    giraph.minWorkers=1
    giraph.maxWorkers=1
    giraph.SplitMasterWorker=false
    giraph.useOutOfCoreGraph=true
    giraph.useOutOfCoreMessages=true
    mapred.map.child.java.opts=-Xmx1024m
    mapred.reduce.child.java.opts=-Xmx1024m
    giraph.numInputThreads=4
    giraph.numComputeThreads=4
    giraph.maxMessagesInMemory=100000
    ####################################
    # SparkGraphComputer Configuration #
    ####################################
    spark.master=local[4]
    spark.executor.memory=1g
    spark.serializer=org.apache.spark.serializer.KryoSerializer
    cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

  3. Load Grateful Dead into Titan-Cassandra on Giraph using BulkLoaderVertexProgram

    # Load Grateful Dead into Titan-Cassandra with Giraph
    # (Shell commands below)
    bin/titan.sh stop
    mvn clean install -DskipTests=true
    rm lib/netty-3.2.7.Final.jar
    bin/titan.sh start
    bin/gremlin.sh <<EOF
    graph = GraphFactory.open('blvp.prop')
    future = graph.compute(GiraphGraphComputer.class).program(new BulkLoaderVertexProgram()).result(GraphComputer.ResultGraph.NEW).persist(GraphComputer.Persist.EDGES).submit()
    future.get()
    
    t = TitanFactory.open('conf/titan-cassandra-es.properties')
    t.traversal().V().valueMap()
    t.close()
    EOF

    The blvp.prop file is the same one used for Spark (above).
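
The PageRank values from step 1 are consistent with an unnormalized update rule, PR(v) = (1 - α) + α Σ PR(u) / outDeg(u) over in-neighbors u, with α = 0.85. For example, hercules has no incoming edges and ends at 0.15, while alcmene, nemean and hydra, whose only in-neighbor is hercules (edgeCount 5.0), end at 0.15 + 0.85 * 0.15 / 5 = 0.1755. The Java sketch below implements that rule with synchronous iterations over an in-memory adjacency map. It is inferred from the printed output, not taken from TinkerPop's PageRankVertexProgram; the damping factor, the initial rank of 1.0 and the iteration count are assumptions.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of unnormalized PageRank: PR(v) = (1 - alpha) + alpha * sum(PR(u) / outDeg(u)).
// alpha, the initial rank of 1.0 and the fixed iteration count are assumptions.
// Every edge target must also appear as a key in the adjacency map.
final class PageRankSketch {
    static Map<String, Double> run(Map<String, List<String>> out, double alpha, int iterations) {
        Map<String, Double> rank = new HashMap<>();
        for (String v : out.keySet()) rank.put(v, 1.0);
        for (int i = 0; i < iterations; i++) {
            // Every vertex starts each round with the teleport term (1 - alpha).
            Map<String, Double> next = new HashMap<>();
            for (String v : out.keySet()) next.put(v, 1.0 - alpha);
            // Each vertex sends rank / out-degree along its outgoing edges
            // (out-degree is what the output above stores as edgeCount).
            for (Map.Entry<String, List<String>> e : out.entrySet()) {
                List<String> targets = e.getValue();
                if (targets.isEmpty()) continue;
                double share = rank.get(e.getKey()) / targets.size();
                for (String t : targets) next.merge(t, alpha * share, Double::sum);
            }
            rank = next; // synchronous update: all vertices read the previous round
        }
        return rank;
    }

    public static void main(String[] args) {
        // Hercules' five outgoing edges from the Graph of the Gods only;
        // the targets are included as vertices without outgoing edges.
        List<String> targets = List.of("jupiter", "alcmene", "nemean", "hydra", "cerberus");
        Map<String, List<String>> out = new HashMap<>();
        out.put("hercules", targets);
        for (String v : targets) out.put(v, List.of());
        Map<String, Double> pr = run(out, 0.85, 30);
        System.out.println("hercules=" + pr.get("hercules") + " alcmene=" + pr.get("alcmene"));
    }
}
```

Running this toy subgraph reproduces only the two values derived above; the other vertices in the full Graph of the Gods have additional in-edges that the toy graph omits.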

dalaro commented 9 years ago

Related commits:

dalaro commented 9 years ago

The Netty dependency issue was fixed in 0c36a4ff1cffe26162d75df42f97a2ce19da4924. I tweaked the configuration keys and changed BLVP's ResultGraph default to NEW in fab0f61a025f824f76980d54c3aa2cfa435a8c79. These issues were all clearly problems in Titan. The remaining issues are either likely TP3 issues (like the apparent difference between SparkGC and GiraphGC config processing) or ambiguous (like the hung future). I think this is an acceptable point to close this issue and push future work to other issues.

Here are the changes from the config key tweaks:

input.conf.* -> titanmr.ioformat.conf.*
titan.bulkload.graphconfig.* -> titanmr.bulkload.conf.*
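
Existing property files can be moved to the new key names mechanically. This Java sketch rewrites any line whose key starts with one of the two old prefixes listed above and passes every other line through unchanged. Working line by line rather than through java.util.Properties keeps comments and key order intact; it assumes keys start at the beginning of the line. The KeyMigration class is hypothetical and not part of Titan.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper applying the key renames from fab0f61:
//   input.conf.*                 -> titanmr.ioformat.conf.*
//   titan.bulkload.graphconfig.* -> titanmr.bulkload.conf.*
final class KeyMigration {
    private static final Map<String, String> RENAMES = new LinkedHashMap<>();
    static {
        RENAMES.put("input.conf.", "titanmr.ioformat.conf.");
        RENAMES.put("titan.bulkload.graphconfig.", "titanmr.bulkload.conf.");
    }

    // Migrates one "key=value" line; comments, blank lines and other keys pass through.
    static String migrateLine(String line) {
        for (Map.Entry<String, String> r : RENAMES.entrySet()) {
            if (line.startsWith(r.getKey())) {
                return r.getValue() + line.substring(r.getKey().length());
            }
        }
        return line;
    }

    public static void main(String[] args) {
        List<String> old = List.of(
            "input.conf.storage.backend=cassandrathrift",
            "titan.bulkload.graphconfig.storage.hostname=localhost",
            "spark.master=local[4]");
        for (String line : old) System.out.println(migrateLine(line));
    }
}
```

Running main prints titanmr.ioformat.conf.storage.backend=cassandrathrift and titanmr.bulkload.conf.storage.hostname=localhost, and leaves spark.master=local[4] unchanged.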

Here's hg.prop with those changes:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=com.thinkaurelius.titan.hadoop.formats.cassandra.CassandraInputFormat
gremlin.hadoop.graphOutputFormat=com.thinkaurelius.titan.hadoop.formats.TitanH1OutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=none
gremlin.hadoop.outputLocation=output
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.hostname=localhost
#####################################
# GiraphGraphComputer Configuration #
#####################################
giraph.minWorkers=2
giraph.maxWorkers=2
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

Here's blvp.prop with those changes:

gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
gremlin.hadoop.inputLocation=data/grateful-dead-vertices.gio
gremlin.hadoop.outputLocation=output
titanmr.ioformat.conf.storage.backend=cassandrathrift
titanmr.ioformat.conf.storage.hostname=localhost
titanmr.bulkload.conf.storage.backend=cassandrathrift
titanmr.bulkload.conf.storage.hostname=localhost
#####################################
# GiraphGraphComputer Configuration #
#####################################
giraph.minWorkers=1
giraph.maxWorkers=1
giraph.SplitMasterWorker=false
giraph.useOutOfCoreGraph=true
giraph.useOutOfCoreMessages=true
mapred.map.child.java.opts=-Xmx1024m
mapred.reduce.child.java.opts=-Xmx1024m
giraph.numInputThreads=4
giraph.numComputeThreads=4
giraph.maxMessagesInMemory=100000
####################################
# SparkGraphComputer Configuration #
####################################
spark.master=local[4]
spark.executor.memory=1g
spark.serializer=org.apache.spark.serializer.KryoSerializer
cassandra.input.partitioner.class=org.apache.cassandra.dht.Murmur3Partitioner

okram commented 9 years ago

@dalaro --- if there are aspects you think are wrong because of TP3, please file issues on our issue tracker. Thanks -- that was cool to see it work!