rbd: support for exclusive-lock feature

Add basic support for RBD_FEATURE_EXCLUSIVE_LOCK feature.  Maintenance
operations (resize, snapshot create, etc) are offloaded to librbd via
returning -EOPNOTSUPP - librbd should request the lock and execute the
operation.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
Reviewed-by: Mike Christie <mchristi@redhat.com>
Tested-by: Mike Christie <mchristi@redhat.com>
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index cb96fb1..7cda1cc 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -31,6 +31,7 @@
 #include <linux/ceph/libceph.h>
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
+#include <linux/ceph/cls_lock_client.h>
 #include <linux/ceph/decode.h>
 #include <linux/parser.h>
 #include <linux/bsearch.h>
@@ -114,14 +115,17 @@
 
 #define RBD_OBJ_PREFIX_LEN_MAX	64
 
+#define RBD_NOTIFY_TIMEOUT	5	/* seconds */
 #define RBD_RETRY_DELAY		msecs_to_jiffies(1000)
 
 /* Feature bits */
 
 #define RBD_FEATURE_LAYERING	(1<<0)
 #define RBD_FEATURE_STRIPINGV2	(1<<1)
-#define RBD_FEATURES_ALL \
-	    (RBD_FEATURE_LAYERING | RBD_FEATURE_STRIPINGV2)
+#define RBD_FEATURE_EXCLUSIVE_LOCK (1<<2)
+#define RBD_FEATURES_ALL	(RBD_FEATURE_LAYERING |		\
+				 RBD_FEATURE_STRIPINGV2 |	\
+				 RBD_FEATURE_EXCLUSIVE_LOCK)
 
 /* Features supported by this (client software) implementation. */
 
@@ -327,6 +331,18 @@
 	RBD_WATCH_STATE_ERROR,
 };
 
+enum rbd_lock_state {
+	RBD_LOCK_STATE_UNLOCKED,
+	RBD_LOCK_STATE_LOCKED,
+	RBD_LOCK_STATE_RELEASING,
+};
+
+/* WatchNotify::ClientId */
+struct rbd_client_id {
+	u64 gid;
+	u64 handle;
+};
+
 struct rbd_mapping {
 	u64                     size;
 	u64                     features;
@@ -366,6 +382,15 @@
 	u64			watch_cookie;
 	struct delayed_work	watch_dwork;
 
+	struct rw_semaphore	lock_rwsem;
+	enum rbd_lock_state	lock_state;
+	struct rbd_client_id	owner_cid;
+	struct work_struct	acquired_lock_work;
+	struct work_struct	released_lock_work;
+	struct delayed_work	lock_dwork;
+	struct work_struct	unlock_work;
+	wait_queue_head_t	lock_waitq;
+
 	struct workqueue_struct	*task_wq;
 
 	struct rbd_spec		*parent_spec;
@@ -450,6 +475,29 @@
 	return minor >> RBD_SINGLE_MAJOR_PART_SHIFT;
 }
 
+static bool rbd_is_lock_supported(struct rbd_device *rbd_dev)
+{
+	return (rbd_dev->header.features & RBD_FEATURE_EXCLUSIVE_LOCK) &&
+	       rbd_dev->spec->snap_id == CEPH_NOSNAP &&
+	       !rbd_dev->mapping.read_only;
+}
+
+static bool __rbd_is_lock_owner(struct rbd_device *rbd_dev)
+{
+	return rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED ||
+	       rbd_dev->lock_state == RBD_LOCK_STATE_RELEASING;
+}
+
+static bool rbd_is_lock_owner(struct rbd_device *rbd_dev)
+{
+	bool is_lock_owner;
+
+	down_read(&rbd_dev->lock_rwsem);
+	is_lock_owner = __rbd_is_lock_owner(rbd_dev);
+	up_read(&rbd_dev->lock_rwsem);
+	return is_lock_owner;
+}
+
 static BUS_ATTR(add, S_IWUSR, NULL, rbd_add);
 static BUS_ATTR(remove, S_IWUSR, NULL, rbd_remove);
 static BUS_ATTR(add_single_major, S_IWUSR, NULL, rbd_add_single_major);
@@ -3095,31 +3143,690 @@
 	obj_request_done_set(obj_request);
 }
 
+static const struct rbd_client_id rbd_empty_cid;
+
+static bool rbd_cid_equal(const struct rbd_client_id *lhs,
+			  const struct rbd_client_id *rhs)
+{
+	return lhs->gid == rhs->gid && lhs->handle == rhs->handle;
+}
+
+static struct rbd_client_id rbd_get_cid(struct rbd_device *rbd_dev)
+{
+	struct rbd_client_id cid;
+
+	mutex_lock(&rbd_dev->watch_mutex);
+	cid.gid = ceph_client_gid(rbd_dev->rbd_client->client);
+	cid.handle = rbd_dev->watch_cookie;
+	mutex_unlock(&rbd_dev->watch_mutex);
+	return cid;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static void rbd_set_owner_cid(struct rbd_device *rbd_dev,
+			      const struct rbd_client_id *cid)
+{
+	dout("%s rbd_dev %p %llu-%llu -> %llu-%llu\n", __func__, rbd_dev,
+	     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle,
+	     cid->gid, cid->handle);
+	rbd_dev->owner_cid = *cid; /* struct */
+}
+
+static void format_lock_cookie(struct rbd_device *rbd_dev, char *buf)
+{
+	mutex_lock(&rbd_dev->watch_mutex);
+	sprintf(buf, "%s %llu", RBD_LOCK_COOKIE_PREFIX, rbd_dev->watch_cookie);
+	mutex_unlock(&rbd_dev->watch_mutex);
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+	char cookie[32];
+	int ret;
+
+	WARN_ON(__rbd_is_lock_owner(rbd_dev));
+
+	format_lock_cookie(rbd_dev, cookie);
+	ret = ceph_cls_lock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+			    RBD_LOCK_NAME, CEPH_CLS_LOCK_EXCLUSIVE, cookie,
+			    RBD_LOCK_TAG, "", 0);
+	if (ret)
+		return ret;
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_LOCKED;
+	rbd_set_owner_cid(rbd_dev, &cid);
+	queue_work(rbd_dev->task_wq, &rbd_dev->acquired_lock_work);
+	return 0;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_unlock(struct rbd_device *rbd_dev)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	char cookie[32];
+	int ret;
+
+	WARN_ON(!__rbd_is_lock_owner(rbd_dev));
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+
+	format_lock_cookie(rbd_dev, cookie);
+	ret = ceph_cls_unlock(osdc, &rbd_dev->header_oid, &rbd_dev->header_oloc,
+			      RBD_LOCK_NAME, cookie);
+	if (ret && ret != -ENOENT) {
+		rbd_warn(rbd_dev, "cls_unlock failed: %d", ret);
+		return ret;
+	}
+
+	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+	queue_work(rbd_dev->task_wq, &rbd_dev->released_lock_work);
+	return 0;
+}
+
+static int __rbd_notify_op_lock(struct rbd_device *rbd_dev,
+				enum rbd_notify_op notify_op,
+				struct page ***preply_pages,
+				size_t *preply_len)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct rbd_client_id cid = rbd_get_cid(rbd_dev);
+	int buf_size = 4 + 8 + 8 + CEPH_ENCODING_START_BLK_LEN;
+	char buf[buf_size];
+	void *p = buf;
+
+	dout("%s rbd_dev %p notify_op %d\n", __func__, rbd_dev, notify_op);
+
+	/* encode *LockPayload NotifyMessage (op + ClientId) */
+	ceph_start_encoding(&p, 2, 1, buf_size - CEPH_ENCODING_START_BLK_LEN);
+	ceph_encode_32(&p, notify_op);
+	ceph_encode_64(&p, cid.gid);
+	ceph_encode_64(&p, cid.handle);
+
+	return ceph_osdc_notify(osdc, &rbd_dev->header_oid,
+				&rbd_dev->header_oloc, buf, buf_size,
+				RBD_NOTIFY_TIMEOUT, preply_pages, preply_len);
+}
+
+static void rbd_notify_op_lock(struct rbd_device *rbd_dev,
+			       enum rbd_notify_op notify_op)
+{
+	struct page **reply_pages;
+	size_t reply_len;
+
+	__rbd_notify_op_lock(rbd_dev, notify_op, &reply_pages, &reply_len);
+	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+}
+
+static void rbd_notify_acquired_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  acquired_lock_work);
+
+	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_ACQUIRED_LOCK);
+}
+
+static void rbd_notify_released_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  released_lock_work);
+
+	rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_RELEASED_LOCK);
+}
+
+static int rbd_request_lock(struct rbd_device *rbd_dev)
+{
+	struct page **reply_pages;
+	size_t reply_len;
+	bool lock_owner_responded = false;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	ret = __rbd_notify_op_lock(rbd_dev, RBD_NOTIFY_OP_REQUEST_LOCK,
+				   &reply_pages, &reply_len);
+	if (ret && ret != -ETIMEDOUT) {
+		rbd_warn(rbd_dev, "failed to request lock: %d", ret);
+		goto out;
+	}
+
+	if (reply_len > 0 && reply_len <= PAGE_SIZE) {
+		void *p = page_address(reply_pages[0]);
+		void *const end = p + reply_len;
+		u32 n;
+
+		ceph_decode_32_safe(&p, end, n, e_inval); /* num_acks */
+		while (n--) {
+			u8 struct_v;
+			u32 len;
+
+			ceph_decode_need(&p, end, 8 + 8, e_inval);
+			p += 8 + 8; /* skip gid and cookie */
+
+			ceph_decode_32_safe(&p, end, len, e_inval);
+			if (!len)
+				continue;
+
+			if (lock_owner_responded) {
+				rbd_warn(rbd_dev,
+					 "duplicate lock owners detected");
+				ret = -EIO;
+				goto out;
+			}
+
+			lock_owner_responded = true;
+			ret = ceph_start_decoding(&p, end, 1, "ResponseMessage",
+						  &struct_v, &len);
+			if (ret) {
+				rbd_warn(rbd_dev,
+					 "failed to decode ResponseMessage: %d",
+					 ret);
+				goto e_inval;
+			}
+
+			ret = ceph_decode_32(&p);
+		}
+	}
+
+	if (!lock_owner_responded) {
+		rbd_warn(rbd_dev, "no lock owners detected");
+		ret = -ETIMEDOUT;
+	}
+
+out:
+	ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len));
+	return ret;
+
+e_inval:
+	ret = -EINVAL;
+	goto out;
+}
+
+static void wake_requests(struct rbd_device *rbd_dev, bool wake_all)
+{
+	dout("%s rbd_dev %p wake_all %d\n", __func__, rbd_dev, wake_all);
+
+	cancel_delayed_work(&rbd_dev->lock_dwork);
+	if (wake_all)
+		wake_up_all(&rbd_dev->lock_waitq);
+	else
+		wake_up(&rbd_dev->lock_waitq);
+}
+
+static int get_lock_owner_info(struct rbd_device *rbd_dev,
+			       struct ceph_locker **lockers, u32 *num_lockers)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	u8 lock_type;
+	char *lock_tag;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+
+	ret = ceph_cls_lock_info(osdc, &rbd_dev->header_oid,
+				 &rbd_dev->header_oloc, RBD_LOCK_NAME,
+				 &lock_type, &lock_tag, lockers, num_lockers);
+	if (ret)
+		return ret;
+
+	if (*num_lockers == 0) {
+		dout("%s rbd_dev %p no lockers detected\n", __func__, rbd_dev);
+		goto out;
+	}
+
+	if (strcmp(lock_tag, RBD_LOCK_TAG)) {
+		rbd_warn(rbd_dev, "locked by external mechanism, tag %s",
+			 lock_tag);
+		ret = -EBUSY;
+		goto out;
+	}
+
+	if (lock_type == CEPH_CLS_LOCK_SHARED) {
+		rbd_warn(rbd_dev, "shared lock type detected");
+		ret = -EBUSY;
+		goto out;
+	}
+
+	if (strncmp((*lockers)[0].id.cookie, RBD_LOCK_COOKIE_PREFIX,
+		    strlen(RBD_LOCK_COOKIE_PREFIX))) {
+		rbd_warn(rbd_dev, "locked by external mechanism, cookie %s",
+			 (*lockers)[0].id.cookie);
+		ret = -EBUSY;
+		goto out;
+	}
+
+out:
+	kfree(lock_tag);
+	return ret;
+}
+
+static int find_watcher(struct rbd_device *rbd_dev,
+			const struct ceph_locker *locker)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	struct ceph_watch_item *watchers;
+	u32 num_watchers;
+	u64 cookie;
+	int i;
+	int ret;
+
+	ret = ceph_osdc_list_watchers(osdc, &rbd_dev->header_oid,
+				      &rbd_dev->header_oloc, &watchers,
+				      &num_watchers);
+	if (ret)
+		return ret;
+
+	sscanf(locker->id.cookie, RBD_LOCK_COOKIE_PREFIX " %llu", &cookie);
+	for (i = 0; i < num_watchers; i++) {
+		if (!memcmp(&watchers[i].addr, &locker->info.addr,
+			    sizeof(locker->info.addr)) &&
+		    watchers[i].cookie == cookie) {
+			struct rbd_client_id cid = {
+				.gid = le64_to_cpu(watchers[i].name.num),
+				.handle = cookie,
+			};
+
+			dout("%s rbd_dev %p found cid %llu-%llu\n", __func__,
+			     rbd_dev, cid.gid, cid.handle);
+			rbd_set_owner_cid(rbd_dev, &cid);
+			ret = 1;
+			goto out;
+		}
+	}
+
+	dout("%s rbd_dev %p no watchers\n", __func__, rbd_dev);
+	ret = 0;
+out:
+	kfree(watchers);
+	return ret;
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static int rbd_try_lock(struct rbd_device *rbd_dev)
+{
+	struct ceph_client *client = rbd_dev->rbd_client->client;
+	struct ceph_locker *lockers;
+	u32 num_lockers;
+	int ret;
+
+	for (;;) {
+		ret = rbd_lock(rbd_dev);
+		if (ret != -EBUSY)
+			return ret;
+
+		/* determine if the current lock holder is still alive */
+		ret = get_lock_owner_info(rbd_dev, &lockers, &num_lockers);
+		if (ret)
+			return ret;
+
+		if (num_lockers == 0)
+			goto again;
+
+		ret = find_watcher(rbd_dev, lockers);
+		if (ret) {
+			if (ret > 0)
+				ret = 0; /* have to request lock */
+			goto out;
+		}
+
+		rbd_warn(rbd_dev, "%s%llu seems dead, breaking lock",
+			 ENTITY_NAME(lockers[0].id.name));
+
+		ret = ceph_monc_blacklist_add(&client->monc,
+					      &lockers[0].info.addr);
+		if (ret) {
+			rbd_warn(rbd_dev, "blacklist of %s%llu failed: %d",
+				 ENTITY_NAME(lockers[0].id.name), ret);
+			goto out;
+		}
+
+		ret = ceph_cls_break_lock(&client->osdc, &rbd_dev->header_oid,
+					  &rbd_dev->header_oloc, RBD_LOCK_NAME,
+					  lockers[0].id.cookie,
+					  &lockers[0].id.name);
+		if (ret && ret != -ENOENT)
+			goto out;
+
+again:
+		ceph_free_lockers(lockers, num_lockers);
+	}
+
+out:
+	ceph_free_lockers(lockers, num_lockers);
+	return ret;
+}
+
+/*
+ * ret is set only if lock_state is RBD_LOCK_STATE_UNLOCKED
+ */
+static enum rbd_lock_state rbd_try_acquire_lock(struct rbd_device *rbd_dev,
+						int *pret)
+{
+	enum rbd_lock_state lock_state;
+
+	down_read(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (__rbd_is_lock_owner(rbd_dev)) {
+		lock_state = rbd_dev->lock_state;
+		up_read(&rbd_dev->lock_rwsem);
+		return lock_state;
+	}
+
+	up_read(&rbd_dev->lock_rwsem);
+	down_write(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (!__rbd_is_lock_owner(rbd_dev)) {
+		*pret = rbd_try_lock(rbd_dev);
+		if (*pret)
+			rbd_warn(rbd_dev, "failed to acquire lock: %d", *pret);
+	}
+
+	lock_state = rbd_dev->lock_state;
+	up_write(&rbd_dev->lock_rwsem);
+	return lock_state;
+}
+
+static void rbd_acquire_lock(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
+					    struct rbd_device, lock_dwork);
+	enum rbd_lock_state lock_state;
+	int ret;
+
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+again:
+	lock_state = rbd_try_acquire_lock(rbd_dev, &ret);
+	if (lock_state != RBD_LOCK_STATE_UNLOCKED || ret == -EBLACKLISTED) {
+		if (lock_state == RBD_LOCK_STATE_LOCKED)
+			wake_requests(rbd_dev, true);
+		dout("%s rbd_dev %p lock_state %d ret %d - done\n", __func__,
+		     rbd_dev, lock_state, ret);
+		return;
+	}
+
+	ret = rbd_request_lock(rbd_dev);
+	if (ret == -ETIMEDOUT) {
+		goto again; /* treat this as a dead client */
+	} else if (ret < 0) {
+		rbd_warn(rbd_dev, "error requesting lock: %d", ret);
+		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+				 RBD_RETRY_DELAY);
+	} else {
+		/*
+		 * lock owner acked, but resend if we don't see them
+		 * release the lock
+		 */
+		dout("%s rbd_dev %p requeueing lock_dwork\n", __func__,
+		     rbd_dev);
+		mod_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork,
+		    msecs_to_jiffies(2 * RBD_NOTIFY_TIMEOUT * MSEC_PER_SEC));
+	}
+}
+
+/*
+ * lock_rwsem must be held for write
+ */
+static bool rbd_release_lock(struct rbd_device *rbd_dev)
+{
+	dout("%s rbd_dev %p read lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+		return false;
+
+	rbd_dev->lock_state = RBD_LOCK_STATE_RELEASING;
+	downgrade_write(&rbd_dev->lock_rwsem);
+	/*
+	 * Ensure that all in-flight IO is flushed.
+	 *
+	 * FIXME: ceph_osdc_sync() flushes the entire OSD client, which
+	 * may be shared with other devices.
+	 */
+	ceph_osdc_sync(&rbd_dev->rbd_client->client->osdc);
+	up_read(&rbd_dev->lock_rwsem);
+
+	down_write(&rbd_dev->lock_rwsem);
+	dout("%s rbd_dev %p write lock_state %d\n", __func__, rbd_dev,
+	     rbd_dev->lock_state);
+	if (rbd_dev->lock_state != RBD_LOCK_STATE_RELEASING)
+		return false;
+
+	if (!rbd_unlock(rbd_dev))
+		/*
+		 * Give others a chance to grab the lock - we would re-acquire
+		 * almost immediately if we got new IO during ceph_osdc_sync()
+		 * otherwise.  We need to ack our own notifications, so this
+		 * lock_dwork will be requeued from rbd_wait_state_locked()
+		 * after wake_requests() in rbd_handle_released_lock().
+		 */
+		cancel_delayed_work(&rbd_dev->lock_dwork);
+
+	return true;
+}
+
+static void rbd_release_lock_work(struct work_struct *work)
+{
+	struct rbd_device *rbd_dev = container_of(work, struct rbd_device,
+						  unlock_work);
+
+	down_write(&rbd_dev->lock_rwsem);
+	rbd_release_lock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
+}
+
+static void rbd_handle_acquired_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				     void **p)
+{
+	struct rbd_client_id cid = { 0 };
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+		down_write(&rbd_dev->lock_rwsem);
+		if (rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+			/*
+			 * we already know that the remote client is
+			 * the owner
+			 */
+			up_write(&rbd_dev->lock_rwsem);
+			return;
+		}
+
+		rbd_set_owner_cid(rbd_dev, &cid);
+		downgrade_write(&rbd_dev->lock_rwsem);
+	} else {
+		down_read(&rbd_dev->lock_rwsem);
+	}
+
+	if (!__rbd_is_lock_owner(rbd_dev))
+		wake_requests(rbd_dev, false);
+	up_read(&rbd_dev->lock_rwsem);
+}
+
+static void rbd_handle_released_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				     void **p)
+{
+	struct rbd_client_id cid = { 0 };
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (!rbd_cid_equal(&cid, &rbd_empty_cid)) {
+		down_write(&rbd_dev->lock_rwsem);
+		if (!rbd_cid_equal(&cid, &rbd_dev->owner_cid)) {
+			dout("%s rbd_dev %p unexpected owner, cid %llu-%llu != owner_cid %llu-%llu\n",
+			     __func__, rbd_dev, cid.gid, cid.handle,
+			     rbd_dev->owner_cid.gid, rbd_dev->owner_cid.handle);
+			up_write(&rbd_dev->lock_rwsem);
+			return;
+		}
+
+		rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+		downgrade_write(&rbd_dev->lock_rwsem);
+	} else {
+		down_read(&rbd_dev->lock_rwsem);
+	}
+
+	if (!__rbd_is_lock_owner(rbd_dev))
+		wake_requests(rbd_dev, false);
+	up_read(&rbd_dev->lock_rwsem);
+}
+
+static bool rbd_handle_request_lock(struct rbd_device *rbd_dev, u8 struct_v,
+				    void **p)
+{
+	struct rbd_client_id my_cid = rbd_get_cid(rbd_dev);
+	struct rbd_client_id cid = { 0 };
+	bool need_to_send;
+
+	if (struct_v >= 2) {
+		cid.gid = ceph_decode_64(p);
+		cid.handle = ceph_decode_64(p);
+	}
+
+	dout("%s rbd_dev %p cid %llu-%llu\n", __func__, rbd_dev, cid.gid,
+	     cid.handle);
+	if (rbd_cid_equal(&cid, &my_cid))
+		return false;
+
+	down_read(&rbd_dev->lock_rwsem);
+	need_to_send = __rbd_is_lock_owner(rbd_dev);
+	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED) {
+		if (!rbd_cid_equal(&rbd_dev->owner_cid, &rbd_empty_cid)) {
+			dout("%s rbd_dev %p queueing unlock_work\n", __func__,
+			     rbd_dev);
+			queue_work(rbd_dev->task_wq, &rbd_dev->unlock_work);
+		}
+	}
+	up_read(&rbd_dev->lock_rwsem);
+	return need_to_send;
+}
+
+static void __rbd_acknowledge_notify(struct rbd_device *rbd_dev,
+				     u64 notify_id, u64 cookie, s32 *result)
+{
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	int buf_size = 4 + CEPH_ENCODING_START_BLK_LEN;
+	char buf[buf_size];
+	int ret;
+
+	if (result) {
+		void *p = buf;
+
+		/* encode ResponseMessage */
+		ceph_start_encoding(&p, 1, 1,
+				    buf_size - CEPH_ENCODING_START_BLK_LEN);
+		ceph_encode_32(&p, *result);
+	} else {
+		buf_size = 0;
+	}
+
+	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
+				   &rbd_dev->header_oloc, notify_id, cookie,
+				   buf, buf_size);
+	if (ret)
+		rbd_warn(rbd_dev, "acknowledge_notify failed: %d", ret);
+}
+
+static void rbd_acknowledge_notify(struct rbd_device *rbd_dev, u64 notify_id,
+				   u64 cookie)
+{
+	dout("%s rbd_dev %p\n", __func__, rbd_dev);
+	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, NULL);
+}
+
+static void rbd_acknowledge_notify_result(struct rbd_device *rbd_dev,
+					  u64 notify_id, u64 cookie, s32 result)
+{
+	dout("%s rbd_dev %p result %d\n", __func__, rbd_dev, result);
+	__rbd_acknowledge_notify(rbd_dev, notify_id, cookie, &result);
+}
+
 static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
 			 u64 notifier_id, void *data, size_t data_len)
 {
 	struct rbd_device *rbd_dev = arg;
-	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	void *p = data;
+	void *const end = p + data_len;
+	u8 struct_v;
+	u32 len;
+	u32 notify_op;
 	int ret;
 
-	dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
-	     cookie, notify_id);
+	dout("%s rbd_dev %p cookie %llu notify_id %llu data_len %zu\n",
+	     __func__, rbd_dev, cookie, notify_id, data_len);
+	if (data_len) {
+		ret = ceph_start_decoding(&p, end, 1, "NotifyMessage",
+					  &struct_v, &len);
+		if (ret) {
+			rbd_warn(rbd_dev, "failed to decode NotifyMessage: %d",
+				 ret);
+			return;
+		}
 
-	/*
-	 * Until adequate refresh error handling is in place, there is
-	 * not much we can do here, except warn.
-	 *
-	 * See http://tracker.ceph.com/issues/5040
-	 */
-	ret = rbd_dev_refresh(rbd_dev);
-	if (ret)
-		rbd_warn(rbd_dev, "refresh failed: %d", ret);
+		notify_op = ceph_decode_32(&p);
+	} else {
+		/* legacy notification for header updates */
+		notify_op = RBD_NOTIFY_OP_HEADER_UPDATE;
+		len = 0;
+	}
 
-	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
-				   &rbd_dev->header_oloc, notify_id, cookie,
-				   NULL, 0);
-	if (ret)
-		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
+	dout("%s rbd_dev %p notify_op %u\n", __func__, rbd_dev, notify_op);
+	switch (notify_op) {
+	case RBD_NOTIFY_OP_ACQUIRED_LOCK:
+		rbd_handle_acquired_lock(rbd_dev, struct_v, &p);
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_RELEASED_LOCK:
+		rbd_handle_released_lock(rbd_dev, struct_v, &p);
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_REQUEST_LOCK:
+		if (rbd_handle_request_lock(rbd_dev, struct_v, &p))
+			/*
+			 * send ResponseMessage(0) back so the client
+			 * can detect a missing owner
+			 */
+			rbd_acknowledge_notify_result(rbd_dev, notify_id,
+						      cookie, 0);
+		else
+			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	case RBD_NOTIFY_OP_HEADER_UPDATE:
+		ret = rbd_dev_refresh(rbd_dev);
+		if (ret)
+			rbd_warn(rbd_dev, "refresh failed: %d", ret);
+
+		rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	default:
+		if (rbd_is_lock_owner(rbd_dev))
+			rbd_acknowledge_notify_result(rbd_dev, notify_id,
+						      cookie, -EOPNOTSUPP);
+		else
+			rbd_acknowledge_notify(rbd_dev, notify_id, cookie);
+		break;
+	}
 }
 
 static void __rbd_unregister_watch(struct rbd_device *rbd_dev);
@@ -3130,6 +3837,10 @@
 
 	rbd_warn(rbd_dev, "encountered watch error: %d", err);
 
+	down_write(&rbd_dev->lock_rwsem);
+	rbd_set_owner_cid(rbd_dev, &rbd_empty_cid);
+	up_write(&rbd_dev->lock_rwsem);
+
 	mutex_lock(&rbd_dev->watch_mutex);
 	if (rbd_dev->watch_state == RBD_WATCH_STATE_REGISTERED) {
 		__rbd_unregister_watch(rbd_dev);
@@ -3202,10 +3913,15 @@
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
 	cancel_delayed_work_sync(&rbd_dev->watch_dwork);
+	cancel_work_sync(&rbd_dev->acquired_lock_work);
+	cancel_work_sync(&rbd_dev->released_lock_work);
+	cancel_delayed_work_sync(&rbd_dev->lock_dwork);
+	cancel_work_sync(&rbd_dev->unlock_work);
 }
 
 static void rbd_unregister_watch(struct rbd_device *rbd_dev)
 {
+	WARN_ON(waitqueue_active(&rbd_dev->lock_waitq));
 	cancel_tasks_sync(rbd_dev);
 
 	mutex_lock(&rbd_dev->watch_mutex);
@@ -3221,10 +3937,15 @@
 {
 	struct rbd_device *rbd_dev = container_of(to_delayed_work(work),
 					    struct rbd_device, watch_dwork);
+	bool was_lock_owner = false;
 	int ret;
 
 	dout("%s rbd_dev %p\n", __func__, rbd_dev);
 
+	down_write(&rbd_dev->lock_rwsem);
+	if (rbd_dev->lock_state == RBD_LOCK_STATE_LOCKED)
+		was_lock_owner = rbd_release_lock(rbd_dev);
+
 	mutex_lock(&rbd_dev->watch_mutex);
 	if (rbd_dev->watch_state != RBD_WATCH_STATE_ERROR)
 		goto fail_unlock;
@@ -3247,10 +3968,20 @@
 	if (ret)
 		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
 
+	if (was_lock_owner) {
+		ret = rbd_try_lock(rbd_dev);
+		if (ret)
+			rbd_warn(rbd_dev, "reregisteration lock failed: %d",
+				 ret);
+	}
+
+	up_write(&rbd_dev->lock_rwsem);
+	wake_requests(rbd_dev, true);
 	return;
 
 fail_unlock:
 	mutex_unlock(&rbd_dev->watch_mutex);
+	up_write(&rbd_dev->lock_rwsem);
 }
 
 /*
@@ -3340,6 +4071,29 @@
 	return ret;
 }
 
+/*
+ * lock_rwsem must be held for read
+ */
+static void rbd_wait_state_locked(struct rbd_device *rbd_dev)
+{
+	DEFINE_WAIT(wait);
+
+	do {
+		/*
+		 * Note the use of mod_delayed_work() in rbd_acquire_lock()
+		 * and cancel_delayed_work() in wake_requests().
+		 */
+		dout("%s rbd_dev %p queueing lock_dwork\n", __func__, rbd_dev);
+		queue_delayed_work(rbd_dev->task_wq, &rbd_dev->lock_dwork, 0);
+		prepare_to_wait_exclusive(&rbd_dev->lock_waitq, &wait,
+					  TASK_UNINTERRUPTIBLE);
+		up_read(&rbd_dev->lock_rwsem);
+		schedule();
+		down_read(&rbd_dev->lock_rwsem);
+	} while (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED);
+	finish_wait(&rbd_dev->lock_waitq, &wait);
+}
+
 static void rbd_queue_workfn(struct work_struct *work)
 {
 	struct request *rq = blk_mq_rq_from_pdu(work);
@@ -3350,6 +4104,7 @@
 	u64 length = blk_rq_bytes(rq);
 	enum obj_operation_type op_type;
 	u64 mapping_size;
+	bool must_be_locked = false;
 	int result;
 
 	if (rq->cmd_type != REQ_TYPE_FS) {
@@ -3411,6 +4166,7 @@
 	if (op_type != OBJ_OP_READ) {
 		snapc = rbd_dev->header.snapc;
 		ceph_get_snap_context(snapc);
+		must_be_locked = rbd_is_lock_supported(rbd_dev);
 	}
 	up_read(&rbd_dev->header_rwsem);
 
@@ -3421,11 +4177,17 @@
 		goto err_rq;
 	}
 
+	if (must_be_locked) {
+		down_read(&rbd_dev->lock_rwsem);
+		if (rbd_dev->lock_state != RBD_LOCK_STATE_LOCKED)
+			rbd_wait_state_locked(rbd_dev);
+	}
+
 	img_request = rbd_img_request_create(rbd_dev, offset, length, op_type,
 					     snapc);
 	if (!img_request) {
 		result = -ENOMEM;
-		goto err_rq;
+		goto err_unlock;
 	}
 	img_request->rq = rq;
 	snapc = NULL; /* img_request consumes a ref */
@@ -3443,10 +4205,15 @@
 	if (result)
 		goto err_img_request;
 
+	if (must_be_locked)
+		up_read(&rbd_dev->lock_rwsem);
 	return;
 
 err_img_request:
 	rbd_img_request_put(img_request);
+err_unlock:
+	if (must_be_locked)
+		up_read(&rbd_dev->lock_rwsem);
 err_rq:
 	if (result)
 		rbd_warn(rbd_dev, "%s %llx at %llx result %d",
@@ -4020,6 +4787,7 @@
 static void rbd_dev_free(struct rbd_device *rbd_dev)
 {
 	WARN_ON(rbd_dev->watch_state != RBD_WATCH_STATE_UNREGISTERED);
+	WARN_ON(rbd_dev->lock_state != RBD_LOCK_STATE_UNLOCKED);
 
 	ceph_oid_destroy(&rbd_dev->header_oid);
 	ceph_oloc_destroy(&rbd_dev->header_oloc);
@@ -4071,6 +4839,14 @@
 	rbd_dev->watch_state = RBD_WATCH_STATE_UNREGISTERED;
 	INIT_DELAYED_WORK(&rbd_dev->watch_dwork, rbd_reregister_watch);
 
+	init_rwsem(&rbd_dev->lock_rwsem);
+	rbd_dev->lock_state = RBD_LOCK_STATE_UNLOCKED;
+	INIT_WORK(&rbd_dev->acquired_lock_work, rbd_notify_acquired_lock);
+	INIT_WORK(&rbd_dev->released_lock_work, rbd_notify_released_lock);
+	INIT_DELAYED_WORK(&rbd_dev->lock_dwork, rbd_acquire_lock);
+	INIT_WORK(&rbd_dev->unlock_work, rbd_release_lock_work);
+	init_waitqueue_head(&rbd_dev->lock_waitq);
+
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
 	rbd_dev->dev.parent = &rbd_root_dev;
@@ -5553,6 +6329,10 @@
 	if (ret < 0 || already)
 		return ret;
 
+	down_write(&rbd_dev->lock_rwsem);
+	if (__rbd_is_lock_owner(rbd_dev))
+		rbd_unlock(rbd_dev);
+	up_write(&rbd_dev->lock_rwsem);
 	rbd_unregister_watch(rbd_dev);
 
 	/*
diff --git a/drivers/block/rbd_types.h b/drivers/block/rbd_types.h
index 49d77cb..94f367db2 100644
--- a/drivers/block/rbd_types.h
+++ b/drivers/block/rbd_types.h
@@ -28,6 +28,17 @@
 #define RBD_DATA_PREFIX        "rbd_data."
 #define RBD_ID_PREFIX          "rbd_id."
 
+#define RBD_LOCK_NAME          "rbd_lock"
+#define RBD_LOCK_TAG           "internal"
+#define RBD_LOCK_COOKIE_PREFIX "auto"
+
+enum rbd_notify_op {
+	RBD_NOTIFY_OP_ACQUIRED_LOCK      = 0,
+	RBD_NOTIFY_OP_RELEASED_LOCK      = 1,
+	RBD_NOTIFY_OP_REQUEST_LOCK       = 2,
+	RBD_NOTIFY_OP_HEADER_UPDATE      = 3,
+};
+
 /*
  * For format version 1, rbd image 'foo' consists of objects
  *   foo.rbd		- image metadata
diff --git a/net/ceph/ceph_strings.c b/net/ceph/ceph_strings.c
index 3773a4f..19b7d8a 100644
--- a/net/ceph/ceph_strings.c
+++ b/net/ceph/ceph_strings.c
@@ -15,6 +15,7 @@
 	default: return "unknown";
 	}
 }
+EXPORT_SYMBOL(ceph_entity_type_name);
 
 const char *ceph_osd_op_name(int op)
 {