libceph: abstract message data

Group the types of message data into an abstract structure with a
type indicator and a union containing fields appropriate to the
type of data it represents.  Use this to represent the pages,
pagelist, bio, and trail in a ceph message.

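Verify message data is of type NONE in the ceph_msg_data_set_*()
routines.  Since the remaining fields of message data of type NONE
really should not be interpreted, get rid of the assertions in those
functions that examined the message's previously stored pointer and
length fields; the checks on the caller-supplied arguments remain.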

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index f485455..f256b4b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1054,7 +1054,7 @@
 	msg_pos->did_page_crc = false;
 	if (in_trail) {
 		BUG_ON(!ceph_msg_has_trail(msg));
-		list_rotate_left(&msg->t.trail->head);
+		list_rotate_left(&msg->t.pagelist->head);
 	} else if (ceph_msg_has_pagelist(msg)) {
 		list_rotate_left(&msg->l.pagelist->head);
 #ifdef CONFIG_BLOCK
@@ -1120,7 +1120,7 @@
 	size_t trail_off = data_len;
 
 	if (ceph_msg_has_trail(msg)) {
-		trail_len = msg->t.trail->length;
+		trail_len = msg->t.pagelist->length;
 		trail_off -= trail_len;
 	}
 
@@ -1149,7 +1149,7 @@
 		if (in_trail) {
 			BUG_ON(!ceph_msg_has_trail(msg));
 			total_max_write = data_len - msg_pos->data_pos;
-			page = list_first_entry(&msg->t.trail->head,
+			page = list_first_entry(&msg->t.pagelist->head,
 						struct page, lru);
 		} else if (ceph_msg_has_pages(msg)) {
 			page = msg->p.pages[msg_pos->page];
@@ -2736,14 +2736,19 @@
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
+static void ceph_msg_data_init(struct ceph_msg_data *data)
+{
+	data->type = CEPH_MSG_DATA_NONE;
+}
+
 void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
 		size_t length, size_t alignment)
 {
 	BUG_ON(!pages);
 	BUG_ON(!length);
-	BUG_ON(msg->p.pages);
-	BUG_ON(msg->p.length);
+	BUG_ON(msg->p.type != CEPH_MSG_DATA_NONE);
 
+	msg->p.type = CEPH_MSG_DATA_PAGES;
 	msg->p.pages = pages;
 	msg->p.length = length;
 	msg->p.alignment = alignment & ~PAGE_MASK;
@@ -2755,8 +2760,9 @@
 {
 	BUG_ON(!pagelist);
 	BUG_ON(!pagelist->length);
-	BUG_ON(msg->l.pagelist);
+	BUG_ON(msg->l.type != CEPH_MSG_DATA_NONE);
 
+	msg->l.type = CEPH_MSG_DATA_PAGELIST;
 	msg->l.pagelist = pagelist;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
@@ -2764,8 +2770,9 @@
 void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
 {
 	BUG_ON(!bio);
-	BUG_ON(msg->b.bio);
+	BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE);
 
+	msg->b.type = CEPH_MSG_DATA_BIO;
 	msg->b.bio = bio;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_bio);
@@ -2774,9 +2781,10 @@
 {
 	BUG_ON(!trail);
 	BUG_ON(!trail->length);
-	BUG_ON(msg->t.trail);
+	BUG_ON(msg->t.type != CEPH_MSG_DATA_NONE);
 
-	msg->t.trail = trail;
+	msg->t.type = CEPH_MSG_DATA_PAGELIST;
+	msg->t.pagelist = trail;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_trail);
 
@@ -2800,6 +2808,11 @@
 	INIT_LIST_HEAD(&m->list_head);
 	kref_init(&m->kref);
 
+	ceph_msg_data_init(&m->p);
+	ceph_msg_data_init(&m->l);
+	ceph_msg_data_init(&m->b);
+	ceph_msg_data_init(&m->t);
+
 	/* front */
 	m->front_max = front_len;
 	if (front_len) {
@@ -2965,7 +2978,7 @@
 	}
 
 	if (ceph_msg_has_trail(m))
-		m->t.trail = NULL;
+		m->t.pagelist = NULL;
 
 	if (m->pool)
 		ceph_msgpool_put(m->pool, m);