confluentinc / cp-docker-images

[DEPRECATED] Docker images for Confluent Platform.

Kafka connect 5.0.0 with avro and file source connector #604

fracasula opened this issue 5 years ago

fracasula commented 5 years ago

Hi everyone,

I'm trying to set up a full-stack Docker Compose configuration. Everything works except the file source connector (which I configured to use Avro): for some reason it registers a schema that is just "string" instead of the one I specified in the connector properties.

Here's my docker compose configuration for the kafka-connect container:

kafka-connect:
  image: confluentinc/cp-kafka-connect:5.0.0
  hostname: kafka-connect
  ports:
  - "8083:8083"
  environment:
    CONNECT_BOOTSTRAP_SERVERS: "kafka1:19092"
    CONNECT_REST_PORT: 8083
    CONNECT_GROUP_ID: compose-connect-group
    CONNECT_CONFIG_STORAGE_TOPIC: kafka-connect-configs
    CONNECT_OFFSET_STORAGE_TOPIC: kafka-connect-offsets
    CONNECT_STATUS_STORAGE_TOPIC: kafka-connect-status
    CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
    CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
    CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kafka-schema-registry:8081'
    CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
    CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
    CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
    CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
    CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
    CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
  volumes:
  - ./fixtures:/etc/kafka-connect/connectors/
  depends_on:
  - zoo1
  - kafka1
  - kafka-schema-registry
  - kafka-rest-proxy
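
(For what it's worth, my understanding is that the image derives the worker properties from these CONNECT_* variables by stripping the prefix and turning underscores into dots, so the worker should end up with roughly the following, and so on for the remaining variables:

bootstrap.servers=kafka1:19092
group.id=compose-connect-group
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://kafka-schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://kafka-schema-registry:8081
)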

And here are the connector properties that I'm POSTing to the Kafka Connect API (the connector is created correctly and it writes the content of the suppliers-state.txt file into the specified topic):

{
  "name": "file_source_connector_suppliers-state",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/etc/kafka-connect/connectors/suppliers-state.txt",
    "topic": "suppliers-state",
    "file_reader.avro.schema": "[{\"type\":\"enum\",\"name\":\"DeliveryScheduleType\",\"namespace\":\"direct.foodchain.suppliers.state.avro\",\"symbols\":[\"fixed\",\"days_ahead\"]},{\"type\":\"record\",\"name\":\"FixedDeliveryScheduleDetails\",\"namespace\":\"direct.foodchain.suppliers.state.avro\",\"fields\":[{\"name\":\"timezone\",\"type\":\"string\"},{\"name\":\"hours_notice_required\",\"type\":\"float\"},{\"name\":\"delivers_saturday\",\"type\":\"boolean\"},{\"name\":\"delivers_sunday\",\"type\":\"boolean\"},{\"name\":\"delivers_monday\",\"type\":\"boolean\"},{\"name\":\"delivers_tuesday\",\"type\":\"boolean\"},{\"name\":\"delivers_wednesday\",\"type\":\"boolean\"},{\"name\":\"delivers_thursday\",\"type\":\"boolean\"},{\"name\":\"delivers_friday\",\"type\":\"boolean\"}]},{\"type\":\"record\",\"name\":\"DaysAheadDeliveryScheduleDetails\",\"namespace\":\"direct.foodchain.suppliers.state.avro\",\"fields\":[{\"name\":\"timezone\",\"type\":\"string\"},{\"name\":\"cutoff_time\",\"type\":\"string\"},{\"name\":\"delivers_saturday\",\"type\":\"boolean\"},{\"name\":\"delivers_sunday\",\"type\":\"boolean\"}]},{\"type\":\"record\",\"name\":\"DeliverySchedule\",\"namespace\":\"direct.foodchain.suppliers.state.avro\",\"fields\":[{\"name\":\"type\",\"type\":\"DeliveryScheduleType\"},{\"name\":\"details\",\"type\":[\"DaysAheadDeliveryScheduleDetails\",\"FixedDeliveryScheduleDetails\"]}]},{\"type\":\"record\",\"name\":\"SupplierState\",\"namespace\":\"direct.foodchain.suppliers.state.avro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"},{\"name\":\"created_by_user_id\",\"type\":\"string\"},{\"name\":\"created_at\",\"type\":\"string\"},{\"name\":\"delivery_schedule\",\"type\":[\"null\",\"DeliverySchedule\"],\"default\":\"null\"},{\"name\":\"logo_url\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"is_using_chat\",\"type\":\"boolean\",\"default\":false},{\"name\":\"user_ids\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"default\":[]},{\"name\":\"minimum_delivery_amount\",\"type\":\"float\",\"default\":0.0},{\"name\":\"default_margin\",\"type\":[\"null\",\"float\"],\"default\":null},{\"name\":\"postcode_areas\",\"type\":{\"type\":\"array\",\"items\":\"string\"},\"default\":[]}]}]"
  }
}
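
I'm creating it roughly like this (connector.json is just where I saved the JSON above; 8083 is the port mapped in the compose file):

curl -X POST -H "Content-Type: application/json" \
  --data @connector.json \
  http://localhost:8083/connectors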

I must be missing something, because it's not picking up my schema (see file_reader.avro.schema in the connector properties) but registering a brand new one whose content is just "string".

Any idea what I'm doing wrong?

Thanks!

OneCricketeer commented 5 years ago

@fracasula

org.apache.kafka.connect.file.FileStreamSourceConnector comes from the upstream Apache Kafka project and has no dependency on Avro, so the file_reader.avro.schema property simply doesn't exist for it.
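
For reference, that connector only understands file and topic (plus the generic tasks.max); as far as I know, any other property is ignored, and each line of the file is emitted as a plain string, which is why a "string" schema gets registered. A minimal config (the connector name here is just an example) looks like:

{
  "name": "file_source_example",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/etc/kafka-connect/connectors/suppliers-state.txt",
    "topic": "suppliers-state"
  }
}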

Can you provide a link to where you got that configuration from?

weirp commented 5 years ago

@fracasula I think your connector.class should be com.github.mmolimar.kafka.connect.fs.FsSourceConnector. You're using the connector from https://github.com/mmolimar/kafka-connect-fs, right?
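
If so, a config along these lines should do it. This is only a sketch: the property names are taken from the kafka-connect-fs README (check the required settings for your version), the connector name is just an example, and the connector JARs have to be on the Connect worker's plugin path (the stock cp-kafka-connect image doesn't ship them). Also note that, if I read the docs right, the AvroFileReader expects an actual Avro data file rather than plain text.

{
  "name": "fs_source_connector_suppliers-state",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "file:///etc/kafka-connect/connectors/",
    "topic": "suppliers-state",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
    "policy.regexp": "suppliers-state\\.txt",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader",
    "file_reader.avro.schema": "... the schema from above ..."
  }
}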