blob: 0a25ef05c7f2ce14d67f1e292b2b07881fa9e953 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module which supports allocation of memory from an mmap
3#
4# multiprocessing/heap.py
5#
R. David Murrayd3820662010-12-14 01:41:07 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35import bisect
36import mmap
37import tempfile
38import os
39import sys
40import threading
41import itertools
42
43import _multiprocessing
44from multiprocessing.util import Finalize, info
45from multiprocessing.forking import assert_spawning
46
47__all__ = ['BufferWrapper']
48
#
# Inheritable class which wraps an mmap, and from which blocks can be allocated
#
52
if sys.platform == 'win32':

    from _multiprocessing import win32

    class Arena(object):
        """Anonymous mmap identified by a tag name.

        The pickled state is just ``(size, name)``; unpickling maps the
        same tag name again instead of copying the buffer contents.
        """

        # per-process counter used to build distinct tag names
        _counter = itertools.count()

        def __init__(self, size):
            # the tag name is built from the pid and the next counter value
            tag = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
            self.size = size
            self.name = tag
            self.buffer = mmap.mmap(-1, size, tagname=tag)
            assert win32.GetLastError() == 0, 'tagname already in use'
            self._state = (size, tag)

        def __getstate__(self):
            # pickling is only permitted while spawning a child process
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            size, tag = state
            self.size = size
            self.name = tag
            self._state = state
            self.buffer = mmap.mmap(-1, size, tagname=tag)
            # the mapping is expected to exist already under this tag name
            assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS

else:

    class Arena(object):
        """Anonymous mmap of ``size`` bytes; it carries no name."""

        def __init__(self, size):
            self.buffer = mmap.mmap(-1, size)
            self.size, self.name = size, None
85
86#
87# Class allowing allocation of chunks of memory from arenas
88#
89
class Heap(object):
    """Allocator that hands out aligned blocks carved from mmap arenas.

    A block is a tuple ``(arena, start, stop)``.  Free blocks are indexed
    by length, by ``(arena, start)`` and by ``(arena, stop)`` so that a
    released block can be merged with free neighbours in the same arena.
    """

    # requested sizes are rounded up to a multiple of this (a power of 2)
    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        # pid recorded here is compared in malloc()/free() to detect a fork
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        # minimum length of the next arena; doubled whenever one is created
        self._size = size
        # sorted list of the distinct lengths of the free blocks
        self._lengths = []
        # length -> list of free blocks having that length
        self._len_to_seq = {}
        # (arena, start) -> free block starting at that offset
        self._start_to_block = {}
        # (arena, stop) -> free block ending at that offset
        self._stop_to_block = {}
        # blocks currently handed out by malloc()
        self._allocated_blocks = set()
        self._arenas = []
        # list of pending blocks to free - see free() comment below
        self._pending_free_blocks = []

    @staticmethod
    def _roundup(n, alignment):
        """Return n rounded up to a multiple of alignment (a power of 2)."""
        mask = alignment - 1
        return (n + mask) & ~mask

    def _malloc(self, size):
        """Return a free block of at least ``size`` bytes.

        The block might be much larger than requested.  When no free block
        is large enough a new arena is created and returned whole.
        """
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            # nothing big enough is free: map a new arena
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)
        else:
            # take a block of the smallest sufficient length
            length = self._lengths[i]
            seq = self._len_to_seq[length]
            block = seq.pop()
            if not seq:
                del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        """Register ``block`` as free, merging it with free neighbours."""
        (arena, start, stop) = block

        # a free block ending where this one starts is absorbed
        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        # a free block starting where this one stops is absorbed
        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            # first free block of this length
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        """Deregister a free block so it can be merged with a neighbour.

        Returns the ``(start, stop)`` offsets of the removed block.
        """
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def _free_pending_blocks(self):
        """Free all the blocks in the pending list.

        Must be called with the lock held.
        """
        while True:
            try:
                block = self._pending_free_blocks.pop()
            except IndexError:
                break
            self._allocated_blocks.remove(block)
            self._free(block)

    def free(self, block):
        """Free a block returned by malloc()."""
        # Since free() can be called asynchronously by the GC, it could happen
        # that it's called while self._lock is held: in that case,
        # self._lock.acquire() would deadlock (issue #12352). To avoid that, a
        # trylock is used instead, and if the lock can't be acquired
        # immediately, the block is added to a list of blocks to be freed
        # synchronously some time later from malloc() or free(), by calling
        # _free_pending_blocks() (appending and retrieving from a list is not
        # strictly thread-safe but under cPython it's atomic thanks to the GIL).
        assert os.getpid() == self._lastpid
        if not self._lock.acquire(False):
            # can't acquire the lock right now, add the block to the list of
            # pending blocks to free
            self._pending_free_blocks.append(block)
        else:
            # we hold the lock
            try:
                self._free_pending_blocks()
                self._allocated_blocks.remove(block)
                self._free(block)
            finally:
                self._lock.release()

    def malloc(self, size):
        """Return a block of the right size (possibly rounded up).

        ``size`` is rounded up to a multiple of ``_alignment`` (at least one
        alignment unit).  The result is an ``(arena, start, stop)`` tuple.
        """
        assert 0 <= size < sys.maxsize
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        # Everything done while holding the lock lives inside the ``with``
        # block, so the lock is released even if freeing a pending block
        # raises; otherwise every later malloc() would block forever.
        with self._lock:
            self._free_pending_blocks()
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                # hand the unused tail of the block back to the free lists
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
232
233#
234# Class representing a chunk of an mmap -- can be inherited
235#
236
class BufferWrapper(object):
    """A chunk of an mmap obtained from a heap shared by all instances.

    The block is handed back to the heap through a finalizer registered
    on the instance.
    """

    # single heap from which every BufferWrapper takes its block
    _heap = Heap()

    def __init__(self, size):
        assert 0 <= size < sys.maxsize
        heap = BufferWrapper._heap
        block = heap.malloc(size)
        # remember the requested size; the block itself may be rounded up
        self._state = (block, size)
        Finalize(self, heap.free, args=(block,))

    def get_address(self):
        """Return the address of the arena buffer plus the block's offset."""
        block, size = self._state
        arena, start, _ = block
        address, length = _multiprocessing.address_of_buffer(arena.buffer)
        assert size <= length
        return address + start

    def get_size(self):
        """Return the size that was passed to the constructor."""
        _, size = self._state
        return size