ceph: rados pool namespace support

This patch adds codes that decode pool namespace information in
cap message and request reply. Pool namespace is saved in i_layout,
it will be passed to libceph when doing read/write.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 3f8efd8..d5b6f95 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1730,7 +1730,8 @@
 	POOL_WRITE	= 2,
 };
 
-static int __ceph_pool_perm_get(struct ceph_inode_info *ci, s64 pool)
+static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
+				s64 pool, struct ceph_string *pool_ns)
 {
 	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1738,6 +1739,7 @@
 	struct rb_node **p, *parent;
 	struct ceph_pool_perm *perm;
 	struct page **pages;
+	size_t pool_ns_len;
 	int err = 0, err2 = 0, have = 0;
 
 	down_read(&mdsc->pool_perm_rwsem);
@@ -1749,17 +1751,31 @@
 		else if (pool > perm->pool)
 			p = &(*p)->rb_right;
 		else {
-			have = perm->perm;
-			break;
+			int ret = ceph_compare_string(pool_ns,
+						perm->pool_ns,
+						perm->pool_ns_len);
+			if (ret < 0)
+				p = &(*p)->rb_left;
+			else if (ret > 0)
+				p = &(*p)->rb_right;
+			else {
+				have = perm->perm;
+				break;
+			}
 		}
 	}
 	up_read(&mdsc->pool_perm_rwsem);
 	if (*p)
 		goto out;
 
-	dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
+	if (pool_ns)
+		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
+		     pool, (int)pool_ns->len, pool_ns->str);
+	else
+		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
 
 	down_write(&mdsc->pool_perm_rwsem);
+	p = &mdsc->pool_perm_tree.rb_node;
 	parent = NULL;
 	while (*p) {
 		parent = *p;
@@ -1769,8 +1785,17 @@
 		else if (pool > perm->pool)
 			p = &(*p)->rb_right;
 		else {
-			have = perm->perm;
-			break;
+			int ret = ceph_compare_string(pool_ns,
+						perm->pool_ns,
+						perm->pool_ns_len);
+			if (ret < 0)
+				p = &(*p)->rb_left;
+			else if (ret > 0)
+				p = &(*p)->rb_right;
+			else {
+				have = perm->perm;
+				break;
+			}
 		}
 	}
 	if (*p) {
@@ -1788,6 +1813,8 @@
 	rd_req->r_flags = CEPH_OSD_FLAG_READ;
 	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
 	rd_req->r_base_oloc.pool = pool;
+	if (pool_ns)
+		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
 	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);
 
 	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
@@ -1841,7 +1868,8 @@
 		goto out_unlock;
 	}
 
-	perm = kmalloc(sizeof(*perm), GFP_NOFS);
+	pool_ns_len = pool_ns ? pool_ns->len : 0;
+	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
 	if (!perm) {
 		err = -ENOMEM;
 		goto out_unlock;
@@ -1849,6 +1877,11 @@
 
 	perm->pool = pool;
 	perm->perm = have;
+	perm->pool_ns_len = pool_ns_len;
+	if (pool_ns_len > 0)
+		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
+	perm->pool_ns[pool_ns_len] = 0;
+
 	rb_link_node(&perm->node, parent, p);
 	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
 	err = 0;
@@ -1860,19 +1893,20 @@
 out:
 	if (!err)
 		err = have;
-	dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
+	if (pool_ns)
+		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
+		     pool, (int)pool_ns->len, pool_ns->str, err);
+	else
+		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
 	return err;
 }
 
 int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
 {
 	s64 pool;
+	struct ceph_string *pool_ns;
 	int ret, flags;
 
-	/* does not support pool namespace yet */
-	if (ci->i_pool_ns_len)
-		return -EIO;
-
 	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
 				NOPOOLPERM))
 		return 0;
@@ -1896,7 +1930,9 @@
 		return 0;
 	}
 
-	ret = __ceph_pool_perm_get(ci, pool);
+	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
+	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
+	ceph_put_string(pool_ns);
 	if (ret < 0)
 		return ret;
 
@@ -1907,8 +1943,9 @@
 		flags |= CEPH_I_POOL_WR;
 
 	spin_lock(&ci->i_ceph_lock);
-	if (pool == ci->i_layout.pool_id) {
-		ci->i_ceph_flags = flags;
+	if (pool == ci->i_layout.pool_id &&
+	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
+		ci->i_ceph_flags |= flags;
         } else {
 		pool = ci->i_layout.pool_id;
 		flags = ci->i_ceph_flags;
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index f24722d..0a9406a 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2779,12 +2779,11 @@
  */
 static void handle_cap_grant(struct ceph_mds_client *mdsc,
 			     struct inode *inode, struct ceph_mds_caps *grant,
-			     u64 inline_version,
-			     void *inline_data, int inline_len,
+			     struct ceph_string **pns, u64 inline_version,
+			     void *inline_data, u32 inline_len,
 			     struct ceph_buffer *xattr_buf,
 			     struct ceph_mds_session *session,
-			     struct ceph_cap *cap, int issued,
-			     u32 pool_ns_len)
+			     struct ceph_cap *cap, int issued)
 	__releases(ci->i_ceph_lock)
 	__releases(mdsc->snap_rwsem)
 {
@@ -2896,11 +2895,18 @@
 	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
 		/* file layout may have changed */
 		s64 old_pool = ci->i_layout.pool_id;
+		struct ceph_string *old_ns;
+
 		ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
-		ci->i_pool_ns_len = pool_ns_len;
-		if (ci->i_layout.pool_id != old_pool)
+		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+					lockdep_is_held(&ci->i_ceph_lock));
+		rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
+
+		if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
 			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
 
+		*pns = old_ns;
+
 		/* size/truncate_seq? */
 		queue_trunc = ceph_fill_file_size(inode, issued,
 					le32_to_cpu(grant->truncate_seq),
@@ -3423,20 +3429,18 @@
 	struct ceph_cap *cap;
 	struct ceph_mds_caps *h;
 	struct ceph_mds_cap_peer *peer = NULL;
-	struct ceph_snap_realm *realm;
+	struct ceph_snap_realm *realm = NULL;
+	struct ceph_string *pool_ns = NULL;
 	int mds = session->s_mds;
 	int op, issued;
 	u32 seq, mseq;
 	struct ceph_vino vino;
-	u64 cap_id;
-	u64 size, max_size;
 	u64 tid;
 	u64 inline_version = 0;
 	void *inline_data = NULL;
 	u32  inline_len = 0;
 	void *snaptrace;
 	size_t snaptrace_len;
-	u32 pool_ns_len = 0;
 	void *p, *end;
 
 	dout("handle_caps from mds%d\n", mds);
@@ -3450,11 +3454,8 @@
 	op = le32_to_cpu(h->op);
 	vino.ino = le64_to_cpu(h->ino);
 	vino.snap = CEPH_NOSNAP;
-	cap_id = le64_to_cpu(h->cap_id);
 	seq = le32_to_cpu(h->seq);
 	mseq = le32_to_cpu(h->migrate_seq);
-	size = le64_to_cpu(h->size);
-	max_size = le64_to_cpu(h->max_size);
 
 	snaptrace = h + 1;
 	snaptrace_len = le32_to_cpu(h->snap_trace_len);
@@ -3493,6 +3494,7 @@
 		u64 flush_tid;
 		u32 caller_uid, caller_gid;
 		u32 osd_epoch_barrier;
+		u32 pool_ns_len;
 		/* version >= 5 */
 		ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
 		/* version >= 6 */
@@ -3502,6 +3504,11 @@
 		ceph_decode_32_safe(&p, end, caller_gid, bad);
 		/* version >= 8 */
 		ceph_decode_32_safe(&p, end, pool_ns_len, bad);
+		if (pool_ns_len > 0) {
+			ceph_decode_need(&p, end, pool_ns_len, bad);
+			pool_ns = ceph_find_or_create_string(p, pool_ns_len);
+			p += pool_ns_len;
+		}
 	}
 
 	/* lookup ino */
@@ -3522,7 +3529,7 @@
 			cap = ceph_get_cap(mdsc, NULL);
 			cap->cap_ino = vino.ino;
 			cap->queue_release = 1;
-			cap->cap_id = cap_id;
+			cap->cap_id = le64_to_cpu(h->cap_id);
 			cap->mseq = mseq;
 			cap->seq = seq;
 			spin_lock(&session->s_cap_lock);
@@ -3557,10 +3564,9 @@
 		}
 		handle_cap_import(mdsc, inode, h, peer, session,
 				  &cap, &issued);
-		handle_cap_grant(mdsc, inode, h,
+		handle_cap_grant(mdsc, inode, h, &pool_ns,
 				 inline_version, inline_data, inline_len,
-				 msg->middle, session, cap, issued,
-				 pool_ns_len);
+				 msg->middle, session, cap, issued);
 		if (realm)
 			ceph_put_snap_realm(mdsc, realm);
 		goto done_unlocked;
@@ -3582,10 +3588,9 @@
 	case CEPH_CAP_OP_GRANT:
 		__ceph_caps_issued(ci, &issued);
 		issued |= __ceph_caps_dirty(ci);
-		handle_cap_grant(mdsc, inode, h,
+		handle_cap_grant(mdsc, inode, h, &pool_ns,
 				 inline_version, inline_data, inline_len,
-				 msg->middle, session, cap, issued,
-				 pool_ns_len);
+				 msg->middle, session, cap, issued);
 		goto done_unlocked;
 
 	case CEPH_CAP_OP_FLUSH_ACK:
@@ -3616,6 +3621,7 @@
 	mutex_unlock(&session->s_mutex);
 done_unlocked:
 	iput(inode);
+	ceph_put_string(pool_ns);
 	return;
 
 bad:
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index d035e0a..dc03256 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -447,7 +447,6 @@
 
 	memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
 	RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
-	ci->i_pool_ns_len = 0;
 
 	ci->i_fragtree = RB_ROOT;
 	mutex_init(&ci->i_fragtree_mutex);
@@ -571,7 +570,7 @@
 	if (ci->i_xattrs.prealloc_blob)
 		ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 
-	ceph_put_string(ci->i_layout.pool_ns);
+	ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
 
 	call_rcu(&inode->i_rcu, ceph_i_callback);
 }
@@ -736,6 +735,7 @@
 	int issued = 0, implemented, new_issued;
 	struct timespec mtime, atime, ctime;
 	struct ceph_buffer *xattr_blob = NULL;
+	struct ceph_string *pool_ns = NULL;
 	struct ceph_cap *new_cap = NULL;
 	int err = 0;
 	bool wake = false;
@@ -763,6 +763,10 @@
 			       iinfo->xattr_len);
 	}
 
+	if (iinfo->pool_ns_len > 0)
+		pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
+						     iinfo->pool_ns_len);
+
 	spin_lock(&ci->i_ceph_lock);
 
 	/*
@@ -818,11 +822,18 @@
 	if (new_version ||
 	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
 		s64 old_pool = ci->i_layout.pool_id;
+		struct ceph_string *old_ns;
+
 		ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
-		ci->i_pool_ns_len = iinfo->pool_ns_len;
-		if (ci->i_layout.pool_id != old_pool)
+		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+					lockdep_is_held(&ci->i_ceph_lock));
+		rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);
+
+		if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
 			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
 
+		pool_ns = old_ns;
+
 		queue_trunc = ceph_fill_file_size(inode, issued,
 					le32_to_cpu(info->truncate_seq),
 					le64_to_cpu(info->truncate_size),
@@ -989,6 +1000,7 @@
 		ceph_put_cap(mdsc, new_cap);
 	if (xattr_blob)
 		ceph_buffer_put(xattr_blob);
+	ceph_put_string(pool_ns);
 	return err;
 }
 
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index 843dd31a0..6a30101 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -213,9 +213,12 @@
 		 ceph_ino(inode), dl.object_no);
 
 	oloc.pool = ci->i_layout.pool_id;
+	oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
 	ceph_oid_printf(&oid, "%s", dl.object_name);
 
 	r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
+
+	ceph_oloc_destroy(&oloc);
 	if (r < 0) {
 		up_read(&osdc->lock);
 		return r;
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 2103b82..46641bb 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -100,12 +100,15 @@
 	} else
 		info->inline_version = CEPH_INLINE_NONE;
 
+	info->pool_ns_len = 0;
+	info->pool_ns_data = NULL;
 	if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
 		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
-		ceph_decode_need(p, end, info->pool_ns_len, bad);
-		*p += info->pool_ns_len;
-	} else {
-		info->pool_ns_len = 0;
+		if (info->pool_ns_len > 0) {
+			ceph_decode_need(p, end, info->pool_ns_len, bad);
+			info->pool_ns_data = *p;
+			*p += info->pool_ns_len;
+		}
 	}
 
 	return 0;
@@ -2292,14 +2295,6 @@
 		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
 				  CEPH_CAP_PIN);
 
-	/* deny access to directories with pool_ns layouts */
-	if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
-	    ceph_inode(req->r_inode)->i_pool_ns_len)
-		return -EIO;
-	if (req->r_locked_dir &&
-	    ceph_inode(req->r_locked_dir)->i_pool_ns_len)
-		return -EIO;
-
 	/* issue */
 	mutex_lock(&mdsc->mutex);
 	__register_request(mdsc, req, dir);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index 75ecf96..2ce8e9f 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -45,6 +45,7 @@
 	u32 inline_len;
 	char *inline_data;
 	u32 pool_ns_len;
+	char *pool_ns_data;
 };
 
 struct ceph_mds_reply_dir_entry {
@@ -277,6 +278,8 @@
 	struct rb_node node;
 	int perm;
 	s64 pool;
+	size_t pool_ns_len;
+	char pool_ns[];
 };
 
 /*
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 0168b49..7ceab18 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -287,7 +287,6 @@
 
 	struct ceph_dir_layout i_dir_layout;
 	struct ceph_file_layout i_layout;
-	size_t i_pool_ns_len;
 	char *i_symlink;
 
 	/* for dirs */
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 5377c9c..adc2318 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -57,56 +57,69 @@
 
 static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
 {
-	size_t s;
-	char *p = (char *)&ci->i_layout;
-
-	for (s = 0; s < sizeof(ci->i_layout); s++, p++)
-		if (*p)
-			return true;
-	return false;
+	struct ceph_file_layout *fl = &ci->i_layout;
+	return (fl->stripe_unit > 0 || fl->stripe_count > 0 ||
+		fl->object_size > 0 || fl->pool_id >= 0 ||
+		rcu_dereference_raw(fl->pool_ns) != NULL);
 }
 
 static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
 				   size_t size)
 {
-	int ret;
 	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
 	struct ceph_osd_client *osdc = &fsc->client->osdc;
+	struct ceph_string *pool_ns;
 	s64 pool = ci->i_layout.pool_id;
 	const char *pool_name;
+	const char *ns_field = " pool_namespace=";
 	char buf[128];
+	size_t len, total_len = 0;
+	int ret;
+
+	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
 
 	dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
 	down_read(&osdc->lock);
 	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
 	if (pool_name) {
-		size_t len = strlen(pool_name);
-		ret = snprintf(buf, sizeof(buf),
+		len = snprintf(buf, sizeof(buf),
 		"stripe_unit=%u stripe_count=%u object_size=%u pool=",
 		ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
 	        ci->i_layout.object_size);
-		if (!size) {
-			ret += len;
-		} else if (ret + len > size) {
-			ret = -ERANGE;
-		} else {
-			memcpy(val, buf, ret);
-			memcpy(val + ret, pool_name, len);
-			ret += len;
-		}
+		total_len = len + strlen(pool_name);
 	} else {
-		ret = snprintf(buf, sizeof(buf),
+		len = snprintf(buf, sizeof(buf),
 		"stripe_unit=%u stripe_count=%u object_size=%u pool=%lld",
 		ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
 	        ci->i_layout.object_size, (unsigned long long)pool);
-		if (size) {
-			if (ret <= size)
-				memcpy(val, buf, ret);
-			else
-				ret = -ERANGE;
+		total_len = len;
+	}
+
+	if (pool_ns)
+		total_len += strlen(ns_field) + pool_ns->len;
+
+	if (!size) {
+		ret = total_len;
+	} else if (total_len > size) {
+		ret = -ERANGE;
+	} else {
+		memcpy(val, buf, len);
+		ret = len;
+		if (pool_name) {
+			len = strlen(pool_name);
+			memcpy(val + ret, pool_name, len);
+			ret += len;
+		}
+		if (pool_ns) {
+			len = strlen(ns_field);
+			memcpy(val + ret, ns_field, len);
+			ret += len;
+			memcpy(val + ret, pool_ns->str, pool_ns->len);
+			ret += pool_ns->len;
 		}
 	}
 	up_read(&osdc->lock);
+	ceph_put_string(pool_ns);
 	return ret;
 }
 
@@ -147,6 +160,18 @@
 	return ret;
 }
 
+static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
+						  char *val, size_t size)
+{
+	int ret = 0;
+	struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns);
+	if (ns) {
+		ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str);
+		ceph_put_string(ns);
+	}
+	return ret;
+}
+
 /* directories */
 
 static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
@@ -235,6 +260,7 @@
 	XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
 	XATTR_LAYOUT_FIELD(dir, layout, object_size),
 	XATTR_LAYOUT_FIELD(dir, layout, pool),
+	XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
 	XATTR_NAME_CEPH(dir, entries),
 	XATTR_NAME_CEPH(dir, files),
 	XATTR_NAME_CEPH(dir, subdirs),
@@ -262,6 +288,7 @@
 	XATTR_LAYOUT_FIELD(file, layout, stripe_count),
 	XATTR_LAYOUT_FIELD(file, layout, object_size),
 	XATTR_LAYOUT_FIELD(file, layout, pool),
+	XATTR_LAYOUT_FIELD(file, layout, pool_namespace),
 	{ .name = NULL, 0 }	/* Required table terminator */
 };
 static size_t ceph_file_vxattrs_name_size;	/* total size of all names */