drbd: Removed the BIO_RW_BARRIER support form the receiver/epoch code

Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
diff --git a/drivers/block/drbd/drbd_receiver.c b/drivers/block/drbd/drbd_receiver.c
index 1146faa..2952c12 100644
--- a/drivers/block/drbd/drbd_receiver.c
+++ b/drivers/block/drbd/drbd_receiver.c
@@ -49,11 +49,6 @@
 
 #include "drbd_vli.h"
 
-struct flush_work {
-	struct drbd_work w;
-	struct drbd_epoch *epoch;
-};
-
 enum finish_epoch {
 	FE_STILL_LIVE,
 	FE_DESTROYED,
@@ -66,16 +61,6 @@
 static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
 static int e_end_block(struct drbd_conf *, struct drbd_work *, int);
 
-static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
-{
-	struct drbd_epoch *prev;
-	spin_lock(&mdev->epoch_lock);
-	prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
-	if (prev == epoch || prev == mdev->current_epoch)
-		prev = NULL;
-	spin_unlock(&mdev->epoch_lock);
-	return prev;
-}
 
 #define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)
 
@@ -981,7 +966,7 @@
 	return TRUE;
 }
 
-static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
+static void drbd_flush(struct drbd_conf *mdev)
 {
 	int rv;
 
@@ -997,24 +982,6 @@
 		}
 		put_ldev(mdev);
 	}
-
-	return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
-}
-
-static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
-{
-	struct flush_work *fw = (struct flush_work *)w;
-	struct drbd_epoch *epoch = fw->epoch;
-
-	kfree(w);
-
-	if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
-		drbd_flush_after_epoch(mdev, epoch);
-
-	drbd_may_finish_epoch(mdev, epoch, EV_PUT |
-			      (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));
-
-	return 1;
 }
 
 /**
@@ -1027,15 +994,13 @@
 					       struct drbd_epoch *epoch,
 					       enum epoch_event ev)
 {
-	int finish, epoch_size;
+	int epoch_size;
 	struct drbd_epoch *next_epoch;
-	int schedule_flush = 0;
 	enum finish_epoch rv = FE_STILL_LIVE;
 
 	spin_lock(&mdev->epoch_lock);
 	do {
 		next_epoch = NULL;
-		finish = 0;
 
 		epoch_size = atomic_read(&epoch->epoch_size);
 
@@ -1045,16 +1010,6 @@
 			break;
 		case EV_GOT_BARRIER_NR:
 			set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
-
-			/* Special case: If we just switched from WO_bio_barrier to
-			   WO_bdev_flush we should not finish the current epoch */
-			if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
-			    mdev->write_ordering != WO_bio_barrier &&
-			    epoch == mdev->current_epoch)
-				clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
-			break;
-		case EV_BARRIER_DONE:
-			set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
 			break;
 		case EV_BECAME_LAST:
 			/* nothing to do*/
@@ -1063,23 +1018,7 @@
 
 		if (epoch_size != 0 &&
 		    atomic_read(&epoch->active) == 0 &&
-		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
-		    epoch->list.prev == &mdev->current_epoch->list &&
-		    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
-			/* Nearly all conditions are met to finish that epoch... */
-			if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
-			    mdev->write_ordering == WO_none ||
-			    (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
-			    ev & EV_CLEANUP) {
-				finish = 1;
-				set_bit(DE_IS_FINISHING, &epoch->flags);
-			} else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
-				 mdev->write_ordering == WO_bio_barrier) {
-				atomic_inc(&epoch->active);
-				schedule_flush = 1;
-			}
-		}
-		if (finish) {
+		    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
 			if (!(ev & EV_CLEANUP)) {
 				spin_unlock(&mdev->epoch_lock);
 				drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
@@ -1102,6 +1041,7 @@
 				/* atomic_set(&epoch->active, 0); is already zero */
 				if (rv == FE_STILL_LIVE)
 					rv = FE_RECYCLED;
+				wake_up(&mdev->ee_wait);
 			}
 		}
 
@@ -1113,22 +1053,6 @@
 
 	spin_unlock(&mdev->epoch_lock);
 
-	if (schedule_flush) {
-		struct flush_work *fw;
-		fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
-		if (fw) {
-			fw->w.cb = w_flush;
-			fw->epoch = epoch;
-			drbd_queue_work(&mdev->data.work, &fw->w);
-		} else {
-			dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
-			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
-			/* That is not a recursion, only one level */
-			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
-			drbd_may_finish_epoch(mdev, epoch, EV_PUT);
-		}
-	}
-
 	return rv;
 }
 
@@ -1144,19 +1068,16 @@
 		[WO_none] = "none",
 		[WO_drain_io] = "drain",
 		[WO_bdev_flush] = "flush",
-		[WO_bio_barrier] = "barrier",
 	};
 
 	pwo = mdev->write_ordering;
 	wo = min(pwo, wo);
-	if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
-		wo = WO_bdev_flush;
 	if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
 		wo = WO_drain_io;
 	if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
 		wo = WO_none;
 	mdev->write_ordering = wo;
-	if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
+	if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
 		dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
 }
 
@@ -1192,7 +1113,7 @@
 	bio->bi_sector = sector;
 	bio->bi_bdev = mdev->ldev->backing_bdev;
 	/* we special case some flags in the multi-bio case, see below
-	 * (REQ_UNPLUG, REQ_HARDBARRIER) */
+	 * (REQ_UNPLUG) */
 	bio->bi_rw = rw;
 	bio->bi_private = e;
 	bio->bi_end_io = drbd_endio_sec;
@@ -1226,11 +1147,6 @@
 			bio->bi_rw &= ~REQ_UNPLUG;
 
 		drbd_generic_make_request(mdev, fault_type, bio);
-
-		/* strip off REQ_HARDBARRIER,
-		 * unless it is the first or last bio */
-		if (bios && bios->bi_next)
-			bios->bi_rw &= ~REQ_HARDBARRIER;
 	} while (bios);
 	maybe_kick_lo(mdev);
 	return 0;
@@ -1244,45 +1160,9 @@
 	return -ENOMEM;
 }
 
-/**
- * w_e_reissue() - Worker callback; Resubmit a bio, without REQ_HARDBARRIER set
- * @mdev:	DRBD device.
- * @w:		work object.
- * @cancel:	The connection will be closed anyways (unused in this callback)
- */
-int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
-{
-	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
-	/* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
-	   (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
-	   so that we can finish that epoch in drbd_may_finish_epoch().
-	   That is necessary if we already have a long chain of Epochs, before
-	   we realize that REQ_HARDBARRIER is actually not supported */
-
-	/* As long as the -ENOTSUPP on the barrier is reported immediately
-	   that will never trigger. If it is reported late, we will just
-	   print that warning and continue correctly for all future requests
-	   with WO_bdev_flush */
-	if (previous_epoch(mdev, e->epoch))
-		dev_warn(DEV, "Write ordering was not enforced (one time event)\n");
-
-	/* we still have a local reference,
-	 * get_ldev was done in receive_Data. */
-
-	e->w.cb = e_end_block;
-	if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_DT_WR) != 0) {
-		/* drbd_submit_ee fails for one reason only:
-		 * if was not able to allocate sufficient bios.
-		 * requeue, try again later. */
-		e->w.cb = w_e_reissue;
-		drbd_queue_work(&mdev->data.work, &e->w);
-	}
-	return 1;
-}
-
 static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
 {
-	int rv, issue_flush;
+	int rv;
 	struct p_barrier *p = &mdev->data.rbuf.barrier;
 	struct drbd_epoch *epoch;
 
@@ -1300,44 +1180,40 @@
 	 * Therefore we must send the barrier_ack after the barrier request was
 	 * completed. */
 	switch (mdev->write_ordering) {
-	case WO_bio_barrier:
 	case WO_none:
 		if (rv == FE_RECYCLED)
 			return TRUE;
-		break;
+
+		/* receiver context, in the writeout path of the other node.
+		 * avoid potential distributed deadlock */
+		epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
+		if (epoch)
+			break;
+		else
+			dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
+			/* Fall through */
 
 	case WO_bdev_flush:
 	case WO_drain_io:
-		if (rv == FE_STILL_LIVE) {
-			set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
-			drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
-			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
-		}
-		if (rv == FE_RECYCLED)
-			return TRUE;
-
-		/* The asender will send all the ACKs and barrier ACKs out, since
-		   all EEs moved from the active_ee to the done_ee. We need to
-		   provide a new epoch object for the EEs that come in soon */
-		break;
-	}
-
-	/* receiver context, in the writeout path of the other node.
-	 * avoid potential distributed deadlock */
-	epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
-	if (!epoch) {
-		dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
-		issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
 		drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
-		if (issue_flush) {
-			rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
-			if (rv == FE_RECYCLED)
-				return TRUE;
+		drbd_flush(mdev);
+
+		if (atomic_read(&mdev->current_epoch->epoch_size)) {
+			epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
+			if (epoch)
+				break;
 		}
 
-		drbd_wait_ee_list_empty(mdev, &mdev->done_ee);
+		epoch = mdev->current_epoch;
+		wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);
+
+		D_ASSERT(atomic_read(&epoch->active) == 0);
+		D_ASSERT(epoch->flags == 0);
 
 		return TRUE;
+	default:
+		dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
+		return FALSE;
 	}
 
 	epoch->flags = 0;
@@ -1652,15 +1528,8 @@
 {
 	struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
 	sector_t sector = e->sector;
-	struct drbd_epoch *epoch;
 	int ok = 1, pcmd;
 
-	if (e->flags & EE_IS_BARRIER) {
-		epoch = previous_epoch(mdev, e->epoch);
-		if (epoch)
-			drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
-	}
-
 	if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
 		if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 			pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
@@ -1817,27 +1686,6 @@
 	e->epoch = mdev->current_epoch;
 	atomic_inc(&e->epoch->epoch_size);
 	atomic_inc(&e->epoch->active);
-
-	if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
-		struct drbd_epoch *epoch;
-		/* Issue a barrier if we start a new epoch, and the previous epoch
-		   was not a epoch containing a single request which already was
-		   a Barrier. */
-		epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
-		if (epoch == e->epoch) {
-			set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
-			rw |= REQ_HARDBARRIER;
-			e->flags |= EE_IS_BARRIER;
-		} else {
-			if (atomic_read(&epoch->epoch_size) > 1 ||
-			    !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
-				set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
-				set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
-				rw |= REQ_HARDBARRIER;
-				e->flags |= EE_IS_BARRIER;
-			}
-		}
-	}
 	spin_unlock(&mdev->epoch_lock);
 
 	dp_flags = be32_to_cpu(p->dp_flags);