libceph: follow redirect replies from osds

Follow redirect replies from osds; for details, see ceph.git commit
fbbe3ad1220799b7bb00ea30fce581c5eadaf034.

The v1 (current) version of the redirect reply consists of oloc and oid,
which expand to pool, key, nspace, hash and oid.  However, server-side code
that would populate anything other than pool doesn't exist yet, and
hence this commit adds support for pool redirects only.  To make sure
that future server-side updates don't break us, we decode all fields
and, if any of key, nspace, hash or oid has a non-default value, error
out with a "corrupt osd_op_reply ..." message.

Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
Reviewed-by: Sage Weil <sage@inktank.com>
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 3997a87..010ff3b 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -369,6 +369,7 @@
 	INIT_LIST_HEAD(&req->r_osd_item);
 
 	req->r_base_oloc.pool = -1;
+	req->r_target_oloc.pool = -1;
 
 	/* create reply message */
 	if (use_mempool)
@@ -1256,23 +1257,36 @@
 			     struct ceph_osd_request *req,
 			     struct ceph_pg *pg_out)
 {
-	if ((req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
+	bool need_check_tiering;
+
+	need_check_tiering = false;
+	if (req->r_target_oloc.pool == -1) {
+		req->r_target_oloc = req->r_base_oloc; /* struct */
+		need_check_tiering = true;
+	}
+	if (req->r_target_oid.name_len == 0) {
+		ceph_oid_copy(&req->r_target_oid, &req->r_base_oid);
+		need_check_tiering = true;
+	}
+
+	if (need_check_tiering &&
+	    (req->r_flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
 		struct ceph_pg_pool_info *pi;
 
-		pi = ceph_pg_pool_by_id(osdmap, req->r_base_oloc.pool);
+		pi = ceph_pg_pool_by_id(osdmap, req->r_target_oloc.pool);
 		if (pi) {
 			if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
 			    pi->read_tier >= 0)
-				req->r_base_oloc.pool = pi->read_tier;
+				req->r_target_oloc.pool = pi->read_tier;
 			if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
 			    pi->write_tier >= 0)
-				req->r_base_oloc.pool = pi->write_tier;
+				req->r_target_oloc.pool = pi->write_tier;
 		}
 		/* !pi is caught in ceph_oloc_oid_to_pg() */
 	}
 
-	return ceph_oloc_oid_to_pg(osdmap, &req->r_base_oloc,
-				   &req->r_base_oid, pg_out);
+	return ceph_oloc_oid_to_pg(osdmap, &req->r_target_oloc,
+				   &req->r_target_oid, pg_out);
 }
 
 /*
@@ -1382,7 +1396,7 @@
 	/* fill in message content that changes each time we send it */
 	put_unaligned_le32(osdc->osdmap->epoch, req->r_request_osdmap_epoch);
 	put_unaligned_le32(req->r_flags, req->r_request_flags);
-	put_unaligned_le64(req->r_base_oloc.pool, req->r_request_pool);
+	put_unaligned_le64(req->r_target_oloc.pool, req->r_request_pool);
 	p = req->r_request_pgid;
 	ceph_encode_64(&p, req->r_pgid.pool);
 	ceph_encode_32(&p, req->r_pgid.seed);
@@ -1483,6 +1497,109 @@
 			      round_jiffies_relative(delay));
 }
 
+static int ceph_oloc_decode(void **p, void *end,
+			    struct ceph_object_locator *oloc)
+{ /* Decode an encoded ceph_object_locator at *p; only oloc->pool is stored.  Returns 0 or -EINVAL. */
+	u8 struct_v, struct_cv; /* presumably encoding version / compat version -- confirm */
+	u32 len;
+	void *struct_end;
+	int ret = 0;
+
+	ceph_decode_need(p, end, 1 + 1 + 4, e_inval); /* struct_v + struct_cv + len */
+	struct_v = ceph_decode_8(p);
+	struct_cv = ceph_decode_8(p);
+	if (struct_v < 3) {
+		pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	if (struct_cv > 6) {
+		pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	len = ceph_decode_32(p);
+	ceph_decode_need(p, end, len, e_inval); /* the whole struct body must fit before end */
+	struct_end = *p + len;
+
+	oloc->pool = ceph_decode_64(p); /* NOTE(review): field decodes from here on are not checked against len -- confirm a short len cannot overrun */
+	*p += 4; /* skip preferred */
+
+	len = ceph_decode_32(p); /* key length; a non-empty key is rejected */
+	if (len > 0) {
+		pr_warn("ceph_object_locator::key is set\n");
+		goto e_inval;
+	}
+
+	if (struct_v >= 5) { /* nspace is only decoded for v >= 5 */
+		len = ceph_decode_32(p);
+		if (len > 0) {
+			pr_warn("ceph_object_locator::nspace is set\n");
+			goto e_inval;
+		}
+	}
+
+	if (struct_v >= 6) { /* hash is only decoded for v >= 6 */
+		s64 hash = ceph_decode_64(p);
+		if (hash != -1) { /* any value other than -1 is reported as "set" */
+			pr_warn("ceph_object_locator::hash is set\n");
+			goto e_inval;
+		}
+	}
+
+	/* skip the rest: jump to the end of the struct as given by its len */
+	*p = struct_end;
+out:
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
+static int ceph_redirect_decode(void **p, void *end,
+				struct ceph_request_redirect *redir)
+{ /* Decode an encoded ceph_request_redirect at *p into redir->oloc.  Returns 0 or a negative error. */
+	u8 struct_v, struct_cv; /* presumably encoding version / compat version -- confirm */
+	u32 len;
+	void *struct_end;
+	int ret; /* uninitialized here, but assigned on every path that reaches out: */
+
+	ceph_decode_need(p, end, 1 + 1 + 4, e_inval); /* struct_v + struct_cv + len */
+	struct_v = ceph_decode_8(p);
+	struct_cv = ceph_decode_8(p);
+	if (struct_cv > 1) {
+		pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
+			struct_v, struct_cv);
+		goto e_inval;
+	}
+	len = ceph_decode_32(p);
+	ceph_decode_need(p, end, len, e_inval); /* the whole struct body must fit before end */
+	struct_end = *p + len;
+
+	ret = ceph_oloc_decode(p, end, &redir->oloc); /* NOTE(review): bounded by end, not struct_end -- confirm intended */
+	if (ret)
+		goto out;
+
+	len = ceph_decode_32(p); /* object_name length; a non-empty name is rejected */
+	if (len > 0) {
+		pr_warn("ceph_request_redirect::object_name is set\n");
+		goto e_inval;
+	}
+
+	len = ceph_decode_32(p);
+	*p += len; /* skip osd_instructions; NOTE(review): len is not bounds-checked, but *p is overwritten just below */
+
+	/* skip the rest: jump to the end of the struct as given by its len */
+	*p = struct_end;
+out:
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
 static void complete_request(struct ceph_osd_request *req)
 {
 	complete_all(&req->r_safe_completion);  /* fsync waiter */
@@ -1497,6 +1614,7 @@
 {
 	void *p, *end;
 	struct ceph_osd_request *req;
+	struct ceph_request_redirect redir;
 	u64 tid;
 	int object_len;
 	unsigned int numops;
@@ -1576,10 +1694,41 @@
 	for (i = 0; i < numops; i++)
 		req->r_reply_op_result[i] = ceph_decode_32(&p);
 
+	if (le16_to_cpu(msg->hdr.version) >= 6) {
+		p += 8 + 4; /* skip replay_version */
+		p += 8; /* skip user_version */
+
+		err = ceph_redirect_decode(&p, end, &redir);
+		if (err)
+			goto bad_put;
+	} else {
+		redir.oloc.pool = -1;
+	}
+
+	if (redir.oloc.pool != -1) {
+		dout("redirect pool %lld\n", redir.oloc.pool);
+
+		__unregister_request(osdc, req);
+		mutex_unlock(&osdc->request_mutex);
+
+		req->r_target_oloc = redir.oloc; /* struct */
+
+		/*
+		 * Start redirect requests with nofail=true.  If
+		 * mapping fails, request will end up on the notarget
+		 * list, waiting for the new osdmap (which can take
+		 * a while), even though the original request mapped
+		 * successfully.  In the future we might want to follow
+		 * original request's nofail setting here.
+		 */
+		err = ceph_osdc_start_request(osdc, req, true);
+		BUG_ON(err);
+
+		goto done;
+	}
+
 	already_completed = req->r_got_reply;
-
 	if (!req->r_got_reply) {
-
 		req->r_result = result;
 		dout("handle_reply result %d bytes %d\n", req->r_result,
 		     bytes);