Merge branch 'flexfiles'

* flexfiles: (53 commits)
  pnfs: lookup new lseg at lseg boundary
  nfs41: .init_read and .init_write can be called with valid pg_lseg
  pnfs: Update documentation on the Layout Drivers
  pnfs/flexfiles: Add the FlexFile Layout Driver
  nfs: count DIO good bytes correctly with mirroring
  nfs41: wait for LAYOUTRETURN before retrying LAYOUTGET
  nfs: add a helper to set NFS_ODIRECT_RESCHED_WRITES to direct writes
  nfs41: add NFS_LAYOUT_RETRY_LAYOUTGET to layout header flags
  nfs/flexfiles: send layoutreturn before freeing lseg
  nfs41: introduce NFS_LAYOUT_RETURN_BEFORE_CLOSE
  nfs41: allow async version layoutreturn
  nfs41: add range to layoutreturn args
  pnfs: allow LD to ask to resend read through pnfs
  nfs: add nfs_pgio_current_mirror helper
  nfs: only reset desc->pg_mirror_idx when mirroring is supported
  nfs41: add a debug warning if we destroy an unempty layout
  pnfs: fail comparison when bucket verifier not set
  nfs: mirroring support for direct io
  nfs: add mirroring support to pgio layer
  pnfs: pass ds_commit_idx through the commit path
  ...

Conflicts:
	fs/nfs/pnfs.c
	fs/nfs/pnfs.h
diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
index b8fb3a4..351be920 100644
--- a/fs/nfs/callback.c
+++ b/fs/nfs/callback.c
@@ -128,22 +128,24 @@
 		if (try_to_freeze())
 			continue;
 
-		prepare_to_wait(&serv->sv_cb_waitq, &wq, TASK_INTERRUPTIBLE);
+		prepare_to_wait(&serv->sv_cb_waitq, &wq, TASK_UNINTERRUPTIBLE);
 		spin_lock_bh(&serv->sv_cb_lock);
 		if (!list_empty(&serv->sv_cb_list)) {
 			req = list_first_entry(&serv->sv_cb_list,
 					struct rpc_rqst, rq_bc_list);
 			list_del(&req->rq_bc_list);
 			spin_unlock_bh(&serv->sv_cb_lock);
+			finish_wait(&serv->sv_cb_waitq, &wq);
 			dprintk("Invoking bc_svc_process()\n");
 			error = bc_svc_process(serv, req, rqstp);
 			dprintk("bc_svc_process() returned w/ error code= %d\n",
 				error);
 		} else {
 			spin_unlock_bh(&serv->sv_cb_lock);
-			schedule();
+			/* schedule_timeout to game the hung task watchdog */
+			schedule_timeout(60 * HZ);
+			finish_wait(&serv->sv_cb_waitq, &wq);
 		}
-		finish_wait(&serv->sv_cb_waitq, &wq);
 	}
 	return 0;
 }
diff --git a/fs/nfs/delegation.c b/fs/nfs/delegation.c
index 7f3f606..16b754e 100644
--- a/fs/nfs/delegation.c
+++ b/fs/nfs/delegation.c
@@ -301,6 +301,17 @@
 	return nfs_detach_delegation(nfsi, delegation, server);
 }
 
+static void
+nfs_update_inplace_delegation(struct nfs_delegation *delegation,
+		const struct nfs_delegation *update)
+{
+	if (nfs4_stateid_is_newer(&update->stateid, &delegation->stateid)) {
+		delegation->stateid.seqid = update->stateid.seqid;
+		smp_wmb();
+		delegation->type = update->type;
+	}
+}
+
 /**
  * nfs_inode_set_delegation - set up a delegation on an inode
  * @inode: inode to which delegation applies
@@ -334,9 +345,12 @@
 	old_delegation = rcu_dereference_protected(nfsi->delegation,
 					lockdep_is_held(&clp->cl_lock));
 	if (old_delegation != NULL) {
-		if (nfs4_stateid_match(&delegation->stateid,
-					&old_delegation->stateid) &&
-				delegation->type == old_delegation->type) {
+		/* Is this an update of the existing delegation? */
+		if (nfs4_stateid_match_other(&old_delegation->stateid,
+					&delegation->stateid)) {
+			nfs_update_inplace_delegation(old_delegation,
+					delegation);
+			nfsi->delegation_state = old_delegation->type;
 			goto out;
 		}
 		/*
diff --git a/fs/nfs/direct.c b/fs/nfs/direct.c
index 3715b49..7077521 100644
--- a/fs/nfs/direct.c
+++ b/fs/nfs/direct.c
@@ -253,6 +253,12 @@
  */
 ssize_t nfs_direct_IO(int rw, struct kiocb *iocb, struct iov_iter *iter, loff_t pos)
 {
+	struct inode *inode = iocb->ki_filp->f_mapping->host;
+
+	/* Only swap files may use nfs_direct_IO; return 0 for anything else */
+	if (!IS_SWAPFILE(inode))
+		return 0;
+
 #ifndef CONFIG_NFS_SWAP
 	dprintk("NFS: nfs_direct_IO (%pD) off/no(%Ld/%lu) EINVAL\n",
 			iocb->ki_filp, (long long) pos, iter->nr_segs);
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 4bffe63..d2398c1 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -352,8 +352,9 @@
 
 	nfs_attr_check_mountpoint(sb, fattr);
 
-	if (((fattr->valid & NFS_ATTR_FATTR_FILEID) == 0) &&
-	    !nfs_attr_use_mounted_on_fileid(fattr))
+	if (nfs_attr_use_mounted_on_fileid(fattr))
+		fattr->fileid = fattr->mounted_on_fileid;
+	else if ((fattr->valid & NFS_ATTR_FATTR_FILEID) == 0)
 		goto out_no_inode;
 	if ((fattr->valid & NFS_ATTR_FATTR_TYPE) == 0)
 		goto out_no_inode;
@@ -506,10 +507,15 @@
 		attr->ia_valid &= ~ATTR_MODE;
 
 	if (attr->ia_valid & ATTR_SIZE) {
+		loff_t i_size;
+
 		BUG_ON(!S_ISREG(inode->i_mode));
 
-		if (attr->ia_size == i_size_read(inode))
+		i_size = i_size_read(inode);
+		if (attr->ia_size == i_size)
 			attr->ia_valid &= ~ATTR_SIZE;
+		else if (attr->ia_size < i_size && IS_SWAPFILE(inode))
+			return -ETXTBSY;
 	}
 
 	/* Optimization: if the end result is no change, don't RPC */
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index 44e8496..a98cf20 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -32,8 +32,6 @@
 	    (((fattr->valid & NFS_ATTR_FATTR_MOUNTPOINT) == 0) &&
 	     ((fattr->valid & NFS_ATTR_FATTR_V4_REFERRAL) == 0)))
 		return 0;
-
-	fattr->fileid = fattr->mounted_on_fileid;
 	return 1;
 }
 
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index b3c771e..fdef424 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -44,6 +44,7 @@
 #define NFS4_RENEW_TIMEOUT		0x01
 #define NFS4_RENEW_DELEGATION_CB	0x02
 
+struct nfs_seqid_counter;
 struct nfs4_minor_version_ops {
 	u32	minor_version;
 	unsigned init_caps;
@@ -56,6 +57,8 @@
 			struct nfs_fsinfo *);
 	void	(*free_lock_state)(struct nfs_server *,
 			struct nfs4_lock_state *);
+	struct nfs_seqid *
+		(*alloc_seqid)(struct nfs_seqid_counter *, gfp_t);
 	const struct rpc_call_ops *call_sync_ops;
 	const struct nfs4_state_recovery_ops *reboot_recovery_ops;
 	const struct nfs4_state_recovery_ops *nograce_recovery_ops;
diff --git a/fs/nfs/nfs4client.c b/fs/nfs/nfs4client.c
index 102d967..8646af9 100644
--- a/fs/nfs/nfs4client.c
+++ b/fs/nfs/nfs4client.c
@@ -639,7 +639,7 @@
 			prev = pos;
 
 			status = nfs_wait_client_init_complete(pos);
-			if (status == 0) {
+			if (pos->cl_cons_state == NFS_CS_SESSION_INITING) {
 				nfs4_schedule_lease_recovery(pos);
 				status = nfs4_wait_clnt_recover(pos);
 			}
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index ca6dda0..6e1c9b2 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -980,6 +980,7 @@
 	struct dentry *parent = dget_parent(dentry);
 	struct inode *dir = parent->d_inode;
 	struct nfs_server *server = NFS_SERVER(dir);
+	struct nfs_seqid *(*alloc_seqid)(struct nfs_seqid_counter *, gfp_t);
 	struct nfs4_opendata *p;
 
 	p = kzalloc(sizeof(*p), gfp_mask);
@@ -990,8 +991,9 @@
 	if (IS_ERR(p->f_label))
 		goto err_free_p;
 
-	p->o_arg.seqid = nfs_alloc_seqid(&sp->so_seqid, gfp_mask);
-	if (p->o_arg.seqid == NULL)
+	alloc_seqid = server->nfs_client->cl_mvops->alloc_seqid;
+	p->o_arg.seqid = alloc_seqid(&sp->so_seqid, gfp_mask);
+	if (IS_ERR(p->o_arg.seqid))
 		goto err_free_label;
 	nfs_sb_active(dentry->d_sb);
 	p->dentry = dget(dentry);
@@ -1170,6 +1172,16 @@
 	return false;
 }
 
+static void nfs_resync_open_stateid_locked(struct nfs4_state *state)
+{
+	if (state->n_wronly)
+		set_bit(NFS_O_WRONLY_STATE, &state->flags);
+	if (state->n_rdonly)
+		set_bit(NFS_O_RDONLY_STATE, &state->flags);
+	if (state->n_rdwr)
+		set_bit(NFS_O_RDWR_STATE, &state->flags);
+}
+
 static void nfs_clear_open_stateid_locked(struct nfs4_state *state,
 		nfs4_stateid *stateid, fmode_t fmode)
 {
@@ -1188,8 +1200,12 @@
 	}
 	if (stateid == NULL)
 		return;
-	if (!nfs_need_update_open_stateid(state, stateid))
+	/* Handle races with OPEN */
+	if (!nfs4_stateid_match_other(stateid, &state->open_stateid) ||
+	    !nfs4_stateid_is_newer(stateid, &state->open_stateid)) {
+		nfs_resync_open_stateid_locked(state);
 		return;
+	}
 	if (test_bit(NFS_DELEGATED_STATE, &state->flags) == 0)
 		nfs4_stateid_copy(&state->stateid, stateid);
 	nfs4_stateid_copy(&state->open_stateid, stateid);
@@ -1284,6 +1300,23 @@
 	return ret;
 }
 
+static bool nfs4_update_lock_stateid(struct nfs4_lock_state *lsp,
+		const nfs4_stateid *stateid)
+{
+	struct nfs4_state *state = lsp->ls_state;
+	bool ret = false;
+
+	spin_lock(&state->state_lock);
+	if (!nfs4_stateid_match_other(stateid, &lsp->ls_stateid))
+		goto out_noupdate;
+	if (!nfs4_stateid_is_newer(stateid, &lsp->ls_stateid))
+		goto out_noupdate;
+	nfs4_stateid_copy(&lsp->ls_stateid, stateid);
+	ret = true;
+out_noupdate:
+	spin_unlock(&state->state_lock);
+	return ret;
+}
 
 static void nfs4_return_incompatible_delegation(struct inode *inode, fmode_t fmode)
 {
@@ -2590,6 +2623,11 @@
 		case -NFS4ERR_OLD_STATEID:
 		case -NFS4ERR_BAD_STATEID:
 		case -NFS4ERR_EXPIRED:
+			if (!nfs4_stateid_match(&calldata->arg.stateid,
+						&state->stateid)) {
+				rpc_restart_call_prepare(task);
+				goto out_release;
+			}
 			if (calldata->arg.fmode == 0)
 				break;
 		default:
@@ -2622,6 +2660,7 @@
 	is_rdwr = test_bit(NFS_O_RDWR_STATE, &state->flags);
 	is_rdonly = test_bit(NFS_O_RDONLY_STATE, &state->flags);
 	is_wronly = test_bit(NFS_O_WRONLY_STATE, &state->flags);
+	nfs4_stateid_copy(&calldata->arg.stateid, &state->stateid);
 	/* Calculate the change in open mode */
 	calldata->arg.fmode = 0;
 	if (state->n_rdwr == 0) {
@@ -2678,45 +2717,10 @@
 	.rpc_release = nfs4_free_closedata,
 };
 
-static bool nfs4_state_has_opener(struct nfs4_state *state)
-{
-	/* first check existing openers */
-	if (test_bit(NFS_O_RDONLY_STATE, &state->flags) != 0 &&
-	    state->n_rdonly != 0)
-		return true;
-
-	if (test_bit(NFS_O_WRONLY_STATE, &state->flags) != 0 &&
-	    state->n_wronly != 0)
-		return true;
-
-	if (test_bit(NFS_O_RDWR_STATE, &state->flags) != 0 &&
-	    state->n_rdwr != 0)
-		return true;
-
-	return false;
-}
-
 static bool nfs4_roc(struct inode *inode)
 {
-	struct nfs_inode *nfsi = NFS_I(inode);
-	struct nfs_open_context *ctx;
-	struct nfs4_state *state;
-
-	spin_lock(&inode->i_lock);
-	list_for_each_entry(ctx, &nfsi->open_files, list) {
-		state = ctx->state;
-		if (state == NULL)
-			continue;
-		if (nfs4_state_has_opener(state)) {
-			spin_unlock(&inode->i_lock);
-			return false;
-		}
-	}
-	spin_unlock(&inode->i_lock);
-
-	if (nfs4_check_delegation(inode, FMODE_READ))
+	if (!nfs_have_layout(inode))
 		return false;
-
 	return pnfs_roc(inode);
 }
 
@@ -2734,6 +2738,7 @@
 int nfs4_do_close(struct nfs4_state *state, gfp_t gfp_mask, int wait)
 {
 	struct nfs_server *server = NFS_SERVER(state->inode);
+	struct nfs_seqid *(*alloc_seqid)(struct nfs_seqid_counter *, gfp_t);
 	struct nfs4_closedata *calldata;
 	struct nfs4_state_owner *sp = state->owner;
 	struct rpc_task *task;
@@ -2760,10 +2765,10 @@
 	calldata->inode = state->inode;
 	calldata->state = state;
 	calldata->arg.fh = NFS_FH(state->inode);
-	calldata->arg.stateid = &state->open_stateid;
 	/* Serialization for the sequence id */
-	calldata->arg.seqid = nfs_alloc_seqid(&state->owner->so_seqid, gfp_mask);
-	if (calldata->arg.seqid == NULL)
+	alloc_seqid = server->nfs_client->cl_mvops->alloc_seqid;
+	calldata->arg.seqid = alloc_seqid(&state->owner->so_seqid, gfp_mask);
+	if (IS_ERR(calldata->arg.seqid))
 		goto out_free_calldata;
 	calldata->arg.fmode = 0;
 	calldata->arg.bitmask = server->cache_consistency_bitmask;
@@ -5356,7 +5361,6 @@
 	p->arg.fl = &p->fl;
 	p->arg.seqid = seqid;
 	p->res.seqid = seqid;
-	p->arg.stateid = &lsp->ls_stateid;
 	p->lsp = lsp;
 	atomic_inc(&lsp->ls_count);
 	/* Ensure we don't close file until we're done freeing locks! */
@@ -5383,14 +5387,18 @@
 		return;
 	switch (task->tk_status) {
 		case 0:
-			nfs4_stateid_copy(&calldata->lsp->ls_stateid,
-					&calldata->res.stateid);
 			renew_lease(calldata->server, calldata->timestamp);
-			break;
+			do_vfs_lock(calldata->fl.fl_file, &calldata->fl);
+			if (nfs4_update_lock_stateid(calldata->lsp,
+					&calldata->res.stateid))
+				break;
 		case -NFS4ERR_BAD_STATEID:
 		case -NFS4ERR_OLD_STATEID:
 		case -NFS4ERR_STALE_STATEID:
 		case -NFS4ERR_EXPIRED:
+			if (!nfs4_stateid_match(&calldata->arg.stateid,
+						&calldata->lsp->ls_stateid))
+				rpc_restart_call_prepare(task);
 			break;
 		default:
 			if (nfs4_async_handle_error(task, calldata->server,
@@ -5406,6 +5414,7 @@
 
 	if (nfs_wait_on_sequence(calldata->arg.seqid, task) != 0)
 		goto out_wait;
+	nfs4_stateid_copy(&calldata->arg.stateid, &calldata->lsp->ls_stateid);
 	if (test_bit(NFS_LOCK_INITIALIZED, &calldata->lsp->ls_flags) == 0) {
 		/* Note: exit _without_ running nfs4_locku_done */
 		goto out_no_action;
@@ -5476,6 +5485,7 @@
 	struct nfs_seqid *seqid;
 	struct nfs4_lock_state *lsp;
 	struct rpc_task *task;
+	struct nfs_seqid *(*alloc_seqid)(struct nfs_seqid_counter *, gfp_t);
 	int status = 0;
 	unsigned char fl_flags = request->fl_flags;
 
@@ -5499,9 +5509,10 @@
 	lsp = request->fl_u.nfs4_fl.owner;
 	if (test_bit(NFS_LOCK_INITIALIZED, &lsp->ls_flags) == 0)
 		goto out;
-	seqid = nfs_alloc_seqid(&lsp->ls_seqid, GFP_KERNEL);
+	alloc_seqid = NFS_SERVER(inode)->nfs_client->cl_mvops->alloc_seqid;
+	seqid = alloc_seqid(&lsp->ls_seqid, GFP_KERNEL);
 	status = -ENOMEM;
-	if (seqid == NULL)
+	if (IS_ERR(seqid))
 		goto out;
 	task = nfs4_do_unlck(request, nfs_file_open_context(request->fl_file), lsp, seqid);
 	status = PTR_ERR(task);
@@ -5534,6 +5545,7 @@
 	struct nfs4_lockdata *p;
 	struct inode *inode = lsp->ls_state->inode;
 	struct nfs_server *server = NFS_SERVER(inode);
+	struct nfs_seqid *(*alloc_seqid)(struct nfs_seqid_counter *, gfp_t);
 
 	p = kzalloc(sizeof(*p), gfp_mask);
 	if (p == NULL)
@@ -5542,12 +5554,12 @@
 	p->arg.fh = NFS_FH(inode);
 	p->arg.fl = &p->fl;
 	p->arg.open_seqid = nfs_alloc_seqid(&lsp->ls_state->owner->so_seqid, gfp_mask);
-	if (p->arg.open_seqid == NULL)
+	if (IS_ERR(p->arg.open_seqid))
 		goto out_free;
-	p->arg.lock_seqid = nfs_alloc_seqid(&lsp->ls_seqid, gfp_mask);
-	if (p->arg.lock_seqid == NULL)
+	alloc_seqid = server->nfs_client->cl_mvops->alloc_seqid;
+	p->arg.lock_seqid = alloc_seqid(&lsp->ls_seqid, gfp_mask);
+	if (IS_ERR(p->arg.lock_seqid))
 		goto out_free_seqid;
-	p->arg.lock_stateid = &lsp->ls_stateid;
 	p->arg.lock_owner.clientid = server->nfs_client->cl_clientid;
 	p->arg.lock_owner.id = lsp->ls_seqid.owner_id;
 	p->arg.lock_owner.s_dev = server->s_dev;
@@ -5574,15 +5586,19 @@
 	if (nfs_wait_on_sequence(data->arg.lock_seqid, task) != 0)
 		goto out_wait;
 	/* Do we need to do an open_to_lock_owner? */
-	if (!(data->arg.lock_seqid->sequence->flags & NFS_SEQID_CONFIRMED)) {
+	if (!test_bit(NFS_LOCK_INITIALIZED, &data->lsp->ls_flags)) {
 		if (nfs_wait_on_sequence(data->arg.open_seqid, task) != 0) {
 			goto out_release_lock_seqid;
 		}
-		data->arg.open_stateid = &state->open_stateid;
+		nfs4_stateid_copy(&data->arg.open_stateid,
+				&state->open_stateid);
 		data->arg.new_lock_owner = 1;
 		data->res.open_seqid = data->arg.open_seqid;
-	} else
+	} else {
 		data->arg.new_lock_owner = 0;
+		nfs4_stateid_copy(&data->arg.lock_stateid,
+				&data->lsp->ls_stateid);
+	}
 	if (!nfs4_valid_open_stateid(state)) {
 		data->rpc_status = -EBADF;
 		task->tk_action = NULL;
@@ -5606,6 +5622,7 @@
 static void nfs4_lock_done(struct rpc_task *task, void *calldata)
 {
 	struct nfs4_lockdata *data = calldata;
+	struct nfs4_lock_state *lsp = data->lsp;
 
 	dprintk("%s: begin!\n", __func__);
 
@@ -5613,18 +5630,36 @@
 		return;
 
 	data->rpc_status = task->tk_status;
-	if (data->arg.new_lock_owner != 0) {
-		if (data->rpc_status == 0)
-			nfs_confirm_seqid(&data->lsp->ls_seqid, 0);
-		else
-			goto out;
+	switch (task->tk_status) {
+	case 0:
+		renew_lease(NFS_SERVER(data->ctx->dentry->d_inode),
+				data->timestamp);
+		if (data->arg.new_lock) {
+			data->fl.fl_flags &= ~(FL_SLEEP | FL_ACCESS);
+			if (do_vfs_lock(data->fl.fl_file, &data->fl) < 0) {
+				rpc_restart_call_prepare(task);
+				break;
+			}
+		}
+		if (data->arg.new_lock_owner != 0) {
+			nfs_confirm_seqid(&lsp->ls_seqid, 0);
+			nfs4_stateid_copy(&lsp->ls_stateid, &data->res.stateid);
+			set_bit(NFS_LOCK_INITIALIZED, &lsp->ls_flags);
+		} else if (!nfs4_update_lock_stateid(lsp, &data->res.stateid))
+			rpc_restart_call_prepare(task);
+		break;
+	case -NFS4ERR_BAD_STATEID:
+	case -NFS4ERR_OLD_STATEID:
+	case -NFS4ERR_STALE_STATEID:
+	case -NFS4ERR_EXPIRED:
+		if (data->arg.new_lock_owner != 0) {
+			if (!nfs4_stateid_match(&data->arg.open_stateid,
+						&lsp->ls_state->open_stateid))
+				rpc_restart_call_prepare(task);
+		} else if (!nfs4_stateid_match(&data->arg.lock_stateid,
+						&lsp->ls_stateid))
+				rpc_restart_call_prepare(task);
 	}
-	if (data->rpc_status == 0) {
-		nfs4_stateid_copy(&data->lsp->ls_stateid, &data->res.stateid);
-		set_bit(NFS_LOCK_INITIALIZED, &data->lsp->ls_flags);
-		renew_lease(NFS_SERVER(data->ctx->dentry->d_inode), data->timestamp);
-	}
-out:
 	dprintk("%s: done, ret = %d!\n", __func__, data->rpc_status);
 }
 
@@ -5705,7 +5740,8 @@
 		if (recovery_type == NFS_LOCK_RECLAIM)
 			data->arg.reclaim = NFS_LOCK_RECLAIM;
 		nfs4_set_sequence_privileged(&data->arg.seq_args);
-	}
+	} else
+		data->arg.new_lock = 1;
 	task = rpc_run_task(&task_setup_data);
 	if (IS_ERR(task))
 		return PTR_ERR(task);
@@ -5829,10 +5865,8 @@
 
 static int _nfs4_proc_setlk(struct nfs4_state *state, int cmd, struct file_lock *request)
 {
-	struct nfs4_state_owner *sp = state->owner;
 	struct nfs_inode *nfsi = NFS_I(state->inode);
 	unsigned char fl_flags = request->fl_flags;
-	unsigned int seq;
 	int status = -ENOLCK;
 
 	if ((fl_flags & FL_POSIX) &&
@@ -5852,25 +5886,11 @@
 		/* ...but avoid races with delegation recall... */
 		request->fl_flags = fl_flags & ~FL_SLEEP;
 		status = do_vfs_lock(request->fl_file, request);
-		goto out_unlock;
+		up_read(&nfsi->rwsem);
+		goto out;
 	}
-	seq = raw_seqcount_begin(&sp->so_reclaim_seqcount);
 	up_read(&nfsi->rwsem);
 	status = _nfs4_do_setlk(state, cmd, request, NFS_LOCK_NEW);
-	if (status != 0)
-		goto out;
-	down_read(&nfsi->rwsem);
-	if (read_seqcount_retry(&sp->so_reclaim_seqcount, seq)) {
-		status = -NFS4ERR_DELAY;
-		goto out_unlock;
-	}
-	/* Note: we always want to sleep here! */
-	request->fl_flags = fl_flags | FL_SLEEP;
-	if (do_vfs_lock(request->fl_file, request) < 0)
-		printk(KERN_WARNING "NFS: %s: VFS is out of sync with lock "
-			"manager!\n", __func__);
-out_unlock:
-	up_read(&nfsi->rwsem);
 out:
 	request->fl_flags = fl_flags;
 	return status;
@@ -8409,6 +8429,7 @@
 	.match_stateid = nfs4_match_stateid,
 	.find_root_sec = nfs4_find_root_sec,
 	.free_lock_state = nfs4_release_lockowner,
+	.alloc_seqid = nfs_alloc_seqid,
 	.call_sync_ops = &nfs40_call_sync_ops,
 	.reboot_recovery_ops = &nfs40_reboot_recovery_ops,
 	.nograce_recovery_ops = &nfs40_nograce_recovery_ops,
@@ -8417,6 +8438,12 @@
 };
 
 #if defined(CONFIG_NFS_V4_1)
+static struct nfs_seqid *
+nfs_alloc_no_seqid(struct nfs_seqid_counter *arg1, gfp_t arg2)
+{
+	return NULL;
+}
+
 static const struct nfs4_minor_version_ops nfs_v4_1_minor_ops = {
 	.minor_version = 1,
 	.init_caps = NFS_CAP_READDIRPLUS
@@ -8430,6 +8457,7 @@
 	.match_stateid = nfs41_match_stateid,
 	.find_root_sec = nfs41_find_root_sec,
 	.free_lock_state = nfs41_free_lock_state,
+	.alloc_seqid = nfs_alloc_no_seqid,
 	.call_sync_ops = &nfs41_call_sync_ops,
 	.reboot_recovery_ops = &nfs41_reboot_recovery_ops,
 	.nograce_recovery_ops = &nfs41_nograce_recovery_ops,
@@ -8456,6 +8484,7 @@
 	.find_root_sec = nfs41_find_root_sec,
 	.free_lock_state = nfs41_free_lock_state,
 	.call_sync_ops = &nfs41_call_sync_ops,
+	.alloc_seqid = nfs_alloc_no_seqid,
 	.reboot_recovery_ops = &nfs41_reboot_recovery_ops,
 	.nograce_recovery_ops = &nfs41_nograce_recovery_ops,
 	.state_renewal_ops = &nfs41_state_renewal_ops,
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index 5194933..590f096 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -1003,11 +1003,11 @@
 	struct nfs_seqid *new;
 
 	new = kmalloc(sizeof(*new), gfp_mask);
-	if (new != NULL) {
-		new->sequence = counter;
-		INIT_LIST_HEAD(&new->list);
-		new->task = NULL;
-	}
+	if (new == NULL)
+		return ERR_PTR(-ENOMEM);
+	new->sequence = counter;
+	INIT_LIST_HEAD(&new->list);
+	new->task = NULL;
 	return new;
 }
 
@@ -1015,7 +1015,7 @@
 {
 	struct nfs_seqid_counter *sequence;
 
-	if (list_empty(&seqid->list))
+	if (seqid == NULL || list_empty(&seqid->list))
 		return;
 	sequence = seqid->sequence;
 	spin_lock(&sequence->lock);
@@ -1071,13 +1071,15 @@
 
 void nfs_increment_open_seqid(int status, struct nfs_seqid *seqid)
 {
-	struct nfs4_state_owner *sp = container_of(seqid->sequence,
-					struct nfs4_state_owner, so_seqid);
-	struct nfs_server *server = sp->so_server;
+	struct nfs4_state_owner *sp;
 
+	if (seqid == NULL)
+		return;
+
+	sp = container_of(seqid->sequence, struct nfs4_state_owner, so_seqid);
 	if (status == -NFS4ERR_BAD_SEQID)
 		nfs4_drop_state_owner(sp);
-	if (!nfs4_has_session(server->nfs_client))
+	if (!nfs4_has_session(sp->so_server->nfs_client))
 		nfs_increment_seqid(status, seqid);
 }
 
@@ -1088,14 +1090,18 @@
  */
 void nfs_increment_lock_seqid(int status, struct nfs_seqid *seqid)
 {
-	nfs_increment_seqid(status, seqid);
+	if (seqid != NULL)
+		nfs_increment_seqid(status, seqid);
 }
 
 int nfs_wait_on_sequence(struct nfs_seqid *seqid, struct rpc_task *task)
 {
-	struct nfs_seqid_counter *sequence = seqid->sequence;
+	struct nfs_seqid_counter *sequence;
 	int status = 0;
 
+	if (seqid == NULL)
+		goto out;
+	sequence = seqid->sequence;
 	spin_lock(&sequence->lock);
 	seqid->task = task;
 	if (list_empty(&seqid->list))
@@ -1106,6 +1112,7 @@
 	status = -EAGAIN;
 unlock:
 	spin_unlock(&sequence->lock);
+out:
 	return status;
 }
 
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 56d4c91..a2329d6 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -946,7 +946,10 @@
 static void encode_nfs4_seqid(struct xdr_stream *xdr,
 		const struct nfs_seqid *seqid)
 {
-	encode_uint32(xdr, seqid->sequence->counter);
+	if (seqid != NULL)
+		encode_uint32(xdr, seqid->sequence->counter);
+	else
+		encode_uint32(xdr, 0);
 }
 
 static void encode_compound_hdr(struct xdr_stream *xdr,
@@ -1125,7 +1128,7 @@
 {
 	encode_op_hdr(xdr, OP_CLOSE, decode_close_maxsz, hdr);
 	encode_nfs4_seqid(xdr, arg->seqid);
-	encode_nfs4_stateid(xdr, arg->stateid);
+	encode_nfs4_stateid(xdr, &arg->stateid);
 }
 
 static void encode_commit(struct xdr_stream *xdr, const struct nfs_commitargs *args, struct compound_hdr *hdr)
@@ -1301,12 +1304,12 @@
 	*p = cpu_to_be32(args->new_lock_owner);
 	if (args->new_lock_owner){
 		encode_nfs4_seqid(xdr, args->open_seqid);
-		encode_nfs4_stateid(xdr, args->open_stateid);
+		encode_nfs4_stateid(xdr, &args->open_stateid);
 		encode_nfs4_seqid(xdr, args->lock_seqid);
 		encode_lockowner(xdr, &args->lock_owner);
 	}
 	else {
-		encode_nfs4_stateid(xdr, args->lock_stateid);
+		encode_nfs4_stateid(xdr, &args->lock_stateid);
 		encode_nfs4_seqid(xdr, args->lock_seqid);
 	}
 }
@@ -1330,7 +1333,7 @@
 	encode_op_hdr(xdr, OP_LOCKU, decode_locku_maxsz, hdr);
 	encode_uint32(xdr, nfs4_lock_type(args->fl, 0));
 	encode_nfs4_seqid(xdr, args->seqid);
-	encode_nfs4_stateid(xdr, args->stateid);
+	encode_nfs4_stateid(xdr, &args->stateid);
 	p = reserve_space(xdr, 16);
 	p = xdr_encode_hyper(p, args->fl->fl_start);
 	xdr_encode_hyper(p, nfs4_lock_length(args->fl));
@@ -1530,7 +1533,7 @@
 static void encode_open_downgrade(struct xdr_stream *xdr, const struct nfs_closeargs *arg, struct compound_hdr *hdr)
 {
 	encode_op_hdr(xdr, OP_OPEN_DOWNGRADE, decode_open_downgrade_maxsz, hdr);
-	encode_nfs4_stateid(xdr, arg->stateid);
+	encode_nfs4_stateid(xdr, &arg->stateid);
 	encode_nfs4_seqid(xdr, arg->seqid);
 	encode_share_access(xdr, arg->fmode);
 }
@@ -1801,9 +1804,8 @@
 				  struct compound_hdr *hdr)
 {
 	__be32 *p;
-	char machine_name[NFS4_MAX_MACHINE_NAME_LEN];
-	uint32_t len;
 	struct nfs_client *clp = args->client;
+	struct rpc_clnt *clnt = clp->cl_rpcclient;
 	struct nfs_net *nn = net_generic(clp->cl_net, nfs_net_id);
 	u32 max_resp_sz_cached;
 
@@ -1814,11 +1816,8 @@
 	max_resp_sz_cached = (NFS4_dec_open_sz + RPC_REPHDRSIZE +
 			      RPC_MAX_AUTH_SIZE + 2) * XDR_UNIT;
 
-	len = scnprintf(machine_name, sizeof(machine_name), "%s",
-			clp->cl_ipaddr);
-
 	encode_op_hdr(xdr, OP_CREATE_SESSION, decode_create_session_maxsz, hdr);
-	p = reserve_space(xdr, 16 + 2*28 + 20 + len + 12);
+	p = reserve_space(xdr, 16 + 2*28 + 20 + clnt->cl_nodelen + 12);
 	p = xdr_encode_hyper(p, clp->cl_clientid);
 	*p++ = cpu_to_be32(clp->cl_seqid);			/*Sequence id */
 	*p++ = cpu_to_be32(args->flags);			/*flags */
@@ -1847,7 +1846,7 @@
 
 	/* authsys_parms rfc1831 */
 	*p++ = cpu_to_be32(nn->boot_time.tv_nsec);	/* stamp */
-	p = xdr_encode_opaque(p, machine_name, len);
+	p = xdr_encode_array(p, clnt->cl_nodename, clnt->cl_nodelen);
 	*p++ = cpu_to_be32(0);				/* UID */
 	*p++ = cpu_to_be32(0);				/* GID */
 	*p = cpu_to_be32(0);				/* No more gids */
diff --git a/fs/nfs/nfsroot.c b/fs/nfs/nfsroot.c
index cd3c910..9bc9f04 100644
--- a/fs/nfs/nfsroot.c
+++ b/fs/nfs/nfsroot.c
@@ -261,11 +261,11 @@
 	 */
 	len = snprintf(nfs_export_path, sizeof(nfs_export_path),
 				tmp, utsname()->nodename);
-	if (len > (int)sizeof(nfs_export_path))
+	if (len >= (int)sizeof(nfs_export_path))
 		goto out_devnametoolong;
 	len = snprintf(nfs_root_device, sizeof(nfs_root_device),
 				"%pI4:%s", &servaddr, nfs_export_path);
-	if (len > (int)sizeof(nfs_root_device))
+	if (len >= (int)sizeof(nfs_root_device))
 		goto out_devnametoolong;
 
 	retval = 0;
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 9304984..703501d 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -34,6 +34,7 @@
 #include "pnfs.h"
 #include "iostat.h"
 #include "nfs4trace.h"
+#include "delegation.h"
 
 #define NFSDBG_FACILITY		NFSDBG_PNFS
 #define PNFS_LAYOUTGET_RETRY_TIMEOUT (120*HZ)
@@ -1066,6 +1067,9 @@
 
 bool pnfs_roc(struct inode *ino)
 {
+	struct nfs_inode *nfsi = NFS_I(ino);
+	struct nfs_open_context *ctx;
+	struct nfs4_state *state;
 	struct pnfs_layout_hdr *lo;
 	struct pnfs_layout_segment *lseg, *tmp;
 	nfs4_stateid stateid;
@@ -1073,10 +1077,23 @@
 	bool found = false, layoutreturn = false;
 
 	spin_lock(&ino->i_lock);
-	lo = NFS_I(ino)->layout;
+	lo = nfsi->layout;
 	if (!lo || !test_and_clear_bit(NFS_LAYOUT_ROC, &lo->plh_flags) ||
 	    test_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags))
-		goto out_nolayout;
+		goto out_noroc;
+
+	/* Don't return layout if we hold a delegation */
+	if (nfs4_check_delegation(ino, FMODE_READ))
+		goto out_noroc;
+
+	list_for_each_entry(ctx, &nfsi->open_files, list) {
+		state = ctx->state;
+		/* Don't return layout if there is open file state */
+		if (state != NULL && state->state != 0)
+			goto out_noroc;
+	}
+
+		goto out_noroc;
 	pnfs_clear_retry_layoutget(lo);
 	list_for_each_entry_safe(lseg, tmp, &lo->plh_segs, pls_list)
 		if (test_bit(NFS_LSEG_ROC, &lseg->pls_flags)) {
@@ -1084,14 +1101,14 @@
 			found = true;
 		}
 	if (!found)
-		goto out_nolayout;
+		goto out_noroc;
 	lo->plh_block_lgets++;
 	pnfs_get_layout_hdr(lo); /* matched in pnfs_roc_release */
 	spin_unlock(&ino->i_lock);
 	pnfs_free_lseg_list(&tmp_list);
 	return true;
 
-out_nolayout:
+out_noroc:
 	if (lo) {
 		stateid = lo->plh_stateid;
 		layoutreturn =
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 7642021..797cd62 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -345,6 +345,11 @@
 						 struct xdr_stream *xdr,
 						 gfp_t gfp_flags);
 
+static inline bool nfs_have_layout(struct inode *inode)
+{
+	return NFS_I(inode)->layout != NULL;
+}
+
 static inline struct nfs4_deviceid_node *
 nfs4_get_deviceid(struct nfs4_deviceid_node *d)
 {
@@ -514,6 +519,11 @@
 #endif /* NFS_DEBUG */
 #else  /* CONFIG_NFS_V4_1 */
 
+static inline bool nfs_have_layout(struct inode *inode)
+{
+	return false;
+}
+
 static inline void pnfs_destroy_all_layouts(struct nfs_client *clp)
 {
 }
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 3637923..8140112 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -390,7 +390,7 @@
 struct nfs_closeargs {
 	struct nfs4_sequence_args	seq_args;
 	struct nfs_fh *         fh;
-	nfs4_stateid *		stateid;
+	nfs4_stateid 		stateid;
 	struct nfs_seqid *	seqid;
 	fmode_t			fmode;
 	const u32 *		bitmask;
@@ -417,12 +417,13 @@
 	struct nfs_fh *		fh;
 	struct file_lock *	fl;
 	struct nfs_seqid *	lock_seqid;
-	nfs4_stateid *		lock_stateid;
+	nfs4_stateid		lock_stateid;
 	struct nfs_seqid *	open_seqid;
-	nfs4_stateid *		open_stateid;
+	nfs4_stateid		open_stateid;
 	struct nfs_lowner	lock_owner;
 	unsigned char		block : 1;
 	unsigned char		reclaim : 1;
+	unsigned char		new_lock : 1;
 	unsigned char		new_lock_owner : 1;
 };
 
@@ -438,7 +439,7 @@
 	struct nfs_fh *		fh;
 	struct file_lock *	fl;
 	struct nfs_seqid *	seqid;
-	nfs4_stateid *		stateid;
+	nfs4_stateid 		stateid;
 };
 
 struct nfs_locku_res {
diff --git a/include/linux/sunrpc/rpc_rdma.h b/include/linux/sunrpc/rpc_rdma.h
index b78f16b..f33c5a4 100644
--- a/include/linux/sunrpc/rpc_rdma.h
+++ b/include/linux/sunrpc/rpc_rdma.h
@@ -42,6 +42,9 @@
 
 #include <linux/types.h>
 
+#define RPCRDMA_VERSION		1
+#define rpcrdma_version		cpu_to_be32(RPCRDMA_VERSION)
+
 struct rpcrdma_segment {
 	__be32 rs_handle;	/* Registered memory handle */
 	__be32 rs_length;	/* Length of the chunk in bytes */
@@ -95,7 +98,10 @@
 	} rm_body;
 };
 
-#define RPCRDMA_HDRLEN_MIN	28
+/*
+ * Smallest RPC/RDMA header: rm_xid through rm_type, then rm_nochunks
+ */
+#define RPCRDMA_HDRLEN_MIN	(sizeof(__be32) * 7)
 
 enum rpcrdma_errcode {
 	ERR_VERS = 1,
@@ -115,4 +121,10 @@
 	RDMA_ERROR = 4		/* An RPC RDMA encoding error */
 };
 
+#define rdma_msg	cpu_to_be32(RDMA_MSG)
+#define rdma_nomsg	cpu_to_be32(RDMA_NOMSG)
+#define rdma_msgp	cpu_to_be32(RDMA_MSGP)
+#define rdma_done	cpu_to_be32(RDMA_DONE)
+#define rdma_error	cpu_to_be32(RDMA_ERROR)
+
 #endif				/* _LINUX_SUNRPC_RPC_RDMA_H */
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 975da75..ddfe88f 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -63,8 +63,6 @@
 extern atomic_t rdma_stat_sq_poll;
 extern atomic_t rdma_stat_sq_prod;
 
-#define RPCRDMA_VERSION 1
-
 /*
  * Contexts are built when an RDMA request is created and are a
  * record of the resources that can be recovered when the request
diff --git a/net/sunrpc/sched.c b/net/sunrpc/sched.c
index d20f232..b91fd9c 100644
--- a/net/sunrpc/sched.c
+++ b/net/sunrpc/sched.c
@@ -844,10 +844,10 @@
 void *rpc_malloc(struct rpc_task *task, size_t size)
 {
 	struct rpc_buffer *buf;
-	gfp_t gfp = GFP_NOWAIT | __GFP_NOWARN;
+	gfp_t gfp = GFP_NOIO | __GFP_NOWARN;
 
 	if (RPC_IS_SWAPPER(task))
-		gfp |= __GFP_MEMALLOC;
+		gfp = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 
 	size += sizeof(struct rpc_buffer);
 	if (size <= RPC_BUFFER_MAXSIZE)
@@ -1069,7 +1069,8 @@
 	 * Create the rpciod thread and wait for it to start.
 	 */
 	dprintk("RPC:       creating workqueue rpciod\n");
-	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM, 1);
+	/* Note: highpri because network receive is latency sensitive */
+	wq = alloc_workqueue("rpciod", WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
 	rpciod_workqueue = wq;
 	return rpciod_workqueue != NULL;
 }
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index df01d12..7e9acd9 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -209,9 +209,11 @@
 		if (cur_rchunk) {	/* read */
 			cur_rchunk->rc_discrim = xdr_one;
 			/* all read chunks have the same "position" */
-			cur_rchunk->rc_position = htonl(pos);
-			cur_rchunk->rc_target.rs_handle = htonl(seg->mr_rkey);
-			cur_rchunk->rc_target.rs_length = htonl(seg->mr_len);
+			cur_rchunk->rc_position = cpu_to_be32(pos);
+			cur_rchunk->rc_target.rs_handle =
+						cpu_to_be32(seg->mr_rkey);
+			cur_rchunk->rc_target.rs_length =
+						cpu_to_be32(seg->mr_len);
 			xdr_encode_hyper(
 					(__be32 *)&cur_rchunk->rc_target.rs_offset,
 					seg->mr_base);
@@ -222,8 +224,10 @@
 			cur_rchunk++;
 			r_xprt->rx_stats.read_chunk_count++;
 		} else {		/* write/reply */
-			cur_wchunk->wc_target.rs_handle = htonl(seg->mr_rkey);
-			cur_wchunk->wc_target.rs_length = htonl(seg->mr_len);
+			cur_wchunk->wc_target.rs_handle =
+						cpu_to_be32(seg->mr_rkey);
+			cur_wchunk->wc_target.rs_length =
+						cpu_to_be32(seg->mr_len);
 			xdr_encode_hyper(
 					(__be32 *)&cur_wchunk->wc_target.rs_offset,
 					seg->mr_base);
@@ -257,7 +261,7 @@
 		*iptr++ = xdr_zero;	/* encode a NULL reply chunk */
 	} else {
 		warray->wc_discrim = xdr_one;
-		warray->wc_nchunks = htonl(nchunks);
+		warray->wc_nchunks = cpu_to_be32(nchunks);
 		iptr = (__be32 *) cur_wchunk;
 		if (type == rpcrdma_writech) {
 			*iptr++ = xdr_zero; /* finish the write chunk list */
@@ -290,7 +294,7 @@
 rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
 {
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
-	struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base;
+	struct rpcrdma_msg *headerp = rdmab_to_msg(req->rl_rdmabuf);
 
 	if (req->rl_rtype != rpcrdma_noch)
 		result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
@@ -402,13 +406,12 @@
 	base = rqst->rq_svec[0].iov_base;
 	rpclen = rqst->rq_svec[0].iov_len;
 
-	/* build RDMA header in private area at front */
-	headerp = (struct rpcrdma_msg *) req->rl_base;
-	/* don't htonl XID, it's already done in request */
+	headerp = rdmab_to_msg(req->rl_rdmabuf);
+	/* don't byte-swap XID, it's already done in request */
 	headerp->rm_xid = rqst->rq_xid;
-	headerp->rm_vers = xdr_one;
-	headerp->rm_credit = htonl(r_xprt->rx_buf.rb_max_requests);
-	headerp->rm_type = htonl(RDMA_MSG);
+	headerp->rm_vers = rpcrdma_version;
+	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
+	headerp->rm_type = rdma_msg;
 
 	/*
 	 * Chunks needed for results?
@@ -468,7 +471,7 @@
 		return -EIO;
 	}
 
-	hdrlen = 28; /*sizeof *headerp;*/
+	hdrlen = RPCRDMA_HDRLEN_MIN;
 	padlen = 0;
 
 	/*
@@ -482,11 +485,11 @@
 						RPCRDMA_INLINE_PAD_VALUE(rqst));
 
 		if (padlen) {
-			headerp->rm_type = htonl(RDMA_MSGP);
+			headerp->rm_type = rdma_msgp;
 			headerp->rm_body.rm_padded.rm_align =
-				htonl(RPCRDMA_INLINE_PAD_VALUE(rqst));
+				cpu_to_be32(RPCRDMA_INLINE_PAD_VALUE(rqst));
 			headerp->rm_body.rm_padded.rm_thresh =
-				htonl(RPCRDMA_INLINE_PAD_THRESH);
+				cpu_to_be32(RPCRDMA_INLINE_PAD_THRESH);
 			headerp->rm_body.rm_padded.rm_pempty[0] = xdr_zero;
 			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
 			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
@@ -524,7 +527,7 @@
 	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
 		" headerp 0x%p base 0x%p lkey 0x%x\n",
 		__func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
-		headerp, base, req->rl_iov.lkey);
+		headerp, base, rdmab_lkey(req->rl_rdmabuf));
 
 	/*
 	 * initialize send_iov's - normally only two: rdma chunk header and
@@ -533,26 +536,26 @@
 	 * header and any write data. In all non-rdma cases, any following
 	 * data has been copied into the RPC header buffer.
 	 */
-	req->rl_send_iov[0].addr = req->rl_iov.addr;
+	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
 	req->rl_send_iov[0].length = hdrlen;
-	req->rl_send_iov[0].lkey = req->rl_iov.lkey;
+	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
 
-	req->rl_send_iov[1].addr = req->rl_iov.addr + (base - req->rl_base);
+	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
 	req->rl_send_iov[1].length = rpclen;
-	req->rl_send_iov[1].lkey = req->rl_iov.lkey;
+	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
 
 	req->rl_niovs = 2;
 
 	if (padlen) {
 		struct rpcrdma_ep *ep = &r_xprt->rx_ep;
 
-		req->rl_send_iov[2].addr = ep->rep_pad.addr;
+		req->rl_send_iov[2].addr = rdmab_addr(ep->rep_padbuf);
 		req->rl_send_iov[2].length = padlen;
-		req->rl_send_iov[2].lkey = ep->rep_pad.lkey;
+		req->rl_send_iov[2].lkey = rdmab_lkey(ep->rep_padbuf);
 
 		req->rl_send_iov[3].addr = req->rl_send_iov[1].addr + rpclen;
 		req->rl_send_iov[3].length = rqst->rq_slen - rpclen;
-		req->rl_send_iov[3].lkey = req->rl_iov.lkey;
+		req->rl_send_iov[3].lkey = rdmab_lkey(req->rl_sendbuf);
 
 		req->rl_niovs = 4;
 	}
@@ -569,8 +572,9 @@
 {
 	unsigned int i, total_len;
 	struct rpcrdma_write_chunk *cur_wchunk;
+	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);
 
-	i = ntohl(**iptrp);	/* get array count */
+	i = be32_to_cpu(**iptrp);
 	if (i > max)
 		return -1;
 	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
@@ -582,11 +586,11 @@
 			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
 			dprintk("RPC:       %s: chunk %d@0x%llx:0x%x\n",
 				__func__,
-				ntohl(seg->rs_length),
+				be32_to_cpu(seg->rs_length),
 				(unsigned long long)off,
-				ntohl(seg->rs_handle));
+				be32_to_cpu(seg->rs_handle));
 		}
-		total_len += ntohl(seg->rs_length);
+		total_len += be32_to_cpu(seg->rs_length);
 		++cur_wchunk;
 	}
 	/* check and adjust for properly terminated write chunk */
@@ -596,7 +600,7 @@
 			return -1;
 		cur_wchunk = (struct rpcrdma_write_chunk *) w;
 	}
-	if ((char *) cur_wchunk > rep->rr_base + rep->rr_len)
+	if ((char *)cur_wchunk > base + rep->rr_len)
 		return -1;
 
 	*iptrp = (__be32 *) cur_wchunk;
@@ -691,7 +695,9 @@
 {
 	struct rpcrdma_ep *ep =
 		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
-	struct rpc_xprt *xprt = ep->rep_xprt;
+	struct rpcrdma_xprt *r_xprt =
+		container_of(ep, struct rpcrdma_xprt, rx_ep);
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 
 	spin_lock_bh(&xprt->transport_lock);
 	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
@@ -732,7 +738,7 @@
 	struct rpc_xprt *xprt = rep->rr_xprt;
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	__be32 *iptr;
-	int rdmalen, status;
+	int credits, rdmalen, status;
 	unsigned long cwnd;
 
 	/* Check status. If bad, signal disconnect and return rep to pool */
@@ -744,14 +750,14 @@
 		}
 		return;
 	}
-	if (rep->rr_len < 28) {
+	if (rep->rr_len < RPCRDMA_HDRLEN_MIN) {
 		dprintk("RPC:       %s: short/invalid reply\n", __func__);
 		goto repost;
 	}
-	headerp = (struct rpcrdma_msg *) rep->rr_base;
-	if (headerp->rm_vers != xdr_one) {
+	headerp = rdmab_to_msg(rep->rr_rdmabuf);
+	if (headerp->rm_vers != rpcrdma_version) {
 		dprintk("RPC:       %s: invalid version %d\n",
-			__func__, ntohl(headerp->rm_vers));
+			__func__, be32_to_cpu(headerp->rm_vers));
 		goto repost;
 	}
 
@@ -762,7 +768,8 @@
 		spin_unlock(&xprt->transport_lock);
 		dprintk("RPC:       %s: reply 0x%p failed "
 			"to match any request xid 0x%08x len %d\n",
-			__func__, rep, headerp->rm_xid, rep->rr_len);
+			__func__, rep, be32_to_cpu(headerp->rm_xid),
+			rep->rr_len);
 repost:
 		r_xprt->rx_stats.bad_reply_count++;
 		rep->rr_func = rpcrdma_reply_handler;
@@ -778,13 +785,14 @@
 		spin_unlock(&xprt->transport_lock);
 		dprintk("RPC:       %s: duplicate reply 0x%p to RPC "
 			"request 0x%p: xid 0x%08x\n", __func__, rep, req,
-			headerp->rm_xid);
+			be32_to_cpu(headerp->rm_xid));
 		goto repost;
 	}
 
 	dprintk("RPC:       %s: reply 0x%p completes request 0x%p\n"
 		"                   RPC request 0x%p xid 0x%08x\n",
-			__func__, rep, req, rqst, headerp->rm_xid);
+			__func__, rep, req, rqst,
+			be32_to_cpu(headerp->rm_xid));
 
 	/* from here on, the reply is no longer an orphan */
 	req->rl_reply = rep;
@@ -793,7 +801,7 @@
 	/* check for expected message types */
 	/* The order of some of these tests is important. */
 	switch (headerp->rm_type) {
-	case htonl(RDMA_MSG):
+	case rdma_msg:
 		/* never expect read chunks */
 		/* never expect reply chunks (two ways to check) */
 		/* never expect write chunks without having offered RDMA */
@@ -824,22 +832,24 @@
 		} else {
 			/* else ordinary inline */
 			rdmalen = 0;
-			iptr = (__be32 *)((unsigned char *)headerp + 28);
-			rep->rr_len -= 28; /*sizeof *headerp;*/
+			iptr = (__be32 *)((unsigned char *)headerp +
+							RPCRDMA_HDRLEN_MIN);
+			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
 			status = rep->rr_len;
 		}
 		/* Fix up the rpc results for upper layer */
 		rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len, rdmalen);
 		break;
 
-	case htonl(RDMA_NOMSG):
+	case rdma_nomsg:
 		/* never expect read or write chunks, always reply chunks */
 		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
 		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
 		    headerp->rm_body.rm_chunks[2] != xdr_one ||
 		    req->rl_nchunks == 0)
 			goto badheader;
-		iptr = (__be32 *)((unsigned char *)headerp + 28);
+		iptr = (__be32 *)((unsigned char *)headerp +
+							RPCRDMA_HDRLEN_MIN);
 		rdmalen = rpcrdma_count_chunks(rep, req->rl_nchunks, 0, &iptr);
 		if (rdmalen < 0)
 			goto badheader;
@@ -853,7 +863,7 @@
 		dprintk("%s: invalid rpcrdma reply header (type %d):"
 				" chunks[012] == %d %d %d"
 				" expected chunks <= %d\n",
-				__func__, ntohl(headerp->rm_type),
+				__func__, be32_to_cpu(headerp->rm_type),
 				headerp->rm_body.rm_chunks[0],
 				headerp->rm_body.rm_chunks[1],
 				headerp->rm_body.rm_chunks[2],
@@ -863,8 +873,14 @@
 		break;
 	}
 
+	credits = be32_to_cpu(headerp->rm_credit);
+	if (credits == 0)
+		credits = 1;	/* don't deadlock */
+	else if (credits > r_xprt->rx_buf.rb_max_requests)
+		credits = r_xprt->rx_buf.rb_max_requests;
+
 	cwnd = xprt->cwnd;
-	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
+	xprt->cwnd = credits << RPC_CWNDSHIFT;
 	if (xprt->cwnd > cwnd)
 		xprt_release_rqst_cong(rqst->rq_task);
 
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index bbd6155..2e192ba 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -200,9 +200,9 @@
 static void
 xprt_rdma_connect_worker(struct work_struct *work)
 {
-	struct rpcrdma_xprt *r_xprt =
-		container_of(work, struct rpcrdma_xprt, rdma_connect.work);
-	struct rpc_xprt *xprt = &r_xprt->xprt;
+	struct rpcrdma_xprt *r_xprt = container_of(work, struct rpcrdma_xprt,
+						   rx_connect_worker.work);
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	int rc = 0;
 
 	xprt_clear_connected(xprt);
@@ -235,7 +235,7 @@
 
 	dprintk("RPC:       %s: called\n", __func__);
 
-	cancel_delayed_work_sync(&r_xprt->rdma_connect);
+	cancel_delayed_work_sync(&r_xprt->rx_connect_worker);
 
 	xprt_clear_connected(xprt);
 
@@ -364,8 +364,7 @@
 	 * any inline data. Also specify any padding which will be provided
 	 * from a preregistered zero buffer.
 	 */
-	rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
-				&new_xprt->rx_data);
+	rc = rpcrdma_buffer_create(new_xprt);
 	if (rc)
 		goto out3;
 
@@ -374,9 +373,8 @@
 	 * connection loss notification is async. We also catch connection loss
 	 * when reaping receives.
 	 */
-	INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
-	new_ep->rep_func = rpcrdma_conn_func;
-	new_ep->rep_xprt = xprt;
+	INIT_DELAYED_WORK(&new_xprt->rx_connect_worker,
+			  xprt_rdma_connect_worker);
 
 	xprt_rdma_format_addresses(xprt);
 	xprt->max_payload = rpcrdma_max_payload(new_xprt);
@@ -434,94 +432,101 @@
 
 	if (r_xprt->rx_ep.rep_connected != 0) {
 		/* Reconnect */
-		schedule_delayed_work(&r_xprt->rdma_connect,
-			xprt->reestablish_timeout);
+		schedule_delayed_work(&r_xprt->rx_connect_worker,
+				      xprt->reestablish_timeout);
 		xprt->reestablish_timeout <<= 1;
 		if (xprt->reestablish_timeout > RPCRDMA_MAX_REEST_TO)
 			xprt->reestablish_timeout = RPCRDMA_MAX_REEST_TO;
 		else if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
 			xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
 	} else {
-		schedule_delayed_work(&r_xprt->rdma_connect, 0);
+		schedule_delayed_work(&r_xprt->rx_connect_worker, 0);
 		if (!RPC_IS_ASYNC(task))
-			flush_delayed_work(&r_xprt->rdma_connect);
+			flush_delayed_work(&r_xprt->rx_connect_worker);
 	}
 }
 
 /*
  * The RDMA allocate/free functions need the task structure as a place
  * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence. For this reason, the recv buffers are attached to send
- * buffers for portions of the RPC. Note that the RPC layer allocates
- * both send and receive buffers in the same call. We may register
- * the receive buffer portion when using reply chunks.
+ * sequence.
+ *
+ * The RPC layer allocates both send and receive buffers in the same call
+ * (rq_snd_buf and rq_rcv_buf are both part of a single contiguous buffer).
+ * We may register rq_rcv_buf when using reply chunks.
  */
 static void *
 xprt_rdma_allocate(struct rpc_task *task, size_t size)
 {
 	struct rpc_xprt *xprt = task->tk_rqstp->rq_xprt;
-	struct rpcrdma_req *req, *nreq;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_regbuf *rb;
+	struct rpcrdma_req *req;
+	size_t min_size;
+	gfp_t flags;
 
-	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
+	req = rpcrdma_buffer_get(&r_xprt->rx_buf);
 	if (req == NULL)
 		return NULL;
 
-	if (size > req->rl_size) {
-		dprintk("RPC:       %s: size %zd too large for buffer[%zd]: "
-			"prog %d vers %d proc %d\n",
-			__func__, size, req->rl_size,
-			task->tk_client->cl_prog, task->tk_client->cl_vers,
-			task->tk_msg.rpc_proc->p_proc);
-		/*
-		 * Outgoing length shortage. Our inline write max must have
-		 * been configured to perform direct i/o.
-		 *
-		 * This is therefore a large metadata operation, and the
-		 * allocate call was made on the maximum possible message,
-		 * e.g. containing long filename(s) or symlink data. In
-		 * fact, while these metadata operations *might* carry
-		 * large outgoing payloads, they rarely *do*. However, we
-		 * have to commit to the request here, so reallocate and
-		 * register it now. The data path will never require this
-		 * reallocation.
-		 *
-		 * If the allocation or registration fails, the RPC framework
-		 * will (doggedly) retry.
-		 */
-		if (task->tk_flags & RPC_TASK_SWAPPER)
-			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
-		else
-			nreq = kmalloc(sizeof *req + size, GFP_NOFS);
-		if (nreq == NULL)
-			goto outfail;
+	flags = GFP_NOIO | __GFP_NOWARN;
+	if (RPC_IS_SWAPPER(task))
+		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 
-		if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
-				nreq->rl_base, size + sizeof(struct rpcrdma_req)
-				- offsetof(struct rpcrdma_req, rl_base),
-				&nreq->rl_handle, &nreq->rl_iov)) {
-			kfree(nreq);
-			goto outfail;
-		}
-		rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
-		nreq->rl_size = size;
-		nreq->rl_niovs = 0;
-		nreq->rl_nchunks = 0;
-		nreq->rl_buffer = (struct rpcrdma_buffer *)req;
-		nreq->rl_reply = req->rl_reply;
-		memcpy(nreq->rl_segments,
-			req->rl_segments, sizeof nreq->rl_segments);
-		/* flag the swap with an unused field */
-		nreq->rl_iov.length = 0;
-		req->rl_reply = NULL;
-		req = nreq;
-	}
+	if (req->rl_rdmabuf == NULL)
+		goto out_rdmabuf;
+	if (req->rl_sendbuf == NULL)
+		goto out_sendbuf;
+	if (size > req->rl_sendbuf->rg_size)
+		goto out_sendbuf;
+
+out:
 	dprintk("RPC:       %s: size %zd, request 0x%p\n", __func__, size, req);
 	req->rl_connect_cookie = 0;	/* our reserved value */
-	return req->rl_xdr_buf;
+	return req->rl_sendbuf->rg_base;
 
-outfail:
+out_rdmabuf:
+	min_size = RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+	if (IS_ERR(rb))
+		goto out_fail;
+	req->rl_rdmabuf = rb;
+
+out_sendbuf:
+	/* XDR encoding and RPC/RDMA marshaling of this request has not
+	 * yet occurred. Thus a lower bound is needed to prevent buffer
+	 * overrun during marshaling.
+	 *
+	 * RPC/RDMA marshaling may choose to send payload bearing ops
+	 * inline, if the result is smaller than the inline threshold.
+	 * The value of the "size" argument accounts for header
+	 * requirements but not for the payload in these cases.
+	 *
+	 * Likewise, allocate enough space to receive a reply up to the
+	 * size of the inline threshold.
+	 *
+	 * It's unlikely that both the send header and the received
+	 * reply will be large, but slush is provided here to allow
+	 * flexibility when marshaling.
+	 */
+	min_size = RPCRDMA_INLINE_READ_THRESHOLD(task->tk_rqstp);
+	min_size += RPCRDMA_INLINE_WRITE_THRESHOLD(task->tk_rqstp);
+	if (size < min_size)
+		size = min_size;
+
+	rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+	if (IS_ERR(rb))
+		goto out_fail;
+	rb->rg_owner = req;
+
+	r_xprt->rx_stats.hardway_register_count += size;
+	rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+	req->rl_sendbuf = rb;
+	goto out;
+
+out_fail:
 	rpcrdma_buffer_put(req);
-	rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
+	r_xprt->rx_stats.failed_marshal_count++;
 	return NULL;
 }
 
@@ -533,47 +538,24 @@
 {
 	struct rpcrdma_req *req;
 	struct rpcrdma_xprt *r_xprt;
-	struct rpcrdma_rep *rep;
+	struct rpcrdma_regbuf *rb;
 	int i;
 
 	if (buffer == NULL)
 		return;
 
-	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
-	if (req->rl_iov.length == 0) {	/* see allocate above */
-		r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
-				      struct rpcrdma_xprt, rx_buf);
-	} else
-		r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
-	rep = req->rl_reply;
+	rb = container_of(buffer, struct rpcrdma_regbuf, rg_base[0]);
+	req = rb->rg_owner;
+	r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
 
-	dprintk("RPC:       %s: called on 0x%p%s\n",
-		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");
+	dprintk("RPC:       %s: called on 0x%p\n", __func__, req->rl_reply);
 
-	/*
-	 * Finish the deregistration.  The process is considered
-	 * complete when the rr_func vector becomes NULL - this
-	 * was put in place during rpcrdma_reply_handler() - the wait
-	 * call below will not block if the dereg is "done". If
-	 * interrupted, our framework will clean up.
-	 */
 	for (i = 0; req->rl_nchunks;) {
 		--req->rl_nchunks;
 		i += rpcrdma_deregister_external(
 			&req->rl_segments[i], r_xprt);
 	}
 
-	if (req->rl_iov.length == 0) {	/* see allocate above */
-		struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
-		oreq->rl_reply = req->rl_reply;
-		(void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
-						   req->rl_handle,
-						   &req->rl_iov);
-		kfree(req);
-		req = oreq;
-	}
-
-	/* Put back request+reply buffers */
 	rpcrdma_buffer_put(req);
 }
 
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c98e406..124676c 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -49,6 +49,7 @@
 
 #include <linux/interrupt.h>
 #include <linux/slab.h>
+#include <linux/prefetch.h>
 #include <asm/bitops.h>
 
 #include "xprt_rdma.h"
@@ -153,7 +154,7 @@
 		event->device->name, context);
 	if (ep->rep_connected == 1) {
 		ep->rep_connected = -EIO;
-		ep->rep_func(ep);
+		rpcrdma_conn_func(ep);
 		wake_up_all(&ep->rep_connect_wait);
 	}
 }
@@ -168,23 +169,59 @@
 		event->device->name, context);
 	if (ep->rep_connected == 1) {
 		ep->rep_connected = -EIO;
-		ep->rep_func(ep);
+		rpcrdma_conn_func(ep);
 		wake_up_all(&ep->rep_connect_wait);
 	}
 }
 
+static const char * const wc_status[] = {	/* indexed by wc->status */
+	"success",
+	"local length error",
+	"local QP operation error",
+	"local EE context operation error",
+	"local protection error",
+	"WR flushed",
+	"memory management operation error",
+	"bad response error",
+	"local access error",
+	"remote invalid request error",
+	"remote access error",
+	"remote operation error",
+	"transport retry counter exceeded",
+	"RNR retry counter exceeded",
+	"local RDD violation error",
+	"remote invalid RD request",
+	"operation aborted",
+	"invalid EE context number",
+	"invalid EE context state",
+	"fatal error",
+	"response timeout error",
+	"general error",
+};
+
+#define COMPLETION_MSG(status)					\
+	((status) < ARRAY_SIZE(wc_status) ?			\
+		wc_status[(status)] : "unexpected completion error")
+
 static void
 rpcrdma_sendcq_process_wc(struct ib_wc *wc)
 {
-	struct rpcrdma_mw *frmr = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-
-	dprintk("RPC:       %s: frmr %p status %X opcode %d\n",
-		__func__, frmr, wc->status, wc->opcode);
-
-	if (wc->wr_id == 0ULL)
+	if (likely(wc->status == IB_WC_SUCCESS))
 		return;
-	if (wc->status != IB_WC_SUCCESS)
-		frmr->r.frmr.fr_state = FRMR_IS_STALE;
+
+	/* WARNING: Only wr_id and status are reliable at this point */
+	if (wc->wr_id == 0ULL) {
+		if (wc->status != IB_WC_WR_FLUSH_ERR)
+			pr_err("RPC:       %s: SEND: %s\n",
+			       __func__, COMPLETION_MSG(wc->status));
+	} else {
+		struct rpcrdma_mw *r;
+
+		r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
+		r->r.frmr.fr_state = FRMR_IS_STALE;
+		pr_err("RPC:       %s: frmr %p (stale): %s\n",
+		       __func__, r, COMPLETION_MSG(wc->status));
+	}
 }
 
 static int
@@ -248,33 +285,32 @@
 	struct rpcrdma_rep *rep =
 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
 
-	dprintk("RPC:       %s: rep %p status %X opcode %X length %u\n",
-		__func__, rep, wc->status, wc->opcode, wc->byte_len);
+	/* WARNING: Only wr_id and status are reliable at this point */
+	if (wc->status != IB_WC_SUCCESS)
+		goto out_fail;
 
-	if (wc->status != IB_WC_SUCCESS) {
-		rep->rr_len = ~0U;
-		goto out_schedule;
-	}
+	/* status == SUCCESS means all fields in wc are trustworthy */
 	if (wc->opcode != IB_WC_RECV)
 		return;
 
+	dprintk("RPC:       %s: rep %p opcode 'recv', length %u: success\n",
+		__func__, rep, wc->byte_len);
+
 	rep->rr_len = wc->byte_len;
 	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
-			rep->rr_iov.addr, rep->rr_len, DMA_FROM_DEVICE);
-
-	if (rep->rr_len >= 16) {
-		struct rpcrdma_msg *p = (struct rpcrdma_msg *)rep->rr_base;
-		unsigned int credits = ntohl(p->rm_credit);
-
-		if (credits == 0)
-			credits = 1;	/* don't deadlock */
-		else if (credits > rep->rr_buffer->rb_max_requests)
-			credits = rep->rr_buffer->rb_max_requests;
-		atomic_set(&rep->rr_buffer->rb_credits, credits);
-	}
+				   rdmab_addr(rep->rr_rdmabuf),
+				   rep->rr_len, DMA_FROM_DEVICE);
+	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
 	list_add_tail(&rep->rr_list, sched_list);
+	return;
+out_fail:
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("RPC:       %s: rep %p: %s\n",
+		       __func__, rep, COMPLETION_MSG(wc->status));
+	rep->rr_len = ~0U;
+	goto out_schedule;
 }
 
 static int
@@ -390,8 +426,8 @@
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	struct sockaddr_in *addr = (struct sockaddr_in *) &ep->rep_remote_addr;
 #endif
-	struct ib_qp_attr attr;
-	struct ib_qp_init_attr iattr;
+	struct ib_qp_attr *attr = &ia->ri_qp_attr;
+	struct ib_qp_init_attr *iattr = &ia->ri_qp_init_attr;
 	int connstate = 0;
 
 	switch (event->event) {
@@ -414,12 +450,13 @@
 		break;
 	case RDMA_CM_EVENT_ESTABLISHED:
 		connstate = 1;
-		ib_query_qp(ia->ri_id->qp, &attr,
-			IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
-			&iattr);
+		ib_query_qp(ia->ri_id->qp, attr,
+			    IB_QP_MAX_QP_RD_ATOMIC | IB_QP_MAX_DEST_RD_ATOMIC,
+			    iattr);
 		dprintk("RPC:       %s: %d responder resources"
 			" (%d initiator)\n",
-			__func__, attr.max_dest_rd_atomic, attr.max_rd_atomic);
+			__func__, attr->max_dest_rd_atomic,
+			attr->max_rd_atomic);
 		goto connected;
 	case RDMA_CM_EVENT_CONNECT_ERROR:
 		connstate = -ENOTCONN;
@@ -436,11 +473,10 @@
 	case RDMA_CM_EVENT_DEVICE_REMOVAL:
 		connstate = -ENODEV;
 connected:
-		atomic_set(&rpcx_to_rdmax(ep->rep_xprt)->rx_buf.rb_credits, 1);
 		dprintk("RPC:       %s: %sconnected\n",
 					__func__, connstate > 0 ? "" : "dis");
 		ep->rep_connected = connstate;
-		ep->rep_func(ep);
+		rpcrdma_conn_func(ep);
 		wake_up_all(&ep->rep_connect_wait);
 		/*FALLTHROUGH*/
 	default:
@@ -453,7 +489,7 @@
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	if (connstate == 1) {
-		int ird = attr.max_dest_rd_atomic;
+		int ird = attr->max_dest_rd_atomic;
 		int tird = ep->rep_remote_cma.responder_resources;
 		printk(KERN_INFO "rpcrdma: connection to %pI4:%u "
 			"on %s, memreg %d slots %d ird %d%s\n",
@@ -554,8 +590,8 @@
 rpcrdma_ia_open(struct rpcrdma_xprt *xprt, struct sockaddr *addr, int memreg)
 {
 	int rc, mem_priv;
-	struct ib_device_attr devattr;
 	struct rpcrdma_ia *ia = &xprt->rx_ia;
+	struct ib_device_attr *devattr = &ia->ri_devattr;
 
 	ia->ri_id = rpcrdma_create_id(xprt, ia, addr);
 	if (IS_ERR(ia->ri_id)) {
@@ -571,26 +607,21 @@
 		goto out2;
 	}
 
-	/*
-	 * Query the device to determine if the requested memory
-	 * registration strategy is supported. If it isn't, set the
-	 * strategy to a globally supported model.
-	 */
-	rc = ib_query_device(ia->ri_id->device, &devattr);
+	rc = ib_query_device(ia->ri_id->device, devattr);
 	if (rc) {
 		dprintk("RPC:       %s: ib_query_device failed %d\n",
 			__func__, rc);
-		goto out2;
+		goto out3;
 	}
 
-	if (devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
+	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
 		ia->ri_have_dma_lkey = 1;
 		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
 	}
 
 	if (memreg == RPCRDMA_FRMR) {
 		/* Requires both frmr reg and local dma lkey */
-		if ((devattr.device_cap_flags &
+		if ((devattr->device_cap_flags &
 		     (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) !=
 		    (IB_DEVICE_MEM_MGT_EXTENSIONS|IB_DEVICE_LOCAL_DMA_LKEY)) {
 			dprintk("RPC:       %s: FRMR registration "
@@ -600,7 +631,7 @@
 			/* Mind the ia limit on FRMR page list depth */
 			ia->ri_max_frmr_depth = min_t(unsigned int,
 				RPCRDMA_MAX_DATA_SEGS,
-				devattr.max_fast_reg_page_list_len);
+				devattr->max_fast_reg_page_list_len);
 		}
 	}
 	if (memreg == RPCRDMA_MTHCAFMR) {
@@ -638,14 +669,14 @@
 				"phys register failed with %lX\n",
 				__func__, PTR_ERR(ia->ri_bind_mem));
 			rc = -ENOMEM;
-			goto out2;
+			goto out3;
 		}
 		break;
 	default:
 		printk(KERN_ERR "RPC: Unsupported memory "
 				"registration mode: %d\n", memreg);
 		rc = -ENOMEM;
-		goto out2;
+		goto out3;
 	}
 	dprintk("RPC:       %s: memory registration strategy is %d\n",
 		__func__, memreg);
@@ -655,6 +686,10 @@
 
 	rwlock_init(&ia->ri_qplock);
 	return 0;
+
+out3:
+	ib_dealloc_pd(ia->ri_pd);
+	ia->ri_pd = NULL;
 out2:
 	rdma_destroy_id(ia->ri_id);
 	ia->ri_id = NULL;
@@ -698,20 +733,13 @@
 rpcrdma_ep_create(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia,
 				struct rpcrdma_create_data_internal *cdata)
 {
-	struct ib_device_attr devattr;
+	struct ib_device_attr *devattr = &ia->ri_devattr;
 	struct ib_cq *sendcq, *recvcq;
 	int rc, err;
 
-	rc = ib_query_device(ia->ri_id->device, &devattr);
-	if (rc) {
-		dprintk("RPC:       %s: ib_query_device failed %d\n",
-			__func__, rc);
-		return rc;
-	}
-
 	/* check provider's send/recv wr limits */
-	if (cdata->max_requests > devattr.max_qp_wr)
-		cdata->max_requests = devattr.max_qp_wr;
+	if (cdata->max_requests > devattr->max_qp_wr)
+		cdata->max_requests = devattr->max_qp_wr;
 
 	ep->rep_attr.event_handler = rpcrdma_qp_async_error_upcall;
 	ep->rep_attr.qp_context = ep;
@@ -746,8 +774,8 @@
 
 		}
 		ep->rep_attr.cap.max_send_wr *= depth;
-		if (ep->rep_attr.cap.max_send_wr > devattr.max_qp_wr) {
-			cdata->max_requests = devattr.max_qp_wr / depth;
+		if (ep->rep_attr.cap.max_send_wr > devattr->max_qp_wr) {
+			cdata->max_requests = devattr->max_qp_wr / depth;
 			if (!cdata->max_requests)
 				return -EINVAL;
 			ep->rep_attr.cap.max_send_wr = cdata->max_requests *
@@ -766,6 +794,14 @@
 	ep->rep_attr.qp_type = IB_QPT_RC;
 	ep->rep_attr.port_num = ~0;
 
+	if (cdata->padding) {
+		ep->rep_padbuf = rpcrdma_alloc_regbuf(ia, cdata->padding,
+						      GFP_KERNEL);
+		if (IS_ERR(ep->rep_padbuf))
+			return PTR_ERR(ep->rep_padbuf);
+	} else
+		ep->rep_padbuf = NULL;
+
 	dprintk("RPC:       %s: requested max: dtos: send %d recv %d; "
 		"iovs: send %d recv %d\n",
 		__func__,
@@ -781,7 +817,6 @@
 	else if (ep->rep_cqinit <= 2)
 		ep->rep_cqinit = 0;
 	INIT_CQCOUNT(ep);
-	ep->rep_ia = ia;
 	init_waitqueue_head(&ep->rep_connect_wait);
 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
@@ -831,10 +866,11 @@
 
 	/* Client offers RDMA Read but does not initiate */
 	ep->rep_remote_cma.initiator_depth = 0;
-	if (devattr.max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
+	if (devattr->max_qp_rd_atom > 32)	/* arbitrary but <= 255 */
 		ep->rep_remote_cma.responder_resources = 32;
 	else
-		ep->rep_remote_cma.responder_resources = devattr.max_qp_rd_atom;
+		ep->rep_remote_cma.responder_resources =
+						devattr->max_qp_rd_atom;
 
 	ep->rep_remote_cma.retry_count = 7;
 	ep->rep_remote_cma.flow_control = 0;
@@ -848,6 +884,7 @@
 		dprintk("RPC:       %s: ib_destroy_cq returned %i\n",
 			__func__, err);
 out1:
+	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
 	return rc;
 }
 
@@ -874,11 +911,7 @@
 		ia->ri_id->qp = NULL;
 	}
 
-	/* padding - could be done in rpcrdma_buffer_destroy... */
-	if (ep->rep_pad_mr) {
-		rpcrdma_deregister_internal(ia, ep->rep_pad_mr, &ep->rep_pad);
-		ep->rep_pad_mr = NULL;
-	}
+	rpcrdma_free_regbuf(ia, ep->rep_padbuf);
 
 	rpcrdma_clean_cq(ep->rep_attr.recv_cq);
 	rc = ib_destroy_cq(ep->rep_attr.recv_cq);
@@ -1048,6 +1081,48 @@
 	}
 }
 
+static struct rpcrdma_req *
+rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_req *req;
+
+	req = kzalloc(sizeof(*req), GFP_KERNEL);
+	if (req == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	req->rl_buffer = &r_xprt->rx_buf;
+	return req;
+}
+
+static struct rpcrdma_rep *
+rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
+{
+	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_rep *rep;
+	int rc;
+
+	rc = -ENOMEM;
+	rep = kzalloc(sizeof(*rep), GFP_KERNEL);
+	if (rep == NULL)
+		goto out;
+
+	rep->rr_rdmabuf = rpcrdma_alloc_regbuf(ia, cdata->inline_rsize,
+					       GFP_KERNEL);
+	if (IS_ERR(rep->rr_rdmabuf)) {
+		rc = PTR_ERR(rep->rr_rdmabuf);
+		goto out_free;
+	}
+
+	rep->rr_buffer = &r_xprt->rx_buf;
+	return rep;
+
+out_free:
+	kfree(rep);
+out:
+	return ERR_PTR(rc);
+}
+
 static int
 rpcrdma_init_fmrs(struct rpcrdma_ia *ia, struct rpcrdma_buffer *buf)
 {
@@ -1134,27 +1209,26 @@
 }
 
 int
-rpcrdma_buffer_create(struct rpcrdma_buffer *buf, struct rpcrdma_ep *ep,
-	struct rpcrdma_ia *ia, struct rpcrdma_create_data_internal *cdata)
+rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 {
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
 	char *p;
-	size_t len, rlen, wlen;
+	size_t len;
 	int i, rc;
 
 	buf->rb_max_requests = cdata->max_requests;
 	spin_lock_init(&buf->rb_lock);
-	atomic_set(&buf->rb_credits, 1);
 
 	/* Need to allocate:
 	 *   1.  arrays for send and recv pointers
 	 *   2.  arrays of struct rpcrdma_req to fill in pointers
 	 *   3.  array of struct rpcrdma_rep for replies
-	 *   4.  padding, if any
 	 * Send/recv buffers in req/rep need to be registered
 	 */
 	len = buf->rb_max_requests *
 		(sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *));
-	len += cdata->padding;
 
 	p = kzalloc(len, GFP_KERNEL);
 	if (p == NULL) {
@@ -1170,17 +1244,6 @@
 	buf->rb_recv_bufs = (struct rpcrdma_rep **) p;
 	p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests];
 
-	/*
-	 * Register the zeroed pad buffer, if any.
-	 */
-	if (cdata->padding) {
-		rc = rpcrdma_register_internal(ia, p, cdata->padding,
-					    &ep->rep_pad_mr, &ep->rep_pad);
-		if (rc)
-			goto out;
-	}
-	p += cdata->padding;
-
 	INIT_LIST_HEAD(&buf->rb_mws);
 	INIT_LIST_HEAD(&buf->rb_all);
 	switch (ia->ri_memreg_strategy) {
@@ -1198,62 +1261,29 @@
 		break;
 	}
 
-	/*
-	 * Allocate/init the request/reply buffers. Doing this
-	 * using kmalloc for now -- one for each buf.
-	 */
-	wlen = 1 << fls(cdata->inline_wsize + sizeof(struct rpcrdma_req));
-	rlen = 1 << fls(cdata->inline_rsize + sizeof(struct rpcrdma_rep));
-	dprintk("RPC:       %s: wlen = %zu, rlen = %zu\n",
-		__func__, wlen, rlen);
-
 	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_req *req;
 		struct rpcrdma_rep *rep;
 
-		req = kmalloc(wlen, GFP_KERNEL);
-		if (req == NULL) {
+		req = rpcrdma_create_req(r_xprt);
+		if (IS_ERR(req)) {
 			dprintk("RPC:       %s: request buffer %d alloc"
 				" failed\n", __func__, i);
-			rc = -ENOMEM;
+			rc = PTR_ERR(req);
 			goto out;
 		}
-		memset(req, 0, sizeof(struct rpcrdma_req));
 		buf->rb_send_bufs[i] = req;
-		buf->rb_send_bufs[i]->rl_buffer = buf;
 
-		rc = rpcrdma_register_internal(ia, req->rl_base,
-				wlen - offsetof(struct rpcrdma_req, rl_base),
-				&buf->rb_send_bufs[i]->rl_handle,
-				&buf->rb_send_bufs[i]->rl_iov);
-		if (rc)
-			goto out;
-
-		buf->rb_send_bufs[i]->rl_size = wlen -
-						sizeof(struct rpcrdma_req);
-
-		rep = kmalloc(rlen, GFP_KERNEL);
-		if (rep == NULL) {
+		rep = rpcrdma_create_rep(r_xprt);
+		if (IS_ERR(rep)) {
 			dprintk("RPC:       %s: reply buffer %d alloc failed\n",
 				__func__, i);
-			rc = -ENOMEM;
+			rc = PTR_ERR(rep);
 			goto out;
 		}
-		memset(rep, 0, sizeof(struct rpcrdma_rep));
 		buf->rb_recv_bufs[i] = rep;
-		buf->rb_recv_bufs[i]->rr_buffer = buf;
-
-		rc = rpcrdma_register_internal(ia, rep->rr_base,
-				rlen - offsetof(struct rpcrdma_rep, rr_base),
-				&buf->rb_recv_bufs[i]->rr_handle,
-				&buf->rb_recv_bufs[i]->rr_iov);
-		if (rc)
-			goto out;
-
 	}
-	dprintk("RPC:       %s: max_requests %d\n",
-		__func__, buf->rb_max_requests);
-	/* done */
+
 	return 0;
 out:
 	rpcrdma_buffer_destroy(buf);
@@ -1261,6 +1291,27 @@
 }
 
 static void
+rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
+{
+	if (!rep)	/* NULL rep: nothing to release */
+		return;
+	/* Deregister and free the receive regbuf, then the rep itself. */
+	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
+	kfree(rep);
+}
+
+static void
+rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+	if (!req)	/* NULL req: nothing to release */
+		return;
+	/* Deregister and free both regbufs before freeing the req itself. */
+	rpcrdma_free_regbuf(ia, req->rl_sendbuf);	/* no-op when NULL */
+	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);	/* no-op when NULL */
+	kfree(req);
+}
+
+static void
 rpcrdma_destroy_fmrs(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_mw *r;
@@ -1315,18 +1366,10 @@
 	dprintk("RPC:       %s: entering\n", __func__);
 
 	for (i = 0; i < buf->rb_max_requests; i++) {
-		if (buf->rb_recv_bufs && buf->rb_recv_bufs[i]) {
-			rpcrdma_deregister_internal(ia,
-					buf->rb_recv_bufs[i]->rr_handle,
-					&buf->rb_recv_bufs[i]->rr_iov);
-			kfree(buf->rb_recv_bufs[i]);
-		}
-		if (buf->rb_send_bufs && buf->rb_send_bufs[i]) {
-			rpcrdma_deregister_internal(ia,
-					buf->rb_send_bufs[i]->rl_handle,
-					&buf->rb_send_bufs[i]->rl_iov);
-			kfree(buf->rb_send_bufs[i]);
-		}
+		if (buf->rb_recv_bufs)
+			rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]);
+		if (buf->rb_send_bufs)
+			rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]);
 	}
 
 	switch (ia->ri_memreg_strategy) {
@@ -1450,8 +1493,8 @@
 	int i;
 
 	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
-		rpcrdma_buffer_put_mr(&seg->mr_chunk.rl_mw, buf);
-	rpcrdma_buffer_put_mr(&seg1->mr_chunk.rl_mw, buf);
+		rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
+	rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
 }
 
 static void
@@ -1537,7 +1580,7 @@
 			list_add(&r->mw_list, stale);
 			continue;
 		}
-		req->rl_segments[i].mr_chunk.rl_mw = r;
+		req->rl_segments[i].rl_mw = r;
 		if (unlikely(i-- == 0))
 			return req;	/* Success */
 	}
@@ -1559,7 +1602,7 @@
 		r = list_entry(buf->rb_mws.next,
 			       struct rpcrdma_mw, mw_list);
 		list_del(&r->mw_list);
-		req->rl_segments[i].mr_chunk.rl_mw = r;
+		req->rl_segments[i].rl_mw = r;
 		if (unlikely(i-- == 0))
 			return req;	/* Success */
 	}
@@ -1658,8 +1701,6 @@
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
 	unsigned long flags;
 
-	if (req->rl_iov.length == 0)	/* special case xprt_rdma_allocate() */
-		buffers = ((struct rpcrdma_req *) buffers)->rl_buffer;
 	spin_lock_irqsave(&buffers->rb_lock, flags);
 	if (buffers->rb_recv_index < buffers->rb_max_requests) {
 		req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index];
@@ -1688,7 +1729,7 @@
  * Wrappers for internal-use kmalloc memory registration, used by buffer code.
  */
 
-int
+static int
 rpcrdma_register_internal(struct rpcrdma_ia *ia, void *va, int len,
 				struct ib_mr **mrp, struct ib_sge *iov)
 {
@@ -1739,7 +1780,7 @@
 	return rc;
 }
 
-int
+static int
 rpcrdma_deregister_internal(struct rpcrdma_ia *ia,
 				struct ib_mr *mr, struct ib_sge *iov)
 {
@@ -1757,6 +1798,61 @@
 	return rc;
 }
 
+/**
+ * rpcrdma_alloc_regbuf - kmalloc and register memory for SEND/RECV buffers
+ * @ia: controlling rpcrdma_ia
+ * @size: size of the registered payload area to allocate, in bytes
+ * @flags: GFP flags passed to kmalloc()
+ *
+ * Returns a pointer to the private header of an internally registered
+ * area; the registered buffer (rg_base) follows the header. On failure
+ * returns ERR_PTR(-ENOMEM) if kmalloc() fails, else an ERR_PTR carrying
+ * the rpcrdma_register_internal() error. Free with rpcrdma_free_regbuf().
+ *
+ * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
+ * receiving the payload of RDMA RECV operations. regbufs are not used
+ * for RDMA READ/WRITE operations, thus are registered only for LOCAL access.
+ */
+struct rpcrdma_regbuf *
+rpcrdma_alloc_regbuf(struct rpcrdma_ia *ia, size_t size, gfp_t flags)
+{
+	struct rpcrdma_regbuf *rb;
+	int rc;
+
+	rc = -ENOMEM;
+	rb = kmalloc(sizeof(*rb) + size, flags);	/* header + payload */
+	if (rb == NULL)
+		goto out;
+	/* Only the payload at rg_base is registered, not the header. */
+	rb->rg_size = size;
+	rb->rg_owner = NULL;	/* no owning rpcrdma_req recorded here */
+	rc = rpcrdma_register_internal(ia, rb->rg_base, size,
+				       &rb->rg_mr, &rb->rg_iov);
+	if (rc)
+		goto out_free;
+
+	return rb;
+
+out_free:
+	kfree(rb);	/* registration failed: drop the allocation */
+out:
+	return ERR_PTR(rc);
+}
+
+/**
+ * rpcrdma_free_regbuf - deregister and free registered buffer
+ * @ia: controlling rpcrdma_ia
+ * @rb: regbuf to be deregistered and freed (NULL is a no-op)
+ */
+void
+rpcrdma_free_regbuf(struct rpcrdma_ia *ia, struct rpcrdma_regbuf *rb)
+{
+	if (rb) {	/* deregister status is not checked */
+		rpcrdma_deregister_internal(ia, rb->rg_mr, &rb->rg_iov);
+		kfree(rb);	/* header and payload are one allocation */
+	}
+}
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
@@ -1799,7 +1895,7 @@
 			struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_mr_seg *seg1 = seg;
-	struct rpcrdma_mw *mw = seg1->mr_chunk.rl_mw;
+	struct rpcrdma_mw *mw = seg1->rl_mw;
 	struct rpcrdma_frmr *frmr = &mw->r.frmr;
 	struct ib_mr *mr = frmr->fr_mr;
 	struct ib_send_wr fastreg_wr, *bad_wr;
@@ -1888,12 +1984,12 @@
 	struct ib_send_wr invalidate_wr, *bad_wr;
 	int rc;
 
-	seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
+	seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
 
 	memset(&invalidate_wr, 0, sizeof invalidate_wr);
-	invalidate_wr.wr_id = (unsigned long)(void *)seg1->mr_chunk.rl_mw;
+	invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
-	invalidate_wr.ex.invalidate_rkey = seg1->mr_chunk.rl_mw->r.frmr.fr_mr->rkey;
+	invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
 	DECR_CQCOUNT(&r_xprt->rx_ep);
 
 	read_lock(&ia->ri_qplock);
@@ -1903,7 +1999,7 @@
 	read_unlock(&ia->ri_qplock);
 	if (rc) {
 		/* Force rpcrdma_buffer_get() to retry */
-		seg1->mr_chunk.rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
+		seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
 		dprintk("RPC:       %s: failed ib_post_send for invalidate,"
 			" status %i\n", __func__, rc);
 	}
@@ -1935,8 +2031,7 @@
 		    offset_in_page((seg-1)->mr_offset + (seg-1)->mr_len))
 			break;
 	}
-	rc = ib_map_phys_fmr(seg1->mr_chunk.rl_mw->r.fmr,
-				physaddrs, i, seg1->mr_dma);
+	rc = ib_map_phys_fmr(seg1->rl_mw->r.fmr, physaddrs, i, seg1->mr_dma);
 	if (rc) {
 		dprintk("RPC:       %s: failed ib_map_phys_fmr "
 			"%u@0x%llx+%i (%d)... status %i\n", __func__,
@@ -1945,7 +2040,7 @@
 		while (i--)
 			rpcrdma_unmap_one(ia, --seg);
 	} else {
-		seg1->mr_rkey = seg1->mr_chunk.rl_mw->r.fmr->rkey;
+		seg1->mr_rkey = seg1->rl_mw->r.fmr->rkey;
 		seg1->mr_base = seg1->mr_dma + pageoff;
 		seg1->mr_nsegs = i;
 		seg1->mr_len = len;
@@ -1962,7 +2057,7 @@
 	LIST_HEAD(l);
 	int rc;
 
-	list_add(&seg1->mr_chunk.rl_mw->r.fmr->list, &l);
+	list_add(&seg1->rl_mw->r.fmr->list, &l);
 	rc = ib_unmap_fmr(&l);
 	read_lock(&ia->ri_qplock);
 	while (seg1->mr_nsegs--)
@@ -2104,11 +2199,13 @@
 
 	recv_wr.next = NULL;
 	recv_wr.wr_id = (u64) (unsigned long) rep;
-	recv_wr.sg_list = &rep->rr_iov;
+	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 	recv_wr.num_sge = 1;
 
 	ib_dma_sync_single_for_cpu(ia->ri_id->device,
-		rep->rr_iov.addr, rep->rr_iov.length, DMA_BIDIRECTIONAL);
+				   rdmab_addr(rep->rr_rdmabuf),
+				   rdmab_length(rep->rr_rdmabuf),
+				   DMA_BIDIRECTIONAL);
 
 	rc = ib_post_recv(ia->ri_id->qp, &recv_wr, &recv_wr_fail);
 
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index b799041..c9d2a02 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -70,6 +70,9 @@
 	int			ri_async_rc;
 	enum rpcrdma_memreg	ri_memreg_strategy;
 	unsigned int		ri_max_frmr_depth;
+	struct ib_device_attr	ri_devattr;
+	struct ib_qp_attr	ri_qp_attr;
+	struct ib_qp_init_attr	ri_qp_init_attr;
 };
 
 /*
@@ -83,13 +86,9 @@
 	atomic_t		rep_cqcount;
 	int			rep_cqinit;
 	int			rep_connected;
-	struct rpcrdma_ia	*rep_ia;
 	struct ib_qp_init_attr	rep_attr;
 	wait_queue_head_t 	rep_connect_wait;
-	struct ib_sge		rep_pad;	/* holds zeroed pad */
-	struct ib_mr		*rep_pad_mr;	/* holds zeroed pad */
-	void			(*rep_func)(struct rpcrdma_ep *);
-	struct rpc_xprt		*rep_xprt;	/* for rep_func */
+	struct rpcrdma_regbuf	*rep_padbuf;
 	struct rdma_conn_param	rep_remote_cma;
 	struct sockaddr_storage	rep_remote_addr;
 	struct delayed_work	rep_connect_worker;
@@ -106,6 +105,44 @@
 #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
 #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
 
+/* Registered buffer -- registered kmalloc'd memory for RDMA SEND/RECV
+ *
+ * The structure below appears at the front of a large kmalloc'd region;
+ * the registered payload follows at rg_base, aligned to 256 bytes.
+ */
+
+struct rpcrdma_regbuf {
+	size_t			rg_size;	/* size of rg_base, in bytes */
+	struct rpcrdma_req	*rg_owner;	/* see rpcr_to_rdmar() */
+	struct ib_mr		*rg_mr;		/* MR handle for rg_base */
+	struct ib_sge		rg_iov;		/* SGE for rg_base */
+	__be32			rg_base[0] __attribute__ ((aligned(256)));
+};
+
+static inline u64
+rdmab_addr(struct rpcrdma_regbuf *rb)
+{
+	return rb->rg_iov.addr;	/* address field of the regbuf's SGE */
+}
+
+static inline u32
+rdmab_length(struct rpcrdma_regbuf *rb)
+{
+	return rb->rg_iov.length;	/* length field of the regbuf's SGE */
+}
+
+static inline u32
+rdmab_lkey(struct rpcrdma_regbuf *rb)
+{
+	return rb->rg_iov.lkey;	/* lkey field of the regbuf's SGE */
+}
+
+static inline struct rpcrdma_msg *
+rdmab_to_msg(struct rpcrdma_regbuf *rb)
+{
+	return (struct rpcrdma_msg *)rb->rg_base;	/* start of rg_base */
+}
+
 enum rpcrdma_chunktype {
 	rpcrdma_noch = 0,
 	rpcrdma_readch,
@@ -134,22 +171,16 @@
 /* temporary static scatter/gather max */
 #define RPCRDMA_MAX_DATA_SEGS	(64)	/* max scatter/gather */
 #define RPCRDMA_MAX_SEGS 	(RPCRDMA_MAX_DATA_SEGS + 2) /* head+tail = 2 */
-#define MAX_RPCRDMAHDR	(\
-	/* max supported RPC/RDMA header */ \
-	sizeof(struct rpcrdma_msg) + (2 * sizeof(u32)) + \
-	(sizeof(struct rpcrdma_read_chunk) * RPCRDMA_MAX_SEGS) + sizeof(u32))
 
 struct rpcrdma_buffer;
 
 struct rpcrdma_rep {
-	unsigned int	rr_len;		/* actual received reply length */
-	struct rpcrdma_buffer *rr_buffer; /* home base for this structure */
-	struct rpc_xprt	*rr_xprt;	/* needed for request/reply matching */
-	void (*rr_func)(struct rpcrdma_rep *);/* called by tasklet in softint */
-	struct list_head rr_list;	/* tasklet list */
-	struct ib_sge	rr_iov;		/* for posting */
-	struct ib_mr	*rr_handle;	/* handle for mem in rr_iov */
-	char	rr_base[MAX_RPCRDMAHDR]; /* minimal inline receive buffer */
+	unsigned int		rr_len;
+	struct rpcrdma_buffer	*rr_buffer;
+	struct rpc_xprt		*rr_xprt;
+	void			(*rr_func)(struct rpcrdma_rep *);
+	struct list_head	rr_list;
+	struct rpcrdma_regbuf	*rr_rdmabuf;
 };
 
 /*
@@ -211,10 +242,7 @@
  */
 
 struct rpcrdma_mr_seg {		/* chunk descriptors */
-	union {				/* chunk memory handles */
-		struct ib_mr	*rl_mr;		/* if registered directly */
-		struct rpcrdma_mw *rl_mw;	/* if registered from region */
-	} mr_chunk;
+	struct rpcrdma_mw *rl_mw;	/* registered MR */
 	u64		mr_base;	/* registration result */
 	u32		mr_rkey;	/* registration result */
 	u32		mr_len;		/* length of chunk or segment */
@@ -227,22 +255,26 @@
 };
 
 struct rpcrdma_req {
-	size_t 		rl_size;	/* actual length of buffer */
 	unsigned int	rl_niovs;	/* 0, 2 or 4 */
 	unsigned int	rl_nchunks;	/* non-zero if chunks */
 	unsigned int	rl_connect_cookie;	/* retry detection */
 	enum rpcrdma_chunktype	rl_rtype, rl_wtype;
 	struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
 	struct rpcrdma_rep	*rl_reply;/* holder for reply buffer */
-	struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
 	struct ib_sge	rl_send_iov[4];	/* for active requests */
-	struct ib_sge	rl_iov;		/* for posting */
-	struct ib_mr	*rl_handle;	/* handle for mem in rl_iov */
-	char		rl_base[MAX_RPCRDMAHDR]; /* start of actual buffer */
-	__u32 		rl_xdr_buf[0];	/* start of returned rpc rq_buffer */
+	struct rpcrdma_regbuf *rl_rdmabuf;
+	struct rpcrdma_regbuf *rl_sendbuf;
+	struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];
 };
-#define rpcr_to_rdmar(r) \
-	container_of((r)->rq_buffer, struct rpcrdma_req, rl_xdr_buf[0])
+
+static inline struct rpcrdma_req *
+rpcr_to_rdmar(struct rpc_rqst *rqst)	/* rq_buffer must be an rg_base */
+{
+	struct rpcrdma_regbuf *rb = container_of(rqst->rq_buffer,
+						 struct rpcrdma_regbuf,
+						 rg_base[0]);
+	return rb->rg_owner;	/* req recorded in the regbuf header */
+}
 
 /*
  * struct rpcrdma_buffer -- holds list/queue of pre-registered memory for
@@ -252,7 +284,6 @@
  */
 struct rpcrdma_buffer {
 	spinlock_t	rb_lock;	/* protects indexes */
-	atomic_t	rb_credits;	/* most recent server credits */
 	int		rb_max_requests;/* client max requests */
 	struct list_head rb_mws;	/* optional memory windows/fmrs/frmrs */
 	struct list_head rb_all;
@@ -318,16 +349,16 @@
  * during unmount.
  */
 struct rpcrdma_xprt {
-	struct rpc_xprt		xprt;
+	struct rpc_xprt		rx_xprt;
 	struct rpcrdma_ia	rx_ia;
 	struct rpcrdma_ep	rx_ep;
 	struct rpcrdma_buffer	rx_buf;
 	struct rpcrdma_create_data_internal rx_data;
-	struct delayed_work	rdma_connect;
+	struct delayed_work	rx_connect_worker;
 	struct rpcrdma_stats	rx_stats;
 };
 
-#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, xprt)
+#define rpcx_to_rdmax(x) container_of(x, struct rpcrdma_xprt, rx_xprt)
 #define rpcx_to_rdmad(x) (rpcx_to_rdmax(x)->rx_data)
 
 /* Setting this to 0 ensures interoperability with early servers.
@@ -358,9 +389,7 @@
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
-int rpcrdma_buffer_create(struct rpcrdma_buffer *, struct rpcrdma_ep *,
-				struct rpcrdma_ia *,
-				struct rpcrdma_create_data_internal *);
+int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
@@ -368,16 +397,16 @@
 void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_put(struct rpcrdma_rep *);
 
-int rpcrdma_register_internal(struct rpcrdma_ia *, void *, int,
-				struct ib_mr **, struct ib_sge *);
-int rpcrdma_deregister_internal(struct rpcrdma_ia *,
-				struct ib_mr *, struct ib_sge *);
-
 int rpcrdma_register_external(struct rpcrdma_mr_seg *,
 				int, int, struct rpcrdma_xprt *);
 int rpcrdma_deregister_external(struct rpcrdma_mr_seg *,
 				struct rpcrdma_xprt *);
 
+struct rpcrdma_regbuf *rpcrdma_alloc_regbuf(struct rpcrdma_ia *,
+					    size_t, gfp_t);
+void rpcrdma_free_regbuf(struct rpcrdma_ia *,
+			 struct rpcrdma_regbuf *);
+
 /*
  * RPC/RDMA connection management calls - xprtrdma/rpc_rdma.c
  */