Merge tag 'dm-3.16-changes' of git://git.kernel.org/pub/scm/linux/kernel/git/device-mapper/linux-dm

Pull device mapper updates from Mike Snitzer:
 "This pull request is later than I'd have liked because I was waiting
  for some performance data to help finally justify sending the
  long-standing dm-crypt CPU scalability improvements upstream.

  Unfortunately we came up short, so those dm-crypt changes will
  continue to wait, but it seems we're not far off.

   . Add dm_accept_partial_bio interface to DM core to allow DM targets
     to process only a portion of a bio, the remainder being sent in the
     next bio.  This enables the old dm snapshot-origin target to split
     write bios only on chunk boundaries; read bios are now sent to the
     origin device unchanged (the boundary arithmetic is sketched after
     this list).

   . Add DM core support for disabling WRITE SAME if the underlying SCSI
     layer disables it due to command failure.

   . Reduce lock contention in DM's bio-prison.

   . A few small cleanups and fixes to dm-thin and dm-era"
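
The boundary arithmetic mentioned in the first item, as a minimal
userspace C sketch (the chunk size must be a non-zero power of two;
names here are illustrative, not kernel API):

#include <assert.h>
#include <stdio.h>

/*
 * How many sectors remain before the next chunk boundary?  With a
 * power-of-two chunk size, the offset within the chunk is a simple
 * mask, so a write bio only ever needs splitting at these boundaries.
 */
static unsigned sectors_to_boundary(unsigned long long sector,
				    unsigned chunk_sectors)
{
	/* the mask trick requires a non-zero power-of-two chunk size */
	assert(chunk_sectors && !(chunk_sectors & (chunk_sectors - 1)));
	return chunk_sectors - ((unsigned)sector & (chunk_sectors - 1));
}

int main(void)
{
	/* 16-sector chunks: a write at sector 21 may take 11 sectors */
	printf("%u\n", sectors_to_boundary(21, 16));
	return 0;
}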

* tag 'dm-3.16-changes' of git://git.kernel.org/pub/scm/linux/kernel/git/device-mapper/linux-dm:
  dm thin: update discard_granularity to reflect the thin-pool blocksize
  dm bio prison: implement per bucket locking in the dm_bio_prison hash table
  dm: remove symbol export for dm_set_device_limits
  dm: disable WRITE SAME if it fails
  dm era: check for a non-NULL metadata object before closing it
  dm thin: return ENOSPC instead of EIO when error_if_no_space is enabled
  dm thin: cleanup noflush_work to use a proper completion
  dm snapshot: do not split read bios sent to snapshot-origin target
  dm snapshot: allocate a per-target structure for snapshot-origin target
  dm: introduce dm_accept_partial_bio
  dm: change sector_count member in clone_info from sector_t to unsigned
diff --git a/drivers/md/dm-bio-prison.c b/drivers/md/dm-bio-prison.c
index 85f0b70..f752d12 100644
--- a/drivers/md/dm-bio-prison.c
+++ b/drivers/md/dm-bio-prison.c
@@ -14,13 +14,17 @@
 
 /*----------------------------------------------------------------*/
 
-struct dm_bio_prison {
+struct bucket {
 	spinlock_t lock;
+	struct hlist_head cells;
+};
+
+struct dm_bio_prison {
 	mempool_t *cell_pool;
 
 	unsigned nr_buckets;
 	unsigned hash_mask;
-	struct hlist_head *cells;
+	struct bucket *buckets;
 };
 
 /*----------------------------------------------------------------*/
@@ -40,6 +44,12 @@
 
 static struct kmem_cache *_cell_cache;
 
+static void init_bucket(struct bucket *b)
+{
+	spin_lock_init(&b->lock);
+	INIT_HLIST_HEAD(&b->cells);
+}
+
 /*
  * @nr_cells should be the number of cells you want in use _concurrently_.
  * Don't confuse it with the number of distinct keys.
@@ -49,13 +59,12 @@
 	unsigned i;
 	uint32_t nr_buckets = calc_nr_buckets(nr_cells);
 	size_t len = sizeof(struct dm_bio_prison) +
-		(sizeof(struct hlist_head) * nr_buckets);
+		(sizeof(struct bucket) * nr_buckets);
 	struct dm_bio_prison *prison = kmalloc(len, GFP_KERNEL);
 
 	if (!prison)
 		return NULL;
 
-	spin_lock_init(&prison->lock);
 	prison->cell_pool = mempool_create_slab_pool(nr_cells, _cell_cache);
 	if (!prison->cell_pool) {
 		kfree(prison);
@@ -64,9 +73,9 @@
 
 	prison->nr_buckets = nr_buckets;
 	prison->hash_mask = nr_buckets - 1;
-	prison->cells = (struct hlist_head *) (prison + 1);
+	prison->buckets = (struct bucket *) (prison + 1);
 	for (i = 0; i < nr_buckets; i++)
-		INIT_HLIST_HEAD(prison->cells + i);
+		init_bucket(prison->buckets + i);
 
 	return prison;
 }
@@ -107,40 +116,44 @@
 		       (lhs->block == rhs->block);
 }
 
-static struct dm_bio_prison_cell *__search_bucket(struct hlist_head *bucket,
+static struct bucket *get_bucket(struct dm_bio_prison *prison,
+				 struct dm_cell_key *key)
+{
+	return prison->buckets + hash_key(prison, key);
+}
+
+static struct dm_bio_prison_cell *__search_bucket(struct bucket *b,
 						  struct dm_cell_key *key)
 {
 	struct dm_bio_prison_cell *cell;
 
-	hlist_for_each_entry(cell, bucket, list)
+	hlist_for_each_entry(cell, &b->cells, list)
 		if (keys_equal(&cell->key, key))
 			return cell;
 
 	return NULL;
 }
 
-static void __setup_new_cell(struct dm_bio_prison *prison,
+static void __setup_new_cell(struct bucket *b,
 			     struct dm_cell_key *key,
 			     struct bio *holder,
-			     uint32_t hash,
 			     struct dm_bio_prison_cell *cell)
 {
 	memcpy(&cell->key, key, sizeof(cell->key));
 	cell->holder = holder;
 	bio_list_init(&cell->bios);
-	hlist_add_head(&cell->list, prison->cells + hash);
+	hlist_add_head(&cell->list, &b->cells);
 }
 
-static int __bio_detain(struct dm_bio_prison *prison,
+static int __bio_detain(struct bucket *b,
 			struct dm_cell_key *key,
 			struct bio *inmate,
 			struct dm_bio_prison_cell *cell_prealloc,
 			struct dm_bio_prison_cell **cell_result)
 {
-	uint32_t hash = hash_key(prison, key);
 	struct dm_bio_prison_cell *cell;
 
-	cell = __search_bucket(prison->cells + hash, key);
+	cell = __search_bucket(b, key);
 	if (cell) {
 		if (inmate)
 			bio_list_add(&cell->bios, inmate);
@@ -148,7 +161,7 @@
 		return 1;
 	}
 
-	__setup_new_cell(prison, key, inmate, hash, cell_prealloc);
+	__setup_new_cell(b, key, inmate, cell_prealloc);
 	*cell_result = cell_prealloc;
 	return 0;
 }
@@ -161,10 +174,11 @@
 {
 	int r;
 	unsigned long flags;
+	struct bucket *b = get_bucket(prison, key);
 
-	spin_lock_irqsave(&prison->lock, flags);
-	r = __bio_detain(prison, key, inmate, cell_prealloc, cell_result);
-	spin_unlock_irqrestore(&prison->lock, flags);
+	spin_lock_irqsave(&b->lock, flags);
+	r = __bio_detain(b, key, inmate, cell_prealloc, cell_result);
+	spin_unlock_irqrestore(&b->lock, flags);
 
 	return r;
 }
@@ -208,10 +222,11 @@
 		     struct bio_list *bios)
 {
 	unsigned long flags;
+	struct bucket *b = get_bucket(prison, &cell->key);
 
-	spin_lock_irqsave(&prison->lock, flags);
+	spin_lock_irqsave(&b->lock, flags);
 	__cell_release(cell, bios);
-	spin_unlock_irqrestore(&prison->lock, flags);
+	spin_unlock_irqrestore(&b->lock, flags);
 }
 EXPORT_SYMBOL_GPL(dm_cell_release);
 
@@ -230,28 +245,25 @@
 			       struct bio_list *inmates)
 {
 	unsigned long flags;
+	struct bucket *b = get_bucket(prison, &cell->key);
 
-	spin_lock_irqsave(&prison->lock, flags);
+	spin_lock_irqsave(&b->lock, flags);
 	__cell_release_no_holder(cell, inmates);
-	spin_unlock_irqrestore(&prison->lock, flags);
+	spin_unlock_irqrestore(&b->lock, flags);
 }
 EXPORT_SYMBOL_GPL(dm_cell_release_no_holder);
 
 void dm_cell_error(struct dm_bio_prison *prison,
-		   struct dm_bio_prison_cell *cell)
+		   struct dm_bio_prison_cell *cell, int error)
 {
 	struct bio_list bios;
 	struct bio *bio;
-	unsigned long flags;
 
 	bio_list_init(&bios);
-
-	spin_lock_irqsave(&prison->lock, flags);
-	__cell_release(cell, &bios);
-	spin_unlock_irqrestore(&prison->lock, flags);
+	dm_cell_release(prison, cell, &bios);
 
 	while ((bio = bio_list_pop(&bios)))
-		bio_io_error(bio);
+		bio_endio(bio, error);
 }
 EXPORT_SYMBOL_GPL(dm_cell_error);
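
The dm-bio-prison hunks above replace the single prison-wide spinlock
with one lock per hash bucket, so detains and releases that hash to
different buckets no longer contend.  A rough userspace analogue of the
pattern, with pthread mutexes standing in for spinlocks and all names
invented for illustration:

#include <pthread.h>
#include <stddef.h>

#define NR_BUCKETS 64	/* power of two, so key & (NR_BUCKETS - 1) hashes */

struct cell {
	unsigned long long key;
	struct cell *next;
};

struct bucket {
	pthread_mutex_t lock;	/* protects only this bucket's chain */
	struct cell *cells;
};

static struct bucket buckets[NR_BUCKETS];

static void init_buckets(void)
{
	size_t i;

	for (i = 0; i < NR_BUCKETS; i++) {
		pthread_mutex_init(&buckets[i].lock, NULL);
		buckets[i].cells = NULL;
	}
}

static struct bucket *get_bucket(unsigned long long key)
{
	return &buckets[key & (NR_BUCKETS - 1)];
}

/* look up or insert under one bucket's lock; other buckets stay free */
static struct cell *detain(struct cell *prealloc)
{
	struct bucket *b = get_bucket(prealloc->key);
	struct cell *c;

	pthread_mutex_lock(&b->lock);
	for (c = b->cells; c; c = c->next)
		if (c->key == prealloc->key)
			break;			/* key already detained */
	if (!c) {
		prealloc->next = b->cells;	/* claim the new cell */
		b->cells = prealloc;
		c = prealloc;
	}
	pthread_mutex_unlock(&b->lock);
	return c;
}

int main(void)
{
	struct cell a = { .key = 42 }, b = { .key = 42 };

	init_buckets();
	return detain(&a) == detain(&b) ? 0 : 1; /* same key, same cell */
}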
 
diff --git a/drivers/md/dm-bio-prison.h b/drivers/md/dm-bio-prison.h
index 3f83319..6805a14 100644
--- a/drivers/md/dm-bio-prison.h
+++ b/drivers/md/dm-bio-prison.h
@@ -85,7 +85,7 @@
 			       struct dm_bio_prison_cell *cell,
 			       struct bio_list *inmates);
 void dm_cell_error(struct dm_bio_prison *prison,
-		   struct dm_bio_prison_cell *cell);
+		   struct dm_bio_prison_cell *cell, int error);
 
 /*----------------------------------------------------------------*/
 
diff --git a/drivers/md/dm-era-target.c b/drivers/md/dm-era-target.c
index 414dad4..ad913cd 100644
--- a/drivers/md/dm-era-target.c
+++ b/drivers/md/dm-era-target.c
@@ -1391,7 +1391,8 @@
 
 static void era_destroy(struct era *era)
 {
-	metadata_close(era->md);
+	if (era->md)
+		metadata_close(era->md);
 
 	if (era->wq)
 		destroy_workqueue(era->wq);
diff --git a/drivers/md/dm-mpath.c b/drivers/md/dm-mpath.c
index ebfa411..3f6fd9d 100644
--- a/drivers/md/dm-mpath.c
+++ b/drivers/md/dm-mpath.c
@@ -1242,17 +1242,8 @@
 	if (!error && !clone->errors)
 		return 0;	/* I/O complete */
 
-	if (noretry_error(error)) {
-		if ((clone->cmd_flags & REQ_WRITE_SAME) &&
-		    !clone->q->limits.max_write_same_sectors) {
-			struct queue_limits *limits;
-
-			/* device doesn't really support WRITE SAME, disable it */
-			limits = dm_get_queue_limits(dm_table_get_md(m->ti->table));
-			limits->max_write_same_sectors = 0;
-		}
+	if (noretry_error(error))
 		return error;
-	}
 
 	if (mpio->pgpath)
 		fail_path(mpio->pgpath);
diff --git a/drivers/md/dm-snap.c b/drivers/md/dm-snap.c
index 8e0caed..5bd2290 100644
--- a/drivers/md/dm-snap.c
+++ b/drivers/md/dm-snap.c
@@ -2141,6 +2141,11 @@
  * Origin: maps a linear range of a device, with hooks for snapshotting.
  */
 
+struct dm_origin {
+	struct dm_dev *dev;
+	unsigned split_boundary;
+};
+
 /*
  * Construct an origin mapping: <dev_path>
  * The context for an origin is merely a 'struct dm_dev *'
@@ -2149,41 +2154,65 @@
 static int origin_ctr(struct dm_target *ti, unsigned int argc, char **argv)
 {
 	int r;
-	struct dm_dev *dev;
+	struct dm_origin *o;
 
 	if (argc != 1) {
 		ti->error = "origin: incorrect number of arguments";
 		return -EINVAL;
 	}
 
-	r = dm_get_device(ti, argv[0], dm_table_get_mode(ti->table), &dev);
-	if (r) {
-		ti->error = "Cannot get target device";
-		return r;
+	o = kmalloc(sizeof(struct dm_origin), GFP_KERNEL);
+	if (!o) {
+		ti->error = "Cannot allocate private origin structure";
+		r = -ENOMEM;
+		goto bad_alloc;
 	}
 
-	ti->private = dev;
+	r = dm_get_device(ti, argv[0], dm_table_get_mode(ti->table), &o->dev);
+	if (r) {
+		ti->error = "Cannot get target device";
+		goto bad_open;
+	}
+
+	ti->private = o;
 	ti->num_flush_bios = 1;
 
 	return 0;
+
+bad_open:
+	kfree(o);
+bad_alloc:
+	return r;
 }
 
 static void origin_dtr(struct dm_target *ti)
 {
-	struct dm_dev *dev = ti->private;
-	dm_put_device(ti, dev);
+	struct dm_origin *o = ti->private;
+	dm_put_device(ti, o->dev);
+	kfree(o);
 }
 
 static int origin_map(struct dm_target *ti, struct bio *bio)
 {
-	struct dm_dev *dev = ti->private;
-	bio->bi_bdev = dev->bdev;
+	struct dm_origin *o = ti->private;
+	unsigned available_sectors;
 
-	if (bio->bi_rw & REQ_FLUSH)
+	bio->bi_bdev = o->dev->bdev;
+
+	if (unlikely(bio->bi_rw & REQ_FLUSH))
 		return DM_MAPIO_REMAPPED;
 
+	if (bio_rw(bio) != WRITE)
+		return DM_MAPIO_REMAPPED;
+
+	available_sectors = o->split_boundary -
+		((unsigned)bio->bi_iter.bi_sector & (o->split_boundary - 1));
+
+	if (bio_sectors(bio) > available_sectors)
+		dm_accept_partial_bio(bio, available_sectors);
+
 	/* Only tell snapshots if this is a write */
-	return (bio_rw(bio) == WRITE) ? do_origin(dev, bio) : DM_MAPIO_REMAPPED;
+	return do_origin(o->dev, bio);
 }
 
 /*
@@ -2192,15 +2221,15 @@
  */
 static void origin_resume(struct dm_target *ti)
 {
-	struct dm_dev *dev = ti->private;
+	struct dm_origin *o = ti->private;
 
-	ti->max_io_len = get_origin_minimum_chunksize(dev->bdev);
+	o->split_boundary = get_origin_minimum_chunksize(o->dev->bdev);
 }
 
 static void origin_status(struct dm_target *ti, status_type_t type,
 			  unsigned status_flags, char *result, unsigned maxlen)
 {
-	struct dm_dev *dev = ti->private;
+	struct dm_origin *o = ti->private;
 
 	switch (type) {
 	case STATUSTYPE_INFO:
@@ -2208,7 +2237,7 @@
 		break;
 
 	case STATUSTYPE_TABLE:
-		snprintf(result, maxlen, "%s", dev->name);
+		snprintf(result, maxlen, "%s", o->dev->name);
 		break;
 	}
 }
@@ -2216,13 +2245,13 @@
 static int origin_merge(struct dm_target *ti, struct bvec_merge_data *bvm,
 			struct bio_vec *biovec, int max_size)
 {
-	struct dm_dev *dev = ti->private;
-	struct request_queue *q = bdev_get_queue(dev->bdev);
+	struct dm_origin *o = ti->private;
+	struct request_queue *q = bdev_get_queue(o->dev->bdev);
 
 	if (!q->merge_bvec_fn)
 		return max_size;
 
-	bvm->bi_bdev = dev->bdev;
+	bvm->bi_bdev = o->dev->bdev;
 
 	return min(max_size, q->merge_bvec_fn(q, bvm, biovec));
 }
@@ -2230,9 +2259,9 @@
 static int origin_iterate_devices(struct dm_target *ti,
 				  iterate_devices_callout_fn fn, void *data)
 {
-	struct dm_dev *dev = ti->private;
+	struct dm_origin *o = ti->private;
 
-	return fn(ti, dev, 0, ti->len, data);
+	return fn(ti, o->dev, 0, ti->len, data);
 }
 
 static struct target_type origin_target = {
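
origin_ctr() above also shows the usual kernel goto-unwind error style:
each failure jumps to a label that releases only what was set up before
it.  A condensed userspace sketch of the same shape, with stand-in
helpers in place of dm_get_device()/dm_put_device() (all names
hypothetical):

#include <stdlib.h>
#include <string.h>

struct origin {
	char *dev;	/* stands in for struct dm_dev * */
};

/* stand-ins for device open/close; a real target uses dm_get_device() */
static char *open_dev(const char *path) { return strdup(path); }
static void close_dev(char *dev) { free(dev); }

static int origin_create(const char *path, struct origin **result)
{
	struct origin *o;
	int r;

	o = malloc(sizeof(*o));
	if (!o) {
		r = -1;			/* -ENOMEM in the kernel version */
		goto bad_alloc;
	}

	o->dev = open_dev(path);
	if (!o->dev) {
		r = -1;
		goto bad_open;		/* undo only the allocation */
	}

	*result = o;
	return 0;

bad_open:
	free(o);
bad_alloc:
	return r;
}

static void origin_destroy(struct origin *o)
{
	close_dev(o->dev);	/* tear down in reverse order of setup */
	free(o);
}

int main(void)
{
	struct origin *o;

	if (origin_create("/dev/sdb", &o) == 0)
		origin_destroy(o);
	return 0;
}
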
diff --git a/drivers/md/dm-table.c b/drivers/md/dm-table.c
index 50601ec..5f59f1e 100644
--- a/drivers/md/dm-table.c
+++ b/drivers/md/dm-table.c
@@ -465,8 +465,8 @@
 }
 EXPORT_SYMBOL(dm_get_device);
 
-int dm_set_device_limits(struct dm_target *ti, struct dm_dev *dev,
-			 sector_t start, sector_t len, void *data)
+static int dm_set_device_limits(struct dm_target *ti, struct dm_dev *dev,
+				sector_t start, sector_t len, void *data)
 {
 	struct queue_limits *limits = data;
 	struct block_device *bdev = dev->bdev;
@@ -499,7 +499,6 @@
 					  (unsigned int) (PAGE_SIZE >> 9));
 	return 0;
 }
-EXPORT_SYMBOL_GPL(dm_set_device_limits);
 
 /*
  * Decrement a device's use count and remove it if necessary.
diff --git a/drivers/md/dm-thin.c b/drivers/md/dm-thin.c
index 242ac2e..fc9c848 100644
--- a/drivers/md/dm-thin.c
+++ b/drivers/md/dm-thin.c
@@ -310,13 +310,18 @@
 	wake_worker(pool);
 }
 
-static void cell_error(struct pool *pool,
-		       struct dm_bio_prison_cell *cell)
+static void cell_error_with_code(struct pool *pool,
+				 struct dm_bio_prison_cell *cell, int error_code)
 {
-	dm_cell_error(pool->prison, cell);
+	dm_cell_error(pool->prison, cell, error_code);
 	dm_bio_prison_free_cell(pool->prison, cell);
 }
 
+static void cell_error(struct pool *pool, struct dm_bio_prison_cell *cell)
+{
+	cell_error_with_code(pool, cell, -EIO);
+}
+
 /*----------------------------------------------------------------*/
 
 /*
@@ -1027,7 +1032,7 @@
 	spin_unlock_irqrestore(&tc->lock, flags);
 }
 
-static bool should_error_unserviceable_bio(struct pool *pool)
+static int should_error_unserviceable_bio(struct pool *pool)
 {
 	enum pool_mode m = get_pool_mode(pool);
 
@@ -1035,25 +1040,27 @@
 	case PM_WRITE:
 		/* Shouldn't get here */
 		DMERR_LIMIT("bio unserviceable, yet pool is in PM_WRITE mode");
-		return true;
+		return -EIO;
 
 	case PM_OUT_OF_DATA_SPACE:
-		return pool->pf.error_if_no_space;
+		return pool->pf.error_if_no_space ? -ENOSPC : 0;
 
 	case PM_READ_ONLY:
 	case PM_FAIL:
-		return true;
+		return -EIO;
 	default:
 		/* Shouldn't get here */
 		DMERR_LIMIT("bio unserviceable, yet pool has an unknown mode");
-		return true;
+		return -EIO;
 	}
 }
 
 static void handle_unserviceable_bio(struct pool *pool, struct bio *bio)
 {
-	if (should_error_unserviceable_bio(pool))
-		bio_io_error(bio);
+	int error = should_error_unserviceable_bio(pool);
+
+	if (error)
+		bio_endio(bio, error);
 	else
 		retry_on_resume(bio);
 }
@@ -1062,18 +1069,21 @@
 {
 	struct bio *bio;
 	struct bio_list bios;
+	int error;
 
-	if (should_error_unserviceable_bio(pool)) {
-		cell_error(pool, cell);
+	error = should_error_unserviceable_bio(pool);
+	if (error) {
+		cell_error_with_code(pool, cell, error);
 		return;
 	}
 
 	bio_list_init(&bios);
 	cell_release(pool, cell, &bios);
 
-	if (should_error_unserviceable_bio(pool))
+	error = should_error_unserviceable_bio(pool);
+	if (error)
 		while ((bio = bio_list_pop(&bios)))
-			bio_io_error(bio);
+			bio_endio(bio, error);
 	else
 		while ((bio = bio_list_pop(&bios)))
 			retry_on_resume(bio);
@@ -1610,47 +1620,63 @@
 
 /*----------------------------------------------------------------*/
 
-struct noflush_work {
+struct pool_work {
 	struct work_struct worker;
-	struct thin_c *tc;
-
-	atomic_t complete;
-	wait_queue_head_t wait;
+	struct completion complete;
 };
 
-static void complete_noflush_work(struct noflush_work *w)
+static struct pool_work *to_pool_work(struct work_struct *ws)
 {
-	atomic_set(&w->complete, 1);
-	wake_up(&w->wait);
+	return container_of(ws, struct pool_work, worker);
+}
+
+static void pool_work_complete(struct pool_work *pw)
+{
+	complete(&pw->complete);
+}
+
+static void pool_work_wait(struct pool_work *pw, struct pool *pool,
+			   void (*fn)(struct work_struct *))
+{
+	INIT_WORK_ONSTACK(&pw->worker, fn);
+	init_completion(&pw->complete);
+	queue_work(pool->wq, &pw->worker);
+	wait_for_completion(&pw->complete);
+}
+
+/*----------------------------------------------------------------*/
+
+struct noflush_work {
+	struct pool_work pw;
+	struct thin_c *tc;
+};
+
+static struct noflush_work *to_noflush(struct work_struct *ws)
+{
+	return container_of(to_pool_work(ws), struct noflush_work, pw);
 }
 
 static void do_noflush_start(struct work_struct *ws)
 {
-	struct noflush_work *w = container_of(ws, struct noflush_work, worker);
+	struct noflush_work *w = to_noflush(ws);
 	w->tc->requeue_mode = true;
 	requeue_io(w->tc);
-	complete_noflush_work(w);
+	pool_work_complete(&w->pw);
 }
 
 static void do_noflush_stop(struct work_struct *ws)
 {
-	struct noflush_work *w = container_of(ws, struct noflush_work, worker);
+	struct noflush_work *w = to_noflush(ws);
 	w->tc->requeue_mode = false;
-	complete_noflush_work(w);
+	pool_work_complete(&w->pw);
 }
 
 static void noflush_work(struct thin_c *tc, void (*fn)(struct work_struct *))
 {
 	struct noflush_work w;
 
-	INIT_WORK_ONSTACK(&w.worker, fn);
 	w.tc = tc;
-	atomic_set(&w.complete, 0);
-	init_waitqueue_head(&w.wait);
-
-	queue_work(tc->pool->wq, &w.worker);
-
-	wait_event(w.wait, atomic_read(&w.complete));
+	pool_work_wait(&w.pw, tc->pool, fn);
 }
 
 /*----------------------------------------------------------------*/
@@ -3068,7 +3094,8 @@
 	 */
 	if (pt->adjusted_pf.discard_passdown) {
 		data_limits = &bdev_get_queue(pt->data_dev->bdev)->limits;
-		limits->discard_granularity = data_limits->discard_granularity;
+		limits->discard_granularity = max(data_limits->discard_granularity,
+						  pool->sectors_per_block << SECTOR_SHIFT);
 	} else
 		limits->discard_granularity = pool->sectors_per_block << SECTOR_SHIFT;
 }
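
The noflush_work rework above replaces an open-coded atomic flag plus
wait queue with a struct completion embedded in a reusable pool_work:
the caller queues the work on the pool workqueue and blocks until the
worker signals completion.  A userspace analogue of the queue-and-wait
pattern, with a POSIX semaphore standing in for struct completion and a
plain thread standing in for the workqueue (illustrative only):

#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>

struct pool_work {
	void (*fn)(struct pool_work *pw);
	sem_t complete;		/* stands in for struct completion */
};

static void *worker(void *arg)
{
	struct pool_work *pw = arg;

	pw->fn(pw);			/* do the work... */
	sem_post(&pw->complete);	/* ...then signal the waiter */
	return NULL;
}

static void do_noflush_start(struct pool_work *pw)
{
	(void)pw;	/* the real version flips tc->requeue_mode here */
	puts("work ran on the worker thread");
}

int main(void)
{
	pthread_t t;
	struct pool_work pw = { .fn = do_noflush_start };

	sem_init(&pw.complete, 0, 0);
	pthread_create(&t, NULL, worker, &pw);	/* ~ queue_work() */
	sem_wait(&pw.complete);			/* ~ wait_for_completion() */
	pthread_join(&t, NULL);
	sem_destroy(&pw.complete);
	return 0;
}
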
diff --git a/drivers/md/dm.c b/drivers/md/dm.c
index aa9e093..437d990 100644
--- a/drivers/md/dm.c
+++ b/drivers/md/dm.c
@@ -755,6 +755,14 @@
 	}
 }
 
+static void disable_write_same(struct mapped_device *md)
+{
+	struct queue_limits *limits = dm_get_queue_limits(md);
+
+	/* device doesn't really support WRITE SAME, disable it */
+	limits->max_write_same_sectors = 0;
+}
+
 static void clone_endio(struct bio *bio, int error)
 {
 	int r = 0;
@@ -783,6 +791,10 @@
 		}
 	}
 
+	if (unlikely(r == -EREMOTEIO && (bio->bi_rw & REQ_WRITE_SAME) &&
+		     !bdev_get_queue(bio->bi_bdev)->limits.max_write_same_sectors))
+		disable_write_same(md);
+
 	free_tio(md, tio);
 	dec_pending(io, error);
 }
@@ -977,6 +989,10 @@
 			r = rq_end_io(tio->ti, clone, error, &tio->info);
 	}
 
+	if (unlikely(r == -EREMOTEIO && (clone->cmd_flags & REQ_WRITE_SAME) &&
+		     !clone->q->limits.max_write_same_sectors))
+		disable_write_same(tio->md);
+
 	if (r <= 0)
 		/* The target wants to complete the I/O */
 		dm_end_request(clone, r);
@@ -1110,6 +1126,46 @@
 }
 EXPORT_SYMBOL_GPL(dm_set_target_max_io_len);
 
+/*
+ * A target may call dm_accept_partial_bio only from the map routine.  It is
+ * allowed for all bio types except REQ_FLUSH.
+ *
+ * dm_accept_partial_bio informs DM that the target wants to process only
+ * n_sectors additional sectors of the bio; the rest of the data should be
+ * sent in a subsequent bio.
+ *
+ * A diagram that explains the arithmetic:
+ * +--------------------+---------------+-------+
+ * |         1          |       2       |   3   |
+ * +--------------------+---------------+-------+
+ *
+ * <-------------- *tio->len_ptr --------------->
+ *                      <------- bi_size ------->
+ *                      <-- n_sectors -->
+ *
+ * Region 1 was already iterated over with bio_advance or similar function.
+ *	(it may be empty if the target doesn't use bio_advance)
+ * Region 2 is the remaining bio size that the target wants to process.
+ *	(it may be empty if region 1 is non-empty, although there is no reason
+ *	 to make it empty)
+ * The target requires that region 3 is to be sent in the next bio.
+ *
+ * If the target wants to receive multiple copies of the bio (via num_*bios, etc),
+ * the partially processed part (the sum of regions 1+2) must be the same for all
+ * copies of the bio.
+ */
+void dm_accept_partial_bio(struct bio *bio, unsigned n_sectors)
+{
+	struct dm_target_io *tio = container_of(bio, struct dm_target_io, clone);
+	unsigned bi_size = bio->bi_iter.bi_size >> SECTOR_SHIFT;
+	BUG_ON(bio->bi_rw & REQ_FLUSH);
+	BUG_ON(bi_size > *tio->len_ptr);
+	BUG_ON(n_sectors > bi_size);
+	*tio->len_ptr -= bi_size - n_sectors;
+	bio->bi_iter.bi_size = n_sectors << SECTOR_SHIFT;
+}
+EXPORT_SYMBOL_GPL(dm_accept_partial_bio);
+
 static void __map_bio(struct dm_target_io *tio)
 {
 	int r;
@@ -1152,10 +1208,10 @@
 	struct bio *bio;
 	struct dm_io *io;
 	sector_t sector;
-	sector_t sector_count;
+	unsigned sector_count;
 };
 
-static void bio_setup_sector(struct bio *bio, sector_t sector, sector_t len)
+static void bio_setup_sector(struct bio *bio, sector_t sector, unsigned len)
 {
 	bio->bi_iter.bi_sector = sector;
 	bio->bi_iter.bi_size = to_bytes(len);
@@ -1200,11 +1256,13 @@
 
 static void __clone_and_map_simple_bio(struct clone_info *ci,
 				       struct dm_target *ti,
-				       unsigned target_bio_nr, sector_t len)
+				       unsigned target_bio_nr, unsigned *len)
 {
 	struct dm_target_io *tio = alloc_tio(ci, ti, ci->bio->bi_max_vecs, target_bio_nr);
 	struct bio *clone = &tio->clone;
 
+	tio->len_ptr = len;
+
 	/*
 	 * Discard requests require the bio's inline iovecs be initialized.
 	 * ci->bio->bi_max_vecs is BIO_INLINE_VECS anyway, for both flush
@@ -1212,13 +1270,13 @@
 	 */
 	 __bio_clone_fast(clone, ci->bio);
 	if (len)
-		bio_setup_sector(clone, ci->sector, len);
+		bio_setup_sector(clone, ci->sector, *len);
 
 	__map_bio(tio);
 }
 
 static void __send_duplicate_bios(struct clone_info *ci, struct dm_target *ti,
-				  unsigned num_bios, sector_t len)
+				  unsigned num_bios, unsigned *len)
 {
 	unsigned target_bio_nr;
 
@@ -1233,13 +1291,13 @@
 
 	BUG_ON(bio_has_data(ci->bio));
 	while ((ti = dm_table_get_target(ci->map, target_nr++)))
-		__send_duplicate_bios(ci, ti, ti->num_flush_bios, 0);
+		__send_duplicate_bios(ci, ti, ti->num_flush_bios, NULL);
 
 	return 0;
 }
 
 static void __clone_and_map_data_bio(struct clone_info *ci, struct dm_target *ti,
-				     sector_t sector, unsigned len)
+				     sector_t sector, unsigned *len)
 {
 	struct bio *bio = ci->bio;
 	struct dm_target_io *tio;
@@ -1254,7 +1312,8 @@
 
 	for (target_bio_nr = 0; target_bio_nr < num_target_bios; target_bio_nr++) {
 		tio = alloc_tio(ci, ti, 0, target_bio_nr);
-		clone_bio(tio, bio, sector, len);
+		tio->len_ptr = len;
+		clone_bio(tio, bio, sector, *len);
 		__map_bio(tio);
 	}
 }
@@ -1283,7 +1342,7 @@
 				       is_split_required_fn is_split_required)
 {
 	struct dm_target *ti;
-	sector_t len;
+	unsigned len;
 	unsigned num_bios;
 
 	do {
@@ -1302,11 +1361,11 @@
 			return -EOPNOTSUPP;
 
 		if (is_split_required && !is_split_required(ti))
-			len = min(ci->sector_count, max_io_len_target_boundary(ci->sector, ti));
+			len = min((sector_t)ci->sector_count, max_io_len_target_boundary(ci->sector, ti));
 		else
-			len = min(ci->sector_count, max_io_len(ci->sector, ti));
+			len = min((sector_t)ci->sector_count, max_io_len(ci->sector, ti));
 
-		__send_duplicate_bios(ci, ti, num_bios, len);
+		__send_duplicate_bios(ci, ti, num_bios, &len);
 
 		ci->sector += len;
 	} while (ci->sector_count -= len);
@@ -1345,7 +1404,7 @@
 
 	len = min_t(sector_t, max_io_len(ci->sector, ti), ci->sector_count);
 
-	__clone_and_map_data_bio(ci, ti, ci->sector, len);
+	__clone_and_map_data_bio(ci, ti, ci->sector, &len);
 
 	ci->sector += len;
 	ci->sector_count -= len;
@@ -1439,7 +1498,6 @@
 	 * just one page.
 	 */
 	else if (queue_max_hw_sectors(q) <= PAGE_SIZE >> 9)
-
 		max_size = 0;
 
 out:
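
The len_ptr plumbing above is what lets dm_accept_partial_bio() shrink
both the clone and the shared length driving the split loop.  A
userspace sketch of that bookkeeping, using the region numbering from
the comment's diagram (names are illustrative):

#include <assert.h>
#include <stdio.h>

/*
 * The target accepts only n_sectors of the remaining bio, so the
 * shared length (*len_ptr, regions 1+2+3) shrinks by the part pushed
 * into the next bio (region 3), and the bio itself is trimmed to
 * region 2.
 */
static void accept_partial(unsigned *len_ptr, unsigned *bi_size,
			   unsigned n_sectors)
{
	assert(*bi_size <= *len_ptr);
	assert(n_sectors <= *bi_size);
	*len_ptr -= *bi_size - n_sectors;	/* drop region 3 */
	*bi_size = n_sectors;			/* keep region 2 only */
}

int main(void)
{
	unsigned len = 48;	/* regions 1+2+3, in sectors */
	unsigned bi_size = 32;	/* regions 2+3: region 1 already advanced */

	accept_partial(&len, &bi_size, 20);	/* accept 20 of 32 sectors */
	printf("len=%u bi_size=%u\n", len, bi_size);	/* len=36 bi_size=20 */
	return 0;
}
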
diff --git a/include/linux/device-mapper.h b/include/linux/device-mapper.h
index 63da56e..e1707de 100644
--- a/include/linux/device-mapper.h
+++ b/include/linux/device-mapper.h
@@ -115,12 +115,6 @@
 
 void dm_error(const char *message);
 
-/*
- * Combine device limits.
- */
-int dm_set_device_limits(struct dm_target *ti, struct dm_dev *dev,
-			 sector_t start, sector_t len, void *data);
-
 struct dm_dev {
 	struct block_device *bdev;
 	fmode_t mode;
@@ -132,7 +126,7 @@
  * are opened/closed correctly.
  */
 int dm_get_device(struct dm_target *ti, const char *path, fmode_t mode,
-						 struct dm_dev **result);
+		  struct dm_dev **result);
 void dm_put_device(struct dm_target *ti, struct dm_dev *d);
 
 /*
@@ -291,6 +285,7 @@
 	struct dm_io *io;
 	struct dm_target *ti;
 	unsigned target_bio_nr;
+	unsigned *len_ptr;
 	struct bio clone;
 };
 
@@ -401,6 +396,7 @@
 struct gendisk *dm_disk(struct mapped_device *md);
 int dm_suspended(struct dm_target *ti);
 int dm_noflush_suspending(struct dm_target *ti);
+void dm_accept_partial_bio(struct bio *bio, unsigned n_sectors);
 union map_info *dm_get_rq_mapinfo(struct request *rq);
 
 struct queue_limits *dm_get_queue_limits(struct mapped_device *md);