# -*- coding: utf-8 -*-
"""Scapy Support
===================

.. module:: pcapkit.foundation.engines.scapy

This module contains the implementation for `Scapy`_ engine
support, as is used by :class:`pcapkit.foundation.extraction.Extractor`.

.. _Scapy: https://scapy.net

"""
from typing import TYPE_CHECKING, cast

from pcapkit.foundation.engines.engine import EngineBase as Engine
from pcapkit.utilities.exceptions import stacklevel
from pcapkit.utilities.warnings import AttributeWarning, warn

__all__ = ['Scapy']

# Names below are only needed for static type annotations, so they are
# imported exclusively while type checking.
if TYPE_CHECKING:
    from typing import Iterator

    from scapy.packet import Packet as ScapyPacket

    from pcapkit.foundation.extraction import Extractor
def run(self) -> 'None':
    """Call :func:`scapy.sendrecv.sniff` to extract PCAP files.

    This method calls ``sniff(offline=...)`` on
    :attr:`self._expkg <Scapy._expkg>` (expected to be
    :mod:`scapy.sendrecv`; it is only read here, not assigned) with the
    extractor's input file name, and stores an iterator over the result
    in :attr:`self._extmp <Scapy._extmp>`.

    When the extractor is in verbose mode, its ``_vfunc`` callback is
    replaced with one that prints the frame number followed by the
    protocol chain of the packet.

    Warns:
        AttributeWarning: If :attr:`self.extractor._exlyr <pcapkit.foundation.extraction.Extractor._exlyr>`
            and/or :attr:`self.extractor._exptl <pcapkit.foundation.extraction.Extractor._exptl>`
            is provided as the Scapy engine currently does not support
            such operations.

    """
    ext = self._extractor

    # 'none' / 'null' are the "not set" values checked for here; any
    # other value is a threshold this engine will ignore
    if ext._exlyr != 'none' or ext._exptl != 'null':
        warn("'Extractor(engine=scapy)' does not support protocol and layer threshold; "
             f"'layer={ext._exlyr}' and 'protocol={ext._exptl}' ignored",
             AttributeWarning, stacklevel=stacklevel())

    # setup verbose handler
    if ext._flag_v:
        # imported lazily so the helper is only loaded in verbose mode
        from pcapkit.toolkit.scapy import packet2chain  # isort:skip

        ext._vfunc = lambda e, f: print(
            f'Frame {e._frnum:>3d}: {packet2chain(f)}'  # pylint: disable=protected-access
        )

    # extract & analyse file
    self._extmp = iter(self._expkg.sniff(offline=ext._ifnm))
def read_frame(self) -> 'ScapyPacket':
    """Read frames with Scapy engine.

    Fetches the next packet from :attr:`self._extmp <Scapy._extmp>`
    (the iterator prepared by :meth:`run`), increments the extractor's
    frame counter and calls its verbose callback. Depending on the
    extractor's flags, the packet is then dumped to the output, passed
    to IPv4/IPv6/TCP reassembly, passed to TCP flow tracing and/or
    appended to the extractor's frame record.

    Returns:
        Parsed frame instance.

    Raises:
        StopIteration: If the packet iterator is exhausted.

    See Also:
        Please refer to :meth:`PCAP.read_frame <pcapkit.foundation.engines.pcap.PCAP.read_frame>`
        for more operational information.

    """
    from pcapkit.toolkit.scapy import (ipv4_reassembly, ipv6_reassembly, packet2dict,
                                       tcp_reassembly, tcp_traceflow)
    ext = self._extractor

    # fetch Scapy packet; StopIteration propagates to the caller at EOF
    packet = next(self._extmp)

    # verbose output (counter is bumped first so the callback sees the
    # number of the frame just read)
    ext._frnum += 1
    ext._vfunc(ext, packet)

    # write plist
    frnum = f'Frame {ext._frnum}'
    if not ext._flag_q:
        info = packet2dict(packet)
        if ext._flag_f:
            # one output object per frame, named after the frame
            ofile = ext._ofile(f'{ext._ofnm}/{frnum}.{ext._fext}')
            ofile(info, name=frnum)
        else:
            ext._ofile(info, name=frnum)

    # record fragments
    if ext._flag_r:
        if ext._ipv4:
            data_ipv4 = ipv4_reassembly(packet, count=ext._frnum)
            if data_ipv4 is not None:
                ext._reasm.ipv4(data_ipv4)
        if ext._ipv6:
            data_ipv6 = ipv6_reassembly(packet, count=ext._frnum)
            if data_ipv6 is not None:
                ext._reasm.ipv6(data_ipv6)
        if ext._tcp:
            data_tcp = tcp_reassembly(packet, count=ext._frnum)
            if data_tcp is not None:
                ext._reasm.tcp(data_tcp)

    # trace flows
    if ext._flag_t and ext._tcp:
        data_tf_tcp = tcp_traceflow(packet, count=ext._frnum)
        if data_tf_tcp is not None:
            ext._trace.tcp(data_tf_tcp)

    # record frames
    if ext._flag_d:
        ext._frame.append(packet)

    # return frame record
    return packet