Source code for pcapkit.foundation.engines.pcapng

# -*- coding: utf-8 -*-
"""PCAP-NG Support
=====================

.. module:: pcapkit.foundation.engines.pcapng

This module contains the implementation for PCAP-NG file extraction
support, as is used by :class:`pcapkit.foundation.extraction.Extractor`.

"""
from logging import warn
from typing import TYPE_CHECKING, cast

from pcapkit.const.pcapng.block_type import BlockType as Enum_BlockType
from pcapkit.corekit.infoclass import Info, info_final
from pcapkit.foundation.engines.engine import EngineBase as Engine
from pcapkit.protocols.misc.pcapng import PCAPNG as P_PCAPNG
from pcapkit.utilities.exceptions import FormatError, stacklevel
from pcapkit.utilities.warnings import DeprecatedFormatWarning

__all__ = ['PCAPNG']

if TYPE_CHECKING:
    from pcapkit.foundation.extraction import Extractor
    from pcapkit.protocols.data.misc.pcapng import PCAPNG as Data_PCAPNG
    from pcapkit.protocols.data.misc.pcapng import CustomBlock as Data_CustomBlock
    from pcapkit.protocols.data.misc.pcapng import \
        DecryptionSecretsBlock as Data_DecryptionSecretsBlock
    from pcapkit.protocols.data.misc.pcapng import EnhancedPacketBlock as Data_EnhancedPacketBlock
    from pcapkit.protocols.data.misc.pcapng import \
        InterfaceDescriptionBlock as Data_InterfaceDescriptionBlock
    from pcapkit.protocols.data.misc.pcapng import \
        InterfaceStatisticsBlock as Data_InterfaceStatisticsBlock
    from pcapkit.protocols.data.misc.pcapng import NameResolutionBlock as Data_NameResolutionBlock
    from pcapkit.protocols.data.misc.pcapng import PacketBlock as Data_PacketBlock
    from pcapkit.protocols.data.misc.pcapng import SectionHeaderBlock as Data_SectionHeaderBlock
    from pcapkit.protocols.data.misc.pcapng import \
        SystemdJournalExportBlock as Data_SystemdJournalExportBlock
    from pcapkit.protocols.data.misc.pcapng import UnknownBlock as Data_UnknownBlock


@info_final
class Context(Info):
    """Per-section bookkeeping for the PCAP-NG file format.

    An instance is built from a section header block. Every other
    attribute is a list that starts out empty and is attached by
    :meth:`__post_init__`.

    """

    #: Section header.
    section: 'Data_SectionHeaderBlock'

    if TYPE_CHECKING:
        # The declarations below exist for static type checkers only; at
        # runtime the attributes are attached by ``__post_init__``.
        # NOTE: a ``packets`` list is disabled (commented out) upstream, so
        # no such attribute is declared or created here.

        #: Interface descriptions.
        interfaces: 'list[Data_InterfaceDescriptionBlock]'
        #: Name resolution records.
        names: 'list[Data_NameResolutionBlock]'
        #: :manpage:`systemd(1)` journal export records.
        journals: 'list[Data_SystemdJournalExportBlock]'
        #: Decryption secrets.
        secrets: 'list[Data_DecryptionSecretsBlock]'
        #: Custom blocks.
        custom: 'list[Data_CustomBlock]'
        #: Interface statistics.
        statistics: 'list[Data_InterfaceStatisticsBlock]'
        #: Unknown blocks.
        unknown: 'list[Data_UnknownBlock]'

        def __init__(self, section: 'Data_SectionHeaderBlock') -> 'None': ...

    def __post_init__(self) -> None:
        """Post initialisation hook.

        Attach a distinct, empty list for each kind of block collection.

        """
        # Order matters: it is the order in which the keyword arguments are
        # handed to ``__update__``.
        collections = (
            'interfaces',
            'names',
            'journals',
            'secrets',
            'custom',
            'statistics',
            'unknown',
        )
        self.__update__(**{field: [] for field in collections})
class PCAPNG(Engine[P_PCAPNG]):
    """PCAP-NG file extraction support.

    The engine keeps one :class:`Context` per section of the file. The
    context of the section currently being read is ``self._ctx``; all
    contexts seen so far are kept, in order, in ``self._ctx_list``.

    Args:
        extractor: :class:`~pcapkit.foundation.extraction.Extractor` instance.

    """
    if TYPE_CHECKING:
        #: Current context.
        _ctx: 'Context'
        #: File context storage.
        _ctx_list: 'list[Context]'

    MAGIC_NUMBER = (
        b'\x0a\x0d\x0d\x0a',
    )

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: Engine name.
    __engine_name__ = 'PCAP-NG'

    #: Engine module name.
    __engine_module__ = 'pcapkit.protocols.misc.pcapng'

    ##########################################################################
    # Data models.
    ##########################################################################

    def __init__(self, extractor: 'Extractor') -> 'None':
        # No section has been read yet; ``run`` installs the first context.
        self._ctx = None  # type: ignore[assignment]
        self._ctx_list = []

        super().__init__(extractor)

    ##########################################################################
    # Methods.
    ##########################################################################

    def run(self) -> 'None':
        """Start extraction.

        This method is the entry point for PCAP-NG file extraction. It will
        directly extract the first block, which should be a section header
        block, and then save the related information into the internal
        context storage.

        Raises:
            FormatError: If the first block is not a section header block.

        """
        ext = self._extractor

        shb = P_PCAPNG(ext._ifile, num=0, sct=1, ctx=None)
        if shb.info.type != Enum_BlockType.Section_Header_Block:
            raise FormatError(f'PCAP-NG: [SHB] invalid block type: {shb.info.type!r}')

        self._start_section(shb)

    def read_frame(self) -> 'P_PCAPNG':
        """Read frames.

        This method performs following tasks:

        - read the next block from input file;
        - check if the block is a packet block;
        - if not, save the block into the internal context storage and repeat;
        - if yes, save the related information into the internal context storage;
        - write the parsed block into output file.
        - reassemble IP and/or TCP fragments;
        - trace TCP flows if any;
        - record frame information if any.

        Returns:
            Parsed PCAP-NG block.

        Raises:
            FormatError: If an interface statistics, enhanced packet or
                packet block refers to an interface ID that has not been
                described in the current section, or if a simple packet
                block appears in a section that does not have exactly one
                interface.

        Warns:
            DeprecatedFormatWarning: If a (deprecated) packet block is read.

        """
        # ``warnings.warn`` is required here: ``logging.warn`` would take the
        # warning category as a %-format argument of the message, so the
        # ``DeprecatedFormatWarning`` would never actually be issued.
        import warnings

        from pcapkit.toolkit.pcapng import (ipv4_reassembly, ipv6_reassembly, tcp_reassembly,
                                            tcp_traceflow)

        ext = self._extractor

        while True:
            # read next block
            block = P_PCAPNG(ext._ifile, num=ext._frnum+1, sct=len(self._ctx_list),
                             ctx=self._ctx, layer=ext._exlyr, protocol=ext._exptl,
                             __packet__={
                                 'snaplen': self._get_snaplen(),
                             })

            # check block type; only the three packet block types leave the loop
            if block.info.type == Enum_BlockType.Section_Header_Block:
                self._start_section(block)

            elif block.info.type == Enum_BlockType.Interface_Description_Block:
                self._ctx.interfaces.append(cast('Data_InterfaceDescriptionBlock', block.info))
                self._write_file(block.info, name=f'Interface Description {len(self._ctx.interfaces)}')

            elif block.info.type == Enum_BlockType.Name_Resolution_Block:
                self._ctx.names.append(cast('Data_NameResolutionBlock', block.info))
                self._write_file(block.info, name=f'Name Resolution {len(self._ctx.names)}')

            elif block.info.type == Enum_BlockType.systemd_Journal_Export_Block:
                self._ctx.journals.append(cast('Data_SystemdJournalExportBlock', block.info))
                self._write_file(block.info, name=f'systemd Journal Export {len(self._ctx.journals)}')

            elif block.info.type == Enum_BlockType.Decryption_Secrets_Block:
                self._ctx.secrets.append(cast('Data_DecryptionSecretsBlock', block.info))
                self._write_file(block.info, name=f'Decryption Secrets {len(self._ctx.secrets)}')

            elif block.info.type == Enum_BlockType.Interface_Statistics_Block:
                isb_info = cast('Data_InterfaceStatisticsBlock', block.info)
                if isb_info.interface_id >= len(self._ctx.interfaces):
                    raise FormatError(f'PCAP-NG: [ISB] invalid interface ID: {isb_info.interface_id}')

                self._ctx.statistics.append(isb_info)
                self._write_file(isb_info, name=f'Interface Statistics {len(self._ctx.statistics)}')

            elif block.info.type in (Enum_BlockType.Custom_Block_that_rewriters_can_copy_into_new_files,
                                     Enum_BlockType.Custom_Block_that_rewriters_should_not_copy_into_new_files):
                self._ctx.custom.append(cast('Data_CustomBlock', block.info))
                self._write_file(block.info, name=f'Custom {len(self._ctx.custom)}')

            elif block.info.type == Enum_BlockType.Enhanced_Packet_Block:
                epb_info = cast('Data_EnhancedPacketBlock', block.info)
                if epb_info.interface_id >= len(self._ctx.interfaces):
                    raise FormatError(f'PCAP-NG: [EPB] invalid interface ID: {epb_info.interface_id}')
                break

            elif block.info.type == Enum_BlockType.Simple_Packet_Block:
                # a simple packet block carries no interface ID, hence the
                # section must describe exactly one interface
                if len(self._ctx.interfaces) != 1:
                    raise FormatError(f'PCAP-NG: [SPB] invalid section with {len(self._ctx.interfaces)} interfaces')
                break

            elif block.info.type == Enum_BlockType.Packet_Block:
                pack_info = cast('Data_PacketBlock', block.info)
                if pack_info.interface_id >= len(self._ctx.interfaces):
                    raise FormatError(f'PCAP-NG: [Packet] invalid interface ID: {pack_info.interface_id}')

                warnings.warn('PCAP-NG: [Packet] deprecated block type', DeprecatedFormatWarning,
                              stacklevel=stacklevel())
                break

            else:
                self._ctx.unknown.append(cast('Data_UnknownBlock', block.info))
                self._write_file(block.info, name=f'Unknown {len(self._ctx.unknown)}')

        # increment frame number
        ext._frnum += 1

        # verbose output
        ext._vfunc(ext, block)

        # write plist
        self._write_file(block.info, name=f'Frame {ext._frnum}')

        # record fragments
        if ext._flag_r:
            if ext._ipv4:
                data_ipv4 = ipv4_reassembly(block)
                if data_ipv4 is not None:
                    ext._reasm.ipv4(data_ipv4)

            if ext._ipv6:
                data_ipv6 = ipv6_reassembly(block)
                if data_ipv6 is not None:
                    ext._reasm.ipv6(data_ipv6)

            if ext._tcp:
                data_tcp = tcp_reassembly(block)
                if data_tcp is not None:
                    ext._reasm.tcp(data_tcp)

        # trace flows
        if ext._flag_t:
            if ext._tcp:
                data_tf_tcp = tcp_traceflow(block, nanosecond=block.nanosecond)
                if data_tf_tcp is not None:
                    ext._trace.tcp(data_tf_tcp)

        # record blocks
        if ext._flag_d:
            ext._frame.append(block)

        # return block record
        return block

    ##########################################################################
    # Utilities.
    ##########################################################################

    def _start_section(self, block: 'P_PCAPNG') -> 'None':
        """Open a new section from a section header block.

        A new :class:`Context` is created from the block, made the current
        context, appended to the context storage and attached to the block
        itself; the block is then written to the output file.

        Args:
            block: The parsed section header block.

        """
        self._ctx = Context(cast('Data_SectionHeaderBlock', block.info))
        self._ctx_list.append(self._ctx)
        block._ctx = self._ctx

        # sections are numbered from 1, in the order they are encountered
        self._write_file(block.info, name=f'Section Header {len(self._ctx_list)}')

    def _write_file(self, block: 'Data_PCAPNG', *, name: 'str') -> 'None':
        """Write the parsed block into output file.

        Nothing is written when the extractor's ``_flag_q`` is set. When its
        ``_flag_f`` is set, the block goes to a file of its own, named after
        ``name`` under ``ext._ofnm``; otherwise it is handed to the
        extractor's shared output object. In both cases the ``kind`` of the
        output object used is recorded as ``ext._offmt``.

        Args:
            block: The parsed block.
            name: The name of the block.

        """
        ext = self._extractor
        if ext._flag_q:
            return

        if ext._flag_f:
            ofile = ext._ofile(f'{ext._ofnm}/{name}.{ext._fext}')
            ofile(block.to_dict(), name=name)
        else:
            ext._ofile(block.to_dict(), name=name)
            ofile = ext._ofile
        ext._offmt = ofile.kind

    def _get_snaplen(self) -> 'int':
        """Get snapshot length from the current context.

        This method is used for providing the snapshot length to the
        ``__packet__`` argument when parsing a Simple Packet Block (SPB).

        Returns:
            The ``snaplen`` of the first interface described in the current
            section.

        Notes:
            If there is no interface, return ``0xFFFF_FFFF_FFFF_FFFF``.

        """
        if self._ctx.interfaces:
            return self._ctx.interfaces[0].snaplen
        return 0xFFFF_FFFF_FFFF_FFFF