# -*- coding: utf-8 -*-"""DPKT Support==================.. module:: pcapkit.foundation.engines.dpktThis module contains the implementation for `DPKT`_ enginesupport, as is used by :class:`pcapkit.foundation.extraction.Extractor`... _DPKT: https://dpkt.readthedocs.io"""fromtypingimportTYPE_CHECKING,castfrompcapkit.const.reg.linktypeimportLinkTypeasEnum_LinkTypefrompcapkit.foundation.engines.engineimportEngineBaseasEnginefrompcapkit.utilities.exceptionsimportFormatError,stacklevelfrompcapkit.utilities.warningsimportAttributeWarning,DPKTWarning,warn__all__=['DPKT']ifTYPE_CHECKING:fromtypingimportOptional,Type,Unionfromdpkt.dpktimportPacketasDPKTPacketfromdpkt.pcapimportReaderasPCAPReaderfromdpkt.pcapngimportReaderasPCAPNGReaderfrompcapkit.foundation.extractionimportExtractorReader=Union[PCAPReader,PCAPNGReader]
def run(self) -> 'None':
    """Open the input file with the matching :mod:`dpkt` reader.

    The :mod:`dpkt` package is taken from :attr:`self._expkg <DPKT._expkg>`
    and the reader created for the input file is stored in
    :attr:`self._extmp <DPKT._extmp>`. When the extractor's verbose flag
    is set, a per-frame printing handler is installed on the extractor.

    Warns:
        AttributeWarning: If :attr:`self.extractor._exlyr <pcapkit.foundation.extraction.Extractor._exlyr>`
            and/or :attr:`self.extractor._exptl <pcapkit.foundation.extraction.Extractor._exptl>`
            is provided, as the DPKT engine currently does not support
            such operations.

    Raises:
        FormatError: If the file format is not supported, i.e., the magic
            number matches neither PCAP nor PCAP-NG.

    """
    from pcapkit.foundation.engines.pcap import PCAP
    from pcapkit.foundation.engines.pcapng import PCAPNG

    extractor = self._extractor
    dpkt = self._expkg

    # layer / protocol thresholds are not honoured by this engine
    if not (extractor._exlyr == 'none' and extractor._exptl == 'null'):
        warn("'Extractor(engine=dpkt)' does not support protocol and layer threshold; "
             f"'layer={extractor._exlyr}' and 'protocol={extractor._exptl}' ignored",
             AttributeWarning, stacklevel=stacklevel())

    # setup verbose handler
    if extractor._flag_v:
        from pcapkit.toolkit.dpkt import packet2chain  # isort:skip

        def _print_frame(e, f):  # type: ignore[no-untyped-def]
            print(f'Frame {e._frnum:>3d}: {packet2chain(f)}')  # pylint: disable=protected-access

        extractor._vfunc = _print_frame

    # select the reader class from the file's magic number
    magic = extractor.magic_number
    if magic in PCAP.MAGIC_NUMBER:
        reader_cls = dpkt.pcap.Reader
    elif magic in PCAPNG.MAGIC_NUMBER:
        reader_cls = dpkt.pcapng.Reader
    else:
        raise FormatError(f'unsupported file format: {magic!r}')

    # extract & analyse file
    self._extmp = reader_cls(extractor._ifile)
def read_frame(self) -> 'DPKTPacket':
    """Read the next frame with the DPKT engine.

    The next ``(timestamp, bytes)`` pair is pulled from the reader stored
    in ``self._extmp`` and parsed with the class returned by
    :meth:`_get_protocol`. Depending on the extractor flags, the frame is
    then reported to the verbose handler, dumped to the output file,
    fed to the reassembly and flow-tracing helpers, and recorded in the
    extractor's frame list.

    Returns:
        Parsed frame instance.

    See Also:
        Please refer to :meth:`PCAP.read_frame <pcapkit.foundation.engines.pcap.PCAP.read_frame>`
        for more operational information.

    """
    from pcapkit.toolkit.dpkt import (ipv4_reassembly, ipv6_reassembly, packet2dict,
                                      tcp_reassembly, tcp_traceflow)

    extractor = self._extractor
    capture = self._extmp
    link = Enum_LinkType.get(capture.datalink())

    # fetch DPKT packet
    ts, raw = cast('tuple[float, bytes]', next(capture))
    parser = self._get_protocol(link)
    frame = parser(raw)  # type: DPKTPacket

    # verbose output
    extractor._frnum += 1
    extractor._vfunc(extractor, frame)

    # write plist
    label = f'Frame {extractor._frnum}'
    if not extractor._flag_q:
        record = packet2dict(frame, ts, data_link=link)
        if extractor._flag_f:
            # one output file per frame
            dumper = extractor._ofile(f'{extractor._ofnm}/{label}.{extractor._fext}')
        else:
            # single shared output file
            dumper = extractor._ofile
        dumper(record, name=label)
        extractor._offmt = dumper.kind

    # record fragments
    if extractor._flag_r:
        if extractor._ipv4:
            ipv4_fragment = ipv4_reassembly(frame, count=extractor._frnum)
            if ipv4_fragment is not None:
                extractor._reasm.ipv4(ipv4_fragment)

        if extractor._ipv6:
            ipv6_fragment = ipv6_reassembly(frame, count=extractor._frnum)
            if ipv6_fragment is not None:
                extractor._reasm.ipv6(ipv6_fragment)

        if extractor._tcp:
            tcp_segment = tcp_reassembly(frame, count=extractor._frnum)
            if tcp_segment is not None:
                extractor._reasm.tcp(tcp_segment)

    # trace flows
    if extractor._flag_t and extractor._tcp:
        tcp_flow = tcp_traceflow(frame, ts, data_link=link, count=extractor._frnum)
        if tcp_flow is not None:
            extractor._trace.tcp(tcp_flow)

    # record frames
    if extractor._flag_d:
        extractor._frame.append(frame)

    # return frame record
    return frame
##########################################################################
# Utilities.
##########################################################################

def _get_protocol(self, linktype: 'Optional[Enum_LinkType]' = None) -> 'Type[DPKTPacket]':
    """Return the :mod:`dpkt` packet class for parsing the current packet.

    Args:
        linktype: Link type code. If :data:`None`, it is resolved from the
            data link type reported by the reader in ``self._extmp``.

    Returns:
        ``dpkt.ethernet.Ethernet``, ``dpkt.ip.IP`` or ``dpkt.ip6.IP6`` for
        the Ethernet, IPv4 and IPv6 link types respectively; for any other
        link type, a fallback ``RawPacket`` class that keeps the buffer
        untouched in its ``data`` attribute.

    Warns:
        DPKTWarning: If the link type is not one of Ethernet, IPv4 or IPv6.

    """
    dpkt = self._expkg

    # the reader is only consulted when the caller did not supply a link type
    if linktype is None:
        linktype = Enum_LinkType.get(self._extmp.datalink())

    # compare against the enumeration members uniformly, rather than mixing
    # member and ``.value`` comparisons
    if linktype == Enum_LinkType.ETHERNET:
        return dpkt.ethernet.Ethernet
    if linktype == Enum_LinkType.IPV4:
        return dpkt.ip.IP
    if linktype == Enum_LinkType.IPV6:
        return dpkt.ip6.IP6

    warn('unrecognised link layer protocol; all analysis functions ignored',
         DPKTWarning, stacklevel=stacklevel())

    class RawPacket(dpkt.dpkt.Packet):  # type: ignore[name-defined]
        """Raw packet."""

        def __len__(self) -> 'int':
            return len(self.data)

        def __bytes__(self) -> 'bytes':
            return self.data

        def unpack(self, buf: 'bytes') -> 'None':
            # no parsing: keep the raw buffer as the payload
            self.data = buf

    return RawPacket