File Extractor

pcapkit.foundation.extraction contains Extractor only, which synthesises file I/O and protocol analysis, coordinates information exchange across all network layers, and extracts parameters from a PCAP file.

Todo

Implement engine support for pypcap & pycapfile.

class pcapkit.foundation.extraction.Extractor(fin=None, fout=None, format=None, auto=True, extension=True, store=True, files=False, nofile=False, verbose=False, engine=None, layer=None, protocol=None, reassembly=False, reasm_strict=True, reasm_store=True, trace=False, trace_fout=None, trace_format=None, trace_byteorder='little', trace_nanosecond=False, ip=False, ipv4=False, ipv6=False, tcp=False, buffer_size=8192, buffer_save=False, buffer_path=None, no_eof=False)[source]#

Bases: Generic[_P]

Extractor for PCAP files.

Notes

For supported engines, please refer to run().

__init__(fin=None, fout=None, format=None, auto=True, extension=True, store=True, files=False, nofile=False, verbose=False, engine=None, layer=None, protocol=None, reassembly=False, reasm_strict=True, reasm_store=True, trace=False, trace_fout=None, trace_format=None, trace_byteorder='little', trace_nanosecond=False, ip=False, ipv4=False, ipv6=False, tcp=False, buffer_size=8192, buffer_save=False, buffer_path=None, no_eof=False)[source]#

Initialise PCAP Reader.

Parameters:
  • fin (Union[str, IO[bytes], None]) – file name to be read or a binary IO object; if the file does not exist, FileNotFound is raised

  • fout (Optional[str]) – file name to be written

  • format (Optional[Literal['pcap', 'json', 'tree', 'plist']]) – file format of output

  • auto (bool) – if automatically run till EOF

  • extension (bool) – if check and append extensions to output file

  • store (bool) – if store extracted packet info

  • files (bool) – if split each frame into different files

  • nofile (bool) – if no output file is to be dumped

  • verbose (Union[bool, Callable[[Extractor, Union[Frame, PCAPNG, Packet, Packet, Packet]], Any]]) – a bool value, or a function that takes the Extractor instance and the current parsed frame (whose type depends on the engine selected) as parameters to print verbose output information

  • engine (Optional[Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark']]) – extraction engine to be used

  • layer (Optional[Literal['link', 'internet', 'transport', 'application', 'none']]) – extract until which layer

  • protocol (Union[str, ProtocolBase, Type[ProtocolBase], None]) – extract until which protocol

  • reassembly (bool) – if perform reassembly

  • reasm_strict (bool) – if set strict flag for reassembly

  • reasm_store (bool) – if store reassembled datagrams

  • trace (bool) – if trace TCP traffic flows

  • trace_fout (Optional[str]) – path name for flow tracer if necessary

  • trace_format (Optional[Literal['pcap', 'json', 'tree', 'plist']]) – output file format of flow tracer

  • trace_byteorder (Literal['big', 'little']) – output file byte order

  • trace_nanosecond (bool) – output nanosecond-resolution file flag

  • ip (bool) – if record data for IPv4 & IPv6 reassembly (must be used with reassembly=True)

  • ipv4 (bool) – if perform IPv4 reassembly (must be used with reassembly=True)

  • ipv6 (bool) – if perform IPv6 reassembly (must be used with reassembly=True)

  • tcp (bool) – if perform TCP reassembly and/or flow tracing (must be used with reassembly=True or trace=True)

  • buffer_size (int) – buffer size for reading input file (for SeekableReader only)

  • buffer_save (bool) – if save buffer to file (for SeekableReader only)

  • buffer_path (Optional[str]) – path name for buffer file if necessary (for SeekableReader only)

  • no_eof (bool) – if raise EOFError when EOF

Warns:

pcapkit.utilities.warnings.FormatWarning – Warns under the following circumstances:

  • If using PCAP output for TCP flow tracing while the extraction engine is PyShark.

  • If output file format is not supported.

property length: int#

Frame number (of current extracted frame or all).

property format: Literal['pcap', 'json', 'tree', 'plist']#

Format of output file.

Raises:

UnsupportedCall – If self._flag_q is set as True, as output is disabled by initialisation parameter.

property input: str#

Name of input PCAP file.

property output: str#

Name of output file.

Raises:

UnsupportedCall – If self._flag_q is set as True, as output is disabled by initialisation parameter.

property frame: tuple[Frame | PCAPNG | Packet | Packet | Packet, ...]#

Extracted frames.

Raises:

UnsupportedCall – If self._flag_d is False, as storing frame data is disabled.
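The flag-guarded properties of this class (format, output, frame, reassembly, trace) follow one pattern: check a flag set at initialisation, then either raise or return the data. The following is a minimal sketch of that pattern, not pcapkit's implementation; GuardedStoreSketch, UnsupportedCallSketch and the add() helper are hypothetical names introduced for illustration only.

```python
class UnsupportedCallSketch(RuntimeError):
    """Hypothetical stand-in for pcapkit's UnsupportedCall exception."""


class GuardedStoreSketch:
    """Hypothetical class showing a property guarded by a boolean flag."""

    def __init__(self, store: bool = True) -> None:
        self._flag_d = store      # mirrors the documented store-data flag
        self._frames: list = []   # frames collected so far

    def add(self, frame: object) -> None:
        # Keep the frame only when storing is enabled.
        if self._flag_d:
            self._frames.append(frame)

    @property
    def frame(self) -> tuple:
        # Refuse access when storage was disabled at initialisation.
        if not self._flag_d:
            raise UnsupportedCallSketch("storing frame data is disabled")
        return tuple(self._frames)
```

Raising on access, rather than returning an empty tuple, makes a disabled feature distinguishable from a capture that simply contained no frames.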

property reassembly: ReassemblyData#

Frame record for reassembly.

Raises:

UnsupportedCall – If self._flag_r is False, as reassembly is disabled.

property trace: TraceFlowData#

Index table for traced flow.

Raises:

UnsupportedCall – If self._flag_t is False, as flow tracing is disabled.

property engine: Engine#

PCAP extraction engine.

classmethod register_dumper(format, dumper, ext)[source]#

Register a new dumper class.

Notes

The fully qualified class name of the new dumper class should be {dumper.module}.{dumper.name}.

Parameters:
Return type:

None

classmethod register_engine(name, engine)[source]#

Register a new extraction engine.

Notes

The fully qualified class name of the new extraction engine should be {engine.module}.{engine.name}.

Parameters:
Return type:

None

classmethod register_reassembly(protocol, reassembly)[source]#

Register a new reassembly engine.

Notes

The fully qualified class name of the new reassembly engine should be {reassembly.module}.{reassembly.name}.

Parameters:
Return type:

None

classmethod register_traceflow(protocol, traceflow)[source]#

Register a new flow tracing engine.

Notes

The fully qualified class name of the new flow tracing engine should be {traceflow.module}.{traceflow.name}.

Parameters:
Return type:

None

run()[source]#

Start extraction.

The method uses import_test() to check whether a certain engine is available. Each supported engine has its own driver method.

Warns:

pcapkit.utilities.warnings.EngineWarning – If the extraction engine is not available, either because a dependency is not installed or because the supplied engine is unknown.

Return type:

None

static import_test(engine, *, name=None)[source]#

Test import for extraction engine.

Parameters:
  • engine (str) – Extraction engine module name.

  • name (Optional[str]) – Extraction engine display name.

Warns:

pcapkit.utilities.warnings.EngineWarning – If the engine module is not installed.

Return type:

Optional[ModuleType]

Returns:

If succeeded, returns the module; otherwise, returns None.
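The behaviour described above (try the import, warn on failure, return the module or None) can be sketched with the standard library alone. This is an illustrative approximation rather than pcapkit's code: import_test_sketch is a hypothetical name, and the built-in ImportWarning stands in for pcapkit.utilities.warnings.EngineWarning.

```python
import importlib
import warnings
from types import ModuleType
from typing import Optional


def import_test_sketch(engine: str, *, name: Optional[str] = None) -> Optional[ModuleType]:
    """Return the imported module, or None (with a warning) if it is missing."""
    try:
        return importlib.import_module(engine)
    except ImportError:
        # Assumption: ImportWarning replaces pcapkit's EngineWarning so the
        # sketch has no third-party dependency.
        warnings.warn(
            f"extraction engine {name or engine!r} is not installed",
            ImportWarning,
            stacklevel=2,
        )
        return None
```

Returning None instead of raising lets the caller fall back to another engine without wrapping every attempt in its own try block.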

classmethod make_name(fin='in.pcap', fout='out', fmt='tree', extension=True, *, files=False, nofile=False)[source]#

Generate input and output filenames.

The method will perform the following processing:

  1. sanitise fin as the input PCAP filename, using in.pcap as the default value and appending the .pcap extension if needed and extension is True, then test whether the file exists;

  2. if nofile is True, it skips the following processing;

  3. if fmt is provided, it presumes the corresponding output file extension;

  4. if fout is not provided, it presumes the output file name based on the presumptive file extension; the stem of the output file name is set as out; should the file extension be unavailable, it raises FormatError;

  5. if fout is provided, it presumes the corresponding output format if needed; should the presumption be impossible, it raises FormatError;

  6. it will also append the corresponding file extension to the output file name if needed and extension is True.

And the method returns the generated input and output filenames as follows:

  1. input filename

  2. output filename / directory name

  3. output format

  4. output file extension (without .)

  5. if split each frame into different files

Parameters:
  • fin (Union[str, IO[bytes]]) – Input filename or a binary IO object.

  • fout (str) – Output filename.

  • fmt (Literal['pcap', 'json', 'tree', 'plist']) – Output file format.

  • extension (bool) – If True, append the .pcap file extension to the input filename when fin lacks it, and check and append extensions to the output file.

  • files (bool) – If split each frame into different files.

  • nofile (bool) – If no output file is to be dumped.

Return type:

tuple[str, Optional[str], Literal['pcap', 'json', 'tree', 'plist'], Optional[str], bool]

Returns:

Generated input and output filenames.

Raises:
  • FileNotFound – If the input file does not exist.

  • FormatError – If the output format is not provided and cannot be presumed.
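The six processing steps listed for make_name can be approximated as below. This is a simplified sketch under stated assumptions, not the library's implementation: make_name_sketch and the EXTENSIONS table are hypothetical, only string inputs are handled, the file-existence check and the files flag are left out, fout and fmt default to None here so that both presumption paths can be exercised, and ValueError stands in for FormatError.

```python
import os
from typing import Optional, Tuple

# Assumed format-to-extension table, for illustration only.
EXTENSIONS = {"pcap": ".pcap", "json": ".json", "plist": ".plist", "tree": ".txt"}


def make_name_sketch(
    fin: str = "in.pcap",
    fout: Optional[str] = None,
    fmt: Optional[str] = None,
    extension: bool = True,
    *,
    nofile: bool = False,
) -> Tuple[str, Optional[str], Optional[str], Optional[str]]:
    # Step 1: normalise the input name (the existence check is omitted here).
    if extension and os.path.splitext(fin)[1] != ".pcap":
        fin += ".pcap"
    # Step 2: with no output requested, nothing else needs deriving.
    if nofile:
        return fin, None, fmt, None
    # Step 3: a known format implies its file extension.
    ext = EXTENSIONS.get(fmt) if fmt is not None else None
    if fout is None:
        # Step 4: the default stem "out" needs a known extension.
        if ext is None:
            raise ValueError("output format not provided and cannot be presumed")
        fout = "out"
    elif ext is None:
        # Step 5: infer the format from the output file's own extension.
        suffix = os.path.splitext(fout)[1]
        fmt = next((k for k, v in EXTENSIONS.items() if v == suffix), None)
        if fmt is None:
            raise ValueError("output format not provided and cannot be presumed")
        ext = suffix
    # Step 6: append the extension when asked to and it is missing.
    if extension and os.path.splitext(fout)[1] != ext:
        fout += ext
    return fin, fout, fmt, ext[1:]
```

For instance, make_name_sketch("capture", "result", "json") yields ("capture.pcap", "result.json", "json", "json") in this sketch.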

record_header()[source]#

Read global header.

The method will parse the PCAP global header and save the parsed result to its extraction context. Information such as the PCAP version, data link layer protocol type, nanosecond flag and byteorder will also be saved to the current Engine instance.

If TCP flow tracing is enabled, the nanosecond flag and byteorder will be used for the output PCAP file of the traced TCP flows.

For output, the method will dump the parsed PCAP global header under the name of Global Header.

Return type:

Engine
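For orientation, the fields mentioned above (version, data link type, nanosecond flag, byteorder) all come from the 24-byte PCAP global header. The sketch below decodes that header with the standard struct module; it illustrates the file format only and is not how pcapkit implements record_header. The function name and the returned dictionary keys are hypothetical.

```python
import struct

# Magic number bytes -> (struct prefix, byte-order name, nanosecond flag).
MAGIC = {
    b"\xa1\xb2\xc3\xd4": (">", "big", False),
    b"\xd4\xc3\xb2\xa1": ("<", "little", False),
    b"\xa1\xb2\x3c\x4d": (">", "big", True),
    b"\x4d\x3c\xb2\xa1": ("<", "little", True),
}


def parse_global_header(data: bytes) -> dict:
    """Decode the fixed 24-byte header at the start of a PCAP file."""
    if len(data) < 24:
        raise ValueError("PCAP global header needs 24 bytes")
    if data[:4] not in MAGIC:
        raise ValueError("unrecognised PCAP magic number")
    prefix, byteorder, nanosecond = MAGIC[data[:4]]
    # Remaining fields: version major/minor, timezone offset, timestamp
    # accuracy, snapshot length, link-layer type.
    major, minor, _thiszone, _sigfigs, snaplen, network = struct.unpack(
        prefix + "HHiIII", data[4:24]
    )
    return {
        "byteorder": byteorder,
        "nanosecond": nanosecond,
        "version": (major, minor),
        "snaplen": snaplen,
        "network": network,
    }
```

The magic number is matched as raw bytes, so a single lookup gives both the byte order and the timestamp resolution before any other field is decoded.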

record_frames()[source]#

Read packet frames.

The method calls self._exeng.read_frame to parse each frame from the input PCAP file, and performs cleanup by calling self._exeng.close upon completion of the parsing process.

Notes

Under non-auto mode, i.e. self._flag_a is False, the method performs no action.

Return type:

None

__output__: DefaultDict[str, tuple[Union[ModuleDescriptor[Dumper], Type[Dumper]], str | None]]#

Format dumper mapping for writing output files. The values should be a tuple representing the module name and class name, or a dictdumper.dumper.Dumper subclass, and corresponding file extension.

__engine__: dict[str, Union[ModuleDescriptor[Engine], Type[Engine]]]#

Engine mapping for extracting frames. The values should be a tuple representing the module name and class name, or an Engine subclass.

__reassembly__: dict[str, Union[ModuleDescriptor[Reassembly], Type[Reassembly]]]#

Reassembly support mapping for extracting frames. The values should be a tuple representing the module name and class name, or a Reassembly subclass.

__traceflow__: dict[str, Union[ModuleDescriptor[TraceFlow], Type[TraceFlow]]]#

Flow tracing support mapping for extracting frames. The values should be a tuple representing the module name and class name, or a TraceFlow subclass.
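The four mappings above accept either a (module name, class name) descriptor or a class itself, which allows optional dependencies to be imported only when first needed. A minimal sketch of such a registry follows; ModuleDescriptorSketch, register and resolve are hypothetical names, and the standard-library classes used as entries are placeholders for real dumpers or engines.

```python
import importlib
from typing import Dict, NamedTuple, Union


class ModuleDescriptorSketch(NamedTuple):
    """Hypothetical stand-in for a (module name, class name) descriptor."""
    module: str
    name: str


registry: Dict[str, Union[ModuleDescriptorSketch, type]] = {}


def register(key: str, target: Union[ModuleDescriptorSketch, type]) -> None:
    # Store the descriptor or class as given; nothing is imported yet.
    registry[key] = target


def resolve(key: str) -> type:
    target = registry[key]
    if isinstance(target, ModuleDescriptorSketch):
        # Import lazily, then cache the class for later lookups.
        module = importlib.import_module(target.module)
        target = getattr(module, target.name)
        registry[key] = target
    return target


# Placeholder entries: one lazy descriptor, one class registered directly.
register("ordered", ModuleDescriptorSketch("collections", "OrderedDict"))
register("plain", dict)
```

In this sketch a descriptor costs nothing until resolve() is called for its key, so registering an engine whose dependency is absent does not fail at import time.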

_cleanup()[source]#

Cleanup after extraction & analysis.

The method calls self._exeng.close, sets self._flag_e as True and closes the input file (if necessary).

Return type:

None

_flag_a: bool#

Auto extract flag. It indicates if the extraction process should continue automatically until the EOF is reached.

_flag_d: bool#

Store data flag. It indicates if the extracted frames should be stored in memory.

_flag_e: bool#

EOF flag. It indicates if the EOF is reached.

_flag_q: bool#

No output file, i.e., no output file is to be generated.

_flag_t: bool#

Trace flag. It indicates if the flow tracing is enabled.

_flag_v: bool#

Verbose flag. This is used to determine if the verbose callback function should be called at each frame.

_flag_n: bool#

No EOF flag. It is useful when the input file is a live capture, as the extraction process will not stop until the user interrupts the process.

_flag_s: bool#

Input filename flag. It indicates if the input file is a file name or a binary IO object. For the latter, we should not close the file object after extraction.

_ifile: BufferedReader#

Input file object.

_ofile: Union[Dumper, Type[Dumper]]#

Output file object.

_frnum: int#

Frame number.

_reasm: ReassemblyManager#

Frame record for reassembly.

_trace: TraceFlowManager#

Frame record for flow tracing.

_exnam: Literal['default', 'pcapkit', 'dpkt', 'scapy', 'pyshark']#

Extraction engine name.

_exeng: Engine[TypeVar(_P)]#

Extraction engine instance.

_exlyr: Literal['link', 'internet', 'transport', 'application', 'none']#

Layer until which to extract.

_exptl: Union[str, ProtocolBase, Type[ProtocolBase]]#

Protocol until which to extract.

__iter__()[source]#

Iterate and parse PCAP frame.

Raises:

IterableError – If self._flag_a is True, as such operation is not applicable.

Return type:

Extractor

__next__()[source]#

Iterate and parse next PCAP frame.

It will call self._exeng.read_frame to parse the next PCAP frame internally until EOF is reached; then it calls self._cleanup to finish up.

Return type:

TypeVar(_P)

__call__()[source]#

Works as a simple wrapper for the iteration protocol.

Raises:

IterableError – If self._flag_a is True, as iteration is not applicable.

Return type:

TypeVar(_P)
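The three methods above describe a manual, frame-by-frame mode that is only available when automatic extraction is off. The sketch below imitates that contract with a plain Python iterable standing in for the engine; IterationSketch is hypothetical, TypeError stands in for IterableError, and draining the source inside __init__ is an assumption made to mimic auto mode.

```python
from typing import Any, Iterable, List


class IterationSketch:
    """Hypothetical reader that iterates only when auto mode is off."""

    def __init__(self, frames: Iterable[Any], auto: bool = False) -> None:
        self._source = iter(frames)
        self._flag_a = auto       # auto-extract flag
        self._flag_e = False      # EOF flag
        self.stored: List[Any] = []
        if auto:
            # Auto mode: consume everything up front.
            self.stored = list(self._source)
            self._flag_e = True

    def __iter__(self) -> "IterationSketch":
        if self._flag_a:
            raise TypeError("iteration is not applicable in auto mode")
        return self

    def __next__(self) -> Any:
        try:
            return next(self._source)
        except StopIteration:
            # Stand-in for the cleanup step performed at EOF.
            self._flag_e = True
            raise

    def __call__(self) -> Any:
        # Thin wrapper: fetch one frame through the iteration protocol.
        if self._flag_a:
            raise TypeError("iteration is not applicable in auto mode")
        return next(self)
```

With auto=False a caller can mix reader() calls with a for loop over the same object, since both advance the same underlying source.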

Type Variables

pcapkit.foundation.extraction._P: Any#