emrebekar / node-red-contrib-kafkajs


Consumer stuck in "Idle" until flow redeployed #15

Open mossc001 opened 10 months ago

mossc001 commented 10 months ago

There's probably a really obvious answer and solution to this but the consumer successfully connects, reads and gets a payload of data then goes to idle. However, I know there are payloads almost every 2 seconds as I can see them in a Kafka Consumer in Docker.

Any idea why it goes to Idle then never receives any further data until redeployed? The settings between the two are exactly the same apart from ClientID and Group for obvious reasons.

Idle: (screenshot)

Docker Kafka Consumer: (screenshot)

thomasvnl commented 6 months ago

I have this exact same issue and no resolution as of yet.

mossc001 commented 6 months ago

> I have this exact same issue and no resolution as of yet.

I worked out that it was because LZ4 compression isn't implemented in kafkajs, which was my issue. I modified the raw code and that fixed it.

thomasvnl commented 6 months ago

> I have this exact same issue and no resolution as of yet.

> I worked out that it was because LZ4 compression isn't implemented in kafkajs, which was my issue. I modified the raw code and that fixed it.

Can you tell me where in the raw code you made the change to fix this? Also, how did you figure out that this was the issue? Initially the consumer connects and consumes data, so how does it stop working later because of LZ4 compression (or the lack thereof)?

mossc001 commented 6 months ago

> I have this exact same issue and no resolution as of yet.

> I worked out that it was because LZ4 compression isn't implemented in kafkajs, which was my issue. I modified the raw code and that fixed it.

> Can you tell me where in the raw code you made the change to fix this? Also, how did you figure out that this was the issue? Initially the consumer connects and consumes data, so how does it stop working later because of LZ4 compression (or the lack thereof)?

Hi @thomasvnl, that's what initially threw me too: it did consume at first, yet the problem turned out to be LZ4. Presumably the consumer works until it receives an LZ4-compressed batch, at which point stock kafkajs throws KafkaJSNotImplemented (see index.js below) and the consumer stops fetching.

Install the palette "@ecraneworldwide/node-red-contrib-lz4".

Then make the modifications in the following files:

- lz4.js > `~/.node-red/node_modules/kafkajs/src/protocol/message/compression`
- index.js > `~/.node-red/node_modules/kafkajs/src/protocol/message/compression`
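For reference, the palette can also be installed from the command line (assuming the default `~/.node-red` user directory; restart Node-RED afterwards so the node is picked up):

```shell
# Install the LZ4 palette into the Node-RED user directory
cd ~/.node-red
npm install @ecraneworldwide/node-red-contrib-lz4
```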

lz4.js

```js
// Drop-in LZ4 codec for kafkajs, backed by the `lz4` npm package
// (pulled in by the @ecraneworldwide/node-red-contrib-lz4 palette).
const lz4lib = require('lz4')

// `lz4` exposes synchronous encode/decode; kafkajs only awaits the
// returned value, so wrapping them in async functions is enough.
const docompress = lz4lib.encode
const dodecompress = lz4lib.decode

module.exports = {
  /**
   * @param {Encoder} encoder
   * @returns {Promise<Buffer>}
   */
  async compress(encoder) {
    return docompress(encoder.buffer)
  },

  /**
   * @param {Buffer} buffer
   * @returns {Promise<Buffer>}
   */
  async decompress(buffer) {
    return dodecompress(buffer)
  },
}
```

and...

index.js

```js
const { KafkaJSNotImplemented } = require('../../../errors')

// The codec type lives in the low 3 bits of the record batch attributes.
const COMPRESSION_CODEC_MASK = 0x07

const Types = {
  None: 0,
  GZIP: 1,
  Snappy: 2,
  LZ4: 3,
  ZSTD: 4,
  TEST: 5,
}

const Codecs = {
  [Types.GZIP]: () => require('./gzip'),
  [Types.Snappy]: () => {
    throw new KafkaJSNotImplemented('Snappy compression not implemented')
  },
  // Stock kafkajs throws here, which is what stalls the consumer on
  // LZ4-compressed batches:
  // [Types.LZ4]: () => {
  //   throw new KafkaJSNotImplemented('LZ4 compression not implemented')
  // },
  [Types.LZ4]: () => require('./lz4'),
  [Types.ZSTD]: () => {
    throw new KafkaJSNotImplemented('ZSTD compression not implemented')
  },
  [Types.TEST]: () => require('./lz4'),
}

const lookupCodec = type => (Codecs[type] ? Codecs[type]() : null)
const lookupCodecByAttributes = attributes => {
  const codec = Codecs[attributes & COMPRESSION_CODEC_MASK]
  return codec ? codec() : null
}

module.exports = {
  Types,
  Codecs,
  lookupCodec,
  lookupCodecByAttributes,
  COMPRESSION_CODEC_MASK,
}
```
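The `lookupCodecByAttributes` masking at the bottom is easy to sanity-check in isolation. A minimal sketch, reusing the mask and type values from the file above but with dummy string factories standing in for the real `require()` calls:

```javascript
// Low 3 bits of the record batch attributes byte select the codec.
const COMPRESSION_CODEC_MASK = 0x07

const Types = { None: 0, GZIP: 1, Snappy: 2, LZ4: 3, ZSTD: 4 }

// Dummy codec factories standing in for the real require() calls.
const Codecs = {
  [Types.GZIP]: () => 'gzip-codec',
  [Types.LZ4]: () => 'lz4-codec',
}

const lookupCodecByAttributes = attributes => {
  const codec = Codecs[attributes & COMPRESSION_CODEC_MASK]
  return codec ? codec() : null
}

// Attributes byte 0x0b = 0b00001011: the low bits 0b011 select LZ4,
// and the higher bits (e.g. the timestamp-type flag) are masked off.
console.log(lookupCodecByAttributes(0x0b)) // prints: lz4-codec
console.log(lookupCodecByAttributes(0x00)) // prints: null (uncompressed)
```

This is why fixing the `[Types.LZ4]` entry is enough: every LZ4-compressed batch funnels through this lookup before decoding.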