pymodbus-dev / pymodbus

A full modbus protocol written in python

ModbusSocketFramer bug when multiple modbus messages are received in 1 tcp payload #1949

Closed phantom1299 closed 8 months ago

phantom1299 commented 8 months ago

Versions

Pymodbus Specific

Description

When multiple Modbus frames arrive in a single TCP frame, pymodbus throws an error. This happens because of a miscalculation. I fixed the issue by subtracting 1 in the length calculation of the frame in "pymodbus/framer/socket_framer.py", line 99:

    def getFrame(self):
        """Return the next frame from the buffered data.

        :returns: The next full frame buffer
        """
        length = self._hsize + self._header["len"] - 1
        print("getFrame", length, self._hsize, self._header['len'])
        return self._buffer[self._hsize : length]

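This happens because both self._hsize and self._header["len"] include the uid byte. It doesn't happen when there is only 1 Modbus packet inside a TCP frame, because Python clamps a slice that runs past the end of the data, so my_list[:length] == my_list[:length+1] is True when length is already the full size of my_list.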
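
The arithmetic can be checked against the bytes from the debug log in this report. This is a standalone sketch, not pymodbus code; it assumes, as described above, that the header size of 7 includes the unit id byte and that the length field of 6 counts the unit id as well.

```python
# The three requests exactly as they arrived in one TCP payload (taken from the log).
payload = bytes.fromhex(
    "0483 0000 0006 01 03 114d 0032"
    "0484 0000 0006 01 03 3001 0004"
    "0485 0000 0006 01 03 3105 0011"
)

HSIZE = 7  # transaction id (2) + protocol id (2) + length (2) + unit id (1)
length_field = int.from_bytes(payload[4:6], "big")  # 6 = unit id + function code + 4 data bytes

# Unit id is counted in both terms, so the slice runs one byte too far.
too_long = payload[HSIZE : HSIZE + length_field]
correct = payload[HSIZE : HSIZE + length_field - 1]

print(too_long)  # b'\x03\x11M\x002\x04' (the trailing 0x04 belongs to the next frame)
print(correct)   # b'\x03\x11M\x002'

# With a single frame in the buffer, the slice is clamped and the bug stays hidden.
single = payload[:12]
same = single[HSIZE : HSIZE + length_field] == single[HSIZE : HSIZE + length_field - 1]
print(same)  # True
```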

Code and Logs

Added some prints inside the framer functions to debug

datagram_received

2024-01-22 22:54:03,852 DEBUG logging:103 recv: 0x4 0x83 0x0 0x0 0x0 0x6 0x1 0x3 0x11 0x4d 0x0 0x32 0x4 0x84 0x0 0x0 0x0 0x6 0x1 0x3 0x30 0x1 0x0 0x4 0x4 0x85 0x0 0x0 0x0 0x6 0x1 0x3 0x31 0x5 0x0 0x11 old_data:  addr=None
2024-01-22 22:54:03,852 DEBUG logging:103 Handling data: 0x4 0x83 0x0 0x0 0x0 0x6 0x1 0x3 0x11 0x4d 0x0 0x32 0x4 0x84 0x0 0x0 0x0 0x6 0x1 0x3 0x30 0x1 0x0 0x4 0x4 0x85 0x0 0x0 0x0 0x6 0x1 0x3 0x31 0x5 0x0 0x11
2024-01-22 22:54:03,852 DEBUG logging:103 Processing: 0x4 0x83 0x0 0x0 0x0 0x6 0x1 0x3 0x11 0x4d 0x0 0x32 0x4 0x84 0x0 0x0 0x0 0x6 0x1 0x3 0x30 0x1 0x0 0x4 0x4 0x85 0x0 0x0 0x0 0x6 0x1 0x3 0x31 0x5 0x0 0x11
getFrame 13 7 6
_process b'\x03\x11M\x002\x04'
2024-01-22 22:54:03,853 DEBUG logging:103 Factory Request[ReadHoldingRegistersRequest': 3]
5 b'\x11M\x002\x04'
Traceback (most recent call last):
  File ".../MyModbusServer.py", line 111, in handle
    await self.inner_handle()
  File ".../MyModbusServer.py", line 101, in inner_handle
    self._framer.processIncomingPacket(
  File ".../pymodbus/framer/base.py", line 139, in processIncomingPacket
    self.frameProcessIncomingPacket(single, callback, slave, **kwargs)
  File ".../pymodbus/framer/socket_framer.py", line 141, in frameProcessIncomingPacket
    self._process(callback, tid)
  File ".../pymodbus/framer/socket_framer.py", line 147, in _process
    if (result := self.decoder.decode(data)) is None:
  File ".../pymodbus/factory.py", line 183, in decode
    return self._helper(message)
  File ".../pymodbus/factory.py", line 216, in _helper
    request.decode(data[1:])
  File ".../pymodbus/register_read_message.py", line 49, in decode
    self.address, self.count = struct.unpack(">HH", data)
struct.error: unpack requires a buffer of 4 bytes
2024-01-22 22:54:03,855 DEBUG logging:103 Client Disconnection server_listener due to unpack requires a buffer of 4 bytes
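
The struct.error at the end of the traceback follows directly from the extra byte: once the function code is stripped, 5 bytes are left instead of the 4 that ">HH" needs. A standalone reproduction using the values printed in the log above (no pymodbus required):

```python
import struct

pdu = b"\x03\x11M\x002\x04"  # what _process received: 6 bytes, one too many
data = pdu[1:]               # function code stripped, like data[1:] in the traceback

try:
    struct.unpack(">HH", data)  # 5 bytes for a 4-byte format
    error = None
except struct.error as exc:
    error = str(exc)

# With the stray byte removed, the request decodes as expected.
address, count = struct.unpack(">HH", data[:4])
print(error)
print(hex(address), count)  # 0x114d 50
```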

janiversen commented 8 months ago

We have corrected a similar bug, and the fix is available on dev; however, I think I will add a test case with multiple messages as well.

Can you please add a debug log, so I can see what is actually received.

Most devices do not accept multiple frames on the same socket (one client connection), so what is the practical use case?

phantom1299 commented 8 months ago

I have a Modbus slave which connects to a Modbus master (instead of listening for clients to connect to me, I connect to the TCP server of the master). I changed the hard-coded is_server param to False in ModbusBaseServer __init__ and extended ModbusTcpServer to have the same functionality as ModbusServerRequestHandler when handling messages.

pymodbus.log

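import asyncio
import traceback

# Import paths assumed for pymodbus 3.6.x, the version discussed in this issue.
from pymodbus.exceptions import NoSuchSlaveException
from pymodbus.logging import Log
from pymodbus.pdu import ModbusExceptions as merror
from pymodbus.server import ModbusTcpServer
from pymodbus.transport import CommParams, CommType


class MyModbusServer(ModbusTcpServer):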
    def __init__(self, *args, **kwargs):
        self.tls_setup = CommParams(
            comm_type=CommType.TCP,
            comm_name="server_listener",
            reconnect_delay=1,
            reconnect_delay_max=10,
            timeout_connect=1,
            host="192.168.11.1",
            port=502
        )
        super().__init__(*args, **kwargs)
        self.receive_queue = asyncio.Queue()

    def _log_exception(self):
        """Show log exception."""
        Log.debug(
            "Handler for stream [{}] has been canceled", self.comm_params.comm_name
        )

    def callback_connected(self) -> None:
        """Call when connection is successful."""
        try:
            self.running = True
            self._framer = self.framer(
                self.decoder,
                client=None,
            )

            # schedule the connection handler on the event loop
            self.handler_task = asyncio.create_task(self.handle())
            self.handler_task.set_name("server connection handler")
        except Exception as exc:  # pragma: no cover pylint: disable=broad-except
            Log.error(
                "Server callback_connected exception: {}; {}",
                exc,
                traceback.format_exc(),
            )

    def callback_disconnected(self, call_exc: Exception | None) -> None:
        """Call when connection is lost."""
        try:
            if self.handler_task:
                self.handler_task.cancel()
            if hasattr(self, "on_connection_lost"):
                self.on_connection_lost()
            if call_exc is None:
                self._log_exception()
            else:
                Log.debug(
                    "Client Disconnection {} due to {}",
                    self.comm_params.comm_name,
                    call_exc,
                )
            self.running = False
        except Exception as exc:  # pylint: disable=broad-except
            Log.error(
                "Datastore unable to fulfill request: {}; {}",
                exc,
                traceback.format_exc(),
            )

    async def inner_handle(self):
        """Handle handler."""
        slaves = self.context.slaves()
        # this is an asyncio.Queue await, it will never fail
        data = await self._recv_()
        if isinstance(data, tuple):
            # addr is populated when talking over UDP
            data, *addr = data
        else:
            addr = (None,)  # placeholder: no peer address when talking over TCP

        # if broadcast is enabled make sure to
        # process requests to address 0
        if self.broadcast_enable:  # pragma: no cover
            if 0 not in slaves:
                slaves.append(0)

        Log.debug("Handling data: {}", data, ":hex")

        single = self.context.single
        self._framer.processIncomingPacket(
            data=data,
            callback=lambda x: self.execute(x, *addr),
            slave=slaves,
            single=single,
        )

    async def handle(self):
        while self.running:
            try:
                await self.inner_handle()
            except asyncio.CancelledError:
                # catch and ignore cancellation errors
                if self.running:
                    # self._log_exception()
                    self.running = False
            except Exception as exc:  # pylint: disable=broad-except
                # force TCP socket termination as processIncomingPacket
                # should handle application layer errors
                import traceback
                traceback.print_exc()
                self.transport_close()
                self.callback_disconnected(exc)

    def execute(self, request, *addr):
        """Call with the resulting message.

        :param request: The decoded request message
        :param addr: the address
        """
        if self.request_tracer:
            self.request_tracer(request, *addr)

        broadcast = False
        try:
            if self.broadcast_enable and not request.slave_id:
                broadcast = True
                # if broadcasting then execute on all slave contexts,
                # note response will be ignored
                for slave_id in self.context.slaves():
                    response = request.execute(self.context[slave_id])
            else:
                context = self.context[request.slave_id]
                response = request.execute(context)
        except NoSuchSlaveException:
            Log.error("requested slave does not exist: {}", request.slave_id)
            if self.ignore_missing_slaves:
                return  # the client will simply timeout waiting for a response
            response = request.doException(merror.GatewayNoResponse)
        except Exception as exc:  # pylint: disable=broad-except
            Log.error(
                "Datastore unable to fulfill request: {}; {}",
                exc,
                traceback.format_exc(),
            )
            response = request.doException(merror.SlaveFailure)
        # no response when broadcasting
        if not broadcast:
            response.transaction_id = request.transaction_id
            response.slave_id = request.slave_id
            skip_encoding = False
            if self.response_manipulator:
                response, skip_encoding = self.response_manipulator(response)
            self.send(response, *addr, skip_encoding=skip_encoding)

    def send(self, message, addr, **kwargs):
        """Send message."""
        if kwargs.get("skip_encoding", False):
            self.transport_send(message, addr=addr)
        elif message.should_respond:
            pdu = self._framer.buildPacket(message)
            self.transport_send(pdu, addr=addr)
        else:
            Log.debug("Skipping sending response!!")

    async def _recv_(self):  # pragma: no cover
        """Receive data from the network."""
        try:
            result = await self.receive_queue.get()
        except RuntimeError:
            Log.error("Event loop is closed")
            result = None
        return result

    def callback_data(self, data: bytes, addr: tuple | None = ()) -> int:
        """Handle received data."""
        if addr != ():
            self.receive_queue.put_nowait((data, addr))
        else:
            self.receive_queue.put_nowait(data)
        return len(data)

janiversen commented 8 months ago

Something is off here... A slave is, per the Modbus definition, a server that listens and waits for clients to connect to it. It seems you have turned the Modbus protocol upside down; that will have a lot of other implications, such as for transaction handling and the server/client decoder.

janiversen commented 8 months ago

But anyhow, regarding your original statement about a bug in socket_framer: did you try dev as I suggested?

phantom1299 commented 8 months ago

Yeah, I thought the same at first, but the commercial device I'm trying to communicate with wants the slave to connect to it, so I had to implement a custom method as mentioned above.

Haven't tried the dev branch yet, will send an update when available

janiversen commented 8 months ago

Please read the Modbus protocol: a slave does not connect... But your device is at the RS-485 level below Modbus, and in RS-485 you have one master (client in Modbus) and slaves (servers in Modbus).

t-jakubek commented 8 months ago

I believe the incorrect behavior can also occur with "standard" communication. Communicating with a slave, I send 10 (async) requests at the same time. Wireshark shows the standard 1 Modbus response per TCP packet (screenshot attached). However, in the log (please find it attached), it seems 2 packets are aggregated; PyModbus ignores the second part and retries the request after the timeout (3 s).

Attachment: pymodbus.log

janiversen commented 8 months ago

We do not support sending multiple requests in our clients; actually, it should not be possible if you use the standard calls.

Each client knows it has exactly 0 or 1 request outstanding, and depends on that information, so getting multiple responses is not supported.

janiversen commented 8 months ago

Anyhow the software should not break down, but log an error.

janiversen commented 8 months ago

Please send the pcap file, then it is easier for me to make a test case.

t-jakubek commented 8 months ago

Just to be sure we are on the same page when it comes to multiple requests - what I do is something like (artificial example, but I used this line for gathering pcap data):

    await asyncio.gather(*[client.read_holding_registers(address=x, count=2) for x in range(0, 1000, 100)])

I don't think I am doing any shenanigans with PyModbus API, the aggregation of requests in frame # 23 is done by TCP stack (Win10 x64+ CPython 3.11.6).

Because the slave is a 3rd-party device, I can comment only on what I see in Wireshark, and it seems to send one response at a time ("1" in the attached screenshot). But the issue I see is that PyModbus sometimes seems to lose a response, and a retransmission is required ("2" in the screenshot). Wireshark shows 10 separate responses, but in the log file from my previous comment we can see that PyModbus sometimes sees multiple responses "aggregated" and does not handle them correctly. The aggregation seems to be caused by the OS/network stack, definitely not my application.
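
Since TCP delivers a byte stream rather than discrete messages, a single receive call can hold several Modbus responses, or only part of one. A minimal sketch of a splitter that copes with both cases, independent of pymodbus; the function name `split_mbap_frames` and the register values in the sample responses are made up for this illustration.

```python
def split_mbap_frames(buffer: bytes) -> tuple[list[bytes], bytes]:
    """Split a receive buffer into complete Modbus TCP frames.

    Returns (frames, remainder); remainder holds an incomplete trailing frame
    that should be prepended to the next chunk of received data.
    """
    frames = []
    while len(buffer) >= 6:
        # The MBAP length field counts the bytes that follow it (unit id + PDU).
        length = int.from_bytes(buffer[4:6], "big")
        end = 6 + length
        if len(buffer) < end:
            break  # frame not complete yet, wait for more data
        frames.append(buffer[:end])
        buffer = buffer[end:]
    return frames, buffer


# Two hypothetical read-holding-registers responses (2 registers each) ...
resp1 = bytes.fromhex("0001 0000 0007 01 03 04 000a 000b")
resp2 = bytes.fromhex("0002 0000 0007 01 03 04 000c 000d")

# ... delivered in one chunk, followed by the first 5 bytes of a third frame.
frames, rest = split_mbap_frames(resp1 + resp2 + resp2[:5])
print(len(frames), rest.hex())  # 2 0002000000
```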

I believe the use case itself (send multiple async requests at once, wait for all responses) is valid - "Several MODBUS transactions can be activated simultaneously on the same TCP Connection." (after "4.2 TCP CONNECTION MANAGEMENT" from https://www.modbus.org/docs/Modbus_Messaging_Implementation_Guide_V1_0b.pdf ).

Attached please find the anonymized pcap.

janiversen commented 8 months ago

You may find it a valid use case, but pymodbus still does not support it... just as many devices have problems when receiving a second request before responding to the first. This is because most devices do not actually care about the transaction id, but simply copy it into the response.

If you want pymodbus to support multiple parallel requests, then pull requests are welcome. Please remember that if you send 10 requests, there is no guarantee that the responses arrive in the same order.
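
One way to tolerate out-of-order responses is to key each outstanding request by its transaction id and resolve it when a response carrying the same id arrives. The following is only a sketch of that idea using asyncio futures; `PendingRequests` and its methods are hypothetical names, not part of pymodbus, and the frames are shortened dummies.

```python
import asyncio


class PendingRequests:
    """Track outstanding requests by transaction id (illustrative only)."""

    def __init__(self) -> None:
        self._futures: dict[int, asyncio.Future] = {}

    def register(self, tid: int) -> asyncio.Future:
        """Create a future that completes when the response for tid arrives."""
        future = asyncio.get_running_loop().create_future()
        self._futures[tid] = future
        return future

    def resolve(self, frame: bytes) -> bool:
        """Hand a complete frame to the waiter with the matching transaction id."""
        tid = int.from_bytes(frame[0:2], "big")
        future = self._futures.pop(tid, None)
        if future is None or future.done():
            return False  # unknown or already answered; the caller can log it
        future.set_result(frame)
        return True


async def demo() -> list[bytes]:
    pending = PendingRequests()
    waiters = [pending.register(tid) for tid in (1, 2, 3)]
    # Responses arrive in a different order than the requests were sent.
    for tid in (3, 1, 2):
        pending.resolve(tid.to_bytes(2, "big") + b"\x00\x00\x00\x02\x01\x03")
    return await asyncio.gather(*waiters)


results = asyncio.run(demo())
print([r[:2].hex() for r in results])  # ['0001', '0002', '0003']
```

Each caller still gets the response that belongs to its own request, regardless of arrival order. A device that ignores or mishandles the transaction id, as mentioned above, would defeat this scheme.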

I do agree that pymodbus should not break down, and that is a case I will look at.

janiversen commented 8 months ago

Added support (silently, because there are surely corner cases, where it does not work).

Added a test case with your gather example, but corrected so that it actually checks the functionality.

This will be part of v3.6.4