ceph: only send one flushsnap per cap_snap per mds session

Sending multiple flushsnap messages is problematic because we ignore
the response if the tid doesn't match, and the server may only respond to
each one once.  It's also a waste.

So, skip cap_snaps that are already on the flushing list, unless the caller
tells us to resend (because we are reconnecting).

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 9fbe901..b01c316 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1195,10 +1195,14 @@
  * asynchronously back to the MDS once sync writes complete and dirty
  * data is written out.
  *
+ * Unless @again is true, skip cap_snaps that were already sent to
+ * the MDS (i.e., during this session).
+ *
  * Called under i_lock.  Takes s_mutex as needed.
  */
 void __ceph_flush_snaps(struct ceph_inode_info *ci,
-			struct ceph_mds_session **psession)
+			struct ceph_mds_session **psession,
+			int again)
 		__releases(ci->vfs_inode->i_lock)
 		__acquires(ci->vfs_inode->i_lock)
 {
@@ -1240,6 +1244,13 @@
 			dout("no auth cap (migrating?), doing nothing\n");
 			goto out;
 		}
+
+		/* only flush each capsnap once */
+		if (!again && !list_empty(&capsnap->flushing_item)) {
+			dout("already flushed %p, skipping\n", capsnap);
+			continue;
+		}
+
 		mds = ci->i_auth_cap->session->s_mds;
 		mseq = ci->i_auth_cap->mseq;
 
@@ -1314,7 +1325,7 @@
 	struct inode *inode = &ci->vfs_inode;
 
 	spin_lock(&inode->i_lock);
-	__ceph_flush_snaps(ci, NULL);
+	__ceph_flush_snaps(ci, NULL, 0);
 	spin_unlock(&inode->i_lock);
 }
 
@@ -1477,7 +1488,7 @@
 
 	/* flush snaps first time around only */
 	if (!list_empty(&ci->i_cap_snaps))
-		__ceph_flush_snaps(ci, &session);
+		__ceph_flush_snaps(ci, &session, 0);
 	goto retry_locked;
 retry:
 	spin_lock(&inode->i_lock);
@@ -1894,7 +1905,7 @@
 		if (cap && cap->session == session) {
 			dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
 			     cap, capsnap);
-			__ceph_flush_snaps(ci, &session);
+			__ceph_flush_snaps(ci, &session, 1);
 		} else {
 			pr_err("%p auth cap %p not mds%d ???\n", inode,
 			       cap, session->s_mds);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 9e6eef1..190b6c4 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -717,7 +717,7 @@
 		igrab(inode);
 		spin_unlock(&mdsc->snap_flush_lock);
 		spin_lock(&inode->i_lock);
-		__ceph_flush_snaps(ci, &session);
+		__ceph_flush_snaps(ci, &session, 0);
 		spin_unlock(&inode->i_lock);
 		iput(inode);
 		spin_lock(&mdsc->snap_flush_lock);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index c80bfbe..b87638e 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -828,7 +828,8 @@
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 				       struct ceph_snap_context *snapc);
 extern void __ceph_flush_snaps(struct ceph_inode_info *ci,
-			       struct ceph_mds_session **psession);
+			       struct ceph_mds_session **psession,
+			       int again);
 extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 			    struct ceph_mds_session *session);
 extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);