temporalio / sdk-core

Core Temporal SDK that can be used as a base for language specific Temporal SDKs
MIT License
262 stars 70 forks source link

UpdateAdmitted event processing #700

Closed dandavison closed 2 months ago

dandavison commented 5 months ago

This PR adds handling for the new WorkflowExecutionUpdateAdmitted event ("UAdm") that the server is going to start sending for reapplied updates (i.e. after reset / conflict resolution).

If the server does request an update by sending the new event type, then:

  1. It will not send a protocol message.
  2. The event will have the update request payload.
  3. If the update is accepted, then the server will write a WorkflowExecutionUpdateAccepted event ("UAcc") without the request payload.

Implementation notes

How this was tested

This samples-python script against this WIP server branch

dandavison commented 2 months ago

Handling the remaining activations in the test so that panic doesn't happen would be good

@Sushisource I've done that in https://github.com/temporalio/sdk-core/pull/700/commits/03e65364c44df66e89d62fb87174d5288125b5da, but not in a very satisfying way:

    // There is an activation containing a RemoveFromCache job left unhandled
    replay_worker.poll_workflow_activation().await.unwrap();

This may be unimportant details but while I'm learning core: is it obvious to you why there's an activation left consisting of [RemoveFromCache] when we complete the post-reset workflow execution, but there wasn't earlier in the test, when we completed the pre-reset workflow execution, or indeed in your pre-existing test update_workflow?

I thought perhaps the [RemoveFromJob] activation would be added every time we complete an activation with a CompleteWorkflowExecution command, in which case I could have dealt with it at that point, but that doesn't seem to be the case.

Here's the backtrace at the point responsible where we call `create_evict_activation`: ``` { fn: "temporal_sdk_core_protos::coresdk::workflow_activation::create_evict_activation", file: "/Users/dan/src/temporalio/sdk-core/sdk-core-protos/src/lib.rs", line: 467 }, { fn: "temporal_sdk_core::worker::workflow::managed_run::ManagedRun::update_to_acts", file: "./src/worker/workflow/managed_run.rs", line: 918 }, { fn: "temporal_sdk_core::worker::workflow::managed_run::ManagedRun::check_more_activations", file: "./src/worker/workflow/managed_run.rs", line: 313 }, { fn: "temporal_sdk_core::worker::workflow::workflow_stream::WFStream::process_post_activation", file: "./src/worker/workflow/workflow_stream.rs", line: 380 }, { fn: "temporal_sdk_core::worker::workflow::workflow_stream::WFStream::build_internal::{{closure}}", file: "./src/worker/workflow/workflow_stream.rs", line: 132 }, { fn: ">::call_mut", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/fns.rs", line: 28 }, { fn: " as futures_core::stream::Stream>::poll_next::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/str eam/stream/map.rs", line: 59 }, { fn: "core::option::Option::map", file: "/rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/option.rs", line: 1072 }, { fn: " as futures_core::stream::Stream>::poll_next", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/ma p.rs", line: 59 }, { fn: " as futures_core::stream::Stream>::poll_next", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/ma p.rs", line: 58 }, { fn: " as futures_core::stream::Stream>::poll_next", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/lib.rs", line: 10 2 }, { fn: " as futures_core::stream::Stream>::poll_next", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src /stream/stream/take_while.rs", line: 80 }, { fn: "futures_util::stream::stream::StreamExt::poll_next_unpin", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/mod.rs", line: 1638 }, { fn: " as core::future::future::Future>::poll", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/stream/stream/next.rs ", line: 32 }, { fn: "temporal_sdk_core::worker::workflow::Workflows::new::{{closure}}::{{closure}}", file: "./src/worker/workflow/mod.rs", line: 213 }, { fn: " as core::future::future::Future>::poll::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/task/local.rs", line: 978 }, { fn: "tokio::task::local::LocalSet::with::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/task/local.rs", line: 730 }, { fn: "std::thread::local::LocalKey::try_with", file: "/rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/thread/local.rs", line: 286 }, { fn: "std::thread::local::LocalKey::with", file: "/rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/thread/local.rs", line: 262 }, { fn: "tokio::task::local::LocalSet::with", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/task/local.rs", line: 728 }, { fn: " as core::future::future::Future>::poll", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/task/local.rs", line: 968 }, { fn: "tokio::task::local::LocalSet::run_until::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/task/local.rs", line: 635 }, { fn: " as core::future::future::Future>::poll", file: "/rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/core/src/future/future.rs", line: 123 }, { fn: "tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/sch eduler/current_thread/mod.rs", line: 659 }, { fn: "tokio::runtime::coop::with_budget", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/coop.rs", line: 107 }, { fn: "tokio::runtime::coop::budget", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/coop.rs", line: 73 }, { fn: "tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/curren t_thread/mod.rs", line: 659 }, { fn: "tokio::runtime::scheduler::current_thread::Context::enter", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/current_thread/mod.rs", line: 404 }, { fn: "tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/current_thread/mod. rs", line: 658 }, { fn: "tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/current_thread/mod.rs" , line: 737 }, { fn: "tokio::runtime::context::scoped::Scoped::set", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context/scoped.rs", line: 40 }, { fn: "tokio::runtime::context::set_scheduler::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context.rs", line: 176 }, { fn: "std::thread::local::LocalKey::try_with", file: "/rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/thread/local.rs", line: 286 }, { fn: "std::thread::local::LocalKey::with", file: "/rustc/129f3b9964af4d4a709d1383930ade12dfe7c081/library/std/src/thread/local.rs", line: 262 }, { fn: "tokio::runtime::context::set_scheduler", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context.rs", line: 176 }, { fn: "tokio::runtime::scheduler::current_thread::CoreGuard::enter", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/current_thread/mod.rs", line: 737 } , { fn: "tokio::runtime::scheduler::current_thread::CoreGuard::block_on", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/current_thread/mod.rs", line: 64 6 }, { fn: "tokio::runtime::scheduler::current_thread::CurrentThread::block_on::{{closure}}", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/current_thread/ mod.rs", line: 175 }, { fn: "tokio::runtime::context::runtime::enter_runtime", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/context/runtime.rs", line: 65 }, { fn: "tokio::runtime::scheduler::current_thread::CurrentThread::block_on", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/scheduler/current_thread/mod.rs", line : 167 }, { fn: "tokio::runtime::runtime::Runtime::block_on", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/runtime/runtime.rs", line: 349 }, { fn: "tokio::task::local::LocalSet::block_on", file: "/Users/dan/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.37.0/src/task/local.rs", line: 592 }, { fn: "temporal_sdk_core::worker::workflow::Workflows::new::{{closure}}", file: "./src/worker/workflow/mod.rs", line: 192 }, ```