blob: 9d26b3a396718c902b500b8c4f51551b1ef1d6ff [file] [log] [blame]
David Teiglande7fd4172006-01-18 09:30:29 +00001/******************************************************************************
2*******************************************************************************
3**
David Teiglandef0c2bb2007-03-28 09:56:46 -05004** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
David Teiglande7fd4172006-01-18 09:30:29 +00005**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
David Teigland597d0ca2006-07-12 16:44:04 -050058#include <linux/types.h>
David Teiglande7fd4172006-01-18 09:30:29 +000059#include "dlm_internal.h"
David Teigland597d0ca2006-07-12 16:44:04 -050060#include <linux/dlm_device.h>
David Teiglande7fd4172006-01-18 09:30:29 +000061#include "memory.h"
62#include "lowcomms.h"
63#include "requestqueue.h"
64#include "util.h"
65#include "dir.h"
66#include "member.h"
67#include "lockspace.h"
68#include "ast.h"
69#include "lock.h"
70#include "rcom.h"
71#include "recover.h"
72#include "lvb_table.h"
David Teigland597d0ca2006-07-12 16:44:04 -050073#include "user.h"
David Teiglande7fd4172006-01-18 09:30:29 +000074#include "config.h"
75
76static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_remove(struct dlm_rsb *r);
84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
85static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
86 struct dlm_message *ms);
87static int receive_extralen(struct dlm_message *ms);
David Teigland84991372007-03-30 15:02:40 -050088static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
David Teiglande7fd4172006-01-18 09:30:29 +000089
90/*
 * Lock compatibility matrix - thanks Steve
92 * UN = Unlocked state. Not really a state, used as a flag
93 * PD = Padding. Used to make the matrix a nice power of two in size
94 * Other states are the same as the VMS DLM.
95 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
96 */
97
/* Entry is 1 where a granted mode (row) can coexist with a requested
   mode (column), 0 where the two conflict.  Indexed [grmode+1][rqmode+1]
   so mode -1 (unlocked) lands on the UN row/column. */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
109
110/*
111 * This defines the direction of transfer of LVB data.
112 * Granted mode is the row; requested mode is the column.
113 * Usage: matrix[grmode+1][rqmode+1]
114 * 1 = LVB is returned to the caller
115 * 0 = LVB is written to the resource
116 * -1 = nothing happens to the LVB
117 */
118
/* LVB transfer direction per [grmode+1][rqmode+1]; see the description
   above (1 = return LVB to caller, 0 = write LVB to rsb, -1 = no-op). */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
David Teiglande7fd4172006-01-18 09:30:29 +0000130
/* Nonzero when the granted mode of lkb "gr" is compatible with the
   requested mode of lkb "rq". */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]

/* Nonzero when two raw lock modes are compatible; the +1 offsets map
   the unlocked mode (-1) onto the UN row/column of the matrix. */
int dlm_modes_compat(int mode1, int mode2)
{
	return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
}
138
139/*
140 * Compatibility matrix for conversions with QUECVT set.
141 * Granted mode is the row; requested mode is the column.
142 * Usage: matrix[grmode+1][rqmode+1]
143 */
144
/* Conversion compatibility consulted when DLM_LKF_QUECVT is set;
   indexed [grmode+1][rqmode+1] as described above. */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
156
/* Dump the identifying fields of an lkb to the kernel log; used as the
   diagnostic action in DLM_ASSERT() failures throughout this file. */
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}
165
/* Dump the identifying fields of an rsb to the kernel log; companion
   diagnostic to dlm_print_lkb() for DLM_ASSERT() failures. */
void dlm_print_rsb(struct dlm_rsb *r)
{
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}
172
/* Full diagnostic dump of an rsb: its fields plus every lkb on the
   lookup list and the grant/convert/wait queues.  Caller is expected
   to hold a reference keeping the rsb and its lists stable. */
void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
194
David Teiglande7fd4172006-01-18 09:30:29 +0000195/* Threads cannot use the lockspace while it's being recovered */
196
/* Take the lockspace's in-recovery rwsem for read; normal lock
   processing holds this so recovery (the writer) excludes it. */
static inline void lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}
201
/* Release the read side taken by lock_recovery(). */
static inline void unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}
206
/* Non-blocking variant of lock_recovery(); nonzero on success. */
static inline int lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
211
/* True unless the caller asked for an immediate grant-or-fail
   (DLM_LKF_NOQUEUE). */
static inline int can_be_queued(struct dlm_lkb *lkb)
{
	return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
}
216
/* True when the caller asked for blocking ASTs to be sent even for a
   request that won't be queued (DLM_LKF_NOQUEUEBAST). */
static inline int force_blocking_asts(struct dlm_lkb *lkb)
{
	return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
}
221
/* True when the lock's status block flags record a demotion. */
static inline int is_demoted(struct dlm_lkb *lkb)
{
	return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
}
226
/* True when the rsb is mastered on another node (res_nodeid != 0;
   0 means this node is master).  res_nodeid must already be known,
   i.e. not -1. */
static inline int is_remote(struct dlm_rsb *r)
{
	DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
	return !!r->res_nodeid;
}
232
/* True for the local process's copy of a lock whose master is on
   another node (nodeid set, but not flagged as a master copy). */
static inline int is_process_copy(struct dlm_lkb *lkb)
{
	return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
}
237
/* True for the master node's copy of a lock owned by a process on
   another node; such an lkb must always carry that node's id. */
static inline int is_master_copy(struct dlm_lkb *lkb)
{
	if (lkb->lkb_flags & DLM_IFL_MSTCPY)
		DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
	return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
}
244
245static inline int middle_conversion(struct dlm_lkb *lkb)
246{
247 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
248 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
David Teigland90135922006-01-20 08:47:07 +0000249 return 1;
250 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000251}
252
/* True for a conversion to a strictly lesser mode, excluding the
   PR<->CW "middle" cases which need conflict checks. */
static inline int down_conversion(struct dlm_lkb *lkb)
{
	return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
}
257
/* True when an unlock was issued while another operation on this lkb
   was still awaiting a reply (see add_to_waiters). */
static inline int is_overlap_unlock(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
}
262
/* True when a cancel was issued while another operation on this lkb
   was still awaiting a reply (see add_to_waiters). */
static inline int is_overlap_cancel(struct dlm_lkb *lkb)
{
	return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
}
267
/* True when either an overlapping unlock or cancel is pending. */
static inline int is_overlap(struct dlm_lkb *lkb)
{
	return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
				  DLM_IFL_OVERLAP_CANCEL));
}
273
/* Deliver a completion AST with status rv to the lock's owner.  A
   master copy has no local process to notify, so nothing is queued
   here (the status travels back in the reply message instead). */
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* the owner reads status/flags from its lksb when the ast runs */
	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}
286
/* Complete an overlapping unlock/cancel: report -DLM_EUNLOCK when an
   overlap unlock is pending, otherwise -DLM_ECANCEL. */
static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	queue_cast(r, lkb,
		   is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
}
292
/* Deliver a blocking AST for rqmode.  For a master copy the owner is
   on another node, so send a bast message; otherwise queue the local
   callback. */
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}
302
303/*
304 * Basic operations on rsb's and lkb's
305 */
306
/* Allocate and initialize a new rsb for the given resource name.
   The rsb is not hashed in here; the caller adds it to a bucket
   (see find_rsb).  Returns NULL on allocation failure. */
static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}
329
/* Scan one hash-chain list for an rsb with the given name.
   Returns 0 and sets *r_ret on a match; -EBADR when not found;
   -ENOTBLK when found but not locally mastered and the caller
   required a master copy (R_MASTER).  No reference is taken here;
   the caller holds the bucket lock. */
static int search_rsb_list(struct list_head *head, char *name, int len,
			   unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error = 0;

	list_for_each_entry(r, head, res_hashchain) {
		if (len == r->res_length && !memcmp(name, r->res_name, len))
			goto found;
	}
	return -EBADR;

 found:
	if (r->res_nodeid && (flags & R_MASTER))
		error = -ENOTBLK;
	*r_ret = r;
	return error;
}
348
/* Look up an rsb in bucket b, checking the active list first and then
   the toss list.  An active hit gains a reference; a toss hit is
   revived onto the active list (its refcount was reset to 1 by
   toss_rsb).  For a revived rsb in a lockspace with a directory, the
   cached master info may be stale, so a remote master is flagged
   RSB_MASTER_UNCERTAIN.  Caller must hold the bucket's write lock. */
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	/* revive the tossed rsb */
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		/* master unknown; a fresh lookup will be done */
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		/* cached remote master may have changed while tossed */
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		/* we are master; that cannot silently change */
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
383
384static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
385 unsigned int flags, struct dlm_rsb **r_ret)
386{
387 int error;
388 write_lock(&ls->ls_rsbtbl[b].lock);
389 error = _search_rsb(ls, name, len, b, flags, r_ret);
390 write_unlock(&ls->ls_rsbtbl[b].lock);
391 return error;
392}
393
394/*
395 * Find rsb in rsbtbl and potentially create/add one
396 *
397 * Delaying the release of rsb's has a similar benefit to applications keeping
398 * NL locks on an rsb, but without the guarantee that the cached master value
399 * will still be valid when the rsb is reused. Apps aren't always smart enough
400 * to keep NL locks on an rsb that they may lock again shortly; this can lead
401 * to excessive master lookups and removals if we don't delay the release.
402 *
403 * Searching for an rsb means looking through both the normal list and toss
404 * list. When found on the toss list the rsb is moved to the normal list with
405 * ref count of 1; when found on normal list the ref count is incremented.
406 */
407
/* Find the rsb for a name, optionally creating it (R_CREATE).
   Returns 0 with *r_ret referenced on success; -EBADR when absent and
   creation wasn't requested; -ENOTBLK when found but not a master copy
   and R_MASTER was set; -ENOMEM on allocation failure. */
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	/* without a directory every node must be able to create the
	   rsb on demand */
	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;	/* master not yet known */
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;	/* 0 == locally mastered */
		r->res_nodeid = nodeid;
	}

	/* re-check under the lock: another thread may have added the
	   same rsb while we were allocating; if so use theirs */
	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
465
/* Public wrapper around find_rsb() for use by other dlm files. */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}
471
472/* This is only called to add a reference when the code already holds
473 a valid reference to the rsb, so there's no need for locking. */
474
/* Add a reference; caller already holds a valid one (see comment
   above), so no table lock is needed. */
static inline void hold_rsb(struct dlm_rsb *r)
{
	kref_get(&r->res_ref);
}
479
/* Public wrapper around hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
484
/* kref release function: move a now-unreferenced rsb to its bucket's
   toss list for delayed disposal by shrink_bucket().  The refcount is
   re-armed to 1 (the toss list's reference) and the toss time stamped
   so shrink_bucket() can age it out.  The LVB is dropped now since
   the rsb may never be revived. */
static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
499
/* When all references to the rsb are gone it's transferred to
501 the tossed list for later disposal. */
502
/* Drop a reference under the bucket lock; the final put moves the
   rsb to the toss list via toss_rsb(). */
static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}
512
/* Public wrapper around put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
517
518/* See comment for unhold_lkb */
519
/* Drop a reference that is known not to be the last one, so no bucket
   locking is needed; asserts if it unexpectedly was the last. */
static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
526
/* kref release function used when finally destroying a tossed rsb
   (see shrink_bucket): only sanity-checks that every queue is empty.
   The list removal and free happen in the caller after kref_put()
   returns, so the bucket write_lock can be released first. */
static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
541
542/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
543 The rsb must exist as long as any lkb's for it do. */
544
/* Bind an lkb to its rsb, taking an rsb reference so the rsb outlives
   the lkb (see comment above). */
static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}
550
/* Undo attach_lkb(): drop the rsb reference and clear the link.
   Safe to call on an lkb that was never attached. */
static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
558
/* Allocate a new lkb and assign it a lock id.  The lkid encodes a
   randomly chosen id-table bucket in the low 16 bits and that
   bucket's counter in the high bits; because the counter can wrap,
   each candidate id is checked against the bucket's existing lkbs.
   Returns 0 and sets *lkb_ret, or -ENOMEM. */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = bucket | (ls->ls_lkbtbl[bucket].counter++ << 16);

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			lkid = 0;	/* collision: try the next counter value */
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
601
/* Look up an lkb by id in its bucket (low 16 bits of the lkid).
   No reference is taken; the caller holds the bucket lock. */
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	uint16_t bucket = lkid & 0xFFFF;
	struct dlm_lkb *lkb;

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}
613
/* Look up an lkb by id, validating the encoded bucket and taking a
   reference on success.  Returns 0 with *lkb_ret set, -EBADSLT for an
   out-of-range bucket, or -ENOENT when no lkb has that id. */
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = lkid & 0xFFFF;

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
631
/* kref release function for an lkb: only sanity-checks that the lkb
   is on no status queue.  The detach and free happen in the caller
   after kref_put() returns so the bucket lock can be dropped first. */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
641
David Teiglandb3f58d82006-02-28 11:16:37 -0500642/* __put_lkb() is used when an lkb may not have an rsb attached to
643 it so we need to provide the lockspace explicitly */
644
/* Drop a reference on an lkb, destroying it on the last put:
   remove it from the id table, detach it from its rsb, and free it.
   Returns 1 when the lkb was freed, 0 otherwise.  The lockspace is
   passed explicitly because the lkb may have no rsb attached. */
static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = lkb->lkb_id & 0xFFFF;

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb,
		   so only a master copy's lvb is ours to free */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}
666
/* Public put for an lkb known to be attached to an rsb; derives the
   lockspace from the rsb and defers to __put_lkb().  Returns 1 when
   the lkb was freed. */
int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
677
678/* This is only called to add a reference when the code already holds
679 a valid reference to the lkb, so there's no need for locking. */
680
/* Add a reference; caller already holds a valid one (see comment
   above), so no table lock is needed. */
static inline void hold_lkb(struct dlm_lkb *lkb)
{
	kref_get(&lkb->lkb_ref);
}
685
686/* This is called when we need to remove a reference and are certain
687 it's not the last ref. e.g. del_lkb is always called between a
688 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
689 put_lkb would work fine, but would involve unnecessary locking */
690
/* Drop a reference known not to be the last one (see comment above);
   asserts if it unexpectedly was the last. */
static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
697
698static void lkb_add_ordered(struct list_head *new, struct list_head *head,
699 int mode)
700{
701 struct dlm_lkb *lkb = NULL;
702
703 list_for_each_entry(lkb, head, lkb_statequeue)
704 if (lkb->lkb_rqmode < mode)
705 break;
706
707 if (!lkb)
708 list_add_tail(new, head);
709 else
710 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
711}
712
713/* add/remove lkb to rsb's grant/convert/wait queue */
714
/* Put an lkb on one of the rsb's three status queues (wait, grant,
   convert), taking a queue reference.  DLM_LKF_HEADQUE adds at the
   head of the wait/convert queues; the grant queue is kept ordered
   by granted mode by convention. */
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	/* must not already be on a status queue */
	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
746
/* Take an lkb off its current status queue, clearing lkb_status and
   dropping the queue's reference. */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
753
/* Move an lkb between status queues; the temporary hold keeps the
   refcount from hitting zero between del and add. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
761
David Teiglandef0c2bb2007-03-28 09:56:46 -0500762static int msg_reply_type(int mstype)
763{
764 switch (mstype) {
765 case DLM_MSG_REQUEST:
766 return DLM_MSG_REQUEST_REPLY;
767 case DLM_MSG_CONVERT:
768 return DLM_MSG_CONVERT_REPLY;
769 case DLM_MSG_UNLOCK:
770 return DLM_MSG_UNLOCK_REPLY;
771 case DLM_MSG_CANCEL:
772 return DLM_MSG_CANCEL_REPLY;
773 case DLM_MSG_LOOKUP:
774 return DLM_MSG_LOOKUP_REPLY;
775 }
776 return -1;
777}
778
David Teiglande7fd4172006-01-18 09:30:29 +0000779/* add/remove lkb from global waiters list of lkb's waiting for
780 a reply from a remote node */
781
/* Record that lkb is awaiting a remote reply for an operation of type
   mstype.  Each outstanding operation bumps lkb_wait_count and holds a
   reference.  If another operation is already outstanding, only an
   overlapping unlock or cancel may be stacked on top (recorded in the
   OVERLAP_* flags rather than as a second wait_type); anything else
   fails with -EBUSY, and repeating an already-pending overlap op
   fails with -EINVAL. */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	/* an overlap unlock, or a repeat of a pending overlap cancel,
	   cannot be stacked again */
	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	/* first outstanding op: count must be zero */
	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
832
David Teiglandb790c3b2007-01-24 10:21:33 -0600833/* We clear the RESEND flag because we might be taking an lkb off the waiters
834 list as part of process_requestqueue (e.g. a lookup that has an optimized
835 request reply on the requestqueue) between dlm_recover_waiters_pre() which
836 set RESEND and dlm_recover_waiters_post() */
837
/* Process a reply of type mstype for a waiting lkb: clear the matching
   overlap flag or wait_type, drop that operation's wait_count and
   reference, and take the lkb off the waiters list once no operations
   remain outstanding.  Returns 0 on success, -1 when the reply matches
   nothing we are waiting for.  Caller holds ls_waiters_mutex (or is in
   the stub-reply path that already holds it). */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	/* clear RESEND here; see the comment above this function */
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
889
/* Locked wrapper around _remove_from_waiters(): takes the lockspace's
   waiters mutex for the duration of the removal. */
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
900
David Teiglandef0c2bb2007-03-28 09:56:46 -0500901/* Handles situations where we might be processing a "fake" or "stub" reply in
902 which we can't try to take waiters_mutex again. */
903
/* Variant of remove_from_waiters() driven by a received message.  When
   ms is the lockspace's stub message, the waiters mutex is already
   held by the code fabricating the stub reply, so it must not be
   taken again here. */
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
916
/* Remove this rsb's entry from the resource directory: message the
   directory node, or update the directory locally when this node
   holds the entry.  No-op when the lockspace runs without a
   directory. */
static void dir_remove(struct dlm_rsb *r)
{
	int to_nodeid;

	if (dlm_no_directory(r->res_ls))
		return;

	to_nodeid = dlm_dir_nodeid(r);
	if (to_nodeid != dlm_our_nodeid())
		send_remove(r);
	else
		dlm_dir_remove_entry(r->res_ls, to_nodeid,
				     r->res_name, r->res_length);
}
931
932/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
933 found since they are in order of newest to oldest? */
934
/* Free rsbs from bucket b's toss list whose toss time has aged past
   the configured toss_secs.  One candidate is taken per pass under the
   bucket write lock; the final kref_put (kill_rsb) only asserts, so
   the actual unlink and free happen here after dropping the lock.
   Returns the number of rsbs freed. */
static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			/* a master rsb must also leave the directory */
			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			/* NOTE(review): r is dereferenced here after the
			   bucket lock is dropped; looks racy if another
			   thread revives or frees the rsb -- confirm */
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
973
/* Periodic scan over every hash bucket, freeing aged-out tossed rsbs.
   Skipped entirely while locking is stopped for recovery; yields the
   CPU between buckets. */
void dlm_scan_rsbs(struct dlm_ls *ls)
{
	int i;

	if (dlm_locking_stopped(ls))
		return;

	for (i = 0; i < ls->ls_rsbtbl_size; i++) {
		shrink_bucket(ls, i);
		cond_resched();
	}
}
986
987/* lkb is master or local copy */
988
/* Transfer the lock value block between the lkb and the rsb when a lock
   is granted, according to the lvb operations table indexed by the
   granted and requested modes. */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		/* copy the rsb lvb out to the caller, but only if the
		   caller asked for it and both buffers exist */
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		/* IVVALBLK invalidates the lvb instead of writing it */
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		/* the rsb's lvb buffer is allocated lazily on first write */
		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	/* tell the caller when the lvb contents are not valid */
	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
1039
/* Write the lkb's lvb back to the rsb on unlock.  Only a lock held in
   PW or EX mode updates the rsb lvb; lesser grant modes return without
   touching it. */

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	/* IVVALBLK invalidates the lvb instead of writing it */
	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	/* the rsb's lvb buffer is allocated lazily on first write */
	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}
1066
1067/* lkb is process copy (pc) */
1068
/* Set the lvb on a process copy lkb from a grant message.  Only the
   "return to caller" case (b == 1) applies here; the lvb data arrives
   in the message's extra area. */

static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			    struct dlm_message *ms)
{
	int b;

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	/* b=1: the lvb is returned to the caller; nothing to do here
	   for b=0 or b=-1 */
	b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
	if (b == 1) {
		int len = receive_extralen(ms);
		memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
		lkb->lkb_lvbseq = ms->m_lvbseq;
	}
}
1087
1088/* Manipulate lkb's on rsb's convert/granted/waiting queues
1089 remove_lock -- used for unlock, removes lkb from granted
1090 revert_lock -- used for cancel, moves lkb from convert to granted
1091 grant_lock -- used for request and convert, adds lkb to granted or
1092 moves lkb from convert or waiting to granted
1093
1094 Each of these is used for master or local copy lkb's. There is
1095 also a _pc() variation used to make the corresponding change on
1096 a process copy (pc) lkb. */
1097
/* Remove lkb from its queue for unlock; the granted mode becomes
   invalid and the lkb's original reference is dropped. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
1106
/* Unlock a master or local copy lkb: save its lvb to the rsb, then
   remove it. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
1112
/* Unlock a process copy lkb: no lvb handling is done here (contrast
   with remove_lock() which calls set_lvb_unlock() first). */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1117
David Teiglandef0c2bb2007-03-28 09:56:46 -05001118/* returns: 0 did nothing
1119 1 moved lock to granted
1120 -1 removed lock */
1121
static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		/* nothing in progress to revert */
		break;
	case DLM_LKSTS_CONVERT:
		/* cancel the conversion: move back to the granted queue */
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		/* cancel the original request: remove the lkb entirely */
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}
1148
David Teiglandef0c2bb2007-03-28 09:56:46 -05001149static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
David Teiglande7fd4172006-01-18 09:30:29 +00001150{
David Teiglandef0c2bb2007-03-28 09:56:46 -05001151 return revert_lock(r, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00001152}
1153
/* Grant lkb its requested mode, moving (or adding) it to the granted
   queue if the granted mode actually changes. */

static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	if (lkb->lkb_grmode != lkb->lkb_rqmode) {
		lkb->lkb_grmode = lkb->lkb_rqmode;
		/* a zero status means the lkb is not yet on any queue */
		if (lkb->lkb_status)
			move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		else
			add_lkb(r, lkb, DLM_LKSTS_GRANTED);
	}

	lkb->lkb_rqmode = DLM_LOCK_IV;
}
1166
/* Grant a master or local copy lkb: handle the lvb, grant the mode and
   reset the highest mode for which a blocking ast was sent. */

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}
1173
/* Grant a process copy lkb, taking the lvb from the grant message. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1180
1181/* called by grant_pending_locks() which means an async grant message must
1182 be sent to the requesting node in addition to granting the lock if the
1183 lkb belongs to a remote node. */
1184
/* Grant a lock found pending by grant_pending_locks().  A master copy
   belongs to a remote node which must be sent a grant message; a local
   lkb gets its completion ast queued directly. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);
	if (!is_master_copy(lkb))
		queue_cast(r, lkb, 0);
	else
		send_grant(r, lkb);
}
1193
1194static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1195{
1196 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1197 lkb_statequeue);
1198 if (lkb->lkb_id == first->lkb_id)
David Teigland90135922006-01-20 08:47:07 +00001199 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001200
David Teigland90135922006-01-20 08:47:07 +00001201 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001202}
1203
David Teiglande7fd4172006-01-18 09:30:29 +00001204/* Check if the given lkb conflicts with another lkb on the queue. */
1205
1206static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1207{
1208 struct dlm_lkb *this;
1209
1210 list_for_each_entry(this, head, lkb_statequeue) {
1211 if (this == lkb)
1212 continue;
David Teigland3bcd3682006-02-23 09:56:38 +00001213 if (!modes_compat(this, lkb))
David Teigland90135922006-01-20 08:47:07 +00001214 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001215 }
David Teigland90135922006-01-20 08:47:07 +00001216 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001217}
1218
1219/*
1220 * "A conversion deadlock arises with a pair of lock requests in the converting
1221 * queue for one resource. The granted mode of each lock blocks the requested
1222 * mode of the other lock."
1223 *
1224 * Part 2: if the granted mode of lkb is preventing the first lkb in the
1225 * convert queue from being granted, then demote lkb (set grmode to NL).
1226 * This second form requires that we check for conv-deadlk even when
1227 * now == 0 in _can_be_granted().
1228 *
1229 * Example:
1230 * Granted Queue: empty
1231 * Convert Queue: NL->EX (first lock)
1232 * PR->EX (second lock)
1233 *
1234 * The first lock can't be granted because of the granted mode of the second
1235 * lock and the second lock can't be granted because it's not first in the
1236 * list. We demote the granted mode of the second lock (the lkb passed to this
1237 * function).
1238 *
1239 * After the resolution, the "grant pending" function needs to go back and try
1240 * to grant locks on the convert queue again since the first lock can now be
1241 * granted.
1242 */
1243
static int conversion_deadlock_detect(struct dlm_rsb *rsb, struct dlm_lkb *lkb)
{
	struct dlm_lkb *this, *first = NULL, *self = NULL;

	list_for_each_entry(this, &rsb->res_convertqueue, lkb_statequeue) {
		if (!first)
			first = this;
		if (this == lkb) {
			self = lkb;
			continue;
		}

		/* part 1: lkb and another converting lock mutually block
		   each other's requested mode with their granted modes */
		if (!modes_compat(this, lkb) && !modes_compat(lkb, this))
			return 1;
	}

	/* if lkb is on the convert queue and is preventing the first
	   from being granted, then there's deadlock and we demote lkb.
	   multiple converting locks may need to do this before the first
	   converting lock can be granted. */

	if (self && self != first) {
		if (!modes_compat(lkb, first) &&
		    !queue_conflict(&rsb->res_grantqueue, first))
			return 1;
	}

	return 0;
}
1273
1274/*
1275 * Return 1 if the lock can be granted, 0 otherwise.
1276 * Also detect and resolve conversion deadlocks.
1277 *
1278 * lkb is the lock to be granted
1279 *
1280 * now is 1 if the function is being called in the context of the
1281 * immediate request, it is 0 if called later, after the lock has been
1282 * queued.
1283 *
1284 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1285 */
1286
static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	/* nonzero when this is a conversion: the lkb has a valid
	   granted mode */
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted. In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues... As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks. This flag is not valid for conversion requests.
	 *
	 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode. We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1. The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis. This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion. This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource. Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order. The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;

 out:
	/*
	 * The following, enabled by CONVDEADLK, departs from VMS.
	 */

	if (conv && (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) &&
	    conversion_deadlock_detect(r, lkb)) {
		/* resolve the deadlock by demoting this lock's granted
		   mode to NL; the caller is told via DEMOTED */
		lkb->lkb_grmode = DLM_LOCK_NL;
		lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
	}

	return 0;
}
1413
1414/*
1415 * The ALTPR and ALTCW flags aren't traditional lock manager flags, but are a
1416 * simple way to provide a big optimization to applications that can use them.
1417 */
1418
static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	uint32_t flags = lkb->lkb_exflags;
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/* a conversion deadlock resolution just demoted this lock;
	   don't also try the alternate modes */
	if (lkb->lkb_sbflags & DLM_SBF_DEMOTED)
		goto out;

	/* try the alternate mode (PR or CW) the caller allowed */
	if (rqmode != DLM_LOCK_PR && flags & DLM_LKF_ALTPR)
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && flags & DLM_LKF_ALTCW)
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			/* alternate mode didn't help; restore the
			   original requested mode */
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
1448
/* Try to grant locks on the convert queue, rescanning because granting
   or demoting one lock can make others grantable.  Returns the largest
   requested mode among locks still blocked (at least the given high). */

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		if (can_be_granted(r, lkb, 0)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
		} else {
			hi = max_t(int, lkb->lkb_rqmode, hi);
			/* conversion deadlock resolution demoted this
			   lock during the grant check; rescan since
			   others may now be grantable */
			if (!demoted && is_demoted(lkb))
				demote_restart = 1;
		}
	}

	if (grant_restart)
		goto restart;
	/* one extra pass after demotions, but don't loop forever */
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1481
1482static int grant_pending_wait(struct dlm_rsb *r, int high)
1483{
1484 struct dlm_lkb *lkb, *s;
1485
1486 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
David Teigland90135922006-01-20 08:47:07 +00001487 if (can_be_granted(r, lkb, 0))
David Teiglande7fd4172006-01-18 09:30:29 +00001488 grant_lock_pending(r, lkb);
1489 else
1490 high = max_t(int, lkb->lkb_rqmode, high);
1491 }
1492
1493 return high;
1494}
1495
/* Grant whatever pending converts and requests can be granted on r
   (master node only), then send blocking asts for modes still blocked. */

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	/* nothing left blocked */
	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
1523
1524static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1525 struct dlm_lkb *lkb)
1526{
1527 struct dlm_lkb *gr;
1528
1529 list_for_each_entry(gr, head, lkb_statequeue) {
1530 if (gr->lkb_bastaddr &&
1531 gr->lkb_highbast < lkb->lkb_rqmode &&
David Teigland3bcd3682006-02-23 09:56:38 +00001532 !modes_compat(gr, lkb)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001533 queue_bast(r, gr, lkb->lkb_rqmode);
1534 gr->lkb_highbast = lkb->lkb_rqmode;
1535 }
1536 }
1537}
1538
/* Send blocking asts for granted locks that conflict with lkb. */

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}
1543
/* Send blocking asts for both granted and converting locks that
   conflict with lkb. */

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
1549
1550/* set_master(r, lkb) -- set the master nodeid of a resource
1551
1552 The purpose of this function is to set the nodeid field in the given
1553 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1554 known, it can just be copied to the lkb and the function will return
1555 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1556 before it can be copied to the lkb.
1557
1558 When the rsb nodeid is being looked up remotely, the initial lkb
1559 causing the lookup is kept on the ls_waiters list waiting for the
1560 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1561 on the rsb's res_lookup list until the master is verified.
1562
1563 Return values:
1564 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1565 1: the rsb master is not available and the lkb has been placed on
1566 a wait queue
1567*/
1568
static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		/* use the saved nodeid and make this lkb responsible
		   for confirming it (it becomes res_first_lkid) */
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb's lookup is already in flight for this rsb;
	   wait on the rsb's lookup list */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* res_nodeid 0: this node is the master */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	/* positive res_nodeid: a known remote master */
	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	if (dir_nodeid != our_nodeid) {
		/* master unknown and the directory entry is remote:
		   send a lookup and wait for the reply */
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		/* we are the master: no confirmation needed */
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
1632
/* Resubmit the lock requests that were waiting on this rsb's master
   lookup to complete. */

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();
	}
}
1643
1644/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1645
static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	/* nothing to confirm when no lookup is outstanding */
	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		/* the request was granted or queued: master confirmed;
		   restart the lkb's that waited on the lookup */
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			/* no one else waiting: the master is unknown
			   again */
			r->res_nodeid = -1;
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
1680
/* Validate the args from dlm_lock() and pack them into *args; they are
   copied into the lkb later by validate_lock_args().  Returns -EINVAL
   on any invalid flag/mode combination. */

static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
			 int namelen, uint32_t parent_lkid, void *ast,
			 void *astarg, void *bast, struct dlm_args *args)
{
	int rv = -EINVAL;

	/* check for invalid arg usage */

	if (mode < 0 || mode > DLM_LOCK_EX)
		goto out;

	if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
		goto out;

	/* CANCEL is an unlock-path flag, not valid here */
	if (flags & DLM_LKF_CANCEL)
		goto out;

	/* QUECVT and CONVDEADLK only make sense for conversions */
	if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
		goto out;

	if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
		goto out;

	/* EXPEDITE is only valid for new NL-mode requests */
	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
		goto out;

	if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
		goto out;

	if (!ast || !lksb)
		goto out;

	if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
		goto out;

	/* parent/child locks not yet supported */
	if (parent_lkid)
		goto out;

	/* a conversion must name an existing lock id */
	if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
		goto out;

	/* these args will be copied to the lkb in validate_lock_args,
	   it cannot be done now because when converting locks, fields in
	   an active lkb cannot be modified before locking the rsb */

	args->flags = flags;
	args->astaddr = ast;
	args->astparam = (long) astarg;
	args->bastaddr = bast;
	args->mode = mode;
	args->lksb = lksb;
	rv = 0;
 out:
	return rv;
}
1746
1747static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1748{
1749 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1750 DLM_LKF_FORCEUNLOCK))
1751 return -EINVAL;
1752
David Teiglandef0c2bb2007-03-28 09:56:46 -05001753 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1754 return -EINVAL;
1755
David Teiglande7fd4172006-01-18 09:30:29 +00001756 args->flags = flags;
1757 args->astparam = (long) astarg;
1758 return 0;
1759}
1760
/* Copy the args saved by set_lock_args() into the lkb, after checking
   that a conversion is valid for this lkb's current state.  Returns
   -EINVAL for invalid use, -EBUSY when another op is in progress. */

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		/* a process can't convert a master copy lkb */
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		/* only a granted lock can be converted */
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		/* another operation is already outstanding */
		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	rv = 0;
 out:
	return rv;
}
1798
David Teiglandef0c2bb2007-03-28 09:56:46 -05001799/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
1800 for success */
1801
1802/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
1803 because there may be a lookup in progress and it's valid to do
1804 cancel/unlockf on it */
1805
David Teiglande7fd4172006-01-18 09:30:29 +00001806static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
1807{
David Teiglandef0c2bb2007-03-28 09:56:46 -05001808 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
David Teiglande7fd4172006-01-18 09:30:29 +00001809 int rv = -EINVAL;
1810
David Teiglandef0c2bb2007-03-28 09:56:46 -05001811 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
1812 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
1813 dlm_print_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00001814 goto out;
David Teiglandef0c2bb2007-03-28 09:56:46 -05001815 }
David Teiglande7fd4172006-01-18 09:30:29 +00001816
David Teiglandef0c2bb2007-03-28 09:56:46 -05001817 /* an lkb may still exist even though the lock is EOL'ed due to a
1818 cancel, unlock or failed noqueue request; an app can't use these
1819 locks; return same error as if the lkid had not been found at all */
1820
1821 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
1822 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
1823 rv = -ENOENT;
1824 goto out;
1825 }
1826
1827 /* an lkb may be waiting for an rsb lookup to complete where the
1828 lookup was initiated by another lock */
1829
1830 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
1831 if (!list_empty(&lkb->lkb_rsb_lookup)) {
1832 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
1833 list_del_init(&lkb->lkb_rsb_lookup);
1834 queue_cast(lkb->lkb_resource, lkb,
1835 args->flags & DLM_LKF_CANCEL ?
1836 -DLM_ECANCEL : -DLM_EUNLOCK);
1837 unhold_lkb(lkb); /* undoes create_lkb() */
1838 rv = -EBUSY;
1839 goto out;
1840 }
1841 }
1842
1843 /* cancel not allowed with another cancel/unlock in progress */
1844
1845 if (args->flags & DLM_LKF_CANCEL) {
1846 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
1847 goto out;
1848
1849 if (is_overlap(lkb))
1850 goto out;
1851
1852 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1853 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1854 rv = -EBUSY;
1855 goto out;
1856 }
1857
1858 switch (lkb->lkb_wait_type) {
1859 case DLM_MSG_LOOKUP:
1860 case DLM_MSG_REQUEST:
1861 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
1862 rv = -EBUSY;
1863 goto out;
1864 case DLM_MSG_UNLOCK:
1865 case DLM_MSG_CANCEL:
1866 goto out;
1867 }
1868 /* add_to_waiters() will set OVERLAP_CANCEL */
David Teiglande7fd4172006-01-18 09:30:29 +00001869 goto out_ok;
David Teiglandef0c2bb2007-03-28 09:56:46 -05001870 }
David Teiglande7fd4172006-01-18 09:30:29 +00001871
David Teiglandef0c2bb2007-03-28 09:56:46 -05001872 /* do we need to allow a force-unlock if there's a normal unlock
1873 already in progress? in what conditions could the normal unlock
1874 fail such that we'd want to send a force-unlock to be sure? */
David Teiglande7fd4172006-01-18 09:30:29 +00001875
David Teiglandef0c2bb2007-03-28 09:56:46 -05001876 if (args->flags & DLM_LKF_FORCEUNLOCK) {
1877 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
1878 goto out;
David Teiglande7fd4172006-01-18 09:30:29 +00001879
David Teiglandef0c2bb2007-03-28 09:56:46 -05001880 if (is_overlap_unlock(lkb))
1881 goto out;
1882
1883 if (lkb->lkb_flags & DLM_IFL_RESEND) {
1884 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1885 rv = -EBUSY;
1886 goto out;
1887 }
1888
1889 switch (lkb->lkb_wait_type) {
1890 case DLM_MSG_LOOKUP:
1891 case DLM_MSG_REQUEST:
1892 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
1893 rv = -EBUSY;
1894 goto out;
1895 case DLM_MSG_UNLOCK:
1896 goto out;
1897 }
1898 /* add_to_waiters() will set OVERLAP_UNLOCK */
1899 goto out_ok;
1900 }
1901
1902 /* normal unlock not allowed if there's any op in progress */
David Teiglande7fd4172006-01-18 09:30:29 +00001903 rv = -EBUSY;
David Teiglandef0c2bb2007-03-28 09:56:46 -05001904 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
David Teiglande7fd4172006-01-18 09:30:29 +00001905 goto out;
1906
1907 out_ok:
David Teiglandef0c2bb2007-03-28 09:56:46 -05001908 /* an overlapping op shouldn't blow away exflags from other op */
1909 lkb->lkb_exflags |= args->flags;
David Teiglande7fd4172006-01-18 09:30:29 +00001910 lkb->lkb_sbflags = 0;
1911 lkb->lkb_astparam = args->astparam;
David Teiglande7fd4172006-01-18 09:30:29 +00001912 rv = 0;
1913 out:
David Teiglandef0c2bb2007-03-28 09:56:46 -05001914 if (rv)
1915 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
1916 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
1917 args->flags, lkb->lkb_wait_type,
1918 lkb->lkb_resource->res_name);
David Teiglande7fd4172006-01-18 09:30:29 +00001919 return rv;
1920}
1921
1922/*
1923 * Four stage 4 varieties:
1924 * do_request(), do_convert(), do_unlock(), do_cancel()
1925 * These are called on the master node for the given lock and
1926 * from the central locking logic.
1927 */
1928
1929static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
1930{
1931 int error = 0;
1932
David Teigland90135922006-01-20 08:47:07 +00001933 if (can_be_granted(r, lkb, 1)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001934 grant_lock(r, lkb);
1935 queue_cast(r, lkb, 0);
1936 goto out;
1937 }
1938
1939 if (can_be_queued(lkb)) {
1940 error = -EINPROGRESS;
1941 add_lkb(r, lkb, DLM_LKSTS_WAITING);
1942 send_blocking_asts(r, lkb);
1943 goto out;
1944 }
1945
1946 error = -EAGAIN;
1947 if (force_blocking_asts(lkb))
1948 send_blocking_asts_all(r, lkb);
1949 queue_cast(r, lkb, -EAGAIN);
1950
1951 out:
1952 return error;
1953}
1954
1955static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
1956{
1957 int error = 0;
1958
1959 /* changing an existing lock may allow others to be granted */
1960
David Teigland90135922006-01-20 08:47:07 +00001961 if (can_be_granted(r, lkb, 1)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001962 grant_lock(r, lkb);
1963 queue_cast(r, lkb, 0);
1964 grant_pending_locks(r);
1965 goto out;
1966 }
1967
1968 if (can_be_queued(lkb)) {
1969 if (is_demoted(lkb))
1970 grant_pending_locks(r);
1971 error = -EINPROGRESS;
1972 del_lkb(r, lkb);
1973 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
1974 send_blocking_asts(r, lkb);
1975 goto out;
1976 }
1977
1978 error = -EAGAIN;
1979 if (force_blocking_asts(lkb))
1980 send_blocking_asts_all(r, lkb);
1981 queue_cast(r, lkb, -EAGAIN);
1982
1983 out:
1984 return error;
1985}
1986
1987static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1988{
1989 remove_lock(r, lkb);
1990 queue_cast(r, lkb, -DLM_EUNLOCK);
1991 grant_pending_locks(r);
1992 return -DLM_EUNLOCK;
1993}
1994
David Teiglandef0c2bb2007-03-28 09:56:46 -05001995/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
Steven Whitehouse907b9bc2006-09-25 09:26:04 -04001996
David Teiglande7fd4172006-01-18 09:30:29 +00001997static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
1998{
David Teiglandef0c2bb2007-03-28 09:56:46 -05001999 int error;
2000
2001 error = revert_lock(r, lkb);
2002 if (error) {
2003 queue_cast(r, lkb, -DLM_ECANCEL);
2004 grant_pending_locks(r);
2005 return -DLM_ECANCEL;
2006 }
2007 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00002008}
2009
2010/*
2011 * Four stage 3 varieties:
2012 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2013 */
2014
2015/* add a new lkb to a possibly new rsb, called by requesting process */
2016
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r; a positive return means
	   the master is not yet known and the request continues later */

	error = set_master(r, lkb);
	if (error < 0)
		return error;
	if (error > 0)
		return 0;

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		return send_request(r, lkb);

	return do_request(r, lkb);
}
2039
David Teigland3bcd3682006-02-23 09:56:38 +00002040/* change some property of an existing lkb, e.g. mode */
David Teiglande7fd4172006-01-18 09:30:29 +00002041
static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
2054
2055/* remove an existing lkb from the granted queue */
2056
static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
2069
2070/* remove an existing lkb from the convert or wait queue */
2071
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
2084
2085/*
2086 * Four stage 2 varieties:
2087 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2088 */
2089
2090static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2091 int len, struct dlm_args *args)
2092{
2093 struct dlm_rsb *r;
2094 int error;
2095
2096 error = validate_lock_args(ls, lkb, args);
2097 if (error)
2098 goto out;
2099
2100 error = find_rsb(ls, name, len, R_CREATE, &r);
2101 if (error)
2102 goto out;
2103
2104 lock_rsb(r);
2105
2106 attach_lkb(r, lkb);
2107 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2108
2109 error = _request_lock(r, lkb);
2110
2111 unlock_rsb(r);
2112 put_rsb(r);
2113
2114 out:
2115 return error;
2116}
2117
2118static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2119 struct dlm_args *args)
2120{
2121 struct dlm_rsb *r;
2122 int error;
2123
2124 r = lkb->lkb_resource;
2125
2126 hold_rsb(r);
2127 lock_rsb(r);
2128
2129 error = validate_lock_args(ls, lkb, args);
2130 if (error)
2131 goto out;
2132
2133 error = _convert_lock(r, lkb);
2134 out:
2135 unlock_rsb(r);
2136 put_rsb(r);
2137 return error;
2138}
2139
2140static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2141 struct dlm_args *args)
2142{
2143 struct dlm_rsb *r;
2144 int error;
2145
2146 r = lkb->lkb_resource;
2147
2148 hold_rsb(r);
2149 lock_rsb(r);
2150
2151 error = validate_unlock_args(lkb, args);
2152 if (error)
2153 goto out;
2154
2155 error = _unlock_lock(r, lkb);
2156 out:
2157 unlock_rsb(r);
2158 put_rsb(r);
2159 return error;
2160}
2161
2162static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2163 struct dlm_args *args)
2164{
2165 struct dlm_rsb *r;
2166 int error;
2167
2168 r = lkb->lkb_resource;
2169
2170 hold_rsb(r);
2171 lock_rsb(r);
2172
2173 error = validate_unlock_args(lkb, args);
2174 if (error)
2175 goto out;
2176
2177 error = _cancel_lock(r, lkb);
2178 out:
2179 unlock_rsb(r);
2180 put_rsb(r);
2181 return error;
2182}
2183
2184/*
2185 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2186 */
2187
/* stage 1 entry point: dispatches to request_lock() for a new lock or
   convert_lock() when DLM_LKF_CONVERT is set (see the flow comment at
   the top of this file).  -EINPROGRESS and -EAGAIN are asynchronous
   outcomes and are reported to the caller as success (0). */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* held for the whole operation; released at out: below */
	lock_recovery(ls);

	/* a conversion reuses an existing lkb (looked up by the caller's
	   lock id), a new request allocates one */
	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, parent_lkid, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* drop the reference taken by find_lkb()/create_lkb(); a new
	   request that succeeded keeps its reference (the lkb lives on) */
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN)
		error = 0;
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2240
/* stage 1 entry point: dispatches to cancel_lock() when DLM_LKF_CANCEL
   is set, otherwise unlock_lock().  The -DLM_EUNLOCK/-DLM_ECANCEL
   "errors" are the normal success indications and are mapped to 0. */

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	/* held for the whole operation; released at out: below */
	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	/* an overlapping cancel/force-unlock may find another op already
	   in progress; that is not an error for the caller */
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	/* drop the reference taken by find_lkb() */
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2282
2283/*
2284 * send/receive routines for remote operations and replies
2285 *
2286 * send_args
2287 * send_common
2288 * send_request receive_request
2289 * send_convert receive_convert
2290 * send_unlock receive_unlock
2291 * send_cancel receive_cancel
2292 * send_grant receive_grant
2293 * send_bast receive_bast
2294 * send_lookup receive_lookup
2295 * send_remove receive_remove
2296 *
2297 * send_common_reply
2298 * receive_request_reply send_request_reply
2299 * receive_convert_reply send_convert_reply
2300 * receive_unlock_reply send_unlock_reply
2301 * receive_cancel_reply send_cancel_reply
2302 * receive_lookup_reply send_lookup_reply
2303 */
2304
David Teigland7e4dac32007-04-02 09:06:41 -05002305static int _create_message(struct dlm_ls *ls, int mb_len,
2306 int to_nodeid, int mstype,
2307 struct dlm_message **ms_ret,
2308 struct dlm_mhandle **mh_ret)
2309{
2310 struct dlm_message *ms;
2311 struct dlm_mhandle *mh;
2312 char *mb;
2313
2314 /* get_buffer gives us a message handle (mh) that we need to
2315 pass into lowcomms_commit and a message buffer (mb) that we
2316 write our data into */
2317
2318 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2319 if (!mh)
2320 return -ENOBUFS;
2321
2322 memset(mb, 0, mb_len);
2323
2324 ms = (struct dlm_message *) mb;
2325
2326 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2327 ms->m_header.h_lockspace = ls->ls_global_id;
2328 ms->m_header.h_nodeid = dlm_our_nodeid();
2329 ms->m_header.h_length = mb_len;
2330 ms->m_header.h_cmd = DLM_MSG;
2331
2332 ms->m_type = mstype;
2333
2334 *mh_ret = mh;
2335 *ms_ret = ms;
2336 return 0;
2337}
2338
David Teiglande7fd4172006-01-18 09:30:29 +00002339static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2340 int to_nodeid, int mstype,
2341 struct dlm_message **ms_ret,
2342 struct dlm_mhandle **mh_ret)
2343{
David Teiglande7fd4172006-01-18 09:30:29 +00002344 int mb_len = sizeof(struct dlm_message);
2345
2346 switch (mstype) {
2347 case DLM_MSG_REQUEST:
2348 case DLM_MSG_LOOKUP:
2349 case DLM_MSG_REMOVE:
2350 mb_len += r->res_length;
2351 break;
2352 case DLM_MSG_CONVERT:
2353 case DLM_MSG_UNLOCK:
2354 case DLM_MSG_REQUEST_REPLY:
2355 case DLM_MSG_CONVERT_REPLY:
2356 case DLM_MSG_GRANT:
2357 if (lkb && lkb->lkb_lvbptr)
2358 mb_len += r->res_ls->ls_lvblen;
2359 break;
2360 }
2361
David Teigland7e4dac32007-04-02 09:06:41 -05002362 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2363 ms_ret, mh_ret);
David Teiglande7fd4172006-01-18 09:30:29 +00002364}
2365
2366/* further lowcomms enhancements or alternate implementations may make
2367 the return value from this function useful at some point */
2368
static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);
	/* always succeeds for now; see the comment above */
	return 0;
}
2375
/* copy the lkb's current state into an outgoing message; m_result and
   m_bastmode are deliberately left for the callers to fill in */

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid = lkb->lkb_nodeid;
	ms->m_pid = lkb->lkb_ownpid;
	ms->m_lkid = lkb->lkb_id;
	ms->m_remid = lkb->lkb_remid;
	ms->m_exflags = lkb->lkb_exflags;
	ms->m_sbflags = lkb->lkb_sbflags;
	ms->m_flags = lkb->lkb_flags;
	ms->m_lvbseq = lkb->lkb_lvbseq;
	ms->m_status = lkb->lkb_status;
	ms->m_grmode = lkb->lkb_grmode;
	ms->m_rqmode = lkb->lkb_rqmode;
	ms->m_hash = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	/* encode which ast callbacks the lock owner registered */
	if (lkb->lkb_bastaddr)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astaddr)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		/* m_extra carries the resource name for these types */
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		/* m_extra carries the lvb for these types, if present */
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
2419
/* common send path for request/convert/unlock/cancel: put the lkb on
   the waiters list (expecting a reply), then send the message to the
   master node.  Any failure removes the waiters entry again, keyed by
   the corresponding reply message type. */

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, mstype);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* undo the add_to_waiters() above */
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}
2447
2448static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2449{
2450 return send_common(r, lkb, DLM_MSG_REQUEST);
2451}
2452
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		/* pop the waiters entry ourselves and process a locally
		   built stub reply with a zero (success) result */
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
2470
2471/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2472 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2473 that the master is still correct. */
2474
2475static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2476{
2477 return send_common(r, lkb, DLM_MSG_UNLOCK);
2478}
2479
2480static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2481{
2482 return send_common(r, lkb, DLM_MSG_CANCEL);
2483}
2484
2485static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2486{
2487 struct dlm_message *ms;
2488 struct dlm_mhandle *mh;
2489 int to_nodeid, error;
2490
2491 to_nodeid = lkb->lkb_nodeid;
2492
2493 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2494 if (error)
2495 goto out;
2496
2497 send_args(r, lkb, ms);
2498
2499 ms->m_result = 0;
2500
2501 error = send_message(mh, ms);
2502 out:
2503 return error;
2504}
2505
2506static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2507{
2508 struct dlm_message *ms;
2509 struct dlm_mhandle *mh;
2510 int to_nodeid, error;
2511
2512 to_nodeid = lkb->lkb_nodeid;
2513
2514 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2515 if (error)
2516 goto out;
2517
2518 send_args(r, lkb, ms);
2519
2520 ms->m_bastmode = mode;
2521
2522 error = send_message(mh, ms);
2523 out:
2524 return error;
2525}
2526
/* ask the directory node who masters this resource; the lkb waits on
   the waiters list for the DLM_MSG_LOOKUP_REPLY */

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* undo the add_to_waiters() above */
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
2554
2555static int send_remove(struct dlm_rsb *r)
2556{
2557 struct dlm_message *ms;
2558 struct dlm_mhandle *mh;
2559 int to_nodeid, error;
2560
2561 to_nodeid = dlm_dir_nodeid(r);
2562
2563 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2564 if (error)
2565 goto out;
2566
2567 memcpy(ms->m_extra, r->res_name, r->res_length);
2568 ms->m_hash = r->res_hash;
2569
2570 error = send_message(mh, ms);
2571 out:
2572 return error;
2573}
2574
2575static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2576 int mstype, int rv)
2577{
2578 struct dlm_message *ms;
2579 struct dlm_mhandle *mh;
2580 int to_nodeid, error;
2581
2582 to_nodeid = lkb->lkb_nodeid;
2583
2584 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2585 if (error)
2586 goto out;
2587
2588 send_args(r, lkb, ms);
2589
2590 ms->m_result = rv;
2591
2592 error = send_message(mh, ms);
2593 out:
2594 return error;
2595}
2596
2597static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2598{
2599 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2600}
2601
2602static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2603{
2604 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2605}
2606
2607static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2608{
2609 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2610}
2611
2612static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2613{
2614 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2615}
2616
2617static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2618 int ret_nodeid, int rv)
2619{
2620 struct dlm_rsb *r = &ls->ls_stub_rsb;
2621 struct dlm_message *ms;
2622 struct dlm_mhandle *mh;
2623 int error, nodeid = ms_in->m_header.h_nodeid;
2624
2625 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2626 if (error)
2627 goto out;
2628
2629 ms->m_lkid = ms_in->m_lkid;
2630 ms->m_result = rv;
2631 ms->m_nodeid = ret_nodeid;
2632
2633 error = send_message(mh, ms);
2634 out:
2635 return error;
2636}
2637
2638/* which args we save from a received message depends heavily on the type
2639 of message, unlike the send side where we can safely send everything about
2640 the lkb for any type of message */
2641
2642static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2643{
2644 lkb->lkb_exflags = ms->m_exflags;
David Teigland6f90a8b12006-11-10 14:16:27 -06002645 lkb->lkb_sbflags = ms->m_sbflags;
David Teiglande7fd4172006-01-18 09:30:29 +00002646 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2647 (ms->m_flags & 0x0000FFFF);
2648}
2649
2650static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2651{
2652 lkb->lkb_sbflags = ms->m_sbflags;
2653 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2654 (ms->m_flags & 0x0000FFFF);
2655}
2656
2657static int receive_extralen(struct dlm_message *ms)
2658{
2659 return (ms->m_header.h_length - sizeof(struct dlm_message));
2660}
2661
David Teiglande7fd4172006-01-18 09:30:29 +00002662static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2663 struct dlm_message *ms)
2664{
2665 int len;
2666
2667 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2668 if (!lkb->lkb_lvbptr)
2669 lkb->lkb_lvbptr = allocate_lvb(ls);
2670 if (!lkb->lkb_lvbptr)
2671 return -ENOMEM;
2672 len = receive_extralen(ms);
2673 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2674 }
2675 return 0;
2676}
2677
2678static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2679 struct dlm_message *ms)
2680{
2681 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2682 lkb->lkb_ownpid = ms->m_pid;
2683 lkb->lkb_remid = ms->m_lkid;
2684 lkb->lkb_grmode = DLM_LOCK_IV;
2685 lkb->lkb_rqmode = ms->m_rqmode;
2686 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2687 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2688
2689 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2690
David Teigland8d07fd52006-12-13 10:39:20 -06002691 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2692 /* lkb was just created so there won't be an lvb yet */
2693 lkb->lkb_lvbptr = allocate_lvb(ls);
2694 if (!lkb->lkb_lvbptr)
2695 return -ENOMEM;
2696 }
David Teiglande7fd4172006-01-18 09:30:29 +00002697
2698 return 0;
2699}
2700
/* validate a convert message against the master-copy lkb and copy in
   the convert-specific fields; each check returns a distinct error */

static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	/* the convert must come from the node that owns this lkb */
	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
			  lkb->lkb_id, lkb->lkb_remid);
		return -EINVAL;
	}

	if (!is_master_copy(lkb))
		return -EINVAL;

	/* only a granted lock can be converted */
	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}
2725
2726static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2727 struct dlm_message *ms)
2728{
2729 if (!is_master_copy(lkb))
2730 return -EINVAL;
2731 if (receive_lvb(ls, lkb, ms))
2732 return -ENOMEM;
2733 return 0;
2734}
2735
2736/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
2737 uses to send a reply and that the remote end uses to process the reply. */
2738
2739static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
2740{
2741 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
2742 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2743 lkb->lkb_remid = ms->m_lkid;
2744}
2745
/* a lock request arrived from another node: build a master-copy lkb,
   find/create the rsb, run do_request() and reply with the result.
   Setup failures reply via the stub lkb so the sender still hears back. */

static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		/* free the half-built lkb before replying */
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	/* a queued (-EINPROGRESS) or granted (0) lkb stays on the rsb;
	   any real error drops our reference */
	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2791
/* a convert arrived for a lock we master: validate, run do_convert()
   and reply (unless it's a down-conversion, which sends no reply --
   see send_convert()) */

static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out;
	/* down conversions are not replied to; the sender fakes the
	   reply locally */
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out:
	if (reply)
		send_convert_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	/* no lkb found; reply via the stub lkb */
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2827
/* an unlock arrived for a lock we master: validate, run do_unlock()
   and send the result back */

static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out;

	error = do_unlock(r, lkb);
 out:
	send_unlock_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	/* no lkb found; reply via the stub lkb */
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2861
/* a cancel arrived for a lock we master: run do_cancel() and send the
   result back */

static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	/* no lkb found; reply via the stub lkb */
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
2891
2892static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
2893{
2894 struct dlm_lkb *lkb;
2895 struct dlm_rsb *r;
2896 int error;
2897
2898 error = find_lkb(ls, ms->m_remid, &lkb);
2899 if (error) {
2900 log_error(ls, "receive_grant no lkb");
2901 return;
2902 }
2903 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2904
2905 r = lkb->lkb_resource;
2906
2907 hold_rsb(r);
2908 lock_rsb(r);
2909
2910 receive_flags_reply(lkb, ms);
2911 grant_lock_pc(r, lkb, ms);
2912 queue_cast(r, lkb, 0);
2913
2914 unlock_rsb(r);
2915 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05002916 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002917}
2918
2919static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
2920{
2921 struct dlm_lkb *lkb;
2922 struct dlm_rsb *r;
2923 int error;
2924
2925 error = find_lkb(ls, ms->m_remid, &lkb);
2926 if (error) {
2927 log_error(ls, "receive_bast no lkb");
2928 return;
2929 }
2930 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
2931
2932 r = lkb->lkb_resource;
2933
2934 hold_rsb(r);
2935 lock_rsb(r);
2936
2937 queue_bast(r, lkb, ms->m_bastmode);
2938
2939 unlock_rsb(r);
2940 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05002941 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002942}
2943
2944static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
2945{
2946 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
2947
2948 from_nodeid = ms->m_header.h_nodeid;
2949 our_nodeid = dlm_our_nodeid();
2950
2951 len = receive_extralen(ms);
2952
2953 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2954 if (dir_nodeid != our_nodeid) {
2955 log_error(ls, "lookup dir_nodeid %d from %d",
2956 dir_nodeid, from_nodeid);
2957 error = -EINVAL;
2958 ret_nodeid = -1;
2959 goto out;
2960 }
2961
2962 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
2963
2964 /* Optimization: we're master so treat lookup as a request */
2965 if (!error && ret_nodeid == our_nodeid) {
2966 receive_request(ls, ms);
2967 return;
2968 }
2969 out:
2970 send_lookup_reply(ls, ms, ret_nodeid, error);
2971}
2972
2973static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
2974{
2975 int len, dir_nodeid, from_nodeid;
2976
2977 from_nodeid = ms->m_header.h_nodeid;
2978
2979 len = receive_extralen(ms);
2980
2981 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
2982 if (dir_nodeid != dlm_our_nodeid()) {
2983 log_error(ls, "remove dir entry dir_nodeid %d from %d",
2984 dir_nodeid, from_nodeid);
2985 return;
2986 }
2987
2988 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
2989}
2990
static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
{
	/* m_nodeid/m_pid identify whose locks should be purged */
	do_purge(ls, ms->m_nodeid, ms->m_pid);
}
2995
/* the master replied to our request (or to a lookup it short-circuited
   into a request); act on the result, then resolve any unlock/cancel
   that overlapped with the request while we were waiting */

static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	/* save the wait type before the waiters entry is removed */
	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (result)
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
		else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			/* retry: redo the master lookup and request */
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	/* an unlock or cancel overlapped this request while it was in
	   flight; issue it now that we know the request's fate, and in
	   every case clear both overlap flags */
	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3091
/* Apply a convert reply to our process-copy lkb.  Caller holds a
   reference on the rsb and has it locked (lock_rsb). */

static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master; mirror that locally by
		   moving the lkb from its current queue to the convert queue */
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}
3120
/* Remove the lkb from the waiters list for this reply, then apply the
   convert reply.  Works for both real replies and the stub replies faked
   during recovery (see recover_convert_waiter). */

static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3139
3140static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3141{
3142 struct dlm_lkb *lkb;
3143 int error;
3144
3145 error = find_lkb(ls, ms->m_remid, &lkb);
3146 if (error) {
3147 log_error(ls, "receive_convert_reply no lkb");
3148 return;
3149 }
3150 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3151
David Teiglande7fd4172006-01-18 09:30:29 +00003152 _receive_convert_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003153 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003154}
3155
/* Remove the lkb from the waiters list for this reply, then apply the
   unlock reply.  Also used with the stub reply faked in
   dlm_recover_waiters_pre(). */

static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		/* ignored: master had no matching lock to unlock
		   (NOTE(review): presumably the overlap/recovery case --
		   confirm against do_unlock on the master) */
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3187
3188static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3189{
3190 struct dlm_lkb *lkb;
3191 int error;
3192
3193 error = find_lkb(ls, ms->m_remid, &lkb);
3194 if (error) {
3195 log_error(ls, "receive_unlock_reply no lkb");
3196 return;
3197 }
3198 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3199
David Teiglande7fd4172006-01-18 09:30:29 +00003200 _receive_unlock_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003201 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003202}
3203
3204static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3205{
3206 struct dlm_rsb *r = lkb->lkb_resource;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003207 int error;
David Teiglande7fd4172006-01-18 09:30:29 +00003208
3209 hold_rsb(r);
3210 lock_rsb(r);
3211
David Teiglandef0c2bb2007-03-28 09:56:46 -05003212 /* stub reply can happen with waiters_mutex held */
3213 error = remove_from_waiters_ms(lkb, ms);
3214 if (error)
3215 goto out;
3216
David Teiglande7fd4172006-01-18 09:30:29 +00003217 /* this is the value returned from do_cancel() on the master */
3218
David Teiglandef0c2bb2007-03-28 09:56:46 -05003219 switch (ms->m_result) {
David Teiglande7fd4172006-01-18 09:30:29 +00003220 case -DLM_ECANCEL:
3221 receive_flags_reply(lkb, ms);
3222 revert_lock_pc(r, lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05003223 if (ms->m_result)
3224 queue_cast(r, lkb, -DLM_ECANCEL);
3225 break;
3226 case 0:
David Teiglande7fd4172006-01-18 09:30:29 +00003227 break;
3228 default:
David Teiglandef0c2bb2007-03-28 09:56:46 -05003229 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3230 lkb->lkb_id, ms->m_result);
David Teiglande7fd4172006-01-18 09:30:29 +00003231 }
David Teiglandef0c2bb2007-03-28 09:56:46 -05003232 out:
David Teiglande7fd4172006-01-18 09:30:29 +00003233 unlock_rsb(r);
3234 put_rsb(r);
3235}
3236
3237static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3238{
3239 struct dlm_lkb *lkb;
3240 int error;
3241
3242 error = find_lkb(ls, ms->m_remid, &lkb);
3243 if (error) {
3244 log_error(ls, "receive_cancel_reply no lkb");
3245 return;
3246 }
3247 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3248
David Teiglande7fd4172006-01-18 09:30:29 +00003249 _receive_cancel_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003250 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003251}
3252
/* Handle a DLM_MSG_LOOKUP_REPLY from the directory node: record the
   master nodeid it returned on the rsb, then either complete an overlap
   unlock/cancel or resend the original request to the master. */

static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		/* we are the master: local nodeid convention is 0 */
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		/* an unlock/cancel overlapped the lookup: complete it
		   instead of sending the request to the master */
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	/* if we became the master, process other lkbs that were waiting
	   on this same lookup */
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3304
/* Top-level dispatch for one incoming dlm message.
 *
 * hd:       received buffer, interpreted as a dlm_message
 * nodeid:   sending node
 * recovery: nonzero when dlm_recoverd calls this to drain messages saved
 *           in the requestqueue (those were already byte-swapped)
 *
 * Returns 0 on normal processing, -EINVAL for an unknown lockspace, and
 * -EINTR when the message was deferred to the requestqueue or the drain
 * was interrupted by a new recovery.
 */

int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
	struct dlm_message *ms = (struct dlm_message *) hd;
	struct dlm_ls *ls;
	int error = 0;

	/* byte-swap only on first receipt, not when replayed from the
	   requestqueue */
	if (!recovery)
		dlm_message_in(ms);

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("drop message %d from %d for unknown lockspace %d",
			  ms->m_type, nodeid, hd->h_lockspace);
		return -EINVAL;
	}

	/* recovery may have just ended leaving a bunch of backed-up requests
	   in the requestqueue; wait while dlm_recoverd clears them */

	if (!recovery)
		dlm_wait_requestqueue(ls);

	/* recovery may have just started while there were a bunch of
	   in-flight requests -- save them in requestqueue to be processed
	   after recovery. we can't let dlm_recvd block on the recovery
	   lock. if dlm_recoverd is calling this function to clear the
	   requestqueue, it needs to be interrupted (-EINTR) if another
	   recovery operation is starting. */

	while (1) {
		if (dlm_locking_stopped(ls)) {
			if (recovery) {
				error = -EINTR;
				goto out;
			}
			error = dlm_add_requestqueue(ls, nodeid, hd);
			if (error == -EAGAIN)
				continue;
			else {
				/* message was queued for post-recovery
				   processing; report -EINTR regardless */
				error = -EINTR;
				goto out;
			}
		}

		if (lock_recovery_try(ls))
			break;
		schedule();
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	unlock_recovery(ls);
 out:
	dlm_put_lockspace(ls);
	dlm_astd_wake();
	return error;
}
3434
3435
3436/*
3437 * Recovery related
3438 */
3439
/* Recover an lkb waiting on a convert reply from a dead master.  A
   PR<->CW (middle) conversion is completed locally with a faked
   -EINPROGRESS stub reply; other up-conversions are flagged for resend
   after recovery. */

static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		/* fake a convert reply from the (dead) master */
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
3461
3462/* A waiting lkb needs recovery if the master node has failed, or
3463 the master node is changing (only when no directory is used) */
3464
3465static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3466{
3467 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3468 return 1;
3469
3470 if (!dlm_no_directory(ls))
3471 return 0;
3472
3473 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3474 return 1;
3475
3476 return 0;
3477}
3478
3479/* Recovery for locks that are waiting for replies from nodes that are now
3480 gone. We can just complete unlocks and cancels by faking a reply from the
3481 dead node. Requests and up-conversions we flag to be resent after
3482 recovery. Down-conversions can just be completed with a fake reply like
3483 unlocks. Conversions between PR and CW need special attention. */
3484
/* Walk the waiters list at the start of recovery: fake replies from dead
   nodes for unlocks/cancels (and middle conversions), and flag requests,
   lookups and up-conversions for resend after recovery. */

void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination  will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			/* complete the unlock with a faked reply from the
			   dead master */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			/* complete the cancel with a faked reply from the
			   dead master */
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
		}
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
3542
David Teiglandef0c2bb2007-03-28 09:56:46 -05003543static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00003544{
3545 struct dlm_lkb *lkb;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003546 int found = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003547
David Teigland90135922006-01-20 08:47:07 +00003548 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003549 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3550 if (lkb->lkb_flags & DLM_IFL_RESEND) {
David Teiglandef0c2bb2007-03-28 09:56:46 -05003551 hold_lkb(lkb);
3552 found = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00003553 break;
3554 }
3555 }
David Teigland90135922006-01-20 08:47:07 +00003556 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003557
David Teiglandef0c2bb2007-03-28 09:56:46 -05003558 if (!found)
David Teiglande7fd4172006-01-18 09:30:29 +00003559 lkb = NULL;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003560 return lkb;
David Teiglande7fd4172006-01-18 09:30:29 +00003561}
3562
3563/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3564 master or dir-node for r. Processing the lkb may result in it being placed
3565 back on waiters. */
3566
David Teiglandef0c2bb2007-03-28 09:56:46 -05003567/* We do this after normal locking has been enabled and any saved messages
3568 (in requestqueue) have been processed. We should be confident that at
3569 this point we won't get or process a reply to any of these waiting
3570 operations. But, new ops may be coming in on the rsbs/locks here from
3571 userspace or remotely. */
3572
3573/* there may have been an overlap unlock/cancel prior to recovery or after
3574 recovery. if before, the lkb may still have a pos wait_count; if after, the
3575 overlap flag would just have been set and nothing new sent. we can be
3576 confident here than any replies to either the initial op or overlap ops
3577 prior to recovery have been received. */
3578
/* Resend (or resolve as unlock/cancel) every waiter flagged RESEND by
   dlm_recover_waiters_pre().  Returns 0, or -EINTR if another recovery
   starts while we are working. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		/* capture state before clearing it below */
		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					/* overlap unlock on a convert:
					   force-unlock the lock locally */
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb);
	}

	return error;
}
3668
/* Remove from one rsb queue every lkb for which test() is true, freeing
   each one; marks the rsb RSB_LOCKS_PURGED so dlm_grant_after_purge()
   will revisit it.  Caller holds the rsb lock. */

static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
			int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
{
	struct dlm_ls *ls = r->res_ls;
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
		if (test(ls, lkb)) {
			rsb_set_flag(r, RSB_LOCKS_PURGED);
			del_lkb(r, lkb);
			/* this put should free the lkb */
			if (!dlm_put_lkb(lkb))
				log_error(ls, "purged lkb not released");
		}
	}
}
3685
3686static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3687{
3688 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3689}
3690
/* purge_queue() test: any master-copy lkb.  ls is unused but required
   by the test() callback signature. */
static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
3695
3696static void purge_dead_locks(struct dlm_rsb *r)
3697{
3698 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3699 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3700 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3701}
3702
3703void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3704{
3705 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3706 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3707 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3708}
3709
3710/* Get rid of locks held by nodes that are gone. */
3711
/* Get rid of locks held by nodes that are gone: walk every root rsb we
   master and purge dead master-copy locks.  Always returns 0. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		/* be kind to the scheduler on long root lists */
		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
3733
David Teigland97a35d12006-05-02 13:34:03 -04003734static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
3735{
3736 struct dlm_rsb *r, *r_ret = NULL;
3737
3738 read_lock(&ls->ls_rsbtbl[bucket].lock);
3739 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
3740 if (!rsb_flag(r, RSB_LOCKS_PURGED))
3741 continue;
3742 hold_rsb(r);
3743 rsb_clear_flag(r, RSB_LOCKS_PURGED);
3744 r_ret = r;
3745 break;
3746 }
3747 read_unlock(&ls->ls_rsbtbl[bucket].lock);
3748 return r_ret;
3749}
3750
/* After purging, scan every hash bucket for rsbs flagged
   RSB_LOCKS_PURGED and grant any locks that can now be granted. */

void dlm_grant_after_purge(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	int bucket = 0;

	while (1) {
		r = find_purged_rsb(ls, bucket);
		if (!r) {
			/* bucket exhausted; move to the next one */
			if (bucket == ls->ls_rsbtbl_size - 1)
				break;
			bucket++;
			continue;
		}
		lock_rsb(r);
		if (is_master(r)) {
			grant_pending_locks(r);
			confirm_master(r, 0);
		}
		unlock_rsb(r);
		put_rsb(r);
		schedule();
	}
}
3774
3775static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
3776 uint32_t remid)
3777{
3778 struct dlm_lkb *lkb;
3779
3780 list_for_each_entry(lkb, head, lkb_statequeue) {
3781 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
3782 return lkb;
3783 }
3784 return NULL;
3785}
3786
3787static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
3788 uint32_t remid)
3789{
3790 struct dlm_lkb *lkb;
3791
3792 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
3793 if (lkb)
3794 return lkb;
3795 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
3796 if (lkb)
3797 return lkb;
3798 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
3799 if (lkb)
3800 return lkb;
3801 return NULL;
3802}
3803
/* Populate a newly created master-copy lkb from the rcom_lock data sent
   by the lock holder during recovery.  Returns 0 or -ENOMEM. */

static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				  struct dlm_rsb *r, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	int lvblen;

	lkb->lkb_nodeid = rc->rc_header.h_nodeid;
	lkb->lkb_ownpid = rl->rl_ownpid;
	lkb->lkb_remid = rl->rl_lkid;
	lkb->lkb_exflags = rl->rl_exflags;
	/* only the low 16 flag bits travel between nodes */
	lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	lkb->lkb_lvbseq = rl->rl_lvbseq;
	lkb->lkb_rqmode = rl->rl_rqmode;
	lkb->lkb_grmode = rl->rl_grmode;
	/* don't set lkb_status because add_lkb wants to itself */

	/* nonzero placeholders recording whether the holder registered
	   blocking/completion asts */
	lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
		/* lvb bytes are whatever trails the fixed rcom_lock struct */
		lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
			 sizeof(struct rcom_lock);
		memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
	}

	/* Conversions between PR and CW (middle modes) need special handling.
	   The real granted mode of these converting locks cannot be determined
	   until all locks have been rebuilt on the rsb (recover_conversion) */

	if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
		rl->rl_status = DLM_LKSTS_CONVERT;
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(r, RSB_RECOVER_CONVERT);
	}

	return 0;
}
3845
3846/* This lkb may have been recovered in a previous aborted recovery so we need
3847 to check if the rsb already has an lkb with the given remote nodeid/lkid.
3848 If so we just send back a standard reply. If not, we create a new lkb with
3849 the given values and send back our lkid. We send back our lkid by sending
3850 back the rcom_lock struct we got but with the remid field filled in. */
3851
/* As new master, rebuild one lock sent to us in an rcom_lock during
   recovery.  Reuses an existing lkb if a previous aborted recovery
   already created one (-EEXIST); otherwise creates and queues a new
   master-copy lkb.  Our lkid is returned to the holder in rl_remid, the
   outcome in rl_result. */

int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	if (rl->rl_parent_lkid) {
		/* parent/child locks are not supported */
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
	if (lkb) {
		/* recovered previously; just report our lkid again */
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = lkb->lkb_id;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
	rl->rl_result = error;
	return error;
}
3904
/* Handle the new master's reply (rcom_lock with rl_result/rl_remid set)
   to a lock we sent it during recovery: record the master's lkid, or
   resend if the master wasn't ready (-EBADR).  Returns 0, or an error
   when the lkid can't be found. */

int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, rl->rl_lkid, &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	error = rl->rl_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = rl->rl_remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
3956
/* Acquire a new lock on behalf of a userspace process.
 *
 * ua ownership: on any error path before ua is attached to the lkb, ua
 * is freed here; once attached, free_lkb() frees it.  Returns 0 when
 * the request was granted or queued (-EINPROGRESS is mapped to 0), or
 * a negative error. */

int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     uint32_t parent_lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, parent_lkid,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	lkb->lkb_flags |= DLM_IFL_USER;
	ua->old_mode = DLM_LOCK_IV;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		/* queued on the master; success from the user's view */
		error = 0;
		break;
	case -EAGAIN:
		/* noqueue request couldn't be granted; ast delivers the
		   result, so report success but drop our reference */
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb);
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	unlock_recovery(ls);
	return error;
}
4022
/* Convert the mode of an existing userspace lock identified by lkid.
   ua_tmp carries parameters copied from userspace for this request and
   is always freed here; the persistent dlm_user_args lives in
   lkb->lkb_astparam. */

int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	/* refresh the callback info with the values from this request */
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, 0, DLM_FAKE_USER_AST,
			      ua, DLM_FAKE_USER_AST, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	/* an async result is reported to userspace via the completion ast */
	if (error == -EINPROGRESS || error == -EAGAIN)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4075
/* Unlock a userspace lock identified by lkid.  On success the lkb is
   moved from the proc's locks list to its unlocking list (unless the
   ast delivery already removed it).  ua_tmp is always freed here. */

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	/* user may pass a final lvb value to write back on unlock */
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	/* -DLM_EUNLOCK means the unlock completed */
	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4123
/* Cancel an in-progress request or conversion for a userspace lock.
   ua_tmp carries this request's castparam/lksb and is always freed
   here. */

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* refresh the callback info delivered with the cancel ast */
	ua = (struct dlm_user_args *)lkb->lkb_astparam;
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	/* -DLM_ECANCEL means the cancel succeeded */
	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4160
David Teiglandef0c2bb2007-03-28 09:56:46 -05004161/* lkb's that are removed from the waiters list by revert are just left on the
4162 orphans list with the granted orphan locks, to be freed by purge */
4163
David Teigland597d0ca2006-07-12 16:44:04 -05004164static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4165{
4166 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
David Teiglandef0c2bb2007-03-28 09:56:46 -05004167 struct dlm_args args;
4168 int error;
David Teigland597d0ca2006-07-12 16:44:04 -05004169
David Teiglandef0c2bb2007-03-28 09:56:46 -05004170 hold_lkb(lkb);
4171 mutex_lock(&ls->ls_orphans_mutex);
4172 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4173 mutex_unlock(&ls->ls_orphans_mutex);
David Teigland597d0ca2006-07-12 16:44:04 -05004174
David Teiglandef0c2bb2007-03-28 09:56:46 -05004175 set_unlock_args(0, ua, &args);
4176
4177 error = cancel_lock(ls, lkb, &args);
4178 if (error == -DLM_ECANCEL)
4179 error = 0;
4180 return error;
David Teigland597d0ca2006-07-12 16:44:04 -05004181}
4182
4183/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4184 Regardless of what rsb queue the lock is on, it's removed and freed. */
4185
4186static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4187{
4188 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4189 struct dlm_args args;
4190 int error;
4191
David Teigland597d0ca2006-07-12 16:44:04 -05004192 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4193
4194 error = unlock_lock(ls, lkb, &args);
4195 if (error == -DLM_EUNLOCK)
4196 error = 0;
4197 return error;
4198}
4199
David Teiglandef0c2bb2007-03-28 09:56:46 -05004200/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4201 (which does lock_rsb) due to deadlock with receiving a message that does
4202 lock_rsb followed by dlm_user_add_ast() */
4203
4204static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4205 struct dlm_user_proc *proc)
4206{
4207 struct dlm_lkb *lkb = NULL;
4208
4209 mutex_lock(&ls->ls_clear_proc_locks);
4210 if (list_empty(&proc->locks))
4211 goto out;
4212
4213 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4214 list_del_init(&lkb->lkb_ownqueue);
4215
4216 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4217 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4218 else
4219 lkb->lkb_flags |= DLM_IFL_DEAD;
4220 out:
4221 mutex_unlock(&ls->ls_clear_proc_locks);
4222 return lkb;
4223}
4224
David Teigland597d0ca2006-07-12 16:44:04 -05004225/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4226 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4227 which we clear here. */
4228
4229/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4230 list, and no more device_writes should add lkb's to proc->locks list; so we
4231 shouldn't need to take asts_spin or locks_spin here. this assumes that
4232 device reads/writes/closes are serialized -- FIXME: we may need to serialize
4233 them ourself. */
4234
void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	lock_recovery(ls);

	/* drain proc->locks one lkb at a time (del_proc_lock takes and
	   drops the clear_proc_locks mutex per iteration, so the
	   unlock/orphan calls below run without it held -- see the
	   deadlock note above del_proc_lock) */
	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	/* drop ast notifications still queued for this proc */
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	unlock_recovery(ls);
}
David Teiglanda1bc86e2007-01-15 10:34:52 -06004274
/* Free all of a proc's locks on an explicit purge request.  Unlike
   dlm_clear_proc_locks() this uses proc->locks_spin, since the proc is
   still live and other device reads/writes may race with us. */

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	/* pop one lkb at a time so unlock_proc_lock() (which takes
	   lock_rsb) is called without locks_spin held */
	while (1) {
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	/* in-progress unlocks */
	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	/* drop ast notifications still queued for this proc */
	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
4312
4313/* pid of 0 means purge all orphans */
4314
4315static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4316{
4317 struct dlm_lkb *lkb, *safe;
4318
4319 mutex_lock(&ls->ls_orphans_mutex);
4320 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4321 if (pid && lkb->lkb_ownpid != pid)
4322 continue;
4323 unlock_proc_lock(ls, lkb);
4324 list_del_init(&lkb->lkb_ownqueue);
4325 dlm_put_lkb(lkb);
4326 }
4327 mutex_unlock(&ls->ls_orphans_mutex);
4328}
4329
4330static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4331{
4332 struct dlm_message *ms;
4333 struct dlm_mhandle *mh;
4334 int error;
4335
4336 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4337 DLM_MSG_PURGE, &ms, &mh);
4338 if (error)
4339 return error;
4340 ms->m_nodeid = nodeid;
4341 ms->m_pid = pid;
4342
4343 return send_message(mh, ms);
4344}
4345
4346int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4347 int nodeid, int pid)
4348{
4349 int error = 0;
4350
4351 if (nodeid != dlm_our_nodeid()) {
4352 error = send_purge(ls, nodeid, pid);
4353 } else {
4354 lock_recovery(ls);
4355 if (pid == current->pid)
4356 purge_proc_locks(ls, proc);
4357 else
4358 do_purge(ls, nodeid, pid);
4359 unlock_recovery(ls);
4360 }
4361 return error;
4362}
4363