python / asyncio

asyncio historical repository
https://docs.python.org/3/library/asyncio.html
1.03k stars 178 forks source link

Can't receive replies to multicast UDP packets #480

Open gpjt opened 7 years ago

gpjt commented 7 years ago

It doesn't appear to be possible to receive replies to multicast UDP messages. Server-side multicast does work, but only with a bit of extra config.

Given the following (working) server code, adapted from the UDP Echo example:

import asyncio
import logging
import socket
import struct

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"

class MulticastServerProtocol:
    """Datagram protocol that acknowledges every packet it receives."""

    def connection_made(self, transport):
        """Keep a reference to the transport so replies can be sent later."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Log the incoming datagram and send an ASCII acknowledgement.

        The reply is sent back to *addr*, the sender of the datagram.
        """
        incoming_msg = 'Received {!r} from {!r}'.format(data, addr)
        print(incoming_msg)

        reply_text = "I received {!r}".format(data)
        reply = reply_text.encode("ascii")

        outgoing_msg = 'Send {!r} to {!r}'.format(reply, addr)
        print(outgoing_msg)
        self.transport.sendto(reply, addr)

# Multicast echo server: join the group, then answer every datagram received.
loop = asyncio.get_event_loop()
loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Address/port reuse must be requested *before* bind() to have any effect.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
sock.bind(('', BROADCAST_PORT))

# Membership request: group address followed by the local interface address.
# '=' selects standard sizes without padding, so the result is exactly
# 8 bytes regardless of the platform's native `long` size and alignment.
group = socket.inet_aton(BROADCAST_ADDR)
mreq = struct.pack('=4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)

# Hand the pre-configured socket to asyncio instead of letting it create one.
listen = loop.create_datagram_endpoint(
    MulticastServerProtocol,
    sock=sock,
)
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
finally:
    # Runs on Ctrl-C as well, so the socket and loop are always released.
    transport.close()
    loop.close()

...the following non-asyncio client code sends a multicast packet and correctly receives the responses:

import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"

# Send one multicast query (taken from argv[1]) and print every reply that
# arrives until no reply has been seen for 3 seconds.
# The context manager closes the socket even if option setup fails.
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
    sock.settimeout(3)
    # A multicast TTL of 1 keeps the datagram on the local network segment.
    ttl = struct.pack('b', 1)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, ttl)

    sock.sendto(
        sys.argv[1].encode("ascii"),
        (BROADCAST_ADDR, BROADCAST_PORT)
    )

    while True:
        try:
            data, server = sock.recvfrom(1024)
        except socket.timeout:
            # Silence for the whole timeout: stop waiting for more replies.
            break
        print("Reply from {}: {!r}".format(server, data))

However, the following code, which I'm pretty sure is the asyncio equivalent, sends out the multicast packet correctly but never receives a response:

import asyncio
import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"

class DiscoveryClientProtocol:
    """Datagram protocol that sends one query on connect and prints replies."""

    def __init__(self, loop):
        # The transport is filled in by connection_made().
        self.transport = None
        self.loop = loop

    def connection_made(self, transport):
        """Set the multicast TTL on the socket and send ``sys.argv[1]``."""
        self.transport = transport
        raw_sock = transport.get_extra_info('socket')
        # NOTE(review): asyncio expects non-blocking sockets, so a non-zero
        # timeout here is questionable; kept to preserve the behaviour.
        raw_sock.settimeout(3)
        raw_sock.setsockopt(
            socket.IPPROTO_IP,
            socket.IP_MULTICAST_TTL,
            struct.pack('b', 1),
        )

        # No destination is passed: the transport's own remote address is used.
        payload = sys.argv[1].encode("ascii")
        transport.sendto(payload)

    def datagram_received(self, data, addr):
        """Print a reply; the socket stays open since more replies may follow."""
        line = "Reply from {}: {!r}".format(addr, data)
        print(line)

    def error_received(self, exc):
        """Report an error delivered by the transport."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Stop the event loop once the transport has been closed."""
        print("Socket closed, stop the event loop")
        self.loop.stop()

# Run the discovery client until its transport is closed or the user interrupts.
loop = asyncio.get_event_loop()
# remote_addr is what this reproducer is exercising: the endpoint is created
# with the multicast group as its remote address.
connect = loop.create_datagram_endpoint(
    lambda: DiscoveryClientProtocol(loop),
    remote_addr=(BROADCAST_ADDR, BROADCAST_PORT),
)
transport, protocol = loop.run_until_complete(connect)
try:
    loop.run_forever()
finally:
    # Also reached on Ctrl-C, so the transport and loop are always released.
    transport.close()
    loop.close()
Martiusweb commented 7 years ago

Hi, I haven't looked in depth, but this:

    sock.settimeout(3)

puts the socket in blocking mode. You must call sock.settimeout(0) or sock.setblocking(False).

gpjt commented 7 years ago

I get the same effect if I remove that line, though :-( Likewise if I replace it with sock.settimeout(0) or with sock.setblocking(False)

jnises commented 7 years ago

Any news on this? I seem to be running into the same issue.

Julius2342 commented 7 years ago

I am experiencing the same problem. What I found is that the asyncio server works if there is a non-asyncio server running in parallel ...

vxgmichel commented 7 years ago

I've been able to reproduce your issue using this code.

The problem is related to asyncio connecting to the broadcast address. Quoting this post:

Now, the main problem is that once you connect() a UDP socket, that effectively establishes a one-to-one relationship, such that you can only send out data to that one address, AND you can only receive data from that one address... So, anything that arrives that is NOT from the address you connect()'d to is dropped... And, of course, nothing that arrives will actually appear to be FROM the broadcast address (either the limited or subnet one)... Instead, it'll be from the actual host IP that really sent it...

The following patch fixes the issue:

diff --git a/asyncio/base_events.py b/asyncio/base_events.py
index 0174375..5b1256e 100644
--- a/asyncio/base_events.py
+++ b/asyncio/base_events.py
@@ -828,7 +828,8 @@ def create_datagram_endpoint(self, protocol_factory,
                     if local_addr:
                         sock.bind(local_address)
                     if remote_addr:
-                        yield from self.sock_connect(sock, remote_address)
+                        if not allow_broadcast:
+                            yield from self.sock_connect(sock, remote_address)
                         r_addr = remote_address
                 except OSError as exc:
                     if sock is not None:
diff --git a/asyncio/selector_events.py b/asyncio/selector_events.py
index ed2b4d7..02082c8 100644
--- a/asyncio/selector_events.py
+++ b/asyncio/selector_events.py
@@ -1044,7 +1044,7 @@ def sendto(self, data, addr=None):
             # Attempt to send it right away first.
             try:
                 if self._address:
-                    self._sock.send(data)
+                    self._sock.sendto(data, self._address)
                 else:
                     self._sock.sendto(data, addr)
                 return

There might be a better way though.

frawau commented 6 years ago

I am not sure of the current status of this discussion, but for the sake of those stumbling upon this thread...

asyncio can handle this. Just create your own socket and pass it to create_datagram_endpoint; asyncio will leave your socket alone. You must also specify the address and port in self.transport.sendto. Here is the client (slightly modified to support IPv6 multicast, in case someone is interested):

import asyncio
import socket
import struct
import sys

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"
#BROADCAST_ADDR = "ff0e::10"

class DiscoveryClientProtocol:
    """Datagram protocol that multicasts one query and prints every reply.

    Works for both IPv4 and IPv6 group addresses; the address family of
    *addr* decides which multicast hop-limit option is set on the socket.
    """

    def __init__(self, loop, addr):
        self.loop = loop
        self.transport = None  # set by connection_made()
        self.addr = addr

    def connection_made(self, transport):
        """Configure the multicast hop limit and send ``sys.argv[1]`` to the group."""
        self.transport = transport
        sock = self.transport.get_extra_info('socket')
        # No timeout is set on the socket: asyncio drives it in non-blocking
        # mode, and transport sockets reject a non-zero timeout with
        # ValueError on Python 3.8+.
        addrinfo = socket.getaddrinfo(self.addr, None)[0]
        if addrinfo[0] == socket.AF_INET:  # IPv4
            ttl = struct.pack('@i', 1)
            sock.setsockopt(socket.IPPROTO_IP,
                            socket.IP_MULTICAST_TTL, ttl)
        else:  # IPv6
            ttl = struct.pack('@i', 2)
            sock.setsockopt(socket.IPPROTO_IPV6,
                            socket.IPV6_MULTICAST_HOPS, ttl)

        # The socket is unconnected, so the destination must be given explicitly.
        self.transport.sendto(sys.argv[1].encode("ascii"),
                              (self.addr, BROADCAST_PORT))

    def datagram_received(self, data, addr):
        """Print a reply; the socket stays open since more replies may follow."""
        print("Reply from {}: {!r}".format(addr, data))

    def error_received(self, exc):
        """Report an error delivered by the transport."""
        print('Error received:', exc)

    def connection_lost(self, exc):
        """Stop the event loop once the transport has been closed."""
        print("Socket closed, stop the event loop")
        self.loop.stop()

# Run the discovery client on a socket we create ourselves, so asyncio does
# not connect() it to the group address.
loop = asyncio.get_event_loop()

# Pick the socket family (IPv4 or IPv6) that matches the group address.
addrinfo = socket.getaddrinfo(BROADCAST_ADDR, None)[0]
sock = socket.socket(addrinfo[0], socket.SOCK_DGRAM)
connect = loop.create_datagram_endpoint(
    lambda: DiscoveryClientProtocol(loop, BROADCAST_ADDR),
    sock=sock,
)
transport, protocol = loop.run_until_complete(connect)
try:
    loop.run_forever()
finally:
    # Also reached on Ctrl-C, so the transport and loop are always released.
    transport.close()
    loop.close()

So as to cover the IPv6 bits, here is the server side

import asyncio
import logging
import socket
import struct

BROADCAST_PORT = 1910
BROADCAST_ADDR = "239.255.255.250"
#BROADCAST_ADDR = "ff0e::10"

class MulticastServerProtocol:
    """Echo-style datagram protocol: replies to each sender with a receipt."""

    def connection_made(self, transport):
        """Store the transport used for sending replies."""
        self.transport = transport

    def datagram_received(self, data, addr):
        """Print what arrived, build the receipt, print it, and send it to *addr*."""
        print('Received {!r} from {!r}'.format(data, addr))
        receipt = "I received {!r}".format(data).encode("ascii")
        destination = addr
        print('Send {!r} to {!r}'.format(receipt, destination))
        self.transport.sendto(receipt, destination)

# Multicast echo server supporting IPv4 and IPv6 group addresses.
loop = asyncio.get_event_loop()
loop.set_debug(True)
logging.basicConfig(level=logging.DEBUG)

# The group address decides the socket family.
addrinfo = socket.getaddrinfo(BROADCAST_ADDR, None)[0]
family = addrinfo[0]
sock = socket.socket(family, socket.SOCK_DGRAM)
# Reuse options are set before bind() so several listeners can share the port.
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
group_bin = socket.inet_pton(family, addrinfo[4][0])
# Binding is identical for both families, so it is done once here.
sock.bind(('', BROADCAST_PORT))
if family == socket.AF_INET:  # IPv4
    # Group address followed by the local interface address (any).
    mreq = group_bin + struct.pack('=I', socket.INADDR_ANY)
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
else:  # IPv6
    # Group address followed by the interface index (0 = default).
    mreq = group_bin + struct.pack('@I', 0)
    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq)

listen = loop.create_datagram_endpoint(
    MulticastServerProtocol,
    sock=sock,
)
transport, protocol = loop.run_until_complete(listen)

try:
    loop.run_forever()
finally:
    # Also reached on Ctrl-C, so the transport and loop are always released.
    transport.close()
    loop.close()

I think this issue is a non-issue and should therefore be closed.