# -*- coding: utf-8 -*-
"""Multi-Mapping Dictionary
==============================

.. module:: pcapkit.corekit.multidict

:mod:`pcapkit.corekit.multidict` contains multi-mapping dictionary classes,
which are used to store multiple mappings of the same key. The implementation
is inspired by and based on the `Werkzeug`_ project.

.. _Werkzeug: https://werkzeug.palletsprojects.com/

"""
import copy
from typing import TYPE_CHECKING, Generic, TypeVar, cast, overload

from pcapkit.utilities.exceptions import MissingKeyError, UnsupportedCall

if TYPE_CHECKING:
    from typing import Any, Iterable, Iterator, Mapping, NoReturn, Optional, SupportsIndex

    from typing_extensions import Literal
# Public names exported by this module.
__all__ = ['MultiDict', 'OrderedMultiDict']
###############################################################################
# Type variables
###############################################################################
# Type of a caller-supplied fallback value (used by the ``default``
# parameters of ``get`` and ``pop``).
_T = TypeVar('_T')
# Type of the dictionary keys.
_KT = TypeVar('_KT')
# Type of the values stored under each key.
_VT = TypeVar('_VT')
###############################################################################
# Internals
###############################################################################
class _omd_bucket(Generic[_KT, _VT]):
    """Linked-list node wrapping one value of an :class:`OrderedMultiDict`.

    Every bucket knows its predecessor and successor, which makes it possible
    to keep an order over multiple different keys. It requires a lot of extra
    memory and slows down access a bit, but makes it possible to access
    elements in ``O(1)`` and iterate in ``O(n)``.

    Args:
        omd: The ordered multi dict whose chain the bucket is appended to.
        key: The key the value is stored under.
        value: The wrapped value.

    """

    prev: 'Optional[_omd_bucket]'
    next: 'Optional[_omd_bucket]'
    key: '_KT'
    value: '_VT'

    __slots__ = ('prev', 'key', 'value', 'next')

    def __init__(self, omd: 'OrderedMultiDict', key: '_KT', value: '_VT') -> 'None':
        tail = omd._last_bucket

        self.key, self.value = key, value
        # New buckets always go to the end of the chain.
        self.prev, self.next = tail, None

        if omd._first_bucket is None:
            # The chain was empty, so this bucket is also its head.
            omd._first_bucket = self
        if tail is not None:
            tail.next = self
        omd._last_bucket = self

    def unlink(self, omd: 'OrderedMultiDict') -> 'None':
        """Detach this bucket from the chain of ``omd``.

        Args:
            omd: The ordered multi dict the bucket currently belongs to.

        """
        before, after = self.prev, self.next

        # Let the neighbours point past this bucket.
        if before is not None:
            before.next = after
        if after is not None:
            after.prev = before

        # Move the head/tail markers if this bucket was at either end.
        if omd._first_bucket is self:  # pylint: disable=protected-access
            omd._first_bucket = after  # pylint: disable=protected-access
        if omd._last_bucket is self:  # pylint: disable=protected-access
            omd._last_bucket = before  # pylint: disable=protected-access
class _Missing:
    """Type of the :data:`_missing` sentinel.

    The sentinel is used as a default argument so that "no value was passed"
    can be told apart from an explicit :obj:`None`.
    """

    def __repr__(self) -> 'str':
        return "no value"

    def __reduce__(self) -> 'str':
        # Returning a string makes pickle store the object as a reference to
        # the module-level global of that name instead of copying it.
        return "_missing"


# Shared sentinel instance.
_missing = _Missing()
###############################################################################
# Helpers
###############################################################################
def iter_multi_items(
    mapping: 'Mapping[_KT, _VT | Iterable[_VT]] | Iterable[tuple[_KT, _VT]]',
) -> 'Iterator[tuple[_KT, _VT]]':
    """Iterate over all ``(key, value)`` pairs of ``mapping``.

    No value is dropped from the more complex structures:

    * a :class:`MultiDict` yields one pair per stored value;
    * a :obj:`dict` yields one pair per element for :obj:`tuple` and
      :obj:`list` values, and a single pair for any other value;
    * anything else is assumed to already be an iterable of pairs and is
      passed through unchanged.

    Args:
        mapping: The mapping or iterable of pairs to flatten.

    Yields:
        ``(key, value)`` tuples.

    """
    if isinstance(mapping, MultiDict):
        yield from mapping.items(multi=True)
        return

    if not isinstance(mapping, dict):
        yield from mapping  # type: ignore[misc]
        return

    for name, entry in mapping.items():
        if isinstance(entry, (tuple, list)):
            for item in entry:
                yield name, item
        else:
            yield name, entry
###############################################################################
# Data structures
###############################################################################
class MultiDict(dict, Generic[_KT, _VT]):
    """A :class:`MultiDict` is a dictionary subclass customized to deal with
    multiple values for the same key.

    :class:`MultiDict` implements all standard dictionary methods. Internally,
    it saves all values for a key as a list, but the standard :obj:`dict`
    access methods will only return the first value for a key. If you want to
    gain access to the other values, too, you have to use the :meth:`getlist`
    and similar methods.

    Args:
        mapping: The initial value for the :class:`MultiDict`. Either a
            regular :obj:`dict`, an iterable of ``(key, value)`` tuples, or
            :obj:`None`.

    It behaves like a normal :obj:`dict` thus all :obj:`dict` functions will
    only return the first value when multiple values for one key are found.

    See Also:
        The class is inspired from and based on the `Werkzeug`_ project (c.f.
        ``werkzeug.datastructures.MultiDict``).

    """

    def __init__(self, mapping: 'Optional[dict[_KT, _VT] | Iterable[tuple[_KT, _VT]]]' = None) -> 'None':  # pylint: disable=line-too-long
        """Populate the dictionary from ``mapping``.

        * Another :class:`MultiDict`: every value of every key is copied.
        * A regular :obj:`dict`: :obj:`tuple` and :obj:`list` values are taken
          as the list of values for the key (empty ones are skipped), any
          other value becomes a one-element list.
        * Otherwise: an iterable of ``(key, value)`` tuples grouped by key,
          or :obj:`None` for an empty dictionary.

        """
        if isinstance(mapping, MultiDict):
            # ``lists()`` yields ``(key, <new list of all values>)``, so the
            # copy keeps every value and shares no list with ``mapping``.
            # ``items()`` must not be used here: it only yields the *first*
            # value of each key.
            dict.__init__(self, mapping.lists())
        elif isinstance(mapping, dict):
            tmp = {}  # type: dict[_KT, list[_VT]]
            for key, value in mapping.items():
                if isinstance(value, (tuple, list)):
                    if len(value) == 0:
                        # a key without values is not created at all
                        continue
                    value = list(value)
                else:
                    value = [value]
                tmp[key] = value
            dict.__init__(self, tmp)
        else:
            tmp = {}
            for key, value in mapping or ():
                tmp.setdefault(key, []).append(value)
            dict.__init__(self, tmp)

    def __getstate__(self) -> 'dict[_KT, list[_VT]]':
        return dict(self.lists())

    def __setstate__(self, value: 'Iterable[tuple[_KT, list[_VT]]]') -> 'None':
        dict.clear(self)
        dict.update(self, value)  # type: ignore[arg-type]

    def __iter__(self) -> 'Iterator[_KT]':
        return dict.__iter__(self)

    def __getitem__(self, key: '_KT') -> '_VT':
        """Return the first data value for this key.

        Args:
            key: The key to be looked up.

        Raises:
            MissingKeyError: If the key does not exist or has no values.

        """
        if key in self:
            lst = dict.__getitem__(self, key)
            if len(lst) > 0:
                return lst[0]
        raise MissingKeyError(key, quiet=True)

    def __setitem__(self, key: '_KT', value: '_VT') -> 'None':
        """Like :meth:`add` but removes an existing key first.

        Args:
            key: The key for the value.
            value: The value to set.

        """
        dict.__setitem__(self, key, [value])

    def add(self, key: '_KT', value: '_VT') -> 'None':
        """Adds a new value for the key.

        Args:
            key: The key for the value.
            value: The value to add.

        """
        dict.setdefault(self, key, []).append(value)  # type: ignore[arg-type,attr-defined]

    @overload
    def get(self, key: '_KT') -> '_VT | _T': ...
    @overload
    def get(self, key: '_KT', default: '_VT | _T' = ...) -> '_VT | _T': ...

    def get(self, key: '_KT', default: '_VT | _T' = None) -> '_VT | _T':  # type: ignore[assignment,misc]
        """Return the first value for the key, or ``default`` if it is missing.

        Args:
            key: The key to be looked up.
            default: The value returned when the lookup fails.

        """
        try:
            return self[key]
        except MissingKeyError:
            return default

    def getlist(self, key: '_KT') -> 'list[_VT]':
        """Return the list of items for a given key.

        If that key is not in the :class:`MultiDict`, the return value
        will be an empty list.

        Args:
            key: The key to be looked up.

        Returns:
            A new :obj:`list` of all the values for the key.

        """
        try:
            rv = dict.__getitem__(self, key)  # type: list[_VT]
        except KeyError:
            return []
        return list(rv)

    def setlist(self, key: '_KT', new_list: 'Iterable[_VT]') -> 'None':
        """Remove the old values for a key and add new ones.

        Notes:
            The list you pass the values in will be shallow-copied before it is
            inserted in the dictionary.

        Args:
            key: The key for which the values are set.
            new_list: An iterable with the new values for the key. Old values
                are removed first.

        """
        dict.__setitem__(self, key, list(new_list))

    def setdefault(self, key: '_KT', default: 'Optional[_VT]' = None) -> '_VT':
        """If key is in the dictionary, returns its value.

        If not, set it to default and return default.

        Args:
            key: The key to be looked up.
            default: The value to be set.

        Returns:
            The value of the key.

        """
        if key not in self:
            self[key] = cast('_VT', default)
        else:
            default = self[key]
        return default  # type: ignore[return-value]

    def setlistdefault(self, key: '_KT', default_list: 'Optional[Iterable[_VT]]' = None) -> 'list[_VT]':
        """Like :meth:`setdefault` but sets multiple values.

        The list returned is not a copy, but the list that is actually used
        internally. This means that you can put new values into the
        :class:`dict <MultiDict>` by appending items to the list.

        Args:
            key: The key to be looked up.
            default_list: An iterable of default values. It is converted into
                a new :obj:`list` before it is stored and returned.

        """
        if key not in self:
            default_list = list(default_list or ())
            dict.__setitem__(self, key, default_list)
        else:
            default_list = cast('list[_VT]', dict.__getitem__(self, key))
        return default_list

    def items(self, multi: 'bool' = False) -> 'Iterator[tuple[_KT, _VT]]':  # type: ignore[override]
        """Return an iterator of ``(key, value)`` pairs.

        Args:
            multi: If set to :obj:`True` the iterator returned will have a pair
                for each value of each key. Otherwise it will only contain
                pairs for the first value of each key.

        """
        for key, values in dict.items(self):
            if multi:
                for value in values:
                    yield key, value
            else:
                yield key, values[0]

    def lists(self) -> 'Iterator[tuple[_KT, list[_VT]]]':
        """Return an iterator of ``(key, values)`` pairs, where ``values`` is
        a new :obj:`list` of all values associated with the key."""
        for key, values in dict.items(self):
            yield key, list(values)

    def values(self) -> 'Iterator[_VT]':  # type: ignore[override]
        """Returns an iterator of the first value on every key's value list."""
        for values in dict.values(self):
            yield values[0]

    def listvalues(self) -> 'Iterator[list[_VT]]':
        """Return an iterator of all values associated with a key. Zipping
        :meth:`keys` and this is the same as calling :meth:`lists`."""
        return dict.values(self)  # type: ignore[return-value]

    def copy(self) -> 'MultiDict[_KT, _VT]':
        """Return a shallow copy of this object."""
        return self.__class__(self)

    def deepcopy(self, memo: 'Optional[dict]' = None) -> 'MultiDict[_KT, _VT]':
        """Return a deep copy of this object."""
        return self.__class__(copy.deepcopy(self.to_dict(flat=False), memo=memo))  # type: ignore[arg-type]

    @overload
    def to_dict(self, flat: 'Literal[True]' = ...) -> 'dict[_KT, _VT]': ...
    @overload
    def to_dict(self, flat: 'Literal[False]') -> 'dict[_KT, list[_VT]]': ...

    def to_dict(self, flat: 'bool' = True) -> 'dict[_KT, _VT] | dict[_KT, list[_VT]]':
        """Return the contents as regular :obj:`dict`.

        If ``flat`` is :obj:`True` the returned :obj:`dict` will only have the
        first item present, if ``flat`` is :obj:`False` all values will be
        returned as lists.

        Args:
            flat: If set to :obj:`False` the :obj:`dict` returned will have
                lists with all the values in it. Otherwise it will only contain
                the first value for each key.

        """
        if flat:
            return dict(self.items())
        return dict(self.lists())

    def update(self, mapping: 'Mapping[_KT, _VT] | Iterable[tuple[_KT, _VT]]') -> 'None':  # type: ignore[override]
        """:meth:`update` extends rather than replaces existing key lists.

        If the value :obj:`list` for a key in ``mapping`` is empty, no new
        values will be added to the :obj:`dict` and the key will not be
        created.

        Args:
            mapping: The extending value for the :class:`MultiDict`. Either a
                regular :obj:`dict` or an iterable of ``(key, value)`` tuples.

        """
        for key, value in iter_multi_items(mapping):
            MultiDict.add(self, key, value)

    @overload
    def pop(self, key: '_KT') -> '_VT': ...
    @overload
    def pop(self, key: '_KT', default: '_VT | _T' = ...) -> '_VT | _T': ...

    def pop(self, key: '_KT', default: '_VT | _T' = _missing) -> '_VT | _T':  # type: ignore[assignment,misc]
        """Pop the first item for a :obj:`list` on the :obj:`dict`.

        Afterwards the ``key`` is removed from the :obj:`dict`, so additional
        values are discarded.

        Args:
            key: The key to pop.
            default: If provided the value to return if the key was not in the
                dictionary.

        Raises:
            MissingKeyError: If the key has no value and no ``default`` was
                given.

        """
        try:
            lst = dict.pop(self, key)

            if len(lst) == 0:
                # handled by the ``except`` clause below, same as a missing key
                raise MissingKeyError(key)

            return lst[0]
        except KeyError:
            if default is not _missing:
                return default
            raise MissingKeyError(key) from None

    def popitem(self) -> 'tuple[_KT, _VT]':
        """Pop an item from the :obj:`dict`.

        Returns:
            A ``(key, first value)`` tuple; the other values of the key are
            discarded.

        Raises:
            MissingKeyError: If the dictionary is empty or the popped key has
                no values.

        """
        try:
            item = dict.popitem(self)

            if len(item[1]) == 0:  # type: ignore[arg-type]
                raise MissingKeyError(item[0])

            return (item[0], item[1][0])  # type: ignore[index,return-value]
        except KeyError as e:
            raise MissingKeyError(e.args[0]) from None

    def poplist(self, key: '_KT') -> 'list[_VT]':
        """Pop the :obj:`list` for a key from the :obj:`dict`.

        If the key is not in the :obj:`dict` an empty :obj:`list` is returned.

        Notes:
            If the key does no longer exist a :obj:`list` is returned instead
            of raising an error.

        Args:
            key: The key to pop.

        """
        return dict.pop(self, key, [])

    def popitemlist(self) -> 'tuple[_KT, list[_VT]]':
        """Pop a ``(key, list)`` :obj:`tuple` from the :obj:`dict`.

        Raises:
            MissingKeyError: If the dictionary is empty.

        """
        try:
            return dict.popitem(self)  # type: ignore[return-value]
        except KeyError as e:
            raise MissingKeyError(e.args[0]) from None

    def __copy__(self) -> 'MultiDict[_KT, _VT]':
        return self.copy()

    def __deepcopy__(self, memo: 'Optional[dict]' = None) -> 'MultiDict[_KT, _VT]':
        return self.deepcopy(memo=memo)

    def __repr__(self) -> 'str':
        return f'{type(self).__name__}({list(self.items(multi=True))!r})'
class OrderedMultiDict(MultiDict[_KT, _VT]):
    """Works like a regular :class:`MultiDict` but preserves the order of the
    fields.

    Args:
        mapping: The initial value for the :class:`MultiDict`. Either a
            regular :obj:`dict`, an iterable of ``(key, value)`` tuples, or
            :obj:`None`.

    To convert the ordered multi dict into a :obj:`list` you can use the
    :meth:`items` method and pass it ``multi=True``.

    In general an :class:`OrderedMultiDict` is an order of magnitude slower
    than a :class:`MultiDict`.

    Notes:
        Due to a limitation in Python you cannot convert an ordered multi dict
        into a regular :obj:`dict` by using ``dict(multidict)``. Instead you
        have to use the :meth:`to_dict` method, otherwise the internal bucket
        objects are exposed.

    """

    # Head and tail of the doubly linked bucket chain that records the global
    # insertion order; both are None while the dictionary is empty.
    _first_bucket: 'Optional[_omd_bucket[_KT, _VT]]'
    _last_bucket: 'Optional[_omd_bucket[_KT, _VT]]'

    def __init__(self, mapping: 'Optional[dict[_KT, _VT] | Iterable[tuple[_KT, _VT]]]' = None) -> 'None':  # pylint: disable=line-too-long
        dict.__init__(self)  # pylint: disable=non-parent-init-called
        self._first_bucket = self._last_bucket = None
        if mapping is not None:
            OrderedMultiDict.update(self, mapping)

    def __eq__(self, other: 'Any') -> 'bool':
        """Compare with another multi dict.

        Two ordered multi dicts are equal when they hold the same
        ``(key, value)`` pairs in the same order. Against an unordered
        :class:`MultiDict` only the value lists per key are compared.

        """
        if not isinstance(other, MultiDict):
            return NotImplemented

        if isinstance(other, OrderedMultiDict):
            iter1 = iter(self.items(multi=True))
            iter2 = iter(other.items(multi=True))
            try:
                for k1, v1 in iter1:
                    k2, v2 = next(iter2)
                    if k1 != k2 or v1 != v2:
                        return False
            except StopIteration:
                # ``other`` ran out of items first
                return False

            # equal only if ``other`` has no items left either
            try:
                next(iter2)
            except StopIteration:
                return True
            return False

        if len(self) != len(other):
            return False
        for key, values in self.lists():
            if other.getlist(key) != values:
                return False
        return True

    __hash__ = None  # type: ignore[assignment]

    def __reduce_ex__(self, protocol: 'SupportsIndex') -> 'tuple[type, tuple[list[tuple[_KT, _VT]]]]':
        return type(self), (list(self.items(multi=True)),)

    def __getstate__(self) -> 'list[tuple[_KT, _VT]]':  # type: ignore[override]
        return list(self.items(multi=True))

    def __setstate__(self, values: 'Iterable[tuple[_KT, _VT]]') -> 'None':  # type: ignore[override]
        # Reset the bucket chain together with the dict storage: otherwise
        # stale buckets would stay reachable through iteration, and an
        # instance created without ``__init__`` would lack the chain
        # attributes that ``add`` relies on.
        OrderedMultiDict.clear(self)
        for key, value in values:
            self.add(key, value)

    def __getitem__(self, key: '_KT') -> '_VT':
        """Return the first data value for this key.

        Raises:
            MissingKeyError: If the key does not exist.

        """
        if key in self:
            return dict.__getitem__(self, key)[0].value
        raise MissingKeyError(key, quiet=True)

    def __setitem__(self, key: '_KT', value: '_VT') -> 'None':
        self.poplist(key)
        self.add(key, value)

    def __delitem__(self, key: '_KT') -> 'None':
        self.pop(key)

    def clear(self) -> 'None':
        """Remove all items.

        The inherited :meth:`dict.clear` only empties the key storage, so the
        bucket chain used for iteration has to be reset here as well.

        """
        dict.clear(self)
        self._first_bucket = self._last_bucket = None

    def keys(self) -> 'Iterator[_KT]':  # type: ignore[override]
        """Return an iterator of the keys in insertion order."""
        return (key for key, _ in self.items())

    def __iter__(self) -> 'Iterator[_KT]':
        return iter(self.keys())

    def values(self) -> 'Iterator[_VT]':  # type: ignore[override]
        """Return an iterator of the first value of every key, in insertion
        order."""
        return (value for _, value in self.items())

    def items(self, multi: 'bool' = False) -> 'Iterator[tuple[_KT, _VT]]':  # type: ignore[override]
        """Return an iterator of ``(key, value)`` pairs in insertion order.

        Args:
            multi: If set to :obj:`True` the iterator returned will have a pair
                for each value of each key. Otherwise it will only contain
                pairs for the first value of each key.

        """
        ptr = self._first_bucket
        if multi:
            while ptr is not None:
                yield ptr.key, ptr.value
                ptr = ptr.next
        else:
            returned_keys = set()  # type: 'set[_KT]'
            while ptr is not None:
                if ptr.key not in returned_keys:
                    returned_keys.add(ptr.key)
                    yield ptr.key, ptr.value
                ptr = ptr.next

    def lists(self) -> 'Iterator[tuple[_KT, list[_VT]]]':
        """Return an iterator of ``(key, values)`` pairs, ordered by the first
        occurrence of each key."""
        returned_keys = set()  # type: 'set[_KT]'
        ptr = self._first_bucket
        while ptr is not None:
            if ptr.key not in returned_keys:
                yield ptr.key, self.getlist(ptr.key)
                returned_keys.add(ptr.key)
            ptr = ptr.next

    def listvalues(self) -> 'Iterator[list[_VT]]':
        """Return an iterator of the value lists, ordered like :meth:`lists`."""
        for _, values in self.lists():
            yield values

    def add(self, key: '_KT', value: '_VT') -> 'None':
        """Adds a new value for the key at the end of the order.

        Args:
            key: The key for the value.
            value: The value to add.

        """
        dict.setdefault(self, key, []).append(_omd_bucket(self, key, value))  # type: ignore[arg-type,attr-defined]

    def getlist(self, key: '_KT') -> 'list[_VT]':
        """Return the list of items for a given key, or an empty list if the
        key is missing.

        Args:
            key: The key to be looked up.

        """
        try:
            rv = dict.__getitem__(self, key)
        except KeyError:
            return []
        return [x.value for x in rv]

    def setlist(self, key: '_KT', new_list: 'Iterable[_VT]') -> 'None':
        """Remove the old values for a key and append the new ones.

        Args:
            key: The key for which the values are set.
            new_list: An iterable with the new values for the key.

        """
        self.poplist(key)
        for value in new_list:
            self.add(key, value)

    def setlistdefault(self, key: '_KT', default_list: 'Optional[Iterable[_VT]]' = None) -> 'NoReturn':
        """Not available on ordered multi dicts.

        Raises:
            UnsupportedCall: Always.

        """
        raise UnsupportedCall('setlistdefault is unsupported for ordered multi dicts')

    def update(self, mapping: 'Mapping[_KT, _VT] | Iterable[tuple[_KT, _VT]]') -> 'None':  # type: ignore[override]
        """Extend the dictionary with all ``(key, value)`` pairs of ``mapping``.

        Args:
            mapping: Either a regular :obj:`dict` or an iterable of
                ``(key, value)`` tuples.

        """
        for key, value in iter_multi_items(mapping):
            OrderedMultiDict.add(self, key, value)

    def poplist(self, key: '_KT') -> 'list[_VT]':
        """Pop all values for a key; an empty list is returned if the key is
        missing.

        Args:
            key: The key to pop.

        """
        buckets = dict.pop(self, key, [])  # type: list[_omd_bucket[_KT, _VT]]
        for bucket in buckets:
            bucket.unlink(self)
        return [x.value for x in buckets]

    @overload
    def pop(self, key: '_KT') -> '_VT': ...
    @overload
    def pop(self, key: '_KT', default: '_VT | _T' = ...) -> '_VT | _T': ...

    def pop(self, key: '_KT', default: '_VT | _T' = _missing) -> '_VT | _T':  # type: ignore[assignment,misc]
        """Remove the key and return its first value.

        Args:
            key: The key to pop.
            default: If provided the value to return if the key was not in the
                dictionary.

        Raises:
            MissingKeyError: If the key is missing and no ``default`` was
                given.

        """
        try:
            buckets = dict.pop(self, key)  # type: list[_omd_bucket[_KT, _VT]]
        except KeyError:
            if default is not _missing:
                return default
            raise MissingKeyError(key) from None

        for bucket in buckets:
            bucket.unlink(self)
        return buckets[0].value

    def popitem(self) -> 'tuple[_KT, _VT]':
        """Pop a ``(key, first value)`` :obj:`tuple` from the :obj:`dict`.

        Raises:
            MissingKeyError: If the dictionary is empty.

        """
        try:
            key, buckets = cast('tuple[_KT, list[_omd_bucket[_KT, _VT]]]', dict.popitem(self))
        except KeyError as e:
            # ``e.args[0]`` is the plain message; ``str(e)`` would wrap it in
            # an extra pair of quotes.
            raise MissingKeyError(e.args[0]) from None

        for bucket in buckets:
            bucket.unlink(self)
        return key, buckets[0].value

    def popitemlist(self) -> 'tuple[_KT, list[_VT]]':
        """Pop a ``(key, list)`` :obj:`tuple` from the :obj:`dict`.

        Raises:
            MissingKeyError: If the dictionary is empty.

        """
        try:
            key, buckets = cast('tuple[_KT, list[_omd_bucket[_KT, _VT]]]', dict.popitem(self))
        except KeyError as e:
            raise MissingKeyError(e.args[0]) from None

        for bucket in buckets:
            bucket.unlink(self)
        return key, [x.value for x in buckets]