blob: 615ebf51069bef05dddb6a371f3803a3bda28269 [file] [log] [blame]
Anthony Baxterc51ee692006-04-01 00:57:31 +00001#-*- coding: ISO-8859-1 -*-
2# pysqlite2/test/dbapi.py: tests for DB-API compliance
3#
Gerhard Häring3bbb6722010-03-05 09:12:37 +00004# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de>
Anthony Baxterc51ee692006-04-01 00:57:31 +00005#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty. In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17# claim that you wrote the original software. If you use this software
18# in a product, an acknowledgment in the product documentation would be
19# appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21# misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
24import unittest
Gerhard Häring1cc60ed2008-02-29 22:08:41 +000025import sys
Anthony Baxterc51ee692006-04-01 00:57:31 +000026import sqlite3 as sqlite
Victor Stinnera3acea32014-09-05 21:05:05 +020027from test import test_support
Victor Stinner6a102812010-04-27 23:55:59 +000028try:
29 import threading
30except ImportError:
31 threading = None
Anthony Baxterc51ee692006-04-01 00:57:31 +000032
class ModuleTests(unittest.TestCase):
    """Checks the module-level DB-API attributes and the exception hierarchy."""

    def _assert_subclass(self, child, parent, child_name, parent_name):
        # Shared body of the exception-hierarchy checks below; the failure
        # message is built from the two display names.
        message = "%s is not a subclass of %s" % (child_name, parent_name)
        self.assertTrue(issubclass(child, parent), message)

    def CheckAPILevel(self):
        message = "apilevel is %s, should be 2.0" % sqlite.apilevel
        self.assertEqual(sqlite.apilevel, "2.0", message)

    def CheckThreadSafety(self):
        message = "threadsafety is %d, should be 1" % sqlite.threadsafety
        self.assertEqual(sqlite.threadsafety, 1, message)

    def CheckParamStyle(self):
        message = "paramstyle is '%s', should be 'qmark'" % sqlite.paramstyle
        self.assertEqual(sqlite.paramstyle, "qmark", message)

    def CheckWarning(self):
        self._assert_subclass(sqlite.Warning, StandardError,
                              "Warning", "StandardError")

    def CheckError(self):
        self._assert_subclass(sqlite.Error, StandardError,
                              "Error", "StandardError")

    def CheckInterfaceError(self):
        self._assert_subclass(sqlite.InterfaceError, sqlite.Error,
                              "InterfaceError", "Error")

    def CheckDatabaseError(self):
        self._assert_subclass(sqlite.DatabaseError, sqlite.Error,
                              "DatabaseError", "Error")

    def CheckDataError(self):
        self._assert_subclass(sqlite.DataError, sqlite.DatabaseError,
                              "DataError", "DatabaseError")

    def CheckOperationalError(self):
        self._assert_subclass(sqlite.OperationalError, sqlite.DatabaseError,
                              "OperationalError", "DatabaseError")

    def CheckIntegrityError(self):
        self._assert_subclass(sqlite.IntegrityError, sqlite.DatabaseError,
                              "IntegrityError", "DatabaseError")

    def CheckInternalError(self):
        self._assert_subclass(sqlite.InternalError, sqlite.DatabaseError,
                              "InternalError", "DatabaseError")

    def CheckProgrammingError(self):
        self._assert_subclass(sqlite.ProgrammingError, sqlite.DatabaseError,
                              "ProgrammingError", "DatabaseError")

    def CheckNotSupportedError(self):
        self._assert_subclass(sqlite.NotSupportedError, sqlite.DatabaseError,
                              "NotSupportedError", "DatabaseError")
87
class ConnectionTests(unittest.TestCase):
    """Exercises basic Connection operations on an in-memory database."""

    def setUp(self):
        # Fresh in-memory database holding one row in table "test".
        self.cx = sqlite.connect(":memory:")
        cursor = self.cx.cursor()
        cursor.execute("create table test(id integer primary key, name text)")
        cursor.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cx.close()

    def CheckCommit(self):
        self.cx.commit()

    def CheckCommitAfterNoChanges(self):
        """
        A commit should also work when no changes were made to the database.
        """
        for _ in range(2):
            self.cx.commit()

    def CheckRollback(self):
        self.cx.rollback()

    def CheckRollbackAfterNoChanges(self):
        """
        A rollback should also work when no changes were made to the database.
        """
        for _ in range(2):
            self.cx.rollback()

    def CheckCursor(self):
        self.cx.cursor()

    def CheckFailedOpen(self):
        unopenable_path = "/foo/bar/bla/23534/mydb.db"
        try:
            sqlite.connect(unopenable_path)
        except sqlite.OperationalError:
            pass
        else:
            self.fail("should have raised an OperationalError")

    def CheckClose(self):
        self.cx.close()

    def CheckExceptions(self):
        # Optional DB-API extension: the exception classes are also
        # reachable as attributes of the connection object.
        exception_names = (
            "Warning",
            "Error",
            "InterfaceError",
            "DatabaseError",
            "DataError",
            "OperationalError",
            "IntegrityError",
            "InternalError",
            "ProgrammingError",
            "NotSupportedError",
        )
        for name in exception_names:
            self.assertEqual(getattr(self.cx, name), getattr(sqlite, name))
Anthony Baxterc51ee692006-04-01 00:57:31 +0000144
class CursorTests(unittest.TestCase):
    """Exercises Cursor: execute/executemany parameters, rowcount and fetching.

    Each test runs against a fresh in-memory database whose table "test"
    (id, name, income) contains a single row with name "foo".

    Expected exceptions are checked with assertRaises, so an unexpected
    exception propagates with its own traceback and a missing exception is
    reported as such.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()
        self.cu.execute("create table test(id integer primary key, name text, income number)")
        self.cu.execute("insert into test(name) values (?)", ("foo",))

    def tearDown(self):
        self.cu.close()
        self.cx.close()

    def CheckExecuteNoArgs(self):
        self.cu.execute("delete from test")

    def CheckExecuteIllegalSql(self):
        with self.assertRaises(sqlite.OperationalError):
            self.cu.execute("select asdf")

    def CheckExecuteTooMuchSql(self):
        # Two statements in a single execute() call.
        with self.assertRaises(sqlite.Warning):
            self.cu.execute("select 5+4; select 4+5")

    def CheckExecuteTooMuchSql2(self):
        # A trailing line comment after the statement must be accepted.
        self.cu.execute("select 5+4; -- foo bar")

    def CheckExecuteTooMuchSql3(self):
        # A trailing block comment after the statement must be accepted.
        self.cu.execute("""
            select 5+4;

            /*
            foo
            */
            """)

    def CheckExecuteWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.execute(42)

    def CheckExecuteArgInt(self):
        self.cu.execute("insert into test(id) values (?)", (42,))

    def CheckExecuteArgFloat(self):
        self.cu.execute("insert into test(income) values (?)", (2500.32,))

    def CheckExecuteArgString(self):
        self.cu.execute("insert into test(name) values (?)", ("Hugo",))

    def CheckExecuteArgStringWithZeroByte(self):
        self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",))

        self.cu.execute("select name from test where id=?", (self.cu.lastrowid,))
        row = self.cu.fetchone()
        self.assertEqual(row[0], "Hu\x00go")

    def CheckExecuteWrongNoOfArgs1(self):
        # too many parameters
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)", (17, "Egon"))

    def CheckExecuteWrongNoOfArgs2(self):
        # too few parameters: two placeholders, one value supplied
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id, name) values (?, ?)", (17,))

    def CheckExecuteWrongNoOfArgs3(self):
        # no parameters, parameters are needed
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("insert into test(id) values (?)")

    def CheckExecuteParamList(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", ["foo"])
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteParamSequence(self):
        # Minimal sequence-like object: only __len__ and __getitem__.
        class L(object):
            def __len__(self):
                return 1

            def __getitem__(self, x):
                assert x == 0
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=?", L())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", {"name": "foo"})
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMapping_Mapping(self):
        # The dict subclass is empty; the value for :name comes from
        # __missing__.
        class D(dict):
            def __missing__(self, key):
                return "foo"

        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("select name from test where name=:name", D())
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")

    def CheckExecuteDictMappingTooLittleArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"})

    def CheckExecuteDictMappingNoArgs(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=:name")

    def CheckExecuteDictMappingUnnamed(self):
        self.cu.execute("insert into test(name) values ('foo')")
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.execute("select name from test where name=?", {"name": "foo"})

    def CheckClose(self):
        self.cu.close()

    def CheckRowcountExecute(self):
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("update test set name='bar'")
        self.assertEqual(self.cu.rowcount, 2)

    def CheckRowcountSelect(self):
        """
        pysqlite does not know the rowcount of SELECT statements, because we
        don't fetch all rows after executing the select statement. The rowcount
        has thus to be -1.
        """
        self.cu.execute("select 5 union select 6")
        self.assertEqual(self.cu.rowcount, -1)

    def CheckRowcountExecutemany(self):
        self.cu.execute("delete from test")
        self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)])
        self.assertEqual(self.cu.rowcount, 3)

    def CheckTotalChanges(self):
        self.cu.execute("insert into test(name) values ('foo')")
        self.cu.execute("insert into test(name) values ('foo')")
        if self.cx.total_changes < 2:
            self.fail("total changes reported wrong value")

    # Checks for executemany:
    # Sequences are required by the DB-API, iterators
    # enhancements in pysqlite.

    def CheckExecuteManySequence(self):
        self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)])

    def CheckExecuteManyIterator(self):
        # Iterator object defining only next(); yields (6,) .. (10,).
        class MyIter:
            def __init__(self):
                self.value = 5

            def next(self):
                if self.value == 10:
                    raise StopIteration
                else:
                    self.value += 1
                    return (self.value,)

        self.cu.executemany("insert into test(income) values (?)", MyIter())

    def CheckExecuteManyGenerator(self):
        def mygen():
            for i in range(5):
                yield (i,)

        self.cu.executemany("insert into test(income) values (?)", mygen())

    def CheckExecuteManyWrongSqlArg(self):
        with self.assertRaises(ValueError):
            self.cu.executemany(42, [(3,)])

    def CheckExecuteManySelect(self):
        with self.assertRaises(sqlite.ProgrammingError):
            self.cu.executemany("select ?", [(3,)])

    def CheckExecuteManyNotIterable(self):
        with self.assertRaises(TypeError):
            self.cu.executemany("insert into test(income) values (?)", 42)

    def CheckFetchIter(self):
        # Optional DB-API extension.
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(id) values (?)", (5,))
        self.cu.execute("insert into test(id) values (?)", (6,))
        self.cu.execute("select id from test order by id")
        lst = []
        for row in self.cu:
            lst.append(row[0])
        self.assertEqual(lst[0], 5)
        self.assertEqual(lst[1], 6)

    def CheckFetchone(self):
        self.cu.execute("select name from test")
        row = self.cu.fetchone()
        self.assertEqual(row[0], "foo")
        row = self.cu.fetchone()
        self.assertEqual(row, None)

    def CheckFetchoneNoStatement(self):
        cur = self.cx.cursor()
        row = cur.fetchone()
        self.assertEqual(row, None)

    def CheckArraySize(self):
        # must default to 1
        self.assertEqual(self.cu.arraysize, 1)

        # now set to 2
        self.cu.arraysize = 2

        # now make the query return 3 rows
        self.cu.execute("delete from test")
        self.cu.execute("insert into test(name) values ('A')")
        self.cu.execute("insert into test(name) values ('B')")
        self.cu.execute("insert into test(name) values ('C')")
        self.cu.execute("select name from test")
        res = self.cu.fetchmany()

        self.assertEqual(len(res), 2)

    def CheckFetchmany(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(100)
        self.assertEqual(len(res), 1)
        res = self.cu.fetchmany(100)
        self.assertEqual(res, [])

    def CheckFetchmanyKwArg(self):
        """Checks if fetchmany works with keyword arguments"""
        self.cu.execute("select name from test")
        res = self.cu.fetchmany(size=100)
        self.assertEqual(len(res), 1)

    def CheckFetchall(self):
        self.cu.execute("select name from test")
        res = self.cu.fetchall()
        self.assertEqual(len(res), 1)
        res = self.cu.fetchall()
        self.assertEqual(res, [])

    def CheckSetinputsizes(self):
        self.cu.setinputsizes([3, 4, 5])

    def CheckSetoutputsize(self):
        self.cu.setoutputsize(5, 0)

    def CheckSetoutputsizeNoColumn(self):
        self.cu.setoutputsize(42)

    def CheckCursorConnection(self):
        # Optional DB-API extension.
        self.assertEqual(self.cu.connection, self.cx)

    def CheckWrongCursorCallable(self):
        # The factory passed to cursor() does not return a cursor.
        def f():
            pass

        with self.assertRaises(TypeError):
            self.cx.cursor(f)

    def CheckCursorWrongClass(self):
        # Cursor() is given an object that is not a connection.
        class Foo:
            pass

        foo = Foo()
        with self.assertRaises(TypeError):
            sqlite.Cursor(foo)
478
@unittest.skipUnless(threading, 'This test requires threading.')
class ThreadTests(unittest.TestCase):
    """Checks that a connection and its cursor reject use from another thread.

    setUp creates the connection and cursor in the test's own thread. Each
    test then calls one operation on them from a second thread and expects
    sqlite.ProgrammingError.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()
        self.cur.execute("create table test(id integer primary key, name text, bin binary, ratio number, ts timestamp)")

    def tearDown(self):
        self.cur.close()
        self.con.close()

    def _assert_rejected_in_other_thread(self, operation, *args):
        """Call operation(*args) in a new thread; fail unless it raises ProgrammingError.

        Problems are collected in a list and reported from the calling
        thread after join(), because an exception raised inside the worker
        would not reach the test runner.
        """
        errors = []

        def run():
            try:
                operation(*args)
            except sqlite.ProgrammingError:
                return
            except Exception as exc:
                errors.append("raised wrong exception: %r" % (exc,))
            else:
                errors.append("did not raise ProgrammingError")

        worker = threading.Thread(target=run)
        worker.start()
        worker.join()
        if errors:
            self.fail("\n".join(errors))

    def CheckConCursor(self):
        self._assert_rejected_in_other_thread(self.con.cursor)

    def CheckConCommit(self):
        self._assert_rejected_in_other_thread(self.con.commit)

    def CheckConRollback(self):
        self._assert_rejected_in_other_thread(self.con.rollback)

    def CheckConClose(self):
        self._assert_rejected_in_other_thread(self.con.close)

    def CheckCurImplicitBegin(self):
        self._assert_rejected_in_other_thread(
            self.cur.execute, "insert into test(name) values ('a')")

    def CheckCurClose(self):
        self._assert_rejected_in_other_thread(self.cur.close)

    def CheckCurExecute(self):
        self.cur.execute("insert into test(name) values ('a')")
        self._assert_rejected_in_other_thread(
            self.cur.execute, "select name from test")

    def CheckCurIterNext(self):
        self.cur.execute("insert into test(name) values ('a')")
        self.cur.execute("select name from test")
        self._assert_rejected_in_other_thread(self.cur.fetchone)
636
class ConstructorTests(unittest.TestCase):
    """Smoke tests: each DB-API type constructor must be callable without error."""

    def CheckDate(self):
        sqlite.Date(2004, 10, 28)

    def CheckTime(self):
        sqlite.Time(12, 39, 35)

    def CheckTimestamp(self):
        sqlite.Timestamp(2004, 10, 28, 12, 39, 35)

    def CheckDateFromTicks(self):
        sqlite.DateFromTicks(42)

    def CheckTimeFromTicks(self):
        sqlite.TimeFromTicks(42)

    def CheckTimestampFromTicks(self):
        sqlite.TimestampFromTicks(42)

    def CheckBinary(self):
        # Runs under test_support.check_py3k_warnings(), as in the original.
        with test_support.check_py3k_warnings():
            sqlite.Binary(chr(0) + "'")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000659
class ExtensionTests(unittest.TestCase):
    """Exercises executescript() and the execute* shortcuts on Connection.

    Every test gets its own in-memory connection from setUp, and tearDown
    closes it so that no connection outlives its test.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def CheckScriptStringSql(self):
        cur = self.con.cursor()
        cur.executescript("""
            -- bla bla
            /* a stupid comment */
            create table a(i);
            insert into a(i) values (5);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckScriptStringUnicode(self):
        cur = self.con.cursor()
        cur.executescript(u"""
            create table a(i);
            insert into a(i) values (5);
            select i from a;
            delete from a;
            insert into a(i) values (6);
            """)
        cur.execute("select i from a")
        res = cur.fetchone()[0]
        self.assertEqual(res, 6)

    def CheckScriptSyntaxError(self):
        # The second statement of the script is not valid SQL.
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(x); asdf; create table test2(x)")

    def CheckScriptErrorNormal(self):
        # The second statement selects from a table that was never created.
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError):
            cur.executescript("create table test(sadfsadfdsa); select foo from hurz;")

    def CheckConnectionExecute(self):
        result = self.con.execute("select 5").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.execute")

    def CheckConnectionExecutemany(self):
        self.con.execute("create table test(foo)")
        self.con.executemany("insert into test(foo) values (?)", [(3,), (4,)])
        result = self.con.execute("select foo from test order by foo").fetchall()
        self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany")
        self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany")

    def CheckConnectionExecutescript(self):
        self.con.executescript("create table test(foo); insert into test(foo) values (5);")
        result = self.con.execute("select foo from test").fetchone()[0]
        self.assertEqual(result, 5, "Basic test of Connection.executescript")
Anthony Baxterc51ee692006-04-01 00:57:31 +0000726
class ClosedConTests(unittest.TestCase):
    """Checks that operations on a closed connection raise ProgrammingError.

    assertRaises is used so that an unexpected exception propagates with its
    own traceback and a missing exception is reported as such.
    """

    def _closed_connection(self):
        # Returns an in-memory connection that has already been closed.
        con = sqlite.connect(":memory:")
        con.close()
        return con

    def CheckClosedConCursor(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def CheckClosedConCommit(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.commit()

    def CheckClosedConRollback(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con.rollback()

    def CheckClosedCurExecute(self):
        # The cursor is created while the connection is still open.
        con = sqlite.connect(":memory:")
        cur = con.cursor()
        con.close()
        with self.assertRaises(sqlite.ProgrammingError):
            cur.execute("select 4")

    def CheckClosedCreateFunction(self):
        con = self._closed_connection()

        def f(x):
            return 17

        with self.assertRaises(sqlite.ProgrammingError):
            con.create_function("foo", 1, f)

    def CheckClosedCreateAggregate(self):
        con = self._closed_connection()

        class Agg:
            def __init__(self):
                pass

            def step(self, x):
                pass

            def finalize(self):
                return 17

        with self.assertRaises(sqlite.ProgrammingError):
            con.create_aggregate("foo", 1, Agg)

    def CheckClosedSetAuthorizer(self):
        con = self._closed_connection()

        def authorizer(*args):
            return sqlite.DENY

        with self.assertRaises(sqlite.ProgrammingError):
            con.set_authorizer(authorizer)

    def CheckClosedSetProgressCallback(self):
        con = self._closed_connection()

        def progress():
            pass

        with self.assertRaises(sqlite.ProgrammingError):
            con.set_progress_handler(progress, 100)

    def CheckClosedCall(self):
        con = self._closed_connection()
        with self.assertRaises(sqlite.ProgrammingError):
            con()
844
class ClosedCurTests(unittest.TestCase):
    """Checks that every execute/fetch method of a closed cursor raises ProgrammingError."""

    def CheckClosed(self):
        con = sqlite.connect(":memory:")
        try:
            cur = con.cursor()
            cur.close()

            for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"):
                if method_name in ("execute", "executescript"):
                    params = ("select 4 union select 5",)
                elif method_name == "executemany":
                    params = ("insert into foo(bar) values (?)", [(3,), (4,)])
                else:
                    params = []

                method = getattr(cur, method_name)
                # Only ProgrammingError is handled here; any other exception
                # propagates with its own traceback.
                try:
                    method(*params)
                except sqlite.ProgrammingError:
                    pass
                else:
                    self.fail("Should have raised a ProgrammingError: method " + method_name)
        finally:
            # Release the connection even when an assertion above fails.
            con.close()
874
def suite():
    """Build one TestSuite from the "Check"-prefixed methods of every test class here."""
    test_classes = (
        ModuleTests,
        ConnectionTests,
        CursorTests,
        ThreadTests,
        ConstructorTests,
        ExtensionTests,
        ClosedConTests,
        ClosedCurTests,
    )
    # Same order as the classes are listed above.
    return unittest.TestSuite(tuple(
        unittest.makeSuite(test_class, "Check") for test_class in test_classes
    ))
Anthony Baxterc51ee692006-04-01 00:57:31 +0000885
def test():
    """Run the complete suite with a plain text runner."""
    unittest.TextTestRunner().run(suite())


# Running the file directly executes all tests.
if __name__ == "__main__":
    test()