blob: 2f8a5a700cc0c81bc02cb0a78fb74aef18368748 [file] [log] [blame]
David Teiglande7fd4172006-01-18 09:30:29 +00001/******************************************************************************
2*******************************************************************************
3**
David Teiglandef0c2bb2007-03-28 09:56:46 -05004** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
David Teiglande7fd4172006-01-18 09:30:29 +00005**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
David Teigland597d0ca2006-07-12 16:44:04 -050058#include <linux/types.h>
David Teiglande7fd4172006-01-18 09:30:29 +000059#include "dlm_internal.h"
David Teigland597d0ca2006-07-12 16:44:04 -050060#include <linux/dlm_device.h>
David Teiglande7fd4172006-01-18 09:30:29 +000061#include "memory.h"
62#include "lowcomms.h"
63#include "requestqueue.h"
64#include "util.h"
65#include "dir.h"
66#include "member.h"
67#include "lockspace.h"
68#include "ast.h"
69#include "lock.h"
70#include "rcom.h"
71#include "recover.h"
72#include "lvb_table.h"
David Teigland597d0ca2006-07-12 16:44:04 -050073#include "user.h"
David Teiglande7fd4172006-01-18 09:30:29 +000074#include "config.h"
75
/*
 * Forward declarations for helpers that are used before they are defined.
 */

/* outgoing messages */
static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int send_remove(struct dlm_rsb *r);

/* lock operations and reply processing */
static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms);
static int receive_extralen(struct dlm_message *ms);
static void do_purge(struct dlm_ls *ls, int nodeid, int pid);

/* lock timeouts */
static void del_timeout(struct dlm_lkb *lkb);
void dlm_timeout_warn(struct dlm_lkb *lkb);
David Teiglande7fd4172006-01-18 09:30:29 +000092
/*
 * Lock compatibility matrix - thanks Steve
 *
 * UN = Unlocked state.  Not really a state, used as a flag.
 * PD = Padding.  Used to make the matrix a nice power of two in size.
 * Other states are the same as the VMS DLM.
 *
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 * A 1 means the two modes are compatible, a 0 means they conflict.
 */

static const int __dlm_compat_matrix[8][8] = {
	/*         UN NL CR CW PR PW EX PD */
	[0] = {    1, 1, 1, 1, 1, 1, 1, 0 },	/* UN */
	[1] = {    1, 1, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = {    1, 1, 1, 1, 1, 1, 0, 0 },	/* CR */
	[3] = {    1, 1, 1, 1, 0, 0, 0, 0 },	/* CW */
	[4] = {    1, 1, 1, 0, 1, 0, 0, 0 },	/* PR */
	[5] = {    1, 1, 1, 0, 0, 0, 0, 0 },	/* PW */
	[6] = {    1, 1, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = {    0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
112
/*
 * Direction of transfer of LVB data, indexed by lock mode.
 * Granted mode is the row; requested mode is the column.
 *
 * Usage: matrix[grmode+1][rqmode+1]
 *    1 = LVB is returned to the caller
 *    0 = LVB is written to the resource
 *   -1 = nothing happens to the LVB
 */

const int dlm_lvb_operations[8][8] = {
	/*         UN  NL  CR  CW  PR  PW  EX  PD */
	[0] = {    -1,  1,  1,  1,  1,  1,  1, -1 },	/* UN */
	[1] = {    -1,  1,  1,  1,  1,  1,  1,  0 },	/* NL */
	[2] = {    -1, -1,  1,  1,  1,  1,  1,  0 },	/* CR */
	[3] = {    -1, -1, -1,  1,  1,  1,  1,  0 },	/* CW */
	[4] = {    -1, -1, -1, -1,  1,  1,  1,  0 },	/* PR */
	[5] = {    -1,  0,  0,  0,  0,  0,  1,  0 },	/* PW */
	[6] = {    -1,  0,  0,  0,  0,  0,  0,  0 },	/* EX */
	[7] = {    -1,  0,  0,  0,  0,  0,  0,  0 },	/* PD */
};
David Teiglande7fd4172006-01-18 09:30:29 +0000133
134#define modes_compat(gr, rq) \
135 __dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137int dlm_modes_compat(int mode1, int mode2)
138{
139 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140}
141
/*
 * Compatibility matrix for conversions with QUECVT set.
 * Granted mode is the row; requested mode is the column.
 *
 * Usage: matrix[grmode+1][rqmode+1]
 */

static const int __quecvt_compat_matrix[8][8] = {
	/*         UN NL CR CW PR PW EX PD */
	[0] = {    0, 0, 0, 0, 0, 0, 0, 0 },	/* UN */
	[1] = {    0, 0, 1, 1, 1, 1, 1, 0 },	/* NL */
	[2] = {    0, 0, 0, 1, 1, 1, 1, 0 },	/* CR */
	[3] = {    0, 0, 0, 0, 1, 1, 1, 0 },	/* CW */
	[4] = {    0, 0, 0, 1, 0, 1, 1, 0 },	/* PR */
	[5] = {    0, 0, 0, 0, 0, 0, 1, 0 },	/* PW */
	[6] = {    0, 0, 0, 0, 0, 0, 0, 0 },	/* EX */
	[7] = {    0, 0, 0, 0, 0, 0, 0, 0 },	/* PD */
};
159
David Teigland597d0ca2006-07-12 16:44:04 -0500160void dlm_print_lkb(struct dlm_lkb *lkb)
David Teiglande7fd4172006-01-18 09:30:29 +0000161{
162 printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
163 " status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
164 lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
165 lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
166 lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
167}
168
169void dlm_print_rsb(struct dlm_rsb *r)
170{
171 printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
172 r->res_nodeid, r->res_flags, r->res_first_lkid,
173 r->res_recover_locks_count, r->res_name);
174}
175
David Teiglanda345da32006-08-18 11:54:25 -0500176void dlm_dump_rsb(struct dlm_rsb *r)
177{
178 struct dlm_lkb *lkb;
179
180 dlm_print_rsb(r);
181
182 printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
183 list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
184 printk(KERN_ERR "rsb lookup list\n");
185 list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
186 dlm_print_lkb(lkb);
187 printk(KERN_ERR "rsb grant queue:\n");
188 list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
189 dlm_print_lkb(lkb);
190 printk(KERN_ERR "rsb convert queue:\n");
191 list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
192 dlm_print_lkb(lkb);
193 printk(KERN_ERR "rsb wait queue:\n");
194 list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
195 dlm_print_lkb(lkb);
196}
197
David Teiglande7fd4172006-01-18 09:30:29 +0000198/* Threads cannot use the lockspace while it's being recovered */
199
David Teigland85e86ed2007-05-18 08:58:15 -0500200static inline void dlm_lock_recovery(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +0000201{
202 down_read(&ls->ls_in_recovery);
203}
204
David Teigland85e86ed2007-05-18 08:58:15 -0500205void dlm_unlock_recovery(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +0000206{
207 up_read(&ls->ls_in_recovery);
208}
209
David Teigland85e86ed2007-05-18 08:58:15 -0500210int dlm_lock_recovery_try(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +0000211{
212 return down_read_trylock(&ls->ls_in_recovery);
213}
214
215static inline int can_be_queued(struct dlm_lkb *lkb)
216{
217 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218}
219
220static inline int force_blocking_asts(struct dlm_lkb *lkb)
221{
222 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223}
224
225static inline int is_demoted(struct dlm_lkb *lkb)
226{
227 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228}
229
David Teigland7d3c1fe2007-04-19 10:30:41 -0500230static inline int is_altmode(struct dlm_lkb *lkb)
231{
232 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233}
234
235static inline int is_granted(struct dlm_lkb *lkb)
236{
237 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238}
239
David Teiglande7fd4172006-01-18 09:30:29 +0000240static inline int is_remote(struct dlm_rsb *r)
241{
242 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243 return !!r->res_nodeid;
244}
245
246static inline int is_process_copy(struct dlm_lkb *lkb)
247{
248 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249}
250
251static inline int is_master_copy(struct dlm_lkb *lkb)
252{
253 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
David Teigland90135922006-01-20 08:47:07 +0000255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000256}
257
258static inline int middle_conversion(struct dlm_lkb *lkb)
259{
260 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
David Teigland90135922006-01-20 08:47:07 +0000262 return 1;
263 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000264}
265
266static inline int down_conversion(struct dlm_lkb *lkb)
267{
268 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269}
270
David Teiglandef0c2bb2007-03-28 09:56:46 -0500271static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272{
273 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274}
275
276static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277{
278 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279}
280
281static inline int is_overlap(struct dlm_lkb *lkb)
282{
283 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284 DLM_IFL_OVERLAP_CANCEL));
285}
286
David Teiglande7fd4172006-01-18 09:30:29 +0000287static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
288{
289 if (is_master_copy(lkb))
290 return;
291
David Teigland3ae1acf2007-05-18 08:59:31 -0500292 del_timeout(lkb);
293
David Teiglande7fd4172006-01-18 09:30:29 +0000294 DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););
295
David Teigland3ae1acf2007-05-18 08:59:31 -0500296 /* if the operation was a cancel, then return -DLM_ECANCEL, if a
297 timeout caused the cancel then return -ETIMEDOUT */
298 if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
299 lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
300 rv = -ETIMEDOUT;
301 }
302
David Teiglande7fd4172006-01-18 09:30:29 +0000303 lkb->lkb_lksb->sb_status = rv;
304 lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;
305
306 dlm_add_ast(lkb, AST_COMP);
307}
308
David Teiglandef0c2bb2007-03-28 09:56:46 -0500309static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
310{
311 queue_cast(r, lkb,
312 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
313}
314
David Teiglande7fd4172006-01-18 09:30:29 +0000315static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
316{
317 if (is_master_copy(lkb))
318 send_bast(r, lkb, rqmode);
319 else {
320 lkb->lkb_bastmode = rqmode;
321 dlm_add_ast(lkb, AST_BAST);
322 }
323}
324
325/*
326 * Basic operations on rsb's and lkb's
327 */
328
329static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
330{
331 struct dlm_rsb *r;
332
333 r = allocate_rsb(ls, len);
334 if (!r)
335 return NULL;
336
337 r->res_ls = ls;
338 r->res_length = len;
339 memcpy(r->res_name, name, len);
David Teigland90135922006-01-20 08:47:07 +0000340 mutex_init(&r->res_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +0000341
342 INIT_LIST_HEAD(&r->res_lookup);
343 INIT_LIST_HEAD(&r->res_grantqueue);
344 INIT_LIST_HEAD(&r->res_convertqueue);
345 INIT_LIST_HEAD(&r->res_waitqueue);
346 INIT_LIST_HEAD(&r->res_root_list);
347 INIT_LIST_HEAD(&r->res_recover_list);
348
349 return r;
350}
351
352static int search_rsb_list(struct list_head *head, char *name, int len,
353 unsigned int flags, struct dlm_rsb **r_ret)
354{
355 struct dlm_rsb *r;
356 int error = 0;
357
358 list_for_each_entry(r, head, res_hashchain) {
359 if (len == r->res_length && !memcmp(name, r->res_name, len))
360 goto found;
361 }
David Teigland597d0ca2006-07-12 16:44:04 -0500362 return -EBADR;
David Teiglande7fd4172006-01-18 09:30:29 +0000363
364 found:
365 if (r->res_nodeid && (flags & R_MASTER))
366 error = -ENOTBLK;
367 *r_ret = r;
368 return error;
369}
370
371static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
372 unsigned int flags, struct dlm_rsb **r_ret)
373{
374 struct dlm_rsb *r;
375 int error;
376
377 error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
378 if (!error) {
379 kref_get(&r->res_ref);
380 goto out;
381 }
382 error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
383 if (error)
384 goto out;
385
386 list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);
387
388 if (dlm_no_directory(ls))
389 goto out;
390
391 if (r->res_nodeid == -1) {
392 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
393 r->res_first_lkid = 0;
394 } else if (r->res_nodeid > 0) {
395 rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
396 r->res_first_lkid = 0;
397 } else {
398 DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
399 DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
400 }
401 out:
402 *r_ret = r;
403 return error;
404}
405
406static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
407 unsigned int flags, struct dlm_rsb **r_ret)
408{
409 int error;
410 write_lock(&ls->ls_rsbtbl[b].lock);
411 error = _search_rsb(ls, name, len, b, flags, r_ret);
412 write_unlock(&ls->ls_rsbtbl[b].lock);
413 return error;
414}
415
416/*
417 * Find rsb in rsbtbl and potentially create/add one
418 *
419 * Delaying the release of rsb's has a similar benefit to applications keeping
420 * NL locks on an rsb, but without the guarantee that the cached master value
421 * will still be valid when the rsb is reused. Apps aren't always smart enough
422 * to keep NL locks on an rsb that they may lock again shortly; this can lead
423 * to excessive master lookups and removals if we don't delay the release.
424 *
425 * Searching for an rsb means looking through both the normal list and toss
426 * list. When found on the toss list the rsb is moved to the normal list with
427 * ref count of 1; when found on normal list the ref count is incremented.
428 */
429
430static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
431 unsigned int flags, struct dlm_rsb **r_ret)
432{
433 struct dlm_rsb *r, *tmp;
434 uint32_t hash, bucket;
435 int error = 0;
436
437 if (dlm_no_directory(ls))
438 flags |= R_CREATE;
439
440 hash = jhash(name, namelen, 0);
441 bucket = hash & (ls->ls_rsbtbl_size - 1);
442
443 error = search_rsb(ls, name, namelen, bucket, flags, &r);
444 if (!error)
445 goto out;
446
David Teigland597d0ca2006-07-12 16:44:04 -0500447 if (error == -EBADR && !(flags & R_CREATE))
David Teiglande7fd4172006-01-18 09:30:29 +0000448 goto out;
449
450 /* the rsb was found but wasn't a master copy */
451 if (error == -ENOTBLK)
452 goto out;
453
454 error = -ENOMEM;
455 r = create_rsb(ls, name, namelen);
456 if (!r)
457 goto out;
458
459 r->res_hash = hash;
460 r->res_bucket = bucket;
461 r->res_nodeid = -1;
462 kref_init(&r->res_ref);
463
464 /* With no directory, the master can be set immediately */
465 if (dlm_no_directory(ls)) {
466 int nodeid = dlm_dir_nodeid(r);
467 if (nodeid == dlm_our_nodeid())
468 nodeid = 0;
469 r->res_nodeid = nodeid;
470 }
471
472 write_lock(&ls->ls_rsbtbl[bucket].lock);
473 error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
474 if (!error) {
475 write_unlock(&ls->ls_rsbtbl[bucket].lock);
476 free_rsb(r);
477 r = tmp;
478 goto out;
479 }
480 list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
481 write_unlock(&ls->ls_rsbtbl[bucket].lock);
482 error = 0;
483 out:
484 *r_ret = r;
485 return error;
486}
487
/* Non-static entry point for find_rsb(); arguments are passed through. */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	int rv = find_rsb(ls, name, namelen, flags, r_ret);

	return rv;
}
493
494/* This is only called to add a reference when the code already holds
495 a valid reference to the rsb, so there's no need for locking. */
496
497static inline void hold_rsb(struct dlm_rsb *r)
498{
499 kref_get(&r->res_ref);
500}
501
/* Non-static entry point for hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
506
507static void toss_rsb(struct kref *kref)
508{
509 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
510 struct dlm_ls *ls = r->res_ls;
511
512 DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
513 kref_init(&r->res_ref);
514 list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
515 r->res_toss_time = jiffies;
516 if (r->res_lvbptr) {
517 free_lvb(r->res_lvbptr);
518 r->res_lvbptr = NULL;
519 }
520}
521
522/* When all references to the rsb are gone it's transfered to
523 the tossed list for later disposal. */
524
525static void put_rsb(struct dlm_rsb *r)
526{
527 struct dlm_ls *ls = r->res_ls;
528 uint32_t bucket = r->res_bucket;
529
530 write_lock(&ls->ls_rsbtbl[bucket].lock);
531 kref_put(&r->res_ref, toss_rsb);
532 write_unlock(&ls->ls_rsbtbl[bucket].lock);
533}
534
/* Non-static entry point for put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
539
540/* See comment for unhold_lkb */
541
542static void unhold_rsb(struct dlm_rsb *r)
543{
544 int rv;
545 rv = kref_put(&r->res_ref, toss_rsb);
David Teiglanda345da32006-08-18 11:54:25 -0500546 DLM_ASSERT(!rv, dlm_dump_rsb(r););
David Teiglande7fd4172006-01-18 09:30:29 +0000547}
548
549static void kill_rsb(struct kref *kref)
550{
551 struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
552
553 /* All work is done after the return from kref_put() so we
554 can release the write_lock before the remove and free. */
555
David Teiglanda345da32006-08-18 11:54:25 -0500556 DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
557 DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
558 DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
559 DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
560 DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
561 DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
David Teiglande7fd4172006-01-18 09:30:29 +0000562}
563
564/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565 The rsb must exist as long as any lkb's for it do. */
566
567static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
568{
569 hold_rsb(r);
570 lkb->lkb_resource = r;
571}
572
573static void detach_lkb(struct dlm_lkb *lkb)
574{
575 if (lkb->lkb_resource) {
576 put_rsb(lkb->lkb_resource);
577 lkb->lkb_resource = NULL;
578 }
579}
580
581static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
582{
583 struct dlm_lkb *lkb, *tmp;
584 uint32_t lkid = 0;
585 uint16_t bucket;
586
587 lkb = allocate_lkb(ls);
588 if (!lkb)
589 return -ENOMEM;
590
591 lkb->lkb_nodeid = -1;
592 lkb->lkb_grmode = DLM_LOCK_IV;
593 kref_init(&lkb->lkb_ref);
David Teigland34e22be2006-07-18 11:24:04 -0500594 INIT_LIST_HEAD(&lkb->lkb_ownqueue);
David Teiglandef0c2bb2007-03-28 09:56:46 -0500595 INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
David Teigland3ae1acf2007-05-18 08:59:31 -0500596 INIT_LIST_HEAD(&lkb->lkb_time_list);
David Teiglande7fd4172006-01-18 09:30:29 +0000597
598 get_random_bytes(&bucket, sizeof(bucket));
599 bucket &= (ls->ls_lkbtbl_size - 1);
600
601 write_lock(&ls->ls_lkbtbl[bucket].lock);
602
603 /* counter can roll over so we must verify lkid is not in use */
604
605 while (lkid == 0) {
David Teiglandce03f122007-04-02 12:12:55 -0500606 lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;
David Teiglande7fd4172006-01-18 09:30:29 +0000607
608 list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
609 lkb_idtbl_list) {
610 if (tmp->lkb_id != lkid)
611 continue;
612 lkid = 0;
613 break;
614 }
615 }
616
617 lkb->lkb_id = lkid;
618 list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
619 write_unlock(&ls->ls_lkbtbl[bucket].lock);
620
621 *lkb_ret = lkb;
622 return 0;
623}
624
625static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
626{
David Teiglande7fd4172006-01-18 09:30:29 +0000627 struct dlm_lkb *lkb;
David Teiglandce03f122007-04-02 12:12:55 -0500628 uint16_t bucket = (lkid >> 16);
David Teiglande7fd4172006-01-18 09:30:29 +0000629
630 list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
631 if (lkb->lkb_id == lkid)
632 return lkb;
633 }
634 return NULL;
635}
636
637static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
638{
639 struct dlm_lkb *lkb;
David Teiglandce03f122007-04-02 12:12:55 -0500640 uint16_t bucket = (lkid >> 16);
David Teiglande7fd4172006-01-18 09:30:29 +0000641
642 if (bucket >= ls->ls_lkbtbl_size)
643 return -EBADSLT;
644
645 read_lock(&ls->ls_lkbtbl[bucket].lock);
646 lkb = __find_lkb(ls, lkid);
647 if (lkb)
648 kref_get(&lkb->lkb_ref);
649 read_unlock(&ls->ls_lkbtbl[bucket].lock);
650
651 *lkb_ret = lkb;
652 return lkb ? 0 : -ENOENT;
653}
654
655static void kill_lkb(struct kref *kref)
656{
657 struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);
658
659 /* All work is done after the return from kref_put() so we
660 can release the write_lock before the detach_lkb */
661
662 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
663}
664
David Teiglandb3f58d82006-02-28 11:16:37 -0500665/* __put_lkb() is used when an lkb may not have an rsb attached to
666 it so we need to provide the lockspace explicitly */
667
668static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
David Teiglande7fd4172006-01-18 09:30:29 +0000669{
David Teiglandce03f122007-04-02 12:12:55 -0500670 uint16_t bucket = (lkb->lkb_id >> 16);
David Teiglande7fd4172006-01-18 09:30:29 +0000671
672 write_lock(&ls->ls_lkbtbl[bucket].lock);
673 if (kref_put(&lkb->lkb_ref, kill_lkb)) {
674 list_del(&lkb->lkb_idtbl_list);
675 write_unlock(&ls->ls_lkbtbl[bucket].lock);
676
677 detach_lkb(lkb);
678
679 /* for local/process lkbs, lvbptr points to caller's lksb */
680 if (lkb->lkb_lvbptr && is_master_copy(lkb))
681 free_lvb(lkb->lkb_lvbptr);
David Teiglande7fd4172006-01-18 09:30:29 +0000682 free_lkb(lkb);
683 return 1;
684 } else {
685 write_unlock(&ls->ls_lkbtbl[bucket].lock);
686 return 0;
687 }
688}
689
690int dlm_put_lkb(struct dlm_lkb *lkb)
691{
David Teiglandb3f58d82006-02-28 11:16:37 -0500692 struct dlm_ls *ls;
693
694 DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
695 DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););
696
697 ls = lkb->lkb_resource->res_ls;
698 return __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +0000699}
700
701/* This is only called to add a reference when the code already holds
702 a valid reference to the lkb, so there's no need for locking. */
703
704static inline void hold_lkb(struct dlm_lkb *lkb)
705{
706 kref_get(&lkb->lkb_ref);
707}
708
709/* This is called when we need to remove a reference and are certain
710 it's not the last ref. e.g. del_lkb is always called between a
711 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
712 put_lkb would work fine, but would involve unnecessary locking */
713
714static inline void unhold_lkb(struct dlm_lkb *lkb)
715{
716 int rv;
717 rv = kref_put(&lkb->lkb_ref, kill_lkb);
718 DLM_ASSERT(!rv, dlm_print_lkb(lkb););
719}
720
721static void lkb_add_ordered(struct list_head *new, struct list_head *head,
722 int mode)
723{
724 struct dlm_lkb *lkb = NULL;
725
726 list_for_each_entry(lkb, head, lkb_statequeue)
727 if (lkb->lkb_rqmode < mode)
728 break;
729
730 if (!lkb)
731 list_add_tail(new, head);
732 else
733 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
734}
735
736/* add/remove lkb to rsb's grant/convert/wait queue */
737
738static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
739{
740 kref_get(&lkb->lkb_ref);
741
742 DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
743
744 lkb->lkb_status = status;
745
746 switch (status) {
747 case DLM_LKSTS_WAITING:
748 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
749 list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
750 else
751 list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
752 break;
753 case DLM_LKSTS_GRANTED:
754 /* convention says granted locks kept in order of grmode */
755 lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
756 lkb->lkb_grmode);
757 break;
758 case DLM_LKSTS_CONVERT:
759 if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
760 list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
761 else
762 list_add_tail(&lkb->lkb_statequeue,
763 &r->res_convertqueue);
764 break;
765 default:
766 DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
767 }
768}
769
770static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
771{
772 lkb->lkb_status = 0;
773 list_del(&lkb->lkb_statequeue);
774 unhold_lkb(lkb);
775}
776
/*
 * Move an lkb from its current rsb queue to the queue for status 'sts'.
 * An extra reference is held across the delete/add pair and dropped
 * again once the lkb is back on a queue.
 */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);

	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);

	unhold_lkb(lkb);
}
784
David Teiglandef0c2bb2007-03-28 09:56:46 -0500785static int msg_reply_type(int mstype)
786{
787 switch (mstype) {
788 case DLM_MSG_REQUEST:
789 return DLM_MSG_REQUEST_REPLY;
790 case DLM_MSG_CONVERT:
791 return DLM_MSG_CONVERT_REPLY;
792 case DLM_MSG_UNLOCK:
793 return DLM_MSG_UNLOCK_REPLY;
794 case DLM_MSG_CANCEL:
795 return DLM_MSG_CANCEL_REPLY;
796 case DLM_MSG_LOOKUP:
797 return DLM_MSG_LOOKUP_REPLY;
798 }
799 return -1;
800}
801
David Teiglande7fd4172006-01-18 09:30:29 +0000802/* add/remove lkb from global waiters list of lkb's waiting for
803 a reply from a remote node */
804
David Teiglandef0c2bb2007-03-28 09:56:46 -0500805static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
David Teiglande7fd4172006-01-18 09:30:29 +0000806{
807 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
David Teiglandef0c2bb2007-03-28 09:56:46 -0500808 int error = 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000809
David Teigland90135922006-01-20 08:47:07 +0000810 mutex_lock(&ls->ls_waiters_mutex);
David Teiglandef0c2bb2007-03-28 09:56:46 -0500811
812 if (is_overlap_unlock(lkb) ||
813 (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
814 error = -EINVAL;
David Teiglande7fd4172006-01-18 09:30:29 +0000815 goto out;
816 }
David Teiglandef0c2bb2007-03-28 09:56:46 -0500817
818 if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
819 switch (mstype) {
820 case DLM_MSG_UNLOCK:
821 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
822 break;
823 case DLM_MSG_CANCEL:
824 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
825 break;
826 default:
827 error = -EBUSY;
828 goto out;
829 }
830 lkb->lkb_wait_count++;
831 hold_lkb(lkb);
832
833 log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
834 lkb->lkb_id, lkb->lkb_wait_type, mstype,
835 lkb->lkb_wait_count, lkb->lkb_flags);
836 goto out;
837 }
838
839 DLM_ASSERT(!lkb->lkb_wait_count,
840 dlm_print_lkb(lkb);
841 printk("wait_count %d\n", lkb->lkb_wait_count););
842
843 lkb->lkb_wait_count++;
David Teiglande7fd4172006-01-18 09:30:29 +0000844 lkb->lkb_wait_type = mstype;
David Teiglandef0c2bb2007-03-28 09:56:46 -0500845 hold_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +0000846 list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
847 out:
David Teiglandef0c2bb2007-03-28 09:56:46 -0500848 if (error)
849 log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
850 lkb->lkb_id, error, lkb->lkb_flags, mstype,
851 lkb->lkb_wait_type, lkb->lkb_resource->res_name);
David Teigland90135922006-01-20 08:47:07 +0000852 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglandef0c2bb2007-03-28 09:56:46 -0500853 return error;
David Teiglande7fd4172006-01-18 09:30:29 +0000854}
855
David Teiglandb790c3b2007-01-24 10:21:33 -0600856/* We clear the RESEND flag because we might be taking an lkb off the waiters
857 list as part of process_requestqueue (e.g. a lookup that has an optimized
858 request reply on the requestqueue) between dlm_recover_waiters_pre() which
859 set RESEND and dlm_recover_waiters_post() */
860
David Teiglandef0c2bb2007-03-28 09:56:46 -0500861static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
David Teiglande7fd4172006-01-18 09:30:29 +0000862{
David Teiglandef0c2bb2007-03-28 09:56:46 -0500863 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
864 int overlap_done = 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000865
David Teiglandef0c2bb2007-03-28 09:56:46 -0500866 if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
867 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
868 overlap_done = 1;
869 goto out_del;
David Teiglande7fd4172006-01-18 09:30:29 +0000870 }
David Teiglandef0c2bb2007-03-28 09:56:46 -0500871
872 if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
873 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
874 overlap_done = 1;
875 goto out_del;
876 }
877
878 /* N.B. type of reply may not always correspond to type of original
879 msg due to lookup->request optimization, verify others? */
880
881 if (lkb->lkb_wait_type) {
882 lkb->lkb_wait_type = 0;
883 goto out_del;
884 }
885
886 log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
887 lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
888 return -1;
889
890 out_del:
891 /* the force-unlock/cancel has completed and we haven't recvd a reply
892 to the op that was in progress prior to the unlock/cancel; we
893 give up on any reply to the earlier op. FIXME: not sure when/how
894 this would happen */
895
896 if (overlap_done && lkb->lkb_wait_type) {
897 log_error(ls, "remove_from_waiters %x reply %d give up on %d",
898 lkb->lkb_id, mstype, lkb->lkb_wait_type);
899 lkb->lkb_wait_count--;
900 lkb->lkb_wait_type = 0;
901 }
902
903 DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););
904
David Teiglandb790c3b2007-01-24 10:21:33 -0600905 lkb->lkb_flags &= ~DLM_IFL_RESEND;
David Teiglandef0c2bb2007-03-28 09:56:46 -0500906 lkb->lkb_wait_count--;
907 if (!lkb->lkb_wait_count)
908 list_del_init(&lkb->lkb_wait_reply);
David Teiglande7fd4172006-01-18 09:30:29 +0000909 unhold_lkb(lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -0500910 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000911}
912
David Teiglandef0c2bb2007-03-28 09:56:46 -0500913static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
David Teiglande7fd4172006-01-18 09:30:29 +0000914{
915 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
916 int error;
917
David Teigland90135922006-01-20 08:47:07 +0000918 mutex_lock(&ls->ls_waiters_mutex);
David Teiglandef0c2bb2007-03-28 09:56:46 -0500919 error = _remove_from_waiters(lkb, mstype);
David Teigland90135922006-01-20 08:47:07 +0000920 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +0000921 return error;
922}
923
David Teiglandef0c2bb2007-03-28 09:56:46 -0500924/* Handles situations where we might be processing a "fake" or "stub" reply in
925 which we can't try to take waiters_mutex again. */
926
927static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
928{
929 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
930 int error;
931
932 if (ms != &ls->ls_stub_ms)
933 mutex_lock(&ls->ls_waiters_mutex);
934 error = _remove_from_waiters(lkb, ms->m_type);
935 if (ms != &ls->ls_stub_ms)
936 mutex_unlock(&ls->ls_waiters_mutex);
937 return error;
938}
939
David Teiglande7fd4172006-01-18 09:30:29 +0000940static void dir_remove(struct dlm_rsb *r)
941{
942 int to_nodeid;
943
944 if (dlm_no_directory(r->res_ls))
945 return;
946
947 to_nodeid = dlm_dir_nodeid(r);
948 if (to_nodeid != dlm_our_nodeid())
949 send_remove(r);
950 else
951 dlm_dir_remove_entry(r->res_ls, to_nodeid,
952 r->res_name, r->res_length);
953}
954
955/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
956 found since they are in order of newest to oldest? */
957
958static int shrink_bucket(struct dlm_ls *ls, int b)
959{
960 struct dlm_rsb *r;
961 int count = 0, found;
962
963 for (;;) {
David Teigland90135922006-01-20 08:47:07 +0000964 found = 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000965 write_lock(&ls->ls_rsbtbl[b].lock);
966 list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
967 res_hashchain) {
968 if (!time_after_eq(jiffies, r->res_toss_time +
David Teigland68c817a2007-01-09 09:41:48 -0600969 dlm_config.ci_toss_secs * HZ))
David Teiglande7fd4172006-01-18 09:30:29 +0000970 continue;
David Teigland90135922006-01-20 08:47:07 +0000971 found = 1;
David Teiglande7fd4172006-01-18 09:30:29 +0000972 break;
973 }
974
975 if (!found) {
976 write_unlock(&ls->ls_rsbtbl[b].lock);
977 break;
978 }
979
980 if (kref_put(&r->res_ref, kill_rsb)) {
981 list_del(&r->res_hashchain);
982 write_unlock(&ls->ls_rsbtbl[b].lock);
983
984 if (is_master(r))
985 dir_remove(r);
986 free_rsb(r);
987 count++;
988 } else {
989 write_unlock(&ls->ls_rsbtbl[b].lock);
990 log_error(ls, "tossed rsb in use %s", r->res_name);
991 }
992 }
993
994 return count;
995}
996
997void dlm_scan_rsbs(struct dlm_ls *ls)
998{
999 int i;
1000
David Teiglande7fd4172006-01-18 09:30:29 +00001001 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1002 shrink_bucket(ls, i);
David Teigland85e86ed2007-05-18 08:58:15 -05001003 if (dlm_locking_stopped(ls))
1004 break;
David Teiglande7fd4172006-01-18 09:30:29 +00001005 cond_resched();
1006 }
1007}
1008
David Teigland3ae1acf2007-05-18 08:59:31 -05001009static void add_timeout(struct dlm_lkb *lkb)
1010{
1011 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1012
1013 if (is_master_copy(lkb))
1014 return;
1015
1016 if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
1017 goto add_it;
1018
1019 if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
1020 !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1021 lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
1022 goto add_it;
1023 }
1024 return;
1025
1026 add_it:
1027 DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
1028 mutex_lock(&ls->ls_timeout_mutex);
1029 hold_lkb(lkb);
1030 lkb->lkb_timestamp = jiffies;
1031 list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
1032 mutex_unlock(&ls->ls_timeout_mutex);
1033}
1034
1035static void del_timeout(struct dlm_lkb *lkb)
1036{
1037 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1038
1039 mutex_lock(&ls->ls_timeout_mutex);
1040 if (!list_empty(&lkb->lkb_time_list)) {
1041 list_del_init(&lkb->lkb_time_list);
1042 unhold_lkb(lkb);
1043 }
1044 mutex_unlock(&ls->ls_timeout_mutex);
1045}
1046
1047/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1048 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1049 and then lock rsb because of lock ordering in add_timeout. We may need
1050 to specify some special timeout-related bits in the lkb that are just to
1051 be accessed under the timeout_mutex. */
1052
1053void dlm_scan_timeout(struct dlm_ls *ls)
1054{
1055 struct dlm_rsb *r;
1056 struct dlm_lkb *lkb;
1057 int do_cancel, do_warn;
1058
1059 for (;;) {
1060 if (dlm_locking_stopped(ls))
1061 break;
1062
1063 do_cancel = 0;
1064 do_warn = 0;
1065 mutex_lock(&ls->ls_timeout_mutex);
1066 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {
1067
1068 if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
1069 time_after_eq(jiffies, lkb->lkb_timestamp +
1070 lkb->lkb_timeout_cs * HZ/100))
1071 do_cancel = 1;
1072
1073 if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
1074 time_after_eq(jiffies, lkb->lkb_timestamp +
1075 dlm_config.ci_timewarn_cs * HZ/100))
1076 do_warn = 1;
1077
1078 if (!do_cancel && !do_warn)
1079 continue;
1080 hold_lkb(lkb);
1081 break;
1082 }
1083 mutex_unlock(&ls->ls_timeout_mutex);
1084
1085 if (!do_cancel && !do_warn)
1086 break;
1087
1088 r = lkb->lkb_resource;
1089 hold_rsb(r);
1090 lock_rsb(r);
1091
1092 if (do_warn) {
1093 /* clear flag so we only warn once */
1094 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1095 if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
1096 del_timeout(lkb);
1097 dlm_timeout_warn(lkb);
1098 }
1099
1100 if (do_cancel) {
Steven Whitehouseb3cab7b2007-05-29 11:14:21 +01001101 log_debug(ls, "timeout cancel %x node %d %s",
David Teigland639aca42007-05-18 16:02:57 -05001102 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
David Teigland3ae1acf2007-05-18 08:59:31 -05001103 lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
1104 lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
1105 del_timeout(lkb);
1106 _cancel_lock(r, lkb);
1107 }
1108
1109 unlock_rsb(r);
1110 unhold_rsb(r);
1111 dlm_put_lkb(lkb);
1112 }
1113}
1114
1115/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1116 dlm_recoverd before checking/setting ls_recover_begin. */
1117
1118void dlm_adjust_timeouts(struct dlm_ls *ls)
1119{
1120 struct dlm_lkb *lkb;
1121 long adj = jiffies - ls->ls_recover_begin;
1122
1123 ls->ls_recover_begin = 0;
1124 mutex_lock(&ls->ls_timeout_mutex);
1125 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1126 lkb->lkb_timestamp += adj;
1127 mutex_unlock(&ls->ls_timeout_mutex);
1128}
1129
David Teiglande7fd4172006-01-18 09:30:29 +00001130/* lkb is master or local copy */
1131
1132static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1133{
1134 int b, len = r->res_ls->ls_lvblen;
1135
1136 /* b=1 lvb returned to caller
1137 b=0 lvb written to rsb or invalidated
1138 b=-1 do nothing */
1139
1140 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
1141
1142 if (b == 1) {
1143 if (!lkb->lkb_lvbptr)
1144 return;
1145
1146 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1147 return;
1148
1149 if (!r->res_lvbptr)
1150 return;
1151
1152 memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
1153 lkb->lkb_lvbseq = r->res_lvbseq;
1154
1155 } else if (b == 0) {
1156 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1157 rsb_set_flag(r, RSB_VALNOTVALID);
1158 return;
1159 }
1160
1161 if (!lkb->lkb_lvbptr)
1162 return;
1163
1164 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1165 return;
1166
1167 if (!r->res_lvbptr)
1168 r->res_lvbptr = allocate_lvb(r->res_ls);
1169
1170 if (!r->res_lvbptr)
1171 return;
1172
1173 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
1174 r->res_lvbseq++;
1175 lkb->lkb_lvbseq = r->res_lvbseq;
1176 rsb_clear_flag(r, RSB_VALNOTVALID);
1177 }
1178
1179 if (rsb_flag(r, RSB_VALNOTVALID))
1180 lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
1181}
1182
1183static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1184{
1185 if (lkb->lkb_grmode < DLM_LOCK_PW)
1186 return;
1187
1188 if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
1189 rsb_set_flag(r, RSB_VALNOTVALID);
1190 return;
1191 }
1192
1193 if (!lkb->lkb_lvbptr)
1194 return;
1195
1196 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1197 return;
1198
1199 if (!r->res_lvbptr)
1200 r->res_lvbptr = allocate_lvb(r->res_ls);
1201
1202 if (!r->res_lvbptr)
1203 return;
1204
1205 memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
1206 r->res_lvbseq++;
1207 rsb_clear_flag(r, RSB_VALNOTVALID);
1208}
1209
1210/* lkb is process copy (pc) */
1211
1212static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1213 struct dlm_message *ms)
1214{
1215 int b;
1216
1217 if (!lkb->lkb_lvbptr)
1218 return;
1219
1220 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1221 return;
1222
David Teigland597d0ca2006-07-12 16:44:04 -05001223 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
David Teiglande7fd4172006-01-18 09:30:29 +00001224 if (b == 1) {
1225 int len = receive_extralen(ms);
1226 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1227 lkb->lkb_lvbseq = ms->m_lvbseq;
1228 }
1229}
1230
1231/* Manipulate lkb's on rsb's convert/granted/waiting queues
1232 remove_lock -- used for unlock, removes lkb from granted
1233 revert_lock -- used for cancel, moves lkb from convert to granted
1234 grant_lock -- used for request and convert, adds lkb to granted or
1235 moves lkb from convert or waiting to granted
1236
1237 Each of these is used for master or local copy lkb's. There is
1238 also a _pc() variation used to make the corresponding change on
1239 a process copy (pc) lkb. */
1240
1241static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1242{
1243 del_lkb(r, lkb);
1244 lkb->lkb_grmode = DLM_LOCK_IV;
1245 /* this unhold undoes the original ref from create_lkb()
1246 so this leads to the lkb being freed */
1247 unhold_lkb(lkb);
1248}
1249
/* Unlock for a master or local copy lkb: let set_lvb_unlock() handle the
   lvb first, then remove the lkb with _remove_lock(). */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* lvb handling reads lkb_grmode, which _remove_lock() resets */
	set_lvb_unlock(r, lkb);

	_remove_lock(r, lkb);
}
1255
/* Unlock for a process copy lkb: plain _remove_lock(), with no lvb
   handling. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1260
David Teiglandef0c2bb2007-03-28 09:56:46 -05001261/* returns: 0 did nothing
1262 1 moved lock to granted
1263 -1 removed lock */
1264
1265static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
David Teiglande7fd4172006-01-18 09:30:29 +00001266{
David Teiglandef0c2bb2007-03-28 09:56:46 -05001267 int rv = 0;
1268
David Teiglande7fd4172006-01-18 09:30:29 +00001269 lkb->lkb_rqmode = DLM_LOCK_IV;
1270
1271 switch (lkb->lkb_status) {
David Teigland597d0ca2006-07-12 16:44:04 -05001272 case DLM_LKSTS_GRANTED:
1273 break;
David Teiglande7fd4172006-01-18 09:30:29 +00001274 case DLM_LKSTS_CONVERT:
1275 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
David Teiglandef0c2bb2007-03-28 09:56:46 -05001276 rv = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001277 break;
1278 case DLM_LKSTS_WAITING:
1279 del_lkb(r, lkb);
1280 lkb->lkb_grmode = DLM_LOCK_IV;
1281 /* this unhold undoes the original ref from create_lkb()
1282 so this leads to the lkb being freed */
1283 unhold_lkb(lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05001284 rv = -1;
David Teiglande7fd4172006-01-18 09:30:29 +00001285 break;
1286 default:
1287 log_print("invalid status for revert %d", lkb->lkb_status);
1288 }
David Teiglandef0c2bb2007-03-28 09:56:46 -05001289 return rv;
David Teiglande7fd4172006-01-18 09:30:29 +00001290}
1291
/* Process copy variant of revert_lock(); currently identical to it and
   returns the same 0 / 1 / -1 result. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = revert_lock(r, lkb);

	return rv;
}
1296
1297static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1298{
1299 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1300 lkb->lkb_grmode = lkb->lkb_rqmode;
1301 if (lkb->lkb_status)
1302 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1303 else
1304 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1305 }
1306
1307 lkb->lkb_rqmode = DLM_LOCK_IV;
David Teiglande7fd4172006-01-18 09:30:29 +00001308}
1309
1310static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1311{
1312 set_lvb_lock(r, lkb);
1313 _grant_lock(r, lkb);
1314 lkb->lkb_highbast = 0;
1315}
1316
/* Grant for a process copy lkb: the lvb is taken from message ms via
   set_lvb_lock_pc(), then the mode/queue change is done by _grant_lock().
   lkb_highbast is not touched here. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	/* must run before _grant_lock() overwrites the mode pair it reads */
	set_lvb_lock_pc(r, lkb, ms);

	_grant_lock(r, lkb);
}
1323
1324/* called by grant_pending_locks() which means an async grant message must
1325 be sent to the requesting node in addition to granting the lock if the
1326 lkb belongs to a remote node. */
1327
/* Grant lkb and notify its owner: a completion callback is queued with
   queue_cast(r, lkb, 0) unless lkb is a master copy, in which case
   send_grant() is used instead. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb)) {
		queue_cast(r, lkb, 0);
		return;
	}

	send_grant(r, lkb);
}
1336
David Teigland7d3c1fe2007-04-19 10:30:41 -05001337/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1338 change the granted/requested modes. We're munging things accordingly in
1339 the process copy.
1340 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1341 conversion deadlock
1342 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1343 compatible with other granted locks */
1344
1345static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
1346{
1347 if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
1348 log_print("munge_demoted %x invalid reply type %d",
1349 lkb->lkb_id, ms->m_type);
1350 return;
1351 }
1352
1353 if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
1354 log_print("munge_demoted %x invalid modes gr %d rq %d",
1355 lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
1356 return;
1357 }
1358
1359 lkb->lkb_grmode = DLM_LOCK_NL;
1360}
1361
1362static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
1363{
1364 if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
1365 ms->m_type != DLM_MSG_GRANT) {
1366 log_print("munge_altmode %x invalid reply type %d",
1367 lkb->lkb_id, ms->m_type);
1368 return;
1369 }
1370
1371 if (lkb->lkb_exflags & DLM_LKF_ALTPR)
1372 lkb->lkb_rqmode = DLM_LOCK_PR;
1373 else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
1374 lkb->lkb_rqmode = DLM_LOCK_CW;
1375 else {
1376 log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
1377 dlm_print_lkb(lkb);
1378 }
1379}
1380
David Teiglande7fd4172006-01-18 09:30:29 +00001381static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1382{
1383 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1384 lkb_statequeue);
1385 if (lkb->lkb_id == first->lkb_id)
David Teigland90135922006-01-20 08:47:07 +00001386 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001387
David Teigland90135922006-01-20 08:47:07 +00001388 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001389}
1390
David Teiglande7fd4172006-01-18 09:30:29 +00001391/* Check if the given lkb conflicts with another lkb on the queue. */
1392
1393static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1394{
1395 struct dlm_lkb *this;
1396
1397 list_for_each_entry(this, head, lkb_statequeue) {
1398 if (this == lkb)
1399 continue;
David Teigland3bcd3682006-02-23 09:56:38 +00001400 if (!modes_compat(this, lkb))
David Teigland90135922006-01-20 08:47:07 +00001401 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001402 }
David Teigland90135922006-01-20 08:47:07 +00001403 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001404}
1405
1406/*
1407 * "A conversion deadlock arises with a pair of lock requests in the converting
1408 * queue for one resource. The granted mode of each lock blocks the requested
1409 * mode of the other lock."
1410 *
David Teiglandc85d65e2007-05-18 09:01:26 -05001411 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1412 * convert queue from being granted, then deadlk/demote lkb.
David Teiglande7fd4172006-01-18 09:30:29 +00001413 *
1414 * Example:
1415 * Granted Queue: empty
1416 * Convert Queue: NL->EX (first lock)
1417 * PR->EX (second lock)
1418 *
1419 * The first lock can't be granted because of the granted mode of the second
1420 * lock and the second lock can't be granted because it's not first in the
David Teiglandc85d65e2007-05-18 09:01:26 -05001421 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1422 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1423 * flag set and return DEMOTED in the lksb flags.
David Teiglande7fd4172006-01-18 09:30:29 +00001424 *
David Teiglandc85d65e2007-05-18 09:01:26 -05001425 * Originally, this function detected conv-deadlk in a more limited scope:
1426 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1427 * - if lkb1 was the first entry in the queue (not just earlier), and was
1428 * blocked by the granted mode of lkb2, and there was nothing on the
1429 * granted queue preventing lkb1 from being granted immediately, i.e.
1430 * lkb2 was the only thing preventing lkb1 from being granted.
1431 *
1432 * That second condition meant we'd only say there was conv-deadlk if
1433 * resolving it (by demotion) would lead to the first lock on the convert
1434 * queue being granted right away. It allowed conversion deadlocks to exist
1435 * between locks on the convert queue while they couldn't be granted anyway.
1436 *
1437 * Now, we detect and take action on conversion deadlocks immediately when
1438 * they're created, even if they may not be immediately consequential. If
1439 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1440 * mode that would prevent lkb1's conversion from being granted, we do a
1441 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1442 * I think this means that the lkb_is_ahead condition below should always
1443 * be zero, i.e. there will never be conv-deadlk between two locks that are
1444 * both already on the convert queue.
David Teiglande7fd4172006-01-18 09:30:29 +00001445 */
1446
David Teiglandc85d65e2007-05-18 09:01:26 -05001447static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
David Teiglande7fd4172006-01-18 09:30:29 +00001448{
David Teiglandc85d65e2007-05-18 09:01:26 -05001449 struct dlm_lkb *lkb1;
1450 int lkb_is_ahead = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001451
David Teiglandc85d65e2007-05-18 09:01:26 -05001452 list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
1453 if (lkb1 == lkb2) {
1454 lkb_is_ahead = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001455 continue;
1456 }
1457
David Teiglandc85d65e2007-05-18 09:01:26 -05001458 if (!lkb_is_ahead) {
1459 if (!modes_compat(lkb2, lkb1))
1460 return 1;
1461 } else {
1462 if (!modes_compat(lkb2, lkb1) &&
1463 !modes_compat(lkb1, lkb2))
1464 return 1;
1465 }
David Teiglande7fd4172006-01-18 09:30:29 +00001466 }
David Teigland90135922006-01-20 08:47:07 +00001467 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001468}
1469
1470/*
1471 * Return 1 if the lock can be granted, 0 otherwise.
1472 * Also detect and resolve conversion deadlocks.
1473 *
1474 * lkb is the lock to be granted
1475 *
1476 * now is 1 if the function is being called in the context of the
1477 * immediate request, it is 0 if called later, after the lock has been
1478 * queued.
1479 *
1480 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1481 */
1482
1483static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
1484{
1485 int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
1486
1487 /*
1488 * 6-10: Version 5.4 introduced an option to address the phenomenon of
1489 * a new request for a NL mode lock being blocked.
1490 *
1491 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
1492 * request, then it would be granted. In essence, the use of this flag
1493 * tells the Lock Manager to expedite theis request by not considering
1494 * what may be in the CONVERTING or WAITING queues... As of this
1495 * writing, the EXPEDITE flag can be used only with new requests for NL
1496 * mode locks. This flag is not valid for conversion requests.
1497 *
1498 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
1499 * conversion or used with a non-NL requested mode. We also know an
1500 * EXPEDITE request is always granted immediately, so now must always
1501 * be 1. The full condition to grant an expedite request: (now &&
1502 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
1503 * therefore be shortened to just checking the flag.
1504 */
1505
1506 if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
David Teigland90135922006-01-20 08:47:07 +00001507 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001508
1509 /*
1510 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
1511 * added to the remaining conditions.
1512 */
1513
1514 if (queue_conflict(&r->res_grantqueue, lkb))
1515 goto out;
1516
1517 /*
1518 * 6-3: By default, a conversion request is immediately granted if the
1519 * requested mode is compatible with the modes of all other granted
1520 * locks
1521 */
1522
1523 if (queue_conflict(&r->res_convertqueue, lkb))
1524 goto out;
1525
1526 /*
1527 * 6-5: But the default algorithm for deciding whether to grant or
1528 * queue conversion requests does not by itself guarantee that such
1529 * requests are serviced on a "first come first serve" basis. This, in
1530 * turn, can lead to a phenomenon known as "indefinate postponement".
1531 *
1532 * 6-7: This issue is dealt with by using the optional QUECVT flag with
1533 * the system service employed to request a lock conversion. This flag
1534 * forces certain conversion requests to be queued, even if they are
1535 * compatible with the granted modes of other locks on the same
1536 * resource. Thus, the use of this flag results in conversion requests
1537 * being ordered on a "first come first servce" basis.
1538 *
1539 * DCT: This condition is all about new conversions being able to occur
1540 * "in place" while the lock remains on the granted queue (assuming
1541 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
1542 * doesn't _have_ to go onto the convert queue where it's processed in
1543 * order. The "now" variable is necessary to distinguish converts
1544 * being received and processed for the first time now, because once a
1545 * convert is moved to the conversion queue the condition below applies
1546 * requiring fifo granting.
1547 */
1548
1549 if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
David Teigland90135922006-01-20 08:47:07 +00001550 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001551
1552 /*
David Teigland3bcd3682006-02-23 09:56:38 +00001553 * The NOORDER flag is set to avoid the standard vms rules on grant
1554 * order.
David Teiglande7fd4172006-01-18 09:30:29 +00001555 */
1556
1557 if (lkb->lkb_exflags & DLM_LKF_NOORDER)
David Teigland90135922006-01-20 08:47:07 +00001558 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001559
1560 /*
1561 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
1562 * granted until all other conversion requests ahead of it are granted
1563 * and/or canceled.
1564 */
1565
1566 if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
David Teigland90135922006-01-20 08:47:07 +00001567 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001568
1569 /*
1570 * 6-4: By default, a new request is immediately granted only if all
1571 * three of the following conditions are satisfied when the request is
1572 * issued:
1573 * - The queue of ungranted conversion requests for the resource is
1574 * empty.
1575 * - The queue of ungranted new requests for the resource is empty.
1576 * - The mode of the new request is compatible with the most
1577 * restrictive mode of all granted locks on the resource.
1578 */
1579
1580 if (now && !conv && list_empty(&r->res_convertqueue) &&
1581 list_empty(&r->res_waitqueue))
David Teigland90135922006-01-20 08:47:07 +00001582 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001583
1584 /*
1585 * 6-4: Once a lock request is in the queue of ungranted new requests,
1586 * it cannot be granted until the queue of ungranted conversion
1587 * requests is empty, all ungranted new requests ahead of it are
1588 * granted and/or canceled, and it is compatible with the granted mode
1589 * of the most restrictive lock granted on the resource.
1590 */
1591
1592 if (!now && !conv && list_empty(&r->res_convertqueue) &&
1593 first_in_list(lkb, &r->res_waitqueue))
David Teigland90135922006-01-20 08:47:07 +00001594 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001595 out:
David Teigland90135922006-01-20 08:47:07 +00001596 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001597}
1598
David Teiglandc85d65e2007-05-18 09:01:26 -05001599static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
1600 int *err)
David Teiglande7fd4172006-01-18 09:30:29 +00001601{
David Teiglande7fd4172006-01-18 09:30:29 +00001602 int rv;
1603 int8_t alt = 0, rqmode = lkb->lkb_rqmode;
David Teiglandc85d65e2007-05-18 09:01:26 -05001604 int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);
1605
1606 if (err)
1607 *err = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001608
1609 rv = _can_be_granted(r, lkb, now);
1610 if (rv)
1611 goto out;
1612
David Teiglandc85d65e2007-05-18 09:01:26 -05001613 /*
1614 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
1615 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
1616 * cancels one of the locks.
1617 */
David Teiglande7fd4172006-01-18 09:30:29 +00001618
David Teiglandc85d65e2007-05-18 09:01:26 -05001619 if (is_convert && can_be_queued(lkb) &&
1620 conversion_deadlock_detect(r, lkb)) {
1621 if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
1622 lkb->lkb_grmode = DLM_LOCK_NL;
1623 lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
1624 } else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
1625 if (err)
1626 *err = -EDEADLK;
1627 else {
1628 log_print("can_be_granted deadlock %x now %d",
1629 lkb->lkb_id, now);
1630 dlm_dump_rsb(r);
1631 }
1632 }
1633 goto out;
1634 }
1635
1636 /*
1637 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
1638 * to grant a request in a mode other than the normal rqmode. It's a
1639 * simple way to provide a big optimization to applications that can
1640 * use them.
1641 */
1642
1643 if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
David Teiglande7fd4172006-01-18 09:30:29 +00001644 alt = DLM_LOCK_PR;
David Teiglandc85d65e2007-05-18 09:01:26 -05001645 else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
David Teiglande7fd4172006-01-18 09:30:29 +00001646 alt = DLM_LOCK_CW;
1647
1648 if (alt) {
1649 lkb->lkb_rqmode = alt;
1650 rv = _can_be_granted(r, lkb, now);
1651 if (rv)
1652 lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
1653 else
1654 lkb->lkb_rqmode = rqmode;
1655 }
1656 out:
1657 return rv;
1658}
1659
David Teiglandc85d65e2007-05-18 09:01:26 -05001660/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
1661 for locks pending on the convert list. Once verified (watch for these
1662 log_prints), we should be able to just call _can_be_granted() and not
1663 bother with the demote/deadlk cases here (and there's no easy way to deal
1664 with a deadlk here, we'd have to generate something like grant_lock with
1665 the deadlk error.) */
1666
1667/* returns the highest requested mode of all blocked conversions */
1668
David Teiglande7fd4172006-01-18 09:30:29 +00001669static int grant_pending_convert(struct dlm_rsb *r, int high)
1670{
1671 struct dlm_lkb *lkb, *s;
1672 int hi, demoted, quit, grant_restart, demote_restart;
David Teiglandc85d65e2007-05-18 09:01:26 -05001673 int deadlk;
David Teiglande7fd4172006-01-18 09:30:29 +00001674
1675 quit = 0;
1676 restart:
1677 grant_restart = 0;
1678 demote_restart = 0;
1679 hi = DLM_LOCK_IV;
1680
1681 list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
1682 demoted = is_demoted(lkb);
David Teiglandc85d65e2007-05-18 09:01:26 -05001683 deadlk = 0;
1684
1685 if (can_be_granted(r, lkb, 0, &deadlk)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001686 grant_lock_pending(r, lkb);
1687 grant_restart = 1;
David Teiglandc85d65e2007-05-18 09:01:26 -05001688 continue;
David Teiglande7fd4172006-01-18 09:30:29 +00001689 }
David Teiglandc85d65e2007-05-18 09:01:26 -05001690
1691 if (!demoted && is_demoted(lkb)) {
1692 log_print("WARN: pending demoted %x node %d %s",
1693 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1694 demote_restart = 1;
1695 continue;
1696 }
1697
1698 if (deadlk) {
1699 log_print("WARN: pending deadlock %x node %d %s",
1700 lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
1701 dlm_dump_rsb(r);
1702 continue;
1703 }
1704
1705 hi = max_t(int, lkb->lkb_rqmode, hi);
David Teiglande7fd4172006-01-18 09:30:29 +00001706 }
1707
1708 if (grant_restart)
1709 goto restart;
1710 if (demote_restart && !quit) {
1711 quit = 1;
1712 goto restart;
1713 }
1714
1715 return max_t(int, high, hi);
1716}
1717
1718static int grant_pending_wait(struct dlm_rsb *r, int high)
1719{
1720 struct dlm_lkb *lkb, *s;
1721
1722 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
David Teiglandc85d65e2007-05-18 09:01:26 -05001723 if (can_be_granted(r, lkb, 0, NULL))
David Teiglande7fd4172006-01-18 09:30:29 +00001724 grant_lock_pending(r, lkb);
1725 else
1726 high = max_t(int, lkb->lkb_rqmode, high);
1727 }
1728
1729 return high;
1730}
1731
1732static void grant_pending_locks(struct dlm_rsb *r)
1733{
1734 struct dlm_lkb *lkb, *s;
1735 int high = DLM_LOCK_IV;
1736
David Teiglanda345da32006-08-18 11:54:25 -05001737 DLM_ASSERT(is_master(r), dlm_dump_rsb(r););
David Teiglande7fd4172006-01-18 09:30:29 +00001738
1739 high = grant_pending_convert(r, high);
1740 high = grant_pending_wait(r, high);
1741
1742 if (high == DLM_LOCK_IV)
1743 return;
1744
1745 /*
1746 * If there are locks left on the wait/convert queue then send blocking
1747 * ASTs to granted locks based on the largest requested mode (high)
David Teigland3bcd3682006-02-23 09:56:38 +00001748 * found above. FIXME: highbast < high comparison not valid for PR/CW.
David Teiglande7fd4172006-01-18 09:30:29 +00001749 */
1750
1751 list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
1752 if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
1753 !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
1754 queue_bast(r, lkb, high);
1755 lkb->lkb_highbast = high;
1756 }
1757 }
1758}
1759
1760static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1761 struct dlm_lkb *lkb)
1762{
1763 struct dlm_lkb *gr;
1764
1765 list_for_each_entry(gr, head, lkb_statequeue) {
1766 if (gr->lkb_bastaddr &&
1767 gr->lkb_highbast < lkb->lkb_rqmode &&
David Teigland3bcd3682006-02-23 09:56:38 +00001768 !modes_compat(gr, lkb)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001769 queue_bast(r, gr, lkb->lkb_rqmode);
1770 gr->lkb_highbast = lkb->lkb_rqmode;
1771 }
1772 }
1773}
1774
1775static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
1776{
1777 send_bast_queue(r, &r->res_grantqueue, lkb);
1778}
1779
1780static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
1781{
1782 send_bast_queue(r, &r->res_grantqueue, lkb);
1783 send_bast_queue(r, &r->res_convertqueue, lkb);
1784}
1785
1786/* set_master(r, lkb) -- set the master nodeid of a resource
1787
1788 The purpose of this function is to set the nodeid field in the given
1789 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1790 known, it can just be copied to the lkb and the function will return
1791 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1792 before it can be copied to the lkb.
1793
1794 When the rsb nodeid is being looked up remotely, the initial lkb
1795 causing the lookup is kept on the ls_waiters list waiting for the
1796 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1797 on the rsb's res_lookup list until the master is verified.
1798
1799 Return values:
1800 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1801 1: the rsb master is not available and the lkb has been placed on
1802 a wait queue
1803*/
1804
1805static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
1806{
1807 struct dlm_ls *ls = r->res_ls;
1808 int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();
1809
1810 if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
1811 rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
1812 r->res_first_lkid = lkb->lkb_id;
1813 lkb->lkb_nodeid = r->res_nodeid;
1814 return 0;
1815 }
1816
1817 if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
1818 list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
1819 return 1;
1820 }
1821
1822 if (r->res_nodeid == 0) {
1823 lkb->lkb_nodeid = 0;
1824 return 0;
1825 }
1826
1827 if (r->res_nodeid > 0) {
1828 lkb->lkb_nodeid = r->res_nodeid;
1829 return 0;
1830 }
1831
David Teiglanda345da32006-08-18 11:54:25 -05001832 DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););
David Teiglande7fd4172006-01-18 09:30:29 +00001833
1834 dir_nodeid = dlm_dir_nodeid(r);
1835
1836 if (dir_nodeid != our_nodeid) {
1837 r->res_first_lkid = lkb->lkb_id;
1838 send_lookup(r, lkb);
1839 return 1;
1840 }
1841
1842 for (;;) {
1843 /* It's possible for dlm_scand to remove an old rsb for
1844 this same resource from the toss list, us to create
1845 a new one, look up the master locally, and find it
1846 already exists just before dlm_scand does the
1847 dir_remove() on the previous rsb. */
1848
1849 error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
1850 r->res_length, &ret_nodeid);
1851 if (!error)
1852 break;
1853 log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
1854 schedule();
1855 }
1856
1857 if (ret_nodeid == our_nodeid) {
1858 r->res_first_lkid = 0;
1859 r->res_nodeid = 0;
1860 lkb->lkb_nodeid = 0;
1861 } else {
1862 r->res_first_lkid = lkb->lkb_id;
1863 r->res_nodeid = ret_nodeid;
1864 lkb->lkb_nodeid = ret_nodeid;
1865 }
1866 return 0;
1867}
1868
1869static void process_lookup_list(struct dlm_rsb *r)
1870{
1871 struct dlm_lkb *lkb, *safe;
1872
1873 list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
David Teiglandef0c2bb2007-03-28 09:56:46 -05001874 list_del_init(&lkb->lkb_rsb_lookup);
David Teiglande7fd4172006-01-18 09:30:29 +00001875 _request_lock(r, lkb);
1876 schedule();
1877 }
1878}
1879
1880/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1881
1882static void confirm_master(struct dlm_rsb *r, int error)
1883{
1884 struct dlm_lkb *lkb;
1885
1886 if (!r->res_first_lkid)
1887 return;
1888
1889 switch (error) {
1890 case 0:
1891 case -EINPROGRESS:
1892 r->res_first_lkid = 0;
1893 process_lookup_list(r);
1894 break;
1895
1896 case -EAGAIN:
1897 /* the remote master didn't queue our NOQUEUE request;
1898 make a waiting lkb the first_lkid */
1899
1900 r->res_first_lkid = 0;
1901
1902 if (!list_empty(&r->res_lookup)) {
1903 lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
1904 lkb_rsb_lookup);
David Teiglandef0c2bb2007-03-28 09:56:46 -05001905 list_del_init(&lkb->lkb_rsb_lookup);
David Teiglande7fd4172006-01-18 09:30:29 +00001906 r->res_first_lkid = lkb->lkb_id;
1907 _request_lock(r, lkb);
1908 } else
1909 r->res_nodeid = -1;
1910 break;
1911
1912 default:
1913 log_error(r->res_ls, "confirm_master unknown error %d", error);
1914 }
1915}
1916
1917static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
David Teiglandd7db9232007-05-18 09:00:32 -05001918 int namelen, unsigned long timeout_cs, void *ast,
David Teigland3bcd3682006-02-23 09:56:38 +00001919 void *astarg, void *bast, struct dlm_args *args)
David Teiglande7fd4172006-01-18 09:30:29 +00001920{
1921 int rv = -EINVAL;
1922
1923 /* check for invalid arg usage */
1924
1925 if (mode < 0 || mode > DLM_LOCK_EX)
1926 goto out;
1927
1928 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1929 goto out;
1930
1931 if (flags & DLM_LKF_CANCEL)
1932 goto out;
1933
1934 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1935 goto out;
1936
1937 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1938 goto out;
1939
1940 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1941 goto out;
1942
1943 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1944 goto out;
1945
1946 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1947 goto out;
1948
1949 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1950 goto out;
1951
1952 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1953 goto out;
1954
1955 if (!ast || !lksb)
1956 goto out;
1957
1958 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1959 goto out;
1960
David Teiglande7fd4172006-01-18 09:30:29 +00001961 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1962 goto out;
1963
1964 /* these args will be copied to the lkb in validate_lock_args,
1965 it cannot be done now because when converting locks, fields in
1966 an active lkb cannot be modified before locking the rsb */
1967
1968 args->flags = flags;
1969 args->astaddr = ast;
1970 args->astparam = (long) astarg;
1971 args->bastaddr = bast;
David Teiglandd7db9232007-05-18 09:00:32 -05001972 args->timeout = timeout_cs;
David Teiglande7fd4172006-01-18 09:30:29 +00001973 args->mode = mode;
1974 args->lksb = lksb;
David Teiglande7fd4172006-01-18 09:30:29 +00001975 rv = 0;
1976 out:
1977 return rv;
1978}
1979
1980static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1981{
1982 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1983 DLM_LKF_FORCEUNLOCK))
1984 return -EINVAL;
1985
David Teiglandef0c2bb2007-03-28 09:56:46 -05001986 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1987 return -EINVAL;
1988
David Teiglande7fd4172006-01-18 09:30:29 +00001989 args->flags = flags;
1990 args->astparam = (long) astarg;
1991 return 0;
1992}
1993
1994static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
1995 struct dlm_args *args)
1996{
1997 int rv = -EINVAL;
1998
1999 if (args->flags & DLM_LKF_CONVERT) {
2000 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
2001 goto out;
2002
2003 if (args->flags & DLM_LKF_QUECVT &&
2004 !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
2005 goto out;
2006
2007 rv = -EBUSY;
2008 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2009 goto out;
2010
2011 if (lkb->lkb_wait_type)
2012 goto out;
David Teiglandef0c2bb2007-03-28 09:56:46 -05002013
2014 if (is_overlap(lkb))
2015 goto out;
David Teiglande7fd4172006-01-18 09:30:29 +00002016 }
2017
2018 lkb->lkb_exflags = args->flags;
2019 lkb->lkb_sbflags = 0;
2020 lkb->lkb_astaddr = args->astaddr;
2021 lkb->lkb_astparam = args->astparam;
2022 lkb->lkb_bastaddr = args->bastaddr;
2023 lkb->lkb_rqmode = args->mode;
2024 lkb->lkb_lksb = args->lksb;
2025 lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
2026 lkb->lkb_ownpid = (int) current->pid;
David Teiglandd7db9232007-05-18 09:00:32 -05002027 lkb->lkb_timeout_cs = args->timeout;
David Teiglande7fd4172006-01-18 09:30:29 +00002028 rv = 0;
2029 out:
2030 return rv;
2031}
2032
David Teiglandef0c2bb2007-03-28 09:56:46 -05002033/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2034 for success */
2035
2036/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2037 because there may be a lookup in progress and it's valid to do
2038 cancel/unlockf on it */
2039
David Teiglande7fd4172006-01-18 09:30:29 +00002040static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
2041{
David Teiglandef0c2bb2007-03-28 09:56:46 -05002042 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
David Teiglande7fd4172006-01-18 09:30:29 +00002043 int rv = -EINVAL;
2044
David Teiglandef0c2bb2007-03-28 09:56:46 -05002045 if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
2046 log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
2047 dlm_print_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002048 goto out;
David Teiglandef0c2bb2007-03-28 09:56:46 -05002049 }
David Teiglande7fd4172006-01-18 09:30:29 +00002050
David Teiglandef0c2bb2007-03-28 09:56:46 -05002051 /* an lkb may still exist even though the lock is EOL'ed due to a
2052 cancel, unlock or failed noqueue request; an app can't use these
2053 locks; return same error as if the lkid had not been found at all */
2054
2055 if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
2056 log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
2057 rv = -ENOENT;
2058 goto out;
2059 }
2060
2061 /* an lkb may be waiting for an rsb lookup to complete where the
2062 lookup was initiated by another lock */
2063
2064 if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
2065 if (!list_empty(&lkb->lkb_rsb_lookup)) {
2066 log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
2067 list_del_init(&lkb->lkb_rsb_lookup);
2068 queue_cast(lkb->lkb_resource, lkb,
2069 args->flags & DLM_LKF_CANCEL ?
2070 -DLM_ECANCEL : -DLM_EUNLOCK);
2071 unhold_lkb(lkb); /* undoes create_lkb() */
2072 rv = -EBUSY;
2073 goto out;
2074 }
2075 }
2076
2077 /* cancel not allowed with another cancel/unlock in progress */
2078
2079 if (args->flags & DLM_LKF_CANCEL) {
2080 if (lkb->lkb_exflags & DLM_LKF_CANCEL)
2081 goto out;
2082
2083 if (is_overlap(lkb))
2084 goto out;
2085
David Teigland3ae1acf2007-05-18 08:59:31 -05002086 /* don't let scand try to do a cancel */
2087 del_timeout(lkb);
2088
David Teiglandef0c2bb2007-03-28 09:56:46 -05002089 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2090 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2091 rv = -EBUSY;
2092 goto out;
2093 }
2094
2095 switch (lkb->lkb_wait_type) {
2096 case DLM_MSG_LOOKUP:
2097 case DLM_MSG_REQUEST:
2098 lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
2099 rv = -EBUSY;
2100 goto out;
2101 case DLM_MSG_UNLOCK:
2102 case DLM_MSG_CANCEL:
2103 goto out;
2104 }
2105 /* add_to_waiters() will set OVERLAP_CANCEL */
David Teiglande7fd4172006-01-18 09:30:29 +00002106 goto out_ok;
David Teiglandef0c2bb2007-03-28 09:56:46 -05002107 }
David Teiglande7fd4172006-01-18 09:30:29 +00002108
David Teiglandef0c2bb2007-03-28 09:56:46 -05002109 /* do we need to allow a force-unlock if there's a normal unlock
2110 already in progress? in what conditions could the normal unlock
2111 fail such that we'd want to send a force-unlock to be sure? */
David Teiglande7fd4172006-01-18 09:30:29 +00002112
David Teiglandef0c2bb2007-03-28 09:56:46 -05002113 if (args->flags & DLM_LKF_FORCEUNLOCK) {
2114 if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
2115 goto out;
David Teiglande7fd4172006-01-18 09:30:29 +00002116
David Teiglandef0c2bb2007-03-28 09:56:46 -05002117 if (is_overlap_unlock(lkb))
2118 goto out;
2119
David Teigland3ae1acf2007-05-18 08:59:31 -05002120 /* don't let scand try to do a cancel */
2121 del_timeout(lkb);
2122
David Teiglandef0c2bb2007-03-28 09:56:46 -05002123 if (lkb->lkb_flags & DLM_IFL_RESEND) {
2124 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2125 rv = -EBUSY;
2126 goto out;
2127 }
2128
2129 switch (lkb->lkb_wait_type) {
2130 case DLM_MSG_LOOKUP:
2131 case DLM_MSG_REQUEST:
2132 lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
2133 rv = -EBUSY;
2134 goto out;
2135 case DLM_MSG_UNLOCK:
2136 goto out;
2137 }
2138 /* add_to_waiters() will set OVERLAP_UNLOCK */
2139 goto out_ok;
2140 }
2141
2142 /* normal unlock not allowed if there's any op in progress */
David Teiglande7fd4172006-01-18 09:30:29 +00002143 rv = -EBUSY;
David Teiglandef0c2bb2007-03-28 09:56:46 -05002144 if (lkb->lkb_wait_type || lkb->lkb_wait_count)
David Teiglande7fd4172006-01-18 09:30:29 +00002145 goto out;
2146
2147 out_ok:
David Teiglandef0c2bb2007-03-28 09:56:46 -05002148 /* an overlapping op shouldn't blow away exflags from other op */
2149 lkb->lkb_exflags |= args->flags;
David Teiglande7fd4172006-01-18 09:30:29 +00002150 lkb->lkb_sbflags = 0;
2151 lkb->lkb_astparam = args->astparam;
David Teiglande7fd4172006-01-18 09:30:29 +00002152 rv = 0;
2153 out:
David Teiglandef0c2bb2007-03-28 09:56:46 -05002154 if (rv)
2155 log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
2156 lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
2157 args->flags, lkb->lkb_wait_type,
2158 lkb->lkb_resource->res_name);
David Teiglande7fd4172006-01-18 09:30:29 +00002159 return rv;
2160}
2161
2162/*
2163 * Four stage 4 varieties:
2164 * do_request(), do_convert(), do_unlock(), do_cancel()
2165 * These are called on the master node for the given lock and
2166 * from the central locking logic.
2167 */
2168
2169static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2170{
2171 int error = 0;
2172
David Teiglandc85d65e2007-05-18 09:01:26 -05002173 if (can_be_granted(r, lkb, 1, NULL)) {
David Teiglande7fd4172006-01-18 09:30:29 +00002174 grant_lock(r, lkb);
2175 queue_cast(r, lkb, 0);
2176 goto out;
2177 }
2178
2179 if (can_be_queued(lkb)) {
2180 error = -EINPROGRESS;
2181 add_lkb(r, lkb, DLM_LKSTS_WAITING);
2182 send_blocking_asts(r, lkb);
David Teigland3ae1acf2007-05-18 08:59:31 -05002183 add_timeout(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002184 goto out;
2185 }
2186
2187 error = -EAGAIN;
2188 if (force_blocking_asts(lkb))
2189 send_blocking_asts_all(r, lkb);
2190 queue_cast(r, lkb, -EAGAIN);
2191
2192 out:
2193 return error;
2194}
2195
2196static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2197{
2198 int error = 0;
David Teiglandc85d65e2007-05-18 09:01:26 -05002199 int deadlk = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00002200
2201 /* changing an existing lock may allow others to be granted */
2202
David Teiglandc85d65e2007-05-18 09:01:26 -05002203 if (can_be_granted(r, lkb, 1, &deadlk)) {
David Teiglande7fd4172006-01-18 09:30:29 +00002204 grant_lock(r, lkb);
2205 queue_cast(r, lkb, 0);
2206 grant_pending_locks(r);
2207 goto out;
2208 }
2209
David Teiglandc85d65e2007-05-18 09:01:26 -05002210 /* can_be_granted() detected that this lock would block in a conversion
2211 deadlock, so we leave it on the granted queue and return EDEADLK in
2212 the ast for the convert. */
2213
2214 if (deadlk) {
2215 /* it's left on the granted queue */
2216 log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
2217 lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
2218 lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
2219 revert_lock(r, lkb);
2220 queue_cast(r, lkb, -EDEADLK);
2221 error = -EDEADLK;
2222 goto out;
2223 }
2224
David Teigland7d3c1fe2007-04-19 10:30:41 -05002225 /* is_demoted() means the can_be_granted() above set the grmode
2226 to NL, and left us on the granted queue. This auto-demotion
2227 (due to CONVDEADLK) might mean other locks, and/or this lock, are
2228 now grantable. We have to try to grant other converting locks
2229 before we try again to grant this one. */
2230
2231 if (is_demoted(lkb)) {
2232 grant_pending_convert(r, DLM_LOCK_IV);
2233 if (_can_be_granted(r, lkb, 1)) {
2234 grant_lock(r, lkb);
2235 queue_cast(r, lkb, 0);
David Teiglande7fd4172006-01-18 09:30:29 +00002236 grant_pending_locks(r);
David Teigland7d3c1fe2007-04-19 10:30:41 -05002237 goto out;
2238 }
2239 /* else fall through and move to convert queue */
2240 }
2241
2242 if (can_be_queued(lkb)) {
David Teiglande7fd4172006-01-18 09:30:29 +00002243 error = -EINPROGRESS;
2244 del_lkb(r, lkb);
2245 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
2246 send_blocking_asts(r, lkb);
David Teigland3ae1acf2007-05-18 08:59:31 -05002247 add_timeout(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002248 goto out;
2249 }
2250
2251 error = -EAGAIN;
2252 if (force_blocking_asts(lkb))
2253 send_blocking_asts_all(r, lkb);
2254 queue_cast(r, lkb, -EAGAIN);
2255
2256 out:
2257 return error;
2258}
2259
2260static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2261{
2262 remove_lock(r, lkb);
2263 queue_cast(r, lkb, -DLM_EUNLOCK);
2264 grant_pending_locks(r);
2265 return -DLM_EUNLOCK;
2266}
2267
David Teiglandef0c2bb2007-03-28 09:56:46 -05002268/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
Steven Whitehouse907b9bc2006-09-25 09:26:04 -04002269
David Teiglande7fd4172006-01-18 09:30:29 +00002270static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2271{
David Teiglandef0c2bb2007-03-28 09:56:46 -05002272 int error;
2273
2274 error = revert_lock(r, lkb);
2275 if (error) {
2276 queue_cast(r, lkb, -DLM_ECANCEL);
2277 grant_pending_locks(r);
2278 return -DLM_ECANCEL;
2279 }
2280 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00002281}
2282
2283/*
2284 * Four stage 3 varieties:
2285 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2286 */
2287
/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv;

	/* set_master: sets lkb nodeid from r */

	rv = set_master(r, lkb);
	if (rv < 0)
		return rv;

	/* positive: the lkb was put on a wait queue, nothing more to do now */
	if (rv)
		return 0;

	/* receive_request() calls do_request() on remote node */
	if (is_remote(r))
		return send_request(r, lkb);

	return do_request(r, lkb);
}
2312
/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() calls do_convert() on remote node */
	if (is_remote(r))
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
2327
/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() calls do_unlock() on remote node */
	if (is_remote(r))
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
2342
/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() calls do_cancel() on remote node */
	if (is_remote(r))
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
2357
2358/*
2359 * Four stage 2 varieties:
2360 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2361 */
2362
2363static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
2364 int len, struct dlm_args *args)
2365{
2366 struct dlm_rsb *r;
2367 int error;
2368
2369 error = validate_lock_args(ls, lkb, args);
2370 if (error)
2371 goto out;
2372
2373 error = find_rsb(ls, name, len, R_CREATE, &r);
2374 if (error)
2375 goto out;
2376
2377 lock_rsb(r);
2378
2379 attach_lkb(r, lkb);
2380 lkb->lkb_lksb->sb_lkid = lkb->lkb_id;
2381
2382 error = _request_lock(r, lkb);
2383
2384 unlock_rsb(r);
2385 put_rsb(r);
2386
2387 out:
2388 return error;
2389}
2390
2391static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2392 struct dlm_args *args)
2393{
2394 struct dlm_rsb *r;
2395 int error;
2396
2397 r = lkb->lkb_resource;
2398
2399 hold_rsb(r);
2400 lock_rsb(r);
2401
2402 error = validate_lock_args(ls, lkb, args);
2403 if (error)
2404 goto out;
2405
2406 error = _convert_lock(r, lkb);
2407 out:
2408 unlock_rsb(r);
2409 put_rsb(r);
2410 return error;
2411}
2412
2413static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2414 struct dlm_args *args)
2415{
2416 struct dlm_rsb *r;
2417 int error;
2418
2419 r = lkb->lkb_resource;
2420
2421 hold_rsb(r);
2422 lock_rsb(r);
2423
2424 error = validate_unlock_args(lkb, args);
2425 if (error)
2426 goto out;
2427
2428 error = _unlock_lock(r, lkb);
2429 out:
2430 unlock_rsb(r);
2431 put_rsb(r);
2432 return error;
2433}
2434
2435static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
2436 struct dlm_args *args)
2437{
2438 struct dlm_rsb *r;
2439 int error;
2440
2441 r = lkb->lkb_resource;
2442
2443 hold_rsb(r);
2444 lock_rsb(r);
2445
2446 error = validate_unlock_args(lkb, args);
2447 if (error)
2448 goto out;
2449
2450 error = _cancel_lock(r, lkb);
2451 out:
2452 unlock_rsb(r);
2453 put_rsb(r);
2454 return error;
2455}
2456
2457/*
2458 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2459 */
2460
2461int dlm_lock(dlm_lockspace_t *lockspace,
2462 int mode,
2463 struct dlm_lksb *lksb,
2464 uint32_t flags,
2465 void *name,
2466 unsigned int namelen,
2467 uint32_t parent_lkid,
2468 void (*ast) (void *astarg),
2469 void *astarg,
David Teigland3bcd3682006-02-23 09:56:38 +00002470 void (*bast) (void *astarg, int mode))
David Teiglande7fd4172006-01-18 09:30:29 +00002471{
2472 struct dlm_ls *ls;
2473 struct dlm_lkb *lkb;
2474 struct dlm_args args;
2475 int error, convert = flags & DLM_LKF_CONVERT;
2476
2477 ls = dlm_find_lockspace_local(lockspace);
2478 if (!ls)
2479 return -EINVAL;
2480
David Teigland85e86ed2007-05-18 08:58:15 -05002481 dlm_lock_recovery(ls);
David Teiglande7fd4172006-01-18 09:30:29 +00002482
2483 if (convert)
2484 error = find_lkb(ls, lksb->sb_lkid, &lkb);
2485 else
2486 error = create_lkb(ls, &lkb);
2487
2488 if (error)
2489 goto out;
2490
David Teiglandd7db9232007-05-18 09:00:32 -05002491 error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
David Teigland3bcd3682006-02-23 09:56:38 +00002492 astarg, bast, &args);
David Teiglande7fd4172006-01-18 09:30:29 +00002493 if (error)
2494 goto out_put;
2495
2496 if (convert)
2497 error = convert_lock(ls, lkb, &args);
2498 else
2499 error = request_lock(ls, lkb, name, namelen, &args);
2500
2501 if (error == -EINPROGRESS)
2502 error = 0;
2503 out_put:
2504 if (convert || error)
David Teiglandb3f58d82006-02-28 11:16:37 -05002505 __put_lkb(ls, lkb);
David Teiglandc85d65e2007-05-18 09:01:26 -05002506 if (error == -EAGAIN || error == -EDEADLK)
David Teiglande7fd4172006-01-18 09:30:29 +00002507 error = 0;
2508 out:
David Teigland85e86ed2007-05-18 08:58:15 -05002509 dlm_unlock_recovery(ls);
David Teiglande7fd4172006-01-18 09:30:29 +00002510 dlm_put_lockspace(ls);
2511 return error;
2512}
2513
2514int dlm_unlock(dlm_lockspace_t *lockspace,
2515 uint32_t lkid,
2516 uint32_t flags,
2517 struct dlm_lksb *lksb,
2518 void *astarg)
2519{
2520 struct dlm_ls *ls;
2521 struct dlm_lkb *lkb;
2522 struct dlm_args args;
2523 int error;
2524
2525 ls = dlm_find_lockspace_local(lockspace);
2526 if (!ls)
2527 return -EINVAL;
2528
David Teigland85e86ed2007-05-18 08:58:15 -05002529 dlm_lock_recovery(ls);
David Teiglande7fd4172006-01-18 09:30:29 +00002530
2531 error = find_lkb(ls, lkid, &lkb);
2532 if (error)
2533 goto out;
2534
2535 error = set_unlock_args(flags, astarg, &args);
2536 if (error)
2537 goto out_put;
2538
2539 if (flags & DLM_LKF_CANCEL)
2540 error = cancel_lock(ls, lkb, &args);
2541 else
2542 error = unlock_lock(ls, lkb, &args);
2543
2544 if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
2545 error = 0;
David Teiglandef0c2bb2007-03-28 09:56:46 -05002546 if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
2547 error = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00002548 out_put:
David Teiglandb3f58d82006-02-28 11:16:37 -05002549 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00002550 out:
David Teigland85e86ed2007-05-18 08:58:15 -05002551 dlm_unlock_recovery(ls);
David Teiglande7fd4172006-01-18 09:30:29 +00002552 dlm_put_lockspace(ls);
2553 return error;
2554}
2555
2556/*
2557 * send/receive routines for remote operations and replies
2558 *
2559 * send_args
2560 * send_common
2561 * send_request receive_request
2562 * send_convert receive_convert
2563 * send_unlock receive_unlock
2564 * send_cancel receive_cancel
2565 * send_grant receive_grant
2566 * send_bast receive_bast
2567 * send_lookup receive_lookup
2568 * send_remove receive_remove
2569 *
2570 * send_common_reply
2571 * receive_request_reply send_request_reply
2572 * receive_convert_reply send_convert_reply
2573 * receive_unlock_reply send_unlock_reply
2574 * receive_cancel_reply send_cancel_reply
2575 * receive_lookup_reply send_lookup_reply
2576 */
2577
David Teigland7e4dac32007-04-02 09:06:41 -05002578static int _create_message(struct dlm_ls *ls, int mb_len,
2579 int to_nodeid, int mstype,
2580 struct dlm_message **ms_ret,
2581 struct dlm_mhandle **mh_ret)
2582{
2583 struct dlm_message *ms;
2584 struct dlm_mhandle *mh;
2585 char *mb;
2586
2587 /* get_buffer gives us a message handle (mh) that we need to
2588 pass into lowcomms_commit and a message buffer (mb) that we
2589 write our data into */
2590
2591 mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
2592 if (!mh)
2593 return -ENOBUFS;
2594
2595 memset(mb, 0, mb_len);
2596
2597 ms = (struct dlm_message *) mb;
2598
2599 ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
2600 ms->m_header.h_lockspace = ls->ls_global_id;
2601 ms->m_header.h_nodeid = dlm_our_nodeid();
2602 ms->m_header.h_length = mb_len;
2603 ms->m_header.h_cmd = DLM_MSG;
2604
2605 ms->m_type = mstype;
2606
2607 *mh_ret = mh;
2608 *ms_ret = ms;
2609 return 0;
2610}
2611
David Teiglande7fd4172006-01-18 09:30:29 +00002612static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
2613 int to_nodeid, int mstype,
2614 struct dlm_message **ms_ret,
2615 struct dlm_mhandle **mh_ret)
2616{
David Teiglande7fd4172006-01-18 09:30:29 +00002617 int mb_len = sizeof(struct dlm_message);
2618
2619 switch (mstype) {
2620 case DLM_MSG_REQUEST:
2621 case DLM_MSG_LOOKUP:
2622 case DLM_MSG_REMOVE:
2623 mb_len += r->res_length;
2624 break;
2625 case DLM_MSG_CONVERT:
2626 case DLM_MSG_UNLOCK:
2627 case DLM_MSG_REQUEST_REPLY:
2628 case DLM_MSG_CONVERT_REPLY:
2629 case DLM_MSG_GRANT:
2630 if (lkb && lkb->lkb_lvbptr)
2631 mb_len += r->res_ls->ls_lvblen;
2632 break;
2633 }
2634
David Teigland7e4dac32007-04-02 09:06:41 -05002635 return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
2636 ms_ret, mh_ret);
David Teiglande7fd4172006-01-18 09:30:29 +00002637}
2638
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

/* Pass the message through dlm_message_out() and commit its buffer.
   Currently always returns 0. */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);
	dlm_lowcomms_commit_buffer(mh);

	return 0;
}
2648
2649static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
2650 struct dlm_message *ms)
2651{
2652 ms->m_nodeid = lkb->lkb_nodeid;
2653 ms->m_pid = lkb->lkb_ownpid;
2654 ms->m_lkid = lkb->lkb_id;
2655 ms->m_remid = lkb->lkb_remid;
2656 ms->m_exflags = lkb->lkb_exflags;
2657 ms->m_sbflags = lkb->lkb_sbflags;
2658 ms->m_flags = lkb->lkb_flags;
2659 ms->m_lvbseq = lkb->lkb_lvbseq;
2660 ms->m_status = lkb->lkb_status;
2661 ms->m_grmode = lkb->lkb_grmode;
2662 ms->m_rqmode = lkb->lkb_rqmode;
2663 ms->m_hash = r->res_hash;
2664
2665 /* m_result and m_bastmode are set from function args,
2666 not from lkb fields */
2667
2668 if (lkb->lkb_bastaddr)
2669 ms->m_asts |= AST_BAST;
2670 if (lkb->lkb_astaddr)
2671 ms->m_asts |= AST_COMP;
2672
David Teiglandda49f362006-12-13 10:38:45 -06002673 /* compare with switch in create_message; send_remove() doesn't
2674 use send_args() */
2675
2676 switch (ms->m_type) {
2677 case DLM_MSG_REQUEST:
2678 case DLM_MSG_LOOKUP:
David Teiglande7fd4172006-01-18 09:30:29 +00002679 memcpy(ms->m_extra, r->res_name, r->res_length);
David Teiglandda49f362006-12-13 10:38:45 -06002680 break;
2681 case DLM_MSG_CONVERT:
2682 case DLM_MSG_UNLOCK:
2683 case DLM_MSG_REQUEST_REPLY:
2684 case DLM_MSG_CONVERT_REPLY:
2685 case DLM_MSG_GRANT:
2686 if (!lkb->lkb_lvbptr)
2687 break;
David Teiglande7fd4172006-01-18 09:30:29 +00002688 memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
David Teiglandda49f362006-12-13 10:38:45 -06002689 break;
2690 }
David Teiglande7fd4172006-01-18 09:30:29 +00002691}
2692
2693static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
2694{
2695 struct dlm_message *ms;
2696 struct dlm_mhandle *mh;
2697 int to_nodeid, error;
2698
David Teiglandef0c2bb2007-03-28 09:56:46 -05002699 error = add_to_waiters(lkb, mstype);
2700 if (error)
2701 return error;
David Teiglande7fd4172006-01-18 09:30:29 +00002702
2703 to_nodeid = r->res_nodeid;
2704
2705 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2706 if (error)
2707 goto fail;
2708
2709 send_args(r, lkb, ms);
2710
2711 error = send_message(mh, ms);
2712 if (error)
2713 goto fail;
2714 return 0;
2715
2716 fail:
David Teiglandef0c2bb2007-03-28 09:56:46 -05002717 remove_from_waiters(lkb, msg_reply_type(mstype));
David Teiglande7fd4172006-01-18 09:30:29 +00002718 return error;
2719}
2720
2721static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
2722{
2723 return send_common(r, lkb, DLM_MSG_REQUEST);
2724}
2725
2726static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
2727{
2728 int error;
2729
2730 error = send_common(r, lkb, DLM_MSG_CONVERT);
2731
2732 /* down conversions go without a reply from the master */
2733 if (!error && down_conversion(lkb)) {
David Teiglandef0c2bb2007-03-28 09:56:46 -05002734 remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
2735 r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
David Teiglande7fd4172006-01-18 09:30:29 +00002736 r->res_ls->ls_stub_ms.m_result = 0;
David Teigland32f105a2006-08-23 16:07:31 -04002737 r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
David Teiglande7fd4172006-01-18 09:30:29 +00002738 __receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
2739 }
2740
2741 return error;
2742}
2743
2744/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2745 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2746 that the master is still correct. */
2747
2748static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
2749{
2750 return send_common(r, lkb, DLM_MSG_UNLOCK);
2751}
2752
2753static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2754{
2755 return send_common(r, lkb, DLM_MSG_CANCEL);
2756}
2757
2758static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
2759{
2760 struct dlm_message *ms;
2761 struct dlm_mhandle *mh;
2762 int to_nodeid, error;
2763
2764 to_nodeid = lkb->lkb_nodeid;
2765
2766 error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
2767 if (error)
2768 goto out;
2769
2770 send_args(r, lkb, ms);
2771
2772 ms->m_result = 0;
2773
2774 error = send_message(mh, ms);
2775 out:
2776 return error;
2777}
2778
2779static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
2780{
2781 struct dlm_message *ms;
2782 struct dlm_mhandle *mh;
2783 int to_nodeid, error;
2784
2785 to_nodeid = lkb->lkb_nodeid;
2786
2787 error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
2788 if (error)
2789 goto out;
2790
2791 send_args(r, lkb, ms);
2792
2793 ms->m_bastmode = mode;
2794
2795 error = send_message(mh, ms);
2796 out:
2797 return error;
2798}
2799
2800static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
2801{
2802 struct dlm_message *ms;
2803 struct dlm_mhandle *mh;
2804 int to_nodeid, error;
2805
David Teiglandef0c2bb2007-03-28 09:56:46 -05002806 error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
2807 if (error)
2808 return error;
David Teiglande7fd4172006-01-18 09:30:29 +00002809
2810 to_nodeid = dlm_dir_nodeid(r);
2811
2812 error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
2813 if (error)
2814 goto fail;
2815
2816 send_args(r, lkb, ms);
2817
2818 error = send_message(mh, ms);
2819 if (error)
2820 goto fail;
2821 return 0;
2822
2823 fail:
David Teiglandef0c2bb2007-03-28 09:56:46 -05002824 remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
David Teiglande7fd4172006-01-18 09:30:29 +00002825 return error;
2826}
2827
2828static int send_remove(struct dlm_rsb *r)
2829{
2830 struct dlm_message *ms;
2831 struct dlm_mhandle *mh;
2832 int to_nodeid, error;
2833
2834 to_nodeid = dlm_dir_nodeid(r);
2835
2836 error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
2837 if (error)
2838 goto out;
2839
2840 memcpy(ms->m_extra, r->res_name, r->res_length);
2841 ms->m_hash = r->res_hash;
2842
2843 error = send_message(mh, ms);
2844 out:
2845 return error;
2846}
2847
2848static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
2849 int mstype, int rv)
2850{
2851 struct dlm_message *ms;
2852 struct dlm_mhandle *mh;
2853 int to_nodeid, error;
2854
2855 to_nodeid = lkb->lkb_nodeid;
2856
2857 error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
2858 if (error)
2859 goto out;
2860
2861 send_args(r, lkb, ms);
2862
2863 ms->m_result = rv;
2864
2865 error = send_message(mh, ms);
2866 out:
2867 return error;
2868}
2869
2870static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2871{
2872 return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
2873}
2874
2875static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2876{
2877 return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
2878}
2879
2880static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2881{
2882 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2883}
2884
2885static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2886{
2887 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2888}
2889
2890static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2891 int ret_nodeid, int rv)
2892{
2893 struct dlm_rsb *r = &ls->ls_stub_rsb;
2894 struct dlm_message *ms;
2895 struct dlm_mhandle *mh;
2896 int error, nodeid = ms_in->m_header.h_nodeid;
2897
2898 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2899 if (error)
2900 goto out;
2901
2902 ms->m_lkid = ms_in->m_lkid;
2903 ms->m_result = rv;
2904 ms->m_nodeid = ret_nodeid;
2905
2906 error = send_message(mh, ms);
2907 out:
2908 return error;
2909}
2910
2911/* which args we save from a received message depends heavily on the type
2912 of message, unlike the send side where we can safely send everything about
2913 the lkb for any type of message */
2914
2915static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2916{
2917 lkb->lkb_exflags = ms->m_exflags;
David Teigland6f90a8b12006-11-10 14:16:27 -06002918 lkb->lkb_sbflags = ms->m_sbflags;
David Teiglande7fd4172006-01-18 09:30:29 +00002919 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2920 (ms->m_flags & 0x0000FFFF);
2921}
2922
2923static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2924{
2925 lkb->lkb_sbflags = ms->m_sbflags;
2926 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2927 (ms->m_flags & 0x0000FFFF);
2928}
2929
2930static int receive_extralen(struct dlm_message *ms)
2931{
2932 return (ms->m_header.h_length - sizeof(struct dlm_message));
2933}
2934
David Teiglande7fd4172006-01-18 09:30:29 +00002935static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2936 struct dlm_message *ms)
2937{
2938 int len;
2939
2940 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2941 if (!lkb->lkb_lvbptr)
2942 lkb->lkb_lvbptr = allocate_lvb(ls);
2943 if (!lkb->lkb_lvbptr)
2944 return -ENOMEM;
2945 len = receive_extralen(ms);
2946 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2947 }
2948 return 0;
2949}
2950
2951static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2952 struct dlm_message *ms)
2953{
2954 lkb->lkb_nodeid = ms->m_header.h_nodeid;
2955 lkb->lkb_ownpid = ms->m_pid;
2956 lkb->lkb_remid = ms->m_lkid;
2957 lkb->lkb_grmode = DLM_LOCK_IV;
2958 lkb->lkb_rqmode = ms->m_rqmode;
2959 lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
2960 lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);
2961
2962 DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););
2963
David Teigland8d07fd52006-12-13 10:39:20 -06002964 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2965 /* lkb was just created so there won't be an lvb yet */
2966 lkb->lkb_lvbptr = allocate_lvb(ls);
2967 if (!lkb->lkb_lvbptr)
2968 return -ENOMEM;
2969 }
David Teiglande7fd4172006-01-18 09:30:29 +00002970
2971 return 0;
2972}
2973
2974static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
2975 struct dlm_message *ms)
2976{
2977 if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
2978 log_error(ls, "convert_args nodeid %d %d lkid %x %x",
2979 lkb->lkb_nodeid, ms->m_header.h_nodeid,
2980 lkb->lkb_id, lkb->lkb_remid);
2981 return -EINVAL;
2982 }
2983
2984 if (!is_master_copy(lkb))
2985 return -EINVAL;
2986
2987 if (lkb->lkb_status != DLM_LKSTS_GRANTED)
2988 return -EBUSY;
2989
David Teiglande7fd4172006-01-18 09:30:29 +00002990 if (receive_lvb(ls, lkb, ms))
2991 return -ENOMEM;
2992
2993 lkb->lkb_rqmode = ms->m_rqmode;
2994 lkb->lkb_lvbseq = ms->m_lvbseq;
2995
2996 return 0;
2997}
2998
/*
 * Validate an unlock message and pick up its lvb, if any.
 *
 * Returns -EINVAL if the lkb is not a master copy, -ENOMEM if
 * receive_lvb() fails, 0 otherwise.
 */
static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			       struct dlm_message *ms)
{
	if (!is_master_copy(lkb))
		return -EINVAL;

	return receive_lvb(ls, lkb, ms) ? -ENOMEM : 0;
}
3008
3009/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3010 uses to send a reply and that the remote end uses to process the reply. */
3011
3012static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3013{
3014 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3015 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3016 lkb->lkb_remid = ms->m_lkid;
3017}
3018
3019static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
3020{
3021 struct dlm_lkb *lkb;
3022 struct dlm_rsb *r;
3023 int error, namelen;
3024
3025 error = create_lkb(ls, &lkb);
3026 if (error)
3027 goto fail;
3028
3029 receive_flags(lkb, ms);
3030 lkb->lkb_flags |= DLM_IFL_MSTCPY;
3031 error = receive_request_args(ls, lkb, ms);
3032 if (error) {
David Teiglandb3f58d82006-02-28 11:16:37 -05003033 __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003034 goto fail;
3035 }
3036
3037 namelen = receive_extralen(ms);
3038
3039 error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
3040 if (error) {
David Teiglandb3f58d82006-02-28 11:16:37 -05003041 __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003042 goto fail;
3043 }
3044
3045 lock_rsb(r);
3046
3047 attach_lkb(r, lkb);
3048 error = do_request(r, lkb);
3049 send_request_reply(r, lkb, error);
3050
3051 unlock_rsb(r);
3052 put_rsb(r);
3053
3054 if (error == -EINPROGRESS)
3055 error = 0;
3056 if (error)
David Teiglandb3f58d82006-02-28 11:16:37 -05003057 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003058 return;
3059
3060 fail:
3061 setup_stub_lkb(ls, ms);
3062 send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3063}
3064
3065static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
3066{
3067 struct dlm_lkb *lkb;
3068 struct dlm_rsb *r;
David Teigland90135922006-01-20 08:47:07 +00003069 int error, reply = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00003070
3071 error = find_lkb(ls, ms->m_remid, &lkb);
3072 if (error)
3073 goto fail;
3074
3075 r = lkb->lkb_resource;
3076
3077 hold_rsb(r);
3078 lock_rsb(r);
3079
3080 receive_flags(lkb, ms);
3081 error = receive_convert_args(ls, lkb, ms);
3082 if (error)
3083 goto out;
3084 reply = !down_conversion(lkb);
3085
3086 error = do_convert(r, lkb);
3087 out:
3088 if (reply)
3089 send_convert_reply(r, lkb, error);
3090
3091 unlock_rsb(r);
3092 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05003093 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003094 return;
3095
3096 fail:
3097 setup_stub_lkb(ls, ms);
3098 send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3099}
3100
3101static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
3102{
3103 struct dlm_lkb *lkb;
3104 struct dlm_rsb *r;
3105 int error;
3106
3107 error = find_lkb(ls, ms->m_remid, &lkb);
3108 if (error)
3109 goto fail;
3110
3111 r = lkb->lkb_resource;
3112
3113 hold_rsb(r);
3114 lock_rsb(r);
3115
3116 receive_flags(lkb, ms);
3117 error = receive_unlock_args(ls, lkb, ms);
3118 if (error)
3119 goto out;
3120
3121 error = do_unlock(r, lkb);
3122 out:
3123 send_unlock_reply(r, lkb, error);
3124
3125 unlock_rsb(r);
3126 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05003127 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003128 return;
3129
3130 fail:
3131 setup_stub_lkb(ls, ms);
3132 send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3133}
3134
3135static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
3136{
3137 struct dlm_lkb *lkb;
3138 struct dlm_rsb *r;
3139 int error;
3140
3141 error = find_lkb(ls, ms->m_remid, &lkb);
3142 if (error)
3143 goto fail;
3144
3145 receive_flags(lkb, ms);
3146
3147 r = lkb->lkb_resource;
3148
3149 hold_rsb(r);
3150 lock_rsb(r);
3151
3152 error = do_cancel(r, lkb);
3153 send_cancel_reply(r, lkb, error);
3154
3155 unlock_rsb(r);
3156 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05003157 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003158 return;
3159
3160 fail:
3161 setup_stub_lkb(ls, ms);
3162 send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
3163}
3164
3165static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
3166{
3167 struct dlm_lkb *lkb;
3168 struct dlm_rsb *r;
3169 int error;
3170
3171 error = find_lkb(ls, ms->m_remid, &lkb);
3172 if (error) {
3173 log_error(ls, "receive_grant no lkb");
3174 return;
3175 }
3176 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3177
3178 r = lkb->lkb_resource;
3179
3180 hold_rsb(r);
3181 lock_rsb(r);
3182
3183 receive_flags_reply(lkb, ms);
David Teigland7d3c1fe2007-04-19 10:30:41 -05003184 if (is_altmode(lkb))
3185 munge_altmode(lkb, ms);
David Teiglande7fd4172006-01-18 09:30:29 +00003186 grant_lock_pc(r, lkb, ms);
3187 queue_cast(r, lkb, 0);
3188
3189 unlock_rsb(r);
3190 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05003191 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003192}
3193
3194static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
3195{
3196 struct dlm_lkb *lkb;
3197 struct dlm_rsb *r;
3198 int error;
3199
3200 error = find_lkb(ls, ms->m_remid, &lkb);
3201 if (error) {
3202 log_error(ls, "receive_bast no lkb");
3203 return;
3204 }
3205 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3206
3207 r = lkb->lkb_resource;
3208
3209 hold_rsb(r);
3210 lock_rsb(r);
3211
3212 queue_bast(r, lkb, ms->m_bastmode);
3213
3214 unlock_rsb(r);
3215 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05003216 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003217}
3218
3219static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
3220{
3221 int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;
3222
3223 from_nodeid = ms->m_header.h_nodeid;
3224 our_nodeid = dlm_our_nodeid();
3225
3226 len = receive_extralen(ms);
3227
3228 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3229 if (dir_nodeid != our_nodeid) {
3230 log_error(ls, "lookup dir_nodeid %d from %d",
3231 dir_nodeid, from_nodeid);
3232 error = -EINVAL;
3233 ret_nodeid = -1;
3234 goto out;
3235 }
3236
3237 error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);
3238
3239 /* Optimization: we're master so treat lookup as a request */
3240 if (!error && ret_nodeid == our_nodeid) {
3241 receive_request(ls, ms);
3242 return;
3243 }
3244 out:
3245 send_lookup_reply(ls, ms, ret_nodeid, error);
3246}
3247
3248static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3249{
3250 int len, dir_nodeid, from_nodeid;
3251
3252 from_nodeid = ms->m_header.h_nodeid;
3253
3254 len = receive_extralen(ms);
3255
3256 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3257 if (dir_nodeid != dlm_our_nodeid()) {
3258 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3259 dir_nodeid, from_nodeid);
3260 return;
3261 }
3262
3263 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3264}
3265
David Teigland84991372007-03-30 15:02:40 -05003266static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3267{
3268 do_purge(ls, ms->m_nodeid, ms->m_pid);
3269}
3270
/*
 * Handle a request reply for the process-copy lkb identified by
 * ms->m_remid.
 *
 * The lkb is taken off the waiters list; if that fails nothing else is
 * done.  The switch then acts on ms->m_result (the master's do_request()
 * return value).  After the switch, an overlapping unlock or cancel that
 * was recorded in lkb_flags is either sent on (send_unlock/send_cancel)
 * when the result allows it, or the overlap flags are simply cleared.
 */
David Teiglande7fd4172006-01-18 09:30:29 +00003271static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
3272{
3273 struct dlm_lkb *lkb;
3274 struct dlm_rsb *r;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003275 int error, mstype, result;
David Teiglande7fd4172006-01-18 09:30:29 +00003276
3277 error = find_lkb(ls, ms->m_remid, &lkb);
3278 if (error) {
3279 log_error(ls, "receive_request_reply no lkb");
3280 return;
3281 }
3282 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3283
David Teiglande7fd4172006-01-18 09:30:29 +00003284 r = lkb->lkb_resource;
3285 hold_rsb(r);
3286 lock_rsb(r);
3287
	/* sample the wait type before remove_from_waiters(); it is compared
	   with DLM_MSG_LOOKUP below */
David Teiglandef0c2bb2007-03-28 09:56:46 -05003288 mstype = lkb->lkb_wait_type;
3289 error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
3290 if (error)
3291 goto out;
3292
David Teiglande7fd4172006-01-18 09:30:29 +00003293 /* Optimization: the dir node was also the master, so it took our
3294 lookup as a request and sent request reply instead of lookup reply */
3295 if (mstype == DLM_MSG_LOOKUP) {
3296 r->res_nodeid = ms->m_header.h_nodeid;
3297 lkb->lkb_nodeid = r->res_nodeid;
3298 }
3299
David Teiglandef0c2bb2007-03-28 09:56:46 -05003300 /* this is the value returned from do_request() on the master */
3301 result = ms->m_result;
3302
3303 switch (result) {
David Teiglande7fd4172006-01-18 09:30:29 +00003304 case -EAGAIN:
David Teiglandef0c2bb2007-03-28 09:56:46 -05003305 /* request would block (be queued) on remote master */
David Teiglande7fd4172006-01-18 09:30:29 +00003306 queue_cast(r, lkb, -EAGAIN);
3307 confirm_master(r, -EAGAIN);
David Teiglandef0c2bb2007-03-28 09:56:46 -05003308 unhold_lkb(lkb); /* undoes create_lkb() */
David Teiglande7fd4172006-01-18 09:30:29 +00003309 break;
3310
3311 case -EINPROGRESS:
3312 case 0:
3313 /* request was queued or granted on remote master */
3314 receive_flags_reply(lkb, ms);
3315 lkb->lkb_remid = ms->m_lkid;
David Teigland7d3c1fe2007-04-19 10:30:41 -05003316 if (is_altmode(lkb))
3317 munge_altmode(lkb, ms);
	/* non-zero here can only be -EINPROGRESS (queued); zero is granted */
David Teigland3ae1acf2007-05-18 08:59:31 -05003318 if (result) {
David Teiglande7fd4172006-01-18 09:30:29 +00003319 add_lkb(r, lkb, DLM_LKSTS_WAITING);
David Teigland3ae1acf2007-05-18 08:59:31 -05003320 add_timeout(lkb);
3321 } else {
David Teiglande7fd4172006-01-18 09:30:29 +00003322 grant_lock_pc(r, lkb, ms);
3323 queue_cast(r, lkb, 0);
3324 }
David Teiglandef0c2bb2007-03-28 09:56:46 -05003325 confirm_master(r, result);
David Teiglande7fd4172006-01-18 09:30:29 +00003326 break;
3327
David Teigland597d0ca2006-07-12 16:44:04 -05003328 case -EBADR:
David Teiglande7fd4172006-01-18 09:30:29 +00003329 case -ENOTBLK:
3330 /* find_rsb failed to find rsb or rsb wasn't master */
David Teiglandef0c2bb2007-03-28 09:56:46 -05003331 log_debug(ls, "receive_request_reply %x %x master diff %d %d",
3332 lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
	/* forget the master recorded on both the rsb and the lkb */
David Teiglande7fd4172006-01-18 09:30:29 +00003333 r->res_nodeid = -1;
3334 lkb->lkb_nodeid = -1;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003335
3336 if (is_overlap(lkb)) {
3337 /* we'll ignore error in cancel/unlock reply */
3338 queue_cast_overlap(r, lkb);
3339 unhold_lkb(lkb); /* undoes create_lkb() */
3340 } else
3341 _request_lock(r, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003342 break;
3343
3344 default:
David Teiglandef0c2bb2007-03-28 09:56:46 -05003345 log_error(ls, "receive_request_reply %x error %d",
3346 lkb->lkb_id, result);
David Teiglande7fd4172006-01-18 09:30:29 +00003347 }
3348
	/* overlap handling: an unlock is sent on for results 0 and
	   -EINPROGRESS, a cancel only for -EINPROGRESS; in every branch both
	   overlap flags end up cleared */
David Teiglandef0c2bb2007-03-28 09:56:46 -05003349 if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
3350 log_debug(ls, "receive_request_reply %x result %d unlock",
3351 lkb->lkb_id, result);
3352 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3353 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3354 send_unlock(r, lkb);
3355 } else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
3356 log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
3357 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3358 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3359 send_cancel(r, lkb);
3360 } else {
3361 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3362 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3363 }
3364 out:
David Teiglande7fd4172006-01-18 09:30:29 +00003365 unlock_rsb(r);
3366 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05003367 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003368}
3369
3370static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
3371 struct dlm_message *ms)
3372{
David Teiglande7fd4172006-01-18 09:30:29 +00003373 /* this is the value returned from do_convert() on the master */
David Teiglandef0c2bb2007-03-28 09:56:46 -05003374 switch (ms->m_result) {
David Teiglande7fd4172006-01-18 09:30:29 +00003375 case -EAGAIN:
3376 /* convert would block (be queued) on remote master */
3377 queue_cast(r, lkb, -EAGAIN);
3378 break;
3379
David Teiglandc85d65e2007-05-18 09:01:26 -05003380 case -EDEADLK:
3381 receive_flags_reply(lkb, ms);
3382 revert_lock_pc(r, lkb);
3383 queue_cast(r, lkb, -EDEADLK);
3384 break;
3385
David Teiglande7fd4172006-01-18 09:30:29 +00003386 case -EINPROGRESS:
3387 /* convert was queued on remote master */
David Teigland7d3c1fe2007-04-19 10:30:41 -05003388 receive_flags_reply(lkb, ms);
3389 if (is_demoted(lkb))
3390 munge_demoted(lkb, ms);
David Teiglande7fd4172006-01-18 09:30:29 +00003391 del_lkb(r, lkb);
3392 add_lkb(r, lkb, DLM_LKSTS_CONVERT);
David Teigland3ae1acf2007-05-18 08:59:31 -05003393 add_timeout(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003394 break;
3395
3396 case 0:
3397 /* convert was granted on remote master */
3398 receive_flags_reply(lkb, ms);
David Teigland7d3c1fe2007-04-19 10:30:41 -05003399 if (is_demoted(lkb))
3400 munge_demoted(lkb, ms);
David Teiglande7fd4172006-01-18 09:30:29 +00003401 grant_lock_pc(r, lkb, ms);
3402 queue_cast(r, lkb, 0);
3403 break;
3404
3405 default:
David Teiglandef0c2bb2007-03-28 09:56:46 -05003406 log_error(r->res_ls, "receive_convert_reply %x error %d",
3407 lkb->lkb_id, ms->m_result);
David Teiglande7fd4172006-01-18 09:30:29 +00003408 }
3409}
3410
3411static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3412{
3413 struct dlm_rsb *r = lkb->lkb_resource;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003414 int error;
David Teiglande7fd4172006-01-18 09:30:29 +00003415
3416 hold_rsb(r);
3417 lock_rsb(r);
3418
David Teiglandef0c2bb2007-03-28 09:56:46 -05003419 /* stub reply can happen with waiters_mutex held */
3420 error = remove_from_waiters_ms(lkb, ms);
3421 if (error)
3422 goto out;
David Teiglande7fd4172006-01-18 09:30:29 +00003423
David Teiglandef0c2bb2007-03-28 09:56:46 -05003424 __receive_convert_reply(r, lkb, ms);
3425 out:
David Teiglande7fd4172006-01-18 09:30:29 +00003426 unlock_rsb(r);
3427 put_rsb(r);
3428}
3429
3430static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3431{
3432 struct dlm_lkb *lkb;
3433 int error;
3434
3435 error = find_lkb(ls, ms->m_remid, &lkb);
3436 if (error) {
3437 log_error(ls, "receive_convert_reply no lkb");
3438 return;
3439 }
3440 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3441
David Teiglande7fd4172006-01-18 09:30:29 +00003442 _receive_convert_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003443 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003444}
3445
3446static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3447{
3448 struct dlm_rsb *r = lkb->lkb_resource;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003449 int error;
David Teiglande7fd4172006-01-18 09:30:29 +00003450
3451 hold_rsb(r);
3452 lock_rsb(r);
3453
David Teiglandef0c2bb2007-03-28 09:56:46 -05003454 /* stub reply can happen with waiters_mutex held */
3455 error = remove_from_waiters_ms(lkb, ms);
3456 if (error)
3457 goto out;
3458
David Teiglande7fd4172006-01-18 09:30:29 +00003459 /* this is the value returned from do_unlock() on the master */
3460
David Teiglandef0c2bb2007-03-28 09:56:46 -05003461 switch (ms->m_result) {
David Teiglande7fd4172006-01-18 09:30:29 +00003462 case -DLM_EUNLOCK:
3463 receive_flags_reply(lkb, ms);
3464 remove_lock_pc(r, lkb);
3465 queue_cast(r, lkb, -DLM_EUNLOCK);
3466 break;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003467 case -ENOENT:
3468 break;
David Teiglande7fd4172006-01-18 09:30:29 +00003469 default:
David Teiglandef0c2bb2007-03-28 09:56:46 -05003470 log_error(r->res_ls, "receive_unlock_reply %x error %d",
3471 lkb->lkb_id, ms->m_result);
David Teiglande7fd4172006-01-18 09:30:29 +00003472 }
David Teiglandef0c2bb2007-03-28 09:56:46 -05003473 out:
David Teiglande7fd4172006-01-18 09:30:29 +00003474 unlock_rsb(r);
3475 put_rsb(r);
3476}
3477
3478static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3479{
3480 struct dlm_lkb *lkb;
3481 int error;
3482
3483 error = find_lkb(ls, ms->m_remid, &lkb);
3484 if (error) {
3485 log_error(ls, "receive_unlock_reply no lkb");
3486 return;
3487 }
3488 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3489
David Teiglande7fd4172006-01-18 09:30:29 +00003490 _receive_unlock_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003491 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003492}
3493
3494static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
3495{
3496 struct dlm_rsb *r = lkb->lkb_resource;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003497 int error;
David Teiglande7fd4172006-01-18 09:30:29 +00003498
3499 hold_rsb(r);
3500 lock_rsb(r);
3501
David Teiglandef0c2bb2007-03-28 09:56:46 -05003502 /* stub reply can happen with waiters_mutex held */
3503 error = remove_from_waiters_ms(lkb, ms);
3504 if (error)
3505 goto out;
3506
David Teiglande7fd4172006-01-18 09:30:29 +00003507 /* this is the value returned from do_cancel() on the master */
3508
David Teiglandef0c2bb2007-03-28 09:56:46 -05003509 switch (ms->m_result) {
David Teiglande7fd4172006-01-18 09:30:29 +00003510 case -DLM_ECANCEL:
3511 receive_flags_reply(lkb, ms);
3512 revert_lock_pc(r, lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05003513 if (ms->m_result)
3514 queue_cast(r, lkb, -DLM_ECANCEL);
3515 break;
3516 case 0:
David Teiglande7fd4172006-01-18 09:30:29 +00003517 break;
3518 default:
David Teiglandef0c2bb2007-03-28 09:56:46 -05003519 log_error(r->res_ls, "receive_cancel_reply %x error %d",
3520 lkb->lkb_id, ms->m_result);
David Teiglande7fd4172006-01-18 09:30:29 +00003521 }
David Teiglandef0c2bb2007-03-28 09:56:46 -05003522 out:
David Teiglande7fd4172006-01-18 09:30:29 +00003523 unlock_rsb(r);
3524 put_rsb(r);
3525}
3526
3527static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3528{
3529 struct dlm_lkb *lkb;
3530 int error;
3531
3532 error = find_lkb(ls, ms->m_remid, &lkb);
3533 if (error) {
3534 log_error(ls, "receive_cancel_reply no lkb");
3535 return;
3536 }
3537 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3538
David Teiglande7fd4172006-01-18 09:30:29 +00003539 _receive_cancel_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003540 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003541}
3542
3543static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
3544{
3545 struct dlm_lkb *lkb;
3546 struct dlm_rsb *r;
3547 int error, ret_nodeid;
3548
3549 error = find_lkb(ls, ms->m_lkid, &lkb);
3550 if (error) {
3551 log_error(ls, "receive_lookup_reply no lkb");
3552 return;
3553 }
3554
David Teiglandef0c2bb2007-03-28 09:56:46 -05003555 /* ms->m_result is the value returned by dlm_dir_lookup on dir node
David Teiglande7fd4172006-01-18 09:30:29 +00003556 FIXME: will a non-zero error ever be returned? */
David Teiglande7fd4172006-01-18 09:30:29 +00003557
3558 r = lkb->lkb_resource;
3559 hold_rsb(r);
3560 lock_rsb(r);
3561
David Teiglandef0c2bb2007-03-28 09:56:46 -05003562 error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
3563 if (error)
3564 goto out;
3565
David Teiglande7fd4172006-01-18 09:30:29 +00003566 ret_nodeid = ms->m_nodeid;
3567 if (ret_nodeid == dlm_our_nodeid()) {
3568 r->res_nodeid = 0;
3569 ret_nodeid = 0;
3570 r->res_first_lkid = 0;
3571 } else {
3572 /* set_master() will copy res_nodeid to lkb_nodeid */
3573 r->res_nodeid = ret_nodeid;
3574 }
3575
David Teiglandef0c2bb2007-03-28 09:56:46 -05003576 if (is_overlap(lkb)) {
3577 log_debug(ls, "receive_lookup_reply %x unlock %x",
3578 lkb->lkb_id, lkb->lkb_flags);
3579 queue_cast_overlap(r, lkb);
3580 unhold_lkb(lkb); /* undoes create_lkb() */
3581 goto out_list;
3582 }
3583
David Teiglande7fd4172006-01-18 09:30:29 +00003584 _request_lock(r, lkb);
3585
David Teiglandef0c2bb2007-03-28 09:56:46 -05003586 out_list:
David Teiglande7fd4172006-01-18 09:30:29 +00003587 if (!ret_nodeid)
3588 process_lookup_list(r);
David Teiglandef0c2bb2007-03-28 09:56:46 -05003589 out:
David Teiglande7fd4172006-01-18 09:30:29 +00003590 unlock_rsb(r);
3591 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05003592 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003593}
3594
/*
 * Entry point for one received dlm message.
 *
 * hd:       the message, starting with its header; treated as a
 *           struct dlm_message
 * nodeid:   sending node, used for logging and when the message is handed
 *           to dlm_add_requestqueue()
 * recovery: non-zero when the caller is replaying saved messages; in that
 *           case dlm_message_in() and dlm_wait_requestqueue() are skipped
 *
 * Returns 0 after the message has been dispatched (including an unknown
 * m_type, which is only logged), -EINVAL if the lockspace in the header
 * is not found, and -EINTR when locking is stopped: either because
 * recovery is set, or after dlm_add_requestqueue() returned anything
 * other than -EAGAIN.
 */
3595int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
3596{
3597 struct dlm_message *ms = (struct dlm_message *) hd;
3598 struct dlm_ls *ls;
David Teigland8fd3a982007-01-24 10:11:45 -06003599 int error = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003600
	/* NOTE(review): dlm_message_in() presumably converts the message
	   from wire format; it is not applied again on replay -- confirm */
3601 if (!recovery)
3602 dlm_message_in(ms);
3603
3604 ls = dlm_find_lockspace_global(hd->h_lockspace);
3605 if (!ls) {
3606 log_print("drop message %d from %d for unknown lockspace %d",
3607 ms->m_type, nodeid, hd->h_lockspace);
3608 return -EINVAL;
3609 }
3610
3611 /* recovery may have just ended leaving a bunch of backed-up requests
3612 in the requestqueue; wait while dlm_recoverd clears them */
3613
3614 if (!recovery)
3615 dlm_wait_requestqueue(ls);
3616
3617 /* recovery may have just started while there were a bunch of
3618 in-flight requests -- save them in requestqueue to be processed
3619 after recovery. we can't let dlm_recvd block on the recovery
3620 lock. if dlm_recoverd is calling this function to clear the
3621 requestqueue, it needs to be interrupted (-EINTR) if another
3622 recovery operation is starting. */
3623
	/* loop until the recovery lock is obtained with
	   dlm_lock_recovery_try(), or locking is found to be stopped */
3624 while (1) {
3625 if (dlm_locking_stopped(ls)) {
David Teiglandd4400152006-10-31 11:55:56 -06003626 if (recovery) {
3627 error = -EINTR;
3628 goto out;
3629 }
3630 error = dlm_add_requestqueue(ls, nodeid, hd);
	/* -EAGAIN: go round again; any other return value, including 0,
	   ends processing here with -EINTR */
3631 if (error == -EAGAIN)
3632 continue;
3633 else {
3634 error = -EINTR;
3635 goto out;
3636 }
David Teiglande7fd4172006-01-18 09:30:29 +00003637 }
3638
David Teigland85e86ed2007-05-18 08:58:15 -05003639 if (dlm_lock_recovery_try(ls))
David Teiglande7fd4172006-01-18 09:30:29 +00003640 break;
3641 schedule();
3642 }
3643
3644 switch (ms->m_type) {
3645
3646 /* messages sent to a master node */
3647
3648 case DLM_MSG_REQUEST:
3649 receive_request(ls, ms);
3650 break;
3651
3652 case DLM_MSG_CONVERT:
3653 receive_convert(ls, ms);
3654 break;
3655
3656 case DLM_MSG_UNLOCK:
3657 receive_unlock(ls, ms);
3658 break;
3659
3660 case DLM_MSG_CANCEL:
3661 receive_cancel(ls, ms);
3662 break;
3663
3664 /* messages sent from a master node (replies to above) */
3665
3666 case DLM_MSG_REQUEST_REPLY:
3667 receive_request_reply(ls, ms);
3668 break;
3669
3670 case DLM_MSG_CONVERT_REPLY:
3671 receive_convert_reply(ls, ms);
3672 break;
3673
3674 case DLM_MSG_UNLOCK_REPLY:
3675 receive_unlock_reply(ls, ms);
3676 break;
3677
3678 case DLM_MSG_CANCEL_REPLY:
3679 receive_cancel_reply(ls, ms);
3680 break;
3681
3682 /* messages sent from a master node (only two types of async msg) */
3683
3684 case DLM_MSG_GRANT:
3685 receive_grant(ls, ms);
3686 break;
3687
3688 case DLM_MSG_BAST:
3689 receive_bast(ls, ms);
3690 break;
3691
3692 /* messages sent to a dir node */
3693
3694 case DLM_MSG_LOOKUP:
3695 receive_lookup(ls, ms);
3696 break;
3697
3698 case DLM_MSG_REMOVE:
3699 receive_remove(ls, ms);
3700 break;
3701
3702 /* messages sent from a dir node (remove has no reply) */
3703
3704 case DLM_MSG_LOOKUP_REPLY:
3705 receive_lookup_reply(ls, ms);
3706 break;
3707
David Teigland84991372007-03-30 15:02:40 -05003708 /* other messages */
3709
3710 case DLM_MSG_PURGE:
3711 receive_purge(ls, ms);
3712 break;
3713
David Teiglande7fd4172006-01-18 09:30:29 +00003714 default:
3715 log_error(ls, "unknown message type %d", ms->m_type);
3716 }
3717
	/* the recovery lock is only held on the dispatch path; the "out"
	   label below is also reached without it */
David Teigland85e86ed2007-05-18 08:58:15 -05003718 dlm_unlock_recovery(ls);
David Teiglande7fd4172006-01-18 09:30:29 +00003719 out:
3720 dlm_put_lockspace(ls);
3721 dlm_astd_wake();
David Teigland8fd3a982007-01-24 10:11:45 -06003722 return error;
David Teiglande7fd4172006-01-18 09:30:29 +00003723}
3724
3725
3726/*
3727 * Recovery related
3728 */
3729
3730static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
3731{
3732 if (middle_conversion(lkb)) {
3733 hold_lkb(lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05003734 ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
David Teiglande7fd4172006-01-18 09:30:29 +00003735 ls->ls_stub_ms.m_result = -EINPROGRESS;
David Teigland075529b2006-12-13 10:40:26 -06003736 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
David Teiglande7fd4172006-01-18 09:30:29 +00003737 _receive_convert_reply(lkb, &ls->ls_stub_ms);
3738
3739 /* Same special case as in receive_rcom_lock_args() */
3740 lkb->lkb_grmode = DLM_LOCK_IV;
3741 rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
3742 unhold_lkb(lkb);
3743
3744 } else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
3745 lkb->lkb_flags |= DLM_IFL_RESEND;
3746 }
3747
3748 /* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
3749 conversions are async; there's no reply from the remote master */
3750}
3751
3752/* A waiting lkb needs recovery if the master node has failed, or
3753 the master node is changing (only when no directory is used) */
3754
3755static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3756{
3757 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3758 return 1;
3759
3760 if (!dlm_no_directory(ls))
3761 return 0;
3762
3763 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3764 return 1;
3765
3766 return 0;
3767}
3768
3769/* Recovery for locks that are waiting for replies from nodes that are now
3770 gone. We can just complete unlocks and cancels by faking a reply from the
3771 dead node. Requests and up-conversions we flag to be resent after
3772 recovery. Down-conversions can just be completed with a fake reply like
3773 unlocks. Conversions between PR and CW need special attention. */
3774
3775void dlm_recover_waiters_pre(struct dlm_ls *ls)
3776{
3777 struct dlm_lkb *lkb, *safe;
3778
David Teigland90135922006-01-20 08:47:07 +00003779 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003780
3781 list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
3782 log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
3783 lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);
3784
3785 /* all outstanding lookups, regardless of destination will be
3786 resent after recovery is done */
3787
3788 if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
3789 lkb->lkb_flags |= DLM_IFL_RESEND;
3790 continue;
3791 }
3792
3793 if (!waiter_needs_recovery(ls, lkb))
3794 continue;
3795
3796 switch (lkb->lkb_wait_type) {
3797
3798 case DLM_MSG_REQUEST:
3799 lkb->lkb_flags |= DLM_IFL_RESEND;
3800 break;
3801
3802 case DLM_MSG_CONVERT:
3803 recover_convert_waiter(ls, lkb);
3804 break;
3805
3806 case DLM_MSG_UNLOCK:
3807 hold_lkb(lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05003808 ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
David Teiglande7fd4172006-01-18 09:30:29 +00003809 ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
David Teigland075529b2006-12-13 10:40:26 -06003810 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
David Teiglande7fd4172006-01-18 09:30:29 +00003811 _receive_unlock_reply(lkb, &ls->ls_stub_ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003812 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003813 break;
3814
3815 case DLM_MSG_CANCEL:
3816 hold_lkb(lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05003817 ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
David Teiglande7fd4172006-01-18 09:30:29 +00003818 ls->ls_stub_ms.m_result = -DLM_ECANCEL;
David Teigland075529b2006-12-13 10:40:26 -06003819 ls->ls_stub_ms.m_flags = lkb->lkb_flags;
David Teiglande7fd4172006-01-18 09:30:29 +00003820 _receive_cancel_reply(lkb, &ls->ls_stub_ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003821 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003822 break;
3823
3824 default:
3825 log_error(ls, "invalid lkb wait_type %d",
3826 lkb->lkb_wait_type);
3827 }
David Teigland81456802006-07-25 14:05:09 -05003828 schedule();
David Teiglande7fd4172006-01-18 09:30:29 +00003829 }
David Teigland90135922006-01-20 08:47:07 +00003830 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003831}
3832
David Teiglandef0c2bb2007-03-28 09:56:46 -05003833static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00003834{
3835 struct dlm_lkb *lkb;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003836 int found = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003837
David Teigland90135922006-01-20 08:47:07 +00003838 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003839 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3840 if (lkb->lkb_flags & DLM_IFL_RESEND) {
David Teiglandef0c2bb2007-03-28 09:56:46 -05003841 hold_lkb(lkb);
3842 found = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00003843 break;
3844 }
3845 }
David Teigland90135922006-01-20 08:47:07 +00003846 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003847
David Teiglandef0c2bb2007-03-28 09:56:46 -05003848 if (!found)
David Teiglande7fd4172006-01-18 09:30:29 +00003849 lkb = NULL;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003850 return lkb;
David Teiglande7fd4172006-01-18 09:30:29 +00003851}
3852
3853/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3854 master or dir-node for r. Processing the lkb may result in it being placed
3855 back on waiters. */
3856
David Teiglandef0c2bb2007-03-28 09:56:46 -05003857/* We do this after normal locking has been enabled and any saved messages
3858 (in requestqueue) have been processed. We should be confident that at
3859 this point we won't get or process a reply to any of these waiting
3860 operations. But, new ops may be coming in on the rsbs/locks here from
3861 userspace or remotely. */
3862
/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a pos wait_count; if after, the
   overlap flag would just have been set and nothing new sent.  we can be
   confident here that any replies to either the initial op or overlap ops
   prior to recovery have been received. */
3868
David Teiglande7fd4172006-01-18 09:30:29 +00003869int dlm_recover_waiters_post(struct dlm_ls *ls)
3870{
3871 struct dlm_lkb *lkb;
3872 struct dlm_rsb *r;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003873 int error = 0, mstype, err, oc, ou;
David Teiglande7fd4172006-01-18 09:30:29 +00003874
3875 while (1) {
3876 if (dlm_locking_stopped(ls)) {
3877 log_debug(ls, "recover_waiters_post aborted");
3878 error = -EINTR;
3879 break;
3880 }
3881
David Teiglandef0c2bb2007-03-28 09:56:46 -05003882 lkb = find_resend_waiter(ls);
3883 if (!lkb)
David Teiglande7fd4172006-01-18 09:30:29 +00003884 break;
3885
3886 r = lkb->lkb_resource;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003887 hold_rsb(r);
3888 lock_rsb(r);
3889
3890 mstype = lkb->lkb_wait_type;
3891 oc = is_overlap_cancel(lkb);
3892 ou = is_overlap_unlock(lkb);
3893 err = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003894
3895 log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
3896 lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);
3897
David Teiglandef0c2bb2007-03-28 09:56:46 -05003898 /* At this point we assume that we won't get a reply to any
3899 previous op or overlap op on this lock. First, do a big
3900 remove_from_waiters() for all previous ops. */
David Teiglande7fd4172006-01-18 09:30:29 +00003901
David Teiglandef0c2bb2007-03-28 09:56:46 -05003902 lkb->lkb_flags &= ~DLM_IFL_RESEND;
3903 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
3904 lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
3905 lkb->lkb_wait_type = 0;
3906 lkb->lkb_wait_count = 0;
3907 mutex_lock(&ls->ls_waiters_mutex);
3908 list_del_init(&lkb->lkb_wait_reply);
3909 mutex_unlock(&ls->ls_waiters_mutex);
3910 unhold_lkb(lkb); /* for waiters list */
David Teiglande7fd4172006-01-18 09:30:29 +00003911
David Teiglandef0c2bb2007-03-28 09:56:46 -05003912 if (oc || ou) {
3913 /* do an unlock or cancel instead of resending */
3914 switch (mstype) {
3915 case DLM_MSG_LOOKUP:
3916 case DLM_MSG_REQUEST:
3917 queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
3918 -DLM_ECANCEL);
3919 unhold_lkb(lkb); /* undoes create_lkb() */
3920 break;
3921 case DLM_MSG_CONVERT:
3922 if (oc) {
3923 queue_cast(r, lkb, -DLM_ECANCEL);
3924 } else {
3925 lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
3926 _unlock_lock(r, lkb);
3927 }
3928 break;
3929 default:
3930 err = 1;
3931 }
3932 } else {
3933 switch (mstype) {
3934 case DLM_MSG_LOOKUP:
3935 case DLM_MSG_REQUEST:
3936 _request_lock(r, lkb);
3937 if (is_master(r))
3938 confirm_master(r, 0);
3939 break;
3940 case DLM_MSG_CONVERT:
3941 _convert_lock(r, lkb);
3942 break;
3943 default:
3944 err = 1;
3945 }
David Teiglande7fd4172006-01-18 09:30:29 +00003946 }
David Teiglandef0c2bb2007-03-28 09:56:46 -05003947
3948 if (err)
3949 log_error(ls, "recover_waiters_post %x %d %x %d %d",
3950 lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
3951 unlock_rsb(r);
3952 put_rsb(r);
3953 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003954 }
3955
3956 return error;
3957}
3958
3959static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3960 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3961{
3962 struct dlm_ls *ls = r->res_ls;
3963 struct dlm_lkb *lkb, *safe;
3964
3965 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3966 if (test(ls, lkb)) {
David Teigland97a35d12006-05-02 13:34:03 -04003967 rsb_set_flag(r, RSB_LOCKS_PURGED);
David Teiglande7fd4172006-01-18 09:30:29 +00003968 del_lkb(r, lkb);
3969 /* this put should free the lkb */
David Teiglandb3f58d82006-02-28 11:16:37 -05003970 if (!dlm_put_lkb(lkb))
David Teiglande7fd4172006-01-18 09:30:29 +00003971 log_error(ls, "purged lkb not released");
3972 }
3973 }
3974}
3975
3976static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3977{
3978 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3979}
3980
/* purge_queue() predicate: selects every master-copy lkb; ls is unused. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	int mstcpy = is_master_copy(lkb);

	return mstcpy;
}
3985
3986static void purge_dead_locks(struct dlm_rsb *r)
3987{
3988 purge_queue(r, &r->res_grantqueue, &purge_dead_test);
3989 purge_queue(r, &r->res_convertqueue, &purge_dead_test);
3990 purge_queue(r, &r->res_waitqueue, &purge_dead_test);
3991}
3992
3993void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
3994{
3995 purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
3996 purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
3997 purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
3998}
3999
4000/* Get rid of locks held by nodes that are gone. */
4001
4002int dlm_purge_locks(struct dlm_ls *ls)
4003{
4004 struct dlm_rsb *r;
4005
4006 log_debug(ls, "dlm_purge_locks");
4007
4008 down_write(&ls->ls_root_sem);
4009 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
4010 hold_rsb(r);
4011 lock_rsb(r);
4012 if (is_master(r))
4013 purge_dead_locks(r);
4014 unlock_rsb(r);
4015 unhold_rsb(r);
4016
4017 schedule();
4018 }
4019 up_write(&ls->ls_root_sem);
4020
4021 return 0;
4022}
4023
David Teigland97a35d12006-05-02 13:34:03 -04004024static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4025{
4026 struct dlm_rsb *r, *r_ret = NULL;
4027
4028 read_lock(&ls->ls_rsbtbl[bucket].lock);
4029 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4030 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4031 continue;
4032 hold_rsb(r);
4033 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4034 r_ret = r;
4035 break;
4036 }
4037 read_unlock(&ls->ls_rsbtbl[bucket].lock);
4038 return r_ret;
4039}
4040
4041void dlm_grant_after_purge(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00004042{
4043 struct dlm_rsb *r;
David Teigland2b4e9262006-07-25 13:59:48 -05004044 int bucket = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00004045
David Teigland2b4e9262006-07-25 13:59:48 -05004046 while (1) {
4047 r = find_purged_rsb(ls, bucket);
4048 if (!r) {
4049 if (bucket == ls->ls_rsbtbl_size - 1)
4050 break;
4051 bucket++;
David Teigland97a35d12006-05-02 13:34:03 -04004052 continue;
David Teigland2b4e9262006-07-25 13:59:48 -05004053 }
David Teigland97a35d12006-05-02 13:34:03 -04004054 lock_rsb(r);
4055 if (is_master(r)) {
4056 grant_pending_locks(r);
4057 confirm_master(r, 0);
David Teiglande7fd4172006-01-18 09:30:29 +00004058 }
David Teigland97a35d12006-05-02 13:34:03 -04004059 unlock_rsb(r);
4060 put_rsb(r);
David Teigland2b4e9262006-07-25 13:59:48 -05004061 schedule();
David Teiglande7fd4172006-01-18 09:30:29 +00004062 }
David Teiglande7fd4172006-01-18 09:30:29 +00004063}
4064
4065static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4066 uint32_t remid)
4067{
4068 struct dlm_lkb *lkb;
4069
4070 list_for_each_entry(lkb, head, lkb_statequeue) {
4071 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4072 return lkb;
4073 }
4074 return NULL;
4075}
4076
4077static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4078 uint32_t remid)
4079{
4080 struct dlm_lkb *lkb;
4081
4082 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4083 if (lkb)
4084 return lkb;
4085 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4086 if (lkb)
4087 return lkb;
4088 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4089 if (lkb)
4090 return lkb;
4091 return NULL;
4092}
4093
4094static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4095 struct dlm_rsb *r, struct dlm_rcom *rc)
4096{
4097 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4098 int lvblen;
4099
4100 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4101 lkb->lkb_ownpid = rl->rl_ownpid;
4102 lkb->lkb_remid = rl->rl_lkid;
4103 lkb->lkb_exflags = rl->rl_exflags;
4104 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
4105 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4106 lkb->lkb_lvbseq = rl->rl_lvbseq;
4107 lkb->lkb_rqmode = rl->rl_rqmode;
4108 lkb->lkb_grmode = rl->rl_grmode;
4109 /* don't set lkb_status because add_lkb wants to itself */
4110
4111 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
4112 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
4113
David Teiglande7fd4172006-01-18 09:30:29 +00004114 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4115 lkb->lkb_lvbptr = allocate_lvb(ls);
4116 if (!lkb->lkb_lvbptr)
4117 return -ENOMEM;
4118 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4119 sizeof(struct rcom_lock);
4120 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4121 }
4122
4123 /* Conversions between PR and CW (middle modes) need special handling.
4124 The real granted mode of these converting locks cannot be determined
4125 until all locks have been rebuilt on the rsb (recover_conversion) */
4126
4127 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
4128 rl->rl_status = DLM_LKSTS_CONVERT;
4129 lkb->lkb_grmode = DLM_LOCK_IV;
4130 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4131 }
4132
4133 return 0;
4134}
4135
4136/* This lkb may have been recovered in a previous aborted recovery so we need
4137 to check if the rsb already has an lkb with the given remote nodeid/lkid.
4138 If so we just send back a standard reply. If not, we create a new lkb with
4139 the given values and send back our lkid. We send back our lkid by sending
4140 back the rcom_lock struct we got but with the remid field filled in. */
4141
4142int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4143{
4144 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4145 struct dlm_rsb *r;
4146 struct dlm_lkb *lkb;
4147 int error;
4148
4149 if (rl->rl_parent_lkid) {
4150 error = -EOPNOTSUPP;
4151 goto out;
4152 }
4153
4154 error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
4155 if (error)
4156 goto out;
4157
4158 lock_rsb(r);
4159
4160 lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
4161 if (lkb) {
4162 error = -EEXIST;
4163 goto out_remid;
4164 }
4165
4166 error = create_lkb(ls, &lkb);
4167 if (error)
4168 goto out_unlock;
4169
4170 error = receive_rcom_lock_args(ls, lkb, r, rc);
4171 if (error) {
David Teiglandb3f58d82006-02-28 11:16:37 -05004172 __put_lkb(ls, lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00004173 goto out_unlock;
4174 }
4175
4176 attach_lkb(r, lkb);
4177 add_lkb(r, lkb, rl->rl_status);
4178 error = 0;
4179
4180 out_remid:
4181 /* this is the new value returned to the lock holder for
4182 saving in its process-copy lkb */
4183 rl->rl_remid = lkb->lkb_id;
4184
4185 out_unlock:
4186 unlock_rsb(r);
4187 put_rsb(r);
4188 out:
4189 if (error)
4190 log_print("recover_master_copy %d %x", error, rl->rl_lkid);
4191 rl->rl_result = error;
4192 return error;
4193}
4194
4195int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
4196{
4197 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4198 struct dlm_rsb *r;
4199 struct dlm_lkb *lkb;
4200 int error;
4201
4202 error = find_lkb(ls, rl->rl_lkid, &lkb);
4203 if (error) {
4204 log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
4205 return error;
4206 }
4207
4208 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
4209
4210 error = rl->rl_result;
4211
4212 r = lkb->lkb_resource;
4213 hold_rsb(r);
4214 lock_rsb(r);
4215
4216 switch (error) {
David Teiglanddc200a82006-12-13 10:36:37 -06004217 case -EBADR:
4218 /* There's a chance the new master received our lock before
4219 dlm_recover_master_reply(), this wouldn't happen if we did
4220 a barrier between recover_masters and recover_locks. */
4221 log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
4222 (unsigned long)r, r->res_name);
4223 dlm_send_rcom_lock(r, lkb);
4224 goto out;
David Teiglande7fd4172006-01-18 09:30:29 +00004225 case -EEXIST:
4226 log_debug(ls, "master copy exists %x", lkb->lkb_id);
4227 /* fall through */
4228 case 0:
4229 lkb->lkb_remid = rl->rl_remid;
4230 break;
4231 default:
4232 log_error(ls, "dlm_recover_process_copy unknown error %d %x",
4233 error, lkb->lkb_id);
4234 }
4235
4236 /* an ack for dlm_recover_locks() which waits for replies from
4237 all the locks it sends to new masters */
4238 dlm_recovered_lock(r);
David Teiglanddc200a82006-12-13 10:36:37 -06004239 out:
David Teiglande7fd4172006-01-18 09:30:29 +00004240 unlock_rsb(r);
4241 put_rsb(r);
David Teiglandb3f58d82006-02-28 11:16:37 -05004242 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00004243
4244 return 0;
4245}
4246
David Teigland597d0ca2006-07-12 16:44:04 -05004247int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
4248 int mode, uint32_t flags, void *name, unsigned int namelen,
David Teiglandd7db9232007-05-18 09:00:32 -05004249 unsigned long timeout_cs)
David Teigland597d0ca2006-07-12 16:44:04 -05004250{
4251 struct dlm_lkb *lkb;
4252 struct dlm_args args;
4253 int error;
4254
David Teigland85e86ed2007-05-18 08:58:15 -05004255 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004256
4257 error = create_lkb(ls, &lkb);
4258 if (error) {
4259 kfree(ua);
4260 goto out;
4261 }
4262
4263 if (flags & DLM_LKF_VALBLK) {
David Teigland62a0f622007-01-31 13:25:00 -06004264 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
David Teigland597d0ca2006-07-12 16:44:04 -05004265 if (!ua->lksb.sb_lvbptr) {
4266 kfree(ua);
4267 __put_lkb(ls, lkb);
4268 error = -ENOMEM;
4269 goto out;
4270 }
4271 }
4272
4273 /* After ua is attached to lkb it will be freed by free_lkb().
4274 When DLM_IFL_USER is set, the dlm knows that this is a userspace
4275 lock and that lkb_astparam is the dlm_user_args structure. */
4276
David Teiglandd7db9232007-05-18 09:00:32 -05004277 error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
David Teigland32f105a2006-08-23 16:07:31 -04004278 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
David Teigland597d0ca2006-07-12 16:44:04 -05004279 lkb->lkb_flags |= DLM_IFL_USER;
4280 ua->old_mode = DLM_LOCK_IV;
4281
4282 if (error) {
4283 __put_lkb(ls, lkb);
4284 goto out;
4285 }
4286
4287 error = request_lock(ls, lkb, name, namelen, &args);
4288
4289 switch (error) {
4290 case 0:
4291 break;
4292 case -EINPROGRESS:
4293 error = 0;
4294 break;
4295 case -EAGAIN:
4296 error = 0;
4297 /* fall through */
4298 default:
4299 __put_lkb(ls, lkb);
4300 goto out;
4301 }
4302
4303 /* add this new lkb to the per-process list of locks */
4304 spin_lock(&ua->proc->locks_spin);
David Teiglandef0c2bb2007-03-28 09:56:46 -05004305 hold_lkb(lkb);
David Teigland597d0ca2006-07-12 16:44:04 -05004306 list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
4307 spin_unlock(&ua->proc->locks_spin);
4308 out:
David Teigland85e86ed2007-05-18 08:58:15 -05004309 dlm_unlock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004310 return error;
4311}
4312
4313int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
David Teiglandd7db9232007-05-18 09:00:32 -05004314 int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
4315 unsigned long timeout_cs)
David Teigland597d0ca2006-07-12 16:44:04 -05004316{
4317 struct dlm_lkb *lkb;
4318 struct dlm_args args;
4319 struct dlm_user_args *ua;
4320 int error;
4321
David Teigland85e86ed2007-05-18 08:58:15 -05004322 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004323
4324 error = find_lkb(ls, lkid, &lkb);
4325 if (error)
4326 goto out;
4327
4328 /* user can change the params on its lock when it converts it, or
4329 add an lvb that didn't exist before */
4330
4331 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4332
4333 if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
David Teigland62a0f622007-01-31 13:25:00 -06004334 ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
David Teigland597d0ca2006-07-12 16:44:04 -05004335 if (!ua->lksb.sb_lvbptr) {
4336 error = -ENOMEM;
4337 goto out_put;
4338 }
4339 }
4340 if (lvb_in && ua->lksb.sb_lvbptr)
4341 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4342
David Teiglandd7db9232007-05-18 09:00:32 -05004343 ua->xid = ua_tmp->xid;
David Teigland597d0ca2006-07-12 16:44:04 -05004344 ua->castparam = ua_tmp->castparam;
4345 ua->castaddr = ua_tmp->castaddr;
4346 ua->bastparam = ua_tmp->bastparam;
4347 ua->bastaddr = ua_tmp->bastaddr;
Patrick Caulfield10948eb2006-08-23 09:49:31 +01004348 ua->user_lksb = ua_tmp->user_lksb;
David Teigland597d0ca2006-07-12 16:44:04 -05004349 ua->old_mode = lkb->lkb_grmode;
4350
David Teiglandd7db9232007-05-18 09:00:32 -05004351 error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
4352 DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
David Teigland597d0ca2006-07-12 16:44:04 -05004353 if (error)
4354 goto out_put;
4355
4356 error = convert_lock(ls, lkb, &args);
4357
David Teiglandc85d65e2007-05-18 09:01:26 -05004358 if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
David Teigland597d0ca2006-07-12 16:44:04 -05004359 error = 0;
4360 out_put:
4361 dlm_put_lkb(lkb);
4362 out:
David Teigland85e86ed2007-05-18 08:58:15 -05004363 dlm_unlock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004364 kfree(ua_tmp);
4365 return error;
4366}
4367
4368int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4369 uint32_t flags, uint32_t lkid, char *lvb_in)
4370{
4371 struct dlm_lkb *lkb;
4372 struct dlm_args args;
4373 struct dlm_user_args *ua;
4374 int error;
4375
David Teigland85e86ed2007-05-18 08:58:15 -05004376 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004377
4378 error = find_lkb(ls, lkid, &lkb);
4379 if (error)
4380 goto out;
4381
4382 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4383
4384 if (lvb_in && ua->lksb.sb_lvbptr)
4385 memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
4386 ua->castparam = ua_tmp->castparam;
Patrick Caulfieldcc346d52006-08-08 10:34:40 -04004387 ua->user_lksb = ua_tmp->user_lksb;
David Teigland597d0ca2006-07-12 16:44:04 -05004388
4389 error = set_unlock_args(flags, ua, &args);
4390 if (error)
4391 goto out_put;
4392
4393 error = unlock_lock(ls, lkb, &args);
4394
4395 if (error == -DLM_EUNLOCK)
4396 error = 0;
David Teiglandef0c2bb2007-03-28 09:56:46 -05004397 /* from validate_unlock_args() */
4398 if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
4399 error = 0;
David Teigland597d0ca2006-07-12 16:44:04 -05004400 if (error)
4401 goto out_put;
4402
4403 spin_lock(&ua->proc->locks_spin);
David Teiglanda1bc86e2007-01-15 10:34:52 -06004404 /* dlm_user_add_ast() may have already taken lkb off the proc list */
4405 if (!list_empty(&lkb->lkb_ownqueue))
4406 list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
David Teigland597d0ca2006-07-12 16:44:04 -05004407 spin_unlock(&ua->proc->locks_spin);
David Teigland597d0ca2006-07-12 16:44:04 -05004408 out_put:
4409 dlm_put_lkb(lkb);
4410 out:
David Teigland85e86ed2007-05-18 08:58:15 -05004411 dlm_unlock_recovery(ls);
David Teiglandef0c2bb2007-03-28 09:56:46 -05004412 kfree(ua_tmp);
David Teigland597d0ca2006-07-12 16:44:04 -05004413 return error;
4414}
4415
4416int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
4417 uint32_t flags, uint32_t lkid)
4418{
4419 struct dlm_lkb *lkb;
4420 struct dlm_args args;
4421 struct dlm_user_args *ua;
4422 int error;
4423
David Teigland85e86ed2007-05-18 08:58:15 -05004424 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004425
4426 error = find_lkb(ls, lkid, &lkb);
4427 if (error)
4428 goto out;
4429
4430 ua = (struct dlm_user_args *)lkb->lkb_astparam;
4431 ua->castparam = ua_tmp->castparam;
Patrick Caulfieldc059f702006-08-23 10:24:03 +01004432 ua->user_lksb = ua_tmp->user_lksb;
David Teigland597d0ca2006-07-12 16:44:04 -05004433
4434 error = set_unlock_args(flags, ua, &args);
4435 if (error)
4436 goto out_put;
4437
4438 error = cancel_lock(ls, lkb, &args);
4439
4440 if (error == -DLM_ECANCEL)
4441 error = 0;
David Teiglandef0c2bb2007-03-28 09:56:46 -05004442 /* from validate_unlock_args() */
4443 if (error == -EBUSY)
4444 error = 0;
David Teigland597d0ca2006-07-12 16:44:04 -05004445 out_put:
4446 dlm_put_lkb(lkb);
4447 out:
David Teigland85e86ed2007-05-18 08:58:15 -05004448 dlm_unlock_recovery(ls);
David Teiglandef0c2bb2007-03-28 09:56:46 -05004449 kfree(ua_tmp);
David Teigland597d0ca2006-07-12 16:44:04 -05004450 return error;
4451}
4452
David Teiglandef0c2bb2007-03-28 09:56:46 -05004453/* lkb's that are removed from the waiters list by revert are just left on the
4454 orphans list with the granted orphan locks, to be freed by purge */
4455
David Teigland597d0ca2006-07-12 16:44:04 -05004456static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4457{
4458 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
David Teiglandef0c2bb2007-03-28 09:56:46 -05004459 struct dlm_args args;
4460 int error;
David Teigland597d0ca2006-07-12 16:44:04 -05004461
David Teiglandef0c2bb2007-03-28 09:56:46 -05004462 hold_lkb(lkb);
4463 mutex_lock(&ls->ls_orphans_mutex);
4464 list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
4465 mutex_unlock(&ls->ls_orphans_mutex);
David Teigland597d0ca2006-07-12 16:44:04 -05004466
David Teiglandef0c2bb2007-03-28 09:56:46 -05004467 set_unlock_args(0, ua, &args);
4468
4469 error = cancel_lock(ls, lkb, &args);
4470 if (error == -DLM_ECANCEL)
4471 error = 0;
4472 return error;
David Teigland597d0ca2006-07-12 16:44:04 -05004473}
4474
4475/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4476 Regardless of what rsb queue the lock is on, it's removed and freed. */
4477
4478static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4479{
4480 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4481 struct dlm_args args;
4482 int error;
4483
David Teigland597d0ca2006-07-12 16:44:04 -05004484 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4485
4486 error = unlock_lock(ls, lkb, &args);
4487 if (error == -DLM_EUNLOCK)
4488 error = 0;
4489 return error;
4490}
4491
David Teiglandef0c2bb2007-03-28 09:56:46 -05004492/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4493 (which does lock_rsb) due to deadlock with receiving a message that does
4494 lock_rsb followed by dlm_user_add_ast() */
4495
4496static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
4497 struct dlm_user_proc *proc)
4498{
4499 struct dlm_lkb *lkb = NULL;
4500
4501 mutex_lock(&ls->ls_clear_proc_locks);
4502 if (list_empty(&proc->locks))
4503 goto out;
4504
4505 lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
4506 list_del_init(&lkb->lkb_ownqueue);
4507
4508 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
4509 lkb->lkb_flags |= DLM_IFL_ORPHAN;
4510 else
4511 lkb->lkb_flags |= DLM_IFL_DEAD;
4512 out:
4513 mutex_unlock(&ls->ls_clear_proc_locks);
4514 return lkb;
4515}
4516
David Teigland597d0ca2006-07-12 16:44:04 -05004517/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4518 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4519 which we clear here. */
4520
4521/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4522 list, and no more device_writes should add lkb's to proc->locks list; so we
4523 shouldn't need to take asts_spin or locks_spin here. this assumes that
4524 device reads/writes/closes are serialized -- FIXME: we may need to serialize
4525 them ourself. */
4526
4527void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4528{
4529 struct dlm_lkb *lkb, *safe;
4530
David Teigland85e86ed2007-05-18 08:58:15 -05004531 dlm_lock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004532
David Teiglandef0c2bb2007-03-28 09:56:46 -05004533 while (1) {
4534 lkb = del_proc_lock(ls, proc);
4535 if (!lkb)
4536 break;
4537 if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
David Teigland597d0ca2006-07-12 16:44:04 -05004538 orphan_proc_lock(ls, lkb);
David Teiglandef0c2bb2007-03-28 09:56:46 -05004539 else
David Teigland597d0ca2006-07-12 16:44:04 -05004540 unlock_proc_lock(ls, lkb);
David Teigland597d0ca2006-07-12 16:44:04 -05004541
4542 /* this removes the reference for the proc->locks list
4543 added by dlm_user_request, it may result in the lkb
4544 being freed */
4545
4546 dlm_put_lkb(lkb);
4547 }
David Teiglanda1bc86e2007-01-15 10:34:52 -06004548
David Teiglandef0c2bb2007-03-28 09:56:46 -05004549 mutex_lock(&ls->ls_clear_proc_locks);
4550
David Teiglanda1bc86e2007-01-15 10:34:52 -06004551 /* in-progress unlocks */
4552 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4553 list_del_init(&lkb->lkb_ownqueue);
4554 lkb->lkb_flags |= DLM_IFL_DEAD;
4555 dlm_put_lkb(lkb);
4556 }
4557
4558 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4559 list_del(&lkb->lkb_astqueue);
4560 dlm_put_lkb(lkb);
4561 }
4562
David Teigland597d0ca2006-07-12 16:44:04 -05004563 mutex_unlock(&ls->ls_clear_proc_locks);
David Teigland85e86ed2007-05-18 08:58:15 -05004564 dlm_unlock_recovery(ls);
David Teigland597d0ca2006-07-12 16:44:04 -05004565}
David Teiglanda1bc86e2007-01-15 10:34:52 -06004566
David Teigland84991372007-03-30 15:02:40 -05004567static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
4568{
4569 struct dlm_lkb *lkb, *safe;
4570
4571 while (1) {
4572 lkb = NULL;
4573 spin_lock(&proc->locks_spin);
4574 if (!list_empty(&proc->locks)) {
4575 lkb = list_entry(proc->locks.next, struct dlm_lkb,
4576 lkb_ownqueue);
4577 list_del_init(&lkb->lkb_ownqueue);
4578 }
4579 spin_unlock(&proc->locks_spin);
4580
4581 if (!lkb)
4582 break;
4583
4584 lkb->lkb_flags |= DLM_IFL_DEAD;
4585 unlock_proc_lock(ls, lkb);
4586 dlm_put_lkb(lkb); /* ref from proc->locks list */
4587 }
4588
4589 spin_lock(&proc->locks_spin);
4590 list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
4591 list_del_init(&lkb->lkb_ownqueue);
4592 lkb->lkb_flags |= DLM_IFL_DEAD;
4593 dlm_put_lkb(lkb);
4594 }
4595 spin_unlock(&proc->locks_spin);
4596
4597 spin_lock(&proc->asts_spin);
4598 list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
4599 list_del(&lkb->lkb_astqueue);
4600 dlm_put_lkb(lkb);
4601 }
4602 spin_unlock(&proc->asts_spin);
4603}
4604
4605/* pid of 0 means purge all orphans */
4606
4607static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4608{
4609 struct dlm_lkb *lkb, *safe;
4610
4611 mutex_lock(&ls->ls_orphans_mutex);
4612 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4613 if (pid && lkb->lkb_ownpid != pid)
4614 continue;
4615 unlock_proc_lock(ls, lkb);
4616 list_del_init(&lkb->lkb_ownqueue);
4617 dlm_put_lkb(lkb);
4618 }
4619 mutex_unlock(&ls->ls_orphans_mutex);
4620}
4621
4622static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
4623{
4624 struct dlm_message *ms;
4625 struct dlm_mhandle *mh;
4626 int error;
4627
4628 error = _create_message(ls, sizeof(struct dlm_message), nodeid,
4629 DLM_MSG_PURGE, &ms, &mh);
4630 if (error)
4631 return error;
4632 ms->m_nodeid = nodeid;
4633 ms->m_pid = pid;
4634
4635 return send_message(mh, ms);
4636}
4637
4638int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4639 int nodeid, int pid)
4640{
4641 int error = 0;
4642
4643 if (nodeid != dlm_our_nodeid()) {
4644 error = send_purge(ls, nodeid, pid);
4645 } else {
David Teigland85e86ed2007-05-18 08:58:15 -05004646 dlm_lock_recovery(ls);
David Teigland84991372007-03-30 15:02:40 -05004647 if (pid == current->pid)
4648 purge_proc_locks(ls, proc);
4649 else
4650 do_purge(ls, nodeid, pid);
David Teigland85e86ed2007-05-18 08:58:15 -05004651 dlm_unlock_recovery(ls);
David Teigland84991372007-03-30 15:02:40 -05004652 }
4653 return error;
4654}
4655