Building a real-time data processing pipeline with Kinesis and Lambda
How to set up Kinesis Data Streams and AWS Lambda to ingest, process and store clickstreams

Guille Ojeda
January 16, 2023
Welcome to Simple AWS! A free newsletter that helps you build on AWS without being an expert. This is issue #11. Shall we?
Use case: Building a real-time data processing pipeline with Kinesis and Lambda
AWS Services involved: Kinesis, Lambda
Scenario
Your business is generating a large amount of clickstream data from your web or mobile app, and you need to process it in real time to gain insights and improve user engagement. As always, you want to do this in a scalable, secure, and cost-efficient way.
Services
Amazon Kinesis and AWS Lambda are the services we'll use to analyze clickstream data. Kinesis handles the ingestion of streaming data, while Lambda processes the data stream in real time.
Solution

- Consider the volume of data and the required throughput.
- Set up a Kinesis Data Stream to ingest the clickstream data. Configure the number of shards, which determines the stream's throughput (each shard supports up to 1 MB/s or 1,000 records/s of writes and 2 MB/s of reads), to handle your requirements (see the sketch after this list).
- Collect the clickstream data and send it to the Kinesis stream.
- (Optional) Set up sessionization of the clickstream data using Kinesis Data Analytics.
- Create a Lambda function to process the clickstream data in real-time. This function can be triggered by the Kinesis stream, and it will contain the logic for processing the data, such as filtering, transforming, or aggregating the data.
- Send data from the Lambda function to Kinesis Data Firehose, to store the processed data in S3 or Redshift for further analysis.
- Set up CloudWatch to monitor the pipeline and troubleshoot any issues.
- Test the pipeline to ensure it is processing the data correctly.
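To make these steps concrete, here's a minimal sketch (in JavaScript with the aws-sdk v2, matching the samples in the Resources section) that creates the stream and wires it to a Lambda consumer. The stream name my-clickstream-data matches the samples below; the function name process-clickstream, the shard count and the batch size are placeholder values you'd replace with your own.

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();
const lambda = new AWS.Lambda();

const setUpPipeline = async () => {
  // Create the stream; each shard handles up to 1 MB/s or 1,000 records/s of writes
  await kinesis.createStream({
    StreamName: 'my-clickstream-data',
    ShardCount: 2 // example value, size it for your expected throughput
  }).promise();

  // Wait until the stream is active before wiring up consumers
  await kinesis.waitFor('streamExists', { StreamName: 'my-clickstream-data' }).promise();

  // Trigger the (already deployed) Lambda function from the stream
  const { StreamDescription } = await kinesis.describeStream({ StreamName: 'my-clickstream-data' }).promise();
  await lambda.createEventSourceMapping({
    EventSourceArn: StreamDescription.StreamARN,
    FunctionName: 'process-clickstream', // assumed function name
    StartingPosition: 'LATEST',
    BatchSize: 100
  }).promise();
};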
How to Optimize
Operational Excellence
- Test end-to-end data flow: In addition to testing individual components, set up a test environment that mimics the production environment as closely as possible. Use this environment to test the entire data flow, from data collection to data processing and storage. This will help you identify and fix any issues with data format, data loss, or data processing errors.
- Monitor and troubleshoot the pipeline: CloudWatch allows you to view metrics, check logs, and set up alarms to be notified when something goes wrong. This helps you ensure the pipeline is running smoothly and quickly troubleshoot any issues that arise.
- Set up automatic scaling for the Kinesis stream: You can scale your Kinesis stream with CloudWatch and Lambda, or switch it to On-Demand mode (see the sketch after this list). This ensures the pipeline can handle the desired throughput while only paying for the capacity you actually need.
- Use Infrastructure as Code for your infrastructure: An IaC tool lets you version control your infrastructure, roll back to previous versions, and easily replicate environments.
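If you'd rather not manage shard counts yourself, here's a minimal sketch for switching the stream to on-demand capacity (assuming an aws-sdk version recent enough to include UpdateStreamMode; the ARN is a placeholder for your stream's actual ARN):

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

// On-demand mode scales the stream's capacity automatically
const switchToOnDemand = async () =>
  kinesis.updateStreamMode({
    StreamARN: 'arn:aws:kinesis:us-east-1:123456789012:stream/my-clickstream-data', // placeholder ARN
    StreamModeDetails: { StreamMode: 'ON_DEMAND' }
  }).promise();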
Security
- Don't let everyone write to your Kinesis stream: You can use API Gateway to expose access to your Kinesis stream, and protect it with Cognito, in a very similar way to how you expose serverless endpoints.
- Encrypt the data in S3: You don't actually need to do anything here; as of January 5, 2023, S3 automatically encrypts all new objects with SSE-S3 by default. You can still use your own KMS key if you want.
- Use IAM roles for Lambda to access the Kinesis stream: IAM roles allow you to grant permissions to Lambda to access the Kinesis stream while enforcing least privilege access controls.
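As a sketch of what least privilege can look like for the consumer, here's a hypothetical inline policy (the role name, policy name and ARN are placeholders, and you should check the Lambda event source mapping docs for the exact set of actions your setup needs) that only grants read access to the one stream the function consumes:

const AWS = require('aws-sdk');
const iam = new AWS.IAM();

// Read-only Kinesis permissions, scoped to the single stream the function consumes
const kinesisReadPolicy = {
  Version: '2012-10-17',
  Statement: [{
    Effect: 'Allow',
    Action: [
      'kinesis:DescribeStream',
      'kinesis:DescribeStreamSummary',
      'kinesis:GetRecords',
      'kinesis:GetShardIterator',
      'kinesis:ListShards'
    ],
    Resource: 'arn:aws:kinesis:us-east-1:123456789012:stream/my-clickstream-data' // placeholder ARN
  }]
};

const attachPolicy = async () =>
  iam.putRolePolicy({
    RoleName: 'clickstream-processor-role', // assumed role name
    PolicyName: 'read-clickstream',
    PolicyDocument: JSON.stringify(kinesisReadPolicy)
  }).promise();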
Reliability
- Use SNS for notifications: SNS can be used to send notifications when certain events occur in the pipeline. This allows you to quickly identify and address issues that may impact the pipeline's reliability.
- Set up dedicated throughput: Use Kinesis Data Streams Enhanced Fan-Out to set up consumers with dedicated throughput.
- Handle duplicate events: Producers may retry on error (which inserts duplicate records into the stream), and Lambda automatically retries failed batches (which processes the same records more than once). Write your consumers so they're idempotent.
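Here's a minimal idempotency sketch for the consumer: record each Kinesis sequence number in a DynamoDB table (the table name processed-events is an assumption) with a conditional write, and skip records you've already seen. Keying on the sequence number covers consumer retries; to also cover producer retries, key on a unique event ID that the producer includes in the payload instead.

const AWS = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();

// Returns true the first time a record is seen, false if it was already processed
const processOnce = async (record) => {
  try {
    await dynamo.put({
      TableName: 'processed-events', // assumed table, with sequenceNumber as its partition key
      Item: { sequenceNumber: record.kinesis.sequenceNumber },
      ConditionExpression: 'attribute_not_exists(sequenceNumber)'
    }).promise();
    return true;
  } catch (err) {
    if (err.code === 'ConditionalCheckFailedException') return false; // duplicate
    throw err;
  }
};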
Performance Efficiency
- Optimize the Lambda function's processing time: You can do this by reducing the number of calls to external services, using a high-performance language runtime, and optimizing the function's memory usage.
- Use AWS Lambda provisioned concurrency: Provisioned concurrency keeps a configured number of execution environments initialized and ready to respond, so requests up to that capacity never hit a cold start. Traffic above that baseline still scales with regular on-demand concurrency.
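Here's a minimal sketch for setting it up (the function name and alias are assumptions, and the capacity is an example value):

const AWS = require('aws-sdk');
const lambda = new AWS.Lambda();

// Keep 10 execution environments initialized for the 'live' alias of the consumer function
const setProvisionedConcurrency = async () =>
  lambda.putProvisionedConcurrencyConfig({
    FunctionName: 'process-clickstream', // assumed function name
    Qualifier: 'live', // provisioned concurrency applies to a version or alias
    ProvisionedConcurrentExecutions: 10
  }).promise();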
Cost Optimization
- Tune the Kinesis Stream shard count: In Provisioned mode the shard count determines both the stream's throughput and its cost, so size it based on your data volume and required throughput (see the sketch after this list), or just use On-Demand mode and let AWS manage capacity.
- Optimize the Lambda function's memory: The memory setting also determines how much CPU the function gets, so fine-tune it to find the cheapest configuration that still meets your latency needs.
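If you stay in Provisioned mode, here's a minimal sketch for resizing the stream as your traffic changes (the target shard count is whatever your measured throughput calls for):

const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

// UNIFORM_SCALING splits or merges shards evenly to hit the target count
const resizeStream = async (targetShards) =>
  kinesis.updateShardCount({
    StreamName: 'my-clickstream-data',
    TargetShardCount: targetShards,
    ScalingType: 'UNIFORM_SCALING'
  }).promise();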
Resources
Here's a code sample to send data to a Kinesis Data Stream, in JavaScript:
const AWS = require('aws-sdk');
const kinesis = new AWS.Kinesis();

const sendData = async (streamName, data) => {
  const params = {
    Data: JSON.stringify(data),
    // Records with the same partition key land on the same shard;
    // keying on the user ID spreads the load across shards
    PartitionKey: data.userId,
    StreamName: streamName
  };
  try {
    await kinesis.putRecord(params).promise();
    console.log(`Data sent to stream: ${JSON.stringify(data)}`);
  } catch (err) {
    console.error(err);
  }
};

const data = { userId: "123", event: "pageview" };
sendData("my-clickstream-data", data);
And here's a code sample to process the data and store it to S3 with a Lambda function in JavaScript:
const AWS = require('aws-sdk');
const firehose = new AWS.Firehose();

exports.handler = async (event) => {
  // Process and store every record in the batch, not just the last one
  for (const record of event.Records) {
    // Kinesis delivers record payloads base64-encoded
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8');
    const data = JSON.parse(payload);
    console.log(data);
    processData(data);
    await storeData("my-clickstream-data-delivery-stream", data);
  }
  return {};
};

const processData = (data) => {
  // add your data processing logic here
};

const storeData = async (deliveryStreamName, data) => {
  const params = {
    DeliveryStreamName: deliveryStreamName,
    Record: {
      // newline-delimit records so they're easy to parse once Firehose lands them in S3
      Data: JSON.stringify(data) + '\n'
    }
  };
  try {
    await firehose.putRecord(params).promise();
    console.log(`Data stored: ${JSON.stringify(data)}`);
  } catch (err) {
    console.error(err);
  }
};
If getting AWS Certified is among your new year's resolutions, let me recommend Adrian Cantrill's courses. With their mix of theory and practice, they're the best I've seen. I've literally bought them all (haven't watched them all yet). <-- This recommendation contains affiliate links.
Some of the above resources are paid promotions or contain affiliate links. I only recommend resources I've tried for myself and found actually useful, regardless of whether I get paid for it or not.
Misc.
I finally got around to setting up the domain! For now it's newsletter.simpleaws.dev, pending a proper configuration once I'm back from my holidays.
Also, I've split the recommendations in the How to Optimize section across five of the Well-Architected pillars (there are actually six now, but sustainability is harder to apply).
One more thing. I'm not sure if you noticed, but I've been skipping the haikus lately. On one hand, they're cool; on the other hand, they're fluff. Would you like them back? (If you subscribed this month: up until last month I used to include a haiku, a short poem, about AWS in every issue.)
Do you miss the haikus?
Thank you for reading! See ya on the next issue.