Merge branch 'for-linus-bugs' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull ceph bug-fixes from Sage Weil:
 "These include a couple fixes to the new fscache code that went in
  during the last cycle (which will need to go stable@ shortly as well),
  a couple client-side directory fragmentation fixes, a fix for a race
  in the cap release queuing path, and a couple race fixes in the
  request abort and resend code.

  Obviously some of this could have gone into 3.12 final, but I
  preferred to overtest rather than send things in for a late -rc, and
  then my travel schedule intervened"

* 'for-linus-bugs' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: allocate non-zero page to fscache in readpage()
  ceph: wake up 'safe' waiters when unregistering request
  ceph: cleanup aborted requests when re-sending requests.
  ceph: handle race between cap reconnect and cap release
  ceph: set caps count after composing cap reconnect message
  ceph: queue cap release in __ceph_remove_cap()
  ceph: handle frag mismatch between readdir request and reply
  ceph: remove outdated frag information
  ceph: hung on ceph fscache invalidate in some cases
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6df8bd4..1e561c0 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -216,7 +216,7 @@
 	}
 	SetPageUptodate(page);
 
-	if (err == 0)
+	if (err >= 0)
 		ceph_readpage_to_fscache(inode, page);
 
 out:
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 7db2e6c..8c44fdd 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -324,6 +324,9 @@
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 
+	if (!PageFsCache(page))
+		return;
+
 	fscache_wait_on_page_write(ci->fscache, page);
 	fscache_uncache_page(ci->fscache, page);
 }
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 13976c3..3c0a4bd 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -897,7 +897,7 @@
  * caller should hold i_ceph_lock.
  * caller will not hold session s_mutex if called from destroy_inode.
  */
-void __ceph_remove_cap(struct ceph_cap *cap)
+void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 {
 	struct ceph_mds_session *session = cap->session;
 	struct ceph_inode_info *ci = cap->ci;
@@ -909,6 +909,16 @@
 
 	/* remove from session list */
 	spin_lock(&session->s_cap_lock);
+	/*
+	 * s_cap_reconnect is protected by s_cap_lock. no one changes
+	 * s_cap_gen while session is in the reconnect state.
+	 */
+	if (queue_release &&
+	    (!session->s_cap_reconnect ||
+	     cap->cap_gen == session->s_cap_gen))
+		__queue_cap_release(session, ci->i_vino.ino, cap->cap_id,
+				    cap->mseq, cap->issue_seq);
+
 	if (session->s_cap_iterator == cap) {
 		/* not yet, we are iterating over this very cap */
 		dout("__ceph_remove_cap  delaying %p removal from session %p\n",
@@ -1023,7 +1033,6 @@
 	struct ceph_mds_cap_release *head;
 	struct ceph_mds_cap_item *item;
 
-	spin_lock(&session->s_cap_lock);
 	BUG_ON(!session->s_num_cap_releases);
 	msg = list_first_entry(&session->s_cap_releases,
 			       struct ceph_msg, list_head);
@@ -1052,7 +1061,6 @@
 		     (int)CEPH_CAPS_PER_RELEASE,
 		     (int)msg->front.iov_len);
 	}
-	spin_unlock(&session->s_cap_lock);
 }
 
 /*
@@ -1067,12 +1075,8 @@
 	p = rb_first(&ci->i_caps);
 	while (p) {
 		struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
-		struct ceph_mds_session *session = cap->session;
-
-		__queue_cap_release(session, ceph_ino(inode), cap->cap_id,
-				    cap->mseq, cap->issue_seq);
 		p = rb_next(p);
-		__ceph_remove_cap(cap);
+		__ceph_remove_cap(cap, true);
 	}
 }
 
@@ -2791,7 +2795,7 @@
 			}
 			spin_unlock(&mdsc->cap_dirty_lock);
 		}
-		__ceph_remove_cap(cap);
+		__ceph_remove_cap(cap, false);
 	}
 	/* else, we already released it */
 
@@ -2931,9 +2935,12 @@
 	if (!inode) {
 		dout(" i don't have ino %llx\n", vino.ino);
 
-		if (op == CEPH_CAP_OP_IMPORT)
+		if (op == CEPH_CAP_OP_IMPORT) {
+			spin_lock(&session->s_cap_lock);
 			__queue_cap_release(session, vino.ino, cap_id,
 					    mseq, seq);
+			spin_unlock(&session->s_cap_lock);
+		}
 		goto flush_cap_releases;
 	}
 
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 868b61d..2a0bcae 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -352,8 +352,18 @@
 		}
 
 		/* note next offset and last dentry name */
+		rinfo = &req->r_reply_info;
+		if (le32_to_cpu(rinfo->dir_dir->frag) != frag) {
+			frag = le32_to_cpu(rinfo->dir_dir->frag);
+			if (ceph_frag_is_leftmost(frag))
+				fi->next_offset = 2;
+			else
+				fi->next_offset = 0;
+			off = fi->next_offset;
+		}
 		fi->offset = fi->next_offset;
 		fi->last_readdir = req;
+		fi->frag = frag;
 
 		if (req->r_reply_info.dir_end) {
 			kfree(fi->last_name);
@@ -363,7 +373,6 @@
 			else
 				fi->next_offset = 0;
 		} else {
-			rinfo = &req->r_reply_info;
 			err = note_last_dentry(fi,
 				       rinfo->dir_dname[rinfo->dir_nr-1],
 				       rinfo->dir_dname_len[rinfo->dir_nr-1]);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 8549a48..9a8e396 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -577,6 +577,8 @@
 	int issued = 0, implemented;
 	struct timespec mtime, atime, ctime;
 	u32 nsplits;
+	struct ceph_inode_frag *frag;
+	struct rb_node *rb_node;
 	struct ceph_buffer *xattr_blob = NULL;
 	int err = 0;
 	int queue_trunc = 0;
@@ -751,15 +753,38 @@
 	/* FIXME: move me up, if/when version reflects fragtree changes */
 	nsplits = le32_to_cpu(info->fragtree.nsplits);
 	mutex_lock(&ci->i_fragtree_mutex);
+	rb_node = rb_first(&ci->i_fragtree);
 	for (i = 0; i < nsplits; i++) {
 		u32 id = le32_to_cpu(info->fragtree.splits[i].frag);
-		struct ceph_inode_frag *frag = __get_or_create_frag(ci, id);
-
-		if (IS_ERR(frag))
-			continue;
+		frag = NULL;
+		while (rb_node) {
+			frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+			if (ceph_frag_compare(frag->frag, id) >= 0) {
+				if (frag->frag != id)
+					frag = NULL;
+				else
+					rb_node = rb_next(rb_node);
+				break;
+			}
+			rb_node = rb_next(rb_node);
+			rb_erase(&frag->node, &ci->i_fragtree);
+			kfree(frag);
+			frag = NULL;
+		}
+		if (!frag) {
+			frag = __get_or_create_frag(ci, id);
+			if (IS_ERR(frag))
+				continue;
+		}
 		frag->split_by = le32_to_cpu(info->fragtree.splits[i].by);
 		dout(" frag %x split by %d\n", frag->frag, frag->split_by);
 	}
+	while (rb_node) {
+		frag = rb_entry(rb_node, struct ceph_inode_frag, node);
+		rb_node = rb_next(rb_node);
+		rb_erase(&frag->node, &ci->i_fragtree);
+		kfree(frag);
+	}
 	mutex_unlock(&ci->i_fragtree_mutex);
 
 	/* were we issued a capability? */
@@ -1250,8 +1275,20 @@
 	int err = 0, i;
 	struct inode *snapdir = NULL;
 	struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
-	u64 frag = le32_to_cpu(rhead->args.readdir.frag);
 	struct ceph_dentry_info *di;
+	u64 r_readdir_offset = req->r_readdir_offset;
+	u32 frag = le32_to_cpu(rhead->args.readdir.frag);
+
+	if (rinfo->dir_dir &&
+	    le32_to_cpu(rinfo->dir_dir->frag) != frag) {
+		dout("readdir_prepopulate got new frag %x -> %x\n",
+		     frag, le32_to_cpu(rinfo->dir_dir->frag));
+		frag = le32_to_cpu(rinfo->dir_dir->frag);
+		if (ceph_frag_is_leftmost(frag))
+			r_readdir_offset = 2;
+		else
+			r_readdir_offset = 0;
+	}
 
 	if (req->r_aborted)
 		return readdir_prepopulate_inodes_only(req, session);
@@ -1315,7 +1352,7 @@
 		}
 
 		di = dn->d_fsdata;
-		di->offset = ceph_make_fpos(frag, i + req->r_readdir_offset);
+		di->offset = ceph_make_fpos(frag, i + r_readdir_offset);
 
 		/* inode */
 		if (dn->d_inode) {
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index b7bda5d..d90861f 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -43,6 +43,7 @@
  */
 
 struct ceph_reconnect_state {
+	int nr_caps;
 	struct ceph_pagelist *pagelist;
 	bool flock;
 };
@@ -443,6 +444,7 @@
 	INIT_LIST_HEAD(&s->s_waiting);
 	INIT_LIST_HEAD(&s->s_unsafe);
 	s->s_num_cap_releases = 0;
+	s->s_cap_reconnect = 0;
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
 	INIT_LIST_HEAD(&s->s_cap_releases_done);
@@ -642,6 +644,8 @@
 		req->r_unsafe_dir = NULL;
 	}
 
+	complete_all(&req->r_safe_completion);
+
 	ceph_mdsc_put_request(req);
 }
 
@@ -986,7 +990,7 @@
 	dout("removing cap %p, ci is %p, inode is %p\n",
 	     cap, ci, &ci->vfs_inode);
 	spin_lock(&ci->i_ceph_lock);
-	__ceph_remove_cap(cap);
+	__ceph_remove_cap(cap, false);
 	if (!__ceph_is_any_real_caps(ci)) {
 		struct ceph_mds_client *mdsc =
 			ceph_sb_to_client(inode->i_sb)->mdsc;
@@ -1231,9 +1235,7 @@
 	session->s_trim_caps--;
 	if (oissued) {
 		/* we aren't the only cap.. just remove us */
-		__queue_cap_release(session, ceph_ino(inode), cap->cap_id,
-				    cap->mseq, cap->issue_seq);
-		__ceph_remove_cap(cap);
+		__ceph_remove_cap(cap, true);
 	} else {
 		/* try to drop referring dentries */
 		spin_unlock(&ci->i_ceph_lock);
@@ -1416,7 +1418,6 @@
 	unsigned num;
 
 	dout("discard_cap_releases mds%d\n", session->s_mds);
-	spin_lock(&session->s_cap_lock);
 
 	/* zero out the in-progress message */
 	msg = list_first_entry(&session->s_cap_releases,
@@ -1443,8 +1444,6 @@
 		msg->front.iov_len = sizeof(*head);
 		list_add(&msg->list_head, &session->s_cap_releases);
 	}
-
-	spin_unlock(&session->s_cap_lock);
 }
 
 /*
@@ -1875,8 +1874,11 @@
 	int mds = -1;
 	int err = -EAGAIN;
 
-	if (req->r_err || req->r_got_result)
+	if (req->r_err || req->r_got_result) {
+		if (req->r_aborted)
+			__unregister_request(mdsc, req);
 		goto out;
+	}
 
 	if (req->r_timeout &&
 	    time_after_eq(jiffies, req->r_started + req->r_timeout)) {
@@ -2186,7 +2188,6 @@
 	if (head->safe) {
 		req->r_got_safe = true;
 		__unregister_request(mdsc, req);
-		complete_all(&req->r_safe_completion);
 
 		if (req->r_got_unsafe) {
 			/*
@@ -2238,8 +2239,7 @@
 	err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
 	if (err == 0) {
 		if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
-				    req->r_op == CEPH_MDS_OP_LSSNAP) &&
-		    rinfo->dir_nr)
+				    req->r_op == CEPH_MDS_OP_LSSNAP))
 			ceph_readdir_prepopulate(req, req->r_session);
 		ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
 	}
@@ -2490,6 +2490,7 @@
 	cap->seq = 0;        /* reset cap seq */
 	cap->issue_seq = 0;  /* and issue_seq */
 	cap->mseq = 0;       /* and migrate_seq */
+	cap->cap_gen = cap->session->s_cap_gen;
 
 	if (recon_state->flock) {
 		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
@@ -2552,6 +2553,8 @@
 	} else {
 		err = ceph_pagelist_append(pagelist, &rec, reclen);
 	}
+
+	recon_state->nr_caps++;
 out_free:
 	kfree(path);
 out_dput:
@@ -2579,6 +2582,7 @@
 	struct rb_node *p;
 	int mds = session->s_mds;
 	int err = -ENOMEM;
+	int s_nr_caps;
 	struct ceph_pagelist *pagelist;
 	struct ceph_reconnect_state recon_state;
 
@@ -2610,20 +2614,38 @@
 	dout("session %p state %s\n", session,
 	     session_state_name(session->s_state));
 
+	spin_lock(&session->s_gen_ttl_lock);
+	session->s_cap_gen++;
+	spin_unlock(&session->s_gen_ttl_lock);
+
+	spin_lock(&session->s_cap_lock);
+	/*
+	 * notify __ceph_remove_cap() that we are composing cap reconnect.
+	 * If a cap get released before being added to the cap reconnect,
+	 * __ceph_remove_cap() should skip queuing cap release.
+	 */
+	session->s_cap_reconnect = 1;
 	/* drop old cap expires; we're about to reestablish that state */
 	discard_cap_releases(mdsc, session);
+	spin_unlock(&session->s_cap_lock);
 
 	/* traverse this session's caps */
-	err = ceph_pagelist_encode_32(pagelist, session->s_nr_caps);
+	s_nr_caps = session->s_nr_caps;
+	err = ceph_pagelist_encode_32(pagelist, s_nr_caps);
 	if (err)
 		goto fail;
 
+	recon_state.nr_caps = 0;
 	recon_state.pagelist = pagelist;
 	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
 	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
 	if (err < 0)
 		goto fail;
 
+	spin_lock(&session->s_cap_lock);
+	session->s_cap_reconnect = 0;
+	spin_unlock(&session->s_cap_lock);
+
 	/*
 	 * snaprealms.  we provide mds with the ino, seq (version), and
 	 * parent for all of our realms.  If the mds has any newer info,
@@ -2646,11 +2668,18 @@
 
 	if (recon_state.flock)
 		reply->hdr.version = cpu_to_le16(2);
-	if (pagelist->length) {
-		/* set up outbound data if we have any */
-		reply->hdr.data_len = cpu_to_le32(pagelist->length);
-		ceph_msg_data_add_pagelist(reply, pagelist);
+
+	/* raced with cap release? */
+	if (s_nr_caps != recon_state.nr_caps) {
+		struct page *page = list_first_entry(&pagelist->head,
+						     struct page, lru);
+		__le32 *addr = kmap_atomic(page);
+		*addr = cpu_to_le32(recon_state.nr_caps);
+		kunmap_atomic(addr);
 	}
+
+	reply->hdr.data_len = cpu_to_le32(pagelist->length);
+	ceph_msg_data_add_pagelist(reply, pagelist);
 	ceph_con_send(&session->s_con, reply);
 
 	mutex_unlock(&session->s_mutex);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index c2a19fb..4c053d0 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -132,6 +132,7 @@
 	struct list_head  s_caps;     /* all caps issued by this session */
 	int               s_nr_caps, s_trim_caps;
 	int               s_num_cap_releases;
+	int		  s_cap_reconnect;
 	struct list_head  s_cap_releases; /* waiting cap_release messages */
 	struct list_head  s_cap_releases_done; /* ready to send */
 	struct ceph_cap  *s_cap_iterator;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 6014b0a..ef4ac38 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -741,13 +741,7 @@
 			int fmode, unsigned issued, unsigned wanted,
 			unsigned cap, unsigned seq, u64 realmino, int flags,
 			struct ceph_cap_reservation *caps_reservation);
-extern void __ceph_remove_cap(struct ceph_cap *cap);
-static inline void ceph_remove_cap(struct ceph_cap *cap)
-{
-	spin_lock(&cap->ci->i_ceph_lock);
-	__ceph_remove_cap(cap);
-	spin_unlock(&cap->ci->i_ceph_lock);
-}
+extern void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release);
 extern void ceph_put_cap(struct ceph_mds_client *mdsc,
 			 struct ceph_cap *cap);