#!/usr/bin/env python
#
# This file is part of pySerial - Cross platform serial port support for Python
# (C) 2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
"""
Test cancel functionality.
"""

11import os
12import sys
13import unittest
14import threading
Chris Liechtiedb07142016-05-24 00:05:06 +020015import time
Chris Liechti9cbd6462016-05-17 22:49:44 +020016import serial
17
# on which port should the tests be performed (may be overridden by the first
# command line argument when this module is run as a script):
PORT = 'loop://'
20
@unittest.skipIf(not hasattr(serial.Serial, 'cancel_read'), "cancel_read not supported on platform")
class TestCancelRead(unittest.TestCase):
    """Test cancel_read functionality"""

    def setUp(self):
        # get the port under test; the test reads from it right away, so it
        # has to be usable as returned by serial_for_url()
        self.s = serial.serial_for_url(PORT)
        self.assertTrue(hasattr(self.s, 'cancel_read'), "serial instance has no cancel_read")
        # far longer than the cancel delay used below, so a read that returns
        # within the expected window can only have been ended by the cancel
        self.s.timeout = 10
        self.cancel_called = 0

    def tearDown(self):
        self.s.reset_output_buffer()
        self.s.close()

    def _cancel(self, num_times):
        """Call cancel_read() num_times times, counting every call made."""
        for _ in range(num_times):
            #~ print "cancel"
            self.s.cancel_read()
            self.cancel_called += 1

    def test_cancel_once(self):
        """Cancel read"""
        # a helper thread cancels the (otherwise blocking) read after 1 second
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        try:
            t1 = time.time()
            self.s.read(1000)
            t2 = time.time()
        finally:
            # Wait for the helper thread: read() may return before the thread
            # has incremented the counter, and the thread must not outlive
            # the test and touch the port after tearDown() closed it.
            timer.join()
        self.assertEqual(self.cancel_called, 1)
        self.assertTrue(0.5 < (t2 - t1) < 2, 'Function did not return in time: {}'.format(t2-t1))
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)

    #~ def test_cancel_before_read(self):
        #~ self.s.cancel_read()
        #~ self.s.read()
56
57
# payload for the write test: 1024 bytes, which takes far longer to send at
# the slow baud rate used there than the delay before the write is cancelled
DATA = b'#' * 1024
59
@unittest.skipIf(not hasattr(serial.Serial, 'cancel_write'), "cancel_write not supported on platform")
class TestCancelWrite(unittest.TestCase):
    """Test cancel_write functionality"""

    def setUp(self):
        # get the port under test; the test writes to it right away, so it
        # has to be usable as returned by serial_for_url()
        self.s = serial.serial_for_url(PORT, baudrate=300)  # extra slow ~30B/s => 1kb ~ 34s
        self.assertTrue(hasattr(self.s, 'cancel_write'), "serial instance has no cancel_write")
        # far longer than the cancel delay used below, so a write that returns
        # within the expected window can only have been ended by the cancel
        self.s.write_timeout = 10
        self.cancel_called = 0

    def tearDown(self):
        self.s.reset_output_buffer()
        # not all USB-Serial adapters will actually flush the output (maybe
        # keeping the buffer in the MCU in the adapter) therefore, speed up by
        # changing the baudrate
        self.s.baudrate = 115200
        self.s.flush()
        self.s.close()

    def _cancel(self, num_times):
        """Call cancel_write() num_times times, counting every call made."""
        for _ in range(num_times):
            self.s.cancel_write()
            self.cancel_called += 1

    def test_cancel_once(self):
        """Cancel write"""
        # a helper thread cancels the (otherwise blocking) write after 1 second
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        try:
            t1 = time.time()
            self.s.write(DATA)
            t2 = time.time()
        finally:
            # Wait for the helper thread: write() may return before the thread
            # has incremented the counter, and the thread must not outlive
            # the test and touch the port after tearDown() closed it.
            timer.join()
        self.assertEqual(self.cancel_called, 1)
        self.assertTrue(0.5 < (t2 - t1) < 2, 'Function did not return in time: {}'.format(t2-t1))
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)

    #~ def test_cancel_before_write(self):
        #~ self.s.cancel_write()
        #~ self.s.write(DATA)
        #~ self.s.reset_output_buffer()
100
101
if __name__ == '__main__':
    import sys
    sys.stdout.write(__doc__)
    # an optional first command line argument selects the port to test
    if len(sys.argv) > 1:
        PORT = sys.argv[1]
    sys.stdout.write("Testing port: %r\n" % PORT)
    # replace the arguments so that unittest does not see the port name and
    # runs in verbose mode
    sys.argv[1:] = ['-v']
    # When this module is executed from the command-line, it runs all its tests
    unittest.main()
110 unittest.main()