| import unittest |
| import os |
| |
| try: |
| # For Pythons w/distutils pybsddb |
| from bsddb3 import db |
| except ImportError: |
| from bsddb import db |
| |
| try: |
| from bsddb3 import test_support |
| except ImportError: |
| from test import test_support |
| |
| from test_all import get_new_environment_path, get_new_database_path |
| |
| |
| class DBSequenceTest(unittest.TestCase): |
| import sys |
| if sys.version_info[:3] < (2, 4, 0): |
| def assertTrue(self, expr, msg=None): |
| self.failUnless(expr,msg=msg) |
| |
| def setUp(self): |
| self.int_32_max = 0x100000000 |
| self.homeDir = get_new_environment_path() |
| self.filename = "test" |
| |
| self.dbenv = db.DBEnv() |
| self.dbenv.open(self.homeDir, db.DB_CREATE | db.DB_INIT_MPOOL, 0666) |
| self.d = db.DB(self.dbenv) |
| self.d.open(self.filename, db.DB_BTREE, db.DB_CREATE, 0666) |
| |
| def tearDown(self): |
| if hasattr(self, 'seq'): |
| self.seq.close() |
| del self.seq |
| if hasattr(self, 'd'): |
| self.d.close() |
| del self.d |
| if hasattr(self, 'dbenv'): |
| self.dbenv.close() |
| del self.dbenv |
| |
| test_support.rmtree(self.homeDir) |
| |
| def test_get(self): |
| self.seq = db.DBSequence(self.d, flags=0) |
| start_value = 10 * self.int_32_max |
| self.assertEqual(0xA00000000, start_value) |
| self.assertEquals(None, self.seq.init_value(start_value)) |
| self.assertEquals(None, self.seq.open(key='id', txn=None, flags=db.DB_CREATE)) |
| self.assertEquals(start_value, self.seq.get(5)) |
| self.assertEquals(start_value + 5, self.seq.get()) |
| |
| def test_remove(self): |
| self.seq = db.DBSequence(self.d, flags=0) |
| self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
| self.assertEquals(None, self.seq.remove(txn=None, flags=0)) |
| del self.seq |
| |
| def test_get_key(self): |
| self.seq = db.DBSequence(self.d, flags=0) |
| key = 'foo' |
| self.assertEquals(None, self.seq.open(key=key, txn=None, flags=db.DB_CREATE)) |
| self.assertEquals(key, self.seq.get_key()) |
| |
| def test_get_dbp(self): |
| self.seq = db.DBSequence(self.d, flags=0) |
| self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
| self.assertEquals(self.d, self.seq.get_dbp()) |
| |
| def test_cachesize(self): |
| self.seq = db.DBSequence(self.d, flags=0) |
| cashe_size = 10 |
| self.assertEquals(None, self.seq.set_cachesize(cashe_size)) |
| self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
| self.assertEquals(cashe_size, self.seq.get_cachesize()) |
| |
| def test_flags(self): |
| self.seq = db.DBSequence(self.d, flags=0) |
| flag = db.DB_SEQ_WRAP; |
| self.assertEquals(None, self.seq.set_flags(flag)) |
| self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
| self.assertEquals(flag, self.seq.get_flags() & flag) |
| |
| def test_range(self): |
| self.seq = db.DBSequence(self.d, flags=0) |
| seq_range = (10 * self.int_32_max, 11 * self.int_32_max - 1) |
| self.assertEquals(None, self.seq.set_range(seq_range)) |
| self.seq.init_value(seq_range[0]) |
| self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
| self.assertEquals(seq_range, self.seq.get_range()) |
| |
| def test_stat(self): |
| self.seq = db.DBSequence(self.d, flags=0) |
| self.assertEquals(None, self.seq.open(key='foo', txn=None, flags=db.DB_CREATE)) |
| stat = self.seq.stat() |
| for param in ('nowait', 'min', 'max', 'value', 'current', |
| 'flags', 'cache_size', 'last_value', 'wait'): |
| self.assertTrue(param in stat, "parameter %s isn't in stat info" % param) |
| |
| if db.version() >= (4,7) : |
| # This code checks a crash solved in Berkeley DB 4.7 |
| def test_stat_crash(self) : |
| d=db.DB() |
| d.open(None,dbtype=db.DB_HASH,flags=db.DB_CREATE) # In RAM |
| seq = db.DBSequence(d, flags=0) |
| |
| self.assertRaises(db.DBNotFoundError, seq.open, |
| key='id', txn=None, flags=0) |
| |
| self.assertRaises(db.DBInvalidArgError, seq.stat) |
| |
| d.close() |
| |
| def test_64bits(self) : |
| # We don't use both extremes because they are problematic |
| value_plus=(1L<<63)-2 |
| self.assertEquals(9223372036854775806L,value_plus) |
| value_minus=(-1L<<63)+1 # Two complement |
| self.assertEquals(-9223372036854775807L,value_minus) |
| self.seq = db.DBSequence(self.d, flags=0) |
| self.assertEquals(None, self.seq.init_value(value_plus-1)) |
| self.assertEquals(None, self.seq.open(key='id', txn=None, |
| flags=db.DB_CREATE)) |
| self.assertEquals(value_plus-1, self.seq.get(1)) |
| self.assertEquals(value_plus, self.seq.get(1)) |
| |
| self.seq.remove(txn=None, flags=0) |
| |
| self.seq = db.DBSequence(self.d, flags=0) |
| self.assertEquals(None, self.seq.init_value(value_minus)) |
| self.assertEquals(None, self.seq.open(key='id', txn=None, |
| flags=db.DB_CREATE)) |
| self.assertEquals(value_minus, self.seq.get(1)) |
| self.assertEquals(value_minus+1, self.seq.get(1)) |
| |
| def test_multiple_close(self): |
| self.seq = db.DBSequence(self.d) |
| self.seq.close() # You can close a Sequence multiple times |
| self.seq.close() |
| self.seq.close() |
| |
| def test_suite(): |
| suite = unittest.TestSuite() |
| if db.version() >= (4,3): |
| suite.addTest(unittest.makeSuite(DBSequenceTest)) |
| return suite |
| |
| |
| if __name__ == '__main__': |
| unittest.main(defaultTest='test_suite') |