etf1 / kafka-message-scheduler

scheduler for kafka messages

Scheduler sending already triggered messages #53

Closed: Ahmad44452 closed this issue 1 year ago

Ahmad44452 commented 1 year ago

I have set up a local environment for testing Kafka Message Scheduler using Docker. I am running two Kafka containers managed by Zookeeper, where the schedules topic has two partitions, and similarly two containers of kafka-message-scheduler. The solution is working really well, thanks to all the developers. However, I noticed something. Suppose I add a message to the schedules topic and it gets stored in partition 0. If its tombstone message ends up in partition 1 after it is triggered, the message is sent again whenever Kafka or kafka-message-scheduler is restarted. The behavior is the same if the partition of a cancelled message differs. Am I missing some configuration, or is it a bug not yet reported? I have added my docker compose file just as an extra measure. Thank you

version: '3'
services:

  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - 2181:2181
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      # for health check "ruok"
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
    healthcheck:
      test: ["CMD-SHELL", "echo 'ruok' | nc -w 2 localhost 2181 | grep imok"]
      interval: 20s
      timeout: 10s
      retries: 10

  kafka-1:
    image: confluentinc/cp-kafka
    depends_on:
      zookeeper:
        condition: service_healthy
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9092 --list"]
      interval: 30s
      timeout: 10s
      retries: 20
    ports:
      # Exposes 9092 for external connections to the broker
      # Use kafka-1:29092 for connections internal on the docker network
      # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:29092,PLAINTEXT_HOST://127.0.0.1:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'

  kafka-2:
    image: confluentinc/cp-kafka
    depends_on:
      zookeeper:
        condition: service_healthy
    healthcheck:
      test: ["CMD-SHELL", "kafka-topics --bootstrap-server localhost:9093 --list"]
      interval: 30s
      timeout: 10s
      retries: 20
    ports:
      # Exposes 9093 for external connections to the broker
      # Use kafka-2:29092 for connections internal on the docker network
      # See https://rmoff.net/2018/08/02/kafka-listeners-explained/ for details
      - 9093:9093
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:29092,PLAINTEXT_HOST://127.0.0.1:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'

  scheduler:
    image: etf1/kafka-message-scheduler:v0.0.6
    depends_on:
      kafka-1:
        condition: service_healthy
      kafka-2:
        condition: service_healthy
    deploy:
      mode: replicated
      replicas: 2
    environment:
      BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29092
fkarakas commented 1 year ago

Hello @Ahmad44452 ,

Happy to see that you like our scheduler; you can star it on github.com ;)

Your local setup is interesting; I have never tried a "local" Kafka cluster with two nodes. I am sure it is working properly, but perhaps that is the issue...

Do you have the same issue with one node?

For your information, we are using the scheduler in production with an AWS Kafka cluster with several nodes (managed Kafka), and we have seen no issue at all like the one you describe. We have thousands of schedules planned.

Since you launched 2 instances of the scheduler, each one will be assigned one of the partitions and will only read the messages in its assigned partition.

Perhaps your issue comes from how you publish the schedule message to the schedules topic. If you hardcode the partition number, it will not work.

You should let Kafka decide which partition the message goes to, based on the key of the message. This guarantees ordering and ensures that all messages with the same key are consumed by the same instance of the scheduler.

You should not specify a partition like 0 or 1.

In Go we do it the following way: https://github.com/etf1/kafka-message-scheduler/blob/main/clientlib/clientlib.go#L112

kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &schedulerTopic, Partition: kafka.PartitionAny},
        ...
}

Note the Partition: kafka.PartitionAny setting: it means letting the cluster use its own hashing of the key to determine the partition.
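For illustration, here is a minimal, self-contained sketch of such a publish (this is not the scheduler's clientlib, just a sketch built on the same confluent-kafka-go client). The broker addresses, key, value and trigger time are placeholder values; the headers are the ones the scheduler reads (scheduler-epoch, scheduler-target-topic, scheduler-target-key):

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka-1:29092,kafka-2:29092"})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	topic := "schedules"
	// when the schedule should be triggered (placeholder: one minute from now)
	epoch := time.Now().Add(time.Minute).Unix()

	delivery := make(chan kafka.Event, 1)
	err = p.Produce(&kafka.Message{
		// PartitionAny: the producer's partitioner hashes the key to pick the partition
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Key:            []byte("my-schedule-id"),
		Value:          []byte("payload to deliver when the schedule fires"),
		Headers: []kafka.Header{
			{Key: "scheduler-epoch", Value: []byte(strconv.FormatInt(epoch, 10))},
			{Key: "scheduler-target-topic", Value: []byte("timers")},
			{Key: "scheduler-target-key", Value: []byte("my-target-key")},
		},
	}, delivery)
	if err != nil {
		panic(err)
	}

	// the delivery report tells us which partition the key hash selected
	m := (<-delivery).(*kafka.Message)
	fmt.Printf("schedule stored in partition %d\n", m.TopicPartition.Partition)
}

With kafkajs the equivalent is simply not passing a partition in producer.send, as in the code shown further down this thread.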

I hope this is clear; if not, please describe how you publish the message.

Regards,

Ahmad44452 commented 1 year ago

Hi @fkarakas. Yes, I am facing the same issue with a single node. And no, I am not explicitly assigning a partition when adding a message to Kafka. I am using the kafkajs library to add messages to Kafka, and the following is my code for reference

const { Kafka } = require('kafkajs');
const uuidv4 = require('uuid').v4;

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092', 'localhost:9093'],
});

const producer = kafka.producer()

const uniqId = uuidv4();

producer.connect().then(() => {
  producer.send({
    topic: 'schedules',
    messages: [
      {
        // headers read by kafka-message-scheduler
        headers: {
          "scheduler-epoch": `${1698566477 + 60}`, // trigger time (hardcoded epoch + 60s)
          "scheduler-target-topic": "timers",
          "scheduler-target-key": `${uuidv4()}`
        },
        // no partition is specified: kafkajs derives it from the key
        key: `${uniqId}`,
        value: `This is a scheduled message with id ${uniqId}`
      }
    ],
  }).then(() => {
    console.log(`${uniqId} added`)
    producer.disconnect()
  })
});

However, I noticed something upon further investigation following your response. Messages with the same key are supposed to go to the same partition, and they do when I add messages with the same key myself, no matter how many. However, when kafka-message-scheduler adds a tombstone message with the same key, there is a possibility it ends up in a different partition, and that is what is creating this issue on restarts.

fkarakas commented 1 year ago

Ahmad, I will try to reproduce, but I don't really understand why the tombstone, which is a regular message with a nil body, would be delivered to a different partition. We don't do anything special for tombstone messages in the publish process.

How did you create the schedules topic?

Ahmad44452 commented 1 year ago

I used the command provided in the documentation of kafka-message-scheduler. After adapting it to my scenario, the following was my final command

kafka-topics --bootstrap-server "kafka-1:29092,kafka-2:29092" --create --topic schedules --partitions 2 --config "cleanup.policy=compact" --config "retention.ms=-1"

And about the tombstone message: it is not necessarily added to a different partition. For example, for the following message, the tombstone was added to the same partition as intended (see attached screenshot).

However, for another message, the tombstone was added to the other partition (see attached screenshot).

All I can think of, for now, is that the tombstone message added by kafka-message-scheduler is being treated like a message with a different key, which sometimes results in the tombstone being added to a different partition, even though the keys appear to be the same, at least on the viewing side.

fkarakas commented 1 year ago

Hello @Ahmad44452 ,

After some investigation, I found no problem in the scheduler code.

I added thousands of schedules and also tried basic tombstone messages. All are partitioned correctly.

You can find the example code here: https://github.com/fkarakas/scheduler-issue

make multi.up
make admin
make schedules
make sqlite

This will create a SQLite database with the Kafka messages, and running the following query:

select key, partition, count(*) as message_by_partition
from message
where topic = 'schedules'
group by key, partition
having message_by_partition != 2

shows that all messages with the same key are on the same partition: each key has its 2 messages (the original schedule and its tombstone) in a single partition, so the query flags nothing.

You can use a query tool like DBeaver to query the SQLite file.

I think the issue comes from the JS library kafkajs, which is not partitioning the message correctly based on the key: https://kafka.js.org/docs/migration-guide-v2.0.0#producer-new-default-partitioner

Some original schedules are sent to the "incorrect" partition, but the scheduler sends the tombstone to the "correct" one; that is why you can see that the tombstone and the original schedule are not on the same partition.

We use the official Golang library and we have never encountered this issue.

The official Confluent Go lib is based on librdkafka, so I recommend you try https://github.com/Blizzard/node-rdkafka, which is a JavaScript wrapper on top of librdkafka. Perhaps it works better.

Or you can move to Go ;))

Hope this is clear and it helps.

fkarakas commented 1 year ago

Actually, to make it work with kafkajs you have to configure the scheduler's producer partitioner to murmur2_random, because by default rdkafka uses consistent_random while kafkajs uses murmur2; that is why we had this partition issue.

Please view examples in the following files:

https://github.com/fkarakas/scheduler-issue/blob/main/docker-compose.multi.yml#L81 https://github.com/fkarakas/scheduler-issue/blob/main/config.yaml

With these changes the kafkajs lib and the scheduler will use the same partitioner, and you will not have any issues anymore.
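For reference, the exact scheduler-side settings are in the two files linked above; at the librdkafka level the change boils down to the producer's partitioner property. Below is a minimal confluent-kafka-go sketch (not part of the scheduler) that produces the same key under both partitioner settings and prints the partition each one picks; for many keys the two settings choose different partitions, which is exactly the schedule/tombstone mismatch described above. The broker address, topic and key are placeholders, and the probe topic is assumed to have the same number of partitions (2) as schedules:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	topic := "partition-probe" // hypothetical test topic with 2 partitions

	// consistent_random is librdkafka's default; murmur2_random matches the
	// murmur2 key hashing used by kafkajs (and the Java client).
	for _, partitioner := range []string{"consistent_random", "murmur2_random"} {
		p, err := kafka.NewProducer(&kafka.ConfigMap{
			"bootstrap.servers": "localhost:9092",
			"partitioner":       partitioner,
		})
		if err != nil {
			panic(err)
		}

		delivery := make(chan kafka.Event, 1)
		if err := p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Key:            []byte("same-key"),
			Value:          []byte("probe"),
		}, delivery); err != nil {
			panic(err)
		}

		// the delivery report shows which partition this partitioner selected
		m := (<-delivery).(*kafka.Message)
		fmt.Printf("%s -> partition %d\n", partitioner, m.TopicPartition.Partition)
		p.Close()
	}
}

Once both sides hash keys the same way, the tombstone written by the scheduler lands on the same partition as the original schedule, so the scheduler instance owning that partition sees the deletion and nothing is re-sent after a restart.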

Regards, Fatih

fkarakas commented 1 year ago

Closing for now; please feel free to reopen.