airbytehq / airbyte

The leading data integration platform for ETL / ELT data pipelines from APIs, databases & files to data warehouses, data lakes & data lakehouses. Both self-hosted and Cloud-hosted.
https://airbyte.com

Source MongoDB: invalid $project :: caused by :: FieldPath field names may not start with '$' #14987

Open osalloum opened 2 years ago

osalloum commented 2 years ago

Environment

Current Behavior

The source connector fails to discover the schema

Expected Behavior

The source connector introspects the schema and shows the collections and destination configuration

Logs

Invalid $project :: caused by :: FieldPath field names may not start with '$'

Steps to Reproduce

  1. Enable Profiling on Mongo
  2. Set up new mongo source connector
  3. Click on add new destination

Workaround

Stop profiling and drop the profile collection:

db.setProfilingLevel(0)
db.system.profile.drop()

Possible fix

Exclude system tables from schema discovery
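
The idea could be sketched as a name filter applied to the collection list before schema discovery runs. This is a minimal JavaScript sketch, not the connector's actual code (the connector is written in Java). `isSystemCollection` and `excludeSystemCollections` are hypothetical names, and the sketch assumes system collections can be recognised by the `system.` name prefix, as in `system.profile`.

```javascript
// Hypothetical helper: true for MongoDB system collections such as
// "system.profile", identified here purely by the "system." name prefix.
function isSystemCollection(name) {
  return name.startsWith("system.");
}

// Hypothetical helper: keep only the collections that schema discovery
// should inspect.
function excludeSystemCollections(names) {
  return names.filter((name) => !isSystemCollection(name));
}

// Illustrative input; the collection names are made up for this example.
const discovered = ["users", "system.profile", "orders", "system.views"];
console.log(excludeSystemCollections(discovered));
```

Matching on the prefix rather than on any occurrence of `system.` avoids dropping user collections that merely contain that substring.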

marcosmarxm commented 2 years ago

Zendesk ticket #1645 has been linked to this issue.

marcosmarxm commented 2 years ago

Comment made from Zendesk by Sajarin Dider on 2022-07-25 at 15:50:

Hey @fauh45, it looks like another user is reporting the same issue: https://github.com/airbytehq/airbyte/issues/14987. According to the MongoDB docs, this looks like a problem with how the $ character is parsed: https://www.mongodb.com/docs/manual/reference/operator/aggregation/literal/
marcosmarxm commented 2 years ago

Comment made from Zendesk by Marcos Marx on 2022-07-26 at 01:59:

Ahhh I see, let me try to use their workaround first

marcosmarxm commented 2 years ago

Comment made from Zendesk by Marcos Marx on 2022-07-26 at 02:27:

It turns out that with my current settings the profiling level is already set to 0. I even tried setting the profiling level to 1, setting up the connection, and then applying the workaround shown in the issue. Still nothing works.

Could the MongoDB version cause this error? But hasn't $project existed for a long time? Maybe it is just the parsing of the command sent by the source connector?

marcosmarxm commented 2 years ago

Comment made from Zendesk by Marcos Marx on 2022-07-26 at 06:51:

Did you try dropping the profiling collection from the Mongo database you are targeting?

db.system.profile.drop()

If that does not help, a different collection might be causing the issue on your end.
Unfortunately, the Airbyte logs are not helpful in identifying the culprit.

There are 2 ways to figure it out:

The hard way:
If you run your own custom Mongo setup, you can locate the mongod log output ( https://www.mongodb.com/docs/manual/reference/program/mongod/#std-option-mongod.--logpath ), tail that log stream, and filter for the $project call:

tail -f /replace/with/your/path/to/mongo.log | grep -F '$project'

If you are using a SaaS such as Atlas, download the full log file and search it instead: https://www.mongodb.com/docs/atlas/mongodb-logs/

Ultimately, you will be able to see the error in the log, identify the culprit collection, and then figure out what can be done about it.

The easy way:
Connect to MongoDB with the same credentials you are giving Airbyte and run:

db.runCommand( { listCollections: 1.0, authorizedCollections: true, filter: {type: 'collection'} } ).cursor.firstBatch

This is exactly the same call Airbyte makes in airbytehq/airbyte/blob/436de264cbb9402cfb8d7b6b8d0cd996efc4f659/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java (can’t post link).

If you see a bunch of system tables (names containing “system.”), that points to your problem: you need to set up a user with lower access rights to the target db, such as a “read” user or “dbOwner”.

ecylmz commented 1 year ago

Hi,

We are also having the same problem. Any progress on this issue?

marcosmarxm commented 1 year ago

Not yet, @ecylmz. If you can give more information about how to reproduce the issue, it will help a lot.

nseniak commented 1 year ago

Hi @marcosmarxm, I have the same problem. Unfortunately, I can't give you access to the database that causes the problem, but I can give the stack trace if that helps:

2022-12-23 14:14:49 INFO i.a.w.i.DefaultAirbyteStreamFactory(internalLog):120 - Opened connection [connectionId{localValue:62, serverValue:703934}] to 192.168.240.56:27017
2022-12-23 14:15:11 ERROR i.a.w.i.DefaultAirbyteStreamFactory(internalLog):116 - Something went wrong in the connector. See the logs for more details.
Stack Trace: com.mongodb.MongoCommandException: Command failed with error 16410 (Location16410): 'FieldPath field names may not start with '$'.' on server 192.168.240.56:27017. The full response is {"operationTime": {"$timestamp": {"t": 1671804890, "i": 2781}}, "ok": 0.0, "errmsg": "FieldPath field names may not start with '$'.", "code": 16410, "codeName": "Location16410", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1671804890, "i": 2827}}, "signature": {"hash": {"$binary": {"base64": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "subType": "00"}}, "keyId": 0}}}
    at com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:198)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:418)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:342)
    at com.mongodb.internal.connection.UsageTrackingInternalConnection.sendAndReceive(UsageTrackingInternalConnection.java:116)
    at com.mongodb.internal.connection.DefaultConnectionPool$PooledConnection.sendAndReceive(DefaultConnectionPool.java:647)
    at com.mongodb.internal.connection.CommandProtocolImpl.execute(CommandProtocolImpl.java:71)
    at com.mongodb.internal.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:244)
    at com.mongodb.internal.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:227)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:127)
    at com.mongodb.internal.connection.DefaultServerConnection.command(DefaultServerConnection.java:117)
    at com.mongodb.internal.connection.DefaultServer$OperationCountTrackingConnection.command(DefaultServer.java:348)
    at com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute(CommandOperationHelper.java:228)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$4(CommandOperationHelper.java:210)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$2(OperationHelper.java:564)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:589)
    at com.mongodb.internal.operation.OperationHelper.lambda$withSourceAndConnection$3(OperationHelper.java:563)
    at com.mongodb.internal.operation.OperationHelper.withSuppliedResource(OperationHelper.java:589)
    at com.mongodb.internal.operation.OperationHelper.withSourceAndConnection(OperationHelper.java:562)
    at com.mongodb.internal.operation.CommandOperationHelper.lambda$executeRetryableRead$5(CommandOperationHelper.java:207)
    at com.mongodb.internal.async.function.RetryingSyncSupplier.get(RetryingSyncSupplier.java:65)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:213)
    at com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead(CommandOperationHelper.java:193)
    at com.mongodb.internal.operation.AggregateOperationImpl.execute(AggregateOperationImpl.java:195)
    at com.mongodb.internal.operation.AggregateOperation.execute(AggregateOperation.java:306)
    at com.mongodb.internal.operation.AggregateOperation.execute(AggregateOperation.java:46)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:191)
    at com.mongodb.client.internal.MongoIterableImpl.execute(MongoIterableImpl.java:135)
    at com.mongodb.client.internal.MongoIterableImpl.iterator(MongoIterableImpl.java:92)
    at com.mongodb.client.internal.MongoIterableImpl.cursor(MongoIterableImpl.java:97)
    at io.airbyte.db.mongodb.MongoUtils.getTypes(MongoUtils.java:269)
    at io.airbyte.db.mongodb.MongoUtils.lambda$setSubFields$1(MongoUtils.java:235)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.airbyte.db.mongodb.MongoUtils.setSubFields(MongoUtils.java:234)
    at io.airbyte.db.mongodb.MongoUtils.lambda$setSubFields$1(MongoUtils.java:239)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.airbyte.db.mongodb.MongoUtils.setSubFields(MongoUtils.java:234)
    at io.airbyte.db.mongodb.MongoUtils.lambda$setSubFields$1(MongoUtils.java:239)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.airbyte.db.mongodb.MongoUtils.setSubFields(MongoUtils.java:234)
    at io.airbyte.db.mongodb.MongoUtils.lambda$getUniqueFields$0(MongoUtils.java:212)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197)
    at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1707)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:575)
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:616)
    at java.base/java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:622)
    at java.base/java.util.stream.ReferencePipeline.toList(ReferencePipeline.java:627)
    at io.airbyte.db.mongodb.MongoUtils.getUniqueFields(MongoUtils.java:215)
    at io.airbyte.integrations.source.mongodb.MongoDbSource.lambda$discoverInternal$1(MongoDbSource.java:106)
    at java.base/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
    at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1707)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
    at java.base/java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:754)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
2022-12-23 14:15:12 INFO i.a.w.t.TemporalAttemptExecution(get):163 - Stopping cancellation check scheduling...
sajarin commented 1 year ago

@nseniak

The error message FieldPath field names may not start with '$' occurs when a MongoDB query includes a field path (a document field name or an array of field names) that starts with a dollar sign ($). This is not allowed in MongoDB because the dollar sign is reserved for special field names and operator names.

This error can occur for a few different reasons:

Hope this helps!

nseniak commented 1 year ago

@sajarin thanks for the clarification. However, this error occurs while the MongoDB source connector attempts to introspect the database schema, and I have no control over the database field names.

sajarin commented 1 year ago

@nseniak thanks for clarifying, seems like the source isn't escaping and parsing the database field names correctly. If someone's willing to push a fix, we'd be happy to review it.
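
To find which fields might trip the connector, one option is to scan sample documents for keys that start with `$`. The following is a minimal sketch in plain JavaScript, operating on plain objects. `findDollarFieldPaths` is a hypothetical helper, not part of Airbyte or MongoDB, and the sample document is invented for illustration.

```javascript
// Hypothetical helper: recursively collect the dotted paths of all keys
// that start with "$" in a plain document (nested objects and arrays).
function findDollarFieldPaths(value, prefix = "") {
  const paths = [];
  if (Array.isArray(value)) {
    // Array elements do not add a path segment; just descend into them.
    for (const item of value) {
      paths.push(...findDollarFieldPaths(item, prefix));
    }
  } else if (value !== null && typeof value === "object") {
    for (const [key, child] of Object.entries(value)) {
      const path = prefix ? `${prefix}.${key}` : key;
      if (key.startsWith("$")) {
        paths.push(path);
      }
      paths.push(...findDollarFieldPaths(child, path));
    }
  }
  return paths;
}

// Invented sample document with two offending nested keys.
const sample = { name: "a", meta: { $ref: "x", tags: [{ $id: 1 }] } };
console.log(findDollarFieldPaths(sample));
```

In a mongo shell session, the same function could be applied to a document returned by `findOne()` on a suspect collection. Any path it reports is a candidate for the field name the `$project` stage rejects.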

myesn commented 1 year ago

Same problem:

Source MongoDB version: 4.4.14
Destination MongoDB version: 4.4.14