Open Shamshiel opened 3 years ago
Hey @Shamshiel are you able to provide the config you used for this? Assuming each client had their own consumerId?
To reproduce this issue you can use this config in the current E2E tests in config.app.sync.ts
:
@Module({
imports: [
KafkaModule.register([
{
name: 'KAFKA_SERVICE',
options: {
client: {
clientId: 'test-e2e',
brokers: ['localhost:9092'],
retry: {
retries: 2,
initialRetryTime: 30,
},
},
consumer: {
groupId: 'test-e2e-consumer',
allowAutoTopicCreation: true,
},
deserializer: new KafkaAvroResponseDeserializer({
host: 'http://localhost:8081/'
}),
serializer: new KafkaAvroRequestSerializer({
config: {
host: 'http://localhost:8081/'
},
schemas: [
{
topic: TOPIC_NAME,
key: TOPIC_NAME,
value: TOPIC_NAME,
}
],
}),
}
},
{
name: 'KAFKA_SERVICE_2',
options: {
client: {
clientId: 'test-e2e-2',
brokers: ['localhost:9092'],
retry: {
retries: 2,
initialRetryTime: 30,
},
},
consumer: {
groupId: 'test-e2e-consumer-2',
allowAutoTopicCreation: true,
},
deserializer: new KafkaAvroResponseDeserializer({
host: 'http://localhost:8081/'
}),
serializer: new KafkaAvroRequestSerializer({
config: {
host: 'http://localhost:8081/'
},
schemas: [
{
topic: TOPIC_NAME,
key: TOPIC_NAME,
value: TOPIC_NAME,
}
],
}),
}
},
]),
TestConsumer
],
})
export default class AppModule {}
The result of the test will be the following:
AppModule Sync (e2e)
× We can SEND and ACCEPT AVRO messages (5134 ms)
● AppModule Sync (e2e) › We can SEND and ACCEPT AVRO messages
expect(received).toBe(expected) // Object.is equality
Expected: 2
Received: 4
I register two clients in the test and only the KAFKA_SERVICE
client is used but I still receive four messages instead of the two that I should get. If you debug the code you can see that the method onModuleInit()
in combination with the SUBSCRIBER_MAP
is why this happens. KAFKA_SERVICE_2
will be initalized on startup and iterate over the SUBSCRIBER_MAP
. The SUBSCRIBER_MAP
is global and was filled with one topic. This topic will be initalized for all clients in onModuleInit()
.
If I register two clients
KAFKA_CLIENT_1
andKAFKA_CLIENT_2
on the same broker I receive every message twice because the@SubscribeTo
will be registered to both clients regardless of what I configured.The problem is the static/global
SUBSCRIBER_MAP
because it is the same for everyKafkaService
. So if I subscribe to the topic hero.kill.dragon withKAFKA_CLIENT_1
, the current implementation will also subscribe the topic hero.kill.dragon toKAFKA_CLIENT_2
. This happens inonModuleInit()
in theKafkaService
.I think to solve this problem every
KafkaService
needs its ownSUBSCRIBER_MAP
andSUBSCRIBER_OBJECT_MAP
.I'm pretty sure this will also lead to weird behaviour with different brokers.