# -*- coding: utf-8 -*-
"""container field class"""
import copy
import io
from typing import TYPE_CHECKING, Generic, TypeVar, cast
from pcapkit.corekit.fields.field import FieldBase
from pcapkit.corekit.multidict import OrderedMultiDict
from pcapkit.utilities.compat import List
from pcapkit.utilities.exceptions import FieldValueError
__all__ = [
'ListField', 'OptionField',
]
if TYPE_CHECKING:
from collections import defaultdict
from enum import IntEnum as StdlibEnum
from typing import IO, Any, Callable, Optional, Type
from aenum import IntEnum as AenumEnum
from typing_extensions import Self
from pcapkit.protocols.schema.schema import Schema
_TL = TypeVar('_TL', 'Schema', 'FieldBase', 'bytes')
_TS = TypeVar('_TS', bound='Schema')
[docs]
class ListField(FieldBase[List[_TL]], Generic[_TL]):
"""Field list for protocol fields.
Args:
length: Field size (in bytes); if a callable is given, it should return
an integer value and accept the current packet as its only argument.
item_type: Field type of the contained items.
callback: Callback function to be called upon
:meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`.
This field is used to represent a list of fields, as in the case of lists of
constrant-length-field items in a protocol.
"""
@property
def length(self) -> 'int':
"""Field size."""
return self._length
@property
def optional(self) -> 'bool':
"""Field is optional."""
return True
def __init__(self, length: 'int | Callable[[dict[str, Any]], int]' = lambda _: -1,
item_type: 'Optional[FieldBase]' = None,
callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None':
#self._name = '<list>'
self._callback = callback
self._item_type = item_type
self._length_callback = None
if not isinstance(length, int):
self._length_callback, length = length, -1
self._length = length
self._template = '0s'
[docs]
def __call__(self, packet: 'dict[str, Any]') -> 'Self':
"""Update field attributes.
Args:
packet: Packet data.
Returns:
Updated field instance.
This method will return a new instance of :class:`ListField`
instead of updating the current instance.
"""
new_self = copy.copy(self)
new_self._callback(self, packet)
if new_self._length_callback is not None:
new_self._length = new_self._length_callback(packet)
new_self._template = f'{new_self._length}s'
return new_self
[docs]
def pack(self, value: 'Optional[list[_TL]]', packet: 'dict[str, Any]') -> 'bytes':
"""Pack field value into :obj:`bytes`.
Args:
value: Field value.
packet: Packet data.
Returns:
Packed field value.
"""
if value is None:
return b''
from pcapkit.protocols.schema.schema import \
Schema # pylint: disable=import-outside-top-level
temp = [] # type: list[bytes]
for item in value:
if isinstance(item, bytes):
temp.append(item)
elif isinstance(item, Schema):
temp.append(item.pack(packet))
elif self._item_type is not None:
temp.append(self._item_type.pack(item, packet))
else:
raise FieldValueError(f'Field {self.name} has invalid value.')
return b''.join(temp)
[docs]
def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> 'bytes | list[_TL]':
"""Unpack field value from :obj:`bytes`.
Args:
buffer: Field buffer.
packet: Packet data.
Returns:
Unpacked field value.
"""
length = self._length
if isinstance(buffer, bytes):
file = io.BytesIO(buffer) # type: IO[bytes]
else:
file = buffer
if self._item_type is None:
return file.read(length)
from pcapkit.corekit.fields.misc import SchemaField
is_schema = isinstance(self._item_type, SchemaField)
temp = [] # type: list[_TL]
while length > 0:
field = self._item_type(packet)
if is_schema:
data = cast('SchemaField', self._item_type).unpack(file, packet)
length -= len(data)
if length < 0:
raise FieldValueError(f'Field {self.name} has invalid length.')
else:
length -= field.length
if length < 0:
raise FieldValueError(f'Field {self.name} has invalid length.')
buffer = file.read(field.length)
data = field.unpack(buffer, packet)
temp.append(data)
return temp
[docs]
class OptionField(ListField, Generic[_TS]):
"""Field list for protocol options.
Args:
length: Field size (in bytes); if a callable is given, it should return
an integer value and accept the current packet as its only argument.
base_schema: Base schema for option fields.
type_name: Name of the option type field.
registry: Option registry, as in a mapping from option types (enumeration
values) to option schemas, with the default value being the unknown
option schema.
eool: Enumeration of the EOOL (end-of-option-list, or equivalent) option
callback: Callback function to be called upon
:meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`.
This field is used to represent a list of fields, as in the case of lists of
options and/or parameters in a protocol.
"""
@property
def base_schema(self) -> 'Type[_TS]':
"""Base schema."""
return self._base_schema
@property
def type_name(self) -> 'str':
"""Type name."""
return self._type_name
@property
def registry(self) -> 'defaultdict[int | StdlibEnum | AenumEnum, Type[_TS]]':
"""Option registry."""
return self._registry
@property
def eool(self) -> 'int | StdlibEnum | AenumEnum':
"""EOOL option."""
return self._eool
@property
def option_padding(self) -> 'int':
"""Length option padding data."""
return self._option_padding
def __init__(self, length: 'int | Callable[[dict[str, Any]], int]' = lambda _: -1,
base_schema: 'Optional[Type[_TS]]' = None,
type_name: 'str' = 'type',
registry: 'Optional[defaultdict[int | StdlibEnum | AenumEnum, Type[_TS]]]' = None,
eool: 'Optional[int | StdlibEnum | AenumEnum]' = None,
callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None':
super().__init__(length, None, callback)
#self._name = '<option>'
self._eool = eool
self._option_padding = 0
if base_schema is None:
raise FieldValueError('Field <option> has no base schema.')
self._base_schema = base_schema
if not hasattr(self._base_schema, type_name):
raise FieldValueError(f'Field <option> has no type field "{type_name}".')
self._type_name = type_name
if registry is None:
raise FieldValueError('Field <option> has no registry.')
self._registry = registry
[docs]
def unpack(self, buffer: 'bytes | IO[bytes]', packet: 'dict[str, Any]') -> 'list[_TS]':
"""Unpack field value from :obj:`bytes`.
Args:
buffer: Field buffer.
packet: Packet data.
Returns:
Unpacked field value.
Important:
If the option list ended before the specified size limit,
set :attr:`self.option_padding <OptionField.option_padding>`
as the remaining length to the ``packet`` argument such that
the next fields can be aware of such informations.
"""
length = self._length
if isinstance(buffer, bytes):
file = io.BytesIO(buffer) # type: IO[bytes]
else:
file = buffer
# make a copy of the ``packet`` dict so that we can include
# parsed option schema in the ``packet`` dict
new_packet = packet.copy()
new_packet[self.name] = OrderedMultiDict()
temp = [] # type: list[_TS]
while length > 0:
# unpack option type using base schema
meta = self._base_schema.unpack(file, length, packet) # type: ignore[call-arg,misc,var-annotated]
code = cast('int', meta[self._type_name])
schema = self._registry[code]
# rewind to the beginning of the option
file.seek(-len(meta), io.SEEK_CUR)
# unpack option using option schema
data = schema.unpack(file, length, packet) # type: ignore[call-arg,misc,var-annotated]
new_packet[self.name].add(code, data)
temp.append(data)
# update length
length -= len(data)
# check for EOOL
if code == self._eool:
break
self._option_padding = length
return temp