tipc: reduce risk of user starvation during link congestion

The socket code currently handles link congestion by either blocking
and retrying once the congestion has abated, or by returning -EAGAIN
to the user and letting them retry later.

This mechanism is prone to starvation, because the wakeup algorithm is
non-atomic. In the interval between the link issuing a wakeup signal
and the socket waking up and re-attempting to send, other senders may
slip in and occupy the freed buffer space in the link. This in turn
may force a socket to make many send attempts before it succeeds. On
extremely loaded systems we have observed latencies of several seconds
before a low-priority socket is able to send out a message.
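
For illustration, the pre-patch flow looked roughly like this (a
simplified sketch, not actual kernel code; wait_for_wakeup() stands in
for the real sk_wait_event() based logic):

    for (;;) {
            rc = tipc_link_xmit(l, &pkts, &xmitq);
            if (rc != -ELINKCONG)
                    break;  /* sent, or hard error */
            /* Message was rejected; sleep until the link delivers a
             * SOCK_WAKEUP pseudo message ...
             */
            wait_for_wakeup(sk);    /* hypothetical helper */
            /* ... but by the time this sender runs again, other
             * senders may already have refilled the backlog, so the
             * retry can fail and the cycle repeats.
             */
    }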

In this commit, we simplify this mechanism and reduce the risk of the
described scenario occurring. When an attempt is made to send a message
via a congested link, we now add it to the link's backlog queue anyway,
thus permitting an oversubscription of one message per source socket.
We still create a wakeup item and return an error code, instructing the
sender to block or stop sending. Only when enough space has been freed
up in the link's backlog queue do we issue a wakeup event that allows
the sender to continue with the next message, if any.
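
From the caller's point of view, -ELINKCONG thus no longer means "not
sent"; it means "sent, but do not send more until woken up". A minimal
sketch of a sender adapting to the new semantics, assuming a
hypothetical per-socket congestion flag (the name cong_link_cnt is
illustrative only, not part of this diff):

    rc = tipc_link_xmit(l, &pkts, &xmitq);
    if (rc == -ELINKCONG) {
            /* The message was accepted into the backlog anyway;
             * record the congestion and block (or return -EAGAIN)
             * before the *next* message, not this one.
             */
            tsk->cong_link_cnt = 1; /* hypothetical field */
            rc = 0;
    }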

Since a socket can now consider a message sent even when the link
returns a congestion code, the sending socket code can be simplified.
We also take this opportunity to get rid of the obsolete 'mtu change'
condition in the three socket send functions, and refactor those
functions completely.
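
As an example of that simplification, a blocking send path can be
reduced to a single wait-then-send step per message instead of a retry
loop (sketch only; tipc_wait_for_cond() is a hypothetical helper in
this excerpt):

    /* Wait until the previous message's congestion has cleared */
    rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
    if (unlikely(rc))
            return rc;
    rc = __tipc_sendmsg(sock, m, dlen); /* may set cong_link_cnt */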

Signed-off-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/link.c b/net/tipc/link.c
index bda89bf..b758ca8 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -776,60 +776,47 @@
 
 /**
  * link_schedule_user - schedule a message sender for wakeup after congestion
- * @link: congested link
- * @list: message that was attempted sent
+ * @l: congested link
+ * @hdr: header of message that is being sent
  * Create pseudo msg to send back to user when congestion abates
- * Does not consume buffer list
  */
-static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
+static int link_schedule_user(struct tipc_link *l, struct tipc_msg *hdr)
 {
-	struct tipc_msg *msg = buf_msg(skb_peek(list));
-	int imp = msg_importance(msg);
-	u32 oport = msg_origport(msg);
-	u32 addr = tipc_own_addr(link->net);
+	u32 dnode = tipc_own_addr(l->net);
+	u32 dport = msg_origport(hdr);
 	struct sk_buff *skb;
 
-	/* This really cannot happen...  */
-	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
-		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
-		return -ENOBUFS;
-	}
-	/* Non-blocking sender: */
-	if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
-		return -ELINKCONG;
-
 	/* Create and schedule wakeup pseudo message */
 	skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
-			      addr, addr, oport, 0, 0);
+			      dnode, l->addr, dport, 0, 0);
 	if (!skb)
 		return -ENOBUFS;
-	TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
-	TIPC_SKB_CB(skb)->chain_imp = imp;
-	skb_queue_tail(&link->wakeupq, skb);
-	link->stats.link_congs++;
+	msg_set_dest_droppable(buf_msg(skb), true);
+	TIPC_SKB_CB(skb)->chain_imp = msg_importance(hdr);
+	skb_queue_tail(&l->wakeupq, skb);
+	l->stats.link_congs++;
 	return -ELINKCONG;
 }
 
 /**
  * link_prepare_wakeup - prepare users for wakeup after congestion
- * @link: congested link
- * Move a number of waiting users, as permitted by available space in
- * the send queue, from link wait queue to node wait queue for wakeup
+ * @l: congested link
+ * Wake up a number of waiting users, as permitted by available space
+ * in the send queue
  */
 void link_prepare_wakeup(struct tipc_link *l)
 {
-	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
-	int imp, lim;
 	struct sk_buff *skb, *tmp;
+	int imp, i = 0;
 
 	skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
 		imp = TIPC_SKB_CB(skb)->chain_imp;
-		lim = l->backlog[imp].limit;
-		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
-		if ((pnd[imp] + l->backlog[imp].len) >= lim)
+		if (l->backlog[imp].len < l->backlog[imp].limit) {
+			skb_unlink(skb, &l->wakeupq);
+			skb_queue_tail(l->inputq, skb);
+		} else if (i++ > 10) {
 			break;
-		skb_unlink(skb, &l->wakeupq);
-		skb_queue_tail(l->inputq, skb);
+		}
 	}
 }
 
@@ -869,8 +856,7 @@
  * @list: chain of buffers containing message
  * @xmitq: returned list of packets to be sent by caller
  *
- * Consumes the buffer chain, except when returning -ELINKCONG,
- * since the caller then may want to make more send attempts.
+ * Consumes the buffer chain.
  * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
  * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
  */
@@ -879,7 +865,7 @@
 {
 	struct tipc_msg *hdr = buf_msg(skb_peek(list));
 	unsigned int maxwin = l->window;
-	unsigned int i, imp = msg_importance(hdr);
+	int imp = msg_importance(hdr);
 	unsigned int mtu = l->mtu;
 	u16 ack = l->rcv_nxt - 1;
 	u16 seqno = l->snd_nxt;
@@ -888,19 +874,22 @@
 	struct sk_buff_head *backlogq = &l->backlogq;
 	struct sk_buff *skb, *_skb, *bskb;
 	int pkt_cnt = skb_queue_len(list);
+	int rc = 0;
 
-	/* Match msg importance against this and all higher backlog limits: */
-	if (!skb_queue_empty(backlogq)) {
-		for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
-			if (unlikely(l->backlog[i].len >= l->backlog[i].limit))
-				return link_schedule_user(l, list);
-		}
-	}
 	if (unlikely(msg_size(hdr) > mtu)) {
 		skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
+	/* Allow oversubscription of one data msg per source at congestion */
+	if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
+		if (imp == TIPC_SYSTEM_IMPORTANCE) {
+			pr_warn("%s<%s>, link overflow", link_rst_msg, l->name);
+			return -ENOBUFS;
+		}
+		rc = link_schedule_user(l, hdr);
+	}
+
 	if (pkt_cnt > 1) {
 		l->stats.sent_fragmented++;
 		l->stats.sent_fragments += pkt_cnt;
@@ -946,7 +935,7 @@
 		skb_queue_splice_tail_init(list, backlogq);
 	}
 	l->snd_nxt = seqno;
-	return 0;
+	return rc;
 }
 
 void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)