#-*- coding: iso-8859-1 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

import contextlib
import datetime
import sqlite3 as sqlite
import unittest
import weakref
from test import support


class RegressionTests(unittest.TestCase):
    """Regression tests for bugs that were once present in pysqlite/sqlite3.

    Test methods use the ``Check`` prefix (not ``test``), so they are only
    collected by a loader configured with that prefix.  Every test gets a
    fresh in-memory database in ``self.con``; tests that need special
    connection options open their own connection.
    """

    def setUp(self):
        """Open the shared in-memory database used by most tests."""
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        """Close the shared in-memory database."""
        self.con.close()

    def _open_memory_db(self, **kwargs):
        """Open a private in-memory database that is closed on test cleanup.

        Keyword arguments are forwarded to ``sqlite.connect``.  Registering
        the close as a cleanup guarantees the connection is released even
        when the test fails half-way through.
        """
        con = sqlite.connect(":memory:", **kwargs)
        self.addCleanup(con.close)
        return con

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns
        # NULL for the column name.
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1.
        # The pragma must run on the PARSE_COLNAMES connection opened here
        # (not on self.con), because the column-name parsing is what crashed.
        with contextlib.closing(
                sqlite.connect(":memory:",
                               detect_types=sqlite.PARSE_COLNAMES)) as con:
            with contextlib.closing(con.cursor()) as cur:
                cur.execute("pragma schema_version")

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = self._open_memory_db(cached_statements=5)
        cursors = [con.cursor() for _ in range(5)]
        cursors[0].execute("create table test(x)")
        for _ in range(10):
            cursors[0].executemany("insert into test(x) values (?)",
                                   [(x,) for x in range(10)])

        # The leading spaces make every statement text distinct, so each
        # cursor holds its own prepared statement.
        for i in range(5):
            cursors[i].execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the
        # statement cache when closing the database. Statements that were
        # still referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        con.close()

    @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2), 'needs sqlite 3.2.2 or newer')
    def CheckOnConflictRollback(self):
        con = self._open_memory_db()
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        # The duplicate value violates the unique constraint, which makes
        # SQLite roll the transaction back implicitly.
        with self.assertRaises(sqlite.DatabaseError):
            con.execute("insert into foo(x) values (1)")
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        select_sql = "select * from foo"
        con = self._open_memory_db(detect_types=sqlite.PARSE_DECLTYPES)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)",
                    (datetime.datetime.now(),))
        con.execute(select_sql)
        # Same statement text, but the column type changes underneath it.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(select_sql)

    def CheckErrorMsgDecodeError(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None

            def __del__(self):
                # Re-enters the isolation_level setter while the instance
                # is being destroyed.
                con.isolation_level = ""

        con = self._open_memory_db()
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                pass

        con = self._open_memory_db()
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()
        with self.assertRaisesRegex(sqlite.ProgrammingError,
                                    r'^Base Cursor\.__init__ not called\.$'):
            cur.close()

    def CheckStrSubclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str):
            pass

        self.con.execute("select ?", (MyStr("abc"),))

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                pass

        # No cleanup is registered: the base __init__ never ran, so there is
        # no open database behind this object.
        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        self._open_memory_db(isolation_level=None)

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        cur.fetchone()

    def CheckSetDict(self):
        """
        See http://bugs.python.org/issue7478

        It was possible to successfully register callbacks that could not be
        hashed. Return codes of PyDict_SetItem were not checked properly.
        """
        class NotHashable:
            def __call__(self, *args, **kw):
                pass

            def __hash__(self):
                raise TypeError()

        var = NotHashable()
        self.assertRaises(TypeError, self.con.create_function, var)
        self.assertRaises(TypeError, self.con.create_aggregate, var)
        self.assertRaises(TypeError, self.con.set_authorizer, var)
        self.assertRaises(TypeError, self.con.set_progress_handler, var)

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckCollation(self):
        def collation_cb(a, b):
            return 1

        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        con = self._open_memory_db()

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Runs while executemany() below is still using the same cursor.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """
        con = self._open_memory_db(detect_types=sqlite.PARSE_DECLTYPES)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        # Issue #21147
        con = self._open_memory_db()
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = self._open_memory_db()
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        rows_seen = 0
        for i, row in enumerate(con.execute("select c from t")):
            # Counted outside the subTest so that a failing row cannot
            # also distort the final row count.
            rows_seen += 1
            with self.subTest(i=i, row=row):
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                # Table t holds 0, 1, 2 in insertion order, so the n-th
                # fetched row must carry the value n.
                self.assertEqual(row[0], i)
        self.assertEqual(rows_seen, 3, "should have returned exactly three rows")

    def CheckBpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass

        con = self._open_memory_db()
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()


def suite():
    """Build the suite containing every ``Check*`` regression test.

    Returns:
        A ``unittest.TestSuite`` wrapping all ``RegressionTests`` methods
        whose names start with ``"Check"``.
    """
    # unittest.makeSuite() is deprecated since Python 3.11 and removed in
    # 3.13; a TestLoader with a custom method prefix is the equivalent.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    regression_suite = loader.loadTestsFromTestCase(RegressionTests)
    return unittest.TestSuite((regression_suite,))

def test():
    """Run the regression suite with a plain text runner.

    Returns:
        The result object produced by ``TextTestRunner.run`` so that callers
        can inspect the outcome; earlier callers that ignored the (``None``)
        return value are unaffected.
    """
    runner = unittest.TextTestRunner()
    return runner.run(suite())

# Allow running this module directly as a script to execute the suite.
if __name__ == "__main__":
    test()