ocfs2: take data locks around extend

We need to take a data lock around extends to protect the pages that
ocfs2_zero_extend is going to be pulling into the page cache. Otherwise an
extend on one node might populate the page cache with data pages that have
no lock coverage.

Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
diff --git a/fs/ocfs2/aops.c b/fs/ocfs2/aops.c
index 0d858d0..47152bf 100644
--- a/fs/ocfs2/aops.c
+++ b/fs/ocfs2/aops.c
@@ -276,13 +276,29 @@
 	return ret;
 }
 
+/* This can also be called from ocfs2_write_zero_page() which has done
+ * it's own cluster locking. */
+int ocfs2_prepare_write_nolock(struct inode *inode, struct page *page,
+			       unsigned from, unsigned to)
+{
+	int ret;
+
+	down_read(&OCFS2_I(inode)->ip_alloc_sem);
+
+	ret = block_prepare_write(page, from, to, ocfs2_get_block);
+
+	up_read(&OCFS2_I(inode)->ip_alloc_sem);
+
+	return ret;
+}
+
 /*
  * ocfs2_prepare_write() can be an outer-most ocfs2 call when it is called
  * from loopback.  It must be able to perform its own locking around
  * ocfs2_get_block().
  */
-int ocfs2_prepare_write(struct file *file, struct page *page,
-			unsigned from, unsigned to)
+static int ocfs2_prepare_write(struct file *file, struct page *page,
+			       unsigned from, unsigned to)
 {
 	struct inode *inode = page->mapping->host;
 	int ret;
@@ -295,11 +311,7 @@
 		goto out;
 	}
 
-	down_read(&OCFS2_I(inode)->ip_alloc_sem);
-
-	ret = block_prepare_write(page, from, to, ocfs2_get_block);
-
-	up_read(&OCFS2_I(inode)->ip_alloc_sem);
+	ret = ocfs2_prepare_write_nolock(inode, page, from, to);
 
 	ocfs2_meta_unlock(inode, 0);
 out:
@@ -625,11 +637,31 @@
 	int ret;
 
 	mlog_entry_void();
+
+	/*
+	 * We get PR data locks even for O_DIRECT.  This allows
+	 * concurrent O_DIRECT I/O but doesn't let O_DIRECT with
+	 * extending and buffered zeroing writes race.  If they did
+	 * race then the buffered zeroing could be written back after
+	 * the O_DIRECT I/O.  It's one thing to tell people not to mix
+	 * buffered and O_DIRECT writes, but expecting them to
+	 * understand that file extension is also an implicit buffered
+	 * write is too much.  By getting the PR we force writeback of
+	 * the buffered zeroing before proceeding.
+	 */
+	ret = ocfs2_data_lock(inode, 0);
+	if (ret < 0) {
+		mlog_errno(ret);
+		goto out;
+	}
+	ocfs2_data_unlock(inode, 0);
+
 	ret = blockdev_direct_IO_no_locking(rw, iocb, inode,
 					    inode->i_sb->s_bdev, iov, offset,
 					    nr_segs, 
 					    ocfs2_direct_IO_get_blocks,
 					    ocfs2_dio_end_io);
+out:
 	mlog_exit(ret);
 	return ret;
 }
diff --git a/fs/ocfs2/aops.h b/fs/ocfs2/aops.h
index d40456d..e88c3f0 100644
--- a/fs/ocfs2/aops.h
+++ b/fs/ocfs2/aops.h
@@ -22,8 +22,8 @@
 #ifndef OCFS2_AOPS_H
 #define OCFS2_AOPS_H
 
-int ocfs2_prepare_write(struct file *file, struct page *page,
-			unsigned from, unsigned to);
+int ocfs2_prepare_write_nolock(struct inode *inode, struct page *page,
+			       unsigned from, unsigned to);
 
 struct ocfs2_journal_handle *ocfs2_start_walk_page_trans(struct inode *inode,
 							 struct page *page,
diff --git a/fs/ocfs2/file.c b/fs/ocfs2/file.c
index 581eb45..20fffeed 100644
--- a/fs/ocfs2/file.c
+++ b/fs/ocfs2/file.c
@@ -613,7 +613,8 @@
 
 /* Some parts of this taken from generic_cont_expand, which turned out
  * to be too fragile to do exactly what we need without us having to
- * worry about recursive locking in ->commit_write(). */
+ * worry about recursive locking in ->prepare_write() and
+ * ->commit_write(). */
 static int ocfs2_write_zero_page(struct inode *inode,
 				 u64 size)
 {
@@ -641,7 +642,7 @@
 		goto out;
 	}
 
-	ret = ocfs2_prepare_write(NULL, page, offset, offset);
+	ret = ocfs2_prepare_write_nolock(inode, page, offset, offset);
 	if (ret < 0) {
 		mlog_errno(ret);
 		goto out_unlock;
@@ -695,13 +696,26 @@
 	return ret;
 }
 
+/* 
+ * A tail_to_skip value > 0 indicates that we're being called from
+ * ocfs2_file_aio_write(). This has the following implications:
+ *
+ * - we don't want to update i_size
+ * - di_bh will be NULL, which is fine because it's only used in the
+ *   case where we want to update i_size.
+ * - ocfs2_zero_extend() will then only be filling the hole created
+ *   between i_size and the start of the write.
+ */
 static int ocfs2_extend_file(struct inode *inode,
 			     struct buffer_head *di_bh,
-			     u64 new_i_size)
+			     u64 new_i_size,
+			     size_t tail_to_skip)
 {
 	int ret = 0;
 	u32 clusters_to_add;
 
+	BUG_ON(!tail_to_skip && !di_bh);
+
 	/* setattr sometimes calls us like this. */
 	if (new_i_size == 0)
 		goto out;
@@ -714,27 +728,44 @@
 		OCFS2_I(inode)->ip_clusters;
 
 	if (clusters_to_add) {
+		/* 
+		 * protect the pages that ocfs2_zero_extend is going to
+		 * be pulling into the page cache.. we do this before the
+		 * metadata extend so that we don't get into the situation
+		 * where we've extended the metadata but can't get the data
+		 * lock to zero.
+		 */
+		ret = ocfs2_data_lock(inode, 1);
+		if (ret < 0) {
+			mlog_errno(ret);
+			goto out;
+		}
+
 		ret = ocfs2_extend_allocation(inode, clusters_to_add);
 		if (ret < 0) {
 			mlog_errno(ret);
-			goto out;
+			goto out_unlock;
 		}
 
-		ret = ocfs2_zero_extend(inode, new_i_size);
+		ret = ocfs2_zero_extend(inode, (u64)new_i_size - tail_to_skip);
 		if (ret < 0) {
 			mlog_errno(ret);
-			goto out;
+			goto out_unlock;
 		}
-	} 
-
-	/* No allocation required, we just use this helper to
-	 * do a trivial update of i_size. */
-	ret = ocfs2_simple_size_update(inode, di_bh, new_i_size);
-	if (ret < 0) {
-		mlog_errno(ret);
-		goto out;
 	}
 
+	if (!tail_to_skip) {
+		/* We're being called from ocfs2_setattr() which wants
+		 * us to update i_size */
+		ret = ocfs2_simple_size_update(inode, di_bh, new_i_size);
+		if (ret < 0)
+			mlog_errno(ret);
+	}
+
+out_unlock:
+	if (clusters_to_add) /* this is the only case in which we lock */
+		ocfs2_data_unlock(inode, 1);
+
 out:
 	return ret;
 }
@@ -793,7 +824,7 @@
 		if (i_size_read(inode) > attr->ia_size)
 			status = ocfs2_truncate_file(inode, bh, attr->ia_size);
 		else
-			status = ocfs2_extend_file(inode, bh, attr->ia_size);
+			status = ocfs2_extend_file(inode, bh, attr->ia_size, 0);
 		if (status < 0) {
 			if (status != -ENOSPC)
 				mlog_errno(status);
@@ -1049,21 +1080,12 @@
 		if (!clusters)
 			break;
 
-		ret = ocfs2_extend_allocation(inode, clusters);
+		ret = ocfs2_extend_file(inode, NULL, newsize, count);
 		if (ret < 0) {
 			if (ret != -ENOSPC)
 				mlog_errno(ret);
 			goto out;
 		}
-
-		/* Fill any holes which would've been created by this
-		 * write. If we're O_APPEND, this will wind up
-		 * (correctly) being a noop. */
-		ret = ocfs2_zero_extend(inode, (u64) newsize - count);
-		if (ret < 0) {
-			mlog_errno(ret);
-			goto out;
-		}
 		break;
 	}