eclipse / paho.mqtt.python

Unable to make 340 connections #238

Open liquidharmonic opened 6 years ago

liquidharmonic commented 6 years ago

Hi Everyone,

I've been hitting my head against the wall because I can't make more than 340 connections to a local VerneMQ server, and I have no idea why. I created a simple script that tries to make 350 connections, but CONNACKs are not received after the first 340 connections are made. I have not reached the connection limit on my local machine, because if I run two instances, a total of 680 (340 x 2) connections are established.

Is anyone aware of this bizarre limit?

PS. My goal is to use this client with locustio for load testing.

My setup

_testpaho.py

import threading

import paho.mqtt.client as mqtt  # paho-mqtt 1.x API, as used throughout this thread

connect_count = 0
lock = threading.Lock()

def locust_on_connect(client, userdata, flags, rc):
    # paho-mqtt 1.x on_connect signature is (client, userdata, flags, rc)
    global connect_count
    print(f"client: {client} connected, rc: {rc}")
    with lock:
        connect_count += 1
        print(f"connect_count: {connect_count}")
    if rc == 0:
        # subscribe once CONNACK has arrived; subscribing before the
        # connection is established is not guaranteed to take effect
        client.subscribe("/topic", 1)

def locust_on_subscribe(client, userdata, mid, granted_qos):
    print(f"client: {client} subscribed, mid: {mid}")

def log(client, userdata, level, buf):
    print(f"[paho-log][client: {client}] {buf}")

host = "localhost"  # <local server>
port = 8888  # <websocket port>

for x in range(350):
    client = mqtt.Client(transport="websockets")
    client.on_connect = locust_on_connect
    client.on_subscribe = locust_on_subscribe
    client.connect_async(host, port)
    client.loop_start()  # starts a network thread for this client

input("type enter to end")

last few lines of output

connect_count: 339
connect_count: 340

Any help is greatly appreciated.

joernheissler commented 6 years ago

I think duplicate of #183.

Each mqtt client means 3 open file descriptors: one to the mqtt server, and a pair of sockets connected to each other. 3 * 340 = 1020. And there you hit the 1024 limit of select.
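
The arithmetic above can be written out as a small sketch. It assumes fd numbers are handed out sequentially right after stdin/stdout/stderr and that nothing else in the process holds fds; the constant and function names are illustrative, not taken from paho-mqtt.

```python
FD_SETSIZE = 1024     # size of select()'s fd_set in a typical CPython build
RESERVED_FDS = 3      # stdin, stdout, stderr
FDS_PER_CLIENT = 3    # MQTT socket plus the client's internal socket pair

def max_select_clients(fd_setsize=FD_SETSIZE, reserved=RESERVED_FDS,
                       per_client=FDS_PER_CLIENT):
    """Clients whose fds all stay below fd_setsize, assuming sequential allocation."""
    return (fd_setsize - reserved) // per_client

print(max_select_clients())      # 340
print(max_select_clients(2048))  # 681
```

With 340 clients the highest fd is 1022; the 341st client would get fds 1023 to 1025, and 1024 is already out of range for select().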

liquidharmonic commented 6 years ago

Thanks @joernheissler. You are right. It is due to python being compiled with FD_SETSIZE=1024. I've tried recompiling python 3.6.3 with /usr/include/sys/_types/_fd_setsize.h setting FD_SETSIZE=2048 with no luck. Was anyone successful in increasing the FD_SETSIZE when building their own python? Or has anyone forked a copy of paho-mqtt with poll() instead of select()?

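I've been using the following script to test select():

import socket
from select import select

# Needs `ulimit -n` above ~2048, otherwise socket() fails first with "Too many open files".
s = [socket.socket(socket.AF_INET, socket.SOCK_DGRAM) for i in range(2048)]
# With FD_SETSIZE=1024 this raises ValueError: filedescriptor out of range in select()
select(s, [], [], 1)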

My setup:

joernheissler commented 6 years ago

I'm using asyncio. I didn't try with more than 1 connection, but I don't see why it shouldn't work.

Cameron-Rowshanbin-ConnectedLab commented 6 years ago

I've been looking at this issue as well, and it seems like if it is FD_SETSIZE that's limiting us, it's not obvious how. Using lsof -a -p <pid> while the script is running shows that we're able to open more than 1024 file descriptors. The issue is that after 340 connections, the file descriptors to the mqtt server become closed. This is both with the default paho client as well as the one off your branch using asyncio. Definitely strange; not sure where to look next.
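
A process can inspect its own fd table much like lsof does. The sketch below assumes Linux (/proc/self/fd) or macOS (/dev/fd), and the helper name is hypothetical. It separates the two numbers that matter here: how many fds are open (bounded by ulimit -n) and how high the fd numbers go (select() only accepts numbers below FD_SETSIZE).

```python
import os

def open_fd_numbers():
    """Sorted fd numbers currently open in this process (Linux /proc or macOS /dev/fd)."""
    fd_dir = "/proc/self/fd" if os.path.isdir("/proc/self/fd") else "/dev/fd"
    fds = []
    for name in os.listdir(fd_dir):
        fd = int(name)
        try:
            os.fstat(fd)  # skips the fd listdir() itself used, which is closed by now
        except OSError:
            continue
        fds.append(fd)
    return sorted(fds)

fds = open_fd_numbers()
print("open:", len(fds), "highest:", max(fds),
      "at/above 1024:", sum(fd >= 1024 for fd in fds))
```

Being able to open more than 1024 fds only shows the rlimit is high enough; it says nothing about whether select() can handle the higher fd numbers.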

joernheissler commented 6 years ago

it seems like if it is FD_SETSIZE that's limiting us, it's not obvious how.

It's in the select manpage. select doesn't like larger FDs. I looked at the kernel code too, but there it wasn't obvious to me if it's purely a userspace limitation or kernel too. "select() can monitor only file descriptors numbers that are less than FD_SETSIZE"
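
The manpage limit can be observed directly from Python. A minimal sketch, assuming Linux or macOS: it duplicates a socket onto fd number 1500 with os.dup2 (raising the soft RLIMIT_NOFILE first if needed, which assumes the hard limit allows it) and hands that fd to both select.select() and select.poll(). In a build with FD_SETSIZE = 1024, select() rejects it with ValueError; poll() takes an array of fds and has no such cap, which is why a poll()- or epoll-based loop sidesteps the 340-client ceiling.

```python
import os
import resource
import select
import socket

def select_vs_poll(high_fd=1500):
    """Put a socket on fd number `high_fd`; return (select_ok, poll_ok)."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft != resource.RLIM_INFINITY and soft <= high_fd:
        # assumption: the hard limit allows this; setrlimit raises ValueError otherwise
        resource.setrlimit(resource.RLIMIT_NOFILE, (high_fd + 1, hard))
    a, b = socket.socketpair()
    os.dup2(a.fileno(), high_fd)  # same socket, now also reachable as fd `high_fd`
    try:
        try:
            select.select([high_fd], [], [], 0)
            select_ok = True
        except ValueError:  # "filedescriptor out of range in select()"
            select_ok = False
        poller = select.poll()  # not available on Windows
        poller.register(high_fd, select.POLLIN)
        poller.poll(0)
        poll_ok = True
    finally:
        os.close(high_fd)
        a.close()
        b.close()
    return select_ok, poll_ok
```

On a typical Linux CPython, select_vs_poll() should report that select() failed and poll() worked.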

after 340 connections, the file descriptors to the mqtt server become closed. This is both with the default paho client as well as the one off your branch using asyncio.

You're saying that with asyncio this still happens? I haven't tried yet, but I really doubt it. I would be really surprised if it were happening.

not sure where to look next

strace should prove really helpful here.

joernheissler commented 6 years ago

So I wrote a program which opens 1500 useless file descriptors and then 1 mqtt connection, with asyncio. Works as expected.
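
A stdlib-only sketch of the same experiment: occupy about 1500 fds, then wait for data on a socket through the event loop's add_reader(). Registering sockets with add_reader() is presumably also how an asyncio helper drives a paho client, but that is an assumption here, not a copy of it. The default loop uses epoll on Linux (kqueue on macOS), so fd numbers above FD_SETSIZE are fine. POSIX only; it raises the soft RLIMIT_NOFILE and assumes the hard limit allows it.

```python
import asyncio
import os
import resource
import socket

async def read_on_high_fd(padding=1500):
    """Occupy `padding` fds, then read from a socket via loop.add_reader().

    Returns (fd_number, data). setrlimit raises ValueError if the hard limit is too low.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    want = padding + 64  # headroom for stdio, the event loop and the socket pair
    if soft != resource.RLIM_INFINITY and soft < want:
        resource.setrlimit(resource.RLIMIT_NOFILE, (want, hard))
    useless = [os.open(os.devnull, os.O_RDONLY) for _ in range(padding)]
    r, w = socket.socketpair()
    fd = r.fileno()  # above `padding`, because the lower numbers are taken
    loop = asyncio.get_running_loop()
    got = loop.create_future()

    def on_readable():
        loop.remove_reader(r)
        got.set_result(r.recv(16))

    try:
        r.setblocking(False)
        loop.add_reader(r, on_readable)
        w.send(b"ping")
        return fd, await asyncio.wait_for(got, timeout=5)
    finally:
        loop.remove_reader(r)  # no-op if on_readable already removed it
        r.close()
        w.close()
        for extra in useless:
            os.close(extra)
```

Calling asyncio.run(read_on_high_fd()) should return an fd number above 1500 together with b"ping".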

Got code which won't work for you?

Cameron-Rowshanbin-ConnectedLab commented 6 years ago

Hi! Just got it working on our end. I think I had basically made a silly mistake and kept calling loop_start() on the client, which ended up using select anyway. We're able to open over 340 connections (it seems to hang at around 1000 connections; looking into why), and we're now familiarizing ourselves with asyncio enough to fine-tune how we spin up our workers.

Our code now basically looks like your example here, except we've put 'main' in a loop to spin up hundreds of workers. We're even directly using your AsyncioHelper class.

Thanks so much for your help!

kellycampbell commented 6 years ago

We ran into this while doing some performance testing too. I have a patch that substitutes eventfd for select. I haven't tested it on anything except Linux, though.

https://github.com/kellycampbell/paho.mqtt.python/commit/f23831ee365278052a3a4bd07a6851207f25a2e3
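
The patch itself isn't reproduced in this thread, so this is not a description of what it changes. As a hypothetical illustration of the primitive it names: an eventfd is a single fd that select() can wait on and another thread can signal, whereas a socket pair used as a wake-up channel costs two fds. By the earlier arithmetic, two fds per client instead of three would move the select() ceiling to roughly (1024 - 3) // 2 = 510 clients. Linux only, Python 3.10+ (os.eventfd).

```python
import os
import select

def eventfd_wakeup_demo():
    """Wake a select() call with one eventfd instead of a socket pair."""
    if not hasattr(os, "eventfd"):
        raise OSError("os.eventfd needs Linux and Python 3.10+")
    efd = os.eventfd(0, os.EFD_NONBLOCK)  # one fd; its counter starts at 0
    try:
        os.eventfd_write(efd, 1)  # "wake up": add 1 to the counter
        readable, _, _ = select.select([efd], [], [], 0)
        value = os.eventfd_read(efd)  # read the counter and reset it to 0
        return efd in readable, value
    finally:
        os.close(efd)
```

The eventfd becomes readable as soon as its counter is non-zero, which is the same signal the second socket of a pair would provide.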

joernheissler commented 6 years ago

I think eventfd is a linux-only feature.

susfly commented 5 years ago

hitting the same problem... any news?

Scott-Wallace-ConnectedLab commented 5 years ago

What worked for us was 2 things.

  1. Changing file descriptors for the system, see: https://www.cyberciti.biz/faq/linux-increase-the-maximum-number-of-open-files/
  2. Increasing the FD_SETSIZE in /usr/include/sys/select.h, /usr/include/bits/typesizes.h, /usr/include/linux/posix_types.h and then recompiling python (this is because the select() call that python uses while setting up the connections uses this hardcoded value).
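
Item 1 can also be done from inside the Python process with the standard resource module, as long as the new soft limit stays within the hard limit. A minimal sketch (POSIX only; the function name and the 4096 default are illustrative). On its own this only lets the process open more fds; select() still rejects fd numbers at or above FD_SETSIZE, which is why item 2 was also needed.

```python
import resource

def raise_nofile_limit(target=4096):
    """Raise the soft RLIMIT_NOFILE toward `target`; return the resulting (soft, hard)."""
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if hard != resource.RLIM_INFINITY:
        target = min(target, hard)  # unprivileged processes cannot exceed the hard limit
    if soft == resource.RLIM_INFINITY or soft >= target:
        return soft, hard  # already high enough; never lower it
    resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
    return resource.getrlimit(resource.RLIMIT_NOFILE)
```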
kellycampbell commented 5 years ago

@susfly the solution I posted in the comment from Nov 28, 2017 is what worked for us.

susfly commented 5 years ago

We can use multiprocessing instead, with each process making up to 340 connections. It works well.
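
This approach boils down to partitioning the client list so that no single process exceeds the select() ceiling. A minimal sketch of the partitioning step, assuming a cap of 300 clients per process (an arbitrary margin below 340; the function names are hypothetical). Each chunk would then be passed to its own multiprocessing.Process as an argument rather than read from a global.

```python
import math

def chunk_clients(client_configs, max_per_process=300):
    """Consecutive chunks of at most max_per_process entries, one chunk per process."""
    return [client_configs[i:i + max_per_process]
            for i in range(0, len(client_configs), max_per_process)]

def processes_needed(n_clients, max_per_process=300):
    return math.ceil(n_clients / max_per_process)

print([len(c) for c in chunk_clients(list(range(1000)))])  # [300, 300, 300, 100]
print(processes_needed(10_000))                            # 34
```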

fjpa121197 commented 4 years ago

Hi, can you show me an example of using multiprocessing and threading? I'm trying to simulate a large number of devices sending data, using paho-mqtt. However, I can't manage to connect 1,000 devices (my goal is to go up to 10K devices).

import paho.mqtt.client as mqtt
import time
import threading
import logging
import thingsboard_objects as Things
import random
import datetime
logging.basicConfig(level=logging.INFO)

init_time = time.time()

def Connect(client, broker, port, token, keepalive, run_forever=False):
    connflag = False
    delay = 5
    print("connecting ",client)
    badcount = 0  # counter for bad connection attempts
    while not connflag:
        logging.info("connecting to broker " + str(broker))
        # print("connecting to broker "+str(broker)+":"+str(port))
        print("Attempts ", str(badcount))
        time.sleep(2)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
            connflag = True

        except:
            client.badconnection_flag = True
            logging.info("connection failed " + str(badcount))
            badcount += 1
            if badcount >= 3 and not run_forever:
                return -1
                raise SystemExit  # give up

    return 0

def wait_for(client, msgType, period=1, wait_time=20, running_loop=False):
    """Wait for a particular event; give up after about period*wait_time
    seconds (20 by default). Return True if successful, False otherwise."""
    # running loop is true when using loop_start or loop_forever
    client.running_loop = running_loop  #
    wcount = 0
    while True:
        logging.info("waiting" + msgType)
        if msgType == "CONNACK":
            if client.on_connect:
                if client.connected_flag:
                    return True
                if client.bad_connection_flag:  #
                    return False

        if msgType == "SUBACK":
            if client.on_subscribe:
                if client.suback_flag:
                    return True
        if msgType == "MESSAGE":
            if client.on_message:
                if client.message_received_flag:
                    return True
        if msgType == "PUBACK":
            if client.on_publish:
                if client.puback_flag:
                    return True

        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False
    return True

def client_loop(client, broker, port, token, keepalive=300, loop_function=None,
                loop_delay=10, run_forever=False):
    """runs a loop that will auto reconnect and subscribe to topics
    pass topics as a list of tuples. You can pass a function to be
    called at set intervals determined by the loop_delay
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)

    while client.run_flag:  # loop forever

        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) != -1:
                if not wait_for(client, "CONNACK"):
                    client.run_flag = False  # break no connack
            else:  # connect fails
                client.run_flag = False  # break
                print("quitting loop for  broker ", broker)

        client.loop(0.01)

        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)  # call function

    time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False

def on_log(client, userdata, level, buf):
    print(buf)

#def on_message(client, userdata, message):
#    time.sleep(1)
#    print("message received", str(message.payload.decode("utf-8")))

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag = True  # set flag
        for c in clients:
            # .get(): entries created later may not have a client yet, and none define "sub_topic"
            if c.get("client") is client and c.get("sub_topic"):
                client.subscribe(c["sub_topic"])
        print("connected OK")
    else:
        print("Bad connection Returned code=", rc)
        client.loop_stop()

def on_disconnect(client, userdata, rc):
    client.connected_flag = False  # set flag
    # print("client disconnected ok")

def on_publish(client, userdata, mid):
    print("In on_pub callback mid= ", mid)

def pub(client, loop_delay):

    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)
    global init_time
    if time.time() - init_time >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)

        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))

        init_time = time.time()
    else:
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
    print(datetime.datetime.now())
    time.sleep(loop_delay)
    pass

def Create_connections():
    for i in range(n_clients):
        cname = "client" + str(i)
        t = int(time.time())
        client_id = cname + str(t)  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        clients[i]["client"] = client
        clients[i]["client_id"] = client_id
        clients[i]["cname"] = cname
        broker = clients[i]["broker"]
        port = clients[i]["port"]
        token = clients[i]["token"]
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish
        #client.on_message = on_message
        t = threading.Thread(target=client_loop, args=(client, broker, port, token, 300, pub))
        threads.append(t)
        t.start()

if __name__ == '__main__':

    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")

    if things_location == "local":
        type_install = 'cseetprj03.essex.ac.uk:8080'
        broker = 'cseetprj03.essex.ac.uk'
    else:
        type_install = broker = 'demo.thingsboard.io'

    header = Things.get_credentials(things_location)
    my_devices = Things.get_devices_id(header, type_install)

    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)

    n_clients = len(clients)
    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class

    threads = []
    print("Creating Connections ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("Publishing ")
    Create_connections()

    print("All clients connected ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("starting main loop")
    try:
        while True:  # monitor every 10 s until Ctrl+C
            time.sleep(10)
            no_threads = threading.active_count()
            print("current threads =", no_threads)
            for c in clients:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected")

    except KeyboardInterrupt:
        print("ending")
        for c in clients:
            c["client"].run_flag = False
    time.sleep(10)

That is my code. Is multiprocessing needed for this, or what can I change to be able to scale the sending of data up to 10K connections?

Thanks in advance

fjpa121197 commented 4 years ago

We can use multiprocessing instead, with each process making up to 340 connections. It works well.

Hi, can you share your solution?

susfly commented 4 years ago
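import multiprocessing
import threading

def run_client(client_id):
    pass  # placeholder: create, connect and loop one paho client here

def start_client_threads(client_ids):
    # one thread per client; join so the process stays alive while they run
    threads = [threading.Thread(target=run_client, args=(cid,)) for cid in client_ids]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

if __name__ == "__main__":
    p = multiprocessing.Process(target=start_client_threads, args=(range(300),))
    p.start()

Each process should run fewer than 340 client threads.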

fjpa121197 commented 4 years ago

Hi,

Sorry for asking again, but I can't manage to start the second process with the second portion of my clients.

import multiprocessing
import paho.mqtt.client as mqtt
import time
import threading
import logging
import math
import thingsboard_objects as Things
import random
import datetime
import numpy as np
logging.basicConfig(level=logging.INFO)

init_time = time.time()

def Connect(client, broker, port, token, keepalive, run_forever=False):
    connflag = False
    delay = 5
    print("connecting ",client)
    badcount = 0  # counter for bad connection attempts
    while not connflag:
        logging.info("connecting to broker " + str(broker))
        # print("connecting to broker "+str(broker)+":"+str(port))
        print("Attempts ", str(badcount))
        time.sleep(2)
        try:
            client.username_pw_set(token)
            client.connect(broker, port, keepalive)
            connflag = True

        except:
            client.badconnection_flag = True
            logging.info("connection failed " + str(badcount))
            badcount += 1
            if badcount >= 3 and not run_forever:
                return -1
                raise SystemExit  # give up

    return 0

def wait_for(client, msgType, period=1, wait_time=15, running_loop=False):
    """Wait for a particular event; give up after about period*wait_time
    seconds (15 by default). Return True if successful, False otherwise."""
    # running loop is true when using loop_start or loop_forever
    client.running_loop = running_loop  #
    wcount = 0
    while True:
        logging.info("waiting" + msgType)
        if msgType == "CONNACK":
            if client.on_connect:
                if client.connected_flag:
                    return True
                if client.bad_connection_flag:  #
                    return False

        if msgType == "SUBACK":
            if client.on_subscribe:
                if client.suback_flag:
                    return True
        if msgType == "MESSAGE":
            if client.on_message:
                if client.message_received_flag:
                    return True
        if msgType == "PUBACK":
            if client.on_publish:
                if client.puback_flag:
                    return True

        if not client.running_loop:
            client.loop(.01)  # check for messages manually
        time.sleep(period)
        wcount += 1
        if wcount > wait_time:
            print("return from wait loop taken too long")
            return False
    return True

def client_loop(client, broker, port, token, keepalive=600, loop_function=None,
                loop_delay=10, run_forever=False):
    """runs a loop that will auto reconnect and subscribe to topics
    pass topics as a list of tuples. You can pass a function to be
    called at set intervals determined by the loop_delay
    """
    client.run_flag = True
    client.broker = broker
    print("running loop ")
    client.reconnect_delay_set(min_delay=1, max_delay=12)

    while client.run_flag:  # loop forever

        if client.bad_connection_flag:
            break
        if not client.connected_flag:
            print("Connecting to " + broker)
            if Connect(client, broker, port, token, keepalive, run_forever) != -1:
                if not wait_for(client, "CONNACK"):
                    client.run_flag = True  # no CONNACK yet: keep looping and reconnect
            else:  # connect fails
                client.run_flag = False  # break
                print("quitting loop for  broker ", broker)

        client.loop(0.01)

        if client.connected_flag and loop_function:  # function to call
            loop_function(client, loop_delay)  # call function

    time.sleep(1)
    print("disconnecting from", broker)
    if client.connected_flag:
        client.disconnect()
        client.connected_flag = False

def on_log(client, userdata, level, buf):
    print(buf)

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        client.connected_flag = True  # set flag
        print("connected OK")
    else:
        print("Bad connection Returned code=", rc)
        client.loop_stop()

def on_disconnect(client, userdata, rc):
    client.connected_flag = False  # set flag
    # print("client disconnected ok")

def on_publish(client, userdata, mid):
    print("In on_pub callback mid= ", mid)

def pub(client, loop_delay):

    rmd_current = round(random.uniform(0.6, 50.0), 2)
    rmd_pressure = round(random.uniform(0.6, 50.0), 2)
    global init_time
    if time.time() - init_time >= 3600:
        rmd_mnc = round(random.uniform(5.0, 30.0), 2)
        rmd_sdc = round(random.random(), 2)
        rmd_mnp = round(random.uniform(5.0, 30.0), 2)
        rmd_sdp = round(random.random(), 2)

        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
        client.publish('v1/devices/me/telemetry',
                       '{"MnC": "%s", "SdC": "%s", "Str": "2554","Stp": "2554", '
                       '"MnP": "%s", "SdP": "%s"}' % (rmd_mnc, rmd_sdc, rmd_mnp, rmd_sdp))

        init_time = time.time()
    else:
        client.publish('v1/devices/me/telemetry',
                       '{"Current": "%s","Pressure": "%s","Str": "12341","Stp": "12340","AL1": "~","AL2": "~",'
                       '"AL3": "~","AL4": "~","AL5": "~","AL6": "~","AL7": "~","AL8": "~"}' % (rmd_current, rmd_pressure))
    print(datetime.datetime.now())
    time.sleep(loop_delay)

def Create_connections(client_portion, threads):
    # Iterate over this process's own portion. Indexing the global `clients`
    # list here (with the fork start method) made every process reuse the
    # first portion's devices, and the other portions' entries never got a
    # "client" key, so main_loop's c["client"] lookup failed for them.
    for info in client_portion:
        cname = "client" + info["name"]
        t = int(time.time())
        client_id = cname + str(t)  # create unique client_id
        client = mqtt.Client(client_id)  # create new instance
        info["client"] = client
        info["client_id"] = client_id
        info["cname"] = cname
        client.on_connect = on_connect
        client.on_disconnect = on_disconnect
        client.on_publish = on_publish
        #client.on_message = on_message
        t = threading.Thread(target=client_loop,
                             args=(client, info["broker"], info["port"], info["token"], 600, pub))
        threads.append(t)
        t.start()

def main_loop(clients_loop):

    mqtt.Client.connected_flag = False  # create flag in class
    mqtt.Client.bad_connection_flag = False  # create flag in class

    threads = []
    print("Creating Connections ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("Publishing ")
    Create_connections(clients_loop, threads)

    print("All clients connected ")
    no_threads = threading.active_count()
    print("current threads =", no_threads)
    print("starting main loop")
    try:
        while True:
            time.sleep(10)
            no_threads = threading.active_count()
            print("current threads =", no_threads)
            for c in clients_loop:
                if not c["client"].connected_flag:
                    print("broker ", c["broker"], " is disconnected" , c["name"])
                    time.sleep(1)

    except KeyboardInterrupt:
        print("ending")
        for c in clients_loop:
            c["client"].run_flag = False

    time.sleep(10)

if __name__ == '__main__':

    # In case the user is using a demo version or local version of thingsboard
    things_location = input("What type of thingsboard installation are you working with (demo/local)? ")

    if things_location == "demo":
        type_install = broker = "demo.thingsboard.io"
        header = Things.get_credentials(things_location)
    elif things_location == "local":
        computer = input("Which computer? ")
        type_install = "cseetprj%s.essex.ac.uk:8080" % computer
        broker = "cseetprj%s.essex.ac.uk" % computer
        header = Things.get_credentials("local", type_install)
    else:
        raise SystemExit("Error: Installation not supported")

    my_devices = Things.get_devices_id(header, type_install)

    clients = []
    for device in my_devices:
        device_info = {"broker": broker, "port": 1883, "name": device["name"],
                       "token": Things.get_device_token(device["id"]["id"], header, type_install)}
        clients.append(device_info)

    # keep every process well below the ~340-client select() limit
    split_by = max(1, math.ceil(len(clients) / 250))
    if split_by > 1:
        print("Splitting devices to multiprocess")
    split_clients = np.array_split(clients, split_by)

    jobs = []
    for idx, client_portion in enumerate(split_clients):
        print("Starting process for portion %s" % (idx + 1))
        p = multiprocessing.Process(target=main_loop, args = (client_portion,))
        jobs.append(p)
        p.start()

I can't get past the Create_connections part: all the clients connect but don't publish afterwards. I think it's related to the order or position of the functions, but I don't know why. Any thoughts?

fjpa121197 commented 4 years ago

Hi, how many connections were you able to make per process? And did you change any configuration on the machine that receives the connections?

yxlwfds commented 3 years ago

It may be a limit on your server.