Open dhiaayachi opened 2 months ago
Build a Background Check application with Temporal and Python
Develop for durability
Last updated on Aug 21, 2024
This section explores how to build a Workflow for Durable Execution using the Temporal Python SDK.
Our Background Check Workflow will take a social security number, ssn, and an identifier, unique_id, as input.
The Workflow will be responsible for coordinating a background check, which is implemented as an Activity.
Create a new file called backgroundcheck_dacx.py in the workflows directory, so that the file name matches the imports used by the Worker and the tests.
workflows/backgroundcheck_dacx.py
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

with workflow.unsafe.imports_passed_through():
    from activities.ssntraceactivity_dacx import ssn_trace_activity


@workflow.defn
class BackgroundCheck:
    @workflow.run
    async def run(self, ssn: str, unique_id: str) -> str:
        # Execute the Activity "ssn_trace_activity", allowing at most
        # 3 attempts in total, and wait for it to complete.
        return await workflow.execute_activity(
            ssn_trace_activity,
            # Multiple Activity arguments are passed through `args`.
            args=[ssn, unique_id],
            schedule_to_close_timeout=timedelta(seconds=10),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
                maximum_interval=timedelta(seconds=10),
            ),
        )
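The retry policy above caps the Activity at 3 attempts and caps the wait between attempts at 10 seconds. The following is a minimal, standard-library sketch of how the waits between attempts could be worked out. It assumes the SDK's documented defaults of a 1 second initial interval and a backoff coefficient of 2.0, which the Workflow code does not set explicitly. The function name retry_delays is hypothetical and is not part of the Temporal SDK.

```python
from datetime import timedelta


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),  # assumed SDK default
    backoff_coefficient: float = 2.0,  # assumed SDK default
    maximum_interval: timedelta = timedelta(seconds=10),
) -> list[timedelta]:
    """Return the wait before each retry (attempt 2, 3, ...).

    Only positive maximum_attempts values are handled in this sketch.
    """
    delays = []
    interval = initial_interval
    # The first attempt is not a retry, so N attempts allow N - 1 waits.
    for _ in range(maximum_attempts - 1):
        # Each wait is capped at maximum_interval.
        delays.append(min(interval, maximum_interval))
        interval = interval * backoff_coefficient
    return delays


if __name__ == "__main__":
    for number, delay in enumerate(retry_delays(maximum_attempts=3), start=2):
        print(f"attempt {number} starts after waiting {delay.total_seconds():.0f}s")
```

Under these assumptions, the three attempts are separated by waits of 1 and 2 seconds. The schedule_to_close_timeout of 10 seconds still bounds the total time across all attempts.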
The Background Check Activity verifies the validity of the social security number passed as an argument to it. The Activity can be implemented as a function, and can be asynchronous or synchronous.
Create a new file called ssntraceactivity_dacx.py in the activities directory, so that the file name matches the imports used elsewhere in the project.
activities/ssntraceactivity_dacx.py
from temporalio import activity


@activity.defn
async def ssn_trace_activity(ssn: str, unique_id: str) -> str:
    # ... Implement logic to trace SSN.
    if ssn == "555-55-5555":
        return "pass"
    else:
        return "fail"
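To run a Worker Process, define the following steps in code: connect a Temporal Client, create a Worker with that Client, a Task Queue name, and the Workflow and Activity to register, and then run the Worker. The example below reads its connection settings from environment variables and connects with mTLS, as used with Temporal Cloud.
For organization, we recommend keeping Worker code separate from Workflow and Activity code.
Create a new file called main.py in the worker directory.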
import asyncio
import os

from temporalio.client import Client, TLSConfig
from temporalio.worker import Worker

from activities.ssntraceactivity_dacx import ssn_trace_activity
from workflows.backgroundcheck_dacx import BackgroundCheck


async def main():
    client = await Client.connect(
        os.getenv("TEMPORAL_HOST_URL"),
        namespace=os.getenv("TEMPORAL_NAMESPACE"),
        tls=TLSConfig(
            client_cert=open(os.getenv("TEMPORAL_MTLS_TLS_CERT"), "rb").read(),
            client_private_key=open(os.getenv("TEMPORAL_MTLS_TLS_KEY"), "rb").read(),
        ),
    )
    worker = Worker(
        client,
        task_queue="backgroundcheck-boilerplate-task-queue",
        workflows=[BackgroundCheck],
        activities=[ssn_trace_activity],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
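The Worker above reads four environment variables with os.getenv, which returns None when a variable is unset and would then fail inside Client.connect or open. One possible way to fail earlier with a clearer message is sketched below using only the standard library. The ConnectionSettings class and load_connection_settings function are hypothetical helpers, not part of the Temporal SDK; only the variable names come from the Worker code.

```python
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Mapping


@dataclass(frozen=True)
class ConnectionSettings:
    host_url: str
    namespace: str
    client_cert: bytes
    client_private_key: bytes


def load_connection_settings(env: Mapping[str, str] = os.environ) -> ConnectionSettings:
    """Read the four variables used by the Worker, failing early if any is unset."""
    names = (
        "TEMPORAL_HOST_URL",
        "TEMPORAL_NAMESPACE",
        "TEMPORAL_MTLS_TLS_CERT",
        "TEMPORAL_MTLS_TLS_KEY",
    )
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError("missing environment variables: " + ", ".join(missing))
    return ConnectionSettings(
        host_url=env["TEMPORAL_HOST_URL"],
        namespace=env["TEMPORAL_NAMESPACE"],
        # read_bytes opens and closes the file, so no handle is left open.
        client_cert=Path(env["TEMPORAL_MTLS_TLS_CERT"]).read_bytes(),
        client_private_key=Path(env["TEMPORAL_MTLS_TLS_KEY"]).read_bytes(),
    )
```

The returned values could then be passed to Client.connect and TLSConfig in place of the inline os.getenv and open calls.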
You can use the Temporal CLI to start a Workflow whether you are using a local development server, Temporal Cloud, or a self-hosted environment. However, you need to provide additional options to the command when working with Temporal Cloud or a self-hosted environment.
How to start a Workflow with Temporal CLI when using Temporal Cloud
Run the temporal workflow start command, and make sure to specify the certificate and private key arguments.
temporal workflow start \
--task-queue backgroundcheck-boilerplate-task-queue-cloud \
--type BackgroundCheck \
--tls-cert-path ca.pem \
--tls-key-path ca.key \
--input '"555-55-5555"' \
--input '"a-unique-id"' \
--namespace <namespace>.<account-id> \
--address <namespace>.<account-id>.tmprl.cloud:<port>
Make sure that the certificate path, private key path, Namespace, and address argument values match your project, and that the Task Queue name matches the one your Worker polls.
Use environment variables
Use environment variables as a way to quickly switch between a local dev server and Temporal Cloud, for example.
You can customize the environment names to be anything you want.
# set Cloud env variables
temporal env set cloud.namespace <namespace>.<account-id>
temporal env set cloud.address <namespace>.<account-id>.tmprl.cloud:<port>
temporal env set cloud.tls-cert-path ca.pem
temporal env set cloud.tls-key-path ca.key
# set local env variables
temporal env set local.namespace <namespace>
In this way, you can just provide a single --env command option when using the Temporal CLI rather than specifying each connection option in every command.
temporal workflow start \
# ...
--env cloud \
# ...
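When the CLI is driven from a script, the same command can be assembled programmatically. The sketch below builds the argument list for temporal workflow start from the flags shown in this section (--task-queue, --type, --input, --env). It assumes that each Workflow argument is passed as its own JSON-encoded --input option. The function name build_workflow_start_command is hypothetical, and the sketch only builds the command without running it.

```python
import json
import shlex
from typing import Optional, Sequence


def build_workflow_start_command(
    task_queue: str,
    workflow_type: str,
    inputs: Sequence[object],
    env: Optional[str] = None,
) -> list[str]:
    """Build the argv for `temporal workflow start` without executing it."""
    command = [
        "temporal", "workflow", "start",
        "--task-queue", task_queue,
        "--type", workflow_type,
    ]
    for value in inputs:
        # Assumption: one JSON-encoded --input option per Workflow argument.
        command += ["--input", json.dumps(value)]
    if env is not None:
        # A named CLI environment replaces the individual connection options.
        command += ["--env", env]
    return command


if __name__ == "__main__":
    argv = build_workflow_start_command(
        "backgroundcheck-boilerplate-task-queue-cloud",
        "BackgroundCheck",
        ["555-55-5555", "a-unique-id"],
        env="cloud",
    )
    # shlex.join quotes each argument so the line can be pasted into a shell.
    print(shlex.join(argv))
```

The resulting list could be handed to subprocess.run; keeping it as a list avoids shell quoting problems with the JSON-encoded inputs.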
Run the temporal workflow list command, and make sure to specify the certificate and private key arguments.
temporal workflow list \
--tls-cert-path ca.pem \
--tls-key-path ca.key \
--namespace <namespace>.<account-id> \
--address <namespace>.<account-id>.tmprl.cloud:<port>
Visit the Workflows page of your Cloud Namespace. The URL will look something like the following:
https://cloud.temporal.io/namespaces/<namespace>.<account-id>/workflows
How to start a Workflow with the Temporal CLI when using a Self-hosted Cluster
Use your Temporal CLI alias to run the temporal workflow start command and start your Workflow.
temporal_docker workflow start \
--task-queue backgroundcheck-boilerplate-task-queue-self-hosted \
--type BackgroundCheck \
--input '"555-55-5555"' \
--input '"a-unique-id"' \
--namespace backgroundcheck_namespace
Using your Temporal CLI alias, run the temporal workflow list command.
This command lists the Workflow Executions within the Namespace:
temporal_docker workflow list \
--namespace backgroundcheck_namespace
When you visit for the first time, the Web UI directs you to http://localhost:8080/namespaces/default/workflows.
Use the Namespace dropdown to select the project Namespace you created earlier.
You should now be at http://localhost:8080/namespaces/backgroundcheck_namespace/workflows.
To start your Worker process:
Create a Dockerfile:
FROM python:3.11
RUN mkdir /app
COPY . /app
WORKDIR /app
RUN pip3 install poetry
RUN poetry config virtualenvs.create false
RUN poetry install
CMD [ "poetry", "run", "python", "/app/worker/main.py" ]
Build the image:
docker build . -t backgroundcheck-worker-image:latest
Run the Worker:
docker run --network temporal-network backgroundcheck-worker-image:latest
Each Temporal SDK has a testing suite that can be used in conjunction with a typical language-specific testing framework. In the Temporal Python SDK, the testing package (https://python.temporal.io/temporalio.testing.html) provides a test environment in which the Workflow and Activity code may be run for test purposes.
The BackgroundCheck Workflow code does the following: it executes ssn_trace_activity with the input SSN and unique ID, waits for the Activity to complete, and returns its result.
We can also perform a Workflow Replay test, and we'll provide detailed coverage of this topic in another section.
This is a unit test written in Python using the pytest library.
The test runs the BackgroundCheck Workflow through the execute_workflow method of the test environment's Client.
The test creates a new WorkflowEnvironment and a Worker with a Task Queue, the BackgroundCheck Workflow, and the ssn_trace_activity Activity.
Then, it executes the BackgroundCheck.run method with a social security number and a unique ID, and asserts that the result is equal to "pass".
The test is marked with @pytest.mark.asyncio to indicate that it is an asynchronous test.
docs/tutorials/python/background-check/code/backgroundcheck_dacx/tests/workflow_test.py
import uuid

import pytest
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker

from activities.ssntraceactivity_dacx import ssn_trace_activity
from workflows.backgroundcheck_dacx import BackgroundCheck


@pytest.mark.asyncio
async def test_execute_workflow():
    task_queue_name = str(uuid.uuid4())
    async with await WorkflowEnvironment.start_time_skipping() as env:
        async with Worker(
            env.client,
            task_queue=task_queue_name,
            workflows=[BackgroundCheck],
            activities=[ssn_trace_activity],
        ):
            assert "pass" == await env.client.execute_workflow(
                BackgroundCheck.run,
                # Multiple Workflow arguments are passed through `args`.
                args=["555-55-5555", "a-unique-id"],
                id=str(uuid.uuid4()),
                task_queue=task_queue_name,
            )
This is a Python unit test that uses the pytest framework and the ActivityEnvironment class from the Temporal Python SDK.
It tests the ssn_trace_activity function from the activities module.
The function takes a social security number and a unique ID as input and returns a string indicating whether the SSN is valid.
The test checks that the function returns "pass" when given the SSN "555-55-5555".
docs/tutorials/python/background-check/code/backgroundcheck_dacx/tests/activity_test.py
import pytest
from temporalio.testing import ActivityEnvironment

from activities.ssntraceactivity_dacx import ssn_trace_activity


@pytest.mark.asyncio
async def test_ssn_trace_activity() -> None:
    activity_environment = ActivityEnvironment()
    expected_output = "pass"
    assert expected_output == await activity_environment.run(
        ssn_trace_activity, "555-55-5555", "a-unique-id"
    )
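The Activity test above exercises only the "pass" branch. A table of cases that also covers the "fail" branch might look like the sketch below. To keep it runnable without the Temporal SDK installed, it uses a copy of the Activity body without the @activity.defn decorator and plain asyncio instead of ActivityEnvironment; the CASES table and check_cases function are hypothetical names, and the extra SSN values are made up for illustration.

```python
import asyncio


# Copy of the Activity body without @activity.defn, so this sketch runs
# without the Temporal SDK installed.
async def ssn_trace_activity(ssn: str, unique_id: str) -> str:
    return "pass" if ssn == "555-55-5555" else "fail"


# (input SSN, expected result); the non-passing values are illustrative only.
CASES = [
    ("555-55-5555", "pass"),
    ("111-11-1111", "fail"),
    ("", "fail"),
]


def check_cases() -> list[str]:
    """Run every case and return the actual results in order."""
    results = []
    for ssn, expected in CASES:
        actual = asyncio.run(ssn_trace_activity(ssn, "a-unique-id"))
        assert actual == expected, f"{ssn!r}: expected {expected}, got {actual}"
        results.append(actual)
    return results


if __name__ == "__main__":
    print(check_cases())
```

In the real test suite, the same table could be fed to pytest.mark.parametrize and run through ActivityEnvironment, as in the test above.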
You have now built a Workflow for Durable Execution and registered your Workflow and Activity with a Worker that will run with a Temporal Cloud environment.
You also added some basic Python tests using the pytest framework and Temporal's testing package.
Your Worker can now execute the Workflow whenever one is started on its Task Queue.
The Temporal Python SDK offers various features to build durable and resilient Workflows.
In the next section of this tutorial, you'll review the code to understand what happens when a Background Check fails.
Expected Behavior
Under load the number of connections to the Postgres database backend remains fairly consistent over time.
Actual Behavior
Under load, many new connections are made to the Postgres database that backs the history service (200+ new connections per second during a load test). The number of connections may rise to handle the load, but it should reach a steady state in which relatively few connections are killed and re-established.
One suggestion is that the method get is being called frequently and, for some reason, the refcount is not incremented, so it remains at 0 and a new connection is returned on each call. This needs further investigation to see whether the theory is valid.
https://github.com/temporalio/temporal/blob/b383ffffcbbeacdfce2fe021c30f093bab64b5d9/common/persistence/sql/factory.go#L195
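To make the refcount theory concrete, here is a small Python model of a reference-counted connection handle. It is an illustration only, not the code in factory.go: the class RefCountedHandle, its increment_on_get switch, and the connections_opened helper are all hypothetical. The model opens a new connection whenever get is called while the refcount is 0, so a refcount that is never incremented produces one new connection per call.

```python
from typing import Optional


class RefCountedHandle:
    """Toy model of a shared connection guarded by a reference count."""

    def __init__(self, increment_on_get: bool = True) -> None:
        # increment_on_get=False simulates the suspected bug.
        self.increment_on_get = increment_on_get
        self.ref_count = 0
        self.connections_opened = 0
        self._conn: Optional[object] = None

    def get(self) -> object:
        # A new connection is opened only when nobody holds a reference.
        if self.ref_count == 0:
            self._conn = object()  # stands in for a real database connection
            self.connections_opened += 1
        if self.increment_on_get:
            self.ref_count += 1
        return self._conn

    def close(self) -> None:
        # Release one reference; drop the connection when none remain.
        if self.ref_count > 0:
            self.ref_count -= 1
        if self.ref_count == 0:
            self._conn = None


def connections_opened(calls: int, increment_on_get: bool) -> int:
    """Call get() repeatedly and report how many connections were opened."""
    handle = RefCountedHandle(increment_on_get)
    for _ in range(calls):
        handle.get()
    return handle.connections_opened


if __name__ == "__main__":
    print("refcount incremented:", connections_opened(200, True))
    print("refcount stuck at 0: ", connections_opened(200, False))
```

In this model, 200 calls open a single connection when the refcount is maintained and 200 connections when it stays at 0, which would match the observed rate of new connections if the theory holds.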
Steps to Reproduce the Problem
Specifications