libceph: ceph_osds, ceph_pg_to_up_acting_osds()

Knowning just acting set isn't enough, we need to be able to record up
set as well to detect interval changes.  This means returning (up[],
up_len, up_primary, acting[], acting_len, acting_primary) and passing
it around.  Introduce and switch to ceph_osds to help with that.

Rename ceph_calc_pg_acting() to ceph_pg_to_up_acting_osds() and return
both up and acting sets from it.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 6267839..f5fc8fc 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -1474,6 +1474,38 @@
 }
 EXPORT_SYMBOL(ceph_oid_destroy);
 
+static bool osds_valid(const struct ceph_osds *set)
+{
+	/* non-empty set */
+	if (set->size > 0 && set->primary >= 0)
+		return true;
+
+	/* empty can_shift_osds set */
+	if (!set->size && set->primary == -1)
+		return true;
+
+	/* empty !can_shift_osds set - all NONE */
+	if (set->size > 0 && set->primary == -1) {
+		int i;
+
+		for (i = 0; i < set->size; i++) {
+			if (set->osds[i] != CRUSH_ITEM_NONE)
+				break;
+		}
+		if (i == set->size)
+			return true;
+	}
+
+	return false;
+}
+
+void ceph_osds_copy(struct ceph_osds *dest, const struct ceph_osds *src)
+{
+	memcpy(dest->osds, src->osds, src->size * sizeof(src->osds[0]));
+	dest->size = src->size;
+	dest->primary = src->primary;
+}
+
 /*
  * calculate file layout from given offset, length.
  * fill in correct oid, logical length, and object extent
@@ -1571,6 +1603,46 @@
 }
 EXPORT_SYMBOL(ceph_object_locator_to_pg);
 
+/*
+ * Map a raw PG (full precision ps) into an actual PG.
+ */
+static void raw_pg_to_pg(struct ceph_pg_pool_info *pi,
+			 const struct ceph_pg *raw_pgid,
+			 struct ceph_pg *pgid)
+{
+	pgid->pool = raw_pgid->pool;
+	pgid->seed = ceph_stable_mod(raw_pgid->seed, pi->pg_num,
+				     pi->pg_num_mask);
+}
+
+/*
+ * Map a raw PG (full precision ps) into a placement ps (placement
+ * seed).  Include pool id in that value so that different pools don't
+ * use the same seeds.
+ */
+static u32 raw_pg_to_pps(struct ceph_pg_pool_info *pi,
+			 const struct ceph_pg *raw_pgid)
+{
+	if (pi->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
+		/* hash pool id and seed so that pool PGs do not overlap */
+		return crush_hash32_2(CRUSH_HASH_RJENKINS1,
+				      ceph_stable_mod(raw_pgid->seed,
+						      pi->pgp_num,
+						      pi->pgp_num_mask),
+				      raw_pgid->pool);
+	} else {
+		/*
+		 * legacy behavior: add ps and pool together.  this is
+		 * not a great approach because the PGs from each pool
+		 * will overlap on top of each other: 0.5 == 1.4 ==
+		 * 2.3 == ...
+		 */
+		return ceph_stable_mod(raw_pgid->seed, pi->pgp_num,
+				       pi->pgp_num_mask) +
+		       (unsigned)raw_pgid->pool;
+	}
+}
+
 static int do_crush(struct ceph_osdmap *map, int ruleno, int x,
 		    int *result, int result_max,
 		    const __u32 *weight, int weight_max)
@@ -1588,84 +1660,92 @@
 }
 
 /*
- * Calculate raw (crush) set for given pgid.
+ * Calculate raw set (CRUSH output) for given PG.  The result may
+ * contain nonexistent OSDs.  ->primary is undefined for a raw set.
  *
- * Return raw set length, or error.
+ * Placement seed (CRUSH input) is returned through @ppps.
  */
-static int pg_to_raw_osds(struct ceph_osdmap *osdmap,
-			  struct ceph_pg_pool_info *pool,
-			  struct ceph_pg pgid, u32 pps, int *osds)
+static void pg_to_raw_osds(struct ceph_osdmap *osdmap,
+			   struct ceph_pg_pool_info *pi,
+			   const struct ceph_pg *raw_pgid,
+			   struct ceph_osds *raw,
+			   u32 *ppps)
 {
+	u32 pps = raw_pg_to_pps(pi, raw_pgid);
 	int ruleno;
 	int len;
 
-	/* crush */
-	ruleno = crush_find_rule(osdmap->crush, pool->crush_ruleset,
-				 pool->type, pool->size);
+	ceph_osds_init(raw);
+	if (ppps)
+		*ppps = pps;
+
+	ruleno = crush_find_rule(osdmap->crush, pi->crush_ruleset, pi->type,
+				 pi->size);
 	if (ruleno < 0) {
 		pr_err("no crush rule: pool %lld ruleset %d type %d size %d\n",
-		       pgid.pool, pool->crush_ruleset, pool->type,
-		       pool->size);
-		return -ENOENT;
+		       pi->id, pi->crush_ruleset, pi->type, pi->size);
+		return;
 	}
 
-	len = do_crush(osdmap, ruleno, pps, osds,
-		       min_t(int, pool->size, CEPH_PG_MAX_SIZE),
+	len = do_crush(osdmap, ruleno, pps, raw->osds,
+		       min_t(int, pi->size, ARRAY_SIZE(raw->osds)),
 		       osdmap->osd_weight, osdmap->max_osd);
 	if (len < 0) {
 		pr_err("error %d from crush rule %d: pool %lld ruleset %d type %d size %d\n",
-		       len, ruleno, pgid.pool, pool->crush_ruleset,
-		       pool->type, pool->size);
-		return len;
+		       len, ruleno, pi->id, pi->crush_ruleset, pi->type,
+		       pi->size);
+		return;
 	}
 
-	return len;
+	raw->size = len;
 }
 
 /*
- * Given raw set, calculate up set and up primary.
+ * Given raw set, calculate up set and up primary.  By definition of an
+ * up set, the result won't contain nonexistent or down OSDs.
  *
- * Return up set length.  *primary is set to up primary osd id, or -1
- * if up set is empty.
+ * This is done in-place - on return @set is the up set.  If it's
+ * empty, ->primary will remain undefined.
  */
-static int raw_to_up_osds(struct ceph_osdmap *osdmap,
-			  struct ceph_pg_pool_info *pool,
-			  int *osds, int len, int *primary)
+static void raw_to_up_osds(struct ceph_osdmap *osdmap,
+			   struct ceph_pg_pool_info *pi,
+			   struct ceph_osds *set)
 {
-	int up_primary = -1;
 	int i;
 
-	if (ceph_can_shift_osds(pool)) {
+	/* ->primary is undefined for a raw set */
+	BUG_ON(set->primary != -1);
+
+	if (ceph_can_shift_osds(pi)) {
 		int removed = 0;
 
-		for (i = 0; i < len; i++) {
-			if (ceph_osd_is_down(osdmap, osds[i])) {
+		/* shift left */
+		for (i = 0; i < set->size; i++) {
+			if (ceph_osd_is_down(osdmap, set->osds[i])) {
 				removed++;
 				continue;
 			}
 			if (removed)
-				osds[i - removed] = osds[i];
+				set->osds[i - removed] = set->osds[i];
 		}
-
-		len -= removed;
-		if (len > 0)
-			up_primary = osds[0];
+		set->size -= removed;
+		if (set->size > 0)
+			set->primary = set->osds[0];
 	} else {
-		for (i = len - 1; i >= 0; i--) {
-			if (ceph_osd_is_down(osdmap, osds[i]))
-				osds[i] = CRUSH_ITEM_NONE;
+		/* set down/dne devices to NONE */
+		for (i = set->size - 1; i >= 0; i--) {
+			if (ceph_osd_is_down(osdmap, set->osds[i]))
+				set->osds[i] = CRUSH_ITEM_NONE;
 			else
-				up_primary = osds[i];
+				set->primary = set->osds[i];
 		}
 	}
-
-	*primary = up_primary;
-	return len;
 }
 
-static void apply_primary_affinity(struct ceph_osdmap *osdmap, u32 pps,
-				   struct ceph_pg_pool_info *pool,
-				   int *osds, int len, int *primary)
+static void apply_primary_affinity(struct ceph_osdmap *osdmap,
+				   struct ceph_pg_pool_info *pi,
+				   u32 pps,
+				   struct ceph_osds *up)
 {
 	int i;
 	int pos = -1;
@@ -1677,8 +1757,8 @@
 	if (!osdmap->osd_primary_affinity)
 		return;
 
-	for (i = 0; i < len; i++) {
-		int osd = osds[i];
+	for (i = 0; i < up->size; i++) {
+		int osd = up->osds[i];
 
 		if (osd != CRUSH_ITEM_NONE &&
 		    osdmap->osd_primary_affinity[osd] !=
@@ -1686,7 +1766,7 @@
 			break;
 		}
 	}
-	if (i == len)
+	if (i == up->size)
 		return;
 
 	/*
@@ -1694,8 +1774,8 @@
 	 * osd into the hash/rng so that a proportional fraction of an
 	 * osd's pgs get rejected as primary.
 	 */
-	for (i = 0; i < len; i++) {
-		int osd = osds[i];
+	for (i = 0; i < up->size; i++) {
+		int osd = up->osds[i];
 		u32 aff;
 
 		if (osd == CRUSH_ITEM_NONE)
@@ -1720,123 +1800,99 @@
 	if (pos < 0)
 		return;
 
-	*primary = osds[pos];
+	up->primary = up->osds[pos];
 
-	if (ceph_can_shift_osds(pool) && pos > 0) {
+	if (ceph_can_shift_osds(pi) && pos > 0) {
 		/* move the new primary to the front */
 		for (i = pos; i > 0; i--)
-			osds[i] = osds[i - 1];
-		osds[0] = *primary;
+			up->osds[i] = up->osds[i - 1];
+		up->osds[0] = up->primary;
 	}
 }
 
 /*
- * Given up set, apply pg_temp and primary_temp mappings.
+ * Get pg_temp and primary_temp mappings for given PG.
  *
- * Return acting set length.  *primary is set to acting primary osd id,
- * or -1 if acting set is empty.
+ * Note that a PG may have none, only pg_temp, only primary_temp or
+ * both pg_temp and primary_temp mappings.  This means @temp isn't
+ * always a valid OSD set on return: in the "only primary_temp" case,
+ * @temp will have its ->primary >= 0 but ->size == 0.
  */
-static int apply_temps(struct ceph_osdmap *osdmap,
-		       struct ceph_pg_pool_info *pool, struct ceph_pg pgid,
-		       int *osds, int len, int *primary)
+static void get_temp_osds(struct ceph_osdmap *osdmap,
+			  struct ceph_pg_pool_info *pi,
+			  const struct ceph_pg *raw_pgid,
+			  struct ceph_osds *temp)
 {
+	struct ceph_pg pgid;
 	struct ceph_pg_mapping *pg;
-	int temp_len;
-	int temp_primary;
 	int i;
 
-	/* raw_pg -> pg */
-	pgid.seed = ceph_stable_mod(pgid.seed, pool->pg_num,
-				    pool->pg_num_mask);
+	raw_pg_to_pg(pi, raw_pgid, &pgid);
+	ceph_osds_init(temp);
 
 	/* pg_temp? */
 	pg = __lookup_pg_mapping(&osdmap->pg_temp, pgid);
 	if (pg) {
-		temp_len = 0;
-		temp_primary = -1;
-
 		for (i = 0; i < pg->pg_temp.len; i++) {
 			if (ceph_osd_is_down(osdmap, pg->pg_temp.osds[i])) {
-				if (ceph_can_shift_osds(pool))
+				if (ceph_can_shift_osds(pi))
 					continue;
-				else
-					osds[temp_len++] = CRUSH_ITEM_NONE;
+
+				temp->osds[temp->size++] = CRUSH_ITEM_NONE;
 			} else {
-				osds[temp_len++] = pg->pg_temp.osds[i];
+				temp->osds[temp->size++] = pg->pg_temp.osds[i];
 			}
 		}
 
 		/* apply pg_temp's primary */
-		for (i = 0; i < temp_len; i++) {
-			if (osds[i] != CRUSH_ITEM_NONE) {
-				temp_primary = osds[i];
+		for (i = 0; i < temp->size; i++) {
+			if (temp->osds[i] != CRUSH_ITEM_NONE) {
+				temp->primary = temp->osds[i];
 				break;
 			}
 		}
-	} else {
-		temp_len = len;
-		temp_primary = *primary;
 	}
 
 	/* primary_temp? */
 	pg = __lookup_pg_mapping(&osdmap->primary_temp, pgid);
 	if (pg)
-		temp_primary = pg->primary_temp.osd;
-
-	*primary = temp_primary;
-	return temp_len;
+		temp->primary = pg->primary_temp.osd;
 }
 
 /*
- * Calculate acting set for given pgid.
+ * Map a PG to its acting set as well as its up set.
  *
- * Return acting set length, or error.  *primary is set to acting
- * primary osd id, or -1 if acting set is empty or on error.
+ * Acting set is used for data mapping purposes, while up set can be
+ * recorded for detecting interval changes and deciding whether to
+ * resend a request.
  */
-int ceph_calc_pg_acting(struct ceph_osdmap *osdmap, struct ceph_pg pgid,
-			int *osds, int *primary)
+void ceph_pg_to_up_acting_osds(struct ceph_osdmap *osdmap,
+			       const struct ceph_pg *raw_pgid,
+			       struct ceph_osds *up,
+			       struct ceph_osds *acting)
 {
-	struct ceph_pg_pool_info *pool;
+	struct ceph_pg_pool_info *pi;
 	u32 pps;
-	int len;
 
-	pool = __lookup_pg_pool(&osdmap->pg_pools, pgid.pool);
-	if (!pool) {
-		*primary = -1;
-		return -ENOENT;
+	pi = ceph_pg_pool_by_id(osdmap, raw_pgid->pool);
+	if (!pi) {
+		ceph_osds_init(up);
+		ceph_osds_init(acting);
+		goto out;
 	}
 
-	if (pool->flags & CEPH_POOL_FLAG_HASHPSPOOL) {
-		/* hash pool id and seed so that pool PGs do not overlap */
-		pps = crush_hash32_2(CRUSH_HASH_RJENKINS1,
-				     ceph_stable_mod(pgid.seed, pool->pgp_num,
-						     pool->pgp_num_mask),
-				     pgid.pool);
-	} else {
-		/*
-		 * legacy behavior: add ps and pool together.  this is
-		 * not a great approach because the PGs from each pool
-		 * will overlap on top of each other: 0.5 == 1.4 ==
-		 * 2.3 == ...
-		 */
-		pps = ceph_stable_mod(pgid.seed, pool->pgp_num,
-				      pool->pgp_num_mask) +
-			(unsigned)pgid.pool;
+	pg_to_raw_osds(osdmap, pi, raw_pgid, up, &pps);
+	raw_to_up_osds(osdmap, pi, up);
+	apply_primary_affinity(osdmap, pi, pps, up);
+	get_temp_osds(osdmap, pi, raw_pgid, acting);
+	if (!acting->size) {
+		memcpy(acting->osds, up->osds, up->size * sizeof(up->osds[0]));
+		acting->size = up->size;
+		if (acting->primary == -1)
+			acting->primary = up->primary;
 	}
-
-	len = pg_to_raw_osds(osdmap, pool, pgid, pps, osds);
-	if (len < 0) {
-		*primary = -1;
-		return len;
-	}
-
-	len = raw_to_up_osds(osdmap, pool, osds, len, primary);
-
-	apply_primary_affinity(osdmap, pps, pool, osds, len, primary);
-
-	len = apply_temps(osdmap, pool, pgid, osds, len, primary);
-
-	return len;
+out:
+	WARN_ON(!osds_valid(up) || !osds_valid(acting));
 }
 
 /*
@@ -1844,11 +1900,9 @@
  */
 int ceph_calc_pg_primary(struct ceph_osdmap *osdmap, struct ceph_pg pgid)
 {
-	int osds[CEPH_PG_MAX_SIZE];
-	int primary;
+	struct ceph_osds up, acting;
 
-	ceph_calc_pg_acting(osdmap, pgid, osds, &primary);
-
-	return primary;
+	ceph_pg_to_up_acting_osds(osdmap, &pgid, &up, &acting);
+	return acting.primary;
 }
 EXPORT_SYMBOL(ceph_calc_pg_primary);