blob: 3c863a7c837bc17b6865735e09675db6981fe868 [file] [log] [blame]
Neal Norwitze241ce82003-02-17 18:17:05 +00001"Test posix functions"
2
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003from test import support
R. David Murrayeb3615d2009-04-22 02:24:39 +00004
5# Skip these tests if there is no posix module.
6posix = support.import_module('posix')
Neal Norwitze241ce82003-02-17 18:17:05 +00007
Antoine Pitroub7572f02009-12-02 20:46:48 +00008import errno
Ronald Oussorenb6ee4f52010-07-23 13:53:51 +00009import sys
Neal Norwitze241ce82003-02-17 18:17:05 +000010import time
11import os
Charles-François Natali1e045b12011-05-22 20:42:32 +020012import fcntl
Christian Heimesd5e2b6f2008-03-19 21:50:51 +000013import pwd
Benjamin Petersondcf97b92008-07-02 17:30:14 +000014import shutil
Benjamin Peterson052a02b2010-08-17 01:27:09 +000015import stat
Ned Deilyba2eab22011-07-26 13:53:55 -070016import tempfile
Neal Norwitze241ce82003-02-17 18:17:05 +000017import unittest
18import warnings
R. David Murraya21e4ca2009-03-31 23:16:50 +000019
# Scratch path under the system temp directory, used by the symlink tests.
_DUMMY_SYMLINK = os.path.join(
    tempfile.gettempdir(), support.TESTFN + '-dummy-symlink')
Neal Norwitze241ce82003-02-17 18:17:05 +000022
23class PosixTester(unittest.TestCase):
24
def setUp(self):
    # Every test starts with an empty scratch file.
    open(support.TESTFN, 'w+').close()
    # Paths listed here are removed again by tearDown().
    self.teardown_files = [support.TESTFN]
    # The warnings context is entered by hand; tearDown() leaves it.
    manager = support.check_warnings()
    self._warnings_manager = manager
    manager.__enter__()
    warnings.filterwarnings('ignore', '.* potential security risk .*',
                            RuntimeWarning)
Neal Norwitze241ce82003-02-17 18:17:05 +000034
def tearDown(self):
    # Delete everything registered for cleanup, then leave the warnings
    # context that setUp() entered.
    for path in self.teardown_files:
        support.unlink(path)
    self._warnings_manager.__exit__(None, None, None)
Neal Norwitze241ce82003-02-17 18:17:05 +000039
def testNoArgFunctions(self):
    # Call the posix functions that take no arguments and leave nothing
    # behind that would need cleaning up (so not fork, wait, abort, ...).
    names = (
        "ctermid", "getcwd", "getcwdb", "uname",
        "times", "getloadavg",
        "getegid", "geteuid", "getgid", "getgroups",
        "getpid", "getppid".replace("pp", "pg") + "rp"[1:] if False else "getpgrp",
        "getppid", "getuid", "sync",
    )
    for func_name in names:
        func = getattr(posix, func_name, None)
        if func is None:
            # Not available on this platform.
            continue
        func()
        # Passing any argument at all must be rejected.
        self.assertRaises(TypeError, func, 1)
Neal Norwitze241ce82003-02-17 18:17:05 +000054
Martin v. Löwis7aed61a2009-11-27 14:09:49 +000055 if hasattr(posix, 'getresuid'):
56 def test_getresuid(self):
57 user_ids = posix.getresuid()
58 self.assertEqual(len(user_ids), 3)
59 for val in user_ids:
60 self.assertGreaterEqual(val, 0)
61
62 if hasattr(posix, 'getresgid'):
63 def test_getresgid(self):
64 group_ids = posix.getresgid()
65 self.assertEqual(len(group_ids), 3)
66 for val in group_ids:
67 self.assertGreaterEqual(val, 0)
68
@unittest.skipUnless(hasattr(posix, 'setresuid'),
                     "test needs posix.setresuid()")
def test_setresuid(self):
    # Setting the ids to their current values must succeed.
    current_user_ids = posix.getresuid()
    self.assertIsNone(posix.setresuid(*current_user_ids))
    # -1 means don't change that value.
    self.assertIsNone(posix.setresuid(-1, -1, -1))

@unittest.skipUnless(hasattr(posix, 'setresuid'),
                     "test needs posix.setresuid()")
def test_setresuid_exception(self):
    # Don't do this test if someone is silly enough to run us as root.
    current_user_ids = posix.getresuid()
    if 0 not in current_user_ids:
        # An unprivileged process may not switch to another real uid.
        new_user_ids = (current_user_ids[0]+1, -1, -1)
        self.assertRaises(OSError, posix.setresuid, *new_user_ids)
82
@unittest.skipUnless(hasattr(posix, 'setresgid'),
                     "test needs posix.setresgid()")
def test_setresgid(self):
    # Setting the ids to their current values must succeed.
    current_group_ids = posix.getresgid()
    self.assertIsNone(posix.setresgid(*current_group_ids))
    # -1 means don't change that value.
    self.assertIsNone(posix.setresgid(-1, -1, -1))

@unittest.skipUnless(hasattr(posix, 'setresgid'),
                     "test needs posix.setresgid()")
def test_setresgid_exception(self):
    # Don't do this test if someone is silly enough to run us as root.
    current_group_ids = posix.getresgid()
    if 0 not in current_group_ids:
        # An unprivileged process may not switch to another real gid.
        new_group_ids = (current_group_ids[0]+1, -1, -1)
        self.assertRaises(OSError, posix.setresgid, *new_group_ids)
96
@unittest.skipUnless(hasattr(posix, 'initgroups'),
                     "test needs os.initgroups()")
def test_initgroups(self):
    # initgroups() accepts exactly (str, int); every other argument list
    # must be rejected with TypeError.
    bad_argument_lists = (
        (),
        (None,),
        (3, "foo"),
        ("foo", 3, object()),
    )
    for bad_args in bad_argument_lists:
        self.assertRaises(TypeError, posix.initgroups, *bad_args)

    # The remaining check only makes sense for a non-privileged user, for
    # whom the call should fail with OSError EPERM.
    if os.getuid() == 0:
        return
    name = pwd.getpwuid(posix.getuid()).pw_name
    try:
        posix.initgroups(name, 13)
    except OSError as e:
        self.assertEqual(e.errno, errno.EPERM)
    else:
        self.fail("Expected OSError to be raised by initgroups")
117
Neal Norwitze241ce82003-02-17 18:17:05 +0000118 def test_statvfs(self):
119 if hasattr(posix, 'statvfs'):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000120 self.assertTrue(posix.statvfs(os.curdir))
Neal Norwitze241ce82003-02-17 18:17:05 +0000121
@unittest.skipUnless(hasattr(posix, 'fstatvfs'),
                     "test needs posix.fstatvfs()")
def test_fstatvfs(self):
    # fstatvfs() on an open descriptor must return a truthy result.
    # Skipped (rather than silently passing) when fstatvfs() is missing.
    with open(support.TESTFN) as fp:
        self.assertTrue(posix.fstatvfs(fp.fileno()))
129
@unittest.skipUnless(hasattr(posix, 'ftruncate'),
                     "test needs posix.ftruncate()")
def test_ftruncate(self):
    # Skipped (rather than silently passing) when ftruncate() is missing.
    with open(support.TESTFN, 'w+') as fp:
        # we need to have some data to truncate
        fp.write('test')
        fp.flush()
        posix.ftruncate(fp.fileno(), 0)
140
@unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
def test_truncate(self):
    # Give the file some content so that there is something to truncate.
    with open(support.TESTFN, 'w') as stream:
        stream.write('test')
        stream.flush()
    # Truncate by path, once the writer has been closed.
    posix.truncate(support.TESTFN, 0)
147
@unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()")
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
@unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
def test_fexecve(self):
    # Execute the interpreter through an open descriptor in a forked child
    # and check that the child exits with status 0.
    fp = os.open(sys.executable, os.O_RDONLY)
    try:
        pid = os.fork()
        if pid == 0:
            try:
                os.chdir(os.path.split(sys.executable)[0])
                posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ)
            finally:
                # Only reached when chdir() or fexecve() failed: a
                # successful exec never returns.  Leave immediately so the
                # child does not go on running the parent's test suite.
                os._exit(1)
        else:
            self.assertEqual(os.waitpid(pid, 0), (pid, 0))
    finally:
        os.close(fp)
162
@unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
@unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
def test_waitid(self):
    # Fork a child that execs a trivial interpreter and check that
    # waitid() reports that child's pid.
    pid = os.fork()
    if pid == 0:
        try:
            os.chdir(os.path.split(sys.executable)[0])
            posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
        finally:
            # Only reached when chdir() or execve() failed: a successful
            # exec never returns.  Leave immediately so the child does not
            # go on running the parent's test suite.
            os._exit(1)
    else:
        res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
        self.assertEqual(pid, res.si_pid)
173
@unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
def test_lockf(self):
    # Lock, then unlock, the first four bytes of the scratch file.
    handle = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
    try:
        os.write(handle, b'test')
        os.lseek(handle, 0, os.SEEK_SET)
        length = 4
        posix.lockf(handle, posix.F_LOCK, length)
        # the section is locked at this point
        posix.lockf(handle, posix.F_ULOCK, length)
    finally:
        os.close(handle)
185
@unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
def test_pread(self):
    handle = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        os.write(handle, b'test')
        os.lseek(handle, 0, os.SEEK_SET)
        # Two bytes read at offset 1.
        middle = posix.pread(handle, 2, 1)
        self.assertEqual(b'es', middle)
        # pread() must not have moved the file offset, so an ordinary
        # read() still starts at the beginning.
        start = posix.read(handle, 2)
        self.assertEqual(b'te', start)
    finally:
        os.close(handle)
197
@unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
def test_pwrite(self):
    handle = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        os.write(handle, b'test')
        os.lseek(handle, 0, os.SEEK_SET)
        # Overwrite the two middle bytes in place.
        posix.pwrite(handle, b'xx', 1)
        content = posix.read(handle, 4)
        self.assertEqual(b'txxt', content)
    finally:
        os.close(handle)
208
@unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
                     "test needs posix.posix_fallocate()")
def test_posix_fallocate(self):
    handle = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
    try:
        posix.posix_fallocate(handle, 0, 10)
    except OSError as inst:
        # issue10812: ZFS doesn't appear to support posix_fallocate, and
        # Solaris-based systems are likely to use ZFS, so EINVAL is
        # tolerated there.  Anything else is a real failure.
        tolerated = (inst.errno == errno.EINVAL
                     and sys.platform.startswith("sunos"))
        if not tolerated:
            raise
    finally:
        os.close(handle)
222
@unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
                     "test needs posix.posix_fadvise()")
def test_posix_fadvise(self):
    # Smoke test: advising on a read-only descriptor must not raise.
    handle = os.open(support.TESTFN, os.O_RDONLY)
    try:
        advice = posix.POSIX_FADV_WILLNEED
        posix.posix_fadvise(handle, 0, 0, advice)
    finally:
        os.close(handle)
231
@unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()")
def test_futimes(self):
    now = time.time()
    handle = os.open(support.TESTFN, os.O_RDONLY)
    try:
        # Both an explicit None and an omitted argument are accepted.
        posix.futimes(handle, None)
        posix.futimes(handle)
        # A time tuple containing None is rejected.
        for bad_times in ((None, None), (now, None), (None, now)):
            self.assertRaises(TypeError, posix.futimes, handle, bad_times)
        # Integer and float timestamps are both accepted.
        whole = int(now)
        posix.futimes(handle, (whole, whole))
        posix.futimes(handle, (now, now))
    finally:
        os.close(handle)
246
@unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()")
def test_lutimes(self):
    now = time.time()
    path = support.TESTFN
    posix.lutimes(path, None)
    # A time tuple containing None is rejected.
    for bad_times in ((None, None), (now, None), (None, now)):
        self.assertRaises(TypeError, posix.lutimes, path, bad_times)
    # Integer and float timestamps are both accepted.
    whole = int(now)
    posix.lutimes(path, (whole, whole))
    posix.lutimes(path, (now, now))
    # The times argument may also be left out altogether.
    posix.lutimes(path)
Ross Lagerwall7807c352011-03-17 20:20:30 +0200257
@unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens()")
def test_futimens(self):
    now = time.time()
    handle = os.open(support.TESTFN, os.O_RDONLY)
    try:
        # Argument combinations that must be rejected with TypeError.
        rejected = (
            ((None, None), (None, None)),
            ((now, 0), None),
            (None, (now, 0)),
        )
        for first, second in rejected:
            self.assertRaises(TypeError, posix.futimens, handle, first, second)
        # (seconds, nanoseconds) pair derived from the float timestamp.
        seconds = int(now)
        stamp = (seconds, int((now - seconds) * 1e9))
        posix.futimens(handle, stamp, stamp)
        # Both time arguments may be omitted.
        posix.futimens(handle)
    finally:
        os.close(handle)
271
@unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
def test_writev(self):
    handle = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        # Three buffers written with one call must land back to back.
        chunks = (b'test1', b'tt2', b't3')
        os.writev(handle, chunks)
        os.lseek(handle, 0, os.SEEK_SET)
        written = posix.read(handle, 10)
        self.assertEqual(b'test1tt2t3', written)
    finally:
        os.close(handle)
281
@unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
def test_readv(self):
    handle = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
    try:
        os.write(handle, b'test1tt2t3')
        os.lseek(handle, 0, os.SEEK_SET)
        # Three pre-sized buffers, filled in order by a single call.
        buffers = [bytearray(size) for size in (5, 3, 2)]
        self.assertEqual(posix.readv(handle, buffers), 10)
        filled = [bytes(chunk) for chunk in buffers]
        self.assertEqual([b'test1', b'tt2', b't3'], filled)
    finally:
        os.close(handle)
293
@unittest.skipUnless(hasattr(posix, 'dup'), "test needs posix.dup()")
def test_dup(self):
    # dup() must hand back an integer descriptor.  Skipped (rather than
    # silently passing) when dup() is missing.
    with open(support.TESTFN) as fp:
        fd = posix.dup(fp.fileno())
        try:
            self.assertIsInstance(fd, int)
        finally:
            # Close the duplicate even when the assertion fails.
            os.close(fd)
303
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000304 def test_confstr(self):
305 if hasattr(posix, 'confstr'):
306 self.assertRaises(ValueError, posix.confstr, "CS_garbage")
307 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
308
@unittest.skipUnless(hasattr(posix, 'dup2'), "test needs posix.dup2()")
def test_dup2(self):
    # Smoke test: dup2() onto another open descriptor must not raise.
    # Skipped (rather than silently passing) when dup2() is missing.
    # The with statement closes the first file even if the second open()
    # fails.
    with open(support.TESTFN) as fp1, open(support.TESTFN) as fp2:
        posix.dup2(fp1.fileno(), fp2.fileno())
318
@unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
@support.requires_linux_version(2, 6, 23)
def test_oscloexec(self):
    # A descriptor opened with O_CLOEXEC must carry the FD_CLOEXEC flag.
    flags = os.O_RDONLY | os.O_CLOEXEC
    handle = os.open(support.TESTFN, flags)
    self.addCleanup(os.close, handle)
    descriptor_flags = fcntl.fcntl(handle, fcntl.F_GETFD)
    self.assertTrue(descriptor_flags & fcntl.FD_CLOEXEC)
Charles-François Natali1e045b12011-05-22 20:42:32 +0200325
@unittest.skipUnless(hasattr(posix, 'O_EXLOCK'), "test needs posix.O_EXLOCK")
def test_osexlock(self):
    # While one descriptor holds a lock on the file, a non-blocking
    # attempt to take an exclusive lock must fail with OSError.
    # Skipped (rather than silently passing) when O_EXLOCK is missing.
    fd = os.open(support.TESTFN,
                 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
    try:
        self.assertRaises(OSError, os.open, support.TESTFN,
                          os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
    finally:
        # Release the lock even when the assertion fails.
        os.close(fd)

    if hasattr(posix, "O_SHLOCK"):
        # Same check with a shared lock held by the first descriptor.
        fd = os.open(support.TESTFN,
                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
        try:
            self.assertRaises(OSError, os.open, support.TESTFN,
                              os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
        finally:
            os.close(fd)
340
@unittest.skipUnless(hasattr(posix, 'O_SHLOCK'), "test needs posix.O_SHLOCK")
def test_osshlock(self):
    # Two shared locks on the same file may be held at the same time.
    # Skipped (rather than silently passing) when O_SHLOCK is missing.
    fd1 = os.open(support.TESTFN,
                  os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
    try:
        fd2 = os.open(support.TESTFN,
                      os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
        os.close(fd2)
    finally:
        # Release the first lock even if the second open() fails.
        os.close(fd1)

    if hasattr(posix, "O_EXLOCK"):
        # With a shared lock held, a non-blocking attempt to take an
        # exclusive lock must fail with OSError.
        fd = os.open(support.TESTFN,
                     os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
        try:
            self.assertRaises(OSError, os.open, support.TESTFN,
                              os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
        finally:
            os.close(fd)
356
@unittest.skipUnless(hasattr(posix, 'fstat'), "test needs posix.fstat()")
def test_fstat(self):
    # fstat() on an open descriptor must return a truthy result.
    # Skipped (rather than silently passing) when fstat() is missing.
    with open(support.TESTFN) as fp:
        self.assertTrue(posix.fstat(fp.fileno()))
364
@unittest.skipUnless(hasattr(posix, 'stat'), "test needs posix.stat()")
def test_stat(self):
    # stat() on the scratch file must return a truthy result.
    # Skipped (rather than silently passing) when stat() is missing.
    self.assertTrue(posix.stat(support.TESTFN))
Neal Norwitze241ce82003-02-17 18:17:05 +0000368
@unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
def test_mkfifo(self):
    # Replace the scratch file with a FIFO and check its file type.
    path = support.TESTFN
    support.unlink(path)
    posix.mkfifo(path, stat.S_IRUSR | stat.S_IWUSR)
    created_mode = posix.stat(path).st_mode
    self.assertTrue(stat.S_ISFIFO(created_mode))
374
@unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
                     "don't have mknod()/S_IFIFO")
def test_mknod(self):
    # Creating a FIFO is the only use of mknod() specified by POSIX.
    path = support.TESTFN
    support.unlink(path)
    mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    try:
        posix.mknod(path, mode, 0)
    except OSError as exc:
        # Some old systems don't let unprivileged users call mknod(), or
        # only support creating device nodes.
        self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
        return
    self.assertTrue(stat.S_ISFIFO(posix.stat(path).st_mode))
390
def _test_all_chown_common(self, chown_func, first_param):
    """Common code for chown, fchown and lchown tests.

    chown_func is the function under test; first_param is the path or
    file descriptor passed as its first argument.
    """
    if os.getuid() != 0:
        # non-root cannot chown to root, raises OSError
        self.assertRaises(OSError, chown_func, first_param, 0, 0)
    else:
        # This part only runs when the tests are run as root.  Many linux
        # distros have a nfsnobody user as MAX_UID-2, which makes a good
        # test case for signedness issues
        # (http://bugs.python.org/issue1747858).
        try:
            nobody = pwd.getpwnam('nfsnobody')
            chown_func(first_param, nobody.pw_uid, nobody.pw_gid)
        except KeyError:
            # No such user on this system; nothing to check.
            pass
    # test a successful chown call
    chown_func(first_param, os.getuid(), os.getgid())
Christian Heimesd5e2b6f2008-03-19 21:50:51 +0000410
@unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
def test_chown(self):
    path = support.TESTFN
    # chown() on a path that does not exist must raise OSError.
    os.unlink(path)
    self.assertRaises(OSError, posix.chown, path, -1, -1)

    # Bring the file back for the shared chown checks.
    support.create_empty_file(path)
    self._test_all_chown_common(posix.chown, path)
420
@unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
def test_fchown(self):
    os.unlink(support.TESTFN)

    # Re-create the file and run the shared checks on its descriptor.
    with open(support.TESTFN, 'w') as test_file:
        self._test_all_chown_common(posix.fchown, test_file.fileno())
432
@unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
def test_lchown(self):
    link_path = support.TESTFN
    os.unlink(link_path)
    # Put a symlink where the scratch file was and run the shared checks
    # on the link itself.
    os.symlink(_DUMMY_SYMLINK, link_path)
    self._test_all_chown_common(posix.lchown, link_path)
Christian Heimesd5e2b6f2008-03-19 21:50:51 +0000439
@unittest.skipUnless(hasattr(posix, 'chdir'), "test needs posix.chdir()")
def test_chdir(self):
    # Changing to the current directory works; changing "into" the scratch
    # file must raise OSError.  Skipped (rather than silently passing)
    # when chdir() is missing.
    posix.chdir(os.curdir)
    self.assertRaises(OSError, posix.chdir, support.TESTFN)
Neal Norwitze241ce82003-02-17 18:17:05 +0000444
@unittest.skipUnless(hasattr(posix, 'listdir'), "test needs posix.listdir()")
def test_listdir(self):
    # The scratch file must show up in a listing of the current
    # directory.  assertIn reports both operands on failure.
    self.assertIn(support.TESTFN, posix.listdir(os.curdir))
448
@unittest.skipUnless(hasattr(posix, 'listdir'), "test needs posix.listdir()")
def test_listdir_default(self):
    # When listdir is called without argument, it's the same as
    # listdir(os.curdir).  assertIn reports both operands on failure.
    self.assertIn(support.TESTFN, posix.listdir())
Neal Norwitze241ce82003-02-17 18:17:05 +0000453
@unittest.skipUnless(hasattr(posix, 'fdlistdir'), "test needs posix.fdlistdir()")
def test_fdlistdir(self):
    dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    # Listing by path and listing by descriptor must agree.
    by_path = sorted(posix.listdir('.'))
    by_fd = sorted(posix.fdlistdir(dir_fd))
    self.assertEqual(by_path, by_fd)
    # fdlistdir() is expected to have closed the descriptor, so closing it
    # again must fail with EBADF.
    with self.assertRaises(OSError) as ctx:
        posix.close(dir_fd)
    self.assertEqual(ctx.exception.errno, errno.EBADF)
465
@unittest.skipUnless(hasattr(posix, 'access'), "test needs posix.access()")
def test_access(self):
    # The scratch file created in setUp() must be readable.
    # Skipped (rather than silently passing) when access() is missing.
    self.assertTrue(posix.access(support.TESTFN, os.R_OK))
Neal Norwitze241ce82003-02-17 18:17:05 +0000469
470 def test_umask(self):
471 if hasattr(posix, 'umask'):
472 old_mask = posix.umask(0)
Ezio Melottie9615932010-01-24 19:26:24 +0000473 self.assertIsInstance(old_mask, int)
Neal Norwitze241ce82003-02-17 18:17:05 +0000474 posix.umask(old_mask)
475
476 def test_strerror(self):
477 if hasattr(posix, 'strerror'):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000478 self.assertTrue(posix.strerror(0))
Neal Norwitze241ce82003-02-17 18:17:05 +0000479
480 def test_pipe(self):
481 if hasattr(posix, 'pipe'):
482 reader, writer = posix.pipe()
483 os.close(reader)
484 os.close(writer)
485
@unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
@support.requires_linux_version(2, 6, 27)
def test_pipe2(self):
    # pipe2() takes exactly one integer argument.
    self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
    self.assertRaises(TypeError, os.pipe2, 0, 0)

    # With flags = 0 it behaves like os.pipe().
    read_end, write_end = os.pipe2(0)
    os.close(read_end)
    os.close(write_end)

    # Now with flags: both ends must be close-on-exec and non-blocking.
    read_end, write_end = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
    self.addCleanup(os.close, read_end)
    self.addCleanup(os.close, write_end)
    for end in (read_end, write_end):
        self.assertTrue(fcntl.fcntl(end, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
    # Reading from the empty pipe should fail, not block.
    self.assertRaises(OSError, os.read, read_end, 1)
    # A write big enough to fill up the pipe should either fail or be
    # partial, but never block.
    try:
        os.write(write_end, b'x' * support.PIPE_MAX_SIZE)
    except OSError:
        pass
511
@unittest.skipUnless(hasattr(posix, 'utime'), "test needs posix.utime()")
def test_utime(self):
    # Skipped (rather than silently passing) when utime() is missing.
    now = time.time()
    posix.utime(support.TESTFN, None)
    # A time tuple containing None is rejected.
    self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None))
    self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None))
    self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now))
    # Integer and float timestamps are both accepted.
    posix.utime(support.TESTFN, (int(now), int(now)))
    posix.utime(support.TESTFN, (now, now))
Neal Norwitze241ce82003-02-17 18:17:05 +0000521
def _test_chflags_regular_file(self, chflags_func, target_file):
    """Common code for the chflags and lchflags tests on a regular file.

    Sets UF_IMMUTABLE on target_file with chflags_func, checks that the
    flag shows up in os.stat(), and checks that a failed attempt to open
    the file for writing reports EPERM.  The original flags are restored
    afterwards.
    """
    st = os.stat(target_file)
    self.assertTrue(hasattr(st, 'st_flags'))
    chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE)
    try:
        new_st = os.stat(target_file)
        self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
        try:
            # If the open unexpectedly succeeds, close the file at once
            # instead of leaking the handle.
            with open(target_file, 'w+'):
                pass
        except IOError as e:
            self.assertEqual(e.errno, errno.EPERM)
    finally:
        # Restore with the function under test, so this helper does not
        # depend on posix.chflags when it is exercising lchflags.
        chflags_func(target_file, st.st_flags)
Thomas Wouterscf297e42007-02-23 15:07:44 +0000535
@unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
def test_chflags(self):
    # Run the shared regular-file checks against chflags().
    function_under_test = posix.chflags
    self._test_chflags_regular_file(function_under_test, support.TESTFN)
539
@unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
def test_lchflags_regular_file(self):
    # Run the shared regular-file checks against lchflags().
    function_under_test = posix.lchflags
    self._test_chflags_regular_file(function_under_test, support.TESTFN)
543
@unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
def test_lchflags_symlink(self):
    target_before = os.stat(support.TESTFN)
    self.assertTrue(hasattr(target_before, 'st_flags'))

    # Create a symlink to the scratch file and register it for removal.
    os.symlink(support.TESTFN, _DUMMY_SYMLINK)
    self.teardown_files.append(_DUMMY_SYMLINK)
    link_before = os.lstat(_DUMMY_SYMLINK)
    wanted_link_flags = link_before.st_flags | stat.UF_IMMUTABLE

    posix.lchflags(_DUMMY_SYMLINK, wanted_link_flags)
    try:
        target_after = os.stat(support.TESTFN)
        link_after = os.lstat(_DUMMY_SYMLINK)

        # Only the link itself may have changed, not the file it points to.
        self.assertEqual(target_before.st_flags, target_after.st_flags)
        self.assertEqual(link_before.st_flags | stat.UF_IMMUTABLE,
                         link_after.st_flags)
    finally:
        # Restore the link's original flags.
        posix.lchflags(_DUMMY_SYMLINK, link_before.st_flags)
Thomas Wouterscf297e42007-02-23 15:07:44 +0000565
def test_environ(self):
    # Keys and values of posix.environ are str on Windows ("nt") and
    # bytes everywhere else.
    item_type = str if os.name == "nt" else bytes
    for key, value in posix.environ.items():
        self.assertEqual(type(key), item_type)
        self.assertEqual(type(value), item_type)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000574
@unittest.skipUnless(hasattr(posix, 'getcwd'), "test needs posix.getcwd()")
def test_getcwd_long_pathnames(self):
    # Call getcwd() from inside progressively deeper directories until
    # the path is well over a thousand characters long.
    #
    # Previously the helper below and its call sat inside the except
    # clause after a return statement, so they were never executed.
    dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
    curdir = os.getcwd()
    base_path = os.path.abspath(support.TESTFN) + '.getcwd'

    def _create_and_do_getcwd(dirname, current_path_length=0):
        # Create one more directory level, call getcwd() inside it and
        # recurse until the accumulated path length reaches 1027.
        try:
            os.mkdir(dirname)
        except OSError:
            raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")

        os.chdir(dirname)
        try:
            os.getcwd()
            if current_path_length < 1027:
                _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
        finally:
            # Unwind one level, whether or not the deeper levels worked.
            os.chdir('..')
            os.rmdir(dirname)

    try:
        os.mkdir(base_path)
        os.chdir(base_path)
    except OSError:
        # The test directory cannot be set up.  Return quietly, as the
        # code always has, rather than raising SkipTest.
        return
    else:
        _create_and_do_getcwd(dirname)
    finally:
        os.chdir(curdir)
        support.rmtree(base_path)
Benjamin Petersondcf97b92008-07-02 17:30:14 +0000611
@unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
@unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
@unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
def test_getgrouplist(self):
    # Compare getgrouplist() for the current user with what 'id -G' says.
    with os.popen('id -G') as idg:
        id_output = idg.read().strip()

    if not id_output:
        raise unittest.SkipTest("need working 'id -G'")

    expected = {int(gid) for gid in id_output.split()}
    user_name = pwd.getpwuid(os.getuid()).pw_name
    primary_gid = pwd.getpwuid(os.getuid()).pw_gid
    actual = set(posix.getgrouplist(user_name, primary_gid))
    self.assertEqual(expected, actual)
626
@unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
def test_getgroups(self):
    with os.popen('id -G') as idg:
        id_output = idg.read().strip()

    if not id_output:
        raise unittest.SkipTest("need working 'id -G'")

    # 'id -G' and 'os.getgroups()' should return the same groups,
    # ignoring order and duplicates.
    expected = {int(gid) for gid in id_output.split()}
    # #10822 - it is implementation defined whether posix.getgroups()
    # includes the effective gid, so it is added here because 'id -G'
    # always lists it.
    actual = set(posix.getgroups() + [posix.getegid()])
    self.assertEqual(expected, actual)
Ronald Oussorenb6ee4f52010-07-23 13:53:51 +0000642
Antoine Pitrouf65132d2011-02-25 23:25:17 +0000643 # tests for the posix *at functions follow
644
@unittest.skipUnless(hasattr(posix, 'faccessat'), "test needs posix.faccessat()")
def test_faccessat(self):
    # The scratch file must be readable when addressed relative to a
    # descriptor for the current directory.
    dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        readable = posix.faccessat(dir_fd, support.TESTFN, os.R_OK)
        self.assertTrue(readable)
    finally:
        posix.close(dir_fd)
652
@unittest.skipUnless(hasattr(posix, 'fchmodat'), "test needs posix.fchmodat()")
def test_fchmodat(self):
    # Start from owner read-only, then add owner write via fchmodat().
    os.chmod(support.TESTFN, stat.S_IRUSR)
    wanted = stat.S_IRUSR | stat.S_IWUSR

    dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.fchmodat(dir_fd, support.TESTFN, wanted)

        # Only the owner permission bits are compared.
        result = posix.stat(support.TESTFN)
        self.assertEqual(result.st_mode & stat.S_IRWXU, wanted)
    finally:
        posix.close(dir_fd)
665
@unittest.skipUnless(hasattr(posix, 'fchownat'), "test needs posix.fchownat()")
def test_fchownat(self):
    # Begin with a freshly created scratch file.
    path = support.TESTFN
    support.unlink(path)
    support.create_empty_file(path)

    dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        # Chown to the ids the process already has; this must not raise.
        posix.fchownat(dir_fd, path, os.getuid(), os.getgid())
    finally:
        posix.close(dir_fd)
676
@unittest.skipUnless(hasattr(posix, 'fstatat'), "test needs posix.fstatat()")
def test_fstatat(self):
    path = support.TESTFN
    support.unlink(path)
    with open(path, 'w') as outfile:
        outfile.write("testline\n")

    dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        # Stat by path and stat relative to the directory must agree.
        by_path = posix.stat(path)
        by_dir_fd = posix.fstatat(dir_fd, path)
        self.assertEqual(by_path, by_dir_fd)
    finally:
        posix.close(dir_fd)
690
@unittest.skipUnless(hasattr(posix, 'futimesat'), "test needs posix.futimesat()")
def test_futimesat(self):
    path = support.TESTFN
    dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        now = time.time()
        # Both an explicit None and an omitted argument are accepted.
        posix.futimesat(dir_fd, path, None)
        posix.futimesat(dir_fd, path)
        # A time tuple containing None is rejected.
        for bad_times in ((None, None), (now, None), (None, now)):
            self.assertRaises(TypeError, posix.futimesat, dir_fd, path, bad_times)
        # Integer and float timestamps are both accepted.
        whole = int(now)
        posix.futimesat(dir_fd, path, (whole, whole))
        posix.futimesat(dir_fd, path, (now, now))
    finally:
        posix.close(dir_fd)
705
@unittest.skipUnless(hasattr(posix, 'linkat'), "test needs posix.linkat()")
def test_linkat(self):
    link_name = support.TESTFN + 'link'
    dir_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.linkat(dir_fd, support.TESTFN, dir_fd, link_name)
        # A hard link shares its inode with the original file.
        original_inode = posix.stat(support.TESTFN).st_ino
        link_inode = posix.stat(link_name).st_ino
        self.assertEqual(original_inode, link_inode)
    finally:
        posix.close(dir_fd)
        support.unlink(link_name)
717
@unittest.skipUnless(hasattr(posix, 'mkdirat'), "test needs posix.mkdirat()")
def test_mkdirat(self):
    new_dir = support.TESTFN + 'dir'
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.mkdirat(cwd_fd, new_dir)
        # The directory must now be visible by name; stat() raising
        # here is the failure signal.
        posix.stat(new_dir)
    finally:
        posix.close(cwd_fd)
        support.rmtree(new_dir)
727
@unittest.skipUnless(hasattr(posix, 'mknodat') and hasattr(stat, 'S_IFIFO'),
                     "don't have mknodat()/S_IFIFO")
def test_mknodat(self):
    # Test using mknodat() to create a FIFO (the only use specified
    # by POSIX).
    fifo_path = support.TESTFN
    support.unlink(fifo_path)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    tolerated_errnos = (errno.EPERM, errno.EINVAL)

    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.mknodat(cwd_fd, fifo_path, fifo_mode, 0)
    except OSError as exc:
        # Some old systems don't allow unprivileged users to use
        # mknod(), or only support creating device nodes.
        self.assertIn(exc.errno, tolerated_errnos)
    else:
        created = posix.stat(fifo_path)
        self.assertTrue(stat.S_ISFIFO(created.st_mode))
    finally:
        posix.close(cwd_fd)
746
@unittest.skipUnless(hasattr(posix, 'openat'), "test needs posix.openat()")
def test_openat(self):
    # Recreate TESTFN with known content, open it relative to a
    # descriptor for the cwd, and check the content read back.
    path = support.TESTFN
    support.unlink(path)
    with open(path, 'w') as outfile:
        outfile.write("testline\n")

    # Each descriptor gets its own try/finally so that a failing
    # openat() cannot leak the directory descriptor, and a failing
    # close() on one descriptor cannot prevent closing the other.
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        file_fd = posix.openat(cwd_fd, path, posix.O_RDONLY)
        try:
            res = posix.read(file_fd, 9).decode(encoding="utf-8")
            self.assertEqual("testline\n", res)
        finally:
            posix.close(file_fd)
    finally:
        posix.close(cwd_fd)
760
@unittest.skipUnless(hasattr(posix, 'readlinkat'), "test needs posix.readlinkat()")
def test_readlinkat(self):
    # readlinkat() relative to the cwd descriptor must agree with a
    # plain readlink() on the same symlink.
    link_name = support.TESTFN + 'link'
    os.symlink(support.TESTFN, link_name)
    # The outer try covers opening the directory too, so the symlink is
    # removed even when posix.open() fails; the inner one guarantees the
    # descriptor is closed regardless of how the cleanup unlink fares.
    try:
        cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
        try:
            self.assertEqual(posix.readlink(link_name),
                             posix.readlinkat(cwd_fd, link_name))
        finally:
            posix.close(cwd_fd)
    finally:
        support.unlink(link_name)
771
@unittest.skipUnless(hasattr(posix, 'renameat'), "test needs posix.renameat()")
def test_renameat(self):
    final_name = support.TESTFN
    temp_name = support.TESTFN + 'ren'

    # Replace TESTFN with a differently named file, then rename that
    # file back to TESTFN through the cwd descriptor.
    support.unlink(final_name)
    support.create_empty_file(temp_name)
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.renameat(cwd_fd, temp_name, cwd_fd, final_name)
    except BaseException:
        # Fall back to a plain rename so the file ends up under TESTFN,
        # which tearDown() removes, then let the failure propagate.
        posix.rename(temp_name, final_name)
        raise
    else:
        # The file must now exist under its new name; stat() raising
        # here is the failure signal.
        posix.stat(final_name)
    finally:
        posix.close(cwd_fd)
786
@unittest.skipUnless(hasattr(posix, 'symlinkat'), "test needs posix.symlinkat()")
def test_symlinkat(self):
    target = support.TESTFN
    link_name = support.TESTFN + 'link'
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        # Create the link relative to the cwd descriptor, then confirm
        # by name that it points at the intended target.
        posix.symlinkat(target, cwd_fd, link_name)
        resolved = posix.readlink(link_name)
        self.assertEqual(resolved, target)
    finally:
        posix.close(cwd_fd)
        support.unlink(link_name)
796
@unittest.skipUnless(hasattr(posix, 'unlinkat'), "test needs posix.unlinkat()")
def test_unlinkat(self):
    # Create a scratch file, remove it through unlinkat() relative to
    # the cwd descriptor, and verify that it is really gone.
    victim = support.TESTFN + 'del'
    support.create_empty_file(victim)
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.stat(victim)  # should not throw exception
        posix.unlinkat(cwd_fd, victim)
        # Check the file that was actually unlinked.  The previous
        # version stat()ed TESTFN + 'link', a path this test never
        # creates, so the assertion could not fail.
        self.assertRaises(OSError, posix.stat, victim)
    finally:
        posix.close(cwd_fd)
        # Remove the scratch file if unlinkat() failed or left it
        # behind.  Assumes support.unlink() tolerates a missing path,
        # as the other cleanup paths in this file rely on.
        support.unlink(victim)
811
@unittest.skipUnless(hasattr(posix, 'utimensat'), "test needs posix.utimensat()")
def test_utimensat(self):
    path = support.TESTFN
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        now = time.time()

        # Accepted forms: explicit None for both times, both omitted,
        # and the flags keyword on its own.
        posix.utimensat(cwd_fd, path, None, None)
        posix.utimensat(cwd_fd, path)
        posix.utimensat(cwd_fd, path, flags=os.AT_SYMLINK_NOFOLLOW)

        # Argument combinations that must be rejected with TypeError.
        rejected = (
            ((None, None), (None, None)),
            ((now, 0), None),
            (None, (now, 0)),
        )
        for atime, mtime in rejected:
            self.assertRaises(TypeError, posix.utimensat,
                              cwd_fd, path, atime, mtime)

        # Split the float timestamp into a (seconds, nanoseconds) pair
        # and pass it positionally, then by keyword.
        seconds = int(now)
        stamp = (seconds, int((now - seconds) * 1e9))
        posix.utimensat(cwd_fd, path, stamp, stamp)
        posix.utimensat(dirfd=cwd_fd, path=path, atime=stamp, mtime=stamp)
    finally:
        posix.close(cwd_fd)
830
@unittest.skipUnless(hasattr(posix, 'mkfifoat'), "don't have mkfifoat()")
def test_mkfifoat(self):
    # Replace TESTFN with a FIFO created relative to the cwd descriptor
    # and check the resulting file type.
    fifo_path = support.TESTFN
    support.unlink(fifo_path)
    owner_rw = stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.mkfifoat(cwd_fd, fifo_path, owner_rw)
        created = posix.stat(fifo_path)
        self.assertTrue(stat.S_ISFIFO(created.st_mode))
    finally:
        posix.close(cwd_fd)
840
# Skip decorators for the scheduling tests: the first keys off the
# presence of posix.sched_yield, the second off posix.cpu_set.
requires_sched_h = unittest.skipUnless(
    hasattr(posix, 'sched_yield'),
    "don't have scheduling support")
requires_sched_affinity = unittest.skipUnless(
    hasattr(posix, 'cpu_set'),
    "don't have sched affinity support")
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500845
@requires_sched_h
def test_sched_yield(self):
    # Smoke test: the call has no error conditions (at least on Linux),
    # so it only needs to return without raising.
    yield_processor = posix.sched_yield
    yield_processor()
850
@requires_sched_h
@unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
                     "requires sched_get_priority_max()")
def test_sched_priority(self):
    # Round-robin usually has interesting priorities.
    policy = posix.SCHED_RR
    lowest = posix.sched_get_priority_min(policy)
    highest = posix.sched_get_priority_max(policy)
    self.assertIsInstance(lowest, int)
    self.assertIsInstance(highest, int)
    self.assertGreaterEqual(highest, lowest)

    # OSX evidently just returns 15 without checking the argument, so
    # the invalid-policy checks are not run there.
    if sys.platform == "darwin":
        return
    for query in (posix.sched_get_priority_min,
                  posix.sched_get_priority_max):
        self.assertRaises(OSError, query, -23)
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500866
@unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler")
def test_get_and_set_scheduler_and_param(self):
    # Every SCHED_* constant the module exposes is a valid answer from
    # sched_getscheduler().
    known_policies = [value for attr, value in vars(posix).items()
                      if attr.startswith("SCHED_")]

    own_policy = posix.sched_getscheduler(0)
    self.assertIn(own_policy, known_policies)

    # Querying the parent process may be refused with EPERM; any other
    # error is a real failure.
    try:
        parent_policy = posix.sched_getscheduler(os.getppid())
    except OSError as exc:
        if exc.errno != errno.EPERM:
            raise
    else:
        self.assertIn(parent_policy, known_policies)

    # -1 is not a valid pid for either getter.
    for getter in (posix.sched_getscheduler, posix.sched_getparam):
        self.assertRaises(OSError, getter, -1)

    own_param = posix.sched_getparam(0)
    self.assertIsInstance(own_param.sched_priority, int)

    # Re-applying the current policy and parameters may be refused with
    # EPERM; any other error is a real failure.
    try:
        posix.sched_setscheduler(0, own_policy, own_param)
    except OSError as exc:
        if exc.errno != errno.EPERM:
            raise

    # POSIX states that calling sched_setparam() on a process with a
    # scheduling policy other than SCHED_FIFO or SCHED_RR is
    # implementation-defined: FreeBSD returns EINVAL.
    on_freebsd = sys.platform.startswith('freebsd')
    if not on_freebsd:
        posix.sched_setparam(0, own_param)
        self.assertRaises(OSError, posix.sched_setparam, -1, own_param)

    # Invalid pid, then arguments of the wrong type.
    self.assertRaises(OSError, posix.sched_setscheduler,
                      -1, own_policy, own_param)
    self.assertRaises(TypeError, posix.sched_setscheduler,
                      0, own_policy, None)
    self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
    untyped_param = posix.sched_param(None)
    self.assertRaises(TypeError, posix.sched_setparam, 0, untyped_param)

    # Out-of-range priorities, built once positionally and once by
    # keyword, must be reported as OverflowError.
    large = 214748364700
    too_high = posix.sched_param(large)
    self.assertRaises(OverflowError, posix.sched_setparam, 0, too_high)
    too_low = posix.sched_param(sched_priority=-large)
    self.assertRaises(OverflowError, posix.sched_setparam, 0, too_low)
907
@unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
def test_sched_rr_get_interval(self):
    try:
        quantum = posix.sched_rr_get_interval(0)
    except OSError as exc:
        # This likely means that sched_rr_get_interval is only valid for
        # processes with the SCHED_RR scheduler in effect.
        if exc.errno != errno.EINVAL:
            raise
        self.skipTest("only works on SCHED_RR processes")
    else:
        self.assertIsInstance(quantum, float)
        # Reasonable constraints, I think.
        self.assertGreaterEqual(quantum, 0.)
        self.assertLess(quantum, 1.)
922
@requires_sched_affinity
def test_sched_affinity(self):
    # pid 0 addresses the calling process; -1 is used as an invalid pid.
    this_process, bad_pid = 0, -1

    current = posix.sched_getaffinity(this_process, 1024)
    self.assertGreaterEqual(current.count(), 1)
    self.assertIsInstance(current, posix.cpu_set)
    self.assertRaises(OSError, posix.sched_getaffinity, bad_pid, 1024)

    # Re-applying the current mask works; a mask with no CPU set and an
    # invalid pid are both rejected.
    no_cpus = posix.cpu_set(10)
    posix.sched_setaffinity(this_process, current)
    self.assertRaises(OSError, posix.sched_setaffinity,
                      this_process, no_cpus)
    self.assertRaises(OSError, posix.sched_setaffinity, bad_pid, current)
933
@requires_sched_affinity
def test_cpu_set_basic(self):
    cpus = posix.cpu_set(10)
    first, last = 0, 9

    # A new set has the requested size and nothing set.
    self.assertEqual(len(cpus), 10)
    self.assertEqual(cpus.count(), 0)

    # Setting both ends leaves the middle untouched.
    for cpu in (first, last):
        cpus.set(cpu)
    for cpu in (first, last):
        self.assertTrue(cpus.isset(cpu))
    self.assertFalse(cpus.isset(5))
    self.assertEqual(cpus.count(), 2)

    # clear() removes a single entry.
    cpus.clear(first)
    self.assertFalse(cpus.isset(first))
    self.assertEqual(cpus.count(), 1)

    # zero() removes everything.
    cpus.zero()
    for cpu in (first, last):
        self.assertFalse(cpus.isset(cpu))
    self.assertEqual(cpus.count(), 0)

    # Indexes just outside [0, 10) are rejected by every accessor.
    for accessor in (cpus.set, cpus.clear, cpus.isset):
        for out_of_range in (-1, 10):
            self.assertRaises(ValueError, accessor, out_of_range)
958
@requires_sched_affinity
def test_cpu_set_cmp(self):
    # Sets of different sizes compare unequal.
    self.assertNotEqual(posix.cpu_set(11), posix.cpu_set(12))

    # Same-size sets compare by content: equal while their bits match,
    # unequal as soon as only one of them has a bit set.
    left = posix.cpu_set(10)
    right = posix.cpu_set(10)
    self.assertEqual(left, right)
    left.set(1)
    self.assertNotEqual(left, right)
    right.set(1)
    self.assertEqual(left, right)
969
@requires_sched_affinity
def test_cpu_set_bitwise(self):
    # left = {0, 1}, right = {1, 2}
    left = posix.cpu_set(5)
    left.set(0)
    left.set(1)
    right = posix.cpu_set(5)
    right.set(1)
    right.set(2)

    # Intersection keeps only the shared CPU.
    common = left & right
    self.assertEqual(common.count(), 1)
    self.assertTrue(common.isset(1))

    # Union keeps every CPU from either side.
    combined = left | right
    self.assertEqual(combined.count(), 3)
    for cpu in (0, 1, 2):
        self.assertTrue(combined.isset(cpu))

    # Symmetric difference drops the shared CPU.
    exclusive = left ^ right
    self.assertEqual(exclusive.count(), 2)
    self.assertTrue(exclusive.isset(0))
    self.assertFalse(exclusive.isset(1))
    self.assertTrue(exclusive.isset(2))

    # The augmented form must update the left operand in place rather
    # than rebinding the name to a new object.
    alias = left
    alias |= right
    self.assertIs(alias, left)
    self.assertEqual(left.count(), 3)
995
Victor Stinner8b905bd2011-10-25 13:34:04 +0200996 def test_rtld_constants(self):
997 # check presence of major RTLD_* constants
998 posix.RTLD_LAZY
999 posix.RTLD_NOW
1000 posix.RTLD_GLOBAL
1001 posix.RTLD_LOCAL
1002
class PosixGroupsTester(unittest.TestCase):
    # Tests for the calls that change the process's supplementary
    # groups.  setUp() skips everything unless running as uid 0, and the
    # original group list is saved there and restored in tearDown().

    def setUp(self):
        if posix.getuid() != 0:
            raise unittest.SkipTest("not enough privileges")
        if not hasattr(posix, 'getgroups'):
            raise unittest.SkipTest("need posix.getgroups")
        if sys.platform == 'darwin':
            raise unittest.SkipTest("getgroups(2) is broken on OSX")
        self.saved_groups = posix.getgroups()

    def tearDown(self):
        # Prefer setgroups(), which restores the saved list exactly;
        # otherwise fall back to initgroups() with the first saved gid.
        # NOTE(review): the fallback indexes saved_groups[0] and would
        # raise IndexError if the saved list were empty -- confirm
        # whether a platform with initgroups() but no setgroups() exists.
        if hasattr(posix, 'setgroups'):
            posix.setgroups(self.saved_groups)
        elif hasattr(posix, 'initgroups'):
            name = pwd.getpwuid(posix.getuid()).pw_name
            posix.initgroups(name, self.saved_groups[0])

    @unittest.skipUnless(hasattr(posix, 'initgroups'),
                         "test needs posix.initgroups()")
    def test_initgroups(self):
        # find missing group: one past the largest gid currently held.
        # getgroups() may return an empty list, in which case max() on
        # it would raise ValueError, so fall back to [0].
        g = max(self.saved_groups or [0]) + 1
        name = pwd.getpwuid(posix.getuid()).pw_name
        posix.initgroups(name, g)
        self.assertIn(g, posix.getgroups())

    @unittest.skipUnless(hasattr(posix, 'setgroups'),
                         "test needs posix.setgroups()")
    def test_setgroups(self):
        # Set a one-element list and a 16-element list, reading each
        # back to confirm it took effect.
        for groups in [[0], list(range(16))]:
            posix.setgroups(groups)
            self.assertListEqual(groups, posix.getgroups())
1037
def test_main():
    """Run both posix test cases, then reap child processes.

    support.reap_children() is called even when the test run raises.
    """
    test_cases = (PosixTester, PosixGroupsTester)
    try:
        support.run_unittest(*test_cases)
    finally:
        support.reap_children()
Neal Norwitze241ce82003-02-17 18:17:05 +00001043
# Run the whole suite when this file is executed directly as a script.
if '__main__' == __name__:
    test_main()