mfontanini / cppkafka

Modern C++ Apache Kafka client library (wrapper for librdkafka)
BSD 2-Clause "Simplified" License

Add support for rd_kafka_destroy_flags. #247

Closed: filimonov closed this 4 years ago

filimonov commented 4 years ago

This allows setting flags for rd_kafka_destroy_flags in the following manner:

     consumer.set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

It lets us prevent a potential hang during termination when consumer.close() fails because of queued callback calls.
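To illustrate the idea, here is a minimal, self-contained sketch of how a handle deleter might pick between a plain destroy and a destroy-with-flags call. It does not use librdkafka or cppkafka: `FakeHandle`, `fake_destroy`, `fake_destroy_flags`, `kFakeNoConsumerClose` (including its value), and `FakeConsumer` are all hypothetical stand-ins invented for this example, and the dispatch rule (use the flags variant only when the flags are non-zero) is an assumption, not necessarily what the PR does.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for rd_kafka_t.
struct FakeHandle {};

// Records each destroy call so the sketch can be checked without a broker.
// -1 marks a plain destroy; any other value is the flags that were passed.
inline std::vector<int>& destroy_log() {
    static std::vector<int> log;
    return log;
}

// Hypothetical stand-ins for rd_kafka_destroy / rd_kafka_destroy_flags.
inline void fake_destroy(FakeHandle* h) {
    destroy_log().push_back(-1);
    delete h;
}
inline void fake_destroy_flags(FakeHandle* h, int flags) {
    destroy_log().push_back(flags);
    delete h;
}

// Illustrative value only; the real constant is defined by librdkafka.
constexpr int kFakeNoConsumerClose = 0x8;

class FakeConsumer {
public:
    FakeConsumer() : handle_(new FakeHandle, Deleter(this)) {}
    FakeConsumer(const FakeConsumer&) = delete;
    FakeConsumer& operator=(const FakeConsumer&) = delete;

    void set_destroy_flags(int flags) { destroy_flags_ = flags; }
    int get_destroy_flags() const { return destroy_flags_; }

private:
    // The deleter keeps a pointer to its owner and reads the flags only
    // at destruction time, so flags set after construction still apply.
    struct Deleter {
        explicit Deleter(const FakeConsumer* o = nullptr) : owner(o) {}
        void operator()(FakeHandle* h) const {
            if (owner && owner->destroy_flags_ != 0) {
                fake_destroy_flags(h, owner->destroy_flags_);
            } else {
                fake_destroy(h);
            }
        }
        const FakeConsumer* owner;
    };

    int destroy_flags_ = 0;                        // declared before handle_
    std::unique_ptr<FakeHandle, Deleter> handle_;  // destroyed first
};
```

A consumer that never calls `set_destroy_flags` goes through the plain destroy path here, while one that sets `kFakeNoConsumerClose` before going out of scope goes through the flags path.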

Refs:

More:

Many thanks to @azat for help.

filimonov commented 4 years ago

BTW: it didn't help with the original problem :\ Still need draining after unsubscribe: https://github.com/edenhill/librdkafka/issues/2898

accelerated commented 4 years ago

Why do you need to connect 2 consumers inside the same app to the same topic?

filimonov commented 4 years ago

3 reasons: 1) I was actually just testing the behavior after a rebalance, and it was the simplest way to trigger one; 2) it's out of my control: users of ClickHouse can create several 'tables' = 'kafka consumers', and I can't restrict that; 3) parallel consumption is done by running several consumers instead of several queues connected to the same consumer (legacy code, should be rewritten, a bit tricky).

accelerated commented 4 years ago

I would recommend not having multiple consumers in the same app if you can. It's very counterintuitive and you may run into other issues down the line.

filimonov commented 4 years ago

> I would recommend not having multiple consumers in the same app if you can.

I can't :)

accelerated commented 4 years ago

@filimonov Also, since HandleDeleter cannot be set with a null KafkaHandleBase, I would recommend capturing it by reference. It makes the intent clearer in the class design.

filimonov commented 4 years ago

I've tried, but it seems like it will require a deeper look to do that:

[build] /home/mfilimonov/workspace/ClickHouse/contrib/libcxx/include/memory:2538:21: error: object of type 'cppkafka::KafkaHandleBase::HandleDeleter' cannot be assigned because its copy assignment operator is implicitly deleted
[build]     __ptr_.second() = _VSTD::forward<deleter_type>(__u.get_deleter());
[build]                     ^
[build] /home/mfilimonov/workspace/ClickHouse/contrib/cppkafka/src/kafka_handle_base.cpp:216:13: note: in instantiation of member function 'std::__1::unique_ptr<rd_kafka_s, cppkafka::KafkaHandleBase::HandleDeleter>::operator=' requested here
[build]     handle_ = HandlePtr(handle, HandleDeleter(*this));

If you can improve that - please do.
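For readers who want to see the compiler error in isolation, here is a small sketch that does not depend on cppkafka. `Owner`, `Resource`, `RefDeleter`, `PtrDeleter`, and `reassign_keeps_new_owner` are hypothetical names made up for this example. It shows that a deleter holding a reference member loses its implicit assignment operators (which `std::unique_ptr`'s move assignment needs), while a deleter holding a pointer keeps them.

```cpp
#include <cassert>
#include <memory>
#include <type_traits>

// Hypothetical stand-ins; not the real cppkafka types.
struct Owner { int destroy_flags; };
struct Resource {};

// Deleter that captures its owner by reference.
// A reference member makes the implicit copy/move assignment deleted.
struct RefDeleter {
    explicit RefDeleter(const Owner& o) : owner(o) {}
    void operator()(Resource* r) const { delete r; }
    const Owner& owner;
};

// Deleter that stores a pointer instead.
// It stays default constructible and assignable.
struct PtrDeleter {
    explicit PtrDeleter(const Owner* o = nullptr) : owner(o) {}
    void operator()(Resource* r) const { delete r; }
    const Owner* owner;
};

static_assert(!std::is_move_assignable<RefDeleter>::value,
              "reference member deletes assignment");
static_assert(!std::is_default_constructible<RefDeleter>::value,
              "reference member needs an initializer");
static_assert(std::is_move_assignable<PtrDeleter>::value,
              "pointer member keeps assignment");
static_assert(std::is_default_constructible<PtrDeleter>::value,
              "pointer member can default to nullptr");

using PtrHandle = std::unique_ptr<Resource, PtrDeleter>;

// Mirrors the `handle_ = HandlePtr(handle, HandleDeleter(...))` pattern:
// move-assigning a unique_ptr also move-assigns its deleter.
inline bool reassign_keeps_new_owner() {
    Owner a{1};
    Owner b{2};
    PtrHandle p(new Resource, PtrDeleter(&a));
    p = PtrHandle(new Resource, PtrDeleter(&b));  // old Resource is freed here
    return p.get_deleter().owner == &b;
}
```

Replacing `PtrDeleter` with `RefDeleter` in `reassign_keeps_new_owner` should make the assignment line fail to compile, in the same way as the build log above.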

accelerated commented 4 years ago

Yeah, I just saw that there's a requirement for the deleter to be default constructible, so yes, you can't use any references, unfortunately. Will let @mfontanini review. Looks good to me.

mfontanini commented 4 years ago

Thanks for the PR!