blob: 1c59a3cd31c625834a4dda5e282d86828bf888fc [file] [log] [blame]
#-*- coding: iso-8859-1 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty. In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
23
Gerhard Häringe7ea7452008-03-29 00:45:29 +000024import datetime
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000025import unittest
Thomas Wouters477c8d52006-05-27 19:21:47 +000026import sqlite3 as sqlite
Oren Milmane56ab742017-11-07 02:01:47 +020027import weakref
28from test import support
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000029
class RegressionTests(unittest.TestCase):
    """Regression tests for bugs that were fixed in pysqlite / sqlite3.

    Test methods use the ``Check`` prefix instead of ``test``.  Each test
    gets a fresh in-memory database in ``self.con``; tests that need a
    connection with special options open their own one through
    ``_connect()`` so that it is closed again when the test finishes.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def _connect(self, **kwargs):
        """Return a private in-memory connection that is closed after the test.

        Keyword arguments are passed through to ``sqlite.connect()``.
        """
        con = sqlite.connect(":memory:", **kwargs)
        self.addCleanup(con.close)
        return con

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns NULL for the column name
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = self._connect(detect_types=sqlite.PARSE_COLNAMES)
        # The cursor must come from the PARSE_COLNAMES connection (not from
        # self.con), otherwise the column-name parsing path is never run.
        cur = con.cursor()
        try:
            cur.execute("pragma schema_version")
        finally:
            cur.close()

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection object.
        con = self._connect(cached_statements=5)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)",
                                   [(x,) for x in range(10)])

        # Leading blanks make every statement text distinct, so each cursor
        # holds its own statement object.
        for i, cur in enumerate(cursors):
            cur.execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        # Closing explicitly is the operation under test here.
        con.close()

    @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
    def CheckOnConflictRollback(self):
        con = self._connect()
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            # Expected: the duplicate violates the unique constraint.
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = self._connect(detect_types=sqlite.PARSE_DECLTYPES)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),))
        con.execute(SELECT)
        # Recreate the table with another declared type and run the very
        # same statement text again.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT)

    def CheckErrorMsgDecodeError(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None

            def __del__(self):
                # Re-enters the isolation_level setter while the instance
                # is being destroyed.
                con.isolation_level = ""

        con = self._connect()
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately skips sqlite.Cursor.__init__().
                pass

        con = self._connect()
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaisesRegex(sqlite.ProgrammingError,
                                    r'^Base Cursor\.__init__ not called\.$'):
            cur.close()

    def CheckStrSubclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str):
            pass

        self.con.execute("select ?", (MyStr("abc"),))

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately skips sqlite.Connection.__init__().
                pass

        # No cleanup is registered: this connection was never initialised.
        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too. (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        self._connect(isolation_level=None)

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only the fetch itself matters; the returned row is not inspected.
        cur.fetchone()

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckCollation(self):
        def collation_cb(a, b):
            return 1
        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = self._connect()

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Uses ``cur`` while the outer executemany() on the same cursor
            # is still consuming this generator.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = self._connect(detect_types=sqlite.PARSE_DECLTYPES)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        # Issue #21147
        con = self._connect()
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = self._connect()
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                # The first three rows must come back in insertion order;
                # any surplus row is caught by the final count check.
                if counter < 3:
                    self.assertEqual(row[0], counter)
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def CheckBpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        con = self._connect()
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()
381
Serhiy Storchaka42d67af2014-09-11 13:29:05 +0300382
class UnhashableFunc:
    """Callable stub that counts its invocations and cannot be hashed.

    Every call increments ``calls`` and returns the ``return_value`` given
    to the constructor, whatever arguments it was called with.
    """

    # Setting __hash__ to None makes hash(instance) raise TypeError.
    __hash__ = None

    def __init__(self, return_value=None):
        self.return_value = return_value
        self.calls = 0

    def __call__(self, *args, **kwargs):
        self.calls = self.calls + 1
        return self.return_value
393
394
class UnhashableCallbacksTestCase(unittest.TestCase):
    """
    https://bugs.python.org/issue34052

    Registering unhashable callbacks raises TypeError, callbacks are not
    registered in SQLite after such registration attempt.
    """

    def setUp(self):
        self.con = sqlite.connect(':memory:')

    def tearDown(self):
        self.con.close()

    def test_progress_handler(self):
        handler = UnhashableFunc(return_value=0)
        with self.assertRaisesRegex(TypeError, 'unhashable type'):
            self.con.set_progress_handler(handler, 1)
        # The rejected handler must not be invoked by later statements.
        self.con.execute('SELECT 1')
        self.assertFalse(handler.calls)

    def test_func(self):
        func_name = 'func_name'
        callback = UnhashableFunc()
        with self.assertRaisesRegex(TypeError, 'unhashable type'):
            self.con.create_function(func_name, 0, callback)
        # SQLite must not know the function after the failed registration.
        expected = 'no such function: %s' % func_name
        with self.assertRaisesRegex(sqlite.OperationalError, expected):
            self.con.execute('SELECT %s()' % func_name)
        self.assertFalse(callback.calls)

    def test_authorizer(self):
        authorizer = UnhashableFunc(return_value=sqlite.SQLITE_DENY)
        with self.assertRaisesRegex(TypeError, 'unhashable type'):
            self.con.set_authorizer(authorizer)
        # Were the denying authorizer installed, this statement would fail.
        self.con.execute('SELECT 1')
        self.assertFalse(authorizer.calls)

    def test_aggr(self):
        # A metaclass without __hash__ yields an unhashable aggregate class.
        class UnhashableType(type):
            __hash__ = None

        aggr_name = 'aggr_name'
        aggregate_class = UnhashableType('Aggr', (), {})
        with self.assertRaisesRegex(TypeError, 'unhashable type'):
            self.con.create_aggregate(aggr_name, 0, aggregate_class)
        expected = 'no such function: %s' % aggr_name
        with self.assertRaisesRegex(sqlite.OperationalError, expected):
            self.con.execute('SELECT %s()' % aggr_name)
441
442
def suite():
    """Return a TestSuite holding every test case defined in this module.

    ``RegressionTests`` methods are collected by their ``Check`` prefix;
    ``UnhashableCallbacksTestCase`` uses the default ``test`` prefix.
    """
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13;
    # a TestLoader with a custom method prefix collects the same tests.
    check_loader = unittest.TestLoader()
    check_loader.testMethodPrefix = "Check"
    regression_suite = check_loader.loadTestsFromTestCase(RegressionTests)
    return unittest.TestSuite((
        regression_suite,
        unittest.TestLoader().loadTestsFromTestCase(UnhashableCallbacksTestCase),
    ))
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000449
def test():
    """Run the module's whole test suite with a plain-text runner."""
    unittest.TextTestRunner().run(suite())
453
# Run the regression suite when this file is executed as a script.
if __name__ == "__main__":
    test()