Deferring Processing of Azure Service Bus Messages

Sometimes when you’re handling a message from a message queue, you realise that you can’t currently process it, but might be able to at some time in the future. What would be nice is to delay or defer processing of the message for a set amount of time.

Unfortunately, with brokered messages in  Azure Service Bus, there is no built-in feature to do this simply, but there are a few workarounds. In this post, we’ll look at four separate techniques: let the lock time out, sleep and abandon, defer the message, and resubmit the message.

Let the Lock Time Out

The simplest option is doing nothing. When you get your BrokeredMessage, don’t call Complete or Abandon. This will mean that the lock on the message will eventually time out, and it will become available for processing again once that happens. By default the lock duration for a message is 1 minute, but this can be configured for a queue by using the QueueDescription.LockDuration property.

The advantage is that this is a very simple way of deferring re-processing the message for about a minute. The main disadvantage is that the time is not so easy to control as the lock duration is a property of the queue, not the message being received.

In the following simple example, we create a queue with a lock duration of 30 seconds, send a message, but then never actually complete or abandon it in the handler. This results in us seeing the same message getting retried with an incrementing Delivery Count until eventually it is dead-lettered automatically on the 10th attempt.

// some connection string
string connectionString = "";
const string queueName = "TestQueue";

// PART 1 - CREATE THE QUEUE
var namespaceManager = 
    NamespaceManager.CreateFromConnectionString(connectionString);

// ensure it is empty
if (namespaceManager.QueueExists(queueName))
{
    namespaceManager.DeleteQueue(queueName);
}
var queueDescription = new QueueDescription(queueName);
queueDescription.LockDuration = TimeSpan.FromSeconds(30);
namespaceManager.CreateQueue(queueDescription);

// PART 2 - SEND A MESSAGE
var body = "Hello World";
var message = new BrokeredMessage(body);
var client = QueueClient.CreateFromConnectionString(connectionString, 
                                                    queueName);
client.Send(message);

// PART 3 - RECEIVE MESSAGES
// Configure the callback options.
var options = new OnMessageOptions();
options.AutoComplete = false; // we will call complete ourself
options.AutoRenewTimeout = TimeSpan.FromMinutes(1); 

// Callback to handle received messages.
client.OnMessage(m =>
{
    // Process message from queue.
    Console.WriteLine("-----------------------------------");
    Console.WriteLine($"RX: {DateTime.UtcNow.TimeOfDay} - " + 
                      "{m.MessageId} - '{m.GetBody()}'");
    Console.WriteLine($"DeliveryCount: {m.DeliveryCount}");

    // Don't abandon, don't complete - let the lock timeout
    // m.Abandon();
}, options);

Sleep and Abandon

If we want greater control of how long we will wait before resubmitting the message, we can explicitly call abandon after sleeping for the required duration. Sadly there is no AbandonAfter method on brokered message. But it’s very easy to wait and then call Abandon. Here we wait for two minutes before abandoning the message:

client.OnMessage(m =>
{
    Console.WriteLine("-----------------------------------");
    Console.WriteLine($"RX: {DateTime.UtcNow.TimeOfDay} -" + 
                      " {m.MessageId} - '{m.GetBody()}'");
    Console.WriteLine($"DeliveryCount: {m.DeliveryCount}");

    // optional - sleep until we want to retry
    Thread.Sleep(TimeSpan.FromMinutes(2));

    Console.WriteLine("Abandoning...");
    m.Abandon();

}, options);

Interestingly, I thought I might need to periodically call RenewLock on the brokered message during the two minute sleep, but it appears that the Azure SDK OnMessage function is doing this automatically for us. The down-side of this approach is of course that our handler is now in charge of marking time, and so if we wanted to hold off for an hour or longer, then this would tie up resources in the handling process, and wouldn’t work if the computer running the handler were to fail. So this is not ideal.

Defer the Message

It turns out that BrokeredMessage has a Defer method whose name suggests it can do exactly what we want – put this message aside for processing later. But, we can’t specify how long we want to defer it for, and when you defer it, it will not be retrieved again by the OnMessage function we’ve been using in our demos.

So how do you get a deferred message back? Well, you must remember it’s sequence number, and then use a special overload of QueueClient.Receive that will retrieve a message by sequence number.

This ends up getting a little bit complicated as now we need to remember the sequence number somehow. What you could do is post another message to yourself, setting the ScheduledEnqueueTimeUtc to the appropriate time, and that message simply contains the sequence number of the deferred message. When you get that message you can call Receive passing in that sequence number and try to process the message again.

This approach does work, but as I said, it seems over-complicated, so let’s look at one final approach.

Resubmit Message

The final approach is simply to Complete the original message and resubmit a clone of that message scheduled to be handled at a set time in the future. The Clone method on BrokeredMessage makes this easy to do. Let’s look at an example:

client.OnMessage(m =>
{
    Console.WriteLine("--------------------------------------------");
    Console.WriteLine($"RX: {m.MessageId} - '{m.GetBody()}'");
    Console.WriteLine($"DeliveryCount: {m.DeliveryCount}");

    // Send a clone with a deferred wait of 5 seconds
    var clone = m.Clone();
    clone.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddSeconds(5);
    client.Send(clone);

    // Remove original message from queue.
    m.Complete();
}, options);

Here we simply clone the original message, set up the scheduled enqueue time, send the clone and complete the original. Are there any downsides here?

Well, it’s a shame that sending the clone and completing the original are not an atomic operation, so there is a very slim chance of us seeing the original again should the handling process crash at just the wrong moment.

And the other issue is that DeliveryCount on the clone will always be 1, because this is a brand new message. So we could infinitely resubmit and never get round to dead-lettering this message.

Fortunately, that can be fixed by adding our own resubmit count as a property of the message:

client.OnMessage(m =>
{
    int resubmitCount = m.Properties.ContainsKey("ResubmitCount") ? 
                       (int)m.Properties["ResubmitCount"] : 0;

    Console.WriteLine("--------------------------------------------");
    Console.WriteLine($"RX: {m.MessageId} - '{m.GetBody<string>()}'");
    Console.WriteLine($"DeliveryCount: {m.DeliveryCount}, " + 
                      $"ResubmitCount: {resubmitCount}");

    if (resubmitCount > 5)
    {
        Console.WriteLine("DEAD-LETTERING");
        m.DeadLetter("Too many retries", 
                     $"ResubmitCount is {resubmitCount}");
    }
    else
    {
        // Send a clone with a deferred wait of 5 seconds
        var clone = m.Clone();
        clone.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddSeconds(5);
        clone.Properties["ResubmitCount"] = resubmitCount + 1;
        client.Send(clone);

        // Remove message from queue.
        m.Complete();
    }
}, options);

Happy coding!

MongoDb example

Simple example for MongoDB. Save and retrieve data from Azure Cosmos DB.

Create an Azure Cosmos Db as MongoDb

For creating a new MongoDb on Azure, search from the list of resources, Azure Cosmos Db. Then Add a new database and you see the following screen.

Add new CosmosDB

Overview

When you created a new MongoDb, you see an Overview where there are general information about how many queries the database did (split on insert, update, cancel, query, count, others).

Azure Overview

Under Connection String you have the connection to use in your application. In this project you insert in Program.cs the connection string in the variable called connectionString.

DataExplorer

You can explorer all data in your Mongo database. Click on Data Explorer and you can see everything. Also, you can execute same queries.

Azure CosmosDb Data Explorer

You find an example application on my GitHub.

WebRole and WorkerRole Templates in VS 2015

Download the Azure SDK for Visual Studio 2015 from here: https://azure.microsoft.com/en-us/downloads/

It should force you to close Visual Studio, but if it doesn't, do so anyways. Once it's installed, you can reboot it.

When you go to add a new project, you can look under Cloud and then choose Azure Cloud Service.

Visual-Studio-2015-New-Project-Azure-Cloud-Service

This will give you the same old familiar screen, where you can choose a Web Role or Worker Role:

Visual-Studio-2015-New-Project-Azure-Cloud-Service

Microsoft Azure Storage Explorer for OS X, Linux, and Windows (and it's free)

microsoft-azure-storage-explorer-screenshot

Microsoft Azure Storage Explorer (Preview) is a standalone app from Microsoft that allows you to easily work with Azure Storage data. The Preview release currently supports Azure Blobs only. Tables, queues, and files are coming soon.

Features

  • Mac OS X, Linux, and Windows versions
  • Sign in to view your Storage Accounts – use your Org Account, Microsoft Account, 2FA, etc
  • Add Storage Accounts by account name and key, as well as custom endpoints
  • Add Storage Accounts for Azure China
  • Add blob containers with Shared Access Signatures (SAS) key
  • Local development storage (use storage emulator, Windows-only)
  • ARM and Classic resource support
  • Create and delete blobs, queues, or tables
  • Search for specific blobs, queues, or tables
  • Explore the contents of blob containers
  • View and navigate through directories
  • Upload, download, and delete blobs and folders
  • Open and view the contents text and picture blobs
  • View and edit blob properties and metadata
  • Generate SAS keys
  • Manage and create Stored Access Policies (SAP)
  • Search for blobs by prefix
  • Drag ‘n drop files to upload or download

Known Issues

  • Cannot view/take actions on Queues or Tables (coming soon!)
  • Linux install needs gcc version updated or upgraded – steps to upgrade are below:
    • sudo add-apt-repository ppa:ubuntu-toolchain-r/test
    • sudo apt-get update
    • sudo apt-get upgrade
    • sudo apt-get dist-upgrade
  • After reentering credentials, you may need to manually refresh Storage Explorer to show your Storage Accounts

Project Deco

Looking for the old open-source version? It's still available on GitHub!

Advertsing

125X125_06

Planet Xamarin

Planet Xamarin




TagCloud

MonthList