blob: 45826dd069e9e8dd459e7ba75902a8e4f7c524ad [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module which supports allocation of ctypes objects from shared memory
3#
4# multiprocessing/sharedctypes.py
5#
6# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
7#
8
9import sys
10import ctypes
11import weakref
12import copyreg
13
14from multiprocessing import heap, RLock
15from multiprocessing.forking import assert_spawning
16
17__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
18
19#
20#
21#
22
# One-character type codes accepted by RawValue/RawArray in place of a ctypes
# type.  Anything not found in this table is used by those functions as-is
# (i.e. it is expected to already be a ctypes type).
# 'q'/'Q' cover the (unsigned) long long types, which otherwise could not be
# requested by type code.
typecode_to_type = {
    'c': ctypes.c_char, 'u': ctypes.c_wchar,
    'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
    'h': ctypes.c_short, 'H': ctypes.c_ushort,
    'i': ctypes.c_int, 'I': ctypes.c_uint,
    'l': ctypes.c_long, 'L': ctypes.c_ulong,
    'q': ctypes.c_longlong, 'Q': ctypes.c_ulonglong,
    'f': ctypes.c_float, 'd': ctypes.c_double,
}
31
32#
33#
34#
35
def _new_value(type_):
    '''
    Allocate a buffer big enough for type_ and return a type_ instance on it

    The buffer is obtained from heap.BufferWrapper and handed to
    rebuild_ctype, which creates the ctypes object at the buffer's address.
    The memory is not cleared here; callers that need zeroed storage must
    do that themselves.
    '''
    nbytes = ctypes.sizeof(type_)
    return rebuild_ctype(type_, heap.BufferWrapper(nbytes), None)
40
def RawValue(typecode_or_type, *args):
    '''
    Create a ctypes object in shared memory and initialise it

    typecode_or_type is either a key of typecode_to_type or a ctypes type.
    Any extra positional arguments are forwarded to the object's __init__.
    '''
    ctype = typecode_to_type.get(typecode_or_type, typecode_or_type)
    value = _new_value(ctype)
    # Zero the storage before running __init__, so fields that the
    # constructor arguments do not set start out as 0.
    start = ctypes.addressof(value)
    ctypes.memset(start, 0, ctypes.sizeof(value))
    value.__init__(*args)
    return value
50
def RawArray(typecode_or_type, size_or_initializer):
    '''
    Returns a ctypes array allocated from shared memory

    typecode_or_type is either a key of typecode_to_type or a ctypes type
    and gives the element type.  If size_or_initializer is an int it is the
    length of the array and every element starts out as zero; otherwise it
    must be a sequence, whose length fixes the array length and whose items
    initialise the elements.
    '''
    type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
    if isinstance(size_or_initializer, int):
        obj = _new_value(type_ * size_or_initializer)
        # _new_value does not clear the buffer, so zero it explicitly
        # (as RawValue does) instead of exposing whatever bytes were there.
        ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
        return obj
    obj = _new_value(type_ * len(size_or_initializer))
    # Every element is assigned from the initializer, so no memset is needed.
    obj.__init__(*size_or_initializer)
    return obj
64
def Value(typecode_or_type, *args, lock=None):
    '''
    Return a synchronization wrapper for a RawValue

    typecode_or_type and *args are passed on to RawValue.  lock is the lock
    object used to guard access; when it is None a new RLock is created.

    Raises AttributeError if the supplied lock has no 'acquire' attribute.
    '''
    obj = RawValue(typecode_or_type, *args)
    if lock is None:
        lock = RLock()
    # Validate with an explicit raise rather than ``assert`` so the check is
    # not stripped when Python runs with -O.
    if not hasattr(lock, 'acquire'):
        raise AttributeError("%r has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
74
def Array(typecode_or_type, size_or_initializer, **kwds):
    '''
    Return a synchronization wrapper for a RawArray

    typecode_or_type and size_or_initializer are passed on to RawArray.
    The only keyword argument accepted is lock: the lock object used to
    guard access; when it is missing or None a new RLock is created.

    Raises ValueError for any other keyword argument, and AttributeError if
    the supplied lock has no 'acquire' attribute.
    '''
    lock = kwds.pop('lock', None)
    if kwds:
        raise ValueError('unrecognized keyword argument(s): %s' % list(kwds))
    obj = RawArray(typecode_or_type, size_or_initializer)
    if lock is None:
        lock = RLock()
    # Validate with an explicit raise rather than ``assert`` so the check is
    # not stripped when Python runs with -O.
    if not hasattr(lock, 'acquire'):
        raise AttributeError("%r has no method 'acquire'" % (lock,))
    return synchronized(obj, lock)
87
def copy(obj):
    '''
    Return a newly allocated ctypes object of the same type holding a copy of obj
    '''
    duplicate = _new_value(type(obj))
    # Storing through element 0 of a pointer to the new object copies obj's
    # contents into the new object's storage.
    target = ctypes.pointer(duplicate)
    target[0] = obj
    return duplicate
92
def synchronized(obj, lock=None):
    '''
    Wrap the ctypes object obj in a wrapper that uses lock to guard access

    Simple ctypes values get a Synchronized wrapper, arrays of c_char a
    SynchronizedString, other arrays a SynchronizedArray.  For any other
    object a wrapper class is generated from the object's _fields_, with one
    locking property per field name, and cached per type in class_cache.
    '''
    assert not isinstance(obj, SynchronizedBase), 'object already synchronized'

    if isinstance(obj, ctypes._SimpleCData):
        return Synchronized(obj, lock)

    if isinstance(obj, ctypes.Array):
        if obj._type_ is ctypes.c_char:
            wrapper_cls = SynchronizedString
        else:
            wrapper_cls = SynchronizedArray
        return wrapper_cls(obj, lock)

    struct_cls = type(obj)
    wrapper_cls = class_cache.get(struct_cls)
    if wrapper_cls is None:
        # First time this type is seen: build a wrapper class for it.
        names = [field[0] for field in struct_cls._fields_]
        attrs = {name: make_property(name) for name in names}
        wrapper_name = 'Synchronized' + struct_cls.__name__
        wrapper_cls = type(wrapper_name, (SynchronizedBase,), attrs)
        class_cache[struct_cls] = wrapper_cls
    return wrapper_cls(obj, lock)
112
113#
114# Functions for pickling/unpickling
115#
116
def reduce_ctype(obj):
    '''
    Pickle reducer for ctypes objects created by rebuild_ctype

    Returns rebuild_ctype together with the arguments needed to recreate obj
    from its _wrapper: (element type, wrapper, length) for arrays and
    (type, wrapper, None) for everything else.
    '''
    assert_spawning(obj)
    if not isinstance(obj, ctypes.Array):
        return rebuild_ctype, (type(obj), obj._wrapper, None)
    return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
123
def rebuild_ctype(type_, wrapper, length):
    '''
    Create a ctypes object located at the address held by wrapper

    If length is not None the object is an array of length items of type_,
    otherwise it is a single type_.  On win32 the resulting type is
    registered with copyreg (using reduce_ctype) unless it already has an
    entry.  The wrapper is stored on the object as _wrapper, which is what
    reduce_ctype reads back when pickling.
    '''
    ctype = type_ if length is None else type_ * length
    needs_reducer = (sys.platform == 'win32'
                     and ctype not in copyreg.dispatch_table)
    if needs_reducer:
        copyreg.pickle(ctype, reduce_ctype)
    instance = ctype.from_address(wrapper.get_address())
    instance._wrapper = wrapper
    return instance
132
133#
134# Function to create properties
135#
136
def make_property(name):
    '''
    Return a property that reads/writes self._obj.<name> while holding the lock

    The property is generated by filling name into the module-level template
    and exec-ing the result; it is stored in prop_cache so that each name is
    only generated once.
    '''
    if name in prop_cache:
        return prop_cache[name]
    namespace = {}
    # The template contains seven '%s' slots, all of which take the name.
    source = template % ((name,) * 7)
    exec(source, namespace)
    prop = prop_cache[name] = namespace[name]
    return prop
145
# Source text used by make_property.  All seven '%s' slots are filled with
# the same attribute name, producing a getter and a setter that access
# self._obj.<name> between self.acquire() and self.release(), plus a
# property named <name> that combines the two.
template = '''
def get%s(self):
    self.acquire()
    try:
        return self._obj.%s
    finally:
        self.release()
def set%s(self, value):
    self.acquire()
    try:
        self._obj.%s = value
    finally:
        self.release()
%s = property(get%s, set%s)
'''
161
# prop_cache: properties built by make_property, keyed by attribute name.
# class_cache: wrapper classes generated by synchronized(), keyed by the
# wrapped object's type; the keys are held weakly.
prop_cache = dict()
class_cache = weakref.WeakKeyDictionary()
164
165#
166# Synchronized wrappers
167#
168
class SynchronizedBase(object):
    '''
    Base class of the wrappers that pair a ctypes object with a lock

    The lock's acquire and release methods are exposed directly on the
    wrapper instance as acquire and release.
    '''

    def __init__(self, obj, lock=None):
        # A missing (or falsy) lock is replaced by a fresh RLock.
        chosen_lock = lock if lock else RLock()
        self._obj = obj
        self._lock = chosen_lock
        self.acquire = chosen_lock.acquire
        self.release = chosen_lock.release

    def __reduce__(self):
        # Pickles as a call to synchronized() with the same object and lock.
        assert_spawning(self)
        return synchronized, (self._obj, self._lock)

    def get_obj(self):
        '''Return the wrapped object'''
        return self._obj

    def get_lock(self):
        '''Return the lock used by this wrapper'''
        return self._lock

    def __repr__(self):
        cls_name = type(self).__name__
        return '<%s wrapper for %s>' % (cls_name, self._obj)
189
190
class Synchronized(SynchronizedBase):
    '''
    Wrapper exposing the wrapped object's value attribute under the lock
    '''
    value = make_property('value')
193
194
class SynchronizedArray(SynchronizedBase):
    '''
    Wrapper giving sequence-style access to a wrapped array

    Item and slice reads/writes are performed on the wrapped object between
    acquire() and release(); len() is answered without taking the lock.
    '''

    def __len__(self):
        return len(self._obj)

    def __getitem__(self, i):
        self.acquire()
        try:
            item = self._obj[i]
        finally:
            self.release()
        return item

    def __setitem__(self, i, value):
        self.acquire()
        try:
            self._obj[i] = value
        finally:
            self.release()

    def __getslice__(self, start, stop):
        self.acquire()
        try:
            chunk = self._obj[start:stop]
        finally:
            self.release()
        return chunk

    def __setslice__(self, start, stop, values):
        self.acquire()
        try:
            self._obj[start:stop] = values
        finally:
            self.release()
227
228
class SynchronizedString(SynchronizedArray):
    '''
    Array wrapper that additionally exposes value and raw under the lock

    synchronized() selects this class for arrays whose element type is
    ctypes.c_char.
    '''
    value = make_property('value')
    raw = make_property('raw')