scapy 包总结

发布于 2024-06-20 06:21:41 字数 11124 浏览 22 评论 0

本文主要依据官网文档(英文) 和源码相关函数用法编写。如有错误,欢迎反馈。

scapy 包介绍

scapy 包模块主要用来构造或者伪造网络中的各种数据报文。提供了从 Ether 层、IP 层、传输层(UDP/TCP)、数据层各层的数据报文字段的构造方法。也可以用来解析数据报文。并能实现伪造异常报文,网络攻击,探测等功能。

scapy 模块还支持构造 IPv6 报文,802.11 无线报文以及蓝牙报文(只支持 Linux 系统)。

安装

python3.6,scapy 最新版本 2.4.0,本文根据此版本编写。

pip3 install scapy

用法

本文主要具体通过查看 scapy 包模块下的相关源码、查看官方文档以及开发工具程序实践,等方式总结 scapy 包的一些常用的用法。

涉及到更高深或更复杂的用法,请点击这里查看 官方文档

构造常见协议报文

site-packages/scapy/layers 下,包含构造各种协议报文的方法。有构造其他协议报文需求的读者建议可以去看这下面的源码。

对于报文类,如果想知道报文类提供了字段选项,可以用 ls() 方法查看,如下所示

>>> ls(IP)

构造 ICMP 报文

str='abdndjafn'
pkt=IP(src,dst=192.168.1.1)/ICMP(type=8)/Raw(str)

构造 DHCP 报文

>>> pkt=Ether(src=self.smac,dst="ff:ff:ff:ff:ff:ff")/IP(src=self.sip,dst="255.255.255.255")/UDP(sport=68,dport=67)/BOOTP(chaddr=[mac2str(self.smac)],ciaddr=self.ciaddr,xid=translate_id)/DHCP(options=[("message-type","request"),("hostname",self.hostname),"end",('pad',self.pad)])

构造 IGMPv2 报文

from scapy.contrib.igmp import *
pkt=Ether(src,dst)/ip(src,dst)/IGMP(type=0x11,mrode=20,gaddr="")/Padding('000000000')

构造 IGMPv3 报文

from scapy.contrib.igmp import *
from scapy.contrib.igmp import *
a=Ether()
b=IP(src,dst)
c=IGMPv3(type=0x11,mrcode=10)
d=IGMPv3mq(gaddr='',qqic=125,qrv=2,srcaddrs=['192.168.10.1','192.168.11.1'])
pkt=a/b/c/d

构造数据传输 UDP 报文

pkt=Ether(src,dst)/ip(src,dst)/UDP(sport,dport)/Raw()

构造数据传输 TCP 报文

>>> pkt=Ether(src,dst)/ip(src,dst)/TCP(sport,dport)/Raw()

构造 PPPoE 报文

>>> pkt=Ether(src,dst)/PPP()

报文长度填充

报文长度填充实例如下所示

>>> Padding(str='11111111')
>>> Raw(load='advd')

具体参考源码 site-packages/scapy/packet.py

常用工具函数

列出所有 scapy 中的命令或函数

>>> lsc()

打印查看某个函数或者类的帮助信息

>>> help(sendp)

查看报文字段信息

>>> ls(pkt)

显示一个报文摘要

>>> pkt.summary()

展示报文内容

>>> pkt.show()
>>> pkt.display()

显示聚合的数据包(例如,计算好了校验和的)

>>> pkt.show2()

返回可以生产数据包的 Scapy 命令

>>> pkt.command()

traceroute 方法

>>> Traceroute('114.114.114.114')

发送报文

发送报文方法的源码路径

site-packages/scapy/sendrecv.py

send() 方法

三层以上,不能指定网络接口。

当 loop=1 时,一直发包。以下例子为发 10 个报文,报文间隔为 1s。

send(pkt,loop=0,inter=1,count=10)

sendp() 方法

工作在 2 层,发包时必须指定网络接口。其他参数与 send() 方法一致。

sendp(pkt,iface,loop=0,inter=1,count=5)

sendpfast() 方法

工作在二层,可以指定网络接口和速率发包。Windows 平台需要其他依赖库。

sendpfast(pkt,iface,pps,mbps,loop=0)

sr() 方法

发送数据包和接收响应,工作在 3 层(IP 和 ARP)。该函数返回有回应的数据包和没有回应的数据包。返回的两个列表数据,第一个就是发送的数据包及其应答组成的列表,第二个是无应答数据包组成的列表。

>>> sr(pkt) 
>>> ans,unans=_
>>> ans.summary()

sr1() 方法

与 sr 类似,工作在 3 层(IP 和 ARP)。但它只返回应答发送的分组(或分组集),用来返回一个应答数据包。

>>> pkt1=IP()/ICMP()/Padding(str)
>>> p=sr1(pkt)
>>> p.show()
>>> pkt=IP(dst=‘192.168.1.1’)/UDP()/DNS(rd=1,qd=DNSQR(qname=‘ www.baidu.com ’))
>>> ans=sr1(pkt_dns)
>>> ans.show()

srp() 方法

工作在二层(Ether,802.3),返回 2 个列表数据,第一个为返回结果,第二个无应答数据包组成

>>> pkt=Ether(dst=“ff:ff:ff:ff:ff:ff”)/ARP(pdst=“192.168.1.0/24”,timeout=2)
>>> ans,unans=srp(pkt)
>>> ans.show()

数据包处理

嗅探报文

sniff(count=0,offline=None,store=True,prn=None,filter,timeout,iface)
  • count=200 抓到 200 个报文,即停止嗅探报文
  • offline=‘D:\2019H1work\igmp.pcap’ 解析本地 cap 文件
  • prn 为报文处理函数,回调此函数对抓到的报文进行处理
  • filter=‘arp or icmp or (udp and src port 68 and dst port 67)’ 过滤条件,伯克利语法
  • timeout 抓包时间,默认为 None
  • store=0 避免将所有的数据包存储在内存。

保存数据报文

pkts=sniff(filter=‘arp or icmp’,iface=‘eth1’,timeout=120)
wrpcap(‘test.cap’,pkts) #写到当前目录 test.cap 中

读取本地数据报文

有 2 种方法可以实现读取本地数据包,如下

pkts= rdpcap(‘D:\\2019H1work\\igmp.pcap’ ) 
pkts= sniff(offline=‘D:\\2019H1work\\igmp.pcap’ )

数据包内容提取

其实在 sniff() 嗅探报文或者 rdpcap() 读取报文的时候,报文已经自动解析完成了。

我们可以直接获取报文不同层的字段值

>>> pkts=sniff(filter=‘igmp’,iface=‘eth1’)
>>> type(pkts) #报文类型
>>> pkts.show() #报文列表
>>> pkts[0][IP].show #第一个报文的 IP 层
>>> pkts[0][Ether].src #第一个报文的源 MAC 地址
>>> pkts[IGMP].show() #只显示 IGMP 报文
>>> pkts[IGMP][0][IGMP].show() #显示 IGMP 报文第一个报文的 IGMP 层字段信息

代码实例

实例一

#encoding=utf-8
import psutil
from scapy.config import *
from scapy.layers.all import *
from scapy.packet import *
from scapy.contrib.igmp import *
from scapy.sendrecv import *
import wmi
class IgmpSer(object):
#初始化参数
def __init__(self):
self.sip='192.168.11.1'
self.pro_dip='224.0.0.1'
self.data_dip='239.192.45.86'
self.data_dmac=''
self.inter=1
self.iface_name='test'
self.iface="Intel(R) 82579LM Gigabit Network Connection"
#选择网卡,并获取 iface name,并返回网卡名列表
def select_iface(self):
name_list=[]
ifaces=psutil.net_if_addrs()
for k,v in ifaces.items():
for item in v:
if item[0] == 2 and not item[1] == '127.0.0.1':
iface.append(k)
elif item[0] == -1 and not item[1] == '00-00-00-00-00-00-00-E0':
# iface.append((k,item[1]))
pass
return name_list
# 获取选定网卡的 MAC 地址,功能无法生效
def get_mac_addr(self,iface):
mac={}
ifaces=psutil.net_if_addrs()
for k,v in ifaces.items():
for item in v:
if k == iface and item[0] == -1 and not item[1] == '00-00-00-00-00-00-00-E0':
mac=item[1]
mac=mac.replace("-",":")
# print(mac)
# print(k,item[0],item[1])
return mac
#返回网卡描述信息的列表,用以 sendp 传参
def get_iface_desc(self):
iface_list=[]
w=wmi.WMI()
iface=w.Win32_NetworkAdapterConfiguration (IPEnabled=1)
for interface in iface:
iface_desc=interface.Description
iface_list.append(iface_desc)
return iface_list
#根据组播 IP 生成组播 Mac 地址
def get_multicast_mac(self,ip):
mac_s='01:00:5e:'
ip_list=ip.split('.')[1:]
num_list=list(map(int,ip_list))
i=0
if num_list[0] >= 128:
num_list[0]=num_list[0]-128
for v in num_list:
str=hex(v)
if len(str)==3:
str=str.replace("0x","0")
num_list[i]=str
elif len(str)==4:
str=str.replace("0x","")
num_list[i]=str
i=i+1
mac_end=':'.join(num_list)
mac_addr=mac_s+mac_end
return mac_addr
#构造、发送通用组查询报文
def general_query(self):
srcmac=self.get_mac_addr(self.iface_name)
dstmac=self.get_multicast_mac('224.0.0.1')
str='00000000000000'
a=Ether(src=srcmac,dst=dstmac)
b=IP(src=self.sip,dst='224.0.0.1')
c=IGMP(type=0x11,gaddr='224.2.2.4')
pkt=a/b/c
a=Ether(src=src_mac,dst='')
b=IP(src='',dst='1.1.1.1')
c=IGMP(type=0x11,gaddr='')
x=a/b/c
x[IGMP].igmpize()
#pkt=IP(dst='224.0.0.1',src='192.168.11.1')/IGMP(type=0x11,gaddr='')/Padding(str)
sendp(pkt,iface=self.iface,verbose=False)
# send(pkt)
#构造、发送特定组查询报文
def general_query_speical(self):
src_mac=self.get_mac_addr(self.iface)
print('src_mac',src_mac)
dst_mac=self.get_multicast_mac(self.data_dip)
print('dst_mac',dst_mac)
a1=Ether(src,dst)
b1=IP(src='',dst='224.0.0.1')
c1=IGMP(type=0x11,gaddr=self.data_dip)
pkt=a1/b1/c1/Padding(str)
a=Ether()
b=IP(src=self.sip,dst=self.data_dip)
c=IGMP(type=0x11,gaddr=self.data_dip)
pkt=a/b/c
# send(p)
sendp(pkt,iface=self.iface,inter=1,count=4)
#构造组播数据报文
def udp_stream(self,**kwrgs):
dst_mac=self.get_multicast_mac('0.0.0.0')
src_mac=self.get_mac_addr(self.iface)
print(src_mac)
str='0000000000000'
a=Ether()
b=IP(src=self.sip,dst='0.0.0.0',flags=2)
c=UDP(sport=7411,dport=7411)
pkt=a/b/c/Raw(load=("AAAAAAAaaaaaaa") + str)
sendp(pkt,iface=self.iface,loop=1,inter=2)
# sendpfast(pkt,iface=self.iface,loop=1,mbps=8)
#构造组播 IPv4 报文
def ipv4_stream(self):
dst_mac=self.get_multicast_mac(self.data_dip)
src_mac=self.get_mac_addr(self.iface)
str='00000000000000000000000000'
a=Ether(src=src_mac,dst=dst_mac)
b=IP(src=self.sip,dst=self.data_dip)
pkt=a/b/Padding(str)
sendp(pkt,iface=self.iface,loop=1,inter=1)
#嗅探过滤报文
def sniff_pkt(self):
sniff(filter='',store=0,timeout=20,prn=Packet.summary,iface=self.iface)
#对嗅探的报文,调用此函数,进行下一步处理
def parser_pkt(self,pkt):
pass
if __name__ == '__main__':
igmpser=IgmpSer()
# igmpser.get_multicast_mac(ip='224.1.1.1')
# print(igmpser.get_mac_addr(iface='test'))
# print(igmpser.select_iface())
igmpser.general_query_speical()

实例二



工具应用

PyQt 和 scapy 库开发发包工具,请访问我的 Github

技术难点

1、怎么解决在 windows 系统下选择指定网卡接口进行发包、嗅探的问题,cmd 中输入命令 wmic ->NIC ,查看网卡接口的 Index。 sendp(pkt,iface=’eth7’)

2、怎么解决 UDP 报文长度过大,能自动分片的问题
3、sendpfast() 方法发送大流量,报 type 类型错误的问题

4、怎么解决拉起线程,持续监听报文,解析报文的问题

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

提笔落墨

暂无简介

文章
评论
26 人气
更多

推荐作者

迎风吟唱

文章 0 评论 0

qq_hXErI

文章 0 评论 0

茶底世界

文章 0 评论 0

捎一片雪花

文章 0 评论 0

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文