rxrpc: Don't expose skbs to in-kernel users [ver #2]

Don't expose skbs to in-kernel users, such as the AFS filesystem, but
instead provide a notification hook the indicates that a call needs
attention and another that indicates that there's a new call to be
collected.

This makes the following possibilities more achievable:

 (1) Call refcounting can be made simpler if skbs don't hold refs to calls.

 (2) skbs referring to non-data events will be able to be freed much sooner
     rather than being queued for AFS to pick up as rxrpc_kernel_recv_data
     will be able to consult the call state.

 (3) We can shortcut the receive phase when a call is remotely aborted
     because we don't have to go through all the packets to get to the one
     cancelling the operation.

 (4) It makes it easier to do encryption/decryption directly between AFS's
     buffers and sk_buffs.

 (5) Encryption/decryption can more easily be done in the AFS's thread
     contexts - usually that of the userspace process that issued a syscall
     - rather than in one of rxrpc's background threads on a workqueue.

 (6) AFS will be able to wait synchronously on a call inside AF_RXRPC.

To make this work, the following interface function has been added:

     int rxrpc_kernel_recv_data(
		struct socket *sock, struct rxrpc_call *call,
		void *buffer, size_t bufsize, size_t *_offset,
		bool want_more, u32 *_abort_code);

This is the recvmsg equivalent.  It allows the caller to find out about the
state of a specific call and to transfer received data into a buffer
piecemeal.

afs_extract_data() and rxrpc_kernel_recv_data() now do all the extraction
logic between them.  They don't wait synchronously yet because the socket
lock needs to be dealt with.

Five interface functions have been removed:

	rxrpc_kernel_is_data_last()
    	rxrpc_kernel_get_abort_code()
    	rxrpc_kernel_get_error_number()
    	rxrpc_kernel_free_skb()
    	rxrpc_kernel_data_consumed()

As a temporary hack, sk_buffs going to an in-kernel call are queued on the
rxrpc_call struct (->knlrecv_queue) rather than being handed over to the
in-kernel user.  To process the queue internally, a temporary function,
temp_deliver_data() has been added.  This will be replaced with common code
between the rxrpc_recvmsg() path and the kernel_rxrpc_recv_data() path in a
future patch.

Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index c9b38c7..0ab7b33 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -369,55 +369,178 @@
 
 }
 
-/**
- * rxrpc_kernel_is_data_last - Determine if data message is last one
- * @skb: Message holding data
+/*
+ * Deliver messages to a call.  This keeps processing packets until the buffer
+ * is filled and we find either more DATA (returns 0) or the end of the DATA
+ * (returns 1).  If more packets are required, it returns -EAGAIN.
  *
- * Determine if data message is last one for the parent call.
+ * TODO: Note that this is hacked in at the moment and will be replaced.
  */
-bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
+static int temp_deliver_data(struct socket *sock, struct rxrpc_call *call,
+			     struct iov_iter *iter, size_t size,
+			     size_t *_offset)
 {
-	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rxrpc_skb_priv *sp;
+	struct sk_buff *skb;
+	size_t remain;
+	int ret, copy;
 
-	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
+	_enter("%d", call->debug_id);
 
-	return sp->hdr.flags & RXRPC_LAST_PACKET;
-}
+next:
+	local_bh_disable();
+	skb = skb_dequeue(&call->knlrecv_queue);
+	local_bh_enable();
+	if (!skb) {
+		if (test_bit(RXRPC_CALL_RX_NO_MORE, &call->flags))
+			return 1;
+		_leave(" = -EAGAIN [empty]");
+		return -EAGAIN;
+	}
 
-EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
-
-/**
- * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
- * @skb: Message indicating an abort
- *
- * Get the abort code from an RxRPC abort message.
- */
-u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
-{
-	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	sp = rxrpc_skb(skb);
+	_debug("dequeued %p %u/%zu", skb, sp->offset, size);
 
 	switch (skb->mark) {
-	case RXRPC_SKB_MARK_REMOTE_ABORT:
-	case RXRPC_SKB_MARK_LOCAL_ABORT:
-		return sp->call->abort_code;
-	default:
-		BUG();
-	}
-}
+	case RXRPC_SKB_MARK_DATA:
+		remain = size - *_offset;
+		if (remain > 0) {
+			copy = skb->len - sp->offset;
+			if (copy > remain)
+				copy = remain;
+			ret = skb_copy_datagram_iter(skb, sp->offset, iter,
+						     copy);
+			if (ret < 0)
+				goto requeue_and_leave;
 
-EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
+			/* handle piecemeal consumption of data packets */
+			sp->offset += copy;
+			*_offset += copy;
+		}
+
+		if (sp->offset < skb->len)
+			goto partially_used_skb;
+
+		/* We consumed the whole packet */
+		ASSERTCMP(sp->offset, ==, skb->len);
+		if (sp->hdr.flags & RXRPC_LAST_PACKET)
+			set_bit(RXRPC_CALL_RX_NO_MORE, &call->flags);
+		rxrpc_kernel_data_consumed(call, skb);
+		rxrpc_free_skb(skb);
+		goto next;
+
+	default:
+		rxrpc_free_skb(skb);
+		goto next;
+	}
+
+partially_used_skb:
+	ASSERTCMP(*_offset, ==, size);
+	ret = 0;
+requeue_and_leave:
+	skb_queue_head(&call->knlrecv_queue, skb);
+	return ret;
+}
 
 /**
- * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
- * @skb: Message indicating an error
+ * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
+ * @sock: The socket that the call exists on
+ * @call: The call to send data through
+ * @buf: The buffer to receive into
+ * @size: The size of the buffer, including data already read
+ * @_offset: The running offset into the buffer.
+ * @want_more: True if more data is expected to be read
+ * @_abort: Where the abort code is stored if -ECONNABORTED is returned
  *
- * Get the error number from an RxRPC error message.
+ * Allow a kernel service to receive data and pick up information about the
+ * state of a call.  Returns 0 if got what was asked for and there's more
+ * available, 1 if we got what was asked for and we're at the end of the data
+ * and -EAGAIN if we need more data.
+ *
+ * Note that we may return -EAGAIN to drain empty packets at the end of the
+ * data, even if we've already copied over the requested data.
+ *
+ * This function adds the amount it transfers to *_offset, so this should be
+ * precleared as appropriate.  Note that the amount remaining in the buffer is
+ * taken to be size - *_offset.
+ *
+ * *_abort should also be initialised to 0.
  */
-int rxrpc_kernel_get_error_number(struct sk_buff *skb)
+int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
+			   void *buf, size_t size, size_t *_offset,
+			   bool want_more, u32 *_abort)
 {
-	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct iov_iter iter;
+	struct kvec iov;
+	int ret;
 
-	return sp->error;
+	_enter("{%d,%s},%zu,%d",
+	       call->debug_id, rxrpc_call_states[call->state], size, want_more);
+
+	ASSERTCMP(*_offset, <=, size);
+	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
+
+	iov.iov_base = buf + *_offset;
+	iov.iov_len = size - *_offset;
+	iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
+
+	lock_sock(sock->sk);
+
+	switch (call->state) {
+	case RXRPC_CALL_CLIENT_RECV_REPLY:
+	case RXRPC_CALL_SERVER_RECV_REQUEST:
+	case RXRPC_CALL_SERVER_ACK_REQUEST:
+		ret = temp_deliver_data(sock, call, &iter, size, _offset);
+		if (ret < 0)
+			goto out;
+
+		/* We can only reach here with a partially full buffer if we
+		 * have reached the end of the data.  We must otherwise have a
+		 * full buffer or have been given -EAGAIN.
+		 */
+		if (ret == 1) {
+			if (*_offset < size)
+				goto short_data;
+			if (!want_more)
+				goto read_phase_complete;
+			ret = 0;
+			goto out;
+		}
+
+		if (!want_more)
+			goto excess_data;
+		goto out;
+
+	case RXRPC_CALL_COMPLETE:
+		goto call_complete;
+
+	default:
+		*_offset = 0;
+		ret = -EINPROGRESS;
+		goto out;
+	}
+
+read_phase_complete:
+	ret = 1;
+out:
+	release_sock(sock->sk);
+	_leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
+	return ret;
+
+short_data:
+	ret = -EBADMSG;
+	goto out;
+excess_data:
+	ret = -EMSGSIZE;
+	goto out;
+call_complete:
+	*_abort = call->abort_code;
+	ret = call->error;
+	if (call->completion == RXRPC_CALL_SUCCEEDED) {
+		ret = 1;
+		if (size > 0)
+			ret = -ECONNRESET;
+	}
+	goto out;
 }
-
-EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
+EXPORT_SYMBOL(rxrpc_kernel_recv_data);