#-*- coding: iso-8859-1 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
23
Gerhard Häringe7ea7452008-03-29 00:45:29 +000024import datetime
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000025import unittest
Thomas Wouters477c8d52006-05-27 19:21:47 +000026import sqlite3 as sqlite
Oren Milmane56ab742017-11-07 02:01:47 +020027import weakref
gescheitb9a03762019-07-13 06:15:49 +030028import functools
Oren Milmane56ab742017-11-07 02:01:47 +020029from test import support
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000030
class RegressionTests(unittest.TestCase):
    """Regression tests for bugs that were fixed in pysqlite / sqlite3.

    Test methods are named ``Check*`` rather than ``test*``, so they are only
    picked up by a loader configured with the ``Check`` prefix.

    ``self.con`` is a fresh in-memory connection created for every test.
    Tests that need a connection with special options open their own and
    register it with ``addCleanup`` so it is closed even when the test fails.
    """

    def setUp(self):
        """Open the shared in-memory connection used by most tests."""
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        """Close the shared connection opened in setUp()."""
        self.con.close()

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns NULL for the column name
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            # The cursor must come from the PARSE_COLNAMES connection opened
            # above; taking it from self.con would leave that connection
            # completely unexercised.
            cur = con.cursor()
            try:
                cur.execute("pragma schema_version")
            finally:
                cur.close()
        finally:
            con.close()

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection object.
        con = sqlite.connect(":memory:", cached_statements=5)
        self.addCleanup(con.close)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])

        # Leading spaces make every SQL string distinct, so each cursor
        # executes a different statement text.
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        """Column aliases containing spaces are reported unmodified."""
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar [datetime]")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        # Closing is the operation under test, so it is not deferred to a
        # cleanup handler.
        con.close()

    def CheckOnConflictRollback(self):
        """commit() must succeed after a failing ``on conflict rollback`` insert."""
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            # The failure itself is not what is being checked; what matters
            # is that the following commit() still works.
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
        con.execute(SELECT)
        # Same SQL text, but the table is recreated with a different declared
        # column type before it is executed again.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT)

    def CheckBindMutatingList(self):
        # Issue41662: Crash when mutate a list of parameters during iteration.
        class X:
            def __conform__(self, protocol):
                # Empties the parameter list this object is an element of.
                parameters.clear()
                return "..."
        parameters = [X(), 0]
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar X, baz integer)")
        # Should not crash
        with self.assertRaises(IndexError):
            con.execute("insert into foo(bar, baz) values (?, ?)", parameters)

    def CheckErrorMsgDecodeError(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            # Byte 250 (0xFA) on its own is not valid UTF-8.
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes([250]),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        with self.assertRaises(TypeError):
            sqlite.register_adapter({}, None)

    def CheckSetIsolationLevel(self):
        # See issue 27881.
        class CustomStr(str):
            # upper() returns a non-string and __del__ assigns
            # isolation_level again on the same connection.
            def upper(self):
                return None
            def __del__(self):
                con.isolation_level = ""

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately skips sqlite.Cursor.__init__().
                pass

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaisesRegex(sqlite.ProgrammingError,
                                    r'^Base Cursor\.__init__ not called\.$'):
            cur.close()

    def CheckStrSubclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str): pass
        self.con.execute("select ?", (MyStr("abc"),))

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately skips sqlite.Connection.__init__().
                pass

        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too. (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        con = sqlite.connect(":memory:", isolation_level=None)
        self.addCleanup(con.close)

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only the fetch itself is exercised; the value is not inspected.
        cur.fetchone()

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(TypeError, self.con, 1)

    def CheckCollation(self):
        """create_collation() rejects a name that cannot be encoded."""
        def collation_cb(a, b):
            return 1
        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Re-enters the same cursor while executemany() is consuming
            # this generator.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        # Issue #21147
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                # Rows are expected back once each, in the order 0, 1, 2,
                # which is exactly the enumeration index.
                self.assertEqual(row[0], i)
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def CheckBpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()

    def CheckDelIsolation_levelSegfault(self):
        """Deleting the isolation_level attribute raises AttributeError."""
        with self.assertRaises(AttributeError):
            del self.con.isolation_level

    def CheckBpo37347(self):
        """Registering the same bound-method callback twice must not segfault."""
        class Printer:
            def log(self, *args):
                return sqlite.SQLITE_OK

        for method in [self.con.set_trace_callback,
                       functools.partial(self.con.set_progress_handler, n=1),
                       self.con.set_authorizer]:
            printer_instance = Printer()
            method(printer_instance.log)
            method(printer_instance.log)
            self.con.execute("select 1") # trigger seg fault
            method(None)
Sergey Fedoseev5b25f1d2018-12-05 22:50:26 +0500412
Sergey Fedoseev5b25f1d2018-12-05 22:50:26 +0500413
414
def suite():
    """Build the suite containing every ``Check*`` method of RegressionTests.

    Returns:
        A ``unittest.TestSuite`` wrapping the regression suite.
    """
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13;
    # a TestLoader with a custom method prefix is the supported equivalent.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    regression_suite = loader.loadTestsFromTestCase(RegressionTests)
    return unittest.TestSuite((
        regression_suite,
    ))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000420
def test():
    """Run the regression suite with a plain text test runner."""
    unittest.TextTestRunner().run(suite())


# Allow the module to be executed directly as a script.
if __name__ == "__main__":
    test()