blob: 901a05bfea8ef5288716fb7d6f56ec0c95d6aa78 [file] [log] [blame]
"""Implements (a subset of) Sun XDR -- eXternal Data Representation.

See: RFC 1014

This module will conditionally use the _xdrmodule.so module to get
support for those representations we can't do much with from Python.

"""
9
10import struct
11from types import LongType
12
# workaround Python 1.4b2 bug
# NOTE: this overwrites the first entry of the module search path for
# the whole process as a side effect of importing this module.
import sys
sys.path[0] = '.'

# use C layer XDR libraries for some data types if available
# (_xdr stays None when the extension module cannot be imported; the
# classes below test it to decide whether float/double are supported)
try:
    import _xdr
except ImportError:
    _xdr = None

# This test checks whether the machine representation of a native long
# is the same as the network representation, i.e. whether packing 1 with
# format 'l' yields exactly four bytes, most significant byte first.
# If so, module struct can be used for packing some data types.
__USE_MACHINE_REP = (struct.pack('l', 1) == '\0\0\0\1')
27
28# exceptions
class Error:
    """Base exception for this module.

    Typical use:

        except xdrlib.Error, var:
            # var is bound to the Error instance that was raised

    Public instance variables:
        msg -- the message the exception was created with

    """

    def __init__(self, msg):
        # Keep the message as given; it is only converted on display.
        self.msg = msg

    def __str__(self):
        return str(self.msg)

    def __repr__(self):
        return repr(self.msg)
45
46
class ConversionError(Error):
    """Raised when a value cannot be converted to or from its packed form."""
49
50
51
52class Packer:
53 """Pack various data representations into a buffer."""
54
55 def __init__(self):
56 self.reset()
57
58 def reset(self):
59 self.__buf = ''
60
61 def get_buffer(self):
62 return self.__buf
63 # backwards compatibility
64 get_buf = get_buffer
65
66 def pack_uint(self, x):
67 self.__buf = self.__buf + \
68 (chr(int(x>>24 & 0xff)) + chr(int(x>>16 & 0xff)) + \
69 chr(int(x>>8 & 0xff)) + chr(int(x & 0xff)))
70 if __USE_MACHINE_REP:
71 def pack_uint(self, x):
72 if type(x) == LongType:
73 x = int((x + 0x80000000L) % 0x100000000L - 0x80000000L)
74 self.__buf = self.__buf + struct.pack('l', x)
75
76 pack_int = pack_uint
77 pack_enum = pack_int
78
79 def pack_bool(self, x):
80 if x: self.__buf = self.__buf + '\0\0\0\1'
81 else: self.__buf = self.__buf + '\0\0\0\0'
82
83 def pack_uhyper(self, x):
84 self.pack_uint(int(x>>32 & 0xffffffff))
85 self.pack_uint(int(x & 0xffffffff))
86
87 pack_hyper = pack_uhyper
88
89 def pack_float(self, x):
90 raise ConversionError('Not supported')
91 def pack_double(self, x):
92 raise ConversionError('Not supported')
93 # get these from the C layer if available
94 if _xdr:
95 def pack_float(self, x):
96 try: self.__buf = self.__buf + _xdr.pack_float(x)
97 except _xdr.error, msg:
98 raise ConversionError(msg)
99 def pack_double(self, x):
100 try: self.__buf = self.__buf + _xdr.pack_double(x)
101 except _xdr.error, msg:
102 raise ConversionError(msg)
103
104 def pack_fstring(self, n, s):
105 if n < 0:
106 raise ValueError, 'fstring size must be nonnegative'
107 n = ((n+3)/4)*4
108 data = s[:n]
109 data = data + (n - len(data)) * '\0'
110 self.__buf = self.__buf + data
111
112 pack_fopaque = pack_fstring
113
114 def pack_string(self, s):
115 n = len(s)
116 self.pack_uint(n)
117 self.pack_fstring(n, s)
118
119 pack_opaque = pack_string
120 pack_bytes = pack_string
121
122 def pack_list(self, list, pack_item):
123 for item in list:
124 self.pack_uint(1)
125 pack_item(item)
126 self.pack_uint(0)
127
128 def pack_farray(self, n, list, pack_item):
129 if len(list) <> n:
130 raise ValueError, 'wrong array size'
131 for item in list:
132 pack_item(item)
133
134 def pack_array(self, list, pack_item):
135 n = len(list)
136 self.pack_uint(n)
137 self.pack_farray(n, list, pack_item)
138
139
140
141class Unpacker:
142 """Unpacks various data representations from the given buffer."""
143
144 def __init__(self, data):
145 self.reset(data)
146
147 def reset(self, data):
148 self.__buf = data
149 self.__pos = 0
150
151 def get_position(self):
152 return self.__pos
153
154 def set_position(self, position):
155 self.__pos = position
156
157 def done(self):
158 if self.__pos < len(self.__buf):
159 raise Error('unextracted data remains')
160
161 def unpack_uint(self):
162 i = self.__pos
163 self.__pos = j = i+4
164 data = self.__buf[i:j]
165 if len(data) < 4:
166 raise EOFError
167 x = long(ord(data[0]))<<24 | ord(data[1])<<16 | \
168 ord(data[2])<<8 | ord(data[3])
169 # Return a Python long only if the value is not representable
170 # as a nonnegative Python int
171 if x < 0x80000000L:
172 x = int(x)
173 return x
174 if __USE_MACHINE_REP:
175 def unpack_uint(self):
176 i = self.__pos
177 self.__pos = j = i+4
178 data = self.__buf[i:j]
179 if len(data) < 4:
180 raise EOFError
181 return struct.unpack('l', data)[0]
182
183 def unpack_int(self):
184 x = self.unpack_uint()
185 if x >= 0x80000000L:
186 x = x - 0x100000000L
187 return int(x)
188
189 unpack_enum = unpack_int
190 unpack_bool = unpack_int
191
192 def unpack_uhyper(self):
193 hi = self.unpack_uint()
194 lo = self.unpack_uint()
195 return long(hi)<<32 | lo
196
197 def unpack_hyper(self):
198 x = self.unpack_uhyper()
199 if x >= 0x8000000000000000L:
200 x = x - 0x10000000000000000L
201 return x
202
203 def unpack_float(self):
204 raise ConversionError('Not supported')
205 def unpack_double(self):
206 raise ConversionError('Not supported')
207 # get these from the C layer if available
208 if _xdr:
209 def unpack_float(self):
210 i = self.__pos
211 self.__pos = j = i+4
212 data = self.__buf[i:j]
213 if len(data) < 4:
214 raise EOFError
215 try: return _xdr.unpack_float(data)
216 except _xdr.error, msg:
217 raise ConversionError(msg)
218
219 def unpack_double(self):
220 i = self.__pos
221 self.__pos = j = i+8
222 data = self.__buf[i:j]
223 if len(data) < 8:
224 raise EOFError
225 try: return _xdr.unpack_double(data)
226 except _xdr.error, msg:
227 raise ConversionError(msg)
228
229 def unpack_fstring(self, n):
230 if n < 0:
231 raise ValueError, 'fstring size must be nonnegative'
232 i = self.__pos
233 j = i + (n+3)/4*4
234 if j > len(self.__buf):
235 raise EOFError
236 self.__pos = j
237 return self.__buf[i:i+n]
238
239 unpack_fopaque = unpack_fstring
240
241 def unpack_string(self):
242 n = self.unpack_uint()
243 return self.unpack_fstring(n)
244
245 unpack_opaque = unpack_string
246 unpack_bytes = unpack_string
247
248 def unpack_list(self, unpack_item):
249 list = []
250 while 1:
251 x = self.unpack_uint()
252 if x == 0: break
253 if x <> 1:
254 raise ConversionError('0 or 1 expected, got ' + `x`)
255 item = unpack_item()
256 list.append(item)
257 return list
258
259 def unpack_farray(self, n, unpack_item):
260 list = []
261 for i in range(n):
262 list.append(unpack_item())
263 return list
264
265 def unpack_array(self, unpack_item):
266 n = self.unpack_uint()
267 return self.unpack_farray(n, unpack_item)
268
269
270# test suite
271def __test():
272 p = Packer()
273 packtest = [
274 (p.pack_uint, (9,)),
275 (p.pack_bool, (None,)),
276 (p.pack_bool, ('hello',)),
277 (p.pack_uhyper, (45L,)),
278 (p.pack_float, (1.9,)),
279 (p.pack_double, (1.9,)),
280 (p.pack_string, ('hello world',)),
281 (p.pack_list, (range(5), p.pack_uint)),
282 (p.pack_array, (['what', 'is', 'hapnin', 'doctor'], p.pack_string)),
283 ]
284 succeedlist = [1] * len(packtest)
285 count = 0
286 for method, args in packtest:
287 print 'pack test', count,
288 try:
289 apply(method, args)
290 print 'succeeded'
291 except ConversionError, var:
292 print 'ConversionError:', var.msg
293 succeedlist[count] = 0
294 count = count + 1
295 data = p.get_buffer()
296 # now verify
297 up = Unpacker(data)
298 unpacktest = [
299 (up.unpack_uint, (), lambda x: x == 9),
300 (up.unpack_bool, (), lambda x: not x),
301 (up.unpack_bool, (), lambda x: x),
302 (up.unpack_uhyper, (), lambda x: x == 45L),
303 (up.unpack_float, (), lambda x: 1.89 < x < 1.91),
304 (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
305 (up.unpack_string, (), lambda x: x == 'hello world'),
306 (up.unpack_list, (up.unpack_uint,), lambda x: x == range(5)),
307 (up.unpack_array, (up.unpack_string,),
308 lambda x: x == ['what', 'is', 'hapnin', 'doctor']),
309 ]
310 count = 0
311 for method, args, pred in unpacktest:
312 print 'unpack test', count,
313 try:
314 if succeedlist[count]:
315 x = apply(method, args)
316 print pred(x) and 'succeeded' or 'failed', ':', x
317 else:
318 print 'skipping'
319 except ConversionError, var:
320 print 'ConversionError:', var.msg
321 count = count + 1
322
# run the self-test when this file is executed as a script
if __name__ == '__main__':
    __test()