firdaus / temporal-python-sdk

MIT License

TimeoutError: Deadline exceeded on activities taking longer than 120 seconds #15

Open dkryptr opened 3 years ago

dkryptr commented 3 years ago

Here's the stack trace.

2021-05-13 13:22:37,579 | ERROR | retry.py:retry_loop:29 | run failed: Deadline exceeded, retrying in 3 seconds
Traceback (most recent call last):
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 360, in recv_initial_metadata
    headers = await self._stream.recv_headers()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/protocol.py", line 349, in recv_headers
    await self.headers_received.wait()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/asyncio/locks.py", line 309, in wait
    await fut
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/retry.py", line 17, in retry_loop
    await fp(*args, **kwargs)
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/decision_loop.py", line 1083, in run
    decision_task: PollWorkflowTaskQueueResponse = await self.poll()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/decision_loop.py", line 1165, in poll
    task = await self.service.poll_workflow_task_queue(
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/temporal/api/workflowservice/v1.py", line 828, in poll_workflow_task_queue
    return await self._unary_unary(
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/betterproto/__init__.py", line 1133, in _unary_unary
    response = await stream.recv_message()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 408, in recv_message
    await self.recv_initial_metadata()
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/client.py", line 380, in recv_initial_metadata
    self.initial_metadata = im
  File "/Users/chad.greenburg/.pyenv/versions/3.8.5/lib/python3.8/site-packages/grpclib/utils.py", line 70, in __exit__
    raise self._error
asyncio.exceptions.TimeoutError: Deadline exceeded

Here's what I've uncovered:

The python-sdk starts up two parallel threads: a workflow thread and an activity thread.

Both threads use the same gRPC channel to communicate with the Temporal server, with a request timeout set to 120 seconds. Concurrent RPC calls on a single channel are supported according to the grpclib docs: https://grpclib.readthedocs.io/en/latest/client.html

The workflow thread polls the workflow task queue and the activity thread polls the activity task queue. Each poll waits up to 60 seconds before the loop continues and polls again if nothing is returned. When the activity thread receives a task from the activity task queue, it starts running the activity code. Meanwhile, the workflow thread is in the middle of polling the workflow task queue.

What I'm noticing is that the workflow poll request gets "blocked" and doesn't return after 60 seconds like it usually would. It doesn't complete until the activity in the other thread has finished. If an activity takes long enough, the workflow poll request can take more than 120 seconds (the channel timeout mentioned earlier), causing a deadline exceeded error.
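
To make the interaction concrete, here's a minimal sketch of the failure mode in plain asyncio, with no SDK involved (the function names are made up for illustration, and the 60/120/150-second values mirror the numbers above): a blocking call in one coroutine keeps a concurrently awaiting poll from completing within its deadline.

import asyncio
import time

async def workflow_poll(deadline: float) -> None:
    # Stands in for the 60-second workflow long-poll guarded by the
    # 120-second gRPC deadline described above.
    start = time.monotonic()
    try:
        await asyncio.wait_for(asyncio.sleep(60), timeout=deadline)
    except asyncio.TimeoutError:
        pass
    print(f"workflow poll returned after {time.monotonic() - start:.0f}s")

async def blocking_activity() -> None:
    # Stands in for a long-running activity: time.sleep() blocks the whole
    # event loop, so the poll coroutine above cannot resume until it finishes.
    time.sleep(150)

async def main() -> None:
    await asyncio.gather(workflow_poll(deadline=120), blocking_activity())

asyncio.run(main())  # the poll only returns after ~150s, well past its 120s deadline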

Solution:

A couple of temporary workarounds:

OR

firdaus commented 3 years ago

Hi @CGreenburg, I'm not able to reproduce this with the code below. Could you share some sample code that produces that error on your side?

import asyncio
import logging
from datetime import timedelta

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"

# Activities Interface
class GreetingActivities:
    @activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=500))
    async def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError

# Activities Implementation
class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str) -> str:
        await asyncio.sleep(200)
        return greeting + " " + name

# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name: str) -> str:
        raise NotImplementedError

# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    async def get_greeting(self, name):
        return await self.greeting_activities.compose_greeting("Hello!", name)

async def worker_main():
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
    worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker started")

if __name__ == '__main__':
    loop = asyncio.get_event_loop() 
    asyncio.ensure_future(worker_main())
    loop.run_forever()
dkryptr commented 3 years ago

We migrated a pyspark/pandas application to a workflow activity and it takes longer than 2 minutes. That's where we've seen the deadline exceeded exception. I've been replicating it using time.sleep(150).
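
Concretely, the only change relative to the repro sample above is in the activity implementation, swapping the non-blocking asyncio.sleep for a blocking time.sleep (a sketch against the same GreetingActivitiesImpl shape):

import time

class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str) -> str:
        # time.sleep() is a blocking call, so it ties up the worker's shared
        # event loop for the full 150 seconds.
        time.sleep(150)
        return greeting + " " + name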

firdaus commented 3 years ago

You might want to look into https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor if your activity is CPU-bound or if the APIs that you're using aren't async-friendly.
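
A rough sketch of what that could look like for the activity in the sample above, assuming the same GreetingActivitiesImpl shape (blocking_work is a hypothetical stand-in for the real pyspark/pandas code):

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_work(greeting: str, name: str) -> str:
    # Hypothetical stand-in for the CPU-bound / non-async-friendly code.
    time.sleep(150)
    return greeting + " " + name

class GreetingActivitiesImpl:
    def __init__(self):
        # A dedicated pool so long-running activities don't exhaust the
        # event loop's default executor.
        self._pool = ThreadPoolExecutor(max_workers=4)

    async def compose_greeting(self, greeting: str, name: str) -> str:
        loop = asyncio.get_event_loop()
        # The blocking function runs in a worker thread; awaiting here yields
        # control back to the event loop, so the polls can keep running.
        return await loop.run_in_executor(self._pool, blocking_work, greeting, name)

With the blocking work off the event loop, the workflow poll on the shared channel can still return within its 120-second deadline.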

firdaus commented 3 years ago

There was a thread about this in the past where I proposed creating an event loop per worker (running in its own thread) so that the activity code is free to block as it pleases, but the feedback I got was that the library should do the bare minimum and let the user decide how to handle blocking calls.

https://community.temporal.io/t/timeline-for-python-client-support/223/26

firdaus commented 3 years ago

If you're able to, I would run your workflows and activities in separate workers.

firdaus commented 3 years ago

You can also try something like this:

import asyncio
import logging
from datetime import timedelta
from threading import Thread

from temporal.activity_method import activity_method
from temporal.workerfactory import WorkerFactory
from temporal.workflow import workflow_method, Workflow, WorkflowClient
import time

logging.basicConfig(level=logging.INFO)

TASK_QUEUE = "HelloWorld"
NAMESPACE = "default"

# Activities Interface
class GreetingActivities:
    @activity_method(task_queue=TASK_QUEUE, schedule_to_close_timeout=timedelta(seconds=500))
    async def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError

# Activities Implementation
class GreetingActivitiesImpl:
    async def compose_greeting(self, greeting: str, name: str) -> str:
        time.sleep(150)
        return greeting + " " + name

# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_queue=TASK_QUEUE)
    async def get_greeting(self, name: str) -> str:
        raise NotImplementedError

# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    async def get_greeting(self, name):
        return await self.greeting_activities.compose_greeting("Hello!", name)

async def worker_main(activities=False, workflows=False):
    client = WorkflowClient.new_client(namespace=NAMESPACE)
    factory = WorkerFactory(client, NAMESPACE)
    worker = factory.new_worker(TASK_QUEUE)
    if activities:
        worker.register_activities_implementation(GreetingActivitiesImpl(), "GreetingActivities")
    if workflows:
        worker.register_workflow_implementation_type(GreetingWorkflowImpl)
    factory.start()
    print("Worker started")

def thread1():
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main(activities=True, workflows=False))
    loop.run_forever()

def thread2():
    asyncio.set_event_loop(asyncio.new_event_loop())
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(worker_main(activities=False, workflows=True))
    loop.run_forever()

if __name__ == '__main__':
    t1 = Thread(target=thread1)
    t2 = Thread(target=thread2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
dkryptr commented 3 years ago

Thanks for the suggestions @firdaus!

In your thread example, is it okay that the two workers use the same task queue? They don't need to be different task queues?

firdaus commented 3 years ago

It's fine, because behind the scenes there are separate gRPC calls for polling workflow tasks and activity tasks, so a workflow worker will never pick up an activity worker's tasks.