tipc: simplify bearer level broadcast

Until now, we have been keeping track of the exact set of broadcast
destinations through the helper structure tipc_node_map. This forces us
to maintain a whole infrastructure to support it, including a
pseudo-bearer and a number of functions to manipulate both the bearers
and the node map correctly. Apart from the complexity, this approach is
also limiting, as struct tipc_node_map can only support cluster-local
broadcast if we want to avoid it becoming excessively large. We want to
eliminate this limitation in order to enable the introduction of scoped
multicast in the future.

A closer analysis reveals that it is unnecessary to maintain this "full
set" overview; it is sufficient to keep a counter per bearer, indicating
how many nodes can be reached via this bearer at the moment. The protocol
is now robust enough to handle transient discrepancies between the
nominal number of reachable destinations, as expected by the broadcast
protocol itself, and the number that is actually reachable at the
moment. The initial broadcast synchronization, in conjunction with the
retransmission mechanism, ensures that all packets will eventually be
acknowledged by the correct set of destinations.
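
As a rough illustration of the counter-per-bearer idea, here is a minimal
userspace sketch. It is not the code in this patch: struct bc_dests, the
bc_*() helpers, the MAX_BEARERS value and the "pick the bearer reaching
the most nodes" policy are all hypothetical names and choices made only
for the example.

```c
#define MAX_BEARERS 3	/* assumed value, for illustration only */

/* Hypothetical container: one reachability counter per bearer,
 * replacing a full per-bearer map of node addresses.
 */
struct bc_dests {
	int dests[MAX_BEARERS];
};

/* A node became reachable via this bearer */
static void bc_add_dest(struct bc_dests *bc, int bearer_id)
{
	if (bearer_id >= 0 && bearer_id < MAX_BEARERS)
		bc->dests[bearer_id]++;
}

/* A node is no longer reachable via this bearer */
static void bc_remove_dest(struct bc_dests *bc, int bearer_id)
{
	if (bearer_id >= 0 && bearer_id < MAX_BEARERS &&
	    bc->dests[bearer_id] > 0)
		bc->dests[bearer_id]--;
}

/* Illustrative policy only: return the bearer currently reaching the
 * most nodes, or -1 if no bearer reaches any node.
 */
static int bc_select_bearer(const struct bc_dests *bc)
{
	int best = -1;
	int i;

	for (i = 0; i < MAX_BEARERS; i++) {
		if (bc->dests[i] <= 0)
			continue;
		if (best < 0 || bc->dests[i] > bc->dests[best])
			best = i;
	}
	return best;
}
```

The point of the sketch is that add/remove become O(1) counter updates
with no knowledge of which nodes are behind each bearer; any temporary
mismatch is left to the synchronization and retransmission mechanisms
described above.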

This commit introduces these changes.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 82b2786..62f47ec 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -193,10 +193,8 @@
 
 	rcu_read_lock();
 	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
-	if (b_ptr) {
-		tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true);
+	if (b_ptr)
 		tipc_disc_add_dest(b_ptr->link_req);
-	}
 	rcu_read_unlock();
 }
 
@@ -207,10 +205,8 @@
 
 	rcu_read_lock();
 	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
-	if (b_ptr) {
-		tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false);
+	if (b_ptr)
 		tipc_disc_remove_dest(b_ptr->link_req);
-	}
 	rcu_read_unlock();
 }
 
@@ -494,6 +490,33 @@
 	rcu_read_unlock();
 }
 
+/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations
+ */
+void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
+			 struct sk_buff_head *xmitq)
+{
+	struct tipc_net *tn = tipc_net(net);
+	int net_id = tn->net_id;
+	struct tipc_bearer *b;
+	struct sk_buff *skb, *tmp;
+	struct tipc_msg *hdr;
+
+	rcu_read_lock();
+	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+	if (likely(b)) {
+		skb_queue_walk_safe(xmitq, skb, tmp) {
+			hdr = buf_msg(skb);
+			msg_set_non_seq(hdr, 1);
+			msg_set_mc_netid(hdr, net_id);
+			__skb_dequeue(xmitq);
+			b->media->send_msg(net, skb, b, &b->bcast_addr);
+			/* Until we remove cloning in tipc_l2_send_msg(): */
+			kfree_skb(skb);
+		}
+	}
+	rcu_read_unlock();
+}
+
 /**
  * tipc_l2_rcv_msg - handle incoming TIPC message from an interface
  * @buf: the received packet