#-*- coding: iso-8859-1 -*-
# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

Gerhard Häringe7ea7452008-03-29 00:45:29 +000024import datetime
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000025import unittest
Thomas Wouters477c8d52006-05-27 19:21:47 +000026import sqlite3 as sqlite
Oren Milmane56ab742017-11-07 02:01:47 +020027import weakref
28from test import support
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000029
class RegressionTests(unittest.TestCase):
    """Regression tests for bugs that were fixed in pysqlite / sqlite3.

    Test methods use the ``Check`` prefix instead of ``test``, so they are
    only collected by a loader configured with that prefix (see ``suite()``).
    Every test gets a fresh in-memory database in ``self.con``; tests that
    need special connection options open their own connection and are
    responsible for closing it.
    """

    def setUp(self):
        """Open the shared in-memory connection used by most tests."""
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        """Close the shared connection opened in setUp()."""
        self.con.close()

    def CheckPragmaUserVersion(self):
        # This used to crash pysqlite because this pragma command returns
        # NULL for the column name.
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def CheckPragmaSchemaVersion(self):
        # This still crashed pysqlite <= 2.2.1
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        try:
            # The cursor must come from the PARSE_COLNAMES connection created
            # above (not self.con), otherwise that connection is never used.
            cur = con.cursor()
            try:
                cur.execute("pragma schema_version")
            finally:
                cur.close()
        finally:
            con.close()

    def CheckStatementReset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = sqlite.connect(":memory:", cached_statements=5)
        self.addCleanup(con.close)
        cursors = [con.cursor() for x in range(5)]
        cursors[0].execute("create table test(x)")
        for i in range(10):
            cursors[0].executemany("insert into test(x) values (?)",
                                   [(x,) for x in range(10)])

        # Leading spaces make each statement text distinct.
        for i in range(5):
            cursors[i].execute(" " * i + "select x from test")

        con.rollback()

    def CheckColumnNameWithSpaces(self):
        """Column names containing spaces must be reported correctly."""
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def CheckStatementFinalizationOnCloseDb(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the statement
        # cache when closing the database. Statements that were still
        # referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        con.close()

    @unittest.skipIf(sqlite.sqlite_version_info < (3, 2, 2),
                     'needs sqlite 3.2.2 or newer')
    def CheckOnConflictRollback(self):
        """commit() must work after an ON CONFLICT ROLLBACK was triggered."""
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.execute("create table foo(x, unique(x) on conflict rollback)")
        con.execute("insert into foo(x) values (1)")
        try:
            con.execute("insert into foo(x) values (1)")
        except sqlite.DatabaseError:
            # The duplicate insert is only there to trigger the conflict
            # handling; an error from it is tolerated.
            pass
        con.execute("insert into foo(x) values (2)")
        try:
            con.commit()
        except sqlite.OperationalError:
            self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def CheckWorkaroundForBuggySqliteTransferBindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def CheckEmptyStatement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def CheckTypeMapUsage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        con.execute("create table foo(bar timestamp)")
        con.execute("insert into foo(bar) values (?)",
                    (datetime.datetime.now(),))
        con.execute(SELECT)
        # Recreate the table with a different declared type and run the very
        # same statement text again.
        con.execute("drop table foo")
        con.execute("create table foo(bar integer)")
        con.execute("insert into foo(bar) values (5)")
        con.execute(SELECT)

    def CheckErrorMsgDecodeError(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (bytes(bytearray([250])),)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def CheckRegisterAdapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def CheckSetIsolationLevel(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None
            def __del__(self):
                con.isolation_level = ""

        con = sqlite.connect(":memory:")
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def CheckCursorConstructorCallCheck(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                pass

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = Cursor(con)
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4+5").fetchall()

    def CheckStrSubclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses
        of str.
        """
        class MyStr(str): pass
        self.con.execute("select ?", (MyStr("abc"),))

    def CheckConnectionConstructorCallCheck(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                pass

        # No close() here: the base class was never initialised, which is
        # exactly the condition under test.
        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckCursorRegistration(self):
        """
        Verifies that subclassed cursor classes are correctly registered with
        the connection object, too.  (fetch-across-rollback problem)
        """
        class Connection(sqlite.Connection):
            def cursor(self):
                return Cursor(self)

        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                sqlite.Cursor.__init__(self, con)

        con = Connection(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("create table foo(x)")
        cur.executemany("insert into foo(x) values (?)", [(3,), (4,), (5,)])
        cur.execute("select x from foo")
        con.rollback()
        with self.assertRaises(sqlite.InterfaceError):
            cur.fetchall()

    def CheckAutoCommit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        con = sqlite.connect(":memory:", isolation_level=None)
        self.addCleanup(con.close)

    def CheckPragmaAutocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only the fact that fetching succeeds matters; the value is unused.
        cur.fetchone()

    def CheckSetDict(self):
        """
        See http://bugs.python.org/issue7478

        It was possible to successfully register callbacks that could not be
        hashed. Return codes of PyDict_SetItem were not checked properly.
        """
        class NotHashable:
            def __call__(self, *args, **kw):
                pass
            def __hash__(self):
                raise TypeError()
        var = NotHashable()
        self.assertRaises(TypeError, self.con.create_function, var)
        self.assertRaises(TypeError, self.con.create_aggregate, var)
        self.assertRaises(TypeError, self.con.set_authorizer, var)
        self.assertRaises(TypeError, self.con.set_progress_handler, var)

    def CheckConnectionCall(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(sqlite.Warning, self.con, 1)

    def CheckCollation(self):
        """create_collation() must reject a name that cannot be encoded."""
        def collation_cb(a, b):
            return 1
        self.assertRaises(sqlite.ProgrammingError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def CheckRecursiveCursorUse(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator
        led to segfaults. Now we catch recursive cursor usage and raise a
        ProgrammingError.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)

        cur = con.cursor()
        cur.execute("create table a (bar)")
        cur.execute("create table b (baz)")

        def foo():
            # Re-enters the same cursor while executemany() is consuming us.
            cur.execute("insert into a (bar) values (?)", (1,))
            yield 1

        with self.assertRaises(sqlite.ProgrammingError):
            cur.executemany("insert into b (baz) values (?)",
                            ((i,) for i in foo()))

    def CheckConvertTimestampMicrosecondPadding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.execute("CREATE TABLE t (x TIMESTAMP)")

        # Microseconds should be 456000
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

        # Microseconds should be truncated to 123456
        cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

        cur.execute("SELECT * FROM t")
        values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def CheckInvalidIsolationLevelType(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def CheckNullCharacter(self):
        # Issue #21147
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        self.assertRaises(ValueError, con, "\0select 1")
        self.assertRaises(ValueError, con, "select 1\0")
        cur = con.cursor()
        self.assertRaises(ValueError, cur.execute, " \0select 2")
        self.assertRaises(ValueError, cur.execute, "select 2\0")

    def CheckCommitCursorReset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        to return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.executescript("""
        create table t(c);
        create table t2(c);
        insert into t values(0);
        insert into t values(1);
        insert into t values(2);
        """)

        self.assertEqual(con.isolation_level, "")

        counter = 0
        for i, row in enumerate(con.execute("select c from t")):
            with self.subTest(i=i, row=row):
                # Commit while the select cursor is still being iterated.
                con.execute("insert into t2(c) values (?)", (i,))
                con.commit()
                if counter == 0:
                    self.assertEqual(row[0], 0)
                elif counter == 1:
                    self.assertEqual(row[0], 1)
                elif counter == 2:
                    self.assertEqual(row[0], 2)
                counter += 1
        self.assertEqual(counter, 3, "should have returned exactly three rows")

    def CheckBpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = sqlite.Cursor(con)
        ref = weakref.ref(cur, callback)
        cur.__init__(con)
        del cur
        # The interpreter shouldn't crash when ref is collected.
        del ref
        support.gc_collect()

Serhiy Storchaka42d67af2014-09-11 13:29:05 +0300397
def suite():
    """Build the test suite containing every ``Check*`` regression test.

    Returns:
        A ``unittest.TestSuite`` wrapping the tests collected from
        ``RegressionTests``.
    """
    # unittest.makeSuite() is deprecated and has been removed from recent
    # Python versions; a loader with a custom method prefix collects the
    # same "Check..." methods.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    regression_suite = loader.loadTestsFromTestCase(RegressionTests)
    return unittest.TestSuite((regression_suite,))

401
def test():
    """Run the regression suite with unittest's plain-text runner."""
    unittest.TextTestRunner().run(suite())

405
# Run the regression tests when this module is executed as a script.
if __name__ == "__main__":
    test()