nfs41: Add backchannel processing support to RPC state machine

Adds rpc_run_bc_task() which is called by the NFS callback service to
process backchannel requests.  It performs similar work to rpc_run_task()
though "schedules" the backchannel task to be executed starting at the
call_trasmit state in the RPC state machine.

It also introduces some miscellaneous updates to the argument validation,
call_transmit, and transport cleanup functions to take into account
that there are now forechannel and backchannel tasks.

Backchannel requests do not carry an RPC message structure, since the
payload has already been XDR encoded using the existing NFSv4 callback
mechanism.

Introduce a new transmit state for the client to reply on to backchannel
requests.  This new state simply reserves the transport and issues the
reply.  In case of a connection related error, disconnects the transport and
drops the reply.  It requires the forechannel to re-establish the connection
and the server to retransmit the request, as stated in NFSv4.1 section
2.9.2 "Client and Server Transport Behavior".

Note: There is no need to loop attempting to reserve the transport.  If EAGAIN
is returned by xprt_prepare_transmit(), return with tk_status == 0,
setting tk_action to call_bc_transmit.  rpc_execute() will invoke it again
after the task is taken off the sleep queue.

[nfs41: rpc_run_bc_task() need not be exported outside RPC module]
[nfs41: New call_bc_transmit RPC state]
Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
[nfs41: Backchannel: No need to loop in call_bc_transmit()]
Signed-off-by: Andy Adamson <andros@netapp.com>
Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
[rpc_count_iostats incorrectly exits early]
Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
[Convert rpc_reply_expected() to inline function]
[Remove unnecessary BUG_ON()]
[Rename variable]
Signed-off-by: Ricardo Labiaga <Ricardo.Labiaga@netapp.com>
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index aca3ab6..f3e93b8 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -36,7 +36,9 @@
 #include <linux/sunrpc/clnt.h>
 #include <linux/sunrpc/rpc_pipe_fs.h>
 #include <linux/sunrpc/metrics.h>
+#include <linux/sunrpc/bc_xprt.h>
 
+#include "sunrpc.h"
 
 #ifdef RPC_DEBUG
 # define RPCDBG_FACILITY	RPCDBG_CALL
@@ -63,6 +65,9 @@
 static void	call_bind(struct rpc_task *task);
 static void	call_bind_status(struct rpc_task *task);
 static void	call_transmit(struct rpc_task *task);
+#if defined(CONFIG_NFS_V4_1)
+static void	call_bc_transmit(struct rpc_task *task);
+#endif /* CONFIG_NFS_V4_1 */
 static void	call_status(struct rpc_task *task);
 static void	call_transmit_status(struct rpc_task *task);
 static void	call_refresh(struct rpc_task *task);
@@ -613,6 +618,50 @@
 }
 EXPORT_SYMBOL_GPL(rpc_call_async);
 
+#if defined(CONFIG_NFS_V4_1)
+/**
+ * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
+ * rpc_execute against it
+ * @ops: RPC call ops
+ */
+struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
+					const struct rpc_call_ops *tk_ops)
+{
+	struct rpc_task *task;
+	struct xdr_buf *xbufp = &req->rq_snd_buf;
+	struct rpc_task_setup task_setup_data = {
+		.callback_ops = tk_ops,
+	};
+
+	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
+	/*
+	 * Create an rpc_task to send the data
+	 */
+	task = rpc_new_task(&task_setup_data);
+	if (!task) {
+		xprt_free_bc_request(req);
+		goto out;
+	}
+	task->tk_rqstp = req;
+
+	/*
+	 * Set up the xdr_buf length.
+	 * This also indicates that the buffer is XDR encoded already.
+	 */
+	xbufp->len = xbufp->head[0].iov_len + xbufp->page_len +
+			xbufp->tail[0].iov_len;
+
+	task->tk_action = call_bc_transmit;
+	atomic_inc(&task->tk_count);
+	BUG_ON(atomic_read(&task->tk_count) != 2);
+	rpc_execute(task);
+
+out:
+	dprintk("RPC: rpc_run_bc_task: task= %p\n", task);
+	return task;
+}
+#endif /* CONFIG_NFS_V4_1 */
+
 void
 rpc_call_start(struct rpc_task *task)
 {
@@ -1098,7 +1147,7 @@
 	 * in order to allow access to the socket to other RPC requests.
 	 */
 	call_transmit_status(task);
-	if (task->tk_msg.rpc_proc->p_decode != NULL)
+	if (rpc_reply_expected(task))
 		return;
 	task->tk_action = rpc_exit_task;
 	rpc_wake_up_queued_task(&task->tk_xprt->pending, task);
@@ -1133,6 +1182,72 @@
 	}
 }
 
+#if defined(CONFIG_NFS_V4_1)
+/*
+ * 5b.	Send the backchannel RPC reply.  On error, drop the reply.  In
+ * addition, disconnect on connectivity errors.
+ */
+static void
+call_bc_transmit(struct rpc_task *task)
+{
+	struct rpc_rqst *req = task->tk_rqstp;
+
+	BUG_ON(task->tk_status != 0);
+	task->tk_status = xprt_prepare_transmit(task);
+	if (task->tk_status == -EAGAIN) {
+		/*
+		 * Could not reserve the transport. Try again after the
+		 * transport is released.
+		 */
+		task->tk_status = 0;
+		task->tk_action = call_bc_transmit;
+		return;
+	}
+
+	task->tk_action = rpc_exit_task;
+	if (task->tk_status < 0) {
+		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+			"error: %d\n", task->tk_status);
+		return;
+	}
+
+	xprt_transmit(task);
+	xprt_end_transmit(task);
+	dprint_status(task);
+	switch (task->tk_status) {
+	case 0:
+		/* Success */
+		break;
+	case -EHOSTDOWN:
+	case -EHOSTUNREACH:
+	case -ENETUNREACH:
+	case -ETIMEDOUT:
+		/*
+		 * Problem reaching the server.  Disconnect and let the
+		 * forechannel reestablish the connection.  The server will
+		 * have to retransmit the backchannel request and we'll
+		 * reprocess it.  Since these ops are idempotent, there's no
+		 * need to cache our reply at this time.
+		 */
+		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+			"error: %d\n", task->tk_status);
+		xprt_conditional_disconnect(task->tk_xprt,
+			req->rq_connect_cookie);
+		break;
+	default:
+		/*
+		 * We were unable to reply and will have to drop the
+		 * request.  The server should reconnect and retransmit.
+		 */
+		BUG_ON(task->tk_status == -EAGAIN);
+		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
+			"error: %d\n", task->tk_status);
+		break;
+	}
+	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
+}
+#endif /* CONFIG_NFS_V4_1 */
+
 /*
  * 6.	Sort out the RPC call status
  */