# Source code for pcapkit.protocols.transport.udp

# -*- coding: utf-8 -*-
# mypy: disable-error-code=dict-item
"""UDP - User Datagram Protocol
==================================

.. module:: pcapkit.protocols.transport.udp

:mod:`pcapkit.protocols.transport.udp` contains
:class:`~pcapkit.protocols.transport.udp.UDP` only,
which implements extractor for User Datagram Protocol
(UDP) [*]_, whose structure is described as below:

======= ========= ===================== ===============================
Octets      Bits        Name                    Description
======= ========= ===================== ===============================
  0           0   ``udp.srcport``             Source Port
  2          16   ``udp.dstport``             Destination Port
  4          32   ``udp.len``                 Length (header included)
  6          48   ``udp.checksum``            Checksum
======= ========= ===================== ===============================

.. [*] https://en.wikipedia.org/wiki/User_Datagram_Protocol

"""
import collections
from typing import TYPE_CHECKING

from pcapkit.const.reg.transtype import TransType as Enum_TransType
from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.protocols.data.transport.udp import UDP as Data_UDP
from pcapkit.protocols.schema.transport.udp import UDP as Schema_UDP
from pcapkit.protocols.transport.transport import Transport

if TYPE_CHECKING:
    from typing import Any, Optional

    from typing_extensions import Literal

    from pcapkit.const.reg.apptype import AppType as Enum_AppType
    from pcapkit.protocols.protocol import ProtocolBase as Protocol
    from pcapkit.protocols.schema.schema import Schema

__all__ = ['UDP']


class UDP(Transport[Data_UDP, Schema_UDP],
          schema=Schema_UDP, data=Data_UDP):
    """This class implements User Datagram Protocol.

    This class currently supports parsing of the following protocols, which are
    registered in the :attr:`self.__proto__ <pcapkit.protocols.transport.udp.UDP.__proto__>`
    attribute:

    .. list-table::
       :header-rows: 1

       * - Port Number
         - Protocol
       * - 80
         - :class:`pcapkit.protocols.application.http.HTTP`

    """

    ##########################################################################
    # Defaults.
    ##########################################################################

    #: DefaultDict[int, ModuleDescriptor[Protocol] | Type[Protocol]]: Protocol
    #: index mapping for decoding next layer, c.f.
    #: :meth:`self._decode_next_layer <pcapkit.protocols.transport.transport.Transport._decode_next_layer>`
    #: & :meth:`self._import_next_layer <pcapkit.protocols.protocol.Protocol._import_next_layer>`.
    # Any port without an explicit entry falls back to the ``Raw`` descriptor
    # through the ``defaultdict`` factory.
    __proto__ = collections.defaultdict(
        lambda: ModuleDescriptor('pcapkit.protocols.misc.raw', 'Raw'),
        {
            80: ModuleDescriptor('pcapkit.protocols.application.http', 'HTTP'),  # HTTP
        },
    )

    ##########################################################################
    # Properties.
    ##########################################################################

    @property
    def name(self) -> 'Literal["User Datagram Protocol"]':
        """Name of current protocol."""
        return 'User Datagram Protocol'

    @property
    def length(self) -> 'Literal[8]':
        """Header length of current protocol."""
        return 8

    @property
    def src(self) -> 'Enum_AppType':
        """Source port."""
        return self._info.srcport

    @property
    def dst(self) -> 'Enum_AppType':
        """Destination port."""
        return self._info.dstport

    ##########################################################################
    # Methods.
    ##########################################################################

    def read(self, length: 'Optional[int]' = None, **kwargs: 'Any') -> 'Data_UDP':  # pylint: disable=unused-argument
        """Read User Datagram Protocol (UDP).

        Structure of UDP header [:rfc:`768`]:

        .. code-block:: text

            0      7 8     15 16    23 24    31
           +--------+--------+--------+--------+
           |     Source      |   Destination   |
           |      Port       |      Port       |
           +--------+--------+--------+--------+
           |                 |                 |
           |     Length      |    Checksum     |
           +--------+--------+--------+--------+
           |
           |          data octets ...
           +---------------- ...

        Args:
            length: Length of packet data.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Parsed packet data.

        """
        # ``length`` is resolved here but not consulted again in this method;
        # the payload size below is taken from the header's own length field.
        if length is None:
            length = len(self)
        schema = self.__header__

        udp = Data_UDP(
            srcport=schema.srcport,
            dstport=schema.dstport,
            len=schema.len,
            checksum=schema.checksum,
        )

        # The UDP length field counts the 8-octet header as well as the data
        # octets (RFC 768), so the payload is ``udp.len - 8`` octets long.
        # Both port numbers are handed over as the next-layer protocol hint.
        return self._decode_next_layer(udp, (udp.srcport.port, udp.dstport.port), udp.len - 8)

    def make(self,
             srcport: 'Enum_AppType | int' = 0,
             dstport: 'Enum_AppType | int' = 0,
             checksum: 'bytes' = b'\x00\x00',
             payload: 'bytes | Schema | Protocol' = b'',
             **kwargs: 'Any') -> 'Schema_UDP':
        """Make (construct) packet data.

        Args:
            srcport: Source port.
            dstport: Destination port.
            checksum: Checksum.
            payload: Payload data.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            Constructed packet data.

        """
        # ``len`` is always derived (8-octet header plus payload) and cannot
        # be supplied by the caller; ``**kwargs`` is accepted but not used.
        return Schema_UDP(
            srcport=srcport,
            dstport=dstport,
            len=8 + len(payload),
            checksum=checksum,
            payload=payload,
        )

    ##########################################################################
    # Data models.
    ##########################################################################

    def __length_hint__(self) -> 'Literal[8]':
        """Return an estimated length for the object."""
        return 8

    @classmethod
    def __index__(cls) -> 'Enum_TransType':  # pylint: disable=invalid-index-returned
        """Numeral registry index of the protocol.

        Returns:
            Numeral registry index of the protocol in `IANA`_.

        .. _IANA: https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml

        """
        return Enum_TransType.UDP  # type: ignore[return-value]

    ##########################################################################
    # Utilities.
    ##########################################################################

    @classmethod
    def _make_data(cls, data: 'Data_UDP') -> 'dict[str, Any]':  # type: ignore[override]
        """Create key-value pairs from ``data`` for protocol construction.

        Args:
            data: protocol data

        Returns:
            Key-value pairs for protocol construction.

        """
        # The keys mirror the keyword parameters of :meth:`make`; ``len`` is
        # left out because :meth:`make` recomputes it from the payload.
        return {
            'srcport': data.srcport,
            'dstport': data.dstport,
            'checksum': data.checksum,
            'payload': cls._make_payload(data),
        }