blob: 6f42ef32fa696151a1c68ddae664b22b3a7d8949 [file] [log] [blame]
Ivan Levkivskyi03e3c342018-02-18 12:41:58 +00001from _weakrefset import WeakSet
2
3
4def get_cache_token():
5 """Returns the current ABC cache token.
6
7 The token is an opaque object (supporting equality testing) identifying the
8 current version of the ABC cache for virtual subclasses. The token changes
9 with every call to ``register()`` on any ABC.
10 """
11 return ABCMeta._abc_invalidation_counter
12
13
14class ABCMeta(type):
15 """Metaclass for defining Abstract Base Classes (ABCs).
16
17 Use this metaclass to create an ABC. An ABC can be subclassed
18 directly, and then acts as a mix-in class. You can also register
19 unrelated concrete classes (even built-in classes) and unrelated
20 ABCs as 'virtual subclasses' -- these and their descendants will
21 be considered subclasses of the registering ABC by the built-in
22 issubclass() function, but the registering ABC won't show up in
23 their MRO (Method Resolution Order) nor will method
24 implementations defined by the registering ABC be callable (not
25 even via super()).
26 """
27
28 # A global counter that is incremented each time a class is
29 # registered as a virtual subclass of anything. It forces the
30 # negative cache to be cleared before its next use.
31 # Note: this counter is private. Use `abc.get_cache_token()` for
32 # external code.
33 _abc_invalidation_counter = 0
34
35 def __new__(mcls, name, bases, namespace, **kwargs):
36 cls = super().__new__(mcls, name, bases, namespace, **kwargs)
37 # Compute set of abstract method names
38 abstracts = {name
39 for name, value in namespace.items()
40 if getattr(value, "__isabstractmethod__", False)}
41 for base in bases:
42 for name in getattr(base, "__abstractmethods__", set()):
43 value = getattr(cls, name, None)
44 if getattr(value, "__isabstractmethod__", False):
45 abstracts.add(name)
46 cls.__abstractmethods__ = frozenset(abstracts)
47 # Set up inheritance registry
48 cls._abc_registry = WeakSet()
49 cls._abc_cache = WeakSet()
50 cls._abc_negative_cache = WeakSet()
51 cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
52 return cls
53
54 def register(cls, subclass):
55 """Register a virtual subclass of an ABC.
56
57 Returns the subclass, to allow usage as a class decorator.
58 """
59 if not isinstance(subclass, type):
60 raise TypeError("Can only register classes")
61 if issubclass(subclass, cls):
62 return subclass # Already a subclass
63 # Subtle: test for cycles *after* testing for "already a subclass";
64 # this means we allow X.register(X) and interpret it as a no-op.
65 if issubclass(cls, subclass):
66 # This would create a cycle, which is bad for the algorithm below
67 raise RuntimeError("Refusing to create an inheritance cycle")
68 cls._abc_registry.add(subclass)
69 ABCMeta._abc_invalidation_counter += 1 # Invalidate negative cache
70 return subclass
71
72 def _dump_registry(cls, file=None):
73 """Debug helper to print the ABC registry."""
74 print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
75 print(f"Inv. counter: {get_cache_token()}", file=file)
76 for name in cls.__dict__:
77 if name.startswith("_abc_"):
78 value = getattr(cls, name)
79 if isinstance(value, WeakSet):
80 value = set(value)
81 print(f"{name}: {value!r}", file=file)
82
83 def _abc_registry_clear(cls):
84 """Clear the registry (for debugging or testing)."""
85 cls._abc_registry.clear()
86
87 def _abc_caches_clear(cls):
88 """Clear the caches (for debugging or testing)."""
89 cls._abc_cache.clear()
90 cls._abc_negative_cache.clear()
91
92 def __instancecheck__(cls, instance):
93 """Override for isinstance(instance, cls)."""
94 # Inline the cache checking
95 subclass = instance.__class__
96 if subclass in cls._abc_cache:
97 return True
98 subtype = type(instance)
99 if subtype is subclass:
100 if (cls._abc_negative_cache_version ==
101 ABCMeta._abc_invalidation_counter and
102 subclass in cls._abc_negative_cache):
103 return False
104 # Fall back to the subclass check.
105 return cls.__subclasscheck__(subclass)
106 return any(cls.__subclasscheck__(c) for c in (subclass, subtype))
107
108 def __subclasscheck__(cls, subclass):
109 """Override for issubclass(subclass, cls)."""
110 # Check cache
111 if subclass in cls._abc_cache:
112 return True
113 # Check negative cache; may have to invalidate
114 if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter:
115 # Invalidate the negative cache
116 cls._abc_negative_cache = WeakSet()
117 cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
118 elif subclass in cls._abc_negative_cache:
119 return False
120 # Check the subclass hook
121 ok = cls.__subclasshook__(subclass)
122 if ok is not NotImplemented:
123 assert isinstance(ok, bool)
124 if ok:
125 cls._abc_cache.add(subclass)
126 else:
127 cls._abc_negative_cache.add(subclass)
128 return ok
129 # Check if it's a direct subclass
130 if cls in getattr(subclass, '__mro__', ()):
131 cls._abc_cache.add(subclass)
132 return True
133 # Check if it's a subclass of a registered class (recursive)
134 for rcls in cls._abc_registry:
135 if issubclass(subclass, rcls):
136 cls._abc_cache.add(subclass)
137 return True
138 # Check if it's a subclass of a subclass (recursive)
139 for scls in cls.__subclasses__():
140 if issubclass(subclass, scls):
141 cls._abc_cache.add(subclass)
142 return True
143 # No dice; update negative cache
144 cls._abc_negative_cache.add(subclass)
145 return False