// SPDX-License-Identifier: GPL-2.0-only /****************************************************************************** ******************************************************************************* ** ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved. ** Copyright (C) 2004-2011 Red Hat, Inc. All rights reserved. ** ** *******************************************************************************
******************************************************************************/
/* If the start for which we're re-enabling locking (seq) has been superseded by a newer stop (ls_recover_seq), we need to leave locking disabled.
We suspend dlm_recv threads here to avoid the race where dlm_recv a) sees locking stopped and b) adds a message to the requestqueue, but dlm_recoverd
enables locking and clears the requestqueue between a and b. */
spin_lock_bh(&ls->ls_recover_lock); if (ls->ls_recover_seq == seq) {
set_bit(LSFL_RUNNING, &ls->ls_flags); /* Schedule next timer if recovery put something on inactive. * * The rsbs that was queued while recovery on toss hasn't * started yet because LSFL_RUNNING was set everything * else recovery hasn't started as well because ls_in_recovery * is still hold. So we should not run into the case that * resume_scan_timer() queues a timer that can occur in * a no op.
*/
resume_scan_timer(ls); /* unblocks processes waiting to enter the dlm */
up_write(&ls->ls_in_recovery);
clear_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
error = 0;
}
spin_unlock_bh(&ls->ls_recover_lock);
/* * This list of root rsb's will be the basis of most of the recovery * routines.
*/
dlm_create_root_list(ls, &root_list);
/* * Add or remove nodes from the lockspace's ls_nodes list. * * Due to the fact that we must report all membership changes to lsops * or midcomms layer, it is not permitted to abort ls_recover() until * this is done.
*/
/* Create a snapshot of all active rsbs were we are the master of. * During the barrier between dlm_recover_members_wait() and * dlm_recover_directory() other nodes can dump their necessary * directory dlm_rsb (r->res_dir_nodeid == nodeid) in rcom * communication dlm_copy_master_names() handling. * * TODO We should create a per lockspace list that contains rsbs * that we are the master of. Instead of creating this list while * recovery we keep track of those rsbs while locking handling and * recovery can use it when necessary.
*/
error = dlm_create_masters_list(ls); if (error) {
log_rinfo(ls, "dlm_create_masters_list error %d", error); goto fail_root_list;
}
/* * We may have outstanding operations that are waiting for a reply from * a failed node. Mark these to be resent after recovery. Unlock and * cancel ops can just be completed.
*/
dlm_recover_waiters_pre(ls);
if (dlm_recovery_stopped(ls)) {
error = -EINTR; goto fail_root_list;
}
if (neg || dlm_no_directory(ls)) { /* * Clear lkb's for departed nodes.
*/
dlm_recover_purge(ls, &root_list);
/* * Get new master nodeid's for rsb's that were mastered on * departed nodes.
*/
/* * Finalize state in master rsb's now that all locks can be * checked. This includes conversion resolution and lvb * settings.
*/
dlm_recover_rsbs(ls, &root_list);
} else { /* * Other lockspace members may be going through the "neg" steps * while also adding us to the lockspace, in which case they'll * be doing the recover_locks (RS_LOCKS) barrier.
*/
dlm_set_recover_status(ls, DLM_RS_LOCKS);
/* * Purge directory-related requests that are saved in requestqueue. * All dir requests from before recovery are invalid now due to the dir * rebuild and will be resent by the requesting nodes.
*/
/* The dlm_ls_start() that created the rv we take here may already have been stopped via dlm_ls_stop(); in that case we need to leave the RECOVERY_STOP
flag set. */
if (rv) {
error = ls_recover(ls, rv); switch (error) { case 0:
ls->ls_recovery_result = 0;
complete(&ls->ls_recovery_done);
dlm_lsop_recover_done(ls); break; case -EINTR: /* if recovery was interrupted -EINTR we wait for the next * ls_recover() iteration until it hopefully succeeds.
*/
log_rinfo(ls, "%s %llu interrupted and should be queued to run again",
__func__, (unsignedlonglong)rv->seq); break; default:
log_rinfo(ls, "%s %llu error %d", __func__,
(unsignedlonglong)rv->seq, error);
/* let new_lockspace() get aware of critical error */
ls->ls_recovery_result = error;
complete(&ls->ls_recovery_done); break;
}
while (1) { /* * We call kthread_should_stop() after set_current_state(). * This is because it works correctly if kthread_stop() is * called just before set_current_state().
*/
set_current_state(TASK_INTERRUPTIBLE); if (kthread_should_stop()) {
set_current_state(TASK_RUNNING); break;
} if (!test_bit(LSFL_RECOVER_WORK, &ls->ls_flags) &&
!test_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) { if (kthread_should_stop()) break;
schedule();
}
set_current_state(TASK_RUNNING);
if (test_and_clear_bit(LSFL_RECOVER_DOWN, &ls->ls_flags)) {
down_write(&ls->ls_in_recovery);
set_bit(LSFL_RECOVER_LOCK, &ls->ls_flags);
wake_up(&ls->ls_recover_lock_wait);
}
if (test_and_clear_bit(LSFL_RECOVER_WORK, &ls->ls_flags))
do_ls_recovery(ls);
}
if (test_bit(LSFL_RECOVER_LOCK, &ls->ls_flags))
up_write(&ls->ls_in_recovery);
dlm_put_lockspace(ls); return 0;
}
int dlm_recoverd_start(struct dlm_ls *ls)
{ struct task_struct *p; int error = 0;
p = kthread_run(dlm_recoverd, ls, "dlm_recoverd"); if (IS_ERR(p))
error = PTR_ERR(p); else
ls->ls_recoverd_task = p; return error;
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.