Source code for pcapkit.protocols.internet.internet

# -*- coding: utf-8 -*-
# mypy: disable-error-code=dict-item
"""Base Protocol
===================

.. module:: pcapkit.protocols.internet.internet

:mod:`pcapkit.protocols.internet.internet` contains :class:`~pcapkit.protocols.internet.internet.Internet`,
which is a base class for internet layer protocols, e.g. :class:`~pcapkit.protocols.internet.ah.AH`,
:class:`~pcapkit.protocols.internet.ipsec.IPsec`, :class:`~pcapkit.protocols.internet.ipv4.IPv4`,
:class:`~pcapkit.protocols.internet.ipv6.IPv6`, :class:`~pcapkit.protocols.internet.ipx.IPX`, etc.

"""
import collections
from typing import TYPE_CHECKING, Generic, cast

from pcapkit.const.reg.transtype import TransType as Enum_TransType
from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.corekit.protochain import ProtoChain
from pcapkit.protocols.protocol import _PT, _ST
from pcapkit.protocols.protocol import ProtocolBase as Protocol
from pcapkit.utilities.decorators import beholder
from pcapkit.utilities.exceptions import RegistryError
from pcapkit.utilities.warnings import RegistryWarning, warn

if TYPE_CHECKING:
    from typing import Any, Optional, Type

    from typing_extensions import Literal

__all__ = ['Internet']


class Internet(Protocol[_PT, _ST], Generic[_PT, _ST]):  # pylint: disable=abstract-method
    """Abstract base class for internet layer protocol family.

    This class currently supports parsing of the following protocols, which are
    registered in the :attr:`self.__proto__ <pcapkit.protocols.internet.internet.Internet.__proto__>`
    attribute:

    .. list-table::
       :header-rows: 1

       * - Index
         - Protocol
       * - :attr:`~pcapkit.const.reg.transtype.TransType.HOPOPT`
         - :class:`pcapkit.protocols.internet.hopopt.HOPOPT`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv4`
         - :class:`pcapkit.protocols.internet.ipv4.IPv4`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.TCP`
         - :class:`pcapkit.protocols.transport.tcp.TCP`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.UDP`
         - :class:`pcapkit.protocols.transport.udp.UDP`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6`
         - :class:`pcapkit.protocols.internet.ipv6.IPv6`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6_Route`
         - :class:`pcapkit.protocols.internet.ipv6_route.IPv6_Route`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6_Frag`
         - :class:`pcapkit.protocols.internet.ipv6_frag.IPv6_Frag`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.AH`
         - :class:`pcapkit.protocols.internet.ah.AH`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6_NoNxt`
         - :class:`pcapkit.protocols.misc.raw.Raw`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPv6_Opts`
         - :class:`pcapkit.protocols.internet.ipv6_opts.IPv6_Opts`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.IPX_in_IP`
         - :class:`pcapkit.protocols.internet.ipx.IPX`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.Mobility_Header`
         - :class:`pcapkit.protocols.internet.mh.MH`
       * - :attr:`~pcapkit.const.reg.transtype.TransType.HIP`
         - :class:`pcapkit.protocols.internet.hip.HIP`

    """

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: Layer of protocol.
    __layer__ = 'Internet'  # type: Literal['Internet']

    #: DefaultDict[int, ModuleDescriptor[Protocol] | Type[Protocol]]: Protocol index mapping for decoding next layer,
    #: c.f. :meth:`self._decode_next_layer <pcapkit.protocols.internet.internet.Internet._decode_next_layer>`
    #: & :meth:`self._import_next_layer <pcapkit.protocols.internet.internet.Internet._import_next_layer>`.
    # NOTE: looking up a code that is not listed below falls back to (and
    # stores) a descriptor for ``Raw`` because of the ``defaultdict`` factory.
    __proto__ = collections.defaultdict(
        lambda: ModuleDescriptor('pcapkit.protocols.misc.raw', 'Raw'),
        {
            Enum_TransType.HOPOPT:          ModuleDescriptor('pcapkit.protocols.internet.hopopt',     'HOPOPT'),
            Enum_TransType.IPv4:            ModuleDescriptor('pcapkit.protocols.internet.ipv4',       'IPv4'),
            Enum_TransType.TCP:             ModuleDescriptor('pcapkit.protocols.transport.tcp',       'TCP'),
            Enum_TransType.UDP:             ModuleDescriptor('pcapkit.protocols.transport.udp',       'UDP'),
            Enum_TransType.IPv6:            ModuleDescriptor('pcapkit.protocols.internet.ipv6',       'IPv6'),
            Enum_TransType.IPv6_Route:      ModuleDescriptor('pcapkit.protocols.internet.ipv6_route', 'IPv6_Route'),
            Enum_TransType.IPv6_Frag:       ModuleDescriptor('pcapkit.protocols.internet.ipv6_frag',  'IPv6_Frag'),
            Enum_TransType.AH:              ModuleDescriptor('pcapkit.protocols.internet.ah',         'AH'),
            Enum_TransType.IPv6_NoNxt:      ModuleDescriptor('pcapkit.protocols.misc.raw',            'Raw'),
            Enum_TransType.IPv6_Opts:       ModuleDescriptor('pcapkit.protocols.internet.ipv6_opts',  'IPv6_Opts'),
            Enum_TransType.IPX_in_IP:       ModuleDescriptor('pcapkit.protocols.internet.ipx',        'IPX'),
            Enum_TransType.Mobility_Header: ModuleDescriptor('pcapkit.protocols.internet.mh',         'MH'),
            Enum_TransType.HIP:             ModuleDescriptor('pcapkit.protocols.internet.hip',        'HIP'),
        },
    )

    ##########################################################################
    # Properties.
    ##########################################################################

    # protocol layer
    @property
    def layer(self) -> 'Literal["Internet"]':
        """Protocol layer."""
        return self.__layer__

    ##########################################################################
    # Methods.
    ##########################################################################

    @classmethod
    def register(cls, code: 'Enum_TransType', protocol: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None':  # type: ignore[override]
        r"""Register a new protocol class.

        Notes:
            The full qualified class name of the new protocol class
            should be as ``{protocol.module}.{protocol.name}``.

        Arguments:
            code: protocol code as in :class:`~pcapkit.const.reg.transtype.TransType`
            protocol: module descriptor or a
                :class:`~pcapkit.protocols.protocol.Protocol` subclass

        Raises:
            RegistryError: If ``protocol`` (after a module descriptor has been
                resolved through its ``klass`` attribute) is not a
                :class:`~pcapkit.protocols.protocol.Protocol` subclass.

        Warns:
            RegistryWarning: If ``code`` is already present in
                :attr:`__proto__`; the existing entry is then overwritten.

        """
        if isinstance(protocol, ModuleDescriptor):
            protocol = protocol.klass
        # ``issubclass`` itself raises ``TypeError`` when handed a non-class
        # object, so check for a class first and report every invalid value
        # through the dedicated registry error.
        if not (isinstance(protocol, type) and issubclass(protocol, Protocol)):
            raise RegistryError(f'protocol must be a Protocol subclass, not {protocol!r}')
        if code in cls.__proto__:
            warn(f'protocol {code} already registered, overwriting', RegistryWarning)
        cls.__proto__[code] = protocol

    ##########################################################################
    # Utilities.
    ##########################################################################

    def _read_protos(self, size: 'int') -> 'Enum_TransType':
        """Read next layer protocol type.

        Arguments:
            size: buffer size

        Returns:
            Next layer's protocol enumeration.

        """
        _byte = self._read_unpack(size)
        _prot = Enum_TransType.get(_byte)
        return _prot

    def _decode_next_layer(self, dict_: '_PT', proto: 'Optional[int]' = None,  # pylint: disable=arguments-differ
                           length: 'Optional[int]' = None, *, packet: 'Optional[dict[str, Any]]' = None,
                           version: 'Literal[4, 6]' = 4, ipv6_exthdr: 'Optional[ProtoChain]' = None,
                           payload: 'Optional[bytes]' = None) -> '_PT':
        r"""Decode next layer extractor.

        Arguments:
            dict\_: info buffer
            proto: next layer protocol index
            length: valid (*non-padding*) length
            packet: packet info (passed from :meth:`self.unpack <pcapkit.protocols.protocol.Protocol.unpack>`)
            version: IP version
            ipv6_exthdr: protocol chain of IPv6 extension headers
            payload: payload from packet. If not provided, will extract from
                :meth:`self.__header__.get_payload <pcapkit.protocols.schema.schema.Schema.get_payload>`

        Returns:
            Current protocol with next layer extracted.

        Notes:
            We added a new key ``__next_type__`` to ``dict_`` to store the
            next layer protocol type, and a new key ``__next_name__`` to
            store the next layer protocol name. These two keys will **NOT**
            be included when :meth:`Info.to_dict <pcapkit.corekit.infoclass.Info.to_dict>`
            is called.

        """
        next_ = cast('Protocol',  # type: ignore[redundant-cast]
                     self._import_next_layer(proto, length, packet=packet, version=version, payload=payload))  # type: ignore[arg-type,misc,call-arg]
        info, chain = next_.info, next_.protochain

        # make next layer protocol name
        layer = next_.info_name

        # write info and protocol chain into dict
        dict_.__update__({
            layer: info,
            '__next_type__': type(next_),
            '__next_name__': layer,
        })
        self._next = next_  # pylint: disable=attribute-defined-outside-init

        # prepend the IPv6 extension header chain (if any) to the chain
        # reported by the next layer before building our own chain
        if ipv6_exthdr is not None:
            if chain is not None:
                chain = ipv6_exthdr + chain
            else:
                chain = ipv6_exthdr  # type: ignore[unreachable]
        self._protos = ProtoChain(self.__class__, self.alias, basis=chain)  # pylint: disable=attribute-defined-outside-init
        return dict_

    @beholder  # type: ignore[arg-type]
    def _import_next_layer(self, proto: 'int', length: 'Optional[int]' = None, *,  # pylint: disable=arguments-differ
                           packet: 'Optional[dict[str, Any]]' = None, version: 'Literal[4, 6]' = 4,
                           extension: 'bool' = False, payload: 'Optional[bytes]' = None) -> 'Protocol':
        """Import next layer extractor.

        Arguments:
            proto: next layer protocol index
            length: valid (*non-padding*) length
            packet: packet info (passed from :meth:`self.unpack <pcapkit.protocols.protocol.Protocol.unpack>`)
            version: IP protocol version
            extension: if is extension header
            payload: payload from packet. If not provided, will extract from
                :meth:`self.__header__.get_payload <pcapkit.protocols.schema.schema.Schema.get_payload>`

        Returns:
            Instance of next layer.

        """
        if TYPE_CHECKING:
            protocol: 'Type[Protocol]'

        if payload is None:
            file_ = self.__header__.get_payload()
        else:
            file_ = payload
        if length is None:
            length = len(file_)

        if length == 0:
            from pcapkit.protocols.misc.null import NoPayload as protocol  # isort: skip # pylint: disable=import-outside-toplevel
        elif self._sigterm:
            from pcapkit.protocols.misc.raw import Raw as protocol  # isort: skip # pylint: disable=import-outside-toplevel
        else:
            protocol = self.__proto__[proto]  # type: ignore[assignment]
            if isinstance(protocol, ModuleDescriptor):
                protocol = protocol.klass  # type: ignore[unreachable]
                self.__proto__[proto] = protocol  # update mapping upon import

        next_ = protocol(file_, length, version=version, extension=extension,  # type: ignore[abstract]
                         alias=proto, packet=packet, layer=self._exlayer, protocol=self._exproto)
        return next_