rxrpc: Don't expose skbs to in-kernel users [ver #2]

Don't expose skbs to in-kernel users, such as the AFS filesystem, but
instead provide a notification hook that indicates that a call needs
attention and another that indicates that there's a new call to be
collected.

This makes the following possibilities more achievable:

 (1) Call refcounting can be made simpler if skbs don't hold refs to calls.

 (2) skbs referring to non-data events will be able to be freed much sooner
     rather than being queued for AFS to pick up as rxrpc_kernel_recv_data
     will be able to consult the call state.

 (3) We can shortcut the receive phase when a call is remotely aborted
     because we don't have to go through all the packets to get to the one
     cancelling the operation.

 (4) It makes it easier to do encryption/decryption directly between AFS's
     buffers and sk_buffs.

 (5) Encryption/decryption can more easily be done in AFS's thread
     contexts - usually that of the userspace process that issued a syscall
     - rather than in one of rxrpc's background threads on a workqueue.

 (6) AFS will be able to wait synchronously on a call inside AF_RXRPC.

To make this work, the following interface function has been added:

     int rxrpc_kernel_recv_data(
		struct socket *sock, struct rxrpc_call *call,
		void *buf, size_t size, size_t *_offset,
		bool want_more, u32 *_abort_code);

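This is the recvmsg equivalent.  It allows the caller to find out about the
state of a specific call and to transfer received data into a buffer
piecemeal.

In addition, rxrpc_kernel_intercept_rx_messages() is replaced by
rxrpc_kernel_new_call_notification(), and rxrpc_kernel_begin_call() and
rxrpc_kernel_accept_call() gain a notify_rx argument through which the
kernel service is told that a call needs attention.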

afs_extract_data() and rxrpc_kernel_recv_data() now do all the extraction
logic between them.  They don't wait synchronously yet because the socket
lock needs to be dealt with.

Five interface functions have been removed:

	rxrpc_kernel_is_data_last()
	rxrpc_kernel_get_abort_code()
	rxrpc_kernel_get_error_number()
	rxrpc_kernel_free_skb()
	rxrpc_kernel_data_consumed()

As a temporary hack, sk_buffs going to an in-kernel call are queued on the
rxrpc_call struct (->knlrecv_queue) rather than being handed over to the
in-kernel user.  To process the queue internally, a temporary function,
temp_deliver_data() has been added.  This will be replaced with common code
between the rxrpc_recvmsg() path and the rxrpc_kernel_recv_data() path in a
future patch.

Signed-off-by: David Howells <dhowells@redhat.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index e07c91a..32d5449 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -231,6 +231,8 @@
  * @srx: The address of the peer to contact
  * @key: The security context to use (defaults to socket setting)
  * @user_call_ID: The ID to use
+ * @gfp: The allocation constraints
+ * @notify_rx: Where to send notifications instead of socket queue
  *
  * Allow a kernel service to begin a call on the nominated socket.  This just
  * sets up all the internal tracking structures and allocates connection and
@@ -243,7 +245,8 @@
 					   struct sockaddr_rxrpc *srx,
 					   struct key *key,
 					   unsigned long user_call_ID,
-					   gfp_t gfp)
+					   gfp_t gfp,
+					   rxrpc_notify_rx_t notify_rx)
 {
 	struct rxrpc_conn_parameters cp;
 	struct rxrpc_call *call;
@@ -270,6 +273,8 @@
 	cp.exclusive		= false;
 	cp.service_id		= srx->srx_service;
 	call = rxrpc_new_client_call(rx, &cp, srx, user_call_ID, gfp);
+	if (!IS_ERR(call))
+		call->notify_rx = notify_rx;
 
 	release_sock(&rx->sk);
 	_leave(" = %p", call);
@@ -289,31 +294,27 @@
 {
 	_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
 	rxrpc_remove_user_ID(rxrpc_sk(sock->sk), call);
+	rxrpc_purge_queue(&call->knlrecv_queue);
 	rxrpc_put_call(call);
 }
 EXPORT_SYMBOL(rxrpc_kernel_end_call);
 
 /**
- * rxrpc_kernel_intercept_rx_messages - Intercept received RxRPC messages
+ * rxrpc_kernel_new_call_notification - Get notifications of new calls
- * @sock: The socket to intercept received messages on
+ * @sock: The socket to receive new call notifications for
- * @interceptor: The function to pass the messages to
+ * @notify_new_call: Function to be called when new calls appear
  *
- * Allow a kernel service to intercept messages heading for the Rx queue on an
- * RxRPC socket.  They get passed to the specified function instead.
- * @interceptor should free the socket buffers it is given.  @interceptor is
- * called with the socket receive queue spinlock held and softirqs disabled -
- * this ensures that the messages will be delivered in the right order.
+ * Allow a kernel service to be given notifications about new calls.
  */
-void rxrpc_kernel_intercept_rx_messages(struct socket *sock,
-					rxrpc_interceptor_t interceptor)
+void rxrpc_kernel_new_call_notification(
+	struct socket *sock,
+	rxrpc_notify_new_call_t notify_new_call)
 {
 	struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
 
-	_enter("");
-	rx->interceptor = interceptor;
+	rx->notify_new_call = notify_new_call;
 }
-
-EXPORT_SYMBOL(rxrpc_kernel_intercept_rx_messages);
+EXPORT_SYMBOL(rxrpc_kernel_new_call_notification);
 
 /*
  * connect an RxRPC socket
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 0c320b2..4e86d24 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -40,6 +40,20 @@
 struct rxrpc_connection;
 
 /*
+ * Mark applied to socket buffers.
+ */
+enum rxrpc_skb_mark {
+	RXRPC_SKB_MARK_DATA,		/* data message */
+	RXRPC_SKB_MARK_FINAL_ACK,	/* final ACK received message */
+	RXRPC_SKB_MARK_BUSY,		/* server busy message */
+	RXRPC_SKB_MARK_REMOTE_ABORT,	/* remote abort message */
+	RXRPC_SKB_MARK_LOCAL_ABORT,	/* local abort message */
+	RXRPC_SKB_MARK_NET_ERROR,	/* network error message */
+	RXRPC_SKB_MARK_LOCAL_ERROR,	/* local error message */
+	RXRPC_SKB_MARK_NEW_CALL,	/* new call notification message */
+};
+
+/*
  * sk_state for RxRPC sockets
  */
 enum {
@@ -57,7 +71,7 @@
 struct rxrpc_sock {
 	/* WARNING: sk has to be the first member */
 	struct sock		sk;
-	rxrpc_interceptor_t	interceptor;	/* kernel service Rx interceptor function */
+	rxrpc_notify_new_call_t	notify_new_call; /* Func to notify of new call */
 	struct rxrpc_local	*local;		/* local endpoint */
 	struct list_head	listen_link;	/* link in the local endpoint's listen list */
 	struct list_head	secureq;	/* calls awaiting connection security clearance */
@@ -367,6 +381,7 @@
 	RXRPC_CALL_EXPECT_OOS,		/* expect out of sequence packets */
 	RXRPC_CALL_IS_SERVICE,		/* Call is service call */
 	RXRPC_CALL_EXPOSED,		/* The call was exposed to the world */
+	RXRPC_CALL_RX_NO_MORE,		/* Don't indicate MSG_MORE from recvmsg() */
 };
 
 /*
@@ -441,6 +456,7 @@
 	struct timer_list	resend_timer;	/* Tx resend timer */
 	struct work_struct	destroyer;	/* call destroyer */
 	struct work_struct	processor;	/* packet processor and ACK generator */
+	rxrpc_notify_rx_t	notify_rx;	/* kernel service Rx notification function */
 	struct list_head	link;		/* link in master call list */
 	struct list_head	chan_wait_link;	/* Link in conn->waiting_calls */
 	struct hlist_node	error_link;	/* link in error distribution list */
@@ -448,6 +464,7 @@
 	struct rb_node		sock_node;	/* node in socket call tree */
 	struct sk_buff_head	rx_queue;	/* received packets */
 	struct sk_buff_head	rx_oos_queue;	/* packets received out of sequence */
+	struct sk_buff_head	knlrecv_queue;	/* Queue for kernel_recv [TODO: replace this] */
 	struct sk_buff		*tx_pending;	/* Tx socket buffer being filled */
 	wait_queue_head_t	waitq;		/* Wait queue for channel or Tx */
 	__be32			crypto_buf[2];	/* Temporary packet crypto buffer */
@@ -512,7 +529,8 @@
  * call_accept.c
  */
 void rxrpc_accept_incoming_calls(struct rxrpc_local *);
-struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long);
+struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *, unsigned long,
+				     rxrpc_notify_rx_t);
 int rxrpc_reject_call(struct rxrpc_sock *);
 
 /*
@@ -874,6 +892,7 @@
 /*
  * skbuff.c
  */
+void rxrpc_kernel_data_consumed(struct rxrpc_call *, struct sk_buff *);
 void rxrpc_packet_destructor(struct sk_buff *);
 void rxrpc_new_skb(struct sk_buff *);
 void rxrpc_see_skb(struct sk_buff *);
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index 03af88f..68a439e 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -286,7 +286,8 @@
  * - assign the user call ID to the call at the front of the queue
  */
 struct rxrpc_call *rxrpc_accept_call(struct rxrpc_sock *rx,
-				     unsigned long user_call_ID)
+				     unsigned long user_call_ID,
+				     rxrpc_notify_rx_t notify_rx)
 {
 	struct rxrpc_call *call;
 	struct rb_node *parent, **pp;
@@ -340,6 +341,7 @@
 	}
 
 	/* formalise the acceptance */
+	call->notify_rx = notify_rx;
 	call->user_call_ID = user_call_ID;
 	rb_link_node(&call->sock_node, parent, pp);
 	rb_insert_color(&call->sock_node, &rx->calls);
@@ -437,17 +439,20 @@
  * rxrpc_kernel_accept_call - Allow a kernel service to accept an incoming call
  * @sock: The socket on which the impending call is waiting
  * @user_call_ID: The tag to attach to the call
+ * @notify_rx: Where to send notifications instead of socket queue
  *
  * Allow a kernel service to accept an incoming call, assuming the incoming
- * call is still valid.
+ * call is still valid.  The caller should immediately trigger their own
+ * notification as there must be data waiting.
  */
 struct rxrpc_call *rxrpc_kernel_accept_call(struct socket *sock,
-					    unsigned long user_call_ID)
+					    unsigned long user_call_ID,
+					    rxrpc_notify_rx_t notify_rx)
 {
 	struct rxrpc_call *call;
 
 	_enter(",%lx", user_call_ID);
-	call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID);
+	call = rxrpc_accept_call(rxrpc_sk(sock->sk), user_call_ID, notify_rx);
 	_leave(" = %p", call);
 	return call;
 }
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 104ee8b..516d8ea 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -136,6 +136,7 @@
 	INIT_LIST_HEAD(&call->accept_link);
 	skb_queue_head_init(&call->rx_queue);
 	skb_queue_head_init(&call->rx_oos_queue);
+	skb_queue_head_init(&call->knlrecv_queue);
 	init_waitqueue_head(&call->waitq);
 	spin_lock_init(&call->lock);
 	rwlock_init(&call->state_lock);
@@ -552,8 +553,6 @@
 			spin_lock_bh(&call->lock);
 		}
 		spin_unlock_bh(&call->lock);
-
-		ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
 	}
 
 	del_timer_sync(&call->resend_timer);
@@ -682,6 +681,7 @@
 	struct rxrpc_call *call = container_of(rcu, struct rxrpc_call, rcu);
 
 	rxrpc_purge_queue(&call->rx_queue);
+	rxrpc_purge_queue(&call->knlrecv_queue);
 	rxrpc_put_peer(call->peer);
 	kmem_cache_free(rxrpc_call_jar, call);
 }
@@ -737,6 +737,7 @@
 
 	rxrpc_purge_queue(&call->rx_queue);
 	ASSERT(skb_queue_empty(&call->rx_oos_queue));
+	rxrpc_purge_queue(&call->knlrecv_queue);
 	sock_put(&call->socket->sk);
 	call_rcu(&call->rcu, rxrpc_rcu_destroy_call);
 }
diff --git a/net/rxrpc/conn_event.c b/net/rxrpc/conn_event.c
index bc9b059..9db90f4 100644
--- a/net/rxrpc/conn_event.c
+++ b/net/rxrpc/conn_event.c
@@ -282,7 +282,6 @@
 	case RXRPC_PACKET_TYPE_DATA:
 	case RXRPC_PACKET_TYPE_ACK:
 		rxrpc_conn_retransmit_call(conn, skb);
-		rxrpc_free_skb(skb);
 		return 0;
 
 	case RXRPC_PACKET_TYPE_ABORT:
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 86bea9a..72f016c 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -90,9 +90,15 @@
 		}
 
 		/* allow interception by a kernel service */
-		if (rx->interceptor) {
-			rx->interceptor(sk, call->user_call_ID, skb);
+		if (skb->mark == RXRPC_SKB_MARK_NEW_CALL &&
+		    rx->notify_new_call) {
 			spin_unlock_bh(&sk->sk_receive_queue.lock);
+			skb_queue_tail(&call->knlrecv_queue, skb);
+			rx->notify_new_call(&rx->sk);
+		} else if (call->notify_rx) {
+			spin_unlock_bh(&sk->sk_receive_queue.lock);
+			skb_queue_tail(&call->knlrecv_queue, skb);
+			call->notify_rx(&rx->sk, call, call->user_call_ID);
 		} else {
 			_net("post skb %p", skb);
 			__skb_queue_tail(&sk->sk_receive_queue, skb);
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index b1e708a..817ae80 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -190,7 +190,7 @@
 	if (cmd == RXRPC_CMD_ACCEPT) {
 		if (rx->sk.sk_state != RXRPC_SERVER_LISTENING)
 			return -EINVAL;
-		call = rxrpc_accept_call(rx, user_call_ID);
+		call = rxrpc_accept_call(rx, user_call_ID, NULL);
 		if (IS_ERR(call))
 			return PTR_ERR(call);
 		rxrpc_put_call(call);
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index c9b38c7..0ab7b33 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -369,55 +369,186 @@
 
 }
 
-/**
- * rxrpc_kernel_is_data_last - Determine if data message is last one
- * @skb: Message holding data
+/*
+ * Deliver messages to a call.  This keeps processing packets until the buffer
+ * is filled and we find either more DATA (returns 0) or the end of the DATA
+ * (returns 1).  If more packets are required, it returns -EAGAIN.
+ * A failed copy into @iter returns that error with the packet requeued.
  *
- * Determine if data message is last one for the parent call.
+ * TODO: Note that this is hacked in at the moment and will be replaced.
  */
-bool rxrpc_kernel_is_data_last(struct sk_buff *skb)
+static int temp_deliver_data(struct socket *sock, struct rxrpc_call *call,
+			     struct iov_iter *iter, size_t size,
+			     size_t *_offset)
 {
-	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct rxrpc_skb_priv *sp;
+	struct sk_buff *skb;
+	size_t remain;
+	int ret, copy;
 
-	ASSERTCMP(skb->mark, ==, RXRPC_SKB_MARK_DATA);
+	_enter("%d", call->debug_id);
 
-	return sp->hdr.flags & RXRPC_LAST_PACKET;
-}
+next:
+	local_bh_disable();
+	skb = skb_dequeue(&call->knlrecv_queue);
+	local_bh_enable();
+	if (!skb) {
+		if (test_bit(RXRPC_CALL_RX_NO_MORE, &call->flags))
+			return 1;
+		_leave(" = -EAGAIN [empty]");
+		return -EAGAIN;
+	}
 
-EXPORT_SYMBOL(rxrpc_kernel_is_data_last);
-
-/**
- * rxrpc_kernel_get_abort_code - Get the abort code from an RxRPC abort message
- * @skb: Message indicating an abort
- *
- * Get the abort code from an RxRPC abort message.
- */
-u32 rxrpc_kernel_get_abort_code(struct sk_buff *skb)
-{
-	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	sp = rxrpc_skb(skb);
+	_debug("dequeued %p %u/%zu", skb, sp->offset, size);
 
 	switch (skb->mark) {
-	case RXRPC_SKB_MARK_REMOTE_ABORT:
-	case RXRPC_SKB_MARK_LOCAL_ABORT:
-		return sp->call->abort_code;
-	default:
-		BUG();
-	}
-}
+	case RXRPC_SKB_MARK_DATA:
+		remain = size - *_offset;
+		if (remain > 0) {
+			copy = skb->len - sp->offset;
+			if (copy > remain)
+				copy = remain;
+			ret = skb_copy_datagram_iter(skb, sp->offset, iter,
+						     copy);
+			if (ret < 0)
+				goto requeue_and_leave;
 
-EXPORT_SYMBOL(rxrpc_kernel_get_abort_code);
+			/* handle piecemeal consumption of data packets */
+			sp->offset += copy;
+			*_offset += copy;
+		}
+
+		if (sp->offset < skb->len)
+			goto partially_used_skb;
+
+		/* We consumed the whole packet */
+		ASSERTCMP(sp->offset, ==, skb->len);
+		if (sp->hdr.flags & RXRPC_LAST_PACKET)
+			set_bit(RXRPC_CALL_RX_NO_MORE, &call->flags);
+		rxrpc_kernel_data_consumed(call, skb);
+		rxrpc_free_skb(skb);
+		goto next;
+
+	default:
+		rxrpc_free_skb(skb);
+		goto next;
+	}
+
+partially_used_skb:
+	ASSERTCMP(*_offset, ==, size);
+	ret = 0;
+requeue_and_leave:
+	skb_queue_head(&call->knlrecv_queue, skb);
+	return ret;
+}
 
 /**
- * rxrpc_kernel_get_error - Get the error number from an RxRPC error message
- * @skb: Message indicating an error
+ * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
+ * @sock: The socket that the call exists on
+ * @call: The call to receive data from
+ * @buf: The buffer to receive into
+ * @size: The size of the buffer, including data already read
+ * @_offset: The running offset into the buffer.
+ * @want_more: True if more data is expected to be read
+ * @_abort: Where the abort code is stored if -ECONNABORTED is returned
  *
- * Get the error number from an RxRPC error message.
+ * Allow a kernel service to receive data and pick up information about the
+ * state of a call.  Returns 0 if got what was asked for and there's more
+ * available, 1 if we got what was asked for and we're at the end of the data
+ * and -EAGAIN if we need more data.
+ *
+ * It may also return -EBADMSG if the data ended before the buffer was
+ * filled, -EMSGSIZE if the buffer was filled but more data remains and
+ * !@want_more, or -EINPROGRESS (with *_offset reset to 0) if the call is
+ * neither receiving data nor complete.  Once the call is complete, *_abort
+ * is set to the call's abort code and the call's error is returned; if the
+ * call succeeded, 1 is returned instead, or -ECONNRESET if @size is non-zero.
+ *
+ * Note that we may return -EAGAIN to drain empty packets at the end of the
+ * data, even if we've already copied over the requested data.
+ *
+ * This function adds the amount it transfers to *_offset, so this should be
+ * precleared as appropriate.  Note that the amount remaining in the buffer is
+ * taken to be size - *_offset.
+ *
+ * *_abort should also be initialised to 0.
  */
-int rxrpc_kernel_get_error_number(struct sk_buff *skb)
+int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
+			   void *buf, size_t size, size_t *_offset,
+			   bool want_more, u32 *_abort)
 {
-	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
+	struct iov_iter iter;
+	struct kvec iov;
+	int ret;
 
-	return sp->error;
+	_enter("{%d,%s},%zu,%d",
+	       call->debug_id, rxrpc_call_states[call->state], size, want_more);
+
+	ASSERTCMP(*_offset, <=, size);
+	ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
+
+	iov.iov_base = buf + *_offset;
+	iov.iov_len = size - *_offset;
+	iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
+
+	lock_sock(sock->sk);
+
+	switch (call->state) {
+	case RXRPC_CALL_CLIENT_RECV_REPLY:
+	case RXRPC_CALL_SERVER_RECV_REQUEST:
+	case RXRPC_CALL_SERVER_ACK_REQUEST:
+		ret = temp_deliver_data(sock, call, &iter, size, _offset);
+		if (ret < 0)
+			goto out;
+
+		/* We can only reach here with a partially full buffer if we
+		 * have reached the end of the data.  We must otherwise have a
+		 * full buffer or have been given -EAGAIN.
+		 */
+		if (ret == 1) {
+			if (*_offset < size)
+				goto short_data;
+			if (!want_more)
+				goto read_phase_complete;
+			ret = 0;
+			goto out;
+		}
+
+		if (!want_more)
+			goto excess_data;
+		goto out;
+
+	case RXRPC_CALL_COMPLETE:
+		goto call_complete;
+
+	default:
+		*_offset = 0;
+		ret = -EINPROGRESS;
+		goto out;
+	}
+
+read_phase_complete:
+	ret = 1;
+out:
+	release_sock(sock->sk);
+	_leave(" = %d [%zu,%u]", ret, *_offset, *_abort);
+	return ret;
+
+short_data:
+	ret = -EBADMSG;
+	goto out;
+excess_data:
+	ret = -EMSGSIZE;
+	goto out;
+call_complete:
+	*_abort = call->abort_code;
+	ret = call->error;
+	if (call->completion == RXRPC_CALL_SUCCEEDED) {
+		ret = 1;
+		if (size > 0)
+			ret = -ECONNRESET;
+	}
+	goto out;
 }
-
-EXPORT_SYMBOL(rxrpc_kernel_get_error_number);
+EXPORT_SYMBOL(rxrpc_kernel_recv_data);
diff --git a/net/rxrpc/skbuff.c b/net/rxrpc/skbuff.c
index 2052920..9752f8b 100644
--- a/net/rxrpc/skbuff.c
+++ b/net/rxrpc/skbuff.c
@@ -127,7 +127,6 @@
 	call->rx_data_recv = sp->hdr.seq;
 	rxrpc_hard_ACK_data(call, skb);
 }
-EXPORT_SYMBOL(rxrpc_kernel_data_consumed);
 
 /*
  * Destroy a packet that has an RxRPC control buffer