apple / swift-distributed-actors

Peer-to-peer cluster implementation for Swift Distributed Actors
https://apple.github.io/swift-distributed-actors/
Apache License 2.0
593 stars 56 forks source link

Support subscribing to dead letters #741

Open budde opened 4 years ago

budde commented 4 years ago

It would be nice to be able to subscribe some ActorRef to deadLetters for an actor system in order to be notified about any messages sent to a terminated actor. This would be particularly useful for testing logic triggered by the termination of some watched actor.

For instance, we can imagine having an actor that tracks some state such as a string. This actor could allow other actors to subscribe to updates to its state. If a subscriber is terminated, we want our actor to remove it from its listing of subscribers in order to avoid sending dead messages and having an infinitely growing list of subscribers if they don't explicitly unsubscribe before termination.

A trivial example of such an actor implemented with Actorable:

class StatefulActor: Actorable {
  let context: Myself.Context
  var state: String
  var subscribers: Set<Actor<StatefulSubscriber>>

  init(context: Myself.Context, initialState: String) {
    self.context = context
    self.state = initialState
    self.subscribers = []
  }

  /// Add a subscriber
  // @actor
  func subscribe(_ subscriber: Actor<StatefulSubscriber>) {
    context.watch(subscriber)
    subscribers.insert(subscriber)
  }

  /// Remove a subscriber
  // @actor
  func unsubscribe(_ subscriber: Actor<StatefulSubscriber>) {
    context.watch(subscriber)
    subscribers.remove(subscriber)
  }

  /// Update our internal state and notify subscribers if it changed
  // @actor
  func updateState(_ newState: String) {
    if newState != state {
      state = newState
      subscribers.forEach { $0.stateChanged(newState: state forActor: context.myself) }
    }
  }

  /// Remove any subscriber that terminated
  // @actor
  func receiveTerminated(context: Myself.Context, terminated: Signals.Terminated) -> DeathPactDirective {
    if let subscriber = subscribers.first(where: { $0.address == terminated.address }) {
      subscribers.remove(subscriber)
    } 
  }
}

And the subscriber:

// The subscriber would make most sense as a protocol but using a class for simplicity
class StatefulSubscriber: Actorable {
  let context: Context.Myself

  init(context: Context.Myself) {
    self.context = context
  }

  // @actor
  func subscribeTo(_ actor: Actor<StatefulActor>) {
    actor.subscribe(context.myself)
  }

  // @actor
  func stateChanged(newState: String, forActor: Actor<StatefulActor>) {
    // do something with the new state
  }
}

In this example, it'd be very nice if we could write a test that ensures that the watch()/receivedTerminated() logic is implemented correctly by subscribing a test probe to the dead letters and ensuring no messages are sent to it when a state update occurs after the subscriber is terminated.

func testSubscription() throws {
  let actor = try system.spawn("stateful-actor") { StatefulActor(context: $0, initialState: "initial") }
  let subscriber = testKit.spawnActorableTestProbe(of: StatefulSubscriber.self)
  system.receptionist.register(subscriber.actor, as: "*")  // so we can check for termination later

  actor.subscribe(subscriber.actor)

  // ...

  // Set up a test probe for observing dead letters
  let deadLetterProbe: ActorTestProbe<DeadLetterSubscriberMessage> = testKit.spawnTestProbe()
  system.deadLetters.subscribe(deadLetterProbe.ref)

  subscriber.stop()
  let receptionKey: Reception.Key<Actor<StatefulSubscriber>> = "*"
  try testKit.ensureRegistered(key: receptionKey, expectedCount: 0) // await subscriber termination

  actor.updateState("final-state")
  try deadLetterProbe.expectNoMessage(for: .seconds(1)) // make sure that subscriber was removed
}
ktoso commented 4 years ago

Sounds good, let me add this capability to the dead letters office 👍