equinor / ert

ERT - Ensemble based Reservoir Tool - is designed for running ensembles of dynamical models such as reservoir models, in order to do sensitivity analysis and data assimilation. ERT supports data assimilation using the Ensemble Smoother (ES), Ensemble Smoother with Multiple Data Assimilation (ES-MDA) and Iterative Ensemble Smoother (IES).
https://ert.readthedocs.io/en/latest/
GNU General Public License v3.0

Experiment Server #3437

Open sondreso opened 2 years ago

sondreso commented 2 years ago

This is an issue for tracking the experiment server milestone.

Other points:

Milestone issues:

jondequinor commented 2 years ago

Should we include prefect ensembles and ert3 experiments in this milestone?

jondequinor commented 2 years ago

Updated proposal:

What is shared between the clients and the server is:

The hierarchical structure is a tree of unique identifiers, ideally (?) the source attribute of a CloudEvent.

flowchart TB
    /ert/ex/0 --> /ert/ex/0/ee/0
    /ert/ex/0/ee/0 --> /ert/ex/0/ee/0/real/0
    /ert/ex/0/ee/0 --> /ert/ex/0/ee/0/real/1
    /ert/ex/0 --> /ert/ex/0/ee/1
    /ert/ex/0/ee/1 --> /ert/ex/0/ee/1/real/0
    /ert/ex/0/ee/1 --> /ert/ex/0/ee/1/real/1
    /ert/ex/0 --> /ert/ex/0/ee/n
    /ert/ex/0/ee/n --> ...
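
The tree above could be derived from the identifiers themselves. A minimal sketch (hypothetical helper, not actual ert code), assuming each identifier's parent is found by dropping its trailing `/<kind>/<index>` pair:

```python
from collections import defaultdict


def build_tree(sources):
    """Group each identifier under its parent path."""
    children = defaultdict(list)
    for source in sources:
        # Drop the trailing "/<kind>/<index>" pair to get the parent.
        parent = source.rsplit("/", 2)[0]
        if parent and parent != source:
            children[parent].append(source)
    return dict(children)


tree = build_tree([
    "/ert/ex/0/ee/0",
    "/ert/ex/0/ee/0/real/0",
    "/ert/ex/0/ee/0/real/1",
    "/ert/ex/0/ee/1",
])
```

This keeps the structure purely derivable from the `source` attribute, so no separate hierarchy message would strictly be needed if all identifiers follow the convention.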

The log is a shallow object where the attributes come from the source attribute of a CloudEvent, and the values are changes to be applied to experiment constituents (realizations, jobs, steps, ensembles, experiments).

This log does not contain CloudEvents, but rather the resulting values that need to be set (some value) or unset (null). A schema will have to exist for each constituent.

{
    "/ert/ex/0": {"_type": "experiment", "status": "RUNNING"},
    "/ert/ex/0/ee/0/real/1/step/0/job/0": {"_type": "job", "current_memory_usage": "3GB"},
    …,
    "/ert/ex/0/ee/0/real/1/step/0/job/194": {"_type": "job", "error": null}
}
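
The set/unset semantics could look roughly like this (illustrative names and values, not actual ert code): a value sets an attribute, `null`/`None` unsets it:

```python
def apply_log(state, log):
    """Apply one log entry to a client's state, in place."""
    for source, changes in log.items():
        entity = state.setdefault(source, {})
        for key, value in changes.items():
            if value is None:
                entity.pop(key, None)  # null means "unset"
            else:
                entity[key] = value
    return state


# Hypothetical prior state; "oom" is an illustrative error value.
state = {
    "/ert/ex/0": {"_type": "experiment", "status": "PENDING"},
    "/ert/ex/0/ee/0/real/1/step/0/job/194": {"_type": "job", "error": "oom"},
}
apply_log(state, {
    "/ert/ex/0": {"status": "RUNNING"},
    "/ert/ex/0/ee/0/real/1/step/0/job/194": {"error": None},
})
```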

Both server and client have their own ways of creating a state from this log. Qt uses a tree-like structure suited to implementing a TreeModel; the server uses something else.

The main objective of the server is to guarantee that all clients have a log equal to its own, which means it must keep track of which clients know what.
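
One possible shape for that bookkeeping (a hypothetical sketch, not the actual implementation): an append-only list of log entries plus, per client, an index of what has already been sent:

```python
class ReplicationState:
    """Tracks an append-only log and how far each client has read it."""

    def __init__(self):
        self.log = []           # append-only list of log(s_n) entries
        self.client_index = {}  # client id -> number of entries sent

    def dispatch(self, entry):
        self.log.append(entry)

    def connect(self, client):
        # A late-connecting client starts at 0 and receives the full log.
        self.client_index[client] = 0

    def pending(self, client):
        """Return the entries the client has not yet received."""
        start = self.client_index[client]
        chunk = self.log[start:]
        self.client_index[client] = len(self.log)
        return chunk
```

Here the index advances eagerly on read; advancing it only after confirmed delivery would be the more robust variant.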

sequenceDiagram
actor User2
actor User1
User1 ->> Server: Submit Experiment

rect rgb(191, 223, 255)
User1 ->> Server: Connect
end

Server ->> Experiment: Run Experiment
Experiment ->> Experiment: Stimuli s0 (simulation start)
Experiment ->> StateMachine: Dispatch s0
StateMachine ->> StateMachine: Create log, apply log(s0)
Server ->> StateMachine: Establish log replication session for User1
StateMachine ->> Server: Fetch structure and log(s0) for User1

rect rgb(191, 223, 255)
Note right of User1: Websockets event
Server --) User1: structure and log(s0)
User1 ->> User1: build state from structure, apply log(s0) to own state
end

Experiment ->> Evaluator: Evaluate ensemble
Evaluator -->> Experiment: Stimuli s1 (ensemble start)
Experiment ->> StateMachine: Dispatch s1
StateMachine ->> StateMachine: Apply log(s1)

rect rgb(191, 223, 191)
User2 ->> Server: Connect
end

Server ->> StateMachine: Establish log replication session for User2
StateMachine ->> Server: Fetch structure and [log(s0), log(s1)] for User2

rect rgb(191, 223, 191)
Note right of User2: Websockets event
Server --) User2: structure and [log(s0), log(s1)]
User2 ->> User2: build state from structure, apply [log(s0), log(s1)] to own state
end

Server ->> StateMachine: After some time, check if all clients are up-to-date. User1 was at log(s0)
StateMachine ->> Server: Fetch log(s1) for User1

rect rgb(191, 223, 191)
Note right of User1: Websockets event
Server --) User1: log(s1)
User1 ->> User1: apply log(s1) to own state
end

Evaluator ->> Cluster: Start remote work
loop Until all realizations are complete
    rect rgb(191, 223, 255)
    note right of Server: Websockets event from the cluster
    Cluster --) Server: Stimuli s_n Job start|update|success
    end
    Server ->> StateMachine: Dispatch s_n
    StateMachine ->> StateMachine: Apply log(s_n)
    opt Retrieve after time t - receive all transitions in aggregated form since last publish [log(s_n_last), log(s_n)]
        StateMachine ->> Server: Fetch [log(s_n_last), log(s_n)] for User1 and User2
        rect rgb(191, 223, 191)
        Note right of User2: Websockets event
        Server --) User2: [log(s_n_last), log(s_n)]
        User2 ->> User2: Apply [log(s_n_last), log(s_n)] to own state
        end
        rect rgb(191, 223, 255)
        Note right of User1: Websockets event
        Server --) User1: [log(s_n_last), log(s_n)]
        User1 ->> User1: Apply [log(s_n_last), log(s_n)] to own state
        end
    end
end

Evaluator -->> Experiment: Stimuli s_end (ensemble success)
Experiment ->> StateMachine: Dispatch s_end
StateMachine ->> StateMachine: apply log(s_end)
Server ->> StateMachine: Check if clients are up to date. Both need s_end.
StateMachine ->> Server: Fetch log(s_end)
rect rgb(191, 223, 191)
Note right of User2: Websockets event
Server --) User2: log(s_end)
User2 ->> User2: Apply log(s_end) to own state
end
rect rgb(191, 223, 255)
Note right of User1: Websockets event
Server --) User1: log(s_end)
User1 ->> User1: Apply log(s_end) to own state
end
lars-petter-hauge commented 2 years ago

I think it makes sense to first send the structure of entities and then use the flat structure to send events/logs on a per-entity level. The clients will have to handle updating their state one way or another, and with this approach they should be fairly free to choose how.

After some time, check if all clients are up-to-date. User1 was at log(s0)

This is the part I'm a bit curious about, and what has given the most headache when trying to implement the state machine. I don't see a good, clean way of keeping this record and of specifying how all the classes must behave. Either the experiment server needs to hold all chunks of logs sent to clients, or we need a way to retrieve indexed chunks of logs from the state machine (and even then the experiment server would need to keep track of which client has retrieved which index).

I'm a bit ambivalent about the alternative of implicitly flushing the logs sent from the state machine (which is the one I'm implementing now): whenever the experiment server asks for all new logs, the state machine sends everything currently aggregated and then empties the list. Hence subsequent calls for updates from the state machine would create different results, and if any issue arises during receiving or handling, it is not possible to recreate the content of a chunk. This is easy to implement in the state machine, but it doesn't seem very robust, and it requires the most logic and care in the experiment server.
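
The non-flushing alternative could be sketched like this (hypothetical names, not the actual state machine): entries are retained and answered by indexed, non-destructive reads, so a chunk can be recreated if delivery to a client fails:

```python
class StateMachineLog:
    """Retains all log entries and serves indexed, repeatable reads."""

    def __init__(self):
        self._entries = []

    def append(self, entry):
        self._entries.append(entry)

    def since(self, index):
        """Return (next_index, entries) without discarding anything.

        The caller stores next_index per client, and only advances it
        after the chunk was successfully delivered, so a failed send
        can simply be retried with the old index.
        """
        return len(self._entries), self._entries[index:]
```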

The main objective of the server is to guarantee that all clients have a log equal to its own, which means it must keep track of which clients know what.

I do agree about the main objective of the server! I think I would like the clients to receive the same logs though, and as such one client would never be ahead of another.

jondequinor commented 2 years ago

Either the experiment server needs to hold all chunks of logs sent to clients

Yes, I think this is probably best. There are multiple ways to truncate logs like this should size become a problem.

Hence subsequent calls for updates from the statemachine would create different results

Agreed that this is probably too complex to do well.

I think I would like the clients to receive the same logs though, and as such one client would never be ahead of another.

Yeah, that wasn't communicated well, but temporarily unsynchronized clients should be fine as long as they're eventually consistent.

lars-petter-hauge commented 2 years ago

Either the experiment server needs to hold all chunks of logs sent to clients

Yes, I think this is probably best. There are multiple ways to truncate logs like this should size become a problem.

By truncate I suppose you also mean merge? Since a later log entry supersedes a previous one, a connected user would only need the last one. This logic must then be implemented on the server side, unless we want the clients to handle redundant messages. The state machine will of course also contain this logic.
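
The merge could be a per-(source, attribute) overwrite, something like this sketch (hypothetical helper): later entries supersede earlier ones, and explicit nulls are kept so clients still unset those attributes:

```python
def merge_logs(entries):
    """Compact a sequence of log entries into one equivalent entry."""
    merged = {}
    for entry in entries:
        for source, changes in entry.items():
            # Later values supersede earlier ones per attribute. An
            # explicit None (JSON null) is kept: it still means "unset".
            merged.setdefault(source, {}).update(changes)
    return merged


compacted = merge_logs([
    {"/ert/ex/0": {"status": "RUNNING"}},
    {"/ert/ex/0": {"status": "DONE"},
     "/ert/ex/0/ee/0/real/1/step/0/job/0": {"error": None}},
])
```

A client applying the compacted entry ends in the same state as one that applied every entry in order, which is what makes truncation on the server side safe.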

I think I would like the clients to receive the same logs though, and as such one client would never be ahead of another.

Yeah, that wasn't communicated well, but temporarily unsynchronized clients should be fine as long as they're eventually consistent.

I agree that the behaviour should be fine, but I think(...) it adds complexity. I'm not completely certain though, and the other way also adds complexity in other locations.

I've added some tests for the statemachine in a branch on my fork: https://github.com/lars-petter-hauge/ert/blob/experiment_server_lp/tests/ert_tests/experiment_server/test_state_machine.py

The tests do not pass, and there are likely some issues with the test data, but the behaviour can be discussed with these tests in mind. Note that they were written prior to the suggestions above.

lars-petter-hauge commented 2 years ago

We've looked a bit into how the current setup is made, in order to better see how the next Tracker will be made. The sequence diagram presented here is a draft and needs updating (it uses threads as its basis, which isn't a good angle for looking at this, but it is an angle):

sequenceDiagram
    participant LegacyEnsemble
    participant Monitor.SyncWSDuplexer_loop_thread1
    participant ert_ee_run_server
    participant ert_gui_simulation_thread
    MainThread->>ert_gui_simulation_thread: RunDialog.startSimulation()
    ert_gui_simulation_thread->>ert_gui_simulation_thread: asyncio.new_event_loop()
    ert_gui_simulation_thread->>ert_ee_run_server:run_ensemble_evaluator->EnsembleEvaluator.run()
    ert_gui_simulation_thread->>LegacyEnsemble:LegacyEnsemble.evaluate
    LegacyEnsemble->>LegacyEnsemble:asyncio.new_event_loop()
    ert_ee_run_server->>ert_ee_run_server:asyncio.new_event_loop()
    Note over ert_ee_run_server: Event loop is created on EnsembleEvaluator initialization and fed to threading.Thread
    ert_ee_run_server->>Monitor.SyncWSDuplexer_loop_thread1:EnsembleEvaluator.run()->ee_monitor.create()
    MainThread->>EvaluatorTracker.DrainerThread: RunDialog.startSimulation()
    EvaluatorTracker.DrainerThread->>EvaluatorTracker.DrainerThread:asyncio.new_event_loop()
    EvaluatorTracker.DrainerThread->>Monitor.SyncWSDuplexer_loop_thread2:create_ee_monitor()
    Monitor.SyncWSDuplexer_loop_thread2->>Monitor.SyncWSDuplexer_loop_thread2:asyncio.new_event_loop()