def _individual_message_commit(offline_ingestors, consumer, logger):
    logger.info("{} offline ingestors running".format(len(offline_ingestors)))
    for job_id, job_item in offline_ingestors.items():
        result = job_item["proc"].poll()
        if result is not None:
            logger.info("Offline ingestor for job id {} ended with result {}".format(job_id, result))
            if result == 0:
                logger.info("Executing commit for message with job id {}".format(job_id))
                consumer.commit(message=job_item["message"])
            logger.info("Removed ingestor for message with job id {} from queue".format(job_id))
Currently, this code runs only when the config.kafka.individual_message_commit option is active.
However, some of its logging lines need to run regardless of that option.
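One possible way to address this is to call the function unconditionally and pass the option in as a flag, so that only the commit itself is gated. The following is a minimal sketch, not the project's actual fix: the function name check_offline_ingestors and the individual_message_commit parameter are hypothetical, and removing finished jobs from the dictionary after the loop is an assumption based on the "Removed ingestor" log message.

```python
import logging


def check_offline_ingestors(offline_ingestors, consumer, logger, individual_message_commit):
    """Log ingestor status on every call; commit only when the flag is set.

    `individual_message_commit` is a hypothetical parameter standing in for
    the config.kafka.individual_message_commit option.
    """
    # Always logged, independent of the commit option.
    logger.info("{} offline ingestors running".format(len(offline_ingestors)))

    finished = []
    for job_id, job_item in offline_ingestors.items():
        result = job_item["proc"].poll()
        if result is None:
            continue  # process is still running

        # Always logged, independent of the commit option.
        logger.info("Offline ingestor for job id {} ended with result {}".format(job_id, result))

        # Only the commit is gated by the option.
        if individual_message_commit and result == 0:
            logger.info("Executing commit for message with job id {}".format(job_id))
            consumer.commit(message=job_item["message"])

        finished.append(job_id)

    # Assumption: finished jobs are dropped from the queue. This is done after
    # the loop because a dict must not change size while it is being iterated.
    for job_id in finished:
        offline_ingestors.pop(job_id)
        logger.info("Removed ingestor for message with job id {} from queue".format(job_id))

    return finished
```

With this shape, the caller would invoke the function on every pass of its polling loop and pass the option value as the last argument, rather than wrapping the whole call in a check of that option.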