Closed: mostafaghadimi closed this issue 5 years ago
hi @mostafaghadimi nice to hear you want to give this project a spin :)
the main reason for the docker compose file you are referring to is to provide an environment for a very simple E2E test run during the build.
1) if you run docker-compose -f compose-env.yaml up and everything starts up without issues, you should be able to connect to kafka at localhost:9092 with any producer of your choice, or with the kafka-console-producer, in order to publish data to topics. likewise, you should be able to post a sink connector configuration by means of curl or Postman to localhost:8083 (a sketch of these commands follows below).
2) not quite sure what you mean by the jar file not being uploaded to the server. what happens is that the folder which contains the build artefact for the sink connector is mounted into the kafka-connect docker container via the volume definition in the compose file. provided that you run the docker-compose file from within the src/test/resources/docker folder it should work fine. the relative path that you see is the source folder, which points to the target folder of the maven build artefact; this gets mounted into the correct folder of the corresponding docker container.
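just to sketch that flow end to end (the topic name is only an example, and I'm assuming the sink_connector.json payload is in the {"name": ..., "config": {...}} shape the Connect REST API expects):

# start the E2E compose environment from src/test/resources/docker
docker-compose -f compose-env.yaml up

# publish a few records to a topic (topic name here is just an example)
kafka-console-producer --broker-list localhost:9092 --topic my-topic

# register the sink connector by POSTing its configuration to the Connect REST API
curl -X POST -H "Content-Type: application/json" \
  --data @src/test/resources/config/sink_connector.json \
  http://localhost:8083/connectors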
hope this gives you a better understanding of how this is supposed to work. again, this is there for E2E testing. you may want to use a different/better compose env for other things :)
Hi,
I have produced some data using a Kafka producer, but it wasn't saved to the MongoDB collection. I need a sample showing how to do this.
I mean that the jar file isn't located in the GitHub repository. As you said, it should be mounted into the kafka-connect docker container, but there isn't any file to be mounted! :) That's because I don't know how to build the project using Maven. Would you please guide me on how to build the project with Maven to obtain the jar files, or would you upload the jar files manually?
Thanks, Mostafa
Oh I see :)
1) You can build with Maven simply by running mvn clean package in the root folder of the project (where the pom.xml is located). This should download a bunch of dependencies the first time you run it and result in the target/kafka-connect-mongodb folder with the jar you need (see the sketch below).
2) If for whatever reason you cannot build locally, you can get a pre-built artefact in version 1.3.1 (not the latest snapshot from the master branch though) either on maven central or via confluent hub.
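in case it helps, a minimal sketch of the build steps (assuming Maven and a JDK are installed locally):

# run from the project root, i.e. the folder containing pom.xml
mvn clean package

# the packaged connector (jar with dependencies) should then show up here
ls target/kafka-connect-mongodb/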
Thank you Hans-Peter,
The only remaining question is the following:
I have produced some data using a Kafka producer, but it wasn't saved to the MongoDB collection. I need a sample showing how to do this. Would you explain the steps?
Not sure what exactly you mean by that. If you have successfully produced data to a kafka topic, you run the sink connector simply by sending an HTTP POST request containing your sink connector configuration of choice to the kafka-connect process. That's it.
You can find e.g. the configuration file for the E2E test (which is essentially only a 1:1 pass-through of the AVRO data that was produced to the kafka topic) in src/test/resources/config/sink_connector.json. If you have JSON data in your topics instead of AVRO, you choose the appropriate key/value converter settings in the configuration.
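purely as an illustration (these exact lines are not taken from the repo's config), for schemaless JSON data the converter part of the connector configuration could be swapped to something like:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"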
Yes, I have created a Kafka topic. The thing is, I don't know exactly what to put in the POST request or how to send it. Would you please send a sample so I can check whether it works or not?
Question: Is the following curl request OK? And where should I run it (inside the container or from my local computer)?
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "e2e-test-mongo-sink",
  "config": {
    "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
    "topics": "e2e-test-topic",
    "mongodb.connection.uri": "mongodb://mongodb:27017/kafkaconnect?w=1&journal=true",
    "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.ProvidedInValueStrategy",
    "mongodb.collection": "e2e-test-collection",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schemaregistry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schemaregistry:8081"
  }
}' http://localhost:8083/
Problem: Whenever I try it from my local computer, I get the following error:
{"error_code":405,"message":"HTTP 405 Method Not Allowed"}%
Here is my docker-compose.yml file:
version: '3'
services:
  mongo:
    image: mongo
    container_name: mongo
    hostname: mongo
    ports:
      - 27017:27017
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    hostname: kafka
    environment:
      - KAFKA_ADVERTISED_HOST_NAME=localhost
      - KAFKA_ADVERTISED_PORT=9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://:9092
  schemaregistry:
    image: confluentinc/cp-schema-registry
    container_name: schemaregistry
    hostname: schemaregistry
    ports:
      - 8081:8081
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181
  kafkaconnect:
    image: confluentinc/cp-kafka-connect
    container_name: kafkaconnect
    hostname: kafkaconnect
    depends_on:
      - zookeeper
      - kafka
      - schemaregistry
    ports:
      - 8083:8083
    volumes:
      - ./target/kafka-connect-mongodb/:/etc/kafka-connect/jars/kafka-connect-mongodb/
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_ADVERTISED_HOST_NAME: kafkaconnect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry:8081
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schemaregistry:8081
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars'
And this is my /target/kafka-connect-mongodb jar file, which I have compressed. I didn't build the whole project with Maven here; I cloned the project once, built it, and then copied the required file into my directory.
The project structure looks like the following:
├── docker-compose.yml
├── Dockerfile
├── src
│ ├── main
│ │ ├── assembly
│ │ │ └── package.xml
│ │ ├── java
│ │ │ └── at
│ │ │ └── grahsl
│ │ │ └── kafka
│ │ │ └── connect
│ │ │ └── mongodb
│ │ │ ├── cdc
│ │ │ │ ├── CdcHandler.java
│ │ │ │ ├── CdcOperation.java
│ │ │ │ └── debezium
│ │ │ │ ├── DebeziumCdcHandler.java
│ │ │ │ ├── mongodb
│ │ │ │ │ ├── MongoDbDelete.java
│ │ │ │ │ ├── MongoDbHandler.java
│ │ │ │ │ ├── MongoDbInsert.java
│ │ │ │ │ └── MongoDbUpdate.java
│ │ │ │ ├── OperationType.java
│ │ │ │ └── rdbms
│ │ │ │ ├── mysql
│ │ │ │ │ └── MysqlHandler.java
│ │ │ │ ├── postgres
│ │ │ │ │ └── PostgresHandler.java
│ │ │ │ ├── RdbmsDelete.java
│ │ │ │ ├── RdbmsHandler.java
│ │ │ │ ├── RdbmsInsert.java
│ │ │ │ └── RdbmsUpdate.java
│ │ │ ├── CollectionAwareConfig.java
│ │ │ ├── converter
│ │ │ │ ├── AvroJsonSchemafulRecordConverter.java
│ │ │ │ ├── FieldConverter.java
│ │ │ │ ├── JsonRawStringRecordConverter.java
│ │ │ │ ├── JsonSchemalessRecordConverter.java
│ │ │ │ ├── RecordConverter.java
│ │ │ │ ├── SinkConverter.java
│ │ │ │ ├── SinkDocument.java
│ │ │ │ ├── SinkFieldConverter.java
│ │ │ │ └── types
│ │ │ │ └── sink
│ │ │ │ └── bson
│ │ │ │ ├── BooleanFieldConverter.java
│ │ │ │ ├── BytesFieldConverter.java
│ │ │ │ ├── Float32FieldConverter.java
│ │ │ │ ├── Float64FieldConverter.java
│ │ │ │ ├── Int16FieldConverter.java
│ │ │ │ ├── Int32FieldConverter.java
│ │ │ │ ├── Int64FieldConverter.java
│ │ │ │ ├── Int8FieldConverter.java
│ │ │ │ ├── logical
│ │ │ │ │ ├── DateFieldConverter.java
│ │ │ │ │ ├── DecimalFieldConverter.java
│ │ │ │ │ ├── TimeFieldConverter.java
│ │ │ │ │ └── TimestampFieldConverter.java
│ │ │ │ └── StringFieldConverter.java
│ │ │ ├── MongoDbSinkConnectorConfig.java
│ │ │ ├── MongoDbSinkConnector.java
│ │ │ ├── MongoDbSinkRecordBatches.java
│ │ │ ├── MongoDbSinkTask.java
│ │ │ ├── processor
│ │ │ │ ├── BlacklistKeyProjector.java
│ │ │ │ ├── BlacklistValueProjector.java
│ │ │ │ ├── DocumentIdAdder.java
│ │ │ │ ├── field
│ │ │ │ │ ├── projection
│ │ │ │ │ │ ├── BlacklistProjector.java
│ │ │ │ │ │ ├── FieldProjector.java
│ │ │ │ │ │ └── WhitelistProjector.java
│ │ │ │ │ └── renaming
│ │ │ │ │ ├── FieldnameMapping.java
│ │ │ │ │ ├── RegExpSettings.java
│ │ │ │ │ ├── RenameByMapping.java
│ │ │ │ │ ├── RenameByRegExp.java
│ │ │ │ │ └── Renamer.java
│ │ │ │ ├── id
│ │ │ │ │ └── strategy
│ │ │ │ │ ├── BsonOidStrategy.java
│ │ │ │ │ ├── FullKeyStrategy.java
│ │ │ │ │ ├── IdStrategy.java
│ │ │ │ │ ├── KafkaMetaDataStrategy.java
│ │ │ │ │ ├── PartialKeyStrategy.java
│ │ │ │ │ ├── PartialValueStrategy.java
│ │ │ │ │ ├── ProvidedInKeyStrategy.java
│ │ │ │ │ ├── ProvidedInValueStrategy.java
│ │ │ │ │ ├── ProvidedStrategy.java
│ │ │ │ │ └── UuidStrategy.java
│ │ │ │ ├── KafkaMetaAdder.java
│ │ │ │ ├── PostProcessor.java
│ │ │ │ ├── WhitelistKeyProjector.java
│ │ │ │ └── WhitelistValueProjector.java
│ │ │ ├── VersionUtil.java
│ │ │ └── writemodel
│ │ │ └── strategy
│ │ │ ├── DeleteOneDefaultStrategy.java
│ │ │ ├── MonotonicWritesDefaultStrategy.java
│ │ │ ├── ReplaceOneBusinessKeyStrategy.java
│ │ │ ├── ReplaceOneDefaultStrategy.java
│ │ │ ├── UpdateOneTimestampsStrategy.java
│ │ │ └── WriteModelStrategy.java
│ │ └── resources
│ │ ├── kafka-connect-mongodb-version.properties
│ │ └── logback.xml
│ └── test
│ ├── java
│ │ └── at
│ │ └── grahsl
│ │ └── kafka
│ │ └── connect
│ │ └── mongodb
│ │ ├── cdc
│ │ │ └── debezium
│ │ │ ├── mongodb
│ │ │ │ ├── MongoDbDeleteTest.java
│ │ │ │ ├── MongoDbHandlerTest.java
│ │ │ │ ├── MongoDbInsertTest.java
│ │ │ │ └── MongoDbUpdateTest.java
│ │ │ ├── OperationTypeTest.java
│ │ │ └── rdbms
│ │ │ ├── RdbmsDeleteTest.java
│ │ │ ├── RdbmsHandlerTest.java
│ │ │ ├── RdbmsInsertTest.java
│ │ │ └── RdbmsUpdateTest.java
│ │ ├── converter
│ │ │ ├── RecordConverterTest.java
│ │ │ ├── SinkConverterTest.java
│ │ │ ├── SinkDocumentTest.java
│ │ │ └── SinkFieldConverterTest.java
│ │ ├── data
│ │ │ └── avro
│ │ │ └── TweetMsg.java
│ │ ├── end2end
│ │ │ └── MinimumViableIT.java
│ │ ├── MongoDbSinkConnectorConfigTest.java
│ │ ├── MongoDbSinkRecordBatchesTest.java
│ │ ├── MongoDbSinkTaskTest.java
│ │ ├── processor
│ │ │ ├── DocumentIdAdderTest.java
│ │ │ ├── field
│ │ │ │ ├── projection
│ │ │ │ │ └── FieldProjectorTest.java
│ │ │ │ └── renaming
│ │ │ │ └── RenamerTest.java
│ │ │ └── id
│ │ │ └── strategy
│ │ │ └── IdStrategyTest.java
│ │ ├── ValidatorWithOperatorsTest.java
│ │ └── writemodel
│ │ └── strategy
│ │ └── WriteModelStrategyTest.java
│ └── resources
│ ├── avro
│ │ └── tweetmsg.avsc
│ ├── config
│ │ └── sink_connector.json
│ └── docker
│ └── compose-env.yml
└── target
├── kafka-connect-mongodb
│ └── kafka-connect-mongodb-1.3.2-SNAPSHOT-jar-with-dependencies.jar
└── kafka-connect-mongodb.zip
I think this is my last question. Thank you for your warm support! :) (attachment: kafka-connect-mongodb.zip)
PS: I am running the docker-compose.yml located in the root of the structure, not docker-env.yml.
Hi @hpgrahsl
I'm looking forward to hearing anything helpful from you. If you could help me, I would be very thankful. I have spent a lot of time trying to resolve these issues, and I don't know where else to ask my questions except here and from you.
Thanks, Mostafa
@mostafaghadimi From what I've seen, you are using the wrong URL for your POST request against the Kafka Connect REST API.
https://docs.confluent.io/current/connect/references/restapi.html#post--connectors
I think it might be very valuable for you to spend some time reading the Kafka Connect documentation. The link above only discusses the REST API, but there is plenty more interesting stuff to learn about Kafka Connect. It also helps when working with any specific connector like this one :)
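concretely, and only as a sketch reusing the configuration you already posted (stored in a hypothetical file called sink-config.json), the request needs to target the /connectors resource instead of the root path:

curl -X POST -H "Content-Type: application/json" \
  --data @sink-config.json \
  http://localhost:8083/connectors

the root URL http://localhost:8083/ only answers GET requests with general worker info, which is why your POST comes back with 405 Method Not Allowed.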
Hi @hpgrahsl
I have read the documentation carefully, but the problem wasn't solved. So I decided to implement the sink connection in a different manner. Now it works perfectly.
Thanks for your attention, Mostafa
Thx @mostafaghadimi for your reply and for closing. I'd really like to learn what you did in order to solve your problem. If it's an alternative approach to get your data into mongodb, I'd be interested as well. If it's only due to a problem with your docker environment or wrong REST API usage, it would be a pity if you moved away from this approach ... anyway, glad to hear you got it working somehow.
Hi @hpgrahsl
Sure, I have created a repository and I have explained what to do in the README.md file.
Is there any similar way to connect kafka to redis?
Thanks, Mostafa
Hi Hans-Peter,
I'm trying to test the docker-env.yml file in the kafka-connect-mongodb/src/test/resources/docker directory. It runs without any errors. The question is: how do I send data using Kafka and write it to a MongoDB collection?
The second thing is that the jar files referenced in the kafka-connect container aren't uploaded to the server, because of the address ../../../../target/kafka-connect-mongodb/. Would you please upload the jar files? If you need my docker-compose file, it can be found on Stack Overflow.
Thanks, Mostafa