- djm@cvs.openbsd.org 2010/09/22 22:58:51
     [atomicio.c atomicio.h misc.c misc.h scp.c sftp-client.c]
     [sftp-client.h sftp.1 sftp.c]
     add an optional per-read/write callback to atomicio

     factor out bandwidth limiting code from scp(1) into a generic bandwidth
     limiter that can be attached using the atomicio callback mechanism

     add a bandwidth limit option to sftp(1) using the above
     "very nice" markus@
diff --git a/sftp-client.c b/sftp-client.c
index 9dab477..4e009ef 100644
--- a/sftp-client.c
+++ b/sftp-client.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: sftp-client.c,v 1.92 2010/07/19 03:16:33 djm Exp $ */
+/* $OpenBSD: sftp-client.c,v 1.93 2010/09/22 22:58:51 djm Exp $ */
 /*
  * Copyright (c) 2001-2004 Damien Miller <djm@openbsd.org>
  *
@@ -76,14 +76,26 @@
 #define SFTP_EXT_STATVFS	0x00000002
 #define SFTP_EXT_FSTATVFS	0x00000004
 	u_int exts;
+	u_int64_t limit_kbps;
+	struct bwlimit bwlimit_in, bwlimit_out;
 };
 
 static char *
-get_handle(int fd, u_int expected_id, u_int *len, const char *errfmt, ...)
-    __attribute__((format(printf, 4, 5)));
+get_handle(struct sftp_conn *conn, u_int expected_id, u_int *len,
+    const char *errfmt, ...) __attribute__((format(printf, 4, 5)));
+
+/* ARGSUSED */
+static int
+sftpio(void *_bwlimit, size_t amount)
+{
+	struct bwlimit *bwlimit = (struct bwlimit *)_bwlimit;
+
+	bandwidth_limit(bwlimit, amount);
+	return 0;
+}
 
 static void
-send_msg(int fd, Buffer *m)
+send_msg(struct sftp_conn *conn, Buffer *m)
 {
 	u_char mlen[4];
 	struct iovec iov[2];
@@ -98,19 +110,22 @@
 	iov[1].iov_base = buffer_ptr(m);
 	iov[1].iov_len = buffer_len(m);
 
-	if (atomiciov(writev, fd, iov, 2) != buffer_len(m) + sizeof(mlen))
+	if (atomiciov6(writev, conn->fd_out, iov, 2,
+	    conn->limit_kbps > 0 ? sftpio : NULL, &conn->bwlimit_out) != 
+	    buffer_len(m) + sizeof(mlen))
 		fatal("Couldn't send packet: %s", strerror(errno));
 
 	buffer_clear(m);
 }
 
 static void
-get_msg(int fd, Buffer *m)
+get_msg(struct sftp_conn *conn, Buffer *m)
 {
 	u_int msg_len;
 
 	buffer_append_space(m, 4);
-	if (atomicio(read, fd, buffer_ptr(m), 4) != 4) {
+	if (atomicio6(read, conn->fd_in, buffer_ptr(m), 4,
+	    conn->limit_kbps > 0 ? sftpio : NULL, &conn->bwlimit_in) != 4) {
 		if (errno == EPIPE)
 			fatal("Connection closed");
 		else
@@ -122,7 +137,9 @@
 		fatal("Received message too long %u", msg_len);
 
 	buffer_append_space(m, msg_len);
-	if (atomicio(read, fd, buffer_ptr(m), msg_len) != msg_len) {
+	if (atomicio6(read, conn->fd_in, buffer_ptr(m), msg_len,
+	    conn->limit_kbps > 0 ? sftpio : NULL, &conn->bwlimit_in)
+	    != msg_len) {
 		if (errno == EPIPE)
 			fatal("Connection closed");
 		else
@@ -131,7 +148,7 @@
 }
 
 static void
-send_string_request(int fd, u_int id, u_int code, char *s,
+send_string_request(struct sftp_conn *conn, u_int id, u_int code, char *s,
     u_int len)
 {
 	Buffer msg;
@@ -140,14 +157,14 @@
 	buffer_put_char(&msg, code);
 	buffer_put_int(&msg, id);
 	buffer_put_string(&msg, s, len);
-	send_msg(fd, &msg);
-	debug3("Sent message fd %d T:%u I:%u", fd, code, id);
+	send_msg(conn, &msg);
+	debug3("Sent message fd %d T:%u I:%u", conn->fd_out, code, id);
 	buffer_free(&msg);
 }
 
 static void
-send_string_attrs_request(int fd, u_int id, u_int code, char *s,
-    u_int len, Attrib *a)
+send_string_attrs_request(struct sftp_conn *conn, u_int id, u_int code,
+    char *s, u_int len, Attrib *a)
 {
 	Buffer msg;
 
@@ -156,19 +173,19 @@
 	buffer_put_int(&msg, id);
 	buffer_put_string(&msg, s, len);
 	encode_attrib(&msg, a);
-	send_msg(fd, &msg);
-	debug3("Sent message fd %d T:%u I:%u", fd, code, id);
+	send_msg(conn, &msg);
+	debug3("Sent message fd %d T:%u I:%u", conn->fd_out, code, id);
 	buffer_free(&msg);
 }
 
 static u_int
-get_status(int fd, u_int expected_id)
+get_status(struct sftp_conn *conn, u_int expected_id)
 {
 	Buffer msg;
 	u_int type, id, status;
 
 	buffer_init(&msg);
-	get_msg(fd, &msg);
+	get_msg(conn, &msg);
 	type = buffer_get_char(&msg);
 	id = buffer_get_int(&msg);
 
@@ -183,11 +200,12 @@
 
 	debug3("SSH2_FXP_STATUS %u", status);
 
-	return(status);
+	return status;
 }
 
 static char *
-get_handle(int fd, u_int expected_id, u_int *len, const char *errfmt, ...)
+get_handle(struct sftp_conn *conn, u_int expected_id, u_int *len,
+    const char *errfmt, ...)
 {
 	Buffer msg;
 	u_int type, id;
@@ -201,7 +219,7 @@
 	va_end(args);
 
 	buffer_init(&msg);
-	get_msg(fd, &msg);
+	get_msg(conn, &msg);
 	type = buffer_get_char(&msg);
 	id = buffer_get_int(&msg);
 
@@ -225,14 +243,14 @@
 }
 
 static Attrib *
-get_decode_stat(int fd, u_int expected_id, int quiet)
+get_decode_stat(struct sftp_conn *conn, u_int expected_id, int quiet)
 {
 	Buffer msg;
 	u_int type, id;
 	Attrib *a;
 
 	buffer_init(&msg);
-	get_msg(fd, &msg);
+	get_msg(conn, &msg);
 
 	type = buffer_get_char(&msg);
 	id = buffer_get_int(&msg);
@@ -260,14 +278,14 @@
 }
 
 static int
-get_decode_statvfs(int fd, struct sftp_statvfs *st, u_int expected_id,
-    int quiet)
+get_decode_statvfs(struct sftp_conn *conn, struct sftp_statvfs *st,
+    u_int expected_id, int quiet)
 {
 	Buffer msg;
 	u_int type, id, flag;
 
 	buffer_init(&msg);
-	get_msg(fd, &msg);
+	get_msg(conn, &msg);
 
 	type = buffer_get_char(&msg);
 	id = buffer_get_int(&msg);
@@ -311,21 +329,29 @@
 }
 
 struct sftp_conn *
-do_init(int fd_in, int fd_out, u_int transfer_buflen, u_int num_requests)
+do_init(int fd_in, int fd_out, u_int transfer_buflen, u_int num_requests,
+    u_int64_t limit_kbps)
 {
-	u_int type, exts = 0;
-	int version;
+	u_int type;
 	Buffer msg;
 	struct sftp_conn *ret;
 
+	ret = xmalloc(sizeof(*ret));
+	ret->fd_in = fd_in;
+	ret->fd_out = fd_out;
+	ret->transfer_buflen = transfer_buflen;
+	ret->num_requests = num_requests;
+	ret->exts = 0;
+	ret->limit_kbps = 0;
+
 	buffer_init(&msg);
 	buffer_put_char(&msg, SSH2_FXP_INIT);
 	buffer_put_int(&msg, SSH2_FILEXFER_VERSION);
-	send_msg(fd_out, &msg);
+	send_msg(ret, &msg);
 
 	buffer_clear(&msg);
 
-	get_msg(fd_in, &msg);
+	get_msg(ret, &msg);
 
 	/* Expecting a VERSION reply */
 	if ((type = buffer_get_char(&msg)) != SSH2_FXP_VERSION) {
@@ -334,9 +360,9 @@
 		buffer_free(&msg);
 		return(NULL);
 	}
-	version = buffer_get_int(&msg);
+	ret->version = buffer_get_int(&msg);
 
-	debug2("Remote version: %d", version);
+	debug2("Remote version: %u", ret->version);
 
 	/* Check for extensions */
 	while (buffer_len(&msg) > 0) {
@@ -346,15 +372,15 @@
 
 		if (strcmp(name, "posix-rename@openssh.com") == 0 &&
 		    strcmp(value, "1") == 0) {
-			exts |= SFTP_EXT_POSIX_RENAME;
+			ret->exts |= SFTP_EXT_POSIX_RENAME;
 			known = 1;
 		} else if (strcmp(name, "statvfs@openssh.com") == 0 &&
 		    strcmp(value, "2") == 0) {
-			exts |= SFTP_EXT_STATVFS;
+			ret->exts |= SFTP_EXT_STATVFS;
 			known = 1;
 		} if (strcmp(name, "fstatvfs@openssh.com") == 0 &&
 		    strcmp(value, "2") == 0) {
-			exts |= SFTP_EXT_FSTATVFS;
+			ret->exts |= SFTP_EXT_FSTATVFS;
 			known = 1;
 		}
 		if (known) {
@@ -369,26 +395,25 @@
 
 	buffer_free(&msg);
 
-	ret = xmalloc(sizeof(*ret));
-	ret->fd_in = fd_in;
-	ret->fd_out = fd_out;
-	ret->transfer_buflen = transfer_buflen;
-	ret->num_requests = num_requests;
-	ret->version = version;
-	ret->msg_id = 1;
-	ret->exts = exts;
-
 	/* Some filexfer v.0 servers don't support large packets */
-	if (version == 0)
+	if (ret->version == 0)
 		ret->transfer_buflen = MIN(ret->transfer_buflen, 20480);
 
-	return(ret);
+	ret->limit_kbps = limit_kbps;
+	if (ret->limit_kbps > 0) {
+		bandwidth_limit_init(&ret->bwlimit_in, ret->limit_kbps,
+		    ret->transfer_buflen);
+		bandwidth_limit_init(&ret->bwlimit_out, ret->limit_kbps,
+		    ret->transfer_buflen);
+	}
+
+	return ret;
 }
 
 u_int
 sftp_proto_version(struct sftp_conn *conn)
 {
-	return(conn->version);
+	return conn->version;
 }
 
 int
@@ -403,16 +428,16 @@
 	buffer_put_char(&msg, SSH2_FXP_CLOSE);
 	buffer_put_int(&msg, id);
 	buffer_put_string(&msg, handle, handle_len);
-	send_msg(conn->fd_out, &msg);
+	send_msg(conn, &msg);
 	debug3("Sent message SSH2_FXP_CLOSE I:%u", id);
 
-	status = get_status(conn->fd_in, id);
+	status = get_status(conn, id);
 	if (status != SSH2_FX_OK)
 		error("Couldn't close file: %s", fx2txt(status));
 
 	buffer_free(&msg);
 
-	return(status);
+	return status;
 }
 
 
@@ -430,14 +455,14 @@
 	buffer_put_char(&msg, SSH2_FXP_OPENDIR);
 	buffer_put_int(&msg, id);
 	buffer_put_cstring(&msg, path);
-	send_msg(conn->fd_out, &msg);
+	send_msg(conn, &msg);
 
 	buffer_clear(&msg);
 
-	handle = get_handle(conn->fd_in, id, &handle_len,
+	handle = get_handle(conn, id, &handle_len,
 	    "remote readdir(\"%s\")", path);
 	if (handle == NULL)
-		return(-1);
+		return -1;
 
 	if (dir) {
 		ents = 0;
@@ -454,11 +479,11 @@
 		buffer_put_char(&msg, SSH2_FXP_READDIR);
 		buffer_put_int(&msg, id);
 		buffer_put_string(&msg, handle, handle_len);
-		send_msg(conn->fd_out, &msg);
+		send_msg(conn, &msg);
 
 		buffer_clear(&msg);
 
-		get_msg(conn->fd_in, &msg);
+		get_msg(conn, &msg);
 
 		type = buffer_get_char(&msg);
 		id = buffer_get_int(&msg);
@@ -537,7 +562,7 @@
 		**dir = NULL;
 	}
 
-	return(0);
+	return 0;
 }
 
 int
@@ -566,9 +591,8 @@
 	debug2("Sending SSH2_FXP_REMOVE \"%s\"", path);
 
 	id = conn->msg_id++;
-	send_string_request(conn->fd_out, id, SSH2_FXP_REMOVE, path,
-	    strlen(path));
-	status = get_status(conn->fd_in, id);
+	send_string_request(conn, id, SSH2_FXP_REMOVE, path, strlen(path));
+	status = get_status(conn, id);
 	if (status != SSH2_FX_OK)
 		error("Couldn't delete file: %s", fx2txt(status));
 	return(status);
@@ -580,10 +604,10 @@
 	u_int status, id;
 
 	id = conn->msg_id++;
-	send_string_attrs_request(conn->fd_out, id, SSH2_FXP_MKDIR, path,
+	send_string_attrs_request(conn, id, SSH2_FXP_MKDIR, path,
 	    strlen(path), a);
 
-	status = get_status(conn->fd_in, id);
+	status = get_status(conn, id);
 	if (status != SSH2_FX_OK && printflag)
 		error("Couldn't create directory: %s", fx2txt(status));
 
@@ -596,10 +620,10 @@
 	u_int status, id;
 
 	id = conn->msg_id++;
-	send_string_request(conn->fd_out, id, SSH2_FXP_RMDIR, path,
+	send_string_request(conn, id, SSH2_FXP_RMDIR, path,
 	    strlen(path));
 
-	status = get_status(conn->fd_in, id);
+	status = get_status(conn, id);
 	if (status != SSH2_FX_OK)
 		error("Couldn't remove directory: %s", fx2txt(status));
 
@@ -613,11 +637,11 @@
 
 	id = conn->msg_id++;
 
-	send_string_request(conn->fd_out, id,
+	send_string_request(conn, id,
 	    conn->version == 0 ? SSH2_FXP_STAT_VERSION_0 : SSH2_FXP_STAT,
 	    path, strlen(path));
 
-	return(get_decode_stat(conn->fd_in, id, quiet));
+	return(get_decode_stat(conn, id, quiet));
 }
 
 Attrib *
@@ -634,10 +658,10 @@
 	}
 
 	id = conn->msg_id++;
-	send_string_request(conn->fd_out, id, SSH2_FXP_LSTAT, path,
+	send_string_request(conn, id, SSH2_FXP_LSTAT, path,
 	    strlen(path));
 
-	return(get_decode_stat(conn->fd_in, id, quiet));
+	return(get_decode_stat(conn, id, quiet));
 }
 
 #ifdef notyet
@@ -647,10 +671,10 @@
 	u_int id;
 
 	id = conn->msg_id++;
-	send_string_request(conn->fd_out, id, SSH2_FXP_FSTAT, handle,
+	send_string_request(conn, id, SSH2_FXP_FSTAT, handle,
 	    handle_len);
 
-	return(get_decode_stat(conn->fd_in, id, quiet));
+	return(get_decode_stat(conn, id, quiet));
 }
 #endif
 
@@ -660,10 +684,10 @@
 	u_int status, id;
 
 	id = conn->msg_id++;
-	send_string_attrs_request(conn->fd_out, id, SSH2_FXP_SETSTAT, path,
+	send_string_attrs_request(conn, id, SSH2_FXP_SETSTAT, path,
 	    strlen(path), a);
 
-	status = get_status(conn->fd_in, id);
+	status = get_status(conn, id);
 	if (status != SSH2_FX_OK)
 		error("Couldn't setstat on \"%s\": %s", path,
 		    fx2txt(status));
@@ -678,10 +702,10 @@
 	u_int status, id;
 
 	id = conn->msg_id++;
-	send_string_attrs_request(conn->fd_out, id, SSH2_FXP_FSETSTAT, handle,
+	send_string_attrs_request(conn, id, SSH2_FXP_FSETSTAT, handle,
 	    handle_len, a);
 
-	status = get_status(conn->fd_in, id);
+	status = get_status(conn, id);
 	if (status != SSH2_FX_OK)
 		error("Couldn't fsetstat: %s", fx2txt(status));
 
@@ -697,12 +721,12 @@
 	Attrib *a;
 
 	expected_id = id = conn->msg_id++;
-	send_string_request(conn->fd_out, id, SSH2_FXP_REALPATH, path,
+	send_string_request(conn, id, SSH2_FXP_REALPATH, path,
 	    strlen(path));
 
 	buffer_init(&msg);
 
-	get_msg(conn->fd_in, &msg);
+	get_msg(conn, &msg);
 	type = buffer_get_char(&msg);
 	id = buffer_get_int(&msg);
 
@@ -756,13 +780,13 @@
 	}
 	buffer_put_cstring(&msg, oldpath);
 	buffer_put_cstring(&msg, newpath);
-	send_msg(conn->fd_out, &msg);
+	send_msg(conn, &msg);
 	debug3("Sent message %s \"%s\" -> \"%s\"",
 	    (conn->exts & SFTP_EXT_POSIX_RENAME) ? "posix-rename@openssh.com" :
 	    "SSH2_FXP_RENAME", oldpath, newpath);
 	buffer_free(&msg);
 
-	status = get_status(conn->fd_in, id);
+	status = get_status(conn, id);
 	if (status != SSH2_FX_OK)
 		error("Couldn't rename file \"%s\" to \"%s\": %s", oldpath,
 		    newpath, fx2txt(status));
@@ -789,12 +813,12 @@
 	buffer_put_int(&msg, id);
 	buffer_put_cstring(&msg, oldpath);
 	buffer_put_cstring(&msg, newpath);
-	send_msg(conn->fd_out, &msg);
+	send_msg(conn, &msg);
 	debug3("Sent message SSH2_FXP_SYMLINK \"%s\" -> \"%s\"", oldpath,
 	    newpath);
 	buffer_free(&msg);
 
-	status = get_status(conn->fd_in, id);
+	status = get_status(conn, id);
 	if (status != SSH2_FX_OK)
 		error("Couldn't symlink file \"%s\" to \"%s\": %s", oldpath,
 		    newpath, fx2txt(status));
@@ -812,12 +836,11 @@
 	Attrib *a;
 
 	expected_id = id = conn->msg_id++;
-	send_string_request(conn->fd_out, id, SSH2_FXP_READLINK, path,
-	    strlen(path));
+	send_string_request(conn, id, SSH2_FXP_READLINK, path, strlen(path));
 
 	buffer_init(&msg);
 
-	get_msg(conn->fd_in, &msg);
+	get_msg(conn, &msg);
 	type = buffer_get_char(&msg);
 	id = buffer_get_int(&msg);
 
@@ -871,10 +894,10 @@
 	buffer_put_int(&msg, id);
 	buffer_put_cstring(&msg, "statvfs@openssh.com");
 	buffer_put_cstring(&msg, path);
-	send_msg(conn->fd_out, &msg);
+	send_msg(conn, &msg);
 	buffer_free(&msg);
 
-	return get_decode_statvfs(conn->fd_in, st, id, quiet);
+	return get_decode_statvfs(conn, st, id, quiet);
 }
 
 #ifdef notyet
@@ -898,16 +921,16 @@
 	buffer_put_int(&msg, id);
 	buffer_put_cstring(&msg, "fstatvfs@openssh.com");
 	buffer_put_string(&msg, handle, handle_len);
-	send_msg(conn->fd_out, &msg);
+	send_msg(conn, &msg);
 	buffer_free(&msg);
 
-	return get_decode_statvfs(conn->fd_in, st, id, quiet);
+	return get_decode_statvfs(conn, st, id, quiet);
 }
 #endif
 
 static void
-send_read_request(int fd_out, u_int id, u_int64_t offset, u_int len,
-    char *handle, u_int handle_len)
+send_read_request(struct sftp_conn *conn, u_int id, u_int64_t offset,
+    u_int len, char *handle, u_int handle_len)
 {
 	Buffer msg;
 
@@ -918,7 +941,7 @@
 	buffer_put_string(&msg, handle, handle_len);
 	buffer_put_int64(&msg, offset);
 	buffer_put_int(&msg, len);
-	send_msg(fd_out, &msg);
+	send_msg(conn, &msg);
 	buffer_free(&msg);
 }
 
@@ -976,10 +999,10 @@
 	buffer_put_int(&msg, SSH2_FXF_READ);
 	attrib_clear(&junk); /* Send empty attributes */
 	encode_attrib(&msg, &junk);
-	send_msg(conn->fd_out, &msg);
+	send_msg(conn, &msg);
 	debug3("Sent message SSH2_FXP_OPEN I:%u P:%s", id, remote_path);
 
-	handle = get_handle(conn->fd_in, id, &handle_len,
+	handle = get_handle(conn, id, &handle_len,
 	    "remote open(\"%s\")", remote_path);
 	if (handle == NULL) {
 		buffer_free(&msg);
@@ -1032,12 +1055,12 @@
 			offset += buflen;
 			num_req++;
 			TAILQ_INSERT_TAIL(&requests, req, tq);
-			send_read_request(conn->fd_out, req->id, req->offset,
+			send_read_request(conn, req->id, req->offset,
 			    req->len, handle, handle_len);
 		}
 
 		buffer_clear(&msg);
-		get_msg(conn->fd_in, &msg);
+		get_msg(conn, &msg);
 		type = buffer_get_char(&msg);
 		id = buffer_get_int(&msg);
 		debug3("Received reply T:%u I:%u R:%d", type, id, max_req);
@@ -1092,7 +1115,7 @@
 				req->id = conn->msg_id++;
 				req->len -= len;
 				req->offset += len;
-				send_read_request(conn->fd_out, req->id,
+				send_read_request(conn, req->id,
 				    req->offset, req->len, handle, handle_len);
 				/* Reduce the request size */
 				if (len < buflen)
@@ -1327,12 +1350,12 @@
 	buffer_put_cstring(&msg, remote_path);
 	buffer_put_int(&msg, SSH2_FXF_WRITE|SSH2_FXF_CREAT|SSH2_FXF_TRUNC);
 	encode_attrib(&msg, &a);
-	send_msg(conn->fd_out, &msg);
+	send_msg(conn, &msg);
 	debug3("Sent message SSH2_FXP_OPEN I:%u P:%s", id, remote_path);
 
 	buffer_clear(&msg);
 
-	handle = get_handle(conn->fd_in, id, &handle_len,
+	handle = get_handle(conn, id, &handle_len,
 	    "remote open(\"%s\")", remote_path);
 	if (handle == NULL) {
 		close(local_fd);
@@ -1381,7 +1404,7 @@
 			buffer_put_string(&msg, handle, handle_len);
 			buffer_put_int64(&msg, offset);
 			buffer_put_string(&msg, data, len);
-			send_msg(conn->fd_out, &msg);
+			send_msg(conn, &msg);
 			debug3("Sent message SSH2_FXP_WRITE I:%u O:%llu S:%u",
 			    id, (unsigned long long)offset, len);
 		} else if (TAILQ_FIRST(&acks) == NULL)
@@ -1395,7 +1418,7 @@
 			u_int r_id;
 
 			buffer_clear(&msg);
-			get_msg(conn->fd_in, &msg);
+			get_msg(conn, &msg);
 			type = buffer_get_char(&msg);
 			r_id = buffer_get_int(&msg);