| import pipes | 
 | import os | 
 | import string | 
 | import unittest | 
 | from test.support import TESTFN, run_unittest, unlink, reap_children | 
 |  | 
 | if os.name != 'posix': | 
 |     raise unittest.SkipTest('pipes module only works on posix') | 
 |  | 
 | TESTFN2 = TESTFN + "2" | 
 |  | 
 | # tr a-z A-Z is not portable, so make the ranges explicit | 
 | s_command = 'tr %s %s' % (string.ascii_lowercase, string.ascii_uppercase) | 
 |  | 
 | class SimplePipeTests(unittest.TestCase): | 
 |     def tearDown(self): | 
 |         for f in (TESTFN, TESTFN2): | 
 |             unlink(f) | 
 |  | 
 |     def testSimplePipe1(self): | 
 |         t = pipes.Template() | 
 |         t.append(s_command, pipes.STDIN_STDOUT) | 
 |         f = t.open(TESTFN, 'w') | 
 |         f.write('hello world #1') | 
 |         f.close() | 
 |         with open(TESTFN) as f: | 
 |             self.assertEqual(f.read(), 'HELLO WORLD #1') | 
 |  | 
 |     def testSimplePipe2(self): | 
 |         with open(TESTFN, 'w') as f: | 
 |             f.write('hello world #2') | 
 |         t = pipes.Template() | 
 |         t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT) | 
 |         t.copy(TESTFN, TESTFN2) | 
 |         with open(TESTFN2) as f: | 
 |             self.assertEqual(f.read(), 'HELLO WORLD #2') | 
 |  | 
 |     def testSimplePipe3(self): | 
 |         with open(TESTFN, 'w') as f: | 
 |             f.write('hello world #2') | 
 |         t = pipes.Template() | 
 |         t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT) | 
 |         f = t.open(TESTFN, 'r') | 
 |         try: | 
 |             self.assertEqual(f.read(), 'HELLO WORLD #2') | 
 |         finally: | 
 |             f.close() | 
 |  | 
 |     def testEmptyPipeline1(self): | 
 |         # copy through empty pipe | 
 |         d = 'empty pipeline test COPY' | 
 |         with open(TESTFN, 'w') as f: | 
 |             f.write(d) | 
 |         with open(TESTFN2, 'w') as f: | 
 |             f.write('') | 
 |         t=pipes.Template() | 
 |         t.copy(TESTFN, TESTFN2) | 
 |         with open(TESTFN2) as f: | 
 |             self.assertEqual(f.read(), d) | 
 |  | 
 |     def testEmptyPipeline2(self): | 
 |         # read through empty pipe | 
 |         d = 'empty pipeline test READ' | 
 |         with open(TESTFN, 'w') as f: | 
 |             f.write(d) | 
 |         t=pipes.Template() | 
 |         f = t.open(TESTFN, 'r') | 
 |         try: | 
 |             self.assertEqual(f.read(), d) | 
 |         finally: | 
 |             f.close() | 
 |  | 
 |     def testEmptyPipeline3(self): | 
 |         # write through empty pipe | 
 |         d = 'empty pipeline test WRITE' | 
 |         t = pipes.Template() | 
 |         with t.open(TESTFN, 'w') as f: | 
 |             f.write(d) | 
 |         with open(TESTFN) as f: | 
 |             self.assertEqual(f.read(), d) | 
 |  | 
 |     def testQuoting(self): | 
 |         safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./' | 
 |         unicode_sample = '\xe9\xe0\xdf'  # e + acute accent, a + grave, sharp s | 
 |         unsafe = '"`$\\!' + unicode_sample | 
 |  | 
 |         self.assertEqual(pipes.quote(''), "''") | 
 |         self.assertEqual(pipes.quote(safeunquoted), safeunquoted) | 
 |         self.assertEqual(pipes.quote('test file name'), "'test file name'") | 
 |         for u in unsafe: | 
 |             self.assertEqual(pipes.quote('test%sname' % u), | 
 |                               "'test%sname'" % u) | 
 |         for u in unsafe: | 
 |             self.assertEqual(pipes.quote("test%s'name'" % u), | 
 |                              "'test%s'\"'\"'name'\"'\"''" % u) | 
 |  | 
 |     def testRepr(self): | 
 |         t = pipes.Template() | 
 |         self.assertEqual(repr(t), "<Template instance, steps=[]>") | 
 |         t.append('tr a-z A-Z', pipes.STDIN_STDOUT) | 
 |         self.assertEqual(repr(t), | 
 |                     "<Template instance, steps=[('tr a-z A-Z', '--')]>") | 
 |  | 
 |     def testSetDebug(self): | 
 |         t = pipes.Template() | 
 |         t.debug(False) | 
 |         self.assertEqual(t.debugging, False) | 
 |         t.debug(True) | 
 |         self.assertEqual(t.debugging, True) | 
 |  | 
 |     def testReadOpenSink(self): | 
 |         # check calling open('r') on a pipe ending with | 
 |         # a sink raises ValueError | 
 |         t = pipes.Template() | 
 |         t.append('boguscmd', pipes.SINK) | 
 |         self.assertRaises(ValueError, t.open, 'bogusfile', 'r') | 
 |  | 
 |     def testWriteOpenSource(self): | 
 |         # check calling open('w') on a pipe ending with | 
 |         # a source raises ValueError | 
 |         t = pipes.Template() | 
 |         t.prepend('boguscmd', pipes.SOURCE) | 
 |         self.assertRaises(ValueError, t.open, 'bogusfile', 'w') | 
 |  | 
 |     def testBadAppendOptions(self): | 
 |         t = pipes.Template() | 
 |  | 
 |         # try a non-string command | 
 |         self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT) | 
 |  | 
 |         # try a type that isn't recognized | 
 |         self.assertRaises(ValueError, t.append, 'boguscmd', 'xx') | 
 |  | 
 |         # shouldn't be able to append a source | 
 |         self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE) | 
 |  | 
 |         # check appending two sinks | 
 |         t = pipes.Template() | 
 |         t.append('boguscmd', pipes.SINK) | 
 |         self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK) | 
 |  | 
 |         # command needing file input but with no $IN | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.append, 'boguscmd $OUT', | 
 |                            pipes.FILEIN_FILEOUT) | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.append, 'boguscmd', | 
 |                            pipes.FILEIN_STDOUT) | 
 |  | 
 |         # command needing file output but with no $OUT | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.append, 'boguscmd $IN', | 
 |                            pipes.FILEIN_FILEOUT) | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.append, 'boguscmd', | 
 |                            pipes.STDIN_FILEOUT) | 
 |  | 
 |  | 
 |     def testBadPrependOptions(self): | 
 |         t = pipes.Template() | 
 |  | 
 |         # try a non-string command | 
 |         self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT) | 
 |  | 
 |         # try a type that isn't recognized | 
 |         self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx') | 
 |  | 
 |         # shouldn't be able to prepend a sink | 
 |         self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK) | 
 |  | 
 |         # check prepending two sources | 
 |         t = pipes.Template() | 
 |         t.prepend('boguscmd', pipes.SOURCE) | 
 |         self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE) | 
 |  | 
 |         # command needing file input but with no $IN | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT', | 
 |                            pipes.FILEIN_FILEOUT) | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.prepend, 'boguscmd', | 
 |                            pipes.FILEIN_STDOUT) | 
 |  | 
 |         # command needing file output but with no $OUT | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.prepend, 'boguscmd $IN', | 
 |                            pipes.FILEIN_FILEOUT) | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.prepend, 'boguscmd', | 
 |                            pipes.STDIN_FILEOUT) | 
 |  | 
 |     def testBadOpenMode(self): | 
 |         t = pipes.Template() | 
 |         self.assertRaises(ValueError, t.open, 'bogusfile', 'x') | 
 |  | 
 |     def testClone(self): | 
 |         t = pipes.Template() | 
 |         t.append('tr a-z A-Z', pipes.STDIN_STDOUT) | 
 |  | 
 |         u = t.clone() | 
 |         self.assertNotEqual(id(t), id(u)) | 
 |         self.assertEqual(t.steps, u.steps) | 
 |         self.assertNotEqual(id(t.steps), id(u.steps)) | 
 |         self.assertEqual(t.debugging, u.debugging) | 
 |  | 
 | def test_main(): | 
 |     run_unittest(SimplePipeTests) | 
 |     reap_children() | 
 |  | 
 | if __name__ == "__main__": | 
 |     test_main() |