Temporal is a distributed, scalable, durable, and highly available orchestration engine used to execute asynchronous, long-running business logic in a resilient way.
"Temporal Python SDK" is the framework for authoring workflows and activities using the Python programming language.
In addition to features common across all Temporal SDKs, the Python SDK also has the following interesting features:
Type Safe

This library uses the latest typing and MyPy support with generics to ensure all calls can be typed. For example, starting a workflow with an int parameter when it accepts a str parameter would cause MyPy to fail.
Different Activity Types

The activity worker has been developed to work with async def, threaded, and multiprocess activities. While async def activities are the easiest and recommended, care has been taken to make heartbeating and cancellation also work across threads/processes.

Custom asyncio Event Loop

The workflow implementation basically turns async def functions into workflows backed by a distributed, fault-tolerant event loop. This means task management, sleep, cancellation, etc. have all been developed to seamlessly integrate with asyncio concepts.
See the blog post introducing the Python SDK for an informal introduction to the features and their implementation.
We will guide you through the Temporal basics to create a "hello, world!" script on your machine. This is a deliberately simplified example and decidedly not the only way to use Temporal. For more information, check out the docs referenced in "Next Steps" below the quick start.
Install the temporalio package from PyPI.

These steps can be followed to use with a virtual environment and pip:
- Create and activate a virtual environment
- Update pip: python -m pip install -U pip (older versions of pip may not pick the right wheel)
- Install the SDK: python -m pip install temporalio

The SDK is now ready for use. To build from source, see "Building" near the end of this documentation.

NOTE: This README is for the current branch and not necessarily what's released on PyPI.
Create the following in activities.py
:
from temporalio import activity
@activity.defn
def say_hello(name: str) -> str:
return f"Hello, {name}!"
Create the following in workflows.py
:
from datetime import timedelta
from temporalio import workflow
# Import our activity, passing it through the sandbox
with workflow.unsafe.imports_passed_through():
    from activities import say_hello
@workflow.defn
class SayHello:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
say_hello, name, schedule_to_close_timeout=timedelta(seconds=5)
)
Create the following in run_worker.py
:
import asyncio
import concurrent.futures
from temporalio.client import Client
from temporalio.worker import Worker
# Import the activity and workflow from our other files
from activities import say_hello
from workflows import SayHello
async def main():
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")
# Run the worker
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
worker = Worker(
client,
task_queue="my-task-queue",
workflows=[SayHello],
activities=[say_hello],
activity_executor=activity_executor,
)
await worker.run()
if __name__ == "__main__":
asyncio.run(main())
Assuming you have a Temporal server running on localhost, this will run the worker:
python run_worker.py
Create the following script at run_workflow.py
:
import asyncio
from temporalio.client import Client
# Import the workflow from the previous code
from workflows import SayHello
async def main():
# Create client connected to server at the given address
client = await Client.connect("localhost:7233")
# Execute a workflow
result = await client.execute_workflow(SayHello.run, "my name", id="my-workflow-id", task_queue="my-task-queue")
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
Assuming you have run_worker.py
running from before, this will run the workflow:
python run_workflow.py
The output will be:
Result: Hello, my name!
Temporal can be implemented in your code in many different ways, to suit your application's needs. The links below will give you much more information about how Temporal works with Python:
From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around Temporal concepts. This section is not intended as a how-to guide; for more how-to oriented information, check out the links in the Next Steps section above.
A client can be created and used to start a workflow like so:
from temporalio.client import Client
async def main():
# Create client connected to server at the given address and namespace
client = await Client.connect("localhost:7233", namespace="my-namespace")
# Start a workflow
handle = await client.start_workflow(MyWorkflow.run, "some arg", id="my-workflow-id", task_queue="my-task-queue")
# Wait for result
result = await handle.result()
print(f"Result: {result}")
Some things to note about the above code:
- A Client does not have an explicit "close"
- To enable TLS, the tls argument to connect can be set to True or a TLSConfig object
- A single positional argument can be passed to start_workflow. If there are multiple arguments, only the non-type-safe form of start_workflow can be used (i.e. the one accepting a string workflow name) and it must be in the args keyword argument.
- The handle represents the workflow that was started and can be used for more than just getting the result
- Since we are just getting the handle and waiting on the result, we could have called client.execute_workflow which does the same thing

Clients also provide a shallow copy of their config for use in making slightly different clients backed by the same connection. For instance, given the client above, this is how to have a client in another namespace:
config = client.config()
config["namespace"] = "my-other-namespace"
other_ns_client = Client(**config)
Data converters are used to convert raw Temporal payloads to/from actual Python types. A custom data converter of type
temporalio.converter.DataConverter
can be set via the data_converter
client parameter. Data converters are a
combination of payload converters, payload codecs, and failure converters. Payload converters convert Python values
to/from serialized bytes. Payload codecs convert bytes to bytes (e.g. for compression or encryption). Failure converters
convert exceptions to/from serialized failures.
The default data converter supports converting multiple types including:
- None
- bytes
- google.protobuf.message.Message - As JSON when encoding, but has ability to decode binary proto from other languages
- Anything that json.dump supports natively
- Iterables that JSON dump may not support by default, e.g. set
- Any class with a dict() method and a static parse_obj() method, e.g. Pydantic models

This notably doesn't include any date, time, or datetime objects as they may not work across SDKs.
Users are strongly encouraged to use a single dataclass
for parameter and return types so fields with defaults can be
easily added without breaking compatibility.
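For example, a minimal sketch of the single-dataclass pattern (the names here are illustrative, not part of the SDK):

from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow

@dataclass
class GreetInput:
    name: str
    # A field with a default can be added later without breaking existing callers
    salutation: str = "Hello"

@workflow.defn
class GreetWorkflow:
    @workflow.run
    async def run(self, input: GreetInput) -> str:
        # Activity referenced by name here for brevity
        return await workflow.execute_activity(
            "greet",
            input,
            schedule_to_close_timeout=timedelta(seconds=5),
        )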
Classes with generics may not have the generics properly resolved. The current implementation does not have generic type resolution. Users should use concrete types.
For converting from JSON, the workflow/activity type hint is taken into account to convert to the proper type. Care has
been taken to support all common typings including Optional
, Union
, all forms of iterables and mappings, NewType
,
etc in addition to the regular JSON values mentioned before.
Data converters contain a reference to a payload converter class that is used to convert to/from payloads/values. This
is a class and not an instance because it is instantiated on every workflow run inside the sandbox. The payload
converter is usually a CompositePayloadConverter
which contains multiple EncodingPayloadConverters it uses to try
to serialize/deserialize payloads. Upon serialization, each EncodingPayloadConverter
is tried until one succeeds. The
EncodingPayloadConverter
provides an "encoding" string serialized onto the payload so that, upon deserialization, the
specific EncodingPayloadConverter
for the given "encoding" is used.
The default data converter uses the DefaultPayloadConverter
which is simply a CompositePayloadConverter
with a known
set of default EncodingPayloadConverter
s. To implement a custom encoding for a custom type, a new
EncodingPayloadConverter
can be created for the new type. For example, to support IPv4Address
types:
class IPv4AddressEncodingPayloadConverter(EncodingPayloadConverter):
@property
def encoding(self) -> str:
return "text/ipv4-address"
def to_payload(self, value: Any) -> Optional[Payload]:
if isinstance(value, ipaddress.IPv4Address):
return Payload(
metadata={"encoding": self.encoding.encode()},
data=str(value).encode(),
)
else:
return None
def from_payload(self, payload: Payload, type_hint: Optional[Type] = None) -> Any:
assert not type_hint or type_hint is ipaddress.IPv4Address
return ipaddress.IPv4Address(payload.data.decode())
class IPv4AddressPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Just add ours as first before the defaults
super().__init__(
IPv4AddressEncodingPayloadConverter(),
*DefaultPayloadConverter.default_encoding_payload_converters,
)
my_data_converter = dataclasses.replace(
DataConverter.default,
payload_converter_class=IPv4AddressPayloadConverter,
)
Imports are left off for brevity.
This is good for many custom types. However, sometimes you want to override the behavior of just the existing JSON encoding payload converter to support a new type. It is already the last encoding payload converter in the list, so it's the fall-through behavior for any otherwise unknown type. Customizing the existing JSON converter has the benefit of making the type work in lists, unions, etc.
The JSONPlainPayloadConverter
uses the Python json library with an
advanced JSON encoder by default and a custom value conversion method to turn json.load
ed values to their type hints.
The conversion can be customized for serialization with a custom json.JSONEncoder
and deserialization with a custom
JSONTypeConverter
. For example, to support IPv4Address
types in existing JSON conversion:
class IPv4AddressJSONEncoder(AdvancedJSONEncoder):
def default(self, o: Any) -> Any:
if isinstance(o, ipaddress.IPv4Address):
return str(o)
return super().default(o)
class IPv4AddressJSONTypeConverter(JSONTypeConverter):
def to_typed_value(
self, hint: Type, value: Any
) -> Union[Optional[Any], _JSONTypeConverterUnhandled]:
if issubclass(hint, ipaddress.IPv4Address):
return ipaddress.IPv4Address(value)
return JSONTypeConverter.Unhandled
class IPv4AddressPayloadConverter(CompositePayloadConverter):
def __init__(self) -> None:
# Replace default JSON plain with our own that has our encoder and type
# converter
json_converter = JSONPlainPayloadConverter(
encoder=IPv4AddressJSONEncoder,
custom_type_converters=[IPv4AddressJSONTypeConverter()],
)
super().__init__(
*[
c if not isinstance(c, JSONPlainPayloadConverter) else json_converter
for c in DefaultPayloadConverter.default_encoding_payload_converters
]
)
my_data_converter = dataclasses.replace(
DataConverter.default,
payload_converter_class=IPv4AddressPayloadConverter,
)
Now IPv4Address
can be used in type hints including collections, optionals, etc.
Workers host workflows and/or activities. Here's how to run a worker:
import asyncio
import logging
from temporalio.client import Client
from temporalio.worker import Worker
# Import your own workflows and activities
from my_workflow_package import MyWorkflow, my_activity
async def run_worker(stop_event: asyncio.Event):
# Create client connected to server at the given address
client = await Client.connect("localhost:7233", namespace="my-namespace")
# Run the worker until the event is set
worker = Worker(client, task_queue="my-task-queue", workflows=[MyWorkflow], activities=[my_activity])
async with worker:
await stop_event.wait()
Some things to note about the above code:
- While this example accepts a stop event and uses async with, run() and shutdown() may be used instead

Workflows are defined as classes decorated with @workflow.defn. The method invoked for the workflow is decorated with @workflow.run. Methods for signals, queries, and updates are decorated with @workflow.signal, @workflow.query, and @workflow.update respectively. Here's an example of a workflow:
import asyncio
from datetime import timedelta
from temporalio import workflow
# Pass the activities through the sandbox
with workflow.unsafe.imports_passed_through():
from .my_activities import GreetingInfo, create_greeting_activity
@workflow.defn
class GreetingWorkflow:
def __init__(self) -> None:
self._current_greeting = "<unset>"
self._greeting_info = GreetingInfo()
self._greeting_info_update = asyncio.Event()
self._complete = asyncio.Event()
@workflow.run
async def run(self, name: str) -> str:
self._greeting_info.name = name
while True:
# Store greeting
self._current_greeting = await workflow.execute_activity(
create_greeting_activity,
self._greeting_info,
start_to_close_timeout=timedelta(seconds=5),
)
workflow.logger.debug("Greeting set to %s", self._current_greeting)
# Wait for salutation update or complete signal (this can be
# cancelled)
await asyncio.wait(
[
asyncio.create_task(self._greeting_info_update.wait()),
asyncio.create_task(self._complete.wait()),
],
return_when=asyncio.FIRST_COMPLETED,
)
if self._complete.is_set():
return self._current_greeting
self._greeting_info_update.clear()
@workflow.signal
async def update_salutation(self, salutation: str) -> None:
self._greeting_info.salutation = salutation
self._greeting_info_update.set()
@workflow.signal
async def complete_with_greeting(self) -> None:
self._complete.set()
@workflow.query
def current_greeting(self) -> str:
return self._current_greeting
@workflow.update
def set_and_get_greeting(self, greeting: str) -> str:
old = self._current_greeting
self._current_greeting = greeting
return old
This assumes there's an activity in my_activities.py
like:
from dataclasses import dataclass
from temporalio import activity
@dataclass
class GreetingInfo:
salutation: str = "Hello"
name: str = "<unknown>"
@activity.defn
def create_greeting_activity(info: GreetingInfo) -> str:
return f"{info.salutation}, {info.name}!"
Some things to note about the above workflow code:
- Workflows run in a sandbox by default. Non-standard-library, non-temporalio imports should usually be "passed through" the sandbox. See the Workflow Sandbox section for more details.
- Workflows are always classes and must have a single @workflow.run which is an async def function
- Workflow code must be deterministic. This means no set iteration, threading, no randomness, no external calls to processes, no network IO, and no global state mutation. All code must run in the implicit asyncio event loop and be deterministic. Also see the Asyncio and Determinism section later.
- @activity.defn is explained in a later section. For normal simple string concatenation, this would just be done in the workflow. The activity is for demonstration purposes only.
- workflow.execute_activity(create_greeting_activity, ... is actually a typed signature, and MyPy will fail if the self._greeting_info parameter is not a GreetingInfo
Here are the decorators that can be applied:
- @workflow.defn - Defines a workflow class
  - Can have a name param to customize the workflow name, otherwise it defaults to the unqualified class name
  - Can have dynamic=True which means all otherwise unhandled workflows fall through to this. If present, cannot have a name argument, and the run method must accept a single parameter of Sequence[temporalio.common.RawValue] type. The payload of the raw value can be converted via workflow.payload_converter().from_payload.
- @workflow.run - Defines the primary workflow run method
  - Must be defined on the same class as @workflow.defn, not a base class (but can also be defined on the same method of a base class)
  - Must be an async def method
  - The first parameter must be self, followed by positional arguments. Best practice is to only take a single argument that is an object/dataclass of fields that can be added to as needed.
- @workflow.init - Specifies that the __init__ method accepts the workflow's arguments.
  - If present, may only be applied to the __init__ method, the parameters of which must then be identical to those of the @workflow.run method.
  - This allows operations involving workflow arguments to be performed in the __init__ method, before any signal or update handler has a chance to execute.
- @workflow.signal - Defines a method as a signal
  - Can be applied to an async or non-async method at any point in the class hierarchy, but if the decorated method is overridden, then the override must also be decorated.
  - Can have a name param to customize the signal name, otherwise it defaults to the unqualified method name.
  - Can have dynamic=True which means all otherwise unhandled signals fall through to this. If present, cannot have a name argument, and method parameters must be self, a string signal name, and a Sequence[temporalio.common.RawValue].
- @workflow.update - Defines a method as an update
  - Can be applied to an async or non-async method at any point in the class hierarchy, but if the decorated method is overridden, then the override must also be decorated.
  - The handler may be async or non-async
  - Also accepts the name and dynamic parameters like signal, with the same semantics.
  - A validator may be defined by decorating a method with @update_handler_method.validator. To reject an update before any events are written to history, throw an exception in a validator. Validators cannot be async, cannot mutate workflow state, and return nothing.
- @workflow.query - Defines a method as a query
  - Should not be async
  - Also accepts the name and dynamic parameters like signal and update, with the same semantics.

To start a locally-defined workflow from a client, you can simply reference its method like so:
from temporalio.client import Client
from my_workflow_package import GreetingWorkflow
async def create_greeting(client: Client) -> str:
# Start the workflow
handle = await client.start_workflow(GreetingWorkflow.run, "my name", id="my-workflow-id", task_queue="my-task-queue")
# Change the salutation
await handle.signal(GreetingWorkflow.update_salutation, "Aloha")
# Tell it to complete
await handle.signal(GreetingWorkflow.complete_with_greeting)
# Wait and return result
return await handle.result()
Some things to note about the above code:
- This uses the GreetingWorkflow from the previous section
- The result of calling this function is "Aloha, my name!"
- id and task_queue are required for running a workflow
- client.start_workflow is typed, so MyPy would fail if "my name" were something besides a string
- handle.signal is typed, so MyPy would fail if "Aloha" were something besides a string or if we provided a parameter to the parameterless complete_with_greeting
- handle.result is typed to the workflow itself, so MyPy would fail if we said this create_greeting returned something besides a string

Invoking Activities
- Activities are started with the non-async workflow.start_activity() which accepts either an activity function reference or a string name.
- A single argument can be passed to the activity. If there are multiple arguments, they must be passed in the args keyword argument.
- Activity options are set as keyword arguments after the activity arguments. At least one of start_to_close_timeout or schedule_to_close_timeout must be provided.
- The result is an activity handle which is an asyncio.Task and supports basic task features
- An async workflow.execute_activity() helper is provided which takes the same arguments as workflow.start_activity() and awaits on the result. This should be used in most cases unless advanced task capabilities are needed.
- Local activities work very similarly except the functions are workflow.start_local_activity() and workflow.execute_local_activity()
- Activities can be methods of a class. Invokers should use workflow.start_activity_method(), workflow.execute_activity_method(), workflow.start_local_activity_method(), and workflow.execute_local_activity_method() instead.
- Activities can be callable classes (i.e. that define __call__). Invokers should use workflow.start_activity_class(), workflow.execute_activity_class(), workflow.start_local_activity_class(), and workflow.execute_local_activity_class() instead.

Invoking Child Workflows
- Child workflows are started with the async workflow.start_child_workflow() which accepts either a workflow run method reference or a string name. The arguments to the workflow are positional.
- A single argument can be passed to the child workflow. If there are multiple arguments, they must be passed in the args keyword argument.
- Options for the child workflow are set as keyword arguments after the arguments. At least the id must be provided.
- The await of the start does not complete until the start has been accepted by the server
- The result is a child workflow handle which is an asyncio.Task and supports basic task features. The handle also has some child info and supports signalling the child workflow
- An async workflow.execute_child_workflow() helper is provided which takes the same arguments as workflow.start_child_workflow() and awaits on the result. This should be used in most cases unless advanced task capabilities are needed.

Timers
- A timer is represented by normal asyncio.sleep()
- Timers are also implicitly started on any asyncio calls with timeouts (e.g. asyncio.wait_for)
- Calls that use a specific point in time, e.g. call_at or timeout_at, should be based on the current loop time (i.e. workflow.time()) and not an actual point in time. This is because fixed times are translated to relative ones by subtracting the current loop time which may not be the actual current time.

Conditions
- workflow.wait_condition is an async function that doesn't return until a provided callback returns true
- A timeout can optionally be provided which will throw an asyncio.TimeoutError if reached (internally backed by asyncio.wait_for which uses a timer)

Asyncio and Determinism

Workflows must be deterministic. Workflows are backed by a custom
asyncio event loop. This means many of the common asyncio
calls work
as normal. Some asyncio features are disabled such as:
- Thread related calls such as to_thread(), run_coroutine_threadsafe(), loop.run_in_executor(), etc
- Calls that alter the event loop such as loop.close(), loop.stop(), loop.run_forever(), loop.set_task_factory(), etc

Also, there are some asyncio utilities that internally use set() which can make them non-deterministic from one worker to the next. Therefore the following asyncio functions have workflow-module alternatives that are deterministic:
- asyncio.as_completed() - use workflow.as_completed()
- asyncio.wait() - use workflow.wait()
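For example, racing two activities deterministically might look like this (a sketch; the activity names are hypothetical):

import asyncio
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class RaceWorkflow:
    @workflow.run
    async def run(self) -> str:
        fast = workflow.start_activity(
            "fast-path", schedule_to_close_timeout=timedelta(seconds=30)
        )
        slow = workflow.start_activity(
            "slow-path", schedule_to_close_timeout=timedelta(seconds=30)
        )
        # workflow.wait is the deterministic replacement for asyncio.wait
        done, pending = await workflow.wait(
            [fast, slow], return_when=asyncio.FIRST_COMPLETED
        )
        # Cancel whichever activity lost the race
        for handle in pending:
            handle.cancel()
        return "fast finished first" if fast in done else "slow finished first"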
Cancellation is done using asyncio
task cancellation.
This means that tasks are requested to be cancelled but can catch the
asyncio.CancelledError
, thus
allowing them to perform some cleanup before allowing the cancellation to proceed (i.e. re-raising the error), or to
deny the cancellation entirely. It also means that
asyncio.shield()
can be used to
protect tasks against cancellation.
The following tasks, when cancelled, perform a Temporal cancellation:
- Activities - when the task executing an activity is cancelled, a cancellation request is sent to the activity
- Child workflows - when the task starting or executing a child workflow is cancelled, a cancellation request is sent to cancel the child workflow
- Timers - when the task executing a timer is cancelled (whether started via sleep or timeout), the timer is cancelled
When the workflow itself is requested to cancel, Task.cancel
is called on the main workflow task. Therefore,
asyncio.CancelledError
can be caught in order to handle the cancel gracefully.
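For example, a sketch of handling workflow cancellation gracefully (the activity name is hypothetical):

import asyncio
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class CleanupOnCancelWorkflow:
    @workflow.run
    async def run(self) -> str:
        try:
            await workflow.execute_activity(
                "do-work", schedule_to_close_timeout=timedelta(minutes=10)
            )
            return "done"
        except asyncio.CancelledError:
            # Perform any cleanup here, then re-raise to let the cancellation
            # proceed (swallowing the error instead would deny the cancel)
            workflow.logger.info("Workflow cancelled, cleaning up")
            raise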
Workflows follow asyncio
cancellation rules exactly which can cause confusion among Python developers. Cancelling a
task doesn't always cancel the thing it created. For example, given
task = asyncio.create_task(workflow.start_child_workflow(...
, calling task.cancel
does not cancel the child
workflow, it only cancels the starting of it, which has no effect if it has already started. However, cancelling the
result of handle = await workflow.start_child_workflow(...
or
task = asyncio.create_task(workflow.execute_child_workflow(...
does cancel the child workflow.
Also, due to Temporal rules, a cancellation request is a state, not an event. Therefore, repeated cancellation requests are not delivered, only the first. If the workflow chooses to swallow a cancellation, it cannot be requested again.
While running in a workflow, in addition to features documented elsewhere, the following items are available from the
temporalio.workflow
package:
- continue_as_new() - Async function to stop the workflow immediately and continue as new
- info() - Returns information about the current workflow
- logger - A logger for use in a workflow (properly skips logging on replay)
- now() - Returns the "current time" from the workflow's perspective

Exceptions
- Exceptions that are instances of temporalio.exceptions.FailureError will fail the workflow with that exception
  - For failing the workflow explicitly with a user exception, use temporalio.exceptions.ApplicationError. This can be marked non-retryable or include details as needed.
  - Exceptions that come from activity execution, child execution, cancellation, etc. are already instances of FailureError and will fail the workflow when uncaught.
  - A temporalio.exceptions.FailureError raised in an update handler will fail the update instead of failing the workflow.
- All other exceptions fail the "workflow task", meaning the workflow suspends and retries until the code is fixed. To fail the workflow instead, raise an ApplicationError as mentioned above.

This default can be changed by providing a list of exception types to workflow_failure_exception_types
when creating a
Worker
or failure_exception_types
on the @workflow.defn
decorator. If a workflow-thrown exception is an instance
of any type in either list, it will fail the workflow (or update) instead of the workflow task. This means a value of
[Exception]
will cause every exception to fail the workflow instead of the workflow task. Also, as a special case, if
temporalio.workflow.NondeterminismError
(or any superclass of it) is set, non-deterministic exceptions will fail the
workflow. WARNING: These settings are experimental.
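For example, failing a workflow explicitly with an ApplicationError, as described above, might look like this (a sketch with illustrative values):

from temporalio import workflow
from temporalio.exceptions import ApplicationError

@workflow.defn
class ValidatingWorkflow:
    @workflow.run
    async def run(self, value: int) -> int:
        if value < 0:
            # Fails the workflow (not just the workflow task); marked
            # non-retryable so the server will not retry it
            raise ApplicationError("value must be non-negative", value, non_retryable=True)
        return value * 2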
Signal and update handlers are defined using decorated methods as shown in the example above. Client code
sends signals and updates using workflow_handle.signal
, workflow_handle.execute_update
, or
workflow_handle.start_update
. When the workflow receives one of these requests, it starts an asyncio.Task
executing
the corresponding handler method with the argument(s) from the request.
The handler methods may be async def
and can do all the async operations described above (e.g. invoking activities and
child workflows, and waiting on timers and conditions). Notice that this means that handler tasks will be executing
concurrently with respect to each other and the main workflow task. Use
asyncio.Lock and
asyncio.Semaphore if necessary.
Your main workflow task may finish as a result of successful completion, cancellation, continue-as-new, or failure. You
should ensure that all in-progress signal and update handler tasks have finished before this happens; if you do not, you
will see a warning (the warning can be disabled via the workflow.signal
/workflow.update
decorators). One way to
ensure that handler tasks have finished is to wait on the workflow.all_handlers_finished
condition:
await workflow.wait_condition(workflow.all_handlers_finished)
External Workflows
- workflow.get_external_workflow_handle() inside a workflow returns a handle to interact with another workflow
- workflow.get_external_workflow_handle_for() can be used instead for a type safe handle
- await handle.signal() can be called on the handle to signal the external workflow
- await handle.cancel() can be called on the handle to send a cancel to the external workflow

Workflow testing can be done in an integration-test fashion against a real server; however, it is hard to simulate timeouts and other long time-based code. Using the time-skipping workflow test environment can help there.
The time-skipping temporalio.testing.WorkflowEnvironment
can be created via the static async start_time_skipping()
.
This internally downloads the Temporal time-skipping test server to a temporary directory if it doesn't already exist,
then starts the test server which has special APIs for skipping time.
NOTE: The time-skipping test environment does not work on ARM. The SDK will try to download the x64 binary on macOS for use with the Intel emulator, but for Linux or Windows ARM there is no proper time-skipping test server at this time.
Anytime a workflow result is waited on, the time-skipping server automatically advances to the next event it can. To
manually advance time before waiting on the result of a workflow, the WorkflowEnvironment.sleep
method can be used.
Here's a simple example of a workflow that sleeps for 24 hours:
import asyncio
from temporalio import workflow
@workflow.defn
class WaitADayWorkflow:
@workflow.run
async def run(self) -> str:
await asyncio.sleep(24 * 60 * 60)
return "all done"
An integration test of this workflow would be way too slow. However, the time-skipping server automatically skips to the next event when we wait on the result. Here's a test for that workflow:
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
async def test_wait_a_day_workflow():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(env.client, task_queue="tq1", workflows=[WaitADayWorkflow]):
assert "all done" == await env.client.execute_workflow(WaitADayWorkflow.run, id="wf1", task_queue="tq1")
That test will run almost instantly. This is because by calling execute_workflow
on our client, we have asked the
environment to automatically skip time as much as it can (basically until the end of the workflow or until an activity
is run).
To disable automatic time-skipping while waiting for a workflow result, run code inside a
with env.auto_time_skipping_disabled():
block.
Until a workflow is waited on, all time skipping in the time-skipping environment is done manually via
WorkflowEnvironment.sleep
.
Here's a workflow that waits for a signal or times out:
import asyncio
from temporalio import workflow
@workflow.defn
class SignalWorkflow:
def __init__(self) -> None:
self.signal_received = False
@workflow.run
async def run(self) -> str:
# Wait for signal or timeout in 45 seconds
try:
await workflow.wait_condition(lambda: self.signal_received, timeout=45)
return "got signal"
except asyncio.TimeoutError:
return "got timeout"
@workflow.signal
def some_signal(self) -> None:
self.signal_received = True
To test a normal signal, you might:
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
async def test_signal_workflow():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
# Start workflow, send signal, check result
handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
await handle.signal(SignalWorkflow.some_signal)
assert "got signal" == await handle.result()
But how would you test the timeout part? Like so:
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
async def test_signal_workflow_timeout():
async with await WorkflowEnvironment.start_time_skipping() as env:
async with Worker(env.client, task_queue="tq1", workflows=[SignalWorkflow]):
# Start workflow, advance time past timeout, check result
handle = await env.client.start_workflow(SignalWorkflow.run, id="wf1", task_queue="tq1")
await env.sleep(50)
assert "got timeout" == await handle.result()
Also, the current time of the workflow environment can be obtained via the async WorkflowEnvironment.get_current_time
method.
Activities are just functions decorated with @activity.defn
. Simply write different ones and pass those to the worker
to have different activities called during the test.
By default workflows are run in a sandbox to help avoid non-deterministic code. If a call that is known to be non-deterministic is performed, an exception will be thrown in the workflow which will "fail the task" which means the workflow will not progress until fixed.
The sandbox is not foolproof and non-determinism can still occur. It is simply a best-effort way to catch bad code early. Users are encouraged to define their workflows in files with no other side effects.
The sandbox offers a mechanism to pass through modules from outside the sandbox. By default this already includes all standard library modules and Temporal modules. For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be deterministic. This includes modules containing the activities to be referenced in workflows. See "Passthrough Modules" below on how to do this.
If you are getting an error like:
temporalio.worker.workflow_sandbox.restrictions.RestrictedWorkflowAccessError: Cannot access http.client.IncompleteRead.__mro_entries__ from inside a workflow. If this is code from a module not used in a workflow or known to only be used deterministically from a workflow, mark the import as pass through.
Then you are either using an invalid construct from the workflow, hitting a known limitation of the sandbox, or, most commonly, importing a module that is safe to pass through (see the "Passthrough Modules" section below).
The sandbox is made up of two components that work closely together: global state isolation, and restrictions preventing known non-deterministic library calls.
Global state isolation is performed by using exec
. Upon workflow start, the file that the workflow is defined in is
imported into a new sandbox created for that workflow run. In order to keep the sandbox performant a known set of
"passthrough modules" are passed through from outside of the sandbox when they are imported. These are expected to be
side-effect free on import and have their non-deterministic aspects restricted. By default the entire Python standard
library, temporalio
, and a couple of other modules are passed through from outside of the sandbox. To update this
list, see "Customizing the Sandbox".
Restrictions preventing known non-deterministic library calls are achieved using proxy objects on modules wrapped around
the custom importer set in the sandbox. Many restrictions apply at workflow import time and workflow run time, while
some restrictions only apply at workflow run time. A default set of restrictions is included that prevents most
dangerous standard library calls. However it is known in Python that some otherwise-non-deterministic invocations, like
reading a file from disk via open
or using os.environ
, are done as part of importing modules. To customize what is
and isn't restricted, see "Customizing the Sandbox".
There are three increasingly-scoped ways to avoid the sandbox. Users are discouraged from avoiding the sandbox if possible.
To remove restrictions around a particular block of code, use with temporalio.workflow.unsafe.sandbox_unrestricted():
.
The workflow will still be running in the sandbox, but no restrictions for invalid library calls will be applied.
To run an entire workflow outside of a sandbox, set sandboxed=False
on the @workflow.defn
decorator when defining
it. This will run the entire workflow outside of the sandbox, which means it can share global state and other bad
things.
To disable the sandbox entirely for a worker, set the Worker
init's workflow_runner
keyword argument to
temporalio.worker.UnsandboxedWorkflowRunner()
. This value is defaulted to
temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()
so by changing it to the unsandboxed runner, the sandbox
will not be used at all.
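For instance, a worker with the sandbox disabled entirely might be created like this (a sketch reusing names from the worker example above; use with care):

from temporalio.worker import UnsandboxedWorkflowRunner, Worker

worker = Worker(
    client,
    task_queue="my-task-queue",
    workflows=[MyWorkflow],
    activities=[my_activity],
    # Disables the workflow sandbox for every workflow on this worker
    workflow_runner=UnsandboxedWorkflowRunner(),
)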
⚠️ WARNING: APIs in the temporalio.worker.workflow_sandbox
module are not yet considered stable and may change in
future releases.
When creating the Worker
, the workflow_runner
is defaulted to
temporalio.worker.workflow_sandbox.SandboxedWorkflowRunner()
. The SandboxedWorkflowRunner
's init accepts a
restrictions
keyword argument that is defaulted to SandboxRestrictions.default
. The SandboxRestrictions
dataclass
is immutable and contains three fields that can be customized, but only two have notable value. See below.
By default the sandbox completely reloads non-standard-library and non-Temporal modules for every workflow run. To make the sandbox quicker and use less memory when importing known-side-effect-free third party modules, they can be marked as passthrough modules.
For performance and behavior reasons, users are encouraged to pass through all third party modules whose calls will be deterministic.
One way to pass through a module is at import time in the workflow file using the imports_passed_through
context
manager like so:
# my_workflow_file.py
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
import pydantic
@workflow.defn
class MyWorkflow:
...
Alternatively, this can be done at worker creation time by customizing the runner's restrictions. For example:
my_worker = Worker(
...,
workflow_runner=SandboxedWorkflowRunner(
restrictions=SandboxRestrictions.default.with_passthrough_modules("pydantic")
)
)
In both of these cases, now the pydantic
module will be passed through from outside of the sandbox instead of
being reloaded for every workflow run.
SandboxRestrictions.invalid_module_members
contains a root matcher that applies to all module members. This already
has a default set which includes things like datetime.date.today()
which should never be called from a workflow. To
remove this restriction:
my_restrictions = dataclasses.replace(
SandboxRestrictions.default,
invalid_module_members=SandboxRestrictions.invalid_module_members_default.with_child_unrestricted(
"datetime", "date", "today",
),
)
my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
Restrictions can also be added by |'ing together matchers, for example to restrict the datetime.date
class from
being used altogether:
my_restrictions = dataclasses.replace(
SandboxRestrictions.default,
invalid_module_members=SandboxRestrictions.invalid_module_members_default | SandboxMatcher(
children={"datetime": SandboxMatcher(use={"date"})},
),
)
my_worker = Worker(..., workflow_runner=SandboxedWorkflowRunner(restrictions=my_restrictions))
See the API for more details on exact fields and their meaning.
Below are known sandbox issues. As the sandbox is developed and matures, some may be resolved.
Currently the sandbox references/alters the global sys.modules
and builtins
fields while running workflow code. In
order to prevent affecting other sandboxed code, thread locals are leveraged to only intercept these values during the
workflow thread running. Therefore, technically if top-level import code starts a thread, it may lose sandbox
protection.
The sandbox is built to catch many non-deterministic and state sharing issues, but it is not secure. Some known bad
calls are intercepted, but for performance reasons, every single attribute get/set cannot be checked. Therefore a simple
call like setattr(temporalio.common, "__my_key", "my value")
will leak across sandbox runs.
The sandbox is only a helper; it does not provide full protection.
The sandbox does not add significant CPU or memory overhead for workflows that are in files which only import standard library modules. This is because they are passed through from outside of the sandbox. However, every non-standard-library import that is performed at the top of the same file the workflow is in will add CPU overhead (the module is re-imported every workflow run) and memory overhead (each module independently cached as part of the workflow run for isolation reasons). This becomes more apparent for large numbers of workflow runs.
To mitigate this, users should:
- Define workflows in files that have as few non-standard-library imports as possible
- Mark known-side-effect-free third party modules as passthrough modules so they are not reloaded for every workflow run
Extending a restricted class causes Python to instantiate the restricted metaclass which is unsupported. Therefore if
you attempt to use a class in the sandbox that extends a restricted class, it will fail. For example, if you have a
class MyZipFile(zipfile.ZipFile)
and try to use that class inside a workflow, it will fail.
Classes used inside the workflow should not extend restricted classes. For situations where third-party modules need to do this at import time, they should be marked as passthrough modules.
If an object is restricted, internal C Python validation may fail in some cases. For example, running
dict.items(os.__dict__)
will fail with:
descriptor 'items' for 'dict' objects doesn't apply to a '_RestrictedProxy' object
This is a low-level check that cannot be subverted. The solution is to not use restricted objects inside the sandbox. For situations where third-party modules need to do this at import time, they should be marked as passthrough modules.
Due to https://bugs.python.org/issue44847, classes that are wrapped and then
checked to see if they are subclasses of another via issubclass
may fail (see also
this wrapt issue).
If the Pydantic dependency is in compiled form (the default) and you are using a Pydantic model inside a workflow
sandbox that uses a datetime
type, it will grab the wrong validator and use date
instead. This is because our
patched form of issubclass
is bypassed by compiled Pydantic.
To work around this, either don't use datetime-based Pydantic model fields in workflows, mark the datetime library as passthrough (which means you lose protection against calling the non-deterministic now()), or use the non-compiled Pydantic dependency.
Activities are decorated with @activity.defn
like so:
from temporalio import activity
@activity.defn
def say_hello_activity(name: str) -> str:
return f"Hello, {name}!"
Some things to note about activity definitions:
- The say_hello_activity is synchronous which is the recommended activity type (see "Types of Activities" below), but it can be async
- A custom name for the activity can be set with a decorator argument, e.g. @activity.defn(name="my activity")
- Activities can also be callable classes (i.e. classes that define __call__). An instance of the class should be what is registered with the worker.
- @activity.defn can have dynamic=True set which means all otherwise unhandled activities fall through to this. If present, cannot have a name argument, and the activity function must accept a single parameter of Sequence[temporalio.common.RawValue]. The payload of the raw value can be converted via activity.payload_converter().from_payload.

There are 3 types of activity callables accepted and described below: synchronous multithreaded, synchronous multiprocess/other, and asynchronous. Only positional parameters are allowed in activity callables.
Synchronous activities, i.e. functions that do not have async def
, can be used with workers, but the
activity_executor
worker parameter must be set with a concurrent.futures.Executor
instance to use for executing the
activities.
All long-running, non-local activities should heartbeat so they can be cancelled. Cancellation is delivered to threaded activities by throwing an exception, but it cannot be thrown into multiprocess/other activities. The sections below on each synchronous type explain further. There are also calls on the context that can check for cancellation. For more information, see the "Activity Context" and "Heartbeating and Cancellation" sections later.
Note, all calls from an activity to functions in the temporalio.activity
package are powered by
contextvars. Therefore, new threads starting inside of
activities must copy_context()
and then .run()
manually to ensure temporalio.activity
calls like heartbeat
still
function in the new threads.
If any activity ever throws a concurrent.futures.BrokenExecutor, the failure is considered unrecoverable and the worker will fail and shut down.
If activity_executor
is set to an instance of concurrent.futures.ThreadPoolExecutor
then the synchronous activities
are considered multithreaded activities. If max_workers
is not set to at least the worker's
max_concurrent_activities
setting, a warning will be issued. Besides activity_executor
, no other worker parameters
are required for synchronous multithreaded activities.
By default, cancellation of a synchronous multithreaded activity is done via a temporalio.exceptions.CancelledError
thrown into the activity thread. Activities that do not wish to have cancellation thrown can set
no_thread_cancel_exception=True
in the @activity.defn
decorator.
Code that wishes to be temporarily shielded from the cancellation exception can run inside
with activity.shield_thread_cancel_exception():
. But once the last nested form of that block is finished, even if
there is a return statement within, it will throw the cancellation if there was one. A try
+
except temporalio.exceptions.CancelledError
would have to surround the with
to handle the cancellation explicitly.
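For example, a threaded activity might shield a critical section like this (a sketch):

from temporalio import activity
from temporalio.exceptions import CancelledError

@activity.defn
def commit_records() -> None:
    try:
        with activity.shield_thread_cancel_exception():
            # ... critical, non-interruptible work goes here ...
            pass
        # If a cancel arrived during the block, it is raised once the
        # outermost shield block exits
    except CancelledError:
        activity.logger.info("Activity cancelled after critical section")
        raise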
If activity_executor
is set to an instance of concurrent.futures.Executor
that is not
concurrent.futures.ThreadPoolExecutor
, then the synchronous activities are considered multiprocess/other activities.
Users should prefer threaded activities over multiprocess ones since, among other reasons, threaded activities can raise
on cancellation.
These require special primitives for heartbeating and cancellation. The shared_state_manager
worker parameter must be
set to an instance of temporalio.worker.SharedStateManager
. The most common implementation can be created by passing a
multiprocessing.managers.SyncManager
(i.e. result of multiprocessing.managers.Manager()
) to
temporalio.worker.SharedStateManager.create_from_multiprocessing()
.
Also, all of these activity functions must be "picklable".
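A minimal sketch of wiring this up (the queue and activity names are illustrative; client is created via Client.connect as shown earlier):

import concurrent.futures
import multiprocessing
from temporalio.worker import SharedStateManager, Worker

def create_multiprocess_worker(client, my_picklable_activity):
    return Worker(
        client,
        task_queue="my-task-queue",
        # Must be a picklable, module-level function
        activities=[my_picklable_activity],
        activity_executor=concurrent.futures.ProcessPoolExecutor(max_workers=5),
        shared_state_manager=SharedStateManager.create_from_multiprocessing(
            multiprocessing.Manager()
        ),
    )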
Asynchronous activities are functions defined with async def
. Asynchronous activities are often much more performant
than synchronous ones. When using asynchronous activities no special worker parameters are needed.
⚠️ WARNING: Do not block the thread in async def Python functions. This can stop the processing of the rest of the Temporal activities and workflows running on the worker.
Cancellation for asynchronous activities is done via
asyncio.Task.cancel
. This means that
asyncio.CancelledError
will be raised (and can be caught, but it is not recommended). A non-local activity must
heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and
"Heartbeating and Cancellation" later).
During activity execution, an implicit activity context is set as a
context variable. The context variable itself is not visible, but
calls in the temporalio.activity
package make use of it. Specifically:
- in_activity() - Whether an activity context is present
- info() - Returns the immutable info of the currently running activity
- heartbeat(*details) - Record a heartbeat
- is_cancelled() - Whether a cancellation has been requested on this activity
- wait_for_cancelled() - async call to wait for cancellation request
- wait_for_cancelled_sync(timeout) - Synchronous blocking call to wait for cancellation request
- shield_thread_cancel_exception() - Context manager for use in with clauses by synchronous multithreaded activities to prevent the cancel exception from being thrown during the block of code
- is_worker_shutdown() - Whether the worker has started graceful shutdown
- wait_for_worker_shutdown() - async call to wait for start of graceful worker shutdown
- wait_for_worker_shutdown_sync(timeout) - Synchronous blocking call to wait for start of graceful worker shutdown
- raise_complete_async() - Raise an error that this activity will be completed asynchronously (i.e. after return of the activity function in a separate client call)

With the exception of in_activity(), if any of the functions are called outside of an activity context, an error occurs. Synchronous activities cannot call any of the async functions.
In order for a non-local activity to be notified of cancellation requests, it must be given a heartbeat_timeout
at
invocation time and invoke temporalio.activity.heartbeat()
inside the activity. It is strongly recommended that all
but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation
for synchronous and asynchronous activities.
In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server
for retrieval during activity retry. If an activity calls temporalio.activity.heartbeat(123, 456)
and then fails and
is retried, temporalio.activity.info().heartbeat_details
will return an iterable containing 123
and 456
on the
next run.
Heartbeating has no effect on local activities.
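For example, a long-running synchronous activity might heartbeat with progress details and resume from them on retry (a sketch):

import time
from temporalio import activity

@activity.defn
def process_items(count: int) -> int:
    # Resume from the last heartbeated index if this is a retry
    start = 0
    if activity.info().heartbeat_details:
        start = activity.info().heartbeat_details[0]
    for index in range(start, count):
        time.sleep(1)  # stand-in for real work on one item
        activity.heartbeat(index + 1)
    return count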
An activity can react to a worker shutdown using is_worker_shutdown or one of the wait_for_worker_shutdown functions.
When the graceful_shutdown_timeout
worker parameter is given a datetime.timedelta
, on shutdown the worker will
notify activities of the graceful shutdown. Once that timeout has passed (or if it wasn't set), the worker will perform
cancellation of all outstanding activities.
The shutdown()
invocation will wait on all activities to complete, so if a long-running activity does not at least
respect cancellation, the shutdown may never complete.
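For example, a synchronous activity could check for graceful shutdown between units of work (a sketch):

from temporalio import activity

@activity.defn
def drain_queue() -> None:
    while not activity.is_worker_shutdown():
        # Do one unit of work here, then heartbeat so cancellation can be delivered
        activity.heartbeat()
        # Wait up to a second for graceful shutdown to start before looping again
        activity.wait_for_worker_shutdown_sync(1)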
Unit testing an activity or any code that could run in an activity is done via the
temporalio.testing.ActivityEnvironment
class. Simply instantiate this and any callable + params passed to run
will
be invoked inside the activity context. The following are attributes/methods on the environment that can be used to
affect calls activity code might make to functions on the temporalio.activity
package.
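For example, a sketch of a test for the say_hello activity from the quick start (the attributes listed below can be set on the environment to simulate heartbeats, cancellation, and shutdown):

from temporalio.testing import ActivityEnvironment
from activities import say_hello  # the activity from the quick start

async def test_say_hello_activity():
    env = ActivityEnvironment()
    # Runs the callable inside an activity context
    result = await env.run(say_hello, "World")
    assert result == "Hello, World!"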
- The info property can be set to customize what is returned from activity.info()
- The on_heartbeat property can be set to handle activity.heartbeat() calls
- cancel() can be invoked to simulate a cancellation of the activity
- worker_shutdown() can be invoked to simulate a worker shutdown during execution of the activity

Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,
assuming history_str
is populated with a JSON string history either exported from the web UI or from tctl
, the
following function will replay it:
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer
async def run_replayer(history_str: str):
replayer = Replayer(workflows=[SayHello])
await replayer.replay_workflow(WorkflowHistory.from_json(history_str))
This will throw an error if any non-determinism is detected.
Replaying from workflow history is a powerful concept that many use to test that workflow alterations won't cause non-determinisms with past-complete workflows. The following code will make sure that all workflow histories for a certain workflow type (i.e. workflow class) are safe with the current code.
from temporalio.client import Client, WorkflowHistory
from temporalio.worker import Replayer
async def check_past_histories(my_client: Client):
replayer = Replayer(workflows=[SayHello])
await replayer.replay_workflows(
await my_client.list_workflows("WorkflowType = 'SayHello'").map_histories(),
)
OpenTelemetry support requires the optional opentelemetry
dependencies which are part of the opentelemetry
extra.
When using pip
, running
pip install temporalio[opentelemetry]
will install needed dependencies. Then the temporalio.contrib.opentelemetry.TracingInterceptor
can be created and set
as an interceptor on the interceptors
argument of Client.connect
. When set, spans will be created for all client
calls and for all activity and workflow invocations on the worker, and those spans are properly serialized through the server to give one proper trace for a workflow execution.
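For example, a sketch of enabling tracing on the client (workers created from this client pick up the interceptor as well; this assumes an OpenTelemetry tracer provider is configured elsewhere):

from temporalio.client import Client
from temporalio.contrib.opentelemetry import TracingInterceptor

async def connect_traced() -> Client:
    return await Client.connect(
        "localhost:7233",
        interceptors=[TracingInterceptor()],
    )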
Python currently has two somewhat-incompatible protobuf library versions - the 3.x series and the 4.x series. Python currently recommends 4.x and that is the primary supported version. Some libraries like Pulumi require 4.x. Other libraries such as ONNX and Streamlit, for one reason or another, have/will not leave 3.x.
To support these, the Temporal Python SDK allows any protobuf library >= 3.19. However, the C extension in older Python versions can cause issues with the sandbox due to global state sharing. Temporal strongly recommends using the latest protobuf 4.x library unless you absolutely cannot, at which point some proto libraries may have to be marked as Passthrough Modules.
Below are known compatibility issues with the Python SDK.
When using gevent.monkey.patch_all()
, asyncio event loops can get messed up, especially those using custom event loops
like Temporal. See this gevent issue. This is a known incompatibility and
users are encouraged to not use gevent in asyncio applications (including Temporal). But if you must, there is
a sample showing how it is possible.
The Python SDK is built to work with Python 3.8 and newer. It is built using SDK Core which is written in Rust.
To build the SDK from source for use as a dependency, the following prerequisites are required:
- Python (make sure the latest version of pip is in use)
- Rust
- Poetry (e.g. python -m pip install poetry)
- Poe (e.g. python -m pip install poethepoet)

macOS note: If errors are encountered, it may be better to install Python and Rust as recommended from their websites instead of via brew.
With the prerequisites installed, first clone the SDK repository recursively:
git clone --recursive https://github.com/temporalio/sdk-python.git
cd sdk-python
Use poetry
to install the dependencies with --no-root
to not install this package (because we still need to build
it):
poetry install --no-root --all-extras
Now perform the release build:
This will take a while because Rust will compile the core project in release mode (see Local SDK development environment for the quicker approach to local development).
poetry build
The compiled wheel doesn't have the exact right tags yet for use, so run this script to fix it:
poe fix-wheel
The whl
wheel file in dist/
is now ready to use.
The wheel can now be installed into any virtual environment.
For example, create a virtual environment somewhere and then run the following inside the virtual environment:
pip install wheel
pip install /path/to/cloned/sdk-python/dist/*.whl
Create this Python file at example.py
:
import asyncio
from temporalio import workflow, activity
from temporalio.client import Client
from temporalio.worker import Worker
@workflow.defn
class SayHello:
@workflow.run
async def run(self, name: str) -> str:
return f"Hello, {name}!"
async def main():
client = await Client.connect("localhost:7233")
async with Worker(client, task_queue="my-task-queue", workflows=[SayHello]):
result = await client.execute_workflow(SayHello.run, "Temporal",
id="my-workflow-id", task_queue="my-task-queue")
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
Assuming there is a local Temporal server running, execute the
file with python
(or python3
if necessary):
python example.py
It should output:
Result: Hello, Temporal!
For local development, it is often quicker to use debug builds and a local virtual environment.
While not required, it often helps IDEs if we put the virtual environment .venv
directory in the project itself. This
can be configured system-wide via:
poetry config virtualenvs.in-project true
Now perform the same steps as the "Prepare" section above by installing the prerequisites, cloning the project, installing dependencies, and generating the protobuf code:
git clone --recursive https://github.com/temporalio/sdk-python.git
cd sdk-python
poetry install --no-root --all-extras
Now compile the Rust extension in develop mode which is quicker than release mode:
poe build-develop
That step can be repeated for any Rust changes made.
The environment is now ready to develop in.
To execute tests:
poe test
This runs against Temporalite. To run against the time-skipping test
server, pass --workflow-environment time-skipping
. To run against the default
namespace of an already-running
server, pass the host:port
to --workflow-environment
. Can also use regular pytest arguments. For example, here's how
to run a single test with debug logs on the console:
poe test -s --log-cli-level=DEBUG -k test_sync_activity_thread_cancel_caught
To allow for backwards compatibility, protobuf code is generated on the 3.x series of the protobuf library. To generate
protobuf code, you must be on Python <= 3.10, and then run poetry add "protobuf<4"
. Then the protobuf files can be
generated via poe gen-protos
. Tests can be run for protobuf version 3 by setting the TEMPORAL_TEST_PROTO3
env var
to 1
prior to running tests.
Do not commit poetry.lock
or pyproject.toml
changes. To go back from this downgrade, restore both of those files
and run poetry install --no-root --all-extras
. Make sure you poe format
the results.
For a less system-intrusive approach, you can run the generation via Docker (note this approach may have a bug):
docker build -f scripts/_proto/Dockerfile .
docker run --rm -v "${PWD}/temporalio/api:/api_new" -v "${PWD}/temporalio/bridge/proto:/bridge_new" <just built image sha>
poe format
In tests and example code, individual classes/functions may be imported to make the code more readable. This can also be done for rarely used ones in library code (e.g. dataclass or partial), but is not allowed for any temporalio packages (except temporalio.types) or any classes/functions that aren't clear when unqualified. @staticmethod is allowed.