Source code for pcapkit.foundation.engines.pyshark
# -*- coding: utf-8 -*-"""PyShark Support=====================.. module:: pcapkit.foundation.engines.pysharkThis module contains the implementation for `PyShark`_ enginesupport, as is used by :class:`pcapkit.foundation.extraction.Extractor`... _PyShark: https://kiminewt.github.io/pyshark"""fromtypingimportTYPE_CHECKING,castfrompcapkit.foundation.engines.engineimportEngineBaseasEnginefrompcapkit.foundation.reassemblyimportReassemblyManagerfrompcapkit.utilities.exceptionsimportstacklevelfrompcapkit.utilities.warningsimportAttributeWarning,warn__all__=['PyShark']ifTYPE_CHECKING:frompyshark.capture.file_captureimportFileCapturefrompyshark.packet.packetimportPacketasPySharkPacketfrompcapkit.foundation.extractionimportExtractor
[docs]defrun(self)->'None':"""Call :class:`pyshark.FileCapture` to extract PCAP files. This method assigns :attr:`self._expkg <PyShark._expkg>` as :mod:`pyshark` and :attr:`self._extmp <PyShark._extmp>` as an iterator from :class:`pyshark.FileCapture`. Warns: AttributeWarning: Warns under following circumstances: * if :attr:`self.extractor._exlyr <pcapkit.foundation.extraction.Extractor._exlyr>` and/or :attr:`self.extractor._exptl <pcapkit.foundation.extraction.Extractor._exptl>` is provided as the PyShark engine currently does not support such operations. * if reassembly is enabled, as the PyShark engine currently does not support such operation. """ext=self._extractorifext._exlyr!='none'orext._exptl!='null':warn("'Extractor(engine='pyshark')' does not support protocol and layer threshold; "f"'layer={ext._exlyr}' and 'protocol={ext._exptl}' ignored",AttributeWarning,stacklevel=stacklevel())ifext._flag_rand(ext._ipv4orext._ipv6orext._tcp):ext._flag_r=Falseext._reasm=ReassemblyManager(ipv4=None,ipv6=None,tcp=None)warn("'Extractor(engine='pyshark')' object dose not support reassembly; "f"so 'ipv4={ext._ipv4}', 'ipv6={ext._ipv6}' and 'tcp={ext._tcp}' will be ignored",AttributeWarning,stacklevel=stacklevel())# setup verbose handlerifext._flag_v:ext._vfunc=lambdae,f:print(f'Frame {e._frnum:>3d}: {f.frame_info.protocols}'# pylint: disable=protected-access)# pylint: disable=logging-fstring-interpolation# extract & analyse fileself._extmp=self._expkg.FileCapture(ext._ifnm,keep_packets=False)
[docs]defread_frame(self)->'PySharkPacket':"""Read frames with PyShark engine. Returns: Parsed frame instance. See Also: Please refer to :meth:`PCAP.read_frame <pcapkit.foundation.engines.pcap.PCAP.read_frame>` for more operational information. """frompcapkit.toolkit.pysharkimportpacket2dict,tcp_traceflowext=self._extractor# fetch PyShark packetpacket=cast('PySharkPacket',self._extmp.next())# verbose outputext._frnum=int(packet.number)ext._vfunc(ext,packet)# write plistfrnum=f'Frame {ext._frnum}'ifnotext._flag_q:info=packet2dict(packet)ifext._flag_f:ofile=ext._ofile(f'{ext._ofnm}/{frnum}.{ext._fext}')ofile(info,name=frnum)else:ext._ofile(info,name=frnum)ofile=ext._ofileext._offmt=ofile.kind# trace flowsifext._flag_t:ifext._tcp:data_tf_tcp=tcp_traceflow(packet)ifdata_tf_tcpisnotNone:ext._trace.tcp(data_tf_tcp)# record framesifext._flag_d:# setattr(packet, 'packet2dict', packet2dict)ext._frame.append(packet)# return frame recordreturnpacket
[docs]defclose(self)->'None':"""Close engine. This method is to be used for closing the engine instance. It is to close the engine instance after the extraction process is finished. """self._extmp.close()