Closed cxrodgers closed 3 years ago
Update: I copied an earlier line of code from gonogo.py, the line that initializes the child. This uses key='CHILD'. https://github.com/wehr-lab/autopilot/blob/dc9833ae827301219d50620fa8e1b22266bb222d/autopilot/tasks/gonogo.py#L126
This message is received by the child, and I can encode arbitrary information in the "value" field, which is great! However, the received key
is always set to "START" by something (the Station?), and I need to be able to set it to something in listens
so that I can call a function.
This is the received message:
DEBUG:core.networking.Net_Node._rpi02:RECEIVED: ID: rpi02_4; TO: _rpi02; SENDER: rpi02; KEY: START; FLAGS: {}; VALUE: {'foo': 'bar', 'subject': 'testmouse', 'inner_key': 'WAIT'}
Note that the KEY is START. I tried specifying an "inner_key" in VALUE but I don't think this did anything. What I would like to do is set the KEY to "wait", and attach that to a handle in listens. But if I set the KEY to anything other than "CHILD", the message isn't received properly.
# Three possible mesages from the parent
self.listens = {
'WAIT': self.l_wait,
'PLAY': self.l_play,
'STOP': self.l_stop,
}
perfect timing. just got finished with another project and am turning to autopilot to fix a few bugs & wrap up 0.4.0.
tl;dr:
'KEY'
item to the value
of the message, the pilot station will use that as the key per https://github.com/wehr-lab/autopilot/blob/dc9833ae827301219d50620fa8e1b22266bb222d/autopilot/core/networking.py#L1263long version:
yes it is high time to re-architect the networking modules. i have this sketched in as v0.5.0, which will be to a) make a unitary inheritance system so that b) everything can be given a unique id so that c) it's a lot easier to send messages to arbitrary recipients, eg. being able to do to='agent_id.object_id'
.
Originally, the idea was to have a treelike network structure, where everything has a well-defined 'parent' except the apex node (typically the terminal) which only has children. The idea was that it would make network topologies easier to keep track of/make routing easier, but that almost immediately proved to be wrong and very limiting, so there are a number of hacks to send messages directly, and having them all sort-of work but have completely unclear logic is probably the worst of all possible worlds.
So the way it works is that net nodes all send messages 'through' a station object, which runs in a separate process and handles things like repeating dropped messages so they don't clog up the main agent process (i also think this should be rebuilt...). so there's an attempt at abstraction around the zmq frames that are built that, for net nodes, defaults to "actually" sending the message to the parent station object and the intended recipient is set in the to
field in a serialized Message
object, see https://github.com/wehr-lab/autopilot/blob/dc9833ae827301219d50620fa8e1b22266bb222d/autopilot/core/networking.py#L1579-L1582
'multihop' messages (ie. to
is a list) were sort of half-implemented, where they should be formatted as zmq frames as
[hop_0, hop_1, ..., hop_n, final_recipient, serialized_message]
but net nodes don't respect that, and stations don't either. i'll work on fixing that now.
The second question is related, where when trying to send to another agent it's ambiguous whether it should go 'up the tree' or 'down the tree' (though it works if the 'downstream' node has already connected to the agent in question so it's in the senders
dictionary, that's just very brittle), so the 'CHILD'
key is used so that the message is explicitly sent to the child, which has obvious problems of both elegance and practicality. The problem here is zmq related: there's a basic asymmetry between zmq.ROUTER
and zmq.DEALER
sockets. ROUTER
sockets are bound
to a particular ip and port that they can receive messages at from any socket that connects to them, but DEALER
sockets connect
to a port bound
by another zmq socket, so they can send and receive messages only from the connected socket. Net_Node
objects use DEALER
sockets because the assumption was that we'll be sending messages to the Station
object that has a ROUTER
socket.
So that's all very messy and i'll fix it along with ^^ so that you can multihop from the pilot to the specific object you're intending to control.
For 0.5.0 i'll be changing the logic so that net_nodes always bind a port with a router, and just search for any unbound port and then notify the rest of the swarm that it exists and how to contact it, etc. then it's actually not a huge deal/computationally expensive to just open a bunch of dealer sockets on the fly to anyone we want to communicate with
Wonderful, thanks for the explanation!
Concerning your workaround using L1263, I modified the message to be as follows:
self.node.send(
to=prefs.get('NAME'),
key='CHILD',
value={'foo': 'bar', 'subject': self.subject, 'KEY': 'WAIT'},
)
However I then get this error
Traceback (most recent call last):
File "/usr/lib/python3.7/threading.py", line 917, in _bootstrap_inner
self.run()
File "/usr/lib/python3.7/threading.py", line 865, in run
self._target(*self._args, **self._kwargs)
File "/home/pi/dev/autopilot/autopilot/core/networking.py", line 1264, in l_child
KEY = msg.value['keys']
KeyError: 'keys'
I suspect it's because on Lines 1263-4 you have
if 'KEY' in msg.value.keys():
KEY = msg.value['keys']
But you might have meant:
if 'KEY' in msg.value.keys():
KEY = msg.value['KEY']
In any case, if I also add a key called "keys" with the same value as the key called "KEY":
self.node.send(
to=prefs.get('NAME'),
key='CHILD',
value={'foo': 'bar', 'subject': self.subject, 'KEY': 'WAIT', 'keys': 'WAIT'},
)
I get a little bit further, but then this happens:
ERROR:core.networking_02.Pilot_Station.Pilot_Station-3:ERROR: No function could be found for msg id rpi01_9 with key: WAIT
Traceback (most recent call last):
File "/home/pi/dev/autopilot/autopilot/core/networking.py", line 597, in handle_listen
listen_funk = self.listens[msg.key]
KeyError: 'WAIT'
That Line 597 is in class Station, so I think the Station is looking for 'WAIT' in its own listens
, but it should be propagating that WAIT
to the Net_Node. Or something like that. I'll keep thinking about it.
And sorry, but a follow-up question: will this workaround of using key='CHILD'
scale up to the case of a Pilot having multiple Children? If the message is broadcast to all Children, I can just encode the intended target Child in the message payload. But if it is only sent to the first Child, or something like that, then I won't be able to contact the others. If you don't know the answer right away, don't worry, I'll give it a shot soon and see what happens :)
good lord that is sloppy, sorry about that. i'll clean that key mismatch up as well.
that key error is because a station will try to call a listen if the to
field matches its id, cleaning this up as well so that the to
in the message is always the intended recipient and all the hops are only in the zmq message. a quick and dirty fix would be to try to forward the message on in the case of a KeyError
(aka like self.send(msg=msg)
.
started working on this in hotfix
branch, but have to go cook dinner and don't know if i'll have time afterwards. in any case will have this done by tomorrow!
ty for bearing through this atrocious code, autopilot is striated where the first layer was me learning how to program and now all that has to be replaced now that i actually know how things sort of work... hardware got reinvented, next is the plugin system, then networking... and so on
have a draft of direct messaging between nodes here: https://github.com/wehr-lab/autopilot/commit/b000504b76f6dad9697e99e8d46b20b1a74181d9
i'm going to write tests (novel, i know) for the changes and do some godforsaken manual testing to make sure the changes weren't breaking, but it is a simplifying change.
Net_Node
s can be given a router_port
on init, and if gotten will bind a ROUTER
socket to receive messages from networking objects other than the Station
that they are 'behind'. Should handle messages the same way as usual, by calling a listen
function with a matching keyNet_Node
rather than just a Station
object. So by default the connections are 'many input, one output'.to
field is in the senders
dict (automatically added on receiving a message from another node), then the router is used, otherwise the previous dealer-only approach is used. node.router.connect('tcp://ip.address:port')
and as long as both of their identities are set (should be) and unique (idk), and you add the bytes
encoded id of the recipient to the senders
dict (dict so lookups are O(1)) then it should workthe Station
objects also had/have a huge amount of extraneous logic in them that I bought for the low, low price of "not having a clear messaging architecture when they were first implemented" and I tried to clean some of that out, so there might be breaking changes that i'll work out over the course of this afternoon, shouldn't take longer than today or tomorrow to finish though.
(oh yeah and i also split up the networking.py file into its own module, as is long overdue for everything in the core
module)
Wow, thanks for all the hard work! I hope it goes without saying, there's nothing urgent about my request, feel free to prioritize as you see fit.
I didn't fully understand the previous networking model, so I'm not really following your described changes, but when I go back to lab tomorrow I'll work through my use case and I think I'll be able to figure it out, using the info you've provided here. Or, since you're actively developing, I could also wait for the dust to settle a bit. Let me know if there's a particularly good time for me to alpha test some stuff. thanks!
trust me no one understood the previous/current networking model ;) especially looking at it now, really got away from me at some point.
the goal here will be to get the networking modules to a state where they do what you need them to do (and I need them to do for another project) for now, so i'll try to get direct sending between nodes tested and write an example notebook, and then there will be significantly more dust to settle for 0.5.0.
between those points the API will change substantially, but i'm getting better at writing docs for upgrading so the breaking changes will be gentle and no-one will look back reminiscently at the current way.
Ok, I've pulled all the new changes from the hotfix
branch. I think I just need one last tip.
Here's how I initialize Net_Node on the parent Pilot. Do I need to specify router_port?
self.node = Net_Node(id="T_{}".format(prefs.get('NAME')),
upstream=prefs.get('NAME'),
port=prefs.get('MSGPORT'),
listens={},
instance=False)
By the way, note I had to set instance
to False or it crashes with a threading error.
And here's how I initialize Net_Node on the Child.
self.listens = {
'WAIT': self.l_wait,
'PLAY': self.l_play,
'STOP': self.l_stop,
}
self.node = Net_Node('child_pi',
upstream=prefs.get('NAME'),
port=prefs.get('MSGPORT'),
listens=self.listens,
instance=False,
)
What is the correct format for the message send? I've been doing this, from the parent:
self.node.send(
to='child_pi',
key='WAIT',
value={'foo': 'bar'},
)
I think it almost works. On the parent Pilot, I see
DEBUG:networking.station_02.Pilot_Station.Pilot_Station-3:FORWARDING (dealer): [b'T_rpi01', b'rpi01', b'child_pi', b'{"flags": {}, "timestamp": "2021-05-26T15:03:48.140249", "ttl": 5, "sender": "T_rpi01", "to": "child_pi", "key": "WAIT", "value": {"foo": "bar"}, "id": "T_rpi01_1"}']
But I don't see anything receive on the child Pi, and it does not enter the self.l_wait()
function.
I guess I'm confused about:
upstream
of a Net_Node just supposed to be prefs.get('NAME'), ie, its own Station? Or is the upstream supposed to be the parent Pi?In this case think of each of these nodes as independent of the pilot, nodes you're just using for the task, separate from prefs like MSGPORT
which are used by the pilot and terminal's station objects.
The connections are still asymmetrical, where
upstream
, port
, upstream_ip
, all configure a (dealer) socket that can initiate a connection to a single, already-bound other (router) socketrouter_port
binds a (router) socket that can be connected to by any number of other (dealer) sockets, but the other sockets have to initiate the connection first.So the order is important:
router_port
, which claims the port and allows other nodes to connect to it.router_port
, where upstream
== the id, port
== router_port
, upstream_ip
== ip of the parent node ('localhost'
is default, if on the same ip). Specifically for this version, we also need to send a message so the net node object is aware of the connection.router_port
, then the parent can send messages back using that same port -- but it needs to have the child node connect to it first, it can't initiate the connection.Usually this doesn't matter all that much because zmq is smart and will usually send any messages after the child connects anyway.
To answer your questions specifically:
router_port
tells the node to bind a new port to receive messages at, it should be different than prefs.get('MSGPORT')
(you should get an error because that port should already be bound by the Station
object). upstream
is rapidly becoming a bad misnomer, it just means the id
of the node that it connects to with its dealer socket.The direction I want to move towards is (hopefully) a much clearer system where every object has a unique ID where you can send messages just to 'agent_id.object_id'
and everything else is hidden, so you don't have to mess around manually with ports and ips and etc. this is sort of an intermediate form, where nodes can directly connect to each other without needing to bother with the cumbersome and slow node -> station -> station -> node pattern but you still have to juggle a few IPs and ports. I would recommend maybe adding them as parameters to the task class so you can set them durably without hardcoding.
i'll be sure to include a worked example when i pull this back after testing multihop routing
so a quick example slightly modified from the test is
from autopilot.networking import Net_Node
def hello(value):
print(f'hello, {value}')
node_1 = Net_Node(
id='a',
upstream='', port=5000,
router_port = 5001,
listens = {'HELLO': hello},
instance = False
)
node_2 = Net_Node(
id='b',
upstream='a',
port=5001,
upstream_ip='localhost', # or whatever
listens = {'HELLO': hello},
instance = False
)
node_2.send('a', 'HELLO', 'my name is b')
# node_1 should get the message hopefully,
# and now node_1 can send messages back
# you might need some short sleep to establish the connection if
# you're literally doing them sequentially
node_1.send('b', 'HELLO', 'b, my name is a')
It works!! I basically used your example, but the "upstream_ip" is the IP of the parent Pi. I also added a little waiting loop on the parent (which is upstream), so that it didn't begin the task until it had been contacted by each Child. Otherwise, the child's first message to the parent might arrive too late, after the parent had already tried to send a message about the first trial.
In this example, is any traffic actually going over port 5000? Or is port=5000
just specified because port
is a required argument? I see that no upstream
is needed for node_1, presumably because node_1 is itself the upstream
in this application.
Also, when I have multiple Child Pis all connecting to the same Parent, would you recommend using the same Net_Node on the Parent? Alternatively I could have a separate Net_Node to communicate with each Child. For simplicity, I think I'd rather use just the one, as long as it's okay for multiple children to be connecting to the same parent Net_Node on the same router_port.
yesssss glad it works :) feels like magic every time the networking modules actually get a message through
port=5000
just specified because port is a required argument, before the requirement made sense because all Net_Nodes were expected to be 'downstream' of some Station that they sent all messages to, but seems like that can be relaxed.
Yes! totally fine to connect multiple DEALER
sockets to one ROUTER
socket -- that's the expected pattern i think. After they've sent a message to the parent pi, the send
method should automatically use the router to send messages addressed back to the connected children. Currently Net_Node
s only get the value
field of the message, so you wouldn't necessarily be able to tell which pi was sending the message unless you put that in the value
field. I have thought for awhile that doesn't make a lot of sense, and that all listen types across stations and nodes should just receive the whole message.
multihop messaging appears to work as well:
https://travis-ci.com/github/wehr-lab/autopilot/builds/227201171 https://github.com/wehr-lab/autopilot/commit/58c18140f474e7d2560deb8f6dcd74e840534b93
see the test here for an example: https://github.com/wehr-lab/autopilot/blob/58c18140f474e7d2560deb8f6dcd74e840534b93/tests/test_networking.py#L107
so there, for a series of networking objects node_1 -> station_1 -> station_2 -> station_3 -> node_3
, the message is correctly forwarded when sent like this
node_1.send(to=['station_1', 'station_2', 'station_3', 'node_3'],
key='GOTIT', value=0)
note that Net_Node
s do not forward messages, they are expected to be the terminal senders/recipients, hence the node to node pattern in previous messages. (again, in the future, my hope is that everything will be p2p <3)
I'm going to do a few more manual tests with the rigs/tasks in our lab just to make qualitatively sure (thank you lack of unit testing, working on it) nothing breaks and then i'll close this and pull back to dev. thanks for raising this :)
alright i'm relatively convinced that the networking fix doesn't break anything. merging now!
Background: I am trying to control 8 speakers. Current approach is to use one Pilot and three Child, each controlling two speakers.
Right now, I'm just trying to connect one Child to the Pilot. I'm using
tasks.gonogo
as an example. However, I think it may (?) have a bug in it. https://github.com/wehr-lab/autopilot/blob/dc9833ae827301219d50620fa8e1b22266bb222d/autopilot/tasks/gonogo.py#L170Note that the
to
argument is a list. The documentation of Net_Node.send says this is acceptable. But it crashes here: https://github.com/wehr-lab/autopilot/blob/dc9833ae827301219d50620fa8e1b22266bb222d/autopilot/core/networking.py#L1582 when it tries to convert the list usingbytes
.What is the correct syntax for sending messages to a Child? I tried using the name of the child (e.g., "rpi02") as well as "T_rpi02" and the name of the Child task's Net_Node. Then the message sends okay but it doesn't get received. Although this might be a networking problem, I can debug further if I understand better what the
to
field should be. Thanks!