Fraudulent activities are a significant threat to businesses in today's digital world. They can cause financial losses and security breaches, affecting both companies and their customers. That's why real-time fraud detection is crucial for modern transaction systems.
The challenge we face is processing large amounts of transactions instantly and accurately identifying suspicious patterns. Traditional methods of batch processing aren't effective in detecting fraud as it happens, which leads to delayed responses and potential financial damages.
Fortunately, there are powerful solutions available: AWS Kinesis, Apache Flink, Kinesis Data Firehose, and OpenSearch. Together, they form a robust real-time fraud detection and analytics pipeline.
AWS Kinesis acts as the central data stream, ingesting high-throughput transaction data in real time. This data is then processed in parallel by two consumers:
Apache Flink (via Kinesis Data Analytics) analyzes transaction streams for fraudulent behavior, such as a user making more than five transactions within one minute or a single unusually large transaction, and sends real-time alerts through SNS.
Kinesis Data Firehose delivers a continuous stream of all transactions directly into Amazon OpenSearch, enabling teams to visualize, search, and analyze transactions over time with dashboards and queries.
This architecture brings several benefits to businesses.
By integrating these AWS services, we build a comprehensive, real-time fraud detection and analytics system that not only acts instantly on threats but also supports ongoing monitoring and insights through powerful search and visualization tools.
AWS Kinesis is Amazon's solution for real-time data streaming and analytics. This fully managed service enables you to collect, process, and analyze streaming data at any scale, making it a crucial tool for modern data-driven applications.
AWS Kinesis shines in scenarios requiring immediate data processing. The service can handle various data types:
The architecture of AWS Kinesis follows a producer-consumer model. Data producers send records to Kinesis streams, while consumers retrieve and process these records. Each stream maintains a sequence of data records organized in shards, providing ordered data delivery and dedicated throughput per shard.
Kinesis offers built-in resilience through automatic data replication across multiple AWS availability zones. The service manages the underlying infrastructure, eliminating the need for manual server provisioning or cluster management.
The pricing model follows a pay-as-you-go structure based on the amount of data ingested, stored, and processed. This flexibility allows you to scale your streaming applications cost-effectively while maintaining high performance and reliability.
The core of the data pipeline begins with AWS Kinesis Data Streams, acting as the central nervous system for incoming transaction data. This real-time stream feeds directly into AWS Managed Apache Flink, where custom fraud detection algorithms perform immediate analysis. Upon identifying suspicious activity, Flink triggers instant alerts via Amazon SNS, ensuring immediate notification of potential threats.
To gain deeper insights beyond immediate detection, a parallel data flow leverages AWS Kinesis Data Firehose. Firehose subscribes to the relevant data stream (either the raw transactions or the processed output from Flink) and efficiently delivers it to Amazon OpenSearch Service. This integration enables the creation of interactive dashboards and visualizations, allowing analysts to explore patterns, user behaviors, and trends associated with fraudulent activities. This visual layer provides crucial context and aids in understanding the "how" and "why" behind the detected anomalies.
Optionally, Firehose can also be configured to deliver data to Amazon S3 for long-term archival, audit trails, or subsequent batch processing and machine learning analysis.
In essence, the data flows as follows: the producer writes transactions to Kinesis Data Streams; AWS Managed Apache Flink consumes the stream, applies the fraud rules, and publishes alerts to SNS; in parallel, Kinesis Data Firehose delivers the same stream to OpenSearch for visualization, and optionally to S3 for archival.
Connectivity between services is managed through IAM roles and policies that grant each component least-privilege access to the stream and its downstream destinations (the Firehose permissions are covered in detail later in this post).
The entire architecture is designed for scalability, automatically adjusting to fluctuations in transaction volume. AWS Managed Apache Flink handles state management and exactly-once processing, while Firehose and OpenSearch are built for handling large-scale data ingestion and analysis. This integrated approach not only detects fraud in real-time but also provides the visual tools necessary to understand and combat evolving fraudulent behaviors.
Amazon Kinesis Data Streams (KDS) is a powerful service for ingesting real-time data at scale. But to truly leverage its power, you need producers—the components responsible for pushing data into your Kinesis stream.
So who—or what—are these producers? Let’s break it down.
🛠️ 1. Kinesis Agent
The Kinesis Agent is a standalone Java application you install on your server. It's perfect for collecting and sending data like log files or metrics from on-premise or EC2 servers directly into your Kinesis Data Stream.
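As a quick sketch, the agent reads its configuration from /etc/aws-kinesis/agent.json; the log path below is a hypothetical example, while the stream name matches the one used throughout this post:

{
  "flows": [
    {
      "filePattern": "/var/log/myapp/*.log",
      "kinesisStream": "transaction-stream"
    }
  ]
}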
🧰 2. Kinesis Producer Library (KPL)
The KPL offers a higher-level abstraction over the lower-level Kinesis API. It simplifies the process of batching, queuing, and retrying records.
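Here's a minimal sketch of a KPL producer, assuming the amazon-kinesis-producer library and the transaction-stream used throughout this post:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

public class KplProducerSketch {
    public static void main(String[] args) {
        // The KPL batches, aggregates, and retries records behind the scenes
        KinesisProducer producer = new KinesisProducer(
                new KinesisProducerConfiguration().setRegion("us-east-1"));

        String json = "{\"transactionId\":\"tx1\",\"userId\":\"user1\",\"amount\":42}";
        // Queue a record asynchronously; the partition key keeps a user's records together
        producer.addUserRecord("transaction-stream", "user1",
                ByteBuffer.wrap(json.getBytes(StandardCharsets.UTF_8)));

        producer.flushSync(); // Block until everything queued has been sent
        producer.destroy();
    }
}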
🧑‍💻 3. AWS SDK
If you're building custom applications, you can use the AWS SDKs directly.
🌐 Languages Supported: Java, Node.js, Python, Ruby, Go, and more
✍️ APIs: Use PutRecord or PutRecords to write data
🔧 Use case: Real-time analytics, custom dashboards, event-driven apps
💻 4. AWS CLI
Sometimes, you just want something quick and dirty—enter the AWS Command Line Interface.
🧪 Best for: Testing, scripting, and ad-hoc record insertion
⚡ Fast and simple: Fire and forget test data from your terminal
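For example, assuming the transaction-stream used throughout this post and AWS CLI v2 (which needs --cli-binary-format to send raw JSON):

aws kinesis put-record \
  --stream-name transaction-stream \
  --partition-key user1 \
  --data '{"transactionId":"tx-test","userId":"user1","amount":15000}' \
  --cli-binary-format raw-in-base64-out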
🤝 5. Direct Service Integrations
Certain AWS services can push data straight into Kinesis—no code, no hassle.
🔌 Services that integrate directly:
Amazon API Gateway (proxy integration)
AWS Lambda (as a proxy)
AWS IoT Core, EventBridge, and others
☁️ Use case: Serverless pipelines with minimal operational overhead
Here's how you can implement the data producer using the AWS SDK for Node.js:
const AWS = require('aws-sdk');

// Configure AWS SDK
AWS.config.update({
  region: 'us-east-1' // change if needed
});

const kinesis = new AWS.Kinesis();
const STREAM_NAME = 'transaction-stream';

// Helper: generate a random transaction
function generateTransaction() {
  const userId = 'user' + Math.floor(Math.random() * 5 + 1); // user1 to user5
  const transactionId = 'tx' + Math.floor(Math.random() * 1000000);
  const amount = Math.random() < 0.2 ? 15000 : Math.floor(Math.random() * 1000); // occasionally trigger fraud
  const timestamp = Date.now();
  const location = ['NY', 'CA', 'TX', 'FL', 'WA', 'IN'][Math.floor(Math.random() * 6)]; // pick one of the six locations

  return {
    transactionId,
    userId,
    amount,
    timestamp,
    location
  };
}

// Send transaction to Kinesis
function sendTransaction() {
  const txn = generateTransaction();
  const payload = JSON.stringify(txn);

  const params = {
    Data: payload,
    PartitionKey: txn.userId, // ensures same user lands in same shard
    StreamName: STREAM_NAME
  };

  kinesis.putRecord(params, (err, data) => {
    if (err) {
      console.error('Error sending to Kinesis:', err);
    } else {
      console.log('Sent:', payload);
    }
  });
}

// Send a new transaction every second
setInterval(sendTransaction, 1000);
The above Node.js script simulates real-time transactions: every second it generates a random transaction (about 20% of them with a high amount, to trigger the fraud rule) and writes it to the transaction-stream via putRecord, using userId as the partition key.
To begin processing transactions in real time, we need a Kinesis Data Stream—a highly scalable and durable service for real-time data ingestion.
This stream acts as the entry point for all transaction records that our producer (Node.js script or any backend app) will send. Each incoming record is temporarily stored across shards, which determine the stream’s throughput capacity.
Setting up Kinesis Data Streams requires careful planning and implementation to ensure efficient data capture for your fraud detection system. Here's a detailed guide to configure your stream:
Open the AWS Management Console, navigate to Kinesis, and create a data stream named transaction-stream with your chosen shard count. Once the stream is created, it may take a few seconds to become active.
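Equivalently, you can create the stream from the CLI and then check that its status is ACTIVE (a single shard is enough for this demo):

aws kinesis create-stream --stream-name transaction-stream --shard-count 1
aws kinesis describe-stream-summary --stream-name transaction-stream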
⚙️ 1. Throughput Planning
Before you dive in, make sure your stream is sized for success.
📊 Shard Limits:
Each shard supports 1 MB/sec write and 2 MB/sec read throughput.
🧮 Estimate Before You Build:
Calculate the average record size and records per second to determine the number of shards you need.
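For example, at 2,000 records/sec with an average record size of 1 KB, you're writing roughly 2 MB/sec, so you'd need at least two shards for writes, plus headroom for traffic spikes.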
🔁 Enhanced Fan-Out for Multiple Consumers:
If you have multiple downstream consumers (e.g., Apache Flink + Firehose), enable Enhanced Fan-Out. This gives each consumer a dedicated 2 MB/sec read pipe, reducing contention.
📈 2. Monitoring & Scaling
Kinesis offers great tools to help you watch and adapt in real time.
📉 CloudWatch Metrics to Monitor:
IncomingBytes
ReadProvisionedThroughputExceeded
These help detect when you're hitting shard limits.
📡 Set Up CloudWatch Alarms:
Alert on usage thresholds or sudden spikes in traffic.
🔐 3. Security & Access Control
Keep your data safe and your access policies tight.
🔑 Use IAM Policies:
Define who can read from and write to your streams.
🛡️ Enable Server-Side Encryption:
Use AWS KMS server-side encryption to protect data at rest (TLS protects it in transit).
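For example, you can turn on server-side encryption for an existing stream from the CLI, using the AWS-managed key (substitute your own KMS key ARN if you need one):

aws kinesis start-stream-encryption \
  --stream-name transaction-stream \
  --encryption-type KMS \
  --key-id alias/aws/kinesis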
📦 4. Data Management & Replay
Make your data work harder for longer.
🕒 Data Retention:
Adjust from the default 24 hours up to 7 days (extended retention of up to 365 days is also available) if you need more time for reprocessing or compliance.
⏪ Replay with TRIM_HORIZON or AT_TIMESTAMP:
Useful for replaying data into Flink or other consumers for backfills or failure recovery.
🧯 Backup with Firehose:
For long-term storage, attach Kinesis Data Firehose to send data to Amazon S3 automatically.
By following these best practices, you ensure your Kinesis data pipeline is resilient, efficient, and scalable—ready to handle real-time workloads like fraud detection, log analytics, and IoT telemetry.
Apache Flink's event processing capabilities shine in fraud detection scenarios through its ability to analyze streaming data in real-time. AWS Managed Apache Flink simplifies this process by handling infrastructure management, letting you focus on building effective fraud detection logic.
Apache Flink detects fraudulent patterns using several powerful techniques: keyed per-user state, event-time processing with watermarks for out-of-order events, and rule-based checks over recent activity. The job below uses all three.
Here's a practical example of implementing fraud detection logic in Flink as a Java Maven project:
Project Structure:
src/main/java/com/example/fraud/FraudDetectionJob.java
src/main/java/com/example/fraud/model/Transaction.java
src/main/java/com/example/fraud/util/SnsPublisher.java
Dependencies:
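The full pom.xml lives in the GitHub repo linked at the end of this post. As a rough sketch, the job needs the Flink streaming API, the Kinesis connector, the AWS SDK v2 SNS client, and Jackson. The artifact IDs below are the standard ones, but pin versions to match your Managed Flink runtime:

<dependencies>
  <!-- Flink streaming API (provided by the Managed Flink runtime) -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
  </dependency>
  <!-- Kinesis source connector -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kinesis</artifactId>
    <version>${flink.connector.version}</version>
  </dependency>
  <!-- AWS SDK v2 SNS client used by SnsPublisher -->
  <dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>sns</artifactId>
    <version>${aws.sdk.version}</version>
  </dependency>
  <!-- Jackson for deserializing transaction JSON -->
  <dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>${jackson.version}</version>
  </dependency>
</dependencies>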
Transaction.java
package com.example.fraud.model;

public class Transaction {
    public String transactionId;
    public String userId;
    public double amount;
    public long timestamp;
    public String location;

    public Transaction() {}

    @Override
    public String toString() {
        return String.format("Transaction[id=%s, user=%s, amount=%.2f, time=%d, location=%s]",
                transactionId, userId, amount, timestamp, location);
    }
}
SnsPublisher.java
package com.example.fraud.util;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.PublishRequest;

public class SnsPublisher {
    private static final SnsClient snsClient = SnsClient.builder()
            .region(Region.US_EAST_1) // Change region if needed
            .build();

    private static final String TOPIC_ARN = "arn:aws:sns:us-east-1:011528266190:FraudAlerts"; // Replace with your SNS topic ARN

    public static void publishAlert(String message) {
        PublishRequest request = PublishRequest.builder()
                .topicArn(TOPIC_ARN)
                .message(message)
                .subject("FRAUD ALERT")
                .build();
        snsClient.publish(request);
    }
}
FraudDetectionJob.java
package com.example.fraud;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.util.Collector;

import com.example.fraud.model.Transaction;
import com.example.fraud.util.SnsPublisher;
import com.fasterxml.jackson.databind.ObjectMapper;

public class FraudDetectionJob {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kinesis consumer configuration
        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("aws.region", "us-east-1");
        consumerConfig.setProperty("stream.initial.position", "LATEST");

        FlinkKinesisConsumer<String> kinesisConsumer =
                new FlinkKinesisConsumer<>("transaction-stream", new SimpleStringSchema(), consumerConfig);

        env.getConfig().setAutoWatermarkInterval(1000);

        env.addSource(kinesisConsumer)
                // Deserialize each JSON record into a Transaction
                .map(json -> objectMapper.readValue(json, Transaction.class))
                // Event-time watermarks, tolerating 5 seconds of out-of-order events
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner((txn, ts) -> txn.timestamp))
                // Partition by user so each user's state is tracked independently
                .keyBy(txn -> txn.userId)
                .process(new FraudDetector())
                .map(Transaction::toString)
                .print();

        env.execute("Real-Time Fraud Detection Job");
    }

    public static class FraudDetector extends KeyedProcessFunction<String, Transaction, Transaction> {

        // Per-user list of recent transaction timestamps
        private transient ListState<Long> timestampState;

        @Override
        public void open(Configuration parameters) {
            ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("timestamps", Long.class);
            timestampState = getRuntimeContext().getListState(descriptor);
        }

        @Override
        public void processElement(Transaction txn, Context ctx, Collector<Transaction> out) throws Exception {
            boolean isFraud = false;

            // Rule 1: High amount
            if (txn.amount > 10000) {
                isFraud = true;
            }

            // Rule 2: High velocity; keep only timestamps from the last minute
            long oneMinuteAgo = txn.timestamp - Time.minutes(1).toMilliseconds();
            List<Long> timestamps = new ArrayList<>();
            for (Long ts : timestampState.get()) {
                if (ts >= oneMinuteAgo) {
                    timestamps.add(ts);
                }
            }
            timestamps.add(txn.timestamp);
            timestampState.update(timestamps);

            if (timestamps.size() > 5) {
                isFraud = true;
            }

            if (isFraud) {
                String message = "FRAUD DETECTED: " + txn.toString();
                SnsPublisher.publishAlert(message);
                out.collect(txn);
            }
        }
    }
}
You can implement various fraud detection patterns using Flink:
1. Velocity Checks: flag users who make an unusually high number of transactions in a short window (the job above flags more than five within one minute).
2. Amount Pattern Analysis: flag individual transactions with abnormal amounts (the job above flags anything over 10,000).
Let’s break down the core components of the Flink job:
1. Create the execution environment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2. Configure the Kinesis consumer to read transaction-stream from the latest position:

Properties consumerConfig = new Properties();
consumerConfig.setProperty("aws.region", "us-east-1");
consumerConfig.setProperty("stream.initial.position", "LATEST");

FlinkKinesisConsumer<String> kinesisConsumer =
    new FlinkKinesisConsumer<>("transaction-stream", new SimpleStringSchema(), consumerConfig);

3. Assign event-time timestamps and watermarks, tolerating up to 5 seconds of out-of-order events:

.assignTimestampsAndWatermarks(WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((txn, ts) -> txn.timestamp))

4. Partition the stream by user and run the fraud detector on each key:

.keyBy(txn -> txn.userId).process(new FraudDetector())

5. Flag a transaction when either rule fires (a high amount, or more than five transactions within one minute):

if (txn.amount > 10000 || transactionsWithin1Min > 5) {
    SnsPublisher.publishAlert(message);
    out.collect(txn);
}

6. Keyed state stores each user's recent transaction timestamps between events:

ListState<Long> timestampState = getRuntimeContext().getListState(...);

7. Print flagged transactions to the application log:

.map(Transaction::toString).print();
This code forms the real-time brain of your fraud detection pipeline—processing, detecting, and alerting all in milliseconds.
AWS Managed Flink allows you to run Apache Flink applications without managing the infrastructure. We'll use it to consume transaction data from Kinesis, apply fraud detection logic, and trigger SNS alerts if suspicious activity is detected.
Navigate to your project root and run the following command:
mvn clean package
This will generate a JAR in the target/ directory, e.g., fraud-detection-app-new-0.0.1.jar.
Upload the JAR File to S3
s3://flink-fraud-detection-jars/fraud-detection-app-new-0.0.1.jar
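For example, with the AWS CLI:

aws s3 cp target/fraud-detection-app-new-0.0.1.jar s3://flink-fraud-detection-jars/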
Create the application in the AWS Managed Apache Flink console:
1. Open the Managed Apache Flink service in the AWS Management Console.
2. Click Create streaming application, then configure it to use the JAR uploaded to S3.
Once Apache Flink identifies suspicious patterns, you'll need a reliable system to notify relevant stakeholders immediately. Amazon Simple Notification Service (SNS) provides a powerful solution for sending real-time alerts about potential fraud.
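If you haven't created the topic yet, here's a minimal sketch using the same AWS SDK v2 as SnsPublisher (the email address is a hypothetical placeholder; the recipient must confirm the subscription before alerts arrive):

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sns.SnsClient;
import software.amazon.awssdk.services.sns.model.CreateTopicRequest;
import software.amazon.awssdk.services.sns.model.SubscribeRequest;

public class CreateFraudAlertsTopic {
    public static void main(String[] args) {
        try (SnsClient sns = SnsClient.builder().region(Region.US_EAST_1).build()) {
            // Create (or look up) the FraudAlerts topic referenced in SnsPublisher
            String topicArn = sns.createTopic(
                    CreateTopicRequest.builder().name("FraudAlerts").build()).topicArn();

            // Subscribe an email endpoint for real-time alerts
            sns.subscribe(SubscribeRequest.builder()
                    .topicArn(topicArn)
                    .protocol("email")
                    .endpoint("security-team@example.com") // hypothetical address
                    .build());

            System.out.println("Topic ready: " + topicArn);
        }
    }
}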
You can find the full source code for this project on GitHub:
Java (Apache Flink): https://github.com/ShalniGerald/aws-kinesis-apache-flink.git
Producer (Node.js): https://github.com/ShalniGerald/aws-kinesis-data-stream-producer.git
Beyond real-time fraud detection, it's crucial to have comprehensive visibility into all transaction data for monitoring, auditing, and analysis. To achieve this, we can integrate Amazon Kinesis Data Firehose with Amazon OpenSearch Service, enabling seamless delivery of streaming data for visualization.
In this extended architecture:
Kinesis Data Streams continues to ingest transaction data from the Node.js application.
AWS Managed Apache Flink processes the data in real-time to detect fraudulent patterns and sends alerts via Amazon SNS.
Amazon Kinesis Data Firehose acts as an additional consumer of the same Kinesis Data Stream, delivering all transaction data to Amazon OpenSearch Service for visualization.
This setup allows for parallel processing—real-time fraud detection and comprehensive data visualization—without impacting the performance of either pipeline.
Step 1 - Create an OpenSearch Service Domain:
Step 2 - Create a Kinesis Data Firehose Delivery Stream:
Lambda Transformer
import base64
import json
from datetime import datetime

# Static mapping of country codes to lat/lon (example only)
COUNTRY_COORDS = {
    "CA": {"lat": 56.1304, "lon": -106.3468},  # Canada
    "US": {"lat": 37.0902, "lon": -95.7129},   # USA
    "IN": {"lat": 20.5937, "lon": 78.9629},    # India
    "NY": {"lat": 40.7128, "lon": -74.0060},   # New York
    "TX": {"lat": 31.9686, "lon": -99.9018},   # Texas
    "FL": {"lat": 27.9944, "lon": -81.7603},   # Florida
    "WA": {"lat": 47.7511, "lon": -120.7401},  # Washington
    # Add more countries as needed
}

def lambda_handler(event, context):
    output = []
    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        data = json.loads(payload)
        print(data, payload)

        # Convert epoch timestamp (ms) to a formatted date string
        timestamp_ms = data.get("timestamp")
        if timestamp_ms:
            timestamp_s = timestamp_ms / 1000  # Convert milliseconds to seconds
            date_obj = datetime.utcfromtimestamp(timestamp_s)
            data["date"] = date_obj.strftime('%Y-%m-%d %H:%M:%S')

        # Add coordinates based on location
        country_code = data.get("location")
        if country_code in COUNTRY_COORDS:
            data["coordinates"] = COUNTRY_COORDS[country_code]

        # Prepare transformed payload
        transformed_payload = json.dumps(data) + "\n"
        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': base64.b64encode(transformed_payload.encode('utf-8')).decode('utf-8')
        }
        output.append(output_record)

    return {'records': output}
IAM Roles for AWS Kinesis Firehose:
By default, AWS creates an IAM role with access to the S3 bucket (to store records that fail delivery), the Lambda function (for data transformation), and CloudWatch Logs. In addition to these permissions, we need to grant Firehose permission to poll from the Kinesis Data Stream and deliver to the Amazon OpenSearch Service:
{"Version": "2012-10-17","Statement": [{"Sid":"Elasticsearch-Statement","Effect": "Allow","Action": ["es:*"],"Resource": "arn:aws:es:us-east-1:011528266190:domain/elasticsearchdomain-1"},{"Sid": "KinesisStream-statement","Effect": "Allow","Action": ["kinesis:ListShards","kinesis:GetShardIterator","kinesis:GetRecords","kinesis:DescribeStream"],"Resource": "arn:aws:kinesis:us-east-1:011528266190:stream/transaction-stream"}]}
Configuring OpenSearch Access for Kinesis Firehose:
Once your Kinesis Firehose stream is set up to deliver data to OpenSearch, it's important to ensure Firehose is authorized to write to the domain. This is especially critical if your OpenSearch domain uses Fine-Grained Access Control (FGAC).
Why Role Mapping Is Needed
Even though Kinesis Firehose has IAM permissions to call OpenSearch APIs, OpenSearch has its own access control layer. If you're using FGAC, OpenSearch enforces additional role-based security. This means you must map the Firehose IAM role as a backend role within OpenSearch.
Without this role mapping:
OpenSearch will reject incoming writes with 403 Forbidden or Unauthorized errors.
Your delivery stream status will show failed deliveries in CloudWatch logs.
How to Map the IAM Role in OpenSearch
Open OpenSearch Dashboards.
Navigate to Security > Roles.
Choose the role you want Firehose to use (e.g., all_access or a custom index-writer role).
Click on Mapped users and add the ARN of the IAM role used by your Firehose stream.
Save the changes.
With the setup complete, incoming data from the Kinesis Data Stream will be ingested by Kinesis Data Firehose and indexed into OpenSearch, enabling real-time visualization and analysis within the OpenSearch Dashboards.
To verify the data in OpenSearch Dashboards, open Dev Tools and run:
GET _cat/indices -> lists all available indices
GET <your_index_name>/_search -> lists all documents within the index
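You can also run ad-hoc queries. For example, to pull only the transactions that tripped the high-amount rule (replace transactions with the index name configured in Firehose):

GET transactions/_search
{
  "query": {
    "range": {
      "amount": { "gt": 10000 }
    }
  }
}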
Fraud Detection dashboard
Screenshot of the SNS notification when a fraud is detected:
S3 receives the messages that were not successfully delivered to OpenSearch.
Kinesis Data Firehose adds a valuable layer to your fraud detection system by enabling seamless data storage and historical analysis capabilities. This service automatically delivers your streaming data to Amazon S3, creating a robust archive for future reference and analysis.
Data Storage Patterns for S3:
The stored transaction data in S3 enables powerful analysis through other AWS services, such as Amazon Athena for ad-hoc SQL queries and AWS Glue for ETL into analytics stores.
Storage Configuration Best Practices:
This historical data repository becomes invaluable for identifying long-term fraud patterns and improving detection algorithms. The combination of real-time processing and historical analysis creates a comprehensive fraud detection strategy that adapts to emerging threats.
The integration of AWS Kinesis and Apache Flink represents a powerful approach to modern fraud detection. This combination delivers real-time processing capabilities essential for identifying and preventing fraudulent activities in today's fast-paced digital landscape.
The architecture we've explored offers distinct advantages in speed, scalability, and operational simplicity.
Your fraud detection system gains adaptability and scalability through these AWS services. The platform evolves with your needs, handling increasing transaction volumes while maintaining performance. Machine learning capabilities enable the system to recognize new fraud patterns, strengthening your security posture.
The future of fraud detection lies in intelligent, automated systems that learn and adapt. AWS services provide the foundation for building such systems.
Your organization can stay ahead of fraudulent activities by implementing this AWS-based fraud detection mechanism. The combination of real-time processing, intelligent analysis, and immediate alerting creates a robust defense against financial threats.
As the scale and complexity of fraud detection systems grow, it's essential to ensure that the underlying architecture can keep up with performance demands—especially when every millisecond counts.
One powerful feature in Amazon Kinesis Data Streams that helps take this to the next level is Enhanced Fan-Out (EFO).
With EFO, each consumer gets its own dedicated 2 MB/second read throughput, allowing multiple applications—like analytics, monitoring, alerting, or storage—to read from the same stream in parallel and without interference. This is a game-changer compared to shared throughput models, where consumers compete for bandwidth.
I have illustrated the difference between Standard and Enhanced Fan-Out Architecture below:
⚡ Lower latency: Near-instant delivery of data to Flink applications, improving fraud detection speed.
🚀 High scalability: Easily add new consumers (e.g., alerting systems, audit logs, AI model feedback loops) without re-architecting.
🔄 Independent processing: Different consumers can process the same data in real time, independently and concurrently.
Imagine a scenario where:
One Flink app flags suspicious transactions.
Another app logs all events into a data lake.
A third app triggers real-time SMS/email alerts via SNS.
With Enhanced Fan-Out, all of these systems can run simultaneously, without slowing each other down—delivering a true real-time, multi-layered fraud prevention strategy.
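Wiring this into the Flink job from earlier is a small configuration change. Here's a sketch, assuming the Flink Kinesis connector's EFO support (the consumer name is illustrative):

Properties consumerConfig = new Properties();
consumerConfig.setProperty("aws.region", "us-east-1");
consumerConfig.setProperty("stream.initial.position", "LATEST");
// Switch from shared-throughput polling to Enhanced Fan-Out
consumerConfig.setProperty("flink.stream.recordpublisher", "EFO");
// Each EFO consumer registers under a unique name on the stream
consumerConfig.setProperty("flink.stream.efo.consumername", "fraud-detection-flink");

FlinkKinesisConsumer<String> efoConsumer =
        new FlinkKinesisConsumer<>("transaction-stream", new SimpleStringSchema(), consumerConfig);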
While this blog focused on building a real-time fraud detection pipeline using AWS Kinesis Data Streams and Apache Flink, implementing Enhanced Fan-Out unlocks next-level performance and flexibility. For organizations handling high-volume, latency-sensitive data streams, it's a strategic upgrade that brings both technical robustness and business agility.