tipc: transfer broadcast nacks in link state messages

When we send broadcasts in clusters of more than 70-80 nodes, we sometimes
see the broadcast link resetting because of an excessive number of
retransmissions. This is caused by a combination of two factors:

1) A "NACK crunch", where loss of broadcast packets is discovered
   and NACK'ed by several nodes simultaneously, leading to multiple
   redundant broadcast retransmissions.

2) The fact that the NACKs themselves are also sent as broadcast, leading
   to excessive load and packet loss on the transmitting switch/bridge.

This commit deals with the latter problem, by moving sending of
broadcast nacks from the dedicated BCAST_PROTOCOL/NACK message type
to regular unicast LINK_PROTOCOL/STATE messages. We allocate 10 unused
bits in word 8 of the said message for this purpose, and introduce a
new capability bit, TIPC_BCAST_STATE_NACK, in order to keep the change
backwards compatible.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 2c6e1b9..136316f 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -367,6 +367,18 @@
 	return l->ackers;
 }
 
+u16 link_bc_rcv_gap(struct tipc_link *l)
+{
+	struct sk_buff *skb = skb_peek(&l->deferdq);
+	u16 gap = 0;
+
+	if (more(l->snd_nxt, l->rcv_nxt))
+		gap = l->snd_nxt - l->rcv_nxt;
+	if (skb)
+		gap = buf_seqno(skb) - l->rcv_nxt;
+	return gap;
+}
+
 void tipc_link_set_mtu(struct tipc_link *l, int mtu)
 {
 	l->mtu = mtu;
@@ -1135,7 +1147,10 @@
 		if (((l->rcv_nxt ^ tipc_own_addr(l->net)) & 0xf) != 0xf)
 			return 0;
 		l->rcv_unacked = 0;
-		return TIPC_LINK_SND_BC_ACK;
+
+		/* Use snd_nxt to store peer's snd_nxt in broadcast rcv link */
+		l->snd_nxt = l->rcv_nxt;
+		return TIPC_LINK_SND_STATE;
 	}
 
 	/* Unicast ACK */
@@ -1236,7 +1251,7 @@
 			rc |= tipc_link_input(l, skb, l->inputq);
 		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
 			rc |= tipc_link_build_state_msg(l, xmitq);
-		if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
+		if (unlikely(rc & ~TIPC_LINK_SND_STATE))
 			break;
 	} while ((skb = __skb_dequeue(defq)));
 
@@ -1250,10 +1265,11 @@
 				      u16 rcvgap, int tolerance, int priority,
 				      struct sk_buff_head *xmitq)
 {
+	struct tipc_link *bcl = l->bc_rcvlink;
 	struct sk_buff *skb;
 	struct tipc_msg *hdr;
 	struct sk_buff_head *dfq = &l->deferdq;
-	bool node_up = link_is_up(l->bc_rcvlink);
+	bool node_up = link_is_up(bcl);
 	struct tipc_mon_state *mstate = &l->mon_state;
 	int dlen = 0;
 	void *data;
@@ -1281,7 +1297,7 @@
 	msg_set_net_plane(hdr, l->net_plane);
 	msg_set_next_sent(hdr, l->snd_nxt);
 	msg_set_ack(hdr, l->rcv_nxt - 1);
-	msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
+	msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
 	msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
 	msg_set_link_tolerance(hdr, tolerance);
 	msg_set_linkprio(hdr, priority);
@@ -1291,6 +1307,7 @@
 
 	if (mtyp == STATE_MSG) {
 		msg_set_seq_gap(hdr, rcvgap);
+		msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
 		msg_set_probe(hdr, probe);
 		tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
 		msg_set_size(hdr, INT_H_SIZE + dlen);
@@ -1575,49 +1592,68 @@
 
 /* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
  */
-void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
-			   struct sk_buff_head *xmitq)
+int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
+			  struct sk_buff_head *xmitq)
 {
 	u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
+	u16 from = msg_bcast_ack(hdr) + 1;
+	u16 to = from + msg_bc_gap(hdr) - 1;
+	int rc = 0;
 
 	if (!link_is_up(l))
-		return;
+		return rc;
 
 	if (!msg_peer_node_is_up(hdr))
-		return;
+		return rc;
 
 	/* Open when peer ackowledges our bcast init msg (pkt #1) */
 	if (msg_ack(hdr))
 		l->bc_peer_is_up = true;
 
 	if (!l->bc_peer_is_up)
-		return;
+		return rc;
 
 	/* Ignore if peers_snd_nxt goes beyond receive window */
 	if (more(peers_snd_nxt, l->rcv_nxt + l->window))
-		return;
+		return rc;
+
+	if (!less(to, from)) {
+		rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
+		l->stats.recv_nacks++;
+	}
+
+	l->snd_nxt = peers_snd_nxt;
+	if (link_bc_rcv_gap(l))
+		rc |= TIPC_LINK_SND_STATE;
+
+	/* Return now if sender supports nack via STATE messages */
+	if (l->peer_caps & TIPC_BCAST_STATE_NACK)
+		return rc;
+
+	/* Otherwise, be backwards compatible */
 
 	if (!more(peers_snd_nxt, l->rcv_nxt)) {
 		l->nack_state = BC_NACK_SND_CONDITIONAL;
-		return;
+		return 0;
 	}
 
 	/* Don't NACK if one was recently sent or peeked */
 	if (l->nack_state == BC_NACK_SND_SUPPRESS) {
 		l->nack_state = BC_NACK_SND_UNCONDITIONAL;
-		return;
+		return 0;
 	}
 
 	/* Conditionally delay NACK sending until next synch rcv */
 	if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
 		l->nack_state = BC_NACK_SND_UNCONDITIONAL;
 		if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
-			return;
+			return 0;
 	}
 
 	/* Send NACK now but suppress next one */
 	tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
 	l->nack_state = BC_NACK_SND_SUPPRESS;
+	return 0;
 }
 
 void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
@@ -1654,6 +1690,8 @@
 }
 
 /* tipc_link_bc_nack_rcv(): receive broadcast nack message
+ * This function is here for backwards compatibility, since
+ * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5.
  */
 int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
 			  struct sk_buff_head *xmitq)