mre / kafka-influxdb

High performance Kafka consumer for InfluxDB. Supports collectd message formats.
Apache License 2.0

No messages read from kafka #5

Closed aglagla closed 8 years ago

aglagla commented 8 years ago

Kafka jar: kafka_2.11-0.8.2.1.jar
Python mods: kafka-influxdb==0.5.0, kafka-python==0.9.4
config.yaml:

kafka:
  host: "localhost"
  port: 9092
  topic: "mytopic"
influxdb:
  host: "localhost"
  port: 8086
  user: "myuser"
  password: "mypwd"
  dbname: "mydb"
  retention_policy: "default"
encoder: "collectd_graphite_encoder"
benchmark: false
buffer_size: 1000
verbose: false
statistics: true

On a simple local configuration: the Kafka broker is on localhost:9092, and InfluxDB and collectd are running as well. collectd is sending messages to Kafka (checked with kafka-console-consumer.sh), and InfluxDB is accepting requests. However, this is what happens when starting kafka_influxdb in foreground mode:

kafka_influxdb -c ./config.yaml -vvv

Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 100
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 118
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 130
INFO:root:Connecting to InfluxDB at localhost:8086...
INFO:root:Creating database mydb if not exists
INFO:root:Creating InfluxDB database if not exists: mydb
INFO:urllib3.connectionpool:Starting new HTTP connection (1): localhost
DEBUG:urllib3.connectionpool:Setting read timeout to None
DEBUG:urllib3.connectionpool:"GET /query?q=CREATE+DATABASE+mydb&db=mydb HTTP/1.1" 200 72
INFO:root:database already exists
INFO:root:Listening for messages on Kafka topic mytopic...
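As an aside, the repeated TypeError in the log above is the classic symptom of a logging call whose format string has fewer % placeholders than arguments passed to it. A minimal reproduction (hypothetical code, not the actual kafka_influxdb source):

```python
# Logging renders "msg % args" lazily, so a mismatch between placeholders
# and arguments only fails when the record is formatted for output.
def format_message(fmt, *args):
    # Mimics what logging.LogRecord.getMessage() does internally.
    if args:
        return fmt % args
    return fmt

try:
    # One %s placeholder, but two arguments:
    format_message("Connecting to %s", "localhost", 9092)
except TypeError as err:
    print(err)  # not all arguments converted during string formatting

# The fix is to make placeholders and arguments match:
print(format_message("Connecting to %s:%s", "localhost", 9092))
```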

lsof -nn on the process gives the following output:

COMMAND     PID USER   FD   TYPE    DEVICE SIZE/OFF    NODE NAME
kafka_inf 23950 root  txt    REG       8,2  3345416  131084 /usr/bin/python2.7
kafka_inf 23950 root  mem    REG       8,2    47712 4722756 /lib/x86_64-linux-gnu/libnss_files-2.19.so
kafka_inf 23950 root  mem    REG       8,2   194174 1449890 /usr/local/lib/python2.7/dist-packages/pandas/_testing.so
kafka_inf 23950 root  mem    REG       8,2   589629 1449892 /usr/local/lib/python2.7/dist-packages/pandas/msgpack.so
kafka_inf 23950 root  mem    REG       8,2   304666 1450056 /usr/local/lib/python2.7/dist-packages/pandas/json.so
kafka_inf 23950 root  mem    REG       8,2  2005176 1449891 /usr/local/lib/python2.7/dist-packages/pandas/parser.so
kafka_inf 23950 root  mem    REG       8,2  1437878 1450065 /usr/local/lib/python2.7/dist-packages/pandas/_sparse.so
kafka_inf 23950 root  mem    REG       8,2  1653285 1450058 /usr/local/lib/python2.7/dist-packages/pandas/_period.so
kafka_inf 23950 root  mem    REG       8,2  1054219 1450055 /usr/local/lib/python2.7/dist-packages/pandas/index.so
kafka_inf 23950 root  mem    REG       8,2 12434880 1450136 /usr/local/lib/python2.7/dist-packages/pandas/algos.so
kafka_inf 23950 root  mem    REG       8,2    34256  272801 /usr/lib/python2.7/lib-dynload/_csv.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2  4801036 1450057 /usr/local/lib/python2.7/dist-packages/pandas/lib.so
kafka_inf 23950 root  mem    REG       8,2  5514790 1450137 /usr/local/lib/python2.7/dist-packages/pandas/tslib.so
kafka_inf 23950 root  mem    REG       8,2  2128770  923406 /usr/local/lib/python2.7/dist-packages/numpy/random/mtrand.so
kafka_inf 23950 root  mem    REG       8,2   135687 1063785 /usr/local/lib/python2.7/dist-packages/numpy/fft/fftpack_lite.so
kafka_inf 23950 root  mem    REG       8,2     7632  272779 /usr/lib/python2.7/lib-dynload/future_builtins.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2   673678  923385 /usr/local/lib/python2.7/dist-packages/numpy/linalg/_umath_linalg.so
kafka_inf 23950 root  mem    REG       8,2   244856  135734 /usr/lib/x86_64-linux-gnu/libquadmath.so.0.0.0
kafka_inf 23950 root  mem    REG       8,2    90080 4722708 /lib/x86_64-linux-gnu/libgcc_s.so.1
kafka_inf 23950 root  mem    REG       8,2  1153392  135335 /usr/lib/x86_64-linux-gnu/libgfortran.so.3.0.0
kafka_inf 23950 root  mem    REG       8,2   511344  789219 /usr/lib/libblas/libblas.so.3.0
kafka_inf 23950 root  mem    REG       8,2  5882272  792640 /usr/lib/lapack/liblapack.so.3.0
kafka_inf 23950 root  mem    REG       8,2    51744  923391 /usr/local/lib/python2.7/dist-packages/numpy/linalg/lapack_lite.so
kafka_inf 23950 root  mem    REG       8,2   102644 1207007 /usr/local/lib/python2.7/dist-packages/numpy/lib/_compiled_base.so
kafka_inf 23950 root  mem    REG       8,2   625265 1063910 /usr/local/lib/python2.7/dist-packages/numpy/core/scalarmath.so
kafka_inf 23950 root  mem    REG       8,2  2234567 1063884 /usr/local/lib/python2.7/dist-packages/numpy/core/umath.so
kafka_inf 23950 root  mem    REG       8,2  7452726 1063879 /usr/local/lib/python2.7/dist-packages/numpy/core/multiarray.so
kafka_inf 23950 root  mem    REG       8,2  1270596 1450138 /usr/local/lib/python2.7/dist-packages/pandas/hashtable.so
kafka_inf 23950 root  mem    REG       8,2    18936 4722839 /lib/x86_64-linux-gnu/libuuid.so.1.3.0
kafka_inf 23950 root  mem    REG       8,2    30944  139649 /usr/lib/x86_64-linux-gnu/libffi.so.6.0.1
kafka_inf 23950 root  mem    REG       8,2   136232  272793 /usr/lib/python2.7/lib-dynload/_ctypes.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2    54064  272798 /usr/lib/python2.7/lib-dynload/_json.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2    33448  272783 /usr/lib/python2.7/lib-dynload/_multiprocessing.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2    77752  271801 /usr/lib/python2.7/lib-dynload/parser.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2   387256 4718682 /lib/x86_64-linux-gnu/libssl.so.1.0.0
kafka_inf 23950 root  mem    REG       8,2    38480  272785 /usr/lib/python2.7/lib-dynload/_ssl.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2  1930528 4718681 /lib/x86_64-linux-gnu/libcrypto.so.1.0.0
kafka_inf 23950 root  mem    REG       8,2    20664  271802 /usr/lib/python2.7/lib-dynload/_hashlib.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2   109232  272794 /usr/lib/python2.7/lib-dynload/datetime.x86_64-linux-gnu.so
kafka_inf 23950 root  mem    REG       8,2  7476720  138263 /usr/lib/locale/locale-archive
kafka_inf 23950 root  mem    REG       8,2  1071552 4722731 /lib/x86_64-linux-gnu/libm-2.19.so
kafka_inf 23950 root  mem    REG       8,2   100728 4722844 /lib/x86_64-linux-gnu/libz.so.1.2.8
kafka_inf 23950 root  mem    REG       8,2    10680 4722836 /lib/x86_64-linux-gnu/libutil-2.19.so
kafka_inf 23950 root  mem    REG       8,2    14664 4722698 /lib/x86_64-linux-gnu/libdl-2.19.so
kafka_inf 23950 root  mem    REG       8,2  1845024 4722681 /lib/x86_64-linux-gnu/libc-2.19.so
kafka_inf 23950 root  mem    REG       8,2   141574 4722801 /lib/x86_64-linux-gnu/libpthread-2.19.so
kafka_inf 23950 root  mem    REG       8,2   149120 4722657 /lib/x86_64-linux-gnu/ld-2.19.so
kafka_inf 23950 root    0u   CHR     136,9      0t0      12 /dev/pts/9
kafka_inf 23950 root    1u   CHR     136,9      0t0      12 /dev/pts/9
kafka_inf 23950 root    2u   CHR     136,9      0t0      12 /dev/pts/9
kafka_inf 23950 root    3u  IPv4 261900078      0t0     TCP 127.0.0.1:37828->127.0.0.1:8086 (EST

It doesn't seem to be talking to Kafka or consuming any messages.

Any help would be appreciated.

Regards,

Alexis

mre commented 8 years ago

Thanks for reporting. I will have a look as soon as I find the time. Will keep you updated on the process. For a start, could you reduce the buffer_size in the config to 1 and see if you get any output?

aglagla commented 8 years ago

No change with buffer_size set to 1. I straced the process as well, filtering on connect calls:

$ sudo strace -e connect kafka_influxdb -c config.yaml -vvv
--- SIGCHLD {si_signo=SIGCHLD, si_code=CLD_EXITED, si_pid=24219, si_status=0, si_utime=0, si_stime=0} ---
--- SIGCHLD {si_signo=SIGCHLD, si_code=CLD_EXITED, si_pid=24221, si_status=0, si_utime=0, si_stime=0} ---
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 100
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 118
Traceback (most recent call last):
  File "/usr/lib/python2.7/logging/__init__.py", line 851, in emit
    msg = self.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 724, in format
    return fmt.format(record)
  File "/usr/lib/python2.7/logging/__init__.py", line 464, in format
    record.message = record.getMessage()
  File "/usr/lib/python2.7/logging/__init__.py", line 328, in getMessage
    msg = msg % self.args
TypeError: not all arguments converted during string formatting
Logged from file kafka_influxdb.py, line 130
INFO:root:Connecting to InfluxDB at localhost:8086...
INFO:root:Creating database mydb if not exists
INFO:root:Creating InfluxDB database if not exists: mydb
INFO:urllib3.connectionpool:Starting new HTTP connection (1): localhost
connect(3, {sa_family=AF_LOCAL, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
connect(3, {sa_family=AF_LOCAL, sun_path="/var/run/nscd/socket"}, 110) = -1 ENOENT (No such file or directory)
connect(3, {sa_family=AF_INET, sin_port=htons(8086), sin_addr=inet_addr("127.0.0.1")}, 16) = 0
DEBUG:urllib3.connectionpool:Setting read timeout to None
DEBUG:urllib3.connectionpool:"GET /query?q=CREATE+DATABASE+mydb&db=mydb HTTP/1.1" 200 72
INFO:root:database already exists
INFO:root:Listening for messages on Kafka topic aleph...
panda87 commented 8 years ago

I have the same problem after I ran the setup.py file. @mre Do you have any idea on that?

mre commented 8 years ago

@panda87 no idea yet. Did it work before for you or was it never working? Also did you try to run it with Docker?

@aglagla what happens when you try to read on a topic that does not exist? Will kafka-influxdb complain?

panda87 commented 8 years ago

@mre It's the first time I've tried to implement this project, so I don't have much experience with it yet. No, I didn't try running it with Docker, since the docker-compose file contains other services that are already installed on my system. However, I'll try to run it with Docker.

panda87 commented 8 years ago

@mre the Docker container is stuck on "Starting to consume messages"; no messages are consumed.

mre commented 8 years ago

@panda87 @aglagla could you guys try again now? I think I found the bug:

In the beginning I had a simple handle_read() method. It would just read messages from Kafka and yield the values to the caller. So far so good.

After some time I discovered a bug where kafka-influxdb was not reconnecting after Kafka crashed. So I added retry handling for that case. The handler was a simple wrapper function around handle_read():

def read():
  # Reconnect on error
  while True:
    yield from handle_read()

The yield from returns all messages from handle_read(). Unfortunately this only worked in Python 3.4, so I removed it again.

def read():
  # Reconnect on error
  while True:
    handle_read()

What I should have done instead was:

def read():
  # Reconnect on error
  while True:
    for msg in handle_read():
      yield msg

So I forgot to yield the messages back to the caller. This is now fixed. Sorry for the inconvenience.
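The difference between the variants can be shown with a small self-contained sketch (the message list is a stand-in for the Kafka reader, and the infinite reconnect loop is dropped so the demo terminates):

```python
def handle_read():
    # Stand-in for the Kafka read loop: a generator yielding messages.
    for msg in ["m1", "m2", "m3"]:
        yield msg

def read_broken():
    # Calling a generator function only creates a generator object and
    # throws it away; nothing is ever yielded to the caller.
    handle_read()
    return
    yield  # unreachable; merely marks read_broken as a generator

def read_fixed():
    # Python 2 compatible spelling of "yield from handle_read()":
    # explicitly re-yield every message to the caller.
    for msg in handle_read():
        yield msg

print(list(read_broken()))  # []
print(list(read_fixed()))   # ['m1', 'm2', 'm3']
```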

aglagla commented 8 years ago

Many thanks for your feedback and sorry for not being more responsive. Traveling at the moment but will definitely be testing the new release in the coming weeks.

Cheers,

Alexis


mre commented 8 years ago

Thanks @aglagla. I appreciate any feedback :smiley:

panda87 commented 8 years ago

Thanks @mre I'll check this now and let you know.

panda87 commented 8 years ago

@mre I cloned the updated project and just built the Dockerfile. During the build it complained about many issues. I'm copying part of them:

warning: no files found matching 'README.md'

no previously-included directories found matching 'documentation/_build'
zip_safe flag not set; analyzing archive contents...
six: module references __path__

In file included from ext/_yaml.c:343:0:
ext/_yaml.h:6:0: warning: "PyUnicode_FromString" redefined
 #define PyUnicode_FromString(s) PyUnicode_DecodeUTF8((s), strlen(s), "strict")
 ^
In file included from /usr/local/include/python2.7/Python.h:85:0,
                 from ext/_yaml.c:16:
/usr/local/include/python2.7/unicodeobject.h:281:0: note: this is the location of the previous definition
 # define PyUnicode_FromString PyUnicodeUCS4_FromString
 ^
ext/_yaml.c: In function ‘__pyx_pf_5_yaml_get_version_string’:
ext/_yaml.c:1346:17: warning: assignment discards ‘const’ qualifier from pointer target type
   __pyx_v_value = yaml_get_version_string();
......

no previously-included directories found matching 'doc/.build'

zip_safe flag not set; analyzing archive contents...

zip_safe flag not set; analyzing archive contents...
tests.influxdb.client_test_with_server: module references __file__

/usr/local/lib/python2.7/site-packages/setuptools/dist.py:285: UserWarning: Normalizing '2015.09.06.2' to '2015.9.6.2'
  normalized_version,
zip_safe flag not set; analyzing archive contents...
certifi.core: module references __file__

error: six 1.10.0 is installed but six==1.9.0 is required by set(['influxdb'])

Maybe other packages should be updated?

mre commented 8 years ago

You are right @panda87. Thanks for the hint. I hadn't updated the setup.py script with the new required packages.

This is fixed now. I'm loading them dynamically from the requirements.txt file. This avoids duplicate code and much confusion. Would you mind doing another test run? Thanks for your patience btw.

panda87 commented 8 years ago

Thanks @mre I see progress. I'm still getting the same errors, but the image eventually built. Now I'm getting this:

Waiting for Kafka connection at kafka:9092. This may take a while...
Waiting for InfluxDB connection at influxdb:8086. This may take a while...
Starting to consume messages
Reading config file config_example.yaml
Traceback (most recent call last):
  File "/usr/local/bin/kafka_influxdb", line 9, in <module>
    load_entry_point('kafka-influxdb==0.6.0', 'console_scripts', 'kafka_influxdb')()
  File "/usr/local/lib/python2.7/site-packages/kafka_influxdb-0.6.0-py2.7.egg/kafka_influxdb/kafka_influxdb.py", line 100, in main
    start_consumer(config)
  File "/usr/local/lib/python2.7/site-packages/kafka_influxdb-0.6.0-py2.7.egg/kafka_influxdb/kafka_influxdb.py", line 131, in start_consumer
    client.consume()
  File "/usr/local/lib/python2.7/site-packages/kafka_influxdb-0.6.0-py2.7.egg/kafka_influxdb/kafka_influxdb.py", line 35, in consume
    self.buffer.append(self.encoder.encode(raw_message))
  File "/usr/local/lib/python2.7/site-packages/kafka_influxdb-0.6.0-py2.7.egg/kafka_influxdb/encoder/collectd_json_encoder.py", line 27, in encode
    raise NotImplemented
TypeError: exceptions must be old-style classes or derived from BaseException, not NotImplementedType
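(Side note on the TypeError at the end of that traceback: `raise NotImplemented` raises the NotImplemented constant, which is meant for rich comparisons and is not an exception; the correct spelling is `raise NotImplementedError`. A minimal illustration with hypothetical encoder classes:)

```python
class BrokenEncoder(object):
    def encode(self, msg):
        raise NotImplemented  # bug: a constant, not an exception class

class FixedEncoder(object):
    def encode(self, msg):
        raise NotImplementedError("subclasses must implement encode()")

try:
    BrokenEncoder().encode("...")
except TypeError as err:
    # Raising a non-exception object is itself a TypeError.
    print(err)

try:
    FixedEncoder().encode("...")
except NotImplementedError as err:
    print(err)  # subclasses must implement encode()
```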

My CollectD logs are JSON; is there a problem with that?

btw, 2 questions.

  1. Does the KAFKA_HOST param support multiple hostnames?
  2. My InfluxDB cluster is behind an ELB with TLS (port 8086); how can I configure kafka-influxdb to use TLS as well?

D.

mre commented 8 years ago

Yeah, so the errors are actually C compiler warnings from a dependency, namely PyYAML 3.11. There's not much we can do here, except maybe fixing it upstream or switching to another library. For now it's fine, I guess.

The CollectD JSON format is currently not implemented, but if you send me a couple of sample messages, I can have a look and maybe write an encoder for it. Maybe we can handle this in a separate issue.

As for your questions:

  1. Do you want to read from different Kafka topics or do you want to read from the same topic but from different partitions?
  2. It should be possible. We use influxdb-python as the library to talk to InfluxDB. It has a parameter for that, see http://influxdb-python.readthedocs.org/en/latest/api-documentation.html#influxdbclient. So I will create a new issue and see what I can do.
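For reference, a hedged sketch of what the TLS setup could look like with influxdb-python (the ELB hostname is hypothetical; constructing the client does not open a network connection, and the import is guarded in case the package is absent):

```python
# Sketch: influxdb-python's InfluxDBClient exposes ssl and verify_ssl
# keyword arguments for talking to InfluxDB over HTTPS.
try:
    from influxdb import InfluxDBClient
except ImportError:
    InfluxDBClient = None  # package not installed; shown for illustration

client = None
if InfluxDBClient is not None:
    client = InfluxDBClient(
        host="influxdb-elb.example.com",  # hypothetical TLS endpoint
        port=8086,
        username="myuser",
        password="mypwd",
        database="mydb",
        ssl=True,         # use https:// for requests
        verify_ssl=True,  # verify the server certificate
    )
```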
panda87 commented 8 years ago

I created a new issue for the JSON encoder.

  1. I want the ability to consume from the cluster even if the specific server I point at goes down: try the first hostname, and if it times out, move on to the next hostname, and so on.
  2. The ability to read from several topics (from the same config file) and route them to the appropriate InfluxDB databases sounds like a good feature. Currently I can just run more than one container, so maybe this is not a first priority (for me, at least).
  3. Yes, the ssl and verify_ssl params exist, and that's what I'm looking for. I'll open a new issue for that.

Thanks for your time @mre D.

mre commented 8 years ago
  1. Sounds like a great idea. I'll open an issue for that.
  2. Yeah, I guess for now we should go with separate consumers per topic. Tools like Logstash do that as well.
  3. Thanks for that. For reference: #7
aglagla commented 8 years ago

Hi @mre

Finally got some time to test. After pulling the new release and installing it (thanks for the setup.py update), I ran my test with the same config.yaml as before, in debug mode, with a small set of collectd metrics, and everything works perfectly. I checked InfluxDB and, as expected, all metrics are there.

Thanks again for your responsiveness, much appreciated.

Alexis


mre commented 8 years ago

Nice! Thanks for your feedback. Closing.