blob: 0c178252d5087ebd27f9e7fbd4d72c78535505ff [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Module which supports allocation of ctypes objects from shared memory
3#
4# multiprocessing/sharedctypes.py
5#
R. David Murrayd3820662010-12-14 01:41:07 +00006# Copyright (c) 2006-2008, R Oudkerk
Richard Oudkerk3e268aa2012-04-30 12:13:55 +01007# Licensed to PSF under a Contributor Agreement.
Benjamin Petersone711caf2008-06-11 16:44:04 +00008#
9
Benjamin Petersone711caf2008-06-11 16:44:04 +000010import ctypes
11import weakref
Benjamin Petersone711caf2008-06-11 16:44:04 +000012
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010013from . import heap
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010014from . import get_context
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010015
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010016from .context import assert_spawning
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010017from .reduction import ForkingPickler
Benjamin Petersone711caf2008-06-11 16:44:04 +000018
19__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
20
21#
22#
23#
24
25typecode_to_type = {
26 'c': ctypes.c_char, 'u': ctypes.c_wchar,
27 'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
28 'h': ctypes.c_short, 'H': ctypes.c_ushort,
29 'i': ctypes.c_int, 'I': ctypes.c_uint,
30 'l': ctypes.c_long, 'L': ctypes.c_ulong,
31 'f': ctypes.c_float, 'd': ctypes.c_double
32 }
33
34#
35#
36#
37
38def _new_value(type_):
39 size = ctypes.sizeof(type_)
40 wrapper = heap.BufferWrapper(size)
41 return rebuild_ctype(type_, wrapper, None)
42
43def RawValue(typecode_or_type, *args):
44 '''
45 Returns a ctypes object allocated from shared memory
46 '''
47 type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
48 obj = _new_value(type_)
49 ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
50 obj.__init__(*args)
51 return obj
52
53def RawArray(typecode_or_type, size_or_initializer):
54 '''
55 Returns a ctypes array allocated from shared memory
56 '''
57 type_ = typecode_to_type.get(typecode_or_type, typecode_or_type)
58 if isinstance(size_or_initializer, int):
59 type_ = type_ * size_or_initializer
Mark Dickinson89461ef2011-03-26 10:19:03 +000060 obj = _new_value(type_)
61 ctypes.memset(ctypes.addressof(obj), 0, ctypes.sizeof(obj))
62 return obj
Benjamin Petersone711caf2008-06-11 16:44:04 +000063 else:
64 type_ = type_ * len(size_or_initializer)
65 result = _new_value(type_)
66 result.__init__(*size_or_initializer)
67 return result
68
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010069def Value(typecode_or_type, *args, lock=True, ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +000070 '''
71 Return a synchronization wrapper for a Value
72 '''
Benjamin Petersone711caf2008-06-11 16:44:04 +000073 obj = RawValue(typecode_or_type, *args)
Jesse Nollerb0516a62009-01-18 03:11:38 +000074 if lock is False:
75 return obj
76 if lock in (True, None):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010077 ctx = ctx or get_context()
78 lock = ctx.RLock()
Jesse Nollerb0516a62009-01-18 03:11:38 +000079 if not hasattr(lock, 'acquire'):
80 raise AttributeError("'%r' has no method 'acquire'" % lock)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010081 return synchronized(obj, lock, ctx=ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +000082
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010083def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +000084 '''
85 Return a synchronization wrapper for a RawArray
86 '''
Benjamin Petersone711caf2008-06-11 16:44:04 +000087 obj = RawArray(typecode_or_type, size_or_initializer)
Jesse Nollerb0516a62009-01-18 03:11:38 +000088 if lock is False:
89 return obj
90 if lock in (True, None):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010091 ctx = ctx or get_context()
92 lock = ctx.RLock()
Jesse Nollerb0516a62009-01-18 03:11:38 +000093 if not hasattr(lock, 'acquire'):
94 raise AttributeError("'%r' has no method 'acquire'" % lock)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +010095 return synchronized(obj, lock, ctx=ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +000096
97def copy(obj):
98 new_obj = _new_value(type(obj))
99 ctypes.pointer(new_obj)[0] = obj
100 return new_obj
101
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100102def synchronized(obj, lock=None, ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000103 assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100104 ctx = ctx or get_context()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000105
106 if isinstance(obj, ctypes._SimpleCData):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100107 return Synchronized(obj, lock, ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108 elif isinstance(obj, ctypes.Array):
109 if obj._type_ is ctypes.c_char:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100110 return SynchronizedString(obj, lock, ctx)
111 return SynchronizedArray(obj, lock, ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000112 else:
113 cls = type(obj)
114 try:
115 scls = class_cache[cls]
116 except KeyError:
117 names = [field[0] for field in cls._fields_]
118 d = dict((name, make_property(name)) for name in names)
119 classname = 'Synchronized' + cls.__name__
120 scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100121 return scls(obj, lock, ctx)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000122
123#
124# Functions for pickling/unpickling
125#
126
127def reduce_ctype(obj):
128 assert_spawning(obj)
129 if isinstance(obj, ctypes.Array):
130 return rebuild_ctype, (obj._type_, obj._wrapper, obj._length_)
131 else:
132 return rebuild_ctype, (type(obj), obj._wrapper, None)
133
134def rebuild_ctype(type_, wrapper, length):
135 if length is not None:
136 type_ = type_ * length
Amaury Forgeot d'Arc949d47d2008-08-19 21:30:55 +0000137 ForkingPickler.register(type_, reduce_ctype)
Richard Oudkerk26cdf1f2012-05-26 22:09:59 +0100138 buf = wrapper.create_memoryview()
139 obj = type_.from_buffer(buf)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000140 obj._wrapper = wrapper
141 return obj
142
143#
144# Function to create properties
145#
146
147def make_property(name):
148 try:
149 return prop_cache[name]
150 except KeyError:
151 d = {}
152 exec(template % ((name,)*7), d)
153 prop_cache[name] = d[name]
154 return d[name]
155
156template = '''
157def get%s(self):
158 self.acquire()
159 try:
160 return self._obj.%s
161 finally:
162 self.release()
163def set%s(self, value):
164 self.acquire()
165 try:
166 self._obj.%s = value
167 finally:
168 self.release()
169%s = property(get%s, set%s)
170'''
171
172prop_cache = {}
173class_cache = weakref.WeakKeyDictionary()
174
175#
176# Synchronized wrappers
177#
178
179class SynchronizedBase(object):
180
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100181 def __init__(self, obj, lock=None, ctx=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000182 self._obj = obj
Richard Oudkerkb1694cf2013-10-16 16:41:56 +0100183 if lock:
184 self._lock = lock
185 else:
186 ctx = ctx or get_context(force=True)
187 self._lock = ctx.RLock()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000188 self.acquire = self._lock.acquire
189 self.release = self._lock.release
190
191 def __reduce__(self):
192 assert_spawning(self)
193 return synchronized, (self._obj, self._lock)
194
195 def get_obj(self):
196 return self._obj
197
198 def get_lock(self):
199 return self._lock
200
201 def __repr__(self):
202 return '<%s wrapper for %s>' % (type(self).__name__, self._obj)
203
204
205class Synchronized(SynchronizedBase):
206 value = make_property('value')
207
208
209class SynchronizedArray(SynchronizedBase):
210
211 def __len__(self):
212 return len(self._obj)
213
214 def __getitem__(self, i):
215 self.acquire()
216 try:
217 return self._obj[i]
218 finally:
219 self.release()
220
221 def __setitem__(self, i, value):
222 self.acquire()
223 try:
224 self._obj[i] = value
225 finally:
226 self.release()
227
228 def __getslice__(self, start, stop):
229 self.acquire()
230 try:
231 return self._obj[start:stop]
232 finally:
233 self.release()
234
235 def __setslice__(self, start, stop, values):
236 self.acquire()
237 try:
238 self._obj[start:stop] = values
239 finally:
240 self.release()
241
242
243class SynchronizedString(SynchronizedArray):
244 value = make_property('value')
245 raw = make_property('raw')