ceph: flush inline version

After converting inline data to normal data, client need to flush
the new i_inline_version (CEPH_INLINE_NONE) to MDS. This commit makes
cap messages (sent to MDS) contain inline_version and inline_data.
Client always converts inline data to normal data before data write,
so the inline data length part is always zero.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 795afe3..b93c631 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -975,10 +975,12 @@
 			kuid_t uid, kgid_t gid, umode_t mode,
 			u64 xattr_version,
 			struct ceph_buffer *xattrs_buf,
-			u64 follows)
+			u64 follows, bool inline_data)
 {
 	struct ceph_mds_caps *fc;
 	struct ceph_msg *msg;
+	void *p;
+	size_t extra_len;
 
 	dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
 	     " seq %u/%u mseq %u follows %lld size %llu/%llu"
@@ -988,7 +990,10 @@
 	     seq, issue_seq, mseq, follows, size, max_size,
 	     xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc), GFP_NOFS, false);
+	/* flock buffer size + inline version + inline data size */
+	extra_len = 4 + 8 + 4;
+	msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
+			   GFP_NOFS, false);
 	if (!msg)
 		return -ENOMEM;
 
@@ -1020,6 +1025,14 @@
 	fc->gid = cpu_to_le32(from_kgid(&init_user_ns, gid));
 	fc->mode = cpu_to_le32(mode);
 
+	p = fc + 1;
+	/* flock buffer size */
+	ceph_encode_32(&p, 0);
+	/* inline version */
+	ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
+	/* inline data size */
+	ceph_encode_32(&p, 0);
+
 	fc->xattr_version = cpu_to_le64(xattr_version);
 	if (xattrs_buf) {
 		msg->middle = ceph_buffer_get(xattrs_buf);
@@ -1126,6 +1139,7 @@
 	u64 flush_tid = 0;
 	int i;
 	int ret;
+	bool inline_data;
 
 	held = cap->issued | cap->implemented;
 	revoking = cap->implemented & ~cap->issued;
@@ -1209,13 +1223,15 @@
 		xattr_version = ci->i_xattrs.version;
 	}
 
+	inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
 	spin_unlock(&ci->i_ceph_lock);
 
 	ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
 		op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
 		size, max_size, &mtime, &atime, time_warp_seq,
 		uid, gid, mode, xattr_version, xattr_blob,
-		follows);
+		follows, inline_data);
 	if (ret < 0) {
 		dout("error sending cap msg, must requeue %p\n", inode);
 		delayed = 1;
@@ -1336,7 +1352,7 @@
 			     capsnap->time_warp_seq,
 			     capsnap->uid, capsnap->gid, capsnap->mode,
 			     capsnap->xattr_version, capsnap->xattr_blob,
-			     capsnap->follows);
+			     capsnap->follows, capsnap->inline_data);
 
 		next_follows = capsnap->follows + 1;
 		ceph_put_cap_snap(capsnap);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index f64576e..ce35fbd 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -516,6 +516,8 @@
 			capsnap->xattr_version = 0;
 		}
 
+		capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
 		/* dirty page count moved from _head to this cap_snap;
 		   all subsequent writes page dirties occur _after_ this
 		   snapshot. */
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 8197a3c..e1aa32d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -161,6 +161,7 @@
 	u64 time_warp_seq;
 	int writing;   /* a sync write is still in progress */
 	int dirty_pages;     /* dirty pages awaiting writeback */
+	bool inline_data;
 };
 
 static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)