blob: 0d4797709962ac302161825e0a89820cd8a2a914 [file] [log] [blame]
Anthony Baxterc51ee692006-04-01 00:57:31 +00001#-*- coding: ISO-8859-1 -*-
2# pysqlite2/test/dbapi.py: tests for DB-API compliance
3#
4# Copyright (C) 2004-2005 Gerhard Häring <gh@ghaering.de>
5#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty. In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17# claim that you wrote the original software. If you use this software
18# in a product, an acknowledgment in the product documentation would be
19# appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21# misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import unittest
25import threading
26import sqlite3 as sqlite
27
28class ModuleTests(unittest.TestCase):
29 def CheckAPILevel(self):
30 self.assertEqual(sqlite.apilevel, "2.0",
31 "apilevel is %s, should be 2.0" % sqlite.apilevel)
32
33 def CheckThreadSafety(self):
34 self.assertEqual(sqlite.threadsafety, 1,
35 "threadsafety is %d, should be 1" % sqlite.threadsafety)
36
37 def CheckParamStyle(self):
38 self.assertEqual(sqlite.paramstyle, "qmark",
39 "paramstyle is '%s', should be 'qmark'" %
40 sqlite.paramstyle)
41
42 def CheckWarning(self):
43 self.assert_(issubclass(sqlite.Warning, StandardError),
44 "Warning is not a subclass of StandardError")
45
46 def CheckError(self):
47 self.failUnless(issubclass(sqlite.Error, StandardError),
48 "Error is not a subclass of StandardError")
49
50 def CheckInterfaceError(self):
51 self.failUnless(issubclass(sqlite.InterfaceError, sqlite.Error),
52 "InterfaceError is not a subclass of Error")
53
54 def CheckDatabaseError(self):
55 self.failUnless(issubclass(sqlite.DatabaseError, sqlite.Error),
56 "DatabaseError is not a subclass of Error")
57
58 def CheckDataError(self):
59 self.failUnless(issubclass(sqlite.DataError, sqlite.DatabaseError),
60 "DataError is not a subclass of DatabaseError")
61
62 def CheckOperationalError(self):
63 self.failUnless(issubclass(sqlite.OperationalError, sqlite.DatabaseError),
64 "OperationalError is not a subclass of DatabaseError")
65
66 def CheckIntegrityError(self):
67 self.failUnless(issubclass(sqlite.IntegrityError, sqlite.DatabaseError),
68 "IntegrityError is not a subclass of DatabaseError")
69
70 def CheckInternalError(self):
71 self.failUnless(issubclass(sqlite.InternalError, sqlite.DatabaseError),
72 "InternalError is not a subclass of DatabaseError")
73
74 def CheckProgrammingError(self):
75 self.failUnless(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError),
76 "ProgrammingError is not a subclass of DatabaseError")
77
78 def CheckNotSupportedError(self):
79 self.failUnless(issubclass(sqlite.NotSupportedError,
80 sqlite.DatabaseError),
81 "NotSupportedError is not a subclass of DatabaseError")
82
83class ConnectionTests(unittest.TestCase):
84 def setUp(self):
85 self.cx = sqlite.connect(":memory:")
86 cu = self.cx.cursor()
87 cu.execute("create table test(id integer primary key, name text)")
88 cu.execute("insert into test(name) values (?)", ("foo",))
89
90 def tearDown(self):
91 self.cx.close()
92
93 def CheckCommit(self):
94 self.cx.commit()
95
96 def CheckCommitAfterNoChanges(self):
97 """
98 A commit should also work when no changes were made to the database.
99 """
100 self.cx.commit()
101 self.cx.commit()
102
103 def CheckRollback(self):
104 self.cx.rollback()
105
106 def CheckRollbackAfterNoChanges(self):
107 """
108 A rollback should also work when no changes were made to the database.
109 """
110 self.cx.rollback()
111 self.cx.rollback()
112
113 def CheckCursor(self):
114 cu = self.cx.cursor()
115
116 def CheckFailedOpen(self):
117 YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db"
118 try:
119 con = sqlite.connect(YOU_CANNOT_OPEN_THIS)
120 except sqlite.OperationalError:
121 return
122 self.fail("should have raised an OperationalError")
123
124 def CheckClose(self):
125 self.cx.close()
126
127 def CheckExceptions(self):
128 # Optional DB-API extension.
129 self.failUnlessEqual(self.cx.Warning, sqlite.Warning)
130 self.failUnlessEqual(self.cx.Error, sqlite.Error)
131 self.failUnlessEqual(self.cx.InterfaceError, sqlite.InterfaceError)
132 self.failUnlessEqual(self.cx.DatabaseError, sqlite.DatabaseError)
133 self.failUnlessEqual(self.cx.DataError, sqlite.DataError)
134 self.failUnlessEqual(self.cx.OperationalError, sqlite.OperationalError)
135 self.failUnlessEqual(self.cx.IntegrityError, sqlite.IntegrityError)
136 self.failUnlessEqual(self.cx.InternalError, sqlite.InternalError)
137 self.failUnlessEqual(self.cx.ProgrammingError, sqlite.ProgrammingError)
138 self.failUnlessEqual(self.cx.NotSupportedError, sqlite.NotSupportedError)
139
140class CursorTests(unittest.TestCase):
141 def setUp(self):
142 self.cx = sqlite.connect(":memory:")
143 self.cu = self.cx.cursor()
144 self.cu.execute("create table test(id integer primary key, name text, income number)")
145 self.cu.execute("insert into test(name) values (?)", ("foo",))
146
147 def tearDown(self):
148 self.cu.close()
149 self.cx.close()
150
151 def CheckExecuteNoArgs(self):
152 self.cu.execute("delete from test")
153
154 def CheckExecuteIllegalSql(self):
155 try:
156 self.cu.execute("select asdf")
157 self.fail("should have raised an OperationalError")
158 except sqlite.OperationalError:
159 return
160 except:
161 self.fail("raised wrong exception")
162
163 def CheckExecuteTooMuchSql(self):
164 try:
165 self.cu.execute("select 5+4; select 4+5")
166 self.fail("should have raised a Warning")
167 except sqlite.Warning:
168 return
169 except:
170 self.fail("raised wrong exception")
171
172 def CheckExecuteTooMuchSql2(self):
173 self.cu.execute("select 5+4; -- foo bar")
174
175 def CheckExecuteTooMuchSql3(self):
176 self.cu.execute("""
177 select 5+4;
178
179 /*
180 foo
181 */
182 """)
183
184 def CheckExecuteWrongSqlArg(self):
185 try:
186 self.cu.execute(42)
187 self.fail("should have raised a ValueError")
188 except ValueError:
189 return
190 except:
191 self.fail("raised wrong exception.")
192
193 def CheckExecuteArgInt(self):
194 self.cu.execute("insert into test(id) values (?)", (42,))
195
196 def CheckExecuteArgFloat(self):
197 self.cu.execute("insert into test(income) values (?)", (2500.32,))
198
199 def CheckExecuteArgString(self):
200 self.cu.execute("insert into test(name) values (?)", ("Hugo",))
201
202 def CheckExecuteWrongNoOfArgs1(self):
203 # too many parameters
204 try:
205 self.cu.execute("insert into test(id) values (?)", (17, "Egon"))
206 self.fail("should have raised ProgrammingError")
207 except sqlite.ProgrammingError:
208 pass
209
210 def CheckExecuteWrongNoOfArgs2(self):
211 # too little parameters
212 try:
213 self.cu.execute("insert into test(id) values (?)")
214 self.fail("should have raised ProgrammingError")
215 except sqlite.ProgrammingError:
216 pass
217
218 def CheckExecuteWrongNoOfArgs3(self):
219 # no parameters, parameters are needed
220 try:
221 self.cu.execute("insert into test(id) values (?)")
222 self.fail("should have raised ProgrammingError")
223 except sqlite.ProgrammingError:
224 pass
225
226 def CheckExecuteDictMapping(self):
227 self.cu.execute("insert into test(name) values ('foo')")
228 self.cu.execute("select name from test where name=:name", {"name": "foo"})
229 row = self.cu.fetchone()
230 self.failUnlessEqual(row[0], "foo")
231
232 def CheckExecuteDictMappingTooLittleArgs(self):
233 self.cu.execute("insert into test(name) values ('foo')")
234 try:
235 self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})
236 self.fail("should have raised ProgrammingError")
237 except sqlite.ProgrammingError:
238 pass
239
240 def CheckExecuteDictMappingNoArgs(self):
241 self.cu.execute("insert into test(name) values ('foo')")
242 try:
243 self.cu.execute("select name from test where name=:name")
244 self.fail("should have raised ProgrammingError")
245 except sqlite.ProgrammingError:
246 pass
247
248 def CheckExecuteDictMappingUnnamed(self):
249 self.cu.execute("insert into test(name) values ('foo')")
250 try:
251 self.cu.execute("select name from test where name=?", {"name": "foo"})
252 self.fail("should have raised ProgrammingError")
253 except sqlite.ProgrammingError:
254 pass
255
256 def CheckClose(self):
257 self.cu.close()
258
259 def CheckRowcountExecute(self):
260 self.cu.execute("delete from test")
261 self.cu.execute("insert into test(name) values ('foo')")
262 self.cu.execute("insert into test(name) values ('foo')")
263 self.cu.execute("update test set name='bar'")
264 self.failUnlessEqual(self.cu.rowcount, 2)
265
266 def CheckRowcountExecutemany(self):
267 self.cu.execute("delete from test")
268 self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
269 self.failUnlessEqual(self.cu.rowcount, 3)
270
271 # Checks for executemany:
272 # Sequences are required by the DB-API, iterators
273 # enhancements in pysqlite.
274
275 def CheckExecuteManySequence(self):
276 self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])
277
278 def CheckExecuteManyIterator(self):
279 class MyIter:
280 def __init__(self):
281 self.value = 5
282
283 def next(self):
284 if self.value == 10:
285 raise StopIteration
286 else:
287 self.value += 1
288 return (self.value,)
289
290 self.cu.executemany("insert into test(income) values (?)", MyIter())
291
292 def CheckExecuteManyGenerator(self):
293 def mygen():
294 for i in range(5):
295 yield (i,)
296
297 self.cu.executemany("insert into test(income) values (?)", mygen())
298
299 def CheckExecuteManyWrongSqlArg(self):
300 try:
301 self.cu.executemany(42, [(3,)])
302 self.fail("should have raised a ValueError")
303 except ValueError:
304 return
305 except:
306 self.fail("raised wrong exception.")
307
308 def CheckExecuteManySelect(self):
309 try:
310 self.cu.executemany("select ?", [(3,)])
311 self.fail("should have raised a ProgrammingError")
312 except sqlite.ProgrammingError:
313 return
314 except:
315 self.fail("raised wrong exception.")
316
317 def CheckExecuteManyNotIterable(self):
318 try:
319 self.cu.executemany("insert into test(income) values (?)", 42)
320 self.fail("should have raised a TypeError")
321 except TypeError:
322 return
323 except Exception, e:
324 print "raised", e.__class__
325 self.fail("raised wrong exception.")
326
327 def CheckFetchIter(self):
328 # Optional DB-API extension.
329 self.cu.execute("delete from test")
330 self.cu.execute("insert into test(id) values (?)", (5,))
331 self.cu.execute("insert into test(id) values (?)", (6,))
332 self.cu.execute("select id from test order by id")
333 lst = []
334 for row in self.cu:
335 lst.append(row[0])
336 self.failUnlessEqual(lst[0], 5)
337 self.failUnlessEqual(lst[1], 6)
338
339 def CheckFetchone(self):
340 self.cu.execute("select name from test")
341 row = self.cu.fetchone()
342 self.failUnlessEqual(row[0], "foo")
343 row = self.cu.fetchone()
344 self.failUnlessEqual(row, None)
345
346 def CheckFetchoneNoStatement(self):
347 cur = self.cx.cursor()
348 row = cur.fetchone()
349 self.failUnlessEqual(row, None)
350
351 def CheckArraySize(self):
352 # must default ot 1
353 self.failUnlessEqual(self.cu.arraysize, 1)
354
355 # now set to 2
356 self.cu.arraysize = 2
357
358 # now make the query return 3 rows
359 self.cu.execute("delete from test")
360 self.cu.execute("insert into test(name) values ('A')")
361 self.cu.execute("insert into test(name) values ('B')")
362 self.cu.execute("insert into test(name) values ('C')")
363 self.cu.execute("select name from test")
364 res = self.cu.fetchmany()
365
366 self.failUnlessEqual(len(res), 2)
367
368 def CheckFetchmany(self):
369 self.cu.execute("select name from test")
370 res = self.cu.fetchmany(100)
371 self.failUnlessEqual(len(res), 1)
372 res = self.cu.fetchmany(100)
373 self.failUnlessEqual(res, [])
374
375 def CheckFetchall(self):
376 self.cu.execute("select name from test")
377 res = self.cu.fetchall()
378 self.failUnlessEqual(len(res), 1)
379 res = self.cu.fetchall()
380 self.failUnlessEqual(res, [])
381
382 def CheckSetinputsizes(self):
383 self.cu.setinputsizes([3, 4, 5])
384
385 def CheckSetoutputsize(self):
386 self.cu.setoutputsize(5, 0)
387
388 def CheckSetoutputsizeNoColumn(self):
389 self.cu.setoutputsize(42)
390
391 def CheckCursorConnection(self):
392 # Optional DB-API extension.
393 self.failUnlessEqual(self.cu.connection, self.cx)
394
395 def CheckWrongCursorCallable(self):
396 try:
397 def f(): pass
398 cur = self.cx.cursor(f)
399 self.fail("should have raised a TypeError")
400 except TypeError:
401 return
402 self.fail("should have raised a ValueError")
403
404 def CheckCursorWrongClass(self):
405 class Foo: pass
406 foo = Foo()
407 try:
408 cur = sqlite.Cursor(foo)
409 self.fail("should have raised a ValueError")
410 except TypeError:
411 pass
412
413class ThreadTests(unittest.TestCase):
414 def setUp(self):
415 self.con = sqlite.connect(":memory:")
416 self.cur = self.con.cursor()
417 self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")
418
419 def tearDown(self):
420 self.cur.close()
421 self.con.close()
422
423 def CheckConCursor(self):
424 def run(con, errors):
425 try:
426 cur = con.cursor()
427 errors.append("did not raise ProgrammingError")
428 return
429 except sqlite.ProgrammingError:
430 return
431 except:
432 errors.append("raised wrong exception")
433
434 errors = []
435 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
436 t.start()
437 t.join()
438 if len(errors) > 0:
439 self.fail("\n".join(errors))
440
441 def CheckConCommit(self):
442 def run(con, errors):
443 try:
444 con.commit()
445 errors.append("did not raise ProgrammingError")
446 return
447 except sqlite.ProgrammingError:
448 return
449 except:
450 errors.append("raised wrong exception")
451
452 errors = []
453 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
454 t.start()
455 t.join()
456 if len(errors) > 0:
457 self.fail("\n".join(errors))
458
459 def CheckConRollback(self):
460 def run(con, errors):
461 try:
462 con.rollback()
463 errors.append("did not raise ProgrammingError")
464 return
465 except sqlite.ProgrammingError:
466 return
467 except:
468 errors.append("raised wrong exception")
469
470 errors = []
471 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
472 t.start()
473 t.join()
474 if len(errors) > 0:
475 self.fail("\n".join(errors))
476
477 def CheckConClose(self):
478 def run(con, errors):
479 try:
480 con.close()
481 errors.append("did not raise ProgrammingError")
482 return
483 except sqlite.ProgrammingError:
484 return
485 except:
486 errors.append("raised wrong exception")
487
488 errors = []
489 t = threading.Thread(target=run, kwargs={"con": self.con, "errors": errors})
490 t.start()
491 t.join()
492 if len(errors) > 0:
493 self.fail("\n".join(errors))
494
495 def CheckCurImplicitBegin(self):
496 def run(cur, errors):
497 try:
498 cur.execute("insert into test(name) values ('a')")
499 errors.append("did not raise ProgrammingError")
500 return
501 except sqlite.ProgrammingError:
502 return
503 except:
504 errors.append("raised wrong exception")
505
506 errors = []
507 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
508 t.start()
509 t.join()
510 if len(errors) > 0:
511 self.fail("\n".join(errors))
512
513 def CheckCurClose(self):
514 def run(cur, errors):
515 try:
516 cur.close()
517 errors.append("did not raise ProgrammingError")
518 return
519 except sqlite.ProgrammingError:
520 return
521 except:
522 errors.append("raised wrong exception")
523
524 errors = []
525 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
526 t.start()
527 t.join()
528 if len(errors) > 0:
529 self.fail("\n".join(errors))
530
531 def CheckCurExecute(self):
532 def run(cur, errors):
533 try:
534 cur.execute("select name from test")
535 errors.append("did not raise ProgrammingError")
536 return
537 except sqlite.ProgrammingError:
538 return
539 except:
540 errors.append("raised wrong exception")
541
542 errors = []
543 self.cur.execute("insert into test(name) values ('a')")
544 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
545 t.start()
546 t.join()
547 if len(errors) > 0:
548 self.fail("\n".join(errors))
549
550 def CheckCurIterNext(self):
551 def run(cur, errors):
552 try:
553 row = cur.fetchone()
554 errors.append("did not raise ProgrammingError")
555 return
556 except sqlite.ProgrammingError:
557 return
558 except:
559 errors.append("raised wrong exception")
560
561 errors = []
562 self.cur.execute("insert into test(name) values ('a')")
563 self.cur.execute("select name from test")
564 t = threading.Thread(target=run, kwargs={"cur": self.cur, "errors": errors})
565 t.start()
566 t.join()
567 if len(errors) > 0:
568 self.fail("\n".join(errors))
569
570class ConstructorTests(unittest.TestCase):
571 def CheckDate(self):
572 d = sqlite.Date(2004, 10, 28)
573
574 def CheckTime(self):
575 t = sqlite.Time(12, 39, 35)
576
577 def CheckTimestamp(self):
578 ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35)
579
580 def CheckDateFromTicks(self):
581 d = sqlite.DateFromTicks(42)
582
583 def CheckTimeFromTicks(self):
584 t = sqlite.TimeFromTicks(42)
585
586 def CheckTimestampFromTicks(self):
587 ts = sqlite.TimestampFromTicks(42)
588
589 def CheckBinary(self):
590 b = sqlite.Binary(chr(0) + "'")
591
592class ExtensionTests(unittest.TestCase):
593 def CheckScriptStringSql(self):
594 con = sqlite.connect(":memory:")
595 cur = con.cursor()
596 cur.executescript("""
597 -- bla bla
598 /* a stupid comment */
599 create table a(i);
600 insert into a(i) values (5);
601 """)
602 cur.execute("select i from a")
603 res = cur.fetchone()[0]
604 self.failUnlessEqual(res, 5)
605
606 def CheckScriptStringUnicode(self):
607 con = sqlite.connect(":memory:")
608 cur = con.cursor()
609 cur.executescript(u"""
610 create table a(i);
611 insert into a(i) values (5);
612 select i from a;
613 delete from a;
614 insert into a(i) values (6);
615 """)
616 cur.execute("select i from a")
617 res = cur.fetchone()[0]
618 self.failUnlessEqual(res, 6)
619
620 def CheckScriptErrorIncomplete(self):
621 con = sqlite.connect(":memory:")
622 cur = con.cursor()
623 raised = False
624 try:
625 cur.executescript("create table test(sadfsadfdsa")
626 except sqlite.ProgrammingError:
627 raised = True
628 self.failUnlessEqual(raised, True, "should have raised an exception")
629
630 def CheckScriptErrorNormal(self):
631 con = sqlite.connect(":memory:")
632 cur = con.cursor()
633 raised = False
634 try:
635 cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")
636 except sqlite.OperationalError:
637 raised = True
638 self.failUnlessEqual(raised, True, "should have raised an exception")
639
640 def CheckConnectionExecute(self):
641 con = sqlite.connect(":memory:")
642 result = con.execute("select 5").fetchone()[0]
643 self.failUnlessEqual(result, 5, "Basic test of Connection.execute")
644
645 def CheckConnectionExecutemany(self):
646 con = sqlite.connect(":memory:")
647 con.execute("create table test(foo)")
648 con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
649 result = con.execute("select foo from test order by foo").fetchall()
650 self.failUnlessEqual(result[0][0], 3, "Basic test of Connection.executemany")
651 self.failUnlessEqual(result[1][0], 4, "Basic test of Connection.executemany")
652
653 def CheckConnectionExecutescript(self):
654 con = sqlite.connect(":memory:")
655 con.executescript("create table test(foo); insert into test(foo) values (5);")
656 result = con.execute("select foo from test").fetchone()[0]
657 self.failUnlessEqual(result, 5, "Basic test of Connection.executescript")
658
659class ClosedTests(unittest.TestCase):
660 def setUp(self):
661 pass
662
663 def tearDown(self):
664 pass
665
666 def CheckClosedConCursor(self):
667 con = sqlite.connect(":memory:")
668 con.close()
669 try:
670 cur = con.cursor()
671 self.fail("Should have raised a ProgrammingError")
672 except sqlite.ProgrammingError:
673 pass
674 except:
675 self.fail("Should have raised a ProgrammingError")
676
677 def CheckClosedConCommit(self):
678 con = sqlite.connect(":memory:")
679 con.close()
680 try:
681 con.commit()
682 self.fail("Should have raised a ProgrammingError")
683 except sqlite.ProgrammingError:
684 pass
685 except:
686 self.fail("Should have raised a ProgrammingError")
687
688 def CheckClosedConRollback(self):
689 con = sqlite.connect(":memory:")
690 con.close()
691 try:
692 con.rollback()
693 self.fail("Should have raised a ProgrammingError")
694 except sqlite.ProgrammingError:
695 pass
696 except:
697 self.fail("Should have raised a ProgrammingError")
698
699 def CheckClosedCurExecute(self):
700 con = sqlite.connect(":memory:")
701 cur = con.cursor()
702 con.close()
703 try:
704 cur.execute("select 4")
705 self.fail("Should have raised a ProgrammingError")
706 except sqlite.ProgrammingError:
707 pass
708 except:
709 self.fail("Should have raised a ProgrammingError")
710
711def suite():
712 module_suite = unittest.makeSuite(ModuleTests, "Check")
713 connection_suite = unittest.makeSuite(ConnectionTests, "Check")
714 cursor_suite = unittest.makeSuite(CursorTests, "Check")
715 thread_suite = unittest.makeSuite(ThreadTests, "Check")
716 constructor_suite = unittest.makeSuite(ConstructorTests, "Check")
717 ext_suite = unittest.makeSuite(ExtensionTests, "Check")
718 closed_suite = unittest.makeSuite(ClosedTests, "Check")
719 return unittest.TestSuite((module_suite, connection_suite, cursor_suite, thread_suite, constructor_suite, ext_suite, closed_suite))
720
721def test():
722 runner = unittest.TextTestRunner()
723 runner.run(suite())
724
725if __name__ == "__main__":
726 test()