confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Dropping and restarting a connector with a new config starts it twice #4924

Open MichaelDrogalis opened 4 years ago

MichaelDrogalis commented 4 years ago

Describe the bug

Uncovered in https://github.com/MichaelDrogalis/voluble/issues/4, it appears that when using embedded Connect, dropping and restarting a connector with a new configuration starts the connector twice. I'd blame my own shoddy connector, but I observed the same behavior (verified via logging in both cases) with the Debezium connector, too.

To Reproduce

Download the Voluble connector and put it in a directory called voluble:

confluent-hub install mdrogalis/voluble:0.2.0
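
If you prefer to install the connector straight into that directory (so the compose file below can volume-mount it), confluent-hub has a --component-dir flag; a minimal sketch, noting that the exact flags and resulting directory layout can vary across confluent-hub versions:

# Install directly into ./voluble, skipping interactive prompts
confluent-hub install mdrogalis/voluble:0.2.0 --component-dir ./voluble --no-prompt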

Then start up the following with Docker Compose:

---
version: '2'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka:5.4.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.8.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    volumes:
      - "./voluble/:/usr/share/kafka/plugins/voluble"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_CONNECT_GROUP_ID: "ksql-connect-cluster"
      KSQL_CONNECT_BOOTSTRAP_SERVERS: "broker:9092"
      KSQL_CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      KSQL_CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      KSQL_CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
      KSQL_CONNECT_CONFIG_STORAGE_TOPIC: "ksql-connect-configs"
      KSQL_CONNECT_OFFSET_STORAGE_TOPIC: "ksql-connect-offsets"
      KSQL_CONNECT_STATUS_STORAGE_TOPIC: "ksql-connect-statuses"
      KSQL_CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.8.0
    container_name: ksqldb-cli
    depends_on:
      - broker
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
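
Bring the stack up and wait for ksqlDB to become reachable; a minimal sketch, assuming the compose file above is saved as docker-compose.yml in the current directory:

# Start everything in the background
docker-compose up -d
# The server's /info endpoint responds once ksqlDB is up
curl -s http://localhost:8088/info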

Then go into the CLI:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

And run a simple connector configuration. This one creates exactly 3 records in a topic called customers:

CREATE SOURCE CONNECTOR customers WITH (
    'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'genkp.customers.with' = '#{Internet.uuid}',
    'genv.customers.name.with' = '#{HitchhikersGuideToTheGalaxy.character}',
    'topic.customers.records.exactly' = 3
);
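
Before printing the topic, you can confirm the connector was registered; a sketch using the ksqlDB REST API from the host (running SHOW CONNECTORS; and DESCRIBE CONNECTOR customers; from the CLI works just as well):

# Connector statements go through the /ksql endpoint
curl -s -X POST http://localhost:8088/ksql \
  -H "Content-Type: application/vnd.ksql.v1+json" \
  -d '{"ksql": "SHOW CONNECTORS; DESCRIBE CONNECTOR customers;"}'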

Check that there are 3 records. This should work:

print 'customers' from beginning;
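
You can also count the records mechanically from the broker container; a sketch using the console consumer that ships with the broker image (--timeout-ms ends the read once the topic is drained):

# Prints one line per record; the count goes to stdout, consumer chatter to stderr
docker exec broker kafka-console-consumer \
  --bootstrap-server broker:9092 \
  --topic customers \
  --from-beginning \
  --timeout-ms 10000 | wc -l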

Now drop the connector:

drop connector customers;

And create it again, but this time asking it to create 5 records instead of 3:

CREATE SOURCE CONNECTOR customers WITH (
    'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
    'value.converter.schemas.enable' = 'false',
    'connector.class' = 'io.mdrogalis.voluble.VolubleSourceConnector',
    'genkp.customers.with' = '#{Internet.uuid}',
    'genv.customers.name.with' = '#{HitchhikersGuideToTheGalaxy.character}',
    'topic.customers.records.exactly' = 5
);

If you print the topic again, you'll see 8 additional records (3 more from the previous configuration plus 5 from the new one), for a total of 11 records.

Expected behavior

That there would be 8 records in total at the end (3 from the previous configuration and 5 from the new one). Connector specifics aside, I'd expect the connector to run only once, with the newest configuration, when it is launched again.

spena commented 4 years ago

@MichaelDrogalis This issue is not exclusive to KSQL. I reproduced the same behavior using the Connect UI in Control Center. I don't know the connector internals, so I'm not sure whether it's related to the Voluble connector or is an internal Connect issue.

MichaelDrogalis commented 4 years ago

@spena I verified this behavior with different connectors; the same thing happens with Debezium.

spena commented 4 years ago

@MichaelDrogalis Did you reproduce it using the Connect UI or just ksqlDB? The issue is happening on the Connect side.

MichaelDrogalis commented 4 years ago

I just used ksqlDB for both to verify that it wasn't an issue with one single connector.

spena commented 4 years ago

@MichaelDrogalis Ok. Try it directly against Connect instead of going through ksqlDB. I think this is a Connect issue; I confirmed it happens using the Connect REST API.
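
For reference, trying it directly against Connect means dropping and re-creating the connector through its REST API; a hedged sketch, assuming a standalone Connect worker at localhost:8083 with the Voluble plugin installed:

# Delete the connector, then re-create it asking for 5 records
curl -s -X DELETE http://localhost:8083/connectors/customers
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "customers",
    "config": {
      "connector.class": "io.mdrogalis.voluble.VolubleSourceConnector",
      "genkp.customers.with": "#{Internet.uuid}",
      "genv.customers.name.with": "#{HitchhikersGuideToTheGalaxy.character}",
      "topic.customers.records.exactly": "5"
    }
  }'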

spena commented 4 years ago

Discussed this offline with the Connect team. This issue is caused by https://issues.apache.org/jira/browse/KAFKA-8869.