// SPDX-License-Identifier: GPL-2.0-only /****************************************************************************** ******************************************************************************* ** ** Copyright (C) 2005-2010 Red Hat, Inc. All rights reserved. ** ** *******************************************************************************
******************************************************************************/
Stage 2, xxxx_lock(), just finds and locks the relevant rsb which is provided to the next stage.
Stage 3, _xxxx_lock(), determines if the operation is local or remote. When remote, it calls send_xxxx(), when local it calls do_xxxx().
Stage 4, do_xxxx(), is the guts of the operation. It manipulates the given rsb and lkb and queues callbacks.
For remote operations, send_xxxx() results in the corresponding do_xxxx() function being executed on the remote node. The connecting send/receive calls on local (L) and remote (R) nodes:
/* * Lock compatibility matrix - thanks Steve * UN = Unlocked state. Not really a state, used as a flag * PD = Padding. Used to make the matrix a nice power of two in size * Other states are the same as the VMS DLM. * Usage: matrix[grmode+1][rqmode+1] (although m[rq+1][gr+1] is the same)
*/
/* * This defines the direction of transfer of LVB data. * Granted mode is the row; requested mode is the column. * Usage: matrix[grmode+1][rqmode+1] * 1 = LVB is returned to the caller * 0 = LVB is written to the resource * -1 = nothing happens to the LVB
*/
/* Report whether two DLM lock modes can coexist on the same resource.
 *
 * Both modes are shifted by one to index __dlm_compat_matrix (the
 * offset accommodates the invalid/unlocked mode value -1).  Returns
 * nonzero when the modes are compatible, zero when they conflict.
 */
int dlm_modes_compat(int mode1, int mode2)
{
	int row = mode1 + 1;
	int col = mode2 + 1;

	return __dlm_compat_matrix[row][col];
}
/* * Compatibility matrix for conversions with QUECVT set. * Granted mode is the row; requested mode is the column. * Usage: matrix[grmode+1][rqmode+1]
*/
/* TODO move this to lib/refcount.c */
/* Drop a reference on @r; if it was the last reference, acquire @lock
 * (bh-disabling write lock) and return true with the lock held.
 * Otherwise return false with the lock not held.
 *
 * refcount_dec_not_one() handles the common case without taking the
 * lock; only a potential 1->0 transition falls through to the locked
 * recheck, so the final decrement happens under @lock.
 */
static __must_check bool
dlm_refcount_dec_and_write_lock_bh(refcount_t *r, rwlock_t *lock)
	__cond_acquires(lock)
{
	/* fast path: count was greater than one, no lock needed */
	if (refcount_dec_not_one(r))
		return false;

	write_lock_bh(lock);
	if (!refcount_dec_and_test(r)) {
		/* another thread gained a reference in the meantime */
		write_unlock_bh(lock);
		return false;
	}

	return true;
}
/* TODO move this to include/linux/kref.h */ staticinlineint dlm_kref_put_write_lock_bh(struct kref *kref, void (*release)(struct kref *kref),
rwlock_t *lock)
{ if (dlm_refcount_dec_and_write_lock_bh(&kref->refcount, lock)) {
release(kref); return 1;
}
/* connected with timer_delete_sync() in dlm_ls_stop() to stop * new timers when recovery is triggered and don't run them * again until a resume_scan_timer() tries it again.
*/ staticvoid enable_scan_timer(struct dlm_ls *ls, unsignedlong jiffies)
{ if (!dlm_locking_stopped(ls))
mod_timer(&ls->ls_scan_timer, jiffies);
}
/* Re-arm the scan timer if an rsb is waiting on the scan list and no
 * timer is currently pending.  The first entry may be the one currently
 * executing as the timer callback, but it does no harm if a re-queued
 * timer fires again and finds nothing to do; that should be rare.
 */
void resume_scan_timer(struct dlm_ls *ls)
{
	struct dlm_rsb *first;

	spin_lock_bh(&ls->ls_scan_lock);
	first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
					 res_scan_list);
	if (first && !timer_pending(&ls->ls_scan_timer))
		enable_scan_timer(ls, first->res_toss_time);
	spin_unlock_bh(&ls->ls_scan_lock);
}
/* if the rsb is not queued do nothing */ if (list_empty(&r->res_scan_list)) goto out;
/* get the first element before delete */
first = list_first_entry(&ls->ls_scan_list, struct dlm_rsb,
res_scan_list);
list_del_init(&r->res_scan_list); /* check if the first element was the rsb we deleted */ if (first == r) { /* try to get the new first element, if the list * is empty now try to delete the timer, if we are * too late we don't care. * * if the list isn't empty and a new first element got * in place, set the new timer expire time.
*/
first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
res_scan_list); if (!first)
timer_delete(&ls->ls_scan_timer); else
enable_scan_timer(ls, first->res_toss_time);
}
/* A dir record for a remote master rsb should never be on the scan list. */
WARN_ON(!dlm_no_directory(ls) &&
(r->res_master_nodeid != our_nodeid) &&
(dlm_dir_nodeid(r) == our_nodeid));
/* An active rsb should never be on the scan list. */
WARN_ON(!rsb_flag(r, RSB_INACTIVE));
/* An rsb should not already be on the scan list. */
WARN_ON(!list_empty(&r->res_scan_list));
spin_lock_bh(&ls->ls_scan_lock); /* set the new rsb absolute expire time in the rsb */
r->res_toss_time = rsb_toss_jiffies(); if (list_empty(&ls->ls_scan_list)) { /* if the queue is empty add the element and it's * our new expire time
*/
list_add_tail(&r->res_scan_list, &ls->ls_scan_list);
enable_scan_timer(ls, r->res_toss_time);
} else { /* try to get the maybe new first element and then add * to this rsb with the oldest expire time to the end * of the queue. If the list was empty before this * rsb expire time is our next expiration if it wasn't * the now new first elemet is our new expiration time
*/
first = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
res_scan_list);
list_add_tail(&r->res_scan_list, &ls->ls_scan_list); if (!first)
enable_scan_timer(ls, r->res_toss_time); else
enable_scan_timer(ls, first->res_toss_time);
}
spin_unlock_bh(&ls->ls_scan_lock);
}
/* if we hit contention we do in 250 ms a retry to trylock. * if there is any other mod_timer in between we don't care * about that it expires earlier again this is only for the * unlikely case nothing happened in this time.
*/ #define DLM_TOSS_TIMER_RETRY (jiffies + msecs_to_jiffies(250))
/* Called by lockspace scan_timer to free unused rsb's. */
while (1) { /* interrupting point to leave iteration when * recovery waits for timer_delete_sync(), recovery * will take care to delete everything in scan list.
*/ if (dlm_locking_stopped(ls)) break;
rv = spin_trylock(&ls->ls_scan_lock); if (!rv) { /* rearm again try timer */
enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break;
}
r = list_first_entry_or_null(&ls->ls_scan_list, struct dlm_rsb,
res_scan_list); if (!r) { /* the next add_scan will enable the timer again */
spin_unlock(&ls->ls_scan_lock); break;
}
/* * If the first rsb is not yet expired, then stop because the * list is sorted with nearest expiration first.
*/ if (time_before(jiffies, r->res_toss_time)) { /* rearm with the next rsb to expire in the future */
enable_scan_timer(ls, r->res_toss_time);
spin_unlock(&ls->ls_scan_lock); break;
}
/* in find_rsb_dir/nodir there is a reverse order of this * lock, however this is only a trylock if we hit some * possible contention we try it again.
*/
rv = write_trylock(&ls->ls_rsbtbl_lock); if (!rv) {
spin_unlock(&ls->ls_scan_lock); /* rearm again try timer */
enable_scan_timer(ls, DLM_TOSS_TIMER_RETRY); break;
}
/* An rsb that is a dir record for a remote master rsb * cannot be removed, and should not have a timer enabled.
*/
WARN_ON(!dlm_no_directory(ls) &&
(r->res_master_nodeid != our_nodeid) &&
(dlm_dir_nodeid(r) == our_nodeid));
/* We're the master of this rsb but we're not * the directory record, so we need to tell the * dir node to remove the dir record
*/ if (!dlm_no_directory(ls) &&
(r->res_master_nodeid == our_nodeid) &&
(dlm_dir_nodeid(r) != our_nodeid))
send_remove(r);
free_inactive_rsb(r);
}
}
/* If ls->ls_new_rsb is empty, return -EAGAIN, so the caller can unlock any spinlocks, go back and call pre_rsb_struct again.
   Otherwise, take an rsb off the list and return it.
   NOTE(review): pre_rsb_struct()/ls_new_rsb are not referenced by the code below; this comment looks stale -- confirm and remove. */
/* Insert @rsb into the resource hash table and mark it hashed.
 *
 * Returns 0 on success, in which case RSB_HASHED is set on the rsb, or
 * the error from rhashtable_insert_fast() (callers in this file check
 * for -EEXIST, meaning another thread inserted the same name first).
 */
static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
{
	int rv;

	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
				    dlm_rhash_rsb_params);
	if (!rv)
		rsb_set_flag(rsb, RSB_HASHED);

	return rv;
}
/* * Find rsb in rsbtbl and potentially create/add one * * Delaying the release of rsb's has a similar benefit to applications keeping * NL locks on an rsb, but without the guarantee that the cached master value * will still be valid when the rsb is reused. Apps aren't always smart enough * to keep NL locks on an rsb that they may lock again shortly; this can lead * to excessive master lookups and removals if we don't delay the release. * * Searching for an rsb means looking through both the normal list and toss * list. When found on the toss list the rsb is moved to the normal list with * ref count of 1; when found on normal list the ref count is incremented. * * rsb's on the keep list are being used locally and refcounted. * rsb's on the toss list are not being used locally, and are not refcounted. * * The toss list rsb's were either * - previously used locally but not any more (were on keep list, then * moved to toss list when last refcount dropped) * - created and put on toss list as a directory record for a lookup * (we are the dir node for the res, but are not using the res right now, * but some other node is) * * The purpose of find_rsb() is to return a refcounted rsb for local use. * So, if the given rsb is on the toss list, it is moved to the keep list * before being returned. * * deactivate_rsb() happens when all local usage of the rsb is done, i.e. no * more refcounts exist, so the rsb is moved from the keep list to the * toss list. * * rsb's on both keep and toss lists are used for doing a name to master * lookups. rsb's that are in use locally (and being refcounted) are on * the keep list, rsb's that are not in use locally (not refcounted) and * only exist for name/master lookups are on the toss list. * * rsb's on the toss list who's dir_nodeid is not local can have stale * name/master mappings. So, remote requests on such rsb's can potentially * return with an error, which means the mapping is stale and needs to * be updated with a new lookup. 
(The idea behind MASTER UNCERTAIN and * first_lkid is to keep only a single outstanding request on an rsb * while that rsb has a potentially stale master.)
*/
staticint find_rsb_dir(struct dlm_ls *ls, constvoid *name, int len,
uint32_t hash, int dir_nodeid, int from_nodeid, unsignedint flags, struct dlm_rsb **r_ret)
{ struct dlm_rsb *r = NULL; int our_nodeid = dlm_our_nodeid(); int from_local = 0; int from_other = 0; int from_dir = 0; int create = 0; int error;
/* * flags & R_RECEIVE_RECOVER is from dlm_recover_master_copy, so * from_nodeid has sent us a lock in dlm_recover_locks, believing * we're the new master. Our local recovery may not have set * res_master_nodeid to our_nodeid yet, so allow either. Don't * create the rsb; dlm_recover_process_copy() will handle EBADR * by resending. * * If someone sends us a request, we are the dir node, and we do * not find the rsb anywhere, then recreate it. This happens if * someone sends us a request after we have removed/freed an rsb. * (They sent a request instead of lookup because they are using * an rsb taken from their scan list.)
*/
/* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock); if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock);
error = -EBADR; goto do_new;
}
/* * rsb is active, so we can't check master_nodeid without lock_rsb.
*/
if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock); goto do_inactive;
}
/* * The expectation here is that the rsb will have HASHED and * INACTIVE flags set, and that the rsb can be moved from * inactive back to active again. However, between releasing * the read lock and acquiring the write lock, this rsb could * have been removed from rsbtbl, and had HASHED cleared, to * be freed. To deal with this case, we would normally need * to repeat dlm_search_rsb_tree while holding the write lock, * but rcu allows us to simply check the HASHED flag, because * the rcu read lock means the rsb will not be freed yet. * If the HASHED flag is not set, then the rsb is being freed, * so we add a new rsb struct. If the HASHED flag is set, * and INACTIVE is not set, it means another thread has * made the rsb active, as we're expecting to do here, and * we just repeat the lookup (this will be very unlikely.)
*/ if (rsb_flag(r, RSB_HASHED)) { if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry;
}
} else {
write_unlock_bh(&ls->ls_rsbtbl_lock);
error = -EBADR; goto do_new;
}
/* * rsb found inactive (master_nodeid may be out of date unless * we are the dir_nodeid or were the master) No other thread * is using this rsb because it's inactive, so we can * look at or update res_master_nodeid without lock_rsb.
*/
if ((r->res_master_nodeid != our_nodeid) && from_other) { /* our rsb was not master, and another node (not the dir node)
has sent us a request */
log_debug(ls, "find_rsb inactive from_other %d master %d dir %d %s",
from_nodeid, r->res_master_nodeid, dir_nodeid,
r->res_name);
write_unlock_bh(&ls->ls_rsbtbl_lock);
error = -ENOTBLK; goto out;
}
if ((r->res_master_nodeid != our_nodeid) && from_dir) { /* don't think this should ever happen */
log_error(ls, "find_rsb inactive from_dir %d master %d",
from_nodeid, r->res_master_nodeid);
dlm_print_rsb(r); /* fix it and go on */
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
rsb_clear_flag(r, RSB_MASTER_UNCERTAIN);
r->res_first_lkid = 0;
}
if (from_local && (r->res_master_nodeid != our_nodeid)) { /* Because we have held no locks on this rsb,
res_master_nodeid could have become stale. */
rsb_set_flag(r, RSB_MASTER_UNCERTAIN);
r->res_first_lkid = 0;
}
/* we always deactivate scan timer for the rsb, when * we move it out of the inactive state as rsb state * can be changed and scan timers are only for inactive * rsbs.
*/
del_scan(ls, r);
list_move(&r->res_slow_list, &ls->ls_slow_active);
rsb_clear_flag(r, RSB_INACTIVE);
kref_init(&r->res_ref); /* ref is now used in active state */
write_unlock_bh(&ls->ls_rsbtbl_lock);
goto out;
do_new: /* * rsb not found
*/
if (error == -EBADR && !create) goto out;
error = get_rsb_struct(ls, name, len, &r); if (WARN_ON_ONCE(error)) goto out;
if (from_dir) { /* want to see how often this happens */
log_debug(ls, "find_rsb new from_dir %d recreate %s",
from_nodeid, r->res_name);
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0; goto out_add;
}
if (from_other && (dir_nodeid != our_nodeid)) { /* should never happen */
log_error(ls, "find_rsb new from_other %d dir %d our %d %s",
from_nodeid, dir_nodeid, our_nodeid, r->res_name);
dlm_free_rsb(r);
r = NULL;
error = -ENOTBLK; goto out;
}
if (from_other) {
log_debug(ls, "find_rsb new from_other %d dir %d %s",
from_nodeid, dir_nodeid, r->res_name);
}
if (dir_nodeid == our_nodeid) { /* When we are the dir nodeid, we can set the master
node immediately */
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
} else { /* set_master will send_lookup to dir_nodeid */
r->res_master_nodeid = 0;
r->res_nodeid = -1;
}
out_add:
write_lock_bh(&ls->ls_rsbtbl_lock);
error = rsb_insert(r, &ls->ls_rsbtbl); if (error == -EEXIST) { /* somebody else was faster and it seems the * rsb exists now, we do a whole relookup
*/
write_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r); goto retry;
} elseif (!error) {
list_add(&r->res_slow_list, &ls->ls_slow_active);
}
write_unlock_bh(&ls->ls_rsbtbl_lock);
out:
*r_ret = r; return error;
}
/* During recovery, other nodes can send us new MSTCPY locks (from dlm_recover_locks) before we've made ourself master (in
dlm_recover_masters). */
staticint find_rsb_nodir(struct dlm_ls *ls, constvoid *name, int len,
uint32_t hash, int dir_nodeid, int from_nodeid, unsignedint flags, struct dlm_rsb **r_ret)
{ struct dlm_rsb *r = NULL; int our_nodeid = dlm_our_nodeid(); int recover = (flags & R_RECEIVE_RECOVER); int error;
/* check if the rsb is in active state under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock); if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock); goto do_new;
}
if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock); goto do_inactive;
}
/* * rsb is active, so we can't check master_nodeid without lock_rsb.
*/
/* See comment in find_rsb_dir. */ if (rsb_flag(r, RSB_HASHED)) { if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock); goto retry;
}
} else {
write_unlock_bh(&ls->ls_rsbtbl_lock); goto do_new;
}
/* * rsb found inactive. No other thread is using this rsb because * it's inactive, so we can look at or update res_master_nodeid * without lock_rsb.
*/
if (!recover && (r->res_master_nodeid != our_nodeid) && from_nodeid) { /* our rsb is not master, and another node has sent us a
request; this should never happen */
log_error(ls, "find_rsb inactive from_nodeid %d master %d dir %d",
from_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
write_unlock_bh(&ls->ls_rsbtbl_lock);
error = -ENOTBLK; goto out;
}
if (!recover && (r->res_master_nodeid != our_nodeid) &&
(dir_nodeid == our_nodeid)) { /* our rsb is not master, and we are dir; may as well fix it;
this should never happen */
log_error(ls, "find_rsb inactive our %d master %d dir %d",
our_nodeid, r->res_master_nodeid, dir_nodeid);
dlm_print_rsb(r);
r->res_master_nodeid = our_nodeid;
r->res_nodeid = 0;
}
write_lock_bh(&ls->ls_rsbtbl_lock);
error = rsb_insert(r, &ls->ls_rsbtbl); if (error == -EEXIST) { /* somebody else was faster and it seems the * rsb exists now, we do a whole relookup
*/
write_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r); goto retry;
} elseif (!error) {
list_add(&r->res_slow_list, &ls->ls_slow_active);
}
write_unlock_bh(&ls->ls_rsbtbl_lock);
out:
*r_ret = r; return error;
}
/* * rsb rcu usage * * While rcu read lock is held, the rsb cannot be freed, * which allows a lookup optimization. * * Two threads are accessing the same rsb concurrently, * the first (A) is trying to use the rsb, the second (B) * is trying to free the rsb. * * thread A thread B * (trying to use rsb) (trying to free rsb) * * A1. rcu read lock * A2. rsbtbl read lock * A3. look up rsb in rsbtbl * A4. rsbtbl read unlock * B1. rsbtbl write lock * B2. look up rsb in rsbtbl * B3. remove rsb from rsbtbl * B4. clear rsb HASHED flag * B5. rsbtbl write unlock * B6. begin freeing rsb using rcu... * * (rsb is inactive, so try to make it active again) * A5. read rsb HASHED flag (safe because rsb is not freed yet) * A6. the rsb HASHED flag is not set, which it means the rsb * is being removed from rsbtbl and freed, so don't use it. * A7. rcu read unlock * * B7. ...finish freeing rsb using rcu * A8. create a new rsb * * Without the rcu optimization, steps A5-8 would need to do * an extra rsbtbl lookup: * A5. rsbtbl write lock * A6. look up rsb in rsbtbl, not found * A7. rsbtbl write unlock * A8. create a new rsb
*/
staticint find_rsb(struct dlm_ls *ls, constvoid *name, int len, int from_nodeid, unsignedint flags, struct dlm_rsb **r_ret)
{ int dir_nodeid;
uint32_t hash; int rv;
/* we have received a request and found that res_master_nodeid != our_nodeid,
so we need to return an error or make ourself the master */
staticint validate_master_nodeid(struct dlm_ls *ls, struct dlm_rsb *r, int from_nodeid)
{ if (dlm_no_directory(ls)) {
log_error(ls, "find_rsb keep from_nodeid %d master %d dir %d",
from_nodeid, r->res_master_nodeid,
r->res_dir_nodeid);
dlm_print_rsb(r); return -ENOTBLK;
}
if (from_nodeid != r->res_dir_nodeid) { /* our rsb is not master, and another node (not the dir node) has sent us a request. this is much more common when our
master_nodeid is zero, so limit debug to non-zero. */
if (r->res_master_nodeid) {
log_debug(ls, "validate master from_other %d master %d " "dir %d first %x %s", from_nodeid,
r->res_master_nodeid, r->res_dir_nodeid,
r->res_first_lkid, r->res_name);
} return -ENOTBLK;
} else { /* our rsb is not master, but the dir nodeid has sent us a
request; this could happen with master 0 / res_nodeid -1 */
staticvoid __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_nodeid, int from_nodeid, bool is_inactive, unsignedint flags, int *r_nodeid, int *result)
{ int fix_master = (flags & DLM_LU_RECOVER_MASTER); int from_master = (flags & DLM_LU_RECOVER_DIR);
if (r->res_dir_nodeid != our_nodeid) { /* should not happen, but may as well fix it and carry on */
log_error(ls, "%s res_dir %d our %d %s", __func__,
r->res_dir_nodeid, our_nodeid, r->res_name);
r->res_dir_nodeid = our_nodeid;
}
if (fix_master && r->res_master_nodeid && dlm_is_removed(ls, r->res_master_nodeid)) { /* Recovery uses this function to set a new master when * the previous master failed. Setting NEW_MASTER will * force dlm_recover_masters to call recover_master on this * rsb even though the res_nodeid is no longer removed.
*/
if (is_inactive) { /* I don't think we should ever find it inactive. */
log_error(ls, "%s fix_master inactive", __func__);
dlm_dump_rsb(r);
}
}
if (from_master && (r->res_master_nodeid != from_nodeid)) { /* this will happen if from_nodeid became master during * a previous recovery cycle, and we aborted the previous * cycle before recovering this master value
*/
if (!r->res_master_nodeid) { /* this will happen if recovery happens while we're looking * up the master for this rsb
*/
log_debug(ls, "%s master 0 to %d first %x %s", __func__,
from_nodeid, r->res_first_lkid, r->res_name);
r->res_master_nodeid = from_nodeid;
r->res_nodeid = from_nodeid;
}
if (!from_master && !fix_master &&
(r->res_master_nodeid == from_nodeid)) { /* this can happen when the master sends remove, the dir node * finds the rsb on the active list and ignores the remove, * and the former master sends a lookup
*/
log_limit(ls, "%s from master %d flags %x first %x %s",
__func__, from_nodeid, flags, r->res_first_lkid,
r->res_name);
}
ret_assign:
*r_nodeid = r->res_master_nodeid; if (result)
*result = DLM_LU_MATCH;
}
/* * We're the dir node for this res and another node wants to know the * master nodeid. During normal operation (non recovery) this is only * called from receive_lookup(); master lookups when the local node is * the dir node are done by find_rsb(). * * normal operation, we are the dir node for a resource * . _request_lock * . set_master * . send_lookup * . receive_lookup * . dlm_master_lookup flags 0 * * recover directory, we are rebuilding dir for all resources * . dlm_recover_directory * . dlm_rcom_names * remote node sends back the rsb names it is master of and we are dir of * . dlm_master_lookup RECOVER_DIR (fix_master 0, from_master 1) * we either create new rsb setting remote node as master, or find existing * rsb and set master to be the remote node. * * recover masters, we are finding the new master for resources * . dlm_recover_masters * . recover_master * . dlm_send_rcom_lookup * . receive_rcom_lookup * . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
*/
staticint _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, constchar *name, int len, unsignedint flags, int *r_nodeid, int *result)
{ struct dlm_rsb *r = NULL;
uint32_t hash; int our_nodeid = dlm_our_nodeid(); int dir_nodeid, error;
if (len > DLM_RESNAME_MAXLEN) return -EINVAL;
if (from_nodeid == our_nodeid) {
log_error(ls, "dlm_master_lookup from our_nodeid %d flags %x",
our_nodeid, flags); return -EINVAL;
}
hash = jhash(name, len, 0);
dir_nodeid = dlm_hash2nodeid(ls, hash); if (dir_nodeid != our_nodeid) {
log_error(ls, "dlm_master_lookup from %d dir %d our %d h %x %d",
from_nodeid, dir_nodeid, our_nodeid, hash,
ls->ls_num_nodes);
*r_nodeid = -1; return -EINVAL;
}
/* check if the rsb is active under read lock - likely path */
read_lock_bh(&ls->ls_rsbtbl_lock); if (!rsb_flag(r, RSB_HASHED)) {
read_unlock_bh(&ls->ls_rsbtbl_lock); goto not_found;
}
if (rsb_flag(r, RSB_INACTIVE)) {
read_unlock_bh(&ls->ls_rsbtbl_lock); goto do_inactive;
}
/* because the rsb is active, we need to lock_rsb before * checking/changing res_master_nodeid
*/
/* the rsb was active */
unlock_rsb(r);
put_rsb(r);
return 0;
do_inactive: /* unlikely path - check if still part of ls_rsbtbl */
write_lock_bh(&ls->ls_rsbtbl_lock);
/* see comment in find_rsb_dir */ if (rsb_flag(r, RSB_HASHED)) { if (!rsb_flag(r, RSB_INACTIVE)) {
write_unlock_bh(&ls->ls_rsbtbl_lock); /* something as changed, very unlikely but * try again
*/ goto retry;
}
} else {
write_unlock_bh(&ls->ls_rsbtbl_lock); goto not_found;
}
/* because the rsb is inactive, it's not refcounted and lock_rsb
is not used, but is protected by the rsbtbl lock */
/* A dir record rsb should never be on scan list. * Except when we are the dir and master node. * This function should only be called by the dir * node.
*/
WARN_ON(!list_empty(&r->res_scan_list) &&
r->res_master_nodeid != our_nodeid);
write_lock_bh(&ls->ls_rsbtbl_lock);
error = rsb_insert(r, &ls->ls_rsbtbl); if (error == -EEXIST) { /* somebody else was faster and it seems the * rsb exists now, we do a whole relookup
*/
write_unlock_bh(&ls->ls_rsbtbl_lock);
dlm_free_rsb(r); goto retry;
} elseif (error) {
write_unlock_bh(&ls->ls_rsbtbl_lock); /* should never happen */
dlm_free_rsb(r); goto retry;
}
/* * When the rsb becomes unused, there are two possibilities: * 1. Leave the inactive rsb in place (don't remove it). * 2. Add it to the scan list to be removed. * * 1 is done when the rsb is acting as the dir record * for a remotely mastered rsb. The rsb must be left * in place as an inactive rsb to act as the dir record. * * 2 is done when a) the rsb is not the master and not the * dir record, b) when the rsb is both the master and the * dir record, c) when the rsb is master but not dir record. * * (If no directory is used, the rsb can always be removed.)
*/ if (dlm_no_directory(ls) ||
(r->res_master_nodeid == our_nodeid ||
dlm_dir_nodeid(r) != our_nodeid))
add_scan(ls, r);
if (r->res_lvbptr) {
dlm_free_lvb(r->res_lvbptr);
r->res_lvbptr = NULL;
}
}
rcu_read_lock();
lkb = xa_load(&ls->ls_lkbxa, lkid); if (lkb) { /* check if lkb is still part of lkbxa under lkbxa_lock as * the lkb_ref is tight to the lkbxa data structure, see * __put_lkb().
*/
read_lock_bh(&ls->ls_lkbxa_lock); if (kref_read(&lkb->lkb_ref))
kref_get(&lkb->lkb_ref); else
lkb = NULL;
read_unlock_bh(&ls->ls_lkbxa_lock);
}
rcu_read_unlock();
/* This is called when we need to remove a reference and are certain it's not the last ref. e.g. del_lkb is always called between a find_lkb/put_lkb and is always the inverse of a previous add_lkb.
put_lkb would work fine, but would involve unnecessary locking */
/* Map a DLM request message type to its corresponding reply type.
 *
 * Returns the matching *_REPLY message type, or -1 if @mstype is not a
 * message that expects a reply.
 */
static int msg_reply_type(int mstype)
{
	switch (mstype) {
	case DLM_MSG_REQUEST:
		return DLM_MSG_REQUEST_REPLY;
	case DLM_MSG_CONVERT:
		return DLM_MSG_CONVERT_REPLY;
	case DLM_MSG_UNLOCK:
		return DLM_MSG_UNLOCK_REPLY;
	case DLM_MSG_CANCEL:
		return DLM_MSG_CANCEL_REPLY;
	case DLM_MSG_LOOKUP:
		return DLM_MSG_LOOKUP_REPLY;
	}

	return -1;
}
/* add/remove lkb from global waiters list of lkb's waiting for
a reply from a remote node */
staticvoid add_to_waiters(struct dlm_lkb *lkb, int mstype, int to_nodeid)
{ struct dlm_ls *ls = lkb->lkb_resource->res_ls;
spin_lock_bh(&ls->ls_waiters_lock); if (lkb->lkb_wait_type || is_overlap_cancel(lkb)) { switch (mstype) { case DLM_MSG_UNLOCK:
set_bit(DLM_IFL_OVERLAP_UNLOCK_BIT, &lkb->lkb_iflags); break; case DLM_MSG_CANCEL:
set_bit(DLM_IFL_OVERLAP_CANCEL_BIT, &lkb->lkb_iflags); break; default: /* should never happen as validate_lock_args() checks * on lkb_wait_type and validate_unlock_args() only * creates UNLOCK or CANCEL messages.
*/
WARN_ON_ONCE(1); goto out;
}
lkb->lkb_wait_count++;
hold_lkb(lkb);
log_debug(ls, "addwait %x cur %d overlap %d count %d f %x",
lkb->lkb_id, lkb->lkb_wait_type, mstype,
lkb->lkb_wait_count, dlm_iflags_val(lkb)); goto out;
}
/* We clear the RESEND flag because we might be taking an lkb off the waiters list as part of process_requestqueue (e.g. a lookup that has an optimized request reply on the requestqueue) between dlm_recover_waiters_pre() which
set RESEND and dlm_recover_waiters_post() */
staticint _remove_from_waiters(struct dlm_lkb *lkb, int mstype, conststruct dlm_message *ms)
{ struct dlm_ls *ls = lkb->lkb_resource->res_ls; int overlap_done = 0;
/* Remove for the convert reply, and preemptively remove for the cancel reply. A convert has been granted while there's still an outstanding cancel on it (the cancel is moot and the result in the cancel reply should be 0). We preempt the cancel reply because the app gets the convert result and then can follow up with another op, like convert. This subsequent op would see the
   lingering state of the cancel and fail with -EBUSY. */
out_del: /* the force-unlock/cancel has completed and we haven't recvd a reply to the op that was in progress prior to the unlock/cancel; we give up on any reply to the earlier op. FIXME: not sure when/how
this would happen */
/* Handles situations where we might be processing a "fake" or "local" reply in * the recovery context which stops any locking activity. Only debugfs might * change the lockspace waiters but they will held the recovery lock to ensure * remove_from_waiters_ms() in local case will be the only user manipulating the * lockspace waiters in recovery context.
*/
b = dlm_lvb_operations[lkb->lkb_grmode + 1][lkb->lkb_rqmode + 1]; if (b == 1) { int len = receive_extralen(ms); if (len > r->res_ls->ls_lvblen)
len = r->res_ls->ls_lvblen;
memcpy(lkb->lkb_lvbptr, ms->m_extra, len);
lkb->lkb_lvbseq = le32_to_cpu(ms->m_lvbseq);
}
}
/* Manipulate lkb's on rsb's convert/granted/waiting queues remove_lock -- used for unlock, removes lkb from granted revert_lock -- used for cancel, moves lkb from convert to granted grant_lock -- used for request and convert, adds lkb to granted or moves lkb from convert or waiting to granted
Each of these is used for master or local copy lkb's. There is also a _pc() variation used to make the corresponding change on
a process copy (pc) lkb. */
/* Remove @lkb from @r's granted queue for unlock (master or local copy).
 *
 * Clears the granted mode back to the invalid mode and drops the
 * reference taken by create_lkb(); that unhold may free the lkb.
 */
static void _remove_lock(struct dlm_rsb *r, struct dlm_lkb *lkb)
{
	del_lkb(r, lkb);
	lkb->lkb_grmode = DLM_LOCK_IV;
	/* this unhold undoes the original ref from create_lkb()
	   so this leads to the lkb being freed */
	unhold_lkb(lkb);
}
switch (lkb->lkb_status) { case DLM_LKSTS_GRANTED: break; case DLM_LKSTS_CONVERT:
move_lkb(r, lkb, DLM_LKSTS_GRANTED);
rv = 1; break; case DLM_LKSTS_WAITING:
del_lkb(r, lkb);
lkb->lkb_grmode = DLM_LOCK_IV; /* this unhold undoes the original ref from create_lkb()
so this leads to the lkb being freed */
unhold_lkb(lkb);
rv = -1; break; default:
log_print("invalid status for revert %d", lkb->lkb_status);
} return rv;
}
/* called by grant_pending_locks() which means an async grant message must be sent to the requesting node in addition to granting the lock if the
lkb belongs to a remote node. */
/* The special CONVDEADLK, ALTPR and ALTCW flags allow the master to change the granted/requested modes. We're munging things accordingly in the process copy. CONVDEADLK: our grmode may have been forced down to NL to resolve a conversion deadlock ALTPR/ALTCW: our rqmode may have been changed to PR or CW to become
compatible with other granted locks */
list_for_each_entry(this, head, lkb_statequeue) { if (this == lkb) continue; if (!modes_compat(this, lkb)) return 1;
} return 0;
}
/* * "A conversion deadlock arises with a pair of lock requests in the converting * queue for one resource. The granted mode of each lock blocks the requested * mode of the other lock." * * Part 2: if the granted mode of lkb is preventing an earlier lkb in the * convert queue from being granted, then deadlk/demote lkb. * * Example: * Granted Queue: empty * Convert Queue: NL->EX (first lock) * PR->EX (second lock) * * The first lock can't be granted because of the granted mode of the second * lock and the second lock can't be granted because it's not first in the * list. We either cancel lkb's conversion (PR->EX) and return EDEADLK, or we * demote the granted mode of lkb (from PR to NL) if it has the CONVDEADLK * flag set and return DEMOTED in the lksb flags. * * Originally, this function detected conv-deadlk in a more limited scope: * - if !modes_compat(lkb1, lkb2) && !modes_compat(lkb2, lkb1), or * - if lkb1 was the first entry in the queue (not just earlier), and was * blocked by the granted mode of lkb2, and there was nothing on the * granted queue preventing lkb1 from being granted immediately, i.e. * lkb2 was the only thing preventing lkb1 from being granted. * * That second condition meant we'd only say there was conv-deadlk if * resolving it (by demotion) would lead to the first lock on the convert * queue being granted right away. It allowed conversion deadlocks to exist * between locks on the convert queue while they couldn't be granted anyway. * * Now, we detect and take action on conversion deadlocks immediately when * they're created, even if they may not be immediately consequential. If * lkb1 exists anywhere in the convert queue and lkb2 comes in with a granted * mode that would prevent lkb1's conversion from being granted, we do a * deadlk/demote on lkb2 right away and don't let it onto the convert queue. * I think this means that the lkb_is_ahead condition below should always * be zero, i.e. 
there will never be conv-deadlk between two locks that are * both already on the convert queue.
*/
if (!lkb_is_ahead) { if (!modes_compat(lkb2, lkb1)) return 1;
} else { if (!modes_compat(lkb2, lkb1) &&
!modes_compat(lkb1, lkb2)) return 1;
}
} return 0;
}
/* * Return 1 if the lock can be granted, 0 otherwise. * Also detect and resolve conversion deadlocks. * * lkb is the lock to be granted * * now is 1 if the function is being called in the context of the * immediate request, it is 0 if called later, after the lock has been * queued. * * recover is 1 if dlm_recover_grant() is trying to grant conversions * after recovery. * * References are from chapter 6 of "VAXcluster Principles" by Roy Davis
*/
staticint _can_be_granted(struct dlm_rsb *r, struct dlm_lkb *lkb, int now, int recover)
{
int8_t conv = (lkb->lkb_grmode != DLM_LOCK_IV);
/* * 6-10: Version 5.4 introduced an option to address the phenomenon of * a new request for a NL mode lock being blocked. * * 6-11: If the optional EXPEDITE flag is used with the new NL mode * request, then it would be granted. In essence, the use of this flag * tells the Lock Manager to expedite this request by not considering * what may be in the CONVERTING or WAITING queues... As of this * writing, the EXPEDITE flag can be used only with new requests for NL * mode locks. This flag is not valid for conversion requests. * * A shortcut. Earlier checks return an error if EXPEDITE is used in a * conversion or used with a non-NL requested mode. We also know an * EXPEDITE request is always granted immediately, so now must always * be 1. The full condition to grant an expedite request: (now && * !conv && lkb->rqmode == DLM_LOCK_NL && (flags & EXPEDITE)) can * therefore be shortened to just checking the flag.
 */
if (lkb->lkb_exflags & DLM_LKF_EXPEDITE) return 1;
/* * A shortcut. Without this, !queue_conflict(grantqueue, lkb) would be * added to the remaining conditions.
*/
if (queue_conflict(&r->res_grantqueue, lkb)) return 0;
/* * 6-3: By default, a conversion request is immediately granted if the * requested mode is compatible with the modes of all other granted * locks
*/
if (queue_conflict(&r->res_convertqueue, lkb)) return 0;
/* * The RECOVER_GRANT flag means dlm_recover_grant() is granting * locks for a recovered rsb, on which lkb's have been rebuilt. * The lkb's may have been rebuilt on the queues in a different * order than they were in on the previous master. So, granting * queued conversions in order after recovery doesn't make sense * since the order hasn't been preserved anyway. The new order * could also have created a new "in place" conversion deadlock. * (e.g. old, failed master held granted EX, with PR->EX, NL->EX.
--> --------------------
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.