crossbario / crossbar

Crossbar.io - WAMP application router
https://crossbar.io/
Other
2.05k stars 274 forks source link

Adjust MQTT bridge to multi-node/clustering changes #2048

Open jsanyi opened 1 year ago

jsanyi commented 1 year ago

I'm running 22.6.1 from a pip install in venv, and cannot get mqtt transport working, see crash log below.

=====

2022-09-08T10:06:50+0200 [Router      17775] Unhandled Error
Traceback (most recent call last):
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/log.py", line 80, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/context.py", line 117, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/context.py", line 82, in callWithContext
    return func(*args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/posixbase.py", line 683, in _doReadOrWrite
    why = selectable.doRead()
--- <exception caught here> ---
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/tcp.py", line 1410, in doRead
    protocol.makeConnection(transport)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/protocol.py", line 509, in makeConnection
    self.connectionMade()
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/bridge/mqtt/wamp.py", line 222, in connectionMade
    self._when_ready()
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/bridge/mqtt/wamp.py", line 241, in _when_ready
    self._wamp_session.onOpen(self._wamp_transport)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/router/session.py", line 421, in onOpen
    assert isinstance(transport,
builtins.AssertionError: unexpected router transport type <class 'crossbar.bridge.mqtt.wamp.WampTransport'>

2022-09-08T10:06:50+0200 [Router      17775] WampMQTTServerProtocol.process_connect(packet=Connect(client_id='mqtt-explorer-a16589b3', flags=ConnectFlags(username=False, password=False, will=False, will_retain=False, will_qos=0, clean_session=True, reserved=False), keep_alive=60, will_topic=None, will_message=None, username=None, password=None))
2022-09-08T10:06:50+0200 [Router      17775] Error handling a Connect, dropping connection
2022-09-08T10:06:50+0200 [Router      17775] Unhandled Error
Traceback (most recent call last):
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/worker/main.py", line 351, in _run_command_exec_worker
    reactor.run()
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/base.py", line 1318, in run
    self.mainLoop()
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/base.py", line 1331, in mainLoop
    reactorBaseSelf.doIteration(t)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/epollreactor.py", line 244, in doPoll
    log.callWithLogger(selectable, _drdw, selectable, fd, event)
--- <exception caught here> ---
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/log.py", line 96, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/log.py", line 80, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/context.py", line 117, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/python/context.py", line 82, in callWithContext
    return func(*args, **kw)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/posixbase.py", line 696, in _doReadOrWrite
    self._disconnectSelectable(selectable, why, inRead)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/posixbase.py", line 300, in _disconnectSelectable
    selectable.connectionLost(f)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/twisted/internet/tcp.py", line 326, in connectionLost
    protocol.connectionLost(reason)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/bridge/mqtt/wamp.py", line 227, in connectionLost
    self._wamp_session.onMessage(msg)
  File "/home/crossbar/crossbar-venv/lib/python3.10/site-packages/crossbar/router/session.py", line 457, in onMessage
    if not self._pending_session_id:
builtins.AttributeError: 'RouterSession' object has no attribute '_pending_session_id'

=====

This is my transport configuration:

                {
                    "id": "mqtt001",
                    "type": "mqtt",
                    "endpoint": {
                        "type": "tcp",
                        "port": 1883
                    },
                    "options": {
                        "realm": "realm1",
                        "role": "anonymous",
                        "payload_mapping": {
                            "": {
                                "type": "native",
                                "serializer": "json"
                            }
                        },
                        "auth": {
                            "anonymous": {
                                "type": "static",
                                "role": "anonymous"
                            }
                        }
                    }
                }
oberstet commented 1 year ago

ah, right, looks like WampMQTTServerProtocol needs some adjustment to the recent multi-node / clustering (proxy workers, rlinks) changes in crossbar. nothing big, but someone would need to dig into.

richard-pianka commented 1 year ago

has there been any progress on this issue?

richard-pianka commented 1 year ago

apologies for the double comment – this is blocking us from upgrading to newer versions of crossbar. we're willing to pay to have this issue resolved if someone with more expertise is willing to fix it.

/cc @oberstet

oberstet commented 1 year ago

@richard-pianka Hi Richard, sorry for not reacting before, but I'm busy these days with mundane things like making money;) However, as you mention you would pay for fixing this, so I'd be happy to resolve it. Please get in touch with me via email "tobias dot oberstein at gmail dot com".

As mentioned before, I don't expect this to be complex but tricky to fix the right location in the right way. I know the code base inside out, and for fixing the issue, I'd also create proper, automated test cases using your MQTT client library (or any standard one that fits the bill, like we already use in crossbar-examples) and actually running Crossbar.io in multi-process/node clustered configuration (multiple proxy and router worker) - the whole reason for the original change that lead to this regression ...