ceph: generalize mon requests, add pool op support

Generalize the current statfs synchronous requests, and support pool_ops.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/mon_client.c b/fs/ceph/mon_client.c
index 54fe01c..b2a5a3e 100644
--- a/fs/ceph/mon_client.c
+++ b/fs/ceph/mon_client.c
@@ -349,7 +349,7 @@
 }
 
 /*
- * statfs
+ * generic requests (e.g., statfs, poolop)
  */
 static struct ceph_mon_generic_request *__lookup_generic_req(
 	struct ceph_mon_client *monc, u64 tid)
@@ -442,6 +442,35 @@
 	return m;
 }
 
+static int do_generic_request(struct ceph_mon_client *monc,
+			      struct ceph_mon_generic_request *req)
+{
+	int err;
+
+	/* register request */
+	mutex_lock(&monc->mutex);
+	req->tid = ++monc->last_tid;
+	req->request->hdr.tid = cpu_to_le64(req->tid);
+	__insert_generic_request(monc, req);
+	monc->num_generic_requests++;
+	ceph_con_send(monc->con, ceph_msg_get(req->request));
+	mutex_unlock(&monc->mutex);
+
+	err = wait_for_completion_interruptible(&req->completion);
+
+	mutex_lock(&monc->mutex);
+	rb_erase(&req->node, &monc->generic_request_tree);
+	monc->num_generic_requests--;
+	mutex_unlock(&monc->mutex);
+
+	if (!err)
+		err = req->result;
+	return err;
+}
+
+/*
+ * statfs
+ */
 static void handle_statfs_reply(struct ceph_mon_client *monc,
 				struct ceph_msg *msg)
 {
@@ -468,7 +497,7 @@
 	return;
 
 bad:
-	pr_err("corrupt generic reply, no tid\n");
+	pr_err("corrupt generic reply, tid %llu\n", tid);
 	ceph_msg_dump(msg);
 }
 
@@ -487,6 +516,7 @@
 
 	kref_init(&req->kref);
 	req->buf = buf;
+	req->buf_len = sizeof(*buf);
 	init_completion(&req->completion);
 
 	err = -ENOMEM;
@@ -504,25 +534,7 @@
 	h->monhdr.session_mon_tid = 0;
 	h->fsid = monc->monmap->fsid;
 
-	/* register request */
-	mutex_lock(&monc->mutex);
-	req->tid = ++monc->last_tid;
-	req->request->hdr.tid = cpu_to_le64(req->tid);
-	__insert_generic_request(monc, req);
-	monc->num_generic_requests++;
-	mutex_unlock(&monc->mutex);
-
-	/* send request and wait */
-	ceph_con_send(monc->con, ceph_msg_get(req->request));
-	err = wait_for_completion_interruptible(&req->completion);
-
-	mutex_lock(&monc->mutex);
-	rb_erase(&req->node, &monc->generic_request_tree);
-	monc->num_generic_requests--;
-	mutex_unlock(&monc->mutex);
-
-	if (!err)
-		err = req->result;
+	err = do_generic_request(monc, req);
 
 out:
 	kref_put(&req->kref, release_generic_request);
@@ -530,7 +542,126 @@
 }
 
 /*
- * Resend pending statfs requests.
+ * pool ops
+ */
+static int get_poolop_reply_buf(const char *src, size_t src_len,
+				char *dst, size_t dst_len)
+{
+	u32 buf_len;
+
+	if (src_len != sizeof(u32) + dst_len)
+		return -EINVAL;
+
+	buf_len = le32_to_cpu(*(u32 *)src);
+	if (buf_len != dst_len)
+		return -EINVAL;
+
+	memcpy(dst, src + sizeof(u32), dst_len);
+	return 0;
+}
+
+static void handle_poolop_reply(struct ceph_mon_client *monc,
+				struct ceph_msg *msg)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_poolop_reply *reply = msg->front.iov_base;
+	u64 tid = le64_to_cpu(msg->hdr.tid);
+
+	if (msg->front.iov_len < sizeof(*reply))
+		goto bad;
+	dout("handle_poolop_reply %p tid %llu\n", msg, tid);
+
+	mutex_lock(&monc->mutex);
+	req = __lookup_generic_req(monc, tid);
+	if (req) {
+		if (req->buf_len &&
+		    get_poolop_reply_buf(msg->front.iov_base + sizeof(*reply),
+				     msg->front.iov_len - sizeof(*reply),
+				     req->buf, req->buf_len) < 0) {
+			mutex_unlock(&monc->mutex);
+			goto bad;
+		}
+		req->result = le32_to_cpu(reply->reply_code);
+		get_generic_request(req);
+	}
+	mutex_unlock(&monc->mutex);
+	if (req) {
+		complete(&req->completion);
+		put_generic_request(req);
+	}
+	return;
+
+bad:
+	pr_err("corrupt generic reply, tid %llu\n", tid);
+	ceph_msg_dump(msg);
+}
+
+/*
+ * Do a synchronous pool op.
+ */
+int ceph_monc_do_poolop(struct ceph_mon_client *monc, u32 op,
+			u32 pool, u64 snapid,
+			char *buf, int len)
+{
+	struct ceph_mon_generic_request *req;
+	struct ceph_mon_poolop *h;
+	int err;
+
+	req = kzalloc(sizeof(*req), GFP_NOFS);
+	if (!req)
+		return -ENOMEM;
+
+	kref_init(&req->kref);
+	req->buf = buf;
+	req->buf_len = len;
+	init_completion(&req->completion);
+
+	err = -ENOMEM;
+	req->request = ceph_msg_new(CEPH_MSG_POOLOP, sizeof(*h), GFP_NOFS);
+	if (!req->request)
+		goto out;
+	req->reply = ceph_msg_new(CEPH_MSG_POOLOP_REPLY, 1024, GFP_NOFS);
+	if (!req->reply)
+		goto out;
+
+	/* fill out request */
+	req->request->hdr.version = cpu_to_le16(2);
+	h = req->request->front.iov_base;
+	h->monhdr.have_version = 0;
+	h->monhdr.session_mon = cpu_to_le16(-1);
+	h->monhdr.session_mon_tid = 0;
+	h->fsid = monc->monmap->fsid;
+	h->pool = cpu_to_le32(pool);
+	h->op = cpu_to_le32(op);
+	h->auid = 0;
+	h->snapid = cpu_to_le64(snapid);
+	h->name_len = 0;
+
+	err = do_generic_request(monc, req);
+
+out:
+	kref_put(&req->kref, release_generic_request);
+	return err;
+}
+
+int ceph_monc_create_snapid(struct ceph_mon_client *monc,
+			    u32 pool, u64 *snapid)
+{
+	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+				   pool, 0, (char *)snapid, sizeof(*snapid));
+
+}
+
+int ceph_monc_delete_snapid(struct ceph_mon_client *monc,
+			    u32 pool, u64 snapid)
+{
+	return ceph_monc_do_poolop(monc,  POOL_OP_CREATE_UNMANAGED_SNAP,
+				   pool, snapid, 0, 0);
+
+}
+
+/*
+ * Resend pending generic requests.
  */
 static void __resend_generic_request(struct ceph_mon_client *monc)
 {
@@ -783,6 +914,10 @@
 		handle_statfs_reply(monc, msg);
 		break;
 
+	case CEPH_MSG_POOLOP_REPLY:
+		handle_poolop_reply(monc, msg);
+		break;
+
 	case CEPH_MSG_MON_MAP:
 		ceph_monc_handle_map(monc, msg);
 		break;
@@ -820,6 +955,7 @@
 	case CEPH_MSG_MON_SUBSCRIBE_ACK:
 		m = ceph_msg_get(monc->m_subscribe_ack);
 		break;
+	case CEPH_MSG_POOLOP_REPLY:
 	case CEPH_MSG_STATFS_REPLY:
 		return get_generic_reply(con, hdr, skip);
 	case CEPH_MSG_AUTH_REPLY: