libceph: handle_one_map()

Separate osdmap handling from decoding and iterating over a bag of maps
in a fresh MOSDMap message.  This sets up the scene for the updated OSD
client.

Of particular importance here is the addition of pi->was_full, which
can be used to answer "did this pool go full -> not-full in this map?".
This is the key bit for supporting pool quotas.

We won't be able to downgrade map_sem for much longer, so drop
downgrade_write().

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index 330d045..c14e9d8 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -115,6 +115,7 @@
 bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
 			bool continuous);
 void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch);
+void ceph_monc_renew_subs(struct ceph_mon_client *monc);
 
 extern void ceph_monc_request_next_osdmap(struct ceph_mon_client *monc);
 extern int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 8468c73..821e16f 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -45,6 +45,8 @@
 	s64 write_tier; /* wins for read+write ops */
 	u64 flags; /* CEPH_POOL_FLAG_* */
 	char *name;
+
+	bool was_full;  /* for handle_one_map() */
 };
 
 static inline bool ceph_can_shift_osds(struct ceph_pg_pool_info *pool)
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index a426a4b..98bfbe1 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -376,6 +376,14 @@
 }
 EXPORT_SYMBOL(ceph_monc_got_map);
 
+void ceph_monc_renew_subs(struct ceph_mon_client *monc)
+{
+	mutex_lock(&monc->mutex);
+	__send_subscribe(monc);
+	mutex_unlock(&monc->mutex);
+}
+EXPORT_SYMBOL(ceph_monc_renew_subs);
+
 /*
  * Register interest in the next osdmap
  */
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 9c35fd8..4227c55 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -1245,6 +1245,21 @@
 	return pi->flags & CEPH_POOL_FLAG_FULL;
 }
 
+static bool have_pool_full(struct ceph_osd_client *osdc)
+{
+	struct rb_node *n;
+
+	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
+		struct ceph_pg_pool_info *pi =
+		    rb_entry(n, struct ceph_pg_pool_info, node);
+
+		if (__pool_full(pi))
+			return true;
+	}
+
+	return false;
+}
+
 /*
  * Returns whether a request should be blocked from being sent
  * based on the current osdmap and osd_client settings.
@@ -1639,6 +1654,26 @@
 	}
 }
 
+static void maybe_request_map(struct ceph_osd_client *osdc)
+{
+	bool continuous = false;
+
+	WARN_ON(!osdc->osdmap->epoch);
+
+	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+	    ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
+	    ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR)) {
+		dout("%s osdc %p continuous\n", __func__, osdc);
+		continuous = true;
+	} else {
+		dout("%s osdc %p onetime\n", __func__, osdc);
+	}
+
+	if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+			       osdc->osdmap->epoch + 1, continuous))
+		ceph_monc_renew_subs(&osdc->client->monc);
+}
+
 /*
  * Caller should hold map_sem for read and request_mutex.
  */
@@ -2119,6 +2154,18 @@
 	up_read(&osdc->map_sem);
 }
 
+static void set_pool_was_full(struct ceph_osd_client *osdc)
+{
+	struct rb_node *n;
+
+	for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
+		struct ceph_pg_pool_info *pi =
+		    rb_entry(n, struct ceph_pg_pool_info, node);
+
+		pi->was_full = __pool_full(pi);
+	}
+}
+
 static void reset_changed_osds(struct ceph_osd_client *osdc)
 {
 	struct rb_node *p, *n;
@@ -2237,6 +2284,57 @@
 	}
 }
 
+static int handle_one_map(struct ceph_osd_client *osdc,
+			  void *p, void *end, bool incremental)
+{
+	struct ceph_osdmap *newmap;
+	struct rb_node *n;
+	bool skipped_map = false;
+	bool was_full;
+
+	was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+	set_pool_was_full(osdc);
+
+	if (incremental)
+		newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
+	else
+		newmap = ceph_osdmap_decode(&p, end);
+	if (IS_ERR(newmap))
+		return PTR_ERR(newmap);
+
+	if (newmap != osdc->osdmap) {
+		/*
+		 * Preserve ->was_full before destroying the old map.
+		 * For pools that weren't in the old map, ->was_full
+		 * should be false.
+		 */
+		for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
+			struct ceph_pg_pool_info *pi =
+			    rb_entry(n, struct ceph_pg_pool_info, node);
+			struct ceph_pg_pool_info *old_pi;
+
+			old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
+			if (old_pi)
+				pi->was_full = old_pi->was_full;
+			else
+				WARN_ON(pi->was_full);
+		}
+
+		if (osdc->osdmap->epoch &&
+		    osdc->osdmap->epoch + 1 < newmap->epoch) {
+			WARN_ON(incremental);
+			skipped_map = true;
+		}
+
+		ceph_osdmap_destroy(osdc->osdmap);
+		osdc->osdmap = newmap;
+	}
+
+	was_full &= !ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+	kick_requests(osdc, skipped_map, was_full);
+
+	return 0;
+}
 
 /*
  * Process updated osd map.
@@ -2247,27 +2345,29 @@
  */
 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
 {
-	void *p, *end, *next;
+	void *p = msg->front.iov_base;
+	void *const end = p + msg->front.iov_len;
 	u32 nr_maps, maplen;
 	u32 epoch;
-	struct ceph_osdmap *newmap = NULL, *oldmap;
-	int err;
 	struct ceph_fsid fsid;
-	bool was_full;
+	bool handled_incremental = false;
+	bool was_pauserd, was_pausewr;
+	bool pauserd, pausewr;
+	int err;
 
-	dout("handle_map have %u\n", osdc->osdmap->epoch);
-	p = msg->front.iov_base;
-	end = p + msg->front.iov_len;
+	dout("%s have %u\n", __func__, osdc->osdmap->epoch);
+	down_write(&osdc->map_sem);
 
 	/* verify fsid */
 	ceph_decode_need(&p, end, sizeof(fsid), bad);
 	ceph_decode_copy(&p, &fsid, sizeof(fsid));
 	if (ceph_check_fsid(osdc->client, &fsid) < 0)
-		return;
+		goto bad;
 
-	down_write(&osdc->map_sem);
-
-	was_full = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL);
+	was_pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+	was_pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+		      ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+		      have_pool_full(osdc);
 
 	/* incremental maps */
 	ceph_decode_32_safe(&p, end, nr_maps, bad);
@@ -2277,33 +2377,22 @@
 		epoch = ceph_decode_32(&p);
 		maplen = ceph_decode_32(&p);
 		ceph_decode_need(&p, end, maplen, bad);
-		next = p + maplen;
-		if (osdc->osdmap->epoch+1 == epoch) {
+		if (osdc->osdmap->epoch &&
+		    osdc->osdmap->epoch + 1 == epoch) {
 			dout("applying incremental map %u len %d\n",
 			     epoch, maplen);
-			newmap = osdmap_apply_incremental(&p, next,
-							  osdc->osdmap);
-			if (IS_ERR(newmap)) {
-				err = PTR_ERR(newmap);
+			err = handle_one_map(osdc, p, p + maplen, true);
+			if (err)
 				goto bad;
-			}
-			BUG_ON(!newmap);
-			if (newmap != osdc->osdmap) {
-				ceph_osdmap_destroy(osdc->osdmap);
-				osdc->osdmap = newmap;
-			}
-			was_full = was_full ||
-				ceph_osdmap_flag(osdc->osdmap,
-						 CEPH_OSDMAP_FULL);
-			kick_requests(osdc, 0, was_full);
+			handled_incremental = true;
 		} else {
 			dout("ignoring incremental map %u len %d\n",
 			     epoch, maplen);
 		}
-		p = next;
+		p += maplen;
 		nr_maps--;
 	}
-	if (newmap)
+	if (handled_incremental)
 		goto done;
 
 	/* full maps */
@@ -2322,50 +2411,35 @@
 			     "older than our %u\n", epoch, maplen,
 			     osdc->osdmap->epoch);
 		} else {
-			int skipped_map = 0;
-
 			dout("taking full map %u len %d\n", epoch, maplen);
-			newmap = ceph_osdmap_decode(&p, p+maplen);
-			if (IS_ERR(newmap)) {
-				err = PTR_ERR(newmap);
+			err = handle_one_map(osdc, p, p + maplen, false);
+			if (err)
 				goto bad;
-			}
-			BUG_ON(!newmap);
-			oldmap = osdc->osdmap;
-			osdc->osdmap = newmap;
-			if (oldmap) {
-				if (oldmap->epoch + 1 < newmap->epoch)
-					skipped_map = 1;
-				ceph_osdmap_destroy(oldmap);
-			}
-			was_full = was_full ||
-				ceph_osdmap_flag(osdc->osdmap,
-						 CEPH_OSDMAP_FULL);
-			kick_requests(osdc, skipped_map, was_full);
 		}
 		p += maplen;
 		nr_maps--;
 	}
 
 done:
-	downgrade_write(&osdc->map_sem);
-	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
-			  osdc->osdmap->epoch);
-
 	/*
 	 * subscribe to subsequent osdmap updates if full to ensure
 	 * we find out when we are no longer full and stop returning
 	 * ENOSPC.
 	 */
-	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
-		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD) ||
-		ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR))
-		ceph_monc_request_next_osdmap(&osdc->client->monc);
+	pauserd = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSERD);
+	pausewr = ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_PAUSEWR) ||
+		  ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL) ||
+		  have_pool_full(osdc);
+	if (was_pauserd || was_pausewr || pauserd || pausewr)
+		maybe_request_map(osdc);
 
 	mutex_lock(&osdc->request_mutex);
 	__send_queued(osdc);
 	mutex_unlock(&osdc->request_mutex);
-	up_read(&osdc->map_sem);
+
+	ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
+			  osdc->osdmap->epoch);
+	up_write(&osdc->map_sem);
 	wake_up_all(&osdc->client->auth_wq);
 	return;