Give AlbumentationsX a star on GitHub — it powers this leaderboard

Star on GitHub

aiormq

Pure python AMQP asynchronous client library

Rank: #2016Downloads: 3,669,800 (30 days)Stars: 314Forks: 64

Description

======
AIORMQ
======

.. image:: https://coveralls.io/repos/github/mosquito/aiormq/badge.svg?branch=master
   :target: https://coveralls.io/github/mosquito/aiormq?branch=master
   :alt: Coveralls

.. image:: https://img.shields.io/pypi/status/aiormq.svg
   :target: https://github.com/mosquito/aiormq
   :alt: Status

.. image:: https://github.com/mosquito/aiormq/workflows/tests/badge.svg
   :target: https://github.com/mosquito/aiormq/actions?query=workflow%3Atests
   :alt: Build status

.. image:: https://img.shields.io/pypi/v/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/
   :alt: Latest Version

.. image:: https://img.shields.io/pypi/wheel/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/

.. image:: https://img.shields.io/pypi/pyversions/aiormq.svg
   :target: https://pypi.python.org/pypi/aiormq/

.. image:: https://img.shields.io/pypi/l/aiormq.svg
   :target: https://github.com/mosquito/aiormq/blob/master/LICENSE.md


aiormq is a pure python AMQP client library.

.. contents:: Table of contents

Status
======

* 3.x.x branch - Production/Stable
* 4.x.x branch - Unstable (Experimental)
* 5.x.x and greater is only Production/Stable releases.

Features
========

* Connecting by URL

 * amqp example: **amqp://user:password@server.host/vhost**
 * secure amqp example: **amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0**

* Buffered queue for received frames
* Only `PLAIN`_ auth mechanism support
* `Publisher confirms`_ support
* `Transactions`_ support
* Channel based asynchronous locks

  .. note::
      AMQP 0.9.1 requires serialize sending for some frame types
      on the channel. e.g. Content body must be following after
      content header. But frames might be sent asynchronously
      on another channels.

* Tracking unroutable messages
  (Use **connection.channel(on_return_raises=False)** for disabling)
* Full SSL/TLS support, using your choice of:
    * ``amqps://`` url query parameters:
        * ``cafile=`` - string contains path to ca certificate file
        * ``capath=`` - string contains path to ca certificates
        * ``cadata=`` - base64 encoded ca certificate data
        * ``keyfile=`` - string contains path to key file
        * ``certfile=`` - string contains path to certificate file
        * ``no_verify_ssl`` - boolean disables certificates validation
    * ``context=`` `SSLContext`_ keyword argument to ``connect()``.
* Python `type hints`_
* Uses `pamqp`_ as an AMQP 0.9.1 frame encoder/decoder


.. _Publisher confirms: https://www.rabbitmq.com/confirms.html
.. _Transactions: https://www.rabbitmq.com/semantics.html
.. _PLAIN: https://www.rabbitmq.com/authentication.html
.. _type hints: https://docs.python.org/3/library/typing.html
.. _pamqp: https://pypi.org/project/pamqp/
.. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext

Tutorial
========

Introduction
------------

Simple consumer
***************

.. code-block:: python

    import asyncio
    import aiormq

    async def on_message(message):
        """
        on_message doesn't necessarily have to be defined as async.
        Here it is to show that it's possible.
        """
        print(f" [x] Received message {message!r}")
        print(f"Message body is: {message.body!r}")
        print("Before sleep!")
        await asyncio.sleep(5)   # Represents async I/O operations
        print("After sleep!")


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        # Declaring queue
        declare_ok = await channel.queue_declare('hello', auto_delete=True)
        consume_ok = await channel.basic_consume(
            declare_ok.queue, on_message, no_ack=True
        )


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()


Simple publisher
****************

.. code-block:: python
    :name: test_simple_publisher

    import asyncio
    from typing import Optional

    import aiormq
    from aiormq.abc import DeliveredMessage


    MESSAGE: Optional[DeliveredMessage] = None


    async def main():
        global MESSAGE

        body = b'Hello World!'

        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost//")

        # Creating a channel
        channel = await connection.channel()

        declare_ok = await channel.queue_declare("hello", auto_delete=True)

        # Sending the message
        await channel.basic_publish(body, routing_key='hello')
        print(f" [x] Sent {body}")

        MESSAGE = await channel.basic_get(declare_ok.queue)
        print(f" [x] Received message from {declare_ok.queue!r}")


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    assert MESSAGE is not None
    assert MESSAGE.routing_key == "hello"
    assert MESSAGE.body == b'Hello World!'


Work Queues
-----------

Create new task
***************

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        body = b' '.join(sys.argv[1:]) or b"Hello World!"

        # Sending the message
        await channel.basic_publish(
            body,
            routing_key='task_queue',
            properties=aiormq.spec.Basic.Properties(
                delivery_mode=1,
            )
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


Simple worker
*************

.. code-block:: python

    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f" [x] Received message {message!r}")
        print(f"     Message body is: {message.body!r}")


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")


        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        # Declaring queue
        declare_ok = await channel.queue_declare('task_queue', durable=True)

        # Start listening the queue with name 'task_queue'
        await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    # we enter a never-ending loop that waits for data and runs
    # callbacks whenever necessary.
    print(" [*] Waiting for messages. To exit press CTRL+C")
    loop.run_forever()


Publish Subscribe
-----------------

Publisher
*********

.. code-block:: python

    import sys
    import asyncio
    import aiormq


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()

        await channel.exchange_declare(
            exchange='logs', exchange_type='fanout'
        )

        body = b' '.join(sys.argv[1:]) or b"Hello World!"

        # Sending the message
        await channel.basic_publish(
            body, routing_key='info', exchange='logs'
        )

        print(f" [x] Sent {body!r}")

        await connection.close()


    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())


Subscriber
**********

.. code-block:: python

    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f"[x] {message.body!r}")

        await message.channel.basic_ack(
            message.delivery.delivery_tag
        )


    async def main():
        # Perform connection
        connection = await aiormq.connect("amqp://guest:guest@localhost/")

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        await channel.exchange_declare(
            exchange='logs', exchange_type='fanout'
        )

        # Declaring queue
        declare_ok = await channel.queue_declare(exclusive=True)

        # Binding the queue to the exchange
        await channel.queue_bind(declare_ok.queue, 'logs')

        # Start listening the queue with name 'task_queue'
        await channel.basic_consume(declare_ok.queue, on_message)


    loop = asyncio.get_event_loop()
    loop.create_task(main())

    # we enter a never-ending loop that waits for data
    # and runs callbacks whenever necessary.
    print(' [*] Waiting for logs. To exit press CTRL+C')
    loop.run_forever()


Routing
-------

Direct consumer
***************

.. code-block:: python

    import sys
    import asyncio
    import aiormq
    import aiormq.abc


    async def on_message(message: aiormq.abc.DeliveredMessage):
        print(f" [x] {message.delivery.routing_key!r}:{message.body!r}"
        await message.channel.basic_ack(
            message.delivery.delivery_tag
        )


    async def main():
        # Perform connection
        connection = aiormq.Connection("amqp://guest:guest@localhost/")
        await connection.connect()

        # Creating a channel
        channel = await connection.channel()
        await channel.basic_qos(prefetch_count=1)

        severities = sys.argv[1:]

        if not severities:
            sys.stderr.write(f"Usage: {sys.argv[0]} [info] [warning] [error]\n")
            sys.exit(1)

        # Declare an exchange
        await channel.exchange_declare(
            exchange='logs', exchange_type='direct'
        )

        # Declaring random queue
        declare_ok = await channel.queue_declare(durable=True, auto_delete=True)

        for severity in severities:
            await channel.queue_bind(