NFSD: introduce async copy feature

Upon receiving a request for an async copy, create a new kthread.  If we
get an asynchronous request, make sure to copy the needed arguments/state
off the stack before starting the copy. Then start the thread and reply
to the client indicating the copy is asynchronous.
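
The nfs4proc.c side of this change isn't in the hunks below. As a rough
sketch (the helper names dup_copy_fields() and nfsd4_do_async_copy() are
illustrative, not guaranteed to match the final patch), the async branch
of the COPY handler looks something like:

    if (!copy->cp_synchronous) {
        struct nfsd4_copy *async_copy;

        async_copy = kzalloc(sizeof(*async_copy), GFP_KERNEL);
        if (!async_copy)
            goto out_err;
        if (!nfs4_init_cp_state(nn, async_copy))
            goto out_err;
        /* the COPY args live on the rqstp stack: take our own copy */
        dup_copy_fields(copy, async_copy);
        async_copy->copy_task = kthread_create(nfsd4_do_async_copy,
                        async_copy, "%s", "copy thread");
        if (IS_ERR(async_copy->copy_task))
            goto out_err;
        spin_lock(&clp->async_lock);
        list_add(&async_copy->copies, &clp->async_copies);
        spin_unlock(&clp->async_lock);
        wake_up_process(async_copy->copy_task);
        /* reply to the client now; the result travels in CB_OFFLOAD */
    }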

nfsd_copy_file_range() will copy in a loop over the total number of
bytes that need to be copied. In case a failure happens in the middle,
we ignore the error and return how much we copied so far. Once done, we
create a work item for the callback workqueue and send CB_OFFLOAD with
the results.
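
In sketch form (the nfsd4_copy field names here, cp_count,
cp_res.wr_bytes_written, and file_src/file_dst, are assumptions about
the struct layout), the thread's loop is:

    u64 src_pos = copy->cp_src_pos;
    u64 dst_pos = copy->cp_dst_pos;
    size_t bytes_total = copy->cp_count;
    ssize_t bytes_copied;

    do {
        if (kthread_should_stop())
            break;      /* OFFLOAD_CANCEL or client teardown */
        bytes_copied = nfsd_copy_file_range(copy->file_src, src_pos,
                        copy->file_dst, dst_pos, bytes_total);
        if (bytes_copied <= 0)
            break;      /* ignore the error, report progress */
        bytes_total -= bytes_copied;
        copy->cp_res.wr_bytes_written += bytes_copied;
        src_pos += bytes_copied;
        dst_pos += bytes_copied;
    } while (bytes_total > 0);
    /* then fill in the offload results and run the CB_OFFLOAD
     * callback on the client's callback workqueue */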

The lifetime of the copy stateid is bound to the vfs copy. This way we
don't need to keep the nfsd_net structure around for the callback.  We
could keep it around longer so that an OFFLOAD_STATUS that arrives late
would still get results, but clients should be able to cope without that.
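
Concretely, the copy thread tears the stateid down itself once the vfs
copy returns, before sending CB_OFFLOAD. A minimal sketch, assuming a
cleanup helper of roughly this shape:

    static void cleanup_async_copy(struct nfsd4_copy *copy)
    {
        /* the copy stateid dies with the vfs copy */
        nfs4_free_cp_state(copy);
        fput(copy->file_dst);
        fput(copy->file_src);
        spin_lock(&copy->cp_clp->async_lock);
        list_del(&copy->copies);
        spin_unlock(&copy->cp_clp->async_lock);
        /* a real version needs a refcount against a concurrent
         * OFFLOAD_CANCEL before freeing */
        kfree(copy);
    }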

We handle OFFLOAD_CANCEL by calling kthread_stop() on the copy thread;
the copy loop checks kthread_should_stop() between chunks.
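
In sketch form the handler resolves the stateid to a running copy and
stops the thread. find_async_copy() here is an illustrative lookup over
clp->async_copies; a real version must also take a reference before
dropping the lock:

    static struct nfsd4_copy *
    find_async_copy(struct nfs4_client *clp, stateid_t *stateid)
    {
        struct nfsd4_copy *copy;

        spin_lock(&clp->async_lock);
        list_for_each_entry(copy, &clp->async_copies, copies) {
            if (memcmp(&copy->cp_stateid, stateid, sizeof(*stateid)))
                continue;
            spin_unlock(&clp->async_lock);
            return copy;
        }
        spin_unlock(&clp->async_lock);
        return NULL;
    }

    /* OFFLOAD_CANCEL: */
    copy = find_async_copy(clp, &os->stateid);
    if (!copy)
        return nfserr_bad_stateid;
    kthread_stop(copy->copy_task);  /* waits for the thread to exit */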

A client should cancel any ongoing copies before calling
DESTROY_CLIENTID; if it doesn't, we return a CLIENTID_BUSY error.
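
The client_has_state() hunk below is what produces that error: the
existing DESTROY_CLIENTID path already does, roughly,

    if (client_has_state(clp)) {
        status = nfserr_clientid_busy;
        goto out;
    }

so adding clp->async_copies to client_has_state() is enough.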

If the client is destroyed for some other reason (lease expiration or
server shutdown), we must clean up any ongoing copies ourselves.
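
That is what the nfsd4_shutdown_copy() call added to __destroy_client()
below is for. A minimal sketch (again, a real version must hold a
reference on the copy across the unlock):

    void nfsd4_shutdown_copy(struct nfs4_client *clp)
    {
        struct nfsd4_copy *copy;

        spin_lock(&clp->async_lock);
        while (!list_empty(&clp->async_copies)) {
            copy = list_first_entry(&clp->async_copies,
                            struct nfsd4_copy, copies);
            spin_unlock(&clp->async_lock);
            /* kthread_stop() waits; the thread's cleanup
             * unlinks the copy from clp->async_copies */
            kthread_stop(copy->copy_task);
            spin_lock(&clp->async_lock);
        }
        spin_unlock(&clp->async_lock);
    }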

Signed-off-by: Olga Kornievskaia <kolga@netapp.com>
[colin.king@canonical.com: fix leak in error case]
[bfields@fieldses.org: remove signalling, merge patches]
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index b0ca0ef..07a57d0 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -713,6 +713,36 @@ struct nfs4_stid *nfs4_alloc_stid(struct nfs4_client *cl, struct kmem_cache *sla
 	return NULL;
 }
 
+/*
+ * Create a unique stateid_t to represent each COPY.
+ */
+int nfs4_init_cp_state(struct nfsd_net *nn, struct nfsd4_copy *copy)
+{
+	int new_id;
+
+	idr_preload(GFP_KERNEL);
+	spin_lock(&nn->s2s_cp_lock);
+	new_id = idr_alloc_cyclic(&nn->s2s_cp_stateids, copy, 0, 0, GFP_NOWAIT);
+	spin_unlock(&nn->s2s_cp_lock);
+	idr_preload_end();
+	if (new_id < 0)
+		return 0;
+	copy->cp_stateid.si_opaque.so_id = new_id;
+	copy->cp_stateid.si_opaque.so_clid.cl_boot = nn->boot_time;
+	copy->cp_stateid.si_opaque.so_clid.cl_id = nn->s2s_cp_cl_id;
+	return 1;
+}
+
+void nfs4_free_cp_state(struct nfsd4_copy *copy)
+{
+	struct nfsd_net *nn;
+
+	nn = net_generic(copy->cp_clp->net, nfsd_net_id);
+	spin_lock(&nn->s2s_cp_lock);
+	idr_remove(&nn->s2s_cp_stateids, copy->cp_stateid.si_opaque.so_id);
+	spin_unlock(&nn->s2s_cp_lock);
+}
+
 static struct nfs4_ol_stateid * nfs4_alloc_open_stateid(struct nfs4_client *clp)
 {
 	struct nfs4_stid *stid;
@@ -1827,6 +1857,8 @@ static struct nfs4_client *alloc_client(struct xdr_netobj name)
 #ifdef CONFIG_NFSD_PNFS
 	INIT_LIST_HEAD(&clp->cl_lo_states);
 #endif
+	INIT_LIST_HEAD(&clp->async_copies);
+	spin_lock_init(&clp->async_lock);
 	spin_lock_init(&clp->cl_lock);
 	rpc_init_wait_queue(&clp->cl_cb_waitq, "Backchannel slot table");
 	return clp;
@@ -1942,6 +1974,7 @@ __destroy_client(struct nfs4_client *clp)
 		}
 	}
 	nfsd4_return_all_client_layouts(clp);
+	nfsd4_shutdown_copy(clp);
 	nfsd4_shutdown_callback(clp);
 	if (clp->cl_cb_conn.cb_xprt)
 		svc_xprt_put(clp->cl_cb_conn.cb_xprt);
@@ -2475,7 +2508,8 @@ static bool client_has_state(struct nfs4_client *clp)
 		|| !list_empty(&clp->cl_lo_states)
 #endif
 		|| !list_empty(&clp->cl_delegations)
-		|| !list_empty(&clp->cl_sessions);
+		|| !list_empty(&clp->cl_sessions)
+		|| !list_empty(&clp->async_copies);
 }
 
 __be32
@@ -7161,6 +7195,8 @@ static int nfs4_state_create_net(struct net *net)
 	INIT_LIST_HEAD(&nn->close_lru);
 	INIT_LIST_HEAD(&nn->del_recall_lru);
 	spin_lock_init(&nn->client_lock);
+	spin_lock_init(&nn->s2s_cp_lock);
+	idr_init(&nn->s2s_cp_stateids);
 
 	spin_lock_init(&nn->blocked_locks_lock);
 	INIT_LIST_HEAD(&nn->blocked_locks_lru);