# -*- coding: utf-8 -*-"""PCAP Support==================.. module:: pcapkit.foundation.engines.pcapThis module contains the implementation for PCAP file extractionsupport, as is used by :class:`pcapkit.foundation.extraction.Extractor`."""fromtypingimportTYPE_CHECKINGfrompcapkit.foundation.engines.engineimportEngineBaseasEnginefrompcapkit.protocols.misc.pcap.frameimportFramefrompcapkit.protocols.misc.pcap.headerimportHeader__all__=['PCAP']ifTYPE_CHECKING:frompcapkit.const.reg.linktypeimportLinkTypeasEnum_LinkTypefrompcapkit.corekit.versionimportVersionInfo
class PCAP(Engine[Frame]):
    """Extraction engine for PCAP files.

    The engine first parses the PCAP global header (:meth:`run`) and then
    reads frames one at a time (:meth:`read_frame`), driving the output,
    reassembly, flow tracing and frame storage facilities of the owning
    extractor.

    Args:
        extractor: :class:`~pcapkit.foundation.extraction.Extractor` instance.

    """
    if TYPE_CHECKING:
        #: Parsed global header, set by :meth:`run`.
        _gbhdr: 'Header'
        #: Version info taken from the global header.
        _vinfo: 'VersionInfo'
        #: Data link layer protocol taken from the global header.
        _dlink: 'Enum_LinkType'
        #: Nanosecond flag taken from the global header.
        _nnsec: 'bool'

    #: Magic numbers associated with the PCAP file format (the consumer of
    #: this tuple lives outside this class body).
    MAGIC_NUMBER = (
        b'\xa1\xb2\x3c\x4d',
        b'\xa1\xb2\xc3\xd4',
        b'\x4d\x3c\xb2\xa1',
        b'\xd4\xc3\xb2\xa1',
    )

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: Engine name.
    __engine_name__ = 'PCAP'

    #: Engine module name.
    __engine_module__ = 'pcapkit.protocols.misc.pcap'

    ##########################################################################
    # Properties.
    ##########################################################################

    @property
    def header(self) -> 'Header':
        """Global header parsed by :meth:`run`."""
        return self._gbhdr

    @property
    def version(self) -> 'VersionInfo':
        """Version of the input PCAP file, as reported by the global header."""
        return self._vinfo

    @property
    def dlink(self) -> 'Enum_LinkType':
        """Data link layer protocol, as reported by the global header."""
        return self._dlink

    @property
    def nanosecond(self) -> 'bool':
        """Nanosecond flag, as reported by the global header."""
        return self._nnsec

    ##########################################################################
    # Methods.
    ##########################################################################

    def run(self) -> 'None':
        """Start extraction by parsing the PCAP global header.

        The global header is read from the extractor's input file and kept
        as :attr:`self.header <header>`; its version, data link layer
        protocol and nanosecond flag are saved to the current :class:`PCAP`
        engine instance as well.

        Unless the extractor's ``_flag_q`` is set, the parsed header is then
        dumped under the name of ``Global Header`` -- to a dedicated
        ``Global Header`` file inside the output directory when ``_flag_f``
        is set, or through the extractor's shared dumper otherwise -- and the
        ``kind`` of the dumper used is recorded as the extractor's output
        format.

        """
        # pylint: disable=attribute-defined-outside-init,protected-access
        extractor = self._extractor

        global_header = Header(extractor._ifile)
        self._gbhdr = global_header
        self._vinfo = global_header.version
        self._dlink = global_header.protocol
        self._nnsec = global_header.nanosecond

        # nothing to dump when this flag is set
        if extractor._flag_q:
            return

        # pick the dumper first, then write once through whichever was chosen
        if extractor._flag_f:
            dumper = extractor._ofile(f'{extractor._ofnm}/Global Header.{extractor._fext}')
        else:
            dumper = extractor._ofile
        dumper(self._gbhdr.info.to_dict(), name='Global Header')

        extractor._offmt = dumper.kind

    def read_frame(self) -> 'Frame':
        """Read the next frame from the input file.

        In order, this method:

        - parses the next frame (numbered one past the extractor's current
          frame counter) and then advances that counter;
        - calls the extractor's verbose callback with the frame;
        - unless ``_flag_q`` is set, dumps the frame info under the name
          ``Frame N`` (to its own file when ``_flag_f`` is set);
        - when ``_flag_r`` is set, feeds IPv4, IPv6 and/or TCP fragments to
          the matching reassembly handlers, for each one that is enabled;
        - when ``_flag_t`` is set and TCP is enabled, feeds the frame to the
          TCP flow tracer;
        - when ``_flag_d`` is set, appends the frame to the frame storage.

        Returns:
            Parsed frame instance.

        """
        from pcapkit.toolkit.pcap import (ipv4_reassembly, ipv6_reassembly, tcp_reassembly,
                                          tcp_traceflow)

        extractor = self._extractor

        # parse first, bump the counter afterwards, so that a failed parse
        # leaves the counter untouched
        frame = Frame(
            extractor._ifile,
            num=extractor._frnum + 1,
            header=self._gbhdr.info,
            layer=extractor._exlyr,
            protocol=extractor._exptl,
            nanosecond=self._nnsec,
        )
        extractor._frnum += 1

        # verbose output
        extractor._vfunc(extractor, frame)

        # dump the frame
        label = f'Frame {extractor._frnum}'
        if not extractor._flag_q:
            if extractor._flag_f:
                dumper = extractor._ofile(f'{extractor._ofnm}/{label}.{extractor._fext}')
            else:
                dumper = extractor._ofile
            dumper(frame.info.to_dict(), name=label)

        # collect fragments for reassembly
        if extractor._flag_r:
            if extractor._ipv4:
                ipv4_fragment = ipv4_reassembly(frame)
                if ipv4_fragment is not None:
                    extractor._reasm.ipv4(ipv4_fragment)

            if extractor._ipv6:
                ipv6_fragment = ipv6_reassembly(frame)
                if ipv6_fragment is not None:
                    extractor._reasm.ipv6(ipv6_fragment)

            if extractor._tcp:
                tcp_segment = tcp_reassembly(frame)
                if tcp_segment is not None:
                    extractor._reasm.tcp(tcp_segment)

        # trace TCP flows
        if extractor._flag_t and extractor._tcp:
            tcp_flow = tcp_traceflow(frame, data_link=self._dlink)
            if tcp_flow is not None:
                extractor._trace.tcp(tcp_flow)

        # keep the frame in storage
        if extractor._flag_d:
            extractor._frame.append(frame)

        return frame