Source code for pcapkit.foundation.engines.pcap

# -*- coding: utf-8 -*-
"""PCAP Support
==================

.. module:: pcapkit.foundation.engines.pcap

This module contains the implementation for PCAP file extraction
support, as is used by :class:`pcapkit.foundation.extraction.Extractor`.

"""
from typing import TYPE_CHECKING

from pcapkit.foundation.engines.engine import EngineBase as Engine
from pcapkit.protocols.misc.pcap.frame import Frame
from pcapkit.protocols.misc.pcap.header import Header

__all__ = ['PCAP']

if TYPE_CHECKING:
    from pcapkit.const.reg.linktype import LinkType as Enum_LinkType
    from pcapkit.corekit.version import VersionInfo


[docs] class PCAP(Engine[Frame]): """PCAP file extraction support. Args: extractor: :class:`~pcapkit.foundation.extraction.Extractor` instance. """ if TYPE_CHECKING: #: Global header. _gbhdr: 'Header' #: Version info. _vinfo: 'VersionInfo' #: Data link layer protocol. _dlink: 'Enum_LinkType' #: Nanosecond flag. _nnsec: 'bool' MAGIC_NUMBER = ( b'\xa1\xb2\x3c\x4d', b'\xa1\xb2\xc3\xd4', b'\x4d\x3c\xb2\xa1', b'\xd4\xc3\xb2\xa1', ) ########################################################################## # Defaults. ########################################################################## #: Engine name. __engine_name__ = 'PCAP' #: Engine module name. __engine_module__ = 'pcapkit.protocols.misc.pcap' ########################################################################## # Properties. ########################################################################## @property def header(self) -> 'Header': """Global header.""" return self._gbhdr @property def version(self) -> 'VersionInfo': """Version of input PCAP file.""" return self._vinfo @property def dlink(self) -> 'Enum_LinkType': """Data link layer protocol.""" return self._dlink @property def nanosecond(self) -> 'bool': """Nanosecond flag.""" return self._nnsec ########################################################################## # Methods. ##########################################################################
[docs] def run(self) -> 'None': """Start extraction. This method is the entry point for PCAP file extraction. It will start the extraction process by parsing the PCAP global header and then halt the extraction process until the :meth:`self.extractor.record_frames <pcapkit.foundation.extraction.Extractor.record_frames>` method is called. The method will parse the PCAP global header and save the parsed result as :attr:`self.header <header>`. Information such as PCAP version, data link layer protocol type, nanosecond flag and byteorder will also be save the current :class:`PCAP` engine instance. For output, the method will dump the parsed PCAP global header under the name of ``Global Header``. """ # pylint: disable=attribute-defined-outside-init,protected-access ext = self._extractor self._gbhdr = Header(ext._ifile) self._vinfo = self._gbhdr.version self._dlink = self._gbhdr.protocol self._nnsec = self._gbhdr.nanosecond if ext._flag_q: return if ext._flag_f: ofile = ext._ofile(f'{ext._ofnm}/Global Header.{ext._fext}') ofile(self._gbhdr.info.to_dict(), name='Global Header') else: ext._ofile(self._gbhdr.info.to_dict(), name='Global Header') ofile = ext._ofile ext._offmt = ofile.kind
[docs] def read_frame(self) -> 'Frame': """Read frames. This method performs following operations: - extract frames and each layer of packets; - make :class:`~pcapkit.corekit.infoclass.Info` object out of frame properties; - write to output file with corresponding dumper; - reassemble IP and/or TCP datagram; - trace TCP flows if any; - record frame :class:`~pcapkit.corekit.infoclass.Info` object to frame storage. Returns: Parsed frame instance. """ from pcapkit.toolkit.pcap import (ipv4_reassembly, ipv6_reassembly, tcp_reassembly, tcp_traceflow) ext = self._extractor # read frame header frame = Frame(ext._ifile, num=ext._frnum+1, header=self._gbhdr.info, layer=ext._exlyr, protocol=ext._exptl, nanosecond=self._nnsec) ext._frnum += 1 # verbose output ext._vfunc(ext, frame) # write plist frnum = f'Frame {ext._frnum}' if not ext._flag_q: if ext._flag_f: ofile = ext._ofile(f'{ext._ofnm}/{frnum}.{ext._fext}') ofile(frame.info.to_dict(), name=frnum) else: ext._ofile(frame.info.to_dict(), name=frnum) # record fragments if ext._flag_r: if ext._ipv4: data_ipv4 = ipv4_reassembly(frame) if data_ipv4 is not None: ext._reasm.ipv4(data_ipv4) if ext._ipv6: data_ipv6 = ipv6_reassembly(frame) if data_ipv6 is not None: ext._reasm.ipv6(data_ipv6) if ext._tcp: data_tcp = tcp_reassembly(frame) if data_tcp is not None: ext._reasm.tcp(data_tcp) # trace flows if ext._flag_t: if ext._tcp: data_tf_tcp = tcp_traceflow(frame, data_link=self._dlink) if data_tf_tcp is not None: ext._trace.tcp(data_tf_tcp) # record frames if ext._flag_d: ext._frame.append(frame) # return frame record return frame