NFSv4.1: Fix bulk recall and destroy of layouts

The current code in pnfs_destroy_all_layouts() assumes that removing
the layout from the server->layouts list is sufficient to make it
invisible to other processes. This ignores the fact that most
users access the layout through the nfs_inode->layout...
There is further breakage due to lack of reference counting of the
layouts, meaning that the whole thing Oopses at the drop of a hat.

The code in initiate_bulk_draining() is almost correct, and can be
used as a model for pnfs_destroy_all_layouts(), so move that
code to pnfs.c, and refactor the code to allow us to choose between
a single filesystem bulk recall, and a recall of all layouts.
Also note that initiate_bulk_draining() currently calls iput() while
holding locks. Fix that too.

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
Cc: stable@vger.kernel.org
diff --git a/fs/nfs/callback_proc.c b/fs/nfs/callback_proc.c
index 264d1aa..2960512 100644
--- a/fs/nfs/callback_proc.c
+++ b/fs/nfs/callback_proc.c
@@ -183,60 +183,15 @@
 static u32 initiate_bulk_draining(struct nfs_client *clp,
 				  struct cb_layoutrecallargs *args)
 {
-	struct nfs_server *server;
-	struct pnfs_layout_hdr *lo;
-	struct inode *ino;
-	u32 rv = NFS4ERR_NOMATCHING_LAYOUT;
-	struct pnfs_layout_hdr *tmp;
-	LIST_HEAD(recall_list);
-	LIST_HEAD(free_me_list);
-	struct pnfs_layout_range range = {
-		.iomode = IOMODE_ANY,
-		.offset = 0,
-		.length = NFS4_MAX_UINT64,
-	};
+	int stat;
 
-	spin_lock(&clp->cl_lock);
-	rcu_read_lock();
-	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
-		if ((args->cbl_recall_type == RETURN_FSID) &&
-		    memcmp(&server->fsid, &args->cbl_fsid,
-			   sizeof(struct nfs_fsid)))
-			continue;
-
-		list_for_each_entry(lo, &server->layouts, plh_layouts) {
-			ino = igrab(lo->plh_inode);
-			if (!ino)
-				continue;
-			spin_lock(&ino->i_lock);
-			/* Is this layout in the process of being freed? */
-			if (NFS_I(ino)->layout != lo) {
-				spin_unlock(&ino->i_lock);
-				iput(ino);
-				continue;
-			}
-			pnfs_get_layout_hdr(lo);
-			spin_unlock(&ino->i_lock);
-			list_add(&lo->plh_bulk_recall, &recall_list);
-		}
-	}
-	rcu_read_unlock();
-	spin_unlock(&clp->cl_lock);
-
-	list_for_each_entry_safe(lo, tmp,
-				 &recall_list, plh_bulk_recall) {
-		ino = lo->plh_inode;
-		spin_lock(&ino->i_lock);
-		set_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
-		if (pnfs_mark_matching_lsegs_invalid(lo, &free_me_list, &range))
-			rv = NFS4ERR_DELAY;
-		list_del_init(&lo->plh_bulk_recall);
-		spin_unlock(&ino->i_lock);
-		pnfs_free_lseg_list(&free_me_list);
-		pnfs_put_layout_hdr(lo);
-		iput(ino);
-	}
-	return rv;
+	if (args->cbl_recall_type == RETURN_FSID)
+		stat = pnfs_destroy_layouts_byfsid(clp, &args->cbl_fsid, true);
+	else
+		stat = pnfs_destroy_layouts_byclid(clp, true);
+	if (stat != 0)
+		return NFS4ERR_DELAY;
+	return NFS4ERR_NOMATCHING_LAYOUT;
 }
 
 static u32 do_callback_layoutrecall(struct nfs_client *clp,
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index d00260b..6be70f6 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -505,6 +505,136 @@
 }
 EXPORT_SYMBOL_GPL(pnfs_destroy_layout);
 
+static bool
+pnfs_layout_add_bulk_destroy_list(struct inode *inode,
+		struct list_head *layout_list)
+{
+	struct pnfs_layout_hdr *lo;
+	bool ret = false;
+
+	spin_lock(&inode->i_lock);
+	lo = NFS_I(inode)->layout;
+	if (lo != NULL && list_empty(&lo->plh_bulk_destroy)) {
+		pnfs_get_layout_hdr(lo);
+		list_add(&lo->plh_bulk_destroy, layout_list);
+		ret = true;
+	}
+	spin_unlock(&inode->i_lock);
+	return ret;
+}
+
+/* Caller must hold rcu_read_lock and clp->cl_lock */
+static int
+pnfs_layout_bulk_destroy_byserver_locked(struct nfs_client *clp,
+		struct nfs_server *server,
+		struct list_head *layout_list)
+{
+	struct pnfs_layout_hdr *lo, *next;
+	struct inode *inode;
+
+	list_for_each_entry_safe(lo, next, &server->layouts, plh_layouts) {
+		inode = igrab(lo->plh_inode);
+		if (inode == NULL)
+			continue;
+		list_del_init(&lo->plh_layouts);
+		if (pnfs_layout_add_bulk_destroy_list(inode, layout_list))
+			continue;
+		rcu_read_unlock();
+		spin_unlock(&clp->cl_lock);
+		iput(inode);
+		spin_lock(&clp->cl_lock);
+		rcu_read_lock();
+		return -EAGAIN;
+	}
+	return 0;
+}
+
+static int
+pnfs_layout_free_bulk_destroy_list(struct list_head *layout_list,
+		bool is_bulk_recall)
+{
+	struct pnfs_layout_hdr *lo;
+	struct inode *inode;
+	struct pnfs_layout_range range = {
+		.iomode = IOMODE_ANY,
+		.offset = 0,
+		.length = NFS4_MAX_UINT64,
+	};
+	LIST_HEAD(lseg_list);
+	int ret = 0;
+
+	while (!list_empty(layout_list)) {
+		lo = list_entry(layout_list->next, struct pnfs_layout_hdr,
+				plh_bulk_destroy);
+		dprintk("%s freeing layout for inode %lu\n", __func__,
+			lo->plh_inode->i_ino);
+		inode = lo->plh_inode;
+		spin_lock(&inode->i_lock);
+		list_del_init(&lo->plh_bulk_destroy);
+		lo->plh_block_lgets++; /* permanently block new LAYOUTGETs */
+		if (is_bulk_recall)
+			set_bit(NFS_LAYOUT_BULK_RECALL, &lo->plh_flags);
+		if (pnfs_mark_matching_lsegs_invalid(lo, &lseg_list, &range))
+			ret = -EAGAIN;
+		spin_unlock(&inode->i_lock);
+		pnfs_free_lseg_list(&lseg_list);
+		pnfs_put_layout_hdr(lo);
+		iput(inode);
+	}
+	return ret;
+}
+
+int
+pnfs_destroy_layouts_byfsid(struct nfs_client *clp,
+		struct nfs_fsid *fsid,
+		bool is_recall)
+{
+	struct nfs_server *server;
+	LIST_HEAD(layout_list);
+
+	spin_lock(&clp->cl_lock);
+	rcu_read_lock();
+restart:
+	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
+		if (memcmp(&server->fsid, fsid, sizeof(*fsid)) != 0)
+			continue;
+		if (pnfs_layout_bulk_destroy_byserver_locked(clp,
+				server,
+				&layout_list) != 0)
+			goto restart;
+	}
+	rcu_read_unlock();
+	spin_unlock(&clp->cl_lock);
+
+	if (list_empty(&layout_list))
+		return 0;
+	return pnfs_layout_free_bulk_destroy_list(&layout_list, is_recall);
+}
+
+int
+pnfs_destroy_layouts_byclid(struct nfs_client *clp,
+		bool is_recall)
+{
+	struct nfs_server *server;
+	LIST_HEAD(layout_list);
+
+	spin_lock(&clp->cl_lock);
+	rcu_read_lock();
+restart:
+	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
+		if (pnfs_layout_bulk_destroy_byserver_locked(clp,
+					server,
+					&layout_list) != 0)
+			goto restart;
+	}
+	rcu_read_unlock();
+	spin_unlock(&clp->cl_lock);
+
+	if (list_empty(&layout_list))
+		return 0;
+	return pnfs_layout_free_bulk_destroy_list(&layout_list, is_recall);
+}
+
 /*
  * Called by the state manger to remove all layouts established under an
  * expired lease.
@@ -512,30 +642,10 @@
 void
 pnfs_destroy_all_layouts(struct nfs_client *clp)
 {
-	struct nfs_server *server;
-	struct pnfs_layout_hdr *lo;
-	LIST_HEAD(tmp_list);
-
 	nfs4_deviceid_mark_client_invalid(clp);
 	nfs4_deviceid_purge_client(clp);
 
-	spin_lock(&clp->cl_lock);
-	rcu_read_lock();
-	list_for_each_entry_rcu(server, &clp->cl_superblocks, client_link) {
-		if (!list_empty(&server->layouts))
-			list_splice_init(&server->layouts, &tmp_list);
-	}
-	rcu_read_unlock();
-	spin_unlock(&clp->cl_lock);
-
-	while (!list_empty(&tmp_list)) {
-		lo = list_entry(tmp_list.next, struct pnfs_layout_hdr,
-				plh_layouts);
-		dprintk("%s freeing layout for inode %lu\n", __func__,
-			lo->plh_inode->i_ino);
-		list_del_init(&lo->plh_layouts);
-		pnfs_destroy_layout(NFS_I(lo->plh_inode));
-	}
+	pnfs_destroy_layouts_byclid(clp, false);
 }
 
 /*
@@ -888,7 +998,7 @@
 	atomic_set(&lo->plh_refcount, 1);
 	INIT_LIST_HEAD(&lo->plh_layouts);
 	INIT_LIST_HEAD(&lo->plh_segs);
-	INIT_LIST_HEAD(&lo->plh_bulk_recall);
+	INIT_LIST_HEAD(&lo->plh_bulk_destroy);
 	lo->plh_inode = ino;
 	lo->plh_lc_cred = get_rpccred(ctx->state->owner->so_cred);
 	return lo;
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index dbf7bba..97cb358 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -132,7 +132,7 @@
 struct pnfs_layout_hdr {
 	atomic_t		plh_refcount;
 	struct list_head	plh_layouts;   /* other client layouts */
-	struct list_head	plh_bulk_recall; /* clnt list of bulk recalls */
+	struct list_head	plh_bulk_destroy;
 	struct list_head	plh_segs;      /* layout segments list */
 	nfs4_stateid		plh_stateid;
 	atomic_t		plh_outstanding; /* number of RPCs out */
@@ -196,6 +196,11 @@
 void pnfs_free_lseg_list(struct list_head *tmp_list);
 void pnfs_destroy_layout(struct nfs_inode *);
 void pnfs_destroy_all_layouts(struct nfs_client *);
+int pnfs_destroy_layouts_byfsid(struct nfs_client *clp,
+		struct nfs_fsid *fsid,
+		bool is_recall);
+int pnfs_destroy_layouts_byclid(struct nfs_client *clp,
+		bool is_recall);
 void pnfs_put_layout_hdr(struct pnfs_layout_hdr *lo);
 void pnfs_set_layout_stateid(struct pnfs_layout_hdr *lo,
 			     const nfs4_stateid *new,