temporalio / sdk-core

Core Temporal SDK that can be used as a base for language specific Temporal SDKs
MIT License

[Bug] Worker.finalize_shutdown seems to hang when poll never succeeded due to server permission failure #667

Closed cretz closed 5 months ago

cretz commented 9 months ago

Describe the bug

Originally reported at https://github.com/temporalio/sdk-python/issues/459.

To replicate, first run a local Temporal server, then alter https://github.com/temporalio/samples-go/blob/main/grpc-proxy/proxy-server/main.go to change the gRPC dialing to:

    grpcClient, err := grpc.Dial(
        upstreamFlag,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        // grpc.WithUnaryInterceptor(clientInterceptor),
        grpc.WithUnaryInterceptor(func(
            ctx context.Context, method string,
            req, reply any,
            cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
            if strings.HasPrefix(method, "/temporal.api.workflowservice.v1.WorkflowService/Poll") {
                return status.Error(codes.PermissionDenied, "DENIED!")
            }
            return invoker(ctx, method, req, reply, cc, opts...)
        }),
    )

This simulates a permission-denied response from upstream. Then run the proxy, e.g. go run ./grpc-proxy/proxy-server, which by default listens on 127.0.0.1:8081 and proxies to 127.0.0.1:7233. Then run the Python script given in that issue, pointed at this proxy:

    import asyncio
    from temporalio import activity, client, workflow, worker

    @activity.defn
    async def a() -> None:
        pass

    @workflow.defn
    class Workflow:
        @workflow.run
        async def run(self) -> None:
            pass

    async def main():
        c = await client.Client.connect(
            "127.0.0.1:8081",
            rpc_metadata={"authorization": "wrong_token"},
            # tls=True,
        )
        w = worker.Worker(
            c,
            task_queue="default",
            activities=[a],
            workflows=[Workflow],
        )
        await w.run()

    if __name__ == "__main__":
        asyncio.run(main())

After about a minute, the worker fatally errors due to too many permission denials. But the worker process then reaches await finalize_shutdown and hangs there. Feel free to add logs inside the run call in .venv/Lib/site-packages/temporalio/worker/_worker.py to see how far it gets.
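As an alternative to adding logs by hand, a small watchdog can report which awaited call is still pending. The following is only a sketch: report_if_stuck is a hypothetical helper, not part of temporalio, and the demo uses an asyncio.Event that is never set as a stand-in for the hanging call. It uses asyncio.wait, which leaves the pending task untouched on timeout rather than cancelling it.

```python
import asyncio


async def report_if_stuck(awaitable, seconds: float, label: str) -> bool:
    """Return True if `awaitable` finished within `seconds`, else log and return False.

    Hypothetical helper. asyncio.wait does not cancel the task on timeout,
    so the pending call is left as-is for further inspection.
    """
    task = asyncio.ensure_future(awaitable)
    done, _pending = await asyncio.wait({task}, timeout=seconds)
    if task in done:
        return True
    print(f"{label} still pending after {seconds}s")
    return False


async def demo():
    never = asyncio.Event()  # stand-in for a call that never resolves
    stuck = asyncio.ensure_future(never.wait())
    ok = await report_if_stuck(asyncio.sleep(0), 0.5, "quick call")
    hung = await report_if_stuck(stuck, 0.05, "finalize_shutdown stand-in")
    stuck.cancel()  # clean up the stand-in task before the loop closes
    return ok, hung


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Wrapping individual awaits inside the worker's run call this way would be a local debugging change only; whether it behaves cleanly with the real worker is an assumption.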

pwalessi-dell commented 5 months ago

The worker is hanging on line 295, self.poll_returned_shutdown_token.cancelled().await;, in worker/activities.rs. It seems like self.poll_returned_shutdown_token.cancel() should be called at some point, but it is not, so the activity never gets cancelled and the await never resolves. To test whether sending the cancel on a polling error fixes the issue, I added the following code to the poll method, and it does:

    pub(crate) async fn poll(&self) -> Result<ActivityTask, PollActivityError> {
        let mut poller_stream = self.activity_task_stream.lock().await;
            let r = poller_stream.next().await.unwrap_or_else(|| {
                self.poll_returned_shutdown_token.cancel();
                Err(PollActivityError::ShutDown)
            });
            println!("got r");
            if r.is_err() {
                self.poll_returned_shutdown_token.cancel();
                return Err(PollActivityError::TonicError(tonic::Status::internal("Internal error")));
            }
        r
    }

Obviously that needs to be cleaned up. However, I don't think this is the correct place to call cancel(), but I don't know when it should be called.
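To make the hang concrete, here is a toy Python asyncio model of the behaviour as described in this thread. It is not the sdk-core code: ActivityPollerModel and run_worker are hypothetical names, an asyncio.Event stands in for poll_returned_shutdown_token, and the assumption is that the token is only cancelled when the poll stream ends.

```python
import asyncio
import itertools


class ActivityPollerModel:
    """Toy stand-in for the activity poller; not the sdk-core implementation."""

    def __init__(self, poll_results):
        self._results = iter(poll_results)
        # Stands in for poll_returned_shutdown_token.
        self.poll_returned_shutdown = asyncio.Event()

    async def poll(self):
        await asyncio.sleep(0)
        try:
            return next(self._results)
        except StopIteration:
            # Stream ended: the only place this model "cancels" the token.
            self.poll_returned_shutdown.set()
            return "shutdown"

    async def shutdown(self):
        # Mirrors awaiting the token's cancellation during shutdown.
        await self.poll_returned_shutdown.wait()


async def run_worker(poll_results, max_errors=3):
    poller = ActivityPollerModel(poll_results)
    errors = 0
    while errors < max_errors:
        if await poller.poll() == "shutdown":
            break
        errors += 1  # every other result in this model is a poll error
    # Fatal error threshold reached or stream ended; now finalize.
    try:
        await asyncio.wait_for(poller.shutdown(), timeout=0.1)
        return "shutdown completed"
    except asyncio.TimeoutError:
        return "shutdown hung"


if __name__ == "__main__":
    # A stream that only ever yields errors never ends, so shutdown hangs.
    print(asyncio.run(run_worker(itertools.repeat("PermissionDenied"))))
    # A stream that ends lets shutdown complete.
    print(asyncio.run(run_worker(["PermissionDenied", "PermissionDenied"])))
```

In this model the worker gives up after a few errors, but nothing ever ends the stream, so the event is never set and shutdown waits forever.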

@cretz can you comment?

cretz commented 5 months ago

I have no comment to add. Will defer to those more familiar with this logic/repo.

Sushisource commented 5 months ago

No, it's not the right place to call the cancel. The real issue is that the poller_stream in the function you are referring to never terminates in this case (it only ever gets poll errors), but it still should.

As for exactly where to fix that, I'd have to look a bit more. Might have some time later today.
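For illustration only, the idea that the stream should terminate even when every poll fails can be sketched in Python asyncio. This is not the actual sdk-core fix, whose location is still open at this point in the thread; poll_stream, always_denied, and the shutdown_requested event are hypothetical names, and PermissionError stands in for the gRPC permission-denied status.

```python
import asyncio


async def poll_stream(do_poll, shutdown_requested):
    """Yield poll results, including errors, until shutdown is requested."""
    while not shutdown_requested.is_set():
        try:
            item = await do_poll()
        except PermissionError as err:
            item = err  # surface the error as a stream item
        yield item
    # Falling off the end terminates the stream, even if no poll ever succeeded.


async def demo():
    shutdown_requested = asyncio.Event()

    async def always_denied():
        await asyncio.sleep(0)
        raise PermissionError("DENIED!")

    errors = 0
    async for item in poll_stream(always_denied, shutdown_requested):
        if isinstance(item, PermissionError):
            errors += 1
        if errors >= 3:
            shutdown_requested.set()  # fatal: begin shutdown
    # Reaching this line means the stream ended on its own.
    return errors


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the loop condition is checked before every poll, the consumer sees the stream end after shutdown begins, which is the point at which a shutdown token could safely be cancelled.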

pwalessi-dell commented 5 months ago

Ok, thanks! If I get some time, I'll try to go in and take a look too.

Sushisource commented 5 months ago

Looking into this today