ControlSystemStudio / phoebus

A framework and set of tools to monitor and operate large scale control systems, such as the ones in the accelerator community.
http://phoebus.org/
Eclipse Public License 1.0

pv.write throws timeout exception on first call #3083

Closed georgweiss closed 1 month ago

georgweiss commented 1 month ago

So I have something like:

PV pv = PVPool.getPV("myPV");
pv.onValueEvent().subscribe(value -> {
    if (!PV.isDisconnected(value)) {
        pv.write(42);
    }
});

The idea is to connect, write a value and then return the PV to the pool. I notice that with pva, the first call to pv.write writes the value but also throws a TimeoutException (stack trace below). Subsequent calls do not throw a TimeoutException. Using pv.asyncWrite instead does not throw an exception on the first call, nor do I see the exception using ca.

Exception in thread "TCP receiver /172.18.20.51:60853" io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.util.concurrent.TimeoutException
    at io.reactivex.rxjava3.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:717)
    at io.reactivex.rxjava3.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:714)
    at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
    at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:69)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableCreate$LatestAsyncEmitter.drain(FlowableCreate.java:687)
    at io.reactivex.rxjava3.internal.operators.flowable.FlowableCreate$LatestAsyncEmitter.onNext(FlowableCreate.java:616)
    at org.phoebus.pv.ValueEventHandler$Subscription.update(ValueEventHandler.java:38)
    at org.phoebus.pv.PV.notifyListenersOfValue(PV.java:311)
    at org.phoebus.pv.pva.PVA_PV.handleMonitor(PVA_PV.java:90)
    at org.epics.pva.client.MonitorRequest.decodeValueUpdate(MonitorRequest.java:236)
    at org.epics.pva.client.MonitorRequest.handleResponse(MonitorRequest.java:198)
    at org.epics.pva.client.MonitorHandler.handleCommand(MonitorHandler.java:40)
    at org.epics.pva.client.MonitorHandler.handleCommand(MonitorHandler.java:21)
    at org.epics.pva.common.CommandHandlers.handleCommand(CommandHandlers.java:66)
    at org.epics.pva.client.ClientTCPHandler.handleApplicationMessage(ClientTCPHandler.java:351)
    at org.epics.pva.common.TCPHandler.handleMessage(TCPHandler.java:400)
    at org.epics.pva.common.TCPHandler.receiver(TCPHandler.java:300)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.util.concurrent.TimeoutException
    at java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1960)
    at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
    at org.phoebus.pv.pva.PVA_PV.write(PVA_PV.java:190)
    at org.phoebus.service.saveandrestore.epics.SnapshotUtil.lambda$restore$0(SnapshotUtil.java:86)
    at io.reactivex.rxjava3.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:65)
    ... 18 more
kasemir commented 1 month ago

I can't duplicate that exact error. This runs for me without issues on the client side ...

/** Use with for example this database:

# `softIocPVA -d demo.db`
# where demo.db contains this:
record(ai, "demo")
{
}

 *  
 */
public class PVWriteDemo
{
    public static void main(String[] args) throws Exception
    {
        PV pv = PVPool.getPV("pva://demo");
        pv.onValueEvent().subscribe(value ->
        {
            System.out.println("Got " + value);
            if (!PV.isDisconnected(value))
            {
                double val = Math.random() * 100;
                pv.asyncWrite(val);
            }
        });

        while (true)
        {
            System.out.println("Check...");
            Thread.sleep(1000);
        }
    }
}

... but there's a different problem: The code is basically an infinite loop. PV connects, receives first value. Code writes a number, which triggers another value update, code writes another value, which again triggers a value update, and in my case the IOC(!) then crashes. If you always write the same number "42", the result depends on the MDEL setting of the record, assuming you talk to a basic IOC.
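
One way to break the feedback loop described above is a write-once guard. The following is a minimal plain-JDK sketch, not Phoebus code: `EchoChannel` is a hypothetical in-memory stand-in for a PV whose every write comes back as a value update, and `WriteOnceDemo` is an illustrative name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.DoubleConsumer;

/** Plain-JDK simulation of the write/update feedback loop; no Phoebus classes involved. */
final class WriteOnceDemo
{
    /** Hypothetical channel: each write is echoed back to the listener as a value update */
    static final class EchoChannel
    {
        private DoubleConsumer listener = value -> {};
        final AtomicInteger writes = new AtomicInteger();

        void subscribe(DoubleConsumer newListener)
        {
            listener = newListener;
            listener.accept(0.0);       // first update, sent on "connect"
        }

        void write(double value)
        {
            writes.incrementAndGet();
            listener.accept(value);     // the write triggers another update
        }
    }

    /** @return number of writes performed by the guarded listener */
    static int run()
    {
        EchoChannel channel = new EchoChannel();
        AtomicBoolean written = new AtomicBoolean();
        channel.subscribe(value ->
        {
            // Only the first update gets past the guard; later updates are ignored
            if (written.compareAndSet(false, true))
                channel.write(42.0);
        });
        return channel.writes.get();
    }

    public static void main(String[] args)
    {
        System.out.println("Writes performed: " + run());
    }
}
```

Without the `compareAndSet` check, the listener in this simulation would call `write` from every update and never terminate normally. With it, exactly one write happens.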

Note that the PV API makes no promises regarding the caching and threading of the underlying implementation (caj, pva, jackie, mqtt, ...). Is the value that you receive a safe copy of the data, so you can hold on to a reference? Or is the value a zero-copy reference to data in some network buffer, which would be faster and more memory efficient, but now you need to copy out what you want to keep, because it becomes invalid as soon as you leave the subscribe consumer?

As for threading, in your stack trace we can see that we're basically called right from TCPHandler.receiver, that is, with minimal latency right when we receive the update over TCP. But that also means you should probably not start another network operation like PV.write or PV.asyncWrite from within there, because the PV implementation could lock. The original JNI JCA implementation was notorious for that; when you called another API method from within a callback, it would lock up.
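
The threading concern can be illustrated with a small plain-JDK simulation. This is a sketch under stated assumptions, not the actual core-pva implementation: a single-threaded executor stands in for the TCP receiver thread, and the hypothetical `asyncWrite` completes its future on that same thread, the way a reply would be decoded by the receiver. A blocking wait issued from inside a callback then cannot succeed, because the only thread that could deliver the reply is the one that is waiting.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-JDK simulation; no Phoebus or PVA classes involved. */
final class ReceiverThreadDemo
{
    /** Hypothetical async write: the "reply" is processed on the receiver thread */
    static CompletableFuture<Void> asyncWrite(ExecutorService receiver, double value)
    {
        CompletableFuture<Void> reply = new CompletableFuture<>();
        receiver.execute(() -> reply.complete(null));
        return reply;
    }

    /** Blocking write: wait up to 200 ms for the reply
     *  @return true if the reply arrived in time
     */
    static boolean blockingWrite(ExecutorService receiver, double value)
    {
        try
        {
            asyncWrite(receiver, value).get(200, TimeUnit.MILLISECONDS);
            return true;
        }
        catch (Exception ex)
        {   // TimeoutException, ExecutionException or InterruptedException
            return false;
        }
    }

    /** @return { completed inside a receiver callback, completed from the caller's thread } */
    static boolean[] run()
    {
        ExecutorService receiver = Executors.newSingleThreadExecutor();
        try
        {
            // Case 1: blocking write issued from within a callback on the receiver thread
            Callable<Boolean> callback = () -> blockingWrite(receiver, 42.0);
            boolean inside = receiver.submit(callback).get();

            // Case 2: same blocking write issued from the caller's own thread
            boolean outside = blockingWrite(receiver, 42.0);

            return new boolean[] { inside, outside };
        }
        catch (Exception ex)
        {
            throw new IllegalStateException(ex);
        }
        finally
        {
            receiver.shutdownNow();
        }
    }

    public static void main(String[] args)
    {
        boolean[] result = run();
        System.out.println("Blocking write inside callback completed:  " + result[0]);
        System.out.println("Blocking write outside callback completed: " + result[1]);
    }
}
```

In this simulation the first case times out and the second completes. A non-blocking call from inside the callback does not wait, which would be consistent with asyncWrite not showing the problem, though whether the real PVA client behaves the same way is an assumption here.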

So if you simply want to write one value (connect, write, close), try something like this, which works for me with both "ca://" and "pva://":

    public static void main(String[] args) throws Exception
    {
        PV pv = PVPool.getPV("pva://demo");
        try
        {
            // Wait until we're connected
            CountDownLatch connected = new CountDownLatch(1);
            pv.onValueEvent().subscribe(value ->
            {
                System.out.println("Got " + value);
                if (!PV.isDisconnected(value))
                    connected.countDown();
            });

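            // await() returns false on timeout, so don't write to a PV that never connected
            if (!connected.await(2, TimeUnit.SECONDS))
                throw new TimeoutException("PV did not connect within 2 seconds");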

            // Write a value, wait for that to complete (if supported)
            double val = Math.random() * 100;
            System.out.println("Writing " + val);
            pv.asyncWrite(val).get(2, TimeUnit.SECONDS);
        }
        catch (Exception ex)
        {
            ex.printStackTrace();
        }

        // Done
        PVPool.releasePV(pv);
    }
kasemir commented 1 month ago

Scratch the "no promises regarding the caching". That's true on the lower CAJ or core-pva level, but the PV API deals with VTypes, and we do provide a copy in PV.read or PV.onValueEvent. Still, the threading applies. The subscribe consumer may be called directly from within the thread that reads from the network, so you better avoid other network calls from within there.

georgweiss commented 1 month ago

I've taken precautions to avoid the infinite loop, no worries. My snippet is just the essential elements for this discussion. However, using pv.asyncWrite does not present an issue in my code, while pv.write does as described.

In any case, I've moved the call to pv.write to after the await, and that resolves the issue.

Thanks @kasemir

kasemir commented 1 month ago

"... pv.write ... after the await"

If this is for something like save/restore, you might actually want to wait for all affected PVs to first connect (send the first update) and only then write the values, so you restore all or nothing. Or prompt "Only N out of M PVs are available. Restore only those, or abort?"
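
A minimal plain-JDK sketch of that "wait for all, then decide" idea follows. It is not taken from the save/restore service: each `CompletableFuture` is a hypothetical stand-in for "this PV has sent its first update", and `RestoreGate`, `awaitConnected` and the PV names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Plain-JDK sketch; the futures stand in for "PV sent its first update". */
final class RestoreGate
{
    /** Wait up to the timeout for all PVs, then list those that connected */
    static List<String> awaitConnected(Map<String, CompletableFuture<Void>> connections,
                                       long timeout, TimeUnit unit)
    {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(connections.values().toArray(new CompletableFuture[0]));
        try
        {
            all.get(timeout, unit);
        }
        catch (InterruptedException ex)
        {
            Thread.currentThread().interrupt();
        }
        catch (ExecutionException | TimeoutException ex)
        {
            // Not all PVs connected in time; fall through and report the ones that did
        }

        List<String> connected = new ArrayList<>();
        connections.forEach((name, future) ->
        {
            if (future.isDone() && !future.isCompletedExceptionally())
                connected.add(name);
        });
        return connected;
    }

    /** Hypothetical scenario: two PVs connect, one never does */
    static List<String> demo()
    {
        Map<String, CompletableFuture<Void>> connections = new LinkedHashMap<>();
        connections.put("pva://a", CompletableFuture.completedFuture(null));
        connections.put("pva://b", CompletableFuture.completedFuture(null));
        connections.put("pva://c", new CompletableFuture<>());
        return awaitConnected(connections, 100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args)
    {
        List<String> connected = demo();
        if (connected.size() == 3)
            System.out.println("All PVs connected, restoring");
        else
            System.out.println("Only " + connected.size() + " out of 3 PVs are available: " + connected);
    }
}
```

The writes would only start after `awaitConnected` returns, from the caller's thread rather than from a value callback. Whether to abort or restore the partial set is left to the caller.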