temporalio / sdk-python

Temporal Python SDK
MIT License

[Bug] Worker hangs after polling workflow task queue #631

Open kelkawi-a opened 2 months ago

kelkawi-a commented 2 months ago

What are you really trying to do?

To connect a worker to a Temporal server with an authorization header.

Describe the bug

Starting a worker with an incorrect token, which causes the server to respond with "Request unauthorized.", makes the worker hang indefinitely.

Minimal Reproduction

import asyncio

from temporalio import activity, client, workflow, worker

@activity.defn
async def a() -> None:
    pass

@workflow.defn
class Workflow:
    @workflow.run
    async def run(self) -> None:
        pass

async def main():
    c = await client.Client.connect(
        "my_temporal_host:7233",
        rpc_metadata={"authorization": "wrong_token"},  # intentionally invalid token to trigger "Request unauthorized"
        tls=True,
    )
    w = worker.Worker(
        c,
        task_queue="default",
        activities=[a],
        workflows=[Workflow],
    )
    await w.run()

if __name__ == "__main__":
    asyncio.run(main())

Output:

2024-09-03T07:26:48.707Z [temporal-worker] 2024-09-03T07:26:48.707200Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 52 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:26:48 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:26:49.544Z [temporal-worker] 2024-09-03T07:26:49.544345Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 52 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:26:49 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:27:51.707Z [temporal-worker] 2024-09-03T07:27:51.707200Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 53 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:27:51 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:27:52.968Z [temporal-worker] 2024-09-03T07:27:52.967933Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 52 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:27:52 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:00.036Z [temporal-worker] 2024-09-03T07:28:00.036155Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 53 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:00 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:00.687Z [temporal-worker] 2024-09-03T07:28:00.687484Z ERROR temporal_client::retry: gRPC call poll_workflow_task_queue retried 53 times error=Status { code: Internal, message: "protocol error: received message with invalid compression flag: 60 (valid flags are 0 and 1) while receiving response with status: 504 Gateway Timeout", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:00 GMT", "content-type": "text/html", "content-length": "160", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:03.579Z [temporal-worker] 2024-09-03T07:28:03.578940Z  WARN temporal_sdk_core::worker::workflow::wft_poller: Error while polling for workflow tasks error=Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:03.970Z [temporal-worker] 2024-09-03T07:28:03.583738Z ERROR temporal_sdk_core::worker::workflow::workflow_stream: Workflow processing encountered fatal error and must shut down TonicError(Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None })
2024-09-03T07:28:03.970Z [temporal-worker] 2024-09-03T07:28:03.969933Z  WARN temporal_sdk_core::worker::workflow::wft_poller: Error while polling for workflow tasks error=Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:03.970Z [temporal-worker] 2024-09-03T07:28:03.969960Z ERROR temporal_sdk_core::worker::workflow::workflow_stream: Workflow processing encountered fatal error and must shut down TonicError(Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None })
2024-09-03T07:28:04.023Z [temporal-worker] Worker failed, shutting down
2024-09-03T07:28:04.023Z [temporal-worker] Traceback (most recent call last):
2024-09-03T07:28:04.023Z [temporal-worker]   File "/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 143, in run
2024-09-03T07:28:04.023Z [temporal-worker]     act = await self._bridge_worker().poll_workflow_activation()
2024-09-03T07:28:04.023Z [temporal-worker]   File "/lib/python3.10/site-packages/temporalio/bridge/worker.py", line 141, in poll_workflow_activation
2024-09-03T07:28:04.023Z [temporal-worker]     await self._ref.poll_workflow_activation()
2024-09-03T07:28:04.023Z [temporal-worker] RuntimeError: Poll failure: Unhandled grpc error when workflow polling: Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Tue, 03 Sep 2024 07:28:03 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }
2024-09-03T07:28:04.023Z [temporal-worker] 
2024-09-03T07:28:04.023Z [temporal-worker] The above exception was the direct cause of the following exception:
2024-09-03T07:28:04.023Z [temporal-worker] 
2024-09-03T07:28:04.023Z [temporal-worker] Traceback (most recent call last):
2024-09-03T07:28:04.023Z [temporal-worker]   File "/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 153, in run
2024-09-03T07:28:04.023Z [temporal-worker]     raise RuntimeError("Workflow worker failed") from err
2024-09-03T07:28:04.023Z [temporal-worker] RuntimeError: Workflow worker failed

Environment/Versions

Additional context

Related issue: #459

cretz commented 2 months ago

causes the worker to hang indefinitely.

We do check that the worker can connect to the namespace using "describe namespace". So you have a token that works on some calls but not others? Is this self-hosted or cloud?

The stack trace seems to be showing it raising an exception. Are you sure that worker.run() does not raise an exception here? It does take a minute because we retry all polling errors just in case they are spurious.

kelkawi-a commented 2 months ago

This issue is two-fold; I reported the part relevant to the SDK here:

  1. I have a worker which sends the correct token to a self-hosted Temporal server (v1.23.1). There is an issue at the ingress level which sometimes causes the server to respond with a 504 Gateway Timeout.
  2. As the worker continues to receive this response from the ingress, it exhibits the behavior reported in this bug, eventually hanging and failing to reconnect.

The second issue is what I'm reporting here: the worker ends up raising a RuntimeError and not recovering from it.

cretz commented 2 months ago

The worker ends up raising a RuntimeError and not recovering from it.

Some client errors we can detect as recoverable. For ones we can't, we still try to recover for a minute before failing the worker. We intentionally fail the worker instead of letting it operate in a failed state on something that is not quickly/obviously recoverable. Can you confirm whether worker.run() does or does not raise an exception here? We start a worker shutdown, but a worker shutdown sends cancellation to activities and has to wait for activities to complete (see https://github.com/temporalio/sdk-python?tab=readme-ov-file#worker-shutdown).
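
For reference, the shutdown behavior linked above means activities need to cooperate with cancellation. Below is a minimal sketch (not from this thread, names are illustrative) of an activity written so the cancellation sent during worker shutdown can interrupt it, assuming the semantics in the linked README: cancellation surfaces in async activities as asyncio.CancelledError, and heartbeating is what allows a server-requested cancellation to be delivered.

import asyncio

from temporalio import activity

@activity.defn
async def long_running() -> None:
    try:
        for _ in range(60):
            activity.heartbeat()    # heartbeating lets cancellation reach the activity
            await asyncio.sleep(1)  # placeholder for a unit of real work
    except asyncio.CancelledError:
        # Worker shutdown (or an explicit cancel) landed here; clean up, then
        # re-raise so the activity is reported as cancelled rather than completed.
        raise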

kelkawi-a commented 2 months ago

I'm not sure if you're referring to something different, but the logs shared in the original bug report show that worker.run() is raising a RuntimeError.

cretz commented 2 months ago

Ah, I see it in the trace now. This is intentional behavior. A fatal error (or at least one we can't tell is non-fatal) that doesn't fix itself after a minute will cause the worker to fail and shut down instead of pretending to work silently while continuing to fail. You may restart the worker if you wish, though many prefer to investigate rather than blindly restart.

kelkawi-a commented 2 months ago

In our case the worker is failing after a number of intermittent network issues, so we would rather it restart instead. Is there any recommendation on how to restart the worker? I tried a simple while True loop to catch the raised RuntimeError and re-run await worker.run(), which seemed to complain (I don't have the logs from that right now, but I can get them).

cretz commented 2 months ago

I tried a simple while True loop

This should work. Similarly, you can consider having whatever is monitoring the process/pod/container do the restart at an outer level. I would also recommend at least alerting on fatal worker errors, or you won't know your worker isn't working.
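
A minimal sketch of that outer-level pattern, reusing the activity a and Workflow definitions from the reproduction above (the print stands in for whatever alerting you actually use): catch the fatal error, alert, and exit non-zero so the process/pod/container restart policy takes over.

import asyncio
import sys

from temporalio import client, worker

async def run_worker() -> int:
    c = await client.Client.connect("my_temporal_host:7233", tls=True)
    w = worker.Worker(c, task_queue="default", activities=[a], workflows=[Workflow])
    try:
        await w.run()
        return 0
    except RuntimeError as err:
        print(f"Temporal worker failed: {err}")  # replace with real alerting
        return 1  # non-zero exit so the supervising process/pod/container restarts us

if __name__ == "__main__":
    sys.exit(asyncio.run(run_worker()))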

kelkawi-a commented 2 months ago

I did try the following to retry the worker:

    while True:
        try:
            await worker.run()
        except RuntimeError as e:
            print(f"RuntimeError caught: {e}. Retrying...")
            await asyncio.sleep(5)  # wait for 5 seconds before retrying
        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            raise

After running for a while, the worker crashed with the following error when trying to restart:

  File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 143, in run
    act = await self._bridge_worker().poll_workflow_activation()
  File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/bridge/worker.py", line 141, in poll_workflow_activation
    await self._ref.poll_workflow_activation()
RuntimeError: Poll failure: Unhandled grpc error when workflow polling: Status { code: PermissionDenied, message: "Request unauthorized.", details: b"\x08\x07\x12\x15Request unauthorized.\x1aJ\nHtype.googleapis.com/temporal.api.errordetails.v1.PermissionDeniedFailure", metadata: MetadataMap { headers: {"date": "Thu, 05 Sep 2024 09:43:21 GMT", "content-type": "application/grpc", "content-length": "0", "strict-transport-security": "max-age=15724800; includeSubDomains"} }, source: None }

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/worker/_workflow.py", line 153, in run
    raise RuntimeError("Workflow worker failed") from err
RuntimeError: Workflow worker failed
RuntimeError caught: Workflow worker failed. Retrying...
An unexpected error occurred: 'NoneType' object has no attribute 'validate'
Traceback (most recent call last):
  File "/home/kelkawi/Desktop/CanonicalRepos/hr-automation/boarding/worker.py", line 219, in <module>
    asyncio.run(run_worker())
  File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/home/kelkawi/Desktop/CanonicalRepos/hr-automation/boarding/worker.py", line 207, in run_worker
    await worker.run()
  File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/worker/_worker.py", line 478, in run
    await self._bridge_worker.validate()
  File "/home/kelkawi/.cache/pypoetry/virtualenvs/boarding-w3anQpca-py3.10/lib/python3.10/site-packages/temporalio/bridge/worker.py", line 133, in validate
    await self._ref.validate()
AttributeError: 'NoneType' object has no attribute 'validate'

cretz commented 2 months ago

A worker is meant for one run/shutdown. If you want to run a new worker, you will need to create a new Worker instance. Unfortunately, some validation we added happens before this check; we will fix that.
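
A minimal sketch of what that means for the retry loop above, reusing the activity a and Workflow definitions from the reproduction: build a fresh Worker on every iteration instead of calling run() again on the one that failed.

import asyncio

from temporalio import client, worker

async def run_worker_forever() -> None:
    c = await client.Client.connect("my_temporal_host:7233", tls=True)
    while True:
        # A Worker is single-use: recreate it for every run attempt.
        w = worker.Worker(
            c,
            task_queue="default",
            activities=[a],
            workflows=[Workflow],
        )
        try:
            await w.run()
            return  # clean shutdown, stop retrying
        except RuntimeError as err:
            print(f"Worker failed: {err}. Recreating and retrying in 5 seconds...")
            await asyncio.sleep(5)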