scapy 包总结
本文主要依据官网文档(英文) 和源码相关函数用法编写。如有错误,欢迎反馈。
scapy 包介绍
scapy 包模块主要用来构造或者伪造网络中的各种数据报文。提供了从 Ether 层、IP 层、传输层(UDP/TCP)、数据层各层的数据报文字段的构造方法。也可以用来解析数据报文。并能实现伪造异常报文,网络攻击,探测等功能。
scapy 模块还支持构造 IPv6 报文,802.11 无线报文以及蓝牙报文(只支持 Linux 系统)。
安装
python3.6,scapy 最新版本 2.4.0,本文根据此版本编写。
pip3 install scapy
用法
本文主要具体通过查看 scapy 包模块下的相关源码、查看官方文档以及开发工具程序实践,等方式总结 scapy 包的一些常用的用法。
涉及到更高深或更复杂的用法,请点击这里查看 官方文档 。
构造常见协议报文
site-packages/scapy/layers 下,包含构造各种协议报文的方法。有构造其他协议报文需求的读者建议可以去看这下面的源码。
对于报文类,如果想知道报文类提供了字段选项,可以用 ls() 方法查看,如下所示
>>> ls(IP)
构造 ICMP 报文
str='abdndjafn'
pkt=IP(src,dst=192.168.1.1)/ICMP(type=8)/Raw(str)
构造 DHCP 报文
>>> pkt=Ether(src=self.smac,dst="ff:ff:ff:ff:ff:ff")/IP(src=self.sip,dst="255.255.255.255")/UDP(sport=68,dport=67)/BOOTP(chaddr=[mac2str(self.smac)],ciaddr=self.ciaddr,xid=translate_id)/DHCP(options=[("message-type","request"),("hostname",self.hostname),"end",('pad',self.pad)])
构造 IGMPv2 报文
from scapy.contrib.igmp import *
pkt=Ether(src,dst)/ip(src,dst)/IGMP(type=0x11,mrode=20,gaddr="")/Padding('000000000')
构造 IGMPv3 报文
from scapy.contrib.igmp import *
from scapy.contrib.igmp import *
a=Ether()
b=IP(src,dst)
c=IGMPv3(type=0x11,mrcode=10)
d=IGMPv3mq(gaddr='',qqic=125,qrv=2,srcaddrs=['192.168.10.1','192.168.11.1'])
pkt=a/b/c/d
构造数据传输 UDP 报文
pkt=Ether(src,dst)/ip(src,dst)/UDP(sport,dport)/Raw()
构造数据传输 TCP 报文
>>> pkt=Ether(src,dst)/ip(src,dst)/TCP(sport,dport)/Raw()
构造 PPPoE 报文
>>> pkt=Ether(src,dst)/PPP()
报文长度填充
报文长度填充实例如下所示
>>> Padding(str='11111111')
>>> Raw(load='advd')
具体参考源码 site-packages/scapy/packet.py
常用工具函数
列出所有 scapy 中的命令或函数
>>> lsc()
打印查看某个函数或者类的帮助信息
>>> help(sendp)
查看报文字段信息
>>> ls(pkt)
显示一个报文摘要
>>> pkt.summary()
展示报文内容
>>> pkt.show()
>>> pkt.display()
显示聚合的数据包(例如,计算好了校验和的)
>>> pkt.show2()
返回可以生产数据包的 Scapy 命令
>>> pkt.command()
traceroute 方法
>>> Traceroute('114.114.114.114')
发送报文
发送报文方法的源码路径
site-packages/scapy/sendrecv.py
send() 方法
三层以上,不能指定网络接口。
当 loop=1 时,一直发包。以下例子为发 10 个报文,报文间隔为 1s。
send(pkt,loop=0,inter=1,count=10)
sendp() 方法
工作在 2 层,发包时必须指定网络接口。其他参数与 send() 方法一致。
sendp(pkt,iface,loop=0,inter=1,count=5)
sendpfast() 方法
工作在二层,可以指定网络接口和速率发包。Windows 平台需要其他依赖库。
sendpfast(pkt,iface,pps,mbps,loop=0)
sr() 方法
发送数据包和接收响应,工作在 3 层(IP 和 ARP)。该函数返回有回应的数据包和没有回应的数据包。返回的两个列表数据,第一个就是发送的数据包及其应答组成的列表,第二个是无应答数据包组成的列表。
>>> sr(pkt)
>>> ans,unans=_
>>> ans.summary()
sr1() 方法
与 sr 类似,工作在 3 层(IP 和 ARP)。但它只返回应答发送的分组(或分组集),用来返回一个应答数据包。
>>> pkt1=IP()/ICMP()/Padding(str)
>>> p=sr1(pkt)
>>> p.show()
>>> pkt=IP(dst=‘192.168.1.1’)/UDP()/DNS(rd=1,qd=DNSQR(qname=‘ www.baidu.com ’))
>>> ans=sr1(pkt_dns)
>>> ans.show()
srp() 方法
工作在二层(Ether,802.3),返回 2 个列表数据,第一个为返回结果,第二个无应答数据包组成
>>> pkt=Ether(dst=“ff:ff:ff:ff:ff:ff”)/ARP(pdst=“192.168.1.0/24”,timeout=2)
>>> ans,unans=srp(pkt)
>>> ans.show()
数据包处理
嗅探报文
sniff(count=0,offline=None,store=True,prn=None,filter,timeout,iface)
- count=200 抓到 200 个报文,即停止嗅探报文
- offline=‘D:\2019H1work\igmp.pcap’ 解析本地 cap 文件
- prn 为报文处理函数,回调此函数对抓到的报文进行处理
- filter=‘arp or icmp or (udp and src port 68 and dst port 67)’ 过滤条件,伯克利语法
- timeout 抓包时间,默认为 None
- store=0 避免将所有的数据包存储在内存。
保存数据报文
pkts=sniff(filter=‘arp or icmp’,iface=‘eth1’,timeout=120)
wrpcap(‘test.cap’,pkts) #写到当前目录 test.cap 中
读取本地数据报文
有 2 种方法可以实现读取本地数据包,如下
pkts= rdpcap(‘D:\\2019H1work\\igmp.pcap’ )
pkts= sniff(offline=‘D:\\2019H1work\\igmp.pcap’ )
数据包内容提取
其实在 sniff() 嗅探报文或者 rdpcap() 读取报文的时候,报文已经自动解析完成了。
我们可以直接获取报文不同层的字段值
>>> pkts=sniff(filter=‘igmp’,iface=‘eth1’)
>>> type(pkts) #报文类型
>>> pkts.show() #报文列表
>>> pkts[0][IP].show #第一个报文的 IP 层
>>> pkts[0][Ether].src #第一个报文的源 MAC 地址
>>> pkts[IGMP].show() #只显示 IGMP 报文
>>> pkts[IGMP][0][IGMP].show() #显示 IGMP 报文第一个报文的 IGMP 层字段信息
代码实例
实例一
#encoding=utf-8
import psutil
from scapy.config import *
from scapy.layers.all import *
from scapy.packet import *
from scapy.contrib.igmp import *
from scapy.sendrecv import *
import wmi
class IgmpSer(object):
#初始化参数
def __init__(self):
self.sip='192.168.11.1'
self.pro_dip='224.0.0.1'
self.data_dip='239.192.45.86'
self.data_dmac=''
self.inter=1
self.iface_name='test'
self.iface="Intel(R) 82579LM Gigabit Network Connection"
#选择网卡,并获取 iface name,并返回网卡名列表
def select_iface(self):
name_list=[]
ifaces=psutil.net_if_addrs()
for k,v in ifaces.items():
for item in v:
if item[0] == 2 and not item[1] == '127.0.0.1':
iface.append(k)
elif item[0] == -1 and not item[1] == '00-00-00-00-00-00-00-E0':
# iface.append((k,item[1]))
pass
return name_list
# 获取选定网卡的 MAC 地址,功能无法生效
def get_mac_addr(self,iface):
mac={}
ifaces=psutil.net_if_addrs()
for k,v in ifaces.items():
for item in v:
if k == iface and item[0] == -1 and not item[1] == '00-00-00-00-00-00-00-E0':
mac=item[1]
mac=mac.replace("-",":")
# print(mac)
# print(k,item[0],item[1])
return mac
#返回网卡描述信息的列表,用以 sendp 传参
def get_iface_desc(self):
iface_list=[]
w=wmi.WMI()
iface=w.Win32_NetworkAdapterConfiguration (IPEnabled=1)
for interface in iface:
iface_desc=interface.Description
iface_list.append(iface_desc)
return iface_list
#根据组播 IP 生成组播 Mac 地址
def get_multicast_mac(self,ip):
mac_s='01:00:5e:'
ip_list=ip.split('.')[1:]
num_list=list(map(int,ip_list))
i=0
if num_list[0] >= 128:
num_list[0]=num_list[0]-128
for v in num_list:
str=hex(v)
if len(str)==3:
str=str.replace("0x","0")
num_list[i]=str
elif len(str)==4:
str=str.replace("0x","")
num_list[i]=str
i=i+1
mac_end=':'.join(num_list)
mac_addr=mac_s+mac_end
return mac_addr
#构造、发送通用组查询报文
def general_query(self):
srcmac=self.get_mac_addr(self.iface_name)
dstmac=self.get_multicast_mac('224.0.0.1')
str='00000000000000'
a=Ether(src=srcmac,dst=dstmac)
b=IP(src=self.sip,dst='224.0.0.1')
c=IGMP(type=0x11,gaddr='224.2.2.4')
pkt=a/b/c
a=Ether(src=src_mac,dst='')
b=IP(src='',dst='1.1.1.1')
c=IGMP(type=0x11,gaddr='')
x=a/b/c
x[IGMP].igmpize()
#pkt=IP(dst='224.0.0.1',src='192.168.11.1')/IGMP(type=0x11,gaddr='')/Padding(str)
sendp(pkt,iface=self.iface,verbose=False)
# send(pkt)
#构造、发送特定组查询报文
def general_query_speical(self):
src_mac=self.get_mac_addr(self.iface)
print('src_mac',src_mac)
dst_mac=self.get_multicast_mac(self.data_dip)
print('dst_mac',dst_mac)
a1=Ether(src,dst)
b1=IP(src='',dst='224.0.0.1')
c1=IGMP(type=0x11,gaddr=self.data_dip)
pkt=a1/b1/c1/Padding(str)
a=Ether()
b=IP(src=self.sip,dst=self.data_dip)
c=IGMP(type=0x11,gaddr=self.data_dip)
pkt=a/b/c
# send(p)
sendp(pkt,iface=self.iface,inter=1,count=4)
#构造组播数据报文
def udp_stream(self,**kwrgs):
dst_mac=self.get_multicast_mac('0.0.0.0')
src_mac=self.get_mac_addr(self.iface)
print(src_mac)
str='0000000000000'
a=Ether()
b=IP(src=self.sip,dst='0.0.0.0',flags=2)
c=UDP(sport=7411,dport=7411)
pkt=a/b/c/Raw(load=("AAAAAAAaaaaaaa") + str)
sendp(pkt,iface=self.iface,loop=1,inter=2)
# sendpfast(pkt,iface=self.iface,loop=1,mbps=8)
#构造组播 IPv4 报文
def ipv4_stream(self):
dst_mac=self.get_multicast_mac(self.data_dip)
src_mac=self.get_mac_addr(self.iface)
str='00000000000000000000000000'
a=Ether(src=src_mac,dst=dst_mac)
b=IP(src=self.sip,dst=self.data_dip)
pkt=a/b/Padding(str)
sendp(pkt,iface=self.iface,loop=1,inter=1)
#嗅探过滤报文
def sniff_pkt(self):
sniff(filter='',store=0,timeout=20,prn=Packet.summary,iface=self.iface)
#对嗅探的报文,调用此函数,进行下一步处理
def parser_pkt(self,pkt):
pass
if __name__ == '__main__':
igmpser=IgmpSer()
# igmpser.get_multicast_mac(ip='224.1.1.1')
# print(igmpser.get_mac_addr(iface='test'))
# print(igmpser.select_iface())
igmpser.general_query_speical()
实例二
工具应用
PyQt 和 scapy 库开发发包工具,请访问我的 Github 。
技术难点
1、怎么解决在 windows 系统下选择指定网卡接口进行发包、嗅探的问题,cmd 中输入命令 wmic ->NIC ,查看网卡接口的 Index。 sendp(pkt,iface=’eth7’)
2、怎么解决 UDP 报文长度过大,能自动分片的问题
3、sendpfast() 方法发送大流量,报 type 类型错误的问题
4、怎么解决拉起线程,持续监听报文,解析报文的问题
如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。
上一篇: xlrd 模块用法的一些总结
下一篇: 彻底找到 Tomcat 启动速度慢的元凶
绑定邮箱获取回复消息
由于您还没有绑定你的真实邮箱,如果其他用户或者作者回复了您的评论,将不能在第一时间通知您!
发布评论