blob: 4250888377c83e432cc74c12eca431537d22aa4f [file] [log] [blame]
Petri Lehtinenf8547992012-02-02 17:17:36 +02001#-*- coding: iso-8859-1 -*-
Thomas Wouters49fd7fa2006-04-21 10:40:58 +00002# pysqlite2/test/hooks.py: tests for various SQLite-specific hooks
3#
Gerhard Häringe7ea7452008-03-29 00:45:29 +00004# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
Thomas Wouters49fd7fa2006-04-21 10:40:58 +00005#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty. In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17# claim that you wrote the original software. If you use this software
18# in a product, an acknowledgment in the product documentation would be
19# appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21# misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
Christian Heimes05e8be12008-02-23 18:30:17 +000024import unittest
Thomas Wouters477c8d52006-05-27 19:21:47 +000025import sqlite3 as sqlite
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000026
Hai Shi598a9512020-08-07 23:18:38 +080027from test.support.os_helper import TESTFN, unlink
28
Aviv Palivoda0e6cb2e2017-04-09 12:11:59 +030029
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000030class CollationTests(unittest.TestCase):
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +010031 def test_create_collation_not_string(self):
Serhiy Storchaka407ac472016-09-27 00:10:03 +030032 con = sqlite.connect(":memory:")
33 with self.assertRaises(TypeError):
34 con.create_collation(None, lambda x, y: (x > y) - (x < y))
35
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +010036 def test_create_collation_not_callable(self):
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000037 con = sqlite.connect(":memory:")
Berker Peksag1003b342016-06-12 22:34:49 +030038 with self.assertRaises(TypeError) as cm:
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000039 con.create_collation("X", 42)
Berker Peksag1003b342016-06-12 22:34:49 +030040 self.assertEqual(str(cm.exception), 'parameter must be callable')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000041
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +010042 def test_create_collation_not_ascii(self):
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000043 con = sqlite.connect(":memory:")
Berker Peksag1003b342016-06-12 22:34:49 +030044 with self.assertRaises(sqlite.ProgrammingError):
Mark Dickinsona56c4672009-01-27 18:17:45 +000045 con.create_collation("collä", lambda x, y: (x > y) - (x < y))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000046
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +010047 def test_create_collation_bad_upper(self):
Serhiy Storchaka407ac472016-09-27 00:10:03 +030048 class BadUpperStr(str):
49 def upper(self):
50 return None
51 con = sqlite.connect(":memory:")
52 mycoll = lambda x, y: -((x > y) - (x < y))
53 con.create_collation(BadUpperStr("mycoll"), mycoll)
54 result = con.execute("""
55 select x from (
56 select 'a' as x
57 union
58 select 'b' as x
59 ) order by x collate mycoll
60 """).fetchall()
61 self.assertEqual(result[0][0], 'b')
62 self.assertEqual(result[1][0], 'a')
63
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +010064 def test_collation_is_used(self):
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000065 def mycoll(x, y):
66 # reverse order
Mark Dickinsona56c4672009-01-27 18:17:45 +000067 return -((x > y) - (x < y))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000068
69 con = sqlite.connect(":memory:")
70 con.create_collation("mycoll", mycoll)
71 sql = """
72 select x from (
73 select 'a' as x
74 union
75 select 'b' as x
76 union
77 select 'c' as x
78 ) order by x collate mycoll
79 """
80 result = con.execute(sql).fetchall()
Berker Peksag48b5c982016-06-14 00:42:50 +030081 self.assertEqual(result, [('c',), ('b',), ('a',)],
82 msg='the expected order was not returned')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000083
84 con.create_collation("mycoll", None)
Berker Peksag1003b342016-06-12 22:34:49 +030085 with self.assertRaises(sqlite.OperationalError) as cm:
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000086 result = con.execute(sql).fetchall()
Berker Peksag1003b342016-06-12 22:34:49 +030087 self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000088
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +010089 def test_collation_returns_large_integer(self):
Serhiy Storchaka3cf96ac2013-02-07 17:01:47 +020090 def mycoll(x, y):
91 # reverse order
92 return -((x > y) - (x < y)) * 2**32
93 con = sqlite.connect(":memory:")
94 con.create_collation("mycoll", mycoll)
95 sql = """
96 select x from (
97 select 'a' as x
98 union
99 select 'b' as x
100 union
101 select 'c' as x
102 ) order by x collate mycoll
103 """
104 result = con.execute(sql).fetchall()
105 self.assertEqual(result, [('c',), ('b',), ('a',)],
106 msg="the expected order was not returned")
107
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100108 def test_collation_register_twice(self):
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000109 """
110 Register two different collation functions under the same name.
111 Verify that the last one is actually used.
112 """
113 con = sqlite.connect(":memory:")
Mark Dickinsona56c4672009-01-27 18:17:45 +0000114 con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
115 con.create_collation("mycoll", lambda x, y: -((x > y) - (x < y)))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000116 result = con.execute("""
117 select x from (select 'a' as x union select 'b' as x) order by x collate mycoll
118 """).fetchall()
Berker Peksag1003b342016-06-12 22:34:49 +0300119 self.assertEqual(result[0][0], 'b')
120 self.assertEqual(result[1][0], 'a')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000121
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100122 def test_deregister_collation(self):
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000123 """
124 Register a collation, then deregister it. Make sure an error is raised if we try
125 to use it.
126 """
127 con = sqlite.connect(":memory:")
Mark Dickinsona56c4672009-01-27 18:17:45 +0000128 con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000129 con.create_collation("mycoll", None)
Berker Peksag1003b342016-06-12 22:34:49 +0300130 with self.assertRaises(sqlite.OperationalError) as cm:
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000131 con.execute("select 'a' as x union select 'b' as x order by x collate mycoll")
Berker Peksag1003b342016-06-12 22:34:49 +0300132 self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000133
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000134class ProgressTests(unittest.TestCase):
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100135 def test_progress_handler_used(self):
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000136 """
137 Test that the progress handler is invoked once it is set.
138 """
139 con = sqlite.connect(":memory:")
140 progress_calls = []
141 def progress():
142 progress_calls.append(None)
143 return 0
144 con.set_progress_handler(progress, 1)
145 con.execute("""
146 create table foo(a, b)
147 """)
Gregory P. Smith04cecaf2009-07-04 08:32:15 +0000148 self.assertTrue(progress_calls)
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000149
150
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100151 def test_opcode_count(self):
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000152 """
153 Test that the opcode argument is respected.
154 """
155 con = sqlite.connect(":memory:")
156 progress_calls = []
157 def progress():
158 progress_calls.append(None)
159 return 0
160 con.set_progress_handler(progress, 1)
161 curs = con.cursor()
162 curs.execute("""
163 create table foo (a, b)
164 """)
165 first_count = len(progress_calls)
166 progress_calls = []
167 con.set_progress_handler(progress, 2)
168 curs.execute("""
169 create table bar (a, b)
170 """)
171 second_count = len(progress_calls)
Benjamin Peterson966f2fc2014-03-12 21:51:52 -0500172 self.assertGreaterEqual(first_count, second_count)
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000173
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100174 def test_cancel_operation(self):
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000175 """
176 Test that returning a non-zero value stops the operation in progress.
177 """
178 con = sqlite.connect(":memory:")
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000179 def progress():
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000180 return 1
181 con.set_progress_handler(progress, 1)
182 curs = con.cursor()
183 self.assertRaises(
184 sqlite.OperationalError,
185 curs.execute,
186 "create table bar (a, b)")
187
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100188 def test_clear_handler(self):
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000189 """
190 Test that setting the progress handler to None clears the previously set handler.
191 """
192 con = sqlite.connect(":memory:")
193 action = 0
194 def progress():
Petri Lehtinenc86d9e22012-02-17 21:30:55 +0200195 nonlocal action
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000196 action = 1
197 return 0
198 con.set_progress_handler(progress, 1)
199 con.set_progress_handler(None, 1)
200 con.execute("select 1 union select 2 union select 3").fetchall()
Gregory P. Smith04cecaf2009-07-04 08:32:15 +0000201 self.assertEqual(action, 0, "progress handler was not cleared")
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000202
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200203class TraceCallbackTests(unittest.TestCase):
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100204 def test_trace_callback_used(self):
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200205 """
206 Test that the trace callback is invoked once it is set.
207 """
208 con = sqlite.connect(":memory:")
209 traced_statements = []
210 def trace(statement):
211 traced_statements.append(statement)
212 con.set_trace_callback(trace)
213 con.execute("create table foo(a, b)")
214 self.assertTrue(traced_statements)
215 self.assertTrue(any("create table foo" in stmt for stmt in traced_statements))
216
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100217 def test_clear_trace_callback(self):
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200218 """
219 Test that setting the trace callback to None clears the previously set callback.
220 """
221 con = sqlite.connect(":memory:")
222 traced_statements = []
223 def trace(statement):
224 traced_statements.append(statement)
225 con.set_trace_callback(trace)
226 con.set_trace_callback(None)
227 con.execute("create table foo(a, b)")
228 self.assertFalse(traced_statements, "trace callback was not cleared")
229
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100230 def test_unicode_content(self):
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200231 """
232 Test that the statement can contain unicode literals.
233 """
234 unicode_value = '\xf6\xe4\xfc\xd6\xc4\xdc\xdf\u20ac'
235 con = sqlite.connect(":memory:")
236 traced_statements = []
237 def trace(statement):
238 traced_statements.append(statement)
239 con.set_trace_callback(trace)
240 con.execute("create table foo(x)")
Antoine Pitrouf4e18102011-04-04 01:50:50 +0200241 con.execute('insert into foo(x) values ("%s")' % unicode_value)
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200242 con.commit()
243 self.assertTrue(any(unicode_value in stmt for stmt in traced_statements),
Antoine Pitrou43b21682011-04-04 00:50:01 +0200244 "Unicode data %s garbled in trace callback: %s"
245 % (ascii(unicode_value), ', '.join(map(ascii, traced_statements))))
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200246
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100247 def test_trace_callback_content(self):
Aviv Palivoda0e6cb2e2017-04-09 12:11:59 +0300248 # set_trace_callback() shouldn't produce duplicate content (bpo-26187)
249 traced_statements = []
250 def trace(statement):
251 traced_statements.append(statement)
252
253 queries = ["create table foo(x)",
254 "insert into foo(x) values(1)"]
255 self.addCleanup(unlink, TESTFN)
256 con1 = sqlite.connect(TESTFN, isolation_level=None)
257 con2 = sqlite.connect(TESTFN)
258 con1.set_trace_callback(trace)
259 cur = con1.cursor()
260 cur.execute(queries[0])
261 con2.execute("create table bar(x)")
262 cur.execute(queries[1])
263 self.assertEqual(traced_statements, queries)
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200264
265
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000266def suite():
Erlend Egeberg Aasland849e3392021-01-07 01:05:07 +0100267 tests = [
268 CollationTests,
269 ProgressTests,
270 TraceCallbackTests,
271 ]
272 return unittest.TestSuite(
273 [unittest.TestLoader().loadTestsFromTestCase(t) for t in tests]
274 )
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000275
276def test():
277 runner = unittest.TextTestRunner()
278 runner.run(suite())
279
280if __name__ == "__main__":
281 test()