#
# Module which supports allocation of ctypes objects from shared memory
#
# multiprocessing/sharedctypes.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#
9
Benjamin Petersone711caf2008-06-11 16:44:04 +000010import ctypes
11import weakref
Benjamin Petersone711caf2008-06-11 16:44:04 +000012
13from multiprocessing import heap, RLock
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000014from multiprocessing.forking import assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000015
16__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
17
18#
19#
20#
21
# Maps the one-character typecodes accepted by RawValue()/RawArray() to the
# ctypes type they stand for.  Anything not found here is used as-is by
# those functions (they look it up with .get(code, code)).
typecode_to_type = {
    'c': ctypes.c_char,
    'u': ctypes.c_wchar,
    'b': ctypes.c_byte,
    'B': ctypes.c_ubyte,
    'h': ctypes.c_short,
    'H': ctypes.c_ushort,
    'i': ctypes.c_int,
    'I': ctypes.c_uint,
    'l': ctypes.c_long,
    'L': ctypes.c_ulong,
    'f': ctypes.c_float,
    'd': ctypes.c_double,
}
30
31#
32#
33#
34
def _new_value(type_):
    '''
    Create an instance of the ctypes type `type_` on top of a freshly
    allocated heap.BufferWrapper of exactly ctypes.sizeof(type_) bytes.

    The memory is not initialised here; callers zero or fill it themselves.
    '''
    wrapper = heap.BufferWrapper(ctypes.sizeof(type_))
    # length=None: `type_` is already the complete type to instantiate.
    return rebuild_ctype(type_, wrapper, None)
39
def RawValue(typecode_or_type, *args):
    '''
    Returns a ctypes object allocated from shared memory

    `typecode_or_type` is either a key of typecode_to_type or a ctypes type;
    `args` are forwarded to the object's __init__ after its memory has been
    zeroed.
    '''
    ctype = typecode_to_type.get(typecode_or_type, typecode_or_type)
    instance = _new_value(ctype)

    # Clear the backing memory before the ctypes initialiser runs.
    address = ctypes.addressof(instance)
    nbytes = ctypes.sizeof(instance)
    ctypes.memset(address, 0, nbytes)

    instance.__init__(*args)
    return instance
49
def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory

    If `size_or_initializer` is an int it is the array length and the array
    is zero-filled; otherwise it is treated as a sized iterable whose items
    initialise the array and whose len() gives the array length.
    '''
    element_type = typecode_to_type.get(typecode_or_type, typecode_or_type)

    if not isinstance(size_or_initializer, int):
        # Initialiser given: size the array from it and let ctypes copy
        # the items in.
        array_type = element_type * len(size_or_initializer)
        array = _new_value(array_type)
        array.__init__(*size_or_initializer)
        return array

    # Plain length given: allocate and zero the whole array.
    array_type = element_type * size_or_initializer
    array = _new_value(array_type)
    address = ctypes.addressof(array)
    nbytes = ctypes.sizeof(array)
    ctypes.memset(address, 0, nbytes)
    return array
65
def Value(typecode_or_type, *args, lock=None):
    '''
    Return a synchronization wrapper for a Value

    `typecode_or_type` and `args` are passed straight to RawValue().

    `lock` selects the synchronisation:
      * False        -- return the raw, unsynchronised object;
      * True or None -- wrap it using a newly created RLock;
      * anything else must have an `acquire` attribute and is used as the
        lock; otherwise AttributeError is raised.
    '''
    obj = RawValue(typecode_or_type, *args)
    if lock is False:
        return obj
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap `lock` in a 1-tuple: a bare tuple on the right of % would be
        # unpacked as the format arguments and raise TypeError instead of
        # the intended AttributeError.
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
78
def Array(typecode_or_type, size_or_initializer, **kwds):
    '''
    Return a synchronization wrapper for a RawArray

    `typecode_or_type` and `size_or_initializer` are passed straight to
    RawArray().  The only keyword accepted is `lock`; any other keyword
    raises ValueError.

    `lock` selects the synchronisation:
      * False        -- return the raw, unsynchronised array;
      * True or None -- wrap it using a newly created RLock;
      * anything else must have an `acquire` attribute and is used as the
        lock; otherwise AttributeError is raised.
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % list(kwds))
    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is False:
        return obj
    if lock in (True, None):
        lock = RLock()
    if not hasattr(lock, 'acquire'):
        # Wrap `lock` in a 1-tuple: a bare tuple on the right of % would be
        # unpacked as the format arguments and raise TypeError instead of
        # the intended AttributeError.
        raise AttributeError("'%r' has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
94
def copy(obj):
    '''
    Return a new object of the same ctypes type as `obj`, allocated via
    _new_value(), holding a copy of `obj`'s contents.
    '''
    duplicate = _new_value(type(obj))
    # Assigning through a pointer copies the value of `obj` into the memory
    # behind `duplicate`.
    target = ctypes.pointer(duplicate)
    target[0] = obj
    return duplicate
99
def synchronized(obj, lock=None):
    '''
    Wrap the ctypes object `obj` in the matching Synchronized* class.

    Simple ctypes values get Synchronized, arrays of c_char get
    SynchronizedString, other arrays get SynchronizedArray.  For any other
    object a SynchronizedBase subclass with one locked property per entry
    of the type's `_fields_` is generated and cached in class_cache.
    '''
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)

    if isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            wrapper_cls = SynchronizedString
        else:
            wrapper_cls = SynchronizedArray
        return wrapper_cls(obj, lock)

    cls = type(obj)
    try:
        scls = class_cache[cls]
    except KeyError:
        # First time this type is seen: build a wrapper class exposing each
        # declared field through a lock-protected property.
        field_names = [entry[0] for entry in cls._fields_]
        namespace = {fname: make_property(fname) for fname in field_names}
        scls = type('Synchronized' + cls.__name__, (SynchronizedBase,), namespace)
        class_cache[cls] = scls
    return scls(obj, lock)
119
#
# Functions for pickling/unpickling
#
123
def reduce_ctype(obj):
    '''
    Reduction function for shared ctypes objects: return the
    (rebuild_ctype, args) pair describing how to recreate `obj`.

    assert_spawning() is called first, so the check it performs runs before
    anything is returned.
    '''
    assert_spawning(obj)
    if not isinstance(obj, ctypes.Array):
        return rebuild_ctype, (type(obj), obj._wrapper, None)
    # Arrays are described by element type plus length; rebuild_ctype
    # multiplies them back together.
    return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
130
def rebuild_ctype(type_, wrapper, length):
    '''
    Build a ctypes object located at wrapper.get_address().

    When `length` is not None the object is an array of `length` items of
    `type_`; otherwise it is a single `type_`.  The resulting type is
    registered with ForkingPickler so that reduce_ctype handles it.
    '''
    ctype = type_ if length is None else type_ * length
    ForkingPickler.register(ctype, reduce_ctype)
    address = wrapper.get_address()
    instance = ctype.from_address(address)
    # reduce_ctype reads this attribute back when pickling the object.
    instance._wrapper = wrapper
    return instance
138
#
# Function to create properties
#
142
def make_property(name):
    '''
    Return a property that reads and writes `self._obj.<name>` while holding
    the wrapper's lock.

    The property is generated by exec'ing `template` once per name and then
    served from prop_cache on later calls.
    '''
    if name not in prop_cache:
        namespace = {}
        # The template has seven %s slots, all filled with the same name.
        exec(template % ((name,) * 7), namespace)
        prop_cache[name] = namespace[name]
    return prop_cache[name]
151
# Source text exec'd by make_property().  All seven %s slots receive the same
# attribute name, producing a getter, a setter and a property of that name
# which access self._obj.<name> between self.acquire() and self.release().
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''

# Properties already generated by make_property(), keyed by attribute name.
prop_cache = {}
# Wrapper classes generated by synchronized(), keyed by the wrapped type.
# Keys are held weakly (WeakKeyDictionary).
class_cache = weakref.WeakKeyDictionary()
170
#
# Synchronized wrappers
#
174
class SynchronizedBase(object):
    '''
    Base class of the wrappers that pair a ctypes object with a lock.

    The lock's acquire/release methods are exposed as `acquire` and
    `release` on the wrapper, and the wrapper can be used in a `with`
    statement, which delegates to the lock's own context manager methods.
    '''

    def __init__(self, obj, lock=None):
        '''
        Wrap `obj`; use `lock` if it is truthy, otherwise a new RLock().
        '''
        self._obj = obj
        self._lock = lock or RLock()
        # Bind the lock's methods directly on the instance so the generated
        # properties and subclasses can call self.acquire()/self.release().
        self.acquire = self._lock.acquire
        self.release = self._lock.release

    def __enter__(self):
        '''Enter the underlying lock's context (acquires the lock).'''
        return self._lock.__enter__()

    def __exit__(self, *args):
        '''Leave the underlying lock's context (releases the lock).'''
        return self._lock.__exit__(*args)

    def __reduce__(self):
        '''
        Pickle as a call to synchronized(obj, lock); assert_spawning() is
        invoked first.
        '''
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    def get_obj(self):
        '''Return the wrapped object.'''
        return self._obj

    def get_lock(self):
        '''Return the lock used by this wrapper.'''
        return self._lock

    def __repr__(self):
        return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
195
196
class Synchronized(SynchronizedBase):
    '''
    Wrapper used by synchronized() for simple ctypes values.
    '''
    # Lock-protected access to the wrapped object's `value` attribute.
    value = make_property('value')
199
200
class SynchronizedArray(SynchronizedBase):
    '''
    Wrapper used by synchronized() for ctypes arrays.

    Item and slice access go to the wrapped array while the lock is held;
    len() is forwarded without taking the lock.
    '''

    def __len__(self):
        return len(self._obj)

    def __getitem__(self, i):
        '''Return self._obj[i], read while holding the lock.'''
        self.acquire()
        try:
            item = self._obj[i]
        finally:
            self.release()
        return item

    def __setitem__(self, i, value):
        '''Assign self._obj[i] = value while holding the lock.'''
        self.acquire()
        try:
            self._obj[i] = value
        finally:
            self.release()

    # Python 3 routes slice syntax through __getitem__/__setitem__, so the
    # two methods below only run when called explicitly.

    def __getslice__(self, start, stop):
        '''Return self._obj[start:stop], read while holding the lock.'''
        self.acquire()
        try:
            chunk = self._obj[start:stop]
        finally:
            self.release()
        return chunk

    def __setslice__(self, start, stop, values):
        '''Assign self._obj[start:stop] = values while holding the lock.'''
        self.acquire()
        try:
            self._obj[start:stop] = values
        finally:
            self.release()
233
234
class SynchronizedString(SynchronizedArray):
    '''
    Wrapper used by synchronized() for arrays whose element type is c_char.
    '''
    # Lock-protected access to the wrapped array's `value` and `raw`
    # attributes.
    value = make_property('value')
    raw = make_property('raw')
237 raw = make_property('raw')