ceph: support ceph_pagelist for message payload

The ceph_pagelist is a simple list of whole pages, strung together via
their lru list_head.  It facilitates encoding to a "buffer" of unknown
size.  Allow its use in place of the ceph_msg page vector.

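This will be used to fix the huge buffer preallocation woes of MDS
reconnection.

While here, demote the "wrong peer" message from pr_err to pr_warning
and reword the associated connection error string.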

Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile
index 827629c8..47caf2f 100644
--- a/fs/ceph/Makefile
+++ b/fs/ceph/Makefile
@@ -8,7 +8,7 @@
 
 ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
 	export.o caps.o snap.o xattr.o \
-	messenger.o msgpool.o buffer.o \
+	messenger.o msgpool.o buffer.o pagelist.o \
 	mds_client.o mdsmap.o \
 	mon_client.o \
 	osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 68052f6..c1106e8 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -13,6 +13,7 @@
 #include "super.h"
 #include "messenger.h"
 #include "decode.h"
+#include "pagelist.h"
 
 /*
  * Ceph uses the messenger to exchange ceph_msg messages with other
@@ -728,6 +729,11 @@
 			page = msg->pages[con->out_msg_pos.page];
 			if (crc)
 				kaddr = kmap(page);
+		} else if (msg->pagelist) {
+			page = list_first_entry(&msg->pagelist->head,
+						struct page, lru);
+			if (crc)
+				kaddr = kmap(page);
 		} else {
 			page = con->msgr->zero_page;
 			if (crc)
@@ -750,7 +756,7 @@
 				      MSG_DONTWAIT | MSG_NOSIGNAL |
 				      MSG_MORE);
 
-		if (crc && msg->pages)
+		if (crc && (msg->pages || msg->pagelist))
 			kunmap(page);
 
 		if (ret <= 0)
@@ -762,6 +768,9 @@
 			con->out_msg_pos.page_pos = 0;
 			con->out_msg_pos.page++;
 			con->out_msg_pos.did_page_crc = 0;
+			if (msg->pagelist)
+				list_move_tail(&page->lru,
+					       &msg->pagelist->head);
 		}
 	}
 
@@ -1051,13 +1060,13 @@
 				       &con->actual_peer_addr) &&
 	    !(addr_is_blank(&con->actual_peer_addr.in_addr) &&
 	      con->actual_peer_addr.nonce == con->peer_addr.nonce)) {
-		pr_err("wrong peer, want %s/%d, "
-		       "got %s/%d, wtf\n",
+		pr_warning("wrong peer, want %s/%d, "
+		       "got %s/%d\n",
 		       pr_addr(&con->peer_addr.in_addr),
 		       con->peer_addr.nonce,
 		       pr_addr(&con->actual_peer_addr.in_addr),
 		       con->actual_peer_addr.nonce);
-		con->error_msg = "protocol error, wrong peer";
+		con->error_msg = "wrong peer at address";
 		return -1;
 	}
 
@@ -2096,6 +2105,7 @@
 	/* data */
 	m->nr_pages = calc_pages_for(page_off, page_len);
 	m->pages = pages;
+	m->pagelist = NULL;
 
 	dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len,
 	     m->nr_pages);
@@ -2181,6 +2191,12 @@
 	m->nr_pages = 0;
 	m->pages = NULL;
 
+	if (m->pagelist) {
+		ceph_pagelist_release(m->pagelist);
+		kfree(m->pagelist);
+		m->pagelist = NULL;
+	}
+
 	if (m->pool)
 		ceph_msgpool_put(m->pool, m);
 	else
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 7e2aab1..a7b6841 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -85,6 +85,7 @@
 	struct ceph_buffer *middle;
 	struct page **pages;            /* data payload.  NOT OWNER. */
 	unsigned nr_pages;              /* size of page array */
+	struct ceph_pagelist *pagelist; /* instead of pages */
 	struct list_head list_head;
 	struct kref kref;
 	bool front_is_vmalloc;
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
new file mode 100644
index 0000000..370e936
--- /dev/null
+++ b/fs/ceph/pagelist.c
@@ -0,0 +1,54 @@
+
+#include <linux/pagemap.h>
+#include <linux/highmem.h>
+
+#include "pagelist.h"
+
+int ceph_pagelist_release(struct ceph_pagelist *pl)
+{
+	/* kunmap() takes the struct page, not the address kmap() returned */
+	if (pl->mapped_tail)
+		kunmap(list_entry(pl->head.prev, struct page, lru));
+	while (!list_empty(&pl->head)) {
+		struct page *pg = list_first_entry(&pl->head, struct page, lru);
+		list_del(&pg->lru);
+		__free_page(pg);
+	}
+	return 0;
+}
+
+static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
+{
+	struct page *page = alloc_page(GFP_NOFS);
+	if (!page)
+		return -ENOMEM;	/* list and mapping are left untouched */
+	if (pl->mapped_tail)	/* kunmap() takes the page, not its kaddr */
+		kunmap(list_entry(pl->head.prev, struct page, lru));
+	list_add_tail(&page->lru, &pl->head);	/* new page becomes the tail */
+	pl->mapped_tail = kmap(page);
+	pl->room += PAGE_SIZE;
+	return 0;
+}
+
+int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len)
+{
+	/* Top up the tail page and grow the list until the rest fits. */
+	while (len > pl->room) {
+		size_t chunk = pl->room;
+		size_t off = pl->length & ~PAGE_CACHE_MASK;
+		int err;
+
+		memcpy(pl->mapped_tail + off, buf, chunk);
+		buf += chunk;
+		len -= chunk;
+		pl->length += chunk;
+		pl->room -= chunk;
+		err = ceph_pagelist_addpage(pl);
+		if (err)
+			return err;
+	}
+	memcpy(pl->mapped_tail + (pl->length & ~PAGE_CACHE_MASK), buf, len);
+	pl->length += len;
+	pl->room -= len;
+	return 0;
+}
diff --git a/fs/ceph/pagelist.h b/fs/ceph/pagelist.h
new file mode 100644
index 0000000..e8a4187
--- /dev/null
+++ b/fs/ceph/pagelist.h
@@ -0,0 +1,54 @@
+#ifndef __FS_CEPH_PAGELIST_H
+#define __FS_CEPH_PAGELIST_H
+
+#include <linux/list.h>
+
+struct ceph_pagelist {
+	struct list_head head;	/* pages, chained through page->lru */
+	void *mapped_tail;	/* kmap() address of the newest page, or NULL */
+	size_t length;		/* total bytes appended so far */
+	size_t room;		/* bytes still unused in the newest page */
+};
+
+static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
+{
+	pl->length = 0;			/* nothing appended yet */
+	pl->room = 0;			/* no page to write into yet */
+	pl->mapped_tail = NULL;		/* nothing is kmap()ed */
+	INIT_LIST_HEAD(&pl->head);	/* no pages on the list */
+}
+extern int ceph_pagelist_release(struct ceph_pagelist *pl);
+
+extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l);
+
+static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
+{
+	__le64 le = cpu_to_le64(v);	/* appended in little-endian order */
+	return ceph_pagelist_append(pl, &le, sizeof(le));
+}
+static inline int ceph_pagelist_encode_32(struct ceph_pagelist *pl, u32 v)
+{
+	__le32 le = cpu_to_le32(v);	/* appended in little-endian order */
+	return ceph_pagelist_append(pl, &le, sizeof(le));
+}
+static inline int ceph_pagelist_encode_16(struct ceph_pagelist *pl, u16 v)
+{
+	__le16 le = cpu_to_le16(v);	/* appended in little-endian order */
+	return ceph_pagelist_append(pl, &le, sizeof(le));
+}
+static inline int ceph_pagelist_encode_8(struct ceph_pagelist *pl, u8 v)
+{
+	return ceph_pagelist_append(pl, &v, sizeof(v));	/* one raw byte */
+}
+static inline int ceph_pagelist_encode_string(struct ceph_pagelist *pl,
+					      char *s, size_t len)
+{
+	/* 32-bit length prefix first, then the bytes themselves (if any) */
+	int err = ceph_pagelist_encode_32(pl, len);
+
+	if (err || !len)
+		return err;
+	return ceph_pagelist_append(pl, s, len);
+}
+
+#endif