pymodbus-dev / pymodbus

A full modbus protocol written in python

Tremendous delay in request/response handling with cached slave context #1891

Closed martink-p closed 8 months ago

martink-p commented 9 months ago

Versions

Pymodbus Specific

Description

I have created a service which forwards Modbus requests to an upstream hardware device. The requests are received by an async TCP server and are forwarded to upstream units sitting behind a Modbus/ASCII UART interface. This is done with the sync version of ModbusSerialClient. For performance reasons I have implemented a subclass of RemoteSlaveContext. To be more specific, I created an override for the getValues function. It automatically caches data in a dictionary and returns data from the cache as long as a given TTL has not expired (see below).

I usually test Modbus communication with ComTestPro. I found that when the request interval is short enough to hit the cache, the response is tremendously slow (several seconds). This does not happen when a request misses the cache and the data is read directly from the upstream device.

I did some investigation and timing analysis afterwards. Reading three holding registers from the upstream device takes around 80 ms, while reading the same registers from the cache takes around 50 µs. The return values are identical in both cases, e.g. a list with three integer values. As I couldn't see any reason for the cached response being that slow, I gave it a try and added a small delay in the case when response data is read from the caching dictionary. This solved the delay problem, which is weird enough. If you wait around 10 ms before returning from the subclass, everything is fine. When the delay is reduced towards 5 ms, things get unstable.

I do not have a complete insight into the codebase of pymodbus. However, it seems that something between the async TcpServer and the upstream client gets out of sync in request/response handling if you return too fast. It then might wait for some sort of timeout before returning data to the requesting client.

For now I'm fine with the workaround of adding a small delay (e.g. like 20ms to have a safety margin). However, it would be great if somebody could elaborate a bit about this and maybe point out if this is expected behaviour.
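
The caching rule described above (serve a requested range from the cache only if every address in it is present and not yet expired, otherwise fall back to the upstream device) can be sketched independently of pymodbus. This is a minimal illustration, not the code used in the service: the class name `TtlRangeCache`, its methods, and the injectable `clock` parameter are hypothetical, and it uses `time.monotonic` rather than the wall-clock `time.time_ns` so that expiry is not affected by system clock adjustments.

```python
import time


class TtlRangeCache:
    """Hypothetical helper: caches register values per (function code, address)."""

    def __init__(self, ttl_ms=250, clock=time.monotonic):
        self.ttl_s = ttl_ms / 1000.0
        self.clock = clock   # injectable so the expiry logic can be tested
        self._store = {}     # (fc, address) -> (value, stored_at)

    def get_range(self, fc, address, count):
        """Return cached values, or None if any address is missing or expired."""
        now = self.clock()
        values = []
        for addr in range(address, address + count):
            entry = self._store.get((fc, addr))
            # As in the posted code, a TTL of 0 means "never expire".
            if entry is None or (self.ttl_s and now - entry[1] > self.ttl_s):
                return None  # all-or-nothing: the caller must read upstream
            values.append(entry[0])
        return values

    def put_range(self, fc, address, values):
        """Store freshly read values under a common timestamp."""
        now = self.clock()
        for offset, value in enumerate(values):
            self._store[(fc, address + offset)] = (value, now)
```

A caller would try `get_range` first and, when it returns `None`, read from the upstream device and store the result with `put_range`.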

Code and Logs


class CachedRemoteSlaveContext(RemoteSlaveContext):
    lock = None
    ttl = 250
    obj_dict = dict()

    def __init__(self, client, slave=None, ttl=250):
        super(CachedRemoteSlaveContext, self).__init__(client, slave)
        self.ttl = ttl
        self.obj_dict = dict()
        self.lock = threading.RLock()

    def getValues(self, fc_as_hex, _address, _count=1):
        with self.lock:
            # return [self.values[i] for i in range(address, address + count)]
            addresses = range(_address, _address + _count)
            start = time.time_ns()
            values = []
            if fc_as_hex in self.obj_dict:
                for addr in addresses:
                    addr_as_key = str(addr)
                    now = time.time_ns()//1_000_000
                    fc_dict = self.obj_dict[fc_as_hex]
                    if addr_as_key in fc_dict:
                        # get value from cache, if not expired
                        val, last_updated = fc_dict[addr_as_key]
                        ttl_expired = self.ttl and self.ttl < now - last_updated

                        if not ttl_expired:
                            values.append(val)
                        else:
                            # ttl expired
                            values = None # reset value list as we have no information about the _address/_count requirements, e.g. we cannot ensure read of single addresses
                            break
                    else:
                        # address not in cache dict
                        values = None # reset value list as we have no information about the _address/_count requirements, e.g. we cannot ensure read of single addresses
                        break

                if values is not None:
                    time.sleep(0.01)
                    delta = time.time_ns() - start
                    print(f"time cache: {delta}ns / {delta//1_000_000}ms")
                    return values # we have a successful read from cache if the value list has not been reset
            else:
                # add FC to dictionary
                self.obj_dict[fc_as_hex] = dict()

            # data not in cache, request from upstream device
            values = super(CachedRemoteSlaveContext, self).getValues(fc_as_hex, _address, _count)
            now = time.time_ns()//1_000_000

            if type(values) is not ExceptionResponse:
                # add data to cache
                for i in range(_count):
                    self.obj_dict[fc_as_hex].__setitem__(str(addresses[i]), (values[i], now))

            delta = time.time_ns() - start
            print(f"time upstream: {delta}ns / {delta//1_000_000}ms")
            return values

janiversen commented 9 months ago

The simulator datastore as well as the "normal" datastore return values with no delay at all (because data is returned from an array), so that is not a problem.

You write that your cache is slow, but it responds in microseconds while your device responds in milliseconds, so to me the device is a lot slower than your cache, not the other way round.

You are using threading, that does not mix well with async!!

There are a couple of known problems with the remote datastore, when sending requests to another server, basically due to the mix of sync/async classes.

Did you have a look at our forwarder example?

janiversen commented 9 months ago

Did you solve your problem?

martink-p commented 9 months ago

> Did you solve your problem?

No, not really. I implemented the described workaround of adding an additional delay when returning from cache.

> The simulator datastore as well as the "normal" datastore return values with no delay at all (because data is returned from an array), so that is not a problem. You write that your cache is slow, but it responds in microseconds while your device responds in milliseconds, so to me the device is a lot slower than your cache, not the other way round.

I do not use the simulator datastore in production, and I'm not sure why you are pointing at it. I also haven't said that my cache is slow. It is in fact impressively fast.

I think you misunderstood the whole point. A read from the device takes some time, and a read from the cache takes a different, much shorter time. That is why I have implemented a cache which times out on a regular basis. This way I ensure that not every request is served by the slower upstream USART/ASCII device. The issue is that there is another, unknown delay when returning from the getValues function in the above code. This delay is really long (multiple seconds) and can be observed by the calling API waiting ages until it returns. But it occurs only if I return data from the really fast cache within time periods below 10 ms. If I slow down the response (i.e. the return from the getValues function), I can work around this mysterious delay. As this long delay also occurs when calling with a different client (ComTestPro), I can exclude the client being the reason.

The whole setup is the following:

> Did you have a look at our forwarder example?

My code is based on that forwarder example and uses asyncio. However, I started with the older RTU to TCP forwarder example and compared it to the newer one. IMHO it does the same (full code is below), with the exception that I'm directly calling serve_forever and not StartAsyncTcpServer.

> You are using threading, that does not mix well with async!!

This might be. However, please point out how I am supposed to ensure exclusive access to the physical USART interface if the RemoteSlaveContext does not implement the async/await call pattern for the getValues function.

> There are a couple of known problems with the remote datastore, when sending requests to another server, basically due to the mix of sync/async classes.

Okay. How am I supposed to implement this in a clean way? Is there any difference in how asyncio is set up if I'm calling StartAsyncTcpServer?
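
On the question of exclusive access: in plain asyncio code, a common pattern is to guard the blocking resource with an `asyncio.Lock` and run the blocking call in a worker thread via `asyncio.to_thread` (Python 3.9+). The sketch below shows only that general pattern. It assumes the caller is a coroutine, which, as noted above, is not the case for `getValues` in `RemoteSlaveContext`, so it is not a drop-in fix. `blocking_read` and `SerialGate` are hypothetical names, and the `time.sleep` stands in for a real serial transaction.

```python
import asyncio
import time


def blocking_read(address, count):
    """Stand-in for a synchronous serial transaction (hypothetical)."""
    time.sleep(0.05)  # simulates the slow UART round trip
    return list(range(address, address + count))


class SerialGate:
    """Hypothetical wrapper that serialises access to one blocking resource."""

    def __init__(self):
        self._lock = asyncio.Lock()  # cooperative lock, waits without blocking the loop

    async def read(self, address, count):
        async with self._lock:  # only one transaction on the wire at a time
            # Run the blocking call in a worker thread so the loop stays responsive.
            return await asyncio.to_thread(blocking_read, address, count)


async def main():
    gate = SerialGate()  # created inside the running loop
    # Both reads are requested concurrently but executed one after the other.
    return await asyncio.gather(gate.read(0, 3), gate.read(10, 2))


results = asyncio.run(main())
```

The lock keeps the two transactions from overlapping on the wire, while the event loop remains free to serve other coroutines during each read.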

Full code:

"""Pymodbus SerialRTU2TCP Forwarder

usage :
python3 serial_forwarder.py --log DEBUG --port "/dev/ttyUSB0" --baudrate 9600 --server_ip "192.168.1.27" --server_port 5020 --slaves 1 2 3
"""
import argparse
import asyncio
import logging
import signal
import threading
import time

import pymodbus
from pymodbus.client import ModbusSerialClient
from pymodbus.framer import ModbusAsciiFramer
from pymodbus.datastore import ModbusServerContext, ModbusSlaveContext, ModbusSparseDataBlock
from pymodbus.datastore.remote import RemoteSlaveContext
from pymodbus.server.async_io import ModbusTcpServer
from pymodbus.pdu import ExceptionResponse

from cysystemd.daemon import notify, Notification

logging.basicConfig()
_logger = logging.getLogger(__file__)

def raise_graceful_exit(*_args):
    """Enters shutdown mode"""
    _logger.info("receiving shutdown signal now")
    raise SystemExit

class CachedRemoteSlaveContext(RemoteSlaveContext):
    lock = None
    ttl = 250
    obj_dict = dict()

    def __init__(self, client, slave=None, ttl=250):
        super(CachedRemoteSlaveContext, self).__init__(client, slave)
        self.ttl = ttl
        self.obj_dict = dict()
        self.lock = threading.RLock()

    def getValues(self, fc_as_hex, _address, _count=1):
        with self.lock:
            # return [self.values[i] for i in range(address, address + count)]
            addresses = range(_address, _address + _count)
            values = []
            if fc_as_hex in self.obj_dict:
                for addr in addresses:
                    addr_as_key = str(addr)
                    now = time.time_ns()//1_000_000
                    fc_dict = self.obj_dict[fc_as_hex]
                    if addr_as_key in fc_dict:
                        # get value from cache, if not expired
                        val, last_updated = fc_dict[addr_as_key]
                        ttl_expired = self.ttl and self.ttl < now - last_updated

                        if not ttl_expired:
                            values.append(val)
                        else:
                            # ttl expired
                            values = None # reset value list as we have no information about the _address/_count requirements, e.g. we cannot ensure read of single addresses
                            break
                    else:
                        # address not in cache dict
                        values = None # reset value list as we have no information about the _address/_count requirements, e.g. we cannot ensure read of single addresses
                        break

                if values is not None:
                    time.sleep(0.02)
                    return values # we have a successful read from cache if the value list has not been reset
            else:
                # add FC to dictionary
                self.obj_dict[fc_as_hex] = dict()

            # data not in cache, request from upstream device
            values = super(CachedRemoteSlaveContext, self).getValues(fc_as_hex, _address, _count)
            now = time.time_ns()//1_000_000

            if type(values) is not ExceptionResponse:
                # add data to cache
                for i in range(_count):
                    self.obj_dict[fc_as_hex].__setitem__(str(addresses[i]), (values[i], now))

            return values

class SerialForwarderTCPServer:
    """SerialRTU2TCP Forwarder Server"""

    def __init__(self):
        """Initialize the server"""
        self.server = None
        self.daemon = False

    async def run(self):
        """Run the server"""
        port, baudrate, server_port, server_ip, slaves, simulate, daemon, no_cache, cache_ttl = get_commandline()
        self.daemon = daemon
        client = ModbusSerialClient(method="ascii", framer=ModbusAsciiFramer, port=port, baudrate=baudrate)

        if simulate:
            message = f"Simulate Modbus Server for slaves {slaves}"
            _logger.info(message)

            store = self._make_simulation_contexts(slaves)
        else:
            message = f"RTU bus on {port} - baudrate {baudrate}"
            _logger.info(message)

            store = {}
            for i in slaves:
                if no_cache:
                    store[i] = RemoteSlaveContext(client, slave=i)
                else:
                    store[i] = CachedRemoteSlaveContext(client, slave=i, ttl=cache_ttl)

        context = ModbusServerContext(slaves=store, single=False)
        self.server = ModbusTcpServer(
            context,
            address=(server_ip, server_port),
        )
        message = f"serving on {server_ip} port {server_port}"
        _logger.info(message)
        message = f"listening to slaves {context.slaves()}"
        _logger.info(message)

        if self.daemon:
            notify(Notification.READY)

        await self.server.serve_forever()

    async def stop(self):
        """Stop the server"""
        if self.server:
            if self.daemon:
                notify(Notification.STOPPING)
            await self.server.shutdown()
            _logger.info("TCP server is down")

    def _make_simulation_contexts(self, slaves):
        store = {}
        data = ModbusSparseDataBlock({1: [0]*10})
        for i in slaves:
            ctx = ModbusSlaveContext()
            ctx.register(1, "01", datablock=data)
            ctx.register(2, "02", datablock=data)
            ctx.register(3, "03", datablock=data)
            ctx.register(4, "04", datablock=data)
            ctx.register(5, "05", datablock=data)
            ctx.register(6, "06", datablock=data)
            ctx.register(16, "10", datablock=data)

            store[i] = ctx

        return store

def get_commandline():
    """Read and validate command line arguments"""
    parser = argparse.ArgumentParser(description="Command line options")
    parser.add_argument(
        "--log",
        choices=["critical", "error", "warning", "info", "debug"],
        help="set log level, default is info",
        default="info",
        type=str,
    )
    parser.add_argument(
        "--port", help="RTU serial port", default="/dev/ttyUSB0", type=str
    )
    parser.add_argument("--baudrate", help="RTU baudrate", default=9600, type=int)
    parser.add_argument("--server_port", help="server port", default=5020, type=int)
    parser.add_argument("--server_ip", help="server IP", default="127.0.0.1", type=str)
    parser.add_argument(
        "--slaves", help="list of slaves to forward", type=int, nargs="+"
    )
    parser.add_argument("--simulate", help="Simulate server, e.g. no forwarding and always returning zero to addresses 1 to 10.", default=False, action='store_true')
    parser.add_argument("--daemon", help="Run as a background service", default=False, action='store_true')
    parser.add_argument("--no_cache", help="Disable caching of modbus read transactions.", default=False, action='store_true')
    parser.add_argument("--cache_ttl", help="TTL of cached values in Milliseconds", default=250, type=int)
    parser.add_argument("--debug", help="Start remote debugging session", default=False, action='store_true')

    args = parser.parse_args()

    if args.debug:
        import debugpy
        print("Waiting for debugger attach")
        debugpy.listen(('0.0.0.0', 5678))
        debugpy.wait_for_client()
        debugpy.breakpoint()

    # set defaults
    _logger.setLevel(args.log.upper())
    pymodbus.pymodbus_apply_logging_config(args.log.upper())
    if not args.slaves:
        args.slaves = {1, 2, 3}
    return args.port, args.baudrate, args.server_port, args.server_ip, args.slaves, args.simulate, args.daemon, args.no_cache, args.cache_ttl

if __name__ == "__main__":
    server = SerialForwarderTCPServer()
    try:
        signal.signal(signal.SIGINT, raise_graceful_exit)
        asyncio.run(server.run())
    finally:
        asyncio.run(server.stop())

janiversen commented 9 months ago

I do not think I misunderstood you:

"This delay is really long (multiple seconds) and can be observed by the calling API waiting ages until it returns. But it occurs only if I return data from the really fast cache within time periods below 10 ms"

I understood you experience a delay in returning values from the cache.

We have no extra delay in that case, and I pointed to e.g. the simulator datastore, because that also returns values from an internal cache, and without delay.

Your problem is most likely "self.lock = threading.RLock()". As I wrote earlier, mixing threading with async code is generally asking for trouble.
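
The general concern about blocking calls in async code can be illustrated without pymodbus. The sketch below assumes, as a simplification, that a handler runs on the event loop thread. It compares a handler that calls `time.sleep` directly with one that offloads the same sleep through `asyncio.to_thread` (Python 3.9+) and measures how long a concurrent heartbeat task is held up. All names are hypothetical, and the sketch does not attempt to reproduce or explain the multi-second delay reported in this issue.

```python
import asyncio
import time


async def heartbeat(gaps, ticks=5, period=0.01):
    """Record the real time between ticks that should be `period` apart."""
    last = time.monotonic()
    for _ in range(ticks):
        await asyncio.sleep(period)
        now = time.monotonic()
        gaps.append(now - last)
        last = now


async def handler_blocking():
    time.sleep(0.1)  # blocks the whole event loop thread


async def handler_offloaded():
    await asyncio.to_thread(time.sleep, 0.1)  # the loop keeps running meanwhile


async def worst_gap(handler):
    """Run `handler` next to a heartbeat and return the longest heartbeat gap."""
    gaps = []
    task = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0)  # give the heartbeat a chance to start
    await handler()
    await task
    return max(gaps)


blocked = asyncio.run(worst_gap(handler_blocking))
offloaded = asyncio.run(worst_gap(handler_offloaded))
print(f"worst gap, blocking handler:  {blocked * 1000:.0f} ms")
print(f"worst gap, offloaded handler: {offloaded * 1000:.0f} ms")
```

With the blocking handler, the first heartbeat tick cannot fire until the sleep has finished, so the worst gap is at least about as long as the sleep itself. With the offloaded handler, the heartbeat is expected to keep ticking close to its nominal period.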

martink-p commented 9 months ago

> "This delay is really long (multiple seconds) and can be observed by the calling API waiting ages until it returns. But it occurs only if I return data from the really fast cache within time periods below 10 ms"

Okay, I'm sorry. I put too much information into this sentence, which was misleading. What I really meant is: I experience a delay if I return very fast from getValues, and I do not experience this additional delay if I add extra waiting time of at least 10 ms.

> We have no extra delay in that case, and I pointed to e.g. the simulator datastore, because that also returns values from an internal cache, and without delay. Your problem is most likely "self.lock = threading.RLock()". As I wrote earlier, mixing threading with async code is generally asking for trouble.

The interesting point is that I've already checked this. I removed the "with self.lock" line from getValues and experienced the same delay. To be absolutely sure, I will double-check it tomorrow. Today I'm at a remote site and cannot rely on network performance.

janiversen commented 9 months ago

I am not sure what to say. All our data stores work without delay, so without some more detailed information I do not think I can help you.

janiversen commented 8 months ago

Closing due to lack of information.