- djm@cvs.openbsd.org 2010/01/26 01:28:35
     [channels.c channels.h clientloop.c clientloop.h mux.c nchan.c ssh.c]
     rewrite ssh(1) multiplexing code to a more sensible protocol.

     The new multiplexing code uses channels for the listener and
     accepted control sockets to make the mux master non-blocking, so
     there are no stalls when processing messages from a slave.

     avoid use of fatal() in mux master protocol parsing so an errant slave
     process cannot take down a running master.

     implement requesting of port-forwards over multiplexed sessions. Any
     port forwards requested by the slave are added to those the master has
     established.

     add support for stdio forwarding ("ssh -W host:port ...") in mux slaves.

     document master/slave mux protocol so that other tools can use it to
     control a running ssh(1). Note: there are no guarantees that this
     protocol won't be incompatibly changed (though it is versioned).

     feedback Salvador Fandino, dtucker@
     channel changes ok markus@
diff --git a/channels.c b/channels.c
index e8589d8..8126167 100644
--- a/channels.c
+++ b/channels.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: channels.c,v 1.301 2010/01/11 01:39:46 dtucker Exp $ */
+/* $OpenBSD: channels.c,v 1.302 2010/01/26 01:28:35 djm Exp $ */
 /*
  * Author: Tatu Ylonen <ylo@cs.hut.fi>
  * Copyright (c) 1995 Tatu Ylonen <ylo@cs.hut.fi>, Espoo, Finland
@@ -239,7 +239,6 @@
 	c->rfd = rfd;
 	c->wfd = wfd;
 	c->sock = (rfd == wfd) ? rfd : -1;
-	c->ctl_fd = -1; /* XXX: set elsewhere */
 	c->efd = efd;
 	c->extended_usage = extusage;
 
@@ -328,6 +327,9 @@
 	c->output_filter = NULL;
 	c->filter_ctx = NULL;
 	c->filter_cleanup = NULL;
+	c->ctl_chan = -1;
+	c->mux_rcb = NULL;
+	c->mux_ctx = NULL;
 	c->delayed = 1;		/* prevent call to channel_post handler */
 	TAILQ_INIT(&c->status_confirms);
 	debug("channel %d: new [%s]", found, remote_name);
@@ -370,11 +372,10 @@
 static void
 channel_close_fds(Channel *c)
 {
-	debug3("channel %d: close_fds r %d w %d e %d c %d",
-	    c->self, c->rfd, c->wfd, c->efd, c->ctl_fd);
+	debug3("channel %d: close_fds r %d w %d e %d",
+	    c->self, c->rfd, c->wfd, c->efd);
 
 	channel_close_fd(&c->sock);
-	channel_close_fd(&c->ctl_fd);
 	channel_close_fd(&c->rfd);
 	channel_close_fd(&c->wfd);
 	channel_close_fd(&c->efd);
@@ -400,8 +401,6 @@
 
 	if (c->sock != -1)
 		shutdown(c->sock, SHUT_RDWR);
-	if (c->ctl_fd != -1)
-		shutdown(c->ctl_fd, SHUT_RDWR);
 	channel_close_fds(c);
 	buffer_free(&c->input);
 	buffer_free(&c->output);
@@ -523,6 +522,7 @@
 		case SSH_CHANNEL_X11_LISTENER:
 		case SSH_CHANNEL_PORT_LISTENER:
 		case SSH_CHANNEL_RPORT_LISTENER:
+		case SSH_CHANNEL_MUX_LISTENER:
 		case SSH_CHANNEL_CLOSED:
 		case SSH_CHANNEL_AUTH_SOCKET:
 		case SSH_CHANNEL_DYNAMIC:
@@ -536,6 +536,7 @@
 		case SSH_CHANNEL_OPENING:
 		case SSH_CHANNEL_OPEN:
 		case SSH_CHANNEL_X11_OPEN:
+		case SSH_CHANNEL_MUX_CLIENT:
 			return 1;
 		case SSH_CHANNEL_INPUT_DRAINING:
 		case SSH_CHANNEL_OUTPUT_DRAINING:
@@ -567,6 +568,8 @@
 		case SSH_CHANNEL_X11_LISTENER:
 		case SSH_CHANNEL_PORT_LISTENER:
 		case SSH_CHANNEL_RPORT_LISTENER:
+		case SSH_CHANNEL_MUX_LISTENER:
+		case SSH_CHANNEL_MUX_CLIENT:
 		case SSH_CHANNEL_OPENING:
 		case SSH_CHANNEL_CONNECTING:
 		case SSH_CHANNEL_ZOMBIE:
@@ -617,6 +620,8 @@
 		case SSH_CHANNEL_CLOSED:
 		case SSH_CHANNEL_AUTH_SOCKET:
 		case SSH_CHANNEL_ZOMBIE:
+		case SSH_CHANNEL_MUX_CLIENT:
+		case SSH_CHANNEL_MUX_LISTENER:
 			continue;
 		case SSH_CHANNEL_LARVAL:
 		case SSH_CHANNEL_OPENING:
@@ -627,12 +632,12 @@
 		case SSH_CHANNEL_INPUT_DRAINING:
 		case SSH_CHANNEL_OUTPUT_DRAINING:
 			snprintf(buf, sizeof buf,
-			    "  #%d %.300s (t%d r%d i%d/%d o%d/%d fd %d/%d cfd %d)\r\n",
+			    "  #%d %.300s (t%d r%d i%d/%d o%d/%d fd %d/%d cc %d)\r\n",
 			    c->self, c->remote_name,
 			    c->type, c->remote_id,
 			    c->istate, buffer_len(&c->input),
 			    c->ostate, buffer_len(&c->output),
-			    c->rfd, c->wfd, c->ctl_fd);
+			    c->rfd, c->wfd, c->ctl_chan);
 			buffer_append(&buffer, buf, strlen(buf));
 			continue;
 		default:
@@ -839,9 +844,6 @@
 			FD_SET(c->efd, readset);
 	}
 	/* XXX: What about efd? races? */
-	if (compat20 && c->ctl_fd != -1 &&
-	    c->istate == CHAN_INPUT_OPEN && c->ostate == CHAN_OUTPUT_OPEN)
-		FD_SET(c->ctl_fd, readset);
 }
 
 /* ARGSUSED */
@@ -986,6 +988,28 @@
 	}
 }
 
+static void	/* channel_pre handler for SSH_CHANNEL_MUX_CLIENT channels */
+channel_pre_mux_client(Channel *c, fd_set *readset, fd_set *writeset)
+{
+	if (c->istate == CHAN_INPUT_OPEN &&
+	    buffer_check_alloc(&c->input, CHAN_RBUF))
+		FD_SET(c->rfd, readset);	/* ask to be told of input */
+	if (c->istate == CHAN_INPUT_WAIT_DRAIN) {
+		/* clear buffer immediately (discard any partial packet) */
+		buffer_clear(&c->input);
+		chan_ibuf_empty(c);
+		/* Start output drain. XXX just kill chan? */
+		chan_rcvd_oclose(c);
+	}
+	if (c->ostate == CHAN_OUTPUT_OPEN ||
+	    c->ostate == CHAN_OUTPUT_WAIT_DRAIN) {
+		if (buffer_len(&c->output) > 0)
+			FD_SET(c->wfd, writeset);	/* output pending */
+		else if (c->ostate == CHAN_OUTPUT_WAIT_DRAIN)
+			chan_obuf_empty(c);	/* draining, nothing left */
+	}
+}
+
 /* try to decode a socks4 header */
 /* ARGSUSED */
 static int
@@ -1218,19 +1242,14 @@
 }
 
 Channel *
-channel_connect_stdio_fwd(const char *host_to_connect, u_short port_to_connect)
+channel_connect_stdio_fwd(const char *host_to_connect, u_short port_to_connect,
+    int in, int out)
 {
 	Channel *c;
-	int in, out;
 
 	debug("channel_connect_stdio_fwd %s:%d", host_to_connect,
 	    port_to_connect);
 
-	in = dup(STDIN_FILENO);
-	out = dup(STDOUT_FILENO);
-	if (in < 0 || out < 0)
-		fatal("channel_connect_stdio_fwd: dup() in/out failed");
-
 	c = channel_new("stdio-forward", SSH_CHANNEL_OPENING, in, out,
 	    -1, CHAN_TCP_WINDOW_DEFAULT, CHAN_TCP_PACKET_DEFAULT,
 	    0, "stdio-forward", /*nonblock*/0);
@@ -1749,36 +1768,6 @@
 	return 1;
 }
 
-/* ARGSUSED */
-static int
-channel_handle_ctl(Channel *c, fd_set *readset, fd_set *writeset)
-{
-	char buf[16];
-	int len;
-
-	/* Monitor control fd to detect if the slave client exits */
-	if (c->ctl_fd != -1 && FD_ISSET(c->ctl_fd, readset)) {
-		len = read(c->ctl_fd, buf, sizeof(buf));
-		if (len < 0 &&
-		    (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK))
-			return 1;
-		if (len <= 0) {
-			debug2("channel %d: ctl read<=0", c->self);
-			if (c->type != SSH_CHANNEL_OPEN) {
-				debug2("channel %d: not open", c->self);
-				chan_mark_dead(c);
-				return -1;
-			} else {
-				chan_read_failed(c);
-				chan_write_failed(c);
-			}
-			return -1;
-		} else
-			fatal("%s: unexpected data on ctl fd", __func__);
-	}
-	return 1;
-}
-
 static int
 channel_check_window(Channel *c)
 {
@@ -1809,10 +1798,131 @@
 	if (!compat20)
 		return;
 	channel_handle_efd(c, readset, writeset);
-	channel_handle_ctl(c, readset, writeset);
 	channel_check_window(c);
 }
 
+static u_int	/* returns bytes buffered in c->input; 0 after a failed read */
+read_mux(Channel *c, u_int need)
+{
+	char buf[CHAN_RBUF];
+	int len;
+	u_int rlen;
+
+	if (buffer_len(&c->input) < need) {
+		rlen = need - buffer_len(&c->input);
+		len = read(c->rfd, buf, MIN(rlen, CHAN_RBUF));
+		if (len < 0 && (errno == EINTR || errno == EAGAIN))
+			return buffer_len(&c->input); /* transient; retry */
+		if (len <= 0) {	/* EOF or hard error: errno valid only if < 0 */
+			debug2("channel %d: ctl read<=0 rfd %d len %d",
+			    c->self, c->rfd, len);
+			chan_read_failed(c);
+			return 0;
+		} else
+			buffer_append(&c->input, buf, len);
+	}
+	return buffer_len(&c->input);
+}
+
+static void	/* channel_post handler for SSH_CHANNEL_MUX_CLIENT channels */
+channel_post_mux_client(Channel *c, fd_set *readset, fd_set *writeset)
+{
+	u_int need;
+	ssize_t len;
+
+	if (!compat20)
+		fatal("%s: entered with !compat20", __func__);
+
+	if (c->rfd != -1 && FD_ISSET(c->rfd, readset) &&
+	    (c->istate == CHAN_INPUT_OPEN ||
+	    c->istate == CHAN_INPUT_WAIT_DRAIN)) {
+		/*
+		 * Do not read past the precise end of packets to
+		 * avoid disrupting fd passing.
+		 */
+		if (read_mux(c, 4) < 4) /* read header */
+			return;
+		need = get_u32(buffer_ptr(&c->input));	/* body length */
+#define CHANNEL_MUX_MAX_PACKET	(256 * 1024)
+		if (need > CHANNEL_MUX_MAX_PACKET) {
+			debug2("channel %d: packet too big %u > %u",
+			    c->self, need, CHANNEL_MUX_MAX_PACKET);
+			chan_rcvd_oclose(c);
+			return;
+		}
+		if (read_mux(c, need + 4) < need + 4) /* read body */
+			return;
+		if (c->mux_rcb(c) != 0) {
+			debug("channel %d: mux_rcb failed", c->self);
+			chan_mark_dead(c);
+			return;
+		}
+	}
+
+	if (c->wfd != -1 && FD_ISSET(c->wfd, writeset) &&
+	    buffer_len(&c->output) > 0) {
+		len = write(c->wfd, buffer_ptr(&c->output),
+		    buffer_len(&c->output));
+		if (len < 0 && (errno == EINTR || errno == EAGAIN))
+			return;	/* transient; output stays queued */
+		if (len <= 0) {
+			chan_mark_dead(c);
+			return;
+		}
+		buffer_consume(&c->output, len);
+	}
+}
+
+static void	/* channel_post handler for SSH_CHANNEL_MUX_LISTENER channels */
+channel_post_mux_listener(Channel *c, fd_set *readset, fd_set *writeset)
+{
+	Channel *nc;
+	struct sockaddr_storage addr;
+	socklen_t addrlen;
+	int newsock;
+	uid_t euid;
+	gid_t egid;
+
+	if (!FD_ISSET(c->sock, readset))
+		return;
+
+	debug("multiplexing control connection");
+
+	/*
+	 * Accept connection on control socket; peer must be euid 0 or our uid
+	 */
+	memset(&addr, 0, sizeof(addr));
+	addrlen = sizeof(addr);
+	if ((newsock = accept(c->sock, (struct sockaddr*)&addr,
+	    &addrlen)) == -1) {
+		error("%s accept: %s", __func__, strerror(errno));
+		return;
+	}
+
+	if (getpeereid(newsock, &euid, &egid) < 0) {
+		error("%s getpeereid failed: %s", __func__,
+		    strerror(errno));
+		close(newsock);
+		return;
+	}
+	if ((euid != 0) && (getuid() != euid)) {
+		error("multiplex uid mismatch: peer euid %u != uid %u",
+		    (u_int)euid, (u_int)getuid());
+		close(newsock);
+		return;
+	}
+	nc = channel_new("multiplex client", SSH_CHANNEL_MUX_CLIENT,
+	    newsock, newsock, -1, c->local_window_max,
+	    c->local_maxpacket, 0, "mux-control", 1);
+	nc->mux_rcb = c->mux_rcb;	/* new channel inherits our callback */
+	debug3("%s: new mux channel %d fd %d", __func__,
+	    nc->self, nc->sock);
+	/* establish state: first callback, made before anything is read */
+	nc->mux_rcb(nc);	/* NOTE(review): return value is ignored here */
+	/* mux state transitions must not elicit protocol messages */
+	nc->flags |= CHAN_LOCAL;
+}
+
 /* ARGSUSED */
 static void
 channel_post_output_drain_13(Channel *c, fd_set *readset, fd_set *writeset)
@@ -1841,6 +1951,8 @@
 	channel_pre[SSH_CHANNEL_AUTH_SOCKET] =		&channel_pre_listener;
 	channel_pre[SSH_CHANNEL_CONNECTING] =		&channel_pre_connecting;
 	channel_pre[SSH_CHANNEL_DYNAMIC] =		&channel_pre_dynamic;
+	channel_pre[SSH_CHANNEL_MUX_LISTENER] =		&channel_pre_listener;
+	channel_pre[SSH_CHANNEL_MUX_CLIENT] =		&channel_pre_mux_client;
 
 	channel_post[SSH_CHANNEL_OPEN] =		&channel_post_open;
 	channel_post[SSH_CHANNEL_PORT_LISTENER] =	&channel_post_port_listener;
@@ -1849,6 +1961,8 @@
 	channel_post[SSH_CHANNEL_AUTH_SOCKET] =		&channel_post_auth_listener;
 	channel_post[SSH_CHANNEL_CONNECTING] =		&channel_post_connecting;
 	channel_post[SSH_CHANNEL_DYNAMIC] =		&channel_post_open;
+	channel_post[SSH_CHANNEL_MUX_LISTENER] =	&channel_post_mux_listener;
+	channel_post[SSH_CHANNEL_MUX_CLIENT] =		&channel_post_mux_client;
 }
 
 static void