blob: d735d9c255feec22a40644450aa827f1326b4ca4 [file] [log] [blame]
Michael Foorde6410c52010-03-29 20:04:23 +00001import unittest
2from test import test_support
3from weakref import proxy, ref, WeakSet
4import operator
5import copy
6import string
7import os
8from random import randrange, shuffle
9import sys
10import warnings
11import collections
12import gc
13import contextlib
Kristján Valur Jónsson222b2842013-12-05 10:03:45 +000014from UserString import UserString as ustr
Michael Foorde6410c52010-03-29 20:04:23 +000015
16
17class Foo:
18 pass
19
20class SomeClass(object):
21 def __init__(self, value):
22 self.value = value
23 def __eq__(self, other):
24 if type(other) != type(self):
25 return False
26 return other.value == self.value
27
28 def __ne__(self, other):
29 return not self.__eq__(other)
30
31 def __hash__(self):
32 return hash((SomeClass, self.value))
33
Antoine Pitrouc56bca32012-03-01 16:26:35 +010034class RefCycle(object):
35 def __init__(self):
36 self.cycle = self
37
Michael Foorde6410c52010-03-29 20:04:23 +000038class TestWeakSet(unittest.TestCase):
39
40 def setUp(self):
41 # need to keep references to them
42 self.items = [SomeClass(c) for c in ('a', 'b', 'c')]
43 self.items2 = [SomeClass(c) for c in ('x', 'y', 'z')]
44 self.letters = [SomeClass(c) for c in string.ascii_letters]
Meador Inge104f1892012-03-04 22:02:17 -060045 self.ab_items = [SomeClass(c) for c in 'ab']
46 self.abcde_items = [SomeClass(c) for c in 'abcde']
47 self.def_items = [SomeClass(c) for c in 'def']
48 self.ab_weakset = WeakSet(self.ab_items)
49 self.abcde_weakset = WeakSet(self.abcde_items)
50 self.def_weakset = WeakSet(self.def_items)
Michael Foorde6410c52010-03-29 20:04:23 +000051 self.s = WeakSet(self.items)
52 self.d = dict.fromkeys(self.items)
53 self.obj = SomeClass('F')
54 self.fs = WeakSet([self.obj])
55
56 def test_methods(self):
57 weaksetmethods = dir(WeakSet)
58 for method in dir(set):
59 if method == 'test_c_api' or method.startswith('_'):
60 continue
61 self.assertIn(method, weaksetmethods,
62 "WeakSet missing method " + method)
63
64 def test_new_or_init(self):
65 self.assertRaises(TypeError, WeakSet, [], 2)
66
67 def test_len(self):
68 self.assertEqual(len(self.s), len(self.d))
69 self.assertEqual(len(self.fs), 1)
70 del self.obj
71 self.assertEqual(len(self.fs), 0)
72
73 def test_contains(self):
74 for c in self.letters:
75 self.assertEqual(c in self.s, c in self.d)
Georg Brandl52f83952011-02-25 10:39:23 +000076 # 1 is not weakref'able, but that TypeError is caught by __contains__
77 self.assertNotIn(1, self.s)
Michael Foorde6410c52010-03-29 20:04:23 +000078 self.assertIn(self.obj, self.fs)
79 del self.obj
80 self.assertNotIn(SomeClass('F'), self.fs)
81
82 def test_union(self):
83 u = self.s.union(self.items2)
84 for c in self.letters:
85 self.assertEqual(c in u, c in self.d or c in self.items2)
86 self.assertEqual(self.s, WeakSet(self.items))
87 self.assertEqual(type(u), WeakSet)
88 self.assertRaises(TypeError, self.s.union, [[]])
89 for C in set, frozenset, dict.fromkeys, list, tuple:
90 x = WeakSet(self.items + self.items2)
91 c = C(self.items2)
92 self.assertEqual(self.s.union(c), x)
Antoine Pitrou94c2d6df52012-03-04 20:47:05 +010093 del c
94 self.assertEqual(len(u), len(self.items) + len(self.items2))
95 self.items2.pop()
96 gc.collect()
97 self.assertEqual(len(u), len(self.items) + len(self.items2))
Michael Foorde6410c52010-03-29 20:04:23 +000098
99 def test_or(self):
100 i = self.s.union(self.items2)
101 self.assertEqual(self.s | set(self.items2), i)
102 self.assertEqual(self.s | frozenset(self.items2), i)
103
104 def test_intersection(self):
Antoine Pitrou94c2d6df52012-03-04 20:47:05 +0100105 s = WeakSet(self.letters)
106 i = s.intersection(self.items2)
Michael Foorde6410c52010-03-29 20:04:23 +0000107 for c in self.letters:
Antoine Pitrou94c2d6df52012-03-04 20:47:05 +0100108 self.assertEqual(c in i, c in self.items2 and c in self.letters)
109 self.assertEqual(s, WeakSet(self.letters))
Michael Foorde6410c52010-03-29 20:04:23 +0000110 self.assertEqual(type(i), WeakSet)
111 for C in set, frozenset, dict.fromkeys, list, tuple:
112 x = WeakSet([])
Antoine Pitrou94c2d6df52012-03-04 20:47:05 +0100113 self.assertEqual(i.intersection(C(self.items)), x)
114 self.assertEqual(len(i), len(self.items2))
115 self.items2.pop()
116 gc.collect()
117 self.assertEqual(len(i), len(self.items2))
Michael Foorde6410c52010-03-29 20:04:23 +0000118
119 def test_isdisjoint(self):
120 self.assertTrue(self.s.isdisjoint(WeakSet(self.items2)))
121 self.assertTrue(not self.s.isdisjoint(WeakSet(self.letters)))
122
123 def test_and(self):
124 i = self.s.intersection(self.items2)
125 self.assertEqual(self.s & set(self.items2), i)
126 self.assertEqual(self.s & frozenset(self.items2), i)
127
128 def test_difference(self):
129 i = self.s.difference(self.items2)
130 for c in self.letters:
131 self.assertEqual(c in i, c in self.d and c not in self.items2)
132 self.assertEqual(self.s, WeakSet(self.items))
133 self.assertEqual(type(i), WeakSet)
134 self.assertRaises(TypeError, self.s.difference, [[]])
135
136 def test_sub(self):
137 i = self.s.difference(self.items2)
138 self.assertEqual(self.s - set(self.items2), i)
139 self.assertEqual(self.s - frozenset(self.items2), i)
140
141 def test_symmetric_difference(self):
142 i = self.s.symmetric_difference(self.items2)
143 for c in self.letters:
144 self.assertEqual(c in i, (c in self.d) ^ (c in self.items2))
145 self.assertEqual(self.s, WeakSet(self.items))
146 self.assertEqual(type(i), WeakSet)
147 self.assertRaises(TypeError, self.s.symmetric_difference, [[]])
Antoine Pitrou94c2d6df52012-03-04 20:47:05 +0100148 self.assertEqual(len(i), len(self.items) + len(self.items2))
149 self.items2.pop()
150 gc.collect()
151 self.assertEqual(len(i), len(self.items) + len(self.items2))
Michael Foorde6410c52010-03-29 20:04:23 +0000152
153 def test_xor(self):
154 i = self.s.symmetric_difference(self.items2)
155 self.assertEqual(self.s ^ set(self.items2), i)
156 self.assertEqual(self.s ^ frozenset(self.items2), i)
157
158 def test_sub_and_super(self):
Meador Inge104f1892012-03-04 22:02:17 -0600159 self.assertTrue(self.ab_weakset <= self.abcde_weakset)
160 self.assertTrue(self.abcde_weakset <= self.abcde_weakset)
161 self.assertTrue(self.abcde_weakset >= self.ab_weakset)
162 self.assertFalse(self.abcde_weakset <= self.def_weakset)
163 self.assertFalse(self.abcde_weakset >= self.def_weakset)
Michael Foorde6410c52010-03-29 20:04:23 +0000164 self.assertTrue(set('a').issubset('abc'))
165 self.assertTrue(set('abc').issuperset('a'))
166 self.assertFalse(set('a').issubset('cbs'))
167 self.assertFalse(set('cbs').issuperset('a'))
168
Meador Inge104f1892012-03-04 22:02:17 -0600169 def test_lt(self):
170 self.assertTrue(self.ab_weakset < self.abcde_weakset)
171 self.assertFalse(self.abcde_weakset < self.def_weakset)
172 self.assertFalse(self.ab_weakset < self.ab_weakset)
173 self.assertFalse(WeakSet() < WeakSet())
174
175 def test_gt(self):
176 self.assertTrue(self.abcde_weakset > self.ab_weakset)
177 self.assertFalse(self.abcde_weakset > self.def_weakset)
178 self.assertFalse(self.ab_weakset > self.ab_weakset)
179 self.assertFalse(WeakSet() > WeakSet())
180
Michael Foorde6410c52010-03-29 20:04:23 +0000181 def test_gc(self):
182 # Create a nest of cycles to exercise overall ref count check
183 s = WeakSet(Foo() for i in range(1000))
184 for elem in s:
185 elem.cycle = s
186 elem.sub = elem
187 elem.set = WeakSet([elem])
188
189 def test_subclass_with_custom_hash(self):
190 # Bug #1257731
191 class H(WeakSet):
192 def __hash__(self):
193 return int(id(self) & 0x7fffffff)
194 s=H()
195 f=set()
196 f.add(s)
197 self.assertIn(s, f)
198 f.remove(s)
199 f.add(s)
200 f.discard(s)
201
202 def test_init(self):
203 s = WeakSet()
204 s.__init__(self.items)
205 self.assertEqual(s, self.s)
206 s.__init__(self.items2)
207 self.assertEqual(s, WeakSet(self.items2))
208 self.assertRaises(TypeError, s.__init__, s, 2);
209 self.assertRaises(TypeError, s.__init__, 1);
210
211 def test_constructor_identity(self):
212 s = WeakSet(self.items)
213 t = WeakSet(s)
214 self.assertNotEqual(id(s), id(t))
215
216 def test_hash(self):
217 self.assertRaises(TypeError, hash, self.s)
218
219 def test_clear(self):
220 self.s.clear()
221 self.assertEqual(self.s, WeakSet([]))
222 self.assertEqual(len(self.s), 0)
223
224 def test_copy(self):
225 dup = self.s.copy()
226 self.assertEqual(self.s, dup)
227 self.assertNotEqual(id(self.s), id(dup))
228
229 def test_add(self):
230 x = SomeClass('Q')
231 self.s.add(x)
232 self.assertIn(x, self.s)
233 dup = self.s.copy()
234 self.s.add(x)
235 self.assertEqual(self.s, dup)
236 self.assertRaises(TypeError, self.s.add, [])
237 self.fs.add(Foo())
238 self.assertTrue(len(self.fs) == 1)
239 self.fs.add(self.obj)
240 self.assertTrue(len(self.fs) == 1)
241
242 def test_remove(self):
243 x = SomeClass('a')
244 self.s.remove(x)
245 self.assertNotIn(x, self.s)
246 self.assertRaises(KeyError, self.s.remove, x)
247 self.assertRaises(TypeError, self.s.remove, [])
248
249 def test_discard(self):
250 a, q = SomeClass('a'), SomeClass('Q')
251 self.s.discard(a)
252 self.assertNotIn(a, self.s)
253 self.s.discard(q)
254 self.assertRaises(TypeError, self.s.discard, [])
255
256 def test_pop(self):
257 for i in range(len(self.s)):
258 elem = self.s.pop()
259 self.assertNotIn(elem, self.s)
260 self.assertRaises(KeyError, self.s.pop)
261
262 def test_update(self):
263 retval = self.s.update(self.items2)
264 self.assertEqual(retval, None)
265 for c in (self.items + self.items2):
266 self.assertIn(c, self.s)
267 self.assertRaises(TypeError, self.s.update, [[]])
268
269 def test_update_set(self):
270 self.s.update(set(self.items2))
271 for c in (self.items + self.items2):
272 self.assertIn(c, self.s)
273
274 def test_ior(self):
275 self.s |= set(self.items2)
276 for c in (self.items + self.items2):
277 self.assertIn(c, self.s)
278
279 def test_intersection_update(self):
280 retval = self.s.intersection_update(self.items2)
281 self.assertEqual(retval, None)
282 for c in (self.items + self.items2):
283 if c in self.items2 and c in self.items:
284 self.assertIn(c, self.s)
285 else:
286 self.assertNotIn(c, self.s)
287 self.assertRaises(TypeError, self.s.intersection_update, [[]])
288
289 def test_iand(self):
290 self.s &= set(self.items2)
291 for c in (self.items + self.items2):
292 if c in self.items2 and c in self.items:
293 self.assertIn(c, self.s)
294 else:
295 self.assertNotIn(c, self.s)
296
297 def test_difference_update(self):
298 retval = self.s.difference_update(self.items2)
299 self.assertEqual(retval, None)
300 for c in (self.items + self.items2):
301 if c in self.items and c not in self.items2:
302 self.assertIn(c, self.s)
303 else:
304 self.assertNotIn(c, self.s)
305 self.assertRaises(TypeError, self.s.difference_update, [[]])
306 self.assertRaises(TypeError, self.s.symmetric_difference_update, [[]])
307
308 def test_isub(self):
309 self.s -= set(self.items2)
310 for c in (self.items + self.items2):
311 if c in self.items and c not in self.items2:
312 self.assertIn(c, self.s)
313 else:
314 self.assertNotIn(c, self.s)
315
316 def test_symmetric_difference_update(self):
317 retval = self.s.symmetric_difference_update(self.items2)
318 self.assertEqual(retval, None)
319 for c in (self.items + self.items2):
320 if (c in self.items) ^ (c in self.items2):
321 self.assertIn(c, self.s)
322 else:
323 self.assertNotIn(c, self.s)
324 self.assertRaises(TypeError, self.s.symmetric_difference_update, [[]])
325
326 def test_ixor(self):
327 self.s ^= set(self.items2)
328 for c in (self.items + self.items2):
329 if (c in self.items) ^ (c in self.items2):
330 self.assertIn(c, self.s)
331 else:
332 self.assertNotIn(c, self.s)
333
334 def test_inplace_on_self(self):
335 t = self.s.copy()
336 t |= t
337 self.assertEqual(t, self.s)
338 t &= t
339 self.assertEqual(t, self.s)
340 t -= t
341 self.assertEqual(t, WeakSet())
342 t = self.s.copy()
343 t ^= t
344 self.assertEqual(t, WeakSet())
345
346 def test_eq(self):
347 # issue 5964
348 self.assertTrue(self.s == self.s)
349 self.assertTrue(self.s == WeakSet(self.items))
350 self.assertFalse(self.s == set(self.items))
351 self.assertFalse(self.s == list(self.items))
352 self.assertFalse(self.s == tuple(self.items))
353 self.assertFalse(self.s == 1)
354
Benjamin Peterson1cf48b42013-05-22 13:25:41 -0700355 def test_ne(self):
356 self.assertTrue(self.s != set(self.items))
357 s1 = WeakSet()
358 s2 = WeakSet()
359 self.assertFalse(s1 != s2)
360
Michael Foorde6410c52010-03-29 20:04:23 +0000361 def test_weak_destroy_while_iterating(self):
362 # Issue #7105: iterators shouldn't crash when a key is implicitly removed
363 # Create new items to be sure no-one else holds a reference
364 items = [SomeClass(c) for c in ('a', 'b', 'c')]
365 s = WeakSet(items)
366 it = iter(s)
367 next(it) # Trigger internal iteration
368 # Destroy an item
369 del items[-1]
370 gc.collect() # just in case
371 # We have removed either the first consumed items, or another one
372 self.assertIn(len(list(it)), [len(items), len(items) - 1])
373 del it
374 # The removal has been committed
375 self.assertEqual(len(s), len(items))
376
377 def test_weak_destroy_and_mutate_while_iterating(self):
378 # Issue #7105: iterators shouldn't crash when a key is implicitly removed
379 items = [SomeClass(c) for c in string.ascii_letters]
380 s = WeakSet(items)
381 @contextlib.contextmanager
382 def testcontext():
383 try:
384 it = iter(s)
385 next(it)
386 # Schedule an item for removal and recreate it
387 u = SomeClass(str(items.pop()))
388 gc.collect() # just in case
389 yield u
390 finally:
391 it = None # should commit all removals
392
393 with testcontext() as u:
394 self.assertNotIn(u, s)
395 with testcontext() as u:
396 self.assertRaises(KeyError, s.remove, u)
397 self.assertNotIn(u, s)
398 with testcontext() as u:
399 s.add(u)
400 self.assertIn(u, s)
401 t = s.copy()
402 with testcontext() as u:
403 s.update(t)
404 self.assertEqual(len(s), len(t))
405 with testcontext() as u:
406 s.clear()
407 self.assertEqual(len(s), 0)
408
Antoine Pitrouc56bca32012-03-01 16:26:35 +0100409 def test_len_cycles(self):
410 N = 20
411 items = [RefCycle() for i in range(N)]
412 s = WeakSet(items)
413 del items
414 it = iter(s)
415 try:
416 next(it)
417 except StopIteration:
418 pass
419 gc.collect()
420 n1 = len(s)
421 del it
422 gc.collect()
423 n2 = len(s)
424 # one item may be kept alive inside the iterator
425 self.assertIn(n1, (0, 1))
426 self.assertEqual(n2, 0)
427
428 def test_len_race(self):
429 # Extended sanity checks for len() in the face of cyclic collection
430 self.addCleanup(gc.set_threshold, *gc.get_threshold())
431 for th in range(1, 100):
432 N = 20
433 gc.collect(0)
434 gc.set_threshold(th, th, th)
435 items = [RefCycle() for i in range(N)]
436 s = WeakSet(items)
437 del items
438 # All items will be collected at next garbage collection pass
439 it = iter(s)
440 try:
441 next(it)
442 except StopIteration:
443 pass
444 n1 = len(s)
445 del it
446 n2 = len(s)
447 self.assertGreaterEqual(n1, 0)
448 self.assertLessEqual(n1, N)
449 self.assertGreaterEqual(n2, 0)
450 self.assertLessEqual(n2, n1)
451
Kristján Valur Jónsson222b2842013-12-05 10:03:45 +0000452 def test_weak_destroy_while_iterating(self):
453 # Issue #7105: iterators shouldn't crash when a key is implicitly removed
454 # Create new items to be sure no-one else holds a reference
455 items = [ustr(c) for c in ('a', 'b', 'c')]
456 s = WeakSet(items)
457 it = iter(s)
458 next(it) # Trigger internal iteration
459 # Destroy an item
460 del items[-1]
461 gc.collect() # just in case
462 # We have removed either the first consumed items, or another one
463 self.assertIn(len(list(it)), [len(items), len(items) - 1])
464 del it
465 # The removal has been committed
466 self.assertEqual(len(s), len(items))
467
468 def test_weak_destroy_and_mutate_while_iterating(self):
469 # Issue #7105: iterators shouldn't crash when a key is implicitly removed
470 items = [ustr(c) for c in string.ascii_letters]
471 s = WeakSet(items)
472 @contextlib.contextmanager
473 def testcontext():
474 try:
475 it = iter(s)
Antoine Pitroubd4b6672013-12-18 00:28:36 +0100476 # Start iterator
477 yielded = ustr(str(next(it)))
Kristján Valur Jónsson222b2842013-12-05 10:03:45 +0000478 # Schedule an item for removal and recreate it
479 u = ustr(str(items.pop()))
Antoine Pitroubd4b6672013-12-18 00:28:36 +0100480 if yielded == u:
481 # The iterator still has a reference to the removed item,
482 # advance it (issue #20006).
483 next(it)
Kristján Valur Jónsson222b2842013-12-05 10:03:45 +0000484 gc.collect() # just in case
485 yield u
486 finally:
487 it = None # should commit all removals
488
489 with testcontext() as u:
490 self.assertFalse(u in s)
491 with testcontext() as u:
492 self.assertRaises(KeyError, s.remove, u)
493 self.assertFalse(u in s)
494 with testcontext() as u:
495 s.add(u)
496 self.assertTrue(u in s)
497 t = s.copy()
498 with testcontext() as u:
499 s.update(t)
500 self.assertEqual(len(s), len(t))
501 with testcontext() as u:
502 s.clear()
503 self.assertEqual(len(s), 0)
504
Michael Foorde6410c52010-03-29 20:04:23 +0000505
506def test_main(verbose=None):
507 test_support.run_unittest(TestWeakSet)
508
509if __name__ == "__main__":
510 test_main(verbose=True)