ipvs: wakeup master thread

	High rate of sync messages in master can lead to
overflowing the socket buffer and dropping the messages.
Fixed sleep of 1 second without wakeup events is not suitable
for loaded masters,

	Use delayed_work to schedule sending for queued messages
and limit the delay to IPVS_SYNC_SEND_DELAY (20ms). This will
reduce the rate of wakeups but to avoid sending long bursts we
wakeup the master thread after IPVS_SYNC_WAKEUP_RATE (8) messages.

	Add hard limit for the queued messages before sending
by using "sync_qlen_max" sysctl var. It defaults to 1/32 of
the memory pages but actually represents number of messages.
It will protect us from allocating large parts of memory
when the sending rate is lower than the queuing rate.

	As suggested by Pablo, add new sysctl var
"sync_sock_size" to configure the SNDBUF (master) or
RCVBUF (slave) socket limit. Default value is 0 (preserve
system defaults).

	Change the master thread to detect and block on
SNDBUF overflow, so that we do not drop messages when
the socket limit is low but the sync_qlen_max limit is
not reached. On ENOBUFS or other errors just drop the
messages.

	Change master thread to enter TASK_INTERRUPTIBLE
state early, so that we do not miss wakeups due to messages or
kthread_should_stop event.

Thanks to Pablo Neira Ayuso for his valuable feedback!

Signed-off-by: Julian Anastasov <ja@ssi.bg>
Signed-off-by: Simon Horman <horms@verge.net.au>
diff --git a/include/net/ip_vs.h b/include/net/ip_vs.h
index 93b81aa..30e43c8 100644
--- a/include/net/ip_vs.h
+++ b/include/net/ip_vs.h
@@ -869,6 +869,8 @@
 #endif
 	int			sysctl_snat_reroute;
 	int			sysctl_sync_ver;
+	int			sysctl_sync_qlen_max;
+	int			sysctl_sync_sock_size;
 	int			sysctl_cache_bypass;
 	int			sysctl_expire_nodest_conn;
 	int			sysctl_expire_quiescent_template;
@@ -889,6 +891,9 @@
 	struct timer_list	est_timer;	/* Estimation timer */
 	/* ip_vs_sync */
 	struct list_head	sync_queue;
+	int			sync_queue_len;
+	unsigned int		sync_queue_delay;
+	struct delayed_work	master_wakeup_work;
 	spinlock_t		sync_lock;
 	struct ip_vs_sync_buff  *sync_buff;
 	spinlock_t		sync_buff_lock;
@@ -911,6 +916,10 @@
 #define DEFAULT_SYNC_THRESHOLD	3
 #define DEFAULT_SYNC_PERIOD	50
 #define DEFAULT_SYNC_VER	1
+#define IPVS_SYNC_WAKEUP_RATE	8
+#define IPVS_SYNC_QLEN_MAX	(IPVS_SYNC_WAKEUP_RATE * 4)
+#define IPVS_SYNC_SEND_DELAY	(HZ / 50)
+#define IPVS_SYNC_CHECK_PERIOD	HZ
 
 #ifdef CONFIG_SYSCTL
 
@@ -929,6 +938,16 @@
 	return ipvs->sysctl_sync_ver;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_qlen_max;
+}
+
+static inline int sysctl_sync_sock_size(struct netns_ipvs *ipvs)
+{
+	return ipvs->sysctl_sync_sock_size;
+}
+
 #else
 
 static inline int sysctl_sync_threshold(struct netns_ipvs *ipvs)
@@ -946,6 +965,16 @@
 	return DEFAULT_SYNC_VER;
 }
 
+static inline int sysctl_sync_qlen_max(struct netns_ipvs *ipvs)
+{
+	return IPVS_SYNC_QLEN_MAX;
+}
+
+static inline int sysctl_sync_sock_size(struct netns_ipvs *ipvs)
+{
+	return 0;
+}
+
 #endif
 
 /*
diff --git a/net/netfilter/ipvs/ip_vs_ctl.c b/net/netfilter/ipvs/ip_vs_ctl.c
index 37b9199..bd3827e 100644
--- a/net/netfilter/ipvs/ip_vs_ctl.c
+++ b/net/netfilter/ipvs/ip_vs_ctl.c
@@ -1718,6 +1718,18 @@
 		.proc_handler	= &proc_do_sync_mode,
 	},
 	{
+		.procname	= "sync_qlen_max",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
+		.procname	= "sync_sock_size",
+		.maxlen		= sizeof(int),
+		.mode		= 0644,
+		.proc_handler	= proc_dointvec,
+	},
+	{
 		.procname	= "cache_bypass",
 		.maxlen		= sizeof(int),
 		.mode		= 0644,
@@ -3655,6 +3667,10 @@
 	tbl[idx++].data = &ipvs->sysctl_snat_reroute;
 	ipvs->sysctl_sync_ver = 1;
 	tbl[idx++].data = &ipvs->sysctl_sync_ver;
+	ipvs->sysctl_sync_qlen_max = nr_free_buffer_pages() / 32;
+	tbl[idx++].data = &ipvs->sysctl_sync_qlen_max;
+	ipvs->sysctl_sync_sock_size = 0;
+	tbl[idx++].data = &ipvs->sysctl_sync_sock_size;
 	tbl[idx++].data = &ipvs->sysctl_cache_bypass;
 	tbl[idx++].data = &ipvs->sysctl_expire_nodest_conn;
 	tbl[idx++].data = &ipvs->sysctl_expire_quiescent_template;
diff --git a/net/netfilter/ipvs/ip_vs_sync.c b/net/netfilter/ipvs/ip_vs_sync.c
index d2df694..b3235b2 100644
--- a/net/netfilter/ipvs/ip_vs_sync.c
+++ b/net/netfilter/ipvs/ip_vs_sync.c
@@ -307,11 +307,15 @@
 	spin_lock_bh(&ipvs->sync_lock);
 	if (list_empty(&ipvs->sync_queue)) {
 		sb = NULL;
+		__set_current_state(TASK_INTERRUPTIBLE);
 	} else {
 		sb = list_entry(ipvs->sync_queue.next,
 				struct ip_vs_sync_buff,
 				list);
 		list_del(&sb->list);
+		ipvs->sync_queue_len--;
+		if (!ipvs->sync_queue_len)
+			ipvs->sync_queue_delay = 0;
 	}
 	spin_unlock_bh(&ipvs->sync_lock);
 
@@ -358,9 +362,16 @@
 	struct ip_vs_sync_buff *sb = ipvs->sync_buff;
 
 	spin_lock(&ipvs->sync_lock);
-	if (ipvs->sync_state & IP_VS_STATE_MASTER)
+	if (ipvs->sync_state & IP_VS_STATE_MASTER &&
+	    ipvs->sync_queue_len < sysctl_sync_qlen_max(ipvs)) {
+		if (!ipvs->sync_queue_len)
+			schedule_delayed_work(&ipvs->master_wakeup_work,
+					      max(IPVS_SYNC_SEND_DELAY, 1));
+		ipvs->sync_queue_len++;
 		list_add_tail(&sb->list, &ipvs->sync_queue);
-	else
+		if ((++ipvs->sync_queue_delay) == IPVS_SYNC_WAKEUP_RATE)
+			wake_up_process(ipvs->master_thread);
+	} else
 		ip_vs_sync_buff_release(sb);
 	spin_unlock(&ipvs->sync_lock);
 }
@@ -379,6 +390,7 @@
 	    time_after_eq(jiffies - ipvs->sync_buff->firstuse, time)) {
 		sb = ipvs->sync_buff;
 		ipvs->sync_buff = NULL;
+		__set_current_state(TASK_RUNNING);
 	} else
 		sb = NULL;
 	spin_unlock_bh(&ipvs->sync_buff_lock);
@@ -392,26 +404,23 @@
 void ip_vs_sync_switch_mode(struct net *net, int mode)
 {
 	struct netns_ipvs *ipvs = net_ipvs(net);
-
-	if (!(ipvs->sync_state & IP_VS_STATE_MASTER))
-		return;
-	if (mode == sysctl_sync_ver(ipvs) || !ipvs->sync_buff)
-		return;
+	struct ip_vs_sync_buff *sb;
 
 	spin_lock_bh(&ipvs->sync_buff_lock);
+	if (!(ipvs->sync_state & IP_VS_STATE_MASTER))
+		goto unlock;
+	sb = ipvs->sync_buff;
+	if (mode == sysctl_sync_ver(ipvs) || !sb)
+		goto unlock;
+
 	/* Buffer empty ? then let buf_create do the job  */
-	if (ipvs->sync_buff->mesg->size <=  sizeof(struct ip_vs_sync_mesg)) {
-		kfree(ipvs->sync_buff);
+	if (sb->mesg->size <= sizeof(struct ip_vs_sync_mesg)) {
+		ip_vs_sync_buff_release(sb);
 		ipvs->sync_buff = NULL;
-	} else {
-		spin_lock_bh(&ipvs->sync_lock);
-		if (ipvs->sync_state & IP_VS_STATE_MASTER)
-			list_add_tail(&ipvs->sync_buff->list,
-				      &ipvs->sync_queue);
-		else
-			ip_vs_sync_buff_release(ipvs->sync_buff);
-		spin_unlock_bh(&ipvs->sync_lock);
-	}
+	} else
+		sb_queue_tail(ipvs);
+
+unlock:
 	spin_unlock_bh(&ipvs->sync_buff_lock);
 }
 
@@ -1130,6 +1139,28 @@
 
 
 /*
+ *      Setup sndbuf (mode=1) or rcvbuf (mode=0)
+ */
+static void set_sock_size(struct sock *sk, int mode, int val)
+{
+	/* setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val)); */
+	/* setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &val, sizeof(val)); */
+	lock_sock(sk);
+	if (mode) {
+		val = clamp_t(int, val, (SOCK_MIN_SNDBUF + 1) / 2,
+			      sysctl_wmem_max);
+		sk->sk_sndbuf = val * 2;
+		sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
+	} else {
+		val = clamp_t(int, val, (SOCK_MIN_RCVBUF + 1) / 2,
+			      sysctl_rmem_max);
+		sk->sk_rcvbuf = val * 2;
+		sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
+	}
+	release_sock(sk);
+}
+
+/*
  *      Setup loopback of outgoing multicasts on a sending socket
  */
 static void set_mcast_loop(struct sock *sk, u_char loop)
@@ -1305,6 +1336,9 @@
 
 	set_mcast_loop(sock->sk, 0);
 	set_mcast_ttl(sock->sk, 1);
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 1, result);
 
 	result = bind_mcastif_addr(sock, ipvs->master_mcast_ifn);
 	if (result < 0) {
@@ -1350,6 +1384,9 @@
 	sk_change_net(sock->sk, net);
 	/* it is equivalent to the REUSEADDR option in user-space */
 	sock->sk->sk_reuse = SK_CAN_REUSE;
+	result = sysctl_sync_sock_size(ipvs);
+	if (result > 0)
+		set_sock_size(sock->sk, 0, result);
 
 	result = sock->ops->bind(sock, (struct sockaddr *) &mcast_addr,
 			sizeof(struct sockaddr));
@@ -1392,18 +1429,22 @@
 	return len;
 }
 
-static void
+static int
 ip_vs_send_sync_msg(struct socket *sock, struct ip_vs_sync_mesg *msg)
 {
 	int msize;
+	int ret;
 
 	msize = msg->size;
 
 	/* Put size in network byte order */
 	msg->size = htons(msg->size);
 
-	if (ip_vs_send_async(sock, (char *)msg, msize) != msize)
-		pr_err("ip_vs_send_async error\n");
+	ret = ip_vs_send_async(sock, (char *)msg, msize);
+	if (ret >= 0 || ret == -EAGAIN)
+		return ret;
+	pr_err("ip_vs_send_async error %d\n", ret);
+	return 0;
 }
 
 static int
@@ -1428,36 +1469,75 @@
 	return len;
 }
 
+/* Wakeup the master thread for sending */
+static void master_wakeup_work_handler(struct work_struct *work)
+{
+	struct netns_ipvs *ipvs = container_of(work, struct netns_ipvs,
+					       master_wakeup_work.work);
+
+	spin_lock_bh(&ipvs->sync_lock);
+	if (ipvs->sync_queue_len &&
+	    ipvs->sync_queue_delay < IPVS_SYNC_WAKEUP_RATE) {
+		ipvs->sync_queue_delay = IPVS_SYNC_WAKEUP_RATE;
+		wake_up_process(ipvs->master_thread);
+	}
+	spin_unlock_bh(&ipvs->sync_lock);
+}
+
+/* Get next buffer to send */
+static inline struct ip_vs_sync_buff *
+next_sync_buff(struct netns_ipvs *ipvs)
+{
+	struct ip_vs_sync_buff *sb;
+
+	sb = sb_dequeue(ipvs);
+	if (sb)
+		return sb;
+	/* Do not delay entries in buffer for more than 2 seconds */
+	return get_curr_sync_buff(ipvs, 2 * HZ);
+}
 
 static int sync_thread_master(void *data)
 {
 	struct ip_vs_sync_thread_data *tinfo = data;
 	struct netns_ipvs *ipvs = net_ipvs(tinfo->net);
+	struct sock *sk = tinfo->sock->sk;
 	struct ip_vs_sync_buff *sb;
 
 	pr_info("sync thread started: state = MASTER, mcast_ifn = %s, "
 		"syncid = %d\n",
 		ipvs->master_mcast_ifn, ipvs->master_syncid);
 
-	while (!kthread_should_stop()) {
-		while ((sb = sb_dequeue(ipvs))) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+	for (;;) {
+		sb = next_sync_buff(ipvs);
+		if (unlikely(kthread_should_stop()))
+			break;
+		if (!sb) {
+			schedule_timeout(IPVS_SYNC_CHECK_PERIOD);
+			continue;
 		}
+		while (ip_vs_send_sync_msg(tinfo->sock, sb->mesg) < 0) {
+			int ret = 0;
 
-		/* check if entries stay in ipvs->sync_buff for 2 seconds */
-		sb = get_curr_sync_buff(ipvs, 2 * HZ);
-		if (sb) {
-			ip_vs_send_sync_msg(tinfo->sock, sb->mesg);
-			ip_vs_sync_buff_release(sb);
+			__wait_event_interruptible(*sk_sleep(sk),
+						   sock_writeable(sk) ||
+						   kthread_should_stop(),
+						   ret);
+			if (unlikely(kthread_should_stop()))
+				goto done;
 		}
-
-		schedule_timeout_interruptible(HZ);
+		ip_vs_sync_buff_release(sb);
 	}
 
+done:
+	__set_current_state(TASK_RUNNING);
+	if (sb)
+		ip_vs_sync_buff_release(sb);
+
 	/* clean up the sync_buff queue */
 	while ((sb = sb_dequeue(ipvs)))
 		ip_vs_sync_buff_release(sb);
+	__set_current_state(TASK_RUNNING);
 
 	/* clean up the current sync_buff */
 	sb = get_curr_sync_buff(ipvs, 0);
@@ -1538,6 +1618,10 @@
 		realtask = &ipvs->master_thread;
 		name = "ipvs_master:%d";
 		threadfn = sync_thread_master;
+		ipvs->sync_queue_len = 0;
+		ipvs->sync_queue_delay = 0;
+		INIT_DELAYED_WORK(&ipvs->master_wakeup_work,
+				  master_wakeup_work_handler);
 		sock = make_send_sock(net);
 	} else if (state == IP_VS_STATE_BACKUP) {
 		if (ipvs->backup_thread)
@@ -1623,6 +1707,7 @@
 		spin_lock_bh(&ipvs->sync_lock);
 		ipvs->sync_state &= ~IP_VS_STATE_MASTER;
 		spin_unlock_bh(&ipvs->sync_lock);
+		cancel_delayed_work_sync(&ipvs->master_wakeup_work);
 		retc = kthread_stop(ipvs->master_thread);
 		ipvs->master_thread = NULL;
 	} else if (state == IP_VS_STATE_BACKUP) {