libceph: distinguish page and bio requests

An osd request uses either pages or a bio list for its data.  Use a
union to record information about the two, and add a data type
tag to select between them.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 0e814df..f189bc2 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -1425,12 +1425,16 @@
 		break;		/* Nothing to do */
 	case OBJ_REQUEST_BIO:
 		rbd_assert(obj_request->bio_list != NULL);
+		osd_req->r_data.type = CEPH_OSD_DATA_TYPE_BIO;
 		osd_req->r_data.bio = obj_request->bio_list;
 		break;
 	case OBJ_REQUEST_PAGES:
+		osd_req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
 		osd_req->r_data.pages = obj_request->pages;
 		osd_req->r_data.num_pages = obj_request->page_count;
 		osd_req->r_data.alignment = offset & ~PAGE_MASK;
+		osd_req->r_data.pages_from_pool = false;
+		osd_req->r_data.own_pages = false;
 		break;
 	}
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 3a1a77b..276fe96 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -243,6 +243,7 @@
 	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);
 
 	/* unlock all pages, zeroing any data we didn't read */
+	BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES);
 	for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
 		struct page *page = req->r_data.pages[i];
 
@@ -336,6 +337,7 @@
 		}
 		pages[i] = page;
 	}
+	req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
 	req->r_data.pages = pages;
 	req->r_data.num_pages = nr_pages;
 	req->r_data.alignment = 0;
@@ -561,6 +563,7 @@
 	long writeback_stat;
 	unsigned issued = ceph_caps_issued(ci);
 
+	BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES);
 	if (rc >= 0) {
 		/*
 		 * Assume we wrote the pages we originally sent.  The
@@ -830,6 +833,7 @@
 					break;
 				}
 
+				req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
 				req->r_data.num_pages = calc_pages_for(0, len);
 				req->r_data.alignment = 0;
 				max_pages = req->r_data.num_pages;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index d35fc05..3643a38 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -571,6 +571,7 @@
 			req->r_data.own_pages = 1;
 		}
 	}
+	req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
 	req->r_data.pages = pages;
 	req->r_data.num_pages = num_pages;
 	req->r_data.alignment = page_align;
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index 600b827..56604b3 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -50,8 +50,17 @@
 
 #define CEPH_OSD_MAX_OP 10
 
+enum ceph_osd_data_type {
+	CEPH_OSD_DATA_TYPE_NONE,
+	CEPH_OSD_DATA_TYPE_PAGES,
+#ifdef CONFIG_BLOCK
+	CEPH_OSD_DATA_TYPE_BIO,
+#endif /* CONFIG_BLOCK */
+};
+
 struct ceph_osd_data {
-	struct {
+	enum ceph_osd_data_type	type;
+	union {
 		struct {
 			struct page	**pages;
 			u32		num_pages;
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index 1f8c7a7..591e1b0 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -122,7 +122,8 @@
 	}
 	if (req->r_reply)
 		ceph_msg_put(req->r_reply);
-	if (req->r_data.own_pages)
+	if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES &&
+			req->r_data.own_pages)
 		ceph_release_page_vector(req->r_data.pages,
 					 req->r_data.num_pages);
 	ceph_put_snap_context(req->r_snapc);
@@ -188,6 +189,7 @@
 	}
 	req->r_reply = msg;
 
+	req->r_data.type = CEPH_OSD_DATA_TYPE_NONE;
 	ceph_pagelist_init(&req->r_trail);
 
 	/* create request message; allow space for oid */
@@ -1739,12 +1741,17 @@
 {
 	int rc = 0;
 
-	req->r_request->pages = req->r_data.pages;
-	req->r_request->page_count = req->r_data.num_pages;
-	req->r_request->page_alignment = req->r_data.alignment;
+	if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) {
+		req->r_request->pages = req->r_data.pages;
+		req->r_request->page_count = req->r_data.num_pages;
+		req->r_request->page_alignment = req->r_data.alignment;
 #ifdef CONFIG_BLOCK
-	req->r_request->bio = req->r_data.bio;
+	} else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) {
+		req->r_request->bio = req->r_data.bio;
 #endif
+	} else {
+		pr_err("unknown request data type %d\n", req->r_data.type);
+	}
 	req->r_request->trail = &req->r_trail;
 
 	register_request(osdc, req);
@@ -1944,6 +1951,7 @@
 		return PTR_ERR(req);
 
 	/* it may be a short read due to an object boundary */
+	req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
 	req->r_data.pages = pages;
 	req->r_data.num_pages = calc_pages_for(page_align, *plen);
 	req->r_data.alignment = page_align;
@@ -1987,6 +1995,7 @@
 		return PTR_ERR(req);
 
 	/* it may be a short write due to an object boundary */
+	req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
 	req->r_data.pages = pages;
 	req->r_data.num_pages = calc_pages_for(page_align, len);
 	req->r_data.alignment = page_align;
@@ -2083,23 +2092,30 @@
 	m = ceph_msg_get(req->r_reply);
 
 	if (data_len > 0) {
-		int want = calc_pages_for(req->r_data.alignment, data_len);
+		if (req->r_data.type == CEPH_OSD_DATA_TYPE_PAGES) {
+			int want;
 
-		if (req->r_data.pages && unlikely(req->r_data.num_pages < want)) {
-			pr_warning("tid %lld reply has %d bytes %d pages, we"
-				   " had only %d pages ready\n", tid, data_len,
-				   want, req->r_data.num_pages);
-			*skip = 1;
-			ceph_msg_put(m);
-			m = NULL;
-			goto out;
-		}
-		m->pages = req->r_data.pages;
-		m->page_count = req->r_data.num_pages;
-		m->page_alignment = req->r_data.alignment;
+			want = calc_pages_for(req->r_data.alignment, data_len);
+			if (req->r_data.pages &&
+				unlikely(req->r_data.num_pages < want)) {
+
+				pr_warning("tid %lld reply has %d bytes %d "
+					"pages, we had only %d pages ready\n",
+					tid, data_len, want,
+					req->r_data.num_pages);
+				*skip = 1;
+				ceph_msg_put(m);
+				m = NULL;
+				goto out;
+			}
+			m->pages = req->r_data.pages;
+			m->page_count = req->r_data.num_pages;
+			m->page_alignment = req->r_data.alignment;
 #ifdef CONFIG_BLOCK
-		m->bio = req->r_data.bio;
+		} else if (req->r_data.type == CEPH_OSD_DATA_TYPE_BIO) {
+			m->bio = req->r_data.bio;
 #endif
+		}
 	}
 	*skip = 0;
 	req->r_con_filling_msg = con->ops->get(con);