ceph: take snap_rwsem when accessing snap realm's cached_context

When a ceph inode's i_head_snapc is NULL, __ceph_mark_dirty_caps()
accesses the snap realm's cached_context, so we need to take the read
lock of snap_rwsem.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index cd7ffad4..c6f7d9b 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -911,6 +911,7 @@
 	struct inode *inode = d_inode(dentry);
 	struct ceph_vxattr *vxattr;
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
 	int issued;
 	int err;
 	int dirty = 0;
@@ -920,6 +921,7 @@
 	char *newval = NULL;
 	struct ceph_inode_xattr *xattr = NULL;
 	int required_blob_size;
+	bool lock_snap_rwsem = false;
 
 	if (!ceph_is_valid_xattr(name))
 		return -EOPNOTSUPP;
@@ -951,9 +953,20 @@
 	spin_lock(&ci->i_ceph_lock);
 retry:
 	issued = __ceph_caps_issued(ci, NULL);
-	dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
 	if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
 		goto do_sync;
+
+	if (!lock_snap_rwsem && !ci->i_head_snapc) {
+		lock_snap_rwsem = true;
+		if (!down_read_trylock(&mdsc->snap_rwsem)) {
+			spin_unlock(&ci->i_ceph_lock);
+			down_read(&mdsc->snap_rwsem);
+			spin_lock(&ci->i_ceph_lock);
+			goto retry;
+		}
+	}
+
+	dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
 	__build_xattrs(inode);
 
 	required_blob_size = __get_required_blob_size(ci, name_len, val_len);
@@ -966,7 +979,7 @@
 		dout(" preaallocating new blob size=%d\n", required_blob_size);
 		blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
 		if (!blob)
-			goto out;
+			goto do_sync_unlocked;
 		spin_lock(&ci->i_ceph_lock);
 		if (ci->i_xattrs.prealloc_blob)
 			ceph_buffer_put(ci->i_xattrs.prealloc_blob);
@@ -984,6 +997,8 @@
 	}
 
 	spin_unlock(&ci->i_ceph_lock);
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
 	if (dirty)
 		__mark_inode_dirty(inode, dirty);
 	return err;
@@ -991,6 +1006,8 @@
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
 do_sync_unlocked:
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
 	err = ceph_sync_setxattr(dentry, name, value, size, flags);
 out:
 	kfree(newname);
@@ -1044,10 +1061,12 @@
 	struct inode *inode = d_inode(dentry);
 	struct ceph_vxattr *vxattr;
 	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
 	int issued;
 	int err;
 	int required_blob_size;
 	int dirty;
+	bool lock_snap_rwsem = false;
 
 	if (!ceph_is_valid_xattr(name))
 		return -EOPNOTSUPP;
@@ -1064,10 +1083,21 @@
 	spin_lock(&ci->i_ceph_lock);
 retry:
 	issued = __ceph_caps_issued(ci, NULL);
-	dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
-
 	if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
 		goto do_sync;
+
+	if (!lock_snap_rwsem && !ci->i_head_snapc) {
+		lock_snap_rwsem = true;
+		if (!down_read_trylock(&mdsc->snap_rwsem)) {
+			spin_unlock(&ci->i_ceph_lock);
+			down_read(&mdsc->snap_rwsem);
+			spin_lock(&ci->i_ceph_lock);
+			goto retry;
+		}
+	}
+
+	dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
+
 	__build_xattrs(inode);
 
 	required_blob_size = __get_required_blob_size(ci, 0, 0);
@@ -1080,7 +1110,7 @@
 		dout(" preaallocating new blob size=%d\n", required_blob_size);
 		blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
 		if (!blob)
-			goto out;
+			goto do_sync_unlocked;
 		spin_lock(&ci->i_ceph_lock);
 		if (ci->i_xattrs.prealloc_blob)
 			ceph_buffer_put(ci->i_xattrs.prealloc_blob);
@@ -1094,14 +1124,17 @@
 	ci->i_xattrs.dirty = true;
 	inode->i_ctime = CURRENT_TIME;
 	spin_unlock(&ci->i_ceph_lock);
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
 	if (dirty)
 		__mark_inode_dirty(inode, dirty);
 	return err;
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
 do_sync_unlocked:
+	if (lock_snap_rwsem)
+		up_read(&mdsc->snap_rwsem);
 	err = ceph_send_removexattr(dentry, name);
-out:
 	return err;
 }