blob: 1be1b714e547d6b30d898398dfadd5d65a475558 [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum28524c72007-02-27 05:47:44 +00003import unittest
4from test import test_support
5
6import io
7
class MockIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered I/O tests.

    Reads are served, one chunk per call, from the sequence given to the
    constructor; every chunk passed to write() is recorded in
    ``_writeStack`` so a test can inspect what reached the raw layer.
    """

    def __init__(self, readStack=()):
        # Copy the script so the caller's sequence is never mutated.
        self._readStack = list(readStack)
        self._writeStack = []

    def read(self, n=None):
        # ``n`` is accepted for interface compatibility but ignored: the
        # next scripted chunk is returned whatever its size.
        try:
            return self._readStack.pop(0)
        except IndexError:
            # Only an exhausted script means end-of-file; any other error
            # is a real bug and must not be masked.
            return io.EOF

    def write(self, b):
        # Record the chunk and report it as fully written.
        self._writeStack.append(b)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary sentinel value checked by the testFileno tests.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        # Seeking is a no-op; ``whence`` defaults to 0 so the mock can be
        # called with a single argument as well.
        pass

    def tell(self):
        # Arbitrary fixed position; the mock does not track offsets.
        return 42
40
class MockNonBlockWriterIO(io.RawIOBase):
    """Raw writer whose write() results follow a pre-recorded script.

    Each write() consumes the next entry of the script: a non-negative
    entry is returned as the write result, a negative entry makes the
    call raise io.BlockingIO carrying the negated value.
    """

    def __init__(self, blockingScript):
        self._write_stack = []
        self.bs = list(blockingScript)

    def writable(self):
        return True

    def write(self, b):
        # Record the chunk first so tests can inspect every attempt,
        # including the ones that end up "blocking".
        self._write_stack.append(b)
        outcome = self.bs.pop(0)
        if outcome >= 0:
            return outcome
        raise io.BlockingIO(0, "test blocking", -outcome)
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
Guido van Rossum28524c72007-02-27 05:47:44 +000055class IOTest(unittest.TestCase):
56
57 def write_ops(self, f):
58 f.write(b"blah.")
59 f.seek(0)
60 f.write(b"Hello.")
61 self.assertEqual(f.tell(), 6)
62 f.seek(-1, 1)
63 self.assertEqual(f.tell(), 5)
64 f.write(" world\n\n\n")
65 f.seek(0)
66 f.write("h")
67 f.seek(-2, 2)
68 f.truncate()
69
70 def read_ops(self, f):
71 data = f.read(5)
72 self.assertEqual(data, b"hello")
73 f.readinto(data)
74 self.assertEqual(data, b" worl")
75 f.readinto(data)
76 self.assertEqual(data, b"d\n")
77 f.seek(0)
78 self.assertEqual(f.read(20), b"hello world\n")
79 f.seek(-6, 2)
80 self.assertEqual(f.read(5), b"world")
81 f.seek(-6, 1)
82 self.assertEqual(f.read(5), b" worl")
83 self.assertEqual(f.tell(), 10)
84
85 def test_raw_file_io(self):
86 f = io.open(test_support.TESTFN, "wb", buffering=0)
87 self.assertEqual(f.readable(), False)
88 self.assertEqual(f.writable(), True)
89 self.assertEqual(f.seekable(), True)
90 self.write_ops(f)
91 f.close()
92 f = io.open(test_support.TESTFN, "rb", buffering=0)
93 self.assertEqual(f.readable(), True)
94 self.assertEqual(f.writable(), False)
95 self.assertEqual(f.seekable(), True)
96 self.read_ops(f)
97 f.close()
98
99 def test_raw_bytes_io(self):
100 f = io.BytesIO()
101 self.write_ops(f)
102 data = f.getvalue()
103 self.assertEqual(data, b"hello world\n")
104 f = io.BytesIO(data)
105 self.read_ops(f)
106
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000107class BytesIOTest(unittest.TestCase):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000108 def testInit(self):
109 buf = b"1234567890"
110 bytesIo = io.BytesIO(buf)
111
112 def testRead(self):
113 buf = b"1234567890"
114 bytesIo = io.BytesIO(buf)
115
116 self.assertEquals(buf[:1], bytesIo.read(1))
117 self.assertEquals(buf[1:5], bytesIo.read(4))
118 self.assertEquals(buf[5:], bytesIo.read(900))
119 self.assertEquals(io.EOF, bytesIo.read())
120
121 def testReadNoArgs(self):
122 buf = b"1234567890"
123 bytesIo = io.BytesIO(buf)
124
125 self.assertEquals(buf, bytesIo.read())
126 self.assertEquals(io.EOF, bytesIo.read())
127
128 def testSeek(self):
129 buf = b"1234567890"
130 bytesIo = io.BytesIO(buf)
131
132 bytesIo.read(5)
133 bytesIo.seek(0)
134 self.assertEquals(buf, bytesIo.read())
135
136 bytesIo.seek(3)
137 self.assertEquals(buf[3:], bytesIo.read())
138
139 def testTell(self):
140 buf = b"1234567890"
141 bytesIo = io.BytesIO(buf)
142
143 self.assertEquals(0, bytesIo.tell())
144 bytesIo.seek(5)
145 self.assertEquals(5, bytesIo.tell())
146 bytesIo.seek(10000)
147 self.assertEquals(10000, bytesIo.tell())
148
class BufferedReaderTest(unittest.TestCase):
    """io.BufferedReader layered over a scripted MockIO raw stream."""

    def testRead(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcdef", bufIo.read(6))

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcd", bufIo.read(6))
        self.assertEqual(b"e", bufIo.read(1))
        self.assertEqual(b"fg", bufIo.read())
        self.assertTrue(bufIo.read() is None)
        self.assertEqual(io.EOF, bufIo.read())

    def testReadToEof(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcdefg", bufIo.read(9000))

    def testReadNoArgs(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcdefg", bufIo.read())

    def testFileno(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        # 42 is the sentinel returned by MockIO.fileno().
        self.assertEqual(42, bufIo.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function? If so, kill
        # this test. Else, write it.
        pass
189
class BufferedWriterTest(unittest.TestCase):
    """io.BufferedWriter layered over the mock raw writers."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._writeStack)

    def testWriteOverflow(self):
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")
        bufIo.write(b"defghijkl")

        self.assertEqual(b"abcdefghijkl", writer._writeStack[0])

    def testWriteNonBlocking(self):
        # Script entries: non-negative values are returned by the raw
        # write(); negative values make it raise io.BlockingIO.
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufIo = io.BufferedWriter(raw, 8, 16)

        bufIo.write(b"asdf")
        bufIo.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufIo.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufIo.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufIo.write(b"asdfasdfasdf")

        # XXX I don't like this test. It relies too heavily on how the
        # algorithm actually works, which we might change. Refactor later.

    def testFileno(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedWriter(rawIo)

        # 42 is the sentinel returned by MockIO.fileno().
        self.assertEqual(42, bufIo.fileno())

    def testFlush(self):
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")
        bufIo.flush()

        self.assertEqual(b"abc", writer._writeStack[0])
242
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder coverage for io.BufferedRWPair."""

    def testRWPair(self):
        # For now this only checks that a pair can be built from two raw
        # streams without raising.
        reader = MockIO(())
        writer = MockIO()
        rw_pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
250
251class BufferedRandom(unittest.TestCase):
252 def testReadAndWrite(self):
253 raw = MockIO((b"asdf", b"ghjk"))
254 rw = io.BufferedRandom(raw, 8, 12)
255
256 self.assertEqual(b"as", rw.read(2))
257 rw.write(b"ddd")
258 rw.write(b"eee")
259 self.assertFalse(raw._writeStack) # Buffer writes
260 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
261 self.assertEquals(b"dddeee", raw._writeStack[0])
262
263 def testSeekAndTell(self):
264 raw = io.BytesIO(b"asdfghjkl")
265 rw = io.BufferedRandom(raw)
266
267 self.assertEquals(b"as", rw.read(2))
268 self.assertEquals(2, rw.tell())
269 rw.seek(0, 0)
270 self.assertEquals(b"asdf", rw.read(4))
271
272 rw.write(b"asdf")
273 rw.seek(0, 0)
274 self.assertEquals(b"asdfasdfl", rw.read())
275 self.assertEquals(9, rw.tell())
276 rw.seek(-4, 2)
277 self.assertEquals(5, rw.tell())
278 rw.seek(2, 1)
279 self.assertEquals(7, rw.tell())
280 self.assertEquals(b"fl", rw.read(11))
281
282# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000283
def test_main():
    """Run every test case class defined in this module."""
    cases = (
        IOTest, BytesIOTest, BufferedReaderTest,
        BufferedWriterTest, BufferedRWPairTest,
        BufferedRandom,
    )
    test_support.run_unittest(*cases)


if __name__ == "__main__":
    test_main()