Source code for pcapkit.corekit.fields.strings

# -*- coding: utf-8 -*-
"""text field class"""

import urllib.parse as urllib_parse
from typing import TYPE_CHECKING, Any, Generic, TypeVar

import chardet

from pcapkit.corekit.fields.field import Field, NoValue
from pcapkit.utilities.compat import Dict

__all__ = [
    '_TextField',
    'StringField',
    'BitField',
    'PaddingField',
]

if TYPE_CHECKING:
    from typing import Callable, Optional, Tuple

    from typing_extensions import Literal, Self

    from pcapkit.corekit.fields.field import NoValueType

    NamespaceEntry = Tuple[int, int]

_T = TypeVar('_T', 'str', 'bytes', 'dict[str, Any]')


[docs] class _TextField(Field[_T], Generic[_T]): """Internal text value for protocol fields. Args: length: Field size (in bytes); if a callable is given, it should return an integer value and accept the current packet as its only argument. default: Field default value, if any. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """ def __init__(self, length: 'int | Callable[[dict[str, Any]], int]', default: '_T | NoValueType' = NoValue, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': super().__init__(length, default, callback) # type: ignore[arg-type] self._template = f'{self._length}s' if self._length >= 0 else '1024s' # reasonable default
[docs] def __call__(self, packet: 'dict[str, Any]') -> 'Self': """Update field attributes. Args: packet: Packet data. Returns: New instance of :class:`_TextField`. This method will return a new instance of :class:`_TextField` instead of updating the current instance. """ new_self = super().__call__(packet) new_self._template = f'{new_self._length}s' return new_self
[docs] class BytesField(_TextField[bytes]): """Bytes value for protocol fields. Args: length: Field size (in bytes); if a callable is given, it should return an integer value and accept the current packet as its only argument. default: Field default value, if any. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """
[docs] def pre_process(self, value: 'bytes', packet: 'dict[str, Any]') -> 'bytes': # pylint: disable=unused-argument """Process field value before construction (packing). Arguments: value: Field value. packet: Packet data. Returns: Processed field value. """ if self._length < 0: self._length = len(value) self._template = f'{self._length}s' return value
[docs] class StringField(_TextField[str]): r"""String value for protocol fields. Args: length: Field size (in bytes); if a callable is given, it should return an integer value and accept the current packet as its only argument. default: Field default value, if any. encoding: The encoding with which to decode the :obj:`bytes`. If not provided, :mod:`pcapkit` will first try detecting its encoding using |chardet|_. The fallback encoding would is **UTF-8**. errors: The error handling scheme to use for the handling of decoding errors. The default is ``'strict'`` meaning that decoding errors raise a :exc:`UnicodeDecodeError`. Other possible values are ``'ignore'`` and ``'replace'`` as well as any other name registered with :func:`codecs.register_error` that can handle :exc:`UnicodeDecodeError`. unquote: Whether to unquote the decoded string as a URL. Should decoding failed , the method will try again replacing ``'%'`` with ``'\x'`` then decoding the ``url`` as ``'utf-8'`` with ``'replace'`` for error handling. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. .. |chardet| replace:: ``chardet`` .. _chardet: https://chardet.readthedocs.io """ def __init__(self, length: 'int | Callable[[dict[str, Any]], int]', default: 'str | NoValueType' = NoValue, encoding: 'Optional[str]' = None, errors: 'Literal["strict", "ignore", "replace"]' = 'strict', unquote: 'bool' = False, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': super().__init__(length, default, callback) self._encoding = encoding self._errors = errors self._unquote = unquote
[docs] def pre_process(self, value: 'str', packet: 'dict[str, Any]') -> 'bytes': # pylint: disable=unused-argument """Process field value before construction (packing). Arguments: value: Field value. packet: Packet data. Returns: Processed field value. """ if self._unquote: value = urllib_parse.quote(value, encoding=self._encoding or 'utf-8', errors=self._errors) if self._length < 0: self._length = len(value) self._template = f'{self._length}s' return value.encode(self._encoding or 'utf-8', self._errors)
[docs] def post_process(self, value: 'bytes', packet: 'dict[str, Any]') -> 'str': # pylint: disable=unused-argument """Process field value after parsing (unpacked). Arguments: value: Field value. packet: Packet data. Returns: Processed field value. """ if self._unquote: try: ret = urllib_parse.unquote(value, encoding=self._encoding or 'utf-8', errors=self._errors) except UnicodeError: ret = urllib_parse.unquote(value.replace(b'%', rb'\x'), encoding='utf-8', errors='replace') else: charset = self._encoding or chardet.detect(value)['encoding'] or 'utf-8' try: ret = value.decode(charset, self._errors) except UnicodeError: ret = value.decode(charset, 'replace') return ret
[docs] class BitField(_TextField[Dict[str, Any]]): """Bit value for protocol fields. Args: length: Field size (in bytes). default: Field default value, if any. namespace: Field namespace (a dict mapping field name to a tuple of start index, and length of the subfield). callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """ def __init__(self, length: 'int', default: 'dict[str, Any] | NoValueType' = NoValue, namespace: 'Optional[dict[str, NamespaceEntry]]' = None, callback: 'Callable[[Self, dict[str, Any]], None]' = lambda *_: None) -> 'None': super().__init__(length, default, callback) self._namespace = namespace or {}
[docs] def pre_process(self, value: 'dict[str, Any]', packet: 'dict[str, Any]') -> 'bytes': # pylint: disable=unused-argument """Process field value before construction (packing). Arguments: value: Field value. packet: Packet data. Returns: Processed field value. """ buffer = bytearray(self.length * 8) for name, (start, len) in self._namespace.items(): end = start + len buffer[start:end] = f'{value[name]:0{end - start}b}'.encode() return int(b''.join(map(lambda x: b'1' if x else b'0', buffer)), 2).to_bytes(self.length, 'big')
[docs] def post_process(self, value: 'bytes', packet: 'dict[str, Any]') -> 'dict[str, Any]': # pylint: disable=unused-argument """Process field value after parsing (unpacked). Arguments: value: Field value. packet: Packet data. Returns: Processed field value. """ buffer = {} binary = ''.join(f'{byte:08b}' for byte in value) for name, (start, len) in self._namespace.items(): end = start + len buffer[name] = int(binary[start:end], 2) return buffer
[docs] class PaddingField(BytesField): """Bytes value for protocol fields. Args: length: Field size (in bytes); if a callable is given, it should return an integer value and accept the current packet as its only argument. default: Field default value, if any. callback: Callback function to be called upon :meth:`self.__call__ <pcapkit.corekit.fields.field.FieldBase.__call__>`. """