apache / camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors
https://camel.apache.org
Apache License 2.0
154 stars 102 forks source link

camel-mongodb-kafka-connector “camel.source.endpoint.streamFilter” invalid #1112

Open yingbo-wu opened 3 years ago

yingbo-wu commented 3 years ago
{
    "connector.class": "org.apache.camel.kafkaconnector.mongodb.CamelMongodbSourceConnector",
    "tasks.max": "1",
    "topics": "mongo.test-object.save.source",
    "camel.source.endpoint.consumerType": "changeStreams",
    "camel.source.path.connectionBean": "mongo",
    "errors.deadletterqueue.context.headers.enable": "true",
    "camel.source.endpoint.collection": "test_object",
    "camel.source.endpoint.database": "ac-test",
    "errors.deadletterqueue.topic.name": "mongo.test-object.save.source.deadletter",
    "value.converter.schemas.enable": "false",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "camel.source.endpoint.streamFilter": "{'$match':{'$or':[{'operationType':'insert'},{'operationType':'update'},{'operationType':'replace'}]}}",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.enable": "true",
    "camel.source.endpoint.mongoConnection": "#class:com.mongodb.client.MongoClients#create('mongodb://root:root@192.168.23.164:27017/?authSource=admin')",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "camel.source.marshal": "org.apache.camel.component.gson.GsonDataFormat",
    "camel.source.pollingConsumerQueueSize": "10000",
    "camel.source.contentLogLevel": "INFO"
}

The camel.source.endpoint.streamFilter Setting an incorrect JSON format does not result in an exception So my guess is that this setting doesn't work

In the end I surmised that it was the Camel-MongoDB component problem The following is part of the Camel-MongoDB code org/apache/camel/component/mongodb3/MongoDbChangeStreamsConsumer.java

String streamFilter = (String) getRoute().getProperties().get(STREAM_FILTER_PROPERTY);
List<BsonDocument> bsonFilter = null;
if (ObjectHelper.isNotEmpty(streamFilter)) {
    bsonFilter = singletonList(BsonDocument.parse(streamFilter));
}

The camel.source.endpoint.streamFilter is URI parameters But camel-mongodb Is to read content from properties Whether the Camle-Mongo component has an incompatible connector configuration

yingbo-wu commented 3 years ago

In addition, it is escaped as + when the setting contains Spaces

{
    "camel.source.endpoint.streamFilter": "{$match: {operationType: {$in: ['insert', 'update', 'replace']}}}"
}

The log is: MongoDb endpoint: mongodb://mongo?collection=test_object&consumerType=changeStreams&database=ac-test&mongoConnection=%23class%3Acom.mongodb.client.MongoClients%23create%28%27mongodb%3A%2F%2Froot%3Aroot%40192.168.23.164%3A27017%2F%3FauthSource%3Dadmin%27%29&streamFilter=%7B%24match%3A+%7BoperationType%3A+%7B%24in%3A+%5B%27insert%27%2C+%27update%27%2C+%27replace%27%5D%7D%7D%7D

The decode is MongoDb endpoint: mongodb://mongo?collection=test_object&consumerType=changeStreams&database=ac-test&mongoConnection=#class:com.mongodb.client.MongoClients#create('mongodb://root:root@192.168.23.164:27017/?authSource=admin')&streamFilter={$match:+{operationType:+{$in:+['insert',+'update',+'replace']}}}

valdar commented 3 years ago

@yingbo-wu as you correctly reported this is a camel component issue, I have taken the liberty to open an issue about it https://issues.apache.org/jira/browse/CAMEL-16716