# Presumably the body of the `run()` coroutine (it uses `await`): connects to
# NATS, makes sure the stream exists, subscribes as a queue-group worker and
# installs shutdown signal handlers.
logging.info("Started Document processing Consumer ...")
# Registered below as the client's error callback: exits via sys.exit().
# NOTE(review): the error `e` is discarded without being logged.
async def error_cb(e):
sys.exit()
# Registered below as the disconnect callback; only logs a message.
async def disconnected_cb():
logging.info(f"Got disconnected from NATS server .. Retrying .")
# Registered below as the reconnect callback; only logs a message.
async def reconnected_cb():
logging.info("Got reconnected...")
# Connect using the module-level NATS_HOST / NATS_PORT and reconnect limit.
nc = await nats.connect(
servers=f"nats://{NATS_HOST}:{NATS_PORT}",
error_cb=error_cb,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=MAX_RECONNECT_ATTEMPTS,
)
# Create JetStream context.
js = nc.jetstream()
# Persist messages published on this subject in the "sample-stream" stream.
await js.add_stream(
subjects=[DOCUMENT_EXT_SUBJECT],
name="sample-stream",
# Work-queue retention policy for the stream.
retention=RetentionPolicy.WORK_QUEUE,
)
# Subscribe as a member of the "worker_queue" group; task_cb handles each
# message and, because manual_ack=True, is expected to ack it itself.
# NOTE(review): consumer_config sets max_ack_pending=1; per the NATS consumer
# docs that limit appears to be shared by every subscriber bound to the same
# consumer, which would serialize delivery across workers -- confirm.
await js.subscribe(
DOCUMENT_EXT_SUBJECT,
stream="sample-stream",
queue = "worker_queue", # also known as "deliver group". In core NATS, it's called "queue group".
cb=task_cb,
manual_ack=True,
config=consumer_config,
)
# Exit the process when SIGINT or SIGTERM is received.
def signal_handler():
sys.exit()
for sig in ('SIGINT', 'SIGTERM'):
asyncio.get_running_loop().add_signal_handler(getattr(signal, sig), signal_handler)
# Flush the connection, then log that setup has finished.
await nc.flush()
logging.info("Done ... ?")
# Script entry point. The guard compares the ``__name__`` dunder with
# "__main__"; the previous ``name == 'main'`` referenced an undefined name.
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        # Set up the connection and subscription, then keep the loop running
        # so the subscription callback can keep receiving messages.
        loop.run_until_complete(run())
        loop.run_forever()
    except Exception as e:
        print("Got error : ", e)
    finally:
        loop.close()
### MY PROBLEM :
I populate the queue using `producer.py`.
Then I run the `consumer.py` script in multiple terminals as separate processes.
What happens is that the tasks are not processed in parallel, the way they were with Celery.
Although there are 3 processes to consume messages, while a task is being processed inside one of the consumer processes, the NATS server doesn't push any task to the other 2 consumers. Everything happens one after another.
Am I doing something wrong? Could it be because my use case involves CPU-bound tasks?
I am trying to use NATS for job queuing. I basically want to use it as an alternative to Celery + RabbitMQ for my use case. I have two scripts.
producer.py
consumer.py
import nats from nats.js.api import RetentionPolicy, ConsumerConfig, AckPolicy
# Consumer settings handed to js.subscribe() through its ``config=`` argument.
# NOTE(review): per the NATS consumer docs, max_ack_pending appears to cap the
# unacknowledged messages across *all* subscribers bound to the consumer, so a
# value of 1 would let only one queue-group worker hold a message at a time --
# confirm, and raise it if workers are meant to run in parallel.
consumer_config = ConsumerConfig(
    ack_policy=AckPolicy.EXPLICIT,
    ack_wait=900,  # presumably seconds -- TODO confirm
    max_deliver=1,
    max_ack_pending=1,
)
# Nats
NATS_HOST = '0.0.0.0'
NATS_PORT = '4222'
# Subject the stream persists and the workers subscribe to.
DOCUMENT_EXT_SUBJECT = "file_process"
# Passed to nats.connect() as max_reconnect_attempts.
MAX_RECONNECT_ATTEMPTS = 10
# A cpu bound task
def time_consuming_task():
    """Stand in for a long-running job by blocking the calling thread for 50 seconds.

    Returns:
        None.
    """
    import time

    time.sleep(50)
async def task_cb(msg):
    """Handle one delivered message: run the job, then acknowledge it.

    Args:
        msg: The delivered message; only its awaitable ``ack()`` is used.

    The blocking job runs in the loop's default executor instead of directly
    inside the coroutine, so the event loop stays free to run its other
    tasks while the job is in progress. Any exception raised by the job
    propagates before the message is acknowledged.
    """
    import asyncio

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, time_consuming_task)
    await msg.ack()
    print("acknowledged document-extraction !")
async def run():
# Script entry point. The guard compares the ``__name__`` dunder with
# "__main__"; the previous ``name == 'main'`` referenced an undefined name.
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        # Set up the connection and subscription, then keep the loop running
        # so the subscription callback can keep receiving messages.
        loop.run_until_complete(run())
        loop.run_forever()
    except Exception as e:
        print("Got error : ", e)
    finally:
        loop.close()