Source code for pcapkit.corekit.protochain

# -*- coding: utf-8 -*-
"""Protocol Chain
====================

.. module:: pcapkit.corekit.protochain

:mod:`pcapkit.corekit.protochain` contains special protocol
collection class :class:`~pcapkit.corekit.protochain.ProtoChain`.

"""
import collections.abc
import copy
from typing import TYPE_CHECKING, overload

from pcapkit.utilities.compat import cached_property
from pcapkit.utilities.exceptions import IndexNotFound

if TYPE_CHECKING:
    from typing import Iterator, Optional, Type

    from typing_extensions import Self

    from pcapkit.protocols.protocol import ProtocolBase as Protocol

__all__ = ['ProtoChain']


[docs] class ProtoChain(collections.abc.Sequence): """Protocols chain. Args: proto: New protocol class on the top stack. alias: New protocol alias on the top stack. basis: Original protocol chain as base stacks. """ #: Internal data storage for protocol chain. __data__: 'tuple[tuple[str, Type[Protocol]], ...]' ########################################################################## # Properties. ##########################################################################
[docs] @cached_property def protocols(self) -> 'tuple[Type[Protocol], ...]': """List of protocols in the chain.""" return tuple(data[1] for data in self.__data__)
[docs] @cached_property def aliases(self) -> 'tuple[str, ...]': """Protocol names.""" return tuple(data[0] for data in self.__data__)
@property def chain(self) -> 'str': """Protocol chain string.""" return self.__str__() ########################################################################## # Methods. ##########################################################################
[docs] @classmethod def from_list(cls, data: 'list[Protocol | Type[Protocol]]') -> 'Self': """Create a protocol chain from a list. Args: data: Protocol chain list. """ from pcapkit.protocols.protocol import \ ProtocolBase as Protocol # pylint: disable=import-outside-toplevel temp_data = [] for proto in data: if isinstance(proto, Protocol): alias = proto.alias proto = type(proto) temp_data.append((alias, proto)) obj = cls.__new__(cls) obj.__data__ = tuple(temp_data) return obj
[docs] def index(self, value: 'str | Protocol | Type[Protocol]', start: 'Optional[int]' = None, stop: 'Optional[int]' = None) -> 'int': """First index of ``value``. Args: value: Value to search. start: start offset. stop: stop offset. Raises: IndexNotFound: If the value is not present. """ if start is None: start = 0 elif start < 0: start = max(len(self) + start, 0) if stop is not None and stop < 0: stop += len(self) # prepare comparison values from pcapkit.protocols.protocol import \ ProtocolBase as Protocol # pylint: disable=import-outside-toplevel comp = Protocol.expand_comp(value) pool = self.__data__[start:stop] for idx, (alias, proto) in enumerate(pool): test_comp = (proto, alias.upper(), *(name.upper() for name in proto.id())) for test in comp: if test in test_comp: return start + idx raise IndexNotFound(f'{value!r} is not in {self.__class__.__name__!r}')
[docs] def count(self, value: 'str | Protocol | Type[Protocol]') -> int: """Number of occurrences of ``value``. Args: value: Value to search. """ # prepare comparison values from pcapkit.protocols.protocol import \ ProtocolBase as Protocol # pylint: disable=import-outside-toplevel comp = Protocol.expand_comp(value) cnt = 0 for alias, proto in self.__data__: test_comp = (proto, alias.upper(), *(name.upper() for name in proto.id())) for test in comp: if test in test_comp: cnt += 1 break return cnt
########################################################################## # Data models. ########################################################################## def __init__(self, proto: 'Protocol | Type[Protocol]', alias: 'Optional[str]' = None, *, basis: 'Optional[ProtoChain]' = None): """Initialisation. Args: proto: New protocol class on the top stack. alias: New protocol alias on the top stack. basis: Original protocol chain as base stacks. """ from pcapkit.protocols.protocol import \ ProtocolBase as Protocol # pylint: disable=import-outside-toplevel if isinstance(proto, Protocol): if alias is None: alias = proto.alias proto = type(proto) if alias is None: alias = proto.__name__ temp_data = [(alias, proto)] if basis is not None: temp_data.extend(basis) self.__data__ = tuple(temp_data)
[docs] def __repr__(self) -> 'str': """Returns representation of protocol chain data. Example: >>> protochain ProtoChain(Ethernet, IPv6, Raw) """ return f"ProtoChain({', '.join(map(lambda p: p[1].__name__, self.__data__))})"
[docs] def __str__(self) -> 'str': """Returns formatted hex representation of source data stream. Example: >>> protochain ProtoChain(Ethernet, IPv6, Raw) >>> print(protochain) Ethernet:IPv6:Raw """ return ':'.join(map(lambda p: p[0], self.__data__))
[docs] def __contains__(self, name: 'str | Protocol | Type[Protocol]') -> 'bool': # type: ignore[override] """Returns if ``name`` is in the chain. Args: name: Name to search. Returns: Whether ``name`` is in the chain. """ from pcapkit.protocols.protocol import \ ProtocolBase as Protocol # pylint: disable=import-outside-toplevel comp = Protocol.expand_comp(name) for alias, proto in self.__data__: test_comp = (proto, alias.upper(), *(name.upper() for name in proto.id())) for test in comp: if test in test_comp: return True return False
@overload def __getitem__(self, index: 'int') -> 'str': ... @overload def __getitem__(self, index: 'slice') -> 'tuple[str, ...]': ... def __getitem__(self, index: 'int | slice') -> 'str | tuple[str, ...]': """Subscription (``getitem``) support. Args: index: Indexing key. Returns: Protocol alias at such index. """ if isinstance(index, slice): return tuple(data[0] for data in self.__data__[index]) return self.__data__[index][0] def __iter__(self) -> 'Iterator[tuple[str, Type[Protocol]]]': """Iterator support. Returns: Iterator of protocol alias and class. """ return iter(self.__data__) def __len__(self) -> 'int': """Length support. Returns: Length of protocol chain. """ return len(self.__data__)
[docs] def __add__(self, other: 'ProtoChain') -> 'ProtoChain': """Merge protocol chain by appending protocols from ``other``. Args: other: Protocol chain to be merged. Returns: Merged protocol chain. """ new = copy.copy(self) new.__data__ += other.__data__ return new