Featured image of post HTTP 1.* Pcap 生成方法

HTTP 1.* Pcap 生成方法

使用 Python 脚本生成 HTTP1.* Pcap

一、概述

本文将介绍一种使用 Python 和 Scapy 库构造 HTTP/1.* Pcap 文件的方法,该方法可自定义 HTTP 请求与响应的所有参数。本文假设读者具备一定的网络协议基础,对 OSI 七层模型或 TCP/IP 模型有基本了解。

二、为什么要自己构造 Pcap

在网络安全产品的测试工作中,我们经常需要各类 Pcap 包来验证功能,例如:测试安全规则的有效性、构造特定的测试资产,或检验产品对数据处理的全流程是否正常。传统的 Pcap 获取方法,如搭建靶场环境并进行抓包,往往较为耗时,且捕获的数据不一定能 100% 贴合测试需求,定制化修改也相对繁琐。

如果能通过代码直接构造 Pcap,我们便可以精准地自定义数据包中的任何内容。与传统抓包相比,这种方法更加灵活高效。

更进一步,我们可以将这些构造逻辑封装成一套工具,通过简单的配置就能一键生成所需流量,相当于为我们的测试需求量身打造了一个 Pcap 生成库。

三、HTTP/1.* 与 TCP 的关系

HTTP 协议为应用层协议,属于 OSI 七层模型的第七层或 TCP/IP 模型的第五层。TCP 为传输层协议,属于 OSI 七层模型的第四层。

我们知道 HTTP 1.* 是基于 TCP 协议的,因此在一次完整的 HTTP 请求过程中,底层会经历完整的 TCP 连接建立与关闭流程。

下面使用一个时序图展示一次 HTTP 请求-响应对应的 TCP 层活动(注意 seq/ack 号的变化):

sequenceDiagram participant Client as 客户端 participant Server as 服务器 Client->>Server: SYN (seq=x) Server->>Client: SYN-ACK (seq=y, ack=x+1) Client->>Server: ACK (seq=x+1, ack=y+1) Note right of Client: TCP 连接已建立!开始传输 HTTP 数据 Client->>Server: HTTP 请求 [PUSH, ACK] (seq=x+1, ack=y+1) Note right of Client: 请求大小假设为 150 字节 Server->>Client: 对请求的确认 [ACK] (seq=y+1, ack=x+1+150) Server->>Client: HTTP 响应 [PUSH, ACK] (seq=y+1, ack=x+1+150) Note left of Server: 响应大小假设为 500 字节 Client->>Server: 对响应的确认 [ACK] (seq=x+1+150, ack=y+1+500) Client->>Server: FIN-ACK (seq=x+1+150, ack=y+1+500) Server->>Client: ACK (seq=y+1+500, ack=x+1+150+1) Server->>Client: FIN-ACK (seq=y+1+500, ack=x+1+150+1) Client->>Server: ACK (seq=x+1+150+1, ack=y+1+500+1) Note right of Client: TCP 连接已关闭!

整体过程可以总结为:

  1. 三次握手:客户端与服务器建立 TCP 连接。
  2. 数据传输:客户端发送 HTTP 请求,服务器响应 HTTP 数据。如果 HTTP 数据量超过了 MSS(最大报文段长度),数据会被分割到多个 TCP 包中进行传输。
  3. 四次挥手:在数据传输完成后,通过四次挥手来关闭 TCP 连接。

有几点值得注意:

  1. ACK数据包不占用序号,下一个正常的数据包仍然使用当前的序列号;
  2. 推送数据使用 PUSH-ACK 数据包,对端在收到一个或多个此类的数据包后通常会回一个 ACK 包进行确认;
  3. seq/ack 的维护必须准确,否则 Wireshark 等工具会将数据包标记为错误。

需要注意的是,本文旨在通过构造一系列 TCP 包来模拟一次理想状态下的 HTTP/1.* 交互。因此,我们仅关注 TCP 协议的基础流程,不涉及如窗口协商、拥塞控制、重传机制或连接异常等复杂情况

四、Pcap 构造工具:Scapy

Scapy 是一个由 Python 编写的强大的交互式数据包处理程序。它能够伪造、发送、捕获、分析和操作网络数据包。Scapy 的核心特点是赋予了用户在极低层级上构建网络数据包的极高自由度,因此被广泛用于网络测试、扫描、攻击、探测以及教学等领域。 我们知道,Pcap 文件记录的是数据链路层上的一系列数据帧。这些数据帧是根据 OSI 或 TCP/IP 模型,从上层到下层逐层封装而成的。要手动构造 Pcap,本质上就是要模拟这一逐层封装的过程。

Scapy 通过重载除法运算符 (/) 来实现协议分层的叠加,这种设计巧妙地映射了数据包的层级结构,极大地简化了数据包的构造过程。

例如,一个发往 192.168.1.1 的 80 端口的 TCP SYN 包可以这样简洁地表示:

1
2
3
from scapy.all import *

packet = Ether() / IP(dst="192.168.1.1") / TCP(dport=80, flags="S")

更多用法可参考 Scapy 官方文档。 关于 Pcap 文件格式的底层分析,可参阅往期文章《Libpcap 格式 Pcap 包分析》。

五、使用 Scapy 构造 HTTP 1.* 的 Pcap 包

本节将详细讲解如何构造 HTTP 1.* Pcap 包。如前所述,使用 Scapy 构造 HTTP 包,就需要我们手动处理从应用层到数据链路层的每一层协议:

  • HTTP (应用层):自定义 HTTP 协议相关内容,如请求头、请求体、响应头、响应体。
  • TCP (传输层):自定义 TCP 协议相关内容,如源端口和目的端口。
  • IP (网络层):自定义 IP 协议相关内容,如源 IP 和目的 IP。
  • Ether (数据链路层):自定义以太网协议相关内容,如源 MAC 和目的 MAC。

使用 Scapy,这个封装过程可以直观地表示为:packet = Ether() / IP() / TCP() / Raw()。其中 Raw() 层用于承载原始的 HTTP 报文字节流。

在构造过程中,我们不仅要定义每一层的数据,还要维护它们之间的关联和状态,确保生成的数据包是合法且有效的。关键点如下:

  1. 维护 HTTP 报文格式:确保 HTTP 报文格式的正确性,特别是需要动态计算的字段,例如 Content-Length(应为请求体/响应体的字节长度)。
  2. 数据分段:当 HTTP 数据超过 MSS 时,需要手动将其分割,并通过多个 TCP 包发送。本文的示例为简化起见,未展示分段过程。
  3. 字段一致性:确保不同层级间的字段保持一致。例如,HTTP 请求头 Host 字段中显式指定的端口号,应与 TCP 层的目的端口保持一致。
  4. 维护 TCP 状态:这是最关键的一步。我们需要精确地模拟 TCP 连接的生命周期,正确管理每个数据包的标志位(S, A, P, F)以及 seq 和 ack 号。(若 seq/ack 计算错误,Wireshark 等工具会将数据包标记为错误)。

总而言之,构造 HTTP 1.* Pcap 的本质就是用 Scapy 模拟一次有状态的 TCP 完整通信。由于并非真实的网络请求,我们需要自行维护 TCP 连接的状态,特别是 TCP 头部中的标志位 (Flags) 与序列号 (seq/ack)。

使用 Scapy 构造上一章节中举例的 HTTP 连接:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
from scapy.utils import wrpcap
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, TCP
from scapy.layers.http import Raw

def gen_http_pcap():
    """生成 HTTP Pcap"""
    sip = "10.67.0.63"
    dip = "10.67.0.1"
    sport = 8080
    dport = 80
    smac = "fe:2d:90:f2:11:fe"
    dmac = "12:dd:2c:21:a2:bf"
    request = "GET / HTTP/1.1\r\n" "Host: example.com\r\n" "\r\n"
    response = "HTTP/1.1 200 OK\r\n" "Content-Length: 5\r\n" "Content-Type: text/plain\r\n" "\r\n" "Hello"
    seq_x = 0
    seq_y = 0

    # 通用的 IP 层包
    sip_packet = Ether(src=smac, dst=dmac) / IP(src=sip, dst=dip)
    dip_packet = Ether(src=dmac, dst=smac) / IP(src=dip, dst=sip)

    client_port_pair = {"sport": sport, "dport": dport}
    server_port_pair = {"sport": dport, "dport": sport}

    # 三次握手: SYN, SYN-ACK, ACK
    syn_packet = sip_packet / TCP(**client_port_pair, flags="S", seq=seq_x)
    syn_ack_packet = dip_packet / TCP(**server_port_pair, flags="SA", seq=seq_y, ack=syn_packet[TCP].seq + 1)
    ack_packet = sip_packet / TCP(
        **client_port_pair, flags="A", seq=syn_ack_packet[TCP].ack, ack=syn_ack_packet[TCP].seq + 1
    )

    # HTTP 请求
    request_packet = (
        sip_packet
        / TCP(
            **client_port_pair,
            flags="PA",
            seq=syn_ack_packet[TCP].ack,
            ack=syn_ack_packet[TCP].seq + 1,
        )
        / Raw(request.encode())
    )
    dst_ack_packet = dip_packet / TCP(
        **server_port_pair,
        flags="A",
        seq=request_packet[TCP].ack,
        ack=request_packet[TCP].seq + len(request_packet[Raw].load),
    )
    # HTTP 响应
    response_packet = (
        dip_packet
        / TCP(
            **server_port_pair,
            flags="PA",
            seq=request_packet[TCP].ack,
            ack=request_packet[TCP].seq + len(request_packet[Raw].load),
        )
        / Raw(response.encode())
    )
    src_ack_packet = sip_packet / TCP(
        **client_port_pair,
        flags="A",
        seq=response_packet[TCP].ack,
        ack=response_packet[TCP].seq + len(response_packet[Raw].load),
    )

    # 四次挥手
    fin_packet = sip_packet / TCP(
        **client_port_pair, flags="FA", seq=src_ack_packet[TCP].seq, ack=src_ack_packet[TCP].ack
    )
    ack_packet_close = dip_packet / TCP(
        **server_port_pair, flags="A", seq=fin_packet[TCP].ack, ack=fin_packet[TCP].seq + 1
    )
    ack_packet_close2 = dip_packet / TCP(
        **server_port_pair, flags="FA", seq=fin_packet[TCP].ack, ack=fin_packet[TCP].seq + 1
    )
    fin_packet_ack = sip_packet / TCP(
        **client_port_pair, flags="A", seq=ack_packet_close2[TCP].ack, ack=ack_packet_close2[TCP].seq + 1
    )

    # 按顺序组合数据包
    http_traffic = [
        # 三次握手
        syn_packet,
        syn_ack_packet,
        ack_packet,
        # 数据交换
        request_packet,
        dst_ack_packet,
        response_packet,
        src_ack_packet,
        # 四次挥手
        fin_packet,
        ack_packet_close,
        ack_packet_close2,
        fin_packet_ack,
    ]

    wrpcap(f"http.pcap", http_traffic)

if __name__ == "__main__":
    gen_http_pcap()

使用 Wireshark 查看生成的数据包

HTTP1.* 样例 Pcap

六、工具的封装和使用

将上述代码封装为生成 HTTP1.* Pcap 的工具脚本,整个过程其实可以抽象为两个部分

  1. 封装生成 HTTP1.* 请求和响应的逻辑
  2. 封装模拟 TCP 传输数据生成相关网络包的逻辑

这一部分代码就不再展开介绍了,完整工具代码见 Github Gist,其实就是对上一章节中的样例代码进一步封装抽象。

下面简单介绍一下上述提供的工具脚本的用法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import random

from scapy.utils import wrpcap

from gen_http_pcap import gen_http_request, gen_http_response, gen_http_pcap

dip, dport = "10.67.2.42", 80
sip, sport = "10.67.0.63", 9000


def gen_vul_800001():
    """
    desc: 测试弱密码
    event: 800001
    """
    # 模拟登录
    url = "/auth/login/"
    req_body = {"username": "admin", "password": "qaz123456"}
    res_header = {"Set-Cookie": "sessionid=94hi2otc9ykaoufja1ailul871tf4ha0; "
                                "expires=Fri, 19 Sep 2025 04:40:28 GMT; HttpOnly; Max-Age=3600; Path=/; SameSite=Lax"}
    res_body = {"status": "200", "message": "登陆成功", "data": None}
    request = gen_http_request(dip, dport, url, body=req_body, method="POST")
    response = gen_http_response(header=res_header, body=res_body)

    # 模拟获取数据
    url1 = "/user/admin/"
    req_head1 = {"Cookie": "sessionid=94hi2otc9ykaoufja1ailul871tf4ha0"}
    res_body1 = {"status": "200", "message": "获取用户信息成功", "data": {"username": "admin", "role": "admin"}}
    request1 = gen_http_request(dip, dport, url1, header=req_head1)
    response1 = gen_http_response(body=res_body1)

    return gen_http_pcap(src_ip=sip,
                         src_port=random.randint(20000, 50000),
                         dst_ip=dip,
                         dst_port=dport,
                         request=[request, request1],
                         response=[response, response1])


if __name__ == '__main__':
    packets = gen_vul_800001()
    wrpcap(f"800001.pcap", packets)

使用 Wireshark 查看生成的数据包

弱密码登录 Pcap

使用 Wireshark 追踪 HTTP 流

七、补充说明

  1. 真实性与局限性:需要明确的是,手动构造 Pcap 并不能完全替代在真实环境中抓包。因为我们构造的数据是预设的,可能缺少真实网络环境中的一些细微特征(如 TCP 选项、时间戳、重传等)。但对于功能验证和规则测试等场景,这种方法的效率和灵活性优势巨大。当然,如果我们能确保输入数据与真实流量一致,那么二者便没有本质区别。
  2. 构造更复杂的协议:掌握了 Scapy 的基本用法后,只要深入理解协议的底层细节,我们就能模拟任何网络协议。无论是 HTTP/2、HTTP/3 还是 TLS,都可以通过 Scapy 构造。当然,这些协议本身的状态机和加密机制更为复杂,对构造逻辑的要求也更高。感兴趣的读者可以自行尝试。当然,从根本上说,如果已从底层掌握了网络细节,即便逐个字节拼接,也能还原出完整的协议。

八、总结

本文详细介绍了一种使用 Python 和 Scapy 库手动构造 HTTP/1.* Pcap 的方法。文章从测试需求出发,阐述了手动构造 Pcap 相比传统抓包在灵活性和效率上的显著优势。本文的核心在于阐明:构造 HTTP/1.* Pcap 的本质,就是模拟一次完整的、有状态的 TCP 通信。通过 Scapy 工具,我们可以利用其强大的协议分层能力来实现这一过程。

九、参考与延伸

Licensed under CC BY-NC-SA 4.0
comments powered by Disqus
Built with Hugo
Theme Stack designed by Jimmy