net/smc: urgent data support

Add support for out of band data send and receive.

Signed-off-by: Stefan Raspl <raspl@linux.ibm.com>
Signed-off-by: Ursula Braun <ubraun@linux.ibm.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/smc/smc_rx.c b/net/smc/smc_rx.c
index 290a434..3d77b38 100644
--- a/net/smc/smc_rx.c
+++ b/net/smc/smc_rx.c
@@ -47,16 +47,59 @@
  *   @conn   connection to update
  *   @cons   consumer cursor
  *   @len    number of Bytes consumed
+ *   Returns:
+ *   1 if we should end our receive, 0 otherwise
  */
-static void smc_rx_update_consumer(struct smc_connection *conn,
-				   union smc_host_cursor cons, size_t len)
+static int smc_rx_update_consumer(struct smc_sock *smc,
+				  union smc_host_cursor cons, size_t len)
 {
+	struct smc_connection *conn = &smc->conn;
+	struct sock *sk = &smc->sk;
+	bool force = false;
+	int diff, rc = 0;
+
 	smc_curs_add(conn->rmb_desc->len, &cons, len);
+
+	/* did we process urgent data? */
+	if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
+		diff = smc_curs_comp(conn->rmb_desc->len, &cons,
+				     &conn->urg_curs);
+		if (sock_flag(sk, SOCK_URGINLINE)) {
+			if (diff == 0) {
+				force = true;
+				rc = 1;
+				conn->urg_state = SMC_URG_READ;
+			}
+		} else {
+			if (diff == 1) {
+				/* skip urgent byte */
+				force = true;
+				smc_curs_add(conn->rmb_desc->len, &cons, 1);
+				conn->urg_rx_skip_pend = false;
+			} else if (diff < -1)
+				/* we read past urgent byte */
+				conn->urg_state = SMC_URG_READ;
+		}
+	}
+
 	smc_curs_write(&conn->local_tx_ctrl.cons, smc_curs_read(&cons, conn),
 		       conn);
+
 	/* send consumer cursor update if required */
 	/* similar to advertising new TCP rcv_wnd if required */
-	smc_tx_consumer_update(conn);
+	smc_tx_consumer_update(conn, force);
+
+	return rc;
+}
+
+static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
+{
+	struct smc_connection *conn = &smc->conn;
+	union smc_host_cursor cons;
+
+	smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
+		       conn);
+	smc_rx_update_consumer(smc, cons, len);
 }
 
 struct smc_spd_priv {
@@ -70,7 +113,6 @@
 	struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
 	struct smc_sock *smc = priv->smc;
 	struct smc_connection *conn;
-	union smc_host_cursor cons;
 	struct sock *sk = &smc->sk;
 
 	if (sk->sk_state == SMC_CLOSED ||
@@ -79,9 +121,7 @@
 		goto out;
 	conn = &smc->conn;
 	lock_sock(sk);
-	smc_curs_write(&cons, smc_curs_read(&conn->local_tx_ctrl.cons, conn),
-		       conn);
-	smc_rx_update_consumer(conn, cons, priv->len);
+	smc_rx_update_cons(smc, priv->len);
 	release_sock(sk);
 	if (atomic_sub_and_test(priv->len, &conn->splice_pending))
 		smc_rx_wake_up(sk);
@@ -184,6 +224,52 @@
 	return rc;
 }
 
+static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
+			   int flags)
+{
+	struct smc_connection *conn = &smc->conn;
+	union smc_host_cursor cons;
+	struct sock *sk = &smc->sk;
+	int rc = 0;
+
+	if (sock_flag(sk, SOCK_URGINLINE) ||
+	    !(conn->urg_state == SMC_URG_VALID) ||
+	    conn->urg_state == SMC_URG_READ)
+		return -EINVAL;
+
+	if (conn->urg_state == SMC_URG_VALID) {
+		if (!(flags & MSG_PEEK))
+			smc->conn.urg_state = SMC_URG_READ;
+		msg->msg_flags |= MSG_OOB;
+		if (len > 0) {
+			if (!(flags & MSG_TRUNC))
+				rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
+			len = 1;
+			smc_curs_write(&cons,
+				       smc_curs_read(&conn->local_tx_ctrl.cons,
+						     conn),
+				       conn);
+			if (smc_curs_diff(conn->rmb_desc->len, &cons,
+					  &conn->urg_curs) > 1)
+				conn->urg_rx_skip_pend = true;
+			/* Urgent Byte was already accounted for, but trigger
+			 * skipping the urgent byte in non-inline case
+			 */
+			if (!(flags & MSG_PEEK))
+				smc_rx_update_consumer(smc, cons, 0);
+		} else {
+			msg->msg_flags |= MSG_TRUNC;
+		}
+
+		return rc ? -EFAULT : len;
+	}
+
+	if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
+		return 0;
+
+	return -EAGAIN;
+}
+
 /* smc_rx_recvmsg - receive data from RMBE
  * @msg:	copy data to receive buffer
  * @pipe:	copy data to pipe if set - indicates splice() call
@@ -209,12 +295,12 @@
 
 	if (unlikely(flags & MSG_ERRQUEUE))
 		return -EINVAL; /* future work for sk.sk_family == AF_SMC */
-	if (flags & MSG_OOB)
-		return -EINVAL; /* future work */
 
 	sk = &smc->sk;
 	if (sk->sk_state == SMC_LISTEN)
 		return -ENOTCONN;
+	if (flags & MSG_OOB)
+		return smc_rx_recv_urg(smc, msg, len, flags);
 	timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 	target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
 
@@ -227,6 +313,9 @@
 
 		if (atomic_read(&conn->bytes_to_rcv))
 			goto copy;
+		else if (conn->urg_state == SMC_URG_VALID)
+			/* we received a single urgent Byte - skip */
+			smc_rx_update_cons(smc, 0);
 
 		if (sk->sk_shutdown & RCV_SHUTDOWN ||
 		    smc_cdc_rxed_any_close_or_senddone(conn) ||
@@ -281,14 +370,18 @@
 			continue;
 		}
 
-		/* not more than what user space asked for */
-		copylen = min_t(size_t, read_remaining, readable);
 		smc_curs_write(&cons,
 			       smc_curs_read(&conn->local_tx_ctrl.cons, conn),
 			       conn);
 		/* subsequent splice() calls pick up where previous left */
 		if (splbytes)
 			smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
+		if (conn->urg_state == SMC_URG_VALID &&
+		    sock_flag(&smc->sk, SOCK_URGINLINE) &&
+		    readable > 1)
+			readable--;	/* always stop at urgent Byte */
+		/* not more than what user space asked for */
+		copylen = min_t(size_t, read_remaining, readable);
 		/* determine chunks where to read from rcvbuf */
 		/* either unwrapped case, or 1st chunk of wrapped case */
 		chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
@@ -333,8 +426,8 @@
 			atomic_sub(copylen, &conn->bytes_to_rcv);
 			/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
 			smp_mb__after_atomic();
-			if (msg)
-				smc_rx_update_consumer(conn, cons, copylen);
+			if (msg && smc_rx_update_consumer(smc, cons, copylen))
+				goto out;
 		}
 	} while (read_remaining);
 out:
@@ -346,4 +439,5 @@
 {
 	smc->sk.sk_data_ready = smc_rx_wake_up;
 	atomic_set(&smc->conn.splice_pending, 0);
+	smc->conn.urg_state = SMC_URG_READ;
 }