xprtrdma: Disconnect on registration failure

If rpcrdma_register_external() fails during request marshaling, the
current RPC request is killed. Instead, this RPC should be retried
after reconnecting the transport instance.

The most likely reason for registration failure with FRMR is a
failed post_send, which would be due to a remote transport
disconnect or memory exhaustion. Retrying after a reconnect can
recover from both of these conditions.

Problems encountered in the marshaling logic itself will not be
corrected by trying again, so these should still kill a request.

Now that we've added a clean exit for marshaling errors, take the
opportunity to defang some BUG_ON call sites.
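
The intended dispatch in xprt_rdma_send_request() then looks roughly
like the sketch below (condensed from the transport.c hunk in this
patch): a marshaling error (-EIO) permanently fails the RPC, while any
other negative return tears down the connection so the RPC is retried
once the transport reconnects.

	rc = rpcrdma_marshal_req(rqst);
	if (rc < 0) {
		r_xprt->rx_stats.failed_marshal_count++;
		if (rc == -EIO)
			return -EIO;		/* kill this RPC */
		xprt_disconnect_done(xprt);
		return -ENOTCONN;	/* reconnect, then retry */
	}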

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 77b84cf..693966d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -77,6 +77,8 @@
  * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
  * elements. Segments are then coalesced when registered, if possible
  * within the selected memreg mode.
+ *
+ * Returns positive number of segments converted, or a negative errno.
  */
 
 static int
@@ -103,12 +105,13 @@
 			/* alloc the pagelist for receiving buffer */
 			ppages[p] = alloc_page(GFP_ATOMIC);
 			if (!ppages[p])
-				return 0;
+				return -ENOMEM;
 		}
 		seg[n].mr_page = ppages[p];
 		seg[n].mr_offset = (void *)(unsigned long) page_base;
 		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
-		BUG_ON(seg[n].mr_len > PAGE_SIZE);
+		if (seg[n].mr_len > PAGE_SIZE)
+			return -EIO;
 		len -= seg[n].mr_len;
 		++n;
 		++p;
@@ -117,7 +120,7 @@
 
 	/* Message overflows the seg array */
 	if (len && n == nsegs)
-		return 0;
+		return -EIO;
 
 	if (xdrbuf->tail[0].iov_len) {
 		/* the rpcrdma protocol allows us to omit any trailing
@@ -126,7 +129,7 @@
 			return n;
 		if (n == nsegs)
 			/* Tail remains, but we're out of segments */
-			return 0;
+			return -EIO;
 		seg[n].mr_page = NULL;
 		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
 		seg[n].mr_len = xdrbuf->tail[0].iov_len;
@@ -167,15 +170,17 @@
  *  Reply chunk (a counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
  */
 
-static unsigned int
+static ssize_t
 rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
 {
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-	int nsegs, nchunks = 0;
+	int n, nsegs, nchunks = 0;
 	unsigned int pos;
 	struct rpcrdma_mr_seg *seg = req->rl_segments;
 	struct rpcrdma_read_chunk *cur_rchunk = NULL;
@@ -201,11 +206,11 @@
 		pos = target->head[0].iov_len;
 
 	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
-	if (nsegs == 0)
-		return 0;
+	if (nsegs < 0)
+		return nsegs;
 
 	do {
-		int n = rpcrdma_register_external(seg, nsegs,
+		n = rpcrdma_register_external(seg, nsegs,
 						cur_wchunk != NULL, r_xprt);
 		if (n <= 0)
 			goto out;
@@ -277,7 +282,7 @@
 	for (pos = 0; nchunks--;)
 		pos += rpcrdma_deregister_external(
 				&req->rl_segments[pos], r_xprt);
-	return 0;
+	return n;
 }
 
 /*
@@ -359,6 +364,8 @@
  *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
  *  [2] -- optional padding.
  *  [3] -- if padded, header only in [1] and data here.
+ *
+ * Returns zero on success, otherwise a negative errno.
  */
 
 int
@@ -368,7 +375,8 @@
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	char *base;
-	size_t hdrlen, rpclen, padlen;
+	size_t rpclen, padlen;
+	ssize_t hdrlen;
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
 
@@ -439,7 +447,11 @@
 	/* The following simplification is not true forever */
 	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
 		wtype = rpcrdma_noch;
-	BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
+	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
+			__func__);
+		return -EIO;
+	}
 
 	hdrlen = 28; /*sizeof *headerp;*/
 	padlen = 0;
@@ -464,8 +476,11 @@
 			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
 			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
 			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
-			BUG_ON(wtype != rpcrdma_noch);
-
+			if (wtype != rpcrdma_noch) {
+				dprintk("RPC:       %s: invalid chunk list\n",
+					__func__);
+				return -EIO;
+			}
 		} else {
 			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
 			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
@@ -500,9 +515,8 @@
 		hdrlen = rpcrdma_create_chunks(rqst,
 					&rqst->rq_rcv_buf, headerp, wtype);
 	}
-
-	if (hdrlen == 0)
-		return -1;
+	if (hdrlen < 0)
+		return hdrlen;
 
 	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
 		" headerp 0x%p base 0x%p lkey 0x%x\n",
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 93fe775..66f91f0 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -595,13 +595,12 @@
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	int rc;
 
-	/* marshal the send itself */
-	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
-		r_xprt->rx_stats.failed_marshal_count++;
-		dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
-			__func__);
-		return -EIO;
+	if (req->rl_niovs == 0) {
+		rc = rpcrdma_marshal_req(rqst);
+		if (rc < 0)
+			goto failed_marshal;
 	}
 
 	if (req->rl_reply == NULL) 		/* e.g. reconnection */
@@ -625,6 +624,12 @@
 	rqst->rq_bytes_sent = 0;
 	return 0;
 
+failed_marshal:
+	r_xprt->rx_stats.failed_marshal_count++;
+	dprintk("RPC:       %s: rpcrdma_marshal_req failed, status %i\n",
+		__func__, rc);
+	if (rc == -EIO)
+		return -EIO;
 drop_connection:
 	xprt_disconnect_done(xprt);
 	return -ENOTCONN;	/* implies disconnect */