inolen / titan-node

Wrapper around gremlin-node to provide out of the box support for Titan graph database

Using ElasticSearch w/gremlin-node (or titan-node) #1

Closed jcamins closed 10 years ago

jcamins commented 10 years ago

Is there any special configuration required to make Titan+ElasticSearch work with asynchronous gremlin-node? Changes made via v.setProperty(...) don't seem to be reflected in ElasticSearch after the commit. (also, do you prefer questions via a different medium than GitHub issues?)

inolen commented 10 years ago

GH issues are great :)

Can you paste the code entirely? It should work fine.

jcamins commented 10 years ago

Absolutely. Here is the code I'm using. ElasticSearch reports zero documents being indexed, but the vertex is really committed, since connecting to the database using a shell shows the record that I added.

var inspect = require('eyes').inspector({maxLength: false});
var path = require('path');
var Gremlin = require('gremlin'),
    gremlin = new Gremlin({
        classpath: [ path.join(__dirname, 'target', '**', '*.jar') ],
        options: [ ]
    });

var BaseConfiguration = gremlin.java.import('org.apache.commons.configuration.BaseConfiguration');
var TitanFactory = gremlin.java.import('com.thinkaurelius.titan.core.TitanFactory');
var Type = gremlin.ClassTypes;
var Parameter = gremlin.java.import('com.thinkaurelius.titan.core.Parameter');

var gconf = new BaseConfiguration();
gconf.setPropertySync("storage.backend", "cassandra");
gconf.setPropertySync("storage.hostname", "127.0.0.1");
gconf.setPropertySync("storage.keyspace", "bn_eastpenns");
gconf.setPropertySync("storage.index.search.backend", "elasticsearch");
gconf.setPropertySync("storage.index.search.client-only", true);
gconf.setPropertySync("storage.index.search.hostname", "127.0.0.1");
gconf.setPropertySync("storage.index.search.index-name", "bn_eastpenns");
var db = TitanFactory.openSync(gconf);
db.makeKeySync('example').dataTypeSync(Type.String.class)
    .indexedSync("search", Type.Vertex.class, gremlin.java.newArray('com.thinkaurelius.titan.core.Parameter', []))
    .singleSync().makeSync().getIdSync();

var g = gremlin.wrap(db);

g.addVertex(null, function (err, v) {
    v.setProperty('example', 'some data', function (err, res) {
        g.commit(function (err, res) {
            g.V().toJSON(function (err, res) {
                console.log(res);
            });
        });
    });
});
inolen commented 10 years ago

That does indeed look correct. It seems like this may be a configuration issue? What happens if you set that property through the actual gremlin shell?

jcamins commented 10 years ago

If I replace the addVertex stanza with the following synchronous equivalent, the code works, so I think it is not configuration:

var v = db.addVertexSync(null);
v.setPropertySync('example', 'some data');
db.commitSync();
g.V().toJSON(function (err, res) {
    console.log(res);
}); 

(and it works using the synchronous methods in the gremlin console)

inolen commented 10 years ago

Hmph, that is really odd.

Try:

var txn = g.newTransaction();
txn.addVertex(null, function (err, v) {
    v.setProperty('example', 'some data', function (err, res) {
        txn.commit(function (err, res) {
            g.V().toJSON(function (err, res) {
                console.log(res);
            });
        });
    });
});

If that works, there must be something wrong in _getTransaction. Actually, that'd still be odd as the vertex makes it into Cassandra.

jcamins commented 10 years ago

I have the same problem with the explicit transaction. I'm going to try and increase the log level, see if I can get any useful feedback from Titan.

inolen commented 10 years ago

I'll look into this in a bit as well; this could be a nasty issue.

jcamins commented 10 years ago

TRACE shows that on the non-synchronous version, ElasticSearch is not accessed at all during the commit. The following lines are missing from the asynchronous version:

[main] DEBUG c.t.t.d.es.ElasticSearchIndex - Registering string type for 1c
[main] TRACE c.t.t.d.es.ElasticSearchIndex - Adding entire document 4

The asynchronous version also includes 15 extra messages from the StandardIDPool.

In case it is of use to you, here are the differing sections of the logs at level TRACE. The following is the log from the asynchronous, explicit transaction version:

[Thread-6] DEBUG c.t.t.d.k.CachedKeyColumnValueStore - Cache Retrieval on vertexindex. Attempts: 1 | Misses: 1
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [3] Returned id: 2
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 16
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 17
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 18
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 19
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 20
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 21
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 22
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 23
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 24
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 25
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 26
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 27
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 28
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 29
[Thread-6] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 30
[Thread-7] DEBUG c.t.t.g.database.StandardTitanGraph - Saving transaction. Added 16, removed 0
[Thread-7] DEBUG c.t.t.d.l.c.ExpectedValueCheckingStore - Attempting to acquireLock on KeyColumn [k=0x1-0-101-0-120-0-97-0-109-0-112-0-108-0-101-0-0, c=0x138] ev=null
[Thread-7] TRACE c.t.t.d.locking.LocalLockMediator - New local lock created: KeyColumn [k=0x1-0-101-0-120-0-97-0-109-0-112-0-108-0-101-0-0, c=0x138] namespace=AstyanaxStoreManagerbn_eastpenns txn=com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction@309824aa
[Thread-7] TRACE c.t.t.d.locking.LocalLockMediator - Updated local lock expiration: KeyColumn [k=0x1-0-101-0-120-0-97-0-109-0-112-0-108-0-101-0-0, c=0x138] namespace=AstyanaxStoreManagerbn_eastpenns txn=com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction@309824aa oldexp=1385609003490200000 newexp=1385609003490740000
[Thread-7] TRACE c.t.t.d.locking.LocalLockMediator - Local unlock succeeded: KeyColumn [k=0x1-0-101-0-120-0-97-0-109-0-112-0-108-0-101-0-0, c=0x138] namespace=AstyanaxStoreManagerbn_eastpenns txn=com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction@309824aa
[Thread-8] WARN c.t.t.g.transaction.StandardTitanTx - Query requires iterating over all vertices [()]. For better performance, use indexes
[Thread-8] INFO c.n.a.thrift.ThriftKeyspaceImpl - Detected partitioner org.apache.cassandra.dht.Murmur3Partitioner for keyspace bn_eastpenns
[Thread-8] DEBUG c.t.t.d.k.CachedKeyColumnValueStore - Cache Retrieval on edgestore. Attempts: 2 | Misses: 2
[Thread-8] DEBUG c.t.t.d.k.CachedKeyColumnValueStore - Cache Retrieval on edgestore. Attempts: 3 | Misses: 3


Synchronous version with automatic thread-local transaction:

[main] TRACE c.t.t.g.d.idassigner.StandardIDPool - [2] Returned id: 16
[main] DEBUG c.t.t.g.database.StandardTitanGraph - Saving transaction. Added 16, removed 0
[main] DEBUG c.t.t.d.l.c.ExpectedValueCheckingStore - Attempting to acquireLock on KeyColumn [k=0x1-0-101-0-120-0-97-0-109-0-112-0-108-0-101-0-0, c=0x138] ev=null
[main] TRACE c.t.t.d.locking.LocalLockMediator - New local lock created: KeyColumn [k=0x1-0-101-0-120-0-97-0-109-0-112-0-108-0-101-0-0, c=0x138] namespace=AstyanaxStoreManagerbn_eastpenns txn=com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction@19d304ac
[main] TRACE c.t.t.d.locking.LocalLockMediator - Updated local lock expiration: KeyColumn [k=0x1-0-101-0-120-0-97-0-109-0-112-0-108-0-101-0-0, c=0x138] namespace=AstyanaxStoreManagerbn_eastpenns txn=com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction@19d304ac oldexp=1385609052955154000 newexp=1385609052955720000
[main] DEBUG c.t.t.d.es.ElasticSearchIndex - Registering string type for 1c
[main] TRACE c.t.t.d.locking.LocalLockMediator - Local unlock succeeded: KeyColumn [k=0x1-0-101-0-120-0-97-0-109-0-112-0-108-0-101-0-0, c=0x138] namespace=AstyanaxStoreManagerbn_eastpenns txn=com.thinkaurelius.titan.diskstorage.cassandra.CassandraTransaction@19d304ac
[main] TRACE c.t.t.d.es.ElasticSearchIndex - Adding entire document 4
[main] DEBUG c.t.t.g.b.TitanBlueprintsGraph - Committed thread-bound transaction standardtitantx[null]
[Thread-5] WARN c.t.t.g.transaction.StandardTitanTx - Query requires iterating over all vertices [()]. For better performance, use indexes
[Thread-5] INFO c.n.a.thrift.ThriftKeyspaceImpl - Detected partitioner org.apache.cassandra.dht.Murmur3Partitioner for keyspace bn_eastpenns
[Thread-5] DEBUG c.t.t.d.k.CachedKeyColumnValueStore - Cache Retrieval on edgestore. Attempts: 1 | Misses: 1
[Thread-5] DEBUG c.t.t.d.k.CachedKeyColumnValueStore - Cache Retrieval on edgestore. Attempts: 2 | Misses: 2

inolen commented 10 years ago

Weird. I've not tested ES since Titan 0.4.0.

I wonder if some thread-local storage has been introduced in 0.4.1 with all of the new caching stuff that's causing us grief here.

Edit: grepping the source says no to that.

jcamins commented 10 years ago

Ah-ha! I figured it out. After initializing the types, the changes must be committed, synchronously, at which point you can use asynchronous operations. Otherwise, the type used in the asynchronous calls is not the one that I defined.
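
For anyone landing here later, a minimal sketch of that ordering. It reuses only calls that already appear in this thread (`makeKeySync`, `commitSync`, `addVertex`, `setProperty`, `commit`) and assumes `gremlin`, `db` and `g` are the objects built in the original script; the helper name `defineKeyThenWrite` and the `done` callback are hypothetical and only there to make the order explicit.

```javascript
// Sketch only: `gremlin`, `db` and `g` are assumed to be the objects created
// in the script earlier in this thread. `defineKeyThenWrite` is a
// hypothetical helper name, not part of gremlin-node or titan-node.
function defineKeyThenWrite(gremlin, db, g, done) {
    var Type = gremlin.ClassTypes;
    var noParams = gremlin.java.newArray('com.thinkaurelius.titan.core.Parameter', []);

    // 1. Define the indexed key with the synchronous API, as in the original script.
    db.makeKeySync('example').dataTypeSync(Type.String.class)
        .indexedSync('search', Type.Vertex.class, noParams)
        .singleSync().makeSync();

    // 2. Commit the type definition synchronously BEFORE any asynchronous call.
    db.commitSync();

    // 3. Only now start the asynchronous writes.
    g.addVertex(null, function (err, v) {
        if (err) { return done(err); }
        v.setProperty('example', 'some data', function (err) {
            if (err) { return done(err); }
            g.commit(done);
        });
    });
}
```

The only change from the original script is step 2; without it, the asynchronous calls apparently do not see the key type defined in step 1.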

inolen commented 10 years ago

Ah right. Or, you can wait for the commit in its callback.
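
A rough sketch of the callback variant, under the assumption that the asynchronous `g.commit` also covers the type definition (nothing in this thread verifies that, so treat it as untested). `defineKey` stands for the synchronous `makeKeySync(...)` chain from the original script, and `defineKeyThenWriteAsync` is a hypothetical helper name.

```javascript
// Sketch only: `g` is assumed to be the wrapped graph from the original script.
// `defineKey` is a caller-supplied function that runs the synchronous
// makeKeySync(...) chain; `defineKeyThenWriteAsync` is a hypothetical name.
function defineKeyThenWriteAsync(defineKey, g, done) {
    // Define the type first (synchronous calls).
    defineKey();

    // Commit asynchronously and do nothing else until its callback fires.
    g.commit(function (err) {
        if (err) { return done(err); }

        // The type definition is committed; asynchronous writes can start.
        g.addVertex(null, function (err, v) {
            if (err) { return done(err); }
            v.setProperty('example', 'some data', function (err) {
                if (err) { return done(err); }
                g.commit(done);
            });
        });
    });
}
```

The point is simply that no `addVertex` or `setProperty` call is issued until the first commit has reported back.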