blob: 7d8cd54ba0e5b3371037a14b05129a831c4c915e [file] [log] [blame]
Thomas Wouters47b49bf2007-08-30 22:15:33 +00001import pipes
2import os
3import string
4import unittest
Serhiy Storchakad7062de2016-05-05 10:55:45 +03005import shutil
Hai Shibb0424b2020-08-04 00:47:42 +08006from test.support import run_unittest, reap_children
7from test.support.os_helper import TESTFN, unlink
8
Thomas Wouters47b49bf2007-08-30 22:15:33 +00009
10if os.name != 'posix':
Benjamin Petersone549ead2009-03-28 21:42:05 +000011 raise unittest.SkipTest('pipes module only works on posix')
Thomas Wouters47b49bf2007-08-30 22:15:33 +000012
13TESTFN2 = TESTFN + "2"
14
Thomas Woutersbca54802007-09-10 19:32:14 +000015# tr a-z A-Z is not portable, so make the ranges explicit
16s_command = 'tr %s %s' % (string.ascii_lowercase, string.ascii_uppercase)
17
Thomas Wouters47b49bf2007-08-30 22:15:33 +000018class SimplePipeTests(unittest.TestCase):
19 def tearDown(self):
20 for f in (TESTFN, TESTFN2):
21 unlink(f)
22
23 def testSimplePipe1(self):
Serhiy Storchakad7062de2016-05-05 10:55:45 +030024 if shutil.which('tr') is None:
25 self.skipTest('tr is not available')
Thomas Wouters47b49bf2007-08-30 22:15:33 +000026 t = pipes.Template()
Thomas Woutersbca54802007-09-10 19:32:14 +000027 t.append(s_command, pipes.STDIN_STDOUT)
Serhiy Storchaka5b10b982019-03-05 10:06:26 +020028 with t.open(TESTFN, 'w') as f:
29 f.write('hello world #1')
Antoine Pitrou92f60ed2010-10-14 22:11:44 +000030 with open(TESTFN) as f:
31 self.assertEqual(f.read(), 'HELLO WORLD #1')
Thomas Wouters47b49bf2007-08-30 22:15:33 +000032
33 def testSimplePipe2(self):
Serhiy Storchakad7062de2016-05-05 10:55:45 +030034 if shutil.which('tr') is None:
35 self.skipTest('tr is not available')
Antoine Pitrou92f60ed2010-10-14 22:11:44 +000036 with open(TESTFN, 'w') as f:
37 f.write('hello world #2')
Thomas Wouters47b49bf2007-08-30 22:15:33 +000038 t = pipes.Template()
Thomas Woutersbca54802007-09-10 19:32:14 +000039 t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
Thomas Wouters47b49bf2007-08-30 22:15:33 +000040 t.copy(TESTFN, TESTFN2)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +000041 with open(TESTFN2) as f:
42 self.assertEqual(f.read(), 'HELLO WORLD #2')
Thomas Wouters47b49bf2007-08-30 22:15:33 +000043
44 def testSimplePipe3(self):
Serhiy Storchakad7062de2016-05-05 10:55:45 +030045 if shutil.which('tr') is None:
46 self.skipTest('tr is not available')
Antoine Pitrou92f60ed2010-10-14 22:11:44 +000047 with open(TESTFN, 'w') as f:
48 f.write('hello world #2')
Thomas Wouters47b49bf2007-08-30 22:15:33 +000049 t = pipes.Template()
Thomas Woutersbca54802007-09-10 19:32:14 +000050 t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
Antoine Pitrou9b14f602009-12-08 19:53:23 +000051 f = t.open(TESTFN, 'r')
52 try:
53 self.assertEqual(f.read(), 'HELLO WORLD #2')
54 finally:
55 f.close()
Thomas Wouters47b49bf2007-08-30 22:15:33 +000056
57 def testEmptyPipeline1(self):
58 # copy through empty pipe
59 d = 'empty pipeline test COPY'
Antoine Pitrou92f60ed2010-10-14 22:11:44 +000060 with open(TESTFN, 'w') as f:
61 f.write(d)
62 with open(TESTFN2, 'w') as f:
63 f.write('')
Thomas Wouters47b49bf2007-08-30 22:15:33 +000064 t=pipes.Template()
65 t.copy(TESTFN, TESTFN2)
Antoine Pitrou92f60ed2010-10-14 22:11:44 +000066 with open(TESTFN2) as f:
67 self.assertEqual(f.read(), d)
Thomas Wouters47b49bf2007-08-30 22:15:33 +000068
69 def testEmptyPipeline2(self):
70 # read through empty pipe
71 d = 'empty pipeline test READ'
Antoine Pitrou92f60ed2010-10-14 22:11:44 +000072 with open(TESTFN, 'w') as f:
73 f.write(d)
Thomas Wouters47b49bf2007-08-30 22:15:33 +000074 t=pipes.Template()
Antoine Pitrou9b14f602009-12-08 19:53:23 +000075 f = t.open(TESTFN, 'r')
76 try:
77 self.assertEqual(f.read(), d)
78 finally:
79 f.close()
Thomas Wouters47b49bf2007-08-30 22:15:33 +000080
81 def testEmptyPipeline3(self):
82 # write through empty pipe
83 d = 'empty pipeline test WRITE'
84 t = pipes.Template()
Antoine Pitrou92f60ed2010-10-14 22:11:44 +000085 with t.open(TESTFN, 'w') as f:
86 f.write(d)
87 with open(TESTFN) as f:
88 self.assertEqual(f.read(), d)
Thomas Wouters47b49bf2007-08-30 22:15:33 +000089
Thomas Wouters47b49bf2007-08-30 22:15:33 +000090 def testRepr(self):
91 t = pipes.Template()
92 self.assertEqual(repr(t), "<Template instance, steps=[]>")
93 t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
94 self.assertEqual(repr(t),
95 "<Template instance, steps=[('tr a-z A-Z', '--')]>")
96
97 def testSetDebug(self):
98 t = pipes.Template()
99 t.debug(False)
100 self.assertEqual(t.debugging, False)
101 t.debug(True)
102 self.assertEqual(t.debugging, True)
103
104 def testReadOpenSink(self):
105 # check calling open('r') on a pipe ending with
106 # a sink raises ValueError
107 t = pipes.Template()
108 t.append('boguscmd', pipes.SINK)
109 self.assertRaises(ValueError, t.open, 'bogusfile', 'r')
110
111 def testWriteOpenSource(self):
112 # check calling open('w') on a pipe ending with
113 # a source raises ValueError
114 t = pipes.Template()
115 t.prepend('boguscmd', pipes.SOURCE)
116 self.assertRaises(ValueError, t.open, 'bogusfile', 'w')
117
118 def testBadAppendOptions(self):
119 t = pipes.Template()
120
121 # try a non-string command
122 self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)
123
124 # try a type that isn't recognized
125 self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')
126
127 # shouldn't be able to append a source
128 self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)
129
130 # check appending two sinks
131 t = pipes.Template()
132 t.append('boguscmd', pipes.SINK)
133 self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)
134
135 # command needing file input but with no $IN
136 t = pipes.Template()
137 self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
138 pipes.FILEIN_FILEOUT)
139 t = pipes.Template()
140 self.assertRaises(ValueError, t.append, 'boguscmd',
141 pipes.FILEIN_STDOUT)
142
143 # command needing file output but with no $OUT
144 t = pipes.Template()
145 self.assertRaises(ValueError, t.append, 'boguscmd $IN',
146 pipes.FILEIN_FILEOUT)
147 t = pipes.Template()
148 self.assertRaises(ValueError, t.append, 'boguscmd',
149 pipes.STDIN_FILEOUT)
150
151
152 def testBadPrependOptions(self):
153 t = pipes.Template()
154
155 # try a non-string command
156 self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)
157
158 # try a type that isn't recognized
159 self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')
160
161 # shouldn't be able to prepend a sink
162 self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)
163
164 # check prepending two sources
165 t = pipes.Template()
166 t.prepend('boguscmd', pipes.SOURCE)
167 self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)
168
169 # command needing file input but with no $IN
170 t = pipes.Template()
171 self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
172 pipes.FILEIN_FILEOUT)
173 t = pipes.Template()
174 self.assertRaises(ValueError, t.prepend, 'boguscmd',
175 pipes.FILEIN_STDOUT)
176
177 # command needing file output but with no $OUT
178 t = pipes.Template()
179 self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
180 pipes.FILEIN_FILEOUT)
181 t = pipes.Template()
182 self.assertRaises(ValueError, t.prepend, 'boguscmd',
183 pipes.STDIN_FILEOUT)
184
185 def testBadOpenMode(self):
186 t = pipes.Template()
187 self.assertRaises(ValueError, t.open, 'bogusfile', 'x')
188
189 def testClone(self):
190 t = pipes.Template()
191 t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
192
193 u = t.clone()
194 self.assertNotEqual(id(t), id(u))
195 self.assertEqual(t.steps, u.steps)
196 self.assertNotEqual(id(t.steps), id(u.steps))
197 self.assertEqual(t.debugging, u.debugging)
198
199def test_main():
200 run_unittest(SimplePipeTests)
Antoine Pitrou9b14f602009-12-08 19:53:23 +0000201 reap_children()
Thomas Wouters47b49bf2007-08-30 22:15:33 +0000202
203if __name__ == "__main__":
204 test_main()