Merge tag 'nfs-for-4.2-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs

Pull NFS client updates from Trond Myklebust:
 "Highlights include:

  Stable patches:
   - Fix a crash in the NFSv4 file locking code.
   - Fix an fsync() regression, where we were failing to retry I/O in
     some circumstances.
   - Fix an infinite loop in NFSv4.0 OPEN stateid recovery
   - Fix a memory leak when an attempted pnfs fails.
   - Fix a memory leak in the backchannel code
   - Large hostnames were not supported correctly in NFSv4.1
   - Fix a pNFS/flexfiles bug that was impeding error reporting on I/O.
   - Fix a couple of credential issues in pNFS/flexfiles

  Bugfixes + cleanups:
   - Open flag sanity checks in the NFSv4 atomic open codepath
   - More NFSv4 delegation related bugfixes
   - Various NFSv4.1 backchannel bugfixes and cleanups
   - Fix the NFS swap socket code
   - Various cleanups of the NFSv4 SETCLIENTID and EXCHANGE_ID code
   - Fix a UDP transport deadlock issue

  Features:
   - More RDMA client transport improvements
   - NFSv4.2 LAYOUTSTATS functionality for pnfs flexfiles"

* tag 'nfs-for-4.2-1' of git://git.linux-nfs.org/projects/trondmy/linux-nfs: (87 commits)
  nfs: Remove invalid tk_pid from debug message
  nfs: Remove invalid NFS_ATTR_FATTR_V4_REFERRAL checking in nfs4_get_rootfh
  nfs: Drop bad comment in nfs41_walk_client_list()
  nfs: Remove unneeded micro checking of CONFIG_PROC_FS
  nfs: Don't setting FILE_CREATED flags always
  nfs: Use remove_proc_subtree() instead remove_proc_entry()
  nfs: Remove unused argument in nfs_server_set_fsinfo()
  nfs: Fix a memory leak when meeting an unsupported state protect
  nfs: take extra reference to fl->fl_file when running a LOCKU operation
  NFSv4: When returning a delegation, don't reclaim an incompatible open mode.
  NFSv4.2: LAYOUTSTATS is optional to implement
  NFSv4.2: Fix up a decoding error in layoutstats
  pNFS/flexfiles: Fix the reset of struct pgio_header when resending
  pNFS/flexfiles: Turn off layoutcommit for servers that don't need it
  pnfs/flexfiles: protect ktime manipulation with mirror lock
  nfs: provide pnfs_report_layoutstat when NFS42 is disabled
  nfs: verify open flags before allowing open
  nfs: always update creds in mirror, even when we have an already connected ds
  nfs: fix potential credential leak in ff_layout_update_mirror_cred
  pnfs/flexfiles: report layoutstat regularly
  ...
diff --git a/fs/nfs/callback.c b/fs/nfs/callback.c
index 8d129bb..682529c 100644
--- a/fs/nfs/callback.c
+++ b/fs/nfs/callback.c
@@ -458,7 +458,7 @@
  * pg_authenticate method for nfsv4 callback threads.
  *
  * The authflavor has been negotiated, so an incorrect flavor is a server
- * bug. Drop packets with incorrect authflavor.
+ * bug. Deny packets with incorrect authflavor.
  *
  * All other checking done after NFS decoding where the nfs_client can be
  * found in nfs4_callback_compound
@@ -468,12 +468,12 @@
 	switch (rqstp->rq_authop->flavour) {
 	case RPC_AUTH_NULL:
 		if (rqstp->rq_proc != CB_NULL)
-			return SVC_DROP;
+			return SVC_DENIED;
 		break;
 	case RPC_AUTH_GSS:
 		/* No RPC_AUTH_GSS support yet in NFSv4.1 */
 		 if (svc_is_backchannel(rqstp))
-			return SVC_DROP;
+			return SVC_DENIED;
 	}
 	return SVC_OK;
 }
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index 197806f..29e3c1b 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -327,10 +327,8 @@
 	dprintk("%s slot table seqid: %u\n", __func__, slot->seq_nr);
 
 	/* Normal */
-	if (likely(args->csa_sequenceid == slot->seq_nr + 1)) {
-		slot->seq_nr++;
+	if (likely(args->csa_sequenceid == slot->seq_nr + 1))
 		goto out_ok;
-	}
 
 	/* Replay */
 	if (args->csa_sequenceid == slot->seq_nr) {
@@ -418,6 +416,7 @@
 			      struct cb_process_state *cps)
 {
 	struct nfs4_slot_table *tbl;
+	struct nfs4_slot *slot;
 	struct nfs_client *clp;
 	int i;
 	__be32 status = htonl(NFS4ERR_BADSESSION);
@@ -429,25 +428,32 @@
 
 	if (!(clp->cl_session->flags & SESSION4_BACK_CHAN))
 		goto out;
+
 	tbl = &clp->cl_session->bc_slot_table;
+	slot = tbl->slots + args->csa_slotid;
 
 	spin_lock(&tbl->slot_tbl_lock);
 	/* state manager is resetting the session */
 	if (test_bit(NFS4_SLOT_TBL_DRAINING, &tbl->slot_tbl_state)) {
-		spin_unlock(&tbl->slot_tbl_lock);
 		status = htonl(NFS4ERR_DELAY);
 		/* Return NFS4ERR_BADSESSION if we're draining the session
 		 * in order to reset it.
 		 */
 		if (test_bit(NFS4CLNT_SESSION_RESET, &clp->cl_state))
 			status = htonl(NFS4ERR_BADSESSION);
-		goto out;
+		goto out_unlock;
 	}
 
-	status = validate_seqid(&clp->cl_session->bc_slot_table, args);
-	spin_unlock(&tbl->slot_tbl_lock);
+	memcpy(&res->csr_sessionid, &args->csa_sessionid,
+	       sizeof(res->csr_sessionid));
+	res->csr_sequenceid = args->csa_sequenceid;
+	res->csr_slotid = args->csa_slotid;
+	res->csr_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
+	res->csr_target_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
+
+	status = validate_seqid(tbl, args);
 	if (status)
-		goto out;
+		goto out_unlock;
 
 	cps->slotid = args->csa_slotid;
 
@@ -458,15 +464,17 @@
 	 */
 	if (referring_call_exists(clp, args->csa_nrclists, args->csa_rclists)) {
 		status = htonl(NFS4ERR_DELAY);
-		goto out;
+		goto out_unlock;
 	}
 
-	memcpy(&res->csr_sessionid, &args->csa_sessionid,
-	       sizeof(res->csr_sessionid));
-	res->csr_sequenceid = args->csa_sequenceid;
-	res->csr_slotid = args->csa_slotid;
-	res->csr_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
-	res->csr_target_highestslotid = NFS41_BC_MAX_CALLBACKS - 1;
+	/*
+	 * RFC5661 20.9.3
+	 * If CB_SEQUENCE returns an error, then the state of the slot
+	 * (sequence ID, cached reply) MUST NOT change.
+	 */
+	slot->seq_nr++;
+out_unlock:
+	spin_unlock(&tbl->slot_tbl_lock);
 
 out:
 	cps->clp = clp; /* put in nfs4_callback_compound */
diff --git a/fs/nfs/callback_xdr.c b/fs/nfs/callback_xdr.c
index 19ca95c..6b1697a 100644
--- a/fs/nfs/callback_xdr.c
+++ b/fs/nfs/callback_xdr.c
@@ -909,7 +909,7 @@
 	xdr_init_encode(&xdr_out, &rqstp->rq_res, p);
 
 	status = decode_compound_hdr_arg(&xdr_in, &hdr_arg);
-	if (status == __constant_htonl(NFS4ERR_RESOURCE))
+	if (status == htonl(NFS4ERR_RESOURCE))
 		return rpc_garbage_args;
 
 	if (hdr_arg.minorversion == 0) {
diff --git a/fs/nfs/client.c b/fs/nfs/client.c
index 892aeff..ecebb40 100644
--- a/fs/nfs/client.c
+++ b/fs/nfs/client.c
@@ -825,7 +825,6 @@
  * Load up the server record from information gained in an fsinfo record
  */
 static void nfs_server_set_fsinfo(struct nfs_server *server,
-				  struct nfs_fh *mntfh,
 				  struct nfs_fsinfo *fsinfo)
 {
 	unsigned long max_rpc_payload;
@@ -901,7 +900,7 @@
 	if (error < 0)
 		goto out_error;
 
-	nfs_server_set_fsinfo(server, mntfh, &fsinfo);
+	nfs_server_set_fsinfo(server, &fsinfo);
 
 	/* Get some general file system info */
 	if (server->namelen == 0) {
@@ -1193,8 +1192,6 @@
 }
 
 #ifdef CONFIG_PROC_FS
-static struct proc_dir_entry *proc_fs_nfs;
-
 static int nfs_server_list_open(struct inode *inode, struct file *file);
 static void *nfs_server_list_start(struct seq_file *p, loff_t *pos);
 static void *nfs_server_list_next(struct seq_file *p, void *v, loff_t *pos);
@@ -1364,27 +1361,29 @@
 {
 	struct nfs_server *server;
 	struct nfs_client *clp;
-	char dev[8], fsid[17];
+	char dev[13];	// 8 for 2^24, 1 for ':', 3 for 2^8, 1 for '\0'
+	char fsid[34];	// 2 * 16 for %llx, 1 for ':', 1 for '\0'
 	struct nfs_net *nn = net_generic(seq_file_net(m), nfs_net_id);
 
 	/* display header on line 1 */
 	if (v == &nn->nfs_volume_list) {
-		seq_puts(m, "NV SERVER   PORT DEV     FSID              FSC\n");
+		seq_puts(m, "NV SERVER   PORT DEV          FSID"
+			    "                              FSC\n");
 		return 0;
 	}
 	/* display one transport per line on subsequent lines */
 	server = list_entry(v, struct nfs_server, master_link);
 	clp = server->nfs_client;
 
-	snprintf(dev, 8, "%u:%u",
+	snprintf(dev, sizeof(dev), "%u:%u",
 		 MAJOR(server->s_dev), MINOR(server->s_dev));
 
-	snprintf(fsid, 17, "%llx:%llx",
+	snprintf(fsid, sizeof(fsid), "%llx:%llx",
 		 (unsigned long long) server->fsid.major,
 		 (unsigned long long) server->fsid.minor);
 
 	rcu_read_lock();
-	seq_printf(m, "v%u %s %s %-7s %-17s %s\n",
+	seq_printf(m, "v%u %s %s %-12s %-33s %s\n",
 		   clp->rpc_ops->version,
 		   rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_HEX_ADDR),
 		   rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_HEX_PORT),
@@ -1434,27 +1433,20 @@
  */
 int __init nfs_fs_proc_init(void)
 {
-	struct proc_dir_entry *p;
-
-	proc_fs_nfs = proc_mkdir("fs/nfsfs", NULL);
-	if (!proc_fs_nfs)
+	if (!proc_mkdir("fs/nfsfs", NULL))
 		goto error_0;
 
 	/* a file of servers with which we're dealing */
-	p = proc_symlink("servers", proc_fs_nfs, "../../net/nfsfs/servers");
-	if (!p)
+	if (!proc_symlink("fs/nfsfs/servers", NULL, "../../net/nfsfs/servers"))
 		goto error_1;
 
 	/* a file of volumes that we have mounted */
-	p = proc_symlink("volumes", proc_fs_nfs, "../../net/nfsfs/volumes");
-	if (!p)
-		goto error_2;
-	return 0;
+	if (!proc_symlink("fs/nfsfs/volumes", NULL, "../../net/nfsfs/volumes"))
+		goto error_1;
 
-error_2:
-	remove_proc_entry("servers", proc_fs_nfs);
+	return 0;
 error_1:
-	remove_proc_entry("fs/nfsfs", NULL);
+	remove_proc_subtree("fs/nfsfs", NULL);
 error_0:
 	return -ENOMEM;
 }
@@ -1464,9 +1456,7 @@
  */
 void nfs_fs_proc_exit(void)
 {
-	remove_proc_entry("volumes", proc_fs_nfs);
-	remove_proc_entry("servers", proc_fs_nfs);
-	remove_proc_entry("fs/nfsfs", NULL);
+	remove_proc_subtree("fs/nfsfs", NULL);
 }
 
 #endif /* CONFIG_PROC_FS */
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index b2c8b31..21457bb 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -1470,9 +1470,6 @@
 {
 	int err;
 
-	if ((open_flags & (O_CREAT | O_EXCL)) == (O_CREAT | O_EXCL))
-		*opened |= FILE_CREATED;
-
 	err = finish_open(file, dentry, do_open, opened);
 	if (err)
 		goto out;
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index 8b8d83a..cc4fa1e 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -555,31 +555,22 @@
 	return nfs_wb_page(inode, page);
 }
 
-#ifdef CONFIG_NFS_SWAP
 static int nfs_swap_activate(struct swap_info_struct *sis, struct file *file,
 						sector_t *span)
 {
-	int ret;
 	struct rpc_clnt *clnt = NFS_CLIENT(file->f_mapping->host);
 
 	*span = sis->pages;
 
-	rcu_read_lock();
-	ret = xs_swapper(rcu_dereference(clnt->cl_xprt), 1);
-	rcu_read_unlock();
-
-	return ret;
+	return rpc_clnt_swap_activate(clnt);
 }
 
 static void nfs_swap_deactivate(struct file *file)
 {
 	struct rpc_clnt *clnt = NFS_CLIENT(file->f_mapping->host);
 
-	rcu_read_lock();
-	xs_swapper(rcu_dereference(clnt->cl_xprt), 0);
-	rcu_read_unlock();
+	rpc_clnt_swap_deactivate(clnt);
 }
-#endif
 
 const struct address_space_operations nfs_file_aops = {
 	.readpage = nfs_readpage,
@@ -596,10 +587,8 @@
 	.launder_page = nfs_launder_page,
 	.is_dirty_writeback = nfs_check_dirty_writeback,
 	.error_remove_page = generic_error_remove_page,
-#ifdef CONFIG_NFS_SWAP
 	.swap_activate = nfs_swap_activate,
 	.swap_deactivate = nfs_swap_deactivate,
-#endif
 };
 
 /*
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.c b/fs/nfs/flexfilelayout/flexfilelayout.c
index 7d05089..c12951b 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.c
+++ b/fs/nfs/flexfilelayout/flexfilelayout.c
@@ -20,6 +20,7 @@
 #include "../nfs4trace.h"
 #include "../iostat.h"
 #include "../nfs.h"
+#include "../nfs42.h"
 
 #define NFSDBG_FACILITY         NFSDBG_PNFS_LD
 
@@ -182,17 +183,14 @@
 
 static void ff_layout_sort_mirrors(struct nfs4_ff_layout_segment *fls)
 {
-	struct nfs4_ff_layout_mirror *tmp;
 	int i, j;
 
 	for (i = 0; i < fls->mirror_array_cnt - 1; i++) {
 		for (j = i + 1; j < fls->mirror_array_cnt; j++)
 			if (fls->mirror_array[i]->efficiency <
-			    fls->mirror_array[j]->efficiency) {
-				tmp = fls->mirror_array[i];
-				fls->mirror_array[i] = fls->mirror_array[j];
-				fls->mirror_array[j] = tmp;
-			}
+			    fls->mirror_array[j]->efficiency)
+				swap(fls->mirror_array[i],
+				     fls->mirror_array[j]);
 	}
 }
 
@@ -274,6 +272,7 @@
 
 		spin_lock_init(&fls->mirror_array[i]->lock);
 		fls->mirror_array[i]->ds_count = ds_count;
+		fls->mirror_array[i]->lseg = &fls->generic_hdr;
 
 		/* deviceid */
 		rc = decode_deviceid(&stream, &devid);
@@ -344,6 +343,10 @@
 			fls->mirror_array[i]->gid);
 	}
 
+	p = xdr_inline_decode(&stream, 4);
+	if (p)
+		fls->flags = be32_to_cpup(p);
+
 	ff_layout_sort_mirrors(fls);
 	rc = ff_layout_check_layout(lgr);
 	if (rc)
@@ -415,6 +418,146 @@
 	return 1;
 }
 
+static void
+nfs4_ff_start_busy_timer(struct nfs4_ff_busy_timer *timer)
+{
+	/* first IO request? */
+	if (atomic_inc_return(&timer->n_ops) == 1) {
+		timer->start_time = ktime_get();
+	}
+}
+
+static ktime_t
+nfs4_ff_end_busy_timer(struct nfs4_ff_busy_timer *timer)
+{
+	ktime_t start, now;
+
+	if (atomic_dec_return(&timer->n_ops) < 0)
+		WARN_ON_ONCE(1);
+
+	now = ktime_get();
+	start = timer->start_time;
+	timer->start_time = now;
+	return ktime_sub(now, start);
+}
+
+static ktime_t
+nfs4_ff_layout_calc_completion_time(struct rpc_task *task)
+{
+	return ktime_sub(ktime_get(), task->tk_start);
+}
+
+static bool
+nfs4_ff_layoutstat_start_io(struct nfs4_ff_layout_mirror *mirror,
+			    struct nfs4_ff_layoutstat *layoutstat)
+{
+	static const ktime_t notime = {0};
+	ktime_t now = ktime_get();
+
+	nfs4_ff_start_busy_timer(&layoutstat->busy_timer);
+	if (ktime_equal(mirror->start_time, notime))
+		mirror->start_time = now;
+	if (ktime_equal(mirror->last_report_time, notime))
+		mirror->last_report_time = now;
+	if (ktime_to_ms(ktime_sub(now, mirror->last_report_time)) >=
+			FF_LAYOUTSTATS_REPORT_INTERVAL) {
+		mirror->last_report_time = now;
+		return true;
+	}
+
+	return false;
+}
+
+static void
+nfs4_ff_layout_stat_io_update_requested(struct nfs4_ff_layoutstat *layoutstat,
+		__u64 requested)
+{
+	struct nfs4_ff_io_stat *iostat = &layoutstat->io_stat;
+
+	iostat->ops_requested++;
+	iostat->bytes_requested += requested;
+}
+
+static void
+nfs4_ff_layout_stat_io_update_completed(struct nfs4_ff_layoutstat *layoutstat,
+		__u64 requested,
+		__u64 completed,
+		ktime_t time_completed)
+{
+	struct nfs4_ff_io_stat *iostat = &layoutstat->io_stat;
+	ktime_t timer;
+
+	iostat->ops_completed++;
+	iostat->bytes_completed += completed;
+	iostat->bytes_not_delivered += requested - completed;
+
+	timer = nfs4_ff_end_busy_timer(&layoutstat->busy_timer);
+	iostat->total_busy_time =
+			ktime_add(iostat->total_busy_time, timer);
+	iostat->aggregate_completion_time =
+			ktime_add(iostat->aggregate_completion_time, time_completed);
+}
+
+static void
+nfs4_ff_layout_stat_io_start_read(struct nfs4_ff_layout_mirror *mirror,
+		__u64 requested)
+{
+	bool report;
+
+	spin_lock(&mirror->lock);
+	report = nfs4_ff_layoutstat_start_io(mirror, &mirror->read_stat);
+	nfs4_ff_layout_stat_io_update_requested(&mirror->read_stat, requested);
+	spin_unlock(&mirror->lock);
+
+	if (report)
+		pnfs_report_layoutstat(mirror->lseg->pls_layout->plh_inode);
+}
+
+static void
+nfs4_ff_layout_stat_io_end_read(struct rpc_task *task,
+		struct nfs4_ff_layout_mirror *mirror,
+		__u64 requested,
+		__u64 completed)
+{
+	spin_lock(&mirror->lock);
+	nfs4_ff_layout_stat_io_update_completed(&mirror->read_stat,
+			requested, completed,
+			nfs4_ff_layout_calc_completion_time(task));
+	spin_unlock(&mirror->lock);
+}
+
+static void
+nfs4_ff_layout_stat_io_start_write(struct nfs4_ff_layout_mirror *mirror,
+		__u64 requested)
+{
+	bool report;
+
+	spin_lock(&mirror->lock);
+	report = nfs4_ff_layoutstat_start_io(mirror , &mirror->write_stat);
+	nfs4_ff_layout_stat_io_update_requested(&mirror->write_stat, requested);
+	spin_unlock(&mirror->lock);
+
+	if (report)
+		pnfs_report_layoutstat(mirror->lseg->pls_layout->plh_inode);
+}
+
+static void
+nfs4_ff_layout_stat_io_end_write(struct rpc_task *task,
+		struct nfs4_ff_layout_mirror *mirror,
+		__u64 requested,
+		__u64 completed,
+		enum nfs3_stable_how committed)
+{
+	if (committed == NFS_UNSTABLE)
+		requested = completed = 0;
+
+	spin_lock(&mirror->lock);
+	nfs4_ff_layout_stat_io_update_completed(&mirror->write_stat,
+			requested, completed,
+			nfs4_ff_layout_calc_completion_time(task));
+	spin_unlock(&mirror->lock);
+}
+
 static int
 ff_layout_alloc_commit_info(struct pnfs_layout_segment *lseg,
 			    struct nfs_commit_info *cinfo,
@@ -631,7 +774,7 @@
 			nfs_direct_set_resched_writes(hdr->dreq);
 			/* fake unstable write to let common nfs resend pages */
 			hdr->verf.committed = NFS_UNSTABLE;
-			hdr->good_bytes = 0;
+			hdr->good_bytes = hdr->args.count;
 		}
 		return;
 	}
@@ -879,6 +1022,12 @@
 	return 0;
 }
 
+static bool
+ff_layout_need_layoutcommit(struct pnfs_layout_segment *lseg)
+{
+	return !(FF_LAYOUT_LSEG(lseg)->flags & FF_FLAGS_NO_LAYOUTCOMMIT);
+}
+
 /*
  * We reference the rpc_cred of the first WRITE that triggers the need for
  * a LAYOUTCOMMIT, and use it to send the layoutcommit compound.
@@ -891,6 +1040,9 @@
 static void
 ff_layout_set_layoutcommit(struct nfs_pgio_header *hdr)
 {
+	if (!ff_layout_need_layoutcommit(hdr->lseg))
+		return;
+
 	pnfs_set_layoutcommit(hdr->inode, hdr->lseg,
 			hdr->mds_offset + hdr->res.count);
 	dprintk("%s inode %lu pls_end_pos %lu\n", __func__, hdr->inode->i_ino,
@@ -909,6 +1061,10 @@
 static int ff_layout_read_prepare_common(struct rpc_task *task,
 					 struct nfs_pgio_header *hdr)
 {
+	nfs4_ff_layout_stat_io_start_read(
+			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
+			hdr->args.count);
+
 	if (unlikely(test_bit(NFS_CONTEXT_BAD, &hdr->args.context->flags))) {
 		rpc_exit(task, -EIO);
 		return -EIO;
@@ -962,15 +1118,15 @@
 {
 	struct nfs_pgio_header *hdr = data;
 
-	if (ff_layout_read_prepare_common(task, hdr))
-		return;
-
 	if (ff_layout_setup_sequence(hdr->ds_clp,
 				     &hdr->args.seq_args,
 				     &hdr->res.seq_res,
 				     task))
 		return;
 
+	if (ff_layout_read_prepare_common(task, hdr))
+		return;
+
 	if (nfs4_set_rw_stateid(&hdr->args.stateid, hdr->args.context,
 			hdr->args.lock_context, FMODE_READ) == -EIO)
 		rpc_exit(task, -EIO); /* lost lock, terminate I/O */
@@ -982,6 +1138,10 @@
 
 	dprintk("--> %s task->tk_status %d\n", __func__, task->tk_status);
 
+	nfs4_ff_layout_stat_io_end_read(task,
+			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
+			hdr->args.count, hdr->res.count);
+
 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags) &&
 	    task->tk_status == 0) {
 		nfs4_sequence_done(task, &hdr->res.seq_res);
@@ -1074,7 +1234,8 @@
 		return -EAGAIN;
 	}
 
-	if (data->verf.committed == NFS_UNSTABLE)
+	if (data->verf.committed == NFS_UNSTABLE
+	    && ff_layout_need_layoutcommit(data->lseg))
 		pnfs_set_layoutcommit(data->inode, data->lseg, data->lwb);
 
 	return 0;
@@ -1083,6 +1244,10 @@
 static int ff_layout_write_prepare_common(struct rpc_task *task,
 					  struct nfs_pgio_header *hdr)
 {
+	nfs4_ff_layout_stat_io_start_write(
+			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
+			hdr->args.count);
+
 	if (unlikely(test_bit(NFS_CONTEXT_BAD, &hdr->args.context->flags))) {
 		rpc_exit(task, -EIO);
 		return -EIO;
@@ -1116,15 +1281,15 @@
 {
 	struct nfs_pgio_header *hdr = data;
 
-	if (ff_layout_write_prepare_common(task, hdr))
-		return;
-
 	if (ff_layout_setup_sequence(hdr->ds_clp,
 				     &hdr->args.seq_args,
 				     &hdr->res.seq_res,
 				     task))
 		return;
 
+	if (ff_layout_write_prepare_common(task, hdr))
+		return;
+
 	if (nfs4_set_rw_stateid(&hdr->args.stateid, hdr->args.context,
 			hdr->args.lock_context, FMODE_WRITE) == -EIO)
 		rpc_exit(task, -EIO); /* lost lock, terminate I/O */
@@ -1134,6 +1299,11 @@
 {
 	struct nfs_pgio_header *hdr = data;
 
+	nfs4_ff_layout_stat_io_end_write(task,
+			FF_LAYOUT_COMP(hdr->lseg, hdr->pgio_mirror_idx),
+			hdr->args.count, hdr->res.count,
+			hdr->res.verf->committed);
+
 	if (test_bit(NFS_IOHDR_REDO, &hdr->flags) &&
 	    task->tk_status == 0) {
 		nfs4_sequence_done(task, &hdr->res.seq_res);
@@ -1152,8 +1322,17 @@
 	    &NFS_CLIENT(hdr->inode)->cl_metrics[NFSPROC4_CLNT_WRITE]);
 }
 
+static void ff_layout_commit_prepare_common(struct rpc_task *task,
+		struct nfs_commit_data *cdata)
+{
+	nfs4_ff_layout_stat_io_start_write(
+			FF_LAYOUT_COMP(cdata->lseg, cdata->ds_commit_index),
+			0);
+}
+
 static void ff_layout_commit_prepare_v3(struct rpc_task *task, void *data)
 {
+	ff_layout_commit_prepare_common(task, data);
 	rpc_call_start(task);
 }
 
@@ -1161,10 +1340,30 @@
 {
 	struct nfs_commit_data *wdata = data;
 
-	ff_layout_setup_sequence(wdata->ds_clp,
+	if (ff_layout_setup_sequence(wdata->ds_clp,
 				 &wdata->args.seq_args,
 				 &wdata->res.seq_res,
-				 task);
+				 task))
+		return;
+	ff_layout_commit_prepare_common(task, data);
+}
+
+static void ff_layout_commit_done(struct rpc_task *task, void *data)
+{
+	struct nfs_commit_data *cdata = data;
+	struct nfs_page *req;
+	__u64 count = 0;
+
+	if (task->tk_status == 0) {
+		list_for_each_entry(req, &cdata->pages, wb_list)
+			count += req->wb_bytes;
+	}
+
+	nfs4_ff_layout_stat_io_end_write(task,
+			FF_LAYOUT_COMP(cdata->lseg, cdata->ds_commit_index),
+			count, count, NFS_FILE_SYNC);
+
+	pnfs_generic_write_commit_done(task, data);
 }
 
 static void ff_layout_commit_count_stats(struct rpc_task *task, void *data)
@@ -1205,14 +1404,14 @@
 
 static const struct rpc_call_ops ff_layout_commit_call_ops_v3 = {
 	.rpc_call_prepare = ff_layout_commit_prepare_v3,
-	.rpc_call_done = pnfs_generic_write_commit_done,
+	.rpc_call_done = ff_layout_commit_done,
 	.rpc_count_stats = ff_layout_commit_count_stats,
 	.rpc_release = pnfs_generic_commit_release,
 };
 
 static const struct rpc_call_ops ff_layout_commit_call_ops_v4 = {
 	.rpc_call_prepare = ff_layout_commit_prepare_v4,
-	.rpc_call_done = pnfs_generic_write_commit_done,
+	.rpc_call_done = ff_layout_commit_done,
 	.rpc_count_stats = ff_layout_commit_count_stats,
 	.rpc_release = pnfs_generic_commit_release,
 };
@@ -1256,7 +1455,6 @@
 	fh = nfs4_ff_layout_select_ds_fh(lseg, idx);
 	if (fh)
 		hdr->args.fh = fh;
-
 	/*
 	 * Note that if we ever decide to split across DSes,
 	 * then we may need to handle dense-like offsets.
@@ -1385,6 +1583,7 @@
 	fh = select_ds_fh_from_commit(lseg, data->ds_commit_index);
 	if (fh)
 		data->args.fh = fh;
+
 	return nfs_initiate_commit(ds_clnt, data, ds->ds_clp->rpc_ops,
 				   vers == 3 ? &ff_layout_commit_call_ops_v3 :
 					       &ff_layout_commit_call_ops_v4,
@@ -1488,6 +1687,247 @@
 	dprintk("%s: Return\n", __func__);
 }
 
+static int
+ff_layout_ntop4(const struct sockaddr *sap, char *buf, const size_t buflen)
+{
+	const struct sockaddr_in *sin = (struct sockaddr_in *)sap;
+
+	return snprintf(buf, buflen, "%pI4", &sin->sin_addr);
+}
+
+static size_t
+ff_layout_ntop6_noscopeid(const struct sockaddr *sap, char *buf,
+			  const int buflen)
+{
+	const struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)sap;
+	const struct in6_addr *addr = &sin6->sin6_addr;
+
+	/*
+	 * RFC 4291, Section 2.2.2
+	 *
+	 * Shorthanded ANY address
+	 */
+	if (ipv6_addr_any(addr))
+		return snprintf(buf, buflen, "::");
+
+	/*
+	 * RFC 4291, Section 2.2.2
+	 *
+	 * Shorthanded loopback address
+	 */
+	if (ipv6_addr_loopback(addr))
+		return snprintf(buf, buflen, "::1");
+
+	/*
+	 * RFC 4291, Section 2.2.3
+	 *
+	 * Special presentation address format for mapped v4
+	 * addresses.
+	 */
+	if (ipv6_addr_v4mapped(addr))
+		return snprintf(buf, buflen, "::ffff:%pI4",
+					&addr->s6_addr32[3]);
+
+	/*
+	 * RFC 4291, Section 2.2.1
+	 */
+	return snprintf(buf, buflen, "%pI6c", addr);
+}
+
+/* Derived from rpc_sockaddr2uaddr */
+static void
+ff_layout_encode_netaddr(struct xdr_stream *xdr, struct nfs4_pnfs_ds_addr *da)
+{
+	struct sockaddr *sap = (struct sockaddr *)&da->da_addr;
+	char portbuf[RPCBIND_MAXUADDRPLEN];
+	char addrbuf[RPCBIND_MAXUADDRLEN];
+	char *netid;
+	unsigned short port;
+	int len, netid_len;
+	__be32 *p;
+
+	switch (sap->sa_family) {
+	case AF_INET:
+		if (ff_layout_ntop4(sap, addrbuf, sizeof(addrbuf)) == 0)
+			return;
+		port = ntohs(((struct sockaddr_in *)sap)->sin_port);
+		netid = "tcp";
+		netid_len = 3;
+		break;
+	case AF_INET6:
+		if (ff_layout_ntop6_noscopeid(sap, addrbuf, sizeof(addrbuf)) == 0)
+			return;
+		port = ntohs(((struct sockaddr_in6 *)sap)->sin6_port);
+		netid = "tcp6";
+		netid_len = 4;
+		break;
+	default:
+		/* we only support tcp and tcp6 */
+		WARN_ON_ONCE(1);
+		return;
+	}
+
+	snprintf(portbuf, sizeof(portbuf), ".%u.%u", port >> 8, port & 0xff);
+	len = strlcat(addrbuf, portbuf, sizeof(addrbuf));
+
+	p = xdr_reserve_space(xdr, 4 + netid_len);
+	xdr_encode_opaque(p, netid, netid_len);
+
+	p = xdr_reserve_space(xdr, 4 + len);
+	xdr_encode_opaque(p, addrbuf, len);
+}
+
+static void
+ff_layout_encode_nfstime(struct xdr_stream *xdr,
+			 ktime_t t)
+{
+	struct timespec64 ts;
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, 12);
+	ts = ktime_to_timespec64(t);
+	p = xdr_encode_hyper(p, ts.tv_sec);
+	*p++ = cpu_to_be32(ts.tv_nsec);
+}
+
+static void
+ff_layout_encode_io_latency(struct xdr_stream *xdr,
+			    struct nfs4_ff_io_stat *stat)
+{
+	__be32 *p;
+
+	p = xdr_reserve_space(xdr, 5 * 8);
+	p = xdr_encode_hyper(p, stat->ops_requested);
+	p = xdr_encode_hyper(p, stat->bytes_requested);
+	p = xdr_encode_hyper(p, stat->ops_completed);
+	p = xdr_encode_hyper(p, stat->bytes_completed);
+	p = xdr_encode_hyper(p, stat->bytes_not_delivered);
+	ff_layout_encode_nfstime(xdr, stat->total_busy_time);
+	ff_layout_encode_nfstime(xdr, stat->aggregate_completion_time);
+}
+
+static void
+ff_layout_encode_layoutstats(struct xdr_stream *xdr,
+			     struct nfs42_layoutstat_args *args,
+			     struct nfs42_layoutstat_devinfo *devinfo)
+{
+	struct nfs4_ff_layout_mirror *mirror = devinfo->layout_private;
+	struct nfs4_pnfs_ds_addr *da;
+	struct nfs4_pnfs_ds *ds = mirror->mirror_ds->ds;
+	struct nfs_fh *fh = &mirror->fh_versions[0];
+	__be32 *p, *start;
+
+	da = list_first_entry(&ds->ds_addrs, struct nfs4_pnfs_ds_addr, da_node);
+	dprintk("%s: DS %s: encoding address %s\n",
+		__func__, ds->ds_remotestr, da->da_remotestr);
+	/* layoutupdate length */
+	start = xdr_reserve_space(xdr, 4);
+	/* netaddr4 */
+	ff_layout_encode_netaddr(xdr, da);
+	/* nfs_fh4 */
+	p = xdr_reserve_space(xdr, 4 + fh->size);
+	xdr_encode_opaque(p, fh->data, fh->size);
+	/* ff_io_latency4 read */
+	spin_lock(&mirror->lock);
+	ff_layout_encode_io_latency(xdr, &mirror->read_stat.io_stat);
+	/* ff_io_latency4 write */
+	ff_layout_encode_io_latency(xdr, &mirror->write_stat.io_stat);
+	spin_unlock(&mirror->lock);
+	/* nfstime4 */
+	ff_layout_encode_nfstime(xdr, ktime_sub(ktime_get(), mirror->start_time));
+	/* bool */
+	p = xdr_reserve_space(xdr, 4);
+	*p = cpu_to_be32(false);
+
+	*start = cpu_to_be32((xdr->p - start - 1) * 4);
+}
+
+static bool
+ff_layout_mirror_prepare_stats(struct nfs42_layoutstat_args *args,
+			       struct pnfs_layout_segment *pls,
+			       int *dev_count, int dev_limit)
+{
+	struct nfs4_ff_layout_mirror *mirror;
+	struct nfs4_deviceid_node *dev;
+	struct nfs42_layoutstat_devinfo *devinfo;
+	int i;
+
+	for (i = 0; i <= FF_LAYOUT_MIRROR_COUNT(pls); i++) {
+		if (*dev_count >= dev_limit)
+			break;
+		mirror = FF_LAYOUT_COMP(pls, i);
+		if (!mirror || !mirror->mirror_ds)
+			continue;
+		dev = FF_LAYOUT_DEVID_NODE(pls, i);
+		devinfo = &args->devinfo[*dev_count];
+		memcpy(&devinfo->dev_id, &dev->deviceid, NFS4_DEVICEID4_SIZE);
+		devinfo->offset = pls->pls_range.offset;
+		devinfo->length = pls->pls_range.length;
+		/* well, we don't really know if IO is continuous or not! */
+		devinfo->read_count = mirror->read_stat.io_stat.bytes_completed;
+		devinfo->read_bytes = mirror->read_stat.io_stat.bytes_completed;
+		devinfo->write_count = mirror->write_stat.io_stat.bytes_completed;
+		devinfo->write_bytes = mirror->write_stat.io_stat.bytes_completed;
+		devinfo->layout_type = LAYOUT_FLEX_FILES;
+		devinfo->layoutstats_encode = ff_layout_encode_layoutstats;
+		devinfo->layout_private = mirror;
+		/* lseg refcount put in cleanup_layoutstats */
+		pnfs_get_lseg(pls);
+
+		++(*dev_count);
+	}
+
+	return *dev_count < dev_limit;
+}
+
+static int
+ff_layout_prepare_layoutstats(struct nfs42_layoutstat_args *args)
+{
+	struct pnfs_layout_segment *pls;
+	int dev_count = 0;
+
+	spin_lock(&args->inode->i_lock);
+	list_for_each_entry(pls, &NFS_I(args->inode)->layout->plh_segs, pls_list) {
+		dev_count += FF_LAYOUT_MIRROR_COUNT(pls);
+	}
+	spin_unlock(&args->inode->i_lock);
+	/* For now, send at most PNFS_LAYOUTSTATS_MAXDEV statistics */
+	if (dev_count > PNFS_LAYOUTSTATS_MAXDEV) {
+		dprintk("%s: truncating devinfo to limit (%d:%d)\n",
+			__func__, dev_count, PNFS_LAYOUTSTATS_MAXDEV);
+		dev_count = PNFS_LAYOUTSTATS_MAXDEV;
+	}
+	args->devinfo = kmalloc(dev_count * sizeof(*args->devinfo), GFP_KERNEL);
+	if (!args->devinfo)
+		return -ENOMEM;
+
+	dev_count = 0;
+	spin_lock(&args->inode->i_lock);
+	list_for_each_entry(pls, &NFS_I(args->inode)->layout->plh_segs, pls_list) {
+		if (!ff_layout_mirror_prepare_stats(args, pls, &dev_count,
+						    PNFS_LAYOUTSTATS_MAXDEV)) {
+			break;
+		}
+	}
+	spin_unlock(&args->inode->i_lock);
+	args->num_dev = dev_count;
+
+	return 0;
+}
+
+static void
+ff_layout_cleanup_layoutstats(struct nfs42_layoutstat_data *data)
+{
+	struct nfs4_ff_layout_mirror *mirror;
+	int i;
+
+	for (i = 0; i < data->args.num_dev; i++) {
+		mirror = data->args.devinfo[i].layout_private;
+		data->args.devinfo[i].layout_private = NULL;
+		pnfs_put_lseg(mirror->lseg);
+	}
+}
+
 static struct pnfs_layoutdriver_type flexfilelayout_type = {
 	.id			= LAYOUT_FLEX_FILES,
 	.name			= "LAYOUT_FLEX_FILES",
@@ -1510,6 +1950,8 @@
 	.alloc_deviceid_node    = ff_layout_alloc_deviceid_node,
 	.encode_layoutreturn    = ff_layout_encode_layoutreturn,
 	.sync			= pnfs_nfs_generic_sync,
+	.prepare_layoutstats	= ff_layout_prepare_layoutstats,
+	.cleanup_layoutstats	= ff_layout_cleanup_layoutstats,
 };
 
 static int __init nfs4flexfilelayout_init(void)
diff --git a/fs/nfs/flexfilelayout/flexfilelayout.h b/fs/nfs/flexfilelayout/flexfilelayout.h
index 070f204..f92f9a0 100644
--- a/fs/nfs/flexfilelayout/flexfilelayout.h
+++ b/fs/nfs/flexfilelayout/flexfilelayout.h
@@ -9,12 +9,17 @@
 #ifndef FS_NFS_NFS4FLEXFILELAYOUT_H
 #define FS_NFS_NFS4FLEXFILELAYOUT_H
 
+#define FF_FLAGS_NO_LAYOUTCOMMIT 1
+
 #include "../pnfs.h"
 
 /* XXX: Let's filter out insanely large mirror count for now to avoid oom
  * due to network error etc. */
 #define NFS4_FLEXFILE_LAYOUT_MAX_MIRROR_CNT 4096
 
+/* LAYOUTSTATS report interval in ms */
+#define FF_LAYOUTSTATS_REPORT_INTERVAL (60000L)
+
 struct nfs4_ff_ds_version {
 	u32				version;
 	u32				minor_version;
@@ -41,24 +46,48 @@
 	struct nfs4_deviceid		deviceid;
 };
 
+struct nfs4_ff_io_stat {
+	__u64				ops_requested;
+	__u64				bytes_requested;
+	__u64				ops_completed;
+	__u64				bytes_completed;
+	__u64				bytes_not_delivered;
+	ktime_t				total_busy_time;
+	ktime_t				aggregate_completion_time;
+};
+
+struct nfs4_ff_busy_timer {
+	ktime_t start_time;
+	atomic_t n_ops;
+};
+
+struct nfs4_ff_layoutstat {
+	struct nfs4_ff_io_stat io_stat;
+	struct nfs4_ff_busy_timer busy_timer;
+};
+
 struct nfs4_ff_layout_mirror {
+	struct pnfs_layout_segment	*lseg; /* back pointer */
 	u32				ds_count;
 	u32				efficiency;
 	struct nfs4_ff_layout_ds	*mirror_ds;
 	u32				fh_versions_cnt;
 	struct nfs_fh			*fh_versions;
 	nfs4_stateid			stateid;
-	struct nfs4_string		user_name;
-	struct nfs4_string		group_name;
 	u32				uid;
 	u32				gid;
 	struct rpc_cred			*cred;
 	spinlock_t			lock;
+	struct nfs4_ff_layoutstat	read_stat;
+	struct nfs4_ff_layoutstat	write_stat;
+	ktime_t				start_time;
+	ktime_t				last_report_time;
 };
 
 struct nfs4_ff_layout_segment {
 	struct pnfs_layout_segment	generic_hdr;
 	u64				stripe_unit;
+	u32				flags;
 	u32				mirror_array_cnt;
 	struct nfs4_ff_layout_mirror	**mirror_array;
 };
diff --git a/fs/nfs/flexfilelayout/flexfilelayoutdev.c b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
index 77a2d02..f13e196 100644
--- a/fs/nfs/flexfilelayout/flexfilelayoutdev.c
+++ b/fs/nfs/flexfilelayout/flexfilelayoutdev.c
@@ -324,7 +324,8 @@
 				__func__, PTR_ERR(cred));
 			return PTR_ERR(cred);
 		} else {
-			mirror->cred = cred;
+			if (cmpxchg(&mirror->cred, NULL, cred))
+				put_rpccred(cred);
 		}
 	}
 	return 0;
@@ -386,7 +387,7 @@
 	/* matching smp_wmb() in _nfs4_pnfs_v3/4_ds_connect */
 	smp_rmb();
 	if (ds->ds_clp)
-		goto out;
+		goto out_update_creds;
 
 	flavor = nfs4_ff_layout_choose_authflavor(mirror);
 
@@ -430,7 +431,7 @@
 			}
 		}
 	}
-
+out_update_creds:
 	if (ff_layout_update_mirror_cred(mirror, ds))
 		ds = NULL;
 out:
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index f734562..b77b328 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -678,6 +678,8 @@
 	if (!err) {
 		generic_fillattr(inode, stat);
 		stat->ino = nfs_compat_user_ino64(NFS_FILEID(inode));
+		if (S_ISDIR(inode->i_mode))
+			stat->blksize = NFS_SERVER(inode)->dtsize;
 	}
 out:
 	trace_nfs_getattr_exit(inode, err);
@@ -2008,17 +2010,15 @@
 	if (err)
 		goto out1;
 
-#ifdef CONFIG_PROC_FS
 	rpc_proc_register(&init_net, &nfs_rpcstat);
-#endif
-	if ((err = register_nfs_fs()) != 0)
+
+	err = register_nfs_fs();
+	if (err)
 		goto out0;
 
 	return 0;
 out0:
-#ifdef CONFIG_PROC_FS
 	rpc_proc_unregister(&init_net, "nfs");
-#endif
 	nfs_destroy_directcache();
 out1:
 	nfs_destroy_writepagecache();
@@ -2049,9 +2049,7 @@
 	nfs_destroy_nfspagecache();
 	nfs_fscache_unregister();
 	unregister_pernet_subsys(&nfs_net_ops);
-#ifdef CONFIG_PROC_FS
 	rpc_proc_unregister(&init_net, "nfs");
-#endif
 	unregister_nfs_fs();
 	nfs_fs_proc_exit();
 	nfsiod_stop();
diff --git a/fs/nfs/nfs3xdr.c b/fs/nfs/nfs3xdr.c
index 53852a4..9b04c2e 100644
--- a/fs/nfs/nfs3xdr.c
+++ b/fs/nfs/nfs3xdr.c
@@ -1342,7 +1342,7 @@
 	if (args->npages != 0)
 		xdr_write_pages(xdr, args->pages, 0, args->len);
 	else
-		xdr_reserve_space(xdr, NFS_ACL_INLINE_BUFSIZE);
+		xdr_reserve_space(xdr, args->len);
 
 	error = nfsacl_encode(xdr->buf, base, args->inode,
 			    (args->mask & NFS_ACL) ?
diff --git a/fs/nfs/nfs42.h b/fs/nfs/nfs42.h
index 7afb894..ff66ae7 100644
--- a/fs/nfs/nfs42.h
+++ b/fs/nfs/nfs42.h
@@ -5,11 +5,18 @@
 #ifndef __LINUX_FS_NFS_NFS4_2_H
 #define __LINUX_FS_NFS_NFS4_2_H
 
+/*
+ * FIXME:  four LAYOUTSTATS calls per compound at most! Do we need to support
+ * more? Need to consider not to pre-alloc too much for a compound.
+ */
+#define PNFS_LAYOUTSTATS_MAXDEV (4)
+
 /* nfs4.2proc.c */
 int nfs42_proc_allocate(struct file *, loff_t, loff_t);
 int nfs42_proc_deallocate(struct file *, loff_t, loff_t);
 loff_t nfs42_proc_llseek(struct file *, loff_t, int);
-
+int nfs42_proc_layoutstats_generic(struct nfs_server *,
+				   struct nfs42_layoutstat_data *);
 /* nfs4.2xdr.h */
 extern struct rpc_procinfo nfs4_2_procedures[];
 
diff --git a/fs/nfs/nfs42proc.c b/fs/nfs/nfs42proc.c
index 3a9e752..f486b80 100644
--- a/fs/nfs/nfs42proc.c
+++ b/fs/nfs/nfs42proc.c
@@ -10,6 +10,11 @@
 #include <linux/nfs_fs.h>
 #include "nfs4_fs.h"
 #include "nfs42.h"
+#include "iostat.h"
+#include "pnfs.h"
+#include "internal.h"
+
+#define NFSDBG_FACILITY NFSDBG_PNFS
 
 static int nfs42_set_rw_stateid(nfs4_stateid *dst, struct file *file,
 				fmode_t fmode)
@@ -165,3 +170,85 @@
 
 	return vfs_setpos(filep, res.sr_offset, inode->i_sb->s_maxbytes);
 }
+
+static void
+nfs42_layoutstat_prepare(struct rpc_task *task, void *calldata)
+{
+	struct nfs42_layoutstat_data *data = calldata;
+	struct nfs_server *server = NFS_SERVER(data->args.inode);
+
+	nfs41_setup_sequence(nfs4_get_session(server), &data->args.seq_args,
+			     &data->res.seq_res, task);
+}
+
+static void
+nfs42_layoutstat_done(struct rpc_task *task, void *calldata)
+{
+	struct nfs42_layoutstat_data *data = calldata;
+
+	if (!nfs4_sequence_done(task, &data->res.seq_res))
+		return;
+
+	switch (task->tk_status) {
+	case 0:
+		break;
+	case -ENOTSUPP:
+	case -EOPNOTSUPP:
+		NFS_SERVER(data->inode)->caps &= ~NFS_CAP_LAYOUTSTATS;
+	default:
+		dprintk("%s server returns %d\n", __func__, task->tk_status);
+	}
+}
+
+static void
+nfs42_layoutstat_release(void *calldata)
+{
+	struct nfs42_layoutstat_data *data = calldata;
+	struct nfs_server *nfss = NFS_SERVER(data->args.inode);
+
+	if (nfss->pnfs_curr_ld->cleanup_layoutstats)
+		nfss->pnfs_curr_ld->cleanup_layoutstats(data);
+
+	pnfs_put_layout_hdr(NFS_I(data->args.inode)->layout);
+	smp_mb__before_atomic();
+	clear_bit(NFS_INO_LAYOUTSTATS, &NFS_I(data->args.inode)->flags);
+	smp_mb__after_atomic();
+	nfs_iput_and_deactive(data->inode);
+	kfree(data->args.devinfo);
+	kfree(data);
+}
+
+static const struct rpc_call_ops nfs42_layoutstat_ops = {
+	.rpc_call_prepare = nfs42_layoutstat_prepare,
+	.rpc_call_done = nfs42_layoutstat_done,
+	.rpc_release = nfs42_layoutstat_release,
+};
+
+int nfs42_proc_layoutstats_generic(struct nfs_server *server,
+				   struct nfs42_layoutstat_data *data)
+{
+	struct rpc_message msg = {
+		.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_LAYOUTSTATS],
+		.rpc_argp = &data->args,
+		.rpc_resp = &data->res,
+	};
+	struct rpc_task_setup task_setup = {
+		.rpc_client = server->client,
+		.rpc_message = &msg,
+		.callback_ops = &nfs42_layoutstat_ops,
+		.callback_data = data,
+		.flags = RPC_TASK_ASYNC,
+	};
+	struct rpc_task *task;
+
+	data->inode = nfs_igrab_and_active(data->args.inode);
+	if (!data->inode) {
+		nfs42_layoutstat_release(data);
+		return -EAGAIN;
+	}
+	nfs4_init_sequence(&data->args.seq_args, &data->res.seq_res, 0);
+	task = rpc_run_task(&task_setup);
+	if (IS_ERR(task))
+		return PTR_ERR(task);
+	return 0;
+}
diff --git a/fs/nfs/nfs42xdr.c b/fs/nfs/nfs42xdr.c
index 1a25b27..a6bd27d 100644
--- a/fs/nfs/nfs42xdr.c
+++ b/fs/nfs/nfs42xdr.c
@@ -4,6 +4,8 @@
 #ifndef __LINUX_FS_NFS_NFS4_2XDR_H
 #define __LINUX_FS_NFS_NFS4_2XDR_H
 
+#include "nfs42.h"
+
 #define encode_fallocate_maxsz		(encode_stateid_maxsz + \
 					 2 /* offset */ + \
 					 2 /* length */)
@@ -22,6 +24,16 @@
 					 1 /* whence */ + \
 					 2 /* offset */ + \
 					 2 /* length */)
+#define encode_io_info_maxsz		4
+#define encode_layoutstats_maxsz	(op_decode_hdr_maxsz + \
+					2 /* offset */ + \
+					2 /* length */ + \
+					encode_stateid_maxsz + \
+					encode_io_info_maxsz + \
+					encode_io_info_maxsz + \
+					1 /* opaque devaddr4 length */ + \
+					XDR_QUADLEN(PNFS_LAYOUTSTATS_MAXSIZE))
+#define decode_layoutstats_maxsz	(op_decode_hdr_maxsz)
 
 #define NFS4_enc_allocate_sz		(compound_encode_hdr_maxsz + \
 					 encode_putfh_maxsz + \
@@ -45,6 +57,14 @@
 #define NFS4_dec_seek_sz		(compound_decode_hdr_maxsz + \
 					 decode_putfh_maxsz + \
 					 decode_seek_maxsz)
+#define NFS4_enc_layoutstats_sz		(compound_encode_hdr_maxsz + \
+					 encode_sequence_maxsz + \
+					 encode_putfh_maxsz + \
+					 PNFS_LAYOUTSTATS_MAXDEV * encode_layoutstats_maxsz)
+#define NFS4_dec_layoutstats_sz		(compound_decode_hdr_maxsz + \
+					 decode_sequence_maxsz + \
+					 decode_putfh_maxsz + \
+					 PNFS_LAYOUTSTATS_MAXDEV * decode_layoutstats_maxsz)
 
 
 static void encode_fallocate(struct xdr_stream *xdr,
@@ -81,6 +101,33 @@
 	encode_uint32(xdr, args->sa_what);
 }
 
+static void encode_layoutstats(struct xdr_stream *xdr,
+			       struct nfs42_layoutstat_args *args,
+			       struct nfs42_layoutstat_devinfo *devinfo,
+			       struct compound_hdr *hdr)
+{
+	__be32 *p;
+
+	encode_op_hdr(xdr, OP_LAYOUTSTATS, decode_layoutstats_maxsz, hdr);
+	p = reserve_space(xdr, 8 + 8);
+	p = xdr_encode_hyper(p, devinfo->offset);
+	p = xdr_encode_hyper(p, devinfo->length);
+	encode_nfs4_stateid(xdr, &args->stateid);
+	p = reserve_space(xdr, 4*8 + NFS4_DEVICEID4_SIZE + 4);
+	p = xdr_encode_hyper(p, devinfo->read_count);
+	p = xdr_encode_hyper(p, devinfo->read_bytes);
+	p = xdr_encode_hyper(p, devinfo->write_count);
+	p = xdr_encode_hyper(p, devinfo->write_bytes);
+	p = xdr_encode_opaque_fixed(p, devinfo->dev_id.data,
+			NFS4_DEVICEID4_SIZE);
+	/* Encode layoutupdate4 */
+	*p++ = cpu_to_be32(devinfo->layout_type);
+	if (devinfo->layoutstats_encode != NULL)
+		devinfo->layoutstats_encode(xdr, args, devinfo);
+	else
+		encode_uint32(xdr, 0);
+}
+
 /*
  * Encode ALLOCATE request
  */
@@ -137,6 +184,28 @@
 	encode_nops(&hdr);
 }
 
+/*
+ * Encode LAYOUTSTATS request
+ */
+static void nfs4_xdr_enc_layoutstats(struct rpc_rqst *req,
+				     struct xdr_stream *xdr,
+				     struct nfs42_layoutstat_args *args)
+{
+	int i;
+
+	struct compound_hdr hdr = {
+		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
+	};
+
+	encode_compound_hdr(xdr, req, &hdr);
+	encode_sequence(xdr, &args->seq_args, &hdr);
+	encode_putfh(xdr, args->fh, &hdr);
+	WARN_ON(args->num_dev > PNFS_LAYOUTSTATS_MAXDEV);
+	for (i = 0; i < args->num_dev; i++)
+		encode_layoutstats(xdr, args, &args->devinfo[i], &hdr);
+	encode_nops(&hdr);
+}
+
 static int decode_allocate(struct xdr_stream *xdr, struct nfs42_falloc_res *res)
 {
 	return decode_op_hdr(xdr, OP_ALLOCATE);
@@ -169,6 +238,12 @@
 	return -EIO;
 }
 
+static int decode_layoutstats(struct xdr_stream *xdr,
+			      struct nfs42_layoutstat_res *res)
+{
+	return decode_op_hdr(xdr, OP_LAYOUTSTATS);
+}
+
 /*
  * Decode ALLOCATE request
  */
@@ -246,4 +321,35 @@
 out:
 	return status;
 }
+
+/*
+ * Decode LAYOUTSTATS request
+ */
+static int nfs4_xdr_dec_layoutstats(struct rpc_rqst *rqstp,
+				    struct xdr_stream *xdr,
+				    struct nfs42_layoutstat_res *res)
+{
+	struct compound_hdr hdr;
+	int status, i;
+
+	status = decode_compound_hdr(xdr, &hdr);
+	if (status)
+		goto out;
+	status = decode_sequence(xdr, &res->seq_res, rqstp);
+	if (status)
+		goto out;
+	status = decode_putfh(xdr);
+	if (status)
+		goto out;
+	WARN_ON(res->num_dev > PNFS_LAYOUTSTATS_MAXDEV);
+	for (i = 0; i < res->num_dev; i++) {
+		status = decode_layoutstats(xdr, res);
+		if (status)
+			goto out;
+	}
+out:
+	res->rpc_status = status;
+	return status;
+}
+
 #endif /* __LINUX_FS_NFS_NFS4_2XDR_H */
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index fdef424..ea3bee9 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -233,6 +233,7 @@
 extern int nfs4_call_sync(struct rpc_clnt *, struct nfs_server *,
 			  struct rpc_message *, struct nfs4_sequence_args *,
 			  struct nfs4_sequence_res *, int);
+extern void nfs4_init_sequence(struct nfs4_sequence_args *, struct nfs4_sequence_res *, int);
 extern int nfs4_proc_setclientid(struct nfs_client *, u32, unsigned short, struct rpc_cred *, struct nfs4_setclientid_res *);
 extern int nfs4_proc_setclientid_confirm(struct nfs_client *, struct nfs4_setclientid_res *arg, struct rpc_cred *);
 extern int nfs4_proc_get_rootfh(struct nfs_server *, struct nfs_fh *, struct nfs_fsinfo *, bool);
diff --git a/fs/nfs/nfs4client.c b/fs/nfs/nfs4client.c
index e42be52..3aa6a9b 100644
--- a/fs/nfs/nfs4client.c
+++ b/fs/nfs/nfs4client.c
@@ -676,7 +676,6 @@
 		break;
 	}
 
-	/* No matching nfs_client found. */
 	spin_unlock(&nn->nfs_client_lock);
 	dprintk("NFS: <-- %s status = %d\n", __func__, status);
 	nfs_put_client(prev);
diff --git a/fs/nfs/nfs4file.c b/fs/nfs/nfs4file.c
index f58c17b..dcd39d4 100644
--- a/fs/nfs/nfs4file.c
+++ b/fs/nfs/nfs4file.c
@@ -41,6 +41,10 @@
 
 	dprintk("NFS: open file(%pd2)\n", dentry);
 
+	err = nfs_check_flags(openflags);
+	if (err)
+		return err;
+
 	if ((openflags & O_ACCMODE) == 3)
 		openflags--;
 
diff --git a/fs/nfs/nfs4getroot.c b/fs/nfs/nfs4getroot.c
index c0b3a16..039b3eb 100644
--- a/fs/nfs/nfs4getroot.c
+++ b/fs/nfs/nfs4getroot.c
@@ -35,13 +35,6 @@
 		goto out;
 	}
 
-	if (fsinfo.fattr->valid & NFS_ATTR_FATTR_V4_REFERRAL) {
-		printk(KERN_ERR "nfs4_get_rootfh:"
-		       " getroot obtained referral\n");
-		ret = -EREMOTE;
-		goto out;
-	}
-
 	memcpy(&server->fsid, &fsinfo.fattr->fsid, sizeof(server->fsid));
 out:
 	nfs_free_fattr(fsinfo.fattr);
diff --git a/fs/nfs/nfs4idmap.c b/fs/nfs/nfs4idmap.c
index 2e1737c..535dfc6 100644
--- a/fs/nfs/nfs4idmap.c
+++ b/fs/nfs/nfs4idmap.c
@@ -494,12 +494,7 @@
 
 int nfs_idmap_init(void)
 {
-	int ret;
-	ret = nfs_idmap_init_keyring();
-	if (ret != 0)
-		goto out;
-out:
-	return ret;
+	return nfs_idmap_init_keyring();
 }
 
 void nfs_idmap_quit(void)
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 55e1e3a..6f228b5 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -356,6 +356,9 @@
 		case 0:
 			return 0;
 		case -NFS4ERR_OPENMODE:
+		case -NFS4ERR_DELEG_REVOKED:
+		case -NFS4ERR_ADMIN_REVOKED:
+		case -NFS4ERR_BAD_STATEID:
 			if (inode && nfs4_have_delegation(inode, FMODE_READ)) {
 				nfs4_inode_return_delegation(inode);
 				exception->retry = 1;
@@ -367,15 +370,6 @@
 			if (ret < 0)
 				break;
 			goto wait_on_recovery;
-		case -NFS4ERR_DELEG_REVOKED:
-		case -NFS4ERR_ADMIN_REVOKED:
-		case -NFS4ERR_BAD_STATEID:
-			if (state == NULL)
-				break;
-			ret = nfs4_schedule_stateid_recovery(server, state);
-			if (ret < 0)
-				break;
-			goto wait_on_recovery;
 		case -NFS4ERR_EXPIRED:
 			if (state != NULL) {
 				ret = nfs4_schedule_stateid_recovery(server, state);
@@ -482,8 +476,8 @@
 	struct nfs4_sequence_res *seq_res;
 };
 
-static void nfs4_init_sequence(struct nfs4_sequence_args *args,
-			       struct nfs4_sequence_res *res, int cache_reply)
+void nfs4_init_sequence(struct nfs4_sequence_args *args,
+			struct nfs4_sequence_res *res, int cache_reply)
 {
 	args->sa_slot = NULL;
 	args->sa_cache_this = cache_reply;
@@ -1553,6 +1547,13 @@
 	struct nfs4_state *newstate;
 	int ret;
 
+	if ((opendata->o_arg.claim == NFS4_OPEN_CLAIM_DELEGATE_CUR ||
+	     opendata->o_arg.claim == NFS4_OPEN_CLAIM_DELEG_CUR_FH) &&
+	    (opendata->o_arg.u.delegation_type & fmode) != fmode)
+		/* This mode can't have been delegated, so we must have
+		 * a valid open_stateid to cover it - not need to reclaim.
+		 */
+		return 0;
 	opendata->o_arg.open_flags = 0;
 	opendata->o_arg.fmode = fmode;
 	opendata->o_arg.share_access = nfs4_map_atomic_open_share(
@@ -1684,6 +1685,7 @@
 					"%d.\n", __func__, err);
 		case 0:
 		case -ENOENT:
+		case -EAGAIN:
 		case -ESTALE:
 			break;
 		case -NFS4ERR_BADSESSION:
@@ -3355,6 +3357,8 @@
 			goto out;
 		case -NFS4ERR_MOVED:
 			err = nfs4_get_referral(client, dir, name, fattr, fhandle);
+			if (err == -NFS4ERR_MOVED)
+				err = nfs4_handle_exception(NFS_SERVER(dir), err, &exception);
 			goto out;
 		case -NFS4ERR_WRONGSEC:
 			err = -EPERM;
@@ -4955,49 +4959,128 @@
 	memcpy(bootverf->data, verf, sizeof(bootverf->data));
 }
 
-static unsigned int
-nfs4_init_nonuniform_client_string(struct nfs_client *clp,
-				   char *buf, size_t len)
+static int
+nfs4_init_nonuniform_client_string(struct nfs_client *clp)
 {
-	unsigned int result;
+	int result;
+	size_t len;
+	char *str;
+	bool retried = false;
 
 	if (clp->cl_owner_id != NULL)
-		return strlcpy(buf, clp->cl_owner_id, len);
+		return 0;
+retry:
+	rcu_read_lock();
+	len = 10 + strlen(clp->cl_ipaddr) + 1 +
+		strlen(rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR)) +
+		1 +
+		strlen(rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_PROTO)) +
+		1;
+	rcu_read_unlock();
+
+	if (len > NFS4_OPAQUE_LIMIT + 1)
+		return -EINVAL;
+
+	/*
+	 * Since this string is allocated at mount time, and held until the
+	 * nfs_client is destroyed, we can use GFP_KERNEL here w/o worrying
+	 * about a memory-reclaim deadlock.
+	 */
+	str = kmalloc(len, GFP_KERNEL);
+	if (!str)
+		return -ENOMEM;
 
 	rcu_read_lock();
-	result = scnprintf(buf, len, "Linux NFSv4.0 %s/%s %s",
-				clp->cl_ipaddr,
-				rpc_peeraddr2str(clp->cl_rpcclient,
-							RPC_DISPLAY_ADDR),
-				rpc_peeraddr2str(clp->cl_rpcclient,
-							RPC_DISPLAY_PROTO));
+	result = scnprintf(str, len, "Linux NFSv4.0 %s/%s %s",
+			clp->cl_ipaddr,
+			rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_ADDR),
+			rpc_peeraddr2str(clp->cl_rpcclient, RPC_DISPLAY_PROTO));
 	rcu_read_unlock();
-	clp->cl_owner_id = kstrdup(buf, GFP_KERNEL);
-	return result;
+
+	/* Did something change? */
+	if (result >= len) {
+		kfree(str);
+		if (retried)
+			return -EINVAL;
+		retried = true;
+		goto retry;
+	}
+	clp->cl_owner_id = str;
+	return 0;
 }
 
-static unsigned int
-nfs4_init_uniform_client_string(struct nfs_client *clp,
-				char *buf, size_t len)
+static int
+nfs4_init_uniquifier_client_string(struct nfs_client *clp)
 {
-	const char *nodename = clp->cl_rpcclient->cl_nodename;
-	unsigned int result;
+	int result;
+	size_t len;
+	char *str;
+
+	len = 10 + 10 + 1 + 10 + 1 +
+		strlen(nfs4_client_id_uniquifier) + 1 +
+		strlen(clp->cl_rpcclient->cl_nodename) + 1;
+
+	if (len > NFS4_OPAQUE_LIMIT + 1)
+		return -EINVAL;
+
+	/*
+	 * Since this string is allocated at mount time, and held until the
+	 * nfs_client is destroyed, we can use GFP_KERNEL here w/o worrying
+	 * about a memory-reclaim deadlock.
+	 */
+	str = kmalloc(len, GFP_KERNEL);
+	if (!str)
+		return -ENOMEM;
+
+	result = scnprintf(str, len, "Linux NFSv%u.%u %s/%s",
+			clp->rpc_ops->version, clp->cl_minorversion,
+			nfs4_client_id_uniquifier,
+			clp->cl_rpcclient->cl_nodename);
+	if (result >= len) {
+		kfree(str);
+		return -EINVAL;
+	}
+	clp->cl_owner_id = str;
+	return 0;
+}
+
+static int
+nfs4_init_uniform_client_string(struct nfs_client *clp)
+{
+	int result;
+	size_t len;
+	char *str;
 
 	if (clp->cl_owner_id != NULL)
-		return strlcpy(buf, clp->cl_owner_id, len);
+		return 0;
 
 	if (nfs4_client_id_uniquifier[0] != '\0')
-		result = scnprintf(buf, len, "Linux NFSv%u.%u %s/%s",
-				clp->rpc_ops->version,
-				clp->cl_minorversion,
-				nfs4_client_id_uniquifier,
-				nodename);
-	else
-		result = scnprintf(buf, len, "Linux NFSv%u.%u %s",
-				clp->rpc_ops->version, clp->cl_minorversion,
-				nodename);
-	clp->cl_owner_id = kstrdup(buf, GFP_KERNEL);
-	return result;
+		return nfs4_init_uniquifier_client_string(clp);
+
+	len = 10 + 10 + 1 + 10 + 1 +
+		strlen(clp->cl_rpcclient->cl_nodename) + 1;
+
+	if (len > NFS4_OPAQUE_LIMIT + 1)
+		return -EINVAL;
+
+	/*
+	 * Since this string is allocated at mount time, and held until the
+	 * nfs_client is destroyed, we can use GFP_KERNEL here w/o worrying
+	 * about a memory-reclaim deadlock.
+	 */
+	str = kmalloc(len, GFP_KERNEL);
+	if (!str)
+		return -ENOMEM;
+
+	result = scnprintf(str, len, "Linux NFSv%u.%u %s",
+			clp->rpc_ops->version, clp->cl_minorversion,
+			clp->cl_rpcclient->cl_nodename);
+	if (result >= len) {
+		kfree(str);
+		return -EINVAL;
+	}
+	clp->cl_owner_id = str;
+	return 0;
 }
 
 /*
@@ -5044,7 +5127,7 @@
 	struct nfs4_setclientid setclientid = {
 		.sc_verifier = &sc_verifier,
 		.sc_prog = program,
-		.sc_cb_ident = clp->cl_cb_ident,
+		.sc_clnt = clp,
 	};
 	struct rpc_message msg = {
 		.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_SETCLIENTID],
@@ -5064,16 +5147,15 @@
 
 	/* nfs_client_id4 */
 	nfs4_init_boot_verifier(clp, &sc_verifier);
+
 	if (test_bit(NFS_CS_MIGRATION, &clp->cl_flags))
-		setclientid.sc_name_len =
-				nfs4_init_uniform_client_string(clp,
-						setclientid.sc_name,
-						sizeof(setclientid.sc_name));
+		status = nfs4_init_uniform_client_string(clp);
 	else
-		setclientid.sc_name_len =
-				nfs4_init_nonuniform_client_string(clp,
-						setclientid.sc_name,
-						sizeof(setclientid.sc_name));
+		status = nfs4_init_nonuniform_client_string(clp);
+
+	if (status)
+		goto out;
+
 	/* cb_client4 */
 	setclientid.sc_netid_len =
 				nfs4_init_callback_netid(clp,
@@ -5083,9 +5165,9 @@
 				sizeof(setclientid.sc_uaddr), "%s.%u.%u",
 				clp->cl_ipaddr, port >> 8, port & 255);
 
-	dprintk("NFS call  setclientid auth=%s, '%.*s'\n",
+	dprintk("NFS call  setclientid auth=%s, '%s'\n",
 		clp->cl_rpcclient->cl_auth->au_ops->au_name,
-		setclientid.sc_name_len, setclientid.sc_name);
+		clp->cl_owner_id);
 	task = rpc_run_task(&task_setup_data);
 	if (IS_ERR(task)) {
 		status = PTR_ERR(task);
@@ -5402,6 +5484,7 @@
 	atomic_inc(&lsp->ls_count);
 	/* Ensure we don't close file until we're done freeing locks! */
 	p->ctx = get_nfs_open_context(ctx);
+	get_file(fl->fl_file);
 	memcpy(&p->fl, fl, sizeof(p->fl));
 	p->server = NFS_SERVER(inode);
 	return p;
@@ -5413,6 +5496,7 @@
 	nfs_free_seqid(calldata->arg.seqid);
 	nfs4_put_lock_state(calldata->lsp);
 	put_nfs_open_context(calldata->ctx);
+	fput(calldata->fl.fl_file);
 	kfree(calldata);
 }
 
@@ -6846,11 +6930,14 @@
 	};
 
 	nfs4_init_boot_verifier(clp, &verifier);
-	args.id_len = nfs4_init_uniform_client_string(clp, args.id,
-							sizeof(args.id));
-	dprintk("NFS call  exchange_id auth=%s, '%.*s'\n",
+
+	status = nfs4_init_uniform_client_string(clp);
+	if (status)
+		goto out;
+
+	dprintk("NFS call  exchange_id auth=%s, '%s'\n",
 		clp->cl_rpcclient->cl_auth->au_ops->au_name,
-		args.id_len, args.id);
+		clp->cl_owner_id);
 
 	res.server_owner = kzalloc(sizeof(struct nfs41_server_owner),
 					GFP_NOFS);
@@ -6885,7 +6972,7 @@
 		/* unsupported! */
 		WARN_ON_ONCE(1);
 		status = -EINVAL;
-		goto out_server_scope;
+		goto out_impl_id;
 	}
 
 	status = rpc_call_sync(clp->cl_rpcclient, &msg, RPC_TASK_TIMEOUT);
@@ -6913,6 +7000,7 @@
 		/* use the most recent implementation id */
 		kfree(clp->cl_implid);
 		clp->cl_implid = res.impl_id;
+		res.impl_id = NULL;
 
 		if (clp->cl_serverscope != NULL &&
 		    !nfs41_same_server_scope(clp->cl_serverscope,
@@ -6926,15 +7014,16 @@
 
 		if (clp->cl_serverscope == NULL) {
 			clp->cl_serverscope = res.server_scope;
-			goto out;
+			res.server_scope = NULL;
 		}
-	} else
-		kfree(res.impl_id);
+	}
 
-out_server_owner:
-	kfree(res.server_owner);
+out_impl_id:
+	kfree(res.impl_id);
 out_server_scope:
 	kfree(res.server_scope);
+out_server_owner:
+	kfree(res.server_owner);
 out:
 	if (clp->cl_implid != NULL)
 		dprintk("NFS reply exchange_id: Server Implementation ID: "
@@ -8061,9 +8150,8 @@
 	struct rpc_task *task;
 	int status = 0;
 
-	dprintk("NFS: %4d initiating layoutcommit call. sync %d "
-		"lbw: %llu inode %lu\n",
-		data->task.tk_pid, sync,
+	dprintk("NFS: initiating layoutcommit call. sync %d "
+		"lbw: %llu inode %lu\n", sync,
 		data->args.lastbytewritten,
 		data->args.inode->i_ino);
 
@@ -8557,7 +8645,8 @@
 		| NFS_CAP_ATOMIC_OPEN_V1
 		| NFS_CAP_ALLOCATE
 		| NFS_CAP_DEALLOCATE
-		| NFS_CAP_SEEK,
+		| NFS_CAP_SEEK
+		| NFS_CAP_LAYOUTSTATS,
 	.init_client = nfs41_init_client,
 	.shutdown_client = nfs41_shutdown_client,
 	.match_stateid = nfs41_match_stateid,
diff --git a/fs/nfs/nfs4state.c b/fs/nfs/nfs4state.c
index 2782cfc..605840d 100644
--- a/fs/nfs/nfs4state.c
+++ b/fs/nfs/nfs4state.c
@@ -309,7 +309,6 @@
 
 	if (test_bit(NFS4CLNT_LEASE_CONFIRM, &clp->cl_state))
 		goto do_confirm;
-	nfs4_begin_drain_session(clp);
 	status = nfs4_proc_exchange_id(clp, cred);
 	if (status != 0)
 		goto out;
@@ -1482,6 +1481,8 @@
 					spin_unlock(&state->state_lock);
 				}
 				nfs4_put_open_state(state);
+				clear_bit(NFS4CLNT_RECLAIM_NOGRACE,
+					&state->flags);
 				spin_lock(&sp->so_lock);
 				goto restart;
 			}
@@ -1830,6 +1831,7 @@
 		clp->cl_mvops->reboot_recovery_ops;
 	int status;
 
+	nfs4_begin_drain_session(clp);
 	cred = nfs4_get_clid_cred(clp);
 	if (cred == NULL)
 		return -ENOENT;
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 0aea978..558cd65d 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -139,7 +139,8 @@
 #define encode_setclientid_maxsz \
 				(op_encode_hdr_maxsz + \
 				XDR_QUADLEN(NFS4_VERIFIER_SIZE) + \
-				XDR_QUADLEN(NFS4_SETCLIENTID_NAMELEN) + \
+				/* client name */ \
+				1 + XDR_QUADLEN(NFS4_OPAQUE_LIMIT) + \
 				1 /* sc_prog */ + \
 				1 + XDR_QUADLEN(RPCBIND_MAXNETIDLEN) + \
 				1 + XDR_QUADLEN(RPCBIND_MAXUADDRLEN) + \
@@ -288,7 +289,8 @@
 #define encode_exchange_id_maxsz (op_encode_hdr_maxsz + \
 				encode_verifier_maxsz + \
 				1 /* co_ownerid.len */ + \
-				XDR_QUADLEN(NFS4_EXCHANGE_ID_LEN) + \
+				/* eia_clientowner */ \
+				1 + XDR_QUADLEN(NFS4_OPAQUE_LIMIT) + \
 				1 /* flags */ + \
 				1 /* spa_how */ + \
 				/* max is SP4_MACH_CRED (for now) */ + \
@@ -1667,13 +1669,14 @@
 	encode_op_hdr(xdr, OP_SETCLIENTID, decode_setclientid_maxsz, hdr);
 	encode_nfs4_verifier(xdr, setclientid->sc_verifier);
 
-	encode_string(xdr, setclientid->sc_name_len, setclientid->sc_name);
+	encode_string(xdr, strlen(setclientid->sc_clnt->cl_owner_id),
+			setclientid->sc_clnt->cl_owner_id);
 	p = reserve_space(xdr, 4);
 	*p = cpu_to_be32(setclientid->sc_prog);
 	encode_string(xdr, setclientid->sc_netid_len, setclientid->sc_netid);
 	encode_string(xdr, setclientid->sc_uaddr_len, setclientid->sc_uaddr);
 	p = reserve_space(xdr, 4);
-	*p = cpu_to_be32(setclientid->sc_cb_ident);
+	*p = cpu_to_be32(setclientid->sc_clnt->cl_cb_ident);
 }
 
 static void encode_setclientid_confirm(struct xdr_stream *xdr, const struct nfs4_setclientid_res *arg, struct compound_hdr *hdr)
@@ -1747,7 +1750,8 @@
 	encode_op_hdr(xdr, OP_EXCHANGE_ID, decode_exchange_id_maxsz, hdr);
 	encode_nfs4_verifier(xdr, args->verifier);
 
-	encode_string(xdr, args->id_len, args->id);
+	encode_string(xdr, strlen(args->client->cl_owner_id),
+			args->client->cl_owner_id);
 
 	encode_uint32(xdr, args->flags);
 	encode_uint32(xdr, args->state_protect.how);
@@ -7427,6 +7431,7 @@
 	PROC(SEEK,		enc_seek,		dec_seek),
 	PROC(ALLOCATE,		enc_allocate,		dec_allocate),
 	PROC(DEALLOCATE,	enc_deallocate,		dec_deallocate),
+	PROC(LAYOUTSTATS,	enc_layoutstats,	dec_layoutstats),
 #endif /* CONFIG_NFS_V4_2 */
 };
 
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 282b393..1da68d3 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -636,9 +636,8 @@
 
 	hdr->rw_ops->rw_initiate(hdr, &msg, rpc_ops, &task_setup_data, how);
 
-	dprintk("NFS: %5u initiated pgio call "
+	dprintk("NFS: initiated pgio call "
 		"(req %s/%llu, %u bytes @ offset %llu)\n",
-		hdr->task.tk_pid,
 		hdr->inode->i_sb->s_id,
 		(unsigned long long)NFS_FILEID(hdr->inode),
 		hdr->args.count,
@@ -690,8 +689,6 @@
 static void nfs_pgio_release(void *calldata)
 {
 	struct nfs_pgio_header *hdr = calldata;
-	if (hdr->rw_ops->rw_release)
-		hdr->rw_ops->rw_release(hdr);
 	nfs_pgio_data_destroy(hdr);
 	hdr->completion_ops->completion(hdr);
 }
@@ -711,7 +708,9 @@
  * nfs_pageio_init - initialise a page io descriptor
  * @desc: pointer to descriptor
  * @inode: pointer to inode
- * @doio: pointer to io function
+ * @pg_ops: pointer to pageio operations
+ * @compl_ops: pointer to pageio completion operations
+ * @rw_ops: pointer to nfs read/write operations
  * @bsize: io block size
  * @io_flags: extra parameters for the io function
  */
@@ -1186,6 +1185,7 @@
  * nfs_pageio_complete_mirror - Complete I/O on the current mirror of an
  *				nfs_pageio_descriptor
  * @desc: pointer to io descriptor
+ * @mirror_idx: pointer to mirror index
  */
 static void nfs_pageio_complete_mirror(struct nfs_pageio_descriptor *desc,
 				       u32 mirror_idx)
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 2306062..0ba9a02 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -35,6 +35,7 @@
 #include "iostat.h"
 #include "nfs4trace.h"
 #include "delegation.h"
+#include "nfs42.h"
 
 #define NFSDBG_FACILITY		NFSDBG_PNFS
 #define PNFS_LAYOUTGET_RETRY_TIMEOUT (120*HZ)
@@ -1821,6 +1822,7 @@
 	/* Resend all requests through the MDS */
 	nfs_pageio_init_write(&pgio, hdr->inode, FLUSH_STABLE, true,
 			      hdr->completion_ops);
+	set_bit(NFS_CONTEXT_RESEND_WRITES, &hdr->args.context->flags);
 	return nfs_pageio_resend(&pgio, hdr);
 }
 EXPORT_SYMBOL_GPL(pnfs_write_done_resend_to_mds);
@@ -1865,6 +1867,7 @@
 		mirror->pg_recoalesce = 1;
 	}
 	nfs_pgio_data_destroy(hdr);
+	hdr->release(hdr);
 }
 
 static enum pnfs_try_status
@@ -1979,6 +1982,7 @@
 		mirror->pg_recoalesce = 1;
 	}
 	nfs_pgio_data_destroy(hdr);
+	hdr->release(hdr);
 }
 
 /*
@@ -2247,3 +2251,63 @@
 	}
 	return thp;
 }
+
+#if IS_ENABLED(CONFIG_NFS_V4_2)
+int
+pnfs_report_layoutstat(struct inode *inode)
+{
+	struct pnfs_layoutdriver_type *ld = NFS_SERVER(inode)->pnfs_curr_ld;
+	struct nfs_server *server = NFS_SERVER(inode);
+	struct nfs_inode *nfsi = NFS_I(inode);
+	struct nfs42_layoutstat_data *data;
+	struct pnfs_layout_hdr *hdr;
+	int status = 0;
+
+	if (!pnfs_enabled_sb(server) || !ld->prepare_layoutstats)
+		goto out;
+
+	if (!nfs_server_capable(inode, NFS_CAP_LAYOUTSTATS))
+		goto out;
+
+	if (test_and_set_bit(NFS_INO_LAYOUTSTATS, &nfsi->flags))
+		goto out;
+
+	spin_lock(&inode->i_lock);
+	if (!NFS_I(inode)->layout) {
+		spin_unlock(&inode->i_lock);
+		goto out;
+	}
+	hdr = NFS_I(inode)->layout;
+	pnfs_get_layout_hdr(hdr);
+	spin_unlock(&inode->i_lock);
+
+	data = kzalloc(sizeof(*data), GFP_KERNEL);
+	if (!data) {
+		status = -ENOMEM;
+		goto out_put;
+	}
+
+	data->args.fh = NFS_FH(inode);
+	data->args.inode = inode;
+	nfs4_stateid_copy(&data->args.stateid, &hdr->plh_stateid);
+	status = ld->prepare_layoutstats(&data->args);
+	if (status)
+		goto out_free;
+
+	status = nfs42_proc_layoutstats_generic(NFS_SERVER(inode), data);
+
+out:
+	dprintk("%s returns %d\n", __func__, status);
+	return status;
+
+out_free:
+	kfree(data);
+out_put:
+	pnfs_put_layout_hdr(hdr);
+	smp_mb__before_atomic();
+	clear_bit(NFS_INO_LAYOUTSTATS, &nfsi->flags);
+	smp_mb__after_atomic();
+	goto out;
+}
+EXPORT_SYMBOL_GPL(pnfs_report_layoutstat);
+#endif
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 1e6308f..3e6ab7b 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -178,6 +178,8 @@
 	void (*encode_layoutcommit) (struct pnfs_layout_hdr *lo,
 				     struct xdr_stream *xdr,
 				     const struct nfs4_layoutcommit_args *args);
+	int (*prepare_layoutstats) (struct nfs42_layoutstat_args *args);
+	void (*cleanup_layoutstats) (struct nfs42_layoutstat_data *data);
 };
 
 struct pnfs_layout_hdr {
@@ -290,7 +292,6 @@
 struct nfs4_threshold *pnfs_mdsthreshold_alloc(void);
 void pnfs_error_mark_layout_for_return(struct inode *inode,
 				       struct pnfs_layout_segment *lseg);
-
 /* nfs4_deviceid_flags */
 enum {
 	NFS_DEVICEID_INVALID = 0,       /* set when MDS clientid recalled */
@@ -689,4 +690,14 @@
 
 #endif /* CONFIG_NFS_V4_1 */
 
+#if IS_ENABLED(CONFIG_NFS_V4_2)
+int pnfs_report_layoutstat(struct inode *inode);
+#else
+static inline int
+pnfs_report_layoutstat(struct inode *inode)
+{
+	return 0;
+}
+#endif
+
 #endif /* FS_NFS_PNFS_H */
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index e6c2625..65869ca 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -1290,6 +1290,7 @@
 static void nfs_redirty_request(struct nfs_page *req)
 {
 	nfs_mark_request_dirty(req);
+	set_bit(NFS_CONTEXT_RESEND_WRITES, &req->wb_context->flags);
 	nfs_unlock_request(req);
 	nfs_end_page_writeback(req);
 	nfs_release_request(req);
@@ -1348,11 +1349,6 @@
 	NFS_PROTO(data->inode)->commit_rpc_prepare(task, data);
 }
 
-static void nfs_writeback_release_common(struct nfs_pgio_header *hdr)
-{
-	/* do nothing! */
-}
-
 /*
  * Special version of should_remove_suid() that ignores capabilities.
  */
@@ -1556,7 +1552,7 @@
 	/* Set up the initial task struct.  */
 	nfs_ops->commit_setup(data, &msg);
 
-	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);
+	dprintk("NFS: initiated commit call\n");
 
 	nfs4_state_protect(NFS_SERVER(data->inode)->nfs_client,
 		NFS_SP4_MACH_CRED_COMMIT, &task_setup_data.rpc_client, &msg);
@@ -2013,7 +2009,6 @@
 	.rw_mode		= FMODE_WRITE,
 	.rw_alloc_header	= nfs_writehdr_alloc,
 	.rw_free_header		= nfs_writehdr_free,
-	.rw_release		= nfs_writeback_release_common,
 	.rw_done		= nfs_writeback_done,
 	.rw_result		= nfs_writeback_result,
 	.rw_initiate		= nfs_initiate_write,
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index 32201c2..b8e72aa 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -500,6 +500,7 @@
 	NFSPROC4_CLNT_SEEK,
 	NFSPROC4_CLNT_ALLOCATE,
 	NFSPROC4_CLNT_DEALLOCATE,
+	NFSPROC4_CLNT_LAYOUTSTATS,
 };
 
 /* nfs41 types */
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index b95f914..f91b5ad 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -219,6 +219,7 @@
 #define NFS_INO_COMMIT		(7)		/* inode is committing unstable writes */
 #define NFS_INO_LAYOUTCOMMIT	(9)		/* layoutcommit required */
 #define NFS_INO_LAYOUTCOMMITTING (10)		/* layoutcommit inflight */
+#define NFS_INO_LAYOUTSTATS	(11)		/* layoutstats inflight */
 
 static inline struct nfs_inode *NFS_I(const struct inode *inode)
 {
diff --git a/include/linux/nfs_fs_sb.h b/include/linux/nfs_fs_sb.h
index 5e1273d..a2ea149 100644
--- a/include/linux/nfs_fs_sb.h
+++ b/include/linux/nfs_fs_sb.h
@@ -237,5 +237,6 @@
 #define NFS_CAP_SEEK		(1U << 19)
 #define NFS_CAP_ALLOCATE	(1U << 20)
 #define NFS_CAP_DEALLOCATE	(1U << 21)
+#define NFS_CAP_LAYOUTSTATS	(1U << 22)
 
 #endif
diff --git a/include/linux/nfs_page.h b/include/linux/nfs_page.h
index 3eb072d..f2f650f 100644
--- a/include/linux/nfs_page.h
+++ b/include/linux/nfs_page.h
@@ -67,7 +67,6 @@
 	const fmode_t rw_mode;
 	struct nfs_pgio_header *(*rw_alloc_header)(void);
 	void (*rw_free_header)(struct nfs_pgio_header *);
-	void (*rw_release)(struct nfs_pgio_header *);
 	int  (*rw_done)(struct rpc_task *, struct nfs_pgio_header *,
 			struct inode *);
 	void (*rw_result)(struct rpc_task *, struct nfs_pgio_header *);
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 93ab607..7bbe505 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -316,6 +316,49 @@
 	int rpc_status;
 };
 
+#define PNFS_LAYOUTSTATS_MAXSIZE 256
+
+struct nfs42_layoutstat_args;
+struct nfs42_layoutstat_devinfo;
+typedef	void (*layoutstats_encode_t)(struct xdr_stream *,
+		struct nfs42_layoutstat_args *,
+		struct nfs42_layoutstat_devinfo *);
+
+/* Per file per deviceid layoutstats */
+struct nfs42_layoutstat_devinfo {
+	struct nfs4_deviceid dev_id;
+	__u64 offset;
+	__u64 length;
+	__u64 read_count;
+	__u64 read_bytes;
+	__u64 write_count;
+	__u64 write_bytes;
+	__u32 layout_type;
+	layoutstats_encode_t layoutstats_encode;
+	void *layout_private;
+};
+
+struct nfs42_layoutstat_args {
+	struct nfs4_sequence_args seq_args;
+	struct nfs_fh *fh;
+	struct inode *inode;
+	nfs4_stateid stateid;
+	int num_dev;
+	struct nfs42_layoutstat_devinfo *devinfo;
+};
+
+struct nfs42_layoutstat_res {
+	struct nfs4_sequence_res seq_res;
+	int num_dev;
+	int rpc_status;
+};
+
+struct nfs42_layoutstat_data {
+	struct inode *inode;
+	struct nfs42_layoutstat_args args;
+	struct nfs42_layoutstat_res res;
+};
+
 struct stateowner_id {
 	__u64	create_time;
 	__u32	uniquifier;
@@ -984,17 +1027,14 @@
 	struct nfs4_sequence_res	seq_res;
 };
 
-#define NFS4_SETCLIENTID_NAMELEN	(127)
 struct nfs4_setclientid {
 	const nfs4_verifier *		sc_verifier;
-	unsigned int			sc_name_len;
-	char				sc_name[NFS4_SETCLIENTID_NAMELEN + 1];
 	u32				sc_prog;
 	unsigned int			sc_netid_len;
 	char				sc_netid[RPCBIND_MAXNETIDLEN + 1];
 	unsigned int			sc_uaddr_len;
 	char				sc_uaddr[RPCBIND_MAXUADDRLEN + 1];
-	u32				sc_cb_ident;
+	struct nfs_client		*sc_clnt;
 	struct rpc_cred			*sc_cred;
 };
 
@@ -1142,12 +1182,9 @@
 	struct nfs4_op_map allow;
 };
 
-#define NFS4_EXCHANGE_ID_LEN	(48)
 struct nfs41_exchange_id_args {
 	struct nfs_client		*client;
 	nfs4_verifier			*verifier;
-	unsigned int 			id_len;
-	char 				id[NFS4_EXCHANGE_ID_LEN];
 	u32				flags;
 	struct nfs41_state_protection	state_protect;
 };
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index 2ca67b5..8df43c9f 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -37,7 +37,6 @@
 void xprt_free_bc_request(struct rpc_rqst *req);
 int xprt_setup_backchannel(struct rpc_xprt *, unsigned int min_reqs);
 void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
-int bc_send(struct rpc_rqst *req);
 
 /*
  * Determine if a shared backchannel is in use
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 598ba80..131032f 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -56,6 +56,7 @@
 	struct rpc_rtt *	cl_rtt;		/* RTO estimator data */
 	const struct rpc_timeout *cl_timeout;	/* Timeout strategy */
 
+	atomic_t		cl_swapper;	/* swapfile count */
 	int			cl_nodelen;	/* nodename length */
 	char 			cl_nodename[UNX_MAXNODENAME+1];
 	struct rpc_pipe_dir_head cl_pipedir_objects;
diff --git a/include/linux/sunrpc/sched.h b/include/linux/sunrpc/sched.h
index 5f1e6bd..d703f0e 100644
--- a/include/linux/sunrpc/sched.h
+++ b/include/linux/sunrpc/sched.h
@@ -205,8 +205,7 @@
  */
 struct rpc_task *rpc_new_task(const struct rpc_task_setup *);
 struct rpc_task *rpc_run_task(const struct rpc_task_setup *);
-struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
-				const struct rpc_call_ops *ops);
+struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req);
 void		rpc_put_task(struct rpc_task *);
 void		rpc_put_task_async(struct rpc_task *);
 void		rpc_exit_task(struct rpc_task *);
@@ -269,4 +268,20 @@
 }
 #endif
 
+#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+int rpc_clnt_swap_activate(struct rpc_clnt *clnt);
+void rpc_clnt_swap_deactivate(struct rpc_clnt *clnt);
+#else
+static inline int
+rpc_clnt_swap_activate(struct rpc_clnt *clnt)
+{
+	return -EINVAL;
+}
+
+static inline void
+rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
+{
+}
+#endif /* CONFIG_SUNRPC_SWAP */
+
 #endif /* _LINUX_SUNRPC_SCHED_H_ */
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 8b93ef5..0fb9acb 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -133,6 +133,9 @@
 	void		(*close)(struct rpc_xprt *xprt);
 	void		(*destroy)(struct rpc_xprt *xprt);
 	void		(*print_stats)(struct rpc_xprt *xprt, struct seq_file *seq);
+	int		(*enable_swap)(struct rpc_xprt *xprt);
+	void		(*disable_swap)(struct rpc_xprt *xprt);
+	void		(*inject_disconnect)(struct rpc_xprt *xprt);
 };
 
 /*
@@ -180,7 +183,7 @@
 	atomic_t		num_reqs;	/* total slots */
 	unsigned long		state;		/* transport state */
 	unsigned char		resvport   : 1; /* use a reserved port */
-	unsigned int		swapper;	/* we're swapping over this
+	atomic_t		swapper;	/* we're swapping over this
 						   transport */
 	unsigned int		bind_index;	/* bind function index */
 
@@ -212,7 +215,8 @@
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	struct svc_serv		*bc_serv;       /* The RPC service which will */
 						/* process the callback */
-	unsigned int		bc_alloc_count;	/* Total number of preallocs */
+	int			bc_alloc_count;	/* Total number of preallocs */
+	atomic_t		bc_free_slots;
 	spinlock_t		bc_pa_lock;	/* Protects the preallocated
 						 * items */
 	struct list_head	bc_pa_list;	/* List of preallocated
@@ -241,6 +245,7 @@
 	const char		*address_strings[RPC_DISPLAY_MAX];
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 	struct dentry		*debugfs;		/* debugfs directory */
+	atomic_t		inject_disconnect;
 #endif
 };
 
@@ -327,6 +332,18 @@
 	return p + xprt->tsh_size;
 }
 
+static inline int
+xprt_enable_swap(struct rpc_xprt *xprt)
+{
+	return xprt->ops->enable_swap(xprt);
+}
+
+static inline void
+xprt_disable_swap(struct rpc_xprt *xprt)
+{
+	xprt->ops->disable_swap(xprt);
+}
+
 /*
  * Transport switch helper functions
  */
@@ -345,7 +362,6 @@
 void			xprt_disconnect_done(struct rpc_xprt *xprt);
 void			xprt_force_disconnect(struct rpc_xprt *xprt);
 void			xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
-int			xs_swapper(struct rpc_xprt *xprt, int enable);
 
 bool			xprt_lock_connect(struct rpc_xprt *, struct rpc_task *, void *);
 void			xprt_unlock_connect(struct rpc_xprt *, void *);
@@ -431,6 +447,23 @@
 	return test_and_set_bit(XPRT_BINDING, &xprt->state);
 }
 
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+extern unsigned int rpc_inject_disconnect;
+static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
+{
+	if (!rpc_inject_disconnect)
+		return;
+	if (atomic_dec_return(&xprt->inject_disconnect))
+		return;
+	atomic_set(&xprt->inject_disconnect, rpc_inject_disconnect);
+	xprt->ops->inject_disconnect(xprt);
+}
+#else
+static inline void xprt_inject_disconnect(struct rpc_xprt *xprt)
+{
+}
+#endif
+
 #endif /* __KERNEL__*/
 
 #endif /* _LINUX_SUNRPC_XPRT_H */
diff --git a/include/linux/sunrpc/xprtrdma.h b/include/linux/sunrpc/xprtrdma.h
index c984c85..b176130 100644
--- a/include/linux/sunrpc/xprtrdma.h
+++ b/include/linux/sunrpc/xprtrdma.h
@@ -56,7 +56,8 @@
 
 #define RPCRDMA_INLINE_PAD_THRESH  (512)/* payload threshold to pad (bytes) */
 
-/* memory registration strategies */
+/* Memory registration strategies, by number.
+ * This is part of a kernel / user space API. Do not remove. */
 enum rpcrdma_memreg {
 	RPCRDMA_BOUNCEBUFFERS = 0,
 	RPCRDMA_REGISTER,
diff --git a/net/sunrpc/Makefile b/net/sunrpc/Makefile
index 936ad0a..b512fbd 100644
--- a/net/sunrpc/Makefile
+++ b/net/sunrpc/Makefile
@@ -14,6 +14,6 @@
 	    sunrpc_syms.o cache.o rpc_pipe.o \
 	    svc_xprt.o
 sunrpc-$(CONFIG_SUNRPC_DEBUG) += debugfs.o
-sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o bc_svc.o
+sunrpc-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel_rqst.o
 sunrpc-$(CONFIG_PROC_FS) += stats.o
 sunrpc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index 9dd0ea8d..9825ff0 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -37,16 +37,18 @@
  */
 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
 {
-	return xprt->bc_alloc_count > 0;
+	return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots);
 }
 
 static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n)
 {
+	atomic_add(n, &xprt->bc_free_slots);
 	xprt->bc_alloc_count += n;
 }
 
 static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n)
 {
+	atomic_sub(n, &xprt->bc_free_slots);
 	return xprt->bc_alloc_count -= n;
 }
 
@@ -60,13 +62,62 @@
 
 	dprintk("RPC:        free allocations for req= %p\n", req);
 	WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
-	xbufp = &req->rq_private_buf;
+	xbufp = &req->rq_rcv_buf;
 	free_page((unsigned long)xbufp->head[0].iov_base);
 	xbufp = &req->rq_snd_buf;
 	free_page((unsigned long)xbufp->head[0].iov_base);
 	kfree(req);
 }
 
+static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
+{
+	struct page *page;
+	/* Preallocate one XDR receive buffer */
+	page = alloc_page(gfp_flags);
+	if (page == NULL)
+		return -ENOMEM;
+	buf->head[0].iov_base = page_address(page);
+	buf->head[0].iov_len = PAGE_SIZE;
+	buf->tail[0].iov_base = NULL;
+	buf->tail[0].iov_len = 0;
+	buf->page_len = 0;
+	buf->len = 0;
+	buf->buflen = PAGE_SIZE;
+	return 0;
+}
+
+static
+struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags)
+{
+	struct rpc_rqst *req;
+
+	/* Pre-allocate one backchannel rpc_rqst */
+	req = kzalloc(sizeof(*req), gfp_flags);
+	if (req == NULL)
+		return NULL;
+
+	req->rq_xprt = xprt;
+	INIT_LIST_HEAD(&req->rq_list);
+	INIT_LIST_HEAD(&req->rq_bc_list);
+
+	/* Preallocate one XDR receive buffer */
+	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
+		printk(KERN_ERR "Failed to create bc receive xbuf\n");
+		goto out_free;
+	}
+	req->rq_rcv_buf.len = PAGE_SIZE;
+
+	/* Preallocate one XDR send buffer */
+	if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
+		printk(KERN_ERR "Failed to create bc snd xbuf\n");
+		goto out_free;
+	}
+	return req;
+out_free:
+	xprt_free_allocation(req);
+	return NULL;
+}
+
 /*
  * Preallocate up to min_reqs structures and related buffers for use
  * by the backchannel.  This function can be called multiple times
@@ -87,9 +138,7 @@
  */
 int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
 {
-	struct page *page_rcv = NULL, *page_snd = NULL;
-	struct xdr_buf *xbufp = NULL;
-	struct rpc_rqst *req, *tmp;
+	struct rpc_rqst *req;
 	struct list_head tmp_list;
 	int i;
 
@@ -106,7 +155,7 @@
 	INIT_LIST_HEAD(&tmp_list);
 	for (i = 0; i < min_reqs; i++) {
 		/* Pre-allocate one backchannel rpc_rqst */
-		req = kzalloc(sizeof(struct rpc_rqst), GFP_KERNEL);
+		req = xprt_alloc_bc_req(xprt, GFP_KERNEL);
 		if (req == NULL) {
 			printk(KERN_ERR "Failed to create bc rpc_rqst\n");
 			goto out_free;
@@ -115,41 +164,6 @@
 		/* Add the allocated buffer to the tmp list */
 		dprintk("RPC:       adding req= %p\n", req);
 		list_add(&req->rq_bc_pa_list, &tmp_list);
-
-		req->rq_xprt = xprt;
-		INIT_LIST_HEAD(&req->rq_list);
-		INIT_LIST_HEAD(&req->rq_bc_list);
-
-		/* Preallocate one XDR receive buffer */
-		page_rcv = alloc_page(GFP_KERNEL);
-		if (page_rcv == NULL) {
-			printk(KERN_ERR "Failed to create bc receive xbuf\n");
-			goto out_free;
-		}
-		xbufp = &req->rq_rcv_buf;
-		xbufp->head[0].iov_base = page_address(page_rcv);
-		xbufp->head[0].iov_len = PAGE_SIZE;
-		xbufp->tail[0].iov_base = NULL;
-		xbufp->tail[0].iov_len = 0;
-		xbufp->page_len = 0;
-		xbufp->len = PAGE_SIZE;
-		xbufp->buflen = PAGE_SIZE;
-
-		/* Preallocate one XDR send buffer */
-		page_snd = alloc_page(GFP_KERNEL);
-		if (page_snd == NULL) {
-			printk(KERN_ERR "Failed to create bc snd xbuf\n");
-			goto out_free;
-		}
-
-		xbufp = &req->rq_snd_buf;
-		xbufp->head[0].iov_base = page_address(page_snd);
-		xbufp->head[0].iov_len = 0;
-		xbufp->tail[0].iov_base = NULL;
-		xbufp->tail[0].iov_len = 0;
-		xbufp->page_len = 0;
-		xbufp->len = 0;
-		xbufp->buflen = PAGE_SIZE;
 	}
 
 	/*
@@ -167,7 +181,10 @@
 	/*
 	 * Memory allocation failed, free the temporary list
 	 */
-	list_for_each_entry_safe(req, tmp, &tmp_list, rq_bc_pa_list) {
+	while (!list_empty(&tmp_list)) {
+		req = list_first_entry(&tmp_list,
+				struct rpc_rqst,
+				rq_bc_pa_list);
 		list_del(&req->rq_bc_pa_list);
 		xprt_free_allocation(req);
 	}
@@ -217,9 +234,15 @@
 	struct rpc_rqst *req = NULL;
 
 	dprintk("RPC:       allocate a backchannel request\n");
-	if (list_empty(&xprt->bc_pa_list))
+	if (atomic_read(&xprt->bc_free_slots) <= 0)
 		goto not_found;
-
+	if (list_empty(&xprt->bc_pa_list)) {
+		req = xprt_alloc_bc_req(xprt, GFP_ATOMIC);
+		if (!req)
+			goto not_found;
+		/* Note: this 'free' request adds it to xprt->bc_pa_list */
+		xprt_free_bc_request(req);
+	}
 	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
 				rq_bc_pa_list);
 	req->rq_reply_bytes_recvd = 0;
@@ -245,11 +268,21 @@
 
 	req->rq_connect_cookie = xprt->connect_cookie - 1;
 	smp_mb__before_atomic();
-	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
 	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
 	smp_mb__after_atomic();
 
-	if (!xprt_need_to_requeue(xprt)) {
+	/*
+	 * Return it to the list of preallocations so that it
+	 * may be reused by a new callback request.
+	 */
+	spin_lock_bh(&xprt->bc_pa_lock);
+	if (xprt_need_to_requeue(xprt)) {
+		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
+		xprt->bc_alloc_count++;
+		req = NULL;
+	}
+	spin_unlock_bh(&xprt->bc_pa_lock);
+	if (req != NULL) {
 		/*
 		 * The last remaining session was destroyed while this
 		 * entry was in use.  Free the entry and don't attempt
@@ -260,14 +293,6 @@
 		xprt_free_allocation(req);
 		return;
 	}
-
-	/*
-	 * Return it to the list of preallocations so that it
-	 * may be reused by a new callback request.
-	 */
-	spin_lock_bh(&xprt->bc_pa_lock);
-	list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
-	spin_unlock_bh(&xprt->bc_pa_lock);
 }
 
 /*
@@ -311,6 +336,7 @@
 
 	spin_lock(&xprt->bc_pa_lock);
 	list_del(&req->rq_bc_pa_list);
+	xprt->bc_alloc_count--;
 	spin_unlock(&xprt->bc_pa_lock);
 
 	req->rq_private_buf.len = copied;
diff --git a/net/sunrpc/bc_svc.c b/net/sunrpc/bc_svc.c
deleted file mode 100644
index 15c7a8a..0000000
--- a/net/sunrpc/bc_svc.c
+++ /dev/null
@@ -1,63 +0,0 @@
-/******************************************************************************
-
-(c) 2007 Network Appliance, Inc.  All Rights Reserved.
-(c) 2009 NetApp.  All Rights Reserved.
-
-NetApp provides this source code under the GPL v2 License.
-The GPL v2 license is available at
-http://opensource.org/licenses/gpl-license.php.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
-"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
-LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
-A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
-CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
-EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
-PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
-LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
-NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
-SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-******************************************************************************/
-
-/*
- * The NFSv4.1 callback service helper routines.
- * They implement the transport level processing required to send the
- * reply over an existing open connection previously established by the client.
- */
-
-#include <linux/module.h>
-
-#include <linux/sunrpc/xprt.h>
-#include <linux/sunrpc/sched.h>
-#include <linux/sunrpc/bc_xprt.h>
-
-#define RPCDBG_FACILITY	RPCDBG_SVCDSP
-
-/* Empty callback ops */
-static const struct rpc_call_ops nfs41_callback_ops = {
-};
-
-
-/*
- * Send the callback reply
- */
-int bc_send(struct rpc_rqst *req)
-{
-	struct rpc_task *task;
-	int ret;
-
-	dprintk("RPC:       bc_send req= %p\n", req);
-	task = rpc_run_bc_task(req, &nfs41_callback_ops);
-	if (IS_ERR(task))
-		ret = PTR_ERR(task);
-	else {
-		WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
-		ret = task->tk_status;
-		rpc_put_task(task);
-	}
-	dprintk("RPC:       bc_send ret= %d\n", ret);
-	return ret;
-}
-
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index e6ce151..cbc6af9 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -891,15 +891,8 @@
 			task->tk_flags |= RPC_TASK_SOFT;
 		if (clnt->cl_noretranstimeo)
 			task->tk_flags |= RPC_TASK_NO_RETRANS_TIMEOUT;
-		if (sk_memalloc_socks()) {
-			struct rpc_xprt *xprt;
-
-			rcu_read_lock();
-			xprt = rcu_dereference(clnt->cl_xprt);
-			if (xprt->swapper)
-				task->tk_flags |= RPC_TASK_SWAPPER;
-			rcu_read_unlock();
-		}
+		if (atomic_read(&clnt->cl_swapper))
+			task->tk_flags |= RPC_TASK_SWAPPER;
 		/* Add to the client's list of all tasks */
 		spin_lock(&clnt->cl_lock);
 		list_add_tail(&task->tk_task, &clnt->cl_tasks);
@@ -1031,15 +1024,14 @@
  * rpc_run_bc_task - Allocate a new RPC task for backchannel use, then run
  * rpc_execute against it
  * @req: RPC request
- * @tk_ops: RPC call ops
  */
-struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req,
-				const struct rpc_call_ops *tk_ops)
+struct rpc_task *rpc_run_bc_task(struct rpc_rqst *req)
 {
 	struct rpc_task *task;
 	struct xdr_buf *xbufp = &req->rq_snd_buf;
 	struct rpc_task_setup task_setup_data = {
-		.callback_ops = tk_ops,
+		.callback_ops = &rpc_default_ops,
+		.flags = RPC_TASK_SOFTCONN,
 	};
 
 	dprintk("RPC: rpc_run_bc_task req= %p\n", req);
@@ -1614,6 +1606,7 @@
 					req->rq_callsize + req->rq_rcvsize);
 	if (req->rq_buffer != NULL)
 		return;
+	xprt_inject_disconnect(xprt);
 
 	dprintk("RPC: %5u rpc_buffer allocation failed\n", task->tk_pid);
 
@@ -1951,33 +1944,36 @@
 {
 	struct rpc_rqst *req = task->tk_rqstp;
 
-	if (!xprt_prepare_transmit(task)) {
-		/*
-		 * Could not reserve the transport. Try again after the
-		 * transport is released.
-		 */
-		task->tk_status = 0;
-		task->tk_action = call_bc_transmit;
-		return;
-	}
+	if (!xprt_prepare_transmit(task))
+		goto out_retry;
 
-	task->tk_action = rpc_exit_task;
 	if (task->tk_status < 0) {
 		printk(KERN_NOTICE "RPC: Could not send backchannel reply "
 			"error: %d\n", task->tk_status);
-		return;
+		goto out_done;
 	}
+	if (req->rq_connect_cookie != req->rq_xprt->connect_cookie)
+		req->rq_bytes_sent = 0;
 
 	xprt_transmit(task);
+
+	if (task->tk_status == -EAGAIN)
+		goto out_nospace;
+
 	xprt_end_transmit(task);
 	dprint_status(task);
 	switch (task->tk_status) {
 	case 0:
 		/* Success */
-		break;
 	case -EHOSTDOWN:
 	case -EHOSTUNREACH:
 	case -ENETUNREACH:
+	case -ECONNRESET:
+	case -ECONNREFUSED:
+	case -EADDRINUSE:
+	case -ENOTCONN:
+	case -EPIPE:
+		break;
 	case -ETIMEDOUT:
 		/*
 		 * Problem reaching the server.  Disconnect and let the
@@ -2002,6 +1998,13 @@
 		break;
 	}
 	rpc_wake_up_queued_task(&req->rq_xprt->pending, task);
+out_done:
+	task->tk_action = rpc_exit_task;
+	return;
+out_nospace:
+	req->rq_connect_cookie = req->rq_xprt->connect_cookie;
+out_retry:
+	task->tk_status = 0;
 }
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
 
@@ -2476,3 +2479,59 @@
 	spin_unlock(&sn->rpc_client_lock);
 }
 #endif
+
+#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+int
+rpc_clnt_swap_activate(struct rpc_clnt *clnt)
+{
+	int ret = 0;
+	struct rpc_xprt	*xprt;
+
+	if (atomic_inc_return(&clnt->cl_swapper) == 1) {
+retry:
+		rcu_read_lock();
+		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+		rcu_read_unlock();
+		if (!xprt) {
+			/*
+			 * If we didn't get a reference, then we likely are
+			 * racing with a migration event. Wait for a grace
+			 * period and try again.
+			 */
+			synchronize_rcu();
+			goto retry;
+		}
+
+		ret = xprt_enable_swap(xprt);
+		xprt_put(xprt);
+	}
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_swap_activate);
+
+void
+rpc_clnt_swap_deactivate(struct rpc_clnt *clnt)
+{
+	struct rpc_xprt	*xprt;
+
+	if (atomic_dec_if_positive(&clnt->cl_swapper) == 0) {
+retry:
+		rcu_read_lock();
+		xprt = xprt_get(rcu_dereference(clnt->cl_xprt));
+		rcu_read_unlock();
+		if (!xprt) {
+			/*
+			 * If we didn't get a reference, then we likely are
+			 * racing with a migration event. Wait for a grace
+			 * period and try again.
+			 */
+			synchronize_rcu();
+			goto retry;
+		}
+
+		xprt_disable_swap(xprt);
+		xprt_put(xprt);
+	}
+}
+EXPORT_SYMBOL_GPL(rpc_clnt_swap_deactivate);
+#endif /* CONFIG_SUNRPC_SWAP */
diff --git a/net/sunrpc/debugfs.c b/net/sunrpc/debugfs.c
index 82962f7..e7b4d935 100644
--- a/net/sunrpc/debugfs.c
+++ b/net/sunrpc/debugfs.c
@@ -10,9 +10,12 @@
 #include "netns.h"
 
 static struct dentry *topdir;
+static struct dentry *rpc_fault_dir;
 static struct dentry *rpc_clnt_dir;
 static struct dentry *rpc_xprt_dir;
 
+unsigned int rpc_inject_disconnect;
+
 struct rpc_clnt_iter {
 	struct rpc_clnt	*clnt;
 	loff_t		pos;
@@ -257,6 +260,8 @@
 		debugfs_remove_recursive(xprt->debugfs);
 		xprt->debugfs = NULL;
 	}
+
+	atomic_set(&xprt->inject_disconnect, rpc_inject_disconnect);
 }
 
 void
@@ -266,11 +271,79 @@
 	xprt->debugfs = NULL;
 }
 
+static int
+fault_open(struct inode *inode, struct file *filp)
+{
+	filp->private_data = kmalloc(128, GFP_KERNEL);
+	if (!filp->private_data)
+		return -ENOMEM;
+	return 0;
+}
+
+static int
+fault_release(struct inode *inode, struct file *filp)
+{
+	kfree(filp->private_data);
+	return 0;
+}
+
+static ssize_t
+fault_disconnect_read(struct file *filp, char __user *user_buf,
+		      size_t len, loff_t *offset)
+{
+	char *buffer = (char *)filp->private_data;
+	size_t size;
+
+	size = sprintf(buffer, "%u\n", rpc_inject_disconnect);
+	return simple_read_from_buffer(user_buf, len, offset, buffer, size);
+}
+
+static ssize_t
+fault_disconnect_write(struct file *filp, const char __user *user_buf,
+		       size_t len, loff_t *offset)
+{
+	char buffer[16];
+
+	if (len >= sizeof(buffer))
+		len = sizeof(buffer) - 1;
+	if (copy_from_user(buffer, user_buf, len))
+		return -EFAULT;
+	buffer[len] = '\0';
+	if (kstrtouint(buffer, 10, &rpc_inject_disconnect))
+		return -EINVAL;
+	return len;
+}
+
+static const struct file_operations fault_disconnect_fops = {
+	.owner		= THIS_MODULE,
+	.open		= fault_open,
+	.read		= fault_disconnect_read,
+	.write		= fault_disconnect_write,
+	.release	= fault_release,
+};
+
+static struct dentry *
+inject_fault_dir(struct dentry *topdir)
+{
+	struct dentry *faultdir;
+
+	faultdir = debugfs_create_dir("inject_fault", topdir);
+	if (!faultdir)
+		return NULL;
+
+	if (!debugfs_create_file("disconnect", S_IFREG | S_IRUSR, faultdir,
+				 NULL, &fault_disconnect_fops))
+		return NULL;
+
+	return faultdir;
+}
+
 void __exit
 sunrpc_debugfs_exit(void)
 {
 	debugfs_remove_recursive(topdir);
 	topdir = NULL;
+	rpc_fault_dir = NULL;
 	rpc_clnt_dir = NULL;
 	rpc_xprt_dir = NULL;
 }
@@ -282,6 +355,10 @@
 	if (!topdir)
 		return;
 
+	rpc_fault_dir = inject_fault_dir(topdir);
+	if (!rpc_fault_dir)
+		goto out_remove;
+
 	rpc_clnt_dir = debugfs_create_dir("rpc_clnt", topdir);
 	if (!rpc_clnt_dir)
 		goto out_remove;
@@ -294,5 +371,6 @@
 out_remove:
 	debugfs_remove_recursive(topdir);
 	topdir = NULL;
+	rpc_fault_dir = NULL;
 	rpc_clnt_dir = NULL;
 }
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index 852ae60..5a16d8d 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1350,6 +1350,11 @@
 {
 	struct kvec	*argv = &rqstp->rq_arg.head[0];
 	struct kvec	*resv = &rqstp->rq_res.head[0];
+	struct rpc_task *task;
+	int proc_error;
+	int error;
+
+	dprintk("svc: %s(%p)\n", __func__, req);
 
 	/* Build the svc_rqst used by the common processing routine */
 	rqstp->rq_xprt = serv->sv_bc_xprt;
@@ -1372,21 +1377,36 @@
 
 	/*
 	 * Skip the next two words because they've already been
-	 * processed in the trasport
+	 * processed in the transport
 	 */
 	svc_getu32(argv);	/* XID */
 	svc_getnl(argv);	/* CALLDIR */
 
-	/* Returns 1 for send, 0 for drop */
-	if (svc_process_common(rqstp, argv, resv)) {
-		memcpy(&req->rq_snd_buf, &rqstp->rq_res,
-						sizeof(req->rq_snd_buf));
-		return bc_send(req);
-	} else {
-		/* drop request */
+	/* Parse and execute the bc call */
+	proc_error = svc_process_common(rqstp, argv, resv);
+
+	atomic_inc(&req->rq_xprt->bc_free_slots);
+	if (!proc_error) {
+		/* Processing error: drop the request */
 		xprt_free_bc_request(req);
 		return 0;
 	}
+
+	/* Finally, send the reply synchronously */
+	memcpy(&req->rq_snd_buf, &rqstp->rq_res, sizeof(req->rq_snd_buf));
+	task = rpc_run_bc_task(req);
+	if (IS_ERR(task)) {
+		error = PTR_ERR(task);
+		goto out;
+	}
+
+	WARN_ON_ONCE(atomic_read(&task->tk_count) != 1);
+	error = task->tk_status;
+	rpc_put_task(task);
+
+out:
+	dprintk("svc: %s(), error=%d\n", __func__, error);
+	return error;
 }
 EXPORT_SYMBOL_GPL(bc_svc_process);
 #endif /* CONFIG_SUNRPC_BACKCHANNEL */
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 1d4fe24..ab5dd62 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -68,6 +68,7 @@
 static void	xprt_request_init(struct rpc_task *, struct rpc_xprt *);
 static void	xprt_connect_status(struct rpc_task *task);
 static int      __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
+static void     __xprt_put_cong(struct rpc_xprt *, struct rpc_rqst *);
 static void	 xprt_destroy(struct rpc_xprt *xprt);
 
 static DEFINE_SPINLOCK(xprt_list_lock);
@@ -250,6 +251,8 @@
 	}
 	xprt_clear_locked(xprt);
 out_sleep:
+	if (req)
+		__xprt_put_cong(xprt, req);
 	dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
 	task->tk_timeout = 0;
 	task->tk_status = -EAGAIN;
@@ -608,8 +611,8 @@
 	struct rpc_xprt *xprt =
 		container_of(work, struct rpc_xprt, task_cleanup);
 
-	xprt->ops->close(xprt);
 	clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
+	xprt->ops->close(xprt);
 	xprt_release_write(xprt, NULL);
 }
 
@@ -967,6 +970,7 @@
 		task->tk_status = status;
 		return;
 	}
+	xprt_inject_disconnect(xprt);
 
 	dprintk("RPC: %5u xmit complete\n", task->tk_pid);
 	task->tk_flags |= RPC_TASK_SENT;
@@ -1285,6 +1289,7 @@
 	spin_unlock_bh(&xprt->transport_lock);
 	if (req->rq_buffer)
 		xprt->ops->buf_free(req->rq_buffer);
+	xprt_inject_disconnect(xprt);
 	if (req->rq_cred != NULL)
 		put_rpccred(req->rq_cred);
 	task->tk_rqstp = NULL;
diff --git a/net/sunrpc/xprtrdma/fmr_ops.c b/net/sunrpc/xprtrdma/fmr_ops.c
index 302d4eb..f1e8daf 100644
--- a/net/sunrpc/xprtrdma/fmr_ops.c
+++ b/net/sunrpc/xprtrdma/fmr_ops.c
@@ -11,6 +11,21 @@
  * can take tens of usecs to complete.
  */
 
+/* Normal operation
+ *
+ * A Memory Region is prepared for RDMA READ or WRITE using the
+ * ib_map_phys_fmr verb (fmr_op_map). When the RDMA operation is
+ * finished, the Memory Region is unmapped using the ib_unmap_fmr
+ * verb (fmr_op_unmap).
+ */
+
+/* Transport recovery
+ *
+ * After a transport reconnect, fmr_op_map re-uses the MR already
+ * allocated for the RPC, but generates a fresh rkey then maps the
+ * MR again. This process is synchronous.
+ */
+
 #include "xprt_rdma.h"
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
@@ -50,19 +65,28 @@
 	struct rpcrdma_mw *r;
 	int i, rc;
 
+	spin_lock_init(&buf->rb_mwlock);
 	INIT_LIST_HEAD(&buf->rb_mws);
 	INIT_LIST_HEAD(&buf->rb_all);
 
-	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
-	dprintk("RPC:       %s: initializing %d FMRs\n", __func__, i);
+	i = max_t(int, RPCRDMA_MAX_DATA_SEGS / RPCRDMA_MAX_FMR_SGES, 1);
+	i += 2;				/* head + tail */
+	i *= buf->rb_max_requests;	/* one set for each RPC slot */
+	dprintk("RPC:       %s: initalizing %d FMRs\n", __func__, i);
 
+	rc = -ENOMEM;
 	while (i--) {
 		r = kzalloc(sizeof(*r), GFP_KERNEL);
 		if (!r)
-			return -ENOMEM;
+			goto out;
 
-		r->r.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
-		if (IS_ERR(r->r.fmr))
+		r->r.fmr.physaddrs = kmalloc(RPCRDMA_MAX_FMR_SGES *
+					     sizeof(u64), GFP_KERNEL);
+		if (!r->r.fmr.physaddrs)
+			goto out_free;
+
+		r->r.fmr.fmr = ib_alloc_fmr(pd, mr_access_flags, &fmr_attr);
+		if (IS_ERR(r->r.fmr.fmr))
 			goto out_fmr_err;
 
 		list_add(&r->mw_list, &buf->rb_mws);
@@ -71,12 +95,24 @@
 	return 0;
 
 out_fmr_err:
-	rc = PTR_ERR(r->r.fmr);
+	rc = PTR_ERR(r->r.fmr.fmr);
 	dprintk("RPC:       %s: ib_alloc_fmr status %i\n", __func__, rc);
+	kfree(r->r.fmr.physaddrs);
+out_free:
 	kfree(r);
+out:
 	return rc;
 }
 
+static int
+__fmr_unmap(struct rpcrdma_mw *r)
+{
+	LIST_HEAD(l);
+
+	list_add(&r->r.fmr.fmr->list, &l);
+	return ib_unmap_fmr(&l);
+}
+
 /* Use the ib_map_phys_fmr() verb to register a memory region
  * for remote access via RDMA READ or RDMA WRITE.
  */
@@ -85,12 +121,24 @@
 	   int nsegs, bool writing)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct ib_device *device = ia->ri_id->device;
+	struct ib_device *device = ia->ri_device;
 	enum dma_data_direction direction = rpcrdma_data_dir(writing);
 	struct rpcrdma_mr_seg *seg1 = seg;
-	struct rpcrdma_mw *mw = seg1->rl_mw;
-	u64 physaddrs[RPCRDMA_MAX_DATA_SEGS];
 	int len, pageoff, i, rc;
+	struct rpcrdma_mw *mw;
+
+	mw = seg1->rl_mw;
+	seg1->rl_mw = NULL;
+	if (!mw) {
+		mw = rpcrdma_get_mw(r_xprt);
+		if (!mw)
+			return -ENOMEM;
+	} else {
+		/* this is a retransmit; generate a fresh rkey */
+		rc = __fmr_unmap(mw);
+		if (rc)
+			return rc;
+	}
 
 	pageoff = offset_in_page(seg1->mr_offset);
 	seg1->mr_offset -= pageoff;	/* start of page */
@@ -100,7 +148,7 @@
 		nsegs = RPCRDMA_MAX_FMR_SGES;
 	for (i = 0; i < nsegs;) {
 		rpcrdma_map_one(device, seg, direction);
-		physaddrs[i] = seg->mr_dma;
+		mw->r.fmr.physaddrs[i] = seg->mr_dma;
 		len += seg->mr_len;
 		++seg;
 		++i;
@@ -110,11 +158,13 @@
 			break;
 	}
 
-	rc = ib_map_phys_fmr(mw->r.fmr, physaddrs, i, seg1->mr_dma);
+	rc = ib_map_phys_fmr(mw->r.fmr.fmr, mw->r.fmr.physaddrs,
+			     i, seg1->mr_dma);
 	if (rc)
 		goto out_maperr;
 
-	seg1->mr_rkey = mw->r.fmr->rkey;
+	seg1->rl_mw = mw;
+	seg1->mr_rkey = mw->r.fmr.fmr->rkey;
 	seg1->mr_base = seg1->mr_dma + pageoff;
 	seg1->mr_nsegs = i;
 	seg1->mr_len = len;
@@ -137,48 +187,28 @@
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 	struct rpcrdma_mr_seg *seg1 = seg;
-	struct ib_device *device;
+	struct rpcrdma_mw *mw = seg1->rl_mw;
 	int rc, nsegs = seg->mr_nsegs;
-	LIST_HEAD(l);
 
-	list_add(&seg1->rl_mw->r.fmr->list, &l);
-	rc = ib_unmap_fmr(&l);
-	read_lock(&ia->ri_qplock);
-	device = ia->ri_id->device;
+	dprintk("RPC:       %s: FMR %p\n", __func__, mw);
+
+	seg1->rl_mw = NULL;
 	while (seg1->mr_nsegs--)
-		rpcrdma_unmap_one(device, seg++);
-	read_unlock(&ia->ri_qplock);
+		rpcrdma_unmap_one(ia->ri_device, seg++);
+	rc = __fmr_unmap(mw);
 	if (rc)
 		goto out_err;
+	rpcrdma_put_mw(r_xprt, mw);
 	return nsegs;
 
 out_err:
+	/* The FMR is abandoned, but remains in rb_all. fmr_op_destroy
+	 * will attempt to release it when the transport is destroyed.
+	 */
 	dprintk("RPC:       %s: ib_unmap_fmr status %i\n", __func__, rc);
 	return nsegs;
 }
 
-/* After a disconnect, unmap all FMRs.
- *
- * This is invoked only in the transport connect worker in order
- * to serialize with rpcrdma_register_fmr_external().
- */
-static void
-fmr_op_reset(struct rpcrdma_xprt *r_xprt)
-{
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct rpcrdma_mw *r;
-	LIST_HEAD(list);
-	int rc;
-
-	list_for_each_entry(r, &buf->rb_all, mw_all)
-		list_add(&r->r.fmr->list, &list);
-
-	rc = ib_unmap_fmr(&list);
-	if (rc)
-		dprintk("RPC:       %s: ib_unmap_fmr failed %i\n",
-			__func__, rc);
-}
-
 static void
 fmr_op_destroy(struct rpcrdma_buffer *buf)
 {
@@ -188,10 +218,13 @@
 	while (!list_empty(&buf->rb_all)) {
 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
 		list_del(&r->mw_all);
-		rc = ib_dealloc_fmr(r->r.fmr);
+		kfree(r->r.fmr.physaddrs);
+
+		rc = ib_dealloc_fmr(r->r.fmr.fmr);
 		if (rc)
 			dprintk("RPC:       %s: ib_dealloc_fmr failed %i\n",
 				__func__, rc);
+
 		kfree(r);
 	}
 }
@@ -202,7 +235,6 @@
 	.ro_open			= fmr_op_open,
 	.ro_maxpages			= fmr_op_maxpages,
 	.ro_init			= fmr_op_init,
-	.ro_reset			= fmr_op_reset,
 	.ro_destroy			= fmr_op_destroy,
 	.ro_displayname			= "fmr",
 };
diff --git a/net/sunrpc/xprtrdma/frwr_ops.c b/net/sunrpc/xprtrdma/frwr_ops.c
index d234521..04ea914 100644
--- a/net/sunrpc/xprtrdma/frwr_ops.c
+++ b/net/sunrpc/xprtrdma/frwr_ops.c
@@ -11,12 +11,136 @@
  * but most complex memory registration mode.
  */
 
+/* Normal operation
+ *
+ * A Memory Region is prepared for RDMA READ or WRITE using a FAST_REG
+ * Work Request (frmr_op_map). When the RDMA operation is finished, this
+ * Memory Region is invalidated using a LOCAL_INV Work Request
+ * (frmr_op_unmap).
+ *
+ * Typically these Work Requests are not signaled, and neither are RDMA
+ * SEND Work Requests (with the exception of signaling occasionally to
+ * prevent provider work queue overflows). This greatly reduces HCA
+ * interrupt workload.
+ *
+ * As an optimization, frwr_op_unmap marks MRs INVALID before the
+ * LOCAL_INV WR is posted. If posting succeeds, the MR is placed on
+ * rb_mws immediately so that no work (like managing a linked list
+ * under a spinlock) is needed in the completion upcall.
+ *
+ * But this means that frwr_op_map() can occasionally encounter an MR
+ * that is INVALID but the LOCAL_INV WR has not completed. Work Queue
+ * ordering prevents a subsequent FAST_REG WR from executing against
+ * that MR while it is still being invalidated.
+ */
+
+/* Transport recovery
+ *
+ * ->op_map and the transport connect worker cannot run at the same
+ * time, but ->op_unmap can fire while the transport connect worker
+ * is running. Thus MR recovery is handled in ->op_map, to guarantee
+ * that recovered MRs are owned by a sending RPC, and not one where
+ * ->op_unmap could fire at the same time transport reconnect is
+ * being done.
+ *
+ * When the underlying transport disconnects, MRs are left in one of
+ * three states:
+ *
+ * INVALID:	The MR was not in use before the QP entered ERROR state.
+ *		(Or, the LOCAL_INV WR has not completed or flushed yet).
+ *
+ * STALE:	The MR was being registered or unregistered when the QP
+ *		entered ERROR state, and the pending WR was flushed.
+ *
+ * VALID:	The MR was registered before the QP entered ERROR state.
+ *
+ * When frwr_op_map encounters STALE and VALID MRs, they are recovered
+ * with ib_dereg_mr and then are re-initialized. Beause MR recovery
+ * allocates fresh resources, it is deferred to a workqueue, and the
+ * recovered MRs are placed back on the rb_mws list when recovery is
+ * complete. frwr_op_map allocates another MR for the current RPC while
+ * the broken MR is reset.
+ *
+ * To ensure that frwr_op_map doesn't encounter an MR that is marked
+ * INVALID but that is about to be flushed due to a previous transport
+ * disconnect, the transport connect worker attempts to drain all
+ * pending send queue WRs before the transport is reconnected.
+ */
+
 #include "xprt_rdma.h"
 
 #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
+static struct workqueue_struct *frwr_recovery_wq;
+
+#define FRWR_RECOVERY_WQ_FLAGS		(WQ_UNBOUND | WQ_MEM_RECLAIM)
+
+int
+frwr_alloc_recovery_wq(void)
+{
+	frwr_recovery_wq = alloc_workqueue("frwr_recovery",
+					   FRWR_RECOVERY_WQ_FLAGS, 0);
+	return !frwr_recovery_wq ? -ENOMEM : 0;
+}
+
+void
+frwr_destroy_recovery_wq(void)
+{
+	struct workqueue_struct *wq;
+
+	if (!frwr_recovery_wq)
+		return;
+
+	wq = frwr_recovery_wq;
+	frwr_recovery_wq = NULL;
+	destroy_workqueue(wq);
+}
+
+/* Deferred reset of a single FRMR. Generate a fresh rkey by
+ * replacing the MR.
+ *
+ * There's no recovery if this fails. The FRMR is abandoned, but
+ * remains in rb_all. It will be cleaned up when the transport is
+ * destroyed.
+ */
+static void
+__frwr_recovery_worker(struct work_struct *work)
+{
+	struct rpcrdma_mw *r = container_of(work, struct rpcrdma_mw,
+					    r.frmr.fr_work);
+	struct rpcrdma_xprt *r_xprt = r->r.frmr.fr_xprt;
+	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
+	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
+
+	if (ib_dereg_mr(r->r.frmr.fr_mr))
+		goto out_fail;
+
+	r->r.frmr.fr_mr = ib_alloc_fast_reg_mr(pd, depth);
+	if (IS_ERR(r->r.frmr.fr_mr))
+		goto out_fail;
+
+	dprintk("RPC:       %s: recovered FRMR %p\n", __func__, r);
+	r->r.frmr.fr_state = FRMR_IS_INVALID;
+	rpcrdma_put_mw(r_xprt, r);
+	return;
+
+out_fail:
+	pr_warn("RPC:       %s: FRMR %p unrecovered\n",
+		__func__, r);
+}
+
+/* A broken MR was discovered in a context that can't sleep.
+ * Defer recovery to the recovery worker.
+ */
+static void
+__frwr_queue_recovery(struct rpcrdma_mw *r)
+{
+	INIT_WORK(&r->r.frmr.fr_work, __frwr_recovery_worker);
+	queue_work(frwr_recovery_wq, &r->r.frmr.fr_work);
+}
+
 static int
 __frwr_init(struct rpcrdma_mw *r, struct ib_pd *pd, struct ib_device *device,
 	    unsigned int depth)
@@ -128,7 +252,7 @@
 
 	/* WARNING: Only wr_id and status are reliable at this point */
 	r = (struct rpcrdma_mw *)(unsigned long)wc->wr_id;
-	dprintk("RPC:       %s: frmr %p (stale), status %s (%d)\n",
+	pr_warn("RPC:       %s: frmr %p flushed, status %s (%d)\n",
 		__func__, r, ib_wc_status_msg(wc->status), wc->status);
 	r->r.frmr.fr_state = FRMR_IS_STALE;
 }
@@ -137,16 +261,19 @@
 frwr_op_init(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct ib_device *device = r_xprt->rx_ia.ri_id->device;
+	struct ib_device *device = r_xprt->rx_ia.ri_device;
 	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
 	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
 	int i;
 
+	spin_lock_init(&buf->rb_mwlock);
 	INIT_LIST_HEAD(&buf->rb_mws);
 	INIT_LIST_HEAD(&buf->rb_all);
 
-	i = (buf->rb_max_requests + 1) * RPCRDMA_MAX_SEGS;
-	dprintk("RPC:       %s: initializing %d FRMRs\n", __func__, i);
+	i = max_t(int, RPCRDMA_MAX_DATA_SEGS / depth, 1);
+	i += 2;				/* head + tail */
+	i *= buf->rb_max_requests;	/* one set for each RPC slot */
+	dprintk("RPC:       %s: initalizing %d FRMRs\n", __func__, i);
 
 	while (i--) {
 		struct rpcrdma_mw *r;
@@ -165,6 +292,7 @@
 		list_add(&r->mw_list, &buf->rb_mws);
 		list_add(&r->mw_all, &buf->rb_all);
 		r->mw_sendcompletion = frwr_sendcompletion;
+		r->r.frmr.fr_xprt = r_xprt;
 	}
 
 	return 0;
@@ -178,12 +306,12 @@
 	    int nsegs, bool writing)
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
-	struct ib_device *device = ia->ri_id->device;
+	struct ib_device *device = ia->ri_device;
 	enum dma_data_direction direction = rpcrdma_data_dir(writing);
 	struct rpcrdma_mr_seg *seg1 = seg;
-	struct rpcrdma_mw *mw = seg1->rl_mw;
-	struct rpcrdma_frmr *frmr = &mw->r.frmr;
-	struct ib_mr *mr = frmr->fr_mr;
+	struct rpcrdma_mw *mw;
+	struct rpcrdma_frmr *frmr;
+	struct ib_mr *mr;
 	struct ib_send_wr fastreg_wr, *bad_wr;
 	u8 key;
 	int len, pageoff;
@@ -192,12 +320,25 @@
 	u64 pa;
 	int page_no;
 
+	mw = seg1->rl_mw;
+	seg1->rl_mw = NULL;
+	do {
+		if (mw)
+			__frwr_queue_recovery(mw);
+		mw = rpcrdma_get_mw(r_xprt);
+		if (!mw)
+			return -ENOMEM;
+	} while (mw->r.frmr.fr_state != FRMR_IS_INVALID);
+	frmr = &mw->r.frmr;
+	frmr->fr_state = FRMR_IS_VALID;
+
 	pageoff = offset_in_page(seg1->mr_offset);
 	seg1->mr_offset -= pageoff;	/* start of page */
 	seg1->mr_len += pageoff;
 	len = -pageoff;
 	if (nsegs > ia->ri_max_frmr_depth)
 		nsegs = ia->ri_max_frmr_depth;
+
 	for (page_no = i = 0; i < nsegs;) {
 		rpcrdma_map_one(device, seg, direction);
 		pa = seg->mr_dma;
@@ -216,8 +357,6 @@
 	dprintk("RPC:       %s: Using frmr %p to map %d segments (%d bytes)\n",
 		__func__, mw, i, len);
 
-	frmr->fr_state = FRMR_IS_VALID;
-
 	memset(&fastreg_wr, 0, sizeof(fastreg_wr));
 	fastreg_wr.wr_id = (unsigned long)(void *)mw;
 	fastreg_wr.opcode = IB_WR_FAST_REG_MR;
@@ -229,6 +368,7 @@
 	fastreg_wr.wr.fast_reg.access_flags = writing ?
 				IB_ACCESS_REMOTE_WRITE | IB_ACCESS_LOCAL_WRITE :
 				IB_ACCESS_REMOTE_READ;
+	mr = frmr->fr_mr;
 	key = (u8)(mr->rkey & 0x000000FF);
 	ib_update_fast_reg_key(mr, ++key);
 	fastreg_wr.wr.fast_reg.rkey = mr->rkey;
@@ -238,6 +378,7 @@
 	if (rc)
 		goto out_senderr;
 
+	seg1->rl_mw = mw;
 	seg1->mr_rkey = mr->rkey;
 	seg1->mr_base = seg1->mr_dma + pageoff;
 	seg1->mr_nsegs = i;
@@ -246,10 +387,9 @@
 
 out_senderr:
 	dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
-	ib_update_fast_reg_key(mr, --key);
-	frmr->fr_state = FRMR_IS_INVALID;
 	while (i--)
 		rpcrdma_unmap_one(device, --seg);
+	__frwr_queue_recovery(mw);
 	return rc;
 }
 
@@ -261,78 +401,46 @@
 {
 	struct rpcrdma_mr_seg *seg1 = seg;
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_mw *mw = seg1->rl_mw;
 	struct ib_send_wr invalidate_wr, *bad_wr;
 	int rc, nsegs = seg->mr_nsegs;
-	struct ib_device *device;
 
-	seg1->rl_mw->r.frmr.fr_state = FRMR_IS_INVALID;
+	dprintk("RPC:       %s: FRMR %p\n", __func__, mw);
+
+	seg1->rl_mw = NULL;
+	mw->r.frmr.fr_state = FRMR_IS_INVALID;
 
 	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
-	invalidate_wr.wr_id = (unsigned long)(void *)seg1->rl_mw;
+	invalidate_wr.wr_id = (unsigned long)(void *)mw;
 	invalidate_wr.opcode = IB_WR_LOCAL_INV;
-	invalidate_wr.ex.invalidate_rkey = seg1->rl_mw->r.frmr.fr_mr->rkey;
+	invalidate_wr.ex.invalidate_rkey = mw->r.frmr.fr_mr->rkey;
 	DECR_CQCOUNT(&r_xprt->rx_ep);
 
-	read_lock(&ia->ri_qplock);
-	device = ia->ri_id->device;
 	while (seg1->mr_nsegs--)
-		rpcrdma_unmap_one(device, seg++);
+		rpcrdma_unmap_one(ia->ri_device, seg++);
+	read_lock(&ia->ri_qplock);
 	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
 	read_unlock(&ia->ri_qplock);
 	if (rc)
 		goto out_err;
+
+	rpcrdma_put_mw(r_xprt, mw);
 	return nsegs;
 
 out_err:
-	/* Force rpcrdma_buffer_get() to retry */
-	seg1->rl_mw->r.frmr.fr_state = FRMR_IS_STALE;
 	dprintk("RPC:       %s: ib_post_send status %i\n", __func__, rc);
+	__frwr_queue_recovery(mw);
 	return nsegs;
 }
 
-/* After a disconnect, a flushed FAST_REG_MR can leave an FRMR in
- * an unusable state. Find FRMRs in this state and dereg / reg
- * each.  FRMRs that are VALID and attached to an rpcrdma_req are
- * also torn down.
- *
- * This gives all in-use FRMRs a fresh rkey and leaves them INVALID.
- *
- * This is invoked only in the transport connect worker in order
- * to serialize with rpcrdma_register_frmr_external().
- */
-static void
-frwr_op_reset(struct rpcrdma_xprt *r_xprt)
-{
-	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
-	struct ib_device *device = r_xprt->rx_ia.ri_id->device;
-	unsigned int depth = r_xprt->rx_ia.ri_max_frmr_depth;
-	struct ib_pd *pd = r_xprt->rx_ia.ri_pd;
-	struct rpcrdma_mw *r;
-	int rc;
-
-	list_for_each_entry(r, &buf->rb_all, mw_all) {
-		if (r->r.frmr.fr_state == FRMR_IS_INVALID)
-			continue;
-
-		__frwr_release(r);
-		rc = __frwr_init(r, pd, device, depth);
-		if (rc) {
-			dprintk("RPC:       %s: mw %p left %s\n",
-				__func__, r,
-				(r->r.frmr.fr_state == FRMR_IS_STALE ?
-					"stale" : "valid"));
-			continue;
-		}
-
-		r->r.frmr.fr_state = FRMR_IS_INVALID;
-	}
-}
-
 static void
 frwr_op_destroy(struct rpcrdma_buffer *buf)
 {
 	struct rpcrdma_mw *r;
 
+	/* Ensure stale MWs for "buf" are no longer in flight */
+	flush_workqueue(frwr_recovery_wq);
+
 	while (!list_empty(&buf->rb_all)) {
 		r = list_entry(buf->rb_all.next, struct rpcrdma_mw, mw_all);
 		list_del(&r->mw_all);
@@ -347,7 +455,6 @@
 	.ro_open			= frwr_op_open,
 	.ro_maxpages			= frwr_op_maxpages,
 	.ro_init			= frwr_op_init,
-	.ro_reset			= frwr_op_reset,
 	.ro_destroy			= frwr_op_destroy,
 	.ro_displayname			= "frwr",
 };
diff --git a/net/sunrpc/xprtrdma/physical_ops.c b/net/sunrpc/xprtrdma/physical_ops.c
index ba518af..41985d0 100644
--- a/net/sunrpc/xprtrdma/physical_ops.c
+++ b/net/sunrpc/xprtrdma/physical_ops.c
@@ -50,8 +50,7 @@
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	rpcrdma_map_one(ia->ri_id->device, seg,
-			rpcrdma_data_dir(writing));
+	rpcrdma_map_one(ia->ri_device, seg, rpcrdma_data_dir(writing));
 	seg->mr_rkey = ia->ri_bind_mem->rkey;
 	seg->mr_base = seg->mr_dma;
 	seg->mr_nsegs = 1;
@@ -65,19 +64,11 @@
 {
 	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
 
-	read_lock(&ia->ri_qplock);
-	rpcrdma_unmap_one(ia->ri_id->device, seg);
-	read_unlock(&ia->ri_qplock);
-
+	rpcrdma_unmap_one(ia->ri_device, seg);
 	return 1;
 }
 
 static void
-physical_op_reset(struct rpcrdma_xprt *r_xprt)
-{
-}
-
-static void
 physical_op_destroy(struct rpcrdma_buffer *buf)
 {
 }
@@ -88,7 +79,6 @@
 	.ro_open			= physical_op_open,
 	.ro_maxpages			= physical_op_maxpages,
 	.ro_init			= physical_op_init,
-	.ro_reset			= physical_op_reset,
 	.ro_destroy			= physical_op_destroy,
 	.ro_displayname			= "physical",
 };
diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 2c53ea9..84ea37d 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -284,9 +284,6 @@
 	return (unsigned char *)iptr - (unsigned char *)headerp;
 
 out:
-	if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
-		return n;
-
 	for (pos = 0; nchunks--;)
 		pos += r_xprt->rx_ia.ri_ops->ro_unmap(r_xprt,
 						      &req->rl_segments[pos]);
@@ -732,8 +729,8 @@
 	struct rpcrdma_msg *headerp;
 	struct rpcrdma_req *req;
 	struct rpc_rqst *rqst;
-	struct rpc_xprt *xprt = rep->rr_xprt;
-	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
+	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
 	__be32 *iptr;
 	int rdmalen, status;
 	unsigned long cwnd;
@@ -770,7 +767,6 @@
 			rep->rr_len);
 repost:
 		r_xprt->rx_stats.bad_reply_count++;
-		rep->rr_func = rpcrdma_reply_handler;
 		if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
 			rpcrdma_recv_buffer_put(rep);
 
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 436da2c..680f888 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -240,6 +240,16 @@
 	xprt_clear_connecting(xprt);
 }
 
+static void
+xprt_rdma_inject_disconnect(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_xprt *r_xprt = container_of(xprt, struct rpcrdma_xprt,
+						   rx_xprt);
+
+	pr_info("rpcrdma: injecting transport disconnect on xprt=%p\n", xprt);
+	rdma_disconnect(r_xprt->rx_ia.ri_id);
+}
+
 /*
  * xprt_rdma_destroy
  *
@@ -612,12 +622,6 @@
 	if (req->rl_reply == NULL) 		/* e.g. reconnection */
 		rpcrdma_recv_buffer_get(req);
 
-	if (req->rl_reply) {
-		req->rl_reply->rr_func = rpcrdma_reply_handler;
-		/* this need only be done once, but... */
-		req->rl_reply->rr_xprt = xprt;
-	}
-
 	/* Must suppress retransmit to maintain credits */
 	if (req->rl_connect_cookie == xprt->connect_cookie)
 		goto drop_connection;
@@ -676,6 +680,17 @@
 	   r_xprt->rx_stats.bad_reply_count);
 }
 
+static int
+xprt_rdma_enable_swap(struct rpc_xprt *xprt)
+{
+	return -EINVAL;
+}
+
+static void
+xprt_rdma_disable_swap(struct rpc_xprt *xprt)
+{
+}
+
 /*
  * Plumbing for rpc transport switch and kernel module
  */
@@ -694,7 +709,10 @@
 	.send_request		= xprt_rdma_send_request,
 	.close			= xprt_rdma_close,
 	.destroy		= xprt_rdma_destroy,
-	.print_stats		= xprt_rdma_print_stats
+	.print_stats		= xprt_rdma_print_stats,
+	.enable_swap		= xprt_rdma_enable_swap,
+	.disable_swap		= xprt_rdma_disable_swap,
+	.inject_disconnect	= xprt_rdma_inject_disconnect
 };
 
 static struct xprt_class xprt_rdma = {
@@ -720,17 +738,24 @@
 	if (rc)
 		dprintk("RPC:       %s: xprt_unregister returned %i\n",
 			__func__, rc);
+
+	frwr_destroy_recovery_wq();
 }
 
 int xprt_rdma_init(void)
 {
 	int rc;
 
-	rc = xprt_register_transport(&xprt_rdma);
-
+	rc = frwr_alloc_recovery_wq();
 	if (rc)
 		return rc;
 
+	rc = xprt_register_transport(&xprt_rdma);
+	if (rc) {
+		frwr_destroy_recovery_wq();
+		return rc;
+	}
+
 	dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
 
 	dprintk("Defaults:\n");
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 52df265..891c4ed 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -80,7 +80,6 @@
 rpcrdma_run_tasklet(unsigned long data)
 {
 	struct rpcrdma_rep *rep;
-	void (*func)(struct rpcrdma_rep *);
 	unsigned long flags;
 
 	data = data;
@@ -89,14 +88,9 @@
 		rep = list_entry(rpcrdma_tasklets_g.next,
 				 struct rpcrdma_rep, rr_list);
 		list_del(&rep->rr_list);
-		func = rep->rr_func;
-		rep->rr_func = NULL;
 		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
 
-		if (func)
-			func(rep);
-		else
-			rpcrdma_recv_buffer_put(rep);
+		rpcrdma_reply_handler(rep);
 
 		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
 	}
@@ -236,7 +230,7 @@
 		__func__, rep, wc->byte_len);
 
 	rep->rr_len = wc->byte_len;
-	ib_dma_sync_single_for_cpu(rdmab_to_ia(rep->rr_buffer)->ri_id->device,
+	ib_dma_sync_single_for_cpu(rep->rr_device,
 				   rdmab_addr(rep->rr_rdmabuf),
 				   rep->rr_len, DMA_FROM_DEVICE);
 	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
@@ -407,7 +401,7 @@
 
 		pr_info("rpcrdma: connection to %pIS:%u on %s, memreg '%s', %d credits, %d responders%s\n",
 			sap, rpc_get_port(sap),
-			ia->ri_id->device->name,
+			ia->ri_device->name,
 			ia->ri_ops->ro_displayname,
 			xprt->rx_buf.rb_max_requests,
 			ird, ird < 4 && ird < tird / 2 ? " (low!)" : "");
@@ -508,8 +502,9 @@
 		rc = PTR_ERR(ia->ri_id);
 		goto out1;
 	}
+	ia->ri_device = ia->ri_id->device;
 
-	ia->ri_pd = ib_alloc_pd(ia->ri_id->device);
+	ia->ri_pd = ib_alloc_pd(ia->ri_device);
 	if (IS_ERR(ia->ri_pd)) {
 		rc = PTR_ERR(ia->ri_pd);
 		dprintk("RPC:       %s: ib_alloc_pd() failed %i\n",
@@ -517,7 +512,7 @@
 		goto out2;
 	}
 
-	rc = ib_query_device(ia->ri_id->device, devattr);
+	rc = ib_query_device(ia->ri_device, devattr);
 	if (rc) {
 		dprintk("RPC:       %s: ib_query_device failed %d\n",
 			__func__, rc);
@@ -526,7 +521,7 @@
 
 	if (devattr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY) {
 		ia->ri_have_dma_lkey = 1;
-		ia->ri_dma_lkey = ia->ri_id->device->local_dma_lkey;
+		ia->ri_dma_lkey = ia->ri_device->local_dma_lkey;
 	}
 
 	if (memreg == RPCRDMA_FRMR) {
@@ -541,7 +536,7 @@
 		}
 	}
 	if (memreg == RPCRDMA_MTHCAFMR) {
-		if (!ia->ri_id->device->alloc_fmr) {
+		if (!ia->ri_device->alloc_fmr) {
 			dprintk("RPC:       %s: MTHCAFMR registration "
 				"not supported by HCA\n", __func__);
 			memreg = RPCRDMA_ALLPHYSICAL;
@@ -590,9 +585,6 @@
 	dprintk("RPC:       %s: memory registration strategy is '%s'\n",
 		__func__, ia->ri_ops->ro_displayname);
 
-	/* Else will do memory reg/dereg for each chunk */
-	ia->ri_memreg_strategy = memreg;
-
 	rwlock_init(&ia->ri_qplock);
 	return 0;
 
@@ -622,17 +614,17 @@
 		dprintk("RPC:       %s: ib_dereg_mr returned %i\n",
 			__func__, rc);
 	}
+
 	if (ia->ri_id != NULL && !IS_ERR(ia->ri_id)) {
 		if (ia->ri_id->qp)
 			rdma_destroy_qp(ia->ri_id);
 		rdma_destroy_id(ia->ri_id);
 		ia->ri_id = NULL;
 	}
-	if (ia->ri_pd != NULL && !IS_ERR(ia->ri_pd)) {
-		rc = ib_dealloc_pd(ia->ri_pd);
-		dprintk("RPC:       %s: ib_dealloc_pd returned %i\n",
-			__func__, rc);
-	}
+
+	/* If the pd is still busy, xprtrdma missed freeing a resource */
+	if (ia->ri_pd && !IS_ERR(ia->ri_pd))
+		WARN_ON(ib_dealloc_pd(ia->ri_pd));
 }
 
 /*
@@ -693,8 +685,8 @@
 	INIT_DELAYED_WORK(&ep->rep_connect_worker, rpcrdma_connect_worker);
 
 	cq_attr.cqe = ep->rep_attr.cap.max_send_wr + 1;
-	sendcq = ib_create_cq(ia->ri_id->device, rpcrdma_sendcq_upcall,
-				  rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+	sendcq = ib_create_cq(ia->ri_device, rpcrdma_sendcq_upcall,
+			      rpcrdma_cq_async_error_upcall, ep, &cq_attr);
 	if (IS_ERR(sendcq)) {
 		rc = PTR_ERR(sendcq);
 		dprintk("RPC:       %s: failed to create send CQ: %i\n",
@@ -710,8 +702,8 @@
 	}
 
 	cq_attr.cqe = ep->rep_attr.cap.max_recv_wr + 1;
-	recvcq = ib_create_cq(ia->ri_id->device, rpcrdma_recvcq_upcall,
-				  rpcrdma_cq_async_error_upcall, ep, &cq_attr);
+	recvcq = ib_create_cq(ia->ri_device, rpcrdma_recvcq_upcall,
+			      rpcrdma_cq_async_error_upcall, ep, &cq_attr);
 	if (IS_ERR(recvcq)) {
 		rc = PTR_ERR(recvcq);
 		dprintk("RPC:       %s: failed to create recv CQ: %i\n",
@@ -817,8 +809,6 @@
 		rpcrdma_flush_cqs(ep);
 
 		xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
-		ia->ri_ops->ro_reset(xprt);
-
 		id = rpcrdma_create_id(xprt, ia,
 				(struct sockaddr *)&xprt->rx_data.addr);
 		if (IS_ERR(id)) {
@@ -832,7 +822,7 @@
 		 * More stuff I haven't thought of!
 		 * Rrrgh!
 		 */
-		if (ia->ri_id->device != id->device) {
+		if (ia->ri_device != id->device) {
 			printk("RPC:       %s: can't reconnect on "
 				"different device!\n", __func__);
 			rdma_destroy_id(id);
@@ -974,7 +964,8 @@
 		goto out_free;
 	}
 
-	rep->rr_buffer = &r_xprt->rx_buf;
+	rep->rr_device = ia->ri_device;
+	rep->rr_rxprt = r_xprt;
 	return rep;
 
 out_free:
@@ -1098,31 +1089,33 @@
 	kfree(buf->rb_pool);
 }
 
-/* "*mw" can be NULL when rpcrdma_buffer_get_mrs() fails, leaving
- * some req segments uninitialized.
- */
-static void
-rpcrdma_buffer_put_mr(struct rpcrdma_mw **mw, struct rpcrdma_buffer *buf)
+struct rpcrdma_mw *
+rpcrdma_get_mw(struct rpcrdma_xprt *r_xprt)
 {
-	if (*mw) {
-		list_add_tail(&(*mw)->mw_list, &buf->rb_mws);
-		*mw = NULL;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_mw *mw = NULL;
+
+	spin_lock(&buf->rb_mwlock);
+	if (!list_empty(&buf->rb_mws)) {
+		mw = list_first_entry(&buf->rb_mws,
+				      struct rpcrdma_mw, mw_list);
+		list_del_init(&mw->mw_list);
 	}
+	spin_unlock(&buf->rb_mwlock);
+
+	if (!mw)
+		pr_err("RPC:       %s: no MWs available\n", __func__);
+	return mw;
 }
 
-/* Cycle mw's back in reverse order, and "spin" them.
- * This delays and scrambles reuse as much as possible.
- */
-static void
-rpcrdma_buffer_put_mrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
+void
+rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw)
 {
-	struct rpcrdma_mr_seg *seg = req->rl_segments;
-	struct rpcrdma_mr_seg *seg1 = seg;
-	int i;
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
 
-	for (i = 1, seg++; i < RPCRDMA_MAX_SEGS; seg++, i++)
-		rpcrdma_buffer_put_mr(&seg->rl_mw, buf);
-	rpcrdma_buffer_put_mr(&seg1->rl_mw, buf);
+	spin_lock(&buf->rb_mwlock);
+	list_add_tail(&mw->mw_list, &buf->rb_mws);
+	spin_unlock(&buf->rb_mwlock);
 }
 
 static void
@@ -1132,115 +1125,10 @@
 	req->rl_niovs = 0;
 	if (req->rl_reply) {
 		buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply;
-		req->rl_reply->rr_func = NULL;
 		req->rl_reply = NULL;
 	}
 }
 
-/* rpcrdma_unmap_one() was already done during deregistration.
- * Redo only the ib_post_send().
- */
-static void
-rpcrdma_retry_local_inv(struct rpcrdma_mw *r, struct rpcrdma_ia *ia)
-{
-	struct rpcrdma_xprt *r_xprt =
-				container_of(ia, struct rpcrdma_xprt, rx_ia);
-	struct ib_send_wr invalidate_wr, *bad_wr;
-	int rc;
-
-	dprintk("RPC:       %s: FRMR %p is stale\n", __func__, r);
-
-	/* When this FRMR is re-inserted into rb_mws, it is no longer stale */
-	r->r.frmr.fr_state = FRMR_IS_INVALID;
-
-	memset(&invalidate_wr, 0, sizeof(invalidate_wr));
-	invalidate_wr.wr_id = (unsigned long)(void *)r;
-	invalidate_wr.opcode = IB_WR_LOCAL_INV;
-	invalidate_wr.ex.invalidate_rkey = r->r.frmr.fr_mr->rkey;
-	DECR_CQCOUNT(&r_xprt->rx_ep);
-
-	dprintk("RPC:       %s: frmr %p invalidating rkey %08x\n",
-		__func__, r, r->r.frmr.fr_mr->rkey);
-
-	read_lock(&ia->ri_qplock);
-	rc = ib_post_send(ia->ri_id->qp, &invalidate_wr, &bad_wr);
-	read_unlock(&ia->ri_qplock);
-	if (rc) {
-		/* Force rpcrdma_buffer_get() to retry */
-		r->r.frmr.fr_state = FRMR_IS_STALE;
-		dprintk("RPC:       %s: ib_post_send failed, %i\n",
-			__func__, rc);
-	}
-}
-
-static void
-rpcrdma_retry_flushed_linv(struct list_head *stale,
-			   struct rpcrdma_buffer *buf)
-{
-	struct rpcrdma_ia *ia = rdmab_to_ia(buf);
-	struct list_head *pos;
-	struct rpcrdma_mw *r;
-	unsigned long flags;
-
-	list_for_each(pos, stale) {
-		r = list_entry(pos, struct rpcrdma_mw, mw_list);
-		rpcrdma_retry_local_inv(r, ia);
-	}
-
-	spin_lock_irqsave(&buf->rb_lock, flags);
-	list_splice_tail(stale, &buf->rb_mws);
-	spin_unlock_irqrestore(&buf->rb_lock, flags);
-}
-
-static struct rpcrdma_req *
-rpcrdma_buffer_get_frmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf,
-			 struct list_head *stale)
-{
-	struct rpcrdma_mw *r;
-	int i;
-
-	i = RPCRDMA_MAX_SEGS - 1;
-	while (!list_empty(&buf->rb_mws)) {
-		r = list_entry(buf->rb_mws.next,
-			       struct rpcrdma_mw, mw_list);
-		list_del(&r->mw_list);
-		if (r->r.frmr.fr_state == FRMR_IS_STALE) {
-			list_add(&r->mw_list, stale);
-			continue;
-		}
-		req->rl_segments[i].rl_mw = r;
-		if (unlikely(i-- == 0))
-			return req;	/* Success */
-	}
-
-	/* Not enough entries on rb_mws for this req */
-	rpcrdma_buffer_put_sendbuf(req, buf);
-	rpcrdma_buffer_put_mrs(req, buf);
-	return NULL;
-}
-
-static struct rpcrdma_req *
-rpcrdma_buffer_get_fmrs(struct rpcrdma_req *req, struct rpcrdma_buffer *buf)
-{
-	struct rpcrdma_mw *r;
-	int i;
-
-	i = RPCRDMA_MAX_SEGS - 1;
-	while (!list_empty(&buf->rb_mws)) {
-		r = list_entry(buf->rb_mws.next,
-			       struct rpcrdma_mw, mw_list);
-		list_del(&r->mw_list);
-		req->rl_segments[i].rl_mw = r;
-		if (unlikely(i-- == 0))
-			return req;	/* Success */
-	}
-
-	/* Not enough entries on rb_mws for this req */
-	rpcrdma_buffer_put_sendbuf(req, buf);
-	rpcrdma_buffer_put_mrs(req, buf);
-	return NULL;
-}
-
 /*
  * Get a set of request/reply buffers.
  *
@@ -1253,12 +1141,11 @@
 struct rpcrdma_req *
 rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
 {
-	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
-	struct list_head stale;
 	struct rpcrdma_req *req;
 	unsigned long flags;
 
 	spin_lock_irqsave(&buffers->rb_lock, flags);
+
 	if (buffers->rb_send_index == buffers->rb_max_requests) {
 		spin_unlock_irqrestore(&buffers->rb_lock, flags);
 		dprintk("RPC:       %s: out of request buffers\n", __func__);
@@ -1277,20 +1164,7 @@
 	}
 	buffers->rb_send_bufs[buffers->rb_send_index++] = NULL;
 
-	INIT_LIST_HEAD(&stale);
-	switch (ia->ri_memreg_strategy) {
-	case RPCRDMA_FRMR:
-		req = rpcrdma_buffer_get_frmrs(req, buffers, &stale);
-		break;
-	case RPCRDMA_MTHCAFMR:
-		req = rpcrdma_buffer_get_fmrs(req, buffers);
-		break;
-	default:
-		break;
-	}
 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
-	if (!list_empty(&stale))
-		rpcrdma_retry_flushed_linv(&stale, buffers);
 	return req;
 }
 
@@ -1302,19 +1176,10 @@
 rpcrdma_buffer_put(struct rpcrdma_req *req)
 {
 	struct rpcrdma_buffer *buffers = req->rl_buffer;
-	struct rpcrdma_ia *ia = rdmab_to_ia(buffers);
 	unsigned long flags;
 
 	spin_lock_irqsave(&buffers->rb_lock, flags);
 	rpcrdma_buffer_put_sendbuf(req, buffers);
-	switch (ia->ri_memreg_strategy) {
-	case RPCRDMA_FRMR:
-	case RPCRDMA_MTHCAFMR:
-		rpcrdma_buffer_put_mrs(req, buffers);
-		break;
-	default:
-		break;
-	}
 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
 }
 
@@ -1344,10 +1209,9 @@
 void
 rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep)
 {
-	struct rpcrdma_buffer *buffers = rep->rr_buffer;
+	struct rpcrdma_buffer *buffers = &rep->rr_rxprt->rx_buf;
 	unsigned long flags;
 
-	rep->rr_func = NULL;
 	spin_lock_irqsave(&buffers->rb_lock, flags);
 	buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep;
 	spin_unlock_irqrestore(&buffers->rb_lock, flags);
@@ -1376,9 +1240,9 @@
 	/*
 	 * All memory passed here was kmalloc'ed, therefore phys-contiguous.
 	 */
-	iov->addr = ib_dma_map_single(ia->ri_id->device,
+	iov->addr = ib_dma_map_single(ia->ri_device,
 			va, len, DMA_BIDIRECTIONAL);
-	if (ib_dma_mapping_error(ia->ri_id->device, iov->addr))
+	if (ib_dma_mapping_error(ia->ri_device, iov->addr))
 		return -ENOMEM;
 
 	iov->length = len;
@@ -1422,8 +1286,8 @@
 {
 	int rc;
 
-	ib_dma_unmap_single(ia->ri_id->device,
-			iov->addr, iov->length, DMA_BIDIRECTIONAL);
+	ib_dma_unmap_single(ia->ri_device,
+			    iov->addr, iov->length, DMA_BIDIRECTIONAL);
 
 	if (NULL == mr)
 		return 0;
@@ -1516,15 +1380,18 @@
 	send_wr.num_sge = req->rl_niovs;
 	send_wr.opcode = IB_WR_SEND;
 	if (send_wr.num_sge == 4)	/* no need to sync any pad (constant) */
-		ib_dma_sync_single_for_device(ia->ri_id->device,
-			req->rl_send_iov[3].addr, req->rl_send_iov[3].length,
-			DMA_TO_DEVICE);
-	ib_dma_sync_single_for_device(ia->ri_id->device,
-		req->rl_send_iov[1].addr, req->rl_send_iov[1].length,
-		DMA_TO_DEVICE);
-	ib_dma_sync_single_for_device(ia->ri_id->device,
-		req->rl_send_iov[0].addr, req->rl_send_iov[0].length,
-		DMA_TO_DEVICE);
+		ib_dma_sync_single_for_device(ia->ri_device,
+					      req->rl_send_iov[3].addr,
+					      req->rl_send_iov[3].length,
+					      DMA_TO_DEVICE);
+	ib_dma_sync_single_for_device(ia->ri_device,
+				      req->rl_send_iov[1].addr,
+				      req->rl_send_iov[1].length,
+				      DMA_TO_DEVICE);
+	ib_dma_sync_single_for_device(ia->ri_device,
+				      req->rl_send_iov[0].addr,
+				      req->rl_send_iov[0].length,
+				      DMA_TO_DEVICE);
 
 	if (DECR_CQCOUNT(ep) > 0)
 		send_wr.send_flags = 0;
@@ -1557,7 +1424,7 @@
 	recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
 	recv_wr.num_sge = 1;
 
-	ib_dma_sync_single_for_cpu(ia->ri_id->device,
+	ib_dma_sync_single_for_cpu(ia->ri_device,
 				   rdmab_addr(rep->rr_rdmabuf),
 				   rdmab_length(rep->rr_rdmabuf),
 				   DMA_BIDIRECTIONAL);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 58163b8..f49dd8b 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -62,6 +62,7 @@
 struct rpcrdma_ia {
 	const struct rpcrdma_memreg_ops	*ri_ops;
 	rwlock_t		ri_qplock;
+	struct ib_device	*ri_device;
 	struct rdma_cm_id 	*ri_id;
 	struct ib_pd		*ri_pd;
 	struct ib_mr		*ri_bind_mem;
@@ -69,7 +70,6 @@
 	int			ri_have_dma_lkey;
 	struct completion	ri_done;
 	int			ri_async_rc;
-	enum rpcrdma_memreg	ri_memreg_strategy;
 	unsigned int		ri_max_frmr_depth;
 	struct ib_device_attr	ri_devattr;
 	struct ib_qp_attr	ri_qp_attr;
@@ -173,9 +173,8 @@
 
 struct rpcrdma_rep {
 	unsigned int		rr_len;
-	struct rpcrdma_buffer	*rr_buffer;
-	struct rpc_xprt		*rr_xprt;
-	void			(*rr_func)(struct rpcrdma_rep *);
+	struct ib_device	*rr_device;
+	struct rpcrdma_xprt	*rr_rxprt;
 	struct list_head	rr_list;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 };
@@ -203,11 +202,18 @@
 	struct ib_fast_reg_page_list	*fr_pgl;
 	struct ib_mr			*fr_mr;
 	enum rpcrdma_frmr_state		fr_state;
+	struct work_struct		fr_work;
+	struct rpcrdma_xprt		*fr_xprt;
+};
+
+struct rpcrdma_fmr {
+	struct ib_fmr		*fmr;
+	u64			*physaddrs;
 };
 
 struct rpcrdma_mw {
 	union {
-		struct ib_fmr		*fmr;
+		struct rpcrdma_fmr	fmr;
 		struct rpcrdma_frmr	frmr;
 	} r;
 	void			(*mw_sendcompletion)(struct ib_wc *);
@@ -281,15 +287,17 @@
  * One of these is associated with a transport instance
  */
 struct rpcrdma_buffer {
-	spinlock_t	rb_lock;	/* protects indexes */
-	u32		rb_max_requests;/* client max requests */
-	struct list_head rb_mws;	/* optional memory windows/fmrs/frmrs */
-	struct list_head rb_all;
-	int		rb_send_index;
+	spinlock_t		rb_mwlock;	/* protect rb_mws list */
+	struct list_head	rb_mws;
+	struct list_head	rb_all;
+	char			*rb_pool;
+
+	spinlock_t		rb_lock;	/* protect buf arrays */
+	u32			rb_max_requests;
+	int			rb_send_index;
+	int			rb_recv_index;
 	struct rpcrdma_req	**rb_send_bufs;
-	int		rb_recv_index;
 	struct rpcrdma_rep	**rb_recv_bufs;
-	char		*rb_pool;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
@@ -350,7 +358,6 @@
 				   struct rpcrdma_create_data_internal *);
 	size_t		(*ro_maxpages)(struct rpcrdma_xprt *);
 	int		(*ro_init)(struct rpcrdma_xprt *);
-	void		(*ro_reset)(struct rpcrdma_xprt *);
 	void		(*ro_destroy)(struct rpcrdma_buffer *);
 	const char	*ro_displayname;
 };
@@ -413,6 +420,8 @@
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
+struct rpcrdma_mw *rpcrdma_get_mw(struct rpcrdma_xprt *);
+void rpcrdma_put_mw(struct rpcrdma_xprt *, struct rpcrdma_mw *);
 struct rpcrdma_req *rpcrdma_buffer_get(struct rpcrdma_buffer *);
 void rpcrdma_buffer_put(struct rpcrdma_req *);
 void rpcrdma_recv_buffer_get(struct rpcrdma_req *);
@@ -425,6 +434,9 @@
 
 unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
 
+int frwr_alloc_recovery_wq(void);
+void frwr_destroy_recovery_wq(void);
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index b051728..e193c2b 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -623,24 +623,6 @@
 }
 
 /**
- * xs_tcp_shutdown - gracefully shut down a TCP socket
- * @xprt: transport
- *
- * Initiates a graceful shutdown of the TCP socket by calling the
- * equivalent of shutdown(SHUT_RDWR);
- */
-static void xs_tcp_shutdown(struct rpc_xprt *xprt)
-{
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
-	struct socket *sock = transport->sock;
-
-	if (sock != NULL) {
-		kernel_sock_shutdown(sock, SHUT_RDWR);
-		trace_rpc_socket_shutdown(xprt, sock);
-	}
-}
-
-/**
  * xs_tcp_send_request - write an RPC request to a TCP socket
  * @task: address of RPC task that manages the state of an RPC request
  *
@@ -786,6 +768,7 @@
 	xs_sock_reset_connection_flags(xprt);
 	/* Mark transport as closed and wake up all pending tasks */
 	xprt_disconnect_done(xprt);
+	xprt_force_disconnect(xprt);
 }
 
 /**
@@ -827,6 +810,9 @@
 	if (sk == NULL)
 		return;
 
+	if (atomic_read(&transport->xprt.swapper))
+		sk_clear_memalloc(sk);
+
 	write_lock_bh(&sk->sk_callback_lock);
 	transport->inet = NULL;
 	transport->sock = NULL;
@@ -863,6 +849,13 @@
 	xprt_disconnect_done(xprt);
 }
 
+static void xs_inject_disconnect(struct rpc_xprt *xprt)
+{
+	dprintk("RPC:       injecting transport disconnect on xprt=%p\n",
+		xprt);
+	xprt_disconnect_done(xprt);
+}
+
 static void xs_xprt_free(struct rpc_xprt *xprt)
 {
 	xs_free_peer_addresses(xprt);
@@ -901,7 +894,6 @@
 /**
  * xs_local_data_ready - "data ready" callback for AF_LOCAL sockets
  * @sk: socket with data to read
- * @len: how much data to read
  *
  * Currently this assumes we can read the whole reply in a single gulp.
  */
@@ -965,7 +957,6 @@
 /**
  * xs_udp_data_ready - "data ready" callback for UDP sockets
  * @sk: socket with data to read
- * @len: how much data to read
  *
  */
 static void xs_udp_data_ready(struct sock *sk)
@@ -1389,7 +1380,6 @@
 /**
  * xs_tcp_data_ready - "data ready" callback for TCP sockets
  * @sk: socket with data to read
- * @bytes: how much data to read
  *
  */
 static void xs_tcp_data_ready(struct sock *sk)
@@ -1886,9 +1876,7 @@
 
 /**
  * xs_local_setup_socket - create AF_LOCAL socket, connect to a local endpoint
- * @xprt: RPC transport to connect
  * @transport: socket transport to connect
- * @create_sock: function to create a socket of the correct type
  */
 static int xs_local_setup_socket(struct sock_xprt *transport)
 {
@@ -1960,43 +1948,84 @@
 		msleep_interruptible(15000);
 }
 
-#ifdef CONFIG_SUNRPC_SWAP
+#if IS_ENABLED(CONFIG_SUNRPC_SWAP)
+/*
+ * Note that this should be called with XPRT_LOCKED held (or when we otherwise
+ * know that we have exclusive access to the socket), to guard against
+ * races with xs_reset_transport.
+ */
 static void xs_set_memalloc(struct rpc_xprt *xprt)
 {
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
 			xprt);
 
-	if (xprt->swapper)
+	/*
+	 * If there's no sock, then we have nothing to set. The
+	 * reconnecting process will get it for us.
+	 */
+	if (!transport->inet)
+		return;
+	if (atomic_read(&xprt->swapper))
 		sk_set_memalloc(transport->inet);
 }
 
 /**
- * xs_swapper - Tag this transport as being used for swap.
+ * xs_enable_swap - Tag this transport as being used for swap.
  * @xprt: transport to tag
- * @enable: enable/disable
  *
+ * Take a reference to this transport on behalf of the rpc_clnt, and
+ * optionally mark it for swapping if it wasn't already.
  */
-int xs_swapper(struct rpc_xprt *xprt, int enable)
+static int
+xs_enable_swap(struct rpc_xprt *xprt)
 {
-	struct sock_xprt *transport = container_of(xprt, struct sock_xprt,
-			xprt);
-	int err = 0;
+	struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
 
-	if (enable) {
-		xprt->swapper++;
-		xs_set_memalloc(xprt);
-	} else if (xprt->swapper) {
-		xprt->swapper--;
-		sk_clear_memalloc(transport->inet);
-	}
-
-	return err;
+	if (atomic_inc_return(&xprt->swapper) != 1)
+		return 0;
+	if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE))
+		return -ERESTARTSYS;
+	if (xs->inet)
+		sk_set_memalloc(xs->inet);
+	xprt_release_xprt(xprt, NULL);
+	return 0;
 }
-EXPORT_SYMBOL_GPL(xs_swapper);
+
+/**
+ * xs_disable_swap - Untag this transport as being used for swap.
+ * @xprt: transport to tag
+ *
+ * Drop a "swapper" reference to this xprt on behalf of the rpc_clnt. If the
+ * swapper refcount goes to 0, untag the socket as a memalloc socket.
+ */
+static void
+xs_disable_swap(struct rpc_xprt *xprt)
+{
+	struct sock_xprt *xs = container_of(xprt, struct sock_xprt, xprt);
+
+	if (!atomic_dec_and_test(&xprt->swapper))
+		return;
+	if (wait_on_bit_lock(&xprt->state, XPRT_LOCKED, TASK_KILLABLE))
+		return;
+	if (xs->inet)
+		sk_clear_memalloc(xs->inet);
+	xprt_release_xprt(xprt, NULL);
+}
 #else
 static void xs_set_memalloc(struct rpc_xprt *xprt)
 {
 }
+
+static int
+xs_enable_swap(struct rpc_xprt *xprt)
+{
+	return -EINVAL;
+}
+
+static void
+xs_disable_swap(struct rpc_xprt *xprt)
+{
+}
 #endif
 
 static void xs_udp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
@@ -2057,6 +2086,27 @@
 	xprt_wake_pending_tasks(xprt, status);
 }
 
+/**
+ * xs_tcp_shutdown - gracefully shut down a TCP socket
+ * @xprt: transport
+ *
+ * Initiates a graceful shutdown of the TCP socket by calling the
+ * equivalent of shutdown(SHUT_RDWR);
+ */
+static void xs_tcp_shutdown(struct rpc_xprt *xprt)
+{
+	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
+	struct socket *sock = transport->sock;
+
+	if (sock == NULL)
+		return;
+	if (xprt_connected(xprt)) {
+		kernel_sock_shutdown(sock, SHUT_RDWR);
+		trace_rpc_socket_shutdown(xprt, sock);
+	} else
+		xs_reset_transport(transport);
+}
+
 static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
 {
 	struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
@@ -2067,6 +2117,7 @@
 		unsigned int keepidle = xprt->timeout->to_initval / HZ;
 		unsigned int keepcnt = xprt->timeout->to_retries + 1;
 		unsigned int opt_on = 1;
+		unsigned int timeo;
 
 		/* TCP Keepalive options */
 		kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
@@ -2078,6 +2129,12 @@
 		kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
 				(char *)&keepcnt, sizeof(keepcnt));
 
+		/* TCP user timeout (see RFC5482) */
+		timeo = jiffies_to_msecs(xprt->timeout->to_initval) *
+			(xprt->timeout->to_retries + 1);
+		kernel_setsockopt(sock, SOL_TCP, TCP_USER_TIMEOUT,
+				(char *)&timeo, sizeof(timeo));
+
 		write_lock_bh(&sk->sk_callback_lock);
 
 		xs_save_old_callbacks(transport, sk);
@@ -2125,9 +2182,6 @@
 
 /**
  * xs_tcp_setup_socket - create a TCP socket and connect to a remote endpoint
- * @xprt: RPC transport to connect
- * @transport: socket transport to connect
- * @create_sock: function to create a socket of the correct type
  *
  * Invoked by a work queue tasklet.
  */
@@ -2463,6 +2517,8 @@
 	.close			= xs_close,
 	.destroy		= xs_destroy,
 	.print_stats		= xs_local_print_stats,
+	.enable_swap		= xs_enable_swap,
+	.disable_swap		= xs_disable_swap,
 };
 
 static struct rpc_xprt_ops xs_udp_ops = {
@@ -2482,6 +2538,9 @@
 	.close			= xs_close,
 	.destroy		= xs_destroy,
 	.print_stats		= xs_udp_print_stats,
+	.enable_swap		= xs_enable_swap,
+	.disable_swap		= xs_disable_swap,
+	.inject_disconnect	= xs_inject_disconnect,
 };
 
 static struct rpc_xprt_ops xs_tcp_ops = {
@@ -2498,6 +2557,9 @@
 	.close			= xs_tcp_shutdown,
 	.destroy		= xs_destroy,
 	.print_stats		= xs_tcp_print_stats,
+	.enable_swap		= xs_enable_swap,
+	.disable_swap		= xs_disable_swap,
+	.inject_disconnect	= xs_inject_disconnect,
 };
 
 /*
@@ -2515,6 +2577,9 @@
 	.close			= bc_close,
 	.destroy		= bc_destroy,
 	.print_stats		= xs_tcp_print_stats,
+	.enable_swap		= xs_enable_swap,
+	.disable_swap		= xs_disable_swap,
+	.inject_disconnect	= xs_inject_disconnect,
 };
 
 static int xs_init_anyaddr(const int family, struct sockaddr *sap)