NFSD: introduce async copy feature

Upon receiving a request for an async copy, create a new kthread.  Before
starting the copy, make sure to copy the needed arguments/state off the
stack, since the request's stack frame will not outlive the reply.  Then
start the thread and reply to the client, indicating that the copy is
asynchronous.
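
As a rough user-space illustration of the "copy the arguments off the
stack" step (not the code in this patch; struct copy_args and
dup_copy_args() are hypothetical names, and malloc() stands in for a
kernel allocator), the worker is handed a heap copy it owns rather than
a pointer into the caller's stack frame:

```c
#include <stdlib.h>
#include <string.h>

/* Hypothetical subset of the state an async copy worker would need. */
struct copy_args {
	unsigned long long src_pos;
	unsigned long long dst_pos;
	unsigned long long count;
	int synchronous;
};

/*
 * Duplicate the caller's on-stack arguments into heap memory so that
 * they stay valid after the caller has replied and returned.  The
 * worker owns the returned copy and must free() it when done.
 */
static struct copy_args *dup_copy_args(const struct copy_args *on_stack)
{
	struct copy_args *heap = malloc(sizeof(*heap));

	if (!heap)
		return NULL;
	memcpy(heap, on_stack, sizeof(*heap));
	return heap;
}
```

The returned pointer is what would be passed to the thread; later
changes to the on-stack original no longer affect the worker.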

nfsd_copy_file_range() is called in a loop until the total number of
bytes requested has been copied. If a failure happens in the middle, we
ignore the error and return how much we copied so far. Once done, we
create a work item for the callback workqueue and send CB_OFFLOAD with
the results.
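
The loop-and-report-partial-progress behaviour can be sketched in user
space as follows. This is only an illustration under stated assumptions:
copy_in_loop(), chunk_copy_fn, struct copy_result and fake_copy() are
hypothetical names, and the callback stands in for whatever actually
moves the bytes.

```c
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical backend: copy up to len bytes at offset; <0 is an error. */
typedef ssize_t (*chunk_copy_fn)(void *ctx, size_t offset, size_t len);

struct copy_result {
	size_t bytes_copied;	/* progress made before stopping */
	int status;		/* negative only if nothing was copied */
};

static struct copy_result copy_in_loop(chunk_copy_fn do_copy, void *ctx,
				       size_t total, size_t max_chunk)
{
	struct copy_result res = { 0, 0 };
	size_t left = total;

	while (left > 0) {
		size_t want = left < max_chunk ? left : max_chunk;
		ssize_t n = do_copy(ctx, res.bytes_copied, want);

		if (n <= 0) {
			/* Report the error only when no progress was made. */
			if (n < 0 && res.bytes_copied == 0)
				res.status = (int)n;
			break;
		}
		if ((size_t)n > left)	/* guard against an over-reporting backend */
			n = (ssize_t)left;
		res.bytes_copied += (size_t)n;
		left -= (size_t)n;
	}
	return res;
}

/* Demo backend: succeeds until *ctx bytes are consumed, then fails with -5. */
static ssize_t fake_copy(void *ctx, size_t offset, size_t len)
{
	size_t limit = *(size_t *)ctx;

	if (offset >= limit)
		return -5;
	return (ssize_t)(offset + len > limit ? limit - offset : len);
}
```

With fake_copy() limited to 10 bytes, a 25-byte request in 4-byte chunks
stops with bytes_copied == 10 and status == 0; the mid-copy error is
dropped and only the partial count is reported.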

The lifetime of the copy stateid is bound to the vfs copy. This way we
don't need to keep the nfsd_net structure for the callback.  We could
keep it around longer so that an OFFLOAD_STATUS that came late would
still get results, but clients should be able to deal without that.

We handle OFFLOAD_CANCEL by sending a signal to the copy thread and
calling kthread_stop.

A client should cancel any ongoing copies before calling
DESTROY_CLIENTID; if not, we return an NFS4ERR_CLIENTID_BUSY error.

If the client is destroyed for some other reason (lease expiration, or
server shutdown), we must clean up any ongoing copies ourselves.

Signed-off-by: Olga Kornievskaia <kolga@netapp.com>
[colin.king@canonical.com: fix leak in error case]
[bfields@fieldses.org: remove signalling, merge patches]
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
diff --git a/fs/nfsd/nfs4xdr.c b/fs/nfsd/nfs4xdr.c
index b78280a..3de42a7 100644
--- a/fs/nfsd/nfs4xdr.c
+++ b/fs/nfsd/nfs4xdr.c
@@ -4231,15 +4231,27 @@ nfsd4_encode_layoutreturn(struct nfsd4_compoundres *resp, __be32 nfserr,
 #endif /* CONFIG_NFSD_PNFS */
 
 static __be32
-nfsd42_encode_write_res(struct nfsd4_compoundres *resp, struct nfsd42_write_res *write)
+nfsd42_encode_write_res(struct nfsd4_compoundres *resp,
+		struct nfsd42_write_res *write, bool sync)
 {
 	__be32 *p;
-
-	p = xdr_reserve_space(&resp->xdr, 4 + 8 + 4 + NFS4_VERIFIER_SIZE);
+	p = xdr_reserve_space(&resp->xdr, 4);
 	if (!p)
 		return nfserr_resource;
 
-	*p++ = cpu_to_be32(0);
+	if (sync)
+		*p++ = cpu_to_be32(0);
+	else {
+		__be32 nfserr;
+		*p++ = cpu_to_be32(1);
+		nfserr = nfsd4_encode_stateid(&resp->xdr, &write->cb_stateid);
+		if (nfserr)
+			return nfserr;
+	}
+	p = xdr_reserve_space(&resp->xdr, 8 + 4 + NFS4_VERIFIER_SIZE);
+	if (!p)
+		return nfserr_resource;
+
 	p = xdr_encode_hyper(p, write->wr_bytes_written);
 	*p++ = cpu_to_be32(write->wr_stable_how);
 	p = xdr_encode_opaque_fixed(p, write->wr_verifier.data,
@@ -4253,7 +4265,8 @@ nfsd4_encode_copy(struct nfsd4_compoundres *resp, __be32 nfserr,
 {
 	__be32 *p;
 
-	nfserr = nfsd42_encode_write_res(resp, &copy->cp_res);
+	nfserr = nfsd42_encode_write_res(resp, &copy->cp_res,
+			copy->cp_synchronous);
 	if (nfserr)
 		return nfserr;