blob: 193130e2b68ca09ac59bb049a6166932670965a4 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum28524c72007-02-27 05:47:44 +00003import unittest
4from test import test_support
5
6import io
7
Guido van Rossum01a27522007-03-07 01:00:12 +00008class MockIO(io.RawIOBase):
9 def __init__(self, readStack=()):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000010 self._readStack = list(readStack)
Guido van Rossum01a27522007-03-07 01:00:12 +000011 self._writeStack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000012
13 def read(self, n=None):
14 try:
15 return self._readStack.pop(0)
16 except:
17 return io.EOF
18
Guido van Rossum01a27522007-03-07 01:00:12 +000019 def write(self, b):
20 self._writeStack.append(b)
21 return len(b)
22
23 def writable(self):
24 return True
25
Guido van Rossum68bbcd22007-02-27 17:19:33 +000026 def fileno(self):
27 return 42
28
29 def readable(self):
30 return True
31
Guido van Rossum01a27522007-03-07 01:00:12 +000032 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000033 return True
34
Guido van Rossum01a27522007-03-07 01:00:12 +000035 def seek(self, pos, whence):
36 pass
37
38 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000039 return 42
40
Guido van Rossum01a27522007-03-07 01:00:12 +000041class MockNonBlockWriterIO(io.RawIOBase):
42 def __init__(self, blockingScript):
43 self.bs = list(blockingScript)
44 self._write_stack = []
45 def write(self, b):
46 self._write_stack.append(b)
47 n = self.bs.pop(0)
48 if (n < 0):
49 raise io.BlockingIO(0, "test blocking", -n)
50 else:
51 return n
52 def writable(self):
53 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
Guido van Rossum28524c72007-02-27 05:47:44 +000055class IOTest(unittest.TestCase):
56
Guido van Rossum4d0f5a42007-03-07 22:59:39 +000057 def tearDown(self):
58 test_support.unlink(test_support.TESTFN)
59
Guido van Rossum28524c72007-02-27 05:47:44 +000060 def write_ops(self, f):
61 f.write(b"blah.")
62 f.seek(0)
63 f.write(b"Hello.")
64 self.assertEqual(f.tell(), 6)
65 f.seek(-1, 1)
66 self.assertEqual(f.tell(), 5)
67 f.write(" world\n\n\n")
68 f.seek(0)
69 f.write("h")
70 f.seek(-2, 2)
71 f.truncate()
72
73 def read_ops(self, f):
74 data = f.read(5)
75 self.assertEqual(data, b"hello")
Guido van Rossum00efead2007-03-07 05:23:25 +000076 n = f.readinto(data)
77 self.assertEqual(n, 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000078 self.assertEqual(data, b" worl")
Guido van Rossum00efead2007-03-07 05:23:25 +000079 n = f.readinto(data)
80 self.assertEqual(n, 2)
81 self.assertEqual(len(data), 5)
82 self.assertEqual(data[:2], b"d\n")
Guido van Rossum28524c72007-02-27 05:47:44 +000083 f.seek(0)
84 self.assertEqual(f.read(20), b"hello world\n")
85 f.seek(-6, 2)
86 self.assertEqual(f.read(5), b"world")
87 f.seek(-6, 1)
88 self.assertEqual(f.read(5), b" worl")
89 self.assertEqual(f.tell(), 10)
90
91 def test_raw_file_io(self):
92 f = io.open(test_support.TESTFN, "wb", buffering=0)
93 self.assertEqual(f.readable(), False)
94 self.assertEqual(f.writable(), True)
95 self.assertEqual(f.seekable(), True)
96 self.write_ops(f)
97 f.close()
98 f = io.open(test_support.TESTFN, "rb", buffering=0)
99 self.assertEqual(f.readable(), True)
100 self.assertEqual(f.writable(), False)
101 self.assertEqual(f.seekable(), True)
102 self.read_ops(f)
103 f.close()
104
105 def test_raw_bytes_io(self):
106 f = io.BytesIO()
107 self.write_ops(f)
108 data = f.getvalue()
109 self.assertEqual(data, b"hello world\n")
110 f = io.BytesIO(data)
111 self.read_ops(f)
112
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000113class BytesIOTest(unittest.TestCase):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000114 def testInit(self):
115 buf = b"1234567890"
116 bytesIo = io.BytesIO(buf)
117
118 def testRead(self):
119 buf = b"1234567890"
120 bytesIo = io.BytesIO(buf)
121
122 self.assertEquals(buf[:1], bytesIo.read(1))
123 self.assertEquals(buf[1:5], bytesIo.read(4))
124 self.assertEquals(buf[5:], bytesIo.read(900))
125 self.assertEquals(io.EOF, bytesIo.read())
126
127 def testReadNoArgs(self):
128 buf = b"1234567890"
129 bytesIo = io.BytesIO(buf)
130
131 self.assertEquals(buf, bytesIo.read())
132 self.assertEquals(io.EOF, bytesIo.read())
133
134 def testSeek(self):
135 buf = b"1234567890"
136 bytesIo = io.BytesIO(buf)
137
138 bytesIo.read(5)
139 bytesIo.seek(0)
140 self.assertEquals(buf, bytesIo.read())
141
142 bytesIo.seek(3)
143 self.assertEquals(buf[3:], bytesIo.read())
144
145 def testTell(self):
146 buf = b"1234567890"
147 bytesIo = io.BytesIO(buf)
148
149 self.assertEquals(0, bytesIo.tell())
150 bytesIo.seek(5)
151 self.assertEquals(5, bytesIo.tell())
152 bytesIo.seek(10000)
153 self.assertEquals(10000, bytesIo.tell())
154
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000155class BufferedReaderTest(unittest.TestCase):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000156 def testRead(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000157 rawIo = MockIO((b"abc", b"d", b"efg"))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000158 bufIo = io.BufferedReader(rawIo)
159
160 self.assertEquals(b"abcdef", bufIo.read(6))
161
Guido van Rossum01a27522007-03-07 01:00:12 +0000162 def testReadNonBlocking(self):
163 # Inject some None's in there to simulate EWOULDBLOCK
164 rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
165 bufIo = io.BufferedReader(rawIo)
166
167 self.assertEquals(b"abcd", bufIo.read(6))
168 self.assertEquals(b"e", bufIo.read(1))
169 self.assertEquals(b"fg", bufIo.read())
170 self.assert_(None is bufIo.read())
171 self.assertEquals(io.EOF, bufIo.read())
172
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000173 def testReadToEof(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000174 rawIo = MockIO((b"abc", b"d", b"efg"))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000175 bufIo = io.BufferedReader(rawIo)
176
177 self.assertEquals(b"abcdefg", bufIo.read(9000))
178
179 def testReadNoArgs(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000180 rawIo = MockIO((b"abc", b"d", b"efg"))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000181 bufIo = io.BufferedReader(rawIo)
182
183 self.assertEquals(b"abcdefg", bufIo.read())
184
185 def testFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000186 rawIo = MockIO((b"abc", b"d", b"efg"))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000187 bufIo = io.BufferedReader(rawIo)
188
189 self.assertEquals(42, bufIo.fileno())
190
191 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000192 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000193 # this test. Else, write it.
194 pass
195
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000196class BufferedWriterTest(unittest.TestCase):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000197 def testWrite(self):
198 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum01a27522007-03-07 01:00:12 +0000199 writer = MockIO()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000200 bufIo = io.BufferedWriter(writer, 8)
201
202 bufIo.write(b"abc")
203
204 self.assertFalse(writer._writeStack)
205
206 def testWriteOverflow(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000207 writer = MockIO()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000208 bufIo = io.BufferedWriter(writer, 8)
209
210 bufIo.write(b"abc")
211 bufIo.write(b"defghijkl")
212
213 self.assertEquals(b"abcdefghijkl", writer._writeStack[0])
214
Guido van Rossum01a27522007-03-07 01:00:12 +0000215 def testWriteNonBlocking(self):
216 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
217 bufIo = io.BufferedWriter(raw, 8, 16)
218
219 bufIo.write(b"asdf")
220 bufIo.write(b"asdfa")
221 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
222
223 bufIo.write(b"asdfasdfasdf")
224 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
225 bufIo.write(b"asdfasdfasdf")
226 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
227 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
228
229 bufIo.write(b"asdfasdfasdf")
230
231 # XXX I don't like this test. It relies too heavily on how the algorithm
232 # actually works, which we might change. Refactor later.
233
234 def testFileno(self):
235 rawIo = MockIO((b"abc", b"d", b"efg"))
236 bufIo = io.BufferedWriter(rawIo)
237
238 self.assertEquals(42, bufIo.fileno())
239
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000240 def testFlush(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000241 writer = MockIO()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000242 bufIo = io.BufferedWriter(writer, 8)
243
244 bufIo.write(b"abc")
245 bufIo.flush()
246
247 self.assertEquals(b"abc", writer._writeStack[0])
248
Guido van Rossum01a27522007-03-07 01:00:12 +0000249class BufferedRWPairTest(unittest.TestCase):
250 def testRWPair(self):
251 r = MockIO(())
252 w = MockIO()
253 pair = io.BufferedRWPair(r, w)
254
255 # XXX need implementation
256
257class BufferedRandom(unittest.TestCase):
258 def testReadAndWrite(self):
259 raw = MockIO((b"asdf", b"ghjk"))
260 rw = io.BufferedRandom(raw, 8, 12)
261
262 self.assertEqual(b"as", rw.read(2))
263 rw.write(b"ddd")
264 rw.write(b"eee")
265 self.assertFalse(raw._writeStack) # Buffer writes
266 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
267 self.assertEquals(b"dddeee", raw._writeStack[0])
268
269 def testSeekAndTell(self):
270 raw = io.BytesIO(b"asdfghjkl")
271 rw = io.BufferedRandom(raw)
272
273 self.assertEquals(b"as", rw.read(2))
274 self.assertEquals(2, rw.tell())
275 rw.seek(0, 0)
276 self.assertEquals(b"asdf", rw.read(4))
277
278 rw.write(b"asdf")
279 rw.seek(0, 0)
280 self.assertEquals(b"asdfasdfl", rw.read())
281 self.assertEquals(9, rw.tell())
282 rw.seek(-4, 2)
283 self.assertEquals(5, rw.tell())
284 rw.seek(2, 1)
285 self.assertEquals(7, rw.tell())
286 self.assertEquals(b"fl", rw.read(11))
287
288# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000289
Guido van Rossum28524c72007-02-27 05:47:44 +0000290def test_main():
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000291 test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +0000292 BufferedWriterTest, BufferedRWPairTest,
293 BufferedRandom)
Guido van Rossum28524c72007-02-27 05:47:44 +0000294
295if __name__ == "__main__":
296 test_main()