Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client

Pull Ceph update from Sage Weil:
 "There are a few different groups of commits here.  The largest is
  Alex's ongoing work to enable the coming RBD features (cloning,
  striping).  There is some cleanup in libceph that goes along with it.

  Cyril and David have fixed some problems with NFS reexport (leaking
  dentries and page locks), and there is a batch of patches from Yan
  fixing problems with the fs client when running against a clustered
  MDS.  There are a few bug fixes mixed in for good measure, many of
  which will be going to the stable trees once they're upstream.

  My apologies for the late pull.  There is still a gremlin in the rbd
  map/unmap code and I was hoping to include the fix for that as well,
  but we haven't been able to confirm the fix is correct yet; I'll send
  that in a separate pull once it's nailed down."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client: (68 commits)
  rbd: get rid of rbd_{get,put}_dev()
  libceph: register request before unregister linger
  libceph: don't use rb_init_node() in ceph_osdc_alloc_request()
  libceph: init event->node in ceph_osdc_create_event()
  libceph: init osd->o_node in create_osd()
  libceph: report connection fault with warning
  libceph: socket can close in any connection state
  rbd: don't use ENOTSUPP
  rbd: remove linger unconditionally
  rbd: get rid of RBD_MAX_SEG_NAME_LEN
  libceph: avoid using freed osd in __kick_osd_requests()
  ceph: don't reference req after put
  rbd: do not allow remove of mounted-on image
  libceph: Unlock unprocessed pages in start_read() error path
  ceph: call handle_cap_grant() for cap import message
  ceph: Fix __ceph_do_pending_vmtruncate
  ceph: Don't add dirty inode to dirty list if caps is in migration
  ceph: Fix infinite loop in __wake_requests
  ceph: Don't update i_max_size when handling non-auth cap
  bdi_register: add __printf verification, fix arg mismatch
  ...
diff --git a/Documentation/ABI/testing/sysfs-bus-rbd b/Documentation/ABI/testing/sysfs-bus-rbd
index 1cf2adf..cd9213c 100644
--- a/Documentation/ABI/testing/sysfs-bus-rbd
+++ b/Documentation/ABI/testing/sysfs-bus-rbd
@@ -70,6 +70,10 @@
 
 	A directory per each snapshot
 
+parent
+
+	Information identifying the pool, image, and snapshot id for
+	the parent image in a layered rbd image (format 2 only).
 
 Entries under /sys/bus/rbd/devices/<dev-id>/snap_<snap-name>
 -------------------------------------------------------------
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index bb3d9be..89576a0 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -61,15 +61,29 @@
 
 #define RBD_MINORS_PER_MAJOR	256		/* max minors per blkdev */
 
-#define RBD_MAX_SNAP_NAME_LEN	32
+#define RBD_SNAP_DEV_NAME_PREFIX	"snap_"
+#define RBD_MAX_SNAP_NAME_LEN	\
+			(NAME_MAX - (sizeof (RBD_SNAP_DEV_NAME_PREFIX) - 1))
+
 #define RBD_MAX_SNAP_COUNT	510	/* allows max snapc to fit in 4KB */
 #define RBD_MAX_OPT_LEN		1024
 
 #define RBD_SNAP_HEAD_NAME	"-"
 
+/* This allows a single page to hold an image name sent by OSD */
+#define RBD_IMAGE_NAME_LEN_MAX	(PAGE_SIZE - sizeof (__le32) - 1)
 #define RBD_IMAGE_ID_LEN_MAX	64
+
 #define RBD_OBJ_PREFIX_LEN_MAX	64
 
+/* Feature bits */
+
+#define RBD_FEATURE_LAYERING      1
+
+/* Features supported by this (client software) implementation. */
+
+#define RBD_FEATURES_ALL          (0)
+
 /*
  * An RBD device name will be "rbd#", where the "rbd" comes from
  * RBD_DRV_NAME above, and # is a unique integer identifier.
@@ -101,6 +115,27 @@
 	u64 obj_version;
 };
 
+/*
+ * An rbd image specification.
+ *
+ * The tuple (pool_id, image_id, snap_id) is sufficient to uniquely
+ * identify an image.
+ */
+struct rbd_spec {
+	u64		pool_id;
+	char		*pool_name;
+
+	char		*image_id;
+	size_t		image_id_len;
+	char		*image_name;
+	size_t		image_name_len;
+
+	u64		snap_id;
+	char		*snap_name;
+
+	struct kref	kref;
+};
+
 struct rbd_options {
 	bool	read_only;
 };
@@ -155,11 +190,8 @@
 };
 
 struct rbd_mapping {
-	char                    *snap_name;
-	u64                     snap_id;
 	u64                     size;
 	u64                     features;
-	bool                    snap_exists;
 	bool			read_only;
 };
 
@@ -173,7 +205,6 @@
 	struct gendisk		*disk;		/* blkdev's gendisk and rq */
 
 	u32			image_format;	/* Either 1 or 2 */
-	struct rbd_options	rbd_opts;
 	struct rbd_client	*rbd_client;
 
 	char			name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
@@ -181,17 +212,17 @@
 	spinlock_t		lock;		/* queue lock */
 
 	struct rbd_image_header	header;
-	char			*image_id;
-	size_t			image_id_len;
-	char			*image_name;
-	size_t			image_name_len;
+	bool                    exists;
+	struct rbd_spec		*spec;
+
 	char			*header_name;
-	char			*pool_name;
-	int			pool_id;
 
 	struct ceph_osd_event   *watch_event;
 	struct ceph_osd_request *watch_request;
 
+	struct rbd_spec		*parent_spec;
+	u64			parent_overlap;
+
 	/* protects updating the header */
 	struct rw_semaphore     header_rwsem;
 
@@ -204,6 +235,7 @@
 
 	/* sysfs related */
 	struct device		dev;
+	unsigned long		open_count;
 };
 
 static DEFINE_MUTEX(ctl_mutex);	  /* Serialize open/close/setup/teardown */
@@ -218,7 +250,7 @@
 static int rbd_dev_snaps_register(struct rbd_device *rbd_dev);
 
 static void rbd_dev_release(struct device *dev);
-static void __rbd_remove_snap_dev(struct rbd_snap *snap);
+static void rbd_remove_snap_dev(struct rbd_snap *snap);
 
 static ssize_t rbd_add(struct bus_type *bus, const char *buf,
 		       size_t count);
@@ -258,17 +290,8 @@
 #  define rbd_assert(expr)	((void) 0)
 #endif /* !RBD_DEBUG */
 
-static struct device *rbd_get_dev(struct rbd_device *rbd_dev)
-{
-	return get_device(&rbd_dev->dev);
-}
-
-static void rbd_put_dev(struct rbd_device *rbd_dev)
-{
-	put_device(&rbd_dev->dev);
-}
-
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver);
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver);
 
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
@@ -277,8 +300,11 @@
 	if ((mode & FMODE_WRITE) && rbd_dev->mapping.read_only)
 		return -EROFS;
 
-	rbd_get_dev(rbd_dev);
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	(void) get_device(&rbd_dev->dev);
 	set_device_ro(bdev, rbd_dev->mapping.read_only);
+	rbd_dev->open_count++;
+	mutex_unlock(&ctl_mutex);
 
 	return 0;
 }
@@ -287,7 +313,11 @@
 {
 	struct rbd_device *rbd_dev = disk->private_data;
 
-	rbd_put_dev(rbd_dev);
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	rbd_assert(rbd_dev->open_count > 0);
+	rbd_dev->open_count--;
+	put_device(&rbd_dev->dev);
+	mutex_unlock(&ctl_mutex);
 
 	return 0;
 }
@@ -388,7 +418,7 @@
 static match_table_t rbd_opts_tokens = {
 	/* int args above */
 	/* string args above */
-	{Opt_read_only, "mapping.read_only"},
+	{Opt_read_only, "read_only"},
 	{Opt_read_only, "ro"},		/* Alternate spelling */
 	{Opt_read_write, "read_write"},
 	{Opt_read_write, "rw"},		/* Alternate spelling */
@@ -441,33 +471,17 @@
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
-static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
-				size_t mon_addr_len, char *options)
+static struct rbd_client *rbd_get_client(struct ceph_options *ceph_opts)
 {
-	struct rbd_options *rbd_opts = &rbd_dev->rbd_opts;
-	struct ceph_options *ceph_opts;
 	struct rbd_client *rbdc;
 
-	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
-
-	ceph_opts = ceph_parse_options(options, mon_addr,
-					mon_addr + mon_addr_len,
-					parse_rbd_opts_token, rbd_opts);
-	if (IS_ERR(ceph_opts))
-		return PTR_ERR(ceph_opts);
-
 	rbdc = rbd_client_find(ceph_opts);
-	if (rbdc) {
-		/* using an existing client */
+	if (rbdc)	/* using an existing client */
 		ceph_destroy_options(ceph_opts);
-	} else {
+	else
 		rbdc = rbd_client_create(ceph_opts);
-		if (IS_ERR(rbdc))
-			return PTR_ERR(rbdc);
-	}
-	rbd_dev->rbd_client = rbdc;
 
-	return 0;
+	return rbdc;
 }
 
 /*
@@ -492,10 +506,10 @@
  * Drop reference to ceph client node. If it's not referenced anymore, release
  * it.
  */
-static void rbd_put_client(struct rbd_device *rbd_dev)
+static void rbd_put_client(struct rbd_client *rbdc)
 {
-	kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
-	rbd_dev->rbd_client = NULL;
+	if (rbdc)
+		kref_put(&rbdc->kref, rbd_client_release);
 }
 
 /*
@@ -524,6 +538,16 @@
 	if (memcmp(&ondisk->text, RBD_HEADER_TEXT, sizeof (RBD_HEADER_TEXT)))
 		return false;
 
+	/* The bio layer requires at least sector-sized I/O */
+
+	if (ondisk->options.order < SECTOR_SHIFT)
+		return false;
+
+	/* If we use u64 in a few spots we may be able to loosen this */
+
+	if (ondisk->options.order > 8 * sizeof (int) - 1)
+		return false;
+
 	/*
 	 * The size of a snapshot header has to fit in a size_t, and
 	 * that limits the number of snapshots.
@@ -635,6 +659,20 @@
 	return -ENOMEM;
 }
 
+static const char *rbd_snap_name(struct rbd_device *rbd_dev, u64 snap_id)
+{
+	struct rbd_snap *snap;
+
+	if (snap_id == CEPH_NOSNAP)
+		return RBD_SNAP_HEAD_NAME;
+
+	list_for_each_entry(snap, &rbd_dev->snaps, node)
+		if (snap_id == snap->id)
+			return snap->name;
+
+	return NULL;
+}
+
 static int snap_by_name(struct rbd_device *rbd_dev, const char *snap_name)
 {
 
@@ -642,7 +680,7 @@
 
 	list_for_each_entry(snap, &rbd_dev->snaps, node) {
 		if (!strcmp(snap_name, snap->name)) {
-			rbd_dev->mapping.snap_id = snap->id;
+			rbd_dev->spec->snap_id = snap->id;
 			rbd_dev->mapping.size = snap->size;
 			rbd_dev->mapping.features = snap->features;
 
@@ -653,26 +691,23 @@
 	return -ENOENT;
 }
 
-static int rbd_dev_set_mapping(struct rbd_device *rbd_dev, char *snap_name)
+static int rbd_dev_set_mapping(struct rbd_device *rbd_dev)
 {
 	int ret;
 
-	if (!memcmp(snap_name, RBD_SNAP_HEAD_NAME,
+	if (!memcmp(rbd_dev->spec->snap_name, RBD_SNAP_HEAD_NAME,
 		    sizeof (RBD_SNAP_HEAD_NAME))) {
-		rbd_dev->mapping.snap_id = CEPH_NOSNAP;
+		rbd_dev->spec->snap_id = CEPH_NOSNAP;
 		rbd_dev->mapping.size = rbd_dev->header.image_size;
 		rbd_dev->mapping.features = rbd_dev->header.features;
-		rbd_dev->mapping.snap_exists = false;
-		rbd_dev->mapping.read_only = rbd_dev->rbd_opts.read_only;
 		ret = 0;
 	} else {
-		ret = snap_by_name(rbd_dev, snap_name);
+		ret = snap_by_name(rbd_dev, rbd_dev->spec->snap_name);
 		if (ret < 0)
 			goto done;
-		rbd_dev->mapping.snap_exists = true;
 		rbd_dev->mapping.read_only = true;
 	}
-	rbd_dev->mapping.snap_name = snap_name;
+	rbd_dev->exists = true;
 done:
 	return ret;
 }
@@ -695,13 +730,13 @@
 	u64 segment;
 	int ret;
 
-	name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
+	name = kmalloc(MAX_OBJ_NAME_SIZE + 1, GFP_NOIO);
 	if (!name)
 		return NULL;
 	segment = offset >> rbd_dev->header.obj_order;
-	ret = snprintf(name, RBD_MAX_SEG_NAME_LEN, "%s.%012llx",
+	ret = snprintf(name, MAX_OBJ_NAME_SIZE + 1, "%s.%012llx",
 			rbd_dev->header.object_prefix, segment);
-	if (ret < 0 || ret >= RBD_MAX_SEG_NAME_LEN) {
+	if (ret < 0 || ret > MAX_OBJ_NAME_SIZE) {
 		pr_err("error formatting segment name for #%llu (%d)\n",
 			segment, ret);
 		kfree(name);
@@ -800,77 +835,144 @@
 }
 
 /*
- * bio_chain_clone - clone a chain of bios up to a certain length.
- * might return a bio_pair that will need to be released.
+ * Clone a portion of a bio, starting at the given byte offset
+ * and continuing for the number of bytes indicated.
  */
-static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
-				   struct bio_pair **bp,
-				   int len, gfp_t gfpmask)
+static struct bio *bio_clone_range(struct bio *bio_src,
+					unsigned int offset,
+					unsigned int len,
+					gfp_t gfpmask)
 {
-	struct bio *old_chain = *old;
-	struct bio *new_chain = NULL;
-	struct bio *tail;
-	int total = 0;
+	struct bio_vec *bv;
+	unsigned int resid;
+	unsigned short idx;
+	unsigned int voff;
+	unsigned short end_idx;
+	unsigned short vcnt;
+	struct bio *bio;
 
-	if (*bp) {
-		bio_pair_release(*bp);
-		*bp = NULL;
+	/* Handle the easy case for the caller */
+
+	if (!offset && len == bio_src->bi_size)
+		return bio_clone(bio_src, gfpmask);
+
+	if (WARN_ON_ONCE(!len))
+		return NULL;
+	if (WARN_ON_ONCE(len > bio_src->bi_size))
+		return NULL;
+	if (WARN_ON_ONCE(offset > bio_src->bi_size - len))
+		return NULL;
+
+	/* Find first affected segment... */
+
+	resid = offset;
+	__bio_for_each_segment(bv, bio_src, idx, 0) {
+		if (resid < bv->bv_len)
+			break;
+		resid -= bv->bv_len;
+	}
+	voff = resid;
+
+	/* ...and the last affected segment */
+
+	resid += len;
+	__bio_for_each_segment(bv, bio_src, end_idx, idx) {
+		if (resid <= bv->bv_len)
+			break;
+		resid -= bv->bv_len;
+	}
+	vcnt = end_idx - idx + 1;
+
+	/* Build the clone */
+
+	bio = bio_alloc(gfpmask, (unsigned int) vcnt);
+	if (!bio)
+		return NULL;	/* ENOMEM */
+
+	bio->bi_bdev = bio_src->bi_bdev;
+	bio->bi_sector = bio_src->bi_sector + (offset >> SECTOR_SHIFT);
+	bio->bi_rw = bio_src->bi_rw;
+	bio->bi_flags |= 1 << BIO_CLONED;
+
+	/*
+	 * Copy over our part of the bio_vec, then update the first
+	 * and last (or only) entries.
+	 */
+	memcpy(&bio->bi_io_vec[0], &bio_src->bi_io_vec[idx],
+			vcnt * sizeof (struct bio_vec));
+	bio->bi_io_vec[0].bv_offset += voff;
+	if (vcnt > 1) {
+		bio->bi_io_vec[0].bv_len -= voff;
+		bio->bi_io_vec[vcnt - 1].bv_len = resid;
+	} else {
+		bio->bi_io_vec[0].bv_len = len;
 	}
 
-	while (old_chain && (total < len)) {
-		struct bio *tmp;
+	bio->bi_vcnt = vcnt;
+	bio->bi_size = len;
+	bio->bi_idx = 0;
 
-		tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
-		if (!tmp)
-			goto err_out;
-		gfpmask &= ~__GFP_WAIT;	/* can't wait after the first */
+	return bio;
+}
 
-		if (total + old_chain->bi_size > len) {
-			struct bio_pair *bp;
+/*
+ * Clone a portion of a bio chain, starting at the given byte offset
+ * into the first bio in the source chain and continuing for the
+ * number of bytes indicated.  The result is another bio chain of
+ * exactly the given length, or a null pointer on error.
+ *
+ * The bio_src and offset parameters are both in-out.  On entry they
+ * refer to the first source bio and the offset into that bio where
+ * the start of data to be cloned is located.
+ *
+ * On return, bio_src is updated to refer to the bio in the source
+ * chain that contains first un-cloned byte, and *offset will
+ * contain the offset of that byte within that bio.
+ */
+static struct bio *bio_chain_clone_range(struct bio **bio_src,
+					unsigned int *offset,
+					unsigned int len,
+					gfp_t gfpmask)
+{
+	struct bio *bi = *bio_src;
+	unsigned int off = *offset;
+	struct bio *chain = NULL;
+	struct bio **end;
 
-			/*
-			 * this split can only happen with a single paged bio,
-			 * split_bio will BUG_ON if this is not the case
-			 */
-			dout("bio_chain_clone split! total=%d remaining=%d"
-			     "bi_size=%u\n",
-			     total, len - total, old_chain->bi_size);
+	/* Build up a chain of clone bios up to the limit */
 
-			/* split the bio. We'll release it either in the next
-			   call, or it will have to be released outside */
-			bp = bio_split(old_chain, (len - total) / SECTOR_SIZE);
-			if (!bp)
-				goto err_out;
+	if (!bi || off >= bi->bi_size || !len)
+		return NULL;		/* Nothing to clone */
 
-			__bio_clone(tmp, &bp->bio1);
+	end = &chain;
+	while (len) {
+		unsigned int bi_size;
+		struct bio *bio;
 
-			*next = &bp->bio2;
-		} else {
-			__bio_clone(tmp, old_chain);
-			*next = old_chain->bi_next;
+		if (!bi)
+			goto out_err;	/* EINVAL; ran out of bio's */
+		bi_size = min_t(unsigned int, bi->bi_size - off, len);
+		bio = bio_clone_range(bi, off, bi_size, gfpmask);
+		if (!bio)
+			goto out_err;	/* ENOMEM */
+
+		*end = bio;
+		end = &bio->bi_next;
+
+		off += bi_size;
+		if (off == bi->bi_size) {
+			bi = bi->bi_next;
+			off = 0;
 		}
-
-		tmp->bi_bdev = NULL;
-		tmp->bi_next = NULL;
-		if (new_chain)
-			tail->bi_next = tmp;
-		else
-			new_chain = tmp;
-		tail = tmp;
-		old_chain = old_chain->bi_next;
-
-		total += tmp->bi_size;
+		len -= bi_size;
 	}
+	*bio_src = bi;
+	*offset = off;
 
-	rbd_assert(total == len);
+	return chain;
+out_err:
+	bio_chain_put(chain);
 
-	*old = old_chain;
-
-	return new_chain;
-
-err_out:
-	dout("bio_chain_clone with err\n");
-	bio_chain_put(new_chain);
 	return NULL;
 }
 
@@ -988,8 +1090,9 @@
 		req_data->coll_index = coll_index;
 	}
 
-	dout("rbd_do_request object_name=%s ofs=%llu len=%llu\n", object_name,
-		(unsigned long long) ofs, (unsigned long long) len);
+	dout("rbd_do_request object_name=%s ofs=%llu len=%llu coll=%p[%d]\n",
+		object_name, (unsigned long long) ofs,
+		(unsigned long long) len, coll, coll_index);
 
 	osdc = &rbd_dev->rbd_client->client->osdc;
 	req = ceph_osdc_alloc_request(osdc, flags, snapc, ops,
@@ -1019,7 +1122,7 @@
 	layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
 	layout->fl_stripe_count = cpu_to_le32(1);
 	layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
-	layout->fl_pg_pool = cpu_to_le32(rbd_dev->pool_id);
+	layout->fl_pg_pool = cpu_to_le32((int) rbd_dev->spec->pool_id);
 	ret = ceph_calc_raw_layout(osdc, layout, snapid, ofs, &len, &bno,
 				   req, ops);
 	rbd_assert(ret == 0);
@@ -1154,8 +1257,6 @@
 static int rbd_do_op(struct request *rq,
 		     struct rbd_device *rbd_dev,
 		     struct ceph_snap_context *snapc,
-		     u64 snapid,
-		     int opcode, int flags,
 		     u64 ofs, u64 len,
 		     struct bio *bio,
 		     struct rbd_req_coll *coll,
@@ -1167,6 +1268,9 @@
 	int ret;
 	struct ceph_osd_req_op *ops;
 	u32 payload_len;
+	int opcode;
+	int flags;
+	u64 snapid;
 
 	seg_name = rbd_segment_name(rbd_dev, ofs);
 	if (!seg_name)
@@ -1174,7 +1278,18 @@
 	seg_len = rbd_segment_length(rbd_dev, ofs, len);
 	seg_ofs = rbd_segment_offset(rbd_dev, ofs);
 
-	payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
+	if (rq_data_dir(rq) == WRITE) {
+		opcode = CEPH_OSD_OP_WRITE;
+		flags = CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK;
+		snapid = CEPH_NOSNAP;
+		payload_len = seg_len;
+	} else {
+		opcode = CEPH_OSD_OP_READ;
+		flags = CEPH_OSD_FLAG_READ;
+		snapc = NULL;
+		snapid = rbd_dev->spec->snap_id;
+		payload_len = 0;
+	}
 
 	ret = -ENOMEM;
 	ops = rbd_create_rw_ops(1, opcode, payload_len);
@@ -1202,41 +1317,6 @@
 }
 
 /*
- * Request async osd write
- */
-static int rbd_req_write(struct request *rq,
-			 struct rbd_device *rbd_dev,
-			 struct ceph_snap_context *snapc,
-			 u64 ofs, u64 len,
-			 struct bio *bio,
-			 struct rbd_req_coll *coll,
-			 int coll_index)
-{
-	return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
-			 CEPH_OSD_OP_WRITE,
-			 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
-			 ofs, len, bio, coll, coll_index);
-}
-
-/*
- * Request async osd read
- */
-static int rbd_req_read(struct request *rq,
-			 struct rbd_device *rbd_dev,
-			 u64 snapid,
-			 u64 ofs, u64 len,
-			 struct bio *bio,
-			 struct rbd_req_coll *coll,
-			 int coll_index)
-{
-	return rbd_do_op(rq, rbd_dev, NULL,
-			 snapid,
-			 CEPH_OSD_OP_READ,
-			 CEPH_OSD_FLAG_READ,
-			 ofs, len, bio, coll, coll_index);
-}
-
-/*
  * Request sync osd read
  */
 static int rbd_req_sync_read(struct rbd_device *rbd_dev,
@@ -1304,7 +1384,7 @@
 	dout("rbd_watch_cb %s notify_id=%llu opcode=%u\n",
 		rbd_dev->header_name, (unsigned long long) notify_id,
 		(unsigned int) opcode);
-	rc = rbd_refresh_header(rbd_dev, &hver);
+	rc = rbd_dev_refresh(rbd_dev, &hver);
 	if (rc)
 		pr_warning(RBD_DRV_NAME "%d got notification but failed to "
 			   " update snaps: %d\n", rbd_dev->major, rc);
@@ -1460,18 +1540,16 @@
 {
 	struct rbd_device *rbd_dev = q->queuedata;
 	struct request *rq;
-	struct bio_pair *bp = NULL;
 
 	while ((rq = blk_fetch_request(q))) {
 		struct bio *bio;
-		struct bio *rq_bio, *next_bio = NULL;
 		bool do_write;
 		unsigned int size;
-		u64 op_size = 0;
 		u64 ofs;
 		int num_segs, cur_seg = 0;
 		struct rbd_req_coll *coll;
 		struct ceph_snap_context *snapc;
+		unsigned int bio_offset;
 
 		dout("fetched request\n");
 
@@ -1483,10 +1561,6 @@
 
 		/* deduce our operation (read, write) */
 		do_write = (rq_data_dir(rq) == WRITE);
-
-		size = blk_rq_bytes(rq);
-		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
-		rq_bio = rq->bio;
 		if (do_write && rbd_dev->mapping.read_only) {
 			__blk_end_request_all(rq, -EROFS);
 			continue;
@@ -1496,8 +1570,8 @@
 
 		down_read(&rbd_dev->header_rwsem);
 
-		if (rbd_dev->mapping.snap_id != CEPH_NOSNAP &&
-				!rbd_dev->mapping.snap_exists) {
+		if (!rbd_dev->exists) {
+			rbd_assert(rbd_dev->spec->snap_id != CEPH_NOSNAP);
 			up_read(&rbd_dev->header_rwsem);
 			dout("request for non-existent snapshot");
 			spin_lock_irq(q->queue_lock);
@@ -1509,6 +1583,10 @@
 
 		up_read(&rbd_dev->header_rwsem);
 
+		size = blk_rq_bytes(rq);
+		ofs = blk_rq_pos(rq) * SECTOR_SIZE;
+		bio = rq->bio;
+
 		dout("%s 0x%x bytes at 0x%llx\n",
 		     do_write ? "write" : "read",
 		     size, (unsigned long long) blk_rq_pos(rq) * SECTOR_SIZE);
@@ -1528,45 +1606,37 @@
 			continue;
 		}
 
+		bio_offset = 0;
 		do {
-			/* a bio clone to be passed down to OSD req */
+			u64 limit = rbd_segment_length(rbd_dev, ofs, size);
+			unsigned int chain_size;
+			struct bio *bio_chain;
+
+			BUG_ON(limit > (u64) UINT_MAX);
+			chain_size = (unsigned int) limit;
 			dout("rq->bio->bi_vcnt=%hu\n", rq->bio->bi_vcnt);
-			op_size = rbd_segment_length(rbd_dev, ofs, size);
+
 			kref_get(&coll->kref);
-			bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
-					      op_size, GFP_ATOMIC);
-			if (!bio) {
-				rbd_coll_end_req_index(rq, coll, cur_seg,
-						       -ENOMEM, op_size);
-				goto next_seg;
-			}
 
+			/* Pass a cloned bio chain via an osd request */
 
-			/* init OSD command: write or read */
-			if (do_write)
-				rbd_req_write(rq, rbd_dev,
-					      snapc,
-					      ofs,
-					      op_size, bio,
-					      coll, cur_seg);
+			bio_chain = bio_chain_clone_range(&bio,
+						&bio_offset, chain_size,
+						GFP_ATOMIC);
+			if (bio_chain)
+				(void) rbd_do_op(rq, rbd_dev, snapc,
+						ofs, chain_size,
+						bio_chain, coll, cur_seg);
 			else
-				rbd_req_read(rq, rbd_dev,
-					     rbd_dev->mapping.snap_id,
-					     ofs,
-					     op_size, bio,
-					     coll, cur_seg);
-
-next_seg:
-			size -= op_size;
-			ofs += op_size;
+				rbd_coll_end_req_index(rq, coll, cur_seg,
+						       -ENOMEM, chain_size);
+			size -= chain_size;
+			ofs += chain_size;
 
 			cur_seg++;
-			rq_bio = next_bio;
 		} while (size > 0);
 		kref_put(&coll->kref, rbd_coll_release);
 
-		if (bp)
-			bio_pair_release(bp);
 		spin_lock_irq(q->queue_lock);
 
 		ceph_put_snap_context(snapc);
@@ -1576,28 +1646,47 @@
 /*
  * a queue callback. Makes sure that we don't create a bio that spans across
  * multiple osd objects. One exception would be with a single page bios,
- * which we handle later at bio_chain_clone
+ * which we handle later at bio_chain_clone_range()
  */
 static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
 			  struct bio_vec *bvec)
 {
 	struct rbd_device *rbd_dev = q->queuedata;
-	unsigned int chunk_sectors;
-	sector_t sector;
-	unsigned int bio_sectors;
-	int max;
+	sector_t sector_offset;
+	sector_t sectors_per_obj;
+	sector_t obj_sector_offset;
+	int ret;
 
-	chunk_sectors = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
-	sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
-	bio_sectors = bmd->bi_size >> SECTOR_SHIFT;
+	/*
+	 * Find how far into its rbd object the partition-relative
+	 * bio start sector is to offset relative to the enclosing
+	 * device.
+	 */
+	sector_offset = get_start_sect(bmd->bi_bdev) + bmd->bi_sector;
+	sectors_per_obj = 1 << (rbd_dev->header.obj_order - SECTOR_SHIFT);
+	obj_sector_offset = sector_offset & (sectors_per_obj - 1);
 
-	max =  (chunk_sectors - ((sector & (chunk_sectors - 1))
-				 + bio_sectors)) << SECTOR_SHIFT;
-	if (max < 0)
-		max = 0; /* bio_add cannot handle a negative return */
-	if (max <= bvec->bv_len && bio_sectors == 0)
-		return bvec->bv_len;
-	return max;
+	/*
+	 * Compute the number of bytes from that offset to the end
+	 * of the object.  Account for what's already used by the bio.
+	 */
+	ret = (int) (sectors_per_obj - obj_sector_offset) << SECTOR_SHIFT;
+	if (ret > bmd->bi_size)
+		ret -= bmd->bi_size;
+	else
+		ret = 0;
+
+	/*
+	 * Don't send back more than was asked for.  And if the bio
+	 * was empty, let the whole thing through because:  "Note
+	 * that a block device *must* allow a single page to be
+	 * added to an empty bio."
+	 */
+	rbd_assert(bvec->bv_len <= PAGE_SIZE);
+	if (ret > (int) bvec->bv_len || !bmd->bi_size)
+		ret = (int) bvec->bv_len;
+
+	return ret;
 }
 
 static void rbd_free_disk(struct rbd_device *rbd_dev)
@@ -1663,13 +1752,13 @@
 			ret = -ENXIO;
 			pr_warning("short header read for image %s"
 					" (want %zd got %d)\n",
-				rbd_dev->image_name, size, ret);
+				rbd_dev->spec->image_name, size, ret);
 			goto out_err;
 		}
 		if (!rbd_dev_ondisk_valid(ondisk)) {
 			ret = -ENXIO;
 			pr_warning("invalid header for image %s\n",
-				rbd_dev->image_name);
+				rbd_dev->spec->image_name);
 			goto out_err;
 		}
 
@@ -1707,19 +1796,32 @@
 	return ret;
 }
 
-static void __rbd_remove_all_snaps(struct rbd_device *rbd_dev)
+static void rbd_remove_all_snaps(struct rbd_device *rbd_dev)
 {
 	struct rbd_snap *snap;
 	struct rbd_snap *next;
 
 	list_for_each_entry_safe(snap, next, &rbd_dev->snaps, node)
-		__rbd_remove_snap_dev(snap);
+		rbd_remove_snap_dev(snap);
+}
+
+static void rbd_update_mapping_size(struct rbd_device *rbd_dev)
+{
+	sector_t size;
+
+	if (rbd_dev->spec->snap_id != CEPH_NOSNAP)
+		return;
+
+	size = (sector_t) rbd_dev->header.image_size / SECTOR_SIZE;
+	dout("setting size to %llu sectors", (unsigned long long) size);
+	rbd_dev->mapping.size = (u64) size;
+	set_capacity(rbd_dev->disk, size);
 }
 
 /*
  * only read the first part of the ondisk header, without the snaps info
  */
-static int __rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_v1_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
 	int ret;
 	struct rbd_image_header h;
@@ -1730,17 +1832,9 @@
 
 	down_write(&rbd_dev->header_rwsem);
 
-	/* resized? */
-	if (rbd_dev->mapping.snap_id == CEPH_NOSNAP) {
-		sector_t size = (sector_t) h.image_size / SECTOR_SIZE;
-
-		if (size != (sector_t) rbd_dev->mapping.size) {
-			dout("setting size to %llu sectors",
-				(unsigned long long) size);
-			rbd_dev->mapping.size = (u64) size;
-			set_capacity(rbd_dev->disk, size);
-		}
-	}
+	/* Update image size, and check for resize of mapped image */
+	rbd_dev->header.image_size = h.image_size;
+	rbd_update_mapping_size(rbd_dev);
 
 	/* rbd_dev->header.object_prefix shouldn't change */
 	kfree(rbd_dev->header.snap_sizes);
@@ -1768,12 +1862,16 @@
 	return ret;
 }
 
-static int rbd_refresh_header(struct rbd_device *rbd_dev, u64 *hver)
+static int rbd_dev_refresh(struct rbd_device *rbd_dev, u64 *hver)
 {
 	int ret;
 
+	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
-	ret = __rbd_refresh_header(rbd_dev, hver);
+	if (rbd_dev->image_format == 1)
+		ret = rbd_dev_v1_refresh(rbd_dev, hver);
+	else
+		ret = rbd_dev_v2_refresh(rbd_dev, hver);
 	mutex_unlock(&ctl_mutex);
 
 	return ret;
@@ -1885,7 +1983,7 @@
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->pool_name);
+	return sprintf(buf, "%s\n", rbd_dev->spec->pool_name);
 }
 
 static ssize_t rbd_pool_id_show(struct device *dev,
@@ -1893,7 +1991,8 @@
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%d\n", rbd_dev->pool_id);
+	return sprintf(buf, "%llu\n",
+		(unsigned long long) rbd_dev->spec->pool_id);
 }
 
 static ssize_t rbd_name_show(struct device *dev,
@@ -1901,7 +2000,10 @@
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->image_name);
+	if (rbd_dev->spec->image_name)
+		return sprintf(buf, "%s\n", rbd_dev->spec->image_name);
+
+	return sprintf(buf, "(unknown)\n");
 }
 
 static ssize_t rbd_image_id_show(struct device *dev,
@@ -1909,7 +2011,7 @@
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->image_id);
+	return sprintf(buf, "%s\n", rbd_dev->spec->image_id);
 }
 
 /*
@@ -1922,7 +2024,50 @@
 {
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 
-	return sprintf(buf, "%s\n", rbd_dev->mapping.snap_name);
+	return sprintf(buf, "%s\n", rbd_dev->spec->snap_name);
+}
+
+/*
+ * For an rbd v2 image, shows the pool id, image id, and snapshot id
+ * for the parent image.  If there is no parent, simply shows
+ * "(no parent image)".
+ */
+static ssize_t rbd_parent_show(struct device *dev,
+			     struct device_attribute *attr,
+			     char *buf)
+{
+	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
+	struct rbd_spec *spec = rbd_dev->parent_spec;
+	int count;
+	char *bufp = buf;
+
+	if (!spec)
+		return sprintf(buf, "(no parent image)\n");
+
+	count = sprintf(bufp, "pool_id %llu\npool_name %s\n",
+			(unsigned long long) spec->pool_id, spec->pool_name);
+	if (count < 0)
+		return count;
+	bufp += count;
+
+	count = sprintf(bufp, "image_id %s\nimage_name %s\n", spec->image_id,
+			spec->image_name ? spec->image_name : "(unknown)");
+	if (count < 0)
+		return count;
+	bufp += count;
+
+	count = sprintf(bufp, "snap_id %llu\nsnap_name %s\n",
+			(unsigned long long) spec->snap_id, spec->snap_name);
+	if (count < 0)
+		return count;
+	bufp += count;
+
+	count = sprintf(bufp, "overlap %llu\n", rbd_dev->parent_overlap);
+	if (count < 0)
+		return count;
+	bufp += count;
+
+	return (ssize_t) (bufp - buf);
 }
 
 static ssize_t rbd_image_refresh(struct device *dev,
@@ -1933,7 +2078,7 @@
 	struct rbd_device *rbd_dev = dev_to_rbd_dev(dev);
 	int ret;
 
-	ret = rbd_refresh_header(rbd_dev, NULL);
+	ret = rbd_dev_refresh(rbd_dev, NULL);
 
 	return ret < 0 ? ret : size;
 }
@@ -1948,6 +2093,7 @@
 static DEVICE_ATTR(image_id, S_IRUGO, rbd_image_id_show, NULL);
 static DEVICE_ATTR(refresh, S_IWUSR, NULL, rbd_image_refresh);
 static DEVICE_ATTR(current_snap, S_IRUGO, rbd_snap_show, NULL);
+static DEVICE_ATTR(parent, S_IRUGO, rbd_parent_show, NULL);
 
 static struct attribute *rbd_attrs[] = {
 	&dev_attr_size.attr,
@@ -1959,6 +2105,7 @@
 	&dev_attr_name.attr,
 	&dev_attr_image_id.attr,
 	&dev_attr_current_snap.attr,
+	&dev_attr_parent.attr,
 	&dev_attr_refresh.attr,
 	NULL
 };
@@ -2047,6 +2194,74 @@
 	.release	= rbd_snap_dev_release,
 };
 
+static struct rbd_spec *rbd_spec_get(struct rbd_spec *spec)
+{
+	kref_get(&spec->kref);
+
+	return spec;
+}
+
+static void rbd_spec_free(struct kref *kref);
+static void rbd_spec_put(struct rbd_spec *spec)
+{
+	if (spec)
+		kref_put(&spec->kref, rbd_spec_free);
+}
+
+static struct rbd_spec *rbd_spec_alloc(void)
+{
+	struct rbd_spec *spec;
+
+	spec = kzalloc(sizeof (*spec), GFP_KERNEL);
+	if (!spec)
+		return NULL;
+	kref_init(&spec->kref);
+
+	rbd_spec_put(rbd_spec_get(spec));	/* TEMPORARY */
+
+	return spec;
+}
+
+static void rbd_spec_free(struct kref *kref)
+{
+	struct rbd_spec *spec = container_of(kref, struct rbd_spec, kref);
+
+	kfree(spec->pool_name);
+	kfree(spec->image_id);
+	kfree(spec->image_name);
+	kfree(spec->snap_name);
+	kfree(spec);
+}
+
+struct rbd_device *rbd_dev_create(struct rbd_client *rbdc,
+				struct rbd_spec *spec)
+{
+	struct rbd_device *rbd_dev;
+
+	rbd_dev = kzalloc(sizeof (*rbd_dev), GFP_KERNEL);
+	if (!rbd_dev)
+		return NULL;
+
+	spin_lock_init(&rbd_dev->lock);
+	INIT_LIST_HEAD(&rbd_dev->node);
+	INIT_LIST_HEAD(&rbd_dev->snaps);
+	init_rwsem(&rbd_dev->header_rwsem);
+
+	rbd_dev->spec = spec;
+	rbd_dev->rbd_client = rbdc;
+
+	return rbd_dev;
+}
+
+static void rbd_dev_destroy(struct rbd_device *rbd_dev)
+{
+	rbd_spec_put(rbd_dev->parent_spec);
+	kfree(rbd_dev->header_name);
+	rbd_put_client(rbd_dev->rbd_client);
+	rbd_spec_put(rbd_dev->spec);
+	kfree(rbd_dev);
+}
+
 static bool rbd_snap_registered(struct rbd_snap *snap)
 {
 	bool ret = snap->dev.type == &rbd_snap_device_type;
@@ -2057,7 +2272,7 @@
 	return ret;
 }
 
-static void __rbd_remove_snap_dev(struct rbd_snap *snap)
+static void rbd_remove_snap_dev(struct rbd_snap *snap)
 {
 	list_del(&snap->node);
 	if (device_is_registered(&snap->dev))
@@ -2073,7 +2288,7 @@
 	dev->type = &rbd_snap_device_type;
 	dev->parent = parent;
 	dev->release = rbd_snap_dev_release;
-	dev_set_name(dev, "snap_%s", snap->name);
+	dev_set_name(dev, "%s%s", RBD_SNAP_DEV_NAME_PREFIX, snap->name);
 	dout("%s: registering device for snapshot %s\n", __func__, snap->name);
 
 	ret = device_register(dev);
@@ -2189,6 +2404,7 @@
 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
+	ret = 0;    /* rbd_req_sync_exec() can return positive */
 
 	p = reply_buf;
 	rbd_dev->header.object_prefix = ceph_extract_encoded_string(&p,
@@ -2216,6 +2432,7 @@
 		__le64 features;
 		__le64 incompat;
 	} features_buf = { 0 };
+	u64 incompat;
 	int ret;
 
 	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
@@ -2226,6 +2443,11 @@
 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
 	if (ret < 0)
 		return ret;
+
+	incompat = le64_to_cpu(features_buf.incompat);
+	if (incompat & ~RBD_FEATURES_ALL)
+		return -ENXIO;
+
 	*snap_features = le64_to_cpu(features_buf.features);
 
 	dout("  snap_id 0x%016llx features = 0x%016llx incompat = 0x%016llx\n",
@@ -2242,6 +2464,183 @@
 						&rbd_dev->header.features);
 }
 
+static int rbd_dev_v2_parent_info(struct rbd_device *rbd_dev)
+{
+	struct rbd_spec *parent_spec;
+	size_t size;
+	void *reply_buf = NULL;
+	__le64 snapid;
+	void *p;
+	void *end;
+	char *image_id;
+	u64 overlap;
+	size_t len = 0;
+	int ret;
+
+	parent_spec = rbd_spec_alloc();
+	if (!parent_spec)
+		return -ENOMEM;
+
+	size = sizeof (__le64) +				/* pool_id */
+		sizeof (__le32) + RBD_IMAGE_ID_LEN_MAX +	/* image_id */
+		sizeof (__le64) +				/* snap_id */
+		sizeof (__le64);				/* overlap */
+	reply_buf = kmalloc(size, GFP_KERNEL);
+	if (!reply_buf) {
+		ret = -ENOMEM;
+		goto out_err;
+	}
+
+	snapid = cpu_to_le64(CEPH_NOSNAP);
+	ret = rbd_req_sync_exec(rbd_dev, rbd_dev->header_name,
+				"rbd", "get_parent",
+				(char *) &snapid, sizeof (snapid),
+				(char *) reply_buf, size,
+				CEPH_OSD_FLAG_READ, NULL);
+	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
+	if (ret < 0)
+		goto out_err;
+
+	ret = -ERANGE;
+	p = reply_buf;
+	end = (char *) reply_buf + size;
+	ceph_decode_64_safe(&p, end, parent_spec->pool_id, out_err);
+	if (parent_spec->pool_id == CEPH_NOPOOL)
+		goto out;	/* No parent?  No problem. */
+
+	image_id = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+	if (IS_ERR(image_id)) {
+		ret = PTR_ERR(image_id);
+		goto out_err;
+	}
+	parent_spec->image_id = image_id;
+	parent_spec->image_id_len = len;
+	ceph_decode_64_safe(&p, end, parent_spec->snap_id, out_err);
+	ceph_decode_64_safe(&p, end, overlap, out_err);
+
+	rbd_dev->parent_overlap = overlap;
+	rbd_dev->parent_spec = parent_spec;
+	parent_spec = NULL;	/* rbd_dev now owns this */
+out:
+	ret = 0;
+out_err:
+	kfree(reply_buf);
+	rbd_spec_put(parent_spec);
+
+	return ret;
+}
+
+static char *rbd_dev_image_name(struct rbd_device *rbd_dev)
+{
+	size_t image_id_size;
+	char *image_id;
+	void *p;
+	void *end;
+	size_t size;
+	void *reply_buf = NULL;
+	size_t len = 0;
+	char *image_name = NULL;
+	int ret;
+
+	rbd_assert(!rbd_dev->spec->image_name);
+
+	image_id_size = sizeof (__le32) + rbd_dev->spec->image_id_len;
+	image_id = kmalloc(image_id_size, GFP_KERNEL);
+	if (!image_id)
+		return NULL;
+
+	p = image_id;
+	end = (char *) image_id + image_id_size;
+	ceph_encode_string(&p, end, rbd_dev->spec->image_id,
+				(u32) rbd_dev->spec->image_id_len);
+
+	size = sizeof (__le32) + RBD_IMAGE_NAME_LEN_MAX;
+	reply_buf = kmalloc(size, GFP_KERNEL);
+	if (!reply_buf)
+		goto out;
+
+	ret = rbd_req_sync_exec(rbd_dev, RBD_DIRECTORY,
+				"rbd", "dir_get_name",
+				image_id, image_id_size,
+				(char *) reply_buf, size,
+				CEPH_OSD_FLAG_READ, NULL);
+	if (ret < 0)
+		goto out;
+	p = reply_buf;
+	end = (char *) reply_buf + size;
+	image_name = ceph_extract_encoded_string(&p, end, &len, GFP_KERNEL);
+	if (IS_ERR(image_name))
+		image_name = NULL;
+	else
+		dout("%s: name is %s len is %zd\n", __func__, image_name, len);
+out:
+	kfree(reply_buf);
+	kfree(image_id);
+
+	return image_name;
+}
+
+/*
+ * When a parent image gets probed, we only have the pool, image,
+ * and snapshot ids but not the names of any of them.  This call
+ * is made later to fill in those names.  It has to be done after
+ * rbd_dev_snaps_update() has completed because some of the
+ * information (in particular, snapshot name) is not available
+ * until then.
+ */
+static int rbd_dev_probe_update_spec(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc;
+	const char *name;
+	void *reply_buf = NULL;
+	int ret;
+
+	if (rbd_dev->spec->pool_name)
+		return 0;	/* Already have the names */
+
+	/* Look up the pool name */
+
+	osdc = &rbd_dev->rbd_client->client->osdc;
+	name = ceph_pg_pool_name_by_id(osdc->osdmap, rbd_dev->spec->pool_id);
+	if (!name)
+		return -EIO;	/* pool id too large (>= 2^31) */
+
+	rbd_dev->spec->pool_name = kstrdup(name, GFP_KERNEL);
+	if (!rbd_dev->spec->pool_name)
+		return -ENOMEM;
+
+	/* Fetch the image name; tolerate failure here */
+
+	name = rbd_dev_image_name(rbd_dev);
+	if (name) {
+		rbd_dev->spec->image_name_len = strlen(name);
+		rbd_dev->spec->image_name = (char *) name;
+	} else {
+		pr_warning(RBD_DRV_NAME "%d "
+			"unable to get image name for image id %s\n",
+			rbd_dev->major, rbd_dev->spec->image_id);
+	}
+
+	/* Look up the snapshot name. */
+
+	name = rbd_snap_name(rbd_dev, rbd_dev->spec->snap_id);
+	if (!name) {
+		ret = -EIO;
+		goto out_err;
+	}
+	rbd_dev->spec->snap_name = kstrdup(name, GFP_KERNEL);
+	if(!rbd_dev->spec->snap_name)
+		goto out_err;
+
+	return 0;
+out_err:
+	kfree(reply_buf);
+	kfree(rbd_dev->spec->pool_name);
+	rbd_dev->spec->pool_name = NULL;
+
+	return ret;
+}
+
 static int rbd_dev_v2_snap_context(struct rbd_device *rbd_dev, u64 *ver)
 {
 	size_t size;
@@ -2328,7 +2727,6 @@
 	int ret;
 	void *p;
 	void *end;
-	size_t snap_name_len;
 	char *snap_name;
 
 	size = sizeof (__le32) + RBD_MAX_SNAP_NAME_LEN;
@@ -2348,9 +2746,7 @@
 
 	p = reply_buf;
 	end = (char *) reply_buf + size;
-	snap_name_len = 0;
-	snap_name = ceph_extract_encoded_string(&p, end, &snap_name_len,
-				GFP_KERNEL);
+	snap_name = ceph_extract_encoded_string(&p, end, NULL, GFP_KERNEL);
 	if (IS_ERR(snap_name)) {
 		ret = PTR_ERR(snap_name);
 		goto out;
@@ -2397,6 +2793,41 @@
 	return ERR_PTR(-EINVAL);
 }
 
+static int rbd_dev_v2_refresh(struct rbd_device *rbd_dev, u64 *hver)
+{
+	int ret;
+	__u8 obj_order;
+
+	down_write(&rbd_dev->header_rwsem);
+
+	/* Grab old order first, to see if it changes */
+
+	obj_order = rbd_dev->header.obj_order,
+	ret = rbd_dev_v2_image_size(rbd_dev);
+	if (ret)
+		goto out;
+	if (rbd_dev->header.obj_order != obj_order) {
+		ret = -EIO;
+		goto out;
+	}
+	rbd_update_mapping_size(rbd_dev);
+
+	ret = rbd_dev_v2_snap_context(rbd_dev, hver);
+	dout("rbd_dev_v2_snap_context returned %d\n", ret);
+	if (ret)
+		goto out;
+	ret = rbd_dev_snaps_update(rbd_dev);
+	dout("rbd_dev_snaps_update returned %d\n", ret);
+	if (ret)
+		goto out;
+	ret = rbd_dev_snaps_register(rbd_dev);
+	dout("rbd_dev_snaps_register returned %d\n", ret);
+out:
+	up_write(&rbd_dev->header_rwsem);
+
+	return ret;
+}
+
 /*
  * Scan the rbd device's current snapshot list and compare it to the
  * newly-received snapshot context.  Remove any existing snapshots
@@ -2436,12 +2867,12 @@
 
 			/* Existing snapshot not in the new snap context */
 
-			if (rbd_dev->mapping.snap_id == snap->id)
-				rbd_dev->mapping.snap_exists = false;
-			__rbd_remove_snap_dev(snap);
+			if (rbd_dev->spec->snap_id == snap->id)
+				rbd_dev->exists = false;
+			rbd_remove_snap_dev(snap);
 			dout("%ssnap id %llu has been removed\n",
-				rbd_dev->mapping.snap_id == snap->id ?
-								"mapped " : "",
+				rbd_dev->spec->snap_id == snap->id ?
+							"mapped " : "",
 				(unsigned long long) snap->id);
 
 			/* Done with this list entry; advance */
@@ -2559,7 +2990,7 @@
 	do {
 		ret = rbd_req_sync_watch(rbd_dev);
 		if (ret == -ERANGE) {
-			rc = rbd_refresh_header(rbd_dev, NULL);
+			rc = rbd_dev_refresh(rbd_dev, NULL);
 			if (rc < 0)
 				return rc;
 		}
@@ -2621,8 +3052,8 @@
 		struct rbd_device *rbd_dev;
 
 		rbd_dev = list_entry(tmp, struct rbd_device, node);
-		if (rbd_id > max_id)
-			max_id = rbd_id;
+		if (rbd_dev->dev_id > max_id)
+			max_id = rbd_dev->dev_id;
 	}
 	spin_unlock(&rbd_dev_list_lock);
 
@@ -2722,73 +3153,140 @@
 }
 
 /*
- * This fills in the pool_name, image_name, image_name_len, rbd_dev,
- * rbd_md_name, and name fields of the given rbd_dev, based on the
- * list of monitor addresses and other options provided via
- * /sys/bus/rbd/add.  Returns a pointer to a dynamically-allocated
- * copy of the snapshot name to map if successful, or a
- * pointer-coded error otherwise.
+ * Parse the options provided for an "rbd add" (i.e., rbd image
+ * mapping) request.  These arrive via a write to /sys/bus/rbd/add,
+ * and the data written is passed here via a NUL-terminated buffer.
+ * Returns 0 if successful or an error code otherwise.
  *
- * Note: rbd_dev is assumed to have been initially zero-filled.
+ * The information extracted from these options is recorded in
+ * the other parameters which return dynamically-allocated
+ * structures:
+ *  ceph_opts
+ *      The address of a pointer that will refer to a ceph options
+ *      structure.  Caller must release the returned pointer using
+ *      ceph_destroy_options() when it is no longer needed.
+ *  rbd_opts
+ *	Address of an rbd options pointer.  Fully initialized by
+ *	this function; caller must release with kfree().
+ *  spec
+ *	Address of an rbd image specification pointer.  Fully
+ *	initialized by this function based on parsed options.
+ *	Caller must release with rbd_spec_put().
+ *
+ * The options passed take this form:
+ *  <mon_addrs> <options> <pool_name> <image_name> [<snap_id>]
+ * where:
+ *  <mon_addrs>
+ *      A comma-separated list of one or more monitor addresses.
+ *      A monitor address is an ip address, optionally followed
+ *      by a port number (separated by a colon).
+ *        I.e.:  ip1[:port1][,ip2[:port2]...]
+ *  <options>
+ *      A comma-separated list of ceph and/or rbd options.
+ *  <pool_name>
+ *      The name of the rados pool containing the rbd image.
+ *  <image_name>
+ *      The name of the image in that pool to map.
+ *  <snap_id>
+ *      An optional snapshot id.  If provided, the mapping will
+ *      present data from the image at the time that snapshot was
+ *      created.  The image head is used if no snapshot id is
+ *      provided.  Snapshot mappings are always read-only.
  */
-static char *rbd_add_parse_args(struct rbd_device *rbd_dev,
-				const char *buf,
-				const char **mon_addrs,
-				size_t *mon_addrs_size,
-				char *options,
-				size_t options_size)
+static int rbd_add_parse_args(const char *buf,
+				struct ceph_options **ceph_opts,
+				struct rbd_options **opts,
+				struct rbd_spec **rbd_spec)
 {
 	size_t len;
-	char *err_ptr = ERR_PTR(-EINVAL);
-	char *snap_name;
+	char *options;
+	const char *mon_addrs;
+	size_t mon_addrs_size;
+	struct rbd_spec *spec = NULL;
+	struct rbd_options *rbd_opts = NULL;
+	struct ceph_options *copts;
+	int ret;
 
 	/* The first four tokens are required */
 
 	len = next_token(&buf);
 	if (!len)
-		return err_ptr;
-	*mon_addrs_size = len + 1;
-	*mon_addrs = buf;
-
+		return -EINVAL;	/* Missing monitor address(es) */
+	mon_addrs = buf;
+	mon_addrs_size = len + 1;
 	buf += len;
 
-	len = copy_token(&buf, options, options_size);
-	if (!len || len >= options_size)
-		return err_ptr;
+	ret = -EINVAL;
+	options = dup_token(&buf, NULL);
+	if (!options)
+		return -ENOMEM;
+	if (!*options)
+		goto out_err;	/* Missing options */
 
-	err_ptr = ERR_PTR(-ENOMEM);
-	rbd_dev->pool_name = dup_token(&buf, NULL);
-	if (!rbd_dev->pool_name)
-		goto out_err;
+	spec = rbd_spec_alloc();
+	if (!spec)
+		goto out_mem;
 
-	rbd_dev->image_name = dup_token(&buf, &rbd_dev->image_name_len);
-	if (!rbd_dev->image_name)
-		goto out_err;
+	spec->pool_name = dup_token(&buf, NULL);
+	if (!spec->pool_name)
+		goto out_mem;
+	if (!*spec->pool_name)
+		goto out_err;	/* Missing pool name */
 
-	/* Snapshot name is optional */
+	spec->image_name = dup_token(&buf, &spec->image_name_len);
+	if (!spec->image_name)
+		goto out_mem;
+	if (!*spec->image_name)
+		goto out_err;	/* Missing image name */
+
+	/*
+	 * Snapshot name is optional; default is to use "-"
+	 * (indicating the head/no snapshot).
+	 */
 	len = next_token(&buf);
 	if (!len) {
 		buf = RBD_SNAP_HEAD_NAME; /* No snapshot supplied */
 		len = sizeof (RBD_SNAP_HEAD_NAME) - 1;
-	}
-	snap_name = kmalloc(len + 1, GFP_KERNEL);
-	if (!snap_name)
+	} else if (len > RBD_MAX_SNAP_NAME_LEN) {
+		ret = -ENAMETOOLONG;
 		goto out_err;
-	memcpy(snap_name, buf, len);
-	*(snap_name + len) = '\0';
+	}
+	spec->snap_name = kmalloc(len + 1, GFP_KERNEL);
+	if (!spec->snap_name)
+		goto out_mem;
+	memcpy(spec->snap_name, buf, len);
+	*(spec->snap_name + len) = '\0';
 
-dout("    SNAP_NAME is <%s>, len is %zd\n", snap_name, len);
+	/* Initialize all rbd options to the defaults */
 
-	return snap_name;
+	rbd_opts = kzalloc(sizeof (*rbd_opts), GFP_KERNEL);
+	if (!rbd_opts)
+		goto out_mem;
 
+	rbd_opts->read_only = RBD_READ_ONLY_DEFAULT;
+
+	copts = ceph_parse_options(options, mon_addrs,
+					mon_addrs + mon_addrs_size - 1,
+					parse_rbd_opts_token, rbd_opts);
+	if (IS_ERR(copts)) {
+		ret = PTR_ERR(copts);
+		goto out_err;
+	}
+	kfree(options);
+
+	*ceph_opts = copts;
+	*opts = rbd_opts;
+	*rbd_spec = spec;
+
+	return 0;
+out_mem:
+	ret = -ENOMEM;
 out_err:
-	kfree(rbd_dev->image_name);
-	rbd_dev->image_name = NULL;
-	rbd_dev->image_name_len = 0;
-	kfree(rbd_dev->pool_name);
-	rbd_dev->pool_name = NULL;
+	kfree(rbd_opts);
+	rbd_spec_put(spec);
+	kfree(options);
 
-	return err_ptr;
+	return ret;
 }
 
 /*
@@ -2814,14 +3312,22 @@
 	void *p;
 
 	/*
+	 * When probing a parent image, the image id is already
+	 * known (and the image name likely is not).  There's no
+	 * need to fetch the image id again in this case.
+	 */
+	if (rbd_dev->spec->image_id)
+		return 0;
+
+	/*
 	 * First, see if the format 2 image id file exists, and if
 	 * so, get the image's persistent id from it.
 	 */
-	size = sizeof (RBD_ID_PREFIX) + rbd_dev->image_name_len;
+	size = sizeof (RBD_ID_PREFIX) + rbd_dev->spec->image_name_len;
 	object_name = kmalloc(size, GFP_NOIO);
 	if (!object_name)
 		return -ENOMEM;
-	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->image_name);
+	sprintf(object_name, "%s%s", RBD_ID_PREFIX, rbd_dev->spec->image_name);
 	dout("rbd id object name is %s\n", object_name);
 
 	/* Response will be an encoded string, which includes a length */
@@ -2841,17 +3347,18 @@
 	dout("%s: rbd_req_sync_exec returned %d\n", __func__, ret);
 	if (ret < 0)
 		goto out;
+	ret = 0;    /* rbd_req_sync_exec() can return positive */
 
 	p = response;
-	rbd_dev->image_id = ceph_extract_encoded_string(&p,
+	rbd_dev->spec->image_id = ceph_extract_encoded_string(&p,
 						p + RBD_IMAGE_ID_LEN_MAX,
-						&rbd_dev->image_id_len,
+						&rbd_dev->spec->image_id_len,
 						GFP_NOIO);
-	if (IS_ERR(rbd_dev->image_id)) {
-		ret = PTR_ERR(rbd_dev->image_id);
-		rbd_dev->image_id = NULL;
+	if (IS_ERR(rbd_dev->spec->image_id)) {
+		ret = PTR_ERR(rbd_dev->spec->image_id);
+		rbd_dev->spec->image_id = NULL;
 	} else {
-		dout("image_id is %s\n", rbd_dev->image_id);
+		dout("image_id is %s\n", rbd_dev->spec->image_id);
 	}
 out:
 	kfree(response);
@@ -2867,26 +3374,33 @@
 
 	/* Version 1 images have no id; empty string is used */
 
-	rbd_dev->image_id = kstrdup("", GFP_KERNEL);
-	if (!rbd_dev->image_id)
+	rbd_dev->spec->image_id = kstrdup("", GFP_KERNEL);
+	if (!rbd_dev->spec->image_id)
 		return -ENOMEM;
-	rbd_dev->image_id_len = 0;
+	rbd_dev->spec->image_id_len = 0;
 
 	/* Record the header object name for this rbd image. */
 
-	size = rbd_dev->image_name_len + sizeof (RBD_SUFFIX);
+	size = rbd_dev->spec->image_name_len + sizeof (RBD_SUFFIX);
 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
 	if (!rbd_dev->header_name) {
 		ret = -ENOMEM;
 		goto out_err;
 	}
-	sprintf(rbd_dev->header_name, "%s%s", rbd_dev->image_name, RBD_SUFFIX);
+	sprintf(rbd_dev->header_name, "%s%s",
+		rbd_dev->spec->image_name, RBD_SUFFIX);
 
 	/* Populate rbd image metadata */
 
 	ret = rbd_read_header(rbd_dev, &rbd_dev->header);
 	if (ret < 0)
 		goto out_err;
+
+	/* Version 1 images have no parent (no layering) */
+
+	rbd_dev->parent_spec = NULL;
+	rbd_dev->parent_overlap = 0;
+
 	rbd_dev->image_format = 1;
 
 	dout("discovered version 1 image, header name is %s\n",
@@ -2897,8 +3411,8 @@
 out_err:
 	kfree(rbd_dev->header_name);
 	rbd_dev->header_name = NULL;
-	kfree(rbd_dev->image_id);
-	rbd_dev->image_id = NULL;
+	kfree(rbd_dev->spec->image_id);
+	rbd_dev->spec->image_id = NULL;
 
 	return ret;
 }
@@ -2913,12 +3427,12 @@
 	 * Image id was filled in by the caller.  Record the header
 	 * object name for this rbd image.
 	 */
-	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->image_id_len;
+	size = sizeof (RBD_HEADER_PREFIX) + rbd_dev->spec->image_id_len;
 	rbd_dev->header_name = kmalloc(size, GFP_KERNEL);
 	if (!rbd_dev->header_name)
 		return -ENOMEM;
 	sprintf(rbd_dev->header_name, "%s%s",
-			RBD_HEADER_PREFIX, rbd_dev->image_id);
+			RBD_HEADER_PREFIX, rbd_dev->spec->image_id);
 
 	/* Get the size and object order for the image */
 
@@ -2932,12 +3446,20 @@
 	if (ret < 0)
 		goto out_err;
 
-	/* Get the features for the image */
+	/* Get the and check features for the image */
 
 	ret = rbd_dev_v2_features(rbd_dev);
 	if (ret < 0)
 		goto out_err;
 
+	/* If the image supports layering, get the parent info */
+
+	if (rbd_dev->header.features & RBD_FEATURE_LAYERING) {
+		ret = rbd_dev_v2_parent_info(rbd_dev);
+		if (ret < 0)
+			goto out_err;
+	}
+
 	/* crypto and compression type aren't (yet) supported for v2 images */
 
 	rbd_dev->header.crypt_type = 0;
@@ -2955,8 +3477,11 @@
 	dout("discovered version 2 image, header name is %s\n",
 		rbd_dev->header_name);
 
-	return -ENOTSUPP;
+	return 0;
 out_err:
+	rbd_dev->parent_overlap = 0;
+	rbd_spec_put(rbd_dev->parent_spec);
+	rbd_dev->parent_spec = NULL;
 	kfree(rbd_dev->header_name);
 	rbd_dev->header_name = NULL;
 	kfree(rbd_dev->header.object_prefix);
@@ -2965,6 +3490,88 @@
 	return ret;
 }
 
+static int rbd_dev_probe_finish(struct rbd_device *rbd_dev)
+{
+	int ret;
+
+	/* no need to lock here, as rbd_dev is not registered yet */
+	ret = rbd_dev_snaps_update(rbd_dev);
+	if (ret)
+		return ret;
+
+	ret = rbd_dev_probe_update_spec(rbd_dev);
+	if (ret)
+		goto err_out_snaps;
+
+	ret = rbd_dev_set_mapping(rbd_dev);
+	if (ret)
+		goto err_out_snaps;
+
+	/* generate unique id: find highest unique id, add one */
+	rbd_dev_id_get(rbd_dev);
+
+	/* Fill in the device name, now that we have its id. */
+	BUILD_BUG_ON(DEV_NAME_LEN
+			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
+	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
+
+	/* Get our block major device number. */
+
+	ret = register_blkdev(0, rbd_dev->name);
+	if (ret < 0)
+		goto err_out_id;
+	rbd_dev->major = ret;
+
+	/* Set up the blkdev mapping. */
+
+	ret = rbd_init_disk(rbd_dev);
+	if (ret)
+		goto err_out_blkdev;
+
+	ret = rbd_bus_add_dev(rbd_dev);
+	if (ret)
+		goto err_out_disk;
+
+	/*
+	 * At this point cleanup in the event of an error is the job
+	 * of the sysfs code (initiated by rbd_bus_del_dev()).
+	 */
+	down_write(&rbd_dev->header_rwsem);
+	ret = rbd_dev_snaps_register(rbd_dev);
+	up_write(&rbd_dev->header_rwsem);
+	if (ret)
+		goto err_out_bus;
+
+	ret = rbd_init_watch_dev(rbd_dev);
+	if (ret)
+		goto err_out_bus;
+
+	/* Everything's ready.  Announce the disk to the world. */
+
+	add_disk(rbd_dev->disk);
+
+	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
+		(unsigned long long) rbd_dev->mapping.size);
+
+	return ret;
+err_out_bus:
+	/* this will also clean up rest of rbd_dev stuff */
+
+	rbd_bus_del_dev(rbd_dev);
+
+	return ret;
+err_out_disk:
+	rbd_free_disk(rbd_dev);
+err_out_blkdev:
+	unregister_blkdev(rbd_dev->major, rbd_dev->name);
+err_out_id:
+	rbd_dev_id_put(rbd_dev);
+err_out_snaps:
+	rbd_remove_all_snaps(rbd_dev);
+
+	return ret;
+}
+
 /*
  * Probe for the existence of the header object for the given rbd
  * device.  For format 2 images this includes determining the image
@@ -2984,9 +3591,16 @@
 		ret = rbd_dev_v1_probe(rbd_dev);
 	else
 		ret = rbd_dev_v2_probe(rbd_dev);
-	if (ret)
+	if (ret) {
 		dout("probe failed, returning %d\n", ret);
 
+		return ret;
+	}
+
+	ret = rbd_dev_probe_finish(rbd_dev);
+	if (ret)
+		rbd_header_free(&rbd_dev->header);
+
 	return ret;
 }
 
@@ -2994,141 +3608,64 @@
 		       const char *buf,
 		       size_t count)
 {
-	char *options;
 	struct rbd_device *rbd_dev = NULL;
-	const char *mon_addrs = NULL;
-	size_t mon_addrs_size = 0;
+	struct ceph_options *ceph_opts = NULL;
+	struct rbd_options *rbd_opts = NULL;
+	struct rbd_spec *spec = NULL;
+	struct rbd_client *rbdc;
 	struct ceph_osd_client *osdc;
 	int rc = -ENOMEM;
-	char *snap_name;
 
 	if (!try_module_get(THIS_MODULE))
 		return -ENODEV;
 
-	options = kmalloc(count, GFP_KERNEL);
-	if (!options)
-		goto err_out_mem;
-	rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
-	if (!rbd_dev)
-		goto err_out_mem;
-
-	/* static rbd_device initialization */
-	spin_lock_init(&rbd_dev->lock);
-	INIT_LIST_HEAD(&rbd_dev->node);
-	INIT_LIST_HEAD(&rbd_dev->snaps);
-	init_rwsem(&rbd_dev->header_rwsem);
-
 	/* parse add command */
-	snap_name = rbd_add_parse_args(rbd_dev, buf,
-				&mon_addrs, &mon_addrs_size, options, count);
-	if (IS_ERR(snap_name)) {
-		rc = PTR_ERR(snap_name);
-		goto err_out_mem;
-	}
-
-	rc = rbd_get_client(rbd_dev, mon_addrs, mon_addrs_size - 1, options);
+	rc = rbd_add_parse_args(buf, &ceph_opts, &rbd_opts, &spec);
 	if (rc < 0)
+		goto err_out_module;
+
+	rbdc = rbd_get_client(ceph_opts);
+	if (IS_ERR(rbdc)) {
+		rc = PTR_ERR(rbdc);
 		goto err_out_args;
+	}
+	ceph_opts = NULL;	/* rbd_dev client now owns this */
 
 	/* pick the pool */
-	osdc = &rbd_dev->rbd_client->client->osdc;
-	rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
+	osdc = &rbdc->client->osdc;
+	rc = ceph_pg_poolid_by_name(osdc->osdmap, spec->pool_name);
 	if (rc < 0)
 		goto err_out_client;
-	rbd_dev->pool_id = rc;
+	spec->pool_id = (u64) rc;
+
+	rbd_dev = rbd_dev_create(rbdc, spec);
+	if (!rbd_dev)
+		goto err_out_client;
+	rbdc = NULL;		/* rbd_dev now owns this */
+	spec = NULL;		/* rbd_dev now owns this */
+
+	rbd_dev->mapping.read_only = rbd_opts->read_only;
+	kfree(rbd_opts);
+	rbd_opts = NULL;	/* done with this */
 
 	rc = rbd_dev_probe(rbd_dev);
 	if (rc < 0)
-		goto err_out_client;
-	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
-
-	/* no need to lock here, as rbd_dev is not registered yet */
-	rc = rbd_dev_snaps_update(rbd_dev);
-	if (rc)
-		goto err_out_header;
-
-	rc = rbd_dev_set_mapping(rbd_dev, snap_name);
-	if (rc)
-		goto err_out_header;
-
-	/* generate unique id: find highest unique id, add one */
-	rbd_dev_id_get(rbd_dev);
-
-	/* Fill in the device name, now that we have its id. */
-	BUILD_BUG_ON(DEV_NAME_LEN
-			< sizeof (RBD_DRV_NAME) + MAX_INT_FORMAT_WIDTH);
-	sprintf(rbd_dev->name, "%s%d", RBD_DRV_NAME, rbd_dev->dev_id);
-
-	/* Get our block major device number. */
-
-	rc = register_blkdev(0, rbd_dev->name);
-	if (rc < 0)
-		goto err_out_id;
-	rbd_dev->major = rc;
-
-	/* Set up the blkdev mapping. */
-
-	rc = rbd_init_disk(rbd_dev);
-	if (rc)
-		goto err_out_blkdev;
-
-	rc = rbd_bus_add_dev(rbd_dev);
-	if (rc)
-		goto err_out_disk;
-
-	/*
-	 * At this point cleanup in the event of an error is the job
-	 * of the sysfs code (initiated by rbd_bus_del_dev()).
-	 */
-
-	down_write(&rbd_dev->header_rwsem);
-	rc = rbd_dev_snaps_register(rbd_dev);
-	up_write(&rbd_dev->header_rwsem);
-	if (rc)
-		goto err_out_bus;
-
-	rc = rbd_init_watch_dev(rbd_dev);
-	if (rc)
-		goto err_out_bus;
-
-	/* Everything's ready.  Announce the disk to the world. */
-
-	add_disk(rbd_dev->disk);
-
-	pr_info("%s: added with size 0x%llx\n", rbd_dev->disk->disk_name,
-		(unsigned long long) rbd_dev->mapping.size);
+		goto err_out_rbd_dev;
 
 	return count;
-
-err_out_bus:
-	/* this will also clean up rest of rbd_dev stuff */
-
-	rbd_bus_del_dev(rbd_dev);
-	kfree(options);
-	return rc;
-
-err_out_disk:
-	rbd_free_disk(rbd_dev);
-err_out_blkdev:
-	unregister_blkdev(rbd_dev->major, rbd_dev->name);
-err_out_id:
-	rbd_dev_id_put(rbd_dev);
-err_out_header:
-	rbd_header_free(&rbd_dev->header);
+err_out_rbd_dev:
+	rbd_dev_destroy(rbd_dev);
 err_out_client:
-	kfree(rbd_dev->header_name);
-	rbd_put_client(rbd_dev);
-	kfree(rbd_dev->image_id);
+	rbd_put_client(rbdc);
 err_out_args:
-	kfree(rbd_dev->mapping.snap_name);
-	kfree(rbd_dev->image_name);
-	kfree(rbd_dev->pool_name);
-err_out_mem:
-	kfree(rbd_dev);
-	kfree(options);
+	if (ceph_opts)
+		ceph_destroy_options(ceph_opts);
+	kfree(rbd_opts);
+	rbd_spec_put(spec);
+err_out_module:
+	module_put(THIS_MODULE);
 
 	dout("Error adding device %s\n", buf);
-	module_put(THIS_MODULE);
 
 	return (ssize_t) rc;
 }
@@ -3163,7 +3700,6 @@
 	if (rbd_dev->watch_event)
 		rbd_req_sync_unwatch(rbd_dev);
 
-	rbd_put_client(rbd_dev);
 
 	/* clean up and free blkdev */
 	rbd_free_disk(rbd_dev);
@@ -3173,13 +3709,9 @@
 	rbd_header_free(&rbd_dev->header);
 
 	/* done with the id, and with the rbd_dev */
-	kfree(rbd_dev->mapping.snap_name);
-	kfree(rbd_dev->image_id);
-	kfree(rbd_dev->header_name);
-	kfree(rbd_dev->pool_name);
-	kfree(rbd_dev->image_name);
 	rbd_dev_id_put(rbd_dev);
-	kfree(rbd_dev);
+	rbd_assert(rbd_dev->rbd_client != NULL);
+	rbd_dev_destroy(rbd_dev);
 
 	/* release module ref */
 	module_put(THIS_MODULE);
@@ -3211,7 +3743,12 @@
 		goto done;
 	}
 
-	__rbd_remove_all_snaps(rbd_dev);
+	if (rbd_dev->open_count) {
+		ret = -EBUSY;
+		goto done;
+	}
+
+	rbd_remove_all_snaps(rbd_dev);
 	rbd_bus_del_dev(rbd_dev);
 
 done:
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index cbe77fa..49d77cb 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -46,8 +46,6 @@
 #define RBD_MIN_OBJ_ORDER       16
 #define RBD_MAX_OBJ_ORDER       30
 
-#define RBD_MAX_SEG_NAME_LEN	128
-
 #define RBD_COMP_NONE		0
 #define RBD_CRYPT_NONE		0
 
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 6690269..064d1a6 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -267,6 +267,14 @@
 	kfree(req->r_pages);
 }
 
+static void ceph_unlock_page_vector(struct page **pages, int num_pages)
+{
+	int i;
+
+	for (i = 0; i < num_pages; i++)
+		unlock_page(pages[i]);
+}
+
 /*
  * start an async read(ahead) operation.  return nr_pages we submitted
  * a read for on success, or negative error code.
@@ -347,6 +355,7 @@
 	return nr_pages;
 
 out_pages:
+	ceph_unlock_page_vector(pages, nr_pages);
 	ceph_release_page_vector(pages, nr_pages);
 out:
 	ceph_osdc_put_request(req);
@@ -1078,23 +1087,51 @@
 			    struct page **pagep, void **fsdata)
 {
 	struct inode *inode = file->f_dentry->d_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
+	struct ceph_file_info *fi = file->private_data;
 	struct page *page;
 	pgoff_t index = pos >> PAGE_CACHE_SHIFT;
-	int r;
+	int r, want, got = 0;
+
+	if (fi->fmode & CEPH_FILE_MODE_LAZY)
+		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
+	else
+		want = CEPH_CAP_FILE_BUFFER;
+
+	dout("write_begin %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
+	     inode, ceph_vinop(inode), pos, len, inode->i_size);
+	r = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos+len);
+	if (r < 0)
+		return r;
+	dout("write_begin %p %llx.%llx %llu~%u  got cap refs on %s\n",
+	     inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
+	if (!(got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO))) {
+		ceph_put_cap_refs(ci, got);
+		return -EAGAIN;
+	}
 
 	do {
 		/* get a page */
 		page = grab_cache_page_write_begin(mapping, index, 0);
-		if (!page)
-			return -ENOMEM;
-		*pagep = page;
+		if (!page) {
+			r = -ENOMEM;
+			break;
+		}
 
 		dout("write_begin file %p inode %p page %p %d~%d\n", file,
 		     inode, page, (int)pos, (int)len);
 
 		r = ceph_update_writeable_page(file, pos, len, page);
+		if (r)
+			page_cache_release(page);
 	} while (r == -EAGAIN);
 
+	if (r) {
+		ceph_put_cap_refs(ci, got);
+	} else {
+		*pagep = page;
+		*(int *)fsdata = got;
+	}
 	return r;
 }
 
@@ -1108,10 +1145,12 @@
 			  struct page *page, void *fsdata)
 {
 	struct inode *inode = file->f_dentry->d_inode;
+	struct ceph_inode_info *ci = ceph_inode(inode);
 	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
 	struct ceph_mds_client *mdsc = fsc->mdsc;
 	unsigned from = pos & (PAGE_CACHE_SIZE - 1);
 	int check_cap = 0;
+	int got = (unsigned long)fsdata;
 
 	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
 	     inode, page, (int)pos, (int)copied, (int)len);
@@ -1134,6 +1173,19 @@
 	up_read(&mdsc->snap_rwsem);
 	page_cache_release(page);
 
+	if (copied > 0) {
+		int dirty;
+		spin_lock(&ci->i_ceph_lock);
+		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
+		spin_unlock(&ci->i_ceph_lock);
+		if (dirty)
+			__mark_inode_dirty(inode, dirty);
+	}
+
+	dout("write_end %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
+	     inode, ceph_vinop(inode), pos, len, ceph_cap_string(got));
+	ceph_put_cap_refs(ci, got);
+
 	if (check_cap)
 		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);
 
diff --git a/fs/ceph/caps.c b/fs/ceph/caps.c
index 3251e9c..a1d9bb3 100644
--- a/fs/ceph/caps.c
+++ b/fs/ceph/caps.c
@@ -236,8 +236,10 @@
 	if (!ctx) {
 		cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
 		if (cap) {
+			spin_lock(&mdsc->caps_list_lock);
 			mdsc->caps_use_count++;
 			mdsc->caps_total_count++;
+			spin_unlock(&mdsc->caps_list_lock);
 		}
 		return cap;
 	}
@@ -1349,11 +1351,15 @@
 		if (!ci->i_head_snapc)
 			ci->i_head_snapc = ceph_get_snap_context(
 				ci->i_snap_realm->cached_context);
-		dout(" inode %p now dirty snapc %p\n", &ci->vfs_inode,
-			ci->i_head_snapc);
+		dout(" inode %p now dirty snapc %p auth cap %p\n",
+		     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
 		BUG_ON(!list_empty(&ci->i_dirty_item));
 		spin_lock(&mdsc->cap_dirty_lock);
-		list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+		if (ci->i_auth_cap)
+			list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
+		else
+			list_add(&ci->i_dirty_item,
+				 &mdsc->cap_dirty_migrating);
 		spin_unlock(&mdsc->cap_dirty_lock);
 		if (ci->i_flushing_caps == 0) {
 			ihold(inode);
@@ -2388,7 +2394,7 @@
 			    &atime);
 
 	/* max size increase? */
-	if (max_size != ci->i_max_size) {
+	if (ci->i_auth_cap == cap && max_size != ci->i_max_size) {
 		dout("max_size %lld -> %llu\n", ci->i_max_size, max_size);
 		ci->i_max_size = max_size;
 		if (max_size >= ci->i_wanted_max_size) {
@@ -2745,6 +2751,7 @@
 
 	/* make sure we re-request max_size, if necessary */
 	spin_lock(&ci->i_ceph_lock);
+	ci->i_wanted_max_size = 0;  /* reset */
 	ci->i_requested_max_size = 0;
 	spin_unlock(&ci->i_ceph_lock);
 }
@@ -2840,8 +2847,6 @@
 	case CEPH_CAP_OP_IMPORT:
 		handle_cap_import(mdsc, inode, h, session,
 				  snaptrace, snaptrace_len);
-		ceph_check_caps(ceph_inode(inode), 0, session);
-		goto done_unlocked;
 	}
 
 	/* the rest require a cap */
@@ -2858,6 +2863,7 @@
 	switch (op) {
 	case CEPH_CAP_OP_REVOKE:
 	case CEPH_CAP_OP_GRANT:
+	case CEPH_CAP_OP_IMPORT:
 		handle_cap_grant(inode, h, session, cap, msg->middle);
 		goto done_unlocked;
 
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index d4dfdcf..e51558f 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -712,63 +712,53 @@
 	struct ceph_osd_client *osdc =
 		&ceph_sb_to_client(inode->i_sb)->client->osdc;
 	loff_t endoff = pos + iov->iov_len;
-	int want, got = 0;
-	int ret, err;
+	int got = 0;
+	int ret, err, written;
 
 	if (ceph_snap(inode) != CEPH_NOSNAP)
 		return -EROFS;
 
 retry_snap:
+	written = 0;
 	if (ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_FULL))
 		return -ENOSPC;
 	__ceph_do_pending_vmtruncate(inode);
-	dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
-	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-	     inode->i_size);
-	if (fi->fmode & CEPH_FILE_MODE_LAZY)
-		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
-	else
-		want = CEPH_CAP_FILE_BUFFER;
-	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
-	if (ret < 0)
-		goto out_put;
 
-	dout("aio_write %p %llx.%llx %llu~%u  got cap refs on %s\n",
-	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-	     ceph_cap_string(got));
-
-	if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-	    (iocb->ki_filp->f_flags & O_DIRECT) ||
-	    (inode->i_sb->s_flags & MS_SYNCHRONOUS) ||
-	    (fi->flags & CEPH_F_SYNC)) {
-		ret = ceph_sync_write(file, iov->iov_base, iov->iov_len,
-			&iocb->ki_pos);
-	} else {
-		/*
-		 * buffered write; drop Fw early to avoid slow
-		 * revocation if we get stuck on balance_dirty_pages
-		 */
-		int dirty;
-
-		spin_lock(&ci->i_ceph_lock);
-		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR);
-		spin_unlock(&ci->i_ceph_lock);
-		ceph_put_cap_refs(ci, got);
-
+	/*
+	 * try to do a buffered write.  if we don't have sufficient
+	 * caps, we'll get -EAGAIN from generic_file_aio_write, or a
+	 * short write if we only get caps for some pages.
+	 */
+	if (!(iocb->ki_filp->f_flags & O_DIRECT) &&
+	    !(inode->i_sb->s_flags & MS_SYNCHRONOUS) &&
+	    !(fi->flags & CEPH_F_SYNC)) {
 		ret = generic_file_aio_write(iocb, iov, nr_segs, pos);
+		if (ret >= 0)
+			written = ret;
+
 		if ((ret >= 0 || ret == -EIOCBQUEUED) &&
 		    ((file->f_flags & O_SYNC) || IS_SYNC(file->f_mapping->host)
 		     || ceph_osdmap_flag(osdc->osdmap, CEPH_OSDMAP_NEARFULL))) {
-			err = vfs_fsync_range(file, pos, pos + ret - 1, 1);
+			err = vfs_fsync_range(file, pos, pos + written - 1, 1);
 			if (err < 0)
 				ret = err;
 		}
-
-		if (dirty)
-			__mark_inode_dirty(inode, dirty);
-		goto out;
+		if ((ret < 0 && ret != -EAGAIN) || pos + written >= endoff)
+			goto out;
 	}
 
+	dout("aio_write %p %llx.%llx %llu~%u getting caps. i_size %llu\n",
+	     inode, ceph_vinop(inode), pos + written,
+	     (unsigned)iov->iov_len - written, inode->i_size);
+	ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, 0, &got, endoff);
+	if (ret < 0)
+		goto out;
+
+	dout("aio_write %p %llx.%llx %llu~%u  got cap refs on %s\n",
+	     inode, ceph_vinop(inode), pos + written,
+	     (unsigned)iov->iov_len - written, ceph_cap_string(got));
+	ret = ceph_sync_write(file, iov->iov_base + written,
+			      iov->iov_len - written, &iocb->ki_pos);
 	if (ret >= 0) {
 		int dirty;
 		spin_lock(&ci->i_ceph_lock);
@@ -777,13 +767,10 @@
 		if (dirty)
 			__mark_inode_dirty(inode, dirty);
 	}
-
-out_put:
 	dout("aio_write %p %llx.%llx %llu~%u  dropping cap refs on %s\n",
-	     inode, ceph_vinop(inode), pos, (unsigned)iov->iov_len,
-	     ceph_cap_string(got));
+	     inode, ceph_vinop(inode), pos + written,
+	     (unsigned)iov->iov_len - written, ceph_cap_string(got));
 	ceph_put_cap_refs(ci, got);
-
 out:
 	if (ret == -EOLDSNAPC) {
 		dout("aio_write %p %llx.%llx %llu~%u got EOLDSNAPC, retrying\n",
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index ba95eea..2971eaa 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -1466,7 +1466,7 @@
 {
 	struct ceph_inode_info *ci = ceph_inode(inode);
 	u64 to;
-	int wrbuffer_refs, wake = 0;
+	int wrbuffer_refs, finish = 0;
 
 retry:
 	spin_lock(&ci->i_ceph_lock);
@@ -1498,15 +1498,18 @@
 	truncate_inode_pages(inode->i_mapping, to);
 
 	spin_lock(&ci->i_ceph_lock);
-	ci->i_truncate_pending--;
-	if (ci->i_truncate_pending == 0)
-		wake = 1;
+	if (to == ci->i_truncate_size) {
+		ci->i_truncate_pending = 0;
+		finish = 1;
+	}
 	spin_unlock(&ci->i_ceph_lock);
+	if (!finish)
+		goto retry;
 
 	if (wrbuffer_refs == 0)
 		ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
-	if (wake)
-		wake_up_all(&ci->i_cap_wq);
+
+	wake_up_all(&ci->i_cap_wq);
 }
 
 
diff --git a/fs/ceph/mds_client.c b/fs/ceph/mds_client.c
index 1bcf712..9165eb8 100644
--- a/fs/ceph/mds_client.c
+++ b/fs/ceph/mds_client.c
@@ -1590,7 +1590,7 @@
 	} else if (rpath || rino) {
 		*ino = rino;
 		*ppath = rpath;
-		*pathlen = strlen(rpath);
+		*pathlen = rpath ? strlen(rpath) : 0;
 		dout(" path %.*s\n", *pathlen, rpath);
 	}
 
@@ -1876,9 +1876,14 @@
 static void __wake_requests(struct ceph_mds_client *mdsc,
 			    struct list_head *head)
 {
-	struct ceph_mds_request *req, *nreq;
+	struct ceph_mds_request *req;
+	LIST_HEAD(tmp_list);
 
-	list_for_each_entry_safe(req, nreq, head, r_wait) {
+	list_splice_init(head, &tmp_list);
+
+	while (!list_empty(&tmp_list)) {
+		req = list_entry(tmp_list.next,
+				 struct ceph_mds_request, r_wait);
 		list_del_init(&req->r_wait);
 		__do_request(mdsc, req);
 	}
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 2eb43f2..e86aa994 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -403,8 +403,6 @@
 		seq_printf(m, ",mount_timeout=%d", opt->mount_timeout);
 	if (opt->osd_idle_ttl != CEPH_OSD_IDLE_TTL_DEFAULT)
 		seq_printf(m, ",osd_idle_ttl=%d", opt->osd_idle_ttl);
-	if (opt->osd_timeout != CEPH_OSD_TIMEOUT_DEFAULT)
-		seq_printf(m, ",osdtimeout=%d", opt->osd_timeout);
 	if (opt->osd_keepalive_timeout != CEPH_OSD_KEEPALIVE_DEFAULT)
 		seq_printf(m, ",osdkeepalivetimeout=%d",
 			   opt->osd_keepalive_timeout);
@@ -849,7 +847,7 @@
 		fsc->backing_dev_info.ra_pages =
 			default_backing_dev_info.ra_pages;
 
-	err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%d",
+	err = bdi_register(&fsc->backing_dev_info, NULL, "ceph-%ld",
 			   atomic_long_inc_return(&bdi_seq));
 	if (!err)
 		sb->s_bdi = &fsc->backing_dev_info;
diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
index 2a9a9ab..12731a19 100644
--- a/include/linux/backing-dev.h
+++ b/include/linux/backing-dev.h
@@ -114,6 +114,7 @@
 int bdi_init(struct backing_dev_info *bdi);
 void bdi_destroy(struct backing_dev_info *bdi);
 
+__printf(3, 4)
 int bdi_register(struct backing_dev_info *bdi, struct device *parent,
 		const char *fmt, ...);
 int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
diff --git a/include/linux/ceph/libceph.h b/include/linux/ceph/libceph.h
index 6470792..084d3c6 100644
--- a/include/linux/ceph/libceph.h
+++ b/include/linux/ceph/libceph.h
@@ -43,7 +43,6 @@
 	struct ceph_entity_addr my_addr;
 	int mount_timeout;
 	int osd_idle_ttl;
-	int osd_timeout;
 	int osd_keepalive_timeout;
 
 	/*
@@ -63,7 +62,6 @@
  * defaults
  */
 #define CEPH_MOUNT_TIMEOUT_DEFAULT  60
-#define CEPH_OSD_TIMEOUT_DEFAULT    60  /* seconds */
 #define CEPH_OSD_KEEPALIVE_DEFAULT  5
 #define CEPH_OSD_IDLE_TTL_DEFAULT    60
 
diff --git a/include/linux/ceph/osdmap.h b/include/linux/ceph/osdmap.h
index e37acbe..10a417f 100644
--- a/include/linux/ceph/osdmap.h
+++ b/include/linux/ceph/osdmap.h
@@ -123,6 +123,7 @@
 extern int ceph_calc_pg_primary(struct ceph_osdmap *osdmap,
 				struct ceph_pg pgid);
 
+extern const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id);
 extern int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name);
 
 #endif
diff --git a/include/linux/ceph/rados.h b/include/linux/ceph/rados.h
index de91fbd..2c04afe 100644
--- a/include/linux/ceph/rados.h
+++ b/include/linux/ceph/rados.h
@@ -87,6 +87,8 @@
  *
  *  lpgp_num -- as above.
  */
+#define CEPH_NOPOOL  ((__u64) (-1))  /* pool id not defined */
+
 #define CEPH_PG_TYPE_REP     1
 #define CEPH_PG_TYPE_RAID4   2
 #define CEPH_PG_POOL_VERSION 2
diff --git a/net/ceph/ceph_common.c b/net/ceph/ceph_common.c
index a802029..ee71ea2 100644
--- a/net/ceph/ceph_common.c
+++ b/net/ceph/ceph_common.c
@@ -305,7 +305,6 @@
 
 	/* start with defaults */
 	opt->flags = CEPH_OPT_DEFAULT;
-	opt->osd_timeout = CEPH_OSD_TIMEOUT_DEFAULT;
 	opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
 	opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
 	opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;   /* seconds */
@@ -391,7 +390,7 @@
 
 			/* misc */
 		case Opt_osdtimeout:
-			opt->osd_timeout = intval;
+			pr_warning("ignoring deprecated osdtimeout option\n");
 			break;
 		case Opt_osdkeepalivetimeout:
 			opt->osd_keepalive_timeout = intval;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 3ef1759..4d111fd 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -2244,22 +2244,62 @@
 
 
 /*
- * Atomically queue work on a connection.  Bump @con reference to
- * avoid races with connection teardown.
+ * Atomically queue work on a connection after the specified delay.
+ * Bump @con reference to avoid races with connection teardown.
+ * Returns 0 if work was queued, or an error code otherwise.
  */
-static void queue_con(struct ceph_connection *con)
+static int queue_con_delay(struct ceph_connection *con, unsigned long delay)
 {
 	if (!con->ops->get(con)) {
-		dout("queue_con %p ref count 0\n", con);
-		return;
+		dout("%s %p ref count 0\n", __func__, con);
+
+		return -ENOENT;
 	}
 
-	if (!queue_delayed_work(ceph_msgr_wq, &con->work, 0)) {
-		dout("queue_con %p - already queued\n", con);
+	if (!queue_delayed_work(ceph_msgr_wq, &con->work, delay)) {
+		dout("%s %p - already queued\n", __func__, con);
 		con->ops->put(con);
-	} else {
-		dout("queue_con %p\n", con);
+
+		return -EBUSY;
 	}
+
+	dout("%s %p %lu\n", __func__, con, delay);
+
+	return 0;
+}
+
+static void queue_con(struct ceph_connection *con)
+{
+	(void) queue_con_delay(con, 0);
+}
+
+static bool con_sock_closed(struct ceph_connection *con)
+{
+	if (!test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags))
+		return false;
+
+#define CASE(x)								\
+	case CON_STATE_ ## x:						\
+		con->error_msg = "socket closed (con state " #x ")";	\
+		break;
+
+	switch (con->state) {
+	CASE(CLOSED);
+	CASE(PREOPEN);
+	CASE(CONNECTING);
+	CASE(NEGOTIATING);
+	CASE(OPEN);
+	CASE(STANDBY);
+	default:
+		pr_warning("%s con %p unrecognized state %lu\n",
+			__func__, con, con->state);
+		con->error_msg = "unrecognized con state";
+		BUG();
+		break;
+	}
+#undef CASE
+
+	return true;
 }
 
 /*
@@ -2273,35 +2313,16 @@
 
 	mutex_lock(&con->mutex);
 restart:
-	if (test_and_clear_bit(CON_FLAG_SOCK_CLOSED, &con->flags)) {
-		switch (con->state) {
-		case CON_STATE_CONNECTING:
-			con->error_msg = "connection failed";
-			break;
-		case CON_STATE_NEGOTIATING:
-			con->error_msg = "negotiation failed";
-			break;
-		case CON_STATE_OPEN:
-			con->error_msg = "socket closed";
-			break;
-		default:
-			dout("unrecognized con state %d\n", (int)con->state);
-			con->error_msg = "unrecognized con state";
-			BUG();
-		}
+	if (con_sock_closed(con))
 		goto fault;
-	}
 
 	if (test_and_clear_bit(CON_FLAG_BACKOFF, &con->flags)) {
 		dout("con_work %p backing off\n", con);
-		if (queue_delayed_work(ceph_msgr_wq, &con->work,
-				       round_jiffies_relative(con->delay))) {
-			dout("con_work %p backoff %lu\n", con, con->delay);
-			mutex_unlock(&con->mutex);
-			return;
-		} else {
+		ret = queue_con_delay(con, round_jiffies_relative(con->delay));
+		if (ret) {
 			dout("con_work %p FAILED to back off %lu\n", con,
 			     con->delay);
+			BUG_ON(ret == -ENOENT);
 			set_bit(CON_FLAG_BACKOFF, &con->flags);
 		}
 		goto done;
@@ -2356,7 +2377,7 @@
 static void ceph_fault(struct ceph_connection *con)
 	__releases(con->mutex)
 {
-	pr_err("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
+	pr_warning("%s%lld %s %s\n", ENTITY_NAME(con->peer_name),
 	       ceph_pr_addr(&con->peer_addr.in_addr), con->error_msg);
 	dout("fault %p state %lu to peer %s\n",
 	     con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
@@ -2398,24 +2419,8 @@
 			con->delay = BASE_DELAY_INTERVAL;
 		else if (con->delay < MAX_DELAY_INTERVAL)
 			con->delay *= 2;
-		con->ops->get(con);
-		if (queue_delayed_work(ceph_msgr_wq, &con->work,
-				       round_jiffies_relative(con->delay))) {
-			dout("fault queued %p delay %lu\n", con, con->delay);
-		} else {
-			con->ops->put(con);
-			dout("fault failed to queue %p delay %lu, backoff\n",
-			     con, con->delay);
-			/*
-			 * In many cases we see a socket state change
-			 * while con_work is running and end up
-			 * queuing (non-delayed) work, such that we
-			 * can't backoff with a delay.  Set a flag so
-			 * that when con_work restarts we schedule the
-			 * delay then.
-			 */
-			set_bit(CON_FLAG_BACKOFF, &con->flags);
-		}
+		set_bit(CON_FLAG_BACKOFF, &con->flags);
+		queue_con(con);
 	}
 
 out_unlock:
diff --git a/net/ceph/osd_client.c b/net/ceph/osd_client.c
index c1d756c..780caf6 100644
--- a/net/ceph/osd_client.c
+++ b/net/ceph/osd_client.c
@@ -221,6 +221,7 @@
 	kref_init(&req->r_kref);
 	init_completion(&req->r_completion);
 	init_completion(&req->r_safe_completion);
+	RB_CLEAR_NODE(&req->r_node);
 	INIT_LIST_HEAD(&req->r_unsafe_item);
 	INIT_LIST_HEAD(&req->r_linger_item);
 	INIT_LIST_HEAD(&req->r_linger_osd);
@@ -580,7 +581,7 @@
 
 	dout("__kick_osd_requests osd%d\n", osd->o_osd);
 	err = __reset_osd(osdc, osd);
-	if (err == -EAGAIN)
+	if (err)
 		return;
 
 	list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -607,14 +608,6 @@
 	}
 }
 
-static void kick_osd_requests(struct ceph_osd_client *osdc,
-			      struct ceph_osd *kickosd)
-{
-	mutex_lock(&osdc->request_mutex);
-	__kick_osd_requests(osdc, kickosd);
-	mutex_unlock(&osdc->request_mutex);
-}
-
 /*
  * If the osd connection drops, we need to resubmit all requests.
  */
@@ -628,7 +621,9 @@
 	dout("osd_reset osd%d\n", osd->o_osd);
 	osdc = osd->o_osdc;
 	down_read(&osdc->map_sem);
-	kick_osd_requests(osdc, osd);
+	mutex_lock(&osdc->request_mutex);
+	__kick_osd_requests(osdc, osd);
+	mutex_unlock(&osdc->request_mutex);
 	send_queued(osdc);
 	up_read(&osdc->map_sem);
 }
@@ -647,6 +642,7 @@
 	atomic_set(&osd->o_ref, 1);
 	osd->o_osdc = osdc;
 	osd->o_osd = onum;
+	RB_CLEAR_NODE(&osd->o_node);
 	INIT_LIST_HEAD(&osd->o_requests);
 	INIT_LIST_HEAD(&osd->o_linger_requests);
 	INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -750,6 +746,7 @@
 	if (list_empty(&osd->o_requests) &&
 	    list_empty(&osd->o_linger_requests)) {
 		__remove_osd(osdc, osd);
+		ret = -ENODEV;
 	} else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
 			  &osd->o_con.peer_addr,
 			  sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -876,9 +873,9 @@
 			req->r_osd = NULL;
 	}
 
+	list_del_init(&req->r_req_lru_item);
 	ceph_osdc_put_request(req);
 
-	list_del_init(&req->r_req_lru_item);
 	if (osdc->num_requests == 0) {
 		dout(" no requests, canceling timeout\n");
 		__cancel_osd_timeout(osdc);
@@ -910,8 +907,8 @@
 					struct ceph_osd_request *req)
 {
 	dout("__unregister_linger_request %p\n", req);
+	list_del_init(&req->r_linger_item);
 	if (req->r_osd) {
-		list_del_init(&req->r_linger_item);
 		list_del_init(&req->r_linger_osd);
 
 		if (list_empty(&req->r_osd->o_requests) &&
@@ -1090,12 +1087,10 @@
 {
 	struct ceph_osd_client *osdc =
 		container_of(work, struct ceph_osd_client, timeout_work.work);
-	struct ceph_osd_request *req, *last_req = NULL;
+	struct ceph_osd_request *req;
 	struct ceph_osd *osd;
-	unsigned long timeout = osdc->client->options->osd_timeout * HZ;
 	unsigned long keepalive =
 		osdc->client->options->osd_keepalive_timeout * HZ;
-	unsigned long last_stamp = 0;
 	struct list_head slow_osds;
 	dout("timeout\n");
 	down_read(&osdc->map_sem);
@@ -1105,37 +1100,6 @@
 	mutex_lock(&osdc->request_mutex);
 
 	/*
-	 * reset osds that appear to be _really_ unresponsive.  this
-	 * is a failsafe measure.. we really shouldn't be getting to
-	 * this point if the system is working properly.  the monitors
-	 * should mark the osd as failed and we should find out about
-	 * it from an updated osd map.
-	 */
-	while (timeout && !list_empty(&osdc->req_lru)) {
-		req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
-				 r_req_lru_item);
-
-		/* hasn't been long enough since we sent it? */
-		if (time_before(jiffies, req->r_stamp + timeout))
-			break;
-
-		/* hasn't been long enough since it was acked? */
-		if (req->r_request->ack_stamp == 0 ||
-		    time_before(jiffies, req->r_request->ack_stamp + timeout))
-			break;
-
-		BUG_ON(req == last_req && req->r_stamp == last_stamp);
-		last_req = req;
-		last_stamp = req->r_stamp;
-
-		osd = req->r_osd;
-		BUG_ON(!osd);
-		pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
-			   req->r_tid, osd->o_osd);
-		__kick_osd_requests(osdc, osd);
-	}
-
-	/*
 	 * ping osds that are a bit slow.  this ensures that if there
 	 * is a break in the TCP connection we will notice, and reopen
 	 * a connection with that osd (from the fault callback).
@@ -1364,8 +1328,8 @@
 
 		dout("kicking lingering %p tid %llu osd%d\n", req, req->r_tid,
 		     req->r_osd ? req->r_osd->o_osd : -1);
-		__unregister_linger_request(osdc, req);
 		__register_request(osdc, req);
+		__unregister_linger_request(osdc, req);
 	}
 	mutex_unlock(&osdc->request_mutex);
 
@@ -1599,6 +1563,7 @@
 	event->data = data;
 	event->osdc = osdc;
 	INIT_LIST_HEAD(&event->osd_node);
+	RB_CLEAR_NODE(&event->node);
 	kref_init(&event->kref);   /* one ref for us */
 	kref_get(&event->kref);    /* one ref for the caller */
 	init_completion(&event->completion);
diff --git a/net/ceph/osdmap.c b/net/ceph/osdmap.c
index 5433fb0..de73214 100644
--- a/net/ceph/osdmap.c
+++ b/net/ceph/osdmap.c
@@ -469,6 +469,22 @@
 	return NULL;
 }
 
+const char *ceph_pg_pool_name_by_id(struct ceph_osdmap *map, u64 id)
+{
+	struct ceph_pg_pool_info *pi;
+
+	if (id == CEPH_NOPOOL)
+		return NULL;
+
+	if (WARN_ON_ONCE(id > (u64) INT_MAX))
+		return NULL;
+
+	pi = __lookup_pg_pool(&map->pg_pools, (int) id);
+
+	return pi ? pi->name : NULL;
+}
+EXPORT_SYMBOL(ceph_pg_pool_name_by_id);
+
 int ceph_pg_poolid_by_name(struct ceph_osdmap *map, const char *name)
 {
 	struct rb_node *rbp;
@@ -645,10 +661,12 @@
 	ceph_decode_32_safe(p, end, max, bad);
 	while (max--) {
 		ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
+		err = -ENOMEM;
 		pi = kzalloc(sizeof(*pi), GFP_NOFS);
 		if (!pi)
 			goto bad;
 		pi->id = ceph_decode_32(p);
+		err = -EINVAL;
 		ev = ceph_decode_8(p); /* encoding version */
 		if (ev > CEPH_PG_POOL_VERSION) {
 			pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
@@ -664,8 +682,13 @@
 		__insert_pg_pool(&map->pg_pools, pi);
 	}
 
-	if (version >= 5 && __decode_pool_names(p, end, map) < 0)
-		goto bad;
+	if (version >= 5) {
+		err = __decode_pool_names(p, end, map);
+		if (err < 0) {
+			dout("fail to decode pool names");
+			goto bad;
+		}
+	}
 
 	ceph_decode_32_safe(p, end, map->pool_max, bad);
 
@@ -745,7 +768,7 @@
 	return map;
 
 bad:
-	dout("osdmap_decode fail\n");
+	dout("osdmap_decode fail err %d\n", err);
 	ceph_osdmap_destroy(map);
 	return ERR_PTR(err);
 }
@@ -839,6 +862,7 @@
 		if (ev > CEPH_PG_POOL_VERSION) {
 			pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
 				   ev, CEPH_PG_POOL_VERSION);
+			err = -EINVAL;
 			goto bad;
 		}
 		pi = __lookup_pg_pool(&map->pg_pools, pool);
@@ -855,8 +879,11 @@
 		if (err < 0)
 			goto bad;
 	}
-	if (version >= 5 && __decode_pool_names(p, end, map) < 0)
-		goto bad;
+	if (version >= 5) {
+		err = __decode_pool_names(p, end, map);
+		if (err < 0)
+			goto bad;
+	}
 
 	/* old_pool */
 	ceph_decode_32_safe(p, end, len, bad);
@@ -932,15 +959,13 @@
 			(void) __remove_pg_mapping(&map->pg_temp, pgid);
 
 			/* insert */
-			if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32)) {
-				err = -EINVAL;
+			err = -EINVAL;
+			if (pglen > (UINT_MAX - sizeof(*pg)) / sizeof(u32))
 				goto bad;
-			}
+			err = -ENOMEM;
 			pg = kmalloc(sizeof(*pg) + sizeof(u32)*pglen, GFP_NOFS);
-			if (!pg) {
-				err = -ENOMEM;
+			if (!pg)
 				goto bad;
-			}
 			pg->pgid = pgid;
 			pg->len = pglen;
 			for (j = 0; j < pglen; j++)