ceph: use rbtree for pg pools; decode new osdmap format
Since we can now create and destroy pg pools, the pool ids will be sparse,
and an array no longer makes sense for looking up by pool id. Use an
rbtree instead.
The OSDMap encoding also no longer has a max pool count (previously used to
allocate the array). There is a new pool_max, that is the largest pool id
we've ever used, although we don't actually need it in the client.
Signed-off-by: Sage Weil <sage@newdream.net>
diff --git a/fs/ceph/debugfs.c b/fs/ceph/debugfs.c
index 1a47b5c..e159f14 100644
--- a/fs/ceph/debugfs.c
+++ b/fs/ceph/debugfs.c
@@ -78,6 +78,7 @@
{
int i;
struct ceph_client *client = s->private;
+ struct rb_node *n;
if (client->osdc.osdmap == NULL)
return 0;
@@ -87,11 +88,11 @@
" NEARFULL" : "",
(client->osdc.osdmap->flags & CEPH_OSDMAP_FULL) ?
" FULL" : "");
- for (i = 0; i < client->osdc.osdmap->num_pools; i++) {
+ for (n = rb_first(&client->osdc.osdmap->pg_pools); n; n = rb_next(n)) {
struct ceph_pg_pool_info *pool =
- &client->osdc.osdmap->pg_pool[i];
+ rb_entry(n, struct ceph_pg_pool_info, node);
seq_printf(s, "pg_pool %d pg_num %d / %d, lpg_num %d / %d\n",
- i, pool->v.pg_num, pool->pg_num_mask,
+ pool->id, pool->v.pg_num, pool->pg_num_mask,
pool->v.lpg_num, pool->lpg_num_mask);
}
for (i = 0; i < client->osdc.osdmap->max_osd; i++) {
diff --git a/fs/ceph/osdmap.c b/fs/ceph/osdmap.c
index 443fdcd..34b5696 100644
--- a/fs/ceph/osdmap.c
+++ b/fs/ceph/osdmap.c
@@ -328,9 +328,15 @@
rb_erase(&pg->node, &map->pg_temp);
kfree(pg);
}
+ while (!RB_EMPTY_ROOT(&map->pg_pools)) {
+ struct ceph_pg_pool_info *pi =
+ rb_entry(rb_first(&map->pg_pools),
+ struct ceph_pg_pool_info, node);
+ rb_erase(&pi->node, &map->pg_pools);
+ kfree(pi);
+ }
kfree(map->osd_state);
kfree(map->osd_weight);
- kfree(map->pg_pool);
kfree(map->osd_addr);
kfree(map);
}
@@ -433,6 +439,48 @@
}
/*
+ * rbtree of pg pool info
+ */
+static int __insert_pg_pool(struct rb_root *root, struct ceph_pg_pool_info *new)
+{
+ struct rb_node **p = &root->rb_node;
+ struct rb_node *parent = NULL;
+ struct ceph_pg_pool_info *pi = NULL;
+
+ while (*p) {
+ parent = *p;
+ pi = rb_entry(parent, struct ceph_pg_pool_info, node);
+ if (new->id < pi->id)
+ p = &(*p)->rb_left;
+ else if (new->id > pi->id)
+ p = &(*p)->rb_right;
+ else
+ return -EEXIST;
+ }
+
+ rb_link_node(&new->node, parent, p);
+ rb_insert_color(&new->node, root);
+ return 0;
+}
+
+static struct ceph_pg_pool_info *__lookup_pg_pool(struct rb_root *root, int id)
+{
+ struct ceph_pg_pool_info *pi;
+ struct rb_node *n = root->rb_node;
+
+ while (n) {
+ pi = rb_entry(n, struct ceph_pg_pool_info, node);
+ if (id < pi->id)
+ n = n->rb_left;
+ else if (id > pi->id)
+ n = n->rb_right;
+ else
+ return pi;
+ }
+ return NULL;
+}
+
+/*
* decode a full map.
*/
struct ceph_osdmap *osdmap_decode(void **p, void *end)
@@ -443,6 +491,7 @@
u8 ev;
int err = -EINVAL;
void *start = *p;
+ struct ceph_pg_pool_info *pi;
dout("osdmap_decode %p to %p len %d\n", *p, end, (int)(end - *p));
@@ -464,32 +513,27 @@
ceph_decode_copy(p, &map->created, sizeof(map->created));
ceph_decode_copy(p, &map->modified, sizeof(map->modified));
- map->num_pools = ceph_decode_32(p);
- map->pg_pool = kcalloc(map->num_pools, sizeof(*map->pg_pool),
- GFP_NOFS);
- if (!map->pg_pool) {
- err = -ENOMEM;
- goto bad;
- }
ceph_decode_32_safe(p, end, max, bad);
while (max--) {
- ceph_decode_need(p, end, 4+1+sizeof(map->pg_pool->v), bad);
- i = ceph_decode_32(p);
- if (i >= map->num_pools)
+ ceph_decode_need(p, end, 4 + 1 + sizeof(pi->v), bad);
+ pi = kmalloc(sizeof(*pi), GFP_NOFS);
+ if (!pi)
goto bad;
+ pi->id = ceph_decode_32(p);
ev = ceph_decode_8(p); /* encoding version */
if (ev > CEPH_PG_POOL_VERSION) {
pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
ev, CEPH_PG_POOL_VERSION);
goto bad;
}
- ceph_decode_copy(p, &map->pg_pool[i].v,
- sizeof(map->pg_pool->v));
- calc_pg_masks(&map->pg_pool[i]);
- p += le32_to_cpu(map->pg_pool[i].v.num_snaps) * sizeof(u64);
- p += le32_to_cpu(map->pg_pool[i].v.num_removed_snap_intervals)
+ ceph_decode_copy(p, &pi->v, sizeof(pi->v));
+ __insert_pg_pool(&map->pg_pools, pi);
+ calc_pg_masks(pi);
+ p += le32_to_cpu(pi->v.num_snaps) * sizeof(u64);
+ p += le32_to_cpu(pi->v.num_removed_snap_intervals)
* sizeof(u64) * 2;
}
+ ceph_decode_32_safe(p, end, map->pool_max, bad);
ceph_decode_32_safe(p, end, map->flags, bad);
@@ -581,7 +625,7 @@
u32 epoch = 0;
struct ceph_timespec modified;
u32 len, pool;
- __s32 new_flags, max;
+ __s32 new_pool_max, new_flags, max;
void *start = *p;
int err = -EINVAL;
u16 version;
@@ -600,6 +644,7 @@
epoch = ceph_decode_32(p);
BUG_ON(epoch != map->epoch+1);
ceph_decode_copy(p, &modified, sizeof(modified));
+ new_pool_max = ceph_decode_32(p);
new_flags = ceph_decode_32(p);
/* full map? */
@@ -623,6 +668,8 @@
/* new flags? */
if (new_flags >= 0)
map->flags = new_flags;
+ if (new_pool_max >= 0)
+ map->pool_max = new_pool_max;
ceph_decode_need(p, end, 5*sizeof(u32), bad);
@@ -647,37 +694,42 @@
ceph_decode_32_safe(p, end, len, bad);
while (len--) {
__u8 ev;
+ struct ceph_pg_pool_info *pi;
ceph_decode_32_safe(p, end, pool, bad);
- if (pool >= map->num_pools) {
- void *pg_pool = kcalloc(pool + 1,
- sizeof(*map->pg_pool),
- GFP_NOFS);
- if (!pg_pool) {
- err = -ENOMEM;
- goto bad;
- }
- memcpy(pg_pool, map->pg_pool,
- map->num_pools * sizeof(*map->pg_pool));
- kfree(map->pg_pool);
- map->pg_pool = pg_pool;
- map->num_pools = pool+1;
- }
- ceph_decode_need(p, end, 1 + sizeof(map->pg_pool->v), bad);
+ ceph_decode_need(p, end, 1 + sizeof(pi->v), bad);
ev = ceph_decode_8(p); /* encoding version */
if (ev > CEPH_PG_POOL_VERSION) {
pr_warning("got unknown v %d > %d of ceph_pg_pool\n",
ev, CEPH_PG_POOL_VERSION);
goto bad;
}
- ceph_decode_copy(p, &map->pg_pool[pool].v,
- sizeof(map->pg_pool->v));
- calc_pg_masks(&map->pg_pool[pool]);
+ pi = __lookup_pg_pool(&map->pg_pools, pool);
+ if (!pi) {
+ pi = kmalloc(sizeof(*pi), GFP_NOFS);
+ if (!pi) {
+ err = -ENOMEM;
+ goto bad;
+ }
+ pi->id = pool;
+ __insert_pg_pool(&map->pg_pools, pi);
+ }
+ ceph_decode_copy(p, &pi->v, sizeof(pi->v));
+ calc_pg_masks(pi);
}
- /* old_pool (ignore) */
+ /* old_pool */
ceph_decode_32_safe(p, end, len, bad);
- *p += len * sizeof(u32);
+ while (len--) {
+ struct ceph_pg_pool_info *pi;
+
+ ceph_decode_32_safe(p, end, pool, bad);
+ pi = __lookup_pg_pool(&map->pg_pools, pool);
+ if (pi) {
+ rb_erase(&pi->node, &map->pg_pools);
+ kfree(pi);
+ }
+ }
/* new_up */
err = -EINVAL;
@@ -861,10 +913,10 @@
unsigned ps;
BUG_ON(!osdmap);
- if (poolid >= osdmap->num_pools)
- return -EIO;
- pool = &osdmap->pg_pool[poolid];
+ pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
+ if (!pool)
+ return -EIO;
ps = ceph_str_hash(pool->v.object_hash, oid, strlen(oid));
if (preferred >= 0) {
ps += preferred;
@@ -919,9 +971,9 @@
preferred >= osdmap->crush->max_devices)
preferred = -1;
- if (poolid >= osdmap->num_pools)
+ pool = __lookup_pg_pool(&osdmap->pg_pools, poolid);
+ if (!pool)
return NULL;
- pool = &osdmap->pg_pool[poolid];
ruleno = crush_find_rule(osdmap->crush, pool->v.crush_ruleset,
pool->v.type, pool->v.size);
if (ruleno < 0) {
diff --git a/fs/ceph/osdmap.h b/fs/ceph/osdmap.h
index c4af841..1fb55af 100644
--- a/fs/ceph/osdmap.h
+++ b/fs/ceph/osdmap.h
@@ -19,6 +19,8 @@
* the change between two successive epochs, or as a fully encoded map.
*/
struct ceph_pg_pool_info {
+ struct rb_node node;
+ int id;
struct ceph_pg_pool v;
int pg_num_mask, pgp_num_mask, lpg_num_mask, lpgp_num_mask;
};
@@ -44,9 +46,8 @@
struct ceph_entity_addr *osd_addr;
struct rb_root pg_temp;
-
- u32 num_pools;
- struct ceph_pg_pool_info *pg_pool;
+ struct rb_root pg_pools;
+ u32 pool_max;
/* the CRUSH map specifies the mapping of placement groups to
* the list of osds that store+replicate them. */
diff --git a/fs/ceph/rados.h b/fs/ceph/rados.h
index 1f4c786..26ac8b8 100644
--- a/fs/ceph/rados.h
+++ b/fs/ceph/rados.h
@@ -11,8 +11,8 @@
/*
* osdmap encoding versions
*/
-#define CEPH_OSDMAP_INC_VERSION 3
-#define CEPH_OSDMAP_VERSION 3
+#define CEPH_OSDMAP_INC_VERSION 4
+#define CEPH_OSDMAP_VERSION 4
/*
* fs id