Source code for pcapkit.corekit.fields.misc

# -*- coding: utf-8 -*-
"""miscellaneous field class"""

import copy
import io
from typing import TYPE_CHECKING, TypeVar, cast

from pcapkit.corekit.fields.field import FieldBase, NoValue
from pcapkit.utilities.exceptions import FieldError, NoDefaultValue

__all__ = [
    'ConditionalField', 'PayloadField',
    'SwitchField', 'ForwardMatchField',
    'NoValueField',
]

if TYPE_CHECKING:
    from typing import IO, Any, Callable, Optional, Type

    from typing_extensions import Self

    from pcapkit.corekit.fields.field import NoValueType
    from pcapkit.protocols.protocol import ProtocolBase as Protocol
    from pcapkit.protocols.schema.schema import Schema

_TC = TypeVar('_TC')
_TS = TypeVar('_TS', bound='Schema')
_TP = TypeVar('_TP', bound='Protocol')
_TN = TypeVar('_TN', bound='NoValueType')


[docs] class NoValueField(FieldBase[_TN]): """Schema field for no value type (or :obj:`None`).""" _default = NoValue @property def template(self) -> 'str': """Field template.""" return '0s' @property def length(self) -> 'int': """Field size.""" return 0
[docs] def pack(self, value: 'Optional[_TN]', packet: 'dict[str, Any]') -> 'bytes': """Pack field value into :obj:`bytes`. Args: value: Field value. packet: Packet data. Returns: Packed field value. """ return b''
[docs] def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TN': """Unpack field value from :obj:`bytes`. Args: buffer: Field buffer. packet: Packet data. Returns: Unpacked field value. """ return None # type: ignore[return-value]
[docs] class ConditionalField(FieldBase[_TC]): """Conditional value for protocol fields. Args: field: Field instance. condition: Field condition function (this function should return a bool value and accept the current packet :class:`pcapkit.corekit.infoclass.Info` as its only argument). """ @property def name(self) -> 'str': """Field name.""" return self._field.name @name.setter def name(self, value: 'str') -> 'None': """Set field name.""" self._field.name = value @property def default(self) -> '_TC | NoValueType': """Field default value.""" return self._field.default @default.setter def default(self, value: '_TC | NoValueType') -> 'None': """Set field default value.""" self._field.default = value @default.deleter def default(self) -> 'None': """Delete field default value.""" self._field.default = NoValue @property def template(self) -> 'str': """Field template.""" return self._field.template @property def length(self) -> 'int': """Field size.""" return self._field.length @property def optional(self) -> 'bool': """Field is optional.""" return True @property def field(self) -> 'FieldBase[_TC]': """Field instance.""" return self._field def __init__(self, field: 'FieldBase[_TC]', # pylint: disable=super-init-not-called condition: 'Callable[[dict[str, Any]], bool]') -> 'None': self._field = field # type: FieldBase[_TC] self._condition = condition
[docs] def __call__(self, packet: 'dict[str, Any]') -> 'Self': """Update field attributes. Arguments: packet: Packet data. Returns: Updated field instance. This method will return a new instance of :class:`ConditionalField` instead of updating the current instance. """ new_self = copy.copy(self) if new_self._condition(packet): new_self._field = new_self._field(packet) return new_self
[docs] def pre_process(self, value: '_TC', packet: 'dict[str, Any]') -> 'Any': # pylint: disable=unused-argument """Process field value before construction (packing). Arguments: value: Field value. packet: Packet data. Returns: Processed field value. """ return self._field.pre_process(value, packet)
[docs] def pack(self, value: 'Optional[_TC]', packet: 'dict[str, Any]') -> 'bytes': """Pack field value into :obj:`bytes`. Args: value: Field value. packet: Packet data. Returns: Packed field value. """ if not self._condition(packet): return b'' return self._field.pack(value, packet)
[docs] def post_process(self, value: 'Any', packet: 'dict[str, Any]') -> '_TC': # pylint: disable=unused-argument """Process field value after parsing (unpacking). Args: value: Field value. packet: Packet data. Returns: Processed field value. """ return self._field.post_process(value, packet)
[docs] def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TC': """Unpack field value from :obj:`bytes`. Args: buffer: Field buffer. packet: Packet data. Returns: Unpacked field value. """ if not self._condition(packet): return self._field.default # type: ignore[return-value] return self._field.unpack(buffer, packet)
[docs] def test(self, packet: 'dict[str, Any]') -> 'bool': """Test field condition. Arguments: packet: Current packet. Returns: bool: Test result. """ return self._condition(packet)
[docs] class PayloadField(FieldBase[_TP]): """Payload value for protocol fields. Args: length: Field size (in bytes); if a callable is given, it should return an integer value and accept the current packet as its only argument. default: Field default value. protocol: Payload protocol. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """ @property def template(self) -> 'str': """Field template.""" return self._template @property def length(self) -> 'int': """Field size.""" return self._length @property def optional(self) -> 'bool': """Field is optional.""" return True @property def protocol(self) -> 'Type[_TP]': """Payload protocol.""" if self._protocol is None: from pcapkit.protocols.misc.raw import Raw # type: ignore[unreachable] # pylint: disable=import-outside-top-level # isort:skip return Raw return self._protocol @protocol.setter def protocol(self, protocol: 'Type[_TP] | str') -> 'None': """Set payload protocol. Arguments: protocol: Payload protocol. """ if isinstance(protocol, str): from pcapkit.protocols import __proto__ # pylint: disable=import-outside-top-level protocol = cast('Type[_TP]', __proto__.get(protocol)) self._protocol = protocol def __init__(self, length: 'int | Callable[[dict[str, Any]], int]' = lambda _: -1, default: '_TP | NoValueType | bytes' = NoValue, protocol: 'Optional[Type[_TP]]' = None, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': #self._name = '<payload>' self._default = default # type: ignore[assignment] self._protocol = protocol # type: ignore[assignment] self._callback = callback self._length_callback = None if not isinstance(length, int): self._length_callback, length = length, -1 self._length = length self._template = f'{self._length}s' if self._length >= 0 else '1024s' # use a reasonable default
[docs] def __call__(self, packet: 'dict[str, Any]') -> 'Self': """Update field attributes. Args: packet: Packet data. Returns: Updated field instance. This method will return a new instance of :class:`PayloadField` instead of updating the current instance. """ new_self = copy.copy(self) new_self._callback(new_self, packet) if new_self._length_callback is not None: new_self._length = new_self._length_callback(packet) new_self._template = f'{new_self._length}s' return new_self
[docs] def pack(self, value: 'Optional[_TP | Schema | bytes]', packet: 'dict[str, Any]') -> 'bytes': """Pack field value into :obj:`bytes`. Args: value: Field value. packet: Packet data. Returns: Packed field value. """ if value is None: if self._default is NoValue: raise NoDefaultValue(f'Field {self.name} has no default value.') value = cast('_TP', self._default) from pcapkit.protocols.schema.schema import \ Schema # pylint: disable=import-outside-top-level if isinstance(value, bytes): return value if isinstance(value, Schema): return value.pack() return value.data # type: ignore[union-attr]
[docs] def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TP': """Unpack field value from :obj:`bytes`. Args: buffer: Field buffer. packet: Packet data. Returns: Unpacked field value. """ if self._protocol is None: if isinstance(buffer, bytes): # type: ignore[unreachable] return cast('_TP', buffer) return cast('_TP', buffer.read()) if isinstance(buffer, bytes): file = io.BytesIO(buffer) # type: IO[bytes] else: file = buffer length = self._length if self._length > 0 else None return self._protocol(file, length) # type: ignore[abstract]
[docs] class SwitchField(FieldBase[_TC]): """Conditional type-switching field for protocol schema. Args: selector: Callable function to select field type, which should accept the current packet as its only argument and return a field instance. """ @property def name(self) -> 'str': """Field name.""" return self._field.name @name.setter def name(self, value: 'str') -> 'None': """Set field name.""" self._field.name = value @property def default(self) -> '_TC | NoValueType': """Field default value.""" return self._field.default @default.setter def default(self, value: '_TC | NoValueType') -> 'None': """Set field default value.""" self._field.default = value @default.deleter def default(self) -> 'None': """Delete field default value.""" self._field.default = NoValue @property def template(self) -> 'str': """Field template.""" return self._field.template @property def length(self) -> 'int': """Field size.""" return self._field.length @property def optional(self) -> 'bool': """Field is optional.""" return True @property def field(self) -> 'FieldBase[_TC]': """Field instance.""" return self._field def __init__(self, selector: 'Callable[[dict[str, Any]], FieldBase[_TC]]' = lambda _: NoValueField()) -> 'None': # type: ignore[assignment,return-value] #self._name = '<switch>' self._field = cast('FieldBase[_TC]', NoValueField()) self._selector = selector
[docs] def __call__(self, packet: 'dict[str, Any]') -> 'SwitchField[_TC]': """Call field. Args: packet: Packet data. Returns: New field instance. This method will return a new instance of :class:`SwitchField` instead of updating the current instance. """ new_self = copy.copy(self) new_self._field = new_self._selector(packet)(packet) new_self._field.name = self.name return new_self
[docs] def pre_process(self, value: '_TC', packet: 'dict[str, Any]') -> 'Any': # pylint: disable=unused-argument """Process field value before construction (packing). Arguments: value: Field value. packet: Packet data. Returns: Processed field value. """ if self._field is None: return NoValue # type: ignore[unreachable] return self._field.pre_process(value, packet)
[docs] def pack(self, value: 'Optional[_TC]', packet: 'dict[str, Any]') -> 'bytes': """Pack field value into :obj:`bytes`. Args: value: Field value. packet: Packet data. Returns: Packed field value. """ if self._field is None: return b'' # type: ignore[unreachable] return self._field.pack(value, packet)
[docs] def post_process(self, value: 'Any', packet: 'dict[str, Any]') -> '_TC': # pylint: disable=unused-argument """Process field value after parsing (unpacking). Args: value: Field value. packet: Packet data. Returns: Processed field value. """ if self._field is None: return NoValue # type: ignore[unreachable] return self._field.post_process(value, packet)
[docs] def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TC': """Unpack field value from :obj:`bytes`. Args: buffer: Field buffer. packet: Packet data. Returns: Unpacked field value. """ if self._field is None: return None # type: ignore[unreachable] return self._field.unpack(buffer, packet)
[docs] class SchemaField(FieldBase[_TS]): """Schema field for protocol schema. Args: length: Field size (in bytes); if a callable is given, it should return an integer value and accept the current packet as its only argument. schema: Field schema. default: Default value for field. packet: Optional packet data for unpacking and/or packing purposes. callback: Callback function to process field value, which should accept the current field and the current packet as its arguments. """ @property def length(self) -> 'int': """Field size.""" return self._length # type: ignore[has-type] @property def optional(self) -> 'bool': """Field is optional.""" return True @property def schema(self) -> 'Type[_TS]': """Field schema.""" return self._schema def __init__(self, length: 'int | Callable[[dict[str, Any]], int]' = lambda _: -1, schema: 'Optional[Type[_TS]]' = None, default: '_TS | NoValueType | bytes' = NoValue, packet: 'Optional[dict[str, Any]]' = None, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': #self._name = '<schema>' self._callback = callback if packet is None: packet = {} self._packet = packet if schema is None: raise FieldError('Schema field must have a schema.') self._schema = schema if isinstance(default, bytes): default = cast('_TS', schema.unpack(default)) # type: ignore[call-arg,misc] self._default = default self._length_callback = None if not isinstance(length, int): self._length_callback, length = length, -1 self._length = length self._template = f'{self._length}s' if self._length >= 0 else '1024s' # use a reasonable default
[docs] def __call__(self, packet: 'dict[str, Any]') -> 'Self': """Update field attributes. Args: packet: Packet data. Returns: New field instance. This method will return a new instance of :class:`SchemaField` instead of updating the current instance. """ new_self = copy.copy(self) new_self._callback(new_self, packet) if new_self._length_callback is not None: new_self._length = new_self._length_callback(packet) new_self._template = f'{new_self._length}s' if self._length >= 0 else '1024s' # use a reasonable default return new_self
[docs] def pack(self, value: 'Optional[_TS | bytes]', packet: 'dict[str, Any]') -> 'bytes': """Pack field value into :obj:`bytes`. Args: value: Field value. packet: Packet data. Returns: Packed field value. Notes: We will use ``packet`` as a ``__packet__`` key in the packet context passed to the underlying :class:`~pcapkit.protocols.schema.schema.Schema` for packing purposes. """ if value is None: if self._default is NoValue: raise NoDefaultValue(f'Field {self.name} has no default value.') value = cast('_TS', self._default) if isinstance(value, bytes): return value packet.update(self._packet) return value.pack({ '__packet__': packet, })
[docs] def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TS': """Unpack field value from :obj:`bytes`. Args: buffer: Field buffer. packet: Packet data. Returns: Unpacked field value. Notes: We will use ``packet`` as a ``__packet__`` key in the packet context passed to the underlying :class:`~pcapkit.protocols.schema.schema.Schema` for unpacking purposes. """ if isinstance(buffer, bytes): file = io.BytesIO(buffer) # type: IO[bytes] else: file = buffer packet.update(self._packet) return cast('_TS', self._schema.unpack(file, self.length, { # type: ignore[call-arg,misc] '__packet__': packet, }))
[docs] class ForwardMatchField(FieldBase[_TC]): """Schema field for non-capturing forward matching. Args: field: Field to forward match. """ @property def name(self) -> 'str': """Field name.""" return self._field.name @name.setter def name(self, value: 'str') -> 'None': """Set field name.""" self._field.name = value @property def default(self) -> '_TC | NoValueType': """Field default value.""" return self._field.default @default.setter def default(self, value: '_TC | NoValueType') -> 'None': """Set field default value.""" self._field.default = value @default.deleter def default(self) -> 'None': """Delete field default value.""" self._field.default = NoValue @property def template(self) -> 'str': """Field template.""" return self._field.template @property def length(self) -> 'int': """Field size.""" return self._field.length @property def optional(self) -> 'bool': """Field is optional.""" return True @property def field(self) -> 'FieldBase[_TC]': """Field instance.""" return self._field def __init__(self, field: 'FieldBase[_TC]') -> 'None': #self._name = '<forward_match>' self._field = field
[docs] def __call__(self, packet: 'dict[str, Any]') -> 'Self': """Update field attributes. Arguments: packet: Packet data. Returns: Updated field instance. This method will return a new instance of :class:`ConditionalField` instead of updating the current instance. """ new_self = copy.copy(self) new_self._field = new_self._field(packet) return new_self
[docs] def pack(self, value: 'Optional[_TC]', packet: 'dict[str, Any]') -> 'bytes': """Pack field value into :obj:`bytes`. Args: value: Field value. packet: Packet data. Returns: Packed field value. """ return b''
[docs] def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TC': """Unpack field value from :obj:`bytes`. Args: buffer: Field buffer. packet: Packet data. Returns: Unpacked field value. """ return self._field.unpack(buffer, packet)