Guido van Rossum | 68bbcd2 | 2007-02-27 17:19:33 +0000 | [diff] [blame] | 1 | """Unit tests for io.py.""" |
| 2 | |
Guido van Rossum | 28524c7 | 2007-02-27 05:47:44 +0000 | [diff] [blame] | 3 | import unittest |
| 4 | from test import test_support |
| 5 | |
| 6 | import io |
| 7 | |
Guido van Rossum | a9e2024 | 2007-03-08 00:43:48 +0000 | [diff] [blame^] | 8 | |
class MockIO(io.RawIOBase):
    """Scripted raw stream used to drive the buffered-IO tests.

    Each read() hands back the next item of ``readStack`` (items are returned
    as-is, so a script may contain ``None`` entries); each write() is recorded
    in ``_writeStack`` so a test can inspect what reached the raw layer.
    """

    def __init__(self, readStack=()):
        # Copy the script so the caller's sequence is never mutated.
        self._readStack = list(readStack)
        self._writeStack = []

    def read(self, n=None):
        """Return the next scripted item, or io.EOF once the script is spent.

        ``n`` is accepted for interface compatibility and is ignored.
        """
        try:
            return self._readStack.pop(0)
        except IndexError:
            # Only an exhausted script means end-of-file; any other failure
            # is a real error and must not be disguised as EOF.
            return io.EOF

    def write(self, b):
        """Record ``b`` and report it as written in full."""
        self._writeStack.append(b)
        return len(b)

    def writable(self):
        return True

    def fileno(self):
        # Arbitrary fixed value that the fileno tests look for.
        return 42

    def readable(self):
        return True

    def seekable(self):
        return True

    def seek(self, pos, whence=0):
        """Accept a seek request and ignore it."""

    def tell(self):
        # Arbitrary fixed position; the mock does not track one.
        return 42
| 42 | |
Guido van Rossum | a9e2024 | 2007-03-08 00:43:48 +0000 | [diff] [blame^] | 43 | |
class MockNonBlockWriterIO(io.RawIOBase):
    """Write-only raw stream whose write() outcomes follow a fixed script.

    ``blockingScript`` supplies one integer per expected write() call.  A
    non-negative entry is returned as that call's result; a negative entry
    makes the call raise io.BlockingIO carrying the negated value.
    """

    def __init__(self, blockingScript):
        self.bs = list(blockingScript)
        self._write_stack = []

    def writable(self):
        return True

    def write(self, b):
        # Every attempt is recorded, including the ones that end up raising.
        self._write_stack.append(b)
        outcome = self.bs.pop(0)
        if outcome < 0:
            raise io.BlockingIO(0, "test blocking", -outcome)
        return outcome
Guido van Rossum | 68bbcd2 | 2007-02-27 17:19:33 +0000 | [diff] [blame] | 60 | |
Guido van Rossum | a9e2024 | 2007-03-08 00:43:48 +0000 | [diff] [blame^] | 61 | |
class IOTest(unittest.TestCase):
    """Run one shared write/read scenario against several raw stream types."""

    def tearDown(self):
        # Remove the scratch file that the file-backed tests create.
        test_support.unlink(test_support.TESTFN)

    def write_ops(self, f):
        """Leave *f* holding b"hello world" followed by a single newline.

        The content is built through overwrites, relative seeks and a final
        truncate so that those operations are exercised along the way.
        """
        f.write(b"blah.")
        f.seek(0)
        f.write(b"Hello.")
        self.assertEqual(f.tell(), 6)
        f.seek(-1, 1)
        self.assertEqual(f.tell(), 5)
        # Bytes literals, like the writes above: these are binary streams.
        f.write(b" world\n\n\n")
        f.seek(0)
        f.write(b"h")
        # Cut off the last two of the three trailing newlines.
        f.seek(-2, 2)
        f.truncate()

    def read_ops(self, f):
        """Read back the content produced by write_ops() and verify it."""
        data = f.read(5)
        self.assertEqual(data, b"hello")
        # readinto() fills the buffer obtained from read() in place, so this
        # relies on that buffer being mutable.
        n = f.readinto(data)
        self.assertEqual(n, 5)
        self.assertEqual(data, b" worl")
        n = f.readinto(data)
        # Only two bytes remain; the buffer keeps its length.
        self.assertEqual(n, 2)
        self.assertEqual(len(data), 5)
        self.assertEqual(data[:2], b"d\n")
        f.seek(0)
        self.assertEqual(f.read(20), b"hello world\n")
        f.seek(-6, 2)
        self.assertEqual(f.read(5), b"world")
        f.seek(-6, 1)
        self.assertEqual(f.read(5), b" worl")
        self.assertEqual(f.tell(), 10)

    def _check_write_only(self, f):
        """Verify the flags of a write-only stream, run write_ops, then close."""
        try:
            self.assertEqual(f.readable(), False)
            self.assertEqual(f.writable(), True)
            self.assertEqual(f.seekable(), True)
            self.write_ops(f)
        finally:
            # Close even when an assertion fails so tearDown can unlink.
            f.close()

    def _check_read_only(self, f):
        """Verify the flags of a read-only stream, run read_ops, then close."""
        try:
            self.assertEqual(f.readable(), True)
            self.assertEqual(f.writable(), False)
            self.assertEqual(f.seekable(), True)
            self.read_ops(f)
        finally:
            # Close even when an assertion fails so tearDown can unlink.
            f.close()

    def test_raw_file_io(self):
        self._check_write_only(
            io.open(test_support.TESTFN, "wb", buffering=0))
        self._check_read_only(
            io.open(test_support.TESTFN, "rb", buffering=0))

    def test_raw_bytes_io(self):
        f = io.BytesIO()
        self.write_ops(f)
        data = f.getvalue()
        self.assertEqual(data, b"hello world\n")
        f = io.BytesIO(data)
        self.read_ops(f)

    def test_fileio_FileIO(self):
        import _fileio
        self._check_write_only(_fileio._FileIO(test_support.TESTFN, "w"))
        self._check_read_only(_fileio._FileIO(test_support.TESTFN, "r"))

    def test_PyFileIO(self):
        self._check_write_only(io._PyFileIO(test_support.TESTFN, "w"))
        self._check_read_only(io._PyFileIO(test_support.TESTFN, "r"))
| 148 | |
| 149 | |
class BytesIOTest(unittest.TestCase):
    """Tests for reading from and positioning within io.BytesIO."""

    def testInit(self):
        # Passes as long as construction from an initial buffer succeeds.
        io.BytesIO(b"1234567890")

    def testRead(self):
        buf = b"1234567890"
        bytesIo = io.BytesIO(buf)

        self.assertEqual(buf[:1], bytesIo.read(1))
        self.assertEqual(buf[1:5], bytesIo.read(4))
        # Asking for more than is left returns just the remainder.
        self.assertEqual(buf[5:], bytesIo.read(900))
        self.assertEqual(io.EOF, bytesIo.read())

    def testReadNoArgs(self):
        buf = b"1234567890"
        bytesIo = io.BytesIO(buf)

        self.assertEqual(buf, bytesIo.read())
        self.assertEqual(io.EOF, bytesIo.read())

    def testSeek(self):
        buf = b"1234567890"
        bytesIo = io.BytesIO(buf)

        bytesIo.read(5)
        bytesIo.seek(0)
        self.assertEqual(buf, bytesIo.read())

        bytesIo.seek(3)
        self.assertEqual(buf[3:], bytesIo.read())

    def testTell(self):
        buf = b"1234567890"
        bytesIo = io.BytesIO(buf)

        self.assertEqual(0, bytesIo.tell())
        bytesIo.seek(5)
        self.assertEqual(5, bytesIo.tell())
        # Seeking beyond the end of the data is allowed and is reported back.
        bytesIo.seek(10000)
        self.assertEqual(10000, bytesIo.tell())
| 192 | |
Guido van Rossum | a9e2024 | 2007-03-08 00:43:48 +0000 | [diff] [blame^] | 193 | |
class BufferedReaderTest(unittest.TestCase):
    """Tests for io.BufferedReader layered over a scripted MockIO."""

    def testRead(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        # The request spans several raw chunks and stops inside the last one.
        self.assertEqual(b"abcdef", bufIo.read(6))

    def testReadNonBlocking(self):
        # Inject some None's in there to simulate EWOULDBLOCK
        rawIo = MockIO((b"abc", b"d", None, b"efg", None, None))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcd", bufIo.read(6))
        self.assertEqual(b"e", bufIo.read(1))
        self.assertEqual(b"fg", bufIo.read())
        self.assertTrue(bufIo.read() is None)
        self.assertEqual(io.EOF, bufIo.read())

    def testReadToEof(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcdefg", bufIo.read(9000))

    def testReadNoArgs(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        self.assertEqual(b"abcdefg", bufIo.read())

    def testFileno(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedReader(rawIo)

        # 42 is the fixed value MockIO.fileno() returns.
        self.assertEqual(42, bufIo.fileno())

    def testFilenoNoFileno(self):
        # XXX will we always have fileno() function?  If so, kill
        # this test.  Else, write it.
        pass
| 235 | |
Guido van Rossum | a9e2024 | 2007-03-08 00:43:48 +0000 | [diff] [blame^] | 236 | |
class BufferedWriterTest(unittest.TestCase):
    """Tests for io.BufferedWriter layered over the mock raw streams."""

    def testWrite(self):
        # Write to the buffered IO but don't overflow the buffer.
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")

        # Nothing may have reached the raw stream yet.
        self.assertFalse(writer._writeStack)

    def testWriteOverflow(self):
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")
        bufIo.write(b"defghijkl")

        # Both writes arrive at the raw stream as one combined chunk.
        self.assertEqual(b"abcdefghijkl", writer._writeStack[0])

    def testWriteNonBlocking(self):
        # One script entry per raw write(): a non-negative entry is returned
        # as the result, a negative one makes the raw write raise.
        raw = MockNonBlockWriterIO((9, 2, 22, -6, 10, 12, 12))
        bufIo = io.BufferedWriter(raw, 8, 16)

        bufIo.write(b"asdf")
        bufIo.write(b"asdfa")
        self.assertEqual(b"asdfasdfa", raw._write_stack[0])

        bufIo.write(b"asdfasdfasdf")
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[1])
        bufIo.write(b"asdfasdfasdf")
        self.assertEqual(b"dfasdfasdf", raw._write_stack[2])
        self.assertEqual(b"asdfasdfasdf", raw._write_stack[3])

        bufIo.write(b"asdfasdfasdf")

        # XXX I don't like this test.  It relies too heavily on how the
        # algorithm actually works, which we might change.  Refactor later.

    def testFileno(self):
        rawIo = MockIO((b"abc", b"d", b"efg"))
        bufIo = io.BufferedWriter(rawIo)

        # 42 is the fixed value MockIO.fileno() returns.
        self.assertEqual(42, bufIo.fileno())

    def testFlush(self):
        writer = MockIO()
        bufIo = io.BufferedWriter(writer, 8)

        bufIo.write(b"abc")
        bufIo.flush()

        self.assertEqual(b"abc", writer._writeStack[0])
| 290 | |
Guido van Rossum | a9e2024 | 2007-03-08 00:43:48 +0000 | [diff] [blame^] | 291 | |
class BufferedRWPairTest(unittest.TestCase):
    """Placeholder coverage for io.BufferedRWPair."""

    def testRWPair(self):
        # For now this only checks that a pair can be built from a reader
        # mock and a writer mock.
        reader = MockIO(())
        writer = MockIO()
        pair = io.BufferedRWPair(reader, writer)

        # XXX need implementation
| 300 | |
Guido van Rossum | a9e2024 | 2007-03-08 00:43:48 +0000 | [diff] [blame^] | 301 | |
class BufferedRandom(unittest.TestCase):
    """Tests for io.BufferedRandom: interleaved reads, writes and seeks."""

    def testReadAndWrite(self):
        raw = MockIO((b"asdf", b"ghjk"))
        rw = io.BufferedRandom(raw, 8, 12)

        self.assertEqual(b"as", rw.read(2))
        rw.write(b"ddd")
        rw.write(b"eee")
        self.assertFalse(raw._writeStack)  # Buffer writes
        self.assertEqual(b"ghjk", rw.read())  # This read forces write flush
        self.assertEqual(b"dddeee", raw._writeStack[0])

    def testSeekAndTell(self):
        raw = io.BytesIO(b"asdfghjkl")
        rw = io.BufferedRandom(raw)

        self.assertEqual(b"as", rw.read(2))
        self.assertEqual(2, rw.tell())
        rw.seek(0, 0)
        self.assertEqual(b"asdf", rw.read(4))

        # Overwrite the four bytes that follow the ones just read.
        rw.write(b"asdf")
        rw.seek(0, 0)
        self.assertEqual(b"asdfasdfl", rw.read())
        self.assertEqual(9, rw.tell())
        rw.seek(-4, 2)
        self.assertEqual(5, rw.tell())
        rw.seek(2, 1)
        self.assertEqual(7, rw.tell())
        self.assertEqual(b"fl", rw.read(11))
| 333 | |
| 334 | # XXX Tests for open() |
Guido van Rossum | 68bbcd2 | 2007-02-27 17:19:33 +0000 | [diff] [blame] | 335 | |
def test_main():
    """Run every test case class of this module via test_support."""
    test_classes = (
        IOTest,
        BytesIOTest,
        BufferedReaderTest,
        BufferedWriterTest,
        BufferedRWPairTest,
        BufferedRandom,
    )
    test_support.run_unittest(*test_classes)


# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()