Source code for pcapkit.protocols.transport.transport

# -*- coding: utf-8 -*-
"""Base Protocol
===================

.. module:: pcapkit.protocols.transport.transport

:mod:`pcapkit.protocols.transport.transport` contains
:class:`~pcapkit.protocols.transport.transport.Transport`,
which is a base class for transport layer protocols, eg.
:class:`~pcapkit.protocols.transport.transport.tcp.TCP` and
:class:`~pcapkit.protocols.transport.transport.udp.UDP`.

"""
import io
from typing import TYPE_CHECKING, Generic

from pcapkit.corekit.module import ModuleDescriptor
from pcapkit.protocols.protocol import _PT, _ST
from pcapkit.protocols.protocol import ProtocolBase as Protocol
from pcapkit.utilities.exceptions import StructError, UnsupportedCall, stacklevel
from pcapkit.utilities.logging import DEVMODE, logger
from pcapkit.utilities.warnings import RegistryWarning, warn

if TYPE_CHECKING:
    from typing import Any, DefaultDict, Optional, Type

    from typing_extensions import Literal

__all__ = ['Transport']


[docs] class Transport(Protocol[_PT, _ST], Generic[_PT, _ST]): # pylint: disable=abstract-method """Abstract base class for transport layer protocol family.""" if TYPE_CHECKING: #: Protocol index mapping for decoding next layer, #: c.f. :meth:`self._decode_next_layer <pcapkit.protocols.transport.transport.Transport._decode_next_layer>` #: & :meth:`self._import_next_layer <pcapkit.protocols.protocol.Protocol._import_next_layer>`. __proto__: 'DefaultDict[int, ModuleDescriptor[Protocol] | Type[Protocol]]' ########################################################################## # Defaults. ########################################################################## #: Layer of protocol. __layer__ = 'Transport' # type: Literal['Transport'] ########################################################################## # Properties. ########################################################################## # protocol layer @property def layer(self) -> 'Literal["Transport"]': """Protocol layer.""" return self.__layer__ ########################################################################## # Methods. ##########################################################################
[docs] @classmethod def register(cls, code: 'int', protocol: 'ModuleDescriptor[Protocol] | Type[Protocol]') -> 'None': """Register a new protocol class. Notes: The full qualified class name of the new protocol class should be as ``{protocol.module}.{protocol.name}``. Arguments: code: port number protocol: module name Important: This method must be called from a non-abstract class, as the protocol map should be associated directly with specific transport layer protocol type. """ if cls is Transport: raise UnsupportedCall(f'{cls.__name__} is an abstract class') if isinstance(protocol, ModuleDescriptor): protocol = protocol.klass if not issubclass(protocol, Protocol): raise TypeError(f'protocol must be a Protocol subclass, not {protocol!r}') if code in cls.__proto__: warn(f'port {code} already registered, overwriting', RegistryWarning) cls.__proto__[code] = protocol
[docs] @classmethod def analyze(cls, ports: 'tuple[int, int]', payload: 'bytes', **kwargs: 'Any') -> 'Protocol': # type: ignore[override] # pylint: disable=arguments-renamed """Analyse packet payload. Args: ports: Source & destination port numbers. payload: Packet payload. **kwargs: Arbitrary keyword arguments. Returns: Parsed payload as a :class:`~pcapkit.protocols.protocol.Protocol` instance. """ if ports[0] in cls.__proto__: protocol = cls.__proto__[ports[0]] else: protocol = cls.__proto__[ports[1]] if isinstance(protocol, ModuleDescriptor): protocol = protocol.klass payload_io = io.BytesIO(payload) try: report = protocol(payload_io, len(payload), **kwargs) # type: ignore[abstract] except Exception as exc: if isinstance(exc, StructError) and exc.eof: # pylint: disable=no-member from pcapkit.protocols.misc.null import NoPayload as protocol # pylint: disable=import-outside-toplevel # isort:skip else: from pcapkit.protocols.misc.raw import Raw as protocol # pylint: disable=import-outside-toplevel # isort:skip # error = traceback.format_exc(limit=1).strip().rsplit(os.linesep, maxsplit=1)[-1] # log error logger.error(str(exc), exc_info=exc, stack_info=DEVMODE, stacklevel=stacklevel()) report = protocol(payload_io, len(payload), **kwargs) # type: ignore[abstract] return report
########################################################################## # Utilities. ##########################################################################
[docs] def _decode_next_layer(self, dict_: '_PT', ports: 'tuple[int, int]', length: 'Optional[int]' = None, *, # type: ignore[override] packet: 'Optional[dict[str, Any]]' = None) -> '_PT': # pylint: disable=arguments-renamed """Decode next layer protocol. The method will check if the next layer protocol is supported based on the source and destination port numbers. We will use the lower port number from both ports as the primary key to lookup the next layer. Arguments: dict_: info buffer ports: source & destination port numbers length: valid (*non-padding*) length packet: packet info (passed from :meth:`self.unpack <pcapkit.protocols.protocol.Protocol.unpack>`) Returns: Current protocol with next layer extracted. """ sort_port = sorted(ports) if sort_port[0] in self.__proto__: proto = sort_port[0] elif sort_port[1] in self.__proto__: proto = sort_port[1] else: proto = None return super()._decode_next_layer(dict_, proto, length, packet=packet) # type: ignore[arg-type]