mdavidsaver / pvxs

PVA protocol client/server library and utilities.
https://mdavidsaver.github.io/pvxs/
Other
19 stars 25 forks source link

Status of put operation #60

Closed dvermd closed 7 months ago

dvermd commented 8 months ago

I want to create several put operations, store them in some container and when completed (the result lambda is called), remove the corresponding operation from the container.

I can't find any relation between the Result passed to the lambda and the operation returned by exec. I think I could spawn a thread to iterate the container and call op->wait(0) - if waiting 0 is supported - and check if Timeout is thrown but there must be another way.

Is there a better way to reset the operations stored in a container when completed ?

mdavidsaver commented 8 months ago

I am not sure I can give a specific answer in isolation. Overall, I have found it best to structure ownership so that in progress Operation objects are kept by whatever object will consume the results. This can help to provide a natural bound on the number of in-progress operations. For example, it is generally not desirable to have two in-progress get() or put() operations to the same PV.

Although you might end up re-inventing asyncio.Future (cf. p4p.client.asyncio.Context). I also think it would be straightforward to wrapper an Operation in a concurrent.futures.Future. Either would give "only once" behavior within a framework which can handle errors and cancellation.

mdavidsaver commented 8 months ago

oops, wrong repository. I have python on the brain this morning...

mdavidsaver commented 8 months ago

My general advice is the same, to think about the consumer end first. With c++, it is necessary to take more care to avoid reference loops, or surprising destruction sequences, when manipulating std::shared_ptr within completion callbacks.

Also, consider that PVXS allows operations to be initiated on currently disconnected PVs. So it is possible to initiate two different PUT operations on the same PV with different values. However, when/if that PV reconnects, it is undefined which will be executed first. So simply storing all in-progress Operation objects in one collection could become a recipe for a resource leak, or undefined behavior.

If you would like more specific advice, please provide some details about your application.

dvermd commented 7 months ago

Here is kind of what I came up to:

class Var {
public:
    void set(...) {
        PutOperation &put_op = _put_operations.emplace_back();
        put_op._op_container = &_put_operations;
        put_op._op = _monitor->context().put(name())
                .fetchPresent(false)
                .build([variable = this](pvxs::Value&& unused(prototype)) -> pvxs::Value {
                    pvxs::Value value = variable->_value;
                    value["timestamp"] = variable->ts();

                    return value;
                })
                .result([&put_op](pvxs::client::Result&& unused(prototype)){
                    put_op.result();
                })
                .exec();

    }

private:
    struct PutOperation {
        std::shared_ptr<pvxs::client::Operation> _op;
        std::vector<PutOperation> *_op_container;
        void result() {
            std::remove_if(_op_container->begin(), _op_container->end(), [this](auto &put_op) {
                return put_op._op == this->_op;
            });
        }
    };
    std::vector<PutOperation> _put_operations;
};

Of course accessing _put_operations must be protected by a mutex or a lockfree container