Hello! I am processing messages in django celery and kombu from kafka with confluentkafka connection. The thing is message is getting processed exactly twice. At first it is processed as expected and get acknowledged. Then i can see yet anoter try to process the same message but it got skipped due to check. Here is my code
def process_message_callback(body, message):
if message.acknowledged:
print(f"Message has {body} already been acknowledged, the state is {message._state} skipping...")
return
try:
# Parse the decoded JSON data
print(f"Received message: {body}")
print(f"message is: {message}")
except (json.JSONDecodeError, binascii.Error) as e:
print(f"Error decoding or processing message: {e}")
finally: # Acknowledge only on successful processing
message.ack()
@shared_task
def process_message():
print("Processing message")
with Connection(settings.CELERY_BROKER_URL) as conn:
topic=settings.KAFKA_TOPICS['incoming_reporter']
exchange = Exchange(topic, type='direct')
queue = Queue(topic, exchange, routing_key=topic)
queue.maybe_bind(conn)
# queue.declare() # Ensure the queue is declared
# The consumer must be declared inside of with statement.
with conn.Consumer(queue, callbacks=[process_message_callback]) as consumer:
consumer.register_callback(process_message_callback)
while True:
try:
conn.drain_events(timeout=1)
except socket.timeout:
print("No new messages within timeout. Continuing...")
except Exception as e: # Catch all exceptions
print(f"An error occurred: {e}")
finally:
conn.close() #close the connection to release resources`
And what i can see from the logs
PROCESSING THE MESSAGE >>>>
celery-worker-1 | [2024-06-20 15:31:56,733: WARNING/ForkPoolWorker-3] Received message: text is random agitated-alligator
celery-worker-1 | [2024-06-20 15:31:56,733: WARNING/ForkPoolWorker-3] message is: <Message object at 0xffff8705e330 with details {'state': 'RECEIVED', 'content_type': 'text/plain', 'delivery_tag': '09a10795-eaf0-406e-883a-d3f3620141a4', 'body_length': 33, 'properties': {}, 'delivery_info': {'exchange': 'incoming_reporter', 'routing_key': 'incoming_reporter'}}>
<<<< PROCESSING THE MESSAGE
------ RECEIVED DUPLICATE AND SKIPED -------- >>>>
celery-worker-1 | [2024-06-20 15:31:56,733: WARNING/ForkPoolWorker-3] Message has text is random agitated-alligator already been acknowledged, the state is ACK skipping...```
Hello! I am processing messages in django celery and kombu from kafka with confluentkafka connection. The thing is message is getting processed exactly twice. At first it is processed as expected and get acknowledged. Then i can see yet anoter try to process the same message but it got skipped due to check. Here is my code
`from celery import shared_task from django.conf import settings from kombu import Connection, Exchange, Queue import json import base64 import binascii import socket
def process_message_callback(body, message): if message.acknowledged: print(f"Message has {body} already been acknowledged, the state is {message._state} skipping...")
@shared_task def process_message(): print("Processing message") with Connection(settings.CELERY_BROKER_URL) as conn: topic=settings.KAFKA_TOPICS['incoming_reporter'] exchange = Exchange(topic, type='direct') queue = Queue(topic, exchange, routing_key=topic)
queue.maybe_bind(conn)
And what i can see from the logs