Python asyncio SQS consumer
In this post we are going to stand up a basic Python SQS consumer that leverages asyncio, and experiment with some different types of workloads.
Why would you be interested in this post?
- You're working in Python and want to create a service to consume from SQS.
- You're interested in an application of asyncio (in a Python 3.12 context).
- You're interested in seeing HTTPX used for an IO-bound workload (take an SQS message and make several HTTP calls to process it).
This service was created with the idea that you would ultimately run it in something like Kubernetes. In this post, we will stop significantly short of containerizing it and running it in a cluster.
Overview of Key Dependencies
I will provide an overview of the interesting dependencies I selected for this service. I have tried to articulate the reasoning behind each choice, all in the context of building a basic service, running in a container, that can process SQS messages and make HTTP calls.
The really notable stuff in this service (a short sketch of how these pieces fit together follows this list):
- aiobotocore 2.8.0 https://github.com/aio-libs/aiobotocore and https://aiobotocore.readthedocs.io/en/latest/
- The dependency I struggled the most with. I was looking for an asynchronous way to get SQS messages.
- None of this should be read as criticism of the project; I just think it is fairly unclear, if you're new to Python + SQS, what the best client options are. Compare that to picking an HTTP client, where there are clear leaders to choose from.
- Discussions on this are probably pretty similar to other discussions in the Python community about how to deal with libraries that don't support asyncio's async/await patterns, which results in these replicas of the original non-async project.
- This project looks to be reasonably maintained, and it seemed very unlikely that boto3 was going to get asyncio support.
- httpx 0.25.2 https://www.python-httpx.org/
- I haven't tried other clients' asyncio support; on cursory review I was pleased with the documentation, and I never hit any roadblocks. It just worked, and the code stayed clean with respect to async/await.
- pydantic 2.5.2 https://docs.pydantic.dev/latest/
- I used this to model the JSON coming out of SQS and the request/response from the HTTPX calls. If I had to do it over again I might go with dataclasses, but I was partially using this as an opportunity to explore pydantic.
- python 3.12 https://www.python.org/downloads/release/python-3120/
- It would be a fair criticism that I don't effectively leverage new language features (where applicable).
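To make the roles of aiobotocore, pydantic, and HTTPX concrete, here is a minimal sketch of how they can fit together: aiobotocore polls SQS, pydantic validates the message body, and HTTPX makes the downstream call. The queue URL, endpoint, model fields, and target URL are illustrative assumptions that roughly match the local setup later in this post, not the exact code from the repo.

import asyncio

import httpx
from aiobotocore.session import get_session
from pydantic import BaseModel

# Illustrative values - adjust to your own localstack queue and mock endpoint.
QUEUE_URL = "http://localhost:4566/000000000000/example-queue"
TARGET_URL = "http://localhost:8080/record_user"


class SqsMessage(BaseModel):
    # Field names mirror the example log output (name/age); purely illustrative.
    name: str
    age: int


async def poll_once() -> None:
    session = get_session()
    async with session.create_client(
        "sqs",
        region_name="us-east-1",
        endpoint_url="http://localhost:4566",
        aws_access_key_id="test",
        aws_secret_access_key="test",
    ) as sqs, httpx.AsyncClient() as http:
        # Long-poll SQS for up to 2 seconds.
        response = await sqs.receive_message(
            QueueUrl=QUEUE_URL, MaxNumberOfMessages=1, WaitTimeSeconds=2
        )
        for message in response.get("Messages", []):
            # Validate the JSON body into a typed pydantic model.
            sqs_message = SqsMessage.model_validate_json(message["Body"])
            # IO-bound work: call the downstream HTTP service.
            await http.post(TARGET_URL, json=sqs_message.model_dump())
            # Only delete the message once the work completed successfully.
            await sqs.delete_message(
                QueueUrl=QUEUE_URL, ReceiptHandle=message["ReceiptHandle"]
            )


if __name__ == "__main__":
    asyncio.run(poll_once())

The real service wraps this kind of receive/process/delete cycle in long-running tasks, but the sketch shows why the combination works well for an IO-bound workload: every network call is awaited, so other messages can be processed while one is waiting on HTTP.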
Less critical things:
- python-json-logger 2.0.7 https://github.com/madzak/python-json-logger
- This helped me structure the logs in JSON (a minimal configuration sketch follows this list).
- black 23.11.0 https://black.readthedocs.io/en/stable/
- I have been getting a ton of value here, ymmv. I love that it consistently and with no effort on my part produces very readable and organized formatting.
- mypy 1.7.1 https://mypy.readthedocs.io/en/stable/
- static type checker - I still bump into odd things, but have found the juice worth the squeeze as more developers come into a project.
- respx 0.20.2 https://lundberg.github.io/respx/
- Used for mocking out HTTPX. I found it pretty helpful, but saw some peers get bound up by not understanding the scope of it.
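As a quick illustration of the python-json-logger piece, here is a minimal configuration sketch. The formatter fields are my own assumption, chosen to resemble the example log output later in the post; the repo's actual logging setup may differ.

import logging

from pythonjsonlogger import jsonlogger

# Emit one JSON object per log line, similar to the example output shown later.
handler = logging.StreamHandler()
handler.setFormatter(
    jsonlogger.JsonFormatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)
logging.basicConfig(level=logging.INFO, handlers=[handler])

# Extra keyword fields become additional JSON keys (like message_count in the logs).
logging.info("Polling for messages", extra={"message_count": 1})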
Service Setup - Python and Poetry
There are many ways to manage dependencies and Python environments, and I won't argue this is the best one. But it has worked well for me developing on Ubuntu, where I do most of the actual coding in IntelliJ and have multiple projects with different Python versions (some with more complicated dependencies).
Requirements (you should have this already)
- Mamba installed (in my case I went for micromamba) https://mamba.readthedocs.io/en/latest/user_guide/micromamba.html
Set up the mamba environment
mamba create -n sqs-async-consumer -c conda-forge python=3.12
Optional Tip After Mamba Environment Creation
If you use ZSH, you can modify your .zshrc file to automatically activate the environment when you navigate to the project's directory.
function mamba_auto_activate() {
  if [ "$(pwd)" = "/home/dev/Desktop/python/sqs-async-consumer" ]; then
    mamba activate sqs-async-consumer
  fi
}
chpwd_functions+=("mamba_auto_activate")
Install Poetry
There are a number of ways to install Poetry; see https://python-poetry.org/
I am really happy with Poetry (relative to pip) for tracking my Python service's dependencies, versioning them, and locking them.
I have had much less success trying to use Poetry for virtualization (hence me still leaning on mamba). This could just be my own failing, or a lack of sufficient knowledge and research into Poetry and how best to use it. When I have tried to run pure Poetry, I get tangled up with IntelliJ not being able to correctly interact with the environment (important for me, since I prefer to use IntelliJ to run/test/debug the service during my normal development cycle).
Install Poetry Dependencies
This will install aiobotocore and all the other stuff we need for the service to run.
poetry install
The pyproject.toml is a critical file to review: https://github.com/AnthonyHonstain/sqs-async-python-consumer/blob/main/pyproject.toml
[tool.poetry]
name = "sqs-consumer-project"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.12"
aiobotocore = "^2.8.0"
pydantic = "^2.5.2"
black = "^23.11.0"
httpx = "^0.25.2"
mypy = "^1.7.1"
python-json-logger = "^2.0.7"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.3"
pytest-asyncio = "^0.23.2"
respx = "^0.20.2"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

# Black configuration
[tool.black]
line-length = 120
I also found it helpful to check the config
poetry config --list
Docker Compose - wiremock and localstack
We will use docker compose to stand up wiremock and localstack for development and testing.
- localstack is used for getting a local version of AWS's SQS product.
- Note that this docker-compose.yml also contains a basic init of localstack to automatically create the queue you want for local development.
- wiremock is used for providing a mock HTTP service
- Note - it is created with some starter mappings
Source for the docker-compose https://github.com/AnthonyHonstain/sqs-async-python-consumer/blob/main/docker-compose.yml
This is the docker-compose.yml file for the service:
version: '3.8'
services:
  localstack:
    image: localstack/localstack:3.1.0
    ports:
      - "4566:4566" # LocalStack's default edge port
      - "4571:4571" # Deprecated port, but can be included for backward compatibility
    environment:
      - SERVICES=sqs
      #- DEBUG=1
      - DATA_DIR=/tmp/localstack/data
    volumes:
      # https://docs.localstack.cloud/getting-started/installation/#docker-compose
      - "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"

  wiremock:
    image: wiremock/wiremock:3.3.1-2
    ports:
      - "8080:8080" # Default Wiremock port
    volumes:
      - ./wiremock:/home/wiremock
    command: --verbose

  localstack-init:
    image: amazon/aws-cli:2.15.15
    depends_on:
      - localstack
    environment:
      AWS_ACCESS_KEY_ID: 'test'
      AWS_SECRET_ACCESS_KEY: 'test'
      AWS_DEFAULT_REGION: 'us-east-1'
    volumes:
      - ./init-localstack.sh:/init-localstack.sh
    entrypoint: /bin/sh
    command: -c "/init-localstack.sh"
Setting up IntelliJ
I use the Ultimate version, but PyCharm should be fine. https://www.jetbrains.com/
Open module settings
Configure an SDK
Example IntelliJ configuration for running locally:
Example IntelliJ configuration for running all tests:
Run Tests
This can be done with poetry run pytest
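As an example of how respx and pytest-asyncio fit together, here is a minimal sketch of a test that mocks out the downstream HTTP call. The URL and response body are assumptions based on the example log output later in this post, not the repo's actual tests.

import httpx
import pytest
import respx


@pytest.mark.asyncio
@respx.mock
async def test_record_user_call() -> None:
    # Mock the wiremock-style endpoint so the test never leaves the process.
    route = respx.post("http://localhost:8080/record_user").mock(
        return_value=httpx.Response(200, json={"user_id": "xxxxxx"})
    )

    async with httpx.AsyncClient() as client:
        response = await client.post(
            "http://localhost:8080/record_user", json={"name": "Anthony", "age": 2}
        )

    assert route.called
    assert response.status_code == 200
    assert response.json()["user_id"] == "xxxxxx"

The scope point my peers ran into is worth repeating: respx only intercepts HTTPX calls made while the mock is active (here, for the duration of the decorated test), so requests made outside that scope will still try to hit the network.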
Start the required dependencies (the localstack and wiremock containers from the docker-compose.yml above, e.g. docker compose up).
Run The Service Locally
You can use IntelliJ with the setting provided previously in this guide, or you can use the command line: poetry run python sqs_consumer_project/sqs_consumer.py
You can use jq to format the results: poetry run python sqs_consumer_project/sqs_consumer.py 2>&1 | jq
❯ poetry run python sqs_consumer_project/sqs_consumer.py
{"asctime": "2024-02-18 07:46:12,742", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:12,744", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:12,748", "name": "root", "levelname": "INFO", "message": "receive_messages got messages", "taskName": "Task-3", "message_count": 1}
{"asctime": "2024-02-18 07:46:12,748", "name": "root", "levelname": "INFO", "message": "Starting MessageId processing", "taskName": "Task-3", "message_id": "a4f2d436-36f3-408d-a747-f14687e103a1"}
{"asctime": "2024-02-18 07:46:12,748", "name": "root", "levelname": "INFO", "message": "pydantic model", "taskName": "Task-3", "sqs_message": "name='Anthony' age=2"}
{"asctime": "2024-02-18 07:46:12,748", "name": "root", "levelname": "INFO", "message": "Started work", "taskName": "Task-3", "message_name": "Anthony", "message_age": 2}
{"asctime": "2024-02-18 07:46:14,750", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:16,761", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:16,945", "name": "httpx", "levelname": "INFO", "message": "HTTP Request: POST http://localhost:8080/record_user \"HTTP/1.1 200 OK\"", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:16,946", "name": "root", "levelname": "INFO", "message": "Received response", "taskName": "Task-3", "user_id": "xxxxxx"}
{"asctime": "2024-02-18 07:46:16,946", "name": "root", "levelname": "INFO", "message": "Completed work", "taskName": "Task-3", "message_name": "Anthony", "message_age": 3, "user_id": "xxxxxx"}
{"asctime": "2024-02-18 07:46:16,958", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:18,766", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:18,963", "name": "root", "levelname": "INFO", "message": "Polling for messages", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:18,988", "name": "root", "levelname": "ERROR", "message": "Cancel Error", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:18,988", "name": "root", "levelname": "INFO", "message": "Finished", "taskName": "Task-2"}
{"asctime": "2024-02-18 07:46:18,988", "name": "root", "levelname": "ERROR", "message": "Cancel Error", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:18,988", "name": "root", "levelname": "INFO", "message": "Finished", "taskName": "Task-3"}
{"asctime": "2024-02-18 07:46:19,000", "name": "root", "levelname": "INFO", "message": "Script interrupted by user", "taskName": null}
The important sequence of work that one of the tasks goes through while consuming an SQS message (a sketch of the overall task structure follows this list):
"message": "receive_messages got messages"
We have a message to work from SQS."message": "Started work"
We are going to make the HTTP call (we expect a delay).- You will see other work happening in the service during this time
"message": "Received response"
We got an HTTP response"message": "Completed work"
We successfully completed our work on the message and will signal it can be deleted.
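The taskName values in the output above (Task-2, Task-3) come from running more than one polling coroutine concurrently. Below is a minimal sketch of that overall shape, assuming a polling coroutine like the one outlined earlier in the post; it is a simplified stand-in, not the exact structure of the repo's sqs_consumer.py.

import asyncio
import logging

POLLER_COUNT = 2  # Matches the two polling tasks (Task-2, Task-3) in the example output.


async def poller() -> None:
    try:
        while True:
            logging.info("Polling for messages")
            # Placeholder for the receive/process/delete cycle sketched earlier.
            await asyncio.sleep(2)
    except asyncio.CancelledError:
        logging.error("Cancel Error")
        logging.info("Finished")
        raise


async def main() -> None:
    # Each task created here shows up with its own taskName in the JSON logs.
    async with asyncio.TaskGroup() as tg:
        for _ in range(POLLER_COUNT):
            tg.create_task(poller())


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        logging.info("Script interrupted by user")

On Ctrl-C the tasks are cancelled, each one logs its "Cancel Error" / "Finished" pair, and the top-level handler logs "Script interrupted by user", which lines up with the tail of the example output.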
Summary
We provided the outline of a service that can consume SQS messages and make HTTP calls, and we got basic functionality working. We also established some rudimentary logging and testing.
You can get the whole repo on GitHub if you want to pull it down to review or modify. This service is missing some things you would want before using it in a production context (configuration and more robust testing being notable gaps). We may take this service and experiment with HTTPX/concurrency in subsequent posts.