confluentinc / librdkafka

The Apache Kafka C/C++ library

thread is blocked when stopping the consumer #2119

Closed yorksen closed 4 years ago

yorksen commented 6 years ago

Description

I ran a consumer program which simply consumes messages from Kafka, while an idempotent producer in another program produced 20000000 messages to the same topic (only one partition). They worked well yesterday; when I went home I left the consumer program running overnight. This morning when I got back to work, I found the consumer could no longer consume anything from Kafka (Kafka itself was still working), so I tried to stop the program and found it could not be stopped either. The blocked call stack is below:

```
(gdb) bt
#0  pthread_cond_wait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:185
#1  0x00007ff4fbc99fc9 in cnd_wait (cond=<optimized out>, mtx=<optimized out>) at tinycthread.c:442
#2  0x00007ff4fbc9a3a5 in cnd_timedwait_abs (cnd=cnd@entry=0x7ff4df836ee8, mtx=mtx@entry=0x7ff4df836ec0, tspec=tspec@entry=0x7ff4eabfc3a0) at tinycthread_extra.c:96
#3  0x00007ff4fbc62a4c in rd_kafka_q_pop_serve (rkq=rkq@entry=0x7ff4df836ec0, timeout_ms=timeout_ms@entry=-1, version=version@entry=0, cb_type=cb_type@entry=RD_KAFKA_Q_CB_RETURN,
    callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:390
#4  0x00007ff4fbc62b20 in rd_kafka_q_pop (rkq=rkq@entry=0x7ff4df836ec0, timeout_ms=timeout_ms@entry=-1, version=version@entry=0) at rdkafka_queue.c:416
#5  0x00007ff4fbc63728 in rd_kafka_q_wait_result (rkq=rkq@entry=0x7ff4df836ec0, timeout_ms=timeout_ms@entry=-1) at rdkafka_queue.c:802
#6  0x00007ff4fbc2e657 in rd_kafka_consume_stop0 (rktp=0x7ff4f3c2b280) at rdkafka.c:2139
#7  rd_kafka_consume_stop (app_rkt=0x7ff4f609f400, partition=<optimized out>) at rdkafka.c:2168
#8  0x00007ff4fd54f384 in mq_consumer_consume_impl (consumer=0x7ff4f60cb380, param=0x7ff4f60d0ba8) at source/mq_consumer.c:344
#9  0x00007ff4fd54f452 in mq_consumer_thread_fun (param=0x7ff4f60d0ba0) at source/mq_consumer.c:372
#10 0x00007ff4fc709184 in start_thread (arg=0x7ff4eabfe700) at pthread_create.c:312
#11 0x00007ff4fc43603d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111
(gdb)
```

Kafka server config below:

```
broker.id=0
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.dirs=/opt/kafka-logs
log.cleanup.policy=delete
log.retention.hours=1
log.retention.bytes=1073741824
auto.create.topics.enable=true
delete.topic.enable=true
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
connections.max.idle.ms=1200000
```

How to reproduce

Leave a consumer program running overnight.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

edenhill commented 6 years ago

It seems like you are using the legacy Consumer (start(), stop()), but also specifying a group.id, which is mostly a high-level KafkaConsumer thing.

Can you share relevant parts of your consumer code?
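For illustration (this sketch is editorial, not part of the original thread): with `group.id` set, the intended path is the high-level KafkaConsumer API rather than the legacy `rd_kafka_consume_start()`/`rd_kafka_consume_stop()` calls. A minimal sketch, assuming a local broker; the broker address, group id, and topic name are placeholders and most error handling is omitted:

```c
#include <stdio.h>
#include <librdkafka/rdkafka.h>

int main(void) {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    /* "localhost:9092", "mygroup" and "mytopic" are placeholders. */
    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                      errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "group.id", "mygroup", errstr, sizeof(errstr));

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                  errstr, sizeof(errstr));
    if (!rk) {
        fprintf(stderr, "rd_kafka_new failed: %s\n", errstr);
        return 1;
    }
    /* Redirect all partition queues to the single consumer queue. */
    rd_kafka_poll_set_consumer(rk);

    rd_kafka_topic_partition_list_t *topics =
            rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "mytopic",
                                      RD_KAFKA_PARTITION_UA);
    rd_kafka_subscribe(rk, topics);
    rd_kafka_topic_partition_list_destroy(topics);

    for (int i = 0; i < 100; i++) {          /* bounded loop for the sketch */
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 1000);
        if (!msg)
            continue;                        /* poll timed out */
        if (!msg->err)
            printf("got %zu bytes\n", msg->len);
        rd_kafka_message_destroy(msg);
    }

    /* Leaves the group cleanly; no per-partition stop() is needed. */
    rd_kafka_consumer_close(rk);
    rd_kafka_destroy(rk);
    return 0;
}
```

With this API the shutdown path is `rd_kafka_consumer_close()` followed by `rd_kafka_destroy()`, instead of stopping each topic+partition individually.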

yorksen commented 6 years ago

Yes, I use rd_kafka_consume_stop to stop the consumer, with "group.id" set. The program is blocked at rd_kafka_consume_stop, below:

```c
static MQ_ERRCODE mq_consumer_consume_impl(mq_consumer* consumer, mq_consumer_consume_param* param)
{
    MQ_ERRCODE errcode = ERR_MQ_SUCCEED;
    rd_kafka_message_t** messages = NULL;
    ssize_t message_num = 0;
    ssize_t i = 0;
    int ret = 0;
    bool batch_last = false;
    size_t interval = 0;
    size_t timeout_delay = 0;
    size_t timeout_delay_counter = 0;

    errcode = mq_consumer_check_param(consumer, param);
    if (ERR_MQ_SUCCEED != errcode)
    {
        return errcode;
    }

    messages = (rd_kafka_message_t**)malloc(sizeof(*messages) * param->batch_num);
    if (!messages)
    {
        MQ_ERROR("failed to malloc batch messages,batch_num:%lu\n", param->batch_num);
        return ERR_MQ_MALLOC_FAILED;
    }
    (void)memset(messages, 0, sizeof(*messages) * param->batch_num);

    if (rd_kafka_consume_start(consumer->m_topic, MQ_KAFKA_DEFAULT_PARTITION, param->offset))
    {
        MQ_ERROR("failed to start consuming,topic:%s,err:%s\n", rd_kafka_topic_name(consumer->m_topic),
                rd_kafka_err2str(rd_kafka_last_error()));
        free(messages);
        messages = NULL;
        return ERR_MQ_CONSUME_START_FAILED;
    }

    interval = param->timeout_interval ? param->timeout_interval : MQ_CONSUMER_DEFAULT_CONSUME_TIMEOUT;
    timeout_delay = mq_consumer_get_timeout_maxdelay(param->timeout, interval);
    while (likely(MQ_CONSUMER_IDLE != consumer->m_state))
    {
        message_num = rd_kafka_consume_batch(consumer->m_topic, MQ_KAFKA_DEFAULT_PARTITION, interval,
                messages, param->batch_num);
        if (unlikely(0 > message_num))
        {
            MQ_ERROR("failed to batch consuming,topic:%s\n", rd_kafka_topic_name(consumer->m_topic));
            continue;
        }

        if (unlikely(!message_num))
        {
            if (unlikely(param->timeout_cb && ((++timeout_delay_counter) >= timeout_delay)))
            {
                param->timeout_cb(rd_kafka_topic_name(consumer->m_topic), param->timeout_cb_param);
                timeout_delay_counter = 0;
            }

            continue;
        }
        timeout_delay_counter = 0;

        for (i = 0; likely(i < message_num); ++i)
        {
            batch_last = mq_consumer_is_batch_last(i, messages, message_num);
            errcode = mq_consumer_handle_message(messages[i], param->cb, batch_last, param->cb_param);
            if (unlikely(ERR_MQ_SUCCEED != errcode))
            {
                MQ_ERROR("failed to handle message,topic:%s,errcode:%lu\n",
                        rd_kafka_topic_name(consumer->m_topic), errcode);
            }

            rd_kafka_message_destroy(messages[i]);
            messages[i] = NULL;
        }
    }

    free(messages);
    messages = NULL;
    ret = rd_kafka_consume_stop(consumer->m_topic, MQ_KAFKA_DEFAULT_PARTITION); /* <-- blocked here */
    if (ret)
    {
        MQ_ERROR("failed to stop consuming,topic:%s,err:%s\n", rd_kafka_topic_name(consumer->m_topic),
                rd_kafka_err2str(rd_kafka_last_error()));
        return ERR_MQ_CONSUME_STOP_FAILED;
    }

    return ERR_MQ_SUCCEED;
}
```
edenhill commented 6 years ago

Are you making any calls to assign() or subscribe()?

yorksen commented 6 years ago

No, it just consumes the topic via rd_kafka_consume_batch, and an idempotent producer in another process produces messages via rd_kafka_produce. There is no subscribe/publish mode in my programs.

by the way, the topic only has one partition.

edenhill commented 6 years ago

Okay, thanks.

Can you reproduce this with debug set to all and provide the logs?
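(Editorial note: `debug` is a standard librdkafka configuration property; in property-file style it would read as below. `all` enables every debug context; a narrower set such as `fetch,topic,msg` is also valid and much less verbose.)

```
debug=all
```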

yorksen commented 6 years ago

OK, when I enable `debug = all` in the consumer config and start the program, there are terribly many prints like this:

```
%7|1543308536.646|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch 10/10/10 toppar(s)
%7|1543308536.646|SEND|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Sent FetchRequest (v4, 459 bytes @ 0, CorrId 606)
%7|1543308536.710|RECV|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Received FetchResponse (v4, 330 bytes, CorrId 610, rtt 99.76ms)
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic topic1 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic topic6 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic 111111111111111111111111111111 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic datastore_scheduler_smooth_0 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic datastore_scheduler_smooth_4 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic topic123 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic topic6 [0] at offset 0 (v2)
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic 111111111111111111111111111111 [0] at offset 0 (v2)
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic datastore_scheduler_smooth_0 [0] at offset 0 (v2)
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic datastore_scheduler_smooth_4 [0] at offset 0 (v2)
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic topic123 [0] at offset 0 (v2)
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic topic1 [0] at offset 0 (v2)
%7|1543308536.710|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch 6/6/6 toppar(s)
%7|1543308536.710|SEND|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Sent FetchRequest (v4, 280 bytes @ 0, CorrId 611)
%7|1543308536.716|RECV|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Received FetchResponse (v4, 612 bytes, CorrId 612, rtt 100.92ms)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic topic5 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic 222222222222222222222222222222 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic 333333333333333333333333333333 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic 555555555555555555555555555555 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic 666666666666666666666666666666 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic datastore_scheduler_smooth_1 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic datastore_scheduler_smooth_2 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic datastore_scheduler_smooth_3 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic datastore_scheduler_smooth_7 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic topic3 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic 222222222222222222222222222222 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic 333333333333333333333333333333 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic 555555555555555555555555555555 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic 666666666666666666666666666666 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic datastore_scheduler_smooth_1 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic datastore_scheduler_smooth_2 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic datastore_scheduler_smooth_3 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic datastore_scheduler_smooth_7 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic topic3 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic topic5 [0] at offset 0 (v2)
%7|1543308536.716|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch 10/10/10 toppar(s)
%7|1543308536.716|SEND|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Sent FetchRequest (v4, 506 bytes @ 0, CorrId 613)
%7|1543308536.747|RECV|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Received FetchResponse (v4, 565 bytes, CorrId 606, rtt 101.08ms)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic 444444444444444444444444444444 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic 777777777777777777777777777777 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic 888888888888888888888888888888 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic dastore_scheduler_requests [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic datastore_scheduler_smooth_5 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic datastore_scheduler_smooth_6 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic topic10 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic topic2 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic topic4 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Topic topic7 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic 777777777777777777777777777777 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic 888888888888888888888888888888 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic dastore_scheduler_requests [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic datastore_scheduler_smooth_5 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic datastore_scheduler_smooth_6 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic topic10 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic topic2 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic topic4 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic topic7 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch topic 444444444444444444444444444444 [0] at offset 0 (v2)
%7|1543308536.747|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Fetch 10/10/10 toppar(s)
%7|1543308536.747|SEND|rdkafka#consumer-2| [thrd:SZX1000449934:9093/bootstrap]: SZX1000449934:9093/1: Sent FetchRequest (v4, 459 bytes @ 0, CorrId 607)
%7|1543308536.810|RECV|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Received FetchResponse (v4, 330 bytes, CorrId 611, rtt 99.92ms)
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic topic6 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic 111111111111111111111111111111 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic datastore_scheduler_smooth_0 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic datastore_scheduler_smooth_4 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic topic123 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Topic topic1 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic 111111111111111111111111111111 [0] at offset 0 (v2)
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic datastore_scheduler_smooth_0 [0] at offset 0 (v2)
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic datastore_scheduler_smooth_4 [0] at offset 0 (v2)
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic topic123 [0] at offset 0 (v2)
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic topic1 [0] at offset 0 (v2)
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch topic topic6 [0] at offset 0 (v2)
%7|1543308536.810|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Fetch 6/6/6 toppar(s)
%7|1543308536.810|SEND|rdkafka#consumer-2| [thrd:SZX1000449934:9092/bootstrap]: SZX1000449934:9092/0: Sent FetchRequest (v4, 280 bytes @ 0, CorrId 612)
%7|1543308536.817|RECV|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Received FetchResponse (v4, 612 bytes, CorrId 613, rtt 100.97ms)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic 222222222222222222222222222222 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic 333333333333333333333333333333 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic 555555555555555555555555555555 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic 666666666666666666666666666666 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic datastore_scheduler_smooth_1 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic datastore_scheduler_smooth_2 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic datastore_scheduler_smooth_3 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic datastore_scheduler_smooth_7 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic topic3 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Topic topic5 [0] MessageSet size 0, error "Success", MaxOffset 0, Ver 2/2
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic 333333333333333333333333333333 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic 555555555555555555555555555555 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic 666666666666666666666666666666 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic datastore_scheduler_smooth_1 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic datastore_scheduler_smooth_2 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic datastore_scheduler_smooth_3 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic datastore_scheduler_smooth_7 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic topic3 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic topic5 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch topic 222222222222222222222222222222 [0] at offset 0 (v2)
%7|1543308536.817|FETCH|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Fetch 10/10/10 toppar(s)
%7|1543308536.817|SEND|rdkafka#consumer-2| [thrd:SZX1000449934:9094/bootstrap]: SZX1000449934:9094/2: Sent FetchRequest (v4, 506 bytes @ 0, CorrId 614)
.......
```

It seems that the program dumps everything from the beginning of its history. It is still printing at the moment....

yorksen commented 6 years ago

OK, I think it's caused by the 1 ms timeout that I set in my rd_kafka_consume_batch param. Wait a moment.

yorksen commented 6 years ago

I have no idea; after changing it to 10 s, it still keeps printing and seems like it will never stop.

van1988ch commented 5 years ago

I have the same problem. @yorksen were you able to solve it completely?

yorksen commented 5 years ago

@van1988ch It's not solved yet; maybe the author is waiting for more information on this issue.