afs: Refcount the afs_call struct

A static checker warning occurs in the AFS filesystem:

	fs/afs/cmservice.c:155 SRXAFSCB_CallBack()
	error: dereferencing freed memory 'call'

due to the reply being sent before we access the server it points to.  The
act of sending the reply causes the call to be freed if an error occurs
(but not if it doesn't).

On top of this, the lifetime handling of afs_call structs is fragile
because they get passed around through workqueues without any sort of
refcounting.

Deal with the issues by:

 (1) Fix the maybe/maybe not nature of the reply sending functions with
     regards to whether they release the call struct.

 (2) Refcount the afs_call struct and sort out places that need to get/put
     references.

 (3) Pass a ref through the work queue and release (or pass on) that ref in
     the work function.  Care has to be taken because a work queue may
     already own a ref to the call.

 (4) Do the cleaning up in the put function only.

 (5) Simplify module cleanup by always incrementing afs_outstanding_calls
     whenever a call is allocated.

 (6) Set the backlog to 0 with kernel_listen() at the beginning of the
     process of closing the socket to prevent new incoming calls from
     occurring and to remove the contribution of preallocated calls from
     afs_outstanding_calls before we wait on it.

A tracepoint is also added to monitor the afs_call refcount and lifetime.

Reported-by: Dan Carpenter <dan.carpenter@oracle.com>
Signed-off-by: David Howells <dhowells@redhat.com>
Fixes: 08e0e7c82eea: "[AF_RXRPC]: Make the in-kernel AFS filesystem use AF_RXRPC."
diff --git a/fs/afs/rxrpc.c b/fs/afs/rxrpc.c
index ec1e41f..95f4287 100644
--- a/fs/afs/rxrpc.c
+++ b/fs/afs/rxrpc.c
@@ -19,9 +19,8 @@
 struct socket *afs_socket; /* my RxRPC socket */
 static struct workqueue_struct *afs_async_calls;
 static struct afs_call *afs_spare_incoming_call;
-static atomic_t afs_outstanding_calls;
+atomic_t afs_outstanding_calls;
 
-static void afs_free_call(struct afs_call *);
 static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
 static int afs_wait_for_call_to_complete(struct afs_call *);
 static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
@@ -112,9 +111,11 @@
 {
 	_enter("");
 
+	kernel_listen(afs_socket, 0);
+	flush_workqueue(afs_async_calls);
+
 	if (afs_spare_incoming_call) {
-		atomic_inc(&afs_outstanding_calls);
-		afs_free_call(afs_spare_incoming_call);
+		afs_put_call(afs_spare_incoming_call);
 		afs_spare_incoming_call = NULL;
 	}
 
@@ -123,7 +124,6 @@
 			 TASK_UNINTERRUPTIBLE);
 	_debug("no outstanding calls");
 
-	flush_workqueue(afs_async_calls);
 	kernel_sock_shutdown(afs_socket, SHUT_RDWR);
 	flush_workqueue(afs_async_calls);
 	sock_release(afs_socket);
@@ -134,44 +134,79 @@
 }
 
 /*
- * free a call
+ * Allocate a call.
  */
-static void afs_free_call(struct afs_call *call)
+static struct afs_call *afs_alloc_call(const struct afs_call_type *type,
+				       gfp_t gfp)
 {
-	_debug("DONE %p{%s} [%d]",
-	       call, call->type->name, atomic_read(&afs_outstanding_calls));
+	struct afs_call *call;
+	int o;
 
-	ASSERTCMP(call->rxcall, ==, NULL);
-	ASSERT(!work_pending(&call->async_work));
-	ASSERT(call->type->name != NULL);
+	call = kzalloc(sizeof(*call), gfp);
+	if (!call)
+		return NULL;
 
-	kfree(call->request);
-	kfree(call);
+	call->type = type;
+	atomic_set(&call->usage, 1);
+	INIT_WORK(&call->async_work, afs_process_async_call);
+	init_waitqueue_head(&call->waitq);
 
-	if (atomic_dec_and_test(&afs_outstanding_calls))
-		wake_up_atomic_t(&afs_outstanding_calls);
+	o = atomic_inc_return(&afs_outstanding_calls);
+	trace_afs_call(call, afs_call_trace_alloc, 1, o,
+		       __builtin_return_address(0));
+	return call;
 }
 
 /*
- * End a call but do not free it
+ * Dispose of a reference on a call.
  */
-static void afs_end_call_nofree(struct afs_call *call)
+void afs_put_call(struct afs_call *call)
 {
-	if (call->rxcall) {
-		rxrpc_kernel_end_call(afs_socket, call->rxcall);
-		call->rxcall = NULL;
+	int n = atomic_dec_return(&call->usage);
+	int o = atomic_read(&afs_outstanding_calls);
+
+	trace_afs_call(call, afs_call_trace_put, n + 1, o,
+		       __builtin_return_address(0));
+
+	ASSERTCMP(n, >=, 0);
+	if (n == 0) {
+		ASSERT(!work_pending(&call->async_work));
+		ASSERT(call->type->name != NULL);
+
+		if (call->rxcall) {
+			rxrpc_kernel_end_call(afs_socket, call->rxcall);
+			call->rxcall = NULL;
+		}
+		if (call->type->destructor)
+			call->type->destructor(call);
+
+		kfree(call->request);
+		kfree(call);
+
+		o = atomic_dec_return(&afs_outstanding_calls);
+		trace_afs_call(call, afs_call_trace_free, 0, o,
+			       __builtin_return_address(0));
+		if (o == 0)
+			wake_up_atomic_t(&afs_outstanding_calls);
 	}
-	if (call->type->destructor)
-		call->type->destructor(call);
 }
 
 /*
- * End a call and free it
+ * Queue the call for actual work.  Returns 0 unconditionally for convenience.
  */
-static void afs_end_call(struct afs_call *call)
+int afs_queue_call_work(struct afs_call *call)
 {
-	afs_end_call_nofree(call);
-	afs_free_call(call);
+	int u = atomic_inc_return(&call->usage);
+
+	trace_afs_call(call, afs_call_trace_work, u,
+		       atomic_read(&afs_outstanding_calls),
+		       __builtin_return_address(0));
+
+	INIT_WORK(&call->work, call->type->work);
+
+	if (!queue_work(afs_wq, &call->work))
+		afs_put_call(call);
+	return 0;
 }
 
 /*
@@ -182,25 +217,19 @@
 {
 	struct afs_call *call;
 
-	call = kzalloc(sizeof(*call), GFP_NOFS);
+	call = afs_alloc_call(type, GFP_NOFS);
 	if (!call)
 		goto nomem_call;
 
-	_debug("CALL %p{%s} [%d]",
-	       call, type->name, atomic_read(&afs_outstanding_calls));
-	atomic_inc(&afs_outstanding_calls);
-
-	call->type = type;
-	call->request_size = request_size;
-	call->reply_max = reply_max;
-
 	if (request_size) {
+		call->request_size = request_size;
 		call->request = kmalloc(request_size, GFP_NOFS);
 		if (!call->request)
 			goto nomem_free;
 	}
 
 	if (reply_max) {
+		call->reply_max = reply_max;
 		call->buffer = kmalloc(reply_max, GFP_NOFS);
 		if (!call->buffer)
 			goto nomem_free;
@@ -210,7 +239,7 @@
 	return call;
 
 nomem_free:
-	afs_free_call(call);
+	afs_put_call(call);
 nomem_call:
 	return NULL;
 }
@@ -315,7 +344,6 @@
 	       atomic_read(&afs_outstanding_calls));
 
 	call->async = async;
-	INIT_WORK(&call->async_work, afs_process_async_call);
 
 	memset(&srx, 0, sizeof(srx));
 	srx.srx_family = AF_RXRPC;
@@ -378,7 +406,7 @@
 error_do_abort:
 	rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT, -ret, "KSD");
 error_kill_call:
-	afs_end_call(call);
+	afs_put_call(call);
 	_leave(" = %d", ret);
 	return ret;
 }
@@ -448,7 +476,7 @@
 
 done:
 	if (call->state == AFS_CALL_COMPLETE && call->incoming)
-		afs_end_call(call);
+		afs_put_call(call);
 out:
 	_leave("");
 	return;
@@ -505,7 +533,7 @@
 	}
 
 	_debug("call complete");
-	afs_end_call(call);
+	afs_put_call(call);
 	_leave(" = %d", ret);
 	return ret;
 }
@@ -529,14 +557,25 @@
 				   unsigned long call_user_ID)
 {
 	struct afs_call *call = (struct afs_call *)call_user_ID;
+	int u;
 
 	trace_afs_notify_call(rxcall, call);
 	call->need_attention = true;
-	queue_work(afs_async_calls, &call->async_work);
+
+	u = __atomic_add_unless(&call->usage, 1, 0);
+	if (u != 0) {
+		trace_afs_call(call, afs_call_trace_wake, u,
+			       atomic_read(&afs_outstanding_calls),
+			       __builtin_return_address(0));
+
+		if (!queue_work(afs_async_calls, &call->async_work))
+			afs_put_call(call);
+	}
 }
 
 /*
- * delete an asynchronous call
+ * Delete an asynchronous call.  The work item carries a ref to the call struct
+ * that we need to release.
  */
 static void afs_delete_async_call(struct work_struct *work)
 {
@@ -544,13 +583,14 @@
 
 	_enter("");
 
-	afs_free_call(call);
+	afs_put_call(call);
 
 	_leave("");
 }
 
 /*
- * perform processing on an asynchronous call
+ * Perform I/O processing on an asynchronous call.  The work item carries a ref
+ * to the call struct that we either need to release or to pass on.
  */
 static void afs_process_async_call(struct work_struct *work)
 {
@@ -566,15 +606,16 @@
 	if (call->state == AFS_CALL_COMPLETE) {
 		call->reply = NULL;
 
-		/* kill the call */
-		afs_end_call_nofree(call);
-
-		/* we can't just delete the call because the work item may be
-		 * queued */
+		/* We have two refs to release - one from the alloc and one
+		 * queued with the work item - and we can't just deallocate the
+		 * call because the work item may be queued again.
+		 */
 		call->async_work.func = afs_delete_async_call;
-		queue_work(afs_async_calls, &call->async_work);
+		if (!queue_work(afs_async_calls, &call->async_work))
+			afs_put_call(call);
 	}
 
+	afs_put_call(call);
 	_leave("");
 }
 
@@ -594,12 +635,10 @@
 
 	for (;;) {
 		if (!call) {
-			call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
+			call = afs_alloc_call(&afs_RXCMxxxx, GFP_KERNEL);
 			if (!call)
 				break;
 
-			INIT_WORK(&call->async_work, afs_process_async_call);
-			call->type = &afs_RXCMxxxx;
 			call->async = true;
 			call->state = AFS_CALL_AWAIT_OP_ID;
 			init_waitqueue_head(&call->waitq);
@@ -624,9 +663,8 @@
 {
 	struct afs_call *call = (struct afs_call *)user_call_ID;
 
-	atomic_inc(&afs_outstanding_calls);
 	call->rxcall = NULL;
-	afs_free_call(call);
+	afs_put_call(call);
 }
 
 /*
@@ -635,7 +673,6 @@
 static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
 			    unsigned long user_call_ID)
 {
-	atomic_inc(&afs_outstanding_calls);
 	queue_work(afs_wq, &afs_charge_preallocation_work);
 }
 
@@ -699,7 +736,6 @@
 		rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 					RX_USER_ABORT, ENOMEM, "KOO");
 	default:
-		afs_end_call(call);
 		_leave(" [error]");
 		return;
 	}
@@ -738,7 +774,6 @@
 		rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 					RX_USER_ABORT, ENOMEM, "KOO");
 	}
-	afs_end_call(call);
 	_leave(" [error]");
 }