lloydroc / ebyte-sx1276

EByte E32 for the Raspberry Pi on Linux
Apache License 2.0
20 stars 5 forks source link

Q: Difference between e32.tx.data and e32.rx.data #29

Open viaSeunghyun opened 11 months ago

viaSeunghyun commented 11 months ago

Hello? It's been a while.

I've been using E32 well, with your code making it easy. I'm logging an issue because there's an implementation I'm using that doesn't make sense to me personally, and I'd like to ask @lloydroc a question.

See the code for the E32TX and E32RX. In e32tx, we create a socket for e32.tx.data, e32rx creates a socket on e32.rx.data.

However, in reality, only the names are different and all channels (RD&WR) are bound to /run/e32.data. If these two sockets are connected to a program at the same time, both e32.service and the user script will block indefinitely (socket.timeout if set to Non-Blocking).

Oct 12 22:25:18 raspberrypi e32[15843]: e32_poll_input_enable
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_gpio_aux: transition from IDLE to RX state
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_input_disable
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_uart: received 16 bytes for a total of 16 bytes from uart
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_gpio_aux: transition from RX to IDLE state
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_gpio_aux: additional sleep for uart buffered data
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_gpio_aux: received 2 bytes for a total of 18 bytes from uart
Oct 12 22:25:19 raspberrypi e32[15843]: e32_write_output: sending 18 bytes to socket /home/pi/e32.rx.data
Oct 12 22:25:19 raspberrypi e32[15843]: e32_write_output: sending 18 bytes to socket /home/pi/e32.tx.data

<= This Point Blocking all Sockets and Channels. No Logging in journalctl. E32 Service and All Client is blocked. <= And when kill client process, E32 Service unblocked

Oct 12 22:27:56 raspberrypi e32[15843]: e32_poll_input_enable
Oct 12 22:27:56 raspberrypi e32[15843]: e32_poll_uart: received 512 bytes for a total of 530 bytes from uart
Oct 12 22:27:56 raspberrypi e32[15843]: e32_poll_socket_unix_data: received 0 bytes from unix domain socket: /home/pi/e32.rx.data
Oct 12 22:30:43 raspberrypi e32[15843]: ERROR [?UNKNOWN? Connection refused] e32_poll_socket_unix_data: unable to send back status to unix socket
Oct 12 22:30:43 raspberrypi e32[15843]: e32_poll_uart: received 408 bytes for a total of 938 bytes from uart
Oct 12 22:30:43 raspberrypi e32[15843]: e32_poll_uart: received 8 bytes for a total of 946 bytes from uart
Oct 12 22:30:43 raspberrypi e32[15843]: e32_poll_uart: received 1 bytes for a total of 947 bytes from uart
Oct 12 22:30:43 raspberrypi e32[15843]: e32_poll_uart: received 1 bytes for a total of 948 bytes from uart
Oct 12 22:30:43 raspberrypi e32[15843]: e32_poll_uart: received 1 bytes for a total of 949 bytes from uart
Oct 12 22:30:43 raspberrypi e32[15843]: e32_poll_uart: received 1 bytes for a total of 950 bytes from uart
Oct 12 22:30:43 raspberrypi e32[15843]: e32_poll_uart: received 1 bytes for a total of 951 bytes from uart
Oct 12 22:30:43 raspberrypi e32[15843]: e32_poll_uart: received 1 bytes for a total of 952 bytes from uart

<= The number of repetitive errors below is proportional to the client's break time.

Oct 12 22:30:43 raspberrypi e32[15843]: ERROR [EFAULT Bad address] e32_poll_uart error reading from uart, fd=8, buf=0x556332cc48, total=0x7feb388968, mode=0
Oct 12 22:30:43 raspberrypi e32[15843]: ERROR [EFAULT Bad address] e32_poll_uart error reading from uart, fd=8, buf=0x556332cc48, total=0x7feb388968, mode=0
......
......
......
Oct 12 22:30:44 raspberrypi e32[15843]: e32_poll_gpio_aux: transition from IDLE to RX state
Oct 12 22:30:44 raspberrypi e32[15843]: e32_poll_input_disable
Oct 12 22:30:44 raspberrypi e32[15843]: e32_poll_uart: received 16 bytes for a total of 16 bytes from uart
Oct 12 22:30:44 raspberrypi e32[15843]: e32_poll_gpio_aux: transition from RX to IDLE state
Oct 12 22:30:44 raspberrypi e32[15843]: e32_poll_gpio_aux: additional sleep for uart buffered data
Oct 12 22:30:44 raspberrypi e32[15843]: e32_poll_gpio_aux: received 2 bytes for a total of 18 bytes from uart
Oct 12 22:30:44 raspberrypi e32[15843]: e32_write_output: sending 18 bytes to socket /home/pi/e32.rx.data
Oct 12 22:30:44 raspberrypi e32[15843]: ERROR [?UNKNOWN? Connection refused] e32_write_output: unable to send back status to unix socket. removing from list.
Oct 12 22:30:44 raspberrypi e32[15843]: e32_poll_gpio_aux: error writing outputs after RX to IDLE transition

Is this what you intended, or is there a separate method that is valid?

lloydroc commented 11 months ago

@viaSeunghyun thank you for creating this issue. I don't think I fully understand your scenario but I think I know what is happening.

Firstly, on the implementation. It is meant to be fully asynchronous and uses the poll system call. It must block as we cannot send data from multiple sources at the same time through the UART to the e32 board. It appears the case you have it is deadlocked. I will explain why.

You are right we have the following sockets:

  1. The /run/e32.data. This socket will the e32 process will recvfrom.
  2. The /run/e32.control for reading/changing the e32 board settings. This isn't part of this discussion.
  3. The "registered" sockets from client programs.

For the registered sockets the client programs e32tx and e32rx can optionally - with command line argument - send 0 bytes to e32 it will "register" and save the source socket - file path - in memory. When sockets are registered the e32 program can send back data to the registered sockets.

This is what I think has happened in your scenario:

  1. The e32rx program is running. It has registered /run/e32.rx.data so if the e32 program receives data from the e32 board it will send what it receives out this socket back to the client program.
  2. The e32tx program is run. It will also register /run/e32.tx.data since it was not run with the --skip-registration command line argument.
  3. From the logs above we can see that it is probably blocking after trying to send data back to the /run/e32.tx.data.

We see the following from your logs:

Oct 12 22:25:18 raspberrypi e32[15843]: e32_poll_input_enable
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_gpio_aux: transition from IDLE to RX state
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_input_disable
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_uart: received 16 bytes for a total of 16 bytes from uart
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_gpio_aux: transition from RX to IDLE state

We know here the input is disabled and thus it's not listening to input. That e32 will not listen to any input from the /run/e32.data socket and clients will block momentarily. This however, I assume isn't a problem at all as nothing is sending anything.

We then see:

Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_gpio_aux: additional sleep for uart buffered data
Oct 12 22:25:19 raspberrypi e32[15843]: e32_poll_gpio_aux: received 2 bytes for a total of 18 bytes from uart
Oct 12 22:25:19 raspberrypi e32[15843]: e32_write_output: sending 18 bytes to socket /home/pi/e32.rx.data
Oct 12 22:25:19 raspberrypi e32[15843]: e32_write_output: sending 18 bytes to socket /home/pi/e32.tx.data

The last line we have sending 18 bytes to socket /home/pi/e32.tx.data which is this line. However, when we look at e32tx script the program has probably exited and the socket is not closed. This is per this comment.

Thus, the e32 program is blocked in a sendto to the registered socket /run/e32.tx.data that has not been closed by the e32tx python script. This program isn't recvfrom and it won't error out in the e32 as its' not closed.

I would assume this would work if:

  1. The e32tx program is run with the --skip-registration command line argument
  2. If you wrote more code at the bottom of e32tx to close the socket, or to listen to data. Although, I don't see why we'd want to listen to that data.

If you can confirm this we can then think about the best solution. I think the problem here is that it makes no sense to send data the e32 has received to clients that are just meant for transmitting.

viaSeunghyun commented 11 months ago

Thank you for @lloydroc detailed response. I understood the whole scenario thanks to your explanation.

To add a little more clarification, before reading your explanation, I understood that the reason for the split into e32.tx.data and e32.rx.data was to avoid conflicts that would occur if /run/e32.data was accessed internally at the same time.

I mention channels because when I analyzed only the e32tx and e32rx scripts, I was under the impression that e32.tx.data was only the TX socket and e32.rx.data was only the RX socket. In other words, I understood that e32tx was implemented so that the receive channel was blocked, and e32rx was implemented so that the transmit channel was blocked, so that the end user didn't have to think about it.

I thought that there was some kind of processing related to the sockets that I didn't know about, and that the e32 service's code would handle only transmit data on e32.tx.data and only receive data on e32.rx.data without any management.

After reading the description, I didn't understand why it couldn't be processed in parallel, but now I do.

What I didn't understand was that I was mostly in direct control of UARTs on Windows, and the Virtual Serial Port Driver I was using had full N:M concurrent parallelism (mixing both physical and virtual ports). I need to learn more about sockets on Unix and Linux. LoL

That said, my initialization code looked something like this.

@singleton
class E32:
    def __init__(self, chip='SX1276', debug=False):
        self.debug = debug
        self.MAX_BYTES = 0

        if chip == 'SX1276':
            from .EByte_SX1276 import MAX_BYTES
            self.MAX_BYTES = MAX_BYTES

        self._packet_pattern = rb'\x02[\x00-\xFF]{16}\x03'
        self.packet_pattern_regex = re.compile(self._packet_pattern)

        self.driver_sock_path = '/run/e32.data'
        self.client_sock_tx_path = f"{os.path.expanduser('~')}/e32.tx.data"
        self.client_sock_rx_path = f"{os.path.expanduser('~')}/e32.rx.data"

        if os.path.exists(self.client_sock_tx_path):
            os.remove(self.client_sock_tx_path)

        if os.path.exists(self.client_sock_rx_path):
            os.remove(self.client_sock_rx_path)

        self.client_sock_tx = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.client_sock_tx.bind(self.client_sock_tx_path)

        self.client_sock_rx = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        self.client_sock_rx.bind(self.client_sock_rx_path)

        if debug:
            print(f"Registering Socket {self.client_sock_tx_path} to {self.driver_sock_path}")

        self.client_sock_tx.sendto(b'', self.driver_sock_path)
        (msg, address) = self.client_sock_tx.recvfrom(10)

        if len(msg) != 1 or msg[0] != 0:
            print("Unable to Register TX Client")
            raise LoRa.Ebyte_Exception.DriverConnectionError
        else:
            if debug:
                print("TX Client Registered")

        self.tx_buffer: bytes = b''

        if debug:
            print(f"Registering Socket {self.client_sock_rx_path} to {self.driver_sock_path}")

        self.client_sock_rx.sendto(b'', self.driver_sock_path)
        (msg, address) = self.client_sock_rx.recvfrom(10)

        if len(msg) != 1 or msg[0] != 0:
            print("Unable to Register RX Client")
            raise LoRa.Ebyte_Exception.DriverConnectionError
        else:
            if debug:
                print("RX Client Registered")

        signal.signal(signal.SIGALRM, self.handler)
        signal.signal(signal.SIGINT, self.handler)

        self.rx_buffer: bytes = b''

        self.client_sock_rx.setblocking(False)

    def run(self):
        if len(self.tx_buffer) > 1:
            raw_msg = self.tx_buffer
            self.tx_buffer = self.tx_buffer[len(raw_msg):]
            msg = self.encode(raw_msg)

            if self.debug:
                print(f"Sending... {len(msg)} Bytes. {msg}")

            self.client_sock_tx.sendto(msg, self.driver_sock_path)
            (recv_msg, address) = self.client_sock_tx.recvfrom(self.MAX_BYTES)

            if len(recv_msg) == 1 and recv_msg[0] == 0:
                if self.debug:
                    print(f"Success! Received 1 byte from socket {address}")
            else:
                print(f"Failed to send data. len: {len(recv_msg)}")
                raise LoRa.Ebyte_Exception.DataError
        try:
            (msg, address) = self.client_sock_rx.recvfrom(59)
            print(f"received from {address} {len(msg)} bytes with {msg}")

            self.rx_buffer += msg

        except BlockingIOError:
            # print(f"no received this time")
            pass
        pass

    def send(self, msg):
        self.tx_buffer += msg

    def recv(self):
        if len(self.rx_buffer) >= 18:
            print("Decode...")
            if self.precheck_packet(self.rx_buffer):
                return self.decode(self.rx_buffer)
            else:
                if self.debug:
                    print(f"Decode Failure {self.rx_buffer}")

        return None

I was only sending on client_sock_tx and receiving on client_sock_rx, so I didn't understand what was causing the deadlock. In the example you provided, we just used a slightly different name. However, now my understanding is that I can receive data from e32.tx.data and send data from e32.rx.data.

is that correct, at least in theory?

viaSeunghyun commented 11 months ago

If you wrote more code at the bottom of to close the socket, or to listen to data. Although, I don't see why we'd want to listen to that data.e32tx

To quote this sentence, I didn't do a separate listen on e32.tx.data, but that's how the service behaved.

I did exactly what the example code in this link does. I just made sure that in my implementation it didn't work if the tx_buffer was empty.

That is, in my code, recvfrom is not called on e32.tx.data unless I call E32.send() to fill the tx_buffer. After initialization, the following line of code in E32.run() is the only call to recvfrom on ~/e32.tx.data.

(recv_msg, address) = self.client_sock_tx.recvfrom(self.MAX_BYTES) # MAX_BYTES = 59 
lloydroc commented 11 months ago

@viaSeunghyun with the implementation the sockets are unidirectional. On one end of the socket is opened for sending, the other for receiving. The socket /run/e32.data is waiting - recvfrom for clients to sendto it. In this case the socket /run/e32.rx.data which the e32rx program has bound is waiting for the e32 program to sendto. This is because e32rx has registered the /run/e32.rx.data socket. When the e32 receives data it will loop through all registered sockets and sendto data with the source as the /run/e32.data socket.

The problem we have here - at least I think - is that the e32tx has registered the socket /run/e32.tx.data for acknowledging transmissions but it's not listening on the socket after transmissions. Since the /run/e32.tx.data socket is registered when the e32 receives data it will try to send the data that it receives to it. But the e32tx program isn't recvfrom on this socket so the e32 program is blocked waiting to send the datagram. This is why I mentioned we should 1) not register /run/e32.tx.data or 2) we should recvfrom it. This would involve asynchronous I/O like using the poll system call. This is why 1 makes more sense. Because the Unix domain sockets are connectionless the /run/e32.tx.data serves a second purpose. That is when the client sends something to the e32 socket of /run/e32.data it can write back to that socket to acknowledge it was sent. It is this dual purpose nature that is confusing and be changed.

Regardless, this problem you have found is addressed, a change could be implemented to at least timeout when the e32 does a loop through the registered sockets and timeout if it cannot sendto a registered socket.

When writing, the clients if they have both tx and rx clients then it needs a way to be able to recvfrom all the registered sockets in any order. If the client has two sockets it cannot be blocked waiting recvfrom on socket 1 when socket 2 has data. This makes the implementation of clients challenging. Hence, the poll system call. Again, in this case the client realistically should only have a single socket open for receiving and not two. Since here the e32tx socket is intended to be for acknowledgement and not for receiving data. Also, it was intended to run and close the socket and not be in a loop.

The code looks quite good. However, in this section:

        try:
            (msg, address) = self.client_sock_rx.recvfrom(59)
            print(f"received from {address} {len(msg)} bytes with {msg}")

            self.rx_buffer += msg

            # to not block we would have to self.client_sock_tx.recvfrom()

        except BlockingIOError:
            # print(f"no received this time")
            pass
        pass

The e32 is also sending data to self.client_sock_tx since it was registered. If you were to recvfrom it after you recvfrom the rx it would work. But as mentioned this is pointless.

Thus, could you remove the following section so that received data will not be sent to

        if debug:
            print(f"Registering Socket {self.client_sock_tx_path} to {self.driver_sock_path}")

        self.client_sock_tx.sendto(b'', self.driver_sock_path)
        (msg, address) = self.client_sock_tx.recvfrom(10)

        if len(msg) != 1 or msg[0] != 0:
            print("Unable to Register TX Client")
            raise LoRa.Ebyte_Exception.DriverConnectionError
        else:
            if debug:
                print("TX Client Registered")

This way there is no registered socket for the transmitting side.

Again, the problem here is that the /run/e32.tx.data socket is registered for both TX acknowledgement and receiving data. Since the tx socket it's not being listened on it is blocking the e32 program. I need some time to think about the best path forward here.

viaSeunghyun commented 11 months ago

Thank you so much for taking your valuable time to answer me, even on a holiday, you responded so quickly. I don't know your timezone, but I really appreciate it.

I understand more clearly now and will modify my code and logic. Thank you for your kind response.

In the future, if I have any other questions or ideas, is it okay to post some more? Of course, most problems with my code I will try and fix on my own.

Thank you so much again!

If you wish, you can now close this issue.