nfsd: split DRC global spinlock into per-bucket locks
Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
diff --git a/fs/nfsd/nfscache.c b/fs/nfsd/nfscache.c
index dc90909..7460365 100644
--- a/fs/nfsd/nfscache.c
+++ b/fs/nfsd/nfscache.c
@@ -29,6 +29,7 @@
struct nfsd_drc_bucket {
struct list_head lru_head;
+ spinlock_t cache_lock;
};
static struct nfsd_drc_bucket *drc_hashtbl;
@@ -79,7 +80,6 @@
* A cache entry is "single use" if c_state == RC_INPROG
* Otherwise, it when accessing _prev or _next, the lock must be held.
*/
-static DEFINE_SPINLOCK(cache_lock);
static DECLARE_DELAYED_WORK(cache_cleaner, cache_cleaner_func);
/*
@@ -154,11 +154,11 @@
}
static void
-nfsd_reply_cache_free(struct svc_cacherep *rp)
+nfsd_reply_cache_free(struct nfsd_drc_bucket *b, struct svc_cacherep *rp)
{
- spin_lock(&cache_lock);
+ spin_lock(&b->cache_lock);
nfsd_reply_cache_free_locked(rp);
- spin_unlock(&cache_lock);
+ spin_unlock(&b->cache_lock);
}
int nfsd_reply_cache_init(void)
@@ -180,8 +180,10 @@
drc_hashtbl = kcalloc(hashsize, sizeof(*drc_hashtbl), GFP_KERNEL);
if (!drc_hashtbl)
goto out_nomem;
- for (i = 0; i < hashsize; i++)
+ for (i = 0; i < hashsize; i++) {
INIT_LIST_HEAD(&drc_hashtbl[i].lru_head);
+ spin_lock_init(&drc_hashtbl[i].cache_lock);
+ }
drc_hashsize = hashsize;
return 0;
@@ -265,9 +267,13 @@
for (i = 0; i < drc_hashsize; i++) {
struct nfsd_drc_bucket *b = &drc_hashtbl[i];
+ if (list_empty(&b->lru_head))
+ continue;
+ spin_lock(&b->cache_lock);
freed += prune_bucket(b);
if (!list_empty(&b->lru_head))
cancel = false;
+ spin_unlock(&b->cache_lock);
}
/*
@@ -282,9 +288,7 @@
static void
cache_cleaner_func(struct work_struct *unused)
{
- spin_lock(&cache_lock);
prune_cache_entries();
- spin_unlock(&cache_lock);
}
static unsigned long
@@ -296,12 +300,7 @@
static unsigned long
nfsd_reply_cache_scan(struct shrinker *shrink, struct shrink_control *sc)
{
- unsigned long freed;
-
- spin_lock(&cache_lock);
- freed = prune_cache_entries();
- spin_unlock(&cache_lock);
- return freed;
+ return prune_cache_entries();
}
/*
* Walk an xdr_buf and get a CRC for at most the first RC_CSUMLEN bytes
@@ -426,14 +425,14 @@
* preallocate an entry.
*/
rp = nfsd_reply_cache_alloc();
- spin_lock(&cache_lock);
+ spin_lock(&b->cache_lock);
if (likely(rp)) {
atomic_inc(&num_drc_entries);
drc_mem_usage += sizeof(*rp);
}
/* go ahead and prune the cache */
- prune_cache_entries();
+ prune_bucket(b);
found = nfsd_cache_search(b, rqstp, csum);
if (found) {
@@ -470,7 +469,7 @@
}
rp->c_type = RC_NOCACHE;
out:
- spin_unlock(&cache_lock);
+ spin_unlock(&b->cache_lock);
return rtn;
found_entry:
@@ -548,7 +547,7 @@
/* Don't cache excessive amounts of data and XDR failures */
if (!statp || len > (256 >> 2)) {
- nfsd_reply_cache_free(rp);
+ nfsd_reply_cache_free(b, rp);
return;
}
@@ -563,23 +562,23 @@
bufsize = len << 2;
cachv->iov_base = kmalloc(bufsize, GFP_KERNEL);
if (!cachv->iov_base) {
- nfsd_reply_cache_free(rp);
+ nfsd_reply_cache_free(b, rp);
return;
}
cachv->iov_len = bufsize;
memcpy(cachv->iov_base, statp, bufsize);
break;
case RC_NOCACHE:
- nfsd_reply_cache_free(rp);
+ nfsd_reply_cache_free(b, rp);
return;
}
- spin_lock(&cache_lock);
+ spin_lock(&b->cache_lock);
drc_mem_usage += bufsize;
lru_put_end(b, rp);
rp->c_secure = rqstp->rq_secure;
rp->c_type = cachetype;
rp->c_state = RC_DONE;
- spin_unlock(&cache_lock);
+ spin_unlock(&b->cache_lock);
return;
}
@@ -610,7 +609,6 @@
*/
static int nfsd_reply_cache_stats_show(struct seq_file *m, void *v)
{
- spin_lock(&cache_lock);
seq_printf(m, "max entries: %u\n", max_drc_entries);
seq_printf(m, "num entries: %u\n",
atomic_read(&num_drc_entries));
@@ -622,7 +620,6 @@
seq_printf(m, "payload misses: %u\n", payload_misses);
seq_printf(m, "longest chain len: %u\n", longest_chain);
seq_printf(m, "cachesize at longest: %u\n", longest_chain_cachesize);
- spin_unlock(&cache_lock);
return 0;
}