rbd: use watch/notify for changes in rbd header

Send notifications when we change the rbd header (e.g. create a snapshot)
and wait for such notifications.  This allows snapshot creation to be
synchronized between different rbd clients/tools.
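
For background, the same watch/notify primitives are exposed to
userspace through librados.  A rough sketch of the equivalent userspace
flow (hedged: exact signatures vary between librados versions):

	/* invoked whenever another client notifies on the object */
	static void header_cb(uint8_t opcode, uint64_t ver, void *arg)
	{
		/* re-read the header and refresh cached state here */
	}

	uint64_t handle;
	rados_watch(ioctx, "foo.rbd", 0, &handle, header_cb, NULL);
	/* ... after changing the header (e.g. snap create): */
	rados_notify(ioctx, "foo.rbd", 0, NULL, 0);
	rados_unwatch(ioctx, "foo.rbd", handle);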

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index e1e38b1..16dc364 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -31,6 +31,7 @@
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/decode.h>
+#include <linux/parser.h>
 
 #include <linux/kernel.h>
 #include <linux/device.h>
@@ -54,6 +55,8 @@
 
 #define DEV_NAME_LEN		32
 
+#define RBD_NOTIFY_TIMEOUT_DEFAULT 10
+
 /*
  * block device image metadata (in-memory version)
  */
@@ -71,6 +74,12 @@
 
 	char *snap_names;
 	u64 *snap_sizes;
+
+	u64 obj_version;
+};
+
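+/* rbd-specific options, parsed from the options string */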
+struct rbd_options {
+	int	notify_timeout;
 };
 
 /*
@@ -78,6 +87,7 @@
  */
 struct rbd_client {
 	struct ceph_client	*client;
+	struct rbd_options	*rbd_opts;
 	struct kref		kref;
 	struct list_head	node;
 };
@@ -124,6 +134,9 @@
 	char			pool_name[RBD_MAX_POOL_NAME_LEN];
 	int			poolid;
 
+	struct ceph_osd_event   *watch_event;
+	struct ceph_osd_request *watch_request;
+
 	char                    snap_name[RBD_MAX_SNAP_NAME_LEN];
 	u32 cur_snap;	/* index+1 of current snapshot within snap context
 			   0 - for the head */
@@ -177,6 +190,8 @@
 	put_device(&rbd_dev->dev);
 }
 
+static int __rbd_update_snaps(struct rbd_device *rbd_dev);
+
 static int rbd_open(struct block_device *bdev, fmode_t mode)
 {
 	struct gendisk *disk = bdev->bd_disk;
@@ -211,7 +226,8 @@
  * Initialize an rbd client instance.
  * We own *opt.
  */
-static struct rbd_client *rbd_client_create(struct ceph_options *opt)
+static struct rbd_client *rbd_client_create(struct ceph_options *opt,
+					    struct rbd_options *rbd_opts)
 {
 	struct rbd_client *rbdc;
 	int ret = -ENOMEM;
@@ -233,6 +249,8 @@
 	if (ret < 0)
 		goto out_err;
 
+	rbdc->rbd_opts = rbd_opts;
+
 	spin_lock(&node_lock);
 	list_add_tail(&rbdc->node, &rbd_client_list);
 	spin_unlock(&node_lock);
@@ -267,6 +285,59 @@
 }
 
 /*
+ * mount options
+ */
+enum {
+	Opt_notify_timeout,
+	Opt_last_int,
+	/* int args above */
+	Opt_last_string,
+	/* string args above */
+};
+
+static match_table_t rbdopt_tokens = {
+	{Opt_notify_timeout, "notify_timeout=%d"},
+	/* int args above */
+	/* string args above */
+	{-1, NULL}
+};
+
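+/*
+ * Parse a single rbd option token; called back from ceph_parse_options()
+ * for each token that libceph itself does not recognize.
+ */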
+static int parse_rbd_opts_token(char *c, void *private)
+{
+	struct rbd_options *rbdopt = private;
+	substring_t argstr[MAX_OPT_ARGS];
+	int token, intval, ret;
+
+	token = match_token(c, rbdopt_tokens, argstr);
+	if (token < 0)
+		return -EINVAL;
+
+	if (token < Opt_last_int) {
+		ret = match_int(&argstr[0], &intval);
+		if (ret < 0) {
+			pr_err("bad mount option arg (not int) "
+			       "at '%s'\n", c);
+			return ret;
+		}
+		dout("got int token %d val %d\n", token, intval);
+	} else if (token > Opt_last_int && token < Opt_last_string) {
+		dout("got string token %d val %s\n", token,
+		     argstr[0].from);
+	} else {
+		dout("got token %d\n", token);
+	}
+
+	switch (token) {
+	case Opt_notify_timeout:
+		rbdopt->notify_timeout = intval;
+		break;
+	default:
+		BUG_ON(token);
+	}
+	return 0;
+}
+
+/*
  * Get a ceph client with specific addr and configuration, if one does
  * not exist create it.
  */
@@ -276,11 +347,18 @@
 	struct rbd_client *rbdc;
 	struct ceph_options *opt;
 	int ret;
+	struct rbd_options *rbd_opts;
+
+	rbd_opts = kzalloc(sizeof(*rbd_opts), GFP_KERNEL);
+	if (!rbd_opts)
+		return -ENOMEM;
+
+	rbd_opts->notify_timeout = RBD_NOTIFY_TIMEOUT_DEFAULT;
 
 	ret = ceph_parse_options(&opt, options, mon_addr,
-				 mon_addr + strlen(mon_addr), NULL, NULL);
+				 mon_addr + strlen(mon_addr),
+				 parse_rbd_opts_token, rbd_opts);
 	if (ret < 0)
-		return ret;
+		goto done_err;
 
 	spin_lock(&node_lock);
 	rbdc = __rbd_client_find(opt);
@@ -296,13 +374,18 @@
 	}
 	spin_unlock(&node_lock);
 
-	rbdc = rbd_client_create(opt);
-	if (IS_ERR(rbdc))
-		return PTR_ERR(rbdc);
+	rbdc = rbd_client_create(opt, rbd_opts);
+	if (IS_ERR(rbdc)) {
+		ret = PTR_ERR(rbdc);
+		goto done_err;
+	}
 
 	rbd_dev->rbd_client = rbdc;
 	rbd_dev->client = rbdc->client;
 	return 0;
+done_err:
+	kfree(rbd_opts);
+	return ret;
 }
 
 /*
@@ -318,6 +401,7 @@
 	spin_unlock(&node_lock);
 
 	ceph_destroy_client(rbdc->client);
+	kfree(rbdc->rbd_opts);
 	kfree(rbdc);
 }
 
@@ -666,7 +750,9 @@
 			  struct ceph_osd_req_op *ops,
 			  int num_reply,
 			  void (*rbd_cb)(struct ceph_osd_request *req,
-					 struct ceph_msg *msg))
+					 struct ceph_msg *msg),
+			  struct ceph_osd_request **linger_req,
+			  u64 *ver)
 {
 	struct ceph_osd_request *req;
 	struct ceph_file_layout *layout;
@@ -729,12 +815,20 @@
 				req->r_oid, req->r_oid_len);
 	up_read(&header->snap_rwsem);
 
+	if (linger_req) {
+		ceph_osdc_set_request_linger(&dev->client->osdc, req);
+		*linger_req = req;
+	}
+
 	ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
 	if (ret < 0)
 		goto done_err;
 
 	if (!rbd_cb) {
 		ret = ceph_osdc_wait_request(&dev->client->osdc, req);
+		if (ver)
+			*ver = le64_to_cpu(req->r_reassert_version.version);
+		dout("reassert_ver=%lld\n", le64_to_cpu(req->r_reassert_version.version));
 		ceph_osdc_put_request(req);
 	}
 	return ret;
@@ -789,6 +883,11 @@
 	kfree(req_data);
 }
 
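+/*
+ * Completion callback for a request whose result we do not need:
+ * just drop our reference to the osd request.
+ */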
+static void rbd_simple_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
+{
+	ceph_osdc_put_request(req);
+}
+
 /*
  * Do a synchronous ceph osd operation
  */
@@ -801,7 +900,9 @@
 			   int num_reply,
 			   const char *obj,
 			   u64 ofs, u64 len,
-			   char *buf)
+			   char *buf,
+			   struct ceph_osd_request **linger_req,
+			   u64 *ver)
 {
 	int ret;
 	struct page **pages;
@@ -833,7 +934,8 @@
 			  flags,
 			  ops,
 			  2,
-			  NULL);
+			  NULL,
+			  linger_req, ver);
 	if (ret < 0)
 		goto done_ops;
 
@@ -893,7 +995,7 @@
 			     flags,
 			     ops,
 			     num_reply,
-			     rbd_req_cb);
+			     rbd_req_cb, NULL, NULL);
 done:
 	kfree(seg_name);
 	return ret;
@@ -940,18 +1042,174 @@
 			  u64 snapid,
 			  const char *obj,
 			  u64 ofs, u64 len,
-			  char *buf)
+			  char *buf,
+			  u64 *ver)
 {
 	return rbd_req_sync_op(dev, NULL,
 			       (snapid ? snapid : CEPH_NOSNAP),
 			       CEPH_OSD_OP_READ,
 			       CEPH_OSD_FLAG_READ,
 			       NULL,
-			       1, obj, ofs, len, buf);
+			       1, obj, ofs, len, buf, NULL, ver);
 }
 
 /*
- * Request sync osd read
+ * Request sync osd notify ack
+ */
+static int rbd_req_sync_notify_ack(struct rbd_device *dev,
+				   u64 ver,
+				   u64 notify_id,
+				   const char *obj)
+{
+	struct ceph_osd_req_op *ops;
+	struct page **pages = NULL;
+	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY_ACK, 0);
+
+	if (ret < 0)
+		return ret;
+
+	ops[0].watch.ver = cpu_to_le64(dev->header.obj_version);
+	ops[0].watch.cookie = notify_id;
+	ops[0].watch.flag = 0;
+
+	ret = rbd_do_request(NULL, dev, NULL, CEPH_NOSNAP,
+			  obj, 0, 0, NULL,
+			  pages, 0,
+			  CEPH_OSD_FLAG_READ,
+			  ops,
+			  1,
+			  rbd_simple_req_cb, NULL, NULL);
+
+	rbd_destroy_ops(ops);
+	return ret;
+}
+
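+/*
+ * Watch callback: the header object changed (e.g. another client
+ * created a snapshot), so refresh the in-memory snapshot context
+ * and ack the notification.
+ */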
+static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+{
+	struct rbd_device *dev = data;
+
+	if (!dev)
+		return;
+
+	dout("rbd_watch_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
+		notify_id, (int)opcode);
+	mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+	__rbd_update_snaps(dev);
+	mutex_unlock(&ctl_mutex);
+
+	rbd_req_sync_notify_ack(dev, ver, notify_id, dev->obj_md_name);
+}
+
+/*
+ * Request sync osd watch
+ */
+static int rbd_req_sync_watch(struct rbd_device *dev,
+			      const char *obj,
+			      u64 ver)
+{
+	struct ceph_osd_req_op *ops;
+	struct ceph_osd_client *osdc = &dev->client->osdc;
+
+	int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_WATCH, 0);
+	if (ret < 0)
+		return ret;
+
+	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, 0,
+				     dev, &dev->watch_event);
+	if (ret < 0)
+		goto fail;
+
+	ops[0].watch.ver = cpu_to_le64(ver);
+	ops[0].watch.cookie = cpu_to_le64(dev->watch_event->cookie);
+	ops[0].watch.flag = 1;
+
+	ret = rbd_req_sync_op(dev, NULL,
+			      CEPH_NOSNAP,
+			      0,
+			      CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			      ops,
+			      1, obj, 0, 0, NULL,
+			      &dev->watch_request, NULL);
+
+	if (ret < 0)
+		goto fail_event;
+
+	rbd_destroy_ops(ops);
+	return 0;
+
+fail_event:
+	ceph_osdc_cancel_event(dev->watch_event);
+	dev->watch_event = NULL;
+fail:
+	rbd_destroy_ops(ops);
+	return ret;
+}
+
+struct rbd_notify_info {
+	struct rbd_device *dev;
+};
+
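+/*
+ * Completion callback for a notify event: invoked once the notify
+ * has been acknowledged (or has timed out).
+ */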
+static void rbd_notify_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
+{
+	struct rbd_device *dev = data;
+
+	if (!dev)
+		return;
+
+	dout("rbd_notify_cb %s notify_id=%lld opcode=%d\n", dev->obj_md_name,
+		notify_id, (int)opcode);
+}
+
+/*
+ * Request sync osd notify
+ */
+static int rbd_req_sync_notify(struct rbd_device *dev,
+			       const char *obj)
+{
+	struct ceph_osd_req_op *ops;
+	struct ceph_osd_client *osdc = &dev->client->osdc;
+	struct ceph_osd_event *event;
+	struct rbd_notify_info info;
+	int payload_len = sizeof(u32) + sizeof(u32);
+	int ret;
+
+	ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_NOTIFY, payload_len);
+	if (ret < 0)
+		return ret;
+
+	info.dev = dev;
+
+	ret = ceph_osdc_create_event(osdc, rbd_notify_cb, 1,
+				     (void *)&info, &event);
+	if (ret < 0)
+		goto fail;
+
+	ops[0].watch.ver = 1;
+	ops[0].watch.flag = 1;
+	ops[0].watch.cookie = event->cookie;
+	ops[0].watch.prot_ver = RADOS_NOTIFY_VER;
+	ops[0].watch.timeout = dev->rbd_client->rbd_opts->notify_timeout;
+
+	ret = rbd_req_sync_op(dev, NULL,
+			       CEPH_NOSNAP,
+			       0,
+			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
+			       ops,
+			       1, obj, 0, 0, NULL, NULL, NULL);
+	if (ret < 0)
+		goto fail_event;
+
+	ret = ceph_osdc_wait_event(event, CEPH_OSD_TIMEOUT_DEFAULT);
+	dout("ceph_osdc_wait_event returned %d\n", ret);
+	rbd_destroy_ops(ops);
+	return 0;
+
+fail_event:
+	ceph_osdc_cancel_event(event);
+fail:
+	rbd_destroy_ops(ops);
+	return ret;
+}
+
+/*
+ * Request sync osd rollback
  */
 static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
 				     u64 snapid,
@@ -969,13 +1227,10 @@
 			       0,
 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 			       ops,
-			       1, obj, 0, 0, NULL);
+			       1, obj, 0, 0, NULL, NULL, NULL);
 
 	rbd_destroy_ops(ops);
 
-	if (ret < 0)
-		return ret;
-
 	return ret;
 }
 
@@ -987,7 +1242,8 @@
 			     const char *cls,
 			     const char *method,
 			     const char *data,
-			     int len)
+			     int len,
+			     u64 *ver)
 {
 	struct ceph_osd_req_op *ops;
 	int cls_len = strlen(cls);
@@ -1010,7 +1266,7 @@
 			       0,
 			       CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
 			       ops,
-			       1, obj, 0, 0, NULL);
+			       1, obj, 0, 0, NULL, NULL, ver);
 
 	rbd_destroy_ops(ops);
 
@@ -1156,6 +1412,7 @@
 	struct rbd_image_header_ondisk *dh;
 	int snap_count = 0;
 	u64 snap_names_len = 0;
+	u64 ver;
 
 	while (1) {
 		int len = sizeof(*dh) +
@@ -1171,7 +1428,7 @@
 				       NULL, CEPH_NOSNAP,
 				       rbd_dev->obj_md_name,
 				       0, len,
-				       (char *)dh);
+				       (char *)dh, &ver);
 		if (rc < 0)
 			goto out_dh;
 
@@ -1188,6 +1445,7 @@
 		}
 		break;
 	}
+	header->obj_version = ver;
 
 out_dh:
 	kfree(dh);
@@ -1205,6 +1463,7 @@
 	u64 new_snapid;
 	int ret;
 	void *data, *data_start, *data_end;
+	u64 ver;
 
 	/* we should create a snapshot only if we're pointing at the head */
 	if (dev->cur_snap)
@@ -1227,7 +1486,7 @@
 	ceph_encode_64_safe(&data, data_end, new_snapid, bad);
 
 	ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
-				data_start, data - data_start);
+				data_start, data - data_start, &ver);
 
 	kfree(data_start);
 
@@ -1259,6 +1518,7 @@
 	int ret;
 	struct rbd_image_header h;
 	u64 snap_seq;
+	int follow_seq = 0;
 
 	ret = rbd_read_header(rbd_dev, &h);
 	if (ret < 0)
@@ -1267,6 +1527,11 @@
 	down_write(&rbd_dev->header.snap_rwsem);
 
 	snap_seq = rbd_dev->header.snapc->seq;
+	if (rbd_dev->header.total_snaps &&
+	    rbd_dev->header.snapc->snaps[0] == snap_seq)
+		/*
+		 * Pointing at the head: we will need to follow it
+		 * if the head moves.
+		 */
+		follow_seq = 1;
 
 	kfree(rbd_dev->header.snapc);
 	kfree(rbd_dev->header.snap_names);
@@ -1277,7 +1542,10 @@
 	rbd_dev->header.snap_names = h.snap_names;
 	rbd_dev->header.snap_names_len = h.snap_names_len;
 	rbd_dev->header.snap_sizes = h.snap_sizes;
-	rbd_dev->header.snapc->seq = snap_seq;
+	if (follow_seq)
+		rbd_dev->header.snapc->seq = rbd_dev->header.snapc->snaps[0];
+	else
+		rbd_dev->header.snapc->seq = snap_seq;
 
 	ret = __rbd_init_snaps_header(rbd_dev);
 
@@ -1699,7 +1967,28 @@
 	device_unregister(&rbd_dev->dev);
 }
 
-static ssize_t rbd_add(struct bus_type *bus, const char *buf, size_t count)
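+/*
+ * Set up a watch on the header object.  -ERANGE means our cached
+ * header version is stale, so refresh the snapshot context and retry.
+ */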
+static int rbd_init_watch_dev(struct rbd_device *rbd_dev)
+{
+	int ret, rc;
+
+	do {
+		ret = rbd_req_sync_watch(rbd_dev, rbd_dev->obj_md_name,
+					 rbd_dev->header.obj_version);
+		if (ret == -ERANGE) {
+			mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
+			rc = __rbd_update_snaps(rbd_dev);
+			mutex_unlock(&ctl_mutex);
+			if (rc < 0)
+				return rc;
+		}
+	} while (ret == -ERANGE);
+
+	return ret;
+}
+
+static ssize_t rbd_add(struct bus_type *bus,
+		       const char *buf,
+		       size_t count)
 {
 	struct ceph_osd_client *osdc;
 	struct rbd_device *rbd_dev;
@@ -1797,6 +2086,10 @@
 	if (rc)
 		goto err_out_bus;
 
+	rc = rbd_init_watch_dev(rbd_dev);
+	if (rc)
+		goto err_out_bus;
+
 	return count;
 
 err_out_bus:
@@ -1849,6 +2142,12 @@
 	struct rbd_device *rbd_dev =
 			container_of(dev, struct rbd_device, dev);
 
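+	/* tear down the watch on the header object before releasing */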
+	if (rbd_dev->watch_request)
+		ceph_osdc_unregister_linger_request(&rbd_dev->client->osdc,
+						    rbd_dev->watch_request);
+	if (rbd_dev->watch_event)
+		ceph_osdc_cancel_event(rbd_dev->watch_event);
+
 	rbd_put_client(rbd_dev);
 
 	/* clean up and free blkdev */
@@ -1914,14 +2213,24 @@
 	ret = rbd_header_add_snap(rbd_dev,
 				  name, GFP_KERNEL);
 	if (ret < 0)
-		goto done_unlock;
+		goto err_unlock;
 
 	ret = __rbd_update_snaps(rbd_dev);
 	if (ret < 0)
-		goto done_unlock;
+		goto err_unlock;
+
+	/*
+	 * Don't hold ctl_mutex while notifying: the notify may trigger
+	 * a watch callback that needs to take that same mutex.
+	 */
+	mutex_unlock(&ctl_mutex);
+
+	/* best effort; don't fail the snapshot if the notify fails */
+	rbd_req_sync_notify(rbd_dev, rbd_dev->obj_md_name);
 
 	ret = count;
-done_unlock:
+	kfree(name);
+	return ret;
+
+err_unlock:
 	mutex_unlock(&ctl_mutex);
 	kfree(name);
 	return ret;