Merge branch 'tipc-next'

Ying Xue says:

====================
tipc: clean up bearer and node layer

This is another commit series which aims at facilitating future
changes to the locking policy around nodes, links and bearers.

Currently, the tipc routing hierarchy comprises the structures 'node',
'link' and 'bearer'. The whole hierarchy is protected by a big
read/write lock (tipc_net_lock), to ensure that nothing is added or
removed while any of these structures is being accessed. Obviously,
this locking policy binds the node, link and bearer components closely
together, so that their relationship becomes extremely complex.
In the worst case, such a locking policy not only has a negative
influence on performance, but is also prone to occasional deadlocks.

In order to decouple the complex relationship between bearer and node
as well as link, the locking policy is adjusted as follows:

- Bearer level
  The RTNL lock is used on the update side, and RCU is used on the
  read side. Meanwhile, all bearer instances, including the broadcast
  bearer, are saved into the bearer_list array.

- Node and link level
  All node instances are saved into two lists, tipc_node_list and
  node_htable. The two lists are protected by node_list_lock on the
  write side, and they are guarded with the RCU lock on the read side.
  All members in the node structure, including link instances, are
  protected by the node spin lock.

- The relationship between bearer and node
  When a link accesses a bearer, it first needs to find the bearer by
  its bearer identity in the bearer_list array. When a bearer accesses
  a node, it can iterate over the node_htable hash list, using the
  node address, to find the corresponding node.

In the new locking policy, every component has its own private locking
solution, and the relationship between bearer and node is very simple:
they can find each other by node address or bearer identity via the
node_htable hash list or the bearer_list array.

But, prior to these changes, we need to do some necessary cleanup and
code consolidation. This is what we do with this commit series. In a
later series we will replace net_lock with RTNL as well as RCU lock
to deploy the new locking policy.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index e0feb7e..95ab5ef 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -41,9 +41,9 @@
 #include "bcast.h"
 #include "name_distr.h"
 
-#define MAX_PKT_DEFAULT_MCAST 1500	/* bcast link max packet size (fixed) */
-
-#define BCLINK_WIN_DEFAULT 20		/* bcast link window size (default) */
+#define	MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
+#define	BCLINK_WIN_DEFAULT	20	/* bcast link window size (default) */
+#define	BCBEARER		MAX_BEARERS
 
 /**
  * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
@@ -668,9 +668,8 @@
 	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
 
 	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
-		struct tipc_bearer *b = &tipc_bearers[b_index];
-
-		if (!b->active || !b->nodes.count)
+		struct tipc_bearer *b = bearer_list[b_index];
+		if (!b || !b->nodes.count)
 			continue;
 
 		if (!bp_temp[b->priority].primary)
@@ -785,6 +784,7 @@
 	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
 	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
 	bcl->b_ptr = &bcbearer->bearer;
+	bearer_list[BCBEARER] = &bcbearer->bearer;
 	bcl->state = WORKING_WORKING;
 	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
 }
@@ -795,6 +795,7 @@
 	tipc_link_purge_queues(bcl);
 	spin_unlock_bh(&bc_lock);
 
+	bearer_list[BCBEARER] = NULL;
 	memset(bclink, 0, sizeof(*bclink));
 	memset(bcbearer, 0, sizeof(*bcbearer));
 }
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 7f1f95c..ed45f97 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -49,7 +49,7 @@
 	NULL
 };
 
-struct tipc_bearer tipc_bearers[MAX_BEARERS];
+struct tipc_bearer *bearer_list[MAX_BEARERS + 1];
 
 static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down);
 
@@ -177,8 +177,9 @@
 	struct tipc_bearer *b_ptr;
 	u32 i;
 
-	for (i = 0, b_ptr = tipc_bearers; i < MAX_BEARERS; i++, b_ptr++) {
-		if (b_ptr->active && (!strcmp(b_ptr->name, name)))
+	for (i = 0; i < MAX_BEARERS; i++) {
+		b_ptr = bearer_list[i];
+		if (b_ptr && (!strcmp(b_ptr->name, name)))
 			return b_ptr;
 	}
 	return NULL;
@@ -200,8 +201,10 @@
 	read_lock_bh(&tipc_net_lock);
 	for (i = 0; media_info_array[i] != NULL; i++) {
 		for (j = 0; j < MAX_BEARERS; j++) {
-			b = &tipc_bearers[j];
-			if (b->active && (b->media == media_info_array[i])) {
+			b = bearer_list[j];
+			if (!b)
+				continue;
+			if (b->media == media_info_array[i]) {
 				tipc_cfg_append_tlv(buf, TIPC_TLV_BEARER_NAME,
 						    b->name,
 						    strlen(b->name) + 1);
@@ -284,16 +287,17 @@
 	bearer_id = MAX_BEARERS;
 	with_this_prio = 1;
 	for (i = MAX_BEARERS; i-- != 0; ) {
-		if (!tipc_bearers[i].active) {
+		b_ptr = bearer_list[i];
+		if (!b_ptr) {
 			bearer_id = i;
 			continue;
 		}
-		if (!strcmp(name, tipc_bearers[i].name)) {
+		if (!strcmp(name, b_ptr->name)) {
 			pr_warn("Bearer <%s> rejected, already enabled\n",
 				name);
 			goto exit;
 		}
-		if ((tipc_bearers[i].priority == priority) &&
+		if ((b_ptr->priority == priority) &&
 		    (++with_this_prio > 2)) {
 			if (priority-- == 0) {
 				pr_warn("Bearer <%s> rejected, duplicate priority\n",
@@ -311,7 +315,11 @@
 		goto exit;
 	}
 
-	b_ptr = &tipc_bearers[bearer_id];
+	b_ptr = kzalloc(sizeof(*b_ptr), GFP_ATOMIC);
+	if (!b_ptr) {
+		res = -ENOMEM;
+		goto exit;
+	}
 	strcpy(b_ptr->name, name);
 	b_ptr->media = m_ptr;
 	res = m_ptr->enable_media(b_ptr);
@@ -325,7 +333,6 @@
 	b_ptr->tolerance = m_ptr->tolerance;
 	b_ptr->window = m_ptr->window;
 	b_ptr->net_plane = bearer_id + 'A';
-	b_ptr->active = 1;
 	b_ptr->priority = priority;
 
 	res = tipc_disc_create(b_ptr, &b_ptr->bcast_addr, disc_domain);
@@ -335,6 +342,9 @@
 			name);
 		goto exit;
 	}
+
+	bearer_list[bearer_id] = b_ptr;
+
 	pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
 		name,
 		tipc_addr_string_fill(addr_string, disc_domain), priority);
@@ -362,13 +372,22 @@
  */
 static void bearer_disable(struct tipc_bearer *b_ptr, bool shutting_down)
 {
+	u32 i;
+
 	pr_info("Disabling bearer <%s>\n", b_ptr->name);
 	b_ptr->media->disable_media(b_ptr);
 
 	tipc_link_delete_list(b_ptr->identity, shutting_down);
 	if (b_ptr->link_req)
 		tipc_disc_delete(b_ptr->link_req);
-	memset(b_ptr, 0, sizeof(struct tipc_bearer));
+
+	for (i = 0; i < MAX_BEARERS; i++) {
+		if (b_ptr == bearer_list[i]) {
+			bearer_list[i] = NULL;
+			break;
+		}
+	}
+	kfree(b_ptr);
 }
 
 int tipc_disable_bearer(const char *name)
@@ -603,10 +622,14 @@
 
 void tipc_bearer_stop(void)
 {
+	struct tipc_bearer *b_ptr;
 	u32 i;
 
 	for (i = 0; i < MAX_BEARERS; i++) {
-		if (tipc_bearers[i].active)
-			bearer_disable(&tipc_bearers[i], true);
+		b_ptr = bearer_list[i];
+		if (b_ptr) {
+			bearer_disable(b_ptr, true);
+			bearer_list[i] = NULL;
+		}
 	}
 }
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 425dd81..3f6d7d0 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -118,7 +118,6 @@
  * @tolerance: default link tolerance for bearer
  * @identity: array index of this bearer within TIPC bearer array
  * @link_req: ptr to (optional) structure making periodic link setup requests
- * @active: non-zero if bearer structure is represents a bearer
  * @net_plane: network plane ('A' through 'H') currently associated with bearer
  * @nodes: indicates which nodes in cluster can be reached through bearer
  *
@@ -138,7 +137,6 @@
 	u32 tolerance;
 	u32 identity;
 	struct tipc_link_req *link_req;
-	int active;
 	char net_plane;
 	struct tipc_node_map nodes;
 };
@@ -150,7 +148,7 @@
 
 struct tipc_link;
 
-extern struct tipc_bearer tipc_bearers[];
+extern struct tipc_bearer *bearer_list[];
 
 /*
  * TIPC routines available to supported media types
diff --git a/net/tipc/config.c b/net/tipc/config.c
index e6d7216..4b981c0 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -43,13 +43,11 @@
 #define REPLY_TRUNCATED "<truncated>\n"
 
 static DEFINE_MUTEX(config_mutex);
-static struct tipc_server cfgsrv;
 
 static const void *req_tlv_area;	/* request message TLV area */
 static int req_tlv_space;		/* request message TLV area size */
 static int rep_headroom;		/* reply message headroom to use */
 
-
 struct sk_buff *tipc_cfg_reply_alloc(int payload_size)
 {
 	struct sk_buff *buf;
@@ -185,18 +183,6 @@
 	return tipc_cfg_reply_none();
 }
 
-static struct sk_buff *cfg_set_remote_mng(void)
-{
-	u32 value;
-
-	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_UNSIGNED))
-		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
-
-	value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
-	tipc_remote_management = (value != 0);
-	return tipc_cfg_reply_none();
-}
-
 static struct sk_buff *cfg_set_max_ports(void)
 {
 	u32 value;
@@ -247,21 +233,10 @@
 	/* Check command authorization */
 	if (likely(in_own_node(orig_node))) {
 		/* command is permitted */
-	} else if (cmd >= 0x8000) {
+	} else {
 		rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
 							  " (cannot be done remotely)");
 		goto exit;
-	} else if (!tipc_remote_management) {
-		rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NO_REMOTE);
-		goto exit;
-	} else if (cmd >= 0x4000) {
-		u32 domain = 0;
-
-		if ((tipc_nametbl_translate(TIPC_ZM_SRV, 0, &domain) == 0) ||
-		    (domain != orig_node)) {
-			rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_ZONE_MSTR);
-			goto exit;
-		}
 	}
 
 	/* Call appropriate processing routine */
@@ -310,18 +285,12 @@
 	case TIPC_CMD_SET_NODE_ADDR:
 		rep_tlv_buf = cfg_set_own_addr();
 		break;
-	case TIPC_CMD_SET_REMOTE_MNG:
-		rep_tlv_buf = cfg_set_remote_mng();
-		break;
 	case TIPC_CMD_SET_MAX_PORTS:
 		rep_tlv_buf = cfg_set_max_ports();
 		break;
 	case TIPC_CMD_SET_NETID:
 		rep_tlv_buf = cfg_set_netid();
 		break;
-	case TIPC_CMD_GET_REMOTE_MNG:
-		rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_remote_management);
-		break;
 	case TIPC_CMD_GET_MAX_PORTS:
 		rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_ports);
 		break;
@@ -345,6 +314,8 @@
 	case TIPC_CMD_SET_MAX_PUBL:
 	case TIPC_CMD_GET_MAX_PUBL:
 	case TIPC_CMD_SET_LOG_SIZE:
+	case TIPC_CMD_SET_REMOTE_MNG:
+	case TIPC_CMD_GET_REMOTE_MNG:
 	case TIPC_CMD_DUMP_LOG:
 		rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
 							  " (obsolete command)");
@@ -369,75 +340,3 @@
 	mutex_unlock(&config_mutex);
 	return rep_tlv_buf;
 }
-
-static void cfg_conn_msg_event(int conid, struct sockaddr_tipc *addr,
-			       void *usr_data, void *buf, size_t len)
-{
-	struct tipc_cfg_msg_hdr *req_hdr;
-	struct tipc_cfg_msg_hdr *rep_hdr;
-	struct sk_buff *rep_buf;
-
-	/* Validate configuration message header (ignore invalid message) */
-	req_hdr = (struct tipc_cfg_msg_hdr *)buf;
-	if ((len < sizeof(*req_hdr)) ||
-	    (len != TCM_ALIGN(ntohl(req_hdr->tcm_len))) ||
-	    (ntohs(req_hdr->tcm_flags) != TCM_F_REQUEST)) {
-		pr_warn("Invalid configuration message discarded\n");
-		return;
-	}
-
-	/* Generate reply for request (if can't, return request) */
-	rep_buf = tipc_cfg_do_cmd(addr->addr.id.node, ntohs(req_hdr->tcm_type),
-				  buf + sizeof(*req_hdr),
-				  len - sizeof(*req_hdr),
-				  BUF_HEADROOM + MAX_H_SIZE + sizeof(*rep_hdr));
-	if (rep_buf) {
-		skb_push(rep_buf, sizeof(*rep_hdr));
-		rep_hdr = (struct tipc_cfg_msg_hdr *)rep_buf->data;
-		memcpy(rep_hdr, req_hdr, sizeof(*rep_hdr));
-		rep_hdr->tcm_len = htonl(rep_buf->len);
-		rep_hdr->tcm_flags &= htons(~TCM_F_REQUEST);
-		tipc_conn_sendmsg(&cfgsrv, conid, addr, rep_buf->data,
-				  rep_buf->len);
-		kfree_skb(rep_buf);
-	}
-}
-
-static struct sockaddr_tipc cfgsrv_addr __read_mostly = {
-	.family			= AF_TIPC,
-	.addrtype		= TIPC_ADDR_NAMESEQ,
-	.addr.nameseq.type	= TIPC_CFG_SRV,
-	.addr.nameseq.lower	= 0,
-	.addr.nameseq.upper	= 0,
-	.scope			= TIPC_ZONE_SCOPE
-};
-
-static struct tipc_server cfgsrv __read_mostly = {
-	.saddr			= &cfgsrv_addr,
-	.imp			= TIPC_CRITICAL_IMPORTANCE,
-	.type			= SOCK_RDM,
-	.max_rcvbuf_size	= 64 * 1024,
-	.name			= "cfg_server",
-	.tipc_conn_recvmsg	= cfg_conn_msg_event,
-	.tipc_conn_new		= NULL,
-	.tipc_conn_shutdown	= NULL
-};
-
-int tipc_cfg_init(void)
-{
-	return tipc_server_start(&cfgsrv);
-}
-
-void tipc_cfg_reinit(void)
-{
-	tipc_server_stop(&cfgsrv);
-
-	cfgsrv_addr.addr.nameseq.lower = tipc_own_addr;
-	cfgsrv_addr.addr.nameseq.upper = tipc_own_addr;
-	tipc_server_start(&cfgsrv);
-}
-
-void tipc_cfg_stop(void)
-{
-	tipc_server_stop(&cfgsrv);
-}
diff --git a/net/tipc/config.h b/net/tipc/config.h
index 1f252f3..47b1bf1 100644
--- a/net/tipc/config.h
+++ b/net/tipc/config.h
@@ -64,9 +64,4 @@
 struct sk_buff *tipc_cfg_do_cmd(u32 orig_node, u16 cmd,
 				const void *req_tlv_area, int req_tlv_space,
 				int headroom);
-
-int  tipc_cfg_init(void);
-void tipc_cfg_reinit(void);
-void tipc_cfg_stop(void);
-
 #endif
diff --git a/net/tipc/core.c b/net/tipc/core.c
index e2491b3..50d5742 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -50,7 +50,6 @@
 u32 tipc_own_addr __read_mostly;
 int tipc_max_ports __read_mostly;
 int tipc_net_id __read_mostly;
-int tipc_remote_management __read_mostly;
 int sysctl_tipc_rmem[3] __read_mostly;	/* min/default/max */
 
 /**
@@ -85,7 +84,6 @@
 	tipc_net_stop();
 	tipc_bearer_cleanup();
 	tipc_netlink_stop();
-	tipc_cfg_stop();
 	tipc_subscr_stop();
 	tipc_nametbl_stop();
 	tipc_ref_table_stop();
@@ -130,18 +128,12 @@
 	if (err)
 		goto out_subscr;
 
-	err = tipc_cfg_init();
-	if (err)
-		goto out_cfg;
-
 	err = tipc_bearer_setup();
 	if (err)
 		goto out_bearer;
 
 	return 0;
 out_bearer:
-	tipc_cfg_stop();
-out_cfg:
 	tipc_subscr_stop();
 out_subscr:
 	tipc_unregister_sysctl();
@@ -166,7 +158,6 @@
 	pr_info("Activated (version " TIPC_MOD_VER ")\n");
 
 	tipc_own_addr = 0;
-	tipc_remote_management = 1;
 	tipc_max_ports = CONFIG_TIPC_PORTS;
 	tipc_net_id = 4711;
 
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 4dfe137..8985bbc 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -79,7 +79,6 @@
 extern u32 tipc_own_addr __read_mostly;
 extern int tipc_max_ports __read_mostly;
 extern int tipc_net_id __read_mostly;
-extern int tipc_remote_management __read_mostly;
 extern int sysctl_tipc_rmem[3] __read_mostly;
 
 /*
diff --git a/net/tipc/link.c b/net/tipc/link.c
index a42f4a1..c5190ab 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -280,13 +280,13 @@
 	return l_ptr;
 }
 
-
 void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 {
 	struct tipc_link *l_ptr;
 	struct tipc_node *n_ptr;
 
-	list_for_each_entry(n_ptr, &tipc_node_list, list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
 		spin_lock_bh(&n_ptr->lock);
 		l_ptr = n_ptr->links[bearer_id];
 		if (l_ptr) {
@@ -309,6 +309,7 @@
 		}
 		spin_unlock_bh(&n_ptr->lock);
 	}
+	rcu_read_unlock();
 }
 
 /**
@@ -461,13 +462,15 @@
 	struct tipc_link *l_ptr;
 	struct tipc_node *n_ptr;
 
-	list_for_each_entry(n_ptr, &tipc_node_list, list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
 		spin_lock_bh(&n_ptr->lock);
 		l_ptr = n_ptr->links[bearer_id];
 		if (l_ptr)
 			tipc_link_reset(l_ptr);
 		spin_unlock_bh(&n_ptr->lock);
 	}
+	rcu_read_unlock();
 }
 
 static void link_activate(struct tipc_link *l_ptr)
@@ -1458,10 +1461,6 @@
 		head = head->next;
 		buf->next = NULL;
 
-		/* Ensure bearer is still enabled */
-		if (unlikely(!b_ptr->active))
-			goto discard;
-
 		/* Ensure message is well-formed */
 		if (unlikely(!link_recv_buf_validate(buf)))
 			goto discard;
@@ -2408,13 +2407,12 @@
 {
 	struct tipc_link *l_ptr;
 	struct tipc_node *n_ptr;
-	struct tipc_node *tmp_n_ptr;
 	struct tipc_node *found_node = 0;
-
 	int i;
 
 	*bearer_id = 0;
-	list_for_each_entry_safe(n_ptr, tmp_n_ptr, &tipc_node_list, list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
 		tipc_node_lock(n_ptr);
 		for (i = 0; i < MAX_BEARERS; i++) {
 			l_ptr = n_ptr->links[i];
@@ -2428,6 +2426,8 @@
 		if (found_node)
 			break;
 	}
+	rcu_read_unlock();
+
 	return found_node;
 }
 
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 893c49a..aff8041 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -131,16 +131,24 @@
 {
 	struct sk_buff *buf_copy;
 	struct tipc_node *n_ptr;
+	struct tipc_link *l_ptr;
 
-	list_for_each_entry(n_ptr, &tipc_node_list, list) {
-		if (tipc_node_active_links(n_ptr)) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
+		spin_lock_bh(&n_ptr->lock);
+		l_ptr = n_ptr->active_links[n_ptr->addr & 1];
+		if (l_ptr) {
 			buf_copy = skb_copy(buf, GFP_ATOMIC);
-			if (!buf_copy)
+			if (!buf_copy) {
+				spin_unlock_bh(&n_ptr->lock);
 				break;
+			}
 			msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
-			tipc_link_xmit(buf_copy, n_ptr->addr, n_ptr->addr);
+			__tipc_link_xmit(l_ptr, buf_copy);
 		}
+		spin_unlock_bh(&n_ptr->lock);
 	}
+	rcu_read_unlock();
 
 	kfree_skb(buf);
 }
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 31b606e..0374a81 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -182,8 +182,6 @@
 	tipc_bclink_init();
 	write_unlock_bh(&tipc_net_lock);
 
-	tipc_cfg_reinit();
-
 	pr_info("Started in network mode\n");
 	pr_info("Own node address %s, network identity %u\n",
 		tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id);
@@ -191,15 +189,14 @@
 
 void tipc_net_stop(void)
 {
-	struct tipc_node *node, *t_node;
-
 	if (!tipc_own_addr)
 		return;
+
 	write_lock_bh(&tipc_net_lock);
 	tipc_bearer_stop();
 	tipc_bclink_stop();
-	list_for_each_entry_safe(node, t_node, &tipc_node_list, list)
-		tipc_node_delete(node);
+	tipc_node_stop();
 	write_unlock_bh(&tipc_net_lock);
+
 	pr_info("Left network mode\n");
 }
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 0b0f6c7..1d3a499 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -2,7 +2,7 @@
  * net/tipc/node.c: TIPC node management routines
  *
  * Copyright (c) 2000-2006, 2012 Ericsson AB
- * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
+ * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -44,13 +44,11 @@
 static void node_lost_contact(struct tipc_node *n_ptr);
 static void node_established_contact(struct tipc_node *n_ptr);
 
-static DEFINE_SPINLOCK(node_create_lock);
-
 static struct hlist_head node_htable[NODE_HTABLE_SIZE];
 LIST_HEAD(tipc_node_list);
 static u32 tipc_num_nodes;
-
-static atomic_t tipc_num_links = ATOMIC_INIT(0);
+static u32 tipc_num_links;
+static DEFINE_SPINLOCK(node_list_lock);
 
 /*
  * A trivial power-of-two bitmask technique is used for speed, since this
@@ -73,37 +71,26 @@
 	if (unlikely(!in_own_cluster_exact(addr)))
 		return NULL;
 
-	hlist_for_each_entry(node, &node_htable[tipc_hashfn(addr)], hash) {
-		if (node->addr == addr)
+	rcu_read_lock();
+	hlist_for_each_entry_rcu(node, &node_htable[tipc_hashfn(addr)], hash) {
+		if (node->addr == addr) {
+			rcu_read_unlock();
 			return node;
+		}
 	}
+	rcu_read_unlock();
 	return NULL;
 }
 
-/**
- * tipc_node_create - create neighboring node
- *
- * Currently, this routine is called by neighbor discovery code, which holds
- * net_lock for reading only.  We must take node_create_lock to ensure a node
- * isn't created twice if two different bearers discover the node at the same
- * time.  (It would be preferable to switch to holding net_lock in write mode,
- * but this is a non-trivial change.)
- */
 struct tipc_node *tipc_node_create(u32 addr)
 {
 	struct tipc_node *n_ptr, *temp_node;
 
-	spin_lock_bh(&node_create_lock);
-
-	n_ptr = tipc_node_find(addr);
-	if (n_ptr) {
-		spin_unlock_bh(&node_create_lock);
-		return n_ptr;
-	}
+	spin_lock_bh(&node_list_lock);
 
 	n_ptr = kzalloc(sizeof(*n_ptr), GFP_ATOMIC);
 	if (!n_ptr) {
-		spin_unlock_bh(&node_create_lock);
+		spin_unlock_bh(&node_list_lock);
 		pr_warn("Node creation failed, no memory\n");
 		return NULL;
 	}
@@ -114,31 +101,41 @@
 	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->nsub);
 
-	hlist_add_head(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
+	hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
 
-	list_for_each_entry(temp_node, &tipc_node_list, list) {
+	list_for_each_entry_rcu(temp_node, &tipc_node_list, list) {
 		if (n_ptr->addr < temp_node->addr)
 			break;
 	}
-	list_add_tail(&n_ptr->list, &temp_node->list);
+	list_add_tail_rcu(&n_ptr->list, &temp_node->list);
 	n_ptr->block_setup = WAIT_PEER_DOWN;
 	n_ptr->signature = INVALID_NODE_SIG;
 
 	tipc_num_nodes++;
 
-	spin_unlock_bh(&node_create_lock);
+	spin_unlock_bh(&node_list_lock);
 	return n_ptr;
 }
 
-void tipc_node_delete(struct tipc_node *n_ptr)
+static void tipc_node_delete(struct tipc_node *n_ptr)
 {
-	list_del(&n_ptr->list);
-	hlist_del(&n_ptr->hash);
-	kfree(n_ptr);
+	list_del_rcu(&n_ptr->list);
+	hlist_del_rcu(&n_ptr->hash);
+	kfree_rcu(n_ptr, rcu);
 
 	tipc_num_nodes--;
 }
 
+void tipc_node_stop(void)
+{
+	struct tipc_node *node, *t_node;
+
+	spin_lock_bh(&node_list_lock);
+	list_for_each_entry_safe(node, t_node, &tipc_node_list, list)
+		tipc_node_delete(node);
+	spin_unlock_bh(&node_list_lock);
+}
+
 /**
  * tipc_node_link_up - handle addition of link
  *
@@ -243,7 +240,9 @@
 void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 {
 	n_ptr->links[l_ptr->b_ptr->identity] = l_ptr;
-	atomic_inc(&tipc_num_links);
+	spin_lock_bh(&node_list_lock);
+	tipc_num_links++;
+	spin_unlock_bh(&node_list_lock);
 	n_ptr->link_cnt++;
 }
 
@@ -255,7 +254,9 @@
 		if (l_ptr != n_ptr->links[i])
 			continue;
 		n_ptr->links[i] = NULL;
-		atomic_dec(&tipc_num_links);
+		spin_lock_bh(&node_list_lock);
+		tipc_num_links--;
+		spin_unlock_bh(&node_list_lock);
 		n_ptr->link_cnt--;
 	}
 }
@@ -341,27 +342,28 @@
 		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
 						   " (network address)");
 
-	read_lock_bh(&tipc_net_lock);
+	spin_lock_bh(&node_list_lock);
 	if (!tipc_num_nodes) {
-		read_unlock_bh(&tipc_net_lock);
+		spin_unlock_bh(&node_list_lock);
 		return tipc_cfg_reply_none();
 	}
 
 	/* For now, get space for all other nodes */
 	payload_size = TLV_SPACE(sizeof(node_info)) * tipc_num_nodes;
 	if (payload_size > 32768u) {
-		read_unlock_bh(&tipc_net_lock);
+		spin_unlock_bh(&node_list_lock);
 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
 						   " (too many nodes)");
 	}
+	spin_unlock_bh(&node_list_lock);
+
 	buf = tipc_cfg_reply_alloc(payload_size);
-	if (!buf) {
-		read_unlock_bh(&tipc_net_lock);
+	if (!buf)
 		return NULL;
-	}
 
 	/* Add TLVs for all nodes in scope */
-	list_for_each_entry(n_ptr, &tipc_node_list, list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
 		if (!tipc_in_scope(domain, n_ptr->addr))
 			continue;
 		node_info.addr = htonl(n_ptr->addr);
@@ -369,8 +371,7 @@
 		tipc_cfg_append_tlv(buf, TIPC_TLV_NODE_INFO,
 				    &node_info, sizeof(node_info));
 	}
-
-	read_unlock_bh(&tipc_net_lock);
+	rcu_read_unlock();
 	return buf;
 }
 
@@ -393,21 +394,19 @@
 	if (!tipc_own_addr)
 		return tipc_cfg_reply_none();
 
-	read_lock_bh(&tipc_net_lock);
-
+	spin_lock_bh(&node_list_lock);
 	/* Get space for all unicast links + broadcast link */
-	payload_size = TLV_SPACE(sizeof(link_info)) *
-		(atomic_read(&tipc_num_links) + 1);
+	payload_size = TLV_SPACE((sizeof(link_info)) * (tipc_num_links + 1));
 	if (payload_size > 32768u) {
-		read_unlock_bh(&tipc_net_lock);
+		spin_unlock_bh(&node_list_lock);
 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
 						   " (too many links)");
 	}
+	spin_unlock_bh(&node_list_lock);
+
 	buf = tipc_cfg_reply_alloc(payload_size);
-	if (!buf) {
-		read_unlock_bh(&tipc_net_lock);
+	if (!buf)
 		return NULL;
-	}
 
 	/* Add TLV for broadcast link */
 	link_info.dest = htonl(tipc_cluster_mask(tipc_own_addr));
@@ -416,7 +415,8 @@
 	tipc_cfg_append_tlv(buf, TIPC_TLV_LINK_INFO, &link_info, sizeof(link_info));
 
 	/* Add TLVs for any other links in scope */
-	list_for_each_entry(n_ptr, &tipc_node_list, list) {
+	rcu_read_lock();
+	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
 		u32 i;
 
 		if (!tipc_in_scope(domain, n_ptr->addr))
@@ -433,7 +433,6 @@
 		}
 		tipc_node_unlock(n_ptr);
 	}
-
-	read_unlock_bh(&tipc_net_lock);
+	rcu_read_unlock();
 	return buf;
 }
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 63e2e8e..7cbb8ce 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -2,7 +2,7 @@
  * net/tipc/node.h: Include file for TIPC node management routines
  *
  * Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005, 2010-2011, Wind River Systems
+ * Copyright (c) 2005, 2010-2014, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -66,6 +66,7 @@
  * @link_cnt: number of links to node
  * @signature: node instance identifier
  * @bclink: broadcast-related info
+ * @rcu: rcu struct for tipc_node
  *    @acked: sequence # of last outbound b'cast message acknowledged by node
  *    @last_in: sequence # of last in-sequence b'cast message received from node
  *    @last_sent: sequence # of last b'cast message sent by node
@@ -89,6 +90,7 @@
 	int working_links;
 	int block_setup;
 	u32 signature;
+	struct rcu_head rcu;
 	struct {
 		u32 acked;
 		u32 last_in;
@@ -107,7 +109,7 @@
 
 struct tipc_node *tipc_node_find(u32 addr);
 struct tipc_node *tipc_node_create(u32 addr);
-void tipc_node_delete(struct tipc_node *n_ptr);
+void tipc_node_stop(void);
 void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
 void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
 void tipc_node_link_down(struct tipc_node *n_ptr, struct tipc_link *l_ptr);