Merge tag 'rxrpc-rewrite-20160913-1' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs

David Howells says:

====================
rxrpc: Miscellaneous fixes

Here's a set of miscellaneous fix patches.  There are a couple of points of
note:

 (1) There is one non-fix patch that adjusts the call ref tracking
     tracepoint to make kernel API-held refs on calls more obvious.  This
     is a prerequisite for the patch that fixes prealloc refcounting.

 (2) The final patch alters how jumbo packets that partially exceed the
     receive window are handled.  Previously, space was being left in the
     Rx buffer for them, but this significantly hurts performance as the Rx
     window can't be increased to match the OpenAFS Tx window size.

     Instead, the excess subpackets are discarded and an EXCEEDS_WINDOW ACK
     is generated for the first of them.  To stop someone from trying to run
     the kernel out of space by feeding it a series of overlapping maximal
     jumbo packets, we stop allowing jumbo packets on a call once we have
     encountered more than three jumbo packets with duplicate or
     window-exceeding subpackets.
====================

Reviewed-by: Steve Wise <swise@opengridcomputing.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/rxrpc/af_rxrpc.c b/net/rxrpc/af_rxrpc.c
index caa226d..25d00de 100644
--- a/net/rxrpc/af_rxrpc.c
+++ b/net/rxrpc/af_rxrpc.c
@@ -299,7 +299,7 @@
 {
 	_enter("%d{%d}", call->debug_id, atomic_read(&call->usage));
 	rxrpc_release_call(rxrpc_sk(sock->sk), call);
-	rxrpc_put_call(call, rxrpc_call_put);
+	rxrpc_put_call(call, rxrpc_call_put_kernel);
 }
 EXPORT_SYMBOL(rxrpc_kernel_end_call);
 
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index b1cb79e..e78c40b 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -498,6 +498,7 @@
 	 */
 #define RXRPC_RXTX_BUFF_SIZE	64
 #define RXRPC_RXTX_BUFF_MASK	(RXRPC_RXTX_BUFF_SIZE - 1)
+#define RXRPC_INIT_RX_WINDOW_SIZE 32
 	struct sk_buff		**rxtx_buffer;
 	u8			*rxtx_annotations;
 #define RXRPC_TX_ANNO_ACK	0
@@ -518,7 +519,7 @@
 	rxrpc_seq_t		rx_expect_next;	/* Expected next packet sequence number */
 	u8			rx_winsize;	/* Size of Rx window */
 	u8			tx_winsize;	/* Maximum size of Tx window */
-	u8			nr_jumbo_dup;	/* Number of jumbo duplicates */
+	u8			nr_jumbo_bad;	/* Number of jumbo dups/exceeds-windows */
 
 	/* receive-phase ACK management */
 	u8			ackr_reason;	/* reason to ACK */
@@ -540,8 +541,10 @@
 	rxrpc_call_seen,
 	rxrpc_call_got,
 	rxrpc_call_got_userid,
+	rxrpc_call_got_kernel,
 	rxrpc_call_put,
 	rxrpc_call_put_userid,
+	rxrpc_call_put_kernel,
 	rxrpc_call_put_noqueue,
 	rxrpc_call__nr_trace
 };
diff --git a/net/rxrpc/call_accept.c b/net/rxrpc/call_accept.c
index b8acec0..26c293e 100644
--- a/net/rxrpc/call_accept.c
+++ b/net/rxrpc/call_accept.c
@@ -121,7 +121,7 @@
 
 		call->user_call_ID = user_call_ID;
 		call->notify_rx = notify_rx;
-		rxrpc_get_call(call, rxrpc_call_got);
+		rxrpc_get_call(call, rxrpc_call_got_kernel);
 		user_attach_call(call, user_call_ID);
 		rxrpc_get_call(call, rxrpc_call_got_userid);
 		rb_link_node(&call->sock_node, parent, pp);
@@ -221,6 +221,7 @@
 		if (rx->discard_new_call) {
 			_debug("discard %lx", call->user_call_ID);
 			rx->discard_new_call(call, call->user_call_ID);
+			rxrpc_put_call(call, rxrpc_call_put_kernel);
 		}
 		rxrpc_call_completed(call);
 		rxrpc_release_call(rx, call);
@@ -300,6 +301,7 @@
 	smp_store_release(&b->call_backlog_tail,
 			  (call_tail + 1) & (RXRPC_BACKLOG_MAX - 1));
 
+	rxrpc_see_call(call);
 	call->conn = conn;
 	call->peer = rxrpc_get_peer(conn->params.peer);
 	return call;
@@ -401,6 +403,13 @@
 	if (call->state == RXRPC_CALL_SERVER_ACCEPTING)
 		rxrpc_notify_socket(call);
 
+	/* We have to discard the prealloc queue's ref here and rely on a
+	 * combination of the RCU read lock and refs held either by the socket
+	 * (recvmsg queue, to-be-accepted queue or user ID tree) or the kernel
+	 * service to prevent the call from being deallocated too early.
+	 */
+	rxrpc_put_call(call, rxrpc_call_put);
+
 	_leave(" = %p{%d}", call, call->debug_id);
 out:
 	spin_unlock(&rx->incoming_lock);
@@ -425,9 +434,11 @@
 
 	write_lock(&rx->call_lock);
 
-	ret = -ENODATA;
-	if (list_empty(&rx->to_be_accepted))
-		goto out;
+	if (list_empty(&rx->to_be_accepted)) {
+		write_unlock(&rx->call_lock);
+		kleave(" = -ENODATA [empty]");
+		return ERR_PTR(-ENODATA);
+	}
 
 	/* check the user ID isn't already in use */
 	pp = &rx->calls.rb_node;
@@ -466,7 +477,6 @@
 	}
 
 	/* formalise the acceptance */
-	rxrpc_get_call(call, rxrpc_call_got);
 	call->notify_rx = notify_rx;
 	call->user_call_ID = user_call_ID;
 	rxrpc_get_call(call, rxrpc_call_got_userid);
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index 2b976e7..6143204 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -95,7 +95,7 @@
 		break;
 
 	case RXRPC_ACK_IDLE:
-		if (rxrpc_soft_ack_delay < expiry)
+		if (rxrpc_idle_ack_delay < expiry)
 			expiry = rxrpc_idle_ack_delay;
 		break;
 
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index 18ab13f..22f9b0d 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -56,8 +56,10 @@
 	[rxrpc_call_seen]		= "SEE",
 	[rxrpc_call_got]		= "GOT",
 	[rxrpc_call_got_userid]		= "Gus",
+	[rxrpc_call_got_kernel]		= "Gke",
 	[rxrpc_call_put]		= "PUT",
 	[rxrpc_call_put_userid]		= "Pus",
+	[rxrpc_call_put_kernel]		= "Pke",
 	[rxrpc_call_put_noqueue]	= "PNQ",
 };
 
@@ -150,7 +152,7 @@
 	memset(&call->sock_node, 0xed, sizeof(call->sock_node));
 
 	/* Leave space in the ring to handle a maxed-out jumbo packet */
-	call->rx_winsize = RXRPC_RXTX_BUFF_SIZE - 1 - 46;
+	call->rx_winsize = rxrpc_rx_window_size;
 	call->tx_winsize = 16;
 	call->rx_expect_next = 1;
 	return call;
@@ -462,9 +464,6 @@
 		call->rxtx_buffer[i] = NULL;
 	}
 
-	/* We have to release the prealloc backlog ref */
-	if (rxrpc_is_service_call(call))
-		rxrpc_put_call(call, rxrpc_call_put);
 	_leave("");
 }
 
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index afeba98..75af0bd 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -59,6 +59,8 @@
 
 	spin_unlock(&call->lock);
 
+	wake_up(&call->waitq);
+
 	while (list) {
 		skb = list;
 		list = skb->next;
@@ -125,7 +127,7 @@
 {
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	unsigned int offset = sp->offset;
-	unsigned int len = skb->data_len;
+	unsigned int len = skb->len;
 	int nr_jumbo = 1;
 	u8 flags = sp->hdr.flags;
 
@@ -162,7 +164,7 @@
  * (that information is encoded in the ACK packet).
  */
 static void rxrpc_input_dup_data(struct rxrpc_call *call, rxrpc_seq_t seq,
-				 u8 annotation, bool *_jumbo_dup)
+				 u8 annotation, bool *_jumbo_bad)
 {
 	/* Discard normal packets that are duplicates. */
 	if (annotation == 0)
@@ -172,9 +174,9 @@
 	 * more partially duplicate jumbo packets, we refuse to take any more
 	 * jumbos for this call.
 	 */
-	if (!*_jumbo_dup) {
-		call->nr_jumbo_dup++;
-		*_jumbo_dup = true;
+	if (!*_jumbo_bad) {
+		call->nr_jumbo_bad++;
+		*_jumbo_bad = true;
 	}
 }
 
@@ -189,12 +191,12 @@
 	unsigned int ix;
 	rxrpc_serial_t serial = sp->hdr.serial, ack_serial = 0;
 	rxrpc_seq_t seq = sp->hdr.seq, hard_ack;
-	bool immediate_ack = false, jumbo_dup = false, queued;
+	bool immediate_ack = false, jumbo_bad = false, queued;
 	u16 len;
 	u8 ack = 0, flags, annotation = 0;
 
 	_enter("{%u,%u},{%u,%u}",
-	       call->rx_hard_ack, call->rx_top, skb->data_len, seq);
+	       call->rx_hard_ack, call->rx_top, skb->len, seq);
 
 	_proto("Rx DATA %%%u { #%u f=%02x }",
 	       sp->hdr.serial, seq, sp->hdr.flags);
@@ -220,7 +222,7 @@
 
 	flags = sp->hdr.flags;
 	if (flags & RXRPC_JUMBO_PACKET) {
-		if (call->nr_jumbo_dup > 3) {
+		if (call->nr_jumbo_bad > 3) {
 			ack = RXRPC_ACK_NOSPACE;
 			ack_serial = serial;
 			goto ack;
@@ -231,7 +233,7 @@
 next_subpacket:
 	queued = false;
 	ix = seq & RXRPC_RXTX_BUFF_MASK;
-	len = skb->data_len;
+	len = skb->len;
 	if (flags & RXRPC_JUMBO_PACKET)
 		len = RXRPC_JUMBO_DATALEN;
 
@@ -257,7 +259,7 @@
 	}
 
 	if (call->rxtx_buffer[ix]) {
-		rxrpc_input_dup_data(call, seq, annotation, &jumbo_dup);
+		rxrpc_input_dup_data(call, seq, annotation, &jumbo_bad);
 		if (ack != RXRPC_ACK_DUPLICATE) {
 			ack = RXRPC_ACK_DUPLICATE;
 			ack_serial = serial;
@@ -302,6 +304,15 @@
 		annotation++;
 		if (flags & RXRPC_JUMBO_PACKET)
 			annotation |= RXRPC_RX_ANNO_JLAST;
+		if (after(seq, hard_ack + call->rx_winsize)) {
+			ack = RXRPC_ACK_EXCEEDS_WINDOW;
+			ack_serial = serial;
+			if (!jumbo_bad) {
+				call->nr_jumbo_bad++;
+				jumbo_bad = true;
+			}
+			goto ack;
+		}
 
 		_proto("Rx DATA Jumbo %%%u", serial);
 		goto next_subpacket;
@@ -331,14 +342,16 @@
 	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 	struct rxrpc_peer *peer;
 	unsigned int mtu;
+	u32 rwind = ntohl(ackinfo->rwind);
 
 	_proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
 	       sp->hdr.serial,
 	       ntohl(ackinfo->rxMTU), ntohl(ackinfo->maxMTU),
-	       ntohl(ackinfo->rwind), ntohl(ackinfo->jumbo_max));
+	       rwind, ntohl(ackinfo->jumbo_max));
 
-	if (call->tx_winsize > ntohl(ackinfo->rwind))
-		call->tx_winsize = ntohl(ackinfo->rwind);
+	if (rwind > RXRPC_RXTX_BUFF_SIZE - 1)
+		rwind = RXRPC_RXTX_BUFF_SIZE - 1;
+	call->tx_winsize = rwind;
 
 	mtu = min(ntohl(ackinfo->rxMTU), ntohl(ackinfo->maxMTU));
 
@@ -442,7 +455,7 @@
 	}
 
 	offset = sp->offset + nr_acks + 3;
-	if (skb->data_len >= offset + sizeof(buf.info)) {
+	if (skb->len >= offset + sizeof(buf.info)) {
 		if (skb_copy_bits(skb, offset, &buf.info, sizeof(buf.info)) < 0)
 			return rxrpc_proto_abort("XAI", call, 0);
 		rxrpc_input_ackinfo(call, skb, &buf.info);
diff --git a/net/rxrpc/misc.c b/net/rxrpc/misc.c
index fd096f7..8b91078 100644
--- a/net/rxrpc/misc.c
+++ b/net/rxrpc/misc.c
@@ -50,7 +50,10 @@
  * limit is hit, we should generate an EXCEEDS_WINDOW ACK and discard further
  * packets.
  */
-unsigned int rxrpc_rx_window_size = RXRPC_RXTX_BUFF_SIZE - 46;
+unsigned int rxrpc_rx_window_size = RXRPC_INIT_RX_WINDOW_SIZE;
+#if (RXRPC_RXTX_BUFF_SIZE - 1) < RXRPC_INIT_RX_WINDOW_SIZE
+#error Need to reduce RXRPC_INIT_RX_WINDOW_SIZE
+#endif
 
 /*
  * Maximum Rx MTU size.  This indicates to the sender the size of jumbo packet
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 719a4c2..90c7722 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -71,10 +71,10 @@
 
 	mtu = call->conn->params.peer->if_mtu;
 	mtu -= call->conn->params.peer->hdrsize;
-	jmax = (call->nr_jumbo_dup > 3) ? 1 : rxrpc_rx_jumbo_max;
+	jmax = (call->nr_jumbo_bad > 3) ? 1 : rxrpc_rx_jumbo_max;
 	pkt->ackinfo.rxMTU	= htonl(rxrpc_rx_mtu);
 	pkt->ackinfo.maxMTU	= htonl(mtu);
-	pkt->ackinfo.rwind	= htonl(rxrpc_rx_window_size);
+	pkt->ackinfo.rwind	= htonl(call->rx_winsize);
 	pkt->ackinfo.jumbo_max	= htonl(jmax);
 
 	*ackp++ = 0;
diff --git a/net/rxrpc/peer_object.c b/net/rxrpc/peer_object.c
index 2efe29a..3e6cd17 100644
--- a/net/rxrpc/peer_object.c
+++ b/net/rxrpc/peer_object.c
@@ -203,6 +203,7 @@
  */
 static void rxrpc_init_peer(struct rxrpc_peer *peer, unsigned long hash_key)
 {
+	peer->hash_key = hash_key;
 	rxrpc_assess_MTU_size(peer);
 	peer->mtu = peer->if_mtu;
 
@@ -238,7 +239,6 @@
 
 	peer = rxrpc_alloc_peer(local, gfp);
 	if (peer) {
-		peer->hash_key = hash_key;
 		memcpy(&peer->srx, srx, sizeof(*srx));
 		rxrpc_init_peer(peer, hash_key);
 	}
diff --git a/net/rxrpc/recvmsg.c b/net/rxrpc/recvmsg.c
index 20d0b5c..a284205 100644
--- a/net/rxrpc/recvmsg.c
+++ b/net/rxrpc/recvmsg.c
@@ -118,6 +118,7 @@
 		list_del_init(&call->recvmsg_link);
 		write_unlock_bh(&rx->recvmsg_lock);
 
+		rxrpc_get_call(call, rxrpc_call_got);
 		write_lock(&rx->call_lock);
 		list_add_tail(&call->accept_link, &rx->to_be_accepted);
 		write_unlock(&rx->call_lock);
@@ -463,6 +464,10 @@
 					 flags, &copied);
 		if (ret == -EAGAIN)
 			ret = 0;
+
+		if (after(call->rx_top, call->rx_hard_ack) &&
+		    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
+			rxrpc_notify_socket(call);
 		break;
 	default:
 		ret = 0;
diff --git a/net/rxrpc/sysctl.c b/net/rxrpc/sysctl.c
index b7ca8cf..a03c61c 100644
--- a/net/rxrpc/sysctl.c
+++ b/net/rxrpc/sysctl.c
@@ -20,7 +20,7 @@
 static const unsigned int four = 4;
 static const unsigned int thirtytwo = 32;
 static const unsigned int n_65535 = 65535;
-static const unsigned int n_max_acks = RXRPC_MAXACKS;
+static const unsigned int n_max_acks = RXRPC_RXTX_BUFF_SIZE - 1;
 
 /*
  * RxRPC operating parameters.