dm: implement basic barrier support

Barriers are submitted to a worker thread that issues them in-order.

The thread is modified so that when it sees a barrier request, it first
waits for all pending I/O submitted before the request, then submits the
barrier and waits for it to complete.  (We must wait for the barrier,
otherwise it could be intermixed with the requests that follow it.)

Errors from the barrier request are recorded in a per-device barrier_error
variable.  Only one barrier request can be in progress at a time, so a
single per-device variable is sufficient for error reporting.

For now, the barrier request is converted to a non-barrier request when
sending it to the underlying device.

This patch guarantees correct barrier behavior if the underlying device
doesn't perform write-back caching. The same requirement existed before
barriers were supported in dm.

Bottom layer barrier support (sending barriers by target drivers) and
handling devices with write-back caches will be done in further patches.

Signed-off-by: Mikulas Patocka <mpatocka@redhat.com>
Signed-off-by: Alasdair G Kergon <agk@redhat.com>
diff --git a/drivers/md/dm.c b/drivers/md/dm.c
index db022e5..8a994be 100644
--- a/drivers/md/dm.c
+++ b/drivers/md/dm.c
@@ -125,6 +125,11 @@
 	spinlock_t deferred_lock;
 
 	/*
+	 * An error from the barrier request currently being processed.
+	 */
+	int barrier_error;
+
+	/*
 	 * Processing queue (flush/barriers)
 	 */
 	struct workqueue_struct *wq;
@@ -425,6 +430,10 @@
 	part_stat_add(cpu, &dm_disk(md)->part0, ticks[rw], duration);
 	part_stat_unlock();
 
+	/*
+	 * After this is decremented the bio must not be touched if it is
+	 * a barrier.
+	 */
 	dm_disk(md)->part0.in_flight = pending =
 		atomic_dec_return(&md->pending);
 
@@ -531,25 +540,35 @@
 			 */
 			spin_lock_irqsave(&md->deferred_lock, flags);
 			if (__noflush_suspending(md))
-				bio_list_add(&md->deferred, io->bio);
+				bio_list_add_head(&md->deferred, io->bio);
 			else
 				/* noflush suspend was interrupted. */
 				io->error = -EIO;
 			spin_unlock_irqrestore(&md->deferred_lock, flags);
 		}
 
-		end_io_acct(io);
-
 		io_error = io->error;
 		bio = io->bio;
 
-		free_io(md, io);
+		if (bio_barrier(bio)) {
+			/*
+			 * There can be just one barrier request so we use
+			 * a per-device variable for error reporting.
+			 * Note that you can't touch the bio after end_io_acct
+			 */
+			md->barrier_error = io_error;
+			end_io_acct(io);
+		} else {
+			end_io_acct(io);
 
-		if (io_error != DM_ENDIO_REQUEUE) {
-			trace_block_bio_complete(md->queue, bio);
+			if (io_error != DM_ENDIO_REQUEUE) {
+				trace_block_bio_complete(md->queue, bio);
 
-			bio_endio(bio, io_error);
+				bio_endio(bio, io_error);
+			}
 		}
+
+		free_io(md, io);
 	}
 }
 
@@ -691,7 +710,7 @@
 
 	clone->bi_sector = sector;
 	clone->bi_bdev = bio->bi_bdev;
-	clone->bi_rw = bio->bi_rw;
+	clone->bi_rw = bio->bi_rw & ~(1 << BIO_RW_BARRIER);
 	clone->bi_vcnt = 1;
 	clone->bi_size = to_bytes(len);
 	clone->bi_io_vec->bv_offset = offset;
@@ -718,6 +737,7 @@
 
 	clone = bio_alloc_bioset(GFP_NOIO, bio->bi_max_vecs, bs);
 	__bio_clone(clone, bio);
+	clone->bi_rw &= ~(1 << BIO_RW_BARRIER);
 	clone->bi_destructor = dm_bio_destructor;
 	clone->bi_sector = sector;
 	clone->bi_idx = idx;
@@ -846,7 +866,10 @@
 
 	ci.map = dm_get_table(md);
 	if (unlikely(!ci.map)) {
-		bio_io_error(bio);
+		if (!bio_barrier(bio))
+			bio_io_error(bio);
+		else
+			md->barrier_error = -EIO;
 		return;
 	}
 
@@ -930,15 +953,6 @@
 	struct mapped_device *md = q->queuedata;
 	int cpu;
 
-	/*
-	 * There is no use in forwarding any barrier request since we can't
-	 * guarantee it is (or can be) handled by the targets correctly.
-	 */
-	if (unlikely(bio_barrier(bio))) {
-		bio_endio(bio, -EOPNOTSUPP);
-		return 0;
-	}
-
 	down_read(&md->io_lock);
 
 	cpu = part_stat_lock();
@@ -950,7 +964,8 @@
 	 * If we're suspended or the thread is processing barriers
 	 * we have to queue this io for later.
 	 */
-	if (unlikely(test_bit(DMF_QUEUE_IO_TO_THREAD, &md->flags))) {
+	if (unlikely(test_bit(DMF_QUEUE_IO_TO_THREAD, &md->flags)) ||
+	    unlikely(bio_barrier(bio))) {
 		up_read(&md->io_lock);
 
 		if (unlikely(test_bit(DMF_BLOCK_IO_FOR_SUSPEND, &md->flags)) &&
@@ -1415,6 +1430,36 @@
 	return r;
 }
 
+static int dm_flush(struct mapped_device *md)
+{
+	dm_wait_for_completion(md, TASK_UNINTERRUPTIBLE);
+	return 0;
+}
+
+static void process_barrier(struct mapped_device *md, struct bio *bio)
+{
+	int error = dm_flush(md);
+
+	if (unlikely(error)) {
+		bio_endio(bio, error);
+		return;
+	}
+	if (bio_empty_barrier(bio)) {
+		bio_endio(bio, 0);
+		return;
+	}
+
+	__split_and_process_bio(md, bio);
+
+	error = dm_flush(md);
+
+	if (!error && md->barrier_error)
+		error = md->barrier_error;
+
+	if (md->barrier_error != DM_ENDIO_REQUEUE)
+		bio_endio(bio, error);
+}
+
 /*
  * Process the deferred bios
  */
@@ -1438,7 +1483,10 @@
 
 		up_write(&md->io_lock);
 
-		__split_and_process_bio(md, c);
+		if (bio_barrier(c))
+			process_barrier(md, c);
+		else
+			__split_and_process_bio(md, c);
 
 		down_write(&md->io_lock);
 	}