Broken Access Control is a critical security vulnerability that occurs when applications fail to properly enforce authorization policies, allowing users to access resources, perform actions, or view data beyond their intended privileges. This vulnerability manifests when access control mechanisms are improperly implemented, missing, or can be bypassed through manipulation of requests, URLs, or application state.

Impact Severity: Broken access control can lead to unauthorized data disclosure, data modification, privilege escalation, account takeover, and complete system compromise. OWASP has ranked it as the #1 web application security risk since the 2021 edition of the OWASP Top 10 (up from #5 in the 2017 edition).

Attack Vectors: Attackers exploit these vulnerabilities through URL manipulation, parameter tampering, token modification, forced browsing, privilege escalation techniques, and session hijacking.

Common Vulnerability Patterns:

1. Insecure Direct Object References (IDOR)

GET /api/users/1234/profile HTTP/1.1
Authorization: Bearer user_token_5678

Attacker changes user ID to access other users’ data:

GET /api/users/9999/profile HTTP/1.1
Authorization: Bearer user_token_5678

2. Vertical Privilege Escalation

POST /admin/users HTTP/1.1
Content-Type: application/json
Authorization: Bearer regular_user_token
 
{"username": "newadmin", "role": "administrator"}

3. Horizontal Privilege Escalation

GET /dashboard?user_id=victim_user_id HTTP/1.1
Cookie: session=attacker_session_id

4. Forced Browsing and Path Traversal

GET /admin/panel HTTP/1.1
GET /backup/database.sql HTTP/1.1
GET /../../../etc/passwd HTTP/1.1

5. Method-based Bypass

# If POST is protected but PUT isn't
PUT /api/users/1234 HTTP/1.1
Content-Type: application/json
 
{"role": "admin", "permissions": ["all"]}

6. Parameter Tampering

<!-- Hidden form fields manipulation -->
<input type="hidden" name="user_role" value="admin">
<input type="hidden" name="price" value="0.01">

7. JSON/XML Parameter Injection

{
  "user_id": 1234,
  "role": "user",
  "admin": true,
  "permissions": ["read", "write", "delete"]
}

Comprehensive Prevention Strategies

1. Implement Robust Authorization Frameworks

Role-Based Access Control (RBAC) Implementation:

from enum import Enum
from typing import List, Dict, Any
 
class Role(Enum):
    """Roles a user can hold; each member's value is its lowercase name."""
    ADMIN = "admin"
    USER = "user"
    MODERATOR = "moderator"
    GUEST = "guest"
 
class Permission(Enum):
    """Actions that can be granted to a role; values are lowercase strings."""
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    # Distinct from Role.ADMIN: this is a permission, not a role.
    ADMIN = "admin"
 
class AccessControl:
    def __init__(self):
        self.role_permissions = {
            Role.ADMIN: [Permission.READ, Permission.WRITE, Permission.DELETE, Permission.ADMIN],
            Role.MODERATOR: [Permission.READ, Permission.WRITE, Permission.DELETE],
            Role.USER: [Permission.READ, Permission.WRITE],
            Role.GUEST: [Permission.READ]
        }
 
    def check_permission(self, user_role: Role, required_permission: Permission) -> bool:
        """Check if user role has required permission."""
        return required_permission in self.role_permissions.get(user_role, [])
 
    def authorize_resource_access(self, user_id: int, resource_owner_id: int,
                                user_role: Role, required_permission: Permission) -> bool:
        """Authorize access to a specific resource."""
 
        # Check if user has the required permission
        if not self.check_permission(user_role, required_permission):
            return False
 
        # For non-admin users, ensure they can only access their own resources
        if user_role != Role.ADMIN and user_id != resource_owner_id:
            return False
 
        return True
 
# Usage example
# One module-level AccessControl instance, referenced by name from the
# endpoint and decorator examples in this file.
access_control = AccessControl()
 
def get_user_profile(requesting_user_id: int, target_user_id: int, user_role: Role):
    """Return the target user's profile once the caller is authorized.

    The caller needs READ permission on the target user's resource, as decided
    by ``access_control.authorize_resource_access``.

    Raises:
        PermissionError: when the authorization check fails.
    """
    is_allowed = access_control.authorize_resource_access(
        requesting_user_id, target_user_id, user_role, Permission.READ
    )
    if is_allowed:
        return fetch_user_profile(target_user_id)

    raise PermissionError("Access denied: Insufficient privileges")

2. Server-Side Authorization Enforcement

Middleware-based Authorization:

from functools import wraps
from flask import request, jsonify, g
 
def require_permission(permission: Permission):
    """Build a decorator that enforces ``permission`` on a Flask view.

    The wrapped view runs only when ``g.current_user`` is set and its role is
    granted ``permission`` by ``access_control.check_permission``.

    Returns (from the wrapper):
        * ``({"error": "Authentication required"}, 401)`` when no user is
          attached to ``g``;
        * ``({"error": "Insufficient privileges"}, 403)`` when the role lacks
          the permission;
        * otherwise whatever the wrapped view returns.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # ``g.current_user`` raises AttributeError if no authentication
            # layer populated it for this request; treat that the same as an
            # anonymous request so the 401 branch is actually reachable.
            user = getattr(g, "current_user", None)

            if not user:
                return jsonify({"error": "Authentication required"}), 401

            if not access_control.check_permission(user.role, permission):
                return jsonify({"error": "Insufficient privileges"}), 403

            return f(*args, **kwargs)
        return decorated_function
    return decorator
 
def require_resource_ownership():
    """Build a decorator restricting a view to the resource owner or an admin.

    The resource owner is identified by the ``user_id`` route argument, read
    from the view's keyword arguments and, failing that, from
    ``request.view_args``.

    Returns (from the wrapper):
        * ``({"error": "Authentication required"}, 401)`` when no user is
          attached to ``g``;
        * ``({"error": "Access denied"}, 403)`` when a non-admin user's id does
          not match ``user_id``, or when no ``user_id`` can be determined;
        * otherwise whatever the wrapped view returns.
    """
    def decorator(f):
        @wraps(f)
        def decorated_function(*args, **kwargs):
            # This decorator may be used without require_permission, so it
            # cannot assume an authenticated user is present.
            user = getattr(g, "current_user", None)
            if not user:
                return jsonify({"error": "Authentication required"}), 401

            # view_args can be None when no route matched the request.
            view_args = request.view_args or {}
            resource_id = kwargs.get('user_id') or view_args.get('user_id')

            # Admins can access any resource
            if user.role == Role.ADMIN:
                return f(*args, **kwargs)

            # Fail closed: without a resource id there is nothing to compare,
            # and str(None) == str(None) must never count as ownership.
            if resource_id is None:
                return jsonify({"error": "Access denied"}), 403

            # Compare as strings so an int id matches a string route value.
            if str(user.id) != str(resource_id):
                return jsonify({"error": "Access denied"}), 403

            return f(*args, **kwargs)
        return decorated_function
    return decorator
 
# Usage in Flask routes
@app.route('/api/users/<int:user_id>/profile', methods=['GET'])
@require_permission(Permission.READ)
@require_resource_ownership()
def get_user_profile(user_id):
    """Serve a user's profile as JSON.

    Both decorators must pass before this body runs: READ permission first,
    then the ownership check on ``user_id``.
    """
    profile = get_user_data(user_id)
    return jsonify(profile)

3. Secure Object Reference Implementation

Indirect Object References:

import uuid
from typing import Dict, Optional
 
class SecureObjectReference:
    """In-memory table of per-user opaque references to internal object ids.

    A reference only resolves for the user id it was created for, because the
    lookup key combines the user id with the reference string.
    """

    def __init__(self):
        # "<user_id>:<reference>" -> actual object id
        self.reference_map: Dict[str, int] = {}
        # actual object id -> most recently issued reference; each new
        # reference for the same id overwrites the previous entry.
        self.reverse_map: Dict[int, str] = {}

    @staticmethod
    def _scoped_key(user_id: int, reference: str) -> str:
        """Build the lookup key that binds a reference to one user."""
        return f"{user_id}:{reference}"

    def create_reference(self, actual_id: int, user_id: int) -> str:
        """Issue a new random (UUID4) reference to ``actual_id`` for ``user_id``.

        Every call produces a fresh reference; earlier ones stay resolvable.
        """
        token = str(uuid.uuid4())
        self.reference_map[self._scoped_key(user_id, token)] = actual_id
        self.reverse_map[actual_id] = token
        return token

    def resolve_reference(self, reference: str, user_id: int) -> Optional[int]:
        """Return the object id behind ``reference`` for ``user_id``, else None."""
        try:
            return self.reference_map[self._scoped_key(user_id, reference)]
        except KeyError:
            return None
 
# Usage example
# Module-level reference table; it lives in process memory only, so issued
# references exist for as long as this object does.
secure_refs = SecureObjectReference()
 
@app.route('/api/documents/<reference>')
@require_permission(Permission.READ)
def get_document(reference: str):
    """Serve a document addressed by an opaque per-user reference.

    Returns:
        * ``({"error": "Document not found"}, 404)`` when the reference does
          not resolve for the current user;
        * ``({"error": "Access denied"}, 403)`` when the user may not access
          the resolved document;
        * the document data as JSON otherwise.
    """
    user = g.current_user

    # Resolve secure reference to actual document ID
    document_id = secure_refs.resolve_reference(reference, user.id)

    # resolve_reference signals "unknown" with None; a truthiness test would
    # also reject a legitimate document whose id is 0.
    if document_id is None:
        return jsonify({"error": "Document not found"}), 404

    # The reference proves it was issued to this user; the second check
    # confirms the user is still allowed to read the document itself.
    if not user_can_access_document(user.id, document_id):
        return jsonify({"error": "Access denied"}), 403

    return jsonify(get_document_data(document_id))

4. Input Validation and Sanitization

from marshmallow import Schema, fields, validate, ValidationError
 
class UserUpdateSchema(Schema):
    """Schema for validating user update requests.

    Only ``username`` and ``email`` are loadable; the privilege-related fields
    are declared ``dump_only`` so they are never accepted as input.
    """
    # Username must be 3-50 characters long.
    username = fields.Str(validate=validate.Length(min=3, max=50))
    email = fields.Email()
    # Explicitly exclude sensitive fields: dump_only fields are skipped on
    # load, which makes them read-only from a client's point of view.
    role = fields.Str(dump_only=True)  # Read-only field
    permissions = fields.List(fields.Str(), dump_only=True)
    is_admin = fields.Bool(dump_only=True)
 
def validate_user_input(json_data: dict, user_role: Role) -> dict:
    """Validate a user-update payload and return the loaded data.

    Non-admin callers are rejected outright if the payload names any
    privilege-related field; the payload is then loaded through
    ``UserUpdateSchema``.

    Args:
        json_data: Parsed request body, expected to be a dict.
        user_role: Role of the user submitting the update.

    Returns:
        The result of ``UserUpdateSchema().load(json_data)``.

    Raises:
        ValidationError: if a non-admin tries to set a sensitive field, or if
            schema validation fails.
    """
    # Regular users cannot modify role-related fields.  The membership test is
    # limited to dict payloads: a missing or malformed body (e.g. None) would
    # otherwise raise TypeError here instead of a ValidationError, so it is
    # left for the schema to reject below.
    # NOTE(review): UserUpdateSchema declares role/permissions/is_admin as
    # dump_only, so the schema presumably rejects them for admins as well.
    # Confirm that is the intended behaviour.
    if user_role != Role.ADMIN and isinstance(json_data, dict):
        sensitive_fields = ['role', 'permissions', 'is_admin', 'account_status']
        for name in sensitive_fields:
            if name in json_data:
                raise ValidationError(f"Field '{name}' cannot be modified")

    schema = UserUpdateSchema()
    try:
        return schema.load(json_data)
    except ValidationError as err:
        # Chain explicitly so the structured ``err.messages`` stays reachable
        # through ``__cause__``.
        raise ValidationError(f"Validation failed: {err.messages}") from err

5. Session Security and Token Management

import jwt
import hashlib
from datetime import datetime, timedelta
from typing import Dict, Any
 
class SecureSessionManager:
    """Issue and validate HS256-signed JWT session tokens.

    Each token carries a ``session_id`` that must also be present in the
    in-memory ``active_sessions`` registry, so removing an entry from that
    registry invalidates the corresponding token.
    """

    def __init__(self, secret_key: str):
        self.secret_key = secret_key
        # session_id -> metadata captured when the token was created
        self.active_sessions: Dict[str, Dict] = {}

    def create_session_token(self, user_id: int, user_role: str,
                             session_data: Dict[str, Any]) -> str:
        """Create a signed session token and register the session.

        Args:
            user_id: Id of the authenticated user.
            user_role: Role name stored in the token.
            session_data: Request metadata; ``ip_address`` and ``user_agent``
                are read with ``.get`` and may be absent.

        Returns:
            The encoded JWT, valid for eight hours from creation.
        """
        # One timestamp for the whole operation keeps the token claims and
        # the registry entry consistent with each other.
        now = datetime.utcnow()
        session_id = self.generate_session_id(user_id)

        payload = {
            'user_id': user_id,
            'role': user_role,
            'session_id': session_id,
            'issued_at': now.isoformat(),
            'expires_at': (now + timedelta(hours=8)).isoformat(),
            # NOTE(review): get_user_permissions is not defined on this class
            # in the visible source; unless it is supplied elsewhere this call
            # raises AttributeError. Confirm where it comes from.
            'permissions': self.get_user_permissions(user_role)
        }

        token = jwt.encode(payload, self.secret_key, algorithm='HS256')

        # Store session metadata
        self.active_sessions[session_id] = {
            'user_id': user_id,
            'created_at': now,
            'last_activity': now,
            'ip_address': session_data.get('ip_address'),
            'user_agent': session_data.get('user_agent')
        }

        return token

    def validate_session_token(self, token: str, request_ip: str) -> Dict[str, Any]:
        """Validate a session token and return its decoded payload.

        Checks the signature, the ``expires_at`` claim and that the session is
        still registered, then refreshes the session's ``last_activity``.
        An IP address differing from the one recorded at creation is reported
        but does not reject the request.

        Raises:
            PermissionError: if the token is invalid, expired, missing or
                carrying malformed claims, or refers to an unknown session.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])

            # A correctly signed token can still lack the custom claims or
            # carry an unparsable timestamp; report that as an invalid token
            # instead of leaking KeyError/ValueError/TypeError to the caller.
            try:
                expires_at = datetime.fromisoformat(payload['expires_at'])
                session_id = payload['session_id']
                expired = datetime.utcnow() > expires_at
            except (KeyError, TypeError, ValueError) as claim_error:
                raise jwt.InvalidTokenError("Malformed session claims") from claim_error

            # Check expiration
            if expired:
                raise jwt.ExpiredSignatureError("Session expired")

            # Validate active session
            if session_id not in self.active_sessions:
                raise jwt.InvalidTokenError("Session invalidated")

            session_data = self.active_sessions[session_id]

            # Check IP consistency (optional security measure)
            if session_data['ip_address'] != request_ip:
                # NOTE(review): log_security_event is not defined on this
                # class in the visible source. Confirm it is provided
                # elsewhere, otherwise this raises AttributeError.
                self.log_security_event('ip_mismatch', session_id, request_ip)

            # Update last activity
            session_data['last_activity'] = datetime.utcnow()

            return payload

        except jwt.InvalidTokenError as e:
            # ExpiredSignatureError is handled here as well.
            raise PermissionError(f"Invalid session: {str(e)}") from e

    def generate_session_id(self, user_id: int) -> str:
        """Return a unique 64-character hex session id.

        The id is the SHA-256 digest of the user id, the current UTC time and
        a random UUID4, so repeated calls for one user yield different ids.
        """
        data = f"{user_id}:{datetime.utcnow().isoformat()}:{uuid.uuid4()}"
        return hashlib.sha256(data.encode()).hexdigest()

6. Real-time Security Monitoring

import logging
from datetime import datetime, timedelta
from collections import defaultdict
 
class AccessControlMonitor:
    """Record access attempts and raise alerts on suspicious failure patterns.

    Failed attempts are kept per user in memory. After every failure the
    user's recent failures are inspected for two conditions: too many failures
    inside the time window, and failures against resources whose name
    contains a privilege-related keyword.
    """

    def __init__(self, failure_window: timedelta = timedelta(minutes=15),
                 max_failures: int = 5,
                 escalation_patterns: tuple = ('admin', 'root', 'superuser', 'elevated')):
        """Configure the detection thresholds.

        Args:
            failure_window: How far back a failure counts as "recent".
            max_failures: An alert fires when the number of recent failures
                is strictly greater than this value.
            escalation_patterns: Substrings that mark a resource as
                privilege-related; matching is case-insensitive.
        """
        # user_id -> list of log entries for failed attempts (never pruned)
        self.failed_access_attempts = defaultdict(list)
        # Kept for interface compatibility; not updated by this class.
        self.suspicious_patterns = defaultdict(int)
        self.failure_window = failure_window
        self.max_failures = max_failures
        # Resources are lower-cased before matching, so patterns must be too.
        self.escalation_patterns = tuple(p.lower() for p in escalation_patterns)

    def log_access_attempt(self, user_id: int, resource: str,
                           action: str, success: bool, ip_address: str):
        """Log an access attempt; on failure, store it and run detection."""

        log_entry = {
            'timestamp': datetime.utcnow(),
            'user_id': user_id,
            'resource': resource,
            'action': action,
            'success': success,
            'ip_address': ip_address
        }

        # Lazy %-formatting: the message is only built if the record is emitted.
        logging.info("Access attempt: %s", log_entry)

        if not success:
            self.failed_access_attempts[user_id].append(log_entry)
            self.detect_suspicious_activity(user_id, ip_address)

    def detect_suspicious_activity(self, user_id: int, ip_address: str):
        """Alert when a user's recent failures look like an attack.

        Sends at most one alert for the failure count and at most one for
        privilege-escalation-looking resources per call.
        """
        # Computed once; equivalent to "now - timestamp < failure_window".
        cutoff = datetime.utcnow() - self.failure_window
        recent_failures = [
            attempt for attempt in self.failed_access_attempts[user_id]
            if attempt['timestamp'] > cutoff
        ]

        # Alert on multiple failed access attempts
        if len(recent_failures) > self.max_failures:
            self.alert_security_team(
                f"User {user_id} has {len(recent_failures)} failed access attempts from {ip_address}"
            )

        # Detect privilege escalation attempts: a single matching failure is
        # enough, so stop at the first hit.
        targets_privileged_resource = any(
            pattern in attempt['resource'].lower()
            for attempt in recent_failures
            for pattern in self.escalation_patterns
        )
        if targets_privileged_resource:
            self.alert_security_team(
                f"Potential privilege escalation attempt by user {user_id}"
            )

    def alert_security_team(self, message: str):
        """Emit a security alert at CRITICAL level on the root logger."""
        logging.critical("SECURITY ALERT: %s", message)
        # Integrate with SIEM, Slack, email, etc.
        # Integrate with SIEM, Slack, email, etc.