blob: ab6cf1610bb8bde1c86e536abc925ebbfe8bb81f [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module which supports allocation of memory from an mmap
3#
4# multiprocessing/heap.py
5#
6# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
7#
8
9import bisect
10import mmap
11import tempfile
12import os
13import sys
14import threading
15import itertools
16
17import _multiprocessing
18from multiprocessing.util import Finalize, info
19from multiprocessing.forking import assert_spawning
20
21__all__ = ['BufferWrapper']
22
23#
24# Inheritable class which wraps an mmap, and from which blocks can be allocated
25#
26
if sys.platform == 'win32':

    # The module is imported absolutely elsewhere in this file
    # (``import _multiprocessing``), so use the same absolute form here
    # instead of a package-relative import.
    from _multiprocessing import win32

    class Arena(object):
        """An mmap identified by a tag name so it can be re-opened by name.

        Attributes:
            size:   length of the mapping in bytes.
            name:   tag name of the form ``pym-<pid>-<counter>``.
            buffer: the ``mmap.mmap`` object itself.
        """

        # Per-process counter used to build distinct tag names.
        _counter = itertools.count()

        def __init__(self, size):
            """Create a fresh mapping of ``size`` bytes under a new tag name."""
            self.size = size
            self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # A non-zero last-error here means the tag name was not new.
            assert win32.GetLastError() == 0, 'tagname already in use'
            self._state = (self.size, self.name)

        def __getstate__(self):
            """Return ``(size, name)`` for pickling.

            ``assert_spawning`` is called first; presumably it rejects
            pickling outside of process spawning -- confirm against
            ``multiprocessing.forking``.
            """
            assert_spawning(self)
            return self._state

        def __setstate__(self, state):
            """Re-open the mapping described by ``state`` (``(size, name)``)."""
            self.size, self.name = self._state = state
            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
            # The mapping is expected to exist already when unpickling.
            assert win32.GetLastError() == win32.ERROR_ALREADY_EXISTS

else:

    class Arena(object):
        """An anonymous mmap of a fixed size.

        Attributes:
            size:   length of the mapping in bytes.
            name:   always ``None`` on this platform.
            buffer: the ``mmap.mmap`` object itself.
        """

        def __init__(self, size):
            """Create an anonymous mapping of ``size`` bytes."""
            self.buffer = mmap.mmap(-1, size)
            self.size = size
            self.name = None
59
60#
61# Class allowing allocation of chunks of memory from arenas
62#
63
class Heap(object):
    """Allocator which hands out aligned blocks carved from ``Arena`` objects.

    A block is a tuple ``(arena, start, stop)``.  Free blocks are indexed
    three ways: by length (to find a block that is large enough) and by
    their start and stop offsets (to merge a freed block with free
    neighbours in the same arena).
    """

    # Every allocation size is rounded up to a multiple of this.
    _alignment = 8

    def __init__(self, size=mmap.PAGESIZE):
        """Set up an empty heap; ``size`` is the minimum length of the
        first arena that will be created."""
        self._lastpid = os.getpid()
        self._lock = threading.Lock()
        self._size = size                 # minimum length of the next arena
        self._lengths = []                # sorted lengths having free blocks
        self._len_to_seq = {}             # length -> list of free blocks
        self._start_to_block = {}         # (arena, start) -> free block
        self._stop_to_block = {}          # (arena, stop) -> free block
        self._allocated_blocks = set()    # blocks handed out by malloc()
        self._arenas = []                 # arenas created by this heap

    @staticmethod
    def _roundup(n, alignment):
        """Round ``n`` up to a multiple of ``alignment``.

        ``alignment`` must be a power of 2.
        """
        mask = alignment - 1
        return (n + mask) & ~mask

    def _malloc(self, size):
        """Return a free block of at least ``size`` bytes.

        The block may be much larger than requested; the caller trims it.
        The returned block is no longer registered as free.
        """
        # Index of the smallest free length that is >= size.
        i = bisect.bisect_left(self._lengths, size)
        if i == len(self._lengths):
            # Nothing free is big enough: create a new arena and double
            # the minimum length used for the one after it.
            length = self._roundup(max(self._size, size), mmap.PAGESIZE)
            self._size *= 2
            info('allocating a new mmap of length %d', length)
            arena = Arena(length)
            self._arenas.append(arena)
            return (arena, 0, length)

        length = self._lengths[i]
        seq = self._len_to_seq[length]
        block = seq.pop()
        if not seq:
            # That was the last free block of this length.
            del self._len_to_seq[length], self._lengths[i]

        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]
        return block

    def _free(self, block):
        """Register ``block`` as free, merging it with free neighbours."""
        (arena, start, stop) = block

        # A free block ending where this one starts extends it backwards.
        try:
            prev_block = self._stop_to_block[(arena, start)]
        except KeyError:
            pass
        else:
            start, _ = self._absorb(prev_block)

        # A free block starting where this one stops extends it forwards.
        try:
            next_block = self._start_to_block[(arena, stop)]
        except KeyError:
            pass
        else:
            _, stop = self._absorb(next_block)

        block = (arena, start, stop)
        length = stop - start

        try:
            self._len_to_seq[length].append(block)
        except KeyError:
            self._len_to_seq[length] = [block]
            bisect.insort(self._lengths, length)

        self._start_to_block[(arena, start)] = block
        self._stop_to_block[(arena, stop)] = block

    def _absorb(self, block):
        """Deregister a free ``block`` so it can be merged with a neighbour.

        Returns the block's ``(start, stop)`` offsets.
        """
        (arena, start, stop) = block
        del self._start_to_block[(arena, start)]
        del self._stop_to_block[(arena, stop)]

        length = stop - start
        seq = self._len_to_seq[length]
        seq.remove(block)
        if not seq:
            del self._len_to_seq[length]
            self._lengths.remove(length)

        return start, stop

    def free(self, block):
        """Free a block previously returned by ``malloc()``.

        Raises:
            ValueError: if called from a process other than the one which
                last (re)initialised this heap.
            KeyError: if ``block`` is not currently allocated.
        """
        # Checked explicitly (not with ``assert``) so that the guard is
        # still active when Python runs with -O.
        if os.getpid() != self._lastpid:
            raise ValueError(
                'heap belongs to pid %r, not current pid %r'
                % (self._lastpid, os.getpid()))
        with self._lock:
            self._allocated_blocks.remove(block)
            self._free(block)

    def malloc(self, size):
        """Return a block of ``size`` bytes, rounded up to the alignment.

        A request for 0 bytes is treated as a request for 1 byte.

        Raises:
            ValueError: if ``size`` is negative.
            OverflowError: if ``size`` is not less than ``sys.maxsize``.
        """
        # Checked explicitly (not with ``assert``) so that bad sizes are
        # still rejected when Python runs with -O.
        if size < 0:
            raise ValueError('size %r out of range' % (size,))
        if size >= sys.maxsize:
            raise OverflowError('size %r too large' % (size,))
        if os.getpid() != self._lastpid:
            self.__init__()                     # reinitialize after fork
        with self._lock:
            size = self._roundup(max(size, 1), self._alignment)
            (arena, start, stop) = self._malloc(size)
            new_stop = start + size
            if new_stop < stop:
                # Give the unused tail of the block back to the free lists.
                self._free((arena, new_stop, stop))
            block = (arena, start, new_stop)
            self._allocated_blocks.add(block)
            return block
179
180#
181# Class representing a chunk of an mmap -- can be inherited
182#
183
class BufferWrapper(object):
    """A chunk of memory allocated from the class-wide ``Heap``.

    The allocation is recorded in ``self._state`` as ``(block, size)``,
    where ``block`` is the ``(arena, start, stop)`` tuple returned by the
    heap and ``size`` is the number of bytes the caller asked for.
    """

    # Single heap shared by every BufferWrapper.
    _heap = Heap()

    def __init__(self, size):
        """Allocate ``size`` bytes and register their release as a finalizer."""
        assert 0 <= size < sys.maxsize
        heap = BufferWrapper._heap
        block = heap.malloc(size)
        self._state = (block, size)
        # Hand the block back to the heap when this wrapper is finalized.
        Finalize(self, heap.free, args=(block,))

    def get_address(self):
        """Return the address of the first byte of this chunk."""
        block, size = self._state
        arena, offset, _stop = block
        base, length = _multiprocessing.address_of_buffer(arena.buffer)
        assert size <= length
        return base + offset

    def get_size(self):
        """Return the size in bytes that was requested at construction."""
        return self._state[1]