udp: do fwd memory scheduling on dequeue

A new argument is added to __skb_recv_datagram to provide
an explicit skb destructor, invoked under the receive queue
lock.
The UDP protocol uses this argument to perform memory
reclaiming on dequeue, so that it no longer sets
skb->destructor.
Instead, explicit memory reclaiming is performed at close() time and
when skbs are removed from the receive queue.
In-kernel users of the UDP protocol now need to call a
skb_recv_udp() variant instead of skb_recv_datagram() to
properly perform memory accounting on dequeue.

Overall, this allows acquiring the receive queue lock only
once on dequeue.
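
As a rough userspace illustration of the idea (not the kernel code), the
sketch below assumes a hypothetical packet queue protected by a pthread
mutex. Its dequeue() takes an optional destructor callback and runs it
before dropping the lock, so the memory charge is released in the same
critical section that unlinks the packet. All names here (struct pkt,
struct pkt_queue, dequeue, reclaim_on_dequeue, lock_acquisitions) are
invented for this example.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical userspace stand-ins; none of these names are kernel APIs. */
struct pkt {
	struct pkt *next;
	size_t truesize;	/* memory charged to the queue for this packet */
};

struct pkt_queue {
	pthread_mutex_t lock;
	struct pkt *head, *tail;
	size_t rmem;				/* bytes currently charged */
	unsigned long lock_acquisitions;	/* instrumentation only */
};

static void queue_init(struct pkt_queue *q)
{
	pthread_mutex_init(&q->lock, NULL);
	q->head = q->tail = NULL;
	q->rmem = 0;
	q->lock_acquisitions = 0;
}

/* Take the queue lock and count the acquisition. */
static void queue_lock(struct pkt_queue *q)
{
	pthread_mutex_lock(&q->lock);
	q->lock_acquisitions++;
}

/* Append a packet and charge its size to the queue. */
static void enqueue(struct pkt_queue *q, struct pkt *p)
{
	queue_lock(q);
	p->next = NULL;
	if (q->tail)
		q->tail->next = p;
	else
		q->head = p;
	q->tail = p;
	q->rmem += p->truesize;
	pthread_mutex_unlock(&q->lock);
}

/* Unlink the first packet; if a destructor is given, run it before unlocking. */
static struct pkt *dequeue(struct pkt_queue *q,
			   void (*destructor)(struct pkt_queue *, struct pkt *))
{
	struct pkt *p;

	queue_lock(q);
	p = q->head;
	if (p) {
		q->head = p->next;
		if (!q->head)
			q->tail = NULL;
		p->next = NULL;
		if (destructor)
			destructor(q, p);
	}
	pthread_mutex_unlock(&q->lock);
	return p;
}

/* Release the memory charge; the caller must already hold q->lock. */
static void reclaim_on_dequeue(struct pkt_queue *q, struct pkt *p)
{
	q->rmem -= p->truesize;
}
```

With this shape, a dequeue that also reclaims memory takes the lock once;
reclaiming from a separate step after dequeue would need a second
acquisition to update the accounting safely.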

Tested using pktgen with random src port and 64-byte packets at
wire speed on a 10G link as the sender and udp_sink as the receiver,
using an l4 tuple rxhash to stress the contention, and one or more
udp_sink instances with reuseport.

nr sinks	vanilla		patched
1		440		560
3		2150		2300
6		3650		3800
9		4450		4600
12		6250		6450

v1 -> v2:
 - do rmem and allocated memory scheduling under the receive lock
 - do bulk scheduling in first_packet_length() and in udp_destruct_sock()
 - avoid the typedef for the dequeue callback

Suggested-by: Eric Dumazet <edumazet@google.com>
Acked-by: Hannes Frederic Sowa <hannes@stressinduktion.org>
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Acked-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 44fb8d8..1d87b54 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -1053,7 +1053,7 @@
 
 	ASSERT(!irqs_disabled());
 
-	skb = skb_recv_datagram(udp_sk, 0, 1, &ret);
+	skb = skb_recv_udp(udp_sk, 0, 1, &ret);
 	if (!skb) {
 		if (ret == -EAGAIN)
 			return;
@@ -1075,10 +1075,9 @@
 
 	__UDP_INC_STATS(&init_net, UDP_MIB_INDATAGRAMS, 0);
 
-	/* The socket buffer we have is owned by UDP, with UDP's data all over
-	 * it, but we really want our own data there.
+	/* The UDP protocol already released all skb resources;
+	 * we are free to add our own data there.
 	 */
-	skb_orphan(skb);
 	sp = rxrpc_skb(skb);
 
 	/* dig out the RxRPC connection details */