ceph: fix iterate_caps removal race

We need to be able to iterate over all caps on a session with a
possibly slow callback on each cap.  To allow this, we used to
prevent cap reordering while we were iterating.  However, we were
not safe from races with removal: removing the 'next' cap would
invalidate the next pointer held by list_for_each_entry_safe,
and cause a lockup or similar badness.

Instead, we keep an iterator pointer in the session pointing to
the current cap.  As before, we avoid reordering.  For removal,
if the cap isn't the one we are currently iterating over, we are
fine.  If it is, we clear cap->ci (to mark the cap as pending
removal) but leave it in the session list.  In iterate_caps, we
can then safely finish the removal and get the next cap pointer.

While we're at it, clean up put_cap (now the non-static ceph_put_cap)
and __ceph_remove_cap to not take a cap reservation context, as it
was never used.

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index f94b56f..4958a2e 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -266,12 +266,11 @@
 	return cap;
 }
 
-static void put_cap(struct ceph_cap *cap,
-		    struct ceph_cap_reservation *ctx)
+void ceph_put_cap(struct ceph_cap *cap)
 {
 	spin_lock(&caps_list_lock);
-	dout("put_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-	     ctx, ctx ? ctx->count : 0, caps_total_count, caps_use_count,
+	dout("put_cap %p %d = %d used + %d resv + %d avail\n",
+	     cap, caps_total_count, caps_use_count,
 	     caps_reserve_count, caps_avail_count);
 	caps_use_count--;
 	/*
@@ -282,12 +281,7 @@
 		caps_total_count--;
 		kmem_cache_free(ceph_cap_cachep, cap);
 	} else {
-		if (ctx) {
-			ctx->count++;
-			caps_reserve_count++;
-		} else {
-			caps_avail_count++;
-		}
+		caps_avail_count++;
 		list_add(&cap->caps_item, &caps_list);
 	}
 
@@ -709,7 +703,7 @@
 	struct ceph_mds_session *s = cap->session;
 
 	spin_lock(&s->s_cap_lock);
-	if (!s->s_iterating_caps) {
+	if (s->s_cap_iterator == NULL) {
 		dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
 		     s->s_mds);
 		list_move_tail(&cap->session_caps, &s->s_caps);
@@ -865,8 +859,7 @@
  * caller should hold i_lock, and session s_mutex.
  * returns true if this is the last cap.  if so, caller should iput.
  */
-void __ceph_remove_cap(struct ceph_cap *cap,
-		       struct ceph_cap_reservation *ctx)
+void __ceph_remove_cap(struct ceph_cap *cap)
 {
 	struct ceph_mds_session *session = cap->session;
 	struct ceph_inode_info *ci = cap->ci;
@@ -874,19 +867,27 @@
 
 	dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
-	/* remove from session list */
-	spin_lock(&session->s_cap_lock);
-	list_del_init(&cap->session_caps);
-	session->s_nr_caps--;
-	spin_unlock(&session->s_cap_lock);
-
 	/* remove from inode list */
 	rb_erase(&cap->ci_node, &ci->i_caps);
-	cap->session = NULL;
+	cap->ci = NULL;
 	if (ci->i_auth_cap == cap)
 		ci->i_auth_cap = NULL;
 
-	put_cap(cap, ctx);
+	/* remove from session list */
+	spin_lock(&session->s_cap_lock);
+	if (session->s_cap_iterator == cap) {
+		/* not yet, we are iterating over this very cap */
+		dout("__ceph_remove_cap  delaying %p removal from session %p\n",
+		     cap, cap->session);
+	} else {
+		list_del_init(&cap->session_caps);
+		session->s_nr_caps--;
+		cap->session = NULL;
+	}
+	spin_unlock(&session->s_cap_lock);
+
+	if (cap->session == NULL)
+		ceph_put_cap(cap);
 
 	if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
 		struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -1022,7 +1023,7 @@
 		}
 		spin_unlock(&session->s_cap_lock);
 		p = rb_next(p);
-		__ceph_remove_cap(cap, NULL);
+		__ceph_remove_cap(cap);
 
 	}
 	spin_unlock(&inode->i_lock);
@@ -2521,7 +2522,7 @@
 			ci->i_cap_exporting_mseq = mseq;
 			ci->i_cap_exporting_issued = cap->issued;
 		}
-		__ceph_remove_cap(cap, NULL);
+		__ceph_remove_cap(cap);
 	} else {
 		WARN_ON(!cap);
 	}