Blizzard / node-rdkafka

Node.js bindings for librdkafka
MIT License

How to subscribe/assign a consumer based on a timestamp instead of offset? #336

Closed · ottomata closed this 6 years ago

ottomata commented 6 years ago

Hiya!

I'm looking into building timestamp-based subscription support into KafkaSSE and Wikimedia EventStreams. Using the latest node-rdkafka and a Kafka 1.0 cluster, I can get message timestamps from the consumer no problem. But if I'd like to use one of those timestamps to position the consumer in a partition, how do I do so? Ideally, I'd be able to just provide the timestamp in the assignment I give to consumer.assign(), e.g.

kafkaConsumer.assign([{
    topic: 'my-topic',
    partition: 0,
    timestamp: 1515618893265,
}]);

I understand that librdkafka doesn't work this way (right?). It seems that with librdkafka, you must first make a request for the offset associated with a timestamp, and then use that offset in your consumer assignment. I could work with this if it were supported in node-rdkafka.

Is there a way to do this now that I am missing?

JaapRood commented 6 years ago

You present an interesting use case, but as far as I know Kafka can only consume from an offset. It's the key by which the ordering guarantee is achieved. I haven't done this myself, but I'm pretty sure timestamps can be out of order, because producers can set them themselves, allowing for late-arriving messages. That would make consuming from a timestamp pretty hard, as you can't be sure, reading from a random offset, that all messages before it have an earlier timestamp.

The use of a monotonic clock for ordering (the incrementing offset rather than a timestamp) is inherent to Kafka's architecture and avoids a whole set of issues, like synchronising clocks, late-arriving data, etc. I don't have a strong enough grip on this to explain all those moving forces in great detail. For more info I'd recommend Martin Kleppmann's book, Designing Data-Intensive Applications.

ottomata commented 6 years ago

but as far as I know Kafka can only consume from an offset

Since 0.10.1, Kafka has had a timestamp-based offset index from which you can look up an offset by timestamp:

Time-based Search: This release of Kafka adds support for a searchable index for each topic based off of message timestamps, which were added in 0.10.0.0. This allows for finer-grained log retention than was possible previously using only the timestamps from the log segments. It also enables consumer support for offset lookup by timestamp, which allows you to seek to a position in the topic based on a certain time.

Ya, the ordering isn't quite as strong as with offsets, but for many use cases (including mine) that is totally fine. :)

JaapRood commented 6 years ago

You're totally right! For those as curious as me: digging a little deeper, the KIP that introduced this mentions some use cases it's meant to support:

  1. From time to time, applications may need to reconsume the messages for a certain period, so they will need to rewind the offsets back to, say, 6 hours ago, and reconsume all the messages after that.
  2. In a multi-datacenter environment, users may have different Kafka clusters in each datacenter for disaster recovery. If one of the datacenters fails, the applications may need to switch consumption from one datacenter to another. Because the offsets between two different Kafka clusters are independent, users cannot use the offsets from the failed datacenter to consume from the DR datacenter. In this case, searching by timestamp will help, because the messages should have the same timestamp if users are using CreateTime. Even if users are using LogAppendTime, a more granular search based on timestamp can still help reduce the amount of messages to be reconsumed.

Looking a bit more at how they chose to implement this, it seems they have added an offsetsForTimes method to the KafkaConsumer, accepting a list of topic partitions and the timestamps to look for. It looks like librdkafka, which node-rdkafka is essentially bindings to, supports this as well. In node-rdkafka, however, the method doesn't seem to have been added to the KafkaConsumer or Client.

The way it works in the other clients (sketched below) is:

  1. Call consumer.offsetsForTimes with the right topic-partition pairs and timestamps. Receive an offset per topic-partition pair.
  2. Call consumer.assign with the offsets received.
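
A sketch of that two-step flow in node-rdkafka-style JavaScript (hypothetical at this point in the thread — see the TL;DR below — with my-topic, partition 0, and someTimestampMs as placeholders):

// 1. Resolve a timestamp to an offset per topic-partition.
//    Note: the timestamp is passed in the `offset` field.
consumer.offsetsForTimes(
  [ { topic: 'my-topic', partition: 0, offset: someTimestampMs } ],
  10000, // timeout in ms
  function(err, offsets) {
    if (err) throw err;
    // 2. Assign the consumer at the resolved offsets.
    consumer.assign(offsets);
  }
);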

TL;DR: as far as I can see, bindings have to be made to librdkafka. Unfortunately, I've never done these kinds of bindings before, so I can be of little help.

webmakersteve commented 6 years ago

https://github.com/Blizzard/node-rdkafka/commit/735e99c03f57cfc778e56618c7928a4a8b6925b8

I added offsetsForTimes to the consumer. Can you try that?

DylanVerstraete commented 6 years ago

@webmakersteve Can you show an example of how to use this?

Let's say I want to get all messages from a specific topic from the last 5 minutes.

Thx

webmakersteve commented 6 years ago

Will add one to the README
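
In the meantime, a minimal sketch of Dylan's case — consume everything from the last 5 minutes — assuming the offsetsForTimes signature used later in this thread; the topic name, group id, broker, and single partition 0 are placeholders:

var Kafka = require('node-rdkafka');

var consumer = new Kafka.KafkaConsumer({
  'group.id': 'example-group',
  'metadata.broker.list': 'localhost:9092',
}, {});

consumer.connect();

consumer.on('ready', function() {
  // Resolve "5 minutes ago" to a real offset (the timestamp rides in the `offset` field).
  var fiveMinutesAgo = Date.now() - 5 * 60 * 1000;
  consumer.offsetsForTimes(
    [ { topic: 'my-topic', partition: 0, offset: fiveMinutesAgo } ],
    10000, // timeout in ms
    function(err, offsets) {
      if (err) throw err;
      // offsets now holds real numeric offsets; assign and start consuming.
      consumer.assign(offsets);
      consumer.consume();
    }
  );
});

consumer.on('data', function(message) {
  console.log(message.timestamp, message.value.toString());
});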

ottomata commented 6 years ago

Sorry haven't been able to find time to try this yet, but am excited!

swapsCAPS commented 6 years ago

Ah, a bit obscure, but it turns out you can just give it an array of objects, like so:

const topicPartitions = [ { topic: topicName, partition: id, offset: timestamp } ] // NB: the timestamp goes in the `offset` field
consumer.offsetsForTimes(topicPartitions, console.log)

Currently I only have one partition, though. Would I have to iterate over all partitions of a topic if I had more?
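
For what it's worth, offsetsForTimes accepts a list of topic partitions, so a single call should be able to cover them all. A sketch (reusing the topicName and timestamp variables from the snippet above) that discovers the partitions via getMetadata:

consumer.getMetadata({ topic: topicName, timeout: 10000 }, function(err, metadata) {
  if (err) throw err;
  // Find this topic's metadata and build one { topic, partition, offset } entry per partition.
  var topicMetadata = metadata.topics.filter(function(t) {
    return t.name === topicName;
  })[0];
  var topicPartitions = topicMetadata.partitions.map(function(p) {
    return { topic: topicName, partition: p.id, offset: timestamp };
  });
  consumer.offsetsForTimes(topicPartitions, 10000, console.log);
});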

ottomata commented 6 years ago

K, trying this out, but I haven't gotten it to work yet. Running Kafka 1.0, node-rdkafka 2.3.2, librdkafka 0.11. To make sure I have good timestamp-indexed offsets, I query my 'hi' topic with kafkacat:

./kafkacat -b localhost:9092 -Q -t hi:0:1521788682000
hi [0] offset 431

Looks good.

Now node-rdkafka:

var Kafka = require('node-rdkafka');
var consumer = new Kafka.KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
}, {});

var timeout = 10000; // 10 seconds
consumer.offsetsForTimes(
    [ { topic: 'hi', partition: 0, offset: Date.now() - 60 * 1000 } ], // one minute ago
    timeout,
    console.log
);

Getting:

Error: Client is disconnected
    at KafkaConsumer.Client.offsetsForTimes (.../node_modules/node-rdkafka/lib/client.js:415:17)
    at repl:1:10
    at sigintHandlersWrap (vm.js:22:35)
    at sigintHandlersWrap (vm.js:73:12)
    at ContextifyScript.Script.runInThisContext (vm.js:21:12)
    at REPLServer.defaultEval (repl.js:340:29)
    at bound (domain.js:280:14)
    at REPLServer.runBound [as eval] (domain.js:293:12)
    at REPLServer.<anonymous> (repl.js:539:10)
    at emitOne (events.js:101:20)
ottomata commented 6 years ago

Oh, I need to call consumer.connect() first, sorry. Trying...

ottomata commented 6 years ago

I'm having a bit of trouble running node-rdkafka. npm install seems to build both librdkafka and node-rdkafka just fine, but when I attempt to run consumer.connect(), I get a segfault. It seems to be linking against my Debian-installed version of librdkafka1 at /usr/lib/x86_64-linux-gnu/librdkafka.so.1 at runtime, even though I have not set BUILD_LIBRDKAFKA=0. If I uninstall the librdkafka1 package, I get

Error: librdkafka.so.1: cannot open shared object file: No such file or directory

even though the build went fine. I could just be doing something wrong, though; still poking around.
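
(A standard way to check which librdkafka the compiled addon actually resolves at runtime is ldd:

ldd node_modules/node-rdkafka/build/Release/node-librdkafka.node | grep rdkafka

)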

The segfault trace I get while the librdkafka1=0.11.4-1~bpo9+1 Debian package is installed is:

PID 6732 received SIGSEGV for address: 0x0
/vagrant/srv/kafkasse/node_modules/segfault-handler/build/Release/segfault-handler.node(+0x1a8a)[0x7f0034052a8a]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x110c0)[0x7f00366090c0]
/lib/x86_64-linux-gnu/libc.so.6(+0x8f50a)[0x7f00362e850a]
node(lh_insert+0x14a)[0x8a10aa]
node(OBJ_NAME_add+0x71)[0x8ac591]
/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1(+0x150a3c)[0x7f002c6f2a3c]
/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1(+0x1680d9)[0x7f002c70a0d9]
/lib/x86_64-linux-gnu/libpthread.so.0(+0xe739)[0x7f0036606739]
/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1(CRYPTO_THREAD_run_once+0x9)[0x7f002c75ff59]
/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1(OPENSSL_init_crypto+0x1b3)[0x7f002c70a533]
/usr/lib/x86_64-linux-gnu/libssl.so.1.1(OPENSSL_init_ssl+0x74)[0x7f002cc73bd4]
/usr/lib/x86_64-linux-gnu/librdkafka.so.1(+0x37e95)[0x7f002d105e95]
/usr/lib/x86_64-linux-gnu/librdkafka.so.1(rd_kafka_new+0x90a)[0x7f002d0e751a]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/Release/librdkafka++.so.1(_ZN7RdKafka13KafkaConsumer6createEPNS_4ConfERNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE+0x15c)[0x7f002d3b277c]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/Release/node-librdkafka.node(_ZN9NodeKafka13KafkaConsumer7ConnectEv+0x5e)[0x7f002d5e06ee]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/Release/node-librdkafka.node(_ZN9NodeKafka7Workers20KafkaConsumerConnect7ExecuteEv+0x27)[0x7f002d5ee267]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/Release/node-librdkafka.node(_ZN3Nan12AsyncExecuteEP9uv_work_s+0x32)[0x7f002d5de102]
node[0x132dbc1]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x7494)[0x7f00365ff494]
/lib/x86_64-linux-gnu/libc.so.6(clone+0x3f)[0x7f0036341aff]
Segmentation fault

(I got ^ using https://www.npmjs.com/package/segfault-handler)

webmakersteve commented 6 years ago

This is something I need to fix. I changed over to linking dynamically rather than statically to allow librdkafka to build the way it wants, and evidently it didn't find the librdkafka.so file it was looking for.

What version of NPM are you using? I'm trying to track down this bug but have not had much success reproducing it. When I did get it to reproduce, it was with NPM v3.

Can you list the directory contents of build/deps?

ottomata commented 6 years ago
$ npm --version
3.10.10

$ ls ./node_modules/node-rdkafka/build/deps/
librdkafka.target.mk
webmakersteve commented 6 years ago

Can you call ./configure and then npm install again, and ls the same directory? That fixed it for the other person using NPM 3. I have an idea for a fix, but just want to make sure you're experiencing the same problem.

ottomata commented 6 years ago
$ cd ./node_modules/node-rdkafka
$ ./configure
# ...
$ npm install .
# ...
$ ls ./build/deps
librdkafka.target.mk
ottomata commented 6 years ago

Probably not relevant, but I do get

make: warning:  Clock skew detected.  Your build may be incomplete.

when running npm install.

webmakersteve commented 6 years ago

That fixed it on the other machine. Can you send the output of configure? Sorry to keep asking you to do little incremental things, but I'm not able to reproduce on my machine.

My suspicion was that configure is not getting the overridden libdir, which should cause make install to install into the build directory rather than the root directory.

ottomata commented 6 years ago
$ ./configure
using cache file config.cache
checking for OS or distribution... ok (Debian)
checking for C compiler from CC env... failed
checking for gcc (by command)... ok (cached)
checking for C++ compiler from CXX env... failed
checking for C++ compiler (g++)... ok (cached)
checking executable ld... ok (cached)
checking executable nm... ok (cached)
checking executable objdump... ok (cached)
checking executable strip... ok (cached)
checking for pkgconfig (by command)... failed
checking for install (by command)... ok (cached)
checking for PIC (by compile)... ok (cached)
checking for GNU-compatible linker options... ok (cached)
checking for GNU linker-script ld flag... ok (cached)
checking for __atomic_32 (by compile)... ok (cached)
checking for __atomic_64 (by compile)... ok (cached)
checking for socket (by compile)... ok (cached)
parsing version '0x000b03ff'... ok (0.11.3)
checking for libpthread (by compile)... ok (cached)
checking for zlib (by compile)... ok (cached)
checking for libcrypto (by compile)... ok (cached)
checking for liblz4 (by compile)... failed (disable)
checking for libssl (by compile)... ok (cached)
checking for libsasl2 (by compile)... ok (cached)
checking for crc32chw (by compile)... ok (cached)
checking for regex (by compile)... ok (cached)
checking for librt (by compile)... ok (cached)
checking for strndup (by compile)... ok (cached)
checking for strerror_r (by compile)... ok (cached)
checking for libdl (by compile)... ok (cached)
checking for nm (by env NM)... ok (cached)
checking for python (by command)... ok (cached)
Generated Makefile.config
Generated config.h

Configuration summary:
  prefix                   /usr/local
  ARCH                     x86_64
  CPU                      generic
  GEN_PKG_CONFIG           y
  ENABLE_DEVEL             n
  ENABLE_VALGRIND          n
  ENABLE_REFCNT_DEBUG      n
  ENABLE_SHAREDPTR_DEBUG   n
  ENABLE_LZ4_EXT           y
  ENABLE_SSL               y
  ENABLE_SASL              y
  MKL_APP_NAME             librdkafka
  MKL_APP_DESC_ONELINE     The Apache Kafka C/C++ library
  MKL_DISTRO               Debian
  SOLIB_EXT                .so
  CC                       gcc
  CXX                      g++
  LD                       ld
  NM                       nm
  OBJDUMP                  objdump
  STRIP                    strip
  CPPFLAGS                 -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align
  PKG_CONFIG               pkg-config
  INSTALL                  install
  LIB_LDFLAGS              -shared -Wl,-soname,$(LIBFILENAME)
  LDFLAG_LINKERSCRIPT      -Wl,--version-script=
  RDKAFKA_VERSION_STR      0.11.3
  MKL_APP_VERSION          0.11.3
  LIBS                     -lpthread -lz -lcrypto -lssl -lsasl2 -lrt -ldl
  CXXFLAGS                 -Wno-non-virtual-dtor
  SYMDUMPER                $(NM) -D
  exec_prefix              /usr/local
  bindir                   /usr/local/bin
  sbindir                  /usr/local/sbin
  libexecdir               /usr/local/libexec
  datadir                  /usr/local/share
  sysconfdir               /usr/local/etc
  sharedstatedir           /usr/local/com
  localstatedir            /usr/local/var
  libdir                   /usr/local/lib
  includedir               /usr/local/include
  infodir                  /usr/local/info
  mandir                   /usr/local/man
Generated config.cache

Now type 'make' to build
webmakersteve commented 6 years ago

Strange. It isn't overriding the prefix for you. I think that's the root of the problem here.

Will try to look into it.

ottomata commented 6 years ago

Any luck?

webmakersteve commented 6 years ago

Sorry - haven't had nearly enough time with some changes at the office. Can you try 2.3.3? That fixed a similar issue on a coworker's machine related to the npm version.

If that doesn't work can you send me:

  1. deps/librdkafka/config.h
  2. node version
  3. npm version
  4. ls -al build/deps/

Thanks. Sorry - this is a difficult issue to pinpoint, and a lot of similar issues actually have different causes.

ottomata commented 6 years ago

Ok, some differences, but I still get a segfault. Along the way I upgraded npm (since you mentioned issues with npm 3), and am now on npm 6.0.1.

build/deps has stuff now, so that's good!

$ ls build/deps/
include  librdkafka.a  librdkafka++.a  librdkafka.so  librdkafka++.so  librdkafka.so.1  librdkafka++.so.1  librdkafka.target.mk  pkgconfig

And it seems the prefix problem is fixed:

$ ./configure
using cache file config.cache
checking for OS or distribution... ok (Debian)
checking for C compiler from CC env... failed
checking for gcc (by command)... ok (cached)
checking for C++ compiler from CXX env... failed
checking for C++ compiler (g++)... ok (cached)
checking executable ld... ok (cached)
checking executable nm... ok (cached)
checking executable objdump... ok (cached)
checking executable strip... ok (cached)
checking for pkgconfig (by command)... failed
checking for install (by command)... ok (cached)
checking for PIC (by compile)... ok (cached)
checking for GNU-compatible linker options... ok (cached)
checking for GNU linker-script ld flag... ok (cached)
checking for __atomic_32 (by compile)... ok (cached)
checking for __atomic_64 (by compile)... ok (cached)
checking for socket (by compile)... ok (cached)
parsing version '0x000b04ff'... ok (0.11.4)
checking for librt (by compile)... ok (cached)
checking for libpthread (by compile)... ok (cached)
checking for libdl (by compile)... ok (cached)
checking for zlib (by compile)... ok (cached)
checking for libcrypto (by compile)... ok (cached)
checking for liblz4 (by compile)... failed (disable)
checking for libssl (by compile)... ok (cached)
checking for libsasl2 (by compile)... ok (cached)
checking for crc32chw (by compile)... ok (cached)
checking for regex (by compile)... ok (cached)
checking for strndup (by compile)... ok (cached)
checking for strerror_r (by compile)... ok (cached)
checking for pthread_setname_gnu (by compile)... ok (cached)
checking for nm (by env NM)... ok (cached)
checking for python (by command)... ok (cached)
Generated Makefile.config
Generated config.h

Configuration summary:
  prefix                   /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps
  ARCH                     x86_64
  CPU                      generic
  GEN_PKG_CONFIG           y
  ENABLE_DEVEL             n
  ENABLE_VALGRIND          n
  ENABLE_REFCNT_DEBUG      n
  ENABLE_SHAREDPTR_DEBUG   n
  ENABLE_LZ4_EXT           y
  ENABLE_SSL               y
  ENABLE_SASL              y
  libdir                   /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps
  MKL_APP_NAME             librdkafka
  MKL_APP_DESC_ONELINE     The Apache Kafka C/C++ library
  LDFLAGS                  -L/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps
  MKL_DISTRO               Debian
  SOLIB_EXT                .so
  CC                       gcc
  CXX                      g++
  LD                       ld
  NM                       nm
  OBJDUMP                  objdump
  STRIP                    strip
  CPPFLAGS                 -g -O2 -fPIC -Wall -Wsign-compare -Wfloat-equal -Wpointer-arith -Wcast-align
  PKG_CONFIG               pkg-config
  INSTALL                  install
  LIB_LDFLAGS              -shared -Wl,-soname,$(LIBFILENAME)
  LDFLAG_LINKERSCRIPT      -Wl,--version-script=
  RDKAFKA_VERSION_STR      0.11.4
  MKL_APP_VERSION          0.11.4
  LIBS                     -lsasl2 -lssl -lcrypto -lz -ldl -lpthread -lrt
  CXXFLAGS                 -Wno-non-virtual-dtor
  SYMDUMPER                $(NM) -D
  exec_prefix              /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps
  bindir                   /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/bin
  sbindir                  /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/sbin
  libexecdir               /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/libexec
  datadir                  /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/share
  sysconfdir               /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/etc
  sharedstatedir           /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/com
  localstatedir            /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/var
  includedir               /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/include
  infodir                  /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/info
  mandir                   /vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/man
Generated config.cache

Now type 'make' to build

But I still get a segfault after consumer.connect():

PID 4033 received SIGSEGV for address: 0x0
/vagrant/srv/kafkasse/node_modules/segfault-handler/build/Release/segfault-handler.node(+0x1a3b)[0x7f6311dcaa3b]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x110c0)[0x7f631c0ad0c0]
/lib/x86_64-linux-gnu/libc.so.6(+0x8f50a)[0x7f631bd8c50a]
node(lh_insert+0x14a)[0x8a04ba]
node(OBJ_NAME_add+0x71)[0x8ab9a1]
/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1(+0x150a3c)[0x7f631254ea3c]
/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1(+0x1680d9)[0x7f63125660d9]
/lib/x86_64-linux-gnu/libpthread.so.0(+0xe739)[0x7f631c0aa739]
/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1(CRYPTO_THREAD_run_once+0x9)[0x7f63125bbf59]
/usr/lib/x86_64-linux-gnu/libcrypto.so.1.1(OPENSSL_init_crypto+0x1b3)[0x7f6312566533]
/usr/lib/x86_64-linux-gnu/libssl.so.1.1(OPENSSL_init_ssl+0x74)[0x7f63128bdbd4]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/librdkafka.so.1(+0x37455)[0x7f6312d4f455]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/librdkafka.so.1(rd_kafka_new+0x8ba)[0x7f6312d3241a]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/deps/librdkafka++.so.1(_ZN7RdKafka13KafkaConsumer6createEPNS_4ConfERNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE+0x15c)[0x7f63180acaec]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/Release/node-librdkafka.node(_ZN9NodeKafka13KafkaConsumer7ConnectEv+0x5e)[0x7f63182db6fe]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/Release/node-librdkafka.node(_ZN9NodeKafka7Workers20KafkaConsumerConnect7ExecuteEv+0x27)[0x7f63182e93e7]
/vagrant/srv/kafkasse/node_modules/node-rdkafka/build/Release/node-librdkafka.node(_ZN3Nan12AsyncExecuteEP9uv_work_s+0x32)[0x7f63182d9112]
node[0x132c701]
/lib/x86_64-linux-gnu/libpthread.so.0(+0x7494)[0x7f631c0a3494]
/lib/x86_64-linux-gnu/libc.so.6(clone+0x3f)[0x7f631bde5aff]
ottomata commented 6 years ago

Whatever was happening before, I guess it was not caused by the shared-object linking: this time the node-rdkafka-built librdkafka.so.1 is used. Maybe there is some incompatibility with my version of libssl or libcrypto?

webmakersteve commented 6 years ago

The new version of OS X ships an incompatible version of libssl. To fix it on my machine, this is what I did:

LDFLAGS=-L/usr/local/opt/openssl/lib
CPPFLAGS=-I/usr/local/opt/openssl/include
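
Presumably these are exported before rebuilding, so the bundled librdkafka build picks up the Homebrew OpenSSL headers and libraries, e.g.:

LDFLAGS=-L/usr/local/opt/openssl/lib CPPFLAGS=-I/usr/local/opt/openssl/include npm install node-rdkafka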
ottomata commented 6 years ago

Hm, ok. I'm actually building in a Debian Stretch virtual machine.
What version of openssl is in your /usr/local/opt?

webmakersteve commented 6 years ago

1.0.2o

ottomata commented 6 years ago

I'm building with openssl 1.1.0f-3+deb9u2 and libssl1.1 1.1.0f-3+deb9u2

Hm.

webmakersteve commented 6 years ago

I think there is an incompatibility with the new version of SSL. Can you try downgrading?

Librdkafka tests against 1.0.2o

ottomata commented 6 years ago

IT WORKED! OK, totally my fault with the incompatible libssl then! Most of our prod setup is Debian Jessie, which I think still uses 1.0.x. Sorry for all the headache, and thanks!

webmakersteve commented 6 years ago

Happy it worked out! This has been a huge headache since the High Sierra update, too.

ottomata commented 6 years ago

@webmakersteve thanks again so much for this. It enabled me to implement and deploy: https://wikitech.wikimedia.org/wiki/EventStreams#Timestamp_Historical_Consumption See also https://stream.wikimedia.org/?doc#!/Streams/get_v2_stream_streams

Now folks can publicly consume Wikimedia events (like real-time article edit metadata) and provide a historical timestamp from which to start consuming. Very cool.