Source code for pcapkit.toolkit.dpkt

# -*- coding: utf-8 -*-
"""DPKT Tools
================

.. module:: pcapkit.toolkit.dpkt

:mod:`pcapkit.toolkit.dpkt` contains all you need for
handy usage of :mod:`pcapkit` with the `DPKT`_ engine. All
reforming functions return :data:`None` to indicate that the
packet is not usable for their caller.

.. _DPKT: https://dpkt.readthedocs.io

"""
import ipaddress
from typing import TYPE_CHECKING, cast

from pcapkit.const.reg.transtype import TransType as Enum_TransType
from pcapkit.foundation.reassembly.data.ip import Packet as IP_Packet
from pcapkit.foundation.reassembly.data.tcp import Packet as TCP_Packet
from pcapkit.foundation.traceflow.data.tcp import Packet as TF_TCP_Packet

if TYPE_CHECKING:
    from ipaddress import IPv4Address, IPv6Address
    from typing import Any, Optional

    from dpkt.dpkt import Packet
    from dpkt.ip import IP
    from dpkt.ip6 import IP6, IP6FragmentHeader
    from dpkt.tcp import TCP

    from pcapkit.const.reg.linktype import LinkType as Enum_LinkType

__all__ = [
    'ipv6_hdr_len', 'packet2chain', 'packet2dict',
    'ipv4_reassembly', 'ipv6_reassembly', 'tcp_reassembly', 'tcp_traceflow'
]


def ipv6_hdr_len(ipv6: 'IP6') -> 'int':
    """Compute the size of the headers that precede the IPv6 Fragment header.

    Args:
        ipv6: DPKT IPv6 packet.

    Returns:
        Length of headers before the IPv6 Fragment header
        :class:`dpkt.ip6.IP6FragmentHeader` (:rfc:`2460#section-4.5`).

    The result is ``ipv6.__hdr_len__`` plus the ``length`` of every one of the
    following extension headers that is registered in ``ipv6.extension_hdrs``
    (see :rfc:`2460#section-4.1`):

    * Hop-by-Hop Options header :class:`dpkt.ip6.IP6HopOptsHeader`
      (:rfc:`2460#section-4.3`), protocol number ``0``;
    * Destination Options header :class:`dpkt.ip6.IP6DstOptHeader`
      (:rfc:`2460#section-4.6`), protocol number ``60``;
    * Routing header :class:`dpkt.ip6.IP6RoutingHeader`
      (:rfc:`2460#section-4.4`), protocol number ``43``.

    """
    # IP6HopOptsHeader / IP6DstOptHeader / IP6RoutingHeader
    leading_codes = (0, 60, 43)

    base_len = ipv6.__hdr_len__
    candidates = (ipv6.extension_hdrs.get(code) for code in leading_codes)

    # headers that are absent from the packet contribute nothing
    return base_len + sum(hdr.length for hdr in candidates if hdr is not None)
def packet2chain(packet: 'Packet') -> 'str':
    """Fetch DPKT packet protocol chain.

    The chain is built by following the ``data`` attribute from ``packet``
    downwards and recording the class name of every layer until ``data`` is
    a :obj:`bytes` object.

    Args:
        packet: DPKT packet.

    Returns:
        Colon (``:``) separated list of protocol chain.

    """
    layer = packet  # type: Any
    names = []  # type: list[str]

    while True:
        names.append(type(layer).__name__)
        layer = layer.data
        if isinstance(layer, bytes):
            # reached the raw payload, no more layers to record
            break

    return ':'.join(names)
def packet2dict(packet: 'Packet', timestamp: 'float', *,
                data_link: 'Enum_LinkType') -> 'dict[str, Any]':
    """Convert DPKT packet into :obj:`dict`.

    Args:
        packet: DPKT packet.
        timestamp: Timestamp of packet.
        data_link: Data link type.

    Returns:
        Dict[str, Any]: A :obj:`dict` mapping of packet data, with three keys:
        ``'timestamp'`` (the given ``timestamp``), ``'packet'`` (the result of
        ``packet.pack()``) and ``data_link.name`` (nested mapping of the header
        fields of each layer).

    """
    def layer2dict(layer: 'Packet') -> 'dict[str, Any]':
        """Map header fields of ``layer``, nesting non-bytes payloads."""
        # fields missing on the instance are recorded as ``None``
        fields = {
            name: getattr(layer, name, None) for name in layer.__hdr_fields__
        }  # type: dict[str, Any]

        upper = layer.data
        if isinstance(upper, bytes):
            return fields

        # upper layer is stored under its class name
        fields[type(upper).__name__] = layer2dict(upper)
        return fields

    result = {}  # type: dict[str, Any]
    result['timestamp'] = timestamp
    result['packet'] = packet.pack()

    link_name = data_link.name
    result[link_name] = layer2dict(packet)
    return result
def ipv4_reassembly(packet: 'Packet', *, count: 'int' = -1) -> 'IP_Packet[IPv4Address] | None':
    """Make data for IPv4 reassembly.

    Args:
        packet: DPKT packet.
        count: Packet index. If not provided, default to ``-1``.

    Returns:
        Data for IPv4 reassembly.

        * If the ``packet`` can be used for IPv4 reassembly. A packet can be reassembled
          if it contains IPv4 layer (:class:`dpkt.ip.IP`) and the **DF** (:attr:`dpkt.ip.IP.df`)
          flag is :data:`False`.
        * If the ``packet`` can be reassembled, then the data for IPv4 reassembly
          (:term:`reasm.ipv4.packet`) will be returned; otherwise, returns :data:`None`.

    See Also:
        :class:`pcapkit.foundation.reassembly.ipv4.IPv4`

    """
    ipv4 = getattr(packet, 'ip', None)  # type: Optional[IP]

    # dismiss packets without an IPv4 layer, and those flagged "don't fragment"
    if ipv4 is None or ipv4.df:
        return None

    buffer_id = (
        cast('IPv4Address', ipaddress.ip_address(ipv4.src)),  # source IP address
        cast('IPv4Address', ipaddress.ip_address(ipv4.dst)),  # destination IP address
        ipv4.id,                                              # identification
        Enum_TransType.get(ipv4.p).name,                      # payload protocol type
    )

    # NOTE(review): ``off`` may be the raw flags+offset word in dpkt rather than
    # the masked fragment offset -- confirm against the dpkt version in use and
    # against what the reassembler expects.
    frag_offset = ipv4.off

    # NOTE(review): ``__hdr_len__`` is used as the header boundary; if dpkt keeps
    # IP options outside of it they would end up in ``payload`` -- confirm.
    header_len = ipv4.__hdr_len__

    return IP_Packet(
        bufid=buffer_id,
        num=count,                                     # original packet range number
        fo=frag_offset,                                # fragment offset
        ihl=header_len,                                # internet header length
        mf=bool(ipv4.mf),                              # more fragment flag
        tl=ipv4.len,                                   # total length, header includes
        header=ipv4.pack()[:header_len],               # raw bytes type header
        payload=bytearray(ipv4.pack()[header_len:]),   # raw bytearray type payload
    )
def ipv6_reassembly(packet: 'Packet', *, count: 'int' = -1) -> 'IP_Packet[IPv6Address] | None':
    """Make data for IPv6 reassembly.

    Args:
        packet: DPKT packet.
        count: Packet index. If not provided, default to ``-1``.

    Returns:
        Data for IPv6 reassembly.

        * If the ``packet`` can be used for IPv6 reassembly. A packet can be reassembled
          if it contains IPv6 layer (:class:`dpkt.ip6.IP6`) and IPv6 Fragment header
          (:rfc:`2460#section-4.5`, i.e., :class:`dpkt.ip6.IP6FragmentHeader`).
        * If the ``packet`` can be reassembled, then the data for IPv6 reassembly
          (:term:`reasm.ipv6.packet`) will be returned; otherwise, returns :data:`None`.

    See Also:
        :class:`pcapkit.foundation.reassembly.ipv6.IPv6`

    """
    ipv6 = getattr(packet, 'ip6', None)  # type: Optional[IP6]
    if ipv6 is None:
        return None

    # protocol number 44 identifies the IPv6 Fragment header
    ipv6_frag = ipv6.extension_hdrs.get(44)  # type: Optional[IP6FragmentHeader]
    if ipv6_frag is None:  # dismiss not fragmented packet
        return None

    hdr_len = ipv6_hdr_len(ipv6)

    # the payload starts right after the Fragment header; the header object
    # itself cannot be added to an integer, so its length is used instead
    payload_start = hdr_len + len(ipv6_frag)

    # pack once and slice twice
    raw = ipv6.pack()

    return IP_Packet(
        bufid=(
            cast('IPv6Address', ipaddress.ip_address(ipv6.src)),  # source IP address
            cast('IPv6Address', ipaddress.ip_address(ipv6.dst)),  # destination IP address
            ipv6.flow,                                            # label
            # next header field in IPv6 Fragment Header (``nxt`` in dpkt)
            Enum_TransType.get(ipv6_frag.nxt).name,
        ),
        num=count,                             # original packet range number
        # NOTE(review): ``frag_off`` is the value of the header's offset field,
        # presumably in 8-octet units -- confirm the unit the reassembler expects.
        fo=ipv6_frag.frag_off,                 # fragment offset
        ihl=hdr_len,                           # header length, only headers before IPv6-Frag
        mf=bool(ipv6_frag.m_flag),             # more fragment flag
        tl=len(ipv6),                          # total length, header includes
        header=raw[:hdr_len],                  # raw bytes type header before IPv6-Frag
        payload=bytearray(raw[payload_start:]),  # raw bytearray type payload after IPv6-Frag
    )
def tcp_reassembly(packet: 'Packet', *, count: 'int' = -1) -> 'TCP_Packet | None':
    """Make data for TCP reassembly.

    Args:
        packet: DPKT packet.
        count: Packet index. If not provided, default to ``-1``.

    Returns:
        Data for TCP reassembly.

        * If the ``packet`` can be used for TCP reassembly. A packet can be reassembled
          if it contains TCP layer (:class:`dpkt.tcp.TCP`).
        * If the ``packet`` can be reassembled, then the data for TCP reassembly
          (:term:`reasm.tcp.packet`) will be returned; otherwise, returns :data:`None`.

    See Also:
        :class:`pcapkit.foundation.reassembly.tcp.TCP`

    """
    if hasattr(packet, 'ip'):
        ip = cast('IP', packet.ip)
    elif hasattr(packet, 'ip6'):
        ip = cast('IP6', packet.ip6)
    else:
        return None

    tcp = getattr(ip, 'tcp', None)  # type: Optional[TCP]
    if tcp is None:
        return None

    # Test the control bits with masks; indexing into a zero-filled binary
    # string gives wrong answers as soon as ``flags`` is wider than 8 bits.
    flags = tcp.flags

    raw = tcp.pack()
    raw_len = len(tcp.data)  # payload length, header excludes

    # Split at the data boundary so that ``payload`` always has exactly
    # ``raw_len`` bytes and any TCP options stay with the header.
    hdr_len = len(raw) - raw_len

    return TCP_Packet(
        bufid=(
            ipaddress.ip_address(ip.src),  # source IP address
            tcp.sport,                     # source port
            ipaddress.ip_address(ip.dst),  # destination IP address
            tcp.dport,                     # destination port
        ),
        num=count,                          # original packet range number
        ack=tcp.ack,                        # acknowledgement
        dsn=tcp.seq,                        # data sequence number
        rst=bool(flags & 0x04),             # reset connection flag
        syn=bool(flags & 0x02),             # synchronise flag
        fin=bool(flags & 0x01),             # finish flag
        header=raw[:hdr_len],               # raw bytes type header
        payload=bytearray(raw[hdr_len:]),   # raw bytearray type payload
        first=tcp.seq,                      # this sequence number
        last=tcp.seq + raw_len,             # next (wanted) sequence number
        len=raw_len,                        # payload length, header excludes
    )
def tcp_traceflow(packet: 'Packet', timestamp: 'float', *,
                  data_link: 'Enum_LinkType', count: 'int' = -1) -> 'TF_TCP_Packet | None':
    """Trace packet flow for TCP.

    Args:
        packet: DPKT packet.
        timestamp: Timestamp of the packet.
        data_link: Data link layer protocol (from global header).
        count: Packet index. If not provided, default to ``-1``.

    Returns:
        Data for TCP flow tracing.

        * If the ``packet`` can be used for TCP flow tracing. A packet can be traced
          if it contains TCP layer (:class:`dpkt.tcp.TCP`).
        * If the ``packet`` can be traced, then the data for TCP flow tracing
          (:term:`trace.tcp.packet`) will be returned; otherwise, returns :data:`None`.

    See Also:
        :class:`pcapkit.foundation.traceflow.tcp.TCP`

    """
    if hasattr(packet, 'ip'):
        ip = cast('IP', packet.ip)
    elif hasattr(packet, 'ip6'):
        ip = cast('IP6', packet.ip6)
    else:
        return None

    tcp = getattr(ip, 'tcp', None)  # type: Optional[TCP]
    if tcp is None:
        return None

    # Test the control bits with masks; indexing into a zero-filled binary
    # string gives wrong answers as soon as ``flags`` is wider than 8 bits.
    flags = tcp.flags

    return TF_TCP_Packet(  # type: ignore[type-var]
        protocol=data_link,                                         # data link type from global header
        index=count,                                                # frame number
        frame=packet2dict(packet, timestamp, data_link=data_link),  # extracted packet
        syn=bool(flags & 0x02),                                     # TCP synchronise (SYN) flag
        fin=bool(flags & 0x01),                                     # TCP finish (FIN) flag
        src=ipaddress.ip_address(ip.src),                           # source IP
        dst=ipaddress.ip_address(ip.dst),                           # destination IP
        srcport=tcp.sport,                                          # TCP source port
        dstport=tcp.dport,                                          # TCP destination port
        timestamp=timestamp,                                        # timestamp
    )