| """TestCases for multi-threaded access to a DB. |
| """ |
| |
| import os |
| import sys |
| import time |
| import errno |
| import shutil |
| import tempfile |
| from pprint import pprint |
| from random import random |
| |
| DASH = b'-' |
| |
| try: |
| from threading import Thread, currentThread |
| have_threads = True |
| except ImportError: |
| have_threads = False |
| |
# Make sure the name WindowsError is defined: where the builtin is missing,
# looking it up raises NameError and a stand-in class is created instead.
try:
    WindowsError
except NameError:
    class WindowsError(Exception):
        """Stand-in used when the interpreter has no builtin WindowsError."""
| |
| import unittest |
| from bsddb.test.test_all import verbose |
| |
| try: |
| # For Pythons w/distutils pybsddb |
| from bsddb3 import db, dbutils |
| except ImportError: |
| # For Python 2.3 |
| from bsddb import db, dbutils |
| |
| |
| #---------------------------------------------------------------------- |
| |
class BaseThreadedTestCase(unittest.TestCase):
    """Shared fixture for the multi-threaded DB tests.

    ``setUp`` creates a ``DBEnv`` in a scratch home directory and opens one
    ``DB`` inside it; ``tearDown`` closes both and removes the directory.
    Subclasses select the access method and flags through the class
    attributes below and provide the actual thread bodies by overriding
    ``_writerThread`` and ``_readerThread``.
    """

    dbtype = db.DB_UNKNOWN  # must be set in derived class
    dbopenflags = 0         # OR-ed with DB_CREATE when the DB is opened
    dbsetflags = 0          # handed to DB.set_flags() when non-zero
    envflags = 0            # OR-ed with DB_CREATE when the env is opened

    def setUp(self):
        """Create the home directory, the environment and the database."""
        if verbose:
            dbutils._deadlock_VerboseFile = sys.stdout

        homeDir = os.path.join(tempfile.gettempdir(), 'db_home')
        self.homeDir = homeDir
        try:
            os.mkdir(homeDir)
        except OSError as e:
            # An already existing directory is acceptable; anything else
            # is a real failure.
            if e.errno != errno.EEXIST:
                raise
        self.env = db.DBEnv()
        self.setEnvOpts()
        self.env.open(self.homeDir, self.envflags | db.DB_CREATE)

        self.filename = self.__class__.__name__ + '.db'
        self.d = db.DB(self.env)
        if self.dbsetflags:
            self.d.set_flags(self.dbsetflags)
        self.d.open(self.filename, self.dbtype,
                    self.dbopenflags | db.DB_CREATE)

    def tearDown(self):
        """Close the database and environment, then delete the home dir."""
        self.d.close()
        self.env.close()
        try:
            shutil.rmtree(self.homeDir)
        except OSError as e:
            # A directory that is already gone (ENOENT) is not a failure.
            # EEXIST is still ignored because the earlier version of this
            # check ignored it.
            if e.errno not in (errno.ENOENT, errno.EEXIST):
                raise

    def setEnvOpts(self):
        """Hook called on ``self.env`` before it is opened; no-op here."""
        pass

    def makeData(self, key):
        """Return the value for *key*: five copies of it joined by DASH."""
        return DASH.join([key] * 5)

    def _writerThread(self, *args, **kwargs):
        """Writer thread body; subclasses must override."""
        raise RuntimeError("must override this in a subclass")

    def _readerThread(self, *args, **kwargs):
        """Reader thread body; subclasses must override."""
        raise RuntimeError("must override this in a subclass")

    def writerThread(self, *args, **kwargs):
        """Thread target: run ``_writerThread`` and absorb a deadlock error.

        A ``db.DBLockDeadlockError`` ends the thread quietly (reported only
        when verbose); any other exception propagates.
        """
        try:
            self._writerThread(*args, **kwargs)
        except db.DBLockDeadlockError as e:
            # The exception must be bound to a name: it is printed below.
            if verbose:
                print(currentThread().getName(), 'died from', e)
        else:
            if verbose:
                print(currentThread().getName(), "finished.")

    def readerThread(self, *args, **kwargs):
        """Thread target: run ``_readerThread`` and absorb a deadlock error.

        A ``db.DBLockDeadlockError`` ends the thread quietly (reported only
        when verbose); any other exception propagates.
        """
        try:
            self._readerThread(*args, **kwargs)
        except db.DBLockDeadlockError as e:
            if verbose:
                print(currentThread().getName(), 'died from', e)
        else:
            if verbose:
                print(currentThread().getName(), "finished.")
| |
| |
| |
| #---------------------------------------------------------------------- |
| |
| |
class ConcurrentDataStoreBase(BaseThreadedTestCase):
    """One writer thread and ``readers`` reader threads on a DB_INIT_CDB env.

    The writer stores ``records`` key/value pairs while each reader scans
    the whole database five times with a cursor, checking that every value
    matches ``makeData(key)``.
    """

    dbopenflags = db.DB_THREAD
    envflags = db.DB_THREAD | db.DB_INIT_CDB | db.DB_INIT_MPOOL
    readers = 0  # derived class should set
    writers = 0  # not read by this class; exactly one writer is started
    records = 1000

    def test01_1WriterMultiReaders(self):
        """Run the single writer and all readers to completion."""
        if verbose:
            print('\n', '-=' * 30)
            print("Running %s.test01_1WriterMultiReaders..." %
                  self.__class__.__name__)
            print('Using:', self.homeDir, self.filename)

        threads = [Thread(target=self.writerThread,
                          args=(self.d, self.records),
                          name='the writer')]
        for x in range(self.readers):
            threads.append(Thread(target=self.readerThread,
                                  args=(self.d, x),
                                  name='reader %d' % x))

        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def _writerThread(self, d, howMany):
        """Store records 0 .. howMany-1 in *d*, keyed by 4-digit numbers."""
        name = currentThread().getName()
        start = 0
        stop = howMany
        if verbose:
            print(name+": creating records", start, "-", stop)

        for x in range(start, stop):
            key = ('%04d' % x).encode("ascii")
            d.put(key, self.makeData(key))
            if verbose and x > start and x % 50 == 0:
                print(name+": records", start, "-", x, "finished")

        if verbose:
            print("%s: finished creating records" % name)

##         # Each write-cursor will be exclusive, the only one that can update the DB...
##         if verbose: print "%s: deleting a few records" % name
##         c = d.cursor(flags = db.DB_WRITECURSOR)
##         for x in range(10):
##             key = int(random() * howMany) + start
##             key = '%04d' % key
##             if d.has_key(key):
##                 c.set(key)
##                 c.delete()

##         c.close()

    def _readerThread(self, d, readerNum):
        """Scan *d* five times, verifying every record that is found.

        The start is delayed by 0.01 s per *readerNum* so the readers do
        not all begin at once, and each pass is followed by a 0.05 s pause.
        """
        time.sleep(0.01 * readerNum)
        name = currentThread().getName()

        for loop in range(5):
            c = d.cursor()
            try:
                count = 0
                rec = c.first()
                while rec:
                    count += 1
                    key, data = rec
                    self.assertEqual(self.makeData(key), data)
                    rec = c.next()
                if verbose:
                    print(name+": found", count, "records")
            finally:
                # Close the cursor even when an assertion or DB error cuts
                # the scan short, so it is never left open.
                c.close()
            time.sleep(0.05)
| |
| |
class BTreeConcurrentDataStore(ConcurrentDataStoreBase):
    """Concurrent-data-store scenario on a DB_BTREE database, ten readers."""

    records = 1000
    readers = 10
    writers = 1
    dbtype = db.DB_BTREE
| |
| |
class HashConcurrentDataStore(ConcurrentDataStoreBase):
    """Concurrent-data-store scenario on a DB_HASH database, no readers."""

    records = 1000
    readers = 0
    writers = 1
    dbtype = db.DB_HASH
| |
| |
| #---------------------------------------------------------------------- |
| |
class SimpleThreadedBase(BaseThreadedTestCase):
    """Several writers and readers on a DB_INIT_LOCK env, no transactions.

    Every DB call that may hit a lock conflict goes through
    ``dbutils.DeadlockWrap`` with ``max_retries=20``.
    """

    dbopenflags = db.DB_THREAD
    envflags = db.DB_THREAD | db.DB_INIT_MPOOL | db.DB_INIT_LOCK
    readers = 5
    writers = 3
    records = 1000

    def setEnvOpts(self):
        """Configure lock detection on the env with DB_LOCK_DEFAULT."""
        self.env.set_lk_detect(db.DB_LOCK_DEFAULT)

    def test02_SimpleLocks(self):
        """Run all writer and reader threads to completion."""
        if verbose:
            print('\n', '-=' * 30)
            print("Running %s.test02_SimpleLocks..." % self.__class__.__name__)

        threads = []
        for x in range(self.writers):
            threads.append(Thread(target=self.writerThread,
                                  args=(self.d, self.records, x),
                                  name='writer %d' % x))
        for x in range(self.readers):
            threads.append(Thread(target=self.readerThread,
                                  args=(self.d, x),
                                  name='reader %d' % x))

        for t in threads:
            t.start()
        for t in threads:
            t.join()

    def _writerThread(self, d, howMany, writerNum):
        """Write this writer's key range, sync, then read back and delete.

        Writer *writerNum* owns the keys ``range(start, stop)`` computed
        below, so the ranges of different writers do not overlap.
        """
        name = currentThread().getName()
        start = howMany * writerNum
        stop = howMany * (writerNum + 1) - 1
        if verbose:
            print("%s: creating records %d - %d" % (name, start, stop))

        # create a bunch of records
        for x in range(start, stop):
            key = ('%04d' % x).encode("ascii")
            dbutils.DeadlockWrap(d.put, key, self.makeData(key),
                                 max_retries=20)

            if verbose and x % 100 == 0:
                print("%s: records %d - %d finished" % (name, start, x))

            # do a bit of reading too: about 5% of the time, re-read every
            # record this thread has written before x
            if random() <= 0.05:
                for y in range(start, x):
                    # The key comes from y, the record being re-read; the
                    # former code used x and fetched one record repeatedly.
                    readKey = ('%04d' % y).encode("ascii")
                    data = dbutils.DeadlockWrap(d.get, readKey,
                                                max_retries=20)
                    self.assertEqual(data, self.makeData(readKey))

        # flush them
        try:
            dbutils.DeadlockWrap(d.sync, max_retries=20)
        except db.DBIncompleteError:
            if verbose:
                print("could not complete sync()...")

        # read them back, deleting a few (about 10%)
        for x in range(start, stop):
            key = ('%04d' % x).encode("ascii")
            data = dbutils.DeadlockWrap(d.get, key, max_retries=20)
            if verbose and x % 100 == 0:
                print("%s: fetched record (%s, %s)" % (name, key, data))
            self.assertEqual(data, self.makeData(key))
            if random() <= 0.10:
                dbutils.DeadlockWrap(d.delete, key, max_retries=20)
                if verbose:
                    print("%s: deleted record %s" % (name, key))

        if verbose:
            print("%s: thread finished" % name)

    def _readerThread(self, d, readerNum):
        """Scan *d* five times, verifying every record that is found.

        The start is delayed by 0.01 s per *readerNum*, and each pass is
        followed by a 0.05 s pause.
        """
        time.sleep(0.01 * readerNum)
        name = currentThread().getName()

        for loop in range(5):
            c = d.cursor()
            try:
                count = 0
                rec = dbutils.DeadlockWrap(c.first, max_retries=20)
                while rec:
                    count += 1
                    key, data = rec
                    self.assertEqual(self.makeData(key), data)
                    rec = dbutils.DeadlockWrap(c.next, max_retries=20)
                if verbose:
                    print("%s: found %d records" % (name, count))
            finally:
                # Close the cursor even when the scan is interrupted by an
                # assertion or by a lock error that ran out of retries.
                c.close()
            time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)
| |
| |
class BTreeSimpleThreaded(SimpleThreadedBase):
    """Simple-locking scenario run against a DB_BTREE database."""

    dbtype = db.DB_BTREE
| |
| |
class HashSimpleThreaded(SimpleThreadedBase):
    """Simple-locking scenario run against a DB_HASH database."""

    dbtype = db.DB_HASH
| |
| |
| #---------------------------------------------------------------------- |
| |
| |
class ThreadedTransactionsBase(BaseThreadedTestCase):
    """Writers and readers that wrap all their DB work in transactions.

    A transaction that fails with ``DBLockDeadlockError`` or
    ``DBLockNotGrantedError`` is aborted and retried after a short pause.
    Lock detection is not configured on the environment; a helper thread
    (``deadlockThread``) calls ``env.lock_detect`` periodically while the
    workers run.
    """

    dbopenflags = db.DB_THREAD | db.DB_AUTO_COMMIT
    envflags = (db.DB_THREAD |
                db.DB_INIT_MPOOL |
                db.DB_INIT_LOCK |
                db.DB_INIT_LOG |
                db.DB_INIT_TXN
                )
    readers = 0
    writers = 0
    records = 2000
    txnFlag = 0           # flags passed to every txn_begin() call
    doLockDetect = False  # True while deadlockThread should keep polling

    def setEnvOpts(self):
        """Leave the env untouched; lock detection runs in deadlockThread."""
        #self.env.set_lk_detect(db.DB_LOCK_DEFAULT)
        pass

    def test03_ThreadedTransactions(self):
        """Run all workers alongside the lock-detector thread."""
        if verbose:
            print('\n', '-=' * 30)
            print("Running %s.test03_ThreadedTransactions..." %
                  self.__class__.__name__)

        threads = []
        for x in range(self.writers):
            threads.append(Thread(target=self.writerThread,
                                  args=(self.d, self.records, x),
                                  name='writer %d' % x))
        for x in range(self.readers):
            threads.append(Thread(target=self.readerThread,
                                  args=(self.d, x),
                                  name='reader %d' % x))

        # Raise the flag before the detector thread starts.  If the thread
        # set it itself, it could do so after this method had already
        # cleared it, and would then never terminate.
        self.doLockDetect = True
        dt = Thread(target=self.deadlockThread)
        dt.start()

        try:
            for t in threads:
                t.start()
            for t in threads:
                t.join()
        finally:
            # Always stop the detector, even if a worker failed to start.
            self.doLockDetect = False
            dt.join()

    def doWrite(self, d, name, start, stop):
        """Store keys ``range(start, stop)`` in one transaction, retrying.

        *name* is only used as a prefix for verbose output.
        """
        finished = False
        while not finished:
            # Begin outside the try so that the handler below always aborts
            # the transaction belonging to this attempt.
            txn = self.env.txn_begin(None, self.txnFlag)
            try:
                for x in range(start, stop):
                    key = ('%04d' % x).encode("ascii")
                    d.put(key, self.makeData(key), txn)
                    if verbose and x % 100 == 0:
                        print("%s: records %d - %d finished" % (name, start, x))
                txn.commit()
                finished = True
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
                if verbose:
                    print("%s: Aborting transaction (%s)" % (name, val))
                txn.abort()
                time.sleep(0.05)

    def _writerThread(self, d, howMany, writerNum):
        """Write this writer's key range in batches, then delete a few.

        The range is written through ``doWrite`` in batches of 100 keys.
        Afterwards one transaction picks ten random keys and deletes those
        that exist.
        """
        name = currentThread().getName()
        start = howMany * writerNum
        stop = howMany * (writerNum + 1) - 1
        if verbose:
            print("%s: creating records %d - %d" % (name, start, stop))

        step = 100
        for x in range(start, stop, step):
            self.doWrite(d, name, x, min(stop, x+step))

        if verbose:
            print("%s: finished creating records" % name)
        if verbose:
            print("%s: deleting a few records" % name)

        finished = False
        while not finished:
            recs = []
            txn = self.env.txn_begin(None, self.txnFlag)
            try:
                for x in range(10):
                    key = int(random() * howMany) + start
                    key = ('%04d' % key).encode("ascii")
                    data = d.get(key, None, txn, db.DB_RMW)
                    if data is not None:
                        d.delete(key, txn)
                        recs.append(key)
                txn.commit()
                finished = True
                if verbose:
                    print("%s: deleted records %s" % (name, recs))
            except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
                if verbose:
                    print("%s: Aborting transaction (%s)" % (name, val))
                txn.abort()
                time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)

    def _readerThread(self, d, readerNum):
        """Scan *d* five times inside transactions, verifying each record.

        The start is delayed by ``0.01 * readerNum + 0.05`` seconds; a pass
        that hits a lock error is aborted and repeated.
        """
        time.sleep(0.01 * readerNum + 0.05)
        name = currentThread().getName()

        for loop in range(5):
            finished = False
            while not finished:
                txn = self.env.txn_begin(None, self.txnFlag)
                try:
                    c = d.cursor(txn)
                    try:
                        count = 0
                        rec = c.first()
                        while rec:
                            count += 1
                            key, data = rec
                            self.assertEqual(self.makeData(key), data)
                            rec = c.next()
                        if verbose:
                            print("%s: found %d records" % (name, count))
                    finally:
                        # Closed exactly once, before the transaction is
                        # committed or aborted, whether or not the scan
                        # succeeded.
                        c.close()
                    txn.commit()
                    finished = True
                except (db.DBLockDeadlockError, db.DBLockNotGrantedError) as val:
                    if verbose:
                        print("%s: Aborting transaction (%s)" % (name, val))
                    txn.abort()
                    time.sleep(0.05)

            time.sleep(0.05)

        if verbose:
            print("%s: thread finished" % name)

    def deadlockThread(self):
        """Call ``env.lock_detect`` every 0.5 s while ``doLockDetect`` is set.

        The caller sets ``self.doLockDetect`` to True before starting this
        thread and to False to make it stop.
        """
        while self.doLockDetect:
            time.sleep(0.5)
            try:
                aborted = self.env.lock_detect(
                    db.DB_LOCK_RANDOM, db.DB_LOCK_CONFLICT)
                if verbose and aborted:
                    print("deadlock: Aborted %d deadlocked transaction(s)"
                          % aborted)
            except db.DBError:
                # Best effort: a failed detection pass is simply skipped.
                pass
| |
| |
class BTreeThreadedTransactions(ThreadedTransactionsBase):
    """Transactional scenario on DB_BTREE: three writers, five readers."""

    records = 2000
    readers = 5
    writers = 3
    dbtype = db.DB_BTREE
| |
class HashThreadedTransactions(ThreadedTransactionsBase):
    """Transactional scenario on DB_HASH: one writer, five readers."""

    records = 2000
    readers = 5
    writers = 1
    dbtype = db.DB_HASH
| |
class BTreeThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """DB_BTREE transactional scenario with DB_TXN_NOWAIT transactions."""

    txnFlag = db.DB_TXN_NOWAIT
    records = 2000
    readers = 5
    writers = 3
    dbtype = db.DB_BTREE
| |
class HashThreadedNoWaitTransactions(ThreadedTransactionsBase):
    """DB_HASH transactional scenario with DB_TXN_NOWAIT transactions."""

    txnFlag = db.DB_TXN_NOWAIT
    records = 2000
    readers = 5
    writers = 1
    dbtype = db.DB_HASH
| |
| |
| #---------------------------------------------------------------------- |
| |
def test_suite():
    """Build the suite containing every threaded DB test case.

    Returns:
        A ``unittest.TestSuite`` with the tests of all eight concrete
        test-case classes.  When ``have_threads`` is false a notice is
        printed and the suite is returned empty.
    """
    suite = unittest.TestSuite()

    if not have_threads:
        print("Threads not available, skipping thread tests.")
        return suite

    # loadTestsFromTestCase is the supported replacement for the deprecated
    # unittest.makeSuite and collects the same ``test*`` methods.
    loadCase = unittest.defaultTestLoader.loadTestsFromTestCase
    for case in (BTreeConcurrentDataStore,
                 HashConcurrentDataStore,
                 BTreeSimpleThreaded,
                 HashSimpleThreaded,
                 BTreeThreadedTransactions,
                 HashThreadedTransactions,
                 BTreeThreadedNoWaitTransactions,
                 HashThreadedNoWaitTransactions):
        suite.addTest(loadCase(case))

    return suite
| |
| |
# When run as a script, hand control to unittest and let it build the tests
# by calling test_suite().
if __name__ == '__main__':
    unittest.main(defaultTest='test_suite')