Source code for pcapkit.protocols.internet.ipv6

# -*- coding: utf-8 -*-
"""IPv6 - Internet Protocol version 6
========================================

.. module:: pcapkit.protocols.internet.ipv6

:mod:`pcapkit.protocols.internet.ipv6` contains
:class:`~pcapkit.protocols.internet.ipv6.IPv6` only,
which implements extractor for Internet Protocol
version 6 (IPv6) [*]_, whose structure is described
as below:

======= ========= ===================== =======================================
Octets      Bits        Name                    Description
======= ========= ===================== =======================================
  0           0   ``ip.version``              Version (``6``)
  0           4   ``ip.class``                Traffic Class
  1          12   ``ip.label``                Flow Label
  4          32   ``ip.payload``              Payload Length (header excludes)
  6          48   ``ip.next``                 Next Header
  7          56   ``ip.limit``                Hop Limit
  8          64   ``ip.src``                  Source Address
  24        192   ``ip.dst``                  Destination Address
======= ========= ===================== =======================================

.. [*] https://en.wikipedia.org/wiki/IPv6_packet

"""
import ipaddress
from typing import TYPE_CHECKING

from pcapkit.const.ipv6.extension_header import ExtensionHeader as Enum_ExtensionHeader
from pcapkit.const.reg.transtype import TransType as Enum_TransType
from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.corekit.multidict import OrderedMultiDict
from pcapkit.corekit.protochain import ProtoChain
from pcapkit.protocols.data.internet.ipv6 import IPv6 as Data_IPv6
from pcapkit.protocols.internet.ip import IP
from pcapkit.protocols.schema.internet.ipv6 import IPv6 as Schema_IPv6
from pcapkit.utilities.decorators import beholder

if TYPE_CHECKING:
    from enum import IntEnum as StdlibEnum
    from ipaddress import IPv6Address
    from typing import Any, Optional, Type

    from aenum import IntEnum as AenumEnum
    from typing_extensions import Literal

    from pcapkit.protocols.protocol import ProtocolBase as Protocol

__all__ = ['IPv6']


[docs] class IPv6(IP[Data_IPv6, Schema_IPv6], schema=Schema_IPv6, data=Data_IPv6): """This class implements Internet Protocol version 6.""" ########################################################################## # Properties. ########################################################################## @property def name(self) -> 'Literal["Internet Protocol version 6"]': """Name of corresponding protocol.""" return 'Internet Protocol version 6' @property def length(self) -> 'Literal[40]': """Header length of corresponding protocol.""" return 40 @property def protocol(self) -> 'Enum_TransType': """Name of next layer protocol.""" return self._info.protocol # source IP address @property def src(self) -> 'IPv6Address': """Source IP address.""" return self._info.src # destination IP address @property def dst(self) -> 'IPv6Address': """Destination IP address.""" return self._info.dst @property def extension_headers(self) -> 'OrderedMultiDict[Enum_ExtensionHeader, Protocol]': """IPv6 extension header records.""" return self._exthdr ########################################################################## # Methods. ##########################################################################
[docs] def read(self, length: 'Optional[int]' = None, *, __packet__: 'Optional[dict[str, Any]]' = None, **kwargs: 'Any') -> 'Data_IPv6': # pylint: disable=unused-argument """Read Internet Protocol version 6 (IPv6). Structure of IPv6 header [:rfc:`2460`]: .. code-block:: text 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |Version| Traffic Class | Flow Label | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Payload Length | Next Header | Hop Limit | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | + + | | + Source Address + | | + + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | + + | | + Destination Address + | | + + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Args: length: Length of packet data. __packet__: Optional packet data. **kwargs: Arbitrary keyword arguments. Returns: Parsed packet data. """ if length is None: length = len(self) schema = self.__header__ ipv6 = Data_IPv6.from_dict({ 'version': schema.hextet['version'], 'class': schema.hextet['class'], 'label': schema.hextet['label'], 'payload': schema.length, 'next': schema.next, 'limit': schema.limit, 'src': schema.src, 'dst': schema.dst, }) # type: Data_IPv6 # update packet info if __packet__ is None: __packet__ = {} __packet__.update({ 'src': ipv6.src, 'dst': ipv6.dst, }) return self._decode_next_layer(ipv6, schema.next, ipv6.payload, packet=__packet__) # pylint: disable=no-member
[docs] def make(self, traffic_class: 'int' = 0, flow_label: 'int' = 0, next: 'Enum_TransType | StdlibEnum | AenumEnum | str | int' = Enum_TransType.UDP, next_default: 'Optional[int]' = None, next_namespace: 'Optional[dict[str, int] | dict[int, str] | Type[StdlibEnum] | Type[AenumEnum]]' = None, # pylint: disable=line-too-long next_reversed: 'bool' = False, hop_limit: 'int' = 64, # reasonable default src: 'IPv6Address | str | bytes | int' = '::1', dst: 'IPv6Address | str | bytes | int' = '::', payload: 'bytes | Protocol | Schema' = b'', **kwargs: 'Any') -> 'Schema_IPv6': """Make (construct) packet data. Args: traffic_class: Traffic class. flow_label: Flow label. next: Next header. next_default: Default value of next header. next_namespace: Namespace of next header. next_reversed: Whether to reverse the namespace of next header. hop_limit: Hop limit. src: Source IP address. dst: Destination IP address. payload: Payload data. **kwargs: Arbitrary keyword arguments. Returns: Constructed packet data. """ next_val = self._make_index(next, next_default, namespace=next_namespace, reversed=next_reversed, pack=False) return Schema_IPv6( hextet={ 'version': 6, 'class': traffic_class, 'label': flow_label, }, length=len(payload), next=next_val, # type: ignore[arg-type] limit=hop_limit, src=src, dst=dst, payload=payload, )
[docs] @classmethod def id(cls) -> 'tuple[Literal["IPv6"]]': # type: ignore[override] """Index ID of the protocol. Returns: Index ID of the protocol. """ return ('IPv6',)
########################################################################## # Data models. ########################################################################## def __length_hint__(self) -> 'Literal[40]': """Return an estimated length for the object.""" return 40
[docs] @classmethod def __index__(cls) -> 'Enum_TransType': # pylint: disable=invalid-index-returned """Numeral registry index of the protocol. Returns: Numeral registry index of the protocol in `IANA`_. .. _IANA: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml """ return Enum_TransType.IPv6 # type: ignore[return-value]
########################################################################## # Utilities. ##########################################################################
[docs] @classmethod def _make_data(cls, data: 'Data_IPv6') -> 'dict[str, Any]': # type: ignore[override] """Create key-value pairs from ``data`` for protocol construction. Args: data: protocol data Returns: Key-value pairs for protocol construction. """ return { 'traffic_class': data['class'], 'flow_label': data.label, 'next': data.next, 'hop_limit': data.limit, 'src': data.src, 'dst': data.dst, 'payload': cls._make_payload(data) }
def _read_ip_hextet(self) -> 'tuple[int, int, int]': """Read first four hextets of IPv6. Returns: Parsed hextets data, including version number, traffic class and flow label. """ _htet = self._read_fileng(4).hex() _vers = int(_htet[0], base=16) # version number (6) _tcls = int(_htet[0:2], base=16) # traffic class _flow = int(_htet[2:], base=16) # flow label return (_vers, _tcls, _flow) def _read_ip_addr(self) -> 'IPv6Address': """Read IP address. Returns: Parsed IP address. """ return ipaddress.ip_address(self._read_fileng(16)) # type: ignore[return-value]
[docs] def _decode_next_layer(self, ipv6: 'Data_IPv6', proto: 'Optional[int]' = None, # type: ignore[override] # pylint: disable=arguments-differ,arguments-renamed length: 'Optional[int]' = None, *, packet: 'Optional[dict[str, Any]]' = None) -> 'Data_IPv6': # pylint: disable=arguments-differ """Decode next layer extractor. Arguments: ipv6: info buffer proto: next layer protocol name length: valid (*not padding*) length packet: packet info (passed from :meth:`self.unpack <pcapkit.protocols.protocol.Protocol.unpack>`) Returns: Current protocol with next layer extracted. """ #: Extension headers. self._exthdr = OrderedMultiDict() # type: OrderedMultiDict[Enum_ExtensionHeader, Protocol] # pylint: disable=attribute-defined-outside-init hdr_len = self.length # header length raw_len = ipv6.payload # payload length _protos = [] # ProtoChain buffer # traverse if next header is an extensive header payload = self.__header__.get_payload() while True: try: ex_proto = Enum_ExtensionHeader(proto) except ValueError: break # # directly break when No Next Header occurs # if proto.name == 'IPv6-NoNxt': # proto = None # break # make protocol name next_ = self._import_next_layer(proto, packet=packet, version=6, extension=True, payload=payload) # type: ignore[misc,call-arg,arg-type] info = next_.info name = next_.alias.lstrip('IPv6-').lower() ipv6.__update__({ name: info, }) # record protocol name # self._protos = ProtoChain(name, chain, alias) _protos.append(next_) proto = info.next # update header & payload length hdr_len += next_.length # type: ignore[assignment] raw_len -= next_.length # keep record of extension headers self._exthdr.add(ex_proto, next_) # keep original data after fragment header if ex_proto == Enum_ExtensionHeader.IPv6_Frag: ipv6.__update__({ 'fragment': self._read_packet(header=hdr_len, payload=raw_len), }) break # update payload for next extension header payload = payload[next_.length:] # record real header & payload length (headers exclude) ipv6.__update__({ 'hdr_len': hdr_len, 'raw_len': raw_len, # update next header 'protocol': proto, }) ipv6_exthdr = ProtoChain.from_list(_protos) # type: ignore[arg-type] return super()._decode_next_layer(ipv6, proto, raw_len, packet=packet, ipv6_exthdr=ipv6_exthdr, payload=payload)
@beholder # type: ignore[arg-type] def _import_next_layer(self, proto: 'int', length: 'Optional[int]' = None, *, # pylint: disable=arguments-differ packet: 'Optional[dict[str, Any]]' = None, version: 'Literal[4, 6]' = 4, extension: 'bool' = False, payload: 'Optional[bytes]' = None) -> 'Protocol': """Import next layer extractor. Arguments: proto: next layer protocol index length: valid (*non-padding*) length packet: packet info (passed from :meth:`self.unpack <pcapkit.protocols.protocol.Protocol.unpack>`) version: IP protocol version extension: if is extension header payload: payload from packet. If not provided, will extract from :meth:`self.__header__.get_payload <pcapkit.protocols.schema.schema.Schema.get_payload>` Returns: Instance of next layer. """ if TYPE_CHECKING: protocol: 'Type[Protocol]' if payload is None: file_ = self.__header__.get_payload() else: file_ = payload if length is None: length = len(file_) if length == 0: from pcapkit.protocols.misc.null import \ NoPayload as protocol # isort: skip # pylint: disable=import-outside-toplevel elif self._sigterm: from pcapkit.protocols.misc.raw import \ Raw as protocol # isort: skip # pylint: disable=import-outside-toplevel else: protocol = self.__proto__[proto] # type: ignore[assignment] if isinstance(protocol, ModuleDescriptor): protocol = protocol.klass # type: ignore[unreachable] self.__proto__[proto] = protocol # update mapping upon import next_ = protocol(file_, length, version=version, extension=extension, # type: ignore[abstract] alias=proto, packet=packet, layer=self._exlayer, protocol=self._exproto) return next_