ceph: introduce a new inode flag indicating if cached dentries are ordered

After creating/deleting/renaming a file, the offsets of sibling dentries
may change, so we cannot use cached dentries to satisfy readdir. We can
still use the cached dentries to conclude -ENOENT for lookup, though.

This patch introduces a new inode flag indicating whether child dentries
are ordered. The flag is set at the same time a directory is marked
complete. After creating/deleting/renaming a file, we clear the flag on
the directory inode. This prevents ceph_readdir() from using cached
dentries to satisfy the readdir syscall.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index e6d63f8..6526199 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -183,7 +183,7 @@
 	spin_unlock(&parent->d_lock);
 
 	/* make sure a dentry wasn't dropped while we didn't have parent lock */
-	if (!ceph_dir_is_complete(dir)) {
+	if (!ceph_dir_is_complete_ordered(dir)) {
 		dout(" lost dir complete on %p; falling back to mds\n", dir);
 		dput(dentry);
 		err = -EAGAIN;
@@ -261,10 +261,6 @@
 
 	/* always start with . and .. */
 	if (ctx->pos == 0) {
-		/* note dir version at start of readdir so we can tell
-		 * if any dentries get dropped */
-		fi->dir_release_count = atomic_read(&ci->i_release_count);
-
 		dout("readdir off 0 -> '.'\n");
 		if (!dir_emit(ctx, ".", 1, 
 			    ceph_translate_ino(inode->i_sb, inode->i_ino),
@@ -289,7 +285,7 @@
 	if ((ctx->pos == 2 || fi->dentry) &&
 	    !ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
 	    ceph_snap(inode) != CEPH_SNAPDIR &&
-	    __ceph_dir_is_complete(ci) &&
+	    __ceph_dir_is_complete_ordered(ci) &&
 	    __ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
 		u32 shared_gen = ci->i_shared_gen;
 		spin_unlock(&ci->i_ceph_lock);
@@ -312,6 +308,13 @@
 
 	/* proceed with a normal readdir */
 
+	if (ctx->pos == 2) {
+		/* note dir version at start of readdir so we can tell
+		 * if any dentries get dropped */
+		fi->dir_release_count = atomic_read(&ci->i_release_count);
+		fi->dir_ordered_count = ci->i_ordered_count;
+	}
+
 more:
 	/* do we have the correct frag content buffered? */
 	if (fi->frag != frag || fi->last_readdir == NULL) {
@@ -446,8 +449,12 @@
 	 */
 	spin_lock(&ci->i_ceph_lock);
 	if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
-		dout(" marking %p complete\n", inode);
-		__ceph_dir_set_complete(ci, fi->dir_release_count);
+		if (ci->i_ordered_count == fi->dir_ordered_count)
+			dout(" marking %p complete and ordered\n", inode);
+		else
+			dout(" marking %p complete\n", inode);
+		__ceph_dir_set_complete(ci, fi->dir_release_count,
+					fi->dir_ordered_count);
 	}
 	spin_unlock(&ci->i_ceph_lock);
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 7b61390..72607c1 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -389,6 +389,7 @@
 	ci->i_version = 0;
 	ci->i_time_warp_seq = 0;
 	ci->i_ceph_flags = 0;
+	ci->i_ordered_count = 0;
 	atomic_set(&ci->i_release_count, 1);
 	atomic_set(&ci->i_complete_count, 0);
 	ci->i_symlink = NULL;
@@ -845,7 +846,8 @@
 	    (issued & CEPH_CAP_FILE_EXCL) == 0 &&
 	    !__ceph_dir_is_complete(ci)) {
 		dout(" marking %p complete (empty)\n", inode);
-		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
+		__ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
+					ci->i_ordered_count);
 	}
 
 	/* were we issued a capability? */
@@ -1206,8 +1208,8 @@
 			ceph_invalidate_dentry_lease(dn);
 
 			/* d_move screws up sibling dentries' offsets */
-			ceph_dir_clear_complete(dir);
-			ceph_dir_clear_complete(olddir);
+			ceph_dir_clear_ordered(dir);
+			ceph_dir_clear_ordered(olddir);
 
 			dout("dn %p gets new offset %lld\n", req->r_old_dentry,
 			     ceph_dentry(req->r_old_dentry)->offset);
@@ -1219,6 +1221,7 @@
 		if (!rinfo->head->is_target) {
 			dout("fill_trace null dentry\n");
 			if (dn->d_inode) {
+				ceph_dir_clear_ordered(dir);
 				dout("d_delete %p\n", dn);
 				d_delete(dn);
 			} else {
@@ -1235,7 +1238,7 @@
 
 		/* attach proper inode */
 		if (!dn->d_inode) {
-			ceph_dir_clear_complete(dir);
+			ceph_dir_clear_ordered(dir);
 			ihold(in);
 			dn = splice_dentry(dn, in, &have_lease);
 			if (IS_ERR(dn)) {
@@ -1265,7 +1268,7 @@
 		BUG_ON(!dir);
 		BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
 		dout(" linking snapped dir %p to dn %p\n", in, dn);
-		ceph_dir_clear_complete(dir);
+		ceph_dir_clear_ordered(dir);
 		ihold(in);
 		dn = splice_dentry(dn, in, NULL);
 		if (IS_ERR(dn)) {
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index b82f507..aca2287 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -256,6 +256,7 @@
 	u32 i_time_warp_seq;
 
 	unsigned i_ceph_flags;
+	int i_ordered_count;
 	atomic_t i_release_count;
 	atomic_t i_complete_count;
 
@@ -434,14 +435,19 @@
 /*
  * Ceph inode.
  */
-#define CEPH_I_NODELAY   4  /* do not delay cap release */
-#define CEPH_I_FLUSH     8  /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH  16  /* do not flush dirty caps */
+#define CEPH_I_DIR_ORDERED	1  /* dentries in dir are ordered */
+#define CEPH_I_NODELAY		4  /* do not delay cap release */
+#define CEPH_I_FLUSH		8  /* do not delay flush of dirty metadata */
+#define CEPH_I_NOFLUSH		16 /* do not flush dirty caps */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
-					   int release_count)
+					   int release_count, int ordered_count)
 {
 	atomic_set(&ci->i_complete_count, release_count);
+	if (ci->i_ordered_count == ordered_count)
+		ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
+	else
+		ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
 }
 
 static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
@@ -455,16 +461,35 @@
 		atomic_read(&ci->i_release_count);
 }
 
+static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
+{
+	return __ceph_dir_is_complete(ci) &&
+		(ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+}
+
 static inline void ceph_dir_clear_complete(struct inode *inode)
 {
 	__ceph_dir_clear_complete(ceph_inode(inode));
 }
 
-static inline bool ceph_dir_is_complete(struct inode *inode)
+static inline void ceph_dir_clear_ordered(struct inode *inode)
 {
-	return __ceph_dir_is_complete(ceph_inode(inode));
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	spin_lock(&ci->i_ceph_lock);
+	ci->i_ordered_count++;
+	ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+	spin_unlock(&ci->i_ceph_lock);
 }
 
+static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	bool ret;
+	spin_lock(&ci->i_ceph_lock);
+	ret = __ceph_dir_is_complete_ordered(ci);
+	spin_unlock(&ci->i_ceph_lock);
+	return ret;
+}
 
 /* find a specific frag @f */
 extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
@@ -580,6 +605,7 @@
 	char *last_name;       /* last entry in previous chunk */
 	struct dentry *dentry; /* next dentry (for dcache readdir) */
 	int dir_release_count;
+	int dir_ordered_count;
 
 	/* used for -o dirstat read() on directory thing */
 	char *dir_info;