Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
ceph: ensure prealloc_blob is in place when removing xattr
rbd: initialize snap_rwsem in rbd_add()
ceph: enable/disable dentry complete flags via mount option
vfs: export symbol d_find_any_alias()
ceph: always initialize the dentry in open_root_dentry()
libceph: remove useless return value for osd_client __send_request()
ceph: avoid iput() while holding spinlock in ceph_dir_fsync
ceph: avoid useless dget/dput in encode_fh
ceph: dereference pointer after checking for NULL
crush: fix force for non-root TAKE
ceph: remove unnecessary d_fsdata conditional checks
ceph: Use kmemdup rather than duplicating its implementation
Fix up conflicts in fs/ceph/super.c (d_alloc_root() failure handling vs
always initialize the dentry in open_root_dentry)
diff --git a/Documentation/filesystems/ceph.txt b/Documentation/filesystems/ceph.txt
index 763d8eb..d6030aa 100644
--- a/Documentation/filesystems/ceph.txt
+++ b/Documentation/filesystems/ceph.txt
@@ -119,12 +119,20 @@
must rely on TCP's error correction to detect data corruption
in the data payload.
- noasyncreaddir
- Disable client's use its local cache to satisfy readdir
- requests. (This does not change correctness; the client uses
- cached metadata only when a lease or capability ensures it is
- valid.)
+ dcache
+ Use the dcache contents to perform negative lookups and
+ readdir when the client has the entire directory contents in
+ its cache. (This does not change correctness; the client uses
+ cached metadata only when a lease or capability ensures it is
+ valid.)
+ nodcache
+ Do not use the dcache as above. This avoids a significant amount of
+ complex code, sacrificing performance without affecting correctness,
+ and is useful for tracking down bugs.
+
+ noasyncreaddir
+ Do not use the dcache as above for readdir.
More Information
================
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 148ab94..3fd31de 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -2184,6 +2184,8 @@
INIT_LIST_HEAD(&rbd_dev->node);
INIT_LIST_HEAD(&rbd_dev->snaps);
+ init_rwsem(&rbd_dev->header.snap_rwsem);
+
/* generate unique id: find highest unique id, add one */
mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
diff --git a/fs/ceph/dir.c b/fs/ceph/dir.c
index 74fd747..618246b 100644
--- a/fs/ceph/dir.c
+++ b/fs/ceph/dir.c
@@ -973,7 +973,7 @@
spin_lock(&dentry->d_lock);
di = ceph_dentry(dentry);
- if (di && di->lease_session) {
+ if (di->lease_session) {
s = di->lease_session;
spin_lock(&s->s_cap_lock);
gen = s->s_cap_gen;
@@ -1072,13 +1072,11 @@
struct ceph_dentry_info *di = ceph_dentry(dentry);
dout("d_release %p\n", dentry);
- if (di) {
- ceph_dentry_lru_del(dentry);
- if (di->lease_session)
- ceph_put_mds_session(di->lease_session);
- kmem_cache_free(ceph_dentry_cachep, di);
- dentry->d_fsdata = NULL;
- }
+ ceph_dentry_lru_del(dentry);
+ if (di->lease_session)
+ ceph_put_mds_session(di->lease_session);
+ kmem_cache_free(ceph_dentry_cachep, di);
+ dentry->d_fsdata = NULL;
}
static int ceph_snapdir_d_revalidate(struct dentry *dentry,
@@ -1096,17 +1094,36 @@
*/
void ceph_dir_set_complete(struct inode *inode)
{
- /* not yet implemented */
+ struct dentry *dentry = d_find_any_alias(inode);
+
+ if (dentry && ceph_dentry(dentry) &&
+ ceph_test_mount_opt(ceph_sb_to_client(dentry->d_sb), DCACHE)) {
+ dout(" marking %p (%p) complete\n", inode, dentry);
+ set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+ }
+ dput(dentry);
}
void ceph_dir_clear_complete(struct inode *inode)
{
- /* not yet implemented */
+ struct dentry *dentry = d_find_any_alias(inode);
+
+ if (dentry && ceph_dentry(dentry)) {
+ dout(" marking %p (%p) complete\n", inode, dentry);
+ set_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+ }
+ dput(dentry);
}
bool ceph_dir_test_complete(struct inode *inode)
{
- /* not yet implemented */
+ struct dentry *dentry = d_find_any_alias(inode);
+
+ if (dentry && ceph_dentry(dentry)) {
+ dout(" marking %p (%p) NOT complete\n", inode, dentry);
+ clear_bit(CEPH_D_COMPLETE, &ceph_dentry(dentry)->flags);
+ }
+ dput(dentry);
return false;
}
@@ -1220,6 +1237,7 @@
do {
ceph_mdsc_get_request(req);
spin_unlock(&ci->i_unsafe_lock);
+
dout("dir_fsync %p wait on tid %llu (until %llu)\n",
inode, req->r_tid, last_tid);
if (req->r_timeout) {
@@ -1232,9 +1250,9 @@
} else {
wait_for_completion(&req->r_safe_completion);
}
- spin_lock(&ci->i_unsafe_lock);
ceph_mdsc_put_request(req);
+ spin_lock(&ci->i_unsafe_lock);
if (ret || list_empty(head))
break;
req = list_entry(head->next,
@@ -1259,13 +1277,11 @@
dout("dentry_lru_add %p %p '%.*s'\n", di, dn,
dn->d_name.len, dn->d_name.name);
- if (di) {
- mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
- spin_lock(&mdsc->dentry_lru_lock);
- list_add_tail(&di->lru, &mdsc->dentry_lru);
- mdsc->num_dentry++;
- spin_unlock(&mdsc->dentry_lru_lock);
- }
+ mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+ spin_lock(&mdsc->dentry_lru_lock);
+ list_add_tail(&di->lru, &mdsc->dentry_lru);
+ mdsc->num_dentry++;
+ spin_unlock(&mdsc->dentry_lru_lock);
}
void ceph_dentry_lru_touch(struct dentry *dn)
@@ -1275,12 +1291,10 @@
dout("dentry_lru_touch %p %p '%.*s' (offset %lld)\n", di, dn,
dn->d_name.len, dn->d_name.name, di->offset);
- if (di) {
- mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
- spin_lock(&mdsc->dentry_lru_lock);
- list_move_tail(&di->lru, &mdsc->dentry_lru);
- spin_unlock(&mdsc->dentry_lru_lock);
- }
+ mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+ spin_lock(&mdsc->dentry_lru_lock);
+ list_move_tail(&di->lru, &mdsc->dentry_lru);
+ spin_unlock(&mdsc->dentry_lru_lock);
}
void ceph_dentry_lru_del(struct dentry *dn)
@@ -1290,13 +1304,11 @@
dout("dentry_lru_del %p %p '%.*s'\n", di, dn,
dn->d_name.len, dn->d_name.name);
- if (di) {
- mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
- spin_lock(&mdsc->dentry_lru_lock);
- list_del_init(&di->lru);
- mdsc->num_dentry--;
- spin_unlock(&mdsc->dentry_lru_lock);
- }
+ mdsc = ceph_sb_to_client(dn->d_sb)->mdsc;
+ spin_lock(&mdsc->dentry_lru_lock);
+ list_del_init(&di->lru);
+ mdsc->num_dentry--;
+ spin_unlock(&mdsc->dentry_lru_lock);
}
/*
diff --git a/fs/ceph/export.c b/fs/ceph/export.c
index 9fbcdec..fbb2a64 100644
--- a/fs/ceph/export.c
+++ b/fs/ceph/export.c
@@ -56,9 +56,7 @@
return -EINVAL;
spin_lock(&dentry->d_lock);
- parent = dget(dentry->d_parent);
- spin_unlock(&dentry->d_lock);
-
+ parent = dentry->d_parent;
if (*max_len >= connected_handle_length) {
dout("encode_fh %p connectable\n", dentry);
cfh->ino = ceph_ino(dentry->d_inode);
@@ -81,7 +79,7 @@
*max_len = handle_length;
type = 255;
}
- dput(parent);
+ spin_unlock(&dentry->d_lock);
return type;
}
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index 25283e7..2c48937 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -850,11 +850,12 @@
{
struct dentry *dir = dn->d_parent;
struct inode *inode = dir->d_inode;
- struct ceph_inode_info *ci = ceph_inode(inode);
+ struct ceph_inode_info *ci;
struct ceph_dentry_info *di;
BUG_ON(!inode);
+ ci = ceph_inode(inode);
di = ceph_dentry(dn);
spin_lock(&ci->i_ceph_lock);
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 6203d80..23ab6a3 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -2772,7 +2772,7 @@
di = ceph_dentry(dentry);
switch (h->action) {
case CEPH_MDS_LEASE_REVOKE:
- if (di && di->lease_session == session) {
+ if (di->lease_session == session) {
if (ceph_seq_cmp(di->lease_seq, seq) > 0)
h->seq = cpu_to_le32(di->lease_seq);
__ceph_mdsc_drop_dentry_lease(dentry);
@@ -2781,7 +2781,7 @@
break;
case CEPH_MDS_LEASE_RENEW:
- if (di && di->lease_session == session &&
+ if (di->lease_session == session &&
di->lease_gen == session->s_cap_gen &&
di->lease_renew_from &&
di->lease_renew_after == 0) {
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 48f61a1..00de2c9 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -131,6 +131,8 @@
Opt_rbytes,
Opt_norbytes,
Opt_noasyncreaddir,
+ Opt_dcache,
+ Opt_nodcache,
Opt_ino32,
};
@@ -152,6 +154,8 @@
{Opt_rbytes, "rbytes"},
{Opt_norbytes, "norbytes"},
{Opt_noasyncreaddir, "noasyncreaddir"},
+ {Opt_dcache, "dcache"},
+ {Opt_nodcache, "nodcache"},
{Opt_ino32, "ino32"},
{-1, NULL}
};
@@ -231,6 +235,12 @@
case Opt_noasyncreaddir:
fsopt->flags |= CEPH_MOUNT_OPT_NOASYNCREADDIR;
break;
+ case Opt_dcache:
+ fsopt->flags |= CEPH_MOUNT_OPT_DCACHE;
+ break;
+ case Opt_nodcache:
+ fsopt->flags &= ~CEPH_MOUNT_OPT_DCACHE;
+ break;
case Opt_ino32:
fsopt->flags |= CEPH_MOUNT_OPT_INO32;
break;
@@ -377,6 +387,10 @@
seq_puts(m, ",norbytes");
if (fsopt->flags & CEPH_MOUNT_OPT_NOASYNCREADDIR)
seq_puts(m, ",noasyncreaddir");
+ if (fsopt->flags & CEPH_MOUNT_OPT_DCACHE)
+ seq_puts(m, ",dcache");
+ else
+ seq_puts(m, ",nodcache");
if (fsopt->wsize)
seq_printf(m, ",wsize=%d", fsopt->wsize);
@@ -647,10 +661,10 @@
root = ERR_PTR(-ENOMEM);
goto out;
}
- ceph_init_dentry(root);
} else {
root = d_obtain_alias(inode);
}
+ ceph_init_dentry(root);
dout("open_root_inode success, root dentry is %p\n", root);
} else {
root = ERR_PTR(err);
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index cb3652b..1421f3d 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -28,6 +28,7 @@
#define CEPH_MOUNT_OPT_RBYTES (1<<5) /* dir st_bytes = rbytes */
#define CEPH_MOUNT_OPT_NOASYNCREADDIR (1<<7) /* no dcache readdir */
#define CEPH_MOUNT_OPT_INO32 (1<<8) /* 32 bit inos */
+#define CEPH_MOUNT_OPT_DCACHE (1<<9) /* use dcache for readdir etc */
#define CEPH_MOUNT_OPT_DEFAULT (CEPH_MOUNT_OPT_RBYTES)
diff --git a/fs/ceph/xattr.c b/fs/ceph/xattr.c
index a5e36e4..857214a 100644
--- a/fs/ceph/xattr.c
+++ b/fs/ceph/xattr.c
@@ -818,6 +818,7 @@
struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
int issued;
int err;
+ int required_blob_size;
int dirty;
if (ceph_snap(inode) != CEPH_NOSNAP)
@@ -833,14 +834,34 @@
return -EOPNOTSUPP;
}
+ err = -ENOMEM;
spin_lock(&ci->i_ceph_lock);
__build_xattrs(inode);
+retry:
issued = __ceph_caps_issued(ci, NULL);
dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
if (!(issued & CEPH_CAP_XATTR_EXCL))
goto do_sync;
+ required_blob_size = __get_required_blob_size(ci, 0, 0);
+
+ if (!ci->i_xattrs.prealloc_blob ||
+ required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
+ struct ceph_buffer *blob;
+
+ spin_unlock(&ci->i_ceph_lock);
+ dout(" preaallocating new blob size=%d\n", required_blob_size);
+ blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
+ if (!blob)
+ goto out;
+ spin_lock(&ci->i_ceph_lock);
+ if (ci->i_xattrs.prealloc_blob)
+ ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+ ci->i_xattrs.prealloc_blob = blob;
+ goto retry;
+ }
+
err = __remove_xattr_by_name(ceph_inode(inode), name);
dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
ci->i_xattrs.dirty = true;
@@ -853,6 +874,7 @@
do_sync:
spin_unlock(&ci->i_ceph_lock);
err = ceph_send_removexattr(dentry, name);
+out:
return err;
}
diff --git a/fs/dcache.c b/fs/dcache.c
index 616fedf..16a53cc 100644
--- a/fs/dcache.c
+++ b/fs/dcache.c
@@ -1475,7 +1475,14 @@
return alias;
}
-static struct dentry * d_find_any_alias(struct inode *inode)
+/**
+ * d_find_any_alias - find any alias for a given inode
+ * @inode: inode to find an alias for
+ *
+ * If any aliases exist for the given inode, take and return a
+ * reference for one of them. If no aliases exist, return %NULL.
+ */
+struct dentry *d_find_any_alias(struct inode *inode)
{
struct dentry *de;
@@ -1484,7 +1491,7 @@
spin_unlock(&inode->i_lock);
return de;
}
-
+EXPORT_SYMBOL(d_find_any_alias);
/**
* d_obtain_alias - find or allocate a dentry for a given inode
diff --git a/include/linux/dcache.h b/include/linux/dcache.h
index 31f7322..d64a55b 100644
--- a/include/linux/dcache.h
+++ b/include/linux/dcache.h
@@ -242,6 +242,7 @@
extern struct dentry * d_alloc_pseudo(struct super_block *, const struct qstr *);
extern struct dentry * d_splice_alias(struct inode *, struct dentry *);
extern struct dentry * d_add_ci(struct dentry *, struct inode *, struct qstr *);
+extern struct dentry *d_find_any_alias(struct inode *inode);
extern struct dentry * d_obtain_alias(struct inode *);
extern void shrink_dcache_sb(struct super_block *);
extern void shrink_dcache_parent(struct dentry *);
diff --git a/net/ceph/crush/mapper.c b/net/ceph/crush/mapper.c
index 3a94eae..b79747c 100644
--- a/net/ceph/crush/mapper.c
+++ b/net/ceph/crush/mapper.c
@@ -510,10 +510,15 @@
switch (rule->steps[step].op) {
case CRUSH_RULE_TAKE:
w[0] = rule->steps[step].arg1;
- if (force_pos >= 0) {
- BUG_ON(force_context[force_pos] != w[0]);
+
+ /* find position in force_context/hierarchy */
+ while (force_pos >= 0 &&
+ force_context[force_pos] != w[0])
force_pos--;
- }
+ /* and move past it */
+ if (force_pos >= 0)
+ force_pos--;
+
wsize = 1;
break;
diff --git a/net/ceph/crypto.c b/net/ceph/crypto.c
index 85f3bc0..b780cb7 100644
--- a/net/ceph/crypto.c
+++ b/net/ceph/crypto.c
@@ -15,10 +15,9 @@
const struct ceph_crypto_key *src)
{
memcpy(dst, src, sizeof(struct ceph_crypto_key));
- dst->key = kmalloc(src->len, GFP_NOFS);
+ dst->key = kmemdup(src->key, src->len, GFP_NOFS);
if (!dst->key)
return -ENOMEM;
- memcpy(dst->key, src->key, src->len);
return 0;
}
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index f4f3f58..5e25405 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -29,8 +29,8 @@
struct ceph_osd_request *req);
static void __unregister_linger_request(struct ceph_osd_client *osdc,
struct ceph_osd_request *req);
-static int __send_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req);
+static void __send_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req);
static int op_needs_trail(int op)
{
@@ -1022,8 +1022,8 @@
/*
* caller should hold map_sem (for read) and request_mutex
*/
-static int __send_request(struct ceph_osd_client *osdc,
- struct ceph_osd_request *req)
+static void __send_request(struct ceph_osd_client *osdc,
+ struct ceph_osd_request *req)
{
struct ceph_osd_request_head *reqhead;
@@ -1041,7 +1041,6 @@
ceph_msg_get(req->r_request); /* send consumes a ref */
ceph_con_send(&req->r_osd->o_con, req->r_request);
req->r_sent = req->r_osd->o_incarnation;
- return 0;
}
/*
@@ -1726,17 +1725,9 @@
dout("send_request %p no up osds in pg\n", req);
ceph_monc_request_next_osdmap(&osdc->client->monc);
} else {
- rc = __send_request(osdc, req);
- if (rc) {
- if (nofail) {
- dout("osdc_start_request failed send, "
- " will retry %lld\n", req->r_tid);
- rc = 0;
- } else {
- __unregister_request(osdc, req);
- }
- }
+ __send_request(osdc, req);
}
+ rc = 0;
}
out_unlock: