tipc: reduce locking scope during packet reception

We convert packet/message reception according to the same principle
we have been using for message sending and timeout handling:

We move the function tipc_rcv() to node.c, hence handling the initial
packet reception at the link aggregation level. The function grabs
the node lock, selects the receiving link, and accesses it via a new
call tipc_link_rcv(). This function appends buffers to the input
queue for delivery upwards, but it may also append outgoing packets
to the xmit queue, just as we do during regular message sending. The
latter will happen when buffers are forwarded from the link backlog,
or when retransmission is requested.

Upon return of this function, and after having released the node lock,
tipc_rcv() delivers/tranmsits the contents of those queues, but it may
also perform actions such as link activation or reset, as indicated by
the return flags from the link.

This reduces the number of cpu cycles spent inside the node spinlock,
and reduces contention on that lock.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 4dc66d9..2f1563b 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -38,6 +38,7 @@
 #define _TIPC_MSG_H
 
 #include <linux/tipc.h>
+#include "core.h"
 
 /*
  * Constants and routines used to read and write TIPC payload message headers
@@ -658,12 +659,12 @@
 /*
  * Word 5
  */
-static inline u32 msg_session(struct tipc_msg *m)
+static inline u16 msg_session(struct tipc_msg *m)
 {
 	return msg_bits(m, 5, 16, 0xffff);
 }
 
-static inline void msg_set_session(struct tipc_msg *m, u32 n)
+static inline void msg_set_session(struct tipc_msg *m, u16 n)
 {
 	msg_set_bits(m, 5, 16, 0xffff, n);
 }
@@ -766,10 +767,19 @@
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
+static inline bool msg_is_traffic(struct tipc_msg *m)
+{
+	if (likely(msg_user(m) != LINK_PROTOCOL))
+		return true;
+	if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG))
+		return false;
+	return true;
+}
+
 static inline bool msg_peer_is_up(struct tipc_msg *m)
 {
-	if (likely(msg_user(m) != LINK_PROTOCOL) || (msg_type(m) == STATE_MSG))
-		return true;
+	if (likely(msg_is_traffic(m)))
+		return false;
 	return msg_redundant_link(m);
 }
 
@@ -886,4 +896,36 @@
 	return rv;
 }
 
+/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
+ * @list: list to be appended to
+ * @skb: buffer to add
+ * Returns true if queue should treated further, otherwise false
+ */
+static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list,
+					   struct sk_buff *skb)
+{
+	struct sk_buff *_skb, *tmp;
+	struct tipc_msg *hdr = buf_msg(skb);
+	u16 seqno = msg_seqno(hdr);
+
+	if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) {
+		__skb_queue_head(list, skb);
+		return true;
+	}
+	if (likely(less(seqno, buf_seqno(skb_peek(list))))) {
+		__skb_queue_head(list, skb);
+		return true;
+	}
+	if (!more(seqno, buf_seqno(skb_peek_tail(list)))) {
+		skb_queue_walk_safe(list, _skb, tmp) {
+			if (likely(less(seqno, buf_seqno(_skb)))) {
+				__skb_queue_before(list, _skb, skb);
+				return true;
+			}
+		}
+	}
+	__skb_queue_tail(list, skb);
+	return false;
+}
+
 #endif