- djm@cvs.openbsd.org 2002/02/12 12:32:27
     [sftp.1 sftp.c sftp-client.c sftp-client.h sftp-int.c]
     Perform multiple overlapping read/write requests in file transfer. Mostly
     done by Tobias Ringstrom <tori@ringstrom.mine.nu>; ok markus@
diff --git a/sftp-client.c b/sftp-client.c
index 362814d..835ae06 100644
--- a/sftp-client.c
+++ b/sftp-client.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2001 Damien Miller.  All rights reserved.
+ * Copyright (c) 2001-2002 Damien Miller.  All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -24,12 +24,17 @@
 
 /* XXX: memleaks */
 /* XXX: signed vs unsigned */
-/* XXX: redesign to allow concurrent overlapped operations */
 /* XXX: we use fatal too much, error may be more appropriate in places */
 /* XXX: copy between two remote sites */
 
 #include "includes.h"
-RCSID("$OpenBSD: sftp-client.c,v 1.20 2002/02/05 00:00:46 djm Exp $");
+RCSID("$OpenBSD: sftp-client.c,v 1.21 2002/02/12 12:32:27 djm Exp $");
+
+#if defined(HAVE_SYS_QUEUE_H) && !defined(HAVE_BOGUS_SYS_QUEUE_H)
+#include <sys/queue.h>
+#else
+#include "openbsd-compat/fake-queue.h"
+#endif
 
 #include "buffer.h"
 #include "bufaux.h"
@@ -42,6 +47,9 @@
 #include "sftp-common.h"
 #include "sftp-client.h"
 
+/* Minimum amount of data to read at at time */
+#define MIN_READ_SIZE	512
+
 /* Message ID */
 static u_int msg_id = 1;
 
@@ -664,16 +672,44 @@
 	return(filename);
 }
 
+static void
+send_read_request(int fd_out, u_int id, u_int64_t offset, u_int len,
+    char *handle, u_int handle_len)
+{
+	Buffer msg;
+	
+	buffer_init(&msg);
+	buffer_clear(&msg);
+	buffer_put_char(&msg, SSH2_FXP_READ);
+	buffer_put_int(&msg, id);
+	buffer_put_string(&msg, handle, handle_len);
+	buffer_put_int64(&msg, offset);
+	buffer_put_int(&msg, len);
+	send_msg(fd_out, &msg);
+	buffer_free(&msg);
+}	
+
 int
 do_download(int fd_in, int fd_out, char *remote_path, char *local_path,
-    int pflag, size_t buflen)
+    int pflag, size_t buflen, int num_requests)
 {
-	int local_fd, status;
-	u_int expected_id, handle_len, mode, type, id;
-	u_int64_t offset;
-	char *handle;
-	Buffer msg;
 	Attrib junk, *a;
+	Buffer msg;
+	char *handle;
+	int local_fd, status, num_req, max_req, write_error;
+	int read_error, write_errno;
+	u_int64_t offset, size;
+	u_int handle_len, mode, type, id;
+	struct request {
+		u_int id;
+		u_int len;
+		u_int64_t offset;
+		TAILQ_ENTRY(request) tq; 
+	};
+	TAILQ_HEAD(reqhead, request) requests;
+	struct request *req;
+
+	TAILQ_INIT(&requests);
 
 	a = do_stat(fd_in, fd_out, remote_path, 0);
 	if (a == NULL)
@@ -691,6 +727,11 @@
 		return(-1);
 	}
 
+	if (a->flags & SSH2_FILEXFER_ATTR_SIZE)
+		size = a->size;
+	else
+		size = 0;
+
 	local_fd = open(local_path, O_WRONLY | O_CREAT | O_TRUNC, mode);
 	if (local_fd == -1) {
 		error("Couldn't open local file \"%s\" for writing: %s",
@@ -719,88 +760,140 @@
 	}
 
 	/* Read from remote and write to local */
-	offset = 0;
-	for (;;) {
-		u_int len;
+	write_error = read_error = write_errno = num_req = offset = 0;
+	max_req = 1;
+	while (num_req > 0 || max_req > 0) {
 		char *data;
+		u_int len;
 
-		id = expected_id = msg_id++;
+		/* Send some more requests */
+		while (num_req < max_req) {
+			debug3("Request range %llu -> %llu (%d/%d)", 
+			    offset, offset + buflen - 1, num_req, max_req);
+			req = xmalloc(sizeof(*req));
+			req->id = msg_id++;
+			req->len = buflen;
+			req->offset = offset;
+			offset += buflen;
+			num_req++;
+			TAILQ_INSERT_TAIL(&requests, req, tq);
+			send_read_request(fd_out, req->id, req->offset, 
+			    req->len, handle, handle_len);
+		}
 
 		buffer_clear(&msg);
-		buffer_put_char(&msg, SSH2_FXP_READ);
-		buffer_put_int(&msg, id);
-		buffer_put_string(&msg, handle, handle_len);
-		buffer_put_int64(&msg, offset);
-		buffer_put_int(&msg, buflen);
-		send_msg(fd_out, &msg);
-		debug3("Sent message SSH2_FXP_READ I:%d O:%llu S:%u",
-		    id, (u_int64_t)offset, buflen);
-
-		buffer_clear(&msg);
-
 		get_msg(fd_in, &msg);
 		type = buffer_get_char(&msg);
 		id = buffer_get_int(&msg);
-		debug3("Received reply T:%d I:%d", type, id);
-		if (id != expected_id)
-			fatal("ID mismatch (%d != %d)", id, expected_id);
-		if (type == SSH2_FXP_STATUS) {
-			status = buffer_get_int(&msg);
+		debug3("Received reply T:%d I:%d R:%d", type, id, max_req);
 
-			if (status == SSH2_FX_EOF)
-				break;
-			else {
-				error("Couldn't read from remote "
-				    "file \"%s\" : %s", remote_path,
-				    fx2txt(status));
-				do_close(fd_in, fd_out, handle, handle_len);
-				goto done;
+		/* Find the request in our queue */
+		for(req = TAILQ_FIRST(&requests);
+		    req != NULL && req->id != id;
+		    req = TAILQ_NEXT(req, tq))
+			;
+		if (req == NULL)
+			fatal("Unexpected reply %u", id);
+
+		switch (type) {
+		case SSH2_FXP_STATUS:
+			status = buffer_get_int(&msg);
+			if (status != SSH2_FX_EOF)
+				read_error = 1;
+			max_req = 0;
+			TAILQ_REMOVE(&requests, req, tq);
+			xfree(req);
+			num_req--;
+			break;
+		case SSH2_FXP_DATA:
+			data = buffer_get_string(&msg, &len);
+			debug3("Received data %llu -> %llu", req->offset, 
+			    req->offset + len - 1);
+			if (len > req->len)
+				fatal("Received more data than asked for "
+				      "%d > %d", len, req->len);
+			if ((lseek(local_fd, req->offset, SEEK_SET) == -1 ||
+			     atomicio(write, local_fd, data, len) != len) &&
+			    !write_error) {
+				write_errno = errno;
+				write_error = 1;
+				max_req = 0;
 			}
-		} else if (type != SSH2_FXP_DATA) {
+			xfree(data);
+
+			if (len == req->len) {
+				TAILQ_REMOVE(&requests, req, tq);
+				xfree(req);
+				num_req--;
+			} else {
+				/* Resend the request for the missing data */
+				debug3("Short data block, re-requesting "
+				    "%llu -> %llu (%2d)", req->offset + len, 
+					req->offset + req->len - 1, num_req);
+				req->id = msg_id++;
+				req->len -= len;
+				req->offset += len;
+				send_read_request(fd_out, req->id, 
+				    req->offset, req->len, handle, 
+				    handle_len);
+				/* Reduce the request size */
+				if (len < buflen)
+					buflen = MAX(MIN_READ_SIZE, len);
+			}
+			if (max_req > 0) { /* max_req = 0 iff EOF received */
+				if (size > 0 && offset > size) {
+					/* Only one request at a time
+					 * after the expected EOF */
+					debug3("Finish at %llu (%2d)",
+					    offset, num_req);
+					max_req = 1;
+				}
+				else if (max_req < num_requests + 1) {
+					++max_req;
+				}
+			}
+			break;
+		default:
 			fatal("Expected SSH2_FXP_DATA(%d) packet, got %d",
 			    SSH2_FXP_DATA, type);
 		}
-
-		data = buffer_get_string(&msg, &len);
-		if (len > buflen)
-			fatal("Received more data than asked for %d > %d",
-			    len, buflen);
-
-		debug3("In read loop, got %d offset %llu", len,
-		    (u_int64_t)offset);
-		if (atomicio(write, local_fd, data, len) != len) {
-			error("Couldn't write to \"%s\": %s", local_path,
-			    strerror(errno));
-			do_close(fd_in, fd_out, handle, handle_len);
-			status = -1;
-			xfree(data);
-			goto done;
-		}
-
-		offset += len;
-		xfree(data);
 	}
-	status = do_close(fd_in, fd_out, handle, handle_len);
 
-	/* Override umask and utimes if asked */
+	/* Sanity check */
+	if (TAILQ_FIRST(&requests) != NULL)
+		fatal("Transfer complete, but requests still in queue");
+
+	if (read_error) {
+	    error("Couldn't read from remote "
+		  "file \"%s\" : %s", remote_path,
+		  fx2txt(status));
+	    do_close(fd_in, fd_out, handle, handle_len);
+	} else if (write_error) {
+	    error("Couldn't write to \"%s\": %s", local_path,
+		  strerror(write_errno));
+	    status = -1;
+	    do_close(fd_in, fd_out, handle, handle_len);
+	} else {
+		status = do_close(fd_in, fd_out, handle, handle_len);
+
+		/* Override umask and utimes if asked */
 #ifdef HAVE_FCHMOD
-	if (pflag && fchmod(local_fd, mode) == -1)
+		if (pflag && fchmod(local_fd, mode) == -1)
 #else 
-	if (pflag && chmod(local_path, mode) == -1)
+		if (pflag && chmod(local_path, mode) == -1)
 #endif /* HAVE_FCHMOD */
-		error("Couldn't set mode on \"%s\": %s", local_path,
-		    strerror(errno));
-	if (pflag && (a->flags & SSH2_FILEXFER_ATTR_ACMODTIME)) {
-		struct timeval tv[2];
-		tv[0].tv_sec = a->atime;
-		tv[1].tv_sec = a->mtime;
-		tv[0].tv_usec = tv[1].tv_usec = 0;
-		if (utimes(local_path, tv) == -1)
-			error("Can't set times on \"%s\": %s", local_path,
-			    strerror(errno));
+			error("Couldn't set mode on \"%s\": %s", local_path,
+			      strerror(errno));
+		if (pflag && (a->flags & SSH2_FILEXFER_ATTR_ACMODTIME)) {
+			struct timeval tv[2];
+			tv[0].tv_sec = a->atime;
+			tv[1].tv_sec = a->mtime;
+			tv[0].tv_usec = tv[1].tv_usec = 0;
+			if (utimes(local_path, tv) == -1)
+				error("Can't set times on \"%s\": %s",
+				      local_path, strerror(errno));
+		}
 	}
-
-done:
 	close(local_fd);
 	buffer_free(&msg);
 	xfree(handle);
@@ -809,7 +902,7 @@
 
 int
 do_upload(int fd_in, int fd_out, char *local_path, char *remote_path,
-    int pflag, size_t buflen)
+    int pflag, size_t buflen, int num_requests)
 {
 	int local_fd, status;
 	u_int handle_len, id;
@@ -818,6 +911,8 @@
 	Buffer msg;
 	struct stat sb;
 	Attrib a;
+	u_int32_t startid;
+	u_int32_t ackid;
 
 	if ((local_fd = open(local_path, O_RDONLY, 0)) == -1) {
 		error("Couldn't open local file \"%s\" for reading: %s",
@@ -859,6 +954,7 @@
 		return(-1);
 	}
 
+	startid = ackid = id + 1;
 	data = xmalloc(buflen);
 
 	/* Read from local and write to remote */
@@ -877,29 +973,34 @@
 		if (len == -1)
 			fatal("Couldn't read from \"%s\": %s", local_path,
 			    strerror(errno));
-		if (len == 0)
+
+		if (len != 0) {
+			buffer_clear(&msg);
+			buffer_put_char(&msg, SSH2_FXP_WRITE);
+			buffer_put_int(&msg, ++id);
+			buffer_put_string(&msg, handle, handle_len);
+			buffer_put_int64(&msg, offset);
+			buffer_put_string(&msg, data, len);
+			send_msg(fd_out, &msg);
+			debug3("Sent message SSH2_FXP_WRITE I:%d O:%llu S:%u",
+			       id, (u_int64_t)offset, len);
+		} else if ( id < ackid )
 			break;
 
-		buffer_clear(&msg);
-		buffer_put_char(&msg, SSH2_FXP_WRITE);
-		buffer_put_int(&msg, ++id);
-		buffer_put_string(&msg, handle, handle_len);
-		buffer_put_int64(&msg, offset);
-		buffer_put_string(&msg, data, len);
-		send_msg(fd_out, &msg);
-		debug3("Sent message SSH2_FXP_WRITE I:%d O:%llu S:%u",
-		    id, (u_int64_t)offset, len);
-
-		status = get_status(fd_in, id);
-		if (status != SSH2_FX_OK) {
-			error("Couldn't write to remote file \"%s\": %s",
-			    remote_path, fx2txt(status));
-			do_close(fd_in, fd_out, handle, handle_len);
-			close(local_fd);
-			goto done;
+		if (id == startid || len == 0 ||
+		    id - ackid >= num_requests) {
+			status = get_status(fd_in, ackid);
+			if (status != SSH2_FX_OK) {
+				error("Couldn't write to remote file \"%s\": %s",
+				      remote_path, fx2txt(status));
+				do_close(fd_in, fd_out, handle, handle_len);
+				close(local_fd);
+				goto done;
+			}
+			debug3("In write loop, got %d offset %llu", len,
+			       (u_int64_t)offset);
+			++ackid;
 		}
-		debug3("In write loop, got %d offset %llu", len,
-		    (u_int64_t)offset);
 
 		offset += len;
 	}
@@ -924,4 +1025,3 @@
 	buffer_free(&msg);
 	return status;
 }
-