pytest-docker-tools

Docker integration tests for pytest


You have written a software application (in any language) and have packaged it as a Docker image. Now you want to smoke test the built image or do some integration testing with other containers before releasing it. You:

  • want to reason about your environment in a similar way to a docker-compose.yml
  • want the environment to be automatically created and destroyed as tests run
  • want the option to reuse previously created resources (e.g. containers) when running tests at high frequency
  • don't want to have to write loads of boilerplate code for creating the test environment
  • want to be able to run the tests in parallel
  • want the tests to be reliable

pytest-docker-tools is a set of opinionated helpers for creating py.test fixtures for your smoke testing and integration testing. It strives to keep your environment definition declarative, like a docker-compose.yml. It embraces py.test fixture overloading. It tries not to be too magical. It ended up kind of magical, but no more so than py.test itself.

The main interface provided by this library is a set of 'fixture factories'. It provides a 'best in class' implementation of a fixture, and then allows you to treat it as a template - injecting your own configuration declaratively. You can define your fixtures in your conftest.py and access them from all your tests, and you can override them as needed in individual test modules.
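For instance, a single test module can shadow a fixture from your conftest.py just by redefining it under the same name. This is a sketch of that pattern; the fixture names here are illustrative, not taken from the library's docs:

```python
# test_debug.py -- overriding a conftest.py fixture in one module.
# Assumes conftest.py defines a `memcache_image` fetch() fixture; only
# tests in this file see the extra environment variable, while the rest
# of the suite keeps using the conftest.py definition.
from pytest_docker_tools import container

memcache = container(
    image='{memcache_image.id}',
    environment={'DEBUG': '1'},
)
```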

The API is straightforward and implicitly captures the dependencies between fixtures in the specification. For example, here is how it might look if you were building out a microservice and wanted to point its DNS at a mock DNS server:

# conftest.py

from http.client import HTTPConnection

import pytest
from pytest_docker_tools import build, container

fakedns_image = build(
    path='examples/resolver-service/dns',
)

fakedns = container(
    image='{fakedns_image.id}',
    environment={
        'DNS_EXAMPLE_COM__A': '127.0.0.1',
    }
)

apiserver_image = build(
    path='examples/resolver-service/api',
)

apiserver = container(
    image='{apiserver_image.id}',
    ports={
        '8080/tcp': None,
    },
    dns=['{fakedns.ips.primary}']
)


@pytest.fixture
def apiclient(apiserver):
    port = apiserver.ports['8080/tcp'][0]
    return HTTPConnection(f'localhost:{port}')

You can now create a test that exercises your microservice:

# test_smoketest.py

import socket

def test_my_frobulator(apiserver):
    sock = socket.socket()
    sock.connect(('127.0.0.1', apiserver.ports['8080/tcp'][0]))


def test_my_frobulator_works_after_restart(apiserver):
    apiserver.restart()

    sock = socket.socket()
    sock.connect(('127.0.0.1', apiserver.ports['8080/tcp'][0]))

In this example all the dependencies will be resolved in order and once per session:

  • A container image will be built from the Dockerfile in the examples/resolver-service/dns folder
  • A container image will be built from the Dockerfile in the examples/resolver-service/api folder

Then once per test:

  • A new 'fakedns' container will be started from the freshly built DNS image
  • A new 'apiserver' container will be started from the freshly built API image. It will be given the IP of the fake DNS server via its dns setting. Port 8080 in the container will be exposed as an ephemeral port on the host.

The test can then run and access the container via its ephemeral high port. At the end of the test the environment will be thrown away.
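Publishing a container port with a host port of None asks Docker to pick a free ephemeral port on the host, which is analogous to binding a socket to port 0. A quick pure-Python illustration of the underlying OS mechanism:

```python
import socket

# Binding to port 0 asks the OS kernel for any free ephemeral port --
# the same mechanism Docker relies on when a published container port
# is mapped to a host port of None.
sock = socket.socket()
sock.bind(('127.0.0.1', 0))
port = sock.getsockname()[1]
sock.close()
print(port)
```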

If the test fails the docker logs output from each container will be captured and added to the test output.

In the example you'll notice we defined an apiclient fixture. Of course if you use that it will implicitly pull in both of the server fixtures and 'just work':

# test_smoketest.py

import json


def test_api_server(apiclient):
    apiclient.request('GET', '/')
    response = apiclient.getresponse()
    assert response.status == 200
    assert json.loads(response.read()) == {'result': '127.0.0.1'}

Scope

All of the fixture factories take the scope keyword. Fixtures created with these factories will behave like any py.test fixture with that scope.

In this example we create one memcached container that is session scoped and another that is module scoped.

# conftest.py

from pytest_docker_tools import container, fetch

memcache_image = fetch(repository='memcached:latest')

memcache_session = container(
    image='{memcache_image.id}',
    scope='session',
    ports={
        '11211/tcp': None,
    },
)

memcache_module = container(
    image='{memcache_image.id}',
    scope='module',
    ports={
        '11211/tcp': None,
    },
)

When test_scope_1.py runs neither container is running so a new instance of each is started. Their scope is longer than a single function so they are kept alive for the next test that needs them.

# test_scope_1.py

import socket

def test_session_1(memcache_session):
    sock = socket.socket()
    sock.connect(('127.0.0.1', memcache_session.ports['11211/tcp'][0]))
    sock.sendall(b'set mykey 0 600 4\r\ndata\r\n')
    sock.sendall(b'get mykey\r\n')
    assert sock.recv(1024) == b'STORED\r\nVALUE mykey 0 4\r\ndata\r\nEND\r\n'
    sock.close()

def test_session_2(memcache_session):
    sock = socket.socket()
    sock.connect(('127.0.0.1', memcache_session.ports['11211/tcp'][0]))
    sock.sendall(b'set mykey 0 600 4\r\ndata\r\n')
    sock.sendall(b'get mykey\r\n')
    assert sock.recv(1024) == b'STORED\r\nVALUE mykey 0 4\r\ndata\r\nEND\r\n'
    sock.close()

def test_module_1(memcache_module):
    sock = socket.socket()
    sock.connect(('127.0.0.1', memcache_module.ports['11211/tcp'][0]))
    sock.sendall(b'set mykey 0 600 4\r\ndata\r\n')
    sock.sendall(b'get mykey\r\n')
    assert sock.recv(1024) == b'STORED\r\nVALUE mykey 0 4\r\ndata\r\nEND\r\n'
    sock.close()

def test_module_2(memcache_module):
    sock = socket.socket()
    sock.connect(('127.0.0.1', memcache_module.ports['11211/tcp'][0]))
    sock.sendall(b'set mykey 0 600 4\r\ndata\r\n')
    sock.sendall(b'get mykey\r\n')
    assert sock.recv(1024) == b'STORED\r\nVALUE mykey 0 4\r\ndata\r\nEND\r\n'
    sock.close()
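The raw bytes in these tests follow the memcached text protocol: a set <key> <flags> <exptime> <bytes> header line, then the data block, with every line terminated by CRLF. A small helper (purely illustrative, not part of pytest-docker-tools) makes the framing explicit:

```python
def memcache_set(key: str, value: str, flags: int = 0, exptime: int = 600) -> bytes:
    """Frame a memcached text-protocol 'set' command.

    The <bytes> field in the header is the length of the data block
    that follows, and each line is terminated with CRLF.
    """
    data = value.encode()
    header = f'set {key} {flags} {exptime} {len(data)}'.encode()
    return header + b'\r\n' + data + b'\r\n'

# Matches the literal bytes sent in the tests above.
print(memcache_set('mykey', 'data'))
```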

When test_scope_2.py runs the session scoped container is still running, so it will be reused. But we are now in a new module, so the module scoped container will have been destroyed. A new empty instance will be created.

# test_scope_2.py

import socket

def test_session_3(memcache_session):
    sock = socket.socket()
    sock.connect(('127.0.0.1', memcache_session.ports['11211/tcp'][0]))
    sock.sendall(b'get mykey\r\n')
    assert sock.recv(1024).endswith(b'END\r\n')
    sock.close()

def test_module_3(memcache_module):
    sock = socket.socket()
    sock.connect(('127.0.0.1', memcache_module.ports['11211/tcp'][0]))
    sock.sendall(b'get mykey\r\n')
    assert sock.recv(1024) == b'END\r\n'
    sock.close()

Parallelism

Integration and smoke tests are often slow, but a lot of that time is spent waiting. So running tests in parallel is a great way to speed them up. pytest-docker-tools avoids creating resource names that could collide. It also makes it easy to not care what port your service is bound to. This means it's a great fit for use with pytest-xdist.

Here is a bare minimum example that just tests creating and destroying 100 instances of a redis fixture under xdist. Create a test module, test_xdist.py:


import pytest
from pytest_docker_tools import container, fetch

my_redis_image = fetch(repository='redis:latest')

my_redis = container(
    image='{my_redis_image.id}',
)


@pytest.mark.parametrize("i", list(range(100)))
def test_xdist(i, my_redis):
    assert my_redis.status == "running"

And invoke it with:

pytest test_xdist.py -n auto

It will create a worker per core and run the tests in parallel:

===================================== test session starts ======================================
platform darwin -- Python 3.6.5, pytest-3.6.3, py-1.5.4, pluggy-0.6.0
rootdir: ~/pytest-docker-tools, inifile:
plugins: xdist-1.22.2, forked-0.2, docker-tools-0.0.2
gw0 [100] / gw1 [100] / gw2 [100] / gw3 [100] / gw4 [100] / gw5 [100] / gw6 [100] / gw7 [100]
scheduling tests via LoadScheduling
......................................................................................... [ 82%]
...........                                                                              [100%]
================================= 100 passed in 70.08 seconds ==================================

Factories Reference

Containers

To create a container in your tests use the container fixture factory.

from pytest_docker_tools import container

my_microservice_backend = container(image='redis:latest')

The default scope for this factory is function. This means a new container will be created for each test.

The container fixture factory supports all parameters that can be passed to the docker-py run method; see the docker-py documentation for the full list.
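For instance, keyword arguments are forwarded to docker-py unchanged. The values below are only examples, but each keyword is a standard docker-py run() parameter:

```python
from pytest_docker_tools import container

# Every keyword here is passed straight through to docker-py's
# ContainerCollection.run(); the values are illustrative.
app = container(
    image='redis:latest',
    command='redis-server --appendonly yes',   # override the image CMD
    environment={'REDIS_REPLICATION_MODE': 'master'},
    ports={'6379/tcp': None},                  # publish on an ephemeral host port
    read_only=True,                            # mount the root filesystem read-only
)
```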

Any string variables are interpolated against other defined fixtures. This means that a fixture can depend on other fixtures, and they will be built and run in order.

For example:

from pytest_docker_tools import container, fetch

redis_image = fetch(repository='redis:latest')
redis = container(image='{redis_image.id}')


def test_container_starts(redis):
    assert redis.status == "running"

This will fetch the latest redis:latest first, and then run a container from the exact image that was pulled. Note that if you don't use build or fetch to prepare a Docker image then the tag or hash that you specify must already exist on the host where you are running the tests. There is no implicit fetching of Docker images.
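The '{fixture_name.attribute}' strings behave much like standard Python format strings evaluated against the resolved fixture objects. A rough, purely illustrative sketch of that mechanism (not the library's actual implementation):

```python
# Illustrative only: a minimal model of how '{fixture.attr}' templates
# can be resolved against already-built fixture objects.
class FakeImage:
    # Stands in for the image object a build()/fetch() fixture returns.
    id = 'sha256:abc123'

resolved = {'redis_image': FakeImage()}

# str.format supports attribute access inside the placeholder, so
# '{redis_image.id}' pulls the id off the resolved fixture object.
image_ref = '{redis_image.id}'.format(**resolved)
print(image_ref)
```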

The container will be ready when the test is started, and will be automatically deleted after the test has finished.