fsprojects / FSharp.CloudAgent

Allows running F# Agents in a distributed manner using Azure Service Bus.
http://fsprojects.github.io/FSharp.CloudAgent/
The Unlicense
39 stars 14 forks source link

Messages exist in queue but are not received when listening #10

Closed talbottmike closed 9 years ago

talbottmike commented 9 years ago

I'm able to add messages to the queue but when I try to receive them nothing is received.

I've used the Azure Service Bus in linqpad with C# sample code so I know the queue works, but I'm probably missing something on the F# implementation with CloudAgent.

I tried serializing the object before adding it to the queue so that the content would just be a string that I could deserialize on receipt, but still had the same issue.

If I set a breakpoint on the line inbox.Receive() it gets hit, but a breakpoint on the following line never does.

Thanks for the help.

I'm not sure if it'd be more helpful to put the code in this issue or point you to my project so here are both. https://github.com/talbottmike/TalBot/tree/AzureServiceBus

The type I'm posting is in TalBot.Types...

type IncomingMessage = 
    {
        token : string;
        teamId : string;
        teamDomain : string;
        channelId : string;
        channelName : string;
        timestamp : decimal;
        userId : string;
        userName : string;
        text : string;
        triggerWord : string;
    }

I'm accessing the queue in TalBot.BotHelper around the beginning of the module.

let createResilientAgent agentId =
    MailboxProcessor.Start(fun inbox ->
        async {
            while true do
                let! message, replyChannel = inbox.Receive()
                printfn "%s is the channelName." message.channelName
                printfn "%s" (agentId.ToString())

                match message with
                | { channelName = "#bot-log" } -> 
                    printfn "success"
                    replyChannel Completed // all good, message was processed
                | { channelName = "snapple" } -> 
                    printfn "snapple failed"
                    replyChannel Failed // error occurred, try again
                | _ -> 
                    printfn "abandoned"
                    replyChannel Abandoned // give up with this message.
        })

let readFromServiceQueue () =
    let serviceBusReadConnection () = ServiceBusConnection serviceBusReadConnectionString
    let cloudReadConnection () = WorkerCloudConnection(serviceBusReadConnection (), Queue "queue")
    printfn "trying to read from the queue"
    let disposable = ConnectionFactory.StartListening(cloudReadConnection (), createResilientAgent >> Messaging.CloudAgentKind.ResilientCloudAgent)
    disposable.Dispose()    
isaacabraham commented 9 years ago

That should serialize fine. I'll try to repro this evening, but in the meantime can you check the payload of the message on the queue (you can use the Service Bus Explorer or maybe even just VS) to see how it has been serialized?

Also - what version of JSON .Net are you using?

talbottmike commented 9 years ago

Service Bus Explorer is a helpful tool. Thanks for pointing it out.

Here is the message text from a queue item.

{"token":"mytoken","teamId":"myteamid","teamDomain":"myteamdomain","channelId":"mychannelid","channelName":"bot-log","timestamp":1433205126.000015,"userId":"U0305UA8G","userName":"mtalbott","text":"bot test5","triggerWord":"bot"}

In my paket.lock file I'm referencing more than one version of Json. FSharp.CloudAgent (0.3) -Newtonsoft.Json (>= 6.0.6) Microsoft.AspNet.WebApi.Client (5.2.3) -Microsoft.Net.Http (>= 2.2.22) - framework: portable-wp80+win+net45+wp81+wpa81 -Newtonsoft.Json (>= 6.0.4) - framework: portable-wp80+win+net45+wp81+wpa81, >= net45 Newtonsoft.Json (6.0.8)

I need to get back to the code that pays the bills, but I can try setting these all to 6.0.6 this evening and see if that helps.

talbottmike commented 9 years ago

Setting the version of JSON .Net to 6.0.6 didn't help. I'm trying out VS2015 and noticed the diagnostic tools show exceptions that have been thrown and caught. There are repeated exceptions that state - Exception A System.InvalidOperationException was thrown: "The MailboxProcessor has already been started." System.InvalidOperationException

The exceptions occur when this line runs let disposable = ConnectionFactory.StartListening(cloudReadConnection (), createResilientAgent >> Messaging.CloudAgentKind.ResilientCloudAgent)

Not sure if that's helpful or unrelated.

talbottmike commented 9 years ago

Any other thoughts or suggestions on this?

isaacabraham commented 9 years ago

Looking at this today.

isaacabraham commented 9 years ago

OK. I think I know why you aren't receiving any messages - I think it's how you're using the agent. In the readFromServiceQueue function, you're starting up an agent pool listening to a service bus queue, but then immediately afterwards disposing of the pool.

I've not got a complete understanding of how you're intended to use it but I would recommend that you slightly change how your application uses CloudAgent. CloudAgent doesn't need you to manually start listening and stop listening on demand, which is what you're effectively doing now - the bot locks into an infinite while loop, constantly creating a listener and then shutting it down immediately after. Instead, all you need to do is on startup of your service, once, create the worker pool (i.e. the call to StartListening). CloudAgent will automatically poll in the background for new messages as they arrive and dispatch them to an agent as needed. You won't need to worry about threads being taken up or memory being used by agents as MailboxProcessors by their nature are very lightweight and when no messages are being processed do not utilise much in the way of resources.

Have a look at this sample https://gist.github.com/isaacabraham/48c29056bfe1c03ee303 which shows a basic client / server running (you can launch these as two console apps). In the context of your application, when you start the windows service up, that's where you want to start listening once. There's no need to lock into a loop polling - Cloud Agent does all of that for you.

talbottmike commented 9 years ago

Yep. That's what I was doing wrong. Thanks for your help. I appreciate it.