Issue 2274: Add heapq.heappushpop().
diff --git a/Lib/heapq.py b/Lib/heapq.py
index 39e3800..23f5fcb 100644
--- a/Lib/heapq.py
+++ b/Lib/heapq.py
@@ -127,7 +127,7 @@
"""
__all__ = ['heappush', 'heappop', 'heapify', 'heapreplace', 'merge',
- 'nlargest', 'nsmallest']
+ 'nlargest', 'nsmallest', 'heappushpop']
from itertools import islice, repeat, count, imap, izip, tee
from operator import itemgetter, neg
@@ -165,6 +165,13 @@
_siftup(heap, 0)
return returnitem
+def heappushpop(heap, item):
+    """Fast version of a heappush followed by a heappop.
+
+    Returns the smallest of item and the current heap contents.  The
+    length of heap never changes: either heap is left untouched and item
+    is handed straight back, or item replaces the root and the old root
+    is returned.  When item does not compare greater than the root
+    (including when they compare equal), item itself is returned.
+    """
+    # Empty heap, or item is not greater than the root: a push followed
+    # by a pop would give item right back, so skip both steps.
+    if not heap or not item > heap[0]:
+        return item
+    # The root is the value to return; item takes over slot 0 and is then
+    # sifted from that position by _siftup().
+    smallest = heap[0]
+    heap[0] = item
+    _siftup(heap, 0)
+    return smallest
+
def heapify(x):
"""Transform list into a heap, in-place, in O(len(heap)) time."""
n = len(x)
@@ -304,7 +311,7 @@
# If available, use C implementation
try:
- from _heapq import heappush, heappop, heapify, heapreplace, nlargest, nsmallest
+ from _heapq import heappush, heappop, heapify, heapreplace, nlargest, nsmallest, heappushpop
except ImportError:
pass
diff --git a/Lib/test/test_heapq.py b/Lib/test/test_heapq.py
index 7f6e4b5..fec027e 100644
--- a/Lib/test/test_heapq.py
+++ b/Lib/test/test_heapq.py
@@ -107,6 +107,34 @@
self.assertRaises(TypeError, self.module.heapreplace, None, None)
self.assertRaises(IndexError, self.module.heapreplace, [], None)
+ def test_nbest_with_pushpop(self):
+     # Track the 10 largest of 1000 random values: seed a heap with the
+     # first 10, then run every remaining value through heappushpop().
+     values = [random.randrange(2000) for _ in range(1000)]
+     best = values[:10]
+     self.module.heapify(best)
+     for candidate in values[10:]:
+         self.module.heappushpop(best, candidate)
+     # Draining the heap must yield exactly the 10 largest, in order.
+     expected = sorted(values)[-10:]
+     self.assertEqual(list(self.heapiter(best)), expected)
+     # With an empty heap the pushed item is what comes back.
+     self.assertEqual(self.module.heappushpop([], 'x'), 'x')
+
+ def test_heappushpop(self):
+     # Empty heap: the item is returned and the heap stays empty.
+     h = []
+     x = self.module.heappushpop(h, 10)
+     self.assertEqual((h, x), ([], 10))
+
+     # Item equal to the root (10.0 == 10): the pushed object itself is
+     # returned and the stored one stays put, so the types tell them apart.
+     h = [10]
+     x = self.module.heappushpop(h, 10.0)
+     self.assertEqual((h, x), ([10], 10.0))
+     self.assertEqual(type(h[0]), int)
+     self.assertEqual(type(x), float)
+
+     # Item smaller than the root: returned at once, heap unchanged.
+     h = [10]
+     x = self.module.heappushpop(h, 9)
+     self.assertEqual((h, x), ([10], 9))
+
+     # Item larger than the root: it replaces the root, which is returned.
+     h = [10]
+     x = self.module.heappushpop(h, 11)
+     self.assertEqual((h, x), ([11], 10))
+
def test_heapsort(self):
# Exercise everything with repeated heapsort checks
for trial in xrange(100):