Merge branch 'udp-reduce-cache-pressure'

Paolo Abeni says:

====================
udp: reduce cache pressure

In the most common use case, many skb fields are not used by recvmsg(), and
the few that are actually accessed lie on cold cachelines, which leads to
several cache misses per packet.

This patch series attempts to reduce such misses with several strategies:
* caching the interesting fields in the scratch space (sketched below)
* avoiding accesses to uninteresting fields altogether
* prefetching
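
To illustrate the first point, the idea is to stash, at enqueue time, the few
fields that recvmsg() will need into the already written skb->dev_scratch
area, so that the dequeue path does not have to touch the cold skb cachelines
at all. The snippet below is only a simplified restatement of the
udp_dev_scratch helpers introduced by the series (see the net/ipv4/udp.c hunk
below), not additional functionality:

  /* cache, on 64 bit arches, the fields needed at recvmsg() time inside
   * the unsigned long skb->dev_scratch; the struct must fit in it
   */
  struct udp_dev_scratch {
          u32 truesize;
          u16 len;
          bool is_linear;
          bool csum_unnecessary;
  };

  static void udp_set_dev_scratch(struct sk_buff *skb)
  {
          struct udp_dev_scratch *scratch;

          BUILD_BUG_ON(sizeof(struct udp_dev_scratch) > sizeof(long));
          scratch = (struct udp_dev_scratch *)&skb->dev_scratch;
          scratch->truesize = skb->truesize;
          scratch->len = skb->len;
          scratch->csum_unnecessary = !!skb_csum_unnecessary(skb);
          scratch->is_linear = !skb_is_nonlinear(skb);
  }

  /* the dequeue path then reads the cached copy instead of the cold field */
  static unsigned int udp_skb_len(struct sk_buff *skb)
  {
          return ((struct udp_dev_scratch *)&skb->dev_scratch)->len;
  }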

Tested using the udp_sink program by Jesper [1] as the receiver, with an h/w
L4 rx hash on the ingress NIC, so that the number of ingress NIC rx queues hit
by the UDP traffic could be controlled via ethtool -L.

The udp_sink program was bound to the first idle cpu, to get more
stable numbers.
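
The pinning itself can be done either externally (e.g. via taskset) or from
the sink program; a minimal, illustrative sketch of the latter is below,
assuming the target cpu id is already known (this is not the exact setup
used for the tests above):

  #define _GNU_SOURCE
  #include <sched.h>

  /* bind the current process to a single cpu, so that the scheduler does
   * not move the receiver around while measuring
   */
  static int pin_to_cpu(int cpu)
  {
          cpu_set_t set;

          CPU_ZERO(&set);
          CPU_SET(cpu, &set);
          return sched_setaffinity(0, sizeof(set), &set);
  }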

On a single numa node receiver:

nic rx queues           vanilla                 patched kernel      delta
1                       1850 kpps               1850 kpps           0%
2                       2370 kpps               2700 kpps           13.9%
16                      2000 kpps               2220 kpps           11%

[1] https://github.com/netoptimizer/network-testing/blob/master/src/udp_sink.c

v1 -> v2:
  - replaced secpath_reset() with skb_release_head_state()
  - changed udp_dev_scratch field types to u{32,16} variants,
    replaced the bitfield with bool

v2 -> v3:
  - no changes, tested against apachebench for performance regressions
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
index d460a4c..d66d4fe 100644
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -867,10 +867,25 @@
 #endif
 }
 
+/* decrement the reference count and return true if we can free the skb */
+static inline bool skb_unref(struct sk_buff *skb)
+{
+	if (unlikely(!skb))
+		return false;
+	if (likely(atomic_read(&skb->users) == 1))
+		smp_rmb();
+	else if (likely(!atomic_dec_and_test(&skb->users)))
+		return false;
+
+	return true;
+}
+
+void skb_release_head_state(struct sk_buff *skb);
 void kfree_skb(struct sk_buff *skb);
 void kfree_skb_list(struct sk_buff *segs);
 void skb_tx_error(struct sk_buff *skb);
 void consume_skb(struct sk_buff *skb);
+void consume_stateless_skb(struct sk_buff *skb);
 void  __kfree_skb(struct sk_buff *skb);
 extern struct kmem_cache *skbuff_head_cache;
 
diff --git a/net/core/datagram.c b/net/core/datagram.c
index bc46118..e5311a7 100644
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -330,9 +330,7 @@
 {
 	bool slow;
 
-	if (likely(atomic_read(&skb->users) == 1))
-		smp_rmb();
-	else if (likely(!atomic_dec_and_test(&skb->users))) {
+	if (!skb_unref(skb)) {
 		sk_peek_offset_bwd(sk, len);
 		return;
 	}
diff --git a/net/core/skbuff.c b/net/core/skbuff.c
index e508c1e..3046027 100644
--- a/net/core/skbuff.c
+++ b/net/core/skbuff.c
@@ -643,12 +643,10 @@
 	kmem_cache_free(skbuff_fclone_cache, fclones);
 }
 
-static void skb_release_head_state(struct sk_buff *skb)
+void skb_release_head_state(struct sk_buff *skb)
 {
 	skb_dst_drop(skb);
-#ifdef CONFIG_XFRM
-	secpath_put(skb->sp);
-#endif
+	secpath_reset(skb);
 	if (skb->destructor) {
 		WARN_ON(in_irq());
 		skb->destructor(skb);
@@ -694,12 +692,9 @@
  */
 void kfree_skb(struct sk_buff *skb)
 {
-	if (unlikely(!skb))
+	if (!skb_unref(skb))
 		return;
-	if (likely(atomic_read(&skb->users) == 1))
-		smp_rmb();
-	else if (likely(!atomic_dec_and_test(&skb->users)))
-		return;
+
 	trace_kfree_skb(skb, __builtin_return_address(0));
 	__kfree_skb(skb);
 }
@@ -746,17 +741,32 @@
  */
 void consume_skb(struct sk_buff *skb)
 {
-	if (unlikely(!skb))
+	if (!skb_unref(skb))
 		return;
-	if (likely(atomic_read(&skb->users) == 1))
-		smp_rmb();
-	else if (likely(!atomic_dec_and_test(&skb->users)))
-		return;
+
 	trace_consume_skb(skb);
 	__kfree_skb(skb);
 }
 EXPORT_SYMBOL(consume_skb);
 
+/**
+ *	consume_stateless_skb - free an skbuff, assuming it is stateless
+ *	@skb: buffer to free
+ *
+ *	Works like consume_skb(), but this variant assumes that all the head
+ *	states have already been dropped.
+ */
+void consume_stateless_skb(struct sk_buff *skb)
+{
+	if (!skb_unref(skb))
+		return;
+
+	trace_consume_skb(skb);
+	if (likely(skb->head))
+		skb_release_data(skb);
+	kfree_skbmem(skb);
+}
+
 void __kfree_skb_flush(void)
 {
 	struct napi_alloc_cache *nc = this_cpu_ptr(&napi_alloc_cache);
diff --git a/net/ipv4/udp.c b/net/ipv4/udp.c
index fdcb743..2bc638c 100644
--- a/net/ipv4/udp.c
+++ b/net/ipv4/udp.c
@@ -1163,6 +1163,83 @@
 	return ret;
 }
 
+/* Copy as much information as possible into skb->dev_scratch to avoid
+ * possibly multiple cache misses on dequeue().
+ */
+#if BITS_PER_LONG == 64
+
+/* we can store several fields here: truesize, len and the bits needed to
+ * compute skb_csum_unnecessary, which would otherwise sit on cold cache
+ * lines at recvmsg time.
+ * skb->len can be stored on 16 bits since the udp header has already been
+ * validated and pulled.
+ */
+struct udp_dev_scratch {
+	u32 truesize;
+	u16 len;
+	bool is_linear;
+	bool csum_unnecessary;
+};
+
+static void udp_set_dev_scratch(struct sk_buff *skb)
+{
+	struct udp_dev_scratch *scratch;
+
+	BUILD_BUG_ON(sizeof(struct udp_dev_scratch) > sizeof(long));
+	scratch = (struct udp_dev_scratch *)&skb->dev_scratch;
+	scratch->truesize = skb->truesize;
+	scratch->len = skb->len;
+	scratch->csum_unnecessary = !!skb_csum_unnecessary(skb);
+	scratch->is_linear = !skb_is_nonlinear(skb);
+}
+
+static int udp_skb_truesize(struct sk_buff *skb)
+{
+	return ((struct udp_dev_scratch *)&skb->dev_scratch)->truesize;
+}
+
+static unsigned int udp_skb_len(struct sk_buff *skb)
+{
+	return ((struct udp_dev_scratch *)&skb->dev_scratch)->len;
+}
+
+static bool udp_skb_csum_unnecessary(struct sk_buff *skb)
+{
+	return ((struct udp_dev_scratch *)&skb->dev_scratch)->csum_unnecessary;
+}
+
+static bool udp_skb_is_linear(struct sk_buff *skb)
+{
+	return ((struct udp_dev_scratch *)&skb->dev_scratch)->is_linear;
+}
+
+#else
+static void udp_set_dev_scratch(struct sk_buff *skb)
+{
+	skb->dev_scratch = skb->truesize;
+}
+
+static int udp_skb_truesize(struct sk_buff *skb)
+{
+	return skb->dev_scratch;
+}
+
+static unsigned int udp_skb_len(struct sk_buff *skb)
+{
+	return skb->len;
+}
+
+static bool udp_skb_csum_unnecessary(struct sk_buff *skb)
+{
+	return skb_csum_unnecessary(skb);
+}
+
+static bool udp_skb_is_linear(struct sk_buff *skb)
+{
+	return !skb_is_nonlinear(skb);
+}
+#endif
+
 /* fully reclaim rmem/fwd memory allocated for skb */
 static void udp_rmem_release(struct sock *sk, int size, int partial,
 			     bool rx_queue_lock_held)
@@ -1213,14 +1290,16 @@
  */
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
 {
-	udp_rmem_release(sk, skb->dev_scratch, 1, false);
+	prefetch(&skb->data);
+	udp_rmem_release(sk, udp_skb_truesize(skb), 1, false);
 }
 EXPORT_SYMBOL(udp_skb_destructor);
 
 /* as above, but the caller held the rx queue lock, too */
 static void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
 {
-	udp_rmem_release(sk, skb->dev_scratch, 1, true);
+	prefetch(&skb->data);
+	udp_rmem_release(sk, udp_skb_truesize(skb), 1, true);
 }
 
 /* Idea of busylocks is to let producers grab an extra spinlock
@@ -1274,10 +1353,7 @@
 		busy = busylock_acquire(sk);
 	}
 	size = skb->truesize;
-	/* Copy skb->truesize into skb->dev_scratch to avoid a cache line miss
-	 * in udp_skb_destructor()
-	 */
-	skb->dev_scratch = size;
+	udp_set_dev_scratch(skb);
 
 	/* we drop only if the receive buf is full and the receive
 	 * queue contains some other skb
@@ -1359,7 +1435,8 @@
 		sk_peek_offset_bwd(sk, len);
 		unlock_sock_fast(sk, slow);
 	}
-	consume_skb(skb);
+
+	consume_stateless_skb(skb);
 }
 EXPORT_SYMBOL_GPL(skb_consume_udp);
 
@@ -1514,6 +1591,18 @@
 }
 EXPORT_SYMBOL_GPL(__skb_recv_udp);
 
+static int copy_linear_skb(struct sk_buff *skb, int len, int off,
+			   struct iov_iter *to)
+{
+	int n, copy = len - off;
+
+	n = copy_to_iter(skb->data + off, copy, to);
+	if (n == copy)
+		return 0;
+
+	return -EFAULT;
+}
+
 /*
  * 	This should be easy, if there is something there we
  * 	return it, otherwise we block.
@@ -1540,7 +1629,7 @@
 	if (!skb)
 		return err;
 
-	ulen = skb->len;
+	ulen = udp_skb_len(skb);
 	copied = len;
 	if (copied > ulen - off)
 		copied = ulen - off;
@@ -1555,14 +1644,18 @@
 
 	if (copied < ulen || peeking ||
 	    (is_udplite && UDP_SKB_CB(skb)->partial_cov)) {
-		checksum_valid = !udp_lib_checksum_complete(skb);
+		checksum_valid = udp_skb_csum_unnecessary(skb) ||
+				!__udp_lib_checksum_complete(skb);
 		if (!checksum_valid)
 			goto csum_copy_err;
 	}
 
-	if (checksum_valid || skb_csum_unnecessary(skb))
-		err = skb_copy_datagram_msg(skb, off, msg, copied);
-	else {
+	if (checksum_valid || udp_skb_csum_unnecessary(skb)) {
+		if (udp_skb_is_linear(skb))
+			err = copy_linear_skb(skb, copied, off, &msg->msg_iter);
+		else
+			err = skb_copy_datagram_msg(skb, off, msg, copied);
+	} else {
 		err = skb_copy_and_csum_datagram_msg(skb, off, msg);
 
 		if (err == -EINVAL)
@@ -1739,6 +1832,9 @@
 		sk_mark_napi_id_once(sk, skb);
 	}
 
+	/* clear all pending head states while they are hot in the cache */
+	skb_release_head_state(skb);
+
 	rc = __udp_enqueue_schedule_skb(sk, skb);
 	if (rc < 0) {
 		int is_udplite = IS_UDPLITE(sk);