Building a Real-Time Fraud Detection and Analytics Pipeline on AWS with Kinesis, Apache Flink & OpenSearch
Introduction
Fraudulent activities are a significant threat to businesses in today's digital world. They can cause financial losses and security breaches, affecting both companies and their customers. That's why real-time fraud detection is crucial for modern transaction systems.
The challenge we face is processing large amounts of transactions instantly and accurately identifying suspicious patterns. Traditional methods of batch processing aren't effective in detecting fraud as it happens, which leads to delayed responses and potential financial damages.
Fortunately, there are powerful solutions available: AWS Kinesis, Apache Flink, Kinesis Firehose, and OpenSearch. Together, they form a robust real-time fraud detection and analytics pipeline capable of:
- Processing millions of transactions per second
- Analyzing patterns in real time
- Triggering immediate alerts for suspicious activities
- Storing transaction data for future analysis
AWS Kinesis acts as the central data stream, ingesting high-throughput transaction data in real time. This data is then processed in parallel by two consumers:
- Apache Flink (via Kinesis Data Analytics) analyzes transaction streams for fraudulent behavior, such as a single unusually large transaction or a burst of more than five transactions from the same user within one minute, and sends real-time alerts through SNS.
- Kinesis Data Firehose delivers a continuous stream of all transactions directly into Amazon OpenSearch, enabling teams to visualize, search, and analyze transactions over time with dashboards and queries.
This architecture brings several benefits to businesses:
- Minimize False Positives: Advanced pattern recognition reduces incorrect fraud flags
- Reduce Response Time: Instant detection and notification of suspicious activities
- Scale Automatically: Handle increasing transaction volumes without performance impact
- Maintain Historical Records: Store and analyze past transactions for continuous improvement
- Enable Visualization and Auditing: Full transaction visibility through OpenSearch dashboards
By integrating these AWS services, we build a comprehensive, real-time fraud detection and analytics system that not only acts instantly on threats but also supports ongoing monitoring and insights through powerful search and visualization tools.
Understanding AWS Kinesis
AWS Kinesis is Amazon's solution for real-time data streaming and analytics. This fully managed service enables you to collect, process, and analyze streaming data at any scale, making it a crucial tool for modern data-driven applications.
Kinesis Data Stream:
- Captures and stores data streams for processing
- Handles multiple data producers and consumers
- Retains data from 24 hours to 365 days
- Processes data in real-time with sub-second latency
Kinesis Data Firehose:
- Loads streaming data into AWS data stores
- Automatically scales to match throughput
- Supports data transformation on the fly
- Enables near real-time analytics
AWS Managed Apache Flink:
- Processes data streams using SQL or Apache Flink
- Provides real-time analytics capabilities
- Integrates with machine learning models
- Generates insights from streaming data
AWS Kinesis shines in scenarios requiring immediate data processing. The service can handle various data types:
- Click streams from web applications
- Social media feeds
- IT logs and metrics
- IoT sensor data
- Financial transactions
The architecture of AWS Kinesis follows a producer-consumer model. Data producers send records to Kinesis streams, while consumers retrieve and process these records. Each stream maintains a sequence of data records organized in shards, providing ordered data delivery and dedicated throughput per shard.
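To make the producer-consumer model concrete, here is a minimal sketch using the Python SDK (boto3): one call writes a record with a partition key, and a simple loop reads records back from a single shard. The stream name transaction-stream matches the one used later in this post; region and credentials are assumed to be configured in your environment.

import json
import time
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")
STREAM_NAME = "transaction-stream"

# Producer side: the partition key decides which shard the record lands on,
# so all records for the same user stay ordered within one shard.
kinesis.put_record(
    StreamName=STREAM_NAME,
    Data=json.dumps({"userId": "user1", "amount": 42.5}),
    PartitionKey="user1",
)

# Consumer side: iterate over one shard and poll for new records (LATEST = only new data).
shard_id = kinesis.describe_stream(StreamName=STREAM_NAME)["StreamDescription"]["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=STREAM_NAME, ShardId=shard_id, ShardIteratorType="LATEST"
)["ShardIterator"]

while True:
    resp = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in resp["Records"]:
        print(record["Data"])             # raw bytes of the JSON payload
    iterator = resp["NextShardIterator"]  # continue from where we left off
    time.sleep(1)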
Kinesis offers built-in resilience through automatic data replication across multiple AWS availability zones. The service manages the underlying infrastructure, eliminating the need for manual server provisioning or cluster management.
The pricing model follows a pay-as-you-go structure based on the amount of data ingested, stored, and processed. This flexibility allows you to scale your streaming applications cost-effectively while maintaining high performance and reliability.
Unveiling Fraudulent Footprints: Visualizing Behavior with AWS OpenSearch
Building a Fraud Detection Mechanism with AWS Services
A robust fraud detection system requires seamless integration of multiple AWS services working in harmony. Let's explore the architecture that powers real-time fraud detection and behavioral analysis using AWS services.
The core of the data pipeline begins with AWS Kinesis Data Streams, acting as the central nervous system for incoming transaction data. This real-time stream feeds directly into AWS Managed Apache Flink, where custom fraud detection algorithms perform immediate analysis. Upon identifying suspicious activity, Flink triggers instant alerts via Amazon SNS, ensuring immediate notification of potential threats.
To gain deeper insights beyond immediate detection, a parallel data flow leverages AWS Kinesis Data Firehose. Firehose subscribes to the relevant data stream (either the raw transactions or the processed output from Flink) and efficiently delivers it to Amazon OpenSearch Service. This integration enables the creation of interactive dashboards and visualizations, allowing analysts to explore patterns, user behaviors, and trends associated with fraudulent activities. This visual layer provides crucial context and aids in understanding the "how" and "why" behind the detected anomalies.
Optionally, Firehose can also be configured to deliver data to Amazon S3 for long-term archival, audit trails, or subsequent batch processing and machine learning analysis.
In essence, the data flows as follows:
- Transaction data enters AWS Kinesis Data Streams.
- AWS Managed Apache Flink consumes the stream, applying real-time fraud detection logic.
- Suspicious transactions trigger immediate alerts via Amazon SNS.
- Relevant data is simultaneously (or subsequently) streamed via AWS Kinesis Data Firehose to Amazon OpenSearch Service for visualization and behavioral analysis.
- (Optional) Data is also delivered via AWS Kinesis Data Firehose to Amazon S3 for archival and further analysis.
Connectivity between services is managed as follows:
- Kinesis-Flink Connection: AWS Managed Apache Flink applications are configured as consumers, reading directly and continuously from the designated Kinesis Data Streams.
- Flink-SNS Integration: Apache Flink applications utilize the AWS SDK to publish alert messages to specified Amazon SNS topics.
- Firehose-OpenSearch Integration: AWS Kinesis Data Firehose is configured with Amazon OpenSearch Service as a direct delivery destination, handling the complexities of data ingestion and indexing for efficient querying and visualization.
The entire architecture is designed for scalability, automatically adjusting to fluctuations in transaction volume. AWS Managed Apache Flink handles state management and exactly-once processing, while Firehose and OpenSearch are built for handling large-scale data ingestion and analysis. This integrated approach not only detects fraud in real-time but also provides the visual tools necessary to understand and combat evolving fraudulent behaviors.
Step 1: Producers of AWS Kinesis Data Streams: Who Sends the Data?
Amazon Kinesis Data Streams (KDS) is a powerful service for ingesting real-time data at scale. But to truly leverage its power, you need producers—the components responsible for pushing data into your Kinesis stream.
So who—or what—are these producers? Let’s break it down.
🛠️ 1. Kinesis Agent
The Kinesis Agent is a standalone Java application you install on your server. It's perfect for collecting and sending data like log files or metrics from on-premise or EC2 servers directly into your Kinesis Data Stream.
- Best for: Log ingestion from servers
- Installation: Lightweight, configurable with a JSON config file
🧰 2. Kinesis Producer Library (KPL)
The KPL offers a higher-level abstraction over the lower-level Kinesis API. It simplifies the process of batching, queuing, and retrying records.
- Aggregation: One of its killer features. KPL can aggregate multiple user records into a single Kinesis record, maximizing throughput and saving costs.
- Language: Java (but works well with other languages via Kinesis Producer Daemon)
🧑‍💻 3. AWS SDK
If you're building custom applications, you can use the AWS SDKs, available in multiple languages.
- 🌐 Languages supported: Java, Node.js, Python, Ruby, Go, and more
- ✍️ APIs: Use PutRecord or PutRecords to write data (a batch example with the Python SDK is sketched below)
- 🔧 Use case: Real-time analytics, custom dashboards, event-driven apps
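As a sketch of the SDK route, the snippet below batches several records into one PutRecords call with boto3. The stream name and record shape mirror the transaction format used elsewhere in this post; treat it as illustrative rather than production code.

import json
import random
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Build a small batch of fake transactions (PutRecords accepts up to 500 per call).
records = [
    {
        "Data": json.dumps({
            "transactionId": f"tx{random.randint(0, 999999)}",
            "userId": f"user{random.randint(1, 5)}",
            "amount": round(random.uniform(1, 1000), 2),
        }),
        "PartitionKey": f"user{random.randint(1, 5)}",
    }
    for _ in range(10)
]

response = kinesis.put_records(StreamName="transaction-stream", Records=records)
print("Failed records:", response["FailedRecordCount"])  # retry these if > 0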
💻 4. AWS CLI
Sometimes, you just want something quick and dirty—enter the AWS Command Line Interface.
- 🧪 Best for: Testing, scripting, and ad-hoc record insertion
- ⚡ Fast and simple: Fire and forget test data from your terminal
🔗 5. Direct AWS Service Integration
Certain AWS services can push data straight into Kinesis—no code, no hassle.
- 🔌 Services that integrate directly:
  - Amazon API Gateway (proxy integration)
  - AWS Lambda (as a proxy)
  - AWS IoT Core, EventBridge, and others
- ☁️ Use case: Serverless pipelines with minimal operational overhead
Here's how you can implement the data producer using the AWS SDK for JavaScript (Node.js):
const AWS = require('aws-sdk');
// Configure AWS SDK
AWS.config.update({
region: 'us-east-1' // change if needed
});
const kinesis = new AWS.Kinesis();
const STREAM_NAME = 'transaction-stream';
// Helper: generate a random transaction
function generateTransaction() {
const userId = 'user' + Math.floor(Math.random() * 5 + 1); // user1 to user5
const transactionId = 'tx' + Math.floor(Math.random() * 1000000);
const amount = Math.random() < 0.2 ? 15000 : Math.floor(Math.random() * 1000); // occasionally trigger fraud
const timestamp = Date.now();
const location = ['NY', 'CA', 'TX', 'FL', 'WA', 'IN'][Math.floor(Math.random() * 6)]; // pick one of the six locations
return {
transactionId,
userId,
amount,
timestamp,
location
};
}
// Send transaction to Kinesis
function sendTransaction() {
const txn = generateTransaction();
const payload = JSON.stringify(txn);
const params = {
Data: payload,
PartitionKey: txn.userId, // ensures same user lands in same shard
StreamName: STREAM_NAME
};
kinesis.putRecord(params, (err, data) => {
if (err) {
console.error('Error sending to Kinesis:', err);
} else {
console.log('Sent:', payload);
}
});
}
// Send a new transaction every second
setInterval(sendTransaction, 1000);
The above code simulates real-time transactions using Node.js. This script:
- Randomly generates user transactions every second.
- Occasionally injects high-value transactions to mimic fraudulent behavior.
- Sends each transaction as a JSON record to the Kinesis Data Stream.
Key Highlights:
- Uses AWS SDK to connect to Amazon Kinesis.
- Each record is sent with a PartitionKey based on userId to ensure transactions from the same user go to the same shard.
- Random values simulate both normal and suspicious transactions.
Step 2: Capturing Transaction Data in Real-Time with Kinesis Data Streams
To begin processing transactions in real time, we need a Kinesis Data Stream—a highly scalable and durable service for real-time data ingestion.
This stream acts as the entry point for all transaction records that our producer (Node.js script or any backend app) will send. Each incoming record is temporarily stored across shards, which determine the stream’s throughput capacity.
Sharding in AWS Kinesis Data Streams:
- Start with 2-3 shards for initial testing
- Each shard handles up to 1 MB/second (or 1,000 records/second) of input
- Each shard handles up to 2 MB/second of output
- Scale shards based on transaction volume
Retention Period in Data Stream:
- Set retention period between 24-168 hours
- Consider your processing window requirements
- Balance cost with data accessibility needs
Setting up Kinesis Data Streams requires careful planning and implementation to ensure efficient data capture for your fraud detection system. Here's a detailed guide to configure your stream:
Open the AWS Management Console and:
- Navigate to Amazon Kinesis -> Data Streams
- Click "Create Data Stream"
- Enter a name, e.g. transaction-stream
- Set Number of shards = 2
- Click Create Stream
Once the stream is created, it may take a few seconds to become active. (If you prefer to script this step, a boto3 sketch follows below.)
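If you would rather script the stream creation than click through the console, a minimal boto3 sketch looks like the following; the waiter blocks until the stream is active, and the retention call bumps retention from the default 24 hours to 48 hours as an example.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Create the stream with 2 shards (provisioned capacity mode).
kinesis.create_stream(StreamName="transaction-stream", ShardCount=2)

# Block until the stream transitions from CREATING to ACTIVE.
kinesis.get_waiter("stream_exists").wait(StreamName="transaction-stream")

# Optionally extend retention beyond the default 24 hours (here: 48 hours).
kinesis.increase_stream_retention_period(
    StreamName="transaction-stream", RetentionPeriodHours=48
)

print(kinesis.describe_stream_summary(StreamName="transaction-stream")
      ["StreamDescriptionSummary"]["StreamStatus"])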
Best Practices while configuring AWS Kinesis Data Stream
⚙️ 1. Throughput Planning
Before you dive in, make sure your stream is sized for success.
- 📊 Shard Limits: Each shard supports 1 MB/sec write and 2 MB/sec read throughput.
- 🧮 Estimate Before You Build: Calculate the average record size and records per second to determine the number of shards you need (a quick calculation is sketched below).
- 🔁 Enhanced Fan-Out for Multiple Consumers: If you have multiple downstream consumers (e.g., Apache Flink + Firehose), enable Enhanced Fan-Out. This gives each consumer a dedicated 2 MB/sec read pipe, reducing contention.
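As a back-of-the-envelope example of the "estimate before you build" step, the small calculation below derives a shard count from assumed traffic numbers (2,000 records/second averaging 1 KB each); plug in your own figures.

import math

# Assumed traffic profile -- replace with your own measurements.
records_per_second = 2000
avg_record_size_kb = 1.0

# Each shard accepts up to 1 MB/sec or 1,000 records/sec of writes,
# whichever limit is reached first.
shards_for_throughput = math.ceil(records_per_second * avg_record_size_kb / 1024)
shards_for_record_rate = math.ceil(records_per_second / 1000)

shards_needed = max(shards_for_throughput, shards_for_record_rate)
print(f"Provision at least {shards_needed} shard(s)")  # -> 2 for these numbers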
📈 2. Monitoring & Scaling
Kinesis offers great tools to help you watch and adapt in real time.
- 📉 CloudWatch Metrics to Monitor: IncomingBytes and ReadProvisionedThroughputExceeded help detect when you're hitting shard limits.
- 📡 Set Up CloudWatch Alarms: Alert on usage thresholds or sudden spikes in traffic (see the sketch below).
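A hedged sketch of one such alarm with boto3: it flags when consumers start hitting read-throughput limits on the stream. The SNS topic ARN is a placeholder; substitute your own alerting topic.

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")

# Alarm when read throughput on the stream is exceeded repeatedly.
cloudwatch.put_metric_alarm(
    AlarmName="transaction-stream-read-throttled",
    Namespace="AWS/Kinesis",
    MetricName="ReadProvisionedThroughputExceeded",
    Dimensions=[{"Name": "StreamName", "Value": "transaction-stream"}],
    Statistic="Sum",
    Period=60,                 # evaluate per minute
    EvaluationPeriods=3,       # three consecutive breaching minutes
    Threshold=0,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=["arn:aws:sns:us-east-1:123456789012:ops-alerts"],  # placeholder topic
)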
🔐 3. Security & Access Control
Keep your data safe and your access policies tight.
- 🔑 Use IAM Policies: Define who can read from and write to your streams.
- 🛡️ Enable Server-Side Encryption: Use AWS KMS to encrypt data at rest (TLS already protects it in transit).
📦 4. Data Management & Replay
Make your data work harder for longer.
- 🕒 Data Retention: Adjust from the default 24 hours up to 7 days if you need more time for reprocessing or compliance.
- ⏪ Replay with TRIM_HORIZON or AT_TIMESTAMP: Useful for replaying data into Flink or other consumers for backfills or failure recovery.
- 🧯 Backup with Firehose: For long-term storage, attach Kinesis Data Firehose to send data to Amazon S3 automatically.
By following these best practices, you ensure your Kinesis data pipeline is resilient, efficient, and scalable—ready to handle real-time workloads like fraud detection, log analytics, and IoT telemetry.
Step 3: Processing Data with Apache Flink for Fraud Detection
Apache Flink's event processing capabilities shine in fraud detection scenarios through its ability to analyze streaming data in real-time. AWS Managed Apache Flink simplifies this process by handling infrastructure management, letting you focus on building effective fraud detection logic.
Apache Flink uses several powerful techniques to detect fraudulent patterns:
- Windowing Operations: Create time-based or count-based windows to analyze transaction patterns within specific timeframes
- State Management: Track user behavior patterns across multiple transactions
- Complex Event Processing: Identify suspicious patterns by correlating multiple events in real-time
Here's a practical example of implementing fraud detection logic in Flink using a Java Maven project:
Project Structure:
Dependencies:
Transaction.java
package com.example.fraud.model;
public class Transaction {
public String transactionId;
public String userId;
public double amount;
public long timestamp;
public String location;
public Transaction() {}
@Override
public String toString() {
return String.format("Transaction[id=%s, user=%s, amount=%.2f, time=%d, location=%s]",
transactionId, userId, amount, timestamp, location);
}
}
SnsPublisher.java
package com.example.fraud.util;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;
public class SnsPublisher {
private static final SnsClient snsClient = SnsClient.builder()
.region(Region.US_EAST_1) // Change region if needed
.build();
private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:011528266190:FraudAlerts"; // Replace with your SNS topic ARN
public static void publishAlert(String message) {
PublishRequest request = PublishRequest.builder()
.topicArn(TOPIC_ARN)
.message(message)
.subject("FRAUD ALERT")
.build();
snsClient.publish(request);
}
}
FraudDetectionJob.java
package com.example.fraud;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.util.Collector;
import com.example.fraud.model.Transaction;
import com.example.fraud.util.SnsPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;
public class FraudDetectionJob {
private static final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties consumerConfig = new Properties();
consumerConfig.setProperty("aws.region", "us-east-1");
consumerConfig.setProperty("stream.initial.position", "LATEST");
FlinkKinesisConsumer<String> kinesisConsumer =
new FlinkKinesisConsumer<>("transaction-stream", new SimpleStringSchema(), consumerConfig);
env.getConfig().setAutoWatermarkInterval(1000);
env.addSource(kinesisConsumer)
.map(json -> objectMapper.readValue(json, Transaction.class))
.assignTimestampsAndWatermarks(WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((txn, ts) -> txn.timestamp))
.keyBy(txn -> txn.userId)
.process(new FraudDetector())
.map(Transaction::toString)
.print();
env.execute("Real-Time Fraud Detection Job");
}
public static class FraudDetector extends KeyedProcessFunction<String, Transaction, Transaction> {
private transient ListState<Long> timestampState;
@Override
public void open(Configuration parameters) {
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("timestamps", Long.class);
timestampState = getRuntimeContext().getListState(descriptor);
}
@Override
public void processElement(Transaction txn, Context ctx, Collector<Transaction> out) throws Exception {
boolean isFraud = false;
// Rule 1: High amount
if (txn.amount > 10000) {
isFraud = true;
}
// Rule 2: High velocity
long oneMinuteAgo = txn.timestamp - Time.minutes(1).toMilliseconds();
List<Long> timestamps = new ArrayList<>();
System.out.println(timestampState.get());
for (Long ts : timestampState.get()) {
if (ts >= oneMinuteAgo) {
timestamps.add(ts);
}
}
timestamps.add(txn.timestamp);
timestampState.update(timestamps);
if (timestamps.size() > 5) {
isFraud = true;
}
if (isFraud) {
String message = "FRAUD DETECTED: " + txn.toString();
SnsPublisher.publishAlert(message);
out.collect(txn);
}
}
}
}
You can implement various fraud detection patterns using Flink:
1. Velocity Checks
- Multiple transactions in different locations
- Rapid succession of transactions
- Unusual transaction frequency
2. Amount Pattern Analysis
- Sudden large transactions
- Multiple small transactions followed by large withdrawals
- Round-number transaction amounts
Let’s break down the core components of the Flink job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties consumerConfig = new Properties();
consumerConfig.setProperty("aws.region", "us-east-1");
consumerConfig.setProperty("stream.initial.position", "LATEST");
- Creates the Flink execution environment.
- Sets up Kinesis stream config to start consuming from the latest records.
FlinkKinesisConsumer<String> kinesisConsumer =
new FlinkKinesisConsumer<>("transaction-stream", new SimpleStringSchema(), consumerConfig);
- Connects to the transaction-stream using Flink's built-in Kinesis connector.
.assignTimestampsAndWatermarks(WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((txn, ts) -> txn.timestamp))
- Applies a 5-second watermark to handle late-arriving events.
- Ensures transactions are processed in event-time order.
.keyBy(txn -> txn.userId) .process(new FraudDetector())
- Groups transactions per user using keyBy on userId.
- Passes them to FraudDetector, a custom KeyedProcessFunction containing the fraud detection rules.
if (txn.amount > 10000 || transactionsWithin1Min > 5) {
SnsPublisher.publishAlert(message);
out.collect(txn);
}
- Rule 1: Flags if amount > $10,000
- Rule 2: Checks if more than 5 transactions occurred within the last 1 minute
- If any rule is true, SNS alert is sent.
ListState<Long> timestampState = getRuntimeContext().getListState(...);
- Maintains a list of recent timestamps per userId
- Used to evaluate the frequency of transactions within a rolling one-minute window.
.map(Transaction::toString).print();
- Converts the flagged transaction to string
- Prints it to the Flink logs (can be replaced with a sink like S3 or Kafka).
This code forms the real-time brain of your fraud detection pipeline—processing, detecting, and alerting all in milliseconds.
Step 4: Creating the AWS Managed Apache Flink Application in the Console
Now that our Kinesis Data Stream and Java code for Apache Flink are ready, the next step is to build and deploy a real-time fraud detection application using AWS Managed Apache Flink.
AWS Managed Flink allows you to run Apache Flink applications without managing the infrastructure. We'll use it to consume transaction data from Kinesis, apply fraud detection logic, and trigger SNS alerts if suspicious activity is detected.
Navigate to your project root and run the following command:
mvn clean package
This will generate a JAR in the target/ directory, e.g., fraud-detection-app-new-0.0.1.jar.
Upload the JAR File to S3
- Go to the AWS S3 Console.
- Choose an existing bucket or create a new one (e.g., flink-fraud-detection-jars).
- Click Upload and select your JAR file (e.g., fraud-detection-app-new-0.0.1.jar). You can also upload it programmatically, as sketched below.
- Copy the S3 URI, which will look like:
s3://flink-fraud-detection-jars/fraud-detection-app-new-0.0.1.jar
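The console upload above can also be scripted; here is a minimal boto3 sketch that creates the bucket and uploads the built JAR. The bucket and file names match the examples above but are otherwise arbitrary.

import boto3

s3 = boto3.client("s3", region_name="us-east-1")
bucket = "flink-fraud-detection-jars"
jar_path = "target/fraud-detection-app-new-0.0.1.jar"

# Create the bucket (in us-east-1 this is a no-op if you already own it).
s3.create_bucket(Bucket=bucket)

# Upload the JAR built by `mvn clean package`.
s3.upload_file(jar_path, bucket, "fraud-detection-app-new-0.0.1.jar")
print(f"s3://{bucket}/fraud-detection-app-new-0.0.1.jar")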
Create AWS Managed Apache Flink in console:
- Open the AWS Console and go to AWS Managed Apache Flink (previously known as Kinesis Data Analytics).
- Click Create streaming application
- Choose Create From Scratch
- Select Apache Flink version 1.19 (the same version used in the Java project)
- Enter an application name (e.g., fraud-detector-app).
- An IAM role is created by default with basic permissions to fetch the JAR file from the S3 bucket and to write log streams to CloudWatch Logs
- Add an additional permission to the role to access the SNS topic, since our Apache Flink Java project publishes alert messages to SNS
- Choose a deployment mode (Development or Production); we will go with Development since this is a demo project
- Click Create streaming application
- Once the application is ready, open Configure, choose the S3 bucket where your JAR file resides, and enter the correct path to the file.
- Click Run to start the application; you should see it reach a successful Running status. (An equivalent boto3 sketch follows below.)
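For reference, the same application can also be created through the kinesisanalyticsv2 API (the service behind Managed Flink). The sketch below is an assumption-laden outline: the IAM role ARN and account ID are placeholders, and it assumes FLINK-1_19 is an available runtime value in your region; the console flow above remains the simpler path.

import boto3

kda = boto3.client("kinesisanalyticsv2", region_name="us-east-1")

kda.create_application(
    ApplicationName="fraud-detector-app",
    RuntimeEnvironment="FLINK-1_19",
    # Placeholder role: needs S3 read, CloudWatch Logs, and SNS publish permissions.
    ServiceExecutionRole="arn:aws:iam::123456789012:role/flink-fraud-detector-role",
    ApplicationConfiguration={
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::flink-fraud-detection-jars",
                    "FileKey": "fraud-detection-app-new-0.0.1.jar",
                }
            },
            "CodeContentType": "ZIPFILE",
        }
    },
)

# Start the job once the application has been created.
kda.start_application(ApplicationName="fraud-detector-app")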
Step 5: Sending Alerts Upon Fraud Detection Using Amazon SNS Topics
Once Apache Flink identifies suspicious patterns, you'll need a reliable system to notify relevant stakeholders immediately. Amazon Simple Notification Service (SNS) provides a powerful solution for sending real-time alerts about potential fraud.
- Navigate to the SNS console
- Select "Create topic"
- Choose "Standard" topic type
- Set appropriate access policies
- Add email endpoints for fraud analysts
- Set up SMS notifications for urgent cases
- Include HTTPS endpoints for automated systems (a boto3 sketch of this setup follows below)
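A quick boto3 sketch of the topic and subscription setup described above; the email address and phone number are placeholders, and email recipients must confirm the subscription before messages are delivered.

import boto3

sns = boto3.client("sns", region_name="us-east-1")

# Standard topic used by the Flink job's SnsPublisher.
topic_arn = sns.create_topic(Name="FraudAlerts")["TopicArn"]

# Email subscription for fraud analysts (recipient must confirm).
sns.subscribe(TopicArn=topic_arn, Protocol="email", Endpoint="fraud-team@example.com")

# SMS subscription for urgent cases (placeholder number).
sns.subscribe(TopicArn=topic_arn, Protocol="sms", Endpoint="+15555550123")

# Send a test alert.
sns.publish(TopicArn=topic_arn, Subject="FRAUD ALERT", Message="Test alert from setup script")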
You can find the full source code for this project on GitHub:
JAVA(Apache Flink) - https://github.com/ShalniGerald/aws-kinesis-apache-flink.git
Producer Node.js - https://github.com/ShalniGerald/aws-kinesis-data-stream-producer.git
Visualizing Transactions with Amazon OpenSearch Service
Beyond real-time fraud detection, it's crucial to have comprehensive visibility into all transaction data for monitoring, auditing, and analysis. To achieve this, we can integrate Amazon Kinesis Data Firehose with Amazon OpenSearch Service, enabling seamless delivery of streaming data for visualization.
Architecture Overview
In this extended architecture:
- Kinesis Data Streams continues to ingest transaction data from the Node.js application.
- AWS Managed Apache Flink processes the data in real-time to detect fraudulent patterns and sends alerts via Amazon SNS.
- Amazon Kinesis Data Firehose acts as an additional consumer of the same Kinesis Data Stream, delivering all transaction data to Amazon OpenSearch Service for visualization.
This setup allows for parallel processing—real-time fraud detection and comprehensive data visualization—without impacting the performance of either pipeline.
Setting Up Kinesis Data Firehose to OpenSearch
Step 1 - Create an OpenSearch Service Domain:
- Navigate to the Amazon OpenSearch Service console.
- Choose "Create Domain" and configure necessary settings
- Choose a deployment option: create the domain with standby (this reserves a node in an AZ as a standby node) or without standby
- Select an appropriate instance family, instance type, and EBS volume configuration for the nodes
- Select the number of master, data, and coordinator nodes required; for high availability, select three or more nodes spread across AZs
- For networking, choose public access, since we will be accessing OpenSearch publicly rather than from within a VPC
- For fine-grained access control, create a master username and password
- Leave everything else as default and create the domain; this may take 15-20 minutes to become fully active. (A boto3 sketch of an equivalent configuration follows below.)
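For completeness, a hedged boto3 sketch of a comparable domain configuration is shown below. The instance type, volume size, engine version, and credentials are illustrative placeholders; adjust them to your needs, and note that fine-grained access control also requires encryption and HTTPS enforcement, as set here.

import boto3

opensearch = boto3.client("opensearch", region_name="us-east-1")

opensearch.create_domain(
    DomainName="elasticsearchdomain-1",
    EngineVersion="OpenSearch_2.13",                      # example version
    ClusterConfig={
        "InstanceType": "r6g.large.search",
        "InstanceCount": 3,
        "ZoneAwarenessEnabled": True,
        "ZoneAwarenessConfig": {"AvailabilityZoneCount": 3},
    },
    EBSOptions={"EBSEnabled": True, "VolumeType": "gp3", "VolumeSize": 100},
    NodeToNodeEncryptionOptions={"Enabled": True},
    EncryptionAtRestOptions={"Enabled": True},
    DomainEndpointOptions={"EnforceHTTPS": True},
    AdvancedSecurityOptions={
        "Enabled": True,
        "InternalUserDatabaseEnabled": True,
        "MasterUserOptions": {
            "MasterUserName": "admin",                    # placeholder credentials
            "MasterUserPassword": "ChangeMe-123!",
        },
    },
)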
Step 2 - Create a Kinesis Data Firehose Delivery Stream:
- Go to the Amazon Kinesis console and select "Create delivery stream."
- For the source, choose "Kinesis Data Stream" and select your existing stream.
- For the destination, choose "Amazon OpenSearch Service" and specify the domain you created.
- Under the Transform records section, turn on data transformation and create a Python Lambda function; we will use this function to add coordinates and a date field to each record for more detailed visualization in OpenSearch.
- In the Destination settings section, add the index name; this creates an index in OpenSearch when the data is inserted. The index template and mappings can be modified later in the OpenSearch Dev Tools.
- In the Backup settings, configure the S3 bucket to store messages that failed to be delivered to OpenSearch by the Firehose delivery stream
- Configure buffering hints, IAM roles as needed.
Lambda Transformer
import base64
import json
from datetime import datetime

# Static mapping of country codes to lat/lon (example only)
COUNTRY_COORDS = {
    "CA": {"lat": 56.1304, "lon": -106.3468},  # Canada
    "US": {"lat": 37.0902, "lon": -95.7129},   # USA
    "IN": {"lat": 20.5937, "lon": 78.9629},    # India
    "NY": {"lat": 40.7128, "lon": -74.0060},   # New York
    "TX": {"lat": 31.9686, "lon": -99.9018},   # Texas
    "FL": {"lat": 27.9944, "lon": -81.7603},   # Florida
    "WA": {"lat": 47.7511, "lon": -120.7401},  # Washington
    # Add more countries as needed
}

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = json.loads(payload)
        print(data, payload)

        # Convert timestamp to datetime object
        timestamp_ms = data.get("timestamp")
        if timestamp_ms:
            timestamp_s = timestamp_ms / 1000  # Convert milliseconds to seconds
            date_obj = datetime.utcfromtimestamp(timestamp_s)
            # Format the datetime object to a string
            formatted_date = date_obj.strftime('%Y-%m-%d %H:%M:%S')
            data["date"] = formatted_date

        # Add coordinates based on location
        country_code = data.get("location")
        if country_code in COUNTRY_COORDS:
            data["coordinates"] = COUNTRY_COORDS[country_code]

        # Prepare transformed payload
        transformed_payload = json.dumps(data) + "\n"
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}
IAM Roles for AWS Kinesis Data Firehose:
By default, AWS creates an IAM role with access to the S3 bucket (to store records that fail delivery), the Lambda function (for data transformation), and CloudWatch Logs. In addition to these permissions, we need to grant the Firehose role permission to read from the Kinesis Data Stream and to deliver to the Amazon OpenSearch Service domain, for example:
{"Version": "2012-10-17","Statement": [{"Sid":"Elasticsearch-Statement","Effect": "Allow","Action": ["es:*"],"Resource": "arn:aws:es:us-east-1:011528266190:domain/elasticsearchdomain-1"},{"Sid": "KinesisStream-statement","Effect": "Allow","Action": ["kinesis:ListShards","kinesis:GetShardIterator","kinesis:GetRecords","kinesis:DescribeStream"],"Resource": "arn:aws:kinesis:us-east-1:011528266190:stream/transaction-stream"}]}
Configuring OpenSearch Access for Kinesis Firehose:
Once your Kinesis Firehose stream is set up to deliver data to OpenSearch, it's important to ensure Firehose is authorized to write to the domain. This is especially critical if your OpenSearch domain uses Fine-Grained Access Control (FGAC).
Why Role Mapping Is Needed
Even though Kinesis Firehose has IAM permissions to call OpenSearch APIs, OpenSearch has its own access control layer. If you're using FGAC, OpenSearch enforces additional role-based security. This means you must map the Firehose IAM role as a backend role within OpenSearch.
Without this role mapping:
- OpenSearch will reject incoming writes with 403 Forbidden or Unauthorized errors.
- Your delivery stream status will show failed deliveries in CloudWatch logs.
How to Map the IAM Role in OpenSearch
- Open OpenSearch Dashboards.
- Navigate to Security > Roles.
- Choose the role you want Firehose to use (e.g., all_access or a custom index writer role).
- Click on Mapped users and add the ARN of the IAM role used by your Firehose stream.
- Save the changes. (The same mapping can also be applied via the security REST API, as sketched below.)
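If you prefer automating the mapping, the OpenSearch security plugin exposes a REST API for role mappings; a sketch using Python's requests library is shown below. The domain endpoint, master credentials, and Firehose role ARN are placeholders, and mapping to all_access is shown only for simplicity; a narrower custom role is preferable in practice.

import requests

DOMAIN = "https://search-elasticsearchdomain-1-xxxx.us-east-1.es.amazonaws.com"  # placeholder endpoint
AUTH = ("admin", "ChangeMe-123!")  # master user created with the domain

# Map the Firehose delivery role as a backend role on an OpenSearch role.
resp = requests.put(
    f"{DOMAIN}/_plugins/_security/api/rolesmapping/all_access",
    auth=AUTH,
    json={
        "backend_roles": ["arn:aws:iam::123456789012:role/firehose-delivery-role"],  # placeholder ARN
        "hosts": [],
        "users": ["admin"],
    },
)
print(resp.status_code, resp.text)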
With the setup complete, incoming data from the Kinesis Data Stream will be ingested by Kinesis Data Firehose and indexed into OpenSearch, enabling real-time visualization and analysis within the OpenSearch Dashboards.
To visualize data in OpenSearch Dashboard:
- Go to OpenSearch Service -> Click on the dashboard URL provided by AWS
- Login with the master credentials
- You will initially see an empty screen with no dashboards
- Navigate to the Dev Tools from the left panel, and type in the following commands to verify that the index is created and data is visible
GET _cat/indices -> Lists all indices available
GET <your_index_name>/_search -> Lists all documents within the index
- To generate visualizations such as pie charts, line graphs, and other analytical metrics, navigate to the Visualize section from the left-hand menu in OpenSearch Dashboards. Create your desired visualization using the available data sources, then add the resulting visual component to a custom dashboard for real-time monitoring and analysis. (A Python query sketch follows this list.)
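The same checks from Dev Tools can be run programmatically; the sketch below uses Python's requests library to search the index for high-value transactions. The index name, endpoint, and credentials are placeholders matching the earlier setup assumptions.

import requests

DOMAIN = "https://search-elasticsearchdomain-1-xxxx.us-east-1.es.amazonaws.com"  # placeholder endpoint
AUTH = ("admin", "ChangeMe-123!")
INDEX = "transactions"  # whatever index name you configured in Firehose

# Find the ten most recent transactions over $10,000.
query = {
    "size": 10,
    "sort": [{"timestamp": "desc"}],
    "query": {"range": {"amount": {"gt": 10000}}},
}

resp = requests.get(f"{DOMAIN}/{INDEX}/_search", auth=AUTH, json=query)
for hit in resp.json()["hits"]["hits"]:
    print(hit["_source"])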
Fraud Detection dashboard
Screenshot of the SNS notification when a fraud is detected:
S3 Receives the messages that were not successfully delivered to OpenSearch.
Optional Step: Storing Transaction Data with Kinesis Data Firehose for Further Analysis
Kinesis Data Firehose adds a valuable layer to your fraud detection system by enabling seamless data storage and historical analysis capabilities. This service automatically delivers your streaming data to Amazon S3, creating a robust archive for future reference and analysis.
- Automatic Scaling: Handles varying data volumes without manual intervention
- Data Transformation: Converts data formats on the fly before storage (using an AWS Lambda serverless function)
- Cost-Effective: Pay only for the actual data transferred
- Zero Maintenance: Fully managed service requiring no infrastructure setup
Data Storage Patterns for S3:
- Time-based partitioning
- Custom prefixes for efficient querying
- Compression options for storage optimization
- Automatic data encryption at rest
The stored transaction data in S3 enables powerful analytical capabilities through various AWS services:
- Amazon Athena: Run SQL queries directly on S3 data (a query sketch follows this list)
- Amazon QuickSight: Create visual dashboards and reports
- Amazon SageMaker: Build ML models using historical fraud patterns
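Building on the Athena option above, the snippet below submits a query against a table assumed to have been defined over the Firehose S3 prefix; the table name, database, and output location are placeholders.

import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Count high-value transactions per user over the archived data.
query = """
SELECT userid, COUNT(*) AS big_txns
FROM transactions_archive
WHERE amount > 10000
GROUP BY userid
ORDER BY big_txns DESC
"""

execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "fraud_analytics"},                         # placeholder database
    ResultConfiguration={"OutputLocation": "s3://athena-results-bucket/fraud/"},   # placeholder bucket
)
query_id = execution["QueryExecutionId"]

# Poll until the query finishes, then print the result rows.
while athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"] in ("QUEUED", "RUNNING"):
    time.sleep(1)

for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])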
Storage Configuration Best Practices:
- Set appropriate buffer sizes and intervals
- Enable error logging to separate S3 prefix
- Implement lifecycle policies for cost management
- Use data partitioning for query optimization
This historical data repository becomes invaluable for identifying long-term fraud patterns and improving detection algorithms. The combination of real-time processing and historical analysis creates a comprehensive fraud detection strategy that adapts to emerging threats.
Conclusion: Embracing the Future of Fraud Detection Technology with AWS Services
The integration of AWS Kinesis and Apache Flink represents a powerful approach to modern fraud detection. This combination delivers real-time processing capabilities essential for identifying and preventing fraudulent activities in today's fast-paced digital landscape.
The architecture we've explored offers distinct advantages:
- Real-Time Processing: AWS Kinesis Data Streams capture transaction data instantly
- Smart Detection: AWS Managed Apache Flink applies sophisticated fraud detection algorithms
- Instant Alerts: AWS SNS Topics deliver immediate notifications to stakeholders
- Data Preservation: AWS Kinesis Firehose stores transaction records for future analysis
Your fraud detection system gains adaptability and scalability through these AWS services. The platform evolves with your needs, handling increasing transaction volumes while maintaining performance. Machine learning capabilities enable the system to recognize new fraud patterns, strengthening your security posture.
The future of fraud detection lies in intelligent, automated systems that learn and adapt. AWS services provide the foundation for building such systems, offering:
- Seamless integration with existing infrastructure
- Cost-effective scaling options
- Advanced analytics capabilities
- Robust security features
Your organization can stay ahead of fraudulent activities by implementing this AWS-based fraud detection mechanism. The combination of real-time processing, intelligent analysis, and immediate alerting creates a robust defense against financial threats.
💡 Scaling Further with Enhanced Fan-Out
As the scale and complexity of fraud detection systems grow, it's essential to ensure that the underlying architecture can keep up with performance demands—especially when every millisecond counts.
One powerful feature in Amazon Kinesis Data Streams that helps take this to the next level is Enhanced Fan-Out (EFO).
With EFO, each consumer gets its own dedicated 2 MB/second read throughput, allowing multiple applications—like analytics, monitoring, alerting, or storage—to read from the same stream in parallel and without interference. This is a game-changer compared to shared throughput models, where consumers compete for bandwidth.
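Registering a dedicated consumer is a single API call; a boto3 sketch is below. The stream ARN is a placeholder, and the returned consumer ARN is what an EFO-enabled reader (such as the Flink Kinesis consumer in EFO mode) would use to subscribe.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

stream_arn = "arn:aws:kinesis:us-east-1:123456789012:stream/transaction-stream"  # placeholder ARN

# Each registered consumer gets its own dedicated 2 MB/sec of read throughput per shard.
consumer = kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName="fraud-detector-flink",
)["Consumer"]

print(consumer["ConsumerARN"], consumer["ConsumerStatus"])  # CREATING -> ACTIVE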
I have illustrated the difference between Standard and Enhanced Fan-Out Architecture below:
Why use Enhanced Fan-Out for fraud detection?
- ⚡ Lower latency: Near-instant delivery of data to Flink applications, improving fraud detection speed.
- 🚀 High scalability: Easily add new consumers (e.g., alerting systems, audit logs, AI model feedback loops) without re-architecting.
- 🔄 Independent processing: Different consumers can process the same data in real time, independently and concurrently.
Imagine a scenario where:
- One Flink app flags suspicious transactions.
- Another app logs all events into a data lake.
- A third app triggers real-time SMS/email alerts via SNS.
With Enhanced Fan-Out, all of these systems can run simultaneously, without slowing each other down—delivering a true real-time, multi-layered fraud prevention strategy.
🚨 Final Thoughts
While this blog focused on building a real-time fraud detection pipeline using AWS Kinesis Data Streams and Apache Flink, implementing Enhanced Fan-Out unlocks next-level performance and flexibility. For organizations handling high-volume, latency-sensitive data streams, it's a strategic upgrade that brings both technical robustness and business agility.