blob: 03cdfef322413b249c4b0fbabf8be873369fbdef [file] [log] [blame]
Guido van Rossum68bbcd22007-02-27 17:19:33 +00001"""Unit tests for io.py."""
2
Guido van Rossum28524c72007-02-27 05:47:44 +00003import unittest
4from test import test_support
5
6import io
7
Guido van Rossum01a27522007-03-07 01:00:12 +00008class MockIO(io.RawIOBase):
9 def __init__(self, readStack=()):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000010 self._readStack = list(readStack)
Guido van Rossum01a27522007-03-07 01:00:12 +000011 self._writeStack = []
Guido van Rossum68bbcd22007-02-27 17:19:33 +000012
13 def read(self, n=None):
14 try:
15 return self._readStack.pop(0)
16 except:
17 return io.EOF
18
Guido van Rossum01a27522007-03-07 01:00:12 +000019 def write(self, b):
20 self._writeStack.append(b)
21 return len(b)
22
23 def writable(self):
24 return True
25
Guido van Rossum68bbcd22007-02-27 17:19:33 +000026 def fileno(self):
27 return 42
28
29 def readable(self):
30 return True
31
Guido van Rossum01a27522007-03-07 01:00:12 +000032 def seekable(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000033 return True
34
Guido van Rossum01a27522007-03-07 01:00:12 +000035 def seek(self, pos, whence):
36 pass
37
38 def tell(self):
Guido van Rossum68bbcd22007-02-27 17:19:33 +000039 return 42
40
Guido van Rossum01a27522007-03-07 01:00:12 +000041class MockNonBlockWriterIO(io.RawIOBase):
42 def __init__(self, blockingScript):
43 self.bs = list(blockingScript)
44 self._write_stack = []
45 def write(self, b):
46 self._write_stack.append(b)
47 n = self.bs.pop(0)
48 if (n < 0):
49 raise io.BlockingIO(0, "test blocking", -n)
50 else:
51 return n
52 def writable(self):
53 return True
Guido van Rossum68bbcd22007-02-27 17:19:33 +000054
Guido van Rossum28524c72007-02-27 05:47:44 +000055class IOTest(unittest.TestCase):
56
57 def write_ops(self, f):
58 f.write(b"blah.")
59 f.seek(0)
60 f.write(b"Hello.")
61 self.assertEqual(f.tell(), 6)
62 f.seek(-1, 1)
63 self.assertEqual(f.tell(), 5)
64 f.write(" world\n\n\n")
65 f.seek(0)
66 f.write("h")
67 f.seek(-2, 2)
68 f.truncate()
69
70 def read_ops(self, f):
71 data = f.read(5)
72 self.assertEqual(data, b"hello")
Guido van Rossum00efead2007-03-07 05:23:25 +000073 n = f.readinto(data)
74 self.assertEqual(n, 5)
Guido van Rossum28524c72007-02-27 05:47:44 +000075 self.assertEqual(data, b" worl")
Guido van Rossum00efead2007-03-07 05:23:25 +000076 n = f.readinto(data)
77 self.assertEqual(n, 2)
78 self.assertEqual(len(data), 5)
79 self.assertEqual(data[:2], b"d\n")
Guido van Rossum28524c72007-02-27 05:47:44 +000080 f.seek(0)
81 self.assertEqual(f.read(20), b"hello world\n")
82 f.seek(-6, 2)
83 self.assertEqual(f.read(5), b"world")
84 f.seek(-6, 1)
85 self.assertEqual(f.read(5), b" worl")
86 self.assertEqual(f.tell(), 10)
87
88 def test_raw_file_io(self):
89 f = io.open(test_support.TESTFN, "wb", buffering=0)
90 self.assertEqual(f.readable(), False)
91 self.assertEqual(f.writable(), True)
92 self.assertEqual(f.seekable(), True)
93 self.write_ops(f)
94 f.close()
95 f = io.open(test_support.TESTFN, "rb", buffering=0)
96 self.assertEqual(f.readable(), True)
97 self.assertEqual(f.writable(), False)
98 self.assertEqual(f.seekable(), True)
99 self.read_ops(f)
100 f.close()
101
102 def test_raw_bytes_io(self):
103 f = io.BytesIO()
104 self.write_ops(f)
105 data = f.getvalue()
106 self.assertEqual(data, b"hello world\n")
107 f = io.BytesIO(data)
108 self.read_ops(f)
109
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000110class BytesIOTest(unittest.TestCase):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000111 def testInit(self):
112 buf = b"1234567890"
113 bytesIo = io.BytesIO(buf)
114
115 def testRead(self):
116 buf = b"1234567890"
117 bytesIo = io.BytesIO(buf)
118
119 self.assertEquals(buf[:1], bytesIo.read(1))
120 self.assertEquals(buf[1:5], bytesIo.read(4))
121 self.assertEquals(buf[5:], bytesIo.read(900))
122 self.assertEquals(io.EOF, bytesIo.read())
123
124 def testReadNoArgs(self):
125 buf = b"1234567890"
126 bytesIo = io.BytesIO(buf)
127
128 self.assertEquals(buf, bytesIo.read())
129 self.assertEquals(io.EOF, bytesIo.read())
130
131 def testSeek(self):
132 buf = b"1234567890"
133 bytesIo = io.BytesIO(buf)
134
135 bytesIo.read(5)
136 bytesIo.seek(0)
137 self.assertEquals(buf, bytesIo.read())
138
139 bytesIo.seek(3)
140 self.assertEquals(buf[3:], bytesIo.read())
141
142 def testTell(self):
143 buf = b"1234567890"
144 bytesIo = io.BytesIO(buf)
145
146 self.assertEquals(0, bytesIo.tell())
147 bytesIo.seek(5)
148 self.assertEquals(5, bytesIo.tell())
149 bytesIo.seek(10000)
150 self.assertEquals(10000, bytesIo.tell())
151
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000152class BufferedReaderTest(unittest.TestCase):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000153 def testRead(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000154 rawIo = MockIO((b"abc", b"d", b"efg"))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000155 bufIo = io.BufferedReader(rawIo)
156
157 self.assertEquals(b"abcdef", bufIo.read(6))
158
Guido van Rossum01a27522007-03-07 01:00:12 +0000159 def testReadNonBlocking(self):
160 # Inject some None's in there to simulate EWOULDBLOCK
161 rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
162 bufIo = io.BufferedReader(rawIo)
163
164 self.assertEquals(b"abcd", bufIo.read(6))
165 self.assertEquals(b"e", bufIo.read(1))
166 self.assertEquals(b"fg", bufIo.read())
167 self.assert_(None is bufIo.read())
168 self.assertEquals(io.EOF, bufIo.read())
169
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000170 def testReadToEof(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000171 rawIo = MockIO((b"abc", b"d", b"efg"))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000172 bufIo = io.BufferedReader(rawIo)
173
174 self.assertEquals(b"abcdefg", bufIo.read(9000))
175
176 def testReadNoArgs(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000177 rawIo = MockIO((b"abc", b"d", b"efg"))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000178 bufIo = io.BufferedReader(rawIo)
179
180 self.assertEquals(b"abcdefg", bufIo.read())
181
182 def testFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000183 rawIo = MockIO((b"abc", b"d", b"efg"))
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000184 bufIo = io.BufferedReader(rawIo)
185
186 self.assertEquals(42, bufIo.fileno())
187
188 def testFilenoNoFileno(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000189 # XXX will we always have fileno() function? If so, kill
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000190 # this test. Else, write it.
191 pass
192
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000193class BufferedWriterTest(unittest.TestCase):
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000194 def testWrite(self):
195 # Write to the buffered IO but don't overflow the buffer.
Guido van Rossum01a27522007-03-07 01:00:12 +0000196 writer = MockIO()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000197 bufIo = io.BufferedWriter(writer, 8)
198
199 bufIo.write(b"abc")
200
201 self.assertFalse(writer._writeStack)
202
203 def testWriteOverflow(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000204 writer = MockIO()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000205 bufIo = io.BufferedWriter(writer, 8)
206
207 bufIo.write(b"abc")
208 bufIo.write(b"defghijkl")
209
210 self.assertEquals(b"abcdefghijkl", writer._writeStack[0])
211
Guido van Rossum01a27522007-03-07 01:00:12 +0000212 def testWriteNonBlocking(self):
213 raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
214 bufIo = io.BufferedWriter(raw, 8, 16)
215
216 bufIo.write(b"asdf")
217 bufIo.write(b"asdfa")
218 self.assertEquals(b"asdfasdfa", raw._write_stack[0])
219
220 bufIo.write(b"asdfasdfasdf")
221 self.assertEquals(b"asdfasdfasdf", raw._write_stack[1])
222 bufIo.write(b"asdfasdfasdf")
223 self.assertEquals(b"dfasdfasdf", raw._write_stack[2])
224 self.assertEquals(b"asdfasdfasdf", raw._write_stack[3])
225
226 bufIo.write(b"asdfasdfasdf")
227
228 # XXX I don't like this test. It relies too heavily on how the algorithm
229 # actually works, which we might change. Refactor later.
230
231 def testFileno(self):
232 rawIo = MockIO((b"abc", b"d", b"efg"))
233 bufIo = io.BufferedWriter(rawIo)
234
235 self.assertEquals(42, bufIo.fileno())
236
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000237 def testFlush(self):
Guido van Rossum01a27522007-03-07 01:00:12 +0000238 writer = MockIO()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000239 bufIo = io.BufferedWriter(writer, 8)
240
241 bufIo.write(b"abc")
242 bufIo.flush()
243
244 self.assertEquals(b"abc", writer._writeStack[0])
245
Guido van Rossum01a27522007-03-07 01:00:12 +0000246class BufferedRWPairTest(unittest.TestCase):
247 def testRWPair(self):
248 r = MockIO(())
249 w = MockIO()
250 pair = io.BufferedRWPair(r, w)
251
252 # XXX need implementation
253
254class BufferedRandom(unittest.TestCase):
255 def testReadAndWrite(self):
256 raw = MockIO((b"asdf", b"ghjk"))
257 rw = io.BufferedRandom(raw, 8, 12)
258
259 self.assertEqual(b"as", rw.read(2))
260 rw.write(b"ddd")
261 rw.write(b"eee")
262 self.assertFalse(raw._writeStack) # Buffer writes
263 self.assertEqual(b"ghjk", rw.read()) # This read forces write flush
264 self.assertEquals(b"dddeee", raw._writeStack[0])
265
266 def testSeekAndTell(self):
267 raw = io.BytesIO(b"asdfghjkl")
268 rw = io.BufferedRandom(raw)
269
270 self.assertEquals(b"as", rw.read(2))
271 self.assertEquals(2, rw.tell())
272 rw.seek(0, 0)
273 self.assertEquals(b"asdf", rw.read(4))
274
275 rw.write(b"asdf")
276 rw.seek(0, 0)
277 self.assertEquals(b"asdfasdfl", rw.read())
278 self.assertEquals(9, rw.tell())
279 rw.seek(-4, 2)
280 self.assertEquals(5, rw.tell())
281 rw.seek(2, 1)
282 self.assertEquals(7, rw.tell())
283 self.assertEquals(b"fl", rw.read(11))
284
285# XXX Tests for open()
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000286
Guido van Rossum28524c72007-02-27 05:47:44 +0000287def test_main():
Guido van Rossum68bbcd22007-02-27 17:19:33 +0000288 test_support.run_unittest(IOTest, BytesIOTest, BufferedReaderTest,
Guido van Rossum01a27522007-03-07 01:00:12 +0000289 BufferedWriterTest, BufferedRWPairTest,
290 BufferedRandom)
Guido van Rossum28524c72007-02-27 05:47:44 +0000291
292if __name__ == "__main__":
293 test_main()