danielmewes opened 9 years ago
(also mentioning @deontologician)
:+1:
Reopening. @deontologician has just updated the Logstash plugin (https://github.com/rethinkdb/logstash-input-rethinkdb), so we need to actually update the article to describe how to use it with Elasticsearch.
@deontologician and @chipotle could you connect please to make sure @chipotle has all the information he needs to write this?
This would require building a little example first, which I haven't done yet. You have to set up the Elasticsearch endpoint and possibly a filter.
I poked at this slightly, but didn't have time to finish it. I'm pasting a partial config here so it won't get lost:
input {
  rethinkdb {
    host => "localhost"
    port => 28015
    auth_key => ""
    watch_dbs => ["db1", "db2"]
    watch_tables => ["test.foo", "db2.baz"]
    backfill => true
  }
}
filter {
  # Stash routing info in @metadata so it isn't persisted in the document.
  mutate {
    rename => { "[db]"    => "[@metadata][index]"
                "[table]" => "[@metadata][document_type]" }
  }
  # A changefeed delete has new_val: null; an insert has old_val: null.
  if ![new_val] and [old_val] {
    mutate {
      add_field => { "[@metadata][action]" => "delete"
                     "[@metadata][id]"     => "%{[old_val][id]}" }
    }
  } else if [new_val] and [old_val] {
    mutate {
      add_field => { "[@metadata][action]" => "update"
                     "[@metadata][id]"     => "%{[new_val][id]}" }
    }
  } else if [new_val] and ![old_val] {
    mutate {
      add_field => { "[@metadata][action]" => "index"
                     "[@metadata][id]"     => "%{[new_val][id]}" }
    }
  } else {
    drop { }
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    action => "%{[@metadata][action]}"
    document_id => "%{[@metadata][id]}"
    index => "%{[@metadata][index]}"
    document_type => "%{[@metadata][document_type]}"
  }
}
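For anyone who wants to try this sketch as-is (none of this was verified in the thread): it assumes the logstash-input-rethinkdb plugin is installed (bin/plugin install logstash-input-rethinkdb on Logstash 2.x, bin/logstash-plugin install on newer versions), and it needs a Logstash version whose elasticsearch output accepts a sprintf'd action and a document_id option. You would run it with bin/logstash -f <config-file>.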
I'm really glad I stumbled upon this! I'm using the input plugin in combination with Netflix's Falcor: writes go to RethinkDB and reads go through Elasticsearch, so of course I have to maintain parity between the DBs at all times.
Reads work like a charm, and newly created entries are mirrored to Elasticsearch (almost) as expected; however, deletions are not mirrored to ES at all. After fumbling with Falcor's cache invalidation for almost a day and then deciding to remove logstash and implement it myself, I realized this might be logstash's fault. I am currently not using any filters in my config, and this already required me to change all my ES queries to look for the new_val fields, as below. That's the first point where I'm rather confused whether this is intended or I'm just working against a badly configured logstash.
The second point, as mentioned, is that deletions happen in RethinkDB but are not mirrored to ES. I guess my real question is whether that is possibly due to a badly configured logstash between the DBs. Also note that the config pasted by @deontologician above was explicitly a partial draft, so double-check it before relying on it.
Some more info about the setup: I'm using Thinky as a wrapper, and my primary key is name rather than the default id. Also, here's the repo and an overview of the whole thing we're (I am) building.
I would really appreciate any help on this, and I also volunteer to write a draft for your docs if I get this working.
Using Thinky, this is how I delete from RethinkDB (this works, but the delete is not mirrored to ES):
Package.get(packName).delete().run().then(function(res){ ...
My logstash config:
input {
  rethinkdb {
    host => "localhost"
    port => 28015
    auth_key => ""
    watch_dbs => ["cookiejar"]
    watch_tables => ["cookiejar.Package"]
    backfill => true
  }
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "packages"
document_type => "package"
template => "_logstash/logstash.conf"
template_overwrite => true
}
stdout { codec => json_lines }
}
This is how I query ES currently (note the new_val prefix):
return elastic.search({
index: "packages",
type: "package",
size: config.defaultSize,
body: {
query: {
"multi_match" : {
"query": packName,
"type": "best_fields",
"fields": ["new_val.name", "new_val.description", "new_val.keywords", "new_val.owner"],
"minimum_should_match": "25%",
"fuzziness" : 2,
}
}
}
}).then(function(searchresult) {
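For what it's worth, the reason the queries have to target new_val.* at all is that without a filter, the whole changefeed event ({old_val, new_val}) is shipped to Elasticsearch as the document. A sketch of promoting new_val's fields to the top level instead, assuming a Logstash version with the event.get/event.set ruby API (older versions use event['field']):

filter {
  ruby {
    # Copy new_val's fields to the top level so the ES documents (and the
    # queries against them) don't need the new_val prefix.
    code => "
      nv = event.get('new_val')
      if nv.is_a?(Hash)
        nv.each { |k, v| event.set(k, v) }
      end
      event.remove('new_val')
      event.remove('old_val')
    "
  }
}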
Thanks for sharing this, @BenMann. Deletions should be possible. Maybe a question of configuration? Unfortunately I'm not familiar with logstash configuration. @deontologician, do you have any insights?
The logstash filter has to do a bit of work to translate new_val: null from a changefeed into action: "delete" (see here) when it's passed to the elasticsearch plugin. Since logstash isn't specific to exporting from RethinkDB to Elasticsearch, the interface is more general. My config above is a start at getting this working, using hidden variables that won't be persisted in the document sent to Elasticsearch.
One thing that will make writing the filter less painful is rethinkdb/rethinkdb#5188, which makes it more direct to translate a change type into an elasticsearch-output action.
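Nothing here is settled until that lands, but as a sketch of where it could go: assuming #5188 ends up exposing a change type field with values like add, change, and remove on each event (an assumption, not a confirmed interface), the whole conditional chain could collapse into a lookup with the separately installed logstash-filter-translate plugin:

filter {
  # Sketch only: assumes rethinkdb/rethinkdb#5188 puts a "type" field with
  # values like "add", "change", "remove" on every changefeed event.
  translate {
    field       => "type"
    destination => "[@metadata][action]"
    dictionary  => { "add"    => "index"
                     "change" => "update"
                     "remove" => "delete" }
  }
}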
Yeah, that's exactly what I figured out :)
I read everything I could gather about logstash configs yesterday and ended up with the config below, which builds heavily on your code. However, two hard things remain:
1) I'm not sure if [field] != "null" is correct (I guess not), but [field] != null throws an error. I'll try reversing it and checking ![field] tomorrow.
2) I currently get an error in my output which indicates that action has to be one of index, delete, etc. (as you also mentioned), which could be caused by 1) or by something else I'm missing.
Is there a "correct" way in logstash to check whether a value is null? I think it's possible with a ruby filter, but I'd prefer an easier solution if there is one. I also think the logstash docs could be more explicit on this :P
I'll keep you posted.
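For the null-check question above, a minimal ruby-filter sketch (again assuming the event.get/event.set API) might look like this, though it was never confirmed in the thread:

filter {
  ruby {
    # A JSON null from the changefeed and a missing field both come back
    # as nil here, which is usually what you want for delete detection.
    code => "
      event.set('[@metadata][deleted]', event.get('new_val').nil?)
    "
  }
}

In practice the plain ![field] conditional mentioned above is the easier route, since missing and null fields are falsy in Logstash conditionals.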
current (failing) config:
logstash/bin/logstash -e '
input {
rethinkdb {
host => "localhost"
port => 28015
auth_key => ""
watch_dbs => ["cookiejar"]
watch_tables => ["cookiejar.Package"]
backfill => true
}
}
filter {
if [new_val] == "null" and [old_val] != "null" {
mutate {
replace => {
add_field => {
"[@metadata][action]" => "delete"
"[@metadata][id]" => "%{[old_val][id]}"
}
}
}
}
else if [new_val] != "null" and [old_val] != "null" {
drop {
add_field => {
"[@metadata][action]" => "update"
"[@metadata][id]" => "%{[new_val][id]}"
}
}
}
else if [new_val] != "null" and [old_val] == "null" {
drop {
add_field => {
"[@metadata][action]" => "index"
"[@metadata][id]" => "%{[new_val][id]}"
}
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "packages"
action => "%{[@metadata][action]}"
document_id => "%[@metadata][id]"
document_type => "package"
}
stdout {
codec => rubydebug {
metadata => true
}
}
}'
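Three things stand out in this config (treat this as a guess, since none of it was confirmed in the thread): drop {} cancels the event entirely, so the update/index branches need mutate {} instead; [field] != "null" compares against the literal string "null" rather than testing for a missing/null field (![field] is the usual idiom); and document_id => "%[@metadata][id]" is missing the braces of the %{...} sprintf syntax. A corrected sketch of just the filter and output:

filter {
  # ![field] is falsy for missing fields, unlike comparisons against "null".
  if ![new_val] and [old_val] {
    mutate { add_field => { "[@metadata][action]" => "delete"
                            "[@metadata][id]"     => "%{[old_val][id]}" } }
  } else if [new_val] and [old_val] {
    mutate { add_field => { "[@metadata][action]" => "update"
                            "[@metadata][id]"     => "%{[new_val][id]}" } }
  } else if [new_val] and ![old_val] {
    mutate { add_field => { "[@metadata][action]" => "index"
                            "[@metadata][id]"     => "%{[new_val][id]}" } }
  } else {
    drop { }  # only here, where cancelling the event is the intent
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "packages"
    action => "%{[@metadata][action]}"
    document_id => "%{[@metadata][id]}"  # note the %{...} braces
    document_type => "package"
  }
}

Since the primary key in this setup is name rather than id, the %{[new_val][id]} references would presumably also need to become %{[new_val][name]}.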
Thanks to @deontologician. Looking forward to a working solution. This would be part of the solution I'm looking for in this Stack Overflow post:
http://stackoverflow.com/questions/35088964/complex-rethinkdb-json-to-elasticsearch-via-logstash
Is there any news on this? Did you have a breakthrough?
I implemented the necessary write/delete/update actions myself, which removes logstash from our setup for the moment. However, I am still interested in a logstash-based solution, as I think it is generally preferable.
You implemented the mirroring into RethinkDB? If so, this is grand. Is it proprietary, or can you share it?
@stevns From RethinkDB into Elasticsearch, as described here, just without logstash. If it is of any use to you, just dig around the codebase a bit. The most important bit should be in API/rethinkDB.
Another full-text index approach here.
The instructions on http://rethinkdb.com/docs/elasticsearch/ are no longer going to work with RethinkDB 2.2, since the River plugin is based on the unofficial Java driver, which still uses the protocol-buffer protocol that we're dropping in 2.2.
I suggest the following steps:
1) Build include_initial support (https://github.com/rethinkdb/rethinkdb/issues/3197) into the logstash plugin.