tipc: eliminate race condition at multicast reception

In a previous commit in this series we resolved a race problem during
unicast message reception.

Here, we resolve the same problem at multicast reception. We apply the
same technique: an input queue serializing the delivery of arriving
buffers. The main difference is that here we do it in two steps.
First, the broadcast link feeds arriving buffers into the tail of an
arrival queue, whose head is consumed at the socket level, and where
destination lookup is performed. Second, if the lookup is successful,
the resulting buffer clones are fed into a second queue, the input
queue. This queue is consumed at reception in the socket just like
in the unicast case. Both queues are protected by the same lock: the
one belonging to the input queue.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3eaa931..81b1fef 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -79,6 +79,13 @@
 		tipc_link_reset_all(node);
 }
 
+void tipc_bclink_input(struct net *net)
+{
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+
+	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
+}
+
 uint  tipc_bclink_get_mtu(void)
 {
 	return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@
 	tipc_node_unlock(n_ptr);
 }
 
-/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
  *                    and to identified node local sockets
  * @net: the applicable net namespace
  * @list: chain of buffers containing message
@@ -371,6 +378,8 @@
 	int rc = 0;
 	int bc = 0;
 	struct sk_buff *skb;
+	struct sk_buff_head arrvq;
+	struct sk_buff_head inputq;
 
 	/* Prepare clone of message for local node */
 	skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@
 		return -EHOSTUNREACH;
 	}
 
-	/* Broadcast to all other nodes */
+	/* Broadcast to all nodes */
 	if (likely(bclink)) {
 		tipc_bclink_lock(net);
 		if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@
 	if (unlikely(!bc))
 		__skb_queue_purge(list);
 
-	/* Deliver message clone */
-	if (likely(!rc))
-		tipc_sk_mcast_rcv(net, skb);
-	else
+	if (unlikely(rc)) {
 		kfree_skb(skb);
-
+		return rc;
+	}
+	/* Deliver message clone */
+	__skb_queue_head_init(&arrvq);
+	skb_queue_head_init(&inputq);
+	__skb_queue_tail(&arrvq, skb);
+	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
 	return rc;
 }
 
@@ -449,7 +461,7 @@
 	int deferred = 0;
 	int pos = 0;
 	struct sk_buff *iskb;
-	struct sk_buff_head msgs;
+	struct sk_buff_head *arrvq, *inputq;
 
 	/* Screen out unwanted broadcast messages */
 	if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@
 	/* Handle in-sequence broadcast message */
 	seqno = msg_seqno(msg);
 	next_in = mod(node->bclink.last_in + 1);
+	arrvq = &tn->bclink->arrvq;
+	inputq = &tn->bclink->inputq;
 
 	if (likely(seqno == next_in)) {
 receive:
@@ -493,21 +507,26 @@
 		if (likely(msg_isdata(msg))) {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
+			spin_lock_bh(&inputq->lock);
+			__skb_queue_tail(arrvq, buf);
+			spin_unlock_bh(&inputq->lock);
+			node->action_flags |= TIPC_BCAST_MSG_EVT;
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-			if (likely(msg_mcast(msg)))
-				tipc_sk_mcast_rcv(net, buf);
-			else
-				kfree_skb(buf);
 		} else if (msg_user(msg) == MSG_BUNDLER) {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_bundles++;
 			bcl->stats.recv_bundled += msg_msgcnt(msg);
+			pos = 0;
+			while (tipc_msg_extract(buf, &iskb, &pos)) {
+				spin_lock_bh(&inputq->lock);
+				__skb_queue_tail(arrvq, iskb);
+				spin_unlock_bh(&inputq->lock);
+			}
+			node->action_flags |= TIPC_BCAST_MSG_EVT;
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-			while (tipc_msg_extract(buf, &iskb, &pos))
-				tipc_sk_mcast_rcv(net, iskb);
 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
 			tipc_buf_append(&node->bclink.reasm_buf, &buf);
 			if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@
 			}
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
-			tipc_bclink_lock(net);
-			bclink_accept_pkt(node, seqno);
-			tipc_bclink_unlock(net);
-			tipc_node_unlock(node);
-			skb_queue_head_init(&msgs);
-			skb_queue_tail(&msgs, buf);
-			tipc_named_rcv(net, &msgs);
 		} else {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@
 	skb_queue_head_init(&bcl->wakeupq);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
+	__skb_queue_head_init(&bclink->arrvq);
+	skb_queue_head_init(&bclink->inputq);
 	bcl->owner = &bclink->node;
 	bcl->owner->net = net;
 	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;