OpenCyphal / pycyphal

Python implementation of the Cyphal protocol stack.
https://pycyphal.readthedocs.io/
MIT License
117 stars 105 forks source link

[WIP] Moving Cyphal/UDP to multicast #253

Closed maksimdrachov closed 1 year ago

maksimdrachov commented 1 year ago

This MR is based on this proposol discussed in the OpenCyphal forum.

In short, the changes can be broken down into three pieces:

1. Datagram header format

Current:

uint8 version = 0
uint8 priority
void16 
uint32 frame_index_eot
uint64 transfer_id
void64

Proposal:

uint8 version = 1   # UPDATE
uint8 priority
uint16 source_node_id   # NEW
uint32 frame_index_eot
uint64 transfer_id
void64

Note: version will be bumped to 1, however no backward-compatibility changes are made (since the protocol is still in development).

2. Message

Current:

    fixed         reserved
   (9 bits)       (3 bits)
   ________          _
  /        \        / \
  11101111.0ddddddd.000sssss.ssssssss
  \__/      \_____/    \____________/
(4 bits)    (7 bits)      (13 bits)
  IPv4      subnet-ID     subject-ID
multicast   \_______________________/
 prefix             (23 bits)
            collision-free multicast
               addressing limit of
              Ethernet MAC for IPv4

Proposal:

    fixed   message  reserved
   (9 bits) select.  (3 bits)
   ________   res.|  _
  /        \     vv / \
  11101111.0ddddd00.000sssss.ssssssss
  \__/      \___/      \____________/
(4 bits)   (5 bits)       (13 bits)
  IPv4     subnet-ID      subject-ID
multicast   \_______________________/
 prefix             (23 bits)
            collision-free multicast
               addressing limit of
              Ethernet MAC for IPv4

3. Service

Current: regular unicast

Proposal:

    fixed          service
   (9 bits)  res.  selector
   ________      ||
  /        \     vv
  11101111.0ddddd01.nnnnnnnn.nnnnnnnn
  \__/      \___/   \_______________/
(4 bits)   (5 bits)     (16 bits)
  IPv4     subnet-ID     node-ID
multicast   \_______________________/
 prefix             (23 bits)
            collision-free multicast
               addressing limit of
              Ethernet MAC for IPv4

TODO

maksimdrachov commented 1 year ago

Question: Is anonymous listening possible or not? There seems to be some contradiction:

pycyphal/transport/udp/__init__.py:20:

The concept of anonymous transfer is not defined for Cyphal/UDP

vs

pycyphal/transport/udp/__init__.py:302

>>> tr_1 = pycyphal.transport.udp.UDPTransport('127.9.15.254', local_node_id=None)  # Anonymous is only for listening.
maksimdrachov commented 1 year ago

Question: Does it make sense to move source_node_id to class Frame (defined in pycyphal/transport/commons/high_overhead_transport/_frame.py)?

pavel-kirienko commented 1 year ago

Is anonymous listening possible or not? There seems to be some contradiction:

There is no contradiction: "The concept of anonymous transfer is not defined for Cyphal/UDP" means that you can't send anonymous transfers, but you can still create anonymous nodes that cannot communicate (they can only listen).

Does it make sense to move source_node_id to class Frame ( defined inpycyphal/transport/commons/high_overhead_transport/_frame.py)?

Probably not (yet) because you would have to update other transports dependent on this class.

maksimdrachov commented 1 year ago

Ok, thanks.

"Datagram" still needs some work, will ping you when a review is needed.

maksimdrachov commented 1 year ago

Does the passing of the local_node_id as source_node_id look right? (see last commit)

maksimdrachov commented 1 year ago

I have updated the unit test, to take into account source_node_id

Functional tests (run pytest from pycyphal/transport/udp):

Screenshot 2022-10-15 at 15 21 56

Integration tests (pytest -k udp):

Screenshot 2022-10-15 at 15 21 29

Can you review these changes? I think the first step of implementing changes related to "Datagram" is done.

pavel-kirienko commented 1 year ago

The changes look good to me. Later on, we may want to enable anonymous transfers for UDP as I wrote on the forum; it should be a cheap change to introduce. The relevant part of the source code can be found by searching for "In Cyphal/UDP, the anonymous mode is somewhat bolted-on."

The text and diagrams in the module docstring at _udp.py will also need to be updated, perhaps later.

Nice work so far ;)

maksimdrachov commented 1 year ago

Later on, we may want to enable anonymous transfers for UDP as I wrote on the forum; it should be a cheap change to introduce.

The text and diagrams in the module docstring at _udp.py will also need to be updated, perhaps later.

Added to todo-list 👍

maksimdrachov commented 1 year ago

@pavel-kirienko

Could you review these changes? I just want to make sure that this part is correct (pycyphal/transport/udp/_ip) before I start updating the larger code base/unit tests.

Also some questions:

  1. service_data_specifier_to_multicast_group and message_data_specifier_to_multicast_group are pretty similar; so just combine them right? Instead of 4 functions in _endpoint_mapping.py, this would result in 2. (Equating MESSAGE_ID and SERVICE_ID into one variable NODE_ID would also help in this regard.)

  2. I renamed most of the constants used in _endpoint_mapping.py, let me know if this change make sense.

pavel-kirienko commented 1 year ago

Nice progress but there seems to be a problem with subject-/service-/node-ID mixup. Please give another look at section 4.1.1 "Transport model" of the Specification.


To publish a message on subject S, we send a multicast datagram to the multicast group whose address is computed as:

image

And the destination UDP port is set to 16383.


To send a request or response on service X to node N, we send a multicast datagram to the multicast group whose address is computed as:

image

And the destination UDP port is set to (16384 + X * 2 + (is_response)).

pavel-kirienko commented 1 year ago

Could you please update your branch to sync up with master?

maksimdrachov commented 1 year ago

Could you confirm the changes are correct? I think I have addressed the issues.

maksimdrachov commented 1 year ago

Regarding pycyphal/transport/udp/_ip/_v4.py:

class IPv4SocketFactory(SocketFactory):

  def __init__(self, domain_id: int)

  def make_output_socket(
    self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier
  ) -> socket.socket:
    # General setup
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    s.setblocking(False)
    s.bind((str(self._local), 0))  # QUESTION: What local IP address bind to? (Does it need to be bound at all?)

    # Message
    remote_ip = message_data_specifier_to_multicast_group(self._domain_id, data_specifier)
    remote_port = SUBJECT_PORT

    # Service
    remote_ip = service_data_specifier_to_multicast_group(self._domain_id, remote_node_id, data_specifier)
    remote_port = service_data_specifier_to_udp_port(data_specifier)

    # Connect
    s.connect((str(remote_ip), remote_port))

  def make_input_socket(
    self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier # CHANGE: need remote_node_id for service
    ) -> socket.socket:
    # General setup
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    s.setblocking(False)

    # Message
    multicast_ip = message_data_specifier_to_multicast_group(self._domain_id, data_specifier)
    multicast_port = SUBJECT_PORT

    # Service
    multicast_ip = service_data_specifier_to_multicast_group(self._domain_id_, remote_node_id, data_specifier)
    multicast_port = service_data_specifier_to_udp_port(data_specifier)

    # Bind
    s.bind((str(multicast_ip), multicast_port))

Look right?

maksimdrachov commented 1 year ago

@pavel-kirienko

I think it's almost finished.

The last change that needs to be addressed, is mainly related to udp/_socket_reader.py.

class SocketReader contains the following note:

This class is the solution to the UDP demultiplexing problem. The objective is to read data from the supplied socket, parse it, and then forward it to interested listeners.

Why can't we ask the operating system to do this for us? Because there is no portable way of doing this(except for multicast sockets). Even on GNU/Linux, there is a risk of race conditions, but I'll spare you the details. Those who care may read this: https://stackoverflow.com/a/54156768/1007777.

This seems to suggest that some significant overhaul/simplification can be done here. Can you give some pointers?

Some smaller questions:

  1. pycyphal/transport/udp/_tracer.py:160:
if ip_destination.is_multicast:
    if udp_packet.destination_port == SUBJECT_PORT:
        # Message packet
        dst_nid = None  # Broadcast
        data_spec = multicast_group_to_message_data_specifier(ip_source, ip_destination)
    else:
        # Service packet
        data_spec = udp_port_to_service_data_specifier(udp_packet.destination_port)
        # QUESTION: Correct to use DOMAIN_ID_MASK here? (or make a seperate function for this?)
        domain_id = (int(ip_destination)&DOMAIN_ID_MASK)>>18
        dst_nid = service_multicast_group_to_node_id(domain_id, ip_destination)
  1. tests/transport/udp/ip/v4.py:53/81:
msg_i = fac.make_input_socket(None, MessageDataSpecifier(612))
test_msg_o.sendto(b"Seagull", ("239.52.2.100", SUBJECT_PORT))
time.sleep(1) ##QUESTION: BlockingIOError: [Errno 35] Resource temporarily unavailable
rx = msg_i.recvfrom(1024)
assert rx[0] == b"Seagull"
assert rx[1][0] == "127.0.0.1"  # Same address we just bound to.

This extra sleep(1) wasn't necessary before, however after some changes it started failing with the above error. Any remarks on this?

pavel-kirienko commented 1 year ago

This seems to suggest that some significant overhaul/simplification can be done here. Can you give some pointers?

Yes, indeed, as you have correctly guessed, it should be possible to get rid of the SocketReader or at least simplify it. There may be an arbitrary number of sockets connected to the same multicast endpoint and the OS should perform demultiplexing correctly for you. Maybe for now, in the interest of minimizing the scope of this changeset, we should keep the socket reader in place and consider removing it later.

QUESTION: Correct to use DOMAIN_ID_MASK here? (or make a seperate function for this?)

I am not sure what are you going to use the domain-ID here for? It doesn't seem to be needed to parse the frame unless I am missing something.

This extra sleep(1) wasn't necessary before, however after some changes it started failing with the above error. Any remarks on this?

The kernel works in mysterious ways, what else can I say? Since this is just a test, it is fine to simply keep the sleep in place.

pavel-kirienko commented 1 year ago

There is some discussion on the forum that you should be aware of: https://forum.opencyphal.org/t/cyphal-udp-architectural-issues-caused-by-the-dependency-between-the-nodes-ip-address-and-its-identity/1765/41

There's nothing major but it seems like we'll have to shuffle some bits around the header and the IP address, start using one common UDP port number for all traffic and discriminate services based on a dedicated service-ID field in the header instead of UDP ports, and also possibly add a header checksum. All of these changes seem quite minor in comparison to what you've already implemented here.

maksimdrachov commented 1 year ago

@pavel-kirienko

First I re-wrote pycyphal/transport/udp/_socket_reader.py like this:

def _dispatch_frame(
    self, timestamp: Timestamp, source_ip_address: _IPAddress, frame: typing.Optional[UDPFrame]
) -> None:
    # Do not accept datagrams emitted by the local node itself. Do not update the statistics either.
    external = self._anonymous or (source_ip_address != self._local_ip_address)
    if not external:
        return

    # Process the datagram. This is where the actual demultiplexing takes place.
    # The node-ID mapper will return None for datagrams coming from outside of our Cyphal subnet.
    handled = False
    source_node_id = None
    if frame is not None:
        # if source_ip_address is part of our Cyphal subnet
        if (DOMAIN_ID_MASK & int(source_ip_address)) == (DOMAIN_ID_MASK & int(self._local_ip_address)):
            source_node_id = frame.source_node_id
        # if source_ip_address is not part of our Cyphal subnet, source_node_id is None
        else:
            source_node_id = None

Now I'm starting to suspect that this is not how it's meant to be. Instead it should be:

def __init__(
    self,
    sock: socket.socket,
    local_ip_address: _IPAddress,
    domain_id: int,
    anonymous: bool,
    statistics: SocketReaderStatistics,
):
  self._domain_id = domain_id
def _dispatch_frame(
    self, timestamp: Timestamp, source_ip_address: _IPAddress, frame: typing.Optional[UDPFrame]
) -> None:
    # Do not accept datagrams emitted by the local node itself. Do not update the statistics either.
    external = self._anonymous or (source_ip_address != self._local_ip_address)
    if not external:
        return

    # Process the datagram. This is where the actual demultiplexing takes place.
    # The node-ID mapper will return None for datagrams coming from outside of our Cyphal subnet.
    handled = False
    source_node_id = None
    if frame is not None:
        # if source_ip_address is part of our Cyphal subnet
        if self._domain_id == (DOMAIN_ID_MASK & int(source_ip_address)):
            source_node_id = frame.source_node_id
        # if source_ip_address is not part of our Cyphal subnet, source_node_id is None
        else:
            source_node_id = None

I'm not sure if source_ip_address is the multicast address here?

Note to self: replace subnet with domain_id, to avoid further confusion.

pavel-kirienko commented 1 year ago

Now I'm starting to suspect that this is not how it's meant to be

You are correct in suspecting this!

I'm not sure if source_ip_address is the multicast address here?

In this new design, unicast IP addresses are no longer relevant at all. Any node can operate on any domain-ID with any node-ID regardless of its identity on the IP layer. Parameters like the source_ip_address should no longer be used anywhere except the socket factory (where it is needed only to initialize the socket correctly, this is done once per socket). The socket reader now only needs to read the datagram and pass it along regardless of the source address or the local IP address.

maksimdrachov commented 1 year ago

@pavel-kirienko

Can you check this _socket_reader implementation?

Main changes:

New

accepted_datagrams: typing.Dict[typing.Optional[int], int] = dataclasses.field(default_factory=dict) dropped_datagrams: typing.Dict[typing.Optional[int], int] = dataclasses.field(default_factory=dict)

Keys:

None: Invalid node-ID (for dropped_datagrams)

None: anonymous frame (for accepted_datagrams)

Int: node-ID



Concerning the keys: use `None` for anonymous frames? (currently `0xffff` but easy fix)

(Ignore the `QUESTION`s in the code.)

Unit tests `_unittest_socket_reader`and `_unittest_socket_reader_endpoint_reuse` are running successfully, but need some additional changes to better reflect the new class structure.
pavel-kirienko commented 1 year ago

I suspect that this updated socket reader is going to incite false node-ID collision warnings from the application layer because if a node both publishes and subscribes to some subject, it will receive its own publications. There should be some check in place that would discard incoming traffic where the source node-ID equals the local node-ID, I think.

Concerning the keys: use 0xffff for anonymous frames or None?

None. We need to keep in mind the difference between the high-layer concept (which is the lack of a node-ID) and its manifestation on the wire (which is 0xFFFF).

maksimdrachov commented 1 year ago

@pavel-kirienko

I'm having this issue in _unittest_udp_transport_ipv4:

I'm trying to send a service request from client_requester to server_listener.

At first I defined server_listener as follows:

server_listener = tr.get_input_session(
    InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), None), meta # remote_node_id is None
)

This makes sense from the perspective that a service input session needs to accept calls from all nodes.

However, if the test fails because there is no subscription to the multicast address 239.53.0.111, therefore client_requester is not receiving any of the sent frames.

# from pycyphal/transport/udp/_ip/_v4.py:
elif isinstance(data_specifier, ServiceDataSpecifier):
    multicast_ip = service_data_specifier_to_multicast_group(self._domain_id, remote_node_id)
    multicast_port = service_data_specifier_to_udp_port(data_specifier)
    if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):

Then I though, maybe it makes sense for server_listener to be defined as follows:

server_listener = tr.get_input_session(
    InputSessionSpecifier(ServiceDataSpecifier(444, ServiceDataSpecifier.Role.REQUEST), 111), meta # remote_node_id is 111
)

111 is its own node_id, so it is basically subscribing itself to the multicast address where the frames are sent.

Then it does receive the frames, however they are discarded since the source_node_id (222 in this case) does not match the subscribed node_id 111:

# from pycyphal/transport/udp/_socket_reader.py
# Handle non-anonymous frames
if source_node_id is not None:
    # Each frame is sent to the promiscuous listener (None) and to the selective listener (source_node_id).
    # We parse the frame before invoking the listener in order to avoid the double parsing workload.
    for key in (None, source_node_id):
        try:
            callback = self._listeners[key]
        except LookupError:
            pass
        else:
            handled = True
            try:
                callback(timestamp, frame)
            except Exception as ex:  # pragma: no cover
                _logger.exception("%r: Unhandled exception in the listener for node-ID %r: %s", self, key, ex)

I'm prone to thinking the first approach is the right one. However, I'm not sure how to setup this promiscuous multicast address correctly, so it receives all the frames that are sent.

The relevant part of _v4.py:

if isinstance(data_specifier, MessageDataSpecifier):
    multicast_ip = message_data_specifier_to_multicast_group(self._domain_id, data_specifier)
    multicast_port = SUBJECT_PORT
    if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
        # Binding to the multicast group address is necessary on GNU/Linux: https://habr.com/ru/post/141021/
        s.bind((str(multicast_ip), multicast_port))
    else:
        # Binding to a multicast address is not allowed on Windows, and it is not necessary there. Error is:
        #   OSError: [WinError 10049] The requested address is not valid in its context
        s.bind(("", multicast_port))
    try:
        # Note that using INADDR_ANY in IP_ADD_MEMBERSHIP doesn't actually mean "any",
        # it means "choose one automatically"; see https://tldp.org/HOWTO/Multicast-HOWTO-6.html
        # This is why we have to specify the interface explicitly here.
        s.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, multicast_ip.packed + self._local_ip_addr.packed
        )
    except OSError as ex:
        s.close()
        if ex.errno in (errno.EADDRNOTAVAIL, errno.ENODEV):
            raise InvalidMediaConfigurationError(
                f"Could not register multicast group membership {multicast_ip} via {self._local_ip_addr} using {s} "
                f"[{errno.errorcode[ex.errno]}]"
            ) from None
        raise  # pragma: no cover
elif isinstance(data_specifier, ServiceDataSpecifier):
    multicast_ip = service_data_specifier_to_multicast_group(self._domain_id, remote_node_id)
    multicast_port = service_data_specifier_to_udp_port(data_specifier)
    if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
        s.bind((str(multicast_ip), multicast_port))
    else:
        s.bind(("", multicast_port))
    try:
        s.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, multicast_ip.packed + self._local_ip_addr.packed
        )
    except OSError as ex:
        s.close()
        if ex.errno in (errno.EADDRNOTAVAIL, errno.ENODEV):
            raise InvalidMediaConfigurationError(
                f"Could not register multicast group membership {multicast_ip} via {self._local_ip_addr} using {s} "
                f"[{errno.errorcode[ex.errno]}]"
            ) from None
        raise  # pragma: no cover
pavel-kirienko commented 1 year ago

@maksimdrachov

I'm prone to thinking the first approach is the right one.

This is correct.

However, I'm not sure how to setup this promiscuous multicast address correctly, so it receives all the frames that are sent.

The socket factory is not involved in the distinction between selective and promiscuous sessions: regardless of the mode, the socket would be configured identically. The distinction matters here where the socket reader is configured:

https://github.com/OpenCyphal/pycyphal/blob/8273967696d9c250264a252399681ae8d725b962/pycyphal/transport/udp/_udp.py#L340

Observe the logic: when you request a new input session, the UDP transport checks if there's a socket available for this endpoint already (perhaps in a different mode; e.g. there could be a selective session while you want to set up a new promiscuous session with the same data specifier). If no such endpoint is found, a new socket reader is created, otherwise the existing socket reader is used to add a new listener to it.

What you described suggests that the socket factory is operating properly but the socket reader is, apparently, somehow mishandling promiscuous listeners. I would check that.

pavel-kirienko commented 1 year ago

Looking at the latest proposals from @thirtytwobits I am actually thinking if it would be easier to modify the input logic such that there is always exactly one socket dedicated to incoming service transfers (unless the transport is configured in the anonymous mode, in which case it wouldn't be possible to receive service transfers anyway so no socket is needed). Then you add some kind of basic dispatcher that receives frames from that socket and forwards them to the appropriate listener, much like the SocketReader currently does but the listener would be identified based not only on the remote address but also on the service-ID.

Next, considering that selective listeners aren't really used with subjects --- all subjects are promiscuous (there may be some support for selective subject listeners but this is not really part of the spec so not required) --- we can apply the next obvious simplification which is to remove the SocketReader completely and simply keep one socket per UDPInputSession of a subject (we may need a new specialization for that, like UDPSubjectInputSession, because the existing subclasses do own sockets).

image

All of this amounts to a somewhat deeper refactoring than I would initially be comfortable with but if we accept Scott's suggestions (so far there seems to be no reason not to), then we'd have to do it anyway. If you're available, we could meet next week face2face and discuss this. I'm hoping to bring the discussion around Scott's proposals to some sort of tentative conclusion by then.

maksimdrachov commented 1 year ago

@pavel-kirienko

output

Call-flow:

# _udp.py
def get_output_session(
    self, specifier: pycyphal.transport.OutputSessionSpecifier, payload_metadata: pycyphal.transport.PayloadMetadata
) -> UDPOutputSession

# |
# v

# _udp.py
sock = self._sock_factory.make_output_socket(specifier.remote_node_id, specifier.data_specifier)

# |
# v

# _ip/_v4.py
def make_output_socket(
    self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier
) -> socket.socket

For Message, remote_node_id is always None. The socket is connected to the following multicast address:

remote_ip = message_data_specifier_to_multicast_group(self._subnet_id, data_specifier)
remote_port = SUBJECT_PORT

For Service, remote_node_id can't be None. (A service request always needs to be directed towards a certain remote_node_id). The multicast address is determined as follows:

remote_ip = service_data_specifier_to_multicast_group(self._subnet_id, remote_node_id)
remote_port = service_data_specifier_to_udp_port(data_specifier)

All output sockets are stored in:

# _udp.py
self._output_registry: typing.Dict[pycyphal.transport.OutputSessionSpecifier, UDPOutputSession] = {}

(Each UDPOutputSession has it's own socket, sending is done through UDPOutputSession.send())

Removal of SocketReader does not affect this part of the implementation.

input

Call-flow (current implementation):

# _udp.py
def get_input_session(
    self, specifier: pycyphal.transport.InputSessionSpecifier, payload_metadata: pycyphal.transport.PayloadMetadata
) -> UDPInputSession:

# |
# v

# _udp.py
def _setup_input_session(
    self, specifier: pycyphal.transport.InputSessionSpecifier, payload_metadata: pycyphal.transport.PayloadMetadata
) -> None:

# |
# v

# _udp.py
self._socket_reader_registry[specifier.data_specifier] = SocketReader(
    sock=self._sock_factory.make_input_socket(specifier.remote_node_id, specifier.data_specifier),
    local_node_id=self._local_node_id,
    statistics=self._statistics.received_datagrams.setdefault(
        specifier.data_specifier, SocketReaderStatistics()
    ),
)

# |
# v

# _ip/_v4.py
def make_input_socket(
    self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier
) -> socket.socket:

Call-flow (ported)

# _udp.py
def get_input_session(
    self, specifier: pycyphal.transport.InputSessionSpecifier, payload_metadata: pycyphal.transport.PayloadMetadata
) -> UDPInputSession:

# |
# v

# _udp.py
sock = self._sock_factory.make_input_socket(specifier.remote_node_id, specifier.data_specifier)

# |
# v

# _ip/_v4.py
def make_input_socket(
    self, remote_node_id: typing.Optional[int], data_specifier: pycyphal.transport.DataSpecifier
) -> socket.socket:
    # adds listeners for relevant subject-ID/service-ID

For Message, remote_node_id is always None. The socket is bound to the following multicast address:

multicast_ip = message_data_specifier_to_multicast_group(self._subnet_id, data_specifier)
multicast_port = SUBJECT_PORT

For Service, remote_node_id is always None. The socket is bound to the following multicast address:

multicast_ip = service_data_specifier_to_multicast_group(self._subnet_id, remote_node_id)
multicast_port = service_data_specifier_to_udp_port(data_specifier)

QUESTION 1: I guess remote_node_id doesn't make a lot of sense in the input case, since both Message and Service are assumed to be receiving from all possible sources?

All input sockets are stored in:

# _udp.py
self._input_registry: typing.Dict[pycyphal.transport.InputSessionSpecifier, UDPInputSession] = {}

(Each UDPInputSession has it's own socket, receiving is done through UDPInputSession.receive())

conclusion

Here some changes that need to be made due to removal of SocketReader:

Multicast addresses are:

z = subject-ID (Message)

Multicast address     |  Message, Output     Message, Input      Service, Output     Service, Input
---------------------------------------------------------------------------------------------------
remote_node_id = y    |  Not supported       Not supported       239.x.x.y           Not supported
                      |  
remote_node_id = None |  239.x.x.z           239.x.x.z           Not supported       239.x.255.255
                      |

Two types of UDPInputSession:

QUESTION 2: No use of SelectiveUDPInputSession? I guess "Not supported" of "Message, Input" is SelectiveUDPInputSession? QUESTION 3: What with SocketReaderStatistics? Not used in tests/_udp.py so not strictly necessary?

Regarding's Scott's changes, I've added to TODO, will implement after this Service part is done.

pavel-kirienko commented 1 year ago

For Service, remote_node_id can't be None. (A service request always needs to be directed towards a certain remote_node_id). The multicast address is determined as follows:

remote_ip = service_data_specifier_to_multicast_group(self._subnet_id, remote_node_id)
remote_port = service_data_specifier_to_udp_port(data_specifier)

This is generally correct but the remote port is now (per Scott's proposal) fixed at 9382 (not sure how the number was chosen but there is probably no difference). It is simpler than it used to be.

For Service, remote_node_id is always None. The socket is bound to the following multicast address: ... QUESTION 1: I guess remote_node_id doesn't make a lot of sense in the input case, since both Message and Service are assumed to be receiving from all possible sources?

This is incorrect because service listeners can be promiscuous (accept service transfers from any remote node) or selective (accept service transfers from a specific remote node); the remote_node_id is None in the former case and a valid integer in the latter.

The promiscuous kind is used with service servers (see pycyphal.presentation.Server) because a server needs to be able to receive a request from any client on the network.

The selective kind is used with service clients (see pycyphal.presentation.Client) because client instances are instantiated for a specific remote endpoint that provides the service. See Presentation.make_client() in the linked docs. The objective of this design is to provide sufficient context to the transport layer implementation to allow the transport to perform required packet filtering at the low level instead of requiring the application to do this at the high level manually.

The above also applies to QUESTION 2.

Each UDPInputSession has it's own socket, receiving is done through UDPInputSession.receive()

Good. I think we should drop a few lines in the comments explaining that the UDP demultiplexing problem does not apply in this case because it is only manifested with point-to-point connections, not multicast. I am referring to this piece:

https://github.com/OpenCyphal/pycyphal/blob/8273967696d9c250264a252399681ae8d725b962/pycyphal/transport/udp/_socket_reader.py#L58-L61


I see that you decided to keep one input socket per session. I suppose this is also fine, although some inefficiencies due to data duplication in the kernel might arise compared to the approach I suggested originally, where there would be one input socket responsible for all incoming service transfers, which are then dispatched to the correct UDPInputSession based on the service-ID and the remote node-ID. Perhaps your approach is simpler and the performance issues would never arise --- we'll see. Let's proceed as you implemented it and see if it requires changes in the future.

All SocketReader's functionality, is also moved into UDPInputSession:

If there's a dedicated socket per session, why do we need to add/remove listeners dynamically? I assume the lifetime of a socket matches that of the session that owns it, so there should be no need for dynamic listener configuration. Am I missing something here?

QUESTION 3: What with SocketReaderStatistics? Not used in tests/_udp.py so not strictly necessary?

We can safely drop this for now. This is needed only for high-level diagnostics and it does not directly affect the functionality of the library. For instance, yakut subscribe, yakut call, etc. emit this information at exit.

pavel-kirienko commented 1 year ago

@maksimdrachov, please either rebase on master or merge it into your branch. There have been important changes related to the CI.

maksimdrachov commented 1 year ago

@pavel-kirienko

Can you check udp/_ip? It's changed according to Scott's post.

I added to some unit tests here:

maksimdrachov commented 1 year ago

@pavel-kirienko

Re-iterating:

UDPInputSession       |  Message                        Service
----------------------------------------------------------------------------------
remote_node_id = int  |  Not supported                  SelectiveUDPInputSession
                      |  
remote_node_id = None |  PromiscuousUDPInputSession     PromiscuousUDPInputSession
                      |

To make it a bit more clear for myself, this is how I see that UDPInputSession needs to look (mirroring with UDPOutputSession):

_input.py:

_output.py:

Is this correct?


Some smaller questions/remarks:

udp/_ip/_v4.py:

# make_output_socket
elif isinstance(data_specifier, ServiceDataSpecifier):
    s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, self._local_ip_addr.packed)
    s.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, IPv4SocketFactory.MULTICAST_TTL)
    remote_ip = service_node_id_to_multicast_group(remote_node_id)
    remote_port = DESTINATION_PORT

# make_input_socket
elif isinstance(data_specifier, ServiceDataSpecifier):
    multicast_ip = service_node_id_to_multicast_group(remote_node_id)
    multicast_port = DESTINATION_PORT
    if sys.platform.startswith("linux") or sys.platform.startswith("darwin"):
        s.bind((str(multicast_ip), multicast_port))
    else:
        s.bind(("", multicast_port))
    try:
        s.setsockopt(
            socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, multicast_ip.packed + self._local_ip_addr.packed
        )

Add a check if ServiceDataSpecifier is Request (for make_input_socket) or Response (for make_output_socket)?


In udp/_ip/_endpoint_mapping.py, we define:

SUBJECT_ID_MASK = 2**15 - 1

however:

# transport/_data_specifier.py
@dataclasses.dataclass(frozen=True)
class MessageDataSpecifier(DataSpecifier):
    SUBJECT_ID_MASK = 2**13 - 1

    subject_id: int

    def __post_init__(self) -> None:
        if not (0 <= self.subject_id <= self.SUBJECT_ID_MASK):
            raise ValueError(f"Invalid subject-ID: {self.subject_id}")

Any comment on this apparent discrepancy?

pavel-kirienko commented 1 year ago

Is this correct?

Seems so, at least on the high level. I presume the next step would be to drop the no longer needed SocketReader, right?

Add a check if ServiceDataSpecifier is Request (for make_input_socket) or Response (for make_output_socket)?

I don't see what for. The request/response distinction does not seem to be manifested at the socket level, does it?

Any comment on this apparent discrepancy?

It is simply that Scott's multicast group address format reserves more bits for the subject-ID than necessary; that is all. You can see the same discrepancy in the Cyphal/serial header format. Although it seems inconsistent, ultimately, it does not matter, and it enables separate parts of the protocol to evolve independently.

maksimdrachov commented 1 year ago

@pavel-kirienko

I presume the next step would be to drop the no longer needed SocketReader, right?

Sure! (Currently leaving it in for reference)

I don't see what for. The request/response distinction does not seem to be manifested at the socket level, does it?

I was thinking maybe it would be good to warn the user if he tries to instantiate an input socket for a Response (which wouldn't make sense?). I guess it doesn't really matter for the actual socket.

It is simply that Scott's multicast group address format reserves more bits for the subject-ID than necessary; that is all.

Ok, I though maybe this was something overlooked in Scott's proposal.


I've updated the _output_session unit test, there's this small inconsistency I don't get:

assert (
    memoryview(b"\x01\x07\x06\x00\xae\x08A\xc11\xd4\x00\x00\x00\x00\x00\x00\x01\x00\x00\x80\x00\x00\x03<"),
    memoryview(b"e"),
) == UDPFrame(
    priority=Priority.OPTIONAL,
    source_node_id=6,
    destination_node_id=2222,
    snm=True,
    subject_id=None,
    service_id=321,
    rnr=True,
    transfer_id=54321,
    index=1,
    end_of_transfer=True,
    user_data=0,
    payload=memoryview(b"e"),
).compile_header_and_payload()
assert data_main_b == (
    b"\x01\x07\x06\x00\xae\x08A\xc11\xd4\x00\x00\x00\x00\x00\x00\x01\x00\x00\x80\x00\x00V\x03"
    + b"e"
    + pycyphal.transport.commons.crc.CRC32C.new(b"one", b"two", b"three").value_as_bytes
)

Both check out correctly, however the bytes are not exactly the same (at the end the V and <).

What am I missing? The content is the same, but the header crc is somehow different?

pavel-kirienko commented 1 year ago

I was thinking maybe it would be good to warn the user if he tries to instantiate an input socket for a Response (which wouldn't make sense?). I guess it doesn't really matter for the actual socket.

Maybe a check would make sense but then I urge you to think whether you are violating the principle of separation of concerns here. The job of the socket factory is to make sockets, not to check the protocol semantics.

What am I missing?

Here you seem to be computing the header CRC from the payload, whereas it should be computed from the header itself (as the name suggests):

https://github.com/maksimdrachov/pycyphal/blob/572deedd6359d41088e5fc1f9115210bed75c0de/pycyphal/transport/udp/_frame.py#L124

First you compile all fields except for the header into one binary blob, then compute its CRC and add it at the end.

There are two other major issues:

  1. Adding new dependencies is not permitted. Please drop the dependency on the CRC library and use this instead: https://pycyphal.readthedocs.io/en/stable/api/pycyphal.transport.commons.crc.html#pycyphal.transport.commons.crc.CRC16CCITT

  2. Mutual exclusion within a set of values should be expressed through the type system explicitly. This is very important for building self-explanatory and safe APIs. This means that this is not a good design:

    snm: bool  # Service, Not Message
    subject_id: int | None
    service_id: int | None
    rnr: bool | None  # Request, Not Response

While this is a solid alternative:

https://github.com/OpenCyphal/pycyphal/blob/75883ec57981ec9c3d659d471a7a2bad9c7f8c4c/pycyphal/transport/serial/_frame.py#L44

The definition of the DataSpecifier is here:

https://github.com/OpenCyphal/pycyphal/blob/75883ec57981ec9c3d659d471a7a2bad9c7f8c4c/pycyphal/transport/_data_specifier.py#L10-L51

maksimdrachov commented 1 year ago

@pavel-kirienko

Could you check the current implementation of pycyphal/transport/udp/_frame.py?

I think it should be correct now.

Next up: input session (tomorrow)

maksimdrachov commented 1 year ago

A rough update on the current status:

Even though it is "working", still some things remain to be done:

maksimdrachov commented 1 year ago

_input.py:

class UDPInputSessionStatistics(pycyphal.transport.SessionStatistics):
    pass

@dataclasses.dataclass
class PromiscuousUDPInputSessionStatistics(UDPInputSessionStatistics):
    reassembly_errors_per_source_node_id: typing.Dict[
        int, typing.Dict[TransferReassembler.Error, int]
    ] = dataclasses.field(default_factory=dict)
    """
    Keys are source node-IDs; values are dicts where keys are error enum members and values are counts.
    """

@dataclasses.dataclass
class SelectiveUDPInputSessionStatistics(UDPInputSessionStatistics):
    reassembly_errors: typing.Dict[TransferReassembler.Error, int] = dataclasses.field(default_factory=dict)
    """
    Keys are error enum members and values are counts.
    """

_output.py:

# this is a new class!
class UDPOutputSessionStatistics(pycyphal.transport.SessionStatistics):
    sent_frames_per_destination_node_id: typing.Dict[int, int] = dataclasses.field(default_factory=dict)
    frames_timeout_per_destination_node_id: typing.Dict[int, int] = dataclasses.field(default_factory=dict)
    """
    Keys are destination node-IDs; values are number of frames sent.
    """

_udp.py:

class UDPTransportStatistics(pycyphal.transport.TransportStatistics):
    input_sessions: typing.Dict[
        int, typing.Union[PromiscuousUDPInputSessionStatistics, SelectiveUDPInputSessionStatistics]
    ] = dataclasses.field(default_factory=dict)
    """
    Keys are source node-IDs; values are session statistics.
    """
    output_sessions: typing.Dict[int, UDPOutputSessionStatistics] = dataclasses.field(default_factory=dict)
    """
    Keys are destination node-IDs; values are session statistics.
    """

or:

@dataclasses.dataclass
class UDPTransportStatistics(pycyphal.transport.TransportStatistics):
    received_datagrams: typing.Dict[pycyphal.transport.DataSpecifier, typing.Dict[int, int]] = dataclasses.field(
        default_factory=dict
    )
    """
    Basic input session statistics: keyed by data specifier, values are dicts keyed by source node ID
    (values of second dict are number of datagrams received)
    """
    sent_datagrams: typing.Dict[pycyphal.transport.DataSpecifier, typing.Dict[int, int]] = dataclasses.field(
        default_factory=dict
    )
    """
    Basic output session statistics: keyed by data specifier, values are dicts keyed by destination node ID
    (values of second dict are number of datagrams sent)
    """

Which one of UDPTransportStatistics do you prefer?

maksimdrachov commented 1 year ago

image

pavel-kirienko commented 1 year ago
diff --git a/pycyphal/transport/udp/_session/_input.py b/pycyphal/transport/udp/_session/_input.py
index 80c4053..e7733a0 100644
--- a/pycyphal/transport/udp/_session/_input.py
+++ b/pycyphal/transport/udp/_session/_input.py
@@ -11,6 +11,7 @@ import typing
 import select
 import asyncio
 import logging
+import threading
 import dataclasses
 import pycyphal
 from pycyphal.transport import Timestamp
@@ -86,7 +87,11 @@ class UDPInputSession(pycyphal.transport.InputSession):
         assert isinstance(self._payload_metadata, pycyphal.transport.PayloadMetadata)
         assert callable(self._finalizer)
         self._transfer_id_timeout = self.DEFAULT_TRANSFER_ID_TIMEOUT
-        self._queue: asyncio.Queue[pycyphal.transport.TransferFrom] = asyncio.Queue()
+        self._frame_queue: asyncio.Queue[typing.Tuple[Timestamp, UDPFrame | None]] = asyncio.Queue()
+        self._thread = threading.Thread(target=self._reader_thread, name=str(self),
+                                        args=(asyncio.get_running_loop(),),
+                                        daemon=True)
+        self._thread.start()

     async def receive(self, monotonic_deadline: float) -> typing.Optional[pycyphal.transport.TransferFrom]:
         """
@@ -98,62 +103,56 @@ class UDPInputSession(pycyphal.transport.InputSession):
         """
         if self._closed:
             raise pycyphal.transport.ResourceClosedError(f"{self} is closed")
-
-        transfer_ready = False
-
-        consume_success = await self._consume(monotonic_deadline=monotonic_deadline)
-        while consume_success:  # if something was consumed, try to consume more
-            transfer_ready = True
-            consume_success = await self._consume(monotonic_deadline=monotonic_deadline)
-
-        if transfer_ready:
-            _logger.debug("%s: Consumed a datagram", self)
-            loop = asyncio.get_running_loop()
+        loop = asyncio.get_running_loop()
+        while True:
+            timeout = monotonic_deadline - loop.time()
             try:
-                timeout = monotonic_deadline - loop.time()
                 if timeout > 0:
-                    transfer = await asyncio.wait_for(self._queue.get(), timeout)
+                    ts, frame = await asyncio.wait_for(self._frame_queue.get(), timeout=timeout)
                 else:
-                    transfer = self._queue.get_nowait()
+                    ts, frame = self._frame_queue.get_nowait()
             except (asyncio.TimeoutError, asyncio.QueueEmpty):
                 # If there are unprocessed transfers, allow the caller to read them even if the instance is closed.
                 if self._finalizer is None:
                     raise pycyphal.transport.ResourceClosedError(f"{self} is closed") from None
                 return None
+            if frame is None:
+                self._statistics.errors += 1
+                continue
+            if not self.specifier.is_promiscuous:
+                if frame.source_node_id != self._specifier.remote_node_id:
+                    continue
+            self._statistics.frames += 1
+            source_node_id = frame.source_node_id
+            assert isinstance(source_node_id,
+                              int) and 0 <= source_node_id <= NODE_ID_MASK, "Internal protocol violation"
+            # Anonymous - no reconstruction needed
+            if source_node_id == NODE_ID_MASK:
+                transfer = TransferReassembler.construct_anonymous_transfer(ts, frame)
+            # # Single-frame transfer - no reconstruction needed # TODO FIXME
+            # elif frame.single_frame_transfer:
+            #     transfer = TransferReassembler.construct_uniframe_transfer(timestamp, frame)
+            # Otherwise, fetch the reassembler and process the frame
             else:
-                assert isinstance(transfer, pycyphal.transport.TransferFrom), "Internal protocol violation"
-                assert (
-                    transfer.source_node_id == self._specifier.remote_node_id or self._specifier.remote_node_id is None
-                )
+                _logger.debug("%s: Processing frame: %s", self, frame)
+                transfer = self._get_reassembler(source_node_id).process_frame(ts, frame, self._transfer_id_timeout)
+            if transfer is not None:
+                self._statistics.transfers += 1
+                self._statistics.payload_bytes += sum(map(len, transfer.fragmented_payload))
+                _logger.debug("%s: Received transfer: %s; current stats: %s", self, transfer, self._statistics)
                 return transfer
-        else:  # No datagrams were consumed
-            return None
-
-    async def _consume(self, monotonic_deadline: float) -> bool:
-        """
-        This method is used to consume the datagrams from the socket.
-        The method will block until a datagram is available or the deadline is reached.
-        If a datagram is read, it will call the underlying ``_process_frame()`` method
-        of the input session (PromiscuousInputSession or SelectiveInputSession).
-        If no datagram is read and the deadline is reached, the method will return ``False``.
-        """
-        loop = asyncio.get_running_loop()

-        while self._sock.fileno() >= 0:
+    def _reader_thread(self, loop: asyncio.AbstractEventLoop) -> None:
+        while not self._closed and self._sock.fileno() >= 0:
             try:
-                _logger.debug("monotonic_deadline-loop.time()=%s", monotonic_deadline - loop.time())
-                read_ready, _, _ = select.select([self._sock], [], [], monotonic_deadline - loop.time())
+                read_ready, _, _ = select.select([self._sock], [], [], 1.0)
                 if self._sock in read_ready:
                     # TODO: use socket timestamping when running on GNU/Linux (Windows does not support timestamping).
                     ts = pycyphal.transport.Timestamp.now()
-
                     # Notice that we MUST create a new buffer for each received datagram to avoid race conditions.
                     # Buffer memory cannot be shared because the rest of the stack is completely zero-copy;
                     # meaning that the data we allocate here, at the very bottom of the protocol stack,
                     # is likely to be carried all the way up to the application layer without being copied.
-                    # await asyncio.wait_for(
-                    #     loop.sock_recv_into(self._sock, _READ_SIZE), timeout=monotonic_deadline - loop.time()
-                    # )
                     data, endpoint = self._sock.recvfrom(_READ_SIZE)
                     assert len(data) < _READ_SIZE, "Datagram might have been truncated"
                     frame = UDPFrame.parse(memoryview(data))
@@ -164,49 +163,10 @@ class UDPInputSession(pycyphal.transport.InputSession):
                         endpoint,
                         frame,
                     )
-                    self._process_frame(ts, frame)
-                    return True
-                else:
-                    return False  # there was no datagram to read
-            except (asyncio.TimeoutError):
-                return False
+                    # TODO Handle QueueFull
+                    loop.call_soon_threadsafe(lambda: self._frame_queue.put_nowait((ts, frame)))
             except Exception as ex:
                 _logger.exception("%s: Exception while consuming UDP frames: %s", self, ex)
-                time.sleep(1)
-                return False
-
-    def _process_frame(self, timestamp: pycyphal.transport.Timestamp, frame: UDPFrame) -> None:
-
-        if frame is None:
-            self._statistics.errors += 1
-            return
-        if not self.specifier.is_promiscuous:
-            if frame.source_node_id != self._specifier.remote_node_id:
-                return
-        self._statistics.frames += 1
-
-        source_node_id = frame.source_node_id
-        assert isinstance(source_node_id, int) and 0 <= source_node_id <= NODE_ID_MASK, "Internal protocol violation"
-
-        # Anonymous - no reconstruction needed
-        if source_node_id == NODE_ID_MASK:
-            transfer = TransferReassembler.construct_anonymous_transfer(timestamp, frame)
-        # # Single-frame transfer - no reconstruction needed
-        # elif frame.single_frame_transfer:
-        #     transfer = TransferReassembler.construct_uniframe_transfer(timestamp, frame)
-        # Otherwise, fetch the reassembler and process the frame
-        else:
-            _logger.debug("%s: Processing frame: %s", self, frame)
-            transfer = self._get_reassembler(source_node_id).process_frame(timestamp, frame, self._transfer_id_timeout)
-
-        if transfer is not None:
-            self._statistics.transfers += 1
-            self._statistics.payload_bytes += sum(map(len, transfer.fragmented_payload))
-            _logger.debug("%s: Received transfer: %s; current stats: %s", self, transfer, self._statistics)
-            try:
-                self._queue.put_nowait(transfer)
-            except asyncio.QueueFull:
-                self._statistics.drops += len(transfer.fragmented_payload)  # queue_overflows

     @property
     def transfer_id_timeout(self) -> float:
@@ -228,9 +188,13 @@ class UDPInputSession(pycyphal.transport.InputSession):
         return self._payload_metadata

     def close(self) -> None:
+        self._closed = True
         if self._finalizer is not None:
             self._finalizer()
             self._finalizer = None
+        # TODO See https://github.com/OpenCyphal/pycyphal/blob/6a432b10c4307b006067fd0bdc867bc17790fa67/pycyphal/transport/udp/_socket_reader.py#L152-L162
+        # (EXTREMELY IMPORTANT)
+        self._sock.close()

     @property
     def socket(self) -> socket_.socket:
diff --git a/pycyphal/transport/udp/_session/_output.py b/pycyphal/transport/udp/_session/_output.py
index 2ae5734..2540ef0 100644
--- a/pycyphal/transport/udp/_session/_output.py
+++ b/pycyphal/transport/udp/_session/_output.py
@@ -201,8 +201,6 @@ class UDPOutputSession(pycyphal.transport.OutputSession):
                     loop.sock_sendall(self._sock, b"".join((header, payload))),
                     timeout=monotonic_deadline - loop.time(),
                 )
-                _logger.debug("sent completed")
-                assert False
                 # TODO: use socket timestamping when running on Linux (Windows does not support timestamping).
                 # Depending on the chosen approach, timestamping on Linux may require us to launch a new thread
                 # reading from the socket's error message queue and then matching the returned frames with a
diff --git a/tests/transport/redundant/_redundant.py b/tests/transport/redundant/_redundant.py
index 6aa094c..89374af 100644
--- a/tests/transport/redundant/_redundant.py
+++ b/tests/transport/redundant/_redundant.py
@@ -224,7 +224,7 @@ async def _unittest_redundant_transport(caplog: typing.Any) -> None:
     tr_a.attach_inferior(udp_a)
     # tr_a.attach_inferior(serial_a)

-    # tr_b.attach_inferior(udp_b)
+    tr_b.attach_inferior(udp_b)
     # tr_b.attach_inferior(serial_b)

     # assert tr_a.protocol_parameters == ProtocolParameters(
@@ -247,7 +247,7 @@ async def _unittest_redundant_transport(caplog: typing.Any) -> None:
         Transfer(
             timestamp=Timestamp.now(), priority=Priority.LOW, transfer_id=5, fragmented_payload=[memoryview(b"uio")]
         ),
-        monotonic_deadline=loop.time() + 1.0,
+        monotonic_deadline=loop.time() + 10.0,
     )
     _logger.debug("=================pub_a.send() completed=================")
     rx = await sub_any_b.receive(loop.time() + 1.0)
maksimdrachov commented 1 year ago

Concerning last commit:

I changed reader_thread from asyncio, to a regular thread. Is this ok?

The reason is that some unit tests would fail due to not having having any asyncio.get_running_loop() to call. This could be fixed by doing the following (in UDPInputSession::init()):

# create asyncio EventLoop, if there is none
try:
    self._loop = asyncio.get_running_loop()
except RuntimeError:
    self._loop = asyncio.new_event_loop()

However then still, some unit test would fail due to loop.call_soon_threadsafe(lambda:self._frame_queue.put_nowait((ts, frame))) never getting executed. (The frane would get read from the Socket, but never actually put in the Queue).

This is solved by using self._frame_queue.put_nowait((ts, frame)) instead.

EDIT: Nevermind, getting RuntimeError: Non-thread-safe operation invoked on an event loop other than the current one, which is I suspect caused by this change.


Concering the construct_uniframe, it has been omitted: process_frame is able to handle both uni-, as multiframes.


Currently working: pytest pycyphal pytest tests/transport

pavel-kirienko commented 1 year ago

The reason is that some unit tests would fail due to not having having any asyncio.get_running_loop() to call. EDIT: Nevermind

Yeah. My patch was correct in this respect. The worker thread is supposed to operate on the event loop of the main thread via call_soon_threadsafe, which is passed to the worker as its argument. If you create a new event loop in the worker context and invoke the callback that belongs to the main thread context in it, you are effectively loading up a machine gun and pointing it at your foot with the safety off.

maksimdrachov commented 1 year ago

remaining failing tests:

FAILED tests/application/file.py::_unittest_file - AssertionError: assert (PosixPath('/...ixPath('abc')) == (PosixPath('/...ixPath('abc'))
FAILED tests/transport/udp/ip/link_layer.py::_unittest_encode_decode_ethernet - AttributeError: AF_IRDA
FAILED tests/transport/udp/ip/link_layer.py::_unittest_sniff - IndexError: list index out of range
FAILED tests/transport/udp/ip/v4.py::_unittest_socket_factory - BlockingIOError: [Errno 35] Resource temporarily unavailable
FAILED tests/transport/udp/ip/v4.py::_unittest_sniffer - IndexError: list index out of range
maksimdrachov commented 1 year ago

still todo:

@pavel-kirienko @thirtytwobits Reminder: choose Cyphal port?

maksimdrachov commented 1 year ago

Screenshot 2023-02-19 at 15 50 24

pavel-kirienko commented 1 year ago

Docs preview https://pycyphal--253.org.readthedocs.build/en/253/api/pycyphal.transport.udp.html

pavel-kirienko commented 1 year ago

Regarding PyLint:

maksimdrachov commented 1 year ago
maksimdrachov commented 1 year ago

The problem with Windows remains.

Take the following example code (generated by ChatGPT):

import socket
import struct

MCAST_GRP = "224.1.1.1"
MCAST_PORT = 5007

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(("", MCAST_PORT))

mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)

sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

# Retrieve the membership information for the socket
local_addr, local_port = sock.getsockname()
print(f"Subscribed to multicast group: {local_addr}")

while True:
    print(sock.recv(10240))

Gives following result:

option-1-res

Another suggestion, using getsockopt():

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

sock.bind(("0.0.0.0", 12345))

# join the multicast group
group = "224.0.0.1"
interface = "127.0.0.1"
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(group) + socket.inet_aton(interface))

mcast_addr = sock.getsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF)

print(f"Multicast address: {mcast_addr}")

option-2-res


getsockname() gives 0.0.0.0, because that's how bind() works in Windows. If we try to bind() to the multicast address it doesn't work. getsockopt() also doesn't give desired result. Still need to try on another laptop, maybe due to firewall/security settings.

SchoberMJ commented 1 year ago

From the jit.si OpenCyphal meeting this morning:

@maksimdrachov there might be an issue when trying to bind to all multicast groups

import socket
import struct

MCAST_GRP = "224.1.1.1"
MCAST_PORT = 5007

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((MCAST_GRP, MCAST_PORT))

mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
print(f"socket.inet_aton(MCAST_GRP): {socket.inet_aton(MCAST_GRP)}")
print(f"mreq: {mreq}")

sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

# Retrieve the membership information for the socket
local_addr, local_port = sock.getsockname()
print(f"Subscribed to multicast group: {local_addr}")
print(f"Local Port: {local_port}")

while True:
    print(sock.recv(10240))

This results in

socket.inet_aton(MCAST_GRP): b'\xe0\x01\x01\x01'
mreq: b'\xe0\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
Subscribed to multicast group: 224.1.1.1
Local Port: 5007

Not sure if the intent was to bind to all groups or just the specific multicast group. If you want to bind to all groups there might be some extra steps that need to be taken.

edit: Feel free to ignore my comment if it is actually irrelevant to the problem, just my 2-cents from the meeting.

maksimdrachov commented 1 year ago

Thanks!

The problem is only when using Windows, you can't bind to MCAST_GRP:

image

It works however, if you don't bind:

image

(And it is really only receiving from that specific multicast address, i've checked)

Now, however there doesn't seem to be a straightforward way to find out which multicast group sock is subscribed to (without referring to MCAST_GRP).

maksimdrachov commented 1 year ago

For future reference (code example of this problem):

https://github.com/maksimdrachov/multicast-question

Retrieving membership information through getsockopt doesn't seem to be an option:

https://tldp.org/HOWTO/Multicast-HOWTO-6.html

pavel-kirienko commented 1 year ago
image

This is easy to fix: make UDPTransport accept IPAddress|str; if the argument is a string, use ipaddress.ip_address(str) to construct IPAddress.

Aside from that (and a few related type errors), the only remaining issue is that of the statistics. https://github.com/OpenCyphal/pycyphal/issues/279

pavel-kirienko commented 1 year ago

Please also bump the minor version number and add a new section to the changelog.