SUNRPC: Add a server side per-connection limit

Allow the user to limit the number of requests serviced through a single
connection, to help prevent faster clients from starving slower clients.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
diff --git a/Documentation/kernel-parameters.txt b/Documentation/kernel-parameters.txt
index 82b42c9..48ba6d2 100644
--- a/Documentation/kernel-parameters.txt
+++ b/Documentation/kernel-parameters.txt
@@ -3832,6 +3832,12 @@
 			using these two parameters to set the minimum and
 			maximum port values.
 
+	sunrpc.svc_rpc_per_connection_limit=
+			[NFS,SUNRPC]
+			Limit the number of requests that the server will
+			process in parallel from a single connection.
+			The default value is 0 (no limit).
+
 	sunrpc.pool_mode=
 			[NFS]
 			Control how the NFS server code allocates CPUs to
diff --git a/include/linux/sunrpc/svc.h b/include/linux/sunrpc/svc.h
index 7ca44fb..7321ae9 100644
--- a/include/linux/sunrpc/svc.h
+++ b/include/linux/sunrpc/svc.h
@@ -268,6 +268,7 @@
 						 * cache pages */
 #define	RQ_VICTIM	(5)			/* about to be shut down */
 #define	RQ_BUSY		(6)			/* request is busy */
+#define	RQ_DATA		(7)			/* request has data */
 	unsigned long		rq_flags;	/* flags field */
 
 	void *			rq_argp;	/* decoded arguments */
diff --git a/include/linux/sunrpc/svc_xprt.h b/include/linux/sunrpc/svc_xprt.h
index 79ba508..ad899ff 100644
--- a/include/linux/sunrpc/svc_xprt.h
+++ b/include/linux/sunrpc/svc_xprt.h
@@ -69,6 +69,7 @@
 
 	struct svc_serv		*xpt_server;	/* service for transport */
 	atomic_t    	    	xpt_reserved;	/* space on outq that is rsvd */
+	atomic_t		xpt_nr_rqsts;	/* Number of requests */
 	struct mutex		xpt_mutex;	/* to serialize sending data */
 	spinlock_t		xpt_lock;	/* protects sk_deferred
 						 * and xpt_auth_cache */
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index e7082a4..2adc8db 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -21,6 +21,10 @@
 
 #define RPCDBG_FACILITY	RPCDBG_SVCXPRT
 
+static unsigned int svc_rpc_per_connection_limit __read_mostly;
+module_param(svc_rpc_per_connection_limit, uint, 0644);
+
+
 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
 static int svc_deferred_recv(struct svc_rqst *rqstp);
 static struct cache_deferred_req *svc_defer(struct cache_req *req);
@@ -329,12 +333,41 @@
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
+static bool svc_xprt_slots_in_range(struct svc_xprt *xprt)
+{
+	unsigned int limit = svc_rpc_per_connection_limit;
+	int nrqsts = atomic_read(&xprt->xpt_nr_rqsts);
+
+	return limit == 0 || (nrqsts >= 0 && nrqsts < limit);
+}
+
+static bool svc_xprt_reserve_slot(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+{
+	if (!test_bit(RQ_DATA, &rqstp->rq_flags)) {
+		if (!svc_xprt_slots_in_range(xprt))
+			return false;
+		atomic_inc(&xprt->xpt_nr_rqsts);
+		set_bit(RQ_DATA, &rqstp->rq_flags);
+	}
+	return true;
+}
+
+static void svc_xprt_release_slot(struct svc_rqst *rqstp)
+{
+	struct svc_xprt	*xprt = rqstp->rq_xprt;
+	if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) {
+		atomic_dec(&xprt->xpt_nr_rqsts);
+		svc_xprt_enqueue(xprt);
+	}
+}
+
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
 	if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
 		return true;
 	if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) {
-		if (xprt->xpt_ops->xpo_has_wspace(xprt))
+		if (xprt->xpt_ops->xpo_has_wspace(xprt) &&
+		    svc_xprt_slots_in_range(xprt))
 			return true;
 		trace_svc_xprt_no_write_space(xprt);
 		return false;
@@ -516,8 +549,8 @@
 
 	rqstp->rq_res.head[0].iov_len = 0;
 	svc_reserve(rqstp, 0);
+	svc_xprt_release_slot(rqstp);
 	rqstp->rq_xprt = NULL;
-
 	svc_xprt_put(xprt);
 }
 
@@ -785,7 +818,7 @@
 			svc_add_new_temp_xprt(serv, newxpt);
 		else
 			module_put(xprt->xpt_class->xcl_owner);
-	} else {
+	} else if (svc_xprt_reserve_slot(rqstp, xprt)) {
 		/* XPT_DATA|XPT_DEFERRED case: */
 		dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
 			rqstp, rqstp->rq_pool->sp_id, xprt,