neighbourhoodie / adonis-kafka-v6

MIT License
10 stars 0 forks source link

Multiple consumers, producers & error handling #2

Closed janl closed 6 months ago

janl commented 7 months ago

This is a first draft at allowing to configure multiple producers and consumers per Kafka instance.

It does away with starting a default consumer and producer while making the getting started documentation minimally more complicated. This allows for a lot more control over how to configure (mainly) consumers. Details in Readme.

Also first stab at consumer error handling, currently only JSON.parse() errors are handled, but you can register for those. Still need to figure out what to do if none are registered. Throw then?

ayhamthemayhem commented 7 months ago

We reviewed and tested this on a demo project where we created two producers on different endpoints like this

router.get('/', async (ctx: HttpContext) => {
  const producer = kafka.createProducer()
  await producer.start()
  await producer.send('test-1', { message: 'Hello from the root url' })
  return {
    hello: 'world from the package',
  }
})

router.get('/prod2', async (ctx: HttpContext) => {
  const producer = kafka.createProducer()
  await producer.start()
  await producer.send('test-1', { message: 'Hello from the prod2 url' })
  return {
    hello: 'world from the package',
  }
})

Consumers:

import Kafka from '@neighbourhoodie/adonis-kafka/services/kafka'

const consumerGroup1 = Kafka.createConsumer({ groupId: 'group-1' })

consumerGroup1.on(
  'test-1',
  (data: any, commit: any) => {
    console.log('consumerGroup1', data)
    // commit(false) // For error transaction
    commit() // For successful transaction
  },
  { fromBeginning: true }
)

consumerGroup1.start()

const consumerGroup2 = Kafka.createConsumer({ groupId: 'group-2' })

consumerGroup2.on(
  'test-1',
  (data: any, commit: any) => {
    console.log('consumerGroup2', data)
    // commit(false) // For error transaction
    commit() // For successful transaction
  },
  { fromBeginning: true }
)

consumerGroup2.start()

we also fixed some default values for consumer.start() so no need to pass an empty object same is done for createProducer

janl commented 7 months ago

two producers on different endpoints like this

thanks for illustrating this, we might wanna make it so that we can up the producers in start/kafka.ts and reference them in the routes, in case we want the same producer used in more than one route. I’ll see about enabling this.

janl commented 7 months ago

the latest push implements multiple producers as well:

Kafka.createProducer('foo', {} /* ProducerConfig */).start()
Kafka.createProducer('bar', {} /* ProducerConfig */).start()

// in your route, you can now do:
// router.get('/test', ( { kafka }: HttpContext)) => {
//   await kafka.producer['foo'].send('msg', {yay: 1})
//   return 'sent'
// })