tipc: use message to abort connections when losing contact to node

In the current implementation, each 'struct tipc_node' instance keeps
a linked list of those ports/sockets that are connected to the node
represented by that struct. The purpose of this is to let the node
object know which sockets to alert when it loses contact with its peer
node, i.e., which sockets need to have their connections aborted.

This entails an unwanted direct reference from the node structure
back to the port/socket structure, and a need to grab port_lock
when we have to make an upcall to the port. We want to get rid of
this unecessary BH entry point into the socket, and also eliminate
its use of port_lock.

In this commit, we instead let the node struct keep list of "connected
socket" structs, which each represents a connected socket, but is
allocated independently by the node at the moment of connection. If
the node loses contact with its peer node, the list is traversed, and
a "connection abort" message is created for each entry in the list. The
message is sent to it respective connected socket using the ordinary
data path, and the receiving socket aborts its connections upon reception
of the message.

This enables us to get rid of the direct reference from 'struct node' to
´struct port', and another unwanted BH access point to the latter.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6ea2c15..17e6378 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -51,6 +51,13 @@
 static u32 tipc_num_links;
 static DEFINE_SPINLOCK(node_list_lock);
 
+struct tipc_sock_conn {
+	u32 port;
+	u32 peer_port;
+	u32 peer_node;
+	struct list_head list;
+};
+
 /*
  * A trivial power-of-two bitmask technique is used for speed, since this
  * operation is done for every incoming TIPC packet. The number of hash table
@@ -101,6 +108,7 @@
 	INIT_HLIST_NODE(&n_ptr->hash);
 	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->nsub);
+	INIT_LIST_HEAD(&n_ptr->conn_sks);
 	__skb_queue_head_init(&n_ptr->waiting_sks);
 
 	hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
@@ -138,6 +146,71 @@
 	spin_unlock_bh(&node_list_lock);
 }
 
+int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port)
+{
+	struct tipc_node *node;
+	struct tipc_sock_conn *conn;
+
+	if (in_own_node(dnode))
+		return 0;
+
+	node = tipc_node_find(dnode);
+	if (!node) {
+		pr_warn("Connecting sock to node 0x%x failed\n", dnode);
+		return -EHOSTUNREACH;
+	}
+	conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
+	if (!conn)
+		return -EHOSTUNREACH;
+	conn->peer_node = dnode;
+	conn->port = port;
+	conn->peer_port = peer_port;
+
+	tipc_node_lock(node);
+	list_add_tail(&conn->list, &node->conn_sks);
+	tipc_node_unlock(node);
+	return 0;
+}
+
+void tipc_node_remove_conn(u32 dnode, u32 port)
+{
+	struct tipc_node *node;
+	struct tipc_sock_conn *conn, *safe;
+
+	if (in_own_node(dnode))
+		return;
+
+	node = tipc_node_find(dnode);
+	if (!node)
+		return;
+
+	tipc_node_lock(node);
+	list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
+		if (port != conn->port)
+			continue;
+		list_del(&conn->list);
+		kfree(conn);
+	}
+	tipc_node_unlock(node);
+}
+
+void tipc_node_abort_sock_conns(struct list_head *conns)
+{
+	struct tipc_sock_conn *conn, *safe;
+	struct sk_buff *buf;
+
+	list_for_each_entry_safe(conn, safe, conns, list) {
+		buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
+				      SHORT_H_SIZE, 0, tipc_own_addr,
+				      conn->peer_node, conn->port,
+				      conn->peer_port, TIPC_ERR_NO_NODE);
+		if (likely(buf))
+			tipc_sk_rcv(buf);
+		list_del(&conn->list);
+		kfree(conn);
+	}
+}
+
 /**
  * tipc_node_link_up - handle addition of link
  *
@@ -476,6 +549,7 @@
 void tipc_node_unlock(struct tipc_node *node)
 {
 	LIST_HEAD(nsub_list);
+	LIST_HEAD(conn_sks);
 	struct sk_buff_head waiting_sks;
 	u32 addr = 0;
 
@@ -491,6 +565,7 @@
 	}
 	if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) {
 		list_replace_init(&node->nsub, &nsub_list);
+		list_replace_init(&node->conn_sks, &conn_sks);
 		node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
 	}
 	if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
@@ -502,6 +577,9 @@
 	while (!skb_queue_empty(&waiting_sks))
 		tipc_sk_rcv(__skb_dequeue(&waiting_sks));
 
+	if (!list_empty(&conn_sks))
+		tipc_node_abort_sock_conns(&conn_sks);
+
 	if (!list_empty(&nsub_list))
 		tipc_nodesub_notify(&nsub_list);