Source code for pcapkit.interface.core
# -*- coding: utf-8 -*-
"""Core Interface
====================
.. module:: pcapkit.interface.core
:mod:`pcapkit.interface.core` defines core user-oriented
interfaces, variables, and etc., which wraps around the
foundation classes from :mod:`pcapkit.foundation`.
"""
import io
import sys
from typing import TYPE_CHECKING
from pcapkit.foundation.extraction import Extractor
from pcapkit.foundation.reassembly.ipv4 import IPv4 as IPv4_Reassembly
from pcapkit.foundation.reassembly.ipv6 import IPv6 as IPv6_Reassembly
from pcapkit.foundation.reassembly.tcp import TCP as TCP_Reassembly
from pcapkit.foundation.traceflow.tcp import TCP as TCP_TraceFlow
from pcapkit.protocols.protocol import ProtocolBase as Protocol
from pcapkit.utilities.exceptions import FormatError
if TYPE_CHECKING:
from typing import IO, Optional, Type
from typing_extensions import Literal
from pcapkit.foundation.extraction import Engines, Formats, Layers, Protocols, VerboseHandler
from pcapkit.foundation.reassembly.reassembly import ReassemblyBase as Reassembly
from pcapkit.foundation.traceflow.traceflow import TraceFlowBase as TraceFlow
__all__ = [
'extract', 'reassemble', 'trace', # interface functions
'TREE', 'JSON', 'PLIST', 'PCAP', # format macros
'LINK', 'INET', 'TRANS', 'APP', 'RAW', # layer macros
'DPKT', 'Scapy', 'PyShark', 'PCAPKit', # engine macros
]
# output file formats
TREE = 'tree'
JSON = 'json'
PLIST = 'plist'
PCAP = 'pcap'
# layer thresholds
RAW = 'none'
LINK = 'link'
INET = 'internet'
TRANS = 'transport'
APP = 'application'
# extraction engines
DPKT = 'dpkt'
Scapy = 'scapy'
PCAPKit = 'default'
PyShark = 'pyshark'
[docs]
def extract(fin: 'Optional[str | IO[bytes]]' = None, fout: 'Optional[str]' = None, format: 'Optional[Formats]' = None, # basic settings # pylint: disable=redefined-builtin
auto: 'bool' = True, extension: 'bool' = True, store: 'bool' = True, # internal settings # pylint: disable=line-too-long
files: 'bool' = False, nofile: 'bool' = False, verbose: 'bool | VerboseHandler' = False, # output settings # pylint: disable=line-too-long
engine: 'Optional[Engines]' = None, layer: 'Optional[Layers] | Type[Protocol]' = None, # extraction settings # pylint: disable=line-too-long
protocol: 'Optional[Protocols]' = None, # extraction settings # pylint: disable=line-too-long
reassembly: 'bool' = False, reasm_strict: 'bool' = True, reasm_store: 'bool' = True, # reassembly settings # pylint: disable=line-too-long
trace: 'bool' = False, trace_fout: 'Optional[str]' = None, trace_format: 'Optional[Formats]' = None, # trace settings # pylint: disable=line-too-long
trace_byteorder: 'Literal["big", "little"]' = sys.byteorder, trace_nanosecond: 'bool' = False, # trace settings # pylint: disable=line-too-long
ip: 'bool' = False, ipv4: 'bool' = False, ipv6: 'bool' = False, tcp: 'bool' = False, # reassembly/trace settings # pylint: disable=line-too-long
buffer_size: 'int' = io.DEFAULT_BUFFER_SIZE, buffer_save: 'bool' = False, buffer_path: 'Optional[str]' = None, # buffer settings # pylint: disable=line-too-long
no_eof: 'bool' = False) -> 'Extractor':
"""Extract a PCAP file.
Arguments:
fin: file name to be read or a binary IO object;
if file not exist, raise :exc:`FileNotFound`
fout: file name to be written
format: file format of output
auto: if automatically run till EOF
extension: if check and append extensions to output file
store: if store extracted packet info
files: if split each frame into different files
nofile: if no output file is to be dumped
verbose: a :obj:`bool` value or a function takes the :class:`Extractor`
instance and current parsed frame (depends on engine selected) as
parameters to print verbose output information
engine: extraction engine to be used
layer: extract til which layer
protocol: extract til which protocol
reassembly: if perform reassembly
reasm_strict: if set strict flag for reassembly
reasm_store: if store reassembled datagrams
trace: if trace TCP traffic flows
trace_fout: path name for flow tracer if necessary
trace_format: output file format of flow tracer
trace_byteorder: output file byte order
trace_nanosecond: output nanosecond-resolution file flag
ip: if record data for IPv4 & IPv6 reassembly (must be used with ``reassembly=True``)
ipv4: if perform IPv4 reassembly (must be used with ``reassembly=True``)
ipv6: if perform IPv6 reassembly (must be used with ``reassembly=True``)
tcp: if perform TCP reassembly and/or flow tracing
(must be used with ``reassembly=True`` or ``trace=True``)
buffer_size: buffer size for reading input file (for :class:`~pcapkit.corekit.io.SeekableReader` only)
buffer_save: if save buffer to file (for :class:`~pcapkit.corekit.io.SeekableReader` only)
buffer_path: path name for buffer file if necessary (for :class:`~pcapkit.corekit.io.SeekableReader` only)
no_eof: if not raise :exc:`EOFError` when reach EOF
Returns:
An :class:`~pcapkit.foundation.extraction.Extractor` object.
"""
if isinstance(layer, type) and issubclass(layer, Protocol):
layer = (layer.__layer__ or 'none').lower() # type: ignore[assignment]
return Extractor(fin=fin, fout=fout, format=format,
store=store, files=files, nofile=nofile,
auto=auto, verbose=verbose, extension=extension,
engine=engine, layer=layer, protocol=protocol, # type: ignore[arg-type]
ip=ip, ipv4=ipv4, ipv6=ipv6, tcp=tcp,
reassembly=reassembly, reasm_store=reasm_store, reasm_strict=reasm_strict,
trace=trace, trace_fout=trace_fout, trace_format=trace_format,
trace_byteorder=trace_byteorder, trace_nanosecond=trace_nanosecond,
buffer_size=buffer_size, buffer_path=buffer_path, buffer_save=buffer_save,
no_eof=no_eof)
[docs]
def reassemble(protocol: 'str | Type[Protocol]', strict: 'bool' = False) -> 'Reassembly':
"""Reassemble fragmented datagrams.
Arguments:
protocol: protocol to be reassembled
strict: if return all datagrams (including those not implemented) when submit
Returns:
A :class:`~pcapkit.foundation.reassembly.reassembly.Reassembly` object of corresponding protocol.
Raises:
FormatError: If ``protocol`` is **NOT** any of IPv4, IPv6 or TCP.
"""
if isinstance(protocol, type) and issubclass(protocol, Protocol):
protocol = protocol.id()[0]
if protocol == 'IPv4':
return IPv4_Reassembly(strict=strict)
if protocol == 'IPv6':
return IPv6_Reassembly(strict=strict)
if protocol == 'TCP':
return TCP_Reassembly(strict=strict)
raise FormatError(f'Unsupported reassembly protocol: {protocol}')
[docs]
def trace(protocol: 'str | Type[Protocol]', fout: 'Optional[str]',
format: 'Optional[str]', # pylint: disable=redefined-builtin
byteorder: 'Literal["little", "big"]' = sys.byteorder,
nanosecond: bool = False) -> 'TraceFlow':
"""Trace flows.
Arguments:
protocol: protocol to be reassembled
fout: output path
format: output format
byteorder: output file byte order
nanosecond: output nanosecond-resolution file flag
Returns:
A :class:`~pcapkit.foundation.traceflow.traceflow.TraceFlow` object.
Raises:
FormatError: If ``protocol`` is **NOT** TCP.
"""
if isinstance(protocol, type) and issubclass(protocol, Protocol):
protocol = protocol.id()[0]
if protocol == 'TCP':
return TCP_TraceFlow(fout=fout, format=format, byteorder=byteorder, nanosecond=nanosecond)
raise FormatError(f'Unsupported flow tracing protocol: {protocol}')