libceph: separate read and write data

An osd request defines information about where data to be read
should be placed as well as where data to write comes from.
Currently both are represented by a single common field, r_data.

Keep information about data for writing separate from data to be
read by splitting that field into r_data_in (incoming data, for
reads) and r_data_out (outgoing data, for writes).

This is the key patch in this whole series, in that it actually
identifies which osd requests generate outgoing data and which
generate incoming data.  It's less obvious (currently) that an osd
CALL op generates both outgoing and incoming data; that's the focus
of some upcoming work.

This resolves:
    http://tracker.ceph.com/issues/4127

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 591e1b0..f9cf445 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -122,10 +122,16 @@
 	}
 	if (req->r_reply)
 		ceph_msg_put(req->r_reply);
-	if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES &&
-			req->r_data.own_pages)
-		ceph_release_page_vector(req->r_data.pages,
-					 req->r_data.num_pages);
+
+	if (req->r_data_in.type == CEPH_OSD_DATA_TYPE_PAGES &&
+			req->r_data_in.own_pages)
+		ceph_release_page_vector(req->r_data_in.pages,
+					 req->r_data_in.num_pages);
+	if (req->r_data_out.type == CEPH_OSD_DATA_TYPE_PAGES &&
+			req->r_data_out.own_pages)
+		ceph_release_page_vector(req->r_data_out.pages,
+					 req->r_data_out.num_pages);
+
 	ceph_put_snap_context(req->r_snapc);
 	ceph_pagelist_release(&req->r_trail);
 	if (req->r_mempool)
@@ -189,7 +195,8 @@
 	}
 	req->r_reply = msg;
 
-	req->r_data.type = CEPH_OSD_DATA_TYPE_NONE;
+	req->r_data_in.type = CEPH_OSD_DATA_TYPE_NONE;
+	req->r_data_out.type = CEPH_OSD_DATA_TYPE_NONE;
 	ceph_pagelist_init(&req->r_trail);
 
 	/* create request message; allow space for oid */
@@ -1740,17 +1747,21 @@
 			    bool nofail)
 {
 	int rc = 0;
+	struct ceph_osd_data *osd_data;
 
-	if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) {
-		req->r_request->pages = req->r_data.pages;
-		req->r_request->page_count = req->r_data.num_pages;
-		req->r_request->page_alignment = req->r_data.alignment;
+	/* Set up outgoing data */
+
+	osd_data = &req->r_data_out;
+	if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
+		req->r_request->pages = osd_data->pages;
+		req->r_request->page_count = osd_data->num_pages;
+		req->r_request->page_alignment = osd_data->alignment;
 #ifdef CONFIG_BLOCK
-	} else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) {
-		req->r_request->bio = req->r_data.bio;
+	} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
+		req->r_request->bio = osd_data->bio;
 #endif
 	} else {
-		pr_err("unknown request data type %d\n", req->r_data.type);
+		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
 	}
 	req->r_request->trail = &req->r_trail;
 
@@ -1939,6 +1950,7 @@
 			struct page **pages, int num_pages, int page_align)
 {
 	struct ceph_osd_request *req;
+	struct ceph_osd_data *osd_data;
 	int rc = 0;
 
 	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
@@ -1951,13 +1963,15 @@
 		return PTR_ERR(req);
 
 	/* it may be a short read due to an object boundary */
-	req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
-	req->r_data.pages = pages;
-	req->r_data.num_pages = calc_pages_for(page_align, *plen);
-	req->r_data.alignment = page_align;
+
+	osd_data = &req->r_data_in;
+	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
+	osd_data->pages = pages;
+	osd_data->num_pages = calc_pages_for(page_align, *plen);
+	osd_data->alignment = page_align;
 
 	dout("readpages  final extent is %llu~%llu (%d pages align %d)\n",
-	     off, *plen, req->r_data.num_pages, page_align);
+	     off, *plen, osd_data->num_pages, page_align);
 
 	rc = ceph_osdc_start_request(osdc, req, false);
 	if (!rc)
@@ -1981,6 +1995,7 @@
 			 struct page **pages, int num_pages)
 {
 	struct ceph_osd_request *req;
+	struct ceph_osd_data *osd_data;
 	int rc = 0;
 	int page_align = off & ~PAGE_MASK;
 
@@ -1995,11 +2010,13 @@
 		return PTR_ERR(req);
 
 	/* it may be a short write due to an object boundary */
-	req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
-	req->r_data.pages = pages;
-	req->r_data.num_pages = calc_pages_for(page_align, len);
-	req->r_data.alignment = page_align;
-	dout("writepages %llu~%llu (%d pages)\n", off, len, req->r_data.num_pages);
+	osd_data = &req->r_data_out;
+	osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
+	osd_data->pages = pages;
+	osd_data->num_pages = calc_pages_for(page_align, len);
+	osd_data->alignment = page_align;
+	dout("writepages %llu~%llu (%d pages)\n", off, len,
+		osd_data->num_pages);
 
 	rc = ceph_osdc_start_request(osdc, req, true);
 	if (!rc)
@@ -2092,28 +2109,30 @@
 	m = ceph_msg_get(req->r_reply);
 
 	if (data_len > 0) {
-		if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) {
+		struct ceph_osd_data *osd_data = &req->r_data_in;
+
+		if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
 			int want;
 
-			want = calc_pages_for(req->r_data.alignment, data_len);
-			if (req->r_data.pages &&
-				unlikely(req->r_data.num_pages < want)) {
+			want = calc_pages_for(osd_data->alignment, data_len);
+			if (osd_data->pages &&
+				unlikely(osd_data->num_pages < want)) {
 
 				pr_warning("tid %lld reply has %d bytes %d "
 					"pages, we had only %d pages ready\n",
 					tid, data_len, want,
-					req->r_data.num_pages);
+					osd_data->num_pages);
 				*skip = 1;
 				ceph_msg_put(m);
 				m = NULL;
 				goto out;
 			}
-			m->pages = req->r_data.pages;
-			m->page_count = req->r_data.num_pages;
-			m->page_alignment = req->r_data.alignment;
+			m->pages = osd_data->pages;
+			m->page_count = osd_data->num_pages;
+			m->page_alignment = osd_data->alignment;
 #ifdef CONFIG_BLOCK
-		} else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) {
-			m->bio = req->r_data.bio;
+		} else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
+			m->bio = osd_data->bio;
 #endif
 		}
 	}