rwynn / monstache

a go daemon that syncs MongoDB to Elasticsearch in realtime. you know, for search.
https://rwynn.github.io/monstache-site/
MIT License

Map index on DELETE action - Not accessing plugin #691

Closed Arenhardt closed 1 year ago

Arenhardt commented 1 year ago

My use case is driven by a performance issue. Currently I have a large Mongo database that is mapped to a single, large Elasticsearch index. My idea is to split the index based on a value contained in each document in Mongo. The source of the data stays unchanged, and the Map function of the plugin chooses the destination index of the data based on a string.

my_db.collection -> database collection

my_db.collection.0, my_db.collection.1, my_db.collection.2 .... -> pool of indexes inside ElasticSearch
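
A minimal sketch of how a document could be assigned to one index in such a pool. The issue does not say how the string is turned into an index number, so the FNV hash, the helper name `indexFor`, and the pool size of 3 are assumptions for illustration. The `account_id` field is borrowed from the trace further down. In a plugin, the returned name would go into the `Index` field of `MapperPluginOutput`.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// indexFor picks one index from a fixed pool (hypothetical helper).
// namespace is the MongoDB namespace, e.g. "my_db.collection".
// key is the document value used to split the data.
// poolSize is the number of indexes in the pool.
func indexFor(namespace, key string, poolSize uint32) string {
	if poolSize == 0 {
		// No pool configured: fall back to the unsplit index name.
		return namespace
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return fmt.Sprintf("%s.%d", namespace, h.Sum32()%poolSize)
}

func main() {
	// Assumed document shape: a string field named account_id.
	doc := map[string]interface{}{"account_id": "111111111111111"}
	key, _ := doc["account_id"].(string)
	fmt.Println(indexFor("my_db.collection", key, 3))
}
```

The same key always lands in the same index, but only as long as the pool size does not change. Note that this only works when the document body is available to read the key from.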

I'm able to overwrite the Index name to be used by Monstache in a custom plugin using Map:

func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) {
    .....
    output = &monstachemap.MapperPluginOutput{Document: input.Document, Index: custom_index}
    return
}

TRACE 2023/08/17 17:29:28 POST /_bulk HTTP/1.1
Host: elasticsearch:9200
User-Agent: elastic/7.0.22 (linux-amd64)
Content-Length: 417
Accept: application/json
Content-Type: application/x-ndjson
Accept-Encoding: gzip

{"index":{"_index":"my_db.collection.1","_id":"64de58f5c714b2e1a9bb87c0","routing":"6955911230540267522","version":7268344658753452040,"version_type":"external"}}
{"account_id":"111111111111111", .....}

TRACE 2023/08/17 17:29:28 HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8

{"took":19,"errors":false,"items":[{"index":{"_index":"my_db.collection.1","_type":"_doc","_id":"64de58f5c714b2e1a9bb87c0","_version":7268344658753452040,"result":"created","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":4,"_primary_term":1,"status":201}}]}

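But during deletion it doesn't work. The delete request goes to the default index my_db.collection, not to the index chosen by the plugin, so the document is not found: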

TRACE 2023/08/17 17:29:48 POST /_bulk HTTP/1.1
Host: elasticsearch:9200
User-Agent: elastic/7.0.22 (linux-amd64)
Content-Length: 142
Accept: application/json
Content-Type: application/x-ndjson
Accept-Encoding: gzip

{"delete":{"_index":"my_db.collection","_id":"64de58f5c714b2e1a9bb87c0","version":7268344739681397664,"version_type":"external"}}

TRACE 2023/08/17 17:29:48 HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8

{"took":16,"errors":false,"items":[{"delete":{"_index":"my_db.collection","_type":"_doc","_id":"64de58f5c714b2e1a9bb87c0","_version":7268344739681397664,"result":"not_found","_shards":{"total":2,"successful":1,"failed":0},"_seq_no":2,"_primary_term":1,"status":404}}]}

Using routing-namespaces doesn't look like a good fit for this.

rwynn commented 1 year ago

Can you share what you have for these relevant configs? Routing namespaces should be set to the MongoDB namespace(s) that you are redirecting, not to the destination index your plugin sends to.


# tell monstache not to assume where ANY deleted doc will be; instead search for it by id
routing-namespaces = [ "" ]

# optional, instead, to be very specific, tell monstache you are redirecting events 
# from source namespace my_db.collection
# routing-namespaces = [ "my_db.collection" ]

# this is already the default (stateless)
delete-strategy = 0

# optional, if you still haven't had success you can try enabling this; it allows monstache to delete more than 1 doc in Elastic
# if multiple docs match on the id
disable-delete-protection = true

# optional, if you want to restrict monstache to only search for docs to delete according to an index pattern
# the default is to search all indices
delete-index-pattern = "my_db.collection*"

Relevant delete handling code here: https://github.com/rwynn/monstache/blob/39b8a2503070e04135a64ad805ae957aed29fa18/monstache.go#L4003

Make sure it's going into the stateless block and that it's performing a search, i.e. that the following condition evaluates to true.

if routingNamespaces[""] || routingNamespaces[op.Namespace] {
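
A simplified model of the stateless delete decision, as described in this thread. This is not monstache's actual code: the `hit` type and the functions `needsSearch` and `deleteTargets` are hypothetical, and the search itself is replaced by a slice of hits passed in by the caller. Only the condition in `needsSearch` is taken from the line quoted above.

```go
package main

import "fmt"

// hit is a hypothetical, simplified search hit: where a document id was found.
type hit struct {
	Index string
	ID    string
}

// needsSearch mirrors the quoted condition: search by id when the routing
// namespaces contain "" (any namespace) or the namespace of the event.
func needsSearch(routingNamespaces map[string]bool, namespace string) bool {
	return routingNamespaces[""] || routingNamespaces[namespace]
}

// deleteTargets returns the indexes that should receive the delete.
// defaultIndex is the index used when no search is performed.
// hits is the result of searching for the id (assumed already done).
// deleteProtection models the inverse of disable-delete-protection.
func deleteTargets(routingNamespaces map[string]bool, namespace, defaultIndex string, hits []hit, deleteProtection bool) []string {
	if !needsSearch(routingNamespaces, namespace) {
		// No search: the delete goes to the default index, as in the failing trace.
		return []string{defaultIndex}
	}
	if len(hits) == 0 {
		return nil // nothing found, nothing to delete
	}
	if len(hits) > 1 && deleteProtection {
		return nil // ambiguous match: refuse to delete more than one doc
	}
	targets := make([]string, 0, len(hits))
	for _, h := range hits {
		targets = append(targets, h.Index)
	}
	return targets
}

func main() {
	rn := map[string]bool{"my_db.collection": true}
	hits := []hit{{Index: "my_db.collection.1", ID: "64de58f5c714b2e1a9bb87c0"}}
	fmt.Println(deleteTargets(rn, "my_db.collection", "my_db.collection", hits, true))
}
```

With an empty routing-namespaces map, the same call would return only the default index, which matches the 404 seen in the delete trace.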

If all else fails, you already have a go plugin, so you can implement the delete yourself by adding a Process function.

// all events including deletes flow through here.
func Process(input *monstachemap.ProcessPluginInput) error

Arenhardt commented 1 year ago

I found the problem while trying to build my own solution. Unlike other change events, the delete event doesn't provide the document body. Since I was trying to route the deletion to the right index based on a value present in the document, it couldn't work.

So I set delete-index-pattern = "my_db.collection*" in my config. It's not a solution with great performance (it has to search across all the matching indexes), but in my case most records are updated to a "removed" status instead of actually being deleted, so deletions will be rare.

Thank you for your help @rwynn !