ceph: fix directory fsync

fsync() on directory should flush dirty caps and wait for any
uncommitted directory opertions to commit. But ceph_dir_fsync()
only waits for uncommitted directory opertions.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index e9b03b5..dc98833 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1838,13 +1838,16 @@
 	struct ceph_osd_request *req;
 	u64 last_tid;
 
+	if (!S_ISREG(inode->i_mode))
+		return;
+
 	spin_lock(&ci->i_unsafe_lock);
 	if (list_empty(head))
 		goto out;
 
 	/* set upper bound as _last_ entry in chain */
-	req = list_entry(head->prev, struct ceph_osd_request,
-			 r_unsafe_item);
+	req = list_last_entry(head, struct ceph_osd_request,
+			      r_unsafe_item);
 	last_tid = req->r_tid;
 
 	do {
@@ -1862,13 +1865,59 @@
 		 */
 		if (list_empty(head))
 			break;
-		req = list_entry(head->next, struct ceph_osd_request,
-				 r_unsafe_item);
+		req = list_first_entry(head, struct ceph_osd_request,
+				       r_unsafe_item);
 	} while (req->r_tid < last_tid);
 out:
 	spin_unlock(&ci->i_unsafe_lock);
 }
 
+/*
+ * wait for any uncommitted directory operations to commit.
+ */
+static int unsafe_dirop_wait(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct list_head *head = &ci->i_unsafe_dirops;
+	struct ceph_mds_request *req;
+	u64 last_tid;
+	int ret = 0;
+
+	if (!S_ISDIR(inode->i_mode))
+		return 0;
+
+	spin_lock(&ci->i_unsafe_lock);
+	if (list_empty(head))
+		goto out;
+
+	req = list_last_entry(head, struct ceph_mds_request,
+			      r_unsafe_dir_item);
+	last_tid = req->r_tid;
+
+	do {
+		ceph_mdsc_get_request(req);
+		spin_unlock(&ci->i_unsafe_lock);
+
+		dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n",
+		     inode, req->r_tid, last_tid);
+		ret = !wait_for_completion_timeout(&req->r_safe_completion,
+					ceph_timeout_jiffies(req->r_timeout));
+		if (ret)
+			ret = -EIO;  /* timed out */
+
+		ceph_mdsc_put_request(req);
+
+		spin_lock(&ci->i_unsafe_lock);
+		if (ret || list_empty(head))
+			break;
+		req = list_first_entry(head, struct ceph_mds_request,
+				       r_unsafe_dir_item);
+	} while (req->r_tid < last_tid);
+out:
+	spin_unlock(&ci->i_unsafe_lock);
+	return ret;
+}
+
 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
 	struct inode *inode = file->f_mapping->host;
@@ -1882,24 +1931,30 @@
 
 	ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
 	if (ret < 0)
-		return ret;
+		goto out;
+
+	if (datasync)
+		goto out;
+
 	mutex_lock(&inode->i_mutex);
 
 	dirty = try_flush_caps(inode, flush_tid);
 	dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
+	ret = unsafe_dirop_wait(inode);
+
 	/*
 	 * only wait on non-file metadata writeback (the mds
 	 * can recover size and mtime, so we don't need to
 	 * wait for that)
 	 */
-	if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
+	if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
 		ret = wait_event_interruptible(ci->i_cap_wq,
-				       caps_are_flushed(inode, flush_tid));
+					caps_are_flushed(inode, flush_tid));
 	}
-
-	dout("fsync %p%s done\n", inode, datasync ? " datasync" : "");
 	mutex_unlock(&inode->i_mutex);
+out:
+	dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
 	return ret;
 }
 
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 3dec27e..424e231 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -1224,60 +1224,6 @@
 }
 
 /*
- * an fsync() on a dir will wait for any uncommitted directory
- * operations to commit.
- */
-static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end,
-			  int datasync)
-{
-	struct inode *inode = file_inode(file);
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct list_head *head = &ci->i_unsafe_dirops;
-	struct ceph_mds_request *req;
-	u64 last_tid;
-	int ret = 0;
-
-	dout("dir_fsync %p\n", inode);
-	ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
-	if (ret)
-		return ret;
-	mutex_lock(&inode->i_mutex);
-
-	spin_lock(&ci->i_unsafe_lock);
-	if (list_empty(head))
-		goto out;
-
-	req = list_entry(head->prev,
-			 struct ceph_mds_request, r_unsafe_dir_item);
-	last_tid = req->r_tid;
-
-	do {
-		ceph_mdsc_get_request(req);
-		spin_unlock(&ci->i_unsafe_lock);
-
-		dout("dir_fsync %p wait on tid %llu (until %llu)\n",
-		     inode, req->r_tid, last_tid);
-		ret = !wait_for_completion_timeout(&req->r_safe_completion,
-					ceph_timeout_jiffies(req->r_timeout));
-		if (ret)
-			ret = -EIO;  /* timed out */
-
-		ceph_mdsc_put_request(req);
-
-		spin_lock(&ci->i_unsafe_lock);
-		if (ret || list_empty(head))
-			break;
-		req = list_entry(head->next,
-				 struct ceph_mds_request, r_unsafe_dir_item);
-	} while (req->r_tid < last_tid);
-out:
-	spin_unlock(&ci->i_unsafe_lock);
-	mutex_unlock(&inode->i_mutex);
-
-	return ret;
-}
-
-/*
  * We maintain a private dentry LRU.
  *
  * FIXME: this needs to be changed to a per-mds lru to be useful.
@@ -1347,7 +1293,7 @@
 	.open = ceph_open,
 	.release = ceph_release,
 	.unlocked_ioctl = ceph_ioctl,
-	.fsync = ceph_dir_fsync,
+	.fsync = ceph_fsync,
 };
 
 const struct file_operations ceph_snapdir_fops = {