Merge branch 'for-2.6.26' of git://linux-nfs.org/~bfields/linux

* 'for-2.6.26' of git://linux-nfs.org/~bfields/linux: (25 commits)
  svcrdma: Verify read-list fits within RPCSVC_MAXPAGES
  svcrdma: Change svc_rdma_send_error return type to void
  svcrdma: Copy transport address and arm CQ before calling rdma_accept
  svcrdma: Set rqstp transport address in rdma_read_complete function
  svcrdma: Use ib verbs version of dma_unmap
  svcrdma: Cleanup queued, but unprocessed I/O in svc_rdma_free
  svcrdma: Move the QP and cm_id destruction to svc_rdma_free
  svcrdma: Add reference for each SQ/RQ WR
  svcrdma: Move destroy to kernel thread
  svcrdma: Shrink scope of spinlock on RQ CQ
  svcrdma: Use standard Linux lists for context cache
  svcrdma: Simplify RDMA_READ deferral buffer management
  svcrdma: Remove unused READ_DONE context flags bit
  svcrdma: Return error from rdma_read_xdr so caller knows to free context
  svcrdma: Fix error handling during listening endpoint creation
  svcrdma: Free context on post_recv error in send_reply
  svcrdma: Free context on ib_post_recv error
  svcrdma: Add put of connection ESTABLISHED reference in rdma_cma_handler
  svcrdma: Fix return value in svc_rdma_send
  svcrdma: Fix race with dto_tasklet in svc_rdma_send
  ...
diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
index 0b3ffa9..4d4760e 100644
--- a/fs/nfsd/nfs4callback.c
+++ b/fs/nfsd/nfs4callback.c
@@ -419,9 +419,9 @@
 out_release_client:
 	rpc_shutdown_client(client);
 out_err:
-	put_nfs4_client(clp);
 	dprintk("NFSD: warning: no callback path to client %.*s\n",
 		(int)clp->cl_name.len, clp->cl_name.data);
+	put_nfs4_client(clp);
 	return status;
 }
 
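
The hunk above moves put_nfs4_client() below the dprintk() that
dereferences clp->cl_name: dropping what may be the last reference
first would let the log line read freed memory. A minimal userspace
sketch of the hazard (hypothetical types, not the nfsd code itself):

#include <stdio.h>
#include <stdlib.h>

struct client {
	int refcount;
	char *name;
};

static void put_client(struct client *clp)
{
	if (--clp->refcount == 0) {	/* last reference: object dies */
		free(clp->name);
		free(clp);
	}
}

static void warn_no_callback(struct client *clp)
{
	/* Safe order: log while our reference still pins the object... */
	fprintf(stderr, "no callback path to client %s\n", clp->name);
	/* ...then drop the reference. Swapping these two statements
	 * reads clp->name after a possible free, as the old code did. */
	put_client(clp);
}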
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index c11bbcc..05eb466 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -71,7 +71,8 @@
  * completes.
  */
 struct svc_rdma_op_ctxt {
-	struct svc_rdma_op_ctxt *next;
+	struct svc_rdma_op_ctxt *read_hdr;
+	struct list_head free_list;
 	struct xdr_buf arg;
 	struct list_head dto_q;
 	enum ib_wr_opcode wr_op;
@@ -85,7 +86,6 @@
 	struct page *pages[RPCSVC_MAXPAGES];
 };
 
-#define RDMACTXT_F_READ_DONE	1
 #define RDMACTXT_F_LAST_CTXT	2
 
 struct svcxprt_rdma {
@@ -104,7 +104,8 @@
 
 	struct ib_pd         *sc_pd;
 
-	struct svc_rdma_op_ctxt  *sc_ctxt_head;
+	atomic_t	     sc_ctxt_used;
+	struct list_head     sc_ctxt_free;
 	int		     sc_ctxt_cnt;
 	int		     sc_ctxt_bump;
 	int		     sc_ctxt_max;
@@ -123,6 +124,7 @@
 	struct list_head     sc_dto_q;		/* DTO tasklet I/O pending Q */
 	struct list_head     sc_read_complete_q;
 	spinlock_t           sc_read_complete_lock;
+	struct work_struct   sc_work;
 };
 /* sc_flags */
 #define RDMAXPRT_RQ_PENDING	1
@@ -164,8 +166,8 @@
 
 /* svc_rdma_transport.c */
 extern int svc_rdma_send(struct svcxprt_rdma *, struct ib_send_wr *);
-extern int svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
-			       enum rpcrdma_errcode);
+extern void svc_rdma_send_error(struct svcxprt_rdma *, struct rpcrdma_msg *,
+				enum rpcrdma_errcode);
 struct page *svc_rdma_get_page(void);
 extern int svc_rdma_post_recv(struct svcxprt_rdma *);
 extern int svc_rdma_create_listen(struct svc_serv *, int, struct sockaddr *);
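
The header above replaces the hand-rolled *next chain with a standard
struct list_head free list plus an atomic_t usage counter. A sketch of
the allocation side of that idiom, with the sc_ctxt_lock locking
elided (the real version appears in svc_rdma_transport.c below):

#include <linux/list.h>
#include <linux/sunrpc/svc_rdma.h>

/* Sketch only: pop one context off the free list and account for it. */
static struct svc_rdma_op_ctxt *ctxt_pop(struct svcxprt_rdma *xprt)
{
	struct svc_rdma_op_ctxt *ctxt;

	if (list_empty(&xprt->sc_ctxt_free))
		return NULL;	/* caller bumps the cache and retries */
	ctxt = list_entry(xprt->sc_ctxt_free.next,
			  struct svc_rdma_op_ctxt, free_list);
	list_del_init(&ctxt->free_list);
	atomic_inc(&xprt->sc_ctxt_used);  /* checked in svc_rdma_free */
	return ctxt;
}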
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index d8e8d79..e46c825 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -6,30 +6,9 @@
 
 #include <linux/sched.h>
 #include <linux/errno.h>
-#include <linux/fcntl.h>
-#include <linux/net.h>
-#include <linux/in.h>
-#include <linux/inet.h>
-#include <linux/udp.h>
-#include <linux/tcp.h>
-#include <linux/unistd.h>
-#include <linux/slab.h>
-#include <linux/netdevice.h>
-#include <linux/skbuff.h>
-#include <linux/file.h>
 #include <linux/freezer.h>
 #include <linux/kthread.h>
 #include <net/sock.h>
-#include <net/checksum.h>
-#include <net/ip.h>
-#include <net/ipv6.h>
-#include <net/tcp_states.h>
-#include <linux/uaccess.h>
-#include <asm/ioctls.h>
-
-#include <linux/sunrpc/types.h>
-#include <linux/sunrpc/clnt.h>
-#include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/stats.h>
 #include <linux/sunrpc/svc_xprt.h>
 
@@ -296,8 +275,6 @@
 	if (!(xprt->xpt_flags &
 	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
 		return;
-	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
-		return;
 
 	cpu = get_cpu();
 	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
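
With the XPT_DEAD early return gone, enqueueing is gated purely on the
actionable flag bits tested above. A sketch of that gate as a helper
(flag names are from svc_xprt.h; the helper itself is hypothetical):

#include <linux/sunrpc/svc_xprt.h>

/* Enqueue only when the transport has actionable work pending. */
static inline int xprt_has_work(struct svc_xprt *xprt)
{
	unsigned long mask = (1UL << XPT_CONN) | (1UL << XPT_DATA) |
			     (1UL << XPT_CLOSE) | (1UL << XPT_DEFERRED);

	return (xprt->xpt_flags & mask) != 0;
}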
diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c
index 3f30ee6..f24800f 100644
--- a/net/sunrpc/svcauth_unix.c
+++ b/net/sunrpc/svcauth_unix.c
@@ -278,7 +278,7 @@
 		dom = im->m_client->h.name;
 
 	if (ipv6_addr_v4mapped(&addr)) {
-		seq_printf(m, "%s" NIPQUAD_FMT "%s\n",
+		seq_printf(m, "%s " NIPQUAD_FMT " %s\n",
 			im->m_class,
 			ntohl(addr.s6_addr32[3]) >> 24 & 0xff,
 			ntohl(addr.s6_addr32[3]) >> 16 & 0xff,
@@ -286,7 +286,7 @@
 			ntohl(addr.s6_addr32[3]) >>  0 & 0xff,
 			dom);
 	} else {
-		seq_printf(m, "%s" NIP6_FMT "%s\n",
+		seq_printf(m, "%s " NIP6_FMT " %s\n",
 			im->m_class, NIP6(addr), dom);
 	}
 	return 0;
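
Both seq_printf() fixes insert the spaces that keep the class, address
and domain parseable as separate tokens in the proc file. A userspace
illustration with made-up values:

#include <stdio.h>

int main(void)
{
	/* Before: fields run together -- "nfsd192.0.2.1*" */
	printf("%s" "%u.%u.%u.%u" "%s\n", "nfsd", 192u, 0u, 2u, 1u, "*");
	/* After: three whitespace-separated tokens -- "nfsd 192.0.2.1 *" */
	printf("%s " "%u.%u.%u.%u" " %s\n", "nfsd", 192u, 0u, 2u, 1u, "*");
	return 0;
}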
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c22d6b6..06ab484 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -260,11 +260,16 @@
  * On our side, we need to read into a pagelist. The first page immediately
  * follows the RPC header.
  *
- * This function returns 1 to indicate success. The data is not yet in
+ * This function returns:
+ * 0 - No error and no read-list found.
+ *
+ * 1 - Successful read-list processing. The data is not yet in
  * the pagelist and therefore the RPC request must be deferred. The
  * I/O completion will enqueue the transport again and
  * svc_rdma_recvfrom will complete the request.
  *
+ * <0 - Error processing/posting read-list.
+ *
  * NOTE: The ctxt must not be touched after the last WR has been posted
  * because the I/O completion processing may occur on another
 * processor and free / modify the context. Don't touch it!
@@ -284,7 +289,6 @@
 	u64 sgl_offset;
 	struct rpcrdma_read_chunk *ch;
 	struct svc_rdma_op_ctxt *ctxt = NULL;
-	struct svc_rdma_op_ctxt *head;
 	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
 	struct svc_rdma_op_ctxt *tmp_ch_ctxt;
 	struct chunk_sge *ch_sge_ary;
@@ -302,25 +306,19 @@
 	ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
 
 	svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
+	if (ch_count > RPCSVC_MAXPAGES)
+		return -EINVAL;
 	sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
 				    sge, ch_sge_ary,
 				    ch_count, byte_count);
-	head = svc_rdma_get_context(xprt);
 	sgl_offset = 0;
 	ch_no = 0;
 
 	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
 	     ch->rc_discrim != 0; ch++, ch_no++) {
 next_sge:
-		if (!ctxt)
-			ctxt = head;
-		else {
-			ctxt->next = svc_rdma_get_context(xprt);
-			ctxt = ctxt->next;
-		}
-		ctxt->next = NULL;
+		ctxt = svc_rdma_get_context(xprt);
 		ctxt->direction = DMA_FROM_DEVICE;
-		clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
 		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
 
 		/* Prepare READ WR */
@@ -347,20 +345,15 @@
 			 * the client and the RPC needs to be enqueued.
 			 */
 			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
-			ctxt->next = hdr_ctxt;
-			hdr_ctxt->next = head;
+			ctxt->read_hdr = hdr_ctxt;
 		}
 		/* Post the read */
 		err = svc_rdma_send(xprt, &read_wr);
 		if (err) {
-			printk(KERN_ERR "svcrdma: Error posting send = %d\n",
+			printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
 			       err);
-			/*
-			 * Break the circular list so free knows when
-			 * to stop if the error happened to occur on
-			 * the last read
-			 */
-			ctxt->next = NULL;
+			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+			svc_rdma_put_context(ctxt, 0);
 			goto out;
 		}
 		atomic_inc(&rdma_stat_read);
@@ -371,7 +364,7 @@
 			goto next_sge;
 		}
 		sgl_offset = 0;
-		err = 0;
+		err = 1;
 	}
 
  out:
@@ -389,25 +382,12 @@
 	while (rqstp->rq_resused)
 		rqstp->rq_respages[--rqstp->rq_resused] = NULL;
 
-	if (err) {
-		printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
-		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-		/* Free the linked list of read contexts */
-		while (head != NULL) {
-			ctxt = head->next;
-			svc_rdma_put_context(head, 1);
-			head = ctxt;
-		}
-		return 0;
-	}
-
-	return 1;
+	return err;
 }
 
 static int rdma_read_complete(struct svc_rqst *rqstp,
-			      struct svc_rdma_op_ctxt *data)
+			      struct svc_rdma_op_ctxt *head)
 {
-	struct svc_rdma_op_ctxt *head = data->next;
 	int page_no;
 	int ret;
 
@@ -433,21 +413,12 @@
 	rqstp->rq_arg.len = head->arg.len;
 	rqstp->rq_arg.buflen = head->arg.buflen;
 
+	/* Free the context */
+	svc_rdma_put_context(head, 0);
+
 	/* XXX: What should this be? */
 	rqstp->rq_prot = IPPROTO_MAX;
-
-	/*
-	 * Free the contexts we used to build the RDMA_READ. We have
-	 * to be careful here because the context list uses the same
-	 * next pointer used to chain the contexts associated with the
-	 * RDMA_READ
-	 */
-	data->next = NULL;	/* terminate circular list */
-	do {
-		data = head->next;
-		svc_rdma_put_context(head, 0);
-		head = data;
-	} while (head != NULL);
+	svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);
 
 	ret = rqstp->rq_arg.head[0].iov_len
 		+ rqstp->rq_arg.page_len
@@ -457,8 +428,6 @@
 		ret, rqstp->rq_arg.len,	rqstp->rq_arg.head[0].iov_base,
 		rqstp->rq_arg.head[0].iov_len);
 
-	/* Indicate that we've consumed an RQ credit */
-	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
 	svc_xprt_received(rqstp->rq_xprt);
 	return ret;
 }
@@ -480,13 +449,6 @@
 
 	dprintk("svcrdma: rqstp=%p\n", rqstp);
 
-	/*
-	 * The rq_xprt_ctxt indicates if we've consumed an RQ credit
-	 * or not. It is used in the rdma xpo_release_rqst function to
-	 * determine whether or not to return an RQ WQE to the RQ.
-	 */
-	rqstp->rq_xprt_ctxt = NULL;
-
 	spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
 	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
 		ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
@@ -537,21 +499,22 @@
 	/* If the request is invalid, reply with an error */
 	if (len < 0) {
 		if (len == -ENOSYS)
-			(void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+			svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
 		goto close_out;
 	}
 
-	/* Read read-list data. If we would need to wait, defer
-	 * it. Not that in this case, we don't return the RQ credit
-	 * until after the read completes.
-	 */
-	if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
+	/* Read read-list data. */
+	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
+	if (ret > 0) {
+		/* Read-list posted; defer until the data arrives from the client. */
 		svc_xprt_received(xprt);
 		return 0;
 	}
-
-	/* Indicate we've consumed an RQ credit */
-	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+	if (ret < 0) {
+		/* Post of read-list failed, free context. */
+		svc_rdma_put_context(ctxt, 1);
+		return 0;
+	}
 
 	ret = rqstp->rq_arg.head[0].iov_len
 		+ rqstp->rq_arg.page_len
@@ -569,11 +532,8 @@
 	return ret;
 
  close_out:
-	if (ctxt) {
+	if (ctxt)
 		svc_rdma_put_context(ctxt, 1);
-		/* Indicate we've consumed an RQ credit */
-		rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
-	}
 	dprintk("svcrdma: transport %p is closing\n", xprt);
 	/*
 	 * Set the close bit and enqueue it. svc_recv will see the
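
For reference, the tri-state contract of rdma_read_xdr() as its caller
now consumes it; this condenses the svc_rdma_recvfrom() logic from the
hunks above into one place:

	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
	if (ret > 0) {
		/* Read-list posted: defer. The RDMA_READ completion
		 * re-enqueues the transport and recvfrom runs again. */
		svc_xprt_received(xprt);
		return 0;
	}
	if (ret < 0) {
		/* Post failed: rdma_read_xdr() already set XPT_CLOSE,
		 * so just release the receive context. */
		svc_rdma_put_context(ctxt, 1);
		return 0;
	}
	/* ret == 0: no read-list; the whole RPC is already in rq_arg. */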
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 981f190..fb82b1b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -389,6 +389,17 @@
 	int page_no;
 	int ret;
 
+	/* Post a recv buffer to handle another request. */
+	ret = svc_rdma_post_recv(rdma);
+	if (ret) {
+		printk(KERN_INFO
+		       "svcrdma: could not post a receive buffer, err=%d."
+		       "Closing transport %p.\n", ret, rdma);
+		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+		svc_rdma_put_context(ctxt, 0);
+		return -ENOTCONN;
+	}
+
 	/* Prepare the context */
 	ctxt->pages[0] = page;
 	ctxt->count = 1;
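
The new block at the top of send_reply() replenishes the receive queue
before the reply goes out. The ordering matters (a sketch restating
the hunk above): once the reply reaches the client it may immediately
send its next request, and a message arriving on an empty RQ is a
fatal transport error, so the recv buffer must be posted first.

	ret = svc_rdma_post_recv(rdma);	/* 1. replenish the RQ... */
	if (ret) {
		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
		svc_rdma_put_context(ctxt, 0);
		return -ENOTCONN;	/* cannot receive again: close */
	}
	/* 2. ...only then build and post the RDMA_SEND of the reply. */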
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index af408fc..e132509 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -103,8 +103,8 @@
 		spin_lock_bh(&xprt->sc_ctxt_lock);
 		if (ctxt) {
 			at_least_one = 1;
-			ctxt->next = xprt->sc_ctxt_head;
-			xprt->sc_ctxt_head = ctxt;
+			INIT_LIST_HEAD(&ctxt->free_list);
+			list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
 		} else {
 			/* kmalloc failed...give up for now */
 			xprt->sc_ctxt_cnt--;
@@ -123,7 +123,7 @@
 
 	while (1) {
 		spin_lock_bh(&xprt->sc_ctxt_lock);
-		if (unlikely(xprt->sc_ctxt_head == NULL)) {
+		if (unlikely(list_empty(&xprt->sc_ctxt_free))) {
 			/* Try to bump my cache. */
 			spin_unlock_bh(&xprt->sc_ctxt_lock);
 
@@ -136,12 +136,15 @@
 			schedule_timeout_uninterruptible(msecs_to_jiffies(500));
 			continue;
 		}
-		ctxt = xprt->sc_ctxt_head;
-		xprt->sc_ctxt_head = ctxt->next;
+		ctxt = list_entry(xprt->sc_ctxt_free.next,
+				  struct svc_rdma_op_ctxt,
+				  free_list);
+		list_del_init(&ctxt->free_list);
 		spin_unlock_bh(&xprt->sc_ctxt_lock);
 		ctxt->xprt = xprt;
 		INIT_LIST_HEAD(&ctxt->dto_q);
 		ctxt->count = 0;
+		atomic_inc(&xprt->sc_ctxt_used);
 		break;
 	}
 	return ctxt;
@@ -159,14 +162,15 @@
 			put_page(ctxt->pages[i]);
 
 	for (i = 0; i < ctxt->count; i++)
-		dma_unmap_single(xprt->sc_cm_id->device->dma_device,
-				 ctxt->sge[i].addr,
-				 ctxt->sge[i].length,
-				 ctxt->direction);
+		ib_dma_unmap_single(xprt->sc_cm_id->device,
+				    ctxt->sge[i].addr,
+				    ctxt->sge[i].length,
+				    ctxt->direction);
+
 	spin_lock_bh(&xprt->sc_ctxt_lock);
-	ctxt->next = xprt->sc_ctxt_head;
-	xprt->sc_ctxt_head = ctxt;
+	list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
 	spin_unlock_bh(&xprt->sc_ctxt_lock);
+	atomic_dec(&xprt->sc_ctxt_used);
 }
 
 /* ib_cq event handler */
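
The switch from dma_unmap_single() to ib_dma_unmap_single() above uses
the ib_verbs wrappers, which take the struct ib_device itself rather
than its dma_device and so let drivers that provide their own DMA
mapping ops intercept the call. The map/unmap pairing this assumes:

	/* map at post time... */
	addr = ib_dma_map_single(xprt->sc_cm_id->device, buf, len, dir);
	/* ...and unmap with the matching wrapper at completion time */
	ib_dma_unmap_single(xprt->sc_cm_id->device, addr, len, dir);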
@@ -228,23 +232,8 @@
 		list_del_init(&xprt->sc_dto_q);
 		spin_unlock_irqrestore(&dto_lock, flags);
 
-		if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-			rq_cq_reap(xprt);
-			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-			/*
-			 * If data arrived before established event,
-			 * don't enqueue. This defers RPC I/O until the
-			 * RDMA connection is complete.
-			 */
-			if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-				svc_xprt_enqueue(&xprt->sc_xprt);
-		}
-
-		if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-			sq_cq_reap(xprt);
-		}
+		rq_cq_reap(xprt);
+		sq_cq_reap(xprt);
 
 		svc_xprt_put(&xprt->sc_xprt);
 		spin_lock_irqsave(&dto_lock, flags);
@@ -263,11 +252,15 @@
 	struct svcxprt_rdma *xprt = cq_context;
 	unsigned long flags;
 
+	/* Guard against an unconditional flush call for a destroyed QP */
+	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
+		return;
+
 	/*
 	 * Set the bit regardless of whether or not it's on the list
 	 * because it may be on the list already due to an SQ
 	 * completion.
-	*/
+	 */
 	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
 
 	/*
@@ -290,6 +283,8 @@
  *
  * Take all completing WC off the CQE and enqueue the associated DTO
  * context on the dto_q for the transport.
+ *
+ * Note that the caller must hold a transport reference.
  */
 static void rq_cq_reap(struct svcxprt_rdma *xprt)
 {
@@ -297,29 +292,47 @@
 	struct ib_wc wc;
 	struct svc_rdma_op_ctxt *ctxt = NULL;
 
+	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_rq_poll);
 
-	spin_lock_bh(&xprt->sc_rq_dto_lock);
 	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
 		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
 		ctxt->wc_status = wc.status;
 		ctxt->byte_len = wc.byte_len;
 		if (wc.status != IB_WC_SUCCESS) {
 			/* Close the transport */
+			dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
 			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 			svc_rdma_put_context(ctxt, 1);
+			svc_xprt_put(&xprt->sc_xprt);
 			continue;
 		}
+		spin_lock_bh(&xprt->sc_rq_dto_lock);
 		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+		spin_unlock_bh(&xprt->sc_rq_dto_lock);
+		svc_xprt_put(&xprt->sc_xprt);
 	}
-	spin_unlock_bh(&xprt->sc_rq_dto_lock);
 
 	if (ctxt)
 		atomic_inc(&rdma_stat_rq_prod);
+
+	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+	/*
+	 * If data arrived before established event,
+	 * don't enqueue. This defers RPC I/O until the
+	 * RDMA connection is complete.
+	 */
+	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		svc_xprt_enqueue(&xprt->sc_xprt);
 }
 
 /*
  * Send Queue Completion Handler - potentially called on interrupt context.
+ *
+ * Note that the caller must hold a transport reference.
  */
 static void sq_cq_reap(struct svcxprt_rdma *xprt)
 {
@@ -328,6 +341,11 @@
 	struct ib_cq *cq = xprt->sc_sq_cq;
 	int ret;
 
+
+	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_sq_poll);
 	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
 		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
@@ -349,14 +367,16 @@
 
 		case IB_WR_RDMA_READ:
 			if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+				struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
+				BUG_ON(!read_hdr);
 				set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-				set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
 				spin_lock_bh(&xprt->sc_read_complete_lock);
-				list_add_tail(&ctxt->dto_q,
+				list_add_tail(&read_hdr->dto_q,
 					      &xprt->sc_read_complete_q);
 				spin_unlock_bh(&xprt->sc_read_complete_lock);
 				svc_xprt_enqueue(&xprt->sc_xprt);
 			}
+			svc_rdma_put_context(ctxt, 0);
 			break;
 
 		default:
@@ -365,6 +385,7 @@
 			       wc.opcode, wc.status);
 			break;
 		}
+		svc_xprt_put(&xprt->sc_xprt);
 	}
 
 	if (ctxt)
@@ -376,11 +397,15 @@
 	struct svcxprt_rdma *xprt = cq_context;
 	unsigned long flags;
 
+	/* Guard against an unconditional flush call for a destroyed QP */
+	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount) == 0)
+		return;
+
 	/*
 	 * Set the bit regardless of whether or not it's on the list
 	 * because it may be on the list already due to an RQ
 	 * completion.
-	*/
+	 */
 	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
 
 	/*
@@ -407,28 +432,29 @@
 	xprt->sc_ctxt_max = ctxt_max;
 	xprt->sc_ctxt_bump = ctxt_bump;
 	xprt->sc_ctxt_cnt = 0;
-	xprt->sc_ctxt_head = NULL;
+	atomic_set(&xprt->sc_ctxt_used, 0);
+
+	INIT_LIST_HEAD(&xprt->sc_ctxt_free);
 	for (i = 0; i < ctxt_count; i++) {
 		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
 		if (ctxt) {
-			ctxt->next = xprt->sc_ctxt_head;
-			xprt->sc_ctxt_head = ctxt;
+			INIT_LIST_HEAD(&ctxt->free_list);
+			list_add(&ctxt->free_list, &xprt->sc_ctxt_free);
 			xprt->sc_ctxt_cnt++;
 		}
 	}
 }
 
-static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
+static void destroy_context_cache(struct svcxprt_rdma *xprt)
 {
-	struct svc_rdma_op_ctxt *next;
-	if (!ctxt)
-		return;
-
-	do {
-		next = ctxt->next;
+	while (!list_empty(&xprt->sc_ctxt_free)) {
+		struct svc_rdma_op_ctxt *ctxt;
+		ctxt = list_entry(xprt->sc_ctxt_free.next,
+				  struct svc_rdma_op_ctxt,
+				  free_list);
+		list_del_init(&ctxt->free_list);
 		kfree(ctxt);
-		ctxt = next;
-	} while (next);
+	}
 }
 
 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
@@ -465,7 +491,7 @@
 				     reqs +
 				     cma_xprt->sc_sq_depth +
 				     RPCRDMA_MAX_THREADS + 1); /* max */
-		if (!cma_xprt->sc_ctxt_head) {
+		if (list_empty(&cma_xprt->sc_ctxt_free)) {
 			kfree(cma_xprt);
 			return NULL;
 		}
@@ -520,7 +546,12 @@
 	recv_wr.num_sge = ctxt->count;
 	recv_wr.wr_id = (u64)(unsigned long)ctxt;
 
+	svc_xprt_get(&xprt->sc_xprt);
 	ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
+	if (ret) {
+		svc_xprt_put(&xprt->sc_xprt);
+		svc_rdma_put_context(ctxt, 1);
+	}
 	return ret;
 }
 
@@ -539,6 +570,7 @@
 {
 	struct svcxprt_rdma *listen_xprt = new_cma_id->context;
 	struct svcxprt_rdma *newxprt;
+	struct sockaddr *sa;
 
 	/* Create a new transport */
 	newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
@@ -551,6 +583,12 @@
 	dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
 		newxprt, newxprt->sc_cm_id, listen_xprt);
 
+	/* Set the local and remote addresses in the transport */
+	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
+	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
+	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
+
 	/*
 	 * Enqueue the new transport on the accept queue of the listening
 	 * transport
@@ -627,6 +665,7 @@
 		if (xprt) {
 			set_bit(XPT_CLOSE, &xprt->xpt_flags);
 			svc_xprt_enqueue(xprt);
+			svc_xprt_put(xprt);
 		}
 		break;
 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
@@ -661,31 +700,27 @@
 
 	cma_xprt = rdma_create_xprt(serv, 1);
 	if (!cma_xprt)
-		return ERR_PTR(ENOMEM);
+		return ERR_PTR(-ENOMEM);
 	xprt = &cma_xprt->sc_xprt;
 
 	listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
 	if (IS_ERR(listen_id)) {
-		svc_xprt_put(&cma_xprt->sc_xprt);
-		dprintk("svcrdma: rdma_create_id failed = %ld\n",
-			PTR_ERR(listen_id));
-		return (void *)listen_id;
+		ret = PTR_ERR(listen_id);
+		dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
+		goto err0;
 	}
+
 	ret = rdma_bind_addr(listen_id, sa);
 	if (ret) {
-		rdma_destroy_id(listen_id);
-		svc_xprt_put(&cma_xprt->sc_xprt);
 		dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
-		return ERR_PTR(ret);
+		goto err1;
 	}
 	cma_xprt->sc_cm_id = listen_id;
 
 	ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
 	if (ret) {
-		rdma_destroy_id(listen_id);
-		svc_xprt_put(&cma_xprt->sc_xprt);
 		dprintk("svcrdma: rdma_listen failed = %d\n", ret);
-		return ERR_PTR(ret);
+		goto err1;
 	}
 
 	/*
@@ -696,6 +731,12 @@
 	svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
 
 	return &cma_xprt->sc_xprt;
+
+ err1:
+	rdma_destroy_id(listen_id);
+ err0:
+	kfree(cma_xprt);
+	return ERR_PTR(ret);
 }
 
 /*
@@ -716,7 +757,6 @@
 	struct rdma_conn_param conn_param;
 	struct ib_qp_init_attr qp_attr;
 	struct ib_device_attr devattr;
-	struct sockaddr *sa;
 	int ret;
 	int i;
 
@@ -826,7 +866,6 @@
 		newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
 		newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
 	}
-	svc_xprt_get(&newxprt->sc_xprt);
 	newxprt->sc_qp = newxprt->sc_cm_id->qp;
 
 	/* Register all of physical memory */
@@ -850,6 +889,13 @@
 	/* Swap out the handler */
 	newxprt->sc_cm_id->event_handler = rdma_cma_handler;
 
+	/*
+	 * Arm the CQs for the SQ and RQ before accepting so we can't
+	 * miss the first message
+	 */
+	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
+	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
+
 	/* Accept Connection */
 	set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
 	memset(&conn_param, 0, sizeof conn_param);
@@ -886,58 +932,26 @@
 		newxprt->sc_max_requests,
 		newxprt->sc_ord);
 
-	/* Set the local and remote addresses in the transport */
-	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
-	svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
-	sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
-	svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
-
-	ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-	ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 	return &newxprt->sc_xprt;
 
  errout:
 	dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
 	/* Take a reference in case the DTO handler runs */
 	svc_xprt_get(&newxprt->sc_xprt);
-	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) {
+	if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
 		ib_destroy_qp(newxprt->sc_qp);
-		svc_xprt_put(&newxprt->sc_xprt);
-	}
 	rdma_destroy_id(newxprt->sc_cm_id);
 	/* This call to put will destroy the transport */
 	svc_xprt_put(&newxprt->sc_xprt);
 	return NULL;
 }
 
-/*
- * Post an RQ WQE to the RQ when the rqst is being released. This
- * effectively returns an RQ credit to the client. The rq_xprt_ctxt
- * will be null if the request is deferred due to an RDMA_READ or the
- * transport had no data ready (EAGAIN). Note that an RPC deferred in
- * svc_process will still return the credit, this is because the data
- * is copied and no longer consume a WQE/WC.
- */
 static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
 {
-	int err;
-	struct svcxprt_rdma *rdma =
-		container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
-	if (rqstp->rq_xprt_ctxt) {
-		BUG_ON(rqstp->rq_xprt_ctxt != rdma);
-		err = svc_rdma_post_recv(rdma);
-		if (err)
-			dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
-				err);
-	}
-	rqstp->rq_xprt_ctxt = NULL;
 }
 
 /*
- * When connected, an svc_xprt has at least three references:
- *
- * - A reference held by the QP. We still hold that here because this
- *   code deletes the QP and puts the reference.
+ * When connected, an svc_xprt has at least two references:
  *
  * - A reference held by the cm_id between the ESTABLISHED and
  *   DISCONNECTED events. If the remote peer disconnected first, this
@@ -946,7 +960,7 @@
  * - A reference held by the svc_recv code that called this function
  *   as part of close processing.
  *
- * At a minimum two references should still be held.
+ * At a minimum one reference should still be held.
  */
 static void svc_rdma_detach(struct svc_xprt *xprt)
 {
@@ -956,23 +970,53 @@
 
 	/* Disconnect and flush posted WQE */
 	rdma_disconnect(rdma->sc_cm_id);
-
-	/* Destroy the QP if present (not a listener) */
-	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
-		ib_destroy_qp(rdma->sc_qp);
-		svc_xprt_put(xprt);
-	}
-
-	/* Destroy the CM ID */
-	rdma_destroy_id(rdma->sc_cm_id);
 }
 
-static void svc_rdma_free(struct svc_xprt *xprt)
+static void __svc_rdma_free(struct work_struct *work)
 {
-	struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt;
+	struct svcxprt_rdma *rdma =
+		container_of(work, struct svcxprt_rdma, sc_work);
 	dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+
 	/* We should only be called from kref_put */
-	BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0);
+	BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
+
+	/*
+	 * Destroy queued, but not processed read completions. Note
+	 * that this cleanup has to be done before destroying the
+	 * cm_id because the device ptr is needed to unmap the dma in
+	 * svc_rdma_put_context.
+	 */
+	spin_lock_bh(&rdma->sc_read_complete_lock);
+	while (!list_empty(&rdma->sc_read_complete_q)) {
+		struct svc_rdma_op_ctxt *ctxt;
+		ctxt = list_entry(rdma->sc_read_complete_q.next,
+				  struct svc_rdma_op_ctxt,
+				  dto_q);
+		list_del_init(&ctxt->dto_q);
+		svc_rdma_put_context(ctxt, 1);
+	}
+	spin_unlock_bh(&rdma->sc_read_complete_lock);
+
+	/* Destroy queued, but not processed recv completions */
+	spin_lock_bh(&rdma->sc_rq_dto_lock);
+	while (!list_empty(&rdma->sc_rq_dto_q)) {
+		struct svc_rdma_op_ctxt *ctxt;
+		ctxt = list_entry(rdma->sc_rq_dto_q.next,
+				  struct svc_rdma_op_ctxt,
+				  dto_q);
+		list_del_init(&ctxt->dto_q);
+		svc_rdma_put_context(ctxt, 1);
+	}
+	spin_unlock_bh(&rdma->sc_rq_dto_lock);
+
+	/* Warn if we leaked a context or dropped one reference too many */
+	WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
+
+	/* Destroy the QP if present (not a listener) */
+	if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
+		ib_destroy_qp(rdma->sc_qp);
+
 	if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
 		ib_destroy_cq(rdma->sc_sq_cq);
 
@@ -985,10 +1029,21 @@
 	if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
 		ib_dealloc_pd(rdma->sc_pd);
 
-	destroy_context_cache(rdma->sc_ctxt_head);
+	/* Destroy the CM ID */
+	rdma_destroy_id(rdma->sc_cm_id);
+
+	destroy_context_cache(rdma);
 	kfree(rdma);
 }
 
+static void svc_rdma_free(struct svc_xprt *xprt)
+{
+	struct svcxprt_rdma *rdma =
+		container_of(xprt, struct svcxprt_rdma, sc_xprt);
+	INIT_WORK(&rdma->sc_work, __svc_rdma_free);
+	schedule_work(&rdma->sc_work);
+}
+
 static int svc_rdma_has_wspace(struct svc_xprt *xprt)
 {
 	struct svcxprt_rdma *rdma =
@@ -1018,7 +1073,7 @@
 	int ret;
 
 	if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
-		return 0;
+		return -ENOTCONN;
 
 	BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
 	BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
@@ -1029,7 +1084,8 @@
 		if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
 			spin_unlock_bh(&xprt->sc_lock);
 			atomic_inc(&rdma_stat_sq_starve);
-			/* See if we can reap some SQ WR */
+
+			/* See if we can opportunistically reap SQ WR to make room */
 			sq_cq_reap(xprt);
 
 			/* Wait until SQ WR available if SQ still full */
@@ -1041,22 +1097,25 @@
 			continue;
 		}
 		/* Bumped used SQ WR count and post */
+		svc_xprt_get(&xprt->sc_xprt);
 		ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
 		if (!ret)
 			atomic_inc(&xprt->sc_sq_count);
-		else
+		else {
+			svc_xprt_put(&xprt->sc_xprt);
 			dprintk("svcrdma: failed to post SQ WR rc=%d, "
 			       "sc_sq_count=%d, sc_sq_depth=%d\n",
 			       ret, atomic_read(&xprt->sc_sq_count),
 			       xprt->sc_sq_depth);
+		}
 		spin_unlock_bh(&xprt->sc_lock);
 		break;
 	}
 	return ret;
 }
 
-int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
-			enum rpcrdma_errcode err)
+void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
+			 enum rpcrdma_errcode err)
 {
 	struct ib_send_wr err_wr;
 	struct ib_sge sge;
@@ -1094,9 +1153,8 @@
 	/* Post It */
 	ret = svc_rdma_send(xprt, &err_wr);
 	if (ret) {
-		dprintk("svcrdma: Error posting send = %d\n", ret);
+		dprintk("svcrdma: Error %d posting send for protocol error\n",
+			ret);
 		svc_rdma_put_context(ctxt, 1);
 	}
-
-	return ret;
 }
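
Taken together, the svc_rdma_send()/sq_cq_reap() hunks establish a
per-WR reference discipline: every work request posted to the hardware
pins the transport, and the pin is dropped either on a failed post or
when the completion is reaped, so __svc_rdma_free() cannot destroy the
QP underneath an outstanding completion. The same pattern guards
ib_post_recv() in svc_rdma_post_recv(). In sketch form:

	/* Post side (svc_rdma_send): one reference per posted WR. */
	svc_xprt_get(&xprt->sc_xprt);
	ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
	if (ret)
		svc_xprt_put(&xprt->sc_xprt);	/* never posted: unpin */

	/* Completion side (sq_cq_reap): one put per reaped WC. */
	while (ib_poll_cq(cq, 1, &wc) > 0) {
		/* ...process the completion... */
		svc_xprt_put(&xprt->sc_xprt);
	}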