Horusiath / Akkling

Experimental F# typed API for Akka.NET
Apache License 2.0
225 stars 45 forks source link

Parent (a supervisor of its child) not received Terminated message of child #138

Closed ingted closed 3 years ago

ingted commented 3 years ago
#r "nuget: System.Runtime.CompilerServices.Unsafe"
#r "nuget: System.Collections.Immutable"
#r "nuget: Akka"
#r "nuget: Akka.FSharp"
#r "nuget: Akkling"

//open Akka.FSharp
open Akka.Actor
open Akkling
open Akkling.Actors

let system = Akka.FSharp.System.create "NotificationWorker" (Akka.FSharp.Configuration.defaultConfig())

let prop : Props<obj> =
    props(fun m ->
        let single = lazy 123
        let single2 = lazy 123
        let mutable c = Unchecked.defaultof<IActorRef>
        let create () = 
            Akka.FSharp.Spawn.spawn m ("CHILD" + System.Guid.NewGuid().ToString()) <|             
                Akka.FSharp.Spawn.actorOf2 (
                    fun mb m -> 
                        if not single2.IsValueCreated then
                            mb.Defer (fun () -> 
                                if not single.IsValueCreated then
                                    printfn "child is down, repeat"
                                    single.Value |> ignore
                            )
                            single2.Value |> ignore
                        printfn "child: %A" m
                        if m.GetType() = typeof<int> then
                            if unbox m = 0 then //failwith "childError"
                                c.Tell PoisonPill.Instance 
                )
        c <- create ()

        let rec loop () = actor {
            let! msg = m.Receive ()
            match msg with
            | Terminated(ref, existenceConfirmed, addressTerminated) ->  
                printfn "parent ===> %A %A" (existenceConfirmed, addressTerminated) ref.Path
                c <- create()
            | x -> 
                c.Tell x
                printfn "%A" x
            return! loop ()
        }
        loop ()) 

let strategy =
    Strategy.AllForOne (fun e ->
        match e with
        //| :? DivideByZeroException -> Directive.Resume
        //| :? ArgumentException -> Directive.Stop
        | _ -> Directive.Stop)

let aref = 
    Akkling.Spawn.spawn system ("test" + System.Guid.NewGuid().ToString()) {
        prop with SupervisionStrategy = Some strategy
    }

aref <! box 0

And result:

child: PreStart PreStart

child: val it : unit = ()

0 0 child is down, repeat

"parent ===> %A %A" was not printed...

However, if we use a monitor and try to terminate the parent, the monitor would receive the Termiated message

let watcher:IActorRef<obj> = 
    spawnAnonymous system (props <| fun context ->    
        monitor context aref |> ignore
        let rec loop() = actor {
            let! msg = context.Receive()
            match msg with 
            | Terminated(ref, existenceConfirmed, addressTerminated) -> 
                printfn "monitor ===> %A %A" (existenceConfirmed, addressTerminated) ref.Path
                return! loop()
            | _ -> return Unhandled        
        }
        loop ())

(retype aref) <! PoisonPill.Instance

Results:

PostStop monitor ===> monitor ===> [INFO][12/29/2020 5:29:12 AM][Thread 0043][akka://NotificationWorker/user/test1df80b93-e957-4a06-b18a-6fcdfede6eae/CHILD67c2e185-3c52-4a75-ada8-74c81212d337] Message [LifecycleEvent] from akka://NotificationWorker/user/test1df80b93-e957-4a06-b18a-6fcdfede6eae to akka://NotificationWorker/user/test1df80b93-e957-4a06-b18a-6fcdfede6eae/CHILD67c2e185-3c52-4a75-ada8-74c81212d337 was not delivered. [1] dead letters encountered. If this is not an expected behavior then akka://NotificationWorker/user/test1df80b93-e957-4a06-b18a-6fcdfede6eae/CHILD67c2e185-3c52-4a75-ada8-74c81212d337 may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. (true, false) (true, false) akka://NotificationWorker/user/test1df80b93-e957-4a06-b18a-6fcdfede6eae akka://NotificationWorker/user/test1df80b93-e957-4a06-b18a-6fcdfede6eae

ingted commented 3 years ago
#r "nuget: System.Runtime.CompilerServices.Unsafe"
#r "nuget: System.Collections.Immutable"
#r "nuget: Akka"
#r "nuget: Akka.FSharp"
#r "nuget: Akkling"

//open Akka.FSharp
open Akka.Actor
open Akkling
open Akkling.Actors

let system = Akka.FSharp.System.create "NotificationWorker" (Akka.FSharp.Configuration.defaultConfig())

let prop : Props<obj> =
    props(fun m ->
        let single = lazy 123
        let single2 = lazy 123
        let mutable c = Unchecked.defaultof<IActorRef>
        let mutable c2 = Unchecked.defaultof<IActorRef<obj>>
        let create () = 
            Akka.FSharp.Spawn.spawn m ("CHILD" + System.Guid.NewGuid().ToString()) <|             
                Akka.FSharp.Spawn.actorOf2 (
                    fun mb m -> 
                        if not single2.IsValueCreated then
                            mb.Defer (fun () -> 
                                if not single.IsValueCreated then
                                    printfn "child is down, repeat"
                                    single.Value |> ignore
                            )
                            single2.Value |> ignore
                        printfn "child: %A" m
                        if m.GetType() = typeof<int> then
                            if unbox m = 0 then //failwith "childError"
                                c.Tell PoisonPill.Instance 
                )
        let create2 () = 
            spawn m ("CHILD2" + System.Guid.NewGuid().ToString()) <| props(           
                actorOf2 (
                    fun mb m -> 
                        printfn "child2: %A" m
                        if m.GetType() = typeof<int> then
                            if unbox m = 0 then //failwith "childError"
                                c2.Tell(PoisonPill.Instance, Akka.Actor.ActorRefs.Nobody) 
                        Ignore
                ))
        c <- create ()
        c2 <- create2 ()

        let rec loop () = actor {
            let! msg = m.Receive ()
            match msg with
            | Terminated(ref, existenceConfirmed, addressTerminated) ->  
                printfn "parent ===> %A %A" (existenceConfirmed, addressTerminated) ref.Path
                c <- create()
                c2 <- create2()
            | x -> 
                c.Tell x
                c2.Tell(x, Akka.Actor.ActorRefs.Nobody)
                printfn "%A" x
            return! loop ()
        }
        loop ()) 

let strategy =
    Strategy.AllForOne (fun e ->
        match e with
        //| :? DivideByZeroException -> Directive.Resume
        //| :? ArgumentException -> Directive.Stop
        | _ -> Directive.Stop)

let aref = 
    Akkling.Spawn.spawn system ("test" + System.Guid.NewGuid().ToString()) {
        prop with SupervisionStrategy = Some strategy
    }

let watcher:IActorRef<obj> = 
    spawnAnonymous system (props <| fun context ->    
        monitor context aref |> ignore
        let rec loop() = actor {
            let! msg = context.Receive()
            match msg with 
            | Terminated(ref, existenceConfirmed, addressTerminated) -> 
                printfn "monitor ===> %A %A" (existenceConfirmed, addressTerminated) ref.Path
                return! loop()
            | _ -> return Unhandled        
        }
        loop ())

//(retype aref) <! PoisonPill.Instance
aref <! box 0

spawn with Akkling child are the same... Or is this by design of Akka.net? Poisoned child will not be able to send message to parent anymore...?

Horusiath commented 3 years ago

If you want to receive Terminated message from any actor, you need to monitor (Akka.NET Watch method) that actor first.

ingted commented 3 years ago

Oh... got it!! Thanks so much!!