Improved reliability when saving "mirror" dict_database contents by
employing an fcntl.flock()-based process synchronization to protect
against two or more processes updating the db at the same time. Also
switched to writing through a temporary file so that write failures
(such as disk-full situations) cannot corrupt the existing data.
Updated the unittest.
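
Roughly, the update now follows the usual flock + tempfile + atomic-rename
pattern; the sketch below is for illustration only (db_path and new_values
are placeholder names, not identifiers from this patch):

    import cPickle, fcntl, os, tempfile

    lock_fd = os.open(db_path + '.lock', os.O_RDONLY | os.O_CREAT)
    fcntl.flock(lock_fd, fcntl.LOCK_EX)  # serialize concurrent writers
    try:
        # read-modify-write while holding the lock
        try:
            db_file = open(db_path, 'rb')
        except IOError:
            contents = {}  # no db file yet, start from an empty dict
        else:
            try:
                contents = cPickle.load(db_file)
            finally:
                db_file.close()
        contents.update(new_values)

        # dump into a temp file in the same directory, then rename it over
        # the db; a failed or partial write never touches the old data
        fd, tmp_name = tempfile.mkstemp(dir=os.path.dirname(db_path) or '.')
        try:
            tmp_file = os.fdopen(fd, 'wb')
            try:
                cPickle.dump(contents, tmp_file, cPickle.HIGHEST_PROTOCOL)
            finally:
                tmp_file.close()
            os.rename(tmp_name, db_path)  # atomic on POSIX filesystems
        except Exception:
            os.unlink(tmp_name)
            raise
    finally:
        os.close(lock_fd)  # closing the fd also drops the flock() lock

Readers (get_dictionary()) never need to take the lock: thanks to the
atomic rename they only ever see either the old or the new file.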

Signed-off-by: Mihai Rusu <dizzy@google.com>


git-svn-id: http://test.kernel.org/svn/autotest/trunk@3596 592f7852-d20e-0410-864c-8624ca9c26a4
diff --git a/mirror/database.py b/mirror/database.py
index 6d8afff..3cbcf6d 100644
--- a/mirror/database.py
+++ b/mirror/database.py
@@ -3,7 +3,8 @@
 # This file contains the classes used for the known kernel versions persistent
 # storage
 
-import cPickle
+import cPickle, fcntl, os, tempfile
+
 
 class item(object):
     """Wrap a file item stored in a database."""
@@ -76,12 +77,27 @@
             # no db file, considering as if empty dictionary
             res = {}
         else:
-            res = cPickle.load(fd)
+            try:
+                res = cPickle.load(fd)
+            finally:
+                fd.close()
 
         return res
 
 
-    def merge_dictionary(self, values, _open_func=open):
+    def _acquire_lock(self):
+        fd = os.open(self.path + '.lock', os.O_RDONLY | os.O_CREAT)
+        try:
+            # this may block until another process releases the lock
+            fcntl.flock(fd, fcntl.LOCK_EX)
+        except Exception:
+            os.close(fd)
+            raise
+
+        return fd
+
+
+    def merge_dictionary(self, values):
         """
         Merge the contents of "values" with the current contents of the
         database.
@@ -89,8 +105,33 @@
         if not values:
             return
 
-        contents = self.get_dictionary()
-        contents.update(values)
-        # FIXME implement some kind of protection against full disk problem
-        cPickle.dump(contents, _open_func(self.path, 'wb'),
-                     protocol=cPickle.HIGHEST_PROTOCOL)
+        # use file locking to make the read-modify-write cycle atomic
+        lock_fd = self._acquire_lock()
+
+        # make sure we release the lock if an exception occurs (if the
+        # process dies, the OS releases it for us)
+        try:
+            contents = self.get_dictionary()
+            contents.update(values)
+
+            # write to a temporary file and atomically rename it so that
+            # get_dictionary() callers never see a partial write and a
+            # full disk cannot corrupt the existing data
+            fd, fname = tempfile.mkstemp(prefix=os.path.basename(self.path),
+                                         dir=os.path.dirname(self.path))
+            write_file = os.fdopen(fd, 'wb')
+            try:
+                try:
+                    cPickle.dump(contents, write_file,
+                                 protocol=cPickle.HIGHEST_PROTOCOL)
+                finally:
+                    write_file.close()
+
+                # this is supposed to be atomic on POSIX systems
+                os.rename(fname, self.path)
+            except Exception:
+                os.unlink(fname)
+                raise
+        finally:
+            # close() releases any locks on that fd
+            os.close(lock_fd)
diff --git a/mirror/database_unittest.py b/mirror/database_unittest.py
index 45a9285..a3b6815 100644
--- a/mirror/database_unittest.py
+++ b/mirror/database_unittest.py
@@ -15,15 +15,18 @@
         'file2': database.item('file2', 20, 20000),
         }
 
-    class _file_instance:
-        pass
-
     def setUp(self):
         self.god = mock.mock_god()
 
         self.god.stub_function(database.cPickle, 'load')
         self.god.stub_function(database.cPickle, 'dump')
-        self.open_mock = self.god.create_mock_function('open')
+        self.god.stub_function(database.tempfile, 'mkstemp')
+        self.god.stub_function(database.os, 'fdopen')
+        self.god.stub_function(database.os, 'close')
+        self.god.stub_function(database.os, 'rename')
+        self.god.stub_function(database.os, 'unlink')
+        self._open_mock = self.god.create_mock_function('open')
+        self._file_instance = self.god.create_mock_class(file, 'file')
 
 
     def tearDown(self):
@@ -32,35 +35,37 @@
 
     def test_get_dictionary_no_file(self):
         # record
-        (self.open_mock.expect_call(self._path, 'rb')
+        (self._open_mock.expect_call(self._path, 'rb')
             .and_raises(IOError('blah')))
 
         # playback
         db = database.dict_database(self._path)
-        self.assertEqual(db.get_dictionary(_open_func=self.open_mock), {})
+        self.assertEqual(db.get_dictionary(_open_func=self._open_mock), {})
 
         self.god.check_playback()
 
 
     def test_get_dictionary(self):
         # record
-        (self.open_mock.expect_call(self._path, 'rb')
+        (self._open_mock.expect_call(self._path, 'rb')
             .and_return(self._file_instance))
         (database.cPickle.load.expect_call(self._file_instance)
             .and_return(self._db_contents))
+        self._file_instance.close.expect_call()
 
         # playback
         db = database.dict_database(self._path)
-        self.assertEqual(db.get_dictionary(_open_func=self.open_mock),
+        self.assertEqual(db.get_dictionary(_open_func=self._open_mock),
                          self._db_contents)
 
         self.god.check_playback()
 
 
-    def test_merge_dictionary(self):
+    def _setup_merge_dictionary(self):
         # setup
         db = database.dict_database(self._path)
         self.god.stub_function(db, 'get_dictionary')
+        self.god.stub_function(db, '_acquire_lock')
 
         new_files = {
             'file3': database.item('file3', 30, 30000),
@@ -70,15 +75,41 @@
         all_files.update(new_files)
 
         # record
+        db._acquire_lock.expect_call().and_return(3)
         db.get_dictionary.expect_call().and_return(self._db_contents)
+        (database.tempfile.mkstemp.expect_call(prefix=self._path, dir='')
+                .and_return((4, 'tmpfile')))
+        database.os.fdopen.expect_call(4, 'wb').and_return(self._file_instance)
 
-        (self.open_mock.expect_call(self._path, 'wb')
-            .and_return(self._file_instance))
+        return db, new_files, all_files
+
+
+    def test_merge_dictionary(self):
+        db, new_files, all_files = self._setup_merge_dictionary()
+
         database.cPickle.dump.expect_call(all_files, self._file_instance,
-                                               protocol=2)
+                protocol=database.cPickle.HIGHEST_PROTOCOL)
+        self._file_instance.close.expect_call()
+        database.os.rename.expect_call('tmpfile', self._path)
+        database.os.close.expect_call(3)
 
         # playback
-        db.merge_dictionary(new_files, _open_func=self.open_mock)
+        db.merge_dictionary(new_files)
+        self.god.check_playback()
+
+
+    def test_merge_dictionary_disk_full(self):
+        err = Exception('fail')
+        db, new_files, all_files = self._setup_merge_dictionary()
+
+        database.cPickle.dump.expect_call(all_files, self._file_instance,
+                protocol=database.cPickle.HIGHEST_PROTOCOL).and_raises(err)
+        self._file_instance.close.expect_call().and_raises(err)
+        database.os.unlink.expect_call('tmpfile')
+        database.os.close.expect_call(3)
+
+        # playback
+        self.assertRaises(Exception, db.merge_dictionary, new_files)
         self.god.check_playback()