blob: df91578145d1f7413360741087a9c380cd1b8eba [file] [log] [blame]
David Teiglande7fd4172006-01-18 09:30:29 +00001/******************************************************************************
2*******************************************************************************
3**
David Teiglandef0c2bb2007-03-28 09:56:46 -05004** Copyright (C) 2005-2007 Red Hat, Inc. All rights reserved.
David Teiglande7fd4172006-01-18 09:30:29 +00005**
6** This copyrighted material is made available to anyone wishing to use,
7** modify, copy, or redistribute it subject to the terms and conditions
8** of the GNU General Public License v.2.
9**
10*******************************************************************************
11******************************************************************************/
12
13/* Central locking logic has four stages:
14
15 dlm_lock()
16 dlm_unlock()
17
18 request_lock(ls, lkb)
19 convert_lock(ls, lkb)
20 unlock_lock(ls, lkb)
21 cancel_lock(ls, lkb)
22
23 _request_lock(r, lkb)
24 _convert_lock(r, lkb)
25 _unlock_lock(r, lkb)
26 _cancel_lock(r, lkb)
27
28 do_request(r, lkb)
29 do_convert(r, lkb)
30 do_unlock(r, lkb)
31 do_cancel(r, lkb)
32
33 Stage 1 (lock, unlock) is mainly about checking input args and
34 splitting into one of the four main operations:
35
36 dlm_lock = request_lock
37 dlm_lock+CONVERT = convert_lock
38 dlm_unlock = unlock_lock
39 dlm_unlock+CANCEL = cancel_lock
40
41 Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is
42 provided to the next stage.
43
44 Stage 3, _xxxx_lock(), determines if the operation is local or remote.
45 When remote, it calls send_xxxx(), when local it calls do_xxxx().
46
47 Stage 4, do_xxxx(), is the guts of the operation. It manipulates the
48 given rsb and lkb and queues callbacks.
49
50 For remote operations, send_xxxx() results in the corresponding do_xxxx()
51 function being executed on the remote node. The connecting send/receive
52 calls on local (L) and remote (R) nodes:
53
54 L: send_xxxx() -> R: receive_xxxx()
55 R: do_xxxx()
56 L: receive_xxxx_reply() <- R: send_xxxx_reply()
57*/
David Teigland597d0ca2006-07-12 16:44:04 -050058#include <linux/types.h>
David Teiglande7fd4172006-01-18 09:30:29 +000059#include "dlm_internal.h"
David Teigland597d0ca2006-07-12 16:44:04 -050060#include <linux/dlm_device.h>
David Teiglande7fd4172006-01-18 09:30:29 +000061#include "memory.h"
62#include "lowcomms.h"
63#include "requestqueue.h"
64#include "util.h"
65#include "dir.h"
66#include "member.h"
67#include "lockspace.h"
68#include "ast.h"
69#include "lock.h"
70#include "rcom.h"
71#include "recover.h"
72#include "lvb_table.h"
David Teigland597d0ca2006-07-12 16:44:04 -050073#include "user.h"
David Teiglande7fd4172006-01-18 09:30:29 +000074#include "config.h"
75
76static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb);
77static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb);
78static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb);
79static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb);
80static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb);
81static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode);
82static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb);
83static int send_remove(struct dlm_rsb *r);
84static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
David Teigland3ae1acf2007-05-18 08:59:31 -050085static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb);
David Teiglande7fd4172006-01-18 09:30:29 +000086static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
87 struct dlm_message *ms);
88static int receive_extralen(struct dlm_message *ms);
David Teigland84991372007-03-30 15:02:40 -050089static void do_purge(struct dlm_ls *ls, int nodeid, int pid);
David Teigland3ae1acf2007-05-18 08:59:31 -050090static void del_timeout(struct dlm_lkb *lkb);
91void dlm_timeout_warn(struct dlm_lkb *lkb);
David Teiglande7fd4172006-01-18 09:30:29 +000092
/*
 * Lock compatibility matrix - thanks Steve
 * UN = Unlocked state. Not really a state, used as a flag
 * PD = Padding. Used to make the matrix a nice power of two in size
 * Other states are the same as the VMS DLM.
 * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
 */
100
/* 1 = modes are compatible, 0 = not; indexed [grmode+1][rqmode+1] */
static const int __dlm_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* UN */
        {1, 1, 1, 1, 1, 1, 1, 0},       /* NL */
        {1, 1, 1, 1, 1, 1, 0, 0},       /* CR */
        {1, 1, 1, 1, 0, 0, 0, 0},       /* CW */
        {1, 1, 1, 0, 1, 0, 0, 0},       /* PR */
        {1, 1, 1, 0, 0, 0, 0, 0},       /* PW */
        {1, 1, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
112
113/*
114 * This defines the direction of transfer of LVB data.
115 * Granted mode is the row; requested mode is the column.
116 * Usage: matrix[grmode+1][rqmode+1]
117 * 1 = LVB is returned to the caller
118 * 0 = LVB is written to the resource
119 * -1 = nothing happens to the LVB
120 */
121
/* see the legend above: 1 = LVB returned to caller, 0 = LVB written to
   the resource, -1 = LVB unchanged; indexed [grmode+1][rqmode+1] */
const int dlm_lvb_operations[8][8] = {
        /* UN   NL  CR  CW  PR  PW  EX  PD*/
        {  -1,  1,  1,  1,  1,  1,  1, -1 }, /* UN */
        {  -1,  1,  1,  1,  1,  1,  1,  0 }, /* NL */
        {  -1, -1,  1,  1,  1,  1,  1,  0 }, /* CR */
        {  -1, -1, -1,  1,  1,  1,  1,  0 }, /* CW */
        {  -1, -1, -1, -1,  1,  1,  1,  0 }, /* PR */
        {  -1,  0,  0,  0,  0,  0,  1,  0 }, /* PW */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }, /* EX */
        {  -1,  0,  0,  0,  0,  0,  0,  0 }  /* PD */
};
David Teiglande7fd4172006-01-18 09:30:29 +0000133
/* 1 if gr's granted mode is compatible with rq's requested mode */
#define modes_compat(gr, rq) \
	__dlm_compat_matrix[(gr)->lkb_grmode + 1][(rq)->lkb_rqmode + 1]
136
137int dlm_modes_compat(int mode1, int mode2)
138{
139 return __dlm_compat_matrix[mode1 + 1][mode2 + 1];
140}
141
142/*
143 * Compatibility matrix for conversions with QUECVT set.
144 * Granted mode is the row; requested mode is the column.
145 * Usage: matrix[grmode+1][rqmode+1]
146 */
147
/* 1 = a QUECVT conversion from grmode (row) to rqmode (column) is
   allowed to be queued; indexed [grmode+1][rqmode+1] */
static const int __quecvt_compat_matrix[8][8] = {
      /* UN NL CR CW PR PW EX PD */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* UN */
        {0, 0, 1, 1, 1, 1, 1, 0},       /* NL */
        {0, 0, 0, 1, 1, 1, 1, 0},       /* CR */
        {0, 0, 0, 0, 1, 1, 1, 0},       /* CW */
        {0, 0, 0, 1, 0, 1, 1, 0},       /* PR */
        {0, 0, 0, 0, 0, 0, 1, 0},       /* PW */
        {0, 0, 0, 0, 0, 0, 0, 0},       /* EX */
        {0, 0, 0, 0, 0, 0, 0, 0}        /* PD */
};
159
/* Dump the interesting fields of an lkb to the kernel log (debug aid). */
void dlm_print_lkb(struct dlm_lkb *lkb)
{
	printk(KERN_ERR "lkb: nodeid %d id %x remid %x exflags %x flags %x\n"
	       "     status %d rqmode %d grmode %d wait_type %d ast_type %d\n",
	       lkb->lkb_nodeid, lkb->lkb_id, lkb->lkb_remid, lkb->lkb_exflags,
	       lkb->lkb_flags, lkb->lkb_status, lkb->lkb_rqmode,
	       lkb->lkb_grmode, lkb->lkb_wait_type, lkb->lkb_ast_type);
}
168
/* Dump the interesting fields of an rsb to the kernel log (debug aid). */
void dlm_print_rsb(struct dlm_rsb *r)
{
	/* NOTE(review): res_name is a length-counted (res_length) name and
	   may not be NUL-terminated; %s could print past it — confirm */
	printk(KERN_ERR "rsb: nodeid %d flags %lx first %x rlc %d name %s\n",
	       r->res_nodeid, r->res_flags, r->res_first_lkid,
	       r->res_recover_locks_count, r->res_name);
}
175
/* Dump an rsb and every lkb on its lookup list and three state queues
   to the kernel log (debug aid, used by DLM_ASSERT). */
void dlm_dump_rsb(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb;

	dlm_print_rsb(r);

	printk(KERN_ERR "rsb: root_list empty %d recover_list empty %d\n",
	       list_empty(&r->res_root_list), list_empty(&r->res_recover_list));
	printk(KERN_ERR "rsb lookup list\n");
	list_for_each_entry(lkb, &r->res_lookup, lkb_rsb_lookup)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb grant queue:\n");
	list_for_each_entry(lkb, &r->res_grantqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb convert queue:\n");
	list_for_each_entry(lkb, &r->res_convertqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
	printk(KERN_ERR "rsb wait queue:\n");
	list_for_each_entry(lkb, &r->res_waitqueue, lkb_statequeue)
		dlm_print_lkb(lkb);
}
197
David Teiglande7fd4172006-01-18 09:30:29 +0000198/* Threads cannot use the lockspace while it's being recovered */
199
/* Take the recovery read-lock; blocks while recovery holds it for write
   (threads cannot use the lockspace while it's being recovered). */
static inline void dlm_lock_recovery(struct dlm_ls *ls)
{
	down_read(&ls->ls_in_recovery);
}
204
/* Release the recovery read-lock taken by dlm_lock_recovery(). */
void dlm_unlock_recovery(struct dlm_ls *ls)
{
	up_read(&ls->ls_in_recovery);
}
209
/* Non-blocking variant: nonzero if the recovery read-lock was taken. */
int dlm_lock_recovery_try(struct dlm_ls *ls)
{
	return down_read_trylock(&ls->ls_in_recovery);
}
214
215static inline int can_be_queued(struct dlm_lkb *lkb)
216{
217 return !(lkb->lkb_exflags & DLM_LKF_NOQUEUE);
218}
219
220static inline int force_blocking_asts(struct dlm_lkb *lkb)
221{
222 return (lkb->lkb_exflags & DLM_LKF_NOQUEUEBAST);
223}
224
225static inline int is_demoted(struct dlm_lkb *lkb)
226{
227 return (lkb->lkb_sbflags & DLM_SBF_DEMOTED);
228}
229
David Teigland7d3c1fe2007-04-19 10:30:41 -0500230static inline int is_altmode(struct dlm_lkb *lkb)
231{
232 return (lkb->lkb_sbflags & DLM_SBF_ALTMODE);
233}
234
235static inline int is_granted(struct dlm_lkb *lkb)
236{
237 return (lkb->lkb_status == DLM_LKSTS_GRANTED);
238}
239
David Teiglande7fd4172006-01-18 09:30:29 +0000240static inline int is_remote(struct dlm_rsb *r)
241{
242 DLM_ASSERT(r->res_nodeid >= 0, dlm_print_rsb(r););
243 return !!r->res_nodeid;
244}
245
246static inline int is_process_copy(struct dlm_lkb *lkb)
247{
248 return (lkb->lkb_nodeid && !(lkb->lkb_flags & DLM_IFL_MSTCPY));
249}
250
251static inline int is_master_copy(struct dlm_lkb *lkb)
252{
253 if (lkb->lkb_flags & DLM_IFL_MSTCPY)
254 DLM_ASSERT(lkb->lkb_nodeid, dlm_print_lkb(lkb););
David Teigland90135922006-01-20 08:47:07 +0000255 return (lkb->lkb_flags & DLM_IFL_MSTCPY) ? 1 : 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000256}
257
258static inline int middle_conversion(struct dlm_lkb *lkb)
259{
260 if ((lkb->lkb_grmode==DLM_LOCK_PR && lkb->lkb_rqmode==DLM_LOCK_CW) ||
261 (lkb->lkb_rqmode==DLM_LOCK_PR && lkb->lkb_grmode==DLM_LOCK_CW))
David Teigland90135922006-01-20 08:47:07 +0000262 return 1;
263 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +0000264}
265
266static inline int down_conversion(struct dlm_lkb *lkb)
267{
268 return (!middle_conversion(lkb) && lkb->lkb_rqmode < lkb->lkb_grmode);
269}
270
David Teiglandef0c2bb2007-03-28 09:56:46 -0500271static inline int is_overlap_unlock(struct dlm_lkb *lkb)
272{
273 return lkb->lkb_flags & DLM_IFL_OVERLAP_UNLOCK;
274}
275
276static inline int is_overlap_cancel(struct dlm_lkb *lkb)
277{
278 return lkb->lkb_flags & DLM_IFL_OVERLAP_CANCEL;
279}
280
281static inline int is_overlap(struct dlm_lkb *lkb)
282{
283 return (lkb->lkb_flags & (DLM_IFL_OVERLAP_UNLOCK |
284 DLM_IFL_OVERLAP_CANCEL));
285}
286
/* Deliver a completion ast with status rv to the lock holder.  Nothing
   is done for a master copy — the owning (process-copy) node queues the
   cast itself.  Stops any lock timeout and fills in the caller's lksb. */
static void queue_cast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	if (is_master_copy(lkb))
		return;

	del_timeout(lkb);

	DLM_ASSERT(lkb->lkb_lksb, dlm_print_lkb(lkb););

	/* if the operation was a cancel, then return -DLM_ECANCEL, if a
	   timeout caused the cancel then return -ETIMEDOUT */
	if (rv == -DLM_ECANCEL && (lkb->lkb_flags & DLM_IFL_TIMEOUT_CANCEL)) {
		lkb->lkb_flags &= ~DLM_IFL_TIMEOUT_CANCEL;
		rv = -ETIMEDOUT;
	}

	lkb->lkb_lksb->sb_status = rv;
	lkb->lkb_lksb->sb_flags = lkb->lkb_sbflags;

	dlm_add_ast(lkb, AST_COMP);
}
308
David Teiglandef0c2bb2007-03-28 09:56:46 -0500309static inline void queue_cast_overlap(struct dlm_rsb *r, struct dlm_lkb *lkb)
310{
311 queue_cast(r, lkb,
312 is_overlap_unlock(lkb) ? -DLM_EUNLOCK : -DLM_ECANCEL);
313}
314
/* Deliver a blocking ast for rqmode: forwarded to the owning node when
   lkb is a master copy, queued locally otherwise. */
static void queue_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int rqmode)
{
	if (is_master_copy(lkb))
		send_bast(r, lkb, rqmode);
	else {
		lkb->lkb_bastmode = rqmode;
		dlm_add_ast(lkb, AST_BAST);
	}
}
324
325/*
326 * Basic operations on rsb's and lkb's
327 */
328
/* Allocate and minimally initialize an rsb for "name" (binary name of
   "len" bytes); the caller fills in hash/bucket/nodeid and links it
   into the rsb table.  Returns NULL on allocation failure. */
static struct dlm_rsb *create_rsb(struct dlm_ls *ls, char *name, int len)
{
	struct dlm_rsb *r;

	r = allocate_rsb(ls, len);
	if (!r)
		return NULL;

	r->res_ls = ls;
	r->res_length = len;
	memcpy(r->res_name, name, len);
	mutex_init(&r->res_mutex);

	INIT_LIST_HEAD(&r->res_lookup);
	INIT_LIST_HEAD(&r->res_grantqueue);
	INIT_LIST_HEAD(&r->res_convertqueue);
	INIT_LIST_HEAD(&r->res_waitqueue);
	INIT_LIST_HEAD(&r->res_root_list);
	INIT_LIST_HEAD(&r->res_recover_list);

	return r;
}
351
352static int search_rsb_list(struct list_head *head, char *name, int len,
353 unsigned int flags, struct dlm_rsb **r_ret)
354{
355 struct dlm_rsb *r;
356 int error = 0;
357
358 list_for_each_entry(r, head, res_hashchain) {
359 if (len == r->res_length && !memcmp(name, r->res_name, len))
360 goto found;
361 }
David Teigland597d0ca2006-07-12 16:44:04 -0500362 return -EBADR;
David Teiglande7fd4172006-01-18 09:30:29 +0000363
364 found:
365 if (r->res_nodeid && (flags & R_MASTER))
366 error = -ENOTBLK;
367 *r_ret = r;
368 return error;
369}
370
/* Search bucket b for "name": first the active list (hit takes a ref),
   then the toss list (hit is moved back to the active list with its
   existing ref).  Caller holds the bucket lock.  When a directory is in
   use, a resurrected remotely-mastered rsb is flagged MASTER_UNCERTAIN
   since the cached master may be stale. */
static int _search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		       unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r;
	int error;

	error = search_rsb_list(&ls->ls_rsbtbl[b].list, name, len, flags, &r);
	if (!error) {
		kref_get(&r->res_ref);
		goto out;
	}
	error = search_rsb_list(&ls->ls_rsbtbl[b].toss, name, len, flags, &r);
	if (error)
		goto out;

	list_move(&r->res_hashchain, &ls->ls_rsbtbl[b].list);

	if (dlm_no_directory(ls))
		goto out;

	if (r->res_nodeid == -1) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else if (r->res_nodeid > 0) {
		rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = 0;
	} else {
		DLM_ASSERT(r->res_nodeid == 0, dlm_print_rsb(r););
		DLM_ASSERT(!rsb_flag(r, RSB_MASTER_UNCERTAIN),);
	}
 out:
	*r_ret = r;
	return error;
}
405
/* Locked wrapper around _search_rsb() for bucket b. */
static int search_rsb(struct dlm_ls *ls, char *name, int len, int b,
		      unsigned int flags, struct dlm_rsb **r_ret)
{
	int error;
	write_lock(&ls->ls_rsbtbl[b].lock);
	error = _search_rsb(ls, name, len, b, flags, r_ret);
	write_unlock(&ls->ls_rsbtbl[b].lock);
	return error;
}
415
416/*
417 * Find rsb in rsbtbl and potentially create/add one
418 *
419 * Delaying the release of rsb's has a similar benefit to applications keeping
420 * NL locks on an rsb, but without the guarantee that the cached master value
421 * will still be valid when the rsb is reused. Apps aren't always smart enough
422 * to keep NL locks on an rsb that they may lock again shortly; this can lead
423 * to excessive master lookups and removals if we don't delay the release.
424 *
425 * Searching for an rsb means looking through both the normal list and toss
426 * list. When found on the toss list the rsb is moved to the normal list with
427 * ref count of 1; when found on normal list the ref count is incremented.
428 */
429
/* Find — or, with R_CREATE, create — the rsb for "name"; on success a
   reference is held via *r_ret.  Errors: -EBADR (not found, no
   R_CREATE), -ENOTBLK (found but not a master copy when R_MASTER was
   set), -ENOMEM. */
static int find_rsb(struct dlm_ls *ls, char *name, int namelen,
		    unsigned int flags, struct dlm_rsb **r_ret)
{
	struct dlm_rsb *r, *tmp;
	uint32_t hash, bucket;
	int error = 0;

	if (dlm_no_directory(ls))
		flags |= R_CREATE;

	hash = jhash(name, namelen, 0);
	bucket = hash & (ls->ls_rsbtbl_size - 1);

	error = search_rsb(ls, name, namelen, bucket, flags, &r);
	if (!error)
		goto out;

	if (error == -EBADR && !(flags & R_CREATE))
		goto out;

	/* the rsb was found but wasn't a master copy */
	if (error == -ENOTBLK)
		goto out;

	error = -ENOMEM;
	r = create_rsb(ls, name, namelen);
	if (!r)
		goto out;

	r->res_hash = hash;
	r->res_bucket = bucket;
	r->res_nodeid = -1;
	kref_init(&r->res_ref);

	/* With no directory, the master can be set immediately */
	if (dlm_no_directory(ls)) {
		int nodeid = dlm_dir_nodeid(r);
		if (nodeid == dlm_our_nodeid())
			nodeid = 0;
		r->res_nodeid = nodeid;
	}

	/* re-search under the lock: another thread may have added the same
	   rsb since our unlocked search above; if so use theirs */
	write_lock(&ls->ls_rsbtbl[bucket].lock);
	error = _search_rsb(ls, name, namelen, bucket, 0, &tmp);
	if (!error) {
		write_unlock(&ls->ls_rsbtbl[bucket].lock);
		free_rsb(r);
		r = tmp;
		goto out;
	}
	list_add(&r->res_hashchain, &ls->ls_rsbtbl[bucket].list);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
	error = 0;
 out:
	*r_ret = r;
	return error;
}
487
/* External interface to find_rsb(); see that function for semantics. */
int dlm_find_rsb(struct dlm_ls *ls, char *name, int namelen,
		 unsigned int flags, struct dlm_rsb **r_ret)
{
	return find_rsb(ls, name, namelen, flags, r_ret);
}
493
494/* This is only called to add a reference when the code already holds
495 a valid reference to the rsb, so there's no need for locking. */
496
static inline void hold_rsb(struct dlm_rsb *r)
{
	/* caller already holds a valid reference (see comment above) */
	kref_get(&r->res_ref);
}
501
/* External version of hold_rsb(). */
void dlm_hold_rsb(struct dlm_rsb *r)
{
	hold_rsb(r);
}
506
/* kref release: move a now-unreferenced rsb to its bucket's toss list,
   stamping the time so shrink_bucket() can free it later.  The kref is
   re-armed (count 1) for the toss list, and the lvb is freed now. */
static void toss_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);
	struct dlm_ls *ls = r->res_ls;

	DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
	kref_init(&r->res_ref);
	list_move(&r->res_hashchain, &ls->ls_rsbtbl[r->res_bucket].toss);
	r->res_toss_time = jiffies;
	if (r->res_lvbptr) {
		free_lvb(r->res_lvbptr);
		r->res_lvbptr = NULL;
	}
}
521
/* When all references to the rsb are gone it's transferred to
   the tossed list for later disposal. */
524
/* Drop a reference under the bucket lock; the final put moves the rsb
   to the toss list via toss_rsb(). */
static void put_rsb(struct dlm_rsb *r)
{
	struct dlm_ls *ls = r->res_ls;
	uint32_t bucket = r->res_bucket;

	write_lock(&ls->ls_rsbtbl[bucket].lock);
	kref_put(&r->res_ref, toss_rsb);
	write_unlock(&ls->ls_rsbtbl[bucket].lock);
}
534
/* External version of put_rsb(). */
void dlm_put_rsb(struct dlm_rsb *r)
{
	put_rsb(r);
}
539
540/* See comment for unhold_lkb */
541
/* Drop a reference that is known not to be the last one; asserts that
   the put did not trigger the release function. */
static void unhold_rsb(struct dlm_rsb *r)
{
	int rv;
	rv = kref_put(&r->res_ref, toss_rsb);
	DLM_ASSERT(!rv, dlm_dump_rsb(r););
}
548
/* kref release used when freeing an rsb off the toss list: only asserts
   that every queue is empty — the caller does the unlink and free. */
static void kill_rsb(struct kref *kref)
{
	struct dlm_rsb *r = container_of(kref, struct dlm_rsb, res_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the remove and free. */

	DLM_ASSERT(list_empty(&r->res_lookup), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_grantqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_convertqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_waitqueue), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_root_list), dlm_dump_rsb(r););
	DLM_ASSERT(list_empty(&r->res_recover_list), dlm_dump_rsb(r););
}
563
564/* Attaching/detaching lkb's from rsb's is for rsb reference counting.
565 The rsb must exist as long as any lkb's for it do. */
566
/* Point lkb at its rsb, taking an rsb reference so the rsb outlives
   every lkb attached to it. */
static void attach_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	hold_rsb(r);
	lkb->lkb_resource = r;
}
572
/* Undo attach_lkb(): drop the rsb reference and clear the pointer.
   Safe to call on an lkb that was never attached. */
static void detach_lkb(struct dlm_lkb *lkb)
{
	if (lkb->lkb_resource) {
		put_rsb(lkb->lkb_resource);
		lkb->lkb_resource = NULL;
	}
}
580
/* Allocate a new lkb and assign it a unique lock id of the form
   (bucket << 16) | per-bucket counter.  The bucket is chosen at random
   to spread locks across the id table.  Returns 0 or -ENOMEM. */
static int create_lkb(struct dlm_ls *ls, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb, *tmp;
	uint32_t lkid = 0;
	uint16_t bucket;

	lkb = allocate_lkb(ls);
	if (!lkb)
		return -ENOMEM;

	lkb->lkb_nodeid = -1;
	lkb->lkb_grmode = DLM_LOCK_IV;
	kref_init(&lkb->lkb_ref);
	INIT_LIST_HEAD(&lkb->lkb_ownqueue);
	INIT_LIST_HEAD(&lkb->lkb_rsb_lookup);
	INIT_LIST_HEAD(&lkb->lkb_time_list);

	get_random_bytes(&bucket, sizeof(bucket));
	bucket &= (ls->ls_lkbtbl_size - 1);

	write_lock(&ls->ls_lkbtbl[bucket].lock);

	/* counter can roll over so we must verify lkid is not in use */

	while (lkid == 0) {
		lkid = (bucket << 16) | ls->ls_lkbtbl[bucket].counter++;

		list_for_each_entry(tmp, &ls->ls_lkbtbl[bucket].list,
				    lkb_idtbl_list) {
			if (tmp->lkb_id != lkid)
				continue;
			/* id already in use, try the next counter value */
			lkid = 0;
			break;
		}
	}

	lkb->lkb_id = lkid;
	list_add(&lkb->lkb_idtbl_list, &ls->ls_lkbtbl[bucket].list);
	write_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return 0;
}
624
/* Look up an lkb by id on its bucket list (bucket is the high 16 bits
   of the id); caller holds the bucket lock.  No reference is taken;
   returns NULL when not found. */
static struct dlm_lkb *__find_lkb(struct dlm_ls *ls, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	list_for_each_entry(lkb, &ls->ls_lkbtbl[bucket].list, lkb_idtbl_list) {
		if (lkb->lkb_id == lkid)
			return lkb;
	}
	return NULL;
}
636
/* Look up an lkb by id and take a reference on it.  Returns -EBADSLT
   when the id encodes an out-of-range bucket, -ENOENT when no lkb with
   that id exists. */
static int find_lkb(struct dlm_ls *ls, uint32_t lkid, struct dlm_lkb **lkb_ret)
{
	struct dlm_lkb *lkb;
	uint16_t bucket = (lkid >> 16);

	if (bucket >= ls->ls_lkbtbl_size)
		return -EBADSLT;

	read_lock(&ls->ls_lkbtbl[bucket].lock);
	lkb = __find_lkb(ls, lkid);
	if (lkb)
		kref_get(&lkb->lkb_ref);
	read_unlock(&ls->ls_lkbtbl[bucket].lock);

	*lkb_ret = lkb;
	return lkb ? 0 : -ENOENT;
}
654
/* kref release for the last lkb reference: only asserts the lkb is on
   no state queue — the caller does the unlink and free. */
static void kill_lkb(struct kref *kref)
{
	struct dlm_lkb *lkb = container_of(kref, struct dlm_lkb, lkb_ref);

	/* All work is done after the return from kref_put() so we
	   can release the write_lock before the detach_lkb */

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););
}
664
David Teiglandb3f58d82006-02-28 11:16:37 -0500665/* __put_lkb() is used when an lkb may not have an rsb attached to
666 it so we need to provide the lockspace explicitly */
667
/* Drop an lkb reference; on the final put, unlink it from the id table,
   detach it from its rsb and free it.  Returns 1 when the lkb was
   freed, 0 otherwise.  The lockspace is passed explicitly because the
   lkb may not have an rsb attached (see comment above). */
static int __put_lkb(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	uint16_t bucket = (lkb->lkb_id >> 16);

	write_lock(&ls->ls_lkbtbl[bucket].lock);
	if (kref_put(&lkb->lkb_ref, kill_lkb)) {
		list_del(&lkb->lkb_idtbl_list);
		write_unlock(&ls->ls_lkbtbl[bucket].lock);

		detach_lkb(lkb);

		/* for local/process lkbs, lvbptr points to caller's lksb */
		if (lkb->lkb_lvbptr && is_master_copy(lkb))
			free_lvb(lkb->lkb_lvbptr);
		free_lkb(lkb);
		return 1;
	} else {
		write_unlock(&ls->ls_lkbtbl[bucket].lock);
		return 0;
	}
}
689
/* Public put for an lkb that is known to be attached to an rsb (the
   lockspace is recovered through it).  Returns 1 if the lkb was freed. */
int dlm_put_lkb(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls;

	DLM_ASSERT(lkb->lkb_resource, dlm_print_lkb(lkb););
	DLM_ASSERT(lkb->lkb_resource->res_ls, dlm_print_lkb(lkb););

	ls = lkb->lkb_resource->res_ls;
	return __put_lkb(ls, lkb);
}
700
701/* This is only called to add a reference when the code already holds
702 a valid reference to the lkb, so there's no need for locking. */
703
static inline void hold_lkb(struct dlm_lkb *lkb)
{
	/* caller already holds a valid reference (see comment above) */
	kref_get(&lkb->lkb_ref);
}
708
709/* This is called when we need to remove a reference and are certain
710 it's not the last ref. e.g. del_lkb is always called between a
711 find_lkb/put_lkb and is always the inverse of a previous add_lkb.
712 put_lkb would work fine, but would involve unnecessary locking */
713
/* Drop a reference known not to be the last one (see comment above);
   asserts that the put did not trigger the release function. */
static inline void unhold_lkb(struct dlm_lkb *lkb)
{
	int rv;
	rv = kref_put(&lkb->lkb_ref, kill_lkb);
	DLM_ASSERT(!rv, dlm_print_lkb(lkb););
}
720
721static void lkb_add_ordered(struct list_head *new, struct list_head *head,
722 int mode)
723{
724 struct dlm_lkb *lkb = NULL;
725
726 list_for_each_entry(lkb, head, lkb_statequeue)
727 if (lkb->lkb_rqmode < mode)
728 break;
729
730 if (!lkb)
731 list_add_tail(new, head);
732 else
733 __list_add(new, lkb->lkb_statequeue.prev, &lkb->lkb_statequeue);
734}
735
736/* add/remove lkb to rsb's grant/convert/wait queue */
737
/* Put lkb on one of the rsb's three state queues, recording the new
   status and taking a reference for the queue.  DLM_LKF_HEADQUE adds
   to the head of the wait/convert queues; the grant queue is kept
   ordered by grant mode. */
static void add_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int status)
{
	kref_get(&lkb->lkb_ref);

	DLM_ASSERT(!lkb->lkb_status, dlm_print_lkb(lkb););

	lkb->lkb_status = status;

	switch (status) {
	case DLM_LKSTS_WAITING:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_waitqueue);
		else
			list_add_tail(&lkb->lkb_statequeue, &r->res_waitqueue);
		break;
	case DLM_LKSTS_GRANTED:
		/* convention says granted locks kept in order of grmode */
		lkb_add_ordered(&lkb->lkb_statequeue, &r->res_grantqueue,
				lkb->lkb_grmode);
		break;
	case DLM_LKSTS_CONVERT:
		if (lkb->lkb_exflags & DLM_LKF_HEADQUE)
			list_add(&lkb->lkb_statequeue, &r->res_convertqueue);
		else
			list_add_tail(&lkb->lkb_statequeue,
				      &r->res_convertqueue);
		break;
	default:
		DLM_ASSERT(0, dlm_print_lkb(lkb); printk("sts=%d\n", status););
	}
}
769
/* Take lkb off its rsb state queue and drop the queue's reference. */
static void del_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	lkb->lkb_status = 0;
	list_del(&lkb->lkb_statequeue);
	unhold_lkb(lkb);
}
776
/* Move lkb from its current state queue to the one for "sts", holding
   a temporary reference across the del/add. */
static void move_lkb(struct dlm_rsb *r, struct dlm_lkb *lkb, int sts)
{
	hold_lkb(lkb);
	del_lkb(r, lkb);
	add_lkb(r, lkb, sts);
	unhold_lkb(lkb);
}
784
David Teiglandef0c2bb2007-03-28 09:56:46 -0500785static int msg_reply_type(int mstype)
786{
787 switch (mstype) {
788 case DLM_MSG_REQUEST:
789 return DLM_MSG_REQUEST_REPLY;
790 case DLM_MSG_CONVERT:
791 return DLM_MSG_CONVERT_REPLY;
792 case DLM_MSG_UNLOCK:
793 return DLM_MSG_UNLOCK_REPLY;
794 case DLM_MSG_CANCEL:
795 return DLM_MSG_CANCEL_REPLY;
796 case DLM_MSG_LOOKUP:
797 return DLM_MSG_LOOKUP_REPLY;
798 }
799 return -1;
800}
801
David Teiglande7fd4172006-01-18 09:30:29 +0000802/* add/remove lkb from global waiters list of lkb's waiting for
803 a reply from a remote node */
804
/* Record that lkb is waiting for a remote reply of type "mstype" and
   put it on the lockspace waiters list (with a reference).  An unlock
   or cancel arriving while another op is still outstanding is recorded
   as an OVERLAP flag plus an extra wait count rather than a new
   wait_type.  Returns -EINVAL or -EBUSY when the op cannot proceed. */
static int add_to_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error = 0;

	mutex_lock(&ls->ls_waiters_mutex);

	/* an overlapped unlock rules out everything; a second cancel is
	   also rejected */
	if (is_overlap_unlock(lkb) ||
	    (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL))) {
		error = -EINVAL;
		goto out;
	}

	if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) {
		switch (mstype) {
		case DLM_MSG_UNLOCK:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			break;
		case DLM_MSG_CANCEL:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			break;
		default:
			error = -EBUSY;
			goto out;
		}
		lkb->lkb_wait_count++;
		hold_lkb(lkb);

		log_debug(ls, "add overlap %x cur %d new %d count %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, mstype,
			  lkb->lkb_wait_count, lkb->lkb_flags);
		goto out;
	}

	DLM_ASSERT(!lkb->lkb_wait_count,
		   dlm_print_lkb(lkb);
		   printk("wait_count %d\n", lkb->lkb_wait_count););

	lkb->lkb_wait_count++;
	lkb->lkb_wait_type = mstype;
	hold_lkb(lkb);
	list_add(&lkb->lkb_wait_reply, &ls->ls_waiters);
 out:
	if (error)
		log_error(ls, "add_to_waiters %x error %d flags %x %d %d %s",
			  lkb->lkb_id, error, lkb->lkb_flags, mstype,
			  lkb->lkb_wait_type, lkb->lkb_resource->res_name);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
855
David Teiglandb790c3b2007-01-24 10:21:33 -0600856/* We clear the RESEND flag because we might be taking an lkb off the waiters
857 list as part of process_requestqueue (e.g. a lookup that has an optimized
858 request reply on the requestqueue) between dlm_recover_waiters_pre() which
859 set RESEND and dlm_recover_waiters_post() */
860
/* Clear one outstanding wait on lkb for a reply of type "mstype" and
   drop the matching reference; called with waiters_mutex held.
   Returns 0 when a wait was cleared, -1 when nothing matched. */
static int _remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int overlap_done = 0;

	/* a reply to an overlapped unlock/cancel clears just that flag */
	if (is_overlap_unlock(lkb) && (mstype == DLM_MSG_UNLOCK_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		overlap_done = 1;
		goto out_del;
	}

	if (is_overlap_cancel(lkb) && (mstype == DLM_MSG_CANCEL_REPLY)) {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		overlap_done = 1;
		goto out_del;
	}

	/* N.B. type of reply may not always correspond to type of original
	   msg due to lookup->request optimization, verify others? */

	if (lkb->lkb_wait_type) {
		lkb->lkb_wait_type = 0;
		goto out_del;
	}

	log_error(ls, "remove_from_waiters lkid %x flags %x types %d %d",
		  lkb->lkb_id, lkb->lkb_flags, mstype, lkb->lkb_wait_type);
	return -1;

 out_del:
	/* the force-unlock/cancel has completed and we haven't recvd a reply
	   to the op that was in progress prior to the unlock/cancel; we
	   give up on any reply to the earlier op.  FIXME: not sure when/how
	   this would happen */

	if (overlap_done && lkb->lkb_wait_type) {
		log_error(ls, "remove_from_waiters %x reply %d give up on %d",
			  lkb->lkb_id, mstype, lkb->lkb_wait_type);
		lkb->lkb_wait_count--;
		lkb->lkb_wait_type = 0;
	}

	DLM_ASSERT(lkb->lkb_wait_count, dlm_print_lkb(lkb););

	/* clear RESEND in case we raced with recovery; see comment above */
	lkb->lkb_flags &= ~DLM_IFL_RESEND;
	lkb->lkb_wait_count--;
	if (!lkb->lkb_wait_count)
		list_del_init(&lkb->lkb_wait_reply);
	unhold_lkb(lkb);
	return 0;
}
912
/* Locked wrapper around _remove_from_waiters(); mstype is the type of
   the reply message just received. */
static int remove_from_waiters(struct dlm_lkb *lkb, int mstype)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, mstype);
	mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
923
David Teiglandef0c2bb2007-03-28 09:56:46 -0500924/* Handles situations where we might be processing a "fake" or "stub" reply in
925 which we can't try to take waiters_mutex again. */
926
/* Like remove_from_waiters(), keyed on the reply message itself.  When
   the message is the lockspace's locally generated stub reply,
   waiters_mutex is already held by the caller and must not be taken
   again (see comment above). */
static int remove_from_waiters_ms(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int error;

	if (ms != &ls->ls_stub_ms)
		mutex_lock(&ls->ls_waiters_mutex);
	error = _remove_from_waiters(lkb, ms->m_type);
	if (ms != &ls->ls_stub_ms)
		mutex_unlock(&ls->ls_waiters_mutex);
	return error;
}
939
David Teiglande7fd4172006-01-18 09:30:29 +0000940static void dir_remove(struct dlm_rsb *r)
941{
942 int to_nodeid;
943
944 if (dlm_no_directory(r->res_ls))
945 return;
946
947 to_nodeid = dlm_dir_nodeid(r);
948 if (to_nodeid != dlm_our_nodeid())
949 send_remove(r);
950 else
951 dlm_dir_remove_entry(r->res_ls, to_nodeid,
952 r->res_name, r->res_length);
953}
954
/* FIXME: shouldn't this be able to exit as soon as one non-due rsb is
   found since they are in order of newest to oldest? */

/* Free rsbs on hash bucket b's toss list whose toss time has expired.
 * Returns the number of rsbs freed.  The bucket write lock is dropped
 * around free_rsb()/dir_remove() and the scan restarted each iteration. */

static int shrink_bucket(struct dlm_ls *ls, int b)
{
	struct dlm_rsb *r;
	int count = 0, found;

	for (;;) {
		found = 0;
		write_lock(&ls->ls_rsbtbl[b].lock);
		/* scan from the list tail, i.e. oldest entries first */
		list_for_each_entry_reverse(r, &ls->ls_rsbtbl[b].toss,
					    res_hashchain) {
			if (!time_after_eq(jiffies, r->res_toss_time +
					   dlm_config.ci_toss_secs * HZ))
				continue;
			found = 1;
			break;
		}

		if (!found) {
			/* nothing (more) due for removal */
			write_unlock(&ls->ls_rsbtbl[b].lock);
			break;
		}

		if (kref_put(&r->res_ref, kill_rsb)) {
			/* last reference dropped: unhash, then free outside
			   the bucket lock */
			list_del(&r->res_hashchain);
			write_unlock(&ls->ls_rsbtbl[b].lock);

			if (is_master(r))
				dir_remove(r);
			free_rsb(r);
			count++;
		} else {
			write_unlock(&ls->ls_rsbtbl[b].lock);
			/* NOTE(review): r is dereferenced after the bucket
			   lock is released, and res_name is a counted buffer
			   that may not be NUL-terminated -- confirm both are
			   safe here */
			log_error(ls, "tossed rsb in use %s", r->res_name);
		}
	}

	return count;
}
996
997void dlm_scan_rsbs(struct dlm_ls *ls)
998{
999 int i;
1000
David Teiglande7fd4172006-01-18 09:30:29 +00001001 for (i = 0; i < ls->ls_rsbtbl_size; i++) {
1002 shrink_bucket(ls, i);
David Teigland85e86ed2007-05-18 08:58:15 -05001003 if (dlm_locking_stopped(ls))
1004 break;
David Teiglande7fd4172006-01-18 09:30:29 +00001005 cond_resched();
1006 }
1007}
1008
/* Begin timeout tracking for lkb: stamp it and, when a timewarn or a
 * per-lock timeout applies, add it (with a hold reference) to the
 * lockspace timeout list that dlm_scan_timeout() walks. */

static void add_timeout(struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;

	/* a master copy only records the timestamp; the timeout itself is
	   driven from the node holding the process copy */
	if (is_master_copy(lkb)) {
		lkb->lkb_timestamp = jiffies;
		return;
	}

	/* lockspace-wide timeout warnings, unless the lock opted out */
	if (test_bit(LSFL_TIMEWARN, &ls->ls_flags) &&
	    !(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
		lkb->lkb_flags |= DLM_IFL_WATCH_TIMEWARN;
		goto add_it;
	}
	/* per-lock timeout requested by the caller */
	if (lkb->lkb_exflags & DLM_LKF_TIMEOUT)
		goto add_it;
	return;

 add_it:
	DLM_ASSERT(list_empty(&lkb->lkb_time_list), dlm_print_lkb(lkb););
	mutex_lock(&ls->ls_timeout_mutex);
	hold_lkb(lkb);	/* reference is dropped by del_timeout() */
	lkb->lkb_timestamp = jiffies;
	list_add_tail(&lkb->lkb_time_list, &ls->ls_timeout);
	mutex_unlock(&ls->ls_timeout_mutex);
}
1035
1036static void del_timeout(struct dlm_lkb *lkb)
1037{
1038 struct dlm_ls *ls = lkb->lkb_resource->res_ls;
1039
1040 mutex_lock(&ls->ls_timeout_mutex);
1041 if (!list_empty(&lkb->lkb_time_list)) {
1042 list_del_init(&lkb->lkb_time_list);
1043 unhold_lkb(lkb);
1044 }
1045 mutex_unlock(&ls->ls_timeout_mutex);
1046}
1047
1048/* FIXME: is it safe to look at lkb_exflags, lkb_flags, lkb_timestamp, and
1049 lkb_lksb_timeout without lock_rsb? Note: we can't lock timeout_mutex
1050 and then lock rsb because of lock ordering in add_timeout. We may need
1051 to specify some special timeout-related bits in the lkb that are just to
1052 be accessed under the timeout_mutex. */
1053
/* Walk the lockspace timeout list, issuing timeout warnings and/or
 * cancelling locks whose deadline has passed.  One lkb is handled per
 * outer-loop pass: it is found and held under ls_timeout_mutex, then acted
 * on under the rsb lock (timeout_mutex cannot be held while locking the
 * rsb -- see the lock-ordering comment above). */

void dlm_scan_timeout(struct dlm_ls *ls)
{
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int do_cancel, do_warn;

	for (;;) {
		if (dlm_locking_stopped(ls))
			break;

		do_cancel = 0;
		do_warn = 0;
		mutex_lock(&ls->ls_timeout_mutex);
		list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list) {

			/* caller-requested timeout expired: cancel */
			if ((lkb->lkb_exflags & DLM_LKF_TIMEOUT) &&
			    time_after_eq(jiffies, lkb->lkb_timestamp +
					  lkb->lkb_timeout_cs * HZ/100))
				do_cancel = 1;

			/* lockspace timewarn period expired: warn */
			if ((lkb->lkb_flags & DLM_IFL_WATCH_TIMEWARN) &&
			    time_after_eq(jiffies, lkb->lkb_timestamp +
					  dlm_config.ci_timewarn_cs * HZ/100))
				do_warn = 1;

			if (!do_cancel && !do_warn)
				continue;
			/* ref dropped by dlm_put_lkb() below */
			hold_lkb(lkb);
			break;
		}
		mutex_unlock(&ls->ls_timeout_mutex);

		/* nothing due: lkb was not held, don't touch it */
		if (!do_cancel && !do_warn)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		if (do_warn) {
			/* clear flag so we only warn once */
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			if (!(lkb->lkb_exflags & DLM_LKF_TIMEOUT))
				del_timeout(lkb);
			dlm_timeout_warn(lkb);
		}

		if (do_cancel) {
			log_debug(ls, "timeout cancel %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			lkb->lkb_flags &= ~DLM_IFL_WATCH_TIMEWARN;
			lkb->lkb_flags |= DLM_IFL_TIMEOUT_CANCEL;
			del_timeout(lkb);
			_cancel_lock(r, lkb);
		}

		unlock_rsb(r);
		unhold_rsb(r);
		dlm_put_lkb(lkb);
	}
}
1115
1116/* This is only called by dlm_recoverd, and we rely on dlm_ls_stop() stopping
1117 dlm_recoverd before checking/setting ls_recover_begin. */
1118
1119void dlm_adjust_timeouts(struct dlm_ls *ls)
1120{
1121 struct dlm_lkb *lkb;
1122 long adj = jiffies - ls->ls_recover_begin;
1123
1124 ls->ls_recover_begin = 0;
1125 mutex_lock(&ls->ls_timeout_mutex);
1126 list_for_each_entry(lkb, &ls->ls_timeout, lkb_time_list)
1127 lkb->lkb_timestamp += adj;
1128 mutex_unlock(&ls->ls_timeout_mutex);
1129}
1130
/* lkb is master or local copy */

/* Apply the lock-value-block operation for a grant, per the
 * dlm_lvb_operations table indexed by (grmode, rqmode). */

static void set_lvb_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int b, len = r->res_ls->ls_lvblen;

	/* b=1 lvb returned to caller
	   b=0 lvb written to rsb or invalidated
	   b=-1 do nothing */

	b =  dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];

	if (b == 1) {
		/* copy the rsb's lvb out to the caller's buffer */
		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			return;

		memcpy(lkb->lkb_lvbptr, r->res_lvbptr, len);
		lkb->lkb_lvbseq = r->res_lvbseq;

	} else if (b == 0) {
		/* write the caller's lvb into the rsb, or invalidate it */
		if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
			rsb_set_flag(r, RSB_VALNOTVALID);
			return;
		}

		if (!lkb->lkb_lvbptr)
			return;

		if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
			return;

		if (!r->res_lvbptr)
			r->res_lvbptr = allocate_lvb(r->res_ls);

		/* allocation failure: silently skip the lvb update */
		if (!r->res_lvbptr)
			return;

		memcpy(r->res_lvbptr, lkb->lkb_lvbptr, len);
		r->res_lvbseq++;
		lkb->lkb_lvbseq = r->res_lvbseq;
		rsb_clear_flag(r, RSB_VALNOTVALID);
	}

	/* tell the caller when the lvb contents are not valid */
	if (rsb_flag(r, RSB_VALNOTVALID))
		lkb->lkb_sbflags |= DLM_SBF_VALNOTVALID;
}
1183
/* Write the lock's lvb back to the rsb on unlock.  Only locks held at
 * PW or EX may update the lvb; IVVALBLK invalidates it instead. */

static void set_lvb_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* only PW/EX holders may have modified the lvb */
	if (lkb->lkb_grmode < DLM_LOCK_PW)
		return;

	if (lkb->lkb_exflags & DLM_LKF_IVVALBLK) {
		rsb_set_flag(r, RSB_VALNOTVALID);
		return;
	}

	if (!lkb->lkb_lvbptr)
		return;

	if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
		return;

	if (!r->res_lvbptr)
		r->res_lvbptr = allocate_lvb(r->res_ls);

	/* allocation failure: silently skip the lvb update */
	if (!r->res_lvbptr)
		return;

	memcpy(r->res_lvbptr, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
	r->res_lvbseq++;
	rsb_clear_flag(r, RSB_VALNOTVALID);
}
1210
1211/* lkb is process copy (pc) */
1212
1213static void set_lvb_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
1214 struct dlm_message *ms)
1215{
1216 int b;
1217
1218 if (!lkb->lkb_lvbptr)
1219 return;
1220
1221 if (!(lkb->lkb_exflags & DLM_LKF_VALBLK))
1222 return;
1223
David Teigland597d0ca2006-07-12 16:44:04 -05001224 b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1];
David Teiglande7fd4172006-01-18 09:30:29 +00001225 if (b == 1) {
1226 int len = receive_extralen(ms);
1227 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
1228 lkb->lkb_lvbseq = ms->m_lvbseq;
1229 }
1230}
1231
1232/* Manipulate lkb's on rsb's convert/granted/waiting queues
1233 remove_lock -- used for unlock, removes lkb from granted
1234 revert_lock -- used for cancel, moves lkb from convert to granted
1235 grant_lock -- used for request and convert, adds lkb to granted or
1236 moves lkb from convert or waiting to granted
1237
1238 Each of these is used for master or local copy lkb's. There is
1239 also a _pc() variation used to make the corresponding change on
1240 a process copy (pc) lkb. */
1241
/* Common unlock path: take lkb off its rsb queue and drop the creation
 * reference.  The unhold may free the lkb, so it must come last. */

static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
1250
/* Unlock on a master or local copy: write the lvb back to the rsb first. */

static void remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_unlock(r, lkb);
	_remove_lock(r, lkb);
}
1256
/* Unlock on a process copy: the master node handles the lvb. */

static void remove_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	_remove_lock(r, lkb);
}
1261
/* Undo an in-progress operation for a cancel.
   returns: 0 did nothing
   1 moved lock to granted
   -1 removed lock */

static int revert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int rv = 0;

	lkb->lkb_rqmode = DLM_LOCK_IV;

	switch (lkb->lkb_status) {
	case DLM_LKSTS_GRANTED:
		/* nothing pending to revert */
		break;
	case DLM_LKSTS_CONVERT:
		/* cancelled conversion: fall back to the granted mode */
		move_lkb(r, lkb, DLM_LKSTS_GRANTED);
		rv = 1;
		break;
	case DLM_LKSTS_WAITING:
		/* cancelled request: the lock goes away entirely */
		del_lkb(r, lkb);
		lkb->lkb_grmode = DLM_LOCK_IV;
		/* this unhold undoes the original ref from create_lkb()
		   so this leads to the lkb being freed */
		unhold_lkb(lkb);
		rv = -1;
		break;
	default:
		log_print("invalid status for revert %d", lkb->lkb_status);
	}
	return rv;
}
1292
/* Revert on a process copy; currently identical to the master-side path. */

static int revert_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return revert_lock(r, lkb);
}
1297
1298static void _grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
1299{
1300 if (lkb->lkb_grmode != lkb->lkb_rqmode) {
1301 lkb->lkb_grmode = lkb->lkb_rqmode;
1302 if (lkb->lkb_status)
1303 move_lkb(r, lkb, DLM_LKSTS_GRANTED);
1304 else
1305 add_lkb(r, lkb, DLM_LKSTS_GRANTED);
1306 }
1307
1308 lkb->lkb_rqmode = DLM_LOCK_IV;
David Teiglande7fd4172006-01-18 09:30:29 +00001309}
1310
/* Grant on a master or local copy: handle the lvb, then grant and reset
 * the blocking-ast high-water mark. */

static void grant_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	set_lvb_lock(r, lkb);
	_grant_lock(r, lkb);
	lkb->lkb_highbast = 0;
}
1317
/* Grant on a process copy: the lvb comes from the reply message. */

static void grant_lock_pc(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  struct dlm_message *ms)
{
	set_lvb_lock_pc(r, lkb, ms);
	_grant_lock(r, lkb);
}
1324
/* Called by grant_pending_locks(): grant lkb and notify its owner -- an
   async GRANT message when the lkb belongs to a remote node (master copy),
   a completion ast when it is local. */

static void grant_lock_pending(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	grant_lock(r, lkb);

	if (!is_master_copy(lkb))
		queue_cast(r, lkb, 0);
	else
		send_grant(r, lkb);
}
1337
David Teigland7d3c1fe2007-04-19 10:30:41 -05001338/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to
1339 change the granted/requested modes. We're munging things accordingly in
1340 the process copy.
1341 CONVDEADLK: our grmode may have been forced down to NL to resolve a
1342 conversion deadlock
1343 ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
1344 compatible with other granted locks */
1345
/* The master demoted our grmode to NL to resolve a conversion deadlock
 * (CONVDEADLK); mirror that in the process copy. */

static void munge_demoted(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	/* only a convert reply can carry a demotion */
	if (ms->m_type != DLM_MSG_CONVERT_REPLY) {
		log_print("munge_demoted %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	/* a demotion implies a conversion was in progress: both modes set */
	if (lkb->lkb_rqmode == DLM_LOCK_IV || lkb->lkb_grmode == DLM_LOCK_IV) {
		log_print("munge_demoted %x invalid modes gr %d rq %d",
			  lkb->lkb_id, lkb->lkb_grmode, lkb->lkb_rqmode);
		return;
	}

	lkb->lkb_grmode = DLM_LOCK_NL;
}
1362
/* The master granted us an alternate mode (ALTPR/ALTCW); mirror the
 * changed rqmode in the process copy before it is granted locally. */

static void munge_altmode(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	/* altmode can only arrive via a request reply or a grant */
	if (ms->m_type != DLM_MSG_REQUEST_REPLY &&
	    ms->m_type != DLM_MSG_GRANT) {
		log_print("munge_altmode %x invalid reply type %d",
			  lkb->lkb_id, ms->m_type);
		return;
	}

	if (lkb->lkb_exflags & DLM_LKF_ALTPR)
		lkb->lkb_rqmode = DLM_LOCK_PR;
	else if (lkb->lkb_exflags & DLM_LKF_ALTCW)
		lkb->lkb_rqmode = DLM_LOCK_CW;
	else {
		log_print("munge_altmode invalid exflags %x", lkb->lkb_exflags);
		dlm_print_lkb(lkb);
	}
}
1381
David Teiglande7fd4172006-01-18 09:30:29 +00001382static inline int first_in_list(struct dlm_lkb *lkb, struct list_head *head)
1383{
1384 struct dlm_lkb *first = list_entry(head->next, struct dlm_lkb,
1385 lkb_statequeue);
1386 if (lkb->lkb_id == first->lkb_id)
David Teigland90135922006-01-20 08:47:07 +00001387 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001388
David Teigland90135922006-01-20 08:47:07 +00001389 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001390}
1391
David Teiglande7fd4172006-01-18 09:30:29 +00001392/* Check if the given lkb conflicts with another lkb on the queue. */
1393
1394static int queue_conflict(struct list_head *head, struct dlm_lkb *lkb)
1395{
1396 struct dlm_lkb *this;
1397
1398 list_for_each_entry(this, head, lkb_statequeue) {
1399 if (this == lkb)
1400 continue;
David Teigland3bcd3682006-02-23 09:56:38 +00001401 if (!modes_compat(this, lkb))
David Teigland90135922006-01-20 08:47:07 +00001402 return 1;
David Teiglande7fd4172006-01-18 09:30:29 +00001403 }
David Teigland90135922006-01-20 08:47:07 +00001404 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00001405}
1406
1407/*
1408 * "A conversion deadlock arises with a pair of lock requests in the converting
1409 * queue for one resource. The granted mode of each lock blocks the requested
1410 * mode of the other lock."
1411 *
David Teiglandc85d65e2007-05-18 09:01:26 -05001412 * Part 2: if the granted mode of lkb is preventing an earlier lkb in the
1413 * convert queue from being granted, then deadlk/demote lkb.
David Teiglande7fd4172006-01-18 09:30:29 +00001414 *
1415 * Example:
1416 * Granted Queue: empty
1417 * Convert Queue: NL->EX (first lock)
1418 * PR->EX (second lock)
1419 *
1420 * The first lock can't be granted because of the granted mode of the second
1421 * lock and the second lock can't be granted because it's not first in the
David Teiglandc85d65e2007-05-18 09:01:26 -05001422 * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we
1423 * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK
1424 * flag set and return DEMOTED in the lksb flags.
David Teiglande7fd4172006-01-18 09:30:29 +00001425 *
David Teiglandc85d65e2007-05-18 09:01:26 -05001426 * Originally, this function detected conv-deadlk in a more limited scope:
1427 * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or
1428 * - if lkb1 was the first entry in the queue (not just earlier), and was
1429 * blocked by the granted mode of lkb2, and there was nothing on the
1430 * granted queue preventing lkb1 from being granted immediately, i.e.
1431 * lkb2 was the only thing preventing lkb1 from being granted.
1432 *
1433 * That second condition meant we'd only say there was conv-deadlk if
1434 * resolving it (by demotion) would lead to the first lock on the convert
1435 * queue being granted right away. It allowed conversion deadlocks to exist
1436 * between locks on the convert queue while they couldn't be granted anyway.
1437 *
1438 * Now, we detect and take action on conversion deadlocks immediately when
1439 * they're created, even if they may not be immediately consequential. If
1440 * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted
1441 * mode that would prevent lkb1's conversion from being granted, we do a
1442 * deadlk/demote on lkb2 right away and don't let it onto the convert queue.
1443 * I think this means that the lkb_is_ahead condition below should always
1444 * be zero, i.e. there will never be conv-deadlk between two locks that are
1445 * both already on the convert queue.
David Teiglande7fd4172006-01-18 09:30:29 +00001446 */
1447
/* Return 1 if granting lkb2 would create (or completes) a conversion
 * deadlock on r's convert queue -- see the long comment above for the
 * rationale behind the earlier/later distinction. */

static int conversion_deadlock_detect(struct dlm_rsb *r, struct dlm_lkb *lkb2)
{
	struct dlm_lkb *lkb1;
	int lkb_is_ahead = 0;

	list_for_each_entry(lkb1, &r->res_convertqueue, lkb_statequeue) {
		if (lkb1 == lkb2) {
			lkb_is_ahead = 1;
			continue;
		}

		if (!lkb_is_ahead) {
			/* lkb1 is earlier in the queue: lkb2's granted mode
			   blocking it is enough */
			if (!modes_compat(lkb2, lkb1))
				return 1;
		} else {
			/* lkb1 is later: require a mutual block (expected
			   never to happen -- see comment above) */
			if (!modes_compat(lkb2, lkb1) &&
			    !modes_compat(lkb1, lkb2))
				return 1;
		}
	}
	return 0;
}
1470
1471/*
1472 * Return 1 if the lock can be granted, 0 otherwise.
1473 * Also detect and resolve conversion deadlocks.
1474 *
1475 * lkb is the lock to be granted
1476 *
1477 * now is 1 if the function is being called in the context of the
1478 * immediate request, it is 0 if called later, after the lock has been
1479 * queued.
1480 *
1481 * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
1482 */
1483
/* Core grant decision: return 1 if lkb can be granted on r right now,
 * 0 otherwise.  Implements the VMS grant rules cited below, plus the
 * EXPEDITE/QUECVT/NOORDER flag exceptions. */

static int _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now)
{
	int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);

	/*
	 * 6-10: Version 5.4 introduced an option to address the phenomenon of
	 * a new request for a NL mode lock being blocked.
	 *
	 * 6-11: If the optional EXPEDITE flag is used with the new NL mode
	 * request, then it would be granted. In essence, the use of this flag
	 * tells the Lock Manager to expedite this request by not considering
	 * what may be in the CONVERTING or WAITING queues... As of this
	 * writing, the EXPEDITE flag can be used only with new requests for NL
	 * mode locks. This flag is not valid for conversion requests.
	 *
	 * A shortcut. Earlier checks return an error if EXPEDITE is used in a
	 * conversion or used with a non-NL requested mode. We also know an
	 * EXPEDITE request is always granted immediately, so now must always
	 * be 1. The full condition to grant an expedite request: (now &&
	 * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can
	 * therefore be shortened to just checking the flag.
	 */

	if (lkb->lkb_exflags & DLM_LKF_EXPEDITE)
		return 1;

	/*
	 * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be
	 * added to the remaining conditions.
	 */

	if (queue_conflict(&r->res_grantqueue, lkb))
		goto out;

	/*
	 * 6-3: By default, a conversion request is immediately granted if the
	 * requested mode is compatible with the modes of all other granted
	 * locks
	 */

	if (queue_conflict(&r->res_convertqueue, lkb))
		goto out;

	/*
	 * 6-5: But the default algorithm for deciding whether to grant or
	 * queue conversion requests does not by itself guarantee that such
	 * requests are serviced on a "first come first serve" basis. This, in
	 * turn, can lead to a phenomenon known as "indefinite postponement".
	 *
	 * 6-7: This issue is dealt with by using the optional QUECVT flag with
	 * the system service employed to request a lock conversion. This flag
	 * forces certain conversion requests to be queued, even if they are
	 * compatible with the granted modes of other locks on the same
	 * resource. Thus, the use of this flag results in conversion requests
	 * being ordered on a "first come first serve" basis.
	 *
	 * DCT: This condition is all about new conversions being able to occur
	 * "in place" while the lock remains on the granted queue (assuming
	 * nothing else conflicts.) IOW if QUECVT isn't set, a conversion
	 * doesn't _have_ to go onto the convert queue where it's processed in
	 * order. The "now" variable is necessary to distinguish converts
	 * being received and processed for the first time now, because once a
	 * convert is moved to the conversion queue the condition below applies
	 * requiring fifo granting.
	 */

	if (now && conv && !(lkb->lkb_exflags & DLM_LKF_QUECVT))
		return 1;

	/*
	 * The NOORDER flag is set to avoid the standard vms rules on grant
	 * order.
	 */

	if (lkb->lkb_exflags & DLM_LKF_NOORDER)
		return 1;

	/*
	 * 6-3: Once in that queue [CONVERTING], a conversion request cannot be
	 * granted until all other conversion requests ahead of it are granted
	 * and/or canceled.
	 */

	if (!now && conv && first_in_list(lkb, &r->res_convertqueue))
		return 1;

	/*
	 * 6-4: By default, a new request is immediately granted only if all
	 * three of the following conditions are satisfied when the request is
	 * issued:
	 * - The queue of ungranted conversion requests for the resource is
	 *   empty.
	 * - The queue of ungranted new requests for the resource is empty.
	 * - The mode of the new request is compatible with the most
	 *   restrictive mode of all granted locks on the resource.
	 */

	if (now && !conv && list_empty(&r->res_convertqueue) &&
	    list_empty(&r->res_waitqueue))
		return 1;

	/*
	 * 6-4: Once a lock request is in the queue of ungranted new requests,
	 * it cannot be granted until the queue of ungranted conversion
	 * requests is empty, all ungranted new requests ahead of it are
	 * granted and/or canceled, and it is compatible with the granted mode
	 * of the most restrictive lock granted on the resource.
	 */

	if (!now && !conv && list_empty(&r->res_convertqueue) &&
	    first_in_list(lkb, &r->res_waitqueue))
		return 1;
 out:
	return 0;
}
1599
/* Wrapper around _can_be_granted() that additionally resolves conversion
 * deadlocks (demote with CONVDEADLK, else report -EDEADLK through *err)
 * and retries the grant in an alternate mode for ALTPR/ALTCW requests.
 * err may be NULL when the caller cannot handle a deadlock result. */

static int can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now,
			  int *err)
{
	int rv;
	int8_t alt = 0, rqmode = lkb->lkb_rqmode;
	int8_t is_convert = (lkb->lkb_grmode != DLM_LOCK_IV);

	if (err)
		*err = 0;

	rv = _can_be_granted(r, lkb, now);
	if (rv)
		goto out;

	/*
	 * The CONVDEADLK flag is non-standard and tells the dlm to resolve
	 * conversion deadlocks by demoting grmode to NL, otherwise the dlm
	 * cancels one of the locks.
	 */

	if (is_convert && can_be_queued(lkb) &&
	    conversion_deadlock_detect(r, lkb)) {
		if (lkb->lkb_exflags & DLM_LKF_CONVDEADLK) {
			lkb->lkb_grmode = DLM_LOCK_NL;
			lkb->lkb_sbflags |= DLM_SBF_DEMOTED;
		} else if (!(lkb->lkb_exflags & DLM_LKF_NODLCKWT)) {
			if (err)
				*err = -EDEADLK;
			else {
				log_print("can_be_granted deadlock %x now %d",
					  lkb->lkb_id, now);
				dlm_dump_rsb(r);
			}
		}
		goto out;
	}

	/*
	 * The ALTPR and ALTCW flags are non-standard and tell the dlm to try
	 * to grant a request in a mode other than the normal rqmode. It's a
	 * simple way to provide a big optimization to applications that can
	 * use them.
	 */

	if (rqmode != DLM_LOCK_PR && (lkb->lkb_exflags & DLM_LKF_ALTPR))
		alt = DLM_LOCK_PR;
	else if (rqmode != DLM_LOCK_CW && (lkb->lkb_exflags & DLM_LKF_ALTCW))
		alt = DLM_LOCK_CW;

	if (alt) {
		lkb->lkb_rqmode = alt;
		rv = _can_be_granted(r, lkb, now);
		if (rv)
			lkb->lkb_sbflags |= DLM_SBF_ALTMODE;
		else
			lkb->lkb_rqmode = rqmode;
	}
 out:
	return rv;
}
1660
/* FIXME: I don't think that can_be_granted() can/will demote or find deadlock
   for locks pending on the convert list. Once verified (watch for these
   log_prints), we should be able to just call _can_be_granted() and not
   bother with the demote/deadlk cases here (and there's no easy way to deal
   with a deadlk here, we'd have to generate something like grant_lock with
   the deadlk error.) */

/* returns the highest requested mode of all blocked conversions */

static int grant_pending_convert(struct dlm_rsb *r, int high)
{
	struct dlm_lkb *lkb, *s;
	int hi, demoted, quit, grant_restart, demote_restart;
	int deadlk;

	quit = 0;
 restart:
	grant_restart = 0;
	demote_restart = 0;
	hi = DLM_LOCK_IV;

	list_for_each_entry_safe(lkb, s, &r->res_convertqueue, lkb_statequeue) {
		demoted = is_demoted(lkb);
		deadlk = 0;

		if (can_be_granted(r, lkb, 0, &deadlk)) {
			grant_lock_pending(r, lkb);
			grant_restart = 1;
			continue;
		}

		/* can_be_granted() demoted the lock: rescan, since lowering
		   this grmode may let others be granted */
		if (!demoted && is_demoted(lkb)) {
			log_print("WARN: pending demoted %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			demote_restart = 1;
			continue;
		}

		/* unexpected here -- see the FIXME above */
		if (deadlk) {
			log_print("WARN: pending deadlock %x node %d %s",
				  lkb->lkb_id, lkb->lkb_nodeid, r->res_name);
			dlm_dump_rsb(r);
			continue;
		}

		hi = max_t(int, lkb->lkb_rqmode, hi);
	}

	/* each grant may unblock other conversions; repeat until stable.
	   a demote triggers at most one extra pass (quit limits it) */
	if (grant_restart)
		goto restart;
	if (demote_restart && !quit) {
		quit = 1;
		goto restart;
	}

	return max_t(int, high, hi);
}
1718
1719static int grant_pending_wait(struct dlm_rsb *r, int high)
1720{
1721 struct dlm_lkb *lkb, *s;
1722
1723 list_for_each_entry_safe(lkb, s, &r->res_waitqueue, lkb_statequeue) {
David Teiglandc85d65e2007-05-18 09:01:26 -05001724 if (can_be_granted(r, lkb, 0, NULL))
David Teiglande7fd4172006-01-18 09:30:29 +00001725 grant_lock_pending(r, lkb);
1726 else
1727 high = max_t(int, lkb->lkb_rqmode, high);
1728 }
1729
1730 return high;
1731}
1732
/* Master-only: grant all convert/wait queue locks that have become
 * grantable, then send blocking asts to granted locks that stand in the
 * way of the highest still-blocked request. */

static void grant_pending_locks(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *s;
	int high = DLM_LOCK_IV;

	DLM_ASSERT(is_master(r), dlm_dump_rsb(r););

	high = grant_pending_convert(r, high);
	high = grant_pending_wait(r, high);

	/* nothing left blocked */
	if (high == DLM_LOCK_IV)
		return;

	/*
	 * If there are locks left on the wait/convert queue then send blocking
	 * ASTs to granted locks based on the largest requested mode (high)
	 * found above. FIXME: highbast < high comparison not valid for PR/CW.
	 */

	list_for_each_entry_safe(lkb, s, &r->res_grantqueue, lkb_statequeue) {
		if (lkb->lkb_bastaddr && (lkb->lkb_highbast < high) &&
		    !__dlm_compat_matrix[lkb->lkb_grmode+1][high+1]) {
			queue_bast(r, lkb, high);
			lkb->lkb_highbast = high;
		}
	}
}
1760
1761static void send_bast_queue(struct dlm_rsb *r, struct list_head *head,
1762 struct dlm_lkb *lkb)
1763{
1764 struct dlm_lkb *gr;
1765
1766 list_for_each_entry(gr, head, lkb_statequeue) {
1767 if (gr->lkb_bastaddr &&
1768 gr->lkb_highbast < lkb->lkb_rqmode &&
David Teigland3bcd3682006-02-23 09:56:38 +00001769 !modes_compat(gr, lkb)) {
David Teiglande7fd4172006-01-18 09:30:29 +00001770 queue_bast(r, gr, lkb->lkb_rqmode);
1771 gr->lkb_highbast = lkb->lkb_rqmode;
1772 }
1773 }
1774}
1775
/* Send blocking asts to granted-queue holders that block lkb's request. */

static void send_blocking_asts(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
}
1780
/* As send_blocking_asts(), but also cover holders on the convert queue. */

static void send_blocking_asts_all(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	send_bast_queue(r, &r->res_grantqueue, lkb);
	send_bast_queue(r, &r->res_convertqueue, lkb);
}
1786
1787/* set_master(r, lkb) -- set the master nodeid of a resource
1788
1789 The purpose of this function is to set the nodeid field in the given
1790 lkb using the nodeid field in the given rsb. If the rsb's nodeid is
1791 known, it can just be copied to the lkb and the function will return
1792 0. If the rsb's nodeid is _not_ known, it needs to be looked up
1793 before it can be copied to the lkb.
1794
1795 When the rsb nodeid is being looked up remotely, the initial lkb
1796 causing the lookup is kept on the ls_waiters list waiting for the
1797 lookup reply. Other lkb's waiting for the same rsb lookup are kept
1798 on the rsb's res_lookup list until the master is verified.
1799
1800 Return values:
1801 0: nodeid is set in rsb/lkb and the caller should go ahead and use it
1802 1: the rsb master is not available and the lkb has been placed on
1803 a wait queue
1804*/
1805
/* See the contract in the comment block above: returns 0 when lkb_nodeid
 * has been set from the rsb, 1 when lkb has been queued pending a remote
 * master lookup. */

static int set_master(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_ls *ls = r->res_ls;
	int error, dir_nodeid, ret_nodeid, our_nodeid = dlm_our_nodeid();

	/* recovery left the master unverified: this lkb becomes the one
	   whose reply will confirm or deny it */
	if (rsb_flag(r, RSB_MASTER_UNCERTAIN)) {
		rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
		r->res_first_lkid = lkb->lkb_id;
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* another lkb's lookup is already in flight: wait on res_lookup */
	if (r->res_first_lkid && r->res_first_lkid != lkb->lkb_id) {
		list_add_tail(&lkb->lkb_rsb_lookup, &r->res_lookup);
		return 1;
	}

	/* master is us */
	if (r->res_nodeid == 0) {
		lkb->lkb_nodeid = 0;
		return 0;
	}

	/* master is a known remote node */
	if (r->res_nodeid > 0) {
		lkb->lkb_nodeid = r->res_nodeid;
		return 0;
	}

	/* master unknown: res_nodeid must be -1 */
	DLM_ASSERT(r->res_nodeid == -1, dlm_dump_rsb(r););

	dir_nodeid = dlm_dir_nodeid(r);

	/* directory is remote: ask it who the master is */
	if (dir_nodeid != our_nodeid) {
		r->res_first_lkid = lkb->lkb_id;
		send_lookup(r, lkb);
		return 1;
	}

	/* directory is local: look the master up directly */
	for (;;) {
		/* It's possible for dlm_scand to remove an old rsb for
		   this same resource from the toss list, us to create
		   a new one, look up the master locally, and find it
		   already exists just before dlm_scand does the
		   dir_remove() on the previous rsb. */

		error = dlm_dir_lookup(ls, our_nodeid, r->res_name,
				       r->res_length, &ret_nodeid);
		if (!error)
			break;
		log_debug(ls, "dir_lookup error %d %s", error, r->res_name);
		schedule();
	}

	if (ret_nodeid == our_nodeid) {
		r->res_first_lkid = 0;
		r->res_nodeid = 0;
		lkb->lkb_nodeid = 0;
	} else {
		r->res_first_lkid = lkb->lkb_id;
		r->res_nodeid = ret_nodeid;
		lkb->lkb_nodeid = ret_nodeid;
	}
	return 0;
}
1869
/* restart the request of every lkb that was parked on res_lookup while a
   master lookup was in flight; safe iterator is required because
   _request_lock() may remove or re-queue entries via set_master() */

static void process_lookup_list(struct dlm_rsb *r)
{
	struct dlm_lkb *lkb, *safe;

	list_for_each_entry_safe(lkb, safe, &r->res_lookup, lkb_rsb_lookup) {
		list_del_init(&lkb->lkb_rsb_lookup);
		_request_lock(r, lkb);
		schedule();	/* stay preemptible on long lists */
	}
}
1880
1881/* confirm_master -- confirm (or deny) an rsb's master nodeid */
1882
/* confirm_master -- confirm (or deny) an rsb's master nodeid

   Called with the result of the first request sent to a newly looked-up
   master (the lkb recorded in res_first_lkid).  On success/-EINPROGRESS
   the master is confirmed and parked lookups are restarted; on -EAGAIN
   the NOQUEUE request was rejected, so another waiting lkb (if any) is
   promoted to retry, otherwise the master becomes unknown again. */

static void confirm_master(struct dlm_rsb *r, int error)
{
	struct dlm_lkb *lkb;

	if (!r->res_first_lkid)
		return;

	switch (error) {
	case 0:
	case -EINPROGRESS:
		r->res_first_lkid = 0;
		process_lookup_list(r);
		break;

	case -EAGAIN:
		/* the remote master didn't queue our NOQUEUE request;
		   make a waiting lkb the first_lkid */

		r->res_first_lkid = 0;

		if (!list_empty(&r->res_lookup)) {
			lkb = list_entry(r->res_lookup.next, struct dlm_lkb,
					 lkb_rsb_lookup);
			list_del_init(&lkb->lkb_rsb_lookup);
			r->res_first_lkid = lkb->lkb_id;
			_request_lock(r, lkb);
		} else
			r->res_nodeid = -1;	/* master unknown again */
		break;

	default:
		log_error(r->res_ls, "confirm_master unknown error %d", error);
	}
}
1917
1918static int set_lock_args(int mode, struct dlm_lksb *lksb, uint32_t flags,
David Teiglandd7db9232007-05-18 09:00:32 -05001919 int namelen, unsigned long timeout_cs, void *ast,
David Teigland3bcd3682006-02-23 09:56:38 +00001920 void *astarg, void *bast, struct dlm_args *args)
David Teiglande7fd4172006-01-18 09:30:29 +00001921{
1922 int rv = -EINVAL;
1923
1924 /* check for invalid arg usage */
1925
1926 if (mode < 0 || mode > DLM_LOCK_EX)
1927 goto out;
1928
1929 if (!(flags & DLM_LKF_CONVERT) && (namelen > DLM_RESNAME_MAXLEN))
1930 goto out;
1931
1932 if (flags & DLM_LKF_CANCEL)
1933 goto out;
1934
1935 if (flags & DLM_LKF_QUECVT && !(flags & DLM_LKF_CONVERT))
1936 goto out;
1937
1938 if (flags & DLM_LKF_CONVDEADLK && !(flags & DLM_LKF_CONVERT))
1939 goto out;
1940
1941 if (flags & DLM_LKF_CONVDEADLK && flags & DLM_LKF_NOQUEUE)
1942 goto out;
1943
1944 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_CONVERT)
1945 goto out;
1946
1947 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_QUECVT)
1948 goto out;
1949
1950 if (flags & DLM_LKF_EXPEDITE && flags & DLM_LKF_NOQUEUE)
1951 goto out;
1952
1953 if (flags & DLM_LKF_EXPEDITE && mode != DLM_LOCK_NL)
1954 goto out;
1955
1956 if (!ast || !lksb)
1957 goto out;
1958
1959 if (flags & DLM_LKF_VALBLK && !lksb->sb_lvbptr)
1960 goto out;
1961
David Teiglande7fd4172006-01-18 09:30:29 +00001962 if (flags & DLM_LKF_CONVERT && !lksb->sb_lkid)
1963 goto out;
1964
1965 /* these args will be copied to the lkb in validate_lock_args,
1966 it cannot be done now because when converting locks, fields in
1967 an active lkb cannot be modified before locking the rsb */
1968
1969 args->flags = flags;
1970 args->astaddr = ast;
1971 args->astparam = (long) astarg;
1972 args->bastaddr = bast;
David Teiglandd7db9232007-05-18 09:00:32 -05001973 args->timeout = timeout_cs;
David Teiglande7fd4172006-01-18 09:30:29 +00001974 args->mode = mode;
1975 args->lksb = lksb;
David Teiglande7fd4172006-01-18 09:30:29 +00001976 rv = 0;
1977 out:
1978 return rv;
1979}
1980
1981static int set_unlock_args(uint32_t flags, void *astarg, struct dlm_args *args)
1982{
1983 if (flags & ~(DLM_LKF_CANCEL | DLM_LKF_VALBLK | DLM_LKF_IVVALBLK |
1984 DLM_LKF_FORCEUNLOCK))
1985 return -EINVAL;
1986
David Teiglandef0c2bb2007-03-28 09:56:46 -05001987 if (flags & DLM_LKF_CANCEL && flags & DLM_LKF_FORCEUNLOCK)
1988 return -EINVAL;
1989
David Teiglande7fd4172006-01-18 09:30:29 +00001990 args->flags = flags;
1991 args->astparam = (long) astarg;
1992 return 0;
1993}
1994
/* Copy staged args into the lkb under the rsb lock.  For conversions,
   first verify the lock is in a state that can be converted: granted,
   locally owned (not a master copy), with no other op in progress. */

static int validate_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
			      struct dlm_args *args)
{
	int rv = -EINVAL;

	if (args->flags & DLM_LKF_CONVERT) {
		/* can't convert a master copy of a remote lock */
		if (lkb->lkb_flags & DLM_IFL_MSTCPY)
			goto out;

		/* QUECVT is only valid for the mode transitions allowed
		   by __quecvt_compat_matrix */
		if (args->flags & DLM_LKF_QUECVT &&
		    !__quecvt_compat_matrix[lkb->lkb_grmode+1][args->mode+1])
			goto out;

		rv = -EBUSY;
		if (lkb->lkb_status != DLM_LKSTS_GRANTED)
			goto out;

		/* a reply for a previous op is still outstanding */
		if (lkb->lkb_wait_type)
			goto out;

		if (is_overlap(lkb))
			goto out;
	}

	lkb->lkb_exflags = args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astaddr = args->astaddr;
	lkb->lkb_astparam = args->astparam;
	lkb->lkb_bastaddr = args->bastaddr;
	lkb->lkb_rqmode = args->mode;
	lkb->lkb_lksb = args->lksb;
	lkb->lkb_lvbptr = args->lksb->sb_lvbptr;
	lkb->lkb_ownpid = (int) current->pid;
	lkb->lkb_timeout_cs = args->timeout;
	rv = 0;
 out:
	return rv;
}
2033
David Teiglandef0c2bb2007-03-28 09:56:46 -05002034/* when dlm_unlock() sees -EBUSY with CANCEL/FORCEUNLOCK it returns 0
2035 for success */
2036
2037/* note: it's valid for lkb_nodeid/res_nodeid to be -1 when we get here
2038 because there may be a lookup in progress and it's valid to do
2039 cancel/unlockf on it */
2040
/* Validate an unlock/cancel/force-unlock against the lkb's current state
   and record the op's flags.  Returns 0 to proceed, -EBUSY when the op
   has been absorbed as an "overlap" with one already in flight, -ENOENT
   for end-of-life locks, -EINVAL for disallowed combinations. */

static int validate_unlock_args(struct dlm_lkb *lkb, struct dlm_args *args)
{
	struct dlm_ls *ls = lkb->lkb_resource->res_ls;
	int rv = -EINVAL;

	if (lkb->lkb_flags & DLM_IFL_MSTCPY) {
		log_error(ls, "unlock on MSTCPY %x", lkb->lkb_id);
		dlm_print_lkb(lkb);
		goto out;
	}

	/* an lkb may still exist even though the lock is EOL'ed due to a
	   cancel, unlock or failed noqueue request; an app can't use these
	   locks; return same error as if the lkid had not been found at all */

	if (lkb->lkb_flags & DLM_IFL_ENDOFLIFE) {
		log_debug(ls, "unlock on ENDOFLIFE %x", lkb->lkb_id);
		rv = -ENOENT;
		goto out;
	}

	/* an lkb may be waiting for an rsb lookup to complete where the
	   lookup was initiated by another lock */

	if (args->flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)) {
		if (!list_empty(&lkb->lkb_rsb_lookup)) {
			log_debug(ls, "unlock on rsb_lookup %x", lkb->lkb_id);
			list_del_init(&lkb->lkb_rsb_lookup);
			/* complete the op right here: the request never
			   reached a master, so there's nothing to undo */
			queue_cast(lkb->lkb_resource, lkb,
				   args->flags & DLM_LKF_CANCEL ?
				   -DLM_ECANCEL : -DLM_EUNLOCK);
			unhold_lkb(lkb); /* undoes create_lkb() */
			rv = -EBUSY;
			goto out;
		}
	}

	/* cancel not allowed with another cancel/unlock in progress */

	if (args->flags & DLM_LKF_CANCEL) {
		if (lkb->lkb_exflags & DLM_LKF_CANCEL)
			goto out;

		if (is_overlap(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		/* lock op will be resent after recovery; mark the cancel
		   as overlapping it */
		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			/* piggyback the cancel on the outstanding op */
			lkb->lkb_flags |= DLM_IFL_OVERLAP_CANCEL;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
		case DLM_MSG_CANCEL:
			/* nothing to cancel / already cancelling */
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_CANCEL */
		goto out_ok;
	}

	/* do we need to allow a force-unlock if there's a normal unlock
	   already in progress? in what conditions could the normal unlock
	   fail such that we'd want to send a force-unlock to be sure? */

	if (args->flags & DLM_LKF_FORCEUNLOCK) {
		if (lkb->lkb_exflags & DLM_LKF_FORCEUNLOCK)
			goto out;

		if (is_overlap_unlock(lkb))
			goto out;

		/* don't let scand try to do a cancel */
		del_timeout(lkb);

		if (lkb->lkb_flags & DLM_IFL_RESEND) {
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		}

		switch (lkb->lkb_wait_type) {
		case DLM_MSG_LOOKUP:
		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_OVERLAP_UNLOCK;
			rv = -EBUSY;
			goto out;
		case DLM_MSG_UNLOCK:
			goto out;
		}
		/* add_to_waiters() will set OVERLAP_UNLOCK */
		goto out_ok;
	}

	/* normal unlock not allowed if there's any op in progress */
	rv = -EBUSY;
	if (lkb->lkb_wait_type || lkb->lkb_wait_count)
		goto out;

 out_ok:
	/* an overlapping op shouldn't blow away exflags from other op */
	lkb->lkb_exflags |= args->flags;
	lkb->lkb_sbflags = 0;
	lkb->lkb_astparam = args->astparam;
	rv = 0;
 out:
	if (rv)
		log_debug(ls, "validate_unlock_args %d %x %x %x %x %d %s", rv,
			  lkb->lkb_id, lkb->lkb_flags, lkb->lkb_exflags,
			  args->flags, lkb->lkb_wait_type,
			  lkb->lkb_resource->res_name);
	return rv;
}
2162
2163/*
2164 * Four stage 4 varieties:
2165 * do_request(), do_convert(), do_unlock(), do_cancel()
2166 * These are called on the master node for the given lock and
2167 * from the central locking logic.
2168 */
2169
/* master-side handling of a new lock request: grant immediately if
   possible, otherwise queue it (unless NOQUEUE), otherwise fail with
   -EAGAIN after sending basts to the blockers */

static int do_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;

	if (can_be_granted(r, lkb, 1, NULL)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		goto out;
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		add_lkb(r, lkb, DLM_LKSTS_WAITING);
		send_blocking_asts(r, lkb);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
2196
/* master-side handling of a conversion: grant, detect conversion
   deadlock, handle CONVDEADLK auto-demotion, or queue on the convert
   queue (or fail with -EAGAIN for NOQUEUE) */

static int do_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error = 0;
	int deadlk = 0;

	/* changing an existing lock may allow others to be granted */

	if (can_be_granted(r, lkb, 1, &deadlk)) {
		grant_lock(r, lkb);
		queue_cast(r, lkb, 0);
		grant_pending_locks(r);
		goto out;
	}

	/* can_be_granted() detected that this lock would block in a conversion
	   deadlock, so we leave it on the granted queue and return EDEADLK in
	   the ast for the convert. */

	if (deadlk) {
		/* it's left on the granted queue */
		log_debug(r->res_ls, "deadlock %x node %d sts%d g%d r%d %s",
			  lkb->lkb_id, lkb->lkb_nodeid, lkb->lkb_status,
			  lkb->lkb_grmode, lkb->lkb_rqmode, r->res_name);
		revert_lock(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		error = -EDEADLK;
		goto out;
	}

	/* is_demoted() means the can_be_granted() above set the grmode
	   to NL, and left us on the granted queue.  This auto-demotion
	   (due to CONVDEADLK) might mean other locks, and/or this lock, are
	   now grantable.  We have to try to grant other converting locks
	   before we try again to grant this one. */

	if (is_demoted(lkb)) {
		grant_pending_convert(r, DLM_LOCK_IV);
		if (_can_be_granted(r, lkb, 1)) {
			grant_lock(r, lkb);
			queue_cast(r, lkb, 0);
			grant_pending_locks(r);
			goto out;
		}
		/* else fall through and move to convert queue */
	}

	if (can_be_queued(lkb)) {
		error = -EINPROGRESS;
		del_lkb(r, lkb);	/* off the grant queue... */
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);	/* ...onto convert */
		send_blocking_asts(r, lkb);
		add_timeout(lkb);
		goto out;
	}

	error = -EAGAIN;
	if (force_blocking_asts(lkb))
		send_blocking_asts_all(r, lkb);
	queue_cast(r, lkb, -EAGAIN);

 out:
	return error;
}
2260
/* master-side unlock: drop the lock, deliver -DLM_EUNLOCK, and grant
   anything the removal unblocked */

static int do_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	remove_lock(r, lkb);
	queue_cast(r, lkb, -DLM_EUNLOCK);
	grant_pending_locks(r);
	return -DLM_EUNLOCK;
}
2268
David Teiglandef0c2bb2007-03-28 09:56:46 -05002269/* returns: 0 did nothing, -DLM_ECANCEL canceled lock */
Steven Whitehouse907b9bc2006-09-25 09:26:04 -04002270
David Teiglande7fd4172006-01-18 09:30:29 +00002271static int do_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
2272{
David Teiglandef0c2bb2007-03-28 09:56:46 -05002273 int error;
2274
2275 error = revert_lock(r, lkb);
2276 if (error) {
2277 queue_cast(r, lkb, -DLM_ECANCEL);
2278 grant_pending_locks(r);
2279 return -DLM_ECANCEL;
2280 }
2281 return 0;
David Teiglande7fd4172006-01-18 09:30:29 +00002282}
2283
2284/*
2285 * Four stage 3 varieties:
2286 * _request_lock(), _convert_lock(), _unlock_lock(), _cancel_lock()
2287 */
2288
2289/* add a new lkb to a possibly new rsb, called by requesting process */
2290
/* add a new lkb to a possibly new rsb, called by requesting process */

static int _request_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	/* set_master: sets lkb nodeid from r; < 0 is a failure, > 0 means
	   the lkb was parked waiting for a master lookup (not an error) */

	error = set_master(r, lkb);
	if (error < 0)
		return error;
	if (error > 0)
		return 0;

	if (is_remote(r))
		/* receive_request() calls do_request() on remote node */
		return send_request(r, lkb);

	return do_request(r, lkb);
}
2313
David Teigland3bcd3682006-02-23 09:56:38 +00002314/* change some property of an existing lkb, e.g. mode */
David Teiglande7fd4172006-01-18 09:30:29 +00002315
/* change some property of an existing lkb, e.g. mode */

static int _convert_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_convert() calls do_convert() on the remote master */
	if (is_remote(r))
		return send_convert(r, lkb);

	return do_convert(r, lkb);
}
2328
2329/* remove an existing lkb from the granted queue */
2330
/* remove an existing lkb from the granted queue */

static int _unlock_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_unlock() calls do_unlock() on the remote master */
	if (is_remote(r))
		return send_unlock(r, lkb);

	return do_unlock(r, lkb);
}
2343
2344/* remove an existing lkb from the convert or wait queue */
2345
/* remove an existing lkb from the convert or wait queue */

static int _cancel_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	/* receive_cancel() calls do_cancel() on the remote master */
	if (is_remote(r))
		return send_cancel(r, lkb);

	return do_cancel(r, lkb);
}
2358
2359/*
2360 * Four stage 2 varieties:
2361 * request_lock(), convert_lock(), unlock_lock(), cancel_lock()
2362 */
2363
/* stage 2 of a new request: validate args, find/create the rsb, attach
   the lkb to it, and hand off to _request_lock() under the rsb lock */

static int request_lock(struct dlm_ls *ls, struct dlm_lkb *lkb, char *name,
			int len, struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = find_rsb(ls, name, len, R_CREATE, &r);
	if (error)
		goto out;

	lock_rsb(r);

	attach_lkb(r, lkb);
	/* the lock id is returned to the caller through the lksb */
	lkb->lkb_lksb->sb_lkid = lkb->lkb_id;

	error = _request_lock(r, lkb);

	unlock_rsb(r);
	put_rsb(r);

 out:
	return error;
}
2391
/* stage 2 of a conversion: the lkb already has an rsb; validate and
   convert under the rsb lock (args validated here, not earlier, because
   an active lkb can't be modified before the rsb is locked) */

static int convert_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
			struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_lock_args(ls, lkb, args);
	if (error)
		goto out;

	error = _convert_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2413
/* stage 2 of an unlock: validate against current lkb state and unlock
   under the rsb lock */

static int unlock_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _unlock_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2435
/* stage 2 of a cancel: same shape as unlock_lock() but dispatches to
   _cancel_lock() */

static int cancel_lock(struct dlm_ls *ls, struct dlm_lkb *lkb,
		       struct dlm_args *args)
{
	struct dlm_rsb *r;
	int error;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = validate_unlock_args(lkb, args);
	if (error)
		goto out;

	error = _cancel_lock(r, lkb);
 out:
	unlock_rsb(r);
	put_rsb(r);
	return error;
}
2457
2458/*
2459 * Two stage 1 varieties: dlm_lock() and dlm_unlock()
2460 */
2461
/* dlm_lock() -- public entry point for a new request or a conversion.
   Runs with the lockspace recovery lock held so recovery can't change
   lock state underneath us.  -EINPROGRESS, -EAGAIN and -EDEADLK are
   reported to the caller as 0 (the real result arrives via the ast). */

int dlm_lock(dlm_lockspace_t *lockspace,
	     int mode,
	     struct dlm_lksb *lksb,
	     uint32_t flags,
	     void *name,
	     unsigned int namelen,
	     uint32_t parent_lkid,
	     void (*ast) (void *astarg),
	     void *astarg,
	     void (*bast) (void *astarg, int mode))
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error, convert = flags & DLM_LKF_CONVERT;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	/* a conversion names an existing lock via sb_lkid; a request
	   allocates a fresh lkb */
	if (convert)
		error = find_lkb(ls, lksb->sb_lkid, &lkb);
	else
		error = create_lkb(ls, &lkb);

	if (error)
		goto out;

	error = set_lock_args(mode, lksb, flags, namelen, 0, ast,
			      astarg, bast, &args);
	if (error)
		goto out_put;

	if (convert)
		error = convert_lock(ls, lkb, &args);
	else
		error = request_lock(ls, lkb, name, namelen, &args);

	if (error == -EINPROGRESS)
		error = 0;
 out_put:
	/* drop the ref taken by find_lkb(), or undo create_lkb() on error */
	if (convert || error)
		__put_lkb(ls, lkb);
	if (error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2514
/* dlm_unlock() -- public entry point for unlock/cancel/force-unlock.
   -DLM_EUNLOCK/-DLM_ECANCEL mean success; -EBUSY with CANCEL or
   FORCEUNLOCK means the op overlapped one already in flight and is
   also reported as success (result arrives via the ast). */

int dlm_unlock(dlm_lockspace_t *lockspace,
	       uint32_t lkid,
	       uint32_t flags,
	       struct dlm_lksb *lksb,
	       void *astarg)
{
	struct dlm_ls *ls;
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	ls = dlm_find_lockspace_local(lockspace);
	if (!ls)
		return -EINVAL;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	error = set_unlock_args(flags, astarg, &args);
	if (error)
		goto out_put;

	if (flags & DLM_LKF_CANCEL)
		error = cancel_lock(ls, lkb, &args);
	else
		error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK || error == -DLM_ECANCEL)
		error = 0;
	if (error == -EBUSY && (flags & (DLM_LKF_CANCEL | DLM_LKF_FORCEUNLOCK)))
		error = 0;
 out_put:
	dlm_put_lkb(lkb);	/* ref from find_lkb() */
 out:
	dlm_unlock_recovery(ls);
	dlm_put_lockspace(ls);
	return error;
}
2556
2557/*
2558 * send/receive routines for remote operations and replies
2559 *
2560 * send_args
2561 * send_common
2562 * send_request receive_request
2563 * send_convert receive_convert
2564 * send_unlock receive_unlock
2565 * send_cancel receive_cancel
2566 * send_grant receive_grant
2567 * send_bast receive_bast
2568 * send_lookup receive_lookup
2569 * send_remove receive_remove
2570 *
2571 * send_common_reply
2572 * receive_request_reply send_request_reply
2573 * receive_convert_reply send_convert_reply
2574 * receive_unlock_reply send_unlock_reply
2575 * receive_cancel_reply send_cancel_reply
2576 * receive_lookup_reply send_lookup_reply
2577 */
2578
/* allocate a lowcomms send buffer of mb_len bytes for to_nodeid, zero
   it, and fill in the common dlm_message header fields; *ms_ret points
   into the buffer, *mh_ret must later be passed to lowcomms_commit */

static int _create_message(struct dlm_ls *ls, int mb_len,
			   int to_nodeid, int mstype,
			   struct dlm_message **ms_ret,
			   struct dlm_mhandle **mh_ret)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	char *mb;

	/* get_buffer gives us a message handle (mh) that we need to
	   pass into lowcomms_commit and a message buffer (mb) that we
	   write our data into */

	mh = dlm_lowcomms_get_buffer(to_nodeid, mb_len, GFP_KERNEL, &mb);
	if (!mh)
		return -ENOBUFS;

	memset(mb, 0, mb_len);

	ms = (struct dlm_message *) mb;

	ms->m_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
	ms->m_header.h_lockspace = ls->ls_global_id;
	ms->m_header.h_nodeid = dlm_our_nodeid();
	ms->m_header.h_length = mb_len;
	ms->m_header.h_cmd = DLM_MSG;

	ms->m_type = mstype;

	*mh_ret = mh;
	*ms_ret = ms;
	return 0;
}
2612
/* compute the message length for mstype -- base struct plus resource
   name or lvb where the type carries one -- then allocate it; the
   switch must stay in sync with send_args() */

static int create_message(struct dlm_rsb *r, struct dlm_lkb *lkb,
			  int to_nodeid, int mstype,
			  struct dlm_message **ms_ret,
			  struct dlm_mhandle **mh_ret)
{
	int mb_len = sizeof(struct dlm_message);

	switch (mstype) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
	case DLM_MSG_REMOVE:
		/* these carry the resource name */
		mb_len += r->res_length;
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		/* these may carry the lock value block */
		if (lkb && lkb->lkb_lvbptr)
			mb_len += r->res_ls->ls_lvblen;
		break;
	}

	return _create_message(r->res_ls, mb_len, to_nodeid, mstype,
			       ms_ret, mh_ret);
}
2639
2640/* further lowcomms enhancements or alternate implementations may make
2641 the return value from this function useful at some point */
2642
/* further lowcomms enhancements or alternate implementations may make
   the return value from this function useful at some point */

static int send_message(struct dlm_mhandle *mh, struct dlm_message *ms)
{
	dlm_message_out(ms);	/* convert to wire byte order */
	dlm_lowcomms_commit_buffer(mh);
	return 0;
}
2649
/* copy lkb state into an outgoing message; the trailing m_extra payload
   (name or lvb) depends on the message type and must mirror the length
   computed in create_message() */

static void send_args(struct dlm_rsb *r, struct dlm_lkb *lkb,
		      struct dlm_message *ms)
{
	ms->m_nodeid   = lkb->lkb_nodeid;
	ms->m_pid      = lkb->lkb_ownpid;
	ms->m_lkid     = lkb->lkb_id;
	ms->m_remid    = lkb->lkb_remid;
	ms->m_exflags  = lkb->lkb_exflags;
	ms->m_sbflags  = lkb->lkb_sbflags;
	ms->m_flags    = lkb->lkb_flags;
	ms->m_lvbseq   = lkb->lkb_lvbseq;
	ms->m_status   = lkb->lkb_status;
	ms->m_grmode   = lkb->lkb_grmode;
	ms->m_rqmode   = lkb->lkb_rqmode;
	ms->m_hash     = r->res_hash;

	/* m_result and m_bastmode are set from function args,
	   not from lkb fields */

	if (lkb->lkb_bastaddr)
		ms->m_asts |= AST_BAST;
	if (lkb->lkb_astaddr)
		ms->m_asts |= AST_COMP;

	/* compare with switch in create_message; send_remove() doesn't
	   use send_args() */

	switch (ms->m_type) {
	case DLM_MSG_REQUEST:
	case DLM_MSG_LOOKUP:
		memcpy(ms->m_extra, r->res_name, r->res_length);
		break;
	case DLM_MSG_CONVERT:
	case DLM_MSG_UNLOCK:
	case DLM_MSG_REQUEST_REPLY:
	case DLM_MSG_CONVERT_REPLY:
	case DLM_MSG_GRANT:
		if (!lkb->lkb_lvbptr)
			break;
		memcpy(ms->m_extra, lkb->lkb_lvbptr, r->res_ls->ls_lvblen);
		break;
	}
}
2693
/* put the lkb on the waiters list (expecting a reply of the matching
   type), then build and send the message to the master; on any failure
   the waiters entry is rolled back */

static int send_common(struct dlm_rsb *r, struct dlm_lkb *lkb, int mstype)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, mstype);
	if (error)
		return error;

	to_nodeid = r->res_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	remove_from_waiters(lkb, msg_reply_type(mstype));
	return error;
}
2721
/* send a new lock request to the remote master */

static int send_request(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_REQUEST);
}
2726
/* send a conversion to the remote master; a down-conversion always
   succeeds, so the master sends no reply and we synthesize one locally
   from the lockspace's stub message */

static int send_convert(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	int error;

	error = send_common(r, lkb, DLM_MSG_CONVERT);

	/* down conversions go without a reply from the master */
	if (!error && down_conversion(lkb)) {
		remove_from_waiters(lkb, DLM_MSG_CONVERT_REPLY);
		r->res_ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		r->res_ls->ls_stub_ms.m_result = 0;
		r->res_ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		__receive_convert_reply(r, lkb, &r->res_ls->ls_stub_ms);
	}

	return error;
}
2744
2745/* FIXME: if this lkb is the only lock we hold on the rsb, then set
2746 MASTER_UNCERTAIN to force the next request on the rsb to confirm
2747 that the master is still correct. */
2748
/* send an unlock to the remote master */

static int send_unlock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_UNLOCK);
}
2753
/* send a cancel to the remote master */

static int send_cancel(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	return send_common(r, lkb, DLM_MSG_CANCEL);
}
2758
/* master -> requesting node: notify that the lock has been granted
   (no waiters entry -- this is unsolicited, not a reply) */

static int send_grant(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, DLM_MSG_GRANT, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = 0;

	error = send_message(mh, ms);
 out:
	return error;
}
2779
/* master -> lock-holding node: deliver a blocking ast for 'mode' */

static int send_bast(struct dlm_rsb *r, struct dlm_lkb *lkb, int mode)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, NULL, to_nodeid, DLM_MSG_BAST, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_bastmode = mode;

	error = send_message(mh, ms);
 out:
	return error;
}
2800
/* ask the directory node who masters this resource; the lkb waits on
   the waiters list for the DLM_MSG_LOOKUP_REPLY */

static int send_lookup(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	error = add_to_waiters(lkb, DLM_MSG_LOOKUP);
	if (error)
		return error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_LOOKUP, &ms, &mh);
	if (error)
		goto fail;

	send_args(r, lkb, ms);

	error = send_message(mh, ms);
	if (error)
		goto fail;
	return 0;

 fail:
	/* roll back the waiters entry added above */
	remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	return error;
}
2828
/* tell the directory node to drop its entry for this resource; copies
   the name directly instead of using send_args() (no lkb involved) */

static int send_remove(struct dlm_rsb *r)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = dlm_dir_nodeid(r);

	error = create_message(r, NULL, to_nodeid, DLM_MSG_REMOVE, &ms, &mh);
	if (error)
		goto out;

	memcpy(ms->m_extra, r->res_name, r->res_length);
	ms->m_hash = r->res_hash;

	error = send_message(mh, ms);
 out:
	return error;
}
2848
/* master -> requesting node: reply to a request/convert/unlock/cancel
   with result rv in m_result */

static int send_common_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
			     int mstype, int rv)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int to_nodeid, error;

	to_nodeid = lkb->lkb_nodeid;

	error = create_message(r, lkb, to_nodeid, mstype, &ms, &mh);
	if (error)
		goto out;

	send_args(r, lkb, ms);

	ms->m_result = rv;

	error = send_message(mh, ms);
 out:
	return error;
}
2870
/* reply to a DLM_MSG_REQUEST */

static int send_request_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_REQUEST_REPLY, rv);
}
2875
/* reply to a DLM_MSG_CONVERT */

static int send_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
{
	return send_common_reply(r, lkb, DLM_MSG_CONVERT_REPLY, rv);
}
2880
2881static int send_unlock_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2882{
2883 return send_common_reply(r, lkb, DLM_MSG_UNLOCK_REPLY, rv);
2884}
2885
2886static int send_cancel_reply(struct dlm_rsb *r, struct dlm_lkb *lkb, int rv)
2887{
2888 return send_common_reply(r, lkb, DLM_MSG_CANCEL_REPLY, rv);
2889}
2890
2891static int send_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms_in,
2892 int ret_nodeid, int rv)
2893{
2894 struct dlm_rsb *r = &ls->ls_stub_rsb;
2895 struct dlm_message *ms;
2896 struct dlm_mhandle *mh;
2897 int error, nodeid = ms_in->m_header.h_nodeid;
2898
2899 error = create_message(r, NULL, nodeid, DLM_MSG_LOOKUP_REPLY, &ms, &mh);
2900 if (error)
2901 goto out;
2902
2903 ms->m_lkid = ms_in->m_lkid;
2904 ms->m_result = rv;
2905 ms->m_nodeid = ret_nodeid;
2906
2907 error = send_message(mh, ms);
2908 out:
2909 return error;
2910}
2911
2912/* which args we save from a received message depends heavily on the type
2913 of message, unlike the send side where we can safely send everything about
2914 the lkb for any type of message */
2915
2916static void receive_flags(struct dlm_lkb *lkb, struct dlm_message *ms)
2917{
2918 lkb->lkb_exflags = ms->m_exflags;
David Teigland6f90a8b12006-11-10 14:16:27 -06002919 lkb->lkb_sbflags = ms->m_sbflags;
David Teiglande7fd4172006-01-18 09:30:29 +00002920 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2921 (ms->m_flags & 0x0000FFFF);
2922}
2923
2924static void receive_flags_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
2925{
2926 lkb->lkb_sbflags = ms->m_sbflags;
2927 lkb->lkb_flags = (lkb->lkb_flags & 0xFFFF0000) |
2928 (ms->m_flags & 0x0000FFFF);
2929}
2930
2931static int receive_extralen(struct dlm_message *ms)
2932{
2933 return (ms->m_header.h_length - sizeof(struct dlm_message));
2934}
2935
David Teiglande7fd4172006-01-18 09:30:29 +00002936static int receive_lvb(struct dlm_ls *ls, struct dlm_lkb *lkb,
2937 struct dlm_message *ms)
2938{
2939 int len;
2940
2941 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
2942 if (!lkb->lkb_lvbptr)
2943 lkb->lkb_lvbptr = allocate_lvb(ls);
2944 if (!lkb->lkb_lvbptr)
2945 return -ENOMEM;
2946 len = receive_extralen(ms);
2947 memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
2948 }
2949 return 0;
2950}
2951
/* Initialize a freshly created master-copy lkb from an incoming
 * DLM_MSG_REQUEST: the requesting node, its pid and lock id, and the
 * requested mode.  Only the presence bits of the requester's ast/bast
 * callbacks travel on the wire, so they are stored as flag values, not
 * real addresses.  Returns 0, or -ENOMEM if an lvb is needed but
 * cannot be allocated. */
static int receive_request_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	lkb->lkb_nodeid = ms->m_header.h_nodeid;
	lkb->lkb_ownpid = ms->m_pid;
	lkb->lkb_remid = ms->m_lkid;
	lkb->lkb_grmode = DLM_LOCK_IV;	/* nothing granted yet */
	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_bastaddr = (void *) (long) (ms->m_asts & AST_BAST);
	lkb->lkb_astaddr = (void *) (long) (ms->m_asts & AST_COMP);

	DLM_ASSERT(is_master_copy(lkb), dlm_print_lkb(lkb););

	if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
		/* lkb was just created so there won't be an lvb yet */
		lkb->lkb_lvbptr = allocate_lvb(ls);
		if (!lkb->lkb_lvbptr)
			return -ENOMEM;
	}

	return 0;
}
2974
/* Validate and apply the arguments of an incoming DLM_MSG_CONVERT to
 * an existing master-copy lkb.  Rejects a message from a node other
 * than the lock's owner, or aimed at a non-master copy (-EINVAL);
 * refuses to convert a lock that isn't currently granted (-EBUSY);
 * then copies in any lvb, the new requested mode and the lvb sequence
 * number. */
static int receive_convert_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
				struct dlm_message *ms)
{
	if (lkb->lkb_nodeid != ms->m_header.h_nodeid) {
		log_error(ls, "convert_args nodeid %d %d lkid %x %x",
			  lkb->lkb_nodeid, ms->m_header.h_nodeid,
			  lkb->lkb_id, lkb->lkb_remid);
		return -EINVAL;
	}

	if (!is_master_copy(lkb))
		return -EINVAL;

	if (lkb->lkb_status != DLM_LKSTS_GRANTED)
		return -EBUSY;

	if (receive_lvb(ls, lkb, ms))
		return -ENOMEM;

	lkb->lkb_rqmode = ms->m_rqmode;
	lkb->lkb_lvbseq = ms->m_lvbseq;

	return 0;
}
2999
3000static int receive_unlock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
3001 struct dlm_message *ms)
3002{
3003 if (!is_master_copy(lkb))
3004 return -EINVAL;
3005 if (receive_lvb(ls, lkb, ms))
3006 return -ENOMEM;
3007 return 0;
3008}
3009
3010/* We fill in the stub-lkb fields with the info that send_xxxx_reply()
3011 uses to send a reply and that the remote end uses to process the reply. */
3012
3013static void setup_stub_lkb(struct dlm_ls *ls, struct dlm_message *ms)
3014{
3015 struct dlm_lkb *lkb = &ls->ls_stub_lkb;
3016 lkb->lkb_nodeid = ms->m_header.h_nodeid;
3017 lkb->lkb_remid = ms->m_lkid;
3018}
3019
/* Handle a DLM_MSG_REQUEST on the master node: create a master-copy
 * lkb, find/create the rsb named in m_extra, run do_request() and send
 * the result back.  Any setup failure is still answered, via the stub
 * lkb/rsb, so the requester is not left waiting forever. */
static void receive_request(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, namelen;

	error = create_lkb(ls, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);
	lkb->lkb_flags |= DLM_IFL_MSTCPY;
	error = receive_request_args(ls, lkb, ms);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	namelen = receive_extralen(ms);

	error = find_rsb(ls, ms->m_extra, namelen, R_MASTER, &r);
	if (error) {
		__put_lkb(ls, lkb);
		goto fail;
	}

	lock_rsb(r);

	attach_lkb(r, lkb);
	error = do_request(r, lkb);
	send_request_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);

	/* -EINPROGRESS means the request was queued, so the lkb lives on;
	   any other nonzero result drops our reference */
	if (error == -EINPROGRESS)
		error = 0;
	if (error)
		dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_request_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3065
/* Handle a DLM_MSG_CONVERT on the master node.  Looks up the
 * master-copy lkb by the sender's lock id, validates the convert args
 * and runs do_convert().  A reply is suppressed when down_conversion()
 * says this conversion completes without one; a missing lkb is
 * answered through the stub lkb/rsb. */
static void receive_convert(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, reply = 1;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_convert_args(ls, lkb, ms);
	if (error)
		goto out;
	reply = !down_conversion(lkb);

	error = do_convert(r, lkb);
 out:
	if (reply)
		send_convert_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_convert_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3101
/* Handle a DLM_MSG_UNLOCK on the master node: validate the args,
 * run do_unlock() and always send an unlock reply.  A missing lkb is
 * answered through the stub lkb/rsb. */
static void receive_unlock(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags(lkb, ms);
	error = receive_unlock_args(ls, lkb, ms);
	if (error)
		goto out;

	error = do_unlock(r, lkb);
 out:
	send_unlock_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_unlock_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3135
/* Handle a DLM_MSG_CANCEL on the master node: run do_cancel() on the
 * master-copy lkb and send a cancel reply with its result.  A missing
 * lkb is answered through the stub lkb/rsb. */
static void receive_cancel(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error)
		goto fail;

	receive_flags(lkb, ms);

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	error = do_cancel(r, lkb);
	send_cancel_reply(r, lkb, error);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
	return;

 fail:
	setup_stub_lkb(ls, ms);
	send_cancel_reply(&ls->ls_stub_rsb, &ls->ls_stub_lkb, error);
}
3165
/* Handle a DLM_MSG_GRANT on the process-copy node: the master has
 * granted our pending lock.  Apply the reply flags (with altmode
 * munging when flagged), mark the lock granted and queue the
 * completion ast.  Grants are async -- no reply is sent; a missing
 * lkb is only logged. */
static void receive_grant(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_grant no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	receive_flags_reply(lkb, ms);
	if (is_altmode(lkb))
		munge_altmode(lkb, ms);
	grant_lock_pc(r, lkb, ms);
	queue_cast(r, lkb, 0);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3194
/* Handle a DLM_MSG_BAST on the process-copy node: queue a blocking
 * ast callback with the mode carried in m_bastmode.  Like grants,
 * basts are async and get no reply; a missing lkb is only logged. */
static void receive_bast(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_bast no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;

	hold_rsb(r);
	lock_rsb(r);

	queue_bast(r, lkb, ms->m_bastmode);

	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3219
/* Handle a DLM_MSG_LOOKUP on the directory node: map the resource
 * name in m_extra to its master's nodeid and send a lookup reply.
 * A message that lands on the wrong directory node is answered with
 * -EINVAL.  If the lookup shows that we are the master ourselves, the
 * message is handed straight to receive_request(), so the sender gets
 * a request reply instead of a lookup reply (see the matching
 * optimization in receive_request_reply()). */
static void receive_lookup(struct dlm_ls *ls, struct dlm_message *ms)
{
	int len, error, ret_nodeid, dir_nodeid, from_nodeid, our_nodeid;

	from_nodeid = ms->m_header.h_nodeid;
	our_nodeid = dlm_our_nodeid();

	len = receive_extralen(ms);

	dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
	if (dir_nodeid != our_nodeid) {
		log_error(ls, "lookup dir_nodeid %d from %d",
			  dir_nodeid, from_nodeid);
		error = -EINVAL;
		ret_nodeid = -1;
		goto out;
	}

	error = dlm_dir_lookup(ls, from_nodeid, ms->m_extra, len, &ret_nodeid);

	/* Optimization: we're master so treat lookup as a request */
	if (!error && ret_nodeid == our_nodeid) {
		receive_request(ls, ms);
		return;
	}
 out:
	send_lookup_reply(ls, ms, ret_nodeid, error);
}
3248
3249static void receive_remove(struct dlm_ls *ls, struct dlm_message *ms)
3250{
3251 int len, dir_nodeid, from_nodeid;
3252
3253 from_nodeid = ms->m_header.h_nodeid;
3254
3255 len = receive_extralen(ms);
3256
3257 dir_nodeid = dlm_hash2nodeid(ls, ms->m_hash);
3258 if (dir_nodeid != dlm_our_nodeid()) {
3259 log_error(ls, "remove dir entry dir_nodeid %d from %d",
3260 dir_nodeid, from_nodeid);
3261 return;
3262 }
3263
3264 dlm_dir_remove_entry(ls, from_nodeid, ms->m_extra, len);
3265}
3266
David Teigland84991372007-03-30 15:02:40 -05003267static void receive_purge(struct dlm_ls *ls, struct dlm_message *ms)
3268{
3269 do_purge(ls, ms->m_nodeid, ms->m_pid);
3270}
3271
/* Handle a DLM_MSG_REQUEST_REPLY on the node that sent the request.
 * Clears the lkb's waiter state, then acts on the master's result:
 * -EAGAIN (request refused/would block), 0 or -EINPROGRESS (granted or
 * queued), -EBADR/-ENOTBLK (wrong master -- forget it and retry via
 * _request_lock()); anything else is logged.  Finally, an overlapping
 * unlock or cancel that was requested while we waited is sent on, and
 * the overlap flags are cleared. */
static void receive_request_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, mstype, result;

	error = find_lkb(ls, ms->m_remid, &lkb);
	if (error) {
		log_error(ls, "receive_request_reply no lkb");
		return;
	}
	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	/* save the wait type before clearing the waiter entry */
	mstype = lkb->lkb_wait_type;
	error = remove_from_waiters(lkb, DLM_MSG_REQUEST_REPLY);
	if (error)
		goto out;

	/* Optimization: the dir node was also the master, so it took our
	   lookup as a request and sent request reply instead of lookup reply */
	if (mstype == DLM_MSG_LOOKUP) {
		r->res_nodeid = ms->m_header.h_nodeid;
		lkb->lkb_nodeid = r->res_nodeid;
	}

	/* this is the value returned from do_request() on the master */
	result = ms->m_result;

	switch (result) {
	case -EAGAIN:
		/* request would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		confirm_master(r, -EAGAIN);
		unhold_lkb(lkb); /* undoes create_lkb() */
		break;

	case -EINPROGRESS:
	case 0:
		/* request was queued or granted on remote master */
		receive_flags_reply(lkb, ms);
		lkb->lkb_remid = ms->m_lkid;
		if (is_altmode(lkb))
			munge_altmode(lkb, ms);
		if (result) {
			add_lkb(r, lkb, DLM_LKSTS_WAITING);
			add_timeout(lkb);
		} else {
			grant_lock_pc(r, lkb, ms);
			queue_cast(r, lkb, 0);
		}
		confirm_master(r, result);
		break;

	case -EBADR:
	case -ENOTBLK:
		/* find_rsb failed to find rsb or rsb wasn't master */
		log_debug(ls, "receive_request_reply %x %x master diff %d %d",
			  lkb->lkb_id, lkb->lkb_flags, r->res_nodeid, result);
		r->res_nodeid = -1;
		lkb->lkb_nodeid = -1;

		if (is_overlap(lkb)) {
			/* we'll ignore error in cancel/unlock reply */
			queue_cast_overlap(r, lkb);
			unhold_lkb(lkb); /* undoes create_lkb() */
		} else
			_request_lock(r, lkb);
		break;

	default:
		log_error(ls, "receive_request_reply %x error %d",
			  lkb->lkb_id, result);
	}

	/* service an unlock/cancel that overlapped the in-flight request */
	if (is_overlap_unlock(lkb) && (result == 0 || result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x result %d unlock",
			  lkb->lkb_id, result);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_unlock(r, lkb);
	} else if (is_overlap_cancel(lkb) && (result == -EINPROGRESS)) {
		log_debug(ls, "receive_request_reply %x cancel", lkb->lkb_id);
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		send_cancel(r, lkb);
	} else {
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3370
/* Apply the master's answer to our convert request; shared by normal
 * reply processing and recovery's stub replies.  -EAGAIN: refused,
 * cast back to the caller; -EDEADLK: the master reverted the convert,
 * cast -EDEADLK; -EINPROGRESS: moved to the convert queue; 0: granted.
 * Demote munging is applied when the reply flags mark the lock
 * demoted. */
static void __receive_convert_reply(struct dlm_rsb *r, struct dlm_lkb *lkb,
				    struct dlm_message *ms)
{
	/* this is the value returned from do_convert() on the master */
	switch (ms->m_result) {
	case -EAGAIN:
		/* convert would block (be queued) on remote master */
		queue_cast(r, lkb, -EAGAIN);
		break;

	case -EDEADLK:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -EDEADLK);
		break;

	case -EINPROGRESS:
		/* convert was queued on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		del_lkb(r, lkb);
		add_lkb(r, lkb, DLM_LKSTS_CONVERT);
		add_timeout(lkb);
		break;

	case 0:
		/* convert was granted on remote master */
		receive_flags_reply(lkb, ms);
		if (is_demoted(lkb))
			munge_demoted(lkb, ms);
		grant_lock_pc(r, lkb, ms);
		queue_cast(r, lkb, 0);
		break;

	default:
		log_error(r->res_ls, "receive_convert_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
}
3411
/* Convert-reply processing under the rsb lock: clear the waiter state
 * and apply the master's result.  Also called from recovery with a
 * stub reply, possibly while waiters_mutex is held -- hence the use of
 * remove_from_waiters_ms() rather than remove_from_waiters(). */
static void _receive_convert_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	__receive_convert_reply(r, lkb, ms);
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3430
3431static void receive_convert_reply(struct dlm_ls *ls, struct dlm_message *ms)
3432{
3433 struct dlm_lkb *lkb;
3434 int error;
3435
3436 error = find_lkb(ls, ms->m_remid, &lkb);
3437 if (error) {
3438 log_error(ls, "receive_convert_reply no lkb");
3439 return;
3440 }
3441 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3442
David Teiglande7fd4172006-01-18 09:30:29 +00003443 _receive_convert_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003444 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003445}
3446
/* Unlock-reply processing under the rsb lock: clear the waiter state
 * and act on the master's result.  Also called from recovery with a
 * stub reply, possibly while waiters_mutex is held. */
static void _receive_unlock_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_unlock() on the master */

	switch (ms->m_result) {
	case -DLM_EUNLOCK:
		receive_flags_reply(lkb, ms);
		remove_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_EUNLOCK);
		break;
	case -ENOENT:
		/* deliberately ignored */
		break;
	default:
		log_error(r->res_ls, "receive_unlock_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3478
3479static void receive_unlock_reply(struct dlm_ls *ls, struct dlm_message *ms)
3480{
3481 struct dlm_lkb *lkb;
3482 int error;
3483
3484 error = find_lkb(ls, ms->m_remid, &lkb);
3485 if (error) {
3486 log_error(ls, "receive_unlock_reply no lkb");
3487 return;
3488 }
3489 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3490
David Teiglande7fd4172006-01-18 09:30:29 +00003491 _receive_unlock_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003492 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003493}
3494
/* Cancel-reply processing under the rsb lock: clear the waiter state
 * and act on the master's result.  Also called from recovery with a
 * stub reply, possibly while waiters_mutex is held. */
static void _receive_cancel_reply(struct dlm_lkb *lkb, struct dlm_message *ms)
{
	struct dlm_rsb *r = lkb->lkb_resource;
	int error;

	hold_rsb(r);
	lock_rsb(r);

	/* stub reply can happen with waiters_mutex held */
	error = remove_from_waiters_ms(lkb, ms);
	if (error)
		goto out;

	/* this is the value returned from do_cancel() on the master */

	switch (ms->m_result) {
	case -DLM_ECANCEL:
		receive_flags_reply(lkb, ms);
		revert_lock_pc(r, lkb);
		queue_cast(r, lkb, -DLM_ECANCEL);
		break;
	case 0:
		/* deliberately ignored */
		break;
	default:
		log_error(r->res_ls, "receive_cancel_reply %x error %d",
			  lkb->lkb_id, ms->m_result);
	}
 out:
	unlock_rsb(r);
	put_rsb(r);
}
3526
3527static void receive_cancel_reply(struct dlm_ls *ls, struct dlm_message *ms)
3528{
3529 struct dlm_lkb *lkb;
3530 int error;
3531
3532 error = find_lkb(ls, ms->m_remid, &lkb);
3533 if (error) {
3534 log_error(ls, "receive_cancel_reply no lkb");
3535 return;
3536 }
3537 DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););
3538
David Teiglande7fd4172006-01-18 09:30:29 +00003539 _receive_cancel_reply(lkb, ms);
David Teiglandb3f58d82006-02-28 11:16:37 -05003540 dlm_put_lkb(lkb);
David Teiglande7fd4172006-01-18 09:30:29 +00003541}
3542
/* Handle a DLM_MSG_LOOKUP_REPLY from the directory node.  m_lkid
 * echoes our own lock id; m_nodeid names the master.  Record the
 * master on the rsb (our own nodeid becomes 0, i.e. local), then
 * either complete an overlapped unlock/cancel that was requested while
 * the lookup was outstanding, or continue the original operation with
 * _request_lock().  When we become the master, process_lookup_list()
 * handles the other lkbs queued behind this lookup. */
static void receive_lookup_reply(struct dlm_ls *ls, struct dlm_message *ms)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error, ret_nodeid;

	error = find_lkb(ls, ms->m_lkid, &lkb);
	if (error) {
		log_error(ls, "receive_lookup_reply no lkb");
		return;
	}

	/* ms->m_result is the value returned by dlm_dir_lookup on dir node
	   FIXME: will a non-zero error ever be returned? */

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	error = remove_from_waiters(lkb, DLM_MSG_LOOKUP_REPLY);
	if (error)
		goto out;

	ret_nodeid = ms->m_nodeid;
	if (ret_nodeid == dlm_our_nodeid()) {
		r->res_nodeid = 0;
		ret_nodeid = 0;
		r->res_first_lkid = 0;
	} else {
		/* set_master() will copy res_nodeid to lkb_nodeid */
		r->res_nodeid = ret_nodeid;
	}

	if (is_overlap(lkb)) {
		log_debug(ls, "receive_lookup_reply %x unlock %x",
			  lkb->lkb_id, lkb->lkb_flags);
		queue_cast_overlap(r, lkb);
		unhold_lkb(lkb); /* undoes create_lkb() */
		goto out_list;
	}

	_request_lock(r, lkb);

 out_list:
	if (!ret_nodeid)
		process_lookup_list(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);
}
3594
/* Entry point for an incoming dlm message.  hd is the raw header from
 * the transport, nodeid the sender; recovery is nonzero when called by
 * dlm_recoverd to drain the requestqueue.
 *
 * Drops messages for unknown lockspaces (-EINVAL).  While locking is
 * stopped for recovery, new messages are saved on the requestqueue (or
 * the recoverd caller is interrupted) and -EINTR is returned.
 * Otherwise the recovery lock is acquired via dlm_lock_recovery_try()
 * and the message is dispatched on m_type to its receive_* handler. */
int dlm_receive_message(struct dlm_header *hd, int nodeid, int recovery)
{
	struct dlm_message *ms = (struct dlm_message *) hd;
	struct dlm_ls *ls;
	int error = 0;

	if (!recovery)
		dlm_message_in(ms);

	ls = dlm_find_lockspace_global(hd->h_lockspace);
	if (!ls) {
		log_print("drop message %d from %d for unknown lockspace %d",
			  ms->m_type, nodeid, hd->h_lockspace);
		return -EINVAL;
	}

	/* recovery may have just ended leaving a bunch of backed-up requests
	   in the requestqueue; wait while dlm_recoverd clears them */

	if (!recovery)
		dlm_wait_requestqueue(ls);

	/* recovery may have just started while there were a bunch of
	   in-flight requests -- save them in requestqueue to be processed
	   after recovery.  we can't let dlm_recvd block on the recovery
	   lock.  if dlm_recoverd is calling this function to clear the
	   requestqueue, it needs to be interrupted (-EINTR) if another
	   recovery operation is starting. */

	while (1) {
		if (dlm_locking_stopped(ls)) {
			if (recovery) {
				error = -EINTR;
				goto out;
			}
			error = dlm_add_requestqueue(ls, nodeid, hd);
			if (error == -EAGAIN)
				continue;
			else {
				/* saved for later; tell the caller we were
				   interrupted by recovery */
				error = -EINTR;
				goto out;
			}
		}

		if (dlm_lock_recovery_try(ls))
			break;
		schedule();
	}

	switch (ms->m_type) {

	/* messages sent to a master node */

	case DLM_MSG_REQUEST:
		receive_request(ls, ms);
		break;

	case DLM_MSG_CONVERT:
		receive_convert(ls, ms);
		break;

	case DLM_MSG_UNLOCK:
		receive_unlock(ls, ms);
		break;

	case DLM_MSG_CANCEL:
		receive_cancel(ls, ms);
		break;

	/* messages sent from a master node (replies to above) */

	case DLM_MSG_REQUEST_REPLY:
		receive_request_reply(ls, ms);
		break;

	case DLM_MSG_CONVERT_REPLY:
		receive_convert_reply(ls, ms);
		break;

	case DLM_MSG_UNLOCK_REPLY:
		receive_unlock_reply(ls, ms);
		break;

	case DLM_MSG_CANCEL_REPLY:
		receive_cancel_reply(ls, ms);
		break;

	/* messages sent from a master node (only two types of async msg) */

	case DLM_MSG_GRANT:
		receive_grant(ls, ms);
		break;

	case DLM_MSG_BAST:
		receive_bast(ls, ms);
		break;

	/* messages sent to a dir node */

	case DLM_MSG_LOOKUP:
		receive_lookup(ls, ms);
		break;

	case DLM_MSG_REMOVE:
		receive_remove(ls, ms);
		break;

	/* messages sent from a dir node (remove has no reply) */

	case DLM_MSG_LOOKUP_REPLY:
		receive_lookup_reply(ls, ms);
		break;

	/* other messages */

	case DLM_MSG_PURGE:
		receive_purge(ls, ms);
		break;

	default:
		log_error(ls, "unknown message type %d", ms->m_type);
	}

	dlm_unlock_recovery(ls);
 out:
	dlm_put_lockspace(ls);
	dlm_astd_wake();
	return error;
}
3724
3725
3726/*
3727 * Recovery related
3728 */
3729
/* Pre-recovery handling for an lkb with an outstanding convert to a
 * failed master.  A "middle" conversion (see middle_conversion() and
 * the PR/CW note in the block comment above dlm_recover_waiters_pre)
 * is completed with a faked -EINPROGRESS stub reply and the rsb is
 * flagged RSB_RECOVER_CONVERT; an up-conversion is flagged for resend
 * after recovery; a down-conversion needs nothing (see below). */
static void recover_convert_waiter(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	if (middle_conversion(lkb)) {
		hold_lkb(lkb);
		ls->ls_stub_ms.m_type = DLM_MSG_CONVERT_REPLY;
		ls->ls_stub_ms.m_result = -EINPROGRESS;
		ls->ls_stub_ms.m_flags = lkb->lkb_flags;
		_receive_convert_reply(lkb, &ls->ls_stub_ms);

		/* Same special case as in receive_rcom_lock_args() */
		lkb->lkb_grmode = DLM_LOCK_IV;
		rsb_set_flag(lkb->lkb_resource, RSB_RECOVER_CONVERT);
		unhold_lkb(lkb);

	} else if (lkb->lkb_rqmode >= lkb->lkb_grmode) {
		lkb->lkb_flags |= DLM_IFL_RESEND;
	}

	/* lkb->lkb_rqmode < lkb->lkb_grmode shouldn't happen since down
	   conversions are async; there's no reply from the remote master */
}
3751
3752/* A waiting lkb needs recovery if the master node has failed, or
3753 the master node is changing (only when no directory is used) */
3754
3755static int waiter_needs_recovery(struct dlm_ls *ls, struct dlm_lkb *lkb)
3756{
3757 if (dlm_is_removed(ls, lkb->lkb_nodeid))
3758 return 1;
3759
3760 if (!dlm_no_directory(ls))
3761 return 0;
3762
3763 if (dlm_dir_nodeid(lkb->lkb_resource) != lkb->lkb_nodeid)
3764 return 1;
3765
3766 return 0;
3767}
3768
3769/* Recovery for locks that are waiting for replies from nodes that are now
3770 gone. We can just complete unlocks and cancels by faking a reply from the
3771 dead node. Requests and up-conversions we flag to be resent after
3772 recovery. Down-conversions can just be completed with a fake reply like
3773 unlocks. Conversions between PR and CW need special attention. */
3774
/* Walk the waiters list at the start of recovery and deal with each
 * lkb whose outstanding message may never be answered.  Lookups are
 * always flagged for resend; for the rest, only lkbs whose master
 * needs recovery (waiter_needs_recovery()) are touched: requests are
 * flagged for resend, converts go through recover_convert_waiter(),
 * and unlocks/cancels are completed locally by faking a stub reply
 * from the dead node. */
void dlm_recover_waiters_pre(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb, *safe;

	mutex_lock(&ls->ls_waiters_mutex);

	list_for_each_entry_safe(lkb, safe, &ls->ls_waiters, lkb_wait_reply) {
		log_debug(ls, "pre recover waiter lkid %x type %d flags %x",
			  lkb->lkb_id, lkb->lkb_wait_type, lkb->lkb_flags);

		/* all outstanding lookups, regardless of destination  will be
		   resent after recovery is done */

		if (lkb->lkb_wait_type == DLM_MSG_LOOKUP) {
			lkb->lkb_flags |= DLM_IFL_RESEND;
			continue;
		}

		if (!waiter_needs_recovery(ls, lkb))
			continue;

		switch (lkb->lkb_wait_type) {

		case DLM_MSG_REQUEST:
			lkb->lkb_flags |= DLM_IFL_RESEND;
			break;

		case DLM_MSG_CONVERT:
			recover_convert_waiter(ls, lkb);
			break;

		case DLM_MSG_UNLOCK:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_UNLOCK_REPLY;
			ls->ls_stub_ms.m_result = -DLM_EUNLOCK;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_unlock_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		case DLM_MSG_CANCEL:
			hold_lkb(lkb);
			ls->ls_stub_ms.m_type = DLM_MSG_CANCEL_REPLY;
			ls->ls_stub_ms.m_result = -DLM_ECANCEL;
			ls->ls_stub_ms.m_flags = lkb->lkb_flags;
			_receive_cancel_reply(lkb, &ls->ls_stub_ms);
			dlm_put_lkb(lkb);
			break;

		default:
			log_error(ls, "invalid lkb wait_type %d",
				  lkb->lkb_wait_type);
		}
		schedule();
	}
	mutex_unlock(&ls->ls_waiters_mutex);
}
3832
David Teiglandef0c2bb2007-03-28 09:56:46 -05003833static struct dlm_lkb *find_resend_waiter(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00003834{
3835 struct dlm_lkb *lkb;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003836 int found = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00003837
David Teigland90135922006-01-20 08:47:07 +00003838 mutex_lock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003839 list_for_each_entry(lkb, &ls->ls_waiters, lkb_wait_reply) {
3840 if (lkb->lkb_flags & DLM_IFL_RESEND) {
David Teiglandef0c2bb2007-03-28 09:56:46 -05003841 hold_lkb(lkb);
3842 found = 1;
David Teiglande7fd4172006-01-18 09:30:29 +00003843 break;
3844 }
3845 }
David Teigland90135922006-01-20 08:47:07 +00003846 mutex_unlock(&ls->ls_waiters_mutex);
David Teiglande7fd4172006-01-18 09:30:29 +00003847
David Teiglandef0c2bb2007-03-28 09:56:46 -05003848 if (!found)
David Teiglande7fd4172006-01-18 09:30:29 +00003849 lkb = NULL;
David Teiglandef0c2bb2007-03-28 09:56:46 -05003850 return lkb;
David Teiglande7fd4172006-01-18 09:30:29 +00003851}
3852
3853/* Deal with lookups and lkb's marked RESEND from _pre. We may now be the
3854 master or dir-node for r. Processing the lkb may result in it being placed
3855 back on waiters. */
3856
David Teiglandef0c2bb2007-03-28 09:56:46 -05003857/* We do this after normal locking has been enabled and any saved messages
3858 (in requestqueue) have been processed. We should be confident that at
3859 this point we won't get or process a reply to any of these waiting
3860 operations. But, new ops may be coming in on the rsbs/locks here from
3861 userspace or remotely. */
3862
/* there may have been an overlap unlock/cancel prior to recovery or after
   recovery.  if before, the lkb may still have a positive wait_count; if
   after, the overlap flag would just have been set and nothing new sent.
   we can be confident here that any replies to either the initial op or
   overlap ops prior to recovery have been received. */
3868
/* Resend (or resolve with an unlock/cancel) every waiter that
   dlm_recover_waiters_pre() flagged RESEND.  Returns -EINTR if recovery
   is stopped again mid-pass, otherwise 0. */

int dlm_recover_waiters_post(struct dlm_ls *ls)
{
	struct dlm_lkb *lkb;
	struct dlm_rsb *r;
	int error = 0, mstype, err, oc, ou;

	while (1) {
		/* abort if another recovery cycle has begun; this whole
		   pass will be redone later */
		if (dlm_locking_stopped(ls)) {
			log_debug(ls, "recover_waiters_post aborted");
			error = -EINTR;
			break;
		}

		/* find_resend_waiter() takes a reference on the lkb,
		   dropped by dlm_put_lkb() at the bottom of the loop */
		lkb = find_resend_waiter(ls);
		if (!lkb)
			break;

		r = lkb->lkb_resource;
		hold_rsb(r);
		lock_rsb(r);

		mstype = lkb->lkb_wait_type;
		oc = is_overlap_cancel(lkb);
		ou = is_overlap_unlock(lkb);
		err = 0;

		log_debug(ls, "recover_waiters_post %x type %d flags %x %s",
			  lkb->lkb_id, mstype, lkb->lkb_flags, r->res_name);

		/* At this point we assume that we won't get a reply to any
		   previous op or overlap op on this lock.  First, do a big
		   remove_from_waiters() for all previous ops. */

		lkb->lkb_flags &= ~DLM_IFL_RESEND;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_UNLOCK;
		lkb->lkb_flags &= ~DLM_IFL_OVERLAP_CANCEL;
		lkb->lkb_wait_type = 0;
		lkb->lkb_wait_count = 0;
		mutex_lock(&ls->ls_waiters_mutex);
		list_del_init(&lkb->lkb_wait_reply);
		mutex_unlock(&ls->ls_waiters_mutex);
		unhold_lkb(lkb); /* for waiters list */

		if (oc || ou) {
			/* do an unlock or cancel instead of resending */
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				queue_cast(r, lkb, ou ? -DLM_EUNLOCK :
							-DLM_ECANCEL);
				unhold_lkb(lkb); /* undoes create_lkb() */
				break;
			case DLM_MSG_CONVERT:
				if (oc) {
					queue_cast(r, lkb, -DLM_ECANCEL);
				} else {
					/* overlap unlock on a convert:
					   force the unlock through */
					lkb->lkb_exflags |= DLM_LKF_FORCEUNLOCK;
					_unlock_lock(r, lkb);
				}
				break;
			default:
				err = 1;
			}
		} else {
			switch (mstype) {
			case DLM_MSG_LOOKUP:
			case DLM_MSG_REQUEST:
				_request_lock(r, lkb);
				if (is_master(r))
					confirm_master(r, 0);
				break;
			case DLM_MSG_CONVERT:
				_convert_lock(r, lkb);
				break;
			default:
				err = 1;
			}
		}

		if (err)
			log_error(ls, "recover_waiters_post %x %d %x %d %d",
				  lkb->lkb_id, mstype, lkb->lkb_flags, oc, ou);
		unlock_rsb(r);
		put_rsb(r);
		dlm_put_lkb(lkb); /* ref from find_resend_waiter() */
	}

	return error;
}
3958
3959static void purge_queue(struct dlm_rsb *r, struct list_head *queue,
3960 int (*test)(struct dlm_ls *ls, struct dlm_lkb *lkb))
3961{
3962 struct dlm_ls *ls = r->res_ls;
3963 struct dlm_lkb *lkb, *safe;
3964
3965 list_for_each_entry_safe(lkb, safe, queue, lkb_statequeue) {
3966 if (test(ls, lkb)) {
David Teigland97a35d12006-05-02 13:34:03 -04003967 rsb_set_flag(r, RSB_LOCKS_PURGED);
David Teiglande7fd4172006-01-18 09:30:29 +00003968 del_lkb(r, lkb);
3969 /* this put should free the lkb */
David Teiglandb3f58d82006-02-28 11:16:37 -05003970 if (!dlm_put_lkb(lkb))
David Teiglande7fd4172006-01-18 09:30:29 +00003971 log_error(ls, "purged lkb not released");
3972 }
3973 }
3974}
3975
3976static int purge_dead_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
3977{
3978 return (is_master_copy(lkb) && dlm_is_removed(ls, lkb->lkb_nodeid));
3979}
3980
/* Match every master-copy lkb, regardless of node membership. */

static int purge_mstcpy_test(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	return is_master_copy(lkb);
}
3985
/* Purge, from all three rsb queues, master-copy locks held for nodes
   that are no longer members (see purge_dead_test). */

static void purge_dead_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_dead_test);
	purge_queue(r, &r->res_convertqueue, &purge_dead_test);
	purge_queue(r, &r->res_waitqueue, &purge_dead_test);
}
3992
/* Purge every master-copy lock from all three queues of r
   (see purge_mstcpy_test). */

void dlm_purge_mstcpy_locks(struct dlm_rsb *r)
{
	purge_queue(r, &r->res_grantqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_convertqueue, &purge_mstcpy_test);
	purge_queue(r, &r->res_waitqueue, &purge_mstcpy_test);
}
3999
4000/* Get rid of locks held by nodes that are gone. */
4001
/* Walk every root rsb and purge dead-node locks on those we master.
   Always returns 0. */

int dlm_purge_locks(struct dlm_ls *ls)
{
	struct dlm_rsb *r;

	log_debug(ls, "dlm_purge_locks");

	down_write(&ls->ls_root_sem);
	list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
		hold_rsb(r);
		lock_rsb(r);
		/* purge_dead_test only matches master-copy lkbs, so only
		   rsbs we master can have anything to purge */
		if (is_master(r))
			purge_dead_locks(r);
		unlock_rsb(r);
		unhold_rsb(r);

		/* give up the cpu between rsbs; the root list can be long */
		schedule();
	}
	up_write(&ls->ls_root_sem);

	return 0;
}
4023
David Teigland97a35d12006-05-02 13:34:03 -04004024static struct dlm_rsb *find_purged_rsb(struct dlm_ls *ls, int bucket)
4025{
4026 struct dlm_rsb *r, *r_ret = NULL;
4027
4028 read_lock(&ls->ls_rsbtbl[bucket].lock);
4029 list_for_each_entry(r, &ls->ls_rsbtbl[bucket].list, res_hashchain) {
4030 if (!rsb_flag(r, RSB_LOCKS_PURGED))
4031 continue;
4032 hold_rsb(r);
4033 rsb_clear_flag(r, RSB_LOCKS_PURGED);
4034 r_ret = r;
4035 break;
4036 }
4037 read_unlock(&ls->ls_rsbtbl[bucket].lock);
4038 return r_ret;
4039}
4040
4041void dlm_grant_after_purge(struct dlm_ls *ls)
David Teiglande7fd4172006-01-18 09:30:29 +00004042{
4043 struct dlm_rsb *r;
David Teigland2b4e9262006-07-25 13:59:48 -05004044 int bucket = 0;
David Teiglande7fd4172006-01-18 09:30:29 +00004045
David Teigland2b4e9262006-07-25 13:59:48 -05004046 while (1) {
4047 r = find_purged_rsb(ls, bucket);
4048 if (!r) {
4049 if (bucket == ls->ls_rsbtbl_size - 1)
4050 break;
4051 bucket++;
David Teigland97a35d12006-05-02 13:34:03 -04004052 continue;
David Teigland2b4e9262006-07-25 13:59:48 -05004053 }
David Teigland97a35d12006-05-02 13:34:03 -04004054 lock_rsb(r);
4055 if (is_master(r)) {
4056 grant_pending_locks(r);
4057 confirm_master(r, 0);
David Teiglande7fd4172006-01-18 09:30:29 +00004058 }
David Teigland97a35d12006-05-02 13:34:03 -04004059 unlock_rsb(r);
4060 put_rsb(r);
David Teigland2b4e9262006-07-25 13:59:48 -05004061 schedule();
David Teiglande7fd4172006-01-18 09:30:29 +00004062 }
David Teiglande7fd4172006-01-18 09:30:29 +00004063}
4064
4065static struct dlm_lkb *search_remid_list(struct list_head *head, int nodeid,
4066 uint32_t remid)
4067{
4068 struct dlm_lkb *lkb;
4069
4070 list_for_each_entry(lkb, head, lkb_statequeue) {
4071 if (lkb->lkb_nodeid == nodeid && lkb->lkb_remid == remid)
4072 return lkb;
4073 }
4074 return NULL;
4075}
4076
4077static struct dlm_lkb *search_remid(struct dlm_rsb *r, int nodeid,
4078 uint32_t remid)
4079{
4080 struct dlm_lkb *lkb;
4081
4082 lkb = search_remid_list(&r->res_grantqueue, nodeid, remid);
4083 if (lkb)
4084 return lkb;
4085 lkb = search_remid_list(&r->res_convertqueue, nodeid, remid);
4086 if (lkb)
4087 return lkb;
4088 lkb = search_remid_list(&r->res_waitqueue, nodeid, remid);
4089 if (lkb)
4090 return lkb;
4091 return NULL;
4092}
4093
4094static int receive_rcom_lock_args(struct dlm_ls *ls, struct dlm_lkb *lkb,
4095 struct dlm_rsb *r, struct dlm_rcom *rc)
4096{
4097 struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
4098 int lvblen;
4099
4100 lkb->lkb_nodeid = rc->rc_header.h_nodeid;
4101 lkb->lkb_ownpid = rl->rl_ownpid;
4102 lkb->lkb_remid = rl->rl_lkid;
4103 lkb->lkb_exflags = rl->rl_exflags;
4104 lkb->lkb_flags = rl->rl_flags & 0x0000FFFF;
4105 lkb->lkb_flags |= DLM_IFL_MSTCPY;
4106 lkb->lkb_lvbseq = rl->rl_lvbseq;
4107 lkb->lkb_rqmode = rl->rl_rqmode;
4108 lkb->lkb_grmode = rl->rl_grmode;
4109 /* don't set lkb_status because add_lkb wants to itself */
4110
4111 lkb->lkb_bastaddr = (void *) (long) (rl->rl_asts & AST_BAST);
4112 lkb->lkb_astaddr = (void *) (long) (rl->rl_asts & AST_COMP);
4113
David Teiglande7fd4172006-01-18 09:30:29 +00004114 if (lkb->lkb_exflags & DLM_LKF_VALBLK) {
4115 lkb->lkb_lvbptr = allocate_lvb(ls);
4116 if (!lkb->lkb_lvbptr)
4117 return -ENOMEM;
4118 lvblen = rc->rc_header.h_length - sizeof(struct dlm_rcom) -
4119 sizeof(struct rcom_lock);
4120 memcpy(lkb->lkb_lvbptr, rl->rl_lvb, lvblen);
4121 }
4122
4123 /* Conversions between PR and CW (middle modes) need special handling.
4124 The real granted mode of these converting locks cannot be determined
4125 until all locks have been rebuilt on the rsb (recover_conversion) */
4126
4127 if (rl->rl_wait_type == DLM_MSG_CONVERT && middle_conversion(lkb)) {
4128 rl->rl_status = DLM_LKSTS_CONVERT;
4129 lkb->lkb_grmode = DLM_LOCK_IV;
4130 rsb_set_flag(r, RSB_RECOVER_CONVERT);
4131 }
4132
4133 return 0;
4134}
4135
4136/* This lkb may have been recovered in a previous aborted recovery so we need
4137 to check if the rsb already has an lkb with the given remote nodeid/lkid.
4138 If so we just send back a standard reply. If not, we create a new lkb with
4139 the given values and send back our lkid. We send back our lkid by sending
4140 back the rcom_lock struct we got but with the remid field filled in. */
4141
int dlm_recover_master_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	/* parent/child lock hierarchies are not supported */
	if (rl->rl_parent_lkid) {
		error = -EOPNOTSUPP;
		goto out;
	}

	error = find_rsb(ls, rl->rl_name, rl->rl_namelen, R_MASTER, &r);
	if (error)
		goto out;

	lock_rsb(r);

	/* was this lkb already recovered by an earlier, aborted recovery?
	   if so just report our existing lkid back to the sender */
	lkb = search_remid(r, rc->rc_header.h_nodeid, rl->rl_lkid);
	if (lkb) {
		error = -EEXIST;
		goto out_remid;
	}

	error = create_lkb(ls, &lkb);
	if (error)
		goto out_unlock;

	error = receive_rcom_lock_args(ls, lkb, r, rc);
	if (error) {
		/* frees the lkb and any lvb attached to it */
		__put_lkb(ls, lkb);
		goto out_unlock;
	}

	attach_lkb(r, lkb);
	add_lkb(r, lkb, rl->rl_status);
	error = 0;

 out_remid:
	/* this is the new value returned to the lock holder for
	   saving in its process-copy lkb */
	rl->rl_remid = lkb->lkb_id;

 out_unlock:
	unlock_rsb(r);
	put_rsb(r);
 out:
	if (error)
		log_print("recover_master_copy %d %x", error, rl->rl_lkid);
	/* the result travels back to the sender inside the same rcom_lock */
	rl->rl_result = error;
	return error;
}
4194
/* Handle the reply a (new) master sent for a lock we shipped it during
   recovery; on success record the master's lock id (remid) in our
   process copy.  Always returns 0 once the lkb has been found. */

int dlm_recover_process_copy(struct dlm_ls *ls, struct dlm_rcom *rc)
{
	struct rcom_lock *rl = (struct rcom_lock *) rc->rc_buf;
	struct dlm_rsb *r;
	struct dlm_lkb *lkb;
	int error;

	error = find_lkb(ls, rl->rl_lkid, &lkb);
	if (error) {
		log_error(ls, "recover_process_copy no lkid %x", rl->rl_lkid);
		return error;
	}

	DLM_ASSERT(is_process_copy(lkb), dlm_print_lkb(lkb););

	/* result the master computed in dlm_recover_master_copy() */
	error = rl->rl_result;

	r = lkb->lkb_resource;
	hold_rsb(r);
	lock_rsb(r);

	switch (error) {
	case -EBADR:
		/* There's a chance the new master received our lock before
		   dlm_recover_master_reply(), this wouldn't happen if we did
		   a barrier between recover_masters and recover_locks. */
		log_debug(ls, "master copy not ready %x r %lx %s", lkb->lkb_id,
			  (unsigned long)r, r->res_name);
		/* retry; note we skip dlm_recovered_lock() so this lock is
		   still outstanding from dlm_recover_locks()'s viewpoint */
		dlm_send_rcom_lock(r, lkb);
		goto out;
	case -EEXIST:
		log_debug(ls, "master copy exists %x", lkb->lkb_id);
		/* fall through */
	case 0:
		lkb->lkb_remid = rl->rl_remid;
		break;
	default:
		log_error(ls, "dlm_recover_process_copy unknown error %d %x",
			  error, lkb->lkb_id);
	}

	/* an ack for dlm_recover_locks() which waits for replies from
	   all the locks it sends to new masters */
	dlm_recovered_lock(r);
 out:
	unlock_rsb(r);
	put_rsb(r);
	dlm_put_lkb(lkb);

	return 0;
}
4246
/* Request a new lock on behalf of a userspace process.  ua is consumed:
   freed here on early failure, otherwise attached to the new lkb and
   freed with it (see comment below). */

int dlm_user_request(struct dlm_ls *ls, struct dlm_user_args *ua,
		     int mode, uint32_t flags, void *name, unsigned int namelen,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	int error;

	dlm_lock_recovery(ls);

	error = create_lkb(ls, &lkb);
	if (error) {
		/* ua not yet attached to an lkb; free it ourselves */
		kfree(ua);
		goto out;
	}

	if (flags & DLM_LKF_VALBLK) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			kfree(ua);
			__put_lkb(ls, lkb);
			error = -ENOMEM;
			goto out;
		}
	}

	/* After ua is attached to lkb it will be freed by free_lkb().
	   When DLM_IFL_USER is set, the dlm knows that this is a userspace
	   lock and that lkb_astparam is the dlm_user_args structure. */

	error = set_lock_args(mode, &ua->lksb, flags, namelen, timeout_cs,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	lkb->lkb_flags |= DLM_IFL_USER;
	ua->old_mode = DLM_LOCK_IV;

	if (error) {
		__put_lkb(ls, lkb);
		goto out;
	}

	error = request_lock(ls, lkb, name, namelen, &args);

	switch (error) {
	case 0:
		break;
	case -EINPROGRESS:
		error = 0;
		break;
	case -EAGAIN:
		error = 0;
		/* fall through */
	default:
		__put_lkb(ls, lkb);
		goto out;
	}

	/* add this new lkb to the per-process list of locks */
	spin_lock(&ua->proc->locks_spin);
	hold_lkb(lkb); /* reference for the proc->locks list */
	list_add_tail(&lkb->lkb_ownqueue, &ua->proc->locks);
	spin_unlock(&ua->proc->locks_spin);
 out:
	dlm_unlock_recovery(ls);
	return error;
}
4312
/* Convert an existing userspace lock identified by lkid.  ua_tmp carries
   the updated user-side fields (copied into the lkb's own ua) and is
   always freed before returning. */

int dlm_user_convert(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		     int mode, uint32_t flags, uint32_t lkid, char *lvb_in,
		     unsigned long timeout_cs)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	/* user can change the params on its lock when it converts it, or
	   add an lvb that didn't exist before */

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (flags & DLM_LKF_VALBLK && !ua->lksb.sb_lvbptr) {
		ua->lksb.sb_lvbptr = kzalloc(DLM_USER_LVB_LEN, GFP_KERNEL);
		if (!ua->lksb.sb_lvbptr) {
			error = -ENOMEM;
			goto out_put;
		}
	}
	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);

	ua->xid = ua_tmp->xid;
	ua->castparam = ua_tmp->castparam;
	ua->castaddr = ua_tmp->castaddr;
	ua->bastparam = ua_tmp->bastparam;
	ua->bastaddr = ua_tmp->bastaddr;
	ua->user_lksb = ua_tmp->user_lksb;
	ua->old_mode = lkb->lkb_grmode;

	error = set_lock_args(mode, &ua->lksb, flags, 0, timeout_cs,
			      DLM_FAKE_USER_AST, ua, DLM_FAKE_USER_AST, &args);
	if (error)
		goto out_put;

	error = convert_lock(ls, lkb, &args);

	/* not treated as errors by the user interface */
	if (error == -EINPROGRESS || error == -EAGAIN || error == -EDEADLK)
		error = 0;
 out_put:
	dlm_put_lkb(lkb);
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4367
/* Unlock a userspace lock identified by lkid.  On success the lkb is
   moved to the proc's "unlocking" list.  ua_tmp is always freed. */

int dlm_user_unlock(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid, char *lvb_in)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;

	if (lvb_in && ua->lksb.sb_lvbptr)
		memcpy(ua->lksb.sb_lvbptr, lvb_in, DLM_USER_LVB_LEN);
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = unlock_lock(ls, lkb, &args);

	if (error == -DLM_EUNLOCK)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY && (flags & DLM_LKF_FORCEUNLOCK))
		error = 0;
	if (error)
		goto out_put;

	spin_lock(&ua->proc->locks_spin);
	/* dlm_user_add_ast() may have already taken lkb off the proc list */
	if (!list_empty(&lkb->lkb_ownqueue))
		list_move(&lkb->lkb_ownqueue, &ua->proc->unlocking);
	spin_unlock(&ua->proc->locks_spin);
 out_put:
	dlm_put_lkb(lkb); /* ref from find_lkb */
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4415
/* Cancel an in-progress operation on a userspace lock identified by
   lkid.  ua_tmp is always freed. */

int dlm_user_cancel(struct dlm_ls *ls, struct dlm_user_args *ua_tmp,
		    uint32_t flags, uint32_t lkid)
{
	struct dlm_lkb *lkb;
	struct dlm_args args;
	struct dlm_user_args *ua;
	int error;

	dlm_lock_recovery(ls);

	error = find_lkb(ls, lkid, &lkb);
	if (error)
		goto out;

	ua = (struct dlm_user_args *)lkb->lkb_astparam;
	ua->castparam = ua_tmp->castparam;
	ua->user_lksb = ua_tmp->user_lksb;

	error = set_unlock_args(flags, ua, &args);
	if (error)
		goto out_put;

	error = cancel_lock(ls, lkb, &args);

	if (error == -DLM_ECANCEL)
		error = 0;
	/* from validate_unlock_args() */
	if (error == -EBUSY)
		error = 0;
 out_put:
	dlm_put_lkb(lkb); /* ref from find_lkb */
 out:
	dlm_unlock_recovery(ls);
	kfree(ua_tmp);
	return error;
}
4452
David Teiglandef0c2bb2007-03-28 09:56:46 -05004453/* lkb's that are removed from the waiters list by revert are just left on the
4454 orphans list with the granted orphan locks, to be freed by purge */
4455
/* Park a persistent lock on the lockspace orphans list (taking a
   reference for that list), then cancel any operation still in progress
   on it.  -DLM_ECANCEL is mapped to success. */

static int orphan_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
{
	struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
	struct dlm_args args;
	int error;

	hold_lkb(lkb); /* reference for the ls_orphans list */
	mutex_lock(&ls->ls_orphans_mutex);
	list_add_tail(&lkb->lkb_ownqueue, &ls->ls_orphans);
	mutex_unlock(&ls->ls_orphans_mutex);

	set_unlock_args(0, ua, &args);

	error = cancel_lock(ls, lkb, &args);
	if (error == -DLM_ECANCEL)
		error = 0;
	return error;
}
4474
4475/* The force flag allows the unlock to go ahead even if the lkb isn't granted.
4476 Regardless of what rsb queue the lock is on, it's removed and freed. */
4477
4478static int unlock_proc_lock(struct dlm_ls *ls, struct dlm_lkb *lkb)
4479{
4480 struct dlm_user_args *ua = (struct dlm_user_args *)lkb->lkb_astparam;
4481 struct dlm_args args;
4482 int error;
4483
David Teigland597d0ca2006-07-12 16:44:04 -05004484 set_unlock_args(DLM_LKF_FORCEUNLOCK, ua, &args);
4485
4486 error = unlock_lock(ls, lkb, &args);
4487 if (error == -DLM_EUNLOCK)
4488 error = 0;
4489 return error;
4490}
4491
David Teiglandef0c2bb2007-03-28 09:56:46 -05004492/* We have to release clear_proc_locks mutex before calling unlock_proc_lock()
4493 (which does lock_rsb) due to deadlock with receiving a message that does
4494 lock_rsb followed by dlm_user_add_ast() */
4495
/* Detach and return the first lock on proc->locks, marking it ORPHAN
   (persistent locks) or DEAD so the caller knows how to dispose of it;
   NULL when the list is empty.  The ls_clear_proc_locks mutex is taken
   and released here, per-lock (see comment above about deadlock). */

static struct dlm_lkb *del_proc_lock(struct dlm_ls *ls,
				     struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb = NULL;

	mutex_lock(&ls->ls_clear_proc_locks);
	if (list_empty(&proc->locks))
		goto out;

	lkb = list_entry(proc->locks.next, struct dlm_lkb, lkb_ownqueue);
	list_del_init(&lkb->lkb_ownqueue);

	if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
		lkb->lkb_flags |= DLM_IFL_ORPHAN;
	else
		lkb->lkb_flags |= DLM_IFL_DEAD;
 out:
	mutex_unlock(&ls->ls_clear_proc_locks);
	return lkb;
}
4516
David Teigland597d0ca2006-07-12 16:44:04 -05004517/* The ls_clear_proc_locks mutex protects against dlm_user_add_asts() which
4518 1) references lkb->ua which we free here and 2) adds lkbs to proc->asts,
4519 which we clear here. */
4520
4521/* proc CLOSING flag is set so no more device_reads should look at proc->asts
4522 list, and no more device_writes should add lkb's to proc->locks list; so we
4523 shouldn't need to take asts_spin or locks_spin here. this assumes that
4524 device reads/writes/closes are serialized -- FIXME: we may need to serialize
4525 them ourself. */
4526
/* Tear down all dlm state belonging to a userspace process that is
   closing its device: its owned locks (orphaned or force-unlocked),
   in-progress unlocks, and undelivered asts. */

void dlm_clear_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	dlm_lock_recovery(ls);

	/* first pass: locks still owned by the process; del_proc_lock()
	   takes and drops ls_clear_proc_locks for each one */
	while (1) {
		lkb = del_proc_lock(ls, proc);
		if (!lkb)
			break;
		del_timeout(lkb);
		if (lkb->lkb_exflags & DLM_LKF_PERSISTENT)
			orphan_proc_lock(ls, lkb);
		else
			unlock_proc_lock(ls, lkb);

		/* this removes the reference for the proc->locks list
		   added by dlm_user_request, it may result in the lkb
		   being freed */

		dlm_put_lkb(lkb);
	}

	mutex_lock(&ls->ls_clear_proc_locks);

	/* in-progress unlocks */
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}

	/* queued asts that will never be delivered to the process */
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}

	mutex_unlock(&ls->ls_clear_proc_locks);
	dlm_unlock_recovery(ls);
}
David Teiglanda1bc86e2007-01-15 10:34:52 -06004567
/* Like dlm_clear_proc_locks() but driven by an explicit purge request
   from the owning process (see dlm_user_purge); uses the proc's own
   spinlocks rather than ls_clear_proc_locks. */

static void purge_proc_locks(struct dlm_ls *ls, struct dlm_user_proc *proc)
{
	struct dlm_lkb *lkb, *safe;

	while (1) {
		/* pop one lock at a time so the spinlock is never held
		   across unlock_proc_lock() */
		lkb = NULL;
		spin_lock(&proc->locks_spin);
		if (!list_empty(&proc->locks)) {
			lkb = list_entry(proc->locks.next, struct dlm_lkb,
					 lkb_ownqueue);
			list_del_init(&lkb->lkb_ownqueue);
		}
		spin_unlock(&proc->locks_spin);

		if (!lkb)
			break;

		lkb->lkb_flags |= DLM_IFL_DEAD;
		unlock_proc_lock(ls, lkb);
		dlm_put_lkb(lkb); /* ref from proc->locks list */
	}

	/* in-progress unlocks */
	spin_lock(&proc->locks_spin);
	list_for_each_entry_safe(lkb, safe, &proc->unlocking, lkb_ownqueue) {
		list_del_init(&lkb->lkb_ownqueue);
		lkb->lkb_flags |= DLM_IFL_DEAD;
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->locks_spin);

	/* queued, undelivered asts */
	spin_lock(&proc->asts_spin);
	list_for_each_entry_safe(lkb, safe, &proc->asts, lkb_astqueue) {
		list_del(&lkb->lkb_astqueue);
		dlm_put_lkb(lkb);
	}
	spin_unlock(&proc->asts_spin);
}
4605
4606/* pid of 0 means purge all orphans */
4607
4608static void do_purge(struct dlm_ls *ls, int nodeid, int pid)
4609{
4610 struct dlm_lkb *lkb, *safe;
4611
4612 mutex_lock(&ls->ls_orphans_mutex);
4613 list_for_each_entry_safe(lkb, safe, &ls->ls_orphans, lkb_ownqueue) {
4614 if (pid && lkb->lkb_ownpid != pid)
4615 continue;
4616 unlock_proc_lock(ls, lkb);
4617 list_del_init(&lkb->lkb_ownqueue);
4618 dlm_put_lkb(lkb);
4619 }
4620 mutex_unlock(&ls->ls_orphans_mutex);
4621}
4622
/* Ask a remote node (via DLM_MSG_PURGE) to purge orphan locks owned by
   pid; returns the message-creation/send error code. */

static int send_purge(struct dlm_ls *ls, int nodeid, int pid)
{
	struct dlm_message *ms;
	struct dlm_mhandle *mh;
	int error;

	error = _create_message(ls, sizeof(struct dlm_message), nodeid,
				DLM_MSG_PURGE, &ms, &mh);
	if (error)
		return error;
	ms->m_nodeid = nodeid;
	ms->m_pid = pid;

	return send_message(mh, ms);
}
4638
4639int dlm_user_purge(struct dlm_ls *ls, struct dlm_user_proc *proc,
4640 int nodeid, int pid)
4641{
4642 int error = 0;
4643
4644 if (nodeid != dlm_our_nodeid()) {
4645 error = send_purge(ls, nodeid, pid);
4646 } else {
David Teigland85e86ed2007-05-18 08:58:15 -05004647 dlm_lock_recovery(ls);
David Teigland84991372007-03-30 15:02:40 -05004648 if (pid == current->pid)
4649 purge_proc_locks(ls, proc);
4650 else
4651 do_purge(ls, nodeid, pid);
David Teigland85e86ed2007-05-18 08:58:15 -05004652 dlm_unlock_recovery(ls);
David Teigland84991372007-03-30 15:02:40 -05004653 }
4654 return error;
4655}
4656