Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (28 commits)
  ceph: update discussion list address in MAINTAINERS
  ceph: some documentations fixes
  ceph: fix use after free on mds __unregister_request
  ceph: avoid loaded term 'OSD' in documention
  ceph: fix possible double-free of mds request reference
  ceph: fix session check on mds reply
  ceph: handle kmalloc() failure
  ceph: propagate mds session allocation failures to caller
  ceph: make write_begin wait propagate ERESTARTSYS
  ceph: fix snap rebuild condition
  ceph: avoid reopening osd connections when address hasn't changed
  ceph: rename r_sent_stamp r_stamp
  ceph: fix connection fault con_work reentrancy problem
  ceph: prevent dup stale messages to console for restarting mds
  ceph: fix pg pool decoding from incremental osdmap update
  ceph: fix mds sync() race with completing requests
  ceph: only release unused caps with mds requests
  ceph: clean up handle_cap_grant, handle_caps wrt session mutex
  ceph: fix session locking in handle_caps, ceph_check_caps
  ceph: drop unnecessary WARN_ON in caps migration
  ...
diff --git a/Documentation/filesystems/00-INDEX b/Documentation/filesystems/00-INDEX
index 3bae418..4303614 100644
--- a/Documentation/filesystems/00-INDEX
+++ b/Documentation/filesystems/00-INDEX
@@ -16,6 +16,8 @@
 	- information about the BeOS filesystem for Linux.
 bfs.txt
 	- info for the SCO UnixWare Boot Filesystem (BFS).
+ceph.txt
+	- info for the Ceph Distributed File System.
 cifs.txt
 	- description of the CIFS filesystem.
 coda.txt
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index 6e03917..0660c9f 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -8,7 +8,7 @@
 
  * POSIX semantics
  * Seamless scaling from 1 to many thousands of nodes
- * High availability and reliability.  No single points of failure.
+ * High availability and reliability.  No single point of failure.
  * N-way replication of data across storage nodes
  * Fast recovery from node failures
  * Automatic rebalancing of data on node addition/removal
@@ -94,7 +94,7 @@
 
   wsize=X
 	Specify the maximum write size in bytes.  By default there is no
-	maximu.  Ceph will normally size writes based on the file stripe
+	maximum.  Ceph will normally size writes based on the file stripe
 	size.
 
   rsize=X
@@ -115,7 +115,7 @@
 	number of entries in that directory.
 
   nocrc
-	Disable CRC32C calculation for data writes.  If set, the OSD
+	Disable CRC32C calculation for data writes.  If set, the storage node
 	must rely on TCP's error correction to detect data corruption
 	in the data payload.
 
@@ -133,7 +133,8 @@
 	http://ceph.newdream.net/
 
 The Linux kernel client source tree is available at
-	git://ceph.newdream.net/linux-ceph-client.git
+	git://ceph.newdream.net/git/ceph-client.git
+	git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
 
 and the source for the full system is at
-	git://ceph.newdream.net/ceph.git
+	git://ceph.newdream.net/git/ceph.git
diff --git a/MAINTAINERS b/MAINTAINERS
index 1a203f9..088bd41 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1443,7 +1443,7 @@
 
 CEPH DISTRIBUTED FILE SYSTEM CLIENT
 M:	Sage Weil <sage@newdream.net>
-L:	ceph-devel@lists.sourceforge.net
+L:	ceph-devel@vger.kernel.org
 W:	http://ceph.newdream.net/
 T:	git git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client.git
 S:	Supported
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 23bb0ce..ce8ef61 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -919,6 +919,10 @@
 /*
  * We are only allowed to write into/dirty the page if the page is
  * clean, or already dirty within the same snap context.
+ *
+ * called with page locked.
+ * return success with page locked,
+ * or any failure (incl -EAGAIN) with page unlocked.
  */
 static int ceph_update_writeable_page(struct file *file,
 			    loff_t pos, unsigned len,
@@ -961,9 +965,11 @@
 			snapc = ceph_get_snap_context((void *)page->private);
 			unlock_page(page);
 			ceph_queue_writeback(inode);
-			wait_event_interruptible(ci->i_cap_wq,
+			r = wait_event_interruptible(ci->i_cap_wq,
 			       context_is_writeable_or_written(inode, snapc));
 			ceph_put_snap_context(snapc);
+			if (r == -ERESTARTSYS)
+				return r;
 			return -EAGAIN;
 		}
 
@@ -1035,7 +1041,7 @@
 	int r;
 
 	do {
-		/* get a page*/
+		/* get a page */
 		page = grab_cache_page_write_begin(mapping, index, 0);
 		if (!page)
 			return -ENOMEM;
diff --git a/fs/ceph/auth_x.c b/fs/ceph/auth_x.c
index f031842..8d8a849 100644
--- a/fs/ceph/auth_x.c
+++ b/fs/ceph/auth_x.c
@@ -28,6 +28,12 @@
 	return (ac->want_keys & xi->have_keys) == ac->want_keys;
 }
 
+static int ceph_x_encrypt_buflen(int ilen)
+{
+	/* header + ilen data bytes + 16 (presumably padding) + one u32 */
+	return sizeof(struct ceph_x_encrypt_header) + ilen + 16 + sizeof(u32);
+}
+
 static int ceph_x_encrypt(struct ceph_crypto_key *secret,
 			  void *ibuf, int ilen, void *obuf, size_t olen)
 {
@@ -150,6 +156,11 @@
 		struct timespec validity;
 		struct ceph_crypto_key old_key;
 		void *tp, *tpend;
+		struct ceph_timespec new_validity;
+		struct ceph_crypto_key new_session_key;
+		struct ceph_buffer *new_ticket_blob;
+		unsigned long new_expires, new_renew_after;
+		u64 new_secret_id;
 
 		ceph_decode_need(&p, end, sizeof(u32) + 1, bad);
 
@@ -182,16 +193,16 @@
 			goto bad;
 
 		memcpy(&old_key, &th->session_key, sizeof(old_key));
-		ret = ceph_crypto_key_decode(&th->session_key, &dp, dend);
+		ret = ceph_crypto_key_decode(&new_session_key, &dp, dend);
 		if (ret)
 			goto out;
 
-		ceph_decode_copy(&dp, &th->validity, sizeof(th->validity));
-		ceph_decode_timespec(&validity, &th->validity);
-		th->expires = get_seconds() + validity.tv_sec;
-		th->renew_after = th->expires - (validity.tv_sec / 4);
-		dout(" expires=%lu renew_after=%lu\n", th->expires,
-		     th->renew_after);
+		ceph_decode_copy(&dp, &new_validity, sizeof(new_validity));
+		ceph_decode_timespec(&validity, &new_validity);
+		new_expires = get_seconds() + validity.tv_sec;
+		new_renew_after = new_expires - (validity.tv_sec / 4);
+		dout(" expires=%lu renew_after=%lu\n", new_expires,
+		     new_renew_after);
 
 		/* ticket blob for service */
 		ceph_decode_8_safe(&p, end, is_enc, bad);
@@ -216,10 +227,21 @@
 		dout(" ticket blob is %d bytes\n", dlen);
 		ceph_decode_need(&tp, tpend, 1 + sizeof(u64), bad);
 		struct_v = ceph_decode_8(&tp);
-		th->secret_id = ceph_decode_64(&tp);
-		ret = ceph_decode_buffer(&th->ticket_blob, &tp, tpend);
+		new_secret_id = ceph_decode_64(&tp);
+		ret = ceph_decode_buffer(&new_ticket_blob, &tp, tpend);
 		if (ret)
 			goto out;
+
+		/* all is well, update our ticket */
+		ceph_crypto_key_destroy(&th->session_key);
+		if (th->ticket_blob)
+			ceph_buffer_put(th->ticket_blob);
+		th->session_key = new_session_key;
+		th->ticket_blob = new_ticket_blob;
+		th->validity = new_validity;
+		th->secret_id = new_secret_id;
+		th->expires = new_expires;
+		th->renew_after = new_renew_after;
 		dout(" got ticket service %d (%s) secret_id %lld len %d\n",
 		     type, ceph_entity_type_name(type), th->secret_id,
 		     (int)th->ticket_blob->vec.iov_len);
@@ -242,7 +264,7 @@
 				   struct ceph_x_ticket_handler *th,
 				   struct ceph_x_authorizer *au)
 {
-	int len;
+	int maxlen;
 	struct ceph_x_authorize_a *msg_a;
 	struct ceph_x_authorize_b msg_b;
 	void *p, *end;
@@ -253,15 +275,15 @@
 	dout("build_authorizer for %s %p\n",
 	     ceph_entity_type_name(th->service), au);
 
-	len = sizeof(*msg_a) + sizeof(msg_b) + sizeof(u32) +
-		ticket_blob_len + 16;
-	dout("  need len %d\n", len);
-	if (au->buf && au->buf->alloc_len < len) {
+	maxlen = sizeof(*msg_a) + sizeof(msg_b) +
+		ceph_x_encrypt_buflen(ticket_blob_len);
+	dout("  need len %d\n", maxlen);
+	if (au->buf && au->buf->alloc_len < maxlen) {
 		ceph_buffer_put(au->buf);
 		au->buf = NULL;
 	}
 	if (!au->buf) {
-		au->buf = ceph_buffer_new(len, GFP_NOFS);
+		au->buf = ceph_buffer_new(maxlen, GFP_NOFS);
 		if (!au->buf)
 			return -ENOMEM;
 	}
@@ -296,6 +318,7 @@
 	au->buf->vec.iov_len = p - au->buf->vec.iov_base;
 	dout(" built authorizer nonce %llx len %d\n", au->nonce,
 	     (int)au->buf->vec.iov_len);
+	BUG_ON(au->buf->vec.iov_len > maxlen);
 	return 0;
 
 out_buf:
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index db122bb..7d0a0d0 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -1407,6 +1407,7 @@
  */
 void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 		     struct ceph_mds_session *session)
+	__releases(session->s_mutex)
 {
 	struct ceph_client *client = ceph_inode_to_client(&ci->vfs_inode);
 	struct ceph_mds_client *mdsc = &client->mdsc;
@@ -1414,7 +1415,6 @@
 	struct ceph_cap *cap;
 	int file_wanted, used;
 	int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
-	int drop_session_lock = session ? 0 : 1;
 	int issued, implemented, want, retain, revoking, flushing = 0;
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
 			   to avoid an infinite loop on retry */
@@ -1639,7 +1639,7 @@
 	if (queue_invalidate)
 		ceph_queue_invalidate(inode);
 
-	if (session && drop_session_lock)
+	if (session)
 		mutex_unlock(&session->s_mutex);
 	if (took_snap_rwsem)
 		up_read(&mdsc->snap_rwsem);
@@ -2195,18 +2195,19 @@
  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
  * actually be a revocation if it specifies a smaller cap set.)
  *
- * caller holds s_mutex.
+ * caller holds s_mutex and i_lock, we drop both.
+ *
  * return value:
  *  0 - ok
  *  1 - check_caps on auth cap only (writeback)
  *  2 - check_caps (ack revoke)
  */
-static int handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
-			    struct ceph_mds_session *session,
-			    struct ceph_cap *cap,
-			    struct ceph_buffer *xattr_buf)
+static void handle_cap_grant(struct inode *inode, struct ceph_mds_caps *grant,
+			     struct ceph_mds_session *session,
+			     struct ceph_cap *cap,
+			     struct ceph_buffer *xattr_buf)
 	__releases(inode->i_lock)
-
+	__releases(session->s_mutex)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	int mds = session->s_mds;
@@ -2216,7 +2217,7 @@
 	u64 size = le64_to_cpu(grant->size);
 	u64 max_size = le64_to_cpu(grant->max_size);
 	struct timespec mtime, atime, ctime;
-	int reply = 0;
+	int check_caps = 0;
 	int wake = 0;
 	int writeback = 0;
 	int revoked_rdcache = 0;
@@ -2329,11 +2330,12 @@
 		if ((used & ~newcaps) & CEPH_CAP_FILE_BUFFER)
 			writeback = 1; /* will delay ack */
 		else if (dirty & ~newcaps)
-			reply = 1;     /* initiate writeback in check_caps */
+			check_caps = 1;  /* initiate writeback in check_caps */
 		else if (((used & ~newcaps) & CEPH_CAP_FILE_CACHE) == 0 ||
 			   revoked_rdcache)
-			reply = 2;     /* send revoke ack in check_caps */
+			check_caps = 2;     /* send revoke ack in check_caps */
 		cap->issued = newcaps;
+		cap->implemented |= newcaps;
 	} else if (cap->issued == newcaps) {
 		dout("caps unchanged: %s -> %s\n",
 		     ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
@@ -2346,6 +2348,7 @@
 					      * pending revocation */
 		wake = 1;
 	}
+	BUG_ON(cap->issued & ~cap->implemented);
 
 	spin_unlock(&inode->i_lock);
 	if (writeback)
@@ -2359,7 +2362,14 @@
 		ceph_queue_invalidate(inode);
 	if (wake)
 		wake_up(&ci->i_cap_wq);
-	return reply;
+
+	if (check_caps == 1)
+		ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
+				session);
+	else if (check_caps == 2)
+		ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
+	else
+		mutex_unlock(&session->s_mutex);
 }
 
 /*
@@ -2548,9 +2558,8 @@
 			ci->i_cap_exporting_issued = cap->issued;
 		}
 		__ceph_remove_cap(cap);
-	} else {
-		WARN_ON(!cap);
 	}
+	/* else, we already released it */
 
 	spin_unlock(&inode->i_lock);
 }
@@ -2621,9 +2630,7 @@
 	u64 cap_id;
 	u64 size, max_size;
 	u64 tid;
-	int check_caps = 0;
 	void *snaptrace;
-	int r;
 
 	dout("handle_caps from mds%d\n", mds);
 
@@ -2668,8 +2675,9 @@
 	case CEPH_CAP_OP_IMPORT:
 		handle_cap_import(mdsc, inode, h, session,
 				  snaptrace, le32_to_cpu(h->snap_trace_len));
-		check_caps = 1; /* we may have sent a RELEASE to the old auth */
-		goto done;
+		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY,
+				session);
+		goto done_unlocked;
 	}
 
 	/* the rest require a cap */
@@ -2686,16 +2694,8 @@
 	switch (op) {
 	case CEPH_CAP_OP_REVOKE:
 	case CEPH_CAP_OP_GRANT:
-		r = handle_cap_grant(inode, h, session, cap, msg->middle);
-		if (r == 1)
-			ceph_check_caps(ceph_inode(inode),
-					CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
-					session);
-		else if (r == 2)
-			ceph_check_caps(ceph_inode(inode),
-					CHECK_CAPS_NODELAY,
-					session);
-		break;
+		handle_cap_grant(inode, h, session, cap, msg->middle);
+		goto done_unlocked;
 
 	case CEPH_CAP_OP_FLUSH_ACK:
 		handle_cap_flush_ack(inode, tid, h, session, cap);
@@ -2713,9 +2713,7 @@
 
 done:
 	mutex_unlock(&session->s_mutex);
-
-	if (check_caps)
-		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_NODELAY, NULL);
+done_unlocked:
 	if (inode)
 		iput(inode);
 	return;
@@ -2838,11 +2836,18 @@
 	struct ceph_cap *cap;
 	struct ceph_mds_request_release *rel = *p;
 	int ret = 0;
-
-	dout("encode_inode_release %p mds%d drop %s unless %s\n", inode,
-	     mds, ceph_cap_string(drop), ceph_cap_string(unless));
+	int used = 0;
 
 	spin_lock(&inode->i_lock);
+	used = __ceph_caps_used(ci);
+
+	dout("encode_inode_release %p mds%d used %s drop %s unless %s\n", inode,
+	     mds, ceph_cap_string(used), ceph_cap_string(drop),
+	     ceph_cap_string(unless));
+
+	/* only drop unused caps */
+	drop &= ~used;
+
 	cap = __get_cap_for_mds(ci, mds);
 	if (cap && __cap_is_valid(cap)) {
 		if (force ||
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 5107384..8a9116e 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -288,8 +288,10 @@
 			CEPH_MDS_OP_LSSNAP : CEPH_MDS_OP_READDIR;
 
 		/* discard old result, if any */
-		if (fi->last_readdir)
+		if (fi->last_readdir) {
 			ceph_mdsc_put_request(fi->last_readdir);
+			fi->last_readdir = NULL;
+		}
 
 		/* requery frag tree, as the frag topology may have changed */
 		frag = ceph_choose_frag(ceph_inode(inode), frag, NULL, NULL);
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 7abe1ae..aca82d5 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -378,6 +378,22 @@
 
 	ceph_queue_caps_release(inode);
 
+	/*
+	 * we may still have a snap_realm reference if there are stray
+	 * caps in i_cap_exporting_issued or i_snap_caps.
+	 */
+	if (ci->i_snap_realm) {
+		struct ceph_mds_client *mdsc =
+			&ceph_client(ci->vfs_inode.i_sb)->mdsc;
+		struct ceph_snap_realm *realm = ci->i_snap_realm;
+
+		dout(" dropping residual ref to snap realm %p\n", realm);
+		spin_lock(&realm->inodes_with_caps_lock);
+		list_del_init(&ci->i_snap_realm_item);
+		spin_unlock(&realm->inodes_with_caps_lock);
+		ceph_put_snap_realm(mdsc, realm);
+	}
+
 	kfree(ci->i_symlink);
 	while ((n = rb_first(&ci->i_fragtree)) != NULL) {
 		frag = rb_entry(n, struct ceph_inode_frag, node);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index a260010..5c7920b 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -328,6 +328,8 @@
 	struct ceph_mds_session *s;
 
 	s = kzalloc(sizeof(*s), GFP_NOFS);
+	if (!s)
+		return ERR_PTR(-ENOMEM);
 	s->s_mdsc = mdsc;
 	s->s_mds = mds;
 	s->s_state = CEPH_MDS_SESSION_NEW;
@@ -529,7 +531,7 @@
 {
 	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
 	rb_erase(&req->r_node, &mdsc->request_tree);
-	ceph_mdsc_put_request(req);
+	RB_CLEAR_NODE(&req->r_node);
 
 	if (req->r_unsafe_dir) {
 		struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
@@ -538,6 +540,8 @@
 		list_del_init(&req->r_unsafe_dir_item);
 		spin_unlock(&ci->i_unsafe_lock);
 	}
+
+	ceph_mdsc_put_request(req);
 }
 
 /*
@@ -862,6 +866,7 @@
 	if (time_after_eq(jiffies, session->s_cap_ttl) &&
 	    time_after_eq(session->s_cap_ttl, session->s_renew_requested))
 		pr_info("mds%d caps stale\n", session->s_mds);
+	session->s_renew_requested = jiffies;
 
 	/* do not try to renew caps until a recovering mds has reconnected
 	 * with its clients. */
@@ -874,7 +879,6 @@
 
 	dout("send_renew_caps to mds%d (%s)\n", session->s_mds,
 		ceph_mds_state_name(state));
-	session->s_renew_requested = jiffies;
 	msg = create_session_msg(CEPH_SESSION_REQUEST_RENEWCAPS,
 				 ++session->s_renew_seq);
 	if (IS_ERR(msg))
@@ -1566,8 +1570,13 @@
 
 	/* get, open session */
 	session = __ceph_lookup_mds_session(mdsc, mds);
-	if (!session)
+	if (!session) {
 		session = register_session(mdsc, mds);
+		if (IS_ERR(session)) {
+			err = PTR_ERR(session);
+			goto finish;
+		}
+	}
 	dout("do_request mds%d session %p state %s\n", mds, session,
 	     session_state_name(session->s_state));
 	if (session->s_state != CEPH_MDS_SESSION_OPEN &&
@@ -1770,7 +1779,7 @@
 	dout("handle_reply %p\n", req);
 
 	/* correct session? */
-	if (!req->r_session && req->r_session != session) {
+	if (req->r_session != session) {
 		pr_err("mdsc_handle_reply got %llu on session mds%d"
 		       " not mds%d\n", tid, session->s_mds,
 		       req->r_session ? req->r_session->s_mds : -1);
@@ -2682,29 +2691,41 @@
  */
 static void wait_unsafe_requests(struct ceph_mds_client *mdsc, u64 want_tid)
 {
-	struct ceph_mds_request *req = NULL;
+	struct ceph_mds_request *req = NULL, *nextreq;
 	struct rb_node *n;
 
 	mutex_lock(&mdsc->mutex);
 	dout("wait_unsafe_requests want %lld\n", want_tid);
+restart:
 	req = __get_oldest_req(mdsc);
 	while (req && req->r_tid <= want_tid) {
+		/* find next request */
+		n = rb_next(&req->r_node);
+		if (n)
+			nextreq = rb_entry(n, struct ceph_mds_request, r_node);
+		else
+			nextreq = NULL;
 		if ((req->r_op & CEPH_MDS_OP_WRITE)) {
 			/* write op */
 			ceph_mdsc_get_request(req);
+			if (nextreq)
+				ceph_mdsc_get_request(nextreq);
 			mutex_unlock(&mdsc->mutex);
 			dout("wait_unsafe_requests  wait on %llu (want %llu)\n",
 			     req->r_tid, want_tid);
 			wait_for_completion(&req->r_safe_completion);
 			mutex_lock(&mdsc->mutex);
-			n = rb_next(&req->r_node);
 			ceph_mdsc_put_request(req);
-		} else {
-			n = rb_next(&req->r_node);
+			if (!nextreq)
+				break;  /* there was no next req; we're done */
+			if (RB_EMPTY_NODE(&nextreq->r_node)) {
+				/* next request was removed from tree */
+				ceph_mdsc_put_request(nextreq);
+				goto restart;
+			}
+			ceph_mdsc_put_request(nextreq);  /* won't go away */
 		}
-		if (!n)
-			break;
-		req = rb_entry(n, struct ceph_mds_request, r_node);
+		req = nextreq;
 	}
 	mutex_unlock(&mdsc->mutex);
 	dout("wait_unsafe_requests done\n");
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 781656a..a32f0f8 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -366,6 +366,14 @@
 }
 
 /*
+ * return true if this connection ever successfully opened (connect_seq > 0)
+ */
+bool ceph_con_opened(struct ceph_connection *con)
+{
+	return con->connect_seq > 0;
+}
+
+/*
  * generic get/put
  */
 struct ceph_connection *ceph_con_get(struct ceph_connection *con)
@@ -830,13 +838,6 @@
 	con->in_base_pos = 0;
 }
 
-static void prepare_read_connect_retry(struct ceph_connection *con)
-{
-	dout("prepare_read_connect_retry %p\n", con);
-	con->in_base_pos = strlen(CEPH_BANNER) + sizeof(con->actual_peer_addr)
-		+ sizeof(con->peer_addr_for_me);
-}
-
 static void prepare_read_ack(struct ceph_connection *con)
 {
 	dout("prepare_read_ack %p\n", con);
@@ -1146,7 +1147,7 @@
 		}
 		con->auth_retry = 1;
 		prepare_write_connect(con->msgr, con, 0);
-		prepare_read_connect_retry(con);
+		prepare_read_connect(con);
 		break;
 
 	case CEPH_MSGR_TAG_RESETSESSION:
@@ -1843,8 +1844,6 @@
 		goto out;
 	}
 
-	clear_bit(BUSY, &con->state);  /* to avoid an improbable race */
-
 	mutex_lock(&con->mutex);
 	if (test_bit(CLOSED, &con->state))
 		goto out_unlock;
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 4caaa59..a343dae 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -223,6 +223,7 @@
 			  struct ceph_connection *con);
 extern void ceph_con_open(struct ceph_connection *con,
 			  struct ceph_entity_addr *addr);
+extern bool ceph_con_opened(struct ceph_connection *con);
 extern void ceph_con_close(struct ceph_connection *con);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
 extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index dbe63db9..c7b4ded 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -413,11 +413,22 @@
  */
 static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
 {
+	struct ceph_osd_request *req;
 	int ret = 0;
 
 	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
 	if (list_empty(&osd->o_requests)) {
 		__remove_osd(osdc, osd);
+	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
+			  &osd->o_con.peer_addr,
+			  sizeof(osd->o_con.peer_addr)) == 0 &&
+		   !ceph_con_opened(&osd->o_con)) {
+		dout(" osd addr hasn't changed and connection never opened,"
+		     " letting msgr retry");
+		/* touch each r_stamp for handle_timeout()'s benefit */
+		list_for_each_entry(req, &osd->o_requests, r_osd_item)
+			req->r_stamp = jiffies;
+		ret = -EAGAIN;
 	} else {
 		ceph_con_close(&osd->o_con);
 		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -633,7 +644,7 @@
 	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
 	reqhead->reassert_version = req->r_reassert_version;
 
-	req->r_sent_stamp = jiffies;
+	req->r_stamp = jiffies;
 	list_move_tail(&osdc->req_lru, &req->r_req_lru_item);
 
 	ceph_msg_get(req->r_request); /* send consumes a ref */
@@ -660,7 +671,7 @@
 	unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
 	unsigned long keepalive =
 		osdc->client->mount_args->osd_keepalive_timeout * HZ;
-	unsigned long last_sent = 0;
+	unsigned long last_stamp = 0;
 	struct rb_node *p;
 	struct list_head slow_osds;
 
@@ -697,12 +708,12 @@
 		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
 				 r_req_lru_item);
 
-		if (time_before(jiffies, req->r_sent_stamp + timeout))
+		if (time_before(jiffies, req->r_stamp + timeout))
 			break;
 
-		BUG_ON(req == last_req && req->r_sent_stamp == last_sent);
+		BUG_ON(req == last_req && req->r_stamp == last_stamp);
 		last_req = req;
-		last_sent = req->r_sent_stamp;
+		last_stamp = req->r_stamp;
 
 		osd = req->r_osd;
 		BUG_ON(!osd);
@@ -718,7 +729,7 @@
 	 */
 	INIT_LIST_HEAD(&slow_osds);
 	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
-		if (time_before(jiffies, req->r_sent_stamp + keepalive))
+		if (time_before(jiffies, req->r_stamp + keepalive))
 			break;
 
 		osd = req->r_osd;
@@ -862,7 +873,9 @@
 
 	dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
 	if (kickosd) {
-		__reset_osd(osdc, kickosd);
+		err = __reset_osd(osdc, kickosd);
+		if (err == -EAGAIN)
+			return 1;
 	} else {
 		for (p = rb_first(&osdc->osds); p; p = n) {
 			struct ceph_osd *osd =
@@ -913,7 +926,7 @@
 
 kick:
 		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
-		     req->r_osd->o_osd);
+		     req->r_osd ? req->r_osd->o_osd : -1);
 		req->r_flags |= CEPH_OSD_FLAG_RETRY;
 		err = __send_request(osdc, req);
 		if (err) {
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 1b1a3ca..b075991 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -70,7 +70,7 @@
 
 	char              r_oid[40];          /* object name */
 	int               r_oid_len;
-	unsigned long     r_sent_stamp;
+	unsigned long     r_stamp;            /* send OR check time */
 	bool              r_resend;           /* msg send failed, needs retry */
 
 	struct ceph_file_layout r_file_layout;
diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
index b83f269..d82fe87 100644
--- a/fs/ceph/osdmap.c
+++ b/fs/ceph/osdmap.c
@@ -480,6 +480,14 @@
 	return NULL;
 }
 
+static void __decode_pool(void **p, struct ceph_pg_pool_info *pi)
+{
+	ceph_decode_copy(p, &pi->v, sizeof(pi->v));	/* fill pi->v from *p */
+	calc_pg_masks(pi);
+	*p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64);  /* skip unread */
+	*p += le32_to_cpu(pi->v.num_removed_snap_intervals) * sizeof(u64) * 2;
+}	/* NOTE(review): *p is advanced with no end-of-buffer check here */
+
 /*
  * decode a full map.
  */
@@ -526,12 +534,8 @@
 				   ev, CEPH_PG_POOL_VERSION);
 			goto bad;
 		}
-		ceph_decode_copy(p, &pi->v, sizeof(pi->v));
+		__decode_pool(p, pi);
 		__insert_pg_pool(&map->pg_pools, pi);
-		calc_pg_masks(pi);
-		*p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64);
-		*p += le32_to_cpu(pi->v.num_removed_snap_intervals)
-			* sizeof(u64) * 2;
 	}
 	ceph_decode_32_safe(p, end, map->pool_max, bad);
 
@@ -714,8 +718,7 @@
 			pi->id = pool;
 			__insert_pg_pool(&map->pg_pools, pi);
 		}
-		ceph_decode_copy(p, &pi->v, sizeof(pi->v));
-		calc_pg_masks(pi);
+		__decode_pool(p, pi);
 	}
 
 	/* old_pool */
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index bf2a5f3..df04e21 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -314,9 +314,9 @@
 	   because we rebuild_snap_realms() works _downward_ in
 	   hierarchy after each update.) */
 	if (realm->cached_context &&
-	    realm->cached_context->seq <= realm->seq &&
+	    realm->cached_context->seq == realm->seq &&
 	    (!parent ||
-	     realm->cached_context->seq <= parent->cached_context->seq)) {
+	     realm->cached_context->seq >= parent->cached_context->seq)) {
 		dout("build_snap_context %llx %p: %p seq %lld (%d snaps)"
 		     " (unchanged)\n",
 		     realm->ino, realm, realm->cached_context,
@@ -818,7 +818,9 @@
 			 * queued (again) by ceph_update_snap_trace()
 			 * below.  Queue it _now_, under the old context.
 			 */
+			spin_lock(&realm->inodes_with_caps_lock);
 			list_del_init(&ci->i_snap_realm_item);
+			spin_unlock(&realm->inodes_with_caps_lock);
 			spin_unlock(&inode->i_lock);
 
 			ceph_queue_cap_snap(ci,