blob: 58263795302c4f587c8f64a32fe98b92e0852236 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module which supports allocation of ctypes objects from shared memory
3#
4# multiprocessing/sharedctypes.py
5#
R. David Murrayd3820662010-12-14 01:41:07 +00006# Copyright (c) 2006-2008, R Oudkerk
7# All rights reserved.
8#
9# Redistribution and use in source and binary forms, with or without
10# modification, are permitted provided that the following conditions
11# are met:
12#
13# 1. Redistributions of source code must retain the above copyright
14# notice, this list of conditions and the following disclaimer.
15# 2. Redistributions in binary form must reproduce the above copyright
16# notice, this list of conditions and the following disclaimer in the
17# documentation and/or other materials provided with the distribution.
18# 3. Neither the name of author nor the names of any contributors may be
19# used to endorse or promote products derived from this software
20# without specific prior written permission.
21#
22# THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
23# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
24# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
25# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
26# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
27# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
28# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
29# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
30# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
31# OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32# SUCH DAMAGE.
Benjamin Petersone711caf2008-06-11 16:44:04 +000033#
34
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import ctypes
36import weakref
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
38from multiprocessing import heap, RLock
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000039from multiprocessing.forking import assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000040
# Names exported by a star-import of this module.
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
42
43#
44#
45#
46
# Maps single-character type codes (of the kind used by the array module)
# to the ctypes type they stand for.  RawValue() and RawArray() look their
# first argument up here and fall back to using it directly as a ctypes
# type when it is not one of these keys.
typecode_to_type = {
    'c': ctypes.c_char,     'u': ctypes.c_wchar,
    'b': ctypes.c_byte,     'B': ctypes.c_ubyte,
    'h': ctypes.c_short,    'H': ctypes.c_ushort,
    'i': ctypes.c_int,      'I': ctypes.c_uint,
    'l': ctypes.c_long,     'L': ctypes.c_ulong,
    # 64-bit integer codes; without these, callers had to pass the ctypes
    # type explicitly to get a long long.
    'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong,
    'f': ctypes.c_float,    'd': ctypes.c_double,
    }
55
56#
57#
58#
59
def _new_value(type_):
    '''
    Returns an instance of the ctypes type `type_` built on a fresh
    heap.BufferWrapper of ctypes.sizeof(type_) bytes
    '''
    buffer_wrapper = heap.BufferWrapper(ctypes.sizeof(type_))
    # length=None: type_ is already the complete type, no array multiply.
    return rebuild_ctype(type_, buffer_wrapper, None)
64
def RawValue(typecode_or_type, *args):
    '''
    Returns a ctypes object allocated from shared memory

    `typecode_or_type` is either a key of typecode_to_type or is used as
    the ctypes type itself.  The object's memory is zeroed and then its
    __init__ is called with `args`.
    '''
    ctype = typecode_to_type.get(typecode_or_type, typecode_or_type)
    value = _new_value(ctype)
    # Clear the whole object before running the ctypes initializer.
    ctypes.memset(ctypes.addressof(value), 0, ctypes.sizeof(value))
    value.__init__(*args)
    return value
74
def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory

    If `size_or_initializer` is an int it is the array length and the
    array is zero-filled.  Otherwise it must support len() and unpacking;
    its items initialize the array and its length is the array length.
    '''
    element_type = typecode_to_type.get(typecode_or_type, typecode_or_type)

    if not isinstance(size_or_initializer, int):
        # Initializer given: size the array from it and fill from its items.
        array_type = element_type * len(size_or_initializer)
        array = _new_value(array_type)
        array.__init__(*size_or_initializer)
        return array

    # Plain length given: allocate and zero the whole array.
    array = _new_value(element_type * size_or_initializer)
    ctypes.memset(ctypes.addressof(array), 0, ctypes.sizeof(array))
    return array
90
def Value(typecode_or_type, *args, lock=None):
    '''
    Return a synchronization wrapper for a Value

    The underlying object is created with RawValue(typecode_or_type, *args).

    `lock` selects the synchronization:
      * False      -- return the raw object without any wrapper
      * True, None -- wrap it using a newly created RLock
      * otherwise  -- wrap it using `lock`, which must have an `acquire`
                      attribute

    Raises AttributeError if `lock` is none of the above and has no
    `acquire` attribute.
    '''
    obj = RawValue(typecode_or_type, *args)
    if lock is False:
        return obj
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # Format through a 1-tuple: a bare `% lock` would unpack a tuple
        # argument and fail with TypeError (or show the wrong object)
        # instead of raising the intended AttributeError.
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
103
def Array(typecode_or_type, size_or_initializer, **kwds):
    '''
    Return a synchronization wrapper for a RawArray

    The underlying array is created with
    RawArray(typecode_or_type, size_or_initializer).

    The only accepted keyword argument is `lock`:
      * False      -- return the raw array without any wrapper
      * True, None -- wrap it using a newly created RLock
      * otherwise  -- wrap it using `lock`, which must have an `acquire`
                      attribute

    Raises ValueError for any other keyword argument, and AttributeError
    if `lock` is none of the above and has no `acquire` attribute.
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % list(kwds))
    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is False:
        return obj
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # Format through a 1-tuple: a bare `% lock` would unpack a tuple
        # argument and fail with TypeError (or show the wrong object)
        # instead of raising the intended AttributeError.
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
119
def copy(obj):
    '''
    Returns a new object of type(obj), created with _new_value, that
    holds the contents of the ctypes object `obj`
    '''
    duplicate = _new_value(type(obj))
    # Storing through element 0 of a pointer to the new object writes
    # obj's contents into it.
    target = ctypes.pointer(duplicate)
    target[0] = obj
    return duplicate
124
def synchronized(obj, lock=None):
    '''
    Returns a synchronized wrapper around the ctypes object `obj`

    Simple ctypes values get a Synchronized wrapper, arrays of c_char a
    SynchronizedString, other arrays a SynchronizedArray.  Any other
    object gets a SynchronizedBase subclass generated from its type's
    `_fields_`, cached per type in class_cache.  `lock` is passed
    through to the wrapper's constructor.
    '''
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)

    if isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            return SynchronizedString(obj, lock)
        return SynchronizedArray(obj, lock)

    # Remaining case: build (or reuse) a wrapper class exposing one
    # locked property per declared field.
    struct_type = type(obj)
    try:
        wrapper_cls = class_cache[struct_type]
    except KeyError:
        field_names = [field[0] for field in struct_type._fields_]
        namespace = {name: make_property(name) for name in field_names}
        wrapper_name = 'Synchronized' + struct_type.__name__
        wrapper_cls = type(wrapper_name, (SynchronizedBase,), namespace)
        class_cache[struct_type] = wrapper_cls
    return wrapper_cls(obj, lock)
144
145#
146# Functions for pickling/unpickling
147#
148
def reduce_ctype(obj):
    '''
    Pickle reducer for ctypes objects created by rebuild_ctype

    Returns (rebuild_ctype, args).  For arrays the element type and
    length are passed separately; for anything else the object's type is
    passed with a length of None.  assert_spawning(obj) is called first
    (presumably to reject pickling outside of process spawning -- see
    its definition to confirm).
    '''
    assert_spawning(obj)
    if not isinstance(obj, ctypes.Array):
        return rebuild_ctype, (type(obj), obj._wrapper, None)
    return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
155
def rebuild_ctype(type_, wrapper, length):
    '''
    Returns a ctypes object of `type_` located at wrapper.get_address()

    If `length` is not None the object is an array of `length` items of
    `type_`.  The resulting type is registered with ForkingPickler so
    that reduce_ctype is used to pickle it.
    '''
    ctype = type_ if length is None else type_ * length
    ForkingPickler.register(ctype, reduce_ctype)
    instance = ctype.from_address(wrapper.get_address())
    # Keep the wrapper on the object: reduce_ctype reads it back, and the
    # reference presumably keeps the backing buffer alive -- confirm in heap.
    instance._wrapper = wrapper
    return instance
163
164#
165# Function to create properties
166#
167
def make_property(name):
    '''
    Returns a property that proxies attribute `name` of `self._obj`

    Both the getter and the setter call self.acquire() first and always
    call self.release() afterwards, even if the access raises.  The
    property for a given name is created once and then served from
    prop_cache.

    The accessors are built as closures using getattr/setattr rather
    than by exec-ing source text, so names that are Python keywords
    (for example a field called 'class') work, and no text taken from
    `name` is ever executed as code.
    '''
    try:
        return prop_cache[name]
    except KeyError:
        pass

    def getter(self):
        self.acquire()
        try:
            return getattr(self._obj, name)
        finally:
            self.release()

    def setter(self, value):
        self.acquire()
        try:
            setattr(self._obj, name, value)
        finally:
            self.release()

    # Keep the accessor names the generated code used to have.
    getter.__name__ = 'get' + name
    setter.__name__ = 'set' + name

    prop = prop_cache[name] = property(getter, setter)
    return prop

# Source template formerly exec-ed by make_property().  It is no longer
# used there and is kept only so that code referring to this module-level
# name keeps working.
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''

# attribute name -> property returned by make_property()
prop_cache = {}
# wrapped type -> generated Synchronized* class (see synchronized());
# weak keys, so an entry does not keep its type alive.
class_cache = weakref.WeakKeyDictionary()
195
196#
197# Synchronized wrappers
198#
199
class SynchronizedBase(object):
    '''
    Base class for wrappers pairing a ctypes object with a lock

    The lock's acquire and release methods are exposed directly as
    instance attributes of the wrapper.
    '''

    def __init__(self, obj, lock=None):
        # Any false value for `lock` means "create a new RLock".
        if not lock:
            lock = RLock()
        self._obj = obj
        self._lock = lock
        self.acquire = lock.acquire
        self.release = lock.release

    def __reduce__(self):
        # Rebuilt on unpickling by calling synchronized(obj, lock).
        assert_spawning(self)
        rebuild_args = (self._obj, self._lock)
        return synchronized, rebuild_args

    def get_obj(self):
        '''Returns the wrapped object'''
        return self._obj

    def get_lock(self):
        '''Returns the lock used by this wrapper'''
        return self._lock

    def __repr__(self):
        cls_name = type(self).__name__
        return '<%s wrapper for %s>' % (cls_name, self._obj)
220
221
class Synchronized(SynchronizedBase):
    '''
    Wrapper returned by synchronized() for simple ctypes values
    '''
    # Reads and writes self._obj.value between acquire() and release().
    value = make_property('value')
224
225
class SynchronizedArray(SynchronizedBase):
    '''
    Wrapper giving locked item and slice access to a ctypes array

    Every read or write of the wrapped array happens between acquire()
    and release(); len() is taken without the lock.
    '''

    def __len__(self):
        return len(self._obj)

    def __getitem__(self, i):
        self.acquire()
        try:
            item = self._obj[i]
        finally:
            self.release()
        return item

    def __setitem__(self, i, value):
        self.acquire()
        try:
            self._obj[i] = value
        finally:
            self.release()

    def __getslice__(self, start, stop):
        # Equivalent to self._obj[start:stop], done under the lock.
        window = slice(start, stop)
        self.acquire()
        try:
            items = self._obj[window]
        finally:
            self.release()
        return items

    def __setslice__(self, start, stop, values):
        # Equivalent to self._obj[start:stop] = values, done under the lock.
        window = slice(start, stop)
        self.acquire()
        try:
            self._obj[window] = values
        finally:
            self.release()
258
259
class SynchronizedString(SynchronizedArray):
    '''
    Wrapper returned by synchronized() for arrays whose element type is
    ctypes.c_char
    '''
    # Each property reads and writes the same-named attribute of
    # self._obj between acquire() and release().
    value = make_property('value')
    raw = make_property('raw')