blob: 6ec9d53806c5e791b5d56fa0ae27dae0e903db02 [file] [log] [blame]
Yehuda Sadeh602adf42010-08-12 16:11:25 -07001/*
2 rbd.c -- Export ceph rados objects as a Linux block device
3
4
5 based on drivers/block/osdblk.c:
6
7 Copyright 2009 Red Hat, Inc.
8
9 This program is free software; you can redistribute it and/or modify
10 it under the terms of the GNU General Public License as published by
11 the Free Software Foundation.
12
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
17
18 You should have received a copy of the GNU General Public License
19 along with this program; see the file COPYING. If not, write to
20 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
21
22
23
24 Instructions for use
25 --------------------
26
27 1) Map a Linux block device to an existing rbd image.
28
29 Usage: <mon ip addr> <options> <pool name> <rbd image name> [snap name]
30
31 $ echo "192.168.0.1 name=admin rbd foo" > /sys/class/rbd/add
32
33 The snapshot name can be "-" or omitted to map the image read/write.
34
35 2) List all active blkdev<->object mappings.
36
37 In this example, we have performed step #1 twice, creating two blkdevs,
38 mapped to two separate rados objects in the rados rbd pool
39
40 $ cat /sys/class/rbd/list
41 #id major client_name pool name snap KB
42 0 254 client4143 rbd foo - 1024000
43
44 The columns, in order, are:
45 - blkdev unique id
46 - blkdev assigned major
47 - rados client id
48 - rados pool name
49 - rados block device name
50 - mapped snapshot ("-" if none)
51 - device size in KB
52
53
54 3) Create a snapshot.
55
56 Usage: <blkdev id> <snapname>
57
58 $ echo "0 mysnap" > /sys/class/rbd/snap_create
59
60
61 4) Listing a snapshot.
62
63 $ cat /sys/class/rbd/snaps_list
64 #id snap KB
65 0 - 1024000 (*)
66 0 foo 1024000
67
68 The columns, in order, are:
69 - blkdev unique id
70 - snapshot name, '-' means none (active read/write version)
71 - size of device at time of snapshot
72 - the (*) indicates this is the active version
73
74 5) Rollback to snapshot.
75
76 Usage: <blkdev id> <snapname>
77
78 $ echo "0 mysnap" > /sys/class/rbd/snap_rollback
79
80
81 6) Mapping an image using snapshot.
82
83 A snapshot mapping is read-only. This is being done by passing
84 snap=<snapname> to the options when adding a device.
85
86 $ echo "192.168.0.1 name=admin,snap=mysnap rbd foo" > /sys/class/rbd/add
87
88
89 7) Remove an active blkdev<->rbd image mapping.
90
91 In this example, we remove the mapping with blkdev unique id 1.
92
93 $ echo 1 > /sys/class/rbd/remove
94
95
96 NOTE: The actual creation and deletion of rados objects is outside the scope
97 of this driver.
98
99 */
100
101#include <linux/ceph/libceph.h>
102#include <linux/ceph/osd_client.h>
103#include <linux/ceph/mon_client.h>
104#include <linux/ceph/decode.h>
105
106#include <linux/kernel.h>
107#include <linux/device.h>
108#include <linux/module.h>
109#include <linux/fs.h>
110#include <linux/blkdev.h>
111
112#include "rbd_types.h"
113
114#define DRV_NAME "rbd"
115#define DRV_NAME_LONG "rbd (rados block device)"
116
117#define RBD_MINORS_PER_MAJOR 256 /* max minors per blkdev */
118
119#define RBD_MAX_MD_NAME_LEN (96 + sizeof(RBD_SUFFIX))
120#define RBD_MAX_POOL_NAME_LEN 64
121#define RBD_MAX_SNAP_NAME_LEN 32
122#define RBD_MAX_OPT_LEN 1024
123
124#define RBD_SNAP_HEAD_NAME "-"
125
126#define DEV_NAME_LEN 32
127
128/*
129 * block device image metadata (in-memory version)
130 */
131struct rbd_image_header {
132 u64 image_size;
133 char block_name[32];
134 __u8 obj_order;
135 __u8 crypt_type;
136 __u8 comp_type;
137 struct rw_semaphore snap_rwsem;
138 struct ceph_snap_context *snapc;
139 size_t snap_names_len;
140 u64 snap_seq;
141 u32 total_snaps;
142
143 char *snap_names;
144 u64 *snap_sizes;
145};
146
147/*
148 * an instance of the client. multiple devices may share a client.
149 */
150struct rbd_client {
151 struct ceph_client *client;
152 struct kref kref;
153 struct list_head node;
154};
155
156/*
157 * a single io request
158 */
159struct rbd_request {
160 struct request *rq; /* blk layer request */
161 struct bio *bio; /* cloned bio */
162 struct page **pages; /* list of used pages */
163 u64 len;
164};
165
166/*
167 * a single device
168 */
169struct rbd_device {
170 int id; /* blkdev unique id */
171
172 int major; /* blkdev assigned major */
173 struct gendisk *disk; /* blkdev's gendisk and rq */
174 struct request_queue *q;
175
176 struct ceph_client *client;
177 struct rbd_client *rbd_client;
178
179 char name[DEV_NAME_LEN]; /* blkdev name, e.g. rbd3 */
180
181 spinlock_t lock; /* queue lock */
182
183 struct rbd_image_header header;
184 char obj[RBD_MAX_OBJ_NAME_LEN]; /* rbd image name */
185 int obj_len;
186 char obj_md_name[RBD_MAX_MD_NAME_LEN]; /* hdr nm. */
187 char pool_name[RBD_MAX_POOL_NAME_LEN];
188 int poolid;
189
190 char snap_name[RBD_MAX_SNAP_NAME_LEN];
191 u32 cur_snap; /* index+1 of current snapshot within snap context
192 0 - for the head */
193 int read_only;
194
195 struct list_head node;
196};
197
198static spinlock_t node_lock; /* protects client get/put */
199
200static struct class *class_rbd; /* /sys/class/rbd */
201static DEFINE_MUTEX(ctl_mutex); /* Serialize open/close/setup/teardown */
202static LIST_HEAD(rbd_dev_list); /* devices */
203static LIST_HEAD(rbd_client_list); /* clients */
204
205
206static int rbd_open(struct block_device *bdev, fmode_t mode)
207{
208 struct gendisk *disk = bdev->bd_disk;
209 struct rbd_device *rbd_dev = disk->private_data;
210
211 set_device_ro(bdev, rbd_dev->read_only);
212
213 if ((mode & FMODE_WRITE) && rbd_dev->read_only)
214 return -EROFS;
215
216 return 0;
217}
218
219static const struct block_device_operations rbd_bd_ops = {
220 .owner = THIS_MODULE,
221 .open = rbd_open,
222};
223
224/*
225 * Initialize an rbd client instance.
226 * We own *opt.
227 */
228static struct rbd_client *rbd_client_create(struct ceph_options *opt)
229{
230 struct rbd_client *rbdc;
231 int ret = -ENOMEM;
232
233 dout("rbd_client_create\n");
234 rbdc = kmalloc(sizeof(struct rbd_client), GFP_KERNEL);
235 if (!rbdc)
236 goto out_opt;
237
238 kref_init(&rbdc->kref);
239 INIT_LIST_HEAD(&rbdc->node);
240
241 rbdc->client = ceph_create_client(opt, rbdc);
242 if (IS_ERR(rbdc->client))
243 goto out_rbdc;
Vasiliy Kulikov28f259b2010-09-26 12:59:37 +0400244 opt = NULL; /* Now rbdc->client is responsible for opt */
Yehuda Sadeh602adf42010-08-12 16:11:25 -0700245
246 ret = ceph_open_session(rbdc->client);
247 if (ret < 0)
248 goto out_err;
249
250 spin_lock(&node_lock);
251 list_add_tail(&rbdc->node, &rbd_client_list);
252 spin_unlock(&node_lock);
253
254 dout("rbd_client_create created %p\n", rbdc);
255 return rbdc;
256
257out_err:
258 ceph_destroy_client(rbdc->client);
Yehuda Sadeh602adf42010-08-12 16:11:25 -0700259out_rbdc:
260 kfree(rbdc);
261out_opt:
Vasiliy Kulikov28f259b2010-09-26 12:59:37 +0400262 if (opt)
263 ceph_destroy_options(opt);
264 return ERR_PTR(ret);
Yehuda Sadeh602adf42010-08-12 16:11:25 -0700265}
266
267/*
268 * Find a ceph client with specific addr and configuration.
269 */
270static struct rbd_client *__rbd_client_find(struct ceph_options *opt)
271{
272 struct rbd_client *client_node;
273
274 if (opt->flags & CEPH_OPT_NOSHARE)
275 return NULL;
276
277 list_for_each_entry(client_node, &rbd_client_list, node)
278 if (ceph_compare_options(opt, client_node->client) == 0)
279 return client_node;
280 return NULL;
281}
282
283/*
284 * Get a ceph client with specific addr and configuration, if one does
285 * not exist create it.
286 */
287static int rbd_get_client(struct rbd_device *rbd_dev, const char *mon_addr,
288 char *options)
289{
290 struct rbd_client *rbdc;
291 struct ceph_options *opt;
292 int ret;
293
294 ret = ceph_parse_options(&opt, options, mon_addr,
295 mon_addr + strlen(mon_addr), NULL, NULL);
296 if (ret < 0)
297 return ret;
298
299 spin_lock(&node_lock);
300 rbdc = __rbd_client_find(opt);
301 if (rbdc) {
302 ceph_destroy_options(opt);
303
304 /* using an existing client */
305 kref_get(&rbdc->kref);
306 rbd_dev->rbd_client = rbdc;
307 rbd_dev->client = rbdc->client;
308 spin_unlock(&node_lock);
309 return 0;
310 }
311 spin_unlock(&node_lock);
312
313 rbdc = rbd_client_create(opt);
314 if (IS_ERR(rbdc))
315 return PTR_ERR(rbdc);
316
317 rbd_dev->rbd_client = rbdc;
318 rbd_dev->client = rbdc->client;
319 return 0;
320}
321
322/*
323 * Destroy ceph client
324 */
325static void rbd_client_release(struct kref *kref)
326{
327 struct rbd_client *rbdc = container_of(kref, struct rbd_client, kref);
328
329 dout("rbd_release_client %p\n", rbdc);
330 spin_lock(&node_lock);
331 list_del(&rbdc->node);
332 spin_unlock(&node_lock);
333
334 ceph_destroy_client(rbdc->client);
335 kfree(rbdc);
336}
337
338/*
339 * Drop reference to ceph client node. If it's not referenced anymore, release
340 * it.
341 */
342static void rbd_put_client(struct rbd_device *rbd_dev)
343{
344 kref_put(&rbd_dev->rbd_client->kref, rbd_client_release);
345 rbd_dev->rbd_client = NULL;
346 rbd_dev->client = NULL;
347}
348
349
350/*
351 * Create a new header structure, translate header format from the on-disk
352 * header.
353 */
354static int rbd_header_from_disk(struct rbd_image_header *header,
355 struct rbd_image_header_ondisk *ondisk,
356 int allocated_snaps,
357 gfp_t gfp_flags)
358{
359 int i;
360 u32 snap_count = le32_to_cpu(ondisk->snap_count);
361 int ret = -ENOMEM;
362
363 init_rwsem(&header->snap_rwsem);
364
365 header->snap_names_len = le64_to_cpu(ondisk->snap_names_len);
366 header->snapc = kmalloc(sizeof(struct ceph_snap_context) +
367 snap_count *
368 sizeof(struct rbd_image_snap_ondisk),
369 gfp_flags);
370 if (!header->snapc)
371 return -ENOMEM;
372 if (snap_count) {
373 header->snap_names = kmalloc(header->snap_names_len,
374 GFP_KERNEL);
375 if (!header->snap_names)
376 goto err_snapc;
377 header->snap_sizes = kmalloc(snap_count * sizeof(u64),
378 GFP_KERNEL);
379 if (!header->snap_sizes)
380 goto err_names;
381 } else {
382 header->snap_names = NULL;
383 header->snap_sizes = NULL;
384 }
385 memcpy(header->block_name, ondisk->block_name,
386 sizeof(ondisk->block_name));
387
388 header->image_size = le64_to_cpu(ondisk->image_size);
389 header->obj_order = ondisk->options.order;
390 header->crypt_type = ondisk->options.crypt_type;
391 header->comp_type = ondisk->options.comp_type;
392
393 atomic_set(&header->snapc->nref, 1);
394 header->snap_seq = le64_to_cpu(ondisk->snap_seq);
395 header->snapc->num_snaps = snap_count;
396 header->total_snaps = snap_count;
397
398 if (snap_count &&
399 allocated_snaps == snap_count) {
400 for (i = 0; i < snap_count; i++) {
401 header->snapc->snaps[i] =
402 le64_to_cpu(ondisk->snaps[i].id);
403 header->snap_sizes[i] =
404 le64_to_cpu(ondisk->snaps[i].image_size);
405 }
406
407 /* copy snapshot names */
408 memcpy(header->snap_names, &ondisk->snaps[i],
409 header->snap_names_len);
410 }
411
412 return 0;
413
414err_names:
415 kfree(header->snap_names);
416err_snapc:
417 kfree(header->snapc);
418 return ret;
419}
420
421static int snap_index(struct rbd_image_header *header, int snap_num)
422{
423 return header->total_snaps - snap_num;
424}
425
426static u64 cur_snap_id(struct rbd_device *rbd_dev)
427{
428 struct rbd_image_header *header = &rbd_dev->header;
429
430 if (!rbd_dev->cur_snap)
431 return 0;
432
433 return header->snapc->snaps[snap_index(header, rbd_dev->cur_snap)];
434}
435
436static int snap_by_name(struct rbd_image_header *header, const char *snap_name,
437 u64 *seq, u64 *size)
438{
439 int i;
440 char *p = header->snap_names;
441
442 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
443 if (strcmp(snap_name, p) == 0)
444 break;
445 }
446 if (i == header->total_snaps)
447 return -ENOENT;
448 if (seq)
449 *seq = header->snapc->snaps[i];
450
451 if (size)
452 *size = header->snap_sizes[i];
453
454 return i;
455}
456
457static int rbd_header_set_snap(struct rbd_device *dev,
458 const char *snap_name,
459 u64 *size)
460{
461 struct rbd_image_header *header = &dev->header;
462 struct ceph_snap_context *snapc = header->snapc;
463 int ret = -ENOENT;
464
465 down_write(&header->snap_rwsem);
466
467 if (!snap_name ||
468 !*snap_name ||
469 strcmp(snap_name, "-") == 0 ||
470 strcmp(snap_name, RBD_SNAP_HEAD_NAME) == 0) {
471 if (header->total_snaps)
472 snapc->seq = header->snap_seq;
473 else
474 snapc->seq = 0;
475 dev->cur_snap = 0;
476 dev->read_only = 0;
477 if (size)
478 *size = header->image_size;
479 } else {
480 ret = snap_by_name(header, snap_name, &snapc->seq, size);
481 if (ret < 0)
482 goto done;
483
484 dev->cur_snap = header->total_snaps - ret;
485 dev->read_only = 1;
486 }
487
488 ret = 0;
489done:
490 up_write(&header->snap_rwsem);
491 return ret;
492}
493
494static void rbd_header_free(struct rbd_image_header *header)
495{
496 kfree(header->snapc);
497 kfree(header->snap_names);
498 kfree(header->snap_sizes);
499}
500
501/*
502 * get the actual striped segment name, offset and length
503 */
504static u64 rbd_get_segment(struct rbd_image_header *header,
505 const char *block_name,
506 u64 ofs, u64 len,
507 char *seg_name, u64 *segofs)
508{
509 u64 seg = ofs >> header->obj_order;
510
511 if (seg_name)
512 snprintf(seg_name, RBD_MAX_SEG_NAME_LEN,
513 "%s.%012llx", block_name, seg);
514
515 ofs = ofs & ((1 << header->obj_order) - 1);
516 len = min_t(u64, len, (1 << header->obj_order) - ofs);
517
518 if (segofs)
519 *segofs = ofs;
520
521 return len;
522}
523
524/*
525 * bio helpers
526 */
527
528static void bio_chain_put(struct bio *chain)
529{
530 struct bio *tmp;
531
532 while (chain) {
533 tmp = chain;
534 chain = chain->bi_next;
535 bio_put(tmp);
536 }
537}
538
539/*
540 * zeros a bio chain, starting at specific offset
541 */
542static void zero_bio_chain(struct bio *chain, int start_ofs)
543{
544 struct bio_vec *bv;
545 unsigned long flags;
546 void *buf;
547 int i;
548 int pos = 0;
549
550 while (chain) {
551 bio_for_each_segment(bv, chain, i) {
552 if (pos + bv->bv_len > start_ofs) {
553 int remainder = max(start_ofs - pos, 0);
554 buf = bvec_kmap_irq(bv, &flags);
555 memset(buf + remainder, 0,
556 bv->bv_len - remainder);
Dan Carpenter85b5aaa2010-10-11 21:15:11 +0200557 bvec_kunmap_irq(buf, &flags);
Yehuda Sadeh602adf42010-08-12 16:11:25 -0700558 }
559 pos += bv->bv_len;
560 }
561
562 chain = chain->bi_next;
563 }
564}
565
566/*
567 * bio_chain_clone - clone a chain of bios up to a certain length.
568 * might return a bio_pair that will need to be released.
569 */
570static struct bio *bio_chain_clone(struct bio **old, struct bio **next,
571 struct bio_pair **bp,
572 int len, gfp_t gfpmask)
573{
574 struct bio *tmp, *old_chain = *old, *new_chain = NULL, *tail = NULL;
575 int total = 0;
576
577 if (*bp) {
578 bio_pair_release(*bp);
579 *bp = NULL;
580 }
581
582 while (old_chain && (total < len)) {
583 tmp = bio_kmalloc(gfpmask, old_chain->bi_max_vecs);
584 if (!tmp)
585 goto err_out;
586
587 if (total + old_chain->bi_size > len) {
588 struct bio_pair *bp;
589
590 /*
591 * this split can only happen with a single paged bio,
592 * split_bio will BUG_ON if this is not the case
593 */
594 dout("bio_chain_clone split! total=%d remaining=%d"
595 "bi_size=%d\n",
596 (int)total, (int)len-total,
597 (int)old_chain->bi_size);
598
599 /* split the bio. We'll release it either in the next
600 call, or it will have to be released outside */
601 bp = bio_split(old_chain, (len - total) / 512ULL);
602 if (!bp)
603 goto err_out;
604
605 __bio_clone(tmp, &bp->bio1);
606
607 *next = &bp->bio2;
608 } else {
609 __bio_clone(tmp, old_chain);
610 *next = old_chain->bi_next;
611 }
612
613 tmp->bi_bdev = NULL;
614 gfpmask &= ~__GFP_WAIT;
615 tmp->bi_next = NULL;
616
617 if (!new_chain) {
618 new_chain = tail = tmp;
619 } else {
620 tail->bi_next = tmp;
621 tail = tmp;
622 }
623 old_chain = old_chain->bi_next;
624
625 total += tmp->bi_size;
626 }
627
628 BUG_ON(total < len);
629
630 if (tail)
631 tail->bi_next = NULL;
632
633 *old = old_chain;
634
635 return new_chain;
636
637err_out:
638 dout("bio_chain_clone with err\n");
639 bio_chain_put(new_chain);
640 return NULL;
641}
642
643/*
644 * helpers for osd request op vectors.
645 */
646static int rbd_create_rw_ops(struct ceph_osd_req_op **ops,
647 int num_ops,
648 int opcode,
649 u32 payload_len)
650{
651 *ops = kzalloc(sizeof(struct ceph_osd_req_op) * (num_ops + 1),
652 GFP_NOIO);
653 if (!*ops)
654 return -ENOMEM;
655 (*ops)[0].op = opcode;
656 /*
657 * op extent offset and length will be set later on
658 * in calc_raw_layout()
659 */
660 (*ops)[0].payload_len = payload_len;
661 return 0;
662}
663
664static void rbd_destroy_ops(struct ceph_osd_req_op *ops)
665{
666 kfree(ops);
667}
668
669/*
670 * Send ceph osd request
671 */
672static int rbd_do_request(struct request *rq,
673 struct rbd_device *dev,
674 struct ceph_snap_context *snapc,
675 u64 snapid,
676 const char *obj, u64 ofs, u64 len,
677 struct bio *bio,
678 struct page **pages,
679 int num_pages,
680 int flags,
681 struct ceph_osd_req_op *ops,
682 int num_reply,
683 void (*rbd_cb)(struct ceph_osd_request *req,
684 struct ceph_msg *msg))
685{
686 struct ceph_osd_request *req;
687 struct ceph_file_layout *layout;
688 int ret;
689 u64 bno;
690 struct timespec mtime = CURRENT_TIME;
691 struct rbd_request *req_data;
692 struct ceph_osd_request_head *reqhead;
693 struct rbd_image_header *header = &dev->header;
694
695 ret = -ENOMEM;
696 req_data = kzalloc(sizeof(*req_data), GFP_NOIO);
697 if (!req_data)
698 goto done;
699
700 dout("rbd_do_request len=%lld ofs=%lld\n", len, ofs);
701
702 down_read(&header->snap_rwsem);
703
704 req = ceph_osdc_alloc_request(&dev->client->osdc, flags,
705 snapc,
706 ops,
707 false,
708 GFP_NOIO, pages, bio);
709 if (IS_ERR(req)) {
710 up_read(&header->snap_rwsem);
711 ret = PTR_ERR(req);
712 goto done_pages;
713 }
714
715 req->r_callback = rbd_cb;
716
717 req_data->rq = rq;
718 req_data->bio = bio;
719 req_data->pages = pages;
720 req_data->len = len;
721
722 req->r_priv = req_data;
723
724 reqhead = req->r_request->front.iov_base;
725 reqhead->snapid = cpu_to_le64(CEPH_NOSNAP);
726
727 strncpy(req->r_oid, obj, sizeof(req->r_oid));
728 req->r_oid_len = strlen(req->r_oid);
729
730 layout = &req->r_file_layout;
731 memset(layout, 0, sizeof(*layout));
732 layout->fl_stripe_unit = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
733 layout->fl_stripe_count = cpu_to_le32(1);
734 layout->fl_object_size = cpu_to_le32(1 << RBD_MAX_OBJ_ORDER);
735 layout->fl_pg_preferred = cpu_to_le32(-1);
736 layout->fl_pg_pool = cpu_to_le32(dev->poolid);
737 ceph_calc_raw_layout(&dev->client->osdc, layout, snapid,
738 ofs, &len, &bno, req, ops);
739
740 ceph_osdc_build_request(req, ofs, &len,
741 ops,
742 snapc,
743 &mtime,
744 req->r_oid, req->r_oid_len);
745 up_read(&header->snap_rwsem);
746
747 ret = ceph_osdc_start_request(&dev->client->osdc, req, false);
748 if (ret < 0)
749 goto done_err;
750
751 if (!rbd_cb) {
752 ret = ceph_osdc_wait_request(&dev->client->osdc, req);
753 ceph_osdc_put_request(req);
754 }
755 return ret;
756
757done_err:
758 bio_chain_put(req_data->bio);
759 ceph_osdc_put_request(req);
760done_pages:
761 kfree(req_data);
762done:
763 if (rq)
764 blk_end_request(rq, ret, len);
765 return ret;
766}
767
768/*
769 * Ceph osd op callback
770 */
771static void rbd_req_cb(struct ceph_osd_request *req, struct ceph_msg *msg)
772{
773 struct rbd_request *req_data = req->r_priv;
774 struct ceph_osd_reply_head *replyhead;
775 struct ceph_osd_op *op;
776 __s32 rc;
777 u64 bytes;
778 int read_op;
779
780 /* parse reply */
781 replyhead = msg->front.iov_base;
782 WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
783 op = (void *)(replyhead + 1);
784 rc = le32_to_cpu(replyhead->result);
785 bytes = le64_to_cpu(op->extent.length);
786 read_op = (le32_to_cpu(op->op) == CEPH_OSD_OP_READ);
787
788 dout("rbd_req_cb bytes=%lld readop=%d rc=%d\n", bytes, read_op, rc);
789
790 if (rc == -ENOENT && read_op) {
791 zero_bio_chain(req_data->bio, 0);
792 rc = 0;
793 } else if (rc == 0 && read_op && bytes < req_data->len) {
794 zero_bio_chain(req_data->bio, bytes);
795 bytes = req_data->len;
796 }
797
798 blk_end_request(req_data->rq, rc, bytes);
799
800 if (req_data->bio)
801 bio_chain_put(req_data->bio);
802
803 ceph_osdc_put_request(req);
804 kfree(req_data);
805}
806
807/*
808 * Do a synchronous ceph osd operation
809 */
810static int rbd_req_sync_op(struct rbd_device *dev,
811 struct ceph_snap_context *snapc,
812 u64 snapid,
813 int opcode,
814 int flags,
815 struct ceph_osd_req_op *orig_ops,
816 int num_reply,
817 const char *obj,
818 u64 ofs, u64 len,
819 char *buf)
820{
821 int ret;
822 struct page **pages;
823 int num_pages;
824 struct ceph_osd_req_op *ops = orig_ops;
825 u32 payload_len;
826
827 num_pages = calc_pages_for(ofs , len);
828 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
Dan Carpenterb8d06382010-10-11 21:14:23 +0200829 if (IS_ERR(pages))
830 return PTR_ERR(pages);
Yehuda Sadeh602adf42010-08-12 16:11:25 -0700831
832 if (!orig_ops) {
833 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? len : 0);
834 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
835 if (ret < 0)
836 goto done;
837
838 if ((flags & CEPH_OSD_FLAG_WRITE) && buf) {
839 ret = ceph_copy_to_page_vector(pages, buf, ofs, len);
840 if (ret < 0)
841 goto done_ops;
842 }
843 }
844
845 ret = rbd_do_request(NULL, dev, snapc, snapid,
846 obj, ofs, len, NULL,
847 pages, num_pages,
848 flags,
849 ops,
850 2,
851 NULL);
852 if (ret < 0)
853 goto done_ops;
854
855 if ((flags & CEPH_OSD_FLAG_READ) && buf)
856 ret = ceph_copy_from_page_vector(pages, buf, ofs, ret);
857
858done_ops:
859 if (!orig_ops)
860 rbd_destroy_ops(ops);
861done:
862 ceph_release_page_vector(pages, num_pages);
863 return ret;
864}
865
866/*
867 * Do an asynchronous ceph osd operation
868 */
869static int rbd_do_op(struct request *rq,
870 struct rbd_device *rbd_dev ,
871 struct ceph_snap_context *snapc,
872 u64 snapid,
873 int opcode, int flags, int num_reply,
874 u64 ofs, u64 len,
875 struct bio *bio)
876{
877 char *seg_name;
878 u64 seg_ofs;
879 u64 seg_len;
880 int ret;
881 struct ceph_osd_req_op *ops;
882 u32 payload_len;
883
884 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
885 if (!seg_name)
886 return -ENOMEM;
887
888 seg_len = rbd_get_segment(&rbd_dev->header,
889 rbd_dev->header.block_name,
890 ofs, len,
891 seg_name, &seg_ofs);
Yehuda Sadeh602adf42010-08-12 16:11:25 -0700892
893 payload_len = (flags & CEPH_OSD_FLAG_WRITE ? seg_len : 0);
894
895 ret = rbd_create_rw_ops(&ops, 1, opcode, payload_len);
896 if (ret < 0)
897 goto done;
898
899 /* we've taken care of segment sizes earlier when we
900 cloned the bios. We should never have a segment
901 truncated at this point */
902 BUG_ON(seg_len < len);
903
904 ret = rbd_do_request(rq, rbd_dev, snapc, snapid,
905 seg_name, seg_ofs, seg_len,
906 bio,
907 NULL, 0,
908 flags,
909 ops,
910 num_reply,
911 rbd_req_cb);
912done:
913 kfree(seg_name);
914 return ret;
915}
916
917/*
918 * Request async osd write
919 */
920static int rbd_req_write(struct request *rq,
921 struct rbd_device *rbd_dev,
922 struct ceph_snap_context *snapc,
923 u64 ofs, u64 len,
924 struct bio *bio)
925{
926 return rbd_do_op(rq, rbd_dev, snapc, CEPH_NOSNAP,
927 CEPH_OSD_OP_WRITE,
928 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
929 2,
930 ofs, len, bio);
931}
932
933/*
934 * Request async osd read
935 */
936static int rbd_req_read(struct request *rq,
937 struct rbd_device *rbd_dev,
938 u64 snapid,
939 u64 ofs, u64 len,
940 struct bio *bio)
941{
942 return rbd_do_op(rq, rbd_dev, NULL,
943 (snapid ? snapid : CEPH_NOSNAP),
944 CEPH_OSD_OP_READ,
945 CEPH_OSD_FLAG_READ,
946 2,
947 ofs, len, bio);
948}
949
950/*
951 * Request sync osd read
952 */
953static int rbd_req_sync_read(struct rbd_device *dev,
954 struct ceph_snap_context *snapc,
955 u64 snapid,
956 const char *obj,
957 u64 ofs, u64 len,
958 char *buf)
959{
960 return rbd_req_sync_op(dev, NULL,
961 (snapid ? snapid : CEPH_NOSNAP),
962 CEPH_OSD_OP_READ,
963 CEPH_OSD_FLAG_READ,
964 NULL,
965 1, obj, ofs, len, buf);
966}
967
968/*
969 * Request sync osd read
970 */
971static int rbd_req_sync_rollback_obj(struct rbd_device *dev,
972 u64 snapid,
973 const char *obj)
974{
975 struct ceph_osd_req_op *ops;
976 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_ROLLBACK, 0);
977 if (ret < 0)
978 return ret;
979
980 ops[0].snap.snapid = snapid;
981
982 ret = rbd_req_sync_op(dev, NULL,
983 CEPH_NOSNAP,
984 0,
985 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
986 ops,
987 1, obj, 0, 0, NULL);
988
989 rbd_destroy_ops(ops);
990
991 if (ret < 0)
992 return ret;
993
994 return ret;
995}
996
997/*
998 * Request sync osd read
999 */
1000static int rbd_req_sync_exec(struct rbd_device *dev,
1001 const char *obj,
1002 const char *cls,
1003 const char *method,
1004 const char *data,
1005 int len)
1006{
1007 struct ceph_osd_req_op *ops;
1008 int cls_len = strlen(cls);
1009 int method_len = strlen(method);
1010 int ret = rbd_create_rw_ops(&ops, 1, CEPH_OSD_OP_CALL,
1011 cls_len + method_len + len);
1012 if (ret < 0)
1013 return ret;
1014
1015 ops[0].cls.class_name = cls;
1016 ops[0].cls.class_len = (__u8)cls_len;
1017 ops[0].cls.method_name = method;
1018 ops[0].cls.method_len = (__u8)method_len;
1019 ops[0].cls.argc = 0;
1020 ops[0].cls.indata = data;
1021 ops[0].cls.indata_len = len;
1022
1023 ret = rbd_req_sync_op(dev, NULL,
1024 CEPH_NOSNAP,
1025 0,
1026 CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_ONDISK,
1027 ops,
1028 1, obj, 0, 0, NULL);
1029
1030 rbd_destroy_ops(ops);
1031
1032 dout("cls_exec returned %d\n", ret);
1033 return ret;
1034}
1035
1036/*
1037 * block device queue callback
1038 */
1039static void rbd_rq_fn(struct request_queue *q)
1040{
1041 struct rbd_device *rbd_dev = q->queuedata;
1042 struct request *rq;
1043 struct bio_pair *bp = NULL;
1044
1045 rq = blk_fetch_request(q);
1046
1047 while (1) {
1048 struct bio *bio;
1049 struct bio *rq_bio, *next_bio = NULL;
1050 bool do_write;
1051 int size, op_size = 0;
1052 u64 ofs;
1053
1054 /* peek at request from block layer */
1055 if (!rq)
1056 break;
1057
1058 dout("fetched request\n");
1059
1060 /* filter out block requests we don't understand */
1061 if ((rq->cmd_type != REQ_TYPE_FS)) {
1062 __blk_end_request_all(rq, 0);
1063 goto next;
1064 }
1065
1066 /* deduce our operation (read, write) */
1067 do_write = (rq_data_dir(rq) == WRITE);
1068
1069 size = blk_rq_bytes(rq);
1070 ofs = blk_rq_pos(rq) * 512ULL;
1071 rq_bio = rq->bio;
1072 if (do_write && rbd_dev->read_only) {
1073 __blk_end_request_all(rq, -EROFS);
1074 goto next;
1075 }
1076
1077 spin_unlock_irq(q->queue_lock);
1078
1079 dout("%s 0x%x bytes at 0x%llx\n",
1080 do_write ? "write" : "read",
1081 size, blk_rq_pos(rq) * 512ULL);
1082
1083 do {
1084 /* a bio clone to be passed down to OSD req */
1085 dout("rq->bio->bi_vcnt=%d\n", rq->bio->bi_vcnt);
1086 op_size = rbd_get_segment(&rbd_dev->header,
1087 rbd_dev->header.block_name,
1088 ofs, size,
1089 NULL, NULL);
1090 bio = bio_chain_clone(&rq_bio, &next_bio, &bp,
1091 op_size, GFP_ATOMIC);
1092 if (!bio) {
1093 spin_lock_irq(q->queue_lock);
1094 __blk_end_request_all(rq, -ENOMEM);
1095 goto next;
1096 }
1097
1098 /* init OSD command: write or read */
1099 if (do_write)
1100 rbd_req_write(rq, rbd_dev,
1101 rbd_dev->header.snapc,
1102 ofs,
1103 op_size, bio);
1104 else
1105 rbd_req_read(rq, rbd_dev,
1106 cur_snap_id(rbd_dev),
1107 ofs,
1108 op_size, bio);
1109
1110 size -= op_size;
1111 ofs += op_size;
1112
1113 rq_bio = next_bio;
1114 } while (size > 0);
1115
1116 if (bp)
1117 bio_pair_release(bp);
1118
1119 spin_lock_irq(q->queue_lock);
1120next:
1121 rq = blk_fetch_request(q);
1122 }
1123}
1124
1125/*
1126 * a queue callback. Makes sure that we don't create a bio that spans across
1127 * multiple osd objects. One exception would be with a single page bios,
1128 * which we handle later at bio_chain_clone
1129 */
1130static int rbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bmd,
1131 struct bio_vec *bvec)
1132{
1133 struct rbd_device *rbd_dev = q->queuedata;
1134 unsigned int chunk_sectors = 1 << (rbd_dev->header.obj_order - 9);
1135 sector_t sector = bmd->bi_sector + get_start_sect(bmd->bi_bdev);
1136 unsigned int bio_sectors = bmd->bi_size >> 9;
1137 int max;
1138
1139 max = (chunk_sectors - ((sector & (chunk_sectors - 1))
1140 + bio_sectors)) << 9;
1141 if (max < 0)
1142 max = 0; /* bio_add cannot handle a negative return */
1143 if (max <= bvec->bv_len && bio_sectors == 0)
1144 return bvec->bv_len;
1145 return max;
1146}
1147
1148static void rbd_free_disk(struct rbd_device *rbd_dev)
1149{
1150 struct gendisk *disk = rbd_dev->disk;
1151
1152 if (!disk)
1153 return;
1154
1155 rbd_header_free(&rbd_dev->header);
1156
1157 if (disk->flags & GENHD_FL_UP)
1158 del_gendisk(disk);
1159 if (disk->queue)
1160 blk_cleanup_queue(disk->queue);
1161 put_disk(disk);
1162}
1163
1164/*
1165 * reload the ondisk the header
1166 */
1167static int rbd_read_header(struct rbd_device *rbd_dev,
1168 struct rbd_image_header *header)
1169{
1170 ssize_t rc;
1171 struct rbd_image_header_ondisk *dh;
1172 int snap_count = 0;
1173 u64 snap_names_len = 0;
1174
1175 while (1) {
1176 int len = sizeof(*dh) +
1177 snap_count * sizeof(struct rbd_image_snap_ondisk) +
1178 snap_names_len;
1179
1180 rc = -ENOMEM;
1181 dh = kmalloc(len, GFP_KERNEL);
1182 if (!dh)
1183 return -ENOMEM;
1184
1185 rc = rbd_req_sync_read(rbd_dev,
1186 NULL, CEPH_NOSNAP,
1187 rbd_dev->obj_md_name,
1188 0, len,
1189 (char *)dh);
1190 if (rc < 0)
1191 goto out_dh;
1192
1193 rc = rbd_header_from_disk(header, dh, snap_count, GFP_KERNEL);
1194 if (rc < 0)
1195 goto out_dh;
1196
1197 if (snap_count != header->total_snaps) {
1198 snap_count = header->total_snaps;
1199 snap_names_len = header->snap_names_len;
1200 rbd_header_free(header);
1201 kfree(dh);
1202 continue;
1203 }
1204 break;
1205 }
1206
1207out_dh:
1208 kfree(dh);
1209 return rc;
1210}
1211
1212/*
1213 * create a snapshot
1214 */
1215static int rbd_header_add_snap(struct rbd_device *dev,
1216 const char *snap_name,
1217 gfp_t gfp_flags)
1218{
1219 int name_len = strlen(snap_name);
1220 u64 new_snapid;
1221 int ret;
1222 void *data, *data_start, *data_end;
1223
1224 /* we should create a snapshot only if we're pointing at the head */
1225 if (dev->cur_snap)
1226 return -EINVAL;
1227
1228 ret = ceph_monc_create_snapid(&dev->client->monc, dev->poolid,
1229 &new_snapid);
1230 dout("created snapid=%lld\n", new_snapid);
1231 if (ret < 0)
1232 return ret;
1233
1234 data = kmalloc(name_len + 16, gfp_flags);
1235 if (!data)
1236 return -ENOMEM;
1237
1238 data_start = data;
1239 data_end = data + name_len + 16;
1240
1241 ceph_encode_string_safe(&data, data_end, snap_name, name_len, bad);
1242 ceph_encode_64_safe(&data, data_end, new_snapid, bad);
1243
1244 ret = rbd_req_sync_exec(dev, dev->obj_md_name, "rbd", "snap_add",
1245 data_start, data - data_start);
1246
1247 kfree(data_start);
1248
1249 if (ret < 0)
1250 return ret;
1251
1252 dev->header.snapc->seq = new_snapid;
1253
1254 return 0;
1255bad:
1256 return -ERANGE;
1257}
1258
1259/*
1260 * only read the first part of the ondisk header, without the snaps info
1261 */
1262static int rbd_update_snaps(struct rbd_device *rbd_dev)
1263{
1264 int ret;
1265 struct rbd_image_header h;
1266 u64 snap_seq;
1267
1268 ret = rbd_read_header(rbd_dev, &h);
1269 if (ret < 0)
1270 return ret;
1271
1272 down_write(&rbd_dev->header.snap_rwsem);
1273
1274 snap_seq = rbd_dev->header.snapc->seq;
1275
1276 kfree(rbd_dev->header.snapc);
1277 kfree(rbd_dev->header.snap_names);
1278 kfree(rbd_dev->header.snap_sizes);
1279
1280 rbd_dev->header.total_snaps = h.total_snaps;
1281 rbd_dev->header.snapc = h.snapc;
1282 rbd_dev->header.snap_names = h.snap_names;
1283 rbd_dev->header.snap_sizes = h.snap_sizes;
1284 rbd_dev->header.snapc->seq = snap_seq;
1285
1286 up_write(&rbd_dev->header.snap_rwsem);
1287
1288 return 0;
1289}
1290
1291static int rbd_init_disk(struct rbd_device *rbd_dev)
1292{
1293 struct gendisk *disk;
1294 struct request_queue *q;
1295 int rc;
1296 u64 total_size = 0;
1297
1298 /* contact OSD, request size info about the object being mapped */
1299 rc = rbd_read_header(rbd_dev, &rbd_dev->header);
1300 if (rc)
1301 return rc;
1302
1303 rc = rbd_header_set_snap(rbd_dev, rbd_dev->snap_name, &total_size);
1304 if (rc)
1305 return rc;
1306
1307 /* create gendisk info */
1308 rc = -ENOMEM;
1309 disk = alloc_disk(RBD_MINORS_PER_MAJOR);
1310 if (!disk)
1311 goto out;
1312
1313 sprintf(disk->disk_name, DRV_NAME "%d", rbd_dev->id);
1314 disk->major = rbd_dev->major;
1315 disk->first_minor = 0;
1316 disk->fops = &rbd_bd_ops;
1317 disk->private_data = rbd_dev;
1318
1319 /* init rq */
1320 rc = -ENOMEM;
1321 q = blk_init_queue(rbd_rq_fn, &rbd_dev->lock);
1322 if (!q)
1323 goto out_disk;
1324 blk_queue_merge_bvec(q, rbd_merge_bvec);
1325 disk->queue = q;
1326
1327 q->queuedata = rbd_dev;
1328
1329 rbd_dev->disk = disk;
1330 rbd_dev->q = q;
1331
1332 /* finally, announce the disk to the world */
1333 set_capacity(disk, total_size / 512ULL);
1334 add_disk(disk);
1335
1336 pr_info("%s: added with size 0x%llx\n",
1337 disk->disk_name, (unsigned long long)total_size);
1338 return 0;
1339
1340out_disk:
1341 put_disk(disk);
1342out:
1343 return rc;
1344}
1345
1346/********************************************************************
1347 * /sys/class/rbd/
1348 * add map rados objects to blkdev
1349 * remove unmap rados objects
1350 * list show mappings
1351 *******************************************************************/
1352
1353static void class_rbd_release(struct class *cls)
1354{
1355 kfree(cls);
1356}
1357
1358static ssize_t class_rbd_list(struct class *c,
1359 struct class_attribute *attr,
1360 char *data)
1361{
1362 int n = 0;
1363 struct list_head *tmp;
1364 int max = PAGE_SIZE;
1365
1366 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1367
1368 n += snprintf(data, max,
1369 "#id\tmajor\tclient_name\tpool\tname\tsnap\tKB\n");
1370
1371 list_for_each(tmp, &rbd_dev_list) {
1372 struct rbd_device *rbd_dev;
1373
1374 rbd_dev = list_entry(tmp, struct rbd_device, node);
1375 n += snprintf(data+n, max-n,
1376 "%d\t%d\tclient%lld\t%s\t%s\t%s\t%lld\n",
1377 rbd_dev->id,
1378 rbd_dev->major,
1379 ceph_client_id(rbd_dev->client),
1380 rbd_dev->pool_name,
1381 rbd_dev->obj, rbd_dev->snap_name,
1382 rbd_dev->header.image_size >> 10);
1383 if (n == max)
1384 break;
1385 }
1386
1387 mutex_unlock(&ctl_mutex);
1388 return n;
1389}
1390
1391static ssize_t class_rbd_add(struct class *c,
1392 struct class_attribute *attr,
1393 const char *buf, size_t count)
1394{
1395 struct ceph_osd_client *osdc;
1396 struct rbd_device *rbd_dev;
1397 ssize_t rc = -ENOMEM;
1398 int irc, new_id = 0;
1399 struct list_head *tmp;
1400 char *mon_dev_name;
1401 char *options;
1402
1403 if (!try_module_get(THIS_MODULE))
1404 return -ENODEV;
1405
1406 mon_dev_name = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1407 if (!mon_dev_name)
1408 goto err_out_mod;
1409
1410 options = kmalloc(RBD_MAX_OPT_LEN, GFP_KERNEL);
1411 if (!options)
1412 goto err_mon_dev;
1413
1414 /* new rbd_device object */
1415 rbd_dev = kzalloc(sizeof(*rbd_dev), GFP_KERNEL);
1416 if (!rbd_dev)
1417 goto err_out_opt;
1418
1419 /* static rbd_device initialization */
1420 spin_lock_init(&rbd_dev->lock);
1421 INIT_LIST_HEAD(&rbd_dev->node);
1422
1423 /* generate unique id: find highest unique id, add one */
1424 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1425
1426 list_for_each(tmp, &rbd_dev_list) {
1427 struct rbd_device *rbd_dev;
1428
1429 rbd_dev = list_entry(tmp, struct rbd_device, node);
1430 if (rbd_dev->id >= new_id)
1431 new_id = rbd_dev->id + 1;
1432 }
1433
1434 rbd_dev->id = new_id;
1435
1436 /* add to global list */
1437 list_add_tail(&rbd_dev->node, &rbd_dev_list);
1438
1439 /* parse add command */
1440 if (sscanf(buf, "%" __stringify(RBD_MAX_OPT_LEN) "s "
1441 "%" __stringify(RBD_MAX_OPT_LEN) "s "
1442 "%" __stringify(RBD_MAX_POOL_NAME_LEN) "s "
1443 "%" __stringify(RBD_MAX_OBJ_NAME_LEN) "s"
1444 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1445 mon_dev_name, options, rbd_dev->pool_name,
1446 rbd_dev->obj, rbd_dev->snap_name) < 4) {
1447 rc = -EINVAL;
1448 goto err_out_slot;
1449 }
1450
1451 if (rbd_dev->snap_name[0] == 0)
1452 rbd_dev->snap_name[0] = '-';
1453
1454 rbd_dev->obj_len = strlen(rbd_dev->obj);
1455 snprintf(rbd_dev->obj_md_name, sizeof(rbd_dev->obj_md_name), "%s%s",
1456 rbd_dev->obj, RBD_SUFFIX);
1457
1458 /* initialize rest of new object */
1459 snprintf(rbd_dev->name, DEV_NAME_LEN, DRV_NAME "%d", rbd_dev->id);
1460 rc = rbd_get_client(rbd_dev, mon_dev_name, options);
1461 if (rc < 0)
1462 goto err_out_slot;
1463
1464 mutex_unlock(&ctl_mutex);
1465
1466 /* pick the pool */
1467 osdc = &rbd_dev->client->osdc;
1468 rc = ceph_pg_poolid_by_name(osdc->osdmap, rbd_dev->pool_name);
1469 if (rc < 0)
1470 goto err_out_client;
1471 rbd_dev->poolid = rc;
1472
1473 /* register our block device */
1474 irc = register_blkdev(0, rbd_dev->name);
1475 if (irc < 0) {
1476 rc = irc;
1477 goto err_out_client;
1478 }
1479 rbd_dev->major = irc;
1480
1481 /* set up and announce blkdev mapping */
1482 rc = rbd_init_disk(rbd_dev);
1483 if (rc)
1484 goto err_out_blkdev;
1485
1486 return count;
1487
1488err_out_blkdev:
1489 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1490err_out_client:
1491 rbd_put_client(rbd_dev);
1492 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1493err_out_slot:
1494 list_del_init(&rbd_dev->node);
1495 mutex_unlock(&ctl_mutex);
1496
1497 kfree(rbd_dev);
1498err_out_opt:
1499 kfree(options);
1500err_mon_dev:
1501 kfree(mon_dev_name);
1502err_out_mod:
1503 dout("Error adding device %s\n", buf);
1504 module_put(THIS_MODULE);
1505 return rc;
1506}
1507
1508static struct rbd_device *__rbd_get_dev(unsigned long id)
1509{
1510 struct list_head *tmp;
1511 struct rbd_device *rbd_dev;
1512
1513 list_for_each(tmp, &rbd_dev_list) {
1514 rbd_dev = list_entry(tmp, struct rbd_device, node);
1515 if (rbd_dev->id == id)
1516 return rbd_dev;
1517 }
1518 return NULL;
1519}
1520
1521static ssize_t class_rbd_remove(struct class *c,
1522 struct class_attribute *attr,
1523 const char *buf,
1524 size_t count)
1525{
1526 struct rbd_device *rbd_dev = NULL;
1527 int target_id, rc;
1528 unsigned long ul;
1529
1530 rc = strict_strtoul(buf, 10, &ul);
1531 if (rc)
1532 return rc;
1533
1534 /* convert to int; abort if we lost anything in the conversion */
1535 target_id = (int) ul;
1536 if (target_id != ul)
1537 return -EINVAL;
1538
1539 /* remove object from list immediately */
1540 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1541
1542 rbd_dev = __rbd_get_dev(target_id);
1543 if (rbd_dev)
1544 list_del_init(&rbd_dev->node);
1545
1546 mutex_unlock(&ctl_mutex);
1547
1548 if (!rbd_dev)
1549 return -ENOENT;
1550
1551 rbd_put_client(rbd_dev);
1552
1553 /* clean up and free blkdev */
1554 rbd_free_disk(rbd_dev);
1555 unregister_blkdev(rbd_dev->major, rbd_dev->name);
1556 kfree(rbd_dev);
1557
1558 /* release module ref */
1559 module_put(THIS_MODULE);
1560
1561 return count;
1562}
1563
1564static ssize_t class_rbd_snaps_list(struct class *c,
1565 struct class_attribute *attr,
1566 char *data)
1567{
1568 struct rbd_device *rbd_dev = NULL;
1569 struct list_head *tmp;
1570 struct rbd_image_header *header;
1571 int i, n = 0, max = PAGE_SIZE;
1572 int ret;
1573
1574 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1575
1576 n += snprintf(data, max, "#id\tsnap\tKB\n");
1577
1578 list_for_each(tmp, &rbd_dev_list) {
1579 char *names, *p;
1580 struct ceph_snap_context *snapc;
1581
1582 rbd_dev = list_entry(tmp, struct rbd_device, node);
1583 header = &rbd_dev->header;
1584
1585 down_read(&header->snap_rwsem);
1586
1587 names = header->snap_names;
1588 snapc = header->snapc;
1589
1590 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1591 rbd_dev->id, RBD_SNAP_HEAD_NAME,
1592 header->image_size >> 10,
1593 (!rbd_dev->cur_snap ? " (*)" : ""));
1594 if (n == max)
1595 break;
1596
1597 p = names;
1598 for (i = 0; i < header->total_snaps; i++, p += strlen(p) + 1) {
1599 n += snprintf(data + n, max - n, "%d\t%s\t%lld%s\n",
1600 rbd_dev->id, p, header->snap_sizes[i] >> 10,
1601 (rbd_dev->cur_snap &&
1602 (snap_index(header, i) == rbd_dev->cur_snap) ?
1603 " (*)" : ""));
1604 if (n == max)
1605 break;
1606 }
1607
1608 up_read(&header->snap_rwsem);
1609 }
1610
1611
1612 ret = n;
1613 mutex_unlock(&ctl_mutex);
1614 return ret;
1615}
1616
1617static ssize_t class_rbd_snaps_refresh(struct class *c,
1618 struct class_attribute *attr,
1619 const char *buf,
1620 size_t count)
1621{
1622 struct rbd_device *rbd_dev = NULL;
1623 int target_id, rc;
1624 unsigned long ul;
1625 int ret = count;
1626
1627 rc = strict_strtoul(buf, 10, &ul);
1628 if (rc)
1629 return rc;
1630
1631 /* convert to int; abort if we lost anything in the conversion */
1632 target_id = (int) ul;
1633 if (target_id != ul)
1634 return -EINVAL;
1635
1636 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1637
1638 rbd_dev = __rbd_get_dev(target_id);
1639 if (!rbd_dev) {
1640 ret = -ENOENT;
1641 goto done;
1642 }
1643
1644 rc = rbd_update_snaps(rbd_dev);
1645 if (rc < 0)
1646 ret = rc;
1647
1648done:
1649 mutex_unlock(&ctl_mutex);
1650 return ret;
1651}
1652
1653static ssize_t class_rbd_snap_create(struct class *c,
1654 struct class_attribute *attr,
1655 const char *buf,
1656 size_t count)
1657{
1658 struct rbd_device *rbd_dev = NULL;
1659 int target_id, ret;
1660 char *name;
1661
1662 name = kmalloc(RBD_MAX_SNAP_NAME_LEN + 1, GFP_KERNEL);
1663 if (!name)
1664 return -ENOMEM;
1665
1666 /* parse snaps add command */
1667 if (sscanf(buf, "%d "
1668 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1669 &target_id,
1670 name) != 2) {
1671 ret = -EINVAL;
1672 goto done;
1673 }
1674
1675 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1676
1677 rbd_dev = __rbd_get_dev(target_id);
1678 if (!rbd_dev) {
1679 ret = -ENOENT;
1680 goto done_unlock;
1681 }
1682
1683 ret = rbd_header_add_snap(rbd_dev,
1684 name, GFP_KERNEL);
1685 if (ret < 0)
1686 goto done_unlock;
1687
1688 ret = rbd_update_snaps(rbd_dev);
1689 if (ret < 0)
1690 goto done_unlock;
1691
1692 ret = count;
1693done_unlock:
1694 mutex_unlock(&ctl_mutex);
1695done:
1696 kfree(name);
1697 return ret;
1698}
1699
1700static ssize_t class_rbd_rollback(struct class *c,
1701 struct class_attribute *attr,
1702 const char *buf,
1703 size_t count)
1704{
1705 struct rbd_device *rbd_dev = NULL;
1706 int target_id, ret;
1707 u64 snapid;
1708 char snap_name[RBD_MAX_SNAP_NAME_LEN];
1709 u64 cur_ofs;
1710 char *seg_name;
1711
1712 /* parse snaps add command */
1713 if (sscanf(buf, "%d "
1714 "%" __stringify(RBD_MAX_SNAP_NAME_LEN) "s",
1715 &target_id,
1716 snap_name) != 2) {
1717 return -EINVAL;
1718 }
1719
1720 ret = -ENOMEM;
1721 seg_name = kmalloc(RBD_MAX_SEG_NAME_LEN + 1, GFP_NOIO);
1722 if (!seg_name)
1723 return ret;
1724
1725 mutex_lock_nested(&ctl_mutex, SINGLE_DEPTH_NESTING);
1726
1727 rbd_dev = __rbd_get_dev(target_id);
1728 if (!rbd_dev) {
1729 ret = -ENOENT;
1730 goto done_unlock;
1731 }
1732
1733 ret = snap_by_name(&rbd_dev->header, snap_name, &snapid, NULL);
1734 if (ret < 0)
1735 goto done_unlock;
1736
1737 dout("snapid=%lld\n", snapid);
1738
1739 cur_ofs = 0;
1740 while (cur_ofs < rbd_dev->header.image_size) {
1741 cur_ofs += rbd_get_segment(&rbd_dev->header,
1742 rbd_dev->obj,
1743 cur_ofs, (u64)-1,
1744 seg_name, NULL);
1745 dout("seg_name=%s\n", seg_name);
1746
1747 ret = rbd_req_sync_rollback_obj(rbd_dev, snapid, seg_name);
1748 if (ret < 0)
1749 pr_warning("could not roll back obj %s err=%d\n",
1750 seg_name, ret);
1751 }
1752
1753 ret = rbd_update_snaps(rbd_dev);
1754 if (ret < 0)
1755 goto done_unlock;
1756
1757 ret = count;
1758
1759done_unlock:
1760 mutex_unlock(&ctl_mutex);
1761 kfree(seg_name);
1762
1763 return ret;
1764}
1765
1766static struct class_attribute class_rbd_attrs[] = {
1767 __ATTR(add, 0200, NULL, class_rbd_add),
1768 __ATTR(remove, 0200, NULL, class_rbd_remove),
1769 __ATTR(list, 0444, class_rbd_list, NULL),
1770 __ATTR(snaps_refresh, 0200, NULL, class_rbd_snaps_refresh),
1771 __ATTR(snap_create, 0200, NULL, class_rbd_snap_create),
1772 __ATTR(snaps_list, 0444, class_rbd_snaps_list, NULL),
1773 __ATTR(snap_rollback, 0200, NULL, class_rbd_rollback),
1774 __ATTR_NULL
1775};
1776
1777/*
1778 * create control files in sysfs
1779 * /sys/class/rbd/...
1780 */
1781static int rbd_sysfs_init(void)
1782{
1783 int ret = -ENOMEM;
1784
1785 class_rbd = kzalloc(sizeof(*class_rbd), GFP_KERNEL);
1786 if (!class_rbd)
1787 goto out;
1788
1789 class_rbd->name = DRV_NAME;
1790 class_rbd->owner = THIS_MODULE;
1791 class_rbd->class_release = class_rbd_release;
1792 class_rbd->class_attrs = class_rbd_attrs;
1793
1794 ret = class_register(class_rbd);
1795 if (ret)
1796 goto out_class;
1797 return 0;
1798
1799out_class:
1800 kfree(class_rbd);
1801 class_rbd = NULL;
1802 pr_err(DRV_NAME ": failed to create class rbd\n");
1803out:
1804 return ret;
1805}
1806
1807static void rbd_sysfs_cleanup(void)
1808{
1809 if (class_rbd)
1810 class_destroy(class_rbd);
1811 class_rbd = NULL;
1812}
1813
1814int __init rbd_init(void)
1815{
1816 int rc;
1817
1818 rc = rbd_sysfs_init();
1819 if (rc)
1820 return rc;
1821 spin_lock_init(&node_lock);
1822 pr_info("loaded " DRV_NAME_LONG "\n");
1823 return 0;
1824}
1825
1826void __exit rbd_exit(void)
1827{
1828 rbd_sysfs_cleanup();
1829}
1830
1831module_init(rbd_init);
1832module_exit(rbd_exit);
1833
1834MODULE_AUTHOR("Sage Weil <sage@newdream.net>");
1835MODULE_AUTHOR("Yehuda Sadeh <yehuda@hq.newdream.net>");
1836MODULE_DESCRIPTION("rados block device");
1837
1838/* following authorship retained from original osdblk.c */
1839MODULE_AUTHOR("Jeff Garzik <jeff@garzik.org>");
1840
1841MODULE_LICENSE("GPL");