Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 1 | #!/usr/bin/env python |
| 2 | # |
| 3 | # This file is part of pySerial - Cross platform serial port support for Python |
| 4 | # (C) 2016 Chris Liechti <cliechti@gmx.net> |
| 5 | # |
| 6 | # SPDX-License-Identifier: BSD-3-Clause |
| 7 | """ |
| 8 | Test cancel functionality. |
| 9 | """ |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 10 | import sys |
| 11 | import unittest |
| 12 | import threading |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 13 | import time |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 14 | import serial |
| 15 | |
| 16 | # on which port should the tests be performed: |
| 17 | PORT = 'loop://' |
| 18 | |
Chris Liechti | 77d922b | 2016-05-26 17:14:58 +0200 | [diff] [blame] | 19 | |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 20 | @unittest.skipIf(not hasattr(serial.Serial, 'cancel_read'), "cancel_read not supported on platform") |
| 21 | class TestCancelRead(unittest.TestCase): |
| 22 | """Test cancel_read functionality""" |
| 23 | |
| 24 | def setUp(self): |
| 25 | # create a closed serial port |
| 26 | self.s = serial.serial_for_url(PORT) |
| 27 | self.assertTrue(hasattr(self.s, 'cancel_read'), "serial instance has no cancel_read") |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 28 | self.s.timeout = 10 |
| 29 | self.cancel_called = 0 |
| 30 | |
| 31 | def tearDown(self): |
| 32 | self.s.reset_output_buffer() |
| 33 | self.s.close() |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 34 | |
| 35 | def _cancel(self, num_times): |
| 36 | for i in range(num_times): |
| 37 | #~ print "cancel" |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 38 | self.cancel_called += 1 |
Chris Liechti | 4dcd025 | 2016-05-25 01:49:44 +0200 | [diff] [blame] | 39 | self.s.cancel_read() |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 40 | |
| 41 | def test_cancel_once(self): |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 42 | """Cancel read""" |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 43 | threading.Timer(1, self._cancel, ((1,))).start() |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 44 | t1 = time.time() |
| 45 | self.s.read(1000) |
| 46 | t2 = time.time() |
| 47 | self.assertEqual(self.cancel_called, 1) |
Chris Liechti | 8c05ebf | 2016-10-05 03:32:36 +0200 | [diff] [blame] | 48 | self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1)) |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 49 | #~ self.assertTrue(not self.s.isOpen()) |
| 50 | #~ self.assertRaises(serial.SerialException, self.s.open) |
| 51 | |
| 52 | #~ def test_cancel_before_read(self): |
| 53 | #~ self.s.cancel_read() |
| 54 | #~ self.s.read() |
| 55 | |
| 56 | |
Chris Liechti | 77d922b | 2016-05-26 17:14:58 +0200 | [diff] [blame] | 57 | DATA = b'#' * 1024 |
| 58 | |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 59 | |
| 60 | @unittest.skipIf(not hasattr(serial.Serial, 'cancel_write'), "cancel_read not supported on platform") |
| 61 | class TestCancelWrite(unittest.TestCase): |
| 62 | """Test cancel_write functionality""" |
| 63 | |
| 64 | def setUp(self): |
| 65 | # create a closed serial port |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 66 | self.s = serial.serial_for_url(PORT, baudrate=300) # extra slow ~30B/s => 1kb ~ 34s |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 67 | self.assertTrue(hasattr(self.s, 'cancel_write'), "serial instance has no cancel_write") |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 68 | self.s.write_timeout = 10 |
| 69 | self.cancel_called = 0 |
| 70 | |
| 71 | def tearDown(self): |
| 72 | self.s.reset_output_buffer() |
| 73 | # not all USB-Serial adapters will actually flush the output (maybe |
| 74 | # keeping the buffer in the MCU in the adapter) therefore, speed up by |
| 75 | # changing the baudrate |
| 76 | self.s.baudrate = 115200 |
| 77 | self.s.flush() |
| 78 | self.s.close() |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 79 | |
| 80 | def _cancel(self, num_times): |
| 81 | for i in range(num_times): |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 82 | self.cancel_called += 1 |
Chris Liechti | 4dcd025 | 2016-05-25 01:49:44 +0200 | [diff] [blame] | 83 | self.s.cancel_write() |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 84 | |
| 85 | def test_cancel_once(self): |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 86 | """Cancel write""" |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 87 | threading.Timer(1, self._cancel, ((1,))).start() |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 88 | t1 = time.time() |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 89 | self.s.write(DATA) |
Chris Liechti | edb0714 | 2016-05-24 00:05:06 +0200 | [diff] [blame] | 90 | t2 = time.time() |
| 91 | self.assertEqual(self.cancel_called, 1) |
Chris Liechti | 8c05ebf | 2016-10-05 03:32:36 +0200 | [diff] [blame] | 92 | self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1)) |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 93 | #~ self.assertTrue(not self.s.isOpen()) |
| 94 | #~ self.assertRaises(serial.SerialException, self.s.open) |
| 95 | |
| 96 | #~ def test_cancel_before_write(self): |
| 97 | #~ self.s.cancel_write() |
| 98 | #~ self.s.write(DATA) |
| 99 | #~ self.s.reset_output_buffer() |
| 100 | |
| 101 | |
| 102 | if __name__ == '__main__': |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 103 | sys.stdout.write(__doc__) |
| 104 | if len(sys.argv) > 1: |
| 105 | PORT = sys.argv[1] |
Chris Liechti | 3debab2 | 2016-06-20 22:52:22 +0200 | [diff] [blame] | 106 | sys.stdout.write("Testing port: {!r}\n".format(PORT)) |
Chris Liechti | 9cbd646 | 2016-05-17 22:49:44 +0200 | [diff] [blame] | 107 | sys.argv[1:] = ['-v'] |
| 108 | # When this module is executed from the command-line, it runs all its tests |
| 109 | unittest.main() |