blob: 04649fc5492d77efd7abe271beaf5298caf8e70d [file] [log] [blame]
#-*- coding: iso-8859-1 -*-
# pysqlite2/test/dbapi.py: tests for DB-API compliance
#
# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
23
24import unittest
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000025import sqlite3 as sqlite
Victor Stinner45df8202010-04-28 22:31:17 +000026try:
27 import threading
28except ImportError:
29 threading = None
Thomas Wouters49fd7fa2006-04-21 10:40:58 +000030
Antoine Pitrou902fc8b2013-02-10 00:02:44 +010031from test.support import TESTFN, unlink
32
33
class ModuleTests(unittest.TestCase):
    """Check the module-level DB-API 2.0 globals and the exception hierarchy."""

    def _assert_subclass(self, cls, base):
        """Fail unless *cls* is a subclass of *base*."""
        self.assertTrue(issubclass(cls, base),
                        "%s is not a subclass of %s"
                        % (cls.__name__, base.__name__))

    def CheckAPILevel(self):
        self.assertEqual(sqlite.apilevel, "2.0",
                         "apilevel is %s, should be 2.0" % sqlite.apilevel)

    def CheckThreadSafety(self):
        # Older sqlite3 releases hard-code 1; newer ones derive the level
        # from how the underlying SQLite library was compiled, so accept
        # every level the module documents.
        self.assertIn(sqlite.threadsafety, (0, 1, 3),
                      "threadsafety is %d, should be 0, 1 or 3" %
                      sqlite.threadsafety)

    def CheckParamStyle(self):
        self.assertEqual(sqlite.paramstyle, "qmark",
                         "paramstyle is '%s', should be 'qmark'" %
                         sqlite.paramstyle)

    def CheckWarning(self):
        self._assert_subclass(sqlite.Warning, Exception)

    def CheckError(self):
        self._assert_subclass(sqlite.Error, Exception)

    def CheckInterfaceError(self):
        self._assert_subclass(sqlite.InterfaceError, sqlite.Error)

    def CheckDatabaseError(self):
        self._assert_subclass(sqlite.DatabaseError, sqlite.Error)

    def CheckDataError(self):
        self._assert_subclass(sqlite.DataError, sqlite.DatabaseError)

    def CheckOperationalError(self):
        self._assert_subclass(sqlite.OperationalError, sqlite.DatabaseError)

    def CheckIntegrityError(self):
        self._assert_subclass(sqlite.IntegrityError, sqlite.DatabaseError)

    def CheckInternalError(self):
        self._assert_subclass(sqlite.InternalError, sqlite.DatabaseError)

    def CheckProgrammingError(self):
        self._assert_subclass(sqlite.ProgrammingError, sqlite.DatabaseError)

    def CheckNotSupportedError(self):
        self._assert_subclass(sqlite.NotSupportedError, sqlite.DatabaseError)
88
class ConnectionTests(unittest.TestCase):
    """Exercise the Connection API against an in-memory database.

    Every check starts with a table ``test(id, name)`` that holds one row.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def CheckCommit(self):
        self.cx.commit()

    def CheckCommitAfterNoChanges(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def CheckRollback(self):
        self.cx.rollback()

    def CheckRollbackAfterNoChanges(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def CheckCursor(self):
        # Smoke test: creating a cursor must not raise.
        self.cx.cursor()

    def CheckFailedOpen(self):
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(YOU_CANNOT_OPEN_THIS)

    def CheckClose(self):
        # tearDown closes the connection a second time.
        self.cx.close()

    def CheckExceptions(self):
        # Optional DB-API extension.
        for name in ("Warning", "Error", "InterfaceError", "DatabaseError",
                     "DataError", "OperationalError", "IntegrityError",
                     "InternalError", "ProgrammingError",
                     "NotSupportedError"):
            self.assertEqual(getattr(self.cx, name), getattr(sqlite, name),
                             name)

    def CheckInTransaction(self):
        # Can't use db from setUp because we want to test initial state.
        cx = sqlite.connect(":memory:")
        self.addCleanup(cx.close)
        cu = cx.cursor()
        self.assertEqual(cx.in_transaction, False)
        cu.execute("create table transactiontest(id integer primary key, name text)")
        self.assertEqual(cx.in_transaction, False)
        cu.execute("insert into transactiontest(name) values (?)", ("foo",))
        self.assertEqual(cx.in_transaction, True)
        cu.execute("select name from transactiontest where name=?", ["foo"])
        cu.fetchone()
        self.assertEqual(cx.in_transaction, True)
        cx.commit()
        self.assertEqual(cx.in_transaction, False)
        cu.execute("select name from transactiontest where name=?", ["foo"])
        cu.fetchone()
        self.assertEqual(cx.in_transaction, False)

    def CheckInTransactionRO(self):
        with self.assertRaises(AttributeError):
            self.cx.in_transaction = True

    def CheckOpenUri(self):
        if sqlite.sqlite_version_info < (3, 7, 7):
            with self.assertRaises(sqlite.NotSupportedError):
                sqlite.connect(':memory:', uri=True)
            return
        self.addCleanup(unlink, TESTFN)

        # "with connection" only commits or rolls back; it does not close
        # the connection, so each one is closed explicitly before the
        # database file is removed.
        cx = sqlite.connect(TESTFN)
        try:
            with cx:
                cx.execute('create table test(id integer)')
        finally:
            cx.close()

        cx = sqlite.connect('file:' + TESTFN, uri=True)
        try:
            with cx:
                cx.execute('insert into test(id) values(0)')
        finally:
            cx.close()

        cx = sqlite.connect('file:' + TESTFN + '?mode=ro', uri=True)
        try:
            with cx:
                with self.assertRaises(sqlite.OperationalError):
                    cx.execute('insert into test(id) values(1)')
        finally:
            cx.close()
182
183
class CursorTests(unittest.TestCase):
    """Exercise the Cursor API against an in-memory database.

    Every check starts with a table ``test(id, name, income)`` that holds a
    single row whose name is 'foo'.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("create table test(id integer primary key, name text, income number)")
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckExecuteNoArgs(self):
        self.cu.execute("delete from test")

    def CheckExecuteIllegalSql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def CheckExecuteTooMuchSql(self):
        # Historically reported as sqlite.Warning; newer sqlite3 releases
        # raise ProgrammingError for the same condition.
        with self.assertRaises((sqlite.Warning, sqlite.ProgrammingError)):
            self.cu.execute("select 5+4; select 4+5")

    def CheckExecuteTooMuchSql2(self):
        # A trailing line comment does not count as a second statement.
        self.cu.execute("select 5+4; -- foo bar")

    def CheckExecuteTooMuchSql3(self):
        # Neither does a trailing block comment.
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
            """)

    def CheckExecuteWrongSqlArg(self):
        # ValueError on older sqlite3 releases, TypeError on newer ones.
        with self.assertRaises((TypeError, ValueError)):
            self.cu.execute(42)

    def CheckExecuteArgInt(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def CheckExecuteArgFloat(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def CheckExecuteArgString(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def CheckExecuteArgStringWithZeroByte(self):
        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))

        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
        row = self.cu.fetchone()
        self.assertEqual(row[0], "Hu\x00go")

    def CheckExecuteWrongNoOfArgs1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def CheckExecuteWrongNoOfArgs2(self):
        # too few parameters: two placeholders, one value
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id, name) values (?, ?)", (17,))

    def CheckExecuteWrongNoOfArgs3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def CheckExecuteParamList(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequence(self):
        # Minimal sequence: only __len__ and __getitem__ are provided.
        class L(object):
            def __len__(self):
                return 1
            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping_Mapping(self):
        # The value for :name comes from __missing__, not from a stored key.
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMappingTooLittleArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def CheckExecuteDictMappingNoArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def CheckExecuteDictMappingUnnamed(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def CheckClose(self):
        # tearDown closes the cursor a second time.
        self.cu.close()

    def CheckRowcountExecute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def CheckRowcountSelect(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def CheckRowcountExecutemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def CheckTotalChanges(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertGreaterEqual(self.cx.total_changes, 2,
                                "total changes reported wrong value")

    # Checks for executemany:
    # Sequences are required by the DB-API, iterators
    # are enhancements in pysqlite.

    def CheckExecuteManySequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def CheckExecuteManyIterator(self):
        class MyIter:
            def __init__(self):
                self.value = 5

            # A well-formed iterator returns itself from __iter__.
            def __iter__(self):
                return self

            def __next__(self):
                if self.value == 10:
                    raise StopIteration
                self.value += 1
                return (self.value,)

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def CheckExecuteManyGenerator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def CheckExecuteManyWrongSqlArg(self):
        # ValueError on older sqlite3 releases, TypeError on newer ones.
        with self.assertRaises((TypeError, ValueError)):
            self.cu.executemany(42, [(3,)])

    def CheckExecuteManySelect(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def CheckExecuteManyNotIterable(self):
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def CheckFetchIter(self):
        # Optional DB-API extension.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        lst = [row[0] for row in self.cu]
        self.assertEqual(lst[0], 5)
        self.assertEqual(lst[1], 6)

    def CheckFetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        row = self.cu.fetchone()
        self.assertIsNone(row)

    def CheckFetchoneNoStatement(self):
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertIsNone(row)

    def CheckArraySize(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        self.assertEqual(len(res), 2)

    def CheckFetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def CheckFetchmanyKwArg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def CheckFetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def CheckSetinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def CheckSetoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def CheckSetoutputsizeNoColumn(self):
        self.cu.setoutputsize(42)

    def CheckCursorConnection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def CheckWrongCursorCallable(self):
        def f(): pass
        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def CheckCursorWrongClass(self):
        class Foo: pass
        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)
513
@unittest.skipUnless(threading, 'This test requires threading.')
class ThreadTests(unittest.TestCase):
    """Check that a connection and its cursor reject use from another thread.

    Each check runs one operation in a second thread and expects it to raise
    ProgrammingError there.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _check_rejected_in_other_thread(self, operation, *args):
        """Call ``operation(*args)`` in a new thread; fail unless it raises
        ProgrammingError there."""
        errors = []

        def run():
            try:
                operation(*args)
            except sqlite.ProgrammingError:
                pass
            except Exception:
                errors.append("raised wrong exception")
            else:
                errors.append("did not raise ProgrammingError")

        worker = threading.Thread(target=run)
        worker.start()
        worker.join()
        # Outcomes are collected in a list because an exception raised in
        # the worker would not reach the test runner.
        if errors:
            self.fail("\n".join(errors))

    def CheckConCursor(self):
        self._check_rejected_in_other_thread(self.con.cursor)

    def CheckConCommit(self):
        self._check_rejected_in_other_thread(self.con.commit)

    def CheckConRollback(self):
        self._check_rejected_in_other_thread(self.con.rollback)

    def CheckConClose(self):
        self._check_rejected_in_other_thread(self.con.close)

    def CheckCurImplicitBegin(self):
        self._check_rejected_in_other_thread(
            self.cur.execute, "insert into test(name) values ('a')")

    def CheckCurClose(self):
        self._check_rejected_in_other_thread(self.cur.close)

    def CheckCurExecute(self):
        self.cur.execute("insert into test(name) values ('a')")
        self._check_rejected_in_other_thread(
            self.cur.execute, "select name from test")

    def CheckCurIterNext(self):
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._check_rejected_in_other_thread(self.cur.fetchone)
671
class ConstructorTests(unittest.TestCase):
    """Check the DB-API type constructors exported by the module."""

    def CheckDate(self):
        d = sqlite.Date(2004, 10, 28)
        self.assertEqual((d.year, d.month, d.day), (2004, 10, 28))

    def CheckTime(self):
        t = sqlite.Time(12, 39, 35)
        self.assertEqual((t.hour, t.minute, t.second), (12, 39, 35))

    def CheckTimestamp(self):
        ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
        self.assertEqual((ts.year, ts.month, ts.day), (2004, 10, 28))
        self.assertEqual((ts.hour, ts.minute, ts.second), (12, 39, 35))

    # The *FromTicks results depend on the local time zone, so only the
    # type of the result is checked.

    def CheckDateFromTicks(self):
        self.assertIsInstance(sqlite.DateFromTicks(42), sqlite.Date)

    def CheckTimeFromTicks(self):
        self.assertIsInstance(sqlite.TimeFromTicks(42), sqlite.Time)

    def CheckTimestampFromTicks(self):
        self.assertIsInstance(sqlite.TimestampFromTicks(42), sqlite.Timestamp)

    def CheckBinary(self):
        b = sqlite.Binary(b"\0'")
        self.assertEqual(bytes(b), b"\0'")
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000693
class ExtensionTests(unittest.TestCase):
    """Check the non-standard shortcuts: executescript() and the
    Connection.execute*() methods."""

    def _connect(self):
        """Return a fresh in-memory connection that is closed after the test."""
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        return con

    def CheckScriptStringSql(self):
        cur = self._connect().cursor()
        cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckScriptSyntaxError(self):
        cur = self._connect().cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(x); asdf; create table test2(x)")

    def CheckScriptErrorNormal(self):
        cur = self._connect().cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def CheckConnectionExecute(self):
        con = self._connect()
        result = con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def CheckConnectionExecutemany(self):
        con = self._connect()
        con.execute("create table test(foo)")
        con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def CheckConnectionExecutescript(self):
        con = self._connect()
        con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000746
class ClosedConTests(unittest.TestCase):
    """Check that operations on a closed connection raise ProgrammingError."""

    def _closed_connection(self):
        """Return an in-memory connection that has already been closed."""
        con = sqlite.connect(":memory:")
        con.close()
        return con

    def CheckClosedConCursor(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckClosedConCommit(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.commit()

    def CheckClosedConRollback(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.rollback()

    def CheckClosedCurExecute(self):
        # The cursor has to be created while the connection is still open.
        con = sqlite.connect(":memory:")
        cur = con.cursor()
        con.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4")

    def CheckClosedCreateFunction(self):
        con = self._closed_connection()
        def f(x): return 17
        with self.assertRaises(sqlite.ProgrammingError):
            con.create_function("foo", 1, f)

    def CheckClosedCreateAggregate(self):
        con = self._closed_connection()
        class Agg:
            def __init__(self):
                pass
            def step(self, x):
                pass
            def finalize(self):
                return 17
        with self.assertRaises(sqlite.ProgrammingError):
            con.create_aggregate("foo", 1, Agg)

    def CheckClosedSetAuthorizer(self):
        con = self._closed_connection()
        def authorizer(*args):
            return sqlite.DENY
        with self.assertRaises(sqlite.ProgrammingError):
            con.set_authorizer(authorizer)

    def CheckClosedSetProgressCallback(self):
        con = self._closed_connection()
        def progress(): pass
        with self.assertRaises(sqlite.ProgrammingError):
            con.set_progress_handler(progress, 100)

    def CheckClosedCall(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con()
864
class ClosedCurTests(unittest.TestCase):
    """Check that every operation on a closed cursor raises ProgrammingError."""

    def CheckClosed(self):
        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        cur = con.cursor()
        cur.close()

        # (method name, positional arguments) for each cursor method checked.
        calls = (
            ("execute", ("select 4 union select 5",)),
            ("executemany", ("insert into foo(bar) values (?)", [(3,), (4,)])),
            ("executescript", ("select 4 union select 5",)),
            ("fetchall", ()),
            ("fetchmany", ()),
            ("fetchone", ()),
        )
        for method_name, params in calls:
            method = getattr(cur, method_name)
            with self.assertRaises(
                    sqlite.ProgrammingError,
                    msg="Should have raised a ProgrammingError: method " + method_name):
                method(*params)
894
def suite():
    """Return a TestSuite holding every ``Check*`` method of this module.

    The test methods use the prefix "Check" instead of unittest's default
    "test", so the loader is configured with that prefix.
    """
    # unittest.makeSuite() is deprecated and absent from newer Python
    # releases; a TestLoader with a custom prefix collects the same methods.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    test_cases = (
        ModuleTests,
        ConnectionTests,
        CursorTests,
        ThreadTests,
        ConstructorTests,
        ExtensionTests,
        ClosedConTests,
        ClosedCurTests,
    )
    return unittest.TestSuite(
        loader.loadTestsFromTestCase(case) for case in test_cases)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000905
def test():
    """Run the whole suite with unittest's text runner."""
    unittest.TextTestRunner().run(suite())


if __name__ == "__main__":
    test()