crossbario / crossbar

Crossbar.io - WAMP application router
https://crossbar.io/
Other
2.05k stars 274 forks source link

rlinks: events not forwarded b/w nodes connected to same instance #1918

Closed asimfarooq5 closed 2 years ago

asimfarooq5 commented 2 years ago

Event fired from instance A not published to instance C, also events from Instance C not published to Instance A.

om26er commented 2 years ago

@asimfarooq5 Can you share the configs that I could use to reproduce that bug

asimfarooq5 commented 2 years ago

INSTANCE A config

{
    "$schema": "https://raw.githubusercontent.com/crossbario/crossbar/master/crossbar.json",
    "version": 2,
    "controller": {
    },
    "workers": [
        {
            "type": "router",
            "realms": [
                {
                    "name": "realm1",
                    "roles": [
                        {
                            "name": "anonymous",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": false,
                                        "publisher": false
                                    },
                                    "cache": true
                                }
                            ]
                        },
                        {
                            "name": "router2router",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": true,
                                        "publisher": true
                                    },
                                    "cache": true
                                }
                            ]
                        }
                    ],
                    "rlinks": [
                        {
                            "id": "edge",
                            "realm": "realm1",
                            "transport": {
                                "type": "rawsocket",
                                "endpoint": {
                                    "type": "tcp",
                                    "host": "localhost",
                                    "port": 6688
                                },
                                "url": "rs://localhost:6688",
                                "serializer": "cbor"
                            },
                            "forward_local_events": true,
                            "forward_remote_events": true,
                            "forward_local_invocations": false,
                            "forward_remote_invocations": false
                        }
                    ]
                }
            ],          
            "transports": [
                {
                    "type": "rawsocket",
                    "endpoint": {
                        "type": "tcp",
                        "port": 5588,
                        "backlog": 1024
                    },
                    "options": {
                        "max_message_size": 1048576
                    },
                    "serializers": [
                       "cbor",
                       "msgpack",
                       "json"
                   ]            
                },      
                {
                    "type": "websocket",
                    "endpoint": {
                        "type": "tcp",
                        "port": 8080,
                        "backlog": 1024
                    },
                    "serializers": [
                        "cbor", "msgpack", "json"
                    ]
                }
            ]
        }
    ]
}

INSTANCE B config

{
    "$schema": "https://raw.githubusercontent.com/crossbario/crossbar/master/crossbar.json",
    "version": 2,
    "controller": {
    },
    "workers": [
        {
            "type": "router",
            "realms": [
                {
                    "name": "realm1",
                    "roles": [
                        {
                            "name": "anonymous",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": true,
                                        "publisher": true
                                    },
                                    "cache": true
                                }
                            ]
                        },
                        {
                            "name": "router2router",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": true,
                                        "publisher": true
                                    },
                                    "cache": true
                                }
                            ]
                        }
                    ]

                }
            ],
            "transports": [
                {
                    "type": "rawsocket",
                    "endpoint": {
                        "type": "tcp",
                        "port": 6688,
                        "backlog": 1024
                    },
                    "options": {
                        "max_message_size": 1048576
                    },
                    "serializers": [
                        "cbor",
                        "msgpack",
                        "json"
                    ],
                    "auth": {
                        "cryptosign": {
                            "type": "static",
                            "principals": {
                                "edge": {
                                    "realm": "realm1",
                                    "role": "router2router",
                                    "authorized_keys": [
                                        "3fe2fa16176a7d82214c543286de56e07b6751de6dadd39686ecb5bd68229466"
                                    ]
                                },
                                "cloud2": {
                                    "realm": "realm1",
                                    "role": "router2router",
                                    "authorized_keys": [
                                        "134ccb24bb4bf9229b7521bd6fc52fa71e6a70cdd99155482117f04933aabbe6"
                                    ]
                                }
                            }
                        }
                    }
                },
                {
                    "type": "websocket",
                    "endpoint": {
                        "type": "tcp",
                        "port": 8081,
                        "backlog": 1024
                    },
                    "serializers": [
                        "cbor", "msgpack", "json"
                    ]
                }
            ]
        }
    ]
}

INSTANCE C config

{
    "$schema": "https://raw.githubusercontent.com/crossbario/crossbar/master/crossbar.json",
    "version": 2,
    "controller": {
    },
    "workers": [
        {
            "type": "router",
            "realms": [
                {
                    "name": "realm1",
                    "roles": [
                        {
                            "name": "anonymous",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": true,
                                        "publisher": true
                                    },
                                    "cache": true
                                }
                            ]
                        },
                        {
                            "name": "router2router",
                            "permissions": [
                                {
                                    "uri": "",
                                    "match": "prefix",
                                    "allow": {
                                        "call": true,
                                        "register": true,
                                        "publish": true,
                                        "subscribe": true
                                    },
                                    "disclose": {
                                        "caller": true,
                                        "publisher": true
                                    },
                                    "cache": true
                                }
                            ]
                        }
                    ],
                    "rlinks": [
                        {
                            "id": "edge",
                            "realm": "realm1",
                            "transport": {
                                "type": "rawsocket",
                                "endpoint": {
                                    "type": "tcp",
                                    "host": "localhost",
                                    "port": 6688
                                },
                                "url": "rs://localhost:6688",
                                "serializer": "cbor"
                            },
                            "forward_local_events": true,
                            "forward_remote_events": true,
                            "forward_local_invocations": false,
                            "forward_remote_invocations": false
                        }
                    ]
                }
            ],
            "transports": [
                {
                    "type": "rawsocket",
                    "endpoint": {
                        "type": "tcp",
                        "port": 7788,
                        "backlog": 1024
                    },
                    "options": {
                        "max_message_size": 1048576
                    },
                    "serializers": [
                        "cbor",
                        "msgpack",
                        "json"
                    ]
                },
                {
                    "type": "websocket",
                    "endpoint": {
                        "type": "tcp",
                        "port": 8082,
                        "backlog": 1024
                    },
                    "serializers": [
                        "cbor", "msgpack", "json"
                    ]
                }
            ]
        }
    ]
}
om26er commented 2 years ago

It's probably useful to have separate configs for each node. However I am able to reproduce the issue with the provided config as well.

om26er commented 2 years ago

Here is the basic outline of rlink structure that is reported in the issue

INSTANCE A --------> INSTANCE B <-------- INSTANCE C

An event published by a client connected to instance A is correctly received on Instance B but NOT on a client connected to instance C (and vice versa).

@oberstet What are your thoughts on this, do you think it's worked as intended or is that a bug ?

oberstet commented 2 years ago

What are your thoughts on this, do you think it's worked as intended or is that a bug ?

this topology is supposed to work, so sth is not going as intended ..

@om26er this network topology is different from a full mesh, but it is relevant. here is why we want to test both:

  1. a full mesh of nodes (with rlinks between any two nodes in the mesh .. that is, at least one undirectional rlink between two nodes) replicates the topology we want to use for "clustering" - that is HA and scale-out within a data-center (eg within a AWS region spanning AZs in that region)
  2. a tree like topology, which is what @asimfarooq5 created in minimal form (node B is the tree root, nodes A/C are leaf nodes) replicates the topology we want to use for "edge-cloud" - that is a single node or a full mesh of nodes running in the cloud, and edge nodes running on uplinks with NAT/Firewall restrictions

The design of rlinks was created with both of above specific scenarios in mind, though it should generally be able to cover many more scenarios with different topologies.

Finally, but that is still not yet fully implemented, we want to cover scenarios where the topology spans multiple different node operators / owners, aka "federation".


Having said that, the code and design contains the following to make above possible:

  1. knobs forward_local_events/forward_remote_events/forward_local_invocations/forward_remote_invocations to control if an rlink A=>B will replicate local (node A) publications/registrations on the remote side (node B), and vice versa
  2. this is because between two nodes A and B there could be 2 unidirectional rlinks, A=>B and B=>A
  3. every WAMP message for a publish/registration that is forwarded from a node A over an rlink to a remote node B will have "node A" appended to the forward_for list of nodes - this list effectively carries the set and the route a forwarded WAMP message has taken through the rlink-connected network of nodes
oberstet commented 2 years ago

eg REGISTER.options.forward_for is a dict with session, authid and authrole

https://github.com/crossbario/autobahn-python/blob/9de497273c6db50d8697019844f6a7fd3a867789/autobahn/wamp/message.py#L4808 https://github.com/crossbario/autobahn-python/blob/9de497273c6db50d8697019844f6a7fd3a867789/autobahn/wamp/message.py#L4846

and for PUBLISH

https://github.com/crossbario/autobahn-python/blob/9de497273c6db50d8697019844f6a7fd3a867789/autobahn/wamp/message.py#L2517 https://github.com/crossbario/autobahn-python/blob/9de497273c6db50d8697019844f6a7fd3a867789/autobahn/wamp/message.py#L2587

this is then used in CB here: https://gist.github.com/oberstet/d270630c3f635ed27f986d0101a5b817

om26er commented 2 years ago

I investigated into this. It seems the message does arrive, however the router eats it here https://github.com/crossbario/crossbar/blob/c5184c74d9bc23a1f1f51bee72c9b954ea701fef/crossbar/worker/rlink.py#L117

Which causes the "local leg" to not receive the event, removing that return statement fixes the issue.

Thinking a bit more about it, I believe the above condition tries to avoid duplicate forwarding of events among different nodes

om26er commented 2 years ago

@oberstet what are your thoughts on the above ? We obviously don't want redundant event forwarding but at the same time need to support the above topology as well.

oberstet commented 2 years ago

@om26er we should first create an automated test that demonstrates the bug. a failing test can be added in different ways, all these are here https://github.com/crossbario/crossbar/tree/master/test - not all of the stuff in this folder is run as part of the automated CI though (currently) ..

oberstet commented 2 years ago

"the event comes already forwarded from a router node"

I actually don't understand why this is handled in rlink.py, and not in the router files.

forward_for is supposed to work like this:

this should ensure that messages forwarded flood the network, but don't loop back to previous nodes ..

oberstet commented 2 years ago

closing - can't reproduce (no CI test to demonstrate)