rwynn / monstache

a go daemon that syncs MongoDB to Elasticsearch in realtime. you know, for search.
https://rwynn.github.io/monstache-site/
MIT License

Error starting change stream when trying to connect to CosmosDB #454

Open BerndHardrock opened 3 years ago

BerndHardrock commented 3 years ago

We are trying to synchronise Elasticsearch with Azure CosmosDB through monstache.

Versions used:

Error:

Starting monstache ... done
Attaching to monstache
monstache    | INFO 2020/11/17 14:10:14 Started monstache version 6.7.1
monstache    | INFO 2020/11/17 14:10:14 Go version go1.14
monstache    | INFO 2020/11/17 14:10:14 MongoDB go driver v1.4.3
monstache    | INFO 2020/11/17 14:10:14 Elasticsearch go driver 7.0.21
monstache    | INFO 2020/11/17 14:10:14 Successfully connected to MongoDB version 3.6.0
monstache    | INFO 2020/11/17 14:10:14 Successfully connected to Elasticsearch version 7.7.1
monstache    | INFO 2020/11/17 14:10:14 Listening for events
monstache    | INFO 2020/11/17 14:10:14 Sending systemd READY=1
monstache    | WARN 2020/11/17 14:10:14 Systemd notification not supported (i.e. NOTIFY_SOCKET is unset)
monstache    | INFO 2020/11/17 14:10:14 Starting http server at :8080
monstache    | INFO 2020/11/17 14:10:14 Watching changes on collection myDB.myCollection
monstache    | INFO 2020/11/17 14:10:15 Direct reads completed
monstache    | ERROR 2020/11/17 14:10:15 key "operationTime" was not found
monstache    | ERROR 2020/11/17 14:10:15 Error starting change stream. Will retry: (BadValue) Change stream must be followed by a match and then a project stage

TOML config:

mongo-url = "...com:10255/...?ssl=true&replicaSet=globaldb&retrywrites=false&maxIdleTimeMS=120000"
elasticsearch-urls = ["https://...com:443"]
gzip = false
elasticsearch-max-conns = 4
elasticsearch-max-seconds = 5
elasticsearch-max-bytes = 8000000
#elasticsearch-max-docs = 1
# user and pw are set in env variables  
#elasticsearch-user = "myUser"
#elasticsearch-password = "myPW"
elasticsearch-pem-file = "/app/myCert.pem"
elasticsearch-validate-pem-file = true
direct-read-split-max = 9
verbose = true
resume = true
resume-strategy = 1
exit-after-direct-reads = false
enable-http-server = true

disable-change-events = false
direct-read-namespaces = ["myDB.myCollection"]
change-stream-namespaces = ["myDB.myCollection"]

In the Azure CosmosDB documentation we found the following regarding change streams: https://docs.microsoft.com/en-us/azure/cosmos-db/mongodb-change-streams?tabs=javascript

Can you confirm this is a CosmosDB issue and not a configuration issue on our side?

nadworny commented 3 years ago

Hi @rwynn, I'm working on this issue together with @BerndHardrock.

In the meantime I was able to narrow down the issue. I used the MongoDB JavaScript driver to watch for changes, and I get the same error with the following $project stage:

db.collection(collectionName).watch([
    {
      "$match": {
        "operationType": {
          "$in": ["insert", "update", "replace"]
        }
      }
    },
    {
      "$project": {
        "documentKey": false
      }
    }
  ], {"resumeAfter": resumeToken, "fullDocument": "updateLookup"});

I then followed the watch example from the Azure CosmosDB docs and added "ns": 1 to the $project stage, and now the error "Change stream must be followed by a match and then a project stage" is gone:

"$project": {
  "documentKey": false,
  "ns": 1
}

I'm not sure exactly what "ns": 1 does, but do you think you could implement this change? Unfortunately I'm not a Go specialist, so I can't properly create a PR for it.

Also, since CosmosDB change streams don't support delete events, can we disable watching for deletes through configuration?

Thanks a lot

rwynn commented 3 years ago

Hi @nadworny @BerndHardrock ,

I've never tried monstache with CosmosDB. It seems that because of these limitations it would be a challenge.

Normally, you can send empty pipeline stages to the Watch API, but it seems CosmosDB does not allow that. Monstache does allow you to customize the pipeline, so we can work around the limitation with monstache configuration.

Here is the sample from the Cosmos docs applied as a monstache pipeline:


[[pipeline]]
script = """
module.exports = function(ns, changeStream) {
  if (changeStream) {
    return [
        { $match: { "operationType": { $in: ["insert", "update", "replace"] } } },
        { $project: { "_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1 } }
    ];
  }
}
"""
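The pipeline script above is JavaScript that monstache evaluates. As a rough illustration of what it returns, here is a Go analogue. The names `stage` and `cosmosPipeline` are hypothetical and are not part of monstache. Change streams get the `$match` followed by `$project` stages that the Cosmos error message asks for, while direct reads get no extra stages.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// stage is one aggregation stage, e.g. {"$match": {...}}.
type stage map[string]interface{}

// cosmosPipeline is a hypothetical Go analogue of the JavaScript pipeline
// script: change streams get $match then $project, direct reads get nothing.
// It is illustrative only; monstache builds the pipeline from the script.
func cosmosPipeline(ns string, changeStream bool) []stage {
	if !changeStream {
		return nil // direct reads are left untouched
	}
	return []stage{
		// Only pass through event types that Cosmos DB reports.
		{"$match": map[string]interface{}{
			"operationType": map[string]interface{}{
				"$in": []string{"insert", "update", "replace"},
			},
		}},
		// Project the fields used in the Cosmos docs example, including ns.
		{"$project": map[string]interface{}{
			"_id": 1, "fullDocument": 1, "ns": 1, "documentKey": 1,
		}},
	}
}

func main() {
	out, err := json.Marshal(cosmosPipeline("myDB.myCollection", true))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```

Marshalling to JSON is only a convenient way to inspect the stage shapes. Note that `encoding/json` sorts map keys, so the printed field order differs from the script.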

However, I don't see a workaround for the following limitation from the Cosmos docs:

"The operationType and updateDescription properties are not yet supported in the output document."

Monstache uses gtm to listen to the change stream, and gtm relies on the operationType field.
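To make the operationType dependency concrete, here is a hypothetical Go sketch of a consumer that maps the field to a one-letter op code. It is not gtm's actual code. The helper name `opCode` and the event maps are illustrative assumptions. An event shaped like the Cosmos output, with no operationType, cannot be mapped at all.

```go
package main

import "fmt"

// opCode is a hypothetical helper, NOT gtm's implementation. It reads
// operationType from a decoded change event and returns a short op code.
func opCode(event map[string]interface{}) (string, error) {
	t, ok := event["operationType"].(string)
	if !ok {
		// Per the Cosmos docs quoted above, operationType is not in the
		// output document, so a Cosmos event ends up here.
		return "", fmt.Errorf("change event has no operationType")
	}
	switch t {
	case "insert":
		return "i", nil
	case "update", "replace":
		return "u", nil
	case "delete":
		return "d", nil
	default:
		return t, nil
	}
}

func main() {
	ns := map[string]interface{}{"db": "myDB", "coll": "myCollection"}
	mongoEvent := map[string]interface{}{"operationType": "insert", "ns": ns}
	cosmosEvent := map[string]interface{}{"ns": ns} // no operationType
	for _, e := range []map[string]interface{}{mongoEvent, cosmosEvent} {
		op, err := opCode(e)
		fmt.Println(op, err)
	}
}
```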

rwynn commented 3 years ago

You may be able to get pretty far by forking gtm and updating mapOperation to simply return "u", treating everything as an update operation.

Then update monstache to point the gtm imports at your fork:

"github.com/rwynn/gtm/v2"
"github.com/rwynn/gtm/v2/consistent"
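Here is a minimal sketch of the suggested patch. It assumes that mapOperation takes the change stream's operationType string and returns gtm's one-letter op code. Check the actual signature in your fork before applying anything like this.

```go
package main

import "fmt"

// mapOperation sketches the patch suggested above for a gtm fork.
// ASSUMPTION: the real gtm function takes the operationType string and
// returns a short op code; verify against the fork. The input is ignored
// on purpose because Cosmos DB does not supply operationType, so every
// event is reported as an update ("u").
func mapOperation(s string) string {
	return "u"
}

func main() {
	for _, op := range []string{"", "insert", "replace"} {
		fmt.Printf("%q -> %q\n", op, mapOperation(op))
	}
}
```

With this change, deletes can never be recognized. That matches the earlier observation in this thread that CosmosDB change streams don't report deletes.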