blob: c356d4706c475d4f038e000a3e616b60f60b3e0f [file] [log] [blame]
Anthony Baxterc51ee692006-04-01 00:57:31 +00001#-*- coding: ISO-8859-1 -*-
2# pysqlite2/test/dbapi.py: tests for DB-API compliance
3#
Gerhard Häring3bbb6722010-03-05 09:12:37 +00004# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
Anthony Baxterc51ee692006-04-01 00:57:31 +00005#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty. In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17# claim that you wrote the original software. If you use this software
18# in a product, an acknowledgment in the product documentation would be
19# appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21# misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import unittest
Gerhard Häring1cc60ed2008-02-29 22:08:41 +000025import sys
Anthony Baxterc51ee692006-04-01 00:57:31 +000026import sqlite3 as sqlite
Victor Stinner6a102812010-04-27 23:55:59 +000027try:
28 import threading
29except ImportError:
30 threading = None
Anthony Baxterc51ee692006-04-01 00:57:31 +000031
32class ModuleTests(unittest.TestCase):
33 def CheckAPILevel(self):
34 self.assertEqual(sqlite.apilevel, "2.0",
35 "apilevel is %s, should be 2.0" % sqlite.apilevel)
36
37 def CheckThreadSafety(self):
38 self.assertEqual(sqlite.threadsafety, 1,
39 "threadsafety is %d, should be 1" % sqlite.threadsafety)
40
41 def CheckParamStyle(self):
42 self.assertEqual(sqlite.paramstyle, "qmark",
43 "paramstyle is '%s', should be 'qmark'" %
44 sqlite.paramstyle)
45
46 def CheckWarning(self):
Ezio Melotti2623a372010-11-21 13:34:58 +000047 self.assertTrue(issubclass(sqlite.Warning, StandardError),
48 "Warning is not a subclass of StandardError")
Anthony Baxterc51ee692006-04-01 00:57:31 +000049
50 def CheckError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000051 self.assertTrue(issubclass(sqlite.Error, StandardError),
Anthony Baxterc51ee692006-04-01 00:57:31 +000052 "Error is not a subclass of StandardError")
53
54 def CheckInterfaceError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000055 self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error),
Anthony Baxterc51ee692006-04-01 00:57:31 +000056 "InterfaceError is not a subclass of Error")
57
58 def CheckDatabaseError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000059 self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error),
Anthony Baxterc51ee692006-04-01 00:57:31 +000060 "DatabaseError is not a subclass of Error")
61
62 def CheckDataError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000063 self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError),
Anthony Baxterc51ee692006-04-01 00:57:31 +000064 "DataError is not a subclass of DatabaseError")
65
66 def CheckOperationalError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000067 self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
Anthony Baxterc51ee692006-04-01 00:57:31 +000068 "OperationalError is not a subclass of DatabaseError")
69
70 def CheckIntegrityError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000071 self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
Anthony Baxterc51ee692006-04-01 00:57:31 +000072 "IntegrityError is not a subclass of DatabaseError")
73
74 def CheckInternalError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000075 self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError),
Anthony Baxterc51ee692006-04-01 00:57:31 +000076 "InternalError is not a subclass of DatabaseError")
77
78 def CheckProgrammingError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000079 self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
Anthony Baxterc51ee692006-04-01 00:57:31 +000080 "ProgrammingError is not a subclass of DatabaseError")
81
82 def CheckNotSupportedError(self):
Gregory P. Smith1844b0d2009-07-04 08:42:10 +000083 self.assertTrue(issubclass(sqlite.NotSupportedError,
Anthony Baxterc51ee692006-04-01 00:57:31 +000084 sqlite.DatabaseError),
85 "NotSupportedError is not a subclass of DatabaseError")
86
87class ConnectionTests(unittest.TestCase):
88 def setUp(self):
89 self.cx = sqlite.connect(":memory:")
90 cu = self.cx.cursor()
91 cu.execute("create table test(id integer primary key, name text)")
92 cu.execute("insert into test(name) values (?)", ("foo",))
93
94 def tearDown(self):
95 self.cx.close()
96
97 def CheckCommit(self):
98 self.cx.commit()
99
100 def CheckCommitAfterNoChanges(self):
101 """
102 A commit should also work when no changes were made to the database.
103 """
104 self.cx.commit()
105 self.cx.commit()
106
107 def CheckRollback(self):
108 self.cx.rollback()
109
110 def CheckRollbackAfterNoChanges(self):
111 """
112 A rollback should also work when no changes were made to the database.
113 """
114 self.cx.rollback()
115 self.cx.rollback()
116
117 def CheckCursor(self):
118 cu = self.cx.cursor()
119
120 def CheckFailedOpen(self):
121 YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
122 try:
123 con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
124 except sqlite.OperationalError:
125 return
126 self.fail("should have raised an OperationalError")
127
128 def CheckClose(self):
129 self.cx.close()
130
131 def CheckExceptions(self):
132 # Optional DB-API extension.
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000133 self.assertEqual(self.cx.Warning, sqlite.Warning)
134 self.assertEqual(self.cx.Error, sqlite.Error)
135 self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError)
136 self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError)
137 self.assertEqual(self.cx.DataError, sqlite.DataError)
138 self.assertEqual(self.cx.OperationalError, sqlite.OperationalError)
139 self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError)
140 self.assertEqual(self.cx.InternalError, sqlite.InternalError)
141 self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
142 self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000143
144class CursorTests(unittest.TestCase):
145 def setUp(self):
146 self.cx = sqlite.connect(":memory:")
147 self.cu = self.cx.cursor()
148 self.cu.execute("create table test(id integer primary key, name text, income number)")
149 self.cu.execute("insert into test(name) values (?)", ("foo",))
150
151 def tearDown(self):
152 self.cu.close()
153 self.cx.close()
154
155 def CheckExecuteNoArgs(self):
156 self.cu.execute("delete from test")
157
158 def CheckExecuteIllegalSql(self):
159 try:
160 self.cu.execute("select asdf")
161 self.fail("should have raised an OperationalError")
162 except sqlite.OperationalError:
163 return
164 except:
165 self.fail("raised wrong exception")
166
167 def CheckExecuteTooMuchSql(self):
168 try:
169 self.cu.execute("select 5+4; select 4+5")
170 self.fail("should have raised a Warning")
171 except sqlite.Warning:
172 return
173 except:
174 self.fail("raised wrong exception")
175
176 def CheckExecuteTooMuchSql2(self):
177 self.cu.execute("select 5+4; -- foo bar")
178
179 def CheckExecuteTooMuchSql3(self):
180 self.cu.execute("""
181 select 5+4;
182
183 /*
184 foo
185 */
186 """)
187
188 def CheckExecuteWrongSqlArg(self):
189 try:
190 self.cu.execute(42)
191 self.fail("should have raised a ValueError")
192 except ValueError:
193 return
194 except:
195 self.fail("raised wrong exception.")
196
197 def CheckExecuteArgInt(self):
198 self.cu.execute("insert into test(id) values (?)", (42,))
199
200 def CheckExecuteArgFloat(self):
201 self.cu.execute("insert into test(income) values (?)", (2500.32,))
202
203 def CheckExecuteArgString(self):
204 self.cu.execute("insert into test(name) values (?)", ("Hugo",))
205
Petri Lehtinen0518f472012-02-01 22:20:12 +0200206 def CheckExecuteArgStringWithZeroByte(self):
207 self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))
208
209 self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
210 row = self.cu.fetchone()
211 self.assertEqual(row[0], "Hu\x00go")
212
Anthony Baxterc51ee692006-04-01 00:57:31 +0000213 def CheckExecuteWrongNoOfArgs1(self):
214 # too many parameters
215 try:
216 self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
217 self.fail("should have raised ProgrammingError")
218 except sqlite.ProgrammingError:
219 pass
220
221 def CheckExecuteWrongNoOfArgs2(self):
222 # too little parameters
223 try:
224 self.cu.execute("insert into test(id) values (?)")
225 self.fail("should have raised ProgrammingError")
226 except sqlite.ProgrammingError:
227 pass
228
229 def CheckExecuteWrongNoOfArgs3(self):
230 # no parameters, parameters are needed
231 try:
232 self.cu.execute("insert into test(id) values (?)")
233 self.fail("should have raised ProgrammingError")
234 except sqlite.ProgrammingError:
235 pass
236
Gerhard Häring1cc60ed2008-02-29 22:08:41 +0000237 def CheckExecuteParamList(self):
238 self.cu.execute("insert into test(name) values ('foo')")
239 self.cu.execute("select name from test where name=?", ["foo"])
240 row = self.cu.fetchone()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000241 self.assertEqual(row[0], "foo")
Gerhard Häring1cc60ed2008-02-29 22:08:41 +0000242
243 def CheckExecuteParamSequence(self):
244 class L(object):
245 def __len__(self):
246 return 1
247 def __getitem__(self, x):
248 assert x == 0
249 return "foo"
250
251 self.cu.execute("insert into test(name) values ('foo')")
252 self.cu.execute("select name from test where name=?", L())
253 row = self.cu.fetchone()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000254 self.assertEqual(row[0], "foo")
Gerhard Häring1cc60ed2008-02-29 22:08:41 +0000255
Anthony Baxterc51ee692006-04-01 00:57:31 +0000256 def CheckExecuteDictMapping(self):
257 self.cu.execute("insert into test(name) values ('foo')")
258 self.cu.execute("select name from test where name=:name", {"name": "foo"})
259 row = self.cu.fetchone()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000260 self.assertEqual(row[0], "foo")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000261
Gerhard Häring1cc60ed2008-02-29 22:08:41 +0000262 def CheckExecuteDictMapping_Mapping(self):
263 # Test only works with Python 2.5 or later
264 if sys.version_info < (2, 5, 0):
265 return
266
267 class D(dict):
268 def __missing__(self, key):
269 return "foo"
270
271 self.cu.execute("insert into test(name) values ('foo')")
272 self.cu.execute("select name from test where name=:name", D())
273 row = self.cu.fetchone()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000274 self.assertEqual(row[0], "foo")
Gerhard Häring1cc60ed2008-02-29 22:08:41 +0000275
Anthony Baxterc51ee692006-04-01 00:57:31 +0000276 def CheckExecuteDictMappingTooLittleArgs(self):
277 self.cu.execute("insert into test(name) values ('foo')")
278 try:
279 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
280 self.fail("should have raised ProgrammingError")
281 except sqlite.ProgrammingError:
282 pass
283
284 def CheckExecuteDictMappingNoArgs(self):
285 self.cu.execute("insert into test(name) values ('foo')")
286 try:
287 self.cu.execute("select name from test where name=:name")
288 self.fail("should have raised ProgrammingError")
289 except sqlite.ProgrammingError:
290 pass
291
292 def CheckExecuteDictMappingUnnamed(self):
293 self.cu.execute("insert into test(name) values ('foo')")
294 try:
295 self.cu.execute("select name from test where name=?", {"name": "foo"})
296 self.fail("should have raised ProgrammingError")
297 except sqlite.ProgrammingError:
298 pass
299
300 def CheckClose(self):
301 self.cu.close()
302
303 def CheckRowcountExecute(self):
304 self.cu.execute("delete from test")
305 self.cu.execute("insert into test(name) values ('foo')")
306 self.cu.execute("insert into test(name) values ('foo')")
307 self.cu.execute("update test set name='bar'")
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000308 self.assertEqual(self.cu.rowcount, 2)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000309
Gerhard Häring7f7ca352008-05-31 21:33:27 +0000310 def CheckRowcountSelect(self):
311 """
312 pysqlite does not know the rowcount of SELECT statements, because we
313 don't fetch all rows after executing the select statement. The rowcount
314 has thus to be -1.
315 """
316 self.cu.execute("select 5 union select 6")
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000317 self.assertEqual(self.cu.rowcount, -1)
Gerhard Häring7f7ca352008-05-31 21:33:27 +0000318
Anthony Baxterc51ee692006-04-01 00:57:31 +0000319 def CheckRowcountExecutemany(self):
320 self.cu.execute("delete from test")
321 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000322 self.assertEqual(self.cu.rowcount, 3)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000323
Anthony Baxter72289a62006-04-04 06:29:05 +0000324 def CheckTotalChanges(self):
325 self.cu.execute("insert into test(name) values ('foo')")
326 self.cu.execute("insert into test(name) values ('foo')")
327 if self.cx.total_changes < 2:
328 self.fail("total changes reported wrong value")
329
Anthony Baxterc51ee692006-04-01 00:57:31 +0000330 # Checks for executemany:
331 # Sequences are required by the DB-API, iterators
332 # enhancements in pysqlite.
333
334 def CheckExecuteManySequence(self):
335 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
336
337 def CheckExecuteManyIterator(self):
338 class MyIter:
339 def __init__(self):
340 self.value = 5
341
342 def next(self):
343 if self.value == 10:
344 raise StopIteration
345 else:
346 self.value += 1
347 return (self.value,)
348
349 self.cu.executemany("insert into test(income) values (?)", MyIter())
350
351 def CheckExecuteManyGenerator(self):
352 def mygen():
353 for i in range(5):
354 yield (i,)
355
356 self.cu.executemany("insert into test(income) values (?)", mygen())
357
358 def CheckExecuteManyWrongSqlArg(self):
359 try:
360 self.cu.executemany(42, [(3,)])
361 self.fail("should have raised a ValueError")
362 except ValueError:
363 return
364 except:
365 self.fail("raised wrong exception.")
366
367 def CheckExecuteManySelect(self):
368 try:
369 self.cu.executemany("select ?", [(3,)])
370 self.fail("should have raised a ProgrammingError")
371 except sqlite.ProgrammingError:
372 return
373 except:
374 self.fail("raised wrong exception.")
375
376 def CheckExecuteManyNotIterable(self):
377 try:
378 self.cu.executemany("insert into test(income) values (?)", 42)
379 self.fail("should have raised a TypeError")
380 except TypeError:
381 return
382 except Exception, e:
383 print "raised", e.__class__
384 self.fail("raised wrong exception.")
385
386 def CheckFetchIter(self):
387 # Optional DB-API extension.
388 self.cu.execute("delete from test")
389 self.cu.execute("insert into test(id) values (?)", (5,))
390 self.cu.execute("insert into test(id) values (?)", (6,))
391 self.cu.execute("select id from test order by id")
392 lst = []
393 for row in self.cu:
394 lst.append(row[0])
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000395 self.assertEqual(lst[0], 5)
396 self.assertEqual(lst[1], 6)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000397
398 def CheckFetchone(self):
399 self.cu.execute("select name from test")
400 row = self.cu.fetchone()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000401 self.assertEqual(row[0], "foo")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000402 row = self.cu.fetchone()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000403 self.assertEqual(row, None)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000404
405 def CheckFetchoneNoStatement(self):
406 cur = self.cx.cursor()
407 row = cur.fetchone()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000408 self.assertEqual(row, None)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000409
410 def CheckArraySize(self):
411 # must default ot 1
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000412 self.assertEqual(self.cu.arraysize, 1)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000413
414 # now set to 2
415 self.cu.arraysize = 2
416
417 # now make the query return 3 rows
418 self.cu.execute("delete from test")
419 self.cu.execute("insert into test(name) values ('A')")
420 self.cu.execute("insert into test(name) values ('B')")
421 self.cu.execute("insert into test(name) values ('C')")
422 self.cu.execute("select name from test")
423 res = self.cu.fetchmany()
424
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000425 self.assertEqual(len(res), 2)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000426
427 def CheckFetchmany(self):
428 self.cu.execute("select name from test")
429 res = self.cu.fetchmany(100)
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000430 self.assertEqual(len(res), 1)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000431 res = self.cu.fetchmany(100)
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000432 self.assertEqual(res, [])
Anthony Baxterc51ee692006-04-01 00:57:31 +0000433
Gerhard Häring1cc60ed2008-02-29 22:08:41 +0000434 def CheckFetchmanyKwArg(self):
435 """Checks if fetchmany works with keyword arguments"""
436 self.cu.execute("select name from test")
437 res = self.cu.fetchmany(size=100)
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000438 self.assertEqual(len(res), 1)
Gerhard Häring1cc60ed2008-02-29 22:08:41 +0000439
Anthony Baxterc51ee692006-04-01 00:57:31 +0000440 def CheckFetchall(self):
441 self.cu.execute("select name from test")
442 res = self.cu.fetchall()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000443 self.assertEqual(len(res), 1)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000444 res = self.cu.fetchall()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000445 self.assertEqual(res, [])
Anthony Baxterc51ee692006-04-01 00:57:31 +0000446
447 def CheckSetinputsizes(self):
448 self.cu.setinputsizes([3, 4, 5])
449
450 def CheckSetoutputsize(self):
451 self.cu.setoutputsize(5, 0)
452
453 def CheckSetoutputsizeNoColumn(self):
454 self.cu.setoutputsize(42)
455
456 def CheckCursorConnection(self):
457 # Optional DB-API extension.
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000458 self.assertEqual(self.cu.connection, self.cx)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000459
460 def CheckWrongCursorCallable(self):
461 try:
462 def f(): pass
463 cur = self.cx.cursor(f)
464 self.fail("should have raised a TypeError")
465 except TypeError:
466 return
467 self.fail("should have raised a ValueError")
468
469 def CheckCursorWrongClass(self):
470 class Foo: pass
471 foo = Foo()
472 try:
473 cur = sqlite.Cursor(foo)
474 self.fail("should have raised a ValueError")
475 except TypeError:
476 pass
477
Victor Stinner6a102812010-04-27 23:55:59 +0000478@unittest.skipUnless(threading, 'This test requires threading.')
Anthony Baxterc51ee692006-04-01 00:57:31 +0000479class ThreadTests(unittest.TestCase):
480 def setUp(self):
481 self.con = sqlite.connect(":memory:")
482 self.cur = self.con.cursor()
483 self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")
484
485 def tearDown(self):
486 self.cur.close()
487 self.con.close()
488
489 def CheckConCursor(self):
490 def run(con, errors):
491 try:
492 cur = con.cursor()
493 errors.append("did not raise ProgrammingError")
494 return
495 except sqlite.ProgrammingError:
496 return
497 except:
498 errors.append("raised wrong exception")
499
500 errors = []
501 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
502 t.start()
503 t.join()
504 if len(errors) > 0:
505 self.fail("\n".join(errors))
506
507 def CheckConCommit(self):
508 def run(con, errors):
509 try:
510 con.commit()
511 errors.append("did not raise ProgrammingError")
512 return
513 except sqlite.ProgrammingError:
514 return
515 except:
516 errors.append("raised wrong exception")
517
518 errors = []
519 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
520 t.start()
521 t.join()
522 if len(errors) > 0:
523 self.fail("\n".join(errors))
524
525 def CheckConRollback(self):
526 def run(con, errors):
527 try:
528 con.rollback()
529 errors.append("did not raise ProgrammingError")
530 return
531 except sqlite.ProgrammingError:
532 return
533 except:
534 errors.append("raised wrong exception")
535
536 errors = []
537 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
538 t.start()
539 t.join()
540 if len(errors) > 0:
541 self.fail("\n".join(errors))
542
543 def CheckConClose(self):
544 def run(con, errors):
545 try:
546 con.close()
547 errors.append("did not raise ProgrammingError")
548 return
549 except sqlite.ProgrammingError:
550 return
551 except:
552 errors.append("raised wrong exception")
553
554 errors = []
555 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
556 t.start()
557 t.join()
558 if len(errors) > 0:
559 self.fail("\n".join(errors))
560
561 def CheckCurImplicitBegin(self):
562 def run(cur, errors):
563 try:
564 cur.execute("insert into test(name) values ('a')")
565 errors.append("did not raise ProgrammingError")
566 return
567 except sqlite.ProgrammingError:
568 return
569 except:
570 errors.append("raised wrong exception")
571
572 errors = []
573 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
574 t.start()
575 t.join()
576 if len(errors) > 0:
577 self.fail("\n".join(errors))
578
579 def CheckCurClose(self):
580 def run(cur, errors):
581 try:
582 cur.close()
583 errors.append("did not raise ProgrammingError")
584 return
585 except sqlite.ProgrammingError:
586 return
587 except:
588 errors.append("raised wrong exception")
589
590 errors = []
591 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
592 t.start()
593 t.join()
594 if len(errors) > 0:
595 self.fail("\n".join(errors))
596
597 def CheckCurExecute(self):
598 def run(cur, errors):
599 try:
600 cur.execute("select name from test")
601 errors.append("did not raise ProgrammingError")
602 return
603 except sqlite.ProgrammingError:
604 return
605 except:
606 errors.append("raised wrong exception")
607
608 errors = []
609 self.cur.execute("insert into test(name) values ('a')")
610 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
611 t.start()
612 t.join()
613 if len(errors) > 0:
614 self.fail("\n".join(errors))
615
616 def CheckCurIterNext(self):
617 def run(cur, errors):
618 try:
619 row = cur.fetchone()
620 errors.append("did not raise ProgrammingError")
621 return
622 except sqlite.ProgrammingError:
623 return
624 except:
625 errors.append("raised wrong exception")
626
627 errors = []
628 self.cur.execute("insert into test(name) values ('a')")
629 self.cur.execute("select name from test")
630 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
631 t.start()
632 t.join()
633 if len(errors) > 0:
634 self.fail("\n".join(errors))
635
636class ConstructorTests(unittest.TestCase):
637 def CheckDate(self):
638 d = sqlite.Date(2004, 10, 28)
639
640 def CheckTime(self):
641 t = sqlite.Time(12, 39, 35)
642
643 def CheckTimestamp(self):
644 ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
645
646 def CheckDateFromTicks(self):
647 d = sqlite.DateFromTicks(42)
648
649 def CheckTimeFromTicks(self):
650 t = sqlite.TimeFromTicks(42)
651
652 def CheckTimestampFromTicks(self):
653 ts = sqlite.TimestampFromTicks(42)
654
655 def CheckBinary(self):
656 b = sqlite.Binary(chr(0) + "'")
657
658class ExtensionTests(unittest.TestCase):
659 def CheckScriptStringSql(self):
660 con = sqlite.connect(":memory:")
661 cur = con.cursor()
662 cur.executescript("""
663 -- bla bla
664 /* a stupid comment */
665 create table a(i);
666 insert into a(i) values (5);
667 """)
668 cur.execute("select i from a")
669 res = cur.fetchone()[0]
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000670 self.assertEqual(res, 5)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000671
672 def CheckScriptStringUnicode(self):
673 con = sqlite.connect(":memory:")
674 cur = con.cursor()
675 cur.executescript(u"""
676 create table a(i);
677 insert into a(i) values (5);
678 select i from a;
679 delete from a;
680 insert into a(i) values (6);
681 """)
682 cur.execute("select i from a")
683 res = cur.fetchone()[0]
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000684 self.assertEqual(res, 6)
Anthony Baxterc51ee692006-04-01 00:57:31 +0000685
Gerhard Häring3bbb6722010-03-05 09:12:37 +0000686 def CheckScriptSyntaxError(self):
Anthony Baxterc51ee692006-04-01 00:57:31 +0000687 con = sqlite.connect(":memory:")
688 cur = con.cursor()
689 raised = False
690 try:
Gerhard Häring3bbb6722010-03-05 09:12:37 +0000691 cur.executescript("create table test(x); asdf; create table test2(x)")
692 except sqlite.OperationalError:
Anthony Baxterc51ee692006-04-01 00:57:31 +0000693 raised = True
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000694 self.assertEqual(raised, True, "should have raised an exception")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000695
696 def CheckScriptErrorNormal(self):
697 con = sqlite.connect(":memory:")
698 cur = con.cursor()
699 raised = False
700 try:
701 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
702 except sqlite.OperationalError:
703 raised = True
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000704 self.assertEqual(raised, True, "should have raised an exception")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000705
706 def CheckConnectionExecute(self):
707 con = sqlite.connect(":memory:")
708 result = con.execute("select 5").fetchone()[0]
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000709 self.assertEqual(result, 5, "Basic test of Connection.execute")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000710
711 def CheckConnectionExecutemany(self):
712 con = sqlite.connect(":memory:")
713 con.execute("create table test(foo)")
714 con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
715 result = con.execute("select foo from test order by foo").fetchall()
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000716 self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
717 self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000718
719 def CheckConnectionExecutescript(self):
720 con = sqlite.connect(":memory:")
721 con.executescript("create table test(foo); insert into test(foo) values (5);")
722 result = con.execute("select foo from test").fetchone()[0]
Gregory P. Smith1844b0d2009-07-04 08:42:10 +0000723 self.assertEqual(result, 5, "Basic test of Connection.executescript")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000724
Gerhard Häring3bbb6722010-03-05 09:12:37 +0000725class ClosedConTests(unittest.TestCase):
Anthony Baxterc51ee692006-04-01 00:57:31 +0000726 def setUp(self):
727 pass
728
729 def tearDown(self):
730 pass
731
732 def CheckClosedConCursor(self):
733 con = sqlite.connect(":memory:")
734 con.close()
735 try:
736 cur = con.cursor()
737 self.fail("Should have raised a ProgrammingError")
738 except sqlite.ProgrammingError:
739 pass
740 except:
741 self.fail("Should have raised a ProgrammingError")
742
743 def CheckClosedConCommit(self):
744 con = sqlite.connect(":memory:")
745 con.close()
746 try:
747 con.commit()
748 self.fail("Should have raised a ProgrammingError")
749 except sqlite.ProgrammingError:
750 pass
751 except:
752 self.fail("Should have raised a ProgrammingError")
753
754 def CheckClosedConRollback(self):
755 con = sqlite.connect(":memory:")
756 con.close()
757 try:
758 con.rollback()
759 self.fail("Should have raised a ProgrammingError")
760 except sqlite.ProgrammingError:
761 pass
762 except:
763 self.fail("Should have raised a ProgrammingError")
764
765 def CheckClosedCurExecute(self):
766 con = sqlite.connect(":memory:")
767 cur = con.cursor()
768 con.close()
769 try:
770 cur.execute("select 4")
771 self.fail("Should have raised a ProgrammingError")
772 except sqlite.ProgrammingError:
773 pass
774 except:
775 self.fail("Should have raised a ProgrammingError")
776
Gerhard Häring3bbb6722010-03-05 09:12:37 +0000777 def CheckClosedCreateFunction(self):
778 con = sqlite.connect(":memory:")
779 con.close()
780 def f(x): return 17
781 try:
782 con.create_function("foo", 1, f)
783 self.fail("Should have raised a ProgrammingError")
784 except sqlite.ProgrammingError:
785 pass
786 except:
787 self.fail("Should have raised a ProgrammingError")
788
789 def CheckClosedCreateAggregate(self):
790 con = sqlite.connect(":memory:")
791 con.close()
792 class Agg:
793 def __init__(self):
794 pass
795 def step(self, x):
796 pass
797 def finalize(self):
798 return 17
799 try:
800 con.create_aggregate("foo", 1, Agg)
801 self.fail("Should have raised a ProgrammingError")
802 except sqlite.ProgrammingError:
803 pass
804 except:
805 self.fail("Should have raised a ProgrammingError")
806
807 def CheckClosedSetAuthorizer(self):
808 con = sqlite.connect(":memory:")
809 con.close()
810 def authorizer(*args):
811 return sqlite.DENY
812 try:
813 con.set_authorizer(authorizer)
814 self.fail("Should have raised a ProgrammingError")
815 except sqlite.ProgrammingError:
816 pass
817 except:
818 self.fail("Should have raised a ProgrammingError")
819
820 def CheckClosedSetProgressCallback(self):
821 con = sqlite.connect(":memory:")
822 con.close()
823 def progress(): pass
824 try:
825 con.set_progress_handler(progress, 100)
826 self.fail("Should have raised a ProgrammingError")
827 except sqlite.ProgrammingError:
828 pass
829 except:
830 self.fail("Should have raised a ProgrammingError")
831
832 def CheckClosedCall(self):
833 con = sqlite.connect(":memory:")
834 con.close()
835 try:
836 con()
837 self.fail("Should have raised a ProgrammingError")
838 except sqlite.ProgrammingError:
839 pass
840 except:
841 self.fail("Should have raised a ProgrammingError")
842
843class ClosedCurTests(unittest.TestCase):
844 def setUp(self):
845 pass
846
847 def tearDown(self):
848 pass
849
850 def CheckClosed(self):
851 con = sqlite.connect(":memory:")
852 cur = con.cursor()
853 cur.close()
854
855 for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
856 if method_name in ("execute", "executescript"):
857 params = ("select 4 union select 5",)
858 elif method_name == "executemany":
859 params = ("insert into foo(bar) values (?)", [(3,), (4,)])
860 else:
861 params = []
862
863 try:
864 method = getattr(cur, method_name)
865
866 method(*params)
867 self.fail("Should have raised a ProgrammingError: method " + method_name)
868 except sqlite.ProgrammingError:
869 pass
870 except:
871 self.fail("Should have raised a ProgrammingError: " + method_name)
872
Anthony Baxterc51ee692006-04-01 00:57:31 +0000873def suite():
874 module_suite = unittest.makeSuite(ModuleTests, "Check")
875 connection_suite = unittest.makeSuite(ConnectionTests, "Check")
876 cursor_suite = unittest.makeSuite(CursorTests, "Check")
877 thread_suite = unittest.makeSuite(ThreadTests, "Check")
878 constructor_suite = unittest.makeSuite(ConstructorTests, "Check")
879 ext_suite = unittest.makeSuite(ExtensionTests, "Check")
Gerhard Häring3bbb6722010-03-05 09:12:37 +0000880 closed_con_suite = unittest.makeSuite(ClosedConTests, "Check")
881 closed_cur_suite = unittest.makeSuite(ClosedCurTests, "Check")
882 return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_con_suite, closed_cur_suite))
Anthony Baxterc51ee692006-04-01 00:57:31 +0000883
884def test():
885 runner = unittest.TextTestRunner()
886 runner.run(suite())
887
888if __name__ == "__main__":
889 test()