libceph: fix osd request queuing on osdmap updates

If we send a request to osd A, and the request's pg remaps to osd B and
then back to A in quick succession, we need to resend the request to A. The
old code only called kick_requests after processing all incremental maps in
a message, so it could easily fail to resend a request that needed to be
resent.  This would make the osd eventually time out (at least with the
current default of osd timeouts enabled).

The correct approach is to scan requests on every map incremental.  This
patch refactors the kick code in a few ways:
 - all requests are either on req_lru (in flight), req_unsent (ready to
   send), or req_notarget (currently mapping to no up osd)
 - mapping is always done by __map_request (previously __map_osds)
 - if the mapping changes, we requeue.  requests are resent only after all
   map incrementals are processed.
 - some osd reset code is moved out of kick_requests into a separate
   function
 - the "kick this osd" functionality is moved to kick_osd_requests, as it
   is unrelated to scanning for request->pg->osd mapping changes

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3e20a12..b85ed5a 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -22,10 +22,9 @@
 #define OSD_OPREPLY_FRONT_LEN	512
 
 static const struct ceph_connection_operations osd_con_ops;
-static int __kick_requests(struct ceph_osd_client *osdc,
-			  struct ceph_osd *kickosd);
 
-static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
+static void send_queued(struct ceph_osd_client *osdc);
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd);
 
 static int op_needs_trail(int op)
 {
@@ -529,6 +528,35 @@
 	return NULL;
 }
 
+/*
+ * Requeue (on req_unsent) the requests pending on the given osd.
+ */
+static void __kick_osd_requests(struct ceph_osd_client *osdc,
+				struct ceph_osd *osd)
+{
+	struct ceph_osd_request *req;
+	int err;
+
+	dout("__kick_osd_requests osd%d\n", osd->o_osd);
+	err = __reset_osd(osdc, osd);
+	if (err == -EAGAIN)
+		return;
+
+	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
+		list_move(&req->r_req_lru_item, &osdc->req_unsent);
+		dout("requeued %p tid %llu osd%d\n", req, req->r_tid,
+		     osd->o_osd);
+		req->r_flags |= CEPH_OSD_FLAG_RETRY;
+	}
+}
+
+static void kick_osd_requests(struct ceph_osd_client *osdc,
+			      struct ceph_osd *kickosd)
+{
+	mutex_lock(&osdc->request_mutex);
+	__kick_osd_requests(osdc, kickosd);
+	mutex_unlock(&osdc->request_mutex);
+}
 
 /*
  * If the osd connection drops, we need to resubmit all requests.
@@ -543,7 +571,8 @@
 	dout("osd_reset osd%d\n", osd->o_osd);
 	osdc = osd->o_osdc;
 	down_read(&osdc->map_sem);
-	kick_requests(osdc, osd);
+	kick_osd_requests(osdc, osd);
+	send_queued(osdc);
 	up_read(&osdc->map_sem);
 }
 
@@ -781,20 +810,20 @@
 		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
 		req->r_sent = 0;
 	}
-	list_del_init(&req->r_req_lru_item);
 }
 
 /*
  * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
  * (as needed), and set the request r_osd appropriately.  If there is
- * no up osd, set r_osd to NULL.
+ * no up osd, set r_osd to NULL.  Move the request to the appropriate list
+ * (unsent, notarget) or leave on in-flight lru.
  *
  * Return 0 if unchanged, 1 if changed, or negative on error.
  *
  * Caller should hold map_sem for read and request_mutex.
  */
-static int __map_osds(struct ceph_osd_client *osdc,
-		      struct ceph_osd_request *req)
+static int __map_request(struct ceph_osd_client *osdc,
+			 struct ceph_osd_request *req)
 {
 	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
 	struct ceph_pg pgid;
@@ -802,11 +831,13 @@
 	int o = -1, num = 0;
 	int err;
 
-	dout("map_osds %p tid %lld\n", req, req->r_tid);
+	dout("map_request %p tid %lld\n", req, req->r_tid);
 	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
 				      &req->r_file_layout, osdc->osdmap);
-	if (err)
+	if (err) {
+		list_move(&req->r_req_lru_item, &osdc->req_notarget);
 		return err;
+	}
 	pgid = reqhead->layout.ol_pgid;
 	req->r_pgid = pgid;
 
@@ -823,7 +854,7 @@
 	    (req->r_osd == NULL && o == -1))
 		return 0;  /* no change */
 
-	dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
+	dout("map_request tid %llu pgid %d.%x osd%d (was osd%d)\n",
 	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
 	     req->r_osd ? req->r_osd->o_osd : -1);
 
@@ -841,10 +872,12 @@
 	if (!req->r_osd && o >= 0) {
 		err = -ENOMEM;
 		req->r_osd = create_osd(osdc);
-		if (!req->r_osd)
+		if (!req->r_osd) {
+			list_move(&req->r_req_lru_item, &osdc->req_notarget);
 			goto out;
+		}
 
-		dout("map_osds osd %p is osd%d\n", req->r_osd, o);
+		dout("map_request osd %p is osd%d\n", req->r_osd, o);
 		req->r_osd->o_osd = o;
 		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
 		__insert_osd(osdc, req->r_osd);
@@ -855,6 +888,9 @@
 	if (req->r_osd) {
 		__remove_osd_from_lru(req->r_osd);
 		list_add(&req->r_osd_item, &req->r_osd->o_requests);
+		list_move(&req->r_req_lru_item, &osdc->req_unsent);
+	} else {
+		list_move(&req->r_req_lru_item, &osdc->req_notarget);
 	}
 	err = 1;   /* osd or pg changed */
 
@@ -869,16 +905,6 @@
 			  struct ceph_osd_request *req)
 {
 	struct ceph_osd_request_head *reqhead;
-	int err;
-
-	err = __map_osds(osdc, req);
-	if (err < 0)
-		return err;
-	if (req->r_osd == NULL) {
-		dout("send_request %p no up osds in pg\n", req);
-		ceph_monc_request_next_osdmap(&osdc->client->monc);
-		return 0;
-	}
 
 	dout("send_request %p tid %llu to osd%d flags %d\n",
 	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);
@@ -898,6 +924,21 @@
 }
 
 /*
+ * Send any requests in the queue (req_unsent).
+ */
+static void send_queued(struct ceph_osd_client *osdc)
+{
+	struct ceph_osd_request *req, *tmp;
+
+	dout("send_queued\n");
+	mutex_lock(&osdc->request_mutex);
+	list_for_each_entry_safe(req, tmp, &osdc->req_unsent, r_req_lru_item) {
+		__send_request(osdc, req);
+	}
+	mutex_unlock(&osdc->request_mutex);
+}
+
+/*
  * Timeout callback, called every N seconds when 1 or more osd
  * requests has been active for more than N seconds.  When this
  * happens, we ping all OSDs with requests who have timed out to
@@ -916,7 +957,6 @@
 	unsigned long keepalive =
 		osdc->client->options->osd_keepalive_timeout * HZ;
 	unsigned long last_stamp = 0;
-	struct rb_node *p;
 	struct list_head slow_osds;
 
 	dout("timeout\n");
@@ -925,21 +965,6 @@
 	ceph_monc_request_next_osdmap(&osdc->client->monc);
 
 	mutex_lock(&osdc->request_mutex);
-	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
-		req = rb_entry(p, struct ceph_osd_request, r_node);
-
-		if (req->r_resend) {
-			int err;
-
-			dout("osdc resending prev failed %lld\n", req->r_tid);
-			err = __send_request(osdc, req);
-			if (err)
-				dout("osdc failed again on %lld\n", req->r_tid);
-			else
-				req->r_resend = false;
-			continue;
-		}
-	}
 
 	/*
 	 * reset osds that appear to be _really_ unresponsive.  this
@@ -963,7 +988,7 @@
 		BUG_ON(!osd);
 		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
 			   req->r_tid, osd->o_osd);
-		__kick_requests(osdc, osd);
+		__kick_osd_requests(osdc, osd);
 	}
 
 	/*
@@ -991,7 +1016,7 @@
 
 	__schedule_osd_timeout(osdc);
 	mutex_unlock(&osdc->request_mutex);
-
+	send_queued(osdc);
 	up_read(&osdc->map_sem);
 }
 
@@ -1109,108 +1134,61 @@
 	ceph_msg_dump(msg);
 }
 
-
-static int __kick_requests(struct ceph_osd_client *osdc,
-			  struct ceph_osd *kickosd)
+static void reset_changed_osds(struct ceph_osd_client *osdc)
 {
-	struct ceph_osd_request *req;
 	struct rb_node *p, *n;
-	int needmap = 0;
-	int err;
 
-	dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
-	if (kickosd) {
-		err = __reset_osd(osdc, kickosd);
-		if (err == -EAGAIN)
-			return 1;
-	} else {
-		for (p = rb_first(&osdc->osds); p; p = n) {
-			struct ceph_osd *osd =
-				rb_entry(p, struct ceph_osd, o_node);
+	for (p = rb_first(&osdc->osds); p; p = n) {
+		struct ceph_osd *osd = rb_entry(p, struct ceph_osd, o_node);
 
-			n = rb_next(p);
-			if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
-			    memcmp(&osd->o_con.peer_addr,
-				   ceph_osd_addr(osdc->osdmap,
-						 osd->o_osd),
-				   sizeof(struct ceph_entity_addr)) != 0)
-				__reset_osd(osdc, osd);
-		}
+		n = rb_next(p);
+		if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
+		    memcmp(&osd->o_con.peer_addr,
+			   ceph_osd_addr(osdc->osdmap,
+					 osd->o_osd),
+			   sizeof(struct ceph_entity_addr)) != 0)
+			__reset_osd(osdc, osd);
 	}
-
-	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
-		req = rb_entry(p, struct ceph_osd_request, r_node);
-
-		if (req->r_resend) {
-			dout(" r_resend set on tid %llu\n", req->r_tid);
-			__cancel_request(req);
-			goto kick;
-		}
-		if (req->r_osd && kickosd == req->r_osd) {
-			__cancel_request(req);
-			goto kick;
-		}
-
-		err = __map_osds(osdc, req);
-		if (err == 0)
-			continue;  /* no change */
-		if (err < 0) {
-			/*
-			 * FIXME: really, we should set the request
-			 * error and fail if this isn't a 'nofail'
-			 * request, but that's a fair bit more
-			 * complicated to do.  So retry!
-			 */
-			dout(" setting r_resend on %llu\n", req->r_tid);
-			req->r_resend = true;
-			continue;
-		}
-		if (req->r_osd == NULL) {
-			dout("tid %llu maps to no valid osd\n", req->r_tid);
-			needmap++;  /* request a newer map */
-			continue;
-		}
-
-kick:
-		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
-		     req->r_osd ? req->r_osd->o_osd : -1);
-		req->r_flags |= CEPH_OSD_FLAG_RETRY;
-		err = __send_request(osdc, req);
-		if (err) {
-			dout(" setting r_resend on %llu\n", req->r_tid);
-			req->r_resend = true;
-		}
-	}
-
-	return needmap;
 }
 
 /*
- * Resubmit osd requests whose osd or osd address has changed.  Request
- * a new osd map if osds are down, or we are otherwise unable to determine
- * how to direct a request.
- *
- * Close connections to down osds.
- *
- * If @who is specified, resubmit requests for that specific osd.
+ * Requeue requests whose mapping to an OSD has changed.  If requests map to
+ * no osd, request a new map.
  *
  * Caller should hold map_sem for read and request_mutex.
  */
-static void kick_requests(struct ceph_osd_client *osdc,
-			  struct ceph_osd *kickosd)
+static void kick_requests(struct ceph_osd_client *osdc)
 {
-	int needmap;
+	struct ceph_osd_request *req;
+	struct rb_node *p;
+	int needmap = 0;
+	int err;
 
+	dout("kick_requests\n");
 	mutex_lock(&osdc->request_mutex);
-	needmap = __kick_requests(osdc, kickosd);
+	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
+		req = rb_entry(p, struct ceph_osd_request, r_node);
+		err = __map_request(osdc, req);
+		if (err < 0)
+			continue;  /* error */
+		if (req->r_osd == NULL) {
+			dout("%p tid %llu maps to no osd\n", req, req->r_tid);
+			needmap++;  /* request a newer map */
+		} else if (err > 0) {
+			dout("%p tid %llu requeued on osd%d\n", req, req->r_tid,
+			     req->r_osd ? req->r_osd->o_osd : -1);
+			req->r_flags |= CEPH_OSD_FLAG_RETRY;
+		}
+	}
 	mutex_unlock(&osdc->request_mutex);
 
 	if (needmap) {
 		dout("%d requests for down osds, need new map\n", needmap);
 		ceph_monc_request_next_osdmap(&osdc->client->monc);
 	}
-
 }
+
+
 /*
  * Process updated osd map.
  *
@@ -1263,6 +1241,8 @@
 				ceph_osdmap_destroy(osdc->osdmap);
 				osdc->osdmap = newmap;
 			}
+			kick_requests(osdc);
+			reset_changed_osds(osdc);
 		} else {
 			dout("ignoring incremental map %u len %d\n",
 			     epoch, maplen);
@@ -1300,6 +1280,7 @@
 			osdc->osdmap = newmap;
 			if (oldmap)
 				ceph_osdmap_destroy(oldmap);
+			kick_requests(osdc);
 		}
 		p += maplen;
 		nr_maps--;
@@ -1308,8 +1289,7 @@
 done:
 	downgrade_write(&osdc->map_sem);
 	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
-	if (newmap)
-		kick_requests(osdc, NULL);
+	send_queued(osdc);
 	up_read(&osdc->map_sem);
 	wake_up_all(&osdc->client->auth_wq);
 	return;
@@ -1347,15 +1327,22 @@
 	 * the request still han't been touched yet.
 	 */
 	if (req->r_sent == 0) {
-		rc = __send_request(osdc, req);
-		if (rc) {
-			if (nofail) {
-				dout("osdc_start_request failed send, "
-				     " marking %lld\n", req->r_tid);
-				req->r_resend = true;
-				rc = 0;
-			} else {
-				__unregister_request(osdc, req);
+		rc = __map_request(osdc, req);
+		if (rc < 0)
+			return rc;
+		if (req->r_osd == NULL) {
+			dout("send_request %p no up osds in pg\n", req);
+			ceph_monc_request_next_osdmap(&osdc->client->monc);
+		} else {
+			rc = __send_request(osdc, req);
+			if (rc) {
+				if (nofail) {
+					dout("osdc_start_request failed send, "
+					     " will retry %lld\n", req->r_tid);
+					rc = 0;
+				} else {
+					__unregister_request(osdc, req);
+				}
 			}
 		}
 	}
@@ -1441,6 +1428,8 @@
 	INIT_LIST_HEAD(&osdc->osd_lru);
 	osdc->requests = RB_ROOT;
 	INIT_LIST_HEAD(&osdc->req_lru);
+	INIT_LIST_HEAD(&osdc->req_unsent);
+	INIT_LIST_HEAD(&osdc->req_notarget);
 	osdc->num_requests = 0;
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);