blob: daab1ceccdeb31f99bc52dc83f0673f4e84a916a [file] [log] [blame]
Chris Liechti9cbd6462016-05-17 22:49:44 +02001#!/usr/bin/env python
2#
3# This file is part of pySerial - Cross platform serial port support for Python
4# (C) 2016 Chris Liechti <cliechti@gmx.net>
5#
6# SPDX-License-Identifier: BSD-3-Clause
7"""
8Test cancel functionality.
9"""
Chris Liechti9cbd6462016-05-17 22:49:44 +020010import sys
11import unittest
12import threading
Chris Liechtiedb07142016-05-24 00:05:06 +020013import time
Chris Liechti9cbd6462016-05-17 22:49:44 +020014import serial
15
# Port (URL or device name) the tests are performed on. Defaults to the
# loop:// URL; when this file is run as a script, the first command line
# argument replaces it.
PORT = 'loop://'


@unittest.skipIf(not hasattr(serial.Serial, 'cancel_read'), "cancel_read not supported on platform")
class TestCancelRead(unittest.TestCase):
    """Test cancel_read functionality.

    A blocking read with a long timeout is started and cancel_read() is
    called from a timer thread; the read must return shortly after the
    cancel instead of waiting for the full timeout.
    """

    def setUp(self):
        """Set up the port under test with a long read timeout."""
        # the port is used for reading right away, without an explicit open()
        self.s = serial.serial_for_url(PORT)
        self.assertTrue(hasattr(self.s, 'cancel_read'), "serial instance has no cancel_read")
        # far longer than the test window, so only a cancel can end the read early
        self.s.timeout = 10
        # number of times _cancel() invoked cancel_read()
        self.cancel_called = 0

    def tearDown(self):
        """Discard pending output and close the port."""
        self.s.reset_output_buffer()
        self.s.close()

    def _cancel(self, num_times):
        """Call cancel_read() *num_times* times, counting each call."""
        for _ in range(num_times):
            # count first, so the counter is already updated when the
            # cancelled read() returns in the main thread
            self.cancel_called += 1
            self.s.cancel_read()

    def test_cancel_once(self):
        """Cancel read"""
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        try:
            t1 = time.time()
            self.s.read(1000)
            t2 = time.time()
        finally:
            # never leave the timer pending: if read() failed or returned
            # early it would otherwise fire after tearDown closed the port
            timer.cancel()
            timer.join()
        self.assertEqual(self.cancel_called, 1)
        self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1))
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)

    # disabled test, kept for reference:
    #~ def test_cancel_before_read(self):
        #~ self.s.cancel_read()
        #~ self.s.read()
55
56
# Payload for the write test: 1 KiB, which takes far longer to send at the
# slow baudrate used below than the test is willing to wait.
DATA = b'#' * 1024


@unittest.skipIf(not hasattr(serial.Serial, 'cancel_write'), "cancel_write not supported on platform")
class TestCancelWrite(unittest.TestCase):
    """Test cancel_write functionality.

    A slow blocking write is started and cancel_write() is called from a
    timer thread; the write must return shortly after the cancel instead
    of running to completion or to the write timeout.
    """

    def setUp(self):
        """Set up the port under test at a very low baudrate."""
        # the port is used for writing right away, without an explicit open()
        self.s = serial.serial_for_url(PORT, baudrate=300)  # extra slow ~30B/s => 1kb ~ 34s
        self.assertTrue(hasattr(self.s, 'cancel_write'), "serial instance has no cancel_write")
        # far longer than the test window, so only a cancel can end the write early
        self.s.write_timeout = 10
        # number of times _cancel() invoked cancel_write()
        self.cancel_called = 0

    def tearDown(self):
        """Drop unsent data, drain the port quickly and close it."""
        self.s.reset_output_buffer()
        # not all USB-Serial adapters will actually flush the output (maybe
        # keeping the buffer in the MCU in the adapter) therefore, speed up by
        # changing the baudrate
        self.s.baudrate = 115200
        self.s.flush()
        self.s.close()

    def _cancel(self, num_times):
        """Call cancel_write() *num_times* times, counting each call."""
        for _ in range(num_times):
            # count first, so the counter is already updated when the
            # cancelled write() returns in the main thread
            self.cancel_called += 1
            self.s.cancel_write()

    def test_cancel_once(self):
        """Cancel write"""
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        try:
            t1 = time.time()
            self.s.write(DATA)
            t2 = time.time()
        finally:
            # never leave the timer pending: if write() failed or returned
            # early it would otherwise fire after tearDown closed the port
            timer.cancel()
            timer.join()
        self.assertEqual(self.cancel_called, 1)
        self.assertTrue(0.5 < (t2 - t1) < 2.5, 'Function did not return in time: {}'.format(t2 - t1))
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)

    # disabled test, kept for reference:
    #~ def test_cancel_before_write(self):
        #~ self.s.cancel_write()
        #~ self.s.write(DATA)
        #~ self.s.reset_output_buffer()
100
101
if __name__ == '__main__':
    sys.stdout.write(__doc__)
    # an optional first command line argument selects the port under test
    extra_args = sys.argv[1:]
    if extra_args:
        PORT = extra_args[0]
    sys.stdout.write("Testing port: {!r}\n".format(PORT))
    # replace the consumed arguments so unittest only sees the verbose switch
    sys.argv[1:] = ['-v']
    # executed from the command line: run every test in this module
    unittest.main()