#-*- coding: ISO-8859-1 -*-
# pysqlite2/test/dbapi.py: tests for DB-API compliance
#
# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
23
24import unittest
Gerhard Häring1cc60ed2008-02-29 22:08:41 +000025import sys
Anthony Baxterc51ee692006-04-01 00:57:31 +000026import sqlite3 as sqlite
Victor Stinner6a102812010-04-27 23:55:59 +000027try:
28 import threading
29except ImportError:
30 threading = None
Anthony Baxterc51ee692006-04-01 00:57:31 +000031
class ModuleTests(unittest.TestCase):
    """
    Checks the module-level constants and the exception class hierarchy
    that the sqlite module exposes for DB-API 2.0 compliance.
    """

    def CheckAPILevel(self):
        self.assertEqual(sqlite.apilevel, "2.0",
                         "apilevel is %s, should be 2.0" % sqlite.apilevel)

    def CheckThreadSafety(self):
        self.assertEqual(sqlite.threadsafety, 1,
                         "threadsafety is %d, should be 1" % sqlite.threadsafety)

    def CheckParamStyle(self):
        self.assertEqual(sqlite.paramstyle, "qmark",
                         "paramstyle is '%s', should be 'qmark'" %
                         sqlite.paramstyle)

    # The two root classes must derive from the built-in StandardError.

    def CheckWarning(self):
        # assertTrue instead of the deprecated assert_ alias, like every
        # other check in this class.
        self.assertTrue(issubclass(sqlite.Warning, StandardError),
                        "Warning is not a subclass of StandardError")

    def CheckError(self):
        self.assertTrue(issubclass(sqlite.Error, StandardError),
                        "Error is not a subclass of StandardError")

    # Error -> InterfaceError / DatabaseError

    def CheckInterfaceError(self):
        self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
                        "InterfaceError is not a subclass of Error")

    def CheckDatabaseError(self):
        self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
                        "DatabaseError is not a subclass of Error")

    # DatabaseError -> the six specialised database errors

    def CheckDataError(self):
        self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
                        "DataError is not a subclass of DatabaseError")

    def CheckOperationalError(self):
        self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
                        "OperationalError is not a subclass of DatabaseError")

    def CheckIntegrityError(self):
        self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
                        "IntegrityError is not a subclass of DatabaseError")

    def CheckInternalError(self):
        self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
                        "InternalError is not a subclass of DatabaseError")

    def CheckProgrammingError(self):
        self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
                        "ProgrammingError is not a subclass of DatabaseError")

    def CheckNotSupportedError(self):
        self.assertTrue(issubclass(sqlite.NotSupportedError,
                                   sqlite.DatabaseError),
                        "NotSupportedError is not a subclass of DatabaseError")
86
class ConnectionTests(unittest.TestCase):
    """
    Tests for Connection objects.

    Every test runs against a fresh in-memory database that contains a
    table "test" with one inserted (uncommitted) row.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        cu = self.cx.cursor()
        cu.execute("create table test(id integer primary key, name text)")
        cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def CheckCommit(self):
        self.cx.commit()

    def CheckCommitAfterNoChanges(self):
        """
        A commit should also work when no changes were made to the database.
        """
        self.cx.commit()
        self.cx.commit()

    def CheckRollback(self):
        self.cx.rollback()

    def CheckRollbackAfterNoChanges(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        self.cx.rollback()
        self.cx.rollback()

    def CheckCursor(self):
        # Only checks that a cursor can be created; the result is not needed.
        self.cx.cursor()

    def CheckFailedOpen(self):
        # The parent directories are not expected to exist, so the open
        # has to fail.
        YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
        with self.assertRaises(sqlite.OperationalError):
            sqlite.connect(YOU_CANNOT_OPEN_THIS)

    def CheckClose(self):
        # tearDown() closes the connection a second time; that must work too.
        self.cx.close()

    def CheckExceptions(self):
        # Optional DB-API extension: the exception classes are also
        # reachable as attributes of the connection.
        self.assertEqual(self.cx.Warning, sqlite.Warning)
        self.assertEqual(self.cx.Error, sqlite.Error)
        self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
        self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
        self.assertEqual(self.cx.DataError, sqlite.DataError)
        self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
        self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
        self.assertEqual(self.cx.InternalError, sqlite.InternalError)
        self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
        self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000143
class CursorTests(unittest.TestCase):
    """
    Tests for Cursor objects: parameter handling of execute() and
    executemany(), rowcount, the fetch* methods and optional DB-API
    extensions.

    Every test runs against a fresh in-memory database whose table "test"
    holds exactly one row (name 'foo').

    Expected exceptions are checked with assertRaises, so a statement that
    raises nothing is reported as a failure and an unexpected exception is
    reported as an error with its own traceback.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("create table test(id integer primary key, name text, income number)")
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckExecuteNoArgs(self):
        self.cu.execute("delete from test")

    def CheckExecuteIllegalSql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def CheckExecuteTooMuchSql(self):
        # More than one statement in a single execute() call.
        with self.assertRaises(sqlite.Warning):
            self.cu.execute("select 5+4; select 4+5")

    def CheckExecuteTooMuchSql2(self):
        # A trailing comment after the statement is accepted.
        self.cu.execute("select 5+4; -- foo bar")

    def CheckExecuteTooMuchSql3(self):
        # So is a trailing block comment.
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
        """)

    def CheckExecuteWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.execute(42)

    def CheckExecuteArgInt(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def CheckExecuteArgFloat(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def CheckExecuteArgString(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def CheckExecuteWrongNoOfArgs1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def CheckExecuteWrongNoOfArgs2(self):
        # too few parameters
        # NOTE(review): this statement is identical to the one in
        # CheckExecuteWrongNoOfArgs3 -- confirm whether a different case
        # was intended here.
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def CheckExecuteWrongNoOfArgs3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def CheckExecuteParamList(self):
        # A list is accepted as the parameter sequence.
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequence(self):
        # Any object that offers __len__ and __getitem__ is accepted as the
        # parameter sequence.
        class L(object):
            def __len__(self):
                return 1
            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping_Mapping(self):
        # The named parameter is resolved through dict.__missing__.
        # (The former "Python < 2.5" early return was dead code: this file
        # already needs unittest.skipUnless, which requires Python 2.7.)
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMappingTooLittleArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def CheckExecuteDictMappingNoArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def CheckExecuteDictMappingUnnamed(self):
        # A mapping is passed although the statement uses a "?" placeholder.
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def CheckClose(self):
        # tearDown() closes the cursor a second time; that must work too.
        self.cu.close()

    def CheckRowcountExecute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def CheckRowcountSelect(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def CheckRowcountExecutemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def CheckTotalChanges(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.assertTrue(self.cx.total_changes >= 2,
                        "total changes reported wrong value")

    # Checks for executemany:
    # Sequences are required by the DB-API; iterators are an
    # enhancement in pysqlite.

    def CheckExecuteManySequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def CheckExecuteManyIterator(self):
        # Hand-written iterator (Python 2 protocol: next()) yielding
        # (6,) .. (10,).
        class MyIter:
            def __init__(self):
                self.value = 5

            def next(self):
                if self.value == 10:
                    raise StopIteration
                else:
                    self.value += 1
                    return (self.value,)

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def CheckExecuteManyGenerator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def CheckExecuteManyWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.executemany(42, [(3,)])

    def CheckExecuteManySelect(self):
        # executemany() is expected to reject a SELECT statement.
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def CheckExecuteManyNotIterable(self):
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def CheckFetchIter(self):
        # Optional DB-API extension: the cursor itself is iterable.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        lst = [row[0] for row in self.cu]
        self.assertEqual(lst[0], 5)
        self.assertEqual(lst[1], 6)

    def CheckFetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        # The result set is exhausted after the single row.
        row = self.cu.fetchone()
        self.assertEqual(row, None)

    def CheckFetchoneNoStatement(self):
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertEqual(row, None)

    def CheckArraySize(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        # fetchmany() without a size argument must honour arraysize
        self.assertEqual(len(res), 2)

    def CheckFetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def CheckFetchmanyKwArg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def CheckFetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def CheckSetinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def CheckSetoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def CheckSetoutputsizeNoColumn(self):
        self.cu.setoutputsize(42)

    def CheckCursorConnection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def CheckWrongCursorCallable(self):
        # A cursor factory that does not return a cursor must be rejected.
        def f(): pass
        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def CheckCursorWrongClass(self):
        # A Cursor cannot be built on something that is not a Connection.
        class Foo: pass
        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)
470
@unittest.skipUnless(threading, 'This test requires threading.')
class ThreadTests(unittest.TestCase):
    """
    Checks that using a connection or cursor from a thread other than the
    one that ran setUp() raises ProgrammingError.

    The connection and cursor are created in setUp(); each test then runs
    a single operation on them from a freshly started thread.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _assert_programming_error_in_thread(self, func, *args):
        # Call func(*args) in a new thread and fail the test unless the call
        # raised sqlite.ProgrammingError there.  Problems are collected in a
        # list because an exception inside the worker thread would not reach
        # the test runner.  The name does not start with "Check", so suite()
        # does not collect this helper as a test.
        errors = []

        def run():
            try:
                func(*args)
            except sqlite.ProgrammingError:
                return
            except Exception as exc:
                errors.append("raised wrong exception: %r" % (exc,))
            else:
                errors.append("did not raise ProgrammingError")

        t = threading.Thread(target=run)
        t.start()
        t.join()
        if errors:
            self.fail("\n".join(errors))

    def CheckConCursor(self):
        self._assert_programming_error_in_thread(self.con.cursor)

    def CheckConCommit(self):
        self._assert_programming_error_in_thread(self.con.commit)

    def CheckConRollback(self):
        self._assert_programming_error_in_thread(self.con.rollback)

    def CheckConClose(self):
        self._assert_programming_error_in_thread(self.con.close)

    def CheckCurImplicitBegin(self):
        self._assert_programming_error_in_thread(
            self.cur.execute, "insert into test(name) values ('a')")

    def CheckCurClose(self):
        self._assert_programming_error_in_thread(self.cur.close)

    def CheckCurExecute(self):
        # The insert happens in the owning thread; only the select runs in
        # the foreign thread.
        self.cur.execute("insert into test(name) values ('a')")
        self._assert_programming_error_in_thread(
            self.cur.execute, "select name from test")

    def CheckCurIterNext(self):
        # The result set is prepared in the owning thread; only the fetch
        # runs in the foreign thread.
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._assert_programming_error_in_thread(self.cur.fetchone)
628
class ConstructorTests(unittest.TestCase):
    """
    Smoke tests for the DB-API type constructors: each test only checks
    that the call succeeds; the returned object is not inspected.
    """

    def CheckDate(self):
        sqlite.Date(2004, 10, 28)

    def CheckTime(self):
        sqlite.Time(12, 39, 35)

    def CheckTimestamp(self):
        sqlite.Timestamp(2004, 10, 28, 12, 39, 35)

    def CheckDateFromTicks(self):
        sqlite.DateFromTicks(42)

    def CheckTimeFromTicks(self):
        sqlite.TimeFromTicks(42)

    def CheckTimestampFromTicks(self):
        sqlite.TimestampFromTicks(42)

    def CheckBinary(self):
        # A NUL byte followed by a single quote.
        sqlite.Binary(chr(0) + "'")
650
class ExtensionTests(unittest.TestCase):
    """
    Tests for pysqlite extensions beyond the DB-API: executescript() and
    the execute*() shortcut methods on the connection.

    Every test gets its own in-memory connection from setUp(), and
    tearDown() closes it again so no connection is left open.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def CheckScriptStringSql(self):
        cur = self.con.cursor()
        cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckScriptStringUnicode(self):
        # Same as above, but the script is passed as a unicode string.
        cur = self.con.cursor()
        cur.executescript(u"""
            create table a(i);
            insert into a(i) values (5);
            select i from a;
            delete from a;
            insert into a(i) values (6);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 6)

    def CheckScriptSyntaxError(self):
        # The middle statement is not valid SQL.
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(x); asdf; create table test2(x)")

    def CheckScriptErrorNormal(self):
        # The script is syntactically fine but selects from a missing table.
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def CheckConnectionExecute(self):
        result = self.con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def CheckConnectionExecutemany(self):
        con = self.con
        con.execute("create table test(foo)")
        con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def CheckConnectionExecutescript(self):
        con = self.con
        con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000717
class ClosedConTests(unittest.TestCase):
    """
    Checks that every operation on a closed connection, or on a cursor
    whose connection was closed, raises ProgrammingError.

    setUp() provides the already-closed connection (self.con) and a cursor
    that was created on it before closing (self.cur).  Expected exceptions
    are checked with assertRaises, so an unexpected exception surfaces with
    its own traceback.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.con.close()

    def CheckClosedConCursor(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.cursor()

    def CheckClosedConCommit(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.commit()

    def CheckClosedConRollback(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.rollback()

    def CheckClosedCurExecute(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.cur.execute("select 4")

    def CheckClosedCreateFunction(self):
        def f(x): return 17
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.create_function("foo", 1, f)

    def CheckClosedCreateAggregate(self):
        class Agg:
            def __init__(self):
                pass
            def step(self, x):
                pass
            def finalize(self):
                return 17
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.create_aggregate("foo", 1, Agg)

    def CheckClosedSetAuthorizer(self):
        def authorizer(*args):
            return sqlite.DENY
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.set_authorizer(authorizer)

    def CheckClosedSetProgressCallback(self):
        def progress(): pass
        with self.assertRaises(sqlite.ProgrammingError):
            self.con.set_progress_handler(progress, 100)

    def CheckClosedCall(self):
        # Calling the connection object itself must be rejected as well.
        with self.assertRaises(sqlite.ProgrammingError):
            self.con()
835
class ClosedCurTests(unittest.TestCase):
    """
    Checks that the execute* and fetch* methods of a closed cursor raise
    ProgrammingError.

    The connection stays open during the test (only the cursor is closed)
    and is closed in tearDown().
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.close()

    def tearDown(self):
        self.con.close()

    def CheckClosed(self):
        for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
            # Pick arguments that fit the signature of the method under test.
            if method_name in ("execute", "executescript"):
                params = ("select 4 union select 5",)
            elif method_name == "executemany":
                params = ("insert into foo(bar) values (?)", [(3,), (4,)])
            else:
                params = []

            method = getattr(self.cur, method_name)
            # self.fail() lives in the else branch so that its own exception
            # is never caught here; any exception other than
            # ProgrammingError propagates with its traceback.
            try:
                method(*params)
            except sqlite.ProgrammingError:
                pass
            else:
                self.fail("Should have raised a ProgrammingError: method " + method_name)
865
def suite():
    """
    Build one TestSuite that holds the tests of every TestCase class in
    this module.  Only methods whose name starts with "Check" are collected.
    """
    test_classes = (
        ModuleTests,
        ConnectionTests,
        CursorTests,
        ThreadTests,
        ConstructorTests,
        ExtensionTests,
        ClosedConTests,
        ClosedCurTests,
    )
    return unittest.TestSuite(tuple(unittest.makeSuite(cls, "Check") for cls in test_classes))
Anthony Baxterc51ee692006-04-01 00:57:31 +0000876
def test():
    """Run everything returned by suite() with a plain text test runner."""
    unittest.TextTestRunner().run(suite())
880
# Run the complete suite when this file is executed as a script.
if __name__ == "__main__":
    test()