tipc: split link outqueue

struct tipc_link contains one single queue for outgoing packets,
where both transmitted and waiting packets are queued.

This design is hard to maintain, because we need a number of
extra fields to keep track of which packets are sent or unsent,
and of how many packets there are in each category.

A lot of code becomes simpler if we split this queue into a transmission
queue, where sent/unacknowledged packets are kept, and a backlog queue,
where we keep the not yet sent packets.

In this commit we do this separation.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 333d2ae..47c8fd8 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -330,33 +330,36 @@
 
 /**
  * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
- * @list: the buffer chain of the existing buffer ("bundle")
+ * @bskb: the buffer to append to ("bundle")
  * @skb:  buffer to be appended
  * @mtu:  max allowable size for the bundle buffer
  * Consumes buffer if successful
  * Returns true if bundling could be performed, otherwise false
  */
-bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
+bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
 {
-	struct sk_buff *bskb = skb_peek_tail(list);
-	struct tipc_msg *bmsg = buf_msg(bskb);
+	struct tipc_msg *bmsg;
 	struct tipc_msg *msg = buf_msg(skb);
-	unsigned int bsz = msg_size(bmsg);
+	unsigned int bsz;
 	unsigned int msz = msg_size(msg);
-	u32 start = align(bsz);
+	u32 start, pad;
 	u32 max = mtu - INT_H_SIZE;
-	u32 pad = start - bsz;
 
 	if (likely(msg_user(msg) == MSG_FRAGMENTER))
 		return false;
+	if (!bskb)
+		return false;
+	bmsg = buf_msg(bskb);
+	bsz = msg_size(bmsg);
+	start = align(bsz);
+	pad = start - bsz;
+
 	if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
 		return false;
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
 		return false;
 	if (likely(msg_user(bmsg) != MSG_BUNDLER))
 		return false;
-	if (likely(!TIPC_SKB_CB(bskb)->bundling))
-		return false;
 	if (unlikely(skb_tailroom(bskb) < (pad + msz)))
 		return false;
 	if (unlikely(max < (start + msz)))
@@ -419,12 +422,11 @@
  * Replaces buffer if successful
  * Returns true if success, otherwise false
  */
-bool tipc_msg_make_bundle(struct sk_buff_head *list,
-			  struct sk_buff *skb, u32 mtu, u32 dnode)
+bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)
 {
 	struct sk_buff *bskb;
 	struct tipc_msg *bmsg;
-	struct tipc_msg *msg = buf_msg(skb);
+	struct tipc_msg *msg = buf_msg(*skb);
 	u32 msz = msg_size(msg);
 	u32 max = mtu - INT_H_SIZE;
 
@@ -448,9 +450,9 @@
 	msg_set_seqno(bmsg, msg_seqno(msg));
 	msg_set_ack(bmsg, msg_ack(msg));
 	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
-	TIPC_SKB_CB(bskb)->bundling = true;
-	__skb_queue_tail(list, bskb);
-	return tipc_msg_bundle(list, skb, mtu);
+	tipc_msg_bundle(bskb, *skb, mtu);
+	*skb = bskb;
+	return true;
 }
 
 /**