azure-servicebus
Microsoft Azure Service Bus Client Library for Python
Description
Azure Service Bus client library for Python
Azure Service Bus is a high performance cloud-managed messaging service for providing real-time and fault-tolerant communication between distributed senders and receivers.
Service Bus provides multiple mechanisms for asynchronous highly reliable communication, such as structured first-in-first-out messaging, publish/subscribe capabilities, and the ability to easily scale as your needs grow.
Use the Service Bus client library for Python to communicate between applications and services and implement asynchronous messaging patterns.
- Create Service Bus namespaces, queues, topics, and subscriptions, and modify their settings.
- Send and receive messages within your Service Bus channels.
- Utilize message locks, sessions, and dead letter functionality to implement complex messaging patterns.
Source code | [Package (PyPi)][pypi] | Package (Conda) | [API reference documentation][api_docs] | [Product documentation][product_docs] | Samples | Changelog
NOTE: If you are using version 0.50 or lower and want to migrate to the latest version of this package please look at our [migration guide to move from Service Bus V0.50 to Service Bus V7][migration_guide].
Getting started
Install the package
Install the Azure Service Bus client library for Python with [pip][pip]:
pip install azure-servicebus
Prerequisites:
To use this package, you must have:
- Azure subscription - [Create a free account][azure_sub]
- Azure Service Bus - [Namespace and management credentials][service_bus_namespace]
- Python 3.9 or later - [Install Python][python]
If you need an Azure service bus namespace, you can create it via the [Azure Portal][azure_namespace_creation]. If you do not wish to use the graphical portal UI, you can use the Azure CLI via [Cloud Shell][cloud_shell_bash], or Azure CLI run locally, to create one with this Azure CLI command:
az servicebus namespace create --resource-group <resource-group-name> --name <servicebus-namespace-name> --location <servicebus-namespace-location>
Authenticate the client
Interaction with Service Bus starts with an instance of the ServiceBusClient class. You either need a connection string with SAS key, or a namespace and one of its account keys to instantiate the client object.
Please find the samples linked below for demonstration as to how to authenticate via either approach.
[Create client from connection string][sample_authenticate_client_connstr]
- To obtain the required credentials, one can use the [Azure CLI][azure_cli] snippet (Formatted for Bash Shell) at the top of the linked sample to populate an environment variable with the service bus connection string (you can also find these values in the [Azure Portal][azure_portal] by following the step-by-step guide to [Get a service bus connection string][get_servicebus_conn_str]).
[Create client using the azure-identity library][sample_authenticate_client_aad]:
- This constructor takes the fully qualified namespace of your Service Bus instance and a credential that implements the
[TokenCredential][token_credential_interface]
protocol. There are implementations of the
TokenCredentialprotocol available in the [azure-identity package][pypi_azure_identity]. The fully qualified namespace is of the format<yournamespace.servicebus.windows.net>. - To use the credential types provided by
azure-identity, please install the package:pip install azure-identity - Additionally, to use the async API, you must first install an async transport, such as
aiohttp:pip install aiohttp - When using Azure Active Directory, your principal must be assigned a role which allows access to Service Bus, such as the Azure Service Bus Data Owner role. For more information about using Azure Active Directory authorization with Service Bus, please refer to [the associated documentation][servicebus_aad_authentication].
Note: client can be initialized without a context manager, but must be manually closed via client.close() to not leak resources.
Key concepts
Once you've initialized a ServiceBusClient, you can interact with the primary resource types within a Service Bus Namespace, of which multiple can exist and on which actual message transmission takes place, the namespace often serving as an application container:
-
[Queue][queue_concept]: Allows for Sending and Receiving of message. Often used for point-to-point communication.
-
[Topic][topic_concept]: As opposed to Queues, Topics are better suited to publish/subscribe scenarios. A topic can be sent to, but requires a subscription, of which there can be multiple in parallel, to consume from.
-
[Subscription][subscription_concept]: The mechanism to consume from a Topic. Each subscription is independent, and receives a copy of each message sent to the topic. Rules and Filters can be used to tailor which messages are received by a specific subscription.
For more information about these resources, see [What is Azure Service Bus?][service_bus_overview].
To interact with these resources, one should be familiar with the following SDK concepts:
-
[ServiceBusClient][client_reference]: This is the object a user should first initialize to connect to a Service Bus Namespace. To interact with a queue, topic, or subscription, one would spawn a sender or receiver off of this client.
-
[ServiceBusSender][sender_reference]: To send messages to a Queue or Topic, one would use the corresponding
get_queue_senderorget_topic_sendermethod off of aServiceBusClientinstance as seen here. -
[ServiceBusReceiver][receiver_reference]: To receive messages from a Queue or Subscription, one would use the corresponding
get_queue_receiverorget_subscription_receivermethod off of aServiceBusClientinstance as seen here. -
[ServiceBusMessage][message_reference]: When sending, this is the type you will construct to contain your payload. When receiving, this is where you will access the payload.
Thread safety
We do not guarantee that the ServiceBusClient, ServiceBusSender, and ServiceBusReceiver are thread-safe or coroutine-safe. We do not recommend reusing these instances across threads or sharing them between coroutines. It is up to the running application to use these classes in a concurrency-safe manner.
The data model type, ServiceBusMessageBatch is not thread-safe or coroutine-safe. It should not be shared across threads nor used concurrently with client methods.
For scenarios requiring concurrent sending from multiple threads, ensure proper thread-safety management using mechanisms like threading.Lock(). Note: Native async APIs should be used instead of running in a ThreadPoolExecutor, if possible.
import threading
from concurrent.futures import ThreadPoolExecutor
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
SERVICE_BUS_NAMESPACE = "<your-namespace>.servicebus.windows.net"
QUEUE_NAME = "<your-queue-name>"
lock = threading.Lock()
def send_batch(sender_id, sender):
with lock:
messages = [ServiceBusMessage(f"Message {i} from sender {sender_id}") for i in range(10)]
sender.send_messages(messages)
print(f"Sender {sender_id} sent messages.")
credential = DefaultAzureCredential()
client = ServiceBusClient(fully_qualified_namespace=SERVICE_BUS_NAMESPACE, credential=credential)
with client:
sender = client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
with ThreadPoolExecutor(max_workers=5) as executor:
for i in range(5):
executor.submit(send_batch, i, sender)
For scenarios requiring concurrent sending in asyncio applications, ensure proper coroutine-safety management using mechanisms like asyncio.Lock().
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
SERVICE_BUS_NAMESPACE = "<your-namespace>.servicebus.windows.net"
QUEUE_NAME = "<your-queue-name>"
lock = asyncio.Lock()
async def send_batch(sender_id, sender):
async with lock:
messages = [ServiceBusMessage(f"Message {i} from sender {sender_id}") for i in range(10)]
await sender.send_messages(messages)
print(f"Sender {sender_id} sent messages.")
credential = DefaultAzureCredential()
client = ServiceBusClient(fully_qualified_namespace=SERVICE_BUS_NAMESPACE, credential=credential)
async with client:
sender = client.get_queue_sender(queue_name=QUEUE_NAME)
async with sender:
await asyncio.gather(*(send_batch(i, sender) for i in range(5)))
Examples
The following sections provide several code snippets covering some of the most common Service Bus tasks, including:
- Send messages to a queue
- Receive messages from a queue
- Send and receive a message from a session enabled queue
- [Working with topics and subscriptions](#working-with-topics-and-subscriptions "Working with topics and subscriptio