Merge branch 'tipc-Mar13-2011' of git://git.kernel.org/pub/scm/linux/kernel/git/paulg/net-next-2.6
diff --git a/MAINTAINERS b/MAINTAINERS
index 4765c67..a41c1e0 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -6083,13 +6083,11 @@
 TIPC NETWORK LAYER
 M:	Jon Maloy <jon.maloy@ericsson.com>
 M:	Allan Stephens <allan.stephens@windriver.com>
-L:	tipc-discussion@lists.sourceforge.net
+L:	netdev@vger.kernel.org (core kernel code)
+L:	tipc-discussion@lists.sourceforge.net (user apps, general discussion)
 W:	http://tipc.sourceforge.net/
-W:	http://tipc.cslab.ericsson.net/
-T:	git git://tipc.cslab.ericsson.net/pub/git/tipc.git
 S:	Maintained
 F:	include/linux/tipc*.h
-F:	include/net/tipc/
 F:	net/tipc/
 
 TILE ARCHITECTURE
diff --git a/include/linux/tipc_config.h b/include/linux/tipc_config.h
index 011556f..0db2395 100644
--- a/include/linux/tipc_config.h
+++ b/include/linux/tipc_config.h
@@ -89,7 +89,7 @@
 #define  TIPC_CMD_GET_MAX_SUBSCR    0x4006    /* tx none, rx unsigned */
 #define  TIPC_CMD_GET_MAX_ZONES     0x4007    /* obsoleted */
 #define  TIPC_CMD_GET_MAX_CLUSTERS  0x4008    /* obsoleted */
-#define  TIPC_CMD_GET_MAX_NODES     0x4009    /* tx none, rx unsigned */
+#define  TIPC_CMD_GET_MAX_NODES     0x4009    /* obsoleted */
 #define  TIPC_CMD_GET_MAX_SLAVES    0x400A    /* obsoleted */
 #define  TIPC_CMD_GET_NETID         0x400B    /* tx none, rx unsigned */
 
@@ -115,7 +115,7 @@
 #define  TIPC_CMD_SET_MAX_SUBSCR    0x8006    /* tx unsigned, rx none */
 #define  TIPC_CMD_SET_MAX_ZONES     0x8007    /* obsoleted */
 #define  TIPC_CMD_SET_MAX_CLUSTERS  0x8008    /* obsoleted */
-#define  TIPC_CMD_SET_MAX_NODES     0x8009    /* tx unsigned, rx none */
+#define  TIPC_CMD_SET_MAX_NODES     0x8009    /* obsoleted */
 #define  TIPC_CMD_SET_MAX_SLAVES    0x800A    /* obsoleted */
 #define  TIPC_CMD_SET_NETID         0x800B    /* tx unsigned, rx none */
 
@@ -202,7 +202,7 @@
 
 struct tipc_bearer_config {
 	__be32 priority;		/* Range [1,31]. Override per link  */
-	__be32 detect_scope;
+	__be32 disc_domain;		/* <Z.C.N> describing desired nodes */
 	char name[TIPC_MAX_BEARER_NAME];
 };
 
diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig
index 0436927..2c5954b 100644
--- a/net/tipc/Kconfig
+++ b/net/tipc/Kconfig
@@ -29,18 +29,6 @@
 	  Saying Y here will open some advanced configuration for TIPC.
 	  Most users do not need to bother; if unsure, just say N.
 
-config TIPC_NODES
-	int "Maximum number of nodes in a cluster"
-	depends on TIPC_ADVANCED
-	range 8 2047
-	default "255"
-	help
-	  Specifies how many nodes can be supported in a TIPC cluster.
-	  Can range from 8 to 2047 nodes; default is 255.
-
-	  Setting this to a smaller value saves some memory;
-	  setting it to higher allows for more nodes.
-
 config TIPC_PORTS
 	int "Maximum number of ports in a node"
 	depends on TIPC_ADVANCED
diff --git a/net/tipc/addr.c b/net/tipc/addr.c
index 88463d9..a6fdab3 100644
--- a/net/tipc/addr.c
+++ b/net/tipc/addr.c
@@ -2,7 +2,7 @@
  * net/tipc/addr.c: TIPC address utility routines
  *
  * Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2004-2005, Wind River Systems
+ * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -41,7 +41,7 @@
  * tipc_addr_domain_valid - validates a network domain address
  *
  * Accepts <Z.C.N>, <Z.C.0>, <Z.0.0>, and <0.0.0>,
- * where Z, C, and N are non-zero and do not exceed the configured limits.
+ * where Z, C, and N are non-zero.
  *
  * Returns 1 if domain address is valid, otherwise 0
  */
@@ -51,10 +51,6 @@
 	u32 n = tipc_node(addr);
 	u32 c = tipc_cluster(addr);
 	u32 z = tipc_zone(addr);
-	u32 max_nodes = tipc_max_nodes;
-
-	if (n > max_nodes)
-		return 0;
 
 	if (n && (!z || !c))
 		return 0;
@@ -66,8 +62,7 @@
 /**
  * tipc_addr_node_valid - validates a proposed network address for this node
  *
- * Accepts <Z.C.N>, where Z, C, and N are non-zero and do not exceed
- * the configured limits.
+ * Accepts <Z.C.N>, where Z, C, and N are non-zero.
  *
  * Returns 1 if address can be used, otherwise 0
  */
@@ -81,9 +76,9 @@
 {
 	if (!domain || (domain == addr))
 		return 1;
-	if (domain == (addr & 0xfffff000u)) /* domain <Z.C.0> */
+	if (domain == tipc_cluster_mask(addr)) /* domain <Z.C.0> */
 		return 1;
-	if (domain == (addr & 0xff000000u)) /* domain <Z.0.0> */
+	if (domain == tipc_zone_mask(addr)) /* domain <Z.0.0> */
 		return 1;
 	return 0;
 }
diff --git a/net/tipc/addr.h b/net/tipc/addr.h
index 2490fad..8971aba 100644
--- a/net/tipc/addr.h
+++ b/net/tipc/addr.h
@@ -37,6 +37,16 @@
 #ifndef _TIPC_ADDR_H
 #define _TIPC_ADDR_H
 
+static inline u32 tipc_zone_mask(u32 addr)
+{
+	return addr & 0xff000000u;	/* strip cluster and node: <Z.C.N> -> <Z.0.0> */
+}
+
+static inline u32 tipc_cluster_mask(u32 addr)
+{
+	return addr & 0xfffff000u;	/* strip node only: <Z.C.N> -> <Z.C.0> */
+}
+
 static inline int in_own_cluster(u32 addr)
 {
 	return !((addr ^ tipc_own_addr) >> 12);
@@ -49,14 +59,13 @@
  * after a network hop.
  */
 
-static inline int addr_domain(int sc)
+static inline u32 addr_domain(u32 sc)
 {
 	if (likely(sc == TIPC_NODE_SCOPE))
 		return tipc_own_addr;
 	if (sc == TIPC_CLUSTER_SCOPE)
-		return tipc_addr(tipc_zone(tipc_own_addr),
-				 tipc_cluster(tipc_own_addr), 0);
-	return tipc_addr(tipc_zone(tipc_own_addr), 0, 0);
+		return tipc_cluster_mask(tipc_own_addr);	/* own <Z.C.0> */
+	return tipc_zone_mask(tipc_own_addr);	/* any other scope value: own <Z.0.0> */
 }
 
 int tipc_addr_domain_valid(u32);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index f2839b0..411719f 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -158,7 +158,6 @@
 	m_ptr->disable_bearer = disable;
 	m_ptr->addr2str = addr2str;
 	memcpy(&m_ptr->bcast_addr, bcast_addr, sizeof(*bcast_addr));
-	m_ptr->bcast = 1;
 	strcpy(m_ptr->name, name);
 	m_ptr->priority = bearer_priority;
 	m_ptr->tolerance = link_tolerance;
@@ -474,7 +473,7 @@
  * tipc_enable_bearer - enable bearer with the given name
  */
 
-int tipc_enable_bearer(const char *name, u32 bcast_scope, u32 priority)
+int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
 {
 	struct tipc_bearer *b_ptr;
 	struct media *m_ptr;
@@ -494,9 +493,9 @@
 		warn("Bearer <%s> rejected, illegal name\n", name);
 		return -EINVAL;
 	}
-	if (!tipc_addr_domain_valid(bcast_scope) ||
-	    !tipc_in_scope(bcast_scope, tipc_own_addr)) {
-		warn("Bearer <%s> rejected, illegal broadcast scope\n", name);
+	if (!tipc_addr_domain_valid(disc_domain) ||
+	    !tipc_in_scope(disc_domain, tipc_own_addr)) {
+		warn("Bearer <%s> rejected, illegal discovery domain\n", name);
 		return -EINVAL;
 	}
 	if ((priority < TIPC_MIN_LINK_PRI ||
@@ -560,18 +559,15 @@
 	b_ptr->media = m_ptr;
 	b_ptr->net_plane = bearer_id + 'A';
 	b_ptr->active = 1;
-	b_ptr->detect_scope = bcast_scope;
 	b_ptr->priority = priority;
 	INIT_LIST_HEAD(&b_ptr->cong_links);
 	INIT_LIST_HEAD(&b_ptr->links);
-	if (m_ptr->bcast) {
-		b_ptr->link_req = tipc_disc_init_link_req(b_ptr, &m_ptr->bcast_addr,
-							  bcast_scope);
-	}
+	b_ptr->link_req = tipc_disc_init_link_req(b_ptr, &m_ptr->bcast_addr,
+						  disc_domain);
 	spin_lock_init(&b_ptr->lock);
 	write_unlock_bh(&tipc_net_lock);
 	info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
-	     name, tipc_addr_string_fill(addr_string, bcast_scope), priority);
+	     name, tipc_addr_string_fill(addr_string, disc_domain), priority);
 	return 0;
 failed:
 	write_unlock_bh(&tipc_net_lock);
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 255dea6..31d6172 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -70,7 +70,6 @@
  * @disable_bearer: routine which disables a bearer
  * @addr2str: routine which converts bearer's address to string form
  * @bcast_addr: media address used in broadcasting
- * @bcast: non-zero if media supports broadcasting [currently mandatory]
  * @priority: default link (and bearer) priority
  * @tolerance: default time (in ms) before declaring link failure
  * @window: default window (in packets) before declaring link congestion
@@ -87,7 +86,6 @@
 	char *(*addr2str)(struct tipc_media_addr *a,
 			  char *str_buf, int str_size);
 	struct tipc_media_addr bcast_addr;
-	int bcast;
 	u32 priority;
 	u32 tolerance;
 	u32 window;
@@ -105,7 +103,6 @@
  * @name: bearer name (format = media:interface)
  * @media: ptr to media structure associated with bearer
  * @priority: default link priority for bearer
- * @detect_scope: network address mask used during automatic link creation
  * @identity: array index of this bearer within TIPC bearer array
  * @link_req: ptr to (optional) structure making periodic link setup requests
  * @links: list of non-congested links associated with bearer
@@ -128,7 +125,6 @@
 	spinlock_t lock;
 	struct media *media;
 	u32 priority;
-	u32 detect_scope;
 	u32 identity;
 	struct link_req *link_req;
 	struct list_head links;
@@ -167,7 +163,7 @@
 int  tipc_block_bearer(const char *name);
 void tipc_continue(struct tipc_bearer *tb_ptr);
 
-int tipc_enable_bearer(const char *bearer_name, u32 bcast_scope, u32 priority);
+int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority);
 int tipc_disable_bearer(const char *name);
 
 /*
diff --git a/net/tipc/config.c b/net/tipc/config.c
index e16750d..b25a396 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -2,7 +2,7 @@
  * net/tipc/config.c: TIPC configuration management code
  *
  * Copyright (c) 2002-2006, Ericsson AB
- * Copyright (c) 2004-2007, Wind River Systems
+ * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -148,7 +148,7 @@
 
 	args = (struct tipc_bearer_config *)TLV_DATA(req_tlv_area);
 	if (tipc_enable_bearer(args->name,
-			       ntohl(args->detect_scope),
+			       ntohl(args->disc_domain),
 			       ntohl(args->priority)))
 		return tipc_cfg_reply_error_string("unable to enable bearer");
 
@@ -260,25 +260,6 @@
 	return tipc_cfg_reply_none();
 }
 
-static struct sk_buff *cfg_set_max_nodes(void)
-{
-	u32 value;
-
-	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_UNSIGNED))
-		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
-	value = ntohl(*(__be32 *)TLV_DATA(req_tlv_area));
-	if (value == tipc_max_nodes)
-		return tipc_cfg_reply_none();
-	if (value != delimit(value, 8, 2047))
-		return tipc_cfg_reply_error_string(TIPC_CFG_INVALID_VALUE
-						   " (max nodes must be 8-2047)");
-	if (tipc_mode == TIPC_NET_MODE)
-		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
-			" (cannot change max nodes once TIPC has joined a network)");
-	tipc_max_nodes = value;
-	return tipc_cfg_reply_none();
-}
-
 static struct sk_buff *cfg_set_netid(void)
 {
 	u32 value;
@@ -397,9 +378,6 @@
 	case TIPC_CMD_SET_MAX_SUBSCR:
 		rep_tlv_buf = cfg_set_max_subscriptions();
 		break;
-	case TIPC_CMD_SET_MAX_NODES:
-		rep_tlv_buf = cfg_set_max_nodes();
-		break;
 	case TIPC_CMD_SET_NETID:
 		rep_tlv_buf = cfg_set_netid();
 		break;
@@ -415,9 +393,6 @@
 	case TIPC_CMD_GET_MAX_SUBSCR:
 		rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_subscriptions);
 		break;
-	case TIPC_CMD_GET_MAX_NODES:
-		rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_max_nodes);
-		break;
 	case TIPC_CMD_GET_NETID:
 		rep_tlv_buf = tipc_cfg_reply_unsigned(tipc_net_id);
 		break;
@@ -431,6 +406,8 @@
 	case TIPC_CMD_GET_MAX_SLAVES:
 	case TIPC_CMD_SET_MAX_CLUSTERS:
 	case TIPC_CMD_GET_MAX_CLUSTERS:
+	case TIPC_CMD_SET_MAX_NODES:
+	case TIPC_CMD_GET_MAX_NODES:
 		rep_tlv_buf = tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
 							  " (obsolete command)");
 		break;
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 2da1fc7..c9a73e7 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -41,10 +41,6 @@
 #include "config.h"
 
 
-#ifndef CONFIG_TIPC_NODES
-#define CONFIG_TIPC_NODES 255
-#endif
-
 #ifndef CONFIG_TIPC_PORTS
 #define CONFIG_TIPC_PORTS 8191
 #endif
@@ -64,7 +60,6 @@
 /* configurable TIPC parameters */
 
 u32 tipc_own_addr;
-int tipc_max_nodes;
 int tipc_max_ports;
 int tipc_max_subscriptions;
 int tipc_max_publications;
@@ -192,7 +187,6 @@
 	tipc_max_publications = 10000;
 	tipc_max_subscriptions = 2000;
 	tipc_max_ports = CONFIG_TIPC_PORTS;
-	tipc_max_nodes = CONFIG_TIPC_NODES;
 	tipc_net_id = 4711;
 
 	res = tipc_core_start();
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 37544d9..436dda1 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -147,7 +147,6 @@
  */
 
 extern u32 tipc_own_addr;
-extern int tipc_max_nodes;
 extern int tipc_max_ports;
 extern int tipc_max_subscriptions;
 extern int tipc_max_publications;
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 09ce231..491eff5 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -75,12 +75,12 @@
 					  u32 dest_domain,
 					  struct tipc_bearer *b_ptr)
 {
-	struct sk_buff *buf = tipc_buf_acquire(DSC_H_SIZE);
+	struct sk_buff *buf = tipc_buf_acquire(INT_H_SIZE);
 	struct tipc_msg *msg;
 
 	if (buf) {
 		msg = buf_msg(buf);
-		tipc_msg_init(msg, LINK_CONFIG, type, DSC_H_SIZE, dest_domain);
+		tipc_msg_init(msg, LINK_CONFIG, type, INT_H_SIZE, dest_domain);
 		msg_set_non_seq(msg, 1);
 		msg_set_dest_domain(msg, dest_domain);
 		msg_set_bc_netid(msg, tipc_net_id);
@@ -119,17 +119,21 @@
 
 void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
 {
+	struct tipc_node *n_ptr;
 	struct link *link;
-	struct tipc_media_addr media_addr;
+	struct tipc_media_addr media_addr, *addr;
+	struct sk_buff *rbuf;
 	struct tipc_msg *msg = buf_msg(buf);
 	u32 dest = msg_dest_domain(msg);
 	u32 orig = msg_prevnode(msg);
 	u32 net_id = msg_bc_netid(msg);
 	u32 type = msg_type(msg);
+	int link_fully_up;
 
 	msg_get_media_addr(msg, &media_addr);
 	buf_discard(buf);
 
+	/* Validate discovery message from requesting node */
 	if (net_id != tipc_net_id)
 		return;
 	if (!tipc_addr_domain_valid(dest))
@@ -143,57 +147,70 @@
 	}
 	if (!tipc_in_scope(dest, tipc_own_addr))
 		return;
-	if (in_own_cluster(orig)) {
-		/* Always accept link here */
-		struct sk_buff *rbuf;
-		struct tipc_media_addr *addr;
-		struct tipc_node *n_ptr = tipc_node_find(orig);
-		int link_fully_up;
+	if (!in_own_cluster(orig))
+		return;
 
-		if (n_ptr == NULL) {
-			n_ptr = tipc_node_create(orig);
-			if (!n_ptr)
-				return;
-		}
-		spin_lock_bh(&n_ptr->lock);
-
-		/* Don't talk to neighbor during cleanup after last session */
-
-		if (n_ptr->cleanup_required) {
-			spin_unlock_bh(&n_ptr->lock);
+	/* Locate structure corresponding to requesting node */
+	n_ptr = tipc_node_find(orig);
+	if (!n_ptr) {
+		n_ptr = tipc_node_create(orig);
+		if (!n_ptr)
 			return;
-		}
+	}
+	tipc_node_lock(n_ptr);
 
-		link = n_ptr->links[b_ptr->identity];
+	/* Don't talk to neighbor during cleanup after last session */
+	if (n_ptr->cleanup_required) {
+		tipc_node_unlock(n_ptr);
+		return;
+	}
+
+	link = n_ptr->links[b_ptr->identity];
+
+	/* Create a link endpoint for this bearer, if necessary */
+	if (!link) {
+		link = tipc_link_create(n_ptr, b_ptr, &media_addr);
 		if (!link) {
-			link = tipc_link_create(b_ptr, orig, &media_addr);
-			if (!link) {
-				spin_unlock_bh(&n_ptr->lock);
-				return;
-			}
-		}
-		addr = &link->media_addr;
-		if (memcmp(addr, &media_addr, sizeof(*addr))) {
-			if (tipc_link_is_up(link) || (!link->started)) {
-				disc_dupl_alert(b_ptr, orig, &media_addr);
-				spin_unlock_bh(&n_ptr->lock);
-				return;
-			}
-			warn("Resetting link <%s>, peer interface address changed\n",
-			     link->name);
-			memcpy(addr, &media_addr, sizeof(*addr));
-			tipc_link_reset(link);
-		}
-		link_fully_up = link_working_working(link);
-		spin_unlock_bh(&n_ptr->lock);
-		if ((type == DSC_RESP_MSG) || link_fully_up)
+			tipc_node_unlock(n_ptr);
 			return;
+		}
+	}
+
+	/*
+	 * Ensure requesting node's media address is correct
+	 *
+	 * If media address doesn't match and the link is working, reject the
+	 * request (must be from a duplicate node).
+	 *
+	 * If media address doesn't match and the link is not working, accept
+	 * the new media address and reset the link to ensure it starts up
+	 * cleanly.
+	 */
+	addr = &link->media_addr;
+	if (memcmp(addr, &media_addr, sizeof(*addr))) {
+		if (tipc_link_is_up(link) || (!link->started)) {
+			disc_dupl_alert(b_ptr, orig, &media_addr);
+			tipc_node_unlock(n_ptr);
+			return;
+		}
+		warn("Resetting link <%s>, peer interface address changed\n",
+		     link->name);
+		memcpy(addr, &media_addr, sizeof(*addr));
+		tipc_link_reset(link);
+	}
+
+	/* Accept discovery message & send response, if necessary */
+	link_fully_up = link_working_working(link);
+
+	if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
 		rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
-		if (rbuf != NULL) {
+		if (rbuf) {
 			b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
 			buf_discard(rbuf);
 		}
 	}
+
+	tipc_node_unlock(n_ptr);
 }
 
 /**
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 89fbb6d..43639ff 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -293,19 +293,35 @@
 
 /**
  * tipc_link_create - create a new link
+ * @n_ptr: pointer to associated node
  * @b_ptr: pointer to associated bearer
- * @peer: network address of node at other end of link
  * @media_addr: media address to use when sending messages over link
  *
  * Returns pointer to link.
  */
 
-struct link *tipc_link_create(struct tipc_bearer *b_ptr, const u32 peer,
+struct link *tipc_link_create(struct tipc_node *n_ptr,
+			      struct tipc_bearer *b_ptr,
 			      const struct tipc_media_addr *media_addr)
 {
 	struct link *l_ptr;
 	struct tipc_msg *msg;
 	char *if_name;
+	char addr_string[16];
+	u32 peer = n_ptr->addr;
+
+	if (n_ptr->link_cnt >= 2) {
+		tipc_addr_string_fill(addr_string, n_ptr->addr);
+		err("Attempt to establish third link to %s\n", addr_string);
+		return NULL;
+	}
+
+	if (n_ptr->links[b_ptr->identity]) {
+		tipc_addr_string_fill(addr_string, n_ptr->addr);
+		err("Attempt to establish second link on <%s> to %s\n",
+		    b_ptr->name, addr_string);
+		return NULL;
+	}
 
 	l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
 	if (!l_ptr) {
@@ -322,6 +338,7 @@
 		tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
 		/* note: peer i/f is appended to link name by reset/activate */
 	memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
+	l_ptr->owner = n_ptr;
 	l_ptr->checkpoint = 1;
 	l_ptr->b_ptr = b_ptr;
 	link_set_supervision_props(l_ptr, b_ptr->media->tolerance);
@@ -345,11 +362,7 @@
 
 	link_reset_statistics(l_ptr);
 
-	l_ptr->owner = tipc_node_attach_link(l_ptr);
-	if (!l_ptr->owner) {
-		kfree(l_ptr);
-		return NULL;
-	}
+	tipc_node_attach_link(n_ptr, l_ptr);
 
 	k_init_timer(&l_ptr->timer, (Handler)link_timeout, (unsigned long)l_ptr);
 	list_add_tail(&l_ptr->link_list, &b_ptr->links);
@@ -548,7 +561,7 @@
 	tipc_node_link_down(l_ptr->owner, l_ptr);
 	tipc_bearer_remove_dest(l_ptr->b_ptr, l_ptr->addr);
 
-	if (was_active_link && tipc_node_has_active_links(l_ptr->owner) &&
+	if (was_active_link && tipc_node_active_links(l_ptr->owner) &&
 	    l_ptr->owner->permit_changeover) {
 		l_ptr->reset_checkpoint = checkpoint;
 		l_ptr->exp_msg_count = START_CHANGEOVER;
@@ -1733,10 +1746,6 @@
 						tipc_node_unlock(n_ptr);
 						tipc_link_recv_bundle(buf);
 						continue;
-					case ROUTE_DISTRIBUTOR:
-						tipc_node_unlock(n_ptr);
-						buf_discard(buf);
-						continue;
 					case NAME_DISTRIBUTOR:
 						tipc_node_unlock(n_ptr);
 						tipc_named_recv(buf);
@@ -1763,6 +1772,10 @@
 							goto protocol_check;
 						}
 						break;
+					default:
+						buf_discard(buf);
+						buf = NULL;
+						break;
 					}
 				}
 				tipc_node_unlock(n_ptr);
@@ -1898,6 +1911,7 @@
 	struct sk_buff *buf = NULL;
 	struct tipc_msg *msg = l_ptr->pmsg;
 	u32 msg_size = sizeof(l_ptr->proto_msg);
+	int r_flag;
 
 	if (link_blocked(l_ptr))
 		return;
@@ -1954,10 +1968,8 @@
 		msg_set_max_pkt(msg, l_ptr->max_pkt_target);
 	}
 
-	if (tipc_node_has_redundant_links(l_ptr->owner))
-		msg_set_redundant_link(msg);
-	else
-		msg_clear_redundant_link(msg);
+	r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
+	msg_set_redundant_link(msg, r_flag);
 	msg_set_linkprio(msg, l_ptr->priority);
 
 	/* Ensure sequence number will not fit : */
@@ -1977,7 +1989,6 @@
 		skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 		return;
 	}
-	msg_set_timestamp(msg, jiffies_to_msecs(jiffies));
 
 	/* Message can be sent */
 
@@ -2065,7 +2076,7 @@
 		l_ptr->peer_bearer_id = msg_bearer_id(msg);
 
 		/* Synchronize broadcast sequence numbers */
-		if (!tipc_node_has_redundant_links(l_ptr->owner))
+		if (!tipc_node_redundant_links(l_ptr->owner))
 			l_ptr->owner->bclink.last_in = mod(msg_last_bcast(msg));
 		break;
 	case STATE_MSG:
@@ -2412,9 +2423,6 @@
 	else
 		destaddr = msg_destnode(inmsg);
 
-	if (msg_routed(inmsg))
-		msg_set_prevnode(inmsg, tipc_own_addr);
-
 	/* Prepare reusable fragment header: */
 
 	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
diff --git a/net/tipc/link.h b/net/tipc/link.h
index a7794e7..e6a30db 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -207,7 +207,8 @@
 
 struct tipc_port;
 
-struct link *tipc_link_create(struct tipc_bearer *b_ptr, const u32 peer,
+struct link *tipc_link_create(struct tipc_node *n_ptr,
+			      struct tipc_bearer *b_ptr,
 			      const struct tipc_media_addr *media_addr);
 void tipc_link_delete(struct link *l_ptr);
 void tipc_link_changeover(struct link *l_ptr);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 0787e12..6d92d17 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -192,8 +192,6 @@
 		default:
 			tipc_printf(buf, "UNKNOWN TYPE %u", msg_type(msg));
 		}
-		if (msg_routed(msg) && !msg_non_seq(msg))
-			tipc_printf(buf, "ROUT:");
 		if (msg_reroute_cnt(msg))
 			tipc_printf(buf, "REROUTED(%u):",
 				    msg_reroute_cnt(msg));
@@ -210,8 +208,6 @@
 		default:
 			tipc_printf(buf, "UNKNOWN:%x", msg_type(msg));
 		}
-		if (msg_routed(msg))
-			tipc_printf(buf, "ROUT:");
 		if (msg_reroute_cnt(msg))
 			tipc_printf(buf, "REROUTED(%u):",
 				    msg_reroute_cnt(msg));
@@ -232,13 +228,10 @@
 		default:
 			tipc_printf(buf, "UNKNOWN TYPE:%x", msg_type(msg));
 		}
-		if (msg_routed(msg))
-			tipc_printf(buf, "ROUT:");
 		if (msg_reroute_cnt(msg))
 			tipc_printf(buf, "REROUTED(%u):", msg_reroute_cnt(msg));
 		break;
 	case LINK_PROTOCOL:
-		tipc_printf(buf, "PROT:TIM(%u):", msg_timestamp(msg));
 		switch (msg_type(msg)) {
 		case STATE_MSG:
 			tipc_printf(buf, "STATE:");
@@ -275,33 +268,6 @@
 			tipc_printf(buf, "UNKNOWN TYPE:%x", msg_type(msg));
 		}
 		break;
-	case ROUTE_DISTRIBUTOR:
-		tipc_printf(buf, "ROUTING_MNG:");
-		switch (msg_type(msg)) {
-		case EXT_ROUTING_TABLE:
-			tipc_printf(buf, "EXT_TBL:");
-			tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
-			break;
-		case LOCAL_ROUTING_TABLE:
-			tipc_printf(buf, "LOCAL_TBL:");
-			tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
-			break;
-		case SLAVE_ROUTING_TABLE:
-			tipc_printf(buf, "DP_TBL:");
-			tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
-			break;
-		case ROUTE_ADDITION:
-			tipc_printf(buf, "ADD:");
-			tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
-			break;
-		case ROUTE_REMOVAL:
-			tipc_printf(buf, "REMOVE:");
-			tipc_printf(buf, "TO:%x:", msg_remote_node(msg));
-			break;
-		default:
-			tipc_printf(buf, "UNKNOWN TYPE:%x", msg_type(msg));
-		}
-		break;
 	case LINK_CONFIG:
 		tipc_printf(buf, "CFG:");
 		switch (msg_type(msg)) {
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 9d643a1..de02339 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -421,13 +421,6 @@
 	return msg_short(m) || (msg_destnode(m) == d);
 }
 
-static inline u32 msg_routed(struct tipc_msg *m)
-{
-	if (likely(msg_short(m)))
-		return 0;
-	return (msg_destnode(m) ^ msg_orignode(m)) >> 11;
-}
-
 static inline u32 msg_nametype(struct tipc_msg *m)
 {
 	return msg_word(m, 8);
@@ -438,16 +431,6 @@
 	msg_set_word(m, 8, n);
 }
 
-static inline void msg_set_timestamp(struct tipc_msg *m, u32 n)
-{
-	msg_set_word(m, 8, n);
-}
-
-static inline u32 msg_timestamp(struct tipc_msg *m)
-{
-	return msg_word(m, 8);
-}
-
 static inline u32 msg_nameinst(struct tipc_msg *m)
 {
 	return msg_word(m, 9);
@@ -535,7 +518,6 @@
 #define  NAME_DISTRIBUTOR     11
 #define  MSG_FRAGMENTER       12
 #define  LINK_CONFIG          13
-#define  DSC_H_SIZE           40
 
 /*
  *  Connection management protocol messages
@@ -729,14 +711,9 @@
 	return msg_bits(m, 5, 12, 0x1);
 }
 
-static inline void msg_set_redundant_link(struct tipc_msg *m)
+static inline void msg_set_redundant_link(struct tipc_msg *m, u32 r)
 {
-	msg_set_bits(m, 5, 12, 0x1, 1);
-}
-
-static inline void msg_clear_redundant_link(struct tipc_msg *m)
-{
-	msg_set_bits(m, 5, 12, 0x1, 0);
+	msg_set_bits(m, 5, 12, 0x1, r);	/* r = 1 sets the flag, r = 0 clears it */
 }
 
 
@@ -785,21 +762,6 @@
 }
 
 /*
- * Routing table message data
- */
-
-
-static inline u32 msg_remote_node(struct tipc_msg *m)
-{
-	return msg_word(m, msg_hdr_sz(m)/4);
-}
-
-static inline void msg_set_remote_node(struct tipc_msg *m, u32 a)
-{
-	msg_set_word(m, msg_hdr_sz(m)/4, a);
-}
-
-/*
  * Segmentation message types
  */
 
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 483c226..c9fa6df 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -2,7 +2,7 @@
  * net/tipc/name_distr.c: TIPC name distribution code
  *
  * Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -109,11 +109,9 @@
 {
 	struct sk_buff *buf_copy;
 	struct tipc_node *n_ptr;
-	u32 n_num;
 
-	for (n_num = 1; n_num <= tipc_net.highest_node; n_num++) {
-		n_ptr = tipc_net.nodes[n_num];
-		if (n_ptr && tipc_node_has_active_links(n_ptr)) {
+	list_for_each_entry(n_ptr, &tipc_node_list, list) {
+		if (tipc_node_active_links(n_ptr)) {
 			buf_copy = skb_copy(buf, GFP_ATOMIC);
 			if (!buf_copy)
 				break;
@@ -214,17 +212,16 @@
 }
 
 /**
- * node_is_down - remove publication associated with a failed node
+ * named_purge_publ - remove publication associated with a failed node
  *
  * Invoked for each publication issued by a newly failed node.
  * Removes publication structure from name table & deletes it.
  * In rare cases the link may have come back up again when this
  * function is called, and we have two items representing the same
  * publication. Nudge this item's key to distinguish it from the other.
- * (Note: Publication's node subscription is already unsubscribed.)
  */
 
-static void node_is_down(struct publication *publ)
+static void named_purge_publ(struct publication *publ)
 {
 	struct publication *p;
 
@@ -232,6 +229,8 @@
 	publ->key += 1222345;
 	p = tipc_nametbl_remove_publ(publ->type, publ->lower,
 				     publ->node, publ->ref, publ->key);
+	if (p)
+		tipc_nodesub_unsubscribe(&p->subscr);
 	write_unlock_bh(&tipc_nametbl_lock);
 
 	if (p != publ) {
@@ -268,7 +267,8 @@
 				tipc_nodesub_subscribe(&publ->subscr,
 						       msg_orignode(msg),
 						       publ,
-						       (net_ev_handler)node_is_down);
+						       (net_ev_handler)
+						       named_purge_publ);
 			}
 		} else if (msg_type(msg) == WITHDRAWAL) {
 			publ = tipc_nametbl_remove_publ(ntohl(item->type),
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 9bacfd0..8fbc7e6 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -2,7 +2,7 @@
  * net/tipc/net.c: TIPC network routing code
  *
  * Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -39,6 +39,7 @@
 #include "name_distr.h"
 #include "subscr.h"
 #include "port.h"
+#include "node.h"
 #include "config.h"
 
 /*
@@ -108,26 +109,6 @@
 */
 
 DEFINE_RWLOCK(tipc_net_lock);
-struct network tipc_net;
-
-static int net_start(void)
-{
-	tipc_net.nodes = kcalloc(tipc_max_nodes + 1,
-				 sizeof(*tipc_net.nodes), GFP_ATOMIC);
-	tipc_net.highest_node = 0;
-
-	return tipc_net.nodes ? 0 : -ENOMEM;
-}
-
-static void net_stop(void)
-{
-	u32 n_num;
-
-	for (n_num = 1; n_num <= tipc_net.highest_node; n_num++)
-		tipc_node_delete(tipc_net.nodes[n_num]);
-	kfree(tipc_net.nodes);
-	tipc_net.nodes = NULL;
-}
 
 static void net_route_named_msg(struct sk_buff *buf)
 {
@@ -217,9 +198,6 @@
 	tipc_named_reinit();
 	tipc_port_reinit();
 
-	res = net_start();
-	if (res)
-		return res;
 	res = tipc_bclink_init();
 	if (res)
 		return res;
@@ -235,14 +213,22 @@
 
+/**
+ * tipc_net_stop - leave network mode and delete all known nodes
+ *
+ * Does nothing unless TIPC is currently in network mode.
+ */
 void tipc_net_stop(void)
 {
+	struct tipc_node *node, *t_node;
+
 	if (tipc_mode != TIPC_NET_MODE)
 		return;
 	write_lock_bh(&tipc_net_lock);
 	tipc_bearer_stop();
 	tipc_mode = TIPC_NODE_MODE;
 	tipc_bclink_stop();
-	net_stop();
+	/* _safe iteration: tipc_node_delete() unlinks and frees each entry */
+	list_for_each_entry_safe(node, t_node, &tipc_node_list, list)
+		tipc_node_delete(node);
 	write_unlock_bh(&tipc_net_lock);
 	info("Left network mode\n");
 }
-
diff --git a/net/tipc/net.h b/net/tipc/net.h
index 4ae59ad..9eb4b9e 100644
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -2,7 +2,7 @@
  * net/tipc/net.h: Include file for TIPC network routing code
  *
  * Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -37,23 +37,6 @@
 #ifndef _TIPC_NET_H
 #define _TIPC_NET_H
 
-struct tipc_node;
-
-/**
- * struct network - TIPC network structure
- * @nodes: array of pointers to all nodes within cluster
- * @highest_node: id of highest numbered node within cluster
- * @links: number of (unicast) links to cluster
- */
-
-struct network {
-	struct tipc_node **nodes;
-	u32 highest_node;
-	u32 links;
-};
-
-
-extern struct network tipc_net;
 extern rwlock_t tipc_net_lock;
 
 void tipc_net_route_msg(struct sk_buff *buf);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index e4dba1d..2d106ef 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -44,9 +44,33 @@
 
 static DEFINE_SPINLOCK(node_create_lock);
 
+static struct hlist_head node_htable[NODE_HTABLE_SIZE];
+LIST_HEAD(tipc_node_list);
+static u32 tipc_num_nodes;
+
+static atomic_t tipc_num_links = ATOMIC_INIT(0);
 u32 tipc_own_tag;
 
 /**
+ * tipc_node_find - locate specified node object, if it exists
+ */
+
+struct tipc_node *tipc_node_find(u32 addr)
+{
+	struct hlist_head *bucket;
+	struct hlist_node *cursor;
+	struct tipc_node *n_ptr;
+
+	if (unlikely(!in_own_cluster(addr)))
+		return NULL;
+	bucket = &node_htable[tipc_hashfn(addr)];
+	hlist_for_each_entry(n_ptr, cursor, bucket, hash)
+		if (n_ptr->addr == addr)
+			return n_ptr;
+	return NULL;
+}
+
+/**
  * tipc_node_create - create neighboring node
  *
  * Currently, this routine is called by neighbor discovery code, which holds
@@ -58,8 +82,7 @@
 
 struct tipc_node *tipc_node_create(u32 addr)
 {
-	struct tipc_node *n_ptr;
-	u32 n_num;
+	struct tipc_node *n_ptr, *temp_node;
 
 	spin_lock_bh(&node_create_lock);
 
@@ -78,12 +101,19 @@
 
 	n_ptr->addr = addr;
 	spin_lock_init(&n_ptr->lock);
+	INIT_HLIST_NODE(&n_ptr->hash);
+	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->nsub);
 
-	n_num = tipc_node(addr);
-	tipc_net.nodes[n_num] = n_ptr;
-	if (n_num > tipc_net.highest_node)
-		tipc_net.highest_node = n_num;
+	hlist_add_head(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
+
+	list_for_each_entry(temp_node, &tipc_node_list, list) {
+		if (n_ptr->addr < temp_node->addr)
+			break;
+	}
+	list_add_tail(&n_ptr->list, &temp_node->list);
+
+	tipc_num_nodes++;
 
 	spin_unlock_bh(&node_create_lock);
 	return n_ptr;
@@ -91,18 +121,11 @@
 
 void tipc_node_delete(struct tipc_node *n_ptr)
 {
-	u32 n_num;
-
-	if (!n_ptr)
-		return;
-
-	n_num = tipc_node(n_ptr->addr);
-	tipc_net.nodes[n_num] = NULL;
+	list_del(&n_ptr->list);		/* no NULL guard any more: n_ptr must be valid */
+	hlist_del(&n_ptr->hash);	/* leave both the node list and the hash bucket */
 	kfree(n_ptr);
 
-	while (!tipc_net.nodes[tipc_net.highest_node])
-		if (--tipc_net.highest_node == 0)
-			break;
+	tipc_num_nodes--;		/* mirrors the increment in tipc_node_create() */
 }
 
 
@@ -200,54 +223,32 @@
 		node_lost_contact(n_ptr);
 }
 
-int tipc_node_has_active_links(struct tipc_node *n_ptr)
+int tipc_node_active_links(struct tipc_node *n_ptr)
 {
 	return n_ptr->active_links[0] != NULL;
 }
 
-int tipc_node_has_redundant_links(struct tipc_node *n_ptr)
+int tipc_node_redundant_links(struct tipc_node *n_ptr)
 {
 	return n_ptr->working_links > 1;
 }
 
 int tipc_node_is_up(struct tipc_node *n_ptr)
 {
-	return tipc_node_has_active_links(n_ptr);
+	return tipc_node_active_links(n_ptr);
 }
 
-struct tipc_node *tipc_node_attach_link(struct link *l_ptr)
+void tipc_node_attach_link(struct tipc_node *n_ptr, struct link *l_ptr)
 {
-	struct tipc_node *n_ptr = tipc_node_find(l_ptr->addr);
-
-	if (!n_ptr)
-		n_ptr = tipc_node_create(l_ptr->addr);
-	if (n_ptr) {
-		u32 bearer_id = l_ptr->b_ptr->identity;
-		char addr_string[16];
-
-		if (n_ptr->link_cnt >= 2) {
-			err("Attempt to create third link to %s\n",
-			    tipc_addr_string_fill(addr_string, n_ptr->addr));
-			return NULL;
-		}
-
-		if (!n_ptr->links[bearer_id]) {
-			n_ptr->links[bearer_id] = l_ptr;
-			tipc_net.links++;
-			n_ptr->link_cnt++;
-			return n_ptr;
-		}
-		err("Attempt to establish second link on <%s> to %s\n",
-		    l_ptr->b_ptr->name,
-		    tipc_addr_string_fill(addr_string, l_ptr->addr));
-	}
-	return NULL;
+	n_ptr->links[l_ptr->b_ptr->identity] = l_ptr;	/* stored unconditionally; slot is not checked first */
+	atomic_inc(&tipc_num_links);	/* global count, used to size the link-info reply */
+	n_ptr->link_cnt++;	/* per-node count, undone by tipc_node_detach_link() */
 }
 
 void tipc_node_detach_link(struct tipc_node *n_ptr, struct link *l_ptr)
 {
 	n_ptr->links[l_ptr->b_ptr->identity] = NULL;
-	tipc_net.links--;
+	atomic_dec(&tipc_num_links);	/* undoes atomic_inc() in tipc_node_attach_link() */
 	n_ptr->link_cnt--;
 }
 
@@ -327,7 +328,6 @@
 
 static void node_lost_contact(struct tipc_node *n_ptr)
 {
-	struct tipc_node_subscr *ns, *tns;
 	char addr_string[16];
 	u32 i;
 
@@ -365,12 +365,7 @@
 	}
 
 	/* Notify subscribers */
-	list_for_each_entry_safe(ns, tns, &n_ptr->nsub, nodesub_list) {
-		ns->node = NULL;
-		list_del_init(&ns->nodesub_list);
-		tipc_k_signal((Handler)ns->handle_node_down,
-			      (unsigned long)ns->usr_handle);
-	}
+	tipc_nodesub_notify(n_ptr);
 
 	/* Prevent re-contact with node until all cleanup is done */
 
@@ -385,7 +380,6 @@
 	struct tipc_node *n_ptr;
 	struct tipc_node_info node_info;
 	u32 payload_size;
-	u32 n_num;
 
 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_NET_ADDR))
 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
@@ -396,15 +390,14 @@
 						   " (network address)");
 
 	read_lock_bh(&tipc_net_lock);
-	if (!tipc_net.nodes) {
+	if (!tipc_num_nodes) {
 		read_unlock_bh(&tipc_net_lock);
 		return tipc_cfg_reply_none();
 	}
 
 	/* For now, get space for all other nodes */
 
-	payload_size = TLV_SPACE(sizeof(node_info)) *
-		(tipc_net.highest_node - 1);
+	payload_size = TLV_SPACE(sizeof(node_info)) * tipc_num_nodes;
 	if (payload_size > 32768u) {
 		read_unlock_bh(&tipc_net_lock);
 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
@@ -418,9 +411,8 @@
 
 	/* Add TLVs for all nodes in scope */
 
-	for (n_num = 1; n_num <= tipc_net.highest_node; n_num++) {
-		n_ptr = tipc_net.nodes[n_num];
-		if (!n_ptr || !tipc_in_scope(domain, n_ptr->addr))
+	list_for_each_entry(n_ptr, &tipc_node_list, list) {
+		if (!tipc_in_scope(domain, n_ptr->addr))
 			continue;
 		node_info.addr = htonl(n_ptr->addr);
 		node_info.up = htonl(tipc_node_is_up(n_ptr));
@@ -439,7 +431,6 @@
 	struct tipc_node *n_ptr;
 	struct tipc_link_info link_info;
 	u32 payload_size;
-	u32 n_num;
 
 	if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_NET_ADDR))
 		return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
@@ -456,7 +447,8 @@
 
 	/* Get space for all unicast links + multicast link */
 
-	payload_size = TLV_SPACE(sizeof(link_info)) * (tipc_net.links + 1);
+	payload_size = TLV_SPACE(sizeof(link_info)) *
+		(atomic_read(&tipc_num_links) + 1);
 	if (payload_size > 32768u) {
 		read_unlock_bh(&tipc_net_lock);
 		return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
@@ -470,18 +462,17 @@
 
 	/* Add TLV for broadcast link */
 
-	link_info.dest = htonl(tipc_own_addr & 0xfffff00);
+	link_info.dest = htonl(tipc_cluster_mask(tipc_own_addr));
 	link_info.up = htonl(1);
 	strlcpy(link_info.str, tipc_bclink_name, TIPC_MAX_LINK_NAME);
 	tipc_cfg_append_tlv(buf, TIPC_TLV_LINK_INFO, &link_info, sizeof(link_info));
 
 	/* Add TLVs for any other links in scope */
 
-	for (n_num = 1; n_num <= tipc_net.highest_node; n_num++) {
+	list_for_each_entry(n_ptr, &tipc_node_list, list) {
 		u32 i;
 
-		n_ptr = tipc_net.nodes[n_num];
-		if (!n_ptr || !tipc_in_scope(domain, n_ptr->addr))
+		if (!tipc_in_scope(domain, n_ptr->addr))
 			continue;
 		tipc_node_lock(n_ptr);
 		for (i = 0; i < MAX_BEARERS; i++) {
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 206a8ef..5c61afc 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -2,7 +2,7 @@
  * net/tipc/node.h: Include file for TIPC node management routines
  *
  * Copyright (c) 2000-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -46,7 +46,8 @@
  * struct tipc_node - TIPC node structure
  * @addr: network address of node
  * @lock: spinlock governing access to structure
- * @next: pointer to next node in sorted list of cluster's nodes
+ * @hash: links to adjacent nodes in unsorted hash chain
+ * @list: links to adjacent nodes in sorted list of cluster's nodes
  * @nsub: list of "node down" subscriptions monitoring node
  * @active_links: pointers to active links to node
  * @links: pointers to all links to node
@@ -69,7 +70,8 @@
 struct tipc_node {
 	u32 addr;
 	spinlock_t lock;
-	struct tipc_node *next;
+	struct hlist_node hash;
+	struct list_head list;
 	struct list_head nsub;
 	struct link *active_links[2];
 	struct link *links[MAX_BEARERS];
@@ -90,27 +92,35 @@
 	} bclink;
 };
 
+#define NODE_HTABLE_SIZE 512
+extern struct list_head tipc_node_list;
+
+/*
+ * A trivial power-of-two bitmask technique is used for speed, since this
+ * operation is done for every incoming TIPC packet. The number of hash table
+ * entries has been chosen so that no hash chain exceeds 8 nodes and will
+ * usually be much smaller (typically only a single node).
+ */
+static inline unsigned int tipc_hashfn(u32 addr)
+{
+	return addr & (NODE_HTABLE_SIZE - 1);	/* mask is only a valid index if size is a power of two */
+}
+
 extern u32 tipc_own_tag;
 
+struct tipc_node *tipc_node_find(u32 addr);
 struct tipc_node *tipc_node_create(u32 addr);
 void tipc_node_delete(struct tipc_node *n_ptr);
-struct tipc_node *tipc_node_attach_link(struct link *l_ptr);
+void tipc_node_attach_link(struct tipc_node *n_ptr, struct link *l_ptr);
 void tipc_node_detach_link(struct tipc_node *n_ptr, struct link *l_ptr);
 void tipc_node_link_down(struct tipc_node *n_ptr, struct link *l_ptr);
 void tipc_node_link_up(struct tipc_node *n_ptr, struct link *l_ptr);
-int tipc_node_has_active_links(struct tipc_node *n_ptr);
-int tipc_node_has_redundant_links(struct tipc_node *n_ptr);
+int tipc_node_active_links(struct tipc_node *n_ptr);
+int tipc_node_redundant_links(struct tipc_node *n_ptr);
 int tipc_node_is_up(struct tipc_node *n_ptr);
 struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space);
 struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space);
 
-static inline struct tipc_node *tipc_node_find(u32 addr)
-{
-	if (likely(in_own_cluster(addr)))
-		return tipc_net.nodes[tipc_node(addr)];
-	return NULL;
-}
-
 static inline void tipc_node_lock(struct tipc_node *n_ptr)
 {
 	spin_lock_bh(&n_ptr->lock);
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
index 018a553..c3c2815 100644
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -2,7 +2,7 @@
  * net/tipc/node_subscr.c: TIPC "node down" subscription handling
  *
  * Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -76,3 +76,22 @@
 	list_del_init(&node_sub->nodesub_list);
 	tipc_node_unlock(node_sub->node);
 }
+
+/**
+ * tipc_nodesub_notify - notify subscribers that a node is unreachable
+ *
+ * Note: node is locked by caller
+ */
+
+void tipc_nodesub_notify(struct tipc_node *node)
+{
+	struct tipc_node_subscr *ns;
+
+	list_for_each_entry(ns, &node->nsub, nodesub_list) {
+		if (ns->handle_node_down) {	/* skip entries with no handler left */
+			tipc_k_signal((Handler)ns->handle_node_down,
+				      (unsigned long)ns->usr_handle);
+			ns->handle_node_down = NULL;	/* a later pass will not signal this entry again */
+		}
+	}
+}
diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h
index 006ed73..4bc2ca0 100644
--- a/net/tipc/node_subscr.h
+++ b/net/tipc/node_subscr.h
@@ -2,7 +2,7 @@
  * net/tipc/node_subscr.h: Include file for TIPC "node down" subscription handling
  *
  * Copyright (c) 1995-2006, Ericsson AB
- * Copyright (c) 2005, Wind River Systems
+ * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -59,5 +59,6 @@
 void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
 			    void *usr_handle, net_ev_handler handle_down);
 void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub);
+void tipc_nodesub_notify(struct tipc_node *node);
 
 #endif
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 125dcb0..29d94d5 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -58,6 +58,9 @@
 #define tipc_sk(sk) ((struct tipc_sock *)(sk))
 #define tipc_sk_port(sk) ((struct tipc_port *)(tipc_sk(sk)->p))
 
+#define tipc_rx_ready(sock) (!skb_queue_empty(&(sock)->sk->sk_receive_queue) || \
+			((sock)->state == SS_DISCONNECTING)) /* sock evaluated twice */
+
 static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
 static void wakeupdispatch(struct tipc_port *tport);
@@ -289,7 +292,7 @@
 		if (buf == NULL)
 			break;
 		atomic_dec(&tipc_queue_size);
-		if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf)))
+		if (TIPC_SKB_CB(buf)->handle != 0)
 			buf_discard(buf);
 		else {
 			if ((sock->state == SS_CONNECTING) ||
@@ -911,15 +914,13 @@
 	struct tipc_port *tport = tipc_sk_port(sk);
 	struct sk_buff *buf;
 	struct tipc_msg *msg;
+	long timeout;
 	unsigned int sz;
 	u32 err;
 	int res;
 
 	/* Catch invalid receive requests */
 
-	if (m->msg_iovlen != 1)
-		return -EOPNOTSUPP;   /* Don't do multiple iovec entries yet */
-
 	if (unlikely(!buf_len))
 		return -EINVAL;
 
@@ -930,6 +931,7 @@
 		goto exit;
 	}
 
+	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 restart:
 
 	/* Look for a message in receive queue; wait if necessary */
@@ -939,17 +941,15 @@
 			res = -ENOTCONN;
 			goto exit;
 		}
-		if (flags & MSG_DONTWAIT) {
-			res = -EWOULDBLOCK;
+		if (timeout <= 0L) {
+			res = timeout ? timeout : -EWOULDBLOCK;
 			goto exit;
 		}
 		release_sock(sk);
-		res = wait_event_interruptible(*sk_sleep(sk),
-			(!skb_queue_empty(&sk->sk_receive_queue) ||
-			 (sock->state == SS_DISCONNECTING)));
+		timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
+							   tipc_rx_ready(sock),
+							   timeout);
 		lock_sock(sk);
-		if (res)
-			goto exit;
 	}
 
 	/* Look at first message in receive queue */
@@ -991,11 +991,10 @@
 			sz = buf_len;
 			m->msg_flags |= MSG_TRUNC;
 		}
-		if (unlikely(copy_to_user(m->msg_iov->iov_base, msg_data(msg),
-					  sz))) {
-			res = -EFAULT;
+		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
+					      m->msg_iov, sz);
+		if (res)
 			goto exit;
-		}
 		res = sz;
 	} else {
 		if ((sock->state == SS_READY) ||
@@ -1038,19 +1037,15 @@
 	struct tipc_port *tport = tipc_sk_port(sk);
 	struct sk_buff *buf;
 	struct tipc_msg *msg;
+	long timeout;
 	unsigned int sz;
 	int sz_to_copy, target, needed;
 	int sz_copied = 0;
-	char __user *crs = m->msg_iov->iov_base;
-	unsigned char *buf_crs;
 	u32 err;
 	int res = 0;
 
 	/* Catch invalid receive attempts */
 
-	if (m->msg_iovlen != 1)
-		return -EOPNOTSUPP;   /* Don't do multiple iovec entries yet */
-
 	if (unlikely(!buf_len))
 		return -EINVAL;
 
@@ -1063,7 +1058,7 @@
 	}
 
 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
-
+	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 restart:
 
 	/* Look for a message in receive queue; wait if necessary */
@@ -1073,17 +1068,15 @@
 			res = -ENOTCONN;
 			goto exit;
 		}
-		if (flags & MSG_DONTWAIT) {
-			res = -EWOULDBLOCK;
+		if (timeout <= 0L) {
+			res = timeout ? timeout : -EWOULDBLOCK;
 			goto exit;
 		}
 		release_sock(sk);
-		res = wait_event_interruptible(*sk_sleep(sk),
-			(!skb_queue_empty(&sk->sk_receive_queue) ||
-			 (sock->state == SS_DISCONNECTING)));
+		timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
+							   tipc_rx_ready(sock),
+							   timeout);
 		lock_sock(sk);
-		if (res)
-			goto exit;
 	}
 
 	/* Look at first message in receive queue */
@@ -1112,24 +1105,25 @@
 	/* Capture message data (if valid) & compute return value (always) */
 
 	if (!err) {
-		buf_crs = (unsigned char *)(TIPC_SKB_CB(buf)->handle);
-		sz = (unsigned char *)msg + msg_size(msg) - buf_crs;
+		u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
 
+		sz -= offset;
 		needed = (buf_len - sz_copied);
 		sz_to_copy = (sz <= needed) ? sz : needed;
-		if (unlikely(copy_to_user(crs, buf_crs, sz_to_copy))) {
-			res = -EFAULT;
+
+		res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
+					      m->msg_iov, sz_to_copy);
+		if (res)
 			goto exit;
-		}
+
 		sz_copied += sz_to_copy;
 
 		if (sz_to_copy < sz) {
 			if (!(flags & MSG_PEEK))
-				TIPC_SKB_CB(buf)->handle = buf_crs + sz_to_copy;
+				TIPC_SKB_CB(buf)->handle =
+				(void *)(unsigned long)(offset + sz_to_copy);
 			goto exit;
 		}
-
-		crs += sz_to_copy;
 	} else {
 		if (sz_copied != 0)
 			goto exit; /* can't add error msg to valid data */
@@ -1256,7 +1250,7 @@
 
 	/* Enqueue message (finally!) */
 
-	TIPC_SKB_CB(buf)->handle = msg_data(msg);
+	TIPC_SKB_CB(buf)->handle = 0;
 	atomic_inc(&tipc_queue_size);
 	__skb_queue_tail(&sk->sk_receive_queue, buf);
 
@@ -1608,7 +1602,7 @@
 		buf = __skb_dequeue(&sk->sk_receive_queue);
 		if (buf) {
 			atomic_dec(&tipc_queue_size);
-			if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf))) {
+			if (TIPC_SKB_CB(buf)->handle != 0) {
 				buf_discard(buf);
 				goto restart;
 			}