#-*- coding: ISO-8859-1 -*-
# pysqlite2/test/transactions.py: tests transactions
#
# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.
23
24import os, unittest
25import sqlite3 as sqlite
26
def get_db_path():
    """Return the (relative) file name of the on-disk test database."""
    db_file_name = "sqlite_testdb"
    return db_file_name
29
class TransactionTests(unittest.TestCase):
    """Check which statements open or commit a transaction implicitly.

    Every test uses two independent connections to the same on-disk
    database: ``con1``/``cur1`` modify data, ``con2``/``cur2`` observe what
    is visible from the outside.
    """

    def _remove_db_file(self):
        """Delete the test database file; any OSError (e.g. no file) is ignored."""
        try:
            os.remove(get_db_path())
        except OSError:
            pass

    def setUp(self):
        """Start from a fresh database file and open both connections."""
        self._remove_db_file()

        # timeout=0.1: a statement blocked by the other connection's lock
        # gives up after 0.1 seconds.
        self.con1 = sqlite.connect(get_db_path(), timeout=0.1)
        self.cur1 = self.con1.cursor()

        self.con2 = sqlite.connect(get_db_path(), timeout=0.1)
        self.cur2 = self.con2.cursor()

    def tearDown(self):
        """Close cursors and connections, then remove the database file."""
        self.cur1.close()
        self.con1.close()

        self.cur2.close()
        self.con2.close()

        self._remove_db_file()

    def CheckDMLdoesAutoCommitBefore(self):
        """After a second CREATE TABLE on con1, con2 must see the earlier insert."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        # NOTE(review): the sqlite3 docs state that the implicit commit before
        # DDL statements was dropped in Python 3.6 -- confirm which versions
        # this expectation is meant to cover.
        self.cur1.execute("create table test2(j)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def CheckInsertStartsTransaction(self):
        """An uncommitted insert on con1 must not be visible to con2."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 0)

    def CheckUpdateStartsTransaction(self):
        """An uncommitted update on con1 must leave con2 reading the old value."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("update test set i=6")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchone()[0]
        self.assertEqual(res, 5)

    def CheckDeleteStartsTransaction(self):
        """An uncommitted delete on con1 must leave the row visible to con2."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("delete from test")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def CheckReplaceStartsTransaction(self):
        """An uncommitted REPLACE on con1 must not change what con2 reads."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.con1.commit()
        self.cur1.execute("replace into test(i) values (6)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)
        self.assertEqual(res[0][0], 5)

    def CheckToggleAutoCommit(self):
        """Toggle ``isolation_level`` and check what con2 sees after each step."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        # Switching to isolation_level None is expected to make the pending
        # insert visible to the second connection.
        self.con1.isolation_level = None
        self.assertEqual(self.con1.isolation_level, None)
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

        # Back to "DEFERRED": the next insert must stay invisible to con2,
        # so the row count remains 1.
        self.con1.isolation_level = "DEFERRED"
        self.assertEqual(self.con1.isolation_level, "DEFERRED")
        self.cur1.execute("insert into test(i) values (5)")
        self.cur2.execute("select i from test")
        res = self.cur2.fetchall()
        self.assertEqual(len(res), 1)

    def CheckRaiseTimeout(self):
        """A write on con2 while con1 has a pending insert must raise OperationalError."""
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.assertRaises(
            sqlite.OperationalError,
            self.cur2.execute, "insert into test(i) values (5)")

    def CheckLocking(self):
        """
        This tests the improved concurrency with pysqlite 2.3.4. You needed
        to roll back con2 before you could commit con1.
        """
        self.cur1.execute("create table test(i)")
        self.cur1.execute("insert into test(i) values (5)")
        self.assertRaises(
            sqlite.OperationalError,
            self.cur2.execute, "insert into test(i) values (5)")
        # NO self.con2.rollback() HERE!!!
        self.con1.commit()
141
class SpecialCommandTests(unittest.TestCase):
    """Run VACUUM, DROP TABLE and PRAGMA right after an insert.

    The tests make no assertions; each one passes as long as none of its
    statements raises.
    """

    def setUp(self):
        """Open an in-memory database and a cursor on it."""
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()

    def tearDown(self):
        """Close the cursor, then the connection."""
        self.cur.close()
        self.con.close()

    def _run_after_insert(self, statement):
        """Create table ``test``, insert one row, then execute *statement*."""
        for sql in ("create table test(i)",
                    "insert into test(i) values (5)",
                    statement):
            self.cur.execute(sql)

    def CheckVacuum(self):
        self._run_after_insert("vacuum")

    def CheckDropTable(self):
        self._run_after_insert("drop table test")

    def CheckPragma(self):
        self._run_after_insert("pragma count_changes=1")
165
def suite():
    """Return a TestSuite holding every ``Check*`` method of both test classes.

    The result contains two sub-suites, one for TransactionTests and one for
    SpecialCommandTests, in that order.
    """
    # unittest.makeSuite() is deprecated and absent from recent Python
    # releases; a TestLoader with a custom method prefix does the same job.
    loader = unittest.TestLoader()
    loader.testMethodPrefix = "Check"
    default_suite = loader.loadTestsFromTestCase(TransactionTests)
    special_command_suite = loader.loadTestsFromTestCase(SpecialCommandTests)
    return unittest.TestSuite((default_suite, special_command_suite))
170
def test():
    """Run everything returned by suite() with a text-mode test runner."""
    unittest.TextTestRunner().run(suite())
174
# Run the tests only when this file is executed as a script, not on import.
if __name__ == "__main__":
    test()