Azure / azure-functions-servicebus-extension

Service Bus extension for Azure Functions

ServiceBus Session support #16

Closed Zenuka closed 4 years ago

Zenuka commented 9 years ago

I posted this question yesterday on StackOverflow, but I guess this is a better place to ask: is it possible to have something like a ServiceBusSessionTrigger for handling sessions through the WebJobs SDK?

mathewc commented 9 years ago

Can you detail more on the scenarios you're thinking of for it, and how you'd like it to work? The new extensibility model we're opening up would allow you to write one yourself if you wanted to. However, if this is something that many people would find useful, it would make sense for us to bake into our core ServiceBus assembly.

It's not clear to me whether this would need to be a completely new trigger binding, or whether on the trigger/receive side of things it couldn't just be an additional mode on the existing ServiceBusTrigger binding. Behind the scenes, it polls for and accepts sessions, and invokes your job function for session messages.

As far as sending session based messages, I think our ServiceBus binding (e.g. used when sending a BrokeredMessage via out param binding) would work as is? You just set your session ID on the message before sending.

Zenuka commented 9 years ago

I'm going to use it for handling the messages of one session in a specific sequence and preferably multiple sessions at once. Since it's currently not possible, I'm going to use this article to process multiple sessions per webjob. How should it work? Good question... I think it would be nice to have a method something like this:

public static void ProcessQueueMessages([ServiceBusTrigger(queueName:"inputqueue", useSessions: true)] List<string> messages) { ... }

Or whatever type of object you expect. Also you would need to specify somewhere in the config the timeout between sessions; after how many (milli)seconds will you assume the session is finished?

And you are correct, for sending messages you just need to set something in the SessionId.

JasonBSteele commented 8 years ago

I'd love to see this feature as well. I need my inbound messages to be in a guaranteed delivery order, and it seems using the Service Bus Session is the way to do this. However it is not at all clear how I would implement this in a WebJob.

isaacabraham commented 8 years ago

@mathewc

Not having this is a blocker for me currently. We're using queues + the webjobs SDK as a very lightweight, scalable reactive work item list - messages come on, and processing those messages leads to 0 .. M new messages going back onto the queue. When we need to have guaranteed sequential processing - almost simulating an actor-style model - service bus sessions is the only option. This is crucial if we have some critical region of data processing and don't want e.g. transactions on SQL database (or can't e.g. Azure blobs).

Having an easy way to target messages on a specific session and have them work transparently within webjobs would be a huge benefit and would open a ton of possibilities. So on the input side I would love to see it "just work": -

MikeYeager commented 8 years ago

I need this feature as well. We have a WebAPI call that's taking longer and longer as we add more work to each call. When it started to timeout in Azure, we decided to move the process into a WebJob so that clients can initiate the task, then check back to get progress. The clients in this case are browsers making AJAX calls. If we can't correlate the requests with the responses, they can't check on progress or completion of the WebJob. We started looking at Azure Storage Queues, but found that there's no way for the original caller to only receive responses intended for them. We can't create a response queue for each client, because each client is a browser. We saw that Service Bus Queues and Topics allow us to use the SessionId to correlate requests and responses. The client (browser) can filter the response queue to only see responses intended for them. It sounded like a perfect solution. After wrestling with configuration we got the JobHost.RunAndBlock() to almost run, but we get the following error:

It is not possible for an entity that requires sessions to create a non-sessionful message receiver.

We found this documentation at http://www.cloudcasts.net/devguide/Default.aspx?id=13029

If sessions are required on a queue it is not possible to use the QueueClient Receive method to dequeue messages. Attempting to do so will result in an InvalidOperationException:

It is not possible for an entity that requires sessions to create a non-sessionful message receiver.

We assume the WebJobs SDK is using the QueueClient Receive method. We're going to try to work around this by having the request queue send a Guid as part of the payload and not require Sessions on the request queue. We'll require sessions on the response queue and see if we can use the Guid in the payload as the SessionId for the response. Will let you know if that works.

isaacabraham commented 8 years ago

One thing I've noticed since my OP is that you can use [<Singleton>] on e.g. a Storage Queue and use the SingletonScope to achieve virtually the same thing (as far as I can see) as Service Bus sessions. Can anyone confirm that?

MikeYeager commented 8 years ago

FYI, the work-around mentioned above worked perfectly: "having the request queue send a Guid as part of the payload and not requiring Sessions on the request queue. We'll require sessions on the response queue and use the Guid in the payload as the SessionId for the response."

fjeldstad commented 8 years ago

I would really like to see this implemented as well. I don't use the WebJobs SDK directly that much, but if I understand it correctly this would allow for Azure Functions written in for example JavaScript to handle message sessions (something that's not supported in the Node.js Azure SDK since it uses the REST interface of Azure Service Bus).

Bigshooter commented 7 years ago

Was there any traction on this?

benjamineberle commented 7 years ago

+1. Our requirement is FIFO by tenant in a multi-tenant system, in this case for data imports via upsert.

brettsam commented 7 years ago

@amit-kumar2 -- is this the issue that you brought up to me yesterday? If so, would you want to weigh in with your scenario here so we have it captured?

appalaraju commented 6 years ago

Hi,

I am using the code below with a Service Bus topic and a session-enabled subscription:

public static void ProcessTopicMessage([ServiceBusTrigger("%ServicebusTopic%", "%ServicebusSubscription%")] BrokeredMessage message, TextWriter logger) { }

The problem is that, in a continuously running web job, host.RunAndBlock() throws the error below. How can I resolve this? Do Azure web jobs not support session-enabled topic subscriptions?

An unhandled exception of type 'System.InvalidOperationException' occurred in mscorlib.dll

Additional information: It is not possible for an entity that requires sessions to create a non-sessionful message receiver. TrackingId:074fbc74-b5e9-4fc2-886b-ed2f4e324304_G5_B15, SystemTracker:az-g3ms-servicebus:Topic:az-rsd-dev-topic|az-rsd-dev-subscription, Timestamp:5/11/2018 5:16:14 AM

raulbojalil commented 6 years ago

Hi @appalaraju, were you able to resolve your issue? In my case I'm not getting any exceptions, but the web job is not being fired either. If I disable sessions on my queue, the web job starts working again.

MikeYeager commented 6 years ago

As far as we have been able to determine, we simply have to accept that the queue will occasionally throw exceptions for no discernible reason, so we just handle and ignore them. As for it not working when sessions are enabled, we have not encountered that. Have you tried turning on diagnostics in the portal?

mathewc commented 6 years ago

Related to background errors coming from MessageReceivers - we just made some improvements in that area: https://github.com/Azure/azure-webjobs-sdk/issues/1703

appalaraju commented 6 years ago

Hi @raulbojalil, @MikeYeager

Have you tried a topic with a session-enabled subscription?

In my case, I created a web job using the code (ProcessTopicMessage) I posted above, with a topic and a session-enabled subscription, and ran it locally on my machine.

I am hitting the issue I described above.

Any idea when Azure web jobs will work properly with a topic and a session-enabled subscription?

JohannesHoppe commented 6 years ago

Warning: This horse is dead 🐴!

You need to process your messages in FIFO style? Better use some other technology and stop waiting for Microsoft to support their own technology stack!

ridhoq commented 6 years ago

If Microsoft cannot prioritize this, would anybody else be interested in attempting to put together a PR for this? I am not sure I can do it alone, but I'm happy to take a stab at it.

davidrevoledo commented 6 years ago

It'd be good to have this feature :(

JasonBSteele commented 6 years ago

So it's been over 2 years since this issue was raised and still nothing on whether/when this will be addressed.

Please vote for it at https://feedback.azure.com/forums/355860-azure-functions/suggestions/13882992-support-for-session-enabled-azure-service-bus-queu

davidrevoledo commented 5 years ago

This is a limitation for us. Is there any ETA for implementing this?

davidrevoledo commented 5 years ago

@ridhoq, if you want, I can join a community effort on this.

JasonBSteele commented 5 years ago

On October 18th 2018 a comment was added to https://feedback.azure.com/forums/355860-azure-functions/suggestions/13882992-support-for-session-enabled-azure-service-bus-queu to say the work has been planned... so there is hope!

fabiocav commented 5 years ago

Assigning this to sprint 45 for initial design and scoping

brettsam commented 5 years ago

Design is ready for internal review. Will be reviewing and planning in Sprint 47. When finalized, we'll respond back here.

fabiocav commented 5 years ago

Design work completed. Moving this to sprint 48 to track the implementation

alrod commented 5 years ago

A pre-release NuGet package is out. If you want to try it, update the Microsoft.Azure.WebJobs.Extensions.ServiceBus NuGet package in your VS project to: https://www.nuget.org/packages/Microsoft.Azure.WebJobs.Extensions.ServiceBus/3.1.0-beta1

Using: public static void Run([ServiceBusTrigger("core-test-queue1-sessions", Connection = "AzureWebJobsServiceBus", IsSessionsEnabled = true)]string myQueueItem, ClientEntity clientEntity, ILogger log)

IsSessionsEnabled = true: set this when the trigger points at a session-enabled queue/subscription. ClientEntity clientEntity: used for management operations (like MessageReceiver for a regular queue/subscription).

You can also specify a new SessionHandlerOptions section in host.json:

{
    "version": "2.0",
    "extensions": {
        "serviceBus": {
            "SessionHandlerOptions":
             {
                "MaxAutoRenewDuration": "00:01:00",
                "MessageWaitTimeout": "00:05:00",
                "MaxConcurrentSessions": 16,
                "AutoComplete": true
             }
        }
    }
}

I'd appreciate any feedback.

fabiocav commented 5 years ago

Moving this to sprint 49 to allow more time for feedback before this work is merged.

appalaraju commented 5 years ago

Is this feature available in Azure functions?

appalaraju commented 5 years ago

Is this feature supported for topic subscriptions?

erik-neumann commented 5 years ago

@alrod I can't access the package using your link https://myget.org/feed/azure-appservice/package/nuget/Microsoft.Azure.WebJobs.Extensions.ServiceBus/3.0.5-11562 anymore. A few days ago it worked. Any idea? I was using the ServiceBus extension with session support, and it has worked pretty well so far.

lukasvosyka commented 5 years ago

Having the same issue as @erik-neumann ! Cannot see the Microsoft.Azure.WebJobs.Extensions.ServiceBus nightly builds at all anymore. They seem to be not listed anymore at https://myget.org/gallery/azure-appservice

alrod commented 5 years ago

@erik-neumann, @lukasvosyka please use the following link to download the package: https://www.nuget.org/packages/Microsoft.Azure.WebJobs.Extensions.ServiceBus/3.1.0-beta1

alrod commented 5 years ago

@appalaraju, the SB sessions feature will be available in PROD later (in May). You can try it using the instructions above, and it works with topics/subscriptions.

ppusateri commented 5 years ago

@alrod - Trying to use 3.1.0 beta 1, but package restoration fails because it depends on Microsoft.Azure.WebJobs >= 3.0.8. The latest I can see is 3.0.6. Do we need to wait until 3.0.8 is available to try this?

erik-neumann commented 5 years ago

@ppusateri I made it run by additionally referencing Microsoft.Azure.WebJobs 3.0.8 from the myGet feed: https://myget.org/feed/azure-appservice/package/nuget/Microsoft.Azure.WebJobs/3.0.8

paulbatum commented 5 years ago

@ppusateri @erik-neumann sorry about this, I can confirm there's a problem regarding the availability of the 3.0.8 package on NuGet. We're double checking now.

fabiocav commented 5 years ago

Again, apologies about this. The packages are up on NuGet and the dependencies should be valid now.

Please let us know if you continue to run into issues.

blake-dan commented 5 years ago

Hi, @fabiocav, @alrod

I have a ServiceBusTrigger function running great where it is getting the message as a string with IsSessionsEnabled = true. However, when I try to bind to the message as a BrokeredMessage I'm getting a deserialization error. The messages do have a session id in the header. Here is the error:

System.Private.CoreLib: Exception while executing function: QueueTrigger. Microsoft.Azure.WebJobs.Host: Exception binding parameter 'message'. System.Private.DataContractSerialization: There was an error deserializing the object of type Microsoft.ServiceBus.Messaging.BrokeredMessage. The input source is not correctly formatted. System.Private.DataContractSerialization: The input source is not correctly formatted.

Here is my function definition:

    [FunctionName("QueueTrigger")]
    public static void Run(
        [ServiceBusTrigger("%AsbQueueName%", Connection = "AsbQueueConnection", IsSessionsEnabled = true)] BrokeredMessage message,
        ILogger log)
    {

Should this work? Thanks!

JasonBSteele commented 5 years ago

Here is my function definition: [FunctionName("QueueTrigger")] public static void Run( [ServiceBusTrigger("%AsbQueueName%", Connection = "AsbQueueConnection", IsSessionsEnabled = true)] BrokeredMessage message, ILogger log)

The example above uses a string: public static void Run([ServiceBusTrigger("core-test-queue1-sessions", Connection = "AzureWebJobsServiceBus", IsSessionsEnabled = true)]string myQueueItem, ClientEntity clientEntity, ILogger log)

Also BrokeredMessage is from the old Service Bus nuget package which this may not be compatible with.

appalaraju commented 5 years ago

Hi ,

I would like to confirm whether Service Bus session support meets my requirement.

Multiple devices process cloud requests concurrently, and each device responds to each request with a series of status messages in a specific order (In-progress, pending, going-on, success). Please look at the example below:

  1. Let us say there are 2 devices (e.g. DV1 and DV2).
  2. Each device sends its response messages to IoT Hub in order, and both devices send at the same time (e.g. DV1-In-progress, DV1-pending, DV1-going-on, DV1-success and DV2-In-progress, DV2-pending, DV2-going-on, DV2-success).
  3. All BrokeredMessages coming from the same device are sent to the cloud with the same session key in order to maintain the sequence.
  4. Let us say there are 2 web jobs running in the cloud with IsSessionsEnabled = true.
  5. If messages from different devices (e.g. DV1 and DV2) reach the topic subscription at the same time, the subscription may hold them interleaved (e.g. DV1-In-progress, DV2-In-progress, DV1-pending, DV1-going-on, DV2-pending, DV1-success, DV2-going-on, DV2-success).
  6. The 2 web jobs then try to read messages from the topic at the same time.
  7. As I understand it, IsSessionsEnabled = true lets one web job read the messages of one device (e.g. DV1) in the order that device sent them (DV1-In-progress, DV1-pending, DV1-going-on, DV1-success), while the other web job reads the DV2 messages (DV2-In-progress, DV2-pending, DV2-going-on, DV2-success).
  8. Is my understanding correct?

Thanks, Raju

JasonBSteele commented 5 years ago

3. Each brokeredmessage will be sent to cloud with unique session key.

You need to set the SessionId of the message to the device Id so that the messages you want to keep in sequence with each other share the same SessionId.
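A minimal sketch of that, assuming the sender writes straight to the topic with the Microsoft.Azure.ServiceBus package (the newer client that uses Message rather than BrokeredMessage). The DeviceStatusSender class and its method names are hypothetical and only illustrate the idea:

```csharp
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

// Hypothetical helper, not part of any SDK.
public static class DeviceStatusSender
{
    // Every status message from one device carries that device's id as its
    // SessionId, so all of them land in the same session.
    public static Message BuildStatusMessage(string deviceId, string status)
    {
        return new Message(Encoding.UTF8.GetBytes(status))
        {
            SessionId = deviceId
        };
    }

    // Sends the statuses one by one, awaiting each send before starting the
    // next so they are handed to the broker in the intended order.
    public static async Task SendStatusesAsync(
        ISenderClient sender, string deviceId, params string[] statuses)
    {
        foreach (var status in statuses)
        {
            await sender.SendAsync(BuildStatusMessage(deviceId, status));
        }
    }
}
```

With a TopicClient as the sender, a call such as SendStatusesAsync(topicClient, "DV1", "In-progress", "pending", "going-on", "success") would put all four messages into the DV1 session.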

alrod commented 5 years ago

@blake-dan, SB Sessions is a V2 runtime feature. You should use Message instead of BrokeredMessage: https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus#trigger---usage
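For illustration, the function definition from the question might look like this once it binds to Message. This is only a sketch: it keeps the queue and connection setting names from the question and assumes the message body is UTF-8 text. The Describe helper is hypothetical and exists only to keep the formatting in one place.

```csharp
using System.Text;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class SessionQueueFunction
{
    // Hypothetical helper: renders the session id, message id and body.
    // Assumes the body is UTF-8 text, which may not hold for every sender.
    public static string Describe(Message message)
    {
        string body = Encoding.UTF8.GetString(message.Body);
        return $"session={message.SessionId} id={message.MessageId} body={body}";
    }

    [FunctionName("QueueTrigger")]
    public static void Run(
        [ServiceBusTrigger("%AsbQueueName%", Connection = "AsbQueueConnection", IsSessionsEnabled = true)]
        Message message,
        ILogger log)
    {
        // Message (Microsoft.Azure.ServiceBus) replaces BrokeredMessage here.
        log.LogInformation(Describe(message));
    }
}
```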

alrod commented 5 years ago

@appalaraju, @JasonBSteele is right.

If you want ServiceBusTrigger to process the sequence "DV1-In-progress,DV1-pending,DV1-going-on,DV1-success" in guaranteed order, DV1 must send all of those messages with the same SessionId and in the order "DV1-In-progress,DV1-pending,DV1-going-on,DV1-success".

appalaraju commented 5 years ago

@JasonBSteele , @alrod --> thanks for your confirmation and valuable details.

appalaraju commented 5 years ago

Hi All,

I have a few more questions. Please take a look.

  1. I am using a topic subscription, so the web jobs read messages from that topic subscription. A topic has an option called "Enable Partitioning". For Service Bus sessions to work properly, should "Enable Partitioning" be set to True or False?
  2. Has Microsoft published any documentation that explains Service Bus sessions more clearly?

Please look at the example below.

  1. Let us say there are 2 devices (e.g. DV1 and DV2).
  2. Each device sends its response messages to IoT Hub in order, and both devices send at the same time (e.g. DV1-In-progress, DV1-pending, DV1-going-on, DV1-success and DV2-In-progress, DV2-pending, DV2-going-on, DV2-success).
  3. All BrokeredMessages coming from the same device are sent to the cloud with the same session key in order to maintain the sequence.
  4. Let us say there is 1 web job (1 concurrent thread) running in the cloud with IsSessionsEnabled = true.
  5. If messages from different devices (e.g. DV1 and DV2) reach the topic subscription at the same time, the subscription may hold them interleaved (e.g. DV1-In-progress, DV2-In-progress, DV1-pending, DV1-going-on, DV2-pending, DV1-success, DV2-going-on, DV2-success).
  6. The 1 web job then tries to read the messages from the topic.
  7. As I understand it, IsSessionsEnabled = true means the single web job first reads all the messages of one device (e.g. DV1) in the order that device sent them (DV1-In-progress, DV1-pending, DV1-going-on, DV1-success), and only after reading the DV1 messages does it read the DV2 messages (DV2-In-progress, DV2-pending, DV2-going-on, DV2-success).
  8. Is my understanding correct?

dcarr42 commented 5 years ago

Partitioning is not required for session queues. But if enabled the high consistency is reduced as messaging operations are always routed to a specific partition.

JasonBSteele commented 5 years ago

7. So as per my understanding, "IsSessionsEnabled = true" concept enables that the single web job firstly reads all same Device specific messages in the same order sent by one device(ex: DV1) ( ex: DV1-In-progress,DV1-pending,DV1-going-on,DV1-success) so after reading DV1 messages then only that web job reads DV2 messages ( ex: DV2-In-progress,DV2-pending,DV2-going-on,DV2-success)

This is my understanding...

By default your WebJob or Function can have multiple instances running. An instance will be created to handle the first DV1 message, then another will be created to handle the first DV2 messages. While these instances hold on to the Session Lock they will continue to receive the messages for the relevant devices.

If they don't receive a message from a device for a while they will release the Session Lock and be shut down. When a new message for the device eventually does arrive a new instance is created that will continue to receive messages for that device until it too is shut down.

The important thing is that there will only ever be one instance receiving messages for a Device/Session and it will receive them one at a time. But there will be multiple instances which handle other Devices/Sessions in parallel.
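To make that ordering concrete, here is a small in-memory model in plain C#. It does not touch Service Bus and does not model session locks, timeouts or parallelism. It only shows how an interleaved arrival order splits into one in-order stream per session. SessionOrderingSketch and GroupBySession are made-up names.

```csharp
using System;
using System.Collections.Generic;

// In-memory illustration only; no Service Bus client is involved.
public static class SessionOrderingSketch
{
    // Splits an interleaved stream of (sessionId, body) pairs into one list
    // per session, keeping the arrival order inside each session.
    public static Dictionary<string, List<string>> GroupBySession(
        IEnumerable<(string SessionId, string Body)> arrivals)
    {
        var perSession = new Dictionary<string, List<string>>();
        foreach (var (sessionId, body) in arrivals)
        {
            if (!perSession.TryGetValue(sessionId, out var bodies))
            {
                bodies = new List<string>();
                perSession[sessionId] = bodies;
            }
            bodies.Add(body);
        }
        return perSession;
    }

    public static void Main()
    {
        // The interleaved arrival order from the example in this thread.
        var arrivals = new[]
        {
            ("DV1", "In-progress"), ("DV2", "In-progress"), ("DV1", "pending"),
            ("DV1", "going-on"), ("DV2", "pending"), ("DV1", "success"),
            ("DV2", "going-on"), ("DV2", "success"),
        };

        // Each entry is what a single session handler would see, in order.
        foreach (var session in GroupBySession(arrivals))
        {
            Console.WriteLine($"{session.Key}: {string.Join(", ", session.Value)}");
        }
    }
}
```

In the real service, each per-session list is delivered to whichever instance currently holds that session's lock, and different sessions can be handled in parallel up to MaxConcurrentSessions.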

brotaru89 commented 5 years ago

Hi, @fabiocav @alrod

Using the trigger with MessageReceiver does not work when IsSessionsEnabled is set to True. Example:

([ServiceBusTrigger("my.queue.with.sessions", Connection = "ServiceBusEndpoint", IsSessionsEnabled = true)]string myQueueItem, string messageId, ILogger log, MessageReceiver messageReceiver, string lockToken)

The "messageReceiver" variable is NULL. Can you please have a look? Thanks!

dcarr42 commented 5 years ago

@brotaru89 I think this is intentional; you will need to bind to ClientEntity and cast.
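A rough sketch of that, reusing the queue and connection names from the question. The cast target is an assumption: the thread does not say which type to cast to, and this guesses that the object passed for a session-enabled queue implements IMessageSession from Microsoft.Azure.ServiceBus. The pattern match falls through to a warning if that guess is wrong. The function name is hypothetical.

```csharp
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class SessionEntityFunction
{
    [FunctionName("SessionEntityFunction")] // hypothetical function name
    public static void Run(
        [ServiceBusTrigger("my.queue.with.sessions", Connection = "ServiceBusEndpoint", IsSessionsEnabled = true)]
        string myQueueItem,
        ClientEntity clientEntity,
        ILogger log)
    {
        // Assumption: for session-enabled entities the bound ClientEntity
        // implements IMessageSession. The check avoids an invalid cast if not.
        if (clientEntity is IMessageSession session)
        {
            log.LogInformation("Handling a message in session {SessionId}", session.SessionId);
        }
        else
        {
            log.LogWarning("clientEntity is {Type}, not a message session", clientEntity?.GetType().Name);
        }
    }
}
```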