# -*- coding: utf-8 -*-
# mypy: disable-error-code=assignment
"""header schema for pcapng file format"""
import base64
import collections
import collections.abc
import io
import struct
import sys
from typing import TYPE_CHECKING, Any, cast
from pcapkit.const.pcapng.block_type import BlockType as Enum_BlockType
from pcapkit.const.pcapng.filter_type import FilterType as Enum_FilterType
from pcapkit.const.pcapng.hash_algorithm import HashAlgorithm as Enum_HashAlgorithm
from pcapkit.const.pcapng.option_type import OptionType as Enum_OptionType
from pcapkit.const.pcapng.record_type import RecordType as Enum_RecordType
from pcapkit.const.pcapng.secrets_type import SecretsType as Enum_SecretsType
from pcapkit.const.pcapng.verdict_type import VerdictType as Enum_VerdictType
from pcapkit.const.reg.linktype import LinkType as Enum_LinkType
from pcapkit.corekit.fields.collections import OptionField
from pcapkit.corekit.fields.ipaddress import (IPv4AddressField, IPv4InterfaceField,
IPv6AddressField, IPv6InterfaceField)
from pcapkit.corekit.fields.misc import ForwardMatchField, PayloadField, SchemaField, SwitchField
from pcapkit.corekit.fields.numbers import (EnumField, Int32Field, Int64Field, NumberField,
UInt8Field, UInt16Field, UInt32Field, UInt64Field)
from pcapkit.corekit.fields.strings import BitField, BytesField, PaddingField, StringField
from pcapkit.corekit.multidict import MultiDict, OrderedMultiDict
from pcapkit.protocols.schema.schema import EnumSchema, Schema, schema_final
from pcapkit.utilities.exceptions import FieldValueError, ProtocolError, stacklevel
from pcapkit.utilities.logging import SPHINX_TYPE_CHECKING
from pcapkit.utilities.warnings import ProtocolWarning, warn
__all__ = [
'PCAPNG',
'Option', 'UnknownOption',
'EndOfOption', 'CommentOption', 'CustomOption',
'IF_NameOption', 'IF_DescriptionOption', 'IF_IPv4AddrOption', 'IF_IPv6AddrOption',
'IF_MACAddrOption', 'IF_EUIAddrOption', 'IF_SpeedOption', 'IF_TSResolOption',
'IF_TZoneOption', 'IF_FilterOption', 'IF_OSOption', 'IF_FCSLenOption',
'IF_TSOffsetOption', 'IF_HardwareOption', 'IF_TxSpeedOption', 'IF_RxSpeedOption',
'EPB_FlagsOption', 'EPB_HashOption', 'EPB_DropCountOption', 'EPB_PacketIDOption',
'EPB_QueueOption', 'EPB_VerdictOption',
'NS_DNSNameOption', 'NS_DNSIP4AddrOption', 'NS_DNSIP6AddrOption',
'ISB_StartTimeOption', 'ISB_EndTimeOption', 'ISB_IFRecvOption', 'ISB_IFDropOption',
'ISB_FilterAcceptOption', 'ISB_OSDropOption', 'ISB_UsrDelivOption',
'PACK_FlagsOption', 'PACK_HashOption',
'NameResolutionRecord', 'UnknownRecord', 'EndRecord', 'IPv4Record', 'IPv6Record',
'DSBSecrets', 'UnknownSecrets', 'TLSKeyLog', 'WireGuardKeyLog', 'ZigBeeNWKKey',
'ZigBeeAPSKey',
'BlockType',
'UnknownBlock', 'SectionHeaderBlock', 'InterfaceDescriptionBlock',
'EnhancedPacketBlock', 'SimplePacketBlock', 'NameResolutionBlock',
'InterfaceStatisticsBlock', 'SystemdJournalExportBlock', 'DecryptionSecretsBlock',
'CustomBlock', 'PacketBlock',
]
if TYPE_CHECKING:
from ipaddress import IPv4Address, IPv4Interface, IPv6Address, IPv6Interface
from typing import Any, Callable, DefaultDict, Iterable, Optional, Type
from typing_extensions import Literal, Self
from pcapkit.corekit.fields.field import FieldBase as Field
from pcapkit.protocols.misc.pcapng import TLSKeyLabel, WireGuardKeyLabel
from pcapkit.protocols.protocol import ProtocolBase as Protocol
if SPHINX_TYPE_CHECKING:
from typing_extensions import TypedDict
[docs]
class ByteorderTest(TypedDict):
"""Test for byteorder."""
#: Byteorder magic number.
byteorder: int
[docs]
class ResolutionData(TypedDict):
"""Data for resolution."""
#: Resolution type flag (0: 10-based, 1: 2-based).
flag: int
#: Resolution value.
resolution: int
[docs]
class EPBFlags(TypedDict):
"""EPB flags."""
#: Inbound / Outbound packet (``00`` = information not available,
#: ``01`` = inbound, ``10`` = outbound)
direction: int
#: Reception type (``000`` = not specified, ``001`` = unicast,
#: ``010`` = multicast, ``011`` = broadcast, ``100`` = promiscuous).
reception: int
#: FCS length, in octets (``0000`` if this information is not available).
#: This value overrides the ``if_fcslen`` option of the Interface Description
#: Block, and is used with those link layers (e.g. PPP) where the length of
#: the FCS can change during time.
fcs_len: int
#: Link-layer-dependent error - CRC error (bit 24).
crc_error: int
#: Link-layer-dependent error - packet too long error (bit 25).
too_long: int
#: Link-layer-dependent error - packet too short error (bit 26).
too_short: int
#: Link-layer-dependent error - wrong Inter Frame Gap error (bit 27).
gap_error: int
#: Link-layer-dependent error - unaligned frame error (bit 28).
unaligned_error: int
#: Link-layer-dependent error - Start Frame Delimiter error (bit 29).
delimiter_error: int
#: Link-layer-dependent error - preamble error (bit 30).
preamble_error: int
#: Link-layer-dependent error - symbol error (bit 31).
symbol_error: int
[docs]
class PACKFlags(TypedDict):
"""PACK flags."""
#: Inbound / Outbound packet (``00`` = information not available,
#: ``01`` = inbound, ``10`` = outbound)
direction: int
#: Reception type (``000`` = not specified, ``001`` = unicast,
#: ``010`` = multicast, ``011`` = broadcast, ``100`` = promiscuous).
reception: int
#: FCS length, in octets (``0000`` if this information is not available).
#: This value overrides the ``if_fcslen`` option of the Interface Description
#: Block, and is used with those link layers (e.g. PPP) where the length of
#: the FCS can change during time.
fcs_len: int
#: Link-layer-dependent error - CRC error (bit 24).
crc_error: int
#: Link-layer-dependent error - packet too long error (bit 25).
too_long: int
#: Link-layer-dependent error - packet too short error (bit 26).
too_short: int
#: Link-layer-dependent error - wrong Inter Frame Gap error (bit 27).
gap_error: int
#: Link-layer-dependent error - unaligned frame error (bit 28).
unaligned_error: int
#: Link-layer-dependent error - Start Frame Delimiter error (bit 29).
delimiter_error: int
#: Link-layer-dependent error - preamble error (bit 30).
preamble_error: int
#: Link-layer-dependent error - symbol error (bit 31).
symbol_error: int
[docs]
def byteorder_callback(field: 'NumberField', packet: 'dict[str, Any]') -> 'None':
"""Update byte order of PCAP-NG file.
Args:
field: Field instance.
packet: Packet data.
"""
if 'byteorder' not in packet and '__packet__' in packet:
field._byteorder = packet['__packet__'].get('byteorder', sys.byteorder)
else:
field._byteorder = packet.get('byteorder', sys.byteorder)
[docs]
def shb_byteorder_callback(field: 'NumberField', packet: 'dict[str, Any]') -> 'None':
"""Update byte order of PCAP-NG file for SHB.
Args:
field: Field instance.
packet: Packet data.
"""
magic = packet['match']['byteorder'] # type: int
if magic == 0x1A2B3C4D:
field._byteorder = 'big'
elif magic == 0x4D3C2B1A:
field._byteorder = 'little'
else:
raise ProtocolError(f'unknown byteorder magic: {magic:#x}')
[docs]
def pcapng_block_selector(packet: 'dict[str, Any]') -> 'Field':
"""Selector function for :attr:`PCAPNG.block` field.
Args:
pkt: Packet data.
Returns:
Returns a :class:`~pcapkit.corekit.fields.misc.SchemaField`
wrapped :class:`~pcapkit.protocols.schema.misc.pcapng.BlockType`
subclass instance.
See Also:
* :class:`pcapkit.const.pcapng.block_type.BlockType`
* :class:`pcapkit.protocols.schema.misc.pcapng.BlockType`
"""
block_type = packet['type'] # type: Enum_BlockType
schema = BlockType.registry[block_type]
return SchemaField(length=packet['__length__'], schema=schema)
[docs]
def dsb_secrets_selector(packet: 'dict[str, Any]') -> 'Field':
"""Selector function for :attr:`DecryptionSecretsBlock.secrets_data` field.
Args:
pkt: Packet data.
Returns:
* If ``secrets_type`` is unknown, returns a
:class:`~pcapkit.corekit.fields.strings.BytesField` instance.
* If ``secret_type`` is :attr:`~pcapkit.const.pcapng.secrets_type.Secrets_Type.TLS_Key_Log`
and/or :attr:`~pcapkit.const.pcapng.secrets_type.Secrets_Type.WireGuard_Key_Log`,
returns a :class:`~pcapkit.corekit.fields.strings.StringField` instance.
* Otherwise, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField`
wrapped :class:`~pcapkit.protocols.schema.misc.pcapng.DSBSecrets`
subclass instance.
See Also:
* :class:`pcapkit.const.pcapng.secrets_type.Secrets_Type`
* :class:`pcapkit.protocols.schema.misc.pcapng.DSBSecrets`
"""
secrets_type = packet['secrets_type'] # type: int
schema = DSBSecrets.registry[secrets_type]
return SchemaField(length=packet['secrets_length'], schema=schema)
class OptionEnumField(EnumField):
"""Enumerated value for protocol fields.
Args:
length: Field size (in bytes); if a callable is given, it should return
an integer value and accept the current packet as its only argument.
default: Field default value, if any.
signed: Whether the field is signed.
byteorder: Field byte order.
bit_length: Field bit length.
namespace: Option namespace, i.e., namespace of the enum item.
callback: Callback function to be called upon
:meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`.
Important:
This class is specifically designed for :class:`~pcapkit.const.pcapng.option_type.OptionType`
as it is actually a :class:`~enum.StrEnum` class.
"""
if TYPE_CHECKING:
_namespace: 'Enum_OptionType'
def __init__(self, length: 'int | Callable[[dict[str, Any]], int]',
default: 'Enum_OptionType' = Enum_OptionType.opt_endofopt, signed: 'bool' = False,
byteorder: 'Literal["little", "big"]' = 'big',
bit_length: 'Optional[int]' = None,
namespace: 'str' = 'opt',
callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None':
super().__init__(length, default, signed, byteorder, bit_length, Enum_OptionType, callback)
self._opt_ns = namespace
def pre_process(self, value: 'int | Enum_OptionType', packet: 'dict[str, Any]') -> 'int | bytes':
"""Process field value before construction (packing).
Arguments:
value: Field value.
packet: Packet data.
Returns:
Processed field value.
"""
if isinstance(value, Enum_OptionType):
value = value.opt_value
return super().pre_process(value, packet)
def post_process(self, value: 'int | bytes', packet: 'dict[str, Any]') -> 'Enum_OptionType':
"""Process field value after parsing (unpacked).
Args:
value: Field value.
packet: Packet data.
Returns:
Processed field value.
"""
value = super(EnumField, self).post_process(value, packet)
return self._namespace.get(value, namespace=self._opt_ns)
[docs]
@schema_final
class PCAPNG(Schema):
"""Header schema for PCAP-NG file blocks."""
#: Block type.
type: 'Enum_BlockType' = EnumField(length=4, namespace=Enum_BlockType, callback=byteorder_callback)
#: Block specific data.
block: 'BlockType' = SwitchField(
selector=pcapng_block_selector,
)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_BlockType', block: 'BlockType | bytes') -> 'None': ...
[docs]
class BlockType(EnumSchema[Enum_BlockType]):
"""Header schema for PCAP-NG file blocks."""
__default__ = lambda: UnknownBlock
[docs]
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
"""Revise ``schema`` data after unpacking process.
This method validates the two block lengths and raises
:exc:`~pcapkit.utilities.exceptions.ProtocolError` if they are not
equal.
Args:
packet: Unpacked data.
Returns:
Revised schema.
"""
if self.length != self.length2:
block_type = packet.get('__packet__', {}).get('type', 'N/A')
#raise ProtocolError(f'PCAP-NG: [Block {block_type}] block length mismatch: {self.length} != {self.length2}')
warn(f'PCAP-NG: [Block {block_type}] block length mismatch: {self.length} != {self.length2}',
ProtocolWarning, stacklevel=stacklevel())
return self
if TYPE_CHECKING:
length: int
length2: int
[docs]
@schema_final
class UnknownBlock(BlockType):
"""Header schema for unknown PCAP-NG file blocks."""
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Block body (including padding).
body: 'bytes' = BytesField(length=lambda pkt: pkt['length'] - 12)
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, length: 'int', body: 'bytes', length2: 'int') -> 'None': ...
[docs]
class Option(EnumSchema[Enum_OptionType]):
"""Header schema for PCAP-NG file options."""
__additional__ = ['__enum__', '__namespace__']
__excluded__ = ['__enum__', '__namespace__']
#: Namespace of PCAP-NG option type numbers.
__namespace__: 'str' = None # type: ignore[assignment]
#: Mapping of PCAP-NG option type numbers to schemas.
__enum__: 'DefaultDict[str, DefaultDict[Enum_OptionType, Type[Option]]]' = collections.defaultdict(
lambda: Option.registry['opt'], {
'opt': collections.defaultdict(lambda: UnknownOption),
'if': collections.defaultdict(lambda: UnknownOption),
'epb': collections.defaultdict(lambda: UnknownOption),
'ns': collections.defaultdict(lambda: UnknownOption),
'isb': collections.defaultdict(lambda: UnknownOption),
'dsb': collections.defaultdict(lambda: UnknownOption),
'pack': collections.defaultdict(lambda: UnknownOption),
},
)
[docs]
def __init_subclass__(cls, /, code: 'Optional[Enum_OptionType | Iterable[Enum_OptionType]]' = None,
namespace: 'Optional[str]' = None, *args: 'Any', **kwargs: 'Any') -> 'None':
"""Register option type to :attr:`__enum__` mapping.
Args:
code: Option type code. It can be either a single option type enumeration
or a list of option type enumerations.
namespace: Namespace of option type enumeration. If not given, the value
will be inferred from the option type code.
*args: Arbitrary positional arguments.
**kwargs: Arbitrary keyword arguments.
If ``code`` is provided, the subclass will be registered to the
:attr:`__enum__` mapping with the given ``code``. If ``code`` is
not given, the subclass will not be registered.
Examples:
.. code-block:: python
from pcapkit.const.pcapng.option_type import OptionType as Enum_OptionType
from pcapkit.protocols.schema.misc.pcapng improt Option
class NewOption(Option, namespace='opt', code=Enum_OptionType.opt_new):
...
See Also:
- :class:`pcapkit.const.pcapng.option_type.OptionType`
"""
if namespace is not None:
cls.__namespace__ = namespace
if code is not None:
if namespace is None:
namespace = cast('Optional[str]', cls.__namespace__)
if not isinstance(code, Enum_OptionType):
for _code in code:
Option.register(_code, cls, namespace)
else:
Option.register(code, cls, namespace)
super().__init_subclass__()
[docs]
@staticmethod
def register(code: 'Enum_OptionType', cls: 'Type[Option]', ns: 'Optional[str]' = None) -> 'None':
"""Register option type to :attr:`__enum__` mapping.
Args:
code: Option type code.
cls: Option type schema.
ns: Namespace of option type enumeration. If not given, the value
will be inferred from the option type code.
"""
if ns is None:
ns = code.name.split('_')[0]
if ns == 'opt':
for key in Option.registry:
Option.registry[key][code] = cls
elif ns in Option.registry:
Option.registry[ns][code] = cls
else:
Option.registry[ns] = Option.registry['opt'].copy()
Option.registry[ns][code] = cls
if TYPE_CHECKING:
#: Option type.
type: 'Enum_OptionType'
#: Option data length.
length: 'int'
[docs]
class _OPT_Option(Option, namespace='opt'):
"""Header schema for ``opt_*`` options."""
#: Option type.
type: 'Enum_OptionType' = OptionEnumField(length=2, namespace='opt', callback=byteorder_callback)
#: Option data length.
length: 'int' = UInt16Field(callback=byteorder_callback)
[docs]
@schema_final
class UnknownOption(_OPT_Option):
"""Header schema for unknown PCAP-NG file options."""
#: Option value.
data: 'bytes' = BytesField(length=lambda pkt: pkt['length'])
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', data: 'bytes') -> 'None': ...
[docs]
@schema_final
class EndOfOption(_OPT_Option, code=Enum_OptionType.opt_endofopt):
"""Header schema for PCAP-NG file ``opt_endofopt`` options."""
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int') -> 'None': ...
[docs]
@schema_final
class CustomOption(_OPT_Option, code=[Enum_OptionType.opt_custom_2988,
Enum_OptionType.opt_custom_2989,
Enum_OptionType.opt_custom_19372,
Enum_OptionType.opt_custom_19373]):
"""Header schema for PCAP-NG file ``opt_custom`` options."""
#: Private enterprise number (PEN).
pen: 'int' = UInt32Field(callback=byteorder_callback)
#: Custom data.
data: 'bytes' = BytesField(length=lambda pkt: pkt['length'] - 4)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'int', length: 'int', pen: 'int', data: 'bytes') -> 'None': ...
[docs]
class _IF_Option(Option, namespace='if'):
"""Header schema for ``if_*`` options."""
#: Option type.
type: 'Enum_OptionType' = OptionEnumField(length=2, namespace='if', callback=byteorder_callback)
#: Option data length.
length: 'int' = UInt16Field(callback=byteorder_callback)
[docs]
@schema_final
class IF_NameOption(_IF_Option, code=Enum_OptionType.if_name):
"""Header schema for PCAP-NG file ``if_name`` options."""
#: Interface name.
name: 'str' = StringField(length=lambda pkt: pkt['length'], encoding='utf-8')
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', name: 'str') -> 'None': ...
[docs]
@schema_final
class IF_DescriptionOption(_IF_Option, code=Enum_OptionType.if_description):
"""Header schema for PCAP-NG file ``if_description`` options."""
#: Interface description.
description: 'str' = StringField(length=lambda pkt: pkt['length'], encoding='utf-8')
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', description: 'str') -> 'None': ...
[docs]
@schema_final
class IF_IPv4AddrOption(_IF_Option, code=Enum_OptionType.if_IPv4addr):
"""Header schema for PCAP-NG file ``if_IPv4addr`` options."""
#: IPv4 interface.
interface: 'IPv4Interface' = IPv4InterfaceField()
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', interface: 'IPv4Interface | str') -> 'None': ...
[docs]
@schema_final
class IF_IPv6AddrOption(_IF_Option, code=Enum_OptionType.if_IPv6addr):
"""Header schema for PCAP-NG file ``if_IPv6addr`` options."""
#: IPv6 interface.
interface: 'IPv6Interface' = IPv6InterfaceField()
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', interface: 'IPv6Interface | str') -> 'None': ...
[docs]
@schema_final
class IF_MACAddrOption(_IF_Option, code=Enum_OptionType.if_MACaddr):
"""Header schema for PCAP-NG file ``if_MACaddr`` options."""
#: MAC interface.
interface: 'bytes' = BytesField(length=6)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', interface: 'bytes') -> 'None': ...
[docs]
@schema_final
class IF_EUIAddrOption(_IF_Option, code=Enum_OptionType.if_EUIaddr):
"""Header schema for PCAP-NG file ``if_EUIaddr`` options."""
#: EUI interface.
interface: 'bytes' = BytesField(length=8)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', interface: 'bytes') -> 'None': ...
[docs]
@schema_final
class IF_SpeedOption(_IF_Option, code=Enum_OptionType.if_speed):
"""Header schema for PCAP-NG file ``if_speed`` options."""
#: Interface speed, in bits per second.
speed: 'int' = UInt64Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', speed: 'int') -> 'None': ...
[docs]
@schema_final
class IF_TSResolOption(_IF_Option, code=Enum_OptionType.if_tsresol):
"""Header schema for PCAP-NG file ``if_tsresol`` options."""
#: Interface timestamp resolution, in units per second.
tsresol: 'ResolutionData' = BitField(length=1, namespace={
'flag': (0, 1),
'resolution': (1, 7),
})
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
[docs]
def post_process(self, packet: 'dict[str, Any]') -> 'IF_TSResolOption':
"""Revise ``schema`` data after unpacking process.
Args:
packet: Unpacked data.
Returns:
Revised schema.
"""
base = 10 if self.tsresol['flag'] == 0 else 2
self.resolution = base ** self.tsresol['resolution']
return self
if TYPE_CHECKING:
#: Interface timestamp resolution, in units per second.
resolution: 'int'
def __init__(self, type: 'Enum_OptionType', length: 'int', tsresol: 'ResolutionData') -> 'None': ...
[docs]
@schema_final
class IF_TZoneOption(_IF_Option, code=Enum_OptionType.if_tzone):
"""Header schema for PCAP-NG file ``if_tzone`` options."""
#: Interface time zone (as in seconds difference from GMT).
tzone: 'int' = Int32Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', tzone: 'int') -> 'None': ...
[docs]
@schema_final
class IF_FilterOption(_IF_Option, code=Enum_OptionType.if_filter):
"""Header schema for PCAP-NG file ``if_filter`` options."""
#: Filter code.
code: 'Enum_FilterType' = EnumField(length=1, namespace=Enum_FilterType, callback=byteorder_callback)
#: Capture filter.
filter: 'bytes' = BytesField(length=lambda pkt: pkt['length'] - 1)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', code: 'Enum_FilterType', filter: 'bytes') -> 'None': ...
[docs]
@schema_final
class IF_OSOption(_IF_Option, code=Enum_OptionType.if_os):
"""Header schema for PCAP-NG file ``if_os`` options."""
#: OS information.
os: 'str' = StringField(length=lambda pkt: pkt['length'], encoding='utf-8')
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', os: 'str') -> 'None': ...
[docs]
@schema_final
class IF_FCSLenOption(_IF_Option, code=Enum_OptionType.if_fcslen):
"""Header schema for PCAP-NG file ``if_fcslen`` options."""
#: FCS length.
fcslen: 'int' = UInt8Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', fcslen: 'int') -> 'None': ...
[docs]
@schema_final
class IF_TSOffsetOption(_IF_Option, code=Enum_OptionType.if_tsoffset):
"""Header schema for PCAP-NG file ``if_tsoffset`` options."""
#: Timestamp offset (in seconds).
tsoffset: 'int' = Int64Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', tsoffset: 'int') -> 'None': ...
[docs]
@schema_final
class IF_HardwareOption(_IF_Option, code=Enum_OptionType.if_hardware):
"""Header schema for PCAP-NG file ``if_hardware`` options."""
#: Hardware information.
hardware: 'str' = StringField(length=lambda pkt: pkt['length'], encoding='utf-8')
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', hardware: 'str') -> 'None': ...
[docs]
@schema_final
class IF_TxSpeedOption(_IF_Option, code=Enum_OptionType.if_txspeed):
"""Header schema for PCAP-NG file ``if_txspeed`` options."""
#: Interface transmit speed, in bits per second.
tx_speed: 'int' = UInt64Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', tx_speed: 'int') -> 'None': ...
[docs]
@schema_final
class IF_RxSpeedOption(_IF_Option, code=Enum_OptionType.if_rxspeed):
"""Header schema for PCAP-NG file ``if_rxspeed`` options."""
#: Interface receive speed, in bits per second.
rx_speed: 'int' = UInt64Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', rx_speed: 'int') -> 'None': ...
[docs]
@schema_final
class InterfaceDescriptionBlock(BlockType, code=Enum_BlockType.Interface_Description_Block):
"""Header schema for PCAP-NG Interface Description Block (IDB)."""
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Link type.
linktype: 'Enum_LinkType' = EnumField(length=2, namespace=Enum_LinkType, callback=byteorder_callback)
#: Reserved.
reserved: 'bytes' = PaddingField(length=2)
#: Snap length.
snaplen: 'int' = UInt32Field(default=0, callback=byteorder_callback)
#: Options.
options: 'list[Option]' = OptionField(
length=lambda pkt: pkt['length'] - 20,
base_schema=_IF_Option,
type_name='type',
registry=Option.registry['if'],
eool=Enum_OptionType.opt_endofopt,
)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: pkt['__option_padding__'])
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, length: 'int', linktype: 'int', snaplen: 'int',
options: 'list[Option | bytes] | bytes', length2: 'int') -> 'None': ...
[docs]
class _EPB_Option(Option, namespace='epb'):
"""Header schema for ``epb_*`` options."""
#: Option type.
type: 'Enum_OptionType' = OptionEnumField(length=2, namespace='epb', callback=byteorder_callback)
#: Option data length.
length: 'int' = UInt16Field(callback=byteorder_callback)
[docs]
@schema_final
class EPB_FlagsOption(_EPB_Option, code=Enum_OptionType.epb_flags):
"""Header schema for PCAP-NG ``epb_flags`` options."""
#: Flags.
flags: 'EPBFlags' = BitField(length=4, namespace={
'direction': (0, 2),
'reception': (2, 3),
'fcs_len': (5, 4),
'crc_error': (24, 1),
'too_long': (25, 1),
'too_short': (26, 1),
'gap_error': (27, 1),
'unaligned_error': (28, 1),
'delimiter_error': (29, 1),
'preamble_error': (30, 1),
'symbol_error': (31, 1),
})
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', flags: 'EPBFlags') -> 'None': ...
[docs]
@schema_final
class EPB_HashOption(_EPB_Option, code=Enum_OptionType.epb_hash):
"""Header schema for PCAP-NG ``epb_hash`` options."""
#: Hash algorithm.
func: 'Enum_HashAlgorithm' = EnumField(length=1, namespace=Enum_HashAlgorithm, callback=byteorder_callback)
#: Hash value.
data: 'bytes' = BytesField(length=lambda pkt: pkt['length'] - 1)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', func: 'Enum_HashAlgorithm', data: 'bytes') -> 'None': ...
[docs]
@schema_final
class EPB_DropCountOption(_EPB_Option, code=Enum_OptionType.epb_dropcount):
"""Header schema for PCAP-NG ``epb_dropcount`` options."""
#: Number of packets dropped by the interface.
drop_count: 'int' = UInt64Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', drop_count: 'int') -> 'None': ...
[docs]
@schema_final
class EPB_PacketIDOption(_EPB_Option, code=Enum_OptionType.epb_packetid):
"""Header schema for PCAP-NG ``epb_packetid`` options."""
#: Packet ID.
packet_id: 'int' = UInt64Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', packet_id: 'int') -> 'None': ...
[docs]
@schema_final
class EPB_QueueOption(_EPB_Option, code=Enum_OptionType.epb_queue):
"""Header schema for PCAP-NG ``epb_queue`` options."""
#: Queue ID.
queue_id: 'int' = UInt32Field(callback=byteorder_callback)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', queue_id: 'int') -> 'None': ...
[docs]
@schema_final
class EPB_VerdictOption(_EPB_Option, code=Enum_OptionType.epb_verdict):
"""Header schema for PCAP-NG ``epb_verdict`` options."""
#: Verdict type.
verdict: 'Enum_VerdictType' = EnumField(length=1, namespace=Enum_VerdictType, callback=byteorder_callback)
#: Verdict value.
value: 'bytes' = BytesField(length=lambda pkt: pkt['length'] - 1)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', verdict: 'Enum_VerdictType', value: 'bytes') -> 'None': ...
[docs]
@schema_final
class EnhancedPacketBlock(BlockType, code=Enum_BlockType.Enhanced_Packet_Block):
"""Header schema for PCAP-NG Enhanced Packet Block (EPB)."""
__payload__ = 'packet_data'
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Interface ID.
interface_id: 'int' = UInt32Field(callback=byteorder_callback)
#: Higher 32-bit of timestamp (in seconds).
timestamp_high: 'int' = UInt32Field(callback=byteorder_callback)
#: Lower 32-bit of timestamp (in seconds).
timestamp_low: 'int' = UInt32Field(callback=byteorder_callback)
#: Captured packet length.
captured_len: 'int' = UInt32Field(callback=byteorder_callback)
#: Original packet length.
original_len: 'int' = UInt32Field(callback=byteorder_callback)
#: Packet data.
packet_data: 'bytes' = PayloadField(length=lambda pkt: pkt['captured_len'])
#: Padding.
padding_data: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['captured_len'] % 4) % 4)
#: Options.
options: 'list[Option]' = OptionField(
length=lambda pkt: pkt['length'] - 32 - pkt['captured_len'] - len(pkt['padding_data']),
base_schema=_EPB_Option,
type_name='type',
registry=Option.registry['epb'],
eool=Enum_OptionType.opt_endofopt,
)
#: Padding.
padding_opts: 'bytes' = PaddingField(length=lambda pkt: pkt['__option_padding__'])
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, length: 'int', interface_id: 'int', timestamp_high: 'int',
timestamp_low: 'int', captured_len: 'int', original_len: 'int',
packet_data: 'bytes | Protocol | Schema',
options: 'list[Option | bytes] | bytes', length2: 'int') -> 'None': ...
[docs]
@schema_final
class SimplePacketBlock(BlockType, code=Enum_BlockType.Simple_Packet_Block):
"""Header schema for PCAP-NG Simple Packet Block (SPB)."""
__payload__ = 'packet_data'
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Original packet length.
original_len: 'int' = UInt32Field(callback=byteorder_callback)
#: Packet data.
packet_data: 'bytes' = PayloadField(length=lambda pkt: min(pkt.get('snaplen', 0xFFFFFFFFFFFFFFFF),
pkt['original_len']))
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - len(pkt['packet_data']) % 4) % 4)
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, length: 'int', original_len: 'int',
packet_data: 'bytes | Protocol | Schema',
length2: 'int') -> 'None': ...
[docs]
class NameResolutionRecord(EnumSchema[Enum_RecordType]):
"""Header schema for PCAP-NG NRB records."""
__default__ = lambda: UnknownRecord
#: Record type.
type: 'Enum_RecordType' = EnumField(length=2, namespace=Enum_RecordType, callback=byteorder_callback)
#: Record value length.
length: 'int' = UInt16Field(callback=byteorder_callback)
[docs]
@schema_final
class UnknownRecord(NameResolutionRecord):
"""Header schema for PCAP-NG NRB unknown records."""
#: Unknown record data.
data: 'bytes' = BytesField(length=lambda pkt: pkt['length'])
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_RecordType', length: 'int', data: 'bytes') -> 'None': ...
[docs]
@schema_final
class EndRecord(NameResolutionRecord, code=Enum_RecordType.nrb_record_end):
"""Header schema for PCAP-NG ``nrb_record_end`` records."""
if TYPE_CHECKING:
def __init__(self, type: 'Enum_RecordType', length: 'int') -> 'None': ...
[docs]
@schema_final
class IPv4Record(NameResolutionRecord, code=Enum_RecordType.nrb_record_ipv4):
"""Header schema for PCAP-NG NRB ``nrb_record_ipv4`` records."""
#: IPv4 address.
ip: 'IPv4Address' = IPv4AddressField()
#: Name resolution data.
resol: 'str' = StringField(length=lambda pkt: pkt['length'] - 4)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
[docs]
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
"""Revise ``schema`` data after unpacking process.
Args:
packet: Unpacked data.
Returns:
Revised schema.
"""
self.names = self.resol.rstrip('\x00').split('\x00')
return self
if TYPE_CHECKING:
#: Name resolution records.
names: 'list[str]'
def __init__(self, type: 'Enum_RecordType', length: 'int', ip: 'IPv4Address | str | bytes | int', resol: 'str') -> 'None': ...
[docs]
@schema_final
class IPv6Record(NameResolutionRecord, code=Enum_RecordType.nrb_record_ipv6):
"""Header schema for PCAP-NG NRB ``nrb_record_ipv4`` records."""
#: IPv4 address.
ip: 'IPv6Address' = IPv6AddressField()
#: Name resolution data.
resol: 'str' = StringField(length=lambda pkt: pkt['length'] - 4)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
[docs]
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
"""Revise ``schema`` data after unpacking process.
Args:
packet: Unpacked data.
Returns:
Revised schema.
"""
self.names = self.resol.rstrip('\x00').split('\x00')
return self
if TYPE_CHECKING:
#: Name resolution records.
names: 'list[str]'
def __init__(self, type: 'Enum_RecordType', length: 'int', ip: 'IPv6Address | str | bytes | int', resol: 'str') -> 'None': ...
[docs]
class _NS_Option(Option, namespace='ns'):
"""Header schema for ``ns_*`` options."""
#: Option type.
type: 'Enum_OptionType' = OptionEnumField(length=2, namespace='ns', callback=byteorder_callback)
#: Option data length.
length: 'int' = UInt16Field(callback=byteorder_callback)
[docs]
@schema_final
class NS_DNSNameOption(_NS_Option, code=Enum_OptionType.ns_dnsname):
"""Header schema for PCAP-NG ``ns_dnsname`` option."""
#: DNS name.
name: 'str' = StringField(length=lambda pkt: pkt['length'])
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', name: 'str') -> 'None': ...
[docs]
@schema_final
class NS_DNSIP4AddrOption(_NS_Option, code=Enum_OptionType.ns_dnsIP4addr):
"""Header schema for PCAP-NG ``ns_dnsIP4addr`` option."""
#: IPv4 address.
ip: 'IPv4Address' = IPv4AddressField()
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', ip: 'IPv4Address | str | bytes | int') -> 'None': ...
[docs]
@schema_final
class NS_DNSIP6AddrOption(_NS_Option, code=Enum_OptionType.ns_dnsIP6addr):
"""Header schema for PCAP-NG ``ns_dnsIP6addr`` option."""
#: IPv6 address.
ip: 'IPv6Address' = IPv6AddressField()
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', ip: 'IPv6Address | bytes | str | int') -> 'None': ...
[docs]
@schema_final
class NameResolutionBlock(BlockType, code=Enum_BlockType.Name_Resolution_Block):
"""Header schema for PCAP-NG Name Resolution Block (NRB)."""
#: Record total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Name resolution records.
records: 'list[NameResolutionRecord]' = OptionField(
length=lambda pkt: pkt['length'] - 12,
base_schema=NameResolutionRecord,
type_name='type',
registry=NameResolutionRecord.registry,
eool=Enum_RecordType.nrb_record_end,
)
#: Options.
options: 'list[Option]' = OptionField(
length=lambda pkt: pkt['__option_padding__'] - 4 if pkt['__option_padding__'] else 0,
base_schema=_NS_Option,
type_name='type',
registry=Option.registry['ns'],
eool=Enum_OptionType.opt_endofopt,
)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: pkt['__option_padding__'])
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
[docs]
def post_process(self, packet: 'dict[str, Any]') -> 'Self':
"""Revise ``schema`` data after unpacking process.
Args:
packet: Unpacked data.
Returns:
Revised schema.
"""
self = cast('Self', super().post_process(packet))
mapping = MultiDict() # type: MultiDict[IPv4Address | IPv6Address, str]
reverse_mapping = MultiDict() # type: MultiDict[str, IPv4Address | IPv6Address]
for record in self.records:
if isinstance(record, (IPv4Record, IPv6Record)):
for name in record.names:
mapping.add(record.ip, name)
reverse_mapping.add(name, record.ip)
self.mapping = mapping
self.reverse_mapping = reverse_mapping
return self
if TYPE_CHECKING:
#: Name resolution mapping (IP address -> name).
mapping: 'MultiDict[IPv4Address | IPv6Address, str]'
#: Name resolution mapping (name -> IP address).
reverse_mapping: 'MultiDict[str, IPv4Address | IPv6Address]'
def __init__(self, length: 'int',
records: 'list[NameResolutionRecord | bytes] | bytes',
options: 'list[Option | bytes] | bytes', length2: 'int') -> 'None': ...
[docs]
class _ISB_Option(Option, namespace='isb'):
"""Header schema for ``isb_*`` options."""
#: Option type.
type: 'Enum_OptionType' = OptionEnumField(length=2, namespace='isb', callback=byteorder_callback)
#: Option data length.
length: 'int' = UInt16Field(callback=byteorder_callback)
[docs]
@schema_final
class ISB_StartTimeOption(_ISB_Option, code=Enum_OptionType.isb_starttime):
"""Header schema for PCAP-NG ``isb_starttime`` option."""
#: Timestamp (higher 32 bits).
timestamp_high: 'int' = UInt32Field(callback=byteorder_callback)
#: Timestamp (lower 32 bits).
timestamp_low: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', timestamp_high: 'int', timestamp_low: 'int') -> 'None': ...
[docs]
@schema_final
class ISB_EndTimeOption(_ISB_Option, code=Enum_OptionType.isb_endtime):
"""Header schema for PCAP-NG ``isb_endtime`` option."""
#: Timestamp (higher 32 bits).
timestamp_high: 'int' = UInt32Field(callback=byteorder_callback)
#: Timestamp (lower 32 bits).
timestamp_low: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', timestamp_high: 'int', timestamp_low: 'int') -> 'None': ...
[docs]
@schema_final
class ISB_IFRecvOption(_ISB_Option, code=Enum_OptionType.isb_ifrecv):
"""Header schema for PCAP-NG ``isb_ifrecv`` option."""
#: Number of packets received.
packets: 'int' = UInt64Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', packets: 'int') -> 'None': ...
[docs]
@schema_final
class ISB_IFDropOption(_ISB_Option, code=Enum_OptionType.isb_ifdrop):
"""Header schema for PCAP-NG ``isb_ifdrop`` option."""
#: Number of packets dropped.
packets: 'int' = UInt64Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', packets: 'int') -> 'None': ...
[docs]
@schema_final
class ISB_FilterAcceptOption(_ISB_Option, code=Enum_OptionType.isb_filteraccept):
"""Header schema for PCAP-NG ``isb_filteraccept`` option."""
#: Number of packets accepted by filter.
packets: 'int' = UInt64Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', packets: 'int') -> 'None': ...
[docs]
@schema_final
class ISB_OSDropOption(_ISB_Option, code=Enum_OptionType.isb_osdrop):
"""Header schema for PCAP-NG ``isb_osdrop`` option."""
#: Number of packets dropped by OS.
packets: 'int' = UInt64Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', packets: 'int') -> 'None': ...
[docs]
@schema_final
class ISB_UsrDelivOption(_ISB_Option, code=Enum_OptionType.isb_usrdeliv):
"""Header schema for PCAP-NG ``isb_usrdeliv`` option."""
#: Number of packets delivered to user.
packets: 'int' = UInt64Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', packets: 'int') -> 'None': ...
[docs]
@schema_final
class InterfaceStatisticsBlock(BlockType):
"""Header schema for PCAP-NG Interface Statistics Block (ISB)."""
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Interface ID.
interface_id: 'int' = UInt32Field(callback=byteorder_callback)
#: Timestamp (higher 32 bits).
timestamp_high: 'int' = UInt32Field(callback=byteorder_callback)
#: Timestamp (lower 32 bits).
timestamp_low: 'int' = UInt32Field(callback=byteorder_callback)
#: Options.
options: 'list[Option]' = OptionField(
length=lambda pkt: pkt['length'] - 20,
base_schema=_ISB_Option,
type_name='type',
registry=Option.registry['isb'],
eool=Enum_OptionType.opt_endofopt,
)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: pkt['__option_padding__'])
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, length: 'int', interface_id: 'int',
timestamp_high: 'int', timestamp_low: 'int',
options: 'list[Option | bytes] | bytes', length2: 'int') -> 'None': ...
[docs]
@schema_final
class SystemdJournalExportBlock(BlockType, code=Enum_BlockType.systemd_Journal_Export_Block):
"""Header schema for PCAP-NG :manpage:`systemd(1)` Journal Export Block."""
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Journal entry.
entry: 'bytes' = BytesField(length=lambda pkt: pkt['length'] - 12)
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
[docs]
def post_process(self, packet: 'dict[str, Any]') -> 'Self':
"""Revise ``schema`` data after unpacking process.
Args:
packet: Unpacked data.
Returns:
Revised schema.
"""
self = cast('Self', super().post_process(packet))
data = [] # type: list[OrderedMultiDict[str, str | bytes]]
for entry_buffer in self.entry.split(b'\n\n'):
entry = OrderedMultiDict() # type: OrderedMultiDict[str, str | bytes]
entry_data = io.BytesIO(entry_buffer)
while True:
line = entry_data.readline().strip()
if not line:
break
line_split = line.split(b'=', maxsplit=1)
if len(line_split) == 2:
key, value = line_split
entry.add(key.decode('utf-8'), value.decode('utf-8'))
else:
length = struct.unpack('<Q', entry_data.read(4))[0] # type: int
entry.add(line.decode('utf-8'), entry_data.read(length))
entry_data.read() # Skip trailing newline.
data.append(entry)
self.data = data
return self
if TYPE_CHECKING:
#: Journal entry (decoded).
data: 'list[OrderedMultiDict[str, str | bytes]]'
def __init__(self, length: 'int', entry: 'bytes', length2: 'int') -> 'None': ...
[docs]
class DSBSecrets(EnumSchema[Enum_SecretsType]):
"""Header schema for DSB secrets data."""
__default__ = lambda: UnknownSecrets
[docs]
@schema_final
class UnknownSecrets(DSBSecrets):
"""Header schema for unknown DSB secrets data."""
#: Secrets data.
data: 'bytes' = BytesField(length=lambda pkt: pkt['__length__'])
if TYPE_CHECKING:
def __init__(self, data: 'bytes') -> 'None': ...
[docs]
@schema_final
class TLSKeyLog(DSBSecrets, code=Enum_SecretsType.TLS_Key_Log):
"""Header schema for TLS Key Log secrets data."""
#: TLS key log data.
data: 'str' = StringField(length=lambda pkt: pkt['__length__'], encoding='ascii')
[docs]
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
"""Revise ``schema`` data after unpacking process.
Args:
packet: Unpacked data.
Returns:
Revised schema.
"""
from pcapkit.protocols.misc.pcapng import TLSKeyLabel
entries = collections.defaultdict(OrderedMultiDict) # type: dict[TLSKeyLabel, OrderedMultiDict[bytes, bytes]]
for line in self.data.splitlines():
if not line or line.startswith('#'):
continue
label, random, secret = line.strip().split()
label_enum = TLSKeyLabel(label.upper())
entries[label_enum].add(bytes.fromhex(random),
bytes.fromhex(secret))
self.entries = entries
return self
if TYPE_CHECKING:
#: TLS Key Log entries.
entries: 'dict[TLSKeyLabel, OrderedMultiDict[bytes, bytes]]'
def __init__(self, data: 'str') -> 'None': ...
[docs]
@schema_final
class WireGuardKeyLog(DSBSecrets, code=Enum_SecretsType.WireGuard_Key_Log):
"""Header schema for WireGuard Key Log secrets data."""
#: WireGuard key log data.
data: 'str' = StringField(length=lambda pkt: pkt['__length__'], encoding='ascii')
[docs]
def post_process(self, packet: 'dict[str, Any]') -> 'Schema':
"""Revise ``schema`` data after unpacking process.
Args:
packet: Unpacked data.
Returns:
Revised schema.
"""
from pcapkit.protocols.misc.pcapng import WireGuardKeyLabel
entries = OrderedMultiDict() # type: OrderedMultiDict[WireGuardKeyLabel, bytes]
for line in self.data.splitlines():
if not line or line.startswith('#'):
continue
label, op, secret = line.strip().split()
if op != '=':
raise FieldValueError('invalid WireGuard key log format: {line!r}')
label_enum = WireGuardKeyLabel(label.upper())
entries.add(label_enum, base64.b64decode(secret))
self.entries = entries
return self
if TYPE_CHECKING:
#: WireGuard Key Log entries.
entries: 'OrderedMultiDict[WireGuardKeyLabel, bytes]'
def __init__(self, data: 'str') -> 'None': ...
[docs]
@schema_final
class ZigBeeNWKKey(DSBSecrets, code=Enum_SecretsType.ZigBee_NWK_Key):
"""Header schema for ZigBee NWK Key and ZigBee PANID secrets data."""
#: AES-128 NKW key.
key: 'bytes' = BytesField(length=16)
#: ZigBee PANID.
panid: 'int' = UInt16Field(byteorder='little')
#: Padding.
padding: 'bytes' = BytesField(length=2)
if TYPE_CHECKING:
def __init__(self, key: 'bytes', panid: 'int') -> 'None': ...
[docs]
@schema_final
class ZigBeeAPSKey(DSBSecrets, code=Enum_SecretsType.ZigBee_APS_Key):
"""Header schema for ZigBee APS Key secrets data."""
#: AES-128 APS key.
key: 'bytes' = BytesField(length=16)
#: ZigBee PANID.
panid: 'int' = UInt16Field(byteorder='little')
#: Low node short address.
addr_low: 'int' = UInt16Field(byteorder='little')
#: High node short address.
addr_high: 'int' = UInt16Field(byteorder='little')
#: Padding.
padding: 'bytes' = BytesField(length=2)
if TYPE_CHECKING:
def __init__(self, key: 'bytes', panid: 'int', addr_low: 'int', addr_high: 'int') -> 'None': ...
class _DSB_Option(Option, namespace='dsb'):
"""Header schema for ``dsb_*`` options."""
#: Option type.
type: 'Enum_OptionType' = OptionEnumField(length=2, namespace='dsb', callback=byteorder_callback)
#: Option data length.
length: 'int' = UInt16Field(callback=byteorder_callback)
[docs]
@schema_final
class DecryptionSecretsBlock(BlockType, code=Enum_BlockType.Decryption_Secrets_Block):
"""Header schema for PCAP-NG Decryption Secrets Block (DSB)."""
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Secrets type.
secrets_type: 'Enum_SecretsType' = EnumField(length=4, namespace=Enum_SecretsType, callback=byteorder_callback)
#: Secrets length.
secrets_length: 'int' = UInt32Field(callback=byteorder_callback)
#: Secrets data.
secrets_data: 'DSBSecrets' = SwitchField(
selector=dsb_secrets_selector,
)
#: Padding.
padding_data: 'bytes' = BytesField(length=lambda pkt: (4 - pkt['secrets_length'] % 4) % 4)
#: Options.
options: 'list[Option]' = OptionField(
length=lambda pkt: pkt['length'] - 20 - pkt['secrets_length'] - len(pkt['padding_data']),
base_schema=_DSB_Option,
type_name='type',
registry=Option.registry['dsb'],
eool=Enum_OptionType.opt_endofopt,
)
#: Padding.
padding_opts: 'bytes' = PaddingField(length=lambda pkt: pkt['__option_padding__'])
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, length: 'int', secrets_type: 'Enum_SecretsType',
secrets_length: 'int', secrets_data: 'DSBSecrets | bytes',
options: 'list[Option | bytes] | bytes', length2: 'int') -> 'None': ...
[docs]
@schema_final
class CustomBlock(BlockType, code=[Enum_BlockType.Custom_Block_that_rewriters_can_copy_into_new_files,
Enum_BlockType.Custom_Block_that_rewriters_should_not_copy_into_new_files]):
"""Header schema for PCAP-NG Custom Block (CB)."""
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Private enterprise number.
pen: 'int' = UInt32Field(callback=byteorder_callback)
#: Custom data.
data: 'bytes' = BytesField(length=lambda pkt: pkt['length'] - 16)
#: Padding.
padding: 'bytes' = BytesField(length=lambda pkt: (4 - pkt['data'] % 4) % 4)
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, length: 'int', pen: 'int', data: 'bytes', length2: 'int') -> 'None': ...
[docs]
class _PACK_Option(Option, namespace='pack'):
"""Header schema for ``pack_*`` options."""
#: Option type.
type: 'Enum_OptionType' = OptionEnumField(length=2, namespace='pack', callback=byteorder_callback)
#: Option data length.
length: 'int' = UInt16Field(callback=byteorder_callback)
[docs]
@schema_final
class PACK_FlagsOption(_PACK_Option, code=Enum_OptionType.pack_flags):
"""Header schema for PCAP-NG ``pack_flags`` options."""
#: Flags.
flags: 'PACKFlags' = BitField(length=4, namespace={
'direction': (0, 2),
'reception': (2, 3),
'fcs_len': (5, 4),
'crc_error': (24, 1),
'too_long': (25, 1),
'too_short': (26, 1),
'gap_error': (27, 1),
'unaligned_error': (28, 1),
'delimiter_error': (29, 1),
'preamble_error': (30, 1),
'symbol_error': (31, 1),
})
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', flags: 'EPBFlags') -> 'None': ...
[docs]
@schema_final
class PACK_HashOption(_PACK_Option, code=Enum_OptionType.pack_hash):
"""Header schema for PCAP-NG ``pack_hash`` options."""
#: Hash algorithm.
func: 'Enum_HashAlgorithm' = EnumField(length=1, namespace=Enum_HashAlgorithm, callback=byteorder_callback)
#: Hash value.
data: 'bytes' = BytesField(length=lambda pkt: pkt['length'] - 1)
#: Padding.
padding: 'bytes' = PaddingField(length=lambda pkt: (4 - pkt['length'] % 4) % 4)
if TYPE_CHECKING:
def __init__(self, type: 'Enum_OptionType', length: 'int', func: 'Enum_HashAlgorithm', data: 'bytes') -> 'None': ...
[docs]
@schema_final
class PacketBlock(BlockType, code=Enum_BlockType.Packet_Block):
"""Header schema for PCAP-NG Packet Block (obsolete)."""
__payload__ = 'packet_data'
#: Block total length.
length: 'int' = UInt32Field(callback=byteorder_callback)
#: Interface ID.
interface_id: 'int' = UInt32Field(callback=byteorder_callback)
#: Drops count.
drop_count: 'int' = UInt32Field(callback=byteorder_callback, default=0xFFFF)
#: Timestamp (high).
timestamp_high: 'int' = UInt32Field(callback=byteorder_callback)
#: Timestamp (low).
timestamp_low: 'int' = UInt32Field(callback=byteorder_callback)
#: Captured packet length.
captured_length: 'int' = UInt32Field(callback=byteorder_callback)
#: Original packet length.
original_length: 'int' = UInt32Field(callback=byteorder_callback)
#: Packet data.
packet_data: 'bytes' = PayloadField(length=lambda pkt: pkt['captured_length'])
#: Padding.
padding_data: 'bytes' = BytesField(length=lambda pkt: (4 - pkt['captured_length'] % 4) % 4)
#: Options.
options: 'list[Option]' = OptionField(
length=lambda pkt: pkt['length'] - 32 - pkt['captured_length'] - len(pkt['padding_data']),
base_schema=_PACK_Option,
type_name='type',
registry=Option.registry['pack'],
eool=Enum_OptionType.opt_endofopt,
)
#: Padding.
padding_opts: 'bytes' = PaddingField(length=lambda pkt: pkt['__option_padding__'])
#: Block total length.
length2: 'int' = UInt32Field(callback=byteorder_callback)
if TYPE_CHECKING:
def __init__(self, length: 'int', interface_id: 'int', drop_count: 'int',
timestamp_high: 'int', timestamp_low: 'int', captured_length: 'int',
original_length: 'int', packet_data: 'bytes | Protocol | Schema',
options: 'list[Option | bytes] | bytes', length2: 'int') -> 'None': ...