tipc: convert node lock to rwlock

According to the node FSM a node in state SELF_UP_PEER_UP cannot
change state inside a lock context, except when a TUNNEL_PROTOCOL
(SYNCH or FAILOVER) packet arrives. However, the node's individual
links may still change state.

Since each link now is protected by its own spinlock, we finally have
the conditions in place to convert the node spinlock to an rwlock_t.
If the node state and arriving packet type are rigth, we can let the
link directly receive the packet under protection of its own spinlock
and the node lock in read mode. In all other cases we use the node
lock in write mode. This enables full concurrent execution between
parallel links during steady-state traffic situations, i.e., 99+ %
of the time.

This commit implements this change.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 572063a..47d5f84 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -141,10 +141,63 @@
 	return NULL;
 }
 
+void tipc_node_read_lock(struct tipc_node *n)
+{
+	read_lock_bh(&n->lock);
+}
+
+void tipc_node_read_unlock(struct tipc_node *n)
+{
+	read_unlock_bh(&n->lock);
+}
+
+static void tipc_node_write_lock(struct tipc_node *n)
+{
+	write_lock_bh(&n->lock);
+}
+
+static void tipc_node_write_unlock(struct tipc_node *n)
+{
+	struct net *net = n->net;
+	u32 addr = 0;
+	u32 flags = n->action_flags;
+	u32 link_id = 0;
+	struct list_head *publ_list;
+
+	if (likely(!flags)) {
+		write_unlock_bh(&n->lock);
+		return;
+	}
+
+	addr = n->addr;
+	link_id = n->link_id;
+	publ_list = &n->publ_list;
+
+	n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
+			     TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
+
+	write_unlock_bh(&n->lock);
+
+	if (flags & TIPC_NOTIFY_NODE_DOWN)
+		tipc_publ_notify(net, publ_list, addr);
+
+	if (flags & TIPC_NOTIFY_NODE_UP)
+		tipc_named_node_up(net, addr);
+
+	if (flags & TIPC_NOTIFY_LINK_UP)
+		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
+				     TIPC_NODE_SCOPE, link_id, addr);
+
+	if (flags & TIPC_NOTIFY_LINK_DOWN)
+		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
+				      link_id, addr);
+}
+
 struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_node *n_ptr, *temp_node;
+	int i;
 
 	spin_lock_bh(&tn->node_list_lock);
 	n_ptr = tipc_node_find(net, addr);
@@ -159,7 +212,7 @@
 	n_ptr->net = net;
 	n_ptr->capabilities = capabilities;
 	kref_init(&n_ptr->kref);
-	spin_lock_init(&n_ptr->lock);
+	rwlock_init(&n_ptr->lock);
 	INIT_HLIST_NODE(&n_ptr->hash);
 	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->publ_list);
@@ -168,6 +221,8 @@
 	skb_queue_head_init(&n_ptr->bc_entry.inputq1);
 	__skb_queue_head_init(&n_ptr->bc_entry.arrvq);
 	skb_queue_head_init(&n_ptr->bc_entry.inputq2);
+	for (i = 0; i < MAX_BEARERS; i++)
+		spin_lock_init(&n_ptr->links[i].lock);
 	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
 	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
 		if (n_ptr->addr < temp_node->addr)
@@ -246,9 +301,9 @@
 		pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
 		return;
 	}
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	list_add_tail(subscr, &n->publ_list);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_node_put(n);
 }
 
@@ -264,9 +319,9 @@
 		pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
 		return;
 	}
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	list_del_init(subscr);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_node_put(n);
 }
 
@@ -293,9 +348,9 @@
 	conn->port = port;
 	conn->peer_port = peer_port;
 
-	tipc_node_lock(node);
+	tipc_node_write_lock(node);
 	list_add_tail(&conn->list, &node->conn_sks);
-	tipc_node_unlock(node);
+	tipc_node_write_unlock(node);
 exit:
 	tipc_node_put(node);
 	return err;
@@ -313,14 +368,14 @@
 	if (!node)
 		return;
 
-	tipc_node_lock(node);
+	tipc_node_write_lock(node);
 	list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
 		if (port != conn->port)
 			continue;
 		list_del(&conn->list);
 		kfree(conn);
 	}
-	tipc_node_unlock(node);
+	tipc_node_write_unlock(node);
 	tipc_node_put(node);
 }
 
@@ -337,7 +392,7 @@
 	__skb_queue_head_init(&xmitq);
 
 	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		le = &n->links[bearer_id];
 		spin_lock_bh(&le->lock);
 		if (le->link) {
@@ -346,7 +401,7 @@
 			rc = tipc_link_timeout(le->link, &xmitq);
 		}
 		spin_unlock_bh(&le->lock);
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
 		tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
 		if (rc & TIPC_LINK_DOWN_EVT)
 			tipc_node_link_down(n, bearer_id, false);
@@ -425,9 +480,9 @@
 static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
 			      struct sk_buff_head *xmitq)
 {
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	__tipc_node_link_up(n, bearer_id, xmitq);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 }
 
 /**
@@ -516,7 +571,7 @@
 
 	__skb_queue_head_init(&xmitq);
 
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	if (!tipc_link_is_establishing(l)) {
 		__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
 		if (delete) {
@@ -528,7 +583,7 @@
 		/* Defuse pending tipc_node_link_up() */
 		tipc_link_fsm_evt(l, LINK_RESET_EVT);
 	}
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
 	tipc_sk_rcv(n->net, &le->inputq);
 }
@@ -561,7 +616,7 @@
 	if (!n)
 		return;
 
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 
 	le = &n->links[b->identity];
 
@@ -656,7 +711,6 @@
 		if (n->state == NODE_FAILINGOVER)
 			tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
 		le->link = l;
-		spin_lock_init(&le->lock);
 		n->link_cnt++;
 		tipc_node_calculate_timer(n, l);
 		if (n->link_cnt == 1)
@@ -665,7 +719,7 @@
 	}
 	memcpy(&le->maddr, maddr, sizeof(*maddr));
 exit:
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	if (reset && !tipc_link_is_reset(l))
 		tipc_node_link_down(n, b->identity, false);
 	tipc_node_put(n);
@@ -873,24 +927,6 @@
 	pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
 }
 
-bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
-{
-	int state = n->state;
-
-	if (likely(state == SELF_UP_PEER_UP))
-		return true;
-
-	if (state == SELF_LEAVING_PEER_DOWN)
-		return false;
-
-	if (state == SELF_DOWN_PEER_LEAVING) {
-		if (msg_peer_node_is_up(hdr))
-			return false;
-	}
-
-	return true;
-}
-
 static void node_lost_contact(struct tipc_node *n,
 			      struct sk_buff_head *inputq)
 {
@@ -952,56 +988,18 @@
 	if (bearer_id >= MAX_BEARERS)
 		goto exit;
 
-	tipc_node_lock(node);
+	tipc_node_read_lock(node);
 	link = node->links[bearer_id].link;
 	if (link) {
 		strncpy(linkname, link->name, len);
 		err = 0;
 	}
 exit:
-	tipc_node_unlock(node);
+	tipc_node_read_unlock(node);
 	tipc_node_put(node);
 	return err;
 }
 
-void tipc_node_unlock(struct tipc_node *node)
-{
-	struct net *net = node->net;
-	u32 addr = 0;
-	u32 flags = node->action_flags;
-	u32 link_id = 0;
-	struct list_head *publ_list;
-
-	if (likely(!flags)) {
-		spin_unlock_bh(&node->lock);
-		return;
-	}
-
-	addr = node->addr;
-	link_id = node->link_id;
-	publ_list = &node->publ_list;
-
-	node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
-				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
-
-	spin_unlock_bh(&node->lock);
-
-	if (flags & TIPC_NOTIFY_NODE_DOWN)
-		tipc_publ_notify(net, publ_list, addr);
-
-	if (flags & TIPC_NOTIFY_NODE_UP)
-		tipc_named_node_up(net, addr);
-
-	if (flags & TIPC_NOTIFY_LINK_UP)
-		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
-				     TIPC_NODE_SCOPE, link_id, addr);
-
-	if (flags & TIPC_NOTIFY_LINK_DOWN)
-		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
-				      link_id, addr);
-
-}
-
 /* Caller should hold node lock for the passed node */
 static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
 {
@@ -1048,40 +1046,38 @@
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 		   u32 dnode, int selector)
 {
-	struct tipc_link_entry *le;
+	struct tipc_link_entry *le = NULL;
 	struct tipc_node *n;
 	struct sk_buff_head xmitq;
-	struct tipc_media_addr *maddr = NULL;
 	int bearer_id = -1;
 	int rc = -EHOSTUNREACH;
 
 	__skb_queue_head_init(&xmitq);
 	n = tipc_node_find(net, dnode);
 	if (likely(n)) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		bearer_id = n->active_links[selector & 1];
 		if (bearer_id >= 0) {
 			le = &n->links[bearer_id];
-			maddr = &le->maddr;
 			spin_lock_bh(&le->lock);
-			if (likely(le->link))
-				rc = tipc_link_xmit(le->link, list, &xmitq);
+			rc = tipc_link_xmit(le->link, list, &xmitq);
 			spin_unlock_bh(&le->lock);
 		}
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
+		if (likely(!skb_queue_empty(&xmitq))) {
+			tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
+			return 0;
+		}
 		if (unlikely(rc == -ENOBUFS))
 			tipc_node_link_down(n, bearer_id, false);
 		tipc_node_put(n);
+		return rc;
 	}
-	if (likely(!skb_queue_empty(&xmitq))) {
-		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
-		return 0;
-	}
-	if (likely(in_own_node(net, dnode))) {
-		tipc_sk_rcv(net, list);
-		return 0;
-	}
-	return rc;
+
+	if (unlikely(!in_own_node(net, dnode)))
+		return rc;
+	tipc_sk_rcv(net, list);
+	return 0;
 }
 
 /* tipc_node_xmit_skb(): send single buffer to destination
@@ -1171,9 +1167,9 @@
 
 	/* Broadcast ACKs are sent on a unicast link */
 	if (rc & TIPC_LINK_SND_BC_ACK) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		tipc_link_build_ack_msg(le->link, &xmitq);
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
 	}
 
 	if (!skb_queue_empty(&xmitq))
@@ -1229,7 +1225,7 @@
 		}
 	}
 
-	/* Update node accesibility if applicable */
+	/* Check and update node accesibility if applicable */
 	if (state == SELF_UP_PEER_COMING) {
 		if (!tipc_link_is_up(l))
 			return true;
@@ -1245,6 +1241,9 @@
 		return true;
 	}
 
+	if (state == SELF_LEAVING_PEER_DOWN)
+		return false;
+
 	/* Ignore duplicate packets */
 	if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
 		return true;
@@ -1361,21 +1360,29 @@
 	else if (unlikely(n->bc_entry.link->acked != bc_ack))
 		tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
 
-	tipc_node_lock(n);
-
-	/* Is reception permitted at the moment ? */
-	if (!tipc_node_filter_pkt(n, hdr))
-		goto unlock;
-
-	/* Check and if necessary update node state */
-	if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
+	/* Receive packet directly if conditions permit */
+	tipc_node_read_lock(n);
+	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
 		spin_lock_bh(&le->lock);
-		rc = tipc_link_rcv(le->link, skb, &xmitq);
+		if (le->link) {
+			rc = tipc_link_rcv(le->link, skb, &xmitq);
+			skb = NULL;
+		}
 		spin_unlock_bh(&le->lock);
-		skb = NULL;
 	}
-unlock:
-	tipc_node_unlock(n);
+	tipc_node_read_unlock(n);
+
+	/* Check/update node state before receiving */
+	if (unlikely(skb)) {
+		tipc_node_write_lock(n);
+		if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
+			if (le->link) {
+				rc = tipc_link_rcv(le->link, skb, &xmitq);
+				skb = NULL;
+			}
+		}
+		tipc_node_write_unlock(n);
+	}
 
 	if (unlikely(rc & TIPC_LINK_UP_EVT))
 		tipc_node_link_up(n, bearer_id, &xmitq);
@@ -1440,15 +1447,15 @@
 				continue;
 		}
 
-		tipc_node_lock(node);
+		tipc_node_read_lock(node);
 		err = __tipc_nl_add_node(&msg, node);
 		if (err) {
 			last_addr = node->addr;
-			tipc_node_unlock(node);
+			tipc_node_read_unlock(node);
 			goto out;
 		}
 
-		tipc_node_unlock(node);
+		tipc_node_read_unlock(node);
 	}
 	done = 1;
 out: