Merge tag 'ceph-for-4.8-rc1' of git://github.com/ceph/ceph-client

Pull Ceph updates from Ilya Dryomov:
 "The highlights are:

   - RADOS namespace support in libceph and CephFS (Zheng Yan and
     myself).  The stopgaps added in 4.5 to deny access to inodes in
     namespaces are removed and CEPH_FEATURE_FS_FILE_LAYOUT_V2 feature
     bit is now fully supported

   - A large rework of the MDS cap flushing code (Zheng Yan)

   - Handle some of ->d_revalidate() in RCU mode (Jeff Layton).  We were
     overly pessimistic before, bailing at the first sight of LOOKUP_RCU

  On top of that we've got a few CephFS bug fixes, a couple of cleanups
  and Arnd's workaround for a weird genksyms issue"

* tag 'ceph-for-4.8-rc1' of git://github.com/ceph/ceph-client: (34 commits)
  ceph: fix symbol versioning for ceph_monc_do_statfs
  ceph: Correctly return NXIO errors from ceph_llseek
  ceph: Mark the file cache as unreclaimable
  ceph: optimize cap flush waiting
  ceph: cleanup ceph_flush_snaps()
  ceph: kick cap flushes before sending other cap message
  ceph: introduce an inode flag to indicates if snapflush is needed
  ceph: avoid sending duplicated cap flush message
  ceph: unify cap flush and snapcap flush
  ceph: use list instead of rbtree to track cap flushes
  ceph: update types of some local varibles
  ceph: include 'follows' of pending snapflush in cap reconnect message
  ceph: update cap reconnect message to version 3
  ceph: mount non-default filesystem by name
  libceph: fsmap.user subscription support
  ceph: handle LOOKUP_RCU in ceph_d_revalidate
  ceph: allow dentry_lease_is_valid to work under RCU walk
  ceph: clear d_fsinfo pointer under d_lock
  ceph: remove ceph_mdsc_lease_release
  ceph: don't use ->d_time
  ...
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 4506620..1a04af6 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1937,7 +1937,7 @@
 	osd_req->r_callback = rbd_osd_req_callback;
 	osd_req->r_priv = obj_request;
 
-	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
+	osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
 	if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
 			     obj_request->object_name))
 		goto fail;
@@ -1991,7 +1991,7 @@
 	osd_req->r_callback = rbd_osd_req_callback;
 	osd_req->r_priv = obj_request;
 
-	osd_req->r_base_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
+	osd_req->r_base_oloc.pool = rbd_dev->layout.pool_id;
 	if (ceph_oid_aprintf(&osd_req->r_base_oid, GFP_NOIO, "%s",
 			     obj_request->object_name))
 		goto fail;
@@ -3995,10 +3995,11 @@
 
 	/* Initialize the layout used for all rbd requests */
 
-	rbd_dev->layout.fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	rbd_dev->layout.fl_stripe_count = cpu_to_le32(1);
-	rbd_dev->layout.fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	rbd_dev->layout.fl_pg_pool = cpu_to_le32((u32) spec->pool_id);
+	rbd_dev->layout.stripe_unit = 1 << RBD_MAX_OBJ_ORDER;
+	rbd_dev->layout.stripe_count = 1;
+	rbd_dev->layout.object_size = 1 << RBD_MAX_OBJ_ORDER;
+	rbd_dev->layout.pool_id = spec->pool_id;
+	RCU_INIT_POINTER(rbd_dev->layout.pool_ns, NULL);
 
 	/*
 	 * If this is a mapping rbd_dev (as opposed to a parent one),
@@ -5187,7 +5188,7 @@
 
 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 
-	rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
+	rbd_dev->header_oloc.pool = rbd_dev->layout.pool_id;
 	if (rbd_dev->image_format == 1)
 		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
 				       spec->image_name, RBD_SUFFIX);
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 26a9d10..d5b6f95 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1730,7 +1730,8 @@
 	POOL_WRITE	= 2,
 };
 
-static int __ceph_pool_perm_get(struct ceph_inode_info *ci, u32 pool)
+static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
+				s64 pool, struct ceph_string *pool_ns)
 {
 	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
@@ -1738,6 +1739,7 @@
 	struct rb_node **p, *parent;
 	struct ceph_pool_perm *perm;
 	struct page **pages;
+	size_t pool_ns_len;
 	int err = 0, err2 = 0, have = 0;
 
 	down_read(&mdsc->pool_perm_rwsem);
@@ -1749,17 +1751,31 @@
 		else if (pool > perm->pool)
 			p = &(*p)->rb_right;
 		else {
-			have = perm->perm;
-			break;
+			int ret = ceph_compare_string(pool_ns,
+						perm->pool_ns,
+						perm->pool_ns_len);
+			if (ret < 0)
+				p = &(*p)->rb_left;
+			else if (ret > 0)
+				p = &(*p)->rb_right;
+			else {
+				have = perm->perm;
+				break;
+			}
 		}
 	}
 	up_read(&mdsc->pool_perm_rwsem);
 	if (*p)
 		goto out;
 
-	dout("__ceph_pool_perm_get pool %u no perm cached\n", pool);
+	if (pool_ns)
+		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
+		     pool, (int)pool_ns->len, pool_ns->str);
+	else
+		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);
 
 	down_write(&mdsc->pool_perm_rwsem);
+	p = &mdsc->pool_perm_tree.rb_node;
 	parent = NULL;
 	while (*p) {
 		parent = *p;
@@ -1769,8 +1785,17 @@
 		else if (pool > perm->pool)
 			p = &(*p)->rb_right;
 		else {
-			have = perm->perm;
-			break;
+			int ret = ceph_compare_string(pool_ns,
+						perm->pool_ns,
+						perm->pool_ns_len);
+			if (ret < 0)
+				p = &(*p)->rb_left;
+			else if (ret > 0)
+				p = &(*p)->rb_right;
+			else {
+				have = perm->perm;
+				break;
+			}
 		}
 	}
 	if (*p) {
@@ -1788,6 +1813,8 @@
 	rd_req->r_flags = CEPH_OSD_FLAG_READ;
 	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
 	rd_req->r_base_oloc.pool = pool;
+	if (pool_ns)
+		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
 	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);
 
 	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
@@ -1841,7 +1868,8 @@
 		goto out_unlock;
 	}
 
-	perm = kmalloc(sizeof(*perm), GFP_NOFS);
+	pool_ns_len = pool_ns ? pool_ns->len : 0;
+	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
 	if (!perm) {
 		err = -ENOMEM;
 		goto out_unlock;
@@ -1849,6 +1877,11 @@
 
 	perm->pool = pool;
 	perm->perm = have;
+	perm->pool_ns_len = pool_ns_len;
+	if (pool_ns_len > 0)
+		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
+	perm->pool_ns[pool_ns_len] = 0;
+
 	rb_link_node(&perm->node, parent, p);
 	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
 	err = 0;
@@ -1860,43 +1893,46 @@
 out:
 	if (!err)
 		err = have;
-	dout("__ceph_pool_perm_get pool %u result = %d\n", pool, err);
+	if (pool_ns)
+		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
+		     pool, (int)pool_ns->len, pool_ns->str, err);
+	else
+		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
 	return err;
 }
 
 int ceph_pool_perm_check(struct ceph_inode_info *ci, int need)
 {
-	u32 pool;
+	s64 pool;
+	struct ceph_string *pool_ns;
 	int ret, flags;
 
-	/* does not support pool namespace yet */
-	if (ci->i_pool_ns_len)
-		return -EIO;
-
 	if (ceph_test_mount_opt(ceph_inode_to_client(&ci->vfs_inode),
 				NOPOOLPERM))
 		return 0;
 
 	spin_lock(&ci->i_ceph_lock);
 	flags = ci->i_ceph_flags;
-	pool = ceph_file_layout_pg_pool(ci->i_layout);
+	pool = ci->i_layout.pool_id;
 	spin_unlock(&ci->i_ceph_lock);
 check:
 	if (flags & CEPH_I_POOL_PERM) {
 		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
-			dout("ceph_pool_perm_check pool %u no read perm\n",
+			dout("ceph_pool_perm_check pool %lld no read perm\n",
 			     pool);
 			return -EPERM;
 		}
 		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
-			dout("ceph_pool_perm_check pool %u no write perm\n",
+			dout("ceph_pool_perm_check pool %lld no write perm\n",
 			     pool);
 			return -EPERM;
 		}
 		return 0;
 	}
 
-	ret = __ceph_pool_perm_get(ci, pool);
+	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
+	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
+	ceph_put_string(pool_ns);
 	if (ret < 0)
 		return ret;
 
@@ -1907,10 +1943,11 @@
 		flags |= CEPH_I_POOL_WR;
 
 	spin_lock(&ci->i_ceph_lock);
-	if (pool == ceph_file_layout_pg_pool(ci->i_layout)) {
-		ci->i_ceph_flags = flags;
+	if (pool == ci->i_layout.pool_id &&
+	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
+		ci->i_ceph_flags |= flags;
         } else {
-		pool = ceph_file_layout_pg_pool(ci->i_layout);
+		pool = ci->i_layout.pool_id;
 		flags = ci->i_ceph_flags;
 	}
 	spin_unlock(&ci->i_ceph_lock);
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 238c55b..5bc5d37 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -71,7 +71,7 @@
 					      &ceph_fscache_fsid_object_def,
 					      fsc, true);
 	if (!fsc->fscache)
-		pr_err("Unable to resgister fsid: %p fscache cookie", fsc);
+		pr_err("Unable to register fsid: %p fscache cookie\n", fsc);
 
 	return 0;
 }
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 6f60d0a..99115ca 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -40,6 +40,11 @@
  * cluster to release server state.
  */
 
+static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
+static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_session *session,
+				 struct ceph_inode_info *ci,
+				 u64 oldest_flush_tid);
 
 /*
  * Generate readable cap strings for debugging output.
@@ -849,12 +854,14 @@
  */
 int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
 {
-	int want = 0;
-	int mode;
-	for (mode = 0; mode < CEPH_FILE_MODE_NUM; mode++)
-		if (ci->i_nr_by_mode[mode])
-			want |= ceph_caps_for_mode(mode);
-	return want;
+	int i, bits = 0;
+	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
+		if (ci->i_nr_by_mode[i])
+			bits |= 1 << i;
+	}
+	if (bits == 0)
+		return 0;
+	return ceph_caps_for_mode(bits >> 1);
 }
 
 /*
@@ -991,7 +998,7 @@
 			u32 seq, u64 flush_tid, u64 oldest_flush_tid,
 			u32 issue_seq, u32 mseq, u64 size, u64 max_size,
 			struct timespec *mtime, struct timespec *atime,
-			struct timespec *ctime, u64 time_warp_seq,
+			struct timespec *ctime, u32 time_warp_seq,
 			kuid_t uid, kgid_t gid, umode_t mode,
 			u64 xattr_version,
 			struct ceph_buffer *xattrs_buf,
@@ -1116,8 +1123,8 @@
 	struct inode *inode = &ci->vfs_inode;
 	u64 cap_id = cap->cap_id;
 	int held, revoking, dropping, keep;
-	u64 seq, issue_seq, mseq, time_warp_seq, follows;
-	u64 size, max_size;
+	u64 follows, size, max_size;
+	u32 seq, issue_seq, mseq, time_warp_seq;
 	struct timespec mtime, atime, ctime;
 	int wake = 0;
 	umode_t mode;
@@ -1215,6 +1222,22 @@
 	return delayed;
 }
 
+static inline int __send_flush_snap(struct inode *inode,
+				    struct ceph_mds_session *session,
+				    struct ceph_cap_snap *capsnap,
+				    u32 mseq, u64 oldest_flush_tid)
+{
+	return send_cap_msg(session, ceph_vino(inode).ino, 0,
+			CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
+			capsnap->dirty, 0, capsnap->cap_flush.tid,
+			oldest_flush_tid, 0, mseq, capsnap->size, 0,
+			&capsnap->mtime, &capsnap->atime,
+			&capsnap->ctime, capsnap->time_warp_seq,
+			capsnap->uid, capsnap->gid, capsnap->mode,
+			capsnap->xattr_version, capsnap->xattr_blob,
+			capsnap->follows, capsnap->inline_data);
+}
+
 /*
  * When a snapshot is taken, clients accumulate dirty metadata on
  * inodes with capabilities in ceph_cap_snaps to describe the file
@@ -1222,37 +1245,22 @@
  * asynchronously back to the MDS once sync writes complete and dirty
  * data is written out.
  *
- * Unless @kick is true, skip cap_snaps that were already sent to
- * the MDS (i.e., during this session).
- *
  * Called under i_ceph_lock.  Takes s_mutex as needed.
  */
-void __ceph_flush_snaps(struct ceph_inode_info *ci,
-			struct ceph_mds_session **psession,
-			int kick)
+static void __ceph_flush_snaps(struct ceph_inode_info *ci,
+			       struct ceph_mds_session *session)
 		__releases(ci->i_ceph_lock)
 		__acquires(ci->i_ceph_lock)
 {
 	struct inode *inode = &ci->vfs_inode;
-	int mds;
+	struct ceph_mds_client *mdsc = session->s_mdsc;
 	struct ceph_cap_snap *capsnap;
-	u32 mseq;
-	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
-	struct ceph_mds_session *session = NULL; /* if session != NULL, we hold
-						    session->s_mutex */
-	u64 next_follows = 0;  /* keep track of how far we've gotten through the
-			     i_cap_snaps list, and skip these entries next time
-			     around to avoid an infinite loop */
+	u64 oldest_flush_tid = 0;
+	u64 first_tid = 1, last_tid = 0;
 
-	if (psession)
-		session = *psession;
+	dout("__flush_snaps %p session %p\n", inode, session);
 
-	dout("__flush_snaps %p\n", inode);
-retry:
 	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
-		/* avoid an infiniute loop after retry */
-		if (capsnap->follows < next_follows)
-			continue;
 		/*
 		 * we need to wait for sync writes to complete and for dirty
 		 * pages to be written out.
@@ -1263,97 +1271,129 @@
 		/* should be removed by ceph_try_drop_cap_snap() */
 		BUG_ON(!capsnap->need_flush);
 
-		/* pick mds, take s_mutex */
-		if (ci->i_auth_cap == NULL) {
-			dout("no auth cap (migrating?), doing nothing\n");
-			goto out;
-		}
-
 		/* only flush each capsnap once */
-		if (!kick && !list_empty(&capsnap->flushing_item)) {
-			dout("already flushed %p, skipping\n", capsnap);
+		if (capsnap->cap_flush.tid > 0) {
+			dout(" already flushed %p, skipping\n", capsnap);
 			continue;
 		}
 
-		mds = ci->i_auth_cap->session->s_mds;
-		mseq = ci->i_auth_cap->mseq;
-
-		if (session && session->s_mds != mds) {
-			dout("oops, wrong session %p mutex\n", session);
-			if (kick)
-				goto out;
-
-			mutex_unlock(&session->s_mutex);
-			ceph_put_mds_session(session);
-			session = NULL;
-		}
-		if (!session) {
-			spin_unlock(&ci->i_ceph_lock);
-			mutex_lock(&mdsc->mutex);
-			session = __ceph_lookup_mds_session(mdsc, mds);
-			mutex_unlock(&mdsc->mutex);
-			if (session) {
-				dout("inverting session/ino locks on %p\n",
-				     session);
-				mutex_lock(&session->s_mutex);
-			}
-			/*
-			 * if session == NULL, we raced against a cap
-			 * deletion or migration.  retry, and we'll
-			 * get a better @mds value next time.
-			 */
-			spin_lock(&ci->i_ceph_lock);
-			goto retry;
-		}
-
 		spin_lock(&mdsc->cap_dirty_lock);
-		capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
+		capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
+		list_add_tail(&capsnap->cap_flush.g_list,
+			      &mdsc->cap_flush_list);
+		if (oldest_flush_tid == 0)
+			oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+		if (list_empty(&ci->i_flushing_item)) {
+			list_add_tail(&ci->i_flushing_item,
+				      &session->s_cap_flushing);
+		}
 		spin_unlock(&mdsc->cap_dirty_lock);
 
+		list_add_tail(&capsnap->cap_flush.i_list,
+			      &ci->i_cap_flush_list);
+
+		if (first_tid == 1)
+			first_tid = capsnap->cap_flush.tid;
+		last_tid = capsnap->cap_flush.tid;
+	}
+
+	ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
+
+	while (first_tid <= last_tid) {
+		struct ceph_cap *cap = ci->i_auth_cap;
+		struct ceph_cap_flush *cf;
+		int ret;
+
+		if (!(cap && cap->session == session)) {
+			dout("__flush_snaps %p auth cap %p not mds%d, "
+			     "stop\n", inode, cap, session->s_mds);
+			break;
+		}
+
+		ret = -ENOENT;
+		list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
+			if (cf->tid >= first_tid) {
+				ret = 0;
+				break;
+			}
+		}
+		if (ret < 0)
+			break;
+
+		first_tid = cf->tid + 1;
+
+		capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
 		atomic_inc(&capsnap->nref);
-		if (list_empty(&capsnap->flushing_item))
-			list_add_tail(&capsnap->flushing_item,
-				      &session->s_cap_snaps_flushing);
 		spin_unlock(&ci->i_ceph_lock);
 
-		dout("flush_snaps %p cap_snap %p follows %lld tid %llu\n",
-		     inode, capsnap, capsnap->follows, capsnap->flush_tid);
-		send_cap_msg(session, ceph_vino(inode).ino, 0,
-			     CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
-			     capsnap->dirty, 0, capsnap->flush_tid, 0,
-			     0, mseq, capsnap->size, 0,
-			     &capsnap->mtime, &capsnap->atime,
-			     &capsnap->ctime, capsnap->time_warp_seq,
-			     capsnap->uid, capsnap->gid, capsnap->mode,
-			     capsnap->xattr_version, capsnap->xattr_blob,
-			     capsnap->follows, capsnap->inline_data);
+		dout("__flush_snaps %p capsnap %p tid %llu %s\n",
+		     inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));
 
-		next_follows = capsnap->follows + 1;
+		ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
+					oldest_flush_tid);
+		if (ret < 0) {
+			pr_err("__flush_snaps: error sending cap flushsnap, "
+			       "ino (%llx.%llx) tid %llu follows %llu\n",
+				ceph_vinop(inode), cf->tid, capsnap->follows);
+		}
+
 		ceph_put_cap_snap(capsnap);
-
 		spin_lock(&ci->i_ceph_lock);
+	}
+}
+
+void ceph_flush_snaps(struct ceph_inode_info *ci,
+		      struct ceph_mds_session **psession) /* may be NULL */
+{
+	struct inode *inode = &ci->vfs_inode;
+	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
+	struct ceph_mds_session *session = psession ? *psession : NULL;
+	int mds;
+	dout("ceph_flush_snaps %p\n", inode);
+retry:
+	spin_lock(&ci->i_ceph_lock);
+	if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
+		dout(" no capsnap needs flush, doing nothing\n");
+		goto out;
+	}
+	if (!ci->i_auth_cap) {
+		dout(" no auth cap (migrating?), doing nothing\n");
+		goto out;
+	}
+
+	mds = ci->i_auth_cap->session->s_mds;
+	if (session && session->s_mds != mds) {
+		dout(" oops, wrong session %p mutex\n", session);
+		mutex_unlock(&session->s_mutex);
+		ceph_put_mds_session(session);
+		session = NULL;
+	}
+	if (!session) {
+		spin_unlock(&ci->i_ceph_lock);
+		mutex_lock(&mdsc->mutex);
+		session = __ceph_lookup_mds_session(mdsc, mds);
+		mutex_unlock(&mdsc->mutex);
+		if (session) {
+			dout(" inverting session/ino locks on %p\n", session);
+			mutex_lock(&session->s_mutex);
+		}
 		goto retry;
 	}
 
+	__ceph_flush_snaps(ci, session);
+out:
+	spin_unlock(&ci->i_ceph_lock);
+
+	if (psession) {
+		*psession = session;
+	} else if (session) {	/* NULL if we bailed out before a lookup */
+		mutex_unlock(&session->s_mutex);
+		ceph_put_mds_session(session);
+	}
 	/* we flushed them all; remove this inode from the queue */
 	spin_lock(&mdsc->snap_flush_lock);
 	list_del_init(&ci->i_snap_flush_item);
 	spin_unlock(&mdsc->snap_flush_lock);
-
-out:
-	if (psession)
-		*psession = session;
-	else if (session) {
-		mutex_unlock(&session->s_mutex);
-		ceph_put_mds_session(session);
-	}
-}
-
-static void ceph_flush_snaps(struct ceph_inode_info *ci)
-{
-	spin_lock(&ci->i_ceph_lock);
-	__ceph_flush_snaps(ci, NULL, 0);
-	spin_unlock(&ci->i_ceph_lock);
 }
 
 /*
@@ -1411,52 +1451,6 @@
 	return dirty;
 }
 
-static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
-					struct ceph_cap_flush *cf)
-{
-	struct rb_node **p = &ci->i_cap_flush_tree.rb_node;
-	struct rb_node *parent = NULL;
-	struct ceph_cap_flush *other = NULL;
-
-	while (*p) {
-		parent = *p;
-		other = rb_entry(parent, struct ceph_cap_flush, i_node);
-
-		if (cf->tid < other->tid)
-			p = &(*p)->rb_left;
-		else if (cf->tid > other->tid)
-			p = &(*p)->rb_right;
-		else
-			BUG();
-	}
-
-	rb_link_node(&cf->i_node, parent, p);
-	rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);
-}
-
-static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
-				       struct ceph_cap_flush *cf)
-{
-	struct rb_node **p = &mdsc->cap_flush_tree.rb_node;
-	struct rb_node *parent = NULL;
-	struct ceph_cap_flush *other = NULL;
-
-	while (*p) {
-		parent = *p;
-		other = rb_entry(parent, struct ceph_cap_flush, g_node);
-
-		if (cf->tid < other->tid)
-			p = &(*p)->rb_left;
-		else if (cf->tid > other->tid)
-			p = &(*p)->rb_right;
-		else
-			BUG();
-	}
-
-	rb_link_node(&cf->g_node, parent, p);
-	rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree);
-}
-
 struct ceph_cap_flush *ceph_alloc_cap_flush(void)
 {
 	return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
@@ -1470,23 +1464,54 @@
 
 static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
 {
-	struct rb_node *n = rb_first(&mdsc->cap_flush_tree);
-	if (n) {
+	if (!list_empty(&mdsc->cap_flush_list)) {
 		struct ceph_cap_flush *cf =
-			rb_entry(n, struct ceph_cap_flush, g_node);
+			list_first_entry(&mdsc->cap_flush_list,
+					 struct ceph_cap_flush, g_list);
 		return cf->tid;
 	}
 	return 0;
 }
 
 /*
+ * Remove cap_flush from the mdsc's or inode's flushing cap list.
+ * Return true if caller needs to wake up flush waiters.
+ */
+static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
+			       struct ceph_inode_info *ci,
+			       struct ceph_cap_flush *cf)
+{
+	struct ceph_cap_flush *prev;
+	bool wake = cf->wake;
+	if (mdsc) {
+		/* are there older pending cap flushes? */
+		if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
+			prev = list_prev_entry(cf, g_list);
+			prev->wake = true;
+			wake = false;
+		}
+		list_del(&cf->g_list);
+	} else if (ci) {
+		if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
+			prev = list_prev_entry(cf, i_list);
+			prev->wake = true;
+			wake = false;
+		}
+		list_del(&cf->i_list);
+	} else {
+		BUG_ON(1);
+	}
+	return wake;
+}
+
+/*
  * Add dirty inode to the flushing list.  Assigned a seq number so we
  * can wait for caps to flush without starving.
  *
  * Called under i_ceph_lock.
  */
 static int __mark_caps_flushing(struct inode *inode,
-				struct ceph_mds_session *session,
+				struct ceph_mds_session *session, bool wake,
 				u64 *flush_tid, u64 *oldest_flush_tid)
 {
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
@@ -1509,26 +1534,22 @@
 
 	swap(cf, ci->i_prealloc_cap_flush);
 	cf->caps = flushing;
+	cf->wake = wake;
 
 	spin_lock(&mdsc->cap_dirty_lock);
 	list_del_init(&ci->i_dirty_item);
 
 	cf->tid = ++mdsc->last_cap_flush_tid;
-	__add_cap_flushing_to_mdsc(mdsc, cf);
+	list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
 	*oldest_flush_tid = __get_oldest_flush_tid(mdsc);
 
 	if (list_empty(&ci->i_flushing_item)) {
 		list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
 		mdsc->num_cap_flushing++;
-		dout(" inode %p now flushing tid %llu\n", inode, cf->tid);
-	} else {
-		list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
-		dout(" inode %p now flushing (more) tid %llu\n",
-		     inode, cf->tid);
 	}
 	spin_unlock(&mdsc->cap_dirty_lock);
 
-	__add_cap_flushing_to_inode(ci, cf);
+	list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
 
 	*flush_tid = cf->tid;
 	return flushing;
@@ -1583,10 +1604,11 @@
 	int mds = -1;   /* keep track of how far we've gone through i_caps list
 			   to avoid an infinite loop on retry */
 	struct rb_node *p;
-	int tried_invalidate = 0;
-	int delayed = 0, sent = 0, force_requeue = 0, num;
-	int queue_invalidate = 0;
-	int is_delayed = flags & CHECK_CAPS_NODELAY;
+	int delayed = 0, sent = 0, num;
+	bool is_delayed = flags & CHECK_CAPS_NODELAY;
+	bool queue_invalidate = false;
+	bool force_requeue = false;
+	bool tried_invalidate = false;
 
 	/* if we are unmounting, flush any unused caps immediately. */
 	if (mdsc->stopping)
@@ -1597,9 +1619,6 @@
 	if (ci->i_ceph_flags & CEPH_I_FLUSH)
 		flags |= CHECK_CAPS_FLUSH;
 
-	/* flush snaps first time around only */
-	if (!list_empty(&ci->i_cap_snaps))
-		__ceph_flush_snaps(ci, &session, 0);
 	goto retry_locked;
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1666,17 +1685,17 @@
 			if (revoking & (CEPH_CAP_FILE_CACHE|
 					CEPH_CAP_FILE_LAZYIO)) {
 				dout("check_caps queuing invalidate\n");
-				queue_invalidate = 1;
+				queue_invalidate = true;
 				ci->i_rdcache_revoking = ci->i_rdcache_gen;
 			} else {
 				dout("check_caps failed to invalidate pages\n");
 				/* we failed to invalidate pages.  check these
 				   caps again later. */
-				force_requeue = 1;
+				force_requeue = true;
 				__cap_set_timeouts(mdsc, ci);
 			}
 		}
-		tried_invalidate = 1;
+		tried_invalidate = true;
 		goto retry_locked;
 	}
 
@@ -1720,10 +1739,15 @@
 			}
 		}
 		/* flush anything dirty? */
-		if (cap == ci->i_auth_cap && (flags & CHECK_CAPS_FLUSH) &&
-		    ci->i_dirty_caps) {
-			dout("flushing dirty caps\n");
-			goto ack;
+		if (cap == ci->i_auth_cap) {
+			if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
+				dout("flushing dirty caps\n");
+				goto ack;
+			}
+			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
+				dout("flushing snap caps\n");
+				goto ack;
+			}
 		}
 
 		/* completed revocation? going down and there are no caps? */
@@ -1782,6 +1806,26 @@
 				goto retry;
 			}
 		}
+
+		/* kick flushing and flush snaps before sending normal
+		 * cap message */
+		if (cap == ci->i_auth_cap &&
+		    (ci->i_ceph_flags &
+		     (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
+			if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+				spin_lock(&mdsc->cap_dirty_lock);
+				oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+				spin_unlock(&mdsc->cap_dirty_lock);
+				__kick_flushing_caps(mdsc, session, ci,
+						     oldest_flush_tid);
+				ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+			}
+			if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
+				__ceph_flush_snaps(ci, session);
+
+			goto retry_locked;
+		}
+
 		/* take snap_rwsem after session mutex */
 		if (!took_snap_rwsem) {
 			if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
@@ -1796,7 +1840,7 @@
 		}
 
 		if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
-			flushing = __mark_caps_flushing(inode, session,
+			flushing = __mark_caps_flushing(inode, session, false,
 							&flush_tid,
 							&oldest_flush_tid);
 		} else {
@@ -1822,7 +1866,7 @@
 	 * otherwise cancel.
 	 */
 	if (delayed && is_delayed)
-		force_requeue = 1;   /* __send_cap delayed release; requeue */
+		force_requeue = true;   /* __send_cap delayed release; requeue */
 	if (!delayed && !is_delayed)
 		__cap_delay_cancel(mdsc, ci);
 	else if (!is_delayed || force_requeue)
@@ -1873,8 +1917,8 @@
 		if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
 			goto out;
 
-		flushing = __mark_caps_flushing(inode, session, &flush_tid,
-						&oldest_flush_tid);
+		flushing = __mark_caps_flushing(inode, session, true,
+						&flush_tid, &oldest_flush_tid);
 
 		/* __send_cap drops i_ceph_lock */
 		delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
@@ -1887,10 +1931,11 @@
 			spin_unlock(&ci->i_ceph_lock);
 		}
 	} else {
-		struct rb_node *n = rb_last(&ci->i_cap_flush_tree);
-		if (n) {
+		if (!list_empty(&ci->i_cap_flush_list)) {
 			struct ceph_cap_flush *cf =
-				rb_entry(n, struct ceph_cap_flush, i_node);
+				list_last_entry(&ci->i_cap_flush_list,
+						struct ceph_cap_flush, i_list);
+			cf->wake = true;
 			flush_tid = cf->tid;
 		}
 		flushing = ci->i_flushing_caps;
@@ -1910,14 +1955,13 @@
 static int caps_are_flushed(struct inode *inode, u64 flush_tid)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct ceph_cap_flush *cf;
-	struct rb_node *n;
 	int ret = 1;
 
 	spin_lock(&ci->i_ceph_lock);
-	n = rb_first(&ci->i_cap_flush_tree);
-	if (n) {
-		cf = rb_entry(n, struct ceph_cap_flush, i_node);
+	if (!list_empty(&ci->i_cap_flush_list)) {
+		struct ceph_cap_flush * cf =
+			list_first_entry(&ci->i_cap_flush_list,
+					 struct ceph_cap_flush, i_list);
 		if (cf->tid <= flush_tid)
 			ret = 0;
 	}
@@ -1926,53 +1970,6 @@
 }
 
 /*
- * Wait on any unsafe replies for the given inode.  First wait on the
- * newest request, and make that the upper bound.  Then, if there are
- * more requests, keep waiting on the oldest as long as it is still older
- * than the original request.
- */
-static void sync_write_wait(struct inode *inode)
-{
-	struct ceph_inode_info *ci = ceph_inode(inode);
-	struct list_head *head = &ci->i_unsafe_writes;
-	struct ceph_osd_request *req;
-	u64 last_tid;
-
-	if (!S_ISREG(inode->i_mode))
-		return;
-
-	spin_lock(&ci->i_unsafe_lock);
-	if (list_empty(head))
-		goto out;
-
-	/* set upper bound as _last_ entry in chain */
-	req = list_last_entry(head, struct ceph_osd_request,
-			      r_unsafe_item);
-	last_tid = req->r_tid;
-
-	do {
-		ceph_osdc_get_request(req);
-		spin_unlock(&ci->i_unsafe_lock);
-		dout("sync_write_wait on tid %llu (until %llu)\n",
-		     req->r_tid, last_tid);
-		wait_for_completion(&req->r_safe_completion);
-		spin_lock(&ci->i_unsafe_lock);
-		ceph_osdc_put_request(req);
-
-		/*
-		 * from here on look at first entry in chain, since we
-		 * only want to wait for anything older than last_tid
-		 */
-		if (list_empty(head))
-			break;
-		req = list_first_entry(head, struct ceph_osd_request,
-				       r_unsafe_item);
-	} while (req->r_tid < last_tid);
-out:
-	spin_unlock(&ci->i_unsafe_lock);
-}
-
-/*
  * wait for any unsafe requests to complete.
  */
 static int unsafe_request_wait(struct inode *inode)
@@ -2024,7 +2021,8 @@
 	int dirty;
 
 	dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
-	sync_write_wait(inode);
+
+	ceph_sync_write_wait(inode);
 
 	ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
 	if (ret < 0)
@@ -2087,87 +2085,74 @@
 	return err;
 }
 
-/*
- * After a recovering MDS goes active, we need to resend any caps
- * we were flushing.
- *
- * Caller holds session->s_mutex.
- */
-static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
-				   struct ceph_mds_session *session)
-{
-	struct ceph_cap_snap *capsnap;
-
-	dout("kick_flushing_capsnaps mds%d\n", session->s_mds);
-	list_for_each_entry(capsnap, &session->s_cap_snaps_flushing,
-			    flushing_item) {
-		struct ceph_inode_info *ci = capsnap->ci;
-		struct inode *inode = &ci->vfs_inode;
-		struct ceph_cap *cap;
-
-		spin_lock(&ci->i_ceph_lock);
-		cap = ci->i_auth_cap;
-		if (cap && cap->session == session) {
-			dout("kick_flushing_caps %p cap %p capsnap %p\n", inode,
-			     cap, capsnap);
-			__ceph_flush_snaps(ci, &session, 1);
-		} else {
-			pr_err("%p auth cap %p not mds%d ???\n", inode,
-			       cap, session->s_mds);
-		}
-		spin_unlock(&ci->i_ceph_lock);
-	}
-}
-
-static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
-				struct ceph_mds_session *session,
-				struct ceph_inode_info *ci)
+static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
+				 struct ceph_mds_session *session,
+				 struct ceph_inode_info *ci,
+				 u64 oldest_flush_tid)
+	__releases(ci->i_ceph_lock)
+	__acquires(ci->i_ceph_lock)
 {
 	struct inode *inode = &ci->vfs_inode;
 	struct ceph_cap *cap;
 	struct ceph_cap_flush *cf;
-	struct rb_node *n;
-	int delayed = 0;
+	int ret;
 	u64 first_tid = 0;
-	u64 oldest_flush_tid;
 
-	spin_lock(&mdsc->cap_dirty_lock);
-	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
-	spin_unlock(&mdsc->cap_dirty_lock);
+	list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
+		if (cf->tid < first_tid)
+			continue;
 
-	while (true) {
-		spin_lock(&ci->i_ceph_lock);
 		cap = ci->i_auth_cap;
 		if (!(cap && cap->session == session)) {
-			pr_err("%p auth cap %p not mds%d ???\n", inode,
-					cap, session->s_mds);
-			spin_unlock(&ci->i_ceph_lock);
+			pr_err("%p auth cap %p not mds%d ???\n",
+			       inode, cap, session->s_mds);
 			break;
 		}
 
-		for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
-			cf = rb_entry(n, struct ceph_cap_flush, i_node);
-			if (cf->tid >= first_tid)
-				break;
-		}
-		if (!n) {
-			spin_unlock(&ci->i_ceph_lock);
-			break;
-		}
-
-		cf = rb_entry(n, struct ceph_cap_flush, i_node);
-
 		first_tid = cf->tid + 1;
 
-		dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
-		     cap, cf->tid, ceph_cap_string(cf->caps));
-		delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-				      __ceph_caps_used(ci),
-				      __ceph_caps_wanted(ci),
-				      cap->issued | cap->implemented,
-				      cf->caps, cf->tid, oldest_flush_tid);
+		if (cf->caps) {
+			dout("kick_flushing_caps %p cap %p tid %llu %s\n",
+			     inode, cap, cf->tid, ceph_cap_string(cf->caps));
+			ci->i_ceph_flags |= CEPH_I_NODELAY;
+			ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+					  __ceph_caps_used(ci),
+					  __ceph_caps_wanted(ci),
+					  cap->issued | cap->implemented,
+					  cf->caps, cf->tid, oldest_flush_tid);
+			if (ret) {
+				pr_err("kick_flushing_caps: error sending "
+					"cap flush, ino (%llx.%llx) "
+					"tid %llu flushing %s\n",
+					ceph_vinop(inode), cf->tid,
+					ceph_cap_string(cf->caps));
+			}
+		} else {
+			struct ceph_cap_snap *capsnap =
+					container_of(cf, struct ceph_cap_snap,
+						    cap_flush);
+			dout("kick_flushing_caps %p capsnap %p tid %llu %s\n",
+			     inode, capsnap, cf->tid,
+			     ceph_cap_string(capsnap->dirty));
+
+			atomic_inc(&capsnap->nref);
+			spin_unlock(&ci->i_ceph_lock);
+
+			ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
+						oldest_flush_tid);
+			if (ret < 0) {
+				pr_err("kick_flushing_caps: error sending "
+					"cap flushsnap, ino (%llx.%llx) "
+					"tid %llu follows %llu\n",
+					ceph_vinop(inode), cf->tid,
+					capsnap->follows);
+			}
+
+			ceph_put_cap_snap(capsnap);
+		}
+
+		spin_lock(&ci->i_ceph_lock);
 	}
-	return delayed;
 }
 
 void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
@@ -2175,8 +2160,14 @@
 {
 	struct ceph_inode_info *ci;
 	struct ceph_cap *cap;
+	u64 oldest_flush_tid;
 
 	dout("early_kick_flushing_caps mds%d\n", session->s_mds);
+
+	spin_lock(&mdsc->cap_dirty_lock);
+	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+	spin_unlock(&mdsc->cap_dirty_lock);
+
 	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
 		spin_lock(&ci->i_ceph_lock);
 		cap = ci->i_auth_cap;
@@ -2196,10 +2187,11 @@
 		 */
 		if ((cap->issued & ci->i_flushing_caps) !=
 		    ci->i_flushing_caps) {
-			spin_unlock(&ci->i_ceph_lock);
-			if (!__kick_flushing_caps(mdsc, session, ci))
-				continue;
-			spin_lock(&ci->i_ceph_lock);
+			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+			__kick_flushing_caps(mdsc, session, ci,
+					     oldest_flush_tid);
+		} else {
+			ci->i_ceph_flags |= CEPH_I_KICK_FLUSH;
 		}
 
 		spin_unlock(&ci->i_ceph_lock);
@@ -2210,50 +2202,56 @@
 			     struct ceph_mds_session *session)
 {
 	struct ceph_inode_info *ci;
-
-	kick_flushing_capsnaps(mdsc, session);
+	struct ceph_cap *cap;
+	u64 oldest_flush_tid;
 
 	dout("kick_flushing_caps mds%d\n", session->s_mds);
+
+	spin_lock(&mdsc->cap_dirty_lock);
+	oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+	spin_unlock(&mdsc->cap_dirty_lock);
+
 	list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
-		int delayed = __kick_flushing_caps(mdsc, session, ci);
-		if (delayed) {
-			spin_lock(&ci->i_ceph_lock);
-			__cap_delay_requeue(mdsc, ci);
+		spin_lock(&ci->i_ceph_lock);
+		cap = ci->i_auth_cap;
+		if (!(cap && cap->session == session)) {
+			pr_err("%p auth cap %p not mds%d ???\n",
+				&ci->vfs_inode, cap, session->s_mds);
 			spin_unlock(&ci->i_ceph_lock);
+			continue;
 		}
+		if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
+			ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+			__kick_flushing_caps(mdsc, session, ci,
+					     oldest_flush_tid);
+		}
+		spin_unlock(&ci->i_ceph_lock);
 	}
 }
 
 static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
 				     struct ceph_mds_session *session,
 				     struct inode *inode)
+	__releases(ci->i_ceph_lock)
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_cap *cap;
 
-	spin_lock(&ci->i_ceph_lock);
 	cap = ci->i_auth_cap;
 	dout("kick_flushing_inode_caps %p flushing %s\n", inode,
 	     ceph_cap_string(ci->i_flushing_caps));
 
-	__ceph_flush_snaps(ci, &session, 1);
-
-	if (ci->i_flushing_caps) {
-		int delayed;
-
+	if (!list_empty(&ci->i_cap_flush_list)) {
+		u64 oldest_flush_tid;
 		spin_lock(&mdsc->cap_dirty_lock);
 		list_move_tail(&ci->i_flushing_item,
 			       &cap->session->s_cap_flushing);
+		oldest_flush_tid = __get_oldest_flush_tid(mdsc);
 		spin_unlock(&mdsc->cap_dirty_lock);
 
+		ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
+		__kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
 		spin_unlock(&ci->i_ceph_lock);
-
-		delayed = __kick_flushing_caps(mdsc, session, ci);
-		if (delayed) {
-			spin_lock(&ci->i_ceph_lock);
-			__cap_delay_requeue(mdsc, ci);
-			spin_unlock(&ci->i_ceph_lock);
-		}
 	} else {
 		spin_unlock(&ci->i_ceph_lock);
 	}
@@ -2580,16 +2578,19 @@
  * drop cap_snap that is not associated with any snapshot.
  * we don't need to send FLUSHSNAP message for it.
  */
-static int ceph_try_drop_cap_snap(struct ceph_cap_snap *capsnap)
+static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
+				  struct ceph_cap_snap *capsnap)
 {
 	if (!capsnap->need_flush &&
 	    !capsnap->writing && !capsnap->dirty_pages) {
-
 		dout("dropping cap_snap %p follows %llu\n",
 		     capsnap, capsnap->follows);
+		BUG_ON(capsnap->cap_flush.tid > 0);
 		ceph_put_snap_context(capsnap->context);
+		if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps))
+			ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
+
 		list_del(&capsnap->ci_item);
-		list_del(&capsnap->flushing_item);
 		ceph_put_cap_snap(capsnap);
 		return 1;
 	}
@@ -2636,7 +2637,7 @@
 							struct ceph_cap_snap,
 							ci_item);
 				capsnap->writing = 0;
-				if (ceph_try_drop_cap_snap(capsnap))
+				if (ceph_try_drop_cap_snap(ci, capsnap))
 					put++;
 				else if (__ceph_finish_cap_snap(ci, capsnap))
 					flushsnaps = 1;
@@ -2661,7 +2662,7 @@
 	if (last && !flushsnaps)
 		ceph_check_caps(ci, 0, NULL);
 	else if (flushsnaps)
-		ceph_flush_snaps(ci);
+		ceph_flush_snaps(ci, NULL);
 	if (wake)
 		wake_up_all(&ci->i_cap_wq);
 	while (put-- > 0)
@@ -2679,15 +2680,19 @@
 				struct ceph_snap_context *snapc)
 {
 	struct inode *inode = &ci->vfs_inode;
-	int last = 0;
-	int complete_capsnap = 0;
-	int drop_capsnap = 0;
-	int found = 0;
 	struct ceph_cap_snap *capsnap = NULL;
+	int put = 0;
+	bool last = false;
+	bool found = false;
+	bool flush_snaps = false;
+	bool complete_capsnap = false;
 
 	spin_lock(&ci->i_ceph_lock);
 	ci->i_wrbuffer_ref -= nr;
-	last = !ci->i_wrbuffer_ref;
+	if (ci->i_wrbuffer_ref == 0) {
+		last = true;
+		put++;
+	}
 
 	if (ci->i_head_snapc == snapc) {
 		ci->i_wrbuffer_ref_head -= nr;
@@ -2707,15 +2712,22 @@
 	} else {
 		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
 			if (capsnap->context == snapc) {
-				found = 1;
+				found = true;
 				break;
 			}
 		}
 		BUG_ON(!found);
 		capsnap->dirty_pages -= nr;
 		if (capsnap->dirty_pages == 0) {
-			complete_capsnap = 1;
-			drop_capsnap = ceph_try_drop_cap_snap(capsnap);
+			complete_capsnap = true;
+			if (!capsnap->writing) {
+				if (ceph_try_drop_cap_snap(ci, capsnap)) {
+					put++;
+				} else {
+					ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
+					flush_snaps = true;
+				}
+			}
 		}
 		dout("put_wrbuffer_cap_refs on %p cap_snap %p "
 		     " snap %lld %d/%d -> %d/%d %s%s\n",
@@ -2730,12 +2742,12 @@
 
 	if (last) {
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
-		iput(inode);
-	} else if (complete_capsnap) {
-		ceph_flush_snaps(ci);
-		wake_up_all(&ci->i_cap_wq);
+	} else if (flush_snaps) {
+		ceph_flush_snaps(ci, NULL);
 	}
-	if (drop_capsnap)
+	if (complete_capsnap)
+		wake_up_all(&ci->i_cap_wq);
+	while (put-- > 0)
 		iput(inode);
 }
 
@@ -2779,12 +2791,11 @@
  */
 static void handle_cap_grant(struct ceph_mds_client *mdsc,
 			     struct inode *inode, struct ceph_mds_caps *grant,
-			     u64 inline_version,
-			     void *inline_data, int inline_len,
+			     struct ceph_string **pns, u64 inline_version,
+			     void *inline_data, u32 inline_len,
 			     struct ceph_buffer *xattr_buf,
 			     struct ceph_mds_session *session,
-			     struct ceph_cap *cap, int issued,
-			     u32 pool_ns_len)
+			     struct ceph_cap *cap, int issued)
 	__releases(ci->i_ceph_lock)
 	__releases(mdsc->snap_rwsem)
 {
@@ -2895,8 +2906,18 @@
 
 	if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
 		/* file layout may have changed */
-		ci->i_layout = grant->layout;
-		ci->i_pool_ns_len = pool_ns_len;
+		s64 old_pool = ci->i_layout.pool_id;
+		struct ceph_string *old_ns;
+
+		ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
+		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+					lockdep_is_held(&ci->i_ceph_lock));
+		rcu_assign_pointer(ci->i_layout.pool_ns, *pns);
+
+		if (ci->i_layout.pool_id != old_pool || *pns != old_ns)
+			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
+
+		*pns = old_ns;
 
 		/* size/truncate_seq? */
 		queue_trunc = ceph_fill_file_size(inode, issued,
@@ -2979,13 +3000,13 @@
 			fill_inline = true;
 	}
 
-	spin_unlock(&ci->i_ceph_lock);
-
 	if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
-		kick_flushing_inode_caps(mdsc, session, inode);
-		up_read(&mdsc->snap_rwsem);
 		if (newcaps & ~issued)
 			wake = true;
+		kick_flushing_inode_caps(mdsc, session, inode);
+		up_read(&mdsc->snap_rwsem);
+	} else {
+		spin_unlock(&ci->i_ceph_lock);
 	}
 
 	if (fill_inline)
@@ -3029,23 +3050,24 @@
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
-	struct ceph_cap_flush *cf;
-	struct rb_node *n;
+	struct ceph_cap_flush *cf, *tmp_cf;
 	LIST_HEAD(to_remove);
 	unsigned seq = le32_to_cpu(m->seq);
 	int dirty = le32_to_cpu(m->dirty);
 	int cleaned = 0;
-	int drop = 0;
+	bool drop = false;
+	bool wake_ci = 0;
+	bool wake_mdsc = 0;
 
-	n = rb_first(&ci->i_cap_flush_tree);
-	while (n) {
-		cf = rb_entry(n, struct ceph_cap_flush, i_node);
-		n = rb_next(&cf->i_node);
+	list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
 		if (cf->tid == flush_tid)
 			cleaned = cf->caps;
+		if (cf->caps == 0) /* capsnap */
+			continue;
 		if (cf->tid <= flush_tid) {
-			rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
-			list_add_tail(&cf->list, &to_remove);
+			if (__finish_cap_flush(NULL, ci, cf))
+				wake_ci = true;
+			list_add_tail(&cf->i_list, &to_remove);
 		} else {
 			cleaned &= ~cf->caps;
 			if (!cleaned)
@@ -3066,31 +3088,29 @@
 
 	spin_lock(&mdsc->cap_dirty_lock);
 
-	if (!list_empty(&to_remove)) {
-		list_for_each_entry(cf, &to_remove, list)
-			rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
-
-		n = rb_first(&mdsc->cap_flush_tree);
-		cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
-		if (!cf || cf->tid > flush_tid)
-			wake_up_all(&mdsc->cap_flushing_wq);
+	list_for_each_entry(cf, &to_remove, i_list) {
+		if (__finish_cap_flush(mdsc, NULL, cf))
+			wake_mdsc = true;
 	}
 
 	if (ci->i_flushing_caps == 0) {
-		list_del_init(&ci->i_flushing_item);
-		if (!list_empty(&session->s_cap_flushing))
-			dout(" mds%d still flushing cap on %p\n",
-			     session->s_mds,
-			     &list_entry(session->s_cap_flushing.next,
-					 struct ceph_inode_info,
-					 i_flushing_item)->vfs_inode);
+		if (list_empty(&ci->i_cap_flush_list)) {
+			list_del_init(&ci->i_flushing_item);
+			if (!list_empty(&session->s_cap_flushing)) {
+				dout(" mds%d still flushing cap on %p\n",
+				     session->s_mds,
+				     &list_first_entry(&session->s_cap_flushing,
+						struct ceph_inode_info,
+						i_flushing_item)->vfs_inode);
+			}
+		}
 		mdsc->num_cap_flushing--;
 		dout(" inode %p now !flushing\n", inode);
 
 		if (ci->i_dirty_caps == 0) {
 			dout(" inode %p now clean\n", inode);
 			BUG_ON(!list_empty(&ci->i_dirty_item));
-			drop = 1;
+			drop = true;
 			if (ci->i_wr_ref == 0 &&
 			    ci->i_wrbuffer_ref_head == 0) {
 				BUG_ON(!ci->i_head_snapc);
@@ -3102,17 +3122,21 @@
 		}
 	}
 	spin_unlock(&mdsc->cap_dirty_lock);
-	wake_up_all(&ci->i_cap_wq);
 
 out:
 	spin_unlock(&ci->i_ceph_lock);
 
 	while (!list_empty(&to_remove)) {
 		cf = list_first_entry(&to_remove,
-				      struct ceph_cap_flush, list);
-		list_del(&cf->list);
+				      struct ceph_cap_flush, i_list);
+		list_del(&cf->i_list);
 		ceph_free_cap_flush(cf);
 	}
+
+	if (wake_ci)
+		wake_up_all(&ci->i_cap_wq);
+	if (wake_mdsc)
+		wake_up_all(&mdsc->cap_flushing_wq);
 	if (drop)
 		iput(inode);
 }
@@ -3131,7 +3155,9 @@
 	struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
 	u64 follows = le64_to_cpu(m->snap_follows);
 	struct ceph_cap_snap *capsnap;
-	int drop = 0;
+	bool flushed = false;
+	bool wake_ci = false;
+	bool wake_mdsc = false;
 
 	dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
 	     inode, ci, session->s_mds, follows);
@@ -3139,30 +3165,47 @@
 	spin_lock(&ci->i_ceph_lock);
 	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
 		if (capsnap->follows == follows) {
-			if (capsnap->flush_tid != flush_tid) {
+			if (capsnap->cap_flush.tid != flush_tid) {
 				dout(" cap_snap %p follows %lld tid %lld !="
 				     " %lld\n", capsnap, follows,
-				     flush_tid, capsnap->flush_tid);
+				     flush_tid, capsnap->cap_flush.tid);
 				break;
 			}
-			WARN_ON(capsnap->dirty_pages || capsnap->writing);
-			dout(" removing %p cap_snap %p follows %lld\n",
-			     inode, capsnap, follows);
-			ceph_put_snap_context(capsnap->context);
-			list_del(&capsnap->ci_item);
-			list_del(&capsnap->flushing_item);
-			ceph_put_cap_snap(capsnap);
-			wake_up_all(&mdsc->cap_flushing_wq);
-			drop = 1;
+			flushed = true;
 			break;
 		} else {
 			dout(" skipping cap_snap %p follows %lld\n",
 			     capsnap, capsnap->follows);
 		}
 	}
+	if (flushed) {
+		WARN_ON(capsnap->dirty_pages || capsnap->writing);
+		dout(" removing %p cap_snap %p follows %lld\n",
+		     inode, capsnap, follows);
+		list_del(&capsnap->ci_item);
+		if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
+			wake_ci = true;
+
+		spin_lock(&mdsc->cap_dirty_lock);
+
+		if (list_empty(&ci->i_cap_flush_list))
+			list_del_init(&ci->i_flushing_item);
+
+		if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
+			wake_mdsc = true;
+
+		spin_unlock(&mdsc->cap_dirty_lock);
+	}
 	spin_unlock(&ci->i_ceph_lock);
-	if (drop)
+	if (flushed) {
+		ceph_put_snap_context(capsnap->context);
+		ceph_put_cap_snap(capsnap);
+		if (wake_ci)
+			wake_up_all(&ci->i_cap_wq);
+		if (wake_mdsc)
+			wake_up_all(&mdsc->cap_flushing_wq);
 		iput(inode);
+	}
 }
 
 /*
@@ -3267,7 +3310,8 @@
 			tcap->implemented |= issued;
 			if (cap == ci->i_auth_cap)
 				ci->i_auth_cap = tcap;
-			if (ci->i_flushing_caps && ci->i_auth_cap == tcap) {
+			if (!list_empty(&ci->i_cap_flush_list) &&
+			    ci->i_auth_cap == tcap) {
 				spin_lock(&mdsc->cap_dirty_lock);
 				list_move_tail(&ci->i_flushing_item,
 					       &tcap->session->s_cap_flushing);
@@ -3420,20 +3464,18 @@
 	struct ceph_cap *cap;
 	struct ceph_mds_caps *h;
 	struct ceph_mds_cap_peer *peer = NULL;
-	struct ceph_snap_realm *realm;
+	struct ceph_snap_realm *realm = NULL;
+	struct ceph_string *pool_ns = NULL;
 	int mds = session->s_mds;
 	int op, issued;
 	u32 seq, mseq;
 	struct ceph_vino vino;
-	u64 cap_id;
-	u64 size, max_size;
 	u64 tid;
 	u64 inline_version = 0;
 	void *inline_data = NULL;
 	u32  inline_len = 0;
 	void *snaptrace;
 	size_t snaptrace_len;
-	u32 pool_ns_len = 0;
 	void *p, *end;
 
 	dout("handle_caps from mds%d\n", mds);
@@ -3447,11 +3489,8 @@
 	op = le32_to_cpu(h->op);
 	vino.ino = le64_to_cpu(h->ino);
 	vino.snap = CEPH_NOSNAP;
-	cap_id = le64_to_cpu(h->cap_id);
 	seq = le32_to_cpu(h->seq);
 	mseq = le32_to_cpu(h->migrate_seq);
-	size = le64_to_cpu(h->size);
-	max_size = le64_to_cpu(h->max_size);
 
 	snaptrace = h + 1;
 	snaptrace_len = le32_to_cpu(h->snap_trace_len);
@@ -3490,6 +3529,7 @@
 		u64 flush_tid;
 		u32 caller_uid, caller_gid;
 		u32 osd_epoch_barrier;
+		u32 pool_ns_len;
 		/* version >= 5 */
 		ceph_decode_32_safe(&p, end, osd_epoch_barrier, bad);
 		/* version >= 6 */
@@ -3499,6 +3539,11 @@
 		ceph_decode_32_safe(&p, end, caller_gid, bad);
 		/* version >= 8 */
 		ceph_decode_32_safe(&p, end, pool_ns_len, bad);
+		if (pool_ns_len > 0) {
+			ceph_decode_need(&p, end, pool_ns_len, bad);
+			pool_ns = ceph_find_or_create_string(p, pool_ns_len);
+			p += pool_ns_len;
+		}
 	}
 
 	/* lookup ino */
@@ -3519,7 +3564,7 @@
 			cap = ceph_get_cap(mdsc, NULL);
 			cap->cap_ino = vino.ino;
 			cap->queue_release = 1;
-			cap->cap_id = cap_id;
+			cap->cap_id = le64_to_cpu(h->cap_id);
 			cap->mseq = mseq;
 			cap->seq = seq;
 			spin_lock(&session->s_cap_lock);
@@ -3554,10 +3599,9 @@
 		}
 		handle_cap_import(mdsc, inode, h, peer, session,
 				  &cap, &issued);
-		handle_cap_grant(mdsc, inode, h,
+		handle_cap_grant(mdsc, inode, h, &pool_ns,
 				 inline_version, inline_data, inline_len,
-				 msg->middle, session, cap, issued,
-				 pool_ns_len);
+				 msg->middle, session, cap, issued);
 		if (realm)
 			ceph_put_snap_realm(mdsc, realm);
 		goto done_unlocked;
@@ -3579,10 +3623,9 @@
 	case CEPH_CAP_OP_GRANT:
 		__ceph_caps_issued(ci, &issued);
 		issued |= __ceph_caps_dirty(ci);
-		handle_cap_grant(mdsc, inode, h,
+		handle_cap_grant(mdsc, inode, h, &pool_ns,
 				 inline_version, inline_data, inline_len,
-				 msg->middle, session, cap, issued,
-				 pool_ns_len);
+				 msg->middle, session, cap, issued);
 		goto done_unlocked;
 
 	case CEPH_CAP_OP_FLUSH_ACK:
@@ -3613,6 +3656,7 @@
 	mutex_unlock(&session->s_mutex);
 done_unlocked:
 	iput(inode);
+	ceph_put_string(pool_ns);
 	return;
 
 bad:
@@ -3673,6 +3717,16 @@
 	dout("flush_dirty_caps done\n");
 }
 
+void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
+{	/* count one open reference in each i_nr_by_mode[] slot selected by fmode */
+	int i;
+	int bits = (fmode << 1) | 1;	/* slot 0 always counts; fmode bit k maps to slot k + 1 */
+	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
+		if (bits & (1 << i))
+			ci->i_nr_by_mode[i]++;
+	}
+}
+
 /*
  * Drop open file reference.  If we were the last open file,
  * we may need to release capabilities to the MDS (or schedule
@@ -3680,15 +3734,20 @@
  */
 void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
 {
-	struct inode *inode = &ci->vfs_inode;
-	int last = 0;
-
+	int i, last = 0;
+	int bits = (fmode << 1) | 1;
 	spin_lock(&ci->i_ceph_lock);
-	dout("put_fmode %p fmode %d %d -> %d\n", inode, fmode,
-	     ci->i_nr_by_mode[fmode], ci->i_nr_by_mode[fmode]-1);
-	BUG_ON(ci->i_nr_by_mode[fmode] == 0);
-	if (--ci->i_nr_by_mode[fmode] == 0)
-		last++;
+	for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
+		if (bits & (1 << i)) {
+			BUG_ON(ci->i_nr_by_mode[i] == 0);
+			if (--ci->i_nr_by_mode[i] == 0)
+				last++;
+		}
+	}
+	dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
+	     &ci->vfs_inode, fmode,
+	     ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
+	     ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
 	spin_unlock(&ci->i_ceph_lock);
 
 	if (last && ci->i_vino.snap == CEPH_NOSNAP)
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 6e0fedf..c64a0b7 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -59,7 +59,7 @@
 
 	di->dentry = dentry;
 	di->lease_session = NULL;
-	dentry->d_time = jiffies;
+	di->time = jiffies;
 	/* avoid reordering d_fsdata setup so that the check above is safe */
 	smp_mb();
 	dentry->d_fsdata = di;
@@ -1124,7 +1124,7 @@
 void ceph_invalidate_dentry_lease(struct dentry *dentry)
 {
 	spin_lock(&dentry->d_lock);
-	dentry->d_time = jiffies;
+	ceph_dentry(dentry)->time = jiffies;	/* dentry_lease_is_valid() needs jiffies < ->time */
 	ceph_dentry(dentry)->lease_shared_gen = 0;
 	spin_unlock(&dentry->d_lock);
 }
@@ -1133,7 +1133,8 @@
  * Check if dentry lease is valid.  If not, delete the lease.  Try to
  * renew if the least is more than half up.
  */
-static int dentry_lease_is_valid(struct dentry *dentry)
+static int dentry_lease_is_valid(struct dentry *dentry, unsigned int flags,
+				 struct inode *dir)
 {
 	struct ceph_dentry_info *di;
 	struct ceph_mds_session *s;
@@ -1141,12 +1142,11 @@
 	u32 gen;
 	unsigned long ttl;
 	struct ceph_mds_session *session = NULL;
-	struct inode *dir = NULL;
 	u32 seq = 0;
 
 	spin_lock(&dentry->d_lock);
 	di = ceph_dentry(dentry);
-	if (di->lease_session) {
+	if (di && di->lease_session) {
 		s = di->lease_session;
 		spin_lock(&s->s_gen_ttl_lock);
 		gen = s->s_cap_gen;
@@ -1154,17 +1154,24 @@
 		spin_unlock(&s->s_gen_ttl_lock);
 
 		if (di->lease_gen == gen &&
-		    time_before(jiffies, dentry->d_time) &&
+		    time_before(jiffies, di->time) &&
 		    time_before(jiffies, ttl)) {
 			valid = 1;
 			if (di->lease_renew_after &&
 			    time_after(jiffies, di->lease_renew_after)) {
-				/* we should renew */
-				dir = d_inode(dentry->d_parent);
-				session = ceph_get_mds_session(s);
-				seq = di->lease_seq;
-				di->lease_renew_after = 0;
-				di->lease_renew_from = jiffies;
+				/*
+				 * We should renew. If we're in RCU walk mode
+				 * though, we can't do that so just return
+				 * -ECHILD.
+				 */
+				if (flags & LOOKUP_RCU) {
+					valid = -ECHILD;
+				} else {
+					session = ceph_get_mds_session(s);
+					seq = di->lease_seq;
+					di->lease_renew_after = 0;
+					di->lease_renew_from = jiffies;
+				}
 			}
 		}
 	}
@@ -1207,15 +1214,19 @@
 	struct dentry *parent;
 	struct inode *dir;
 
-	if (flags & LOOKUP_RCU)
-		return -ECHILD;
+	if (flags & LOOKUP_RCU) {
+		parent = ACCESS_ONCE(dentry->d_parent);
+		dir = d_inode_rcu(parent);
+		if (!dir)
+			return -ECHILD;
+	} else {
+		parent = dget_parent(dentry);
+		dir = d_inode(parent);
+	}
 
 	dout("d_revalidate %p '%pd' inode %p offset %lld\n", dentry,
 	     dentry, d_inode(dentry), ceph_dentry(dentry)->offset);
 
-	parent = dget_parent(dentry);
-	dir = d_inode(parent);
-
 	/* always trust cached snapped dentries, snapdir dentry */
 	if (ceph_snap(dir) != CEPH_NOSNAP) {
 		dout("d_revalidate %p '%pd' inode %p is SNAPPED\n", dentry,
@@ -1224,12 +1235,16 @@
 	} else if (d_really_is_positive(dentry) &&
 		   ceph_snap(d_inode(dentry)) == CEPH_SNAPDIR) {
 		valid = 1;
-	} else if (dentry_lease_is_valid(dentry) ||
-		   dir_lease_is_valid(dir, dentry)) {
-		if (d_really_is_positive(dentry))
-			valid = ceph_is_any_caps(d_inode(dentry));
-		else
-			valid = 1;
+	} else {
+		valid = dentry_lease_is_valid(dentry, flags, dir);
+		if (valid == -ECHILD)
+			return valid;
+		if (valid || dir_lease_is_valid(dir, dentry)) {
+			if (d_really_is_positive(dentry))
+				valid = ceph_is_any_caps(d_inode(dentry));
+			else
+				valid = 1;
+		}
 	}
 
 	if (!valid) {
@@ -1238,6 +1253,9 @@
 		struct ceph_mds_request *req;
 		int op, mask, err;
 
+		if (flags & LOOKUP_RCU)
+			return -ECHILD;
+
 		op = ceph_snap(dir) == CEPH_SNAPDIR ?
 			CEPH_MDS_OP_LOOKUPSNAP : CEPH_MDS_OP_LOOKUP;
 		req = ceph_mdsc_create_request(mdsc, op, USE_ANY_MDS);
@@ -1273,7 +1291,8 @@
 		ceph_dir_clear_complete(dir);
 	}
 
-	dput(parent);
+	if (!(flags & LOOKUP_RCU))
+		dput(parent);
 	return valid;
 }
 
@@ -1286,10 +1305,14 @@
 
 	dout("d_release %p\n", dentry);
 	ceph_dentry_lru_del(dentry);
+
+	spin_lock(&dentry->d_lock);
+	dentry->d_fsdata = NULL;
+	spin_unlock(&dentry->d_lock);
+
 	if (di->lease_session)
 		ceph_put_mds_session(di->lease_session);
 	kmem_cache_free(ceph_dentry_cachep, di);
-	dentry->d_fsdata = NULL;
 }
 
 static int ceph_snapdir_d_revalidate(struct dentry *dentry,
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 0daaf7c..0f5375d 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -708,7 +708,7 @@
 		}
 	}
 
-	ceph_put_page_vector(osd_data->pages, num_pages, false);
+	ceph_put_page_vector(osd_data->pages, num_pages, !aio_req->write);
 	ceph_osdc_put_request(req);
 
 	if (rc < 0)
@@ -821,6 +821,54 @@
 	}
 }
 
+/*
+ * Wait on any unsafe replies for the given inode.  First wait on the
+ * newest request, and make that the upper bound.  Then, if there are
+ * more requests, keep waiting on the oldest as long as it is still older
+ * than the original request.
+ */
+void ceph_sync_write_wait(struct inode *inode)
+{
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct list_head *head = &ci->i_unsafe_writes;
+	struct ceph_osd_request *req;
+	u64 last_tid;
+
+	if (!S_ISREG(inode->i_mode))	/* only regular files are waited on */
+		return;
+
+	spin_lock(&ci->i_unsafe_lock);
+	if (list_empty(head))
+		goto out;
+
+	/* set upper bound as _last_ entry in chain */
+
+	req = list_last_entry(head, struct ceph_osd_request,
+			      r_unsafe_item);
+	last_tid = req->r_tid;
+
+	do {
+		ceph_osdc_get_request(req);	/* presumably pins req while the lock is dropped */
+		spin_unlock(&ci->i_unsafe_lock);
+
+		dout("sync_write_wait on tid %llu (until %llu)\n",
+		     req->r_tid, last_tid);
+		wait_for_completion(&req->r_safe_completion);
+		ceph_osdc_put_request(req);
+
+		spin_lock(&ci->i_unsafe_lock);
+		/*
+		 * from here on look at first entry in chain, since we
+		 * only want to wait for anything older than last_tid
+		 */
+		if (list_empty(head))
+			break;
+		req = list_first_entry(head, struct ceph_osd_request,
+				       r_unsafe_item);
+	} while (req->r_tid < last_tid);	/* stop once the head tid is not below last_tid */
+out:
+	spin_unlock(&ci->i_unsafe_lock);
+}
 
 static ssize_t
 ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
@@ -964,7 +1012,7 @@
 				len = ret;
 		}
 
-		ceph_put_page_vector(pages, num_pages, false);
+		ceph_put_page_vector(pages, num_pages, !write);
 
 		ceph_osdc_put_request(req);
 		if (ret < 0)
@@ -985,6 +1033,8 @@
 	}
 
 	if (aio_req) {
+		LIST_HEAD(osd_reqs);
+
 		if (aio_req->num_reqs == 0) {
 			kfree(aio_req);
 			return ret;
@@ -993,8 +1043,9 @@
 		ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
 					      CEPH_CAP_FILE_RD);
 
-		while (!list_empty(&aio_req->osd_reqs)) {
-			req = list_first_entry(&aio_req->osd_reqs,
+		list_splice(&aio_req->osd_reqs, &osd_reqs);
+		while (!list_empty(&osd_reqs)) {
+			req = list_first_entry(&osd_reqs,
 					       struct ceph_osd_request,
 					       r_unsafe_item);
 			list_del_init(&req->r_unsafe_item);
@@ -1448,16 +1499,14 @@
 {
 	struct inode *inode = file->f_mapping->host;
 	loff_t i_size;
-	int ret;
+	loff_t ret;
 
 	inode_lock(inode);
 
 	if (whence == SEEK_END || whence == SEEK_DATA || whence == SEEK_HOLE) {
 		ret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE, false);
-		if (ret < 0) {
-			offset = ret;
+		if (ret < 0)
 			goto out;
-		}
 	}
 
 	i_size = i_size_read(inode);
@@ -1473,7 +1522,7 @@
 		 * write() or lseek() might have altered it
 		 */
 		if (offset == 0) {
-			offset = file->f_pos;
+			ret = file->f_pos;
 			goto out;
 		}
 		offset += file->f_pos;
@@ -1493,11 +1542,11 @@
 		break;
 	}
 
-	offset = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
+	ret = vfs_setpos(file, offset, inode->i_sb->s_maxbytes);
 
 out:
 	inode_unlock(inode);
-	return offset;
+	return ret;
 }
 
 static inline void ceph_zero_partial_page(
@@ -1583,9 +1632,9 @@
 {
 	int ret = 0;
 	struct ceph_inode_info *ci = ceph_inode(inode);
-	s32 stripe_unit = ceph_file_layout_su(ci->i_layout);
-	s32 stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
-	s32 object_size = ceph_file_layout_object_size(ci->i_layout);
+	s32 stripe_unit = ci->i_layout.stripe_unit;
+	s32 stripe_count = ci->i_layout.stripe_count;
+	s32 object_size = ci->i_layout.object_size;
 	u64 object_set_size = object_size * stripe_count;
 	u64 nearly, t;
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 99bdef6..dd3a6db 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -446,7 +446,7 @@
 	ci->i_symlink = NULL;
 
 	memset(&ci->i_dir_layout, 0, sizeof(ci->i_dir_layout));
-	ci->i_pool_ns_len = 0;
+	RCU_INIT_POINTER(ci->i_layout.pool_ns, NULL);
 
 	ci->i_fragtree = RB_ROOT;
 	mutex_init(&ci->i_fragtree_mutex);
@@ -468,7 +468,7 @@
 	INIT_LIST_HEAD(&ci->i_dirty_item);
 	INIT_LIST_HEAD(&ci->i_flushing_item);
 	ci->i_prealloc_cap_flush = NULL;
-	ci->i_cap_flush_tree = RB_ROOT;
+	INIT_LIST_HEAD(&ci->i_cap_flush_list);
 	init_waitqueue_head(&ci->i_cap_wq);
 	ci->i_hold_caps_min = 0;
 	ci->i_hold_caps_max = 0;
@@ -477,7 +477,7 @@
 	ci->i_head_snapc = NULL;
 	ci->i_snap_caps = 0;
 
-	for (i = 0; i < CEPH_FILE_MODE_NUM; i++)
+	for (i = 0; i < CEPH_FILE_MODE_BITS; i++)
 		ci->i_nr_by_mode[i] = 0;
 
 	mutex_init(&ci->i_truncate_mutex);
@@ -570,6 +570,8 @@
 	if (ci->i_xattrs.prealloc_blob)
 		ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 
+	ceph_put_string(rcu_dereference_raw(ci->i_layout.pool_ns));
+
 	call_rcu(&inode->i_rcu, ceph_i_callback);
 }
 
@@ -583,6 +585,14 @@
 	return 1;
 }
 
+void ceph_evict_inode(struct inode *inode)
+{
+	/* wait for unsafe sync writes before dropping the page cache */
+	ceph_sync_write_wait(inode);
+	truncate_inode_pages_final(&inode->i_data);
+	clear_inode(inode);
+}
+
 static inline blkcnt_t calc_inode_blocks(u64 size)
 {
 	return (size + (1<<9) - 1) >> 9;
@@ -733,6 +743,7 @@
 	int issued = 0, implemented, new_issued;
 	struct timespec mtime, atime, ctime;
 	struct ceph_buffer *xattr_blob = NULL;
+	struct ceph_string *pool_ns = NULL;
 	struct ceph_cap *new_cap = NULL;
 	int err = 0;
 	bool wake = false;
@@ -760,6 +771,10 @@
 			       iinfo->xattr_len);
 	}
 
+	if (iinfo->pool_ns_len > 0)
+		pool_ns = ceph_find_or_create_string(iinfo->pool_ns_data,
+						     iinfo->pool_ns_len);
+
 	spin_lock(&ci->i_ceph_lock);
 
 	/*
@@ -814,10 +829,18 @@
 
 	if (new_version ||
 	    (new_issued & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR))) {
-		if (ci->i_layout.fl_pg_pool != info->layout.fl_pg_pool)
+		s64 old_pool = ci->i_layout.pool_id;
+		struct ceph_string *old_ns;
+
+		ceph_file_layout_from_legacy(&ci->i_layout, &info->layout);
+		old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
+					lockdep_is_held(&ci->i_ceph_lock));
+		rcu_assign_pointer(ci->i_layout.pool_ns, pool_ns);
+
+		if (ci->i_layout.pool_id != old_pool || pool_ns != old_ns)
 			ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
-		ci->i_layout = info->layout;
-		ci->i_pool_ns_len = iinfo->pool_ns_len;
+
+		pool_ns = old_ns;
 
 		queue_trunc = ceph_fill_file_size(inode, issued,
 					le32_to_cpu(info->truncate_seq),
@@ -985,6 +1008,7 @@
 		ceph_put_cap(mdsc, new_cap);
 	if (xattr_blob)
 		ceph_buffer_put(xattr_blob);
+	ceph_put_string(pool_ns);
 	return err;
 }
 
@@ -1018,7 +1042,7 @@
 		goto out_unlock;
 
 	if (di->lease_gen == session->s_cap_gen &&
-	    time_before(ttl, dentry->d_time))
+	    time_before(ttl, di->time))
 		goto out_unlock;  /* we already have a newer lease. */
 
 	if (di->lease_session && di->lease_session != session)
@@ -1032,7 +1056,7 @@
 	di->lease_seq = le32_to_cpu(lease->seq);
 	di->lease_renew_after = half_ttl;
 	di->lease_renew_from = 0;
-	dentry->d_time = ttl;
+	di->time = ttl;
 out_unlock:
 	spin_unlock(&dentry->d_lock);
 	return;
diff --git a/fs/ceph/ioctl.c b/fs/ceph/ioctl.c
index be6b165..7d752d5 100644
--- a/fs/ceph/ioctl.c
+++ b/fs/ceph/ioctl.c
@@ -21,10 +21,10 @@
 
 	err = ceph_do_getattr(file_inode(file), CEPH_STAT_CAP_LAYOUT, false);
 	if (!err) {
-		l.stripe_unit = ceph_file_layout_su(ci->i_layout);
-		l.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
-		l.object_size = ceph_file_layout_object_size(ci->i_layout);
-		l.data_pool = le32_to_cpu(ci->i_layout.fl_pg_pool);
+		l.stripe_unit = ci->i_layout.stripe_unit;
+		l.stripe_count = ci->i_layout.stripe_count;
+		l.object_size = ci->i_layout.object_size;
+		l.data_pool = ci->i_layout.pool_id;
 		l.preferred_osd = (s32)-1;
 		if (copy_to_user(arg, &l, sizeof(l)))
 			return -EFAULT;
@@ -82,19 +82,19 @@
 	if (l.stripe_count)
 		nl.stripe_count = l.stripe_count;
 	else
-		nl.stripe_count = ceph_file_layout_stripe_count(ci->i_layout);
+		nl.stripe_count = ci->i_layout.stripe_count;
 	if (l.stripe_unit)
 		nl.stripe_unit = l.stripe_unit;
 	else
-		nl.stripe_unit = ceph_file_layout_su(ci->i_layout);
+		nl.stripe_unit = ci->i_layout.stripe_unit;
 	if (l.object_size)
 		nl.object_size = l.object_size;
 	else
-		nl.object_size = ceph_file_layout_object_size(ci->i_layout);
+		nl.object_size = ci->i_layout.object_size;
 	if (l.data_pool)
 		nl.data_pool = l.data_pool;
 	else
-		nl.data_pool = ceph_file_layout_pg_pool(ci->i_layout);
+		nl.data_pool = ci->i_layout.pool_id;
 
 	/* this is obsolete, and always -1 */
 	nl.preferred_osd = le64_to_cpu(-1);
@@ -183,7 +183,7 @@
 	struct ceph_osd_client *osdc =
 		&ceph_sb_to_client(inode->i_sb)->client->osdc;
 	struct ceph_object_locator oloc;
-	struct ceph_object_id oid;
+	CEPH_DEFINE_OID_ONSTACK(oid);
 	u64 len = 1, olen;
 	u64 tmp;
 	struct ceph_pg pgid;
@@ -202,8 +202,8 @@
 		return -EIO;
 	}
 	dl.file_offset -= dl.object_offset;
-	dl.object_size = ceph_file_layout_object_size(ci->i_layout);
-	dl.block_size = ceph_file_layout_su(ci->i_layout);
+	dl.object_size = ci->i_layout.object_size;
+	dl.block_size = ci->i_layout.stripe_unit;
 
 	/* block_offset = object_offset % block_size */
 	tmp = dl.object_offset;
@@ -212,10 +212,13 @@
 	snprintf(dl.object_name, sizeof(dl.object_name), "%llx.%08llx",
 		 ceph_ino(inode), dl.object_no);
 
-	oloc.pool = ceph_file_layout_pg_pool(ci->i_layout);
+	oloc.pool = ci->i_layout.pool_id;
+	oloc.pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
 	ceph_oid_printf(&oid, "%s", dl.object_name);
 
 	r = ceph_object_locator_to_pg(osdc->osdmap, &oid, &oloc, &pgid);
+
+	ceph_oloc_destroy(&oloc);
 	if (r < 0) {
 		up_read(&osdc->lock);
 		return r;
@@ -247,9 +250,8 @@
 
 	if ((fi->fmode & CEPH_FILE_MODE_LAZY) == 0) {
 		spin_lock(&ci->i_ceph_lock);
-		ci->i_nr_by_mode[fi->fmode]--;
 		fi->fmode |= CEPH_FILE_MODE_LAZY;
-		ci->i_nr_by_mode[fi->fmode]++;
+		ci->i_nr_by_mode[ffs(CEPH_FILE_MODE_LAZY)]++;
 		spin_unlock(&ci->i_ceph_lock);
 		dout("ioctl_layzio: file %p marked lazy\n", file);
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 4e8678a..fa59a85 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -48,7 +48,7 @@
 struct ceph_reconnect_state {
 	int nr_caps;
 	struct ceph_pagelist *pagelist;
-	bool flock;
+	unsigned msg_version;
 };
 
 static void __wake_requests(struct ceph_mds_client *mdsc,
@@ -100,12 +100,15 @@
 	} else
 		info->inline_version = CEPH_INLINE_NONE;
 
+	info->pool_ns_len = 0;
+	info->pool_ns_data = NULL;
 	if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
 		ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
-		ceph_decode_need(p, end, info->pool_ns_len, bad);
-		*p += info->pool_ns_len;
-	} else {
-		info->pool_ns_len = 0;
+		if (info->pool_ns_len > 0) {
+			ceph_decode_need(p, end, info->pool_ns_len, bad);
+			info->pool_ns_data = *p;
+			*p += info->pool_ns_len;
+		}
 	}
 
 	return 0;
@@ -469,7 +472,6 @@
 	s->s_cap_iterator = NULL;
 	INIT_LIST_HEAD(&s->s_cap_releases);
 	INIT_LIST_HEAD(&s->s_cap_flushing);
-	INIT_LIST_HEAD(&s->s_cap_snaps_flushing);
 
 	dout("register_session mds%d\n", mds);
 	if (mds >= mdsc->max_sessions) {
@@ -1145,19 +1147,17 @@
 		    ACCESS_ONCE(fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
 			invalidate = true;
 
-		while (true) {
-			struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
-			if (!n)
-				break;
-			cf = rb_entry(n, struct ceph_cap_flush, i_node);
-			rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
-			list_add(&cf->list, &to_remove);
+		while (!list_empty(&ci->i_cap_flush_list)) {
+			cf = list_first_entry(&ci->i_cap_flush_list,
+					      struct ceph_cap_flush, i_list);
+			list_del(&cf->i_list);
+			list_add(&cf->i_list, &to_remove);
 		}
 
 		spin_lock(&mdsc->cap_dirty_lock);
 
-		list_for_each_entry(cf, &to_remove, list)
-			rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+		list_for_each_entry(cf, &to_remove, i_list)
+			list_del(&cf->g_list);
 
 		if (!list_empty(&ci->i_dirty_item)) {
 			pr_warn_ratelimited(
@@ -1181,7 +1181,7 @@
 		spin_unlock(&mdsc->cap_dirty_lock);
 
 		if (!ci->i_dirty_caps && ci->i_prealloc_cap_flush) {
-			list_add(&ci->i_prealloc_cap_flush->list, &to_remove);
+			list_add(&ci->i_prealloc_cap_flush->i_list, &to_remove);
 			ci->i_prealloc_cap_flush = NULL;
 		}
 	}
@@ -1189,8 +1189,8 @@
 	while (!list_empty(&to_remove)) {
 		struct ceph_cap_flush *cf;
 		cf = list_first_entry(&to_remove,
-				      struct ceph_cap_flush, list);
-		list_del(&cf->list);
+				      struct ceph_cap_flush, i_list);
+		list_del(&cf->i_list);
 		ceph_free_cap_flush(cf);
 	}
 
@@ -1212,6 +1212,8 @@
 	dout("remove_session_caps on %p\n", session);
 	iterate_session_caps(session, remove_session_caps_cb, fsc);
 
+	wake_up_all(&fsc->mdsc->cap_flushing_wq);
+
 	spin_lock(&session->s_cap_lock);
 	if (session->s_nr_caps > 0) {
 		struct inode *inode;
@@ -1478,35 +1480,21 @@
 	return 0;
 }
 
-static int check_capsnap_flush(struct ceph_inode_info *ci,
-			       u64 want_snap_seq)
-{
-	int ret = 1;
-	spin_lock(&ci->i_ceph_lock);
-	if (want_snap_seq > 0 && !list_empty(&ci->i_cap_snaps)) {
-		struct ceph_cap_snap *capsnap =
-			list_first_entry(&ci->i_cap_snaps,
-					 struct ceph_cap_snap, ci_item);
-		ret = capsnap->follows >= want_snap_seq;
-	}
-	spin_unlock(&ci->i_ceph_lock);
-	return ret;
-}
-
 static int check_caps_flush(struct ceph_mds_client *mdsc,
 			    u64 want_flush_tid)
 {
-	struct rb_node *n;
-	struct ceph_cap_flush *cf;
 	int ret = 1;
 
 	spin_lock(&mdsc->cap_dirty_lock);
-	n = rb_first(&mdsc->cap_flush_tree);
-	cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
-	if (cf && cf->tid <= want_flush_tid) {
-		dout("check_caps_flush still flushing tid %llu <= %llu\n",
-		     cf->tid, want_flush_tid);
-		ret = 0;
+	if (!list_empty(&mdsc->cap_flush_list)) {
+		struct ceph_cap_flush *cf =
+			list_first_entry(&mdsc->cap_flush_list,
+					 struct ceph_cap_flush, g_list);
+		if (cf->tid <= want_flush_tid) {
+			dout("check_caps_flush still flushing tid "
+			     "%llu <= %llu\n", cf->tid, want_flush_tid);
+			ret = 0;
+		}
 	}
 	spin_unlock(&mdsc->cap_dirty_lock);
 	return ret;
@@ -1518,54 +1506,9 @@
  * returns true if we've flushed through want_flush_tid
  */
 static void wait_caps_flush(struct ceph_mds_client *mdsc,
-			    u64 want_flush_tid, u64 want_snap_seq)
+			    u64 want_flush_tid)
 {
-	int mds;
-
-	dout("check_caps_flush want %llu snap want %llu\n",
-	     want_flush_tid, want_snap_seq);
-	mutex_lock(&mdsc->mutex);
-	for (mds = 0; mds < mdsc->max_sessions; ) {
-		struct ceph_mds_session *session = mdsc->sessions[mds];
-		struct inode *inode = NULL;
-
-		if (!session) {
-			mds++;
-			continue;
-		}
-		get_session(session);
-		mutex_unlock(&mdsc->mutex);
-
-		mutex_lock(&session->s_mutex);
-		if (!list_empty(&session->s_cap_snaps_flushing)) {
-			struct ceph_cap_snap *capsnap =
-				list_first_entry(&session->s_cap_snaps_flushing,
-						 struct ceph_cap_snap,
-						 flushing_item);
-			struct ceph_inode_info *ci = capsnap->ci;
-			if (!check_capsnap_flush(ci, want_snap_seq)) {
-				dout("check_cap_flush still flushing snap %p "
-				     "follows %lld <= %lld to mds%d\n",
-				     &ci->vfs_inode, capsnap->follows,
-				     want_snap_seq, mds);
-				inode = igrab(&ci->vfs_inode);
-			}
-		}
-		mutex_unlock(&session->s_mutex);
-		ceph_put_mds_session(session);
-
-		if (inode) {
-			wait_event(mdsc->cap_flushing_wq,
-				   check_capsnap_flush(ceph_inode(inode),
-						       want_snap_seq));
-			iput(inode);
-		} else {
-			mds++;
-		}
-
-		mutex_lock(&mdsc->mutex);
-	}
-	mutex_unlock(&mdsc->mutex);
+	dout("check_caps_flush want %llu\n", want_flush_tid);
 
 	wait_event(mdsc->cap_flushing_wq,
 		   check_caps_flush(mdsc, want_flush_tid));
@@ -2163,6 +2106,11 @@
 	mds = __choose_mds(mdsc, req);
 	if (mds < 0 ||
 	    ceph_mdsmap_get_state(mdsc->mdsmap, mds) < CEPH_MDS_STATE_ACTIVE) {
+		if (mdsc->mdsmap_err) {
+			err = mdsc->mdsmap_err;
+			dout("do_request mdsmap err %d\n", err);
+			goto finish;
+		}
 		dout("do_request no mds or not active, waiting for map\n");
 		list_add(&req->r_wait, &mdsc->waiting_for_map);
 		goto out;
@@ -2292,14 +2240,6 @@
 		ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
 				  CEPH_CAP_PIN);
 
-	/* deny access to directories with pool_ns layouts */
-	if (req->r_inode && S_ISDIR(req->r_inode->i_mode) &&
-	    ceph_inode(req->r_inode)->i_pool_ns_len)
-		return -EIO;
-	if (req->r_locked_dir &&
-	    ceph_inode(req->r_locked_dir)->i_pool_ns_len)
-		return -EIO;
-
 	/* issue */
 	mutex_lock(&mdsc->mutex);
 	__register_request(mdsc, req, dir);
@@ -2791,13 +2731,13 @@
 		struct ceph_mds_cap_reconnect v2;
 		struct ceph_mds_cap_reconnect_v1 v1;
 	} rec;
-	size_t reclen;
 	struct ceph_inode_info *ci;
 	struct ceph_reconnect_state *recon_state = arg;
 	struct ceph_pagelist *pagelist = recon_state->pagelist;
 	char *path;
 	int pathlen, err;
 	u64 pathbase;
+	u64 snap_follows;
 	struct dentry *dentry;
 
 	ci = cap->ci;
@@ -2820,9 +2760,6 @@
 		path = NULL;
 		pathlen = 0;
 	}
-	err = ceph_pagelist_encode_string(pagelist, path, pathlen);
-	if (err)
-		goto out_free;
 
 	spin_lock(&ci->i_ceph_lock);
 	cap->seq = 0;        /* reset cap seq */
@@ -2830,14 +2767,13 @@
 	cap->mseq = 0;       /* and migrate_seq */
 	cap->cap_gen = cap->session->s_cap_gen;
 
-	if (recon_state->flock) {
+	if (recon_state->msg_version >= 2) {
 		rec.v2.cap_id = cpu_to_le64(cap->cap_id);
 		rec.v2.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
 		rec.v2.issued = cpu_to_le32(cap->issued);
 		rec.v2.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
 		rec.v2.pathbase = cpu_to_le64(pathbase);
 		rec.v2.flock_len = 0;
-		reclen = sizeof(rec.v2);
 	} else {
 		rec.v1.cap_id = cpu_to_le64(cap->cap_id);
 		rec.v1.wanted = cpu_to_le32(__ceph_caps_wanted(ci));
@@ -2847,13 +2783,23 @@
 		ceph_encode_timespec(&rec.v1.atime, &inode->i_atime);
 		rec.v1.snaprealm = cpu_to_le64(ci->i_snap_realm->ino);
 		rec.v1.pathbase = cpu_to_le64(pathbase);
-		reclen = sizeof(rec.v1);
+	}
+
+	if (list_empty(&ci->i_cap_snaps)) {
+		snap_follows = 0;
+	} else {
+		struct ceph_cap_snap *capsnap =
+			list_first_entry(&ci->i_cap_snaps,
+					 struct ceph_cap_snap, ci_item);
+		snap_follows = capsnap->follows;
 	}
 	spin_unlock(&ci->i_ceph_lock);
 
-	if (recon_state->flock) {
+	if (recon_state->msg_version >= 2) {
 		int num_fcntl_locks, num_flock_locks;
 		struct ceph_filelock *flocks;
+		size_t struct_len, total_len = 0;
+		u8 struct_v = 0;
 
 encode_again:
 		ceph_count_locks(inode, &num_fcntl_locks, &num_flock_locks);
@@ -2872,20 +2818,51 @@
 				goto encode_again;
 			goto out_free;
 		}
+
+		if (recon_state->msg_version >= 3) {
+			/* version, compat_version and struct_len */
+			total_len = 2 * sizeof(u8) + sizeof(u32);
+			struct_v = 2;
+		}
 		/*
 		 * number of encoded locks is stable, so copy to pagelist
 		 */
-		rec.v2.flock_len = cpu_to_le32(2*sizeof(u32) +
-				    (num_fcntl_locks+num_flock_locks) *
-				    sizeof(struct ceph_filelock));
-		err = ceph_pagelist_append(pagelist, &rec, reclen);
-		if (!err)
-			err = ceph_locks_to_pagelist(flocks, pagelist,
-						     num_fcntl_locks,
-						     num_flock_locks);
+		struct_len = 2 * sizeof(u32) +
+			    (num_fcntl_locks + num_flock_locks) *
+			    sizeof(struct ceph_filelock);
+		rec.v2.flock_len = cpu_to_le32(struct_len);
+
+		struct_len += sizeof(rec.v2);
+		struct_len += sizeof(u32) + pathlen;
+
+		if (struct_v >= 2)
+			struct_len += sizeof(u64); /* snap_follows */
+
+		total_len += struct_len;
+		err = ceph_pagelist_reserve(pagelist, total_len);
+
+		if (!err) {
+			if (recon_state->msg_version >= 3) {
+				ceph_pagelist_encode_8(pagelist, struct_v);
+				ceph_pagelist_encode_8(pagelist, 1);
+				ceph_pagelist_encode_32(pagelist, struct_len);
+			}
+			ceph_pagelist_encode_string(pagelist, path, pathlen);
+			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v2));
+			ceph_locks_to_pagelist(flocks, pagelist,
+					       num_fcntl_locks,
+					       num_flock_locks);
+			if (struct_v >= 2)
+				ceph_pagelist_encode_64(pagelist, snap_follows);
+		}
 		kfree(flocks);
 	} else {
-		err = ceph_pagelist_append(pagelist, &rec, reclen);
+		size_t size = sizeof(u32) + pathlen + sizeof(rec.v1);
+		err = ceph_pagelist_reserve(pagelist, size);
+		if (!err) {
+			ceph_pagelist_encode_string(pagelist, path, pathlen);
+			ceph_pagelist_append(pagelist, &rec, sizeof(rec.v1));
+		}
 	}
 
 	recon_state->nr_caps++;
@@ -2976,7 +2953,12 @@
 
 	recon_state.nr_caps = 0;
 	recon_state.pagelist = pagelist;
-	recon_state.flock = session->s_con.peer_features & CEPH_FEATURE_FLOCK;
+	if (session->s_con.peer_features & CEPH_FEATURE_MDSENC)
+		recon_state.msg_version = 3;
+	else if (session->s_con.peer_features & CEPH_FEATURE_FLOCK)
+		recon_state.msg_version = 2;
+	else
+		recon_state.msg_version = 1;
 	err = iterate_session_caps(session, encode_caps_cb, &recon_state);
 	if (err < 0)
 		goto fail;
@@ -3005,8 +2987,7 @@
 			goto fail;
 	}
 
-	if (recon_state.flock)
-		reply->hdr.version = cpu_to_le16(2);
+	reply->hdr.version = cpu_to_le16(recon_state.msg_version);
 
 	/* raced with cap release? */
 	if (s_nr_caps != recon_state.nr_caps) {
@@ -3231,7 +3212,7 @@
 				msecs_to_jiffies(le32_to_cpu(h->duration_ms));
 
 			di->lease_seq = seq;
-			dentry->d_time = di->lease_renew_from + duration;
+			di->time = di->lease_renew_from + duration;
 			di->lease_renew_after = di->lease_renew_from +
 				(duration >> 1);
 			di->lease_renew_from = 0;
@@ -3297,47 +3278,6 @@
 }
 
 /*
- * Preemptively release a lease we expect to invalidate anyway.
- * Pass @inode always, @dentry is optional.
- */
-void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc, struct inode *inode,
-			     struct dentry *dentry)
-{
-	struct ceph_dentry_info *di;
-	struct ceph_mds_session *session;
-	u32 seq;
-
-	BUG_ON(inode == NULL);
-	BUG_ON(dentry == NULL);
-
-	/* is dentry lease valid? */
-	spin_lock(&dentry->d_lock);
-	di = ceph_dentry(dentry);
-	if (!di || !di->lease_session ||
-	    di->lease_session->s_mds < 0 ||
-	    di->lease_gen != di->lease_session->s_cap_gen ||
-	    !time_before(jiffies, dentry->d_time)) {
-		dout("lease_release inode %p dentry %p -- "
-		     "no lease\n",
-		     inode, dentry);
-		spin_unlock(&dentry->d_lock);
-		return;
-	}
-
-	/* we do have a lease on this dentry; note mds and seq */
-	session = ceph_get_mds_session(di->lease_session);
-	seq = di->lease_seq;
-	__ceph_mdsc_drop_dentry_lease(dentry);
-	spin_unlock(&dentry->d_lock);
-
-	dout("lease_release inode %p dentry %p to mds%d\n",
-	     inode, dentry, session->s_mds);
-	ceph_mdsc_lease_send_msg(session, inode, dentry,
-				 CEPH_MDS_LEASE_RELEASE, seq);
-	ceph_put_mds_session(session);
-}
-
-/*
  * drop all leases (and dentry refs) in preparation for umount
  */
 static void drop_leases(struct ceph_mds_client *mdsc)
@@ -3470,7 +3410,7 @@
 	INIT_LIST_HEAD(&mdsc->snap_flush_list);
 	spin_lock_init(&mdsc->snap_flush_lock);
 	mdsc->last_cap_flush_tid = 1;
-	mdsc->cap_flush_tree = RB_ROOT;
+	INIT_LIST_HEAD(&mdsc->cap_flush_list);
 	INIT_LIST_HEAD(&mdsc->cap_dirty);
 	INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
 	mdsc->num_cap_flushing = 0;
@@ -3585,7 +3525,7 @@
 
 void ceph_mdsc_sync(struct ceph_mds_client *mdsc)
 {
-	u64 want_tid, want_flush, want_snap;
+	u64 want_tid, want_flush;
 
 	if (ACCESS_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_SHUTDOWN)
 		return;
@@ -3598,17 +3538,19 @@
 	ceph_flush_dirty_caps(mdsc);
 	spin_lock(&mdsc->cap_dirty_lock);
 	want_flush = mdsc->last_cap_flush_tid;
+	if (!list_empty(&mdsc->cap_flush_list)) {
+		struct ceph_cap_flush *cf =
+			list_last_entry(&mdsc->cap_flush_list,
+					struct ceph_cap_flush, g_list);
+		cf->wake = true;
+	}
 	spin_unlock(&mdsc->cap_dirty_lock);
 
-	down_read(&mdsc->snap_rwsem);
-	want_snap = mdsc->last_snap_seq;
-	up_read(&mdsc->snap_rwsem);
-
-	dout("sync want tid %lld flush_seq %lld snap_seq %lld\n",
-	     want_tid, want_flush, want_snap);
+	dout("sync want tid %lld flush_seq %lld\n",
+	     want_tid, want_flush);
 
 	wait_unsafe_requests(mdsc, want_tid);
-	wait_caps_flush(mdsc, want_flush, want_snap);
+	wait_caps_flush(mdsc, want_flush);
 }
 
 /*
@@ -3729,11 +3671,86 @@
 	dout("mdsc_destroy %p done\n", mdsc);
 }
 
+/*
+ * Handle an fsmap.user message: find the filesystem named by the mds_namespace
+ * mount option and subscribe to its mdsmap.  On failure, record the error in
+ * mdsc->mdsmap_err (-EINVAL: undecodable map, -ENOENT: name not found) and
+ * wake the requests waiting for a map.
+ */
+void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+{
+	struct ceph_fs_client *fsc = mdsc->fsc;
+	const char *mds_namespace = fsc->mount_options->mds_namespace;
+	void *p = msg->front.iov_base;
+	void *end = p + msg->front.iov_len;
+	u32 epoch, map_len, num_fs;
+	u32 mount_fscid = (u32)-1;	/* (u32)-1 means "no match yet" */
+	u8 struct_v, struct_cv;
+	int err = -EINVAL;	/* reported if a decode check jumps to bad */
+
+	ceph_decode_need(&p, end, sizeof(u32), bad);
+	epoch = ceph_decode_32(&p);
+
+	dout("handle_fsmap epoch %u\n", epoch);
+
+	ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+	struct_v = ceph_decode_8(&p);
+	struct_cv = ceph_decode_8(&p);
+	map_len = ceph_decode_32(&p);
+
+	ceph_decode_need(&p, end, sizeof(u32) * 3, bad);
+	p += sizeof(u32) * 2; /* skip epoch and legacy_client_fscid */
+
+	num_fs = ceph_decode_32(&p);
+	while (num_fs-- > 0) {
+		void *info_p, *info_end;
+		u32 info_len, fscid, namelen;
+		u8 info_v, info_cv;
+
+		ceph_decode_need(&p, end, 2 + sizeof(u32), bad);
+		info_v = ceph_decode_8(&p);
+		info_cv = ceph_decode_8(&p);
+		info_len = ceph_decode_32(&p);
+		ceph_decode_need(&p, end, info_len, bad);
+		info_p = p;
+		info_end = p + info_len;
+		p = info_end;	/* outer cursor skips the whole entry */
+
+		ceph_decode_need(&info_p, info_end, sizeof(u32) * 2, bad);
+		fscid = ceph_decode_32(&info_p);
+		namelen = ceph_decode_32(&info_p);
+		ceph_decode_need(&info_p, info_end, namelen, bad);
+
+		if (mds_namespace &&
+		    strlen(mds_namespace) == namelen &&
+		    !strncmp(mds_namespace, (char *)info_p, namelen)) {
+			mount_fscid = fscid;
+			break;
+		}
+	}
+
+	ceph_monc_got_map(&fsc->client->monc, CEPH_SUB_FSMAP, epoch);
+	if (mount_fscid == (u32)-1) {
+		err = -ENOENT;
+		goto err_out;
+	}
+	fsc->client->monc.fs_cluster_id = mount_fscid;
+	ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
+	ceph_monc_renew_subs(&fsc->client->monc);
+	return;
+bad:
+	pr_err("error decoding fsmap\n");
+err_out:
+	mutex_lock(&mdsc->mutex);
+	mdsc->mdsmap_err = err;	/* -EINVAL (decode) or -ENOENT (no match) */
+	__wake_requests(mdsc, &mdsc->waiting_for_map);
+	mutex_unlock(&mdsc->mutex);
+}
 
 /*
  * handle mds map update.
  */
-void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
+void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc, struct ceph_msg *msg)
 {
 	u32 epoch;
 	u32 maplen;
@@ -3840,7 +3857,10 @@
 
 	switch (type) {
 	case CEPH_MSG_MDS_MAP:
-		ceph_mdsc_handle_map(mdsc, msg);
+		ceph_mdsc_handle_mdsmap(mdsc, msg);
+		break;
+	case CEPH_MSG_FS_MAP_USER:
+		ceph_mdsc_handle_fsmap(mdsc, msg);
 		break;
 	case CEPH_MSG_CLIENT_SESSION:
 		handle_session(s, msg);
diff --git a/fs/ceph/mds_client.h b/fs/ceph/mds_client.h
index e7d38aa..6b36797 100644
--- a/fs/ceph/mds_client.h
+++ b/fs/ceph/mds_client.h
@@ -45,6 +45,7 @@
 	u32 inline_len;
 	char *inline_data;
 	u32 pool_ns_len;
+	char *pool_ns_data;
 };
 
 struct ceph_mds_reply_dir_entry {
@@ -151,7 +152,6 @@
 
 	/* protected by mutex */
 	struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
-	struct list_head  s_cap_snaps_flushing;
 	unsigned long     s_renew_requested; /* last time we sent a renew req */
 	u64               s_renew_seq;
 
@@ -275,8 +275,10 @@
 
 struct ceph_pool_perm {
 	struct rb_node node;
-	u32 pool;
 	int perm;
+	s64 pool;
+	size_t pool_ns_len;
+	char pool_ns[];
 };
 
 /*
@@ -290,6 +292,7 @@
 	struct completion       safe_umount_waiters;
 	wait_queue_head_t       session_close_wq;
 	struct list_head        waiting_for_map;
+	int 			mdsmap_err;
 
 	struct ceph_mds_session **sessions;    /* NULL for mds if no session */
 	atomic_t		num_sessions;
@@ -321,7 +324,7 @@
 	spinlock_t       snap_flush_lock;
 
 	u64               last_cap_flush_tid;
-	struct rb_root    cap_flush_tree;
+	struct list_head  cap_flush_list;
 	struct list_head  cap_dirty;        /* inodes with dirty caps */
 	struct list_head  cap_dirty_migrating; /* ...that are migration... */
 	int               num_cap_flushing; /* # caps we are flushing */
@@ -382,10 +385,6 @@
 
 extern void ceph_mdsc_sync(struct ceph_mds_client *mdsc);
 
-extern void ceph_mdsc_lease_release(struct ceph_mds_client *mdsc,
-				    struct inode *inode,
-				    struct dentry *dn);
-
 extern void ceph_invalidate_dir_request(struct ceph_mds_request *req);
 extern int ceph_alloc_readdir_reply_buffer(struct ceph_mds_request *req,
 					   struct inode *dir);
@@ -420,8 +419,10 @@
 				     struct dentry *dentry, char action,
 				     u32 seq);
 
-extern void ceph_mdsc_handle_map(struct ceph_mds_client *mdsc,
-				 struct ceph_msg *msg);
+extern void ceph_mdsc_handle_mdsmap(struct ceph_mds_client *mdsc,
+				    struct ceph_msg *msg);
+extern void ceph_mdsc_handle_fsmap(struct ceph_mds_client *mdsc,
+				   struct ceph_msg *msg);
 
 extern struct ceph_mds_session *
 ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target);
diff --git a/fs/ceph/snap.c b/fs/ceph/snap.c
index 9caaa7f..9ff5219 100644
--- a/fs/ceph/snap.c
+++ b/fs/ceph/snap.c
@@ -520,9 +520,7 @@
 	ihold(inode);
 
 	atomic_set(&capsnap->nref, 1);
-	capsnap->ci = ci;
 	INIT_LIST_HEAD(&capsnap->ci_item);
-	INIT_LIST_HEAD(&capsnap->flushing_item);
 
 	capsnap->follows = old_snapc->seq;
 	capsnap->issued = __ceph_caps_issued(ci, NULL);
@@ -551,7 +549,6 @@
 	ci->i_wrbuffer_ref_head = 0;
 	capsnap->context = old_snapc;
 	list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
-	old_snapc = NULL;
 
 	if (used & CEPH_CAP_FILE_WR) {
 		dout("queue_cap_snap %p cap_snap %p snapc %p"
@@ -563,6 +560,7 @@
 		__ceph_finish_cap_snap(ci, capsnap);
 	}
 	capsnap = NULL;
+	old_snapc = NULL;
 
 update_snapc:
 	if (ci->i_head_snapc) {
@@ -603,6 +601,8 @@
 		     capsnap->dirty_pages);
 		return 0;
 	}
+
+	ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
 	dout("finish_cap_snap %p cap_snap %p snapc %p %llu %s s=%llu\n",
 	     inode, capsnap, capsnap->context,
 	     capsnap->context->seq, ceph_cap_string(capsnap->dirty),
@@ -799,9 +799,7 @@
 		inode = &ci->vfs_inode;
 		ihold(inode);
 		spin_unlock(&mdsc->snap_flush_lock);
-		spin_lock(&ci->i_ceph_lock);
-		__ceph_flush_snaps(ci, &session, 0);
-		spin_unlock(&ci->i_ceph_lock);
+		ceph_flush_snaps(ci, &session);
 		iput(inode);
 		spin_lock(&mdsc->snap_flush_lock);
 	}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 91e0248..e247f6f 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -108,7 +108,6 @@
  * mount options
  */
 enum {
-	Opt_mds_namespace,
 	Opt_wsize,
 	Opt_rsize,
 	Opt_rasize,
@@ -121,6 +120,7 @@
 	Opt_last_int,
 	/* int args above */
 	Opt_snapdirname,
+	Opt_mds_namespace,
 	Opt_last_string,
 	/* string args above */
 	Opt_dirstat,
@@ -144,7 +144,6 @@
 };
 
 static match_table_t fsopt_tokens = {
-	{Opt_mds_namespace, "mds_namespace=%d"},
 	{Opt_wsize, "wsize=%d"},
 	{Opt_rsize, "rsize=%d"},
 	{Opt_rasize, "rasize=%d"},
@@ -156,6 +155,7 @@
 	{Opt_congestion_kb, "write_congestion_kb=%d"},
 	/* int args above */
 	{Opt_snapdirname, "snapdirname=%s"},
+	{Opt_mds_namespace, "mds_namespace=%s"},
 	/* string args above */
 	{Opt_dirstat, "dirstat"},
 	{Opt_nodirstat, "nodirstat"},
@@ -212,11 +212,14 @@
 		if (!fsopt->snapdir_name)
 			return -ENOMEM;
 		break;
-
-		/* misc */
 	case Opt_mds_namespace:
-		fsopt->mds_namespace = intval;
+		fsopt->mds_namespace = kstrndup(argstr[0].from,
+						argstr[0].to-argstr[0].from,
+						GFP_KERNEL);
+		if (!fsopt->mds_namespace)
+			return -ENOMEM;
 		break;
+		/* misc */
 	case Opt_wsize:
 		fsopt->wsize = intval;
 		break;
@@ -302,6 +305,7 @@
 {
 	dout("destroy_mount_options %p\n", args);
 	kfree(args->snapdir_name);
+	kfree(args->mds_namespace);
 	kfree(args->server_path);
 	kfree(args);
 }
@@ -333,6 +337,9 @@
 	ret = strcmp_null(fsopt1->snapdir_name, fsopt2->snapdir_name);
 	if (ret)
 		return ret;
+	ret = strcmp_null(fsopt1->mds_namespace, fsopt2->mds_namespace);
+	if (ret)
+		return ret;
 
 	ret = strcmp_null(fsopt1->server_path, fsopt2->server_path);
 	if (ret)
@@ -376,7 +383,6 @@
 	fsopt->max_readdir = CEPH_MAX_READDIR_DEFAULT;
 	fsopt->max_readdir_bytes = CEPH_MAX_READDIR_BYTES_DEFAULT;
 	fsopt->congestion_kb = default_congestion_kb();
-	fsopt->mds_namespace = CEPH_FS_CLUSTER_ID_NONE;
 
 	/*
 	 * Distinguish the server list from the path in "dev_name".
@@ -469,8 +475,8 @@
 		seq_puts(m, ",noacl");
 #endif
 
-	if (fsopt->mds_namespace != CEPH_FS_CLUSTER_ID_NONE)
-		seq_printf(m, ",mds_namespace=%d", fsopt->mds_namespace);
+	if (fsopt->mds_namespace)
+		seq_printf(m, ",mds_namespace=%s", fsopt->mds_namespace);
 	if (fsopt->wsize)
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
 	if (fsopt->rsize != CEPH_RSIZE_DEFAULT)
@@ -509,9 +515,11 @@
 
 	switch (type) {
 	case CEPH_MSG_MDS_MAP:
-		ceph_mdsc_handle_map(fsc->mdsc, msg);
+		ceph_mdsc_handle_mdsmap(fsc->mdsc, msg);
 		return 0;
-
+	case CEPH_MSG_FS_MAP_USER:
+		ceph_mdsc_handle_fsmap(fsc->mdsc, msg);
+		return 0;
 	default:
 		return -1;
 	}
@@ -543,8 +551,14 @@
 		goto fail;
 	}
 	fsc->client->extra_mon_dispatch = extra_mon_dispatch;
-	fsc->client->monc.fs_cluster_id = fsopt->mds_namespace;
-	ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP, 0, true);
+
+	if (fsopt->mds_namespace == NULL) {
+		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_MDSMAP,
+				   0, true);
+	} else {
+		ceph_monc_want_map(&fsc->client->monc, CEPH_SUB_FSMAP,
+				   0, false);
+	}
 
 	fsc->mount_options = fsopt;
 
@@ -672,8 +686,8 @@
 	if (ceph_dentry_cachep == NULL)
 		goto bad_dentry;
 
-	ceph_file_cachep = KMEM_CACHE(ceph_file_info,
-				      SLAB_RECLAIM_ACCOUNT|SLAB_MEM_SPREAD);
+	ceph_file_cachep = KMEM_CACHE(ceph_file_info, SLAB_MEM_SPREAD);
+
 	if (ceph_file_cachep == NULL)
 		goto bad_file;
 
@@ -731,6 +745,7 @@
 	.destroy_inode	= ceph_destroy_inode,
 	.write_inode    = ceph_write_inode,
 	.drop_inode	= ceph_drop_inode,
+	.evict_inode	= ceph_evict_inode,
 	.sync_fs        = ceph_sync_fs,
 	.put_super	= ceph_put_super,
 	.show_options   = ceph_show_options,
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 0168b49..3e3fa916 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -62,7 +62,6 @@
 	int cap_release_safety;
 	int max_readdir;       /* max readdir result (entires) */
 	int max_readdir_bytes; /* max readdir result (bytes) */
-	int mds_namespace;
 
 	/*
 	 * everything above this point can be memcmp'd; everything below
@@ -70,6 +69,7 @@
 	 */
 
 	char *snapdir_name;   /* default ".snap" */
+	char *mds_namespace;  /* default NULL */
 	char *server_path;    /* default  "/" */
 };
 
@@ -147,6 +147,14 @@
 #define CHECK_CAPS_AUTHONLY   2  /* only check auth cap */
 #define CHECK_CAPS_FLUSH      4  /* flush any dirty caps */
 
+struct ceph_cap_flush {
+	u64 tid; /* flush tid; sync waiters compare it to the tid they want */
+	int caps; /* 0 means capsnap */
+	bool wake; /* wake up flush waiters when this flush finishes? */
+	struct list_head g_list; /* entry in mdsc->cap_flush_list (global) */
+	struct list_head i_list; /* entry in ci->i_cap_flush_list (per inode) */
+};
+
 /*
  * Snapped cap state that is pending flush to mds.  When a snapshot occurs,
  * we first complete any in-process sync writes and writeback any dirty
@@ -154,10 +162,11 @@
  */
 struct ceph_cap_snap {
 	atomic_t nref;
-	struct ceph_inode_info *ci;
-	struct list_head ci_item, flushing_item;
+	struct list_head ci_item;
 
-	u64 follows, flush_tid;
+	struct ceph_cap_flush cap_flush;
+
+	u64 follows;
 	int issued, dirty;
 	struct ceph_snap_context *context;
 
@@ -186,16 +195,6 @@
 	}
 }
 
-struct ceph_cap_flush {
-	u64 tid;
-	int caps;
-	struct rb_node g_node; // global
-	union {
-		struct rb_node i_node; // inode
-		struct list_head list;
-	};
-};
-
 /*
  * The frag tree describes how a directory is fragmented, potentially across
  * multiple metadata servers.  It is also used to indicate points where
@@ -246,7 +245,7 @@
 	unsigned long lease_renew_after, lease_renew_from;
 	struct list_head lru;
 	struct dentry *dentry;
-	u64 time;
+	unsigned long time;
 	u64 offset;
 };
 
@@ -287,7 +286,6 @@
 
 	struct ceph_dir_layout i_dir_layout;
 	struct ceph_file_layout i_layout;
-	size_t i_pool_ns_len;
 	char *i_symlink;
 
 	/* for dirs */
@@ -311,7 +309,7 @@
 	 * overlapping, pipelined cap flushes to the mds.  we can probably
 	 * reduce the tid to 8 bits if we're concerned about inode size. */
 	struct ceph_cap_flush *i_prealloc_cap_flush;
-	struct rb_root i_cap_flush_tree;
+	struct list_head i_cap_flush_list;
 	wait_queue_head_t i_cap_wq;      /* threads waiting on a capability */
 	unsigned long i_hold_caps_min; /* jiffies */
 	unsigned long i_hold_caps_max; /* jiffies */
@@ -322,7 +320,7 @@
 						    dirty|flushing caps */
 	unsigned i_snap_caps;           /* cap bits for snapped files */
 
-	int i_nr_by_mode[CEPH_FILE_MODE_NUM];  /* open file counts */
+	int i_nr_by_mode[CEPH_FILE_MODE_BITS];  /* open file counts */
 
 	struct mutex i_truncate_mutex;
 	u32 i_truncate_seq;        /* last truncate to smaller size */
@@ -471,6 +469,8 @@
 #define CEPH_I_POOL_WR		(1 << 6)  /* can write to pool */
 #define CEPH_I_SEC_INITED	(1 << 7)  /* security initialized */
 #define CEPH_I_CAP_DROPPED	(1 << 8)  /* caps were forcibly dropped */
+#define CEPH_I_KICK_FLUSH	(1 << 9)  /* kick flushing caps */
+#define CEPH_I_FLUSH_SNAPS	(1 << 10) /* need to flush snaps */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
 					   long long release_count,
@@ -750,6 +750,7 @@
 extern struct inode *ceph_alloc_inode(struct super_block *sb);
 extern void ceph_destroy_inode(struct inode *inode);
 extern int ceph_drop_inode(struct inode *inode);
+extern void ceph_evict_inode(struct inode *inode);
 
 extern struct inode *ceph_get_inode(struct super_block *sb,
 				    struct ceph_vino vino);
@@ -890,9 +891,8 @@
 extern void ceph_put_cap_refs(struct ceph_inode_info *ci, int had);
 extern void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
 				       struct ceph_snap_context *snapc);
-extern void __ceph_flush_snaps(struct ceph_inode_info *ci,
-			       struct ceph_mds_session **psession,
-			       int again);
+extern void ceph_flush_snaps(struct ceph_inode_info *ci,
+			     struct ceph_mds_session **psession);
 extern void ceph_check_caps(struct ceph_inode_info *ci, int flags,
 			    struct ceph_mds_session *session);
 extern void ceph_check_delayed_caps(struct ceph_mds_client *mdsc);
@@ -907,10 +907,7 @@
 			 loff_t endoff, int *got, struct page **pinned_page);
 
 /* for counting open files by mode */
-static inline void __ceph_get_fmode(struct ceph_inode_info *ci, int mode)
-{
-	ci->i_nr_by_mode[mode]++;
-}
+extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);
 extern void ceph_put_fmode(struct ceph_inode_info *ci, int mode);
 
 /* addr.c */
@@ -931,6 +928,7 @@
 extern int ceph_release(struct inode *inode, struct file *filp);
 extern void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
 				  char *data, size_t len);
+extern void ceph_sync_write_wait(struct inode *inode);
 /* dir.c */
 extern const struct file_operations ceph_dir_fops;
 extern const struct file_operations ceph_snapdir_fops;
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index 4870b29..adc2318 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -57,81 +57,88 @@
 
 static bool ceph_vxattrcb_layout_exists(struct ceph_inode_info *ci)
 {
-	size_t s;
-	char *p = (char *)&ci->i_layout;
-
-	for (s = 0; s < sizeof(ci->i_layout); s++, p++)
-		if (*p)
-			return true;
-	return false;
+	struct ceph_file_layout *fl = &ci->i_layout;
+	return (fl->stripe_unit > 0 || fl->stripe_count > 0 ||
+		fl->object_size > 0 || fl->pool_id >= 0 ||
+		rcu_dereference_raw(fl->pool_ns) != NULL);
 }
 
 static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
 				   size_t size)
 {
-	int ret;
 	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
 	struct ceph_osd_client *osdc = &fsc->client->osdc;
-	s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
+	struct ceph_string *pool_ns;
+	s64 pool = ci->i_layout.pool_id;
 	const char *pool_name;
+	const char *ns_field = " pool_namespace=";
 	char buf[128];
+	size_t len, total_len = 0;
+	int ret;
+
+	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
 
 	dout("ceph_vxattrcb_layout %p\n", &ci->vfs_inode);
 	down_read(&osdc->lock);
 	pool_name = ceph_pg_pool_name_by_id(osdc->osdmap, pool);
 	if (pool_name) {
-		size_t len = strlen(pool_name);
-		ret = snprintf(buf, sizeof(buf),
-		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=",
-		(unsigned long long)ceph_file_layout_su(ci->i_layout),
-		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
-	        (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
-		if (!size) {
-			ret += len;
-		} else if (ret + len > size) {
-			ret = -ERANGE;
-		} else {
-			memcpy(val, buf, ret);
+		len = snprintf(buf, sizeof(buf),
+		"stripe_unit=%u stripe_count=%u object_size=%u pool=",
+		ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
+	        ci->i_layout.object_size);
+		total_len = len + strlen(pool_name);
+	} else {
+		len = snprintf(buf, sizeof(buf),
+		"stripe_unit=%u stripe_count=%u object_size=%u pool=%lld",
+		ci->i_layout.stripe_unit, ci->i_layout.stripe_count,
+	        ci->i_layout.object_size, (unsigned long long)pool);
+		total_len = len;
+	}
+
+	if (pool_ns)
+		total_len += strlen(ns_field) + pool_ns->len;
+
+	if (!size) {
+		ret = total_len;
+	} else if (total_len > size) {
+		ret = -ERANGE;
+	} else {
+		memcpy(val, buf, len);
+		ret = len;
+		if (pool_name) {
+			len = strlen(pool_name);
 			memcpy(val + ret, pool_name, len);
 			ret += len;
 		}
-	} else {
-		ret = snprintf(buf, sizeof(buf),
-		"stripe_unit=%lld stripe_count=%lld object_size=%lld pool=%lld",
-		(unsigned long long)ceph_file_layout_su(ci->i_layout),
-		(unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
-	        (unsigned long long)ceph_file_layout_object_size(ci->i_layout),
-		(unsigned long long)pool);
-		if (size) {
-			if (ret <= size)
-				memcpy(val, buf, ret);
-			else
-				ret = -ERANGE;
+		if (pool_ns) {
+			len = strlen(ns_field);
+			memcpy(val + ret, ns_field, len);
+			ret += len;
+			memcpy(val + ret, pool_ns->str, pool_ns->len);
+			ret += pool_ns->len;
 		}
 	}
 	up_read(&osdc->lock);
+	ceph_put_string(pool_ns);
 	return ret;
 }
 
 static size_t ceph_vxattrcb_layout_stripe_unit(struct ceph_inode_info *ci,
 					       char *val, size_t size)
 {
-	return snprintf(val, size, "%lld",
-			(unsigned long long)ceph_file_layout_su(ci->i_layout));
+	return snprintf(val, size, "%u", ci->i_layout.stripe_unit);
 }
 
 static size_t ceph_vxattrcb_layout_stripe_count(struct ceph_inode_info *ci,
 						char *val, size_t size)
 {
-	return snprintf(val, size, "%lld",
-	       (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout));
+	return snprintf(val, size, "%u", ci->i_layout.stripe_count);
 }
 
 static size_t ceph_vxattrcb_layout_object_size(struct ceph_inode_info *ci,
 					       char *val, size_t size)
 {
-	return snprintf(val, size, "%lld",
-	       (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
+	return snprintf(val, size, "%u", ci->i_layout.object_size);
 }
 
 static size_t ceph_vxattrcb_layout_pool(struct ceph_inode_info *ci,
@@ -140,7 +147,7 @@
 	int ret;
 	struct ceph_fs_client *fsc = ceph_sb_to_client(ci->vfs_inode.i_sb);
 	struct ceph_osd_client *osdc = &fsc->client->osdc;
-	s64 pool = ceph_file_layout_pg_pool(ci->i_layout);
+	s64 pool = ci->i_layout.pool_id;
 	const char *pool_name;
 
 	down_read(&osdc->lock);
@@ -153,6 +160,18 @@
 	return ret;
 }
 
+static size_t ceph_vxattrcb_layout_pool_namespace(struct ceph_inode_info *ci,
+						  char *val, size_t size)
+{	/* vxattr callback: print the inode layout's pool namespace into val */
+	int ret = 0;	/* stays 0 when the layout has no namespace */
+	struct ceph_string *ns = ceph_try_get_string(ci->i_layout.pool_ns);	/* NULL, or the string with a reference held */
+	if (ns) {
+		ret = snprintf(val, size, "%.*s", (int)ns->len, ns->str);	/* precision limits the output to ns->len bytes */
+		ceph_put_string(ns);	/* drop the reference taken above */
+	}
+	return ret;
+}
+
 /* directories */
 
 static size_t ceph_vxattrcb_dir_entries(struct ceph_inode_info *ci, char *val,
@@ -241,6 +260,7 @@
 	XATTR_LAYOUT_FIELD(dir, layout, stripe_count),
 	XATTR_LAYOUT_FIELD(dir, layout, object_size),
 	XATTR_LAYOUT_FIELD(dir, layout, pool),
+	XATTR_LAYOUT_FIELD(dir, layout, pool_namespace),
 	XATTR_NAME_CEPH(dir, entries),
 	XATTR_NAME_CEPH(dir, files),
 	XATTR_NAME_CEPH(dir, subdirs),
@@ -268,6 +288,7 @@
 	XATTR_LAYOUT_FIELD(file, layout, stripe_count),
 	XATTR_LAYOUT_FIELD(file, layout, object_size),
 	XATTR_LAYOUT_FIELD(file, layout, pool),
+	XATTR_LAYOUT_FIELD(file, layout, pool_namespace),
 	{ .name = NULL, 0 }	/* Required table terminator */
 };
 static size_t ceph_file_vxattrs_name_size;	/* total size of all names */
diff --git a/include/linux/ceph/ceph_fs.h b/include/linux/ceph/ceph_fs.h
index dfce616..7868d602 100644
--- a/include/linux/ceph/ceph_fs.h
+++ b/include/linux/ceph/ceph_fs.h
@@ -34,9 +34,9 @@
 #define CEPH_MAX_MON   31
 
 /*
- * ceph_file_layout - describe data layout for a file/inode
+ * legacy ceph_file_layout
  */
-struct ceph_file_layout {
+struct ceph_file_layout_legacy {
 	/* file -> object mapping */
 	__le32 fl_stripe_unit;     /* stripe unit, in bytes.  must be multiple
 				      of page size. */
@@ -53,33 +53,27 @@
 	__le32 fl_pg_pool;      /* namespace, crush ruleset, rep level */
 } __attribute__ ((packed));
 
-#define ceph_file_layout_su(l) ((__s32)le32_to_cpu((l).fl_stripe_unit))
-#define ceph_file_layout_stripe_count(l) \
-	((__s32)le32_to_cpu((l).fl_stripe_count))
-#define ceph_file_layout_object_size(l) ((__s32)le32_to_cpu((l).fl_object_size))
-#define ceph_file_layout_cas_hash(l) ((__s32)le32_to_cpu((l).fl_cas_hash))
-#define ceph_file_layout_object_su(l) \
-	((__s32)le32_to_cpu((l).fl_object_stripe_unit))
-#define ceph_file_layout_pg_pool(l) \
-	((__s32)le32_to_cpu((l).fl_pg_pool))
+struct ceph_string;
+/*
+ * ceph_file_layout - describe data layout for a file/inode (CPU-endian, in-memory form)
+ */
+struct ceph_file_layout {
+	/* file -> object mapping */
+	u32 stripe_unit;   /* stripe unit, in bytes */
+	u32 stripe_count;  /* over this many objects */
+	u32 object_size;   /* until objects are this big */
+	s64 pool_id;        /* rados pool id; ceph_file_layout_from_legacy() stores -1 for legacy pool 0 */
+	struct ceph_string __rcu *pool_ns; /* rados pool namespace; may be NULL, read via ceph_try_get_string() */
+};
 
-static inline unsigned ceph_file_layout_stripe_width(struct ceph_file_layout *l)
-{
-	return le32_to_cpu(l->fl_stripe_unit) *
-		le32_to_cpu(l->fl_stripe_count);
-}
-
-/* "period" == bytes before i start on a new set of objects */
-static inline unsigned ceph_file_layout_period(struct ceph_file_layout *l)
-{
-	return le32_to_cpu(l->fl_object_size) *
-		le32_to_cpu(l->fl_stripe_count);
-}
+extern int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
+extern void ceph_file_layout_from_legacy(struct ceph_file_layout *fl,
+				struct ceph_file_layout_legacy *legacy);
+extern void ceph_file_layout_to_legacy(struct ceph_file_layout *fl,
+				struct ceph_file_layout_legacy *legacy);
 
 #define CEPH_MIN_STRIPE_UNIT 65536
 
-int ceph_file_layout_is_valid(const struct ceph_file_layout *layout);
-
 struct ceph_dir_layout {
 	__u8   dl_dir_hash;   /* see ceph_hash.h for ids */
 	__u8   dl_unused1;
@@ -127,6 +121,7 @@
 
 /* client <-> mds */
 #define CEPH_MSG_MDS_MAP                21
+#define CEPH_MSG_FS_MAP_USER            103
 
 #define CEPH_MSG_CLIENT_SESSION         22
 #define CEPH_MSG_CLIENT_RECONNECT       23
@@ -399,7 +394,7 @@
 		__le32 flags;
 	} __attribute__ ((packed)) setxattr;
 	struct {
-		struct ceph_file_layout layout;
+		struct ceph_file_layout_legacy layout;
 	} __attribute__ ((packed)) setlayout;
 	struct {
 		__u8 rule; /* currently fcntl or flock */
@@ -478,7 +473,7 @@
 	__le64 version;                /* inode version */
 	__le64 xattr_version;          /* version for xattr blob */
 	struct ceph_mds_reply_cap cap; /* caps issued for this inode */
-	struct ceph_file_layout layout;
+	struct ceph_file_layout_legacy layout;
 	struct ceph_timespec ctime, mtime, atime;
 	__le32 time_warp_seq;
 	__le64 size, max_size, truncate_size;
@@ -531,7 +526,7 @@
 #define CEPH_FILE_MODE_WR         2
 #define CEPH_FILE_MODE_RDWR       3  /* RD | WR */
 #define CEPH_FILE_MODE_LAZY       4  /* lazy io */
-#define CEPH_FILE_MODE_NUM        8  /* bc these are bit fields.. mostly */
+#define CEPH_FILE_MODE_BITS       4
 
 int ceph_flags_to_mode(int flags);
 
@@ -673,7 +668,7 @@
 	__le64 size, max_size, truncate_size;
 	__le32 truncate_seq;
 	struct ceph_timespec mtime, atime, ctime;
-	struct ceph_file_layout layout;
+	struct ceph_file_layout_legacy layout;
 	__le32 time_warp_seq;
 } __attribute__ ((packed));
 
diff --git a/include/linux/ceph/decode.h b/include/linux/ceph/decode.h
index 19e9932..f990f2c 100644
--- a/include/linux/ceph/decode.h
+++ b/include/linux/ceph/decode.h
@@ -3,6 +3,7 @@
 
 #include <linux/err.h>
 #include <linux/bug.h>
+#include <linux/slab.h>
 #include <linux/time.h>
 #include <asm/unaligned.h>
 
@@ -217,6 +218,60 @@
 	*p += len;
 }
 
+/*
+ * version and length starting block encoders/decoders
+ */
+
+/* current code version (u8) + compat code version (u8) + len of struct (u32) */
+#define CEPH_ENCODING_START_BLK_LEN 6
+
+/**
+ * ceph_start_encoding - start encoding block (emits the CEPH_ENCODING_START_BLK_LEN-byte header via @p)
+ * @struct_v: current (code) version of the encoding
+ * @struct_compat: oldest code version that can decode it
+ * @struct_len: length of struct encoding (the header itself is not counted)
+ */
+static inline void ceph_start_encoding(void **p, u8 struct_v, u8 struct_compat,
+				       u32 struct_len)
+{
+	ceph_encode_8(p, struct_v);	/* header field 1: u8 version */
+	ceph_encode_8(p, struct_compat);	/* header field 2: u8 compat version */
+	ceph_encode_32(p, struct_len);	/* header field 3: u32 payload length */
+}
+
+/**
+ * ceph_start_decoding - start decoding block
+ * @v: current version of the encoding that the code supports
+ * @name: name of the struct (free-form), only used in the warning message
+ * @struct_v: out param for the encoding version
+ * @struct_len: out param for the length of struct encoding
+ *
+ * Validates the length of struct encoding, so unsafe ceph_decode_*
+ * variants can be used for decoding.  Returns 0, -EINVAL or -ERANGE.
+ */
+static inline int ceph_start_decoding(void **p, void *end, u8 v,
+				      const char *name, u8 *struct_v,
+				      u32 *struct_len)
+{
+	u8 struct_compat;
+
+	ceph_decode_need(p, end, CEPH_ENCODING_START_BLK_LEN, bad);	/* presumably jumps to bad when the header does not fit before end */
+	*struct_v = ceph_decode_8(p);
+	struct_compat = ceph_decode_8(p);
+	if (v < struct_compat) {	/* sender requires a newer decoder than this code provides */
+		pr_warn("got struct_v %d struct_compat %d > %d of %s\n",
+			*struct_v, struct_compat, v, name);
+		return -EINVAL;	/* *struct_len is left unwritten on this path */
+	}
+
+	*struct_len = ceph_decode_32(p);
+	ceph_decode_need(p, end, *struct_len, bad);	/* the whole payload must be present as well */
+	return 0;
+
+bad:	/* target label handed to both ceph_decode_need() calls above */
+	return -ERANGE;
+}
+
 #define ceph_encode_need(p, end, n, bad)			\
 	do {							\
 		if (!likely(ceph_has_room(p, end, n)))		\
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 690985d..83fc1ff 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -21,6 +21,7 @@
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/ceph_fs.h>
+#include <linux/ceph/string_table.h>
 
 /*
  * mount options
@@ -214,8 +215,9 @@
 }
 
 #define DEFINE_RB_LOOKUP_FUNC(name, type, keyfld, nodefld)		\
+extern type __lookup_##name##_key;					\
 static type *lookup_##name(struct rb_root *root,			\
-			   typeof(((type *)0)->keyfld) key)		\
+			   typeof(__lookup_##name##_key.keyfld) key)	\
 {									\
 	struct rb_node *n = root->rb_node;				\
 									\
diff --git a/include/linux/ceph/mon_client.h b/include/linux/ceph/mon_client.h
index e2a92df..24d704d 100644
--- a/include/linux/ceph/mon_client.h
+++ b/include/linux/ceph/mon_client.h
@@ -95,7 +95,7 @@
 		struct ceph_mon_subscribe_item item;
 		bool want;
 		u32 have; /* epoch */
-	} subs[3];
+	} subs[4];
 	int fs_cluster_id; /* "mdsmap.<id>" sub */
 
 #ifdef CONFIG_DEBUG_FS
@@ -111,9 +111,10 @@
 extern void ceph_monc_stop(struct ceph_mon_client *monc);
 
 enum {
-	CEPH_SUB_MDSMAP = 0,
-	CEPH_SUB_MONMAP,
+	CEPH_SUB_MONMAP = 0,
 	CEPH_SUB_OSDMAP,
+	CEPH_SUB_FSMAP,
+	CEPH_SUB_MDSMAP,
 };
 
 extern const char *ceph_sub_str[];
diff --git a/include/linux/ceph/msgpool.h b/include/linux/ceph/msgpool.h
index 4b0d389..ddd0d48d 100644
--- a/include/linux/ceph/msgpool.h
+++ b/include/linux/ceph/msgpool.h
@@ -2,7 +2,6 @@
 #define _FS_CEPH_MSGPOOL
 
 #include <linux/mempool.h>
-#include <linux/ceph/messenger.h>
 
 /*
  * we use memory pools for preallocating messages we may receive, to
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 1b3b6e155..8589323 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -9,6 +9,7 @@
 #include <linux/ceph/types.h>
 #include <linux/ceph/osdmap.h>
 #include <linux/ceph/messenger.h>
+#include <linux/ceph/msgpool.h>
 #include <linux/ceph/auth.h>
 #include <linux/ceph/pagelist.h>
 
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index 9ccf4db..9a90417 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -63,11 +63,13 @@
 
 struct ceph_object_locator {
 	s64 pool;
+	struct ceph_string *pool_ns;
 };
 
 static inline void ceph_oloc_init(struct ceph_object_locator *oloc)
 {
 	oloc->pool = -1;
+	oloc->pool_ns = NULL;	/* no namespace; ceph_oloc_destroy() copes with NULL through ceph_put_string() */
 }
 
 static inline bool ceph_oloc_empty(const struct ceph_object_locator *oloc)
@@ -75,11 +77,9 @@
 	return oloc->pool == -1;
 }
 
-static inline void ceph_oloc_copy(struct ceph_object_locator *dest,
-				  const struct ceph_object_locator *src)
-{
-	dest->pool = src->pool;
-}
+void ceph_oloc_copy(struct ceph_object_locator *dest,
+		    const struct ceph_object_locator *src);
+void ceph_oloc_destroy(struct ceph_object_locator *oloc);
 
 /*
  * Maximum supported by kernel client object name length
@@ -115,6 +115,11 @@
 	oid->name_len = 0;
 }
 
+#define CEPH_OID_INIT_ONSTACK(oid)					\
+    ({ ceph_oid_init(&oid); oid; })	/* statement expression: run ceph_oid_init() on oid, then yield oid */
+#define CEPH_DEFINE_OID_ONSTACK(oid)					\
+	struct ceph_object_id oid = CEPH_OID_INIT_ONSTACK(oid)	/* declares oid, initialized through the macro above */
+
 static inline bool ceph_oid_empty(const struct ceph_object_id *oid)
 {
 	return oid->name == oid->inline_name && !oid->name_len;
diff --git a/include/linux/ceph/string_table.h b/include/linux/ceph/string_table.h
new file mode 100644
index 0000000..1b02c96
--- /dev/null
+++ b/include/linux/ceph/string_table.h
@@ -0,0 +1,62 @@
+#ifndef _FS_CEPH_STRING_TABLE_H
+#define _FS_CEPH_STRING_TABLE_H
+
+#include <linux/types.h>
+#include <linux/kref.h>
+#include <linux/rbtree.h>
+#include <linux/rcupdate.h>
+
+struct ceph_string {
+	struct kref kref;	/* last put ends up in ceph_release_string() */
+	union {
+		struct rb_node node;	/* link in the string tree kept by string_table.c */
+		struct rcu_head rcu;	/* handed to call_rcu() by ceph_release_string() */
+	};
+	size_t len;	/* bytes in str, not counting the trailing NUL */
+	char str[];	/* ceph_find_or_create_string() allocates len + 1 bytes and NUL-terminates */
+};
+
+extern void ceph_release_string(struct kref *ref);
+extern struct ceph_string *ceph_find_or_create_string(const char *str,
+						      size_t len);
+extern bool ceph_strings_empty(void);
+
+static inline struct ceph_string *ceph_get_string(struct ceph_string *str)
+{	/* take one more reference; str is dereferenced unconditionally, so it must not be NULL */
+	kref_get(&str->kref);
+	return str;	/* same pointer, for call chaining */
+}
+
+static inline void ceph_put_string(struct ceph_string *str)
+{	/* drop one reference; NULL is accepted and ignored */
+	if (!str)
+		return;
+	kref_put(&str->kref, ceph_release_string);	/* ceph_release_string() is the kref release callback */
+}
+
+static inline int ceph_compare_string(struct ceph_string *cs,
+				      const char* str, size_t len)
+{	/* compare cs with str[0..len): by length first, then by content; 0 means equal */
+	size_t cs_len = cs ? cs->len : 0;	/* a NULL cs is treated as the empty string */
+	if (cs_len != len)
+		return cs_len - len;	/* NOTE(review): size_t difference narrowed to int; sign assumed correct for realistic lengths */
+	if (len == 0)
+		return 0;	/* also keeps a NULL cs away from the strncmp() below */
+	return strncmp(cs->str, str, len);
+}
+
+#define ceph_try_get_string(x)	/* yields NULL or x's string with a reference held */	\
+({								\
+	struct ceph_string *___str;				\
+	rcu_read_lock();					\
+	for (;;) {	/* re-read x until it is NULL or a reference was obtained */	\
+		___str = rcu_dereference(x);			\
+		if (!___str ||					\
+		    kref_get_unless_zero(&___str->kref))	\
+			break;					\
+	}							\
+	rcu_read_unlock();					\
+	(___str);	/* value of the statement expression */	\
+})
+
+#endif
diff --git a/net/ceph/Makefile b/net/ceph/Makefile
index 958d9856..84cbed6 100644
--- a/net/ceph/Makefile
+++ b/net/ceph/Makefile
@@ -11,5 +11,5 @@
 	crypto.o armor.o \
 	auth_x.o \
 	ceph_fs.o ceph_strings.o ceph_hash.o \
-	pagevec.o snapshot.o
+	pagevec.o snapshot.o string_table.o
 
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index 55d2bfe..bddfcf6 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -747,6 +747,8 @@
 static void __exit exit_ceph_lib(void)
 {
 	dout("exit_ceph_lib\n");
+	WARN_ON(!ceph_strings_empty());
+
 	ceph_osdc_cleanup();
 	ceph_msgr_exit();
 	ceph_crypto_shutdown();
diff --git a/net/ceph/ceph_fs.c b/net/ceph/ceph_fs.c
index 41466cc..7d54e94 100644
--- a/net/ceph/ceph_fs.c
+++ b/net/ceph/ceph_fs.c
@@ -9,9 +9,9 @@
  */
 int ceph_file_layout_is_valid(const struct ceph_file_layout *layout)
 {
-	__u32 su = le32_to_cpu(layout->fl_stripe_unit);
-	__u32 sc = le32_to_cpu(layout->fl_stripe_count);
-	__u32 os = le32_to_cpu(layout->fl_object_size);
+	__u32 su = layout->stripe_unit;
+	__u32 sc = layout->stripe_count;
+	__u32 os = layout->object_size;
 
 	/* stripe unit, object size must be non-zero, 64k increment */
 	if (!su || (su & (CEPH_MIN_STRIPE_UNIT-1)))
@@ -27,6 +27,30 @@
 	return 1;
 }
 
+void ceph_file_layout_from_legacy(struct ceph_file_layout *fl,
+				  struct ceph_file_layout_legacy *legacy)
+{	/* fill fl from the little-endian legacy layout; fl->pool_ns is not touched */
+	fl->stripe_unit = le32_to_cpu(legacy->fl_stripe_unit);
+	fl->stripe_count = le32_to_cpu(legacy->fl_stripe_count);
+	fl->object_size = le32_to_cpu(legacy->fl_object_size);
+	fl->pool_id = le32_to_cpu(legacy->fl_pg_pool);
+	if (fl->pool_id == 0)	/* legacy pool 0 is stored as -1; ceph_file_layout_to_legacy() maps negatives back to 0 */
+		fl->pool_id = -1;
+}
+EXPORT_SYMBOL(ceph_file_layout_from_legacy);
+
+void ceph_file_layout_to_legacy(struct ceph_file_layout *fl,
+				struct ceph_file_layout_legacy *legacy)
+{	/* write fl into the little-endian legacy struct; fl->pool_ns is not encoded */
+	legacy->fl_stripe_unit = cpu_to_le32(fl->stripe_unit);
+	legacy->fl_stripe_count = cpu_to_le32(fl->stripe_count);
+	legacy->fl_object_size = cpu_to_le32(fl->object_size);
+	if (fl->pool_id >= 0)
+		legacy->fl_pg_pool = cpu_to_le32(fl->pool_id);	/* s64 pool id narrowed to 32 bits */
+	else
+		legacy->fl_pg_pool = 0;	/* negative ids, e.g. the -1 set by ceph_file_layout_from_legacy(), go out as 0 */
+}
+EXPORT_SYMBOL(ceph_file_layout_to_legacy);
 
 int ceph_flags_to_mode(int flags)
 {
diff --git a/net/ceph/debugfs.c b/net/ceph/debugfs.c
index e77b04c..c62b2b0 100644
--- a/net/ceph/debugfs.c
+++ b/net/ceph/debugfs.c
@@ -156,8 +156,16 @@
 	seq_printf(s, "]/%d\t[", t->up.primary);
 	for (i = 0; i < t->acting.size; i++)
 		seq_printf(s, "%s%d", (!i ? "" : ","), t->acting.osds[i]);
-	seq_printf(s, "]/%d\t%*pE\t0x%x", t->acting.primary,
-		   t->target_oid.name_len, t->target_oid.name, t->flags);
+	seq_printf(s, "]/%d\t", t->acting.primary);
+	if (t->target_oloc.pool_ns) {
+		seq_printf(s, "%*pE/%*pE\t0x%x",
+			(int)t->target_oloc.pool_ns->len,
+			t->target_oloc.pool_ns->str,
+			t->target_oid.name_len, t->target_oid.name, t->flags);
+	} else {
+		seq_printf(s, "%*pE\t0x%x", t->target_oid.name_len,
+			t->target_oid.name, t->flags);
+	}
 	if (t->paused)
 		seq_puts(s, "\tP");
 }
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index 37c38a7..c83326c 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -227,9 +227,10 @@
 }
 
 const char *ceph_sub_str[] = {
-	[CEPH_SUB_MDSMAP] = "mdsmap",
 	[CEPH_SUB_MONMAP] = "monmap",
 	[CEPH_SUB_OSDMAP] = "osdmap",
+	[CEPH_SUB_FSMAP]  = "fsmap.user",
+	[CEPH_SUB_MDSMAP] = "mdsmap",
 };
 
 /*
@@ -1193,6 +1194,7 @@
 	case CEPH_MSG_MON_MAP:
 	case CEPH_MSG_MDS_MAP:
 	case CEPH_MSG_OSD_MAP:
+	case CEPH_MSG_FS_MAP_USER:
 		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
 		if (!m)
 			return NULL;	/* ENOMEM--return skip == 0 */
diff --git a/net/ceph/msgpool.c b/net/ceph/msgpool.c
index ddec1c1..aaed59a 100644
--- a/net/ceph/msgpool.c
+++ b/net/ceph/msgpool.c
@@ -5,6 +5,7 @@
 #include <linux/types.h>
 #include <linux/vmalloc.h>
 
+#include <linux/ceph/messenger.h>
 #include <linux/ceph/msgpool.h>
 
 static void *msgpool_alloc(gfp_t gfp_mask, void *arg)
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 8946959..b5ec096 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -387,7 +387,9 @@
 static void target_destroy(struct ceph_osd_request_target *t)
 {
 	ceph_oid_destroy(&t->base_oid);
+	ceph_oloc_destroy(&t->base_oloc);	/* puts base_oloc.pool_ns */
 	ceph_oid_destroy(&t->target_oid);
+	ceph_oloc_destroy(&t->target_oloc);	/* puts target_oloc.pool_ns */
 }
 
 /*
@@ -533,6 +535,11 @@
 }
 EXPORT_SYMBOL(ceph_osdc_alloc_request);
 
+static int ceph_oloc_encoding_size(struct ceph_object_locator *oloc)
+{	/* oloc payload size; callers add CEPH_ENCODING_START_BLK_LEN for the header */
+	return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);	/* 64-bit pool, 32-bit preferred, 32-bit key len, 32-bit nspace len, nspace bytes */
+}
+
 int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
 {
 	struct ceph_osd_client *osdc = req->r_osdc;
@@ -540,11 +547,13 @@
 	int msg_size;
 
 	WARN_ON(ceph_oid_empty(&req->r_base_oid));
+	WARN_ON(ceph_oloc_empty(&req->r_base_oloc));
 
 	/* create request message */
 	msg_size = 4 + 4 + 4; /* client_inc, osdmap_epoch, flags */
 	msg_size += 4 + 4 + 4 + 8; /* mtime, reassert_version */
-	msg_size += 2 + 4 + 8 + 4 + 4; /* oloc */
+	msg_size += CEPH_ENCODING_START_BLK_LEN +
+			ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
 	msg_size += 1 + 8 + 4 + 4; /* pgid */
 	msg_size += 4 + req->r_base_oid.name_len; /* oid */
 	msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
@@ -932,7 +941,7 @@
 	if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
 		osd_req_op_init(req, which, opcode, 0);
 	} else {
-		u32 object_size = le32_to_cpu(layout->fl_object_size);
+		u32 object_size = layout->object_size;
 		u32 object_base = off - objoff;
 		if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
 			if (truncate_size <= object_base) {
@@ -948,7 +957,8 @@
 	}
 
 	req->r_flags = flags;
-	req->r_base_oloc.pool = ceph_file_layout_pg_pool(*layout);
+	req->r_base_oloc.pool = layout->pool_id;
+	req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
 	ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
 
 	req->r_snapid = vino.snap;
@@ -1489,12 +1499,16 @@
 	p += sizeof(req->r_replay_version);
 
 	/* oloc */
-	ceph_encode_8(&p, 4);
-	ceph_encode_8(&p, 4);
-	ceph_encode_32(&p, 8 + 4 + 4);
+	ceph_start_encoding(&p, 5, 4,
+			    ceph_oloc_encoding_size(&req->r_t.target_oloc));
 	ceph_encode_64(&p, req->r_t.target_oloc.pool);
 	ceph_encode_32(&p, -1); /* preferred */
 	ceph_encode_32(&p, 0); /* key len */
+	if (req->r_t.target_oloc.pool_ns)
+		ceph_encode_string(&p, end, req->r_t.target_oloc.pool_ns->str,
+				   req->r_t.target_oloc.pool_ns->len);
+	else
+		ceph_encode_32(&p, 0);
 
 	/* pgid */
 	ceph_encode_8(&p, 1);
@@ -2594,9 +2608,22 @@
 	}
 
 	if (struct_v >= 5) {
+		bool changed = false;
+
 		len = ceph_decode_32(p);
 		if (len > 0) {
-			pr_warn("ceph_object_locator::nspace is set\n");
+			ceph_decode_need(p, end, len, e_inval);
+			if (!oloc->pool_ns ||
+			    ceph_compare_string(oloc->pool_ns, *p, len))
+				changed = true;
+			*p += len;
+		} else {
+			if (oloc->pool_ns)
+				changed = true;
+		}
+		if (changed) {
+			/* redirect changes namespace */
+			pr_warn("ceph_object_locator::nspace is changed\n");
 			goto e_inval;
 		}
 	}
@@ -2806,7 +2833,9 @@
 		goto out_unlock_session;
 	}
 
+	m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
 	ret = decode_MOSDOpReply(msg, &m);
+	m.redirect.oloc.pool_ns = NULL;
 	if (ret) {
 		pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
 		       req->r_tid, ret);
@@ -2835,7 +2864,11 @@
 		unlink_request(osd, req);
 		mutex_unlock(&osd->lock);
 
-		ceph_oloc_copy(&req->r_t.target_oloc, &m.redirect.oloc);
+		/*
+		 * Not ceph_oloc_copy() - changing pool_ns is not
+		 * supported.
+		 */
+		req->r_t.target_oloc.pool = m.redirect.oloc.pool;
 		req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
 		req->r_tid = 0;
 		__submit_request(req, false);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 7e480bf..d243688 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -1510,6 +1510,24 @@
 	return ERR_PTR(err);
 }
 
+void ceph_oloc_copy(struct ceph_object_locator *dest,
+		    const struct ceph_object_locator *src)
+{	/* copy src into a still-empty dest, taking a new reference on src->pool_ns when set */
+	WARN_ON(!ceph_oloc_empty(dest));
+	WARN_ON(dest->pool_ns); /* empty() only covers ->pool */
+
+	dest->pool = src->pool;
+	if (src->pool_ns)	/* ceph_get_string() dereferences its argument, so guard against NULL */
+		dest->pool_ns = ceph_get_string(src->pool_ns);
+}
+EXPORT_SYMBOL(ceph_oloc_copy);
+
+void ceph_oloc_destroy(struct ceph_object_locator *oloc)
+{	/* drop the locator's reference on its namespace string */
+	ceph_put_string(oloc->pool_ns);	/* NULL-safe; the pointer itself is not cleared here */
+}
+EXPORT_SYMBOL(ceph_oloc_destroy);
+
 void ceph_oid_copy(struct ceph_object_id *dest,
 		   const struct ceph_object_id *src)
 {
@@ -1770,9 +1788,9 @@
 				   u64 *ono,
 				   u64 *oxoff, u64 *oxlen)
 {
-	u32 osize = le32_to_cpu(layout->fl_object_size);
-	u32 su = le32_to_cpu(layout->fl_stripe_unit);
-	u32 sc = le32_to_cpu(layout->fl_stripe_count);
+	u32 osize = layout->object_size;
+	u32 su = layout->stripe_unit;
+	u32 sc = layout->stripe_count;
 	u32 bl, stripeno, stripepos, objsetno;
 	u32 su_per_object;
 	u64 t, su_offset;
@@ -1844,12 +1862,34 @@
 	if (!pi)
 		return -ENOENT;
 
-	raw_pgid->pool = oloc->pool;
-	raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
-				       oid->name_len);
+	if (!oloc->pool_ns) {
+		raw_pgid->pool = oloc->pool;
+		raw_pgid->seed = ceph_str_hash(pi->object_hash, oid->name,
+					     oid->name_len);
+		dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
+		     raw_pgid->pool, raw_pgid->seed);
+	} else {
+		char stack_buf[256];
+		char *buf = stack_buf;
+		int nsl = oloc->pool_ns->len;
+		size_t total = nsl + 1 + oid->name_len;
 
-	dout("%s %s -> raw_pgid %llu.%x\n", __func__, oid->name,
-	     raw_pgid->pool, raw_pgid->seed);
+		if (total > sizeof(stack_buf)) {
+			buf = kmalloc(total, GFP_NOIO);
+			if (!buf)
+				return -ENOMEM;
+		}
+		memcpy(buf, oloc->pool_ns->str, nsl);
+		buf[nsl] = '\037';
+		memcpy(buf + nsl + 1, oid->name, oid->name_len);
+		raw_pgid->pool = oloc->pool;
+		raw_pgid->seed = ceph_str_hash(pi->object_hash, buf, total);
+		if (buf != stack_buf)
+			kfree(buf);
+		dout("%s %s ns %.*s -> raw_pgid %llu.%x\n", __func__,
+		     oid->name, nsl, oloc->pool_ns->str,
+		     raw_pgid->pool, raw_pgid->seed);
+	}
 	return 0;
 }
 EXPORT_SYMBOL(ceph_object_locator_to_pg);
diff --git a/net/ceph/string_table.c b/net/ceph/string_table.c
new file mode 100644
index 0000000..ca53c83
--- /dev/null
+++ b/net/ceph/string_table.c
@@ -0,0 +1,111 @@
+#include <linux/slab.h>
+#include <linux/gfp.h>
+#include <linux/string.h>
+#include <linux/spinlock.h>
+#include <linux/ceph/string_table.h>
+
+static DEFINE_SPINLOCK(string_tree_lock);
+static struct rb_root string_tree = RB_ROOT;
+
+struct ceph_string *ceph_find_or_create_string(const char* str, size_t len)
+{	/* return a referenced tree entry equal to str[0..len), inserting one if needed; NULL if kmalloc fails */
+	struct ceph_string *cs, *exist;
+	struct rb_node **p, *parent;
+	int ret;
+
+	exist = NULL;
+	spin_lock(&string_tree_lock);	/* pass 1: lookup only, nothing allocated yet */
+	p = &string_tree.rb_node;
+	while (*p) {
+		exist = rb_entry(*p, struct ceph_string, node);
+		ret = ceph_compare_string(exist, str, len);
+		if (ret > 0)
+			p = &(*p)->rb_left;
+		else if (ret < 0)
+			p = &(*p)->rb_right;
+		else
+			break;	/* match: exist stays set */
+		exist = NULL;
+	}
+	if (exist && !kref_get_unless_zero(&exist->kref)) {	/* matched an entry whose refcount already reached zero */
+		rb_erase(&exist->node, &string_tree);
+		RB_CLEAR_NODE(&exist->node);	/* lets ceph_release_string() skip its own rb_erase() */
+		exist = NULL;
+	}
+	spin_unlock(&string_tree_lock);
+	if (exist)
+		return exist;
+
+	cs = kmalloc(sizeof(*cs) + len + 1, GFP_NOFS);	/* +1 for the NUL written below */
+	if (!cs)
+		return NULL;
+
+	kref_init(&cs->kref);
+	cs->len = len;
+	memcpy(cs->str, str, len);
+	cs->str[len] = 0;
+
+retry:	/* pass 2: search again, since the lock was dropped, and insert cs if still absent */
+	exist = NULL;
+	parent = NULL;
+	p = &string_tree.rb_node;
+	spin_lock(&string_tree_lock);
+	while (*p) {
+		parent = *p;
+		exist = rb_entry(*p, struct ceph_string, node);
+		ret = ceph_compare_string(exist, str, len);
+		if (ret > 0)
+			p = &(*p)->rb_left;
+		else if (ret < 0)
+			p = &(*p)->rb_right;
+		else
+			break;
+		exist = NULL;
+	}
+	ret = 0;
+	if (!exist) {	/* no equal entry: link the new string at the slot the search ended on */
+		rb_link_node(&cs->node, parent, p);
+		rb_insert_color(&cs->node, &string_tree);
+	} else if (!kref_get_unless_zero(&exist->kref)) {	/* equal entry found but its refcount is zero: unlink it and search again */
+		rb_erase(&exist->node, &string_tree);
+		RB_CLEAR_NODE(&exist->node);
+		ret = -EAGAIN;
+	}
+	spin_unlock(&string_tree_lock);
+	if (ret == -EAGAIN)
+		goto retry;
+
+	if (exist) {	/* a live equal entry was found in pass 2: discard our copy */
+		kfree(cs);
+		cs = exist;
+	}
+
+	return cs;
+}
+EXPORT_SYMBOL(ceph_find_or_create_string);
+
+static void ceph_free_string(struct rcu_head *head)
+{	/* callback passed to call_rcu() by ceph_release_string(): frees the containing ceph_string */
+	struct ceph_string *cs = container_of(head, struct ceph_string, rcu);
+	kfree(cs);
+}
+
+void ceph_release_string(struct kref *ref)
+{	/* kref release callback used by ceph_put_string(): unlink from the tree, then free via call_rcu() */
+	struct ceph_string *cs = container_of(ref, struct ceph_string, kref);
+
+	spin_lock(&string_tree_lock);
+	if (!RB_EMPTY_NODE(&cs->node)) {	/* skipped when ceph_find_or_create_string() already erased and cleared the node */
+		rb_erase(&cs->node, &string_tree);
+		RB_CLEAR_NODE(&cs->node);
+	}
+	spin_unlock(&string_tree_lock);
+
+	call_rcu(&cs->rcu, ceph_free_string);	/* rcu shares storage with node (union); node is no longer linked at this point */
+}
+EXPORT_SYMBOL(ceph_release_string);
+
+bool ceph_strings_empty(void)
+{	/* true when the string tree holds no entries; reads the root without taking string_tree_lock */
+	return RB_EMPTY_ROOT(&string_tree);
+}