Source code for pcapkit.foundation.engines.pyshark

# -*- coding: utf-8 -*-
"""PyShark Support
=====================

.. module:: pcapkit.foundation.engines.pyshark

This module contains the implementation for `PyShark`_ engine
support, as is used by :class:`pcapkit.foundation.extraction.Extractor`.

.. _PyShark: https://kiminewt.github.io/pyshark

"""
from typing import TYPE_CHECKING, cast

from pcapkit.foundation.engines.engine import EngineBase as Engine
from pcapkit.foundation.reassembly import ReassemblyManager
from pcapkit.utilities.exceptions import stacklevel
from pcapkit.utilities.warnings import AttributeWarning, warn

__all__ = ['PyShark']

if TYPE_CHECKING:
    from pyshark.capture.file_capture import FileCapture
    from pyshark.packet.packet import Packet as PySharkPacket

    from pcapkit.foundation.extraction import Extractor


[docs] class PyShark(Engine['PySharkPacket']): """PyShark engine support. Args: extractor: :class:`~pcapkit.foundation.extraction.Extractor` instance. """ if TYPE_CHECKING: import pyshark #: Engine extraction package. _expkg: 'pyshark' #: Engine extraction temporary storage. _extmp: 'FileCapture' ########################################################################## # Defaults. ########################################################################## #: Engine name. __engine_name__ = 'PyShark' #: Engine module name. __engine_module__ = 'pyshark' ########################################################################## # Data models. ########################################################################## def __init__(self, extractor: 'Extractor') -> 'None': import pyshark # isort:skip self._expkg = pyshark self._extmp = cast('FileCapture', None) super().__init__(extractor) ########################################################################## # Methods. ##########################################################################
[docs] def run(self) -> 'None': """Call :class:`pyshark.FileCapture` to extract PCAP files. This method assigns :attr:`self._expkg <PyShark._expkg>` as :mod:`pyshark` and :attr:`self._extmp <PyShark._extmp>` as an iterator from :class:`pyshark.FileCapture`. Warns: AttributeWarning: Warns under following circumstances: * if :attr:`self.extractor._exlyr <pcapkit.foundation.extraction.Extractor._exlyr>` and/or :attr:`self.extractor._exptl <pcapkit.foundation.extraction.Extractor._exptl>` is provided as the PyShark engine currently does not support such operations. * if reassembly is enabled, as the PyShark engine currently does not support such operation. """ ext = self._extractor if ext._exlyr != 'none' or ext._exptl != 'null': warn("'Extractor(engine='pyshark')' does not support protocol and layer threshold; " f"'layer={ext._exlyr}' and 'protocol={ext._exptl}' ignored", AttributeWarning, stacklevel=stacklevel()) if ext._flag_r and (ext._ipv4 or ext._ipv6 or ext._tcp): ext._flag_r = False ext._reasm = ReassemblyManager(ipv4=None, ipv6=None, tcp=None) warn("'Extractor(engine='pyshark')' object dose not support reassembly; " f"so 'ipv4={ext._ipv4}', 'ipv6={ext._ipv6}' and 'tcp={ext._tcp}' will be ignored", AttributeWarning, stacklevel=stacklevel()) # setup verbose handler if ext._flag_v: ext._vfunc = lambda e, f: print( f'Frame {e._frnum:>3d}: {f.frame_info.protocols}' # pylint: disable=protected-access ) # pylint: disable=logging-fstring-interpolation # extract & analyse file self._extmp = self._expkg.FileCapture(ext._ifnm, keep_packets=False)
[docs] def read_frame(self) -> 'PySharkPacket': """Read frames with PyShark engine. Returns: Parsed frame instance. See Also: Please refer to :meth:`PCAP.read_frame <pcapkit.foundation.engines.pcap.PCAP.read_frame>` for more operational information. """ from pcapkit.toolkit.pyshark import packet2dict, tcp_traceflow ext = self._extractor # fetch PyShark packet packet = cast('PySharkPacket', self._extmp.next()) # verbose output ext._frnum = int(packet.number) ext._vfunc(ext, packet) # write plist frnum = f'Frame {ext._frnum}' if not ext._flag_q: info = packet2dict(packet) if ext._flag_f: ofile = ext._ofile(f'{ext._ofnm}/{frnum}.{ext._fext}') ofile(info, name=frnum) else: ext._ofile(info, name=frnum) ofile = ext._ofile ext._offmt = ofile.kind # trace flows if ext._flag_t: if ext._tcp: data_tf_tcp = tcp_traceflow(packet) if data_tf_tcp is not None: ext._trace.tcp(data_tf_tcp) # record frames if ext._flag_d: # setattr(packet, 'packet2dict', packet2dict) ext._frame.append(packet) # return frame record return packet
[docs] def close(self) -> 'None': """Close engine. This method is to be used for closing the engine instance. It is to close the engine instance after the extraction process is finished. """ self._extmp.close()