Source code for pcapkit.foundation.engines.scapy

# -*- coding: utf-8 -*-
"""Scapy Support
===================

.. module:: pcapkit.foundation.engines.scapy

This module contains the implementation for `Scapy`_ engine
support, as is used by :class:`pcapkit.foundation.extraction.Extractor`.

.. _Scapy: https://scapy.net

"""
from typing import TYPE_CHECKING, cast

from pcapkit.foundation.engines.engine import EngineBase as Engine
from pcapkit.utilities.exceptions import stacklevel
from pcapkit.utilities.warnings import AttributeWarning, warn

__all__ = ['Scapy']

if TYPE_CHECKING:
    from typing import Iterator

    from scapy.packet import Packet as ScapyPacket

    from pcapkit.foundation.extraction import Extractor


[docs] class Scapy(Engine['ScapyPacket']): """Scapy engine support. Args: extractor: :class:`~pcapkit.foundation.extraction.Extractor` instance. """ if TYPE_CHECKING: import scapy.sendrecv #: Engine extraction package. _expkg: 'scapy.sendrecv' #: Engine extraction temporary storage. _extmp: 'Iterator[ScapyPacket]' ########################################################################## # Defaults. ########################################################################## #: Engine name. __engine_name__ = 'Scapy' #: Engine module name. __engine_module__ = 'scapy' ########################################################################## # Data models. ########################################################################## def __init__(self, extractor: 'Extractor') -> 'None': from scapy import sendrecv as scapy # isort:skip self._expkg = scapy self._extmp = cast('Iterator[ScapyPacket]', None) super().__init__(extractor) ########################################################################## # Methods. ##########################################################################
[docs] def run(self) -> 'None': """Call :func:`scapy.sendrecv.sniff` to extract PCAP files. This method assigns :attr:`self._expkg <Scapy._expkg>` as :mod:`scapy.sendrecv` and :attr:`self._extmp <Scapy._extmp>` as an iterator from :func:`scapy.sendrecv.sniff`. Warns: AttributeWarning: If :attr:`self.extractor._exlyr <pcapkit.foundation.extraction.Extractor._exlyr>` and/or :attr:`self.extractor._exptl <pcapkit.foundation.extraction.Extractor._exptl>` is provided as the Scapy engine currently does not support such operations. """ ext = self._extractor if ext._exlyr != 'none' or ext._exptl != 'null': warn("'Extractor(engine=scapy)' does not support protocol and layer threshold; " f"'layer={ext._exlyr}' and 'protocol={ext._exptl}' ignored", AttributeWarning, stacklevel=stacklevel()) # setup verbose handler if ext._flag_v: from pcapkit.toolkit.scapy import packet2chain # isort:skip ext._vfunc = lambda e, f: print( f'Frame {e._frnum:>3d}: {packet2chain(f)}' # pylint: disable=protected-access ) # pylint: disable=logging-fstring-interpolation # extract & analyse file self._extmp = iter(self._expkg.sniff(offline=ext._ifnm))
[docs] def read_frame(self) -> 'ScapyPacket': """Read frames with Scapy engine. Returns: Parsed frame instance. See Also: Please refer to :meth:`PCAP.read_frame <pcapkit.foundation.engines.pcap.PCAP.read_frame>` for more operational information. """ from pcapkit.toolkit.scapy import (ipv4_reassembly, ipv6_reassembly, packet2dict, tcp_reassembly, tcp_traceflow) ext = self._extractor # fetch Scapy packet packet = next(self._extmp) # verbose output ext._frnum += 1 ext._vfunc(ext, packet) # write plist frnum = f'Frame {ext._frnum}' if not ext._flag_q: info = packet2dict(packet) if ext._flag_f: ofile = ext._ofile(f'{ext._ofnm}/{frnum}.{ext._fext}') ofile(info, name=frnum) else: ext._ofile(info, name=frnum) # record fragments if ext._flag_r: if ext._ipv4: data_ipv4 = ipv4_reassembly(packet, count=ext._frnum) if data_ipv4 is not None: ext._reasm.ipv4(data_ipv4) if ext._ipv6: data_ipv6 = ipv6_reassembly(packet, count=ext._frnum) if data_ipv6 is not None: ext._reasm.ipv6(data_ipv6) if ext._tcp: data_tcp = tcp_reassembly(packet, count=ext._frnum) if data_tcp is not None: ext._reasm.tcp(data_tcp) # trace flows if ext._flag_t: if ext._tcp: data_tf_tcp = tcp_traceflow(packet, count=ext._frnum) if data_tf_tcp is not None: ext._trace.tcp(data_tf_tcp) # record frames if ext._flag_d: # setattr(packet, 'packet2dict', packet2dict) # setattr(packet, 'packet2chain', packet2chain) ext._frame.append(packet) # return frame record return packet