ocfs2: Add a mount option "coherency=*" to handle cluster coherency for O_DIRECT writes.

Currently, the default behavior of O_DIRECT writes was allowing
concurrent writing among nodes to the same file, with no cluster
coherency guaranteed (no EX lock held).  This can leave stale data in
the cache for buffered reads on other nodes.

The new mount option introduce a chance to choose two different
behaviors for O_DIRECT writes:

    * coherency=full, as the default value, will disallow
                      concurrent O_DIRECT writes by taking
                      EX locks.

    * coherency=buffered, allow concurrent O_DIRECT writes
                          without EX lock among nodes, which
                          gains high performance at risk of
                          getting stale data on other nodes.

Signed-off-by: Tristan Ye <tristan.ye@oracle.com>
Signed-off-by: Joel Becker <joel.becker@oracle.com>
diff --git a/fs/ocfs2/file.c b/fs/ocfs2/file.c
index 13af993..9e8cc43 100644
--- a/fs/ocfs2/file.c
+++ b/fs/ocfs2/file.c
@@ -2225,6 +2225,8 @@
 	struct file *file = iocb->ki_filp;
 	struct inode *inode = file->f_path.dentry->d_inode;
 	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+	int full_coherency = !(osb->s_mount_opt &
+			       OCFS2_MOUNT_COHERENCY_BUFFERED);
 
 	mlog_entry("(0x%p, %u, '%.*s')\n", file,
 		   (unsigned int)nr_segs,
@@ -2248,14 +2250,37 @@
 		have_alloc_sem = 1;
 	}
 
-	/* concurrent O_DIRECT writes are allowed */
-	rw_level = !direct_io;
+	/*
+	 * Concurrent O_DIRECT writes are allowed with
+	 * mount_option "coherency=buffered".
+	 */
+	rw_level = (!direct_io || full_coherency);
+
 	ret = ocfs2_rw_lock(inode, rw_level);
 	if (ret < 0) {
 		mlog_errno(ret);
 		goto out_sems;
 	}
 
+	/*
+	 * O_DIRECT writes with "coherency=full" need to take EX cluster
+	 * inode_lock to guarantee coherency.
+	 */
+	if (direct_io && full_coherency) {
+		/*
+		 * We need to take and drop the inode lock to force
+		 * other nodes to drop their caches.  Buffered I/O
+		 * already does this in write_begin().
+		 */
+		ret = ocfs2_inode_lock(inode, NULL, 1);
+		if (ret < 0) {
+			mlog_errno(ret);
+			goto out_sems;
+		}
+
+		ocfs2_inode_unlock(inode, 1);
+	}
+
 	can_do_direct = direct_io;
 	ret = ocfs2_prepare_inode_for_write(file, ppos,
 					    iocb->ki_left, appending,