tipc: make replicast a user selectable option

If the bearer carrying multicast messages supports broadcast, those
messages will be sent to all cluster nodes, irrespective of whether
these nodes host any actual destination sockets or not. This is clearly
wasteful if the cluster is large and there are only a few real
destinations for the message being sent.

In this commit we extend the eligibility of the newly introduced
"replicast" transmit option. We now make it possible for a user to
select which method is to be used, either as a mandatory setting
via setsockopt(), or as a relative setting where we let the broadcast
layer decide which method to use based on the ratio between cluster
size and the message's actual number of destination nodes.

In the latter case, a sending socket must stick to a previously
selected method until it enters an idle period of at least 5 seconds.
This eliminates the risk of message reordering caused by method change,
i.e., when changes to cluster size or number of destinations would
otherwise mandate a new method to be used.

Reviewed-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 93b6ae3..5bec8aa 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -79,6 +79,7 @@
  * @rcv_unacked: # messages read by user, but not yet acked back to peer
  * @peer: 'connected' peer for dgram/rdm
  * @node: hash table node
+ * @mc_method: cookie for use between socket and broadcast layer
  * @rcu: rcu struct for tipc_sock
  */
 struct tipc_sock {
@@ -103,6 +104,7 @@
 	u16 rcv_win;
 	struct sockaddr_tipc peer;
 	struct rhash_head node;
+	struct tipc_mc_method mc_method;
 	struct rcu_head rcu;
 };
 
@@ -740,6 +742,7 @@
 	struct tipc_msg *hdr = &tsk->phdr;
 	struct net *net = sock_net(sk);
 	int mtu = tipc_bcast_get_mtu(net);
+	struct tipc_mc_method *method = &tsk->mc_method;
 	u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
 	struct sk_buff_head pkts;
 	struct tipc_nlist dsts;
@@ -773,7 +776,7 @@
 
 	/* Send message if build was successful */
 	if (unlikely(rc == dlen))
-		rc = tipc_mcast_xmit(net, &pkts, &dsts,
+		rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
 				     &tsk->cong_link_cnt);
 
 	tipc_nlist_purge(&dsts);
@@ -2344,18 +2347,29 @@
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
-	u32 value;
+	u32 value = 0;
 	int res;
 
 	if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
 		return 0;
 	if (lvl != SOL_TIPC)
 		return -ENOPROTOOPT;
-	if (ol < sizeof(value))
-		return -EINVAL;
-	res = get_user(value, (u32 __user *)ov);
-	if (res)
-		return res;
+
+	switch (opt) {
+	case TIPC_IMPORTANCE:
+	case TIPC_SRC_DROPPABLE:
+	case TIPC_DEST_DROPPABLE:
+	case TIPC_CONN_TIMEOUT:
+		if (ol < sizeof(value))
+			return -EINVAL;
+		res = get_user(value, (u32 __user *)ov);
+		if (res)
+			return res;
+		break;
+	default:
+		if (ov || ol)
+			return -EINVAL;
+	}
 
 	lock_sock(sk);
 
@@ -2376,6 +2390,14 @@
 		tipc_sk(sk)->conn_timeout = value;
 		/* no need to set "res", since already 0 at this point */
 		break;
+	case TIPC_MCAST_BROADCAST:
+		tsk->mc_method.rcast = false;
+		tsk->mc_method.mandatory = true;
+		break;
+	case TIPC_MCAST_REPLICAST:
+		tsk->mc_method.rcast = true;
+		tsk->mc_method.mandatory = true;
+		break;
 	default:
 		res = -EINVAL;
 	}