Source code for pcapkit.protocols.transport.tcp

# -*- coding: utf-8 -*-
# mypy: disable-error-code=dict-item
"""TCP - Transmission Control Protocol
=========================================

.. module:: pcapkit.protocols.transport.tcp

:mod:`pcapkit.protocols.transport.tcp` contains
:class:`~pcapkit.protocols.transport.tcp.TCP` only,
which implements an extractor for the Transmission Control
Protocol (TCP) [*]_, whose structure is described
below:

======= ========= ========================= =======================================
Octets      Bits        Name                    Description
======= ========= ========================= =======================================
  0           0   ``tcp.srcport``           Source Port
  2          16   ``tcp.dstport``           Destination Port
  4          32   ``tcp.seq``               Sequence Number
  8          64   ``tcp.ack``               Acknowledgement Number (if ACK set)
  12         96   ``tcp.hdr_len``           Data Offset
  12        100                             Reserved (must be ``\\x00``)
  12        103   ``tcp.flags.ns``          ECN Concealment Protection (NS)
  13        104   ``tcp.flags.cwr``         Congestion Window Reduced (CWR)
  13        105   ``tcp.flags.ece``         ECN-Echo (ECE)
  13        106   ``tcp.flags.urg``         Urgent (URG)
  13        107   ``tcp.flags.ack``         Acknowledgement (ACK)
  13        108   ``tcp.flags.psh``         Push Function (PSH)
  13        109   ``tcp.flags.rst``         Reset Connection (RST)
  13        110   ``tcp.flags.syn``         Synchronize Sequence Numbers (SYN)
  13        111   ``tcp.flags.fin``         Last Packet from Sender (FIN)
  14        112   ``tcp.window_size``       Size of Receive Window
  16        128   ``tcp.checksum``          Checksum
  18        144   ``tcp.urgent_pointer``    Urgent Pointer (if URG set)
  20        160   ``tcp.opt``               TCP Options (if data offset > 5)
======= ========= ========================= =======================================

.. [*] https://en.wikipedia.org/wiki/Transmission_Control_Protocol

"""
import collections
import datetime
import ipaddress
import math
from typing import TYPE_CHECKING, cast

from pcapkit.const.reg.transtype import TransType
from pcapkit.const.tcp.checksum import Checksum as Enum_Checksum
from pcapkit.const.tcp.flags import Flags as Enum_Flags
from pcapkit.const.tcp.mp_tcp_option import MPTCPOption as Enum_MPTCPOption
from pcapkit.const.tcp.option import Option as Enum_Option
from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.corekit.multidict import OrderedMultiDict
from pcapkit.protocols.data.transport.tcp import CC as Data_CC
from pcapkit.protocols.data.transport.tcp import MPTCPDSS as Data_MPTCPDSS
from pcapkit.protocols.data.transport.tcp import SACK as Data_SACK
from pcapkit.protocols.data.transport.tcp import TCP as Data_TCP
from pcapkit.protocols.data.transport.tcp import AlternateChecksumData as Data_AlternateChecksumData
from pcapkit.protocols.data.transport.tcp import \
    AlternateChecksumRequest as Data_AlternateChecksumRequest
from pcapkit.protocols.data.transport.tcp import Authentication as Data_Authentication
from pcapkit.protocols.data.transport.tcp import CCEcho as Data_CCEcho
from pcapkit.protocols.data.transport.tcp import CCNew as Data_CCNew
from pcapkit.protocols.data.transport.tcp import Echo as Data_Echo
from pcapkit.protocols.data.transport.tcp import EchoReply as Data_EchoReply
from pcapkit.protocols.data.transport.tcp import EndOfOptionList as Data_EndOfOptionList
from pcapkit.protocols.data.transport.tcp import FastOpenCookie as Data_FastOpenCookie
from pcapkit.protocols.data.transport.tcp import Flags as Data_Flags
from pcapkit.protocols.data.transport.tcp import MaximumSegmentSize as Data_MaximumSegmentSize
from pcapkit.protocols.data.transport.tcp import MD5Signature as Data_MD5Signature
from pcapkit.protocols.data.transport.tcp import MPTCPAddAddress as Data_MPTCPAddAddress
from pcapkit.protocols.data.transport.tcp import MPTCPCapable as Data_MPTCPCapable
from pcapkit.protocols.data.transport.tcp import MPTCPCapableFlag as Data_MPTCPCapableFlag
from pcapkit.protocols.data.transport.tcp import MPTCPFallback as Data_MPTCPFallback
from pcapkit.protocols.data.transport.tcp import MPTCPFastclose as Data_MPTCPFastclose
from pcapkit.protocols.data.transport.tcp import MPTCPJoinACK as Data_MPTCPJoinACK
from pcapkit.protocols.data.transport.tcp import MPTCPJoinSYN as Data_MPTCPJoinSYN
from pcapkit.protocols.data.transport.tcp import MPTCPJoinSYNACK as Data_MPTCPJoinSYNACK
from pcapkit.protocols.data.transport.tcp import MPTCPPriority as Data_MPTCPPriority
from pcapkit.protocols.data.transport.tcp import MPTCPRemoveAddress as Data_MPTCPRemoveAddress
from pcapkit.protocols.data.transport.tcp import MPTCPUnknown as Data_MPTCPUnknown
from pcapkit.protocols.data.transport.tcp import NoOperation as Data_NoOperation
from pcapkit.protocols.data.transport.tcp import \
    PartialOrderConnectionPermitted as Data_PartialOrderConnectionPermitted
from pcapkit.protocols.data.transport.tcp import \
    PartialOrderServiceProfile as Data_PartialOrderServiceProfile
from pcapkit.protocols.data.transport.tcp import QuickStartResponse as Data_QuickStartResponse
from pcapkit.protocols.data.transport.tcp import SACKBlock as Data_SACKBlock
from pcapkit.protocols.data.transport.tcp import SACKPermitted as Data_SACKPermitted
from pcapkit.protocols.data.transport.tcp import Timestamps as Data_Timestamps
from pcapkit.protocols.data.transport.tcp import UnassignedOption as Data_UnassignedOption
from pcapkit.protocols.data.transport.tcp import UserTimeout as Data_UserTimeout
from pcapkit.protocols.data.transport.tcp import WindowScale as Data_WindowScale
from pcapkit.protocols.schema.transport.tcp import CC as Schema_CC
from pcapkit.protocols.schema.transport.tcp import MPTCPDSS as Schema_MPTCPDSS
from pcapkit.protocols.schema.transport.tcp import SACK as Schema_SACK
from pcapkit.protocols.schema.transport.tcp import TCP as Schema_TCP
from pcapkit.protocols.schema.transport.tcp import \
    AlternateChecksumData as Schema_AlternateChecksumData
from pcapkit.protocols.schema.transport.tcp import \
    AlternateChecksumRequest as Schema_AlternateChecksumRequest
from pcapkit.protocols.schema.transport.tcp import Authentication as Schema_Authentication
from pcapkit.protocols.schema.transport.tcp import CCEcho as Schema_CCEcho
from pcapkit.protocols.schema.transport.tcp import CCNew as Schema_CCNew
from pcapkit.protocols.schema.transport.tcp import Echo as Schema_Echo
from pcapkit.protocols.schema.transport.tcp import EchoReply as Schema_EchoReply
from pcapkit.protocols.schema.transport.tcp import EndOfOptionList as Schema_EndOfOptionList
from pcapkit.protocols.schema.transport.tcp import FastOpenCookie as Schema_FastOpenCookie
from pcapkit.protocols.schema.transport.tcp import MaximumSegmentSize as Schema_MaximumSegmentSize
from pcapkit.protocols.schema.transport.tcp import MD5Signature as Schema_MD5Signature
from pcapkit.protocols.schema.transport.tcp import MPTCPAddAddress as Schema_MPTCPAddAddress
from pcapkit.protocols.schema.transport.tcp import MPTCPCapable as Schema_MPTCPCapable
from pcapkit.protocols.schema.transport.tcp import MPTCPFallback as Schema_MPTCPFallback
from pcapkit.protocols.schema.transport.tcp import MPTCPFastclose as Schema_MPTCPFastclose
from pcapkit.protocols.schema.transport.tcp import MPTCPJoinACK as Schema_MPTCPJoinACK
from pcapkit.protocols.schema.transport.tcp import MPTCPJoinSYN as Schema_MPTCPJoinSYN
from pcapkit.protocols.schema.transport.tcp import MPTCPJoinSYNACK as Schema_MPTCPJoinSYNACK
from pcapkit.protocols.schema.transport.tcp import MPTCPPriority as Schema_MPTCPPriority
from pcapkit.protocols.schema.transport.tcp import MPTCPRemoveAddress as Schema_MPTCPRemoveAddress
from pcapkit.protocols.schema.transport.tcp import MPTCPUnknown as Schema_MPTCPUnknown
from pcapkit.protocols.schema.transport.tcp import NoOperation as Schema_NoOperation
from pcapkit.protocols.schema.transport.tcp import \
    PartialOrderConnectionPermitted as Schema_PartialOrderConnectionPermitted
from pcapkit.protocols.schema.transport.tcp import \
    PartialOrderServiceProfile as Schema_PartialOrderServiceProfile
from pcapkit.protocols.schema.transport.tcp import QuickStartResponse as Schema_QuickStartResponse
from pcapkit.protocols.schema.transport.tcp import SACKBlock as Schema_SACKBlock
from pcapkit.protocols.schema.transport.tcp import SACKPermitted as Schema_SACKPermitted
from pcapkit.protocols.schema.transport.tcp import Timestamps as Schema_Timestamps
from pcapkit.protocols.schema.transport.tcp import UnassignedOption as Schema_UnassignedOption
from pcapkit.protocols.schema.transport.tcp import UserTimeout as Schema_UserTimeout
from pcapkit.protocols.schema.transport.tcp import WindowScale as Schema_WindowScale
from pcapkit.protocols.transport.transport import Transport
from pcapkit.utilities.exceptions import ProtocolError
from pcapkit.utilities.warnings import RegistryWarning, warn

if TYPE_CHECKING:
    from datetime import timedelta
    from enum import IntEnum as StdlibEnum
    from ipaddress import IPv4Address, IPv6Address
    from typing import Any, Callable, DefaultDict, Optional, Type

    from aenum import IntEnum as AenumEnum
    from mypy_extensions import DefaultArg, KwArg, NamedArg
    from typing_extensions import Literal

    from pcapkit.const.reg.apptype import AppType as Enum_AppType
    from pcapkit.protocols.data.transport.tcp import MPTCP as Data_MPTCP
    from pcapkit.protocols.data.transport.tcp import MPTCPJoin as Data_MPTCPJoin
    from pcapkit.protocols.data.transport.tcp import Option as Data_Option
    from pcapkit.protocols.protocol import ProtocolBase as Protocol
    from pcapkit.protocols.schema.schema import Schema
    from pcapkit.protocols.schema.transport.tcp import MPTCP as Schema_MPTCP
    from pcapkit.protocols.schema.transport.tcp import Flags as Schema_Flags
    from pcapkit.protocols.schema.transport.tcp import MPTCPJoin as Schema_MPTCPJoin
    from pcapkit.protocols.schema.transport.tcp import Option as Schema_Option

    Option = OrderedMultiDict[Enum_Option, Data_Option]
    OptionParser = Callable[[Schema_Option, NamedArg(Option, 'options')], Data_Option]
    MPOptionParser = Callable[[Schema_MPTCP, NamedArg(Option, 'options')], Data_MPTCP]
    OptionConstructor = Callable[[Enum_Option, DefaultArg(Optional[Data_Option]),
                                  KwArg(Any)], Schema_Option]
    MPOptionConstructor = Callable[[Enum_MPTCPOption, DefaultArg(Optional[Data_MPTCP]),
                                    KwArg(Any)], Schema_MPTCP]

__all__ = ['TCP']


[docs] class TCP(Transport[Data_TCP, Schema_TCP], schema=Schema_TCP, data=Data_TCP): """This class implements Transmission Control Protocol. This class currently supports parsing of the following protocols, which are registered in the :attr:`self.__proto__ <pcapkit.protocols.transport.tcp.TCP.__proto__>` attribute: .. list-table:: :header-rows: 1 * - Port Number - Protocol * - 21 - :class:`pcapkit.protocols.application.ftp.FTP` * - 80 - :class:`pcapkit.protocols.application.http.HTTP` This class currently supports parsing of the following TCP options, which are directly mapped to the :class:`pcapkit.const.tcp.option.Option` enumeration: .. list-table:: :header-rows: 1 * - Option Code - Option Parser - Option Constructor * - :attr:`~pcapkit.const.tcp.option.Option.End_of_Option_List` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_eool` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_eool` * - :attr:`~pcapkit.const.tcp.option.Option.No_Operation` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_nop` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_nop` * - :attr:`~pcapkit.const.tcp.option.Option.Maximum_Segment_Size` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_mss` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_mss` * - :attr:`~pcapkit.const.tcp.option.Option.Window_Scale` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_ws` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_ws` * - :attr:`~pcapkit.const.tcp.option.Option.SACK_Permitted` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_sackpmt` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_sackpmt` * - :attr:`~pcapkit.const.tcp.option.Option.SACK` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_sack` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_sack` * - :attr:`~pcapkit.const.tcp.option.Option.Echo` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_echo` - 
:meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_echo` * - :attr:`~pcapkit.const.tcp.option.Option.Echo_Reply` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_echore` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_echore` * - :attr:`~pcapkit.const.tcp.option.Option.Timestamps` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_ts` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_ts` * - :attr:`~pcapkit.const.tcp.option.Option.Partial_Order_Connection_Permitted` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_poc` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_poc` * - :attr:`~pcapkit.const.tcp.option.Option.Partial_Order_Service_Profile` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_pocsp` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_pocsp` * - :attr:`~pcapkit.const.tcp.option.Option.CC` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_cc` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_cc` * - :attr:`~pcapkit.const.tcp.option.Option.CC_NEW` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_ccnew` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_ccnew` * - :attr:`~pcapkit.const.tcp.option.Option.CC_ECHO` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_ccecho` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_ccecho` * - :attr:`~pcapkit.const.tcp.option.Option.TCP_Alternate_Checksum_Request` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_chkreq` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_chkreq` * - :attr:`~pcapkit.const.tcp.option.Option.TCP_Alternate_Checksum_Data` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_chksum` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_chksum` * - :attr:`~pcapkit.const.tcp.option.Option.MD5_Signature_Option` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_sig` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_sig` * - 
:attr:`~pcapkit.const.tcp.option.Option.Quick_Start_Response` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_qs` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_qs` * - :attr:`~pcapkit.const.tcp.option.Option.User_Timeout_Option` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_timeout` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_timeout` * - :attr:`~pcapkit.const.tcp.option.Option.TCP_Authentication_Option` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_ao` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_ao` * - :attr:`~pcapkit.const.tcp.option.Option.Multipath_TCP` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_mp` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_mp` * - :attr:`~pcapkit.const.tcp.option.Option.TCP_Fast_Open_Cookie` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mode_fastopen` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mode_fastopen` This class currently supports parsing of the following Multipath TCP options, which are directly mapped to the :class:`pcapkit.const.tcp.mp_tcp_option.MPTCPOption` enumeration: .. 
list-table:: :header-rows: 1 * - Option Code - Option Parser - Option Constructor * - :attr:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption.MP_CAPABLE` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mptcp_capable` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mptcp_capable` * - :attr:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption.MP_JOIN` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mptcp_join` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mptcp_join` * - :attr:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption.DSS` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mptcp_dss` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mptcp_dss` * - :attr:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption.ADD_ADDR` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mptcp_addaddr` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mptcp_addaddr` * - :attr:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption.REMOVE_ADDR` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mptcp_remove` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mptcp_remove` * - :attr:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption.MP_PRIO` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mptcp_prio` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mptcp_prio` * - :attr:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption.MP_FAIL` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mptcp_fail` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mptcp_fail` * - :attr:`~pcapkit.const.tcp.mp_tcp_option.MPTCPOption.MP_FASTCLOSE` - :meth:`~pcapkit.protocols.transport.tcp.TCP._read_mptcp_fastclose` - :meth:`~pcapkit.protocols.transport.tcp.TCP._make_mptcp_fastclose` """ ########################################################################## # Defaults. ########################################################################## #: DefaultDict[int, ModuleDescriptor[Protocol] | Type[Protocol]]: Protocol #: index mapping for decoding next layer, c.f. 
#: :meth:`self._decode_next_layer <pcapkit.protocols.transport.transport.Transport._decode_next_layer>` #: & :meth:`self._import_next_layer <pcapkit.protocols.protocol.Protocol._import_next_layer>`. __proto__ = collections.defaultdict( lambda: ModuleDescriptor('pcapkit.protocols.misc.raw', 'Raw'), { 21: ModuleDescriptor('pcapkit.protocols.application.ftp', 'FTP'), # FTP 80: ModuleDescriptor('pcapkit.protocols.application.httpv1', 'HTTP'), # HTTP/1.* }, ) #: DefaultDict[Enum_Option, str | tuple[OptionParser, OptionConstructor]]: Option #: code to method mapping, c.f. :meth:`_read_tcp_options` and #: :meth:`_make_tcp_options`. Method names are expected to be referred to #: the class by ``_read_mode_${name}`` and ``_make_mode_${name}``, and if #: such name not found, the value should then be a method that can parse #: the option by itself. __option__ = collections.defaultdict( lambda: 'donone', { Enum_Option.End_of_Option_List: 'eool', # [RFC 793] End of Option List Enum_Option.No_Operation: 'nop', # [RFC 793] No-Operation Enum_Option.Maximum_Segment_Size: 'mss', # [RFC 793] Maximum Segment Size Enum_Option.Window_Scale: 'ws', # [RFC 7323] Window Scale Enum_Option.SACK_Permitted: 'sackpmt', # [RFC 2018] SACK Permitted Enum_Option.SACK: 'sack', # [RFC 2018] SACK Enum_Option.Echo: 'echo', # [RFC 1072] Echo Enum_Option.Echo_Reply: 'echore', # [RFC 1072] Echo Reply Enum_Option.Timestamps: 'ts', # [RFC 7323] Timestamps Enum_Option.Partial_Order_Connection_Permitted: 'poc', # [RFC 1693] POC Permitted Enum_Option.Partial_Order_Service_Profile: 'pocsp', # [RFC 1693] POC-Serv Profile Enum_Option.CC: 'cc', # [RFC 1644] Connection Count Enum_Option.CC_NEW: 'ccnew', # [RFC 1644] CC.NEW Enum_Option.CC_ECHO: 'ccecho', # [RFC 1644] CC.ECHO Enum_Option.TCP_Alternate_Checksum_Request: 'chkreq', # [RFC 1146] Alt-Chksum Request Enum_Option.TCP_Alternate_Checksum_Data: 'chksum', # [RFC 1146] Alt-Chksum Data Enum_Option.MD5_Signature_Option: 'sig', # [RFC 2385] MD5 Signature Option 
Enum_Option.Quick_Start_Response: 'qs', # [RFC 4782] Quick-Start Response Enum_Option.User_Timeout_Option: 'timeout', # [RFC 5482] User Timeout Option Enum_Option.TCP_Authentication_Option: 'ao', # [RFC 5925] TCP Authentication Option Enum_Option.Multipath_TCP: 'mp', # [RFC 6824] Multipath TCP Enum_Option.TCP_Fast_Open_Cookie: 'fastopen', # [RFC 7413] Fast Open }, ) # type: DefaultDict[int, str | tuple[OptionParser, OptionConstructor]] #: DefaultDict[Enum_MPTCPOption, str | tuple[MPOptionParser, MPOptionConstructor]]: Option #: code to method mapping, c.f. :meth:`_read_mode_mp` and :meth:`_make_mode_mp`. #: Method names are expected to be referred to the class by ``_read_mptcp_${name}`` #: and ``_make_mptcp_${name}``, and if such name not found, the value should #: then be a method that can parse the option by itself. __mp_option__ = collections.defaultdict( lambda: 'unknown', { Enum_MPTCPOption.MP_CAPABLE: 'capable', Enum_MPTCPOption.MP_JOIN: 'join', Enum_MPTCPOption.DSS: 'dss', Enum_MPTCPOption.ADD_ADDR: 'addaddr', Enum_MPTCPOption.REMOVE_ADDR: 'removeaddr', Enum_MPTCPOption.MP_PRIO: 'prio', Enum_MPTCPOption.MP_FAIL: 'fail', Enum_MPTCPOption.MP_FASTCLOSE: 'fastclose', }, ) # type: DefaultDict[int, str | tuple[MPOptionParser, MPOptionConstructor]] ########################################################################## # Properties. 
##########################################################################

@property
def name(self) -> 'Literal["Transmission Control Protocol"]':
    """Full name of the protocol handled by this class."""
    return 'Transmission Control Protocol'

@property
def length(self) -> 'int':
    """Header length of the current packet (``self._info.hdr_len``)."""
    return self._info.hdr_len

@property
def src(self) -> 'Enum_AppType':
    """Source port of the current packet (``self._info.srcport``)."""
    return self._info.srcport

@property
def dst(self) -> 'Enum_AppType':
    """Destination port of the current packet (``self._info.dstport``)."""
    return self._info.dstport

@property
def connection(self) -> 'Enum_Flags':
    """Connection control flags most recently stored in ``self._flags``."""
    return self._flags

##########################################################################
# Methods.
##########################################################################
def read(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'Data_TCP':  # pylint: disable=unused-argument
    """Read Transmission Control Protocol (TCP).

    Structure of TCP header [:rfc:`793`]::

        0                   1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |          Source Port          |       Destination Port        |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |                        Sequence Number                        |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |                    Acknowledgement Number                     |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |  Data |           |U|A|P|R|S|F|                               |
       | Offset| Reserved  |R|C|S|S|Y|I|            Window             |
       |       |           |G|K|H|T|N|N|                               |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |           Checksum            |         Urgent Pointer        |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |                    Options                    |    Padding    |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |                             data                              |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

    The combined connection flags are also stored on ``self._flags`` and
    exposed in the result under the ``connection`` key. An ``options`` entry
    is added only when the header is not exactly 20 octets long.

    Args:
        length: Length of packet data; defaults to ``len(self)``.
        **kwargs: Arbitrary keyword arguments.

    Returns:
        Parsed packet data, as returned by ``self._decode_next_layer``.

    """
    if length is None:
        length = len(self)
    schema = self.__header__

    flag_names = ('cwr', 'ece', 'urg', 'ack', 'psh', 'rst', 'syn', 'fin')
    tcp = Data_TCP(
        srcport=schema.srcport,
        dstport=schema.dstport,
        seq=schema.seq,
        ack=schema.ack,
        # data offset is expressed in 32-bit words
        hdr_len=schema.offset['offset'] * 4,
        flags=Data_Flags(
            #ns=bool(schema.offset['ns']),
            **{bit: bool(schema.flags[bit]) for bit in flag_names}
        ),
        window_size=schema.window,
        checksum=schema.checksum,
        urgent_pointer=schema.urgent,
    )

    # connection control flags
    combined = cast('Enum_Flags', 0)
    for bit_name, bit_value in schema.flags.items():
        if bit_value != 1:
            continue
        combined |= Enum_Flags.get(bit_name.upper())
    self._flags = combined

    tcp.__update__({
        'connection': self._flags,
    })

    # anything beyond the fixed 20-octet header is the option list
    option_size = tcp.hdr_len - 20
    if option_size:
        tcp.__update__({
            'options': self._read_tcp_options(option_size),
        })

    ports = (tcp.srcport.port, tcp.dstport.port)
    return self._decode_next_layer(tcp, ports, length - tcp.hdr_len)
def make(self,
         srcport: 'Enum_AppType | int' = 0,
         dstport: 'Enum_AppType | int' = 0,
         seq_no: 'int' = 0,
         ack_no: 'int' = 0,
         ns: 'bool' = False,
         cwr: 'bool' = False,
         ece: 'bool' = False,
         urg: 'bool' = False,
         ack: 'bool' = False,
         psh: 'bool' = False,
         rst: 'bool' = False,
         syn: 'bool' = False,
         fin: 'bool' = False,
         window: 'int' = 65535,  # reasonable default value
         checksum: 'bytes' = b'\x00\x00',
         urgent: 'int' = 0,
         options: 'Optional[list[Schema_Option | tuple[Enum_Option, dict[str, Any]] | bytes] | Option]' = None,  # pylint: disable=line-too-long
         payload: 'bytes | Protocol | Schema' = b'',
         **kwargs: 'Any') -> 'Schema_TCP':
    """Make (construct) packet data.

    The combined connection flags built from the boolean arguments are also
    stored on ``self._flags``.

    Args:
        srcport: Source port.
        dstport: Destination port.
        seq_no: Sequence number.
        ack_no: Acknowledgement number.
        ns: ECN-nonce concealment protection.
        cwr: Congestion window reduced.
        ece: ECN-Echo.
        urg: Urgent.
        ack: Acknowledgement.
        psh: Push function.
        rst: Reset connection.
        syn: Synchronize sequence numbers.
        fin: Last packet from sender.
        window: Window size.
        checksum: Checksum.
        urgent: Urgent pointer.
        options: TCP options.
        payload: Payload of the packet.
        **kwargs: Arbitrary keyword arguments.

    Returns:
        Constructed packet data.

    """
    if options is None:
        options_value, total_length = [], 0
    else:
        options_value, total_length = self._make_tcp_options(options)

    # data offset counts 32-bit words: fixed 20 octets plus options, rounded up
    offset = math.ceil((20 + total_length) / 4)

    flags = {
        'cwr': int(cwr),
        'ece': int(ece),
        'urg': int(urg),
        'ack': int(ack),
        'psh': int(psh),
        'rst': int(rst),
        'syn': int(syn),
        'fin': int(fin),
    }  # type: Schema_Flags

    # connection control flags
    combined = cast('Enum_Flags', 0)
    for bit_name, bit_value in flags.items():
        if bit_value != 1:
            continue
        combined |= Enum_Flags.get(bit_name.upper())
    self._flags = combined

    offset_field = {
        'offset': offset,
        'ns': int(ns),
    }
    return Schema_TCP(
        srcport=srcport,
        dstport=dstport,
        seq=seq_no,
        ack=ack_no,
        offset=offset_field,
        flags=flags,
        window=window,
        checksum=checksum,
        urgent=urgent,
        options=options_value,
        payload=payload,
    )
@classmethod
def register_option(cls, code: 'Enum_Option',
                    meth: 'str | tuple[OptionParser, OptionConstructor]') -> 'None':
    """Register an option parser.

    The entry is stored in ``cls.__option__``. A
    :class:`RegistryWarning` is issued when ``code`` already has an entry,
    and the existing entry is then overwritten.

    Args:
        code: TCP option code.
        meth: Method name or callable to parse and/or construct the option.

    """
    registry = cls.__option__
    if code in registry:
        warn(f'option {code} already registered, overwriting', RegistryWarning)
    registry[code] = meth
@classmethod
def register_mp_option(cls, code: 'Enum_MPTCPOption',
                       meth: 'str | tuple[MPOptionParser, MPOptionConstructor]') -> 'None':
    """Register an MPTCP option parser.

    The entry is stored in ``cls.__mp_option__``. A
    :class:`RegistryWarning` is issued when ``code`` already has an entry,
    and the existing entry is then overwritten.

    Args:
        code: MPTCP option code.
        meth: Method name or callable to parse and/or construct the option.

    """
    registry = cls.__mp_option__
    if code in registry:
        warn(f'option {code} already registered, overwriting', RegistryWarning)
    registry[code] = meth
########################################################################## # Data models. ########################################################################## def __length_hint__(self) -> 'Literal[20]': """Return an estimated length for the object.""" return 20
@classmethod
def __index__(cls) -> 'TransType':  # pylint: disable=invalid-index-returned
    """Numeral registry index of the protocol.

    Returns:
        Numeral registry index of the protocol in `IANA`_, i.e.
        ``TransType.TCP``.

    .. _IANA: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml

    """
    return TransType.TCP  # type: ignore[return-value]
########################################################################## # Utilities. ##########################################################################
@classmethod
def _make_data(cls, data: 'Data_TCP') -> 'dict[str, Any]':  # type: ignore[override]
    """Create key-value pairs from ``data`` for protocol construction.

    The keys match the keyword arguments of :meth:`make`. ``options`` is
    ``None`` when ``data`` carries no ``options`` attribute.

    Args:
        data: protocol data

    Returns:
        Key-value pairs for protocol construction.

    """
    flags = data.flags
    mapping = {
        'srcport': data.srcport,
        'dstport': data.dstport,
        'seq_no': data.seq,
        'ack_no': data.ack,
        #'ns': data.flags.ns,
    }  # type: dict[str, Any]
    for bit in ('cwr', 'ece', 'urg', 'ack', 'psh', 'rst', 'syn', 'fin'):
        mapping[bit] = getattr(flags, bit)
    mapping.update({
        'window': data.window_size,
        'checksum': data.checksum,
        'urgent': data.urgent_pointer,
        'options': getattr(data, 'options', None),
        'payload': cls._make_payload(data),
    })
    return mapping
def _read_tcp_options(self, size: 'int') -> 'Option':
    """Read TCP option list.

    Each option schema in ``self.__header__.options`` is dispatched through
    ``self.__option__``: a string entry names a ``_read_mode_${name}`` method
    (falling back to :meth:`_read_mode_donone`), otherwise the first item of
    the entry is used as the parser. Reading stops after an End of Option
    List option.

    Arguments:
        size: length of option list

    Returns:
        Extracted TCP options.

    Raises:
        ProtocolError: If the threshold is **NOT** matching, i.e. the
            options read are longer than ``size``.

    """
    options = OrderedMultiDict()  # type: Option
    consumed = 0  # length of read option list

    for schema in self.__header__.options:
        kind = schema.kind
        handler = self.__option__[kind]

        if isinstance(handler, str):
            parser = cast('OptionParser',
                          getattr(self, f'_read_mode_{handler}', self._read_mode_donone))
        else:
            parser = handler[0]

        # record option data
        options.add(kind, parser(schema, options=options))
        consumed += len(schema)

        # break when End of Option List (EOOL) triggered
        if kind == Enum_Option.End_of_Option_List:
            break

    # check threshold
    # NOTE(review): assumed to run once after the loop -- confirm against upstream layout
    if consumed > size:
        raise ProtocolError('TCP: invalid format')

    return options
def _read_mode_donone(self, schema: 'Schema_UnassignedOption', *,
                      options: 'Option') -> 'Data_UnassignedOption':  # pylint: disable=unused-argument
    """Read options request no process.

    The option's kind, length and raw data are copied over unchanged.

    Arguments:
        schema: parsed option schema
        options: extracted TCP options

    Returns:
        Parsed option data.

    """
    return Data_UnassignedOption(
        kind=schema.kind,
        length=schema.length,
        data=schema.data,
    )
def _read_mode_eool(self, schema: 'Schema_EndOfOptionList', *,
                    options: 'Option') -> 'Data_EndOfOptionList':  # pylint: disable=unused-argument
    """Read TCP End of Option List option.

    Structure of TCP end of option list option [:rfc:`793`]:

    .. code-block:: text

       +--------+
       |00000000|
       +--------+
         Kind=0

    Arguments:
        schema: parsed option schema
        options: extracted TCP options

    Returns:
        Parsed option data; its length is always reported as ``1``.

    """
    data = Data_EndOfOptionList(
        kind=schema.kind,
        length=1,
    )
    return data
def _read_mode_nop(self, schema: 'Schema_NoOperation', *,
                   options: 'Option') -> 'Data_NoOperation':  # pylint: disable=unused-argument
    """Read TCP No Operation option.

    Structure of TCP no operation option [:rfc:`793`]:

    .. code-block:: text

       +--------+
       |00000001|
       +--------+
         Kind=1

    Arguments:
        schema: parsed option schema
        options: extracted TCP options

    Returns:
        Parsed option data; its length is always reported as ``1``.

    """
    data = Data_NoOperation(
        kind=schema.kind,
        length=1,
    )
    return data
def _read_mode_mss(self, schema: 'Schema_MaximumSegmentSize', *,
                   options: 'Option') -> 'Data_MaximumSegmentSize':  # pylint: disable=unused-argument
    """Read TCP max segment size option.

    Structure of TCP maximum segment size option [:rfc:`793`]:

    .. code-block:: text

       +--------+--------+---------+--------+
       |00000010|00000100|   max seg size   |
       +--------+--------+---------+--------+
        Kind=2   Length=4

    Arguments:
        schema: parsed option schema
        options: extracted TCP options

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If length is **NOT** ``4``.

    """
    if schema.length != 4:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_MaximumSegmentSize(
        kind=schema.kind,
        length=schema.length,
        mss=schema.mss,
    )
def _read_mode_ws(self, schema: 'Schema_WindowScale', *,
                  options: 'Option') -> 'Data_WindowScale':  # pylint: disable=unused-argument
    """Read TCP windows scale option.

    Structure of TCP window scale option [:rfc:`7323`]:

    .. code-block:: text

       +---------+---------+---------+
       | Kind=3  |Length=3 |shift.cnt|
       +---------+---------+---------+
            1         1         1

    Arguments:
        schema: parsed option schema
        options: extracted TCP options

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If length is **NOT** ``3``.

    """
    if schema.length != 3:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_WindowScale(
        kind=schema.kind,
        length=schema.length,
        shift=schema.shift,
    )
def _read_mode_sackpmt(self, schema: 'Schema_SACKPermitted', *,
                       options: 'Option') -> 'Data_SACKPermitted':  # pylint: disable=unused-argument
    """Read TCP SACK permitted option.

    Structure of TCP SACK permitted option [:rfc:`2018`]:

    .. code-block:: text

       +---------+---------+
       | Kind=4  | Length=2|
       +---------+---------+

    Arguments:
        schema: parsed option schema
        options: extracted TCP options

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If length is **NOT** ``2``.

    """
    if schema.length != 2:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    data = Data_SACKPermitted(
        kind=schema.kind,
        length=schema.length,
    )
    return data
def _read_mode_sack(self, schema: 'Schema_SACK', *, options: 'Option') -> 'Data_SACK':  # pylint: disable=unused-argument
    """Read TCP SACK option.

    Structure of TCP SACK option [:rfc:`2018`]:

    .. code-block:: text

                         +--------+--------+
                         | Kind=5 | Length |
       +--------+--------+--------+--------+
       |      Left Edge of 1st Block       |
       +--------+--------+--------+--------+
       |      Right Edge of 1st Block      |
       +--------+--------+--------+--------+
       |                                   |
       /            . . .                  /
       |                                   |
       +--------+--------+--------+--------+
       |      Left Edge of nth Block       |
       +--------+--------+--------+--------+
       |      Right Edge of nth Block      |
       +--------+--------+--------+--------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If length is **NOT** a multiple of ``8`` plus ``2``.
    """
    # Two header bytes followed by whole 8-byte (left edge, right edge)
    # blocks; anything else cannot be a well-formed SACK option.
    if schema.length % 8 != 2:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_SACK(
        kind=schema.kind,
        length=schema.length,
        sack=tuple(
            Data_SACKBlock(
                left=block.left,
                right=block.right,
            ) for block in schema.sack
        ),
    )
def _read_mode_echo(self, schema: 'Schema_Echo', *, options: 'Option') -> 'Data_Echo':  # pylint: disable=unused-argument
    """Parse the TCP echo option.

    Layout of the option [:rfc:`1072`]:

    .. code-block:: text

       +--------+--------+--------+--------+--------+--------+
       | Kind=6 | Length |   4 bytes of info to be echoed    |
       +--------+--------+--------+--------+--------+--------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``6``.
    """
    length = schema.length
    if length != 6:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_Echo(
        kind=schema.kind,
        length=length,
        data=schema.data,
    )
def _read_mode_echore(self, schema: 'Schema_EchoReply', *, options: 'Option') -> 'Data_EchoReply':  # pylint: disable=unused-argument
    """Parse the TCP echo reply option.

    Layout of the option [:rfc:`1072`]:

    .. code-block:: text

       +--------+--------+--------+--------+--------+--------+
       | Kind=7 | Length |    4 bytes of echoed info         |
       +--------+--------+--------+--------+--------+--------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``6``.
    """
    if schema.length == 6:
        return Data_EchoReply(
            kind=schema.kind,
            length=schema.length,
            data=schema.data,
        )
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')
def _read_mode_ts(self, schema: 'Schema_Timestamps', *, options: 'Option') -> 'Data_Timestamps':  # pylint: disable=unused-argument
    """Parse the TCP timestamps option.

    Layout of the option [:rfc:`7323`]:

    .. code-block:: text

       +-------+-------+---------------------+---------------------+
       |Kind=8 |  10   |   TS Value (TSval)  |TS Echo Reply (TSecr)|
       +-------+-------+---------------------+---------------------+
           1       1              4                     4

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data; the schema's ``value`` becomes ``timestamp``
        and its ``reply`` becomes ``echo``.

    Raises:
        ProtocolError: If the option length is **NOT** ``10``.
    """
    length = schema.length
    if length != 10:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_Timestamps(
        kind=schema.kind,
        length=length,
        timestamp=schema.value,
        echo=schema.reply,
    )
def _read_mode_poc(self, schema: 'Schema_PartialOrderConnectionPermitted', *, options: 'Option') -> 'Data_PartialOrderConnectionPermitted':  # pylint: disable=unused-argument
    """Parse the TCP partial order connection permitted option.

    Layout of the ``POC-Permitted`` option [:rfc:`1693`][:rfc:`6247`]:

    .. code-block:: text

       +-----------+-------------+
       |  Kind=9   |  Length=2   |
       +-----------+-------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``2``.
    """
    length = schema.length
    if length != 2:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    data = Data_PartialOrderConnectionPermitted(
        kind=schema.kind,
        length=length,
    )
    return data
def _read_mode_pocsp(self, schema: 'Schema_PartialOrderServiceProfile', *, options: 'Option') -> 'Data_PartialOrderServiceProfile':  # pylint: disable=unused-argument
    """Parse the TCP partial order connection service profile option.

    Layout of the ``POC-SP`` option [:rfc:`1693`][:rfc:`6247`]:

    .. code-block:: text

                                 1 bit        1 bit    6 bits
       +----------+----------+------------+----------+--------+
       |  Kind=10 | Length=3 | Start_flag | End_flag | Filler |
       +----------+----------+------------+----------+--------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data, with the start and end flags as booleans.

    Raises:
        ProtocolError: If the option length is **NOT** ``3``.
    """
    if schema.length == 3:
        return Data_PartialOrderServiceProfile(
            kind=schema.kind,
            length=schema.length,
            start=bool(schema.profile['start']),
            end=bool(schema.profile['end']),
        )
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')
def _read_mode_cc(self, schema: 'Schema_CC', *, options: 'Option') -> 'Data_CC':  # pylint: disable=unused-argument
    """Parse the TCP connection count option.

    Layout of the ``CC`` option [:rfc:`1644`]:

    .. code-block:: text

       +--------+--------+--------+--------+--------+--------+
       |00001011|00000110|    Connection Count:  SEG.CC      |
       +--------+--------+--------+--------+--------+--------+
        Kind=11  Length=6

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``6``.
    """
    length = schema.length
    if length != 6:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_CC(
        kind=schema.kind,
        length=length,
        cc=schema.count,
    )
def _read_mode_ccnew(self, schema: 'Schema_CCNew', *, options: 'Option') -> 'Data_CCNew':  # pylint: disable=unused-argument
    """Parse the TCP connection count (new) option.

    Layout of the ``CC.NEW`` option [:rfc:`1644`]:

    .. code-block:: text

       +--------+--------+--------+--------+--------+--------+
       |00001100|00000110|    Connection Count:  SEG.CC      |
       +--------+--------+--------+--------+--------+--------+
        Kind=12  Length=6

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``6``.
    """
    if schema.length == 6:
        return Data_CCNew(
            kind=schema.kind,
            length=schema.length,
            cc=schema.count,
        )
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')
def _read_mode_ccecho(self, schema: 'Schema_CCEcho', *, options: 'Option') -> 'Data_CCEcho':  # pylint: disable=unused-argument
    """Parse the TCP connection count (echo) option.

    Layout of the ``CC.ECHO`` option [:rfc:`1644`]:

    .. code-block:: text

       +--------+--------+--------+--------+--------+--------+
       |00001101|00000110|    Connection Count:  SEG.CC      |
       +--------+--------+--------+--------+--------+--------+
        Kind=13  Length=6

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``6``.
    """
    length = schema.length
    if length != 6:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_CCEcho(
        kind=schema.kind,
        length=length,
        cc=schema.count,
    )
def _read_mode_chkreq(self, schema: 'Schema_AlternateChecksumRequest', *, options: 'Option') -> 'Data_AlternateChecksumRequest':  # pylint: disable=unused-argument
    """Parse the TCP alternate checksum request option.

    Layout of the ``CHKSUM-REQ`` option [:rfc:`1146`][:rfc:`6247`]:

    .. code-block:: text

       +----------+----------+----------+
       |  Kind=14 | Length=3 |  chksum  |
       +----------+----------+----------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data; the schema's ``algorithm`` is exposed as
        ``chksum``.

    Raises:
        ProtocolError: If the option length is **NOT** ``3``.
    """
    if schema.length == 3:
        return Data_AlternateChecksumRequest(
            kind=schema.kind,
            length=schema.length,
            chksum=schema.algorithm,
        )
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')
def _read_mode_chksum(self, schema: 'Schema_AlternateChecksumData', *, options: 'Option') -> 'Data_AlternateChecksumData':  # pylint: disable=unused-argument
    """Parse the TCP alternate checksum data option.

    Layout of the ``CHKSUM`` option [:rfc:`1146`][:rfc:`6247`]:

    .. code-block:: text

       +---------+---------+---------+     +---------+
       | Kind=15 |Length=N |  data   | ... |  data   |
       +---------+---------+---------+     +---------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data. No length validation is performed.
    """
    return Data_AlternateChecksumData(
        kind=schema.kind,
        length=schema.length,
        data=schema.data,
    )
def _read_mode_sig(self, schema: 'Schema_MD5Signature', *, options: 'Option') -> 'Data_MD5Signature':  # pylint: disable=unused-argument
    """Parse the TCP MD5 signature option.

    Layout of the ``SIG`` option [:rfc:`2385`]:

    .. code-block:: text

       +---------+---------+-------------------+
       | Kind=19 |Length=18|   MD5 digest...   |
       +---------+---------+-------------------+
       |                                       |
       +---------------------------------------+
       |                                       |
       +---------------------------------------+
       |                                       |
       +-------------------+-------------------+
       |                   |
       +-------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``18``.
    """
    length = schema.length
    if length != 18:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_MD5Signature(
        kind=schema.kind,
        length=length,
        digest=schema.digest,
    )
def _read_mode_qs(self, schema: 'Schema_QuickStartResponse', *, options: 'Option') -> 'Data_QuickStartResponse':  # pylint: disable=unused-argument
    """Read Quick-Start Response option.

    Structure of TCP ``QSopt`` [:rfc:`4782`]:

    .. code-block:: text

        0                   1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |     Kind      |  Length=8     | Resv. | Rate  |   TTL Diff    |
       |               |               |       |Request|               |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |                   QS Nonce                                | R |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data. ``req_rate`` is ``40000 * 2 ** rate / 1000``
        for a non-zero rate field and ``0`` otherwise.

    Raises:
        ProtocolError: If length is **NOT** ``8``.
    """
    # NOTE: the option has already been fully parsed into ``schema``; no
    # further bytes must be consumed from the underlying stream here.
    if schema.length != 8:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    rate = schema.flags['rate']
    return Data_QuickStartResponse(
        kind=schema.kind,
        length=schema.length,
        # presumably Kbps (40,000 * 2^N bps scaled by 1000) -- confirm
        # against the RFC 4782 rate encoding
        req_rate=40000 * (2 ** rate) / 1000 if rate > 0 else 0,
        ttl_diff=schema.diff,
        nonce=schema.nonce['nonce'],
    )
def _read_mode_timeout(self, schema: 'Schema_UserTimeout', *, options: 'Option') -> 'Data_UserTimeout':  # pylint: disable=unused-argument
    """Parse the TCP user timeout option.

    Layout of the ``TIMEOUT`` option [:rfc:`5482`]:

    .. code-block:: text

        0                   1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |   Kind = 28   |   Length = 4  |G|        User Timeout         |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data, with the timeout expressed as a
        :class:`datetime.timedelta`.

    Raises:
        ProtocolError: If the option length is **NOT** ``4``.
    """
    length = schema.length
    if length != 4:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    # granularity flag set -> the value counts minutes, otherwise seconds
    unit = 'minutes' if schema.info['granularity'] == 1 else 'seconds'
    timeout = datetime.timedelta(**{unit: schema.info['timeout']})

    return Data_UserTimeout(
        kind=schema.kind,
        length=length,
        timeout=timeout,
    )
def _read_mode_ao(self, schema: 'Schema_Authentication', *, options: 'Option') -> 'Data_Authentication':  # pylint: disable=unused-argument
    """Parse the TCP authentication option.

    Layout of the ``AOopt`` option [:rfc:`5925`]:

    .. code-block:: text

       +------------+------------+------------+------------+
       |  Kind=29   |   Length   |   KeyID    | RNextKeyID |
       +------------+------------+------------+------------+
       |                     MAC           ...
       +-----------------------------------...

       ...-----------------+
       ...  MAC (con't)    |
       ...-----------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is smaller than ``4``.
    """
    if schema.length >= 4:
        return Data_Authentication(
            kind=schema.kind,
            length=schema.length,
            key_id=schema.key_id,
            next_key_id=schema.next_key_id,
            mac=schema.mac,
        )
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')
[docs] def _read_mode_mp(self, schema: 'Schema_MPTCP', *, options: 'Option') -> 'Data_MPTCP': # pylint: disable=unused-argument """Read Multipath TCP option. Structure of ``MP-TCP`` [:rfc:`6824`]: .. code-block:: text 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +---------------+---------------+-------+-----------------------+ | Kind | Length |Subtype| | +---------------+---------------+-------+ | | Subtype-specific data | | (variable length) | +---------------------------------------------------------------+ Arguments: schema: parsed option schema options: extracted TCP options Returns: Parsed option data. """ subtype = schema.subtype name = self.__mp_option__[subtype] if isinstance(name, str): meth_name = f'_read_mptcp_{name}' meth = cast('MPOptionParser', getattr(self, meth_name, self._read_mptcp_unknown)) else: meth = name[0] data = meth(schema, options=options) return data
def _read_mptcp_unknown(self, schema: 'Schema_MPTCPUnknown', *, options: 'Option') -> 'Data_MPTCPUnknown':  # pylint: disable=unused-argument
    """Parse an MPTCP option whose subtype has no dedicated parser.

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data. The payload is the ``data`` bits of the test
        field encoded as one unsigned big-endian byte, followed by the
        remaining option bytes.
    """
    leading = schema.test['data'].to_bytes(1, 'big', signed=False)
    payload = leading + schema.data

    return Data_MPTCPUnknown(
        kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
        length=schema.length,
        subtype=schema.subtype,
        data=payload,
    )
def _read_mptcp_capable(self, schema: 'Schema_MPTCPCapable', *, options: 'Option') -> 'Data_MPTCPCapable':  # pylint: disable=unused-argument
    """Read Multipath Capable option.

    Structure of ``MP_CAPABLE`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+-------+---------------+
       |     Kind      |    Length     |Subtype|Version|A|B|C|D|E|F|G|H|
       +---------------+---------------+-------+-------+---------------+
       |                   Option Sender's Key (64 bits)               |
       |                                                               |
       |                                                               |
       +---------------------------------------------------------------+
       |                  Option Receiver's Key (64 bits)              |
       |                     (if option Length == 20)                  |
       |                                                               |
       +---------------------------------------------------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data. ``rkey`` is :obj:`None` unless the option
        carries the receiver's key (length ``20``).

    Raises:
        ProtocolError: If length is **NOT** ``12`` or ``20``.
    """
    # 4 header bytes + sender's key (8) = 12; + receiver's key (8) = 20
    if schema.length not in (12, 20):
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_MPTCPCapable(
        kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
        length=schema.length,
        subtype=schema.subtype,
        version=schema.test['version'],
        flags=Data_MPTCPCapableFlag(
            req=bool(schema.flags['req']),
            ext=bool(schema.flags['ext']),
            hsa=bool(schema.flags['hsa']),
        ),
        skey=schema.skey,
        rkey=schema.rkey if schema.length == 20 else None,
    )
def _read_mptcp_join(self, schema: 'Schema_MPTCPJoin', *, options: 'Option') -> 'Data_MPTCPJoin':
    """Read Join Connection option.

    The concrete ``MP_JOIN`` layout depends on the TCP flags of the
    current segment (``self._flags``): SYN only, SYN together with ACK,
    or ACK only. The option is forwarded to the matching parser.

    Args:
        schema: parsed option schema
        options: extracted TCP options, forwarded to the selected parser

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If neither SYN nor ACK is set on the segment.
    """
    has_syn = Enum_Flags.SYN in self._flags
    has_ack = Enum_Flags.ACK in self._flags

    if has_syn and not has_ack:  # MP_JOIN-SYN
        return self._read_join_syn(schema, options=options)  # type: ignore[arg-type]
    if has_syn and has_ack:  # MP_JOIN-SYN/ACK
        return self._read_join_synack(schema, options=options)  # type: ignore[arg-type]
    if has_ack:  # MP_JOIN-ACK
        return self._read_join_ack(schema, options=options)  # type: ignore[arg-type]
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] {schema.subtype}: invalid flags combination')
def _read_join_syn(self, schema: 'Schema_MPTCPJoinSYN', *, options: 'Option') -> 'Data_MPTCPJoinSYN':  # pylint: disable=unused-argument
    """Parse the Join Connection option carried on an initial SYN.

    Layout of ``MP_JOIN-SYN`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+-----+-+---------------+
       |     Kind      |  Length = 12  |Subtype|     |B|   Address ID  |
       +---------------+---------------+-------+-----+-+---------------+
       |                   Receiver's Token (32 bits)                  |
       +---------------------------------------------------------------+
       |                Sender's Random Number (32 bits)               |
       +---------------------------------------------------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``12``.
    """
    length = schema.length
    if length != 12:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_MPTCPJoinSYN(
        kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
        length=length,
        subtype=schema.subtype,
        connection=Enum_Flags.SYN,  # type: ignore[arg-type]
        backup=bool(schema.test['backup']),
        addr_id=schema.addr_id,
        token=schema.token,
        nonce=schema.nonce,
    )
def _read_join_synack(self, schema: 'Schema_MPTCPJoinSYNACK', options: 'Option') -> 'Data_MPTCPJoinSYNACK':  # pylint: disable=unused-argument
    """Read Join Connection option for Responding SYN/ACK.

    Structure of ``MP_JOIN-SYN/ACK`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+-----+-+---------------+
       |     Kind      |  Length = 16  |Subtype|     |B|   Address ID  |
       +---------------+---------------+-------+-----+-+---------------+
       |                                                               |
       |                Sender's Truncated HMAC (64 bits)              |
       |                                                               |
       +---------------------------------------------------------------+
       |                Sender's Random Number (32 bits)               |
       +---------------------------------------------------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If length is **NOT** ``16``.
    """
    # 4 header bytes + 8-byte truncated HMAC + 4-byte nonce = 16
    if schema.length != 16:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_MPTCPJoinSYNACK(
        kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
        length=schema.length,
        subtype=schema.subtype,
        connection=Enum_Flags.SYN | Enum_Flags.ACK,  # type: ignore[arg-type]
        backup=bool(schema.test['backup']),
        addr_id=schema.addr_id,
        hmac=schema.hmac,
        nonce=schema.nonce,
    )
def _read_join_ack(self, schema: 'Schema_MPTCPJoinACK', *, options: 'Option') -> 'Data_MPTCPJoinACK':  # pylint: disable=unused-argument
    """Parse the Join Connection option carried on the third ACK.

    Layout of ``MP_JOIN-ACK`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+-----------------------+
       |     Kind      |  Length = 24  |Subtype|      (reserved)       |
       +---------------+---------------+-------+-----------------------+
       |                                                               |
       |                                                               |
       |                   Sender's HMAC (160 bits)                    |
       |                                                               |
       |                                                               |
       +---------------------------------------------------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``24``.
    """
    if schema.length == 24:
        return Data_MPTCPJoinACK(
            kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
            length=schema.length,
            subtype=schema.subtype,
            connection=Enum_Flags.ACK,  # type: ignore[arg-type]
            hmac=schema.hmac,
        )
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')
def _read_mptcp_dss(self, schema: 'Schema_MPTCPDSS', *, options: 'Option') -> 'Data_MPTCPDSS':  # pylint: disable=unused-argument
    """Parse the Data Sequence Signal (Data ACK and Data Sequence Mapping) option.

    Layout of ``DSS`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+----------------------+
       |     Kind      |    Length     |Subtype| (reserved) |F|m|M|a|A|
       +---------------+---------------+-------+----------------------+
       |                                                              |
       |           Data ACK (4 or 8 octets, depending on flags)       |
       |                                                              |
       +--------------------------------------------------------------+
       |                                                              |
       |   Data sequence number (4 or 8 octets, depending on flags)   |
       |                                                              |
       +--------------------------------------------------------------+
       |              Subflow Sequence Number (4 octets)              |
       +-------------------------------+------------------------------+
       |  Data-Level Length (2 octets) |      Checksum (2 octets)     |
       +-------------------------------+------------------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data; the ``F`` flag is exposed as the boolean
        ``data_fin``. No length validation is performed.
    """
    fin_flag = bool(schema.flags['F'])

    return Data_MPTCPDSS(
        kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
        length=schema.length,
        subtype=schema.subtype,
        data_fin=fin_flag,
        ack=schema.ack,
        dsn=schema.dsn,
        ssn=schema.ssn,
        dl_len=schema.dl_len,
        checksum=schema.checksum,
    )
def _read_mptcp_addaddr(self, schema: 'Schema_MPTCPAddAddress', *, options: 'Option') -> 'Data_MPTCPAddAddress':  # pylint: disable=unused-argument
    """Parse the Add Address option.

    Layout of ``ADD_ADDR`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+-------+---------------+
       |     Kind      |     Length    |Subtype| IPVer |  Address ID   |
       +---------------+---------------+-------+-------+---------------+
       |          Address (IPv4 - 4 octets / IPv6 - 16 octets)         |
       +-------------------------------+-------------------------------+
       |   Port (2 octets, optional)   |
       +-------------------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the IP version field is neither ``4`` nor ``6``.
    """
    version = schema.test['version']
    if version != 4 and version != 6:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid IP version')

    return Data_MPTCPAddAddress(
        kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
        length=schema.length,
        subtype=schema.subtype,
        version=version,
        addr_id=schema.addr_id,
        addr=schema.address,
        port=schema.port,
    )
def _read_mptcp_remove(self, schema: 'Schema_MPTCPRemoveAddress', *, options: 'Option') -> 'Data_MPTCPRemoveAddress':  # pylint: disable=unused-argument
    """Parse the Remove Address option.

    Layout of ``REMOVE_ADDR`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+-------+---------------+
       |     Kind      |  Length = 3+n |Subtype|(resvd)|   Address ID  | ...
       +---------------+---------------+-------+-------+---------------+
                                  (followed by n-1 Address IDs, if required)

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data, with the address IDs collected into a tuple.

    Raises:
        ProtocolError: If the option length is smaller than ``3``.
    """
    if schema.length >= 3:
        return Data_MPTCPRemoveAddress(
            kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
            length=schema.length,
            subtype=schema.subtype,
            addr_id=tuple(schema.addr_id),
        )
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')
def _read_mptcp_prio(self, schema: 'Schema_MPTCPPriority', *, options: 'Option') -> 'Data_MPTCPPriority':  # pylint: disable=unused-argument
    """Parse the Change Subflow Priority option.

    Layout of ``MP_PRIO`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+-----+-+--------------+
       |     Kind      |     Length    |Subtype|     |B| AddrID (opt) |
       +---------------+---------------+-------+-----+-+--------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is neither ``3`` nor ``4``.
    """
    length = schema.length
    if length != 3 and length != 4:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_MPTCPPriority(
        kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
        length=length,
        subtype=schema.subtype,
        backup=bool(schema.test['backup']),
        addr_id=schema.addr_id,
    )
def _read_mptcp_fail(self, schema: 'Schema_MPTCPFallback', *, options: 'Option') -> 'Data_MPTCPFallback':  # pylint: disable=unused-argument
    """Parse the Fallback option.

    Layout of ``MP_FAIL`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+----------------------+
       |     Kind      |   Length=12   |Subtype|      (reserved)      |
       +---------------+---------------+-------+----------------------+
       |                                                              |
       |                 Data Sequence Number (8 octets)              |
       |                                                              |
       +--------------------------------------------------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If the option length is **NOT** ``12``.
    """
    if schema.length == 12:
        return Data_MPTCPFallback(
            kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
            length=schema.length,
            subtype=schema.subtype,
            dsn=schema.dsn,
        )
    raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')
def _read_mptcp_fastclose(self, schema: 'Schema_MPTCPFastclose', options: 'Option') -> 'Data_MPTCPFastclose':  # pylint: disable=unused-argument
    """Read Fast Close option.

    Structure of ``MP_FASTCLOSE`` [:rfc:`6824`]:

    .. code-block:: text

                            1                   2                   3
        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
       +---------------+---------------+-------+-----------------------+
       |     Kind      |    Length     |Subtype|      (reserved)       |
       +---------------+---------------+-------+-----------------------+
       |                      Option Receiver's Key                    |
       |                            (64 bits)                          |
       |                                                               |
       +---------------------------------------------------------------+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data; the schema's ``key`` is exposed as ``rkey``.

    Raises:
        ProtocolError: If the length is **NOT** ``12``.
    """
    # 4 header bytes + 8-byte receiver's key = 12
    if schema.length != 12:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_MPTCPFastclose(
        kind=Enum_Option.Multipath_TCP,  # type: ignore[arg-type]
        length=schema.length,
        subtype=schema.subtype,
        rkey=schema.key,
    )
def _read_mode_fastopen(self, schema: 'Schema_FastOpenCookie', *, options: 'Option') -> 'Data_FastOpenCookie':  # pylint: disable=unused-argument
    """Read Fast Open option.

    Structure of TCP ``FASTOPEN`` [:rfc:`7413`]:

    .. code-block:: text

                                       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
                                       |      Kind     |    Length     |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
       |                                                               |
       ~                            Cookie                             ~
       |                                                               |
       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

    Args:
        schema: parsed option schema
        options: extracted TCP options (unused by this parser)

    Returns:
        Parsed option data.

    Raises:
        ProtocolError: If length is neither ``2`` (empty cookie) nor an
            even value between ``6`` and ``18`` inclusive.
    """
    length = schema.length
    # Valid forms: a bare request (length 2), or a cookie of 4..16 bytes
    # with an even total length. Both the range and the parity must hold.
    is_valid = length == 2 or (6 <= length <= 18 and length % 2 == 0)
    if not is_valid:
        raise ProtocolError(f'{self.alias}: [OptNo {schema.kind}] invalid format')

    return Data_FastOpenCookie(
        kind=schema.kind,
        length=length,
        cookie=schema.cookie,
    )
def _make_tcp_options(self, options: 'list[Schema_Option | tuple[Enum_Option, dict[str, Any]] | bytes] | Option') -> 'tuple[list[Schema_Option | bytes], int]':
    """Make options for TCP.

    ``options`` is either a list whose items are raw :obj:`bytes`,
    ready-made option schemas, or ``(code, kwargs)`` tuples; or a
    multi-mapping of option code to parsed option data. Padding options
    (``NOP`` / ``EOOL``) in the input are ignored, and padding is
    regenerated so that the combined length is a multiple of four bytes.

    Args:
        options: TCP options

    Returns:
        Tuple of options and total length of options.
    """
    padding_codes = (Enum_Option.No_Operation, Enum_Option.End_of_Option_List)

    total_length = 0
    options_list = []  # type: list[Schema_Option | bytes]

    if isinstance(options, list):
        for schema in options:
            if isinstance(schema, bytes):
                code = Enum_Option.get(schema[0])
                if code in padding_codes:  # ignore padding options by default
                    continue

                data = schema  # type: Schema_Option | bytes
                data_len = len(data)
            elif isinstance(schema, Schema):
                # option schemas expose their code as ``kind``
                code = schema.kind
                if code in padding_codes:  # ignore padding options by default
                    continue

                data = schema
                data_len = len(schema.pack())
            else:
                code, args = cast('tuple[Enum_Option, dict[str, Any]]', schema)
                if code in padding_codes:  # ignore padding options by default
                    continue

                name = self.__option__[code]
                if isinstance(name, str):
                    meth_name = f'_make_mode_{name}'
                    meth = cast('OptionConstructor',
                                getattr(self, meth_name, self._make_mode_donone))
                else:
                    meth = name[1]

                data = meth(code, **args)
                data_len = len(data.pack())

            options_list.append(data)
            total_length += data_len
    else:
        for code, option in options.items(multi=True):
            if code in padding_codes:  # ignore padding options by default
                continue

            name = self.__option__[code]
            if isinstance(name, str):
                meth_name = f'_make_mode_{name}'
                meth = cast('OptionConstructor',
                            getattr(self, meth_name, self._make_mode_donone))
            else:
                meth = name[1]

            data = meth(code, option)
            data_len = len(data.pack())

            options_list.append(data)
            total_length += data_len

    # force alignment to 32-bit boundary: pad based on the *combined*
    # length of all options, using NOPs followed by a terminating EOOL
    if total_length % 4:
        pad_len = 4 - (total_length % 4)
        pad_opt = self._make_mode_nop(Enum_Option.No_Operation)  # type: ignore[arg-type]
        total_length += pad_len

        options_list.extend([pad_opt] * (pad_len - 1))
        options_list.append(self._make_mode_eool(Enum_Option.End_of_Option_List))  # type: ignore[arg-type]
    return options_list, total_length
def _make_mode_donone(self, code: 'Enum_Option', opt: 'Optional[Data_UnassignedOption]' = None, *,
                      data: 'bytes' = b'',
                      **kwargs: 'Any') -> 'Schema_UnassignedOption':
    """Build the schema of an unassigned TCP option.

    Args:
        code: option code
        opt: option data; when given, its ``data`` overrides ``data``
        data: option payload in :obj:`bytes`
        **kwargs: arbitrary keyword arguments (ignored)

    Returns:
        Constructed option schema, whose length is the payload size plus
        the two header bytes.
    """
    payload = data if opt is None else opt.data

    return Schema_UnassignedOption(
        kind=code,
        length=len(payload) + 2,
        data=payload,
    )
def _make_mode_eool(self, code: 'Enum_Option', opt: 'Optional[Data_EndOfOptionList]' = None,
                    **kwargs: 'Any') -> 'Schema_EndOfOptionList':
    """Build the schema of the TCP End of Option List option.

    Args:
        code: option code
        opt: option data (ignored)
        **kwargs: arbitrary keyword arguments (ignored)

    Returns:
        Constructed option schema with a fixed length of ``1``.
    """
    eool = Schema_EndOfOptionList(
        kind=code,
        length=1,
    )
    return eool
def _make_mode_nop(self, code: 'Enum_Option', opt: 'Optional[Data_NoOperation]' = None,
                   **kwargs: 'Any') -> 'Schema_NoOperation':
    """Build the schema of the TCP No-Operation option.

    Args:
        code: option code
        opt: option data (ignored)
        **kwargs: arbitrary keyword arguments (ignored)

    Returns:
        Constructed option schema with a fixed length of ``1``.
    """
    nop = Schema_NoOperation(
        kind=code,
        length=1,
    )
    return nop
def _make_mode_mss(self, code: 'Enum_Option', opt: 'Optional[Data_MaximumSegmentSize]' = None, *,
                   mss: 'int' = 65535,  # reasonable default value
                   **kwargs: 'Any') -> 'Schema_MaximumSegmentSize':
    """Build the schema of the TCP maximum segment size option.

    Args:
        code: option code
        opt: option data; when given, its ``mss`` overrides ``mss``
        mss: maximum segment size
        **kwargs: arbitrary keyword arguments (ignored)

    Returns:
        Constructed option schema with a fixed length of ``4``.
    """
    segment_size = mss if opt is None else opt.mss

    return Schema_MaximumSegmentSize(
        kind=code,
        length=4,
        mss=segment_size,
    )
def _make_mode_ws(self, code: 'Enum_Option', opt: 'Optional[Data_WindowScale]' = None, *,
                  shift: 'int' = 0,  # reasonable default value
                  **kwargs: 'Any') -> 'Schema_WindowScale':
    """Make TCP window scale option.

    Args:
        code: option code
        opt: option data; when given, its ``shift`` overrides ``shift``
        shift: window scale shift count
        **kwargs: arbitrary keyword arguments (ignored)

    Returns:
        Constructed option schema with a fixed length of ``3``.
    """
    if opt is not None:
        shift = opt.shift

    # The schema field is named ``shift`` -- the same attribute the
    # window scale parser reads back from the schema.
    return Schema_WindowScale(
        kind=code,
        length=3,
        shift=shift,
    )
def _make_mode_sackpmt(self, code: 'Enum_Option', opt: 'Optional[Data_SACKPermitted]' = None,
                       **kwargs: 'Any') -> 'Schema_SACKPermitted':
    """Build the schema of the TCP SACK-permitted option.

    Args:
        code: option code
        opt: option data (ignored)
        **kwargs: arbitrary keyword arguments (ignored)

    Returns:
        Constructed option schema with a fixed length of ``2``.
    """
    permitted = Schema_SACKPermitted(
        kind=code,
        length=2,
    )
    return permitted
def _make_mode_sack(self, code: 'Enum_Option', opt: 'Optional[Data_SACK]' = None, *,
                    sack: 'Optional[list[tuple[int, int]]]' = None,
                    **kwargs: 'Any') -> 'Schema_SACK':
    """Build the schema for a TCP SACK option.

    Args:
        code: option code
        opt: option data; when given, its blocks are used and ``sack`` is ignored
        sack: SACK blocks as ``(left, right)`` pairs
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # normalise both sources to (left, right) pairs first
    if opt is not None:
        edges = [(block.left, block.right) for block in opt.sack]
    elif sack is not None:
        edges = [(left, right) for left, right in sack]
    else:
        edges = []

    blocks = [Schema_SACKBlock(left=left, right=right) for left, right in edges]

    # each block adds 8 to the 2-octet kind/length header
    return Schema_SACK(
        kind=code,
        length=2 + 8 * len(blocks),
        sack=blocks,
    )
def _make_mode_echo(self, code: 'Enum_Option', opt: 'Optional[Data_Echo]' = None, *,
                    data: 'bytes' = b'\x00\x00\x00\x00',
                    **kwargs: 'Any') -> 'Schema_Echo':
    """Build the schema for a TCP echo option.

    Args:
        code: option code
        opt: option data; when given, its ``data`` overrides the ``data`` argument
        data: 4 bytes of info to be echoed
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    payload = data if opt is None else opt.data
    return Schema_Echo(kind=code, length=6, data=payload)
def _make_mode_echore(self, code: 'Enum_Option', opt: 'Optional[Data_EchoReply]' = None, *,
                      data: 'bytes' = b'\x00\x00\x00\x00',
                      **kwargs: 'Any') -> 'Schema_EchoReply':
    """Build the schema for a TCP echo reply option.

    Args:
        code: option code
        opt: option data; when given, its ``data`` overrides the ``data`` argument
        data: 4 bytes of echoed info
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    payload = data if opt is None else opt.data
    return Schema_EchoReply(kind=code, length=6, data=payload)
def _make_mode_ts(self, code: 'Enum_Option', opt: 'Optional[Data_Timestamps]' = None, *,
                  tsval: 'int' = 0,
                  tsecr: 'int' = 0,
                  **kwargs: 'Any') -> 'Schema_Timestamps':
    """Build the schema for a TCP timestamps option.

    Args:
        code: option code
        opt: option data; when given, its ``timestamp`` and ``echo`` override
            ``tsval`` and ``tsecr``
        tsval: timestamp value
        tsecr: timestamp echo reply
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        tsval, tsecr = opt.timestamp, opt.echo

    return Schema_Timestamps(kind=code, length=10, value=tsval, reply=tsecr)
def _make_mode_poc(self, code: 'Enum_Option', opt: 'Optional[Data_PartialOrderConnectionPermitted]' = None,
                   **kwargs: 'Any') -> 'Schema_PartialOrderConnectionPermitted':
    """Build the schema for a TCP partial order connection permitted option.

    Neither ``opt`` nor ``kwargs`` is consulted; the schema is built from
    the option code and a fixed length of ``2``.

    Args:
        code: option code
        opt: option data (not used)
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    schema = Schema_PartialOrderConnectionPermitted(kind=code, length=2)
    return schema
def _make_mode_pocsp(self, code: 'Enum_Option', opt: 'Optional[Data_PartialOrderServiceProfile]' = None, *,
                     start: 'bool' = False,
                     end: 'bool' = False,
                     **kwargs: 'Any') -> 'Schema_PartialOrderServiceProfile':
    """Build the schema for a TCP partial order connection service profile option.

    Args:
        code: option code
        opt: option data; when given, its ``start`` and ``end`` override the
            keyword arguments
        start: start partial order connection
        end: end partial order connection
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword arguments
    if opt is None:
        profile = {'start': start, 'end': end}
    else:
        profile = {'start': opt.start, 'end': opt.end}

    return Schema_PartialOrderServiceProfile(kind=code, length=3, profile=profile)
def _make_mode_cc(self, code: 'Enum_Option', opt: 'Optional[Data_CC]' = None, *,
                  count: 'int' = 0,
                  **kwargs: 'Any') -> 'Schema_CC':
    """Build the schema for a TCP connection count option.

    Args:
        code: option code
        opt: option data; when given, its ``cc`` overrides the ``count`` argument
        count: connection count
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    conn_count = count if opt is None else opt.cc
    return Schema_CC(kind=code, length=6, count=conn_count)
def _make_mode_ccnew(self, code: 'Enum_Option', opt: 'Optional[Data_CCNew]' = None, *,
                     count: 'int' = 0,
                     **kwargs: 'Any') -> 'Schema_CCNew':
    """Build the schema for a TCP connection count new option.

    Args:
        code: option code
        opt: option data; when given, its ``cc`` overrides the ``count`` argument
        count: connection count
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    conn_count = count if opt is None else opt.cc
    return Schema_CCNew(kind=code, length=6, count=conn_count)
def _make_mode_ccecho(self, code: 'Enum_Option', opt: 'Optional[Data_CCEcho]' = None, *,
                      count: 'int' = 0,
                      **kwargs: 'Any') -> 'Schema_CCEcho':
    """Build the schema for a TCP connection count echo option.

    Args:
        code: option code
        opt: option data; when given, its ``cc`` overrides the ``count`` argument
        count: connection count
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    conn_count = count if opt is None else opt.cc
    return Schema_CCEcho(kind=code, length=6, count=conn_count)
def _make_mode_chkreq(self, code: 'Enum_Option', opt: 'Optional[Data_AlternateChecksumRequest]' = None, *,
                      algorithm: 'Enum_Checksum | StdlibEnum | AenumEnum | int | str' = Enum_Checksum.TCP_checksum,
                      algorithm_default: 'Optional[int]' = None,
                      algorithm_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None,  # pylint: disable=line-too-long
                      algorithm_reversed: 'bool' = False,
                      **kwargs: 'Any') -> 'Schema_AlternateChecksumRequest':
    """Build the schema for a TCP alternate checksum request option.

    Args:
        code: option code
        opt: option data; when given, its ``chksum`` is used and the
            ``algorithm*`` arguments are ignored
        algorithm: checksum algorithm
        algorithm_default: default value for checksum algorithm
        algorithm_namespace: namespace for checksum algorithm
        algorithm_reversed: reversed flag for checksum algorithm
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is None:
        # resolve the user-supplied algorithm through the index helper
        algorithm_val = self._make_index(algorithm, algorithm_default, namespace=algorithm_namespace,  # type: ignore[assignment]
                                         reversed=algorithm_reversed, pack=False)
    else:
        algorithm_val = opt.chksum

    return Schema_AlternateChecksumRequest(kind=code, length=3, algorithm=algorithm_val)
def _make_mode_chksum(self, code: 'Enum_Option', opt: 'Optional[Data_AlternateChecksumData]' = None, *,
                      data: 'bytes' = b'',
                      **kwargs: 'Any') -> 'Schema_AlternateChecksumData':
    """Build the schema for a TCP alternate checksum data option.

    Args:
        code: option code
        opt: option data; when given, its ``data`` overrides the ``data`` argument
        data: checksum data
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    payload = data if opt is None else opt.data

    # length is the 2-octet kind/length header plus the payload size
    return Schema_AlternateChecksumData(kind=code, length=len(payload) + 2, data=payload)
def _make_mode_sig(self, code: 'Enum_Option', opt: 'Optional[Data_MD5Signature]' = None, *,
                   digest: 'bytes' = bytes(16),
                   **kwargs: 'Any') -> 'Schema_MD5Signature':
    """Build the schema for a TCP MD5 signature option.

    Args:
        code: option code
        opt: option data; when given, its ``digest`` overrides the ``digest`` argument
        digest: digest (defaults to 16 zero bytes)
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    signature = digest if opt is None else opt.digest
    return Schema_MD5Signature(kind=code, length=18, digest=signature)
def _make_mode_qs(self, code: 'Enum_Option', opt: 'Optional[Data_QuickStartResponse]' = None, *,
                  rate: 'int' = 0,
                  diff: 'timedelta | int' = 0,
                  nonce: 'int' = 0,
                  **kwargs: 'Any') -> 'Schema_QuickStartResponse':
    """Build the schema for a TCP quick start response option.

    Args:
        code: option code
        opt: option data; when given, its ``req_rate``, ``ttl_diff`` and
            ``nonce`` override the keyword arguments
        rate: rate (in kbps)
        diff: time to live (in seconds) difference
        nonce: nonce value
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        rate, diff, nonce = opt.req_rate, opt.ttl_diff, opt.nonce

    # encode the rate as floor(log2(bps / 40000)); non-positive rates map to 0
    if rate > 0:
        rate_val = math.floor(math.log2(rate * 1000 / 40000))
    else:
        rate_val = 0

    # integers are used as-is, anything else is treated as a timedelta
    if isinstance(diff, int):
        diff_val = diff
    else:
        diff_val = math.floor(diff.total_seconds())

    return Schema_QuickStartResponse(
        kind=code,
        length=8,
        flags={'rate': rate_val},
        diff=diff_val,
        nonce={'nonce': nonce},
    )
def _make_mode_timeout(self, code: 'Enum_Option', opt: 'Optional[Data_UserTimeout]' = None, *,
                       timeout: 'timedelta | int' = 0,
                       **kwargs: 'Any') -> 'Schema_UserTimeout':
    """Make TCP user timeout option.

    The timeout is first expressed in whole seconds. If that value does not
    fit into 15 bits, the granularity flag is set and the value is converted
    to whole minutes instead.

    Args:
        code: option code
        opt: option data; when given, its ``timeout`` overrides the
            ``timeout`` argument
        timeout: timeout value (seconds, or a :class:`~datetime.timedelta`)
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    Raises:
        ProtocolError: If the timeout does not fit into 15 bits even when
            expressed in minutes.

    """
    if opt is not None:
        seconds = math.floor(opt.timeout.total_seconds())
    elif isinstance(timeout, int):
        seconds = timeout
    else:
        seconds = math.floor(timeout.total_seconds())

    # switch to minute granularity when the seconds value needs more than 15 bits
    granularity = seconds.bit_length() > 15
    timeout_val = seconds // 60 if granularity else seconds
    if timeout_val.bit_length() > 15:
        # report the value actually used, which may originate from ``opt``
        raise ProtocolError(f'TCP: [OptNo {code}] timeout value too large: {seconds}')

    return Schema_UserTimeout(
        kind=code,
        # kind (1) + length (1) + granularity/timeout (2), per RFC 5482
        length=4,
        info={
            'granularity': granularity,
            'timeout': timeout_val,
        },
    )
def _make_mode_ao(self, code: 'Enum_Option', opt: 'Optional[Data_Authentication]' = None, *,
                  key_id: 'int' = 0,
                  next_key_id: 'int' = 0,
                  mac: 'bytes' = b'',
                  **kwargs: 'Any') -> 'Schema_Authentication':
    """Build the schema for a TCP authentication option.

    Args:
        code: option code
        opt: option data; when given, its ``key_id``, ``next_key_id`` and
            ``mac`` override the keyword arguments
        key_id: key ID
        next_key_id: next key ID
        mac: MAC value
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        key_id, next_key_id, mac = opt.key_id, opt.next_key_id, opt.mac

    # 4 octets precede the MAC in the length computation
    total_length = len(mac) + 4

    return Schema_Authentication(
        kind=code,
        length=total_length,
        key_id=key_id,
        next_key_id=next_key_id,
        mac=mac,
    )
def _make_mode_mp(self, code: 'Enum_Option', opt: 'Optional[Data_MPTCP]' = None, *,
                  subtype: 'Enum_MPTCPOption | StdlibEnum | AenumEnum | int | str' = Enum_MPTCPOption.MP_CAPABLE,
                  subtype_default: 'Optional[int]' = None,
                  subtype_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None,  # pylint: disable=line-too-long
                  subtype_reversed: 'bool' = False,
                  **kwargs: 'Any') -> 'Schema_MPTCP':
    """Build the schema for a multipath TCP option.

    The subtype is resolved first, then the matching constructor is looked
    up in ``self.__mp_option__`` and invoked with the remaining keyword
    arguments.

    Args:
        code: option code
        opt: option data; when given, its ``subtype`` is used and the
            ``subtype*`` arguments are ignored
        subtype: MPTCP subtype
        subtype_default: default value for MPTCP subtype
        subtype_namespace: namespace for MPTCP subtype
        subtype_reversed: reversed flag for MPTCP subtype
        **kwargs: arbitrary keyword arguments, forwarded to the subtype constructor

    Returns:
        Constructed option schema.

    """
    if opt is None:
        subtype_val = self._make_index(subtype, subtype_default, namespace=subtype_namespace,  # type: ignore[assignment]
                                       reversed=subtype_reversed, pack=False)
    else:
        subtype_val = opt.subtype

    entry = self.__mp_option__[subtype_val]
    if isinstance(entry, str):
        # string entries name a ``_make_mptcp_*`` method on this instance,
        # falling back to the unknown-subtype constructor
        maker = cast('MPOptionConstructor',
                     getattr(self, f'_make_mptcp_{entry}', self._make_mptcp_unknown))
    else:
        # otherwise the constructor is the second element of the entry
        maker = entry[1]

    return maker(subtype_val, opt, **kwargs)
def _make_mptcp_unknown(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPUnknown]' = None, *,
                        data: 'bytes' = b'\x00',
                        **kwargs: 'Any') -> 'Schema_MPTCPUnknown':
    """Build the schema for an unknown multipath TCP option.

    The low nibble of the first payload byte is stored alongside the subtype
    in ``test``; the remaining bytes become the schema's ``data``.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its ``data`` overrides the ``data`` argument
        data: option payload data
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    payload = data if opt is None else opt.data

    # an empty payload contributes 0 for the nibble
    low_nibble = payload[0] & 0x0F if payload else 0

    return Schema_MPTCPUnknown(
        kind=Enum_Option.MPTCP,
        length=len(payload) + 2,
        test={
            'subtype': subtype.value,
            'data': low_nibble,
        },
        data=payload[1:],
    )
def _make_mptcp_capable(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPCapable]' = None, *,
                        version: 'int' = 0,
                        flag_req: 'bool' = False,
                        flag_ext: 'bool' = False,
                        flag_hsa: 'bool' = False,
                        skey: 'int' = 0,
                        rkey: 'Optional[int]' = 0,
                        **kwargs: 'Any') -> 'Schema_MPTCPCapable':
    """Build the schema for a multipath TCP capable option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its fields override the keyword arguments
        version: MPTCP version
        flag_req: checksum required flag
        flag_ext: extensibility flag
        flag_hsa: use of HMAC-SHA1 flag
        skey: option sender's key
        rkey: option receiver's key (:data:`None` selects the shorter length)
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        version = opt.version
        flag_req, flag_ext, flag_hsa = opt.flags.req, opt.flags.ext, opt.flags.hsa
        skey, rkey = opt.skey, opt.rkey

    # NOTE(review): RFC 6824 lists MP_CAPABLE lengths of 12 and 20 octets;
    # the 20/32 values are kept unchanged here -- confirm against the schema.
    if rkey is None:
        length = 20
    else:
        length = 32

    return Schema_MPTCPCapable(
        kind=Enum_Option.MPTCP,
        length=length,
        test={'subtype': subtype.value, 'version': version},
        flags={'req': flag_req, 'ext': flag_ext, 'hsa': flag_hsa},
        skey=skey,
        rkey=rkey,
    )
def _make_mptcp_join(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPJoin]' = None,
                     **kwargs: 'Any') -> 'Schema_MPTCPJoin':
    """Build the schema for a multipath TCP join option.

    The concrete variant is chosen from the ``SYN`` and ``ACK`` membership
    of ``self._flags``.

    Args:
        subtype: MPTCP subtype
        opt: option data
        **kwargs: arbitrary keyword arguments, forwarded to the variant constructor

    Returns:
        Constructed option schema.

    Raises:
        ProtocolError: If neither ``SYN`` nor ``ACK`` is set.

    """
    has_syn = Enum_Flags.SYN in self._flags
    has_ack = Enum_Flags.ACK in self._flags

    if has_syn:
        if has_ack:  # MP_JOIN-SYN/ACK
            return self._make_join_synack(subtype, opt, **kwargs)  # type: ignore[arg-type]
        # MP_JOIN-SYN
        return self._make_join_syn(subtype, opt, **kwargs)  # type: ignore[arg-type]
    if has_ack:  # MP_JOIN-ACK
        return self._make_join_ack(subtype, opt, **kwargs)  # type: ignore[arg-type]
    raise ProtocolError(f'{self.alias}: : [OptNo {Enum_Option.Multipath_TCP}] {subtype}: invalid flags combination')
def _make_join_syn(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPJoinSYN]' = None, *,
                   backup: 'bool' = False,
                   addr_id: 'int' = 0,
                   token: 'int' = 0,
                   nonce: 'int' = 0,
                   **kwargs: 'Any') -> 'Schema_MPTCPJoinSYN':
    """Build the schema for a multipath TCP join SYN option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its fields override the keyword arguments
        backup: backup flag
        addr_id: address ID
        token: receiver's token
        nonce: sender's random number
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        backup, addr_id = opt.backup, opt.addr_id
        token, nonce = opt.token, opt.nonce

    return Schema_MPTCPJoinSYN(
        kind=Enum_Option.MPTCP,
        length=12,
        test={'subtype': subtype.value, 'backup': backup},
        addr_id=addr_id,
        token=token,
        nonce=nonce,
    )
def _make_join_synack(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPJoinSYNACK]' = None, *,
                      backup: 'bool' = False,
                      addr_id: 'int' = 0,
                      hmac: 'bytes' = bytes(8),
                      nonce: 'int' = 0,
                      **kwargs: 'Any') -> 'Schema_MPTCPJoinSYNACK':
    """Make multipath TCP join SYN/ACK option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its fields override the keyword arguments
        backup: backup flag
        addr_id: address ID
        hmac: sender's truncated HMAC
        nonce: sender's random number
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        backup = opt.backup
        addr_id = opt.addr_id
        # previously ``nonce`` was assigned twice and the HMAC from ``opt``
        # was silently dropped in favour of the keyword default
        hmac = opt.hmac
        nonce = opt.nonce

    return Schema_MPTCPJoinSYNACK(
        kind=Enum_Option.MPTCP,
        # kind (1) + length (1) + subtype/flags (1) + address ID (1)
        # + truncated HMAC (8) + nonce (4), per RFC 6824
        length=16,
        test={
            'subtype': subtype.value,
            'backup': backup,
        },
        addr_id=addr_id,
        hmac=hmac,
        nonce=nonce,
    )
def _make_join_ack(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPJoinACK]' = None, *,
                   hmac: 'bytes' = bytes(20),
                   **kwargs: 'Any') -> 'Schema_MPTCPJoinACK':
    """Make multipath TCP join ACK option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its ``hmac`` overrides the ``hmac`` argument
        hmac: sender's HMAC (defaults to 20 zero bytes)
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        hmac = opt.hmac

    return Schema_MPTCPJoinACK(
        kind=Enum_Option.MPTCP,
        # kind (1) + length (1) + subtype/reserved (2) + HMAC (20), per
        # RFC 6824; the former value of 8 could not hold the 20-octet HMAC
        length=24,
        test={
            'subtype': subtype.value,
        },
        hmac=hmac,
    )
def _make_mptcp_dss(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPDSS]' = None, *,
                    data_fin: 'bool' = False,
                    ack: 'Optional[int]' = None,
                    dsn: 'Optional[int]' = None,
                    ssn: 'Optional[int]' = None,
                    dl_len: 'Optional[int]' = None,
                    checksum: 'Optional[bytes]' = None,
                    **kwargs: 'Any') -> 'Schema_MPTCPDSS':
    """Make multipath TCP DSS option.

    The ``A``/``a`` flags are derived from ``ack`` (present / wider than 32
    bits) and the ``M``/``m`` flags from ``dsn`` in the same way. ``ssn``,
    ``dl_len`` and ``checksum`` must be given together with ``dsn`` and must
    be omitted without it.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its fields override the keyword arguments
        data_fin: ``DATA_FIN`` flag
        ack: Data ACK
        dsn: data sequence number
        ssn: subflow sequence number
        dl_len: data-level length
        checksum: checksum
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    Raises:
        ProtocolError: If ``dsn`` is given without all of ``ssn``, ``dl_len``
            and ``checksum``, or if any of those is given without ``dsn``.

    """
    if opt is not None:
        data_fin = opt.data_fin
        ack = opt.ack
        dsn = opt.dsn
        ssn = opt.ssn
        dl_len = opt.dl_len
        checksum = opt.checksum

    flag_A = ack is not None
    flag_a = cast('int', ack).bit_length() > 32 if flag_A else False
    flag_M = dsn is not None
    flag_m = cast('int', dsn).bit_length() > 32 if flag_M else False

    mapping_fields = (ssn, dl_len, checksum)
    if flag_M and any(field is None for field in mapping_fields):
        raise ProtocolError(f'{self.alias}: : [OptNo {Enum_Option.Multipath_TCP}] {subtype}: missing required fields')
    if not flag_M and any(field is not None for field in mapping_fields):
        # the former message claimed fields were missing, although this
        # branch fires when mapping fields are supplied without a DSN
        raise ProtocolError(f'{self.alias}: : [OptNo {Enum_Option.Multipath_TCP}] {subtype}: '
                            'ssn, dl_len and checksum require dsn')

    return Schema_MPTCPDSS(
        kind=Enum_Option.MPTCP,
        # 4-octet header, 4/8-octet Data ACK, 12-octet mapping (+4 for a wide DSN)
        length=4 + (4 if flag_A else 0) + (4 if flag_a else 0) + (12 if flag_M else 0) + (4 if flag_m else 0),
        test={
            'subtype': subtype.value,
        },
        # the redundant second 'A' entry was dropped; key order is unchanged
        flags={
            'F': data_fin,
            'A': flag_A,
            'm': flag_m,
            'M': flag_M,
            'a': flag_a,
        },
        ack=ack,
        dsn=dsn,
        ssn=ssn,
        dl_len=dl_len,
        checksum=checksum,
    )
def _make_mptcp_addaddr(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPAddAddress]' = None, *,
                        addr_id: 'int' = 0,
                        addr: 'IPv4Address | IPv6Address | int | bytes | str' = '0.0.0.0',  # nosec: B104
                        port: 'Optional[int]' = None,
                        **kwargs: 'Any') -> 'Schema_MPTCPAddAddress':
    """Build the schema for a multipath TCP add address option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its ``addr_id``, ``addr`` and ``port``
            override the keyword arguments
        addr_id: address ID
        addr: address, converted with :func:`ipaddress.ip_address`
        port: port number
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is None:
        addr_val = ipaddress.ip_address(addr)
    else:
        addr_id, addr_val, port = opt.addr_id, opt.addr, opt.port

    version = addr_val.version

    # 4 octets for IPv4, 16 otherwise; the port adds 2 when present
    addr_size = 4 if version == 4 else 16
    port_size = 0 if port is None else 2

    return Schema_MPTCPAddAddress(
        kind=Enum_Option.MPTCP,
        length=4 + addr_size + port_size,
        test={'subtype': subtype.value, 'version': version},
        addr_id=addr_id,
        address=addr_val,
        port=port,
    )
def _make_mptcp_remove(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPRemoveAddress]' = None, *,
                       addr_id: 'Optional[list[int]]' = None,
                       **kwargs: 'Any') -> 'Schema_MPTCPRemoveAddress':
    """Make multipath TCP remove address option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its ``addr_id`` list overrides the
            ``addr_id`` argument
        addr_id: address ID list (defaults to an empty list)
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        addr_id_list = cast('list[int]', opt.addr_id)
    else:
        addr_id_list = addr_id if addr_id is not None else []

    return Schema_MPTCPRemoveAddress(
        kind=Enum_Option.MPTCP,
        # kind (1) + length (1) + subtype/reserved (1) + one octet per
        # address ID, per RFC 6824; formerly hard-coded to 4, which is only
        # right for exactly one address ID
        length=3 + len(addr_id_list),
        test={
            'subtype': subtype.value,
        },
        addr_id=addr_id_list,
    )
def _make_mptcp_prio(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPPriority]' = None, *,
                     backup: 'bool' = False,
                     addr_id: 'Optional[int]' = None,
                     **kwargs: 'Any') -> 'Schema_MPTCPPriority':
    """Make multipath TCP priority option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its ``backup`` and ``addr_id`` override
            the keyword arguments
        backup: backup flag
        addr_id: address ID (optional)
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        # NOTE(review): assumes ``Data_MPTCPPriority`` exposes ``backup`` like
        # the MP_JOIN data classes do -- confirm. Previously the flag from
        # ``opt`` was ignored and the keyword default was always used.
        backup = opt.backup
        addr_id = opt.addr_id

    return Schema_MPTCPPriority(
        kind=Enum_Option.MPTCP,
        # kind (1) + length (1) + subtype/flags (1), plus one octet only
        # when the optional address ID is present, per RFC 6824
        length=3 if addr_id is None else 4,
        test={
            'subtype': subtype.value,
            'backup': backup,
        },
        addr_id=addr_id,
    )
def _make_mptcp_fail(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPFallback]' = None, *,
                     dsn: 'int' = 0,
                     **kwargs: 'Any') -> 'Schema_MPTCPFallback':
    """Build the schema for a multipath TCP fail option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its ``dsn`` overrides the ``dsn`` argument
        dsn: data sequence number
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    sequence = dsn if opt is None else opt.dsn

    return Schema_MPTCPFallback(
        kind=Enum_Option.MPTCP,
        length=12,
        test={'subtype': subtype.value},
        dsn=sequence,
    )
def _make_mptcp_fastclose(self, subtype: 'Enum_MPTCPOption', opt: 'Optional[Data_MPTCPFastclose]' = None, *,
                          key: 'int' = 0,
                          **kwargs: 'Any') -> 'Schema_MPTCPFastclose':
    """Build the schema for a multipath TCP fastclose option.

    Args:
        subtype: MPTCP subtype
        opt: option data; when given, its ``rkey`` overrides the ``key`` argument
        key: option receiver's key
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    # parsed option data takes precedence over the keyword argument
    receiver_key = key if opt is None else opt.rkey

    return Schema_MPTCPFastclose(
        kind=Enum_Option.MPTCP,
        length=12,
        test={'subtype': subtype.value},
        key=receiver_key,
    )
def _make_mode_fastopen(self, code: 'Enum_Option', opt: 'Optional[Data_FastOpenCookie]' = None, *,
                        cookie: 'Optional[bytes]' = None,
                        **kwargs: 'Any') -> 'Schema_FastOpenCookie':
    """Build the schema for a TCP Fast Open option.

    Args:
        code: option code
        opt: option data; when given, its ``cookie`` overrides the ``cookie`` argument
        cookie: fast open cookie (:data:`None` for no cookie)
        **kwargs: arbitrary keyword arguments (not used)

    Returns:
        Constructed option schema.

    """
    if opt is not None:
        cookie = opt.cookie

    # a missing cookie contributes nothing beyond the 2-octet header
    cookie_size = 0 if cookie is None else len(cookie)

    return Schema_FastOpenCookie(
        kind=code,
        length=2 + cookie_size,
        cookie=cookie,
    )