Merge branch 'nfs-for-2.6.39' into nfs-for-next
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index a83cd0d..7237672 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -44,6 +44,7 @@
 /* #define NFS_DEBUG_VERBOSE 1 */
 
 static int nfs_opendir(struct inode *, struct file *);
+static int nfs_closedir(struct inode *, struct file *);
 static int nfs_readdir(struct file *, void *, filldir_t);
 static struct dentry *nfs_lookup(struct inode *, struct dentry *, struct nameidata *);
 static int nfs_create(struct inode *, struct dentry *, int, struct nameidata *);
@@ -64,7 +65,7 @@
 	.read		= generic_read_dir,
 	.readdir	= nfs_readdir,
 	.open		= nfs_opendir,
-	.release	= nfs_release,
+	.release	= nfs_closedir,
 	.fsync		= nfs_fsync_dir,
 };
 
@@ -133,13 +134,35 @@
 
 #endif /* CONFIG_NFS_V4 */
 
+static struct nfs_open_dir_context *alloc_nfs_open_dir_context(struct rpc_cred *cred)
+{
+	struct nfs_open_dir_context *ctx;
+	ctx = kmalloc(sizeof(*ctx), GFP_KERNEL);
+	if (ctx != NULL) {
+		ctx->duped = 0;
+		ctx->dir_cookie = 0;
+		ctx->dup_cookie = 0;
+		ctx->cred = get_rpccred(cred);
+	} else
+		ctx = ERR_PTR(-ENOMEM);
+	return ctx;
+}
+
+static void put_nfs_open_dir_context(struct nfs_open_dir_context *ctx)
+{
+	put_rpccred(ctx->cred);
+	kfree(ctx);
+}
+
 /*
  * Open file
  */
 static int
 nfs_opendir(struct inode *inode, struct file *filp)
 {
-	int res;
+	int res = 0;
+	struct nfs_open_dir_context *ctx;
+	struct rpc_cred *cred;
 
 	dfprintk(FILE, "NFS: open dir(%s/%s)\n",
 			filp->f_path.dentry->d_parent->d_name.name,
@@ -147,8 +170,15 @@
 
 	nfs_inc_stats(inode, NFSIOS_VFSOPEN);
 
-	/* Call generic open code in order to cache credentials */
-	res = nfs_open(inode, filp);
+	cred = rpc_lookup_cred();
+	if (IS_ERR(cred))
+		return PTR_ERR(cred);
+	ctx = alloc_nfs_open_dir_context(cred);
+	if (IS_ERR(ctx)) {
+		res = PTR_ERR(ctx);
+		goto out;
+	}
+	filp->private_data = ctx;
 	if (filp->f_path.dentry == filp->f_path.mnt->mnt_root) {
 		/* This is a mountpoint, so d_revalidate will never
 		 * have been called, so we need to refresh the
@@ -156,9 +186,18 @@
 		 */
 		__nfs_revalidate_inode(NFS_SERVER(inode), inode);
 	}
+out:
+	put_rpccred(cred);
 	return res;
 }
 
+static int
+nfs_closedir(struct inode *inode, struct file *filp)
+{
+	put_nfs_open_dir_context(filp->private_data);
+	return 0;
+}
+
 struct nfs_cache_array_entry {
 	u64 cookie;
 	u64 ino;
@@ -284,19 +323,20 @@
 {
 	loff_t diff = desc->file->f_pos - desc->current_index;
 	unsigned int index;
+	struct nfs_open_dir_context *ctx = desc->file->private_data;
 
 	if (diff < 0)
 		goto out_eof;
 	if (diff >= array->size) {
 		if (array->eof_index >= 0)
 			goto out_eof;
-		desc->current_index += array->size;
 		return -EAGAIN;
 	}
 
 	index = (unsigned int)diff;
 	*desc->dir_cookie = array->array[index].cookie;
 	desc->cache_entry_index = index;
+	ctx->duped = 0;
 	return 0;
 out_eof:
 	desc->eof = 1;
@@ -307,10 +347,18 @@
 int nfs_readdir_search_for_cookie(struct nfs_cache_array *array, nfs_readdir_descriptor_t *desc)
 {
 	int i;
+	loff_t new_pos;
 	int status = -EAGAIN;
+	struct nfs_open_dir_context *ctx = desc->file->private_data;
 
 	for (i = 0; i < array->size; i++) {
 		if (array->array[i].cookie == *desc->dir_cookie) {
+			new_pos = desc->current_index + i;
+			if (new_pos < desc->file->f_pos) {
+				ctx->dup_cookie = *desc->dir_cookie;
+				ctx->duped = 1;
+			}
+			desc->file->f_pos = new_pos;
 			desc->cache_entry_index = i;
 			return 0;
 		}
@@ -342,6 +390,7 @@
 
 	if (status == -EAGAIN) {
 		desc->last_cookie = array->last_cookie;
+		desc->current_index += array->size;
 		desc->page_index++;
 	}
 	nfs_readdir_release_array(desc->page);
@@ -354,7 +403,8 @@
 int nfs_readdir_xdr_filler(struct page **pages, nfs_readdir_descriptor_t *desc,
 			struct nfs_entry *entry, struct file *file, struct inode *inode)
 {
-	struct rpc_cred	*cred = nfs_file_cred(file);
+	struct nfs_open_dir_context *ctx = file->private_data;
+	struct rpc_cred	*cred = ctx->cred;
 	unsigned long	timestamp, gencount;
 	int		error;
 
@@ -693,6 +743,20 @@
 	int i = 0;
 	int res = 0;
 	struct nfs_cache_array *array = NULL;
+	struct nfs_open_dir_context *ctx = file->private_data;
+
+	if (ctx->duped != 0 && ctx->dup_cookie == *desc->dir_cookie) {
+		if (printk_ratelimit()) {
+			pr_notice("NFS: directory %s/%s contains a readdir loop.  "
+				"Please contact your server vendor.  "
+				"Offending cookie: %llu\n",
+				file->f_dentry->d_parent->d_name.name,
+				file->f_dentry->d_name.name,
+				*desc->dir_cookie);
+		}
+		res = -ELOOP;
+		goto out;
+	}
 
 	array = nfs_readdir_get_array(desc->page);
 	if (IS_ERR(array)) {
@@ -785,6 +849,7 @@
 	struct inode	*inode = dentry->d_inode;
 	nfs_readdir_descriptor_t my_desc,
 			*desc = &my_desc;
+	struct nfs_open_dir_context *dir_ctx = filp->private_data;
 	int res;
 
 	dfprintk(FILE, "NFS: readdir(%s/%s) starting at cookie %llu\n",
@@ -801,7 +866,7 @@
 	memset(desc, 0, sizeof(*desc));
 
 	desc->file = filp;
-	desc->dir_cookie = &nfs_file_open_context(filp)->dir_cookie;
+	desc->dir_cookie = &dir_ctx->dir_cookie;
 	desc->decode = NFS_PROTO(inode)->decode_dirent;
 	desc->plus = NFS_USE_READDIRPLUS(inode);
 
@@ -853,6 +918,7 @@
 {
 	struct dentry *dentry = filp->f_path.dentry;
 	struct inode *inode = dentry->d_inode;
+	struct nfs_open_dir_context *dir_ctx = filp->private_data;
 
 	dfprintk(FILE, "NFS: llseek dir(%s/%s, %lld, %d)\n",
 			dentry->d_parent->d_name.name,
@@ -872,7 +938,8 @@
 	}
 	if (offset != filp->f_pos) {
 		filp->f_pos = offset;
-		nfs_file_open_context(filp)->dir_cookie = 0;
+		dir_ctx->dir_cookie = 0;
+		dir_ctx->duped = 0;
 	}
 out:
 	mutex_unlock(&inode->i_mutex);
diff --git a/fs/nfs/file.c b/fs/nfs/file.c
index d85a534..3ac5bd6 100644
--- a/fs/nfs/file.c
+++ b/fs/nfs/file.c
@@ -326,6 +326,9 @@
 		ret = xchg(&ctx->error, 0);
 	if (!ret && status < 0)
 		ret = status;
+	if (!ret && !datasync)
+		/* application has asked for meta-data sync */
+		ret = pnfs_layoutcommit_inode(inode, true);
 	return ret;
 }
 
diff --git a/fs/nfs/getroot.c b/fs/nfs/getroot.c
index 1084792..dcb6154 100644
--- a/fs/nfs/getroot.c
+++ b/fs/nfs/getroot.c
@@ -222,6 +222,10 @@
 		goto out;
 	}
 
+	if (fattr->valid & NFS_ATTR_FATTR_FSID &&
+	    !nfs_fsid_equal(&server->fsid, &fattr->fsid))
+		memcpy(&server->fsid, &fattr->fsid, sizeof(server->fsid));
+
 	inode = nfs_fhget(sb, mntfh, fattr);
 	if (IS_ERR(inode)) {
 		dprintk("nfs_get_root: get root inode failed\n");
diff --git a/fs/nfs/inode.c b/fs/nfs/inode.c
index 058d7d6..57bb31a 100644
--- a/fs/nfs/inode.c
+++ b/fs/nfs/inode.c
@@ -641,7 +641,6 @@
 		ctx->mode = f_mode;
 		ctx->flags = 0;
 		ctx->error = 0;
-		ctx->dir_cookie = 0;
 		nfs_init_lock_context(&ctx->lock_context);
 		ctx->lock_context.open_context = ctx;
 		INIT_LIST_HEAD(&ctx->list);
@@ -1473,6 +1472,7 @@
 	nfsi->delegation_state = 0;
 	init_rwsem(&nfsi->rwsem);
 	nfsi->layout = NULL;
+	atomic_set(&nfsi->commits_outstanding, 0);
 #endif
 }
 
diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index 345d86b..ce118ce 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -283,11 +283,25 @@
 extern void nfs_read_prepare(struct rpc_task *task, void *calldata);
 
 /* write.c */
+extern void nfs_commit_free(struct nfs_write_data *p);
 extern int nfs_initiate_write(struct nfs_write_data *data,
 			      struct rpc_clnt *clnt,
 			      const struct rpc_call_ops *call_ops,
 			      int how);
 extern void nfs_write_prepare(struct rpc_task *task, void *calldata);
+extern int nfs_initiate_commit(struct nfs_write_data *data,
+			       struct rpc_clnt *clnt,
+			       const struct rpc_call_ops *call_ops,
+			       int how);
+extern void nfs_init_commit(struct nfs_write_data *data,
+			    struct list_head *head,
+			    struct pnfs_layout_segment *lseg);
+void nfs_retry_commit(struct list_head *page_list,
+		      struct pnfs_layout_segment *lseg);
+void nfs_commit_clear_lock(struct nfs_inode *nfsi);
+void nfs_commitdata_release(void *data);
+void nfs_commit_release_pages(struct nfs_write_data *data);
+
 #ifdef CONFIG_MIGRATION
 extern int nfs_migrate_page(struct address_space *,
 		struct page *, struct page *);
diff --git a/fs/nfs/nfs4_fs.h b/fs/nfs/nfs4_fs.h
index 72e5f1a..e1c261d 100644
--- a/fs/nfs/nfs4_fs.h
+++ b/fs/nfs/nfs4_fs.h
@@ -263,6 +263,8 @@
 extern int nfs4_init_session(struct nfs_server *server);
 extern int nfs4_proc_get_lease_time(struct nfs_client *clp,
 		struct nfs_fsinfo *fsinfo);
+extern int nfs4_proc_layoutcommit(struct nfs4_layoutcommit_data *data,
+				  bool sync);
 
 static inline bool
 is_ds_only_client(struct nfs_client *clp)
diff --git a/fs/nfs/nfs4filelayout.c b/fs/nfs/nfs4filelayout.c
index 4285584..6f8192f 100644
--- a/fs/nfs/nfs4filelayout.c
+++ b/fs/nfs/nfs4filelayout.c
@@ -154,6 +154,23 @@
 }
 
 /*
+ * We reference the rpc_cred of the first WRITE that triggers the need for
+ * a LAYOUTCOMMIT, and use it to send the layoutcommit compound.
+ * rfc5661 is not clear about which credential should be used.
+ */
+static void
+filelayout_set_layoutcommit(struct nfs_write_data *wdata)
+{
+	if (FILELAYOUT_LSEG(wdata->lseg)->commit_through_mds ||
+	    wdata->res.verf->committed == NFS_FILE_SYNC)
+		return;
+
+	pnfs_set_layoutcommit(wdata);
+	dprintk("%s ionde %lu pls_end_pos %lu\n", __func__, wdata->inode->i_ino,
+		(unsigned long) wdata->lseg->pls_end_pos);
+}
+
+/*
  * Call ops for the async read/write cases
  * In the case of dense layouts, the offset needs to be reset to its
  * original value.
@@ -210,6 +227,38 @@
 		return -EAGAIN;
 	}
 
+	filelayout_set_layoutcommit(data);
+	return 0;
+}
+
+/* Fake up some data that will cause nfs_commit_release to retry the writes. */
+static void prepare_to_resend_writes(struct nfs_write_data *data)
+{
+	struct nfs_page *first = nfs_list_entry(data->pages.next);
+
+	data->task.tk_status = 0;
+	memcpy(data->verf.verifier, first->wb_verf.verifier,
+	       sizeof(first->wb_verf.verifier));
+	data->verf.verifier[0]++; /* ensure verifier mismatch */
+}
+
+static int filelayout_commit_done_cb(struct rpc_task *task,
+				     struct nfs_write_data *data)
+{
+	int reset = 0;
+
+	if (filelayout_async_handle_error(task, data->args.context->state,
+					  data->ds_clp, &reset) == -EAGAIN) {
+		dprintk("%s calling restart ds_clp %p ds_clp->cl_session %p\n",
+			__func__, data->ds_clp, data->ds_clp->cl_session);
+		if (reset) {
+			prepare_to_resend_writes(data);
+			filelayout_set_lo_fail(data->lseg);
+		} else
+			nfs_restart_rpc(task, data->ds_clp);
+		return -EAGAIN;
+	}
+
 	return 0;
 }
 
@@ -240,6 +289,16 @@
 	wdata->mds_ops->rpc_release(data);
 }
 
+static void filelayout_commit_release(void *data)
+{
+	struct nfs_write_data *wdata = (struct nfs_write_data *)data;
+
+	nfs_commit_release_pages(wdata);
+	if (atomic_dec_and_test(&NFS_I(wdata->inode)->commits_outstanding))
+		nfs_commit_clear_lock(NFS_I(wdata->inode));
+	nfs_commitdata_release(wdata);
+}
+
 struct rpc_call_ops filelayout_read_call_ops = {
 	.rpc_call_prepare = filelayout_read_prepare,
 	.rpc_call_done = filelayout_read_call_done,
@@ -252,6 +311,12 @@
 	.rpc_release = filelayout_write_release,
 };
 
+struct rpc_call_ops filelayout_commit_call_ops = {
+	.rpc_call_prepare = filelayout_write_prepare,
+	.rpc_call_done = filelayout_write_call_done,
+	.rpc_release = filelayout_commit_release,
+};
+
 static enum pnfs_try_status
 filelayout_read_pagelist(struct nfs_read_data *data)
 {
@@ -320,10 +385,6 @@
 		data->inode->i_ino, sync, (size_t) data->args.count, offset,
 		ntohl(ds->ds_ip_addr), ntohs(ds->ds_port));
 
-	/* We can't handle commit to ds yet */
-	if (!FILELAYOUT_LSEG(lseg)->commit_through_mds)
-		data->args.stable = NFS_FILE_SYNC;
-
 	data->write_done_cb = filelayout_write_done_cb;
 	data->ds_clp = ds->ds_clp;
 	fh = nfs4_fl_select_ds_fh(lseg, j);
@@ -441,12 +502,33 @@
 			 struct nfs4_layoutget_res *lgr,
 			 struct nfs4_deviceid *id)
 {
-	uint32_t *p = (uint32_t *)lgr->layout.buf;
+	struct xdr_stream stream;
+	struct xdr_buf buf = {
+		.pages =  lgr->layoutp->pages,
+		.page_len =  lgr->layoutp->len,
+		.buflen =  lgr->layoutp->len,
+		.len = lgr->layoutp->len,
+	};
+	struct page *scratch;
+	__be32 *p;
 	uint32_t nfl_util;
 	int i;
 
 	dprintk("%s: set_layout_map Begin\n", __func__);
 
+	scratch = alloc_page(GFP_KERNEL);
+	if (!scratch)
+		return -ENOMEM;
+
+	xdr_init_decode(&stream, &buf, NULL);
+	xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE);
+
+	/* 20 = ufl_util (4), first_stripe_index (4), pattern_offset (8),
+	 * num_fh (4) */
+	p = xdr_inline_decode(&stream, NFS4_DEVICEID4_SIZE + 20);
+	if (unlikely(!p))
+		goto out_err;
+
 	memcpy(id, p, sizeof(*id));
 	p += XDR_QUADLEN(NFS4_DEVICEID4_SIZE);
 	print_deviceid(id);
@@ -468,32 +550,57 @@
 		__func__, nfl_util, fl->num_fh, fl->first_stripe_index,
 		fl->pattern_offset);
 
+	if (!fl->num_fh)
+		goto out_err;
+
 	fl->fh_array = kzalloc(fl->num_fh * sizeof(struct nfs_fh *),
 			       GFP_KERNEL);
 	if (!fl->fh_array)
-		return -ENOMEM;
+		goto out_err;
 
 	for (i = 0; i < fl->num_fh; i++) {
 		/* Do we want to use a mempool here? */
 		fl->fh_array[i] = kmalloc(sizeof(struct nfs_fh), GFP_KERNEL);
-		if (!fl->fh_array[i]) {
-			filelayout_free_fh_array(fl);
-			return -ENOMEM;
-		}
+		if (!fl->fh_array[i])
+			goto out_err_free;
+
+		p = xdr_inline_decode(&stream, 4);
+		if (unlikely(!p))
+			goto out_err_free;
 		fl->fh_array[i]->size = be32_to_cpup(p++);
 		if (sizeof(struct nfs_fh) < fl->fh_array[i]->size) {
 			printk(KERN_ERR "Too big fh %d received %d\n",
 			       i, fl->fh_array[i]->size);
-			filelayout_free_fh_array(fl);
-			return -EIO;
+			goto out_err_free;
 		}
+
+		p = xdr_inline_decode(&stream, fl->fh_array[i]->size);
+		if (unlikely(!p))
+			goto out_err_free;
 		memcpy(fl->fh_array[i]->data, p, fl->fh_array[i]->size);
-		p += XDR_QUADLEN(fl->fh_array[i]->size);
 		dprintk("DEBUG: %s: fh len %d\n", __func__,
 			fl->fh_array[i]->size);
 	}
 
+	__free_page(scratch);
 	return 0;
+
+out_err_free:
+	filelayout_free_fh_array(fl);
+out_err:
+	__free_page(scratch);
+	return -EIO;
+}
+
+static void
+filelayout_free_lseg(struct pnfs_layout_segment *lseg)
+{
+	struct nfs4_filelayout_segment *fl = FILELAYOUT_LSEG(lseg);
+
+	dprintk("--> %s\n", __func__);
+	nfs4_fl_put_deviceid(fl->dsaddr);
+	kfree(fl->commit_buckets);
+	_filelayout_free_lseg(fl);
 }
 
 static struct pnfs_layout_segment *
@@ -514,19 +621,30 @@
 		_filelayout_free_lseg(fl);
 		return NULL;
 	}
+
+	/* This assumes there is only one IOMODE_RW lseg.  What
+	 * we really want to do is have a layout_hdr level
+	 * dictionary of <multipath_list4, fh> keys, each
+	 * associated with a struct list_head, populated by calls
+	 * to filelayout_write_pagelist().
+	 * */
+	if ((!fl->commit_through_mds) && (lgr->range.iomode == IOMODE_RW)) {
+		int i;
+		int size = (fl->stripe_type == STRIPE_SPARSE) ?
+			fl->dsaddr->ds_num : fl->dsaddr->stripe_count;
+
+		fl->commit_buckets = kcalloc(size, sizeof(struct list_head), GFP_KERNEL);
+		if (!fl->commit_buckets) {
+			filelayout_free_lseg(&fl->generic_hdr);
+			return NULL;
+		}
+		fl->number_of_buckets = size;
+		for (i = 0; i < size; i++)
+			INIT_LIST_HEAD(&fl->commit_buckets[i]);
+	}
 	return &fl->generic_hdr;
 }
 
-static void
-filelayout_free_lseg(struct pnfs_layout_segment *lseg)
-{
-	struct nfs4_filelayout_segment *fl = FILELAYOUT_LSEG(lseg);
-
-	dprintk("--> %s\n", __func__);
-	nfs4_fl_put_deviceid(fl->dsaddr);
-	_filelayout_free_lseg(fl);
-}
-
 /*
  * filelayout_pg_test(). Called by nfs_can_coalesce_requests()
  *
@@ -552,6 +670,191 @@
 	return (p_stripe == r_stripe);
 }
 
+static bool filelayout_mark_pnfs_commit(struct pnfs_layout_segment *lseg)
+{
+	return !FILELAYOUT_LSEG(lseg)->commit_through_mds;
+}
+
+static u32 select_bucket_index(struct nfs4_filelayout_segment *fl, u32 j)
+{
+	if (fl->stripe_type == STRIPE_SPARSE)
+		return nfs4_fl_calc_ds_index(&fl->generic_hdr, j);
+	else
+		return j;
+}
+
+struct list_head *filelayout_choose_commit_list(struct nfs_page *req)
+{
+	struct pnfs_layout_segment *lseg = req->wb_commit_lseg;
+	struct nfs4_filelayout_segment *fl = FILELAYOUT_LSEG(lseg);
+	u32 i, j;
+	struct list_head *list;
+
+	/* Note that we are calling nfs4_fl_calc_j_index on each page
+	 * that ends up being committed to a data server.  An attractive
+	 * alternative is to add a field to nfs_write_data and nfs_page
+	 * to store the value calculated in filelayout_write_pagelist
+	 * and just use that here.
+	 */
+	j = nfs4_fl_calc_j_index(lseg,
+				 (loff_t)req->wb_index << PAGE_CACHE_SHIFT);
+	i = select_bucket_index(fl, j);
+	list = &fl->commit_buckets[i];
+	if (list_empty(list)) {
+		/* Non-empty buckets hold a reference on the lseg */
+		get_lseg(lseg);
+	}
+	return list;
+}
+
+static u32 calc_ds_index_from_commit(struct pnfs_layout_segment *lseg, u32 i)
+{
+	struct nfs4_filelayout_segment *flseg = FILELAYOUT_LSEG(lseg);
+
+	if (flseg->stripe_type == STRIPE_SPARSE)
+		return i;
+	else
+		return nfs4_fl_calc_ds_index(lseg, i);
+}
+
+static struct nfs_fh *
+select_ds_fh_from_commit(struct pnfs_layout_segment *lseg, u32 i)
+{
+	struct nfs4_filelayout_segment *flseg = FILELAYOUT_LSEG(lseg);
+
+	if (flseg->stripe_type == STRIPE_SPARSE) {
+		if (flseg->num_fh == 1)
+			i = 0;
+		else if (flseg->num_fh == 0)
+			/* Use the MDS OPEN fh set in nfs_read_rpcsetup */
+			return NULL;
+	}
+	return flseg->fh_array[i];
+}
+
+static int filelayout_initiate_commit(struct nfs_write_data *data, int how)
+{
+	struct pnfs_layout_segment *lseg = data->lseg;
+	struct nfs4_pnfs_ds *ds;
+	u32 idx;
+	struct nfs_fh *fh;
+
+	idx = calc_ds_index_from_commit(lseg, data->ds_commit_index);
+	ds = nfs4_fl_prepare_ds(lseg, idx);
+	if (!ds) {
+		printk(KERN_ERR "%s: prepare_ds failed, use MDS\n", __func__);
+		set_bit(lo_fail_bit(IOMODE_RW), &lseg->pls_layout->plh_flags);
+		set_bit(lo_fail_bit(IOMODE_READ), &lseg->pls_layout->plh_flags);
+		prepare_to_resend_writes(data);
+		data->mds_ops->rpc_release(data);
+		return -EAGAIN;
+	}
+	dprintk("%s ino %lu, how %d\n", __func__, data->inode->i_ino, how);
+	data->write_done_cb = filelayout_commit_done_cb;
+	data->ds_clp = ds->ds_clp;
+	fh = select_ds_fh_from_commit(lseg, data->ds_commit_index);
+	if (fh)
+		data->args.fh = fh;
+	return nfs_initiate_commit(data, ds->ds_clp->cl_rpcclient,
+				   &filelayout_commit_call_ops, how);
+}
+
+/*
+ * This is only useful while we are using whole file layouts.
+ */
+static struct pnfs_layout_segment *find_only_write_lseg(struct inode *inode)
+{
+	struct pnfs_layout_segment *lseg, *rv = NULL;
+
+	spin_lock(&inode->i_lock);
+	list_for_each_entry(lseg, &NFS_I(inode)->layout->plh_segs, pls_list)
+		if (lseg->pls_range.iomode == IOMODE_RW)
+			rv = get_lseg(lseg);
+	spin_unlock(&inode->i_lock);
+	return rv;
+}
+
+static int alloc_ds_commits(struct inode *inode, struct list_head *list)
+{
+	struct pnfs_layout_segment *lseg;
+	struct nfs4_filelayout_segment *fl;
+	struct nfs_write_data *data;
+	int i, j;
+
+	/* Won't need this when non-whole file layout segments are supported
+	 * instead we will use a pnfs_layout_hdr structure */
+	lseg = find_only_write_lseg(inode);
+	if (!lseg)
+		return 0;
+	fl = FILELAYOUT_LSEG(lseg);
+	for (i = 0; i < fl->number_of_buckets; i++) {
+		if (list_empty(&fl->commit_buckets[i]))
+			continue;
+		data = nfs_commitdata_alloc();
+		if (!data)
+			goto out_bad;
+		data->ds_commit_index = i;
+		data->lseg = lseg;
+		list_add(&data->pages, list);
+	}
+	put_lseg(lseg);
+	return 0;
+
+out_bad:
+	for (j = i; j < fl->number_of_buckets; j++) {
+		if (list_empty(&fl->commit_buckets[i]))
+			continue;
+		nfs_retry_commit(&fl->commit_buckets[i], lseg);
+		put_lseg(lseg);  /* associated with emptying bucket */
+	}
+	put_lseg(lseg);
+	/* Caller will clean up entries put on list */
+	return -ENOMEM;
+}
+
+/* This follows nfs_commit_list pretty closely */
+static int
+filelayout_commit_pagelist(struct inode *inode, struct list_head *mds_pages,
+			   int how)
+{
+	struct nfs_write_data	*data, *tmp;
+	LIST_HEAD(list);
+
+	if (!list_empty(mds_pages)) {
+		data = nfs_commitdata_alloc();
+		if (!data)
+			goto out_bad;
+		data->lseg = NULL;
+		list_add(&data->pages, &list);
+	}
+
+	if (alloc_ds_commits(inode, &list))
+		goto out_bad;
+
+	list_for_each_entry_safe(data, tmp, &list, pages) {
+		list_del_init(&data->pages);
+		atomic_inc(&NFS_I(inode)->commits_outstanding);
+		if (!data->lseg) {
+			nfs_init_commit(data, mds_pages, NULL);
+			nfs_initiate_commit(data, NFS_CLIENT(inode),
+					    data->mds_ops, how);
+		} else {
+			nfs_init_commit(data, &FILELAYOUT_LSEG(data->lseg)->commit_buckets[data->ds_commit_index], data->lseg);
+			filelayout_initiate_commit(data, how);
+		}
+	}
+	return 0;
+ out_bad:
+	list_for_each_entry_safe(data, tmp, &list, pages) {
+		nfs_retry_commit(&data->pages, data->lseg);
+		list_del_init(&data->pages);
+		nfs_commit_free(data);
+	}
+	nfs_retry_commit(mds_pages, NULL);
+	nfs_commit_clear_lock(NFS_I(inode));
+	return -ENOMEM;
+}
+
 static struct pnfs_layoutdriver_type filelayout_type = {
 	.id			= LAYOUT_NFSV4_1_FILES,
 	.name			= "LAYOUT_NFSV4_1_FILES",
@@ -559,6 +862,9 @@
 	.alloc_lseg		= filelayout_alloc_lseg,
 	.free_lseg		= filelayout_free_lseg,
 	.pg_test		= filelayout_pg_test,
+	.mark_pnfs_commit	= filelayout_mark_pnfs_commit,
+	.choose_commit_list	= filelayout_choose_commit_list,
+	.commit_pagelist	= filelayout_commit_pagelist,
 	.read_pagelist		= filelayout_read_pagelist,
 	.write_pagelist		= filelayout_write_pagelist,
 };
diff --git a/fs/nfs/nfs4filelayout.h b/fs/nfs/nfs4filelayout.h
index ee0c907..085a354 100644
--- a/fs/nfs/nfs4filelayout.h
+++ b/fs/nfs/nfs4filelayout.h
@@ -79,6 +79,8 @@
 	struct nfs4_file_layout_dsaddr *dsaddr; /* Point to GETDEVINFO data */
 	unsigned int num_fh;
 	struct nfs_fh **fh_array;
+	struct list_head *commit_buckets; /* Sort commits to ds */
+	int number_of_buckets;
 };
 
 static inline struct nfs4_filelayout_segment *
diff --git a/fs/nfs/nfs4filelayoutdev.c b/fs/nfs/nfs4filelayoutdev.c
index 68143c1..de5350f 100644
--- a/fs/nfs/nfs4filelayoutdev.c
+++ b/fs/nfs/nfs4filelayoutdev.c
@@ -261,7 +261,7 @@
  * Currently only support ipv4, and one multi-path address.
  */
 static struct nfs4_pnfs_ds *
-decode_and_add_ds(__be32 **pp, struct inode *inode)
+decode_and_add_ds(struct xdr_stream *streamp, struct inode *inode)
 {
 	struct nfs4_pnfs_ds *ds = NULL;
 	char *buf;
@@ -269,25 +269,34 @@
 	u32 ip_addr, port;
 	int nlen, rlen, i;
 	int tmp[2];
-	__be32 *r_netid, *r_addr, *p = *pp;
+	__be32 *p;
 
 	/* r_netid */
+	p = xdr_inline_decode(streamp, 4);
+	if (unlikely(!p))
+		goto out_err;
 	nlen = be32_to_cpup(p++);
-	r_netid = p;
-	p += XDR_QUADLEN(nlen);
 
-	/* r_addr */
-	rlen = be32_to_cpup(p++);
-	r_addr = p;
-	p += XDR_QUADLEN(rlen);
-	*pp = p;
+	p = xdr_inline_decode(streamp, nlen);
+	if (unlikely(!p))
+		goto out_err;
 
 	/* Check that netid is "tcp" */
-	if (nlen != 3 ||  memcmp((char *)r_netid, "tcp", 3)) {
+	if (nlen != 3 ||  memcmp((char *)p, "tcp", 3)) {
 		dprintk("%s: ERROR: non ipv4 TCP r_netid\n", __func__);
 		goto out_err;
 	}
 
+	/* r_addr */
+	p = xdr_inline_decode(streamp, 4);
+	if (unlikely(!p))
+		goto out_err;
+	rlen = be32_to_cpup(p);
+
+	p = xdr_inline_decode(streamp, rlen);
+	if (unlikely(!p))
+		goto out_err;
+
 	/* ipv6 length plus port is legal */
 	if (rlen > INET6_ADDRSTRLEN + 8) {
 		dprintk("%s: Invalid address, length %d\n", __func__,
@@ -300,7 +309,7 @@
 		goto out_err;
 	}
 	buf[rlen] = '\0';
-	memcpy(buf, r_addr, rlen);
+	memcpy(buf, p, rlen);
 
 	/* replace the port dots with dashes for the in4_pton() delimiter*/
 	for (i = 0; i < 2; i++) {
@@ -336,90 +345,154 @@
 static struct nfs4_file_layout_dsaddr*
 decode_device(struct inode *ino, struct pnfs_device *pdev)
 {
-	int i, dummy;
+	int i;
 	u32 cnt, num;
 	u8 *indexp;
-	__be32 *p = (__be32 *)pdev->area, *indicesp;
-	struct nfs4_file_layout_dsaddr *dsaddr;
+	__be32 *p;
+	u8 *stripe_indices;
+	u8 max_stripe_index;
+	struct nfs4_file_layout_dsaddr *dsaddr = NULL;
+	struct xdr_stream stream;
+	struct xdr_buf buf = {
+		.pages = pdev->pages,
+		.page_len = pdev->pglen,
+		.buflen = pdev->pglen,
+		.len = pdev->pglen,
+	};
+	struct page *scratch;
+
+	/* set up xdr stream */
+	scratch = alloc_page(GFP_KERNEL);
+	if (!scratch)
+		goto out_err;
+
+	xdr_init_decode(&stream, &buf, NULL);
+	xdr_set_scratch_buffer(&stream, page_address(scratch), PAGE_SIZE);
 
 	/* Get the stripe count (number of stripe index) */
-	cnt = be32_to_cpup(p++);
+	p = xdr_inline_decode(&stream, 4);
+	if (unlikely(!p))
+		goto out_err_free_scratch;
+
+	cnt = be32_to_cpup(p);
 	dprintk("%s stripe count  %d\n", __func__, cnt);
 	if (cnt > NFS4_PNFS_MAX_STRIPE_CNT) {
 		printk(KERN_WARNING "%s: stripe count %d greater than "
 		       "supported maximum %d\n", __func__,
 			cnt, NFS4_PNFS_MAX_STRIPE_CNT);
-		goto out_err;
+		goto out_err_free_scratch;
+	}
+
+	/* read stripe indices */
+	stripe_indices = kcalloc(cnt, sizeof(u8), GFP_KERNEL);
+	if (!stripe_indices)
+		goto out_err_free_scratch;
+
+	p = xdr_inline_decode(&stream, cnt << 2);
+	if (unlikely(!p))
+		goto out_err_free_stripe_indices;
+
+	indexp = &stripe_indices[0];
+	max_stripe_index = 0;
+	for (i = 0; i < cnt; i++) {
+		*indexp = be32_to_cpup(p++);
+		max_stripe_index = max(max_stripe_index, *indexp);
+		indexp++;
 	}
 
 	/* Check the multipath list count */
-	indicesp = p;
-	p += XDR_QUADLEN(cnt << 2);
-	num = be32_to_cpup(p++);
+	p = xdr_inline_decode(&stream, 4);
+	if (unlikely(!p))
+		goto out_err_free_stripe_indices;
+
+	num = be32_to_cpup(p);
 	dprintk("%s ds_num %u\n", __func__, num);
 	if (num > NFS4_PNFS_MAX_MULTI_CNT) {
 		printk(KERN_WARNING "%s: multipath count %d greater than "
 			"supported maximum %d\n", __func__,
 			num, NFS4_PNFS_MAX_MULTI_CNT);
-		goto out_err;
+		goto out_err_free_stripe_indices;
 	}
+
+	/* validate stripe indices are all < num */
+	if (max_stripe_index >= num) {
+		printk(KERN_WARNING "%s: stripe index %u >= num ds %u\n",
+			__func__, max_stripe_index, num);
+		goto out_err_free_stripe_indices;
+	}
+
 	dsaddr = kzalloc(sizeof(*dsaddr) +
 			(sizeof(struct nfs4_pnfs_ds *) * (num - 1)),
 			GFP_KERNEL);
 	if (!dsaddr)
-		goto out_err;
-
-	dsaddr->stripe_indices = kzalloc(sizeof(u8) * cnt, GFP_KERNEL);
-	if (!dsaddr->stripe_indices)
-		goto out_err_free;
+		goto out_err_free_stripe_indices;
 
 	dsaddr->stripe_count = cnt;
+	dsaddr->stripe_indices = stripe_indices;
+	stripe_indices = NULL;
 	dsaddr->ds_num = num;
 
 	memcpy(&dsaddr->deviceid, &pdev->dev_id, sizeof(pdev->dev_id));
 
-	/* Go back an read stripe indices */
-	p = indicesp;
-	indexp = &dsaddr->stripe_indices[0];
-	for (i = 0; i < dsaddr->stripe_count; i++) {
-		*indexp = be32_to_cpup(p++);
-		if (*indexp >= num)
-			goto out_err_free;
-		indexp++;
-	}
-	/* Skip already read multipath list count */
-	p++;
-
 	for (i = 0; i < dsaddr->ds_num; i++) {
 		int j;
+		u32 mp_count;
 
-		dummy = be32_to_cpup(p++); /* multipath count */
-		if (dummy > 1) {
+		p = xdr_inline_decode(&stream, 4);
+		if (unlikely(!p))
+			goto out_err_free_deviceid;
+
+		mp_count = be32_to_cpup(p); /* multipath count */
+		if (mp_count > 1) {
 			printk(KERN_WARNING
 			       "%s: Multipath count %d not supported, "
 			       "skipping all greater than 1\n", __func__,
-				dummy);
+				mp_count);
 		}
-		for (j = 0; j < dummy; j++) {
+		for (j = 0; j < mp_count; j++) {
 			if (j == 0) {
-				dsaddr->ds_list[i] = decode_and_add_ds(&p, ino);
+				dsaddr->ds_list[i] = decode_and_add_ds(&stream,
+					ino);
 				if (dsaddr->ds_list[i] == NULL)
-					goto out_err_free;
+					goto out_err_free_deviceid;
 			} else {
 				u32 len;
 				/* skip extra multipath */
-				len = be32_to_cpup(p++);
-				p += XDR_QUADLEN(len);
-				len = be32_to_cpup(p++);
-				p += XDR_QUADLEN(len);
-				continue;
+
+				/* read len, skip */
+				p = xdr_inline_decode(&stream, 4);
+				if (unlikely(!p))
+					goto out_err_free_deviceid;
+				len = be32_to_cpup(p);
+
+				p = xdr_inline_decode(&stream, len);
+				if (unlikely(!p))
+					goto out_err_free_deviceid;
+
+				/* read len, skip */
+				p = xdr_inline_decode(&stream, 4);
+				if (unlikely(!p))
+					goto out_err_free_deviceid;
+				len = be32_to_cpup(p);
+
+				p = xdr_inline_decode(&stream, len);
+				if (unlikely(!p))
+					goto out_err_free_deviceid;
 			}
 		}
 	}
+
+	__free_page(scratch);
 	return dsaddr;
 
-out_err_free:
+out_err_free_deviceid:
 	nfs4_fl_free_deviceid(dsaddr);
+	/* stripe_indicies was part of dsaddr */
+	goto out_err_free_scratch;
+out_err_free_stripe_indices:
+	kfree(stripe_indices);
+out_err_free_scratch:
+	__free_page(scratch);
 out_err:
 	dprintk("%s ERROR: returning NULL\n", __func__);
 	return NULL;
@@ -498,11 +571,6 @@
 			goto out_free;
 	}
 
-	/* set pdev->area */
-	pdev->area = vmap(pages, max_pages, VM_MAP, PAGE_KERNEL);
-	if (!pdev->area)
-		goto out_free;
-
 	memcpy(&pdev->dev_id, dev_id, sizeof(*dev_id));
 	pdev->layout_type = LAYOUT_NFSV4_1_FILES;
 	pdev->pages = pages;
@@ -521,8 +589,6 @@
 	 */
 	dsaddr = decode_and_add_device(inode, pdev);
 out_free:
-	if (pdev->area != NULL)
-		vunmap(pdev->area);
 	for (i = 0; i < max_pages; i++)
 		__free_page(pages[i]);
 	kfree(pages);
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index f9150f0..dfd1e6d 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -3253,12 +3253,9 @@
 	msg->rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_WRITE];
 }
 
-static int nfs4_commit_done(struct rpc_task *task, struct nfs_write_data *data)
+static int nfs4_commit_done_cb(struct rpc_task *task, struct nfs_write_data *data)
 {
 	struct inode *inode = data->inode;
-	
-	if (!nfs4_sequence_done(task, &data->res.seq_res))
-		return -EAGAIN;
 
 	if (nfs4_async_handle_error(task, NFS_SERVER(inode), NULL) == -EAGAIN) {
 		nfs_restart_rpc(task, NFS_SERVER(inode)->nfs_client);
@@ -3268,11 +3265,24 @@
 	return 0;
 }
 
+static int nfs4_commit_done(struct rpc_task *task, struct nfs_write_data *data)
+{
+	if (!nfs4_sequence_done(task, &data->res.seq_res))
+		return -EAGAIN;
+	return data->write_done_cb(task, data);
+}
+
 static void nfs4_proc_commit_setup(struct nfs_write_data *data, struct rpc_message *msg)
 {
 	struct nfs_server *server = NFS_SERVER(data->inode);
-	
-	data->args.bitmask = server->cache_consistency_bitmask;
+
+	if (data->lseg) {
+		data->args.bitmask = NULL;
+		data->res.fattr = NULL;
+	} else
+		data->args.bitmask = server->cache_consistency_bitmask;
+	if (!data->write_done_cb)
+		data->write_done_cb = nfs4_commit_done_cb;
 	data->res.server = server;
 	msg->rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_COMMIT];
 }
@@ -5608,8 +5618,6 @@
 	struct nfs4_layoutget *lgp = calldata;
 
 	dprintk("--> %s\n", __func__);
-	if (lgp->res.layout.buf != NULL)
-		free_page((unsigned long) lgp->res.layout.buf);
 	put_nfs_open_context(lgp->args.ctx);
 	kfree(calldata);
 	dprintk("<-- %s\n", __func__);
@@ -5641,12 +5649,7 @@
 
 	dprintk("--> %s\n", __func__);
 
-	lgp->res.layout.buf = (void *)__get_free_page(GFP_NOFS);
-	if (lgp->res.layout.buf == NULL) {
-		nfs4_layoutget_release(lgp);
-		return -ENOMEM;
-	}
-
+	lgp->res.layoutp = &lgp->args.layout;
 	lgp->res.seq_res.sr_slot = NULL;
 	task = rpc_run_task(&task_setup_data);
 	if (IS_ERR(task))
@@ -5698,6 +5701,100 @@
 }
 EXPORT_SYMBOL_GPL(nfs4_proc_getdeviceinfo);
 
+static void nfs4_layoutcommit_prepare(struct rpc_task *task, void *calldata)
+{
+	struct nfs4_layoutcommit_data *data = calldata;
+	struct nfs_server *server = NFS_SERVER(data->args.inode);
+
+	if (nfs4_setup_sequence(server, &data->args.seq_args,
+				&data->res.seq_res, 1, task))
+		return;
+	rpc_call_start(task);
+}
+
+static void
+nfs4_layoutcommit_done(struct rpc_task *task, void *calldata)
+{
+	struct nfs4_layoutcommit_data *data = calldata;
+	struct nfs_server *server = NFS_SERVER(data->args.inode);
+
+	if (!nfs4_sequence_done(task, &data->res.seq_res))
+		return;
+
+	switch (task->tk_status) { /* Just ignore these failures */
+	case NFS4ERR_DELEG_REVOKED: /* layout was recalled */
+	case NFS4ERR_BADIOMODE:     /* no IOMODE_RW layout for range */
+	case NFS4ERR_BADLAYOUT:     /* no layout */
+	case NFS4ERR_GRACE:	    /* loca_recalim always false */
+		task->tk_status = 0;
+	}
+
+	if (nfs4_async_handle_error(task, server, NULL) == -EAGAIN) {
+		nfs_restart_rpc(task, server->nfs_client);
+		return;
+	}
+
+	if (task->tk_status == 0)
+		nfs_post_op_update_inode_force_wcc(data->args.inode,
+						   data->res.fattr);
+}
+
+static void nfs4_layoutcommit_release(void *calldata)
+{
+	struct nfs4_layoutcommit_data *data = calldata;
+
+	/* Matched by references in pnfs_set_layoutcommit */
+	put_lseg(data->lseg);
+	put_rpccred(data->cred);
+	kfree(data);
+}
+
+static const struct rpc_call_ops nfs4_layoutcommit_ops = {
+	.rpc_call_prepare = nfs4_layoutcommit_prepare,
+	.rpc_call_done = nfs4_layoutcommit_done,
+	.rpc_release = nfs4_layoutcommit_release,
+};
+
+int
+nfs4_proc_layoutcommit(struct nfs4_layoutcommit_data *data, bool sync)
+{
+	struct rpc_message msg = {
+		.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_LAYOUTCOMMIT],
+		.rpc_argp = &data->args,
+		.rpc_resp = &data->res,
+		.rpc_cred = data->cred,
+	};
+	struct rpc_task_setup task_setup_data = {
+		.task = &data->task,
+		.rpc_client = NFS_CLIENT(data->args.inode),
+		.rpc_message = &msg,
+		.callback_ops = &nfs4_layoutcommit_ops,
+		.callback_data = data,
+		.flags = RPC_TASK_ASYNC,
+	};
+	struct rpc_task *task;
+	int status = 0;
+
+	dprintk("NFS: %4d initiating layoutcommit call. sync %d "
+		"lbw: %llu inode %lu\n",
+		data->task.tk_pid, sync,
+		data->args.lastbytewritten,
+		data->args.inode->i_ino);
+
+	task = rpc_run_task(&task_setup_data);
+	if (IS_ERR(task))
+		return PTR_ERR(task);
+	if (sync == false)
+		goto out;
+	status = nfs4_wait_for_completion_rpc_task(task);
+	if (status != 0)
+		goto out;
+	status = task->tk_status;
+out:
+	dprintk("%s: status %d\n", __func__, status);
+	rpc_put_task(task);
+	return status;
+}
 #endif /* CONFIG_NFS_V4_1 */
 
 struct nfs4_state_recovery_ops nfs40_reboot_recovery_ops = {
diff --git a/fs/nfs/nfs4xdr.c b/fs/nfs/nfs4xdr.c
index 21c3004..dddfb57 100644
--- a/fs/nfs/nfs4xdr.c
+++ b/fs/nfs/nfs4xdr.c
@@ -327,6 +327,18 @@
 #define decode_layoutget_maxsz	(op_decode_hdr_maxsz + 8 + \
 				decode_stateid_maxsz + \
 				XDR_QUADLEN(PNFS_LAYOUT_MAXSIZE))
+#define encode_layoutcommit_maxsz (op_encode_hdr_maxsz +          \
+				2 /* offset */ + \
+				2 /* length */ + \
+				1 /* reclaim */ + \
+				encode_stateid_maxsz + \
+				1 /* new offset (true) */ + \
+				2 /* last byte written */ + \
+				1 /* nt_timechanged (false) */ + \
+				1 /* layoutupdate4 layout type */ + \
+				1 /* NULL filelayout layoutupdate4 payload */)
+#define decode_layoutcommit_maxsz (op_decode_hdr_maxsz + 3)
+
 #else /* CONFIG_NFS_V4_1 */
 #define encode_sequence_maxsz	0
 #define decode_sequence_maxsz	0
@@ -738,6 +750,17 @@
 				decode_sequence_maxsz + \
 				decode_putfh_maxsz +        \
 				decode_layoutget_maxsz)
+#define NFS4_enc_layoutcommit_sz (compound_encode_hdr_maxsz + \
+				encode_sequence_maxsz +\
+				encode_putfh_maxsz + \
+				encode_layoutcommit_maxsz + \
+				encode_getattr_maxsz)
+#define NFS4_dec_layoutcommit_sz (compound_decode_hdr_maxsz + \
+				decode_sequence_maxsz + \
+				decode_putfh_maxsz + \
+				decode_layoutcommit_maxsz + \
+				decode_getattr_maxsz)
+
 
 const u32 nfs41_maxwrite_overhead = ((RPC_MAX_HEADER_WITH_AUTH +
 				      compound_encode_hdr_maxsz +
@@ -1839,6 +1862,34 @@
 	hdr->nops++;
 	hdr->replen += decode_layoutget_maxsz;
 }
+
+static int
+encode_layoutcommit(struct xdr_stream *xdr,
+		    const struct nfs4_layoutcommit_args *args,
+		    struct compound_hdr *hdr)
+{
+	__be32 *p;
+
+	dprintk("%s: lbw: %llu type: %d\n", __func__, args->lastbytewritten,
+		NFS_SERVER(args->inode)->pnfs_curr_ld->id);
+
+	p = reserve_space(xdr, 48 + NFS4_STATEID_SIZE);
+	*p++ = cpu_to_be32(OP_LAYOUTCOMMIT);
+	/* Only whole file layouts */
+	p = xdr_encode_hyper(p, 0); /* offset */
+	p = xdr_encode_hyper(p, NFS4_MAX_UINT64); /* length */
+	*p++ = cpu_to_be32(0); /* reclaim */
+	p = xdr_encode_opaque_fixed(p, args->stateid.data, NFS4_STATEID_SIZE);
+	*p++ = cpu_to_be32(1); /* newoffset = TRUE */
+	p = xdr_encode_hyper(p, args->lastbytewritten);
+	*p++ = cpu_to_be32(0); /* Never send time_modify_changed */
+	*p++ = cpu_to_be32(NFS_SERVER(args->inode)->pnfs_curr_ld->id);/* type */
+	*p++ = cpu_to_be32(0); /* no file layout payload */
+
+	hdr->nops++;
+	hdr->replen += decode_layoutcommit_maxsz;
+	return 0;
+}
 #endif /* CONFIG_NFS_V4_1 */
 
 /*
@@ -2317,7 +2368,8 @@
 	encode_sequence(xdr, &args->seq_args, &hdr);
 	encode_putfh(xdr, args->fh, &hdr);
 	encode_commit(xdr, args, &hdr);
-	encode_getfattr(xdr, args->bitmask, &hdr);
+	if (args->bitmask)
+		encode_getfattr(xdr, args->bitmask, &hdr);
 	encode_nops(&hdr);
 }
 
@@ -2645,8 +2697,32 @@
 	encode_sequence(xdr, &args->seq_args, &hdr);
 	encode_putfh(xdr, NFS_FH(args->inode), &hdr);
 	encode_layoutget(xdr, args, &hdr);
+
+	xdr_inline_pages(&req->rq_rcv_buf, hdr.replen << 2,
+	    args->layout.pages, 0, args->layout.pglen);
+
 	encode_nops(&hdr);
 }
+
+/*
+ *  Encode LAYOUTCOMMIT request
+ */
+static int nfs4_xdr_enc_layoutcommit(struct rpc_rqst *req,
+				     struct xdr_stream *xdr,
+				     struct nfs4_layoutcommit_args *args)
+{
+	struct compound_hdr hdr = {
+		.minorversion = nfs4_xdr_minorversion(&args->seq_args),
+	};
+
+	encode_compound_hdr(xdr, req, &hdr);
+	encode_sequence(xdr, &args->seq_args, &hdr);
+	encode_putfh(xdr, NFS_FH(args->inode), &hdr);
+	encode_layoutcommit(xdr, args, &hdr);
+	encode_getfattr(xdr, args->bitmask, &hdr);
+	encode_nops(&hdr);
+	return 0;
+}
 #endif /* CONFIG_NFS_V4_1 */
 
 static void print_overflow_msg(const char *func, const struct xdr_stream *xdr)
@@ -5063,6 +5139,9 @@
 	__be32 *p;
 	int status;
 	u32 layout_count;
+	struct xdr_buf *rcvbuf = &req->rq_rcv_buf;
+	struct kvec *iov = rcvbuf->head;
+	u32 hdrlen, recvd;
 
 	status = decode_op_hdr(xdr, OP_LAYOUTGET);
 	if (status)
@@ -5079,17 +5158,14 @@
 		return -EINVAL;
 	}
 
-	p = xdr_inline_decode(xdr, 24);
+	p = xdr_inline_decode(xdr, 28);
 	if (unlikely(!p))
 		goto out_overflow;
 	p = xdr_decode_hyper(p, &res->range.offset);
 	p = xdr_decode_hyper(p, &res->range.length);
 	res->range.iomode = be32_to_cpup(p++);
 	res->type = be32_to_cpup(p++);
-
-	status = decode_opaque_inline(xdr, &res->layout.len, (char **)&p);
-	if (unlikely(status))
-		return status;
+	res->layoutp->len = be32_to_cpup(p);
 
 	dprintk("%s roff:%lu rlen:%lu riomode:%d, lo_type:0x%x, lo.len:%d\n",
 		__func__,
@@ -5097,12 +5173,18 @@
 		(unsigned long)res->range.length,
 		res->range.iomode,
 		res->type,
-		res->layout.len);
+		res->layoutp->len);
 
-	/* nfs4_proc_layoutget allocated a single page */
-	if (res->layout.len > PAGE_SIZE)
-		return -ENOMEM;
-	memcpy(res->layout.buf, p, res->layout.len);
+	hdrlen = (u8 *) xdr->p - (u8 *) iov->iov_base;
+	recvd = req->rq_rcv_buf.len - hdrlen;
+	if (res->layoutp->len > recvd) {
+		dprintk("NFS: server cheating in layoutget reply: "
+				"layout len %u > recvd %u\n",
+				res->layoutp->len, recvd);
+		return -EINVAL;
+	}
+
+	xdr_read_pages(xdr, res->layoutp->len);
 
 	if (layout_count > 1) {
 		/* We only handle a length one array at the moment.  Any
@@ -5119,6 +5201,35 @@
 	print_overflow_msg(__func__, xdr);
 	return -EIO;
 }
+
+static int decode_layoutcommit(struct xdr_stream *xdr,
+			       struct rpc_rqst *req,
+			       struct nfs4_layoutcommit_res *res)
+{
+	__be32 *p;
+	__u32 sizechanged;
+	int status;
+
+	status = decode_op_hdr(xdr, OP_LAYOUTCOMMIT);
+	if (status)
+		return status;
+
+	p = xdr_inline_decode(xdr, 4);
+	if (unlikely(!p))
+		goto out_overflow;
+	sizechanged = be32_to_cpup(p);
+
+	if (sizechanged) {
+		/* throw away new size */
+		p = xdr_inline_decode(xdr, 8);
+		if (unlikely(!p))
+			goto out_overflow;
+	}
+	return 0;
+out_overflow:
+	print_overflow_msg(__func__, xdr);
+	return -EIO;
+}
 #endif /* CONFIG_NFS_V4_1 */
 
 /*
@@ -5836,8 +5947,9 @@
 	status = decode_commit(xdr, res);
 	if (status)
 		goto out;
-	decode_getfattr(xdr, res->fattr, res->server,
-			!RPC_IS_ASYNC(rqstp->rq_task));
+	if (res->fattr)
+		decode_getfattr(xdr, res->fattr, res->server,
+				!RPC_IS_ASYNC(rqstp->rq_task));
 out:
 	return status;
 }
@@ -6205,6 +6317,34 @@
 out:
 	return status;
 }
+
+/*
+ * Decode LAYOUTCOMMIT response
+ */
+static int nfs4_xdr_dec_layoutcommit(struct rpc_rqst *rqstp,
+				     struct xdr_stream *xdr,
+				     struct nfs4_layoutcommit_res *res)
+{
+	struct compound_hdr hdr;
+	int status;
+
+	status = decode_compound_hdr(xdr, &hdr);
+	if (status)
+		goto out;
+	status = decode_sequence(xdr, &res->seq_res, rqstp);
+	if (status)
+		goto out;
+	status = decode_putfh(xdr);
+	if (status)
+		goto out;
+	status = decode_layoutcommit(xdr, rqstp, res);
+	if (status)
+		goto out;
+	decode_getfattr(xdr, res->fattr, res->server,
+			!RPC_IS_ASYNC(rqstp->rq_task));
+out:
+	return status;
+}
 #endif /* CONFIG_NFS_V4_1 */
 
 /**
@@ -6403,6 +6543,7 @@
 	PROC(RECLAIM_COMPLETE,	enc_reclaim_complete,	dec_reclaim_complete),
 	PROC(GETDEVICEINFO,	enc_getdeviceinfo,	dec_getdeviceinfo),
 	PROC(LAYOUTGET,		enc_layoutget,		dec_layoutget),
+	PROC(LAYOUTCOMMIT,	enc_layoutcommit,	dec_layoutcommit),
 #endif /* CONFIG_NFS_V4_1 */
 };
 
diff --git a/fs/nfs/pagelist.c b/fs/nfs/pagelist.c
index 23e7944..87a593c 100644
--- a/fs/nfs/pagelist.c
+++ b/fs/nfs/pagelist.c
@@ -223,6 +223,7 @@
 	desc->pg_count = 0;
 	desc->pg_bsize = bsize;
 	desc->pg_base = 0;
+	desc->pg_moreio = 0;
 	desc->pg_inode = inode;
 	desc->pg_doio = doio;
 	desc->pg_ioflags = io_flags;
@@ -335,9 +336,11 @@
 			   struct nfs_page *req)
 {
 	while (!nfs_pageio_do_add_request(desc, req)) {
+		desc->pg_moreio = 1;
 		nfs_pageio_doio(desc);
 		if (desc->pg_error < 0)
 			return 0;
+		desc->pg_moreio = 0;
 	}
 	return 1;
 }
@@ -395,6 +398,7 @@
 	pgoff_t idx_end;
 	int found, i;
 	int res;
+	struct list_head *list;
 
 	res = 0;
 	if (npages == 0)
@@ -415,10 +419,10 @@
 			idx_start = req->wb_index + 1;
 			if (nfs_set_page_tag_locked(req)) {
 				kref_get(&req->wb_kref);
-				nfs_list_remove_request(req);
 				radix_tree_tag_clear(&nfsi->nfs_page_tree,
 						req->wb_index, tag);
-				nfs_list_add_request(req, dst);
+				list = pnfs_choose_commit_list(req, dst);
+				nfs_list_add_request(req, list);
 				res++;
 				if (res == INT_MAX)
 					goto out;
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index f38813a..d9ab972 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -259,6 +259,7 @@
 		pnfs_free_lseg_list(&free_me);
 	}
 }
+EXPORT_SYMBOL_GPL(put_lseg);
 
 static bool
 should_free_lseg(u32 lseg_iomode, u32 recall_iomode)
@@ -471,6 +472,9 @@
 	struct nfs_server *server = NFS_SERVER(ino);
 	struct nfs4_layoutget *lgp;
 	struct pnfs_layout_segment *lseg = NULL;
+	struct page **pages = NULL;
+	int i;
+	u32 max_resp_sz, max_pages;
 
 	dprintk("--> %s\n", __func__);
 
@@ -478,6 +482,21 @@
 	lgp = kzalloc(sizeof(*lgp), GFP_KERNEL);
 	if (lgp == NULL)
 		return NULL;
+
+	/* allocate pages for xdr post processing */
+	max_resp_sz = server->nfs_client->cl_session->fc_attrs.max_resp_sz;
+	max_pages = max_resp_sz >> PAGE_SHIFT;
+
+	pages = kzalloc(max_pages * sizeof(struct page *), GFP_KERNEL);
+	if (!pages)
+		goto out_err_free;
+
+	for (i = 0; i < max_pages; i++) {
+		pages[i] = alloc_page(GFP_KERNEL);
+		if (!pages[i])
+			goto out_err_free;
+	}
+
 	lgp->args.minlength = NFS4_MAX_UINT64;
 	lgp->args.maxcount = PNFS_LAYOUT_MAXSIZE;
 	lgp->args.range.iomode = iomode;
@@ -486,6 +505,8 @@
 	lgp->args.type = server->pnfs_curr_ld->id;
 	lgp->args.inode = ino;
 	lgp->args.ctx = get_nfs_open_context(ctx);
+	lgp->args.layout.pages = pages;
+	lgp->args.layout.pglen = max_pages * PAGE_SIZE;
 	lgp->lsegpp = &lseg;
 
 	/* Synchronously retrieve layout information from server and
@@ -496,7 +517,26 @@
 		/* remember that LAYOUTGET failed and suspend trying */
 		set_bit(lo_fail_bit(iomode), &lo->plh_flags);
 	}
+
+	/* free xdr pages */
+	for (i = 0; i < max_pages; i++)
+		__free_page(pages[i]);
+	kfree(pages);
+
 	return lseg;
+
+out_err_free:
+	/* free any allocated xdr pages, lgp as it's not used */
+	if (pages) {
+		for (i = 0; i < max_pages; i++) {
+			if (!pages[i])
+				break;
+			__free_page(pages[i]);
+		}
+		kfree(pages);
+	}
+	kfree(lgp);
+	return NULL;
 }
 
 bool pnfs_roc(struct inode *ino)
@@ -945,3 +985,105 @@
 	dprintk("%s End (trypnfs:%d)\n", __func__, trypnfs);
 	return trypnfs;
 }
+
+/*
+ * Currently there is only one (whole file) write lseg.
+ */
+static struct pnfs_layout_segment *pnfs_list_write_lseg(struct inode *inode)
+{
+	struct pnfs_layout_segment *lseg, *rv = NULL;
+
+	list_for_each_entry(lseg, &NFS_I(inode)->layout->plh_segs, pls_list)
+		if (lseg->pls_range.iomode == IOMODE_RW)
+			rv = lseg;
+	return rv;
+}
+
+void
+pnfs_set_layoutcommit(struct nfs_write_data *wdata)
+{
+	struct nfs_inode *nfsi = NFS_I(wdata->inode);
+	loff_t end_pos = wdata->args.offset + wdata->res.count;
+
+	spin_lock(&nfsi->vfs_inode.i_lock);
+	if (!test_and_set_bit(NFS_INO_LAYOUTCOMMIT, &nfsi->flags)) {
+		/* references matched in nfs4_layoutcommit_release */
+		get_lseg(wdata->lseg);
+		wdata->lseg->pls_lc_cred =
+			get_rpccred(wdata->args.context->state->owner->so_cred);
+		mark_inode_dirty_sync(wdata->inode);
+		dprintk("%s: Set layoutcommit for inode %lu ",
+			__func__, wdata->inode->i_ino);
+	}
+	if (end_pos > wdata->lseg->pls_end_pos)
+		wdata->lseg->pls_end_pos = end_pos;
+	spin_unlock(&nfsi->vfs_inode.i_lock);
+}
+EXPORT_SYMBOL_GPL(pnfs_set_layoutcommit);
+
+/*
+ * For the LAYOUT4_NFSV4_1_FILES layout type, NFS_DATA_SYNC WRITEs and
+ * NFS_UNSTABLE WRITEs with a COMMIT to data servers must store enough
+ * data to disk to allow the server to recover the data if it crashes.
+ * LAYOUTCOMMIT is only needed when the NFL4_UFLG_COMMIT_THRU_MDS flag
+ * is off, and a COMMIT is sent to a data server, or
+ * if WRITEs to a data server return NFS_DATA_SYNC.
+ */
+int
+pnfs_layoutcommit_inode(struct inode *inode, bool sync)
+{
+	struct nfs4_layoutcommit_data *data;
+	struct nfs_inode *nfsi = NFS_I(inode);
+	struct pnfs_layout_segment *lseg;
+	struct rpc_cred *cred;
+	loff_t end_pos;
+	int status = 0;
+
+	dprintk("--> %s inode %lu\n", __func__, inode->i_ino);
+
+	if (!test_bit(NFS_INO_LAYOUTCOMMIT, &nfsi->flags))
+		return 0;
+
+	/* Note kzalloc ensures data->res.seq_res.sr_slot == NULL */
+	data = kzalloc(sizeof(*data), GFP_NOFS);
+	if (!data) {
+		mark_inode_dirty_sync(inode);
+		status = -ENOMEM;
+		goto out;
+	}
+
+	spin_lock(&inode->i_lock);
+	if (!test_and_clear_bit(NFS_INO_LAYOUTCOMMIT, &nfsi->flags)) {
+		spin_unlock(&inode->i_lock);
+		kfree(data);
+		goto out;
+	}
+	/*
+	 * Currently only one (whole file) write lseg which is referenced
+	 * in pnfs_set_layoutcommit and will be found.
+	 */
+	lseg = pnfs_list_write_lseg(inode);
+
+	end_pos = lseg->pls_end_pos;
+	cred = lseg->pls_lc_cred;
+	lseg->pls_end_pos = 0;
+	lseg->pls_lc_cred = NULL;
+
+	memcpy(&data->args.stateid.data, nfsi->layout->plh_stateid.data,
+		sizeof(nfsi->layout->plh_stateid.data));
+	spin_unlock(&inode->i_lock);
+
+	data->args.inode = inode;
+	data->lseg = lseg;
+	data->cred = cred;
+	nfs_fattr_init(&data->fattr);
+	data->args.bitmask = NFS_SERVER(inode)->cache_consistency_bitmask;
+	data->res.fattr = &data->fattr;
+	data->args.lastbytewritten = end_pos - 1;
+	data->res.server = NFS_SERVER(inode);
+
+	status = nfs4_proc_layoutcommit(data, sync);
+out:
+	dprintk("<-- %s status %d\n", __func__, status);
+	return status;
+}
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index 6380b94..bc48272 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -43,6 +43,8 @@
 	atomic_t pls_refcount;
 	unsigned long pls_flags;
 	struct pnfs_layout_hdr *pls_layout;
+	struct rpc_cred	*pls_lc_cred; /* LAYOUTCOMMIT credential */
+	loff_t pls_end_pos; /* LAYOUTCOMMIT write end */
 };
 
 enum pnfs_try_status {
@@ -74,6 +76,13 @@
 	/* test for nfs page cache coalescing */
 	int (*pg_test)(struct nfs_pageio_descriptor *, struct nfs_page *, struct nfs_page *);
 
+	/* Returns true if layoutdriver wants to divert this request to
+	 * driver's commit routine.
+	 */
+	bool (*mark_pnfs_commit)(struct pnfs_layout_segment *lseg);
+	struct list_head * (*choose_commit_list) (struct nfs_page *req);
+	int (*commit_pagelist)(struct inode *inode, struct list_head *mds_pages, int how);
+
 	/*
 	 * Return PNFS_ATTEMPTED to indicate the layout code has attempted
 	 * I/O, else return PNFS_NOT_ATTEMPTED to fall back to normal NFS
@@ -100,7 +109,6 @@
 	unsigned int  layout_type;
 	unsigned int  mincount;
 	struct page **pages;
-	void          *area;
 	unsigned int  pgbase;
 	unsigned int  pglen;
 };
@@ -145,7 +153,8 @@
 void pnfs_roc_release(struct inode *ino);
 void pnfs_roc_set_barrier(struct inode *ino, u32 barrier);
 bool pnfs_roc_drain(struct inode *ino, u32 *barrier);
-
+void pnfs_set_layoutcommit(struct nfs_write_data *wdata);
+int pnfs_layoutcommit_inode(struct inode *inode, bool sync);
 
 static inline int lo_fail_bit(u32 iomode)
 {
@@ -169,6 +178,51 @@
 	return nfss->pnfs_curr_ld != NULL;
 }
 
+static inline void
+pnfs_mark_request_commit(struct nfs_page *req, struct pnfs_layout_segment *lseg)
+{
+	if (lseg) {
+		struct pnfs_layoutdriver_type *ld;
+
+		ld = NFS_SERVER(req->wb_page->mapping->host)->pnfs_curr_ld;
+		if (ld->mark_pnfs_commit && ld->mark_pnfs_commit(lseg)) {
+			set_bit(PG_PNFS_COMMIT, &req->wb_flags);
+			req->wb_commit_lseg = get_lseg(lseg);
+		}
+	}
+}
+
+static inline int
+pnfs_commit_list(struct inode *inode, struct list_head *mds_pages, int how)
+{
+	if (!test_and_clear_bit(NFS_INO_PNFS_COMMIT, &NFS_I(inode)->flags))
+		return PNFS_NOT_ATTEMPTED;
+	return NFS_SERVER(inode)->pnfs_curr_ld->commit_pagelist(inode, mds_pages, how);
+}
+
+static inline struct list_head *
+pnfs_choose_commit_list(struct nfs_page *req, struct list_head *mds)
+{
+	struct list_head *rv;
+
+	if (test_and_clear_bit(PG_PNFS_COMMIT, &req->wb_flags)) {
+		struct inode *inode = req->wb_commit_lseg->pls_layout->plh_inode;
+
+		set_bit(NFS_INO_PNFS_COMMIT, &NFS_I(inode)->flags);
+		rv = NFS_SERVER(inode)->pnfs_curr_ld->choose_commit_list(req);
+		/* matched by ref taken when PG_PNFS_COMMIT is set */
+		put_lseg(req->wb_commit_lseg);
+	} else
+		rv = mds;
+	return rv;
+}
+
+static inline void pnfs_clear_request_commit(struct nfs_page *req)
+{
+	if (test_and_clear_bit(PG_PNFS_COMMIT, &req->wb_flags))
+		put_lseg(req->wb_commit_lseg);
+}
+
 #else  /* CONFIG_NFS_V4_1 */
 
 static inline void pnfs_destroy_all_layouts(struct nfs_client *clp)
@@ -252,6 +306,31 @@
 	pgio->pg_test = NULL;
 }
 
+static inline void
+pnfs_mark_request_commit(struct nfs_page *req, struct pnfs_layout_segment *lseg)
+{
+}
+
+static inline int
+pnfs_commit_list(struct inode *inode, struct list_head *mds_pages, int how)
+{
+	return PNFS_NOT_ATTEMPTED;
+}
+
+static inline struct list_head *
+pnfs_choose_commit_list(struct nfs_page *req, struct list_head *mds)
+{
+	return mds;
+}
+
+static inline void pnfs_clear_request_commit(struct nfs_page *req)
+{
+}
+
+static inline int pnfs_layoutcommit_inode(struct inode *inode, bool sync)
+{
+	return 0;
+}
 #endif /* CONFIG_NFS_V4_1 */
 
 #endif /* FS_NFS_PNFS_H */
diff --git a/fs/nfs/write.c b/fs/nfs/write.c
index 47a3ad6..85d7525 100644
--- a/fs/nfs/write.c
+++ b/fs/nfs/write.c
@@ -59,6 +59,7 @@
 	}
 	return p;
 }
+EXPORT_SYMBOL_GPL(nfs_commitdata_alloc);
 
 void nfs_commit_free(struct nfs_write_data *p)
 {
@@ -66,6 +67,7 @@
 		kfree(p->pagevec);
 	mempool_free(p, nfs_commit_mempool);
 }
+EXPORT_SYMBOL_GPL(nfs_commit_free);
 
 struct nfs_write_data *nfs_writedata_alloc(unsigned int pagecount)
 {
@@ -179,8 +181,8 @@
 	if (wbc->for_reclaim)
 		return FLUSH_HIGHPRI | FLUSH_STABLE;
 	if (wbc->for_kupdate || wbc->for_background)
-		return FLUSH_LOWPRI;
-	return 0;
+		return FLUSH_LOWPRI | FLUSH_COND_STABLE;
+	return FLUSH_COND_STABLE;
 }
 
 /*
@@ -441,7 +443,7 @@
  * Add a request to the inode's commit list.
  */
 static void
-nfs_mark_request_commit(struct nfs_page *req)
+nfs_mark_request_commit(struct nfs_page *req, struct pnfs_layout_segment *lseg)
 {
 	struct inode *inode = req->wb_context->path.dentry->d_inode;
 	struct nfs_inode *nfsi = NFS_I(inode);
@@ -453,6 +455,7 @@
 			NFS_PAGE_TAG_COMMIT);
 	nfsi->ncommit++;
 	spin_unlock(&inode->i_lock);
+	pnfs_mark_request_commit(req, lseg);
 	inc_zone_page_state(req->wb_page, NR_UNSTABLE_NFS);
 	inc_bdi_stat(req->wb_page->mapping->backing_dev_info, BDI_RECLAIMABLE);
 	__mark_inode_dirty(inode, I_DIRTY_DATASYNC);
@@ -474,14 +477,18 @@
 static inline
 int nfs_write_need_commit(struct nfs_write_data *data)
 {
-	return data->verf.committed != NFS_FILE_SYNC;
+	if (data->verf.committed == NFS_DATA_SYNC)
+		return data->lseg == NULL;
+	else
+		return data->verf.committed != NFS_FILE_SYNC;
 }
 
 static inline
-int nfs_reschedule_unstable_write(struct nfs_page *req)
+int nfs_reschedule_unstable_write(struct nfs_page *req,
+				  struct nfs_write_data *data)
 {
 	if (test_and_clear_bit(PG_NEED_COMMIT, &req->wb_flags)) {
-		nfs_mark_request_commit(req);
+		nfs_mark_request_commit(req, data->lseg);
 		return 1;
 	}
 	if (test_and_clear_bit(PG_NEED_RESCHED, &req->wb_flags)) {
@@ -492,7 +499,7 @@
 }
 #else
 static inline void
-nfs_mark_request_commit(struct nfs_page *req)
+nfs_mark_request_commit(struct nfs_page *req, struct pnfs_layout_segment *lseg)
 {
 }
 
@@ -509,7 +516,8 @@
 }
 
 static inline
-int nfs_reschedule_unstable_write(struct nfs_page *req)
+int nfs_reschedule_unstable_write(struct nfs_page *req,
+				  struct nfs_write_data *data)
 {
 	return 0;
 }
@@ -612,9 +620,11 @@
 	}
 
 	if (nfs_clear_request_commit(req) &&
-			radix_tree_tag_clear(&NFS_I(inode)->nfs_page_tree,
-				req->wb_index, NFS_PAGE_TAG_COMMIT) != NULL)
+	    radix_tree_tag_clear(&NFS_I(inode)->nfs_page_tree,
+				 req->wb_index, NFS_PAGE_TAG_COMMIT) != NULL) {
 		NFS_I(inode)->ncommit--;
+		pnfs_clear_request_commit(req);
+	}
 
 	/* Okay, the request matches. Update the region */
 	if (offset < req->wb_offset) {
@@ -762,11 +772,12 @@
 	return status;
 }
 
-static void nfs_writepage_release(struct nfs_page *req)
+static void nfs_writepage_release(struct nfs_page *req,
+				  struct nfs_write_data *data)
 {
 	struct page *page = req->wb_page;
 
-	if (PageError(req->wb_page) || !nfs_reschedule_unstable_write(req))
+	if (PageError(req->wb_page) || !nfs_reschedule_unstable_write(req, data))
 		nfs_inode_remove_request(req);
 	nfs_clear_page_tag_locked(req);
 	nfs_end_page_writeback(page);
@@ -863,7 +874,7 @@
 	data->args.context = get_nfs_open_context(req->wb_context);
 	data->args.lock_context = req->wb_lock_context;
 	data->args.stable  = NFS_UNSTABLE;
-	if (how & FLUSH_STABLE) {
+	if (how & (FLUSH_STABLE | FLUSH_COND_STABLE)) {
 		data->args.stable = NFS_DATA_SYNC;
 		if (!nfs_need_commit(NFS_I(inode)))
 			data->args.stable = NFS_FILE_SYNC;
@@ -912,6 +923,12 @@
 
 	nfs_list_remove_request(req);
 
+	if ((desc->pg_ioflags & FLUSH_COND_STABLE) &&
+	    (desc->pg_moreio || NFS_I(desc->pg_inode)->ncommit ||
+	     desc->pg_count > wsize))
+		desc->pg_ioflags &= ~FLUSH_COND_STABLE;
+
+
 	nbytes = desc->pg_count;
 	do {
 		size_t len = min(nbytes, wsize);
@@ -1002,6 +1019,10 @@
 	if ((!lseg) && list_is_singular(&data->pages))
 		lseg = pnfs_update_layout(desc->pg_inode, req->wb_context, IOMODE_RW);
 
+	if ((desc->pg_ioflags & FLUSH_COND_STABLE) &&
+	    (desc->pg_moreio || NFS_I(desc->pg_inode)->ncommit))
+		desc->pg_ioflags &= ~FLUSH_COND_STABLE;
+
 	/* Set up the argument struct */
 	ret = nfs_write_rpcsetup(req, data, &nfs_write_full_ops, desc->pg_count, 0, lseg, desc->pg_ioflags);
 out:
@@ -1074,7 +1095,7 @@
 
 out:
 	if (atomic_dec_and_test(&req->wb_complete))
-		nfs_writepage_release(req);
+		nfs_writepage_release(req, data);
 	nfs_writedata_release(calldata);
 }
 
@@ -1141,7 +1162,7 @@
 
 		if (nfs_write_need_commit(data)) {
 			memcpy(&req->wb_verf, &data->verf, sizeof(req->wb_verf));
-			nfs_mark_request_commit(req);
+			nfs_mark_request_commit(req, data->lseg);
 			dprintk(" marked for commit\n");
 			goto next;
 		}
@@ -1251,78 +1272,60 @@
 #if defined(CONFIG_NFS_V3) || defined(CONFIG_NFS_V4)
 static int nfs_commit_set_lock(struct nfs_inode *nfsi, int may_wait)
 {
+	int ret;
+
 	if (!test_and_set_bit(NFS_INO_COMMIT, &nfsi->flags))
 		return 1;
-	if (may_wait && !out_of_line_wait_on_bit_lock(&nfsi->flags,
-				NFS_INO_COMMIT, nfs_wait_bit_killable,
-				TASK_KILLABLE))
-		return 1;
-	return 0;
+	if (!may_wait)
+		return 0;
+	ret = out_of_line_wait_on_bit_lock(&nfsi->flags,
+				NFS_INO_COMMIT,
+				nfs_wait_bit_killable,
+				TASK_KILLABLE);
+	return (ret < 0) ? ret : 1;
 }
 
-static void nfs_commit_clear_lock(struct nfs_inode *nfsi)
+void nfs_commit_clear_lock(struct nfs_inode *nfsi)
 {
 	clear_bit(NFS_INO_COMMIT, &nfsi->flags);
 	smp_mb__after_clear_bit();
 	wake_up_bit(&nfsi->flags, NFS_INO_COMMIT);
 }
+EXPORT_SYMBOL_GPL(nfs_commit_clear_lock);
 
-
-static void nfs_commitdata_release(void *data)
+void nfs_commitdata_release(void *data)
 {
 	struct nfs_write_data *wdata = data;
 
+	put_lseg(wdata->lseg);
 	put_nfs_open_context(wdata->args.context);
 	nfs_commit_free(wdata);
 }
+EXPORT_SYMBOL_GPL(nfs_commitdata_release);
 
-/*
- * Set up the argument/result storage required for the RPC call.
- */
-static int nfs_commit_rpcsetup(struct list_head *head,
-		struct nfs_write_data *data,
-		int how)
+int nfs_initiate_commit(struct nfs_write_data *data, struct rpc_clnt *clnt,
+			const struct rpc_call_ops *call_ops,
+			int how)
 {
-	struct nfs_page *first = nfs_list_entry(head->next);
-	struct inode *inode = first->wb_context->path.dentry->d_inode;
-	int priority = flush_task_priority(how);
 	struct rpc_task *task;
+	int priority = flush_task_priority(how);
 	struct rpc_message msg = {
 		.rpc_argp = &data->args,
 		.rpc_resp = &data->res,
-		.rpc_cred = first->wb_context->cred,
+		.rpc_cred = data->cred,
 	};
 	struct rpc_task_setup task_setup_data = {
 		.task = &data->task,
-		.rpc_client = NFS_CLIENT(inode),
+		.rpc_client = clnt,
 		.rpc_message = &msg,
-		.callback_ops = &nfs_commit_ops,
+		.callback_ops = call_ops,
 		.callback_data = data,
 		.workqueue = nfsiod_workqueue,
 		.flags = RPC_TASK_ASYNC,
 		.priority = priority,
 	};
-
-	/* Set up the RPC argument and reply structs
-	 * NB: take care not to mess about with data->commit et al. */
-
-	list_splice_init(head, &data->pages);
-
-	data->inode	  = inode;
-	data->cred	  = msg.rpc_cred;
-
-	data->args.fh     = NFS_FH(data->inode);
-	/* Note: we always request a commit of the entire inode */
-	data->args.offset = 0;
-	data->args.count  = 0;
-	data->args.context = get_nfs_open_context(first->wb_context);
-	data->res.count   = 0;
-	data->res.fattr   = &data->fattr;
-	data->res.verf    = &data->verf;
-	nfs_fattr_init(&data->fattr);
-
 	/* Set up the initial task struct.  */
-	NFS_PROTO(inode)->commit_setup(data, &msg);
+	NFS_PROTO(data->inode)->commit_setup(data, &msg);
 
 	dprintk("NFS: %5u initiated commit call\n", data->task.tk_pid);
 
@@ -1334,6 +1337,56 @@
 	rpc_put_task(task);
 	return 0;
 }
+EXPORT_SYMBOL_GPL(nfs_initiate_commit);
+
+/*
+ * Set up the argument/result storage required for the RPC call.
+ */
+void nfs_init_commit(struct nfs_write_data *data,
+			    struct list_head *head,
+			    struct pnfs_layout_segment *lseg)
+{
+	struct nfs_page *first = nfs_list_entry(head->next);
+	struct inode *inode = first->wb_context->path.dentry->d_inode;
+
+	/* Set up the RPC argument and reply structs
+	 * NB: take care not to mess about with data->commit et al. */
+
+	list_splice_init(head, &data->pages);
+
+	data->inode	  = inode;
+	data->cred	  = first->wb_context->cred;
+	data->lseg	  = lseg; /* reference transferred */
+	data->mds_ops     = &nfs_commit_ops;
+
+	data->args.fh     = NFS_FH(data->inode);
+	/* Note: we always request a commit of the entire inode */
+	data->args.offset = 0;
+	data->args.count  = 0;
+	data->args.context = get_nfs_open_context(first->wb_context);
+	data->res.count   = 0;
+	data->res.fattr   = &data->fattr;
+	data->res.verf    = &data->verf;
+	nfs_fattr_init(&data->fattr);
+}
+EXPORT_SYMBOL_GPL(nfs_init_commit);
+
+void nfs_retry_commit(struct list_head *page_list,
+		      struct pnfs_layout_segment *lseg)
+{
+	struct nfs_page *req;
+
+	while (!list_empty(page_list)) {
+		req = nfs_list_entry(page_list->next);
+		nfs_list_remove_request(req);
+		nfs_mark_request_commit(req, lseg);
+		dec_zone_page_state(req->wb_page, NR_UNSTABLE_NFS);
+		dec_bdi_stat(req->wb_page->mapping->backing_dev_info,
+			     BDI_RECLAIMABLE);
+		nfs_clear_page_tag_locked(req);
+	}
+}
+EXPORT_SYMBOL_GPL(nfs_retry_commit);
 
 /*
  * Commit dirty pages
@@ -1342,7 +1395,6 @@
 nfs_commit_list(struct inode *inode, struct list_head *head, int how)
 {
 	struct nfs_write_data	*data;
-	struct nfs_page         *req;
 
 	data = nfs_commitdata_alloc();
 
@@ -1350,17 +1402,10 @@
 		goto out_bad;
 
 	/* Set up the argument struct */
-	return nfs_commit_rpcsetup(head, data, how);
+	nfs_init_commit(data, head, NULL);
+	return nfs_initiate_commit(data, NFS_CLIENT(inode), data->mds_ops, how);
  out_bad:
-	while (!list_empty(head)) {
-		req = nfs_list_entry(head->next);
-		nfs_list_remove_request(req);
-		nfs_mark_request_commit(req);
-		dec_zone_page_state(req->wb_page, NR_UNSTABLE_NFS);
-		dec_bdi_stat(req->wb_page->mapping->backing_dev_info,
-				BDI_RECLAIMABLE);
-		nfs_clear_page_tag_locked(req);
-	}
+	nfs_retry_commit(head, NULL);
 	nfs_commit_clear_lock(NFS_I(inode));
 	return -ENOMEM;
 }
@@ -1380,10 +1425,9 @@
 		return;
 }
 
-static void nfs_commit_release(void *calldata)
+void nfs_commit_release_pages(struct nfs_write_data *data)
 {
-	struct nfs_write_data	*data = calldata;
-	struct nfs_page		*req;
+	struct nfs_page	*req;
 	int status = data->task.tk_status;
 
 	while (!list_empty(&data->pages)) {
@@ -1417,6 +1461,14 @@
 	next:
 		nfs_clear_page_tag_locked(req);
 	}
+}
+EXPORT_SYMBOL_GPL(nfs_commit_release_pages);
+
+static void nfs_commit_release(void *calldata)
+{
+	struct nfs_write_data *data = calldata;
+
+	nfs_commit_release_pages(data);
 	nfs_commit_clear_lock(NFS_I(data->inode));
 	nfs_commitdata_release(calldata);
 }
@@ -1433,23 +1485,30 @@
 {
 	LIST_HEAD(head);
 	int may_wait = how & FLUSH_SYNC;
-	int res = 0;
+	int res;
 
-	if (!nfs_commit_set_lock(NFS_I(inode), may_wait))
+	res = nfs_commit_set_lock(NFS_I(inode), may_wait);
+	if (res <= 0)
 		goto out_mark_dirty;
 	spin_lock(&inode->i_lock);
 	res = nfs_scan_commit(inode, &head, 0, 0);
 	spin_unlock(&inode->i_lock);
 	if (res) {
-		int error = nfs_commit_list(inode, &head, how);
+		int error;
+
+		error = pnfs_commit_list(inode, &head, how);
+		if (error == PNFS_NOT_ATTEMPTED)
+			error = nfs_commit_list(inode, &head, how);
 		if (error < 0)
 			return error;
-		if (may_wait)
-			wait_on_bit(&NFS_I(inode)->flags, NFS_INO_COMMIT,
-					nfs_wait_bit_killable,
-					TASK_KILLABLE);
-		else
+		if (!may_wait)
 			goto out_mark_dirty;
+		error = wait_on_bit(&NFS_I(inode)->flags,
+				NFS_INO_COMMIT,
+				nfs_wait_bit_killable,
+				TASK_KILLABLE);
+		if (error < 0)
+			return error;
 	} else
 		nfs_commit_clear_lock(NFS_I(inode));
 	return res;
@@ -1503,7 +1562,22 @@
 
 int nfs_write_inode(struct inode *inode, struct writeback_control *wbc)
 {
-	return nfs_commit_unstable_pages(inode, wbc);
+	int ret;
+
+	ret = nfs_commit_unstable_pages(inode, wbc);
+	if (ret >= 0 && test_bit(NFS_INO_LAYOUTCOMMIT, &NFS_I(inode)->flags)) {
+		int status;
+		bool sync = true;
+
+		if (wbc->sync_mode == WB_SYNC_NONE || wbc->nonblocking ||
+		    wbc->for_background)
+			sync = false;
+
+		status = pnfs_layoutcommit_inode(inode, sync);
+		if (status < 0)
+			return status;
+	}
+	return ret;
 }
 
 /*
diff --git a/fs/nfs_common/nfsacl.c b/fs/nfs_common/nfsacl.c
index 84c27d6..ec0f277 100644
--- a/fs/nfs_common/nfsacl.c
+++ b/fs/nfs_common/nfsacl.c
@@ -117,7 +117,6 @@
 		 * invoked in contexts where a memory allocation failure is
 		 * fatal.  Fortunately this fake ACL is small enough to
 		 * construct on the stack. */
-		memset(acl2, 0, sizeof(acl2));
 		posix_acl_init(acl2, 4);
 
 		/* Insert entries in canonical order: other orders seem
diff --git a/include/linux/nfs4.h b/include/linux/nfs4.h
index 7e7f6b7..b528f6d 100644
--- a/include/linux/nfs4.h
+++ b/include/linux/nfs4.h
@@ -561,6 +561,7 @@
 	NFSPROC4_CLNT_RECLAIM_COMPLETE,
 	NFSPROC4_CLNT_LAYOUTGET,
 	NFSPROC4_CLNT_GETDEVICEINFO,
+	NFSPROC4_CLNT_LAYOUTCOMMIT,
 };
 
 /* nfs41 types */
diff --git a/include/linux/nfs_fs.h b/include/linux/nfs_fs.h
index f88522b..1b93b9c 100644
--- a/include/linux/nfs_fs.h
+++ b/include/linux/nfs_fs.h
@@ -33,6 +33,8 @@
 #define FLUSH_STABLE		4	/* commit to stable storage */
 #define FLUSH_LOWPRI		8	/* low priority background flush */
 #define FLUSH_HIGHPRI		16	/* high priority memory reclaim flush */
+#define FLUSH_COND_STABLE	32	/* conditional stable write - only stable
+					 * if everything fits in one RPC */
 
 #ifdef __KERNEL__
 
@@ -93,8 +95,13 @@
 	int error;
 
 	struct list_head list;
+};
 
+struct nfs_open_dir_context {
+	struct rpc_cred *cred;
 	__u64 dir_cookie;
+	__u64 dup_cookie;
+	int duped;
 };
 
 /*
@@ -191,6 +198,7 @@
 
 	/* pNFS layout information */
 	struct pnfs_layout_hdr *layout;
+	atomic_t		commits_outstanding;
 #endif /* CONFIG_NFS_V4*/
 #ifdef CONFIG_NFS_FSCACHE
 	struct fscache_cookie	*fscache;
@@ -219,6 +227,8 @@
 #define NFS_INO_FSCACHE		(5)		/* inode can be cached by FS-Cache */
 #define NFS_INO_FSCACHE_LOCK	(6)		/* FS-Cache cookie management lock */
 #define NFS_INO_COMMIT		(7)		/* inode is committing unstable writes */
+#define NFS_INO_PNFS_COMMIT	(8)		/* use pnfs code for commit */
+#define NFS_INO_LAYOUTCOMMIT	(9)		/* layoutcommit required */
 
 static inline struct nfs_inode *NFS_I(const struct inode *inode)
 {
diff --git a/include/linux/nfs_page.h b/include/linux/nfs_page.h
index 90907ad..8023e4e 100644
--- a/include/linux/nfs_page.h
+++ b/include/linux/nfs_page.h
@@ -33,11 +33,15 @@
 	PG_CLEAN,
 	PG_NEED_COMMIT,
 	PG_NEED_RESCHED,
+	PG_PNFS_COMMIT,
 };
 
 struct nfs_inode;
 struct nfs_page {
-	struct list_head	wb_list;	/* Defines state of page: */
+	union {
+		struct list_head	wb_list;	/* Defines state of page: */
+		struct pnfs_layout_segment *wb_commit_lseg; /* Used when PG_PNFS_COMMIT set */
+	};
 	struct page		*wb_page;	/* page to read in/write out */
 	struct nfs_open_context	*wb_context;	/* File state context info */
 	struct nfs_lock_context	*wb_lock_context;	/* lock context info */
@@ -57,6 +61,7 @@
 	size_t			pg_count;
 	size_t			pg_bsize;
 	unsigned int		pg_base;
+	char			pg_moreio;
 
 	struct inode		*pg_inode;
 	int			(*pg_doio)(struct nfs_pageio_descriptor *);
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index fa1ba78..78b101e 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -195,8 +195,9 @@
 #define PNFS_LAYOUT_MAXSIZE 4096
 
 struct nfs4_layoutdriver_data {
+	struct page **pages;
+	__u32 pglen;
 	__u32 len;
-	void *buf;
 };
 
 struct pnfs_layout_range {
@@ -214,6 +215,7 @@
 	struct nfs_open_context *ctx;
 	struct nfs4_sequence_args seq_args;
 	nfs4_stateid stateid;
+	struct nfs4_layoutdriver_data layout;
 };
 
 struct nfs4_layoutget_res {
@@ -221,8 +223,8 @@
 	struct pnfs_layout_range range;
 	__u32 type;
 	nfs4_stateid stateid;
-	struct nfs4_layoutdriver_data layout;
 	struct nfs4_sequence_res seq_res;
+	struct nfs4_layoutdriver_data *layoutp;
 };
 
 struct nfs4_layoutget {
@@ -241,6 +243,29 @@
 	struct nfs4_sequence_res seq_res;
 };
 
+struct nfs4_layoutcommit_args {
+	nfs4_stateid stateid;
+	__u64 lastbytewritten;
+	struct inode *inode;
+	const u32 *bitmask;
+	struct nfs4_sequence_args seq_args;
+};
+
+struct nfs4_layoutcommit_res {
+	struct nfs_fattr *fattr;
+	const struct nfs_server *server;
+	struct nfs4_sequence_res seq_res;
+};
+
+struct nfs4_layoutcommit_data {
+	struct rpc_task task;
+	struct nfs_fattr fattr;
+	struct pnfs_layout_segment *lseg;
+	struct rpc_cred *cred;
+	struct nfs4_layoutcommit_args args;
+	struct nfs4_layoutcommit_res res;
+};
+
 /*
  * Arguments to the open call.
  */
@@ -1077,6 +1102,7 @@
 	struct nfs_writeres	res;		/* result struct */
 	struct pnfs_layout_segment *lseg;
 	struct nfs_client	*ds_clp;	/* pNFS data server */
+	int			ds_commit_index;
 	const struct rpc_call_ops *mds_ops;
 	int (*write_done_cb) (struct rpc_task *task, struct nfs_write_data *data);
 #ifdef CONFIG_NFS_V4
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index be96d42..1e336a0 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -710,6 +710,8 @@
 	if (sk == NULL)
 		return;
 
+	transport->srcport = 0;
+
 	write_lock_bh(&sk->sk_callback_lock);
 	transport->inet = NULL;
 	transport->sock = NULL;