Source code for pcapkit.protocols.internet.ipv4

# -*- coding: utf-8 -*-
"""IPv4 - Internet Protocol version 4
========================================

.. module:: pcapkit.protocols.internet.ipv4

:mod:`pcapkit.protocols.internet.ipv4` contains
:class:`~pcapkit.protocols.internet.ipv4.IPv4` only,
which implements extractor for Internet Protocol
version 4 (IPv4) [*]_, whose structure is described
as below:

======= ========= ====================== =============================================
Octets      Bits        Name                    Description
======= ========= ====================== =============================================
  0           0   ``ip.version``              Version (``4``)
  0           4   ``ip.hdr_len``              Internal Header Length (IHL)
  1           8   ``ip.dsfield.dscp``         Differentiated Services Code Point (DSCP)
  1          14   ``ip.dsfield.ecn``          Explicit Congestion Notification (ECN)
  2          16   ``ip.len``                  Total Length
  4          32   ``ip.id``                   Identification
  6          48                               Reserved Bit (must be ``\\x00``)
  6          49   ``ip.flags.df``             Don't Fragment (DF)
  6          50   ``ip.flags.mf``             More Fragments (MF)
  6          51   ``ip.frag_offset``          Fragment Offset
  8          64   ``ip.ttl``                  Time To Live (TTL)
  9          72   ``ip.proto``                Protocol (Transport Layer)
  10         80   ``ip.checksum``             Header Checksum
  12         96   ``ip.src``                  Source IP Address
  16        128   ``ip.dst``                  Destination IP Address
  20        160   ``ip.options``              IP Options (if IHL > ``5``)
======= ========= ====================== =============================================

.. [*] https://en.wikipedia.org/wiki/IPv4

"""
import datetime
import ipaddress
import math
from typing import TYPE_CHECKING, cast

from pcapkit.const.ipv4.classification_level import ClassificationLevel as Enum_ClassificationLevel
from pcapkit.const.ipv4.option_class import OptionClass as Enum_OptionClass
from pcapkit.const.ipv4.option_number import OptionNumber as Enum_OptionNumber
from pcapkit.const.ipv4.protection_authority import ProtectionAuthority as Enum_ProtectionAuthority
from pcapkit.const.ipv4.qs_function import QSFunction as Enum_QSFunction
from pcapkit.const.ipv4.router_alert import RouterAlert as Enum_RouterAlert
from pcapkit.const.ipv4.tos_del import ToSDelay as Enum_ToSDelay
from pcapkit.const.ipv4.tos_ecn import ToSECN as Enum_ToSECN
from pcapkit.const.ipv4.tos_pre import ToSPrecedence as Enum_ToSPrecedence
from pcapkit.const.ipv4.tos_rel import ToSReliability as Enum_ToSReliability
from pcapkit.const.ipv4.tos_thr import ToSThroughput as Enum_ToSThroughput
from pcapkit.const.ipv4.ts_flag import TSFlag as Enum_TSFlag
from pcapkit.const.reg.transtype import TransType as Enum_TransType
from pcapkit.corekit.multidict import OrderedMultiDict
from pcapkit.protocols.data.internet.ipv4 import EOOLOption as Data_EOOLOption
from pcapkit.protocols.data.internet.ipv4 import ESECOption as Data_ESECOption
from pcapkit.protocols.data.internet.ipv4 import Flags as Data_Flags
from pcapkit.protocols.data.internet.ipv4 import IPv4 as Data_IPv4
from pcapkit.protocols.data.internet.ipv4 import LSROption as Data_LSROption
from pcapkit.protocols.data.internet.ipv4 import MTUPOption as Data_MTUPOption
from pcapkit.protocols.data.internet.ipv4 import MTUROption as Data_MTUROption
from pcapkit.protocols.data.internet.ipv4 import NOPOption as Data_NOPOption
from pcapkit.protocols.data.internet.ipv4 import OptionType as Data_OptionType
from pcapkit.protocols.data.internet.ipv4 import QSOption as Data_QSOption
from pcapkit.protocols.data.internet.ipv4 import \
    QuickStartReportOption as Data_QuickStartReportOption
from pcapkit.protocols.data.internet.ipv4 import \
    QuickStartRequestOption as Data_QuickStartRequestOption
from pcapkit.protocols.data.internet.ipv4 import RROption as Data_RROption
from pcapkit.protocols.data.internet.ipv4 import RTRALTOption as Data_RTRALTOption
from pcapkit.protocols.data.internet.ipv4 import SECOption as Data_SECOption
from pcapkit.protocols.data.internet.ipv4 import SIDOption as Data_SIDOption
from pcapkit.protocols.data.internet.ipv4 import SSROption as Data_SSROption
from pcapkit.protocols.data.internet.ipv4 import ToSField as Data_ToSField
from pcapkit.protocols.data.internet.ipv4 import TROption as Data_TROption
from pcapkit.protocols.data.internet.ipv4 import TSOption as Data_TSOption
from pcapkit.protocols.data.internet.ipv4 import UnassignedOption as Data_UnassignedOption
from pcapkit.protocols.internet.ip import IP
from pcapkit.protocols.schema.internet.ipv4 import EOOLOption as Schema_EOOLOption
from pcapkit.protocols.schema.internet.ipv4 import ESECOption as Schema_ESECOption
from pcapkit.protocols.schema.internet.ipv4 import IPv4 as Schema_IPv4
from pcapkit.protocols.schema.internet.ipv4 import LSROption as Schema_LSROption
from pcapkit.protocols.schema.internet.ipv4 import MTUPOption as Schema_MTUPOption
from pcapkit.protocols.schema.internet.ipv4 import MTUROption as Schema_MTUROption
from pcapkit.protocols.schema.internet.ipv4 import NOPOption as Schema_NOPOption
from pcapkit.protocols.schema.internet.ipv4 import QSOption as Schema_QSOption
from pcapkit.protocols.schema.internet.ipv4 import \
    QuickStartReportOption as Schema_QuickStartReportOption
from pcapkit.protocols.schema.internet.ipv4 import \
    QuickStartRequestOption as Schema_QuickStartRequestOption
from pcapkit.protocols.schema.internet.ipv4 import RROption as Schema_RROption
from pcapkit.protocols.schema.internet.ipv4 import RTRALTOption as Schema_RTRALTOption
from pcapkit.protocols.schema.internet.ipv4 import SECOption as Schema_SECOption
from pcapkit.protocols.schema.internet.ipv4 import SIDOption as Schema_SIDOption
from pcapkit.protocols.schema.internet.ipv4 import SSROption as Schema_SSROption
from pcapkit.protocols.schema.internet.ipv4 import TROption as Schema_TROption
from pcapkit.protocols.schema.internet.ipv4 import TSOption as Schema_TSOption
from pcapkit.protocols.schema.internet.ipv4 import UnassignedOption as Schema_UnassignedOption
from pcapkit.protocols.schema.schema import Schema
from pcapkit.utilities.exceptions import ProtocolError
from pcapkit.utilities.warnings import ProtocolWarning, RegistryWarning, warn

if TYPE_CHECKING:
    from datetime import timedelta
    from enum import IntEnum as StdlibEnum
    from ipaddress import IPv4Address
    from typing import Any, Callable, Optional, Type

    from aenum import IntEnum as AenumEnum
    from mypy_extensions import DefaultArg, KwArg, NamedArg
    from typing_extensions import Literal

    from pcapkit.protocols.data.internet.ipv4 import Option as Data_Option
    from pcapkit.protocols.protocol import ProtocolBase as Protocol
    from pcapkit.protocols.schema.internet.ipv4 import Option as Schema_Option

    Option = OrderedMultiDict[Enum_OptionNumber, Data_Option]
    OptionParser = Callable[[Schema_Option, NamedArg(Option, 'options')], Data_Option]
    OptionConstructor = Callable[[Enum_OptionNumber, DefaultArg(Optional[Data_Option]),
                                  KwArg(Any)], Schema_Option]

__all__ = ['IPv4']


[docs] class IPv4(IP[Data_IPv4, Schema_IPv4], schema=Schema_IPv4, data=Data_IPv4): """This class implements Internet Protocol version 4. This class currently supports parsing of the following IPv4 options, which are directly mapped to the :class:`pcapkit.const.ipv4.option_number.OptionNumber` enumeration: .. list-table:: :header-rows: 1 * - Option Code - Option Parser - Option Constructor * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.EOOL` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_eool` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_eool` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.NOP` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_nop` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_nop` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.SEC` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_sec` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_sec` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.LSR` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_lsr` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_lsr` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.TS` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_ts` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_ts` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.E_SEC` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_e_sec` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_e_sec` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.RR` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_rr` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_rr` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.SID` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_sid` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_sid` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.SSR` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_ssr` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_ssr` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.MTUP` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_mtup` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_mtup` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.MTUR` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_mtur` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_mtur` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.TR` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_tr` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_tr` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.RTRALT` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_rtralt` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_rtralt` * - :attr:`~pcapkit.const.ipv4.option_number.OptionNumber.QS` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._read_opt_qs` - :meth:`~pcapkit.protocols.internet.ipv4.IPv4._make_opt_qs` """ ########################################################################## # Properties. ########################################################################## @property def name(self) -> 'Literal["Internet Protocol version 4"]': """Name of corresponding protocol.""" return 'Internet Protocol version 4' @property def length(self) -> 'int': """Header length of corresponding protocol.""" return self._info.hdr_len @property def protocol(self) -> 'Enum_TransType': """Name of next layer protocol.""" return self._info.protocol # source IP address @property def src(self) -> 'IPv4Address': """Source IP address.""" return self._info.src # destination IP address @property def dst(self) -> 'IPv4Address': """Destination IP address.""" return self._info.dst ########################################################################## # Methods. ##########################################################################
[docs] def read(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'Data_IPv4': # pylint: disable=unused-argument """Read Internet Protocol version 4 (IPv4). Structure of IPv4 header [:rfc:`791`]: .. code-block:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |Version| IHL |Type of Service| Total Length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Identification |Flags| Fragment Offset | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Time to Live | Protocol | Header Checksum | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Source Address | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Destination Address | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Options | Padding | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Args: length: Length of packet data. **kwargs: Arbitrary keyword arguments. Returns: Parsed packet data. """ if length is None: length = len(self) schema = self.__header__ if schema.vihl['version'] != 4: raise ProtocolError(f"[IPv4] invalid version: {schema.vihl['version']}") ipv4 = Data_IPv4( version=schema.vihl['version'], # type: ignore[arg-type] hdr_len=schema.vihl['ihl'] * 4, tos=Data_ToSField.from_dict({ 'pre': Enum_ToSPrecedence.get(schema.tos['pre']), 'del': Enum_ToSDelay.get(schema.tos['del']), 'thr': Enum_ToSThroughput.get(schema.tos['thr']), 'rel': Enum_ToSReliability.get(schema.tos['rel']), 'ecn': Enum_ToSECN.get(schema.tos['ecn']), }), len=schema.length, id=schema.id, flags=Data_Flags( df=bool(schema.flags['df']), mf=bool(schema.flags['mf']), ), offset=int(schema.flags['offset']) * 8, ttl=datetime.timedelta(seconds=schema.ttl), protocol=schema.proto, checksum=schema.chksum, src=schema.src, dst=schema.dst, ) _optl = ipv4.hdr_len - 20 if _optl: ipv4.__update__([ ('options', self._read_ipv4_options(_optl)), ]) return self._decode_next_layer(ipv4, ipv4.protocol, ipv4.len - ipv4.hdr_len)
[docs] def make(self, tos_pre: 'Enum_ToSPrecedence | StdlibEnum | AenumEnum | int | str' = Enum_ToSPrecedence.Routine, tos_pre_default: 'Optional[int]' = None, tos_pre_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long tos_pre_reversed: 'bool' = False, tos_del: 'Enum_ToSDelay | StdlibEnum | AenumEnum | int | str' = Enum_ToSDelay.NORMAL, tos_del_default: 'Optional[int]' = None, tos_del_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long tos_del_reversed: 'bool' = False, tos_thr: 'Enum_ToSThroughput | StdlibEnum | AenumEnum | int | str' = Enum_ToSThroughput.NORMAL, tos_thr_default: 'Optional[int]' = None, tos_thr_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long tos_thr_reversed: 'bool' = False, tos_rel: 'Enum_ToSReliability | StdlibEnum | AenumEnum | int | str' = Enum_ToSReliability.NORMAL, tos_rel_default: 'Optional[int]' = None, tos_rel_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long tos_rel_reversed: 'bool' = False, tos_ecn: 'Enum_ToSECN | StdlibEnum | AenumEnum | int | str' = Enum_ToSECN.Not_ECT, tos_ecn_default: 'Optional[int]' = None, tos_ecn_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long tos_ecn_reversed: 'bool' = False, id: 'int' = 0, df: 'bool' = False, mf: 'bool' = False, offset: 'int' = 0, ttl: 'timedelta | int' = 0, protocol: 'Enum_TransType | StdlibEnum | AenumEnum | int | str' = Enum_TransType.UDP, protocol_default: 'Optional[int]' = None, protocol_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long protocol_reversed: 'bool' = False, checksum: 'bytes' = b'\x00\x00', src: 'IPv4Address | str | int | bytes' = '127.0.0.1', dst: 'IPv4Address | str | int | bytes' = '0.0.0.0', # nosec: B104 options: 'Optional[list[Schema_Option | tuple[Enum_OptionNumber, dict[str, Any]] | bytes] | Option]' = None, # pylint: disable=line-too-long payload: 'bytes | Protocol | Schema' = b'', **kwargs: 'Any') -> 'Schema_IPv4': """Make (construct) packet data. Args: tos_pre: Precedence of the packet. tos_pre_default: Default value of ``tos_pre``. tos_pre_namespace: Namespace of ``tos_pre``. tos_pre_reversed: If the namespace of ``tos_pre`` is reversed. tos_del: Delay of the packet. tos_del_default: Default value of ``tos_del``. tos_del_namespace: Namespace of ``tos_del``. tos_del_reversed: If the namespace of ``tos_del`` is reversed. tos_thr: Throughput of the packet. tos_thr_default: Default value of ``tos_thr``. tos_thr_namespace: Namespace of ``tos_thr``. tos_thr_reversed: If the namespace of ``tos_thr`` is reversed. tos_rel: Reliability of the packet. tos_rel_default: Default value of ``tos_rel``. tos_rel_namespace: Namespace of ``tos_rel``. tos_rel_reversed: If the namespace of ``tos_rel`` is reversed. tos_ecn: ECN of the packet. tos_ecn_default: Default value of ``tos_ecn``. tos_ecn_namespace: Namespace of ``tos_ecn``. tos_ecn_reversed: If the namespace of ``tos_ecn`` is reversed. id: Identification of the packet. df: Don't fragment flag. mf: More fragments flag. offset: Fragment offset. ttl: Time to live of the packet. protocol: Payload protocol of the packet. protocol_default: Default value of ``protocol``. protocol_namespace: Namespace of ``protocol``. protocol_reversed: If the namespace of ``protocol`` is reversed. checksum: Checksum of the packet. src: Source address of the packet. dst: Destination address of the packet. options: Options of the packet. payload: Payload of the packet. **kwargs: Arbitrary keyword arguments. Returns: Constructed packet data. """ tos_pre_val = self._make_index(tos_pre, tos_pre_default, namespace=tos_pre_namespace, reversed=tos_pre_reversed, pack=False) tos_del_val = self._make_index(tos_del, tos_del_default, namespace=tos_del_namespace, reversed=tos_del_reversed, pack=False) tos_thr_val = self._make_index(tos_thr, tos_thr_default, namespace=tos_thr_namespace, reversed=tos_thr_reversed, pack=False) tos_rel_val = self._make_index(tos_rel, tos_rel_default, namespace=tos_rel_namespace, reversed=tos_rel_reversed, pack=False) tos_ecn_val = self._make_index(tos_ecn, tos_ecn_default, namespace=tos_ecn_namespace, reversed=tos_ecn_reversed, pack=False) proto = self._make_index(protocol, protocol_default, namespace=protocol_namespace, reversed=protocol_reversed, pack=False) ttl_val = ttl if isinstance(ttl, int) else math.ceil(ttl.total_seconds()) if options is not None: options_value, total_length = self._make_ipv4_options(options) else: options_value, total_length = [], 0 ihl = 5 + math.ceil(total_length / 4) len = ihl * 4 + len(payload) return Schema_IPv4( vihl={ 'version': 4, 'ihl': ihl, }, tos={ 'pre': tos_pre_val, 'del': tos_del_val, 'thr': tos_thr_val, 'rel': tos_rel_val, 'ecn': tos_ecn_val, }, length=len, id=id, flags={ 'df': df, 'mf': mf, 'offset': offset, }, ttl=ttl_val, proto=proto, # type: ignore[arg-type] chksum=checksum, src=src, dst=dst, options=options_value, payload=payload, )
[docs] @classmethod def id(cls) -> 'tuple[Literal["IPv4"]]': # type: ignore[override] """Index ID of the protocol. Returns: Index ID of the protocol. """ return ('IPv4',)
[docs] @classmethod def register_option(cls, code: 'Enum_OptionNumber', meth: 'str | tuple[OptionParser, OptionConstructor]') -> 'None': """Register an option parser. Args: code: IPv4 option code. meth: Method name or callable to parse and/or construct the option. """ name = code.name.lower() if hasattr(cls, f'_read_opt_{name}'): warn(f'option {code} already registered, overwriting', RegistryWarning) if isinstance(meth, str): meth = (getattr(cls, f'_read_opt_{meth}', cls._read_opt_unassigned), # type: ignore[arg-type] getattr(cls, f'_make_opt_{meth}', cls._make_opt_unassigned)) # type: ignore[arg-type] setattr(cls, f'_read_opt_{name}', meth[0]) setattr(cls, f'_make_opt_{name}', meth[1])
########################################################################## # Data models. ########################################################################## def __length_hint__(self) -> 'Literal[20]': """Return an estimated length for the object.""" return 20
[docs] @classmethod def __index__(cls) -> 'Enum_TransType': # pylint: disable=invalid-index-returned """Numeral registry index of the protocol. Returns: Numeral registry index of the protocol in `IANA`_. .. _IANA: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml """ return Enum_TransType.IPv4 # type: ignore[return-value]
########################################################################## # Utilities. ##########################################################################
[docs] @classmethod def _make_data(cls, data: 'Data_IPv4') -> 'dict[str, Any]': # type: ignore[override] """Create key-value pairs from ``data`` for protocol construction. Args: data: protocol data Returns: Key-value pairs for protocol construction. """ return { 'tos_pre': data.tos.pre, 'tos_del': data.tos['del'], 'tos_thr': data.tos.thr, 'tos_rel': data.tos.rel, 'tos_ecn': data.tos.ecn, 'id': data.id, 'df': data.flags.df, 'mf': data.flags.mf, 'offset': data.offset, 'ttl': data.ttl, 'protocol': data.protocol, 'checksum': data.checksum, 'src': data.src, 'dst': data.dst, 'options': data.options, 'payload': cls._make_payload(data), }
def _read_ipv4_addr(self) -> 'IPv4Address': """Read IP address. Returns: Parsed IP address. """ _byte = self._read_fileng(4) # _addr = '.'.join([str(_) for _ in _byte]) # return _addr return ipaddress.ip_address(_byte) # type: ignore[return-value] def _read_ipv4_opt_type(self, code: 'int') -> 'Data_OptionType': """Read option type field. Arguments: code: option kind value Returns: Extracted IPv4 option type, as an object of the option flag (copied flag), option class, and option number. """ oflg = bool(code >> 7) ocls = Enum_OptionClass.get((code >> 5) & 0b11) onum = code & 0b11111 return Data_OptionType.from_dict({ 'change': oflg, 'class': ocls, 'number': onum, })
[docs] def _read_ipv4_options(self, length: 'int') -> 'Option': """Read IPv4 option list. Arguments: length: length of options Returns: Extracted IPv4 options. Raises: ProtocolError: If the threshold is **NOT** matching. """ counter = 0 # length of read option list options = OrderedMultiDict() # type: Option for schema in self.__header__.options: kind = schema.type name = kind.name.lower() meth_name = f'_read_opt_{name}' meth = cast('OptionParser', getattr(self, meth_name, self._read_opt_unassigned)) data = meth(schema, options=options) # record option data counter += data.length options.add(kind, data) # break when End of Option List (EOOL) triggered if kind == Enum_OptionNumber.EOOL: break # check threshold if counter > length: raise ProtocolError(f'IPv4: invalid format') return options
[docs] def _read_opt_unassigned(self, schema: 'Schema_UnassignedOption', *, options: 'Option') -> 'Data_UnassignedOption': # pylint: disable=unused-argument """Read IPv4 unassigned options. Structure of IPv4 unassigned options [:rfc:`791`]: .. code-block:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | type | length | option data ... +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If ``length`` is **LESS THAN** ``3``. """ if schema.length < 3: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') opt = Data_UnassignedOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, data=schema.data, ) return opt
[docs] def _read_opt_eool(self, schema: 'Schema_EOOLOption', *, options: 'Option') -> 'Data_EOOLOption': # pylint: disable=unused-argument """Read IPv4 End of Option List (``EOOL``) option. Structure of IPv4 End of Option List (``EOOL``) option [:rfc:`719`]: .. code-block:: text +--------+ |00000000| +--------+ Type=0 Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. """ opt = Data_EOOLOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=1, ) return opt
[docs] def _read_opt_nop(self, schema: 'Schema_NOPOption', *, options: 'Option') -> 'Data_NOPOption': # pylint: disable=unused-argument """Read IPv4 No Operation (``NOP``) option. Structure of IPv4 No Operation (``NOP``) option [:rfc:`719`]: .. code-block:: text +--------+ |00000001| +--------+ Type=1 Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. """ opt = Data_NOPOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=1, ) return opt
[docs] def _read_opt_sec(self, schema: 'Schema_SECOption', *, options: 'Option') -> 'Data_SECOption': # pylint: disable=unused-argument """Read IPv4 Security (``SEC``) option. Structure of IPv4 Security (``SEC``) option [:rfc:`1108`]: .. code-block:: text +------------+------------+------------+-------------//----------+ | 10000010 | XXXXXXXX | SSSSSSSS | AAAAAAA[1] AAAAAAA0 | | | | | [0] | +------------+------------+------------+-------------//----------+ TYPE = 130 LENGTH CLASSIFICATION PROTECTION LEVEL AUTHORITY FLAGS Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If ``length`` is **LESS THAN** ``3``. """ if schema.length < 3: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') if schema.length > 3: flags = [] # type: list[Enum_ProtectionAuthority] for base, byte in enumerate(schema.data): for bit in range(7): authority = Enum_ProtectionAuthority.get(base * 8 + bit) if byte & (0x80 >> bit): if 'Unassigned' in authority.name: warn(f'{self.alias}: [OptNo {schema.type}] invalid format: unknown protection authority: {authority}', ProtocolWarning) flags.append(authority) if byte & 0x01 == 1 and base < schema.length - 4: #raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid format: remaining data') warn(f'{self.alias}: [OptNo {schema.type}] invalid format: remaining data', ProtocolWarning) if schema.data[-1] & 0x01 == 0: warn(f'{self.alias}: [OptNo {schema.type}] invalid format: field termination indicator not set', ProtocolWarning) else: flags = [] opt = Data_SECOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, level=schema.level, flags=tuple(flags), ) return opt
[docs] def _read_opt_lsr(self, schema: 'Schema_LSROption', *, options: 'Option') -> 'Data_LSROption': # pylint: disable=unused-argument """Read IPv4 Loose Source Route (``LSR``) option. Structure of IPv4 Loose Source Route (``LSR``) option [:rfc:`791`]: .. code-block:: text +--------+--------+--------+---------//--------+ |10000011| length | pointer| route data | +--------+--------+--------+---------//--------+ Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If option is malformed. """ if schema.length < 3 or (schema.length - 3) % 4 != 0: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') if schema.pointer < 4: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format: pointer too small: {schema.pointer}') opt = Data_LSROption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, pointer=schema.pointer, route=tuple(schema.route), ) return opt
[docs] def _read_opt_ts(self, schema: 'Schema_TSOption', *, options: 'Option') -> 'Data_TSOption': # pylint: disable=unused-argument """Read IPv4 Time Stamp (``TS``) option. Structure of IPv4 Time Stamp (``TS``) option [:rfc:`791`]: .. code-block:: text +--------+--------+--------+--------+ |01000100| length | pointer|oflw|flg| +--------+--------+--------+--------+ | internet address | +--------+--------+--------+--------+ | timestamp | +--------+--------+--------+--------+ | . | . . Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If the option is malformed. """ if schema.length > 40 or schema.length < 4: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') if schema.pointer < 5: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format: pointer too small: {schema.pointer}') opt = Data_TSOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, pointer=schema.pointer, overflow=schema.flags['oflw'], flag=schema.ts_flag, timestamp=schema.timestamp, ) return opt
[docs] def _read_opt_e_sec(self, schema: 'Schema_ESECOption', *, options: 'Option') -> 'Data_ESECOption': # pylint: disable=unused-argument """Read IPv4 Extended Security (``E-SEC``) option. Structure of IPv4 Extended Security (``E-SEC``) option [:rfc:`1108`]: .. code-block:: text +------------+------------+------------+-------//-------+ | 10000101 | 000LLLLL | AAAAAAAA | add sec info | +------------+------------+------------+-------//-------+ TYPE = 133 LENGTH ADDITIONAL ADDITIONAL SECURITY INFO SECURITY FORMAT CODE INFO Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If ``length`` is **LESS THAN** ``3``. """ if schema.length < 3: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') opt = Data_ESECOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, format=schema.format, info=schema.info, ) return opt
[docs] def _read_opt_rr(self, schema: 'Schema_RROption', *, options: 'Option') -> 'Data_RROption': # pylint: disable=unused-argument """Read IPv4 Record Route (``RR``) option. Structure of IPv4 Record Route (``RR``) option [:rfc:`791`]: .. code-block:: text +--------+--------+--------+---------//--------+ |00000111| length | pointer| route data | +--------+--------+--------+---------//--------+ Type=7 Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If option is malformed. """ if schema.length < 3 or (schema.length - 3) % 4 != 0: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') if schema.pointer < 4: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format: pointer too small: {schema.pointer}') opt = Data_RROption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, pointer=schema.pointer, route=tuple(schema.route), ) return opt
[docs] def _read_opt_sid(self, schema: 'Schema_SIDOption', *, options: 'Option') -> 'Data_SIDOption': # pylint: disable=unused-argument """Read IPv4 Stream ID (``SID``) option. Structure of IPv4 Stream ID (``SID``) option [:rfc:`791`][:rfc:`6814`]: .. code-block:: text +--------+--------+--------+--------+ |10001000|00000010| Stream ID | +--------+--------+--------+--------+ Type=136 Length=4 Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If ``length`` is **NOT** ``4``. """ if schema.length != 4: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') opt = Data_SIDOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, sid=schema.sid, ) return opt
[docs] def _read_opt_ssr(self, schema: 'Schema_SSROption', *, options: 'Option') -> 'Data_SSROption': # pylint: disable=unused-argument """Read IPv4 Strict Source Route (``SSR``) option. Structure of IPv4 Strict Source Route (``SSR``) option [:rfc:`791`]: .. code-block:: text +--------+--------+--------+---------//--------+ |10001001| length | pointer| route data | +--------+--------+--------+---------//--------+ Type=137 Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If option is malformed. """ if schema.length < 3 or (schema.length - 3) % 4 != 0: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') if schema.pointer < 4: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format: pointer too small: {schema.pointer}') opt = Data_SSROption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, pointer=schema.pointer, route=tuple(schema.route), ) return opt
[docs] def _read_opt_mtup(self, schema: 'Schema_MTUPOption', *, options: 'Option') -> 'Data_MTUPOption': # pylint: disable=unused-argument """Read IPv4 MTU Probe (``MTUP``) option. Structure of IPv4 MTU Probe (``MTUP``) option [:rfc:`1063`][:rfc:`1191`]: .. code-block:: text +--------+--------+--------+--------+ |00001011|00000100| 2 octet value | +--------+--------+--------+--------+ Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If ``length`` is **NOT** ``4``. """ if schema.length != 4: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') opt = Data_MTUPOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, mtu=schema.mtu, ) return opt
[docs] def _read_opt_mtur(self, schema: 'Schema_MTUROption', *, options: 'Option') -> 'Data_MTUROption': # pylint: disable=unused-argument """Read IPv4 MTU Reply (``MTUR``) option. Structure of IPv4 MTU Reply (``MTUR``) option [:rfc:`1063`][:rfc:`1191`]: .. code-block:: text +--------+--------+--------+--------+ |00001100|00000100| 2 octet value | +--------+--------+--------+--------+ Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If ``length`` is **NOT** ``4``. """ if schema.length != 4: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') opt = Data_MTUROption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, mtu=schema.mtu, ) return opt
[docs] def _read_opt_tr(self, schema: 'Schema_TROption', *, options: 'Option') -> 'Data_TROption': # pylint: disable=unused-argument """Read IPv4 Traceroute (``TR``) option. Structure of IPv4 Traceroute (``TR``) option [:rfc:`1393`][:rfc:`6814`]: .. code-block:: text 0 8 16 24 +-+-+-+-+-+-+-+-+---------------+---------------+---------------+ |F| C | Number | Length | ID Number | +-+-+-+-+-+-+-+-+---------------+---------------+---------------+ | Outbound Hop Count | Return Hop Count | +---------------+---------------+---------------+---------------+ | Originator IP Address | +---------------+---------------+---------------+---------------+ Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If ``length`` is **NOT** ``12``. """ if schema.length != 12: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') opt = Data_TROption.from_dict({ 'code': schema.type, 'type': self._read_ipv4_opt_type(schema.type), 'length': schema.length, 'id': schema.id, 'outbound': schema.out, 'return': schema.ret, 'originator': schema.origin, }) return opt
[docs] def _read_opt_rtralt(self, schema: 'Schema_RTRALTOption', *, options: 'Option') -> 'Data_RTRALTOption': # pylint: disable=unused-argument """Read IPv4 Router Alert (``RTRALT``) option. Structure of IPv4 Router Alert (``RTRALT``) option [:rfc:`2113`]: .. code:: text +--------+--------+--------+--------+ |10010100|00000100| 2 octet value | +--------+--------+--------+--------+ Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If ``length`` is **NOT** ``4``. """ if schema.length != 4: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') opt = Data_RTRALTOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema.length, alert=schema.alert, ) return opt
[docs] def _read_opt_qs(self, schema: 'Schema_QSOption', *, options: 'Option') -> 'Data_QSOption': # pylint: disable=unused-argument """Read IPv4 Quick Start (``QS``) option. Structure of IPv4 Quick Start (``QS``) option [:rfc:`4782`]: * A Quick-Start Request .. code-block:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Option | Length=8 | Func. | Rate | QS TTL | | | | 0000 |Request| | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | QS Nonce | R | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ * Report of Approved Rate .. code-block:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Option | Length=8 | Func. | Rate | Not Used | | | | 1000 | Report| | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | QS Nonce | R | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Arguments: schema: parsed option schema options: extracted IPv4 options Returns: Parsed option data. Raises: ProtocolError: If the option is malformed. """ if schema.length != 8: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] invalid format') func = schema.func if func == Enum_QSFunction.Quick_Start_Request: schema_req = cast('Schema_QuickStartRequestOption', schema) rate = schema_req.flags['rate'] opt = Data_QuickStartRequestOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema_req.length, func=func, rate=40000 * (2 ** rate) / 1000 if rate > 0 else 0, ttl=datetime.timedelta(seconds=schema_req.ttl), nonce=schema_req.nonce['nonce'], ) # type: Data_QSOption elif func == Enum_QSFunction.Report_of_Approved_Rate: schema_rep = cast('Schema_QuickStartReportOption', schema) rate = schema_rep.flags['rate'] opt = Data_QuickStartReportOption( code=schema.type, type=self._read_ipv4_opt_type(schema.type), length=schema_rep.length, func=func, rate=40000 * (2 ** rate) / 1000 if rate > 0 else 0, nonce=schema_rep.nonce['nonce'], ) else: raise ProtocolError(f'{self.alias}: [OptNo {schema.type}] unknown QS function: {func}') return opt
[docs] def _make_ipv4_options(self, options: 'list[Schema_Option | tuple[Enum_OptionNumber, dict[str, Any]] | bytes] | Option') -> 'tuple[list[Schema_Option | bytes], int]': """Make options for IPv4. Args: option: IPv4 options Returns: Tuple of options and total length of options. """ total_length = 0 if isinstance(options, list): options_list = [] # type: list[Schema_Option | bytes] for schema in options: if isinstance(schema, bytes): code = Enum_OptionNumber.get(schema[0]) if code in (Enum_OptionNumber.NOP, Enum_OptionNumber.EOOL): # ignore padding options by default continue data = schema # type: Schema_Option | bytes data_len = len(data) elif isinstance(schema, Schema): code = schema.type if code in (Enum_OptionNumber.NOP, Enum_OptionNumber.EOOL): # ignore padding options by default continue data = schema data_len = len(schema.pack()) else: code, args = cast('tuple[Enum_OptionNumber, dict[str, Any]]', schema) if code in (Enum_OptionNumber.NOP, Enum_OptionNumber.EOOL): # ignore padding options by default continue name = f'_make_opt_{code.name.lower()}' meth = cast('OptionConstructor', getattr(self, name, self._make_opt_unassigned)) data = meth(code, **args) data_len = len(data.pack()) options_list.append(data) total_length += data_len # force alignment to 32-bit boundary if data_len % 4: pad_len = 4 - (data_len % 4) pad_opt = self._make_opt_nop(Enum_OptionNumber.NOP) # type: ignore[arg-type] total_length += pad_len for _ in range(pad_len - 1): options_list.append(pad_opt) options_list.append(Enum_OptionNumber.EOOL) # type: ignore[arg-type] return options_list, total_length options_list = [] for code, option in options.items(multi=True): # ignore padding options by default if code in (Enum_OptionNumber.NOP, Enum_OptionNumber.EOOL): continue name = f'_make_opt_{code.name.lower()}' meth = cast('OptionConstructor', getattr(self, name, self._make_opt_unassigned)) data = meth(code, option) data_len = len(data.pack()) options_list.append(data) total_length += data_len # force alignment to 32-bit boundary if data_len % 4: pad_len = 4 - (data_len % 4) pad_opt = self._make_opt_nop(Enum_OptionNumber.NOP) # type: ignore[arg-type] total_length += pad_len for _ in range(pad_len - 1): options_list.append(pad_opt) options_list.append(Enum_OptionNumber.EOOL) # type: ignore[arg-type] return options_list, total_length
[docs] def _make_opt_unassigned(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_UnassignedOption]' = None, *, data: 'bytes', **kwargs: 'Any') -> 'Schema_Option': """Make IPv4 unassigned options. Args: kind: option type code option: option data data: option payload **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: data = option.data return Schema_UnassignedOption( type=kind, length=len(data), data=data, )
[docs] def _make_opt_eool(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_EOOLOption]' = None, **kwargs: 'Any') -> 'Schema_EOOLOption': """Make IPv4 End of Option List (``EOOL``) option. Args: kind: option type code option: option data **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ return Schema_EOOLOption( type=kind, length=1, )
[docs] def _make_opt_nop(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_NOPOption]' = None, **kwargs: 'Any') -> 'Schema_NOPOption': """Make IPv4 No Operation (``NOP``) option. Args: kind: option type code option: option data **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ return Schema_NOPOption( type=kind, length=1, )
[docs] def _make_opt_sec(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_SECOption]' = None, *, level: 'Enum_ClassificationLevel | StdlibEnum | AenumEnum | int | str' = Enum_ClassificationLevel.Unclassified, level_default: 'Optional[int]' = None, level_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long level_reversed: 'bool' = False, authorities: 'Optional[list[Enum_ProtectionAuthority]]' = None, **kwargs: 'Any') -> 'Schema_SECOption': """Make IPv4 Security (``SEC``) option. Args: kind: option type code option: option data sec: security option **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: level_val = option.level authorities = cast('list[Enum_ProtectionAuthority]', option.flags) else: level_val = self._make_index(level, level_default, namespace=level_namespace, # type: ignore[assignment] reversed=level_reversed, pack=False) authorities = [] if authorities is None else authorities if authorities: max_auth = max(authorities) int_len = math.ceil(max_auth / 8) data_list = [b'0' for _ in range(int_len * 8)] for auth in authorities: data_list[auth] = b'1' data = int(b''.join(data_list), base=2).to_bytes(int_len, 'big', signed=False) else: data = b'' return Schema_SECOption( type=kind, length=3 + len(data), level=level_val, data=data, )
[docs] def _make_opt_lsr(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_LSROption]' = None, *, counts: 'int' = 10, # reasonable default route: 'Optional[list[IPv4Address | str | bytes | int]]' = None, **kwargs: 'Any') -> 'Schema_LSROption': """Make IPv4 Loose Source and Record Route (``LSR``) option. Args: kind: option type code option: option data counts: maximum number of addresses to record route: list of IPv4 addresses as recorded routes **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: route = cast('list[IPv4Address | str | bytes | int]', option.route) pointer = option.pointer length = option.length else: route = [] if route is None else route length = 3 + counts * 4 pointer = 4 + min(len(route), counts) * 4 return Schema_LSROption( type=kind, length=length, pointer=pointer, route=route, )
[docs] def _make_opt_ts(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_TSOption]' = None, *, counts: 'int' = 5, overflow: 'int' = 0, timestamp: 'Optional[list[int | timedelta] | dict[IPv4Address, int | timedelta]]' = None, **kwargs: 'Any') -> 'Schema_TSOption': """Make IPv4 Timestamp (``TS``) option. Args: kind: option type code option: option data counts: maximum number of timestamps to record timestamp: list of timestamps **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: ts_list = [] # type: list[int] if isinstance(option.timestamp, tuple): for ts in option.timestamp: if not isinstance(ts, int): ts = math.floor(ts.total_seconds() * 1000) if ts.bit_length() > 31: warn(f'{self.alias}: [OptNo {kind}] timestamp value is too large: {ts}', ProtocolWarning) ts = ts | 0x80000000 ts_list.append(ts) else: for ip, ts in option.timestamp.items(True): ts_list.append(int(ip)) if not isinstance(ts, int): ts = math.floor(ts.total_seconds() * 1000) if ts.bit_length() > 31: warn(f'{self.alias}: [OptNo {kind}] timestamp value is too large: {ts}', ProtocolWarning) ts = ts | 0x80000000 ts_list.append(ts) length = option.length pointer = option.pointer overflow = option.overflow flag = option.flag else: ts_list = [] if isinstance(timestamp, list): flag = Enum_TSFlag.Timestamp_Only # type: ignore[assignment] counts = min(9, counts) # 9 is the maximum number of timestamps length = 4 + counts * 4 for index, ts in enumerate(timestamp): if index >= counts: warn(f'{self.alias}: [OptNo {kind}] too many timestamps: {len(timestamp)}', ProtocolWarning) break if not isinstance(ts, int): ts = math.floor(ts.total_seconds() * 1000) if ts.bit_length() > 31: warn(f'{self.alias}: [OptNo {kind}] timestamp value is too large: {ts}', ProtocolWarning) ts = ts | 0x80000000 ts_list.append(ts) elif isinstance(timestamp, dict): flag = Enum_TSFlag.IP_with_Timestamp # type: ignore[assignment] counts = min(4, counts) # 4 is the maximum number of timestamps length = 4 + counts * 8 for index, (ip, ts) in enumerate(timestamp.items()): if index >= counts: warn(f'{self.alias}: [OptNo {kind}] too many timestamps: {len(timestamp)}', ProtocolWarning) break ts_list.append(int(ip)) if not isinstance(ts, int): ts = math.floor(ts.total_seconds() * 1000) if ts == 0: flag = Enum_TSFlag.Prespecified_IP_with_Timestamp # type: ignore[assignment] if ts.bit_length() > 31: warn(f'{self.alias}: [OptNo {kind}] timestamp value is too large: {ts}', ProtocolWarning) ts = ts | 0x80000000 ts_list.append(ts) else: raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid timestamp value: {timestamp}') pointer = 5 + len(ts_list) * 4 return Schema_TSOption( type=kind, length=length, pointer=pointer, flags={ 'oflw': overflow, 'flag': flag, }, data=ts_list, )
[docs] def _make_opt_e_sec(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_ESECOption]' = None, *, format: 'int' = 0, info: 'Optional[bytes]' = None, **kwargs: 'Any') -> 'Schema_ESECOption': """Make IPv4 Extended Security (``E-SEC``) option. Args: kind: option type code option: option data format: additional security information format code info: additional security information **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: length = option.length format = option.format info = option.info else: length = (3 + len(info)) if info is not None else 3 return Schema_ESECOption( type=kind, length=length, format=format, info=info, )
[docs] def _make_opt_rr(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_RROption]' = None, *, counts: 'int' = 10, # reasonable default route: 'Optional[list[IPv4Address | str | bytes | int]]' = None, **kwargs: 'Any') -> 'Schema_RROption': """Make IPv4 Record Route (``RR``) option. Args: kind: option type code option: option data counts: maximum number of addresses to record route: list of IPv4 addresses as recorded routes **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: route = cast('list[IPv4Address | str | bytes | int]', option.route) pointer = option.pointer length = option.length else: route = [] if route is None else route length = 3 + counts * 4 pointer = 4 + min(len(route), counts) * 4 return Schema_RROption( type=kind, length=length, pointer=pointer, route=route, )
[docs] def _make_opt_sid(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_SIDOption]' = None, *, sid: 'int' = 0, **kwargs: 'Any') -> 'Schema_SIDOption': """Make IPv4 Stream ID (``SID``) option. Args: kind: option type code option: option data sid: stream ID **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: sid = option.sid return Schema_SIDOption( type=kind, length=4, sid=sid, )
[docs] def _make_opt_ssr(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_SSROption]' = None, *, counts: 'int' = 10, # reasonable default route: 'Optional[list[IPv4Address | str | bytes | int]]' = None, **kwargs: 'Any') -> 'Schema_SSROption': """Make IPv4 Strict Source Route (``SSR``) option. Args: kind: option type code option: option data counts: maximum number of addresses to record route: list of IPv4 addresses as recorded routes **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: route = cast('list[IPv4Address | str | bytes | int]', option.route) pointer = option.pointer length = option.length else: route = [] if route is None else route length = 3 + counts * 4 pointer = 4 + min(len(route), counts) * 4 return Schema_SSROption( type=kind, length=length, pointer=pointer, route=route, )
[docs] def _make_opt_mtup(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_MTUPOption]' = None, *, mtu: 'int' = 0, **kwargs: 'Any') -> 'Schema_MTUPOption': """Make IPv4 MTU Probe (``MTUP``) option. Args: kind: option type code option: option data mtu: MTU value **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: mtu = option.mtu return Schema_MTUPOption( type=kind, length=4, mtu=mtu, )
[docs] def _make_opt_mtur(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_MTUROption]' = None, *, mtu: 'int' = 0, **kwargs: 'Any') -> 'Schema_MTUROption': """Make IPv4 MTU Reply (``MTUR``) option. Args: kind: option type code option: option data mtu: MTU value **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: mtu = option.mtu return Schema_MTUROption( type=kind, length=4, mtu=mtu, )
[docs] def _make_opt_tr(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_TROption]' = None, *, id: 'int' = 0, out: 'int' = 0, ret: 'int' = 0, origin: 'IPv4Address | str | bytes | int' = '127.0.0.1', **kwargs: 'Any') -> 'Schema_TROption': """Make IPv4 Traceroute (``TR``) option. Args: kind: option type code option: option data id: ID number out: outbound hop count ret: return hop count origin: originator IP address **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: id = option.id out = option.outbound ret = option['return'] origin = option.originator return Schema_TROption( type=kind, length=12, id=id, out=out, ret=ret, origin=origin, )
[docs] def _make_opt_rtralt(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_RTRALTOption]' = None, *, alert: 'Enum_RouterAlert | StdlibEnum | AenumEnum | int | str' = Enum_RouterAlert.Aggregated_Reservation_Nesting_Level_0, alert_default: 'Optional[int]' = None, alert_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long alert_reversed: 'bool' = False, **kwargs: 'Any') -> 'Schema_RTRALTOption': """Make IPv4 Router Alert (``RTRALT``) option. Args: kind: option type code option: option data alert: router alert type alert_default: default value for router alert type alert_namespace: namespace for router alert type alert_reversed: whether router alert type is reversed **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: alert_val = option.alert else: alert_val = self._make_index(alert, alert_default, namespace=alert_namespace, # type: ignore[assignment] reversed=alert_reversed, pack=False) return Schema_RTRALTOption( type=kind, length=4, alert=alert_val, )
[docs] def _make_opt_qs(self, kind: 'Enum_OptionNumber', option: 'Optional[Data_QuickStartRequestOption | Data_QuickStartReportOption]' = None, *, func: 'Enum_QSFunction | StdlibEnum | AenumEnum | str | int' = Enum_QSFunction.Quick_Start_Request, func_default: 'Optional[int]' = None, func_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long func_reversed: 'bool' = False, rate: 'int' = 0, ttl: 'timedelta | int' = 0, nonce: 'int' = 0, **kwargs: 'Any') -> 'Schema_QSOption': """Make IPv4 Quick-Start (``QS``) option. Args: code: option type value opt: option data func: QS function type func_default: default value for QS function type func_namespace: namespace for QS function type func_reversed: reversed flag for QS function type rate: rate (in kbps) ttl: time to live (in seconds) nonce: nonce value **kwargs: arbitrary keyword arguments Returns: Constructured option schema. """ if option is not None: func_enum = option.func rate = option.rate ttl = getattr(option, 'ttl', 0) nonce = option.nonce else: func_enum = self._make_index(func, func_default, namespace=func_namespace, # type: ignore[assignment] reversed=func_reversed, pack=False) rate_val = math.floor(math.log2(rate * 1000 / 40000)) if rate > 0 else 0 if func_enum == Enum_QSFunction.Quick_Start_Request: ttl_value = ttl if isinstance(ttl, int) else math.floor(ttl.total_seconds()) return Schema_QuickStartRequestOption( type=kind, length=8, flags={ 'func': func_enum, 'rate': rate_val, }, ttl=ttl_value, nonce={ 'nonce': nonce, }, ) if func_enum == Enum_QSFunction.Report_of_Approved_Rate: return Schema_QuickStartReportOption( type=kind, length=8, flags={ 'func': func_enum, 'rate': rate_val, }, nonce={ 'nonce': nonce, }, ) raise ProtocolError(f'{self.alias}: [OptNo {kind}] invalid QS function: {func_enum}')