tulios / kafkajs

A modern Apache Kafka client for node.js
https://kafka.js.org
MIT License

Question: TypeError: request is not a function #779

Closed chualanagit closed 4 years ago

chualanagit commented 4 years ago

Describe the bug

ReferenceError: You are trying to `import` a file after the Jest environment has been torn down.

      at 5 (node_modules/kafkajs/src/protocol/requests/metadata/index.js:28:21)
      at Broker.metadata (node_modules/kafkajs/src/broker/index.js:190:7)
      at node_modules/kafkajs/src/cluster/brokerPool.js:127:38
      at retry (node_modules/kafkajs/src/retry/index.js:43:5)
      at node_modules/kafkajs/src/retry/index.js:61:5

and `eachMessage()` is not able to consume messages from the Kafka topic

To Reproduce

Consumer code:

    test('test consumer consumes correct data from producer', async () => {
        const restartOnFailure = jest.fn(async () => false)
        const kafka = new Kafka({
            clientId: 'example-consumer',
            logLevel: logLevel.ERROR,
            brokers: ['localhost:9092'],
        })
        const consumer = kafka.consumer({ 
            groupId: 'test-group',
            retry: {
                retries: 0,
                initialRetryTime: 1,
                restartOnFailure,
            },
        });
        const run = async () => {
            await consumer.connect()
            await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
            await consumer.run({
                eachMessage: async ({ topic, partition, message }) => {
                    const decodedKey = await registry.decode(message.key)
                    const decodedValue = await registry.decode(message.value)
                    console.log({ decodedKey, decodedValue })
                },
            })
        }
        await run().catch(console.error)
    });
    afterAll(async () => {
        await consumer.disconnect()
    });
  1. Run a producer that produces messages to a topic once and then disconnects
  2. Run a consumer that subscribes to that topic and logs each topic offset
  3. Error occurs and no message or topic are logged

Expected behavior

Being able to consume messages and verify correctness in an integration test

Observed behavior

Error described above

Environment:

Nevon commented 4 years ago

Could you please edit your issue to use the issue template? There is a lot of information missing that would be helpful to have:

**Describe the bug**
A clear and concise description of what the bug is.

**To Reproduce**
Please provide either a link to a:

1. [failing test in a KafkaJS fork](https://github.com/tulios/kafkajs/blob/2faef0719eeba50759eccc2b7d8800dbe63803f3/src/consumer/__tests__/consumeMessages.spec.js#L52-L97)
2. [repository with an example project reproducing the issue](https://github.com/tulios/kafkajs/blob/master/examples/consumer.js)

If none of the above are possible to provide, please write down the exact steps to reproduce the behavior:
1. Run a producer that continuously produces messages to a topic
2. Run a consumer that subscribes to that topic and logs each topic offset
3. After the consumer has consumed 100 messages, it...

**Expected behavior**
A clear and concise description of what you expected to happen.

**Observed behavior**
A clear and concise description of what *did* happen. Please include any relevant logs [with the log level set to debug](https://kafka.js.org/docs/configuration#logging).

**Environment:**
 - OS: [e.g. Mac OS 10.15.3]
 - KafkaJS version [e.g. 1.12.0]
 - Kafka version [e.g. 2.3.1]
 - NodeJS version [e.g. 10.13.0]

**Additional context**
Add any other context about the problem here.

ankon commented 4 years ago

As a data point: I'm seeing this error message with different stack traces when running tests locally:

 FAIL  src/consumer/__tests__/subscribe.spec.js
  ● Consumer › with regex › subscribes to all matching topics

    TypeError: request is not a function

      at 2 (src/protocol/requests/apiVersions/index.js:28:16)
      at Broker.apiVersions (src/broker/index.js:163:52)
      at Broker.connect (src/broker/index.js:109:36)
      at src/cluster/brokerPool.js:317:9
      at BrokerPool.findBroker (src/cluster/brokerPool.js:242:5)
      at Cluster.findBroker (src/cluster/index.js:234:14)
      at src/consumer/consumerGroup.js:453:24

Nevon commented 4 years ago

I have a suspicion that this has to do with the lazy loading of the protocol modules. We require only the needed modules for encoding/decoding requests and responses, like this for example:

https://github.com/tulios/kafkajs/blob/9ed0787bbe16df4402923f0ea898a2fca6200736/src/protocol/requests/listOffsets/index.js#L17-L21
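Roughly, the pattern looks like this (paraphrased from the metadata/index.js snippet quoted later in this thread, not the verbatim library source):

    const versions = {
      // each protocol version only requires its encoder/decoder when it is actually used
      5: ({ topics, allowAutoTopicCreation }) => {
        const request = require('./v5/request')
        const response = require('./v5/response')
        return { request: request({ topics, allowAutoTopicCreation }), response }
      },
    }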

I wonder if there is some weirdness there. What version(s) of Node are you encountering these issues in, and are you able to consistently reproduce it? I have never had it happen for me.

chualanagit commented 4 years ago

I'm using Node version v14.3.0 and npm 6.14.4, and I was able to consistently reproduce the issue. Sorry about the initial question lacking a detailed explanation; I will update it soon.

ankon commented 4 years ago

What version(s) of Node are you encountering these issues in, and are you able to consistently reproduce it? I have never had it happen for me.

I'm using NodeJS 12.18.0/npm 6.14.4.

I looked a bit further, and in my case these errors in the tests are always paired with earlier warnings like these:

ReferenceError: You are trying to `import` a file after the Jest environment has been torn down.

      27 |   5: ({ topics, allowAutoTopicCreation }) => {
      28 |     const request = require('./v5/request')
    > 29 |     const response = require('./v5/response')
         |                      ^
      30 |     return { request: request({ topics, allowAutoTopicCreation }), response }
      31 |   },
      32 | }

      at metadata (src/protocol/requests/metadata/index.js:29:22)
      at Broker.metadata (src/broker/index.js:190:7)
      at fn (src/cluster/brokerPool.js:127:38)
      at start (src/retry/index.js:43:5)
      at src/retry/index.js:61:5

(These usually, but not always, come from a retry, and not all of them lead to the "request is not a function" errors, so I hadn't linked them in my head.)

In these cases it definitely seems that lazy loading is the "problem": Jest recognizes that it is supposed to be done testing, and when the retry happens Jest rejects the import with the given message and returns an empty object.
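In other words, the failing call roughly reduces to this (illustrative only, not library code):

    // After teardown, Jest's module registry hands back an empty object
    // instead of the real './v5/request' module...
    const request = {}
    // ...so the very next call throws the reported error:
    request({ topics: [], allowAutoTopicCreation: true }) // TypeError: request is not a function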

TL;DR: While I'm getting similar errors in tests, these likely are caused by different things.

chualanagit commented 4 years ago

Yes, you are correct, I've been consistently getting the reference error as well.

ankon commented 4 years ago

Yes, you are correct, I've been consistently getting the reference error as well.

Are you in a unit test context?

chualanagit commented 4 years ago

I'm doing an integration test.

chualanagit commented 4 years ago

I've also been getting this: `Cannot log after tests are done. Did you forget to wait for something async in your test?`. I assume this is also something spit out by Jest.

Nevon commented 4 years ago

Aha, I think we have hit the nail on the head. I bet what is happening is that your consumer is still trying to do something while jest is performing some kind of teardown, possibly because of a retry or something of that nature. I'm not sure exactly why this would screw up the module cache, since I don't believe Jest should touch it in between runs, but it sounds like it's something related.

Regarding logging after tests are done, make sure that you are stopping and disconnecting your consumer (awaiting it to finish) in an afterEach or afterAll function, otherwise you will leave behind a running consumer.
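For example, a teardown along these lines (a sketch, assuming `consumer` is declared in the outer test scope rather than inside the test body):

    const { Kafka } = require('kafkajs')

    let consumer

    beforeAll(async () => {
      const kafka = new Kafka({ clientId: 'example-consumer', brokers: ['localhost:9092'] })
      consumer = kafka.consumer({ groupId: 'test-group' })
      await consumer.connect()
    })

    afterAll(async () => {
      // await the disconnect so Jest does not tear down while the consumer is still working
      await consumer.disconnect()
    })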

For integration tests, you could also try --runInBand and see if that helps.

ankon commented 4 years ago

Did you forget to wait for something async in your test?

Thinking aloud: indeed, this message is correct, and points exactly to the problem. Somehow your environment (sometimes? always?) leads to a situation where stuff gets left behind on a retry. Maybe a slow broker, maybe indeed some missing await.

One thing to try: what happens if you switch off retries in the KafkaJS configuration (needs current beta/master, see https://github.com/tulios/kafkajs/pull/643)?
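For example, something along these lines at the client level (a sketch; the exact behavior of `retries: 0` is what the linked PR is about):

    const kafka = new Kafka({
      clientId: 'example-consumer',
      brokers: ['localhost:9092'],
      retry: {
        retries: 0, // disable retries for the test run
      },
    })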

chualanagit commented 4 years ago

Hey @Nevon, @ankon and all, I've cleaned up this thread a bit and incorporated all the changes y'all suggested. The update is in the first post here. The only issues that currently persist are the ReferenceError and the fact that I can't successfully consume messages.

goriunov commented 4 years ago

@chualanagit can you change your run function to

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
  await new Promise((resolve) => {
    consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const decodedKey = await registry.decode(message.key)
        const decodedValue = await registry.decode(message.value)
        console.log({ decodedKey, decodedValue })
        resolve();
      },
    })
  })
}

I think your test ends too early, as `await consumer.run` will not wait for your messages. You may also want to extend the Jest timeout, as connecting to Kafka may take a bit of time.
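For example (30 seconds here is just an illustration):

    // globally, e.g. in a setup file or at the top of the test file
    jest.setTimeout(30000)

    // or per test, via the optional third argument
    test('test consumer consumes correct data from producer', async () => {
      // ...
    }, 30000)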

chualanagit commented 4 years ago

@chualanagit can you change your run function to

const run = async () => {
  await consumer.connect()
  await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
  await new Promise((resolve) => {
    consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const decodedKey = await registry.decode(message.key)
        const decodedValue = await registry.decode(message.value)
        console.log({ decodedKey, decodedValue })
        resolve();
      },
    })
  })
}

I think your test ends too early, as `await consumer.run` will not wait for your messages. You may also want to extend the Jest timeout, as connecting to Kafka may take a bit of time.

giving it a try now

chualanagit commented 4 years ago

@goriunov good call! The ReferenceError was resolved! However, the integration test still timed out after 3 minutes on this:

   Attempted to log "{"level":"ERROR","timestamp":"2020-06-24T22:21:37.857Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"localhost:9092","clientId":"example-consumer","error":"The coordinator is not aware of this member","correlationId":88,"size":10}".

      at BufferedConsole.error (node_modules/@jest/console/build/BufferedConsole.js:165:10)
      at node_modules/kafkajs/src/loggers/console.js:15:22
      at node_modules/kafkajs/src/loggers/index.js:16:3
      at Connection.logError (node_modules/kafkajs/src/network/connection.js:89:7)
      at Connection.send (node_modules/kafkajs/src/network/connection.js:329:14)
      at Broker.heartbeat (node_modules/kafkajs/src/broker/index.js:314:12)
      at ConsumerGroup.heartbeat (node_modules/kafkajs/src/consumer/consumerGroup.js:317:7)
      at Runner.fetch (node_modules/kafkajs/src/consumer/runner.js:388:5)
      at node_modules/kafkajs/src/consumer/runner.js:403:9

I suspect my problem is similar to #130; however, I'm setting my timeout to 200 seconds. That should be long enough for my consumer to consume the message and for the heartbeat to send?

Nevon commented 4 years ago

I'm not sure why your test is not exiting, but if your test looks like it does in the top post, it doesn't really seem complete, since there's nothing asserting that you have consumed a message, and nothing that would cause it to wait for one.

Here you can see how we do our integration test for consuming messages (ignore the metadata stuff. That's specific to what we want to test. Not relevant for you):

https://github.com/tulios/kafkajs/blob/cff2cfc8713ce23fd8a8174f24a2e9307c927eed/src/consumer/__tests__/consumeMessages.spec.js#L52-L97

As you can see, we:

  1. Start the consumer
  2. Wait for it to join the group
  3. We produce a bunch of messages
  4. We wait for the messages to have been consumed
  5. We do some assertions on the messages

One important note is that for each test run, we generate a new groupId and a new topic. This is important because otherwise you will have problems if you run several tests concurrently, or if you run them several times in quick succession, since on the second run your consumer will try to join an already existing group, and the previous consumer may not have been kicked out of the group yet, so it will take some time before the group is in sync. Better to just start from a clean slate on each run.
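Condensed into a sketch (topic/group names and the message count are illustrative, it assumes topic auto-creation is enabled on the broker, and the real spec uses internal test helpers instead of the inline promise):

    const crypto = require('crypto')
    const { Kafka, logLevel } = require('kafkajs')

    const kafka = new Kafka({ clientId: 'test-client', brokers: ['localhost:9092'], logLevel: logLevel.NOTHING })

    test('consumes what the producer sends', async () => {
      // 1. fresh topic and group per run, so stale group state cannot interfere
      const suffix = crypto.randomBytes(4).toString('hex')
      const topic = `test-topic-${suffix}`
      const producer = kafka.producer()
      const consumer = kafka.consumer({ groupId: `test-group-${suffix}` })

      await producer.connect()
      await consumer.connect()
      await consumer.subscribe({ topic, fromBeginning: true })

      // 2./4. collect messages and resolve once the expected number has arrived
      const received = []
      const consumed = new Promise(resolve => {
        consumer.run({
          eachMessage: async ({ message }) => {
            received.push(message.value.toString())
            if (received.length === 2) resolve()
          },
        })
      })

      // 3. produce a couple of messages
      await producer.send({ topic, messages: [{ value: 'hello' }, { value: 'world' }] })
      await consumed

      // 5. assert on what was consumed
      expect(received).toEqual(['hello', 'world'])

      await consumer.disconnect()
      await producer.disconnect()
    }, 60000)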

chualanagit commented 4 years ago

Would it be a problem if the consumer and producer are not created from the same Kafka instance? For example:

    // one Kafka instance for the consumer
    const consumerKafka = new Kafka({
        clientId: 'example-consumer',
        logLevel: logLevel.ERROR,
        brokers: ['localhost:9092'],
    })
    consumer = consumerKafka.consumer({
        groupId: groupId,
    });

    // a separate Kafka instance for the producer
    const producerKafka = new Kafka({
        clientId: 'example-producer',
        logLevel: logLevel.ERROR,
        brokers: ['localhost:9092'],
    })
    producer = producerKafka.producer();

Nevon commented 4 years ago

Nope. Consumers and producers are isolated from one another, regardless of whether they come from the same Kafka instance. The only thing that does is let them share some common configuration.

chualanagit commented 4 years ago

I'm not sure why your test is not exiting, but if your test looks like it does in the top post, it doesn't really seem complete, since there's nothing asserting that you have consumed a message, and nothing that would cause it to wait for one.

Here you can see how we do our integration test for consuming messages (ignore the metadata stuff. That's specific to what we want to test. Not relevant for you):

https://github.com/tulios/kafkajs/blob/cff2cfc8713ce23fd8a8174f24a2e9307c927eed/src/consumer/__tests__/consumeMessages.spec.js#L52-L97

As you can see, we:

  1. Start the consumer
  2. Wait for it to join the group
  3. We produce a bunch of messages
  4. We wait for the messages to have been consumed
  5. We do some assertions on the messages

One important note is that for each test run, we generate a new groupId and a new topic. This is important because otherwise you will have problems if you run several tests concurrently, or if you run them several times in quick succession, since on the second run your consumer will try to join an already existing group, and the previous consumer may not have been kicked out of the group yet, so it will take some time before the group is in sync. Better to just start from a clean slate on each run.

Why does the example you gave start the consumer before the producer sends, unlike the example given here?

Nevon commented 4 years ago

It doesn't really matter in this case, as we are subscribing with fromBeginning: true, which means that the consumer will start from the beginning of the topic if there is no previously committed offset to resume from, so regardless of whether you start the consumer before or after the producer has sent its messages, they will get consumed. It just reads more logically to me that we start the consumer, then produce some messages, and then wait for the messages to have been consumed, but it's just personal preference.

yunnysunny commented 1 year ago

I have a suspicion that this has to do with the lazy loading of the protocol modules. We require only the needed modules for encoding/decoding requests and responses, like this for example:

https://github.com/tulios/kafkajs/blob/9ed0787bbe16df4402923f0ea898a2fca6200736/src/protocol/requests/listOffsets/index.js#L17-L21

I wonder if there is some weirdness there. What version(s) of Node are you encountering these issues in, and are you able to consistently reproduce it? I have never had it happen for me.

As Nevon mentioned, kafkajs loads the protocol modules lazily. When your test finishes faster than the Kafka connection resolves, the protocol modules end up being loaded after Jest has torn down the environment. So if your test cases do not assert anything about Kafka producing or consuming data, you can manually mock the kafkajs functions in those tests. A demo is shown below: https://github.com/whyun-demo/jest-afterall/blob/174e6c59c5333a60988c5f37f4acf2a02300bbee/test/setup.ts#L1-L12 @devSajan
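For example, a rough sketch of such a manual mock (the mocked surface here is hypothetical; mock whatever your code under test actually calls):

    // e.g. in a Jest setup file: replace kafkajs with inert stubs so no real
    // connection (and no lazy protocol require) ever happens during these tests
    jest.mock('kafkajs', () => ({
      logLevel: { NOTHING: 0, ERROR: 1 },
      Kafka: jest.fn().mockImplementation(() => ({
        producer: () => ({
          connect: jest.fn().mockResolvedValue(undefined),
          send: jest.fn().mockResolvedValue(undefined),
          disconnect: jest.fn().mockResolvedValue(undefined),
        }),
        consumer: () => ({
          connect: jest.fn().mockResolvedValue(undefined),
          subscribe: jest.fn().mockResolvedValue(undefined),
          run: jest.fn().mockResolvedValue(undefined),
          disconnect: jest.fn().mockResolvedValue(undefined),
        }),
      })),
    }))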

devSajan commented 1 year ago

Getting the same error on "kafkajs": "^2.2.4". Not sure what the exact error is.

{"level":"ERROR","timestamp":"2023-08-21T14:55:28.029Z","logger":"kafkajs","message":"[Consumer] Crash: TypeError: request is not a function","groupId":"group-3","stack":"TypeError: request is not a function\n    at 3 (/app/node_modules/kafkajs/src/protocol/requests/syncGroup/index.js:30:16)\n    at Broker.syncGroup (/app/node_modules/kafkajs/src/broker/index.js:429:7)\n    at ConsumerGroup.[private:ConsumerGroup:sync] (/app/node_modules/kafkajs/src/consumer/consumerGroup.js:233:57)\n    at /app/node_modules/kafkajs/src/consumer/consumerGroup.js:336:33\n    at processTicksAndRejections (node:internal/process/task_queues:95:5)\n    at Runner.start (/app/node_modules/kafkajs/src/consumer/runner.js:84:7)\n    at start (/app/node_modules/kafkajs/src/consumer/index.js:243:7)\n    at Object.run (/app/node_modules/kafkajs/src/consumer/index.js:304:5)\n    at async Promise.all (index 0)"}