Azure Service Bus (ASB)
- 1 Intro
- 2 Documentation
- 3 Tips and Tidbits
- 4 Azure Service Bus - Message expiration (Time to Live)
- 5 Choose a message queue solution
- 6 Get message counters
- 7 Protocols
- 8 Service Bus queues, topics, and subscriptions
- 9 Topic filters and actions
- 10 Service Bus message payloads and serialization
- 11 Use the Azure CLI to create a Service Bus namespace and a queue
- 12 Send and receive message from a Service Bus queue by using .NET.
- 13 Programming: Azure Service Bus Topic And Subscription (Pub-Sub)
Intro
Microsoft Azure Service Bus is a fully managed enterprise integration message broker.
Documentation
Azure's documentation: Service Bus Documentation
Tips and Tidbits
Azure supports two types of queue mechanisms:
Service Bus queues
Service Bus queues are part of a broader Azure messaging infrastructure that supports queuing, publish/subscribe, and more advanced integration patterns.
Storage queues.
Storage queues are part of the Azure Storage infrastructure.
They allow you to store large numbers of messages.
You access messages from anywhere in the world via authenticated calls using HTTP or HTTPS.
A queue message can be up to 64 KB in size. A queue may contain millions of messages,
Queues are commonly used to create a backlog of work to process asynchronously.
Data is transferred between different applications and services using messages.
A message is a container decorated with metadata, and contains data.
The data can be any kind of information, including structured data encoded with the common formats such as the following ones: JSON, XML, Apache Avro, Plain Text.
common messaging scenarios are:
Messaging. Transfer business data, such as sales or purchase orders, journals, or inventory movements.
Decouple applications. Improve reliability and scalability of applications and services. Client and service don't have to be online at the same time
Topics and subscriptions. Enable 1:n relationships between publishers and subscribers.
Message sessions. Implement workflows that require message ordering or message deferral.
Messages in queues are ordered and timestamped on arrival.
Messages are delivered in pull mode
To realize a first-in, first-out (FIFO) guarantee in Service Bus, use sessions.
Message sessions enable joint and ordered handling of unbounded sequences of related messages.
duplicate detection enables the sender to resend the same message, and the queue discards any duplicate copies.
the newly sent message is instantly ignored and dropped
Store the messages for an unlimited period of time.
The autoforwarding feature chains a queue or subscription to another queue or topic that is in the same namespace.
Scheduled delivery: You can submit messages to a queue or topic for delayed processing. You can schedule a job to become available for processing by a system at a certain time.
Filtering and actions: Subscribers can define which messages they want to receive from a topic.
These messages are specified in the form of one or more named subscription rules.
The purpose of the dead-letter queue is to hold messages that can't be delivered to any receiver, or messages that couldn't be processed. Messages can then be removed from the DLQ and inspected.
Lockout period
You can setup auto scale rules to increase queue size
Azure Service Bus - Message expiration (Time to Live)
All messages sent into a queue or topic are subject to a default expiration that is set at the entity level.
It can also be set in the portal during creation and adjusted later.
The default expiration is used for all messages sent to the entity where time-to-live isn't explicitly set.
The default expiration also functions as a ceiling for the time-to-live value.
Messages that have a longer time-to-live expiration than the default value are silently adjusted to the default message time-to-live value before being enqueued.
The default time-to-live value for a brokered message is the largest possible value for a signed 64-bit integer if not otherwise specified.
For messaging entities (queues and topics), the default expiration time is also largest possible value for a signed 64-bit integer for Service Bus standard and premium tiers.
For the basic tier, the default (also maximum) expiration time is 14 days.
If the topic specifies a smaller TTL than the subscription, the topic TTL is applied.
Choose a message queue solution
Use Service Bus queues when:
receive messages without having to poll the queue
provide a guaranteed first-in-first-out (FIFO) ordered delivery
automatic duplicate detection
process messages as parallel long-running streams
require transactional behavior and atomicity when sending or receiving multiple messages from a queue.
Use Storage queues when:
must store over 80 gigabytes of messages in a queue.
Track progress for processing a message in the queue. It's useful if the worker processing a message crashes. Another worker can then use that information to continue from where the prior worker left off.
You require server side logs of all of the transactions executed against your queues.
Queues are commonly used to create a backlog of work to process asynchronously.
Get message counters
Counter | Description |
---|---|
ActiveMessageCount | Number of messages in the queue or subscription that are in the active state and ready for delivery. |
ScheduledMessageCount | Number of messages in the scheduled state. |
DeadLetterMessageCount | Number of messages in the dead-letter queue. |
TransferMessageCount | Number of messages pending transfer into another queue or topic. |
TransferDeadLetterMessageCount | Number of messages that failed to transfer into another queue or topic and have been moved into the transfer dead-letter queue. |
Considerations for scaling threshold values for special metrics
Considerations for scaling threshold values for special metrics
For special metrics such as Storage or Service Bus Queue length metric, the threshold is the average number of messages available per current number of instances.
Protocols
The primary wire protocol for Service Bus is Advanced Messaging Queueing Protocol (AMQP) 1.0, an open ISO/IEC standard.
It allows customers to write applications that work against Service Bus and on-premises brokers such as ActiveMQ or RabbitMQ.
The AMQP protocol guide provides detailed information in case you want to build such an abstraction.
Service Bus queues, topics, and subscriptions
See the archicture docs on Publisher - Subscriber pattern
The messaging entities that form the core of the messaging capabilities in Service Bus are queues, topics and subscriptions, and rules/actions.
Queues offer First In, First Out (FIFO) message delivery to one or more competing consumers.
That is, receivers typically receive and process messages in the order in which they were added to the queue.
only one message consumer receives and processes each message.
Because producers and consumers are not aware of each other, a consumer can be upgraded without having any effect on the producer.
You can specify two different modes in which Service Bus receives messages:
Receive and delete
marks the message as being consumed and returns it to the consumer application
works best for scenarios in which the application can tolerate not processing a message if an application failure occurs.
Peek lock.
the receive operation becomes two-stage, which makes it possible to support applications that can't tolerate missing messages
Finds the next message to be consumed, locks it to prevent other consumers from receiving it, and then, return the message to the application.
After the application finishes processing the message, it requests the Service Bus service to complete the second stage of the receive process.
Then, the service marks the message as being consumed.
there's a timeout associated with the lock. If the application fails to process the message before the lock timeout expires, Service Bus unlocks the message and makes it available to be received again.
In contrast to queues, topics and subscriptions provide a one-to-many form of communication in a publish and subscribe pattern.
It's useful for scaling to large numbers of recipients.
Each published message is made available to each subscription registered with the topic.
Publisher sends a message to a topic and one or more subscribers receive a copy of the message, depending on filter rules set on these subscriptions.
consumers receive messages from subscriptions of the topic.
A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic.
Consumers receive messages from a subscription identically to the way they receive messages from a queue.
Rules and actions
In many scenarios, messages that have specific characteristics must be processed in different ways.
To enable this processing, you can configure subscriptions to find messages that have desired properties and then perform certain modifications to those properties
This filtering is accomplished using subscription filters.
Such modifications are called filter actions. When a subscription is created, you can supply a filter expression that operates on the properties of the message.
The properties can be both the system properties (for example, Label) and custom application properties (for example, StoreName.)
The SQL filter expression is optional.
Topic filters and actions
Subscribers can define which messages they want to receive from a topic.
These messages are specified in the form of one or more named subscription rules.
Each rule consists of a filter condition that selects particular messages, and optionally contain an action that annotates the selected message.
Service Bus supports three filter conditions:
SQL Filters - A SqlFilter holds a SQL-like conditional expression that is evaluated in the broker against the arriving messages' user-defined properties and system properties.
All system properties must be prefixed with
sys.
in the conditional expression.The SQL-language subset for filter conditions tests for the existence of properties (
EXISTS
), null-values (IS NULL
), logical NOT/AND/OR, relational operators, simple numeric arithmetic, and simple text pattern matching withLIKE
.
Boolean filters - The TrueFilter and FalseFilter either cause all arriving messages (true) or none of the arriving messages (false) to be selected for the subscription.
These two filters derive from the SQL filter.
Correlation Filters - A CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user and system properties.
A common use is to match against the CorrelationId property, but the application can also choose to match against the following properties:
ContentType
Label
MessageId
ReplyTo
ReplyToSessionId
SessionId
To
any user-defined properties.
A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter
Service Bus message payloads and serialization
Messages carry a payload and metadata.
The metadata is in the form of key-value pair properties, and describes the payload, and gives handling instructions to Service Bus and applications.
Occasionally, that metadata alone is sufficient to carry the information that the sender wants to communicate to receivers, and the payload remains empty.
A Service Bus message consists of a binary payload section that Service Bus never handles in any form on the service-side, and two sets of properties.
The broker properties are predefined by the system.
These predefined properties either control message-level functionality inside the broker, or they map to common and standardized metadata items.
A subset of the broker properties:
To
,ReplyTo
,ReplyToSessionId
,MessageId
,CorrelationId
, andSessionId
, are used to help applications route messages to particular destinations.the publisher owns a queue into which it expects replies to be delivered. The address of that queue is expressed in the
ReplyTo
property of the outbound message.When the consumer responds, it copies the
MessageId
of the handled message into theCorrelationId
property of the reply message and delivers the message to the destination indicated by theReplyTo
property.
The user properties are a collection of key-value pairs that can be defined and set by the application.
the payload is always an opaque, binary block.
The
ContentType
property enables applications to describe the payload, with the suggested format for the property values being a MIME content-type description according to IETF RFC2045; for example,application/json;charset=utf-8
Use the Azure CLI to create a Service Bus namespace and a queue
az group create --name ContosoRG --location eastus
az servicebus namespace create --resource-group ContosoRG --name ContosoSBusNS --location eastus
az servicebus queue create --resource-group ContosoRG --namespace-name ContosoSBusNS --name ContosoOrdersQueue
Run the following command to get the primary connection string for the namespace.
az servicebus namespace authorization-rule keys list --resource-group ContosoRG --namespace-name ContosoSBusNS --name RootManageSharedAccessKey --query primaryConnectionString --output tsv
Send and receive message from a Service Bus queue by using .NET.
Exercise: Send and receive message from a Service Bus queue by using .NET.
Create a Service Bus messaging namespace.
az servicebus namespace create \ --resource-group az204-svcbus-rg \ --name $myNameSpaceName \ --location $myLocation
Create a Service Bus queue
az servicebus queue create --resource-group az204-svcbus-rg \ --namespace-name $myNameSpaceName \ --name az204-queue
Retrieve the connection string for the Service Bus Namespace
elect the az204svcbus namespace resource you just created.
Select Shared access policies in the Settings section, then select the RootManageSharedAccessKey policy.
Copy the Primary Connection String
Create console app to send messages to the queue
code . dotnet new console dotnet add package Azure.Messaging.ServiceBus
using System.Threading.Tasks; using Azure.Messaging.ServiceBus; ... // connection string to your Service Bus namespace static string connectionString = "<NAMESPACE CONNECTION STRING>"; // name of your Service Bus topic static string queueName = "az204-queue"; // the client that owns the connection and can be used to create senders and receivers static ServiceBusClient client; // the sender used to publish messages to the queue static ServiceBusSender sender; // number of messages to be sent to the queue private const int numOfMessages = 3; static async Task Main() { // Create the clients that we'll use for sending and processing messages. client = new ServiceBusClient(connectionString); sender = client.CreateSender(queueName); // create a batch using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync(); for (int i = 1; i <= 3; i++) { // try adding a message to the batch if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}"))) { // if it is too large for the batch throw new Exception($"The message {i} is too large to fit in the batch."); } } try { // Use the producer client to send the batch of messages to the Service Bus queue await sender.SendMessagesAsync(messageBatch); Console.WriteLine($"A batch of {numOfMessages} messages has been published to the queue."); } finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await sender.DisposeAsync(); await client.DisposeAsync(); } Console.WriteLine("Press any key to end the application"); Console.ReadKey(); }
receive messages from the queue
// the processor that reads and processes messages from the queue static ServiceBusProcessor processor; // handle received messages static async Task MessageHandler(ProcessMessageEventArgs args) { string body = args.Message.Body.ToString(); Console.WriteLine($"Received: {body}"); // complete the message. messages is deleted from the queue. await args.CompleteMessageAsync(args.Message); } // handle any errors when receiving messages static Task ErrorHandler(ProcessErrorEventArgs args) { Console.WriteLine(args.Exception.ToString()); return Task.CompletedTask; } static async Task Main() { // Create the client object that will be used to create sender and receiver objects client = new ServiceBusClient(connectionString); // create a processor that we can use to process the messages processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions()); try { // add handler to process messages processor.ProcessMessageAsync += MessageHandler; // add handler to process any errors processor.ProcessErrorAsync += ErrorHandler; // start processing await processor.StartProcessingAsync(); Console.WriteLine("Wait for a minute and then press any key to end the processing"); Console.ReadKey(); // stop processing Console.WriteLine("\nStopping the receiver..."); await processor.StopProcessingAsync(); Console.WriteLine("Stopped receiving messages"); } finally { // Calling DisposeAsync on client types is required to ensure that network // resources and other unmanaged objects are properly cleaned up. await processor.DisposeAsync(); await client.DisposeAsync(); } }
Programming: Azure Service Bus Topic And Subscription (Pub-Sub)
A C# example of Azure Service Bus Topic And Subscription (Pub-Sub)
class Program
{
static ISubscriptionClient subscriptionClient;
static void Main(string[] args)
{
string sbConnectionString = "Endpoint=sb://mobilerecharge.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=KVb9ubc9XaV0dT/1dMjW9CzPWvA/JGvVvUZ64U21IBI=";
string sbTopic = "offers";
string sbSubscription = "akki5677";
try
{
subscriptionClient = new SubscriptionClient(sbConnectionString, sbTopic, sbSubscription);
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
{
MaxConcurrentCalls = 1,
AutoComplete = false
};
subscriptionClient.RegisterMessageHandler(ReceiveMessagesAsync, messageHandlerOptions);
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
finally
{
Console.ReadKey();
subscriptionClient.CloseAsync();
}
}
static async Task ReceiveMessagesAsync(Message message, CancellationToken token)
{
Console.WriteLine($"Subscribed message: {Encoding.UTF8.GetString(message.Body)}");
await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
}
static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
Console.WriteLine(exceptionReceivedEventArgs.Exception);
return Task.CompletedTask;
}
}