#
# Module which supports allocation of memory from an mmap
#
# multiprocessing/heap.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

import bisect
import mmap
import os
import sys
import threading
import itertools

import _multiprocessing
from multiprocessing.util import Finalize, info
from multiprocessing.forking import assert_spawning

__all__ = ['BufferWrapper']

#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#

if sys.platform == 'win32':

    import _winapi

    class Arena(object):

        _counter = itertools.count()

        def __init__(self, size):
            self.size = size
            self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            assert _winapi.GetLastError() == 0, 'tagname already in use'
            self._state = (self.size, self.name)

        def __getstate__(self):
            assert_spawning(self)
            return self._state

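        # The parent pickles only (size, name); __setstate__ re-opens the
        # same tagged mapping in the child, and _winapi.GetLastError() then
        # reports ERROR_ALREADY_EXISTS, i.e. we attached to the existing
        # mapping rather than silently creating a fresh one.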
        def __setstate__(self, state):
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            assert _winapi.GetLastError() == _winapi.ERROR_ALREADY_EXISTS

else:

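    # On Unix the anonymous mmap is simply inherited across fork(), so the
    # arena needs no name and no pickling support.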
    class Arena(object):

        def __init__(self, size):
            self.buffer = mmap.mmap(-1, size)
            self.size = size
            self.name = None

#
# Class allowing allocation of chunks of memory from arenas
#

class Heap(object):

    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
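        # Free blocks are (arena, start, stop) triples, indexed three ways:
        #   _lengths        -- sorted list of the distinct free-block lengths
        #   _len_to_seq     -- length -> list of free blocks of that length
        #   _start_to_block -- (arena, start) -> block, for merging with a
        #                      preceding neighbour
        #   _stop_to_block  -- (arena, stop) -> block, for merging with a
        #                      following neighbour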
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        self._size = size
        self._lengths = []
        self._len_to_seq = {}
        self._start_to_block = {}
        self._stop_to_block = {}
        self._allocated_blocks = set()
        self._arenas = []
        # list of pending blocks to free - see the comment in free() below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        # alignment must be a power of 2
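        # e.g. _roundup(13, 8) == (13 + 7) & ~7 == 16; adding the mask and
        # clearing the low bits rounds n up to the next multiple, which is
        # only correct when alignment == 2**k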
        mask = alignment - 1
        return (n + mask) & ~mask

    def _malloc(self, size):
        # returns a large enough block -- it might be much larger
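        # best fit: bisect finds the smallest recorded free length >= size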
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
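            # double the target size for the next arena, so the number of
            # mmaps grows only logarithmically with total demand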
            info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        # free the block and try to merge it with its neighbours
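        # a free neighbour ends exactly at our start, or starts exactly at
        # our stop, so the (arena, offset) indexes find it in O(1)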
        (arena, start, stop) = block

        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        # deregister this block so it can be merged with a neighbour
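        # the block is still indexed as free; remove it from every structure
        # so the caller can re-insert one merged block in its place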
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _free_pending_blocks(self):
        # Free all the blocks in the pending list - called with the lock held.
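        # pop() here mirrors the append() in free(); both are atomic under
        # CPython's GIL, so the pending list needs no locking of its own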
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        # free a block returned by malloc()
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously sometime later from malloc() or free(), by calling
        # _free_pending_blocks() (appending to and popping from a list is not
        # strictly thread-safe but under CPython it is atomic thanks to the GIL).
        assert os.getpid() == self._lastpid
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        # return a block of the right size (possibly rounded up)
        assert 0 <= size < sys.maxsize
        if os.getpid() != self._lastpid:
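            # the arenas are shared with the parent, but the bookkeeping was
            # copied by fork(); start fresh rather than hand out blocks the
            # parent also believes it owns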
            self.__init__()                     # reinitialize after fork
        self._lock.acquire()
        self._free_pending_blocks()
        try:
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
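            # give back the unused tail of the block so a later request can
            # use it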
            if new_stop < stop:
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
        finally:
            self._lock.release()

#
# Class representing a chunk of an mmap -- can be inherited by a child process
#

class BufferWrapper(object):

    _heap = Heap()

    def __init__(self, size):
        assert 0 <= size < sys.maxsize
        block = BufferWrapper._heap.malloc(size)
        self._state = (block, size)
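        # when this wrapper is garbage collected, Finalize hands the block
        # back to the heap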
        Finalize(self, BufferWrapper._heap.free, args=(block,))

    def create_memoryview(self):
        (arena, start, stop), size = self._state
        return memoryview(arena.buffer)[start:start+size]
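
# A minimal usage sketch (illustrative only, not part of the module): the
# buffer is writable shared memory, so bytes stored through the memoryview
# are visible to any process that inherits the arena.
#
#     wrapper = BufferWrapper(1024)
#     view = wrapper.create_memoryview()
#     view[:5] = b'hello'
#     bytes(view[:5])   # -> b'hello'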