libceph: add lingering request and watch/notify event framework

Lingering requests are requests that are sent to the OSD normally but
are also tracked after we get a successful reply.  This keeps the OSD
connection open and resends the original request if the object moves to
another OSD.  The OSD can then send notification messages back to us
if another client initiates a notify.

This framework will be used by RBD so that the client gets a notification
when a snapshot is created by another node or tool.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/include/linux/ceph/osd_client.h b/include/linux/ceph/osd_client.h
index e791b8e..f88eacb 100644
--- a/include/linux/ceph/osd_client.h
+++ b/include/linux/ceph/osd_client.h
@@ -32,6 +32,7 @@
 	struct rb_node o_node;
 	struct ceph_connection o_con;
 	struct list_head o_requests;
+	struct list_head o_linger_requests;
 	struct list_head o_osd_lru;
 	struct ceph_authorizer *o_authorizer;
 	void *o_authorizer_buf, *o_authorizer_reply_buf;
@@ -47,6 +48,8 @@
 	struct rb_node  r_node;
 	struct list_head r_req_lru_item;
 	struct list_head r_osd_item;
+	struct list_head r_linger_item;
+	struct list_head r_linger_osd;
 	struct ceph_osd *r_osd;
 	struct ceph_pg   r_pgid;
 	int              r_pg_osds[CEPH_PG_MAX_SIZE];
@@ -59,6 +62,7 @@
 	int               r_flags;     /* any additional flags for the osd */
 	u32               r_sent;      /* >0 if r_request is sending/sent */
 	int               r_got_reply;
+	int		  r_linger;
 
 	struct ceph_osd_client *r_osdc;
 	struct kref       r_kref;
@@ -89,6 +93,26 @@
 	struct ceph_pagelist *r_trail;	      /* trailing part of the data */
 };
 
+struct ceph_osd_event {	/* watch/notify event; handed out via ceph_osdc_create_event()'s pevent */
+	u64 cookie;	/* event identifier; presumably the key in osdc->event_tree -- confirm */
+	int one_shot;	/* set from ceph_osdc_create_event(); presumably "deliver once" -- confirm */
+	struct ceph_osd_client *osdc;	/* osd client this event is associated with */
+	void (*cb)(u64, u64, u8, void *);	/* callback; arg types match ver, notify_id, opcode of ceph_osd_event_work plus an opaque pointer */
+	void *data;	/* opaque caller pointer; presumably the last argument given to cb -- confirm */
+	struct rb_node node;	/* rbtree linkage; presumably into osdc->event_tree -- confirm */
+	struct list_head osd_node;	/* list linkage; the owning list is not visible in this header */
+	struct kref kref;	/* reference count; see ceph_osdc_put_event() */
+	struct completion completion;	/* presumably what ceph_osdc_wait_event() blocks on -- confirm */
+};
+
+struct ceph_osd_event_work {	/* deferred-work wrapper carrying one notification for an event */
+	struct work_struct work;	/* work item; presumably queued on osdc->notify_wq -- confirm */
+	struct ceph_osd_event *event;	/* event this notification belongs to */
+	u64 ver;	/* types of ver/notify_id/opcode mirror the (u64, u64, u8) args of event->cb */
+	u64 notify_id;	/* presumably forwarded to event->cb -- confirm */
+	u8 opcode;	/* presumably forwarded to event->cb -- confirm */
+};
+
 struct ceph_osd_client {
 	struct ceph_client     *client;
 
@@ -106,6 +130,7 @@
 	struct list_head       req_lru;	      /* in-flight lru */
 	struct list_head       req_unsent;    /* unsent/need-resend queue */
 	struct list_head       req_notarget;  /* map to no osd */
+	struct list_head       req_linger;    /* lingering requests */
 	int                    num_requests;
 	struct delayed_work    timeout_work;
 	struct delayed_work    osds_timeout_work;
@@ -117,6 +142,12 @@
 
 	struct ceph_msgpool	msgpool_op;
 	struct ceph_msgpool	msgpool_op_reply;
+
+	spinlock_t		event_lock;
+	struct rb_root		event_tree;
+	u64			event_count;
+
+	struct workqueue_struct	*notify_wq;
 };
 
 struct ceph_osd_req_op {
@@ -151,6 +182,13 @@
 	        struct {
 		        u64 snapid;
 	        } snap;
+		struct {
+			u64 cookie;
+			u64 ver;
+			__u8 flag;
+			u32 prot_ver;
+			u32 timeout;
+		} watch;
 	};
 	u32 payload_len;
 };
@@ -199,6 +237,11 @@
 				      bool use_mempool, int num_reply,
 				      int page_align);
 
+extern void ceph_osdc_set_request_linger(struct ceph_osd_client *osdc,
+					 struct ceph_osd_request *req);
+extern void ceph_osdc_unregister_linger_request(struct ceph_osd_client *osdc,
+						struct ceph_osd_request *req);
+
 static inline void ceph_osdc_get_request(struct ceph_osd_request *req)
 {
 	kref_get(&req->r_kref);
@@ -234,5 +277,14 @@
 				struct page **pages, int nr_pages,
 				int flags, int do_sync, bool nofail);
 
+/* watch/notify events */
+extern int ceph_osdc_create_event(struct ceph_osd_client *osdc,
+				  void (*event_cb)(u64, u64, u8, void *),
+				  int one_shot, void *data,
+				  struct ceph_osd_event **pevent);
+extern void ceph_osdc_cancel_event(struct ceph_osd_event *event);
+extern int ceph_osdc_wait_event(struct ceph_osd_event *event,
+				unsigned long timeout);
+extern void ceph_osdc_put_event(struct ceph_osd_event *event);
 #endif