blob: b94cd52c9f12b7c4c4c4b9a3b153043017d7248c [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module which supports allocation of ctypes objects from shared memory
3#
4# multiprocessing/sharedctypes.py
5#
6# Copyright (c) 2007-2008, R Oudkerk --- see COPYING.txt
7#
8
9import sys
10import ctypes
11import weakref
Benjamin Petersone711caf2008-06-11 16:44:04 +000012
13from multiprocessing import heap, RLock
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +000014from multiprocessing.forking import assert_spawning, ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000015
16__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
17
18#
19#
20#
21
22typecode_to_type = {
23 'c': ctypes.c_char, 'u': ctypes.c_wchar,
24 'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
25 'h': ctypes.c_short, 'H': ctypes.c_ushort,
26 'i': ctypes.c_int, 'I': ctypes.c_uint,
27 'l': ctypes.c_long, 'L': ctypes.c_ulong,
28 'f': ctypes.c_float, 'd': ctypes.c_double
29 }
30
31#
32#
33#
34
35def _new_value(type_):
36 size = ctypes.sizeof(type_)
37 wrapper = heap.BufferWrapper(size)
38 return rebuild_ctype(type_, wrapper, None)
39
40def RawValue(typecode_or_type, *args):
41 '''
42 Returns a ctypes object allocated from shared memory
43 '''
44 type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
45 obj = _new_value(type_)
46 ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
47 obj.__init__(*args)
48 return obj
49
50def RawArray(typecode_or_type, size_or_initializer):
51 '''
52 Returns a ctypes array allocated from shared memory
53 '''
54 type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
55 if isinstance(size_or_initializer, int):
56 type_ = type_ * size_or_initializer
57 return _new_value(type_)
58 else:
59 type_ = type_ * len(size_or_initializer)
60 result = _new_value(type_)
61 result.__init__(*size_or_initializer)
62 return result
63
Benjamin Petersond5cd65b2008-06-27 22:16:47 +000064def Value(typecode_or_type, *args, lock=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +000065 '''
66 Return a synchronization wrapper for a Value
67 '''
Benjamin Petersone711caf2008-06-11 16:44:04 +000068 obj = RawValue(typecode_or_type, *args)
69 if lock is None:
70 lock = RLock()
71 assert hasattr(lock, 'acquire')
72 return synchronized(obj, lock)
73
74def Array(typecode_or_type, size_or_initializer, **kwds):
75 '''
76 Return a synchronization wrapper for a RawArray
77 '''
78 lock = kwds.pop('lock', None)
79 if kwds:
80 raise ValueError('unrecognized keyword argument(s): %s' % list(kwds.keys()))
81 obj = RawArray(typecode_or_type, size_or_initializer)
82 if lock is None:
83 lock = RLock()
84 assert hasattr(lock, 'acquire')
85 return synchronized(obj, lock)
86
87def copy(obj):
88 new_obj = _new_value(type(obj))
89 ctypes.pointer(new_obj)[0] = obj
90 return new_obj
91
92def synchronized(obj, lock=None):
93 assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
94
95 if isinstance(obj, ctypes._SimpleCData):
96 return Synchronized(obj, lock)
97 elif isinstance(obj, ctypes.Array):
98 if obj._type_ is ctypes.c_char:
99 return SynchronizedString(obj, lock)
100 return SynchronizedArray(obj, lock)
101 else:
102 cls = type(obj)
103 try:
104 scls = class_cache[cls]
105 except KeyError:
106 names = [field[0] for field in cls._fields_]
107 d = dict((name, make_property(name)) for name in names)
108 classname = 'Synchronized' + cls.__name__
109 scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
110 return scls(obj, lock)
111
112#
113# Functions for pickling/unpickling
114#
115
116def reduce_ctype(obj):
117 assert_spawning(obj)
118 if isinstance(obj, ctypes.Array):
119 return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
120 else:
121 return rebuild_ctype, (type(obj), obj._wrapper, None)
122
123def rebuild_ctype(type_, wrapper, length):
124 if length is not None:
125 type_ = type_ * length
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000126 ForkingPickler.register(type_, reduce_ctype)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000127 obj = type_.from_address(wrapper.get_address())
128 obj._wrapper = wrapper
129 return obj
130
131#
132# Function to create properties
133#
134
135def make_property(name):
136 try:
137 return prop_cache[name]
138 except KeyError:
139 d = {}
140 exec(template % ((name,)*7), d)
141 prop_cache[name] = d[name]
142 return d[name]
143
144template = '''
145def get%s(self):
146 self.acquire()
147 try:
148 return self._obj.%s
149 finally:
150 self.release()
151def set%s(self, value):
152 self.acquire()
153 try:
154 self._obj.%s = value
155 finally:
156 self.release()
157%s = property(get%s, set%s)
158'''
159
160prop_cache = {}
161class_cache = weakref.WeakKeyDictionary()
162
163#
164# Synchronized wrappers
165#
166
167class SynchronizedBase(object):
168
169 def __init__(self, obj, lock=None):
170 self._obj = obj
171 self._lock = lock or RLock()
172 self.acquire = self._lock.acquire
173 self.release = self._lock.release
174
175 def __reduce__(self):
176 assert_spawning(self)
177 return synchronized, (self._obj, self._lock)
178
179 def get_obj(self):
180 return self._obj
181
182 def get_lock(self):
183 return self._lock
184
185 def __repr__(self):
186 return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
187
188
189class Synchronized(SynchronizedBase):
190 value = make_property('value')
191
192
193class SynchronizedArray(SynchronizedBase):
194
195 def __len__(self):
196 return len(self._obj)
197
198 def __getitem__(self, i):
199 self.acquire()
200 try:
201 return self._obj[i]
202 finally:
203 self.release()
204
205 def __setitem__(self, i, value):
206 self.acquire()
207 try:
208 self._obj[i] = value
209 finally:
210 self.release()
211
212 def __getslice__(self, start, stop):
213 self.acquire()
214 try:
215 return self._obj[start:stop]
216 finally:
217 self.release()
218
219 def __setslice__(self, start, stop, values):
220 self.acquire()
221 try:
222 self._obj[start:stop] = values
223 finally:
224 self.release()
225
226
227class SynchronizedString(SynchronizedArray):
228 value = make_property('value')
229 raw = make_property('raw')