confluentinc / librdkafka

The Apache Kafka C/C++ library
Other
232 stars 3.15k forks source link

Creating Multiple Instance of Simple Consumer assigned to same partition. #3514

Closed devanshumisra closed 2 years ago

devanshumisra commented 3 years ago

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

Hi Eden. Been using your Simple Consumer code to consumer messages produced on a topic with a single partition. Now I have divided the topic into 4 partitions in order to make the consumption a little faster by creating 4 instances of the Simple Consumer with partition defined so that they consume only that partition messages and not others.

My Code and Config:

int main () {
    //Kafka Config
    char *brokers = "localhost:9092";
    int opt;
    rd_kafka_conf_t *conf;
    rd_kafka_topic_conf_t *topic_conf;
    char errstr[512];
    const char *debug = NULL;
    int do_conf_dump = 0;
    char tmp[16];
    rd_kafka_resp_err_t err;
    char *group = NULL;
    rd_kafka_topic_partition_list_t *topics;
    int is_subscription;
    int i;
    /* Kafka configuration */
    conf = rd_kafka_conf_new();

    /* Set logger */
    rd_kafka_conf_set_log_cb(conf, logger);

    /* Quick termination */
    snprintf(tmp, sizeof(tmp), "%i", SIGIO);
    rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);

    /* Topic configuration */
    topic_conf = rd_kafka_topic_conf_new();

        signal(SIGINT, stop);
        signal(SIGUSR1, sig_usr1);

        if (debug &&
            rd_kafka_conf_set(conf, "debug", debug, errstr, sizeof(errstr)) !=
            RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%% Debug configuration failed: %s: %s\n",
                        errstr, debug);
                exit(1);
        }
                if (!group)
                        group = "rdkafka_consumer_example";
                if (rd_kafka_conf_set(conf, "group.id", group,
                                      errstr, sizeof(errstr)) !=
                    RD_KAFKA_CONF_OK) {
                        fprintf(stderr, "%% %s\n", errstr);
                        exit(1);
                }

                /* Consumer groups always use broker based offset storage */
                if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
                                            "broker",
                                            errstr, sizeof(errstr)) !=
                    RD_KAFKA_CONF_OK) {
                        fprintf(stderr, "%% %s\n", errstr);
                        exit(1);
                }

                /* Set default topic config for pattern-matched topics. */
                rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
                /* Callback called on partition assignment changes */
                rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);

                rd_kafka_conf_set(conf, "enable.partition.eof", "true",
                                  NULL, 0);
            /* Create Kafka handle */
        if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                errstr, sizeof(errstr)))) {
                fprintf(stderr,
                        "%% Failed to create new consumer: %s\n",
                        errstr);
                exit(1);
        }

        /* Add brokers */
        if (rd_kafka_brokers_add(rk, brokers) == 0) {
                fprintf(stderr, "%% No valid brokers specified\n");
                exit(1);
        }

        rd_kafka_poll_set_consumer(rk);

        topics = rd_kafka_topic_partition_list_new(1);
        is_subscription = 1;
        char *topic = "topic_t1";
        int32_t partition = -1;

        rd_kafka_topic_partition_list_add(topics, topic, partition);

        if (is_subscription) {
                fprintf(stderr, "%% Subscribing to %d topics\n", topics->cnt);

                if ((err = rd_kafka_subscribe(rk, topics))) {
                        fprintf(stderr,
                                "%% Failed to start consuming topics: %s\n",
                                rd_kafka_err2str(err));
                        exit(1);
                }
        } else {
                fprintf(stderr, "%% Assigning %d partitions\n", topics->cnt);

                if ((err = rd_kafka_assign(rk, topics))) {
                        fprintf(stderr,
                                "%% Failed to assign partitions: %s\n",
                                rd_kafka_err2str(err));
                }
        }
        while (run) {
                rd_kafka_message_t *rkmessage;

                rkmessage = rd_kafka_consumer_poll(rk, 1000);
                if (rkmessage) {
                        cass_uuid_gen_random(uuid_gen, &uuid);
                        msg_consume(rkmessage, session, uuid, head, ap, statement);                    //This is where the message fetched from the topic t1_stream is sent to msg_consume for dissection
                        rd_kafka_message_destroy(rkmessage);
                }
        }

        err = rd_kafka_consumer_close(rk);
        if (err)
                fprintf(stderr, "%% Failed to close consumer: %s\n",
                        rd_kafka_err2str(err));
        else
                fprintf(stderr, "%% Consumer closed\n");

        rd_kafka_topic_partition_list_destroy(topics);

        /* Destroy handle */
        rd_kafka_destroy(rk);
        /* Let background threads clean up and terminate cleanly. */
        run = 5;
        while (run-- > 0 && rd_kafka_wait_destroyed(1000) == -1)
                printf("Waiting for librdkafka to decommission\n");
        if (run <= 0)
                rd_kafka_dump(stdout, rk);

        return 0;
}

Problem

I compile this program to create 4 different instances of consumer(Consumer1, Consumer2, Consumer3, Consumer4). But all of the consumer instances are assigned all the 4 partitions from the topic. Which further results in offset commit failed. If you could guide me as to how I can use this code to assign each instance of the compiled binary to exactly 1 partition, that would be helpful.

Thanks in advance.

edenhill commented 2 years ago

subscribe() takes a list of topics, not partitions.