Merge branch 'for-3.19' of git://linux-nfs.org/~bfields/linux

Pull nfsd updates from Bruce Fields:
 "A comparatively quieter cycle for nfsd this time, but still with two
  larger changes:

   - RPC server scalability improvements from Jeff Layton (using RCU
     instead of a spinlock to find idle threads).

   - server-side NFSv4.2 ALLOCATE/DEALLOCATE support from Anna
     Schumaker, enabling fallocate on new clients"

* 'for-3.19' of git://linux-nfs.org/~bfields/linux: (32 commits)
  nfsd4: fix xdr4 count of server in fs_location4
  nfsd4: fix xdr4 inclusion of escaped char
  sunrpc/cache: convert to use string_escape_str()
  sunrpc: only call test_bit once in svc_xprt_received
  fs: nfsd: Fix signedness bug in compare_blob
  sunrpc: add some tracepoints around enqueue and dequeue of svc_xprt
  sunrpc: convert to lockless lookup of queued server threads
  sunrpc: fix potential races in pool_stats collection
  sunrpc: add a rcu_head to svc_rqst and use kfree_rcu to free it
  sunrpc: require svc_create callers to pass in meaningful shutdown routine
  sunrpc: have svc_wake_up only deal with pool 0
  sunrpc: convert sp_task_pending flag to use atomic bitops
  sunrpc: move rq_cachetype field to better optimize space
  sunrpc: move rq_splice_ok flag into rq_flags
  sunrpc: move rq_dropme flag into rq_flags
  sunrpc: move rq_usedeferral flag to rq_flags
  sunrpc: move rq_local field to rq_flags
  sunrpc: add a generic rq_flags field to svc_rqst and move rq_secure to it
  nfsd: minor off by one checks in __write_versions()
  sunrpc: release svc_pool_map reference when serv allocation fails
  ...
diff --git a/drivers/staging/android/ashmem.c b/drivers/staging/android/ashmem.c
index 46f8ef4..8c78527 100644
--- a/drivers/staging/android/ashmem.c
+++ b/drivers/staging/android/ashmem.c
@@ -446,7 +446,7 @@
 		loff_t start = range->pgstart * PAGE_SIZE;
 		loff_t end = (range->pgend + 1) * PAGE_SIZE;
 
-		do_fallocate(range->asma->file,
+		vfs_fallocate(range->asma->file,
 				FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
 				start, end - start);
 		range->purged = ASHMEM_WAS_PURGED;
diff --git a/fs/ioctl.c b/fs/ioctl.c
index 77c9a78..214c3c1 100644
--- a/fs/ioctl.c
+++ b/fs/ioctl.c
@@ -443,7 +443,7 @@
 		return -EINVAL;
 	}
 
-	return do_fallocate(filp, FALLOC_FL_KEEP_SIZE, sr.l_start, sr.l_len);
+	return vfs_fallocate(filp, FALLOC_FL_KEEP_SIZE, sr.l_start, sr.l_len);
 }
 
 static int file_ioctl(struct file *filp, unsigned int cmd,
diff --git a/fs/lockd/mon.c b/fs/lockd/mon.c
index 9106f42..1cc6ec5 100644
--- a/fs/lockd/mon.c
+++ b/fs/lockd/mon.c
@@ -214,7 +214,7 @@
 	if (unlikely(res.status != 0))
 		status = -EIO;
 	if (unlikely(status < 0)) {
-		printk(KERN_NOTICE "lockd: cannot monitor %s\n", nsm->sm_name);
+		pr_notice_ratelimited("lockd: cannot monitor %s\n", nsm->sm_name);
 		return status;
 	}
 
diff --git a/fs/lockd/svc.c b/fs/lockd/svc.c
index d1bb7ec..e94c887 100644
--- a/fs/lockd/svc.c
+++ b/fs/lockd/svc.c
@@ -350,7 +350,7 @@
 		printk(KERN_WARNING
 			"lockd_up: no pid, %d users??\n", nlmsvc_users);
 
-	serv = svc_create(&nlmsvc_program, LOCKD_BUFSIZE, NULL);
+	serv = svc_create(&nlmsvc_program, LOCKD_BUFSIZE, svc_rpcb_cleanup);
 	if (!serv) {
 		printk(KERN_WARNING "lockd_up: create service failed\n");
 		return ERR_PTR(-ENOMEM);
diff --git a/fs/nfsd/nfs4proc.c b/fs/nfsd/nfs4proc.c
index 0beb023..ac71d13 100644
--- a/fs/nfsd/nfs4proc.c
+++ b/fs/nfsd/nfs4proc.c
@@ -33,6 +33,7 @@
  *  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 #include <linux/file.h>
+#include <linux/falloc.h>
 #include <linux/slab.h>
 
 #include "idmap.h"
@@ -772,7 +773,7 @@
 	 * the client wants us to do more in this compound:
 	 */
 	if (!nfsd4_last_compound_op(rqstp))
-		rqstp->rq_splice_ok = false;
+		clear_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
 
 	/* check stateid */
 	if ((status = nfs4_preprocess_stateid_op(SVC_NET(rqstp),
@@ -1014,6 +1015,44 @@
 }
 
 static __be32
+nfsd4_fallocate(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
+		struct nfsd4_fallocate *fallocate, int flags)
+{
+	__be32 status = nfserr_notsupp;
+	struct file *file;
+
+	status = nfs4_preprocess_stateid_op(SVC_NET(rqstp), cstate,
+					    &fallocate->falloc_stateid,
+					    WR_STATE, &file);
+	if (status != nfs_ok) {
+		dprintk("NFSD: nfsd4_fallocate: couldn't process stateid!\n");
+		return status;
+	}
+
+	status = nfsd4_vfs_fallocate(rqstp, &cstate->current_fh, file,
+				     fallocate->falloc_offset,
+				     fallocate->falloc_length,
+				     flags);
+	fput(file);
+	return status;
+}
+
+static __be32
+nfsd4_allocate(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
+	       struct nfsd4_fallocate *fallocate)
+{
+	return nfsd4_fallocate(rqstp, cstate, fallocate, 0);
+}
+
+static __be32
+nfsd4_deallocate(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
+		 struct nfsd4_fallocate *fallocate)
+{
+	return nfsd4_fallocate(rqstp, cstate, fallocate,
+			       FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE);
+}
+
+static __be32
 nfsd4_seek(struct svc_rqst *rqstp, struct nfsd4_compound_state *cstate,
 		struct nfsd4_seek *seek)
 {
@@ -1331,7 +1370,7 @@
 	 * Don't use the deferral mechanism for NFSv4; compounds make it
 	 * too hard to avoid non-idempotency problems.
 	 */
-	rqstp->rq_usedeferral = false;
+	clear_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
 
 	/*
 	 * According to RFC3010, this takes precedence over all other errors.
@@ -1447,7 +1486,7 @@
 	BUG_ON(cstate->replay_owner);
 out:
 	/* Reset deferral mechanism for RPC deferrals */
-	rqstp->rq_usedeferral = true;
+	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
 	dprintk("nfsv4 compound returned %d\n", ntohl(status));
 	return status;
 }
@@ -1929,6 +1968,18 @@
 	},
 
 	/* NFSv4.2 operations */
+	[OP_ALLOCATE] = {
+		.op_func = (nfsd4op_func)nfsd4_allocate,
+		.op_flags = OP_MODIFIES_SOMETHING | OP_CACHEME,
+		.op_name = "OP_ALLOCATE",
+		.op_rsize_bop = (nfsd4op_rsize)nfsd4_write_rsize,
+	},
+	[OP_DEALLOCATE] = {
+		.op_func = (nfsd4op_func)nfsd4_deallocate,
+		.op_flags = OP_MODIFIES_SOMETHING | OP_CACHEME,
+		.op_name = "OP_DEALLOCATE",
+		.op_rsize_bop = (nfsd4op_rsize)nfsd4_write_rsize,
+	},
 	[OP_SEEK] = {
 		.op_func = (nfsd4op_func)nfsd4_seek,
 		.op_name = "OP_SEEK",
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index 4e1d726..3550a9c 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -275,9 +275,11 @@
 	return x;
 }
 
-static void nfsd4_free_file(struct nfs4_file *f)
+static void nfsd4_free_file_rcu(struct rcu_head *rcu)
 {
-	kmem_cache_free(file_slab, f);
+	struct nfs4_file *fp = container_of(rcu, struct nfs4_file, fi_rcu);
+
+	kmem_cache_free(file_slab, fp);
 }
 
 static inline void
@@ -286,9 +288,10 @@
 	might_lock(&state_lock);
 
 	if (atomic_dec_and_lock(&fi->fi_ref, &state_lock)) {
-		hlist_del(&fi->fi_hash);
+		hlist_del_rcu(&fi->fi_hash);
 		spin_unlock(&state_lock);
-		nfsd4_free_file(fi);
+		WARN_ON_ONCE(!list_empty(&fi->fi_delegations));
+		call_rcu(&fi->fi_rcu, nfsd4_free_file_rcu);
 	}
 }
 
@@ -1440,7 +1443,7 @@
 	list_add(&new->se_perclnt, &clp->cl_sessions);
 	spin_unlock(&clp->cl_lock);
 
-	if (cses->flags & SESSION4_BACK_CHAN) {
+	{
 		struct sockaddr *sa = svc_addr(rqstp);
 		/*
 		 * This is a little silly; with sessions there's no real
@@ -1711,15 +1714,14 @@
 	return 0;
 }
 
-static long long
+static int
 compare_blob(const struct xdr_netobj *o1, const struct xdr_netobj *o2)
 {
-	long long res;
-
-	res = o1->len - o2->len;
-	if (res)
-		return res;
-	return (long long)memcmp(o1->data, o2->data, o1->len);
+	if (o1->len < o2->len)
+		return -1;
+	if (o1->len > o2->len)
+		return 1;
+	return memcmp(o1->data, o2->data, o1->len);
 }
 
 static int same_name(const char *n1, const char *n2)
@@ -1907,7 +1909,7 @@
 static struct nfs4_client *
 find_clp_in_name_tree(struct xdr_netobj *name, struct rb_root *root)
 {
-	long long cmp;
+	int cmp;
 	struct rb_node *node = root->rb_node;
 	struct nfs4_client *clp;
 
@@ -3057,10 +3059,9 @@
 }
 
 /* OPEN Share state helper functions */
-static void nfsd4_init_file(struct nfs4_file *fp, struct knfsd_fh *fh)
+static void nfsd4_init_file(struct knfsd_fh *fh, unsigned int hashval,
+				struct nfs4_file *fp)
 {
-	unsigned int hashval = file_hashval(fh);
-
 	lockdep_assert_held(&state_lock);
 
 	atomic_set(&fp->fi_ref, 1);
@@ -3073,7 +3074,7 @@
 	fp->fi_share_deny = 0;
 	memset(fp->fi_fds, 0, sizeof(fp->fi_fds));
 	memset(fp->fi_access, 0, sizeof(fp->fi_access));
-	hlist_add_head(&fp->fi_hash, &file_hashtbl[hashval]);
+	hlist_add_head_rcu(&fp->fi_hash, &file_hashtbl[hashval]);
 }
 
 void
@@ -3294,17 +3295,14 @@
 
 /* search file_hashtbl[] for file */
 static struct nfs4_file *
-find_file_locked(struct knfsd_fh *fh)
+find_file_locked(struct knfsd_fh *fh, unsigned int hashval)
 {
-	unsigned int hashval = file_hashval(fh);
 	struct nfs4_file *fp;
 
-	lockdep_assert_held(&state_lock);
-
-	hlist_for_each_entry(fp, &file_hashtbl[hashval], fi_hash) {
+	hlist_for_each_entry_rcu(fp, &file_hashtbl[hashval], fi_hash) {
 		if (nfsd_fh_match(&fp->fi_fhandle, fh)) {
-			get_nfs4_file(fp);
-			return fp;
+			if (atomic_inc_not_zero(&fp->fi_ref))
+				return fp;
 		}
 	}
 	return NULL;
@@ -3314,10 +3312,11 @@
 find_file(struct knfsd_fh *fh)
 {
 	struct nfs4_file *fp;
+	unsigned int hashval = file_hashval(fh);
 
-	spin_lock(&state_lock);
-	fp = find_file_locked(fh);
-	spin_unlock(&state_lock);
+	rcu_read_lock();
+	fp = find_file_locked(fh, hashval);
+	rcu_read_unlock();
 	return fp;
 }
 
@@ -3325,11 +3324,18 @@
 find_or_add_file(struct nfs4_file *new, struct knfsd_fh *fh)
 {
 	struct nfs4_file *fp;
+	unsigned int hashval = file_hashval(fh);
+
+	rcu_read_lock();
+	fp = find_file_locked(fh, hashval);
+	rcu_read_unlock();
+	if (fp)
+		return fp;
 
 	spin_lock(&state_lock);
-	fp = find_file_locked(fh);
-	if (fp == NULL) {
-		nfsd4_init_file(new, fh);
+	fp = find_file_locked(fh, hashval);
+	if (likely(fp == NULL)) {
+		nfsd4_init_file(fh, hashval, new);
 		fp = new;
 	}
 	spin_unlock(&state_lock);
@@ -4127,7 +4133,7 @@
 		nfs4_put_stateowner(so);
 	}
 	if (open->op_file)
-		nfsd4_free_file(open->op_file);
+		kmem_cache_free(file_slab, open->op_file);
 	if (open->op_stp)
 		nfs4_put_stid(&open->op_stp->st_stid);
 }
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index b1eed4d..15f7b73 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -1514,6 +1514,23 @@
 }
 
 static __be32
+nfsd4_decode_fallocate(struct nfsd4_compoundargs *argp,
+		       struct nfsd4_fallocate *fallocate)
+{
+	DECODE_HEAD;
+
+	status = nfsd4_decode_stateid(argp, &fallocate->falloc_stateid);
+	if (status)
+		return status;
+
+	READ_BUF(16);
+	p = xdr_decode_hyper(p, &fallocate->falloc_offset);
+	xdr_decode_hyper(p, &fallocate->falloc_length);
+
+	DECODE_TAIL;
+}
+
+static __be32
 nfsd4_decode_seek(struct nfsd4_compoundargs *argp, struct nfsd4_seek *seek)
 {
 	DECODE_HEAD;
@@ -1604,10 +1621,10 @@
 	[OP_RECLAIM_COMPLETE]	= (nfsd4_dec)nfsd4_decode_reclaim_complete,
 
 	/* new operations for NFSv4.2 */
-	[OP_ALLOCATE]		= (nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_ALLOCATE]		= (nfsd4_dec)nfsd4_decode_fallocate,
 	[OP_COPY]		= (nfsd4_dec)nfsd4_decode_notsupp,
 	[OP_COPY_NOTIFY]	= (nfsd4_dec)nfsd4_decode_notsupp,
-	[OP_DEALLOCATE]		= (nfsd4_dec)nfsd4_decode_notsupp,
+	[OP_DEALLOCATE]		= (nfsd4_dec)nfsd4_decode_fallocate,
 	[OP_IO_ADVISE]		= (nfsd4_dec)nfsd4_decode_notsupp,
 	[OP_LAYOUTERROR]	= (nfsd4_dec)nfsd4_decode_notsupp,
 	[OP_LAYOUTSTATS]	= (nfsd4_dec)nfsd4_decode_notsupp,
@@ -1714,7 +1731,7 @@
 	argp->rqstp->rq_cachetype = cachethis ? RC_REPLBUFF : RC_NOCACHE;
 
 	if (readcount > 1 || max_reply > PAGE_SIZE - auth_slack)
-		argp->rqstp->rq_splice_ok = false;
+		clear_bit(RQ_SPLICE_OK, &argp->rqstp->rq_flags);
 
 	DECODE_TAIL;
 }
@@ -1795,9 +1812,12 @@
 		}
 		else
 			end++;
+		if (found_esc)
+			end = next;
+
 		str = end;
 	}
-	pathlen = htonl(xdr->buf->len - pathlen_offset);
+	pathlen = htonl(count);
 	write_bytes_to_xdr_buf(xdr->buf, pathlen_offset, &pathlen, 4);
 	return 0;
 }
@@ -3236,10 +3256,10 @@
 
 	p = xdr_reserve_space(xdr, 8); /* eof flag and byte count */
 	if (!p) {
-		WARN_ON_ONCE(resp->rqstp->rq_splice_ok);
+		WARN_ON_ONCE(test_bit(RQ_SPLICE_OK, &resp->rqstp->rq_flags));
 		return nfserr_resource;
 	}
-	if (resp->xdr.buf->page_len && resp->rqstp->rq_splice_ok) {
+	if (resp->xdr.buf->page_len && test_bit(RQ_SPLICE_OK, &resp->rqstp->rq_flags)) {
 		WARN_ON_ONCE(1);
 		return nfserr_resource;
 	}
@@ -3256,7 +3276,7 @@
 			goto err_truncate;
 	}
 
-	if (file->f_op->splice_read && resp->rqstp->rq_splice_ok)
+	if (file->f_op->splice_read && test_bit(RQ_SPLICE_OK, &resp->rqstp->rq_flags))
 		err = nfsd4_encode_splice_read(resp, read, file, maxcount);
 	else
 		err = nfsd4_encode_readv(resp, read, file, maxcount);
diff --git a/fs/nfsd/nfscache.c b/fs/nfsd/nfscache.c
index 122f691..83a9694 100644
--- a/fs/nfsd/nfscache.c
+++ b/fs/nfsd/nfscache.c
@@ -490,7 +490,7 @@
 	/* From the hall of fame of impractical attacks:
 	 * Is this a user who tries to snoop on the cache? */
 	rtn = RC_DOIT;
-	if (!rqstp->rq_secure && rp->c_secure)
+	if (!test_bit(RQ_SECURE, &rqstp->rq_flags) && rp->c_secure)
 		goto out;
 
 	/* Compose RPC reply header */
@@ -579,7 +579,7 @@
 	spin_lock(&b->cache_lock);
 	drc_mem_usage += bufsize;
 	lru_put_end(b, rp);
-	rp->c_secure = rqstp->rq_secure;
+	rp->c_secure = test_bit(RQ_SECURE, &rqstp->rq_flags);
 	rp->c_type = cachetype;
 	rp->c_state = RC_DONE;
 	spin_unlock(&b->cache_lock);
diff --git a/fs/nfsd/nfsctl.c b/fs/nfsd/nfsctl.c
index 9506ea5..19ace74 100644
--- a/fs/nfsd/nfsctl.c
+++ b/fs/nfsd/nfsctl.c
@@ -608,7 +608,7 @@
 				       num);
 			sep = " ";
 
-			if (len > remaining)
+			if (len >= remaining)
 				break;
 			remaining -= len;
 			buf += len;
@@ -623,7 +623,7 @@
 						'+' : '-',
 					minor);
 
-			if (len > remaining)
+			if (len >= remaining)
 				break;
 			remaining -= len;
 			buf += len;
@@ -631,7 +631,7 @@
 		}
 
 	len = snprintf(buf, remaining, "\n");
-	if (len > remaining)
+	if (len >= remaining)
 		return -EINVAL;
 	return tlen + len;
 }
diff --git a/fs/nfsd/nfsfh.c b/fs/nfsd/nfsfh.c
index 88026fc..965b478 100644
--- a/fs/nfsd/nfsfh.c
+++ b/fs/nfsd/nfsfh.c
@@ -86,7 +86,7 @@
 	int flags = nfsexp_flags(rqstp, exp);
 
 	/* Check if the request originated from a secure port. */
-	if (!rqstp->rq_secure && !(flags & NFSEXP_INSECURE_PORT)) {
+	if (!test_bit(RQ_SECURE, &rqstp->rq_flags) && !(flags & NFSEXP_INSECURE_PORT)) {
 		RPC_IFDEBUG(char buf[RPC_MAX_ADDRBUFLEN]);
 		dprintk("nfsd: request from insecure port %s!\n",
 		        svc_print_addr(rqstp, buf, sizeof(buf)));
diff --git a/fs/nfsd/nfssvc.c b/fs/nfsd/nfssvc.c
index 752d56b..314f5c8 100644
--- a/fs/nfsd/nfssvc.c
+++ b/fs/nfsd/nfssvc.c
@@ -692,7 +692,7 @@
 	/* Now call the procedure handler, and encode NFS status. */
 	nfserr = proc->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
 	nfserr = map_new_errors(rqstp->rq_vers, nfserr);
-	if (nfserr == nfserr_dropit || rqstp->rq_dropme) {
+	if (nfserr == nfserr_dropit || test_bit(RQ_DROPME, &rqstp->rq_flags)) {
 		dprintk("nfsd: Dropping request; may be revisited later\n");
 		nfsd_cache_update(rqstp, RC_NOCACHE, NULL);
 		return 0;
diff --git a/fs/nfsd/state.h b/fs/nfsd/state.h
index 2712042..9d3be37 100644
--- a/fs/nfsd/state.h
+++ b/fs/nfsd/state.h
@@ -463,17 +463,24 @@
 /*
  * nfs4_file: a file opened by some number of (open) nfs4_stateowners.
  *
- * These objects are global. nfsd only keeps one instance of a nfs4_file per
- * inode (though it may keep multiple file descriptors open per inode). These
- * are tracked in the file_hashtbl which is protected by the state_lock
- * spinlock.
+ * These objects are global. nfsd keeps one instance of a nfs4_file per
+ * filehandle (though it may keep multiple file descriptors for each). Each
+ * inode can have multiple filehandles associated with it, so there is
+ * (potentially) a many to one relationship between this struct and struct
+ * inode.
+ *
+ * These are hashed by filehandle in the file_hashtbl. The global state_lock
+ * spinlock serializes insertion and removal; lookups may run under RCU.
  */
 struct nfs4_file {
 	atomic_t		fi_ref;
 	spinlock_t		fi_lock;
-	struct hlist_node       fi_hash;    /* hash by "struct inode *" */
+	struct hlist_node       fi_hash;	/* hash on fi_fhandle */
 	struct list_head        fi_stateids;
-	struct list_head	fi_delegations;
+	union {
+		struct list_head	fi_delegations;
+		struct rcu_head		fi_rcu;
+	};
 	/* One each for O_RDONLY, O_WRONLY, O_RDWR: */
 	struct file *		fi_fds[3];
 	/*
diff --git a/fs/nfsd/vfs.c b/fs/nfsd/vfs.c
index 0a82e3c..5685c67 100644
--- a/fs/nfsd/vfs.c
+++ b/fs/nfsd/vfs.c
@@ -16,6 +16,7 @@
 #include <linux/fs.h>
 #include <linux/file.h>
 #include <linux/splice.h>
+#include <linux/falloc.h>
 #include <linux/fcntl.h>
 #include <linux/namei.h>
 #include <linux/delay.h>
@@ -533,6 +534,26 @@
 }
 #endif
 
+__be32 nfsd4_vfs_fallocate(struct svc_rqst *rqstp, struct svc_fh *fhp,
+			   struct file *file, loff_t offset, loff_t len,
+			   int flags)
+{
+	__be32 err;
+	int error;
+
+	if (!S_ISREG(file_inode(file)->i_mode))
+		return nfserr_inval;
+
+	err = nfsd_permission(rqstp, fhp->fh_export, fhp->fh_dentry, NFSD_MAY_WRITE);
+	if (err)
+		return err;
+
+	error = vfs_fallocate(file, flags, offset, len);
+	if (!error)
+		error = commit_metadata(fhp);
+
+	return nfserrno(error);
+}
 #endif /* defined(CONFIG_NFSD_V4) */
 
 #ifdef CONFIG_NFSD_V3
@@ -881,7 +902,7 @@
 nfsd_vfs_read(struct svc_rqst *rqstp, struct file *file,
 	      loff_t offset, struct kvec *vec, int vlen, unsigned long *count)
 {
-	if (file->f_op->splice_read && rqstp->rq_splice_ok)
+	if (file->f_op->splice_read && test_bit(RQ_SPLICE_OK, &rqstp->rq_flags))
 		return nfsd_splice_read(rqstp, file, offset, count);
 	else
 		return nfsd_readv(file, offset, vec, vlen, count);
@@ -937,9 +958,10 @@
 	int			stable = *stablep;
 	int			use_wgather;
 	loff_t			pos = offset;
+	loff_t			end = LLONG_MAX;
 	unsigned int		pflags = current->flags;
 
-	if (rqstp->rq_local)
+	if (test_bit(RQ_LOCAL, &rqstp->rq_flags))
 		/*
 		 * We want less throttling in balance_dirty_pages()
 		 * and shrink_inactive_list() so that nfs to
@@ -967,10 +989,13 @@
 	fsnotify_modify(file);
 
 	if (stable) {
-		if (use_wgather)
+		if (use_wgather) {
 			host_err = wait_for_concurrent_writes(file);
-		else
-			host_err = vfs_fsync_range(file, offset, offset+*cnt, 0);
+		} else {
+			if (*cnt)
+				end = offset + *cnt - 1;
+			host_err = vfs_fsync_range(file, offset, end, 0);
+		}
 	}
 
 out_nfserr:
@@ -979,7 +1004,7 @@
 		err = 0;
 	else
 		err = nfserrno(host_err);
-	if (rqstp->rq_local)
+	if (test_bit(RQ_LOCAL, &rqstp->rq_flags))
 		tsk_restore_flags(current, pflags, PF_LESS_THROTTLE);
 	return err;
 }
diff --git a/fs/nfsd/vfs.h b/fs/nfsd/vfs.h
index b1796d6..2050cb0 100644
--- a/fs/nfsd/vfs.h
+++ b/fs/nfsd/vfs.h
@@ -54,6 +54,8 @@
 #ifdef CONFIG_NFSD_V4
 __be32          nfsd4_set_nfs4_label(struct svc_rqst *, struct svc_fh *,
 		    struct xdr_netobj *);
+__be32		nfsd4_vfs_fallocate(struct svc_rqst *, struct svc_fh *,
+				    struct file *, loff_t, loff_t, int);
 #endif /* CONFIG_NFSD_V4 */
 __be32		nfsd_create(struct svc_rqst *, struct svc_fh *,
 				char *name, int len, struct iattr *attrs,
diff --git a/fs/nfsd/xdr4.h b/fs/nfsd/xdr4.h
index 5720e94..90a5925 100644
--- a/fs/nfsd/xdr4.h
+++ b/fs/nfsd/xdr4.h
@@ -428,6 +428,13 @@
 	u32 rca_one_fs;
 };
 
+struct nfsd4_fallocate {
+	/* request */
+	stateid_t	falloc_stateid;
+	loff_t		falloc_offset;
+	u64		falloc_length;
+};
+
 struct nfsd4_seek {
 	/* request */
 	stateid_t	seek_stateid;
@@ -486,6 +493,8 @@
 		struct nfsd4_free_stateid	free_stateid;
 
 		/* NFSv4.2 */
+		struct nfsd4_fallocate		allocate;
+		struct nfsd4_fallocate		deallocate;
 		struct nfsd4_seek		seek;
 	} u;
 	struct nfs4_replay *			replay;
diff --git a/fs/open.c b/fs/open.c
index d45bd90..813be03 100644
--- a/fs/open.c
+++ b/fs/open.c
@@ -222,7 +222,7 @@
 #endif /* BITS_PER_LONG == 32 */
 
 
-int do_fallocate(struct file *file, int mode, loff_t offset, loff_t len)
+int vfs_fallocate(struct file *file, int mode, loff_t offset, loff_t len)
 {
 	struct inode *inode = file_inode(file);
 	long ret;
@@ -309,6 +309,7 @@
 	sb_end_write(inode->i_sb);
 	return ret;
 }
+EXPORT_SYMBOL_GPL(vfs_fallocate);
 
 SYSCALL_DEFINE4(fallocate, int, fd, int, mode, loff_t, offset, loff_t, len)
 {
@@ -316,7 +317,7 @@
 	int error = -EBADF;
 
 	if (f.file) {
-		error = do_fallocate(f.file, mode, offset, len);
+		error = vfs_fallocate(f.file, mode, offset, len);
 		fdput(f);
 	}
 	return error;
diff --git a/include/linux/fs.h b/include/linux/fs.h
index 8815725..eeaccd3 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -2086,7 +2086,7 @@
 extern long vfs_truncate(struct path *, loff_t);
 extern int do_truncate(struct dentry *, loff_t start, unsigned int time_attrs,
 		       struct file *filp);
-extern int do_fallocate(struct file *file, int mode, loff_t offset,
+extern int vfs_fallocate(struct file *file, int mode, loff_t offset,
 			loff_t len);
 extern long do_sys_open(int dfd, const char __user *filename, int flags,
 			umode_t mode);
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 2167846..6f22cfe 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -26,10 +26,10 @@
 
 /* statistics for svc_pool structures */
 struct svc_pool_stats {
-	unsigned long	packets;
+	atomic_long_t	packets;
 	unsigned long	sockets_queued;
-	unsigned long	threads_woken;
-	unsigned long	threads_timedout;
+	atomic_long_t	threads_woken;
+	atomic_long_t	threads_timedout;
 };
 
 /*
@@ -45,12 +45,13 @@
 struct svc_pool {
 	unsigned int		sp_id;	    	/* pool id; also node id on NUMA */
 	spinlock_t		sp_lock;	/* protects all fields */
-	struct list_head	sp_threads;	/* idle server threads */
 	struct list_head	sp_sockets;	/* pending sockets */
 	unsigned int		sp_nrthreads;	/* # of threads in pool */
 	struct list_head	sp_all_threads;	/* all server threads */
 	struct svc_pool_stats	sp_stats;	/* statistics on pool operation */
-	int			sp_task_pending;/* has pending task */
+#define	SP_TASK_PENDING		(0)		/* still work to do even if no
+						 * xprt is queued. */
+	unsigned long		sp_flags;
 } ____cacheline_aligned_in_smp;
 
 /*
@@ -219,8 +220,8 @@
  * processed.
  */
 struct svc_rqst {
-	struct list_head	rq_list;	/* idle list */
 	struct list_head	rq_all;		/* all threads list */
+	struct rcu_head		rq_rcu_head;	/* for RCU deferred kfree */
 	struct svc_xprt *	rq_xprt;	/* transport ptr */
 
 	struct sockaddr_storage	rq_addr;	/* peer address */
@@ -236,7 +237,6 @@
 	struct svc_cred		rq_cred;	/* auth info */
 	void *			rq_xprt_ctxt;	/* transport specific context ptr */
 	struct svc_deferred_req*rq_deferred;	/* deferred request we are replaying */
-	bool			rq_usedeferral;	/* use deferral */
 
 	size_t			rq_xprt_hlen;	/* xprt header len */
 	struct xdr_buf		rq_arg;
@@ -253,9 +253,17 @@
 	u32			rq_vers;	/* program version */
 	u32			rq_proc;	/* procedure number */
 	u32			rq_prot;	/* IP protocol */
-	unsigned short
-				rq_secure  : 1;	/* secure port */
-	unsigned short		rq_local   : 1;	/* local request */
+	int			rq_cachetype;	/* catering to nfsd */
+#define	RQ_SECURE	(0)			/* secure port */
+#define	RQ_LOCAL	(1)			/* local request */
+#define	RQ_USEDEFERRAL	(2)			/* use deferral */
+#define	RQ_DROPME	(3)			/* drop current reply */
+#define	RQ_SPLICE_OK	(4)			/* turned off in gss privacy
+						 * to prevent encrypting page
+						 * cache pages */
+#define	RQ_VICTIM	(5)			/* about to be shut down */
+#define	RQ_BUSY		(6)			/* request is busy */
+	unsigned long		rq_flags;	/* flags field */
 
 	void *			rq_argp;	/* decoded arguments */
 	void *			rq_resp;	/* xdr'd results */
@@ -271,16 +279,12 @@
 	struct cache_req	rq_chandle;	/* handle passed to caches for 
 						 * request delaying 
 						 */
-	bool			rq_dropme;
 	/* Catering to nfsd */
 	struct auth_domain *	rq_client;	/* RPC peer info */
 	struct auth_domain *	rq_gssclient;	/* "gss/"-style peer info */
-	int			rq_cachetype;
 	struct svc_cacherep *	rq_cacherep;	/* cache info */
-	bool			rq_splice_ok;   /* turned off in gss privacy
-						 * to prevent encrypting page
-						 * cache pages */
 	struct task_struct	*rq_task;	/* service thread */
+	spinlock_t		rq_lock;	/* per-request lock */
 };
 
 #define SVC_NET(svc_rqst)	(svc_rqst->rq_xprt->xpt_net)
diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
index ce6e418..79f6f8f 100644
--- a/include/linux/sunrpc/svc_xprt.h
+++ b/include/linux/sunrpc/svc_xprt.h
@@ -63,10 +63,9 @@
 #define	XPT_CHNGBUF	7		/* need to change snd/rcv buf sizes */
 #define	XPT_DEFERRED	8		/* deferred request pending */
 #define	XPT_OLD		9		/* used for xprt aging mark+sweep */
-#define	XPT_DETACHED	10		/* detached from tempsocks list */
-#define XPT_LISTENER	11		/* listening endpoint */
-#define XPT_CACHE_AUTH	12		/* cache auth info */
-#define XPT_LOCAL	13		/* connection from loopback interface */
+#define XPT_LISTENER	10		/* listening endpoint */
+#define XPT_CACHE_AUTH	11		/* cache auth info */
+#define XPT_LOCAL	12		/* connection from loopback interface */
 
 	struct svc_serv		*xpt_server;	/* service for transport */
 	atomic_t    	    	xpt_reserved;	/* space on outq that is rsvd */
diff --git a/include/trace/events/sunrpc.h b/include/trace/events/sunrpc.h
index 171ca4f..b9c1dc6 100644
--- a/include/trace/events/sunrpc.h
+++ b/include/trace/events/sunrpc.h
@@ -8,6 +8,7 @@
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/svc.h>
 #include <linux/sunrpc/xprtsock.h>
+#include <linux/sunrpc/svc_xprt.h>
 #include <net/tcp_states.h>
 #include <linux/net.h>
 #include <linux/tracepoint.h>
@@ -412,6 +413,16 @@
 			__entry->copied, __entry->reclen, __entry->offset)
 );
 
+#define show_rqstp_flags(flags)						\
+	__print_flags(flags, "|",					\
+		{ (1UL << RQ_SECURE),		"RQ_SECURE"},		\
+		{ (1UL << RQ_LOCAL),		"RQ_LOCAL"},		\
+		{ (1UL << RQ_USEDEFERRAL),	"RQ_USEDEFERRAL"},	\
+		{ (1UL << RQ_DROPME),		"RQ_DROPME"},		\
+		{ (1UL << RQ_SPLICE_OK),	"RQ_SPLICE_OK"},	\
+		{ (1UL << RQ_VICTIM),		"RQ_VICTIM"},		\
+		{ (1UL << RQ_BUSY),		"RQ_BUSY"})
+
 TRACE_EVENT(svc_recv,
 	TP_PROTO(struct svc_rqst *rqst, int status),
 
@@ -421,16 +432,19 @@
 		__field(struct sockaddr *, addr)
 		__field(__be32, xid)
 		__field(int, status)
+		__field(unsigned long, flags)
 	),
 
 	TP_fast_assign(
 		__entry->addr = (struct sockaddr *)&rqst->rq_addr;
 		__entry->xid = status > 0 ? rqst->rq_xid : 0;
 		__entry->status = status;
+		__entry->flags = rqst->rq_flags;
 	),
 
-	TP_printk("addr=%pIScp xid=0x%x status=%d", __entry->addr,
-			be32_to_cpu(__entry->xid), __entry->status)
+	TP_printk("addr=%pIScp xid=0x%x status=%d flags=%s", __entry->addr,
+			be32_to_cpu(__entry->xid), __entry->status,
+			show_rqstp_flags(__entry->flags))
 );
 
 DECLARE_EVENT_CLASS(svc_rqst_status,
@@ -444,18 +458,19 @@
 		__field(__be32, xid)
 		__field(int, dropme)
 		__field(int, status)
+		__field(unsigned long, flags)
 	),
 
 	TP_fast_assign(
 		__entry->addr = (struct sockaddr *)&rqst->rq_addr;
 		__entry->xid = rqst->rq_xid;
-		__entry->dropme = (int)rqst->rq_dropme;
 		__entry->status = status;
+		__entry->flags = rqst->rq_flags;
 	),
 
-	TP_printk("addr=%pIScp rq_xid=0x%x dropme=%d status=%d",
-		__entry->addr, be32_to_cpu(__entry->xid), __entry->dropme,
-		__entry->status)
+	TP_printk("addr=%pIScp rq_xid=0x%x status=%d flags=%s",
+		__entry->addr, be32_to_cpu(__entry->xid),
+		__entry->status, show_rqstp_flags(__entry->flags))
 );
 
 DEFINE_EVENT(svc_rqst_status, svc_process,
@@ -466,6 +481,99 @@
 	TP_PROTO(struct svc_rqst *rqst, int status),
 	TP_ARGS(rqst, status));
 
+#define show_svc_xprt_flags(flags)					\
+	__print_flags(flags, "|",					\
+		{ (1UL << XPT_BUSY),		"XPT_BUSY"},		\
+		{ (1UL << XPT_CONN),		"XPT_CONN"},		\
+		{ (1UL << XPT_CLOSE),		"XPT_CLOSE"},		\
+		{ (1UL << XPT_DATA),		"XPT_DATA"},		\
+		{ (1UL << XPT_TEMP),		"XPT_TEMP"},		\
+		{ (1UL << XPT_DEAD),		"XPT_DEAD"},		\
+		{ (1UL << XPT_CHNGBUF),		"XPT_CHNGBUF"},		\
+		{ (1UL << XPT_DEFERRED),	"XPT_DEFERRED"},	\
+		{ (1UL << XPT_OLD),		"XPT_OLD"},		\
+		{ (1UL << XPT_LISTENER),	"XPT_LISTENER"},	\
+		{ (1UL << XPT_CACHE_AUTH),	"XPT_CACHE_AUTH"},	\
+		{ (1UL << XPT_LOCAL),		"XPT_LOCAL"})
+
+TRACE_EVENT(svc_xprt_do_enqueue,
+	TP_PROTO(struct svc_xprt *xprt, struct svc_rqst *rqst),
+
+	TP_ARGS(xprt, rqst),
+
+	TP_STRUCT__entry(
+		__field(struct svc_xprt *, xprt)
+		__field(struct svc_rqst *, rqst)
+	),
+
+	TP_fast_assign(
+		__entry->xprt = xprt;
+		__entry->rqst = rqst;
+	),
+
+	TP_printk("xprt=0x%p addr=%pIScp pid=%d flags=%s", __entry->xprt,
+		(struct sockaddr *)&__entry->xprt->xpt_remote,
+		__entry->rqst ? __entry->rqst->rq_task->pid : 0,
+		show_svc_xprt_flags(__entry->xprt->xpt_flags))
+);
+
+TRACE_EVENT(svc_xprt_dequeue,
+	TP_PROTO(struct svc_xprt *xprt),
+
+	TP_ARGS(xprt),
+
+	TP_STRUCT__entry(
+		__field(struct svc_xprt *, xprt)
+		__field_struct(struct sockaddr_storage, ss)
+		__field(unsigned long, flags)
+	),
+
+	TP_fast_assign(
+		__entry->xprt = xprt,
+		xprt ? memcpy(&__entry->ss, &xprt->xpt_remote, sizeof(__entry->ss)) : memset(&__entry->ss, 0, sizeof(__entry->ss));
+		__entry->flags = xprt ? xprt->xpt_flags : 0;
+	),
+
+	TP_printk("xprt=0x%p addr=%pIScp flags=%s", __entry->xprt,
+		(struct sockaddr *)&__entry->ss,
+		show_svc_xprt_flags(__entry->flags))
+);
+
+TRACE_EVENT(svc_wake_up,
+	TP_PROTO(int pid),
+
+	TP_ARGS(pid),
+
+	TP_STRUCT__entry(
+		__field(int, pid)
+	),
+
+	TP_fast_assign(
+		__entry->pid = pid;
+	),
+
+	TP_printk("pid=%d", __entry->pid)
+);
+
+TRACE_EVENT(svc_handle_xprt,
+	TP_PROTO(struct svc_xprt *xprt, int len),
+
+	TP_ARGS(xprt, len),
+
+	TP_STRUCT__entry(
+		__field(struct svc_xprt *, xprt)
+		__field(int, len)
+	),
+
+	TP_fast_assign(
+		__entry->xprt = xprt;
+		__entry->len = len;
+	),
+
+	TP_printk("xprt=0x%p addr=%pIScp len=%d flags=%s", __entry->xprt,
+		(struct sockaddr *)&__entry->xprt->xpt_remote, __entry->len,
+		show_svc_xprt_flags(__entry->xprt->xpt_flags))
+);
 #endif /* _TRACE_SUNRPC_H */
 
 #include <trace/define_trace.h>
diff --git a/mm/madvise.c b/mm/madvise.c
index 0938b30..a271adc 100644
--- a/mm/madvise.c
+++ b/mm/madvise.c
@@ -326,7 +326,7 @@
 	 */
 	get_file(f);
 	up_read(&current->mm->mmap_sem);
-	error = do_fallocate(f,
+	error = vfs_fallocate(f,
 				FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
 				offset, end - start);
 	fput(f);
diff --git a/net/sunrpc/auth_gss/svcauth_gss.c b/net/sunrpc/auth_gss/svcauth_gss.c
index de856dd..224a82f 100644
--- a/net/sunrpc/auth_gss/svcauth_gss.c
+++ b/net/sunrpc/auth_gss/svcauth_gss.c
@@ -886,7 +886,7 @@
 	u32 priv_len, maj_stat;
 	int pad, saved_len, remaining_len, offset;
 
-	rqstp->rq_splice_ok = false;
+	clear_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
 
 	priv_len = svc_getnl(&buf->head[0]);
 	if (rqstp->rq_deferred) {
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index 0663621..33fb105 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -20,6 +20,7 @@
 #include <linux/list.h>
 #include <linux/module.h>
 #include <linux/ctype.h>
+#include <linux/string_helpers.h>
 #include <asm/uaccess.h>
 #include <linux/poll.h>
 #include <linux/seq_file.h>
@@ -1067,30 +1068,15 @@
 {
 	char *bp = *bpp;
 	int len = *lp;
-	char c;
+	int ret;
 
 	if (len < 0) return;
 
-	while ((c=*str++) && len)
-		switch(c) {
-		case ' ':
-		case '\t':
-		case '\n':
-		case '\\':
-			if (len >= 4) {
-				*bp++ = '\\';
-				*bp++ = '0' + ((c & 0300)>>6);
-				*bp++ = '0' + ((c & 0070)>>3);
-				*bp++ = '0' + ((c & 0007)>>0);
-			}
-			len -= 4;
-			break;
-		default:
-			*bp++ = c;
-			len--;
-		}
-	if (c || len <1) len = -1;
+	ret = string_escape_str(str, &bp, len, ESCAPE_OCTAL, "\\ \n\t");
+	if (ret < 0 || ret == len)
+		len = -1;
 	else {
+		len -= ret;
 		*bp++ = ' ';
 		len--;
 	}
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 2783fd8..91eaef1 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -191,7 +191,7 @@
 		return err;
 
 	for_each_online_cpu(cpu) {
-		BUG_ON(pidx > maxpools);
+		BUG_ON(pidx >= maxpools);
 		m->to_pool[cpu] = pidx;
 		m->pool_to[pidx] = cpu;
 		pidx++;
@@ -476,15 +476,11 @@
 				i, serv->sv_name);
 
 		pool->sp_id = i;
-		INIT_LIST_HEAD(&pool->sp_threads);
 		INIT_LIST_HEAD(&pool->sp_sockets);
 		INIT_LIST_HEAD(&pool->sp_all_threads);
 		spin_lock_init(&pool->sp_lock);
 	}
 
-	if (svc_uses_rpcbind(serv) && (!serv->sv_shutdown))
-		serv->sv_shutdown = svc_rpcb_cleanup;
-
 	return serv;
 }
 
@@ -505,13 +501,15 @@
 	unsigned int npools = svc_pool_map_get();
 
 	serv = __svc_create(prog, bufsize, npools, shutdown);
+	if (!serv)
+		goto out_err;
 
-	if (serv != NULL) {
-		serv->sv_function = func;
-		serv->sv_module = mod;
-	}
-
+	serv->sv_function = func;
+	serv->sv_module = mod;
 	return serv;
+out_err:
+	svc_pool_map_put();
+	return NULL;
 }
 EXPORT_SYMBOL_GPL(svc_create_pooled);
 
@@ -615,12 +613,14 @@
 		goto out_enomem;
 
 	serv->sv_nrthreads++;
-	spin_lock_bh(&pool->sp_lock);
-	pool->sp_nrthreads++;
-	list_add(&rqstp->rq_all, &pool->sp_all_threads);
-	spin_unlock_bh(&pool->sp_lock);
+	__set_bit(RQ_BUSY, &rqstp->rq_flags);
+	spin_lock_init(&rqstp->rq_lock);
 	rqstp->rq_server = serv;
 	rqstp->rq_pool = pool;
+	spin_lock_bh(&pool->sp_lock);
+	pool->sp_nrthreads++;
+	list_add_rcu(&rqstp->rq_all, &pool->sp_all_threads);
+	spin_unlock_bh(&pool->sp_lock);
 
 	rqstp->rq_argp = kmalloc_node(serv->sv_xdrsize, GFP_KERNEL, node);
 	if (!rqstp->rq_argp)
@@ -685,7 +685,8 @@
 		 * so we don't try to kill it again.
 		 */
 		rqstp = list_entry(pool->sp_all_threads.next, struct svc_rqst, rq_all);
-		list_del_init(&rqstp->rq_all);
+		set_bit(RQ_VICTIM, &rqstp->rq_flags);
+		list_del_rcu(&rqstp->rq_all);
 		task = rqstp->rq_task;
 	}
 	spin_unlock_bh(&pool->sp_lock);
@@ -783,10 +784,11 @@
 
 	spin_lock_bh(&pool->sp_lock);
 	pool->sp_nrthreads--;
-	list_del(&rqstp->rq_all);
+	if (!test_and_set_bit(RQ_VICTIM, &rqstp->rq_flags))
+		list_del_rcu(&rqstp->rq_all);
 	spin_unlock_bh(&pool->sp_lock);
 
-	kfree(rqstp);
+	kfree_rcu(rqstp, rq_rcu_head);
 
 	/* Release the server */
 	if (serv)
@@ -1086,10 +1088,10 @@
 		goto err_short_len;
 
 	/* Will be turned off only in gss privacy case: */
-	rqstp->rq_splice_ok = true;
+	set_bit(RQ_SPLICE_OK, &rqstp->rq_flags);
 	/* Will be turned off only when NFSv4 Sessions are used */
-	rqstp->rq_usedeferral = true;
-	rqstp->rq_dropme = false;
+	set_bit(RQ_USEDEFERRAL, &rqstp->rq_flags);
+	clear_bit(RQ_DROPME, &rqstp->rq_flags);
 
 	/* Setup reply header */
 	rqstp->rq_xprt->xpt_ops->xpo_prep_reply_hdr(rqstp);
@@ -1189,7 +1191,7 @@
 		*statp = procp->pc_func(rqstp, rqstp->rq_argp, rqstp->rq_resp);
 
 		/* Encode reply */
-		if (rqstp->rq_dropme) {
+		if (test_bit(RQ_DROPME, &rqstp->rq_flags)) {
 			if (procp->pc_release)
 				procp->pc_release(rqstp, NULL, rqstp->rq_resp);
 			goto dropit;
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index bbb3b04..c69358b 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -220,9 +220,11 @@
  */
 static void svc_xprt_received(struct svc_xprt *xprt)
 {
-	WARN_ON_ONCE(!test_bit(XPT_BUSY, &xprt->xpt_flags));
-	if (!test_bit(XPT_BUSY, &xprt->xpt_flags))
+	if (!test_bit(XPT_BUSY, &xprt->xpt_flags)) {
+		WARN_ONCE(1, "xprt=0x%p not busy!\n", xprt);
 		return;
+	}
+
 	/* As soon as we clear busy, the xprt could be closed and
 	 * 'put', so we need a reference to call svc_xprt_do_enqueue with:
 	 */
@@ -310,25 +312,6 @@
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
-/*
- * Queue up an idle server thread.  Must have pool->sp_lock held.
- * Note: this is really a stack rather than a queue, so that we only
- * use as many different threads as we need, and the rest don't pollute
- * the cache.
- */
-static void svc_thread_enqueue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-	list_add(&rqstp->rq_list, &pool->sp_threads);
-}
-
-/*
- * Dequeue an nfsd thread.  Must have pool->sp_lock held.
- */
-static void svc_thread_dequeue(struct svc_pool *pool, struct svc_rqst *rqstp)
-{
-	list_del(&rqstp->rq_list);
-}
-
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
 	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
@@ -341,11 +324,12 @@
 static void svc_xprt_do_enqueue(struct svc_xprt *xprt)
 {
 	struct svc_pool *pool;
-	struct svc_rqst	*rqstp;
+	struct svc_rqst	*rqstp = NULL;
 	int cpu;
+	bool queued = false;
 
 	if (!svc_xprt_has_something_to_do(xprt))
-		return;
+		goto out;
 
 	/* Mark transport as busy. It will remain in this state until
 	 * the provider calls svc_xprt_received. We update XPT_BUSY
@@ -355,43 +339,69 @@
 	if (test_and_set_bit(XPT_BUSY, &xprt->xpt_flags)) {
 		/* Don't enqueue transport while already enqueued */
 		dprintk("svc: transport %p busy, not enqueued\n", xprt);
-		return;
+		goto out;
 	}
 
 	cpu = get_cpu();
 	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
-	spin_lock_bh(&pool->sp_lock);
 
-	pool->sp_stats.packets++;
+	atomic_long_inc(&pool->sp_stats.packets);
 
-	if (!list_empty(&pool->sp_threads)) {
-		rqstp = list_entry(pool->sp_threads.next,
-				   struct svc_rqst,
-				   rq_list);
-		dprintk("svc: transport %p served by daemon %p\n",
-			xprt, rqstp);
-		svc_thread_dequeue(pool, rqstp);
-		if (rqstp->rq_xprt)
-			printk(KERN_ERR
-				"svc_xprt_enqueue: server %p, rq_xprt=%p!\n",
-				rqstp, rqstp->rq_xprt);
-		/* Note the order of the following 3 lines:
-		 * We want to assign xprt to rqstp->rq_xprt only _after_
-		 * we've woken up the process, so that we don't race with
-		 * the lockless check in svc_get_next_xprt().
+redo_search:
+	/* find a thread for this xprt */
+	rcu_read_lock();
+	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+		/* Do a lockless check first */
+		if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+			continue;
+
+		/*
+		 * Once the xprt has been queued, it can only be dequeued by
+		 * the task that intends to service it. All we can do at that
+		 * point is to try to wake this thread back up so that it can
+		 * do so.
 		 */
-		svc_xprt_get(xprt);
+		if (!queued) {
+			spin_lock_bh(&rqstp->rq_lock);
+			if (test_and_set_bit(RQ_BUSY, &rqstp->rq_flags)) {
+				/* already busy, move on... */
+				spin_unlock_bh(&rqstp->rq_lock);
+				continue;
+			}
+
+			/* this one will do */
+			rqstp->rq_xprt = xprt;
+			svc_xprt_get(xprt);
+			spin_unlock_bh(&rqstp->rq_lock);
+		}
+		rcu_read_unlock();
+
+		atomic_long_inc(&pool->sp_stats.threads_woken);
 		wake_up_process(rqstp->rq_task);
-		rqstp->rq_xprt = xprt;
-		pool->sp_stats.threads_woken++;
-	} else {
+		put_cpu();
+		goto out;
+	}
+	rcu_read_unlock();
+
+	/*
+	 * We didn't find an idle thread to use, so we need to queue the xprt.
+	 * Do so and then search again. If we find one, we can't hook this one
+	 * up to it directly but we can wake the thread up in the hopes that it
+	 * will pick it up once it searches for a xprt to service.
+	 */
+	if (!queued) {
+		queued = true;
 		dprintk("svc: transport %p put into queue\n", xprt);
+		spin_lock_bh(&pool->sp_lock);
 		list_add_tail(&xprt->xpt_ready, &pool->sp_sockets);
 		pool->sp_stats.sockets_queued++;
+		spin_unlock_bh(&pool->sp_lock);
+		goto redo_search;
 	}
-
-	spin_unlock_bh(&pool->sp_lock);
+	rqstp = NULL;
 	put_cpu();
+out:
+	trace_svc_xprt_do_enqueue(xprt, rqstp);
 }
 
 /*
@@ -408,22 +418,28 @@
 EXPORT_SYMBOL_GPL(svc_xprt_enqueue);
 
 /*
- * Dequeue the first transport.  Must be called with the pool->sp_lock held.
+ * Dequeue the first transport, if any, taking sp_lock and an xprt reference.
  */
 static struct svc_xprt *svc_xprt_dequeue(struct svc_pool *pool)
 {
-	struct svc_xprt	*xprt;
+	struct svc_xprt	*xprt = NULL;
 
 	if (list_empty(&pool->sp_sockets))
-		return NULL;
+		goto out;
 
-	xprt = list_entry(pool->sp_sockets.next,
-			  struct svc_xprt, xpt_ready);
-	list_del_init(&xprt->xpt_ready);
+	spin_lock_bh(&pool->sp_lock);
+	if (likely(!list_empty(&pool->sp_sockets))) {
+		xprt = list_first_entry(&pool->sp_sockets,
+					struct svc_xprt, xpt_ready);
+		list_del_init(&xprt->xpt_ready);
+		svc_xprt_get(xprt);
 
-	dprintk("svc: transport %p dequeued, inuse=%d\n",
-		xprt, atomic_read(&xprt->xpt_ref.refcount));
-
+		dprintk("svc: transport %p dequeued, inuse=%d\n",
+			xprt, atomic_read(&xprt->xpt_ref.refcount));
+	}
+	spin_unlock_bh(&pool->sp_lock);
+out:
+	trace_svc_xprt_dequeue(xprt);
 	return xprt;
 }
 
@@ -484,34 +500,36 @@
 }
 
 /*
- * External function to wake up a server waiting for data
- * This really only makes sense for services like lockd
- * which have exactly one thread anyway.
+ * Some svc_serv's will have occasional work to do, even when a xprt is not
+ * waiting to be serviced. This function is there to "kick" a task in one of
+ * those services so that it can wake up and do that work. Only pool 0 is
+ * searched, as one woken thread is enough; if none is idle there, the pool's
+ * SP_TASK_PENDING flag is set for rqst_should_sleep() to pick up.
  */
 void svc_wake_up(struct svc_serv *serv)
 {
 	struct svc_rqst	*rqstp;
-	unsigned int i;
 	struct svc_pool *pool;
 
-	for (i = 0; i < serv->sv_nrpools; i++) {
-		pool = &serv->sv_pools[i];
+	pool = &serv->sv_pools[0];
 
-		spin_lock_bh(&pool->sp_lock);
-		if (!list_empty(&pool->sp_threads)) {
-			rqstp = list_entry(pool->sp_threads.next,
-					   struct svc_rqst,
-					   rq_list);
-			dprintk("svc: daemon %p woken up.\n", rqstp);
-			/*
-			svc_thread_dequeue(pool, rqstp);
-			rqstp->rq_xprt = NULL;
-			 */
-			wake_up_process(rqstp->rq_task);
-		} else
-			pool->sp_task_pending = 1;
-		spin_unlock_bh(&pool->sp_lock);
+	rcu_read_lock();
+	list_for_each_entry_rcu(rqstp, &pool->sp_all_threads, rq_all) {
+		/* skip busy threads; only an idle one needs a kick */
+		if (test_bit(RQ_BUSY, &rqstp->rq_flags))
+			continue;
+		rcu_read_unlock();
+		dprintk("svc: daemon %p woken up.\n", rqstp);
+		wake_up_process(rqstp->rq_task);
+		trace_svc_wake_up(rqstp->rq_task->pid);
+		return;
 	}
+	rcu_read_unlock();
+
+	/* No idle thread: flag the pool so the next sleeper stays awake */
+	set_bit(SP_TASK_PENDING, &pool->sp_flags);
+	smp_wmb();
+	trace_svc_wake_up(0);
 }
 EXPORT_SYMBOL_GPL(svc_wake_up);
 EXPORT_SYMBOL_GPL(svc_wake_up);
 
@@ -622,75 +640,86 @@
 	return 0;
 }
 
+static bool
+rqst_should_sleep(struct svc_rqst *rqstp)
+{
+	struct svc_pool		*pool = rqstp->rq_pool;
+
+	/* did someone call svc_wake_up? (consumes the pending flag) */
+	if (test_and_clear_bit(SP_TASK_PENDING, &pool->sp_flags))
+		return false;
+
+	/* is a transport waiting on the pool's sp_sockets queue? */
+	if (!list_empty(&pool->sp_sockets))
+		return false;
+
+	/* are we being signalled or asked to stop? */
+	if (signalled() || kthread_should_stop())
+		return false;
+
+	/* are we freezing? */
+	if (freezing(current))
+		return false;
+
+	return true;
+}
+
 static struct svc_xprt *svc_get_next_xprt(struct svc_rqst *rqstp, long timeout)
 {
 	struct svc_xprt *xprt;
 	struct svc_pool		*pool = rqstp->rq_pool;
 	long			time_left = 0;
 
+	/* rq_xprt should be clear on entry; warn once if it is not */
+	WARN_ON_ONCE(rqstp->rq_xprt);
+
 	/* Normally we will wait up to 5 seconds for any required
 	 * cache information to be provided.
 	 */
 	rqstp->rq_chandle.thread_wait = 5*HZ;
 
-	spin_lock_bh(&pool->sp_lock);
 	xprt = svc_xprt_dequeue(pool);
 	if (xprt) {
 		rqstp->rq_xprt = xprt;
-		svc_xprt_get(xprt);
 
 		/* As there is a shortage of threads and this request
 		 * had to be queued, don't allow the thread to wait so
 		 * long for cache updates.
 		 */
 		rqstp->rq_chandle.thread_wait = 1*HZ;
-		pool->sp_task_pending = 0;
-	} else {
-		if (pool->sp_task_pending) {
-			pool->sp_task_pending = 0;
-			xprt = ERR_PTR(-EAGAIN);
-			goto out;
-		}
-		/*
-		 * We have to be able to interrupt this wait
-		 * to bring down the daemons ...
-		 */
-		set_current_state(TASK_INTERRUPTIBLE);
-
-		/* No data pending. Go to sleep */
-		svc_thread_enqueue(pool, rqstp);
-		spin_unlock_bh(&pool->sp_lock);
-
-		if (!(signalled() || kthread_should_stop())) {
-			time_left = schedule_timeout(timeout);
-			__set_current_state(TASK_RUNNING);
-
-			try_to_freeze();
-
-			xprt = rqstp->rq_xprt;
-			if (xprt != NULL)
-				return xprt;
-		} else
-			__set_current_state(TASK_RUNNING);
-
-		spin_lock_bh(&pool->sp_lock);
-		if (!time_left)
-			pool->sp_stats.threads_timedout++;
-
-		xprt = rqstp->rq_xprt;
-		if (!xprt) {
-			svc_thread_dequeue(pool, rqstp);
-			spin_unlock_bh(&pool->sp_lock);
-			dprintk("svc: server %p, no data yet\n", rqstp);
-			if (signalled() || kthread_should_stop())
-				return ERR_PTR(-EINTR);
-			else
-				return ERR_PTR(-EAGAIN);
-		}
+		clear_bit(SP_TASK_PENDING, &pool->sp_flags);
+		return xprt;
 	}
-out:
-	spin_unlock_bh(&pool->sp_lock);
-	return xprt;
+
+	/*
+	 * Sleep interruptibly so that the daemons can be brought down. RQ_BUSY
+	 * is cleared only after the task state is set; then recheck for work.
+	 */
+	set_current_state(TASK_INTERRUPTIBLE);
+	clear_bit(RQ_BUSY, &rqstp->rq_flags);
+	smp_mb();
+
+	if (likely(rqst_should_sleep(rqstp)))
+		time_left = schedule_timeout(timeout);
+	else
+		__set_current_state(TASK_RUNNING);
+
+	try_to_freeze();
+
+	spin_lock_bh(&rqstp->rq_lock);
+	set_bit(RQ_BUSY, &rqstp->rq_flags);
+	spin_unlock_bh(&rqstp->rq_lock);
+
+	xprt = rqstp->rq_xprt;
+	if (xprt != NULL)
+		return xprt;
+
+	if (!time_left)
+		atomic_long_inc(&pool->sp_stats.threads_timedout);
+
+	if (signalled() || kthread_should_stop())
+		return ERR_PTR(-EINTR);
+	return ERR_PTR(-EAGAIN);
 }
 
 static void svc_add_new_temp_xprt(struct svc_serv *serv, struct svc_xprt *newxpt)
@@ -719,7 +748,7 @@
 		dprintk("svc_recv: found XPT_CLOSE\n");
 		svc_delete_xprt(xprt);
 		/* Leave XPT_BUSY set on the dead xprt: */
-		return 0;
+		goto out;
 	}
 	if (test_bit(XPT_LISTENER, &xprt->xpt_flags)) {
 		struct svc_xprt *newxpt;
@@ -750,6 +779,8 @@
 	}
 	/* clear XPT_BUSY: */
 	svc_xprt_received(xprt);
+out:
+	trace_svc_handle_xprt(xprt, len);
 	return len;
 }
 
@@ -797,7 +828,10 @@
 
 	clear_bit(XPT_OLD, &xprt->xpt_flags);
 
-	rqstp->rq_secure = xprt->xpt_ops->xpo_secure_port(rqstp);
+	if (xprt->xpt_ops->xpo_secure_port(rqstp))
+		set_bit(RQ_SECURE, &rqstp->rq_flags);
+	else
+		clear_bit(RQ_SECURE, &rqstp->rq_flags);
 	rqstp->rq_chandle.defer = svc_defer;
 	rqstp->rq_xid = svc_getu32(&rqstp->rq_arg.head[0]);
 
@@ -895,7 +929,6 @@
 			continue;
 		list_del_init(le);
 		set_bit(XPT_CLOSE, &xprt->xpt_flags);
-		set_bit(XPT_DETACHED, &xprt->xpt_flags);
 		dprintk("queuing xprt %p for closing\n", xprt);
 
 		/* a thread will dequeue and close it soon */
@@ -935,8 +968,7 @@
 	xprt->xpt_ops->xpo_detach(xprt);
 
 	spin_lock_bh(&serv->sv_lock);
-	if (!test_and_set_bit(XPT_DETACHED, &xprt->xpt_flags))
-		list_del_init(&xprt->xpt_list);
+	list_del_init(&xprt->xpt_list);
 	WARN_ON_ONCE(!list_empty(&xprt->xpt_ready));
 	if (test_bit(XPT_TEMP, &xprt->xpt_flags))
 		serv->sv_tmpcnt--;
@@ -1080,7 +1112,7 @@
 	struct svc_rqst *rqstp = container_of(req, struct svc_rqst, rq_chandle);
 	struct svc_deferred_req *dr;
 
-	if (rqstp->rq_arg.page_len || !rqstp->rq_usedeferral)
+	if (rqstp->rq_arg.page_len || !test_bit(RQ_USEDEFERRAL, &rqstp->rq_flags))
 		return NULL; /* if more than a page, give up FIXME */
 	if (rqstp->rq_deferred) {
 		dr = rqstp->rq_deferred;
@@ -1109,7 +1141,7 @@
 	}
 	svc_xprt_get(rqstp->rq_xprt);
 	dr->xprt = rqstp->rq_xprt;
-	rqstp->rq_dropme = true;
+	set_bit(RQ_DROPME, &rqstp->rq_flags);
 
 	dr->handle.revisit = svc_revisit;
 	return &dr->handle;
@@ -1311,10 +1343,10 @@
 
 	seq_printf(m, "%u %lu %lu %lu %lu\n",
 		pool->sp_id,
-		pool->sp_stats.packets,
+		(unsigned long)atomic_long_read(&pool->sp_stats.packets),
 		pool->sp_stats.sockets_queued,
-		pool->sp_stats.threads_woken,
-		pool->sp_stats.threads_timedout);
+		(unsigned long)atomic_long_read(&pool->sp_stats.threads_woken),
+		(unsigned long)atomic_long_read(&pool->sp_stats.threads_timedout));
 
 	return 0;
 }
diff --git a/net/sunrpc/svcsock.c b/net/sunrpc/svcsock.c
index f9c052d..cc331b6 100644
--- a/net/sunrpc/svcsock.c
+++ b/net/sunrpc/svcsock.c
@@ -1145,7 +1145,10 @@
 
 	rqstp->rq_xprt_ctxt   = NULL;
 	rqstp->rq_prot	      = IPPROTO_TCP;
-	rqstp->rq_local	      = !!test_bit(XPT_LOCAL, &svsk->sk_xprt.xpt_flags);
+	if (test_bit(XPT_LOCAL, &svsk->sk_xprt.xpt_flags))
+		set_bit(RQ_LOCAL, &rqstp->rq_flags);
+	else
+		clear_bit(RQ_LOCAL, &rqstp->rq_flags);
 
 	p = (__be32 *)rqstp->rq_arg.head[0].iov_base;
 	calldir = p[1];
diff --git a/net/sunrpc/xdr.c b/net/sunrpc/xdr.c
index 290af97..1cb6124 100644
--- a/net/sunrpc/xdr.c
+++ b/net/sunrpc/xdr.c
@@ -617,9 +617,10 @@
 	fraglen = min_t(int, buf->len - len, tail->iov_len);
 	tail->iov_len -= fraglen;
 	buf->len -= fraglen;
-	if (tail->iov_len && buf->len == len) {
+	if (tail->iov_len) {
 		xdr->p = tail->iov_base + tail->iov_len;
-		/* xdr->end, xdr->iov should be set already */
+		WARN_ON_ONCE(!xdr->end);
+		WARN_ON_ONCE(!xdr->iov);
 		return;
 	}
 	WARN_ON_ONCE(fraglen);
@@ -631,11 +632,11 @@
 	old = new + fraglen;
 	xdr->page_ptr -= (old >> PAGE_SHIFT) - (new >> PAGE_SHIFT);
 
-	if (buf->page_len && buf->len == len) {
+	if (buf->page_len) {
 		xdr->p = page_address(*xdr->page_ptr);
 		xdr->end = (void *)xdr->p + PAGE_SIZE;
 		xdr->p = (void *)xdr->p + (new % PAGE_SIZE);
-		/* xdr->iov should already be NULL */
+		WARN_ON_ONCE(xdr->iov);
 		return;
 	}
 	if (fraglen) {