API Privilege Escalation Detection Tool
#!/usr/bin/env python3
"""API Privilege Escalation Detection Tool
Purpose: detect horizontal and vertical privilege escalation vulnerabilities in web API endpoints
Author: pp
Version: 1.0"""
import json
import requests
import time
import hashlib
import hmac
import base64
from typing import Dict, List, Any, Optional
from urllib.parse import urlparse, parse_qs
import argparse
import sys
class APIPrivilegeEscalationScanner:
    """Detect horizontal and vertical privilege escalation in HTTP APIs.

    Configured requests are replayed through one shared
    ``requests.Session``; status codes and a simple content-similarity
    heuristic decide whether a test case is reported as vulnerable.
    """

    # Lower-cased header names that must never appear verbatim in a report.
    _SENSITIVE_HEADERS = frozenset(
        {'authorization', 'cookie', 'set-cookie', 'x-auth-token'}
    )
    # Keys of an ``auth`` block whose values are credentials.
    _SECRET_AUTH_KEYS = frozenset({'token', 'password', 'key_value', 'secret'})
    # Test-case entries that hold a request configuration.
    _REQUEST_KEYS = (
        'normal_user_request',
        'attacker_request',
        'low_privilege_request',
    )

    def __init__(self, config: Dict[str, Any]):
        """Initialize the scanner.

        Args:
            config: Scanner settings. Keys read by this class are
                ``headers`` (session-wide default headers), ``timeout``
                and ``verify_ssl``.
        """
        self.config = config
        self.results = []
        self.session = requests.Session()
        # Session-wide default headers; per-request headers are merged on top.
        if 'headers' in config:
            self.session.headers.update(config['headers'])

    def send_request(self, request_config: Dict[str, Any]) -> Dict[str, Any]:
        """Send one HTTP request described by ``request_config``.

        Args:
            request_config: Request description with ``url`` (required) and
                optional ``method``, ``headers``, ``data``, ``params`` and
                ``auth`` entries.

        Returns:
            On success a dictionary with ``status_code``, ``headers``,
            ``content``, ``response_time_ms``, ``size_bytes`` and ``json``
            (``None`` when the body is not valid JSON). On any failure a
            dictionary with ``error`` and ``status_code`` set to 0.
        """
        try:
            method = request_config.get('method', 'GET').upper()
            url = request_config['url']
            # Work on a copy: the caller's configuration is later written to
            # the report and must not pick up generated credential headers.
            headers = dict(request_config.get('headers') or {})
            data = request_config.get('data', None)
            params = request_config.get('params', None)

            auth_config = request_config.get('auth', {})
            if auth_config:
                headers = self._add_auth_headers(headers, auth_config)

            # JSON containers are sent as a JSON body, raw text/bytes as-is.
            json_body = data if isinstance(data, (dict, list)) else None
            raw_body = data if isinstance(data, (str, bytes)) else None

            started = time.perf_counter()
            response = self.session.request(
                method=method,
                url=url,
                headers=headers,
                json=json_body,
                data=raw_body,
                params=params,
                timeout=self.config.get('timeout', 30),
                verify=self.config.get('verify_ssl', True)
            )
            response_time = round((time.perf_counter() - started) * 1000, 2)

            result = {
                'status_code': response.status_code,
                'headers': dict(response.headers),
                'content': response.text,
                'response_time_ms': response_time,
                'size_bytes': len(response.content)
            }
            # A non-JSON body is expected for many endpoints; JSON decoding
            # errors are ValueError subclasses.
            try:
                result['json'] = response.json()
            except ValueError:
                result['json'] = None
            return result
        except Exception as e:
            # Boundary handler: a failed request must not abort the scan.
            return {
                'error': str(e),
                'status_code': 0
            }

    def _add_auth_headers(self, headers: Dict[str, str], auth_config: Dict[str, Any]) -> Dict[str, str]:
        """Return a copy of ``headers`` extended with authentication headers.

        The input mapping is left untouched so that credentials never end
        up in the caller's request configuration.

        Args:
            headers: Original headers.
            auth_config: Authentication settings; ``type`` selects one of
                ``bearer`` (default), ``basic``, ``api_key`` or ``hmac``.
                An unknown type adds nothing.

        Returns:
            New header dictionary including the authentication header.
        """
        updated = dict(headers or {})
        auth_type = auth_config.get('type', 'bearer')
        if auth_type == 'bearer':
            token = auth_config.get('token', '')
            updated['Authorization'] = f"Bearer {token}"
        elif auth_type == 'basic':
            username = auth_config.get('username', '')
            password = auth_config.get('password', '')
            credentials = base64.b64encode(f"{username}:{password}".encode()).decode()
            updated['Authorization'] = f"Basic {credentials}"
        elif auth_type == 'api_key':
            key_name = auth_config.get('key_name', 'X-API-Key')
            updated[key_name] = auth_config.get('key_value', '')
        elif auth_type == 'hmac':
            # HMAC-SHA256 over the configured message, hex encoded.
            secret = auth_config.get('secret', '')
            message = auth_config.get('message', '')
            updated['X-Signature'] = hmac.new(
                secret.encode(),
                message.encode(),
                hashlib.sha256
            ).hexdigest()
        return updated

    def check_horizontal_privilege_escalation(self, test_cases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Run the horizontal privilege escalation test cases.

        Args:
            test_cases: Test cases, each with ``normal_user_request`` and
                ``attacker_request`` entries.

        Returns:
            One result dictionary per test case.
        """
        results = []
        for test_case in test_cases:
            print(f"Testing horizontal privilege escalation: {test_case.get('description', 'Unknown')}")
            # Baseline request first, then the cross-user request.
            normal_response = self.send_request(test_case['normal_user_request'])
            attacker_response = self.send_request(test_case['attacker_request'])
            vulnerability_found = self._analyze_horizontal_escalation(
                normal_response,
                attacker_response,
                test_case
            )
            results.append({
                'test_type': 'horizontal_privilege_escalation',
                'description': test_case.get('description', ''),
                'vulnerability_found': vulnerability_found,
                'normal_user_response': self._sanitize_response(normal_response),
                'attacker_response': self._sanitize_response(attacker_response),
                'test_case': self._sanitize_test_case(test_case)
            })
        return results

    def check_vertical_privilege_escalation(self, test_cases: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Run the vertical privilege escalation test cases.

        Args:
            test_cases: Test cases, each with a ``low_privilege_request``
                entry.

        Returns:
            One result dictionary per test case.
        """
        results = []
        for test_case in test_cases:
            print(f"Testing vertical privilege escalation: {test_case.get('description', 'Unknown')}")
            low_privilege_response = self.send_request(test_case['low_privilege_request'])
            vulnerability_found = self._analyze_vertical_escalation(
                low_privilege_response,
                test_case
            )
            results.append({
                'test_type': 'vertical_privilege_escalation',
                'description': test_case.get('description', ''),
                'vulnerability_found': vulnerability_found,
                'low_privilege_response': self._sanitize_response(low_privilege_response),
                'test_case': self._sanitize_test_case(test_case)
            })
        return results

    def _analyze_horizontal_escalation(self, normal_response: Dict, attacker_response: Dict,
                                       test_case: Dict) -> bool:
        """Decide whether the attacker request bypassed access control.

        Args:
            normal_response: Response to the legitimate request.
            attacker_response: Response to the cross-user request.
            test_case: Test case; ``expected_attacker_status`` (default 403)
                is the status that means "correctly denied" and
                ``similarity_threshold`` (default 0.7) is the similarity
                above which two 200 responses count as equivalent.

        Returns:
            True when both requests returned 200 with sufficiently similar,
            non-empty bodies; False otherwise.
        """
        # A failed or server-error attacker request is not evidence of a bypass.
        if attacker_response.get('error') or attacker_response.get('status_code', 0) >= 500:
            return False

        expected_status = test_case.get('expected_attacker_status', 403)
        if attacker_response.get('status_code') == expected_status:
            return False

        if not (attacker_response.get('status_code') == normal_response.get('status_code') == 200):
            return False

        normal_content = str(normal_response.get('content', ''))
        attacker_content = str(attacker_response.get('content', ''))
        if not normal_content or not attacker_content:
            return False

        threshold = test_case.get('similarity_threshold', 0.7)
        return self._calculate_similarity(normal_content, attacker_content) > threshold

    def _analyze_vertical_escalation(self, low_privilege_response: Dict, test_case: Dict) -> bool:
        """Decide whether a low-privilege request reached a privileged endpoint.

        Args:
            low_privilege_response: Response to the low-privilege request.
            test_case: Test case; ``expected_status`` (default 403) is
                treated as a correct denial, as are 401 and 403.

        Returns:
            True when the response status is 2xx and is not one of the
            denial statuses; False otherwise.
        """
        if low_privilege_response.get('error'):
            return False

        expected_status = test_case.get('expected_status', 403)
        actual_status = low_privilege_response.get('status_code')
        # A missing or malformed status cannot be classified as success.
        if not isinstance(actual_status, int):
            return False
        if actual_status in (401, 403, expected_status):
            return False
        return 200 <= actual_status < 300

    def _calculate_similarity(self, str1: str, str2: str) -> float:
        """Return the Jaccard similarity of the whitespace-separated tokens.

        Args:
            str1: First string.
            str2: Second string.

        Returns:
            A score between 0 and 1; 0.0 when either string has no tokens.
        """
        if not str1 or not str2:
            return 0.0
        set1 = set(str1.split())
        set2 = set(str2.split())
        if not set1 or not set2:
            return 0.0
        intersection = set1.intersection(set2)
        union = set1.union(set2)
        return len(intersection) / len(union) if union else 0.0

    def _sanitize_response(self, response: Dict) -> Dict:
        """Return a report-safe copy of a response dictionary.

        Sensitive headers are dropped and ``content`` is truncated to 1000
        characters. The input dictionary is not modified.

        Args:
            response: Dictionary produced by ``send_request``.

        Returns:
            Sanitized copy; always contains a ``headers`` entry.
        """
        sanitized = response.copy()
        sanitized['headers'] = {
            k: v for k, v in sanitized.get('headers', {}).items()
            if k.lower() not in self._SENSITIVE_HEADERS
        }
        if 'content' in sanitized and len(sanitized['content']) > 1000:
            sanitized['content'] = sanitized['content'][:1000] + "... [TRUNCATED]"
        return sanitized

    def _sanitize_test_case(self, test_case: Dict) -> Dict:
        """Return a report-safe copy of a test case.

        Credential values in each request's ``auth`` block and in
        credential-bearing request headers are replaced by ``'***'``.
        Only keys that are actually present are masked, and neither the
        test case nor its nested request dictionaries are modified.

        Args:
            test_case: Original test case.

        Returns:
            Sanitized copy of the test case.
        """
        sanitized = dict(test_case)
        for key in self._REQUEST_KEYS:
            request = sanitized.get(key)
            if not isinstance(request, dict):
                continue
            # Copy the request so masking never reaches the caller's config.
            request = dict(request)
            auth = request.get('auth')
            if isinstance(auth, dict):
                request['auth'] = {
                    name: '***' if name in self._SECRET_AUTH_KEYS else value
                    for name, value in auth.items()
                }
            headers = request.get('headers')
            if isinstance(headers, dict):
                request['headers'] = {
                    name: '***' if str(name).lower() in self._SENSITIVE_HEADERS else value
                    for name, value in headers.items()
                }
            sanitized[key] = request
        return sanitized

    def scan(self, scan_config: Dict[str, Any]) -> Dict[str, Any]:
        """Execute every configured test and summarize the outcome.

        Args:
            scan_config: May contain ``horizontal_privilege_tests`` and
                ``vertical_privilege_tests`` lists.

        Returns:
            Report with ``scan_id``, ``start_time``, ``end_time``, the two
            result lists, ``total_tests`` and ``vulnerabilities_found``.
        """
        print("Starting API privilege escalation detection scan...")
        results = {
            # Short identifier derived from the start timestamp; it only
            # labels the report and is not used for any security purpose.
            'scan_id': hashlib.md5(str(time.time()).encode()).hexdigest()[:8],
            'start_time': time.strftime('%Y-%m-%d %H:%M:%S'),
            'horizontal_privilege_results': [],
            'vertical_privilege_results': []
        }

        if 'horizontal_privilege_tests' in scan_config:
            results['horizontal_privilege_results'] = self.check_horizontal_privilege_escalation(
                scan_config['horizontal_privilege_tests']
            )

        if 'vertical_privilege_tests' in scan_config:
            results['vertical_privilege_results'] = self.check_vertical_privilege_escalation(
                scan_config['vertical_privilege_tests']
            )

        all_results = results['horizontal_privilege_results'] + results['vertical_privilege_results']
        results['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S')
        results['total_tests'] = len(all_results)
        results['vulnerabilities_found'] = sum(
            1 for r in all_results if r['vulnerability_found']
        )
        print(f"Scan complete! A total of {results['total_tests']} tests executed, found {results['vulnerabilities_found']} vulnerabilities.")
        return results
def load_config_from_file(file_path: str) -> Dict[str, Any]:
    """Load the scan configuration from a UTF-8 encoded JSON file.

    The process exits with status 1 when the file cannot be read, is not
    valid JSON, or its top-level value is not a JSON object (callers use
    the result as a dictionary).

    Args:
        file_path: Configuration file path.

    Returns:
        Configuration dictionary.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            config = json.load(f)
    except (OSError, ValueError) as e:
        # OSError: missing/unreadable file. ValueError: invalid JSON or
        # invalid UTF-8 (both decode errors are ValueError subclasses).
        print(f"Failed to load configuration file: {e}")
        sys.exit(1)
    if not isinstance(config, dict):
        print("Failed to load configuration file: top-level JSON value must be an object")
        sys.exit(1)
    return config
def save_results_to_file(results: Dict[str, Any], file_path: str):
    """Write the scan report to ``file_path`` as indented UTF-8 JSON.

    Any failure is reported on standard output and otherwise ignored;
    nothing is raised to the caller.

    Args:
        results: Scan results.
        file_path: Output file path.
    """
    try:
        with open(file_path, 'w', encoding='utf-8') as report_file:
            # ensure_ascii=False keeps non-ASCII text readable in the report.
            json.dump(results, report_file, indent=2, ensure_ascii=False)
        print(f"Results saved to: {file_path}")
    except Exception as err:
        print(f"Failed to save results: {err}")
def main():
    """Command-line entry point: parse arguments, run the scan, emit the report."""
    cli = argparse.ArgumentParser(description='API Privilege Escalation Detection Tool')
    cli.add_argument('-c', '--config', required=True, help='Scan configuration file path')
    cli.add_argument('-o', '--output', help='Output results file path')
    options = cli.parse_args()

    # The configuration file carries both scanner settings and the test plan.
    settings = load_config_from_file(options.config)
    scanner = APIPrivilegeEscalationScanner(settings.get('scanner_config', {}))
    report = scanner.scan(settings.get('scan_config', {}))

    # Without an output path the report is printed to standard output.
    if not options.output:
        print(json.dumps(report, indent=2, ensure_ascii=False))
        return
    save_results_to_file(report, options.output)


if __name__ == "__main__":
    main()
Configuration File Example (config.json)
{
"scanner_config": {
"timeout": 30,
"verify_ssl": false,
"headers": {
"User-Agent": "APIPrivilegeScanner/1.0",
"Content-Type": "application/json"
}
},
"scan_config": {
"horizontal_privilege_tests": [
{
"description": "Test user information query horizontal privilege escalation",
"normal_user_request": {
"method": "GET",
"url": "https://api.example.com/v1/users/1001/profile",
"headers": {
"Content-Type": "application/json"
},
"auth": {
"type": "bearer",
"token": "user1001_token_here"
}
},
"attacker_request": {
"method": "GET",
"url": "https://api.example.com/v1/users/1002/profile",
"headers": {
"Content-Type": "application/json"
},
"auth": {
"type": "bearer",
"token": "user1001_token_here"
}
},
"expected_attacker_status": 403
},
{
"description": "Test order access horizontal privilege escalation",
"normal_user_request": {
"method": "GET",
"url": "https://api.example.com/v1/orders/2001",
"auth": {
"type": "bearer",
"token": "user1001_token_here"
}
},
"attacker_request": {
"method": "GET",
"url": "https://api.example.com/v1/orders/2002",
"auth": {
"type": "bearer",
"token": "user1001_token_here"
}
},
"expected_attacker_status": 403
}
],
"vertical_privilege_tests": [
{
"description": "Test normal user accessing admin interface",
"low_privilege_request": {
"method": "GET",
"url": "https://api.example.com/v1/admin/users",
"auth": {
"type": "bearer",
"token": "normal_user_token_here"
}
},
"expected_status": 403
},
{
"description": "Test user creating admin account",
"low_privilege_request": {
"method": "POST",
"url": "https://api.example.com/v1/admin/users",
"headers": {
"Content-Type": "application/json"
},
"data": {
"username": "newadmin",
"password": "password123",
"role": "administrator"
},
"auth": {
"type": "bearer",
"token": "normal_user_token_here"
}
},
"expected_status": 403
}
]
}
}
Usage Example
1. Save the code as api_privilege_scanner.py
2. Create a configuration file config.json (using the example above)
3. Run the scan:
python api_privilege_scanner.py -c config.json -o results.json
Output Result Example
{
"scan_id": "a1b2c3d4",
"start_time": "2024-01-15 10:30:00",
"end_time": "2024-01-15 10:31:23",
"total_tests": 4,
"vulnerabilities_found": 2,
"horizontal_privilege_results": [
{
"test_type": "horizontal_privilege_escalation",
"description": "Test user information query horizontal privilege escalation",
"vulnerability_found": true,
"normal_user_response": {
"status_code": 200,
"headers": {
"content-type": "application/json",
"content-length": "156"
},
"content": "{\"user_id\": 1001, \"username\": \"user1\", \"email\": \"user1@example.com\"}",
"response_time_ms": 245.67,
"size_bytes": 156,
"json": {
"user_id": 1001,
"username": "user1",
"email": "user1@example.com"
}
},
"attacker_response": {
"status_code": 200,
"headers": {
"content-type": "application/json",
"content-length": "156"
},
"content": "{\"user_id\": 1002, \"username\": \"user2\", \"email\": \"user2@example.com\"}",
"response_time_ms": 234.12,
"size_bytes": 156,
"json": {
"user_id": 1002,
"username": "user2",
"email": "user2@example.com"
}
},
"test_case": {
"description": "Test user information query horizontal privilege escalation",
"normal_user_request": {
"method": "GET",
"url": "https://api.example.com/v1/users/1001/profile",
"headers": {
"Content-Type": "application/json"
},
"auth": {
"type": "bearer",
"token": "***"
}
},
"attacker_request": {
"method": "GET",
"url": "https://api.example.com/v1/users/1002/profile",
"headers": {
"Content-Type": "application/json"
},
"auth": {
"type": "bearer",
"token": "***"
}
},
"expected_attacker_status": 403
}
}
],
"vertical_privilege_results": [
{
"test_type": "vertical_privilege_escalation",
"description": "Test normal user accessing admin interface",
"vulnerability_found": false,
"low_privilege_response": {
"status_code": 403,
"headers": {
"content-type": "application/json",
"content-length": "45"
},
"content": "{\"error\": \"Forbidden\", \"code\": 403}",
"response_time_ms": 178.45,
"size_bytes": 45,
"json": {
"error": "Forbidden",
"code": 403
}
},
"test_case": {
"description": "Test normal user accessing admin interface",
"low_privilege_request": {
"method": "GET",
"url": "https://api.example.com/v1/admin/users",
"auth": {
"type": "bearer",
"token": "***"
}
},
"expected_status": 403
}
}
]
}
Tool Features
1. Supports multiple authentication methods: Bearer Token, Basic Auth, API Key, HMAC
2. Detects two types of privilege escalation: horizontal and vertical
3. Detailed test reports: including request response details and vulnerability evidence
4. Sensitive information protection: automatically sanitizes tokens, passwords, and other sensitive information
5. Flexible configuration: supports JSON configuration files
6. Error handling: comprehensive exception handling mechanism
This tool can help security testers systematically detect API access control vulnerabilities, improving application security.