blob: 6aa86d5766e56843a836d5e6a1edaa6ad11ee9aa [file] [log] [blame]
Petri Lehtinen8b945142013-02-23 19:05:09 +01001#-*- coding: iso-8859-1 -*-
Thomas Wouters49fd7fa2006-04-21 10:40:58 +00002# pysqlite2/test/regression.py: pysqlite regression tests
3#
Gerhard Häringf9cee222010-03-05 15:20:03 +00004# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
Thomas Wouters49fd7fa2006-04-21 10:40:58 +00005#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty. In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17# claim that you wrote the original software. If you use this software
18# in a product, an acknowledgment in the product documentation would be
19# appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21# misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
Gerhard Häringe7ea7452008-03-29 00:45:29 +000024import datetime
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000025import unittest
Thomas Wouters477c8d52006-05-27 19:21:47 +000026import sqlite3 as sqlite
Oren Milmane56ab742017-11-07 02:01:47 +020027import weakref
gescheitb9a03762019-07-13 06:15:49 +030028import functools
Oren Milmane56ab742017-11-07 02:01:47 +020029from test import support
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000030
31class RegressionTests(unittest.TestCase):
32 def setUp(self):
33 self.con = sqlite.connect(":memory:")
34
35 def tearDown(self):
36 self.con.close()
37
38 def CheckPragmaUserVersion(self):
39 # This used to crash pysqlite because this pragma command returns NULL for the column name
40 cur = self.con.cursor()
41 cur.execute("pragma user_version")
42
Thomas Wouters477c8d52006-05-27 19:21:47 +000043 def CheckPragmaSchemaVersion(self):
44 # This still crashed pysqlite <= 2.2.1
45 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
46 try:
47 cur = self.con.cursor()
48 cur.execute("pragma schema_version")
49 finally:
50 cur.close()
51 con.close()
52
53 def CheckStatementReset(self):
54 # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
55 # reset before a rollback, but only those that are still in the
56 # statement cache. The others are not accessible from the connection object.
57 con = sqlite.connect(":memory:", cached_statements=5)
Guido van Rossum805365e2007-05-07 22:24:25 +000058 cursors = [con.cursor() for x in range(5)]
Thomas Wouters477c8d52006-05-27 19:21:47 +000059 cursors[0].execute("create table test(x)")
60 for i in range(10):
Guido van Rossum805365e2007-05-07 22:24:25 +000061 cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)])
Thomas Wouters477c8d52006-05-27 19:21:47 +000062
63 for i in range(5):
64 cursors[i].execute(" " * i + "select x from test")
65
66 con.rollback()
67
Thomas Wouters0e3f5912006-08-11 14:57:12 +000068 def CheckColumnNameWithSpaces(self):
69 cur = self.con.cursor()
70 cur.execute('select 1 as "foo bar [datetime]"')
Serhiy Storchakab1465682020-03-21 15:53:28 +020071 self.assertEqual(cur.description[0][0], "foo bar [datetime]")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000072
73 cur.execute('select 1 as "foo baz"')
Gregory P. Smith04cecaf2009-07-04 08:32:15 +000074 self.assertEqual(cur.description[0][0], "foo baz")
Thomas Wouters0e3f5912006-08-11 14:57:12 +000075
Gerhard Häringe7ea7452008-03-29 00:45:29 +000076 def CheckStatementFinalizationOnCloseDb(self):
77 # pysqlite versions <= 2.3.3 only finalized statements in the statement
78 # cache when closing the database. statements that were still
Serhiy Storchaka6a7b3a72016-04-17 08:32:47 +030079 # referenced in cursors weren't closed and could provoke "
Gerhard Häringe7ea7452008-03-29 00:45:29 +000080 # "OperationalError: Unable to close due to unfinalised statements".
81 con = sqlite.connect(":memory:")
82 cursors = []
83 # default statement cache size is 100
84 for i in range(105):
85 cur = con.cursor()
86 cursors.append(cur)
87 cur.execute("select 1 x union select " + str(i))
88 con.close()
89
Berker Peksagf85bce72016-06-14 14:19:02 +030090 @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
Gerhard Häringe7ea7452008-03-29 00:45:29 +000091 def CheckOnConflictRollback(self):
Gerhard Häringe7ea7452008-03-29 00:45:29 +000092 con = sqlite.connect(":memory:")
93 con.execute("create table foo(x, unique(x) on conflict rollback)")
94 con.execute("insert into foo(x) values (1)")
95 try:
96 con.execute("insert into foo(x) values (1)")
97 except sqlite.DatabaseError:
98 pass
99 con.execute("insert into foo(x) values (2)")
100 try:
101 con.commit()
102 except sqlite.OperationalError:
103 self.fail("pysqlite knew nothing about the implicit ROLLBACK")
104
105 def CheckWorkaroundForBuggySqliteTransferBindings(self):
106 """
107 pysqlite would crash with older SQLite versions unless
108 a workaround is implemented.
109 """
110 self.con.execute("create table foo(bar)")
111 self.con.execute("drop table foo")
112 self.con.execute("create table foo(bar)")
113
114 def CheckEmptyStatement(self):
115 """
116 pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
117 for "no-operation" statements
118 """
119 self.con.execute("")
120
121 def CheckTypeMapUsage(self):
122 """
123 pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
124 a statement. This test exhibits the problem.
125 """
126 SELECT = "select * from foo"
127 con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
128 con.execute("create table foo(bar timestamp)")
129 con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
130 con.execute(SELECT)
131 con.execute("drop table foo")
132 con.execute("create table foo(bar integer)")
133 con.execute("insert into foo(bar) values (5)")
134 con.execute(SELECT)
135
Miss Islington (bot)f76a3882020-09-17 00:57:07 -0700136 def CheckBindMutatingList(self):
137 # Issue41662: Crash when mutate a list of parameters during iteration.
138 class X:
139 def __conform__(self, protocol):
140 parameters.clear()
141 return "..."
142 parameters = [X(), 0]
143 con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES)
144 con.execute("create table foo(bar X, baz integer)")
145 # Should not crash
146 with self.assertRaises(IndexError):
147 con.execute("insert into foo(bar, baz) values (?, ?)", parameters)
148
Gerhard Häring873d9ff2008-02-29 22:22:09 +0000149 def CheckErrorMsgDecodeError(self):
150 # When porting the module to Python 3.0, the error message about
151 # decoding errors disappeared. This verifies they're back again.
Berker Peksag48b5c982016-06-14 00:42:50 +0300152 with self.assertRaises(sqlite.OperationalError) as cm:
Georg Brandl3dbca812008-07-23 16:10:53 +0000153 self.con.execute("select 'xxx' || ? || 'yyy' colname",
154 (bytes(bytearray([250])),)).fetchone()
Berker Peksag48b5c982016-06-14 00:42:50 +0300155 msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
156 self.assertIn(msg, str(cm.exception))
Gerhard Häring873d9ff2008-02-29 22:22:09 +0000157
Georg Brandl3dbca812008-07-23 16:10:53 +0000158 def CheckRegisterAdapter(self):
159 """
160 See issue 3312.
161 """
162 self.assertRaises(TypeError, sqlite.register_adapter, {}, None)
163
164 def CheckSetIsolationLevel(self):
Serhiy Storchaka28914922016-09-01 22:18:03 +0300165 # See issue 27881.
166 class CustomStr(str):
167 def upper(self):
168 return None
169 def __del__(self):
170 con.isolation_level = ""
171
Georg Brandl3dbca812008-07-23 16:10:53 +0000172 con = sqlite.connect(":memory:")
Serhiy Storchaka28914922016-09-01 22:18:03 +0300173 con.isolation_level = None
174 for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
175 with self.subTest(level=level):
176 con.isolation_level = level
177 con.isolation_level = level.lower()
178 con.isolation_level = level.capitalize()
179 con.isolation_level = CustomStr(level)
180
181 # setting isolation_level failure should not alter previous state
182 con.isolation_level = None
183 con.isolation_level = "DEFERRED"
184 pairs = [
185 (1, TypeError), (b'', TypeError), ("abc", ValueError),
186 ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
187 ]
188 for value, exc in pairs:
189 with self.subTest(level=value):
190 with self.assertRaises(exc):
191 con.isolation_level = value
192 self.assertEqual(con.isolation_level, "DEFERRED")
Georg Brandl3dbca812008-07-23 16:10:53 +0000193
Gerhard Häringf9cee222010-03-05 15:20:03 +0000194 def CheckCursorConstructorCallCheck(self):
195 """
Ezio Melottib5bc3532013-08-17 16:11:40 +0300196 Verifies that cursor methods check whether base class __init__ was
197 called.
Gerhard Häringf9cee222010-03-05 15:20:03 +0000198 """
199 class Cursor(sqlite.Cursor):
200 def __init__(self, con):
201 pass
202
203 con = sqlite.connect(":memory:")
204 cur = Cursor(con)
Berker Peksag1003b342016-06-12 22:34:49 +0300205 with self.assertRaises(sqlite.ProgrammingError):
Gerhard Häringf9cee222010-03-05 15:20:03 +0000206 cur.execute("select 4+5").fetchall()
Oren Milmanedb13ae2017-11-07 02:09:49 +0200207 with self.assertRaisesRegex(sqlite.ProgrammingError,
208 r'^Base Cursor\.__init__ not called\.$'):
209 cur.close()
Gerhard Häringf9cee222010-03-05 15:20:03 +0000210
Gerhard Häring6117f422008-09-22 06:04:51 +0000211 def CheckStrSubclass(self):
212 """
213 The Python 3.0 port of the module didn't cope with values of subclasses of str.
214 """
215 class MyStr(str): pass
216 self.con.execute("select ?", (MyStr("abc"),))
Georg Brandl3dbca812008-07-23 16:10:53 +0000217
Gerhard Häringf9cee222010-03-05 15:20:03 +0000218 def CheckConnectionConstructorCallCheck(self):
219 """
Ezio Melottib5bc3532013-08-17 16:11:40 +0300220 Verifies that connection methods check whether base class __init__ was
221 called.
Gerhard Häringf9cee222010-03-05 15:20:03 +0000222 """
223 class Connection(sqlite.Connection):
224 def __init__(self, name):
225 pass
226
227 con = Connection(":memory:")
Berker Peksag1003b342016-06-12 22:34:49 +0300228 with self.assertRaises(sqlite.ProgrammingError):
Gerhard Häringf9cee222010-03-05 15:20:03 +0000229 cur = con.cursor()
Gerhard Häringf9cee222010-03-05 15:20:03 +0000230
231 def CheckCursorRegistration(self):
232 """
233 Verifies that subclassed cursor classes are correctly registered with
234 the connection object, too. (fetch-across-rollback problem)
235 """
236 class Connection(sqlite.Connection):
237 def cursor(self):
238 return Cursor(self)
239
240 class Cursor(sqlite.Cursor):
241 def __init__(self, con):
242 sqlite.Cursor.__init__(self, con)
243
244 con = Connection(":memory:")
245 cur = con.cursor()
246 cur.execute("create table foo(x)")
247 cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
248 cur.execute("select x from foo")
249 con.rollback()
Berker Peksag1003b342016-06-12 22:34:49 +0300250 with self.assertRaises(sqlite.InterfaceError):
Gerhard Häringf9cee222010-03-05 15:20:03 +0000251 cur.fetchall()
Gerhard Häringf9cee222010-03-05 15:20:03 +0000252
253 def CheckAutoCommit(self):
254 """
255 Verifies that creating a connection in autocommit mode works.
256 2.5.3 introduced a regression so that these could no longer
257 be created.
258 """
259 con = sqlite.connect(":memory:", isolation_level=None)
260
261 def CheckPragmaAutocommit(self):
262 """
263 Verifies that running a PRAGMA statement that does an autocommit does
264 work. This did not work in 2.5.3/2.5.4.
265 """
Victor Stinner0201f442010-03-13 03:28:34 +0000266 cur = self.con.cursor()
Gerhard Häringf9cee222010-03-05 15:20:03 +0000267 cur.execute("create table foo(bar)")
268 cur.execute("insert into foo(bar) values (5)")
269
270 cur.execute("pragma page_size")
271 row = cur.fetchone()
272
Victor Stinner0201f442010-03-13 03:28:34 +0000273 def CheckConnectionCall(self):
274 """
275 Call a connection with a non-string SQL request: check error handling
276 of the statement constructor.
277 """
Victor Stinnerc6a23202019-06-26 03:16:24 +0200278 self.assertRaises(TypeError, self.con, 1)
Gerhard Häringf9cee222010-03-05 15:20:03 +0000279
Victor Stinner35466c52010-04-22 11:23:23 +0000280 def CheckCollation(self):
281 def collation_cb(a, b):
282 return 1
283 self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
284 # Lone surrogate cannot be encoded to the default encoding (utf8)
285 "\uDC80", collation_cb)
286
Petri Lehtinen4a84f582011-05-09 12:24:09 +0200287 def CheckRecursiveCursorUse(self):
288 """
289 http://bugs.python.org/issue10811
290
291 Recursively using a cursor, such as when reusing it from a generator led to segfaults.
292 Now we catch recursive cursor usage and raise a ProgrammingError.
293 """
294 con = sqlite.connect(":memory:")
295
296 cur = con.cursor()
297 cur.execute("create table a (bar)")
298 cur.execute("create table b (baz)")
299
300 def foo():
301 cur.execute("insert into a (bar) values (?)", (1,))
302 yield 1
303
304 with self.assertRaises(sqlite.ProgrammingError):
305 cur.executemany("insert into b (baz) values (?)",
306 ((i,) for i in foo()))
307
Petri Lehtinen8b945142013-02-23 19:05:09 +0100308 def CheckConvertTimestampMicrosecondPadding(self):
309 """
310 http://bugs.python.org/issue14720
311
312 The microsecond parsing of convert_timestamp() should pad with zeros,
313 since the microsecond string "456" actually represents "456000".
314 """
315
316 con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
317 cur = con.cursor()
318 cur.execute("CREATE TABLE t (x TIMESTAMP)")
Petri Lehtinen8b945142013-02-23 19:05:09 +0100319
Petri Lehtinen5f794092013-02-26 21:32:02 +0200320 # Microseconds should be 456000
321 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")
322
323 # Microseconds should be truncated to 123456
324 cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")
325
326 cur.execute("SELECT * FROM t")
327 values = [x[0] for x in cur.fetchall()]
328
329 self.assertEqual(values, [
330 datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
331 datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
332 ])
Petri Lehtinen8b945142013-02-23 19:05:09 +0100333
Victor Stinnercb1f74e2013-12-19 16:38:03 +0100334 def CheckInvalidIsolationLevelType(self):
335 # isolation level is a string, not an integer
336 self.assertRaises(TypeError,
337 sqlite.connect, ":memory:", isolation_level=123)
338
Petri Lehtinen4a84f582011-05-09 12:24:09 +0200339
Serhiy Storchaka42d67af2014-09-11 13:29:05 +0300340 def CheckNullCharacter(self):
341 # Issue #21147
342 con = sqlite.connect(":memory:")
343 self.assertRaises(ValueError, con, "\0select 1")
344 self.assertRaises(ValueError, con, "select 1\0")
345 cur = con.cursor()
346 self.assertRaises(ValueError, cur.execute, " \0select 2")
347 self.assertRaises(ValueError, cur.execute, "select 2\0")
348
Berker Peksagcc9afa92016-08-26 22:07:51 +0300349 def CheckCommitCursorReset(self):
350 """
351 Connection.commit() did reset cursors, which made sqlite3
352 to return rows multiple times when fetched from cursors
353 after commit. See issues 10513 and 23129 for details.
354 """
355 con = sqlite.connect(":memory:")
356 con.executescript("""
357 create table t(c);
358 create table t2(c);
359 insert into t values(0);
360 insert into t values(1);
361 insert into t values(2);
362 """)
363
364 self.assertEqual(con.isolation_level, "")
365
366 counter = 0
367 for i, row in enumerate(con.execute("select c from t")):
368 with self.subTest(i=i, row=row):
369 con.execute("insert into t2(c) values (?)", (i,))
370 con.commit()
371 if counter == 0:
372 self.assertEqual(row[0], 0)
373 elif counter == 1:
374 self.assertEqual(row[0], 1)
375 elif counter == 2:
376 self.assertEqual(row[0], 2)
377 counter += 1
378 self.assertEqual(counter, 3, "should have returned exactly three rows")
379
Oren Milmane56ab742017-11-07 02:01:47 +0200380 def CheckBpo31770(self):
381 """
382 The interpreter shouldn't crash in case Cursor.__init__() is called
383 more than once.
384 """
385 def callback(*args):
386 pass
387 con = sqlite.connect(":memory:")
388 cur = sqlite.Cursor(con)
389 ref = weakref.ref(cur, callback)
390 cur.__init__(con)
391 del cur
392 # The interpreter shouldn't crash when ref is collected.
393 del ref
394 support.gc_collect()
395
Zackery Spytz842acaa2018-12-17 07:52:45 -0700396 def CheckDelIsolation_levelSegfault(self):
397 with self.assertRaises(AttributeError):
398 del self.con.isolation_level
399
gescheitb9a03762019-07-13 06:15:49 +0300400 def CheckBpo37347(self):
401 class Printer:
402 def log(self, *args):
403 return sqlite.SQLITE_OK
Serhiy Storchaka42d67af2014-09-11 13:29:05 +0300404
gescheitb9a03762019-07-13 06:15:49 +0300405 for method in [self.con.set_trace_callback,
406 functools.partial(self.con.set_progress_handler, n=1),
407 self.con.set_authorizer]:
408 printer_instance = Printer()
409 method(printer_instance.log)
410 method(printer_instance.log)
411 self.con.execute("select 1") # trigger seg fault
412 method(None)
Sergey Fedoseev5b25f1d2018-12-05 22:50:26 +0500413
Sergey Fedoseev5b25f1d2018-12-05 22:50:26 +0500414
415
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000416def suite():
417 regression_suite = unittest.makeSuite(RegressionTests, "Check")
Sergey Fedoseev5b25f1d2018-12-05 22:50:26 +0500418 return unittest.TestSuite((
419 regression_suite,
Sergey Fedoseev5b25f1d2018-12-05 22:50:26 +0500420 ))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000421
422def test():
423 runner = unittest.TextTestRunner()
424 runner.run(suite())
425
426if __name__ == "__main__":
427 test()