blob: 3e20a122ffa2f2bf40a91596fe12bb93e6874b64 [file] [log] [blame]
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001#include <linux/ceph/ceph_debug.h>
Sage Weilf24e9982009-10-06 11:31:10 -07002
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07003#include <linux/module.h>
Sage Weilf24e9982009-10-06 11:31:10 -07004#include <linux/err.h>
5#include <linux/highmem.h>
6#include <linux/mm.h>
7#include <linux/pagemap.h>
8#include <linux/slab.h>
9#include <linux/uaccess.h>
Yehuda Sadeh68b44762010-04-06 15:01:27 -070010#ifdef CONFIG_BLOCK
11#include <linux/bio.h>
12#endif
Sage Weilf24e9982009-10-06 11:31:10 -070013
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -070014#include <linux/ceph/libceph.h>
15#include <linux/ceph/osd_client.h>
16#include <linux/ceph/messenger.h>
17#include <linux/ceph/decode.h>
18#include <linux/ceph/auth.h>
19#include <linux/ceph/pagelist.h>
Sage Weilf24e9982009-10-06 11:31:10 -070020
Sage Weilc16e7862010-03-01 13:02:00 -080021#define OSD_OP_FRONT_LEN 4096
22#define OSD_OPREPLY_FRONT_LEN 512
Yehuda Sadeh0d59ab82010-01-13 17:03:23 -080023
Tobias Klauser9e327892010-05-20 10:40:19 +020024static const struct ceph_connection_operations osd_con_ops;
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -080025static int __kick_requests(struct ceph_osd_client *osdc,
26 struct ceph_osd *kickosd);
Sage Weilf24e9982009-10-06 11:31:10 -070027
28static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
29
Yehuda Sadeh68b44762010-04-06 15:01:27 -070030static int op_needs_trail(int op)
31{
32 switch (op) {
33 case CEPH_OSD_OP_GETXATTR:
34 case CEPH_OSD_OP_SETXATTR:
35 case CEPH_OSD_OP_CMPXATTR:
36 case CEPH_OSD_OP_CALL:
37 return 1;
38 default:
39 return 0;
40 }
41}
42
43static int op_has_extent(int op)
44{
45 return (op == CEPH_OSD_OP_READ ||
46 op == CEPH_OSD_OP_WRITE);
47}
48
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -070049void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
50 struct ceph_file_layout *layout,
51 u64 snapid,
Yehuda Sadeh68b44762010-04-06 15:01:27 -070052 u64 off, u64 *plen, u64 *bno,
53 struct ceph_osd_request *req,
54 struct ceph_osd_req_op *op)
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -070055{
56 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
Yehuda Sadeh68b44762010-04-06 15:01:27 -070057 u64 orig_len = *plen;
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -070058 u64 objoff, objlen; /* extent in object */
59
60 reqhead->snapid = cpu_to_le64(snapid);
61
62 /* object extent? */
Yehuda Sadeh68b44762010-04-06 15:01:27 -070063 ceph_calc_file_object_mapping(layout, off, plen, bno,
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -070064 &objoff, &objlen);
Yehuda Sadeh68b44762010-04-06 15:01:27 -070065 if (*plen < orig_len)
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -070066 dout(" skipping last %llu, final file extent %llu~%llu\n",
Yehuda Sadeh68b44762010-04-06 15:01:27 -070067 orig_len - *plen, off, *plen);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -070068
Yehuda Sadeh68b44762010-04-06 15:01:27 -070069 if (op_has_extent(op->op)) {
70 op->extent.offset = objoff;
71 op->extent.length = objlen;
72 }
73 req->r_num_pages = calc_pages_for(off, *plen);
Sage Weilb7495fc2010-11-09 12:43:12 -080074 req->r_page_alignment = off & ~PAGE_MASK;
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -070075 if (op->op == CEPH_OSD_OP_WRITE)
76 op->payload_len = *plen;
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -070077
78 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
79 *bno, objoff, objlen, req->r_num_pages);
80
81}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -070082EXPORT_SYMBOL(ceph_calc_raw_layout);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -070083
Sage Weilf24e9982009-10-06 11:31:10 -070084/*
85 * Implement client access to distributed object storage cluster.
86 *
87 * All data objects are stored within a cluster/cloud of OSDs, or
88 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
89 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
90 * remote daemons serving up and coordinating consistent and safe
91 * access to storage.
92 *
93 * Cluster membership and the mapping of data objects onto storage devices
94 * are described by the osd map.
95 *
96 * We keep track of pending OSD requests (read, write), resubmit
97 * requests to different OSDs when the cluster topology/data layout
98 * change, or retry the affected requests when the communications
99 * channel with an OSD is reset.
100 */
101
102/*
103 * calculate the mapping of a file extent onto an object, and fill out the
104 * request accordingly. shorten extent as necessary if it crosses an
105 * object boundary.
106 *
107 * fill osd op in request message.
108 */
109static void calc_layout(struct ceph_osd_client *osdc,
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700110 struct ceph_vino vino,
111 struct ceph_file_layout *layout,
Sage Weilf24e9982009-10-06 11:31:10 -0700112 u64 off, u64 *plen,
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700113 struct ceph_osd_request *req,
114 struct ceph_osd_req_op *op)
Sage Weilf24e9982009-10-06 11:31:10 -0700115{
Sage Weilf24e9982009-10-06 11:31:10 -0700116 u64 bno;
117
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700118 ceph_calc_raw_layout(osdc, layout, vino.snap, off,
119 plen, &bno, req, op);
Sage Weilf24e9982009-10-06 11:31:10 -0700120
121 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
122 req->r_oid_len = strlen(req->r_oid);
Sage Weilf24e9982009-10-06 11:31:10 -0700123}
124
Sage Weilf24e9982009-10-06 11:31:10 -0700125/*
126 * requests
127 */
Sage Weil415e49a2009-12-07 13:37:03 -0800128void ceph_osdc_release_request(struct kref *kref)
Sage Weilf24e9982009-10-06 11:31:10 -0700129{
Sage Weil415e49a2009-12-07 13:37:03 -0800130 struct ceph_osd_request *req = container_of(kref,
131 struct ceph_osd_request,
132 r_kref);
133
134 if (req->r_request)
135 ceph_msg_put(req->r_request);
136 if (req->r_reply)
137 ceph_msg_put(req->r_reply);
Yehuda Sadeh0d59ab82010-01-13 17:03:23 -0800138 if (req->r_con_filling_msg) {
Sage Weil350b1c32009-12-22 10:45:45 -0800139 dout("release_request revoking pages %p from con %p\n",
Yehuda Sadeh0d59ab82010-01-13 17:03:23 -0800140 req->r_pages, req->r_con_filling_msg);
141 ceph_con_revoke_message(req->r_con_filling_msg,
142 req->r_reply);
143 ceph_con_put(req->r_con_filling_msg);
Sage Weil350b1c32009-12-22 10:45:45 -0800144 }
Sage Weil415e49a2009-12-07 13:37:03 -0800145 if (req->r_own_pages)
146 ceph_release_page_vector(req->r_pages,
147 req->r_num_pages);
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700148#ifdef CONFIG_BLOCK
149 if (req->r_bio)
150 bio_put(req->r_bio);
151#endif
Sage Weil415e49a2009-12-07 13:37:03 -0800152 ceph_put_snap_context(req->r_snapc);
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700153 if (req->r_trail) {
154 ceph_pagelist_release(req->r_trail);
155 kfree(req->r_trail);
156 }
Sage Weil415e49a2009-12-07 13:37:03 -0800157 if (req->r_mempool)
158 mempool_free(req, req->r_osdc->req_mempool);
159 else
160 kfree(req);
Sage Weilf24e9982009-10-06 11:31:10 -0700161}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -0700162EXPORT_SYMBOL(ceph_osdc_release_request);
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700163
164static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
165{
166 int i = 0;
167
168 if (needs_trail)
169 *needs_trail = 0;
170 while (ops[i].op) {
171 if (needs_trail && op_needs_trail(ops[i].op))
172 *needs_trail = 1;
173 i++;
174 }
175
176 return i;
177}
178
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700179struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
180 int flags,
181 struct ceph_snap_context *snapc,
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700182 struct ceph_osd_req_op *ops,
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700183 bool use_mempool,
184 gfp_t gfp_flags,
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700185 struct page **pages,
186 struct bio *bio)
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700187{
188 struct ceph_osd_request *req;
189 struct ceph_msg *msg;
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700190 int needs_trail;
191 int num_op = get_num_ops(ops, &needs_trail);
192 size_t msg_size = sizeof(struct ceph_osd_request_head);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700193
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700194 msg_size += num_op*sizeof(struct ceph_osd_op);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700195
196 if (use_mempool) {
197 req = mempool_alloc(osdc->req_mempool, gfp_flags);
198 memset(req, 0, sizeof(*req));
199 } else {
200 req = kzalloc(sizeof(*req), gfp_flags);
201 }
202 if (req == NULL)
203 return NULL;
204
205 req->r_osdc = osdc;
206 req->r_mempool = use_mempool;
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700207
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700208 kref_init(&req->r_kref);
209 init_completion(&req->r_completion);
210 init_completion(&req->r_safe_completion);
211 INIT_LIST_HEAD(&req->r_unsafe_item);
212 req->r_flags = flags;
213
214 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
215
216 /* create reply message */
217 if (use_mempool)
218 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
219 else
220 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
221 OSD_OPREPLY_FRONT_LEN, gfp_flags);
222 if (!msg) {
223 ceph_osdc_put_request(req);
224 return NULL;
225 }
226 req->r_reply = msg;
227
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700228 /* allocate space for the trailing data */
229 if (needs_trail) {
230 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
231 if (!req->r_trail) {
232 ceph_osdc_put_request(req);
233 return NULL;
234 }
235 ceph_pagelist_init(req->r_trail);
236 }
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700237 /* create request message; allow space for oid */
238 msg_size += 40;
239 if (snapc)
240 msg_size += sizeof(u64) * snapc->num_snaps;
241 if (use_mempool)
242 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
243 else
244 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
245 if (!msg) {
246 ceph_osdc_put_request(req);
247 return NULL;
248 }
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700249
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700250 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
251 memset(msg->front.iov_base, 0, msg->front.iov_len);
252
253 req->r_request = msg;
254 req->r_pages = pages;
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700255#ifdef CONFIG_BLOCK
256 if (bio) {
257 req->r_bio = bio;
258 bio_get(req->r_bio);
259 }
260#endif
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700261
262 return req;
263}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -0700264EXPORT_SYMBOL(ceph_osdc_alloc_request);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700265
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700266static void osd_req_encode_op(struct ceph_osd_request *req,
267 struct ceph_osd_op *dst,
268 struct ceph_osd_req_op *src)
269{
270 dst->op = cpu_to_le16(src->op);
271
272 switch (dst->op) {
273 case CEPH_OSD_OP_READ:
274 case CEPH_OSD_OP_WRITE:
275 dst->extent.offset =
276 cpu_to_le64(src->extent.offset);
277 dst->extent.length =
278 cpu_to_le64(src->extent.length);
279 dst->extent.truncate_size =
280 cpu_to_le64(src->extent.truncate_size);
281 dst->extent.truncate_seq =
282 cpu_to_le32(src->extent.truncate_seq);
283 break;
284
285 case CEPH_OSD_OP_GETXATTR:
286 case CEPH_OSD_OP_SETXATTR:
287 case CEPH_OSD_OP_CMPXATTR:
288 BUG_ON(!req->r_trail);
289
290 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
291 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
292 dst->xattr.cmp_op = src->xattr.cmp_op;
293 dst->xattr.cmp_mode = src->xattr.cmp_mode;
294 ceph_pagelist_append(req->r_trail, src->xattr.name,
295 src->xattr.name_len);
296 ceph_pagelist_append(req->r_trail, src->xattr.val,
297 src->xattr.value_len);
298 break;
Yehuda Sadehae1533b2010-05-18 16:38:08 -0700299 case CEPH_OSD_OP_CALL:
300 BUG_ON(!req->r_trail);
301
302 dst->cls.class_len = src->cls.class_len;
303 dst->cls.method_len = src->cls.method_len;
304 dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
305
306 ceph_pagelist_append(req->r_trail, src->cls.class_name,
307 src->cls.class_len);
308 ceph_pagelist_append(req->r_trail, src->cls.method_name,
309 src->cls.method_len);
310 ceph_pagelist_append(req->r_trail, src->cls.indata,
311 src->cls.indata_len);
312 break;
313 case CEPH_OSD_OP_ROLLBACK:
314 dst->snap.snapid = cpu_to_le64(src->snap.snapid);
315 break;
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700316 case CEPH_OSD_OP_STARTSYNC:
317 break;
318 default:
319 pr_err("unrecognized osd opcode %d\n", dst->op);
320 WARN_ON(1);
321 break;
322 }
323 dst->payload_len = cpu_to_le32(src->payload_len);
324}
325
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700326/*
327 * build new request AND message
328 *
329 */
330void ceph_osdc_build_request(struct ceph_osd_request *req,
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700331 u64 off, u64 *plen,
332 struct ceph_osd_req_op *src_ops,
333 struct ceph_snap_context *snapc,
334 struct timespec *mtime,
335 const char *oid,
336 int oid_len)
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700337{
338 struct ceph_msg *msg = req->r_request;
339 struct ceph_osd_request_head *head;
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700340 struct ceph_osd_req_op *src_op;
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700341 struct ceph_osd_op *op;
342 void *p;
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700343 int num_op = get_num_ops(src_ops, NULL);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700344 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700345 int flags = req->r_flags;
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700346 u64 data_len = 0;
347 int i;
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700348
349 head = msg->front.iov_base;
350 op = (void *)(head + 1);
351 p = (void *)(op + num_op);
352
353 req->r_snapc = ceph_get_snap_context(snapc);
354
355 head->client_inc = cpu_to_le32(1); /* always, for now. */
356 head->flags = cpu_to_le32(flags);
357 if (flags & CEPH_OSD_FLAG_WRITE)
358 ceph_encode_timespec(&head->mtime, mtime);
359 head->num_ops = cpu_to_le16(num_op);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700360
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700361
362 /* fill in oid */
363 head->object_len = cpu_to_le32(oid_len);
364 memcpy(p, oid, oid_len);
365 p += oid_len;
366
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700367 src_op = src_ops;
368 while (src_op->op) {
369 osd_req_encode_op(req, op, src_op);
370 src_op++;
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700371 op++;
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700372 }
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700373
374 if (req->r_trail)
375 data_len += req->r_trail->length;
376
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700377 if (snapc) {
378 head->snap_seq = cpu_to_le64(snapc->seq);
379 head->num_snaps = cpu_to_le32(snapc->num_snaps);
380 for (i = 0; i < snapc->num_snaps; i++) {
381 put_unaligned_le64(snapc->snaps[i], p);
382 p += sizeof(u64);
383 }
384 }
385
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700386 if (flags & CEPH_OSD_FLAG_WRITE) {
387 req->r_request->hdr.data_off = cpu_to_le16(off);
388 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
389 } else if (data_len) {
390 req->r_request->hdr.data_off = 0;
391 req->r_request->hdr.data_len = cpu_to_le32(data_len);
392 }
393
Sage Weilc5c6b192010-11-09 12:40:00 -0800394 req->r_request->page_alignment = req->r_page_alignment;
395
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700396 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
397 msg_size = p - msg->front.iov_base;
398 msg->front.iov_len = msg_size;
399 msg->hdr.front_len = cpu_to_le32(msg_size);
400 return;
401}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -0700402EXPORT_SYMBOL(ceph_osdc_build_request);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700403
Sage Weilf24e9982009-10-06 11:31:10 -0700404/*
405 * build new request AND message, calculate layout, and adjust file
406 * extent as needed.
407 *
408 * if the file was recently truncated, we include information about its
409 * old and new size so that the object can be updated appropriately. (we
410 * avoid synchronously deleting truncated objects because it's slow.)
411 *
412 * if @do_sync, include a 'startsync' command so that the osd will flush
413 * data quickly.
414 */
415struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
416 struct ceph_file_layout *layout,
417 struct ceph_vino vino,
418 u64 off, u64 *plen,
419 int opcode, int flags,
420 struct ceph_snap_context *snapc,
421 int do_sync,
422 u32 truncate_seq,
423 u64 truncate_size,
424 struct timespec *mtime,
Sage Weilb7495fc2010-11-09 12:43:12 -0800425 bool use_mempool, int num_reply,
426 int page_align)
Sage Weilf24e9982009-10-06 11:31:10 -0700427{
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700428 struct ceph_osd_req_op ops[3];
429 struct ceph_osd_request *req;
430
431 ops[0].op = opcode;
432 ops[0].extent.truncate_seq = truncate_seq;
433 ops[0].extent.truncate_size = truncate_size;
434 ops[0].payload_len = 0;
435
436 if (do_sync) {
437 ops[1].op = CEPH_OSD_OP_STARTSYNC;
438 ops[1].payload_len = 0;
439 ops[2].op = 0;
440 } else
441 ops[1].op = 0;
442
443 req = ceph_osdc_alloc_request(osdc, flags,
444 snapc, ops,
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700445 use_mempool,
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700446 GFP_NOFS, NULL, NULL);
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700447 if (IS_ERR(req))
448 return req;
Sage Weilf24e9982009-10-06 11:31:10 -0700449
450 /* calculate max write size */
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700451 calc_layout(osdc, vino, layout, off, plen, req, ops);
Sage Weilf24e9982009-10-06 11:31:10 -0700452 req->r_file_layout = *layout; /* keep a copy */
453
Sage Weilb7495fc2010-11-09 12:43:12 -0800454 /* in case it differs from natural alignment that calc_layout
455 filled in for us */
456 req->r_page_alignment = page_align;
457
Yehuda Sadeh68b44762010-04-06 15:01:27 -0700458 ceph_osdc_build_request(req, off, plen, ops,
459 snapc,
Yehuda Sadeh3499e8a2010-04-06 14:51:47 -0700460 mtime,
461 req->r_oid, req->r_oid_len);
Sage Weilf24e9982009-10-06 11:31:10 -0700462
Sage Weilf24e9982009-10-06 11:31:10 -0700463 return req;
464}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -0700465EXPORT_SYMBOL(ceph_osdc_new_request);
Sage Weilf24e9982009-10-06 11:31:10 -0700466
467/*
468 * We keep osd requests in an rbtree, sorted by ->r_tid.
469 */
470static void __insert_request(struct ceph_osd_client *osdc,
471 struct ceph_osd_request *new)
472{
473 struct rb_node **p = &osdc->requests.rb_node;
474 struct rb_node *parent = NULL;
475 struct ceph_osd_request *req = NULL;
476
477 while (*p) {
478 parent = *p;
479 req = rb_entry(parent, struct ceph_osd_request, r_node);
480 if (new->r_tid < req->r_tid)
481 p = &(*p)->rb_left;
482 else if (new->r_tid > req->r_tid)
483 p = &(*p)->rb_right;
484 else
485 BUG();
486 }
487
488 rb_link_node(&new->r_node, parent, p);
489 rb_insert_color(&new->r_node, &osdc->requests);
490}
491
492static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
493 u64 tid)
494{
495 struct ceph_osd_request *req;
496 struct rb_node *n = osdc->requests.rb_node;
497
498 while (n) {
499 req = rb_entry(n, struct ceph_osd_request, r_node);
500 if (tid < req->r_tid)
501 n = n->rb_left;
502 else if (tid > req->r_tid)
503 n = n->rb_right;
504 else
505 return req;
506 }
507 return NULL;
508}
509
510static struct ceph_osd_request *
511__lookup_request_ge(struct ceph_osd_client *osdc,
512 u64 tid)
513{
514 struct ceph_osd_request *req;
515 struct rb_node *n = osdc->requests.rb_node;
516
517 while (n) {
518 req = rb_entry(n, struct ceph_osd_request, r_node);
519 if (tid < req->r_tid) {
520 if (!n->rb_left)
521 return req;
522 n = n->rb_left;
523 } else if (tid > req->r_tid) {
524 n = n->rb_right;
525 } else {
526 return req;
527 }
528 }
529 return NULL;
530}
531
532
533/*
Sage Weil81b024e2009-10-09 10:29:18 -0700534 * If the osd connection drops, we need to resubmit all requests.
Sage Weilf24e9982009-10-06 11:31:10 -0700535 */
536static void osd_reset(struct ceph_connection *con)
537{
538 struct ceph_osd *osd = con->private;
539 struct ceph_osd_client *osdc;
540
541 if (!osd)
542 return;
543 dout("osd_reset osd%d\n", osd->o_osd);
544 osdc = osd->o_osdc;
Sage Weilf24e9982009-10-06 11:31:10 -0700545 down_read(&osdc->map_sem);
546 kick_requests(osdc, osd);
547 up_read(&osdc->map_sem);
548}
549
550/*
551 * Track open sessions with osds.
552 */
553static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
554{
555 struct ceph_osd *osd;
556
557 osd = kzalloc(sizeof(*osd), GFP_NOFS);
558 if (!osd)
559 return NULL;
560
561 atomic_set(&osd->o_ref, 1);
562 osd->o_osdc = osdc;
563 INIT_LIST_HEAD(&osd->o_requests);
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800564 INIT_LIST_HEAD(&osd->o_osd_lru);
Sage Weilf24e9982009-10-06 11:31:10 -0700565 osd->o_incarnation = 1;
566
567 ceph_con_init(osdc->client->msgr, &osd->o_con);
568 osd->o_con.private = osd;
569 osd->o_con.ops = &osd_con_ops;
570 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
Sage Weil4e7a5dc2009-11-18 16:19:57 -0800571
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800572 INIT_LIST_HEAD(&osd->o_keepalive_item);
Sage Weilf24e9982009-10-06 11:31:10 -0700573 return osd;
574}
575
576static struct ceph_osd *get_osd(struct ceph_osd *osd)
577{
578 if (atomic_inc_not_zero(&osd->o_ref)) {
579 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
580 atomic_read(&osd->o_ref));
581 return osd;
582 } else {
583 dout("get_osd %p FAIL\n", osd);
584 return NULL;
585 }
586}
587
588static void put_osd(struct ceph_osd *osd)
589{
590 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
591 atomic_read(&osd->o_ref) - 1);
Sage Weil79494d12010-05-27 14:15:49 -0700592 if (atomic_dec_and_test(&osd->o_ref)) {
593 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
594
595 if (osd->o_authorizer)
596 ac->ops->destroy_authorizer(ac, osd->o_authorizer);
Sage Weilf24e9982009-10-06 11:31:10 -0700597 kfree(osd);
Sage Weil79494d12010-05-27 14:15:49 -0700598 }
Sage Weilf24e9982009-10-06 11:31:10 -0700599}
600
601/*
602 * remove an osd from our map
603 */
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800604static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
Sage Weilf24e9982009-10-06 11:31:10 -0700605{
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800606 dout("__remove_osd %p\n", osd);
Sage Weilf24e9982009-10-06 11:31:10 -0700607 BUG_ON(!list_empty(&osd->o_requests));
608 rb_erase(&osd->o_node, &osdc->osds);
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800609 list_del_init(&osd->o_osd_lru);
Sage Weilf24e9982009-10-06 11:31:10 -0700610 ceph_con_close(&osd->o_con);
611 put_osd(osd);
612}
613
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800614static void __move_osd_to_lru(struct ceph_osd_client *osdc,
615 struct ceph_osd *osd)
616{
617 dout("__move_osd_to_lru %p\n", osd);
618 BUG_ON(!list_empty(&osd->o_osd_lru));
619 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -0700620 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl * HZ;
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800621}
622
623static void __remove_osd_from_lru(struct ceph_osd *osd)
624{
625 dout("__remove_osd_from_lru %p\n", osd);
626 if (!list_empty(&osd->o_osd_lru))
627 list_del_init(&osd->o_osd_lru);
628}
629
630static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
631{
632 struct ceph_osd *osd, *nosd;
633
634 dout("__remove_old_osds %p\n", osdc);
635 mutex_lock(&osdc->request_mutex);
636 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
637 if (!remove_all && time_before(jiffies, osd->lru_ttl))
638 break;
639 __remove_osd(osdc, osd);
640 }
641 mutex_unlock(&osdc->request_mutex);
642}
643
Sage Weilf24e9982009-10-06 11:31:10 -0700644/*
645 * reset osd connect
646 */
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800647static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
Sage Weilf24e9982009-10-06 11:31:10 -0700648{
Sage Weil87b315a2010-03-22 14:51:18 -0700649 struct ceph_osd_request *req;
Sage Weilf24e9982009-10-06 11:31:10 -0700650 int ret = 0;
651
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800652 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
Sage Weilf24e9982009-10-06 11:31:10 -0700653 if (list_empty(&osd->o_requests)) {
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800654 __remove_osd(osdc, osd);
Sage Weil87b315a2010-03-22 14:51:18 -0700655 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
656 &osd->o_con.peer_addr,
657 sizeof(osd->o_con.peer_addr)) == 0 &&
658 !ceph_con_opened(&osd->o_con)) {
659 dout(" osd addr hasn't changed and connection never opened,"
660 " letting msgr retry");
661 /* touch each r_stamp for handle_timeout()'s benfit */
662 list_for_each_entry(req, &osd->o_requests, r_osd_item)
663 req->r_stamp = jiffies;
664 ret = -EAGAIN;
Sage Weilf24e9982009-10-06 11:31:10 -0700665 } else {
666 ceph_con_close(&osd->o_con);
667 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
668 osd->o_incarnation++;
669 }
670 return ret;
671}
672
673static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
674{
675 struct rb_node **p = &osdc->osds.rb_node;
676 struct rb_node *parent = NULL;
677 struct ceph_osd *osd = NULL;
678
679 while (*p) {
680 parent = *p;
681 osd = rb_entry(parent, struct ceph_osd, o_node);
682 if (new->o_osd < osd->o_osd)
683 p = &(*p)->rb_left;
684 else if (new->o_osd > osd->o_osd)
685 p = &(*p)->rb_right;
686 else
687 BUG();
688 }
689
690 rb_link_node(&new->o_node, parent, p);
691 rb_insert_color(&new->o_node, &osdc->osds);
692}
693
694static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
695{
696 struct ceph_osd *osd;
697 struct rb_node *n = osdc->osds.rb_node;
698
699 while (n) {
700 osd = rb_entry(n, struct ceph_osd, o_node);
701 if (o < osd->o_osd)
702 n = n->rb_left;
703 else if (o > osd->o_osd)
704 n = n->rb_right;
705 else
706 return osd;
707 }
708 return NULL;
709}
710
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800711static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
712{
713 schedule_delayed_work(&osdc->timeout_work,
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -0700714 osdc->client->options->osd_keepalive_timeout * HZ);
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800715}
716
717static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
718{
719 cancel_delayed_work(&osdc->timeout_work);
720}
Sage Weilf24e9982009-10-06 11:31:10 -0700721
722/*
723 * Register request, assign tid. If this is the first request, set up
724 * the timeout event.
725 */
726static void register_request(struct ceph_osd_client *osdc,
727 struct ceph_osd_request *req)
728{
Sage Weilf24e9982009-10-06 11:31:10 -0700729 mutex_lock(&osdc->request_mutex);
730 req->r_tid = ++osdc->last_tid;
Sage Weil6df058c2009-12-22 11:24:33 -0800731 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800732 INIT_LIST_HEAD(&req->r_req_lru_item);
Sage Weilf24e9982009-10-06 11:31:10 -0700733
734 dout("register_request %p tid %lld\n", req, req->r_tid);
735 __insert_request(osdc, req);
736 ceph_osdc_get_request(req);
737 osdc->num_requests++;
738
Sage Weilf24e9982009-10-06 11:31:10 -0700739 if (osdc->num_requests == 1) {
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800740 dout(" first request, scheduling timeout\n");
741 __schedule_osd_timeout(osdc);
Sage Weilf24e9982009-10-06 11:31:10 -0700742 }
743 mutex_unlock(&osdc->request_mutex);
744}
745
746/*
747 * called under osdc->request_mutex
748 */
749static void __unregister_request(struct ceph_osd_client *osdc,
750 struct ceph_osd_request *req)
751{
752 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
753 rb_erase(&req->r_node, &osdc->requests);
754 osdc->num_requests--;
755
Sage Weil0ba64782009-10-08 16:57:16 -0700756 if (req->r_osd) {
757 /* make sure the original request isn't in flight. */
758 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
759
760 list_del_init(&req->r_osd_item);
761 if (list_empty(&req->r_osd->o_requests))
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800762 __move_osd_to_lru(osdc, req->r_osd);
Sage Weil0ba64782009-10-08 16:57:16 -0700763 req->r_osd = NULL;
764 }
Sage Weilf24e9982009-10-06 11:31:10 -0700765
766 ceph_osdc_put_request(req);
767
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800768 list_del_init(&req->r_req_lru_item);
769 if (osdc->num_requests == 0) {
770 dout(" no requests, canceling timeout\n");
771 __cancel_osd_timeout(osdc);
Sage Weilf24e9982009-10-06 11:31:10 -0700772 }
773}
774
775/*
776 * Cancel a previously queued request message
777 */
778static void __cancel_request(struct ceph_osd_request *req)
779{
Sage Weil6bc18872010-09-27 10:18:52 -0700780 if (req->r_sent && req->r_osd) {
Sage Weilf24e9982009-10-06 11:31:10 -0700781 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
782 req->r_sent = 0;
783 }
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800784 list_del_init(&req->r_req_lru_item);
Sage Weilf24e9982009-10-06 11:31:10 -0700785}
786
787/*
788 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
789 * (as needed), and set the request r_osd appropriately. If there is
790 * no up osd, set r_osd to NULL.
791 *
792 * Return 0 if unchanged, 1 if changed, or negative on error.
793 *
794 * Caller should hold map_sem for read and request_mutex.
795 */
796static int __map_osds(struct ceph_osd_client *osdc,
797 struct ceph_osd_request *req)
798{
799 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
Sage Weil51042122009-11-04 11:39:12 -0800800 struct ceph_pg pgid;
Sage Weild85b7052010-05-10 10:24:48 -0700801 int acting[CEPH_PG_MAX_SIZE];
802 int o = -1, num = 0;
Sage Weilf24e9982009-10-06 11:31:10 -0700803 int err;
Sage Weilf24e9982009-10-06 11:31:10 -0700804
805 dout("map_osds %p tid %lld\n", req, req->r_tid);
806 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
807 &req->r_file_layout, osdc->osdmap);
808 if (err)
809 return err;
Sage Weil51042122009-11-04 11:39:12 -0800810 pgid = reqhead->layout.ol_pgid;
Sage Weil7740a422010-01-08 15:58:25 -0800811 req->r_pgid = pgid;
812
Sage Weild85b7052010-05-10 10:24:48 -0700813 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
814 if (err > 0) {
815 o = acting[0];
816 num = err;
817 }
Sage Weilf24e9982009-10-06 11:31:10 -0700818
819 if ((req->r_osd && req->r_osd->o_osd == o &&
Sage Weild85b7052010-05-10 10:24:48 -0700820 req->r_sent >= req->r_osd->o_incarnation &&
821 req->r_num_pg_osds == num &&
822 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
Sage Weilf24e9982009-10-06 11:31:10 -0700823 (req->r_osd == NULL && o == -1))
824 return 0; /* no change */
825
Sage Weil51042122009-11-04 11:39:12 -0800826 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
827 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
Sage Weilf24e9982009-10-06 11:31:10 -0700828 req->r_osd ? req->r_osd->o_osd : -1);
829
Sage Weild85b7052010-05-10 10:24:48 -0700830 /* record full pg acting set */
831 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
832 req->r_num_pg_osds = num;
833
Sage Weilf24e9982009-10-06 11:31:10 -0700834 if (req->r_osd) {
835 __cancel_request(req);
836 list_del_init(&req->r_osd_item);
Sage Weilf24e9982009-10-06 11:31:10 -0700837 req->r_osd = NULL;
838 }
839
840 req->r_osd = __lookup_osd(osdc, o);
841 if (!req->r_osd && o >= 0) {
Sage Weilc99eb1c2010-02-26 09:37:33 -0800842 err = -ENOMEM;
843 req->r_osd = create_osd(osdc);
844 if (!req->r_osd)
845 goto out;
Sage Weilf24e9982009-10-06 11:31:10 -0700846
847 dout("map_osds osd %p is osd%d\n", req->r_osd, o);
848 req->r_osd->o_osd = o;
849 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
850 __insert_osd(osdc, req->r_osd);
851
852 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
853 }
854
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800855 if (req->r_osd) {
856 __remove_osd_from_lru(req->r_osd);
Sage Weilf24e9982009-10-06 11:31:10 -0700857 list_add(&req->r_osd_item, &req->r_osd->o_requests);
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800858 }
Sage Weild85b7052010-05-10 10:24:48 -0700859 err = 1; /* osd or pg changed */
Sage Weilf24e9982009-10-06 11:31:10 -0700860
861out:
Sage Weilf24e9982009-10-06 11:31:10 -0700862 return err;
863}
864
865/*
866 * caller should hold map_sem (for read) and request_mutex
867 */
868static int __send_request(struct ceph_osd_client *osdc,
869 struct ceph_osd_request *req)
870{
871 struct ceph_osd_request_head *reqhead;
872 int err;
873
874 err = __map_osds(osdc, req);
875 if (err < 0)
876 return err;
877 if (req->r_osd == NULL) {
878 dout("send_request %p no up osds in pg\n", req);
879 ceph_monc_request_next_osdmap(&osdc->client->monc);
880 return 0;
881 }
882
883 dout("send_request %p tid %llu to osd%d flags %d\n",
884 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
885
886 reqhead = req->r_request->front.iov_base;
887 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
888 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
889 reqhead->reassert_version = req->r_reassert_version;
890
Sage Weil3dd72fc2010-03-22 14:42:30 -0700891 req->r_stamp = jiffies;
Henry C Chang07a27e22010-08-22 21:34:27 -0700892 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
Sage Weilf24e9982009-10-06 11:31:10 -0700893
894 ceph_msg_get(req->r_request); /* send consumes a ref */
895 ceph_con_send(&req->r_osd->o_con, req->r_request);
896 req->r_sent = req->r_osd->o_incarnation;
897 return 0;
898}
899
900/*
901 * Timeout callback, called every N seconds when 1 or more osd
902 * requests has been active for more than N seconds. When this
903 * happens, we ping all OSDs with requests who have timed out to
904 * ensure any communications channel reset is detected. Reset the
905 * request timeouts another N seconds in the future as we go.
906 * Reschedule the timeout event another N seconds in future (unless
907 * there are no open requests).
908 */
909static void handle_timeout(struct work_struct *work)
910{
911 struct ceph_osd_client *osdc =
912 container_of(work, struct ceph_osd_client, timeout_work.work);
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800913 struct ceph_osd_request *req, *last_req = NULL;
Sage Weilf24e9982009-10-06 11:31:10 -0700914 struct ceph_osd *osd;
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -0700915 unsigned long timeout = osdc->client->options->osd_timeout * HZ;
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800916 unsigned long keepalive =
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -0700917 osdc->client->options->osd_keepalive_timeout * HZ;
Sage Weil3dd72fc2010-03-22 14:42:30 -0700918 unsigned long last_stamp = 0;
Sage Weilf24e9982009-10-06 11:31:10 -0700919 struct rb_node *p;
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800920 struct list_head slow_osds;
Sage Weilf24e9982009-10-06 11:31:10 -0700921
922 dout("timeout\n");
923 down_read(&osdc->map_sem);
924
925 ceph_monc_request_next_osdmap(&osdc->client->monc);
926
927 mutex_lock(&osdc->request_mutex);
928 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
929 req = rb_entry(p, struct ceph_osd_request, r_node);
930
931 if (req->r_resend) {
932 int err;
933
934 dout("osdc resending prev failed %lld\n", req->r_tid);
935 err = __send_request(osdc, req);
936 if (err)
937 dout("osdc failed again on %lld\n", req->r_tid);
938 else
939 req->r_resend = false;
940 continue;
941 }
942 }
Sage Weilf24e9982009-10-06 11:31:10 -0700943
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800944 /*
945 * reset osds that appear to be _really_ unresponsive. this
946 * is a failsafe measure.. we really shouldn't be getting to
947 * this point if the system is working properly. the monitors
948 * should mark the osd as failed and we should find out about
949 * it from an updated osd map.
950 */
Sage Weilf26e6812010-04-21 11:09:38 -0700951 while (timeout && !list_empty(&osdc->req_lru)) {
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800952 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
953 r_req_lru_item);
954
Sage Weil3dd72fc2010-03-22 14:42:30 -0700955 if (time_before(jiffies, req->r_stamp + timeout))
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800956 break;
957
Sage Weil3dd72fc2010-03-22 14:42:30 -0700958 BUG_ON(req == last_req && req->r_stamp == last_stamp);
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800959 last_req = req;
Sage Weil3dd72fc2010-03-22 14:42:30 -0700960 last_stamp = req->r_stamp;
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800961
962 osd = req->r_osd;
963 BUG_ON(!osd);
964 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
965 req->r_tid, osd->o_osd);
966 __kick_requests(osdc, osd);
967 }
968
969 /*
970 * ping osds that are a bit slow. this ensures that if there
971 * is a break in the TCP connection we will notice, and reopen
972 * a connection with that osd (from the fault callback).
973 */
974 INIT_LIST_HEAD(&slow_osds);
975 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
Sage Weil3dd72fc2010-03-22 14:42:30 -0700976 if (time_before(jiffies, req->r_stamp + keepalive))
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800977 break;
978
979 osd = req->r_osd;
980 BUG_ON(!osd);
981 dout(" tid %llu is slow, will send keepalive on osd%d\n",
Sage Weilf24e9982009-10-06 11:31:10 -0700982 req->r_tid, osd->o_osd);
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800983 list_move_tail(&osd->o_keepalive_item, &slow_osds);
984 }
985 while (!list_empty(&slow_osds)) {
986 osd = list_entry(slow_osds.next, struct ceph_osd,
987 o_keepalive_item);
988 list_del_init(&osd->o_keepalive_item);
Sage Weilf24e9982009-10-06 11:31:10 -0700989 ceph_con_keepalive(&osd->o_con);
990 }
991
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -0800992 __schedule_osd_timeout(osdc);
Sage Weilf24e9982009-10-06 11:31:10 -0700993 mutex_unlock(&osdc->request_mutex);
994
995 up_read(&osdc->map_sem);
996}
997
Yehuda Sadehf5a20412010-02-03 11:00:26 -0800998static void handle_osds_timeout(struct work_struct *work)
999{
1000 struct ceph_osd_client *osdc =
1001 container_of(work, struct ceph_osd_client,
1002 osds_timeout_work.work);
1003 unsigned long delay =
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001004 osdc->client->options->osd_idle_ttl * HZ >> 2;
Yehuda Sadehf5a20412010-02-03 11:00:26 -08001005
1006 dout("osds timeout\n");
1007 down_read(&osdc->map_sem);
1008 remove_old_osds(osdc, 0);
1009 up_read(&osdc->map_sem);
1010
1011 schedule_delayed_work(&osdc->osds_timeout_work,
1012 round_jiffies_relative(delay));
1013}
1014
Sage Weilf24e9982009-10-06 11:31:10 -07001015/*
1016 * handle osd op reply. either call the callback if it is specified,
1017 * or do the completion to wake up the waiting thread.
1018 */
Sage Weil350b1c32009-12-22 10:45:45 -08001019static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
1020 struct ceph_connection *con)
Sage Weilf24e9982009-10-06 11:31:10 -07001021{
1022 struct ceph_osd_reply_head *rhead = msg->front.iov_base;
1023 struct ceph_osd_request *req;
1024 u64 tid;
1025 int numops, object_len, flags;
Sage Weil0ceed5d2010-05-11 09:53:18 -07001026 s32 result;
Sage Weilf24e9982009-10-06 11:31:10 -07001027
Sage Weil6df058c2009-12-22 11:24:33 -08001028 tid = le64_to_cpu(msg->hdr.tid);
Sage Weilf24e9982009-10-06 11:31:10 -07001029 if (msg->front.iov_len < sizeof(*rhead))
1030 goto bad;
Sage Weilf24e9982009-10-06 11:31:10 -07001031 numops = le32_to_cpu(rhead->num_ops);
1032 object_len = le32_to_cpu(rhead->object_len);
Sage Weil0ceed5d2010-05-11 09:53:18 -07001033 result = le32_to_cpu(rhead->result);
Sage Weilf24e9982009-10-06 11:31:10 -07001034 if (msg->front.iov_len != sizeof(*rhead) + object_len +
1035 numops * sizeof(struct ceph_osd_op))
1036 goto bad;
Sage Weil0ceed5d2010-05-11 09:53:18 -07001037 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
Sage Weilf24e9982009-10-06 11:31:10 -07001038
1039 /* lookup */
1040 mutex_lock(&osdc->request_mutex);
1041 req = __lookup_request(osdc, tid);
1042 if (req == NULL) {
1043 dout("handle_reply tid %llu dne\n", tid);
1044 mutex_unlock(&osdc->request_mutex);
1045 return;
1046 }
1047 ceph_osdc_get_request(req);
1048 flags = le32_to_cpu(rhead->flags);
1049
Sage Weil350b1c32009-12-22 10:45:45 -08001050 /*
Yehuda Sadeh0d59ab82010-01-13 17:03:23 -08001051 * if this connection filled our message, drop our reference now, to
Sage Weil350b1c32009-12-22 10:45:45 -08001052 * avoid a (safe but slower) revoke later.
1053 */
Yehuda Sadeh0d59ab82010-01-13 17:03:23 -08001054 if (req->r_con_filling_msg == con && req->r_reply == msg) {
Sage Weilc16e7862010-03-01 13:02:00 -08001055 dout(" dropping con_filling_msg ref %p\n", con);
Yehuda Sadeh0d59ab82010-01-13 17:03:23 -08001056 req->r_con_filling_msg = NULL;
Sage Weil350b1c32009-12-22 10:45:45 -08001057 ceph_con_put(con);
1058 }
1059
Sage Weilf24e9982009-10-06 11:31:10 -07001060 if (!req->r_got_reply) {
1061 unsigned bytes;
1062
1063 req->r_result = le32_to_cpu(rhead->result);
1064 bytes = le32_to_cpu(msg->hdr.data_len);
1065 dout("handle_reply result %d bytes %d\n", req->r_result,
1066 bytes);
1067 if (req->r_result == 0)
1068 req->r_result = bytes;
1069
1070 /* in case this is a write and we need to replay, */
1071 req->r_reassert_version = rhead->reassert_version;
1072
1073 req->r_got_reply = 1;
1074 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
1075 dout("handle_reply tid %llu dup ack\n", tid);
Sage Weil34b43a52009-12-01 12:23:54 -08001076 mutex_unlock(&osdc->request_mutex);
Sage Weilf24e9982009-10-06 11:31:10 -07001077 goto done;
1078 }
1079
1080 dout("handle_reply tid %llu flags %d\n", tid, flags);
1081
1082 /* either this is a read, or we got the safe response */
Sage Weil0ceed5d2010-05-11 09:53:18 -07001083 if (result < 0 ||
1084 (flags & CEPH_OSD_FLAG_ONDISK) ||
Sage Weilf24e9982009-10-06 11:31:10 -07001085 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
1086 __unregister_request(osdc, req);
1087
1088 mutex_unlock(&osdc->request_mutex);
1089
1090 if (req->r_callback)
1091 req->r_callback(req, msg);
1092 else
Yehuda Sadeh03066f22010-07-27 13:11:08 -07001093 complete_all(&req->r_completion);
Sage Weilf24e9982009-10-06 11:31:10 -07001094
1095 if (flags & CEPH_OSD_FLAG_ONDISK) {
1096 if (req->r_safe_callback)
1097 req->r_safe_callback(req, msg);
Yehuda Sadeh03066f22010-07-27 13:11:08 -07001098 complete_all(&req->r_safe_completion); /* fsync waiter */
Sage Weilf24e9982009-10-06 11:31:10 -07001099 }
1100
1101done:
1102 ceph_osdc_put_request(req);
1103 return;
1104
1105bad:
1106 pr_err("corrupt osd_op_reply got %d %d expected %d\n",
1107 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
1108 (int)sizeof(*rhead));
Sage Weil9ec7cab2009-12-14 15:13:47 -08001109 ceph_msg_dump(msg);
Sage Weilf24e9982009-10-06 11:31:10 -07001110}
1111
1112
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -08001113static int __kick_requests(struct ceph_osd_client *osdc,
Sage Weilf24e9982009-10-06 11:31:10 -07001114 struct ceph_osd *kickosd)
1115{
1116 struct ceph_osd_request *req;
1117 struct rb_node *p, *n;
1118 int needmap = 0;
1119 int err;
1120
1121 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
Sage Weil153a0082010-02-15 12:11:51 -08001122 if (kickosd) {
Sage Weil87b315a2010-03-22 14:51:18 -07001123 err = __reset_osd(osdc, kickosd);
1124 if (err == -EAGAIN)
1125 return 1;
Sage Weil153a0082010-02-15 12:11:51 -08001126 } else {
Sage Weilf24e9982009-10-06 11:31:10 -07001127 for (p = rb_first(&osdc->osds); p; p = n) {
1128 struct ceph_osd *osd =
1129 rb_entry(p, struct ceph_osd, o_node);
1130
1131 n = rb_next(p);
1132 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
Sage Weil103e2d32010-01-07 16:12:36 -08001133 memcmp(&osd->o_con.peer_addr,
1134 ceph_osd_addr(osdc->osdmap,
1135 osd->o_osd),
1136 sizeof(struct ceph_entity_addr)) != 0)
Yehuda Sadehf5a20412010-02-03 11:00:26 -08001137 __reset_osd(osdc, osd);
Sage Weilf24e9982009-10-06 11:31:10 -07001138 }
1139 }
1140
1141 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
1142 req = rb_entry(p, struct ceph_osd_request, r_node);
1143
1144 if (req->r_resend) {
1145 dout(" r_resend set on tid %llu\n", req->r_tid);
Sage Weil266673d2009-10-09 10:31:32 -07001146 __cancel_request(req);
Sage Weilf24e9982009-10-06 11:31:10 -07001147 goto kick;
1148 }
Sage Weil266673d2009-10-09 10:31:32 -07001149 if (req->r_osd && kickosd == req->r_osd) {
1150 __cancel_request(req);
Sage Weilf24e9982009-10-06 11:31:10 -07001151 goto kick;
Sage Weil266673d2009-10-09 10:31:32 -07001152 }
Sage Weilf24e9982009-10-06 11:31:10 -07001153
1154 err = __map_osds(osdc, req);
1155 if (err == 0)
1156 continue; /* no change */
1157 if (err < 0) {
1158 /*
1159 * FIXME: really, we should set the request
1160 * error and fail if this isn't a 'nofail'
1161 * request, but that's a fair bit more
1162 * complicated to do. So retry!
1163 */
1164 dout(" setting r_resend on %llu\n", req->r_tid);
1165 req->r_resend = true;
1166 continue;
1167 }
1168 if (req->r_osd == NULL) {
1169 dout("tid %llu maps to no valid osd\n", req->r_tid);
1170 needmap++; /* request a newer map */
1171 continue;
1172 }
1173
1174kick:
Sage Weilc1ea8822009-10-08 16:55:47 -07001175 dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
Sage Weil12eadc12010-03-15 22:20:39 -07001176 req->r_osd ? req->r_osd->o_osd : -1);
Sage Weilf24e9982009-10-06 11:31:10 -07001177 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1178 err = __send_request(osdc, req);
1179 if (err) {
1180 dout(" setting r_resend on %llu\n", req->r_tid);
1181 req->r_resend = true;
1182 }
1183 }
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -08001184
1185 return needmap;
1186}
1187
1188/*
1189 * Resubmit osd requests whose osd or osd address has changed. Request
1190 * a new osd map if osds are down, or we are otherwise unable to determine
1191 * how to direct a request.
1192 *
1193 * Close connections to down osds.
1194 *
1195 * If @who is specified, resubmit requests for that specific osd.
1196 *
1197 * Caller should hold map_sem for read and request_mutex.
1198 */
1199static void kick_requests(struct ceph_osd_client *osdc,
1200 struct ceph_osd *kickosd)
1201{
1202 int needmap;
1203
1204 mutex_lock(&osdc->request_mutex);
1205 needmap = __kick_requests(osdc, kickosd);
Sage Weilf24e9982009-10-06 11:31:10 -07001206 mutex_unlock(&osdc->request_mutex);
1207
1208 if (needmap) {
1209 dout("%d requests for down osds, need new map\n", needmap);
1210 ceph_monc_request_next_osdmap(&osdc->client->monc);
1211 }
Sage Weilf24e9982009-10-06 11:31:10 -07001212
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -08001213}
Sage Weilf24e9982009-10-06 11:31:10 -07001214/*
1215 * Process updated osd map.
1216 *
1217 * The message contains any number of incremental and full maps, normally
1218 * indicating some sort of topology change in the cluster. Kick requests
1219 * off to different OSDs as needed.
1220 */
1221void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1222{
1223 void *p, *end, *next;
1224 u32 nr_maps, maplen;
1225 u32 epoch;
1226 struct ceph_osdmap *newmap = NULL, *oldmap;
1227 int err;
1228 struct ceph_fsid fsid;
1229
1230 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1231 p = msg->front.iov_base;
1232 end = p + msg->front.iov_len;
1233
1234 /* verify fsid */
1235 ceph_decode_need(&p, end, sizeof(fsid), bad);
1236 ceph_decode_copy(&p, &fsid, sizeof(fsid));
Sage Weil07433042009-11-18 16:50:41 -08001237 if (ceph_check_fsid(osdc->client, &fsid) < 0)
1238 return;
Sage Weilf24e9982009-10-06 11:31:10 -07001239
1240 down_write(&osdc->map_sem);
1241
1242 /* incremental maps */
1243 ceph_decode_32_safe(&p, end, nr_maps, bad);
1244 dout(" %d inc maps\n", nr_maps);
1245 while (nr_maps > 0) {
1246 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
Sage Weilc89136e2009-10-14 09:59:09 -07001247 epoch = ceph_decode_32(&p);
1248 maplen = ceph_decode_32(&p);
Sage Weilf24e9982009-10-06 11:31:10 -07001249 ceph_decode_need(&p, end, maplen, bad);
1250 next = p + maplen;
1251 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1252 dout("applying incremental map %u len %d\n",
1253 epoch, maplen);
1254 newmap = osdmap_apply_incremental(&p, next,
1255 osdc->osdmap,
1256 osdc->client->msgr);
1257 if (IS_ERR(newmap)) {
1258 err = PTR_ERR(newmap);
1259 goto bad;
1260 }
Sage Weil30dc6382009-12-21 14:49:37 -08001261 BUG_ON(!newmap);
Sage Weilf24e9982009-10-06 11:31:10 -07001262 if (newmap != osdc->osdmap) {
1263 ceph_osdmap_destroy(osdc->osdmap);
1264 osdc->osdmap = newmap;
1265 }
1266 } else {
1267 dout("ignoring incremental map %u len %d\n",
1268 epoch, maplen);
1269 }
1270 p = next;
1271 nr_maps--;
1272 }
1273 if (newmap)
1274 goto done;
1275
1276 /* full maps */
1277 ceph_decode_32_safe(&p, end, nr_maps, bad);
1278 dout(" %d full maps\n", nr_maps);
1279 while (nr_maps) {
1280 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
Sage Weilc89136e2009-10-14 09:59:09 -07001281 epoch = ceph_decode_32(&p);
1282 maplen = ceph_decode_32(&p);
Sage Weilf24e9982009-10-06 11:31:10 -07001283 ceph_decode_need(&p, end, maplen, bad);
1284 if (nr_maps > 1) {
1285 dout("skipping non-latest full map %u len %d\n",
1286 epoch, maplen);
1287 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1288 dout("skipping full map %u len %d, "
1289 "older than our %u\n", epoch, maplen,
1290 osdc->osdmap->epoch);
1291 } else {
1292 dout("taking full map %u len %d\n", epoch, maplen);
1293 newmap = osdmap_decode(&p, p+maplen);
1294 if (IS_ERR(newmap)) {
1295 err = PTR_ERR(newmap);
1296 goto bad;
1297 }
Sage Weil30dc6382009-12-21 14:49:37 -08001298 BUG_ON(!newmap);
Sage Weilf24e9982009-10-06 11:31:10 -07001299 oldmap = osdc->osdmap;
1300 osdc->osdmap = newmap;
1301 if (oldmap)
1302 ceph_osdmap_destroy(oldmap);
1303 }
1304 p += maplen;
1305 nr_maps--;
1306 }
1307
1308done:
1309 downgrade_write(&osdc->map_sem);
1310 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1311 if (newmap)
1312 kick_requests(osdc, NULL);
1313 up_read(&osdc->map_sem);
Yehuda Sadeh03066f22010-07-27 13:11:08 -07001314 wake_up_all(&osdc->client->auth_wq);
Sage Weilf24e9982009-10-06 11:31:10 -07001315 return;
1316
1317bad:
1318 pr_err("osdc handle_map corrupt msg\n");
Sage Weil9ec7cab2009-12-14 15:13:47 -08001319 ceph_msg_dump(msg);
Sage Weilf24e9982009-10-06 11:31:10 -07001320 up_write(&osdc->map_sem);
1321 return;
1322}
1323
Sage Weilf24e9982009-10-06 11:31:10 -07001324/*
1325 * Register request, send initial attempt.
1326 */
1327int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1328 struct ceph_osd_request *req,
1329 bool nofail)
1330{
Sage Weilc1ea8822009-10-08 16:55:47 -07001331 int rc = 0;
Sage Weilf24e9982009-10-06 11:31:10 -07001332
1333 req->r_request->pages = req->r_pages;
1334 req->r_request->nr_pages = req->r_num_pages;
Yehuda Sadeh68b44762010-04-06 15:01:27 -07001335#ifdef CONFIG_BLOCK
1336 req->r_request->bio = req->r_bio;
1337#endif
1338 req->r_request->trail = req->r_trail;
Sage Weilf24e9982009-10-06 11:31:10 -07001339
1340 register_request(osdc, req);
1341
1342 down_read(&osdc->map_sem);
1343 mutex_lock(&osdc->request_mutex);
Sage Weilc1ea8822009-10-08 16:55:47 -07001344 /*
1345 * a racing kick_requests() may have sent the message for us
1346 * while we dropped request_mutex above, so only send now if
1347 * the request still han't been touched yet.
1348 */
1349 if (req->r_sent == 0) {
1350 rc = __send_request(osdc, req);
1351 if (rc) {
1352 if (nofail) {
1353 dout("osdc_start_request failed send, "
1354 " marking %lld\n", req->r_tid);
1355 req->r_resend = true;
1356 rc = 0;
1357 } else {
1358 __unregister_request(osdc, req);
1359 }
Sage Weilf24e9982009-10-06 11:31:10 -07001360 }
1361 }
1362 mutex_unlock(&osdc->request_mutex);
1363 up_read(&osdc->map_sem);
1364 return rc;
1365}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001366EXPORT_SYMBOL(ceph_osdc_start_request);
Sage Weilf24e9982009-10-06 11:31:10 -07001367
1368/*
1369 * wait for a request to complete
1370 */
1371int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1372 struct ceph_osd_request *req)
1373{
1374 int rc;
1375
1376 rc = wait_for_completion_interruptible(&req->r_completion);
1377 if (rc < 0) {
1378 mutex_lock(&osdc->request_mutex);
1379 __cancel_request(req);
Sage Weil529cfcc2009-12-22 10:29:39 -08001380 __unregister_request(osdc, req);
Sage Weilf24e9982009-10-06 11:31:10 -07001381 mutex_unlock(&osdc->request_mutex);
Sage Weil529cfcc2009-12-22 10:29:39 -08001382 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
Sage Weilf24e9982009-10-06 11:31:10 -07001383 return rc;
1384 }
1385
1386 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1387 return req->r_result;
1388}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001389EXPORT_SYMBOL(ceph_osdc_wait_request);
Sage Weilf24e9982009-10-06 11:31:10 -07001390
1391/*
1392 * sync - wait for all in-flight requests to flush. avoid starvation.
1393 */
1394void ceph_osdc_sync(struct ceph_osd_client *osdc)
1395{
1396 struct ceph_osd_request *req;
1397 u64 last_tid, next_tid = 0;
1398
1399 mutex_lock(&osdc->request_mutex);
1400 last_tid = osdc->last_tid;
1401 while (1) {
1402 req = __lookup_request_ge(osdc, next_tid);
1403 if (!req)
1404 break;
1405 if (req->r_tid > last_tid)
1406 break;
1407
1408 next_tid = req->r_tid + 1;
1409 if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
1410 continue;
1411
1412 ceph_osdc_get_request(req);
1413 mutex_unlock(&osdc->request_mutex);
1414 dout("sync waiting on tid %llu (last is %llu)\n",
1415 req->r_tid, last_tid);
1416 wait_for_completion(&req->r_safe_completion);
1417 mutex_lock(&osdc->request_mutex);
1418 ceph_osdc_put_request(req);
1419 }
1420 mutex_unlock(&osdc->request_mutex);
1421 dout("sync done (thru tid %llu)\n", last_tid);
1422}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001423EXPORT_SYMBOL(ceph_osdc_sync);
Sage Weilf24e9982009-10-06 11:31:10 -07001424
1425/*
1426 * init, shutdown
1427 */
1428int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
1429{
1430 int err;
1431
1432 dout("init\n");
1433 osdc->client = client;
1434 osdc->osdmap = NULL;
1435 init_rwsem(&osdc->map_sem);
1436 init_completion(&osdc->map_waiters);
1437 osdc->last_requested_map = 0;
1438 mutex_init(&osdc->request_mutex);
Sage Weilf24e9982009-10-06 11:31:10 -07001439 osdc->last_tid = 0;
1440 osdc->osds = RB_ROOT;
Yehuda Sadehf5a20412010-02-03 11:00:26 -08001441 INIT_LIST_HEAD(&osdc->osd_lru);
Sage Weilf24e9982009-10-06 11:31:10 -07001442 osdc->requests = RB_ROOT;
Yehuda Sadeh422d2cb2010-02-26 15:32:31 -08001443 INIT_LIST_HEAD(&osdc->req_lru);
Sage Weilf24e9982009-10-06 11:31:10 -07001444 osdc->num_requests = 0;
1445 INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
Yehuda Sadehf5a20412010-02-03 11:00:26 -08001446 INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
1447
1448 schedule_delayed_work(&osdc->osds_timeout_work,
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001449 round_jiffies_relative(osdc->client->options->osd_idle_ttl * HZ));
Sage Weilf24e9982009-10-06 11:31:10 -07001450
Sage Weil5f44f142009-11-18 14:52:18 -08001451 err = -ENOMEM;
Sage Weilf24e9982009-10-06 11:31:10 -07001452 osdc->req_mempool = mempool_create_kmalloc_pool(10,
1453 sizeof(struct ceph_osd_request));
1454 if (!osdc->req_mempool)
Sage Weil5f44f142009-11-18 14:52:18 -08001455 goto out;
Sage Weilf24e9982009-10-06 11:31:10 -07001456
Sage Weil4f482802010-04-24 09:56:35 -07001457 err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
1458 "osd_op");
Sage Weilf24e9982009-10-06 11:31:10 -07001459 if (err < 0)
Sage Weil5f44f142009-11-18 14:52:18 -08001460 goto out_mempool;
Sage Weilc16e7862010-03-01 13:02:00 -08001461 err = ceph_msgpool_init(&osdc->msgpool_op_reply,
Sage Weil4f482802010-04-24 09:56:35 -07001462 OSD_OPREPLY_FRONT_LEN, 10, true,
1463 "osd_op_reply");
Sage Weilc16e7862010-03-01 13:02:00 -08001464 if (err < 0)
1465 goto out_msgpool;
Sage Weilf24e9982009-10-06 11:31:10 -07001466 return 0;
Sage Weil5f44f142009-11-18 14:52:18 -08001467
Sage Weilc16e7862010-03-01 13:02:00 -08001468out_msgpool:
1469 ceph_msgpool_destroy(&osdc->msgpool_op);
Sage Weil5f44f142009-11-18 14:52:18 -08001470out_mempool:
1471 mempool_destroy(osdc->req_mempool);
1472out:
1473 return err;
Sage Weilf24e9982009-10-06 11:31:10 -07001474}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001475EXPORT_SYMBOL(ceph_osdc_init);
Sage Weilf24e9982009-10-06 11:31:10 -07001476
1477void ceph_osdc_stop(struct ceph_osd_client *osdc)
1478{
1479 cancel_delayed_work_sync(&osdc->timeout_work);
Yehuda Sadehf5a20412010-02-03 11:00:26 -08001480 cancel_delayed_work_sync(&osdc->osds_timeout_work);
Sage Weilf24e9982009-10-06 11:31:10 -07001481 if (osdc->osdmap) {
1482 ceph_osdmap_destroy(osdc->osdmap);
1483 osdc->osdmap = NULL;
1484 }
Yehuda Sadehf5a20412010-02-03 11:00:26 -08001485 remove_old_osds(osdc, 1);
Sage Weilf24e9982009-10-06 11:31:10 -07001486 mempool_destroy(osdc->req_mempool);
1487 ceph_msgpool_destroy(&osdc->msgpool_op);
Sage Weilc16e7862010-03-01 13:02:00 -08001488 ceph_msgpool_destroy(&osdc->msgpool_op_reply);
Sage Weilf24e9982009-10-06 11:31:10 -07001489}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001490EXPORT_SYMBOL(ceph_osdc_stop);
Sage Weilf24e9982009-10-06 11:31:10 -07001491
1492/*
1493 * Read some contiguous pages. If we cross a stripe boundary, shorten
1494 * *plen. Return number of bytes read, or error.
1495 */
1496int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1497 struct ceph_vino vino, struct ceph_file_layout *layout,
1498 u64 off, u64 *plen,
1499 u32 truncate_seq, u64 truncate_size,
Sage Weilb7495fc2010-11-09 12:43:12 -08001500 struct page **pages, int num_pages, int page_align)
Sage Weilf24e9982009-10-06 11:31:10 -07001501{
1502 struct ceph_osd_request *req;
1503 int rc = 0;
1504
1505 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1506 vino.snap, off, *plen);
1507 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1508 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1509 NULL, 0, truncate_seq, truncate_size, NULL,
Sage Weilb7495fc2010-11-09 12:43:12 -08001510 false, 1, page_align);
Sage Weila79832f2010-04-01 16:06:19 -07001511 if (!req)
1512 return -ENOMEM;
Sage Weilf24e9982009-10-06 11:31:10 -07001513
1514 /* it may be a short read due to an object boundary */
1515 req->r_pages = pages;
Sage Weilf24e9982009-10-06 11:31:10 -07001516
Sage Weilb7495fc2010-11-09 12:43:12 -08001517 dout("readpages final extent is %llu~%llu (%d pages align %d)\n",
1518 off, *plen, req->r_num_pages, page_align);
Sage Weilf24e9982009-10-06 11:31:10 -07001519
1520 rc = ceph_osdc_start_request(osdc, req, false);
1521 if (!rc)
1522 rc = ceph_osdc_wait_request(osdc, req);
1523
1524 ceph_osdc_put_request(req);
1525 dout("readpages result %d\n", rc);
1526 return rc;
1527}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001528EXPORT_SYMBOL(ceph_osdc_readpages);
Sage Weilf24e9982009-10-06 11:31:10 -07001529
1530/*
1531 * do a synchronous write on N pages
1532 */
1533int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1534 struct ceph_file_layout *layout,
1535 struct ceph_snap_context *snapc,
1536 u64 off, u64 len,
1537 u32 truncate_seq, u64 truncate_size,
1538 struct timespec *mtime,
1539 struct page **pages, int num_pages,
1540 int flags, int do_sync, bool nofail)
1541{
1542 struct ceph_osd_request *req;
1543 int rc = 0;
Sage Weilb7495fc2010-11-09 12:43:12 -08001544 int page_align = off & ~PAGE_MASK;
Sage Weilf24e9982009-10-06 11:31:10 -07001545
1546 BUG_ON(vino.snap != CEPH_NOSNAP);
1547 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1548 CEPH_OSD_OP_WRITE,
1549 flags | CEPH_OSD_FLAG_ONDISK |
1550 CEPH_OSD_FLAG_WRITE,
1551 snapc, do_sync,
1552 truncate_seq, truncate_size, mtime,
Sage Weilb7495fc2010-11-09 12:43:12 -08001553 nofail, 1, page_align);
Sage Weila79832f2010-04-01 16:06:19 -07001554 if (!req)
1555 return -ENOMEM;
Sage Weilf24e9982009-10-06 11:31:10 -07001556
1557 /* it may be a short write due to an object boundary */
1558 req->r_pages = pages;
Sage Weilf24e9982009-10-06 11:31:10 -07001559 dout("writepages %llu~%llu (%d pages)\n", off, len,
1560 req->r_num_pages);
1561
1562 rc = ceph_osdc_start_request(osdc, req, nofail);
1563 if (!rc)
1564 rc = ceph_osdc_wait_request(osdc, req);
1565
1566 ceph_osdc_put_request(req);
1567 if (rc == 0)
1568 rc = len;
1569 dout("writepages result %d\n", rc);
1570 return rc;
1571}
Yehuda Sadeh3d14c5d2010-04-06 15:14:15 -07001572EXPORT_SYMBOL(ceph_osdc_writepages);
Sage Weilf24e9982009-10-06 11:31:10 -07001573
1574/*
1575 * handle incoming message
1576 */
1577static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1578{
1579 struct ceph_osd *osd = con->private;
Julia Lawall32c895e2009-11-21 16:53:16 +01001580 struct ceph_osd_client *osdc;
Sage Weilf24e9982009-10-06 11:31:10 -07001581 int type = le16_to_cpu(msg->hdr.type);
1582
1583 if (!osd)
Sage Weil4a32f932010-06-13 10:27:53 -07001584 goto out;
Julia Lawall32c895e2009-11-21 16:53:16 +01001585 osdc = osd->o_osdc;
Sage Weilf24e9982009-10-06 11:31:10 -07001586
1587 switch (type) {
1588 case CEPH_MSG_OSD_MAP:
1589 ceph_osdc_handle_map(osdc, msg);
1590 break;
1591 case CEPH_MSG_OSD_OPREPLY:
Sage Weil350b1c32009-12-22 10:45:45 -08001592 handle_reply(osdc, msg, con);
Sage Weilf24e9982009-10-06 11:31:10 -07001593 break;
1594
1595 default:
1596 pr_err("received unknown message type %d %s\n", type,
1597 ceph_msg_type_name(type));
1598 }
Sage Weil4a32f932010-06-13 10:27:53 -07001599out:
Sage Weilf24e9982009-10-06 11:31:10 -07001600 ceph_msg_put(msg);
1601}
1602
Sage Weil5b3a4db2010-02-19 21:43:23 -08001603/*
Sage Weil21b667f2010-03-04 10:22:59 -08001604 * lookup and return message for incoming reply. set up reply message
1605 * pages.
Sage Weil5b3a4db2010-02-19 21:43:23 -08001606 */
1607static struct ceph_msg *get_reply(struct ceph_connection *con,
Yehuda Sadeh24504182010-01-08 13:58:34 -08001608 struct ceph_msg_header *hdr,
1609 int *skip)
Sage Weilf24e9982009-10-06 11:31:10 -07001610{
1611 struct ceph_osd *osd = con->private;
1612 struct ceph_osd_client *osdc = osd->o_osdc;
Yehuda Sadeh24504182010-01-08 13:58:34 -08001613 struct ceph_msg *m;
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001614 struct ceph_osd_request *req;
Sage Weil5b3a4db2010-02-19 21:43:23 -08001615 int front = le32_to_cpu(hdr->front_len);
1616 int data_len = le32_to_cpu(hdr->data_len);
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001617 u64 tid;
Sage Weilf24e9982009-10-06 11:31:10 -07001618
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001619 tid = le64_to_cpu(hdr->tid);
1620 mutex_lock(&osdc->request_mutex);
1621 req = __lookup_request(osdc, tid);
1622 if (!req) {
1623 *skip = 1;
1624 m = NULL;
Sage Weilc16e7862010-03-01 13:02:00 -08001625 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
Sage Weil5b3a4db2010-02-19 21:43:23 -08001626 osd->o_osd);
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001627 goto out;
1628 }
Sage Weilc16e7862010-03-01 13:02:00 -08001629
1630 if (req->r_con_filling_msg) {
1631 dout("get_reply revoking msg %p from old con %p\n",
1632 req->r_reply, req->r_con_filling_msg);
1633 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1634 ceph_con_put(req->r_con_filling_msg);
Sage Weil6f46cb22010-03-24 21:30:19 -07001635 req->r_con_filling_msg = NULL;
Sage Weilf24e9982009-10-06 11:31:10 -07001636 }
Yehuda Sadeh24504182010-01-08 13:58:34 -08001637
Sage Weilc16e7862010-03-01 13:02:00 -08001638 if (front > req->r_reply->front.iov_len) {
1639 pr_warning("get_reply front %d > preallocated %d\n",
1640 front, (int)req->r_reply->front.iov_len);
Yehuda Sadeh34d23762010-04-06 14:33:58 -07001641 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
Sage Weila79832f2010-04-01 16:06:19 -07001642 if (!m)
Sage Weilc16e7862010-03-01 13:02:00 -08001643 goto out;
1644 ceph_msg_put(req->r_reply);
1645 req->r_reply = m;
1646 }
1647 m = ceph_msg_get(req->r_reply);
1648
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001649 if (data_len > 0) {
Sage Weilb7495fc2010-11-09 12:43:12 -08001650 int want = calc_pages_for(req->r_page_alignment, data_len);
Sage Weil21b667f2010-03-04 10:22:59 -08001651
1652 if (unlikely(req->r_num_pages < want)) {
1653 pr_warning("tid %lld reply %d > expected %d pages\n",
1654 tid, want, m->nr_pages);
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001655 *skip = 1;
1656 ceph_msg_put(m);
Sage Weila79832f2010-04-01 16:06:19 -07001657 m = NULL;
Sage Weil21b667f2010-03-04 10:22:59 -08001658 goto out;
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001659 }
Sage Weil21b667f2010-03-04 10:22:59 -08001660 m->pages = req->r_pages;
1661 m->nr_pages = req->r_num_pages;
Sage Weilc5c6b192010-11-09 12:40:00 -08001662 m->page_alignment = req->r_page_alignment;
Yehuda Sadeh68b44762010-04-06 15:01:27 -07001663#ifdef CONFIG_BLOCK
1664 m->bio = req->r_bio;
1665#endif
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001666 }
Sage Weil5b3a4db2010-02-19 21:43:23 -08001667 *skip = 0;
Sage Weilc16e7862010-03-01 13:02:00 -08001668 req->r_con_filling_msg = ceph_con_get(con);
1669 dout("get_reply tid %lld %p\n", tid, m);
Yehuda Sadeh0547a9b2010-01-11 14:47:13 -08001670
1671out:
1672 mutex_unlock(&osdc->request_mutex);
Yehuda Sadeh24504182010-01-08 13:58:34 -08001673 return m;
Sage Weil5b3a4db2010-02-19 21:43:23 -08001674
1675}
1676
1677static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1678 struct ceph_msg_header *hdr,
1679 int *skip)
1680{
1681 struct ceph_osd *osd = con->private;
1682 int type = le16_to_cpu(hdr->type);
1683 int front = le32_to_cpu(hdr->front_len);
1684
1685 switch (type) {
1686 case CEPH_MSG_OSD_MAP:
Yehuda Sadeh34d23762010-04-06 14:33:58 -07001687 return ceph_msg_new(type, front, GFP_NOFS);
Sage Weil5b3a4db2010-02-19 21:43:23 -08001688 case CEPH_MSG_OSD_OPREPLY:
1689 return get_reply(con, hdr, skip);
1690 default:
1691 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1692 osd->o_osd);
1693 *skip = 1;
1694 return NULL;
1695 }
Sage Weilf24e9982009-10-06 11:31:10 -07001696}
1697
1698/*
1699 * Wrappers to refcount containing ceph_osd struct
1700 */
1701static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1702{
1703 struct ceph_osd *osd = con->private;
1704 if (get_osd(osd))
1705 return con;
1706 return NULL;
1707}
1708
1709static void put_osd_con(struct ceph_connection *con)
1710{
1711 struct ceph_osd *osd = con->private;
1712 put_osd(osd);
1713}
1714
Sage Weil4e7a5dc2009-11-18 16:19:57 -08001715/*
1716 * authentication
1717 */
1718static int get_authorizer(struct ceph_connection *con,
Sage Weil213c99e2010-08-03 10:25:11 -07001719 void **buf, int *len, int *proto,
1720 void **reply_buf, int *reply_len, int force_new)
Sage Weil4e7a5dc2009-11-18 16:19:57 -08001721{
1722 struct ceph_osd *o = con->private;
1723 struct ceph_osd_client *osdc = o->o_osdc;
1724 struct ceph_auth_client *ac = osdc->client->monc.auth;
1725 int ret = 0;
1726
1727 if (force_new && o->o_authorizer) {
1728 ac->ops->destroy_authorizer(ac, o->o_authorizer);
1729 o->o_authorizer = NULL;
1730 }
1731 if (o->o_authorizer == NULL) {
1732 ret = ac->ops->create_authorizer(
1733 ac, CEPH_ENTITY_TYPE_OSD,
1734 &o->o_authorizer,
1735 &o->o_authorizer_buf,
1736 &o->o_authorizer_buf_len,
1737 &o->o_authorizer_reply_buf,
1738 &o->o_authorizer_reply_buf_len);
1739 if (ret)
Sage Weil213c99e2010-08-03 10:25:11 -07001740 return ret;
Sage Weil4e7a5dc2009-11-18 16:19:57 -08001741 }
1742
1743 *proto = ac->protocol;
1744 *buf = o->o_authorizer_buf;
1745 *len = o->o_authorizer_buf_len;
1746 *reply_buf = o->o_authorizer_reply_buf;
1747 *reply_len = o->o_authorizer_reply_buf_len;
1748 return 0;
1749}
1750
1751
1752static int verify_authorizer_reply(struct ceph_connection *con, int len)
1753{
1754 struct ceph_osd *o = con->private;
1755 struct ceph_osd_client *osdc = o->o_osdc;
1756 struct ceph_auth_client *ac = osdc->client->monc.auth;
1757
1758 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1759}
1760
Sage Weil9bd2e6f2010-02-02 16:21:06 -08001761static int invalidate_authorizer(struct ceph_connection *con)
1762{
1763 struct ceph_osd *o = con->private;
1764 struct ceph_osd_client *osdc = o->o_osdc;
1765 struct ceph_auth_client *ac = osdc->client->monc.auth;
1766
1767 if (ac->ops->invalidate_authorizer)
1768 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1769
1770 return ceph_monc_validate_auth(&osdc->client->monc);
1771}
Sage Weil4e7a5dc2009-11-18 16:19:57 -08001772
Tobias Klauser9e327892010-05-20 10:40:19 +02001773static const struct ceph_connection_operations osd_con_ops = {
Sage Weilf24e9982009-10-06 11:31:10 -07001774 .get = get_osd_con,
1775 .put = put_osd_con,
1776 .dispatch = dispatch,
Sage Weil4e7a5dc2009-11-18 16:19:57 -08001777 .get_authorizer = get_authorizer,
1778 .verify_authorizer_reply = verify_authorizer_reply,
Sage Weil9bd2e6f2010-02-02 16:21:06 -08001779 .invalidate_authorizer = invalidate_authorizer,
Sage Weilf24e9982009-10-06 11:31:10 -07001780 .alloc_msg = alloc_msg,
Sage Weil81b024e2009-10-09 10:29:18 -07001781 .fault = osd_reset,
Sage Weilf24e9982009-10-06 11:31:10 -07001782};