
Reacting to changes with DynamoDB streams

Executing a Lambda function every time an item changes in a DynamoDB table

You have an e-commerce site, and you've implemented your backend with Lambda functions and DynamoDB. Thanks to our past issues you've already designed the data, throttled writes, and implemented transactions. No need to go back to those issues though.

When a product's stock falls below a certain threshold (stored in the same Product item), you want to trigger your stock replenishment process, which consists of writing a replenishment order to another DynamoDB table (we'll make this a lot more complicated in next week's issue).

We're going to use the following AWS services:

  • DynamoDB: Our friendly neighborhood NoSQL database. Now it has Streams!

  • Lambda: Serverless compute, which can be triggered by other AWS services.

Flow of messages

Solution step by step

I'm assuming everything up to the point where the user receives the OK is already set up.

  • Create the DynamoDB Stream

    1. Go to the AWS Console and open the DynamoDB service

    2. Select the Orders table.

    3. Click on the "Overview" tab and scroll down to the "DynamoDB Stream details" section.

    4. Click "Enable DynamoDB stream".

    5. Set the view type to "New and old images".

    6. Click "Enable"

  • Set up the IAM Policy for the StockReplenishment function

    1. Go to the IAM console.

    2. Click on "Policies" in the sidebar and click "Create policy".

    3. Choose the "JSON" tab and paste the policy document below. Then click "Review policy", name your policy "StockReplenishmentPolicy", and click "Create policy".

{
  "Version": "2012-10-17",
  "Statement": [{
      "Sid": "DynamoDBStreamReadAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetRecords",
        "dynamodb:GetShardIterator",
        "dynamodb:DescribeStream",
        "dynamodb:ListStreams"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/Orders/stream/*"
    },
    {
      "Sid": "DynamoDBTableWriteAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:PutItem"
      ],
      "Resource": "arn:aws:dynamodb:*:*:table/Replenishment"
    }
  ]
}
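
If you're scripting the setup instead, here's a sketch that creates the same policy with the SDK (assuming you saved the document above locally as policy.json):

const AWS = require('aws-sdk');
const fs = require('fs');
const iam = new AWS.IAM();

// Create the customer-managed policy from the JSON document above
iam.createPolicy({
  PolicyName: 'StockReplenishmentPolicy',
  PolicyDocument: fs.readFileSync('policy.json', 'utf8')
}).promise()
  .then(res => console.log('Policy ARN:', res.Policy.Arn))
  .catch(err => console.error('Failed to create policy', err));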
  • Set up the IAM Role for the StockReplenishment function

    1. Go back to the IAM Console.

    2. Click on "Roles" in the sidebar and then click on "Create role".

    3. Choose "Lambda" as the service that will use this role.

    4. Click "Next: Permissions".

    5. In "Attach permissions policies", add the policies "AWSLambdaBasicExecutionRole" and "StockReplenishmentPolicy" (the one you just created).

    6. Click "Next: Tags", click "Next: Review".

    7. Name the role "StockReplenishmentRole", then click "Create role".
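
The scripted equivalent looks roughly like this (a sketch; the account ID in the policy ARN is a placeholder you'd replace with your own):

const AWS = require('aws-sdk');
const iam = new AWS.IAM();

// Lambda needs to be able to assume this role
const trustPolicy = {
  Version: '2012-10-17',
  Statement: [{
    Effect: 'Allow',
    Principal: { Service: 'lambda.amazonaws.com' },
    Action: 'sts:AssumeRole'
  }]
};

async function createRole() {
  await iam.createRole({
    RoleName: 'StockReplenishmentRole',
    AssumeRolePolicyDocument: JSON.stringify(trustPolicy)
  }).promise();

  // AWS-managed policy that allows writing logs to CloudWatch Logs
  await iam.attachRolePolicy({
    RoleName: 'StockReplenishmentRole',
    PolicyArn: 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
  }).promise();

  // The policy created in the previous step (replace the account ID)
  await iam.attachRolePolicy({
    RoleName: 'StockReplenishmentRole',
    PolicyArn: 'arn:aws:iam::123456789012:policy/StockReplenishmentPolicy'
  }).promise();
}

createRole().catch(err => console.error('Failed to create role', err));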

  • Create the Lambda function for the Stock Replenishment Service

    1. Go to the Lambda Console and click "Create function".

    2. Name the function "StockReplenishment" and pick Node.js as the runtime.

    3. Paste the following code:

// The aws-sdk v2 package is included in the Node.js Lambda runtimes up to
// Node.js 16; the uuid package is not, so bundle it with your deployment package.
const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient();
const { v4: uuidv4 } = require('uuid');

exports.handler = async (event) => {
  for (const record of event.Records) {
    // We only care about updates to existing items
    if (record.eventName === 'MODIFY') {
      // Stream records use DynamoDB's low-level attribute format;
      // unmarshall them into plain JavaScript objects
      const oldImage = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.OldImage);
      const newImage = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);

      // Only react when this particular change crossed the threshold, so we
      // don't create a replenishment order for every sale while stock is low
      if (newImage.stock < newImage.reorderThreshold && oldImage.stock >= oldImage.reorderThreshold) {
        const params = {
          TableName: 'Replenishment', // the table our IAM policy grants PutItem on
          Item: {
            'orderId': uuidv4(),
            'productId': newImage.productId,
            'date': new Date().toISOString(),
            'amountToReplenish': newImage.amountToReplenish
          }
        };

        try {
          const data = await docClient.put(params).promise();
          console.log('Replenishment order added to table', data);
        } catch (err) {
          console.log('Error adding replenishment order to table', err);
        }
      }
    }
  }
};
  • Set the trigger for the StockReplenishment function

    1. On the same Lambda function page, click on "Add trigger".

    2. Choose "DynamoDB" from the dropdown menu.

    3. Select the Orders DynamoDB table and the stream.

    4. Leave the "Batch size" as 100 (default value) and "Starting position" as "Latest".

    5. Enable the trigger (don't forget this step!).

    6. Click "Add".

  • Attach the IAM Role to the Lambda function

    1. On the same Lambda function page, go to the "Configuration" tab.

    2. In the "Permissions" section, find the "Execution role" setting.

    3. Click on "Edit" next to "Execution role". Select "Use an existing role" and choose "StockReplenishmentRole".

    4. Click "Save".

  • Test it!

    1. On the Lambda function page, click on "Test".

    2. Click on "Create new test event".

    3. In the event template, select "Amazon DynamoDB update stream".

    4. In the test event data, input the JSON object below to simulate an event.

    5. Click "Create" and then "Test".

    6. Check the execution result and logs to verify that the Lambda function correctly writes an item to the replenishment table.

{
  "Records": [{
    "eventID": "example",
    "eventName": "MODIFY",
    "eventVersion": "1.0",
    "eventSource": "aws:dynamodb",
    "awsRegion": "us-east-1",
    "dynamodb": {
      "ApproximateCreationDateTime": 1623867600,
      "Keys": {
        "productId": {
          "S": "p#1234"
        }
      },
      "OldImage": {
        "productId": {
          "S": "p#1234"
        },
        "stock": {
          "N": "51"
        },
        "reorderThreshold": {
          "N": "50"
        },
        "amountToReplenish": {
          "N": "30"
        }
      },
      "NewImage": {
        "productId": {
          "S": "p#1234"
        },
        "stock": {
          "N": "49"
        },
        "reorderThreshold": {
          "N": "50"
        },
        "amountToReplenish": {
          "N": "30"
        }
      },
      "SequenceNumber": "example",
      "SizeBytes": 128,
      "StreamViewType": "NEW_AND_OLD_IMAGES"
    },
    "eventSourceARN": "example"
  }]
}
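
You can also run the same test from your own machine. Here's a sketch that invokes the deployed function with that payload (assuming you saved the JSON above as event.json and have AWS credentials configured locally):

const AWS = require('aws-sdk');
const fs = require('fs');
const lambda = new AWS.Lambda();

// Invoke the function with the simulated stream event
lambda.invoke({
  FunctionName: 'StockReplenishment',
  Payload: fs.readFileSync('event.json')
}).promise()
  .then(res => console.log('Status:', res.StatusCode, 'Result:', res.Payload.toString()))
  .catch(err => console.error('Invocation failed', err));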

Solution explanation

  • Create the DynamoDB Stream

    This is the key to the solution. The StockReplenishment function is just a regular Lambda function (we'll go over the code in a bit) that's triggered every time an item is updated in our DynamoDB table. DynamoDB Streams captures a time-ordered sequence of changes from your DynamoDB table and retains them for 24 hours. Once you enable the stream on your table, events begin to flow into it.

  • Set up the IAM Policy for the StockReplenishment function
    This policy lets our function read events from the Stream of the Orders table, and write to the Replenishment table.

  • Set up the IAM Role for the StockReplenishment function
    The role uses the policy created in the previous step, plus the "AWSLambdaBasicExecutionRole" managed policy which allows the function to write logs to CloudWatch Logs.

  • Create the Lambda function for the Stock Replenishment Service
    The interesting part here is the event the Lambda function receives. We can check the type of event, and see the item before the change (oldImage) and after the change (newImage). That way we can compare the values before and after, to see whether it was this particular order that pushed the stock below the threshold, which avoids creating a replenishment order for every order made while the stock is low.

  • Set the trigger for the StockReplenishment function
    We're triggering the StockReplenishment Lambda function from the DynamoDB Stream directly.

  • Attach the IAM Role to the Lambda function
    Just attaching the role we created earlier.

  • Test it!

    We need to generate a test event that looks like the record our Lambda function will receive from the DynamoDB Stream.
    Of course, after this local test you should test the whole system.

Discussion

The obvious question is: why didn't we just add this to the Orders service? It's just an if statement and a write to the database, right?

The answer is that we could run into a race condition where two instances of that function each think they put the stock below the threshold, and we end up with two replenishment orders created for the same product. We do have transactions in the Orders service, and we could add a check on the value of stock and a write to the Replenishment table. However, we can't make conditions apply to only some of the operations in a transaction. We want our transaction to look like this:

  1. Check value of stock

  2. If stock > quantity, continue with the transaction

  3. Decrease stock by quantity

  4. Write order

  5. Check value of threshold

  6. Check or calculate value of newStock

  7. If newStock < threshold, continue with the transaction

  8. Write replenishmentOrder

That looks nice. The problem is with step 7: if that check returned false, DynamoDB would abort the whole transaction, and our order wouldn't be written. Conditions in DynamoDB transactions can't be scoped to just some of the operations: either all of them are true and the transaction is committed as a whole, or at least one of them is false and the entire transaction is rolled back.
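
To make that concrete, here's roughly the transaction we'd want to write, using the DocumentClient (a sketch; the Products table name and the quantities are illustrative). Note the double problem: if the threshold check fails, DynamoDB cancels every operation in the transaction, order write included; and DynamoDB would reject this particular request anyway, because a transaction can't include two operations on the same item:

const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient();

// Illustrative only - DynamoDB won't let us do this
const params = {
  TransactItems: [
    {
      // Steps 1-3: decrease stock, but only if there's enough of it
      Update: {
        TableName: 'Products',
        Key: { productId: 'p#1234' },
        UpdateExpression: 'SET stock = stock - :qty',
        ConditionExpression: 'stock >= :qty',
        ExpressionAttributeValues: { ':qty': 2 }
      }
    },
    {
      // Step 4: write the order
      Put: {
        TableName: 'Orders',
        Item: { orderId: 'o#5678', productId: 'p#1234', quantity: 2 }
      }
    },
    {
      // Steps 5-7: the problem. If stock is NOT below the threshold,
      // this condition fails and the WHOLE transaction is cancelled,
      // including the order write above.
      ConditionCheck: {
        TableName: 'Products',
        Key: { productId: 'p#1234' },
        ConditionExpression: 'stock < reorderThreshold'
      }
    },
    {
      // Step 8: write the replenishment order
      Put: {
        TableName: 'Replenishment',
        Item: { orderId: 'r#9012', productId: 'p#1234', amountToReplenish: 30 }
      }
    }
  ]
};

// docClient.transactWrite(params).promise() would fail here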

Each change appears exactly once in a DynamoDB stream, so the stream itself doesn't produce duplicates. Keep in mind, though, that Lambda retries a batch when your function throws an error, so your function can still see the same record more than once; if that matters for your use case, make the handler idempotent.

One thing we could actually do to make this work inside the Orders service is to set up a second transaction which checks for any existing replenishment orders for that product, and only creates a new one if there are none. For this particular case, that would be sufficient. But I still wanted to introduce this solution, for other use cases where you can't add that arbitrary constraint, or can't check whether the result of processing the event was already written.
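
Here's a minimal sketch of that idea, using a marker item keyed by productId in a hypothetical OpenReplenishments table (not part of the setup above). This time the all-or-nothing behavior works in our favor: if the marker already exists, the condition fails and the replenishment order isn't written either:

const AWS = require('aws-sdk');
const docClient = new AWS.DynamoDB.DocumentClient();
const { v4: uuidv4 } = require('uuid');

async function createReplenishmentOnce(productId, amountToReplenish) {
  try {
    await docClient.transactWrite({
      TransactItems: [
        {
          // Marker item: the conditional put fails if one already exists
          Put: {
            TableName: 'OpenReplenishments',
            Item: { productId },
            ConditionExpression: 'attribute_not_exists(productId)'
          }
        },
        {
          Put: {
            TableName: 'Replenishment',
            Item: {
              orderId: uuidv4(),
              productId,
              date: new Date().toISOString(),
              amountToReplenish
            }
          }
        }
      ]
    }).promise();
  } catch (err) {
    // A TransactionCanceledException here means an order is already open
    console.log('No replenishment order created for', productId, err.code);
  }
}

The replenishment process would then delete the marker item once the stock is restored, re-arming the check for the next time stock runs low.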

Best Practices

Operational Excellence

  • Use X-Ray for tracing: AWS X-Ray helps you analyze and debug distributed applications like this one. You can trace requests from the moment they hit the API Gateway in front of the Orders function, follow them through the DynamoDB write, the event in the DynamoDB stream, the Replenishment function, all the way to the final write in the Replenishment table.

Security

  • Encrypt at Rest: Enable DynamoDB encryption at rest. It's transparent to you, and you can use Key Management Service (KMS) to manage the encryption keys.

  • Use minimum permissions: I say this on every issue, because it's worth saying on every issue. You're granting your Lambda function permissions to do stuff on your AWS account, through the Policy attached to the Role associated with the function. Make sure those permissions are only what the function needs to do its job.

Reliability

  • Enable DynamoDB Auto Scaling: DynamoDB Auto Scaling automatically adjusts read and write capacity settings in response to actual traffic patterns.

  • Throttle writes if necessary: Lambda functions scale really fast. DynamoDB tables in Provisioned mode don't scale as fast. If a ton of users go to your website, your Lambda functions will be able to handle the traffic spike, but they'll overload a DynamoDB table. To prevent that, use an SQS queue to throttle those writes, like we discussed in the issue from 2 weeks ago.

  • Use Dead Letter Queues for Lambda: for a stream trigger like ours, this takes the form of an on-failure destination (for example an SQS queue) on the event source mapping. It records the details of any batch your function repeatedly fails to process, so you can retrieve and inspect the events that weren't handled (see the sketch after this list).

  • Enable DynamoDB Point-in-Time Recovery: This feature provides continuous backups of your DynamoDB table data for the last 35 days. Useful in case of accidental deletion or modification!
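
Here's the on-failure destination sketch mentioned in the Dead Letter Queues item above (the mapping UUID and the queue ARN are placeholders):

const AWS = require('aws-sdk');
const lambda = new AWS.Lambda();

// Placeholders: the UUID returned when the trigger was created,
// and the ARN of an SQS queue you've set up to receive failures
const mappingUuid = 'REPLACE_WITH_MAPPING_UUID';

lambda.updateEventSourceMapping({
  UUID: mappingUuid,
  DestinationConfig: {
    OnFailure: {
      Destination: 'arn:aws:sqs:us-east-1:123456789012:stock-replenishment-failures'
    }
  },
  MaximumRetryAttempts: 2,          // don't retry a poison batch forever
  BisectBatchOnFunctionError: true  // split failing batches to isolate the bad record
}).promise()
  .then(() => console.log('Failure destination configured'))
  .catch(err => console.error('Failed to update mapping', err));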

Performance Efficiency

  • Use the right view type: With DynamoDB Streams you can set different view types. The difference is in the data that's read from the table and passed in the event. Since you're charged for those reads (or the capacity they consume), pick the right view type for your use case so you're only reading the necessary data, and not more.

    • KEYS_ONLY: Only includes the key attribute(s) of the modified item. Useful when you only need to know which items were modified, without requiring any details about the change.

    • NEW_IMAGE: Includes the entire item as it appears after it was modified. Useful when you need to see the current state of an item after it's been changed, without needing to know what it looked like before.

    • OLD_IMAGE: Provides the entire item as it appeared before it was modified. Useful when you need to know what an item looked like prior to a change, without needing to see what it looks like now.

    • NEW_AND_OLD_IMAGES: Provides both the new and the old images of the item. Useful when you need to compare the state of an item before and after a modification.

  • Optimize your Lambda function: We discussed Lambdas in the final issue of 2022. That's a long time ago for this newsletter, which has been running for only 6 months! I promise to do a proper Lambdas issue soon. In the meantime, here's the 20 advanced tips for Lambda issue.

Cost Optimization

  • Right-size your DynamoDB tables: There's Provisioned mode, where you set the capacity and have it auto-scale, and there's On-Demand Mode, where it just works and you're billed per operation. On-Demand mode is awesome, but it's 5x more expensive than Provisioned mode. Use it if you're paying very little, but once your bill starts growing, switch to Provisioned mode and right-size it.

  • Use Savings Plans: Once you have some base traffic, get a Savings Plan so you pay a lot less for that known traffic.

Resources

This time I've got a free ebook for you: 101 Tips For Becoming A Better Developer, by Harley Ferguson. You get it for free when you subscribe to his newsletter, The 10X Developer.

We've been talking a lot about DynamoDB. If you like this, you should try these workshops.

And if you want to get an AWS certification, these are by far the best courses I've seen (in fact I bought them all). I also give them for free if you refer subscribers to Simple AWS!
