blob: e83ce584596eaed1d82468eb4c132eba94e05191 [file] [log] [blame]
#
# Module which supports allocation of ctypes objects from shared memory
#
# multiprocessing/sharedctypes.py
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# 1. Redistributions of source code must retain the above copyright
#    notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above copyright
#    notice, this list of conditions and the following disclaimer in the
#    documentation and/or other materials provided with the distribution.
# 3. Neither the name of author nor the names of any contributors may be
#    used to endorse or promote products derived from this software
#    without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
# SUCH DAMAGE.
#
34
35import sys
36import ctypes
37import weakref
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
39from multiprocessing import heap, RLock
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000040from multiprocessing.forking import assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
42__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
43
44#
45#
46#
47
# Lookup table from one-character typecodes to the ctypes type they stand for.
# RawValue() and RawArray() consult it and fall back to the argument itself
# when the argument is not one of these keys.
typecode_to_type = {
    'c': ctypes.c_char,
    'u': ctypes.c_wchar,
    'b': ctypes.c_byte,
    'B': ctypes.c_ubyte,
    'h': ctypes.c_short,
    'H': ctypes.c_ushort,
    'i': ctypes.c_int,
    'I': ctypes.c_uint,
    'l': ctypes.c_long,
    'L': ctypes.c_ulong,
    'f': ctypes.c_float,
    'd': ctypes.c_double,
}
56
57#
58#
59#
60
def _new_value(type_):
    '''
    Create an instance of the ctypes type `type_` backed by a fresh
    heap.BufferWrapper large enough to hold it.
    '''
    nbytes = ctypes.sizeof(type_)
    # length=None: type_ is already the final type, no array multiplication.
    return rebuild_ctype(type_, heap.BufferWrapper(nbytes), None)
65
def RawValue(typecode_or_type, *args):
    '''
    Returns a ctypes object allocated from shared memory

    `typecode_or_type` is either a key of typecode_to_type or a ctypes type;
    `args` are forwarded to the object's __init__.
    '''
    ctype = typecode_to_type.get(typecode_or_type, typecode_or_type)
    instance = _new_value(ctype)
    # Clear the underlying bytes before running the initializer.
    address = ctypes.addressof(instance)
    nbytes = ctypes.sizeof(instance)
    ctypes.memset(address, 0, nbytes)
    instance.__init__(*args)
    return instance
75
def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory

    `typecode_or_type` is either a key of typecode_to_type or a ctypes type
    and gives the element type.  If `size_or_initializer` is an int it is
    the length of the array, and the array is zero-filled.  Otherwise it is
    a sequence whose length fixes the array length and whose items
    initialize the elements.
    '''
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    if isinstance(size_or_initializer, int):
        type_ = type_ * size_or_initializer
        obj = _new_value(type_)
        # Zero the memory explicitly, as RawValue() does: the buffer handed
        # out by the heap is not known to be cleared (NOTE: presumably
        # recycled blocks keep their previous contents).
        ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
        return obj
    else:
        type_ = type_ * len(size_or_initializer)
        result = _new_value(type_)
        result.__init__(*size_or_initializer)
        return result
89
def Value(typecode_or_type, *args, lock=None):
    '''
    Return a synchronization wrapper for a Value

    The object is created with RawValue(typecode_or_type, *args).

    `lock` selects the synchronization:
      - False: return the raw object without any wrapper;
      - True or None: wrap it using a newly created RLock;
      - anything else: used as the lock; it must have an `acquire`
        attribute, otherwise AttributeError is raised.
    '''
    obj = RawValue(typecode_or_type, *args)
    if lock is False:
        return obj
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap in a 1-tuple so a tuple-valued `lock` is formatted as a
        # single value rather than being unpacked by the % operator.
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
102
def Array(typecode_or_type, size_or_initializer, **kwds):
    '''
    Return a synchronization wrapper for a RawArray

    The array is created with RawArray(typecode_or_type, size_or_initializer).

    The only accepted keyword argument is `lock`:
      - False: return the raw array without any wrapper;
      - True or None (the default): wrap it using a newly created RLock;
      - anything else: used as the lock; it must have an `acquire`
        attribute, otherwise AttributeError is raised.

    Any other keyword argument raises ValueError.
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is False:
        return obj
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap in a 1-tuple so a tuple-valued `lock` is formatted as a
        # single value rather than being unpacked by the % operator.
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
118
def copy(obj):
    '''
    Return a new object of the same type as `obj`, created with
    _new_value(), that has been assigned the value of `obj`.
    '''
    duplicate = _new_value(type(obj))
    # Assign through a pointer to the new object so `obj` is stored into it.
    target = ctypes.pointer(duplicate)
    target[0] = obj
    return duplicate
123
def synchronized(obj, lock=None):
    '''
    Wrap the ctypes object `obj` in a SynchronizedBase subclass using `lock`.

    Simple ctypes values get Synchronized, c_char arrays get
    SynchronizedString and other arrays get SynchronizedArray.  For any
    other type a wrapper class is generated with one locked property per
    entry in the type's `_fields_`; it is cached in class_cache.
    '''
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)

    if isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            return SynchronizedString(obj, lock)
        return SynchronizedArray(obj, lock)

    obj_type = type(obj)
    try:
        wrapper_cls = class_cache[obj_type]
    except KeyError:
        # First time this type is seen: build a wrapper class for it.
        field_names = [field[0] for field in obj_type._fields_]
        members = {name: make_property(name) for name in field_names}
        wrapper_name = 'Synchronized' + obj_type.__name__
        wrapper_cls = type(wrapper_name, (SynchronizedBase,), members)
        class_cache[obj_type] = wrapper_cls
    return wrapper_cls(obj, lock)
143
144#
145# Functions for pickling/unpickling
146#
147
def reduce_ctype(obj):
    '''
    Reduction function for ctypes objects created by rebuild_ctype().

    Returns (rebuild_ctype, args).  For arrays the element type and length
    are passed separately; for everything else the object's own type is
    passed with a length of None.
    '''
    assert_spawning(obj)
    if not isinstance(obj, ctypes.Array):
        return rebuild_ctype, (type(obj), obj._wrapper, None)
    return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
154
def rebuild_ctype(type_, wrapper, length):
    '''
    Create a ctypes object of `type_` at the address given by `wrapper`.

    If `length` is not None the object is an array of `length` elements of
    `type_`.  The resulting type is registered with ForkingPickler so that
    it is pickled via reduce_ctype().
    '''
    ctype = type_ if length is None else type_ * length
    ForkingPickler.register(ctype, reduce_ctype)
    instance = ctype.from_address(wrapper.get_address())
    # reduce_ctype() reads this attribute back when pickling the object.
    instance._wrapper = wrapper
    return instance
162
163#
164# Function to create properties
165#
166
# Source text for a getter/setter pair plus the property combining them.
# Every %s is filled with the attribute name; both accessors hold the
# wrapper's lock (self.acquire / self.release) while touching self._obj.
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''

# Properties already generated by make_property(), keyed by attribute name.
prop_cache = {}
# Wrapper classes generated by synchronized(), keyed weakly by wrapped type.
class_cache = weakref.WeakKeyDictionary()


def make_property(name):
    '''
    Return a property that reads and writes `self._obj.<name>` while
    holding the lock.  The property for each name is built once and then
    served from prop_cache.
    '''
    if name not in prop_cache:
        namespace = {}
        # The template uses the name in seven places.
        exec(template % ((name,) * 7), namespace)
        prop_cache[name] = namespace[name]
    return prop_cache[name]
194
195#
196# Synchronized wrappers
197#
198
class SynchronizedBase(object):
    '''
    Base class for wrappers that pair an object with a lock.

    The lock's `acquire` and `release` are exposed directly as instance
    attributes of the wrapper.
    '''

    def __init__(self, obj, lock=None):
        '''Wrap `obj`; a new RLock is created when `lock` is falsy.'''
        self._obj = obj
        if not lock:
            lock = RLock()
        self._lock = lock
        self.acquire = lock.acquire
        self.release = lock.release

    def __reduce__(self):
        '''Pickle as a call to synchronized() on the object and lock.'''
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    def get_obj(self):
        '''Return the wrapped object.'''
        return self._obj

    def get_lock(self):
        '''Return the lock used by this wrapper.'''
        return self._lock

    def __repr__(self):
        cls_name = type(self).__name__
        return '<%s wrapper for %s>' % (cls_name, self._obj)
219
220
class Synchronized(SynchronizedBase):
    '''
    Wrapper returned by synchronized() for simple ctypes values; access to
    `value` goes through the lock.
    '''

    value = make_property('value')
223
224
class SynchronizedArray(SynchronizedBase):
    '''
    Wrapper returned by synchronized() for ctypes arrays.  Item and slice
    access hold the lock; len() does not.
    '''

    def __len__(self):
        return len(self._obj)

    def __getitem__(self, i):
        self.acquire()
        try:
            item = self._obj[i]
        finally:
            self.release()
        return item

    def __setitem__(self, i, value):
        self.acquire()
        try:
            self._obj[i] = value
        finally:
            self.release()

    def __getslice__(self, start, stop):
        self.acquire()
        try:
            chunk = self._obj[start:stop]
        finally:
            self.release()
        return chunk

    def __setslice__(self, start, stop, values):
        self.acquire()
        try:
            self._obj[start:stop] = values
        finally:
            self.release()
257
258
class SynchronizedString(SynchronizedArray):
    '''
    Wrapper returned by synchronized() for c_char arrays; adds locked
    `value` and `raw` properties on top of the array interface.
    '''

    value = make_property('value')
    raw = make_property('raw')