tipc: decouple the relationship between bearer and link

Currently on both paths of message transmission and reception, the
read lock of tipc_net_lock must be held before bearer is accessed,
while the write lock of tipc_net_lock has to be taken before bearer
is configured. Although this ensures that bearer is always valid on
the two data paths, link and bearer are closely bound together.

So, as part of the effort to remove tipc_net_lock, the locking
policy for bearer protection will be adjusted as follows: on the two
data paths, RCU is used, and on the configuration path of bearer,
the RTNL lock is applied.

Currently RCU only covers the path of message reception. To make it
possible to protect the path of message transmission with RCU as well,
a link should not use its stored bearer pointer to access the bearer.
Instead, it should use the identity of its attached bearer as an index
to get the bearer instance from the bearer_list array, which decouples
the relationship between bearer and link. As a result, the bearer on
the path of message transmission can be safely protected by RCU when
the bearer_list array is accessed within an RCU read-side critical
section.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Tested-by: Erik Hugne <erik.hugne@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index c5190ab..229d478 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -101,9 +101,18 @@
 
 static void link_init_max_pkt(struct tipc_link *l_ptr)
 {
+	struct tipc_bearer *b_ptr;
 	u32 max_pkt;
 
-	max_pkt = (l_ptr->b_ptr->mtu & ~3);
+	rcu_read_lock();
+	b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
+	if (!b_ptr) {
+		rcu_read_unlock();
+		return;
+	}
+	max_pkt = (b_ptr->mtu & ~3);
+	rcu_read_unlock();
+
 	if (max_pkt > MAX_MSG_SIZE)
 		max_pkt = MAX_MSG_SIZE;
 
@@ -248,7 +257,7 @@
 	l_ptr->owner = n_ptr;
 	l_ptr->checkpoint = 1;
 	l_ptr->peer_session = INVALID_SESSION;
-	l_ptr->b_ptr = b_ptr;
+	l_ptr->bearer_id = b_ptr->identity;
 	link_set_supervision_props(l_ptr, b_ptr->tolerance);
 	l_ptr->state = RESET_UNKNOWN;
 
@@ -263,6 +272,7 @@
 	l_ptr->priority = b_ptr->priority;
 	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
 
+	l_ptr->net_plane = b_ptr->net_plane;
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
@@ -426,7 +436,7 @@
 		return;
 
 	tipc_node_link_down(l_ptr->owner, l_ptr);
-	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
+	tipc_bearer_remove_dest(l_ptr->bearer_id, l_ptr->addr);
 
 	if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
 		l_ptr->reset_checkpoint = checkpoint;
@@ -477,7 +487,7 @@
 {
 	l_ptr->next_in_no = l_ptr->stats.recv_info = 1;
 	tipc_node_link_up(l_ptr->owner, l_ptr);
-	tipc_bearer_add_dest(l_ptr->b_ptr, l_ptr->addr);
+	tipc_bearer_add_dest(l_ptr->bearer_id, l_ptr->addr);
 }
 
 /**
@@ -777,7 +787,7 @@
 	if (likely(!link_congested(l_ptr))) {
 		link_add_to_outqueue(l_ptr, buf, msg);
 
-		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
 		l_ptr->unacked_window = 0;
 		return dsz;
 	}
@@ -941,7 +951,7 @@
 	if (likely(!link_congested(l_ptr))) {
 		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
 			link_add_to_outqueue(l_ptr, buf, msg);
-			tipc_bearer_send(l_ptr->b_ptr, buf,
+			tipc_bearer_send(l_ptr->bearer_id, buf,
 					 &l_ptr->media_addr);
 			l_ptr->unacked_window = 0;
 			return res;
@@ -1204,7 +1214,7 @@
 	if (r_q_size && buf) {
 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
 		l_ptr->retransm_queue_head = mod(++r_q_head);
 		l_ptr->retransm_queue_size = --r_q_size;
 		l_ptr->stats.retransmitted++;
@@ -1216,7 +1226,7 @@
 	if (buf) {
 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
 		l_ptr->unacked_window = 0;
 		kfree_skb(buf);
 		l_ptr->proto_msg_queue = NULL;
@@ -1233,7 +1243,8 @@
 		if (mod(next - first) < l_ptr->queue_limit[0]) {
 			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
 			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-			tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+			tipc_bearer_send(l_ptr->bearer_id, buf,
+					 &l_ptr->media_addr);
 			if (msg_user(msg) == MSG_BUNDLER)
 				msg_set_type(msg, CLOSED_MSG);
 			l_ptr->next_out = buf->next;
@@ -1352,7 +1363,7 @@
 		msg = buf_msg(buf);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
 		buf = buf->next;
 		retransmits--;
 		l_ptr->stats.retransmitted++;
@@ -1440,7 +1451,7 @@
 /**
  * tipc_rcv - process TIPC packets/messages arriving from off-node
  * @head: pointer to message buffer chain
- * @tb_ptr: pointer to bearer message arrived on
+ * @b_ptr: pointer to bearer message arrived on
  *
  * Invoked with no locks held.  Bearer pointer must point to a valid bearer
  * structure (i.e. cannot be NULL), but bearer can be inactive.
@@ -1752,7 +1763,7 @@
 
 	/* Create protocol message with "out-of-sequence" sequence number */
 	msg_set_type(msg, msg_typ);
-	msg_set_net_plane(msg, l_ptr->b_ptr->net_plane);
+	msg_set_net_plane(msg, l_ptr->net_plane);
 	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 	msg_set_last_bcast(msg, tipc_bclink_get_last_sent());
 
@@ -1818,7 +1829,7 @@
 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 	buf->priority = TC_PRIO_CONTROL;
 
-	tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+	tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
 	l_ptr->unacked_window = 0;
 	kfree_skb(buf);
 }
@@ -1843,9 +1854,9 @@
 	/* record unnumbered packet arrival (force mismatch on next timeout) */
 	l_ptr->checkpoint--;
 
-	if (l_ptr->b_ptr->net_plane != msg_net_plane(msg))
+	if (l_ptr->net_plane != msg_net_plane(msg))
 		if (tipc_own_addr > msg_prevnode(msg))
-			l_ptr->b_ptr->net_plane = msg_net_plane(msg);
+			l_ptr->net_plane = msg_net_plane(msg);
 
 	switch (msg_type(msg)) {
 
@@ -2793,7 +2804,13 @@
 
 static void link_print(struct tipc_link *l_ptr, const char *str)
 {
-	pr_info("%s Link %x<%s>:", str, l_ptr->addr, l_ptr->b_ptr->name);
+	struct tipc_bearer *b_ptr;
+
+	rcu_read_lock();
+	b_ptr = rcu_dereference_rtnl(bearer_list[l_ptr->bearer_id]);
+	if (b_ptr)
+		pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
+	rcu_read_unlock();
 
 	if (link_working_unknown(l_ptr))
 		pr_cont(":WU\n");