Source code for pcapkit.foundation.extraction
# -*- coding: utf-8 -*-
# pylint: disable=import-outside-toplevel,fixme
# mypy: disable-error-code=dict-item
"""Extractor for PCAP Files
==============================
.. module:: pcapkit.foundation.extraction
:mod:`pcapkit.foundation.extraction` contains
:class:`~pcapkit.foundation.extraction.Extractor` only,
which synthesises file I/O and protocol analysis,
coordinates information exchange in all network layers,
extracts parameters from a PCAP file.
"""
import collections
import importlib
import io
import os
import sys
from typing import TYPE_CHECKING, Generic, TypeVar, cast
from dictdumper.dumper import Dumper
from pcapkit.corekit.io import SeekableReader
from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.dumpkit.common import make_dumper
from pcapkit.foundation.engines.engine import Engine
from pcapkit.foundation.engines.pcap import PCAP as PCAP_Engine
from pcapkit.foundation.engines.pcapng import PCAPNG as PCAPNG_Engine
from pcapkit.foundation.reassembly import ReassemblyManager
from pcapkit.foundation.reassembly.data import ReassemblyData
from pcapkit.foundation.reassembly.reassembly import Reassembly
from pcapkit.foundation.traceflow import TraceFlowManager
from pcapkit.foundation.traceflow.data import TraceFlowData
from pcapkit.foundation.traceflow.traceflow import TraceFlow
from pcapkit.utilities.exceptions import (CallableError, FileNotFound, FormatError, IterableError,
                                          RegistryError, UnsupportedCall, stacklevel)
from pcapkit.utilities.logging import logger
from pcapkit.utilities.warnings import (EngineWarning, ExtractionWarning, FormatWarning,
                                        RegistryWarning, warn)
# The names below are only needed for static type checking; they are
# never imported at runtime.
if TYPE_CHECKING:
    from io import BufferedReader
    from types import ModuleType, TracebackType
    from typing import IO, Any, Callable, DefaultDict, Optional, Type, Union
    from dpkt.dpkt import Packet as DPKTPacket
    from pyshark.packet.packet import Packet as PySharkPacket
    from scapy.packet import Packet as ScapyPacket
    from typing_extensions import Literal
    from pcapkit.foundation.reassembly.ipv4 import IPv4 as IPv4_Reassembly
    from pcapkit.foundation.reassembly.ipv6 import IPv6 as IPv6_Reassembly
    from pcapkit.foundation.reassembly.tcp import TCP as TCP_Reassembly
    from pcapkit.foundation.traceflow.tcp import TCP as TCP_TraceFlow
    from pcapkit.protocols.misc.pcap.frame import Frame
    from pcapkit.protocols.misc.pcapng import PCAPNG
    from pcapkit.protocols.protocol import ProtocolBase as Protocol
    # Output format names accepted in annotations.
    Formats = Literal['pcap', 'json', 'tree', 'plist']
    # Extraction engine names accepted in annotations.
    Engines = Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark']
    # Layer names used for the "extract until layer" setting.
    Layers = Literal['link', 'internet', 'transport', 'application', 'none']
    # A parsed frame/packet; the concrete type depends on the engine in use.
    Packet = Union[Frame, PCAPNG, ScapyPacket, DPKTPacket, PySharkPacket]
    # A protocol given by name, by instance or by class.
    Protocols = Union[str, Protocol, Type[Protocol]]
    # Signature of the verbose callback: ``(extractor, packet) -> Any``.
    VerboseHandler = Callable[['Extractor', Packet], Any]
__all__ = ['Extractor']
_P = TypeVar('_P')
[docs]
class Extractor(Generic[_P]):
    """Extractor for PCAP files.
    Notes:
        For supported engines, please refer to
        :meth:`~pcapkit.foundation.extraction.Extractor.run`.
    """
    # Attribute declarations for static type checkers only; nothing in this
    # block is executed at runtime.
    # NOTE(review): ``_flag_r`` (reassembly flag) and ``_vfunc`` (verbose
    # callback) are assigned in ``__init__`` but have no active declaration
    # here -- consider adding them.
    if TYPE_CHECKING:
        #: Input file name.
        _ifnm: 'str'
        #: Output file name.
        _ofnm: 'Optional[str]'
        #: Output file extension.
        _fext: 'Optional[str]'
        #: Auto extract flag. It indicates if the extraction process should
        #: continue automatically until the EOF is reached.
        _flag_a: 'bool'
        #: Store data flag. It indicates if the extracted frames should be
        #: stored in memory.
        _flag_d: 'bool'
        #: EOF flag. It indicates if the EOF is reached.
        _flag_e: 'bool'
        #: Split file flag, i.e. dump each frame into a different file.
        _flag_f: 'bool'
        #: No output file flag, i.e. no output file is to be generated.
        _flag_q: 'bool'
        #: Trace flag. It indicates if the flow tracing is enabled.
        _flag_t: 'bool'
        #: Verbose flag. This is used to determine if the verbose callback
        #: function should be called at each frame.
        _flag_v: 'bool'
        #: No EOF flag. It is useful when the input file is a live capture,
        #: as the extraction process will not stop until the user interrupts
        #: the process.
        _flag_n: 'bool'
        #: Input filename flag. It indicates if the input file is a file
        #: name or a binary IO object. For the latter, we should not close
        #: the file object after extraction.
        _flag_s: 'bool'
        #: Verbose callback function (declaration currently commented out).
        #_vfunc: 'VerboseHandler'
        #: Frame number.
        _frnum: 'int'
        #: Frame records.
        _frame: 'list[Packet]'
        #: Frame record for reassembly.
        _reasm: 'ReassemblyManager'
        #: Frame record for flow tracing.
        _trace: 'TraceFlowManager'
        #: IPv4 flag. It indicates if the IPv4 reassembly and/or flow tracing
        #: is enabled.
        _ipv4: 'bool'
        #: IPv6 flag. It indicates if the IPv6 reassembly and/or flow tracing
        #: is enabled.
        _ipv6: 'bool'
        #: TCP flag. It indicates if the TCP reassembly and/or flow tracing
        #: is enabled.
        _tcp: 'bool'
        #: Extract until protocol.
        _exptl: 'Protocols'
        #: Extract until layer.
        _exlyr: 'Layers'
        #: Extraction engine name.
        _exnam: 'Engines'
        #: Extraction engine instance.
        _exeng: 'Engine[_P]'
        #: Input file object.
        _ifile: 'BufferedReader'
        #: Output file object.
        _ofile: 'Dumper | Type[Dumper]'
        #: Magic number.
        _magic: 'bytes'
        #: Output format.
        _offmt: 'Formats'
    #: List of potential PCAP file extensions.
    PCAP_EXT = ['.pcap', '.cap', '.pcapng']
    ##########################################################################
    # Defaults.
    ##########################################################################
    #: Format dumper mapping for writing output files. Each value is a
    #: ``(dumper, extension)`` tuple: the dumper is either a module descriptor
    #: (module name and class name) or a :class:`dictdumper.dumper.Dumper`
    #: subclass, and the extension is the corresponding output file extension
    #: including its leading dot. As this is a ``defaultdict``, subscripting it
    #: with an unregistered format yields (and stores) a ``NotImplementedIO``
    #: descriptor paired with :data:`None` as the extension.
    # NOTE(review): ``'text'`` points at a ``Text`` class whereas ``'tree'`` and
    # ``'txt'`` point at ``Tree`` -- confirm that ``dictdumper`` exposes ``Text``.
    __output__ = collections.defaultdict(
        lambda: (ModuleDescriptor('pcapkit.dumpkit', 'NotImplementedIO'), None),
        {
            'pcap': (ModuleDescriptor('pcapkit.dumpkit', 'PCAPIO'), '.pcap'),
            'cap': (ModuleDescriptor('pcapkit.dumpkit', 'PCAPIO'), '.pcap'),
            'plist': (ModuleDescriptor('dictdumper', 'PLIST'), '.plist'),
            'xml': (ModuleDescriptor('dictdumper', 'PLIST'), '.plist'),
            'json': (ModuleDescriptor('dictdumper', 'JSON'), '.json'),
            'tree': (ModuleDescriptor('dictdumper', 'Tree'), '.txt'),
            'text': (ModuleDescriptor('dictdumper', 'Text'), '.txt'),
            'txt': (ModuleDescriptor('dictdumper', 'Tree'), '.txt'),
        },
    )  # type: DefaultDict[str, tuple[ModuleDescriptor[Dumper] | Type[Dumper], str | None]]
    #: Engine mapping for extracting frames, keyed by engine name. Each value is
    #: either a module descriptor (module name and class name) or an
    #: :class:`~pcapkit.foundation.engines.engine.Engine` subclass.
    __engine__ = {
        'scapy': ModuleDescriptor('pcapkit.foundation.engines.scapy', 'Scapy'),
        'dpkt': ModuleDescriptor('pcapkit.foundation.engines.dpkt', 'DPKT'),
        'pyshark': ModuleDescriptor('pcapkit.foundation.engines.pyshark', 'PyShark'),
    }  # type: dict[str, ModuleDescriptor[Engine] | Type[Engine]]
    #: Reassembly support mapping, keyed by protocol name. Each value is either
    #: a module descriptor (module name and class name) or a
    #: :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` subclass.
    __reassembly__ = {
        'ipv4': ModuleDescriptor('pcapkit.foundation.reassembly.ipv4', 'IPv4'),
        'ipv6': ModuleDescriptor('pcapkit.foundation.reassembly.ipv6', 'IPv6'),
        'tcp': ModuleDescriptor('pcapkit.foundation.reassembly.tcp', 'TCP'),
    }  # type: dict[str, ModuleDescriptor[Reassembly] | Type[Reassembly]]
    #: Flow tracing support mapping, keyed by protocol name. Each value is either
    #: a module descriptor (module name and class name) or a
    #: :class:`~pcapkit.foundation.traceflow.traceflow.TraceFlow` subclass.
    __traceflow__ = {
        'tcp': ModuleDescriptor('pcapkit.foundation.traceflow.tcp', 'TCP'),
    }  # type: dict[str, ModuleDescriptor[TraceFlow] | Type[TraceFlow]]
    ##########################################################################
    # Properties.
    ##########################################################################
    @property
    def length(self) -> 'int':
        """Frame number (of current extracted frame or all)."""
        return self._frnum
    @property
    def format(self) -> 'Formats':
        """Format of output file.
        Raises:
            UnsupportedCall: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
                is set as :data:`True`, as output is disabled by initialisation parameter.
        """
        if self._flag_q:
            raise UnsupportedCall("'Extractor(nofile=True)' object has no attribute 'format'")
        return self._offmt
    @property
    def input(self) -> 'str':
        """Name of input PCAP file."""
        return self._ifnm
    @property
    def output(self) -> 'str':
        """Name of output file.
        Raises:
            UnsupportedCall: If :attr:`self._flag_q <pcapkit.foundation.extraction.Extractor._flag_q>`
                is set as :data:`True`, as output is disabled by initialisation parameter.
        """
        if self._flag_q:
            raise UnsupportedCall("'Extractor(nofile=True)' object has no attribute 'format'")
        return cast('str', self._ofnm)
    @property
    def frame(self) -> 'tuple[Packet, ...]':
        """Extracted frames.
        Raises:
            UnsupportedCall: If :attr:`self._flag_d <pcapkit.foundation.extraction.Extractor._flag_d>`
                is :data:`False`, as storing frame data is disabled.
        """
        if self._flag_d:
            return tuple(self._frame)
        raise UnsupportedCall("'Extractor(store=False)' object has no attribute 'frame'")
    @property
    def reassembly(self) -> 'ReassemblyData':
        """Frame record for reassembly.

        The returned record carries one field per protocol; a field is
        :data:`None` when the matching protocol flag is not set.

        * ``ipv4`` -- tuple of IPv4 payload fragment (:term:`reasm.ipv4.datagram`)
        * ``ipv6`` -- tuple of IPv6 payload fragment (:term:`reasm.ipv6.datagram`)
        * ``tcp`` -- tuple of TCP payload fragment (:term:`reasm.tcp.datagram`)

        Raises:
            UnsupportedCall: If ``self._flag_r`` is :data:`False`, i.e.
                reassembly is disabled.
        """
        if not self._flag_r:
            raise UnsupportedCall("'Extractor(reassembly=False)' object has no attribute 'reassembly'")

        # each datagram list is only touched when its protocol is enabled
        ipv4_data = tuple(self._reasm.ipv4.datagram) if self._ipv4 else None
        ipv6_data = tuple(self._reasm.ipv6.datagram) if self._ipv6 else None
        tcp_data = tuple(self._reasm.tcp.datagram) if self._tcp else None
        return ReassemblyData(ipv4=ipv4_data, ipv6=ipv6_data, tcp=tcp_data)
    @property
    def trace(self) -> 'TraceFlowData':
        """Index table for traced flow.

        The ``tcp`` field is :data:`None` when the TCP flag is not set.

        * ``tcp`` -- tuple of TCP flows (:term:`trace.tcp.index`)

        Raises:
            UnsupportedCall: If :attr:`self._flag_t <pcapkit.foundation.extraction.Extractor._flag_t>`
                is :data:`False`, i.e. flow tracing is disabled.
        """
        if not self._flag_t:
            raise UnsupportedCall("'Extractor(trace=False)' object has no attribute 'trace'")

        # the TCP index is only touched when TCP handling is enabled
        tcp_index = tuple(self._trace.tcp.index) if self._tcp else None
        return TraceFlowData(tcp=tcp_index)
    @property
    def engine(self) -> 'Engine':
        """PCAP extraction engine."""
        return self._exeng
    @property
    def magic_number(self) -> 'bytes':
        """Magic number of input PCAP file."""
        return self._magic
    ##########################################################################
    # Methods.
    ##########################################################################
[docs]
    @classmethod
    def register_dumper(cls, format: 'str', dumper: 'ModuleDescriptor[Dumper] | Type[Dumper]', ext: 'str') -> 'None':
        r"""Register a dumper class for an output format.

        A module descriptor is resolved to its class through ``klass``
        before validation, so the registry stores the class itself together
        with the file extension.

        Notes:
            The full qualified class name of the new dumper class
            should be as ``{dumper.module}.{dumper.name}``.

        Arguments:
            format: format name
            dumper: module descriptor or a :class:`dictdumper.dumper.Dumper` subclass
            ext: file extension

        Raises:
            RegistryError: If the resolved dumper is not a
                :class:`dictdumper.dumper.Dumper` subclass.

        Warns:
            RegistryWarning: If ``format`` is already registered.
        """
        dumper_cls = dumper.klass if isinstance(dumper, ModuleDescriptor) else dumper
        if not issubclass(dumper_cls, Dumper):
            raise RegistryError(f'dumper must be a Dumper subclass, not {dumper_cls!r}')

        registry = cls.__output__
        if format in registry:
            warn(f'dumper {format} already registered, overwriting', RegistryWarning)
        registry[format] = (dumper_cls, ext)
[docs]
    @classmethod
    def register_engine(cls, name: 'str', engine: 'ModuleDescriptor[Engine] | Type[Engine]') -> 'None':
        r"""Register an extraction engine under a name.

        A module descriptor is resolved to its class through ``klass``
        before validation, so the registry stores the class itself.

        Notes:
            The full qualified class name of the new extraction engine
            should be as ``{engine.module}.{engine.name}``.

        Arguments:
            name: engine name
            engine: module descriptor or an
                :class:`~pcapkit.foundation.engines.engine.Engine` subclass

        Raises:
            RegistryError: If the resolved engine is not an
                :class:`~pcapkit.foundation.engines.engine.Engine` subclass.

        Warns:
            RegistryWarning: If ``name`` is already registered.
        """
        engine_cls = engine.klass if isinstance(engine, ModuleDescriptor) else engine
        if not issubclass(engine_cls, Engine):
            raise RegistryError(f'engine must be an Engine subclass, not {engine_cls!r}')

        registry = cls.__engine__
        if name in registry:
            warn(f'engine {name} already registered, overwriting', RegistryWarning)
        registry[name] = engine_cls
[docs]
    @classmethod
    def register_reassembly(cls, protocol: 'str', reassembly: 'ModuleDescriptor[Reassembly] | Type[Reassembly]') -> 'None':
        r"""Register a reassembly engine for a protocol.

        A module descriptor is resolved to its class through ``klass``
        before validation, so the registry stores the class itself.

        Notes:
            The full qualified class name of the new reassembly engine
            should be as ``{reassembly.module}.{reassembly.name}``.

        Arguments:
            protocol: protocol name
            reassembly: module descriptor or a
                :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` subclass

        Raises:
            RegistryError: If the resolved class is not a
                :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` subclass.

        Warns:
            RegistryWarning: If ``protocol`` is already registered.
        """
        reasm_cls = reassembly.klass if isinstance(reassembly, ModuleDescriptor) else reassembly
        if not issubclass(reasm_cls, Reassembly):
            raise RegistryError(f'reassembly must be a Reassembly subclass, not {reasm_cls!r}')

        registry = cls.__reassembly__
        if protocol in registry:
            warn(f'reassembly {protocol} already registered, overwriting', RegistryWarning)
        registry[protocol] = reasm_cls
[docs]
    @classmethod
    def register_traceflow(cls, protocol: 'str', traceflow: 'ModuleDescriptor[TraceFlow] | Type[TraceFlow]') -> 'None':
        r"""Register a flow tracing engine for a protocol.

        A module descriptor is resolved to its class through ``klass``
        before validation, so the registry stores the class itself.

        Notes:
            The full qualified class name of the new flow tracing engine
            should be as ``{traceflow.module}.{traceflow.name}``.

        Arguments:
            protocol: protocol name
            traceflow: module descriptor or a
                :class:`~pcapkit.foundation.traceflow.traceflow.TraceFlow` subclass

        Raises:
            RegistryError: If the resolved class is not a
                :class:`~pcapkit.foundation.traceflow.traceflow.TraceFlow` subclass.

        Warns:
            RegistryWarning: If ``protocol`` is already registered.
        """
        trace_cls = traceflow.klass if isinstance(traceflow, ModuleDescriptor) else traceflow
        if not issubclass(trace_cls, TraceFlow):
            raise RegistryError(f'traceflow must be a TraceFlow subclass, not {trace_cls!r}')

        registry = cls.__traceflow__
        if protocol in registry:
            warn(f'traceflow {protocol} already registered, overwriting', RegistryWarning)
        registry[protocol] = trace_cls
[docs]
    def run(self) -> 'None':  # pylint: disable=inconsistent-return-statements
        """Start extraction.

        If the requested engine name is registered in ``__engine__``, the
        method uses :meth:`~pcapkit.foundation.extraction.Extractor.import_test`
        to check that the engine is importable; on success that engine is
        instantiated and run. Otherwise the default engine is picked from the
        magic number of the input file:

        * Default drivers:

          - PCAP Format: :class:`pcapkit.foundation.engines.pcap.PCAP`
          - PCAP-NG Format: :class:`pcapkit.foundation.engines.pcapng.PCAPNG`

        * DPKT driver: :class:`pcapkit.foundation.engines.dpkt.DPKT`
        * Scapy driver: :class:`pcapkit.foundation.engines.scapy.Scapy`
        * PyShark driver: :class:`pcapkit.foundation.engines.pyshark.PyShark`

        In every case the chosen engine's ``run`` method is called, followed
        by :meth:`~pcapkit.foundation.extraction.Extractor.record_frames`.

        Raises:
            FormatError: If the default engine is used and the magic number
                matches neither the PCAP nor the PCAP-NG engine.

        Warns:
            pcapkit.utilities.warnings.EngineWarning: If the extraction engine is not
                available. This is either due to dependency not installed, or supplied
                engine unknown.

        :rtype: None
        """
        registry = self.__engine__
        if self._exnam in registry:  # a registered engine was requested
            engine_cls = registry[self._exnam]
            if isinstance(engine_cls, ModuleDescriptor):
                engine_cls = engine_cls.klass

            probe = self.import_test(engine_cls.module, name=engine_cls.name)  # type: ignore[arg-type]
            if probe is not None:
                self._exeng = engine_cls(self)
                self._exeng.run()
                # start iteration
                self.record_frames()
                return

            # the engine could not be imported: fall back to the default one
            warn(f'engine {engine_cls.name} (`{engine_cls.module}`) is not installed; '
                 'using default engine instead', EngineWarning, stacklevel=stacklevel())
            self._exnam = 'default'  # using default/pcapkit engine

        if self._exnam not in ('default', 'pcapkit'):
            warn(f'unsupported extraction engine: {self._exnam}; '
                 'using default engine instead', EngineWarning, stacklevel=stacklevel())
            self._exnam = 'default'  # using default/pcapkit engine

        # choose the default engine according to the file's magic number
        magic = self._magic
        if magic in PCAP_Engine.MAGIC_NUMBER:
            default_engine = PCAP_Engine(self)
        elif magic in PCAPNG_Engine.MAGIC_NUMBER:
            default_engine = PCAPNG_Engine(self)
        else:
            raise FormatError(f'unknown file format: {magic!r}')
        self._exeng = cast('Engine[_P]', default_engine)

        # start engine
        self._exeng.run()
        # start iteration
        self.record_frames()
[docs]
    @staticmethod
    def import_test(engine: 'str', *, name: 'Optional[str]' = None) -> 'Optional[ModuleType]':
        """Test import for extractcion engine.
        Args:
            engine: Extraction engine module name.
            name: Extraction engine display name.
        Warns:
            pcapkit.utilities.warnings.EngineWarning: If the engine module is not installed.
        Returns:
            If succeeded, returns the module; otherwise, returns :data:`None`.
        """
        try:
            module = importlib.import_module(engine)
        except ImportError:
            module = None
            warn(f"extraction engine '{name or engine}' not available; "
                 'using default engine instead', EngineWarning, stacklevel=stacklevel())
        return module
[docs]
    @classmethod
    def make_name(cls, fin: 'str | IO[bytes]' = 'in.pcap', fout: 'str' = 'out',
                  fmt: 'Formats' = 'tree', extension: 'bool' = True, *, files: 'bool' = False,
                  nofile: 'bool' = False) -> 'tuple[str, Optional[str], Formats, Optional[str], bool]':
        """Generate input and output filenames.
        The method will perform following processing:
        1. sanitise ``fin`` as the input PCAP filename; ``in.pcap`` as default value and
           append ``.pcap`` extension if needed and ``extension`` is :data:`True`; as well
           as test if the file exists;
        2. if ``nofile`` is :data:`True`, skips following processing;
        3. if ``fmt`` provided, then it presumes corresponding output file extension;
        4. if ``fout`` not provided, it presumes the output file name based on the presumptive
           file extension; the stem of the output file name is set as ``out``; should the file
           extension is not available, then it raises :exc:`~pcapkit.utilities.exceptions.FormatError`;
        5. if ``fout`` provided, it presumes corresponding output format if needed; should the
           presumption cannot be made, then it raises :exc:`~pcapkit.utilities.exceptions.FormatError`;
        6. it will also append corresponding file extension to the output file name if needed
           and ``extension`` is :data:`True`.
        And the method returns the generated input and output filenames as follows:
        0. input filename
        1. output filename / directory name
        2. output format
        3. output file extension (without ``.``)
        4. if split each frame into different files
        Args:
            fin: Input filename or a binary IO object.
            fout: Output filename.
            fmt: Output file format.
            extension: If append ``.pcap`` file extension to the input filename
                if ``fin`` does not have such file extension; if check and append extensions
                to output file.
            files: If split each frame into different files.
            nofile: If no output file is to be dumped.
        Returns:
            Generated input and output filenames.
        Raises:
            FileNotFound: If input file does not exists.
            FormatError: If output format not provided and cannot be presumpted.
        """
        if isinstance(fin, str):
            if extension:  # pylint: disable=else-if-used
                ifnm = fin if os.path.splitext(fin)[1] in cls.PCAP_EXT else f'{fin}.pcap'
            else:
                ifnm = fin
            if not os.path.isfile(ifnm):
                raise FileNotFound(2, 'No such file or directory', ifnm)
        else:
            ifnm = fin.name
        if nofile:
            ofnm = None
            ext = None
        else:
            ext = cls.__output__[fmt][1]
            if ext is None:
                raise FormatError(f'unknown output format: {fmt}')
            if (parent := os.path.split(fout)[0]):
                os.makedirs(parent, exist_ok=True)
            if files:
                ofnm = fout
                os.makedirs(ofnm, exist_ok=True)
            elif extension:
                ofnm = fout if os.path.splitext(fout)[1] == ext else f'{fout}{ext}'
            else:
                ofnm = fout
        return ifnm, ofnm, fmt, ext, files
[docs]
    def record_header(self) -> 'Engine':
        """Read global header.

        The method picks the default engine matching the magic number of the
        input file (:class:`~pcapkit.foundation.engines.pcap.PCAP` or
        :class:`~pcapkit.foundation.engines.pcapng.PCAPNG`), runs it, and
        then rewinds the input file to its beginning.

        Returns:
            The engine instance that has been run.

        Raises:
            FormatError: If the magic number matches neither engine.
        """
        # pylint: disable=attribute-defined-outside-init,protected-access
        if self._magic in PCAP_Engine.MAGIC_NUMBER:
            engine_cls = PCAP_Engine
        elif self._magic in PCAPNG_Engine.MAGIC_NUMBER:
            engine_cls = PCAPNG_Engine  # type: ignore[assignment]
        else:
            raise FormatError(f'unknown file format: {self._magic!r}')

        engine = engine_cls(self)
        engine.run()
        # rewind the input file to offset 0
        self._ifile.seek(0, os.SEEK_SET)
        return engine  # type: ignore[return-value]
[docs]
    def record_frames(self) -> 'None':
        """Read packet frames.
        The method calls :meth:`self._exeng.read_frame <pcapkit.foundation.engines.engine.Engine.read_frame>`
        to parse each frame from the input PCAP file; and
        performs cleanup by calling :meth:`self._exeng.close <pcapkit.foundation.engines.engine.Engine.close>`
        upon completion of the parsing process.
        Notes:
            Under non-auto mode, i.e. :attr:`self._flag_a <Extractor._flag_a>` is
            :data:`False`, the method performs no action.
        """
        if self._flag_a:
            while True:
                try:
                    self._exeng.read_frame()
                except (EOFError, StopIteration):
                    warn('EOF reached', ExtractionWarning, stacklevel=stacklevel())
                    if self._flag_n:
                        continue
                    # quit when EOF
                    break
                except KeyboardInterrupt:
                    self._cleanup()
                    raise
            self._cleanup()
    ##########################################################################
    # Data models.
    ##########################################################################
[docs]
    def __init__(self,
                 fin: 'Optional[str | IO[bytes]]' = None, fout: 'Optional[str]' = None, format: 'Optional[Formats]' = None,     # basic settings # pylint: disable=redefined-builtin
                 auto: 'bool' = True, extension: 'bool' = True, store: 'bool' = True,                                           # internal settings # pylint: disable=line-too-long
                 files: 'bool' = False, nofile: 'bool' = False, verbose: 'bool | VerboseHandler' = False,                       # output settings # pylint: disable=line-too-long
                 engine: 'Optional[Engines]' = None, layer: 'Optional[Layers]' = None, protocol: 'Optional[Protocols]' = None,  # extraction settings # pylint: disable=line-too-long
                 reassembly: 'bool' = False, reasm_strict: 'bool' = True, reasm_store: 'bool' = True,                           # reassembly settings # pylint: disable=line-too-long
                 trace: 'bool' = False, trace_fout: 'Optional[str]' = None, trace_format: 'Optional[Formats]' = None,           # trace settings # pylint: disable=line-too-long
                 trace_byteorder: 'Literal["big", "little"]' = sys.byteorder, trace_nanosecond: 'bool' = False,                 # trace settings # pylint: disable=line-too-long
                 ip: 'bool' = False, ipv4: 'bool' = False, ipv6: 'bool' = False, tcp: 'bool' = False,                           # reassembly/trace settings # pylint: disable=line-too-long
                 buffer_size: 'int' = io.DEFAULT_BUFFER_SIZE, buffer_save: 'bool' = False, buffer_path: 'Optional[str]' = None, # buffer settings # pylint: disable=line-too-long
                 no_eof: 'bool' = False) -> 'None':
        """Initialise PCAP Reader.
        Args:
            fin: file name to be read or a binary IO object;
                if file not exist, raise :exc:`FileNotFound`
            fout: file name to be written
            format: file format of output
            auto: if automatically run till EOF
            extension: if check and append extensions to output file
            store: if store extracted packet info
            files: if split each frame into different files
            nofile: if no output file is to be dumped
            verbose: a :obj:`bool` value or a function takes the :class:`Extractor`
                instance and current parsed frame (depends on engine selected) as
                parameters to print verbose output information
            engine: extraction engine to be used
            layer: extract til which layer
            protocol: extract til which protocol
            reassembly: if perform reassembly
            reasm_strict: if set strict flag for reassembly
            reasm_store: if store reassembled datagrams
            trace: if trace TCP traffic flows
            trace_fout: path name for flow tracer if necessary
            trace_format: output file format of flow tracer
            trace_byteorder: output file byte order
            trace_nanosecond: output nanosecond-resolution file flag
            ip: if record data for IPv4 & IPv6 reassembly (must be used with ``reassembly=True``)
            ipv4: if perform IPv4 reassembly (must be used with ``reassembly=True``)
            ipv6: if perform IPv6 reassembly (must be used with ``reassembly=True``)
            tcp: if perform TCP reassembly and/or flow tracing
                (must be used with ``reassembly=True`` or ``trace=True``)
            buffer_size: buffer size for reading input file (for :class:`~pcapkit.corekit.io.SeekableReader` only)
            buffer_save: if save buffer to file (for :class:`~pcapkit.corekit.io.SeekableReader` only)
            buffer_path: path name for buffer file if necessary (for :class:`~pcapkit.corekit.io.SeekableReader` only)
            no_eof: if raise :exc:`EOFError` when EOF
        Warns:
            pcapkit.utilities.warnings.FormatWarning: Warns under following circumstances:
                * If using PCAP output for TCP flow tracing while the extraction engine is PyShark.
                * If output file format is not supported.
        """
        if fin is None:
            fin = 'in.pcap'
        if fout is None:
            fout = 'out'
        if format is None:
            format = 'tree'
        ifnm, ofnm, fmt, oext, files = self.make_name(fin, fout, format, extension, files=files, nofile=nofile)
        self._ifnm = ifnm  # input file name
        self._ofnm = ofnm  # output file name
        self._fext = oext  # output file extension
        self._flag_a = auto                  # auto extract flag
        self._flag_d = store                 # store data flag
        self._flag_e = False                 # EOF flag
        self._flag_f = files                 # split file flag
        self._flag_q = nofile                # no output flag
        self._flag_r = reassembly            # reassembly flag
        self._flag_t = trace                 # trace flag
        self._flag_v = False                 # verbose flag
        self._flag_s = isinstance(fin, str)  # input filename flag
        self._flag_n = no_eof                # no EOF flag
        # verbose callback function
        if isinstance(verbose, bool):
            self._flag_v = verbose
            if verbose:
                self._vfunc = lambda e, f: print(
                    f'Frame {e._frnum:>3d}: {f.protochain}'  # pylint: disable=protected-access
                )  # pylint: disable=logging-fstring-interpolation
            else:
                self._vfunc = lambda e, f: None
        else:
            self._flag_v = True
            self._vfunc = verbose
        self._frnum = 0   # frame number
        self._frame = []  # frame record
        self._ipv4 = ipv4 or ip  # IPv4 Reassembly
        self._ipv6 = ipv6 or ip  # IPv6 Reassembly
        self._tcp = tcp          # TCP Reassembly
        self._exptl = protocol or 'null'                              # extract til protocol
        self._exlyr = cast('Layers', (layer or 'none').lower())       # extract til layer
        self._exnam = cast('Engines', (engine or 'default').lower())  # extract using engine
        if reassembly:
            reasm_obj_ipv4 = reasm_obj_ipv6 = reasm_obj_tcp = None
            if self._ipv4:
                logger.info('IPv4 reassembly enabled')
                reasm_cls_ipv4 = self.__reassembly__['ipv4']
                if isinstance(reasm_cls_ipv4, ModuleDescriptor):
                    reasm_cls_ipv4 = reasm_cls_ipv4.klass
                    self.__reassembly__['ipv4'] = reasm_cls_ipv4  # update mapping upon import
                reasm_obj_ipv4 = cast('IPv4_Reassembly', reasm_cls_ipv4(strict=reasm_strict, store=reasm_store))
            if self._ipv6:
                logger.info('IPv6 reassembly enabled')
                reasm_cls_ipv6 = self.__reassembly__['ipv6']
                if isinstance(reasm_cls_ipv6, ModuleDescriptor):
                    reasm_cls_ipv6 = reasm_cls_ipv6.klass
                    self.__reassembly__['ipv6'] = reasm_cls_ipv6  # update mapping upon import
                reasm_obj_ipv6 = cast('IPv6_Reassembly', reasm_cls_ipv6(strict=reasm_strict, store=reasm_store))
            if self._tcp:
                logger.info('TCP reassembly enabled')
                reasm_cls_tcp = self.__reassembly__['tcp']
                if isinstance(reasm_cls_tcp, ModuleDescriptor):
                    reasm_cls_tcp = reasm_cls_tcp.klass
                    self.__reassembly__['tcp'] = reasm_cls_tcp  # update mapping upon import
                reasm_obj_tcp = cast('TCP_Reassembly', reasm_cls_tcp(strict=reasm_strict, store=reasm_store))
            self._reasm = ReassemblyManager(
                ipv4=reasm_obj_ipv4,
                ipv6=reasm_obj_ipv6,
                tcp=reasm_obj_tcp,
            )
        if trace:
            trace_obj_tcp = None
            if self._exnam in ('pyshark',) and trace_format in ('pcap',):
                warn(f"'Extractor(engine={self._exnam})' does not support 'trace_format={trace_format}'; "
                     "using 'trace_format=None' instead", FormatWarning, stacklevel=stacklevel())
                trace_format = None
            if self._tcp:
                logger.info('TCP flow tracing enabled')
                trace_cls_tcp = self.__traceflow__['tcp']
                if isinstance(trace_cls_tcp, ModuleDescriptor):
                    trace_cls_tcp = trace_cls_tcp.klass
                    self.__traceflow__['tcp'] = trace_cls_tcp  # update mapping upon import
                trace_obj_tcp = cast('TCP_TraceFlow', trace_cls_tcp(fout=trace_fout, format=trace_format,
                                                                    byteorder=trace_byteorder, nanosecond=trace_nanosecond))
            self._trace = TraceFlowManager(
                tcp=trace_obj_tcp,
            )
        if self._flag_s:
            self._ifile = open(ifnm, 'rb')  # input file # pylint: disable=unspecified-encoding,consider-using-with
        else:
            self._ifile = cast('BufferedReader', fin)
        if not self._ifile.seekable():
            self._ifile = SeekableReader(self._ifile, buffer_size, buffer_save, buffer_path,
                                         stream_closing=not self._flag_s)
        if not self._flag_q:
            output, ext = self.__output__[fmt]
            if ext is None:
                warn(f'Unsupported output format: {fmt}; disabled file output feature',
                     FormatWarning, stacklevel=stacklevel())
            if isinstance(output, ModuleDescriptor):
                output = output.klass
                self.__output__[fmt] = (output, ext)  # update mapping upon import
            dumper = make_dumper(output)
            self._ofile = dumper if self._flag_f else dumper(ofnm)  # output file
        # NOTE: we use peek() to read the magic number, as the file pointer
        # will not be moved after reading; however, the returned bytes object
        # may not be exactly 4 bytes, so we use [:4] to get the first 4 bytes
        self._magic = self._ifile.peek(4)[:4]
        #self._magic = self._ifile.read(4)  # magic number
        #self._ifile.seek(0, os.SEEK_SET)
        self.run()    # start extraction
[docs]
    def __iter__(self) -> 'Extractor':
        """Iterate and parse PCAP frame.
        Raises:
            IterableError: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
                is :data:`True`, as such operation is not applicable.
        """
        if not self._flag_a:
            return self
        raise IterableError("'Extractor(auto=True)' object is not iterable")
[docs]
    def __next__(self) -> '_P':
        """Iterate and parse next PCAP frame.
        It will call :meth:`self._exeng.read_frame <pcapkit.foundation.engines.engine.Engine.read_frame>`
        to parse next PCAP frame internally, until the EOF reached;
        then it calls :meth:`self._cleanup <_cleanup>` for the aftermath.
        """
        while True:
            try:
                return self._exeng.read_frame()
            except (EOFError, StopIteration) as error:
                warn('EOF reached', ExtractionWarning, stacklevel=stacklevel())
                if self._flag_n:
                    continue
                self._cleanup()
                raise StopIteration from error  # pylint: disable=raise-missing-from
            except KeyboardInterrupt:
                self._cleanup()
                raise
[docs]
    def __call__(self) -> '_P':
        """Works as a simple wrapper for the iteration protocol.
        Raises:
            IterableError: If :attr:`self._flag_a <pcapkit.foundation.extraction.Extractor._flag_a>`
                is :data:`True`, as iteration is not applicable.
        """
        if not self._flag_a:
            while True:
                try:
                    return self._exeng.read_frame()
                except (EOFError, StopIteration):
                    warn('EOF reached', ExtractionWarning, stacklevel=stacklevel())
                    if self._flag_n:
                        continue
                    self._cleanup()
                    raise
                except KeyboardInterrupt:
                    self._cleanup()
                    raise
        raise CallableError("'Extractor(auto=True)' object is not callable")
    def __enter__(self) -> 'Extractor':
        """Uses :class:`Extractor` as a context manager."""
        return self
    def __exit__(self, exc_type: 'Type[BaseException] | None', exc_value: 'BaseException | None',
                 traceback: 'TracebackType | None') -> 'None':  # pylint: disable=unused-argument
        """Close the input file when exits."""
        self._ifile.close()
        self._exeng.close()
    ##########################################################################
    # Utilities.
    ##########################################################################
[docs]
    def _cleanup(self) -> 'None':
        """Release resources once extraction & analysis are over.

        The method sets
        :attr:`self._flag_e <pcapkit.foundation.extraction.Extractor._flag_e>`
        to :data:`True`, closes the input file when it is wrapped in a
        :class:`~pcapkit.corekit.io.SeekableReader` or when
        :attr:`self._flag_s` (the *input filename* flag) is not set, and
        finally calls
        :meth:`self._exeng.close <pcapkit.foundation.engines.engine.Engine.close>`.

        """
        # pylint: disable=attribute-defined-outside-init
        self._flag_e = True

        # NOTE(review): a plain file opened from a filename (``_flag_s`` set,
        # no SeekableReader wrapper) is left open here -- presumably it is
        # closed later via ``__exit__``; confirm this is intended.
        if isinstance(self._ifile, SeekableReader) or not self._flag_s:
            self._ifile.close()

        self._exeng.close()