Closed heipacker closed 3 years ago
Hi @heipacker,
Could you share your package.json, or confirm that you've installed the Snappy compression codec as described in the KafkaJS documentation: https://kafka.js.org/docs/producing#a-name-compression-snappy-a-snappy
{ "name": "task", "version": "1.0.0", "dependencies": { "@polkadot/api": "^2.8.1", "async": "^2.1.4", "kafka-node": "^5.0.0", "log4js": "^6.3.0" } }
Hi @heipacker,
Looking at your package.json
it looks like you're missing the Snappy compression codec: https://kafka.js.org/docs/producing#a-name-compression-snappy-a-snappy
I am experiencing the same issue. I added kafkajs-snappy to package.json, but it still does not work.
I am connecting my app to a Kafka cluster hosted in Confluent Cloud. These are the dependencies I added:
```
"kafkajs": "^1.15.0",
"kafkajs-lz4": "^2.0.0-beta.0",
"kafkajs-snappy": "^1.1.0",
```
This is the stack trace I am getting
```{"level":"ERROR","timestamp":"2020-12-28T20:59:38.596Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNotImplemented: Snappy compression not implemented","groupId":"ccloud-dev","stack":"KafkaJSNotImplemented: Snappy compression not implemented\n
at 2 (.//api/node_modules/kafkajs/src/protocol/message/compression/index.js:16:11)\n
at lookupCodecByAttributes (.//api/node_modules/kafkajs/src/protocol/message/compression/index.js:29:18)\n
at module.exports (.//api/node_modules/kafkajs/src/protocol/recordBatch/v0/decoder.js:68:17)\n
at decodeMessages (.//api/node_modules/kafkajs/src/protocol/requests/fetch/v4/decodeMessages.js:27:35)\n
at decodePartition (.//api/node_modules/kafkajs/src/protocol/requests/fetch/v11/response.js:39:19)\n
at Decoder.readArrayAsync (.//api/node_modules/kafkajs/src/protocol/decoder.js:179:24)\n
at decodeResponse (.//api/node_modules/kafkajs/src/protocol/requests/fetch/v11/response.js:44:29)\n
at Decoder.readArrayAsync (.//api/node_modules/kafkajs/src/protocol/decoder.js:179:24)\n
at Object.decode (.//api/node_modules/kafkajs/src/protocol/requests/fetch/v11/response.js:52:35)\n
at Connection.send (.//api/node_modules/kafkajs/src/network/connection.js:328:45)\n
at processTicksAndRejections (node:internal/process/task_queues:93:5)\n
at async Broker.[private:Broker:sendRequest] (.//api/node_modules/kafkajs/src/broker/index.js:890:14)\n
at async Broker.fetch (.//api/node_modules/kafkajs/src/broker/index.js:332:12)\n
at async .//api/node_modules/kafkajs/src/consumer/consumerGroup.js:488:31"}
{"level":"INFO","timestamp":"2020-12-28T20:59:39.186Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"ccloud-dev"}
@hussein-joe did you manage to fix this?
I am also having the same issue. I am trying to consume from a Kafka topic. This is my package.json:
{
"name": "my-app",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"repository": {
"type": "git",
"url": ""
},
"author": "",
"license": "ISC",
"bugs": {
"url": ""
},
"homepage": "",
"dependencies": {
"got": "^11.8.1",
"kafkajs": "^1.15.0",
"kafkajs-snappy": "^1.1.0"
}
}
These are the console logs:
$ node index.js
starting kafka consumer
{"level":"INFO","timestamp":"2021-02-23T10:06:51.962Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"my-app"}
{"level":"INFO","timestamp":"2021-02-23T10:06:51.982Z","logger":"kafkajs","message":"[ConsumerGroup] Consumer has joined the group","groupId":"my-app","memberId":"my-app-4f171fac-3323-40d6-b150-b1cfc0c9370f","leaderId":"my-app-4f171fac-3323-40d6-b150-b1cfc0c9370f","isLeader":true,"memberAssignment":{"maxwell":[0]},"groupProtocol":"RoundRobinAssigner","duration":18}
{"level":"ERROR","timestamp":"2021-02-23T10:08:26.702Z","logger":"kafkajs","message":"[Connection] Response Fetch(key: 1, version: 11)","broker":"127.0.0.1:9092","clientId":"my-app","error":"Snappy compression not implemented","correlationId":43,"size":919}
{"level":"ERROR","timestamp":"2021-02-23T10:08:26.704Z","logger":"kafkajs","message":"[Consumer] Crash: KafkaJSNotImplemented: Snappy compression not implemented","groupId":"my-app","stack":"KafkaJSNotImplemented: Snappy compression not implemented\n at 2 (/var/www/my-app/node_modules/kafkajs/src/protocol/message/compression/index.js:16:11)\n at lookupCodecByAttributes (/var/www/my-app/node_modules/kafkajs/src/protocol/message/compression/index.js:29:18)\n at module.exports (/var/www/my-app/node_modules/kafkajs/src/protocol/recordBatch/v0/decoder.js:68:17)\n at decodeMessages (/var/www/my-app/node_modules/kafkajs/src/protocol/requests/fetch/v4/decodeMessages.js:27:35)\n at decodePartition (/var/www/my-app/node_modules/kafkajs/src/protocol/requests/fetch/v11/response.js:39:19)\n at Decoder.readArrayAsync (/var/www/my-app/node_modules/kafkajs/src/protocol/decoder.js:179:24)\n at decodeResponse (/var/www/my-app/node_modules/kafkajs/src/protocol/requests/fetch/v11/response.js:44:29)\n at Decoder.readArrayAsync (/var/www/my-app/node_modules/kafkajs/src/protocol/decoder.js:179:24)\n at Object.decode (/var/www/my-app/node_modules/kafkajs/src/protocol/requests/fetch/v11/response.js:52:35)\n at Connection.send (/var/www/my-app/node_modules/kafkajs/src/network/connection.js:328:45)"}
{"level":"INFO","timestamp":"2021-02-23T10:08:26.717Z","logger":"kafkajs","message":"[Consumer] Stopped","groupId":"my-app"}
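I understand now. Maxwell uses Snappy compression, so the consumer has to be able to decode Snappy-compressed messages. Adding kafkajs-snappy to package.json is not enough on its own; the codec also has to be registered with KafkaJS. This link explains how to configure it: https://github.com/tulios/kafkajs-snappy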
const { CompressionTypes, CompressionCodecs } = require('kafkajs')
const SnappyCodec = require('kafkajs-snappy')
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec
After adding this code, the error went away.
InstrumentationEvent { id: 2, type: 'consumer.crash', timestamp: 1607266189005, payload: { error: { KafkaJSNotImplemented: Snappy compression not implemented at 2 (/usr/local/polkadot_block_task/node_modules/kafkajs/src/protocol/message/compression/index.js:17:11) at lookupCodecByAttributes (/usr/local/polkadot_block_task/node_modules/kafkajs/src/protocol/message/compression/index.js:30:18) at module.exports (/usr/local/polkadot_block_task/node_modules/kafkajs/src/protocol/messageSet/decoder.js:22:21) at decodePartition (/usr/local/polkadot_block_task/node_modules/kafkajs/src/protocol/requests/fetch/v1/response.js:22:19) at Decoder.readArrayAsync (/usr/local/polkadot_block_task/node_modules/kafkajs/src/protocol/decoder.js:168:24) at decodeResponse (/usr/local/polkadot_block_task/node_modules/kafkajs/src/protocol/requests/fetch/v1/response.js:27:29) at Decoder.readArrayAsync (/usr/local/polkadot_block_task/node_modules/kafkajs/src/protocol/decoder.js:168:24) at Object.decode (/usr/local/polkadot_block_task/node_modules/kafkajs/src/protocol/requests/fetch/v1/response.js:33:35) at Connection.send (/usr/local/polkadot_block_task/node_modules/kafkajs/src/network/connection.js:310:45) at
at process._tickCallback (internal/process/next_tick.js:189:7)
name: 'KafkaJSNotImplemented',
retriable: false,
helpUrl: undefined },
groupId: 'polkadot_block' } }