/* -*- mode: c; c-basic-offset: 8; -*-
 * vim: noexpandtab sw=8 ts=8 sts=0:
 *
 * dlmthread.c
 *
 * standalone DLM module
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public
 * License as published by the Free Software Foundation; either
 * version 2 of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public
 * License along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA.
 *
 */


#include <linux/module.h>
#include <linux/fs.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/highmem.h>
#include <linux/utsname.h>
#include <linux/init.h>
#include <linux/sysctl.h>
#include <linux/random.h>
#include <linux/blkdev.h>
#include <linux/socket.h>
#include <linux/inet.h>
#include <linux/timer.h>
#include <linux/kthread.h>
#include <linux/delay.h>


#include "cluster/heartbeat.h"
#include "cluster/nodemanager.h"
#include "cluster/tcp.h"

#include "dlmapi.h"
#include "dlmcommon.h"
#include "dlmdomain.h"

#define MLOG_MASK_PREFIX (ML_DLM|ML_DLM_THREAD)
#include "cluster/masklog.h"

static int dlm_thread(void *data);
static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *lockres);

static void dlm_flush_asts(struct dlm_ctxt *dlm);

#define dlm_lock_is_remote(dlm, lock)     ((lock)->ml.node != (dlm)->node_num)

/* will exit holding res->spinlock, but may drop in function */
/* waits until flags are cleared on res->state */
void __dlm_wait_on_lockres_flags(struct dlm_lock_resource *res, int flags)
{
        DECLARE_WAITQUEUE(wait, current);

        assert_spin_locked(&res->spinlock);

        add_wait_queue(&res->wq, &wait);
repeat:
        set_current_state(TASK_UNINTERRUPTIBLE);
        if (res->state & flags) {
                spin_unlock(&res->spinlock);
                schedule();
                spin_lock(&res->spinlock);
                goto repeat;
        }
        remove_wait_queue(&res->wq, &wait);
        current->state = TASK_RUNNING;
}


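/* A lockres counts as "unused" when it has no locks on any of its
 * queues (granted, converting, blocked) and is not on the dirty list.
 * Callers in this file hold res->spinlock while checking. */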
int __dlm_lockres_unused(struct dlm_lock_resource *res)
{
        if (list_empty(&res->granted) &&
            list_empty(&res->converting) &&
            list_empty(&res->blocked) &&
            list_empty(&res->dirty))
                return 1;
        return 0;
}


/* Call whenever you may have added or deleted something from one of
 * the lockres queues. This will figure out whether it belongs on the
 * unused list or not and does the appropriate thing. */
void __dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
                              struct dlm_lock_resource *res)
{
        mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&res->spinlock);

        if (__dlm_lockres_unused(res)) {
                if (list_empty(&res->purge)) {
                        mlog(0, "putting lockres %.*s on purge list\n",
                             res->lockname.len, res->lockname.name);

                        res->last_used = jiffies;
                        list_add_tail(&res->purge, &dlm->purge_list);
                        dlm->purge_count++;

                        /* if this node is not the owner, there is
                         * no way to keep track of who the owner could be.
                         * unhash it to avoid serious problems. */
                        if (res->owner != dlm->node_num) {
                                mlog(0, "%s:%.*s: doing immediate "
                                     "purge of lockres owned by %u\n",
                                     dlm->name, res->lockname.len,
                                     res->lockname.name, res->owner);

                                dlm_purge_lockres_now(dlm, res);
                        }
                }
        } else if (!list_empty(&res->purge)) {
                mlog(0, "removing lockres %.*s from purge list, "
                     "owner=%u\n", res->lockname.len, res->lockname.name,
                     res->owner);

                list_del_init(&res->purge);
                dlm->purge_count--;
        }
}

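/* Locking wrapper around __dlm_lockres_calc_usage(): takes the dlm and
 * lockres spinlocks in that order, recomputes purge-list membership,
 * then drops them again. */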
void dlm_lockres_calc_usage(struct dlm_ctxt *dlm,
                            struct dlm_lock_resource *res)
{
        mlog_entry("%.*s\n", res->lockname.len, res->lockname.name);
        spin_lock(&dlm->spinlock);
        spin_lock(&res->spinlock);

        __dlm_lockres_calc_usage(dlm, res);

        spin_unlock(&res->spinlock);
        spin_unlock(&dlm->spinlock);
}

/* TODO: Eventual API: Called with the dlm spinlock held, may drop it
 * to do migration, but will re-acquire before exit. */
void dlm_purge_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *lockres)
{
        int master;
        int ret;

        spin_lock(&lockres->spinlock);
        master = lockres->owner == dlm->node_num;
        spin_unlock(&lockres->spinlock);

        mlog(0, "purging lockres %.*s, master = %d\n", lockres->lockname.len,
             lockres->lockname.name, master);

        /* Non master is the easy case -- no migration required, just
         * quit. */
        if (!master)
                goto finish;

        /* Wheee! Migrate lockres here! */
        spin_unlock(&dlm->spinlock);
again:

        ret = dlm_migrate_lockres(dlm, lockres, O2NM_MAX_NODES);
        if (ret == -ENOTEMPTY) {
                mlog(ML_ERROR, "lockres %.*s still has local locks!\n",
                     lockres->lockname.len, lockres->lockname.name);

                BUG();
        } else if (ret < 0) {
                mlog(ML_NOTICE, "lockres %.*s: migrate failed, retrying\n",
                     lockres->lockname.len, lockres->lockname.name);
                msleep(100);
                goto again;
        }

        spin_lock(&dlm->spinlock);

finish:
        if (!list_empty(&lockres->purge)) {
                list_del_init(&lockres->purge);
                dlm->purge_count--;
        }
        __dlm_unhash_lockres(lockres);
}

/* make an unused lockres go away immediately.
 * as soon as the dlm spinlock is dropped, this lockres
 * will not be found. kfree still happens on last put. */
static void dlm_purge_lockres_now(struct dlm_ctxt *dlm,
                                  struct dlm_lock_resource *lockres)
{
        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&lockres->spinlock);

        BUG_ON(!__dlm_lockres_unused(lockres));

        if (!list_empty(&lockres->purge)) {
                list_del_init(&lockres->purge);
                dlm->purge_count--;
        }
        __dlm_unhash_lockres(lockres);
}

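/* Walk the purge list from the head (oldest entries first) and purge
 * any lockres that is still unused and has been idle for at least
 * DLM_PURGE_INTERVAL_MS.  When purge_now is set (e.g. during domain
 * shutdown) the age check is skipped and every unused lockres goes.
 * At most dlm->purge_count entries are examined per call. */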
static void dlm_run_purge_list(struct dlm_ctxt *dlm,
                               int purge_now)
{
        unsigned int run_max, unused;
        unsigned long purge_jiffies;
        struct dlm_lock_resource *lockres;

        spin_lock(&dlm->spinlock);
        run_max = dlm->purge_count;

        while (run_max && !list_empty(&dlm->purge_list)) {
                run_max--;

                lockres = list_entry(dlm->purge_list.next,
                                     struct dlm_lock_resource, purge);

                /* Status of the lockres *might* change so double
                 * check. If the lockres is unused, holding the dlm
                 * spinlock will prevent people from getting any more
                 * refs on it -- there's no need to keep the lockres
                 * spinlock. */
                spin_lock(&lockres->spinlock);
                unused = __dlm_lockres_unused(lockres);
                spin_unlock(&lockres->spinlock);

                if (!unused)
                        continue;

                purge_jiffies = lockres->last_used +
                        msecs_to_jiffies(DLM_PURGE_INTERVAL_MS);

                /* Make sure that we want to be processing this guy at
                 * this time. */
                if (!purge_now && time_after(purge_jiffies, jiffies)) {
                        /* Since resources are added to the purge list
                         * in tail order, we can stop at the first
                         * unpurgeable resource -- anyone added after
                         * it will have a greater last_used value */
                        break;
                }

                list_del_init(&lockres->purge);
                dlm->purge_count--;

                /* This may drop and reacquire the dlm spinlock if it
                 * has to do migration. */
                mlog(0, "calling dlm_purge_lockres!\n");
                dlm_purge_lockres(dlm, lockres);
                mlog(0, "DONE calling dlm_purge_lockres!\n");

                /* Avoid adding any scheduling latencies */
                cond_resched_lock(&dlm->spinlock);
        }

        spin_unlock(&dlm->spinlock);
}

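/* Grant whatever can be granted on this lockres right now.  Pending
 * conversions are considered first, in queue order; a blocked lock can
 * only be granted once the converting queue is empty.  Every grant
 * queues an AST for the winner; every holder that blocks the head of
 * the queue gets a BAST queued and its ml.highest_blocked raised to
 * the strongest mode it is blocking. */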
static void dlm_shuffle_lists(struct dlm_ctxt *dlm,
                              struct dlm_lock_resource *res)
{
        struct dlm_lock *lock, *target;
        struct list_head *iter;
        struct list_head *head;
        int can_grant = 1;

        //mlog(0, "res->lockname.len=%d\n", res->lockname.len);
        //mlog(0, "res->lockname.name=%p\n", res->lockname.name);
        //mlog(0, "shuffle res %.*s\n", res->lockname.len,
        //       res->lockname.name);

        /* because this function is called with the lockres
         * spinlock, and because we know that it is not migrating/
         * recovering/in-progress, it is fine to reserve asts and
         * basts right before queueing them all throughout */
        assert_spin_locked(&res->spinlock);
        BUG_ON((res->state & (DLM_LOCK_RES_MIGRATING|
                              DLM_LOCK_RES_RECOVERING|
                              DLM_LOCK_RES_IN_PROGRESS)));

converting:
        if (list_empty(&res->converting))
                goto blocked;
        mlog(0, "res %.*s has locks on a convert queue\n", res->lockname.len,
             res->lockname.name);

        target = list_entry(res->converting.next, struct dlm_lock, list);
        if (target->ml.convert_type == LKM_IVMODE) {
                mlog(ML_ERROR, "%.*s: converting a lock with no "
                     "convert_type!\n", res->lockname.len, res->lockname.name);
                BUG();
        }
        head = &res->granted;
        list_for_each(iter, head) {
                lock = list_entry(iter, struct dlm_lock, list);
                if (lock == target)
                        continue;
                if (!dlm_lock_compatible(lock->ml.type,
                                         target->ml.convert_type)) {
                        can_grant = 0;
                        /* queue the BAST if not already */
                        if (lock->ml.highest_blocked == LKM_IVMODE) {
                                __dlm_lockres_reserve_ast(res);
                                dlm_queue_bast(dlm, lock);
                        }
                        /* update the highest_blocked if needed */
                        if (lock->ml.highest_blocked < target->ml.convert_type)
                                lock->ml.highest_blocked =
                                        target->ml.convert_type;
                }
        }
        head = &res->converting;
        list_for_each(iter, head) {
                lock = list_entry(iter, struct dlm_lock, list);
                if (lock == target)
                        continue;
                if (!dlm_lock_compatible(lock->ml.type,
                                         target->ml.convert_type)) {
                        can_grant = 0;
                        if (lock->ml.highest_blocked == LKM_IVMODE) {
                                __dlm_lockres_reserve_ast(res);
                                dlm_queue_bast(dlm, lock);
                        }
                        if (lock->ml.highest_blocked < target->ml.convert_type)
                                lock->ml.highest_blocked =
                                        target->ml.convert_type;
                }
        }

        /* we can convert the lock */
        if (can_grant) {
                spin_lock(&target->spinlock);
                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

                mlog(0, "calling ast for converting lock: %.*s, have: %d, "
                     "granting: %d, node: %u\n", res->lockname.len,
                     res->lockname.name, target->ml.type,
                     target->ml.convert_type, target->ml.node);

                target->ml.type = target->ml.convert_type;
                target->ml.convert_type = LKM_IVMODE;
                list_move_tail(&target->list, &res->granted);

                BUG_ON(!target->lksb);
                target->lksb->status = DLM_NORMAL;

                spin_unlock(&target->spinlock);

                __dlm_lockres_reserve_ast(res);
                dlm_queue_ast(dlm, target);
                /* go back and check for more */
                goto converting;
        }

blocked:
        if (list_empty(&res->blocked))
                goto leave;
        target = list_entry(res->blocked.next, struct dlm_lock, list);

        head = &res->granted;
        list_for_each(iter, head) {
                lock = list_entry(iter, struct dlm_lock, list);
                if (lock == target)
                        continue;
                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
                        can_grant = 0;
                        if (lock->ml.highest_blocked == LKM_IVMODE) {
                                __dlm_lockres_reserve_ast(res);
                                dlm_queue_bast(dlm, lock);
                        }
                        if (lock->ml.highest_blocked < target->ml.type)
                                lock->ml.highest_blocked = target->ml.type;
                }
        }

        head = &res->converting;
        list_for_each(iter, head) {
                lock = list_entry(iter, struct dlm_lock, list);
                if (lock == target)
                        continue;
                if (!dlm_lock_compatible(lock->ml.type, target->ml.type)) {
                        can_grant = 0;
                        if (lock->ml.highest_blocked == LKM_IVMODE) {
                                __dlm_lockres_reserve_ast(res);
                                dlm_queue_bast(dlm, lock);
                        }
                        if (lock->ml.highest_blocked < target->ml.type)
                                lock->ml.highest_blocked = target->ml.type;
                }
        }

        /* we can grant the blocked lock (only
         * possible if converting list empty) */
        if (can_grant) {
                spin_lock(&target->spinlock);
                BUG_ON(target->ml.highest_blocked != LKM_IVMODE);

                mlog(0, "calling ast for blocked lock: %.*s, granting: %d, "
                     "node: %u\n", res->lockname.len, res->lockname.name,
                     target->ml.type, target->ml.node);

                // target->ml.type is already correct
                list_move_tail(&target->list, &res->granted);

                BUG_ON(!target->lksb);
                target->lksb->status = DLM_NORMAL;

                spin_unlock(&target->spinlock);

                __dlm_lockres_reserve_ast(res);
                dlm_queue_ast(dlm, target);
                /* go back and check for more */
                goto converting;
        }

leave:
        return;
}

/* must have NO locks when calling this with res != NULL */
void dlm_kick_thread(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
        mlog_entry("dlm=%p, res=%p\n", dlm, res);
        if (res) {
                spin_lock(&dlm->spinlock);
                spin_lock(&res->spinlock);
                __dlm_dirty_lockres(dlm, res);
                spin_unlock(&res->spinlock);
                spin_unlock(&dlm->spinlock);
        }
        wake_up(&dlm->dlm_thread_wq);
}

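/* Mark a lockres dirty so that dlm_thread will shuffle its queues.
 * Only resources mastered by this node are queued; secondary
 * (non-master) copies are never shuffled here.  Callers hold both the
 * dlm and the lockres spinlocks. */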
void __dlm_dirty_lockres(struct dlm_ctxt *dlm, struct dlm_lock_resource *res)
{
        mlog_entry("dlm=%p, res=%p\n", dlm, res);

        assert_spin_locked(&dlm->spinlock);
        assert_spin_locked(&res->spinlock);

        /* don't shuffle secondary queues */
        if ((res->owner == dlm->node_num) &&
            !(res->state & DLM_LOCK_RES_DIRTY)) {
                list_add_tail(&res->dirty, &dlm->dirty_list);
                res->state |= DLM_LOCK_RES_DIRTY;
        }
}


/* Launch the dlm thread for this domain */
int dlm_launch_thread(struct dlm_ctxt *dlm)
{
        mlog(0, "starting dlm thread...\n");

        dlm->dlm_thread_task = kthread_run(dlm_thread, dlm, "dlm_thread");
        if (IS_ERR(dlm->dlm_thread_task)) {
                mlog_errno(PTR_ERR(dlm->dlm_thread_task));
                dlm->dlm_thread_task = NULL;
                return -EINVAL;
        }

        return 0;
}

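/* Ask the dlm thread to stop and wait until it has exited. */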
void dlm_complete_thread(struct dlm_ctxt *dlm)
{
        if (dlm->dlm_thread_task) {
                mlog(ML_KTHREAD, "waiting for dlm thread to exit\n");
                kthread_stop(dlm->dlm_thread_task);
                dlm->dlm_thread_task = NULL;
        }
}

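/* Locked snapshot of the dirty list, used in dlm_thread's wait
 * condition.  The answer can go stale as soon as the spinlock is
 * dropped, which is fine for a wakeup check. */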
static int dlm_dirty_list_empty(struct dlm_ctxt *dlm)
{
        int empty;

        spin_lock(&dlm->spinlock);
        empty = list_empty(&dlm->dirty_list);
        spin_unlock(&dlm->spinlock);

        return empty;
}

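/* Deliver every pending AST and BAST.  Each lock is pinned with an
 * extra reference while dlm->ast_lock is dropped for the (possibly
 * remote) delivery; if the lock was re-queued in that window its
 * *_pending flag is left set so it is picked up on the next pass. */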
static void dlm_flush_asts(struct dlm_ctxt *dlm)
{
        int ret;
        struct dlm_lock *lock;
        struct dlm_lock_resource *res;
        u8 hi;

        spin_lock(&dlm->ast_lock);
        while (!list_empty(&dlm->pending_asts)) {
                lock = list_entry(dlm->pending_asts.next,
                                  struct dlm_lock, ast_list);
                /* get an extra ref on lock */
                dlm_lock_get(lock);
                res = lock->lockres;
                mlog(0, "delivering an ast for this lockres\n");

                BUG_ON(!lock->ast_pending);

                /* remove from list (including ref) */
                list_del_init(&lock->ast_list);
                dlm_lock_put(lock);
                spin_unlock(&dlm->ast_lock);

                if (lock->ml.node != dlm->node_num) {
                        ret = dlm_do_remote_ast(dlm, res, lock);
                        if (ret < 0)
                                mlog_errno(ret);
                } else
                        dlm_do_local_ast(dlm, res, lock);

                spin_lock(&dlm->ast_lock);

                /* possible that another ast was queued while
                 * we were delivering the last one */
                if (!list_empty(&lock->ast_list)) {
                        mlog(0, "aha another ast got queued while "
                             "we were finishing the last one. will "
                             "keep the ast_pending flag set.\n");
                } else
                        lock->ast_pending = 0;

                /* drop the extra ref.
                 * this may drop it completely. */
                dlm_lock_put(lock);
                dlm_lockres_release_ast(dlm, res);
        }

        while (!list_empty(&dlm->pending_basts)) {
                lock = list_entry(dlm->pending_basts.next,
                                  struct dlm_lock, bast_list);
                /* get an extra ref on lock */
                dlm_lock_get(lock);
                res = lock->lockres;

                BUG_ON(!lock->bast_pending);

                /* get the highest blocked lock, and reset */
                spin_lock(&lock->spinlock);
                BUG_ON(lock->ml.highest_blocked <= LKM_IVMODE);
                hi = lock->ml.highest_blocked;
                lock->ml.highest_blocked = LKM_IVMODE;
                spin_unlock(&lock->spinlock);

                /* remove from list (including ref) */
                list_del_init(&lock->bast_list);
                dlm_lock_put(lock);
                spin_unlock(&dlm->ast_lock);

                mlog(0, "delivering a bast for this lockres "
                     "(blocked = %d)\n", hi);

                if (lock->ml.node != dlm->node_num) {
                        ret = dlm_send_proxy_bast(dlm, res, lock, hi);
                        if (ret < 0)
                                mlog_errno(ret);
                } else
                        dlm_do_local_bast(dlm, res, lock, hi);

                spin_lock(&dlm->ast_lock);

                /* possible that another bast was queued while
                 * we were delivering the last one */
                if (!list_empty(&lock->bast_list)) {
                        mlog(0, "aha another bast got queued while "
                             "we were finishing the last one. will "
                             "keep the bast_pending flag set.\n");
                } else
                        lock->bast_pending = 0;

                /* drop the extra ref.
                 * this may drop it completely. */
                dlm_lock_put(lock);
                dlm_lockres_release_ast(dlm, res);
        }
        wake_up(&dlm->ast_wq);
        spin_unlock(&dlm->ast_lock);
}


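/* DLM_THREAD_TIMEOUT_MS is the longest dlm_thread will sleep between
 * passes when there is no work.  DLM_THREAD_MAX_DIRTY caps how many
 * dirty lockres are shuffled per pass before the thread yields.
 * DLM_THREAD_MAX_ASTS is defined here but not referenced in this file. */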
#define DLM_THREAD_TIMEOUT_MS (4 * 1000)
#define DLM_THREAD_MAX_DIRTY  100
#define DLM_THREAD_MAX_ASTS   10

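/* Main loop of the per-domain dlm thread.  Each pass runs the purge
 * list, then pulls lockres off the dirty list one at a time (dropping
 * dlm->spinlock before shuffling so queue work and AST delivery never
 * happen under it), flushes pending ASTs/BASTs, and finally sleeps
 * until new work arrives or the timeout expires. */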
static int dlm_thread(void *data)
{
        struct dlm_lock_resource *res;
        struct dlm_ctxt *dlm = data;
        unsigned long timeout = msecs_to_jiffies(DLM_THREAD_TIMEOUT_MS);

        mlog(0, "dlm thread running for %s...\n", dlm->name);

        while (!kthread_should_stop()) {
                int n = DLM_THREAD_MAX_DIRTY;

                /* dlm_shutting_down is very point-in-time, but that
                 * doesn't matter as we'll just loop back around if we
                 * get false on the leading edge of a state
                 * transition. */
                dlm_run_purge_list(dlm, dlm_shutting_down(dlm));

                /* We really don't want to hold dlm->spinlock while
                 * calling dlm_shuffle_lists on each lockres that
                 * needs to have its queues adjusted and AST/BASTs
                 * run. So let's pull each entry off the dirty_list
                 * and drop dlm->spinlock ASAP. Once off the list,
                 * res->spinlock needs to be taken again to protect
                 * the queues while calling dlm_shuffle_lists. */
                spin_lock(&dlm->spinlock);
                while (!list_empty(&dlm->dirty_list)) {
                        int delay = 0;
                        res = list_entry(dlm->dirty_list.next,
                                         struct dlm_lock_resource, dirty);

                        /* peel a lockres off, remove it from the list,
                         * unset the dirty flag and drop the dlm lock */
                        BUG_ON(!res);
                        dlm_lockres_get(res);

                        spin_lock(&res->spinlock);
                        res->state &= ~DLM_LOCK_RES_DIRTY;
                        list_del_init(&res->dirty);
                        spin_unlock(&res->spinlock);
                        spin_unlock(&dlm->spinlock);

                        /* lockres can be re-dirtied/re-added to the
                         * dirty_list in this gap, but that is ok */

                        spin_lock(&res->spinlock);
                        if (res->owner != dlm->node_num) {
                                __dlm_print_one_lock_resource(res);
                                mlog(ML_ERROR, "inprog:%s, mig:%s, reco:%s, dirty:%s\n",
                                     res->state & DLM_LOCK_RES_IN_PROGRESS ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_MIGRATING ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_RECOVERING ? "yes" : "no",
                                     res->state & DLM_LOCK_RES_DIRTY ? "yes" : "no");
                        }
                        BUG_ON(res->owner != dlm->node_num);

                        /* it is now ok to move lockreses in these states
                         * to the dirty list, assuming that they will only be
                         * dirty for a short while. */
                        if (res->state & (DLM_LOCK_RES_IN_PROGRESS |
                                          DLM_LOCK_RES_MIGRATING |
                                          DLM_LOCK_RES_RECOVERING)) {
                                /* move it to the tail and keep going */
                                spin_unlock(&res->spinlock);
                                mlog(0, "delaying list shuffling for in-"
                                     "progress lockres %.*s, state=%d\n",
                                     res->lockname.len, res->lockname.name,
                                     res->state);
                                delay = 1;
                                goto in_progress;
                        }

                        /* at this point the lockres is not migrating/
                         * recovering/in-progress. we have the lockres
                         * spinlock and do NOT have the dlm lock.
                         * safe to reserve/queue asts and run the lists. */

                        mlog(0, "calling dlm_shuffle_lists with dlm=%s, "
                             "res=%.*s\n", dlm->name,
                             res->lockname.len, res->lockname.name);

                        /* called while holding lockres lock */
                        dlm_shuffle_lists(dlm, res);
                        spin_unlock(&res->spinlock);

                        dlm_lockres_calc_usage(dlm, res);

in_progress:

                        spin_lock(&dlm->spinlock);
                        /* if the lock was in-progress, stick
                         * it on the back of the list */
                        if (delay) {
                                spin_lock(&res->spinlock);
                                list_add_tail(&res->dirty, &dlm->dirty_list);
                                res->state |= DLM_LOCK_RES_DIRTY;
                                spin_unlock(&res->spinlock);
                        }
                        dlm_lockres_put(res);

                        /* unlikely, but we may need to give time to
                         * other tasks */
                        if (!--n) {
                                mlog(0, "throttling dlm_thread\n");
                                break;
                        }
                }

                spin_unlock(&dlm->spinlock);
                dlm_flush_asts(dlm);

                /* yield and continue right away if there is more work to do */
                if (!n) {
                        yield();
                        continue;
                }

                wait_event_interruptible_timeout(dlm->dlm_thread_wq,
                                                 !dlm_dirty_list_empty(dlm) ||
                                                 kthread_should_stop(),
                                                 timeout);
        }

        mlog(0, "quitting DLM thread\n");
        return 0;
}