zeromq / pyzmq

PyZMQ: Python bindings for zeromq
http://zguide.zeromq.org/py:all
BSD 3-Clause "New" or "Revised" License

Subscriber receive hangs infinitely #1875

Closed by alex-sherwood 1 year ago

alex-sherwood commented 1 year ago

What pyzmq version?

infinitely

What libzmq version?

no

Python version (and how it was installed)

3.10.5

OS

Windows 11

What happened?

A bug happened!

Code to reproduce bug

That's the server code:

# -*- coding: utf-8 -*-

#from modules.telegram_client import telegram_client
#from settings import Settings

# tg_api_cred = Settings.get_telegram_api_credentials()
# tg_client = telegram_client.TelegramClientCustom('session_name', tg_api_cred['api_id'], tg_api_cred['api_hash'],
#                                                  raise_last_call_error=True)
#
# tg_client.start(Settings.get_telegram_phone_number())

import zmq
import threading
import json
import os
import time

# Bind the publisher socket
context = zmq.Context()
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://127.0.0.1:5556")

# Bind the router socket for receiving acknowledgements
router = context.socket(zmq.ROUTER)
router.bind("tcp://127.0.0.1:5557")

# Save message info to a dictionary (a stand-in for your database)
messages = {}

# Send messages
consumers = ['consumer1',  ]

for i in range(10):
    message_id = str(i)
    file_path = f'./content/{i}'

    # Save message info to the database
    messages[message_id] = {
        'file_path': file_path,
        'consumers': consumers.copy(),  # copy the list because we're going to modify it
        'processed_by': [],
    }

    # Send the message to all consumers
    publisher.send_json({
        'message_id': message_id,
        'text': f'This is message {i}',
        'media_path': file_path,
    })

# Cleanup process
def cleanup():
    while True:
        # Iterate over a snapshot: deleting from a dict while iterating it raises RuntimeError
        for message_id, message in list(messages.items()):
            print(message.items())
            print(message['processed_by'])
            if set(message['consumers']) == set(message['processed_by']):
                print(f"Deleting file {message['file_path']}")
                # os.remove(message['file_path'])  # uncomment this to actually delete the file
                del messages[message_id]

        time.sleep(5)  # pause between cleanup runs

cleanup_thread = threading.Thread(target=cleanup, daemon=True)
cleanup_thread.start()

# Receive acknowledgements
while True:
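    # Wait for the next acknowledgement. A ROUTER socket prepends the sender's
    # identity frame, so read both frames and decode only the JSON payload.
    identity, payload = router.recv_multipart()
    message = json.loads(payload)
    print(f"Received request: {message}")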

    # Process the acknowledgement
    if message['message_id'] in messages:
        messages[message['message_id']]['processed_by'].append(message['consumer'])

    time.sleep(5)

That's the client code:

import zmq
import time

# Prepare context and sockets
context = zmq.Context()
consumer_id = 'consumer1'  # change this for each consumer

# Connect the subscriber socket
subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://127.0.0.1:5556")
subscriber.setsockopt_string(zmq.SUBSCRIBE, '')

# Connect the dealer socket for sending acknowledgements
dealer = context.socket(zmq.DEALER)
dealer.identity = consumer_id.encode()
dealer.connect("tcp://127.0.0.1:5557")

# Process messages
while True:  
    message = subscriber.recv_json()
    print(f"Received message: {message}")
    # Send acknowledgement
    dealer.send_json({
        'message_id': message['message_id'],
        'consumer': consumer_id,
    })

    time.sleep(2)  # pause between processing messages

Traceback, if applicable

The consumer just hangs and does nothing; the reason is unknown. This example was generated by ChatGPT (GPT-4) and looks fine, but for some reason it doesn't work.

More info

No response

alex-sherwood commented 1 year ago

Here's the explanation: https://stackoverflow.com/questions/76395082/pyzmq-client-hangs-forever-on-receiving-data/
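
For readers who don't follow the link: a likely cause, consistent with ZeroMQ's documented "slow joiner" behavior, is that the server publishes all ten messages immediately after `bind`, before any subscriber has connected. A PUB socket silently drops messages that have no matching subscriber, so a client started afterwards waits forever in `recv_json()`. The following is a minimal sketch of one workaround, not the code from the linked answer. It assumes a single process with an `inproc://` endpoint so it runs on its own; the function name `publish_after_subscribe` and the endpoint name are hypothetical. It uses an XPUB socket so the publisher can wait for the subscription to arrive before sending, and sets `RCVTIMEO` on the subscriber so a missing message raises `zmq.Again` instead of hanging.

```python
import zmq


def publish_after_subscribe(n_messages=3, endpoint="inproc://slow-joiner-demo"):
    """Publish only after the subscription is known to have arrived."""
    context = zmq.Context()

    # XPUB behaves like PUB but also lets us read incoming subscription requests.
    publisher = context.socket(zmq.XPUB)
    publisher.bind(endpoint)

    subscriber = context.socket(zmq.SUB)
    subscriber.setsockopt(zmq.RCVTIMEO, 2000)  # fail after 2 s instead of hanging forever
    subscriber.connect(endpoint)
    subscriber.setsockopt_string(zmq.SUBSCRIBE, "")

    try:
        # Wait (up to 2 s) until the subscription message reaches the publisher.
        if not publisher.poll(timeout=2000):
            raise RuntimeError("no subscriber showed up")
        publisher.recv()  # consume the subscription notification

        for i in range(n_messages):
            publisher.send_json({"message_id": str(i)})

        return [subscriber.recv_json() for _ in range(n_messages)]
    finally:
        subscriber.close(linger=0)
        publisher.close(linger=0)
        context.term()


if __name__ == "__main__":
    for msg in publish_after_subscribe():
        print(msg)
```

In the two-process layout from the report, the same idea would mean starting the client first and having the server wait for a subscription (or, more crudely, sleep briefly after `bind`) before the first `send_json`. Messages published before the subscriber joins are not queued for it, so they cannot be recovered afterwards.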