blob: f70688d8b5803db48cc46824e94c85d8ddc7bcc7 [file] [log] [blame]
Neal Norwitze241ce82003-02-17 18:17:05 +00001"Test posix functions"
2
Benjamin Petersonee8712c2008-05-20 21:35:26 +00003from test import support
R. David Murrayeb3615d2009-04-22 02:24:39 +00004
5# Skip these tests if there is no posix module.
6posix = support.import_module('posix')
Neal Norwitze241ce82003-02-17 18:17:05 +00007
Antoine Pitroub7572f02009-12-02 20:46:48 +00008import errno
Ronald Oussorenb6ee4f52010-07-23 13:53:51 +00009import sys
Neal Norwitze241ce82003-02-17 18:17:05 +000010import time
11import os
Charles-François Natali1e045b12011-05-22 20:42:32 +020012import fcntl
Christian Heimesd5e2b6f2008-03-19 21:50:51 +000013import pwd
Benjamin Petersondcf97b92008-07-02 17:30:14 +000014import shutil
Benjamin Peterson052a02b2010-08-17 01:27:09 +000015import stat
Ned Deilyba2eab22011-07-26 13:53:55 -070016import tempfile
Neal Norwitze241ce82003-02-17 18:17:05 +000017import unittest
18import warnings
R. David Murraya21e4ca2009-03-31 23:16:50 +000019
Ned Deilyba2eab22011-07-26 13:53:55 -070020_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
21 support.TESTFN + '-dummy-symlink')
Neal Norwitze241ce82003-02-17 18:17:05 +000022
23class PosixTester(unittest.TestCase):
24
25 def setUp(self):
26 # create empty file
Benjamin Petersonee8712c2008-05-20 21:35:26 +000027 fp = open(support.TESTFN, 'w+')
Neal Norwitze241ce82003-02-17 18:17:05 +000028 fp.close()
Ned Deily3eb67d52011-06-28 00:00:28 -070029 self.teardown_files = [ support.TESTFN ]
Brett Cannonc8d502e2010-03-20 21:53:28 +000030 self._warnings_manager = support.check_warnings()
31 self._warnings_manager.__enter__()
32 warnings.filterwarnings('ignore', '.* potential security risk .*',
33 RuntimeWarning)
Neal Norwitze241ce82003-02-17 18:17:05 +000034
35 def tearDown(self):
Ned Deily3eb67d52011-06-28 00:00:28 -070036 for teardown_file in self.teardown_files:
37 support.unlink(teardown_file)
Brett Cannonc8d502e2010-03-20 21:53:28 +000038 self._warnings_manager.__exit__(None, None, None)
Neal Norwitze241ce82003-02-17 18:17:05 +000039
40 def testNoArgFunctions(self):
41 # test posix functions which take no arguments and have
42 # no side-effects which we need to cleanup (e.g., fork, wait, abort)
Guido van Rossumf0af3e32008-10-02 18:55:37 +000043 NO_ARG_FUNCTIONS = [ "ctermid", "getcwd", "getcwdb", "uname",
Guido van Rossum687b9c02007-10-25 23:18:51 +000044 "times", "getloadavg",
Neal Norwitze241ce82003-02-17 18:17:05 +000045 "getegid", "geteuid", "getgid", "getgroups",
Ross Lagerwall7807c352011-03-17 20:20:30 +020046 "getpid", "getpgrp", "getppid", "getuid", "sync",
Neal Norwitze241ce82003-02-17 18:17:05 +000047 ]
Neal Norwitz71b13e82003-02-23 22:12:24 +000048
Neal Norwitze241ce82003-02-17 18:17:05 +000049 for name in NO_ARG_FUNCTIONS:
50 posix_func = getattr(posix, name, None)
51 if posix_func is not None:
52 posix_func()
Neal Norwitz2ff51a82003-02-17 22:40:31 +000053 self.assertRaises(TypeError, posix_func, 1)
Neal Norwitze241ce82003-02-17 18:17:05 +000054
Martin v. Löwis7aed61a2009-11-27 14:09:49 +000055 if hasattr(posix, 'getresuid'):
56 def test_getresuid(self):
57 user_ids = posix.getresuid()
58 self.assertEqual(len(user_ids), 3)
59 for val in user_ids:
60 self.assertGreaterEqual(val, 0)
61
62 if hasattr(posix, 'getresgid'):
63 def test_getresgid(self):
64 group_ids = posix.getresgid()
65 self.assertEqual(len(group_ids), 3)
66 for val in group_ids:
67 self.assertGreaterEqual(val, 0)
68
69 if hasattr(posix, 'setresuid'):
70 def test_setresuid(self):
71 current_user_ids = posix.getresuid()
72 self.assertIsNone(posix.setresuid(*current_user_ids))
73 # -1 means don't change that value.
74 self.assertIsNone(posix.setresuid(-1, -1, -1))
75
76 def test_setresuid_exception(self):
77 # Don't do this test if someone is silly enough to run us as root.
78 current_user_ids = posix.getresuid()
79 if 0 not in current_user_ids:
80 new_user_ids = (current_user_ids[0]+1, -1, -1)
81 self.assertRaises(OSError, posix.setresuid, *new_user_ids)
82
83 if hasattr(posix, 'setresgid'):
84 def test_setresgid(self):
85 current_group_ids = posix.getresgid()
86 self.assertIsNone(posix.setresgid(*current_group_ids))
87 # -1 means don't change that value.
88 self.assertIsNone(posix.setresgid(-1, -1, -1))
89
90 def test_setresgid_exception(self):
91 # Don't do this test if someone is silly enough to run us as root.
92 current_group_ids = posix.getresgid()
93 if 0 not in current_group_ids:
94 new_group_ids = (current_group_ids[0]+1, -1, -1)
95 self.assertRaises(OSError, posix.setresgid, *new_group_ids)
96
Antoine Pitroub7572f02009-12-02 20:46:48 +000097 @unittest.skipUnless(hasattr(posix, 'initgroups'),
98 "test needs os.initgroups()")
99 def test_initgroups(self):
100 # It takes a string and an integer; check that it raises a TypeError
101 # for other argument lists.
102 self.assertRaises(TypeError, posix.initgroups)
103 self.assertRaises(TypeError, posix.initgroups, None)
104 self.assertRaises(TypeError, posix.initgroups, 3, "foo")
105 self.assertRaises(TypeError, posix.initgroups, "foo", 3, object())
106
107 # If a non-privileged user invokes it, it should fail with OSError
108 # EPERM.
109 if os.getuid() != 0:
110 name = pwd.getpwuid(posix.getuid()).pw_name
111 try:
112 posix.initgroups(name, 13)
113 except OSError as e:
Ezio Melottib3aedd42010-11-20 19:04:17 +0000114 self.assertEqual(e.errno, errno.EPERM)
Antoine Pitroub7572f02009-12-02 20:46:48 +0000115 else:
116 self.fail("Expected OSError to be raised by initgroups")
117
Neal Norwitze241ce82003-02-17 18:17:05 +0000118 def test_statvfs(self):
119 if hasattr(posix, 'statvfs'):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000120 self.assertTrue(posix.statvfs(os.curdir))
Neal Norwitze241ce82003-02-17 18:17:05 +0000121
122 def test_fstatvfs(self):
123 if hasattr(posix, 'fstatvfs'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000124 fp = open(support.TESTFN)
Neal Norwitze241ce82003-02-17 18:17:05 +0000125 try:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000126 self.assertTrue(posix.fstatvfs(fp.fileno()))
Neal Norwitze241ce82003-02-17 18:17:05 +0000127 finally:
128 fp.close()
129
130 def test_ftruncate(self):
131 if hasattr(posix, 'ftruncate'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000132 fp = open(support.TESTFN, 'w+')
Neal Norwitze241ce82003-02-17 18:17:05 +0000133 try:
134 # we need to have some data to truncate
135 fp.write('test')
136 fp.flush()
137 posix.ftruncate(fp.fileno(), 0)
138 finally:
139 fp.close()
140
Ross Lagerwall7807c352011-03-17 20:20:30 +0200141 @unittest.skipUnless(hasattr(posix, 'truncate'), "test needs posix.truncate()")
142 def test_truncate(self):
143 with open(support.TESTFN, 'w') as fp:
144 fp.write('test')
145 fp.flush()
146 posix.truncate(support.TESTFN, 0)
147
148 @unittest.skipUnless(hasattr(posix, 'fexecve'), "test needs posix.fexecve()")
149 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
Ross Lagerwalldedf6cf2011-03-20 18:27:05 +0200150 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
Ross Lagerwall7807c352011-03-17 20:20:30 +0200151 def test_fexecve(self):
152 fp = os.open(sys.executable, os.O_RDONLY)
153 try:
154 pid = os.fork()
155 if pid == 0:
156 os.chdir(os.path.split(sys.executable)[0])
157 posix.fexecve(fp, [sys.executable, '-c', 'pass'], os.environ)
158 else:
Ross Lagerwalldedf6cf2011-03-20 18:27:05 +0200159 self.assertEqual(os.waitpid(pid, 0), (pid, 0))
Ross Lagerwall7807c352011-03-17 20:20:30 +0200160 finally:
161 os.close(fp)
162
163 @unittest.skipUnless(hasattr(posix, 'waitid'), "test needs posix.waitid()")
164 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
165 def test_waitid(self):
166 pid = os.fork()
167 if pid == 0:
168 os.chdir(os.path.split(sys.executable)[0])
169 posix.execve(sys.executable, [sys.executable, '-c', 'pass'], os.environ)
170 else:
171 res = posix.waitid(posix.P_PID, pid, posix.WEXITED)
172 self.assertEqual(pid, res.si_pid)
173
174 @unittest.skipUnless(hasattr(posix, 'lockf'), "test needs posix.lockf()")
175 def test_lockf(self):
176 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
177 try:
178 os.write(fd, b'test')
179 os.lseek(fd, 0, os.SEEK_SET)
180 posix.lockf(fd, posix.F_LOCK, 4)
181 # section is locked
182 posix.lockf(fd, posix.F_ULOCK, 4)
183 finally:
184 os.close(fd)
185
186 @unittest.skipUnless(hasattr(posix, 'pread'), "test needs posix.pread()")
187 def test_pread(self):
188 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
189 try:
190 os.write(fd, b'test')
191 os.lseek(fd, 0, os.SEEK_SET)
192 self.assertEqual(b'es', posix.pread(fd, 2, 1))
193 # the first pread() shoudn't disturb the file offset
194 self.assertEqual(b'te', posix.read(fd, 2))
195 finally:
196 os.close(fd)
197
198 @unittest.skipUnless(hasattr(posix, 'pwrite'), "test needs posix.pwrite()")
199 def test_pwrite(self):
200 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
201 try:
202 os.write(fd, b'test')
203 os.lseek(fd, 0, os.SEEK_SET)
204 posix.pwrite(fd, b'xx', 1)
205 self.assertEqual(b'txxt', posix.read(fd, 4))
206 finally:
207 os.close(fd)
208
209 @unittest.skipUnless(hasattr(posix, 'posix_fallocate'),
210 "test needs posix.posix_fallocate()")
211 def test_posix_fallocate(self):
212 fd = os.open(support.TESTFN, os.O_WRONLY | os.O_CREAT)
213 try:
214 posix.posix_fallocate(fd, 0, 10)
215 except OSError as inst:
216 # issue10812, ZFS doesn't appear to support posix_fallocate,
217 # so skip Solaris-based since they are likely to have ZFS.
218 if inst.errno != errno.EINVAL or not sys.platform.startswith("sunos"):
219 raise
220 finally:
221 os.close(fd)
222
223 @unittest.skipUnless(hasattr(posix, 'posix_fadvise'),
224 "test needs posix.posix_fadvise()")
225 def test_posix_fadvise(self):
226 fd = os.open(support.TESTFN, os.O_RDONLY)
227 try:
228 posix.posix_fadvise(fd, 0, 0, posix.POSIX_FADV_WILLNEED)
229 finally:
230 os.close(fd)
231
232 @unittest.skipUnless(hasattr(posix, 'futimes'), "test needs posix.futimes()")
233 def test_futimes(self):
234 now = time.time()
235 fd = os.open(support.TESTFN, os.O_RDONLY)
236 try:
237 posix.futimes(fd, None)
238 self.assertRaises(TypeError, posix.futimes, fd, (None, None))
239 self.assertRaises(TypeError, posix.futimes, fd, (now, None))
240 self.assertRaises(TypeError, posix.futimes, fd, (None, now))
241 posix.futimes(fd, (int(now), int(now)))
242 posix.futimes(fd, (now, now))
243 finally:
244 os.close(fd)
245
246 @unittest.skipUnless(hasattr(posix, 'lutimes'), "test needs posix.lutimes()")
247 def test_lutimes(self):
248 now = time.time()
249 posix.lutimes(support.TESTFN, None)
250 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, None))
251 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (now, None))
252 self.assertRaises(TypeError, posix.lutimes, support.TESTFN, (None, now))
253 posix.lutimes(support.TESTFN, (int(now), int(now)))
254 posix.lutimes(support.TESTFN, (now, now))
255
256 @unittest.skipUnless(hasattr(posix, 'futimens'), "test needs posix.futimens()")
257 def test_futimens(self):
258 now = time.time()
259 fd = os.open(support.TESTFN, os.O_RDONLY)
260 try:
261 self.assertRaises(TypeError, posix.futimens, fd, (None, None), (None, None))
262 self.assertRaises(TypeError, posix.futimens, fd, (now, 0), None)
263 self.assertRaises(TypeError, posix.futimens, fd, None, (now, 0))
264 posix.futimens(fd, (int(now), int((now - int(now)) * 1e9)),
265 (int(now), int((now - int(now)) * 1e9)))
266 finally:
267 os.close(fd)
268
269 @unittest.skipUnless(hasattr(posix, 'writev'), "test needs posix.writev()")
270 def test_writev(self):
271 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
272 try:
273 os.writev(fd, (b'test1', b'tt2', b't3'))
274 os.lseek(fd, 0, os.SEEK_SET)
275 self.assertEqual(b'test1tt2t3', posix.read(fd, 10))
276 finally:
277 os.close(fd)
278
279 @unittest.skipUnless(hasattr(posix, 'readv'), "test needs posix.readv()")
280 def test_readv(self):
281 fd = os.open(support.TESTFN, os.O_RDWR | os.O_CREAT)
282 try:
283 os.write(fd, b'test1tt2t3')
284 os.lseek(fd, 0, os.SEEK_SET)
285 buf = [bytearray(i) for i in [5, 3, 2]]
286 self.assertEqual(posix.readv(fd, buf), 10)
287 self.assertEqual([b'test1', b'tt2', b't3'], [bytes(i) for i in buf])
288 finally:
289 os.close(fd)
290
Neal Norwitze241ce82003-02-17 18:17:05 +0000291 def test_dup(self):
292 if hasattr(posix, 'dup'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000293 fp = open(support.TESTFN)
Neal Norwitze241ce82003-02-17 18:17:05 +0000294 try:
295 fd = posix.dup(fp.fileno())
Ezio Melottie9615932010-01-24 19:26:24 +0000296 self.assertIsInstance(fd, int)
Neal Norwitze241ce82003-02-17 18:17:05 +0000297 os.close(fd)
298 finally:
299 fp.close()
300
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000301 def test_confstr(self):
302 if hasattr(posix, 'confstr'):
303 self.assertRaises(ValueError, posix.confstr, "CS_garbage")
304 self.assertEqual(len(posix.confstr("CS_PATH")) > 0, True)
305
Neal Norwitze241ce82003-02-17 18:17:05 +0000306 def test_dup2(self):
307 if hasattr(posix, 'dup2'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000308 fp1 = open(support.TESTFN)
309 fp2 = open(support.TESTFN)
Neal Norwitze241ce82003-02-17 18:17:05 +0000310 try:
311 posix.dup2(fp1.fileno(), fp2.fileno())
312 finally:
313 fp1.close()
314 fp2.close()
315
Charles-François Natali1e045b12011-05-22 20:42:32 +0200316 @unittest.skipUnless(hasattr(os, 'O_CLOEXEC'), "needs os.O_CLOEXEC")
Charles-François Natali239bb962011-06-03 12:55:15 +0200317 @support.requires_linux_version(2, 6, 23)
Charles-François Natali1e045b12011-05-22 20:42:32 +0200318 def test_oscloexec(self):
319 fd = os.open(support.TESTFN, os.O_RDONLY|os.O_CLOEXEC)
320 self.addCleanup(os.close, fd)
Victor Stinnere36f3752011-05-24 00:29:43 +0200321 self.assertTrue(fcntl.fcntl(fd, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
Charles-François Natali1e045b12011-05-22 20:42:32 +0200322
Skip Montanaro98470002005-06-17 01:14:49 +0000323 def test_osexlock(self):
324 if hasattr(posix, "O_EXLOCK"):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000325 fd = os.open(support.TESTFN,
Skip Montanaro98470002005-06-17 01:14:49 +0000326 os.O_WRONLY|os.O_EXLOCK|os.O_CREAT)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000327 self.assertRaises(OSError, os.open, support.TESTFN,
Skip Montanaro98470002005-06-17 01:14:49 +0000328 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
329 os.close(fd)
330
331 if hasattr(posix, "O_SHLOCK"):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000332 fd = os.open(support.TESTFN,
Skip Montanaro98470002005-06-17 01:14:49 +0000333 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000334 self.assertRaises(OSError, os.open, support.TESTFN,
Skip Montanaro98470002005-06-17 01:14:49 +0000335 os.O_WRONLY|os.O_EXLOCK|os.O_NONBLOCK)
336 os.close(fd)
337
338 def test_osshlock(self):
339 if hasattr(posix, "O_SHLOCK"):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000340 fd1 = os.open(support.TESTFN,
Skip Montanaro98470002005-06-17 01:14:49 +0000341 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000342 fd2 = os.open(support.TESTFN,
Skip Montanaro98470002005-06-17 01:14:49 +0000343 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
344 os.close(fd2)
345 os.close(fd1)
346
347 if hasattr(posix, "O_EXLOCK"):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000348 fd = os.open(support.TESTFN,
Skip Montanaro98470002005-06-17 01:14:49 +0000349 os.O_WRONLY|os.O_SHLOCK|os.O_CREAT)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000350 self.assertRaises(OSError, os.open, support.TESTFN,
Skip Montanaro98470002005-06-17 01:14:49 +0000351 os.O_RDONLY|os.O_EXLOCK|os.O_NONBLOCK)
352 os.close(fd)
353
Neal Norwitze241ce82003-02-17 18:17:05 +0000354 def test_fstat(self):
355 if hasattr(posix, 'fstat'):
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000356 fp = open(support.TESTFN)
Neal Norwitze241ce82003-02-17 18:17:05 +0000357 try:
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000358 self.assertTrue(posix.fstat(fp.fileno()))
Neal Norwitze241ce82003-02-17 18:17:05 +0000359 finally:
360 fp.close()
361
362 def test_stat(self):
363 if hasattr(posix, 'stat'):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000364 self.assertTrue(posix.stat(support.TESTFN))
Neal Norwitze241ce82003-02-17 18:17:05 +0000365
Benjamin Peterson052a02b2010-08-17 01:27:09 +0000366 @unittest.skipUnless(hasattr(posix, 'mkfifo'), "don't have mkfifo()")
367 def test_mkfifo(self):
368 support.unlink(support.TESTFN)
369 posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)
370 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
371
372 @unittest.skipUnless(hasattr(posix, 'mknod') and hasattr(stat, 'S_IFIFO'),
373 "don't have mknod()/S_IFIFO")
374 def test_mknod(self):
375 # Test using mknod() to create a FIFO (the only use specified
376 # by POSIX).
377 support.unlink(support.TESTFN)
378 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
379 try:
380 posix.mknod(support.TESTFN, mode, 0)
381 except OSError as e:
382 # Some old systems don't allow unprivileged users to use
383 # mknod(), or only support creating device nodes.
384 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
385 else:
386 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
387
Benjamin Peterson1baf4652009-12-31 03:11:23 +0000388 def _test_all_chown_common(self, chown_func, first_param):
389 """Common code for chown, fchown and lchown tests."""
390 if os.getuid() == 0:
391 try:
392 # Many linux distros have a nfsnobody user as MAX_UID-2
393 # that makes a good test case for signedness issues.
394 # http://bugs.python.org/issue1747858
395 # This part of the test only runs when run as root.
396 # Only scary people run their tests as root.
397 ent = pwd.getpwnam('nfsnobody')
398 chown_func(first_param, ent.pw_uid, ent.pw_gid)
399 except KeyError:
400 pass
401 else:
402 # non-root cannot chown to root, raises OSError
403 self.assertRaises(OSError, chown_func,
404 first_param, 0, 0)
405 # test a successful chown call
406 chown_func(first_param, os.getuid(), os.getgid())
Christian Heimesd5e2b6f2008-03-19 21:50:51 +0000407
Benjamin Peterson1baf4652009-12-31 03:11:23 +0000408 @unittest.skipUnless(hasattr(posix, 'chown'), "test needs os.chown()")
409 def test_chown(self):
410 # raise an OSError if the file does not exist
411 os.unlink(support.TESTFN)
412 self.assertRaises(OSError, posix.chown, support.TESTFN, -1, -1)
Christian Heimesd5e2b6f2008-03-19 21:50:51 +0000413
Benjamin Peterson1baf4652009-12-31 03:11:23 +0000414 # re-create the file
Victor Stinnerbf816222011-06-30 23:25:47 +0200415 support.create_empty_file(support.TESTFN)
Benjamin Peterson1baf4652009-12-31 03:11:23 +0000416 self._test_all_chown_common(posix.chown, support.TESTFN)
417
418 @unittest.skipUnless(hasattr(posix, 'fchown'), "test needs os.fchown()")
419 def test_fchown(self):
420 os.unlink(support.TESTFN)
421
422 # re-create the file
423 test_file = open(support.TESTFN, 'w')
424 try:
425 fd = test_file.fileno()
426 self._test_all_chown_common(posix.fchown, fd)
427 finally:
428 test_file.close()
429
430 @unittest.skipUnless(hasattr(posix, 'lchown'), "test needs os.lchown()")
431 def test_lchown(self):
432 os.unlink(support.TESTFN)
433 # create a symlink
Ned Deily3eb67d52011-06-28 00:00:28 -0700434 os.symlink(_DUMMY_SYMLINK, support.TESTFN)
Benjamin Peterson1baf4652009-12-31 03:11:23 +0000435 self._test_all_chown_common(posix.lchown, support.TESTFN)
Christian Heimesd5e2b6f2008-03-19 21:50:51 +0000436
Neal Norwitze241ce82003-02-17 18:17:05 +0000437 def test_chdir(self):
438 if hasattr(posix, 'chdir'):
439 posix.chdir(os.curdir)
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000440 self.assertRaises(OSError, posix.chdir, support.TESTFN)
Neal Norwitze241ce82003-02-17 18:17:05 +0000441
Martin v. Löwisc9e1c7d2010-07-23 12:16:41 +0000442 def test_listdir(self):
443 if hasattr(posix, 'listdir'):
444 self.assertTrue(support.TESTFN in posix.listdir(os.curdir))
445
446 def test_listdir_default(self):
447 # When listdir is called without argument, it's the same as listdir(os.curdir)
448 if hasattr(posix, 'listdir'):
449 self.assertTrue(support.TESTFN in posix.listdir())
Neal Norwitze241ce82003-02-17 18:17:05 +0000450
Antoine Pitrou8250e232011-02-25 23:41:16 +0000451 @unittest.skipUnless(hasattr(posix, 'fdlistdir'), "test needs posix.fdlistdir()")
452 def test_fdlistdir(self):
453 f = posix.open(posix.getcwd(), posix.O_RDONLY)
454 self.assertEqual(
455 sorted(posix.listdir('.')),
456 sorted(posix.fdlistdir(f))
457 )
458 # Check the fd was closed by fdlistdir
459 with self.assertRaises(OSError) as ctx:
460 posix.close(f)
461 self.assertEqual(ctx.exception.errno, errno.EBADF)
462
Neal Norwitze241ce82003-02-17 18:17:05 +0000463 def test_access(self):
464 if hasattr(posix, 'access'):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000465 self.assertTrue(posix.access(support.TESTFN, os.R_OK))
Neal Norwitze241ce82003-02-17 18:17:05 +0000466
467 def test_umask(self):
468 if hasattr(posix, 'umask'):
469 old_mask = posix.umask(0)
Ezio Melottie9615932010-01-24 19:26:24 +0000470 self.assertIsInstance(old_mask, int)
Neal Norwitze241ce82003-02-17 18:17:05 +0000471 posix.umask(old_mask)
472
473 def test_strerror(self):
474 if hasattr(posix, 'strerror'):
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000475 self.assertTrue(posix.strerror(0))
Neal Norwitze241ce82003-02-17 18:17:05 +0000476
477 def test_pipe(self):
478 if hasattr(posix, 'pipe'):
479 reader, writer = posix.pipe()
480 os.close(reader)
481 os.close(writer)
482
Charles-François Natalidaafdd52011-05-29 20:07:40 +0200483 @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
Charles-François Natali239bb962011-06-03 12:55:15 +0200484 @support.requires_linux_version(2, 6, 27)
Charles-François Natalidaafdd52011-05-29 20:07:40 +0200485 def test_pipe2(self):
486 self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
487 self.assertRaises(TypeError, os.pipe2, 0, 0)
488
Charles-François Natali368f34b2011-06-06 19:49:47 +0200489 # try calling with flags = 0, like os.pipe()
490 r, w = os.pipe2(0)
Charles-François Natalidaafdd52011-05-29 20:07:40 +0200491 os.close(r)
492 os.close(w)
493
494 # test flags
495 r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
496 self.addCleanup(os.close, r)
497 self.addCleanup(os.close, w)
498 self.assertTrue(fcntl.fcntl(r, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
499 self.assertTrue(fcntl.fcntl(w, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
500 # try reading from an empty pipe: this should fail, not block
501 self.assertRaises(OSError, os.read, r, 1)
502 # try a write big enough to fill-up the pipe: this should either
503 # fail or perform a partial write, not block
504 try:
505 os.write(w, b'x' * support.PIPE_MAX_SIZE)
506 except OSError:
507 pass
508
Neal Norwitze241ce82003-02-17 18:17:05 +0000509 def test_utime(self):
510 if hasattr(posix, 'utime'):
511 now = time.time()
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000512 posix.utime(support.TESTFN, None)
513 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None))
514 self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None))
515 self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now))
516 posix.utime(support.TESTFN, (int(now), int(now)))
517 posix.utime(support.TESTFN, (now, now))
Neal Norwitze241ce82003-02-17 18:17:05 +0000518
Ned Deily3eb67d52011-06-28 00:00:28 -0700519 def _test_chflags_regular_file(self, chflags_func, target_file):
520 st = os.stat(target_file)
521 self.assertTrue(hasattr(st, 'st_flags'))
522 chflags_func(target_file, st.st_flags | stat.UF_IMMUTABLE)
523 try:
524 new_st = os.stat(target_file)
525 self.assertEqual(st.st_flags | stat.UF_IMMUTABLE, new_st.st_flags)
526 try:
527 fd = open(target_file, 'w+')
528 except IOError as e:
529 self.assertEqual(e.errno, errno.EPERM)
530 finally:
531 posix.chflags(target_file, st.st_flags)
Thomas Wouterscf297e42007-02-23 15:07:44 +0000532
Ned Deily3eb67d52011-06-28 00:00:28 -0700533 @unittest.skipUnless(hasattr(posix, 'chflags'), 'test needs os.chflags()')
534 def test_chflags(self):
535 self._test_chflags_regular_file(posix.chflags, support.TESTFN)
536
537 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
538 def test_lchflags_regular_file(self):
539 self._test_chflags_regular_file(posix.lchflags, support.TESTFN)
540
541 @unittest.skipUnless(hasattr(posix, 'lchflags'), 'test needs os.lchflags()')
542 def test_lchflags_symlink(self):
543 testfn_st = os.stat(support.TESTFN)
544
545 self.assertTrue(hasattr(testfn_st, 'st_flags'))
546
547 os.symlink(support.TESTFN, _DUMMY_SYMLINK)
548 self.teardown_files.append(_DUMMY_SYMLINK)
549 dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
550
551 posix.lchflags(_DUMMY_SYMLINK,
552 dummy_symlink_st.st_flags | stat.UF_IMMUTABLE)
553 try:
554 new_testfn_st = os.stat(support.TESTFN)
555 new_dummy_symlink_st = os.lstat(_DUMMY_SYMLINK)
556
557 self.assertEqual(testfn_st.st_flags, new_testfn_st.st_flags)
558 self.assertEqual(dummy_symlink_st.st_flags | stat.UF_IMMUTABLE,
559 new_dummy_symlink_st.st_flags)
560 finally:
561 posix.lchflags(_DUMMY_SYMLINK, dummy_symlink_st.st_flags)
Thomas Wouterscf297e42007-02-23 15:07:44 +0000562
Guido van Rossum98297ee2007-11-06 21:34:58 +0000563 def test_environ(self):
Victor Stinner17b490d2010-05-06 22:19:30 +0000564 if os.name == "nt":
565 item_type = str
566 else:
567 item_type = bytes
Guido van Rossum98297ee2007-11-06 21:34:58 +0000568 for k, v in posix.environ.items():
Victor Stinner17b490d2010-05-06 22:19:30 +0000569 self.assertEqual(type(k), item_type)
570 self.assertEqual(type(v), item_type)
Guido van Rossum98297ee2007-11-06 21:34:58 +0000571
Benjamin Petersondcf97b92008-07-02 17:30:14 +0000572 def test_getcwd_long_pathnames(self):
573 if hasattr(posix, 'getcwd'):
574 dirname = 'getcwd-test-directory-0123456789abcdef-01234567890abcdef'
575 curdir = os.getcwd()
576 base_path = os.path.abspath(support.TESTFN) + '.getcwd'
577
578 try:
579 os.mkdir(base_path)
580 os.chdir(base_path)
581 except:
Benjamin Petersone549ead2009-03-28 21:42:05 +0000582# Just returning nothing instead of the SkipTest exception,
Benjamin Petersondcf97b92008-07-02 17:30:14 +0000583# because the test results in Error in that case.
584# Is that ok?
Benjamin Petersone549ead2009-03-28 21:42:05 +0000585# raise unittest.SkipTest("cannot create directory for testing")
Benjamin Petersondcf97b92008-07-02 17:30:14 +0000586 return
587
588 def _create_and_do_getcwd(dirname, current_path_length = 0):
589 try:
590 os.mkdir(dirname)
591 except:
Benjamin Petersone549ead2009-03-28 21:42:05 +0000592 raise unittest.SkipTest("mkdir cannot create directory sufficiently deep for getcwd test")
Benjamin Petersondcf97b92008-07-02 17:30:14 +0000593
594 os.chdir(dirname)
595 try:
596 os.getcwd()
597 if current_path_length < 1027:
598 _create_and_do_getcwd(dirname, current_path_length + len(dirname) + 1)
599 finally:
600 os.chdir('..')
601 os.rmdir(dirname)
602
603 _create_and_do_getcwd(dirname)
604
605 finally:
Benjamin Petersondcf97b92008-07-02 17:30:14 +0000606 os.chdir(curdir)
R. David Murray414c91f2009-07-09 20:12:31 +0000607 support.rmtree(base_path)
Benjamin Petersondcf97b92008-07-02 17:30:14 +0000608
Ross Lagerwallb0ae53d2011-06-10 07:30:30 +0200609 @unittest.skipUnless(hasattr(posix, 'getgrouplist'), "test needs posix.getgrouplist()")
610 @unittest.skipUnless(hasattr(pwd, 'getpwuid'), "test needs pwd.getpwuid()")
611 @unittest.skipUnless(hasattr(os, 'getuid'), "test needs os.getuid()")
612 def test_getgrouplist(self):
613 with os.popen('id -G') as idg:
614 groups = idg.read().strip()
615
616 if not groups:
617 raise unittest.SkipTest("need working 'id -G'")
618
619 self.assertEqual(
620 set([int(x) for x in groups.split()]),
621 set(posix.getgrouplist(pwd.getpwuid(os.getuid())[0],
622 pwd.getpwuid(os.getuid())[3])))
623
Antoine Pitrou318b8f32011-01-12 18:45:27 +0000624 @unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
Ronald Oussorenb6ee4f52010-07-23 13:53:51 +0000625 def test_getgroups(self):
626 with os.popen('id -G') as idg:
627 groups = idg.read().strip()
628
629 if not groups:
630 raise unittest.SkipTest("need working 'id -G'")
631
Ronald Oussoren7fb6f512010-08-01 19:18:13 +0000632 # 'id -G' and 'os.getgroups()' should return the same
633 # groups, ignoring order and duplicates.
Antoine Pitrou318b8f32011-01-12 18:45:27 +0000634 # #10822 - it is implementation defined whether posix.getgroups()
635 # includes the effective gid so we include it anyway, since id -G does
Ronald Oussorencb615e62010-07-24 14:15:19 +0000636 self.assertEqual(
Ronald Oussoren7fb6f512010-08-01 19:18:13 +0000637 set([int(x) for x in groups.split()]),
Antoine Pitrou318b8f32011-01-12 18:45:27 +0000638 set(posix.getgroups() + [posix.getegid()]))
Ronald Oussorenb6ee4f52010-07-23 13:53:51 +0000639
Antoine Pitrouf65132d2011-02-25 23:25:17 +0000640 # tests for the posix *at functions follow
641
642 @unittest.skipUnless(hasattr(posix, 'faccessat'), "test needs posix.faccessat()")
643 def test_faccessat(self):
644 f = posix.open(posix.getcwd(), posix.O_RDONLY)
645 try:
646 self.assertTrue(posix.faccessat(f, support.TESTFN, os.R_OK))
647 finally:
648 posix.close(f)
649
650 @unittest.skipUnless(hasattr(posix, 'fchmodat'), "test needs posix.fchmodat()")
651 def test_fchmodat(self):
652 os.chmod(support.TESTFN, stat.S_IRUSR)
653
654 f = posix.open(posix.getcwd(), posix.O_RDONLY)
655 try:
656 posix.fchmodat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)
657
658 s = posix.stat(support.TESTFN)
659 self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
660 finally:
661 posix.close(f)
662
663 @unittest.skipUnless(hasattr(posix, 'fchownat'), "test needs posix.fchownat()")
664 def test_fchownat(self):
665 support.unlink(support.TESTFN)
Victor Stinnerbf816222011-06-30 23:25:47 +0200666 support.create_empty_file(support.TESTFN)
Antoine Pitrouf65132d2011-02-25 23:25:17 +0000667
668 f = posix.open(posix.getcwd(), posix.O_RDONLY)
669 try:
670 posix.fchownat(f, support.TESTFN, os.getuid(), os.getgid())
671 finally:
672 posix.close(f)
673
674 @unittest.skipUnless(hasattr(posix, 'fstatat'), "test needs posix.fstatat()")
675 def test_fstatat(self):
676 support.unlink(support.TESTFN)
677 with open(support.TESTFN, 'w') as outfile:
678 outfile.write("testline\n")
679
680 f = posix.open(posix.getcwd(), posix.O_RDONLY)
681 try:
682 s1 = posix.stat(support.TESTFN)
683 s2 = posix.fstatat(f, support.TESTFN)
684 self.assertEqual(s1, s2)
685 finally:
686 posix.close(f)
687
688 @unittest.skipUnless(hasattr(posix, 'futimesat'), "test needs posix.futimesat()")
689 def test_futimesat(self):
690 f = posix.open(posix.getcwd(), posix.O_RDONLY)
691 try:
692 now = time.time()
693 posix.futimesat(f, support.TESTFN, None)
694 self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, None))
695 self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (now, None))
696 self.assertRaises(TypeError, posix.futimesat, f, support.TESTFN, (None, now))
697 posix.futimesat(f, support.TESTFN, (int(now), int(now)))
698 posix.futimesat(f, support.TESTFN, (now, now))
699 finally:
700 posix.close(f)
701
702 @unittest.skipUnless(hasattr(posix, 'linkat'), "test needs posix.linkat()")
703 def test_linkat(self):
704 f = posix.open(posix.getcwd(), posix.O_RDONLY)
705 try:
706 posix.linkat(f, support.TESTFN, f, support.TESTFN + 'link')
707 # should have same inodes
708 self.assertEqual(posix.stat(support.TESTFN)[1],
709 posix.stat(support.TESTFN + 'link')[1])
710 finally:
711 posix.close(f)
712 support.unlink(support.TESTFN + 'link')
713
714 @unittest.skipUnless(hasattr(posix, 'mkdirat'), "test needs posix.mkdirat()")
715 def test_mkdirat(self):
716 f = posix.open(posix.getcwd(), posix.O_RDONLY)
717 try:
718 posix.mkdirat(f, support.TESTFN + 'dir')
719 posix.stat(support.TESTFN + 'dir') # should not raise exception
720 finally:
721 posix.close(f)
722 support.rmtree(support.TESTFN + 'dir')
723
724 @unittest.skipUnless(hasattr(posix, 'mknodat') and hasattr(stat, 'S_IFIFO'),
725 "don't have mknodat()/S_IFIFO")
726 def test_mknodat(self):
727 # Test using mknodat() to create a FIFO (the only use specified
728 # by POSIX).
729 support.unlink(support.TESTFN)
730 mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
731 f = posix.open(posix.getcwd(), posix.O_RDONLY)
732 try:
733 posix.mknodat(f, support.TESTFN, mode, 0)
734 except OSError as e:
735 # Some old systems don't allow unprivileged users to use
736 # mknod(), or only support creating device nodes.
737 self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
738 else:
739 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
740 finally:
741 posix.close(f)
742
743 @unittest.skipUnless(hasattr(posix, 'openat'), "test needs posix.openat()")
744 def test_openat(self):
745 support.unlink(support.TESTFN)
746 with open(support.TESTFN, 'w') as outfile:
747 outfile.write("testline\n")
748 a = posix.open(posix.getcwd(), posix.O_RDONLY)
749 b = posix.openat(a, support.TESTFN, posix.O_RDONLY)
750 try:
751 res = posix.read(b, 9).decode(encoding="utf-8")
752 self.assertEqual("testline\n", res)
753 finally:
754 posix.close(a)
755 posix.close(b)
756
757 @unittest.skipUnless(hasattr(posix, 'readlinkat'), "test needs posix.readlinkat()")
758 def test_readlinkat(self):
759 os.symlink(support.TESTFN, support.TESTFN + 'link')
760 f = posix.open(posix.getcwd(), posix.O_RDONLY)
761 try:
762 self.assertEqual(posix.readlink(support.TESTFN + 'link'),
763 posix.readlinkat(f, support.TESTFN + 'link'))
764 finally:
765 support.unlink(support.TESTFN + 'link')
766 posix.close(f)
767
768 @unittest.skipUnless(hasattr(posix, 'renameat'), "test needs posix.renameat()")
769 def test_renameat(self):
770 support.unlink(support.TESTFN)
Victor Stinnerbf816222011-06-30 23:25:47 +0200771 support.create_empty_file(support.TESTFN + 'ren')
Antoine Pitrouf65132d2011-02-25 23:25:17 +0000772 f = posix.open(posix.getcwd(), posix.O_RDONLY)
773 try:
774 posix.renameat(f, support.TESTFN + 'ren', f, support.TESTFN)
775 except:
776 posix.rename(support.TESTFN + 'ren', support.TESTFN)
777 raise
778 else:
779 posix.stat(support.TESTFN) # should not throw exception
780 finally:
781 posix.close(f)
782
783 @unittest.skipUnless(hasattr(posix, 'symlinkat'), "test needs posix.symlinkat()")
784 def test_symlinkat(self):
785 f = posix.open(posix.getcwd(), posix.O_RDONLY)
786 try:
787 posix.symlinkat(support.TESTFN, f, support.TESTFN + 'link')
788 self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN)
789 finally:
790 posix.close(f)
791 support.unlink(support.TESTFN + 'link')
792
793 @unittest.skipUnless(hasattr(posix, 'unlinkat'), "test needs posix.unlinkat()")
794 def test_unlinkat(self):
795 f = posix.open(posix.getcwd(), posix.O_RDONLY)
Victor Stinnerbf816222011-06-30 23:25:47 +0200796 support.create_empty_file(support.TESTFN + 'del')
Antoine Pitrouf65132d2011-02-25 23:25:17 +0000797 posix.stat(support.TESTFN + 'del') # should not throw exception
798 try:
799 posix.unlinkat(f, support.TESTFN + 'del')
800 except:
801 support.unlink(support.TESTFN + 'del')
802 raise
803 else:
804 self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')
805 finally:
806 posix.close(f)
807
808 @unittest.skipUnless(hasattr(posix, 'utimensat'), "test needs posix.utimensat()")
809 def test_utimensat(self):
810 f = posix.open(posix.getcwd(), posix.O_RDONLY)
811 try:
812 now = time.time()
813 posix.utimensat(f, support.TESTFN, None, None)
814 self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (None, None), (None, None))
815 self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, (now, 0), None)
816 self.assertRaises(TypeError, posix.utimensat, f, support.TESTFN, None, (now, 0))
817 posix.utimensat(f, support.TESTFN, (int(now), int((now - int(now)) * 1e9)),
818 (int(now), int((now - int(now)) * 1e9)))
819 finally:
820 posix.close(f)
821
822 @unittest.skipUnless(hasattr(posix, 'mkfifoat'), "don't have mkfifoat()")
823 def test_mkfifoat(self):
824 support.unlink(support.TESTFN)
825 f = posix.open(posix.getcwd(), posix.O_RDONLY)
826 try:
827 posix.mkfifoat(f, support.TESTFN, stat.S_IRUSR | stat.S_IWUSR)
828 self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
829 finally:
830 posix.close(f)
831
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500832 requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
833 "don't have scheduling support")
Benjamin Peterson2740af82011-08-02 17:41:34 -0500834 requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'cpu_set'),
Benjamin Peterson50ba2712011-08-02 22:15:40 -0500835 "don't have sched affinity support")
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500836
837 @requires_sched_h
838 def test_sched_yield(self):
839 # This has no error conditions (at least on Linux).
840 posix.sched_yield()
841
842 @requires_sched_h
Charles-François Nataliea0d5fc2011-09-06 19:03:35 +0200843 @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
844 "requires sched_get_priority_max()")
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500845 def test_sched_priority(self):
846 # Round-robin usually has interesting priorities.
847 pol = posix.SCHED_RR
848 lo = posix.sched_get_priority_min(pol)
849 hi = posix.sched_get_priority_max(pol)
850 self.assertIsInstance(lo, int)
851 self.assertIsInstance(hi, int)
852 self.assertGreaterEqual(hi, lo)
Benjamin Peterson539b6c42011-08-02 22:09:37 -0500853 # OSX evidently just returns 15 without checking the argument.
854 if sys.platform != "darwin":
Benjamin Petersonc1581582011-08-02 22:10:55 -0500855 self.assertRaises(OSError, posix.sched_get_priority_min, -23)
856 self.assertRaises(OSError, posix.sched_get_priority_max, -23)
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500857
Benjamin Petersonc5fce4d2011-08-02 18:07:32 -0500858 @unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler")
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500859 def test_get_and_set_scheduler_and_param(self):
860 possible_schedulers = [sched for name, sched in posix.__dict__.items()
861 if name.startswith("SCHED_")]
862 mine = posix.sched_getscheduler(0)
863 self.assertIn(mine, possible_schedulers)
864 try:
Jesus Ceaceb5d162011-09-10 01:16:55 +0200865 parent = posix.sched_getscheduler(os.getppid())
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500866 except OSError as e:
Jesus Ceaceb5d162011-09-10 01:16:55 +0200867 if e.errno != errno.EPERM:
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500868 raise
869 else:
Jesus Ceaceb5d162011-09-10 01:16:55 +0200870 self.assertIn(parent, possible_schedulers)
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500871 self.assertRaises(OSError, posix.sched_getscheduler, -1)
872 self.assertRaises(OSError, posix.sched_getparam, -1)
873 param = posix.sched_getparam(0)
874 self.assertIsInstance(param.sched_priority, int)
Benjamin Peterson18592ca2011-08-02 18:48:59 -0500875 try:
876 posix.sched_setscheduler(0, mine, param)
877 except OSError as e:
878 if e.errno != errno.EPERM:
879 raise
Charles-François Natali7b911cb2011-08-21 12:41:43 +0200880
881 # POSIX states that calling sched_setparam() on a process with a
882 # scheduling policy other than SCHED_FIFO or SCHED_RR is
883 # implementation-defined: FreeBSD returns EINVAL.
884 if not sys.platform.startswith('freebsd'):
885 posix.sched_setparam(0, param)
886 self.assertRaises(OSError, posix.sched_setparam, -1, param)
887
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500888 self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
889 self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
890 self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
891 param = posix.sched_param(None)
892 self.assertRaises(TypeError, posix.sched_setparam, 0, param)
893 large = 214748364700
894 param = posix.sched_param(large)
895 self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
896 param = posix.sched_param(sched_priority=-large)
897 self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
898
Benjamin Petersonc5fce4d2011-08-02 18:07:32 -0500899 @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500900 def test_sched_rr_get_interval(self):
Benjamin Peterson43234ab2011-08-02 22:19:14 -0500901 try:
902 interval = posix.sched_rr_get_interval(0)
903 except OSError as e:
904 # This likely means that sched_rr_get_interval is only valid for
905 # processes with the SCHED_RR scheduler in effect.
906 if e.errno != errno.EINVAL:
907 raise
908 self.skipTest("only works on SCHED_RR processes")
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500909 self.assertIsInstance(interval, float)
910 # Reasonable constraints, I think.
911 self.assertGreaterEqual(interval, 0.)
912 self.assertLess(interval, 1.)
913
Benjamin Peterson2740af82011-08-02 17:41:34 -0500914 @requires_sched_affinity
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500915 def test_sched_affinity(self):
916 mask = posix.sched_getaffinity(0, 1024)
917 self.assertGreaterEqual(mask.count(), 1)
918 self.assertIsInstance(mask, posix.cpu_set)
919 self.assertRaises(OSError, posix.sched_getaffinity, -1, 1024)
920 empty = posix.cpu_set(10)
921 posix.sched_setaffinity(0, mask)
922 self.assertRaises(OSError, posix.sched_setaffinity, 0, empty)
923 self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
924
Benjamin Peterson2740af82011-08-02 17:41:34 -0500925 @requires_sched_affinity
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500926 def test_cpu_set_basic(self):
927 s = posix.cpu_set(10)
928 self.assertEqual(len(s), 10)
929 self.assertEqual(s.count(), 0)
930 s.set(0)
931 s.set(9)
932 self.assertTrue(s.isset(0))
933 self.assertTrue(s.isset(9))
934 self.assertFalse(s.isset(5))
935 self.assertEqual(s.count(), 2)
936 s.clear(0)
937 self.assertFalse(s.isset(0))
938 self.assertEqual(s.count(), 1)
939 s.zero()
940 self.assertFalse(s.isset(0))
941 self.assertFalse(s.isset(9))
942 self.assertEqual(s.count(), 0)
943 self.assertRaises(ValueError, s.set, -1)
944 self.assertRaises(ValueError, s.set, 10)
945 self.assertRaises(ValueError, s.clear, -1)
946 self.assertRaises(ValueError, s.clear, 10)
947 self.assertRaises(ValueError, s.isset, -1)
948 self.assertRaises(ValueError, s.isset, 10)
949
Benjamin Peterson2740af82011-08-02 17:41:34 -0500950 @requires_sched_affinity
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500951 def test_cpu_set_cmp(self):
952 self.assertNotEqual(posix.cpu_set(11), posix.cpu_set(12))
953 l = posix.cpu_set(10)
954 r = posix.cpu_set(10)
955 self.assertEqual(l, r)
956 l.set(1)
957 self.assertNotEqual(l, r)
958 r.set(1)
959 self.assertEqual(l, r)
960
Benjamin Peterson2740af82011-08-02 17:41:34 -0500961 @requires_sched_affinity
Benjamin Peterson94b580d2011-08-02 17:30:04 -0500962 def test_cpu_set_bitwise(self):
963 l = posix.cpu_set(5)
964 l.set(0)
965 l.set(1)
966 r = posix.cpu_set(5)
967 r.set(1)
968 r.set(2)
969 b = l & r
970 self.assertEqual(b.count(), 1)
971 self.assertTrue(b.isset(1))
972 b = l | r
973 self.assertEqual(b.count(), 3)
974 self.assertTrue(b.isset(0))
975 self.assertTrue(b.isset(1))
976 self.assertTrue(b.isset(2))
977 b = l ^ r
978 self.assertEqual(b.count(), 2)
979 self.assertTrue(b.isset(0))
980 self.assertFalse(b.isset(1))
981 self.assertTrue(b.isset(2))
982 b = l
983 b |= r
984 self.assertIs(b, l)
985 self.assertEqual(l.count(), 3)
986
Ronald Oussorenb6ee4f52010-07-23 13:53:51 +0000987class PosixGroupsTester(unittest.TestCase):
988
989 def setUp(self):
990 if posix.getuid() != 0:
991 raise unittest.SkipTest("not enough privileges")
992 if not hasattr(posix, 'getgroups'):
993 raise unittest.SkipTest("need posix.getgroups")
994 if sys.platform == 'darwin':
995 raise unittest.SkipTest("getgroups(2) is broken on OSX")
996 self.saved_groups = posix.getgroups()
997
998 def tearDown(self):
999 if hasattr(posix, 'setgroups'):
1000 posix.setgroups(self.saved_groups)
1001 elif hasattr(posix, 'initgroups'):
1002 name = pwd.getpwuid(posix.getuid()).pw_name
1003 posix.initgroups(name, self.saved_groups[0])
1004
1005 @unittest.skipUnless(hasattr(posix, 'initgroups'),
1006 "test needs posix.initgroups()")
1007 def test_initgroups(self):
1008 # find missing group
1009
Antoine Pitroue5a91012010-09-04 17:32:06 +00001010 g = max(self.saved_groups) + 1
Ronald Oussorenb6ee4f52010-07-23 13:53:51 +00001011 name = pwd.getpwuid(posix.getuid()).pw_name
1012 posix.initgroups(name, g)
1013 self.assertIn(g, posix.getgroups())
1014
1015 @unittest.skipUnless(hasattr(posix, 'setgroups'),
1016 "test needs posix.setgroups()")
1017 def test_setgroups(self):
Antoine Pitroue5a91012010-09-04 17:32:06 +00001018 for groups in [[0], list(range(16))]:
Ronald Oussorenb6ee4f52010-07-23 13:53:51 +00001019 posix.setgroups(groups)
1020 self.assertListEqual(groups, posix.getgroups())
1021
Neal Norwitze241ce82003-02-17 18:17:05 +00001022def test_main():
Antoine Pitrou68c95922011-03-20 17:33:57 +01001023 try:
1024 support.run_unittest(PosixTester, PosixGroupsTester)
1025 finally:
1026 support.reap_children()
Neal Norwitze241ce82003-02-17 18:17:05 +00001027
1028if __name__ == '__main__':
1029 test_main()