blob: b08adf1d8097b3936a3b31ed56292b9c79c32e4f [file] [log] [blame]
Petri Lehtinenf8547992012-02-02 17:17:36 +02001#-*- coding: iso-8859-1 -*-
Thomas Wouters49fd7fa2006-04-21 10:40:58 +00002# pysqlite2/test/hooks.py: tests for various SQLite-specific hooks
3#
Gerhard Häringe7ea7452008-03-29 00:45:29 +00004# Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
Thomas Wouters49fd7fa2006-04-21 10:40:58 +00005#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty. In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17# claim that you wrote the original software. If you use this software
18# in a product, an acknowledgment in the product documentation would be
19# appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21# misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
Christian Heimes05e8be12008-02-23 18:30:17 +000024import unittest
Thomas Wouters477c8d52006-05-27 19:21:47 +000025import sqlite3 as sqlite
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000026
Hai Shi598a9512020-08-07 23:18:38 +080027from test.support.os_helper import TESTFN, unlink
28
Aviv Palivoda0e6cb2e2017-04-09 12:11:59 +030029
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000030class CollationTests(unittest.TestCase):
Serhiy Storchaka407ac472016-09-27 00:10:03 +030031 def CheckCreateCollationNotString(self):
32 con = sqlite.connect(":memory:")
33 with self.assertRaises(TypeError):
34 con.create_collation(None, lambda x, y: (x > y) - (x < y))
35
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000036 def CheckCreateCollationNotCallable(self):
37 con = sqlite.connect(":memory:")
Berker Peksag1003b342016-06-12 22:34:49 +030038 with self.assertRaises(TypeError) as cm:
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000039 con.create_collation("X", 42)
Berker Peksag1003b342016-06-12 22:34:49 +030040 self.assertEqual(str(cm.exception), 'parameter must be callable')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000041
42 def CheckCreateCollationNotAscii(self):
43 con = sqlite.connect(":memory:")
Berker Peksag1003b342016-06-12 22:34:49 +030044 with self.assertRaises(sqlite.ProgrammingError):
Mark Dickinsona56c4672009-01-27 18:17:45 +000045 con.create_collation("collä", lambda x, y: (x > y) - (x < y))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000046
Serhiy Storchaka407ac472016-09-27 00:10:03 +030047 def CheckCreateCollationBadUpper(self):
48 class BadUpperStr(str):
49 def upper(self):
50 return None
51 con = sqlite.connect(":memory:")
52 mycoll = lambda x, y: -((x > y) - (x < y))
53 con.create_collation(BadUpperStr("mycoll"), mycoll)
54 result = con.execute("""
55 select x from (
56 select 'a' as x
57 union
58 select 'b' as x
59 ) order by x collate mycoll
60 """).fetchall()
61 self.assertEqual(result[0][0], 'b')
62 self.assertEqual(result[1][0], 'a')
63
R David Murray3f7beb92013-01-10 20:18:21 -050064 @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 1),
65 'old SQLite versions crash on this test')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000066 def CheckCollationIsUsed(self):
67 def mycoll(x, y):
68 # reverse order
Mark Dickinsona56c4672009-01-27 18:17:45 +000069 return -((x > y) - (x < y))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000070
71 con = sqlite.connect(":memory:")
72 con.create_collation("mycoll", mycoll)
73 sql = """
74 select x from (
75 select 'a' as x
76 union
77 select 'b' as x
78 union
79 select 'c' as x
80 ) order by x collate mycoll
81 """
82 result = con.execute(sql).fetchall()
Berker Peksag48b5c982016-06-14 00:42:50 +030083 self.assertEqual(result, [('c',), ('b',), ('a',)],
84 msg='the expected order was not returned')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000085
86 con.create_collation("mycoll", None)
Berker Peksag1003b342016-06-12 22:34:49 +030087 with self.assertRaises(sqlite.OperationalError) as cm:
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000088 result = con.execute(sql).fetchall()
Berker Peksag1003b342016-06-12 22:34:49 +030089 self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000090
Serhiy Storchaka3cf96ac2013-02-07 17:01:47 +020091 def CheckCollationReturnsLargeInteger(self):
92 def mycoll(x, y):
93 # reverse order
94 return -((x > y) - (x < y)) * 2**32
95 con = sqlite.connect(":memory:")
96 con.create_collation("mycoll", mycoll)
97 sql = """
98 select x from (
99 select 'a' as x
100 union
101 select 'b' as x
102 union
103 select 'c' as x
104 ) order by x collate mycoll
105 """
106 result = con.execute(sql).fetchall()
107 self.assertEqual(result, [('c',), ('b',), ('a',)],
108 msg="the expected order was not returned")
109
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000110 def CheckCollationRegisterTwice(self):
111 """
112 Register two different collation functions under the same name.
113 Verify that the last one is actually used.
114 """
115 con = sqlite.connect(":memory:")
Mark Dickinsona56c4672009-01-27 18:17:45 +0000116 con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
117 con.create_collation("mycoll", lambda x, y: -((x > y) - (x < y)))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000118 result = con.execute("""
119 select x from (select 'a' as x union select 'b' as x) order by x collate mycoll
120 """).fetchall()
Berker Peksag1003b342016-06-12 22:34:49 +0300121 self.assertEqual(result[0][0], 'b')
122 self.assertEqual(result[1][0], 'a')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000123
124 def CheckDeregisterCollation(self):
125 """
126 Register a collation, then deregister it. Make sure an error is raised if we try
127 to use it.
128 """
129 con = sqlite.connect(":memory:")
Mark Dickinsona56c4672009-01-27 18:17:45 +0000130 con.create_collation("mycoll", lambda x, y: (x > y) - (x < y))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000131 con.create_collation("mycoll", None)
Berker Peksag1003b342016-06-12 22:34:49 +0300132 with self.assertRaises(sqlite.OperationalError) as cm:
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000133 con.execute("select 'a' as x union select 'b' as x order by x collate mycoll")
Berker Peksag1003b342016-06-12 22:34:49 +0300134 self.assertEqual(str(cm.exception), 'no such collation sequence: mycoll')
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000135
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000136class ProgressTests(unittest.TestCase):
137 def CheckProgressHandlerUsed(self):
138 """
139 Test that the progress handler is invoked once it is set.
140 """
141 con = sqlite.connect(":memory:")
142 progress_calls = []
143 def progress():
144 progress_calls.append(None)
145 return 0
146 con.set_progress_handler(progress, 1)
147 con.execute("""
148 create table foo(a, b)
149 """)
Gregory P. Smith04cecaf2009-07-04 08:32:15 +0000150 self.assertTrue(progress_calls)
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000151
152
153 def CheckOpcodeCount(self):
154 """
155 Test that the opcode argument is respected.
156 """
157 con = sqlite.connect(":memory:")
158 progress_calls = []
159 def progress():
160 progress_calls.append(None)
161 return 0
162 con.set_progress_handler(progress, 1)
163 curs = con.cursor()
164 curs.execute("""
165 create table foo (a, b)
166 """)
167 first_count = len(progress_calls)
168 progress_calls = []
169 con.set_progress_handler(progress, 2)
170 curs.execute("""
171 create table bar (a, b)
172 """)
173 second_count = len(progress_calls)
Benjamin Peterson966f2fc2014-03-12 21:51:52 -0500174 self.assertGreaterEqual(first_count, second_count)
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000175
176 def CheckCancelOperation(self):
177 """
178 Test that returning a non-zero value stops the operation in progress.
179 """
180 con = sqlite.connect(":memory:")
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000181 def progress():
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000182 return 1
183 con.set_progress_handler(progress, 1)
184 curs = con.cursor()
185 self.assertRaises(
186 sqlite.OperationalError,
187 curs.execute,
188 "create table bar (a, b)")
189
190 def CheckClearHandler(self):
191 """
192 Test that setting the progress handler to None clears the previously set handler.
193 """
194 con = sqlite.connect(":memory:")
195 action = 0
196 def progress():
Petri Lehtinenc86d9e22012-02-17 21:30:55 +0200197 nonlocal action
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000198 action = 1
199 return 0
200 con.set_progress_handler(progress, 1)
201 con.set_progress_handler(None, 1)
202 con.execute("select 1 union select 2 union select 3").fetchall()
Gregory P. Smith04cecaf2009-07-04 08:32:15 +0000203 self.assertEqual(action, 0, "progress handler was not cleared")
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000204
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200205class TraceCallbackTests(unittest.TestCase):
206 def CheckTraceCallbackUsed(self):
207 """
208 Test that the trace callback is invoked once it is set.
209 """
210 con = sqlite.connect(":memory:")
211 traced_statements = []
212 def trace(statement):
213 traced_statements.append(statement)
214 con.set_trace_callback(trace)
215 con.execute("create table foo(a, b)")
216 self.assertTrue(traced_statements)
217 self.assertTrue(any("create table foo" in stmt for stmt in traced_statements))
218
219 def CheckClearTraceCallback(self):
220 """
221 Test that setting the trace callback to None clears the previously set callback.
222 """
223 con = sqlite.connect(":memory:")
224 traced_statements = []
225 def trace(statement):
226 traced_statements.append(statement)
227 con.set_trace_callback(trace)
228 con.set_trace_callback(None)
229 con.execute("create table foo(a, b)")
230 self.assertFalse(traced_statements, "trace callback was not cleared")
231
232 def CheckUnicodeContent(self):
233 """
234 Test that the statement can contain unicode literals.
235 """
236 unicode_value = '\xf6\xe4\xfc\xd6\xc4\xdc\xdf\u20ac'
237 con = sqlite.connect(":memory:")
238 traced_statements = []
239 def trace(statement):
240 traced_statements.append(statement)
241 con.set_trace_callback(trace)
242 con.execute("create table foo(x)")
Antoine Pitrouf4e18102011-04-04 01:50:50 +0200243 # Can't execute bound parameters as their values don't appear
244 # in traced statements before SQLite 3.6.21
245 # (cf. http://www.sqlite.org/draft/releaselog/3_6_21.html)
246 con.execute('insert into foo(x) values ("%s")' % unicode_value)
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200247 con.commit()
248 self.assertTrue(any(unicode_value in stmt for stmt in traced_statements),
Antoine Pitrou43b21682011-04-04 00:50:01 +0200249 "Unicode data %s garbled in trace callback: %s"
250 % (ascii(unicode_value), ', '.join(map(ascii, traced_statements))))
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200251
Aviv Palivoda0e6cb2e2017-04-09 12:11:59 +0300252 @unittest.skipIf(sqlite.sqlite_version_info < (3, 3, 9), "sqlite3_prepare_v2 is not available")
253 def CheckTraceCallbackContent(self):
254 # set_trace_callback() shouldn't produce duplicate content (bpo-26187)
255 traced_statements = []
256 def trace(statement):
257 traced_statements.append(statement)
258
259 queries = ["create table foo(x)",
260 "insert into foo(x) values(1)"]
261 self.addCleanup(unlink, TESTFN)
262 con1 = sqlite.connect(TESTFN, isolation_level=None)
263 con2 = sqlite.connect(TESTFN)
264 con1.set_trace_callback(trace)
265 cur = con1.cursor()
266 cur.execute(queries[0])
267 con2.execute("create table bar(x)")
268 cur.execute(queries[1])
269 self.assertEqual(traced_statements, queries)
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200270
271
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000272def suite():
273 collation_suite = unittest.makeSuite(CollationTests, "Check")
Gerhard Häringe7ea7452008-03-29 00:45:29 +0000274 progress_suite = unittest.makeSuite(ProgressTests, "Check")
Antoine Pitrou5bfa0622011-04-04 00:12:04 +0200275 trace_suite = unittest.makeSuite(TraceCallbackTests, "Check")
276 return unittest.TestSuite((collation_suite, progress_suite, trace_suite))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000277
278def test():
279 runner = unittest.TextTestRunner()
280 runner.run(suite())
281
282if __name__ == "__main__":
283 test()