Merge branch 'tipc-next'

Jon Maloy says:

====================
tipc: multicast and internal users to new send functions

We move the remaining data transmit users: multicast, name table
distributor, and link internal protocols to use the new data
transmission framework introduced in a previous commit series
("tipc: new unicast transmission code").

Finally, we remove the code obsoleted by the new functions.

v2: - Fixed a braindead, but harmless return sequence in commit #3, as
      reported by David Miller.
    - Rebased series to 3.16.0-rc5+
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 55c6c9d..dd13bfa 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.c: TIPC broadcast code
  *
- * Copyright (c) 2004-2006, Ericsson AB
+ * Copyright (c) 2004-2006, 2014, Ericsson AB
  * Copyright (c) 2004, Intel Corporation.
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
@@ -38,6 +38,8 @@
 #include "core.h"
 #include "link.h"
 #include "port.h"
+#include "socket.h"
+#include "msg.h"
 #include "bcast.h"
 #include "name_distr.h"
 
@@ -138,6 +140,11 @@
 		tipc_link_reset_all(node);
 }
 
+uint  tipc_bclink_get_mtu(void)
+{
+	return MAX_PKT_DEFAULT_MCAST;
+}
+
 void tipc_bclink_set_flags(unsigned int flags)
 {
 	bclink->flags |= flags;
@@ -382,30 +389,50 @@
 	tipc_node_unlock(n_ptr);
 }
 
-/*
- * tipc_bclink_xmit - broadcast a packet to all nodes in cluster
+/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+ *                    and to identified node local sockets
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
 int tipc_bclink_xmit(struct sk_buff *buf)
 {
-	int res;
+	int rc = 0;
+	int bc = 0;
+	struct sk_buff *clbuf;
 
-	tipc_bclink_lock();
-
-	if (!bclink->bcast_nodes.count) {
-		res = msg_data_sz(buf_msg(buf));
-		kfree_skb(buf);
-		goto exit;
+	/* Prepare clone of message for local node */
+	clbuf = tipc_msg_reassemble(buf);
+	if (unlikely(!clbuf)) {
+		kfree_skb_list(buf);
+		return -EHOSTUNREACH;
 	}
 
-	res = __tipc_link_xmit(bcl, buf);
-	if (likely(res >= 0)) {
-		bclink_set_last_sent();
-		bcl->stats.queue_sz_counts++;
-		bcl->stats.accu_queue_sz += bcl->out_queue_size;
+	/* Broadcast to all other nodes */
+	if (likely(bclink)) {
+		tipc_bclink_lock();
+		if (likely(bclink->bcast_nodes.count)) {
+			rc = __tipc_link_xmit(bcl, buf);
+			if (likely(!rc)) {
+				bclink_set_last_sent();
+				bcl->stats.queue_sz_counts++;
+				bcl->stats.accu_queue_sz += bcl->out_queue_size;
+			}
+			bc = 1;
+		}
+		tipc_bclink_unlock();
 	}
-exit:
-	tipc_bclink_unlock();
-	return res;
+
+	if (unlikely(!bc))
+		kfree_skb_list(buf);
+
+	/* Deliver message clone */
+	if (likely(!rc))
+		tipc_sk_mcast_rcv(clbuf);
+	else
+		kfree_skb(clbuf);
+
+	return rc;
 }
 
 /**
@@ -443,7 +470,7 @@
 	struct tipc_node *node;
 	u32 next_in;
 	u32 seqno;
-	int deferred;
+	int deferred = 0;
 
 	/* Screen out unwanted broadcast messages */
 
@@ -494,7 +521,7 @@
 			tipc_bclink_unlock();
 			tipc_node_unlock(node);
 			if (likely(msg_mcast(msg)))
-				tipc_port_mcast_rcv(buf, NULL);
+				tipc_sk_mcast_rcv(buf);
 			else
 				kfree_skb(buf);
 		} else if (msg_user(msg) == MSG_BUNDLER) {
@@ -573,8 +600,7 @@
 		node->bclink.deferred_size += deferred;
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
-	} else
-		deferred = 0;
+	}
 
 	tipc_bclink_lock();
 
@@ -611,6 +637,7 @@
 			      struct tipc_media_addr *unused2)
 {
 	int bp_index;
+	struct tipc_msg *msg = buf_msg(buf);
 
 	/* Prepare broadcast link message for reliable transmission,
 	 * if first time trying to send it;
@@ -618,10 +645,7 @@
 	 * since they are sent in an unreliable manner and don't need it
 	 */
 	if (likely(!msg_non_seq(buf_msg(buf)))) {
-		struct tipc_msg *msg;
-
 		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
-		msg = buf_msg(buf);
 		msg_set_non_seq(msg, 1);
 		msg_set_mc_netid(msg, tipc_net_id);
 		bcl->stats.sent_info++;
@@ -638,12 +662,14 @@
 	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
 		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
 		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
-		struct tipc_bearer *b = p;
+		struct tipc_bearer *bp[2] = {p, s};
+		struct tipc_bearer *b = bp[msg_link_selector(msg)];
 		struct sk_buff *tbuf;
 
 		if (!p)
 			break; /* No more bearers to try */
-
+		if (!b)
+			b = p;
 		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
 			       &bcbearer->remains_new);
 		if (bcbearer->remains_new.count == bcbearer->remains.count)
@@ -660,13 +686,6 @@
 			tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
 			kfree_skb(tbuf); /* Bearer keeps a clone */
 		}
-
-		/* Swap bearers for next packet */
-		if (s) {
-			bcbearer->bpairs[bp_index].primary = s;
-			bcbearer->bpairs[bp_index].secondary = p;
-		}
-
 		if (bcbearer->remains_new.count == 0)
 			break; /* All targets reached */
 
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 00330c4..4875d95 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.h: Include file for TIPC broadcast code
  *
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2006, 2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -89,7 +89,6 @@
 void tipc_bclink_remove_node(u32 addr);
 struct tipc_node *tipc_bclink_retransmit_to(void);
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
-int  tipc_bclink_xmit(struct sk_buff *buf);
 void tipc_bclink_rcv(struct sk_buff *buf);
 u32  tipc_bclink_get_last_sent(void);
 u32  tipc_bclink_acks_missing(struct tipc_node *n_ptr);
@@ -98,5 +97,7 @@
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
+uint  tipc_bclink_get_mtu(void);
+int tipc_bclink_xmit(struct sk_buff *buf);
 
 #endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index a235b24..fb1485d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -85,7 +85,6 @@
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
 static void tipc_link_sync_xmit(struct tipc_link *l);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
 static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf);
@@ -679,180 +678,6 @@
 	}
 }
 
-/*
- * link_bundle_buf(): Append contents of a buffer to
- * the tail of an existing one.
- */
-static int link_bundle_buf(struct tipc_link *l_ptr, struct sk_buff *bundler,
-			   struct sk_buff *buf)
-{
-	struct tipc_msg *bundler_msg = buf_msg(bundler);
-	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 bundle_size = msg_size(bundler_msg);
-	u32 to_pos = align(bundle_size);
-	u32 pad = to_pos - bundle_size;
-
-	if (msg_user(bundler_msg) != MSG_BUNDLER)
-		return 0;
-	if (msg_type(bundler_msg) != OPEN_MSG)
-		return 0;
-	if (skb_tailroom(bundler) < (pad + size))
-		return 0;
-	if (l_ptr->max_pkt < (to_pos + size))
-		return 0;
-
-	skb_put(bundler, pad + size);
-	skb_copy_to_linear_data_offset(bundler, to_pos, buf->data, size);
-	msg_set_size(bundler_msg, to_pos + size);
-	msg_set_msgcnt(bundler_msg, msg_msgcnt(bundler_msg) + 1);
-	kfree_skb(buf);
-	l_ptr->stats.sent_bundled++;
-	return 1;
-}
-
-static void link_add_to_outqueue(struct tipc_link *l_ptr,
-				 struct sk_buff *buf,
-				 struct tipc_msg *msg)
-{
-	u32 ack = mod(l_ptr->next_in_no - 1);
-	u32 seqno = mod(l_ptr->next_out_no++);
-
-	msg_set_word(msg, 2, ((ack << 16) | seqno));
-	msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-	buf->next = NULL;
-	if (l_ptr->first_out) {
-		l_ptr->last_out->next = buf;
-		l_ptr->last_out = buf;
-	} else
-		l_ptr->first_out = l_ptr->last_out = buf;
-
-	l_ptr->out_queue_size++;
-	if (l_ptr->out_queue_size > l_ptr->stats.max_queue_sz)
-		l_ptr->stats.max_queue_sz = l_ptr->out_queue_size;
-}
-
-static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
-				       struct sk_buff *buf_chain,
-				       u32 long_msgno)
-{
-	struct sk_buff *buf;
-	struct tipc_msg *msg;
-
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf_chain;
-	while (buf_chain) {
-		buf = buf_chain;
-		buf_chain = buf_chain->next;
-
-		msg = buf_msg(buf);
-		msg_set_long_msgno(msg, long_msgno);
-		link_add_to_outqueue(l_ptr, buf, msg);
-	}
-}
-
-/*
- * tipc_link_xmit() is the 'full path' for messages, called from
- * inside TIPC when the 'fast path' in tipc_send_xmit
- * has failed, and from link_send()
- */
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
-	struct tipc_msg *msg = buf_msg(buf);
-	u32 size = msg_size(msg);
-	u32 dsz = msg_data_sz(msg);
-	u32 queue_size = l_ptr->out_queue_size;
-	u32 imp = tipc_msg_tot_importance(msg);
-	u32 queue_limit = l_ptr->queue_limit[imp];
-	u32 max_packet = l_ptr->max_pkt;
-
-	/* Match msg importance against queue limits: */
-	if (unlikely(queue_size >= queue_limit)) {
-		if (imp <= TIPC_CRITICAL_IMPORTANCE) {
-			link_schedule_port(l_ptr, msg_origport(msg), size);
-			kfree_skb(buf);
-			return -ELINKCONG;
-		}
-		kfree_skb(buf);
-		if (imp > CONN_MANAGER) {
-			pr_warn("%s<%s>, send queue full", link_rst_msg,
-				l_ptr->name);
-			tipc_link_reset(l_ptr);
-		}
-		return dsz;
-	}
-
-	/* Fragmentation needed ? */
-	if (size > max_packet)
-		return tipc_link_frag_xmit(l_ptr, buf);
-
-	/* Packet can be queued or sent. */
-	if (likely(!link_congested(l_ptr))) {
-		link_add_to_outqueue(l_ptr, buf, msg);
-
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		l_ptr->unacked_window = 0;
-		return dsz;
-	}
-	/* Congestion: can message be bundled ? */
-	if ((msg_user(msg) != CHANGEOVER_PROTOCOL) &&
-	    (msg_user(msg) != MSG_FRAGMENTER)) {
-
-		/* Try adding message to an existing bundle */
-		if (l_ptr->next_out &&
-		    link_bundle_buf(l_ptr, l_ptr->last_out, buf))
-			return dsz;
-
-		/* Try creating a new bundle */
-		if (size <= max_packet * 2 / 3) {
-			struct sk_buff *bundler = tipc_buf_acquire(max_packet);
-			struct tipc_msg bundler_hdr;
-
-			if (bundler) {
-				tipc_msg_init(&bundler_hdr, MSG_BUNDLER, OPEN_MSG,
-					 INT_H_SIZE, l_ptr->addr);
-				skb_copy_to_linear_data(bundler, &bundler_hdr,
-							INT_H_SIZE);
-				skb_trim(bundler, INT_H_SIZE);
-				link_bundle_buf(l_ptr, bundler, buf);
-				buf = bundler;
-				msg = buf_msg(buf);
-				l_ptr->stats.sent_bundles++;
-			}
-		}
-	}
-	if (!l_ptr->next_out)
-		l_ptr->next_out = buf;
-	link_add_to_outqueue(l_ptr, buf, msg);
-	return dsz;
-}
-
-/*
- * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
- * has not been selected yet, and the the owner node is not locked
- * Called by TIPC internal users, e.g. the name distributor
- */
-int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
-{
-	struct tipc_link *l_ptr;
-	struct tipc_node *n_ptr;
-	int res = -ELINKCONG;
-
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[selector & 1];
-		if (l_ptr)
-			res = __tipc_link_xmit(l_ptr, buf);
-		else
-			kfree_skb(buf);
-		tipc_node_unlock(n_ptr);
-	} else {
-		kfree_skb(buf);
-	}
-	return res;
-}
-
 /* tipc_link_cong: determine return value and how to treat the
  * sent buffer during link congestion.
  * - For plain, errorless user data messages we keep the buffer and
@@ -881,7 +706,7 @@
 }
 
 /**
- * __tipc_link_xmit2(): same as tipc_link_xmit2, but destlink is known & locked
+ * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
  * @buf: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
@@ -890,7 +715,7 @@
  * Only the socket functions tipc_send_stream() and tipc_send_packet() need
  * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
 	uint psz = msg_size(msg);
@@ -958,7 +783,7 @@
 }
 
 /**
- * tipc_link_xmit2() is the general link level function for message sending
+ * tipc_link_xmit() is the general link level function for message sending
  * @buf: chain of buffers containing message
  * @dsz: amount of user data to be sent
  * @dnode: address of destination node
@@ -966,7 +791,7 @@
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_link_xmit2(struct sk_buff *buf, u32 dnode, u32 selector)
+int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 {
 	struct tipc_link *link = NULL;
 	struct tipc_node *node;
@@ -977,7 +802,7 @@
 		tipc_node_lock(node);
 		link = node->active_links[selector & 1];
 		if (link)
-			rc = __tipc_link_xmit2(link, buf);
+			rc = __tipc_link_xmit(link, buf);
 		tipc_node_unlock(node);
 	}
 
@@ -999,7 +824,7 @@
  *
  * Called with node locked
  */
-static void tipc_link_sync_xmit(struct tipc_link *l)
+static void tipc_link_sync_xmit(struct tipc_link *link)
 {
 	struct sk_buff *buf;
 	struct tipc_msg *msg;
@@ -1009,10 +834,9 @@
 		return;
 
 	msg = buf_msg(buf);
-	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
-	msg_set_last_bcast(msg, l->owner->bclink.acked);
-	link_add_chain_to_outqueue(l, buf, 0);
-	tipc_link_push_queue(l);
+	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
+	msg_set_last_bcast(msg, link->owner->bclink.acked);
+	__tipc_link_xmit(link, buf);
 }
 
 /*
@@ -1033,47 +857,6 @@
 }
 
 /*
- * tipc_link_names_xmit - send name table entries to new neighbor
- *
- * Send routine for bulk delivery of name table messages when contact
- * with a new neighbor occurs. No link congestion checking is performed
- * because name table messages *must* be delivered. The messages must be
- * small enough not to require fragmentation.
- * Called without any locks held.
- */
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
-{
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *buf;
-	struct sk_buff *temp_buf;
-
-	if (list_empty(message_list))
-		return;
-
-	n_ptr = tipc_node_find(dest);
-	if (n_ptr) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[0];
-		if (l_ptr) {
-			/* convert circular list to linear list */
-			((struct sk_buff *)message_list->prev)->next = NULL;
-			link_add_chain_to_outqueue(l_ptr,
-				(struct sk_buff *)message_list->next, 0);
-			tipc_link_push_queue(l_ptr);
-			INIT_LIST_HEAD(message_list);
-		}
-		tipc_node_unlock(n_ptr);
-	}
-
-	/* discard the messages if they couldn't be sent */
-	list_for_each_safe(buf, temp_buf, ((struct sk_buff *)message_list)) {
-		list_del((struct list_head *)buf);
-		kfree_skb(buf);
-	}
-}
-
-/*
  * tipc_link_push_packet: Push one unsent packet to the media
  */
 static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
@@ -2165,78 +1948,6 @@
 	kfree_skb(buf);
 }
 
-/*
- *  Fragmentation/defragmentation:
- */
-
-/*
- * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
- * The buffer is complete, inclusive total message length.
- * Returns user data length.
- */
-static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
-{
-	struct sk_buff *buf_chain = NULL;
-	struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
-	struct tipc_msg *inmsg = buf_msg(buf);
-	struct tipc_msg fragm_hdr;
-	u32 insize = msg_size(inmsg);
-	u32 dsz = msg_data_sz(inmsg);
-	unchar *crs = buf->data;
-	u32 rest = insize;
-	u32 pack_sz = l_ptr->max_pkt;
-	u32 fragm_sz = pack_sz - INT_H_SIZE;
-	u32 fragm_no = 0;
-	u32 destaddr;
-
-	if (msg_short(inmsg))
-		destaddr = l_ptr->addr;
-	else
-		destaddr = msg_destnode(inmsg);
-
-	/* Prepare reusable fragment header: */
-	tipc_msg_init(&fragm_hdr, MSG_FRAGMENTER, FIRST_FRAGMENT,
-		 INT_H_SIZE, destaddr);
-
-	/* Chop up message: */
-	while (rest > 0) {
-		struct sk_buff *fragm;
-
-		if (rest <= fragm_sz) {
-			fragm_sz = rest;
-			msg_set_type(&fragm_hdr, LAST_FRAGMENT);
-		}
-		fragm = tipc_buf_acquire(fragm_sz + INT_H_SIZE);
-		if (fragm == NULL) {
-			kfree_skb(buf);
-			kfree_skb_list(buf_chain);
-			return -ENOMEM;
-		}
-		msg_set_size(&fragm_hdr, fragm_sz + INT_H_SIZE);
-		fragm_no++;
-		msg_set_fragm_no(&fragm_hdr, fragm_no);
-		skb_copy_to_linear_data(fragm, &fragm_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(fragm, INT_H_SIZE, crs,
-					       fragm_sz);
-		buf_chain_tail->next = fragm;
-		buf_chain_tail = fragm;
-
-		rest -= fragm_sz;
-		crs += fragm_sz;
-		msg_set_type(&fragm_hdr, FRAGMENT);
-	}
-	kfree_skb(buf);
-
-	/* Append chain of fragments to send queue & send them */
-	l_ptr->long_msg_seq_no++;
-	link_add_chain_to_outqueue(l_ptr, buf_chain, l_ptr->long_msg_seq_no);
-	l_ptr->stats.sent_fragments += fragm_no;
-	l_ptr->stats.sent_fragmented++;
-	tipc_link_push_queue(l_ptr);
-
-	return dsz;
-}
-
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
 {
 	if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 227ff81..782983c 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -227,15 +227,8 @@
 void tipc_link_reset(struct tipc_link *l_ptr);
 void tipc_link_reset_list(unsigned int bearer_id);
 int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
-int tipc_link_xmit2(struct sk_buff *buf, u32 dest, u32 selector);
-void tipc_link_names_xmit(struct list_head *message_list, u32 dest);
-int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
-int __tipc_link_xmit2(struct tipc_link *link, struct sk_buff *buf);
-int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf);
 u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
-int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
-			      struct iovec const *msg_sect,
-			      unsigned int len, u32 destnode);
 void tipc_link_bundle_rcv(struct sk_buff *buf);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
 			  u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index ce6d929..b6f45d0 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -60,41 +60,6 @@
 	msg_set_destnode(m, destnode);
 }
 
-/**
- * tipc_msg_build - create message using specified header and data
- *
- * Note: Caller must not hold any locks in case copy_from_user() is interrupted!
- *
- * Returns message data size or errno
- */
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
-		   unsigned int len, int max_size, struct sk_buff **buf)
-{
-	int dsz, sz, hsz;
-	unsigned char *to;
-
-	dsz = len;
-	hsz = msg_hdr_sz(hdr);
-	sz = hsz + dsz;
-	msg_set_size(hdr, sz);
-	if (unlikely(sz > max_size)) {
-		*buf = NULL;
-		return dsz;
-	}
-
-	*buf = tipc_buf_acquire(sz);
-	if (!(*buf))
-		return -ENOMEM;
-	skb_copy_to_linear_data(*buf, hdr, hsz);
-	to = (*buf)->data + hsz;
-	if (len && memcpy_fromiovecend(to, msg_sect, 0, dsz)) {
-		kfree_skb(*buf);
-		*buf = NULL;
-		return -EFAULT;
-	}
-	return dsz;
-}
-
 /* tipc_buf_append(): Append a buffer to the fragment list of another buffer
  * @*headbuf: in:  NULL for first frag, otherwise value returned from prev call
  *            out: set when successful non-complete reassembly, otherwise NULL
@@ -155,7 +120,7 @@
 
 
 /**
- * tipc_msg_build2 - create buffer chain containing specified header and data
+ * tipc_msg_build - create buffer chain containing specified header and data
  * @mhdr: Message header, to be prepended to data
  * @iov: User data
  * @offset: Posision in iov to start copying from
@@ -164,8 +129,8 @@
  * @chain: Buffer or chain of buffers to be returned to caller
  * Returns message data size or errno: -ENOMEM, -EFAULT
  */
-int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
-		    int offset, int dsz, int pktmax , struct sk_buff **chain)
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+		   int offset, int dsz, int pktmax , struct sk_buff **chain)
 {
 	int mhsz = msg_hdr_sz(mhdr);
 	int msz = mhsz + dsz;
@@ -417,3 +382,38 @@
 	msg_set_destport(msg, dport);
 	return TIPC_OK;
 }
+
+/* tipc_msg_reassemble() - clone a buffer chain of fragments and
+ *                         reassemble the clones into one message
+ */
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
+{
+	struct sk_buff *buf = chain;
+	struct sk_buff *frag = buf;
+	struct sk_buff *head = NULL;
+	int hdr_sz;
+
+	/* Copy header if single buffer */
+	if (!buf->next) {
+		hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
+		return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
+	}
+
+	/* Clone all fragments and reassemble */
+	while (buf) {
+		frag = skb_clone(buf, GFP_ATOMIC);
+		if (!frag)
+			goto error;
+		frag->next = NULL;
+		if (tipc_buf_append(&head, &frag))
+			break;
+		if (!head)
+			goto error;
+		buf = buf->next;
+	}
+	return frag;
+error:
+	pr_warn("Failed do clone local mcast rcv buffer\n");
+	kfree_skb(head);
+	return NULL;
+}
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 7d57434..462fa19 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -732,16 +732,15 @@
 void tipc_msg_init(struct tipc_msg *m, u32 user, u32 type, u32 hsize,
 		   u32 destnode);
 
-int tipc_msg_build(struct tipc_msg *hdr, struct iovec const *msg_sect,
-		   unsigned int len, int max_size, struct sk_buff **buf);
-
 int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
 
 bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu);
 
 bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
 
-int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
-		    int offset, int dsz, int mtu , struct sk_buff **chain);
+int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
+		   int offset, int dsz, int mtu , struct sk_buff **chain);
+
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
 
 #endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 8ce7309..dcc15bc 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -101,24 +101,22 @@
 
 void named_cluster_distribute(struct sk_buff *buf)
 {
-	struct sk_buff *buf_copy;
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
+	struct sk_buff *obuf;
+	struct tipc_node *node;
+	u32 dnode;
 
 	rcu_read_lock();
-	list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
-		tipc_node_lock(n_ptr);
-		l_ptr = n_ptr->active_links[n_ptr->addr & 1];
-		if (l_ptr) {
-			buf_copy = skb_copy(buf, GFP_ATOMIC);
-			if (!buf_copy) {
-				tipc_node_unlock(n_ptr);
-				break;
-			}
-			msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
-			__tipc_link_xmit(l_ptr, buf_copy);
-		}
-		tipc_node_unlock(n_ptr);
+	list_for_each_entry_rcu(node, &tipc_node_list, list) {
+		dnode = node->addr;
+		if (in_own_node(dnode))
+			continue;
+		if (!tipc_node_active_links(node))
+			continue;
+		obuf = skb_copy(buf, GFP_ATOMIC);
+		if (!obuf)
+			break;
+		msg_set_destnode(buf_msg(obuf), dnode);
+		tipc_link_xmit(obuf, dnode, dnode);
 	}
 	rcu_read_unlock();
 
@@ -175,34 +173,44 @@
 	return buf;
 }
 
-/*
+/**
  * named_distribute - prepare name info for bulk distribution to another node
+ * @msg_list: list of messages (buffers) to be returned from this function
+ * @dnode: node to be updated
+ * @pls: linked list of publication items to be packed into buffer chain
  */
-static void named_distribute(struct list_head *message_list, u32 node,
-			     struct publ_list *pls, u32 max_item_buf)
+static void named_distribute(struct list_head *msg_list, u32 dnode,
+			     struct publ_list *pls)
 {
 	struct publication *publ;
 	struct sk_buff *buf = NULL;
 	struct distr_item *item = NULL;
-	u32 left = 0;
-	u32 rest = pls->size * ITEM_SIZE;
+	uint dsz = pls->size * ITEM_SIZE;
+	uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
+	uint rem = dsz;
+	uint msg_rem = 0;
 
 	list_for_each_entry(publ, &pls->list, local_list) {
+		/* Prepare next buffer: */
 		if (!buf) {
-			left = (rest <= max_item_buf) ? rest : max_item_buf;
-			rest -= left;
-			buf = named_prepare_buf(PUBLICATION, left, node);
+			msg_rem = min_t(uint, rem, msg_dsz);
+			rem -= msg_rem;
+			buf = named_prepare_buf(PUBLICATION, msg_rem, dnode);
 			if (!buf) {
 				pr_warn("Bulk publication failure\n");
 				return;
 			}
 			item = (struct distr_item *)msg_data(buf_msg(buf));
 		}
+
+		/* Pack publication into message: */
 		publ_to_item(item, publ);
 		item++;
-		left -= ITEM_SIZE;
-		if (!left) {
-			list_add_tail((struct list_head *)buf, message_list);
+		msg_rem -= ITEM_SIZE;
+
+		/* Append full buffer to list: */
+		if (!msg_rem) {
+			list_add_tail((struct list_head *)buf, msg_list);
 			buf = NULL;
 		}
 	}
@@ -211,16 +219,20 @@
 /**
  * tipc_named_node_up - tell specified node about all publications by this node
  */
-void tipc_named_node_up(u32 max_item_buf, u32 node)
+void tipc_named_node_up(u32 dnode)
 {
-	LIST_HEAD(message_list);
+	LIST_HEAD(msg_list);
+	struct sk_buff *buf_chain;
 
 	read_lock_bh(&tipc_nametbl_lock);
-	named_distribute(&message_list, node, &publ_cluster, max_item_buf);
-	named_distribute(&message_list, node, &publ_zone, max_item_buf);
+	named_distribute(&msg_list, dnode, &publ_cluster);
+	named_distribute(&msg_list, dnode, &publ_zone);
 	read_unlock_bh(&tipc_nametbl_lock);
 
-	tipc_link_names_xmit(&message_list, node);
+	/* Convert circular list to linear list and send: */
+	buf_chain = (struct sk_buff *)msg_list.next;
+	((struct sk_buff *)msg_list.prev)->next = NULL;
+	tipc_link_xmit(buf_chain, dnode, dnode);
 }
 
 /**
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index b2eed4e..8afe32b 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -70,7 +70,7 @@
 struct sk_buff *tipc_named_publish(struct publication *publ);
 struct sk_buff *tipc_named_withdraw(struct publication *publ);
 void named_cluster_distribute(struct sk_buff *buf);
-void tipc_named_node_up(u32 max_item_buf, u32 node);
+void tipc_named_node_up(u32 dnode);
 void tipc_named_rcv(struct sk_buff *buf);
 void tipc_named_reinit(void);
 
diff --git a/net/tipc/node.c b/net/tipc/node.c
index d959343..f706929 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -474,8 +474,6 @@
 void tipc_node_unlock(struct tipc_node *node)
 {
 	LIST_HEAD(nsub_list);
-	struct tipc_link *link;
-	int pkt_sz = 0;
 	u32 addr = 0;
 
 	if (likely(!node->action_flags)) {
@@ -488,18 +486,13 @@
 		node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
 	}
 	if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
-		link = node->active_links[0];
 		node->action_flags &= ~TIPC_NOTIFY_NODE_UP;
-		if (link) {
-			pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
-				  ITEM_SIZE;
-			addr = node->addr;
-		}
+		addr = node->addr;
 	}
 	spin_unlock_bh(&node->lock);
 
 	if (!list_empty(&nsub_list))
 		tipc_nodesub_notify(&nsub_list);
-	if (pkt_sz)
-		tipc_named_node_up(pkt_sz, addr);
+	if (addr)
+		tipc_named_node_up(addr);
 }
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 0d09dcb..7e096a5 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -74,118 +74,6 @@
 		(!peernode && (orignode == tipc_own_addr));
 }
 
-/**
- * tipc_port_mcast_xmit - send a multicast message to local and remote
- * destinations
- */
-int tipc_port_mcast_xmit(struct tipc_port *oport,
-			 struct tipc_name_seq const *seq,
-			 struct iovec const *msg_sect,
-			 unsigned int len)
-{
-	struct tipc_msg *hdr;
-	struct sk_buff *buf;
-	struct sk_buff *ibuf = NULL;
-	struct tipc_port_list dports = {0, NULL, };
-	int ext_targets;
-	int res;
-
-	/* Create multicast message */
-	hdr = &oport->phdr;
-	msg_set_type(hdr, TIPC_MCAST_MSG);
-	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
-	msg_set_destport(hdr, 0);
-	msg_set_destnode(hdr, 0);
-	msg_set_nametype(hdr, seq->type);
-	msg_set_namelower(hdr, seq->lower);
-	msg_set_nameupper(hdr, seq->upper);
-	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
-	res = tipc_msg_build(hdr, msg_sect, len, MAX_MSG_SIZE, &buf);
-	if (unlikely(!buf))
-		return res;
-
-	/* Figure out where to send multicast message */
-	ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
-						TIPC_NODE_SCOPE, &dports);
-
-	/* Send message to destinations (duplicate it only if necessary) */
-	if (ext_targets) {
-		if (dports.count != 0) {
-			ibuf = skb_copy(buf, GFP_ATOMIC);
-			if (ibuf == NULL) {
-				tipc_port_list_free(&dports);
-				kfree_skb(buf);
-				return -ENOMEM;
-			}
-		}
-		res = tipc_bclink_xmit(buf);
-		if ((res < 0) && (dports.count != 0))
-			kfree_skb(ibuf);
-	} else {
-		ibuf = buf;
-	}
-
-	if (res >= 0) {
-		if (ibuf)
-			tipc_port_mcast_rcv(ibuf, &dports);
-	} else {
-		tipc_port_list_free(&dports);
-	}
-	return res;
-}
-
-/**
- * tipc_port_mcast_rcv - deliver multicast message to all destination ports
- *
- * If there is no port list, perform a lookup to create one
- */
-void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp)
-{
-	struct tipc_msg *msg;
-	struct tipc_port_list dports = {0, NULL, };
-	struct tipc_port_list *item = dp;
-	int cnt = 0;
-
-	msg = buf_msg(buf);
-
-	/* Create destination port list, if one wasn't supplied */
-	if (dp == NULL) {
-		tipc_nametbl_mc_translate(msg_nametype(msg),
-				     msg_namelower(msg),
-				     msg_nameupper(msg),
-				     TIPC_CLUSTER_SCOPE,
-				     &dports);
-		item = dp = &dports;
-	}
-
-	/* Deliver a copy of message to each destination port */
-	if (dp->count != 0) {
-		msg_set_destnode(msg, tipc_own_addr);
-		if (dp->count == 1) {
-			msg_set_destport(msg, dp->ports[0]);
-			tipc_sk_rcv(buf);
-			tipc_port_list_free(dp);
-			return;
-		}
-		for (; cnt < dp->count; cnt++) {
-			int index = cnt % PLSIZE;
-			struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
-
-			if (b == NULL) {
-				pr_warn("Unable to deliver multicast message(s)\n");
-				goto exit;
-			}
-			if ((index == 0) && (cnt != 0))
-				item = item->next;
-			msg_set_destport(buf_msg(b), item->ports[index]);
-			tipc_sk_rcv(b);
-		}
-	}
-exit:
-	kfree_skb(buf);
-	tipc_port_list_free(dp);
-}
-
 /* tipc_port_init - intiate TIPC port and lock it
  *
  * Returns obtained reference if initialization is successful, zero otherwise
@@ -242,7 +130,7 @@
 		tipc_nodesub_unsubscribe(&p_ptr->subscription);
 		msg = buf_msg(buf);
 		peer = msg_destnode(msg);
-		tipc_link_xmit2(buf, peer, msg_link_selector(msg));
+		tipc_link_xmit(buf, peer, msg_link_selector(msg));
 	}
 	spin_lock_bh(&tipc_port_list_lock);
 	list_del(&p_ptr->port_list);
@@ -299,7 +187,7 @@
 	}
 	tipc_port_unlock(p_ptr);
 	msg = buf_msg(buf);
-	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
+	tipc_link_xmit(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 
@@ -314,7 +202,7 @@
 	buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
 	tipc_port_unlock(p_ptr);
 	msg = buf_msg(buf);
-	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
+	tipc_link_xmit(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 
@@ -459,7 +347,7 @@
 	if (!buf)
 		return;
 	msg = buf_msg(buf);
-	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
+	tipc_link_xmit(buf, msg_destnode(msg),	msg_link_selector(msg));
 }
 
 int tipc_publish(struct tipc_port *p_ptr, unsigned int scope,
@@ -621,6 +509,6 @@
 	buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
 	tipc_port_unlock(p_ptr);
 	msg = buf_msg(buf);
-	tipc_link_xmit2(buf, msg_destnode(msg),	msg_link_selector(msg));
+	tipc_link_xmit(buf, msg_destnode(msg),	msg_link_selector(msg));
 	return tipc_port_disconnect(ref);
 }
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 0e47052..3f93454 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -120,17 +120,7 @@
 		   struct tipc_portid const *peer);
 int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
 
-/*
- * TIPC messaging routines
- */
-
-int tipc_port_mcast_xmit(struct tipc_port *port,
-			 struct tipc_name_seq const *seq,
-			 struct iovec const *msg,
-			 unsigned int len);
-
 struct sk_buff *tipc_port_get_ports(void);
-void tipc_port_mcast_rcv(struct sk_buff *buf, struct tipc_port_list *dp);
 void tipc_port_reinit(void);
 
 /**
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index de01622..5ad42f6 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -53,6 +53,7 @@
 static void tipc_write_space(struct sock *sk);
 static int tipc_release(struct socket *sock);
 static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
+static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -130,7 +131,7 @@
 
 	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
 		if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
-			tipc_link_xmit2(buf, dnode, 0);
+			tipc_link_xmit(buf, dnode, 0);
 	}
 }
 
@@ -340,7 +341,7 @@
 				tipc_port_disconnect(port->ref);
 			}
 			if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT))
-				tipc_link_xmit2(buf, dnode, 0);
+				tipc_link_xmit(buf, dnode, 0);
 		}
 	}
 
@@ -535,6 +536,98 @@
 }
 
 /**
+ * tipc_sendmcast - send multicast message
+ * @sock: socket structure
+ * @seq: destination address
+ * @iov: message data to send
+ * @dsz: total length of message data
+ * @timeo: timeout to wait for wakeup
+ *
+ * Called from function tipc_sendmsg(), which has done all sanity checks
+ * Returns the number of bytes sent on success, or errno
+ */
+static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
+			  struct iovec *iov, size_t dsz, long timeo)
+{
+	struct sock *sk = sock->sk;
+	struct tipc_msg *mhdr = &tipc_sk(sk)->port.phdr;
+	struct sk_buff *buf;
+	uint mtu;
+	int rc;
+
+	msg_set_type(mhdr, TIPC_MCAST_MSG);
+	msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
+	msg_set_destport(mhdr, 0);
+	msg_set_destnode(mhdr, 0);
+	msg_set_nametype(mhdr, seq->type);
+	msg_set_namelower(mhdr, seq->lower);
+	msg_set_nameupper(mhdr, seq->upper);
+	msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
+
+new_mtu:
+	mtu = tipc_bclink_get_mtu();
+	rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
+	if (unlikely(rc < 0))
+		return rc;
+
+	do {
+		rc = tipc_bclink_xmit(buf);
+		if (likely(rc >= 0)) {
+			rc = dsz;
+			break;
+		}
+		if (rc == -EMSGSIZE)
+			goto new_mtu;
+		if (rc != -ELINKCONG)
+			break;
+		rc = tipc_wait_for_sndmsg(sock, &timeo);
+		if (rc)
+			kfree_skb_list(buf);
+	} while (!rc);
+	return rc;
+}
+
+/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+ */
+void tipc_sk_mcast_rcv(struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_port_list dports = {0, NULL, };
+	struct tipc_port_list *item;
+	struct sk_buff *b;
+	uint i, last, dst = 0;
+	u32 scope = TIPC_CLUSTER_SCOPE;
+
+	if (in_own_node(msg_orignode(msg)))
+		scope = TIPC_NODE_SCOPE;
+
+	/* Create destination port list: */
+	tipc_nametbl_mc_translate(msg_nametype(msg),
+				  msg_namelower(msg),
+				  msg_nameupper(msg),
+				  scope,
+				  &dports);
+	last = dports.count;
+	if (!last) {
+		kfree_skb(buf);
+		return;
+	}
+
+	for (item = &dports; item; item = item->next) {
+		for (i = 0; i < PLSIZE && ++dst <= last; i++) {
+			b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
+			if (!b) {
+				pr_warn("Failed do clone mcast rcv buffer\n");
+				continue;
+			}
+			msg_set_destport(msg, item->ports[i]);
+			tipc_sk_rcv(b);
+		}
+	}
+	tipc_port_list_free(&dports);
+}
+
+/**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
  * @dnode: node to send response message to, if any
@@ -630,43 +723,6 @@
 }
 
 /**
- * tipc_sendmcast - send multicast message
- * @sock: socket structure
- * @seq: destination address
- * @iov: message data to send
- * @dsz: total length of message data
- * @timeo: timeout to wait for wakeup
- *
- * Called from function tipc_sendmsg(), which has done all sanity checks
- * Returns the number of bytes sent on success, or errno
- */
-static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
-			  struct iovec *iov, size_t dsz, long timeo)
-{
-	struct sock *sk = sock->sk;
-	struct tipc_sock *tsk = tipc_sk(sk);
-	int rc;
-
-	do {
-		if (sock->state != SS_READY) {
-			rc = -EOPNOTSUPP;
-			break;
-		}
-		rc = tipc_port_mcast_xmit(&tsk->port, seq, iov, dsz);
-		if (likely(rc >= 0)) {
-			if (sock->state != SS_READY)
-				sock->state = SS_CONNECTING;
-			break;
-		}
-		if (rc != -ELINKCONG)
-			break;
-		rc = tipc_wait_for_sndmsg(sock, &timeo);
-	} while (!rc);
-
-	return rc;
-}
-
-/**
  * tipc_sendmsg - send message in connectionless manner
  * @iocb: if NULL, indicates that socket lock is already held
  * @sock: socket structure
@@ -765,12 +821,12 @@
 
 new_mtu:
 	mtu = tipc_node_get_mtu(dnode, tsk->port.ref);
-	rc = tipc_msg_build2(mhdr, iov, 0, dsz, mtu, &buf);
+	rc = tipc_msg_build(mhdr, iov, 0, dsz, mtu, &buf);
 	if (rc < 0)
 		goto exit;
 
 	do {
-		rc = tipc_link_xmit2(buf, dnode, tsk->port.ref);
+		rc = tipc_link_xmit(buf, dnode, tsk->port.ref);
 		if (likely(rc >= 0)) {
 			if (sock->state != SS_READY)
 				sock->state = SS_CONNECTING;
@@ -878,12 +934,12 @@
 next:
 	mtu = port->max_pkt;
 	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
-	rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf);
+	rc = tipc_msg_build(mhdr, m->msg_iov, sent, send, mtu, &buf);
 	if (unlikely(rc < 0))
 		goto exit;
 	do {
 		if (likely(!tipc_sk_conn_cong(tsk))) {
-			rc = tipc_link_xmit2(buf, dnode, ref);
+			rc = tipc_link_xmit(buf, dnode, ref);
 			if (likely(!rc)) {
 				tsk->sent_unacked++;
 				sent += send;
@@ -1515,7 +1571,7 @@
 	if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc))
 		return 0;
 
-	tipc_link_xmit2(buf, onode, 0);
+	tipc_link_xmit(buf, onode, 0);
 
 	return 0;
 }
@@ -1567,7 +1623,7 @@
 	if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
 		return -EHOSTUNREACH;
 
-	tipc_link_xmit2(buf, dnode, 0);
+	tipc_link_xmit(buf, dnode, 0);
 	return (rc < 0) ? -EHOSTUNREACH : 0;
 }
 
@@ -1854,7 +1910,7 @@
 			}
 			tipc_port_disconnect(port->ref);
 			if (tipc_msg_reverse(buf, &peer, TIPC_CONN_SHUTDOWN))
-				tipc_link_xmit2(buf, peer, 0);
+				tipc_link_xmit(buf, peer, 0);
 		} else {
 			tipc_port_shutdown(port->ref);
 		}
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 2cdede9..43b75b3 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -85,4 +85,6 @@
 
 int tipc_sk_rcv(struct sk_buff *buf);
 
+void tipc_sk_mcast_rcv(struct sk_buff *buf);
+
 #endif