rbd: simplify rbd_rq_fn()

When processing a request, rbd_rq_fn() makes clones of the bio's in
the request's bio chain and submits the results to osd's to be
satisfied.  If a request bio straddles the boundary between objects
backing the rbd image, it must be represented by two cloned bio's,
one for the first part (at the end of one object) and one for the
second (at the beginning of the next object).

This has been handled by a function bio_chain_clone(), which
includes an interface only a mother could love, and which has
been found to have other problems.

This patch defines two new fairly generic bio functions (one which
replaces bio_chain_clone()) to help out the situation, and then
revises rbd_rq_fn() to make use of them.

First, bio_clone_range() clones a portion of a single bio, starting
at a given offset within the bio and including only as many bytes
as requested.  As a convenience, a request to clone the entire bio
is passed directly to bio_clone().

Second, bio_chain_clone_range() performs a similar function,
producing a chain of cloned bio's covering a sub-range of the
source chain.  No bio_pair structures are used, and if successful
the result will represent exactly the specified range.

Using bio_chain_clone_range() makes rbd_rq_fn() a little easier
to understand, because it avoids the need to pass very much
state information between consecutive calls.  By avoiding the need
to track a bio_pair structure, it also eliminates the problem
described here:  http://tracker.newdream.net/issues/2933

Note that the size of a block request (and therefore the complete
length of a bio chain processed in rbd_rq_fn()) is an unsigned int,
while the result of rbd_segment_length() is u64.  This change makes
this range truncation explicit, and trips a bug if the segment
boundary is too far off.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index c800047..cc06c55 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -826,77 +826,144 @@
 }
 
 /*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
  */
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
-				   struct bio_pair **bp,
-				   int len, gfp_t gfpmask)
+static struct bio *bio_clone_range(struct bio *bio_src,
+					unsigned int offset,
+					unsigned int len,
+					gfp_t gfpmask)
 {
-	struct bio *old_chain = *old;
-	struct bio *new_chain = NULL;
-	struct bio *tail;
-	int total = 0;
+	struct bio_vec *bv;
+	unsigned int resid;
+	unsigned short idx;
+	unsigned int voff;
+	unsigned short end_idx;
+	unsigned short vcnt;
+	struct bio *bio;
 
-	if (*bp) {
-		bio_pair_release(*bp);
-		*bp = NULL;
+	/* Handle the easy case for the caller */
+
+	if (!offset && len == bio_src->bi_size)
+		return bio_clone(bio_src, gfpmask);
+
+	if (WARN_ON_ONCE(!len))
+		return NULL;
+	if (WARN_ON_ONCE(len > bio_src->bi_size))
+		return NULL;
+	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
+		return NULL;
+
+	/* Find first affected segment... */
+
+	resid = offset;
+	__bio_for_each_segment(bv, bio_src, idx, 0) {
+		if (resid < bv->bv_len)
+			break;
+		resid -= bv->bv_len;
+	}
+	voff = resid;
+
+	/* ...and the last affected segment */
+
+	resid += len;
+	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
+		if (resid <= bv->bv_len)
+			break;
+		resid -= bv->bv_len;
+	}
+	vcnt = end_idx - idx + 1;
+
+	/* Build the clone */
+
+	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
+	if (!bio)
+		return NULL;	/* ENOMEM */
+
+	bio->bi_bdev = bio_src->bi_bdev;
+	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
+	bio->bi_rw = bio_src->bi_rw;
+	bio->bi_flags |= 1 << BIO_CLONED;
+
+	/*
+	 * Copy over our part of the bio_vec, then update the first
+	 * and last (or only) entries.
+	 */
+	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
+			vcnt * sizeof (struct bio_vec));
+	bio->bi_io_vec[0].bv_offset += voff;
+	if (vcnt > 1) {
+		bio->bi_io_vec[0].bv_len -= voff;
+		bio->bi_io_vec[vcnt - 1].bv_len = resid;
+	} else {
+		bio->bi_io_vec[0].bv_len = len;
 	}
 
-	while (old_chain && (total < len)) {
-		struct bio *tmp;
+	bio->bi_vcnt = vcnt;
+	bio->bi_size = len;
+	bio->bi_idx = 0;
 
-		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
-		if (!tmp)
-			goto err_out;
-		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */
+	return bio;
+}
 
-		if (total + old_chain->bi_size > len) {
-			struct bio_pair *bp;
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated.  The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out.  On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, *bio_src is updated to refer to the bio in the source
+ * chain that contains the first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+					unsigned int *offset,
+					unsigned int len,
+					gfp_t gfpmask)
+{
+	struct bio *bi = *bio_src;
+	unsigned int off = *offset;
+	struct bio *chain = NULL;
+	struct bio **end;
 
-			/*
-			 * this split can only happen with a single paged bio,
-			 * split_bio will BUG_ON if this is not the case
-			 */
-			dout("bio_chain_clone split! total=%d remaining=%d"
-			     "bi_size=%u\n",
-			     total, len - total, old_chain->bi_size);
+	/* Build up a chain of clone bios up to the limit */
 
-			/* split the bio. We'll release it either in the next
-			   call, or it will have to be released outside */
-			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
-			if (!bp)
-				goto err_out;
+	if (!bi || off >= bi->bi_size || !len)
+		return NULL;		/* Nothing to clone */
 
-			__bio_clone(tmp, &bp->bio1);
+	end = &chain;
+	while (len) {
+		unsigned int bi_size;
+		struct bio *bio;
 
-			*next = &bp->bio2;
-		} else {
-			__bio_clone(tmp, old_chain);
-			*next = old_chain->bi_next;
+		if (!bi)
+			goto out_err;	/* EINVAL; ran out of bio's */
+		bi_size = min_t(unsigned int, bi->bi_size - off, len);
+		bio = bio_clone_range(bi, off, bi_size, gfpmask);
+		if (!bio)
+			goto out_err;	/* ENOMEM */
+
+		*end = bio;
+		end = &bio->bi_next;
+
+		off += bi_size;
+		if (off == bi->bi_size) {
+			bi = bi->bi_next;
+			off = 0;
 		}
-
-		tmp->bi_bdev = NULL;
-		tmp->bi_next = NULL;
-		if (new_chain)
-			tail->bi_next = tmp;
-		else
-			new_chain = tmp;
-		tail = tmp;
-		old_chain = old_chain->bi_next;
-
-		total += tmp->bi_size;
+		len -= bi_size;
 	}
+	*bio_src = bi;
+	*offset = off;
 
-	rbd_assert(total == len);
+	return chain;
+out_err:
+	bio_chain_put(chain);
 
-	*old = old_chain;
-
-	return new_chain;
-
-err_out:
-	dout("bio_chain_clone with err\n");
-	bio_chain_put(new_chain);
 	return NULL;
 }
 
@@ -1014,8 +1081,9 @@
 		req_data->coll_index = coll_index;
 	}
 
-	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
-		(unsigned long long) ofs, (unsigned long long) len);
+	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
+		object_name, (unsigned long long) ofs,
+		(unsigned long long) len, coll, coll_index);
 
 	osdc = &rbd_dev->rbd_client->client->osdc;
 	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1463,18 +1531,16 @@
 {
 	struct rbd_device *rbd_dev = q->queuedata;
 	struct request *rq;
-	struct bio_pair *bp = NULL;
 
 	while ((rq = blk_fetch_request(q))) {
 		struct bio *bio;
-		struct bio *rq_bio, *next_bio = NULL;
 		bool do_write;
 		unsigned int size;
-		u64 op_size = 0;
 		u64 ofs;
 		int num_segs, cur_seg = 0;
 		struct rbd_req_coll *coll;
 		struct ceph_snap_context *snapc;
+		unsigned int bio_offset;
 
 		dout("fetched request\n");
 
@@ -1486,10 +1552,6 @@
 
 		/* deduce our operation (read, write) */
 		do_write = (rq_data_dir(rq) == WRITE);
-
-		size = blk_rq_bytes(rq);
-		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-		rq_bio = rq->bio;
 		if (do_write && rbd_dev->mapping.read_only) {
 			__blk_end_request_all(rq, -EROFS);
 			continue;
@@ -1512,6 +1574,10 @@
 
 		up_read(&rbd_dev->header_rwsem);
 
+		size = blk_rq_bytes(rq);
+		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
+		bio = rq->bio;
+
 		dout("%s 0x%x bytes at 0x%llx\n",
 		     do_write ? "write" : "read",
 		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1531,30 +1597,37 @@
 			continue;
 		}
 
+		bio_offset = 0;
 		do {
-			/* a bio clone to be passed down to OSD req */
+			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+			unsigned int chain_size;
+			struct bio *bio_chain;
+
+			BUG_ON(limit > (u64) UINT_MAX);
+			chain_size = (unsigned int) limit;
 			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-			op_size = rbd_segment_length(rbd_dev, ofs, size);
+
 			kref_get(&coll->kref);
-			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
-					      op_size, GFP_ATOMIC);
-			if (bio)
+
+			/* Pass a cloned bio chain via an osd request */
+
+			bio_chain = bio_chain_clone_range(&bio,
+						&bio_offset, chain_size,
+						GFP_ATOMIC);
+			if (bio_chain)
 				(void) rbd_do_op(rq, rbd_dev, snapc,
-						ofs, op_size,
-						bio, coll, cur_seg);
+						ofs, chain_size,
+						bio_chain, coll, cur_seg);
 			else
 				rbd_coll_end_req_index(rq, coll, cur_seg,
-						       -ENOMEM, op_size);
-			size -= op_size;
-			ofs += op_size;
+						       -ENOMEM, chain_size);
+			size -= chain_size;
+			ofs += chain_size;
 
 			cur_seg++;
-			rq_bio = next_bio;
 		} while (size > 0);
 		kref_put(&coll->kref, rbd_coll_release);
 
-		if (bp)
-			bio_pair_release(bp);
 		spin_lock_irq(q->queue_lock);
 
 		ceph_put_snap_context(snapc);
@@ -1564,7 +1637,7 @@
 /*
  * a queue callback. Makes sure that we don't create a bio that spans across
  * multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone
+ * which we handle later at bio_chain_clone_range()
  */
 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
 			  struct bio_vec *bvec)