libceph: implement bio message data item cursor

Implement and use cursor routines for bio message data items for
outbound message data.

(See the previous commit for reasoning in support of the changes
in out_msg_pos_next().)

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 30c8792..209990a 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -739,6 +739,95 @@
 	if (*seg == (*bio_iter)->bi_vcnt)
 		init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
 }
+
+/*
+ * For a bio data item, a piece is whatever remains of the next
+ * entry in the current bio iovec, or the first entry in the next
+ * bio in the list.
+ */
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct bio *bio;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+	bio = data->bio;
+	BUG_ON(!bio);
+	BUG_ON(!bio->bi_vcnt);
+	/* resid = bio->bi_size */
+
+	cursor->bio = bio;
+	cursor->vector_index = 0;
+	cursor->vector_offset = 0;
+	cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
+}
+
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+						size_t *page_offset,
+						size_t *length)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct bio *bio;
+	struct bio_vec *bio_vec;
+	unsigned int index;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+	bio = cursor->bio;
+	BUG_ON(!bio);
+
+	index = cursor->vector_index;
+	BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+
+	bio_vec = &bio->bi_io_vec[index];
+	BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
+	*page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
+	BUG_ON(*page_offset >= PAGE_SIZE);
+	*length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+	BUG_ON(*length > PAGE_SIZE);
+
+	return bio_vec->bv_page;
+}
+
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
+{
+	struct ceph_msg_data_cursor *cursor = &data->cursor;
+	struct bio *bio;
+	struct bio_vec *bio_vec;
+	unsigned int index;
+
+	BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+	bio = cursor->bio;
+	BUG_ON(!bio);
+
+	index = cursor->vector_index;
+	BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+	bio_vec = &bio->bi_io_vec[index];
+	BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
+
+	/* Advance the cursor offset */
+
+	cursor->vector_offset += bytes;
+	if (cursor->vector_offset < bio_vec->bv_len)
+		return false;	/* more bytes to process in this segment */
+
+	/* Move on to the next segment, and possibly the next bio */
+
+	if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
+		bio = bio->bi_next;
+		cursor->bio = bio;
+		cursor->vector_index = 0;
+	}
+	cursor->vector_offset = 0;
+
+	if (!cursor->last_piece && bio && !bio->bi_next)
+		if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
+			cursor->last_piece = true;
+
+	return true;
+}
 #endif
 
 /*
@@ -843,11 +932,13 @@
 	case CEPH_MSG_DATA_PAGELIST:
 		ceph_msg_data_pagelist_cursor_init(data);
 		break;
-	case CEPH_MSG_DATA_NONE:
-	case CEPH_MSG_DATA_PAGES:
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
+		ceph_msg_data_bio_cursor_init(data);
+		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_NONE:
+	case CEPH_MSG_DATA_PAGES:
 	default:
 		/* BUG(); */
 		break;
@@ -870,11 +961,13 @@
 	case CEPH_MSG_DATA_PAGELIST:
 		page = ceph_msg_data_pagelist_next(data, page_offset, length);
 		break;
-	case CEPH_MSG_DATA_NONE:
-	case CEPH_MSG_DATA_PAGES:
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
+		page = ceph_msg_data_bio_next(data, page_offset, length);
+		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_NONE:
+	case CEPH_MSG_DATA_PAGES:
 	default:
 		page = NULL;
 		break;
@@ -900,11 +993,13 @@
 	case CEPH_MSG_DATA_PAGELIST:
 		new_piece = ceph_msg_data_pagelist_advance(data, bytes);
 		break;
-	case CEPH_MSG_DATA_NONE:
-	case CEPH_MSG_DATA_PAGES:
 #ifdef CONFIG_BLOCK
 	case CEPH_MSG_DATA_BIO:
+		new_piece = ceph_msg_data_bio_advance(data, bytes);
+		break;
 #endif /* CONFIG_BLOCK */
+	case CEPH_MSG_DATA_NONE:
+	case CEPH_MSG_DATA_PAGES:
 	default:
 		BUG();
 		break;
@@ -933,6 +1028,10 @@
 
 	/* Initialize data cursors */
 
+#ifdef CONFIG_BLOCK
+	if (ceph_msg_has_bio(msg))
+		ceph_msg_data_cursor_init(&msg->b);
+#endif /* CONFIG_BLOCK */
 	if (ceph_msg_has_pagelist(msg))
 		ceph_msg_data_cursor_init(&msg->l);
 	if (ceph_msg_has_trail(msg))
@@ -1233,6 +1332,10 @@
 		need_crc = ceph_msg_data_advance(&msg->t, sent);
 	else if (ceph_msg_has_pagelist(msg))
 		need_crc = ceph_msg_data_advance(&msg->l, sent);
+#ifdef CONFIG_BLOCK
+	else if (ceph_msg_has_bio(msg))
+		need_crc = ceph_msg_data_advance(&msg->b, sent);
+#endif /* CONFIG_BLOCK */
 	BUG_ON(need_crc && sent != len);
 
 	if (sent < len)
@@ -1242,10 +1345,6 @@
 	msg_pos->page_pos = 0;
 	msg_pos->page++;
 	msg_pos->did_page_crc = false;
-#ifdef CONFIG_BLOCK
-	if (ceph_msg_has_bio(msg))
-		iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg);
-#endif
 }
 
 static void in_msg_pos_next(struct ceph_connection *con, size_t len,
@@ -1323,8 +1422,6 @@
 		struct page *page = NULL;
 		size_t page_offset;
 		size_t length;
-		int max_write = PAGE_SIZE;
-		int bio_offset = 0;
 		bool use_cursor = false;
 		bool last_piece = true;	/* preserve existing behavior */
 
@@ -1345,21 +1442,19 @@
 							&length, &last_piece);
 #ifdef CONFIG_BLOCK
 		} else if (ceph_msg_has_bio(msg)) {
-			struct bio_vec *bv;
-
-			bv = bio_iovec_idx(msg->b.bio_iter, msg->b.bio_seg);
-			page = bv->bv_page;
-			bio_offset = bv->bv_offset;
-			max_write = bv->bv_len;
+			use_cursor = true;
+			page = ceph_msg_data_next(&msg->b, &page_offset,
+							&length, &last_piece);
 #endif
 		} else {
 			page = zero_page;
 		}
-		if (!use_cursor)
-			length = min_t(int, max_write - msg_pos->page_pos,
+		if (!use_cursor) {
+			length = min_t(int, PAGE_SIZE - msg_pos->page_pos,
 					    total_max_write);
 
-		page_offset = msg_pos->page_pos + bio_offset;
+			page_offset = msg_pos->page_pos;
+		}
 		if (do_datacrc && !msg_pos->did_page_crc) {
 			u32 crc = le32_to_cpu(msg->footer.data_crc);