Scooletz / QueueBatch

WebJobs/Azure Functions trigger providing batches of Azure Storage Queues messages directly to your function
Apache License 2.0
41 stars 4 forks source link
azure azure-functions azure-storage webjob-functions

Build Status

Icon

QueueBatch

QueueBatch is an Azure Functions trigger providing ability to process Azure Storage Queue messages in batches.

Why?

Azure Functions trigger for Azure Storage Queue obtains messages in batches but dispatches them in separate tasks. It's ok, if a function touches different resources. If a function does some processing and then appends to a single append blob it won't scale up nicely. The problems that might occur are:

Accessing the same resource with high frequency might simply not work. With QueueBatch, you can address it, by processing all the messages from the batch in the same function, amortizing the cost of accessing other resources.

Usage

Basic

To use QueueBatch in your Function application, you need to use a custom IMessageBatch parameter type to accept a batch and mark it with an appropriate attribute. To resolve queue name from configuration file wrap it with % sign, i.e. %queue-app-setting-key%

public static void MyFunc([QueueBatchTrigger("myqueue")] IMessageBatch batch)
{
  foreach (var msg in batch.Messages)
  {
    // do something with payload
    DoSomething(msg.Payload);
  }

  // acknowledge processing
  batch.MarkAllAsProcessed ();
}

With SuccessOrFailAsBatch set to false, you can also acknowledge only some of the messages. The rest, will be retried in a similar manner to the regural [QueueTrigger]

public static void MyFunc([QueueBatchTrigger("myqueue", SuccessOrFailAsBatch = false)] IMessageBatch batch)
{
  foreach (var msg in batch.Messages)
  {
    // do something with payload
    if (DoSomething(msg.Payload))
    {
       // mark as processed only if successful
       batch.MarkAsProcessed (msg);
    }
  }
}

Faster queues

QueueBatch provides an alternative client for accessing Azure Storage Queues that is much faster then the one provided by SDK (up to 20x). To enable it (it's opt-in), you need to set UseFasterQueues to true.

public static void MyFunc([QueueBatchTrigger("myqueue", UseFasterQueues = true)] IMessageBatch batch)
{
  // ...
}

Parallel gets

As a single operation of getting messages can obtain no more than 32, you can request issuing multiple parallel gets. The maximum number of messages in a batch will be equal to 32 * ParallelGets.

public static void MyFunc([QueueBatchTrigger("myqueue", ParallelGets = 2)] IMessageBatch batch)
{
  // ...
}

Empty batches

Sometimes it might be useful to get notifications about empty batches as well. They can be used to do some other work, like compacting your data or sending a notification that there was a run with nothing to process. After each empty batch, the back-off strategy will delay the next query for messages even more. To enable calls with empty batches, specify the following property

public static void MyFunc([QueueBatchTrigger("myqueue", RunWithEmptyBatch = true)] IMessageBatch batch)
{
  // now, if no message is retrieved from the queue, MyFunc will still be called
}

Connection

Connection is an optional property to specify queue's storage account. It's a name of an app setting that contains the storage connection string to use for this binding. When connection property is empty then default AzureWebJobsStorage connection string is used.

public static void MyFunc([QueueBatchTrigger("myqueue", Connection = "StorageConnectionAppSetting")] IMessageBatch batch)
{
  // ...
}

Licensing

QueueBatch

QueueBatch is licensed under the Apache License 2.0 license.

Azure Webjobs SDK

Azure Webjobs SDK is licensed under the MIT license as described here. Azure Webjobs SDK sources are used and partially compiled into the QueueBatch distribution as allowed under the license terms found here.

Icon

Batch Download designed by Fatahillah from The Noun Project