Azure Service Bus (ASB)


Intro

Microsoft Azure Service Bus is a fully managed enterprise integration message broker.


Documentation

 


Tips and Tidbits

 

  • Discover Azure message queues

  • Explore Azure Service Bus

  • Azure supports two types of queue mechanisms:

    • Service Bus queues

      • Service Bus queues are part of a broader Azure messaging infrastructure that supports queuing, publish/subscribe, and more advanced integration patterns.

    • Storage queues.

      • Storage queues are part of the Azure Storage infrastructure.

      • They allow you to store large numbers of messages.

      • You access messages from anywhere in the world via authenticated calls using HTTP or HTTPS.

      • A queue message can be up to 64 KB in size. A queue may contain millions of messages, up to the total capacity limit of the storage account.

      • Queues are commonly used to create a backlog of work to process asynchronously.

  • Data is transferred between different applications and services using messages.

    • A message is a container decorated with metadata, and contains data.

    • The data can be any kind of information, including structured data encoded in common formats such as JSON, XML, Apache Avro, or plain text.

  • Common messaging scenarios are:

    • Messaging. Transfer business data, such as sales or purchase orders, journals, or inventory movements.

    • Decouple applications. Improve reliability and scalability of applications and services. Client and service don't have to be online at the same time.

    • Topics and subscriptions. Enable 1:n relationships between publishers and subscribers.

    • Message sessions. Implement workflows that require message ordering or message deferral.

 

 

  • Messages in queues are ordered and timestamped on arrival.

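  • Messages are delivered in pull mode: Service Bus only delivers a message when a receiver requests it.

    • To realize a first-in, first-out (FIFO) guarantee in Service Bus, use sessions.

    • Message sessions enable joint and ordered handling of unbounded sequences of related messages.

  • Duplicate detection enables the sender to resend the same message, and the queue discards any duplicate copies.

    • If a message with the same MessageId was already logged within the duplicate detection time window, the send is reported as successful, but the newly sent message is instantly ignored and dropped.

  • Messages can be stored for an effectively unlimited period of time on the standard and premium tiers; the basic tier caps time-to-live at 14 days.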

  • The autoforwarding feature chains a queue or subscription to another queue or topic that is in the same namespace.

  • Scheduled delivery: You can submit messages to a queue or topic for delayed processing. You can schedule a job to become available for processing by a system at a certain time.

  • Filtering and actions: Subscribers can define which messages they want to receive from a topic.

    • These messages are specified in the form of one or more named subscription rules.

  • The purpose of the dead-letter queue is to hold messages that can't be delivered to any receiver, or messages that couldn't be processed. Messages can then be removed from the DLQ and inspected. 
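
The duplicate detection behavior noted above can be modeled in a few lines. This is only a toy sketch in Python, not the Service Bus implementation: the `DedupQueue` class, its `window` parameter, and the message IDs are hypothetical names made up for illustration. It assumes duplicates are recognized by message ID within a fixed time window.

```python
from datetime import datetime, timedelta


class DedupQueue:
    """Toy queue that drops re-sent messages inside a detection window."""

    def __init__(self, window: timedelta):
        self.window = window
        self.seen = {}      # message_id -> time the ID was first accepted
        self.messages = []  # bodies that were actually enqueued

    def send(self, message_id: str, body: str, now: datetime) -> bool:
        # Forget IDs that have aged out of the detection window.
        self.seen = {mid: t for mid, t in self.seen.items()
                     if now - t < self.window}
        if message_id in self.seen:
            # The send still "succeeds" for the caller, but nothing is enqueued.
            return True
        self.seen[message_id] = now
        self.messages.append(body)
        return True


queue = DedupQueue(window=timedelta(minutes=10))
t0 = datetime(2024, 1, 1, 12, 0)
queue.send("order-1", "first attempt", t0)
queue.send("order-1", "retry", t0 + timedelta(minutes=1))         # dropped
queue.send("order-1", "much later", t0 + timedelta(minutes=30))   # window expired, enqueued
```

After these three sends, `queue.messages` holds only the first attempt and the message sent after the window expired; the retry is silently discarded.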

 

  • Lockout period

 

 

 

 

 

 

 


Azure Service Bus - Message expiration (Time to Live)

 

  • Azure Service Bus - Message expiration (Time to Live)

  • All messages sent into a queue or topic are subject to a default expiration that is set at the entity level.

    • It can also be set in the portal during creation and adjusted later.

    • The default expiration is used for all messages sent to the entity where time-to-live isn't explicitly set.

  • The default expiration also functions as a ceiling for the time-to-live value.

    • Messages that have a longer time-to-live expiration than the default value are silently adjusted to the default message time-to-live value before being enqueued.

  • The default time-to-live value for a brokered message is the largest possible value for a signed 64-bit integer if not otherwise specified.

  • For messaging entities (queues and topics), the default expiration time is also the largest possible value for a signed 64-bit integer for Service Bus standard and premium tiers.

    • For the basic tier, the default (also maximum) expiration time is 14 days.

  • If the topic specifies a smaller TTL than the subscription, the topic TTL is applied.
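
The ceiling rule above boils down to "take the smaller of the two values". A minimal Python sketch follows, assuming a simplified model: `effective_ttl` and `UNLIMITED` are hypothetical names, and `timedelta.max` merely stands in for the "largest signed 64-bit value" default.

```python
from datetime import timedelta
from typing import Optional

# Stand-in for the "effectively unlimited" default; an assumption of this model.
UNLIMITED = timedelta.max


def effective_ttl(message_ttl: Optional[timedelta],
                  entity_default_ttl: timedelta = UNLIMITED) -> timedelta:
    """Return the time-to-live applied to a message in this simplified model."""
    if message_ttl is None:
        # No TTL set on the message: the entity default is used.
        return entity_default_ttl
    # The entity default acts as a ceiling for the message TTL.
    return min(message_ttl, entity_default_ttl)


queue_default = timedelta(days=7)
unset = effective_ttl(None, queue_default)                  # entity default applies
short = effective_ttl(timedelta(hours=1), queue_default)    # message TTL kept
capped = effective_ttl(timedelta(days=30), queue_default)   # silently reduced to default
```

The same `min` comparison describes the topic and subscription case: whichever entity specifies the smaller TTL wins.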

 


Choose a message queue solution

  • Choose a message queue solution

  • Use Service Bus queues when you:

    • Need to receive messages without having to poll the queue.

    • Need guaranteed first-in-first-out (FIFO) ordered delivery.

    • Need automatic duplicate detection.

    • Want to process messages as parallel long-running streams.

    • Require transactional behavior and atomicity when sending or receiving multiple messages from a queue.

 

  • Use Storage queues when:

    • Your application must store over 80 gigabytes of messages in a queue.

    • You want to track progress for processing a message in the queue. It's useful if the worker processing a message crashes. Another worker can then use that information to continue from where the prior worker left off.

    • You require server side logs of all of the transactions executed against your queues.

    • Queues are commonly used to create a backlog of work to process asynchronously.




Get message counters

 

Counter                           Description
ActiveMessageCount                Number of messages in the queue or subscription that are in the active state and ready for delivery.
ScheduledMessageCount             Number of messages in the scheduled state.
DeadLetterMessageCount            Number of messages in the dead-letter queue.
TransferMessageCount              Number of messages pending transfer into another queue or topic.
TransferDeadLetterMessageCount    Number of messages that failed to transfer into another queue or topic and have been moved into the transfer dead-letter queue.



Considerations for scaling threshold values for special metrics


Protocols

 

  • The primary wire protocol for Service Bus is Advanced Message Queuing Protocol (AMQP) 1.0, an open ISO/IEC standard.

  • It allows customers to write applications that work against Service Bus and on-premises brokers such as ActiveMQ or RabbitMQ.

  • The AMQP protocol guide provides detailed information in case you want to build an abstraction layer that works across such brokers.

 


Service Bus queues, topics, and subscriptions

 

  • Service Bus queues, topics, and subscriptions

  • The messaging entities that form the core of the messaging capabilities in Service Bus are queues, topics and subscriptions, and rules/actions.

  • Queues offer First In, First Out (FIFO) message delivery to one or more competing consumers.

    • That is, receivers typically receive and process messages in the order in which they were added to the queue.

    • Only one message consumer receives and processes each message.

  • Because producers and consumers are not aware of each other, a consumer can be upgraded without having any effect on the producer.

  • You can specify two different modes in which consumers receive messages from Service Bus:

    • Receive and delete

      • When Service Bus receives the request from the consumer, it marks the message as being consumed and returns it to the consumer application.

      • This mode works best for scenarios in which the application can tolerate not processing a message if an application failure occurs.

    • Peek lock

      • The receive operation becomes two-stage, which makes it possible to support applications that can't tolerate missing messages.

      • Service Bus finds the next message to be consumed, locks it to prevent other consumers from receiving it, and then returns the message to the application.

      • After the application finishes processing the message, it requests the Service Bus service to complete the second stage of the receive process.

        • Then, the service marks the message as being consumed.

        • There's a timeout associated with the lock. If the application fails to process the message before the lock timeout expires, Service Bus unlocks the message and makes it available to be received again.
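
The difference between the two receive modes is easier to see in a small simulation. The Python sketch below is a toy in-memory model, not the Service Bus client API: `ToyQueue`, `lock_seconds`, and the integer lock tokens are all hypothetical, and time is passed in explicitly as `now` (seconds) so the behavior is deterministic.

```python
from collections import deque


class ToyQueue:
    """In-memory model of receive-and-delete versus peek-lock."""

    def __init__(self, lock_seconds: float = 30.0):
        self.lock_seconds = lock_seconds
        self.available = deque()   # messages ready for delivery
        self.locked = {}           # lock token -> (body, lock expiry time)
        self._next_token = 0

    def send(self, body):
        self.available.append(body)

    def receive_and_delete(self):
        # Single stage: the message is gone as soon as it is handed out.
        return self.available.popleft() if self.available else None

    def peek_lock(self, now: float):
        # Stage one: lock the next message and hand it to the consumer.
        self._release_expired(now)
        if not self.available:
            return None
        body = self.available.popleft()
        self._next_token += 1
        self.locked[self._next_token] = (body, now + self.lock_seconds)
        return self._next_token, body

    def complete(self, token, now: float) -> bool:
        # Stage two: mark the message as consumed; False if the lock was lost.
        self._release_expired(now)
        return self.locked.pop(token, None) is not None

    def _release_expired(self, now: float):
        # Expired locks put the message back at the front of the queue.
        for token, (body, expiry) in list(self.locked.items()):
            if now >= expiry:
                del self.locked[token]
                self.available.appendleft(body)


q = ToyQueue(lock_seconds=30)
q.send("order-1")
first_token, _ = q.peek_lock(now=0)       # consumer A locks the message, then crashes
hidden = q.peek_lock(now=10)              # None: message is locked
second_token, body = q.peek_lock(now=31)  # lock expired, consumer B receives it
stale = q.complete(first_token, now=32)   # False: consumer A's lock is gone
done = q.complete(second_token, now=32)   # True: message is now consumed
```

With `receive_and_delete`, a crash after the receive call would have lost `order-1`; with peek lock, the message reappears once the lock times out.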

 

  • In contrast to queues, topics and subscriptions provide a one-to-many form of communication in a publish and subscribe pattern.

  • It's useful for scaling to large numbers of recipients.

  • Each published message is made available to each subscription registered with the topic.

  • A publisher sends a message to a topic and one or more subscribers receive a copy of the message, depending on filter rules set on these subscriptions.

  • Consumers receive messages from subscriptions of the topic.

    • A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic.

    • Consumers receive messages from a subscription identically to the way they receive messages from a queue.
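
The "virtual queue per subscription" idea above can be sketched as follows. This is an illustrative Python model only: `ToyTopic`, the subscription names, and the `rule` callables are hypothetical and do not correspond to Service Bus SDK types.

```python
from collections import deque


class ToyTopic:
    """Each subscription is its own queue that receives copies of messages."""

    def __init__(self):
        self.subscriptions = {}   # name -> (rule function, queue of copies)

    def subscribe(self, name, rule=lambda message: True):
        # The default rule accepts every message sent to the topic.
        self.subscriptions[name] = (rule, deque())

    def publish(self, message: dict):
        for rule, queue in self.subscriptions.values():
            if rule(message):
                queue.append(dict(message))   # every subscription gets its own copy

    def receive(self, name):
        # Receiving from a subscription works like receiving from a queue.
        _, queue = self.subscriptions[name]
        return queue.popleft() if queue else None


topic = ToyTopic()
topic.subscribe("audit")
topic.subscribe("store1", rule=lambda m: m.get("StoreName") == "Store1")
topic.publish({"StoreName": "Store1", "total": 10})
topic.publish({"StoreName": "Store2", "total": 5})
```

Here the `audit` subscription ends up with both messages, while `store1` only receives the message whose `StoreName` matches its rule.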

 

  • Rules and actions

  • In many scenarios, messages that have specific characteristics must be processed in different ways.

  • To enable this processing, you can configure subscriptions to find messages that have desired properties and then perform certain modifications to those properties.

  • This filtering is accomplished using subscription filters.

    • Such modifications are called filter actions. When a subscription is created, you can supply a filter expression that operates on the properties of the message.

    • The properties can be both the system properties (for example, Label) and custom application properties (for example, StoreName).

      • The SQL filter expression is optional. Without one, any filter action defined on a subscription is applied to all the messages for that subscription.

 


Topic filters and actions

 

  • Topic filters and actions

  • Subscribers can define which messages they want to receive from a topic.

  • These messages are specified in the form of one or more named subscription rules.

  • Each rule consists of a filter condition that selects particular messages, and optionally contains an action that annotates the selected message.

  • Filters

  • Service Bus supports three filter conditions:

    • SQL Filters - A SqlFilter holds a SQL-like conditional expression that is evaluated in the broker against the arriving messages' user-defined properties and system properties.

      • All system properties must be prefixed with sys. in the conditional expression.

      • The SQL-language subset for filter conditions tests for the existence of properties (EXISTS), null-values (IS NULL), logical NOT/AND/OR, relational operators, simple numeric arithmetic, and simple text pattern matching with LIKE.

    • Boolean filters - The TrueFilter and FalseFilter either cause all arriving messages (true) or none of the arriving messages (false) to be selected for the subscription.

      • These two filters derive from the SQL filter.

    • Correlation Filters - A CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user and system properties.

      • A common use is to match against the CorrelationId property, but the application can also choose to match against the following properties:

        • ContentType

        • Label

        • MessageId

        • ReplyTo

        • ReplyToSessionId

        • SessionId

        • To

        • any user-defined properties.

        • A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter
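
Correlation filter matching, as described above, is an equality check on each specified property. The Python sketch below is a simplified model under that assumption: `correlation_match`, the subscription names, and the sample property values are hypothetical, and messages are represented as plain dictionaries rather than SDK objects. It also assumes that all conditions in one filter must hold together.

```python
def correlation_match(filter_conditions: dict, message_properties: dict) -> bool:
    """True when every condition equals the message's value for that property."""
    return all(
        name in message_properties and message_properties[name] == expected
        for name, expected in filter_conditions.items()
    )


# Hypothetical subscriptions, each with one correlation filter.
subscriptions = {
    "store1-orders": {"Label": "order", "StoreName": "Store1"},
    "store2-orders": {"Label": "order", "StoreName": "Store2"},
    "batch-42": {"CorrelationId": "batch-42"},
}

message = {"CorrelationId": "batch-42", "Label": "order", "StoreName": "Store1"}

matched = [name for name, conditions in subscriptions.items()
           if correlation_match(conditions, message)]
```

In this example the message is selected for `store1-orders` and `batch-42`, but not for `store2-orders`, because its `StoreName` differs.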



Service Bus message payloads and serialization

  • Explore Service Bus message payloads and serialization

  • Messages carry a payload and metadata.

  • The metadata is in the form of key-value pair properties, and describes the payload, and gives handling instructions to Service Bus and applications.

    • Occasionally, that metadata alone is sufficient to carry the information that the sender wants to communicate to receivers, and the payload remains empty.

  • A Service Bus message consists of a binary payload section that Service Bus never handles in any form on the service-side, and two sets of properties.

    • The broker properties are predefined by the system.

      • These predefined properties either control message-level functionality inside the broker, or they map to common and standardized metadata items.

      • A subset of the broker properties: To, ReplyTo, ReplyToSessionId, MessageId, CorrelationId, and SessionId, are used to help applications route messages to particular destinations.

        • In a simple request/reply pattern, the publisher owns a queue into which it expects replies to be delivered. The address of that queue is expressed in the ReplyTo property of the outbound message.

        • When the consumer responds, it copies the MessageId of the handled message into the CorrelationId property of the reply message and delivers the message to the destination indicated by the ReplyTo property.


    • The user properties are a collection of key-value pairs that can be defined and set by the application.

  • The payload is always an opaque, binary block.

    • The ContentType property enables applications to describe the payload, with the suggested format for the property values being a MIME content-type description according to IETF RFC2045; for example, application/json;charset=utf-8
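
The request/reply routing and the ContentType convention described above can be illustrated with a small model. This Python sketch is not the SDK message type: the `Msg` dataclass, `make_reply`, the queue name `client-replies`, and the message IDs are hypothetical names chosen for illustration.

```python
import json
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Msg:
    body: bytes                          # opaque binary payload
    message_id: str
    content_type: Optional[str] = None   # describes how to decode the body
    reply_to: Optional[str] = None       # where the sender expects replies
    correlation_id: Optional[str] = None


def make_reply(request: Msg, reply_id: str, payload: dict) -> Tuple[Optional[str], Msg]:
    """Build a reply and return (destination queue, reply message)."""
    reply = Msg(
        body=json.dumps(payload).encode("utf-8"),
        message_id=reply_id,
        content_type="application/json;charset=utf-8",
        # Copy the request's MessageId so the publisher can match the reply.
        correlation_id=request.message_id,
    )
    # The reply goes to the address the publisher put in ReplyTo.
    return request.reply_to, reply


request = Msg(
    body=json.dumps({"sku": "A1"}).encode("utf-8"),
    message_id="req-001",
    content_type="application/json;charset=utf-8",
    reply_to="client-replies",
)
destination, reply = make_reply(request, "rep-001", {"in_stock": True})
```

The consumer sends `reply` to `destination`; the publisher, reading from its reply queue, pairs it with the original request by comparing `reply.correlation_id` with the request's `message_id`.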

 


Use the Azure CLI to create a Service Bus namespace and a queue

 

    az group create --name ContosoRG --location eastus

    az servicebus namespace create --resource-group ContosoRG --name ContosoSBusNS --location eastus

    az servicebus queue create --resource-group ContosoRG --namespace-name ContosoSBusNS --name ContosoOrdersQueue

Run the following command to get the primary connection string for the namespace.

    az servicebus namespace authorization-rule keys list --resource-group ContosoRG --namespace-name ContosoSBusNS --name RootManageSharedAccessKey --query primaryConnectionString --output tsv

 


Send and receive messages from a Service Bus queue by using .NET

 

  • Exercise: Send and receive message from a Service Bus queue by using .NET.

  • Create a Service Bus messaging namespace.

    az servicebus namespace create \
        --resource-group az204-svcbus-rg \
        --name $myNameSpaceName \
        --location $myLocation

     

  • Create a Service Bus queue

    az servicebus queue create --resource-group az204-svcbus-rg \
        --namespace-name $myNameSpaceName \
        --name az204-queue

 

  • Retrieve the connection string for the Service Bus Namespace

    • In the Azure portal, select the az204svcbus namespace resource you just created.

    • Select Shared access policies in the Settings section, then select the RootManageSharedAccessKey policy.

    • Copy the Primary Connection String.

  • Create console app to send messages to the queue

    code .
    dotnet new console
    dotnet add package Azure.Messaging.ServiceBus

     

    using System;
    using System.Threading.Tasks;
    using Azure.Messaging.ServiceBus;

    ...

    // connection string to your Service Bus namespace
    static string connectionString = "<NAMESPACE CONNECTION STRING>";

    // name of your Service Bus queue
    static string queueName = "az204-queue";

    // the client that owns the connection and can be used to create senders and receivers
    static ServiceBusClient client;

    // the sender used to publish messages to the queue
    static ServiceBusSender sender;

    // number of messages to be sent to the queue
    private const int numOfMessages = 3;

    static async Task Main()
    {
        // Create the clients that we'll use for sending and processing messages.
        client = new ServiceBusClient(connectionString);
        sender = client.CreateSender(queueName);

        // create a batch
        using ServiceBusMessageBatch messageBatch = await sender.CreateMessageBatchAsync();

        for (int i = 1; i <= numOfMessages; i++)
        {
            // try adding a message to the batch
            if (!messageBatch.TryAddMessage(new ServiceBusMessage($"Message {i}")))
            {
                // if it is too large for the batch
                throw new Exception($"The message {i} is too large to fit in the batch.");
            }
        }

        try
        {
            // Use the producer client to send the batch of messages to the Service Bus queue
            await sender.SendMessagesAsync(messageBatch);
            Console.WriteLine($"A batch of {numOfMessages} messages has been published to the queue.");
        }
        finally
        {
            // Calling DisposeAsync on client types is required to ensure that network
            // resources and other unmanaged objects are properly cleaned up.
            await sender.DisposeAsync();
            await client.DisposeAsync();
        }

        Console.WriteLine("Press any key to end the application");
        Console.ReadKey();
    }

 

  • Receive messages from the queue

    // the processor that reads and processes messages from the queue
    static ServiceBusProcessor processor;

    // handle received messages
    static async Task MessageHandler(ProcessMessageEventArgs args)
    {
        string body = args.Message.Body.ToString();
        Console.WriteLine($"Received: {body}");

        // complete the message; the message is deleted from the queue
        await args.CompleteMessageAsync(args.Message);
    }

    // handle any errors when receiving messages
    static Task ErrorHandler(ProcessErrorEventArgs args)
    {
        Console.WriteLine(args.Exception.ToString());
        return Task.CompletedTask;
    }

    static async Task Main()
    {
        // Create the client object that will be used to create sender and receiver objects
        client = new ServiceBusClient(connectionString);

        // create a processor that we can use to process the messages
        processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());

        try
        {
            // add handler to process messages
            processor.ProcessMessageAsync += MessageHandler;

            // add handler to process any errors
            processor.ProcessErrorAsync += ErrorHandler;

            // start processing
            await processor.StartProcessingAsync();

            Console.WriteLine("Wait for a minute and then press any key to end the processing");
            Console.ReadKey();

            // stop processing
            Console.WriteLine("\nStopping the receiver...");
            await processor.StopProcessingAsync();
            Console.WriteLine("Stopped receiving messages");
        }
        finally
        {
            // Calling DisposeAsync on client types is required to ensure that network
            // resources and other unmanaged objects are properly cleaned up.
            await processor.DisposeAsync();
            await client.DisposeAsync();
        }
    }

 


Programming: Azure Service Bus Topic And Subscription (Pub-Sub)

    // This sample uses the older Microsoft.Azure.ServiceBus package,
    // not Azure.Messaging.ServiceBus as in the queue exercise.
    using System;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;

    class Program
    {
        static ISubscriptionClient subscriptionClient;

        static async Task Main(string[] args)
        {
            // Replace the placeholder with your own key; never commit real keys.
            string sbConnectionString = "Endpoint=sb://mobilerecharge.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<SHARED ACCESS KEY>";
            string sbTopic = "offers";
            string sbSubscription = "akki5677";

            try
            {
                subscriptionClient = new SubscriptionClient(sbConnectionString, sbTopic, sbSubscription);

                // One message at a time; the handler completes messages explicitly.
                var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
                {
                    MaxConcurrentCalls = 1,
                    AutoComplete = false
                };
                subscriptionClient.RegisterMessageHandler(ReceiveMessagesAsync, messageHandlerOptions);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                Console.ReadKey();
                // Await the close so the connection is shut down before exit.
                if (subscriptionClient != null)
                {
                    await subscriptionClient.CloseAsync();
                }
            }
        }

        static async Task ReceiveMessagesAsync(Message message, CancellationToken token)
        {
            Console.WriteLine($"Subscribed message: {Encoding.UTF8.GetString(message.Body)}");

            // Peek-lock mode: complete the message so it is removed from the subscription.
            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine(exceptionReceivedEventArgs.Exception);
            return Task.CompletedTask;
        }
    }