eBay / cassandra-river

Cassandra river for Elastic search.
Apache License 2.0

indexing a cassandra column family #1

Open macca129 opened 10 years ago

macca129 commented 10 years ago

I have successfully installed the river and can connect to the Cassandra cluster, however I am missing something fundamental, since I cannot see an index being built.

I have 10,000 rows in my CF in a 3-node cluster and see activity in the ES logs:

```
[2014-02-27 16:01:03,516][INFO ][river.cassandra ] [Warren III Worthington] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 16:01:03,716][INFO ][river.cassandra ] [Warren III Worthington] [cassandra][maccasIndex] Starting thread with 738 keys
```

this repeats over and over again.

I am not using auth in the cluster, so I have omitted those properties.

Sanitised:

```
curl -XPUT 'localhost:9200/_river/maccasIndex/_meta' -d '{
    "type" : "cassandra",
    "cassandra" : {
        "cluster_name" : "MaccasCluster",
        "keyspace" : "MaccasKeyspace",
        "column_family" : "MaccasCF",
        "batch_size" : 1000,
        "hosts" : "dsc1n1:9160,dsc1n2:9160,dsc1n3:9160"
    },
    "index" : {
        "index" : "maccasIndex",
        "type" : "cassandra"
    }
}'
```

Cassandra info: [cqlsh 3.1.2 | Cassandra 1.2.6.1 | CQL spec 3.0.0 | Thrift protocol 19.36.0]

The CF definition:

```
CREATE TABLE "MaccasCF" ( key text PRIMARY KEY, "___class" text, "addressLine1" text, "addressLine2" text, ......
```


I am definitely missing something

`http://localhost:9200/_status?pretty=1`

```json
{
  "_shards" : { "total" : 2, "successful" : 1, "failed" : 0 },
  "indices" : {
    "_river" : {
      "index" : { "primary_size_in_bytes" : 3820, "size_in_bytes" : 3820 },
      "translog" : { "operations" : 0 },
      "docs" : { "num_docs" : 1, "max_doc" : 1, "deleted_docs" : 0 },
      "merges" : { "current" : 0, "current_docs" : 0, "current_size_in_bytes" : 0, "total" : 0, "total_time_in_millis" : 0, "total_docs" : 0, "total_size_in_bytes" : 0 },
      "refresh" : { "total" : 2, "total_time_in_millis" : 84 },
      "flush" : { "total" : 1, "total_time_in_millis" : 20 },
      "shards" : {
        "0" : [ {
          "routing" : { "state" : "STARTED", "primary" : true, "node" : "czJqPzFBSseiXoKX0YtWkA", "relocating_node" : null, "shard" : 0, "index" : "_river" },
          "state" : "STARTED",
          "index" : { "size_in_bytes" : 3820 },
          "translog" : { "id" : 1393509195769, "operations" : 0 },
          "docs" : { "num_docs" : 1, "max_doc" : 1, "deleted_docs" : 0 },
          "merges" : { "current" : 0, "current_docs" : 0, "current_size_in_bytes" : 0, "total" : 0, "total_time_in_millis" : 0, "total_docs" : 0, "total_size_in_bytes" : 0 },
          "refresh" : { "total" : 2, "total_time_in_millis" : 84 },
          "flush" : { "total" : 1, "total_time_in_millis" : 20 }
        } ]
      }
    }
  }
}
```

`http://localhost:9200/_river/maccasIndex/_mapping?pretty=1`

```json
{
  "_river" : {
    "mappings" : {
      "maccasIndex" : {
        "properties" : {
          "cassandra" : {
            "properties" : {
              "batch_size" : { "type" : "long" },
              "cluster_name" : { "type" : "string" },
              "column_family" : { "type" : "string" },
              "hosts" : { "type" : "string" },
              "keyspace" : { "type" : "string" }
            }
          },
          "index" : {
            "properties" : {
              "index" : { "type" : "string" },
              "type" : { "type" : "string" }
            }
          },
          "type" : { "type" : "string" }
        }
      }
    }
  }
}
```

If I run:

`http://localhost:9200/_river/maccasIndex/_search?q=*:*&pretty=1`

I get:

```json
{"took":268,"timed_out":false,"_shards":{"total":1,"successful":1,"failed":0},"hits":{"total":0,"max_score":null,"hits":[]}}
```

but there is no sign of any indexed data.

Can you tell me where I am going wrong?

utsengar commented 10 years ago

Do you ever see this log.INFO statement: `Inserting {} keys in ES`? If not, then I think there is a bug here: https://github.com/eBay/cassandra-river/blob/master/src/main/java/org/elasticsearch/river/cassandra/CassandraRiver.java#L166

It's not calling `saveToEs` with the bulk data.
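To illustrate, here is a minimal standalone sketch of how a batching loop like this can fail to flush (only the `saveToEs` name and the batch sizes come from this thread; the loop itself is my assumption, not the river's actual code):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a bulk-batching loop: the flush (saveToEs)
// is only reached once the bulk holds batchSize actions, so if
// nothing is ever added, nothing is ever sent to ES.
public class BulkFlushSketch {

    // Returns how many times the loop would have called saveToEs.
    static int runBatch(List<String> keys, int batchSize) {
        List<String> bulk = new ArrayList<>();
        int flushes = 0;
        for (String key : keys) {
            bulk.add(key);
            if (bulk.size() >= batchSize) {
                flushes++;      // stand-in for saveToEs(bulk)
                bulk.clear();
            }
        }
        // Note: any partial batch left here is silently dropped
        // unless it is flushed explicitly after the loop.
        return flushes;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 738; i++) keys.add("key-" + i);
        // 738 keys with a batch size of 1000: the guard never fires.
        System.out.println(runBatch(keys, 1000)); // prints 0
    }
}
```

Even when keys are present, a guard like this with no final flush after the loop silently drops the last partial batch (your 738-key thread, for example), and if `bulk.add(...)` never executes at all, `saveToEs` is never reached.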

macca129 commented 10 years ago

No, I never see that statement in the log; I just see the "Starting thread with 1000 keys" lines.

macca129 commented 10 years ago

I thought my syntax might be incorrect

utsengar commented 10 years ago

Can you add these two lines before the `if` check on line 165 in the class `CassandraRiver`:

```java
logger.info("Current bulk size: {}", bulk.numberOfActions());
logger.info("Total batch size set: {}", this.batchSize);
```

Rebuild the plugin and run it again. I am not sure what is wrong there; I will need to debug it. It works on my local machine.

macca129 commented 10 years ago

Sure thing. What do I need to do to perform the rebuild? I'm on Linux (CentOS) with Maven installed.

utsengar commented 10 years ago

You just need `mvn clean package` and then copy over the jar. It's in the README.md:

```
Build: mvn clean package

Install: ./bin/plugin -url file:elasticsearch-river-cassandra/target/releases/elasticsearch-river-cassandra-1.0.0-SNAPSHOT.zip -install river-cassandra
```

macca129 commented 10 years ago

OK, I have rebuilt, but I'm not seeing any additional statements in the logs.

macca129 commented 10 years ago

Log output:

```
[2014-02-27 17:39:57,286][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:57,396][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 738 keys
[2014-02-27 17:39:57,575][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:57,699][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:57,791][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:57,955][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:58,072][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:58,212][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:39:58,387][INFO ][river.cassandra ] [Mary Walker] [cassandra][maccasIndex] Starting thread with 1000 keys
```

macca129 commented 10 years ago

Is this what I am supposed to see? I would expect 10,000 JSON documents indexed, and I would like to search on all the other fields and return a record ID, which is the primary key in my C* CF.

utsengar commented 10 years ago

Yes, that is the expected behavior. It looks like there is an issue with the data or the river where your bulk object is not being constructed.

Can you add the same two print statements immediately after this line: `BulkRequestBuilder bulk = client.prepareBulk();`

And also add this print statement inside the loop `for(String key : this.keys.rowColumnMap.keySet()){`:

`logger.info("Current key: {}", key);`

macca129 commented 10 years ago

ok this is what I see now

```
[2014-02-27 17:59:45,301][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 6a81fdb0-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,415][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 1f10f110-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,509][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 146688b0-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,680][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,680][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,681][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,681][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 545d80e0-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,779][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,779][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,780][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,780][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: a6b53720-7952-11e3-b704-000c29151863
[2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:45,960][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 610fc730-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,088][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,089][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: c2145b40-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,192][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,193][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 9704f310-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,307][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: ff236b30-7951-11e3-b704-000c29151863
[2014-02-27 17:59:46,435][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,436][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 6df99840-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,549][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 562b8a20-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,652][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,653][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 3f23d940-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,805][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,805][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,806][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,806][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: c8e6e0a0-7952-11e3-b704-000c29151863
[2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:46,897][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: 7324af30-7952-11e3-b704-000c29151863
[2014-02-27 17:59:47,013][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Starting thread with 1000 keys
[2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current bulk size: 0
[2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Total batch size set: 1000
[2014-02-27 17:59:47,014][INFO ][river.cassandra ] [Hrimhari] [cassandra][maccasIndex] Current key: e901fba0-7951-11e3-b704-000c29151863
```

macca129 commented 10 years ago

FYI, all CF fields, including the PRIMARY KEY field, are of type "text".

utsengar commented 10 years ago

So the problem is somewhere in this line: `bulk.add(indexRequest(this.indexName).type(this.typeName).id(id).source(this.keys.rowColumnMap.get(key)));` This line is not actually adding documents to the ES indexer.

I don't have much time to reproduce your scenario right now, since this works fine locally on my machine. Also, I wrote this river as a quick prototype to connect ES with Cassandra; if you plan to use it in production, I strongly recommend understanding how it works and improving it.

I would be more than happy to help you debug it.
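One quick way to test that hypothesis is to guard the `bulk.add(...)` call and log whenever the row source coming back from Cassandra is null or empty. A standalone sketch of the check (the `shouldIndex` helper and the map layout are my illustration; only the `rowColumnMap` name comes from the river):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical guard for the bulk.add(...) call: an index request
// built from a null or empty source map carries no document, which
// would match a bulk that never grows past 0 actions.
public class EmptySourceCheck {

    // True only when the row actually has columns worth indexing.
    static boolean shouldIndex(Map<String, String> source) {
        return source != null && !source.isEmpty();
    }

    public static void main(String[] args) {
        // Simulate a rowColumnMap whose row came back with no columns.
        Map<String, Map<String, String>> rowColumnMap = new HashMap<>();
        rowColumnMap.put("6a81fdb0-7952-11e3-b704-000c29151863", new HashMap<>());

        for (Map.Entry<String, Map<String, String>> e : rowColumnMap.entrySet()) {
            if (shouldIndex(e.getValue())) {
                System.out.println("would bulk.add key " + e.getKey());
            } else {
                System.out.println("empty source for key " + e.getKey()
                        + ", nothing added to the bulk");
            }
        }
    }
}
```

If the equivalent check inside the river reports empty sources for your keys, the problem is in how the rows are read from Cassandra rather than on the ES side.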

macca129 commented 10 years ago

Perhaps if I recreate your table structure and your create-index statement (from your environment), I can rule my schema out as the problem?