libceph: a major OSD client update

This is a major sync up, up to ~Jewel.  The highlights are:

- per-session request trees (vs a global per-client tree)
- per-session locking (vs a global per-client rwlock)
- homeless OSD session
- no ad-hoc global per-client lists
- support for pool quotas
- foundation for watch/notify v2 support
- foundation for map check (pool deletion detection) support

The switchover is incomplete: lingering requests can be set up and
torn down but are never reestablished.  This functionality is
restored with the introduction of the new lingering infrastructure
(ceph_osd_linger_request, linger_work, etc.) in a later commit.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index 6d3ff71..61dbd9d 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -182,21 +182,39 @@
 	seq_putc(s, '\n');
 }
 
+static void dump_requests(struct seq_file *s, struct ceph_osd *osd)
+{
+	struct rb_node *n;
+
+	mutex_lock(&osd->lock);
+	for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+		struct ceph_osd_request *req =
+		    rb_entry(n, struct ceph_osd_request, r_node);
+
+		dump_request(s, req);
+	}
+
+	mutex_unlock(&osd->lock);
+}
+
 static int osdc_show(struct seq_file *s, void *pp)
 {
 	struct ceph_client *client = s->private;
 	struct ceph_osd_client *osdc = &client->osdc;
-	struct rb_node *p;
+	struct rb_node *n;
 
-	mutex_lock(&osdc->request_mutex);
-	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
-		struct ceph_osd_request *req;
+	down_read(&osdc->lock);
+	seq_printf(s, "REQUESTS %d homeless %d\n",
+		   atomic_read(&osdc->num_requests),
+		   atomic_read(&osdc->num_homeless));
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
 
-		req = rb_entry(p, struct ceph_osd_request, r_node);
-
-		dump_request(s, req);
+		dump_requests(s, osd);
 	}
-	mutex_unlock(&osdc->request_mutex);
+	dump_requests(s, &osdc->homeless_osd);
+
+	up_read(&osdc->lock);
 	return 0;
 }
 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index d1c8e06..4c856c8 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -25,16 +25,6 @@
 
 static const struct ceph_connection_operations osd_con_ops;
 
-static void __send_queued(struct ceph_osd_client *osdc);
-static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
-static void __register_request(struct ceph_osd_client *osdc,
-			       struct ceph_osd_request *req);
-static void __unregister_request(struct ceph_osd_client *osdc,
-				 struct ceph_osd_request *req);
-static void __unregister_linger_request(struct ceph_osd_client *osdc,
-					struct ceph_osd_request *req);
-static void __enqueue_request(struct ceph_osd_request *req);
-
 /*
  * Implement client access to distributed object storage cluster.
  *
@@ -53,6 +43,43 @@
  * channel with an OSD is reset.
  */
 
+static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
+
+#if 1
+static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
+{
+	bool wrlocked = true;
+
+	if (unlikely(down_read_trylock(sem))) {
+		wrlocked = false;
+		up_read(sem);
+	}
+
+	return wrlocked;
+}
+static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
+{
+	WARN_ON(!rwsem_is_locked(&osdc->lock));
+}
+static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
+{
+	WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
+}
+static inline void verify_osd_locked(struct ceph_osd *osd)
+{
+	struct ceph_osd_client *osdc = osd->o_osdc;
+
+	WARN_ON(!(mutex_is_locked(&osd->lock) &&
+		  rwsem_is_locked(&osdc->lock)) &&
+		!rwsem_is_wrlocked(&osdc->lock));
+}
+#else
+static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
+static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
+static inline void verify_osd_locked(struct ceph_osd *osd) { }
+#endif
+
 /*
  * calculate the mapping of a file extent onto an object, and fill out the
  * request accordingly.  shorten extent as necessary if it crosses an
@@ -336,18 +363,14 @@
 	dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
 	     req->r_request, req->r_reply);
 	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
-	WARN_ON(!list_empty(&req->r_req_lru_item));
-	WARN_ON(!list_empty(&req->r_osd_item));
 	WARN_ON(!list_empty(&req->r_linger_item));
 	WARN_ON(!list_empty(&req->r_linger_osd_item));
 	WARN_ON(req->r_osd);
 
 	if (req->r_request)
 		ceph_msg_put(req->r_request);
-	if (req->r_reply) {
-		ceph_msg_revoke_incoming(req->r_reply);
+	if (req->r_reply)
 		ceph_msg_put(req->r_reply);
-	}
 
 	for (which = 0; which < req->r_num_ops; which++)
 		osd_req_op_data_release(req, which);
@@ -418,8 +441,6 @@
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 	INIT_LIST_HEAD(&req->r_linger_item);
 	INIT_LIST_HEAD(&req->r_linger_osd_item);
-	INIT_LIST_HEAD(&req->r_req_lru_item);
-	INIT_LIST_HEAD(&req->r_osd_item);
 
 	target_init(&req->r_t);
 
@@ -869,141 +890,11 @@
 	return osd->o_osd == CEPH_HOMELESS_OSD;
 }
 
-static struct ceph_osd_request *
-__lookup_request_ge(struct ceph_osd_client *osdc,
-		    u64 tid)
+static bool osd_registered(struct ceph_osd *osd)
 {
-	struct ceph_osd_request *req;
-	struct rb_node *n = osdc->requests.rb_node;
+	verify_osdc_locked(osd->o_osdc);
 
-	while (n) {
-		req = rb_entry(n, struct ceph_osd_request, r_node);
-		if (tid < req->r_tid) {
-			if (!n->rb_left)
-				return req;
-			n = n->rb_left;
-		} else if (tid > req->r_tid) {
-			n = n->rb_right;
-		} else {
-			return req;
-		}
-	}
-	return NULL;
-}
-
-static void __kick_linger_request(struct ceph_osd_request *req)
-{
-	struct ceph_osd_client *osdc = req->r_osdc;
-	struct ceph_osd *osd = req->r_osd;
-
-	/*
-	 * Linger requests need to be resent with a new tid to avoid
-	 * the dup op detection logic on the OSDs.  Achieve this with
-	 * a re-register dance instead of open-coding.
-	 */
-	ceph_osdc_get_request(req);
-	if (!list_empty(&req->r_linger_item))
-		__unregister_linger_request(osdc, req);
-	else
-		__unregister_request(osdc, req);
-	__register_request(osdc, req);
-	ceph_osdc_put_request(req);
-
-	/*
-	 * Unless request has been registered as both normal and
-	 * lingering, __unregister{,_linger}_request clears r_osd.
-	 * However, here we need to preserve r_osd to make sure we
-	 * requeue on the same OSD.
-	 */
-	WARN_ON(req->r_osd || !osd);
-	req->r_osd = osd;
-
-	dout("%s requeueing %p tid %llu\n", __func__, req, req->r_tid);
-	__enqueue_request(req);
-}
-
-/*
- * Resubmit requests pending on the given osd.
- */
-static void __kick_osd_requests(struct ceph_osd_client *osdc,
-				struct ceph_osd *osd)
-{
-	struct ceph_osd_request *req, *nreq;
-	LIST_HEAD(resend);
-	LIST_HEAD(resend_linger);
-	int err;
-
-	dout("%s osd%d\n", __func__, osd->o_osd);
-	err = __reset_osd(osdc, osd);
-	if (err)
-		return;
-
-	/*
-	 * Build up a list of requests to resend by traversing the
-	 * osd's list of requests.  Requests for a given object are
-	 * sent in tid order, and that is also the order they're
-	 * kept on this list.  Therefore all requests that are in
-	 * flight will be found first, followed by all requests that
-	 * have not yet been sent.  And to resend requests while
-	 * preserving this order we will want to put any sent
-	 * requests back on the front of the osd client's unsent
-	 * list.
-	 *
-	 * So we build a separate ordered list of already-sent
-	 * requests for the affected osd and splice it onto the
-	 * front of the osd client's unsent list.  Once we've seen a
-	 * request that has not yet been sent we're done.  Those
-	 * requests are already sitting right where they belong.
-	 */
-	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
-		if (!req->r_sent)
-			break;
-
-		if (!req->r_linger) {
-			dout("%s requeueing %p tid %llu\n", __func__, req,
-			     req->r_tid);
-			list_move_tail(&req->r_req_lru_item, &resend);
-			req->r_flags |= CEPH_OSD_FLAG_RETRY;
-		} else {
-			list_move_tail(&req->r_req_lru_item, &resend_linger);
-		}
-	}
-	list_splice(&resend, &osdc->req_unsent);
-
-	/*
-	 * Both registered and not yet registered linger requests are
-	 * enqueued with a new tid on the same OSD.  We add/move them
-	 * to req_unsent/o_requests at the end to keep things in tid
-	 * order.
-	 */
-	list_for_each_entry_safe(req, nreq, &osd->o_linger_requests,
-				 r_linger_osd_item) {
-		WARN_ON(!list_empty(&req->r_req_lru_item));
-		__kick_linger_request(req);
-	}
-
-	list_for_each_entry_safe(req, nreq, &resend_linger, r_req_lru_item)
-		__kick_linger_request(req);
-}
-
-/*
- * If the osd connection drops, we need to resubmit all requests.
- */
-static void osd_reset(struct ceph_connection *con)
-{
-	struct ceph_osd *osd = con->private;
-	struct ceph_osd_client *osdc;
-
-	if (!osd)
-		return;
-	dout("osd_reset osd%d\n", osd->o_osd);
-	osdc = osd->o_osdc;
-	down_read(&osdc->map_sem);
-	mutex_lock(&osdc->request_mutex);
-	__kick_osd_requests(osdc, osd);
-	__send_queued(osdc);
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
+	return !RB_EMPTY_NODE(&osd->o_node);
 }
 
 /*
@@ -1013,17 +904,18 @@
 {
 	atomic_set(&osd->o_ref, 1);
 	RB_CLEAR_NODE(&osd->o_node);
-	INIT_LIST_HEAD(&osd->o_requests);
+	osd->o_requests = RB_ROOT;
 	INIT_LIST_HEAD(&osd->o_linger_requests);
 	INIT_LIST_HEAD(&osd->o_osd_lru);
 	INIT_LIST_HEAD(&osd->o_keepalive_item);
 	osd->o_incarnation = 1;
+	mutex_init(&osd->lock);
 }
 
 static void osd_cleanup(struct ceph_osd *osd)
 {
 	WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
-	WARN_ON(!list_empty(&osd->o_requests));
+	WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
 	WARN_ON(!list_empty(&osd->o_linger_requests));
 	WARN_ON(!list_empty(&osd->o_osd_lru));
 	WARN_ON(!list_empty(&osd->o_keepalive_item));
@@ -1077,30 +969,6 @@
 
 DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)
 
-/*
- * remove an osd from our map
- */
-static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
-{
-	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
-	WARN_ON(!list_empty(&osd->o_requests));
-	WARN_ON(!list_empty(&osd->o_linger_requests));
-
-	list_del_init(&osd->o_osd_lru);
-	erase_osd(&osdc->osds, osd);
-}
-
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
-{
-	dout("%s %p osd%d\n", __func__, osd, osd->o_osd);
-
-	if (!RB_EMPTY_NODE(&osd->o_node)) {
-		ceph_con_close(&osd->o_con);
-		__remove_osd(osdc, osd);
-		put_osd(osd);
-	}
-}
-
 static void __move_osd_to_lru(struct ceph_osd *osd)
 {
 	struct ceph_osd_client *osdc = osd->o_osdc;
@@ -1117,7 +985,7 @@
 
 static void maybe_move_osd_to_lru(struct ceph_osd *osd)
 {
-	if (list_empty(&osd->o_requests) &&
+	if (RB_EMPTY_ROOT(&osd->o_requests) &&
 	    list_empty(&osd->o_linger_requests))
 		__move_osd_to_lru(osd);
 }
@@ -1135,29 +1003,63 @@
 }
 
 /*
+ * Close the connection and assign any leftover requests to the
+ * homeless session.
+ */
+static void close_osd(struct ceph_osd *osd)
+{
+	struct ceph_osd_client *osdc = osd->o_osdc;
+	struct rb_node *n;
+
+	verify_osdc_wrlocked(osdc);
+	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+	ceph_con_close(&osd->o_con);
+
+	for (n = rb_first(&osd->o_requests); n; ) {
+		struct ceph_osd_request *req =
+		    rb_entry(n, struct ceph_osd_request, r_node);
+
+		n = rb_next(n); /* unlink_request() */
+
+		dout(" reassigning req %p tid %llu\n", req, req->r_tid);
+		unlink_request(osd, req);
+		link_request(&osdc->homeless_osd, req);
+	}
+
+	__remove_osd_from_lru(osd);
+	erase_osd(&osdc->osds, osd);
+	put_osd(osd);
+}
+
+/*
  * reset osd connect
  */
-static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int reopen_osd(struct ceph_osd *osd)
 {
 	struct ceph_entity_addr *peer_addr;
 
-	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
-	if (list_empty(&osd->o_requests) &&
+	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+	if (RB_EMPTY_ROOT(&osd->o_requests) &&
 	    list_empty(&osd->o_linger_requests)) {
-		remove_osd(osdc, osd);
+		close_osd(osd);
 		return -ENODEV;
 	}
 
-	peer_addr = &osdc->osdmap->osd_addr[osd->o_osd];
+	peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
 	if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
 			!ceph_con_opened(&osd->o_con)) {
-		struct ceph_osd_request *req;
+		struct rb_node *n;
 
 		dout("osd addr hasn't changed and connection never opened, "
 		     "letting msgr retry\n");
 		/* touch each r_stamp for handle_timeout()'s benfit */
-		list_for_each_entry(req, &osd->o_requests, r_osd_item)
+		for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+			struct ceph_osd_request *req =
+			    rb_entry(n, struct ceph_osd_request, r_node);
 			req->r_stamp = jiffies;
+		}
 
 		return -EAGAIN;
 	}
@@ -1169,73 +1071,84 @@
 	return 0;
 }
 
-/*
- * Register request, assign tid.  If this is the first request, set up
- * the timeout event.
- */
-static void __register_request(struct ceph_osd_client *osdc,
-			       struct ceph_osd_request *req)
+static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
+					  bool wrlocked)
 {
-	req->r_tid = ++osdc->last_tid;
-	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
-	dout("__register_request %p tid %lld\n", req, req->r_tid);
-	insert_request(&osdc->requests, req);
-	ceph_osdc_get_request(req);
-	osdc->num_requests++;
+	struct ceph_osd *osd;
+
+	if (wrlocked)
+		verify_osdc_wrlocked(osdc);
+	else
+		verify_osdc_locked(osdc);
+
+	if (o != CEPH_HOMELESS_OSD)
+		osd = lookup_osd(&osdc->osds, o);
+	else
+		osd = &osdc->homeless_osd;
+	if (!osd) {
+		if (!wrlocked)
+			return ERR_PTR(-EAGAIN);
+
+		osd = create_osd(osdc, o);
+		insert_osd(&osdc->osds, osd);
+		ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
+			      &osdc->osdmap->osd_addr[osd->o_osd]);
+	}
+
+	dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
+	return osd;
 }
 
 /*
- * called under osdc->request_mutex
+ * Create request <-> OSD session relation.
+ *
+ * @req has to be assigned a tid, @osd may be homeless.
  */
-static void __unregister_request(struct ceph_osd_client *osdc,
-				 struct ceph_osd_request *req)
+static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
 {
-	if (RB_EMPTY_NODE(&req->r_node)) {
-		dout("__unregister_request %p tid %lld not registered\n",
-			req, req->r_tid);
-		return;
-	}
+	verify_osd_locked(osd);
+	WARN_ON(!req->r_tid || req->r_osd);
+	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
+	     req, req->r_tid);
 
-	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
-	erase_request(&osdc->requests, req);
-	osdc->num_requests--;
+	if (!osd_homeless(osd))
+		__remove_osd_from_lru(osd);
+	else
+		atomic_inc(&osd->o_osdc->num_homeless);
 
-	if (req->r_osd) {
-		/* make sure the original request isn't in flight. */
-		ceph_msg_revoke(req->r_request);
-
-		list_del_init(&req->r_osd_item);
-		maybe_move_osd_to_lru(req->r_osd);
-		if (list_empty(&req->r_linger_osd_item))
-			req->r_osd = NULL;
-	}
-
-	list_del_init(&req->r_req_lru_item);
-	ceph_osdc_put_request(req);
+	get_osd(osd);
+	insert_request(&osd->o_requests, req);
+	req->r_osd = osd;
 }
 
-/*
- * Cancel a previously queued request message
- */
-static void __cancel_request(struct ceph_osd_request *req)
+static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
 {
-	if (req->r_sent && req->r_osd) {
-		ceph_msg_revoke(req->r_request);
-		req->r_sent = 0;
-	}
+	verify_osd_locked(osd);
+	WARN_ON(req->r_osd != osd);
+	dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
+	     req, req->r_tid);
+
+	req->r_osd = NULL;
+	erase_request(&osd->o_requests, req);
+	put_osd(osd);
+
+	if (!osd_homeless(osd))
+		maybe_move_osd_to_lru(osd);
+	else
+		atomic_dec(&osd->o_osdc->num_homeless);
 }
 
-static void __register_linger_request(struct ceph_osd_client *osdc,
+static void __register_linger_request(struct ceph_osd *osd,
 				    struct ceph_osd_request *req)
 {
 	dout("%s %p tid %llu\n", __func__, req, req->r_tid);
 	WARN_ON(!req->r_linger);
 
 	ceph_osdc_get_request(req);
-	list_add_tail(&req->r_linger_item, &osdc->req_linger);
-	if (req->r_osd)
-		list_add_tail(&req->r_linger_osd_item,
-			      &req->r_osd->o_linger_requests);
+	list_add_tail(&req->r_linger_item, &osd->o_osdc->req_linger);
+	list_add_tail(&req->r_linger_osd_item, &osd->o_linger_requests);
+	__remove_osd_from_lru(osd);
+	req->r_osd = osd;
 }
 
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
@@ -1255,7 +1168,7 @@
 	if (req->r_osd) {
 		list_del_init(&req->r_linger_osd_item);
 		maybe_move_osd_to_lru(req->r_osd);
-		if (list_empty(&req->r_osd_item))
+		if (RB_EMPTY_ROOT(&req->r_osd->o_requests))
 			req->r_osd = NULL;
 	}
 	ceph_osdc_put_request(req);
@@ -1291,11 +1204,20 @@
 	return false;
 }
 
+static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
+{
+	struct ceph_pg_pool_info *pi;
+
+	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
+	if (!pi)
+		return false;
+
+	return __pool_full(pi);
+}
+
 /*
  * Returns whether a request should be blocked from being sent
  * based on the current osdmap and osd_client settings.
- *
- * Caller should hold map_sem for read.
  */
 static bool target_should_be_paused(struct ceph_osd_client *osdc,
 				    const struct ceph_osd_request_target *t,
@@ -1421,87 +1343,6 @@
 	return ct_res;
 }
 
-static void __enqueue_request(struct ceph_osd_request *req)
-{
-	struct ceph_osd_client *osdc = req->r_osdc;
-
-	dout("%s %p tid %llu to osd%d\n", __func__, req, req->r_tid,
-	     req->r_osd ? req->r_osd->o_osd : -1);
-
-	if (req->r_osd) {
-		__remove_osd_from_lru(req->r_osd);
-		list_add_tail(&req->r_osd_item, &req->r_osd->o_requests);
-		list_move_tail(&req->r_req_lru_item, &osdc->req_unsent);
-	} else {
-		list_move_tail(&req->r_req_lru_item, &osdc->req_notarget);
-	}
-}
-
-/*
- * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
- * (as needed), and set the request r_osd appropriately.  If there is
- * no up osd, set r_osd to NULL.  Move the request to the appropriate list
- * (unsent, homeless) or leave on in-flight lru.
- *
- * Return 0 if unchanged, 1 if changed, or negative on error.
- *
- * Caller should hold map_sem for read and request_mutex.
- */
-static int __map_request(struct ceph_osd_client *osdc,
-			 struct ceph_osd_request *req, int force_resend)
-{
-	enum calc_target_result ct_res;
-	int err;
-
-	dout("map_request %p tid %lld\n", req, req->r_tid);
-
-	ct_res = calc_target(osdc, &req->r_t, NULL, force_resend);
-	switch (ct_res) {
-	case CALC_TARGET_POOL_DNE:
-		list_move(&req->r_req_lru_item, &osdc->req_notarget);
-		return -EIO;
-	case CALC_TARGET_NO_ACTION:
-		return 0;  /* no change */
-	default:
-		BUG_ON(ct_res != CALC_TARGET_NEED_RESEND);
-	}
-
-	dout("map_request tid %llu pgid %lld.%x osd%d (was osd%d)\n",
-	     req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed, req->r_t.osd,
-	     req->r_osd ? req->r_osd->o_osd : -1);
-
-	if (req->r_osd) {
-		__cancel_request(req);
-		list_del_init(&req->r_osd_item);
-		list_del_init(&req->r_linger_osd_item);
-		req->r_osd = NULL;
-	}
-
-	req->r_osd = lookup_osd(&osdc->osds, req->r_t.osd);
-	if (!req->r_osd && req->r_t.osd >= 0) {
-		err = -ENOMEM;
-		req->r_osd = create_osd(osdc, req->r_t.osd);
-		if (!req->r_osd) {
-			list_move(&req->r_req_lru_item, &osdc->req_notarget);
-			goto out;
-		}
-
-		dout("map_request osd %p is osd%d\n", req->r_osd,
-		     req->r_osd->o_osd);
-		insert_osd(&osdc->osds, req->r_osd);
-
-		ceph_con_open(&req->r_osd->o_con,
-			      CEPH_ENTITY_TYPE_OSD, req->r_osd->o_osd,
-			      &osdc->osdmap->osd_addr[req->r_osd->o_osd]);
-	}
-
-	__enqueue_request(req);
-	err = 1;   /* osd or pg changed */
-
-out:
-	return err;
-}
-
 static void setup_request_data(struct ceph_osd_request *req,
 			       struct ceph_msg *msg)
 {
@@ -1648,8 +1489,16 @@
 {
 	struct ceph_osd *osd = req->r_osd;
 
+	verify_osd_locked(osd);
 	WARN_ON(osd->o_osd != req->r_t.osd);
 
+	/*
+	 * We may have a previously queued request message hanging
+	 * around.  Cancel it to avoid corrupting the msgr.
+	 */
+	if (req->r_sent)
+		ceph_msg_revoke(req->r_request);
+
 	req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
 	if (req->r_attempts)
 		req->r_flags |= CEPH_OSD_FLAG_RETRY;
@@ -1671,24 +1520,11 @@
 	ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
 }
 
-/*
- * Send any requests in the queue (req_unsent).
- */
-static void __send_queued(struct ceph_osd_client *osdc)
-{
-	struct ceph_osd_request *req, *tmp;
-
-	dout("__send_queued\n");
-	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
-		list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
-		send_request(req);
-	}
-}
-
 static void maybe_request_map(struct ceph_osd_client *osdc)
 {
 	bool continuous = false;
 
+	verify_osdc_locked(osdc);
 	WARN_ON(!osdc->osdmap->epoch);
 
 	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
@@ -1705,38 +1541,121 @@
 		ceph_monc_renew_subs(&osdc->client->monc);
 }
 
-/*
- * Caller should hold map_sem for read and request_mutex.
- */
-static int __ceph_osdc_start_request(struct ceph_osd_client *osdc,
-				     struct ceph_osd_request *req,
-				     bool nofail)
+static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 {
-	int rc;
+	struct ceph_osd_client *osdc = req->r_osdc;
+	struct ceph_osd *osd;
+	bool need_send = false;
+	bool promoted = false;
 
-	__register_request(osdc, req);
-	req->r_sent = 0;
-	req->r_got_reply = 0;
-	rc = __map_request(osdc, req, 0);
-	if (rc < 0) {
-		if (nofail) {
-			dout("osdc_start_request failed map, "
-				" will retry %lld\n", req->r_tid);
-			rc = 0;
-		} else {
-			__unregister_request(osdc, req);
-		}
-		return rc;
+	WARN_ON(req->r_tid || req->r_got_reply);
+	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
+
+again:
+	calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
+	if (IS_ERR(osd)) {
+		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
+		goto promote;
 	}
 
-	if (req->r_osd == NULL) {
-		dout("send_request %p no up osds in pg\n", req);
-		ceph_monc_request_next_osdmap(&osdc->client->monc);
+	if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+	    ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
+		dout("req %p pausewr\n", req);
+		req->r_t.paused = true;
+		maybe_request_map(osdc);
+	} else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
+		   ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD)) {
+		dout("req %p pauserd\n", req);
+		req->r_t.paused = true;
+		maybe_request_map(osdc);
+	} else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
+		   !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
+				     CEPH_OSD_FLAG_FULL_FORCE)) &&
+		   (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+		    pool_full(osdc, req->r_t.base_oloc.pool))) {
+		dout("req %p full/pool_full\n", req);
+		pr_warn_ratelimited("FULL or reached pool quota\n");
+		req->r_t.paused = true;
+		maybe_request_map(osdc);
+	} else if (!osd_homeless(osd)) {
+		need_send = true;
 	} else {
-		__send_queued(osdc);
+		maybe_request_map(osdc);
 	}
 
-	return 0;
+	mutex_lock(&osd->lock);
+	/*
+	 * Assign the tid atomically with send_request() to protect
+	 * multiple writes to the same object from racing with each
+	 * other, resulting in out of order ops on the OSDs.
+	 */
+	req->r_tid = atomic64_inc_return(&osdc->last_tid);
+	link_request(osd, req);
+	if (need_send)
+		send_request(req);
+	mutex_unlock(&osd->lock);
+
+	if (promoted)
+		downgrade_write(&osdc->lock);
+	return;
+
+promote:
+	up_read(&osdc->lock);
+	down_write(&osdc->lock);
+	wrlocked = true;
+	promoted = true;
+	goto again;
+}
+
+static void account_request(struct ceph_osd_request *req)
+{
+	unsigned int mask = CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK;
+
+	if (req->r_flags & CEPH_OSD_FLAG_READ) {
+		WARN_ON(req->r_flags & mask);
+		req->r_flags |= CEPH_OSD_FLAG_ACK;
+	} else if (req->r_flags & CEPH_OSD_FLAG_WRITE)
+		WARN_ON(!(req->r_flags & mask));
+	else
+		WARN_ON(1);
+
+	WARN_ON(req->r_unsafe_callback && (req->r_flags & mask) != mask);
+	atomic_inc(&req->r_osdc->num_requests);
+}
+
+static void submit_request(struct ceph_osd_request *req, bool wrlocked)
+{
+	ceph_osdc_get_request(req);
+	account_request(req);
+	__submit_request(req, wrlocked);
+}
+
+static void __finish_request(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+	struct ceph_osd *osd = req->r_osd;
+
+	verify_osd_locked(osd);
+	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+
+	unlink_request(osd, req);
+	atomic_dec(&osdc->num_requests);
+
+	/*
+	 * If an OSD has failed or returned and a request has been sent
+	 * twice, it's possible to get a reply and end up here while the
+	 * request message is queued for delivery.  We will ignore the
+	 * reply, so not a big deal, but better to try and catch it.
+	 */
+	ceph_msg_revoke(req->r_request);
+	ceph_msg_revoke_incoming(req->r_reply);
+}
+
+static void finish_request(struct ceph_osd_request *req)
+{
+	__finish_request(req);
+	ceph_osdc_put_request(req);
 }
 
 static void __complete_request(struct ceph_osd_request *req)
@@ -1747,6 +1666,13 @@
 		complete_all(&req->r_completion);
 }
 
+static void cancel_request(struct ceph_osd_request *req)
+{
+	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+
+	finish_request(req);
+}
+
 /*
  * Timeout callback, called every N seconds.  When 1 or more OSD
  * requests has been active for more than N seconds, we send a keepalive
@@ -1758,44 +1684,49 @@
 	struct ceph_osd_client *osdc =
 		container_of(work, struct ceph_osd_client, timeout_work.work);
 	struct ceph_options *opts = osdc->client->options;
-	struct ceph_osd_request *req;
-	struct ceph_osd *osd;
-	struct list_head slow_osds;
-	dout("timeout\n");
-	down_read(&osdc->map_sem);
+	unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
+	LIST_HEAD(slow_osds);
+	struct rb_node *n, *p;
 
-	ceph_monc_request_next_osdmap(&osdc->client->monc);
-
-	mutex_lock(&osdc->request_mutex);
+	dout("%s osdc %p\n", __func__, osdc);
+	down_write(&osdc->lock);
 
 	/*
 	 * ping osds that are a bit slow.  this ensures that if there
 	 * is a break in the TCP connection we will notice, and reopen
 	 * a connection with that osd (from the fault callback).
 	 */
-	INIT_LIST_HEAD(&slow_osds);
-	list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
-		if (time_before(jiffies,
-				req->r_stamp + opts->osd_keepalive_timeout))
-			break;
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+		bool found = false;
 
-		osd = req->r_osd;
-		BUG_ON(!osd);
-		dout(" tid %llu is slow, will send keepalive on osd%d\n",
-		     req->r_tid, osd->o_osd);
-		list_move_tail(&osd->o_keepalive_item, &slow_osds);
+		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
+			struct ceph_osd_request *req =
+			    rb_entry(p, struct ceph_osd_request, r_node);
+
+			if (time_before(req->r_stamp, cutoff)) {
+				dout(" req %p tid %llu on osd%d is laggy\n",
+				     req, req->r_tid, osd->o_osd);
+				found = true;
+			}
+		}
+
+		if (found)
+			list_move_tail(&osd->o_keepalive_item, &slow_osds);
 	}
+
+	if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
+		maybe_request_map(osdc);
+
 	while (!list_empty(&slow_osds)) {
-		osd = list_entry(slow_osds.next, struct ceph_osd,
-				 o_keepalive_item);
+		struct ceph_osd *osd = list_first_entry(&slow_osds,
+							struct ceph_osd,
+							o_keepalive_item);
 		list_del_init(&osd->o_keepalive_item);
 		ceph_con_keepalive(&osd->o_con);
 	}
 
-	__send_queued(osdc);
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
-
+	up_write(&osdc->lock);
 	schedule_delayed_work(&osdc->timeout_work,
 			      osdc->client->options->osd_keepalive_timeout);
 }
@@ -1809,18 +1740,17 @@
 	struct ceph_osd *osd, *nosd;
 
 	dout("%s osdc %p\n", __func__, osdc);
-	down_read(&osdc->map_sem);
-	mutex_lock(&osdc->request_mutex);
-
+	down_write(&osdc->lock);
 	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
 		if (time_before(jiffies, osd->lru_ttl))
 			break;
 
-		remove_osd(osdc, osd);
+		WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
+		WARN_ON(!list_empty(&osd->o_linger_requests));
+		close_osd(osd);
 	}
 
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
+	up_write(&osdc->lock);
 	schedule_delayed_work(&osdc->osds_timeout_work,
 			      round_jiffies_relative(delay));
 }
@@ -2045,8 +1975,9 @@
  * when we get the safe reply	r_unsafe_cb(false),	r_cb/r_completion,
  *				r_safe_completion	r_safe_completion
  */
-static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg)
+static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
 {
+	struct ceph_osd_client *osdc = osd->o_osdc;
 	struct ceph_osd_request *req;
 	struct MOSDOpReply m;
 	u64 tid = le64_to_cpu(msg->hdr.tid);
@@ -2057,14 +1988,19 @@
 
 	dout("%s msg %p tid %llu\n", __func__, msg, tid);
 
-	down_read(&osdc->map_sem);
-	mutex_lock(&osdc->request_mutex);
-	req = lookup_request(&osdc->requests, tid);
-	if (!req) {
-		dout("%s no tid %llu\n", __func__, tid);
-		goto out_unlock;
+	down_read(&osdc->lock);
+	if (!osd_registered(osd)) {
+		dout("%s osd%d unknown\n", __func__, osd->o_osd);
+		goto out_unlock_osdc;
 	}
-	ceph_osdc_get_request(req);
+	WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
+
+	mutex_lock(&osd->lock);
+	req = lookup_request(&osd->o_requests, tid);
+	if (!req) {
+		dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
+		goto out_unlock_session;
+	}
 
 	ret = decode_MOSDOpReply(msg, &m);
 	if (ret) {
@@ -2083,7 +2019,7 @@
 			dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
 			     req, req->r_tid, m.retry_attempt,
 			     req->r_attempts - 1);
-			goto out_put;
+			goto out_unlock_session;
 		}
 	} else {
 		WARN_ON(1); /* MOSDOpReply v4 is assumed */
@@ -2092,22 +2028,14 @@
 	if (!ceph_oloc_empty(&m.redirect.oloc)) {
 		dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
 		     m.redirect.oloc.pool);
-		__unregister_request(osdc, req);
+		unlink_request(osd, req);
+		mutex_unlock(&osd->lock);
 
 		ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
-
-		/*
-		 * Start redirect requests with nofail=true.  If
-		 * mapping fails, request will end up on the notarget
-		 * list, waiting for the new osdmap (which can take
-		 * a while), even though the original request mapped
-		 * successfully.  In the future we might want to follow
-		 * original request's nofail setting here.
-		 */
-		ret = __ceph_osdc_start_request(osdc, req, true);
-		BUG_ON(ret);
-
-		goto out_put;
+		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
+		req->r_tid = 0;
+		__submit_request(req, false);
+		goto out_unlock_osdc;
 	}
 
 	if (m.num_ops != req->r_num_ops) {
@@ -2137,19 +2065,19 @@
 		req->r_got_reply = true;
 	} else if (!(m.flags & CEPH_OSD_FLAG_ONDISK)) {
 		dout("req %p tid %llu dup ack\n", req, req->r_tid);
-		goto out_put;
+		goto out_unlock_session;
 	}
 
 	if (done_request(req, &m)) {
-		__unregister_request(osdc, req);
+		__finish_request(req);
 		if (req->r_linger) {
 			WARN_ON(req->r_unsafe_callback);
-			__register_linger_request(osdc, req);
+			__register_linger_request(osd, req);
 		}
 	}
 
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
+	mutex_unlock(&osd->lock);
+	up_read(&osdc->lock);
 
 	if (done_request(req, &m)) {
 		if (already_acked && req->r_unsafe_callback) {
@@ -2175,14 +2103,13 @@
 
 fail_request:
 	req->r_result = -EIO;
-	__unregister_request(osdc, req);
+	__finish_request(req);
 	__complete_request(req);
 	complete_all(&req->r_safe_completion);
-out_put:
-	ceph_osdc_put_request(req);
-out_unlock:
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
+out_unlock_session:
+	mutex_unlock(&osd->lock);
+out_unlock_osdc:
+	up_read(&osdc->lock);
 }
 
 static void set_pool_was_full(struct ceph_osd_client *osdc)
@@ -2197,126 +2124,66 @@
 	}
 }
 
-static void reset_changed_osds(struct ceph_osd_client *osdc)
+static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
 {
-	struct rb_node *p, *n;
+	struct ceph_pg_pool_info *pi;
 
-	dout("%s %p\n", __func__, osdc);
-	for (p = rb_first(&osdc->osds); p; p = n) {
-		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
+	pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
+	if (!pi)
+		return false;
 
-		n = rb_next(p);
-		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
-		    memcmp(&osd->o_con.peer_addr,
-			   ceph_osd_addr(osdc->osdmap,
-					 osd->o_osd),
-			   sizeof(struct ceph_entity_addr)) != 0)
-			__reset_osd(osdc, osd);
-	}
+	return pi->was_full && !__pool_full(pi);
 }
 
 /*
- * Requeue requests whose mapping to an OSD has changed.  If requests map to
- * no osd, request a new map.
- *
- * Caller should hold map_sem for read.
+ * Move requests that must be resent (remapped or forced) to @need_resend.
  */
-static void kick_requests(struct ceph_osd_client *osdc, bool force_resend,
-			  bool force_resend_writes)
+static void scan_requests(struct ceph_osd *osd,
+			  bool force_resend,
+			  bool cleared_full,
+			  bool check_pool_cleared_full,
+			  struct rb_root *need_resend,
+			  struct list_head *need_resend_linger)
 {
-	struct ceph_osd_request *req, *nreq;
-	struct rb_node *p;
-	int needmap = 0;
-	int err;
-	bool force_resend_req;
+	struct ceph_osd_client *osdc = osd->o_osdc;
+	struct rb_node *n;
+	bool force_resend_writes;
 
-	dout("kick_requests %s %s\n", force_resend ? " (force resend)" : "",
-		force_resend_writes ? " (force resend writes)" : "");
-	mutex_lock(&osdc->request_mutex);
-	for (p = rb_first(&osdc->requests); p; ) {
-		req = rb_entry(p, struct ceph_osd_request, r_node);
-		p = rb_next(p);
+	for (n = rb_first(&osd->o_requests); n; ) {
+		struct ceph_osd_request *req =
+		    rb_entry(n, struct ceph_osd_request, r_node);
+		enum calc_target_result ct_res;
 
-		/*
-		 * For linger requests that have not yet been
-		 * registered, move them to the linger list; they'll
-		 * be sent to the osd in the loop below.  Unregister
-		 * the request before re-registering it as a linger
-		 * request to ensure the __map_request() below
-		 * will decide it needs to be sent.
-		 */
-		if (req->r_linger && list_empty(&req->r_linger_item)) {
-			dout("%p tid %llu restart on osd%d\n",
-			     req, req->r_tid,
-			     req->r_osd ? req->r_osd->o_osd : -1);
-			ceph_osdc_get_request(req);
-			__unregister_request(osdc, req);
-			__register_linger_request(osdc, req);
-			ceph_osdc_put_request(req);
-			continue;
+		n = rb_next(n); /* advance before unlink_request() */
+
+		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
+		ct_res = calc_target(osdc, &req->r_t,
+				     &req->r_last_force_resend, false);
+		switch (ct_res) {
+		case CALC_TARGET_NO_ACTION:
+			force_resend_writes = cleared_full ||
+			    (check_pool_cleared_full &&
+			     pool_cleared_full(osdc, req->r_t.base_oloc.pool));
+			if (!force_resend &&
+			    (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
+			     !force_resend_writes))
+				break;
+
+			/* fall through */
+		case CALC_TARGET_NEED_RESEND:
+			unlink_request(osd, req);
+			insert_request(need_resend, req);
+			break;
+		case CALC_TARGET_POOL_DNE:
+			break;
 		}
-
-		force_resend_req = force_resend ||
-			(force_resend_writes &&
-				req->r_flags & CEPH_OSD_FLAG_WRITE);
-		err = __map_request(osdc, req, force_resend_req);
-		if (err < 0)
-			continue;  /* error */
-		if (req->r_osd == NULL) {
-			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
-			needmap++;  /* request a newer map */
-		} else if (err > 0) {
-			if (!req->r_linger) {
-				dout("%p tid %llu requeued on osd%d\n", req,
-				     req->r_tid,
-				     req->r_osd ? req->r_osd->o_osd : -1);
-				req->r_flags |= CEPH_OSD_FLAG_RETRY;
-			}
-		}
-	}
-
-	list_for_each_entry_safe(req, nreq, &osdc->req_linger,
-				 r_linger_item) {
-		dout("linger req=%p req->r_osd=%p\n", req, req->r_osd);
-
-		err = __map_request(osdc, req,
-				    force_resend || force_resend_writes);
-		dout("__map_request returned %d\n", err);
-		if (err < 0)
-			continue;  /* hrm! */
-		if (req->r_osd == NULL || err > 0) {
-			if (req->r_osd == NULL) {
-				dout("lingering %p tid %llu maps to no osd\n",
-				     req, req->r_tid);
-				/*
-				 * A homeless lingering request makes
-				 * no sense, as it's job is to keep
-				 * a particular OSD connection open.
-				 * Request a newer map and kick the
-				 * request, knowing that it won't be
-				 * resent until we actually get a map
-				 * that can tell us where to send it.
-				 */
-				needmap++;
-			}
-
-			dout("kicking lingering %p tid %llu osd%d\n", req,
-			     req->r_tid, req->r_osd ? req->r_osd->o_osd : -1);
-			__register_request(osdc, req);
-			__unregister_linger_request(osdc, req);
-		}
-	}
-	reset_changed_osds(osdc);
-	mutex_unlock(&osdc->request_mutex);
-
-	if (needmap) {
-		dout("%d requests for down osds, need new map\n", needmap);
-		ceph_monc_request_next_osdmap(&osdc->client->monc);
 	}
 }
 
 static int handle_one_map(struct ceph_osd_client *osdc,
-			  void *p, void *end, bool incremental)
+			  void *p, void *end, bool incremental,
+			  struct rb_root *need_resend,
+			  struct list_head *need_resend_linger)
 {
 	struct ceph_osdmap *newmap;
 	struct rb_node *n;
@@ -2362,11 +2229,51 @@
 	}
 
 	was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
-	kick_requests(osdc, skipped_map, was_full);
+	scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
+		      need_resend, need_resend_linger);
+
+	for (n = rb_first(&osdc->osds); n; ) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
+
+		n = rb_next(n); /* advance before close_osd() */
+
+		scan_requests(osd, skipped_map, was_full, true, need_resend,
+			      need_resend_linger);
+		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
+		    memcmp(&osd->o_con.peer_addr,
+			   ceph_osd_addr(osdc->osdmap, osd->o_osd),
+			   sizeof(struct ceph_entity_addr)))
+			close_osd(osd);
+	}
 
 	return 0;
 }
 
+static void kick_requests(struct ceph_osd_client *osdc,
+			  struct rb_root *need_resend,
+			  struct list_head *need_resend_linger)
+{
+	struct rb_node *n;
+
+	for (n = rb_first(need_resend); n; ) {
+		struct ceph_osd_request *req =
+		    rb_entry(n, struct ceph_osd_request, r_node);
+		struct ceph_osd *osd;
+
+		n = rb_next(n);
+		erase_request(need_resend, req); /* before link_request() */
+
+		WARN_ON(req->r_osd);
+		calc_target(osdc, &req->r_t, NULL, false);
+		osd = lookup_create_osd(osdc, req->r_t.osd, true);
+		link_request(osd, req);
+		if (!req->r_linger) {
+			if (!osd_homeless(osd) && !req->r_t.paused)
+				send_request(req);
+		}
+	}
+}
+
 /*
  * Process updated osd map.
  *
@@ -2381,13 +2288,15 @@
 	u32 nr_maps, maplen;
 	u32 epoch;
 	struct ceph_fsid fsid;
+	struct rb_root need_resend = RB_ROOT;
+	LIST_HEAD(need_resend_linger);
 	bool handled_incremental = false;
 	bool was_pauserd, was_pausewr;
 	bool pauserd, pausewr;
 	int err;
 
 	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
-	down_write(&osdc->map_sem);
+	down_write(&osdc->lock);
 
 	/* verify fsid */
 	ceph_decode_need(&p, end, sizeof(fsid), bad);
@@ -2412,7 +2321,8 @@
 		    osdc->osdmap->epoch + 1 == epoch) {
 			dout("applying incremental map %u len %d\n",
 			     epoch, maplen);
-			err = handle_one_map(osdc, p, p + maplen, true);
+			err = handle_one_map(osdc, p, p + maplen, true,
+					     &need_resend, &need_resend_linger);
 			if (err)
 				goto bad;
 			handled_incremental = true;
@@ -2443,7 +2353,8 @@
 			     osdc->osdmap->epoch);
 		} else {
 			dout("taking full map %u len %d\n", epoch, maplen);
-			err = handle_one_map(osdc, p, p + maplen, false);
+			err = handle_one_map(osdc, p, p + maplen, false,
+					     &need_resend, &need_resend_linger);
 			if (err)
 				goto bad;
 		}
@@ -2464,20 +2375,60 @@
 	if (was_pauserd || was_pausewr || pauserd || pausewr)
 		maybe_request_map(osdc);
 
-	mutex_lock(&osdc->request_mutex);
-	__send_queued(osdc);
-	mutex_unlock(&osdc->request_mutex);
+	kick_requests(osdc, &need_resend, &need_resend_linger);
 
 	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
 			  osdc->osdmap->epoch);
-	up_write(&osdc->map_sem);
+	up_write(&osdc->lock);
 	wake_up_all(&osdc->client->auth_wq);
 	return;
 
 bad:
 	pr_err("osdc handle_map corrupt msg\n");
 	ceph_msg_dump(msg);
-	up_write(&osdc->map_sem);
+	up_write(&osdc->lock);
+}
+
+/*
+ * Resend all non-lingering, unpaused requests pending on the given osd.
+ */
+static void kick_osd_requests(struct ceph_osd *osd)
+{
+	struct rb_node *n;
+
+	for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
+		struct ceph_osd_request *req =
+		    rb_entry(n, struct ceph_osd_request, r_node);
+
+		if (!req->r_linger) {
+			if (!req->r_t.paused)
+				send_request(req);
+		}
+	}
+}
+
+/*
+ * If the osd connection drops, we need to resubmit that osd's requests.
+ */
+static void osd_fault(struct ceph_connection *con)
+{
+	struct ceph_osd *osd = con->private;
+	struct ceph_osd_client *osdc = osd->o_osdc;
+
+	dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
+
+	down_write(&osdc->lock);
+	if (!osd_registered(osd)) {
+		dout("%s osd%d unknown\n", __func__, osd->o_osd);
+		goto out_unlock;
+	}
+
+	if (!reopen_osd(osd))
+		kick_osd_requests(osd);
+	maybe_request_map(osdc);
+
+out_unlock:
+	up_write(&osdc->lock);
 }
 
 /*
@@ -2680,17 +2631,11 @@
 			    struct ceph_osd_request *req,
 			    bool nofail)
 {
-	int rc;
+	down_read(&osdc->lock);
+	submit_request(req, false);
+	up_read(&osdc->lock);
 
-	down_read(&osdc->map_sem);
-	mutex_lock(&osdc->request_mutex);
-
-	rc = __ceph_osdc_start_request(osdc, req, nofail);
-
-	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
-
-	return rc;
+	return 0;
 }
 EXPORT_SYMBOL(ceph_osdc_start_request);
 
@@ -2703,13 +2648,12 @@
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
 
-	mutex_lock(&osdc->request_mutex);
+	down_write(&osdc->lock);
 	if (req->r_linger)
 		__unregister_linger_request(osdc, req);
-	__unregister_request(osdc, req);
-	mutex_unlock(&osdc->request_mutex);
-
-	dout("%s %p tid %llu canceled\n", __func__, req, req->r_tid);
+	if (req->r_osd)
+		cancel_request(req);
+	up_write(&osdc->lock);
 }
 EXPORT_SYMBOL(ceph_osdc_cancel_request);
 
@@ -2744,32 +2688,40 @@
  */
 void ceph_osdc_sync(struct ceph_osd_client *osdc)
 {
-	struct ceph_osd_request *req;
-	u64 last_tid, next_tid = 0;
+	struct rb_node *n, *p;
+	u64 last_tid = atomic64_read(&osdc->last_tid);
 
-	mutex_lock(&osdc->request_mutex);
-	last_tid = osdc->last_tid;
-	while (1) {
-		req = __lookup_request_ge(osdc, next_tid);
-		if (!req)
-			break;
-		if (req->r_tid > last_tid)
-			break;
+again:
+	down_read(&osdc->lock);
+	for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
+		struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
 
-		next_tid = req->r_tid + 1;
-		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
-			continue;
+		mutex_lock(&osd->lock);
+		for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
+			struct ceph_osd_request *req =
+			    rb_entry(p, struct ceph_osd_request, r_node);
 
-		ceph_osdc_get_request(req);
-		mutex_unlock(&osdc->request_mutex);
-		dout("sync waiting on tid %llu (last is %llu)\n",
-		     req->r_tid, last_tid);
-		wait_for_completion(&req->r_safe_completion);
-		mutex_lock(&osdc->request_mutex);
-		ceph_osdc_put_request(req);
+			if (req->r_tid > last_tid)
+				break;
+
+			if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
+				continue;
+
+			ceph_osdc_get_request(req);
+			mutex_unlock(&osd->lock);
+			up_read(&osdc->lock);
+			dout("%s waiting on req %p tid %llu last_tid %llu\n",
+			     __func__, req, req->r_tid, last_tid);
+			wait_for_completion(&req->r_safe_completion);
+			ceph_osdc_put_request(req);
+			goto again;
+		}
+
+		mutex_unlock(&osd->lock);
 	}
-	mutex_unlock(&osdc->request_mutex);
-	dout("sync done (thru tid %llu)\n", last_tid);
+
+	up_read(&osdc->lock);
+	dout("%s done last_tid %llu\n", __func__, last_tid);
 }
 EXPORT_SYMBOL(ceph_osdc_sync);
 
@@ -2793,18 +2745,14 @@
 
 	dout("init\n");
 	osdc->client = client;
-	init_rwsem(&osdc->map_sem);
-	mutex_init(&osdc->request_mutex);
-	osdc->last_tid = 0;
+	init_rwsem(&osdc->lock);
 	osdc->osds = RB_ROOT;
 	INIT_LIST_HEAD(&osdc->osd_lru);
 	spin_lock_init(&osdc->osd_lru_lock);
-	osdc->requests = RB_ROOT;
-	INIT_LIST_HEAD(&osdc->req_lru);
-	INIT_LIST_HEAD(&osdc->req_unsent);
-	INIT_LIST_HEAD(&osdc->req_notarget);
 	INIT_LIST_HEAD(&osdc->req_linger);
-	osdc->num_requests = 0;
+	osd_init(&osdc->homeless_osd);
+	osdc->homeless_osd.o_osdc = osdc;
+	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
 	spin_lock_init(&osdc->event_lock);
@@ -2861,13 +2809,19 @@
 	cancel_delayed_work_sync(&osdc->timeout_work);
 	cancel_delayed_work_sync(&osdc->osds_timeout_work);
 
-	mutex_lock(&osdc->request_mutex);
+	down_write(&osdc->lock);
 	while (!RB_EMPTY_ROOT(&osdc->osds)) {
 		struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
 						struct ceph_osd, o_node);
-		remove_osd(osdc, osd);
+		close_osd(osd);
 	}
-	mutex_unlock(&osdc->request_mutex);
+	up_write(&osdc->lock);
+	WARN_ON(atomic_read(&osdc->homeless_osd.o_ref) != 1);
+	osd_cleanup(&osdc->homeless_osd);
+
+	WARN_ON(!list_empty(&osdc->osd_lru));
+	WARN_ON(atomic_read(&osdc->num_requests));
+	WARN_ON(atomic_read(&osdc->num_homeless));
 
 	ceph_osdmap_destroy(osdc->osdmap);
 	mempool_destroy(osdc->req_mempool);
@@ -2982,19 +2936,15 @@
 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 {
 	struct ceph_osd *osd = con->private;
-	struct ceph_osd_client *osdc;
+	struct ceph_osd_client *osdc = osd->o_osdc;
 	int type = le16_to_cpu(msg->hdr.type);
 
-	if (!osd)
-		goto out;
-	osdc = osd->o_osdc;
-
 	switch (type) {
 	case CEPH_MSG_OSD_MAP:
 		ceph_osdc_handle_map(osdc, msg);
 		break;
 	case CEPH_MSG_OSD_OPREPLY:
-		handle_reply(osdc, msg);
+		handle_reply(osd, msg);
 		break;
 	case CEPH_MSG_WATCH_NOTIFY:
 		handle_watch_notify(osdc, msg);
@@ -3004,7 +2954,7 @@
 		pr_err("received unknown message type %d %s\n", type,
 		       ceph_msg_type_name(type));
 	}
-out:
+
 	ceph_msg_put(msg);
 }
 
@@ -3019,21 +2969,27 @@
 {
 	struct ceph_osd *osd = con->private;
 	struct ceph_osd_client *osdc = osd->o_osdc;
-	struct ceph_msg *m;
+	struct ceph_msg *m = NULL;
 	struct ceph_osd_request *req;
 	int front_len = le32_to_cpu(hdr->front_len);
 	int data_len = le32_to_cpu(hdr->data_len);
-	u64 tid;
+	u64 tid = le64_to_cpu(hdr->tid);
 
-	tid = le64_to_cpu(hdr->tid);
-	mutex_lock(&osdc->request_mutex);
-	req = lookup_request(&osdc->requests, tid);
+	down_read(&osdc->lock);
+	if (!osd_registered(osd)) {
+		dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
+		*skip = 1;
+		goto out_unlock_osdc;
+	}
+	WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
+
+	mutex_lock(&osd->lock);
+	req = lookup_request(&osd->o_requests, tid);
 	if (!req) {
 		dout("%s osd%d tid %llu unknown, skipping\n", __func__,
 		     osd->o_osd, tid);
-		m = NULL;
 		*skip = 1;
-		goto out;
+		goto out_unlock_session;
 	}
 
 	ceph_msg_revoke_incoming(req->r_reply);
@@ -3045,7 +3001,7 @@
 		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
 				 false);
 		if (!m)
-			goto out;
+			goto out_unlock_session;
 		ceph_msg_put(req->r_reply);
 		req->r_reply = m;
 	}
@@ -3056,14 +3012,16 @@
 			req->r_reply->data_length);
 		m = NULL;
 		*skip = 1;
-		goto out;
+		goto out_unlock_session;
 	}
 
 	m = ceph_msg_get(req->r_reply);
 	dout("get_reply tid %lld %p\n", tid, m);
 
-out:
-	mutex_unlock(&osdc->request_mutex);
+out_unlock_session:
+	mutex_unlock(&osd->lock);
+out_unlock_osdc:
+	up_read(&osdc->lock);
 	return m;
 }
 
@@ -3083,8 +3041,8 @@
 	case CEPH_MSG_OSD_OPREPLY:
 		return get_reply(con, hdr, skip);
 	default:
-		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
-			osd->o_osd);
+		pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
+			osd->o_osd, type);
 		*skip = 1;
 		return NULL;
 	}
@@ -3188,5 +3146,5 @@
 	.alloc_msg = alloc_msg,
 	.sign_message = osd_sign_message,
 	.check_message_signature = osd_check_message_signature,
-	.fault = osd_reset,
+	.fault = osd_fault,
 };