ceph: re-send flushing caps (which are revoked) in reconnect stage

If flushing caps were revoked, we should re-send the cap flush during
the client reconnect stage. This guarantees that the MDS processes the
cap flush message before issuing the flushing caps to another client.
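
For illustration, a hypothetical sequence (the client names are made
up): client A is flushing dirty caps when the MDS restarts, and during
recovery the MDS revokes the caps being flushed. If the MDS handled
client A's reconnect first, it could issue those caps to client B
before applying client A's cap flush. Sending the revoked cap flushes
ahead of the reconnect reply keeps them in front of the reconnect
message on the session connection, so the MDS applies them first.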

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 4202727..69a16044 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1486,6 +1486,7 @@
 
 	cf = kmalloc(sizeof(*cf), GFP_ATOMIC);
 	cf->caps = flushing;
+	cf->kick = false;
 
 	spin_lock(&mdsc->cap_dirty_lock);
 	list_del_init(&ci->i_dirty_item);
@@ -2101,7 +2102,8 @@
 
 static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
 				struct ceph_mds_session *session,
-				struct ceph_inode_info *ci)
+				struct ceph_inode_info *ci,
+				bool kick_all)
 {
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap *cap;
@@ -2127,7 +2129,9 @@
 
 		for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
 			cf = rb_entry(n, struct ceph_cap_flush, i_node);
-			if (cf->tid >= first_tid)
+			if (cf->tid < first_tid)
+				continue;
+			if (kick_all || cf->kick)
 				break;
 		}
 		if (!n) {
@@ -2136,6 +2140,8 @@
 		}
 
 		cf = rb_entry(n, struct ceph_cap_flush, i_node);
+		cf->kick = false;
+
 		first_tid = cf->tid + 1;
 
 		dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
@@ -2149,6 +2155,48 @@
 	return delayed;
 }
 
+void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
+				   struct ceph_mds_session *session)
+{
+	struct ceph_inode_info *ci;
+	struct ceph_cap *cap;
+	struct ceph_cap_flush *cf;
+	struct rb_node *n;
+
+	dout("early_kick_flushing_caps mds%d\n", session->s_mds);
+	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
+		spin_lock(&ci->i_ceph_lock);
+		cap = ci->i_auth_cap;
+		if (!(cap && cap->session == session)) {
+			pr_err("%p auth cap %p not mds%d ???\n",
+				&ci->vfs_inode, cap, session->s_mds);
+			spin_unlock(&ci->i_ceph_lock);
+			continue;
+		}
+
+		/*
+		 * If flushing caps were revoked, we re-send the cap flush
+		 * in the client reconnect stage. This guarantees the MDS
+		 * processes the cap flush message before issuing the
+		 * flushing caps to another client.
+		 */
+		if ((cap->issued & ci->i_flushing_caps) !=
+		    ci->i_flushing_caps) {
+			spin_unlock(&ci->i_ceph_lock);
+			if (!__kick_flushing_caps(mdsc, session, ci, true))
+				continue;
+			spin_lock(&ci->i_ceph_lock);
+		}
+
+		for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
+			cf = rb_entry(n, struct ceph_cap_flush, i_node);
+			cf->kick = true;
+		}
+
+		spin_unlock(&ci->i_ceph_lock);
+	}
+}
+
 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 			     struct ceph_mds_session *session)
 {
@@ -2158,7 +2207,7 @@
 
 	dout("kick_flushing_caps mds%d\n", session->s_mds);
 	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
-		int delayed = __kick_flushing_caps(mdsc, session, ci);
+		int delayed = __kick_flushing_caps(mdsc, session, ci, false);
 		if (delayed) {
 			spin_lock(&ci->i_ceph_lock);
 			__cap_delay_requeue(mdsc, ci);
@@ -2191,7 +2240,7 @@
 
 		spin_unlock(&ci->i_ceph_lock);
 
-		delayed = __kick_flushing_caps(mdsc, session, ci);
+		delayed = __kick_flushing_caps(mdsc, session, ci, true);
 		if (delayed) {
 			spin_lock(&ci->i_ceph_lock);
 			__cap_delay_requeue(mdsc, ci);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 31f6a78..89e4305 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2982,6 +2982,9 @@
 
 	reply->hdr.data_len = cpu_to_le32(pagelist->length);
 	ceph_msg_data_add_pagelist(reply, pagelist);
+
+	ceph_early_kick_flushing_caps(mdsc, session);
+
 	ceph_con_send(&session->s_con, reply);
 
 	mutex_unlock(&session->s_mutex);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 94d9147..e7f13f7 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -189,9 +189,10 @@
 struct ceph_cap_flush {
 	u64 tid;
 	int caps;
-	struct rb_node g_node;
+	bool kick;
+	struct rb_node g_node; /* global */
 	union {
-		struct rb_node i_node;
+		struct rb_node i_node; /* inode */
 		struct list_head list;
 	};
 };
@@ -868,6 +869,8 @@
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);
 extern int ceph_fsync(struct file *file, loff_t start, loff_t end,
 		      int datasync);
+extern void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
+					  struct ceph_mds_session *session);
 extern void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 				    struct ceph_mds_session *session);
 extern struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci,