riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0
3.22k stars 86 forks source link

ability to add custom maintenance services to Client #373

Closed bgentry closed 1 month ago

bgentry commented 3 months ago

This exposes an interface for a rivertype.MaintenanceService, along with a way to add custom maintenance services to the Client using a new Config.MaintenanceServices field. See #372 for a bit more background on the underlying change—this one is merely illustrating how that PR makes it straightforward to expose a minimal interface for adding custom maintenance services.

brandur commented 2 months ago

The only thing that gives me a bit of pause on this one is that when it comes to exposing APIs that we don't endorse regular use and which we might want to have the flexibility to change in the future.

What do you think about the idea of adding something like an Unstable() section the client that makes it clear that APIs within are considered changeable? I think we're going to need something like this in the future anyway for adding other hooks for extended plugin functionality.

diff --git a/client.go b/client.go
index 169eb6c..2438b23 100644
--- a/client.go
+++ b/client.go
@@ -31,6 +31,7 @@ import (
    "github.com/riverqueue/river/internal/workunit"
    "github.com/riverqueue/river/riverdriver"
    "github.com/riverqueue/river/rivertype"
+   "github.com/riverqueue/river/riverunstable"
 )

 const (
@@ -306,6 +307,7 @@ type Client[TTx any] struct {
    stopped              chan struct{}
    testSignals          clientTestSignals
    uniqueInserter       *dbunique.UniqueInserter
+   unstable             *ClientUnstable[TTx]

    // workCancel cancels the context used for all work goroutines. Normal Stop
    // does not cancel that context.
@@ -463,6 +465,7 @@ func NewClient[TTx any](driver riverdriver.Driver[TTx], config *Config) (*Client
    baseservice.Init(archetype, &client.baseService)
    client.baseService.Name = "Client" // Have to correct the name because base service isn't embedded like it usually is
    client.insertNotifyLimiter = notifylimiter.NewLimiter(archetype, config.FetchCooldown)
+   client.unstable = &ClientUnstable[TTx]{client: client}

    // There are a number of internal components that are only needed/desired if
    // we're actually going to be working jobs (as opposed to just enqueueing
@@ -1802,6 +1805,24 @@ func (c *Client[TTx]) QueueResume(ctx context.Context, name string, opts *QueueP
    return tx.Commit(ctx)
 }

+// Unstable contains unstable River APIs that are meant for internal use only.
+// Anything returned by this function is not consider public-facing API and is
+// exempt from River's normal API stability guarantees. Function signatures may
+// change, types may be renamed, interfaces may change, and types may migrate
+// between packages. Breakages will be frequent, will not be noted in the
+// changelog, and will not trigger a major version bump.
+func (c *Client[TTx]) Unstable() *ClientUnstable[TTx] {
+   return c.unstable
+}
+
+type ClientUnstable[TTx any] struct {
+   client *Client[TTx]
+}
+
+func (u *ClientUnstable[TTx]) AddMaintenanceService(service riverunstable.MaintenanceService) {
+   u.client.queueMaintainer.AddService(service)
+}
+
 // Generates a default client ID using the current hostname and time.
 func defaultClientID(startedAt time.Time) string {
    host, _ := os.Hostname()
diff --git a/riverunstable/river_unstable.go b/riverunstable/river_unstable.go
new file mode 100644
index 0000000..3be92b5
--- /dev/null
+++ b/riverunstable/river_unstable.go
@@ -0,0 +1,4 @@
+package riverunstable
+
+type MaintenanceService interface {
+}
bgentry commented 1 month ago

We decided to go another direction on this for now, closing this PR to reflect that.