eclipse / paho.mqtt.python

paho.mqtt.python
Other
2.15k stars 724 forks source link

Broken Pipe issue #727

Closed Daniel730 closed 7 months ago

Daniel730 commented 1 year ago

Hi!

I've been having trouble with Python MQTT for a few months now and I can't find a solution. I'm running the Python MQTT client on my Raspberry Pi 3 and I'm using a Sonoff with Tasmota. I don't know why, but after the script has been running for a few hours, it just stops and reports Error 32: Broken Pipe. After this error, my script stops sending my radio-frequency codes.

If anyone can help me with this issue or point out what I'm doing wrong, I'll be glad. Thanks in advance.

Here's the code I'm using below.

from datetime import datetime
from dotenv import load_dotenv
from urllib.error import HTTPError
import bugsnag
import errno
import json
import paho.mqtt.client as mqtt
import os
import time
import urllib.parse as urlparse
import urllib3

# Load configuration from the .env file (python-dotenv).
load_dotenv()

# Configure Bugsnag; the API key is read from the environment.
bugsnag.configure(api_key=os.environ.get('BUGSNAG_API_KEY'), project_root="./")

# Broker location: CLOUDMQTT_URL when set, otherwise mqtt://localhost:1883.
url_str = os.getenv('CLOUDMQTT_URL', 'mqtt://localhost:1883')
url = urlparse.urlparse(url_str)

# The URL path with its first character (the leading "/") dropped names the
# topic; an empty result falls back to the default topic.
topic = url.path[1:] or 'tele/sonoff/RESULT'

# Define event callbacks
def on_connect(client, userdata, flags, rc):
    """Handle the result of a connection attempt.

    When ``rc`` is 0 the module-level ``Connected`` flag is set to True and
    ``client`` subscribes to the module-level ``topic`` at QoS 0. Any other
    ``rc`` is printed and logged, and nothing else happens.

    Args:
        client: The MQTT client instance the callback was invoked for.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    global Connected

    if rc != 0:
        print("Failed on connection")
        register_log('Failed on connection. RC: %s' % (rc))
        return

    print("Connected")
    Connected = True
    register_log('Connected')

    # Subscribe through the client handed to this callback instead of a
    # module-level client object, so the handler works for whichever client
    # it is attached to. Start subscribe, with QoS level 0.
    client.subscribe(topic, 0)
    register_log('Subscribed to topic %s' % (topic))

def on_message(client, obj, msg):
    """Forward a received RF code to the HTTP backend.

    The payload is decoded and logged. If it looks like JSON and contains
    ``RfReceived.Data``, a GET request is sent to
    ``BASE_URL + CLIENT_ID + "/" + code + "/RFReceived"`` (both values are
    read from the environment). Payloads that are not JSON, or that are JSON
    without an RF code, are logged and ignored.

    Args:
        client: Unused.
        obj: Unused.
        msg: Received message; ``msg.payload`` (bytes) and ``msg.topic`` are
            read.
    """
    # errors="replace" keeps a payload with invalid UTF-8 from raising here.
    responseBroker = msg.payload.decode(encoding="utf-8", errors="replace")
    register_log('on_message: %s' % (responseBroker))

    if "{" not in responseBroker:
        return

    # Not every JSON payload on the topic has to carry an RF code. Skip
    # malformed JSON and payloads without RfReceived.Data instead of letting
    # the exception escape the callback.
    # NOTE(review): an uncaught exception here presumably propagates into the
    # MQTT network loop and stops the script - confirm for the paho version
    # in use.
    try:
        code = json.loads(responseBroker)["RfReceived"]["Data"]
    except (ValueError, KeyError, TypeError) as e:
        register_log('Ignoring payload without RfReceived data: %s' % (repr(e)))
        return

    # Named request_url so it does not shadow the module-level broker `url`.
    request_url = os.getenv('BASE_URL') + str(os.getenv('CLIENT_ID')) + "/" + str(code) + "/RFReceived"
    register_log(request_url)
    try:
        http = urllib3.PoolManager()
        contents = http.request('GET', request_url)
        print(msg.topic + ": " + request_url)
        print(contents.data.decode("utf-8", errors="replace"))
        register_log('Success')
    except (urllib3.exceptions.HTTPError, urllib3.exceptions.ProtocolError) as e:
        # Only some urllib3 exceptions expose `.reason`; fall back to the
        # exception itself so the error handler cannot raise AttributeError.
        failure = getattr(e, 'reason', None) or e
        print('Request failed: %s' % (str(failure)))
        register_log('Request failed: %s' % (str(failure)))

def on_publish(client, obj, mid):
    """Log that the publish callback ran, then print and log the message id.

    Args:
        client: Unused.
        obj: Unused.
        mid: Message id passed to the callback.
    """
    register_log('on_publish called')
    mid_line = "mid: " + str(mid)
    print(mid_line)
    register_log(mid_line)

def on_subscribe(client, obj, mid, granted_qos):
    """Log that the subscribe callback ran, then print and log its details.

    Args:
        client: Unused.
        obj: Unused.
        mid: Message id passed to the callback.
        granted_qos: Granted QoS value passed to the callback.
    """
    register_log('on_subscribe called')
    summary = "Subscribed: %s %s" % (str(mid), str(granted_qos))
    print(summary)
    register_log(summary)

def on_log(client, obj, level, string):
    """Print a log line to standard output.

    Only ``string`` is used; ``client``, ``obj`` and ``level`` are ignored.
    """
    line = str(string)
    print(line)

def before_notify_callback(event):
    """Set ``event.user`` to identification values read from the environment.

    Args:
        event: Object whose ``user`` attribute is overwritten with a dict
            holding CLIENT_ID and RASPBERRY_IP (None when unset).
    """
    client_id = os.environ.get('CLIENT_ID')
    raspberry_ip = os.environ.get('RASPBERRY_IP')
    event.user = {"client": client_id, "raspberryIp": raspberry_ip}

def set_date_and_hour():
    """Refresh the module-level ``date`` and ``hour`` strings.

    ``date`` becomes the current local date as YYYY-MM-DD and ``hour`` the
    current local time as HH:MM:SS, both taken from one clock reading.
    """
    global date, hour

    now = datetime.now()
    date, hour = now.strftime("%Y-%m-%d"), now.strftime("%H:%M:%S")

# Setup logging
def register_log(message):
    """Append a timestamped line to the current day's log file.

    Refreshes the module-level ``date`` and ``hour`` values, then appends
    ``[<date> <hour>] <message>`` to ``/home/dietpi/logs/<date>.log``.

    Args:
        message: Text to record.

    Raises:
        OSError: If the log file cannot be opened or written.
    """
    set_date_and_hour()
    filename = "/home/dietpi/logs/%s.log" % (str(date))

    # The context manager closes the file even if the write raises. The
    # explicit encoding keeps non-ASCII text from failing under a non-UTF-8
    # default locale.
    with open(filename, "a", encoding="utf-8") as log:
        log.write("[%s %s] %s \n" % (str(date), str(hour), message))

# Initialise the module-level date/hour strings before the first log entry.
set_date_and_hour()

# Call `before_notify_callback` before every event
bugsnag.before_notify(before_notify_callback)

# Set to True by on_connect once the broker accepts the connection.
Connected = False

mqttc = mqtt.Client()

# Assign event callbacks
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_message = on_message
mqttc.on_subscribe = on_subscribe

# Uncomment to enable debug messages
# mqttc.on_log = on_log

# Connect
# urlparse reports the port as None when the URL does not name one; fall back
# to the standard MQTT port instead of passing None to connect().
broker_port = url.port or 1883
register_log('Attempting connection to %s in hostname %s port %s' % (url.username, url.hostname, broker_port))
mqttc.username_pw_set(url.username, url.password)
mqttc.connect(url.hostname, broker_port)
register_log('Connected to %s in hostname %s port %s' % (url.username, url.hostname, broker_port))

# Start the loop
# loop_forever() blocks, so nothing placed after it runs while the client is
# alive. The script previously had a wait-for-connection step and a periodic
# "Python is listening!" publish loop after this call; that code was
# unreachable and has been removed. The KeyboardInterrupt handling now wraps
# the blocking call so Ctrl+C disconnects cleanly.
# To publish from this script, do it from a callback such as on_connect, or
# switch to loop_start() and publish from the main thread.
register_log('Starting loop_forever')
try:
    mqttc.loop_forever()
except KeyboardInterrupt:
    print("exiting")
    register_log('Keyboard interrupt. Exiting')
    mqttc.disconnect()
    register_log('Disconnected')
MattBrittan commented 7 months ago

I'm going to close this due to the amount of time it's been open (apologies that no one responded to you). If you do still require help please provide the full error message (including output from just before the error, the stack trace, and ideally logs). Note that nothing under loop_forever() will be run (unless you stop the loop).