tipc: split link outqueue

struct tipc_link contains one single queue for outgoing packets,
where both transmitted and waiting packets are queued.

This infrastructure is hard to maintain, because we need
to keep a number of fields to keep track of which packets are
sent or unsent, and the number of packets in each category.

A lot of code becomes simpler if we split this queue into a transmission
queue, where sent/unacknowledged packets are kept, and a backlog queue,
where we keep the not yet sent packets.

In this commit we do this separation.

Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 2652c32..7e0036f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -194,10 +194,10 @@
 	tipc_node_lock(l_ptr->owner);
 
 	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
+	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
 	l_ptr->stats.queue_sz_counts++;
 
-	skb = skb_peek(&l_ptr->outqueue);
+	skb = skb_peek(&l_ptr->transmq);
 	if (skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
@@ -229,7 +229,7 @@
 	/* do all other link processing performed on a periodic basis */
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
-	if (l_ptr->next_out)
+	if (skb_queue_len(&l_ptr->backlogq))
 		tipc_link_push_packets(l_ptr);
 
 	tipc_node_unlock(l_ptr->owner);
@@ -313,8 +313,9 @@
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
-	__skb_queue_head_init(&l_ptr->outqueue);
-	__skb_queue_head_init(&l_ptr->deferred_queue);
+	__skb_queue_head_init(&l_ptr->transmq);
+	__skb_queue_head_init(&l_ptr->backlogq);
+	__skb_queue_head_init(&l_ptr->deferdq);
 	skb_queue_head_init(&l_ptr->wakeupq);
 	skb_queue_head_init(&l_ptr->inputq);
 	skb_queue_head_init(&l_ptr->namedq);
@@ -400,7 +401,7 @@
  */
 void link_prepare_wakeup(struct tipc_link *link)
 {
-	uint pend_qsz = skb_queue_len(&link->outqueue);
+	uint pend_qsz = skb_queue_len(&link->backlogq);
 	struct sk_buff *skb, *tmp;
 
 	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
@@ -430,8 +431,9 @@
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-	__skb_queue_purge(&l_ptr->deferred_queue);
-	__skb_queue_purge(&l_ptr->outqueue);
+	__skb_queue_purge(&l_ptr->deferdq);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
 	tipc_link_reset_fragments(l_ptr);
 }
 
@@ -464,15 +466,15 @@
 	}
 
 	/* Clean up all queues, except inputq: */
-	__skb_queue_purge(&l_ptr->outqueue);
-	__skb_queue_purge(&l_ptr->deferred_queue);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
+	__skb_queue_purge(&l_ptr->deferdq);
 	if (!owner->inputq)
 		owner->inputq = &l_ptr->inputq;
 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
-	l_ptr->next_out = NULL;
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
 	l_ptr->fsm_msg_cnt = 0;
@@ -742,54 +744,51 @@
 		     struct sk_buff_head *list)
 {
 	struct tipc_msg *msg = buf_msg(skb_peek(list));
-	uint psz = msg_size(msg);
-	uint sndlim = link->queue_limit[0];
+	unsigned int maxwin = link->window;
 	uint imp = tipc_msg_tot_importance(msg);
 	uint mtu = link->max_pkt;
 	uint ack = mod(link->next_in_no - 1);
 	uint seqno = link->next_out_no;
 	uint bc_last_in = link->owner->bclink.last_in;
 	struct tipc_media_addr *addr = &link->media_addr;
-	struct sk_buff_head *outqueue = &link->outqueue;
+	struct sk_buff_head *transmq = &link->transmq;
+	struct sk_buff_head *backlogq = &link->backlogq;
 	struct sk_buff *skb, *tmp;
 
 	/* Match queue limits against msg importance: */
-	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
 		return tipc_link_cong(link, list);
 
 	/* Has valid packet limit been used ? */
-	if (unlikely(psz > mtu)) {
+	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
-	/* Prepare each packet for sending, and add to outqueue: */
+	/* Prepare each packet for sending, and add to relevant queue: */
 	skb_queue_walk_safe(list, skb, tmp) {
 		__skb_unlink(skb, list);
 		msg = buf_msg(skb);
-		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_seqno(msg, seqno);
+		msg_set_ack(msg, ack);
 		msg_set_bcast_ack(msg, bc_last_in);
 
-		if (skb_queue_len(outqueue) < sndlim) {
-			__skb_queue_tail(outqueue, skb);
-			tipc_bearer_send(net, link->bearer_id,
-					 skb, addr);
-			link->next_out = NULL;
-			link->unacked_window = 0;
-		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			__skb_queue_tail(transmq, skb);
+			tipc_bearer_send(net, link->bearer_id, skb, addr);
+			link->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {
 			link->stats.sent_bundled++;
 			continue;
-		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
-						link->addr)) {
+		}
+		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
-			if (!link->next_out)
-				link->next_out = skb_peek_tail(outqueue);
-		} else {
-			__skb_queue_tail(outqueue, skb);
-			if (!link->next_out)
-				link->next_out = skb;
 		}
+		__skb_queue_tail(backlogq, skb);
 		seqno++;
 	}
 	link->next_out_no = seqno;
@@ -895,14 +894,6 @@
 	kfree_skb(buf);
 }
 
-struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
-				    const struct sk_buff *skb)
-{
-	if (skb_queue_is_last(list, skb))
-		return NULL;
-	return skb->next;
-}
-
 /*
  * tipc_link_push_packets - push unsent packets to bearer
  *
@@ -911,30 +902,23 @@
  *
  * Called with node locked
  */
-void tipc_link_push_packets(struct tipc_link *l_ptr)
+void tipc_link_push_packets(struct tipc_link *link)
 {
-	struct sk_buff_head *outqueue = &l_ptr->outqueue;
-	struct sk_buff *skb = l_ptr->next_out;
+	struct sk_buff *skb;
 	struct tipc_msg *msg;
-	u32 next, first;
+	unsigned int ack = mod(link->next_in_no - 1);
 
-	skb_queue_walk_from(outqueue, skb) {
-		msg = buf_msg(skb);
-		next = msg_seqno(msg);
-		first = buf_seqno(skb_peek(outqueue));
-
-		if (mod(next - first) < l_ptr->queue_limit[0]) {
-			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
-			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-			if (msg_user(msg) == MSG_BUNDLER)
-				TIPC_SKB_CB(skb)->bundling = false;
-			tipc_bearer_send(l_ptr->owner->net,
-					 l_ptr->bearer_id, skb,
-					 &l_ptr->media_addr);
-			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
-		} else {
+	while (skb_queue_len(&link->transmq) < link->window) {
+		skb = __skb_dequeue(&link->backlogq);
+		if (!skb)
 			break;
-		}
+		msg = buf_msg(skb);
+		msg_set_ack(msg, ack);
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		link->rcv_unacked = 0;
+		__skb_queue_tail(&link->transmq, skb);
+		tipc_bearer_send(link->owner->net, link->bearer_id,
+				 skb, &link->media_addr);
 	}
 }
 
@@ -1021,8 +1005,8 @@
 		l_ptr->stale_count = 1;
 	}
 
-	skb_queue_walk_from(&l_ptr->outqueue, skb) {
-		if (!retransmits || skb == l_ptr->next_out)
+	skb_queue_walk_from(&l_ptr->transmq, skb) {
+		if (!retransmits)
 			break;
 		msg = buf_msg(skb);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -1039,12 +1023,12 @@
 {
 	u32 seq_no;
 
-	if (skb_queue_empty(&link->deferred_queue))
+	if (skb_queue_empty(&link->deferdq))
 		return;
 
-	seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+	seq_no = buf_seqno(skb_peek(&link->deferdq));
 	if (seq_no == mod(link->next_in_no))
-		skb_queue_splice_tail_init(&link->deferred_queue, list);
+		skb_queue_splice_tail_init(&link->deferdq, list);
 }
 
 /**
@@ -1121,17 +1105,16 @@
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		released = 0;
-		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
-			if (skb1 == l_ptr->next_out ||
-			    more(buf_seqno(skb1), ackd))
+		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
+			if (more(buf_seqno(skb1), ackd))
 				break;
-			 __skb_unlink(skb1, &l_ptr->outqueue);
+			 __skb_unlink(skb1, &l_ptr->transmq);
 			 kfree_skb(skb1);
 			 released = 1;
 		}
 
 		/* Try sending any messages link endpoint has pending */
-		if (unlikely(l_ptr->next_out))
+		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
 			tipc_link_push_packets(l_ptr);
 
 		if (released && !skb_queue_empty(&l_ptr->wakeupq))
@@ -1166,10 +1149,9 @@
 			goto unlock;
 		}
 		l_ptr->next_in_no++;
-		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
 			link_retrieve_defq(l_ptr, &head);
-
-		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
 			l_ptr->stats.sent_acks++;
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
@@ -1336,9 +1318,9 @@
 		return;
 	}
 
-	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
+	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
 		l_ptr->stats.deferred_recv++;
-		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
+		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 	} else {
 		l_ptr->stats.duplicates++;
@@ -1375,11 +1357,11 @@
 
 		if (!tipc_link_is_up(l_ptr))
 			return;
-		if (l_ptr->next_out)
-			next_sent = buf_seqno(l_ptr->next_out);
+		if (skb_queue_len(&l_ptr->backlogq))
+			next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));
 		msg_set_next_sent(msg, next_sent);
-		if (!skb_queue_empty(&l_ptr->deferred_queue)) {
-			u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
+		if (!skb_queue_empty(&l_ptr->deferdq)) {
+			u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));
 			gap = mod(rec - mod(l_ptr->next_in_no));
 		}
 		msg_set_seq_gap(msg, gap);
@@ -1431,10 +1413,9 @@
 
 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 	buf->priority = TC_PRIO_CONTROL;
-
 	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
 			 &l_ptr->media_addr);
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	kfree_skb(buf);
 }
 
@@ -1569,7 +1550,7 @@
 		}
 		if (msg_seq_gap(msg)) {
 			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
+			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
 					     msg_seq_gap(msg));
 		}
 		break;
@@ -1616,7 +1597,7 @@
  */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-	u32 msgcount = skb_queue_len(&l_ptr->outqueue);
+	int msgcount;
 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
 	struct tipc_msg tunnel_hdr;
 	struct sk_buff *skb;
@@ -1627,10 +1608,12 @@
 
 	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
+	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+	msgcount = skb_queue_len(&l_ptr->transmq);
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-	if (skb_queue_empty(&l_ptr->outqueue)) {
+	if (skb_queue_empty(&l_ptr->transmq)) {
 		skb = tipc_buf_acquire(INT_H_SIZE);
 		if (skb) {
 			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
@@ -1646,7 +1629,7 @@
 	split_bundles = (l_ptr->owner->active_links[0] !=
 			 l_ptr->owner->active_links[1]);
 
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+	skb_queue_walk(&l_ptr->transmq, skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 
 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
@@ -1677,39 +1660,46 @@
  * and sequence order is preserved per sender/receiver socket pair.
  * Owner node is locked.
  */
-void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
-			      struct tipc_link *tunnel)
+void tipc_link_dup_queue_xmit(struct tipc_link *link,
+			      struct tipc_link *tnl)
 {
 	struct sk_buff *skb;
-	struct tipc_msg tunnel_hdr;
+	struct tipc_msg tnl_hdr;
+	struct sk_buff_head *queue = &link->transmq;
+	int mcnt;
 
-	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
-		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
-	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+	tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
+		      DUPLICATE_MSG, INT_H_SIZE, link->addr);
+	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
+	msg_set_msgcnt(&tnl_hdr, mcnt);
+	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
+
+tunnel_queue:
+	skb_queue_walk(queue, skb) {
 		struct sk_buff *outskb;
 		struct tipc_msg *msg = buf_msg(skb);
-		u32 length = msg_size(msg);
+		u32 len = msg_size(msg);
 
-		if (msg_user(msg) == MSG_BUNDLER)
-			msg_set_type(msg, CLOSED_MSG);
-		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
-		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
-		outskb = tipc_buf_acquire(length + INT_H_SIZE);
+		msg_set_ack(msg, mod(link->next_in_no - 1));
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
+		outskb = tipc_buf_acquire(len + INT_H_SIZE);
 		if (outskb == NULL) {
 			pr_warn("%sunable to send duplicate msg\n",
 				link_co_err);
 			return;
 		}
-		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
-					       length);
-		__tipc_link_xmit_skb(tunnel, outskb);
-		if (!tipc_link_is_up(l_ptr))
+		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
+		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
+					       skb->data, len);
+		__tipc_link_xmit_skb(tnl, outskb);
+		if (!tipc_link_is_up(link))
 			return;
 	}
+	if (queue == &link->backlogq)
+		return;
+	queue = &link->backlogq;
+	goto tunnel_queue;
 }
 
 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
@@ -1823,6 +1813,8 @@
 
 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
 {
+	l_ptr->window = window;
+
 	/* Data messages from this node, inclusive FIRST_FRAGM */
 	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
 	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;