StartConsumers() from kafka/main.go calls panic(batchPipeline) when readAndCommitBatchPipelineResults() returns an error. But this will trigger panic and terminate the process if processMessagesIntoBatchPipeline() posts a new message to batchPipeline channel.
In general the logic of handling transient errors is broken. processMessagesIntoBatchPipeline() can only terminate by panic and that kills the process. If this is intended bbehavior, then the code to call StartConsumers() as a recovery should be removed.
StartConsumers() from kafka/main.go calls panic(batchPipeline) when readAndCommitBatchPipelineResults() returns an error. But this will trigger panic and terminate the process if processMessagesIntoBatchPipeline() posts a new message to batchPipeline channel.
In general the logic of handling transient errors is broken. processMessagesIntoBatchPipeline() can only terminate by panic and that kills the process. If this is intended bbehavior, then the code to call StartConsumers() as a recovery should be removed.