pNFS: Do not free layout segments that are marked for return

We may want to process and transmit layout stat information for the
layout segments that are being returned, so we should defer freeing
them until after the layoutreturn has completed.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index d70cc46..555151b 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -54,6 +54,10 @@
 static LIST_HEAD(pnfs_modules_tbl);
 
 static void pnfs_layoutreturn_before_put_layout_hdr(struct pnfs_layout_hdr *lo);
+static void pnfs_free_returned_lsegs(struct pnfs_layout_hdr *lo,
+		struct list_head *free_me,
+		const struct pnfs_layout_range *range,
+		u32 seq);
 
 /* Return the registered pnfs layout driver module matching given id */
 static struct pnfs_layoutdriver_type *
@@ -326,6 +330,7 @@
 
 	set_bit(NFS_LAYOUT_INVALID_STID, &lo->plh_flags);
 	pnfs_clear_layoutreturn_info(lo);
+	pnfs_free_returned_lsegs(lo, lseg_list, &range, 0);
 	return pnfs_mark_matching_lsegs_invalid(lo, lseg_list, &range, 0);
 }
 
@@ -405,9 +410,10 @@
 
 static void pnfs_free_lseg(struct pnfs_layout_segment *lseg)
 {
-	struct inode *ino = lseg->pls_layout->plh_inode;
-
-	NFS_SERVER(ino)->pnfs_curr_ld->free_lseg(lseg);
+	if (lseg != NULL) {	/* pnfs_put_lseg() passes NULL once lseg is cached for return */
+		struct inode *inode = lseg->pls_layout->plh_inode;
+		NFS_SERVER(inode)->pnfs_curr_ld->free_lseg(lseg);
+	}
 }
 
 static void
@@ -430,6 +436,18 @@
 	rpc_wake_up(&NFS_SERVER(inode)->roc_rpcwaitq);
 }
 
+static bool
+pnfs_cache_lseg_for_layoutreturn(struct pnfs_layout_hdr *lo,
+		struct pnfs_layout_segment *lseg)
+{
+	if (test_and_clear_bit(NFS_LSEG_LAYOUTRETURN, &lseg->pls_flags) &&
+	    pnfs_layout_is_valid(lo)) {	/* note: flag is cleared even if lo is invalid */
+		list_move_tail(&lseg->pls_list, &lo->plh_return_segs);
+		return true;	/* lseg now sits on plh_return_segs; caller must not free it */
+	}
+	return false;	/* not cached; caller remains responsible for freeing lseg */
+}
+
 void
 pnfs_put_lseg(struct pnfs_layout_segment *lseg)
 {
@@ -453,6 +471,8 @@
 		}
 		pnfs_get_layout_hdr(lo);
 		pnfs_layout_remove_lseg(lo, lseg);
+		if (pnfs_cache_lseg_for_layoutreturn(lo, lseg))
+			lseg = NULL;
 		spin_unlock(&inode->i_lock);
 		pnfs_free_lseg(lseg);
 		pnfs_put_layout_hdr(lo);
@@ -493,9 +513,11 @@
 		struct pnfs_layout_hdr *lo = lseg->pls_layout;
 		if (test_bit(NFS_LSEG_VALID, &lseg->pls_flags))
 			return;
-		pnfs_get_layout_hdr(lo);
 		pnfs_layout_remove_lseg(lo, lseg);
-		pnfs_free_lseg_async(lseg);
+		if (!pnfs_cache_lseg_for_layoutreturn(lo, lseg)) {
+			pnfs_get_layout_hdr(lo);
+			pnfs_free_lseg_async(lseg);
+		}
 	}
 }
 EXPORT_SYMBOL_GPL(pnfs_put_lseg_locked);
@@ -619,6 +641,20 @@
 	return remaining;
 }
 
+static void
+pnfs_free_returned_lsegs(struct pnfs_layout_hdr *lo,
+		struct list_head *free_me,	/* receives the matching lsegs */
+		const struct pnfs_layout_range *range,
+		u32 seq)
+{
+	struct pnfs_layout_segment *lseg, *next;	/* next: entries are unlinked mid-walk */
+
+	list_for_each_entry_safe(lseg, next, &lo->plh_return_segs, pls_list) {
+		if (pnfs_match_lseg_recall(lseg, range, seq))
+			list_move_tail(&lseg->pls_list, free_me);	/* only moved here, not freed */
+	}
+}
+
 /* note free_me must contain lsegs from a single layout_hdr */
 void
 pnfs_free_lseg_list(struct list_head *free_me)
@@ -915,7 +951,7 @@
 	}
 }
 
-void pnfs_clear_layoutreturn_waitbit(struct pnfs_layout_hdr *lo)
+static void pnfs_clear_layoutreturn_waitbit(struct pnfs_layout_hdr *lo)
 {
 	clear_bit_unlock(NFS_LAYOUT_RETURN, &lo->plh_flags);
 	clear_bit(NFS_LAYOUT_RETURN_LOCK, &lo->plh_flags);
@@ -924,6 +960,27 @@
 	rpc_wake_up(&NFS_SERVER(lo->plh_inode)->roc_rpcwaitq);
 }
 
+void pnfs_layoutreturn_free_lsegs(struct pnfs_layout_hdr *lo,
+		const struct pnfs_layout_range *range,
+		u32 seq,
+		const nfs4_stateid *stateid)	/* NULL selects the invalidate branch below */
+{
+	struct inode *inode = lo->plh_inode;
+	LIST_HEAD(freeme);	/* filled under i_lock, released after unlock */
+
+	spin_lock(&inode->i_lock);
+	if (stateid) {
+		pnfs_mark_matching_lsegs_invalid(lo, &freeme, range, seq);
+		pnfs_free_returned_lsegs(lo, &freeme, range, seq);	/* also collect plh_return_segs */
+		pnfs_set_layout_stateid(lo, stateid, true);
+	} else
+		pnfs_mark_layout_stateid_invalid(lo, &freeme);	/* sets NFS_LAYOUT_INVALID_STID */
+	pnfs_clear_layoutreturn_waitbit(lo);
+	spin_unlock(&inode->i_lock);
+	pnfs_free_lseg_list(&freeme);
+
+}
+
 static bool
 pnfs_prepare_layoutreturn(struct pnfs_layout_hdr *lo,
 		nfs4_stateid *stateid,
@@ -1349,6 +1406,7 @@
 	atomic_set(&lo->plh_refcount, 1);
 	INIT_LIST_HEAD(&lo->plh_layouts);
 	INIT_LIST_HEAD(&lo->plh_segs);
+	INIT_LIST_HEAD(&lo->plh_return_segs);
 	INIT_LIST_HEAD(&lo->plh_bulk_destroy);
 	lo->plh_inode = ino;
 	lo->plh_lc_cred = get_rpccred(ctx->cred);
@@ -1920,7 +1978,6 @@
 		.offset = 0,
 		.length = NFS4_MAX_UINT64,
 	};
-	LIST_HEAD(free_me);
 	bool return_now = false;
 
 	spin_lock(&inode->i_lock);
@@ -1932,7 +1989,7 @@
 	 * segments at hand when sending layoutreturn. See pnfs_put_lseg()
 	 * for how it works.
 	 */
-	if (!pnfs_mark_matching_lsegs_return(lo, &free_me, &range, 0)) {
+	if (!pnfs_mark_matching_lsegs_return(lo, &lo->plh_return_segs, &range, 0)) {
 		nfs4_stateid stateid;
 		enum pnfs_iomode iomode;
 
@@ -1944,7 +2001,6 @@
 		spin_unlock(&inode->i_lock);
 		nfs_commit_inode(inode, 0);
 	}
-	pnfs_free_lseg_list(&free_me);
 }
 EXPORT_SYMBOL_GPL(pnfs_error_mark_layout_for_return);