scapy 包总结

发布于 2024-06-20 06:21:41 字数 11124 浏览 13 评论 0

本文主要依据官网文档(英文) 和源码相关函数用法编写。如有错误,欢迎反馈。

scapy 包介绍

scapy 包模块主要用来构造或者伪造网络中的各种数据报文。提供了从 Ether 层、IP 层、传输层(UDP/TCP)、数据层各层的数据报文字段的构造方法。也可以用来解析数据报文。并能实现伪造异常报文,网络攻击,探测等功能。

scapy 模块还支持构造 IPv6 报文,802.11 无线报文以及蓝牙报文(只支持 Linux 系统)。

安装

python3.6,scapy 最新版本 2.4.0,本文根据此版本编写。

pip3 install scapy

用法

本文主要具体通过查看 scapy 包模块下的相关源码、查看官方文档以及开发工具程序实践,等方式总结 scapy 包的一些常用的用法。

涉及到更高深或更复杂的用法,请点击这里查看 官方文档

构造常见协议报文

site-packages/scapy/layers 下,包含构造各种协议报文的方法。有构造其他协议报文需求的读者建议可以去看这下面的源码。

对于报文类,如果想知道报文类提供了字段选项,可以用 ls() 方法查看,如下所示

>>> ls(IP)

构造 ICMP 报文

str='abdndjafn'
pkt=IP(src,dst=192.168.1.1)/ICMP(type=8)/Raw(str)

构造 DHCP 报文

>>> pkt=Ether(src=self.smac,dst="ff:ff:ff:ff:ff:ff")/IP(src=self.sip,dst="255.255.255.255")/UDP(sport=68,dport=67)/BOOTP(chaddr=[mac2str(self.smac)],ciaddr=self.ciaddr,xid=translate_id)/DHCP(options=[("message-type","request"),("hostname",self.hostname),"end",('pad',self.pad)])

构造 IGMPv2 报文

from scapy.contrib.igmp import *
pkt=Ether(src,dst)/ip(src,dst)/IGMP(type=0x11,mrode=20,gaddr="")/Padding('000000000')

构造 IGMPv3 报文

from scapy.contrib.igmp import *
from scapy.contrib.igmp import *
a=Ether()
b=IP(src,dst)
c=IGMPv3(type=0x11,mrcode=10)
d=IGMPv3mq(gaddr='',qqic=125,qrv=2,srcaddrs=['192.168.10.1','192.168.11.1'])
pkt=a/b/c/d

构造数据传输 UDP 报文

pkt=Ether(src,dst)/ip(src,dst)/UDP(sport,dport)/Raw()

构造数据传输 TCP 报文

>>> pkt=Ether(src,dst)/ip(src,dst)/TCP(sport,dport)/Raw()

构造 PPPoE 报文

>>> pkt=Ether(src,dst)/PPP()

报文长度填充

报文长度填充实例如下所示

>>> Padding(str='11111111')
>>> Raw(load='advd')

具体参考源码 site-packages/scapy/packet.py

常用工具函数

列出所有 scapy 中的命令或函数

>>> lsc()

打印查看某个函数或者类的帮助信息

>>> help(sendp)

查看报文字段信息

>>> ls(pkt)

显示一个报文摘要

>>> pkt.summary()

展示报文内容

>>> pkt.show()
>>> pkt.display()

显示聚合的数据包(例如,计算好了校验和的)

>>> pkt.show2()

返回可以生产数据包的 Scapy 命令

>>> pkt.command()

traceroute 方法

>>> Traceroute('114.114.114.114')

发送报文

发送报文方法的源码路径

site-packages/scapy/sendrecv.py

send() 方法

三层以上,不能指定网络接口。

当 loop=1 时,一直发包。以下例子为发 10 个报文,报文间隔为 1s。

send(pkt,loop=0,inter=1,count=10)

sendp() 方法

工作在 2 层,发包时必须指定网络接口。其他参数与 send() 方法一致。

sendp(pkt,iface,loop=0,inter=1,count=5)

sendpfast() 方法

工作在二层,可以指定网络接口和速率发包。Windows 平台需要其他依赖库。

sendpfast(pkt,iface,pps,mbps,loop=0)

sr() 方法

发送数据包和接收响应,工作在 3 层(IP 和 ARP)。该函数返回有回应的数据包和没有回应的数据包。返回的两个列表数据,第一个就是发送的数据包及其应答组成的列表,第二个是无应答数据包组成的列表。

>>> sr(pkt) 
>>> ans,unans=_
>>> ans.summary()

sr1() 方法

与 sr 类似,工作在 3 层(IP 和 ARP)。但它只返回应答发送的分组(或分组集),用来返回一个应答数据包。

>>> pkt1=IP()/ICMP()/Padding(str)
>>> p=sr1(pkt)
>>> p.show()
>>> pkt=IP(dst=‘192.168.1.1’)/UDP()/DNS(rd=1,qd=DNSQR(qname=‘ www.baidu.com ’))
>>> ans=sr1(pkt_dns)
>>> ans.show()

srp() 方法

工作在二层(Ether,802.3),返回 2 个列表数据,第一个为返回结果,第二个无应答数据包组成

>>> pkt=Ether(dst=“ff:ff:ff:ff:ff:ff”)/ARP(pdst=“192.168.1.0/24”,timeout=2)
>>> ans,unans=srp(pkt)
>>> ans.show()

数据包处理

嗅探报文

sniff(count=0,offline=None,store=True,prn=None,filter,timeout,iface)
  • count=200 抓到 200 个报文,即停止嗅探报文
  • offline=‘D:\2019H1work\igmp.pcap’ 解析本地 cap 文件
  • prn 为报文处理函数,回调此函数对抓到的报文进行处理
  • filter=‘arp or icmp or (udp and src port 68 and dst port 67)’ 过滤条件,伯克利语法
  • timeout 抓包时间,默认为 None
  • store=0 避免将所有的数据包存储在内存。

保存数据报文

pkts=sniff(filter=‘arp or icmp’,iface=‘eth1’,timeout=120)
wrpcap(‘test.cap’,pkts) #写到当前目录 test.cap 中

读取本地数据报文

有 2 种方法可以实现读取本地数据包,如下

pkts= rdpcap(‘D:\\2019H1work\\igmp.pcap’ ) 
pkts= sniff(offline=‘D:\\2019H1work\\igmp.pcap’ )

数据包内容提取

其实在 sniff() 嗅探报文或者 rdpcap() 读取报文的时候,报文已经自动解析完成了。

我们可以直接获取报文不同层的字段值

>>> pkts=sniff(filter=‘igmp’,iface=‘eth1’)
>>> type(pkts) #报文类型
>>> pkts.show() #报文列表
>>> pkts[0][IP].show #第一个报文的 IP 层
>>> pkts[0][Ether].src #第一个报文的源 MAC 地址
>>> pkts[IGMP].show() #只显示 IGMP 报文
>>> pkts[IGMP][0][IGMP].show() #显示 IGMP 报文第一个报文的 IGMP 层字段信息

代码实例

实例一

#encoding=utf-8
import psutil
from scapy.config import *
from scapy.layers.all import *
from scapy.packet import *
from scapy.contrib.igmp import *
from scapy.sendrecv import *
import wmi
class IgmpSer(object):
#初始化参数
def __init__(self):
self.sip='192.168.11.1'
self.pro_dip='224.0.0.1'
self.data_dip='239.192.45.86'
self.data_dmac=''
self.inter=1
self.iface_name='test'
self.iface="Intel(R) 82579LM Gigabit Network Connection"
#选择网卡,并获取 iface name,并返回网卡名列表
def select_iface(self):
name_list=[]
ifaces=psutil.net_if_addrs()
for k,v in ifaces.items():
for item in v:
if item[0] == 2 and not item[1] == '127.0.0.1':
iface.append(k)
elif item[0] == -1 and not item[1] == '00-00-00-00-00-00-00-E0':
# iface.append((k,item[1]))
pass
return name_list
# 获取选定网卡的 MAC 地址,功能无法生效
def get_mac_addr(self,iface):
mac={}
ifaces=psutil.net_if_addrs()
for k,v in ifaces.items():
for item in v:
if k == iface and item[0] == -1 and not item[1] == '00-00-00-00-00-00-00-E0':
mac=item[1]
mac=mac.replace("-",":")
# print(mac)
# print(k,item[0],item[1])
return mac
#返回网卡描述信息的列表,用以 sendp 传参
def get_iface_desc(self):
iface_list=[]
w=wmi.WMI()
iface=w.Win32_NetworkAdapterConfiguration (IPEnabled=1)
for interface in iface:
iface_desc=interface.Description
iface_list.append(iface_desc)
return iface_list
#根据组播 IP 生成组播 Mac 地址
def get_multicast_mac(self,ip):
mac_s='01:00:5e:'
ip_list=ip.split('.')[1:]
num_list=list(map(int,ip_list))
i=0
if num_list[0] >= 128:
num_list[0]=num_list[0]-128
for v in num_list:
str=hex(v)
if len(str)==3:
str=str.replace("0x","0")
num_list[i]=str
elif len(str)==4:
str=str.replace("0x","")
num_list[i]=str
i=i+1
mac_end=':'.join(num_list)
mac_addr=mac_s+mac_end
return mac_addr
#构造、发送通用组查询报文
def general_query(self):
srcmac=self.get_mac_addr(self.iface_name)
dstmac=self.get_multicast_mac('224.0.0.1')
str='00000000000000'
a=Ether(src=srcmac,dst=dstmac)
b=IP(src=self.sip,dst='224.0.0.1')
c=IGMP(type=0x11,gaddr='224.2.2.4')
pkt=a/b/c
a=Ether(src=src_mac,dst='')
b=IP(src='',dst='1.1.1.1')
c=IGMP(type=0x11,gaddr='')
x=a/b/c
x[IGMP].igmpize()
#pkt=IP(dst='224.0.0.1',src='192.168.11.1')/IGMP(type=0x11,gaddr='')/Padding(str)
sendp(pkt,iface=self.iface,verbose=False)
# send(pkt)
#构造、发送特定组查询报文
def general_query_speical(self):
src_mac=self.get_mac_addr(self.iface)
print('src_mac',src_mac)
dst_mac=self.get_multicast_mac(self.data_dip)
print('dst_mac',dst_mac)
a1=Ether(src,dst)
b1=IP(src='',dst='224.0.0.1')
c1=IGMP(type=0x11,gaddr=self.data_dip)
pkt=a1/b1/c1/Padding(str)
a=Ether()
b=IP(src=self.sip,dst=self.data_dip)
c=IGMP(type=0x11,gaddr=self.data_dip)
pkt=a/b/c
# send(p)
sendp(pkt,iface=self.iface,inter=1,count=4)
#构造组播数据报文
def udp_stream(self,**kwrgs):
dst_mac=self.get_multicast_mac('0.0.0.0')
src_mac=self.get_mac_addr(self.iface)
print(src_mac)
str='0000000000000'
a=Ether()
b=IP(src=self.sip,dst='0.0.0.0',flags=2)
c=UDP(sport=7411,dport=7411)
pkt=a/b/c/Raw(load=("AAAAAAAaaaaaaa") + str)
sendp(pkt,iface=self.iface,loop=1,inter=2)
# sendpfast(pkt,iface=self.iface,loop=1,mbps=8)
#构造组播 IPv4 报文
def ipv4_stream(self):
dst_mac=self.get_multicast_mac(self.data_dip)
src_mac=self.get_mac_addr(self.iface)
str='00000000000000000000000000'
a=Ether(src=src_mac,dst=dst_mac)
b=IP(src=self.sip,dst=self.data_dip)
pkt=a/b/Padding(str)
sendp(pkt,iface=self.iface,loop=1,inter=1)
#嗅探过滤报文
def sniff_pkt(self):
sniff(filter='',store=0,timeout=20,prn=Packet.summary,iface=self.iface)
#对嗅探的报文,调用此函数,进行下一步处理
def parser_pkt(self,pkt):
pass
if __name__ == '__main__':
igmpser=IgmpSer()
# igmpser.get_multicast_mac(ip='224.1.1.1')
# print(igmpser.get_mac_addr(iface='test'))
# print(igmpser.select_iface())
igmpser.general_query_speical()

实例二



工具应用

PyQt 和 scapy 库开发发包工具,请访问我的 Github

技术难点

1、怎么解决在 windows 系统下选择指定网卡接口进行发包、嗅探的问题,cmd 中输入命令 wmic ->NIC ,查看网卡接口的 Index。 sendp(pkt,iface=’eth7’)

2、怎么解决 UDP 报文长度过大,能自动分片的问题
3、sendpfast() 方法发送大流量,报 type 类型错误的问题

4、怎么解决拉起线程,持续监听报文,解析报文的问题

如果你对这篇内容有疑问,欢迎到本站社区发帖提问 参与讨论,获取更多帮助,或者扫码二维码加入 Web 技术交流群。

扫码二维码加入Web技术交流群

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据

关于作者

提笔落墨

暂无简介

0 文章
0 评论
23 人气
更多

推荐作者

玍銹的英雄夢

文章 0 评论 0

我不会写诗

文章 0 评论 0

十六岁半

文章 0 评论 0

浸婚纱

文章 0 评论 0

qq_kJ6XkX

文章 0 评论 0

    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。
    原文