| from _weakrefset import WeakSet |
| |
| |
def get_cache_token():
    """Return the current ABC cache token.

    The token is an opaque object (supporting equality testing) that
    identifies the current version of the ABC cache for virtual
    subclasses.  It is the value of ``ABCMeta._abc_invalidation_counter``,
    which ``ABCMeta.register()`` increments whenever it adds a new
    virtual subclass to any ABC.
    """
    token = ABCMeta._abc_invalidation_counter
    return token
| |
| |
class ABCMeta(type):
    """Metaclass for defining Abstract Base Classes (ABCs).

    Use this metaclass to create an ABC.  An ABC can be subclassed
    directly, and then acts as a mix-in class.  You can also register
    unrelated concrete classes (even built-in classes) and unrelated
    ABCs as 'virtual subclasses' -- these and their descendants will
    be considered subclasses of the registering ABC by the built-in
    issubclass() function, but the registering ABC won't show up in
    their MRO (Method Resolution Order) nor will method
    implementations defined by the registering ABC be callable (not
    even via super()).
    """

    # A global counter that is incremented each time a class is
    # registered as a virtual subclass of anything.  It forces the
    # negative cache to be cleared before its next use.
    # Note: this counter is private. Use `abc.get_cache_token()` for
    #       external code.
    _abc_invalidation_counter = 0

    def __new__(mcls, name, bases, namespace, **kwargs):
        """Create the class and attach its ABC bookkeeping attributes.

        Sets ``__abstractmethods__`` to the frozenset of attribute names
        that are still abstract on the new class, and gives the class its
        own registry, positive cache and negative cache.
        """
        cls = super().__new__(mcls, name, bases, namespace, **kwargs)
        # Abstract names declared directly in this class body.
        abstracts = {attr_name
                     for attr_name, value in namespace.items()
                     if getattr(value, "__isabstractmethod__", False)}
        # Abstract names inherited from the bases that the new class has
        # not overridden with a non-abstract attribute.  The loop variable
        # is deliberately not called ``name`` so the ``name`` parameter is
        # not rebound.
        for base in bases:
            for attr_name in getattr(base, "__abstractmethods__", set()):
                value = getattr(cls, attr_name, None)
                if getattr(value, "__isabstractmethod__", False):
                    abstracts.add(attr_name)
        cls.__abstractmethods__ = frozenset(abstracts)
        # Set up inheritance registry
        cls._abc_registry = WeakSet()
        cls._abc_cache = WeakSet()
        cls._abc_negative_cache = WeakSet()
        cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        return cls

    def register(cls, subclass):
        """Register a virtual subclass of an ABC.

        Returns the subclass, to allow usage as a class decorator.

        Raises TypeError if *subclass* is not a class, and RuntimeError if
        *cls* is already a subclass of *subclass* (registering would
        create an inheritance cycle).
        """
        if not isinstance(subclass, type):
            raise TypeError("Can only register classes")
        if issubclass(subclass, cls):
            return subclass  # Already a subclass
        # Subtle: test for cycles *after* testing for "already a subclass";
        # this means we allow X.register(X) and interpret it as a no-op.
        if issubclass(cls, subclass):
            # This would create a cycle, which is bad for the algorithm below
            raise RuntimeError("Refusing to create an inheritance cycle")
        cls._abc_registry.add(subclass)
        ABCMeta._abc_invalidation_counter += 1  # Invalidate negative cache
        return subclass

    def _dump_registry(cls, file=None):
        """Debug helper to print the ABC registry.

        Prints the class name, the current cache token, and every
        ``_abc_*`` attribute found in the class ``__dict__`` to *file*.
        """
        print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
        print(f"Inv. counter: {get_cache_token()}", file=file)
        for name in cls.__dict__:
            if name.startswith("_abc_"):
                value = getattr(cls, name)
                # Show weak sets as plain sets of their current members.
                if isinstance(value, WeakSet):
                    value = set(value)
                print(f"{name}: {value!r}", file=file)

    def _abc_registry_clear(cls):
        """Clear the registry (for debugging or testing)."""
        cls._abc_registry.clear()

    def _abc_caches_clear(cls):
        """Clear the caches (for debugging or testing)."""
        cls._abc_cache.clear()
        cls._abc_negative_cache.clear()

    def __instancecheck__(cls, instance):
        """Override for isinstance(instance, cls)."""
        # Inline the cache checking
        subclass = instance.__class__
        if subclass in cls._abc_cache:
            return True
        subtype = type(instance)
        if subtype is subclass:
            # The negative cache is only consulted while its version
            # matches the global invalidation counter.
            if (cls._abc_negative_cache_version ==
                    ABCMeta._abc_invalidation_counter and
                    subclass in cls._abc_negative_cache):
                return False
            # Fall back to the subclass check.
            return cls.__subclasscheck__(subclass)
        # ``__class__`` and ``type()`` disagree: accept if either passes.
        return any(cls.__subclasscheck__(c) for c in (subclass, subtype))

    def __subclasscheck__(cls, subclass):
        """Override for issubclass(subclass, cls).

        Raises TypeError if *subclass* is not a class, and AssertionError
        if ``cls.__subclasshook__`` returns something other than True,
        False or NotImplemented.
        """
        if not isinstance(subclass, type):
            raise TypeError('issubclass() arg 1 must be a class')
        # Check cache
        if subclass in cls._abc_cache:
            return True
        # Check negative cache; may have to invalidate
        if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter:
            # Invalidate the negative cache
            cls._abc_negative_cache = WeakSet()
            cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
        elif subclass in cls._abc_negative_cache:
            return False
        # Check the subclass hook
        ok = cls.__subclasshook__(subclass)
        if ok is not NotImplemented:
            # Raised explicitly instead of using an ``assert`` statement so
            # the validation is not stripped when running under ``-O``.
            if not isinstance(ok, bool):
                raise AssertionError(
                    "__subclasshook__ must return either False, True, "
                    "or NotImplemented")
            if ok:
                cls._abc_cache.add(subclass)
            else:
                cls._abc_negative_cache.add(subclass)
            return ok
        # Check if it's a direct subclass
        if cls in getattr(subclass, '__mro__', ()):
            cls._abc_cache.add(subclass)
            return True
        # Check if it's a subclass of a registered class (recursive)
        for rcls in cls._abc_registry:
            if issubclass(subclass, rcls):
                cls._abc_cache.add(subclass)
                return True
        # Check if it's a subclass of a subclass (recursive)
        for scls in cls.__subclasses__():
            if issubclass(subclass, scls):
                cls._abc_cache.add(subclass)
                return True
        # No dice; update negative cache
        cls._abc_negative_cache.add(subclass)
        return False