libceph: revamp subs code, switch to SUBSCRIBE2 protocol

It is currently hard-coded in the mon_client that mdsmap and monmap
subs are continuous, while the osdmap sub is always "onetime".  To better
handle full clusters/pools in the osd_client, we need to be able to
issue continuous osdmap subs.  Revamp subs code to allow us to specify
for each sub whether it should be continuous or not.
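
For reference, the old have_mdsmap/have_osdmap/want_next_osdmap fields
are replaced with a small per-sub state array.  Pieced together from
the diff below (the actual definitions live in the ceph headers), it
looks roughly like:

    struct ceph_mon_subscribe_item {
            __le64 start;               /* first epoch we want */
            __u8 flags;                 /* CEPH_SUBSCRIBE_ONETIME or 0 */
    } __attribute__ ((packed));

    /* in struct ceph_mon_client */
    struct {
            struct ceph_mon_subscribe_item item;
            bool want;
            u32 have;                   /* epoch we already have */
    } subs[3];                          /* indexed by CEPH_SUB_* */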

Although not strictly required for the above, switch to the SUBSCRIBE2
protocol while we're at it, eliminating the ambiguity between a request
for "every map since X" and a request for "just the latest" when we
don't have a map yet (i.e. have epoch 0).  The SUBSCRIBE2 feature bit
is now required - it has been supported since pre-argonaut (2010).
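
To illustrate, the two cases map onto the new ceph_monc_want_map()
helper as follows (these mirror the call sites added below):

    /* "just the latest", works even when we have epoch 0 */
    ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);

    /* "every map since have + 1", still as a onetime sub */
    ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
                       monc->subs[CEPH_SUB_OSDMAP].have + 1, false);

    /* continuous monmap sub, renewed until we drop it */
    ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);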

Move "got mdsmap" call to the end of ceph_mdsc_handle_map() - calling
in before we validate the epoch and successfully install the new map
can mess up mon_client sub state.
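
The fs/ceph/mds_client.c hunk is not shown in this excerpt; the
reorder amounts to roughly the following sketch (newmap, mdsc->mdsmap
and m_epoch are assumed names here):

    void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
                              struct ceph_msg *msg)
    {
            ...
            /* validate the epoch and install the new map first ... */
            mdsc->mdsmap = newmap;
            ...
            /* ... and only then report it to the mon_client */
            ceph_monc_got_map(&mdsc->fsc->client->monc, CEPH_SUB_MDSMAP,
                              mdsc->mdsmap->m_epoch);
            ...
    }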

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index d6af6ca..8902991 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -140,9 +140,8 @@
 		monc->cur_mon = r % monc->monmap->num_mon;
 		dout("open_session num=%d r=%d -> mon%d\n",
 		     monc->monmap->num_mon, r, monc->cur_mon);
-		monc->sub_sent = 0;
 		monc->sub_renew_after = jiffies;  /* i.e., expired */
-		monc->want_next_osdmap = !!monc->want_next_osdmap;
+		monc->sub_renew_sent = 0;
 
 		dout("open_session mon%d opening\n", monc->cur_mon);
 		ceph_con_open(&monc->con,
@@ -189,59 +188,60 @@
 			      round_jiffies_relative(delay));
 }
 
+const char *ceph_sub_str[] = {
+	[CEPH_SUB_MDSMAP] = "mdsmap",
+	[CEPH_SUB_MONMAP] = "monmap",
+	[CEPH_SUB_OSDMAP] = "osdmap",
+};
+
 /*
- * Send subscribe request for mdsmap and/or osdmap.
+ * Send subscribe request for one or more maps, according to
+ * monc->subs.
  */
 static void __send_subscribe(struct ceph_mon_client *monc)
 {
-	dout("__send_subscribe sub_sent=%u exp=%u want_osd=%d\n",
-	     (unsigned int)monc->sub_sent, __sub_expired(monc),
-	     monc->want_next_osdmap);
-	if ((__sub_expired(monc) && !monc->sub_sent) ||
-	    monc->want_next_osdmap == 1) {
-		struct ceph_msg *msg = monc->m_subscribe;
-		struct ceph_mon_subscribe_item *i;
-		void *p, *end;
-		int num;
+	struct ceph_msg *msg = monc->m_subscribe;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front_alloc_len;
+	int num = 0;
+	int i;
 
-		p = msg->front.iov_base;
-		end = p + msg->front_alloc_len;
+	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);
 
-		num = 1 + !!monc->want_next_osdmap + !!monc->want_mdsmap;
-		ceph_encode_32(&p, num);
+	BUG_ON(monc->cur_mon < 0);
 
-		if (monc->want_next_osdmap) {
-			dout("__send_subscribe to 'osdmap' %u\n",
-			     (unsigned int)monc->have_osdmap);
-			ceph_encode_string(&p, end, "osdmap", 6);
-			i = p;
-			i->have = cpu_to_le64(monc->have_osdmap);
-			i->onetime = 1;
-			p += sizeof(*i);
-			monc->want_next_osdmap = 2;  /* requested */
-		}
-		if (monc->want_mdsmap) {
-			dout("__send_subscribe to 'mdsmap' %u+\n",
-			     (unsigned int)monc->have_mdsmap);
-			ceph_encode_string(&p, end, "mdsmap", 6);
-			i = p;
-			i->have = cpu_to_le64(monc->have_mdsmap);
-			i->onetime = 0;
-			p += sizeof(*i);
-		}
-		ceph_encode_string(&p, end, "monmap", 6);
-		i = p;
-		i->have = 0;
-		i->onetime = 0;
-		p += sizeof(*i);
+	if (!monc->sub_renew_sent)
+		monc->sub_renew_sent = jiffies | 1; /* never 0 */
 
-		msg->front.iov_len = p - msg->front.iov_base;
-		msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
-		ceph_msg_revoke(msg);
-		ceph_con_send(&monc->con, ceph_msg_get(msg));
+	msg->hdr.version = cpu_to_le16(2);
 
-		monc->sub_sent = jiffies | 1;  /* never 0 */
+	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+		if (monc->subs[i].want)
+			num++;
 	}
+	BUG_ON(num < 1); /* monmap sub is always there */
+	ceph_encode_32(&p, num);
+	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
+		const char *s = ceph_sub_str[i];
+
+		if (!monc->subs[i].want)
+			continue;
+
+		dout("%s %s start %llu flags 0x%x\n", __func__, s,
+		     le64_to_cpu(monc->subs[i].item.start),
+		     monc->subs[i].item.flags);
+		ceph_encode_string(&p, end, s, strlen(s));
+		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
+		p += sizeof(monc->subs[i].item);
+	}
+
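+	/* 19 bytes per sub: 4-byte string len + 6-char name + 9-byte
+	 * packed item; with all subs encoded 35 front bytes remain */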
+	BUG_ON(p != (end - 35 - (ARRAY_SIZE(monc->subs) - num) * 19));
+	msg->front.iov_len = p - msg->front.iov_base;
+	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
+	ceph_msg_revoke(msg);
+	ceph_con_send(&monc->con, ceph_msg_get(msg));
 }
 
 static void handle_subscribe_ack(struct ceph_mon_client *monc,
@@ -255,9 +253,17 @@
 	seconds = le32_to_cpu(h->duration);
 
 	mutex_lock(&monc->mutex);
-	dout("handle_subscribe_ack after %d seconds\n", seconds);
-	monc->sub_renew_after = monc->sub_sent + (seconds >> 1)*HZ - 1;
-	monc->sub_sent = 0;
+	if (monc->sub_renew_sent) {
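+		/* renew halfway through the granted interval */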
+		monc->sub_renew_after = monc->sub_renew_sent +
+					    (seconds >> 1) * HZ - 1;
+		dout("%s sent %lu duration %d renew after %lu\n", __func__,
+		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
+		monc->sub_renew_sent = 0;
+	} else {
+		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
+		     monc->sub_renew_sent, monc->sub_renew_after);
+	}
 	mutex_unlock(&monc->mutex);
 	return;
 bad:
@@ -266,36 +271,84 @@
 }
 
 /*
- * Keep track of which maps we have
+ * Register interest in a map
+ *
+ * @sub: one of CEPH_SUB_*
+ * @epoch: X for "every map since X", or 0 for "just the latest"
  */
-int ceph_monc_got_mdsmap(struct ceph_mon_client *monc, u32 got)
+static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
+				 u32 epoch, bool continuous)
 {
-	mutex_lock(&monc->mutex);
-	monc->have_mdsmap = got;
-	mutex_unlock(&monc->mutex);
-	return 0;
-}
-EXPORT_SYMBOL(ceph_monc_got_mdsmap);
+	__le64 start = cpu_to_le64(epoch);
+	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;
 
-int ceph_monc_got_osdmap(struct ceph_mon_client *monc, u32 got)
+	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
+	     epoch, continuous);
+
+	if (monc->subs[sub].want &&
+	    monc->subs[sub].item.start == start &&
+	    monc->subs[sub].item.flags == flags)
+		return false;
+
+	monc->subs[sub].item.start = start;
+	monc->subs[sub].item.flags = flags;
+	monc->subs[sub].want = true;
+
+	return true;
+}
+
+bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
+			bool continuous)
+{
+	bool need_request;
+
+	mutex_lock(&monc->mutex);
+	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
+	mutex_unlock(&monc->mutex);
+
+	return need_request;
+}
+EXPORT_SYMBOL(ceph_monc_want_map);
+
+/*
+ * Keep track of which maps we have
+ *
+ * @sub: one of CEPH_SUB_*
+ */
+static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
+				u32 epoch)
+{
+	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);
+
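+	/* a onetime sub is satisfied by the first map; a continuous
+	 * sub advances start so renewals don't refetch this epoch */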
+	if (monc->subs[sub].want) {
+		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
+			monc->subs[sub].want = false;
+		else
+			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
+	}
+
+	monc->subs[sub].have = epoch;
+}
+
+void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
 {
 	mutex_lock(&monc->mutex);
-	monc->have_osdmap = got;
-	monc->want_next_osdmap = 0;
+	__ceph_monc_got_map(monc, sub, epoch);
 	mutex_unlock(&monc->mutex);
-	return 0;
 }
+EXPORT_SYMBOL(ceph_monc_got_map);
 
 /*
  * Register interest in the next osdmap
  */
 void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc)
 {
-	dout("request_next_osdmap have %u\n", monc->have_osdmap);
+	dout("%s have %u\n", __func__, monc->subs[CEPH_SUB_OSDMAP].have);
 	mutex_lock(&monc->mutex);
-	if (!monc->want_next_osdmap)
-		monc->want_next_osdmap = 1;
-	if (monc->want_next_osdmap < 2)
+	if (__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP,
+				 monc->subs[CEPH_SUB_OSDMAP].have + 1, false))
 		__send_subscribe(monc);
 	mutex_unlock(&monc->mutex);
 }
@@ -314,15 +365,15 @@
 	long ret;
 
 	mutex_lock(&monc->mutex);
-	while (monc->have_osdmap < epoch) {
+	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
 		mutex_unlock(&monc->mutex);
 
 		if (timeout && time_after_eq(jiffies, started + timeout))
 			return -ETIMEDOUT;
 
 		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
-						monc->have_osdmap >= epoch,
-						ceph_timeout_jiffies(timeout));
+				     monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
+				     ceph_timeout_jiffies(timeout));
 		if (ret < 0)
 			return ret;
 
@@ -335,11 +386,14 @@
 EXPORT_SYMBOL(ceph_monc_wait_osdmap);
 
 /*
- *
+ * Open a session with a random monitor.  Request monmap and osdmap,
+ * which are waited upon in __ceph_open_session().
  */
 int ceph_monc_open_session(struct ceph_mon_client *monc)
 {
 	mutex_lock(&monc->mutex);
+	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
+	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
 	__open_session(monc);
 	__schedule_delayed(monc);
 	mutex_unlock(&monc->mutex);
@@ -375,6 +429,7 @@
 	client->monc.monmap = monmap;
 	kfree(old);
 
+	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
 	client->have_fsid = true;
 
 out:
@@ -725,8 +780,14 @@
 			__validate_auth(monc);
 		}
 
-		if (is_auth)
-			__send_subscribe(monc);
+		if (is_auth) {
+			unsigned long now = jiffies;
+
+			dout("%s renew subs? now %lu renew after %lu\n",
+			     __func__, now, monc->sub_renew_after);
+			if (time_after_eq(now, monc->sub_renew_after))
+				__send_subscribe(monc);
+		}
 	}
 	__schedule_delayed(monc);
 	mutex_unlock(&monc->mutex);
@@ -815,16 +876,13 @@
 	monc->cur_mon = -1;
 	monc->hunting = true;
 	monc->sub_renew_after = jiffies;
-	monc->sub_sent = 0;
+	monc->sub_renew_sent = 0;
 
 	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
 	monc->generic_request_tree = RB_ROOT;
 	monc->num_generic_requests = 0;
 	monc->last_tid = 0;
 
-	monc->have_mdsmap = 0;
-	monc->have_osdmap = 0;
-	monc->want_next_osdmap = 1;
 	return 0;
 
 out_auth_reply: