ipasis
Blog/Engineering

Building a High-Performance Fraud Rule Engine with IP Intelligence

February 15, 20268 min read

The Architecture of Real-Time Fraud Detection

A robust fraud rule engine operates on a simple premise: context determines trust. Raw request headers alone are insufficient for detecting sophisticated actors using residential proxies or hijacked subnets. To make accurate decisions, your pipeline must enrich incoming requests with metadata before execution logic applies.

An effective architecture typically follows this flow:

  1. Ingestion: The edge load balancer receives the request.
  2. Enrichment: The system asynchronously queries an IP intelligence provider (like IPASIS) to retrieve connection types (VPN, Proxy, Tor), geolocation, and ASN details.
  3. Evaluation: The rule engine processes the enriched data against a set of logic gates.
  4. Decision: The system returns ALLOW, BLOCK, or CHALLENGE (2FA/Captcha).

Data Points That Matter

When designing rules, focus on high-fidelity signals. Generic geolocation is often noisy; focus on infrastructure indicators instead:

  • Connection Type: Is the IP associated with a data center, VPN, or Tor exit node?
  • ISP/ASN: Is the traffic originating from a residential ISP (Comcast, AT&T) or a cloud provider (AWS, DigitalOcean)?
  • Geo-Velocity: Has the user's IP moved an impossible distance in a short timeframe?

Implementation: A Python-based Scoring Engine

Below is a simplified implementation of a scoring engine using Python. This class fetches IP data and calculates a composite risk score based on weighted rules.

import requests
from dataclasses import dataclass
from typing import Dict, Any

@dataclass
class FraudDecision:
    action: str  # ALLOW, BLOCK, CHALLENGE
    risk_score: int
    reasons: list

class FraudEngine:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.api_endpoint = "https://api.ipasis.com/v1/"
        # Define thresholds
        self.BLOCK_THRESHOLD = 80
        self.CHALLENGE_THRESHOLD = 50

    def get_ip_data(self, ip_address: str) -> Dict[str, Any]:
        try:
            response = requests.get(
                f"{self.api_endpoint}{ip_address}",
                headers={"X-API-Key": self.api_key},
                timeout=0.5
            )
            return response.json() if response.status_code == 200 else {}
        except requests.RequestException:
            # Fail open or closed depending on business logic
            return {}

    def evaluate(self, ip_address: str, transaction_value: float) -> FraudDecision:
        data = self.get_ip_data(ip_address)
        score = 0
        reasons = []

        # Rule 1: Proxy/VPN Detection (+40 risk)
        if data.get("security", {}).get("is_proxy") or data.get("security", {}).get("is_vpn"):
            score += 40
            reasons.append("Anonymous Connection")

        # Rule 2: Tor Detection (+100 risk - Immediate Block)
        if data.get("security", {}).get("is_tor"):
            score += 100
            reasons.append("Tor Exit Node")

        # Rule 3: High Value Transaction on Datacenter IP (+30 risk)
        connection_type = data.get("connection", {}).get("type")
        if connection_type == "hosting" and transaction_value > 500:
            score += 30
            reasons.append("High Value on Hosting IP")

        # Determine Action
        if score >= self.BLOCK_THRESHOLD:
            return FraudDecision("BLOCK", score, reasons)
        elif score >= self.CHALLENGE_THRESHOLD:
            return FraudDecision("CHALLENGE", score, reasons)
        
        return FraudDecision("ALLOW", score, reasons)

# Usage
engine = FraudEngine(api_key="YOUR_IPASIS_KEY")
decision = engine.evaluate("192.168.1.1", 1200.00)
print(f"Action: {decision.action}, Reasons: {decision.reasons}")

Performance and Caching

Latency is critical in transaction processing. API lookups add network overhead. To mitigate this in high-throughput environments:

  1. Redis Caching: Cache the API response for the IP address. IP reputation changes slowly; a TTL of 10-60 minutes is usually acceptable.
  2. Async/Non-blocking I/O: In Node.js or Go, fire the IP lookup asynchronously while validating other static rules (email format, bin checks) to run in parallel.

Example: Caching Strategy

func GetIPData(ip string) (IPData, error) {
    // Check Redis first
    val, err := redisClient.Get(ctx, ip).Result()
    if err == nil {
        return Deserialize(val), nil
    }

    // Cache Miss: Call IPASIS API
    data := CallIPasisAPI(ip)
    
    // Write to Redis with TTL
    redisClient.Set(ctx, ip, Serialize(data), 15 * time.Minute)
    
    return data, nil
}

Handling False Positives

Binary rules (e.g., "Block all VPNs") often lead to false positives, as legitimate users utilize VPNs for privacy. A weighted scoring system, as demonstrated above, is superior.

  • Datacenter IP + High Value Transaction: High Risk.
  • Datacenter IP + Login to existing account: Low Risk.

Contextualizing the IP intelligence with user behavior reduces friction for legitimate customers while stopping automated attacks.

FAQ

Q: Should I block all AWS/GCP traffic?

A: Generally, yes, for B2C login/checkout flows. Legitimate human users rarely browse from AWS EC2 instances. However, whitelist API endpoints that expect server-to-server communication.

Q: How does latency impact the user experience?

A: IP intelligence APIs like IPASIS are optimized for sub-100ms responses. When combined with caching, the impact is negligible compared to the cost of a chargeback.

Q: Can I use this for rate limiting?

A: Yes. Instead of rate limiting by IP (which proxies bypass), rate limit by ASN or Device Fingerprint. If 500 requests come from a single residential ASN in 1 minute, throttle the entire subnet.

Secure Your Stack with IPASIS

Building a fraud engine requires data you can trust. IPASIS provides enterprise-grade IP intelligence with precision detection for VPNs, proxies, and crawler networks.

Get your free API key and start enriching your security logs today.

Start detecting VPNs and Bots today.

Identify anonymized traffic instantly with IPASIS.

Get API Key