设为首页 收藏本站
查看: 883|回复: 0

[经验分享] 老李分享:使用 Python 的 Socket 模块开发 UDP 扫描工具

[复制链接]

尚未签到

发表于 2015-11-30 13:24:21 | 显示全部楼层 |阅读模式
          poptest是业内唯一的测试开发工程师培训机构,测试开发工程师主要是为测试服务开发测试工具,在工作中要求你做网络级别的安全性测试,但是条件限制你无法用商业工具,所以自己动手要写测试工具,在这里我们在测试开发工程师的就业培训中构建了一个场景,就是自己开发udp扫描工具,我们在现阶段主要是用python为主要开发语言来实现各种场景下的测试,而quicktestprofessional的培训我们已经免费。
        首先我们了解下概念:套接字能够访问底层网络信息。如,我们可以用它来检查IPICMP报头,他们都是OSI模型中网络层协议。
        使用UDP数据包可以当发送信息穿越子网时,不同于TCP,需要三次握手。我们需要做的仅仅是等待ICMP报文回应,来判断对方主机是否可用或不可访问。ICMP协议本质上是一个特殊的控制协议,它可以指示错误报告和控制机器数据传输的的行为。
      我们先创建一个socket 并将它绑定到一个外部网卡。这个网卡要启用混淆模式(promiscuous mode),他主要是获取经过这个网卡的所有数据包,包括目标地址不是它的数据包。使用 Windows 时要注意一点:我们需要发送一个 IOCTL 包才能将网卡设置为混淆模式。另外,虽然 linux 需要使用 ICMP,Windows 却可以以一种独立于协议的方式来嗅探收到的数据包。
    
  import socket
import os

# host to listen
HOST = '192.168.1.114'

def sniffing(host, win, socket_prot):
      while 1:
            sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_prot)
            sniffer.bind((host, 0))

            sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

            if win == 1:
                  sniffer.ioctl(socket.SIO_RCVALL, socket_RCVALL_ON)

            
            print sniffer.recvfrom(65565)

def main(host):
      if os.name == 'nt':
            sniffing(host, 1, socket.IPPROTO_IP)
      else:
            sniffing(host, 0, socket.IPPROTO_ICMP)

if __name__ == '__main__':
      main(HOST)
  在终端中运行如下命令进行测试:   $ sudo python sniffer.py
  ('E\x00\x00T\xb3\xec\x00\x005\x01\xe4\x13J}\xe1\x11\xc0\xa8\x01r\x00\x00v\xdfx\xa2\x00\x01sr\x98T\x00\x00\x00\x008\xe3\r\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./01234567', ('74.125.225.17', 0))
('E\x00\x00T\xb4\x1b\x00\x005\x01\xe3\xe4J}\xe1\x11\xc0\xa8\x01r\x00\x00~\xd7x\xa2\x00\x02tr\x98T\x00\x00\x00\x00/\xea\r\x00\x00\x00\x00\x00\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !"#$%&\'()*+,-./01234567', ('74.125.225.17', 0))
  
  我们需要对这些数据包进行解码
下面我们来研究下协议数据包组成,进行IP ICMP层解码
  
  IP 头
典型的 IP 头有如下结构,每个字段都对应一个变量
   DSC0000.jpg
  
   ICMP头
  ICMP 由于内容的不同其消息类型也不同,但每个消息都包括三个一致的元素:type,code (告知接收主机ICMP消息的解码类型)和 checksum。
   DSC0001.gif
  对于我们的扫描器,如果得到的 type 和 code 值是3,这意味着 Destination Unreachable(目标不可达)和 Port Unreachable (端口不可达) ICMP 消息错误
为描述 ICMP 消息头,用 python 的 ctypes 库来创建一个类
  import ctypes

class ICMP(ctypes.Structure):
      _fields_ = [
      ('type',        ctypes.c_ubyte),
      ('code',        ctypes.c_ubyte),
      ('checksum',    ctypes.c_ushort),
      ('unused',      ctypes.c_ushort),
      ('next_hop_mtu',ctypes.c_ushort)
      ]

      def __new__(self, socket_buffer):
             return self.from_buffer_copy(socket_buffer)

      def __init__(self, socket_buffer):
            pass
  编写消息头解码器
  开发 IP/ICMP 消息头解码器。脚本创建了一个 sniffer socket,然后在循环中持续读取数据包并进行解码。
注意代码中将 IP 头的前20个字节读取到了缓存,然后再打印消息头的变量。
ICMP 头数据如下:
  import socket
import os
import struct
import ctypes
from ICMPHeader import ICMP

# host to listen on
HOST = '192.168.xxx.xxx'

def main():
      socket_protocol = socket.IPPROTO_ICMP
      sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
      sniffer.bind(( HOST, 0 ))
      sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

      while 1:
            raw_buffer = sniffer.recvfrom(65565)[0]
            ip_header = raw_buffer[0:20]
            iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)

            version_ihl = iph[0]
            version = version_ihl >> 4
            ihl = version_ihl & 0xF
            iph_length = ihl * 4
            ttl = iph[5]
            protocol = iph[6]
            s_addr = socket.inet_ntoa(iph[8]);
            d_addr = socket.inet_ntoa(iph[9]);

            print 'IP -> Version:' + str(version) + ', Header Length:' + str(ihl) + \
            ', TTL:' + str(ttl) + ', Protocol:' + str(protocol) + ', Source:'\
             + str(s_addr) + ', Destination:' + str(d_addr)

    buf = raw_buffer[iph_length:iph_length + ctypes.sizeof(ICMP)]
            icmp_header = ICMP(buf)

            print "ICMP -> Type:%d, Code:%d" %(icmp_header.type, icmp_header.code) + '\n'

if __name__ == '__main__':
      main()
  我们运行 traceroute看下数据包情况:
  $ traceroute www.google.com
traceroute to www.google.com (74.125.226.50), 30 hops max, 60 byte packets
1  * * *
2  * * *
3  67.59.255.137 (67.59.255.137)  17.183 ms 67.59.255.129 (67.59.255.129)  70.563 ms 67.59.255.137 (67.59.255.137)  21.480 ms
4  451be075.cst.lightpath.net (65.19.99.117)  14.639 ms rtr102.wan.hcvlny.cv.net (65.19.99.205)  24.086 ms 451be075.cst.lightpath.net (65.19.107.117)  24.025 ms
5  64.15.3.246 (64.15.3.246)  24.005 ms 64.15.0.218 (64.15.0.218)  23.961 ms 451be0c2.cst.lightpath.net (65.19.120.194)  23.935 ms
6  72.14.215.203 (72.14.215.203)  23.872 ms  46.943 ms *
7  216.239.50.141 (216.239.50.141)  48.906 ms  46.138 ms  46.122 ms
8  209.85.245.179 (209.85.245.179)  46.108 ms  46.095 ms  46.074 ms
9  lga15s43-in-f18.1e100.net (74.125.226.50)  45.997 ms  19.507 ms  16.607 ms
  会得到这种输出 (注意 ICMP 的响应类型):
  sudo python ip_header_decode.py
IP -> Version:4, Header Length:5, TTL:252, Protocol:1, Source:65.19.99.117, Destination:192.168.1.114
ICMP -> Type:11, Code:0(...)IP -> Version:4, Header Length:5, TTL:250, Protocol:1, Source:72.14.215.203, Destination:192.168.1.114
ICMP -> Type:11, Code:0
IP -> Version:4, Header Length:5, TTL:56, Protocol:1, Source:74.125.226.50, Destination:192.168.1.114
ICMP -> Type:3, Code:3
IP -> Version:4, Header Length:5, TTL:249, Protocol:1, Source:216.239.50.141, Destination:192.168.1.114
ICMP -> Type:11, Code:0(...)IP -> Version:4, Header Length:5, TTL:56, Protocol:1, Source:74.125.226.50, Destination:192.168.1.114
ICMP -> Type:3, Code:3
开发扫描器
  在编写完整的扫描器前首先要安装 netaddr,它是一个用于表示和处理网络地址的 python 库。
Netaddr 提供了操作 IPv4,IPv6 和子网 Mac 等地址的能力。它非常有用,因为我们会用到子网掩码,如192.168.1.0/24
$ sudo pip install netaddr
  我们可以使用如下的代码段来测试这个库 (成功会打印“OK”):
  import netaddr

ip = '192.168.xxx.xxx'
if ip in netaddr.IPNetwork('192.168.xxx.0/24'):
    print('OK!')

进一步强化扫描器
  我们会将上面所提到的组织在一起来完成我们的扫描器,然后添加一个循环来向目标子网内的所有地址发送 UDP 数据报。
  import threading

import time
import socket
import os
import struct
from netaddr import IPNetwork, IPAddress
from ICMPHeader import ICMP
import ctypes


HOST = '192.168.xxx.xxx'

SUBNET = '192.168.xxx.0/24'

MESSAGE = 'hellooooo'


def udp_sender(SUBNET, MESSAGE):
    time.sleep(5)
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    for ip in IPNetwork(SUBNET):
        try:
            sender.sendto(MESSAGE, ("%s" % ip, 65212))
        except:
            pass

def main():
    t = threading.Thread(target=udp_sender, args=(SUBNET, MESSAGE))
    t.start()

    socket_protocol = socket.IPPROTO_ICMP
    sniffer = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket_protocol)
    sniffer.bind(( HOST, 0 ))
    sniffer.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)

   
    while 1:
        raw_buffer = sniffer.recvfrom(65565)[0]
        ip_header = raw_buffer[0:20]
        iph = struct.unpack('!BBHHHBBH4s4s' , ip_header)

   
        version_ihl = iph[0]
        ihl = version_ihl & 0xF
        iph_length = ihl * 4
        src_addr = socket.inet_ntoa(iph[8]);

     
        buf = raw_buffer[iph_length:iph_length + ctypes.sizeof(ICMP)]
        icmp_header = ICMP(buf)

        # check for the type 3 and code and within our target subnet
        if icmp_header.code == 3 and icmp_header.type == 3:
            if IPAddress(src_addr) in IPNetwork(SUBNET):
                if raw_buffer[len(raw_buffer) - len(MESSAGE):] == MESSAGE:
                    print("Host up: %s" % src_addr)

if __name__ == '__main__':
    main()
运行后得到的结果如下:
$ sudo python scanner.py
Host up: 192.168.1.114(...)

      如果感兴趣可以进一步强化代码,进一步加强扫描器的功能,其实大家可以看到这里技术的难点是你对协议的熟悉程度,我们在做性能测试的过程中也会通过loadrunner去模拟协议的请求,所以关键是实现的内容,语言可以任意选择。欢迎大家咨询poptest测试开发工程师培训。

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-145399-1-1.html 上篇帖子: Mysql 储存过程以及 python callproc调用 下篇帖子: python计算机视觉1:基本操作与直方图
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表