# -*- coding: utf-8 -*-
"""miscellaneous field class"""
import copy
import io
from typing import TYPE_CHECKING, TypeVar, cast
from pcapkit.corekit.fields.field import FieldBase, NoValue
from pcapkit.utilities.exceptions import FieldError, NoDefaultValue
# Public API of this module. ``SchemaField`` is defined below alongside the
# other field classes, so it is exported with them.
__all__ = [
    'ConditionalField', 'PayloadField',
    'SwitchField', 'ForwardMatchField',
    'NoValueField', 'SchemaField',
]
if TYPE_CHECKING:
    from typing import IO, Any, Callable, Optional, Type
    from typing_extensions import Self
    from pcapkit.corekit.fields.field import NoValueType
    from pcapkit.protocols.protocol import ProtocolBase as Protocol
    from pcapkit.protocols.schema.schema import Schema
# Unconstrained value type; parameterises ``ConditionalField``, ``SwitchField``
# and ``ForwardMatchField``.
_TC = TypeVar('_TC')
# Schema type (bound to ``Schema``); parameterises ``SchemaField``.
_TS = TypeVar('_TS', bound='Schema')
# Protocol type (bound to ``Protocol``); parameterises ``PayloadField``.
_TP = TypeVar('_TP', bound='Protocol')
# No-value sentinel type (bound to ``NoValueType``); parameterises ``NoValueField``.
_TN = TypeVar('_TN', bound='NoValueType')
[docs]
class NoValueField(FieldBase[_TN]):
    """Schema field for no value type (or :obj:`None`).

    The field occupies no bytes: its template is ``'0s'``, its length is
    ``0``, packing always yields empty bytes and unpacking always yields
    :obj:`None`.

    """

    # Class-level default: the ``NoValue`` sentinel.
    _default = NoValue

    @property
    def template(self) -> 'str':
        """Field template (always ``'0s'``)."""
        return '0s'

    @property
    def length(self) -> 'int':
        """Field size (always ``0``)."""
        return 0

    def pack(self, value: 'Optional[_TN]', packet: 'dict[str, Any]') -> 'bytes':
        """Pack field value into :obj:`bytes`.

        Args:
            value: Field value (ignored).
            packet: Packet data (ignored).

        Returns:
            Packed field value, which is always empty bytes.

        """
        return b''

    def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TN':
        """Unpack field value from :obj:`bytes`.

        Args:
            buffer: Field buffer (not read).
            packet: Packet data (ignored).

        Returns:
            Unpacked field value, which is always :obj:`None`.

        """
        return None  # type: ignore[return-value]
 
[docs]
class ConditionalField(FieldBase[_TC]):
    """Conditional value for protocol fields.

    Wraps another field and only packs/unpacks it when the condition function
    returns a true value for the current packet. Name, default, template and
    length are all delegated to the wrapped field.

    Args:
        field: Field instance.
        condition: Field condition function (this function should return a bool
            value and accept the current packet :class:`pcapkit.corekit.infoclass.Info`
            as its only argument).

    """

    @property
    def name(self) -> 'str':
        """Field name (delegated to the wrapped field)."""
        return self._field.name

    @name.setter
    def name(self, value: 'str') -> 'None':
        """Set field name on the wrapped field."""
        self._field.name = value

    @property
    def default(self) -> '_TC | NoValueType':
        """Field default value (delegated to the wrapped field)."""
        return self._field.default

    @default.setter
    def default(self, value: '_TC | NoValueType') -> 'None':
        """Set field default value on the wrapped field."""
        self._field.default = value

    @default.deleter
    def default(self) -> 'None':
        """Delete field default value by resetting it to ``NoValue``."""
        self._field.default = NoValue

    @property
    def template(self) -> 'str':
        """Field template (delegated to the wrapped field)."""
        return self._field.template

    @property
    def length(self) -> 'int':
        """Field size (delegated to the wrapped field)."""
        return self._field.length

    @property
    def optional(self) -> 'bool':
        """Field is optional (always :data:`True`)."""
        return True

    @property
    def field(self) -> 'FieldBase[_TC]':
        """Field instance."""
        return self._field

    def __init__(self, field: 'FieldBase[_TC]',  # pylint: disable=super-init-not-called
                 condition: 'Callable[[dict[str, Any]], bool]') -> 'None':
        self._field = field  # type: FieldBase[_TC]
        self._condition = condition

    def __call__(self, packet: 'dict[str, Any]') -> 'Self':
        """Update field attributes.

        Arguments:
            packet: Packet data.

        Returns:
            Updated field instance.

        This method will return a new instance of :class:`ConditionalField`
        instead of updating the current instance.

        """
        # Shallow copy: the wrapped field is shared until it is replaced below.
        new_self = copy.copy(self)
        # Only refresh the wrapped field when the condition holds for ``packet``.
        if new_self._condition(packet):
            new_self._field = new_self._field(packet)
        return new_self

    def pre_process(self, value: '_TC', packet: 'dict[str, Any]') -> 'Any':  # pylint: disable=unused-argument
        """Process field value before construction (packing).

        Arguments:
            value: Field value.
            packet: Packet data.

        Returns:
            Processed field value, as returned by the wrapped field.

        """
        return self._field.pre_process(value, packet)

    def pack(self, value: 'Optional[_TC]', packet: 'dict[str, Any]') -> 'bytes':
        """Pack field value into :obj:`bytes`.

        Args:
            value: Field value.
            packet: Packet data.

        Returns:
            Packed field value; empty bytes when the condition does not hold.

        """
        if not self._condition(packet):
            return b''
        return self._field.pack(value, packet)

    def post_process(self, value: 'Any', packet: 'dict[str, Any]') -> '_TC':  # pylint: disable=unused-argument
        """Process field value after parsing (unpacking).

        Args:
            value: Field value.
            packet: Packet data.

        Returns:
            Processed field value, as returned by the wrapped field.

        """
        return self._field.post_process(value, packet)

    def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TC':
        """Unpack field value from :obj:`bytes`.

        Args:
            buffer: Field buffer.
            packet: Packet data.

        Returns:
            Unpacked field value; when the condition does not hold, the
            buffer is left untouched and the wrapped field's default is
            returned instead.

        """
        if not self._condition(packet):
            return self._field.default  # type: ignore[return-value]
        return self._field.unpack(buffer, packet)

    def test(self, packet: 'dict[str, Any]') -> 'bool':
        """Test field condition.

        Arguments:
            packet: Current packet.

        Returns:
            bool: Test result.

        """
        return self._condition(packet)
 
[docs]
class PayloadField(FieldBase[_TP]):
    """Payload value for protocol fields.

    Args:
        length: Field size (in bytes); if a callable is given, it should return
            an integer value and accept the current packet as its only argument.
        default: Field default value.
        protocol: Payload protocol.
        callback: Callback function to be called upon
            :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`.

    """

    @property
    def template(self) -> 'str':
        """Field template."""
        return self._template

    @property
    def length(self) -> 'int':
        """Field size."""
        return self._length

    @property
    def optional(self) -> 'bool':
        """Field is optional (always :data:`True`)."""
        return True

    @property
    def protocol(self) -> 'Type[_TP]':
        """Payload protocol; falls back to ``Raw`` when none has been set."""
        if self._protocol is None:
            from pcapkit.protocols.misc.raw import Raw  # type: ignore[unreachable] # pylint: disable=import-outside-top-level # isort:skip
            return Raw
        return self._protocol

    @protocol.setter
    def protocol(self, protocol: 'Type[_TP] | str') -> 'None':
        """Set payload protocol.

        Arguments:
            protocol: Payload protocol; a string is looked up in the
                ``pcapkit.protocols.__proto__`` registry.

        """
        if isinstance(protocol, str):
            from pcapkit.protocols import __proto__  # pylint: disable=import-outside-top-level
            protocol = cast('Type[_TP]', __proto__.get(protocol))
        self._protocol = protocol

    def __init__(self, length: 'int | Callable[[dict[str, Any]], int]' = lambda _: -1,
                 default: '_TP | NoValueType | bytes' = NoValue,
                 protocol: 'Optional[Type[_TP]]' = None,
                 callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None':
        #self._name = '<payload>'
        self._default = default  # type: ignore[assignment]
        self._protocol = protocol  # type: ignore[assignment]
        self._callback = callback

        # A callable length is deferred until ``__call__``; ``-1`` marks the
        # size as not yet known.
        self._length_callback = None
        if not isinstance(length, int):
            self._length_callback, length = length, -1
        self._length = length
        self._template = f'{self._length}s' if self._length >= 0 else '1024s'  # use a reasonable default

    def __call__(self, packet: 'dict[str, Any]') -> 'Self':
        """Update field attributes.

        Args:
            packet: Packet data.

        Returns:
            Updated field instance.

        This method will return a new instance of :class:`PayloadField`
        instead of updating the current instance.

        """
        new_self = copy.copy(self)
        new_self._callback(new_self, packet)
        if new_self._length_callback is not None:
            new_self._length = new_self._length_callback(packet)
            # Same guard as in ``__init__``: a negative length (e.g. from the
            # default ``lambda _: -1``) must not produce a template such as
            # ``'-1s'``.
            new_self._template = f'{new_self._length}s' if new_self._length >= 0 else '1024s'  # use a reasonable default
        return new_self

    def pack(self, value: 'Optional[_TP | Schema | bytes]', packet: 'dict[str, Any]') -> 'bytes':
        """Pack field value into :obj:`bytes`.

        Args:
            value: Field value; :obj:`None` selects the field default.
            packet: Packet data.

        Returns:
            Packed field value: ``value`` itself if it is :obj:`bytes`, the
            result of ``value.pack()`` if it is a ``Schema``, otherwise
            ``value.data``.

        Raises:
            NoDefaultValue: If ``value`` is :obj:`None` and the field has no
                default value.

        """
        if value is None:
            if self._default is NoValue:
                raise NoDefaultValue(f'Field {self.name} has no default value.')
            value = cast('_TP', self._default)

        from pcapkit.protocols.schema.schema import Schema  # pylint: disable=import-outside-top-level

        if isinstance(value, bytes):
            return value
        if isinstance(value, Schema):
            return value.pack()
        return value.data  # type: ignore[union-attr]

    def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TP':
        """Unpack field value from :obj:`bytes`.

        Args:
            buffer: Field buffer.
            packet: Packet data.

        Returns:
            Unpacked field value. When no protocol has been set, the raw
            bytes are returned as-is (the whole ``bytes`` buffer, or
            everything remaining in the stream); otherwise an instance of the
            payload protocol constructed from the buffer.

        """
        if self._protocol is None:
            if isinstance(buffer, bytes):  # type: ignore[unreachable]
                return cast('_TP', buffer)
            return cast('_TP', buffer.read())

        if isinstance(buffer, bytes):
            file = io.BytesIO(buffer)  # type: IO[bytes]
        else:
            file = buffer

        # A non-positive length is handed to the protocol as ``None``.
        length = self._length if self._length > 0 else None
        return self._protocol(file, length)  # type: ignore[abstract]
 
[docs]
class SwitchField(FieldBase[_TC]):
    """Conditional type-switching field for protocol schema.

    The field starts out wrapping a :class:`NoValueField` placeholder; calling
    the instance with a packet asks the selector for the concrete field and
    returns a copy wrapping it. Name, default, template and length are all
    delegated to the wrapped field.

    Args:
        selector: Callable function to select field type, which should accept
            the current packet as its only argument and return a field instance.

    """

    @property
    def name(self) -> 'str':
        """Field name (delegated to the wrapped field)."""
        return self._field.name

    @name.setter
    def name(self, value: 'str') -> 'None':
        """Set field name on the wrapped field."""
        self._field.name = value

    @property
    def default(self) -> '_TC | NoValueType':
        """Field default value (delegated to the wrapped field)."""
        return self._field.default

    @default.setter
    def default(self, value: '_TC | NoValueType') -> 'None':
        """Set field default value on the wrapped field."""
        self._field.default = value

    @default.deleter
    def default(self) -> 'None':
        """Delete field default value by resetting it to ``NoValue``."""
        self._field.default = NoValue

    @property
    def template(self) -> 'str':
        """Field template (delegated to the wrapped field)."""
        return self._field.template

    @property
    def length(self) -> 'int':
        """Field size (delegated to the wrapped field)."""
        return self._field.length

    @property
    def optional(self) -> 'bool':
        """Field is optional (always :data:`True`)."""
        return True

    @property
    def field(self) -> 'FieldBase[_TC]':
        """Field instance."""
        return self._field

    def __init__(self, selector: 'Callable[[dict[str, Any]], FieldBase[_TC]]' = lambda _: NoValueField()) -> 'None':  # type: ignore[assignment,return-value]
        #self._name = '<switch>'
        # Placeholder until ``__call__`` selects the concrete field.
        self._field = cast('FieldBase[_TC]', NoValueField())
        self._selector = selector

    def __call__(self, packet: 'dict[str, Any]') -> 'SwitchField[_TC]':
        """Call field.

        Args:
            packet: Packet data.

        Returns:
            New field instance.

        This method will return a new instance of :class:`SwitchField`
        instead of updating the current instance.

        """
        new_self = copy.copy(self)
        # Select the concrete field for this packet, then call it with the
        # same packet to obtain the field that will actually be wrapped.
        new_self._field = new_self._selector(packet)(packet)
        # Carry the current name over to the newly selected field.
        new_self._field.name = self.name
        return new_self

    def pre_process(self, value: '_TC', packet: 'dict[str, Any]') -> 'Any':  # pylint: disable=unused-argument
        """Process field value before construction (packing).

        Arguments:
            value: Field value.
            packet: Packet data.

        Returns:
            Processed field value, as returned by the wrapped field.

        """
        if self._field is None:
            return NoValue  # type: ignore[unreachable]
        return self._field.pre_process(value, packet)

    def pack(self, value: 'Optional[_TC]', packet: 'dict[str, Any]') -> 'bytes':
        """Pack field value into :obj:`bytes`.

        Args:
            value: Field value.
            packet: Packet data.

        Returns:
            Packed field value, as returned by the wrapped field.

        """
        if self._field is None:
            return b''  # type: ignore[unreachable]
        return self._field.pack(value, packet)

    def post_process(self, value: 'Any', packet: 'dict[str, Any]') -> '_TC':  # pylint: disable=unused-argument
        """Process field value after parsing (unpacking).

        Args:
            value: Field value.
            packet: Packet data.

        Returns:
            Processed field value, as returned by the wrapped field.

        """
        if self._field is None:
            return NoValue  # type: ignore[unreachable]
        return self._field.post_process(value, packet)

    def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TC':
        """Unpack field value from :obj:`bytes`.

        Args:
            buffer: Field buffer.
            packet: Packet data.

        Returns:
            Unpacked field value, as returned by the wrapped field.

        """
        if self._field is None:
            return None  # type: ignore[unreachable]
        return self._field.unpack(buffer, packet)
 
[docs]
class SchemaField(FieldBase[_TS]):
    """Schema field for protocol schema.

    Args:
        length: Field size (in bytes); if a callable is given, it should return
            an integer value and accept the current packet as its only argument.
        schema: Field schema.
        default: Default value for field.
        packet: Optional packet data for unpacking and/or packing purposes.
        callback: Callback function to process field value, which should accept
            the current field and the current packet as its arguments.

    Raises:
        FieldError: If ``schema`` is not provided.

    """

    @property
    def length(self) -> 'int':
        """Field size."""
        return self._length  # type: ignore[has-type]

    @property
    def optional(self) -> 'bool':
        """Field is optional (always :data:`True`)."""
        return True

    @property
    def schema(self) -> 'Type[_TS]':
        """Field schema."""
        return self._schema

    def __init__(self, length: 'int | Callable[[dict[str, Any]], int]' = lambda _: -1,
                 schema: 'Optional[Type[_TS]]' = None,
                 default: '_TS | NoValueType | bytes' = NoValue,
                 packet: 'Optional[dict[str, Any]]' = None,
                 callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None':
        #self._name = '<schema>'
        self._callback = callback

        if packet is None:
            packet = {}
        self._packet = packet

        if schema is None:
            raise FieldError('Schema field must have a schema.')
        self._schema = schema

        # A ``bytes`` default is parsed through the schema up front.
        if isinstance(default, bytes):
            default = cast('_TS', schema.unpack(default))  # type: ignore[call-arg,misc]
        self._default = default

        # A callable length is deferred until ``__call__``; ``-1`` marks the
        # size as not yet known.
        self._length_callback = None
        if not isinstance(length, int):
            self._length_callback, length = length, -1
        self._length = length
        self._template = f'{self._length}s' if self._length >= 0 else '1024s'  # use a reasonable default

    def __call__(self, packet: 'dict[str, Any]') -> 'Self':
        """Update field attributes.

        Args:
            packet: Packet data.

        Returns:
            New field instance.

        This method will return a new instance of :class:`SchemaField`
        instead of updating the current instance.

        """
        new_self = copy.copy(self)
        new_self._callback(new_self, packet)
        if new_self._length_callback is not None:
            new_self._length = new_self._length_callback(packet)
            # The guard must look at the freshly computed length on the copy;
            # the prototype instance still holds the ``-1`` placeholder set in
            # ``__init__``, which would always select the fallback template.
            new_self._template = f'{new_self._length}s' if new_self._length >= 0 else '1024s'  # use a reasonable default
        return new_self

    def pack(self, value: 'Optional[_TS | bytes]', packet: 'dict[str, Any]') -> 'bytes':
        """Pack field value into :obj:`bytes`.

        Args:
            value: Field value; :obj:`None` selects the field default.
            packet: Packet data.

        Returns:
            Packed field value; a :obj:`bytes` value is returned unchanged.

        Raises:
            NoDefaultValue: If ``value`` is :obj:`None` and the field has no
                default value.

        Notes:
            We will use ``packet`` as a ``__packet__`` key in the packet context
            passed to the underlying :class:`~pcapkit.protocols.schema.schema.Schema`
            for packing purposes.

        """
        if value is None:
            if self._default is NoValue:
                raise NoDefaultValue(f'Field {self.name} has no default value.')
            value = cast('_TS', self._default)

        if isinstance(value, bytes):
            return value

        # NOTE: this merges the field's own packet data into the caller's
        # ``packet`` mapping in place.
        packet.update(self._packet)
        return value.pack({
            '__packet__': packet,
        })

    def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TS':
        """Unpack field value from :obj:`bytes`.

        Args:
            buffer: Field buffer.
            packet: Packet data.

        Returns:
            Unpacked field value.

        Notes:
            We will use ``packet`` as a ``__packet__`` key in the packet context
            passed to the underlying :class:`~pcapkit.protocols.schema.schema.Schema`
            for unpacking purposes.

        """
        if isinstance(buffer, bytes):
            file = io.BytesIO(buffer)  # type: IO[bytes]
        else:
            file = buffer

        # NOTE: this merges the field's own packet data into the caller's
        # ``packet`` mapping in place.
        packet.update(self._packet)
        return cast('_TS', self._schema.unpack(file, self.length, {  # type: ignore[call-arg,misc]
            '__packet__': packet,
        }))
 
[docs]
class ForwardMatchField(FieldBase[_TC]):
    """Schema field for non-capturing forward matching.

    Wraps another field: unpacking is delegated to the wrapped field, while
    packing always contributes no bytes. Name, default, template and length
    are all delegated to the wrapped field.

    Args:
        field: Field to forward match.

    """

    @property
    def name(self) -> 'str':
        """Field name (delegated to the wrapped field)."""
        return self._field.name

    @name.setter
    def name(self, value: 'str') -> 'None':
        """Set field name on the wrapped field."""
        self._field.name = value

    @property
    def default(self) -> '_TC | NoValueType':
        """Field default value (delegated to the wrapped field)."""
        return self._field.default

    @default.setter
    def default(self, value: '_TC | NoValueType') -> 'None':
        """Set field default value on the wrapped field."""
        self._field.default = value

    @default.deleter
    def default(self) -> 'None':
        """Delete field default value by resetting it to ``NoValue``."""
        self._field.default = NoValue

    @property
    def template(self) -> 'str':
        """Field template (delegated to the wrapped field)."""
        return self._field.template

    @property
    def length(self) -> 'int':
        """Field size (delegated to the wrapped field)."""
        return self._field.length

    @property
    def optional(self) -> 'bool':
        """Field is optional (always :data:`True`)."""
        return True

    @property
    def field(self) -> 'FieldBase[_TC]':
        """Field instance."""
        return self._field

    def __init__(self, field: 'FieldBase[_TC]') -> 'None':
        #self._name = '<forward_match>'
        self._field = field

    def __call__(self, packet: 'dict[str, Any]') -> 'Self':
        """Update field attributes.

        Arguments:
            packet: Packet data.

        Returns:
            Updated field instance.

        This method will return a new instance of :class:`ForwardMatchField`
        instead of updating the current instance.

        """
        new_self = copy.copy(self)
        # Refresh the wrapped field for this packet on the copy only.
        new_self._field = new_self._field(packet)
        return new_self

    def pack(self, value: 'Optional[_TC]', packet: 'dict[str, Any]') -> 'bytes':
        """Pack field value into :obj:`bytes`.

        Args:
            value: Field value (ignored).
            packet: Packet data (ignored).

        Returns:
            Packed field value, which is always empty bytes.

        """
        return b''

    def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> '_TC':
        """Unpack field value from :obj:`bytes`.

        Args:
            buffer: Field buffer.
            packet: Packet data.

        Returns:
            Unpacked field value, as returned by the wrapped field.

        """
        return self._field.unpack(buffer, packet)