libceph: pool deletion detection

This adds the "map check" infrastructure for sending osdmap version
checks on CALC_TARGET_POOL_DNE and completing in-flight requests with
-ENOENT if the target pool doesn't exist or has just been deleted.
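
The flow is:

  calc_target() returns CALC_TARGET_POOL_DNE
    -> send_map_check() takes a ref, links the request into
       osdc->map_checks and asks the monitor for the latest osdmap
       epoch with ceph_monc_get_version_async("osdmap")
    -> map_check_cb() records that epoch in r_map_dne_bound (unless a
       bound is already set) and calls check_pool_dne()
    -> check_pool_dne() fails the request with -ENOENT once our osdmap
       epoch reaches the bound

If the request has already been sent at least once (r_attempts != 0),
the pool must have existed before, so the current epoch is a good
enough bound and the request is failed immediately.  A request that
becomes sendable again or is cancelled has its pending map check torn
down with cancel_map_check().  Lingering requests get the same
treatment, keyed by linger_id in osdc->linger_map_checks.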

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 2ae7cfd..3e7bf72 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -151,6 +151,7 @@
 struct ceph_osd_request {
 	u64             r_tid;              /* unique for this client */
 	struct rb_node  r_node;
+	struct rb_node  r_mc_node;          /* map check */
 	struct ceph_osd *r_osd;
 
 	struct ceph_osd_request_target r_t;
@@ -191,6 +192,7 @@
 	int r_attempts;
 	struct ceph_eversion r_replay_version; /* aka reassert_version */
 	u32 r_last_force_resend;
+	u32 r_map_dne_bound; /* map epoch at which pool is known to not exist */
 
 	struct ceph_osd_req_op r_ops[];
 };
@@ -218,6 +220,7 @@
 
 	struct ceph_osd_request_target t;
 	u32 last_force_resend;
+	u32 map_dne_bound; /* counterpart of r_map_dne_bound */
 
 	struct timespec mtime;
 
@@ -225,6 +228,7 @@
 	struct mutex lock;
 	struct rb_node node;            /* osd */
 	struct rb_node osdc_node;       /* osdc */
+	struct rb_node mc_node;         /* map check */
 	struct list_head scan_item;
 
 	struct completion reg_commit_wait;
@@ -257,6 +261,8 @@
 	atomic64_t             last_tid;      /* tid of last request */
 	u64                    last_linger_id;
 	struct rb_root         linger_requests; /* lingering requests */
+	struct rb_root         map_checks; /* pending map checks, by r_tid */
+	struct rb_root         linger_map_checks; /* ditto, by linger_id */
 	atomic_t               num_requests;
 	atomic_t               num_homeless;
 	struct delayed_work    timeout_work;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 5ac6dce..ece2d10 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -396,6 +396,7 @@
 static void request_release_checks(struct ceph_osd_request *req)
 {
 	WARN_ON(!RB_EMPTY_NODE(&req->r_node));
+	WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
 	WARN_ON(!list_empty(&req->r_unsafe_item));
 	WARN_ON(req->r_osd);
 }
@@ -456,6 +457,7 @@
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
 	RB_CLEAR_NODE(&req->r_node);
+	RB_CLEAR_NODE(&req->r_mc_node);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 
 	target_init(&req->r_t);
@@ -969,6 +971,7 @@
  * We keep osd requests in an rbtree, sorted by ->r_tid.
  */
 DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
+DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)
 
 static bool osd_homeless(struct ceph_osd *osd)
 {
@@ -1601,10 +1604,13 @@
 		ceph_monc_renew_subs(&osdc->client->monc);
 }
 
+static void send_map_check(struct ceph_osd_request *req);
+
 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
 	struct ceph_osd *osd;
+	enum calc_target_result ct_res;
 	bool need_send = false;
 	bool promoted = false;
 
@@ -1612,7 +1618,10 @@
 	dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
 
 again:
-	calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+	ct_res = calc_target(osdc, &req->r_t, &req->r_last_force_resend, false);
+	if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
+		goto promote;
+
 	osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
 	if (IS_ERR(osd)) {
 		WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
@@ -1656,6 +1665,9 @@
 		send_request(req);
 	mutex_unlock(&osd->lock);
 
+	if (ct_res == CALC_TARGET_POOL_DNE)
+		send_map_check(req);
+
 	if (promoted)
 		downgrade_write(&osdc->lock);
 	return;
@@ -1699,6 +1711,7 @@
 	verify_osd_locked(osd);
 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
 
+	WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
 	unlink_request(osd, req);
 	atomic_dec(&osdc->num_requests);
 
@@ -1726,13 +1739,127 @@
 		complete_all(&req->r_completion);
 }
 
+/*
+ * Note that this is open-coded in handle_reply(), which has to deal
+ * with ack vs commit, dup acks, etc.
+ */
+static void complete_request(struct ceph_osd_request *req, int err)
+{
+	dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
+
+	req->r_result = err;
+	__finish_request(req);
+	__complete_request(req);
+	complete_all(&req->r_safe_completion);
+	ceph_osdc_put_request(req);
+}
+
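+/*
+ * Undo send_map_check(): drop the request from ->map_checks and put
+ * the ref that send_map_check() took.  Safe to call even if no map
+ * check is pending.
+ */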
+static void cancel_map_check(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+	struct ceph_osd_request *lookup_req;
+
+	verify_osdc_wrlocked(osdc);
+
+	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
+	if (!lookup_req)
+		return;
+
+	WARN_ON(lookup_req != req);
+	erase_request_mc(&osdc->map_checks, req);
+	ceph_osdc_put_request(req);
+}
+
 static void cancel_request(struct ceph_osd_request *req)
 {
 	dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
 
+	cancel_map_check(req);
 	finish_request(req);
 }
 
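+/*
+ * Called when the target pool doesn't exist in the current osdmap.
+ * If the request has been sent before, the pool existed previously
+ * and must have just been deleted, so the current epoch is a good
+ * enough DNE bound.  Otherwise a stale osdmap can't be ruled out, so
+ * fail with -ENOENT only once our osdmap is at least as new as the
+ * bound established by a map check.
+ */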
+static void check_pool_dne(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+	struct ceph_osdmap *map = osdc->osdmap;
+
+	verify_osdc_wrlocked(osdc);
+	WARN_ON(!map->epoch);
+
+	if (req->r_attempts) {
+		/*
+		 * We sent a request earlier, which means that
+		 * previously the pool existed, and now it does not
+		 * (i.e., it was deleted).
+		 */
+		req->r_map_dne_bound = map->epoch;
+		dout("%s req %p tid %llu pool disappeared\n", __func__, req,
+		     req->r_tid);
+	} else {
+		dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
+		     req, req->r_tid, req->r_map_dne_bound, map->epoch);
+	}
+
+	if (req->r_map_dne_bound) {
+		if (map->epoch >= req->r_map_dne_bound) {
+			/* we had a new enough map */
+			pr_info_ratelimited("tid %llu pool does not exist\n",
+					    req->r_tid);
+			complete_request(req, -ENOENT);
+		}
+	} else {
+		send_map_check(req);
+	}
+}
+
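+/*
+ * ceph_monc_get_version_async() callback: the monitor has told us the
+ * newest osdmap epoch.  Use it as the DNE bound (unless one is already
+ * set) and re-evaluate the request.
+ */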
+static void map_check_cb(struct ceph_mon_generic_request *greq)
+{
+	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
+	struct ceph_osd_request *req;
+	u64 tid = greq->private_data;
+
+	WARN_ON(greq->result || !greq->u.newest);
+
+	down_write(&osdc->lock);
+	req = lookup_request_mc(&osdc->map_checks, tid);
+	if (!req) {
+		dout("%s tid %llu dne\n", __func__, tid);
+		goto out_unlock;
+	}
+
+	dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
+	     req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
+	if (!req->r_map_dne_bound)
+		req->r_map_dne_bound = greq->u.newest;
+	erase_request_mc(&osdc->map_checks, req);
+	check_pool_dne(req);
+
+	ceph_osdc_put_request(req);
+out_unlock:
+	up_write(&osdc->lock);
+}
+
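+/*
+ * Ask the monitor for the newest osdmap epoch.  The request is kept
+ * in ->map_checks (with a ref) until map_check_cb() fires or the
+ * check is cancelled.
+ */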
+static void send_map_check(struct ceph_osd_request *req)
+{
+	struct ceph_osd_client *osdc = req->r_osdc;
+	struct ceph_osd_request *lookup_req;
+	int ret;
+
+	verify_osdc_wrlocked(osdc);
+
+	lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
+	if (lookup_req) {
+		WARN_ON(lookup_req != req);
+		return;
+	}
+
+	ceph_osdc_get_request(req);
+	insert_request_mc(&osdc->map_checks, req);
+	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
+					  map_check_cb, req->r_tid);
+	WARN_ON(ret);
+}
+
 /*
  * lingering requests, watch/notify v2 infrastructure
  */
@@ -1745,6 +1872,7 @@
 	     lreq->reg_req, lreq->ping_req);
 	WARN_ON(!RB_EMPTY_NODE(&lreq->node));
 	WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
+	WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
 	WARN_ON(!list_empty(&lreq->scan_item));
 	WARN_ON(!list_empty(&lreq->pending_lworks));
 	WARN_ON(lreq->osd);
@@ -1783,6 +1911,7 @@
 	mutex_init(&lreq->lock);
 	RB_CLEAR_NODE(&lreq->node);
 	RB_CLEAR_NODE(&lreq->osdc_node);
+	RB_CLEAR_NODE(&lreq->mc_node);
 	INIT_LIST_HEAD(&lreq->scan_item);
 	INIT_LIST_HEAD(&lreq->pending_lworks);
 	init_completion(&lreq->reg_commit_wait);
@@ -1797,6 +1926,7 @@
 
 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
+DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
 
 /*
  * Create linger request <-> OSD session relation.
@@ -2193,6 +2323,23 @@
 	send_linger(lreq);
 }
 
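+/* linger counterpart of cancel_map_check() */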
+static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+	struct ceph_osd_linger_request *lookup_lreq;
+
+	verify_osdc_wrlocked(osdc);
+
+	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
+				       lreq->linger_id);
+	if (!lookup_lreq)
+		return;
+
+	WARN_ON(lookup_lreq != lreq);
+	erase_linger_mc(&osdc->linger_map_checks, lreq);
+	linger_put(lreq);
+}
+
 /*
  * @lreq has to be both registered and linked.
  */
@@ -2202,6 +2349,7 @@
 		cancel_linger_request(lreq->ping_req);
 	if (lreq->reg_req->r_osd)
 		cancel_linger_request(lreq->reg_req);
+	cancel_linger_map_check(lreq);
 	unlink_linger(lreq->osd, lreq);
 	linger_unregister(lreq);
 }
@@ -2216,6 +2364,89 @@
 	up_write(&osdc->lock);
 }
 
+static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
+
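+/*
+ * Linger counterpart of check_pool_dne(): a non-zero register_gen
+ * means the watch was registered at some point, i.e. the pool existed
+ * and must have just been deleted.
+ */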
+static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+	struct ceph_osdmap *map = osdc->osdmap;
+
+	verify_osdc_wrlocked(osdc);
+	WARN_ON(!map->epoch);
+
+	if (lreq->register_gen) {
+		lreq->map_dne_bound = map->epoch;
+		dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
+		     lreq, lreq->linger_id);
+	} else {
+		dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
+		     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
+		     map->epoch);
+	}
+
+	if (lreq->map_dne_bound) {
+		if (map->epoch >= lreq->map_dne_bound) {
+			/* we had a new enough map */
+			pr_info("linger_id %llu pool does not exist\n",
+				lreq->linger_id);
+			linger_reg_commit_complete(lreq, -ENOENT);
+			__linger_cancel(lreq);
+		}
+	} else {
+		send_linger_map_check(lreq);
+	}
+}
+
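+/* linger counterpart of map_check_cb() */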
+static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
+{
+	struct ceph_osd_client *osdc = &greq->monc->client->osdc;
+	struct ceph_osd_linger_request *lreq;
+	u64 linger_id = greq->private_data;
+
+	WARN_ON(greq->result || !greq->u.newest);
+
+	down_write(&osdc->lock);
+	lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
+	if (!lreq) {
+		dout("%s linger_id %llu dne\n", __func__, linger_id);
+		goto out_unlock;
+	}
+
+	dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
+	     __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
+	     greq->u.newest);
+	if (!lreq->map_dne_bound)
+		lreq->map_dne_bound = greq->u.newest;
+	erase_linger_mc(&osdc->linger_map_checks, lreq);
+	check_linger_pool_dne(lreq);
+
+	linger_put(lreq);
+out_unlock:
+	up_write(&osdc->lock);
+}
+
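+/* linger counterpart of send_map_check() */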
+static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
+{
+	struct ceph_osd_client *osdc = lreq->osdc;
+	struct ceph_osd_linger_request *lookup_lreq;
+	int ret;
+
+	verify_osdc_wrlocked(osdc);
+
+	lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
+				       lreq->linger_id);
+	if (lookup_lreq) {
+		WARN_ON(lookup_lreq != lreq);
+		return;
+	}
+
+	linger_get(lreq);
+	insert_linger_mc(&osdc->linger_map_checks, lreq);
+	ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
+					  linger_map_check_cb, lreq->linger_id);
+	WARN_ON(ret);
+}
+
 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
 {
 	int ret;
@@ -2677,10 +2908,7 @@
 	return;
 
 fail_request:
-	req->r_result = -EIO;
-	__finish_request(req);
-	__complete_request(req);
-	complete_all(&req->r_safe_completion);
+	complete_request(req, -EIO);
 out_unlock_session:
 	mutex_unlock(&osd->lock);
 out_unlock_osdc:
@@ -2764,6 +2992,7 @@
 
 			/* fall through */
 		case CALC_TARGET_NEED_RESEND:
+			cancel_linger_map_check(lreq);
 			/*
 			 * scan_requests() for the previous epoch(s)
 			 * may have already added it to the list, since
@@ -2773,6 +3002,7 @@
 				list_add_tail(&lreq->scan_item, need_resend_linger);
 			break;
 		case CALC_TARGET_POOL_DNE:
+			check_linger_pool_dne(lreq);
 			break;
 		}
 	}
@@ -2782,7 +3012,7 @@
 		    rb_entry(n, struct ceph_osd_request, r_node);
 		enum calc_target_result ct_res;
 
-		n = rb_next(n); /* unlink_request() */
+		n = rb_next(n); /* unlink_request(), check_pool_dne() */
 
 		dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
 		ct_res = calc_target(osdc, &req->r_t,
@@ -2799,10 +3029,12 @@
 
 			/* fall through */
 		case CALC_TARGET_NEED_RESEND:
+			cancel_map_check(req);
 			unlink_request(osd, req);
 			insert_request(need_resend, req);
 			break;
 		case CALC_TARGET_POOL_DNE:
+			check_pool_dne(req);
 			break;
 		}
 	}
@@ -3655,6 +3887,8 @@
 	osdc->homeless_osd.o_osdc = osdc;
 	osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
 	osdc->linger_requests = RB_ROOT;
+	osdc->map_checks = RB_ROOT;
+	osdc->linger_map_checks = RB_ROOT;
 	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
 	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
 
@@ -3720,6 +3954,8 @@
 
 	WARN_ON(!list_empty(&osdc->osd_lru));
 	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
+	WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
+	WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
 	WARN_ON(atomic_read(&osdc->num_requests));
 	WARN_ON(atomic_read(&osdc->num_homeless));