As part of the “2011-08-18” version, we have introduced several commonly requested features to the Windows Azure Queue service. The benefits of these new features are:
- Allow applications to store larger messages
- Allow applications to schedule work to be processed at a later time
- Allow efficient processing for long-running tasks by adding:
  - Leasing: Processing applications can now extend the visibility timeout on a message they have dequeued and hence maintain a lease on the message.
  - Progress Tracking: Processing applications can update the message content of a message they have dequeued to save progress state so that a new worker can continue from that state if the prior worker crashed.
That was then
To better understand these features, let us quickly summarize the messaging semantics of Windows Azure Queue. The Windows Azure Queue service provides a scalable message delivery system that can be used to build workflows and to decouple components that need to communicate. With the 2009-09-19 version of the service, users could add messages of up to 8KB to a queue. When adding a message, users specify a time to live (up to 7 days), after which the message is automatically deleted if it is still in the queue. Once added to the queue, a message is visible and is a candidate to be dequeued and processed by workers. Workers use a 2-phase dequeue/delete pattern. When retrieving a message, a worker specifies a non-renewable lease period called the "visibility timeout", during which the message is invisible to other workers. This required workers to estimate how long processing would take at the time the message was retrieved, before inspecting it, and the visibility timeout was limited to 2 hours. When the message is retrieved, a unique token called a pop receipt is associated with the message and must be used for subsequent operations on the message. When the message has been completely processed, the worker issues a request to delete the message using the pop receipt. This 2-phase process ensures that a message becomes available to another worker if the initial worker crashes while processing it.
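The following is a minimal in-memory sketch of the 2-phase dequeue/delete semantics described above, written only to illustrate the rules; it is not the service or the storage client library. The class InMemoryQueue and its members are hypothetical, the current time is passed in explicitly so the behavior is deterministic, pop receipts are simply GUID strings, and expiry is checked lazily on each call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of the 2009-09-19 dequeue/delete semantics.
// It illustrates the rules only; it is not the Windows Azure Queue API.
public sealed class InMemoryQueue
{
    private sealed class Entry
    {
        public string Id;
        public string Text;
        public DateTime VisibleAt;  // invisible until this time
        public DateTime ExpiresAt;  // time to live: deleted at this time
        public string PopReceipt;   // receipt issued by the latest dequeue
    }

    private readonly List<Entry> entries = new List<Entry>();

    // Adds a message that is visible immediately and expires after timeToLive.
    public string Add(string text, TimeSpan timeToLive, DateTime now)
    {
        Entry entry = new Entry
        {
            Id = Guid.NewGuid().ToString(),
            Text = text,
            VisibleAt = now,
            ExpiresAt = now + timeToLive
        };
        entries.Add(entry);
        return entry.Id;
    }

    // Phase 1: hides the first visible message for visibilityTimeout and issues a
    // new pop receipt. The lease length is fixed here, before the worker has
    // inspected the message, and cannot be extended afterwards.
    public bool TryDequeue(TimeSpan visibilityTimeout, DateTime now,
                           out string id, out string text, out string popReceipt)
    {
        entries.RemoveAll(x => x.ExpiresAt <= now);
        foreach (Entry entry in entries)
        {
            if (entry.VisibleAt <= now)
            {
                entry.VisibleAt = now + visibilityTimeout;
                entry.PopReceipt = Guid.NewGuid().ToString();
                id = entry.Id;
                text = entry.Text;
                popReceipt = entry.PopReceipt;
                return true;
            }
        }
        id = null;
        text = null;
        popReceipt = null;
        return false;
    }

    // Phase 2: deletes the message only if the receipt is the latest one issued.
    public bool Delete(string id, string popReceipt, DateTime now)
    {
        entries.RemoveAll(x => x.ExpiresAt <= now);
        return entries.RemoveAll(x => x.Id == id && x.PopReceipt == popReceipt) > 0;
    }
}
```

Note how the lease length is fixed at dequeue time: a worker that underestimates its processing time loses the message to another worker, and its old pop receipt can no longer delete it.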
This is now
With the 2011-08-18 version, we focused on streamlining the use of Windows Azure Queues to make them simpler and more efficient. First, we made it extremely simple for workers to process long running jobs efficiently – this required the ability to extend the lease on the message by providing a new visibility timeout. Without this ability, workers would have had to provide a generous lease period to the “Get Messages” API since the lease period is set before the message is inspected.
To further improve efficiency, we now also allow workers to update the contents of messages they have dequeued. This can be used to store progress information and intermediate state so that, if the worker crashes, a new worker can resume the work rather than starting from scratch. Finally, we added support for larger messages and for scheduling work when adding messages to the queue. To recap, the following features in the 2011-08-18 version make working with Windows Azure Queues simpler and more efficient:
- The maximum message size has been increased to 64KB, which allows more applications to store the full message in the queue instead of storing the actual message contents in blobs, and to keep progress information in the message.
- A message can be added to the queue with a visibility timeout so that it becomes visible to workers at a later time.
- A lease on the message can be extended by the worker that did the original dequeue so that it can continue processing the message.
- The maximum visibility timeout for scheduling future work, dequeuing a message, and updating it for leasing has been extended to 7 days.
- The message content can now be updated to save the progress state, which allows other workers to resume processing the message without the need to start over from the beginning.
NOTE: The current storage client library (version 1.5) uses the 2009-09-19 version and hence these new features are not available. We will be releasing an update with these new features in a future release of the SDK. Until that time we have provided some extension methods later in this posting that allow you to start using these new features today.
We will now go over the changes to the Windows Azure Queue service APIs in detail.
PUT Message
The “PUT Message” REST API is used to add messages to the queue. It now allows the message content to be up to 64KB and also provides an optional visibility timeout parameter. For example, you can now put a message into the queue with a visibilitytimeout of 24 hours, and the message will sit in the queue invisible until that time. Then at that time it will become visible for workers to process (along with the other messages in that queue).
By default, the visibilitytimeout used is 0 which implies that a message becomes visible for processing as soon as it is added to the queue. The visibilitytimeout is specified in seconds and must be >= 0 and < 604,800 (7 days). It also should be less than the “time to live”. Time to live has a default value of 7 days after which a message is automatically removed from the queue if it still exists. A message will be deleted from the queue after its time to live has been reached, regardless of whether it has become visible or not.
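As a sketch of the Put Message rules above, the hypothetical helper below validates the visibilitytimeout and messagettl values (both in seconds) and computes when the message becomes visible and when it expires. Rejecting a visibility timeout that is not less than the time to live is an assumption based on the "should be less than" guidance; the service's exact error behavior is not described here.

```csharp
using System;

// Hypothetical helper illustrating the Put Message rules; not part of any SDK.
public static class PutMessageRules
{
    public const int MaxSeconds = 604800; // 7 days

    // Validates visibilitytimeout and messagettl (both in seconds) and returns the
    // time the message becomes visible; expiresAt receives its expiry time.
    public static DateTime Validate(DateTime insertedAt, int visibilityTimeout,
                                    int timeToLive, out DateTime expiresAt)
    {
        if (visibilityTimeout < 0 || visibilityTimeout >= MaxSeconds)
            throw new ArgumentOutOfRangeException("visibilityTimeout", "Must be >= 0 and < 604800.");
        if (timeToLive > MaxSeconds)
            throw new ArgumentOutOfRangeException("timeToLive", "Must not exceed 7 days.");

        // Assumption: treat "should be less than the time to live" as a hard rule,
        // since a message whose TTL runs out first is deleted without ever becoming visible.
        if (visibilityTimeout >= timeToLive)
            throw new ArgumentException("visibilityTimeout must be less than timeToLive.");

        expiresAt = insertedAt.AddSeconds(timeToLive);
        return insertedAt.AddSeconds(visibilityTimeout);
    }
}
```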
REST Examples
Here is a REST example that adds a message that becomes visible in 10 minutes. The visibility timeout is passed, in seconds, in the "visibilitytimeout" query parameter of the URI. The optional expiry time is passed, also in seconds, in the "messagettl" query parameter; in this example it is set to 2 days (172800 seconds).
Request:
POST http://cohowinery.queue.core.windows.net/videoprocessing/messages?visibilitytimeout=600&messagettl=172800&timeout=30 HTTP/1.1
x-ms-version: 2011-08-18
x-ms-date: Fri, 02 Sep 2011 05:03:21 GMT
Authorization: SharedKey cohowinery:sr8rIheJmCd6npMSx7DfAY3L//V3uWvSXOzUBCV9Ank=
Content-Length: 100

<QueueMessage><MessageText>PHNhbXBsZT5zYW1wbGUgbWVzc2FnZTwvc2FtcGxlPg==</MessageText></QueueMessage>
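The request above can be reproduced with a small string-building sketch. PutMessageRequest is a hypothetical helper that neither signs nor sends the request. Base64-encoding the message text, as the sample body does, keeps any markup in the text from breaking the XML envelope; the resulting body is exactly the 100 bytes given in the Content-Length header.

```csharp
using System;
using System.Text;

// Hypothetical helper that only builds strings; signing and sending are out of scope.
public static class PutMessageRequest
{
    public static string BuildUri(string queueUri, int visibilityTimeout, int messageTtl, int timeout)
    {
        return string.Format(
            "{0}/messages?visibilitytimeout={1}&messagettl={2}&timeout={3}",
            queueUri, visibilityTimeout, messageTtl, timeout);
    }

    // Base64-encodes the text so markup inside it cannot break the XML envelope.
    public static string BuildBody(string messageText)
    {
        string encoded = Convert.ToBase64String(Encoding.UTF8.GetBytes(messageText));
        return "<QueueMessage><MessageText>" + encoded + "</MessageText></QueueMessage>";
    }
}
```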
Storage Client Library Example
We will use the extension methods provided at the end of this blog to show how to add messages that are made visible at a later time.
Let us look at the scenario of a video processing workflow for Coho Winery. Videos are uploaded by the Marketing team at Coho Winery. Once these videos are uploaded, they need to be processed before they can be displayed on the Coho Winery web site. The workflow is:
- Scan for virus
- Encode the video in multiple formats
- Compress the video for efficiency and write the compressed output to the location from which the website picks it up.
When a video is initially uploaded, the uploading component adds a message to the queue after the upload completes. However, the video is not processed for 1 day, to allow time for changes to be made to it in the workflow, so the message is added to the queue with delayed visibility to provide this 1-day grace period. The message contains a set of instructions, including the format, encoder, compression, and scanners to use. In addition to this information required for processing, we also save the current processing state in the message, using the following format: the first 2 characters represent the processing state, followed by the actual content.
/// <summary>
/// Add message for each blob in input directory.
/// After uploading, add a message to the queue with invisibility of 1 day
/// to allow the blob to be uploaded.
/// </summary>
private static void UploadVideos()
{
    CloudQueueClient queueClient = Account.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference(QueueName);
    queue.EncodeMessage = false;

    string[] content = GetMessageContent();
    for (int i = 0; i < content.Length; i++)
    {
        // upload the blob (not provided for brevity…)

        // Call the extension method provided at the end of this post
        queue.PutMessage(
            Account.Credentials,
            EncodeMessage(content[i], ProcessingState.VirusScan),
            StartVisibilityTimeout, // set to 1 day
            MessageTtl,             // set to 3 days
            ServerRequestTimeout);
    }
}

/// <summary>
/// The processing stages for a message
/// </summary>
public enum ProcessingState : int
{
    VirusScan = 1,
    Encoder = 2,
    Compress = 3,
    Completed = 4
}

/// <summary>
/// Form of the queue message is: [2 digits for state][Actual Message content]
/// </summary>
/// <param name="content"></param>
/// <param name="state"></param>
/// <returns></returns>
private static string EncodeMessage(string content, ProcessingState state)
{
    return string.Format("{0:D2}{1}", (int)state, content);
}
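The listing above shows EncodeMessage, but ProcessMessages later in this post also calls a DecodeMessage helper that is not included. A minimal sketch of that inverse, assuming the same [2-digit state][content] format; the MessageFormat class and the choice to throw FormatException for a malformed prefix are assumptions.

```csharp
using System;
using System.Globalization;

public enum ProcessingState : int
{
    VirusScan = 1,
    Encoder = 2,
    Compress = 3,
    Completed = 4
}

// Hypothetical container class for the encode/decode pair.
public static class MessageFormat
{
    // Same format as EncodeMessage: [2 digits for state][actual message content]
    public static string EncodeMessage(string content, ProcessingState state)
    {
        return string.Format("{0:D2}{1}", (int)state, content);
    }

    // Hypothetical inverse of EncodeMessage (not shown in the original listing).
    public static void DecodeMessage(string message, out string content, out ProcessingState state)
    {
        int value;
        if (message == null || message.Length < 2 ||
            !int.TryParse(message.Substring(0, 2), NumberStyles.None, CultureInfo.InvariantCulture, out value) ||
            !Enum.IsDefined(typeof(ProcessingState), value))
        {
            throw new FormatException("Message does not start with a valid 2-digit state.");
        }

        state = (ProcessingState)value;
        content = message.Substring(2);
    }
}
```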
Update Message
The "Update Message" REST API is used to extend the lease period (aka visibility timeout) and/or update the message content. A worker that is processing a message can now determine the extra processing time it needs based on the content of the message. The lease period, specified in seconds, must be >= 0 and is relative to the current time. A value of 0 makes the message immediately visible in the queue as a candidate for processing. The maximum lease period is 7 days. Note that an updated visibility timeout can extend beyond the expiry time (time to live) defined when the message was added to the queue; however, the expiry time takes precedence, and the message will be deleted from the queue at that time.
Update Message can also be used by workers to store the processing state in the message. This processing state can then be used by another worker to resume processing if the former worker crashed or got interrupted and the message has not yet expired.
When getting a message, the worker gets back a pop receipt. A valid pop receipt is needed to perform any action on the message while it is invisible in the queue. Update Message requires the pop receipt returned by the "Get Messages" request or by the most recent Update Message. The pop receipt is invalid (400 HTTP status code) if:
- The message has expired.
- The message has been deleted using the last pop receipt received either from “Get Messages” or “Update Message”.
- The invisibility time has elapsed and the message has been retrieved by another “Get Messages” call.
- The message has been updated with a new visibility timeout and hence a new pop receipt is returned. Each time the message is updated, it gets a new pop-receipt which is returned with the UpdateMessage call.
NOTE: When a worker goes to renew the lease (extend the visibility timeout), if for some reason the pop receipt is not received by the client (e.g., network error), the client can retry the request with the pop receipt it currently has. But if that retry fails with “Message not found” then the client should give up processing the message, and get a new message to process. This is because the prior message did have its visibility timeout extended, but it now has a new pop receipt, and that message will become visible again after the timeout elapses at which time a worker can dequeue it again and continue processing it.
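The retry rule in the note above can be captured in a small hypothetical helper. TryUpdateMessage is a stand-in delegate for one Update Message call (in practice it would wrap a call like the UpdateMessage extension method at the end of this post), and UpdateOutcome is an assumed simplification of the possible results.

```csharp
using System;

// Assumed simplification of the results of one Update Message attempt.
public enum UpdateOutcome { Succeeded, NetworkError, MessageNotFound }

// Hypothetical stand-in for one Update Message call.
public delegate UpdateOutcome TryUpdateMessage(string popReceipt, out string newPopReceipt);

public static class LeaseRenewal
{
    // Returns the new pop receipt, or null when the worker should give up on the
    // message and get a new one to process.
    public static string Renew(TryUpdateMessage tryUpdate, string popReceipt, int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            string newPopReceipt;
            UpdateOutcome outcome = tryUpdate(popReceipt, out newPopReceipt);
            if (outcome == UpdateOutcome.Succeeded)
            {
                return newPopReceipt;
            }
            if (outcome == UpdateOutcome.MessageNotFound)
            {
                // An earlier attempt may have succeeded and rotated the receipt, so
                // ours is stale. The message becomes visible again when the extended
                // timeout elapses, and another worker can continue processing it.
                return null;
            }
            // NetworkError: we do not know whether the update happened, so retry
            // with the pop receipt we currently have.
        }
        return null;
    }
}
```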
The pop receipt returned in the response should be used for subsequent "Delete Message" and "Update Message" requests. The time at which the message will next become visible is also returned, in the x-ms-time-next-visible response header.
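Assuming the x-ms-time-next-visible header (the header the UpdateMessage extension method at the end of this post reads) uses the same RFC 1123 GMT format as the x-ms-date values in these examples, it should be parsed as UTC before being compared with DateTime.UtcNow; a plain DateTime.Parse converts a GMT timestamp to local time. A minimal sketch with a hypothetical helper:

```csharp
using System;
using System.Globalization;

// Hypothetical helper; assumes the header uses the RFC 1123 format seen in x-ms-date.
public static class NextVisibleHeader
{
    // Parses a value such as "Fri, 02 Sep 2011 05:04:21 GMT" into a DateTime with
    // DateTimeKind.Utc, so it can be compared directly with DateTime.UtcNow.
    public static DateTime Parse(string headerValue)
    {
        return DateTimeOffset.Parse(
            headerValue,
            CultureInfo.InvariantCulture,
            DateTimeStyles.AssumeUniversal).UtcDateTime;
    }
}
```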
REST Examples
Update a message's content and set its visibility timeout to 1 minute.
PUT http://cohowinery.queue.core.windows.net/videoprocessing/messages/663d89aa-d1d9-42a2-9a6a-fcf822a97d2c?popreceipt=AgAAAAEAAAApAAAAGIw6Q29bzAE%3d&visibilitytimeout=60&timeout=30 HTTP/1.1
x-ms-version: 2011-08-18
x-ms-date: Fri, 02 Sep 2011 05:03:21 GMT
Authorization: SharedKey cohowinery:batcrWZ35InGCZeTUFWMdIQiOZPCW7UEyeGdDOg7WW4=
Host: 10.200.21.10
Content-Length: 75

<QueueMessage><MessageText>new-message-content</MessageText></QueueMessage>
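The pop receipt must be URL-escaped when it is placed in the query string; the receipt in the example above ends in '=', which appears as %3d. A minimal hypothetical sketch of building that URI with Uri.EscapeDataString, which emits uppercase hex (%3D); hex digits in percent-encoding are case-insensitive, so both forms are equivalent.

```csharp
using System;

// Hypothetical helper; only builds the request URI.
public static class UpdateMessageRequest
{
    // The pop receipt is treated as an opaque string that may contain reserved
    // characters such as '=', '+' or '/', so it is escaped before use.
    public static string BuildUri(string queueUri, string messageId, string popReceipt,
                                  int visibilityTimeout, int timeout)
    {
        return string.Format(
            "{0}/messages/{1}?popreceipt={2}&visibilitytimeout={3}&timeout={4}",
            queueUri, messageId, Uri.EscapeDataString(popReceipt), visibilityTimeout, timeout);
    }
}
```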
Storage Client Library Example
Continuing with the Coho Winery video processing example, we will now go over the processing part of the workflow. Video processing is a long-running task, and we would like to divide the work into the stages defined by the ProcessingState enumeration above. The worker retrieves a message, then decodes its content to get the processing state and the actual content. To retrieve messages, we use the new GetMessages extension method, because the GetMessages method in the storage client library (which uses the 2009-09-19 version) rejects visibility timeouts longer than 2 hours on the client side and therefore cannot support this workflow. ProcessMessages starts a timer that periodically iterates through all the messages currently being processed and either renews the lease or deletes the message, based on the processing state and on when the message will become visible again. ProcessMessages converts each QueueMessage retrieved into a MessageInfo and adds it to the list of messages whose leases need to be renewed. The MessageInfo class exists because the QueueMessage class does not allow updating the pop receipt, which must be replaced after every Update Message call.
public class MessageInfo
{
    /// <summary>
    /// Message info constructor
    /// </summary>
    /// <param name="queue">The queue the message belongs to</param>
    /// <param name="messageId">The message id</param>
    /// <param name="popReceipt">The pop receipt returned by Get Messages</param>
    /// <param name="content">The message content without the state prefix</param>
    /// <param name="state">The current processing state</param>
    public MessageInfo(
        CloudQueue queue,
        string messageId,
        string popReceipt,
        string content,
        ProcessingState state)
    {
        this.Queue = queue;
        this.MessageId = messageId;
        this.PopReceipt = popReceipt;
        this.State = state;
        this.Content = content;
    }

    /// <summary>The queue to which the message belongs</summary>
    public CloudQueue Queue { get; private set; }

    /// <summary>The message id for the message</summary>
    public string MessageId { get; private set; }

    /// <summary>The pop receipt to use for update and delete</summary>
    public string PopReceipt { get; set; }

    /// <summary>The content of the message</summary>
    public string Content { get; set; }

    /// <summary>Next visibility time (UTC)</summary>
    public DateTime NextVisibility { get; set; }

    /// <summary>
    /// The processing state the message is in. If completed, it will be
    /// deleted from the queue
    /// </summary>
    public ProcessingState State { get; set; }
}

/// <summary>
/// Called every TimerInterval to renew the lease
/// </summary>
private static void OnRenewLeaseTimer(object state)
{
    // Exception handling hidden for brevity...

    // MessageList is shared with ProcessMessages, which runs on another thread,
    // so all access to it is done under a lock
    lock (MessageList)
    {
        // traversing from last to allow deleting the message from the list
        for (int i = MessageList.Count - 1; i >= 0; i--)
        {
            MessageInfo message = MessageList[i];

            // if the message is completed - let us delete it
            if (message.State == ProcessingState.Completed)
            {
                message.Queue.DeleteMessage(message.MessageId, message.PopReceipt);
                Console.WriteLine(
                    "Deleted Message Id {0} at stage {1}",
                    message.MessageId,
                    (int)message.State);
                MessageList.RemoveAt(i);
            }
            else if (message.NextVisibility.Subtract(DateTime.UtcNow).TotalSeconds < RenewalTime)
            {
                // if next visibility is < renewal time then let us renew it again
                DateTime nextVisibilityTime;
                string newPopReceipt;

                // NOTE: we always update content but we can be smart about it and
                // update only if state changes
                message.Queue.UpdateMessage(
                    Account.Credentials,
                    message.MessageId,
                    message.PopReceipt,
                    VisibilityTimeout,
                    EncodeMessage(message.Content, message.State),
                    ServerRequestTimeout,
                    out newPopReceipt,
                    out nextVisibilityTime);

                // every Update Message returns a new pop receipt; the old one is now invalid
                message.PopReceipt = newPopReceipt;
                message.NextVisibility = nextVisibilityTime;

                Console.WriteLine(
                    "Updated Message Id {0} to stage {1} Next visible at {2}",
                    message.MessageId,
                    (int)message.State,
                    nextVisibilityTime);
            }
        }
    }
}

// NOTE: Exception handling is excluded here for brevity
/// <summary>
/// Processes messages one at a time. It iterates through stages and extends
/// visibility and saves state if it should continue processing.
/// </summary>
private static void ProcessMessages()
{
    CloudQueueClient queueClient = Account.CreateCloudQueueClient();
    CloudQueue queue = queueClient.GetQueueReference(QueueName);
    queue.EncodeMessage = false;

    // The using block keeps the timer referenced for the lifetime of the loop;
    // an unreferenced System.Threading.Timer can be garbage collected.
    using (Timer timer = new Timer(new TimerCallback(OnRenewLeaseTimer), null, 0, TimerInterval))
    {
        while (true)
        {
            // capture the time before the request so the lease estimate below
            // errs on the side of expiring early
            DateTime requestTime = DateTime.UtcNow;
            QueueMessage message = queue.GetMessages(
                Account.Credentials,
                VisibilityTimeout,
                1 /* message count */,
                ServerRequestTimeout).FirstOrDefault();

            if (message == null)
            {
                Thread.Sleep(PollingTime);
                continue;
            }

            string messageContent = message.Text;
            Console.WriteLine(
                "\n\nGot message Content={0} Length={1} Id={2} InsertedAt={3} Visibility={4}",
                messageContent,
                messageContent.Length,
                message.Id,
                message.InsertionTime,
                message.TimeNextVisible);

            string content;
            ProcessingState state;
            DecodeMessage(messageContent, out content, out state);

            MessageInfo msgInfo = new MessageInfo(
                queue, message.Id, message.PopReceipt, content, state);

            // record when the lease from Get Messages runs out; without this,
            // NextVisibility would be DateTime.MinValue and the lease would look lost
            msgInfo.NextVisibility = requestTime.AddSeconds(VisibilityTimeout);

            lock (MessageList)
            {
                MessageList.Add(msgInfo);
            }

            Console.WriteLine("Message Id {0} is in stage {1}", message.Id, (int)state);

            // keep processing until we complete all stages of processing or
            // next visibility <= UtcNow, i.e. the lease is lost
            while (state != ProcessingState.Completed && msgInfo.NextVisibility > DateTime.UtcNow)
            {
                // do some work..
                ProcessStage(msgInfo.MessageId, msgInfo.Content, ref state);
                msgInfo.State = state;
            }
        }
    }
}
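The per-message decision made in OnRenewLeaseTimer can be isolated as a pure function, which makes the timing rule easy to test. LeaseTimer and LeaseAction are hypothetical names. Because the timer fires only every TimerInterval, RenewalTime should be larger than TimerInterval plus the time an Update Message request takes; otherwise a lease can expire between two timer ticks.

```csharp
using System;

// Hypothetical names; mirrors the branch logic inside OnRenewLeaseTimer.
public enum LeaseAction { None, Delete, Renew }

public static class LeaseTimer
{
    // Completed messages are deleted; others are renewed once fewer than
    // renewalSeconds remain before they would become visible again.
    public static LeaseAction Decide(bool completed, DateTime nextVisibilityUtc,
                                     DateTime nowUtc, double renewalSeconds)
    {
        if (completed)
        {
            return LeaseAction.Delete;
        }
        return (nextVisibilityUtc - nowUtc).TotalSeconds < renewalSeconds
            ? LeaseAction.Renew
            : LeaseAction.None;
    }
}
```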
Get Messages
The “Get Messages” REST API is used to retrieve messages. The only change in 2011-08-18 version is that the visibility timeout has been extended from 2 hours to 7 days.
REST Examples
Get messages with visibility timeout set to 4 hours (provided in seconds).
GET http://cohowinery.queue.core.windows.net/videoprocessing/messages?visibilitytimeout=14400&timeout=30 HTTP/1.1
x-ms-version: 2011-08-18
x-ms-date: Fri, 02 Sep 2011 05:03:21 GMT
Authorization: SharedKey cohowinery:batcrWZ35InGCZeTUFWMdIQiOZPCW7UEyeGdDOg7WW4=
Host: 10.200.21.10
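A hypothetical sketch of building the Get Messages URI above, mirroring the GetMessages extension method at the end of this post but adding a client-side check against the new 7-day limit. Treating exactly 604,800 seconds as allowed is an assumption; the text above only says the limit is 7 days.

```csharp
using System;
using System.Text;

// Hypothetical helper; only builds the request URI.
public static class GetMessagesRequest
{
    public const int MaxVisibilitySeconds = 7 * 24 * 60 * 60; // 604800 (assumed inclusive)

    public static string BuildUri(string queueUri, int? visibilityTimeout, int? messageCount, int timeout)
    {
        // lifted comparison: false when visibilityTimeout is null
        if (visibilityTimeout > MaxVisibilitySeconds)
            throw new ArgumentOutOfRangeException("visibilityTimeout", "Must not exceed 7 days.");

        StringBuilder builder = new StringBuilder(queueUri);
        builder.AppendFormat("/messages?timeout={0}", timeout);
        if (messageCount != null)
            builder.AppendFormat("&numofmessages={0}", messageCount);
        if (visibilityTimeout != null)
            builder.AppendFormat("&visibilitytimeout={0}", visibilityTimeout);
        return builder.ToString();
    }
}
```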
Storage Client Library Example
The Update Message example above covers the invocation of the GetMessages extension method.
Storage Client Library Extensions
As mentioned above, the Storage Client Library released in SDK version 1.5 does not support the new version, so we have provided the sample extension methods in this post to let you start using these new features today. Please test them thoroughly before using them in production to ensure they meet your needs.
We have provided 3 extension methods:
- PutMessage: implements adding a message to the queue with a visibility timeout.
- UpdateMessage: implements updating a message (content and/or visibility timeout). It returns the new pop receipt and the next visibility time. It does not update a CloudQueueMessage object, as its pop receipt and next visibility time are not publicly settable.
- GetMessages: implements retrieving messages with a visibility timeout longer than 2 hours, which the storage client library does not allow.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.StorageClient;
using Microsoft.WindowsAzure.StorageClient.Protocol;

// NOTE: Please test these before using in production
public static class QueueExtensions
{
    /// <summary>
    /// Add a message to the queue. The visibility timeout param can be used to optionally
    /// make the message visible at a future time
    /// </summary>
    /// <param name="queue">The queue to add message to</param>
    /// <param name="credentials">The storage credentials used for signing</param>
    /// <param name="message">The message content</param>
    /// <param name="visibilityTimeout">
    /// value in seconds and should be greater than or equal to 0 and less than 604800 (7 days).
    /// It should also be less than messageTimeToLive
    /// </param>
    /// <param name="messageTimeToLive">
    /// (Optional) Time in seconds after which the message expires if it is not deleted
    /// from the queue. It can be a maximum time of 7 days.
    /// </param>
    /// <param name="timeout">Server timeout value in seconds</param>
    public static void PutMessage(
        this CloudQueue queue,
        StorageCredentials credentials,
        string message,
        int? visibilityTimeout,
        int? messageTimeToLive,
        int timeout)
    {
        StringBuilder builder = new StringBuilder(queue.Uri.AbsoluteUri);
        builder.AppendFormat("/messages?timeout={0}", timeout);

        if (messageTimeToLive != null)
        {
            builder.AppendFormat("&messagettl={0}", messageTimeToLive.ToString());
        }

        if (visibilityTimeout != null)
        {
            builder.AppendFormat("&visibilitytimeout={0}", visibilityTimeout);
        }

        HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(builder.ToString());
        request.Method = "POST";
        request.Headers.Add("x-ms-version", "2011-08-18");

        byte[] buffer = QueueRequest.GenerateMessageRequestBody(message);
        request.ContentLength = buffer.Length;
        credentials.SignRequest(request);

        using (Stream stream = request.GetRequestStream())
        {
            stream.Write(buffer, 0, buffer.Length);
        }

        try
        {
            using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
            {
                // we expect 201 for Put Message
                if (response.StatusCode != HttpStatusCode.Created)
                {
                    throw new InvalidOperationException("Unexpected response code.");
                }
            }
        }
        catch (WebException e)
        {
            // Log any exceptions for debugging
            LogWebException(e);
            throw;
        }
    }

    /// <summary>
    /// Update the message to extend visibility timeout and optionally
    /// the message contents
    /// </summary>
    /// <param name="queue">The queue to operate on</param>
    /// <param name="credentials">The storage credentials used for signing</param>
    /// <param name="messageId">The ID of message to extend the lease on</param>
    /// <param name="popReceipt">pop receipt to use</param>
    /// <param name="visibilityTimeout">
    /// Value in seconds; should be greater than or equal to 0 and at most 7 days (604800).
    /// </param>
    /// <param name="messageBody">(optional) The message content</param>
    /// <param name="timeout">Server timeout value in seconds</param>
    /// <param name="newPopReceiptID">
    /// Return the new pop receipt that should be used for subsequent requests when
    /// the lease is held
    /// </param>
    /// <param name="nextVisibilityTime">
    /// Return the next visibility time (UTC) for the message. This is the time until
    /// which the lease is held
    /// </param>
    public static void UpdateMessage(
        this CloudQueue queue,
        StorageCredentials credentials,
        string messageId,
        string popReceipt,
        int visibilityTimeout,
        string messageBody,
        int timeout,
        out string newPopReceiptID,
        out DateTime nextVisibilityTime)
    {
        StringBuilder builder = new StringBuilder(queue.Uri.AbsoluteUri);
        builder.AppendFormat(
            "/messages/{0}?timeout={1}&popreceipt={2}&visibilitytimeout={3}",
            messageId,
            timeout,
            Uri.EscapeDataString(popReceipt),
            visibilityTimeout);

        HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(builder.ToString());
        request.Method = "PUT";
        request.Headers.Add("x-ms-version", "2011-08-18");

        if (messageBody != null)
        {
            byte[] buffer = QueueRequest.GenerateMessageRequestBody(messageBody);
            request.ContentLength = buffer.Length;
            credentials.SignRequest(request);
            using (Stream stream = request.GetRequestStream())
            {
                stream.Write(buffer, 0, buffer.Length);
            }
        }
        else
        {
            request.ContentLength = 0;
            credentials.SignRequest(request);
        }

        try
        {
            using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
            {
                // we expect 204 for Update Message
                if (response.StatusCode != HttpStatusCode.NoContent)
                {
                    throw new InvalidOperationException("Unexpected response code.");
                }

                newPopReceiptID = response.Headers["x-ms-popreceipt"];

                // The header is a GMT date; parse it as UTC so it can be compared with
                // DateTime.UtcNow (a plain DateTime.Parse converts it to local time)
                nextVisibilityTime = DateTime.Parse(
                    response.Headers["x-ms-time-next-visible"],
                    CultureInfo.InvariantCulture,
                    DateTimeStyles.AssumeUniversal | DateTimeStyles.AdjustToUniversal);
            }
        }
        catch (WebException e)
        {
            // Log any exceptions for debugging
            LogWebException(e);
            throw;
        }
    }

    /// <summary>
    /// Get messages has been provided only because the storage client library does not
    /// allow the visibility timeout to exceed 2 hours
    /// </summary>
    /// <param name="queue">The queue to operate on</param>
    /// <param name="credentials">The storage credentials used for signing</param>
    /// <param name="visibilityTimeout">
    /// (optional) Value in seconds; must not exceed 7 days (604800).
    /// </param>
    /// <param name="messageCount">(optional) The number of messages to retrieve</param>
    /// <param name="timeout">Server timeout value in seconds</param>
    /// <returns>The messages retrieved</returns>
    public static IEnumerable<QueueMessage> GetMessages(
        this CloudQueue queue,
        StorageCredentials credentials,
        int? visibilityTimeout,
        int? messageCount,
        int timeout)
    {
        StringBuilder builder = new StringBuilder(queue.Uri.AbsoluteUri);
        builder.AppendFormat("/messages?timeout={0}", timeout);

        if (messageCount != null)
        {
            builder.AppendFormat("&numofmessages={0}", messageCount);
        }

        if (visibilityTimeout != null)
        {
            builder.AppendFormat("&visibilitytimeout={0}", visibilityTimeout);
        }

        HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(builder.ToString());
        request.Method = "GET";
        request.Headers.Add("x-ms-version", "2011-08-18");
        credentials.SignRequest(request);

        try
        {
            using (HttpWebResponse response = (HttpWebResponse)request.GetResponse())
            {
                if (response.StatusCode != HttpStatusCode.OK)
                {
                    throw new InvalidOperationException("Unexpected response code.");
                }

                GetMessagesResponse msgResponses = QueueResponse.GetMessages(response);

                // force it to be parsed right away else the response will be closed
                // since QueueResponse.GetMessages parses responses lazily.
                QueueMessage[] messages = msgResponses.Messages.ToArray<QueueMessage>();
                return messages.AsEnumerable<QueueMessage>();
            }
        }
        catch (WebException e)
        {
            // Log any exceptions for debugging
            LogWebException(e);
            throw;
        }
    }

    /// <summary>
    /// Log the exception in your preferred logging system
    /// </summary>
    /// <param name="e">The exception to log</param>
    private static void LogWebException(WebException e)
    {
        // e.Response is null when no response was received (e.g. a timeout or
        // connection failure), so check it before dereferencing
        HttpWebResponse response = e.Response as HttpWebResponse;
        Console.WriteLine(string.Format(
            "Request failed with '{0}'. Status={1} RequestId={2} Exception={3}",
            e.Message,
            response != null ? response.StatusCode.ToString() : "<NULL>",
            response != null ? response.Headers["x-ms-request-id"] : "<NULL>",
            e.ToString()));

        // Log to your favorite location…
    }
}
Jai Haridas
I have been conducting a lot of Azure workshops, and just wanted to say a big thanks for these improvements!! Customers will be very happy in my next boot camp. Kudos to the Storage team!
If the best practise to store large messages (>64KB) is a combination of storing the key in the queue and the message in a blob, can't you make this your internal implementation and drop the message size limitation for a message in the queue?
@Ravi, Thanks for the feature request. We have noted it down in the list of feature requests to consider in the future. You can also look at using client side abstractions that makes this simpler (windowsazurecat.com/…/implementing-storage-abstraction-layer-to-support-very-large-messages-in-windows-azure-queues)
@Ravi – My initial thoughts exactly when reading this blog
@Jai – Thanks for the link. Although the hyperlink is TL;DR, the conclusion looks promising and I'll review it more when I need queueing. I hope MSFT Eugne from PnP is using that code, seems like something of his quality.
Hi,
Why isn't there a way to block on the GetMessage method until a message is available?
A blocking version of GetMessage with an optional timeout parameter would be very nice.
—
Adar Wesley
@Adar, thank you for the feedback. We will look at providing this functionality in the storage client library, to abstract that away.
I just downloaded the Nov 2011 release 1.6 Azure SDK and tried to use the UpdateMessage method to update the content of a queue message. If I set 'MessageUpdateFields' to 'MessageUpdateFields.Content', it raises the exception 'Calls to UpdateMessage must include the Visibility flag', and if I set it to 'MessageUpdateFields.Visibility', it just does not update the message contents.
Any idea what is going on?
Thanks.
@Vikram,
MessageUpdateFields enum is declared as flags. To update both content and visibility, have you tried setting it to:
MessageUpdateFields.Content | MessageUpdateFields.Visibility