blob: c870ae9048b4f131a71beb1b5827ba13f0eda2f3 [file] [log] [blame]
Ivan Levkivskyi03e3c342018-02-18 12:41:58 +00001from _weakrefset import WeakSet
2
3
4def get_cache_token():
5 """Returns the current ABC cache token.
6
7 The token is an opaque object (supporting equality testing) identifying the
8 current version of the ABC cache for virtual subclasses. The token changes
9 with every call to ``register()`` on any ABC.
10 """
11 return ABCMeta._abc_invalidation_counter
12
13
14class ABCMeta(type):
15 """Metaclass for defining Abstract Base Classes (ABCs).
16
17 Use this metaclass to create an ABC. An ABC can be subclassed
18 directly, and then acts as a mix-in class. You can also register
19 unrelated concrete classes (even built-in classes) and unrelated
20 ABCs as 'virtual subclasses' -- these and their descendants will
21 be considered subclasses of the registering ABC by the built-in
22 issubclass() function, but the registering ABC won't show up in
23 their MRO (Method Resolution Order) nor will method
24 implementations defined by the registering ABC be callable (not
25 even via super()).
26 """
27
28 # A global counter that is incremented each time a class is
29 # registered as a virtual subclass of anything. It forces the
30 # negative cache to be cleared before its next use.
31 # Note: this counter is private. Use `abc.get_cache_token()` for
32 # external code.
33 _abc_invalidation_counter = 0
34
Serhiy Storchaka2085bd02019-06-01 11:00:15 +030035 def __new__(mcls, name, bases, namespace, /, **kwargs):
Ivan Levkivskyi03e3c342018-02-18 12:41:58 +000036 cls = super().__new__(mcls, name, bases, namespace, **kwargs)
37 # Compute set of abstract method names
38 abstracts = {name
39 for name, value in namespace.items()
40 if getattr(value, "__isabstractmethod__", False)}
41 for base in bases:
42 for name in getattr(base, "__abstractmethods__", set()):
43 value = getattr(cls, name, None)
44 if getattr(value, "__isabstractmethod__", False):
45 abstracts.add(name)
46 cls.__abstractmethods__ = frozenset(abstracts)
47 # Set up inheritance registry
48 cls._abc_registry = WeakSet()
49 cls._abc_cache = WeakSet()
50 cls._abc_negative_cache = WeakSet()
51 cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
52 return cls
53
54 def register(cls, subclass):
55 """Register a virtual subclass of an ABC.
56
57 Returns the subclass, to allow usage as a class decorator.
58 """
59 if not isinstance(subclass, type):
60 raise TypeError("Can only register classes")
61 if issubclass(subclass, cls):
62 return subclass # Already a subclass
63 # Subtle: test for cycles *after* testing for "already a subclass";
64 # this means we allow X.register(X) and interpret it as a no-op.
65 if issubclass(cls, subclass):
66 # This would create a cycle, which is bad for the algorithm below
67 raise RuntimeError("Refusing to create an inheritance cycle")
68 cls._abc_registry.add(subclass)
69 ABCMeta._abc_invalidation_counter += 1 # Invalidate negative cache
70 return subclass
71
72 def _dump_registry(cls, file=None):
73 """Debug helper to print the ABC registry."""
74 print(f"Class: {cls.__module__}.{cls.__qualname__}", file=file)
75 print(f"Inv. counter: {get_cache_token()}", file=file)
76 for name in cls.__dict__:
77 if name.startswith("_abc_"):
78 value = getattr(cls, name)
79 if isinstance(value, WeakSet):
80 value = set(value)
81 print(f"{name}: {value!r}", file=file)
82
83 def _abc_registry_clear(cls):
84 """Clear the registry (for debugging or testing)."""
85 cls._abc_registry.clear()
86
87 def _abc_caches_clear(cls):
88 """Clear the caches (for debugging or testing)."""
89 cls._abc_cache.clear()
90 cls._abc_negative_cache.clear()
91
92 def __instancecheck__(cls, instance):
93 """Override for isinstance(instance, cls)."""
94 # Inline the cache checking
95 subclass = instance.__class__
96 if subclass in cls._abc_cache:
97 return True
98 subtype = type(instance)
99 if subtype is subclass:
100 if (cls._abc_negative_cache_version ==
101 ABCMeta._abc_invalidation_counter and
102 subclass in cls._abc_negative_cache):
103 return False
104 # Fall back to the subclass check.
105 return cls.__subclasscheck__(subclass)
106 return any(cls.__subclasscheck__(c) for c in (subclass, subtype))
107
108 def __subclasscheck__(cls, subclass):
109 """Override for issubclass(subclass, cls)."""
jab40472dd2018-03-23 00:26:06 +1300110 if not isinstance(subclass, type):
111 raise TypeError('issubclass() arg 1 must be a class')
Ivan Levkivskyi03e3c342018-02-18 12:41:58 +0000112 # Check cache
113 if subclass in cls._abc_cache:
114 return True
115 # Check negative cache; may have to invalidate
116 if cls._abc_negative_cache_version < ABCMeta._abc_invalidation_counter:
117 # Invalidate the negative cache
118 cls._abc_negative_cache = WeakSet()
119 cls._abc_negative_cache_version = ABCMeta._abc_invalidation_counter
120 elif subclass in cls._abc_negative_cache:
121 return False
122 # Check the subclass hook
123 ok = cls.__subclasshook__(subclass)
124 if ok is not NotImplemented:
125 assert isinstance(ok, bool)
126 if ok:
127 cls._abc_cache.add(subclass)
128 else:
129 cls._abc_negative_cache.add(subclass)
130 return ok
131 # Check if it's a direct subclass
132 if cls in getattr(subclass, '__mro__', ()):
133 cls._abc_cache.add(subclass)
134 return True
135 # Check if it's a subclass of a registered class (recursive)
136 for rcls in cls._abc_registry:
137 if issubclass(subclass, rcls):
138 cls._abc_cache.add(subclass)
139 return True
140 # Check if it's a subclass of a subclass (recursive)
141 for scls in cls.__subclasses__():
142 if issubclass(subclass, scls):
143 cls._abc_cache.add(subclass)
144 return True
145 # No dice; update negative cache
146 cls._abc_negative_cache.add(subclass)
147 return False