uber-go / cadence-client

Framework for authoring workflows and activities running on top of the Cadence orchestration engine.
https://cadenceworkflow.io
MIT License

v2 SDK design ideas #1133

Open vytautas-karpavicius opened 2 years ago

vytautas-karpavicius commented 2 years ago

Background

With the gRPC migration happening in Cadence, the Go client needs to migrate as well. Currently, the public API surface exposes Thrift types directly to users, which makes this migration impossible without breaking changes to the API. Here are some v2 API ideas considered for the next major version. Please comment on them or propose your own, as this is a good time to consider such changes.

Goals

Proposal

Here is an attempt to design the v2 API. Several ideas are proposed; they are mostly orthogonal and can be considered separately. For now, the proposal only covers the Client; the Worker part remains untouched.

API Overview

Client
  - Domains() -> Domains
    - Register(ctx context.Context, name string, replication DomainReplicationConfig, opts ...DomainRegisterOptions) -> (Domain, error)
    - List(ctx context.Context, query string, page Page, opts ...DomainListOptions) -> ([]Domain, *Page, error)
    - Get(name string) -> Domain

    Domain
     - Describe(ctx context.Context, opts ...DomainDescribeOptions) -> (DomainInfo, error)
     - Update(ctx context.Context, opts ...DomainUpdateOptions) -> error
     - Failover(ctx context.Context, cluster string, opts ...DomainFailoverOptions) -> error
     - Deprecate(ctx context.Context, opts ...DomainDeprecateOptions) -> error
     - BadBinaries() -> BadBinaries
       - List(ctx context.Context, opts ...BadBinaryListOptions) -> ([]BadBinary, error)
       - Add(ctx context.Context, checksum string, reason string, opts ...BadBinaryAddOptions) -> error
       - Delete(ctx context.Context, checksum string, opts ...BadBinaryDeleteOptions) -> error

     - Workflows() -> Workflows
       - Start(ctx context.Context, workflowType string, args []interface{}, taskList string, timeout time.Duration, opts ...WorkflowStartOptions) -> (Workflow, error)
       - SignalWithStart(ctx context.Context, signalName string, signalArgs []interface{}, workflowType string, args []interface{}, taskList string, timeout time.Duration, opts ...WorkflowStartOptions) -> (Workflow, error)
       - Count(ctx context.Context, query Query, opts ...WorkflowCountOptions) -> (int64, error)
       - List(ctx context.Context, query Query, page Page, opts ...WorkflowListOptions) -> ([]WorkflowExecution, *Page, error)
       - Get(workflowID, runID string) -> Workflow
       - GetCurrent(workflowID string) -> Workflow

       Workflow
        - Signal(ctx context.Context, signalName string, args []interface{}, opts ...WorkflowSignalOptions) -> error
        - Query(ctx context.Context, queryType string, args []interface{}, opts ...WorkflowQueryOptions) -> (encoded.Value, error)
        - Describe(ctx context.Context, opts ...WorkflowDescribeOptions) -> (WorkflowInfo, error)
        - Cancel(ctx context.Context, opts ...WorkflowCancelOptions) -> error
        - Terminate(ctx context.Context, reason string, opts ...WorkflowTerminateOptions) -> error
        - Reset(ctx context.Context, reason string, point WorkflowResetPoint, opts ...WorkflowResetOptions) -> (Workflow, error)
        - GetResult(ctx context.Context, opts ...WorkflowGetResultOptions) -> (encoded.Value, error)
        - Observe(ctx context.Context, opts ...WorkflowObserveOptions) -> (chan Event, error)
        - Activities() -> WorkflowActivities <embeds Activities>
          - GetByID(activityID string) -> Activity

          Activity
           - Complete(ctx context.Context, result interface{}, err error, opts ...ActivityCompleteOptions) -> error
           - RecordHeartbeat(ctx context.Context, details interface{}, opts ...ActivityRecordHeartbeatOptions) -> error

     - TaskLists() -> TaskLists
       - Get(name string, taskListType TaskListType) -> TaskList

       TaskList
        - Describe(ctx context.Context) -> (TaskListInfo, error)

  - Activities() -> Activities
    - Get(taskToken []byte) -> Activity

  - SearchAttributes() -> SearchAttributes
    - List(ctx context.Context) -> ([]SearchAttribute, error)

Ideas to consider

Multi-level entities

We will provide high-level entities for Domain, Workflow, TaskList, etc., with functions to operate on them. This moves the API towards a CRUD style, which is familiar to many developers. From the users' point of view, such a structure gives several benefits:

Splitting domain update into several functions

While we have a single RPC call for updating the domain, it contains many fields for different use cases. Updating fields that belong to different use cases can even result in errors; for example, no other domain updates are allowed during a failover operation. Instead, we should have separate methods for such use cases. They would accept only the required arguments, making the API and the code using it cleaner. They would also have different semantics behind them, with their own errors. So far I have separated out deprecation, failover, and operating on bad binaries; it does not seem likely that those would be used together. We could also consider separating the enabling/disabling of archival and visibility.

Pagination model

A Page structure can be used to encapsulate pagination-related data.

// Page contains pagination data.
// The zero value Page{} means the first page with the default size.
type Page struct {
    Size  int32  // 0 means the default page size
    Token []byte // nil means the first page
}

func FirstPage() Page { return Page{} }

List functions accept a Page and also return a Page for the next page query. The returned Page keeps the initially set Size but updates Token for retrieval of the next page; it is nil when there are no more pages. Example:

page := Page{}
for {
    workflows, nextPage, err := domain.Workflows().List(ctx, query, page)
    if err != nil {
        return err
    }
    // do something with workflows
    if nextPage == nil {
        break // no more pages
    }
    page = *nextPage
}

Workflow observation

Currently we expose GetWorkflowHistory, which returns all raw Thrift history events. We should not expose history to users, as it is really an implementation detail and is subject to change. Additionally, exposing it with RPC-agnostic types would bloat the API, since lots of mappers would be needed. However, history may be useful for observing workflow progress. Therefore we could employ something in between: expose an Event structure with only a few critical fields and leave all the details as a JSON-encoded blob. This could be used for visibility or logging. Accessing its internals would require users to declare the necessary types themselves, at the risk of those types changing over time.

type Event struct {
    ID        int64
    Timestamp time.Time
    Data      string // JSON blob
    // Maybe some other fields that could be added later in a non-breaking way if needed
}

Another proposal is to return events as a channel, chan Event. This mirrors the streaming nature of events and hides either long polls or gRPC streams underneath. The channel is closed when the workflow finishes.

Workflow replay

Not exposing event types has an implication for history replay: we will no longer have a function variant that takes raw history events. Replaying history from JSON files (either Thrift-based or Proto-based) will still be possible.

Use encoded.Value for returned payload

Currently we have two different ways to obtain the payload result:

Scan workflows as a List option

The signatures for listing and scanning workflows are identical; the difference is only in usage pattern and performance. Therefore we could simplify the API by exposing scanning as an option for listing workflows:

workflows.List(ctx, query, Page{}, WithScan())

Archived workflows as a List option

Currently we have separate functions for listing workflows and listing archived workflows. As the input and output of these calls are the same, we could simplify the API by exposing archived workflows as a list option.

workflows.List(ctx, query, Page{}, WithArchived())

Open / Closed workflows as a List option

Currently we have separate functions for listing open and closed workflows. Listing closed workflows additionally includes a filter to specify the workflow close status. This close status could be expressed as a broader workflow status that includes OPEN as well, leaving only one List variant.
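A sketch of a status type that includes open workflows alongside the close statuses, and of how a single List variant could filter on it. The constant names other than WorkflowStatusOpen (which the Query Builder example uses) are hypothetical, chosen to mirror Cadence's existing close statuses; filtering happens in memory here purely for illustration.

```go
package main

import "fmt"

// WorkflowStatus covers open workflows as well as every close status.
type WorkflowStatus int

const (
	WorkflowStatusOpen WorkflowStatus = iota
	WorkflowStatusCompleted
	WorkflowStatusFailed
	WorkflowStatusCanceled
	WorkflowStatusTerminated
	WorkflowStatusContinuedAsNew
	WorkflowStatusTimedOut
)

// IsOpen reports whether the status belongs to a running workflow.
func (s WorkflowStatus) IsOpen() bool { return s == WorkflowStatusOpen }

// execution is a minimal stand-in for a listed workflow execution.
type execution struct {
	ID     string
	Status WorkflowStatus
}

// filterByStatus is what a single List variant could do with a status filter:
// no statuses means no filtering, so open and closed workflows both match.
func filterByStatus(all []execution, in ...WorkflowStatus) []execution {
	if len(in) == 0 {
		return all
	}
	want := make(map[WorkflowStatus]bool, len(in))
	for _, s := range in {
		want[s] = true
	}
	var out []execution
	for _, e := range all {
		if want[e.Status] {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	all := []execution{
		{ID: "wf-1", Status: WorkflowStatusOpen},
		{ID: "wf-2", Status: WorkflowStatusCompleted},
		{ID: "wf-3", Status: WorkflowStatusFailed},
	}
	for _, e := range filterByStatus(all, WorkflowStatusOpen, WorkflowStatusFailed) {
		fmt.Println(e.ID, e.Status.IsOpen())
	}
}
```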

Query Builder

We could provide a QueryBuilder utility for easier query construction. It could offer an interface independent of the backing storage, so that different builder implementations could be provided if a server were set up to use different storage. The server may communicate its capabilities to the client, which would instantiate the appropriate QueryBuilder implementation. It also increases discoverability of what can be queried.

type QueryBuilder interface {
    WorkflowStart(from, to *time.Time) Query
    WorkflowClose(from, to *time.Time) Query
    WorkflowStatus(in ...WorkflowStatus) Query
    WorkflowType(wfType string) Query
    // ...
    And(a, b Query) Query
    Or(a, b Query) Query
    Not(a Query) Query

    All(clauses ...Query) Query
    Any(clauses ...Query) Query
}

Example:

qb := client.QueryBuilder()
from := time.Now().Add(-time.Hour)
query := qb.And(
  qb.WorkflowStart(&from, nil),
  qb.Not(qb.WorkflowStatus(WorkflowStatusOpen)))

workflows, nextPage, err := client.Domains().Get("my-domain").Workflows().List(ctx, query, Page{Size: 100})

Unify workflow listing API for ES and non-ES queries

Currently we have two variants of visibility queries:

Workflow reset points

Currently the workflow reset function takes a raw request struct in which the user needs to specify the exact event ID for the workflow to be reset to. Our CLI has a number of options that make reset easier by finding the event ID, for example LastDecisionCompleted, DecisionCompleteTime, BadBinary, etc. This logic could be moved to the Client instead, or even to the server itself with additional IDL changes.

- ResetToLastDecisionCompleted() WorkflowResetPoint
- ResetToLastDecisionScheduled() WorkflowResetPoint
- ResetToLastContinueAsNew() WorkflowResetPoint
- ResetToFirstDecisionCompleted() WorkflowResetPoint
- ResetToFirstDecisionScheduled() WorkflowResetPoint
- ResetToBadBinary(checksum string) WorkflowResetPoint
- ResetToEarliestDecisionCompletedAfter(timestamp time.Time) WorkflowResetPoint
- ResetToEventId(eventId int64) WorkflowResetPoint

Usage example:

newWorkflow, err := workflow.Reset(ctx, "reason", ResetToLastDecisionCompleted())

This would expose more user-friendly options to the general client, not just the CLI. Another option is not to expose this in the client at all; see the proposal below.

Exposure of operator functions

Related to the idea above is the more general question of exposing operator-related functions. We have these options:

Unified reason

Currently, from a client perspective, a reason is not required for either termination or reset. The CLI requires a reason for reset, but not for termination. We should unify this: either both required as positional parameters, or both as options.

Helpers for domain replication

To simplify configuration of domain replication, these helpers could be useful:

func GlobalDomain(activeCluster string, clusters []string) DomainReplicationConfig {
    return &globalDomain{activeCluster, clusters}
}

func LocalDomain(cluster string) DomainReplicationConfig {
    return &localDomain{cluster}
}

Usage example:

domain, err := client.Domains().Register(ctx, "my-domain-name", GlobalDomain("cluster-A", []string{"cluster-A", "cluster-B"}))
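The helpers above return unexported types behind an interface, which would let Register validate the configuration before sending it. A minimal sketch, assuming DomainReplicationConfig is an interface with a hypothetical validate method, and that a global domain's active cluster must be one of its replication clusters.

```go
package main

import (
	"errors"
	"fmt"
)

// DomainReplicationConfig is assumed to be an interface; the unexported
// validate method is hypothetical and lets Register reject bad input early.
type DomainReplicationConfig interface {
	validate() error
}

type globalDomain struct {
	activeCluster string
	clusters      []string
}

type localDomain struct {
	cluster string
}

func GlobalDomain(activeCluster string, clusters []string) DomainReplicationConfig {
	return &globalDomain{activeCluster, clusters}
}

func LocalDomain(cluster string) DomainReplicationConfig {
	return &localDomain{cluster}
}

// validate requires the active cluster to be one of the replication clusters.
func (g *globalDomain) validate() error {
	for _, c := range g.clusters {
		if c == g.activeCluster {
			return nil
		}
	}
	return fmt.Errorf("active cluster %q is not among replication clusters %v", g.activeCluster, g.clusters)
}

// validate only checks that a cluster name was given.
func (l *localDomain) validate() error {
	if l.cluster == "" {
		return errors.New("local domain requires a cluster name")
	}
	return nil
}

func main() {
	fmt.Println(GlobalDomain("cluster-A", []string{"cluster-A", "cluster-B"}).validate())
	fmt.Println(GlobalDomain("cluster-C", []string{"cluster-A", "cluster-B"}).validate())
	fmt.Println(LocalDomain("cluster-A").validate())
}
```

Because the interface method is unexported, only the provided helpers can produce a DomainReplicationConfig, which keeps the set of valid shapes under the client's control.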