
信息内容安全实验-被动捕包

信息内容安全实验-被动捕包

实验要求

能够捕获到流量包,最终能解析到http请求包的头部以及响应包的返回内容即可,不需要gzip解压(如果有gzip压缩的话)

实验分析

采用linux下Pypcap配合dpkt进行处理即可(后面还添加了scapy处理的部分)

不过好像用scapy可以直接进行抓流量并且分析(更加方便)

Pypcap

Pypcap在linux下基于libpcap。

首先我们要选好抓取的网卡接口

# Interface to capture on (Linux naming; pcap.findalldevs() lists the
# names valid on the current platform).
iface = 'ens33'
pkt = pcap.pcap(
    iface,
    promisc=True,     # promiscuous mode: also deliver frames not addressed to this host
    immediate=True,   # libpcap immediate mode
    timeout_ms=50,    # read timeout for the capture handle
)

写好这两句即可设定抓包的网卡接口

linux下和windows下这个接口是不一样的

import pcap

# 查找所有的网卡接口
# Enumerate every capture-capable interface known to libpcap.
interfaces = pcap.findalldevs()

# One interface name per line; nothing is printed when the list is empty.
if interfaces:
    print("\n".join(interfaces))

这个方法可以打印所有网卡接口

windows下网卡接口是\devices\xxx这样的,这个时候可以配合wireshark

想要获取devices\xxx对应的网卡接口昵称,可以在wireshark-编辑-首选项-捕获-默认接口那里可以点开查看

Untitled

dpkt解析

解析方法如下,具体用法可以查看文档

def anlysisData_requests(data):
    """Print the HTTP request contained in one captured frame, if any.

    Args:
        data: raw bytes of an Ethernet frame as yielded by the pcap reader.

    Only TCP segments whose source or destination port is 80 are looked at,
    so HTTPS or HTTP on any other port is never parsed. Anything dpkt cannot
    parse as a complete HTTP request (a continuation segment, a response, a
    truncated header) is ignored without output.
    """
    eth = dpkt.ethernet.Ethernet(data)
    ip = eth.data
    if not isinstance(ip, dpkt.ip.IP):
        return
    tcp = ip.data
    if not isinstance(tcp, dpkt.tcp.TCP):
        return
    if 80 not in (tcp.sport, tcp.dport):
        return
    try:
        http = dpkt.http.Request(tcp.data)
    except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
        # Not (yet) a parseable request: more data needed or a different message.
        return
    log("http requests->")
    print(http)

scapy解析

解析方法如下,个人认为比dpkt好用(debug的时候可以随时打印)

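def anlysisData_response(data):
    """Print the TCP payload of one captured frame, labelled as an HTTP response.

    Args:
        data: raw bytes of an Ethernet frame as yielded by the pcap reader.

    The frame is dissected with scapy; frames without IP/TCP layers or
    without a payload are skipped. Nothing here checks port or direction:
    every TCP segment with a payload that reaches this function is printed
    under the "http response->" label, so the caller's capture filter decides
    what is actually shown.
    """
    ether_pkt = Ether(data)
    # A missing layer raises IndexError on the [Layer] lookup. The original
    # bare `except: pass` also swallowed KeyboardInterrupt and real bugs, so
    # check for the layers explicitly instead of catching everything.
    if not (ether_pkt.haslayer(IP) and ether_pkt.haslayer(TCP)):
        return
    ip_pkt = ether_pkt[IP]
    tcp_pkt = ip_pkt[TCP]
    if not tcp_pkt.haslayer(Raw):
        return
    # NOTE(review): DF is the IP "don't fragment" bit; it says the datagram
    # must not be fragmented in transit, not that it was not fragmented.
    # Actual fragmentation is indicated by the MF bit / fragment offset.
    # The messages below keep the author's wording.
    if ip_pkt.flags.DF == 1:
        log("该报文没有分片")
    else:
        log("该报文分片了")
    load = tcp_pkt[Raw].load
    log("http response->")
    print(load)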

实验代码

import pcap
import dpkt

import getopt
import sys
import datetime
import time
import os
import gzip
from dpkt.compat import BytesIO
from scapy.all import *

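def captureData(iface):
    """Capture HTTP (tcp port 80) traffic on *iface* until Ctrl-C.

    Every captured frame is appended to a timestamped pcap file under
    ``pkts/`` and passed to anlysisData_requests() / anlysisData_response()
    for printing. An empty capture file is removed on exit.

    Args:
        iface: interface name understood by libpcap (e.g. 'ens33').

    Side effects: opens the interface in promiscuous mode, creates ``pkts/``
    if it does not exist, writes a pcap file there and prints to stdout.
    The file holds the raw cleartext of everything captured (HTTP headers,
    cookies, form bodies), so treat that directory as sensitive data.
    """
    pkt = pcap.pcap(iface, promisc=True, immediate=True, timeout_ms=50)
    # filter method
    filters = {
        'DNS': 'udp port 53',
        'HTTP': 'tcp port 80'
    }
    pkt.setfilter(filters['HTTP'])

    pcap_filepath = 'pkts/pkts_{}.pcap'.format(time.strftime("%Y%m%d-%H%M%S",
        time.localtime()))
    # open() raises FileNotFoundError when the output directory is missing.
    os.makedirs(os.path.dirname(pcap_filepath), exist_ok=True)
    pkts_count = 0
    print('Start capture...')
    # Closing only inside the KeyboardInterrupt handler (as before) leaked the
    # file and skipped the summary when the loop ended any other way; the
    # try/finally runs the cleanup however the loop ends.
    pcap_file = open(pcap_filepath, 'wb')
    try:
        writer = dpkt.pcap.Writer(pcap_file)
        try:
            for ptime, pdata in pkt:
                writer.writepkt(pdata, ptime)
                anlysisData_requests(pdata)
                anlysisData_response(pdata)
                pkts_count += 1
        except KeyboardInterrupt:
            pass  # Ctrl-C is the normal way to stop the capture
        finally:
            writer.close()
    finally:
        pcap_file.close()
        if not pkts_count:
            os.remove(pcap_filepath)
        print('%d packets received'%(pkts_count))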

def log(x):
    """Print *x* to stdout wrapped in ANSI cyan (SGR 36), reset afterwards."""
    colored = "".join(("\x1B[36m", format(x), "\x1B[0m"))
    print(colored)

def anlysisData_requests(data):
    """Print the HTTP request contained in one captured frame, if any.

    Args:
        data: raw bytes of an Ethernet frame as yielded by the pcap reader.

    Only TCP segments whose source or destination port is 80 are looked at,
    so HTTPS or HTTP on any other port is never parsed. Anything dpkt cannot
    parse as a complete HTTP request (a continuation segment, a response, a
    truncated header) is ignored without output.
    """
    eth = dpkt.ethernet.Ethernet(data)
    ip = eth.data
    if not isinstance(ip, dpkt.ip.IP):
        return
    tcp = ip.data
    if not isinstance(tcp, dpkt.tcp.TCP):
        return
    if 80 not in (tcp.sport, tcp.dport):
        return
    try:
        http = dpkt.http.Request(tcp.data)
    except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
        # Not (yet) a parseable request: more data needed or a different message.
        return
    log("http requests->")
    print(http)


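def anlysisData_response(data):
    """Print the TCP payload of one captured frame, labelled as an HTTP response.

    Args:
        data: raw bytes of an Ethernet frame as yielded by the pcap reader.

    The frame is dissected with scapy; frames without IP/TCP layers or
    without a payload are skipped. Nothing here checks port or direction:
    every TCP segment with a payload that reaches this function is printed
    under the "http response->" label, so the BPF filter set in captureData()
    decides what is actually shown.
    """
    ether_pkt = Ether(data)
    # A missing layer raises IndexError on the [Layer] lookup. The original
    # bare `except: pass` also swallowed KeyboardInterrupt (which captureData
    # relies on to stop) and real bugs, so check for the layers explicitly
    # instead of catching everything.
    if not (ether_pkt.haslayer(IP) and ether_pkt.haslayer(TCP)):
        return
    ip_pkt = ether_pkt[IP]
    tcp_pkt = ip_pkt[TCP]
    if not tcp_pkt.haslayer(Raw):
        return
    # NOTE(review): DF is the IP "don't fragment" bit; it says the datagram
    # must not be fragmented in transit, not that it was not fragmented.
    # Actual fragmentation is indicated by the MF bit / fragment offset.
    # The messages below keep the author's wording.
    if ip_pkt.flags.DF == 1:
        log("该报文没有分片")
    else:
        log("该报文分片了")
    load = tcp_pkt[Raw].load
    log("http response->")
    print(load)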

def main():
    """Entry point: start capturing on the default interface."""
    default_iface = 'ens33'
    captureData(default_iface)


if __name__ == "__main__":
    main()