drbd: Allow drbd_epoch_entries to use multiple bios.
An epoch entry is no longer tied to a single private_bio; drbd_submit_ee()
now splits it into as many bios as the local backing device requires,
and e->pending_bios tracks how many are still in flight.  This should
allow for better performance when the lower-level I/O stacks of the
peers differ in the limits they expose, either via the request queue
or via some merge_bvec_fn.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
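---
Editorial note, below the fold and not part of the commit: the heart of
this change is a per-entry completion count.  Every bio belonging to an
epoch entry ends in drbd_endio_sec(), which records errors in a sticky
flag and runs the read/write specific final handler only when the last
bio completes.  The following is a minimal user-space C sketch of that
pattern; epoch_entry, endio_sec and endio_final here are hypothetical
stand-ins, not DRBD's actual definitions, and the atomics stand in for
the kernel's atomic_dec_and_test() and set_bit().

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

struct epoch_entry {
	atomic_int pending_bios;	/* bios still in flight */
	atomic_bool was_error;		/* sticky, like __EE_WAS_ERROR */
	bool is_write;
};

static void endio_final(struct epoch_entry *e)
{
	/* runs exactly once, after the last bio has completed */
	printf("%s complete, error=%d\n",
	       e->is_write ? "write" : "read", e->was_error);
}

/* called once per completed bio, in any order, possibly concurrently */
static void endio_sec(struct epoch_entry *e, int error)
{
	if (error)
		e->was_error = true;	/* atomic store; any failure marks the entry */

	/* old value 1 means this was the last pending bio,
	 * i.e. the equivalent of atomic_dec_and_test() */
	if (atomic_fetch_sub(&e->pending_bios, 1) == 1)
		endio_final(e);
}

int main(void)
{
	struct epoch_entry e = { .is_write = true };

	atomic_init(&e.pending_bios, 3);	/* entry was split into 3 bios */
	endio_sec(&e, 0);
	endio_sec(&e, -5);	/* one bio fails; flag sticks */
	endio_sec(&e, 0);	/* last completion fires endio_final() */
	return 0;
}

Because the flag is checked once in the final handler rather than per
completion, the error path needs no locking of its own; this mirrors
how the patch moves __drbd_chk_io_error() into the *_final functions.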
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index 0bbecf4..d771b1e 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -47,8 +47,7 @@
 
 /* defined here:
    drbd_md_io_complete
-   drbd_endio_write_sec
-   drbd_endio_read_sec
+   drbd_endio_sec
    drbd_endio_pri
 
  * more endio handlers:
@@ -85,27 +84,10 @@
 /* reads on behalf of the partner,
  * "submitted" by the receiver
  */
-void drbd_endio_read_sec(struct bio *bio, int error) __releases(local)
+void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
 {
 	unsigned long flags = 0;
-	struct drbd_epoch_entry *e = NULL;
-	struct drbd_conf *mdev;
-	int uptodate = bio_flagged(bio, BIO_UPTODATE);
-
-	e = bio->bi_private;
-	mdev = e->mdev;
-
-	if (error)
-		dev_warn(DEV, "read: error=%d s=%llus\n", error,
-				(unsigned long long)e->sector);
-	if (!error && !uptodate) {
-		dev_warn(DEV, "read: setting error to -EIO s=%llus\n",
-				(unsigned long long)e->sector);
-		/* strange behavior of some lower level drivers...
-		 * fail the request by clearing the uptodate flag,
-		 * but do not return any error?! */
-		error = -EIO;
-	}
+	struct drbd_conf *mdev = e->mdev;
 
 	D_ASSERT(e->block_id != ID_VACANT);
 
@@ -114,49 +96,38 @@
 	list_del(&e->w.list);
 	if (list_empty(&mdev->read_ee))
 		wake_up(&mdev->ee_wait);
+	if (test_bit(__EE_WAS_ERROR, &e->flags))
+		__drbd_chk_io_error(mdev, FALSE);
 	spin_unlock_irqrestore(&mdev->req_lock, flags);
 
-	drbd_chk_io_error(mdev, error, FALSE);
 	drbd_queue_work(&mdev->data.work, &e->w);
 	put_ldev(mdev);
 }
 
+static int is_failed_barrier(int ee_flags)
+{
+	return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
+			== (EE_IS_BARRIER|EE_WAS_ERROR);
+}
+
 /* writes on behalf of the partner, or resync writes,
- * "submitted" by the receiver.
- */
-void drbd_endio_write_sec(struct bio *bio, int error) __releases(local)
+ * "submitted" by the receiver, final stage.  */
+static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
 {
 	unsigned long flags = 0;
-	struct drbd_epoch_entry *e = NULL;
-	struct drbd_conf *mdev;
+	struct drbd_conf *mdev = e->mdev;
 	sector_t e_sector;
 	int do_wake;
 	int is_syncer_req;
 	int do_al_complete_io;
-	int uptodate = bio_flagged(bio, BIO_UPTODATE);
-	int is_barrier = bio_rw_flagged(bio, BIO_RW_BARRIER);
 
-	e = bio->bi_private;
-	mdev = e->mdev;
-
-	if (error)
-		dev_warn(DEV, "write: error=%d s=%llus\n", error,
-				(unsigned long long)e->sector);
-	if (!error && !uptodate) {
-		dev_warn(DEV, "write: setting error to -EIO s=%llus\n",
-				(unsigned long long)e->sector);
-		/* strange behavior of some lower level drivers...
-		 * fail the request by clearing the uptodate flag,
-		 * but do not return any error?! */
-		error = -EIO;
-	}
-
-	/* error == -ENOTSUPP would be a better test,
-	 * alas it is not reliable */
-	if (error && is_barrier && e->flags & EE_IS_BARRIER) {
+	/* if this is a failed barrier request, disable use of barriers,
+	 * and schedule for resubmission */
+	if (is_failed_barrier(e->flags)) {
 		drbd_bump_write_ordering(mdev, WO_bdev_flush);
 		spin_lock_irqsave(&mdev->req_lock, flags);
 		list_del(&e->w.list);
+		e->flags |= EE_RESUBMITTED;
 		e->w.cb = w_e_reissue;
 		/* put_ldev actually happens below, once we come here again. */
 		__release(local);
@@ -167,17 +138,16 @@
 
 	D_ASSERT(e->block_id != ID_VACANT);
 
-	spin_lock_irqsave(&mdev->req_lock, flags);
-	mdev->writ_cnt += e->size >> 9;
-	is_syncer_req = is_syncer_block_id(e->block_id);
-
 	/* after we moved e to done_ee,
 	 * we may no longer access it,
 	 * it may be freed/reused already!
 	 * (as soon as we release the req_lock) */
 	e_sector = e->sector;
 	do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
+	is_syncer_req = is_syncer_block_id(e->block_id);
 
+	spin_lock_irqsave(&mdev->req_lock, flags);
+	mdev->writ_cnt += e->size >> 9;
 	list_del(&e->w.list); /* has been on active_ee or sync_ee */
 	list_add_tail(&e->w.list, &mdev->done_ee);
 
@@ -190,7 +160,7 @@
 		? list_empty(&mdev->sync_ee)
 		: list_empty(&mdev->active_ee);
 
-	if (error)
+	if (test_bit(__EE_WAS_ERROR, &e->flags))
 		__drbd_chk_io_error(mdev, FALSE);
 	spin_unlock_irqrestore(&mdev->req_lock, flags);
 
@@ -205,7 +175,42 @@
 
 	wake_asender(mdev);
 	put_ldev(mdev);
+}
 
+/* writes on behalf of the partner, or resync writes,
+ * "submitted" by the receiver.
+ */
+void drbd_endio_sec(struct bio *bio, int error)
+{
+	struct drbd_epoch_entry *e = bio->bi_private;
+	struct drbd_conf *mdev = e->mdev;
+	int uptodate = bio_flagged(bio, BIO_UPTODATE);
+	int is_write = bio_data_dir(bio) == WRITE;
+
+	if (error)
+		dev_warn(DEV, "%s: error=%d s=%llus\n",
+				is_write ? "write" : "read", error,
+				(unsigned long long)e->sector);
+	if (!error && !uptodate) {
+		dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
+				is_write ? "write" : "read",
+				(unsigned long long)e->sector);
+		/* strange behavior of some lower level drivers...
+		 * fail the request by clearing the uptodate flag,
+		 * but do not return any error?! */
+		error = -EIO;
+	}
+
+	if (error)
+		set_bit(__EE_WAS_ERROR, &e->flags);
+
+	bio_put(bio); /* no need for the bio anymore */
+	if (atomic_dec_and_test(&e->pending_bios)) {
+		if (is_write)
+			drbd_endio_write_sec_final(e);
+		else
+			drbd_endio_read_sec_final(e);
+	}
 }
 
 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
@@ -295,7 +300,34 @@
 	return 1; /* Simply ignore this! */
 }
 
-void drbd_csum(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
+void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
+{
+	struct hash_desc desc;
+	struct scatterlist sg;
+	struct page *page = e->pages;
+	struct page *tmp;
+	unsigned len;
+
+	desc.tfm = tfm;
+	desc.flags = 0;
+
+	sg_init_table(&sg, 1);
+	crypto_hash_init(&desc);
+
+	while ((tmp = page_chain_next(page))) {
+		/* all but the last page will be fully used */
+		sg_set_page(&sg, page, PAGE_SIZE, 0);
+		crypto_hash_update(&desc, &sg, sg.length);
+		page = tmp;
+	}
+	/* and now the last, possibly only partially used page */
+	len = e->size & (PAGE_SIZE - 1);
+	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
+	crypto_hash_update(&desc, &sg, sg.length);
+	crypto_hash_final(&desc, digest);
+}
+
+void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
 {
 	struct hash_desc desc;
 	struct scatterlist sg;
@@ -329,11 +361,11 @@
 		return 1;
 	}
 
-	if (likely(drbd_bio_uptodate(e->private_bio))) {
+	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
 		digest = kmalloc(digest_size, GFP_NOIO);
 		if (digest) {
-			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
+			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 
 			inc_rs_pending(mdev);
 			ok = drbd_send_drequest_csum(mdev,
@@ -369,23 +401,21 @@
 	/* GFP_TRY, because if there is no memory available right now, this may
 	 * be rescheduled for later. It is "only" background resync, after all. */
 	e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
-	if (!e) {
-		put_ldev(mdev);
-		return 2;
-	}
+	if (!e)
+		goto fail;
 
 	spin_lock_irq(&mdev->req_lock);
 	list_add(&e->w.list, &mdev->read_ee);
 	spin_unlock_irq(&mdev->req_lock);
 
-	e->private_bio->bi_end_io = drbd_endio_read_sec;
-	e->private_bio->bi_rw = READ;
 	e->w.cb = w_e_send_csum;
+	if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
+		return 1;
 
-	mdev->read_cnt += size >> 9;
-	drbd_generic_make_request(mdev, DRBD_FAULT_RS_RD, e->private_bio);
-
-	return 1;
+	drbd_free_ee(mdev, e);
+fail:
+	put_ldev(mdev);
+	return 2;
 }
 
 void resync_timer_fn(unsigned long data)
@@ -819,7 +849,7 @@
 /* helper */
 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 {
-	if (drbd_bio_has_active_page(e->private_bio)) {
+	if (drbd_ee_has_active_page(e)) {
 		/* This might happen if sendpage() has not finished */
 		spin_lock_irq(&mdev->req_lock);
 		list_add_tail(&e->w.list, &mdev->net_ee);
@@ -845,7 +875,7 @@
 		return 1;
 	}
 
-	if (likely(drbd_bio_uptodate(e->private_bio))) {
+	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 		ok = drbd_send_block(mdev, P_DATA_REPLY, e);
 	} else {
 		if (__ratelimit(&drbd_ratelimit_state))
@@ -886,7 +916,7 @@
 		put_ldev(mdev);
 	}
 
-	if (likely(drbd_bio_uptodate(e->private_bio))) {
+	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
 			inc_rs_pending(mdev);
 			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
@@ -934,7 +964,7 @@
 
 	di = (struct digest_info *)(unsigned long)e->block_id;
 
-	if (likely(drbd_bio_uptodate(e->private_bio))) {
+	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 		/* quick hack to try to avoid a race against reconfiguration.
 		 * a real fix would be much more involved,
 		 * introducing more locking mechanisms */
@@ -944,7 +974,7 @@
 			digest = kmalloc(digest_size, GFP_NOIO);
 		}
 		if (digest) {
-			drbd_csum(mdev, mdev->csums_tfm, e->private_bio, digest);
+			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 			eq = !memcmp(digest, di->digest, digest_size);
 			kfree(digest);
 		}
@@ -986,14 +1016,14 @@
 	if (unlikely(cancel))
 		goto out;
 
-	if (unlikely(!drbd_bio_uptodate(e->private_bio)))
+	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
 		goto out;
 
 	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
 	/* FIXME if this allocation fails, online verify will not terminate! */
 	digest = kmalloc(digest_size, GFP_NOIO);
 	if (digest) {
-		drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
+		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
 		inc_rs_pending(mdev);
 		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
 					     digest, digest_size, P_OV_REPLY);
@@ -1042,11 +1072,11 @@
 
 	di = (struct digest_info *)(unsigned long)e->block_id;
 
-	if (likely(drbd_bio_uptodate(e->private_bio))) {
+	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
 		digest = kmalloc(digest_size, GFP_NOIO);
 		if (digest) {
-			drbd_csum(mdev, mdev->verify_tfm, e->private_bio, digest);
+			drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
 
 			D_ASSERT(digest_size == di->digest_size);
 			eq = !memcmp(digest, di->digest, digest_size);
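
Editorial note, not part of the patch: drbd_csum_ee() above hashes an
entry's data by walking its page chain, using all but the last page in
full and then `e->size & (PAGE_SIZE - 1)` bytes of the last one, where
zero means the last page is also fully used (the `len ?: PAGE_SIZE`
idiom).  A user-space sketch of that walk follows; struct chunk and
chain_next() are hypothetical stand-ins for struct page and
page_chain_next(), and a toy byte sum replaces the crypto API.

#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define CHUNK_SIZE 4096		/* stands in for PAGE_SIZE */

struct chunk {
	struct chunk *next;
	uint8_t data[CHUNK_SIZE];
};

static struct chunk *chain_next(struct chunk *c) { return c->next; }

/* checksum `size` bytes spread over a chain of CHUNK_SIZE chunks */
static uint32_t csum_chain(struct chunk *chunk, unsigned int size)
{
	struct chunk *tmp;
	unsigned int len, i;
	uint32_t digest = 0;

	/* all but the last chunk are fully used */
	while ((tmp = chain_next(chunk))) {
		for (i = 0; i < CHUNK_SIZE; i++)
			digest += chunk->data[i];
		chunk = tmp;
	}
	/* the last, possibly only partially used chunk; a size that is
	 * an exact multiple of CHUNK_SIZE uses the whole chunk */
	len = size & (CHUNK_SIZE - 1);
	if (!len)
		len = CHUNK_SIZE;
	for (i = 0; i < len; i++)
		digest += chunk->data[i];
	return digest;
}

int main(void)
{
	struct chunk a = {0}, b = {0};

	a.next = &b;
	memset(a.data, 1, CHUNK_SIZE);
	memset(b.data, 1, 512);
	/* entry of 4608 bytes: first chunk full, last chunk partial */
	printf("digest=%u\n", csum_chain(&a, CHUNK_SIZE + 512));
	return 0;
}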