blob: 1834d0a1eb22992ac93e34c98ba167eeaf2c6b8c [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module which supports allocation of memory from an mmap
3#
4# multiprocessing/heap.py
5#
R. David Murray3fc969a2010-12-14 01:38:16 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
35import bisect
36import mmap
37import tempfile
38import os
39import sys
40import threading
41import itertools
42
43import _multiprocessing
44from multiprocessing.util import Finalize, info
45from multiprocessing.forking import assert_spawning
46
47__all__ = ['BufferWrapper']
48
49#
50# Inheirtable class which wraps an mmap, and from which blocks can be allocated
51#
52
53if sys.platform == 'win32':
54
Brian Curtina6a32742010-08-04 15:47:24 +000055 from _multiprocessing import win32
Benjamin Petersone711caf2008-06-11 16:44:04 +000056
57 class Arena(object):
58
59 _counter = itertools.count()
60
61 def __init__(self, size):
62 self.size = size
63 self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
64 self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
65 assert win32.GetLastError() == 0, 'tagname already in use'
66 self._state = (self.size, self.name)
67
68 def __getstate__(self):
69 assert_spawning(self)
70 return self._state
71
72 def __setstate__(self, state):
73 self.size, self.name = self._state = state
74 self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
75 assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS
76
77else:
78
79 class Arena(object):
80
81 def __init__(self, size):
82 self.buffer = mmap.mmap(-1, size)
83 self.size = size
84 self.name = None
85
86#
87# Class allowing allocation of chunks of memory from arenas
88#
89
90class Heap(object):
91
92 _alignment = 8
93
94 def __init__(self, size=mmap.PAGESIZE):
95 self._lastpid = os.getpid()
96 self._lock = threading.Lock()
97 self._size = size
98 self._lengths = []
99 self._len_to_seq = {}
100 self._start_to_block = {}
101 self._stop_to_block = {}
102 self._allocated_blocks = set()
103 self._arenas = []
104
105 @staticmethod
106 def _roundup(n, alignment):
107 # alignment must be a power of 2
108 mask = alignment - 1
109 return (n + mask) & ~mask
110
111 def _malloc(self, size):
112 # returns a large enough block -- it might be much larger
113 i = bisect.bisect_left(self._lengths, size)
114 if i == len(self._lengths):
115 length = self._roundup(max(self._size, size), mmap.PAGESIZE)
116 self._size *= 2
117 info('allocating a new mmap of length %d', length)
118 arena = Arena(length)
119 self._arenas.append(arena)
120 return (arena, 0, length)
121 else:
122 length = self._lengths[i]
123 seq = self._len_to_seq[length]
124 block = seq.pop()
125 if not seq:
126 del self._len_to_seq[length], self._lengths[i]
127
128 (arena, start, stop) = block
129 del self._start_to_block[(arena, start)]
130 del self._stop_to_block[(arena, stop)]
131 return block
132
133 def _free(self, block):
134 # free location and try to merge with neighbours
135 (arena, start, stop) = block
136
137 try:
138 prev_block = self._stop_to_block[(arena, start)]
139 except KeyError:
140 pass
141 else:
142 start, _ = self._absorb(prev_block)
143
144 try:
145 next_block = self._start_to_block[(arena, stop)]
146 except KeyError:
147 pass
148 else:
149 _, stop = self._absorb(next_block)
150
151 block = (arena, start, stop)
152 length = stop - start
153
154 try:
155 self._len_to_seq[length].append(block)
156 except KeyError:
157 self._len_to_seq[length] = [block]
158 bisect.insort(self._lengths, length)
159
160 self._start_to_block[(arena, start)] = block
161 self._stop_to_block[(arena, stop)] = block
162
163 def _absorb(self, block):
164 # deregister this block so it can be merged with a neighbour
165 (arena, start, stop) = block
166 del self._start_to_block[(arena, start)]
167 del self._stop_to_block[(arena, stop)]
168
169 length = stop - start
170 seq = self._len_to_seq[length]
171 seq.remove(block)
172 if not seq:
173 del self._len_to_seq[length]
174 self._lengths.remove(length)
175
176 return start, stop
177
178 def free(self, block):
179 # free a block returned by malloc()
180 assert os.getpid() == self._lastpid
181 self._lock.acquire()
182 try:
183 self._allocated_blocks.remove(block)
184 self._free(block)
185 finally:
186 self._lock.release()
187
188 def malloc(self, size):
189 # return a block of right size (possibly rounded up)
190 assert 0 <= size < sys.maxsize
191 if os.getpid() != self._lastpid:
192 self.__init__() # reinitialize after fork
193 self._lock.acquire()
194 try:
195 size = self._roundup(max(size,1), self._alignment)
196 (arena, start, stop) = self._malloc(size)
197 new_stop = start + size
198 if new_stop < stop:
199 self._free((arena, new_stop, stop))
200 block = (arena, start, new_stop)
201 self._allocated_blocks.add(block)
202 return block
203 finally:
204 self._lock.release()
205
206#
207# Class representing a chunk of an mmap -- can be inherited
208#
209
210class BufferWrapper(object):
211
212 _heap = Heap()
213
214 def __init__(self, size):
215 assert 0 <= size < sys.maxsize
216 block = BufferWrapper._heap.malloc(size)
217 self._state = (block, size)
218 Finalize(self, BufferWrapper._heap.free, args=(block,))
219
220 def get_address(self):
221 (arena, start, stop), size = self._state
222 address, length = _multiprocessing.address_of_buffer(arena.buffer)
223 assert size <= length
224 return address + start
225
226 def get_size(self):
227 return self._state[1]