raid5: handle io error of batch list

If io error happens in any stripe of a batch list, the batch list will be
split, then normal process will run for the stripes in the list.

Signed-off-by: Shaohua Li <shli@fusionio.com>
Signed-off-by: NeilBrown <neilb@suse.de>
diff --git a/drivers/md/raid5.c b/drivers/md/raid5.c
index 717189e..54f3cb3 100644
--- a/drivers/md/raid5.c
+++ b/drivers/md/raid5.c
@@ -1070,6 +1070,9 @@
 			pr_debug("skip op %ld on disc %d for sector %llu\n",
 				bi->bi_rw, i, (unsigned long long)sh->sector);
 			clear_bit(R5_LOCKED, &sh->dev[i].flags);
+			if (sh->batch_head)
+				set_bit(STRIPE_BATCH_ERR,
+					&sh->batch_head->state);
 			set_bit(STRIPE_HANDLE, &sh->state);
 		}
 
@@ -2380,6 +2383,9 @@
 	}
 	rdev_dec_pending(rdev, conf->mddev);
 
+	if (sh->batch_head && !uptodate)
+		set_bit(STRIPE_BATCH_ERR, &sh->batch_head->state);
+
 	if (!test_and_clear_bit(R5_DOUBLE_LOCKED, &sh->dev[i].flags))
 		clear_bit(R5_LOCKED, &sh->dev[i].flags);
 	set_bit(STRIPE_HANDLE, &sh->state);
@@ -4124,6 +4130,46 @@
 	return 0;
 }
 
+static void check_break_stripe_batch_list(struct stripe_head *sh)
+{
+	struct stripe_head *head_sh, *next;
+	int i;
+
+	if (!test_and_clear_bit(STRIPE_BATCH_ERR, &sh->state))
+		return;
+
+	head_sh = sh;
+	do {
+		sh = list_first_entry(&sh->batch_list,
+				      struct stripe_head, batch_list);
+		BUG_ON(sh == head_sh);
+	} while (!test_bit(STRIPE_DEGRADED, &sh->state));
+
+	while (sh != head_sh) {
+		next = list_first_entry(&sh->batch_list,
+					struct stripe_head, batch_list);
+		list_del_init(&sh->batch_list);
+
+		sh->state = head_sh->state & ~((1 << STRIPE_ACTIVE) |
+					       (1 << STRIPE_PREREAD_ACTIVE) |
+					       (1 << STRIPE_DEGRADED));
+		sh->check_state = head_sh->check_state;
+		sh->reconstruct_state = head_sh->reconstruct_state;
+		for (i = 0; i < sh->disks; i++)
+			sh->dev[i].flags = head_sh->dev[i].flags &
+				(~((1 << R5_WriteError) | (1 << R5_Overlap)));
+
+		spin_lock_irq(&sh->stripe_lock);
+		sh->batch_head = NULL;
+		spin_unlock_irq(&sh->stripe_lock);
+
+		set_bit(STRIPE_HANDLE, &sh->state);
+		release_stripe(sh);
+
+		sh = next;
+	}
+}
+
 static void handle_stripe(struct stripe_head *sh)
 {
 	struct stripe_head_state s;
@@ -4146,6 +4192,8 @@
 		return;
 	}
 
+	check_break_stripe_batch_list(sh);
+
 	if (test_bit(STRIPE_SYNC_REQUESTED, &sh->state)) {
 		spin_lock(&sh->stripe_lock);
 		/* Cannot process 'sync' concurrently with 'discard' */