simonsobs / socs

Simons Observatory specific OCS agents.
BSD 2-Clause "Simplified" License

Race Conditions when using OpCodes and session.status #607

Closed: jlashner closed this 5 months ago

jlashner commented 6 months ago

I'm encountering the following issue when trying to fix tests for the HWP PMX agent:

The following test fails on the assertion assert resp.session['op_code'] == OpCode.SUCCEEDED.value

@pytest.mark.integtest
def test_hwp_rotation_set_on(wait_for_crossbar, kikusui_emu, run_agent, client):
    responses = {'output 1': '',
                 'output?': '1'}

    kikusui_emu.define_responses(responses)
    resp = client.set_on()
    print(resp)
    print(resp.session)
    assert resp.status == ocs.OK
    assert resp.session['op_code'] == OpCode.SUCCEEDED.value

with the following error message:

default response set to 'None'
msg='output 1'
response=''
msg='output?'
response='1'
OCSReply: OK : Operation "set_on" is currently not running (SUCCEEDED).
  set_on[session=2]; status=starting for 1.0 s
  messages (1 of 1):
    1704472073.685 Status is now "starting".
  other keys in .session: op_code, data
{'session_id': 2, 'op_name': 'set_on', 'op_code': 2, 'status': 'starting', 'success': True, 'start_time': 1704472073.6851099, 'end_time': None, 'data': None, 'messages': [[1704472073.6851099, 'Status is now "starting".']]}
F

================================================== FAILURES ==================================================
__________________________________________ test_hwp_rotation_set_on __________________________________________

wait_for_crossbar = None, kikusui_emu = <socs.testing.device_emulator.DeviceEmulator object at 0x7ff6fdd3bd60>
run_agent = None, client = OCSClient('hwp-pmx')

    @pytest.mark.integtest
    def test_hwp_rotation_set_on(wait_for_crossbar, kikusui_emu, run_agent, client):
        responses = {'output 1': '',
                     'output?': '1'}

        kikusui_emu.define_responses(responses)
        resp = client.set_on()
        print(resp)
        print(resp.session)

        # import time
        # time.sleep(1)
        # resp = client.set_on.status()

        # print(resp)
        # print(resp.session)

        assert resp.status == ocs.OK
>       assert resp.session['op_code'] == OpCode.SUCCEEDED.value
E       assert 2 == 5
E        +  where 5 = <OpCode.SUCCEEDED: 5>.value
E        +    where <OpCode.SUCCEEDED: 5> = OpCode.SUCCEEDED

integration/test_hwp_pmx_agent_integration.py:52: AssertionError

Based on the logs, the session.status in resp is still 'starting' instead of 'done', and the op_code is STARTING rather than SUCCEEDED.

The following test passes:

@pytest.mark.integtest
def test_hwp_rotation_set_on(wait_for_crossbar, kikusui_emu, run_agent, client):
    responses = {'output 1': '',
                 'output?': '1'}

    kikusui_emu.define_responses(responses)
    resp = client.set_on()
    import time
    time.sleep(1)
    resp = client.set_on.status()
    assert resp.status == ocs.OK
    assert resp.session['op_code'] == OpCode.SUCCEEDED.value

I think the change that instigated this issue is switching tasks from being blocking to non-blocking.

I think this is due to a race condition between when wait resolves and when the session status is updated. This points to a slightly deeper OCS issue that I think @mhasself has experience with (based on comments in the src).

There may be a way to fix this in OCS through callback chains or something similar, but for now I think we need to be really careful about using OpCode or session.status in tests or for control flow.
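
(One way to be careful in tests, sketched here as a hypothetical helper rather than anything that exists in socs or ocs: poll the operation's status until its op_code reaches a terminal value, instead of trusting the session snapshot returned by wait. This assumes the OpCode enum has SUCCEEDED/FAILED/EXPIRED members and uses the same client API as the tests above.)

import time

from ocs.base import OpCode

TERMINAL_OP_CODES = {OpCode.SUCCEEDED.value, OpCode.FAILED.value, OpCode.EXPIRED.value}

def wait_for_terminal_op_code(op, timeout=10., interval=0.1):
    """Poll op.status() until session['op_code'] is terminal, or raise on timeout."""
    deadline = time.time() + timeout
    while True:
        resp = op.status()
        if resp.session.get('op_code') in TERMINAL_OP_CODES:
            return resp
        if time.time() > deadline:
            raise TimeoutError("operation did not reach a terminal op_code")
        time.sleep(interval)

# e.g., in the test above:
#     client.set_on()
#     resp = wait_for_terminal_op_code(client.set_on)
#     assert resp.session['op_code'] == OpCode.SUCCEEDED.value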

mhasself commented 6 months ago

Cool that code comment says "you can't trust session.d for purpose X" and then about 8 lines later we use session.d for purpose X ...

I think a quick fix is to add a second Deferred to session (session.d_done) and then, at the end of these blocks, resolve it. Then wait should check and/or wait on session.d_done instead of session.d. That gives proper ordering.
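
(A minimal sketch of that idea, with illustrative names only; this is not the actual ocs session class. The point is just that d_done fires only after all session state has been finalized, so anything waiting on it sees a consistent session.)

from twisted.internet import defer

class SessionWithDone:
    """Illustrative stand-in for the ocs session object."""

    def __init__(self):
        self.d = defer.Deferred()       # existing: fires when the task returns
        self.d_done = defer.Deferred()  # proposed: fires only once state is final
        self.success = None
        self.status = 'starting'

    def finish(self, success):
        # Update all session state first...
        self.success = success
        self.status = 'done'
        # ...and only then resolve d_done, so a wait() that waits on d_done
        # (rather than d) cannot observe a half-updated session.
        self.d_done.callback(self)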

jlashner commented 6 months ago

I was thinking something along those lines, but another issue I've seen is that set_status can be asynchronous, for instance if you try to run:

session.set_status('running')
print(session.status)

this will not update immediately, since it is being scheduled to run in the main thread.
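
(A generic Twisted illustration of why that happens, not the actual ocs implementation: if set_status hands the update to the reactor, for example via callFromThread, the worker thread that requested it won't see the new value until the reactor gets around to applying it.)

from twisted.internet import reactor

class SessionSketch:
    """Illustrative only: a set_status that schedules the update on the reactor."""

    def __init__(self):
        self.status = 'starting'

    def set_status(self, status):
        # The assignment runs later, in the reactor thread, not here.
        reactor.callFromThread(setattr, self, 'status', status)

# From a worker thread:
#     session.set_status('running')
#     print(session.status)  # still 'starting'; the update has only been scheduled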

Like in the testing example above, when the response is returned from wait (without the sleep) it looks like this:

{'session_id': 2, 'op_name': 'set_on', 'op_code': 2, 'status': 'starting', 'success': True, 'start_time': 1704472073.6851099, 'end_time': None, 'data': None, 'messages': [[1704472073.6851099, 'Status is now "starting".']]}

Meaning that session.success was updated in the _handle_return_task function, but the status has not been updated, presumably because that update was waiting to run on the main thread.

So I'm not entirely sure that running a callback after _handle_return_task will ensure that session.status is updated.

mhasself commented 6 months ago

Ah, yes. I see what you mean. That will require a bit more thought to get the final Deferred to fire only once the state is fully up to date.

mhasself commented 6 months ago

Ok, actually, I really don't understand how this could happen. And I also can't reproduce it. That session dict you show should really be impossible.

I think _handle_task_return_val will always run in the reactor. Even if your task was in a worker thread. But definitely if your task was registered with blocking=False! There are no other threads for the callback to run in, in that case. And I don't understand how you can get success=True and status='starting' then.

But that puzzle aside ^^^, I do acknowledge these other, more clear-cut problems:

  1. There's a race condition in wait(). I think the consequences of that should be that the encoded session has success=None and status = 'starting' / 'running'. I.e. the session snapshot is taken a moment too soon, but it is internally consistent.
  2. session.set_status(...) is asynchronous -- but only if you're running in a worker thread, not the reactor. If you're in the reactor (including in Task code registered with blocking=False), it should be instantaneous. Actually, if you're in a worker thread, set_status returns a Deferred and you can yield on it. But no code does that!
  3. There is an invalid encoded session pushed out at one point. It's due to the "add_message" call, between success=ok and set_status('done'). But this invalid session just gets put on the agent's main feed -- which, like, does anyone use that? Certainly http client.wait() does not.

Let's hack on it next week (if we don't figure it out before then).

jlashner commented 6 months ago

Ok, I was really curious, so I started poking around a bit more and think I know why this is happening...

This is due to the new structure of the OCS agents. For reference, here is the task that's causing this issue: https://github.com/simonsobs/socs/blob/710d8a39de9014725aaa220079499e7d1684e9c4/socs/agents/hwp_pmx/agent.py#L133-L141

I think what's happening is that the set_on function starts in the reactor thread, but yields to action.deferred, which is then fired by the main operation. That operation is blocking, so it runs in a worker thread. This means that when the callback is called there, it is not in the reactor thread, so everything after the yield statement in the task, including _handle_task_return_vals, is running in the worker thread of the main process...

Really stretching my intuition for Python and Twisted here.

To fix this, maybe the best way is to wrap action.deferred.callback in callFromThread?

mhasself commented 6 months ago

To fix this, maybe the best way is to wrap action.deferred.callback in callFromThread?

Ahhh, that makes some sense then. Yes, wrap all three uses of action.deferred in callFromThread and I guess that should clean up the behavior.
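
(A minimal sketch of that change, assuming the blocking main operation finishes the helper action by firing action.deferred, as in the hwp_pmx task linked above; the function and attribute names here are illustrative.)

from twisted.internet import reactor

def finish_action(action, result):
    """Resolve an action's Deferred from a worker thread.

    Firing action.deferred.callback(result) directly here would run all of its
    callbacks (the rest of the task after the yield, _handle_task_return_val,
    the final set_status('done')) in this worker thread. Wrapping it in
    callFromThread makes the Deferred fire in the reactor thread instead.
    """
    reactor.callFromThread(action.deferred.callback, result)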

jlashner commented 6 months ago

Confirmed: using callFromThread on the callback makes the session self-consistent in the test above. I think the wait race conditions are probably still worth thinking about, but for the time being I'll add this everywhere we use this structure.

mhasself commented 5 months ago

I think ocs:371 means we can close this?

jlashner commented 5 months ago

Yep!