# Source code for pcapkit.interface.core

# -*- coding: utf-8 -*-
"""Core Interface
====================

.. module:: pcapkit.interface.core

:mod:`pcapkit.interface.core` defines the core user-oriented
interfaces, variables, etc., which wrap around the
foundation classes from :mod:`pcapkit.foundation`.

"""
import io
import sys
from typing import TYPE_CHECKING

from pcapkit.foundation.extraction import Extractor
from pcapkit.foundation.reassembly.ipv4 import IPv4 as IPv4_Reassembly
from pcapkit.foundation.reassembly.ipv6 import IPv6 as IPv6_Reassembly
from pcapkit.foundation.reassembly.tcp import TCP as TCP_Reassembly
from pcapkit.foundation.traceflow.tcp import TCP as TCP_TraceFlow
from pcapkit.protocols.protocol import ProtocolBase as Protocol
from pcapkit.utilities.exceptions import FormatError

if TYPE_CHECKING:
    from typing import IO, Optional, Type

    from typing_extensions import Literal

    from pcapkit.foundation.extraction import Engines, Formats, Layers, Protocols, VerboseHandler
    from pcapkit.foundation.reassembly.reassembly import ReassemblyBase as Reassembly
    from pcapkit.foundation.traceflow.traceflow import TraceFlowBase as TraceFlow

# Public names of this module: the three interface functions plus the
# string constants ("macros") defined below for formats, layers and engines.
__all__ = [
    'extract', 'reassemble', 'trace',                       # interface functions
    'TREE', 'JSON', 'PLIST', 'PCAP',                        # format macros
    'LINK', 'INET', 'TRANS', 'APP', 'RAW',                  # layer macros
    'DPKT', 'Scapy', 'PyShark', 'PCAPKit',                  # engine macros
]

# output file formats
# NOTE(review): presumably the values accepted for the ``format`` /
# ``trace_format`` arguments of :func:`extract` -- confirm against ``Formats``.
TREE = 'tree'
JSON = 'json'
PLIST = 'plist'
PCAP = 'pcap'

# layer thresholds
# NOTE(review): presumably the values accepted for the ``layer`` argument of
# :func:`extract` -- confirm against ``Layers``.
# ``'none'`` is also the literal :func:`extract` falls back to when a protocol
# class passed as ``layer`` has a falsy ``__layer__`` attribute.
RAW = 'none'
LINK = 'link'
INET = 'internet'
TRANS = 'transport'
APP = 'application'

# extraction engines
# NOTE(review): presumably the values accepted for the ``engine`` argument of
# :func:`extract` -- confirm against ``Engines``.
DPKT = 'dpkt'
Scapy = 'scapy'
PCAPKit = 'default'
PyShark = 'pyshark'


def extract(fin: 'Optional[str | IO[bytes]]' = None,
            fout: 'Optional[str]' = None,
            format: 'Optional[Formats]' = None,  # pylint: disable=redefined-builtin
            # internal settings
            auto: 'bool' = True,
            extension: 'bool' = True,
            store: 'bool' = True,
            # output settings
            files: 'bool' = False,
            nofile: 'bool' = False,
            verbose: 'bool | VerboseHandler' = False,
            # extraction settings
            engine: 'Optional[Engines]' = None,
            layer: 'Optional[Layers] | Type[Protocol]' = None,
            protocol: 'Optional[Protocols]' = None,
            # reassembly settings
            reassembly: 'bool' = False,
            reasm_strict: 'bool' = True,
            reasm_store: 'bool' = True,
            # trace settings
            trace: 'bool' = False,
            trace_fout: 'Optional[str]' = None,
            trace_format: 'Optional[Formats]' = None,
            trace_byteorder: 'Literal["big", "little"]' = sys.byteorder,
            trace_nanosecond: 'bool' = False,
            # reassembly/trace settings
            ip: 'bool' = False,
            ipv4: 'bool' = False,
            ipv6: 'bool' = False,
            tcp: 'bool' = False,
            # buffer settings
            buffer_size: 'int' = io.DEFAULT_BUFFER_SIZE,
            buffer_save: 'bool' = False,
            buffer_path: 'Optional[str]' = None,
            no_eof: 'bool' = False) -> 'Extractor':
    """Extract a PCAP file.

    Thin convenience wrapper: every argument is forwarded by keyword to
    :class:`~pcapkit.foundation.extraction.Extractor`. The only processing
    done here is on ``layer``: when it is a protocol class, it is replaced
    by the lower-cased value of its ``__layer__`` attribute (``'none'`` if
    that attribute is falsy).

    Arguments:
        fin: file name to be read or a binary IO object; if file not exist,
            raise :exc:`FileNotFound`
        fout: file name to be written
        format: file format of output

        auto: if automatically run till EOF
        extension: if check and append extensions to output file
        store: if store extracted packet info

        files: if split each frame into different files
        nofile: if no output file is to be dumped
        verbose: a :obj:`bool` value or a function takes the
            :class:`Extractor` instance and current parsed frame (depends on
            engine selected) as parameters to print verbose output information

        engine: extraction engine to be used
        layer: extract until which layer; either a layer name or a
            protocol class
        protocol: extract until which protocol

        reassembly: if perform reassembly
        reasm_strict: if set strict flag for reassembly
        reasm_store: if store reassembled datagrams

        trace: if trace TCP traffic flows
        trace_fout: path name for flow tracer if necessary
        trace_format: output file format of flow tracer
        trace_byteorder: output file byte order
        trace_nanosecond: output nanosecond-resolution file flag

        ip: if record data for IPv4 & IPv6 reassembly (must be used with
            ``reassembly=True``)
        ipv4: if perform IPv4 reassembly (must be used with
            ``reassembly=True``)
        ipv6: if perform IPv6 reassembly (must be used with
            ``reassembly=True``)
        tcp: if perform TCP reassembly and/or flow tracing (must be used
            with ``reassembly=True`` or ``trace=True``)

        buffer_size: buffer size for reading input file (for
            :class:`~pcapkit.corekit.io.SeekableReader` only)
        buffer_save: if save buffer to file (for
            :class:`~pcapkit.corekit.io.SeekableReader` only)
        buffer_path: path name for buffer file if necessary (for
            :class:`~pcapkit.corekit.io.SeekableReader` only)

        no_eof: if not raise :exc:`EOFError` when reach EOF

    Returns:
        An :class:`~pcapkit.foundation.extraction.Extractor` object.

    """
    # A protocol class stands for the layer it belongs to; reduce it to the
    # lower-cased layer name before handing it to the extractor.
    if isinstance(layer, type) and issubclass(layer, Protocol):
        layer_name = layer.__layer__ or 'none'
        layer = layer_name.lower()  # type: ignore[assignment]

    return Extractor(
        fin=fin, fout=fout, format=format,
        store=store, files=files, nofile=nofile,
        auto=auto, verbose=verbose, extension=extension,
        engine=engine, layer=layer, protocol=protocol,  # type: ignore[arg-type]
        ip=ip, ipv4=ipv4, ipv6=ipv6, tcp=tcp,
        reassembly=reassembly, reasm_store=reasm_store, reasm_strict=reasm_strict,
        trace=trace, trace_fout=trace_fout, trace_format=trace_format,
        trace_byteorder=trace_byteorder, trace_nanosecond=trace_nanosecond,
        buffer_size=buffer_size, buffer_path=buffer_path, buffer_save=buffer_save,
        no_eof=no_eof,
    )


def reassemble(protocol: 'str | Type[Protocol]', strict: 'bool' = False) -> 'Reassembly':
    """Reassemble fragmented datagrams.

    Builds the reassembly object that matches ``protocol``.

    Arguments:
        protocol: protocol to be reassembled, either as a name (``'IPv4'``,
            ``'IPv6'`` or ``'TCP'``, compared case-sensitively) or as a
            protocol class
        strict: if return all datagrams (including those not
            implemented) when submit

    Returns:
        A :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly`
        object of corresponding protocol.

    Raises:
        FormatError: If ``protocol`` is **NOT** any of IPv4, IPv6 or TCP.

    """
    # A protocol class is replaced by the first element of its ``id()`` result.
    if isinstance(protocol, type) and issubclass(protocol, Protocol):
        protocol = protocol.id()[0]

    # Checked in order; equality (not hashing) is used for the comparison.
    supported = (
        ('IPv4', IPv4_Reassembly),
        ('IPv6', IPv6_Reassembly),
        ('TCP', TCP_Reassembly),
    )
    for name, reassembly_cls in supported:
        if protocol == name:
            return reassembly_cls(strict=strict)

    raise FormatError(f'Unsupported reassembly protocol: {protocol}')


def trace(protocol: 'str | Type[Protocol]', fout: 'Optional[str]', format: 'Optional[str]',  # pylint: disable=redefined-builtin
          byteorder: 'Literal["little", "big"]' = sys.byteorder, nanosecond: bool = False) -> 'TraceFlow':
    """Trace flows.

    Builds the flow tracer that matches ``protocol``.

    Arguments:
        protocol: protocol to be traced, either as a name (only ``'TCP'``,
            compared case-sensitively) or as a protocol class
        fout: output path
        format: output format
        byteorder: output file byte order
        nanosecond: output nanosecond-resolution file flag

    Returns:
        A :class:`~pcapkit.foundation.traceflow.traceflow.TraceFlow` object.

    Raises:
        FormatError: If ``protocol`` is **NOT** TCP.

    """
    # A protocol class is replaced by the first element of its ``id()`` result.
    if isinstance(protocol, type) and issubclass(protocol, Protocol):
        protocol = protocol.id()[0]

    # Reject anything but TCP up front.
    is_tcp = protocol == 'TCP'
    if not is_tcp:
        raise FormatError(f'Unsupported flow tracing protocol: {protocol}')

    return TCP_TraceFlow(fout=fout, format=format,
                         byteorder=byteorder, nanosecond=nanosecond)