Wireshark网络协议分析实战
概述
Wireshark是世界上最流行的网络协议分析器,它允许用户捕获和交互式浏览网络流量。作为网络故障排除、分析和安全审计的重要工具,Wireshark在网络安全领域有着广泛的应用。
基础概念
数据包捕获
- 混杂模式:捕获所有经过网卡的数据包
- 监控模式:捕获无线网络中的所有数据包
- 过滤器:选择性捕获特定类型的数据包
协议层次
- 物理层:电气信号传输
- 数据链路层:以太网、WiFi帧
- 网络层:IP协议
- 传输层:TCP、UDP协议
- 应用层:HTTP、FTP、DNS等
界面介绍
主要组件
1. 菜单栏:文件操作、捕获设置、分析工具
2. 工具栏:常用功能快捷按钮
3. 过滤器栏:显示过滤器输入
4. 数据包列表:捕获的数据包概览
5. 数据包详情:选中数据包的协议层次
6. 十六进制视图:原始数据显示
7. 状态栏:捕获统计信息
颜色规则
- 绿色:HTTP流量
- 蓝色:DNS流量
- 黑色:TCP错误
- 红色:TCP问题
- 黄色:路由协议
捕获配置
接口选择
1. 启动Wireshark
2. 选择捕获接口
3. 配置捕获选项
4. 设置捕获过滤器
5. 开始捕获
捕获过滤器
# 捕获特定主机流量
host 192.168.1.100
# 捕获特定端口流量
port 80
# 捕获TCP流量
tcp
# 捕获HTTP流量
tcp port 80 or tcp port 443
# 捕获特定网段流量
net 192.168.1.0/24
# 组合条件
host 192.168.1.100 and port 80
显示过滤器
基本语法
# 协议过滤
http
tcp
udp
dns
# IP地址过滤
ip.addr == 192.168.1.100
ip.src == 192.168.1.100
ip.dst == 192.168.1.100
# 端口过滤
tcp.port == 80
tcp.srcport == 80
tcp.dstport == 80
# 逻辑操作符
and, or, not
==, !=, <, >, <=, >=
contains, matches
高级过滤
# HTTP方法过滤
http.request.method == "GET"
http.request.method == "POST"
# HTTP状态码过滤
http.response.code == 200
http.response.code >= 400
# 内容过滤
http contains "password"
tcp contains "admin"
# 时间过滤
frame.time >= "2024-01-01 00:00:00"
# 数据包大小过滤
frame.len > 1000
# TCP标志过滤
tcp.flags.syn == 1
tcp.flags.ack == 1
tcp.flags.rst == 1
协议分析
HTTP分析
# 查看HTTP请求
http.request
# 查看HTTP响应
http.response
# 查看特定URL
http.request.uri contains "/login"
# 查看用户代理
http.user_agent contains "Mozilla"
# 查看Cookie
http.cookie contains "session"
TCP分析
# TCP连接建立
tcp.flags.syn == 1 and tcp.flags.ack == 0
# TCP连接确认
tcp.flags.syn == 1 and tcp.flags.ack == 1
# TCP重传
tcp.analysis.retransmission
# TCP重复ACK
tcp.analysis.duplicate_ack
# TCP窗口满
tcp.analysis.zero_window
DNS分析
# DNS查询
dns.flags.response == 0
# DNS响应
dns.flags.response == 1
# DNS查询类型
dns.qry.type == 1 # A记录
dns.qry.type == 28 # AAAA记录
# DNS错误
dns.flags.rcode != 0
安全分析实战
恶意流量检测
1. 异常连接检测
#!/usr/bin/env python3
import pyshark
from collections import defaultdict
def analyze_connections(pcap_file):
"""分析异常连接"""
cap = pyshark.FileCapture(pcap_file)
connections = defaultdict(int)
for packet in cap:
if hasattr(packet, 'tcp'):
src_ip = packet.ip.src
dst_ip = packet.ip.dst
dst_port = packet.tcp.dstport
connection = f"{src_ip} -> {dst_ip}:{dst_port}"
connections[connection] += 1
# 检测异常连接(连接次数过多)
suspicious = [(conn, count) for conn, count in connections.items() if count > 100]
print("可疑连接:")
for conn, count in suspicious:
print(f"{conn}: {count} 次连接")
# 使用示例
analyze_connections('network_traffic.pcap')
2. DDoS攻击检测
#!/usr/bin/env python3
import pyshark
from datetime import datetime, timedelta
def detect_ddos(pcap_file, threshold=1000):
"""检测DDoS攻击"""
cap = pyshark.FileCapture(pcap_file)
packet_count = defaultdict(int)
time_window = timedelta(seconds=60) # 1分钟时间窗口
for packet in cap:
if hasattr(packet, 'ip'):
timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))
dst_ip = packet.ip.dst
# 统计每分钟到达目标IP的数据包数量
minute_key = timestamp.replace(second=0, microsecond=0)
packet_count[(dst_ip, minute_key)] += 1
# 检测超过阈值的情况
attacks = [(ip, time, count) for (ip, time), count in packet_count.items() if count > threshold]
print("检测到的DDoS攻击:")
for ip, time, count in attacks:
print(f"目标IP: {ip}, 时间: {time}, 数据包数量: {count}")
# 使用示例
detect_ddos('ddos_traffic.pcap', threshold=500)
密码嗅探
HTTP认证分析
# 查找HTTP基本认证
http.authbasic
# 查找包含密码的POST请求
http.request.method == "POST" and http contains "password"
# 查找登录相关的请求
http.request.uri contains "login" or http contains "username"
FTP密码捕获
#!/usr/bin/env python3
import pyshark
def extract_ftp_credentials(pcap_file):
"""提取FTP凭据"""
cap = pyshark.FileCapture(pcap_file, display_filter='ftp')
credentials = []
current_session = {}
for packet in cap:
if hasattr(packet, 'ftp'):
ftp_data = packet.ftp.request_command if hasattr(packet.ftp, 'request_command') else ''
if 'USER' in ftp_data:
username = ftp_data.split('USER ')[1].strip()
current_session['username'] = username
current_session['src_ip'] = packet.ip.src
elif 'PASS' in ftp_data:
password = ftp_data.split('PASS ')[1].strip()
current_session['password'] = password
if 'username' in current_session:
credentials.append(current_session.copy())
current_session = {}
print("发现的FTP凭据:")
for cred in credentials:
print(f"IP: {cred.get('src_ip')}, 用户名: {cred.get('username')}, 密码: {cred.get('password')}")
# 使用示例
extract_ftp_credentials('ftp_traffic.pcap')
网络扫描检测
端口扫描检测
#!/usr/bin/env python3
import pyshark
from collections import defaultdict
def detect_port_scan(pcap_file):
"""检测端口扫描"""
cap = pyshark.FileCapture(pcap_file)
scan_attempts = defaultdict(set)
for packet in cap:
if hasattr(packet, 'tcp'):
src_ip = packet.ip.src
dst_ip = packet.ip.dst
dst_port = packet.tcp.dstport
# 检测SYN扫描
if packet.tcp.flags_syn == '1' and packet.tcp.flags_ack == '0':
scan_attempts[f"{src_ip}->{dst_ip}"].add(dst_port)
# 检测扫描多个端口的行为
scanners = [(source_dest, ports) for source_dest, ports in scan_attempts.items() if len(ports) > 10]
print("检测到的端口扫描:")
for source_dest, ports in scanners:
print(f"{source_dest}: 扫描了 {len(ports)} 个端口")
print(f"端口列表: {sorted(list(ports))}")
# 使用示例
detect_port_scan('scan_traffic.pcap')
流量分析
统计分析
协议分布
# 在Wireshark中使用统计菜单
Statistics -> Protocol Hierarchy
Statistics -> Conversations
Statistics -> Endpoints
流量趋势
#!/usr/bin/env python3
import pyshark
import matplotlib.pyplot as plt
from datetime import datetime
from collections import defaultdict
def analyze_traffic_trend(pcap_file):
"""分析流量趋势"""
cap = pyshark.FileCapture(pcap_file)
traffic_by_time = defaultdict(int)
for packet in cap:
timestamp = datetime.fromtimestamp(float(packet.sniff_timestamp))
minute_key = timestamp.replace(second=0, microsecond=0)
traffic_by_time[minute_key] += int(packet.length)
# 绘制流量趋势图
times = sorted(traffic_by_time.keys())
volumes = [traffic_by_time[t] for t in times]
plt.figure(figsize=(12, 6))
plt.plot(times, volumes)
plt.title('网络流量趋势')
plt.xlabel('时间')
plt.ylabel('流量 (字节)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
# 使用示例
analyze_traffic_trend('network_traffic.pcap')
异常检测
流量异常
#!/usr/bin/env python3
import pyshark
import numpy as np
from scipy import stats
def detect_traffic_anomalies(pcap_file):
"""检测流量异常"""
cap = pyshark.FileCapture(pcap_file)
packet_sizes = []
for packet in cap:
packet_sizes.append(int(packet.length))
# 使用Z-score检测异常
z_scores = np.abs(stats.zscore(packet_sizes))
threshold = 3
anomalies = [(i, size) for i, (size, z) in enumerate(zip(packet_sizes, z_scores)) if z > threshold]
print(f"检测到 {len(anomalies)} 个异常数据包:")
for idx, size in anomalies[:10]: # 显示前10个
print(f"数据包 {idx}: 大小 {size} 字节")
# 使用示例
detect_traffic_anomalies('network_traffic.pcap')
高级功能
流重组
TCP流跟踪
# 右键点击TCP数据包
# 选择 "Follow" -> "TCP Stream"
# 查看完整的TCP会话内容
HTTP对象导出
# File -> Export Objects -> HTTP
# 导出HTTP传输的文件
解密功能
SSL/TLS解密
1. 获取私钥文件
2. Edit -> Preferences -> Protocols -> TLS
3. 添加RSA密钥列表
4. 配置密钥文件路径
WPA/WPA2解密
1. 捕获完整的4次握手
2. Edit -> Preferences -> Protocols -> IEEE 802.11
3. 添加预共享密钥
4. 输入SSID和密码
自定义协议
Lua脚本
-- 自定义协议解析器
local custom_proto = Proto("custom", "Custom Protocol")
local f_command = ProtoField.uint8("custom.command", "Command", base.HEX)
local f_data = ProtoField.string("custom.data", "Data")
custom_proto.fields = {f_command, f_data}
function custom_proto.dissector(buffer, pinfo, tree)
local length = buffer:len()
if length == 0 then return end
pinfo.cols.protocol = custom_proto.name
local subtree = tree:add(custom_proto, buffer(), "Custom Protocol Data")
subtree:add(f_command, buffer(0,1))
subtree:add(f_data, buffer(1))
end
-- 注册协议
local tcp_port = DissectorTable.get("tcp.port")
tcp_port:add(12345, custom_proto)
性能优化
捕获优化
1. 使用捕获过滤器减少数据量
2. 设置合适的缓冲区大小
3. 选择正确的网络接口
4. 避免在高负载时长时间捕获
分析优化
1. 使用显示过滤器聚焦分析
2. 关闭不需要的协议解析器
3. 限制显示的数据包数量
4. 使用统计功能而非逐包分析
最佳实践
安全考虑
- 权限管理:以最小权限运行
- 数据保护:加密存储捕获文件
- 隐私保护:遵守数据保护法规
- 访问控制:限制捕获文件访问
分析流程
- 明确目标:确定分析目的
- 合理过滤:使用过滤器聚焦问题
- 系统分析:从协议栈角度分析
- 记录发现:详细记录分析结果
报告编写
# 网络流量分析报告
## 分析概述
- 分析时间:2024-01-15 10:00-12:00
- 数据源:内网流量捕获
- 分析工具:Wireshark 4.0
## 发现问题
1. 异常DNS查询
2. 可疑外连行为
3. 未加密敏感数据传输
## 详细分析
### 异常DNS查询
- 时间:10:30-10:45
- 源IP:192.168.1.100
- 查询域名:suspicious-domain.com
- 分析:可能的恶意软件通信
## 建议措施
1. 阻断可疑域名
2. 检查相关主机
3. 加强监控
总结
Wireshark是网络安全分析的重要工具,掌握其使用方法对于网络故障排除、安全事件调查和网络性能优化都有重要意义。通过系统学习和实践,可以有效提高网络分析能力,为网络安全防护提供有力支持。
> 评论区域 (2 条)_
发表评论