OVO Tech Blog

Our journey navigating the technosphere

I work in the Orion Migrations team


Working with SQS and lambdas

For a while now (since mid-2018) AWS has let us use SQS queues as event sources for lambda functions. As a life-long lazy person, I saw this as an opportunity to do less work by writing less code. As a career-long clumsy coder, I also saw an opportunity to make fewer mistakes, by writing less code.

Less code how?

Looking into the specifics of how SQS and lambdas interact, we can find things we can do with infrastructure that we would otherwise have done with code.

Start with a trivial lambda

Here’s a trivial Python lambda, which does pretty much nothing.

import json

def lambda_handler(event, context):
    value1 = event["key1"]
    stuff = { 'name': value1 }
    print('hello: ' + value1)
    return {
        'statusCode': 200,
        'body': json.dumps(stuff)
    }

I can trigger this with a trivial test event like this:

{"key1":"x"}

Give it a queue

It’s simple to create the queue, add it as an SQS trigger on the lambda, and give the lambda’s execution role permission to read from it. So let’s add some events (I'm just using the web console to do this) and let it rock:

{"key1":"x"}
{"key1":"y"}
...

Oops

[ERROR] KeyError: 'key1'
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 4, in lambda_handler
    value1 = event["key1"]

We’ve hit an error!

Not only that, but we hit this error again and again for a single queue item, because the SQS lambda trigger retries the item each time it reappears on the queue, and keeps doing so until the message retention period set on the queue runs out (4 days by default, and up to 14 days), which could be a long time.
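If we would rather not keep retrying a broken message for days, SQS can move it aside after a set number of attempts, by giving the queue a redrive policy that points at a dead-letter queue. Here is a minimal sketch using boto3’s `set_queue_attributes`; the helper names are hypothetical, it assumes the dead-letter queue already exists, and the queue URL and ARN are placeholders.

```python
import json


def build_redrive_policy(dead_letter_queue_arn, max_receive_count=5):
    """Build the RedrivePolicy queue attribute (a JSON string).

    Once a message has been received max_receive_count times without being
    deleted, SQS moves it to the dead-letter queue instead of redelivering it.
    """
    return json.dumps({
        "deadLetterTargetArn": dead_letter_queue_arn,
        "maxReceiveCount": str(max_receive_count),
    })


def attach_dead_letter_queue(sqs_client, queue_url, dead_letter_queue_arn, max_receive_count=5):
    """Apply the redrive policy to the source queue.

    sqs_client is assumed to be boto3.client('sqs'); the URL and ARN are
    placeholders for your own queues.
    """
    return sqs_client.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={
            "RedrivePolicy": build_redrive_policy(dead_letter_queue_arn, max_receive_count),
        },
    )
```

With a real client this would be called as `attach_dead_letter_queue(boto3.client('sqs'), queue_url, dlq_arn, max_receive_count=3)`: a bit of queue configuration rather than retry-counting code in the lambda.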

So what happened? Looking into the SQS docs we can see that you can’t just bang the items into a queue and expect it to work. The event that is fed to the lambda is now of the form:

{
    "Records": [
        {
            "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
            "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
            "body": "{\"key1\": \"x\"}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082649183",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082649185"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        },
        {
            "messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
            "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
            "body": "{\"key1\": \"y\"}",
            "attributes": {
                "ApproximateReceiveCount": "1",
                "SentTimestamp": "1545082650636",
                "SenderId": "AIDAIENQZJOLO23YVJ4VO",
                "ApproximateFirstReceiveTimestamp": "1545082650649"
            },
            "messageAttributes": {},
            "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
            "eventSource": "aws:sqs",
            "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue",
            "awsRegion": "us-east-2"
        }
    ]
}

That’s quite a lot more stuff than we put in. There is a lot of AWS metadata, much of which we can ignore, though it might be useful if we wanted to do something non-standard in our interaction with the queue. The key bit is that each message we sent arrives as a JSON string in the body of a record:

"Records": [
    {
        "body": "{\"key1\": \"x\"}"
    },
    {
        "body": "{\"key1\": \"y\"}"
    }
]

Accept a queue or an object

Because the SQS lambda trigger batches messages, our lambda can no longer expect a single event to process. It needs to know about the batch, and to decode each message body, so we have to do a little work for our lambda to make sense of its new input:

import json

def lambda_handler(event, context):
    records = event['Records']
    # Each record's body is the original message, as a JSON string
    for item in records:
        body = json.loads(item["body"])
        value1 = body["key1"]
        print('hello: ' + value1)

    return {
        'statusCode': 200,
        'body': str(len(records)) + ' processed'
    }

Now it runs without error. The designers at AWS could have built the trigger so that it just worked without altering the lambda, making it more seamless to add a queue to a workflow, but clearly supporting batched calls was more important, and this is pretty much the minimal change that batching requires.
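The batch size is a setting on the trigger itself (Lambda calls it an event source mapping), not something the lambda code controls. A minimal sketch of creating the trigger with boto3’s `create_event_source_mapping`, assuming a `boto3.client('lambda')` is passed in; the helper name, function name and queue ARN are placeholders. Even with a batch size of 1, the handler still receives a `Records` list, just with a single entry in it.

```python
def add_sqs_trigger(lambda_client, queue_arn, function_name, batch_size=1):
    """Connect an SQS queue to a lambda function (an event source mapping).

    lambda_client is assumed to be boto3.client('lambda'). batch_size is the
    most records that will be handed to one invocation; every invocation
    still gets an event of the form {"Records": [...]}.
    """
    return lambda_client.create_event_source_mapping(
        EventSourceArn=queue_arn,
        FunctionName=function_name,
        BatchSize=batch_size,
    )
```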

One annoyance is that I can no longer run my lambda with a single event unless I add it to the queue first. If we want to preserve that, we can add a little conditional logic so that the lambda can be called either way:

import json

def is_a_queue(event):
    # SQS trigger events wrap the messages in a 'Records' list
    return bool(event.get('Records'))


def read_queue(event):
    records = []
    for record in event.get('Records', []):
        # The message body arrives as a JSON string, so decode it
        records.append(json.loads(record.get('body')))

    return records


def lambda_handler(event, context):
    if is_a_queue(event):
        records = read_queue(event)
    else:
        # A direct invocation: the event is the payload itself
        records = [event]

    for item in records:
        print('hello: ' + item.get('key1'))

    return {
        'statusCode': 200,
        'body': str(len(records)) + ' processed'
    }
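To check both code paths without putting anything on a real queue, we can build a fake SQS-shaped event ourselves. This is a sketch with hypothetical helper names (`make_sqs_event`, `extract_payloads`); the fake records only carry a `body` and an `eventSource`, so it assumes the handler reads nothing else from the metadata.

```python
import json


def make_sqs_event(*payloads):
    """Wrap plain dicts in a minimal imitation of an SQS trigger event.

    Each record's 'body' is the payload serialised to a JSON string, as SQS
    delivers it. The real event carries much more metadata than this.
    """
    return {"Records": [{"body": json.dumps(p), "eventSource": "aws:sqs"} for p in payloads]}


def extract_payloads(event):
    """Return the list of payloads from either an SQS event or a direct one."""
    if event.get("Records"):
        return [json.loads(record["body"]) for record in event["Records"]]
    return [event]


# Both invocation styles yield the same kind of payloads
direct = {"key1": "x"}
queued = make_sqs_event({"key1": "x"}, {"key1": "y"})
print(extract_payloads(direct))  # [{'key1': 'x'}]
print(extract_payloads(queued))  # [{'key1': 'x'}, {'key1': 'y'}]
```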

Controlling rate

Queuing up a load of work is all very well, but sometimes we don’t want to blast through it as fast as we can. For example, maybe our lambda calls a service which we don’t want to overload. Or maybe it is just important that we call it at certain times. It looks like SQS has something we can use for this: it can delay the delivery of messages.

This looks great (as long as we only need to delay things by 15 minutes or less). So let’s test it by putting in one message with no delay, then another with a 2-minute delay.
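The same test can be driven from a script rather than the console. A minimal sketch using boto3’s `send_message`, whose `DelaySeconds` parameter accepts 0 to 900 seconds; `send_delayed` is a hypothetical helper and the queue URL is a placeholder. Per-message delays like this apply to standard queues; FIFO queues only support a delay set on the queue as a whole.

```python
import json

# SQS caps message delays at 15 minutes
MAX_DELAY_SECONDS = 900


def send_delayed(sqs_client, queue_url, payload, delay_seconds=0):
    """Send a JSON payload that consumers should not see for delay_seconds.

    sqs_client is assumed to be boto3.client('sqs').
    """
    if not 0 <= delay_seconds <= MAX_DELAY_SECONDS:
        raise ValueError("delay_seconds must be between 0 and 900")
    return sqs_client.send_message(
        QueueUrl=queue_url,
        MessageBody=json.dumps(payload),
        DelaySeconds=delay_seconds,
    )


# Usage with a real client and your own queue URL:
# import boto3
# sqs = boto3.client('sqs')
# send_delayed(sqs, queue_url, {"key1": "x"})                     # no delay
# send_delayed(sqs, queue_url, {"key1": "y"}, delay_seconds=120)  # 2 minutes
```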

So apparently my message with a 2-minute delay was not delayed by 2 minutes. The only reason it shows up 14 seconds after the first is that I’m putting these into the console manually, and that’s how long it took me to send the second message. If I had queued both through a script, they would have been processed immediately one after the other. So what happened?

Um?
A quick perusal of the internet indicates that, if there is a lambda consuming a queue, the delay is ignored! AWS doesn’t document this, so perhaps it is an oversight on their part. Maybe a lambda consuming the queue is not treated the same way under the hood as a regular consumer. Let’s test this hypothesis.

Consuming the Queue explicitly

I have removed the SQS trigger from the lambda, and added code inside the lambda to look at the queue and process each message it finds as it did previously.

import json
import time

import boto3

sqs = boto3.client('sqs')

queueUrl = 'https://sqs.eu-west-2.amazonaws.com/816733975012/jonnys-test-queue'

def process_queue():
    # Receive whatever is visible right now (at most 1 message by default)
    response = sqs.receive_message(
        QueueUrl=queueUrl
    )

    messages = response.get('Messages', [])
    for message in messages:
        print('hello: ' + json.loads(message.get('Body', '{}')).get('key1'))
        receipt_handle = message['ReceiptHandle']

        # Delete the message so it is not delivered again
        sqs.delete_message(
            QueueUrl=queueUrl,
            ReceiptHandle=receipt_handle
        )

    return len(messages)

def lambda_handler(event, context):
    interval = 1  # seconds to wait between polls
    processed = 0
    # Poll the queue 300 times, i.e. for roughly five minutes
    for i in range(300):
        time.sleep(interval)
        processed += process_queue()

    return {
        'statusCode': 200,
        'body': str(processed) + ' processed'
    }

I had to increase the lambda’s timeout to run this: the default is 3 seconds, and this loop polls for around five minutes.

Well, this is somewhat disappointing. Even though I am no longer using the lambda trigger, the delay still has no effect. In fact the message with the delay was even processed first, despite my adding it second, as if to mock me. Sadly, I feel this is the end of the road for slowing things down in this way.

Other ways to slow processing the queue down

Having failed to do this using the features of SQS, we are still left with the question of how to control the speed of processing. Our problem is that the tools are designed to do the job as quickly as possible. Given the opportunity, AWS unleashes swarms of instances of the lambda on the problem. So one way to control the speed is to hobble this capability by limiting how many instances can run at once.

Setting this value (the lambda’s reserved concurrency) to 2 means that only 2 copies of the lambda can run concurrently, rather than as many as AWS can let loose on the work. It is hard to guess exactly what value to set here: it depends on how long the lambda needs to run and how much load the resources it uses can take. I have found that setting it low, monitoring the logs and slowly increasing it until you reach a level which suits you works well.
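The limit can also be set without the console. A minimal sketch using boto3’s `put_function_concurrency`, assuming a `boto3.client('lambda')` is passed in; the helper name and function name are placeholders.

```python
def limit_concurrency(lambda_client, function_name, max_copies=2):
    """Reserve, and so cap, the concurrency of a lambda function.

    lambda_client is assumed to be boto3.client('lambda'). At most
    max_copies instances of the function run at once; with an SQS trigger,
    messages a throttled function cannot take yet go back on the queue
    and are retried later.
    """
    return lambda_client.put_function_concurrency(
        FunctionName=function_name,
        ReservedConcurrentExecutions=max_copies,
    )
```

Raising the limit is just another call with a bigger `max_copies`, and `delete_function_concurrency(FunctionName=...)` removes it again, which makes the "start low and creep up" approach easy to script.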

This is a somewhat crude and unsatisfying way of doing things, and it takes quite a bit of manual intervention. It also has further problems. So hopefully the brains at AWS come up with something which meets this need, or actually implement their delays correctly.
