libceph: rados pool namespace support

Add pool namespace pointer to struct ceph_file_layout and struct
ceph_object_locator. Pool namespace is used when mapping an object
to a PG; it's also used when composing an OSD request.

The namespace pointer in struct ceph_file_layout is RCU protected,
so libceph can read the namespace without taking a lock.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
[idryomov@gmail.com: ceph_oloc_destroy(), misc minor changes]
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index e77b04c..c62b2b0 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -156,8 +156,16 @@
 	seq_printf(s, "]/%d\t[", t->up.primary);
 	for (i = 0; i < t->acting.size; i++)
 		seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
-	seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
-		   t->target_oid.name_len, t->target_oid.name, t->flags);
+	seq_printf(s, "]/%d\t", t->acting.primary);
+	if (t->target_oloc.pool_ns) {
+		seq_printf(s, "%*pE/%*pE\t0x%x",
+			(int)t->target_oloc.pool_ns->len,
+			t->target_oloc.pool_ns->str,
+			t->target_oid.name_len, t->target_oid.name, t->flags);
+	} else {
+		seq_printf(s, "%*pE\t0x%x", t->target_oid.name_len,
+			t->target_oid.name, t->flags);
+	}
 	if (t->paused)
 		seq_puts(s, "\tP");
 }
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 23efcac..b68cc15 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -387,7 +387,9 @@
 static void target_destroy(struct ceph_osd_request_target *t)
 {
 	ceph_oid_destroy(&t->base_oid);
+	ceph_oloc_destroy(&t->base_oloc);
 	ceph_oid_destroy(&t->target_oid);
+	ceph_oloc_destroy(&t->target_oloc);
 }
 
 /*
@@ -533,6 +535,11 @@
 }
 EXPORT_SYMBOL(ceph_osdc_alloc_request);
 
+static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
+{
+	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
+}
+
 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
@@ -540,11 +547,13 @@
 	int msg_size;
 
 	WARN_ON(ceph_oid_empty(&req->r_base_oid));
+	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 
 	/* create request message */
 	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
 	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
-	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+	msg_size += CEPH_ENCODING_START_BLK_LEN +
+			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
 	msg_size += 1 + 8 + 4 + 4; /* pgid */
 	msg_size += 4 + req->r_base_oid.name_len; /* oid */
 	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
@@ -949,6 +958,7 @@
 
 	req->r_flags = flags;
 	req->r_base_oloc.pool = layout->pool_id;
+	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
 	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
 
 	req->r_snapid = vino.snap;
@@ -1489,12 +1499,16 @@
 	p += sizeof(req->r_replay_version);
 
 	/* oloc */
-	ceph_encode_8(&p, 4);
-	ceph_encode_8(&p, 4);
-	ceph_encode_32(&p, 8 + 4 + 4);
+	ceph_start_encoding(&p, 5, 4,
+			    ceph_oloc_encoding_size(&req->r_t.target_oloc));
 	ceph_encode_64(&p, req->r_t.target_oloc.pool);
 	ceph_encode_32(&p, -1); /* preferred */
 	ceph_encode_32(&p, 0); /* key len */
+	if (req->r_t.target_oloc.pool_ns)
+		ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
+				   req->r_t.target_oloc.pool_ns->len);
+	else
+		ceph_encode_32(&p, 0);
 
 	/* pgid */
 	ceph_encode_8(&p, 1);
@@ -2596,8 +2610,8 @@
 	if (struct_v >= 5) {
 		len = ceph_decode_32(p);
 		if (len > 0) {
-			pr_warn("ceph_object_locator::nspace is set\n");
-			goto e_inval;
+			ceph_decode_need(p, end, len, e_inval);
+			*p += len;
 		}
 	}
 
@@ -2835,7 +2849,11 @@
 		unlink_request(osd, req);
 		mutex_unlock(&osd->lock);
 
-		ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
+		/*
+		 * Not ceph_oloc_copy() - changing pool_ns is not
+		 * supported.
+		 */
+		req->r_t.target_oloc.pool = m.redirect.oloc.pool;
 		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
 		req->r_tid = 0;
 		__submit_request(req, false);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 5947b2e..d243688 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -1510,6 +1510,24 @@
 	return ERR_PTR(err);
 }
 
+void ceph_oloc_copy(struct ceph_object_locator *dest,
+		    const struct ceph_object_locator *src)
+{
+	WARN_ON(!ceph_oloc_empty(dest));
+	WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
+
+	dest->pool = src->pool;
+	if (src->pool_ns)
+		dest->pool_ns = ceph_get_string(src->pool_ns);
+}
+EXPORT_SYMBOL(ceph_oloc_copy);
+
+void ceph_oloc_destroy(struct ceph_object_locator *oloc)
+{
+	ceph_put_string(oloc->pool_ns);
+}
+EXPORT_SYMBOL(ceph_oloc_destroy);
+
 void ceph_oid_copy(struct ceph_object_id *dest,
 		   const struct ceph_object_id *src)
 {
@@ -1844,12 +1862,34 @@
 	if (!pi)
 		return -ENOENT;
 
-	raw_pgid->pool = oloc->pool;
-	raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
-				       oid->name_len);
+	if (!oloc->pool_ns) {
+		raw_pgid->pool = oloc->pool;
+		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
+					     oid->name_len);
+		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
+		     raw_pgid->pool, raw_pgid->seed);
+	} else {
+		char stack_buf[256];
+		char *buf = stack_buf;
+		int nsl = oloc->pool_ns->len;
+		size_t total = nsl + 1 + oid->name_len;
 
-	dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
-	     raw_pgid->pool, raw_pgid->seed);
+		if (total > sizeof(stack_buf)) {
+			buf = kmalloc(total, GFP_NOIO);
+			if (!buf)
+				return -ENOMEM;
+		}
+		memcpy(buf, oloc->pool_ns->str, nsl);
+		buf[nsl] = '\037';
+		memcpy(buf + nsl + 1, oid->name, oid->name_len);
+		raw_pgid->pool = oloc->pool;
+		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
+		if (buf != stack_buf)
+			kfree(buf);
+		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
+		     oid->name, nsl, oloc->pool_ns->str,
+		     raw_pgid->pool, raw_pgid->seed);
+	}
 	return 0;
 }
 EXPORT_SYMBOL(ceph_object_locator_to_pg);