tipc: connection oriented transport uses new send functions

Message sending across established connections now uses the
message preparation and send functions introduced earlier in
this series. The socket layer prepares the message and calls
the link send function directly, instead of going via the
port layer.

As a consequence of this change, the functions tipc_send(),
tipc_port_iovec_rcv(), tipc_port_iovec_reject() and tipc_reject_msg()
become unreferenced and can be eliminated from port.c. For the same
reason, the functions tipc_link_xmit_fast(), tipc_link_iovec_xmit_long()
and tipc_link_iovec_fast() can be eliminated from link.c.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 5690751..bfe79bb 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -206,6 +206,7 @@
 	sk->sk_data_ready = tipc_data_ready;
 	sk->sk_write_space = tipc_write_space;
 	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
+	tsk->port.sent = 0;
 	atomic_set(&tsk->dupl_rcvcnt, 0);
 	tipc_port_unlock(port);
 
@@ -784,64 +785,11 @@
 }
 
 /**
- * tipc_send_packet - send a connection-oriented message
- * @iocb: if NULL, indicates that socket lock is already held
- * @sock: socket structure
- * @m: message to send
- * @total_len: length of message
- *
- * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
- *
- * Returns the number of bytes sent on success, or errno otherwise
- */
-static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
-			    struct msghdr *m, size_t total_len)
-{
-	struct sock *sk = sock->sk;
-	struct tipc_sock *tsk = tipc_sk(sk);
-	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
-	int res = -EINVAL;
-	long timeo;
-
-	/* Handle implied connection establishment */
-	if (unlikely(dest))
-		return tipc_sendmsg(iocb, sock, m, total_len);
-
-	if (total_len > TIPC_MAX_USER_MSG_SIZE)
-		return -EMSGSIZE;
-
-	if (iocb)
-		lock_sock(sk);
-
-	if (unlikely(sock->state != SS_CONNECTED)) {
-		if (sock->state == SS_DISCONNECTING)
-			res = -EPIPE;
-		else
-			res = -ENOTCONN;
-		goto exit;
-	}
-
-	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
-	do {
-		res = tipc_send(&tsk->port, m->msg_iov, total_len);
-		if (likely(res != -ELINKCONG))
-			break;
-		res = tipc_wait_for_sndpkt(sock, &timeo);
-		if (res)
-			break;
-	} while (1);
-exit:
-	if (iocb)
-		release_sock(sk);
-	return res;
-}
-
-/**
  * tipc_send_stream - send stream-oriented data
  * @iocb: (unused)
  * @sock: socket structure
  * @m: data to send
- * @total_len: total length of data to be sent
+ * @dsz: total length of data to be transmitted
  *
  * Used for SOCK_STREAM data.
  *
@@ -849,89 +797,97 @@
  * or errno if no data sent
  */
 static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
-			    struct msghdr *m, size_t total_len)
+			    struct msghdr *m, size_t dsz)
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
-	struct msghdr my_msg;
-	struct iovec my_iov;
-	struct iovec *curr_iov;
-	int curr_iovlen;
-	char __user *curr_start;
-	u32 hdr_size;
-	int curr_left;
-	int bytes_to_send;
-	int bytes_sent;
-	int res;
+	struct tipc_port *port = &tsk->port;
+	struct tipc_msg *mhdr = &port->phdr;
+	struct sk_buff *buf;
+	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
+	u32 ref = port->ref;
+	int rc = -EINVAL;
+	long timeo;
+	u32 dnode;
+	uint mtu, send, sent = 0;
 
-	lock_sock(sk);
+	/* Handle implied connection establishment */
+	if (unlikely(dest)) {
+		rc = tipc_sendmsg(iocb, sock, m, dsz);
+		if (dsz && (dsz == rc))
+			tsk->port.sent = 1;
+		return rc;
+	}
+	if (dsz > (uint)INT_MAX)
+		return -EMSGSIZE;
 
-	/* Handle special cases where there is no connection */
+	if (iocb)
+		lock_sock(sk);
+
 	if (unlikely(sock->state != SS_CONNECTED)) {
-		if (sock->state == SS_UNCONNECTED)
-			res = tipc_send_packet(NULL, sock, m, total_len);
+		if (sock->state == SS_DISCONNECTING)
+			rc = -EPIPE;
 		else
-			res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
+			rc = -ENOTCONN;
 		goto exit;
 	}
 
-	if (unlikely(m->msg_name)) {
-		res = -EISCONN;
+	timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
+	dnode = tipc_port_peernode(port);
+	port->congested = 1;
+
+next:
+	mtu = port->max_pkt;
+	send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
+	rc = tipc_msg_build2(mhdr, m->msg_iov, sent, send, mtu, &buf);
+	if (unlikely(rc < 0))
 		goto exit;
-	}
-
-	if (total_len > (unsigned int)INT_MAX) {
-		res = -EMSGSIZE;
-		goto exit;
-	}
-
-	/*
-	 * Send each iovec entry using one or more messages
-	 *
-	 * Note: This algorithm is good for the most likely case
-	 * (i.e. one large iovec entry), but could be improved to pass sets
-	 * of small iovec entries into send_packet().
-	 */
-	curr_iov = m->msg_iov;
-	curr_iovlen = m->msg_iovlen;
-	my_msg.msg_iov = &my_iov;
-	my_msg.msg_iovlen = 1;
-	my_msg.msg_flags = m->msg_flags;
-	my_msg.msg_name = NULL;
-	bytes_sent = 0;
-
-	hdr_size = msg_hdr_sz(&tsk->port.phdr);
-
-	while (curr_iovlen--) {
-		curr_start = curr_iov->iov_base;
-		curr_left = curr_iov->iov_len;
-
-		while (curr_left) {
-			bytes_to_send = tsk->port.max_pkt - hdr_size;
-			if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
-				bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
-			if (curr_left < bytes_to_send)
-				bytes_to_send = curr_left;
-			my_iov.iov_base = curr_start;
-			my_iov.iov_len = bytes_to_send;
-			res = tipc_send_packet(NULL, sock, &my_msg,
-					       bytes_to_send);
-			if (res < 0) {
-				if (bytes_sent)
-					res = bytes_sent;
-				goto exit;
+	do {
+		port->congested = 1;
+		if (likely(!tipc_port_congested(port))) {
+			rc = tipc_link_xmit2(buf, dnode, ref);
+			if (likely(!rc)) {
+				port->sent++;
+				sent += send;
+				if (sent == dsz)
+					break;
+				goto next;
 			}
-			curr_left -= bytes_to_send;
-			curr_start += bytes_to_send;
-			bytes_sent += bytes_to_send;
+			if (rc == -EMSGSIZE) {
+				port->max_pkt = tipc_node_get_mtu(dnode, ref);
+				goto next;
+			}
+			if (rc != -ELINKCONG)
+				break;
 		}
+		rc = tipc_wait_for_sndpkt(sock, &timeo);
+	} while (!rc);
 
-		curr_iov++;
-	}
-	res = bytes_sent;
+	port->congested = 0;
 exit:
-	release_sock(sk);
-	return res;
+	if (iocb)
+		release_sock(sk);
+	return sent ? sent : rc;
+}
+
+/**
+ * tipc_send_packet - send a connection-oriented message
+ * @iocb: if NULL, indicates that socket lock is already held
+ * @sock: socket structure
+ * @m: message to send
+ * @dsz: length of data to be transmitted
+ *
+ * Used for SOCK_SEQPACKET messages.
+ *
+ * Returns the number of bytes sent on success, or errno otherwise
+ */
+static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
+			    struct msghdr *m, size_t dsz)
+{
+	if (dsz > TIPC_MAX_USER_MSG_SIZE)
+		return -EMSGSIZE;
+
+	return tipc_send_stream(iocb, sock, m, dsz);
 }
 
 /**