pnfs: layout roc code

A layout can request return-on-close.  How this interacts with the
forgetful model of never sending LAYOUTRETURNS is a bit ambiguous.
We forget any layouts marked roc, and wait for them to be completely
forgotten before continuing with the close.  In addition, to compensate
for races with any inflight LAYOUTGETs, and the fact that we do not get
any layout stateid back from the server, we set the barrier to the worst
case scenario of current_seqid + number of outstanding LAYOUTGETS.

Signed-off-by: Fred Isaman <iisaman@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index a3549ce..88f590f 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -1839,6 +1839,8 @@
 	struct nfs_closeres res;
 	struct nfs_fattr fattr;
 	unsigned long timestamp;
+	bool roc;
+	u32 roc_barrier;
 };
 
 static void nfs4_free_closedata(void *data)
@@ -1846,6 +1848,8 @@
 	struct nfs4_closedata *calldata = data;
 	struct nfs4_state_owner *sp = calldata->state->owner;
 
+	if (calldata->roc)
+		pnfs_roc_release(calldata->state->inode);
 	nfs4_put_open_state(calldata->state);
 	nfs_free_seqid(calldata->arg.seqid);
 	nfs4_put_state_owner(sp);
@@ -1878,6 +1882,9 @@
 	 */
 	switch (task->tk_status) {
 		case 0:
+			if (calldata->roc)
+				pnfs_roc_set_barrier(state->inode,
+						     calldata->roc_barrier);
 			nfs_set_open_stateid(state, &calldata->res.stateid, 0);
 			renew_lease(server, calldata->timestamp);
 			nfs4_close_clear_stateid_flags(state,
@@ -1930,8 +1937,15 @@
 		return;
 	}
 
-	if (calldata->arg.fmode == 0)
+	if (calldata->arg.fmode == 0) {
 		task->tk_msg.rpc_proc = &nfs4_procedures[NFSPROC4_CLNT_CLOSE];
+		if (calldata->roc &&
+		    pnfs_roc_drain(calldata->inode, &calldata->roc_barrier)) {
+			rpc_sleep_on(&NFS_SERVER(calldata->inode)->roc_rpcwaitq,
+				     task, NULL);
+			return;
+		}
+	}
 
 	nfs_fattr_init(calldata->res.fattr);
 	calldata->timestamp = jiffies;
@@ -1959,7 +1973,7 @@
  *
  * NOTE: Caller must be holding the sp->so_owner semaphore!
  */
-int nfs4_do_close(struct path *path, struct nfs4_state *state, gfp_t gfp_mask, int wait)
+int nfs4_do_close(struct path *path, struct nfs4_state *state, gfp_t gfp_mask, int wait, bool roc)
 {
 	struct nfs_server *server = NFS_SERVER(state->inode);
 	struct nfs4_closedata *calldata;
@@ -1994,6 +2008,7 @@
 	calldata->res.fattr = &calldata->fattr;
 	calldata->res.seqid = calldata->arg.seqid;
 	calldata->res.server = server;
+	calldata->roc = roc;
 	path_get(path);
 	calldata->path = *path;
 
@@ -2011,6 +2026,8 @@
 out_free_calldata:
 	kfree(calldata);
 out:
+	if (roc)
+		pnfs_roc_release(state->inode);
 	nfs4_put_open_state(state);
 	nfs4_put_state_owner(sp);
 	return status;