# Source code for deluxe.mappings

# Copyright (c) 2024 - Gilles Coissac
# This file is part of standard-deluxe library.
#
# standard-deluxe is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# standard-deluxe is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with standard-deluxe. If not, see <https://www.gnu.org/licenses/>
#
# ruff: noqa: A002
"""A collection of Mapping types.

This module provides specialized mapping classes built on top of Python's
:class:`collections.abc.Mapping` and :class:`collections.OrderedDict`.
"""

from __future__ import annotations

from collections import OrderedDict
from collections.abc import (
    Callable,
    ItemsView,
    Iterable,
    Iterator,
    KeysView,
    Mapping,
    MutableMapping,
    ValuesView,
)
from copy import deepcopy
from typing import (
    TYPE_CHECKING,
    Any,
    ClassVar,
    Literal,
    Protocol,
    Self,
    TypeVar,
    cast,
    final,
    overload,
)

from deluxe.importers import import_fresh_module


if TYPE_CHECKING:
    from types import ModuleType


__all__ = (
    "FilteredView",
    "FrozenMap",
    "OrderableDict",
    "view_filter",
)


_KT = TypeVar("_KT")
_VT = TypeVar("_VT")
_VT_co = TypeVar("_VT_co", covariant=True)


@final
class _Link(Protocol[_KT]):  # pragma: no cover
    """Structural type of one node in a doubly linked list of keys.

    Used only as a static annotation for the ``__map`` and ``__root``
    attributes that :class:`OrderableDict` reads from its base class.
    Presumably mirrors the private node class of the pure-Python
    :class:`collections.OrderedDict` -- confirm against the targeted
    Python versions.
    """

    __slots__: tuple[str, ...] = ("__weakref__", "key", "next", "prev")

    def __init__(self) -> None:
        # Annotation-only declarations: nothing is assigned here, the
        # statements merely tell type checkers which attributes exist.
        self.key: _KT  # key stored by this node
        self.next: _Link[_KT]  # following node
        self.prev: _Link[_KT]  # preceding node


class _OrderableDictMeta(type):
    """Metaclass that forces the base class of the classes it creates.

    Whatever bases are written in the ``class`` statement, the created
    class derives solely from the ``OrderedDict`` attribute of the module
    returned by ``import_fresh_module("collections", blocked=["_collections"])``.

    NOTE(review): presumably this yields the pure-Python ``OrderedDict``
    (C accelerator blocked), which is what exposes the ``__map`` /
    ``__root`` linked-list attributes -- confirm against
    :func:`deluxe.importers.import_fresh_module`.
    """

    def __new__(
        cls: type[_OrderableDictMeta],
        name: str,
        bases: tuple[type, ...],
        namespace: dict[str, object],
        **kwds: object,
    ) -> _OrderableDictMeta:
        # A fresh copy of ``collections`` is imported on every class
        # creation; the declared ``bases`` are discarded on purpose.
        fresh_collections = cast(
            "ModuleType",
            import_fresh_module("collections", blocked=["_collections"]),
        )
        forced_bases: tuple[type, ...] = (fresh_collections.OrderedDict,)
        return super().__new__(cls, name, forced_bases, namespace, **kwds)


class _SupportsKeysAndGetItem(Protocol[_KT, _VT_co]):
    """Structural type for objects offering ``keys()`` and ``__getitem__``.

    Used to annotate the positional ``other`` argument accepted by
    :class:`OrderableDict`'s constructor alongside plain iterables of
    ``(key, value)`` pairs.
    """

    # Iterable over every key the object can be indexed with.
    def keys(self) -> Iterable[_KT]: ...
    # Value lookup for one key (positional-only).
    def __getitem__(self, key: _KT, /) -> _VT_co: ...


@final
class OrderableDict(OrderedDict[_KT, _VT], metaclass=_OrderableDictMeta):  # pyright: ignore[reportUninitializedInstanceVariable]
    """A :class:`~collections.OrderedDict` subclass with positional key manipulation.

    Extends :class:`~collections.OrderedDict` with
    :meth:`~OrderableDict.after` and :meth:`~OrderableDict.before` methods
    for inserting, moving, or querying entries relative to other keys and
    :attr:`~OrderableDict.first` / :attr:`~OrderableDict.last` properties
    for O(1) access to boundary keys.
    """

    def __init__(
        self,
        other: _SupportsKeysAndGetItem[_KT, _VT] | Iterable[tuple[_KT, _VT]] = (),
        /,
        **kwds: _VT,
    ) -> None:
        super().__init__(other, **kwds)
        # Grab the base class's name-mangled linked-list internals so the
        # ordering can be edited in place. ``getattr`` with a literal name
        # sidesteps this class's own name mangling.
        self.__map: dict[_KT, _Link[_KT]] = getattr(self, "_OrderedDict__map")  # noqa: B009
        self.__root: _Link[_KT] = getattr(self, "_OrderedDict__root")  # noqa: B009

    @property
    def first(self) -> _KT:
        """Return the first key."""
        return self.__root.next.key

    @property
    def last(self) -> _KT:
        """Return the last key."""
        return self.__root.prev.key

    def _insert(self, *, key: _KT, value: _VT, _from: _KT) -> bool:
        """Store ``value`` under ``key`` and report whether ``key`` is new.

        Args:
            key: The key to set.
            value: The value to store.
            _from: Reference key that must already be present.

        Returns:
            ``True`` if ``key`` was not in the dict before the call,
            ``False`` if an existing entry was merely updated.

        Raises:
            KeyError: If ``_from`` is not in the dict.
        """
        if _from not in self:
            msg = f"{_from} not in OrderedDict"
            raise KeyError(msg)
        exist: bool = key in self
        super().__setitem__(key, value)
        return not exist

    def _move_key(self, key: _KT, other: _KT, before: bool = False) -> None:
        """Relink ``key`` directly after (or before) ``other``.

        Args:
            key: Key of the entry to move; must be in the dict.
            other: Key of the reference entry; must be in the dict.
            before: If ``True`` place ``key`` just before ``other``,
                otherwise just after it.

        Raises:
            KeyError: If ``key`` or ``other`` has no link in the map.
        """
        ref = self.__map[other]
        link = self.__map[key]
        if link is ref:
            # An entry cannot be placed relative to itself. Running the
            # splice below in that case would leave the node pointing at
            # itself and corrupt the ordering, so it is a no-op.
            return
        link_prev = link.prev
        link_next = link.next
        # Reuse the back-reference the list already holds for ``link``
        # (presumably a weak proxy, as the name suggests) instead of
        # storing a new reference to ``link`` in a ``prev`` slot.
        soft_link = link_next.prev
        # Unlink ``link`` from its current position.
        link_prev.next = link_next
        link_next.prev = link_prev
        # Splice ``link`` next to ``ref``.
        if before:
            ref_prev = ref.prev
            link.prev = ref_prev
            link.next = ref_prev.next
            ref.prev = soft_link
            ref_prev.next = link
        else:
            ref_next = ref.next
            link.prev = ref_next.prev
            link.next = ref_next
            ref.next = link
            ref_next.prev = soft_link

    def after(
        self, key: _KT, other: _KT | None = None, value: _VT | None = None
    ) -> tuple[_KT, _VT] | None:
        """Insert, move, or return a (key, value) pair after another key.

        If ``other`` is not provided, returns the ``(key, value)`` pair found
        after ``key`` in the ordering.

        If ``value`` is ``None``, move ``key`` after ``other`` in the dict
        (the entry must already exist). Moving a key relative to itself
        leaves the ordering unchanged.

        If ``value`` is provided, insert the ``(key, value)`` pair after
        ``other``. If the key already exists in the dict, the value is
        updated but no move occurs.

        Args:
            key: The key to look up, move, or insert.
            other (``None``, optional): Reference key. If ``None``, the method
                returns the pair after ``key``. Defaults to ``None``.
            value (``None``, optional): If provided, insert this value for
                ``key``; if ``None``, move the existing entry.
                Defaults to ``None``.

        Returns:
            ``tuple`` | ``None``: A ``(key, value)`` pair if ``other`` is
            ``None``, otherwise ``None``.

        Raises:
            KeyError: If ``key`` or ``other`` does not exist in the dict.
        """
        if other is None:
            if key not in self:
                msg = f"{key} not in OrderableDict"
                raise KeyError(msg)
            next_: _KT = self.__map[key].next.key
            return (next_, self[next_])
        if value is None:
            if key not in self or other not in self:
                msg = f"{key} or {other} not in OrderableDict"
                raise KeyError(msg)
            self._move_key(key=key, other=other)
        elif self._insert(key=key, value=value, _from=other):
            # Only freshly inserted keys are repositioned.
            self._move_key(key=key, other=other)
        return None

    def before(
        self, key: _KT, other: _KT | None = None, value: _VT | None = None
    ) -> tuple[_KT, _VT] | None:
        """Insert, move, or return a (key, value) pair before another key.

        If ``other`` is not provided, returns the ``(key, value)`` pair found
        before ``key`` in the ordering.

        If ``value`` is ``None``, move ``key`` before ``other`` in the dict
        (the entry must already exist). Moving a key relative to itself
        leaves the ordering unchanged.

        If ``value`` is provided, insert the ``(key, value)`` pair before
        ``other``. If the key already exists in the dict, the value is
        updated but no move occurs.

        Args:
            key: The key to look up, move, or insert.
            other (``None``, optional): Reference key. If ``None``, the method
                returns the pair before ``key``. Defaults to ``None``.
            value (``None``, optional): If provided, insert this value for
                ``key``; if ``None``, move the existing entry.
                Defaults to ``None``.

        Returns:
            ``tuple`` | ``None``: A ``(key, value)`` pair if ``other`` is
            ``None``, otherwise ``None``.

        Raises:
            KeyError: If ``key`` or ``other`` does not exist in the dict.
        """
        if other is None:
            if key not in self:
                msg = f"{key} not in OrderableDict"
                raise KeyError(msg)
            prev_: _KT = self.__map[key].prev.key
            return (prev_, self[prev_])
        if value is None:
            if key not in self or other not in self:
                msg = f"{key} or {other} not in OrderedDict"
                raise KeyError(msg)
            self._move_key(key=key, other=other, before=True)
        elif self._insert(key=key, value=value, _from=other):
            # Only freshly inserted keys are repositioned.
            self._move_key(key=key, other=other, before=True)
        return None
# @final
# class _FrozenDecoratorMeta(type, Generic[_KT, _VT]):
#     __required_keys__: frozenset[str] = frozenset()
#     __optional_keys__: frozenset[str] = frozenset()
#     __total__: bool = True
#
#     def __new__(
#         cls: type[_FrozenDecoratorMeta[_KT, _VT]],
#         name: str,
#         bases: tuple[type, ...],
#         namespace: dict[str, object],
#         **kwds: object,
#     ) -> _FrozenDecoratorMeta[_KT, _VT]:
#         if not any(issubclass(Mapping, base) for base in bases):
#             raise TypeError
#         return super().__new__(cls, name, bases, namespace, **kwds)
#
#     def __call__(self, *args: object, **kwds: object) -> FrozenMap[_KT, _VT]:
#         mapping = cast(
#             "Mapping[_KT, _VT]",
#             cast("object", super().__call__(*args, **kwds)),
#         )
#         return FrozenMap(mapping)
class FrozenMap(Mapping[_KT, _VT]):  # noqa: PLW1641
    """A read-only :class:`~collections.abc.Mapping` that cannot be modified.

    Items are provided at instantiation and cannot be added, changed, or
    removed through this object afterward. Note that :class:`FrozenMap` does
    **not** enforce the immutability of the values themselves; use immutable
    containers as values if that is required.

    When only ``other`` is given, the instance keeps a reference to it (no
    copy is made), so later changes made directly to ``other`` remain visible
    through the :class:`FrozenMap`. When keyword arguments are given together
    with ``other``, both are merged into a new :class:`dict` snapshot.

    .. note::
        This class exists primarily because :class:`types.MappingProxyType`,
        which offers similar behavior cannot be subclassed.

    The following dunder methods are implemented to satisfy the
    :class:`~collections.abc.Mapping` protocol:

    * :meth:`~object.__contains__`, :meth:`~object.__getitem__`,
      :meth:`~object.__len__`, :meth:`~object.__iter__`
    * :meth:`~object.__eq__` / :meth:`~object.__ne__` compare by items.
    * :meth:`~object.__repr__` produces a ``FrozenMap({...})`` string.

    Args:
        other (:obj:`~collections.abc.Mapping`, optional): The source mapping
            to freeze. If ``None``, keyword arguments are used instead.
            Defaults to ``None``.
        **kwds: Additional key-value pairs merged into the mapping; on a key
            clash the keyword argument wins.
    """

    __slots__: ClassVar[tuple[str, ...]] = ("_items", "_keys", "_source", "_values")

    @overload
    def __new__(cls, other: Literal[None] = None, /, **kwds: _VT) -> FrozenMap[str, _VT]: ...  # noqa: PYI061
    @overload
    def __new__(cls, other: Mapping[_KT, _VT], /, **kwds: _VT) -> FrozenMap[_KT, _VT]: ...

    def __new__(  # noqa: D102
        cls, other: Mapping[_KT, _VT] | None = None, /, **kwds: _VT
    ) -> FrozenMap[_KT, _VT] | FrozenMap[str, _VT]:
        if other is None:
            source = cast("Mapping[_KT, _VT]", cast("object", kwds))
        elif kwds:
            # Keyword arguments used to be silently dropped when ``other``
            # was supplied; merge them as documented.
            source = cast("Mapping[_KT, _VT]", cast("object", {**other, **kwds}))
        else:
            source = other
        self = object.__new__(cls)
        self._source = source
        # The views wrap ``self`` (not the source) so that subclasses which
        # override ``__iter__``/``__getitem__``/``__contains__`` get
        # consistent ``keys()``/``values()``/``items()``.
        self_ = cast("Mapping[_KT, _VT]", cast("object", self))
        self._keys = KeysView[_KT](self_)
        self._values = ValuesView(self_)
        self._items = ItemsView(self_)
        return self

    if TYPE_CHECKING:
        # Declaration-only stub: gives type checkers the slot types. No
        # ``__init__`` is defined at runtime.
        def __init__(self, other: Mapping[_KT, _VT] | None = None, /, **kwds: _VT) -> None:  # noqa: ARG002
            self._source: Mapping[_KT, _VT]
            self._keys: KeysView[_KT]
            self._values: ValuesView[_VT]
            self._items: ItemsView[_KT, _VT]

    def keys(self) -> KeysView[_KT]:
        """Return a view of the keys in the mapping.

        Returns:
            :obj:`KeysView`: A view of the keys in the mapping.
        """
        return self._keys

    def values(self) -> ValuesView[_VT]:
        """Return a view of the values in the mapping.

        Returns:
            :obj:`ValuesView`: A view of the values in the mapping.
        """
        return self._values

    def items(self) -> ItemsView[_KT, _VT]:
        """Return a view of the (key, value) pairs in the mapping.

        Returns:
            :obj:`ItemsView`: A view of the (key, value) pairs in the mapping.
        """
        return self._items

    def get(self, key: _KT, default: _VT | None = None) -> _VT | None:  # pyright: ignore[reportIncompatibleMethodOverride]
        """Return the value associated with the specified key.

        If the key is not found in the mapping, the default value is returned.

        Args:
            key: The key to look up in the mapping.
            default (:obj:`~typing.Any` | ``None``, optional): The value to
                return if the key is not found. Defaults to ``None``.

        Returns:
            :obj:`~typing.Any` | ``None``: The value associated with the key
            if found, otherwise the default value.
        """
        return self._source.get(key, default)

    def __contains__(self, key: _KT) -> bool:  # pyright: ignore[reportIncompatibleMethodOverride]
        return key in self._source

    def __eq__(self, other: object, /) -> bool:
        if isinstance(other, Mapping):
            return self.items() == other.items()
        return NotImplemented

    def __ne__(self, other: object, /) -> bool:
        # Propagate ``NotImplemented`` instead of negating it, so that
        # ``frozen != <non-mapping>`` falls back to the default comparison
        # (``True``) rather than truth-testing ``NotImplemented``.
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented
        return not result

    def __getitem__(self, key: _KT) -> _VT:
        return self._source[key]

    def __len__(self) -> int:
        return len(self._source)

    def __iter__(self) -> Iterator[_KT]:
        yield from self._source

    def __repr__(self) -> str:
        body = ", ".join(f"{key!r}: {value!r}" for key, value in self.items())
        return f"{self.__class__.__name__}({{{body}}})"
# Signature of the predicate accepted by ``FilteredView``: it is called as
# ``filter(key, value)`` and its result decides whether the entry is visible.
view_filter = Callable[[Any, Any], bool]
class FilteredView(FrozenMap[_KT, _VT]):
    """Live, read-only, filtered window onto a :class:`~collections.abc.Mapping`.

    The view stores a reference to the source mapping and evaluates the
    predicate on every access, so changes made to the source are reflected
    automatically.

    :meth:`~object.__contains__`, :meth:`~object.__getitem__`,
    :meth:`~object.__len__` and :meth:`~object.__iter__` only expose entries
    for which the predicate returns a true value.

    Examples::

        >>> d = {'a': 1, 'b': 2, 'c': 3}
        >>> FilteredView(d, lambda k, v: v > 1)
        FilteredView({'b': 2, 'c': 3})

    Args:
        source (:obj:`~collections.abc.Mapping`): The source mapping for
            this view.
        filter (callable, optional): A callable ``(key, value) -> bool``
            deciding whether an entry is included. If ``None``, entries
            whose value is falsy are excluded. Defaults to ``None``.
    """

    __slots__: ClassVar[tuple[str, ...]] = ("_filter",)

    def __new__(cls, source: Mapping[_KT, _VT], filter: view_filter | None = None) -> Self:  # noqa: D102
        view = cast("Self", super().__new__(cls, source))
        # Any falsy ``filter`` (not just ``None``) selects the default
        # truthiness predicate.
        view._filter = filter if filter else (lambda _, v: bool(v))
        return view

    if TYPE_CHECKING:
        # Declaration-only stub for type checkers; absent at runtime.
        def __init__(self, source: Mapping[_KT, _VT], filter: view_filter | None = None) -> None:  # noqa: ARG002
            self._filter: view_filter

    def get(self, key: _KT, default: _VT | None = None) -> _VT | None:
        """Return the visible value for ``key``, or ``default``.

        ``default`` is returned both when ``key`` is missing from the source
        mapping and when the entry exists but is rejected by the filter.

        Args:
            key: The key to look up in the mapping.
            default (:obj:`~typing.Any` | ``None``, optional): The value to
                return if the key is not found or filtered out.
                Defaults to ``None``.

        Returns:
            :obj:`~typing.Any` | ``None``: The value associated with the key
            if found and not filtered, otherwise the default value.
        """
        if key not in self._source:
            return default
        found = self._source[key]
        if self._filter(key, found):
            return found
        return default

    def __contains__(self, key: _KT) -> bool:
        if key not in self._source:
            return False
        return self._filter(key, self._source[key])

    def _copy(self, deep: bool, memo: dict[int, Any] | None):
        """Build a standalone mapping holding only the visible entries.

        A mutable source type is instantiated without arguments and filled
        item by item. Any other source type is called with a plain ``dict``
        of the visible entries, falling back to :class:`FrozenMap` if that
        call fails.

        Args:
            deep: Whether values are passed through :func:`~copy.deepcopy`.
            memo: Memo dictionary forwarded to :func:`~copy.deepcopy`.
        """
        source_type = type(self._source)

        def clone(value: _VT) -> _VT:
            return deepcopy(value, memo) if deep else value

        if issubclass(source_type, MutableMapping):
            result = source_type()
            for k, v in self.items():
                result[k] = clone(v)
            return result

        snapshot: dict[_KT, _VT] = {k: clone(v) for k, v in self.items()}
        try:
            return source_type(snapshot)  # pyright: ignore[reportCallIssue]
        except Exception:  # noqa: BLE001  # pragma: no cover
            return FrozenMap(snapshot)

    def __copy__(self):
        """Return a filtered shallow copy of the source mapping.

        Returns:
            :obj:`~collections.abc.Mapping`: A filtered shallow copy of the
            same type as the source mapping.
        """
        return self._copy(deep=False, memo=None)

    def __deepcopy__(self, memo: dict[int, Any] | None = None):
        """Return a filtered deep copy of the source mapping.

        Args:
            memo (:obj:`dict` [ :obj:`int`, :obj:`~typing.Any` ], optional):
                The memoisation dictionary used by :func:`~copy.deepcopy`.
                Defaults to ``None``.

        Returns:
            :obj:`~collections.abc.Mapping`: A filtered deep copy of the
            same type as the source mapping.
        """
        return self._copy(deep=True, memo=memo)

    def __getitem__(self, key: _KT) -> _VT:
        # A missing key raises KeyError from the source lookup itself; a
        # present but filtered-out key raises KeyError below.
        found = self._source[key]
        if not self._filter(key, found):
            raise KeyError(key)
        return found

    def __len__(self) -> int:
        rejected = sum(not (self._filter(k, v)) for k, v in self._source.items())
        return len(self._source) - rejected

    def __iter__(self) -> Iterator[_KT]:
        for k, v in self._source.items():
            if self._filter(k, v):
                yield k
# Explicitly register FilteredView as a virtual subclass of Mapping.
# NOTE(review): FilteredView already derives from Mapping through FrozenMap,
# so this registration looks redundant -- confirm before removing.
Mapping.register(FilteredView)  # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue]