tipc: let broadcast packet reception use new link receive function
The code path for receiving broadcast packets is currently distinct
from the unicast path. This leads to unnecessary code and data
duplication, something that can be avoided with some effort.
We now introduce separate per-peer tipc_link instances for handling
broadcast packet reception. Each receive link keeps a pointer to the
common, single, broadcast link instance, and can hence handle release
and retransmission of send buffers as if they belonged to the own
instance.
Furthermore, we let each unicast link instance keep a reference to both
the pertaining broadcast receive link, and to the common send link.
This makes it possible for the unicast links to easily access data for
broadcast link synchronization, as well as for carrying acknowledges for
received broadcast packets.
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 7fdf895..ea28c29 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -112,11 +112,6 @@
return tipc_net(net)->bcbase;
}
-static struct tipc_link *tipc_bc_sndlink(struct net *net)
-{
- return tipc_net(net)->bcl;
-}
-
/**
* tipc_nmap_equal - test for equality of node maps
*/
@@ -169,31 +164,6 @@
bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}
-void tipc_bclink_add_node(struct net *net, u32 addr)
-{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
- struct tipc_link *l = tipc_bc_sndlink(net);
- tipc_bclink_lock(net);
- tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
- tipc_link_add_bc_peer(l);
- tipc_bclink_unlock(net);
-}
-
-void tipc_bclink_remove_node(struct net *net, u32 addr)
-{
- struct tipc_net *tn = net_generic(net, tipc_net_id);
-
- tipc_bclink_lock(net);
- tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
- tn->bcl->ackers--;
-
- /* Last node? => reset backlog queue */
- if (!tn->bcbase->bcast_nodes.count)
- tipc_link_purge_backlog(tn->bcbase->link);
-
- tipc_bclink_unlock(net);
-}
-
static void bclink_set_last_sent(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -501,12 +471,141 @@
__skb_queue_purge(&rcvq);
return rc;
}
+
/* Broadcast to all nodes, inluding local node */
tipc_bcbearer_xmit(net, &xmitq);
tipc_sk_mcast_rcv(net, &rcvq, &inputq);
__skb_queue_purge(list);
return 0;
}
+
+/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
+ *
+ * RCU is locked, no other locks set
+ */
+int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
+{
+ struct tipc_msg *hdr = buf_msg(skb);
+ struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+ struct sk_buff_head xmitq;
+ int rc;
+
+ __skb_queue_head_init(&xmitq);
+
+ if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
+ kfree_skb(skb);
+ return 0;
+ }
+
+ tipc_bcast_lock(net);
+ if (msg_user(hdr) == BCAST_PROTOCOL)
+ rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
+ else
+ rc = tipc_link_rcv(l, skb, NULL);
+ tipc_bcast_unlock(net);
+
+ if (!skb_queue_empty(&xmitq))
+ tipc_bcbearer_xmit(net, &xmitq);
+
+ /* Any socket wakeup messages ? */
+ if (!skb_queue_empty(inputq))
+ tipc_sk_rcv(net, inputq);
+
+ return rc;
+}
+
+/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
+ *
+ * RCU is locked, no other locks set
+ */
+void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
+{
+ struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+ struct sk_buff_head xmitq;
+
+ __skb_queue_head_init(&xmitq);
+
+ tipc_bcast_lock(net);
+ tipc_link_bc_ack_rcv(l, acked, &xmitq);
+ tipc_bcast_unlock(net);
+
+ tipc_bcbearer_xmit(net, &xmitq);
+
+ /* Any socket wakeup messages ? */
+ if (!skb_queue_empty(inputq))
+ tipc_sk_rcv(net, inputq);
+}
+
+/* tipc_bcast_synch_rcv - check and update rcv link with peer's send state
+ *
+ * RCU is locked, no other locks set
+ */
+void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
+ struct tipc_msg *hdr)
+{
+ struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+ struct sk_buff_head xmitq;
+
+ __skb_queue_head_init(&xmitq);
+
+ tipc_bcast_lock(net);
+ if (msg_type(hdr) == STATE_MSG) {
+ tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
+ tipc_link_bc_sync_rcv(l, hdr, &xmitq);
+ } else {
+ tipc_link_bc_init_rcv(l, hdr);
+ }
+ tipc_bcast_unlock(net);
+
+ tipc_bcbearer_xmit(net, &xmitq);
+
+ /* Any socket wakeup messages ? */
+ if (!skb_queue_empty(inputq))
+ tipc_sk_rcv(net, inputq);
+}
+
+/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
+ *
+ * RCU is locked, node lock is set
+ */
+void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l,
+ struct sk_buff_head *xmitq)
+{
+ struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct tipc_link *snd_l = tipc_bc_sndlink(net);
+
+ tipc_bclink_lock(net);
+ tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
+ tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
+ tipc_bclink_unlock(net);
+}
+
+/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
+ *
+ * RCU is locked, node lock is set
+ */
+void tipc_bcast_remove_peer(struct net *net, u32 addr,
+ struct tipc_link *rcv_l)
+{
+ struct tipc_net *tn = net_generic(net, tipc_net_id);
+ struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+ struct tipc_link *snd_l = tipc_bc_sndlink(net);
+ struct sk_buff_head xmitq;
+
+ __skb_queue_head_init(&xmitq);
+
+ tipc_bclink_lock(net);
+ tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
+ tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
+ tipc_bclink_unlock(net);
+
+ tipc_bcbearer_xmit(net, &xmitq);
+
+ /* Any socket wakeup messages ? */
+ if (!skb_queue_empty(inputq))
+ tipc_sk_rcv(net, inputq);
+}
+
/**
* bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
*
@@ -728,6 +827,7 @@
return 0;
}
}
+ msg_set_mc_netid(msg, tn->net_id);
/* Send buffer over bearers until all targets reached */
bcbearer->remains = bclink->bcast_nodes;
@@ -1042,12 +1142,13 @@
spin_lock_init(&tipc_net(net)->bclock);
bb->node.net = net;
- if (!tipc_link_bc_create(&bb->node,
+ if (!tipc_link_bc_create(&bb->node, 0, 0,
MAX_PKT_DEFAULT_MCAST,
BCLINK_WIN_DEFAULT,
0,
&bb->inputq,
&bb->namedq,
+ NULL,
&l))
goto enomem;
bb->link = l;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index a378fdd..568a57c 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -47,8 +47,11 @@
int tipc_bcast_init(struct net *net);
void tipc_bcast_reinit(struct net *net);
void tipc_bcast_stop(struct net *net);
-void tipc_bclink_add_node(struct net *net, u32 addr);
-void tipc_bclink_remove_node(struct net *net, u32 addr);
+void tipc_bcast_add_peer(struct net *net, u32 addr,
+ struct tipc_link *l,
+ struct sk_buff_head *xmitq);
+void tipc_bcast_remove_peer(struct net *net, u32 addr,
+ struct tipc_link *rcv_bcl);
struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
@@ -62,6 +65,10 @@
int tipc_bclink_set_queue_limits(struct net *net, u32 limit);
uint tipc_bcast_get_mtu(void);
int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
+int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
+void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
+void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
+ struct tipc_msg *hdr);
void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
@@ -78,4 +85,9 @@
spin_unlock_bh(&tipc_net(net)->bclock);
}
+static inline struct tipc_link *tipc_bc_sndlink(struct net *net)
+{
+ return tipc_net(net)->bcl;
+}
+
#endif
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 645dcac..6d589aa 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -115,6 +115,11 @@
return net_generic(net, tipc_net_id);
}
+static inline int tipc_netid(struct net *net)
+{
+ return tipc_net(net)->net_id;
+}
+
static inline u16 mod(u16 x)
{
return x & 0xffffu;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 6a1a9d9..ff725c3 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -76,6 +76,14 @@
[TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
};
+/* Send states for broadcast NACKs
+ */
+enum {
+ BC_NACK_SND_CONDITIONAL,
+ BC_NACK_SND_UNCONDITIONAL,
+ BC_NACK_SND_SUPPRESS,
+};
+
/*
* Interval between NACKs when packets arrive out of order
*/
@@ -111,7 +119,11 @@
struct sk_buff_head *xmitq);
static void link_reset_statistics(struct tipc_link *l_ptr);
static void link_print(struct tipc_link *l_ptr, const char *str);
-static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
+static void tipc_link_build_nack_msg(struct tipc_link *l,
+ struct sk_buff_head *xmitq);
+static void tipc_link_build_bc_init_msg(struct tipc_link *l,
+ struct sk_buff_head *xmitq);
+static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
/*
* Simple non-static link routines (i.e. referenced outside this file)
@@ -151,6 +163,16 @@
return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);
}
+bool link_is_bc_sndlink(struct tipc_link *l)
+{
+ return !l->bc_sndlink;
+}
+
+bool link_is_bc_rcvlink(struct tipc_link *l)
+{
+ return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l));
+}
+
int tipc_link_is_active(struct tipc_link *l)
{
struct tipc_node *n = l->owner;
@@ -158,14 +180,31 @@
return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
}
-void tipc_link_add_bc_peer(struct tipc_link *l)
+void tipc_link_add_bc_peer(struct tipc_link *snd_l,
+ struct tipc_link *uc_l,
+ struct sk_buff_head *xmitq)
{
- l->ackers++;
+ struct tipc_link *rcv_l = uc_l->bc_rcvlink;
+
+ snd_l->ackers++;
+ rcv_l->acked = snd_l->snd_nxt - 1;
+ tipc_link_build_bc_init_msg(uc_l, xmitq);
}
-void tipc_link_remove_bc_peer(struct tipc_link *l)
+void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
+ struct tipc_link *rcv_l,
+ struct sk_buff_head *xmitq)
{
- l->ackers--;
+ u16 ack = snd_l->snd_nxt - 1;
+
+ snd_l->ackers--;
+ tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
+ tipc_link_reset(rcv_l);
+ rcv_l->state = LINK_RESET;
+ if (!snd_l->ackers) {
+ tipc_link_reset(snd_l);
+ __skb_queue_purge(xmitq);
+ }
}
int tipc_link_bc_peers(struct tipc_link *l)
@@ -193,6 +232,8 @@
* @peer: node id of peer node
* @peer_caps: bitmap describing peer node capabilities
* @maddr: media address to be used
+ * @bc_sndlink: the namespace global link used for broadcast sending
+ * @bc_rcvlink: the peer specific link used for broadcast reception
* @inputq: queue to put messages ready for delivery
* @namedq: queue to put binding table update messages ready for delivery
* @link: return value, pointer to put the created link
@@ -202,8 +243,12 @@
bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
int tolerance, char net_plane, u32 mtu, int priority,
int window, u32 session, u32 ownnode, u32 peer,
- u16 peer_caps, struct tipc_media_addr *maddr,
- struct sk_buff_head *inputq, struct sk_buff_head *namedq,
+ u16 peer_caps,
+ struct tipc_media_addr *maddr,
+ struct tipc_link *bc_sndlink,
+ struct tipc_link *bc_rcvlink,
+ struct sk_buff_head *inputq,
+ struct sk_buff_head *namedq,
struct tipc_link **link)
{
struct tipc_link *l;
@@ -239,6 +284,8 @@
l->priority = priority;
tipc_link_set_queue_limits(l, window);
l->ackers = 1;
+ l->bc_sndlink = bc_sndlink;
+ l->bc_rcvlink = bc_rcvlink;
l->inputq = inputq;
l->namedq = namedq;
l->state = LINK_RESETTING;
@@ -261,48 +308,34 @@
*
* Returns true if link was created, otherwise false
*/
-bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window,
- u16 peer_caps,
+bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer,
+ int mtu, int window, u16 peer_caps,
struct sk_buff_head *inputq,
struct sk_buff_head *namedq,
+ struct tipc_link *bc_sndlink,
struct tipc_link **link)
{
struct tipc_link *l;
if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
- 0, 0, 0, peer_caps, NULL, inputq, namedq, link))
+ 0, ownnode, peer, peer_caps, NULL, bc_sndlink,
+ NULL, inputq, namedq, link))
return false;
l = *link;
strcpy(l->name, tipc_bclink_name);
tipc_link_reset(l);
+ l->state = LINK_RESET;
l->ackers = 0;
+ l->bc_rcvlink = l;
+
+ /* Broadcast send link is always up */
+ if (link_is_bc_sndlink(l))
+ l->state = LINK_ESTABLISHED;
+
return true;
}
-/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
- *
- * Give a newly added peer node the sequence number where it should
- * start receiving and acking broadcast packets.
- */
-void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
- struct sk_buff_head *xmitq)
-{
- struct sk_buff *skb;
- struct sk_buff_head list;
- u16 last_sent;
-
- skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
- 0, l->addr, link_own_addr(l), 0, 0, 0);
- if (!skb)
- return;
- last_sent = tipc_bclink_get_last_sent(l->owner->net);
- msg_set_last_bcast(buf_msg(skb), last_sent);
- __skb_queue_head_init(&list);
- __skb_queue_tail(&list, skb);
- tipc_link_xmit(l, &list, xmitq);
-}
-
/**
* tipc_link_fsm_evt - link finite state machine
* @l: pointer to link
@@ -507,12 +540,17 @@
/* tipc_link_timeout - perform periodic task as instructed from node timeout
*/
+/* tipc_link_timeout - perform periodic task as instructed from node timeout
+ */
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
{
int rc = 0;
int mtyp = STATE_MSG;
bool xmit = false;
bool prb = false;
+ u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
+ u16 bc_acked = l->bc_rcvlink->acked;
+ bool bc_up = link_is_up(l->bc_rcvlink);
link_profile_stats(l);
@@ -520,7 +558,7 @@
case LINK_ESTABLISHED:
case LINK_SYNCHING:
if (!l->silent_intv_cnt) {
- if (tipc_bclink_acks_missing(l->owner))
+ if (bc_up && (bc_acked != bc_snt))
xmit = true;
} else if (l->silent_intv_cnt <= l->abort_limit) {
xmit = true;
@@ -671,6 +709,7 @@
l->silent_intv_cnt = 0;
l->stats.recv_info = 0;
l->stale_count = 0;
+ l->bc_peer_is_up = false;
link_reset_statistics(l);
}
@@ -692,7 +731,7 @@
uint mtu = link->mtu;
u16 ack = mod(link->rcv_nxt - 1);
u16 seqno = link->snd_nxt;
- u16 bc_last_in = link->owner->bclink.last_in;
+ u16 bc_ack = link->bc_rcvlink->rcv_nxt - 1;
struct tipc_media_addr *addr = link->media_addr;
struct sk_buff_head *transmq = &link->transmq;
struct sk_buff_head *backlogq = &link->backlogq;
@@ -712,7 +751,7 @@
msg = buf_msg(skb);
msg_set_seqno(msg, seqno);
msg_set_ack(msg, ack);
- msg_set_bcast_ack(msg, bc_last_in);
+ msg_set_bcast_ack(msg, bc_ack);
if (likely(skb_queue_len(transmq) < maxwin)) {
__skb_dequeue(list);
@@ -762,7 +801,7 @@
unsigned int mtu = l->mtu;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
- u16 bc_last_in = l->owner->bclink.last_in;
+ u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
struct sk_buff_head *transmq = &l->transmq;
struct sk_buff_head *backlogq = &l->backlogq;
struct sk_buff *skb, *_skb, *bskb;
@@ -781,7 +820,7 @@
hdr = buf_msg(skb);
msg_set_seqno(hdr, seqno);
msg_set_ack(hdr, ack);
- msg_set_bcast_ack(hdr, bc_last_in);
+ msg_set_bcast_ack(hdr, bc_ack);
if (likely(skb_queue_len(transmq) < maxwin)) {
_skb = skb_clone(skb, GFP_ATOMIC);
@@ -816,23 +855,6 @@
}
/*
- * tipc_link_sync_rcv - synchronize broadcast link endpoints.
- * Receive the sequence number where we should start receiving and
- * acking broadcast packets from a newly added peer node, and open
- * up for reception of such packets.
- *
- * Called with node locked
- */
-static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
-{
- struct tipc_msg *msg = buf_msg(buf);
-
- n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
- n->bclink.recv_permitted = true;
- kfree_skb(buf);
-}
-
-/*
* tipc_link_push_packets - push unsent packets to bearer
*
* Push out the unsent messages of a link where congestion
@@ -872,6 +894,7 @@
struct tipc_msg *hdr;
u16 seqno = l->snd_nxt;
u16 ack = l->rcv_nxt - 1;
+ u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
while (skb_queue_len(&l->transmq) < l->window) {
skb = skb_peek(&l->backlogq);
@@ -886,54 +909,25 @@
__skb_queue_tail(&l->transmq, skb);
__skb_queue_tail(xmitq, _skb);
TIPC_SKB_CB(skb)->ackers = l->ackers;
- msg_set_ack(hdr, ack);
msg_set_seqno(hdr, seqno);
- msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+ msg_set_ack(hdr, ack);
+ msg_set_bcast_ack(hdr, bc_ack);
l->rcv_unacked = 0;
seqno++;
}
l->snd_nxt = seqno;
}
-static void link_retransmit_failure(struct tipc_link *l_ptr,
- struct sk_buff *buf)
+static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
{
- struct tipc_msg *msg = buf_msg(buf);
- struct net *net = l_ptr->owner->net;
+ struct tipc_msg *hdr = buf_msg(skb);
- pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
-
- if (l_ptr->addr) {
- /* Handle failure on standard link */
- link_print(l_ptr, "Resetting link ");
- pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
- msg_user(msg), msg_type(msg), msg_size(msg),
- msg_errcode(msg));
- pr_info("sqno %u, prev: %x, src: %x\n",
- msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
- } else {
- /* Handle failure on broadcast link */
- struct tipc_node *n_ptr;
- char addr_string[16];
-
- pr_info("Msg seq number: %u, ", msg_seqno(msg));
- pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers);
-
- n_ptr = tipc_bclink_retransmit_to(net);
-
- tipc_addr_string_fill(addr_string, n_ptr->addr);
- pr_info("Broadcast link info for %s\n", addr_string);
- pr_info("Reception permitted: %d, Acked: %u\n",
- n_ptr->bclink.recv_permitted,
- n_ptr->bclink.acked);
- pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
- n_ptr->bclink.last_in,
- n_ptr->bclink.oos_state,
- n_ptr->bclink.last_sent);
-
- n_ptr->action_flags |= TIPC_BCAST_RESET;
- l_ptr->stale_count = 0;
- }
+ pr_warn("Retransmission failure on link <%s>\n", l->name);
+ link_print(l, "Resetting link ");
+ pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
+ msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
+ pr_info("sqno %u, prev: %x, src: %x\n",
+ msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
}
void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
@@ -976,7 +970,7 @@
struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
struct tipc_msg *hdr;
u16 ack = l->rcv_nxt - 1;
- u16 bc_ack = l->owner->bclink.last_in;
+ u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
if (!skb)
return 0;
@@ -1018,11 +1012,9 @@
* Consumes buffer if message is of right type
* Node lock must be held
*/
-static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
+static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *inputq)
{
- struct tipc_node *node = link->owner;
-
switch (msg_user(buf_msg(skb))) {
case TIPC_LOW_IMPORTANCE:
case TIPC_MEDIUM_IMPORTANCE:
@@ -1032,8 +1024,8 @@
skb_queue_tail(inputq, skb);
return true;
case NAME_DISTRIBUTOR:
- node->bclink.recv_permitted = true;
- skb_queue_tail(link->namedq, skb);
+ l->bc_rcvlink->state = LINK_ESTABLISHED;
+ skb_queue_tail(l->namedq, skb);
return true;
case MSG_BUNDLER:
case TUNNEL_PROTOCOL:
@@ -1054,7 +1046,6 @@
static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *inputq)
{
- struct tipc_node *node = l->owner;
struct tipc_msg *hdr = buf_msg(skb);
struct sk_buff **reasm_skb = &l->reasm_buf;
struct sk_buff *iskb;
@@ -1095,13 +1086,15 @@
if (tipc_buf_append(reasm_skb, &skb)) {
l->stats.recv_fragmented++;
tipc_data_input(l, skb, inputq);
- } else if (!*reasm_skb) {
+ } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) {
+ pr_warn_ratelimited("Unable to build fragment list\n");
return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
}
return 0;
} else if (usr == BCAST_PROTOCOL) {
- tipc_link_sync_rcv(node, skb);
- return 0;
+ tipc_bcast_lock(l->owner->net);
+ tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
+ tipc_bcast_unlock(l->owner->net);
}
drop:
kfree_skb(skb);
@@ -1124,12 +1117,28 @@
}
/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
+ *
+ * Note that sending of broadcast ack is coordinated among nodes, to reduce
+ * risk of ack storms towards the sender
*/
-void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
+int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
{
+ if (!l)
+ return 0;
+
+ /* Broadcast ACK must be sent via a unicast link => defer to caller */
+ if (link_is_bc_rcvlink(l)) {
+ if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf)
+ return 0;
+ l->rcv_unacked = 0;
+ return TIPC_LINK_SND_BC_ACK;
+ }
+
+ /* Unicast ACK */
l->rcv_unacked = 0;
l->stats.sent_acks++;
tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
+ return 0;
}
/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message
@@ -1151,6 +1160,9 @@
{
u32 def_cnt = ++l->stats.deferred_recv;
+ if (link_is_bc_rcvlink(l))
+ return;
+
if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
}
@@ -1211,12 +1223,11 @@
l->rcv_nxt++;
l->stats.recv_info++;
if (!tipc_data_input(l, skb, l->inputq))
- rc = tipc_link_input(l, skb, l->inputq);
- if (unlikely(rc))
- break;
+ rc |= tipc_link_input(l, skb, l->inputq);
if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
- tipc_link_build_ack_msg(l, xmitq);
-
+ rc |= tipc_link_build_ack_msg(l, xmitq);
+ if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
+ break;
} while ((skb = __skb_dequeue(defq)));
return rc;
@@ -1284,18 +1295,13 @@
kfree_skb(skb);
}
-/* tipc_link_build_proto_msg: prepare link protocol message for transmission
- */
static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
u16 rcvgap, int tolerance, int priority,
struct sk_buff_head *xmitq)
{
struct sk_buff *skb = NULL;
struct tipc_msg *hdr = l->pmsg;
- u16 snd_nxt = l->snd_nxt;
- u16 rcv_nxt = l->rcv_nxt;
- u16 rcv_last = rcv_nxt - 1;
- int node_up = l->owner->bclink.recv_permitted;
+ bool node_up = link_is_up(l->bc_rcvlink);
/* Don't send protocol message during reset or link failover */
if (tipc_link_is_blocked(l))
@@ -1303,33 +1309,34 @@
msg_set_type(hdr, mtyp);
msg_set_net_plane(hdr, l->net_plane);
- msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
- msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net));
+ msg_set_next_sent(hdr, l->snd_nxt);
+ msg_set_ack(hdr, l->rcv_nxt - 1);
+ msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
+ msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
msg_set_link_tolerance(hdr, tolerance);
msg_set_linkprio(hdr, priority);
msg_set_redundant_link(hdr, node_up);
msg_set_seq_gap(hdr, 0);
/* Compatibility: created msg must not be in sequence with pkt flow */
- msg_set_seqno(hdr, snd_nxt + U16_MAX / 2);
+ msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2);
if (mtyp == STATE_MSG) {
if (!tipc_link_is_up(l))
return;
- msg_set_next_sent(hdr, snd_nxt);
/* Override rcvgap if there are packets in deferred queue */
if (!skb_queue_empty(&l->deferdq))
- rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt;
+ rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt;
if (rcvgap) {
msg_set_seq_gap(hdr, rcvgap);
l->stats.sent_nacks++;
}
- msg_set_ack(hdr, rcv_last);
msg_set_probe(hdr, probe);
if (probe)
l->stats.sent_probes++;
l->stats.sent_states++;
+ l->rcv_unacked = 0;
} else {
/* RESET_MSG or ACTIVATE_MSG */
msg_set_max_pkt(hdr, l->advertised_mtu);
@@ -1431,7 +1438,7 @@
char *if_name;
int rc = 0;
- if (tipc_link_is_blocked(l))
+ if (tipc_link_is_blocked(l) || !xmitq)
goto exit;
if (link_own_addr(l) > msg_prevnode(hdr))
@@ -1518,6 +1525,188 @@
return rc;
}
+/* tipc_link_build_bc_proto_msg() - create broadcast protocol message
+ */
+static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast,
+ u16 peers_snd_nxt,
+ struct sk_buff_head *xmitq)
+{
+ struct sk_buff *skb;
+ struct tipc_msg *hdr;
+ struct sk_buff *dfrd_skb = skb_peek(&l->deferdq);
+ u16 ack = l->rcv_nxt - 1;
+ u16 gap_to = peers_snd_nxt - 1;
+
+ skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
+ 0, l->addr, link_own_addr(l), 0, 0, 0);
+ if (!skb)
+ return false;
+ hdr = buf_msg(skb);
+ msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
+ msg_set_bcast_ack(hdr, ack);
+ msg_set_bcgap_after(hdr, ack);
+ if (dfrd_skb)
+ gap_to = buf_seqno(dfrd_skb) - 1;
+ msg_set_bcgap_to(hdr, gap_to);
+ msg_set_non_seq(hdr, bcast);
+ __skb_queue_tail(xmitq, skb);
+ return true;
+}
+
+/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints.
+ *
+ * Give a newly added peer node the sequence number where it should
+ * start receiving and acking broadcast packets.
+ */
+void tipc_link_build_bc_init_msg(struct tipc_link *l,
+ struct sk_buff_head *xmitq)
+{
+ struct sk_buff_head list;
+
+ __skb_queue_head_init(&list);
+ if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list))
+ return;
+ tipc_link_xmit(l, &list, xmitq);
+}
+
+/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer
+ */
+void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
+{
+ int mtyp = msg_type(hdr);
+ u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
+
+ if (link_is_up(l))
+ return;
+
+ if (msg_user(hdr) == BCAST_PROTOCOL) {
+ l->rcv_nxt = peers_snd_nxt;
+ l->state = LINK_ESTABLISHED;
+ return;
+ }
+
+ if (l->peer_caps & TIPC_BCAST_SYNCH)
+ return;
+
+ if (msg_peer_node_is_up(hdr))
+ return;
+
+ /* Compatibility: accept older, less safe initial synch data */
+ if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG))
+ l->rcv_nxt = peers_snd_nxt;
+}
+
+/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
+ */
+void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
+ struct sk_buff_head *xmitq)
+{
+ u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
+
+ if (!link_is_up(l))
+ return;
+
+ if (!msg_peer_node_is_up(hdr))
+ return;
+
+ l->bc_peer_is_up = true;
+
+ /* Ignore if peers_snd_nxt goes beyond receive window */
+ if (more(peers_snd_nxt, l->rcv_nxt + l->window))
+ return;
+
+ if (!more(peers_snd_nxt, l->rcv_nxt)) {
+ l->nack_state = BC_NACK_SND_CONDITIONAL;
+ return;
+ }
+
+ /* Don't NACK if one was recently sent or peeked */
+ if (l->nack_state == BC_NACK_SND_SUPPRESS) {
+ l->nack_state = BC_NACK_SND_UNCONDITIONAL;
+ return;
+ }
+
+ /* Conditionally delay NACK sending until next synch rcv */
+ if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
+ l->nack_state = BC_NACK_SND_UNCONDITIONAL;
+ if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
+ return;
+ }
+
+ /* Send NACK now but suppress next one */
+ tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
+ l->nack_state = BC_NACK_SND_SUPPRESS;
+}
+
+void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
+ struct sk_buff_head *xmitq)
+{
+ struct sk_buff *skb, *tmp;
+ struct tipc_link *snd_l = l->bc_sndlink;
+
+ if (!link_is_up(l) || !l->bc_peer_is_up)
+ return;
+
+ if (!more(acked, l->acked))
+ return;
+
+ /* Skip over packets peer has already acked */
+ skb_queue_walk(&snd_l->transmq, skb) {
+ if (more(buf_seqno(skb), l->acked))
+ break;
+ }
+
+ /* Update/release the packets peer is acking now */
+ skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
+ if (more(buf_seqno(skb), acked))
+ break;
+ if (!--TIPC_SKB_CB(skb)->ackers) {
+ __skb_unlink(skb, &snd_l->transmq);
+ kfree_skb(skb);
+ }
+ }
+ l->acked = acked;
+ tipc_link_advance_backlog(snd_l, xmitq);
+ if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
+ link_prepare_wakeup(snd_l);
+}
+
+/* tipc_link_bc_nack_rcv(): receive broadcast nack message
+ */
+int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
+ struct sk_buff_head *xmitq)
+{
+ struct tipc_msg *hdr = buf_msg(skb);
+ u32 dnode = msg_destnode(hdr);
+ int mtyp = msg_type(hdr);
+ u16 acked = msg_bcast_ack(hdr);
+ u16 from = acked + 1;
+ u16 to = msg_bcgap_to(hdr);
+ u16 peers_snd_nxt = to + 1;
+ int rc = 0;
+
+ kfree_skb(skb);
+
+ if (!tipc_link_is_up(l) || !l->bc_peer_is_up)
+ return 0;
+
+ if (mtyp != STATE_MSG)
+ return 0;
+
+ if (dnode == link_own_addr(l)) {
+ tipc_link_bc_ack_rcv(l, acked, xmitq);
+ rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
+ l->stats.recv_nacks++;
+ return rc;
+ }
+
+ /* Msg for other node => suppress own NACK at next sync if applicable */
+ if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from))
+ l->nack_state = BC_NACK_SND_SUPPRESS;
+
+ return 0;
+}
+
void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
{
int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index d23329d..28a6396 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -66,7 +66,8 @@
*/
enum {
TIPC_LINK_UP_EVT = 1,
- TIPC_LINK_DOWN_EVT = (1 << 1)
+ TIPC_LINK_DOWN_EVT = (1 << 1),
+ TIPC_LINK_SND_BC_ACK = (1 << 2)
};
/* Starting value for maximum packet size negotiation on unicast links
@@ -209,6 +210,10 @@
/* Broadcast */
u16 ackers;
u16 acked;
+ struct tipc_link *bc_rcvlink;
+ struct tipc_link *bc_sndlink;
+ int nack_state;
+ bool bc_peer_is_up;
/* Statistics */
struct tipc_stats stats;
@@ -217,17 +222,21 @@
bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
int tolerance, char net_plane, u32 mtu, int priority,
int window, u32 session, u32 ownnode, u32 peer,
- u16 peer_caps, struct tipc_media_addr *maddr,
- struct sk_buff_head *inputq, struct sk_buff_head *namedq,
+ u16 peer_caps,
+ struct tipc_media_addr *maddr,
+ struct tipc_link *bc_sndlink,
+ struct tipc_link *bc_rcvlink,
+ struct sk_buff_head *inputq,
+ struct sk_buff_head *namedq,
struct tipc_link **link);
-bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window,
- u16 peer_caps, struct sk_buff_head *inputq,
+bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer,
+ int mtu, int window, u16 peer_caps,
+ struct sk_buff_head *inputq,
struct sk_buff_head *namedq,
+ struct tipc_link *bc_sndlink,
struct tipc_link **link);
void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
int mtyp, struct sk_buff_head *xmitq);
-void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
- struct sk_buff_head *xmitq);
void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_fsm_evt(struct tipc_link *l, int evt);
void tipc_link_reset_fragments(struct tipc_link *l_ptr);
@@ -264,9 +273,21 @@
int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
struct sk_buff_head *xmitq);
-void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
-void tipc_link_add_bc_peer(struct tipc_link *l);
-void tipc_link_remove_bc_peer(struct tipc_link *l);
+int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
+void tipc_link_add_bc_peer(struct tipc_link *snd_l,
+ struct tipc_link *uc_l,
+ struct sk_buff_head *xmitq);
+void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
+ struct tipc_link *rcv_l,
+ struct sk_buff_head *xmitq);
int tipc_link_bc_peers(struct tipc_link *l);
-
+void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
+ struct sk_buff_head *xmitq);
+void tipc_link_build_bc_sync_msg(struct tipc_link *l,
+ struct sk_buff_head *xmitq);
+void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
+void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
+ struct sk_buff_head *xmitq);
+int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
+ struct sk_buff_head *xmitq);
#endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 5b47468..8740930 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -182,7 +182,6 @@
*buf = NULL;
return 0;
err:
- pr_warn_ratelimited("Unable to build fragment list\n");
kfree_skb(*buf);
kfree_skb(*headbuf);
*buf = *headbuf = NULL;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index fbf51fa..55778a0 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -601,6 +601,11 @@
return msg_bits(m, 4, 16, 0xffff);
}
+static inline u32 msg_bc_snd_nxt(struct tipc_msg *m)
+{
+ return msg_last_bcast(m) + 1;
+}
+
static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
{
msg_set_bits(m, 4, 16, 0xffff, n);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 28bcd7b..cd92455 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -72,7 +72,6 @@
static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
bool delete);
static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
-static void node_established_contact(struct tipc_node *n_ptr);
static void tipc_node_delete(struct tipc_node *node);
static void tipc_node_timeout(unsigned long data);
static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
@@ -165,8 +164,10 @@
INIT_LIST_HEAD(&n_ptr->list);
INIT_LIST_HEAD(&n_ptr->publ_list);
INIT_LIST_HEAD(&n_ptr->conn_sks);
- skb_queue_head_init(&n_ptr->bclink.namedq);
- __skb_queue_head_init(&n_ptr->bclink.deferdq);
+ skb_queue_head_init(&n_ptr->bc_entry.namedq);
+ skb_queue_head_init(&n_ptr->bc_entry.inputq1);
+ __skb_queue_head_init(&n_ptr->bc_entry.arrvq);
+ skb_queue_head_init(&n_ptr->bc_entry.inputq2);
hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
if (n_ptr->addr < temp_node->addr)
@@ -177,6 +178,18 @@
n_ptr->signature = INVALID_NODE_SIG;
n_ptr->active_links[0] = INVALID_BEARER_ID;
n_ptr->active_links[1] = INVALID_BEARER_ID;
+ if (!tipc_link_bc_create(n_ptr, tipc_own_addr(net), n_ptr->addr,
+ U16_MAX, tipc_bc_sndlink(net)->window,
+ n_ptr->capabilities,
+ &n_ptr->bc_entry.inputq1,
+ &n_ptr->bc_entry.namedq,
+ tipc_bc_sndlink(net),
+ &n_ptr->bc_entry.link)) {
+ pr_warn("Broadcast rcv link creation failed, no memory\n");
+ kfree(n_ptr);
+ n_ptr = NULL;
+ goto exit;
+ }
tipc_node_get(n_ptr);
setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
n_ptr->keepalive_intv = U32_MAX;
@@ -203,6 +216,7 @@
{
list_del_rcu(&node->list);
hlist_del_rcu(&node->hash);
+ kfree(node->bc_entry.link);
kfree_rcu(node, rcu);
}
@@ -340,8 +354,9 @@
if (!ol) {
*slot0 = bearer_id;
*slot1 = bearer_id;
- tipc_link_build_bcast_sync_msg(nl, xmitq);
- node_established_contact(n);
+ tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
+ n->action_flags |= TIPC_NOTIFY_NODE_UP;
+ tipc_bcast_add_peer(n->net, n->addr, nl, xmitq);
return;
}
@@ -585,9 +600,10 @@
b->net_plane, b->mtu, b->priority,
b->window, mod(tipc_net(net)->random),
tipc_own_addr(net), onode,
- n->capabilities,
- &le->maddr, &le->inputq,
- &n->bclink.namedq, &l)) {
+ n->capabilities, &le->maddr,
+ tipc_bc_sndlink(n->net), n->bc_entry.link,
+ &le->inputq,
+ &n->bc_entry.namedq, &l)) {
*respond = false;
goto exit;
}
@@ -830,58 +846,36 @@
return true;
}
-static void node_established_contact(struct tipc_node *n_ptr)
-{
- tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
- n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
- n_ptr->bclink.oos_state = 0;
- n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
- tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
-}
-
-static void node_lost_contact(struct tipc_node *n_ptr,
+static void node_lost_contact(struct tipc_node *n,
struct sk_buff_head *inputq)
{
char addr_string[16];
struct tipc_sock_conn *conn, *safe;
struct tipc_link *l;
- struct list_head *conns = &n_ptr->conn_sks;
+ struct list_head *conns = &n->conn_sks;
struct sk_buff *skb;
- struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
uint i;
pr_debug("Lost contact with %s\n",
- tipc_addr_string_fill(addr_string, n_ptr->addr));
+ tipc_addr_string_fill(addr_string, n->addr));
- /* Flush broadcast link info associated with lost node */
- if (n_ptr->bclink.recv_permitted) {
- __skb_queue_purge(&n_ptr->bclink.deferdq);
-
- if (n_ptr->bclink.reasm_buf) {
- kfree_skb(n_ptr->bclink.reasm_buf);
- n_ptr->bclink.reasm_buf = NULL;
- }
-
- tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
- tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
-
- n_ptr->bclink.recv_permitted = false;
- }
+ /* Clean up broadcast state */
+ tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link);
/* Abort any ongoing link failover */
for (i = 0; i < MAX_BEARERS; i++) {
- l = n_ptr->links[i].link;
+ l = n->links[i].link;
if (l)
tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
}
/* Notify publications from this node */
- n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN;
+ n->action_flags |= TIPC_NOTIFY_NODE_DOWN;
/* Notify sockets connected to node */
list_for_each_entry_safe(conn, safe, conns, list) {
skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
- SHORT_H_SIZE, 0, tn->own_addr,
+ SHORT_H_SIZE, 0, tipc_own_addr(n->net),
conn->peer_node, conn->port,
conn->peer_port, TIPC_ERR_NO_NODE);
if (likely(skb))
@@ -1086,6 +1080,67 @@
}
/**
+ * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
+ * @net: the applicable net namespace
+ * @skb: TIPC packet
+ * @bearer_id: id of bearer message arrived on
+ *
+ * Invoked with no locks held.
+ */
+void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id)
+{
+ int rc;
+ struct sk_buff_head xmitq;
+ struct tipc_bclink_entry *be;
+ struct tipc_link_entry *le;
+ struct tipc_msg *hdr = buf_msg(skb);
+ int usr = msg_user(hdr);
+ u32 dnode = msg_destnode(hdr);
+ struct tipc_node *n;
+
+ __skb_queue_head_init(&xmitq);
+
+ /* If NACK for other node, let rcv link for that node peek into it */
+ if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net)))
+ n = tipc_node_find(net, dnode);
+ else
+ n = tipc_node_find(net, msg_prevnode(hdr));
+ if (!n) {
+ kfree_skb(skb);
+ return;
+ }
+ be = &n->bc_entry;
+ le = &n->links[bearer_id];
+
+ rc = tipc_bcast_rcv(net, be->link, skb);
+
+ /* Broadcast link reset may happen at reassembly failure */
+ if (rc & TIPC_LINK_DOWN_EVT)
+ tipc_node_reset_links(n);
+
+ /* Broadcast ACKs are sent on a unicast link */
+ if (rc & TIPC_LINK_SND_BC_ACK) {
+ tipc_node_lock(n);
+ tipc_link_build_ack_msg(le->link, &xmitq);
+ tipc_node_unlock(n);
+ }
+
+ if (!skb_queue_empty(&xmitq))
+ tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
+
+ /* Deliver. 'arrvq' is under inputq2's lock protection */
+ if (!skb_queue_empty(&be->inputq1)) {
+ spin_lock_bh(&be->inputq2.lock);
+ spin_lock_bh(&be->inputq1.lock);
+ skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
+ spin_unlock_bh(&be->inputq1.lock);
+ spin_unlock_bh(&be->inputq2.lock);
+ tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
+ }
+ tipc_node_put(n);
+}
+
+/**
* tipc_node_check_state - check and if necessary update node state
* @skb: TIPC packet
* @bearer_id: identity of bearer delivering the packet
@@ -1227,6 +1282,7 @@
int usr = msg_user(hdr);
int bearer_id = b->identity;
struct tipc_link_entry *le;
+ u16 bc_ack = msg_bcast_ack(hdr);
int rc = 0;
__skb_queue_head_init(&xmitq);
@@ -1235,13 +1291,12 @@
if (unlikely(!tipc_msg_validate(skb)))
goto discard;
- /* Handle arrival of a non-unicast link packet */
+ /* Handle arrival of discovery or broadcast packet */
if (unlikely(msg_non_seq(hdr))) {
- if (usr == LINK_CONFIG)
- tipc_disc_rcv(net, skb, b);
+ if (unlikely(usr == LINK_CONFIG))
+ return tipc_disc_rcv(net, skb, b);
else
- tipc_bclink_rcv(net, skb);
- return;
+ return tipc_node_bc_rcv(net, skb, bearer_id);
}
/* Locate neighboring node that sent packet */
@@ -1250,19 +1305,18 @@
goto discard;
le = &n->links[bearer_id];
+ /* Ensure broadcast reception is in synch with peer's send state */
+ if (unlikely(usr == LINK_PROTOCOL))
+ tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
+ else if (unlikely(n->bc_entry.link->acked != bc_ack))
+ tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
+
tipc_node_lock(n);
/* Is reception permitted at the moment ? */
if (!tipc_node_filter_pkt(n, hdr))
goto unlock;
- if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
- tipc_bclink_sync_state(n, hdr);
-
- /* Release acked broadcast packets */
- if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
- tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
-
/* Check and if necessary update node state */
if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
rc = tipc_link_rcv(le->link, skb, &xmitq);
@@ -1277,8 +1331,8 @@
if (unlikely(rc & TIPC_LINK_DOWN_EVT))
tipc_node_link_down(n, bearer_id, false);
- if (unlikely(!skb_queue_empty(&n->bclink.namedq)))
- tipc_named_rcv(net, &n->bclink.namedq);
+ if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
+ tipc_named_rcv(net, &n->bc_entry.namedq);
if (!skb_queue_empty(&le->inputq))
tipc_sk_rcv(net, &le->inputq);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 1465774..36a1cd0 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -100,6 +100,14 @@
struct tipc_media_addr maddr;
};
+struct tipc_bclink_entry {
+ struct tipc_link *link;
+ struct sk_buff_head inputq1;
+ struct sk_buff_head arrvq;
+ struct sk_buff_head inputq2;
+ struct sk_buff_head namedq;
+};
+
/**
* struct tipc_node - TIPC node structure
* @addr: network address of node
@@ -132,6 +140,7 @@
struct hlist_node hash;
int active_links[2];
struct tipc_link_entry links[MAX_BEARERS];
+ struct tipc_bclink_entry bc_entry;
int action_flags;
struct tipc_node_bclink bclink;
struct list_head list;