temporalio / sdk-python

Temporal Python SDK
MIT License

[Bug] OpenTelemetry interceptors report errors #160

Open anhdle14 opened 1 year ago

anhdle14 commented 1 year ago

What are you really trying to do?

I am currently using OpenTelemetry and Temporal Interceptors to send traces to Grafana Tempo via Grafana Agent.

Describe the bug

Failed to detach context
Traceback (most recent call last):
  File "/app/.venv/lib/python3.10/site-packages/temporalio/contrib/opentelemetry.py", line 419, in _top_level_workflow_context
    yield None
  File "/app/.venv/lib/python3.10/site-packages/temporalio/contrib/opentelemetry.py", line 332, in execute_workflow
    return await super().execute_workflow(input)
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/app/.venv/lib/python3.10/site-packages/opentelemetry/context/__init__.py", line 157, in detach
    _RUNTIME_CONTEXT.detach(token)  # type: ignore
  File "/app/.venv/lib/python3.10/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x7f33b28ea750> at 0x7f33244d7e40> was created in a different Context

It just returns errors like this, without any further detail.

Minimal Reproduction

# worker.py

resource = Resource(attributes={SERVICE_NAME: "worker"})

provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(
    endpoint="tempo-us-central1.grafana.net:443",
    headers=[
        "authorization=Basic <REDACTED>"
    ]
)))

# Sets the global default tracer provider
trace.set_tracer_provider(provider)

# Creates a tracer from the global tracer provider
tracer = trace.get_tracer(__name__)

async def worker():
    client = await Client.connect(
        target_host="127.0.0.1:7233",
        namespace="default",
        interceptors=[TracingInterceptor(tracer=tracer)],
    )

    worker = Worker(
        client=client,
        task_queue="hello-query-task-queue",
        workflows=[GreetingWorkflow],
        activities=[compose_greeting],
    )

    await worker.run()

if __name__ == "__main__":
    asyncio.run(worker())

# activity.py
resource = Resource(attributes={
    SERVICE_NAME: "python_temporal_opentelemetry_parallel"
})

provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))

# Sets the global default tracer provider
trace.set_tracer_provider(provider)

# Creates a tracer from the global tracer provider
tracer = trace.get_tracer(__name__)

@activity.defn
async def say_hello_activity(name: str) -> str:
    return f"Hello, {name}!"

@workflow.defn
class SayHelloWorkflow:
    @workflow.run
    async def run(self) -> None:
        otel_workflow.completed_span("parallel_wf")
        # Run 5 activities at the same time

        # ...

async def main():
    # Start client
    client = await Client.connect(
        target_host="localhost:7233",
        interceptors=[TracingInterceptor(tracer)],
    )

    # While the worker is running, use the client to run the workflow and
    # print out its result. Note, in many production setups, the client
    # would be in a completely separate process from the worker.
    handle = await client.start_workflow(
        SayHelloWorkflow.run,
        id="hello-parallel-activity-workflow-id",
        task_queue="hello-activity-task-queue",
    )

    await handle
    # print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

Environment/Versions

Additional context

N/A

cretz commented 1 year ago

Thanks for the reproduction! That really helps. I will hopefully get to this soon.

cretz commented 1 year ago

@anhdle14 - While it takes me some time to get around to this, there is a sample at https://github.com/temporalio/samples-python/tree/main/open_telemetry that does work fine. Can you confirm? Also note that an OTLP package that uses an older version of protobuf may not work with ours at this time.

nathanielobrown commented 1 year ago

I'm seeing this as well in my tests with temporalio==1.2.0, opentelemetry-api==1.17.0, opentelemetry-sdk==1.17.0.

cretz commented 1 year ago

@nathanielobrown - Can you describe what you're trying to do exactly? Are you trying to use OpenTelemetry from a workflow or in your workflow file? Can you pass through the import?

nathanielobrown commented 1 year ago

I'm importing from temporalio.contrib.opentelemetry import TracingInterceptor and then using this interceptor for all my clients and workers. For the client I do have another interceptor I created for Sentry:

def get_worker_interceptors() -> Sequence[Interceptor]:
    return [SentryInterceptor(), TracingInterceptor()]

def get_client_interceptors() -> Sequence[ClientInterceptor]:
    return [TracingInterceptor()]

I haven't got tracing working well yet, so I'm no expert/still a bit lost in the otel stuff, but I can say I see this log output in tests that run a workflow. Maybe for all tests that run workflows, not 100% sure:

ERROR:opentelemetry.context:Failed to detach context
Traceback (most recent call last):
  File "/Users/nob/Library/Caches/pypoetry/virtualenvs/dd-6mCrh732-py3.10/lib/python3.10/site-packages/temporalio/contrib/opentelemetry.py", line 427, in _top_level_workflow_context
    yield None
  File "/Users/nob/Library/Caches/pypoetry/virtualenvs/dd-6mCrh732-py3.10/lib/python3.10/site-packages/temporalio/contrib/opentelemetry.py", line 340, in execute_workflow
    return await super().execute_workflow(input)
GeneratorExit

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/nob/Library/Caches/pypoetry/virtualenvs/dd-6mCrh732-py3.10/lib/python3.10/site-packages/opentelemetry/context/__init__.py", line 163, in detach
    _RUNTIME_CONTEXT.detach(token)  # type: ignore
  File "/Users/nob/Library/Caches/pypoetry/virtualenvs/dd-6mCrh732-py3.10/lib/python3.10/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x1104908b0> at 0x168a7fc40> was created in a different Context

cretz commented 1 year ago

Is that in a workflow file? Have you made sure you pass through the import?

nathanielobrown commented 1 year ago

Not sure I totally follow, but I can say I've been using UnsandboxedWorkflowRunner, so I haven't been worrying about "pass through" imports. The code that gets the interceptors and creates the workers lives in separate modules from my workflows. Does that answer the question?

cretz commented 1 year ago

> Does that answer the question?

Yes, if you're not using the sandbox then it must be some other issue. We have unit tests for OTel at https://github.com/temporalio/sdk-python/blob/main/tests/contrib/test_opentelemetry.py, but the original user's sample recreates the tracer twice, and yours uses Sentry + OTel, which shouldn't matter. I will try to create a small reproducible test case when I can get to this issue.

nathanielobrown commented 1 year ago

Just wanted to jot down that I think this might just be an issue on worker exit. This happens all over the place in my tests, but I'm creating+shutting down workers all over the place as well. In production I see a lot of these errors when a worker is killed.

cretz commented 1 year ago

Yeah, there's an issue with detaching the context on generator exit (i.e. when the event loop is GC'd). It may be similar to https://github.com/open-telemetry/opentelemetry-python/issues/2606. I probably just have to swallow the detachment failure.
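
For illustration, the same "created in a different Context" failure can be reproduced with the standard library alone. The following is a minimal sketch, not the SDK's code or its eventual fix: `top_level_context` and `outcomes` are hypothetical names, and the generator is only a stand-in for a context-manager-style helper that sets a `ContextVar` before yielding and resets it afterwards. Closing the generator from a different `contextvars.Context` than the one it started in raises `GeneratorExit` at the `yield`, and the reset in `finally` then fails with `ValueError`, which the sketch swallows.

```python
import contextvars

# Stand-in for OpenTelemetry's "current_context" variable (assumption: any
# ContextVar behaves the same way for the purposes of this sketch).
current_context = contextvars.ContextVar("current_context", default=None)

# Records what happened each time a generator was finalized.
outcomes = []


def top_level_context(value):
    """Hypothetical helper: attach a value, yield, then detach it."""
    token = current_context.set(value)
    try:
        yield
    finally:
        # On close(), GeneratorExit is raised at the yield and this block
        # runs in whatever Context the *closer* is executing in.
        try:
            current_context.reset(token)
            outcomes.append("detached")
        except ValueError:
            # Token was created in a different Context: swallow the failure.
            outcomes.append("swallowed")


# Case 1: started inside a copied Context, closed from the outer one.
ctx = contextvars.copy_context()
gen = top_level_context("workflow-a")
ctx.run(next, gen)
gen.close()

# Case 2: started and closed inside the same Context.
ctx2 = contextvars.copy_context()
gen2 = top_level_context("workflow-b")
ctx2.run(next, gen2)
ctx2.run(gen2.close)

print(outcomes)
```

The first case mirrors the tracebacks above: the generator is finalized outside the Context that created the token, so the reset cannot succeed. Swallowing the error in that situation only hides the log noise; the value set in the original Context is discarded together with that Context.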

anhdle14 commented 1 year ago

> but the original user's sample recreates the tracer twice, and yours uses Sentry + OTel, which shouldn't matter. I will try to create a small reproducible test case when I can get to this issue.

Sorry, I don't follow this, should I not create the tracer twice?

cretz commented 1 year ago

> Sorry, I don't follow this, should I not create the tracer twice?

Correct. You are already setting it globally in worker.py, which imports activity.py, so you don't need to set it again in activity.py. You are also including the workflow in the same file as the activity; I would recommend that workflows live in a separate file. You should not run global code in the workflow file (it is sandboxed, so that code will run on every workflow run). See README.
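
The point about global code can be illustrated without Temporal at all. The sketch below is only an analogy and makes no claim about how the SDK's sandbox is implemented: it assumes a hypothetical `workflow_file.py` whose module-level code performs some setup (standing in for creating a tracer provider), and uses the standard library's `runpy` to execute that file once per simulated workflow run. `simulate_runs` and `setup_log` are illustrative names.

```python
import runpy
import tempfile
from pathlib import Path

# Hypothetical workflow file: its only module-level statement records that
# the global setup ran. `setup_log` is injected by the caller below.
WORKFLOW_FILE_SOURCE = 'setup_log.append("tracer provider configured")\n'


def simulate_runs(run_count):
    """Execute the hypothetical workflow file once per simulated run."""
    log = []
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "workflow_file.py"
        path.write_text(WORKFLOW_FILE_SOURCE)
        for _ in range(run_count):
            # Each call executes the module-level code again from scratch.
            runpy.run_path(str(path), init_globals={"setup_log": log})
    return log


setup_log = simulate_runs(3)
print(len(setup_log))
```

Every execution of the file repeats the setup, which is why one-time configuration such as the global tracer provider is better kept in the worker's entry point than in the workflow file.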