drbd: Allow drbd_epoch_entries to use multiple bios.
This should allow for better performance if the lower level IO stacks
of the two peers differ in the limits they expose, either via the
request queue or via some merge_bvec_fn.
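
Internally, a drbd_epoch_entry no longer owns a single preallocated
private_bio; it now references a chain of pages linked through
page->private, and drbd_submit_ee() maps that chain onto however many
bios the local queue limits require.  As a rough sketch of the page
chain idiom (illustrative only -- the real helpers are added in the
header part of this patch and may differ in detail):

	#define page_chain_next(page) \
		((struct page *)page_private(page))
	#define page_chain_for_each(page) \
		for (; page; page = page_chain_next(page))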

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index fee0d24..388a3e8 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -80,30 +80,124 @@
 
 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 
-static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
+/*
+ * some helper functions to deal with single linked page lists,
+ * page->private being our "next" pointer.
+ */
+
+/* If at least n pages are linked at head, get n pages off.
+ * Otherwise, don't modify head, and return NULL.
+ * Locking is the responsibility of the caller.
+ */
+static struct page *page_chain_del(struct page **head, int n)
+{
+	struct page *page;
+	struct page *tmp;
+
+	BUG_ON(!n);
+	BUG_ON(!head);
+
+	page = *head;
+	if (!page)
+		return NULL; /* empty chain: the caller's vacancy check is racy */
+	while (page) {
+		tmp = page_chain_next(page);
+		if (--n == 0)
+			break; /* found sufficient pages */
+		if (tmp == NULL)
+			/* insufficient pages, don't use any of them. */
+			return NULL;
+		page = tmp;
+	}
+
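+	/* here, page is the last page we hand out, and tmp is the first
+	 * page remaining in the pool (or NULL if we took the whole chain) */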
+	/* add end of list marker for the returned list */
+	set_page_private(page, 0);
+	/* actual return value, and adjustment of head */
+	page = *head;
+	*head = tmp;
+	return page;
+}
+
+/* may be used outside of locks to find the tail of a (usually short)
+ * "private" page chain, before adding it back to a global chain head
+ * with page_chain_add() under a spinlock. */
+static struct page *page_chain_tail(struct page *page, int *len)
+{
+	struct page *tmp;
+	int i = 1;
+	while ((tmp = page_chain_next(page)))
+		++i, page = tmp;
+	if (len)
+		*len = i;
+	return page;
+}
+
+static int page_chain_free(struct page *page)
+{
+	struct page *tmp;
+	int i = 0;
+	page_chain_for_each_safe(page, tmp) {
+		put_page(page);
+		++i;
+	}
+	return i;
+}
+
+static void page_chain_add(struct page **head,
+		struct page *chain_first, struct page *chain_last)
+{
+#if 1
+	struct page *tmp;
+	tmp = page_chain_tail(chain_first, NULL);
+	BUG_ON(tmp != chain_last);
+#endif
+
+	/* add chain to head */
+	set_page_private(chain_last, (unsigned long)*head);
+	*head = chain_first;
+}
+
+static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
 {
 	struct page *page = NULL;
+	struct page *tmp = NULL;
+	int i = 0;
 
 	/* Yes, testing drbd_pp_vacant outside the lock is racy.
 	 * So what. It saves a spin_lock. */
-	if (drbd_pp_vacant > 0) {
+	if (drbd_pp_vacant >= number) {
 		spin_lock(&drbd_pp_lock);
-		page = drbd_pp_pool;
-		if (page) {
-			drbd_pp_pool = (struct page *)page_private(page);
-			set_page_private(page, 0); /* just to be polite */
-			drbd_pp_vacant--;
-		}
+		page = page_chain_del(&drbd_pp_pool, number);
+		if (page)
+			drbd_pp_vacant -= number;
 		spin_unlock(&drbd_pp_lock);
+		if (page)
+			return page;
 	}
+
 	/* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 	 * "criss-cross" setup, that might cause write-out on some other DRBD,
 	 * which in turn might block on the other node at this very place.  */
-	if (!page)
-		page = alloc_page(GFP_TRY);
-	if (page)
-		atomic_inc(&mdev->pp_in_use);
-	return page;
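+	/* allocate the pages one by one from the kernel,
+	 * pushing each new page onto the front of the chain we return */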
+	for (i = 0; i < number; i++) {
+		tmp = alloc_page(GFP_TRY);
+		if (!tmp)
+			break;
+		set_page_private(tmp, (unsigned long)page);
+		page = tmp;
+	}
+
+	if (i == number)
+		return page;
+
+	/* Not enough pages immediately available this time.
+	 * No need to jump around here, drbd_pp_alloc will retry this
+	 * function "soon". */
+	if (page) {
+		tmp = page_chain_tail(page, NULL);
+		spin_lock(&drbd_pp_lock);
+		page_chain_add(&drbd_pp_pool, page, tmp);
+		drbd_pp_vacant += i;
+		spin_unlock(&drbd_pp_lock);
+	}
+	return NULL;
 }
 
 /* kick lower level device, if we have more than (arbitrary number)
@@ -127,7 +221,7 @@
 
 	list_for_each_safe(le, tle, &mdev->net_ee) {
 		e = list_entry(le, struct drbd_epoch_entry, w.list);
-		if (drbd_bio_has_active_page(e->private_bio))
+		if (drbd_ee_has_active_page(e))
 			break;
 		list_move(le, to_be_freed);
 	}
@@ -148,32 +242,34 @@
 }
 
 /**
- * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
+ * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
  * @mdev:	DRBD device.
- * @retry:	whether or not to retry allocation forever (or until signalled)
+ * @number:	number of pages requested
+ * @retry:	whether to retry, if not enough pages are available right now
  *
- * Tries to allocate a page, first from our own page pool, then from the
- * kernel, unless this allocation would exceed the max_buffers setting.
- * If @retry is non-zero, retry until DRBD frees a page somewhere else.
+ * Tries to allocate @number pages, first from our own page pool, then from
+ * the kernel, unless this allocation would exceed the max_buffers setting.
+ * Possibly retry until DRBD frees sufficient pages somewhere else.
+ *
+ * Returns a page chain linked via page->private.
  */
-static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
+static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
 {
 	struct page *page = NULL;
 	DEFINE_WAIT(wait);
 
-	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-		page = drbd_pp_first_page_or_try_alloc(mdev);
-		if (page)
-			return page;
-	}
+	/* Yes, we may run up to @number over max_buffers. If we
+	 * follow it strictly, the admin will get it wrong anyway. */
+	if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
+		page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 
-	for (;;) {
+	while (page == NULL) {
 		prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 
 		drbd_kick_lo_and_reclaim_net(mdev);
 
 		if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
-			page = drbd_pp_first_page_or_try_alloc(mdev);
+			page = drbd_pp_first_pages_or_try_alloc(mdev, number);
 			if (page)
 				break;
 		}
@@ -190,62 +286,32 @@
 	}
 	finish_wait(&drbd_pp_wait, &wait);
 
+	if (page)
+		atomic_add(number, &mdev->pp_in_use);
 	return page;
 }
 
 /* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
- * Is also used from inside an other spin_lock_irq(&mdev->req_lock) */
+ * Is also used from inside another spin_lock_irq(&mdev->req_lock);
+ * Either links the page chain back to the global pool,
+ * or returns all pages to the system. */
 static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
 {
-	int free_it;
-
-	spin_lock(&drbd_pp_lock);
-	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-		free_it = 1;
-	} else {
-		set_page_private(page, (unsigned long)drbd_pp_pool);
-		drbd_pp_pool = page;
-		drbd_pp_vacant++;
-		free_it = 0;
-	}
-	spin_unlock(&drbd_pp_lock);
-
-	atomic_dec(&mdev->pp_in_use);
-
-	if (free_it)
-		__free_page(page);
-
-	wake_up(&drbd_pp_wait);
-}
-
-static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
-{
-	struct page *p_to_be_freed = NULL;
-	struct page *page;
-	struct bio_vec *bvec;
 	int i;
-
-	spin_lock(&drbd_pp_lock);
-	__bio_for_each_segment(bvec, bio, i, 0) {
-		if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
-			set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
-			p_to_be_freed = bvec->bv_page;
-		} else {
-			set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
-			drbd_pp_pool = bvec->bv_page;
-			drbd_pp_vacant++;
-		}
+	if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
+		i = page_chain_free(page);
+	else {
+		struct page *tmp;
+		tmp = page_chain_tail(page, &i);
+		spin_lock(&drbd_pp_lock);
+		page_chain_add(&drbd_pp_pool, page, tmp);
+		drbd_pp_vacant += i;
+		spin_unlock(&drbd_pp_lock);
 	}
-	spin_unlock(&drbd_pp_lock);
-	atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);
-
-	while (p_to_be_freed) {
-		page = p_to_be_freed;
-		p_to_be_freed = (struct page *)page_private(page);
-		set_page_private(page, 0); /* just to be polite */
-		put_page(page);
-	}
-
+	atomic_sub(i, &mdev->pp_in_use);
+	i = atomic_read(&mdev->pp_in_use);
+	if (i < 0)
+		dev_warn(DEV, "ASSERTION FAILED: pp_in_use: %d < 0\n", i);
 	wake_up(&drbd_pp_wait);
 }
 
@@ -270,11 +336,9 @@
 				     unsigned int data_size,
 				     gfp_t gfp_mask) __must_hold(local)
 {
-	struct request_queue *q;
 	struct drbd_epoch_entry *e;
 	struct page *page;
-	struct bio *bio;
-	unsigned int ds;
+	unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
 
 	if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
 		return NULL;
@@ -286,84 +350,32 @@
 		return NULL;
 	}
 
-	bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
-	if (!bio) {
-		if (!(gfp_mask & __GFP_NOWARN))
-			dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
-		goto fail1;
-	}
+	page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
+	if (!page)
+		goto fail;
 
-	bio->bi_bdev = mdev->ldev->backing_bdev;
-	bio->bi_sector = sector;
-
-	ds = data_size;
-	while (ds) {
-		page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
-		if (!page) {
-			if (!(gfp_mask & __GFP_NOWARN))
-				dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
-			goto fail2;
-		}
-		if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
-			drbd_pp_free(mdev, page);
-			dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
-			    "data_size=%u,ds=%u) failed\n",
-			    (unsigned long long)sector, data_size, ds);
-
-			q = bdev_get_queue(bio->bi_bdev);
-			if (q->merge_bvec_fn) {
-				struct bvec_merge_data bvm = {
-					.bi_bdev = bio->bi_bdev,
-					.bi_sector = bio->bi_sector,
-					.bi_size = bio->bi_size,
-					.bi_rw = bio->bi_rw,
-				};
-				int l = q->merge_bvec_fn(q, &bvm,
-						&bio->bi_io_vec[bio->bi_vcnt]);
-				dev_err(DEV, "merge_bvec_fn() = %d\n", l);
-			}
-
-			/* dump more of the bio. */
-			dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
-			dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
-			dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
-			dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);
-
-			goto fail2;
-			break;
-		}
-		ds -= min_t(int, ds, PAGE_SIZE);
-	}
-
-	D_ASSERT(data_size == bio->bi_size);
-
-	bio->bi_private = e;
-	e->mdev = mdev;
-	e->sector = sector;
-	e->size = bio->bi_size;
-
-	e->private_bio = bio;
-	e->block_id = id;
 	INIT_HLIST_NODE(&e->colision);
 	e->epoch = NULL;
+	e->mdev = mdev;
+	e->pages = page;
+	atomic_set(&e->pending_bios, 0);
+	e->size = data_size;
 	e->flags = 0;
+	e->sector = sector;
+	e->block_id = id;
 
 	return e;
 
- fail2:
-	drbd_pp_free_bio_pages(mdev, bio);
-	bio_put(bio);
- fail1:
+ fail:
 	mempool_free(e, drbd_ee_mempool);
-
 	return NULL;
 }
 
 void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 {
-	struct bio *bio = e->private_bio;
-	drbd_pp_free_bio_pages(mdev, bio);
-	bio_put(bio);
+	drbd_pp_free(mdev, e->pages);
+	D_ASSERT(atomic_read(&e->pending_bios) == 0);
 	D_ASSERT(hlist_unhashed(&e->colision));
 	mempool_free(e, drbd_ee_mempool);
 }
@@ -1121,6 +1133,90 @@
 }
 
 /**
+ * drbd_submit_ee() - Submit an epoch entry's I/O, split over as many bios as needed
+ * @mdev:	DRBD device.
+ * @e:		epoch entry
+ * @rw:		flag field, see bio->bi_rw
+ * @fault_type:	fault injection type, passed on to drbd_generic_make_request()
+ *
+ * Returns 0 on success; -ENOMEM if a bio could not be allocated (in that
+ * case, any bios already allocated for @e are released again).
+ */
+/* TODO allocate from our own bio_set. */
+int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
+		const unsigned rw, const int fault_type)
+{
+	struct bio *bios = NULL;
+	struct bio *bio;
+	struct page *page = e->pages;
+	sector_t sector = e->sector;
+	unsigned ds = e->size;
+	unsigned n_bios = 0;
+	unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;
+
+	/* In most cases, we will only need one bio.  But in case the lower
+	 * level restrictions happen to be different at this offset on this
+	 * side than those of the sending peer, we may need to submit the
+	 * request in more than one bio. */
+next_bio:
+	bio = bio_alloc(GFP_NOIO, nr_pages);
+	if (!bio) {
+		dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
+		goto fail;
+	}
+	/* > e->sector, unless this is the first bio */
+	bio->bi_sector = sector;
+	bio->bi_bdev = mdev->ldev->backing_bdev;
+	/* we special case some flags in the multi-bio case, see below
+	 * (BIO_RW_UNPLUG, BIO_RW_BARRIER) */
+	bio->bi_rw = rw;
+	bio->bi_private = e;
+	bio->bi_end_io = drbd_endio_sec;
+
+	bio->bi_next = bios;
+	bios = bio;
+	++n_bios;
+
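+	/* stuff as many pages into this bio as the queue limits accept;
+	 * on the first rejected page, close this bio and start another */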
+	page_chain_for_each(page) {
+		unsigned len = min_t(unsigned, ds, PAGE_SIZE);
+		if (!bio_add_page(bio, page, len, 0)) {
+			/* a single page must always be possible! */
+			BUG_ON(bio->bi_vcnt == 0);
+			goto next_bio;
+		}
+		ds -= len;
+		sector += len >> 9;
+		--nr_pages;
+	}
+	D_ASSERT(page == NULL);
+	D_ASSERT(ds == 0);
+
+	atomic_set(&e->pending_bios, n_bios);
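+	/* each bio carries e in bi_private; drbd_endio_sec is expected to
+	 * drop one pending_bios reference per completed bio */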
+	do {
+		bio = bios;
+		bios = bios->bi_next;
+		bio->bi_next = NULL;
+
+		/* strip off BIO_RW_UNPLUG unless it is the last bio */
+		if (bios)
+			bio->bi_rw &= ~(1<<BIO_RW_UNPLUG);
+
+		drbd_generic_make_request(mdev, fault_type, bio);
+
+		/* strip off BIO_RW_BARRIER,
+		 * unless it is the first or last bio */
+		if (bios && bios->bi_next)
+			bios->bi_rw &= ~(1<<BIO_RW_BARRIER);
+	} while (bios);
+	maybe_kick_lo(mdev);
+	return 0;
+
+fail:
+	while (bios) {
+		bio = bios;
+		bios = bios->bi_next;
+		bio_put(bio);
+	}
+	return -ENOMEM;
+}
+
+/**
  * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
  * @mdev:	DRBD device.
  * @w:		work object.
@@ -1129,8 +1225,6 @@
 int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
 {
 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
-	struct bio *bio = e->private_bio;
-
 	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
 	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
 	   so that we can finish that epoch in drbd_may_finish_epoch().
@@ -1144,33 +1238,17 @@
 	if (previous_epoch(mdev, e->epoch))
 		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
 
-	/* prepare bio for re-submit,
-	 * re-init volatile members */
 	/* we still have a local reference,
 	 * get_ldev was done in receive_Data. */
-	bio->bi_bdev = mdev->ldev->backing_bdev;
-	bio->bi_sector = e->sector;
-	bio->bi_size = e->size;
-	bio->bi_idx = 0;
-
-	bio->bi_flags &= ~(BIO_POOL_MASK - 1);
-	bio->bi_flags |= 1 << BIO_UPTODATE;
-
-	/* don't know whether this is necessary: */
-	bio->bi_phys_segments = 0;
-	bio->bi_next = NULL;
-
-	/* these should be unchanged: */
-	/* bio->bi_end_io = drbd_endio_write_sec; */
-	/* bio->bi_vcnt = whatever; */
 
 	e->w.cb = e_end_block;
-
-	/* This is no longer a barrier request. */
-	bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);
-
-	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);
-
+	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
+		/* drbd_submit_ee fails for one reason only:
+		 * it was not able to allocate sufficient bios.
+		 * Requeue, and try again later. */
+		e->w.cb = w_e_reissue;
+		drbd_queue_work(&mdev->data.work, &e->w);
+	}
 	return 1;
 }
 
@@ -1264,10 +1342,8 @@
 {
 	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 	struct drbd_epoch_entry *e;
-	struct bio_vec *bvec;
 	struct page *page;
-	struct bio *bio;
-	int dgs, ds, i, rr;
+	int dgs, ds, rr;
 	void *dig_in = mdev->int_dig_in;
 	void *dig_vv = mdev->int_dig_vv;
 	unsigned long *data;
@@ -1304,28 +1380,29 @@
 	e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
 	if (!e)
 		return NULL;
-	bio = e->private_bio;
+
 	ds = data_size;
-	bio_for_each_segment(bvec, bio, i) {
-		page = bvec->bv_page;
+	page = e->pages;
+	page_chain_for_each(page) {
+		unsigned len = min_t(int, ds, PAGE_SIZE);
 		data = kmap(page);
-		rr = drbd_recv(mdev, data, min_t(int, ds, PAGE_SIZE));
+		rr = drbd_recv(mdev, data, len);
 		if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
 			dev_err(DEV, "Fault injection: Corrupting data on receive\n");
 			data[0] = data[0] ^ (unsigned long)-1;
 		}
 		kunmap(page);
-		if (rr != min_t(int, ds, PAGE_SIZE)) {
+		if (rr != len) {
 			drbd_free_ee(mdev, e);
 			dev_warn(DEV, "short read receiving data: read %d expected %d\n",
-			     rr, min_t(int, ds, PAGE_SIZE));
+			     rr, len);
 			return NULL;
 		}
 		ds -= rr;
 	}
 
 	if (dgs) {
-		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+		drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
 		if (memcmp(dig_in, dig_vv, dgs)) {
 			dev_err(DEV, "Digest integrity check FAILED.\n");
 			drbd_bcast_ee(mdev, "digest failed",
@@ -1350,7 +1427,7 @@
 	if (!data_size)
 		return TRUE;
 
-	page = drbd_pp_alloc(mdev, 1);
+	page = drbd_pp_alloc(mdev, 1, 1);
 
 	data = kmap(page);
 	while (data_size) {
@@ -1414,7 +1491,7 @@
 	}
 
 	if (dgs) {
-		drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
+		drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
 		if (memcmp(dig_in, dig_vv, dgs)) {
 			dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
 			return 0;
@@ -1435,7 +1512,7 @@
 
 	D_ASSERT(hlist_unhashed(&e->colision));
 
-	if (likely(drbd_bio_uptodate(e->private_bio))) {
+	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 		drbd_set_in_sync(mdev, sector, e->size);
 		ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
 	} else {
@@ -1454,30 +1531,28 @@
 	struct drbd_epoch_entry *e;
 
 	e = read_in_block(mdev, ID_SYNCER, sector, data_size);
-	if (!e) {
-		put_ldev(mdev);
-		return FALSE;
-	}
+	if (!e)
+		goto fail;
 
 	dec_rs_pending(mdev);
 
-	e->private_bio->bi_end_io = drbd_endio_write_sec;
-	e->private_bio->bi_rw = WRITE;
-	e->w.cb = e_end_resync_block;
-
 	inc_unacked(mdev);
 	/* corresponding dec_unacked() in e_end_resync_block()
 	 * respective _drbd_clear_done_ee */
 
+	e->w.cb = e_end_resync_block;
+
 	spin_lock_irq(&mdev->req_lock);
 	list_add(&e->w.list, &mdev->sync_ee);
 	spin_unlock_irq(&mdev->req_lock);
 
-	drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
-	/* accounting done in endio */
+	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
+		return TRUE;
 
-	maybe_kick_lo(mdev);
-	return TRUE;
+	drbd_free_ee(mdev, e);
+fail:
+	put_ldev(mdev);
+	return FALSE;
 }
 
 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
@@ -1572,7 +1647,7 @@
 	}
 
 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
-		if (likely(drbd_bio_uptodate(e->private_bio))) {
+		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
 				mdev->state.conn <= C_PAUSED_SYNC_T &&
 				e->flags & EE_MAY_SET_IN_SYNC) ?
@@ -1718,7 +1793,6 @@
 		return FALSE;
 	}
 
-	e->private_bio->bi_end_io = drbd_endio_write_sec;
 	e->w.cb = e_end_block;
 
 	spin_lock(&mdev->epoch_lock);
@@ -1914,12 +1988,8 @@
 		drbd_al_begin_io(mdev, e->sector);
 	}
 
-	e->private_bio->bi_rw = rw;
-	drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
-	/* accounting done in endio */
-
-	maybe_kick_lo(mdev);
-	return TRUE;
+	if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
+		return TRUE;
 
 out_interrupted:
 	/* yes, the epoch_size now is imbalanced.
@@ -1977,9 +2047,6 @@
 		return FALSE;
 	}
 
-	e->private_bio->bi_rw = READ;
-	e->private_bio->bi_end_io = drbd_endio_read_sec;
-
 	switch (h->command) {
 	case P_DATA_REQUEST:
 		e->w.cb = w_e_end_data_req;
@@ -2073,10 +2140,8 @@
 
 	inc_unacked(mdev);
 
-	drbd_generic_make_request(mdev, fault_type, e->private_bio);
-	maybe_kick_lo(mdev);
-
-	return TRUE;
+	if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
+		return TRUE;
 
 out_free_e:
 	kfree(di);
@@ -3837,7 +3902,7 @@
 		dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
 	i = atomic_read(&mdev->pp_in_use);
 	if (i)
-		dev_info(DEV, "pp_in_use = %u, expected 0\n", i);
+		dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
 
 	D_ASSERT(list_empty(&mdev->read_ee));
 	D_ASSERT(list_empty(&mdev->active_ee));