Source code for pcapkit.foundation.engines.dpkt

# -*- coding: utf-8 -*-
"""DPKT Support
==================

.. module:: pcapkit.foundation.engines.dpkt

This module contains the implementation for `DPKT`_ engine
support, as is used by :class:`pcapkit.foundation.extraction.Extractor`.

.. _DPKT: https://dpkt.readthedocs.io

"""
from typing import TYPE_CHECKING, cast

from pcapkit.const.reg.linktype import LinkType as Enum_LinkType
from pcapkit.foundation.engines.engine import EngineBase as Engine
from pcapkit.utilities.exceptions import FormatError, stacklevel
from pcapkit.utilities.warnings import AttributeWarning, DPKTWarning, warn

__all__ = ['DPKT']

if TYPE_CHECKING:
    from typing import Optional, Type, Union

    from dpkt.dpkt import Packet as DPKTPacket
    from dpkt.pcap import Reader as PCAPReader
    from dpkt.pcapng import Reader as PCAPNGReader

    from pcapkit.foundation.extraction import Extractor

    Reader = Union[PCAPReader, PCAPNGReader]


[docs] class DPKT(Engine['DPKTPacket']): """DPKT engine support. Args: extractor: :class:`~pcapkit.foundation.extraction.Extractor` instance. """ if TYPE_CHECKING: import dpkt #: Engine extraction package. _expkg: 'dpkt' #: Engine extraction temporary storage. _extmp: 'Reader' ########################################################################## # Defaults. ########################################################################## #: Engine name. __engine_name__ = 'DPKT' #: Engine module name. __engine_module__ = 'dpkt' ########################################################################## # Data models. ########################################################################## def __init__(self, extractor: 'Extractor') -> 'None': import dpkt # isort:skip self._expkg = dpkt self._extmp = cast('Reader', None) super().__init__(extractor) ########################################################################## # Methods. ##########################################################################
[docs] def run(self) -> 'None': """Call :class:`dpkt.pcap.Reader` to extract PCAP files. This method assigns :attr:`self._expkg <DPKT._expkg>` as :mod:`dpkt` and :attr:`self._extmp <DPKT._extmp>` as an iterator from :class:`dpkt.pcap.Reader`. Warns: AttributeWarning: If :attr:`self.extractor._exlyr <pcapkit.foundation.extraction.Extractor._exlyr>` and/or :attr:`self.extractor._exptl <pcapkit.foundation.extraction.Extractor._exptl>` is provided as the DPKT engine currently does not support such operations. Raises: FormatError: If the file format is not supported, i.e., not a PCAP and/or PCAP-NG file. """ from pcapkit.foundation.engines.pcap import PCAP from pcapkit.foundation.engines.pcapng import PCAPNG ext = self._extractor dpkt = self._expkg if ext._exlyr != 'none' or ext._exptl != 'null': warn("'Extractor(engine=dpkt)' does not support protocol and layer threshold; " f"'layer={ext._exlyr}' and 'protocol={ext._exptl}' ignored", AttributeWarning, stacklevel=stacklevel()) # setup verbose handler if ext._flag_v: from pcapkit.toolkit.dpkt import packet2chain # isort:skip ext._vfunc = lambda e, f: print( f'Frame {e._frnum:>3d}: {packet2chain(f)}' # pylint: disable=protected-access ) # pylint: disable=logging-fstring-interpolation if ext.magic_number in PCAP.MAGIC_NUMBER: reader = dpkt.pcap.Reader(ext._ifile) elif ext.magic_number in PCAPNG.MAGIC_NUMBER: reader = dpkt.pcapng.Reader(ext._ifile) else: raise FormatError(f'unsupported file format: {ext.magic_number!r}') # extract & analyse file self._extmp = reader
[docs] def read_frame(self) -> 'DPKTPacket': """Read frames with DPKT engine. Returns: Parsed frame instance. See Also: Please refer to :meth:`PCAP.read_frame <pcapkit.foundation.engines.pcap.PCAP.read_frame>` for more operational information. """ from pcapkit.toolkit.dpkt import (ipv4_reassembly, ipv6_reassembly, packet2dict, tcp_reassembly, tcp_traceflow) ext = self._extractor reader = self._extmp linktype = Enum_LinkType.get(reader.datalink()) # fetch DPKT packet timestamp, pkt = cast('tuple[float, bytes]', next(reader)) protocol = self._get_protocol(linktype) packet = protocol(pkt) # type: DPKTPacket # verbose output ext._frnum += 1 ext._vfunc(ext, packet) # write plist frnum = f'Frame {ext._frnum}' if not ext._flag_q: info = packet2dict(packet, timestamp, data_link=linktype) if ext._flag_f: ofile = ext._ofile(f'{ext._ofnm}/{frnum}.{ext._fext}') ofile(info, name=frnum) else: ext._ofile(info, name=frnum) ofile = ext._ofile ext._offmt = ofile.kind # record fragments if ext._flag_r: if ext._ipv4: data_ipv4 = ipv4_reassembly(packet, count=ext._frnum) if data_ipv4 is not None: ext._reasm.ipv4(data_ipv4) if ext._ipv6: data_ipv6 = ipv6_reassembly(packet, count=ext._frnum) if data_ipv6 is not None: ext._reasm.ipv6(data_ipv6) if ext._tcp: data_tcp = tcp_reassembly(packet, count=ext._frnum) if data_tcp is not None: ext._reasm.tcp(data_tcp) # trace flows if ext._flag_t: if ext._tcp: data_tf_tcp = tcp_traceflow(packet, timestamp, data_link=linktype, count=ext._frnum) if data_tf_tcp is not None: ext._trace.tcp(data_tf_tcp) # record frames if ext._flag_d: # setattr(packet, 'packet2dict', packet2dict) # setattr(packet, 'packet2chain', packet2chain) ext._frame.append(packet) # return frame record return packet
########################################################################## # Utilities. ########################################################################## def _get_protocol(self, linktype: 'Optional[Enum_LinkType]' = None) -> 'Type[DPKTPacket]': """Returns the protocol for parsing the current packet. Args: linktype: Link type code. """ dpkt = self._expkg reader = self._extmp if linktype is None: linktype = Enum_LinkType.get(reader.datalink()) if linktype == Enum_LinkType.ETHERNET: pkg = dpkt.ethernet.Ethernet elif linktype.value == Enum_LinkType.IPV4: pkg = dpkt.ip.IP elif linktype.value == Enum_LinkType.IPV6: pkg = dpkt.ip6.IP6 else: warn('unrecognised link layer protocol; all analysis functions ignored', DPKTWarning, stacklevel=stacklevel()) class RawPacket(dpkt.dpkt.Packet): # type: ignore[name-defined] """Raw packet.""" def __len__(ext) -> 'int': return len(ext.data) def __bytes__(ext) -> 'bytes': return ext.data def unpack(ext, buf: 'bytes') -> 'None': ext.data = buf pkg = RawPacket return pkg