blob: dbccc5f7174094f78c6d1d6f06b26a3f43979469 [file] [log] [blame]
#!/usr/bin/env python
#
# This file is part of pySerial - Cross platform serial port support for Python
# (C) 2016 Chris Liechti <cliechti@gmx.net>
#
# SPDX-License-Identifier: BSD-3-Clause
7"""
8Test cancel functionality.
9"""
10
11import os
12import sys
13import unittest
14import threading
15import serial
16
# Port (URL or device name) the tests run against; running this file as a
# script with an argument replaces it with that argument.
PORT = 'loop://'
19
@unittest.skipIf(not hasattr(serial.Serial, 'cancel_read'), "cancel_read not supported on platform")
class TestCancelRead(unittest.TestCase):
    """Test cancel_read functionality.

    A blocking ``read()`` is started on the port while a timer thread calls
    ``cancel_read()`` one second later.
    """

    def setUp(self):
        """Create the port under test and make sure it offers cancel_read."""
        # serial_for_url() opens the port unless do_not_open is requested
        self.s = serial.serial_for_url(PORT)
        # registered right away so the port is released even when the
        # assertion below or the test itself fails
        self.addCleanup(self.s.close)
        self.assertTrue(hasattr(self.s, 'cancel_read'), "serial instance has no cancel_read")
        #~ self.s.timeout = 10

    def _cancel(self, num_times):
        """Call ``cancel_read()`` on the port *num_times* times."""
        for _ in range(num_times):
            #~ print "cancel"
            self.s.cancel_read()

    def test_cancel_once(self):
        """Start a blocking read and cancel it once from a timer thread."""
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        try:
            self.s.read()
        finally:
            # do not leave the timer thread behind: stop it if it has not
            # fired yet and wait until it is finished, so that it can not
            # touch the port after the test has ended
            timer.cancel()
            timer.join()
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)

    #~ def test_cancel_before_read(self):
        #~ self.s.cancel_read()
        #~ self.s.read()
44
45
# payload for the write test: 1024 '#' bytes
DATA = b'#' * 1024
47
@unittest.skipIf(not hasattr(serial.Serial, 'cancel_write'), "cancel_write not supported on platform")
class TestCancelWrite(unittest.TestCase):
    """Test cancel_write functionality.

    A ``write()`` of DATA is started on a port configured for 50 baud while
    a timer thread calls ``cancel_write()`` one second later.
    """

    def setUp(self):
        """Create the port under test and make sure it offers cancel_write."""
        # serial_for_url() opens the port unless do_not_open is requested
        self.s = serial.serial_for_url(PORT, baudrate=50)  # extra slow
        # registered right away so the port is released even when the
        # assertion below or the test itself fails
        self.addCleanup(self.s.close)
        self.assertTrue(hasattr(self.s, 'cancel_write'), "serial instance has no cancel_write")
        #~ self.s.write_timeout = 10

    def _cancel(self, num_times):
        """Call ``cancel_write()`` on the port *num_times* times."""
        for _ in range(num_times):
            self.s.cancel_write()

    def test_cancel_once(self):
        """Start a slow write and cancel it once from a timer thread."""
        timer = threading.Timer(1, self._cancel, (1,))
        timer.start()
        try:
            self.s.write(DATA)
            self.s.reset_output_buffer()
        finally:
            # do not leave the timer thread behind: stop it if it has not
            # fired yet and wait until it is finished, so that it can not
            # touch the port after the test has ended
            timer.cancel()
            timer.join()
        #~ self.assertTrue(not self.s.isOpen())
        #~ self.assertRaises(serial.SerialException, self.s.open)

    #~ def test_cancel_before_write(self):
        #~ self.s.cancel_write()
        #~ self.s.write(DATA)
        #~ self.s.reset_output_buffer()
73
74
if __name__ == '__main__':
    sys.stdout.write(__doc__)
    # an optional first command line argument replaces the default port
    cli_args = sys.argv[1:]
    if cli_args:
        PORT = cli_args[0]
    sys.stdout.write("Testing port: %r\n" % PORT)
    # hand only the verbose flag on to unittest, not the port argument
    sys.argv[1:] = ['-v']
    # When this module is executed from the command-line, it runs all its tests
    unittest.main()