Source code for pcapkit.protocols.schema.internet.hopopt

# -*- coding: utf-8 -*-
# mypy: disable-error-code=assignment
"""header schema for hop-by-hop options"""

import collections
from typing import TYPE_CHECKING

from pcapkit.const.ipv6.option import Option as Enum_Option
from pcapkit.const.ipv6.qs_function import QSFunction as Enum_QSFunction
from pcapkit.const.ipv6.router_alert import RouterAlert as Enum_RouterAlert
from pcapkit.const.ipv6.seed_id import SeedID as Enum_SeedID
from pcapkit.const.ipv6.smf_dpd_mode import SMFDPDMode as Enum_SMFDPDMode
from pcapkit.const.ipv6.tagger_id import TaggerID as Enum_TaggerID
from pcapkit.const.reg.transtype import TransType as Enum_TransType
from pcapkit.corekit.fields.collections import OptionField
from pcapkit.corekit.fields.field import NoValue
from pcapkit.corekit.fields.ipaddress import IPv4AddressField, IPv6AddressField
from pcapkit.corekit.fields.misc import (ConditionalField, ForwardMatchField, NoValueField,
                                         PayloadField, SchemaField, SwitchField)
from pcapkit.corekit.fields.numbers import (EnumField, NumberField, UInt8Field, UInt16Field,
                                            UInt32Field)
from pcapkit.corekit.fields.strings import BitField, BytesField, PaddingField
from pcapkit.protocols.schema.schema import EnumSchema, Schema, schema_final
from pcapkit.utilities.exceptions import FieldValueError
from pcapkit.utilities.logging import SPHINX_TYPE_CHECKING

__all__ = [
    'HOPOPT',

    'SMFDPDOption', 'QuickStartOption',
    'UnassignedOption', 'PadOption', 'TunnelEncapsulationLimitOption',
    'RouterAlertOption', 'CALIPSOOption', 'SMFIdentificationBasedDPDOption',
    'SMFHashBasedDPDOption', 'PDMOption', 'QuickStartRequestOption',
    'QuickStartReportOption', 'RPLOption', 'MPLOption', 'ILNPOption',
    'LineIdentificationOption', 'JumboPayloadOption', 'HomeAddressOption',
    'IPDFFOption',
]

if TYPE_CHECKING:
    from ipaddress import IPv4Address, IPv6Address
    from typing import Any, DefaultDict, Optional, Type

    from pcapkit.corekit.fields.field import FieldBase as Field
    from pcapkit.protocols.protocol import ProtocolBase as Protocol

if SPHINX_TYPE_CHECKING:
    from typing_extensions import TypedDict

[docs] class TaggerIDInfo(TypedDict): """TaggerID information.""" #: SMF mode. mode: int #: TaggerID type. type: int #: TaggerID length. len: int
[docs] class QuickStartFlags(TypedDict): """Quick-Start flags.""" #: QS function. func: int #: Rate request/report. rate: int
[docs] class RPLFlags(TypedDict): """RPL flags.""" #: Down flag. down: int #: Rank error flag. rank_err: int #: Forwarding error flag. fwd_err: int
[docs] class MPLFlags(TypedDict): """MPL flags.""" #: Seed-ID type. Identifies the length of the #: Seed-ID. type: int #: Max flag. ``1`` indicates that the value in the #: sequence field is known to be the largest sequence #: number that was received from the MPL Seed. max: int #: Verification flag. ``0`` indicates that the MPL Option #: conforms to this specification. drop: int
[docs] class DFFFlags(TypedDict): """``IP_DFF`` flags.""" #: Version. ver: int #: Duplicate flag. dup: int #: Retune flag. ret: int
[docs] class SMFDPDTestFlag(TypedDict): """``SMF_DPD`` test flag.""" #: Length. len: int #: DPD mode. mode: int
[docs] class QSTestFlags(TypedDict): """Quick start test flag.""" #: QS function. func: int
[docs] class QSNonce(TypedDict): """Quick start nonce.""" #: Nonce. nonce: int
[docs] def mpl_opt_seed_id_len(pkt: 'dict[str, Any]') -> 'int': """Return MPL Seed-ID length. Args: pkt: MPL option unpacked schema. Returns: MPL Seed-ID length. """ s_type = pkt['flags']['type'] if s_type == 0: return 0 if s_type == 1: return 2 if s_type == 2: return 8 if s_type == 3: return 16 raise FieldValueError(f'HOPOPT: invalid MPL Seed-ID type: {s_type}')
[docs] def smf_dpd_data_selector(pkt: 'dict[str, Any]') -> 'Field': """Selector function for :attr:`_SMFDPDOption.data` field. Args: pkt: Packet data. Returns: * If ``mode`` is ``0``, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField` wrapped :class:`~pcapkit.protocols.schema.internet.hopopt.SMFIdentificationBasedDPDOption` instance. * If ``mode`` is ``1``, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField` wrapped :class:`~pcapkit.protocols.schema.internet.hopopt.SMFHashBasedDPDOption` instance. """ mode = Enum_SMFDPDMode.get(pkt['test']['mode']) schema = SMFDPDOption.registry[mode] if schema is None: raise FieldValueError(f'HOPOPT: invalid SMF DPD mode: {mode}') return SchemaField(length=pkt['test']['len'], schema=schema)
[docs] def smf_i_dpd_tid_selector(pkt: 'dict[str, Any]') -> 'Field': """Selector function for :attr:`SMFIdentificationBasedDPDOption.tid` field. Args: pkt: Packet data. Returns: * If ``tid_type`` is ``0``, returns a :class:`~pcapkit.corekit.fields.misc.NoValueField` instance. * If ``tid_type`` is ``1``, returns a :class:`~pcapkit.corekit.fields.ipaddress.IPv4AddressField` instance. * If ``tid_type`` is ``2``, returns a :class:`~pcapkit.corekit.fields.ipaddress.IPv6AddressField` instance. * Otherwise, returns a :class:`~pcapkit.corekit.fields.strings.BytesField` instance. """ tid_type = Enum_TaggerID.get(pkt['info']['type']) tid_len = pkt['info']['len'] # update type pkt['info']['type'] = tid_type if tid_type == Enum_TaggerID.NULL: if tid_len != 0: raise FieldValueError(f'HOPOPT: invalid TaggerID length: {tid_len}') return NoValueField() if tid_type == Enum_TaggerID.IPv4: if tid_len != 3: raise FieldValueError(f'HOPOPT: invalid TaggerID length: {tid_len}') return IPv4AddressField() if tid_type == Enum_TaggerID.IPv6: if tid_len != 15: raise FieldValueError(f'HOPOPT: invalid TaggerID length: {tid_len}') return IPv6AddressField() return BytesField(length=tid_len + 1)
[docs] def quick_start_data_selector(pkt: 'dict[str, Any]') -> 'Field': """Selector function for :attr:`_QuickStartOption.data` field. Args: pkt: Packet data. Returns: * If ``func`` is ``0``, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField` wrapped :class:`~pcapkit.protocols.schema.internet.hopopt.QuickStartRequestOption` instance. * If ``func`` is ``8``, returns a :class:`~pcapkit.corekit.fields.misc.SchemaField` wrapped :class:`~pcapkit.protocols.schema.internet.hopopt.QuickStartReportOption` instance. """ func = Enum_QSFunction.get(pkt['flags']['func']) pkt['flags']['func'] = func schema = QuickStartOption.registry[func] if schema is None: raise FieldValueError(f'HOPOPT: invalid QS function: {func}') return SchemaField(length=5, schema=schema)
[docs] class Option(EnumSchema[Enum_Option]): """Header schema for HOPOPT options.""" __default__ = lambda: UnassignedOption #: Option type. type: 'Enum_Option' = EnumField(length=1, namespace=Enum_Option) #: Option length (conditional in case of ``Pad1`` option). len: 'int' = ConditionalField( UInt8Field(default=0), lambda pkt: pkt['type'] != Enum_Option.Pad1, )
[docs] def post_process(self, packet: 'dict[str, Any]') -> 'Schema': """Revise ``schema`` data after unpacking process. Args: packet: Unpacked data. Returns: Revised schema. """ # for Pad1 option, length is always 0 if self.type == Enum_Option.Pad1: self.len = 0 return self
[docs] @schema_final class UnassignedOption(Option): """Header schema for HOPOPT unassigned options.""" #: Option data. data: 'bytes' = BytesField(length=lambda pkt: pkt['len']) if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', data: 'bytes') -> 'None': ...
[docs] @schema_final class PadOption(Option, code=[Enum_Option.Pad1, Enum_Option.PadN]): """Header schema for HOPOPT padding options.""" #: Padding. pad: 'bytes' = PaddingField(length=lambda pkt: pkt.get('len', 0)) if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int') -> 'None': ...
[docs] @schema_final class TunnelEncapsulationLimitOption(Option, code=Enum_Option.Tunnel_Encapsulation_Limit): """Header schema for HOPOPT tunnel encapsulation limit options.""" #: Tunnel encapsulation limit. limit: 'int' = UInt8Field() if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', limit: 'int') -> 'None': ...
[docs] @schema_final class RouterAlertOption(Option, code=Enum_Option.Router_Alert): """Header schema for HOPOPT router alert options.""" #: Router alert. alert: 'Enum_RouterAlert' = EnumField(length=2, namespace=Enum_RouterAlert) if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', alert: 'Enum_RouterAlert') -> 'None': ...
[docs] @schema_final class CALIPSOOption(Option, code=Enum_Option.CALIPSO): """Header schema for HOPOPT common architecture label IPv6 security options.""" #: CALIPSO domain of interpretation. domain: 'int' = UInt32Field() #: Compartment length. cmpt_len: 'int' = UInt8Field() #: Sens level. level: 'int' = UInt8Field() #: Checksum (CRC-16). checksum: 'bytes' = BytesField(length=2) #: Compartment bitmap. bitmap: 'bytes' = ConditionalField( BytesField(length=lambda pkt: pkt['cmpt_len'] * 4), lambda pkt: pkt['cmpt_len'] > 0, ) #: Padding. pad: 'bytes' = PaddingField(length=lambda pkt: pkt['len'] - 8 - pkt['cmpt_len'] * 4) if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', domain: 'int', cmpt_len: 'int', level: 'int', checksum: 'bytes', bitmap: 'Optional[bytes]') -> 'None': ...
@schema_final class _SMFDPDOption(Schema): """Header schema for HOPOPT SMF DPD options with generic representation.""" #: SMF DPD mode. test: 'SMFDPDTestFlag' = ForwardMatchField(BitField(length=3, namespace={ 'len': (1, 8), 'mode': (16, 1), })) #: SMF DPD data. data: 'SMFIdentificationBasedDPDOption | SMFHashBasedDPDOption' = SwitchField( selector=smf_dpd_data_selector, ) def post_process(self, packet: 'dict[str, Any]') -> 'SMFDPDOption': """Revise ``schema`` data after unpacking process. Args: packet: Unpacked data. Returns: Revised schema. """ ret = self.data return ret # register ``_SMFDPDOption`` as ``SMF_DPD`` option Option.register(Enum_Option.SMF_DPD, _SMFDPDOption)
[docs] class SMFDPDOption(Option, EnumSchema[Enum_SMFDPDMode]): """Header schema for HOPOPT simplified multicast forwarding duplicate packet detection (``SMF_DPD``) options.""" __enum__: 'DefaultDict[Enum_SMFDPDMode, Type[SMFDPDOption]]' = collections.defaultdict(lambda: None) # type: ignore[arg-type,return-value] if TYPE_CHECKING: mode: 'Enum_SMFDPDMode'
[docs] @schema_final class SMFIdentificationBasedDPDOption(SMFDPDOption, code=Enum_SMFDPDMode.I_DPD): """Header schema for HOPOPT SMF identification-based DPD options.""" #: TaggerID information. info: 'TaggerIDInfo' = BitField(length=1, namespace={ 'mode': (0, 1), 'type': (1, 3), 'len': (4, 4), }) #: TaggerID. tid: 'bytes | IPv4Address | IPv6Address' = ConditionalField( SwitchField(selector=smf_i_dpd_tid_selector), lambda pkt: pkt['info']['type'] != 0, ) #: Identifier. id: 'bytes' = BytesField(length=lambda pkt: pkt['len'] - ( 1 if pkt['info']['type'] == 0 else (pkt['info']['len'] + 2) ))
[docs] def post_process(self, packet: 'dict[str, Any]') -> 'SMFIdentificationBasedDPDOption': """Revise ``schema`` data after unpacking process. Args: packet: Unpacked data. Returns: Revised schema. """ ret = super().post_process(packet) # type: SMFIdentificationBasedDPDOption ret.mode = Enum_SMFDPDMode.I_DPD return ret
if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', info: 'TaggerIDInfo', tid: 'Optional[bytes]', id: 'bytes') -> 'None': ...
[docs] @schema_final class SMFHashBasedDPDOption(SMFDPDOption, code=Enum_SMFDPDMode.H_DPD): """Header schema for HOPOPT SMF hash-based DPD options.""" #: Hash assist value (HAV). hav: 'bytes' = BytesField(length=lambda pkt: pkt['len'])
[docs] def post_process(self, packet: 'dict[str, Any]') -> 'SMFIdentificationBasedDPDOption': """Revise ``schema`` data after unpacking process. Args: packet: Unpacked data. Returns: Revised schema. """ ret = super().post_process(packet) # type: SMFIdentificationBasedDPDOption ret.mode = Enum_SMFDPDMode.H_DPD return ret
if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', hav: 'bytes') -> 'None': ...
[docs] @schema_final class PDMOption(Option, code=Enum_Option.PDM): """Header schema for HOPOPT performance and diagnostic metrics (PDM) options.""" #: Scale delta time last received (DTLR). scaledtlr: 'int' = UInt8Field() #: Scale delta time last sent (DTLS). scaledtls: 'int' = UInt8Field() #: Packet sequence number (PSN) this packet. psntp: 'int' = UInt16Field() #: Packet sequence number (PSN) last received. psnlr: 'int' = UInt16Field() #: Delta time last received (DTLR). deltatlr: 'int' = UInt16Field() #: Delta time last sent (DTLS). deltatls: 'int' = UInt16Field() if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', scaledtlr: 'int', scaledtls: 'int', psntp: 'int', psnlr: 'int', deltatlr: 'int', deltatls: 'int') -> 'None': ...
[docs] @schema_final class _QuickStartOption(Schema): """Header schema for HOPOPT quick start options in generic representation.""" #: Flags. flags: 'QSTestFlags' = ForwardMatchField(BitField(length=3, namespace={ 'func': (16, 4), })) #: QS data. data: 'QuickStartRequestOption | QuickStartReportOption' = SwitchField( selector=quick_start_data_selector, )
[docs] def post_process(self, packet: 'dict[str, Any]') -> 'QuickStartOption': """Revise ``schema`` data after unpacking process. Args: packet: Unpacked data. Returns: Revised schema. """ ret = self.data ret.func = Enum_QSFunction.get(self.flags['func']) return ret
# register ``_QuickStartOption`` as ``Quick_Start`` option Option.register(Enum_Option.Quick_Start, _QuickStartOption)
[docs] class QuickStartOption(Option, EnumSchema[Enum_QSFunction]): """Header schema for HOPOPT quick start options.""" __enum__: 'DefaultDict[Enum_QSFunction, Type[QuickStartOption]]' = collections.defaultdict(lambda: None) # type: ignore[arg-type,return-value] #: Flags. flags: 'QuickStartFlags' = BitField(length=1, namespace={ 'func': (0, 4), 'rate': (4, 4), }) if TYPE_CHECKING: func: 'Enum_QSFunction'
[docs] @schema_final class QuickStartRequestOption(QuickStartOption, code=Enum_QSFunction.Quick_Start_Request): """Header schema for HOPOPT quick start request options.""" #: QS time-to-live (TTL). ttl: 'int' = UInt8Field() #: QS nonce. nonce: 'QSNonce' = BitField(length=4, namespace={ 'nonce': (0, 30), }) if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', flags: 'QuickStartFlags', ttl: 'int', nonce: 'QSNonce') -> 'None': ...
[docs] @schema_final class QuickStartReportOption(QuickStartOption, code=Enum_QSFunction.Report_of_Approved_Rate): """Header schema for HOPOPT quick start report of approved rate options.""" #: Reserved. reserved: 'bytes' = PaddingField(length=1) #: QS nonce. nonce: 'QSNonce' = BitField(length=4, namespace={ 'nonce': (0, 30), }) if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', flags: 'QuickStartFlags', nonce: 'QSNonce') -> 'None': ...
[docs] @schema_final class RPLOption(Option, code=[Enum_Option.RPL_Option_0x23, Enum_Option.RPL_Option_0x63]): """Header schema for HOPOPT routing protocol for low-power and lossy networks (RPL) options.""" #: Flags. flags: 'RPLFlags' = BitField(length=1, namespace={ 'down': (0, 1), 'rank_err': (1, 1), 'fwd_err': (2, 1), }) #: RPL instance ID. id: 'int' = UInt8Field() #: Sender rank. rank: 'int' = UInt16Field() if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', flags: 'RPLFlags', id: 'int', rank: 'int') -> 'None': ...
[docs] @schema_final class MPLOption(Option, code=Enum_Option.MPL_Option): """Header schema for HOPOPT multicast protocol for low-power and lossy networks (MPL) options.""" #: Flags. flags: 'MPLFlags' = BitField(length=1, namespace={ 'type': (0, 2), 'max': (2, 1), 'drop': (3, 1), }) #: MPL sequence number. seq: 'int' = UInt8Field() #: MPL Seed-ID. seed: 'int' = ConditionalField( NumberField(length=mpl_opt_seed_id_len, signed=False), lambda pkt: pkt['flags']['type'] != Enum_SeedID.IPV6_SOURCE_ADDRESS, ) #: Reserved data (padding). pad: 'bytes' = PaddingField(length=lambda pkt: pkt['len'] - 2 - ( 0 if pkt['flags']['type'] == 0 else mpl_opt_seed_id_len(pkt) ))
[docs] def post_process(self, packet: 'dict[str, Any]') -> 'Schema': """Revise ``schema`` data after unpacking process. Args: packet: Unpacked data. Returns: Revised schema. """ if self.flags['type'] == Enum_SeedID.IPV6_SOURCE_ADDRESS: self.seed = packet.get('src', NoValue) return self
if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', flags: 'MPLFlags', seq: 'int', seed: 'Optional[int]') -> 'None': ...
[docs] @schema_final class ILNPOption(Option, code=Enum_Option.ILNP_Nonce): """Header schema for HOPOPT identifier-locator network protocol (ILNP) options.""" #: Nonce value. nonce: 'int' = NumberField(length=lambda pkt: pkt['len'], signed=False) if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', nonce: 'int') -> 'None': ...
[docs] @schema_final class LineIdentificationOption(Option, code=Enum_Option.Line_Identification_Option): """Header schema for HOPOPT line-identification options.""" #: Line ID length. id_len: 'int' = UInt8Field() #: Line ID. id: 'bytes' = BytesField(length=lambda pkt: pkt['id_len']) if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', id_len: 'int', id: 'bytes') -> 'None': ...
[docs] @schema_final class JumboPayloadOption(Option, code=Enum_Option.Jumbo_Payload): """Header schema for HOPOPT jumbo payload options.""" #: Jumbo payload length. jumbo_len: 'int' = UInt32Field() if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', jumbo_len: 'int') -> 'None': ...
[docs] @schema_final class HomeAddressOption(Option, code=Enum_Option.Home_Address): """Header schema for HOPOPT home address options.""" #: Home address. addr: 'IPv6Address' = IPv6AddressField() if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', addr: 'IPv6Address | int | str | bytes') -> 'None': ...
[docs] @schema_final class IPDFFOption(Option, code=Enum_Option.IP_DFF): """Header schema for HOPOPT depth-first forwarding (``IP_DFF``) options.""" #: Flags. flags: 'DFFFlags' = BitField(length=1, namespace={ 'ver': (0, 2), 'dup': (2, 1), 'ret': (3, 1), }) #: Sequence number. seq: 'int' = UInt16Field() if TYPE_CHECKING: def __init__(self, type: 'Enum_Option', len: 'int', flags: 'DFFFlags', seq: 'int') -> 'None': ...
[docs] @schema_final class HOPOPT(Schema): """Header schema for HOPOPT packet.""" #: Next header. next: 'Enum_TransType' = EnumField(length=1, namespace=Enum_TransType) #: Header length. len: 'int' = UInt8Field() #: Options. options: 'list[Option]' = OptionField( length=lambda pkt: pkt['len'] * 8 + 6, base_schema=Option, type_name='type', registry=Option.registry, ) #: Payload. payload: 'bytes' = PayloadField() if TYPE_CHECKING: def __init__(self, next: 'Enum_TransType', len: 'int', options: 'bytes | list[bytes | Option]', payload: 'bytes | Protocol | Schema') -> 'None': ...