API Privilege Escalation Detection Tool Written in Python

API Privilege Escalation Detection Tool

#!/usr/bin/env python3
"""API Privilege Escalation Detection Tool
功能:检测Web接口的水平越权和垂直越权漏洞
作者:pp
版本:1.0"""

import json
import requests
import time
import hashlib
import hmac
import base64
from typing import Dict, List, Any, Optional
from urllib.parse import urlparse, parse_qs
import argparse
import sys

class APIPrivilegeEscalationScanner:
    """
    API Privilege Escalation Detector
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Initialize the scanner

        Args:
            config: Configuration dictionary containing scan parameters
        """
        self.config = config
        self.results = []
        self.session = requests.Session()

        # Set request headers
        if 'headers' in config:
            self.session.headers.update(config['headers'])

    def send_request(self, request_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send HTTP request

        Args:
            request_config: Request configuration

        Returns:
            Response result dictionary
        """
        try:
            method = request_config.get('method', 'GET').upper()
            url = request_config['url']
            headers = request_config.get('headers', {})
            data = request_config.get('data', None)
            params = request_config.get('params', None)

            # Handle authentication information
            auth_config = request_config.get('auth', {})
            if auth_config:
                headers = self._add_auth_headers(headers, auth_config)

            # Send request
            start_time = time.time()
            response = self.session.request(
                method=method,
                url=url,
                headers=headers,
                json=data if isinstance(data, dict) else None,
                data=data if isinstance(data, str) else None,
                params=params,
                timeout=self.config.get('timeout', 30),
                verify=self.config.get('verify_ssl', True)
            )
            response_time = round((time.time() - start_time) * 1000, 2)

            # Build response result
            result = {
                'status_code': response.status_code,
                'headers': dict(response.headers),
                'content': response.text,
                'response_time_ms': response_time,
                'size_bytes': len(response.content)
            }

            # Try to parse JSON response
            try:
                result['json'] = response.json()
            except:
                result['json'] = None

            return result

        except Exception as e:
            return {
                'error': str(e),
                'status_code': 0
            }

    def _add_auth_headers(self, headers: Dict[str, str], auth_config: Dict[str, Any]) -> Dict[str, str]:
        """
        Add authentication headers

        Args:
            headers: Original headers
            auth_config: Authentication configuration

        Returns:
            Updated headers
        """
        auth_type = auth_config.get('type', 'bearer')

        if auth_type == 'bearer':
            token = auth_config.get('token', '')
            headers['Authorization'] = f"Bearer {token}"

        elif auth_type == 'basic':
            username = auth_config.get('username', '')
            password = auth_config.get('password', '')
            credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
            headers['Authorization'] = f"Basic {credentials}"

        elif auth_type == 'api_key':
            key_name = auth_config.get('key_name', 'X-API-Key')
            key_value = auth_config.get('key_value', '')
            headers[key_name] = key_value

        elif auth_type == 'hmac':
            # HMAC signature authentication
            secret = auth_config.get('secret', '')
            message = auth_config.get('message', '')
            signature = hmac.new(
                secret.encode(),
                message.encode(),
                hashlib.sha256
            ).hexdigest()
            headers['X-Signature'] = signature

        return headers

    def check_horizontal_privilege_escalation(self, test_cases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Check for horizontal privilege escalation vulnerabilities

        Args:
            test_cases: List of test cases

        Returns:
            List of detection results
        """
        results = []

        for test_case in test_cases:
            print(f"Testing horizontal privilege escalation: {test_case.get('description', 'Unknown')}")

            # Send normal user request
            normal_response = self.send_request(test_case['normal_user_request'])

            # Send attacker request
            attacker_response = self.send_request(test_case['attacker_request'])

            # Analyze results
            vulnerability_found = self._analyze_horizontal_escalation(
                normal_response,
                attacker_response,
                test_case
            )

            result = {
                'test_type': 'horizontal_privilege_escalation',
                'description': test_case.get('description', ''),
                'vulnerability_found': vulnerability_found,
                'normal_user_response': self._sanitize_response(normal_response),
                'attacker_response': self._sanitize_response(attacker_response),
                'test_case': self._sanitize_test_case(test_case)
            }

            results.append(result)

        return results

    def check_vertical_privilege_escalation(self, test_cases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Check for vertical privilege escalation vulnerabilities

        Args:
            test_cases: List of test cases

        Returns:
            List of detection results
        """
        results = []

        for test_case in test_cases:
            print(f"Testing vertical privilege escalation: {test_case.get('description', 'Unknown')}")

            # Send low privilege user request
            low_privilege_response = self.send_request(test_case['low_privilege_request'])

            # Analyze results
            vulnerability_found = self._analyze_vertical_escalation(
                low_privilege_response,
                test_case
            )

            result = {
                'test_type': 'vertical_privilege_escalation',
                'description': test_case.get('description', ''),
                'vulnerability_found': vulnerability_found,
                'low_privilege_response': self._sanitize_response(low_privilege_response),
                'test_case': self._sanitize_test_case(test_case)
            }

            results.append(result)

        return results

    def _analyze_horizontal_escalation(self, normal_response: Dict, attacker_response: Dict,
                                     test_case: Dict) -> bool:
        """
        Analyze horizontal privilege escalation detection results

        Args:
            normal_response: Normal user response
            attacker_response: Attacker response
            test_case: Test case

        Returns:
            Whether a vulnerability was found
        """
        # If the attacker request fails, it is usually not a privilege escalation
        if attacker_response.get('error') or attacker_response.get('status_code', 0) >= 500:
            return False

        expected_status = test_case.get('expected_attacker_status', 403)

        # Status code analysis
        if attacker_response.get('status_code') == expected_status:
            return False

        # Content similarity analysis (simplified version)
        if (attacker_response.get('status_code') == normal_response.get('status_code') == 200):
            # Check if response content is similar
            normal_content = str(normal_response.get('content', ''))
            attacker_content = str(attacker_response.get('content', ''))

            # Simple similarity check (more complex algorithms can be used in practice)
            if (len(normal_content) > 0 and
                len(attacker_content) > 0 and
                self._calculate_similarity(normal_content, attacker_content) > 0.7):
                return True

        return False

    def _analyze_vertical_escalation(self, low_privilege_response: Dict, test_case: Dict) -> bool:
        """
        Analyze vertical privilege escalation detection results

        Args:
            low_privilege_response: Low privilege user response
            test_case: Test case

        Returns:
            Whether a vulnerability was found
        """
        if low_privilege_response.get('error'):
            return False

        expected_status = test_case.get('expected_status', 403)
        actual_status = low_privilege_response.get('status_code')

        # If a low privilege user successfully accessed a high privilege interface
        if actual_status not in [401, 403, expected_status]:
            if 200 <= actual_status < 300:
                return True

        return False

    def _calculate_similarity(self, str1: str, str2: str) -> float:
        """
        Calculate the similarity between two strings (simplified version)

        Args:
            str1: String 1
            str2: String 2

        Returns:
            Similarity score 0-1
        """
        if not str1 or not str2:
            return 0.0

        # Simple similarity calculation
        set1 = set(str1.split())
        set2 = set(str2.split())

        if not set1 or not set2:
            return 0.0

        intersection = set1.intersection(set2)
        union = set1.union(set2)

        return len(intersection) / len(union) if union else 0.0

    def _sanitize_response(self, response: Dict) -> Dict:
        """
        Sanitize response data, removing sensitive information

        Args:
            response: Original response

        Returns:
            Sanitized response
        """
        sanitized = response.copy()

        # Remove potentially sensitive headers
        sensitive_headers = ['authorization', 'cookie', 'set-cookie', 'x-auth-token']
        sanitized['headers'] = {
            k: v for k, v in sanitized.get('headers', {}).items()
            if k.lower() not in sensitive_headers
        }

        # Limit content length
        if 'content' in sanitized and len(sanitized['content']) > 1000:
            sanitized['content'] = sanitized['content'][:1000] + "... [TRUNCATED]"

        return sanitized

    def _sanitize_test_case(self, test_case: Dict) -> Dict:
        """
        Sanitize test case, removing sensitive information

        Args:
            test_case: Original test case

        Returns:
            Sanitized test case
        """
        sanitized = test_case.copy()

        # Sanitize authentication information
        for key in ['normal_user_request', 'attacker_request', 'low_privilege_request']:
            if key in sanitized and 'auth' in sanitized[key]:
                sanitized[key]['auth'] = {**sanitized[key]['auth'], 'token': '***', 'password': '***'}

        return sanitized

    def scan(self, scan_config: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute scan

        Args:
            scan_config: Scan configuration

        Returns:
            Scan results
        """
        print("Starting API privilege escalation detection scan...")

        results = {
            'scan_id': hashlib.md5(str(time.time()).encode()).hexdigest()[:8],
            'start_time': time.strftime('%Y-%m-%d %H:%M:%S'),
            'horizontal_privilege_results': [],
            'vertical_privilege_results': []
        }

        # Horizontal privilege detection
        if 'horizontal_privilege_tests' in scan_config:
            horizontal_results = self.check_horizontal_privilege_escalation(
                scan_config['horizontal_privilege_tests']
            )
            results['horizontal_privilege_results'] = horizontal_results

        # Vertical privilege detection
        if 'vertical_privilege_tests' in scan_config:
            vertical_results = self.check_vertical_privilege_escalation(
                scan_config['vertical_privilege_tests']
            )
            results['vertical_privilege_results'] = vertical_results

        # Statistics
        results['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S')
        results['total_tests'] = len(results['horizontal_privilege_results']) + len(results['vertical_privilege_results'])
        results['vulnerabilities_found'] = sum(
            1 for r in results['horizontal_privilege_results'] + results['vertical_privilege_results']
            if r['vulnerability_found']
        )

        print(f"Scan complete! A total of {results['total_tests']} tests executed, found {results['vulnerabilities_found']} vulnerabilities.")

        return results


def load_config_from_file(file_path: str) -> Dict[str, Any]:
    """
    Load configuration from file

    Args:
        file_path: Configuration file path

    Returns:
        Configuration dictionary
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"Failed to load configuration file: {e}")
        sys.exit(1)


def save_results_to_file(results: Dict[str, Any], file_path: str):
    """
    Save results to file

    Args:
        results: Scan results
        file_path: Output file path
    """
    try:
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        print(f"Results saved to: {file_path}")
    except Exception as e:
        print(f"Failed to save results: {e}")


def main():
    """Main function"""
    parser = argparse.ArgumentParser(description='API Privilege Escalation Detection Tool')
    parser.add_argument('-c', '--config', required=True, help='Scan configuration file path')
    parser.add_argument('-o', '--output', help='Output results file path')

    args = parser.parse_args()

    # Load configuration
    config = load_config_from_file(args.config)

    # Create scanner
    scanner = APIPrivilegeEscalationScanner(config.get('scanner_config', {}))

    # Execute scan
    results = scanner.scan(config.get('scan_config', {}))

    # Output results
    if args.output:
        save_results_to_file(results, args.output)
    else:
        print(json.dumps(results, indent=2, ensure_ascii=False))


if __name__ == "__main__":
    main()

Configuration File Example (config.json)

{
  "scanner_config": {
    "timeout": 30,
    "verify_ssl": false,
    "headers": {
      "User-Agent": "APIPrivilegeScanner/1.0",
      "Content-Type": "application/json"
    }
  },
  "scan_config": {
    "horizontal_privilege_tests": [
      {
        "description": "Test user information query horizontal privilege escalation",
        "normal_user_request": {
          "method": "GET",
          "url": "https://api.example.com/v1/users/1001/profile",
          "headers": {
            "Content-Type": "application/json"
          },
          "auth": {
            "type": "bearer",
            "token": "user1001_token_here"
          }
        },
        "attacker_request": {
          "method": "GET",
          "url": "https://api.example.com/v1/users/1002/profile",
          "headers": {
            "Content-Type": "application/json"
          },
          "auth": {
            "type": "bearer",
            "token": "user1001_token_here"
          }
        },
        "expected_attacker_status": 403
      },
      {
        "description": "Test order access horizontal privilege escalation",
        "normal_user_request": {
          "method": "GET",
          "url": "https://api.example.com/v1/orders/2001",
          "auth": {
            "type": "bearer",
            "token": "user1001_token_here"
          }
        },
        "attacker_request": {
          "method": "GET",
          "url": "https://api.example.com/v1/orders/2002",
          "auth": {
            "type": "bearer",
            "token": "user1001_token_here"
          }
        },
        "expected_attacker_status": 403
      }
    ],
    "vertical_privilege_tests": [
      {
        "description": "Test normal user accessing admin interface",
        "low_privilege_request": {
          "method": "GET",
          "url": "https://api.example.com/v1/admin/users",
          "auth": {
            "type": "bearer",
            "token": "normal_user_token_here"
          }
        },
        "expected_status": 403
      },
      {
        "description": "Test user creating admin account",
        "low_privilege_request": {
          "method": "POST",
          "url": "https://api.example.com/v1/admin/users",
          "headers": {
            "Content-Type": "application/json"
          },
          "data": {
            "username": "newadmin",
            "password": "password123",
            "role": "administrator"
          },
          "auth": {
            "type": "bearer",
            "token": "normal_user_token_here"
          }
        },
        "expected_status": 403
      }
    ]
  }
}

Usage Example

1. Save the code as api_privilege_scanner.py
2. Create a configuration file config.json (using the example above)
3. Run the scan:

python api_privilege_scanner.py -c config.json -o results.json

Output Result Example

{
  "scan_id": "a1b2c3d4",
  "start_time": "2024-01-15 10:30:00",
  "end_time": "2024-01-15 10:31:23",
  "total_tests": 4,
  "vulnerabilities_found": 2,
  "horizontal_privilege_results": [
    {
      "test_type": "horizontal_privilege_escalation",
      "description": "Test user information query horizontal privilege escalation",
      "vulnerability_found": true,
      "normal_user_response": {
        "status_code": 200,
        "headers": {
          "content-type": "application/json",
          "content-length": "156"
        },
        "content": "{\"user_id\": 1001, \"username\": \"user1\", \"email\": \"[email protected]\"}",
        "response_time_ms": 245.67,
        "size_bytes": 156,
        "json": {
          "user_id": 1001,
          "username": "user1",
          "email": "[email protected]"
        }
      },
      "attacker_response": {
        "status_code": 200,
        "headers": {
          "content-type": "application/json",
          "content-length": "156"
        },
        "content": "{\"user_id\": 1002, \"username\": \"user2\", \"email\": \"[email protected]\"}",
        "response_time_ms": 234.12,
        "size_bytes": 156,
        "json": {
          "user_id": 1002,
          "username": "user2",
          "email": "[email protected]"
        }
      },
      "test_case": {
        "description": "Test user information query horizontal privilege escalation",
        "normal_user_request": {
          "method": "GET",
          "url": "https://api.example.com/v1/users/1001/profile",
          "headers": {
            "Content-Type": "application/json"
          },
          "auth": {
            "type": "bearer",
            "token": "***"
          }
        },
        "attacker_request": {
          "method": "GET",
          "url": "https://api.example.com/v1/users/1002/profile",
          "headers": {
            "Content-Type": "application/json"
          },
          "auth": {
            "type": "bearer",
            "token": "***"
          }
        },
        "expected_attacker_status": 403
      }
    }
  ],
  "vertical_privilege_results": [
    {
      "test_type": "vertical_privilege_escalation",
      "description": "Test normal user accessing admin interface",
      "vulnerability_found": false,
      "low_privilege_response": {
        "status_code": 403,
        "headers": {
          "content-type": "application/json",
          "content-length": "45"
        },
        "content": "{\"error\": \"Forbidden\", \"code\": 403}",
        "response_time_ms": 178.45,
        "size_bytes": 45,
        "json": {
          "error": "Forbidden",
          "code": 403
        }
      },
      "test_case": {
        "description": "Test normal user accessing admin interface",
        "low_privilege_request": {
          "method": "GET",
          "url": "https://api.example.com/v1/admin/users",
          "auth": {
            "type": "bearer",
            "token": "***"
          }
        },
        "expected_status": 403
      }
    }
  ]
}

Tool Features

1. Supports multiple authentication methods: Bearer Token, Basic Auth, API Key, HMAC
2. Detects two types of privilege escalation: horizontal and vertical
3. Detailed test reports: including request response details and vulnerability evidence
4. Sensitive information protection: automatically sanitizes tokens, passwords, and other sensitive information
5. Flexible configuration: supports JSON configuration files
6. Error handling: comprehensive exception handling mechanism

This tool can help security testers systematically detect API access control vulnerabilities, improving application security.

Leave a Comment