Writing AWS Lambda functions behind SQS with LambdaQ

🔗 Thursday, 11 April 2024 • Cariad Eccleston • @cariad@gamedev.lgbt

I got bored of writing the boilerplate. My team got bored of reviewing it. No more!

A screenshot of the AWS Step Functions console showing the numbers 100 and 15 being summed.

I write a lot of Amazon Web Services Lambda functions, and the vast majority are invoked asynchronously by messages on SQS queues published by Step Functions state machines, rather than directly and synchronously.

It just makes more sense to me to have functions pop messages as-and-when they’re ready, rather than having the state machine try, fail, back-off and repeat, right?

The downside, naturally, is the boilerplate in each function for parsing SQS events and finalising each task’s success or failure with calls to the Step Functions SendTaskSuccess and SendTaskFailure APIs.

That’s not so bad, though. The work I enjoy a little less is migrating Lambda functions from synchronous to asynchronous invocation.

When a function is invoked directly, the event is the message; it’s a one-shot task that returns its single response. When a function is invoked via an SQS queue, though, the event is a collection or one-or-more SQS records, each containing a serialised message that must be responded to via Step Functions' APIs. And updating a function’s entrypoint to support both conditions while state machines migrate from synchronous to asynchronous invocation is… well, it’s a living. But I’m bored of writing the same boilerplate every week.

And that’s why I wrote LambdaQ!

LambdaQ

LambdaQ (“lambda-queue”) is a Python package that helps you write Amazon Web Services Lambda functions that can be called either directly or by messages on SQS queues published by Step Functions state machines.

The package takes care of the event handling, message deserialisation and API calls, while you focus on writing decoupled, testable message-handling code.

Time for an example, right?

Example

Let’s say you want to write a Lambda function that receives two numbers and returns their sum.

We’ll start by defining dictionaries for the message (i.e. the numbers to sum) and the response (i.e. the sum):

from typing import TypedDict

class _Inputs(TypedDict):
    x: int
    y: int

class Inputs(_Inputs, total=False):
    task_token: str

class Sum(TypedDict):
    result: int

In this case, the task_token key is optional because it’ll be present only when the function is called asynchronously by a state machine.

With those strongly-typed inputs and outputs in place, we can write a message handler:

from lambdaq import Metadata

def perform_sum(message: Inputs, metadata: Metadata) -> Sum:
    return Sum(result=message["x"] + message["y"])

The message handler’s signature is important; it must accept a message and a lambdaq.Metadata instance. In this case, our message is an Inputs instance and we return a Sum instance.

The message handler doesn’t care whether the message came from direct invocation or via an SQS queue. It doesn’t care if the response will be returned directly or via a Step Functions API call. It doesn’t care if its exceptions will be re-thrown to the caller or handled and returned to the state machine via its APIs. It’s just a decoupled, testable function.

Finally, we give the Lambda function an entrypoint that calls lambdaq.handle_event:

from lambdaq import handle_event

def main(event: Any, context: Any) -> Sum | None:
    return handle_event(
        event,
        perform_sum,
        "task_token",
    )

The three arguments for handle_event are the invocation event, a reference to the message-handling function and the task token’s key. The function will return a response when called directly, or None when called via a queue.

And that’s it!

If we invoke the function directly then we get the sum back immediately:

Screenshot of the AWS Lambda console showing a a test payload to request the sum of the integers 1 and 2.
Screenshot of the AWS Lambda console showing a function's result is the integer value 3.

…and if a state machine invokes the function asynchronously via an SQS queue then the response is returned when it’s good and ready:

A screenshot of the AWS Step Functions console showing the numbers 100 and 15 being summed.

And if your queue is configured to buffer and batch messages to Lambda functions, LambdaQ handles that too and calls your message handler for each one.

Installation

LambdaQ v1 requires Python 3.10 or later and can be installed from PyPI.

pip install lambdaq

The code is released under the MIT licence at github.com/cariad/lambdaq, too.

Have fun!