老李分享:使用 Python 的 Socket 模块开发 UDP 扫描工具
poptest是业内唯一的测试开发工程师培训机构,测试开发工程师主要是为测试服务开发测试工具,在工作中要求你做网络级别的安全性测试,但是条件限制你无法用商业工具,所以自己动手要写测试工具,在这里我们在测试开发工程师的就业培训中构建了一个场景,就是自己开发udp扫描工具,我们在现阶段主要是用python为主要开发语言来实现各种场景下的测试,而quicktestprofessional的培训我们已经免费。首先我们了解下概念:套接字能够访问底层网络信息。如,我们可以用它来检查IP和ICMP报头,他们都是OSI模型中网络层协议。
使用UDP数据包可以当发送信息穿越子网时,不同于TCP,需要三次握手。我们需要做的仅仅是等待ICMP报文回应,来判断对方主机是否可用或不可访问。ICMP协议本质上是一个特殊的控制协议,它可以指示错误报告和控制机器数据传输的的行为。
我们先创建一个socket 并将它绑定到一个外部网卡。这个网卡要启用混淆模式(promiscuous mode),他主要是获取经过这个网卡的所有数据包,包括目标地址不是它的数据包。使用 Windows 时要注意一点:我们需要发送一个 IOCTL 包才能将网卡设置为混淆模式。另外,虽然 linux 需要使用 ICMP,Windows 却可以以一种独立于协议的方式来嗅探收到的数据包。
import socket
import os
# host to listen
HOST = '192.168.1.114'
def sniffing(host, win, socket_prot):
while 1:
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_prot)
sniffer.bind((host, 0))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
if win == 1:
sniffer.ioctl(socket.SIO_RCVALL, socket_RCVALL_ON)
print sniffer.recvfrom(65565)
def main(host):
if os.name == 'nt':
sniffing(host, 1, socket.IPPROTO_IP)
else:
sniffing(host, 0, socket.IPPROTO_ICMP)
if __name__ == '__main__':
main(HOST)
在终端中运行如下命令进行测试: $ sudo python sniffer.py
('E\x00\x00T\xb3\xec\x00\x005\x01\xe4\x13J}\xe1\x11\xc0\xa8\x01r\x00\x00v\xdfx\xa2\x00\x01sr\x98T\x00\x00\x00\x008\xe3\r\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./01234567', ('74.125.225.17', 0))
('E\x00\x00T\xb4\x1b\x00\x005\x01\xe3\xe4J}\xe1\x11\xc0\xa8\x01r\x00\x00~\xd7x\xa2\x00\x02tr\x98T\x00\x00\x00\x00/\xea\r\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./01234567', ('74.125.225.17', 0))
我们需要对这些数据包进行解码
下面我们来研究下协议数据包组成,进行IP ICMP层解码
IP 头
典型的 IP 头有如下结构,每个字段都对应一个变量
ICMP头
ICMP 由于内容的不同其消息类型也不同,但每个消息都包括三个一致的元素:type,code (告知接收主机ICMP消息的解码类型)和 checksum。
对于我们的扫描器,如果得到的 type 和 code 值是3,这意味着 Destination Unreachable(目标不可达)和 Port Unreachable (端口不可达) ICMP 消息错误
为描述 ICMP 消息头,用 python 的 ctypes 库来创建一个类
import ctypes
class ICMP(ctypes.Structure):
_fields_ = [
('type', ctypes.c_ubyte),
('code', ctypes.c_ubyte),
('checksum', ctypes.c_ushort),
('unused', ctypes.c_ushort),
('next_hop_mtu',ctypes.c_ushort)
]
def __new__(self, socket_buffer):
return self.from_buffer_copy(socket_buffer)
def __init__(self, socket_buffer):
pass
编写消息头解码器
开发 IP/ICMP 消息头解码器。脚本创建了一个 sniffer socket,然后在循环中持续读取数据包并进行解码。
注意代码中将 IP 头的前20个字节读取到了缓存,然后再打印消息头的变量。
ICMP 头数据如下:
import socket
import os
import struct
import ctypes
from ICMPHeader import ICMP
# host to listen on
HOST = '192.168.xxx.xxx'
def main():
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind(( HOST, 0 ))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
while 1:
raw_buffer = sniffer.recvfrom(65565)
ip_header = raw_buffer
iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)
version_ihl = iph
version = version_ihl >> 4
ihl = version_ihl & 0xF
iph_length = ihl * 4
ttl = iph
protocol = iph
s_addr = socket.inet_ntoa(iph);
d_addr = socket.inet_ntoa(iph);
print 'IP -> Version:' + str(version) + ', Header Length:' + str(ihl) + \
', TTL:' + str(ttl) + ', Protocol:' + str(protocol) + ', Source:'\
+ str(s_addr) + ', Destination:' + str(d_addr)
buf = raw_buffer
icmp_header = ICMP(buf)
print "ICMP -> Type:%d, Code:%d" %(icmp_header.type, icmp_header.code) + '\n'
if __name__ == '__main__':
main()
我们运行 traceroute看下数据包情况:
$ traceroute www.google.com
traceroute to www.google.com (74.125.226.50), 30 hops max, 60 byte packets
1* * *
2* * *
367.59.255.137 (67.59.255.137)17.183 ms 67.59.255.129 (67.59.255.129)70.563 ms 67.59.255.137 (67.59.255.137)21.480 ms
4451be075.cst.lightpath.net (65.19.99.117)14.639 ms rtr102.wan.hcvlny.cv.net (65.19.99.205)24.086 ms 451be075.cst.lightpath.net (65.19.107.117)24.025 ms
564.15.3.246 (64.15.3.246)24.005 ms 64.15.0.218 (64.15.0.218)23.961 ms 451be0c2.cst.lightpath.net (65.19.120.194)23.935 ms
672.14.215.203 (72.14.215.203)23.872 ms46.943 ms *
7216.239.50.141 (216.239.50.141)48.906 ms46.138 ms46.122 ms
8209.85.245.179 (209.85.245.179)46.108 ms46.095 ms46.074 ms
9lga15s43-in-f18.1e100.net (74.125.226.50)45.997 ms19.507 ms16.607 ms
会得到这种输出 (注意 ICMP 的响应类型):
sudo python ip_header_decode.py
IP -> Version:4, Header Length:5, TTL:252, Protocol:1, Source:65.19.99.117, Destination:192.168.1.114
ICMP -> Type:11, Code:0(...)IP -> Version:4, Header Length:5, TTL:250, Protocol:1, Source:72.14.215.203, Destination:192.168.1.114
ICMP -> Type:11, Code:0
IP -> Version:4, Header Length:5, TTL:56, Protocol:1, Source:74.125.226.50, Destination:192.168.1.114
ICMP -> Type:3, Code:3
IP -> Version:4, Header Length:5, TTL:249, Protocol:1, Source:216.239.50.141, Destination:192.168.1.114
ICMP -> Type:11, Code:0(...)IP -> Version:4, Header Length:5, TTL:56, Protocol:1, Source:74.125.226.50, Destination:192.168.1.114
ICMP -> Type:3, Code:3
开发扫描器
在编写完整的扫描器前首先要安装 netaddr,它是一个用于表示和处理网络地址的 python 库。
Netaddr 提供了操作 IPv4,IPv6 和子网 Mac 等地址的能力。它非常有用,因为我们会用到子网掩码,如192.168.1.0/24
$ sudo pip install netaddr
我们可以使用如下的代码段来测试这个库 (成功会打印“OK”):
import netaddr
ip = '192.168.xxx.xxx'
if ip in netaddr.IPNetwork('192.168.xxx.0/24'):
print('OK!')
进一步强化扫描器
我们会将上面所提到的组织在一起来完成我们的扫描器,然后添加一个循环来向目标子网内的所有地址发送 UDP 数据报。
import threading
import time
import socket
import os
import struct
from netaddr import IPNetwork, IPAddress
from ICMPHeader import ICMP
import ctypes
HOST = '192.168.xxx.xxx'
SUBNET = '192.168.xxx.0/24'
MESSAGE = 'hellooooo'
def udp_sender(SUBNET, MESSAGE):
time.sleep(5)
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for ip in IPNetwork(SUBNET):
try:
sender.sendto(MESSAGE, ("%s" % ip, 65212))
except:
pass
def main():
t = threading.Thread(target=udp_sender, args=(SUBNET, MESSAGE))
t.start()
socket_protocol = socket.IPPROTO_ICMP
sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
sniffer.bind(( HOST, 0 ))
sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
while 1:
raw_buffer = sniffer.recvfrom(65565)
ip_header = raw_buffer
iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)
version_ihl = iph
ihl = version_ihl & 0xF
iph_length = ihl * 4
src_addr = socket.inet_ntoa(iph);
buf = raw_buffer
icmp_header = ICMP(buf)
# check for the type 3 and code and within our target subnet
if icmp_header.code == 3 and icmp_header.type == 3:
if IPAddress(src_addr) in IPNetwork(SUBNET):
if raw_buffer == MESSAGE:
print("Host up: %s" % src_addr)
if __name__ == '__main__':
main()
运行后得到的结果如下:
$ sudo python scanner.py
Host up: 192.168.1.114(...)
如果感兴趣可以进一步强化代码,进一步加强扫描器的功能,其实大家可以看到这里技术的难点是你对协议的熟悉程度,我们在做性能测试的过程中也会通过loadrunner去模拟协议的请求,所以关键是实现的内容,语言可以任意选择。欢迎大家咨询poptest测试开发工程师培训。
页:
[1]