Python asyncio SQS consumer

Feb 18, 2024

In this post we are going to stand up a basic Python SQS consumer that leverages asyncio and experiments with some different types of workloads.

Why would you be interested in this post?

  • You're working in Python and want to create a service to consume from SQS.
  • You're interested in an application of asyncio (in a Python 3.12 context).
  • You're interested in seeing HTTPX used for an IO-bound workload (take an SQS message and process several HTTP calls).
💡
Source code for the consumer discussed in this post: https://github.com/AnthonyHonstain/sqs-async-python-consumer

This service was created with the idea that you would ultimately run it in something like Kubernetes. In this post, we will stop significantly short of containerizing and running in a cluster.

Overview of Key Dependencies

I will provide an overview of the interesting dependencies I selected for this service. I tried to articulate the reasoning behind each choice, but all of it should be read in the context of constructing a basic service that runs in a container, processes SQS messages, and makes HTTP calls.

The really notable stuff in this service

  • aiobotocore 2.8.0 https://github.com/aio-libs/aiobotocore and https://aiobotocore.readthedocs.io/en/latest/
    • The dependency I struggled with the most. I was looking for an asynchronous way to get SQS messages.
      • None of this is meant as a criticism of the project. If you're new to Python+SQS, I think it is fairly unclear what the best options for a client are, at least compared to something like picking an HTTP client, where there are clear leaders to choose from.
      • Discussions on this are probably pretty similar to other discussions in the Python community about how to deal with libraries that don't support asyncio async/await patterns, which results in async replicas of the original non-async project.
      • This project looks to be reasonably maintained, and it seemed very unlikely that boto was going to get asyncio support.
  • httpx 0.25.2 https://www.python-httpx.org/
    • I haven't tried other clients' asyncio support. On cursory review I was pleased with the documentation, and I never hit any roadblocks. It just worked and the code was clean wrt async/await.
  • pydantic 2.5.2 https://docs.pydantic.dev/latest/
    • I used this to model the JSON coming out of SQS and the request/response from the HTTPX calls. If I had to do it over again I might go with dataclasses, but I was partially using this as an opportunity to explore pydantic.
  • python 3.12 https://www.python.org/downloads/release/python-3120/
    • It would be fair criticism that I don't effectively leverage new language features (when applicable).
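
As a point of comparison for the pydantic note above, here is a minimal sketch of what the dataclasses alternative could look like. It assumes a message body with name and age fields, as the log output later in this post suggests. The names UserMessage and parse_body are hypothetical and are not taken from the repo.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class UserMessage:
    """Hypothetical model of the SQS message body (fields assumed from the logs)."""

    name: str
    age: int


def parse_body(body: str) -> UserMessage:
    """Parse the JSON string from an SQS message body into a UserMessage.

    Dataclasses do not validate field types on their own, so the checks
    that pydantic would otherwise perform are written out by hand here.
    """
    data = json.loads(body)
    name, age = data["name"], data["age"]
    if not isinstance(name, str):
        raise TypeError("name must be a string")
    # bool is a subclass of int, so reject it explicitly.
    if not isinstance(age, int) or isinstance(age, bool):
        raise TypeError("age must be an integer")
    return UserMessage(name=name, age=age)


message = parse_body('{"name": "Anthony", "age": 2}')
print(message)
```

The trade-off is visible in parse_body: the model itself is shorter, but the validation that pydantic provides has to be maintained manually.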

Less critical things:

  • black 23.11.0 https://black.readthedocs.io/en/stable/
    • I have been getting a ton of value here, ymmv. I love that it consistently and with no effort on my part produces very readable and organized formatting.
  • mypy 1.7.1 https://mypy.readthedocs.io/en/stable/
    • static type checker - I still bump into odd things, but have found the juice worth the squeeze as more developers come into a project.
  • respx 0.20.2 https://lundberg.github.io/respx/
    • Used for mocking out HTTPX. I found it pretty helpful, but saw some peers get bound up by not understanding the scope of it.

Service Setup - Python and Poetry

There are many ways to manage dependencies and Python environments, and I won't argue this is the best. But it has worked well for me developing on Ubuntu, where I do most of the actual coding in IntelliJ and have multiple projects with different Python versions (some having more complicated dependencies).

Requirements (you should have this already)

Set up the mamba environment

mamba create -n sqs-async-consumer -c conda-forge python=3.12

Optional Tip After Mamba Environment Creation

If you use ZSH, you can modify your .zshrc file to automatically set the environment when you navigate to the directory for the project.

function mamba_auto_activate() {
    if [ "$(pwd)" = "/home/dev/Desktop/python/sqs-async-consumer" ]; then
        mamba activate sqs-async-consumer
    fi
}
chpwd_functions+=("mamba_auto_activate")

Install Poetry

I think there are a number of ways to install poetry https://python-poetry.org/

I am really happy with poetry (relative to pip) for tracking my Python services' dependencies, versioning, and locking dependencies.

I have had much less success trying to use poetry for managing virtual environments (hence me still leaning on mamba). This could just be my own failing or a lack of sufficient knowledge and research into poetry and how best to use it. When I have tried to run pure poetry, I get tangled up with IntelliJ not being able to correctly interact with the environment (important for me since I prefer to use IntelliJ to run/test/debug the service during my normal development cycle).

Install Poetry Dependencies

This will install aiobotocore and all the other stuff we need for the service to run.

poetry install

The pyproject.toml is a critical file to review https://github.com/AnthonyHonstain/sqs-async-python-consumer/blob/main/pyproject.toml

[tool.poetry]
name = "sqs-consumer-project"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.12"
aiobotocore = "^2.8.0"
pydantic = "^2.5.2"
black = "^23.11.0"
httpx = "^0.25.2"
mypy = "^1.7.1"
python-json-logger = "^2.0.7"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.23.2"
respx = "^0.20.2"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

# Black configuration
[tool.black]
line-length = 120

I also found it helpful to check the config

poetry config --list

Docker Compose wiremock and localstack

We will use docker compose to stand up wiremock and localstack for development and testing.

  • localstack is used for getting a local version of AWS's SQS product.
    • Note that this docker-compose.yml also contains a basic init of localstack to automatically create the queue you want for local development.
  • wiremock is used for providing a mock HTTP service
    • Note - it is created with some starter mappings
💡
I ran into issues with some versions of localstack images when I was lazily trying to use latest.

Source for the docker-compose https://github.com/AnthonyHonstain/sqs-async-python-consumer/blob/main/docker-compose.yml

This is the docker-compose.yml file for the service

version: '3.8'

services:
  localstack:
    image: localstack/localstack:3.1.0
    ports:
      - "4566:4566" # LocalStack's default edge port
      - "4571:4571" # Deprecated port, but can be included for backward compatibility
    environment:
      - SERVICES=sqs
      #- DEBUG=1
      - DATA_DIR=/tmp/localstack/data
    volumes:
      # https://docs.localstack.cloud/getting-started/installation/#docker-compose
      - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"

  wiremock:
    image: wiremock/wiremock:3.3.1-2
    ports:
      - "8080:8080" # Default Wiremock port
    volumes:
      - ./wiremock:/home/wiremock
    command: --verbose

  localstack-init:
    image: amazon/aws-cli:2.15.15
    depends_on:
      - localstack
    environment:
      AWS_ACCESS_KEY_ID: 'test'
      AWS_SECRET_ACCESS_KEY: 'test'
      AWS_DEFAULT_REGION: 'us-east-1'
    volumes:
      - ./init-localstack.sh:/init-localstack.sh  # Corrected volume mount
    entrypoint: /bin/sh
    command: -c "/init-localstack.sh"

Setting up IntelliJ

I use the Ultimate version, but PyCharm should be fine. https://www.jetbrains.com/

Open module settings

Configure an SDK

Example IntelliJ configuration for running locally:

Example IntelliJ configuration for running all tests:

Run Tests

This can be done with poetry run pytest

Start the required dependencies

💡
These tests require the Docker containers we depend on (localstack and wiremock) to be running. They are not automatically started by the test suite.
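
Since the containers have to be started by hand, a quick pre-flight check can save a confusing test failure. The following is a rough sketch and not part of the repo: it only checks that something is accepting TCP connections on the ports published in the docker-compose.yml above (4566 for localstack, 8080 for wiremock). The helper names port_is_open and missing_services are made up for this example.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


# Ports taken from the docker-compose.yml shown earlier in this post.
REQUIRED_SERVICES = {"localstack": 4566, "wiremock": 8080}


def missing_services(host: str = "localhost") -> list[str]:
    """Return the names of required services that are not accepting connections."""
    return [name for name, port in REQUIRED_SERVICES.items() if not port_is_open(host, port)]


if __name__ == "__main__":
    down = missing_services()
    if down:
        print(f"Not reachable: {', '.join(down)} (try: docker compose up)")
    else:
        print("localstack and wiremock are reachable")
```

An open port does not prove the queue or the wiremock mappings are ready, so treat this as a first-line check only.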

Run The Service Locally

You can use IntelliJ with the settings provided previously in this guide, or you can use the command line: poetry run python sqs_consumer_project/sqs_consumer.py

The docker compose up output - you can see the SQS receive and delete

You can use jq to format the results: poetry run python sqs_consumer_project/sqs_consumer.py 2>&1 | jq

❯ poetry run python sqs_consumer_project/sqs_consumer.py          

{"asctime": "2024-02-18 07:46:12,742", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:12,744", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:12,748", "name": "root", "levelname": "INFO", "message": "receive_messages got messages", "taskName": "Task-3", "message_count": 1}
{"asctime": "2024-02-18 07:46:12,748", "name": "root", "levelname": "INFO", "message": "Starting MessageId processing", "taskName": "Task-3", "message_id": "a4f2d436-36f3-408d-a747-f14687e103a1"}
{"asctime": "2024-02-18 07:46:12,748", "name": "root", "levelname": "INFO", "message": "pydantic model", "taskName": "Task-3", "sqs_message": "name='Anthony' age=2"}
{"asctime": "2024-02-18 07:46:12,748", "name": "root", "levelname": "INFO", "message": "Started work", "taskName": "Task-3", "message_name": "Anthony", "message_age": 2}
{"asctime": "2024-02-18 07:46:14,750", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:16,761", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:16,945", "name": "httpx", "levelname": "INFO", "message": "HTTP Request: POST http://localhost:8080/record_user \"HTTP/1.1 200 OK\"", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:16,946", "name": "root", "levelname": "INFO", "message": "Received response", "taskName": "Task-3", "user_id": "xxxxxx"}
{"asctime": "2024-02-18 07:46:16,946", "name": "root", "levelname": "INFO", "message": "Completed work", "taskName": "Task-3", "message_name": "Anthony", "message_age": 3, "user_id": "xxxxxx"}
{"asctime": "2024-02-18 07:46:16,958", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:18,766", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:18,963", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:18,988", "name": "root", "levelname": "ERROR", "message": "Cancel Error", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:18,988", "name": "root", "levelname": "INFO", "message": "Finished", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:18,988", "name": "root", "levelname": "ERROR", "message": "Cancel Error", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:18,988", "name": "root", "levelname": "INFO", "message": "Finished", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:19,000", "name": "root", "levelname": "INFO", "message": "Script interrupted by user", "taskName": null}
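
The log lines above are JSON with a taskName field and extra keys such as message_count. The repo depends on python-json-logger for this. The sketch below is a standard-library stand-in that produces a similar shape, not the repo's actual logging configuration. It assumes Python 3.12 or later for taskName (on older versions the field falls back to null), and the names JsonFormatter, build_logger, poll_once, and demo are hypothetical.

```python
import asyncio
import io
import json
import logging

# Attribute names present on every LogRecord; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(vars(logging.LogRecord("", 0, "", 0, "", (), None))) | {
    "message",
    "asctime",
    "taskName",
}


class JsonFormatter(logging.Formatter):
    """Render each log record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "asctime": self.formatTime(record),
            "name": record.name,
            "levelname": record.levelname,
            "message": record.getMessage(),
            # LogRecord.taskName exists on Python 3.12+; use None on older versions.
            "taskName": getattr(record, "taskName", None),
        }
        # Copy fields passed through `extra=` (e.g. message_count) into the output.
        payload.update({k: v for k, v in vars(record).items() if k not in _STANDARD_ATTRS})
        return json.dumps(payload, default=str)


def build_logger(stream: io.StringIO) -> logging.Logger:
    """Create a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger("json_logging_sketch")
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    return logger


async def poll_once(logger: logging.Logger) -> None:
    """Log one line from inside a task so the task name is attached."""
    logger.info("receive_messages got messages", extra={"message_count": 1})


async def demo() -> str:
    stream = io.StringIO()
    logger = build_logger(stream)
    await asyncio.create_task(poll_once(logger), name="Task-demo")
    return stream.getvalue()


print(asyncio.run(demo()), end="")
```

Having the task name on every line is what makes it possible to follow a single consumer through the interleaved output.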

The important sequence of work that a task goes through when consuming an SQS message:

  • "message": "receive_messages got messages" - We have a message to work on from SQS.
  • "message": "Started work" - We are going to make the HTTP call (we expect a delay).
  • You will see other work happening in the service during this time.
  • "message": "Received response" - We got an HTTP response.
  • "message": "Completed work" - We successfully completed our work on the message and will signal it can be deleted.
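
The sequence above can be sketched with nothing but the standard library. This is a simplified illustration under stated assumptions, not the repo's implementation: an asyncio.Queue stands in for SQS, asyncio.sleep stands in for the HTTPX call, queue.task_done() stands in for deleting the message, and all function names and the second and third message names are made up for the example.

```python
import asyncio


async def fake_http_call(name: str) -> str:
    """Stand-in for the HTTP POST; sleeps to mimic an IO-bound request."""
    await asyncio.sleep(0.05)
    return f"user-id-for-{name}"


async def consumer(worker: str, queue: asyncio.Queue, events: list[str]) -> None:
    """Receive a message, do the IO-bound work, then mark the message as done."""
    try:
        while True:
            message = await queue.get()  # "receive_messages got messages"
            events.append(f"{worker} started {message['name']}")
            # While this await is pending, the event loop runs the other consumer.
            user_id = await fake_http_call(message["name"])
            events.append(f"{worker} completed {message['name']} -> {user_id}")
            queue.task_done()  # stand-in for deleting the SQS message
    except asyncio.CancelledError:
        events.append(f"{worker} cancelled")
        raise


async def run_demo() -> list[str]:
    queue: asyncio.Queue = asyncio.Queue()
    events: list[str] = []
    for name in ("Anthony", "Bea", "Cy"):  # illustrative message bodies
        queue.put_nowait({"name": name})
    tasks = [
        asyncio.create_task(consumer(f"Task-{i}", queue, events), name=f"Task-{i}")
        for i in (2, 3)
    ]
    await queue.join()  # wait until every message has been processed
    for task in tasks:
        task.cancel()  # shut the consumers down, similar to Ctrl+C in the logs
    await asyncio.gather(*tasks, return_exceptions=True)
    return events


for line in asyncio.run(run_demo()):
    print(line)
```

The second consumer starts its message before the first one completes, which is the same interleaving visible between Task-2 and Task-3 in the log output.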

Summary

We provided the outline of a service that could consume SQS messages and make HTTP calls and got basic functionality working. We also got some rudimentary logging and testing established.

You can get the whole repo on GitHub if you want to pull it down to modify or review. This service is missing some things you would want before using it in a production context (configuration and more robust testing being notable). We may take this service and experiment with HTTPX/concurrency in subsequent posts.

GitHub - AnthonyHonstain/sqs-async-python-consumer: An example service written in Python using asyncio that can consume SQS messages and execute HTTP calls based on those messages.