Source code for pcapkit.foundation.extraction

# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel,fixme
# mypy: disable-error-code=dict-item
"""Extractor for PCAP Files
==============================

.. module:: pcapkit.foundation.extraction

:mod:`pcapkit.foundation.extraction` contains
:class:`~pcapkit.foundation.extraction.Extractor` only,
which synthesises file I/O and protocol analysis,
coordinates information exchange in all network layers,
and extracts parameters from a PCAP file.

"""
import collections
import importlib
import io
import os
import sys
from typing import TYPE_CHECKING, Generic, TypeVar, cast

from dictdumper.dumper import Dumper

from pcapkit.corekit.io import SeekableReader
from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.dumpkit.common import make_dumper
from pcapkit.foundation.engines.engine import Engine
from pcapkit.foundation.engines.pcap import PCAP as PCAP_Engine
from pcapkit.foundation.engines.pcapng import PCAPNG as PCAPNG_Engine
from pcapkit.foundation.reassembly import ReassemblyManager
from pcapkit.foundation.reassembly.data import ReassemblyData
from pcapkit.foundation.reassembly.reassembly import Reassembly
from pcapkit.foundation.traceflow import TraceFlowManager
from pcapkit.foundation.traceflow.data import TraceFlowData
from pcapkit.foundation.traceflow.traceflow import TraceFlow
from pcapkit.utilities.exceptions import (CallableError, FileNotFound, FormatError, IterableError,
                                          RegistryError, UnsupportedCall, stacklevel)
from pcapkit.utilities.logging import logger
from pcapkit.utilities.warnings import (EngineWarning, ExtractionWarning, FormatWarning,
                                        RegistryWarning, warn)

if TYPE_CHECKING:
    # Everything in this branch is only evaluated by static type checkers;
    # none of these names exist at runtime, which is why every annotation in
    # this module that uses them is written as a string.
    from io import BufferedReader
    from types import ModuleType, TracebackType
    from typing import IO, Any, Callable, DefaultDict, Optional, Type, Union

    from dpkt.dpkt import Packet as DPKTPacket
    from pyshark.packet.packet import Packet as PySharkPacket
    from scapy.packet import Packet as ScapyPacket
    from typing_extensions import Literal

    from pcapkit.foundation.reassembly.ipv4 import IPv4 as IPv4_Reassembly
    from pcapkit.foundation.reassembly.ipv6 import IPv6 as IPv6_Reassembly
    from pcapkit.foundation.reassembly.tcp import TCP as TCP_Reassembly
    from pcapkit.foundation.traceflow.tcp import TCP as TCP_TraceFlow
    from pcapkit.protocols.misc.pcap.frame import Frame
    from pcapkit.protocols.misc.pcapng import PCAPNG
    from pcapkit.protocols.protocol import ProtocolBase as Protocol

    # Output format names (used for the ``format`` / ``trace_format`` arguments).
    Formats = Literal['pcap', 'json', 'tree', 'plist']
    # Extraction engine names (used for the ``engine`` argument).
    Engines = Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark']
    # Layer names up to which extraction is performed (``layer`` argument).
    Layers = Literal['link', 'internet', 'transport', 'application', 'none']

    # A stored frame record; the concrete type depends on the selected engine.
    Packet = Union[Frame, PCAPNG, ScapyPacket, DPKTPacket, PySharkPacket]

    # Ways to designate the protocol up to which extraction is performed.
    Protocols = Union[str, Protocol, Type[Protocol]]
    # Verbose callback: receives the extractor and the frame just parsed.
    VerboseHandler = Callable[['Extractor', Packet], Any]

# Public API of this module.
__all__ = ['Extractor']

# Type of the frame objects produced by the extraction engine; it
# parameterises both ``Extractor`` and its ``Engine`` instance.
_P = TypeVar('_P')


class Extractor(Generic[_P]):
    """Extractor for PCAP files.

    The class opens the input file, selects an extraction engine, optionally
    sets up reassembly and flow tracing, and drives the engine frame by frame.

    Notes:
        For supported engines, please refer to
        :meth:`~pcapkit.foundation.extraction.Extractor.run`.

    """

    if TYPE_CHECKING:
        #: Input file name.
        _ifnm: 'str'
        #: Output file name.
        _ofnm: 'Optional[str]'
        #: Output file extension.
        _fext: 'Optional[str]'

        #: Auto extract flag. It indicates if the extraction process should
        #: continue automatically until the EOF is reached.
        _flag_a: 'bool'
        #: Store data flag. It indicates if the extracted frames should be
        #: stored in memory.
        _flag_d: 'bool'
        #: EOF flag. It indicates if the EOF is reached.
        _flag_e: 'bool'
        #: Split file flag, i.e. dump each frame into different files.
        _flag_f: 'bool'
        #: No output file, i.e., no output file is to be generated.
        _flag_q: 'bool'
        #: Reassembly flag. It indicates if the reassembly is enabled.
        _flag_r: 'bool'
        #: Trace flag. It indicates if the flow tracing is enabled.
        _flag_t: 'bool'
        #: Verbose flag. This is used to determine if the verbose callback
        #: function should be called at each frame.
        _flag_v: 'bool'
        #: No EOF flag. It is useful when the input file is a live capture,
        #: as the extraction process will not stop until the user interrupt
        #: the process.
        _flag_n: 'bool'
        #: Input filename flag. It indicates if the input file is a file
        #: name or a binary IO object. For the latter, we should not close
        #: the file object after extraction.
        _flag_s: 'bool'

        #: Verbose callback function.
        #_vfunc: 'VerboseHandler'

        #: Frame number.
        _frnum: 'int'
        #: Frame records.
        _frame: 'list[Packet]'

        #: Frame record for reassembly.
        _reasm: 'ReassemblyManager'
        #: Frame record for flow tracing.
        _trace: 'TraceFlowManager'

        #: IPv4 flag. It indicates if the IPv4 reassembly and/or flow tracing
        #: is enabled.
        _ipv4: 'bool'
        #: IPv6 flag. It indicates if the IPv6 reassembly and/or flow tracing
        #: is enabled.
        _ipv6: 'bool'
        #: TCP flag. It indicates if the TCP reassembly and/or flow tracing
        #: is enabled.
        _tcp: 'bool'

        #: Extract til protocol.
        _exptl: 'Protocols'
        #: Extract til layer.
        _exlyr: 'Layers'
        #: Extraction engine name.
        _exnam: 'Engines'
        #: Extraction engine instance.
        _exeng: 'Engine[_P]'

        #: Input file object.
        _ifile: 'BufferedReader'
        #: Output file object.
        _ofile: 'Dumper | Type[Dumper]'

        #: Magic number.
        _magic: 'bytes'
        #: Output format.
        _offmt: 'Formats'

    #: List of potential PCAP file extensions.
    PCAP_EXT = ['.pcap', '.cap', '.pcapng']

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: Format dumper mapping for writing output files. The values should be a
    #: tuple representing the module name and class name, or a
    #: :class:`dictdumper.dumper.Dumper` subclass, and corresponding file extension.
    #: Unknown formats fall back to ``NotImplementedIO`` with no extension.
    __output__ = collections.defaultdict(
        lambda: (ModuleDescriptor('pcapkit.dumpkit', 'NotImplementedIO'), None),
        {
            'pcap': (ModuleDescriptor('pcapkit.dumpkit', 'PCAPIO'), '.pcap'),
            'cap': (ModuleDescriptor('pcapkit.dumpkit', 'PCAPIO'), '.pcap'),
            'plist': (ModuleDescriptor('dictdumper', 'PLIST'), '.plist'),
            'xml': (ModuleDescriptor('dictdumper', 'PLIST'), '.plist'),
            'json': (ModuleDescriptor('dictdumper', 'JSON'), '.json'),
            'tree': (ModuleDescriptor('dictdumper', 'Tree'), '.txt'),
            'text': (ModuleDescriptor('dictdumper', 'Text'), '.txt'),
            'txt': (ModuleDescriptor('dictdumper', 'Tree'), '.txt'),
        },
    )  # type: DefaultDict[str, tuple[ModuleDescriptor[Dumper] | Type[Dumper], str | None]]

    #: Engine mapping for extracting frames. The values should be a tuple representing
    #: the module name and class name, or an :class:`~pcapkit.foundation.engines.engine.Engine`
    #: subclass.
    __engine__ = {
        'scapy': ModuleDescriptor('pcapkit.foundation.engines.scapy', 'Scapy'),
        'dpkt': ModuleDescriptor('pcapkit.foundation.engines.dpkt', 'DPKT'),
        'pyshark': ModuleDescriptor('pcapkit.foundation.engines.pyshark', 'PyShark'),
    }  # type: dict[str, ModuleDescriptor[Engine] | Type[Engine]]

    #: Reassembly support mapping for extracting frames. The values should be a tuple
    #: representing the module name and class name, or a
    #: :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` subclass.
    __reassembly__ = {
        'ipv4': ModuleDescriptor('pcapkit.foundation.reassembly.ipv4', 'IPv4'),
        'ipv6': ModuleDescriptor('pcapkit.foundation.reassembly.ipv6', 'IPv6'),
        'tcp': ModuleDescriptor('pcapkit.foundation.reassembly.tcp', 'TCP'),
    }  # type: dict[str, ModuleDescriptor[Reassembly] | Type[Reassembly]]

    #: Flow tracing support mapping for extracting frames. The values should be a tuple
    #: representing the module name and class name, or a
    #: :class:`~pcapkit.foundation.traceflow.traceflow.TraceFlow` subclass.
    __traceflow__ = {
        'tcp': ModuleDescriptor('pcapkit.foundation.traceflow.tcp', 'TCP'),
    }  # type: dict[str, ModuleDescriptor[TraceFlow] | Type[TraceFlow]]

    ##########################################################################
    # Properties.
    ##########################################################################

    @property
    def length(self) -> 'int':
        """Frame number (of current extracted frame or all)."""
        return self._frnum

    @property
    def format(self) -> 'Formats':
        """Format of output file.

        Raises:
            UnsupportedCall: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
                is set as :data:`True`, as output is disabled by initialisation parameter.

        """
        if self._flag_q:
            raise UnsupportedCall("'Extractor(nofile=True)' object has no attribute 'format'")
        return self._offmt

    @property
    def input(self) -> 'str':
        """Name of input PCAP file."""
        return self._ifnm

    @property
    def output(self) -> 'str':
        """Name of output file.

        Raises:
            UnsupportedCall: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
                is set as :data:`True`, as output is disabled by initialisation parameter.

        """
        if self._flag_q:
            # The message names this property ('output'), not 'format'.
            raise UnsupportedCall("'Extractor(nofile=True)' object has no attribute 'output'")
        return cast('str', self._ofnm)

    @property
    def frame(self) -> 'tuple[Packet, ...]':
        """Extracted frames.

        Raises:
            UnsupportedCall: If :attr:`self._flag_d <pcapkit.foundation.extraction.Extractor._flag_d>`
                is :data:`False`, as storing frame data is disabled.

        """
        if self._flag_d:
            return tuple(self._frame)
        raise UnsupportedCall("'Extractor(store=False)' object has no attribute 'frame'")

    @property
    def reassembly(self) -> 'ReassemblyData':
        """Frame record for reassembly.

        * ``ipv4`` -- tuple of IPv4 payload fragment (:term:`reasm.ipv4.datagram`)
        * ``ipv6`` -- tuple of IPv6 payload fragment (:term:`reasm.ipv6.datagram`)
        * ``tcp`` -- tuple of TCP payload fragment (:term:`reasm.tcp.datagram`)

        Entries for protocols that were not enabled are :data:`None`.

        Raises:
            UnsupportedCall: If :attr:`self._flag_r <pcapkit.foundation.extraction.Extractor._flag_r>`
                is :data:`False`, as reassembly is disabled.

        """
        if self._flag_r:
            data = ReassemblyData(
                ipv4=tuple(self._reasm.ipv4.datagram) if self._ipv4 else None,
                ipv6=tuple(self._reasm.ipv6.datagram) if self._ipv6 else None,
                tcp=tuple(self._reasm.tcp.datagram) if self._tcp else None,
            )
            return data
        raise UnsupportedCall("'Extractor(reassembly=False)' object has no attribute 'reassembly'")

    @property
    def trace(self) -> 'TraceFlowData':
        """Index table for traced flow.

        * ``tcp`` -- tuple of TCP flows (:term:`trace.tcp.index`)

        The entry is :data:`None` if TCP was not enabled.

        Raises:
            UnsupportedCall: If :attr:`self._flag_t <pcapkit.foundation.extraction.Extractor._flag_t>`
                is :data:`False`, as flow tracing is disabled.

        """
        if self._flag_t:
            data = TraceFlowData(
                tcp=tuple(self._trace.tcp.index) if self._tcp else None,
            )
            return data
        raise UnsupportedCall("'Extractor(trace=False)' object has no attribute 'trace'")

    @property
    def engine(self) -> 'Engine':
        """PCAP extraction engine."""
        return self._exeng

    @property
    def magic_number(self) -> 'bytes':
        """Magic number of input PCAP file."""
        return self._magic

    ##########################################################################
    # Methods.
    ##########################################################################

    @classmethod
    def register_dumper(cls, format: 'str', dumper: 'ModuleDescriptor[Dumper] | Type[Dumper]', ext: 'str') -> 'None':  # pylint: disable=redefined-builtin
        r"""Register a new dumper class.

        Notes:
            The full qualified class name of the new dumper
            class should be as ``{dumper.module}.{dumper.name}``.

        Arguments:
            format: format name
            dumper: module descriptor or a :class:`dictdumper.dumper.Dumper` subclass
            ext: file extension

        Raises:
            RegistryError: If ``dumper`` does not resolve to a
                :class:`~dictdumper.dumper.Dumper` subclass.

        Warns:
            pcapkit.utilities.warnings.RegistryWarning: If ``format`` is
                already registered; the existing entry is overwritten.

        """
        if isinstance(dumper, ModuleDescriptor):
            dumper = dumper.klass
        if not issubclass(dumper, Dumper):
            raise RegistryError(f'dumper must be a Dumper subclass, not {dumper!r}')
        if format in cls.__output__:
            warn(f'dumper {format} already registered, overwriting', RegistryWarning)
        cls.__output__[format] = (dumper, ext)

    @classmethod
    def register_engine(cls, name: 'str', engine: 'ModuleDescriptor[Engine] | Type[Engine]') -> 'None':
        r"""Register a new extraction engine.

        Notes:
            The full qualified class name of the new extraction
            engine should be as ``{engine.module}.{engine.name}``.

        Arguments:
            name: engine name
            engine: module descriptor or an
                :class:`~pcapkit.foundation.engines.engine.Engine` subclass

        Raises:
            RegistryError: If ``engine`` does not resolve to an
                :class:`~pcapkit.foundation.engines.engine.Engine` subclass.

        Warns:
            pcapkit.utilities.warnings.RegistryWarning: If ``name`` is
                already registered; the existing entry is overwritten.

        """
        if isinstance(engine, ModuleDescriptor):
            engine = engine.klass
        if not issubclass(engine, Engine):
            raise RegistryError(f'engine must be an Engine subclass, not {engine!r}')
        if name in cls.__engine__:
            warn(f'engine {name} already registered, overwriting', RegistryWarning)
        cls.__engine__[name] = engine

    @classmethod
    def register_reassembly(cls, protocol: 'str', reassembly: 'ModuleDescriptor[Reassembly] | Type[Reassembly]') -> 'None':
        r"""Register a new reassembly engine.

        Notes:
            The full qualified class name of the new reassembly
            engine should be as ``{reassembly.module}.{reassembly.name}``.

        Arguments:
            protocol: protocol name
            reassembly: module descriptor or a
                :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` subclass

        Raises:
            RegistryError: If ``reassembly`` does not resolve to a
                :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` subclass.

        Warns:
            pcapkit.utilities.warnings.RegistryWarning: If ``protocol`` is
                already registered; the existing entry is overwritten.

        """
        if isinstance(reassembly, ModuleDescriptor):
            reassembly = reassembly.klass
        if not issubclass(reassembly, Reassembly):
            raise RegistryError(f'reassembly must be a Reassembly subclass, not {reassembly!r}')
        if protocol in cls.__reassembly__:
            warn(f'reassembly {protocol} already registered, overwriting', RegistryWarning)
        cls.__reassembly__[protocol] = reassembly

    @classmethod
    def register_traceflow(cls, protocol: 'str', traceflow: 'ModuleDescriptor[TraceFlow] | Type[TraceFlow]') -> 'None':
        r"""Register a new flow tracing engine.

        Notes:
            The full qualified class name of the new flow tracing
            engine should be as ``{traceflow.module}.{traceflow.name}``.

        Arguments:
            protocol: protocol name
            traceflow: module descriptor or a
                :class:`~pcapkit.foundation.traceflow.traceflow.TraceFlow` subclass

        Raises:
            RegistryError: If ``traceflow`` does not resolve to a
                :class:`~pcapkit.foundation.traceflow.traceflow.TraceFlow` subclass.

        Warns:
            pcapkit.utilities.warnings.RegistryWarning: If ``protocol`` is
                already registered; the existing entry is overwritten.

        """
        if isinstance(traceflow, ModuleDescriptor):
            traceflow = traceflow.klass
        if not issubclass(traceflow, TraceFlow):
            raise RegistryError(f'traceflow must be a TraceFlow subclass, not {traceflow!r}')
        if protocol in cls.__traceflow__:
            warn(f'traceflow {protocol} already registered, overwriting', RegistryWarning)
        cls.__traceflow__[protocol] = traceflow

    def run(self) -> 'None':  # pylint: disable=inconsistent-return-statements
        """Start extraction.

        We use :meth:`~pcapkit.foundation.extraction.Extractor.import_test` to check if
        a certain engine is available or not. For supported engines, each engine has
        different driver method:

        * Default drivers:

          - PCAP Format: :class:`pcapkit.foundation.engines.pcap.PCAP`
          - PCAP-NG Format: :class:`pcapkit.foundation.engines.pcapng.PCAPNG`

        * DPKT driver: :class:`pcapkit.foundation.engines.dpkt.DPKT`
        * Scapy driver: :class:`pcapkit.foundation.engines.scapy.Scapy`
        * PyShark driver: :class:`pcapkit.foundation.engines.pyshark.PyShark`

        Warns:
            pcapkit.utilities.warnings.EngineWarning: If the extraction engine is not
                available. This is either due to dependency not installed, or supplied
                engine unknown.

        Raises:
            FormatError: If the default engine is used and the magic number of the
                input file matches neither the PCAP nor the PCAP-NG engine.

        """
        if self._exnam in self.__engine__:  # check if engine is supported
            eng = self.__engine__[self._exnam]
            if isinstance(eng, ModuleDescriptor):
                eng = eng.klass

            # NOTE(review): ``eng`` is an Engine subclass at this point; this
            # assumes the class exposes ``module`` / ``name`` as plain
            # class-level strings -- confirm against the Engine base class.
            if self.import_test(eng.module, name=eng.name) is not None:  # type: ignore[arg-type]
                self._exeng = eng(self)
                self._exeng.run()

                # start iteration
                self.record_frames()
                return

            warn(f'engine {eng.name} (`{eng.module}`) is not installed; '
                 'using default engine instead', EngineWarning, stacklevel=stacklevel())
            self._exnam = 'default'  # using default/pcapkit engine

        if self._exnam not in ('default', 'pcapkit'):
            warn(f'unsupported extraction engine: {self._exnam}; '
                 'using default engine instead', EngineWarning, stacklevel=stacklevel())
            self._exnam = 'default'  # using default/pcapkit engine

        # the default engine is chosen from the magic number of the input file
        if self._magic in PCAP_Engine.MAGIC_NUMBER:
            self._exeng = cast('Engine[_P]', PCAP_Engine(self))
        elif self._magic in PCAPNG_Engine.MAGIC_NUMBER:
            self._exeng = cast('Engine[_P]', PCAPNG_Engine(self))
        else:
            raise FormatError(f'unknown file format: {self._magic!r}')

        # start engine
        self._exeng.run()

        # start iteration
        self.record_frames()

    @staticmethod
    def import_test(engine: 'str', *, name: 'Optional[str]' = None) -> 'Optional[ModuleType]':
        """Test import for extraction engine.

        Args:
            engine: Extraction engine module name.
            name: Extraction engine display name.

        Warns:
            pcapkit.utilities.warnings.EngineWarning: If the engine module is not installed.

        Returns:
            If succeeded, returns the module; otherwise, returns :data:`None`.

        """
        try:
            module = importlib.import_module(engine)
        except ImportError:
            module = None
            warn(f"extraction engine '{name or engine}' not available; "
                 'using default engine instead', EngineWarning, stacklevel=stacklevel())
        return module

    @classmethod
    def make_name(cls, fin: 'str | IO[bytes]' = 'in.pcap', fout: 'str' = 'out',
                  fmt: 'Formats' = 'tree', extension: 'bool' = True, *,
                  files: 'bool' = False, nofile: 'bool' = False) -> 'tuple[str, Optional[str], Formats, Optional[str], bool]':
        """Generate input and output filenames.

        The method will perform following processing:

        1. if ``fin`` is a string, use it as the input PCAP filename, appending
           the ``.pcap`` extension when ``extension`` is :data:`True` and ``fin``
           does not already end with one of :attr:`PCAP_EXT`; then test if the
           file exists; otherwise (``fin`` is a binary IO object), use its
           ``name`` attribute, or a placeholder if it has none;
        2. if ``nofile`` is :data:`True`, the output filename and extension are
           both :data:`None` and the following processing is skipped;
        3. look up the output file extension registered for ``fmt``; should no
           extension be available, raise
           :exc:`~pcapkit.utilities.exceptions.FormatError`;
        4. create the parent directory of ``fout`` if it has one;
        5. if ``files`` is :data:`True`, ``fout`` is used as a directory name and
           the directory is created; otherwise, if ``extension`` is :data:`True`,
           the file extension is appended to ``fout`` when missing.

        And the method returns the generated input and output filenames as follows:

        0. input filename
        1. output filename / directory name
        2. output format
        3. output file extension (including the leading ``.``)
        4. if split each frame into different files

        Args:
            fin: Input filename or a binary IO object.
            fout: Output filename.
            fmt: Output file format.
            extension: If append ``.pcap`` file extension to the input filename
                if ``fin`` does not have such file extension; if check and append extensions
                to output file.
            files: If split each frame into different files.
            nofile: If no output file is to be dumped.

        Returns:
            Generated input and output filenames.

        Raises:
            FileNotFound: If input file does not exist.
            FormatError: If no file extension is registered for the output format.

        """
        if isinstance(fin, str):
            if extension:  # pylint: disable=else-if-used
                ifnm = fin if os.path.splitext(fin)[1] in cls.PCAP_EXT else f'{fin}.pcap'
            else:
                ifnm = fin

            if not os.path.isfile(ifnm):
                raise FileNotFound(2, 'No such file or directory', ifnm)
        else:
            # in-memory streams such as io.BytesIO carry no ``name`` attribute;
            # fall back to a placeholder instead of failing with AttributeError
            ifnm = getattr(fin, 'name', f'<{type(fin).__name__}>')

        if nofile:
            ofnm = None
            ext = None
        else:
            ext = cls.__output__[fmt][1]
            if ext is None:
                raise FormatError(f'unknown output format: {fmt}')

            if (parent := os.path.split(fout)[0]):
                os.makedirs(parent, exist_ok=True)

            if files:
                ofnm = fout
                os.makedirs(ofnm, exist_ok=True)
            elif extension:
                ofnm = fout if os.path.splitext(fout)[1] == ext else f'{fout}{ext}'
            else:
                ofnm = fout

        return ifnm, ofnm, fmt, ext, files

    def record_header(self) -> 'Engine':
        """Read global header.

        The method picks the PCAP or PCAP-NG engine according to the magic
        number of the input file, runs it, rewinds the input file to its
        beginning and returns the engine instance.

        Information such as PCAP version, data link layer protocol type, nanosecond
        flag and byteorder will also be saved in the returned
        :class:`~pcapkit.foundation.engines.engine.Engine` instance.

        If TCP flow tracing is enabled, the nanosecond flag and byteorder will
        be used for the output PCAP file of the traced TCP flows.

        For output, the method will dump the parsed PCAP global header under
        the name of ``Global Header``.

        Returns:
            The engine instance that parsed the header.

        Raises:
            FormatError: If the magic number matches neither engine.

        """
        # pylint: disable=attribute-defined-outside-init,protected-access
        if self._magic in PCAP_Engine.MAGIC_NUMBER:
            engine = PCAP_Engine(self)
            engine.run()

            self._ifile.seek(0, os.SEEK_SET)
            return engine  # type: ignore[return-value]

        if self._magic in PCAPNG_Engine.MAGIC_NUMBER:
            engine = PCAPNG_Engine(self)  # type: ignore[assignment]
            engine.run()

            self._ifile.seek(0, os.SEEK_SET)
            return engine  # type: ignore[return-value]

        raise FormatError(f'unknown file format: {self._magic!r}')

    def record_frames(self) -> 'None':
        """Read packet frames.

        The method calls :meth:`self._exeng.read_frame <pcapkit.foundation.engines.engine.Engine.read_frame>`
        to parse each frame from the input PCAP file; and
        performs cleanup by calling :meth:`self._cleanup <_cleanup>`
        upon completion of the parsing process.

        Notes:
            Under non-auto mode, i.e. :attr:`self._flag_a <Extractor._flag_a>` is
            :data:`False`, the method performs no action.

        """
        if self._flag_a:
            while True:
                try:
                    self._exeng.read_frame()
                except (EOFError, StopIteration):
                    warn('EOF reached', ExtractionWarning, stacklevel=stacklevel())

                    # with no_eof set, keep polling for more frames
                    if self._flag_n:
                        continue

                    # quit when EOF
                    break
                except KeyboardInterrupt:
                    self._cleanup()
                    raise

            self._cleanup()

    ##########################################################################
    # Data models.
    ##########################################################################

    def __init__(self,
                 fin: 'Optional[str | IO[bytes]]' = None, fout: 'Optional[str]' = None, format: 'Optional[Formats]' = None,  # basic settings # pylint: disable=redefined-builtin
                 auto: 'bool' = True, extension: 'bool' = True, store: 'bool' = True,  # internal settings # pylint: disable=line-too-long
                 files: 'bool' = False, nofile: 'bool' = False, verbose: 'bool | VerboseHandler' = False,  # output settings # pylint: disable=line-too-long
                 engine: 'Optional[Engines]' = None, layer: 'Optional[Layers]' = None, protocol: 'Optional[Protocols]' = None,  # extraction settings # pylint: disable=line-too-long
                 reassembly: 'bool' = False, reasm_strict: 'bool' = True, reasm_store: 'bool' = True,  # reassembly settings # pylint: disable=line-too-long
                 trace: 'bool' = False, trace_fout: 'Optional[str]' = None, trace_format: 'Optional[Formats]' = None,  # trace settings # pylint: disable=line-too-long
                 trace_byteorder: 'Literal["big", "little"]' = sys.byteorder, trace_nanosecond: 'bool' = False,  # trace settings # pylint: disable=line-too-long
                 ip: 'bool' = False, ipv4: 'bool' = False, ipv6: 'bool' = False, tcp: 'bool' = False,  # reassembly/trace settings # pylint: disable=line-too-long
                 buffer_size: 'int' = io.DEFAULT_BUFFER_SIZE, buffer_save: 'bool' = False, buffer_path: 'Optional[str]' = None,  # buffer settings # pylint: disable=line-too-long
                 no_eof: 'bool' = False) -> 'None':
        """Initialise PCAP Reader.

        The constructor resolves the file names, sets up reassembly and flow
        tracing as requested, opens the input (and output) file, peeks the
        magic number and finally calls :meth:`run` to start the extraction.

        Args:
            fin: file name to be read or a binary IO object;
                if file not exist, raise :exc:`FileNotFound`
            fout: file name to be written
            format: file format of output

            auto: if automatically run till EOF
            extension: if check and append extensions to output file
            store: if store extracted packet info

            files: if split each frame into different files
            nofile: if no output file is to be dumped
            verbose: a :obj:`bool` value or a function takes the :class:`Extractor`
                instance and current parsed frame (depends on engine selected) as
                parameters to print verbose output information

            engine: extraction engine to be used
            layer: extract til which layer
            protocol: extract til which protocol

            reassembly: if perform reassembly
            reasm_strict: if set strict flag for reassembly
            reasm_store: if store reassembled datagrams

            trace: if trace TCP traffic flows
            trace_fout: path name for flow tracer if necessary
            trace_format: output file format of flow tracer
            trace_byteorder: output file byte order
            trace_nanosecond: output nanosecond-resolution file flag

            ip: if record data for IPv4 & IPv6 reassembly (must be used with ``reassembly=True``)
            ipv4: if perform IPv4 reassembly (must be used with ``reassembly=True``)
            ipv6: if perform IPv6 reassembly (must be used with ``reassembly=True``)
            tcp: if perform TCP reassembly and/or flow tracing
                (must be used with ``reassembly=True`` or ``trace=True``)

            buffer_size: buffer size for reading input file (for :class:`~pcapkit.corekit.io.SeekableReader` only)
            buffer_save: if save buffer to file (for :class:`~pcapkit.corekit.io.SeekableReader` only)
            buffer_path: path name for buffer file if necessary (for :class:`~pcapkit.corekit.io.SeekableReader` only)

            no_eof: if keep reading instead of stopping when EOF is reached
                (e.g., when the input is a live capture)

        Warns:
            pcapkit.utilities.warnings.FormatWarning: Warns under following circumstances:

                * If using PCAP output for TCP flow tracing while the extraction engine is PyShark.
                * If output file format is not supported.

        """
        if fin is None:
            fin = 'in.pcap'
        if fout is None:
            fout = 'out'
        if format is None:
            format = 'tree'

        ifnm, ofnm, fmt, oext, files = self.make_name(fin, fout, format, extension, files=files, nofile=nofile)

        self._ifnm = ifnm  # input file name
        self._ofnm = ofnm  # output file name
        self._fext = oext  # output file extension

        self._flag_a = auto        # auto extract flag
        self._flag_d = store       # store data flag
        self._flag_e = False       # EOF flag
        self._flag_f = files       # split file flag
        self._flag_q = nofile      # no output flag
        self._flag_r = reassembly  # reassembly flag
        self._flag_t = trace       # trace flag
        self._flag_v = False       # verbose flag
        self._flag_s = isinstance(fin, str)  # input filename flag
        self._flag_n = no_eof      # no EOF flag

        # verbose callback function
        if isinstance(verbose, bool):
            self._flag_v = verbose
            if verbose:
                self._vfunc = lambda e, f: print(
                    f'Frame {e._frnum:>3d}: {f.protochain}'  # pylint: disable=protected-access
                )  # pylint: disable=logging-fstring-interpolation
            else:
                self._vfunc = lambda e, f: None
        else:
            self._flag_v = True
            self._vfunc = verbose

        self._frnum = 0   # frame number
        self._frame = []  # frame record

        self._ipv4 = ipv4 or ip  # IPv4 Reassembly
        self._ipv6 = ipv6 or ip  # IPv6 Reassembly
        self._tcp = tcp          # TCP Reassembly

        self._exptl = protocol or 'null'                              # extract til protocol
        self._exlyr = cast('Layers', (layer or 'none').lower())       # extract til layer
        self._exnam = cast('Engines', (engine or 'default').lower())  # extract using engine

        if reassembly:
            reasm_obj_ipv4 = reasm_obj_ipv6 = reasm_obj_tcp = None

            if self._ipv4:
                logger.info('IPv4 reassembly enabled')

                reasm_cls_ipv4 = self.__reassembly__['ipv4']
                if isinstance(reasm_cls_ipv4, ModuleDescriptor):
                    reasm_cls_ipv4 = reasm_cls_ipv4.klass
                    self.__reassembly__['ipv4'] = reasm_cls_ipv4  # update mapping upon import
                reasm_obj_ipv4 = cast('IPv4_Reassembly', reasm_cls_ipv4(strict=reasm_strict, store=reasm_store))

            if self._ipv6:
                logger.info('IPv6 reassembly enabled')

                reasm_cls_ipv6 = self.__reassembly__['ipv6']
                if isinstance(reasm_cls_ipv6, ModuleDescriptor):
                    reasm_cls_ipv6 = reasm_cls_ipv6.klass
                    self.__reassembly__['ipv6'] = reasm_cls_ipv6  # update mapping upon import
                reasm_obj_ipv6 = cast('IPv6_Reassembly', reasm_cls_ipv6(strict=reasm_strict, store=reasm_store))

            if self._tcp:
                logger.info('TCP reassembly enabled')

                reasm_cls_tcp = self.__reassembly__['tcp']
                if isinstance(reasm_cls_tcp, ModuleDescriptor):
                    reasm_cls_tcp = reasm_cls_tcp.klass
                    self.__reassembly__['tcp'] = reasm_cls_tcp  # update mapping upon import
                reasm_obj_tcp = cast('TCP_Reassembly', reasm_cls_tcp(strict=reasm_strict, store=reasm_store))

            self._reasm = ReassemblyManager(
                ipv4=reasm_obj_ipv4,
                ipv6=reasm_obj_ipv6,
                tcp=reasm_obj_tcp,
            )

        if trace:
            trace_obj_tcp = None

            if self._exnam in ('pyshark',) and trace_format in ('pcap',):
                warn(f"'Extractor(engine={self._exnam})' does not support 'trace_format={trace_format}'; "
                     "using 'trace_format=None' instead", FormatWarning, stacklevel=stacklevel())
                trace_format = None

            if self._tcp:
                logger.info('TCP flow tracing enabled')

                trace_cls_tcp = self.__traceflow__['tcp']
                if isinstance(trace_cls_tcp, ModuleDescriptor):
                    trace_cls_tcp = trace_cls_tcp.klass
                    self.__traceflow__['tcp'] = trace_cls_tcp  # update mapping upon import
                trace_obj_tcp = cast('TCP_TraceFlow', trace_cls_tcp(fout=trace_fout, format=trace_format,
                                                                    byteorder=trace_byteorder,
                                                                    nanosecond=trace_nanosecond))

            self._trace = TraceFlowManager(
                tcp=trace_obj_tcp,
            )

        if self._flag_s:
            self._ifile = open(ifnm, 'rb')  # input file # pylint: disable=unspecified-encoding,consider-using-with
        else:
            self._ifile = cast('BufferedReader', fin)

        # engines need to seek within the input; wrap non-seekable streams
        if not self._ifile.seekable():
            self._ifile = SeekableReader(self._ifile, buffer_size, buffer_save, buffer_path,
                                         stream_closing=not self._flag_s)

        if not self._flag_q:
            output, ext = self.__output__[fmt]
            if ext is None:
                warn(f'Unsupported output format: {fmt}; disabled file output feature',
                     FormatWarning, stacklevel=stacklevel())

            if isinstance(output, ModuleDescriptor):
                output = output.klass
                self.__output__[fmt] = (output, ext)  # update mapping upon import

            dumper = make_dumper(output)
            self._ofile = dumper if self._flag_f else dumper(ofnm)  # output file

        # NOTE: we use peek() to read the magic number, as the file pointer
        # will not be moved after reading; however, the returned bytes object
        # may not be exactly 4 bytes, so we use [:4] to get the first 4 bytes
        self._magic = self._ifile.peek(4)[:4]
        #self._magic = self._ifile.read(4)  # magic number
        #self._ifile.seek(0, os.SEEK_SET)

        self.run()  # start extraction

    def __iter__(self) -> 'Extractor':
        """Iterate and parse PCAP frame.

        Raises:
            IterableError: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
                is :data:`True`, as such operation is not applicable.

        """
        if not self._flag_a:
            return self
        raise IterableError("'Extractor(auto=True)' object is not iterable")

    def __next__(self) -> '_P':
        """Iterate and parse next PCAP frame.

        It will call :meth:`self._exeng.read_frame <pcapkit.foundation.engines.engine.Engine.read_frame>`
        to parse next PCAP frame internally, until the EOF reached;
        then it calls :meth:`self._cleanup <_cleanup>` for the aftermath.

        Raises:
            StopIteration: When EOF is reached and the no-EOF flag is not set.

        """
        while True:
            try:
                return self._exeng.read_frame()
            except (EOFError, StopIteration) as error:
                warn('EOF reached', ExtractionWarning, stacklevel=stacklevel())

                # with no_eof set, keep polling for more frames
                if self._flag_n:
                    continue

                self._cleanup()
                raise StopIteration from error  # pylint: disable=raise-missing-from
            except KeyboardInterrupt:
                self._cleanup()
                raise

    def __call__(self) -> '_P':
        """Works as a simple wrapper for the iteration protocol.

        Unlike :meth:`__next__`, the original :exc:`EOFError` or
        :exc:`StopIteration` is re-raised as is when EOF is reached.

        Raises:
            CallableError: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
                is :data:`True`, as iteration is not applicable.

        """
        if not self._flag_a:
            while True:
                try:
                    return self._exeng.read_frame()
                except (EOFError, StopIteration):
                    warn('EOF reached', ExtractionWarning, stacklevel=stacklevel())

                    # with no_eof set, keep polling for more frames
                    if self._flag_n:
                        continue

                    self._cleanup()
                    raise
                except KeyboardInterrupt:
                    self._cleanup()
                    raise
        raise CallableError("'Extractor(auto=True)' object is not callable")

    def __enter__(self) -> 'Extractor':
        """Uses :class:`Extractor` as a context manager."""
        return self

    def __exit__(self, exc_type: 'Type[BaseException] | None', exc_value: 'BaseException | None',
                 traceback: 'TracebackType | None') -> 'None':  # pylint: disable=unused-argument
        """Close the input file and the extraction engine when exits."""
        self._ifile.close()
        self._exeng.close()

    ##########################################################################
    # Utilities.
    ##########################################################################

    def _cleanup(self) -> 'None':
        """Cleanup after extraction & analysis.

        The method sets :attr:`self._flag_e <pcapkit.foundation.extraction.Extractor._flag_e>`
        as :data:`True`, closes the input file (if necessary) and calls
        :meth:`self._exeng.close <pcapkit.foundation.engines.engine.Engine.close>`.

        """
        # pylint: disable=attribute-defined-outside-init
        self._flag_e = True

        # NOTE(review): the ``_flag_s`` attribute comment says a stream supplied
        # by the caller should not be closed after extraction, yet the second
        # branch closes exactly those streams and leaves files opened from a
        # file name open -- confirm the intended ownership before changing.
        if isinstance(self._ifile, SeekableReader):
            self._ifile.close()
        elif not self._flag_s:
            self._ifile.close()

        self._exeng.close()