blob: 14cae25001a4eb9ff9582b450f042f0b5d1745e7 [file] [log] [blame]
Anthony Baxterc51ee692006-04-01 00:57:31 +00001#-*- coding: ISO-8859-1 -*-
2# pysqlite2/test/transactions.py: tests transactions
3#
Gerhard Häring1cc60ed2008-02-29 22:08:41 +00004# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
Anthony Baxterc51ee692006-04-01 00:57:31 +00005#
6# This file is part of pysqlite.
7#
8# This software is provided 'as-is', without any express or implied
9# warranty. In no event will the authors be held liable for any damages
10# arising from the use of this software.
11#
12# Permission is granted to anyone to use this software for any purpose,
13# including commercial applications, and to alter it and redistribute it
14# freely, subject to the following restrictions:
15#
16# 1. The origin of this software must not be misrepresented; you must not
17# claim that you wrote the original software. If you use this software
18# in a product, an acknowledgment in the product documentation would be
19# appreciated but is not required.
20# 2. Altered source versions must be plainly marked as such, and must not be
21# misrepresented as being the original software.
22# 3. This notice may not be removed or altered from any source distribution.
23
Gerhard Häring1cc60ed2008-02-29 22:08:41 +000024import sys
Anthony Baxterc51ee692006-04-01 00:57:31 +000025import os, unittest
26import sqlite3 as sqlite
27
28def get_db_path():
Anthony Baxter72289a62006-04-04 06:29:05 +000029 return "sqlite_testdb"
Anthony Baxterc51ee692006-04-01 00:57:31 +000030
31class TransactionTests(unittest.TestCase):
32 def setUp(self):
33 try:
34 os.remove(get_db_path())
35 except:
36 pass
37
38 self.con1 = sqlite.connect(get_db_path(), timeout=0.1)
39 self.cur1 = self.con1.cursor()
40
41 self.con2 = sqlite.connect(get_db_path(), timeout=0.1)
42 self.cur2 = self.con2.cursor()
43
44 def tearDown(self):
45 self.cur1.close()
46 self.con1.close()
47
48 self.cur2.close()
49 self.con2.close()
50
Anthony Baxter72289a62006-04-04 06:29:05 +000051 os.unlink(get_db_path())
52
Anthony Baxterc51ee692006-04-01 00:57:31 +000053 def CheckDMLdoesAutoCommitBefore(self):
54 self.cur1.execute("create table test(i)")
55 self.cur1.execute("insert into test(i) values (5)")
56 self.cur1.execute("create table test2(j)")
57 self.cur2.execute("select i from test")
58 res = self.cur2.fetchall()
59 self.failUnlessEqual(len(res), 1)
60
61 def CheckInsertStartsTransaction(self):
62 self.cur1.execute("create table test(i)")
63 self.cur1.execute("insert into test(i) values (5)")
64 self.cur2.execute("select i from test")
65 res = self.cur2.fetchall()
66 self.failUnlessEqual(len(res), 0)
67
68 def CheckUpdateStartsTransaction(self):
69 self.cur1.execute("create table test(i)")
70 self.cur1.execute("insert into test(i) values (5)")
71 self.con1.commit()
72 self.cur1.execute("update test set i=6")
73 self.cur2.execute("select i from test")
74 res = self.cur2.fetchone()[0]
75 self.failUnlessEqual(res, 5)
76
77 def CheckDeleteStartsTransaction(self):
78 self.cur1.execute("create table test(i)")
79 self.cur1.execute("insert into test(i) values (5)")
80 self.con1.commit()
81 self.cur1.execute("delete from test")
82 self.cur2.execute("select i from test")
83 res = self.cur2.fetchall()
84 self.failUnlessEqual(len(res), 1)
85
86 def CheckReplaceStartsTransaction(self):
87 self.cur1.execute("create table test(i)")
88 self.cur1.execute("insert into test(i) values (5)")
89 self.con1.commit()
90 self.cur1.execute("replace into test(i) values (6)")
91 self.cur2.execute("select i from test")
92 res = self.cur2.fetchall()
93 self.failUnlessEqual(len(res), 1)
94 self.failUnlessEqual(res[0][0], 5)
95
96 def CheckToggleAutoCommit(self):
97 self.cur1.execute("create table test(i)")
98 self.cur1.execute("insert into test(i) values (5)")
99 self.con1.isolation_level = None
100 self.failUnlessEqual(self.con1.isolation_level, None)
101 self.cur2.execute("select i from test")
102 res = self.cur2.fetchall()
103 self.failUnlessEqual(len(res), 1)
104
105 self.con1.isolation_level = "DEFERRED"
106 self.failUnlessEqual(self.con1.isolation_level , "DEFERRED")
107 self.cur1.execute("insert into test(i) values (5)")
108 self.cur2.execute("select i from test")
109 res = self.cur2.fetchall()
110 self.failUnlessEqual(len(res), 1)
111
112 def CheckRaiseTimeout(self):
113 self.cur1.execute("create table test(i)")
114 self.cur1.execute("insert into test(i) values (5)")
115 try:
116 self.cur2.execute("insert into test(i) values (5)")
117 self.fail("should have raised an OperationalError")
118 except sqlite.OperationalError:
119 pass
120 except:
121 self.fail("should have raised an OperationalError")
122
Gerhard Häring1cc60ed2008-02-29 22:08:41 +0000123 def CheckLocking(self):
124 """
125 This tests the improved concurrency with pysqlite 2.3.4. You needed
126 to roll back con2 before you could commit con1.
127 """
128 self.cur1.execute("create table test(i)")
129 self.cur1.execute("insert into test(i) values (5)")
130 try:
131 self.cur2.execute("insert into test(i) values (5)")
132 self.fail("should have raised an OperationalError")
133 except sqlite.OperationalError:
134 pass
135 except:
136 self.fail("should have raised an OperationalError")
137 # NO self.con2.rollback() HERE!!!
138 self.con1.commit()
139
Anthony Baxterc51ee692006-04-01 00:57:31 +0000140class SpecialCommandTests(unittest.TestCase):
141 def setUp(self):
142 self.con = sqlite.connect(":memory:")
143 self.cur = self.con.cursor()
144
145 def CheckVacuum(self):
146 self.cur.execute("create table test(i)")
147 self.cur.execute("insert into test(i) values (5)")
148 self.cur.execute("vacuum")
149
150 def CheckDropTable(self):
151 self.cur.execute("create table test(i)")
152 self.cur.execute("insert into test(i) values (5)")
153 self.cur.execute("drop table test")
154
155 def CheckPragma(self):
156 self.cur.execute("create table test(i)")
157 self.cur.execute("insert into test(i) values (5)")
158 self.cur.execute("pragma count_changes=1")
159
160 def tearDown(self):
161 self.cur.close()
162 self.con.close()
163
164def suite():
165 default_suite = unittest.makeSuite(TransactionTests, "Check")
166 special_command_suite = unittest.makeSuite(SpecialCommandTests, "Check")
167 return unittest.TestSuite((default_suite, special_command_suite))
168
169def test():
170 runner = unittest.TextTestRunner()
171 runner.run(suite())
172
173if __name__ == "__main__":
174 test()