Source code for pcapkit.utilities.decorators

# -*- coding: utf-8 -*-
# pylint: disable=protected-access
"""Decorator Functions
=========================

.. module:: pcapkit.utilities.decorators

:mod:`pcapkit.utilities.decorators` contains several useful
decorators, including :func:`~pcapkit.utilities.decorators.seekset`,
:func:`~pcapkit.utilities.decorators.beholder` and
:func:`~pcapkit.utilities.decorators.prepare`.

"""
import functools
import io
import os
import traceback
from typing import TYPE_CHECKING, cast

from pcapkit.utilities.exceptions import StructError, stacklevel
from pcapkit.utilities.logging import DEVMODE, VERBOSE, logger

if TYPE_CHECKING:
    from typing import IO, Any, Callable, Optional, Type, TypeVar

    from typing_extensions import Concatenate, ParamSpec

    from pcapkit.protocols.protocol import ProtocolBase as Protocol
    from pcapkit.protocols.schema.schema import Schema

    P = ParamSpec('P')
    R_seekset = TypeVar('R_seekset')
    R_beholder = TypeVar('R_beholder', bound=Protocol)
    R_prepare = TypeVar('R_prepare', bound=Schema)

__all__ = ['seekset', 'beholder', 'prepare']


[docs] def seekset(func: 'Callable[Concatenate[Protocol, P], R_seekset]') -> 'Callable[P, R_seekset]': """Read file from start then set back to original. Important: This decorator function is designed for decorating *class methods*. The decorator will keep the current offset of :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>`, then call the decorated function. Afterwards, it will rewind the offset of :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>` to the original and returns the return value from the decorated function. Note: The decorated function should have following signature:: func(self: 'pcapkit.protocols.protocol.ProtocolBase', *args: 'typing.Any', **kwargs: 'typing.Any') -> 'typing.Any' See Also: :meth:`pcapkit.protocols.protocol.Protocol._read_packet` :param func: decorated function :meta decorator: """ @functools.wraps(func) def seekcur(*args: 'P.args', **kw: 'P.kwargs') -> 'R_seekset': # extract self object self = cast('Protocol', args[0]) # move file pointer seek_cur = self._file.tell() self._file.seek(self._seekset, os.SEEK_SET) # call method return_ = func(*args, **kw) # reset file pointer self._file.seek(seek_cur, os.SEEK_SET) return return_ return seekcur
[docs] def beholder(func: 'Callable[Concatenate[Protocol, int, Optional[int], P], R_beholder]') -> 'Callable[P, R_beholder]': """Behold extraction procedure. Important: This decorator function is designed for decorating *class methods*. This decorator first keep the current offset of :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>`, then try to call the decorated function. Should any exception raised, it will re-parse the :attr:`self._file <pcapkit.protocols.protocol.Protocol._file>` as :class:`~pcapkit.protocols.misc.raw.Raw` protocol. Note: The decorated function should have following signature:: func(self: 'pcapkit.protocols.protocol.ProtocolBase', proto: 'int', length: 'typing.Optional[int]', *args: 'typing.Any', **kwargs: 'typing.Any') -> 'pcapkit.protocols.protocol.ProtocolBase' See Also: :meth:`pcapkit.protocols.protocol.Protocol._decode_next_layer` :param func: decorated function :meta decorator: """ @functools.wraps(func) def behold(*args: 'P.args', **kwargs: 'P.kwargs') -> 'R_beholder': # extract self object & args self = cast('R_beholder', args[0]) try: length = cast('int', args[2]) except IndexError: length = None # record file pointer try: # call method return func(*args, **kwargs) except Exception as exc: if isinstance(exc, StructError) and exc.eof: # pylint: disable=no-member from pcapkit.protocols.misc.null import NoPayload as protocol # isort: skip # pylint: disable=import-outside-toplevel else: from pcapkit.protocols.misc.raw import Raw as protocol # type: ignore[assignment] # isort: skip # pylint: disable=import-outside-toplevel # error = traceback.format_exc(limit=1).strip().rsplit(os.linesep, maxsplit=1)[-1] # log error logger.warning(str(exc), stack_info=DEVMODE, stacklevel=stacklevel()) if VERBOSE: logger.error('The following error occurred while parsing the packet:') traceback.print_exc() file_ = self.__header__.get_payload() next_ = protocol(file_, length, error=str(exc)) return cast('R_beholder', next_) return behold
[docs] def prepare(func: 'Callable[Concatenate[Type[R_prepare], bytes | IO[bytes], Optional[int], Optional[dict[str, Any]], P], R_prepare]') -> 'Callable[P, R_prepare]': """Prepare schema packet data before unpacking. Important: This decorate function is designed for decorating the :meth:`Schema.unpack <pcapkit.protocols.schema.schema.Schema.unpack>` *class method*. This decorator will revise the parameter list provided to the original :meth:`Schema.unpack <pcapkit.protocols.schema.schema.Schema.unpack>` method and extract necessary information based on the given parameters, then provide the revised version of parameter list to the original method. Note: The decorated function should have following signature:: func(cls: 'typing.Type[pcapkit.protocols.schema.schema.Schema]', data: 'bytes | typing.IO[bytes]', length: 'Optional[int], packet: 'Optional[dict[str, Any]', *args: 'typing.Any', **kwargs: 'Any') -> 'pcapkit.protocols.schema.schema.Schema' See Also: :meth:`pcapkit.protocols.schema.schema.Schema.unpack` :param func: decorated function :meta decorator: """ @functools.wraps(func) def unpack(*args: 'P.args', **kwargs: 'P.kwargs') -> 'R_prepare': cls = cast('Type[R_prepare]', args[0]) data = cast('bytes | IO[bytes]', args[1]) length = cast('Optional[int]', args[2]) packet = cast('Optional[dict[str, Any]]', args[3]) if isinstance(data, bytes): length = len(data) if length is None else length data = io.BytesIO(data) else: if length is None: current = data.tell() length = data.seek(0, io.SEEK_END) - current data.seek(current) if length == 0: raise EOFError if packet is None: packet = {} packet['__length__'] = length # call the user customised preparation method # then proceed with the unpacking process # and eventually revise the schema data cls.pre_unpack(packet) schema = func(cls, data, length, packet) ret = schema.post_process(packet) return cast('R_prepare', ret) return unpack