Horusiath / Akkling

Experimental F# typed API for Akka.NET
Apache License 2.0
226 stars 44 forks source link

Use case: running queries on persistent views #14

Closed chillitom closed 8 years ago

chillitom commented 8 years ago

Something I'm struggling with using the existing FSharp API is how to query the state of a persistent view actor.

What I'd like to do is..

Perhaps I'm using the old API incorrectly but the fact that Receive only takes 'Event seems very limiting.

Maybe Akkling can already cope with this but thought I'd mention this use case here.

chillitom commented 8 years ago

I've ended up writing the following to support this use case with the existing API.

[<Interface>]
type QView<'Event, 'State, 'Query> =
    inherit IActorRefFactory
    inherit ICanWatch
    inherit Snapshotter<'State>

    /// <summary>
    /// Gets <see cref="IActorRef" /> for the current actor.
    /// </summary>
    abstract Self : IActorRef

    /// <summary>
    /// Gets the current actor context.
    /// </summary>
    abstract Context : IActorContext

    /// <summary>
    /// Returns a sender of current message or <see cref="ActorRefs.NoSender" />, if none could be determined.
    /// </summary>
    abstract Sender : unit -> IActorRef

    /// <summary>
    /// Explicit signalization of unhandled message.
    /// </summary>
    abstract Unhandled : obj -> unit

    /// <summary>
    /// Lazy logging adapter. It won't be initialized until logging function will be called. 
    /// </summary>
    abstract Log : Lazy<Akka.Event.ILoggingAdapter>

    /// <summary>
    /// Defers a function execution to the moment, when actor is suposed to end it's lifecycle.
    /// Provided function is guaranteed to be invoked no matter of actor stop reason.
    /// </summary>
    abstract Defer : (unit -> unit) -> unit

    /// <summary>
    /// Returns currently attached journal actor reference.
    /// </summary>
    abstract Journal: unit -> IActorRef

    /// <summary>
    /// Returns currently attached snapshot store actor reference.
    /// </summary>
    abstract SnapshotStore: unit -> IActorRef

    /// <summary>
    /// Returns last sequence number attached to latest persisted event.
    /// </summary>
    abstract LastSequenceNr: unit -> int64

    /// <summary>
    /// Persistent actor's identifier that doesn't change across different actor incarnations.
    /// </summary>
    abstract PersistenceId: unit -> PersistenceId

    /// <summary>
    /// View's identifier that doesn't change across different view incarnations.
    /// </summary>
    abstract ViewId: unit -> PersistenceId

type QPerspective<'Event, 'State, 'Query> = {
    state: 'State
    apply: QView<'Event, 'State, 'Query> -> 'State -> 'Event -> 'State
    query: QView<'Event, 'State, 'Query> -> 'State -> 'Query -> unit
}

type FunPersistentQView<'Event, 'State, 'Query>(perspective: QPerspective<'Event, 'State, 'Query>, name: PersistenceId, viewId: PersistenceId) as this =
    inherit PersistentView()

    let mutable deferables = []
    let mutable state: 'State = perspective.state
    let mailbox = 
        let self' = this.Self
        let context = PersistentView.Context :> IActorContext
        let updateState (updater: 'Event -> 'State) e : unit = 
            state <- updater e
            ()
        { new QView<'Event, 'State, 'Query> with
            member __.Self = self'
            member __.Context = context
            member __.Sender() = this.Sender()
            member __.Unhandled msg = this.Unhandled msg
            member __.ActorOf(props, name) = context.ActorOf(props, name)
            member __.ActorSelection(path : string) = context.ActorSelection(path)
            member __.ActorSelection(path : ActorPath) = context.ActorSelection(path)
            member __.Watch(aref:IActorRef) = context.Watch aref
            member __.Unwatch(aref:IActorRef) = context.Unwatch aref
            member __.Log = lazy (Akka.Event.Logging.GetLogger(context)) 
            member __.Defer fn = deferables <- fn::deferables
            member __.Journal() = this.Journal
            member __.SnapshotStore() = this.SnapshotStore
            member __.PersistenceId() = this.PersistenceId
            member __.ViewId() = this.ViewId
            member __.LastSequenceNr() = this.LastSequenceNr
            member __.LoadSnapshot pid criteria seqNr = this.LoadSnapshot(pid, criteria, seqNr)
            member __.SaveSnapshot state = this.SaveSnapshot(state)
            member __.DeleteSnapshot seqNr timestamp = this.DeleteSnapshot(seqNr, timestamp)
            member __.DeleteSnapshots criteria = this.DeleteSnapshots(criteria) }

    member __.Sender() : IActorRef = base.Sender
    member __.Unhandled msg = base.Unhandled msg
    override x.Receive (msg: obj): bool = 
        match msg with
        | :? 'Event as e -> 
            state <- perspective.apply mailbox state e
            true
        | :? 'Query as q ->
            perspective.query mailbox state q
            true
        | _ -> 
            let serializer = UntypedActor.Context.System.Serialization.FindSerializerForType typeof<obj> :?> Akka.Serialization.NewtonSoftJsonSerializer
            match Serialization.tryDeserializeJObject serializer.Serializer msg with
            | Some(e) -> 
                state <- perspective.apply mailbox state e
                true
            | None -> false
    override x.PostStop () =
        base.PostStop ()
        List.iter (fun fn -> fn()) deferables
    default x.PersistenceId = name
    default x.ViewId = viewId

module Linq = 
    open System.Linq.Expressions
    open Akka.Actor
    open Akka.FSharp.Linq

    type PersistentExpression = 

        static member ToExpression(f : System.Linq.Expressions.Expression<System.Func<FunPersistentQView<'Event, 'State, 'Query>>>) = 
            match f with
            | Lambda(_, Invoke(Call(null, Method "ToFSharpFunc", Ar [| Lambda(_, p) |]))) -> 
                Expression.Lambda(p, [||]) :?> System.Linq.Expressions.Expression<System.Func<FunPersistentQView<'Event, 'State, 'Query>>>
            | _ -> failwith "Doesn't match"

let spawnQView (actorFactory : IActorRefFactory) (viewName: PersistenceId) (name : PersistenceId)  (perspective: QPerspective<'Event, 'State, 'Query>) (options : SpawnOption list) : IActorRef =
    let e = Linq.PersistentExpression.ToExpression(fun () -> new FunPersistentQView<'Event, 'State, 'Query>(perspective, name, viewName))
    let props = applySpawnOptions (Props.Create e) options
    actorFactory.ActorOf(props, viewName)
Horusiath commented 8 years ago

@chillitom if you start from that point, you can end up adding new generic parameter per each use case: Events, Queries, Snapshots or some control messages. I think it's better to do quite opposite - having one receiver function with one accepted message type, and let user decompose it - which is pretty easy in FP - depending on his/her needs.

For now, I have proposed the new shape of persistence API - you can see it here. I'll be happy to hear some feedback.

chillitom commented 8 years ago

Yep, I totally agree it's not a good direction to head in. I'll have a play with the new API and give you some feedback.

chillitom commented 8 years ago

So I'm struggling still with the new API, the issue I have is this, it feels like view should take two types, one to represent the events that will be loaded from the persistent actor the view is dependent on and a second type for messages that the view can handle (queries, instructions etc)

Perhaps something like the following...

type ViewMessage<'Event,'Query>
    = Event of 'Event
    | Query of 'Query

type View<'Event,Query> =
    inherit Actor<ViewMessage<'Event,'Query>>

meaning messages could be handled something like this..

type CountHistoryQuery 
    = LastValue
    | Last10

let counterHistoryView =
    spawnView system "counter-1" "count-view-1" <| fun mailbox ->
        let rec loop (state) = 
             actor { 
                let! msg = mailbox.Receive()
                match msg with
                | Event c ->
                        return! loop (state.[0] + c.Delta) :: state 
                | Query q ->
                    match q with
                    | LastValue -> mailbox.Sender() <! state.[0] 
                    | Last10 -> mailbox.Sender() <! List.take 10 state
                return! loop state
            }
        loop [0]

If the view can only handle the same events as the original actor it feels a bit limited... perhaps I'm missing something here though.

Horusiath commented 8 years ago

I don't think that resolving this issue through multiplying generic method parameters will be any win. Let's take an example - once you had generic 'Event and 'Query types, how do you want to handle other common akka.net events like LifecycleEvent (PostStop or PreStart) or PersistentEvent like ReplaySucceed, that don't have common root with ViewMessage<'Event, 'Query> except object itself. We could ofc repalce them with dedicated method handlers, but IMHO this way we will end up with API pollution sooner or later. It's easy to complicate things.

Since F# pattern matching is not nice to eye for non-algebraic data types as (for example) Scala, and the .NET type system doesn't support type unions like i.e. Ceylon does, we should find a way to utilize strong side of F# syntax in this matter. Lets take for example active patterns.

Given previously defined CounterMessage and CountHistoryQuery we could create following active patterns:

let (|Query|_|) (o : obj) = 
    match o with
    | :? CountHistoryQuery as q -> Some q
    | _ -> None

let (|Event|_|) (o : obj) = 
    match o with
    | :? CounterMessage as msg -> 
        match msg with
        | Event e -> Some e
        | _ -> None
    | _ -> None

They need to be defined only once. Now we can use them in our actor expression:

let counterHistoryView = 
    spawnView system "counter-1" "count-view-1" <| fun mailbox -> 
        let rec view state = 
            actor { 
                let! (msg: obj) = mailbox.Receive()
                match msg with
                | Event e -> return! view (e.Delta :: state)
                | Query q -> 
                    match q with
                    | LastValue -> mailbox.Sender() <! [List.head state]
                    | Last10 -> mailbox.Sender() <! (List.take 10 state)
                    return! view state
                | _ -> return Unhandled
            }
        view []

// use switch to cast IActorRef<> generic param from object to CountHistoryQuery
let typedHistoryView = counterHistoryView.Switch ()
typedHistoryView <! Last10
chillitom commented 8 years ago

That sounds good. I think I've been getting a bit confused as I assumed that the 'Message type on View had to match the event types being persisted from the write side actor.