resonatehq / resonate

a dead simple programming model for the cloud
https://www.resonatehq.io
Apache License 2.0
405 stars 32 forks source link

Add router and sender subsystems #405

Closed dfarr closed 2 months ago

dfarr commented 3 months ago

Add router and sender subsystems.

Router

type RouterSubmission struct {
    Promise *promise.Promise
}

type RouterCompletion struct {
    Matched bool
    Command *CreateTaskCommand
}

A router submission/completion matches and returns a create task command when a corresponding task must be created in the same transaction as a promise.

The router subsystem can be configured with a list of sources that will be applied against all create promise requests in order, the first source to match "wins" and subsequent sources are not checked. The following is an example configuration:

aio:
  subsystems:
    router:
      config:
        sources:
          - name: "source-1"
            type: "tag"
            data:
              key: "tag-1"
          - name: "source-2"
            type: "tag"
            data:
              key: "tag-2"
          - name: "source-3"
            type: "tag"
            data:
              key: "tag-3"

The PR implements a tag source (as indicated by type: "tag"), this is currently the only supported source. Tag sources match any promise with the tag named in the config and extract a receiver by:

  1. check if the tag is valid json and if so trying to unmarshal it to a receiver
  2. checking if the tag is a valid url
  3. assume the value is a logical receiver

If the tag value is assumed to be a logical receiver the string value of the tag is stored as the receiver and mapped to a physical receiver in the sender subsystem.

Sender

type SenderSubmission struct {
    Task *task.Task
}

type SenderCompletion struct {
    Success bool
}

A sender submission/completion takes a task (which includes a receiver and the id/counter needed for the body of the message) and indicates if the "enqueue" was successful or unsuccessful.

The sender subsystem can be configured with plugins that implement a plugin interface.

type Plugin interface {
    String() string
    Type() string
    Start() error
    Stop() error
    Enqueue(*Message) bool
}

type Message struct {
    Data []byte
    Body []byte
    Done func(bool, error)
}

This PR implements the http plugin, which is currently the only supported plugin. Additionally, the sender subsystem can be configured with targets (aka physical receivers), if this subsystem get a receiver that is a string it looks up the corresponding physical receiver from this list. The following is an example configuration:

aio:
  subsystems:
    sender:
      config:
        targets:
          - name: "target-1"
            type: "http"
            data:
              url: http://localhost:3001
          - name: "target-2"
            type: "http"
            data:
              url: http://localhost:3002
          - name: "target-3"
            type: "http"
            data:
              url: http://localhost:3003
codecov[bot] commented 3 months ago

Codecov Report

Attention: Patch coverage is 54.70790% with 659 lines in your changes missing coverage. Please review.

Project coverage is 55.96%. Comparing base (4226d32) to head (f228c2c). Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
internal/app/subsystems/aio/sender/sender.go 0.00% 130 Missing :warning:
...nternal/app/subsystems/api/grpc/api/callback.pb.go 12.29% 105 Missing and 2 partials :warning:
internal/app/subsystems/aio/router/router.go 37.06% 88 Missing and 2 partials :warning:
cmd/config/config.go 0.00% 57 Missing :warning:
internal/app/coroutines/claimTask.go 0.00% 42 Missing :warning:
...nternal/app/subsystems/api/grpc/api/schedule.pb.go 48.71% 20 Missing :warning:
internal/app/coroutines/enqueueTasks.go 6.25% 14 Missing and 1 partial :warning:
...rnal/app/subsystems/aio/store/postgres/postgres.go 72.72% 12 Missing and 3 partials :warning:
internal/app/subsystems/aio/store/sqlite/sqlite.go 71.15% 12 Missing and 3 partials :warning:
internal/app/plugins/http/http.go 76.66% 10 Missing and 4 partials :warning:
... and 32 more
Additional details and impacted files ```diff @@ Coverage Diff @@ ## main #405 +/- ## ========================================== - Coverage 56.92% 55.96% -0.96% ========================================== Files 114 119 +5 Lines 10331 10919 +588 ========================================== + Hits 5881 6111 +230 - Misses 4030 4383 +353 - Partials 420 425 +5 ```

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.