drbd: Finished the "on-no-data-accessible suspend-io;" functionality

When no data is accessible (neither a connection to the peer nor a local
disk), allow the user to choose to freeze all IO operations instead of
getting IO errors.

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_int.h b/drivers/block/drbd/drbd_int.h
index bef9138..03cc975 100644
--- a/drivers/block/drbd/drbd_int.h
+++ b/drivers/block/drbd/drbd_int.h
@@ -1469,6 +1469,7 @@
 extern int w_send_read_req(struct drbd_conf *, struct drbd_work *, int);
 extern int w_prev_work_done(struct drbd_conf *, struct drbd_work *, int);
 extern int w_e_reissue(struct drbd_conf *, struct drbd_work *, int);
+extern int w_restart_disk_io(struct drbd_conf *, struct drbd_work *, int);
 
 extern void resync_timer_fn(unsigned long data);
 
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 7d35986..106b9ab 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -925,7 +925,12 @@
 	if (fp == FP_STONITH &&
 	    (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
 	    !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
-		ns.susp = 1;
+		ns.susp = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
+
+	if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
+	    (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
+	    !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
+		ns.susp = 1; /* Suspend IO while no data available (no accessible data available) */
 
 	if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
 		if (ns.conn == C_SYNC_SOURCE)
@@ -1236,6 +1241,25 @@
 	/* Here we have the actions that are performed after a
 	   state change. This function might sleep */
 
+	if (os.susp && ns.susp && mdev->sync_conf.on_no_data == OND_SUSPEND_IO) {
+		if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
+			if (ns.conn == C_CONNECTED) {
+				spin_lock_irq(&mdev->req_lock);
+				_tl_restart(mdev, resend);
+				_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
+				spin_unlock_irq(&mdev->req_lock);
+			} else /* ns.conn > C_CONNECTED */
+				dev_err(DEV, "Unexpected Resynd going on!\n");
+		}
+
+		if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING) {
+			spin_lock_irq(&mdev->req_lock);
+			_tl_restart(mdev, restart_frozen_disk_io);
+			_drbd_set_state(_NS(mdev, susp, 0), CS_VERBOSE, NULL);
+			spin_unlock_irq(&mdev->req_lock);
+		}
+	}
+
 	if (fp == FP_STONITH && ns.susp) {
 		/* case1: The outdate peer handler is successful:
 		 * case2: The connection was established again: */
diff --git a/drivers/block/drbd/drbd_nl.c b/drivers/block/drbd/drbd_nl.c
index 73131c5..563a6ad 100644
--- a/drivers/block/drbd/drbd_nl.c
+++ b/drivers/block/drbd/drbd_nl.c
@@ -33,6 +33,7 @@
 #include <linux/blkpg.h>
 #include <linux/cpumask.h>
 #include "drbd_int.h"
+#include "drbd_req.h"
 #include "drbd_wrappers.h"
 #include <asm/unaligned.h>
 #include <linux/drbd_tag_magic.h>
@@ -494,6 +495,8 @@
 void drbd_suspend_io(struct drbd_conf *mdev)
 {
 	set_bit(SUSPEND_IO, &mdev->flags);
+	if (mdev->state.susp) /* state.susp is set: the flag above is enough, */
+		return; /* so return without waiting for ap_bio_cnt to hit zero */
 	wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
 }
 
@@ -1557,6 +1560,7 @@
 		sc.rate       = DRBD_RATE_DEF;
 		sc.after      = DRBD_AFTER_DEF;
 		sc.al_extents = DRBD_AL_EXTENTS_DEF;
+		sc.on_no_data  = DRBD_ON_NO_DATA_DEF;
 	} else
 		memcpy(&sc, &mdev->sync_conf, sizeof(struct syncer_conf));
 
@@ -1765,7 +1769,16 @@
 static int drbd_nl_resume_io(struct drbd_conf *mdev, struct drbd_nl_cfg_req *nlp,
 			     struct drbd_nl_cfg_reply *reply)
 {
+	drbd_suspend_io(mdev); /* sets SUSPEND_IO around the state change */
 	reply->ret_code = drbd_request_state(mdev, NS(susp, 0));
+	if (reply->ret_code == SS_SUCCESS) { /* susp was cleared */
+		if (mdev->state.conn < C_CONNECTED) /* below C_CONNECTED */
+			tl_clear(mdev); /* NOTE(review): presumably drops the TL */
+		if (mdev->state.disk == D_DISKLESS || mdev->state.disk == D_FAILED)
+			tl_restart(mdev, fail_frozen_disk_io); /* disk gone/failed */
+	}
+	drbd_resume_io(mdev); /* presumably undoes drbd_suspend_io() above */
+	/* The outcome is reported via reply->ret_code; this always returns 0. */
 	return 0;
 }
 
diff --git a/drivers/block/drbd/drbd_req.c b/drivers/block/drbd/drbd_req.c
index 4864758..8259d4f 100644
--- a/drivers/block/drbd/drbd_req.c
+++ b/drivers/block/drbd/drbd_req.c
@@ -226,6 +226,8 @@
 		return;
 	if (s & RQ_LOCAL_PENDING)
 		return;
+	if (mdev->state.susp)
+		return;
 
 	if (req->master_bio) {
 		/* this is data_received (remote read)
@@ -634,6 +636,28 @@
 		/* else: done by handed_over_to_network */
 		break;
 
+	case fail_frozen_disk_io:
+		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
+			break;
+
+		_req_may_be_done(req, m);
+		break;
+
+	case restart_frozen_disk_io:
+		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
+			break;
+
+		req->rq_state &= ~RQ_LOCAL_COMPLETED;
+
+		rv = MR_READ;
+		if (bio_data_dir(req->master_bio) == WRITE)
+			rv = MR_WRITE;
+
+		get_ldev(mdev);
+		req->w.cb = w_restart_disk_io;
+		drbd_queue_work(&mdev->data.work, &req->w);
+		break;
+
 	case resend:
 		/* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
 		   before the connection loss; only P_BARRIER_ACK was missing.
diff --git a/drivers/block/drbd/drbd_req.h b/drivers/block/drbd/drbd_req.h
index 07cb3b1..f2e45aa 100644
--- a/drivers/block/drbd/drbd_req.h
+++ b/drivers/block/drbd/drbd_req.h
@@ -105,6 +105,8 @@
 	write_completed_with_error,
 	completed_ok,
 	resend,
+	fail_frozen_disk_io,
+	restart_frozen_disk_io,
 	nothing, /* for tracing only */
 };
 
diff --git a/drivers/block/drbd/drbd_worker.c b/drivers/block/drbd/drbd_worker.c
index ca4a16c..3c1e884 100644
--- a/drivers/block/drbd/drbd_worker.c
+++ b/drivers/block/drbd/drbd_worker.c
@@ -1173,6 +1173,24 @@
 	return ok;
 }
 
+int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
+{ /* Worker callback: resubmit a frozen request to the local backing device. */
+	struct drbd_request *req = container_of(w, struct drbd_request, w);
+
+	if (bio_data_dir(req->master_bio) == WRITE)
+		drbd_al_begin_io(mdev, req->sector);
+	/* Calling drbd_al_begin_io() from the worker could deadlock in
+	   theory. In practice it cannot, since this is only used when
+	   unfreezing IO: all the extents of the requests that made it
+	   into the TL are already active. */
+
+	drbd_req_make_private_bio(req, req->master_bio);
+	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
+	generic_make_request(req->private_bio);
+
+	return 1; /* note: 'cancel' is never examined in this callback */
+}
+
 static int _drbd_may_sync_now(struct drbd_conf *mdev)
 {
 	struct drbd_conf *odev = mdev;