temporalio / sdk-python

Temporal Python SDK

[Bug] Failed during multiprocess queue poll for heartbeat #476

Open wilsoniya opened 8 months ago

wilsoniya commented 8 months ago

What are you really trying to do?

Operate a heavily-loaded multiprocess worker.

Describe the bug

In a heavily-loaded multiprocess worker with activities that may be issuing heartbeats too frequently, I observe logging indicating that the parent process times out reading from the _MultiprocessingSharedStateManager._heartbeat_queue, after which the worker process tree effectively hangs, i.e., exhibits no further liveness in its ability to process tasks. If this situation is unrecoverable, I would expect the worker process tree to terminate, thus allowing a restart (e.g., via systemd, a k8s restart policy, etc.).

It's not clear to me why this timeout is happening, but it also results in the heartbeat queue filling up and blocking further progress on activities which heartbeat.
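
For context, the worker is configured along these lines (a minimal sketch of this style of setup, not my actual code; the server address, task queue name, activity body, and pool size are placeholders):

import asyncio
import concurrent.futures
import multiprocessing

from temporalio import activity
from temporalio.client import Client
from temporalio.worker import SharedStateManager, Worker

@activity.defn
def replicate_treenode(tree_node_id: int) -> None:
    # Placeholder for the real activity; it heartbeats while it works.
    activity.heartbeat()

async def main() -> None:
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="replication",
        activities=[replicate_treenode],
        # Activities run in child processes...
        activity_executor=concurrent.futures.ProcessPoolExecutor(max_workers=8),
        # ...so heartbeats are ferried back to the parent through a
        # multiprocessing.Manager-backed queue that
        # _MultiprocessingSharedStateManager polls in the parent process.
        shared_state_manager=SharedStateManager.create_from_multiprocessing(
            multiprocessing.Manager()
        ),
    )
    await worker.run()

if __name__ == "__main__":
    asyncio.run(main())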

I see the following logging. First, presumably logs emitted by the parent of the process tree (i.e., the reader of the heartbeat queue):

{
  "git": "0bd697f642d775670c808da1e216c44764ddb854",
  "version": "1.2.0",
  "event": "Failed during multiprocess queue poll for heartbeat",
  "timestamp": "2024-02-13T19:33:27.414756Z",
  "logger": "temporalio.worker._activity",
  "level": "error",
  "exception": [
    {
      "exc_type": "TimeoutError",
      "exc_value": "",
      "syntax_error": null,
      "is_cause": false,
      "frames": [
        {
          "filename": "/opt/DPS/venv/lib/python3.8/site-packages/temporalio/worker/_activity.py",
          "lineno": 905,
          "name": "_heartbeat_processor",
          "line": "",
          "locals": {
            "self": "'<temporalio.worker._activity._MultiprocessingSharedStateManager object at 0x7f23'+9",
            "item": "\"(b'\\\\n$bc8c0612-11a9-4a58-ab6b-43d37f064f56\\\\x12\\\\x18replicate-treenode-20060\\\\x1a$8\"+115",
            "completion": "'<function _MultiprocessingSharedStateManager.unregister_heartbeater.<locals>.<la'+24",
            "fn": "'<function _ActivityInboundImpl.execute_activity.<locals>.<lambda> at 0x7f23412cb'+4"
          }
        },
        {
          "filename": "/opt/DPS/venv/lib/python3.8/site-packages/temporalio/worker/_activity.py",
          "lineno": 644,
          "name": "<lambda>",
          "line": "",
          "locals": {
            "details": "()",
            "heartbeat_with_context": "'<function _ActivityInboundImpl.execute_activity.<locals>.heartbeat_with_context '+18",
            "loop": "<_UnixSelectorEventLoop running=True closed=False debug=False>"
          }
        },
        {
          "filename": "/usr/lib/python3.8/concurrent/futures/_base.py",
          "lineno": 446,
          "name": "result",
          "line": "",
          "locals": {
            "self": "None",
            "timeout": "10"
          }
        }
      ]
    }
  ]
}

And second, from a child in the process tree, as it attempts to push heartbeats to the queue:

{
  "git": "0bd697f642d775670c808da1e216c44764ddb854",
  "version": "1.2.0",
  "event": "ProgressPipe callback raised exception",
  "timestamp": "2024-02-08T21:26:47.729792Z",
  "logger": "vault.storage_manager.stream",
  "level": "error",
  "exception": [
    {
      "exc_type": "Full",
      "exc_value": "",
      "syntax_error": null,
      "is_cause": false,
      "frames": [
        {
          "filename": "/opt/DPS/vault-site/vault/storage_manager/stream.py",
          "lineno": 250,
          "name": "_invoke_callback",
          "line": "",
          "locals": {
            "self": "<vault.storage_manager.stream.ProgressPipe object at 0x7f78d9ab2eb0>",
            "e": "Full()"
          }
        },
        {
          "filename": "/opt/DPS/vault-site/vault/temporal/replication/activities.py",
          "lineno": 225,
          "name": "<lambda>",
          "line": "",
          "locals": {
            "n": "1960837120",
            "tree_node": "<TreeNode: TreeNode object (278873)>"
          }
        },
        {
          "filename": "/opt/DPS/venv/lib/python3.8/site-packages/temporalio/activity.py",
          "lineno": 274,
          "name": "heartbeat",
          "line": "",
          "locals": {
            "details": "(1960837120, 2077125573)",
            "heartbeat_fn": "<function _execute_sync_activity.<locals>.<lambda> at 0x7f78dab99790>"
          }
        },
        {
          "filename": "/opt/DPS/venv/lib/python3.8/site-packages/temporalio/worker/_activity.py",
          "lineno": 743,
          "name": "<lambda>",
          "line": "",
          "locals": {
            "details": "(1960837120, 2077125573)",
            "heartbeat_sender": "'<temporalio.worker._activity._MultiprocessingSharedHeartbeatSender object at 0x7'+12",
            "info": "\"Info(activity_id='3', activity_type='adjust_replica', attempt=1, current_attempt\"+875"
          }
        },
        {
          "filename": "/opt/DPS/venv/lib/python3.8/site-packages/temporalio/worker/_activity.py",
          "lineno": 924,
          "name": "send_heartbeat",
          "line": "",
          "locals": {
            "self": "'<temporalio.worker._activity._MultiprocessingSharedHeartbeatSender object at 0x7'+12",
            "task_token": "\"b'\\\\n$bc8c0612-11a9-4a58-ab6b-43d37f064f56\\\\x12\\\\x19replicate-treenode-278873\\\\x1a$5\"+110",
            "details": "(1960837120, 2077125573)"
          }
        },
        {
          "filename": "<string>",
          "lineno": 2,
          "name": "put",
          "line": "",
          "locals": {
            "self": "<AutoProxy[Queue] object, typeid 'Queue' at 0x7f78dab2b070>",
            "args": "\"((b'\\\\n$bc8c0612-11a9-4a58-ab6b-43d37f064f56\\\\x12\\\\x19replicate-treenode-278873\\\\x1a\"+150",
            "kwds": "{}"
          }
        },
        {
          "filename": "/usr/lib/python3.8/multiprocessing/managers.py",
          "lineno": 850,
          "name": "_callmethod",
          "line": "",
          "locals": {
            "self": "<AutoProxy[Queue] object, typeid 'Queue' at 0x7f78dab2b070>",
            "methodname": "put",
            "args": "\"((b'\\\\n$bc8c0612-11a9-4a58-ab6b-43d37f064f56\\\\x12\\\\x19replicate-treenode-278873\\\\x1a\"+150",
            "kwds": "{}",
            "conn": "<multiprocessing.connection.Connection object at 0x7f78e1c62460>",
            "kind": "#ERROR",
            "result": "Full()"
          }
        }
      ]
    }
  ]
}

Minimal Reproduction

Unfortunately, I have no reproduction, but I do have some analysis to share:

I think the log message Failed during multiprocess queue poll for heartbeat is potentially misleading, given the exception type raised at the top of the trace. Digging into the first stack trace above, the exception is a TimeoutError and the point of origin appears to be here: https://github.com/temporalio/sdk-python/blob/1.5.0/temporalio/worker/_activity.py#L644 - the body of that expression ends with Future.result(10), which can raise TimeoutError. So the TimeoutError arises not from queue polling, but from executing a coroutine on an event loop. As for why the Future.result(10) call times out, I can't tell.
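
To illustrate the mechanism in isolation (this is a standalone asyncio sketch, not the SDK's code): asyncio.run_coroutine_threadsafe(...).result(10) raises TimeoutError whenever the target event loop is too starved to run the scheduled coroutine within 10 seconds.

import asyncio
import concurrent.futures
import threading
import time

async def heartbeat_stub(*details):
    # Stand-in for the coroutine the SDK schedules per heartbeat.
    return details

def child_side(loop: asyncio.AbstractEventLoop) -> None:
    # Schedule a coroutine onto another thread's event loop and wait up to
    # 10 seconds for the result, mirroring the Future.result(10) pattern.
    fut = asyncio.run_coroutine_threadsafe(heartbeat_stub(1, 2), loop)
    try:
        fut.result(10)
    except concurrent.futures.TimeoutError:
        print("TimeoutError: the loop never got to run the coroutine in time")

async def main() -> None:
    loop = asyncio.get_running_loop()
    threading.Thread(target=child_side, args=(loop,)).start()
    # A blocking call inside an async def starves the loop, so the scheduled
    # coroutine cannot run and the waiting thread times out.
    time.sleep(15)

asyncio.run(main())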

Environment/Versions

cretz commented 8 months ago

Hrmm, so in a well-running multiprocess setup, a heartbeat should be immediate and barely queued, because the primary process should be processing cross-process heartbeats almost immediately.

So the TimeoutError arises not from queue polling, but from executing a coroutine on an event loop.

Well, it's more like from the lack of queue polling. I believe if the heartbeat queue is not being polled/processed fast enough, this is where this call gets hung.

As to why the Future.result(10) call times out, I can't understand why.

The only reason why I can think this would fail is if your primary process is not processing asyncio quickly enough. This often happens if you're doing a blocking call in some async def somewhere but I suppose could be caused by sending too many heartbeats too fast (or a combination of both things).

I am not sure there is much that can be done here. I believe the worker is not processing heartbeats as fast as you are sending them, and at some point that has to become a timeout. We can't queue heartbeats indefinitely. Open to suggestions though; for any suggestion, we would likely need a reproduction to confirm it solves your issue.

Alternatively, you may consider standard threaded (or async) activities from which you invoke multiprocessing yourself inside the activity.
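
A rough sketch of that alternative, assuming an async activity and a CPU-bound helper (crunch, crunch_activity, and the 5-second heartbeat interval are illustrative, not SDK names): the activity owns its own process pool, and heartbeating stays inside the worker's main process, so no cross-process heartbeat queue is involved.

import asyncio
import concurrent.futures

from temporalio import activity

def crunch(n: int) -> int:
    # Placeholder for the CPU-bound work that needs its own process.
    return sum(i * i for i in range(n))

# Pool owned by the activity code itself rather than by the Temporal worker.
_pool = concurrent.futures.ProcessPoolExecutor()

@activity.defn
async def crunch_activity(n: int) -> int:
    loop = asyncio.get_running_loop()
    fut = loop.run_in_executor(_pool, crunch, n)
    while True:
        done, _ = await asyncio.wait({fut}, timeout=5)
        # Heartbeat directly from the async activity, in-process.
        activity.heartbeat()
        if done:
            return fut.result()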

wilsoniya commented 8 months ago

Thanks for replying and generally being so responsive!

The only reason why I can think this would fail is if your primary process is not processing asyncio quickly enough. This often happens if you're doing a blocking call in some async def somewhere but I suppose could be caused by sending too many heartbeats too fast (or a combination of both things).

I suspect you're right. I'm using Pydantic 2.x, and using the custom data converter recipe you provide. The activity with frequent heartbeats is sending the details parameter, thus invoking the data converter (if I understand correctly). Pydantic 2.x appears to have some serious unresolved perf issues, so I'm wondering if the primary process is bogged down doing Pydantic serde of heartbeat details. I'm going to experiment with omitting heartbeat details and see if that resolves the issue.
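
For reference, the experiment amounts to just changing the heartbeat call inside the activity (copy_chunks and the other names below are hypothetical stand-ins for the real replication code):

from typing import Iterator, Tuple

from temporalio import activity

def copy_chunks(tree_node_id: int) -> Iterator[Tuple[int, int]]:
    # Hypothetical stand-in for the real work, yielding (bytes_done, bytes_total).
    total = 4
    for done in range(1, total + 1):
        yield done, total

@activity.defn
def replicate_treenode(tree_node_id: int) -> None:
    for bytes_done, bytes_total in copy_chunks(tree_node_id):
        # With details, every heartbeat payload goes through the data
        # converter (Pydantic-backed here, and per the hypothesis above,
        # in the primary process):
        #   activity.heartbeat(bytes_done, bytes_total)
        # With no details, that serialization is skipped entirely:
        activity.heartbeat()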

I am not sure there is much that can be done here. I believe the worker is not processing heartbeats faster than you are sending them and at some point it's a timeout. We can't queue heartbeats indefinitely. Open to suggestions though. For whatever suggestion, we likely need a replication to even confirm it solves your issue.

Is this return call potentially problematic?

https://github.com/temporalio/sdk-python/blob/4037dd4ce57461124612a9536f534abc05a7eb79/temporalio/worker/_activity.py#L908-L910

It seems like returning from _MultiprocessingSharedStateManager._heartbeat_processor() when we catch a TimeoutError while evaluating the heartbeat future here

https://github.com/temporalio/sdk-python/blob/4037dd4ce57461124612a9536f534abc05a7eb79/temporalio/worker/_activity.py#L644-L646

will result in processing no further heartbeats, and thus the heartbeat queue filling up and blocking further activity liveness. I have an incomplete understanding of this code, but I suspect that the error I'm describing is unrecoverable.

cretz commented 8 months ago

Is this return call potentially problematic?

I fear it will encourage lossy heartbeats as opposed to just stopping the processor. If we had an easy way to surface the exception we would, but in the meantime I am worried about just accepting some heartbeats and not others.

will result in processing no further heartbeats, and thus the heartbeat queue filling up and blocking further activity liveness

Yes, this is a dangerous state to get into. I am not exactly sure what we should do when the shared heartbeat processor fails; it should never fail. I do want to avoid adding some kind of advanced code (e.g. draining just this specific activity's heartbeat queue somehow) to handle this never-should-happen failure state. Really it shouldn't even behave as a queue; processing should be nearly immediate, so the queue never holds more than one item.

Open to suggestions about how to better surface this. Should I try to find a way to throw an exception and shut the whole worker down?

wilsoniya commented 7 months ago

Sorry for the delayed response.

Should I try to find a way to throw an exception and shut the whole worker down?

That would be ideal for my use case. More generally, I think encountering a situation one has consciously conceived of as "impossible" merits a hard crash. And given a choice between a process crashing and one technically persisting in a dysfunctional state, I think the former is more conducive to detecting and recovering from the problem.

Thanks!

cretz commented 7 months ago

With Temporal, we often favor partially dysfunctional workers that still work for other things (e.g. activities that aren't heartbeating, and workflows) over fail-fast. But maybe we can make it an option.

But I am now wondering if we should deprecate multiprocess activities and instead tell users they can use normal Python subprocessing inside their activities if they must have separate processes. It doesn't add much more value in the SDK than any other form of remote activity would. The details of dealing with out-of-process activities are probably best left to the (advanced) user.