信息内容安全实验-被动捕包
实验要求
能够捕抓到流量包,最终能解析到http请求包的头部以及响应包的返回内容即可,不需要gzip解压(如果有gzip压缩的话
实验分析
采用linux下Pypcap配合dpkt进行处理即可(后面还添加了scapy处理的部分
不过好像用scapy可以直接进行抓流量并且分析(更加方便
Pypcap
Pypcap在linux下基于libpcap。
首先我们要选好抓取的网卡接口
1 2
| iface = 'ens33' pkt = pcap.pcap(iface, promisc=True, immediate=True, timeout_ms=50)
|
写好这两句即可设定抓包的网卡接口
linux下和windows下这个接口是不一样的
1 2 3 4 5 6 7 8
| import pcap
interfaces = pcap.findalldevs()
for interface in interfaces: print(interface)
|
这个方法可以打印所有网卡接口
windows下网卡接口是\devices\xxx这样的,这个时候可以配合wireshark
想要获取devices\xxx对应的网卡接口昵称,可以在wireshark-编辑-首选项-捕获-默认接口那里可以点开查看
dpkt解析
解析方法如下,具体用法可以查看文档
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| def anlysisData_requests(data): eth = dpkt.ethernet.Ethernet(data) if isinstance(eth.data, dpkt.ip.IP): ip = eth.data if isinstance(ip.data, dpkt.tcp.TCP): """tcp""" tcp = ip.data if tcp.dport == 80 or tcp.sport == 80: try: http = dpkt.http.Request(tcp.data) log("http requests->") print(http) except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError): pass
|
scapy解析
解析方法如下,个人认为比dpkt好用(debug的时候可以随时打印)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| def anlysisData_response(data): ether_pkt = Ether(data) try: ip_pkt = ether_pkt[IP] tcp_pkt = ip_pkt[TCP] if tcp_pkt.haslayer(Raw): if ip_pkt.flags.DF == 1: log("该报文没有分片") else: log("该报文分片了") raw_pkt = tcp_pkt[Raw] load = raw_pkt.load log("http response->") print(load) except: pass
|
实验代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| import pcap import dpkt
import getopt import sys import datetime import time import os import gzip from dpkt.compat import BytesIO from scapy.all import *
def captureData(iface): pkt = pcap.pcap(iface, promisc=True, immediate=True, timeout_ms=50) filters = { 'DNS': 'udp port 53', 'HTTP': 'tcp port 80' } pkt.setfilter(filters['HTTP'])
pcap_filepath = 'pkts/pkts_{}.pcap'.format(time.strftime("%Y%m%d-%H%M%S", time.localtime())) pcap_file = open(pcap_filepath, 'wb') writer = dpkt.pcap.Writer(pcap_file) print('Start capture...') try: pkts_count = 0 for ptime, pdata in pkt: writer.writepkt(pdata, ptime) anlysisData_requests(pdata) anlysisData_response(pdata) pkts_count += 1 except KeyboardInterrupt as e: writer.close() pcap_file.close() if not pkts_count: os.remove(pcap_filepath) print('%d packets received'%(pkts_count))
def log(x): print("\x1B[36m{}\x1B[0m".format(x))
def anlysisData_requests(data): eth = dpkt.ethernet.Ethernet(data) if isinstance(eth.data, dpkt.ip.IP): ip = eth.data if isinstance(ip.data, dpkt.tcp.TCP): """tcp""" tcp = ip.data if tcp.dport == 80 or tcp.sport == 80: try: http = dpkt.http.Request(tcp.data) log("http requests->") print(http) except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError): pass
def anlysisData_response(data): ether_pkt = Ether(data) try: ip_pkt = ether_pkt[IP] tcp_pkt = ip_pkt[TCP] if tcp_pkt.haslayer(Raw): if ip_pkt.flags.DF == 1: log("该报文没有分片") else: log("该报文分片了") raw_pkt = tcp_pkt[Raw] load = raw_pkt.load log("http response->") print(load) except: pass
def main(): iface = 'ens33' captureData(iface)
if __name__ == "__main__": main()
|