ceph: rados pool namespace support

This patch adds codes that decode pool namespace information in
cap message and request reply. Pool namespace is saved in i_layout,
it will be passed to libceph when doing read/write.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index f24722d..0a9406a 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -2779,12 +2779,11 @@
  */
 static void handle_cap_grant(struct ceph_mds_client *mdsc,
 			     struct inode *inode, struct ceph_mds_caps *grant,
-			     u64 inline_version,
-			     void *inline_data, int inline_len,
+			     struct ceph_string **pns, u64 inline_version,
+			     void *inline_data, u32 inline_len,
 			     struct ceph_buffer *xattr_buf,
 			     struct ceph_mds_session *session,
-			     struct ceph_cap *cap, int issued,
-			     u32 pool_ns_len)
+			     struct ceph_cap *cap, int issued)
 	__releases(ci->i_ceph_lock)
 	__releases(mdsc->snap_rwsem)
 {
@@ -2896,11 +2895,18 @@
 	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
 		/* file layout may have changed */
 		s64 old_pool = ci->i_layout.pool_id;
+		struct ceph_string *old_ns;
+
 		ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
-		ci->i_pool_ns_len = pool_ns_len;
-		if (ci->i_layout.pool_id != old_pool)
+		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+					lockdep_is_held(&ci->i_ceph_lock));
+		rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
+
+		if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
 			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
 
+		*pns = old_ns;
+
 		/* size/truncate_seq? */
 		queue_trunc = ceph_fill_file_size(inode, issued,
 					le32_to_cpu(grant->truncate_seq),
@@ -3423,20 +3429,18 @@
 	struct ceph_cap *cap;
 	struct ceph_mds_caps *h;
 	struct ceph_mds_cap_peer *peer = NULL;
-	struct ceph_snap_realm *realm;
+	struct ceph_snap_realm *realm = NULL;
+	struct ceph_string *pool_ns = NULL;
 	int mds = session->s_mds;
 	int op, issued;
 	u32 seq, mseq;
 	struct ceph_vino vino;
-	u64 cap_id;
-	u64 size, max_size;
 	u64 tid;
 	u64 inline_version = 0;
 	void *inline_data = NULL;
 	u32  inline_len = 0;
 	void *snaptrace;
 	size_t snaptrace_len;
-	u32 pool_ns_len = 0;
 	void *p, *end;
 
 	dout("handle_caps from mds%d\n", mds);
@@ -3450,11 +3454,8 @@
 	op = le32_to_cpu(h->op);
 	vino.ino = le64_to_cpu(h->ino);
 	vino.snap = CEPH_NOSNAP;
-	cap_id = le64_to_cpu(h->cap_id);
 	seq = le32_to_cpu(h->seq);
 	mseq = le32_to_cpu(h->migrate_seq);
-	size = le64_to_cpu(h->size);
-	max_size = le64_to_cpu(h->max_size);
 
 	snaptrace = h + 1;
 	snaptrace_len = le32_to_cpu(h->snap_trace_len);
@@ -3493,6 +3494,7 @@
 		u64 flush_tid;
 		u32 caller_uid, caller_gid;
 		u32 osd_epoch_barrier;
+		u32 pool_ns_len;
 		/* version >= 5 */
 		ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
 		/* version >= 6 */
@@ -3502,6 +3504,11 @@
 		ceph_decode_32_safe(&p, end, caller_gid, bad);
 		/* version >= 8 */
 		ceph_decode_32_safe(&p, end, pool_ns_len, bad);
+		if (pool_ns_len > 0) {
+			ceph_decode_need(&p, end, pool_ns_len, bad);
+			pool_ns = ceph_find_or_create_string(p, pool_ns_len);
+			p += pool_ns_len;
+		}
 	}
 
 	/* lookup ino */
@@ -3522,7 +3529,7 @@
 			cap = ceph_get_cap(mdsc, NULL);
 			cap->cap_ino = vino.ino;
 			cap->queue_release = 1;
-			cap->cap_id = cap_id;
+			cap->cap_id = le64_to_cpu(h->cap_id);
 			cap->mseq = mseq;
 			cap->seq = seq;
 			spin_lock(&session->s_cap_lock);
@@ -3557,10 +3564,9 @@
 		}
 		handle_cap_import(mdsc, inode, h, peer, session,
 				  &cap, &issued);
-		handle_cap_grant(mdsc, inode, h,
+		handle_cap_grant(mdsc, inode, h, &pool_ns,
 				 inline_version, inline_data, inline_len,
-				 msg->middle, session, cap, issued,
-				 pool_ns_len);
+				 msg->middle, session, cap, issued);
 		if (realm)
 			ceph_put_snap_realm(mdsc, realm);
 		goto done_unlocked;
@@ -3582,10 +3588,9 @@
 	case CEPH_CAP_OP_GRANT:
 		__ceph_caps_issued(ci, &issued);
 		issued |= __ceph_caps_dirty(ci);
-		handle_cap_grant(mdsc, inode, h,
+		handle_cap_grant(mdsc, inode, h, &pool_ns,
 				 inline_version, inline_data, inline_len,
-				 msg->middle, session, cap, issued,
-				 pool_ns_len);
+				 msg->middle, session, cap, issued);
 		goto done_unlocked;
 
 	case CEPH_CAP_OP_FLUSH_ACK:
@@ -3616,6 +3621,7 @@
 	mutex_unlock(&session->s_mutex);
 done_unlocked:
 	iput(inode);
+	ceph_put_string(pool_ns);
 	return;
 
 bad: