libceph, rbd: ceph_osd_linger_request, watch/notify v2

This adds support for, and switches rbd to, a new, more reliable version
of the watch/notify protocol.  As with the OSD client update, this is
mostly about getting the right structures linked into the right places
so that reconnects are properly sent when needed.  watch/notify v2 also
requires sending regular pings to the OSDs - send_linger_ping().

A major change from the old watch/notify implementation is the
introduction of ceph_osd_linger_request - linger requests no longer
piggyback on ceph_osd_request.  ceph_osd_event has been merged into
ceph_osd_linger_request.

All the details are now hidden within libceph; the interface consists
of a simple pair of watch/unwatch functions and ceph_osdc_notify_ack().
ceph_osdc_watch() does return a ceph_osd_linger_request, but only to
keep the lifetime management simple.

ceph_osdc_notify_ack() accepts an optional data payload, which is
relayed back to the notifier.

Portions of this patch are loosely based on work by Douglas Fuller
<dfuller@redhat.com> and Mike Christie <michaelc@cs.wisc.edu>.

Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index fce23dc..d0834c4 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -351,11 +351,11 @@
 	struct rbd_options	*opts;
 
 	struct ceph_object_id	header_oid;
+	struct ceph_object_locator header_oloc;
 
 	struct ceph_file_layout	layout;
 
-	struct ceph_osd_event   *watch_event;
-	struct rbd_obj_request	*watch_request;
+	struct ceph_osd_linger_request *watch_handle;
 
 	struct rbd_spec		*parent_spec;
 	u64			parent_overlap;
@@ -1596,12 +1596,6 @@
 	return __rbd_obj_request_wait(obj_request, 0);
 }
 
-static int rbd_obj_request_wait_timeout(struct rbd_obj_request *obj_request,
-					unsigned long timeout)
-{
-	return __rbd_obj_request_wait(obj_request, timeout);
-}
-
 static void rbd_img_request_complete(struct rbd_img_request *img_request)
 {
 
@@ -1751,12 +1745,6 @@
 		complete_all(&obj_request->completion);
 }
 
-static void rbd_osd_trivial_callback(struct rbd_obj_request *obj_request)
-{
-	dout("%s: obj %p\n", __func__, obj_request);
-	obj_request_done_set(obj_request);
-}
-
 static void rbd_osd_read_callback(struct rbd_obj_request *obj_request)
 {
 	struct rbd_img_request *img_request = NULL;
@@ -1877,10 +1865,6 @@
 	case CEPH_OSD_OP_CALL:
 		rbd_osd_call_callback(obj_request);
 		break;
-	case CEPH_OSD_OP_NOTIFY_ACK:
-	case CEPH_OSD_OP_WATCH:
-		rbd_osd_trivial_callback(obj_request);
-		break;
 	default:
 		rbd_warn(NULL, "%s: unsupported op %hu",
 			obj_request->object_name, (unsigned short) opcode);
@@ -3100,45 +3084,18 @@
 	obj_request_done_set(obj_request);
 }
 
-static int rbd_obj_notify_ack_sync(struct rbd_device *rbd_dev, u64 notify_id)
+static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev);
+static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev);
+
+static void rbd_watch_cb(void *arg, u64 notify_id, u64 cookie,
+			 u64 notifier_id, void *data, size_t data_len)
 {
-	struct rbd_obj_request *obj_request;
+	struct rbd_device *rbd_dev = arg;
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
 	int ret;
 
-	obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0,
-							OBJ_REQUEST_NODATA);
-	if (!obj_request)
-		return -ENOMEM;
-
-	ret = -ENOMEM;
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_READ, 1,
-						  obj_request);
-	if (!obj_request->osd_req)
-		goto out;
-
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_NOTIFY_ACK,
-					notify_id, 0, 0);
-	rbd_osd_req_format_read(obj_request);
-
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out;
-	ret = rbd_obj_request_wait(obj_request);
-out:
-	rbd_obj_request_put(obj_request);
-
-	return ret;
-}
-
-static void rbd_watch_cb(u64 ver, u64 notify_id, u8 opcode, void *data)
-{
-	struct rbd_device *rbd_dev = (struct rbd_device *)data;
-	int ret;
-
-	dout("%s: \"%s\" notify_id %llu opcode %u\n", __func__,
-		rbd_dev->header_oid.name, (unsigned long long)notify_id,
-		(unsigned int)opcode);
+	dout("%s rbd_dev %p cookie %llu notify_id %llu\n", __func__, rbd_dev,
+	     cookie, notify_id);
 
 	/*
 	 * Until adequate refresh error handling is in place, there is
@@ -3150,63 +3107,31 @@
 	if (ret)
 		rbd_warn(rbd_dev, "refresh failed: %d", ret);
 
-	ret = rbd_obj_notify_ack_sync(rbd_dev, notify_id);
+	ret = ceph_osdc_notify_ack(osdc, &rbd_dev->header_oid,
+				   &rbd_dev->header_oloc, notify_id, cookie,
+				   NULL, 0);
 	if (ret)
 		rbd_warn(rbd_dev, "notify_ack ret %d", ret);
 }
 
-/*
- * Send a (un)watch request and wait for the ack.  Return a request
- * with a ref held on success or error.
- */
-static struct rbd_obj_request *rbd_obj_watch_request_helper(
-						struct rbd_device *rbd_dev,
-						bool watch)
+static void rbd_watch_errcb(void *arg, u64 cookie, int err)
 {
-	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-	struct ceph_options *opts = osdc->client->options;
-	struct rbd_obj_request *obj_request;
+	struct rbd_device *rbd_dev = arg;
 	int ret;
 
-	obj_request = rbd_obj_request_create(rbd_dev->header_oid.name, 0, 0,
-					     OBJ_REQUEST_NODATA);
-	if (!obj_request)
-		return ERR_PTR(-ENOMEM);
+	rbd_warn(rbd_dev, "encountered watch error: %d", err);
 
-	obj_request->osd_req = rbd_osd_req_create(rbd_dev, OBJ_OP_WRITE, 1,
-						  obj_request);
-	if (!obj_request->osd_req) {
-		ret = -ENOMEM;
-		goto out;
-	}
+	__rbd_dev_header_unwatch_sync(rbd_dev);
 
-	osd_req_op_watch_init(obj_request->osd_req, 0, CEPH_OSD_OP_WATCH,
-			      rbd_dev->watch_event->cookie, 0, watch);
-	rbd_osd_req_format_write(obj_request);
-
-	if (watch)
-		ceph_osdc_set_request_linger(osdc, obj_request->osd_req);
-
-	ret = rbd_obj_request_submit(osdc, obj_request);
-	if (ret)
-		goto out;
-
-	ret = rbd_obj_request_wait_timeout(obj_request, opts->mount_timeout);
-	if (ret)
-		goto out;
-
-	ret = obj_request->result;
+	ret = rbd_dev_header_watch_sync(rbd_dev);
 	if (ret) {
-		if (watch)
-			rbd_obj_request_end(obj_request);
-		goto out;
+		rbd_warn(rbd_dev, "failed to reregister watch: %d", ret);
+		return;
 	}
 
-	return obj_request;
-
-out:
-	rbd_obj_request_put(obj_request);
-	return ERR_PTR(ret);
+	ret = rbd_dev_refresh(rbd_dev);
+	if (ret)
+		rbd_warn(rbd_dev, "reregisteration refresh failed: %d", ret);
 }
 
 /*
@@ -3215,57 +3140,33 @@
 static int rbd_dev_header_watch_sync(struct rbd_device *rbd_dev)
 {
 	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
-	struct rbd_obj_request *obj_request;
-	int ret;
+	struct ceph_osd_linger_request *handle;
 
-	rbd_assert(!rbd_dev->watch_event);
-	rbd_assert(!rbd_dev->watch_request);
+	rbd_assert(!rbd_dev->watch_handle);
 
-	ret = ceph_osdc_create_event(osdc, rbd_watch_cb, rbd_dev,
-				     &rbd_dev->watch_event);
-	if (ret < 0)
-		return ret;
+	handle = ceph_osdc_watch(osdc, &rbd_dev->header_oid,
+				 &rbd_dev->header_oloc, rbd_watch_cb,
+				 rbd_watch_errcb, rbd_dev);
+	if (IS_ERR(handle))
+		return PTR_ERR(handle);
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, true);
-	if (IS_ERR(obj_request)) {
-		ceph_osdc_cancel_event(rbd_dev->watch_event);
-		rbd_dev->watch_event = NULL;
-		return PTR_ERR(obj_request);
-	}
-
-	/*
-	 * A watch request is set to linger, so the underlying osd
-	 * request won't go away until we unregister it.  We retain
-	 * a pointer to the object request during that time (in
-	 * rbd_dev->watch_request), so we'll keep a reference to it.
-	 * We'll drop that reference after we've unregistered it in
-	 * rbd_dev_header_unwatch_sync().
-	 */
-	rbd_dev->watch_request = obj_request;
-
+	rbd_dev->watch_handle = handle;
 	return 0;
 }
 
 static void __rbd_dev_header_unwatch_sync(struct rbd_device *rbd_dev)
 {
-	struct rbd_obj_request *obj_request;
+	struct ceph_osd_client *osdc = &rbd_dev->rbd_client->client->osdc;
+	int ret;
 
-	rbd_assert(rbd_dev->watch_event);
-	rbd_assert(rbd_dev->watch_request);
+	if (!rbd_dev->watch_handle)
+		return;
 
-	rbd_obj_request_end(rbd_dev->watch_request);
-	rbd_obj_request_put(rbd_dev->watch_request);
-	rbd_dev->watch_request = NULL;
+	ret = ceph_osdc_unwatch(osdc, rbd_dev->watch_handle);
+	if (ret)
+		rbd_warn(rbd_dev, "failed to unwatch: %d", ret);
 
-	obj_request = rbd_obj_watch_request_helper(rbd_dev, false);
-	if (!IS_ERR(obj_request))
-		rbd_obj_request_put(obj_request);
-	else
-		rbd_warn(rbd_dev, "unable to tear down watch request (%ld)",
-			 PTR_ERR(obj_request));
-
-	ceph_osdc_cancel_event(rbd_dev->watch_event);
-	rbd_dev->watch_event = NULL;
+	rbd_dev->watch_handle = NULL;
 }
 
 /*
@@ -4081,6 +3982,7 @@
 	init_rwsem(&rbd_dev->header_rwsem);
 
 	ceph_oid_init(&rbd_dev->header_oid);
+	ceph_oloc_init(&rbd_dev->header_oloc);
 
 	rbd_dev->dev.bus = &rbd_bus_type;
 	rbd_dev->dev.type = &rbd_device_type;
@@ -5285,6 +5187,7 @@
 
 	rbd_assert(rbd_image_format_valid(rbd_dev->image_format));
 
+	rbd_dev->header_oloc.pool = ceph_file_layout_pg_pool(rbd_dev->layout);
 	if (rbd_dev->image_format == 1)
 		ret = ceph_oid_aprintf(&rbd_dev->header_oid, GFP_KERNEL, "%s%s",
 				       spec->image_name, RBD_SUFFIX);