libceph: implement RECONNECT_SEQ feature

This is an old protocol extension that allows the client and server to
avoid resending old messages after a reconnect (following a socket error).
Instead, they exchange their sequence numbers during the handshake.  This
avoids sending a bunch of useless data over the socket.

It has been supported in the server code since v0.22 (Sep 2010).

Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Alex Elder <elder@inktank.com>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 997dacc..e8491db 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1247,6 +1247,24 @@
 }
 
 /*
+ * Prepare to send our in_seq to the peer during the handshake
+ */
+static void prepare_write_seq(struct ceph_connection *con)
+{
+	dout("prepare_write_seq %p %llu -> %llu\n", con,
+	     con->in_seq_acked, con->in_seq);
+	con->in_seq_acked = con->in_seq;	/* the value sent below */
+
+	con_out_kvec_reset(con);
+
+	con->out_temp_ack = cpu_to_le64(con->in_seq_acked); /* LE on the wire */
+	con_out_kvec_add(con, sizeof (con->out_temp_ack),
+			 &con->out_temp_ack);	/* payload is the bare 64-bit seq */
+
+	con_flag_set(con, CON_FLAG_WRITE_PENDING);
+}
+
+/*
  * Prepare to write keepalive byte.
  */
 static void prepare_write_keepalive(struct ceph_connection *con)
@@ -1582,6 +1600,13 @@
 	con->in_base_pos = 0;
 }
 
+static void prepare_read_seq(struct ceph_connection *con)
+{
+	dout("prepare_read_seq %p\n", con);
+	con->in_base_pos = 0;	/* restart the partial-read position */
+	con->in_tag = CEPH_MSGR_TAG_SEQ; /* read path treats this like an ACK */
+}
+
 static void prepare_read_tag(struct ceph_connection *con)
 {
 	dout("prepare_read_tag %p\n", con);
@@ -2059,6 +2084,7 @@
 		prepare_read_connect(con);
 		break;
 
+	case CEPH_MSGR_TAG_SEQ:
 	case CEPH_MSGR_TAG_READY:
 		if (req_feat & ~server_feat) {
 			pr_err("%s%lld %s protocol feature mismatch,"
@@ -2089,7 +2115,12 @@
 
 		con->delay = 0;      /* reset backoff memory */
 
-		prepare_read_tag(con);
+		if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) {
+			prepare_write_seq(con);
+			prepare_read_seq(con);
+		} else {
+			prepare_read_tag(con);
+		}
 		break;
 
 	case CEPH_MSGR_TAG_WAIT:
@@ -2123,7 +2154,6 @@
 	return read_partial(con, end, size, &con->in_temp_ack);
 }
 
-
 /*
  * We can finally discard anything that's been acked.
  */
@@ -2148,8 +2178,6 @@
 }
 
 
-
-
 static int read_partial_message_section(struct ceph_connection *con,
 					struct kvec *section,
 					unsigned int sec_len, u32 *crc)
@@ -2672,7 +2700,12 @@
 			prepare_read_tag(con);
 		goto more;
 	}
-	if (con->in_tag == CEPH_MSGR_TAG_ACK) {
+	if (con->in_tag == CEPH_MSGR_TAG_ACK ||
+	    con->in_tag == CEPH_MSGR_TAG_SEQ) {
+		/*
+		 * the final handshake seq exchange is semantically
+		 * equivalent to an ACK
+		 */
 		ret = read_partial_ack(con);
 		if (ret <= 0)
 			goto out;