File Extractor
pcapkit.foundation.extraction contains only Extractor, which synthesises file I/O and protocol analysis, coordinates information exchange across all network layers, and extracts parameters from a PCAP file.
- class pcapkit.foundation.extraction.Extractor(fin=None, fout=None, format=None, auto=True, extension=True, store=True, files=False, nofile=False, verbose=False, engine=None, layer=None, protocol=None, reassembly=False, reasm_strict=True, reasm_store=True, trace=False, trace_fout=None, trace_format=None, trace_byteorder='little', trace_nanosecond=False, ip=False, ipv4=False, ipv6=False, tcp=False, buffer_size=8192, buffer_save=False, buffer_path=None, no_eof=False)
Extractor for PCAP files.
Notes
For supported engines, please refer to run().
- __init__(fin=None, fout=None, format=None, auto=True, extension=True, store=True, files=False, nofile=False, verbose=False, engine=None, layer=None, protocol=None, reassembly=False, reasm_strict=True, reasm_store=True, trace=False, trace_fout=None, trace_format=None, trace_byteorder='little', trace_nanosecond=False, ip=False, ipv4=False, ipv6=False, tcp=False, buffer_size=8192, buffer_save=False, buffer_path=None, no_eof=False)
Initialise PCAP Reader.
- Parameters:
fin (Union[str, IO[bytes], None]) – file name to be read or a binary IO object; if the file does not exist, raise FileNotFound
format (Optional[Literal['pcap', 'json', 'tree', 'plist']]) – file format of output
auto (bool) – whether to run automatically until EOF
extension (bool) – whether to check and append extensions to the output file
store (bool) – whether to store extracted packet info
files (bool) – whether to split each frame into different files
nofile (bool) – whether no output file is to be dumped
verbose (Union[bool, Callable[[Extractor, Union[Frame, PCAPNG, Packet, Packet, Packet]], Any]]) – a bool value, or a function that takes the Extractor instance and the current parsed frame (depending on the engine selected) as parameters, to print verbose output information
engine (Optional[Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark']]) – extraction engine to be used
layer (Optional[Literal['link', 'internet', 'transport', 'application', 'none']]) – layer up to which to extract
protocol (Union[str, ProtocolBase, Type[ProtocolBase], None]) – protocol up to which to extract
reassembly (bool) – whether to perform reassembly
reasm_strict (bool) – whether to set the strict flag for reassembly
reasm_store (bool) – whether to store reassembled datagrams
trace (bool) – whether to trace TCP traffic flows
trace_fout (Optional[str]) – path name for the flow tracer, if necessary
trace_format (Optional[Literal['pcap', 'json', 'tree', 'plist']]) – output file format of the flow tracer
trace_byteorder (Literal['big', 'little']) – output file byte order
trace_nanosecond (bool) – output nanosecond-resolution file flag
ip (bool) – whether to record data for IPv4 & IPv6 reassembly (must be used with reassembly=True)
ipv4 (bool) – whether to perform IPv4 reassembly (must be used with reassembly=True)
ipv6 (bool) – whether to perform IPv6 reassembly (must be used with reassembly=True)
tcp (bool) – whether to perform TCP reassembly and/or flow tracing (must be used with reassembly=True or trace=True)
buffer_size (int) – buffer size for reading the input file (for SeekableReader only)
buffer_save (bool) – whether to save the buffer to a file (for SeekableReader only)
buffer_path (Optional[str]) – path name for the buffer file, if necessary (for SeekableReader only)
- Warns:
pcapkit.utilities.warnings.FormatWarning – Warns under the following circumstances:
If PCAP output is used for TCP flow tracing while the extraction engine is PyShark.
If the output file format is not supported.
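The ip, ipv4 and ipv6 flags are documented as requiring reassembly=True, and tcp as requiring reassembly=True or trace=True. A minimal sketch of checking those combinations up front follows; check_flags is a hypothetical helper, and whether pcapkit itself warns, raises or silently ignores such combinations is not stated here.

```python
from __future__ import annotations


def check_flags(*, reassembly: bool = False, trace: bool = False,
                ip: bool = False, ipv4: bool = False, ipv6: bool = False,
                tcp: bool = False) -> list[str]:
    """Return the problems found in a flag combination (hypothetical helper)."""
    problems: list[str] = []
    # ip, ipv4 and ipv6 only make sense together with reassembly=True
    for name, value in (('ip', ip), ('ipv4', ipv4), ('ipv6', ipv6)):
        if value and not reassembly:
            problems.append(f'{name}=True requires reassembly=True')
    # tcp is used both for TCP reassembly and for TCP flow tracing
    if tcp and not (reassembly or trace):
        problems.append('tcp=True requires reassembly=True or trace=True')
    return problems
```

Returning a list rather than raising on the first problem lets a caller decide whether to emit a warning or abort.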
- property format: Literal['pcap', 'json', 'tree', 'plist']
Format of output file.
- Raises:
UnsupportedCall – If self._flag_q is True, as output is disabled by the initialisation parameter.
- property output: str
Name of output file.
- Raises:
UnsupportedCall – If self._flag_q is True, as output is disabled by the initialisation parameter.
- property frame: tuple[Frame | PCAPNG | Packet | Packet | Packet, ...]
Extracted frames.
- Raises:
UnsupportedCall – If self._flag_d is False, as storing frame data is disabled.
- property reassembly: ReassemblyData
Frame record for reassembly.
ipv4 – tuple of IPv4 payload fragments (reasm.ipv4.datagram)
ipv6 – tuple of IPv6 payload fragments (reasm.ipv6.datagram)
tcp – tuple of TCP payload fragments (reasm.tcp.datagram)
- Raises:
UnsupportedCall – If self._flag_r is False, as reassembly is disabled.
- property trace: TraceFlowData
Index table for traced flow.
tcp – tuple of TCP flows (trace.tcp.index)
- Raises:
UnsupportedCall – If self._flag_t is False, as flow tracing is disabled.
- classmethod register_dumper(format, dumper, ext)
Register a new dumper class.
Notes
The fully qualified class name of the new dumper class should be {dumper.module}.{dumper.name}.
- Parameters:
format (str) – format name
dumper (Union[ModuleDescriptor[Dumper], Type[Dumper]]) – module descriptor or a dictdumper.dumper.Dumper subclass
ext (str) – file extension
- Return type:
- classmethod register_engine(name, engine)
Register a new extraction engine.
Notes
The fully qualified class name of the new extraction engine should be {engine.module}.{engine.name}.
- classmethod register_reassembly(protocol, reassembly)
Register a new reassembly engine.
Notes
The fully qualified class name of the new reassembly engine should be {reassembly.module}.{reassembly.name}.
- Parameters:
protocol (str) – protocol name
reassembly (Union[ModuleDescriptor[Reassembly], Type[Reassembly]]) – module descriptor or a Reassembly subclass
- Return type:
- classmethod register_traceflow(protocol, traceflow)
Register a new flow tracing engine.
Notes
The fully qualified class name of the new flow tracing engine should be {traceflow.module}.{traceflow.name}.
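The register_* classmethods above share one pattern: a class-level mapping from a key (format, engine name or protocol) to either a module descriptor or a class. The sketch below mimics that pattern in plain Python for dumpers and engines. Registry, qualified_name, the two-field ModuleDescriptor and the module path mypkg.dumpers are all hypothetical stand-ins, not pcapkit's implementation.

```python
from __future__ import annotations

from typing import NamedTuple, Optional, Union


class ModuleDescriptor(NamedTuple):
    """Hypothetical stand-in: a class named by its module and class name."""
    module: str
    name: str


def qualified_name(entry: Union[ModuleDescriptor, type]) -> str:
    """Fully qualified class name, i.e. '{module}.{name}'."""
    if isinstance(entry, ModuleDescriptor):
        return f'{entry.module}.{entry.name}'
    return f'{entry.__module__}.{entry.__qualname__}'


class Registry:
    """Hypothetical class mimicking the register_* classmethods."""

    # format name -> (dumper descriptor or class, file extension)
    __output__: dict[str, tuple[Union[ModuleDescriptor, type], Optional[str]]] = {}
    # engine name -> engine descriptor or class
    __engine__: dict[str, Union[ModuleDescriptor, type]] = {}

    @classmethod
    def register_dumper(cls, format: str, dumper: Union[ModuleDescriptor, type],
                        ext: Optional[str]) -> None:
        cls.__output__[format] = (dumper, ext)

    @classmethod
    def register_engine(cls, name: str, engine: Union[ModuleDescriptor, type]) -> None:
        cls.__engine__[name] = engine
```

Because the mappings live on the class, a registration made through a classmethod is visible to every instance created afterwards.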
- run()
Start extraction.
We use import_test() to check whether a certain engine is available. For supported engines, each engine has a different driver method:
- Default drivers:
  - PCAP format: pcapkit.foundation.engines.pcap.PCAP
  - PCAP-NG format: pcapkit.foundation.engines.pcapng.PCAPNG
- DPKT driver: pcapkit.foundation.engines.dpkt.DPKT
- Scapy driver: pcapkit.foundation.engines.scapy.Scapy
- PyShark driver: pcapkit.foundation.engines.pyshark.PyShark
- Warns:
pcapkit.utilities.warnings.EngineWarning – If the extraction engine is not available, either because its dependency is not installed or because the supplied engine is unknown.
- Return type:
None
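The default engine has separate drivers for PCAP and PCAP-NG input. One plausible way to choose between them is to inspect the first four bytes of the file, as sketched below. sniff_format is a hypothetical helper, and whether pcapkit dispatches exactly this way is an assumption; the magic numbers themselves come from the classic PCAP and PCAP-NG formats.

```python
from __future__ import annotations

from typing import IO

# Classic PCAP magic numbers as they appear on disk, in both byte orders
# (0xA1B2C3D4 for microsecond and 0xA1B23C4D for nanosecond timestamps)
PCAP_MAGICS = {
    bytes.fromhex('a1b2c3d4'), bytes.fromhex('d4c3b2a1'),
    bytes.fromhex('a1b23c4d'), bytes.fromhex('4d3cb2a1'),
}
# Block type of the PCAP-NG Section Header Block; it reads the same in either byte order
PCAPNG_MAGIC = bytes.fromhex('0a0d0d0a')


def sniff_format(fp: IO[bytes]) -> str:
    """Return 'pcap' or 'pcapng' from the first four bytes of a seekable stream."""
    pos = fp.tell()
    magic = fp.read(4)
    fp.seek(pos)  # rewind so the selected driver can parse the header itself
    if magic in PCAP_MAGICS:
        return 'pcap'
    if magic == PCAPNG_MAGIC:
        return 'pcapng'
    raise ValueError(f'unrecognised magic number: {magic.hex()}')
```

Rewinding after the peek requires a seekable stream; for a live, non-seekable input the bytes would have to be buffered instead.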
- static import_test(engine, *, name=None)
Test import for extraction engine.
- Parameters:
- Warns:
pcapkit.utilities.warnings.EngineWarning – If the engine module is not installed.
- Return type:
- Returns:
If the import succeeds, returns the module; otherwise, returns None.
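The behaviour described for import_test() (return the module on success, warn and return None otherwise) can be sketched with importlib. EngineWarning below is a local stand-in for pcapkit.utilities.warnings.EngineWarning, and treating name as a display label for the warning message is an assumption.

```python
from __future__ import annotations

import importlib
import warnings
from types import ModuleType
from typing import Optional


class EngineWarning(UserWarning):
    """Local stand-in for pcapkit.utilities.warnings.EngineWarning."""


def import_test(engine: str, *, name: Optional[str] = None) -> Optional[ModuleType]:
    """Try to import *engine*; return the module, or None after warning."""
    try:
        return importlib.import_module(engine)
    except ImportError:  # also covers ModuleNotFoundError
        # assumption: name is only a human-readable label for the message
        label = name or engine
        warnings.warn(f'extraction engine {label!r} is not available',
                      EngineWarning, stacklevel=2)
        return None
```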
- classmethod make_name(fin='in.pcap', fout='out', fmt='tree', extension=True, *, files=False, nofile=False)
Generate input and output filenames.
The method performs the following processing:
- sanitise fin as the input PCAP filename: use in.pcap as the default value, append the .pcap extension if needed and extension is True, and test whether the file exists;
- if nofile is True, skip the following processing;
- if fmt is provided, presume the corresponding output file extension;
- if fout is not provided, presume the output file name based on the presumptive file extension, with out as the stem of the output file name; if the file extension is not available, raise FormatError;
- if fout is provided, presume the corresponding output format if needed; if the presumption cannot be made, raise FormatError;
- append the corresponding file extension to the output file name if needed and extension is True.
The method returns the generated input and output filenames as a tuple of:
- input filename
- output filename / directory name
- output format
- output file extension (without .)
- whether to split each frame into different files
- Parameters:
fin (Union[str, IO[bytes]]) – Input filename or a binary IO object.
fout (str) – Output filename.
fmt (Literal['pcap', 'json', 'tree', 'plist']) – Output file format.
extension (bool) – Whether to append the .pcap file extension to the input filename if fin does not have such a file extension, and whether to check and append extensions to the output file.
files (bool) – Whether to split each frame into different files.
nofile (bool) – Whether no output file is to be dumped.
- Return type:
tuple[str, Optional[str], Literal['pcap', 'json', 'tree', 'plist'], Optional[str], bool]
- Returns:
Generated input and output filenames.
- Raises:
FileNotFound – If the input file does not exist.
FormatError – If the output format is not provided and cannot be presumed.
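The filename rules listed for make_name() can be condensed into a short function. This is a simplified sketch under several assumptions: None stands for "not provided" (the documented signature defaults to fout='out' and fmt='tree'), only string input is handled, the format-to-extension table (including 'txt' for tree) is guessed, ValueError stands in for FormatError, and FileNotFoundError for FileNotFound.

```python
from __future__ import annotations

import os
from typing import Optional

# Assumed format -> extension table; pcapkit's real mapping may differ
EXTENSIONS = {'pcap': 'pcap', 'json': 'json', 'tree': 'txt', 'plist': 'plist'}


def make_name(fin: str = 'in.pcap', fout: Optional[str] = None,
              fmt: Optional[str] = None, extension: bool = True, *,
              files: bool = False, nofile: bool = False
              ) -> tuple[str, Optional[str], Optional[str], Optional[str], bool]:
    """Simplified sketch of the documented filename handling."""
    # sanitise the input filename and check that it exists
    if extension and not fin.endswith(('.pcap', '.pcapng')):
        fin = f'{fin}.pcap'
    if not os.path.isfile(fin):
        raise FileNotFoundError(fin)
    if nofile:  # no output wanted: skip everything else
        return fin, None, fmt, None, False

    ext: Optional[str] = None
    if fmt is not None:  # the format determines the extension
        if fmt not in EXTENSIONS:
            raise ValueError(f'unsupported format: {fmt}')
        ext = EXTENSIONS[fmt]
    if fout is None:  # no output name: stem "out" plus presumed extension
        if ext is None:
            raise ValueError('cannot presume output file name without a format')
        fout = 'out'
    elif fmt is None:  # infer the format from the output file's suffix
        suffix = os.path.splitext(fout)[1].lstrip('.')
        matches = [name for name, e in EXTENSIONS.items() if e == suffix]
        if not matches:
            raise ValueError(f'cannot presume output format from {fout!r}')
        fmt, ext = matches[0], suffix

    # append the extension, unless fout names a directory of per-frame files
    if extension and not files and not fout.endswith(f'.{ext}'):
        fout = f'{fout}.{ext}'
    return fin, fout, fmt, ext, files
```

Skipping the extension when files is True reflects the documented "output filename / directory name" return value; that interpretation is an assumption.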
- record_header()
Read global header.
The method parses the PCAP global header and saves the parsed result to its extraction context. Information such as the PCAP version, data link layer protocol type, nanosecond flag and byte order is also saved to the current Engine instance. If TCP flow tracing is enabled, the nanosecond flag and byte order will be used for the output PCAP file of the traced TCP flows.
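The byte order and nanosecond flag saved by record_header() can both be derived from the magic number of a classic PCAP file. The sketch below parses the 24-byte global header with struct; GlobalHeader and parse_global_header are hypothetical names, and this is not pcapkit's own parser.

```python
from __future__ import annotations

import struct
from typing import NamedTuple


class GlobalHeader(NamedTuple):
    byteorder: str            # 'big' or 'little'
    nanosecond: bool          # True for nanosecond-resolution timestamps
    version: tuple[int, int]  # (major, minor)
    thiszone: int
    sigfigs: int
    snaplen: int
    network: int              # link-layer header type, e.g. 1 for Ethernet


# magic number read as big-endian -> (file byte order, nanosecond flag)
_MAGIC = {
    0xA1B2C3D4: ('big', False), 0xD4C3B2A1: ('little', False),
    0xA1B23C4D: ('big', True), 0x4D3CB2A1: ('little', True),
}


def parse_global_header(data: bytes) -> GlobalHeader:
    """Parse the 24-byte classic PCAP global header."""
    if len(data) < 24:
        raise ValueError('the global header is 24 bytes long')
    magic = struct.unpack('>I', data[:4])[0]
    if magic not in _MAGIC:
        raise ValueError(f'not a PCAP file: magic {magic:#010x}')
    byteorder, nanosecond = _MAGIC[magic]
    prefix = '>' if byteorder == 'big' else '<'
    # version_major, version_minor, thiszone, sigfigs, snaplen, network
    major, minor, thiszone, sigfigs, snaplen, network = struct.unpack(
        prefix + 'HHiIII', data[4:24])
    return GlobalHeader(byteorder, nanosecond, (major, minor),
                        thiszone, sigfigs, snaplen, network)
```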
For output, the method dumps the parsed PCAP global header under the name of Global Header.
- Return type:
- record_frames()
Read packet frames.
The method calls self._exeng.read_frame to parse each frame from the input PCAP file, and performs cleanup by calling self._exeng.close upon completion of the parsing process.
- Return type:
None
Notes
Under non-auto mode, i.e. when self._flag_a is False, the method performs no action.
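For classic PCAP, each frame read by such a loop is a 16-byte record header (seconds, sub-second fraction, captured length, original length) followed by the captured bytes. The sketch below reads frames until EOF and then cleans up, and does nothing in non-auto mode. It is independent of pcapkit: read_frames and record_frames here are hypothetical functions, and closing the file stands in for the engine cleanup.

```python
from __future__ import annotations

import struct
from typing import IO, Iterator


def read_frames(fp: IO[bytes], byteorder: str = 'little',
                nanosecond: bool = False) -> Iterator[tuple[float, bytes]]:
    """Yield (timestamp, packet data) for each record after the global header."""
    fmt = ('<' if byteorder == 'little' else '>') + 'IIII'
    scale = 1e9 if nanosecond else 1e6
    while True:
        header = fp.read(16)
        if len(header) < 16:  # EOF, or a truncated trailing record
            return
        ts_sec, ts_frac, incl_len, _orig_len = struct.unpack(fmt, header)
        data = fp.read(incl_len)
        if len(data) < incl_len:  # truncated packet data: stop as at EOF
            return
        yield ts_sec + ts_frac / scale, data


def record_frames(fp: IO[bytes], *, auto: bool = True, byteorder: str = 'little',
                  nanosecond: bool = False) -> list[tuple[float, bytes]]:
    """Read every frame, then clean up; do nothing in non-auto mode."""
    if not auto:
        return []
    try:
        return list(read_frames(fp, byteorder, nanosecond))
    finally:
        fp.close()  # stands in for the cleanup step
```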
- __output__: DefaultDict[str, tuple[Union[ModuleDescriptor[Dumper], Type[Dumper]], str | None]]
Format dumper mapping for writing output files. Each value should be a tuple of a module descriptor (module name and class name) or a dictdumper.dumper.Dumper subclass, and the corresponding file extension.
- __engine__: dict[str, Union[ModuleDescriptor[Engine], Type[Engine]]]
Engine mapping for extracting frames. The values should be a tuple representing the module name and class name, or an Engine subclass.
- __reassembly__: dict[str, Union[ModuleDescriptor[Reassembly], Type[Reassembly]]]
Reassembly support mapping for extracting frames. The values should be a tuple representing the module name and class name, or a Reassembly subclass.
- __traceflow__: dict[str, Union[ModuleDescriptor[TraceFlow], Type[TraceFlow]]]
Flow tracing support mapping for extracting frames. The values should be a tuple representing the module name and class name, or a TraceFlow subclass.
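Each of these mappings stores either a (module name, class name) pair or a class. A minimal sketch of turning such a value into a class on demand with importlib follows; ModuleDescriptor is a hypothetical two-field NamedTuple standing in for pcapkit's own type, and resolve is an illustrative helper.

```python
from __future__ import annotations

import importlib
from typing import NamedTuple, Union


class ModuleDescriptor(NamedTuple):
    """Hypothetical stand-in: (module name, class name) of a lazily imported class."""
    module: str
    name: str


def resolve(entry: Union[ModuleDescriptor, type]) -> type:
    """Return the class for a mapping value, importing its module on demand."""
    if isinstance(entry, type):
        return entry  # already a class: use it directly
    module = importlib.import_module(entry.module)
    return getattr(module, entry.name)
```

Storing a descriptor instead of the class means the module is only imported when the entry is actually used, which presumably keeps optional back ends such as dpkt or scapy from being imported unless selected.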
- _cleanup()
Cleanup after extraction & analysis.
The method calls self._exeng.close, sets self._flag_e as True and closes the input file (if necessary).
- Return type:
- _flag_a: bool
Auto extract flag. It indicates whether the extraction process should continue automatically until EOF is reached.
- _flag_v: bool
Verbose flag. This is used to determine whether the verbose callback function should be called at each frame.
- _flag_n: bool
No EOF flag. It is useful when the input file is a live capture, as the extraction process will not stop until the user interrupts the process.
- _flag_s: bool
Input filename flag. It indicates whether the input file is a file name or a binary IO object. For the latter, we should not close the file object after extraction.
- _ifile: BufferedReader
Input file object.
- _reasm: ReassemblyManager
Frame record for reassembly.
- _trace: TraceFlowManager
Frame record for flow tracing.
- _exptl: Union[str, ProtocolBase, Type[ProtocolBase]]
Protocol up to which to extract.
- __iter__()
Iterate and parse PCAP frames.
- Raises:
IterableError – If self._flag_a is True, as such an operation is not applicable.
- Return type:
- __next__()
Iterate and parse the next PCAP frame.
It calls self._exeng.read_frame internally to parse the next PCAP frame until EOF is reached; it then calls self._cleanup for the aftermath.
- Return type:
TypeVar(_P)
- __call__()
Works as a simple wrapper for the iteration protocol.
- Raises:
IterableError – If self._flag_a is True, as iteration is not applicable.
- Return type:
TypeVar(_P)
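In non-auto mode, frames are pulled one at a time either by iterating or by calling the instance, and iteration is refused in auto mode. The class below is a hypothetical, self-contained sketch of that protocol in which a list stands in for the extraction engine; it does not use pcapkit, and deriving IterableError from TypeError is an assumption.

```python
from __future__ import annotations


class IterableError(TypeError):
    """Local stand-in for pcapkit's IterableError."""


class FrameSource:
    """Hypothetical class mimicking Extractor's frame-by-frame mode."""

    def __init__(self, frames: list[bytes], auto: bool = True) -> None:
        self._flag_a = auto     # auto mode: everything is read up front
        self._flag_e = False    # set once EOF is reached and cleanup has run
        self._pending = iter(frames)
        self.cleanup_count = 0

    def __iter__(self) -> FrameSource:
        if self._flag_a:
            raise IterableError("'FrameSource(auto=True)' object is not iterable")
        return self

    def __next__(self) -> bytes:
        try:
            return next(self._pending)  # stands in for self._exeng.read_frame()
        except StopIteration:
            self._cleanup()
            raise

    def __call__(self) -> bytes:
        if self._flag_a:
            raise IterableError("'FrameSource(auto=True)' object is not callable")
        return next(self)

    def _cleanup(self) -> None:
        if not self._flag_e:  # run the cleanup only once
            self._flag_e = True
            self.cleanup_count += 1
```

With the documented API, the same pattern corresponds to constructing Extractor with auto=False and then iterating over the instance or calling it repeatedly.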