raid5-ppl: PPL support for disks with write-back cache enabled

In order to provide data consistency with PPL for disks with write-back
cache enabled all data has to be flushed to disks before next PPL
entry. The disks to be flushed are marked in the bitmap. It's modified
under a mutex and it's only read after PPL io unit is submitted.

A limitation of 64 disks in the array has been introduced to keep data
structures and implementation simple. RAID5 arrays with so many disks are
not likely due to high risk of multiple disks failure. Such restriction
should not be a real life limitation.

With write-back cache disabled next PPL entry is submitted when data write
for current one completes. Data flush defers next log submission so trigger
it when there are no stripes for handling found.

As PPL assures all data is flushed to disk at request completion, just
acknowledge flush request when PPL is enabled.

Signed-off-by: Tomasz Majchrzak <tomasz.majchrzak@intel.com>
Signed-off-by: Shaohua Li <sh.li@alibaba-inc.com>
diff --git a/drivers/md/raid5-ppl.c b/drivers/md/raid5-ppl.c
index 628c0bf..2764c22 100644
--- a/drivers/md/raid5-ppl.c
+++ b/drivers/md/raid5-ppl.c
@@ -85,6 +85,9 @@
  * (for a single member disk). New io_units are added to the end of the list
  * and the first io_unit is submitted, if it is not submitted already.
  * The current io_unit accepting new stripes is always at the end of the list.
+ *
+ * If write-back cache is enabled for any of the disks in the array, its data
+ * must be flushed before next io_unit is submitted.
  */
 
 #define PPL_SPACE_SIZE (128 * 1024)
@@ -104,6 +107,7 @@
 	struct kmem_cache *io_kc;
 	mempool_t *io_pool;
 	struct bio_set *bs;
+	struct bio_set *flush_bs;
 
 	/* used only for recovery */
 	int recovered_entries;
@@ -128,6 +132,8 @@
 	sector_t next_io_sector;
 	unsigned int entry_space;
 	bool use_multippl;
+	bool wb_cache_on;
+	unsigned long disk_flush_bitmap;
 };
 
 #define PPL_IO_INLINE_BVECS 32
@@ -145,6 +151,7 @@
 
 	struct list_head stripe_list;	/* stripes added to the io_unit */
 	atomic_t pending_stripes;	/* how many stripes not written to raid */
+	atomic_t pending_flushes;	/* how many disk flushes are in progress */
 
 	bool submitted;			/* true if write to log started */
 
@@ -249,6 +256,7 @@
 	INIT_LIST_HEAD(&io->log_sibling);
 	INIT_LIST_HEAD(&io->stripe_list);
 	atomic_set(&io->pending_stripes, 0);
+	atomic_set(&io->pending_flushes, 0);
 	bio_init(&io->bio, io->biovec, PPL_IO_INLINE_BVECS);
 
 	pplhdr = page_address(io->header_page);
@@ -475,7 +483,18 @@
 	if (log->use_multippl)
 		log->next_io_sector += (PPL_HEADER_SIZE + io->pp_size) >> 9;
 
+	WARN_ON(log->disk_flush_bitmap != 0);
+
 	list_for_each_entry(sh, &io->stripe_list, log_list) {
+		for (i = 0; i < sh->disks; i++) {
+			struct r5dev *dev = &sh->dev[i];
+
+			if ((ppl_conf->child_logs[i].wb_cache_on) &&
+			    (test_bit(R5_Wantwrite, &dev->flags))) {
+				set_bit(i, &log->disk_flush_bitmap);
+			}
+		}
+
 		/* entries for full stripe writes have no partial parity */
 		if (test_bit(STRIPE_FULL_WRITE, &sh->state))
 			continue;
@@ -540,6 +559,7 @@
 {
 	struct ppl_log *log = io->log;
 	struct ppl_conf *ppl_conf = log->ppl_conf;
+	struct r5conf *conf = ppl_conf->mddev->private;
 	unsigned long flags;
 
 	pr_debug("%s: seq: %llu\n", __func__, io->seq);
@@ -565,6 +585,112 @@
 	spin_unlock(&ppl_conf->no_mem_stripes_lock);
 
 	local_irq_restore(flags);
+
+	wake_up(&conf->wait_for_quiescent);
+}
+
+static void ppl_flush_endio(struct bio *bio)
+{
+	struct ppl_io_unit *io = bio->bi_private;
+	struct ppl_log *log = io->log;
+	struct ppl_conf *ppl_conf = log->ppl_conf;
+	struct r5conf *conf = ppl_conf->mddev->private;
+	char b[BDEVNAME_SIZE];
+
+	pr_debug("%s: dev: %s\n", __func__, bio_devname(bio, b));
+
+	if (bio->bi_status) {
+		struct md_rdev *rdev;
+
+		rcu_read_lock();
+		rdev = md_find_rdev_rcu(conf->mddev, bio_dev(bio));
+		if (rdev)
+			md_error(rdev->mddev, rdev);
+		rcu_read_unlock();
+	}
+
+	bio_put(bio);
+
+	if (atomic_dec_and_test(&io->pending_flushes)) {
+		ppl_io_unit_finished(io);
+		md_wakeup_thread(conf->mddev->thread);
+	}
+}
+
+static void ppl_do_flush(struct ppl_io_unit *io)
+{
+	struct ppl_log *log = io->log;
+	struct ppl_conf *ppl_conf = log->ppl_conf;
+	struct r5conf *conf = ppl_conf->mddev->private;
+	int raid_disks = conf->raid_disks;
+	int flushed_disks = 0;
+	int i;
+
+	atomic_set(&io->pending_flushes, raid_disks);
+
+	for_each_set_bit(i, &log->disk_flush_bitmap, raid_disks) {
+		struct md_rdev *rdev;
+		struct block_device *bdev = NULL;
+
+		rcu_read_lock();
+		rdev = rcu_dereference(conf->disks[i].rdev);
+		if (rdev && !test_bit(Faulty, &rdev->flags))
+			bdev = rdev->bdev;
+		rcu_read_unlock();
+
+		if (bdev) {
+			struct bio *bio;
+			char b[BDEVNAME_SIZE];
+
+			bio = bio_alloc_bioset(GFP_NOIO, 0, ppl_conf->flush_bs);
+			bio_set_dev(bio, bdev);
+			bio->bi_private = io;
+			bio->bi_opf = REQ_OP_WRITE | REQ_PREFLUSH;
+			bio->bi_end_io = ppl_flush_endio;
+
+			pr_debug("%s: dev: %s\n", __func__,
+				 bio_devname(bio, b));
+
+			submit_bio(bio);
+			flushed_disks++;
+		}
+	}
+
+	log->disk_flush_bitmap = 0;
+
+	for (i = flushed_disks ; i < raid_disks; i++) {
+		if (atomic_dec_and_test(&io->pending_flushes))
+			ppl_io_unit_finished(io);
+	}
+}
+
+static inline bool ppl_no_io_unit_submitted(struct r5conf *conf,
+					    struct ppl_log *log)
+{
+	struct ppl_io_unit *io;
+
+	io = list_first_entry_or_null(&log->io_list, struct ppl_io_unit,
+				      log_sibling);
+
+	return !io || !io->submitted;
+}
+
+void ppl_quiesce(struct r5conf *conf, int quiesce)
+{
+	struct ppl_conf *ppl_conf = conf->log_private;
+	int i;
+
+	if (quiesce) {
+		for (i = 0; i < ppl_conf->count; i++) {
+			struct ppl_log *log = &ppl_conf->child_logs[i];
+
+			spin_lock_irq(&log->io_list_lock);
+			wait_event_lock_irq(conf->wait_for_quiescent,
+					    ppl_no_io_unit_submitted(conf, log),
+					    log->io_list_lock);
+			spin_unlock_irq(&log->io_list_lock);
+		}
+	}
 }
 
 void ppl_stripe_write_finished(struct stripe_head *sh)
@@ -574,8 +700,12 @@
 	io = sh->ppl_io;
 	sh->ppl_io = NULL;
 
-	if (io && atomic_dec_and_test(&io->pending_stripes))
-		ppl_io_unit_finished(io);
+	if (io && atomic_dec_and_test(&io->pending_stripes)) {
+		if (io->log->disk_flush_bitmap)
+			ppl_do_flush(io);
+		else
+			ppl_io_unit_finished(io);
+	}
 }
 
 static void ppl_xor(int size, struct page *page1, struct page *page2)
@@ -1108,6 +1238,8 @@
 
 	if (ppl_conf->bs)
 		bioset_free(ppl_conf->bs);
+	if (ppl_conf->flush_bs)
+		bioset_free(ppl_conf->flush_bs);
 	mempool_destroy(ppl_conf->io_pool);
 	kmem_cache_destroy(ppl_conf->io_kc);
 
@@ -1173,6 +1305,8 @@
 
 static void ppl_init_child_log(struct ppl_log *log, struct md_rdev *rdev)
 {
+	struct request_queue *q;
+
 	if ((rdev->ppl.size << 9) >= (PPL_SPACE_SIZE +
 				      PPL_HEADER_SIZE) * 2) {
 		log->use_multippl = true;
@@ -1185,6 +1319,10 @@
 				   PPL_HEADER_SIZE;
 	}
 	log->next_io_sector = rdev->ppl.sector;
+
+	q = bdev_get_queue(rdev->bdev);
+	if (test_bit(QUEUE_FLAG_WC, &q->queue_flags))
+		log->wb_cache_on = true;
 }
 
 int ppl_init_log(struct r5conf *conf)
@@ -1192,8 +1330,8 @@
 	struct ppl_conf *ppl_conf;
 	struct mddev *mddev = conf->mddev;
 	int ret = 0;
+	int max_disks;
 	int i;
-	bool need_cache_flush = false;
 
 	pr_debug("md/raid:%s: enabling distributed Partial Parity Log\n",
 		 mdname(conf->mddev));
@@ -1219,6 +1357,14 @@
 		return -EINVAL;
 	}
 
+	max_disks = FIELD_SIZEOF(struct ppl_log, disk_flush_bitmap) *
+		BITS_PER_BYTE;
+	if (conf->raid_disks > max_disks) {
+		pr_warn("md/raid:%s PPL doesn't support over %d disks in the array\n",
+			mdname(mddev), max_disks);
+		return -EINVAL;
+	}
+
 	ppl_conf = kzalloc(sizeof(struct ppl_conf), GFP_KERNEL);
 	if (!ppl_conf)
 		return -ENOMEM;
@@ -1244,6 +1390,12 @@
 		goto err;
 	}
 
+	ppl_conf->flush_bs = bioset_create(conf->raid_disks, 0, 0);
+	if (!ppl_conf->flush_bs) {
+		ret = -ENOMEM;
+		goto err;
+	}
+
 	ppl_conf->count = conf->raid_disks;
 	ppl_conf->child_logs = kcalloc(ppl_conf->count, sizeof(struct ppl_log),
 				       GFP_KERNEL);
@@ -1275,23 +1427,14 @@
 		log->rdev = rdev;
 
 		if (rdev) {
-			struct request_queue *q;
-
 			ret = ppl_validate_rdev(rdev);
 			if (ret)
 				goto err;
 
-			q = bdev_get_queue(rdev->bdev);
-			if (test_bit(QUEUE_FLAG_WC, &q->queue_flags))
-				need_cache_flush = true;
 			ppl_init_child_log(log, rdev);
 		}
 	}
 
-	if (need_cache_flush)
-		pr_warn("md/raid:%s: Volatile write-back cache should be disabled on all member drives when using PPL!\n",
-			mdname(mddev));
-
 	/* load and possibly recover the logs from the member disks */
 	ret = ppl_load(ppl_conf);