blob: 6a13b36d1cb70ec430fe7ac50265fbcc18ad9b51 [file] [log] [blame]
Thomas Wouters47b49bf2007-08-30 22:15:33 +00001import pipes
2import os
3import string
4import unittest
Serhiy Storchakad7062de2016-05-05 10:55:45 +03005import shutil
pxinwra86a2742020-11-29 06:04:50 +08006from test.support import run_unittest, reap_children, unix_shell
Hai Shibb0424b2020-08-04 00:47:42 +08007from test.support.os_helper import TESTFN, unlink
8
Thomas Wouters47b49bf2007-08-30 22:15:33 +00009
10if os.name != 'posix':
Benjamin Petersone549ead2009-03-28 21:42:05 +000011 raise unittest.SkipTest('pipes module only works on posix')
Thomas Wouters47b49bf2007-08-30 22:15:33 +000012
pxinwra86a2742020-11-29 06:04:50 +080013if not (unix_shell and os.path.exists(unix_shell)):
14 raise unittest.SkipTest('pipes module requires a shell')
15
Thomas Wouters47b49bf2007-08-30 22:15:33 +000016TESTFN2 = TESTFN + "2"
17
Thomas Woutersbca54802007-09-10 19:32:14 +000018# tr a-z A-Z is not portable, so make the ranges explicit
19s_command = 'tr %s %s' % (string.ascii_lowercase, string.ascii_uppercase)
20
class SimplePipeTests(unittest.TestCase):
    """Tests for pipes.Template.

    Covers running real pipelines through ``tr``, the behaviour of an empty
    pipeline (no steps), argument validation in append()/prepend()/open(),
    and the repr(), debug() and clone() helpers.
    """

    def tearDown(self):
        # Remove both scratch file names after every test.
        for f in (TESTFN, TESTFN2):
            unlink(f)

    def _require_tr(self):
        """Skip the calling test when no ``tr`` executable can be found."""
        if shutil.which('tr') is None:
            self.skipTest('tr is not available')

    def testSimplePipe1(self):
        # write through a stdin -> stdout filter
        self._require_tr()
        t = pipes.Template()
        t.append(s_command, pipes.STDIN_STDOUT)
        with t.open(TESTFN, 'w') as f:
            f.write('hello world #1')
        with open(TESTFN) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #1')

    def testSimplePipe2(self):
        # copy through a file-in -> file-out filter
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testSimplePipe3(self):
        # read through a file-in -> stdout filter
        self._require_tr()
        with open(TESTFN, 'w') as f:
            f.write('hello world #2')
        t = pipes.Template()
        t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
        # Use a with-block, like the other tests here, so the pipe is
        # closed even when the assertion fails.
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), 'HELLO WORLD #2')

    def testEmptyPipeline1(self):
        # copy through empty pipe
        d = 'empty pipeline test COPY'
        with open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN2, 'w') as f:
            f.write('')
        t = pipes.Template()
        t.copy(TESTFN, TESTFN2)
        with open(TESTFN2) as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline2(self):
        # read through empty pipe
        d = 'empty pipeline test READ'
        with open(TESTFN, 'w') as f:
            f.write(d)
        t = pipes.Template()
        with t.open(TESTFN, 'r') as f:
            self.assertEqual(f.read(), d)

    def testEmptyPipeline3(self):
        # write through empty pipe
        d = 'empty pipeline test WRITE'
        t = pipes.Template()
        with t.open(TESTFN, 'w') as f:
            f.write(d)
        with open(TESTFN) as f:
            self.assertEqual(f.read(), d)

    def testRepr(self):
        t = pipes.Template()
        self.assertEqual(repr(t), "<Template instance, steps=[]>")
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
        self.assertEqual(repr(t),
                         "<Template instance, steps=[('tr a-z A-Z', '--')]>")

    def testSetDebug(self):
        t = pipes.Template()
        t.debug(False)
        self.assertEqual(t.debugging, False)
        t.debug(True)
        self.assertEqual(t.debugging, True)

    def testReadOpenSink(self):
        # check calling open('r') on a pipe ending with
        # a sink raises ValueError
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'r')

    def testWriteOpenSource(self):
        # check calling open('w') on a pipe ending with
        # a source raises ValueError
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.open, 'bogusfile', 'w')

    def testBadAppendOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')

        # shouldn't be able to append a source
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)

        # check appending two sinks
        t = pipes.Template()
        t.append('boguscmd', pipes.SINK)
        self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.append, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadPrependOptions(self):
        t = pipes.Template()

        # try a non-string command
        self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)

        # try a type that isn't recognized
        self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')

        # shouldn't be able to prepend a sink
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)

        # check prepending two sources
        t = pipes.Template()
        t.prepend('boguscmd', pipes.SOURCE)
        self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)

        # command needing file input but with no $IN
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.FILEIN_STDOUT)

        # command needing file output but with no $OUT
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
                          pipes.FILEIN_FILEOUT)
        t = pipes.Template()
        self.assertRaises(ValueError, t.prepend, 'boguscmd',
                          pipes.STDIN_FILEOUT)

    def testBadOpenMode(self):
        # any mode other than 'r' or 'w' is rejected
        t = pipes.Template()
        self.assertRaises(ValueError, t.open, 'bogusfile', 'x')

    def testClone(self):
        t = pipes.Template()
        t.append('tr a-z A-Z', pipes.STDIN_STDOUT)

        u = t.clone()
        # The clone must be a distinct object with its own steps list,
        # while carrying equal contents.
        self.assertIsNot(t, u)
        self.assertEqual(t.steps, u.steps)
        self.assertIsNot(t.steps, u.steps)
        self.assertEqual(t.debugging, u.debugging)
201
def test_main():
    """Run the SimplePipeTests suite, then reap leftover child processes."""
    run_unittest(SimplePipeTests)
    # Clean up any child processes left over once the suite has finished.
    reap_children()
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000205
# Allow running this test file directly as a script.
if __name__ == "__main__":
    test_main()