Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  ceph: ensure prealloc_blob is in place when removing xattr
  rbd: initialize snap_rwsem in rbd_add()
  ceph: enable/disable dentry complete flags via mount option
  vfs: export symbol d_find_any_alias()
  ceph: always initialize the dentry in open_root_dentry()
  libceph: remove useless return value for osd_client __send_request()
  ceph: avoid iput() while holding spinlock in ceph_dir_fsync
  ceph: avoid useless dget/dput in encode_fh
  ceph: dereference pointer after checking for NULL
  crush: fix force for non-root TAKE
  ceph: remove unnecessary d_fsdata conditional checks
  ceph: Use kmemdup rather than duplicating its implementation

Fix up conflicts in fs/ceph/super.c (d_alloc_root() failure handling vs
always initialize the dentry in open_root_dentry)
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index 763d8eb..d6030aa 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -119,12 +119,20 @@
 	must rely on TCP's error correction to detect data corruption
 	in the data payload.
 
-  noasyncreaddir
-	Disable client's use its local cache to satisfy	readdir
-	requests.  (This does not change correctness; the client uses
-	cached metadata only when a lease or capability ensures it is
-	valid.)
+  dcache
+        Use the dcache contents to perform negative lookups and
+        readdir when the client has the entire directory contents in
+        its cache.  (This does not change correctness; the client uses
+        cached metadata only when a lease or capability ensures it is
+        valid.)
 
+  nodcache
+        Do not use the dcache as above.  This avoids a significant amount of
+        complex code, sacrificing performance without affecting correctness,
+        and is useful for tracking down bugs.
+
+  noasyncreaddir
+	Do not use the dcache as above for readdir.
 
 More Information
 ================
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 148ab94..3fd31de 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -2184,6 +2184,8 @@
 	INIT_LIST_HEAD(&rbd_dev->node);
 	INIT_LIST_HEAD(&rbd_dev->snaps);
 
+	init_rwsem(&rbd_dev->header.snap_rwsem);
+
 	/* generate unique id: find highest unique id, add one */
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
 
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 74fd747..618246b 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -973,7 +973,7 @@
 
 	spin_lock(&dentry->d_lock);
 	di = ceph_dentry(dentry);
-	if (di && di->lease_session) {
+	if (di->lease_session) {
 		s = di->lease_session;
 		spin_lock(&s->s_cap_lock);
 		gen = s->s_cap_gen;
@@ -1072,13 +1072,11 @@
 	struct ceph_dentry_info *di = ceph_dentry(dentry);
 
 	dout("d_release %p\n", dentry);
-	if (di) {
-		ceph_dentry_lru_del(dentry);
-		if (di->lease_session)
-			ceph_put_mds_session(di->lease_session);
-		kmem_cache_free(ceph_dentry_cachep, di);
-		dentry->d_fsdata = NULL;
-	}
+	ceph_dentry_lru_del(dentry);
+	if (di->lease_session)
+		ceph_put_mds_session(di->lease_session);
+	kmem_cache_free(ceph_dentry_cachep, di);
+	dentry->d_fsdata = NULL;
 }
 
 static int ceph_snapdir_d_revalidate(struct dentry *dentry,
@@ -1096,17 +1094,36 @@
  */
 void ceph_dir_set_complete(struct inode *inode)
 {
-	/* not yet implemented */
+	struct dentry *dentry = d_find_any_alias(inode);
+	
+	if (dentry && ceph_dentry(dentry) &&
+	    ceph_test_mount_opt(ceph_sb_to_client(dentry->d_sb), DCACHE)) {
+		dout(" marking %p (%p) complete\n", inode, dentry);
+		set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+	}
+	dput(dentry);
 }
 
 void ceph_dir_clear_complete(struct inode *inode)
 {
-	/* not yet implemented */
+	struct dentry *dentry = d_find_any_alias(inode);
+
+	if (dentry && ceph_dentry(dentry)) {
+		dout(" marking %p (%p) complete\n", inode, dentry);
+		set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+	}
+	dput(dentry);
 }
 
 bool ceph_dir_test_complete(struct inode *inode)
 {
-	/* not yet implemented */
+	struct dentry *dentry = d_find_any_alias(inode);
+
+	if (dentry && ceph_dentry(dentry)) {
+		dout(" marking %p (%p) NOT complete\n", inode, dentry);
+		clear_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+	}
+	dput(dentry);
 	return false;
 }
 
@@ -1220,6 +1237,7 @@
 	do {
 		ceph_mdsc_get_request(req);
 		spin_unlock(&ci->i_unsafe_lock);
+
 		dout("dir_fsync %p wait on tid %llu (until %llu)\n",
 		     inode, req->r_tid, last_tid);
 		if (req->r_timeout) {
@@ -1232,9 +1250,9 @@
 		} else {
 			wait_for_completion(&req->r_safe_completion);
 		}
-		spin_lock(&ci->i_unsafe_lock);
 		ceph_mdsc_put_request(req);
 
+		spin_lock(&ci->i_unsafe_lock);
 		if (ret || list_empty(head))
 			break;
 		req = list_entry(head->next,
@@ -1259,13 +1277,11 @@
 
 	dout("dentry_lru_add %p %p '%.*s'\n", di, dn,
 	     dn->d_name.len, dn->d_name.name);
-	if (di) {
-		mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
-		spin_lock(&mdsc->dentry_lru_lock);
-		list_add_tail(&di->lru, &mdsc->dentry_lru);
-		mdsc->num_dentry++;
-		spin_unlock(&mdsc->dentry_lru_lock);
-	}
+	mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+	spin_lock(&mdsc->dentry_lru_lock);
+	list_add_tail(&di->lru, &mdsc->dentry_lru);
+	mdsc->num_dentry++;
+	spin_unlock(&mdsc->dentry_lru_lock);
 }
 
 void ceph_dentry_lru_touch(struct dentry *dn)
@@ -1275,12 +1291,10 @@
 
 	dout("dentry_lru_touch %p %p '%.*s' (offset %lld)\n", di, dn,
 	     dn->d_name.len, dn->d_name.name, di->offset);
-	if (di) {
-		mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
-		spin_lock(&mdsc->dentry_lru_lock);
-		list_move_tail(&di->lru, &mdsc->dentry_lru);
-		spin_unlock(&mdsc->dentry_lru_lock);
-	}
+	mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+	spin_lock(&mdsc->dentry_lru_lock);
+	list_move_tail(&di->lru, &mdsc->dentry_lru);
+	spin_unlock(&mdsc->dentry_lru_lock);
 }
 
 void ceph_dentry_lru_del(struct dentry *dn)
@@ -1290,13 +1304,11 @@
 
 	dout("dentry_lru_del %p %p '%.*s'\n", di, dn,
 	     dn->d_name.len, dn->d_name.name);
-	if (di) {
-		mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
-		spin_lock(&mdsc->dentry_lru_lock);
-		list_del_init(&di->lru);
-		mdsc->num_dentry--;
-		spin_unlock(&mdsc->dentry_lru_lock);
-	}
+	mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+	spin_lock(&mdsc->dentry_lru_lock);
+	list_del_init(&di->lru);
+	mdsc->num_dentry--;
+	spin_unlock(&mdsc->dentry_lru_lock);
 }
 
 /*
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 9fbcdec..fbb2a64 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -56,9 +56,7 @@
 		return -EINVAL;
 
 	spin_lock(&dentry->d_lock);
-	parent = dget(dentry->d_parent);
-	spin_unlock(&dentry->d_lock);
-
+	parent = dentry->d_parent;
 	if (*max_len >= connected_handle_length) {
 		dout("encode_fh %p connectable\n", dentry);
 		cfh->ino = ceph_ino(dentry->d_inode);
@@ -81,7 +79,7 @@
 		*max_len = handle_length;
 		type = 255;
 	}
-	dput(parent);
+	spin_unlock(&dentry->d_lock);
 	return type;
 }
 
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 25283e7..2c48937 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -850,11 +850,12 @@
 {
 	struct dentry *dir = dn->d_parent;
 	struct inode *inode = dir->d_inode;
-	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_inode_info *ci;
 	struct ceph_dentry_info *di;
 
 	BUG_ON(!inode);
 
+	ci = ceph_inode(inode);
 	di = ceph_dentry(dn);
 
 	spin_lock(&ci->i_ceph_lock);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 6203d80..23ab6a3 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2772,7 +2772,7 @@
 	di = ceph_dentry(dentry);
 	switch (h->action) {
 	case CEPH_MDS_LEASE_REVOKE:
-		if (di && di->lease_session == session) {
+		if (di->lease_session == session) {
 			if (ceph_seq_cmp(di->lease_seq, seq) > 0)
 				h->seq = cpu_to_le32(di->lease_seq);
 			__ceph_mdsc_drop_dentry_lease(dentry);
@@ -2781,7 +2781,7 @@
 		break;
 
 	case CEPH_MDS_LEASE_RENEW:
-		if (di && di->lease_session == session &&
+		if (di->lease_session == session &&
 		    di->lease_gen == session->s_cap_gen &&
 		    di->lease_renew_from &&
 		    di->lease_renew_after == 0) {
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 48f61a1..00de2c9 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -131,6 +131,8 @@
 	Opt_rbytes,
 	Opt_norbytes,
 	Opt_noasyncreaddir,
+	Opt_dcache,
+	Opt_nodcache,
 	Opt_ino32,
 };
 
@@ -152,6 +154,8 @@
 	{Opt_rbytes, "rbytes"},
 	{Opt_norbytes, "norbytes"},
 	{Opt_noasyncreaddir, "noasyncreaddir"},
+	{Opt_dcache, "dcache"},
+	{Opt_nodcache, "nodcache"},
 	{Opt_ino32, "ino32"},
 	{-1, NULL}
 };
@@ -231,6 +235,12 @@
 	case Opt_noasyncreaddir:
 		fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
 		break;
+	case Opt_dcache:
+		fsopt->flags |= CEPH_MOUNT_OPT_DCACHE;
+		break;
+	case Opt_nodcache:
+		fsopt->flags &= ~CEPH_MOUNT_OPT_DCACHE;
+		break;
 	case Opt_ino32:
 		fsopt->flags |= CEPH_MOUNT_OPT_INO32;
 		break;
@@ -377,6 +387,10 @@
 		seq_puts(m, ",norbytes");
 	if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
 		seq_puts(m, ",noasyncreaddir");
+	if (fsopt->flags & CEPH_MOUNT_OPT_DCACHE)
+		seq_puts(m, ",dcache");
+	else
+		seq_puts(m, ",nodcache");
 
 	if (fsopt->wsize)
 		seq_printf(m, ",wsize=%d", fsopt->wsize);
@@ -647,10 +661,10 @@
 				root = ERR_PTR(-ENOMEM);
 				goto out;
 			}
-			ceph_init_dentry(root);
 		} else {
 			root = d_obtain_alias(inode);
 		}
+		ceph_init_dentry(root);
 		dout("open_root_inode success, root dentry is %p\n", root);
 	} else {
 		root = ERR_PTR(err);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index cb3652b..1421f3d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -28,6 +28,7 @@
 #define CEPH_MOUNT_OPT_RBYTES          (1<<5) /* dir st_bytes = rbytes */
 #define CEPH_MOUNT_OPT_NOASYNCREADDIR  (1<<7) /* no dcache readdir */
 #define CEPH_MOUNT_OPT_INO32           (1<<8) /* 32 bit inos */
+#define CEPH_MOUNT_OPT_DCACHE          (1<<9) /* use dcache for readdir etc */
 
 #define CEPH_MOUNT_OPT_DEFAULT    (CEPH_MOUNT_OPT_RBYTES)
 
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index a5e36e4..857214a 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -818,6 +818,7 @@
 	struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
 	int issued;
 	int err;
+	int required_blob_size;
 	int dirty;
 
 	if (ceph_snap(inode) != CEPH_NOSNAP)
@@ -833,14 +834,34 @@
 			return -EOPNOTSUPP;
 	}
 
+	err = -ENOMEM;
 	spin_lock(&ci->i_ceph_lock);
 	__build_xattrs(inode);
+retry:
 	issued = __ceph_caps_issued(ci, NULL);
 	dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
 
 	if (!(issued & CEPH_CAP_XATTR_EXCL))
 		goto do_sync;
 
+	required_blob_size = __get_required_blob_size(ci, 0, 0);
+
+	if (!ci->i_xattrs.prealloc_blob ||
+	    required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
+		struct ceph_buffer *blob;
+
+		spin_unlock(&ci->i_ceph_lock);
+		dout(" preaallocating new blob size=%d\n", required_blob_size);
+		blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
+		if (!blob)
+			goto out;
+		spin_lock(&ci->i_ceph_lock);
+		if (ci->i_xattrs.prealloc_blob)
+			ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+		ci->i_xattrs.prealloc_blob = blob;
+		goto retry;
+	}
+
 	err = __remove_xattr_by_name(ceph_inode(inode), name);
 	dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
 	ci->i_xattrs.dirty = true;
@@ -853,6 +874,7 @@
 do_sync:
 	spin_unlock(&ci->i_ceph_lock);
 	err = ceph_send_removexattr(dentry, name);
+out:
 	return err;
 }
 
diff --git a/fs/dcache.c b/fs/dcache.c
index 616fedf..16a53cc 100644
--- a/fs/dcache.c
+++ b/fs/dcache.c
@@ -1475,7 +1475,14 @@
 	return alias;
 }
 
-static struct dentry * d_find_any_alias(struct inode *inode)
+/**
+ * d_find_any_alias - find any alias for a given inode
+ * @inode: inode to find an alias for
+ *
+ * If any aliases exist for the given inode, take and return a
+ * reference for one of them.  If no aliases exist, return %NULL.
+ */
+struct dentry *d_find_any_alias(struct inode *inode)
 {
 	struct dentry *de;
 
@@ -1484,7 +1491,7 @@
 	spin_unlock(&inode->i_lock);
 	return de;
 }
-
+EXPORT_SYMBOL(d_find_any_alias);
 
 /**
  * d_obtain_alias - find or allocate a dentry for a given inode
diff --git a/include/linux/dcache.h b/include/linux/dcache.h
index 31f7322..d64a55b 100644
--- a/include/linux/dcache.h
+++ b/include/linux/dcache.h
@@ -242,6 +242,7 @@
 extern struct dentry * d_alloc_pseudo(struct super_block *, const struct qstr *);
 extern struct dentry * d_splice_alias(struct inode *, struct dentry *);
 extern struct dentry * d_add_ci(struct dentry *, struct inode *, struct qstr *);
+extern struct dentry *d_find_any_alias(struct inode *inode);
 extern struct dentry * d_obtain_alias(struct inode *);
 extern void shrink_dcache_sb(struct super_block *);
 extern void shrink_dcache_parent(struct dentry *);
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 3a94eae..b79747c 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -510,10 +510,15 @@
 		switch (rule->steps[step].op) {
 		case CRUSH_RULE_TAKE:
 			w[0] = rule->steps[step].arg1;
-			if (force_pos >= 0) {
-				BUG_ON(force_context[force_pos] != w[0]);
+
+			/* find position in force_context/hierarchy */
+			while (force_pos >= 0 &&
+			       force_context[force_pos] != w[0])
 				force_pos--;
-			}
+			/* and move past it */
+			if (force_pos >= 0)
+				force_pos--;
+
 			wsize = 1;
 			break;
 
diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c
index 85f3bc0..b780cb7 100644
--- a/net/ceph/crypto.c
+++ b/net/ceph/crypto.c
@@ -15,10 +15,9 @@
 			  const struct ceph_crypto_key *src)
 {
 	memcpy(dst, src, sizeof(struct ceph_crypto_key));
-	dst->key = kmalloc(src->len, GFP_NOFS);
+	dst->key = kmemdup(src->key, src->len, GFP_NOFS);
 	if (!dst->key)
 		return -ENOMEM;
-	memcpy(dst->key, src->key, src->len);
 	return 0;
 }
 
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index f4f3f58..5e25405 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -29,8 +29,8 @@
 			       struct ceph_osd_request *req);
 static void __unregister_linger_request(struct ceph_osd_client *osdc,
 					struct ceph_osd_request *req);
-static int __send_request(struct ceph_osd_client *osdc,
-			  struct ceph_osd_request *req);
+static void __send_request(struct ceph_osd_client *osdc,
+			   struct ceph_osd_request *req);
 
 static int op_needs_trail(int op)
 {
@@ -1022,8 +1022,8 @@
 /*
  * caller should hold map_sem (for read) and request_mutex
  */
-static int __send_request(struct ceph_osd_client *osdc,
-			  struct ceph_osd_request *req)
+static void __send_request(struct ceph_osd_client *osdc,
+			   struct ceph_osd_request *req)
 {
 	struct ceph_osd_request_head *reqhead;
 
@@ -1041,7 +1041,6 @@
 	ceph_msg_get(req->r_request); /* send consumes a ref */
 	ceph_con_send(&req->r_osd->o_con, req->r_request);
 	req->r_sent = req->r_osd->o_incarnation;
-	return 0;
 }
 
 /*
@@ -1726,17 +1725,9 @@
 			dout("send_request %p no up osds in pg\n", req);
 			ceph_monc_request_next_osdmap(&osdc->client->monc);
 		} else {
-			rc = __send_request(osdc, req);
-			if (rc) {
-				if (nofail) {
-					dout("osdc_start_request failed send, "
-					     " will retry %lld\n", req->r_tid);
-					rc = 0;
-				} else {
-					__unregister_request(osdc, req);
-				}
-			}
+			__send_request(osdc, req);
 		}
+		rc = 0;
 	}
 
 out_unlock: