ceph: put unused osd connections on lru
Instead of removing an osd connection immediately when its
requests list becomes empty, put the connection on an lru.
The osd is removed only after it has gone unused for longer than
a configurable time (osd_idle_ttl mount option, default 60 seconds).
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 35c8afe..7f8a26f 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -389,6 +389,7 @@
atomic_set(&osd->o_ref, 1);
osd->o_osdc = osdc;
INIT_LIST_HEAD(&osd->o_requests);
+ INIT_LIST_HEAD(&osd->o_osd_lru);
osd->o_incarnation = 1;
ceph_con_init(osdc->client->msgr, &osd->o_con);
@@ -422,25 +423,56 @@
/*
* remove an osd from our map
*/
-static void remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
- dout("remove_osd %p\n", osd);
+ dout("__remove_osd %p\n", osd);
BUG_ON(!list_empty(&osd->o_requests));
rb_erase(&osd->o_node, &osdc->osds);
+ list_del_init(&osd->o_osd_lru);
ceph_con_close(&osd->o_con);
put_osd(osd);
}
+static void __move_osd_to_lru(struct ceph_osd_client *osdc,
+ struct ceph_osd *osd)
+{ /* put an osd on osdc->osd_lru and stamp the time at which it expires */
+ dout("__move_osd_to_lru %p\n", osd);
+ BUG_ON(!list_empty(&osd->o_osd_lru)); /* must not already be on an lru */
+ list_add_tail(&osd->o_osd_lru, &osdc->osd_lru); /* newest entry goes last */
+ osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ; /* expiry in jiffies; osd_idle_ttl is in seconds */
+}
+
+static void __remove_osd_from_lru(struct ceph_osd *osd)
+{ /* take an osd off the idle lru; does nothing if it is not on one */
+ dout("__remove_osd_from_lru %p\n", osd);
+ if (!list_empty(&osd->o_osd_lru)) /* only unlink when actually queued */
+ list_del_init(&osd->o_osd_lru); /* _init variant leaves the node reusable */
+}
+
+static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
+{ /* drop lru osds whose lru_ttl has passed; drop every lru osd if remove_all */
+ struct ceph_osd *osd, *nosd;
+
+ dout("remove_old_osds %p\n", osdc);
+ mutex_lock(&osdc->request_mutex);
+ list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
+ if (!remove_all && time_before(jiffies, osd->lru_ttl))
+ break; /* entries are appended at the tail, so the rest expire later */
+ __remove_osd(osdc, osd); /* unlinks osd from the lru, hence the _safe walk */
+ }
+ mutex_unlock(&osdc->request_mutex);
+}
+
/*
* reset osd connect
*/
-static int reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
+static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
int ret = 0;
- dout("reset_osd %p osd%d\n", osd, osd->o_osd);
+ dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
if (list_empty(&osd->o_requests)) {
- remove_osd(osdc, osd);
+ __remove_osd(osdc, osd);
} else {
ceph_con_close(&osd->o_con);
ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
@@ -533,7 +565,7 @@
list_del_init(&req->r_osd_item);
if (list_empty(&req->r_osd->o_requests))
- remove_osd(osdc, req->r_osd);
+ __move_osd_to_lru(osdc, req->r_osd);
req->r_osd = NULL;
}
@@ -611,7 +643,7 @@
if (list_empty(&req->r_osd->o_requests)) {
/* try to re-use r_osd if possible */
newosd = get_osd(req->r_osd);
- remove_osd(osdc, newosd);
+ __remove_osd(osdc, newosd);
}
req->r_osd = NULL;
}
@@ -636,8 +668,10 @@
ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
}
- if (req->r_osd)
+ if (req->r_osd) {
+ __remove_osd_from_lru(req->r_osd);
list_add(&req->r_osd_item, &req->r_osd->o_requests);
+ }
err = 1; /* osd changed */
out:
@@ -744,6 +778,23 @@
up_read(&osdc->map_sem);
}
+static void handle_osds_timeout(struct work_struct *work)
+{ /* delayed-work handler: expire idle osds, then re-arm itself */
+ struct ceph_osd_client *osdc =
+ container_of(work, struct ceph_osd_client,
+ osds_timeout_work.work);
+ unsigned long delay =
+ osdc->client->mount_args->osd_idle_ttl * HZ >> 2; /* ttl/4 in jiffies; NOTE(review): 0 when osd_idle_ttl is 0 - confirm */
+
+ dout("osds timeout\n");
+ down_read(&osdc->map_sem);
+ remove_old_osds(osdc, 0); /* 0: only osds past their lru_ttl */
+ up_read(&osdc->map_sem);
+
+ schedule_delayed_work(&osdc->osds_timeout_work,
+ round_jiffies_relative(delay)); /* queue the next scan */
+}
+
/*
* handle osd op reply. either call the callback if it is specified,
* or do the completion to wake up the waiting thread.
@@ -881,7 +932,7 @@
ceph_osd_addr(osdc->osdmap,
osd->o_osd),
sizeof(struct ceph_entity_addr)) != 0)
- reset_osd(osdc, osd);
+ __reset_osd(osdc, osd);
}
}
@@ -1195,9 +1246,14 @@
osdc->timeout_tid = 0;
osdc->last_tid = 0;
osdc->osds = RB_ROOT;
+ INIT_LIST_HEAD(&osdc->osd_lru);
osdc->requests = RB_ROOT;
osdc->num_requests = 0;
INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
+ INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
+
+ schedule_delayed_work(&osdc->osds_timeout_work,
+ round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));
err = -ENOMEM;
osdc->req_mempool = mempool_create_kmalloc_pool(10,
@@ -1219,10 +1275,12 @@
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
cancel_delayed_work_sync(&osdc->timeout_work);
+ cancel_delayed_work_sync(&osdc->osds_timeout_work);
if (osdc->osdmap) {
ceph_osdmap_destroy(osdc->osdmap);
osdc->osdmap = NULL;
}
+ remove_old_osds(osdc, 1);
mempool_destroy(osdc->req_mempool);
ceph_msgpool_destroy(&osdc->msgpool_op);
}
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index 8d533d9..70f31b6 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -31,9 +31,11 @@
struct rb_node o_node;
struct ceph_connection o_con;
struct list_head o_requests;
+ struct list_head o_osd_lru;
struct ceph_authorizer *o_authorizer;
void *o_authorizer_buf, *o_authorizer_reply_buf;
size_t o_authorizer_buf_len, o_authorizer_reply_buf_len;
+ unsigned long lru_ttl;
};
/* an in-flight request */
@@ -90,11 +92,13 @@
struct mutex request_mutex;
struct rb_root osds; /* osds */
+ struct list_head osd_lru; /* idle osds */
u64 timeout_tid; /* tid of timeout triggering rq */
u64 last_tid; /* tid of last request */
struct rb_root requests; /* pending requests */
int num_requests;
struct delayed_work timeout_work;
+ struct delayed_work osds_timeout_work;
#ifdef CONFIG_DEBUG_FS
struct dentry *debugfs_file;
#endif
diff --git a/fs/ceph/super.c b/fs/ceph/super.c
index 3a25489..39aaf29 100644
--- a/fs/ceph/super.c
+++ b/fs/ceph/super.c
@@ -293,6 +293,7 @@
Opt_rsize,
Opt_osdtimeout,
Opt_mount_timeout,
+ Opt_osd_idle_ttl,
Opt_caps_wanted_delay_min,
Opt_caps_wanted_delay_max,
Opt_readdir_max_entries,
@@ -322,6 +323,7 @@
{Opt_rsize, "rsize=%d"},
{Opt_osdtimeout, "osdtimeout=%d"},
{Opt_mount_timeout, "mount_timeout=%d"},
+ {Opt_osd_idle_ttl, "osd_idle_ttl=%d"},
{Opt_caps_wanted_delay_min, "caps_wanted_delay_min=%d"},
{Opt_caps_wanted_delay_max, "caps_wanted_delay_max=%d"},
{Opt_readdir_max_entries, "readdir_max_entries=%d"},
@@ -367,6 +369,7 @@
args->flags = CEPH_OPT_DEFAULT;
args->osd_timeout = 5; /* seconds */
args->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT; /* seconds */
+ args->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT; /* seconds */
args->caps_wanted_delay_min = CEPH_CAPS_WANTED_DELAY_MIN_DEFAULT;
args->caps_wanted_delay_max = CEPH_CAPS_WANTED_DELAY_MAX_DEFAULT;
args->rsize = CEPH_MOUNT_RSIZE_DEFAULT;
diff --git a/fs/ceph/super.h b/fs/ceph/super.h
index 770f7b5..3930fb6 100644
--- a/fs/ceph/super.h
+++ b/fs/ceph/super.h
@@ -53,6 +53,7 @@
struct ceph_entity_addr *mon_addr;
int flags;
int mount_timeout;
+ int osd_idle_ttl;
int caps_wanted_delay_min, caps_wanted_delay_max;
struct ceph_fsid fsid;
struct ceph_entity_addr my_addr;
@@ -71,6 +72,7 @@
* defaults
*/
#define CEPH_MOUNT_TIMEOUT_DEFAULT 60
+#define CEPH_OSD_IDLE_TTL_DEFAULT 60
#define CEPH_MOUNT_RSIZE_DEFAULT (512*1024) /* readahead */
#define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)