drbd: Immediately allow completion of IOs, that wait for IO completions on a failed disk

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 8e489a6..16969c2 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -368,6 +368,12 @@
 		}
 		tmp = b->next;
 
+		if (what == abort_disk_io) {
+			/* Only walk the TL, leave barrier objects in place */
+			b = tmp;
+			continue;
+		}
+
 		if (n_writes) {
 			if (what == resend) {
 				b->n_writes = n_writes;
@@ -1565,6 +1571,10 @@
 		eh = mdev->ldev->dc.on_io_error;
 		was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
 
+		/* Immediately allow completion of all application IO, that waits
+		   for completion from the local disk. */
+		tl_restart(mdev, abort_disk_io);
+
 		/* current state still has to be D_FAILED,
 		 * there is only one way out: to D_DISKLESS,
 		 * and that may only happen after our put_ldev below. */
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 4a0f314..1a8aac4 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -214,8 +214,7 @@
 {
 	const unsigned long s = req->rq_state;
 	struct drbd_conf *mdev = req->mdev;
-	/* only WRITES may end up here without a master bio (on barrier ack) */
-	int rw = req->master_bio ? bio_data_dir(req->master_bio) : WRITE;
+	int rw = req->rq_state & RQ_WRITE ? WRITE : READ;
 
 	/* we must not complete the master bio, while it is
 	 *	still being processed by _drbd_send_zc_bio (drbd_send_dblock)
@@ -230,7 +229,7 @@
 		return;
 	if (s & RQ_NET_PENDING)
 		return;
-	if (s & RQ_LOCAL_PENDING)
+	if (s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED))
 		return;
 
 	if (req->master_bio) {
@@ -277,6 +276,9 @@
 		req->master_bio = NULL;
 	}
 
+	if (s & RQ_LOCAL_PENDING)
+		return;
+
 	if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
 		/* this is disconnected (local only) operation,
 		 * or protocol C P_WRITE_ACK,
@@ -429,7 +431,7 @@
 		break;
 
 	case completed_ok:
-		if (bio_data_dir(req->master_bio) == WRITE)
+		if (req->rq_state & RQ_WRITE)
 			mdev->writ_cnt += req->size>>9;
 		else
 			mdev->read_cnt += req->size>>9;
@@ -441,6 +443,14 @@
 		put_ldev(mdev);
 		break;
 
+	case abort_disk_io:
+		req->rq_state |= RQ_LOCAL_ABORTED;
+		if (req->rq_state & RQ_WRITE)
+			_req_may_be_done_not_susp(req, m);
+		else
+			goto goto_queue_for_net_read;
+		break;
+
 	case write_completed_with_error:
 		req->rq_state |= RQ_LOCAL_COMPLETED;
 		req->rq_state &= ~RQ_LOCAL_PENDING;
@@ -469,6 +479,8 @@
 		__drbd_chk_io_error(mdev, false);
 		put_ldev(mdev);
 
+	goto_queue_for_net_read:
+
 		/* no point in retrying if there is no good remote data,
 		 * or we have no connection. */
 		if (mdev->state.pdsk != D_UP_TO_DATE) {
diff --git a/drivers/block/drbd/drbd_req.h b/drivers/block/drbd/drbd_req.h
index 74c5b9f..3d21119 100644
--- a/drivers/block/drbd/drbd_req.h
+++ b/drivers/block/drbd/drbd_req.h
@@ -119,18 +119,21 @@
  * same time, so we should hold the request lock anyways.
  */
 enum drbd_req_state_bits {
-	/* 210
-	 * 000: no local possible
-	 * 001: to be submitted
+	/* 3210
+	 * 0000: no local possible
+	 * 0001: to be submitted
 	 *    UNUSED, we could map: 011: submitted, completion still pending
-	 * 110: completed ok
-	 * 010: completed with error
+	 * 0110: completed ok
+	 * 0010: completed with error
+	 * 1001: Aborted (before completion)
+	 * 1x10: Aborted and completed -> free
 	 */
 	__RQ_LOCAL_PENDING,
 	__RQ_LOCAL_COMPLETED,
 	__RQ_LOCAL_OK,
+	__RQ_LOCAL_ABORTED,
 
-	/* 76543
+	/* 87654
 	 * 00000: no network possible
 	 * 00001: to be send
 	 * 00011: to be send, on worker queue
@@ -200,8 +203,9 @@
 #define RQ_LOCAL_PENDING   (1UL << __RQ_LOCAL_PENDING)
 #define RQ_LOCAL_COMPLETED (1UL << __RQ_LOCAL_COMPLETED)
 #define RQ_LOCAL_OK        (1UL << __RQ_LOCAL_OK)
+#define RQ_LOCAL_ABORTED   (1UL << __RQ_LOCAL_ABORTED)
 
-#define RQ_LOCAL_MASK      ((RQ_LOCAL_OK << 1)-1) /* 0x07 */
+#define RQ_LOCAL_MASK      ((RQ_LOCAL_ABORTED << 1)-1)
 
 #define RQ_NET_PENDING     (1UL << __RQ_NET_PENDING)
 #define RQ_NET_QUEUED      (1UL << __RQ_NET_QUEUED)