// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * dlmast.c
 *
 * AST and BAST functionality for local and remote nodes
 *
 * Copyright (C) 2004 Oracle. All rights reserved.
 */
/* Should be called as an ast gets queued to see if the new
 * lock level will obsolete a pending bast.
 * For example, if dlm_thread queued a bast for an EX lock that
 * was blocking another EX, but before sending the bast the
 * lock owner downconverted to NL, the bast is now obsolete.
 * Only the ast should be sent.
 * This is needed because the lock and convert paths can queue
 * asts out-of-band (not waiting for dlm_thread) in order to
 * allow for LKM_NOQUEUE to get immediate responses. */
static int dlm_should_cancel_bast(struct dlm_ctxt *dlm, struct dlm_lock *lock)
{
assert_spin_locked(&dlm->ast_lock);
assert_spin_locked(&lock->spinlock);
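
	/* highest_blocked of LKM_IVMODE means no bast is currently queued
	 * for this lock, so there is nothing to cancel */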
	if (lock->ml.highest_blocked == LKM_IVMODE)
		return 0;
BUG_ON(lock->ml.highest_blocked == LKM_NLMODE);
if (lock->bast_pending &&
	    list_empty(&lock->bast_list))
		/* old bast already sent, ok */
		return 0;

	if (lock->ml.type == LKM_EXMODE)
		/* EX blocks anything left, any bast still valid */
		return 0;
	else if (lock->ml.type == LKM_NLMODE)
		/* NL blocks nothing, no reason to send any bast, cancel it */
		return 1;
	else if (lock->ml.highest_blocked != LKM_EXMODE)
		/* PR only blocks EX */
		return 1;
if (!list_empty(&lock->ast_list)) {
mlog(ML_ERROR, "%s: res %.*s, lock %u:%llu, " "AST list not empty, pending %d, newlevel %d\n",
dlm->name, res->lockname.len, res->lockname.name,
dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)),
lock->ast_pending, lock->ml.type);
BUG();
	}
	if (lock->ast_pending)
mlog(0, "%s: res %.*s, lock %u:%llu, AST getting flushed\n",
dlm->name, res->lockname.len, res->lockname.name,
dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)));
/* putting lock on list, add a ref */
dlm_lock_get(lock);
spin_lock(&lock->spinlock);
	/* check to see if this ast obsoletes the bast */
	if (dlm_should_cancel_bast(dlm, lock)) {
mlog(0, "%s: res %.*s, lock %u:%llu, Cancelling BAST\n",
dlm->name, res->lockname.len, res->lockname.name,
dlm_get_lock_cookie_node(be64_to_cpu(lock->ml.cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(lock->ml.cookie)));
lock->bast_pending = 0;
list_del_init(&lock->bast_list);
		lock->ml.highest_blocked = LKM_IVMODE;

		/* removing lock from list, remove a ref.  guaranteed
		 * this won't be the last ref because of the get above,
		 * so res->spinlock will not be taken here */
		dlm_lock_put(lock);

		/* free up the reserved bast that we are cancelling.
		 * guaranteed that this will not be the last reserved
		 * ast because *both* an ast and a bast were reserved
		 * to get to this point.  the res->spinlock will not be
		 * taken here */
dlm_lockres_release_ast(dlm, res);
}
list_add_tail(&lock->ast_list, &dlm->pending_asts);
lock->ast_pending = 1;
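
	/* dlm_thread will now find this lock on dlm->pending_asts,
	 * deliver the ast and drop the reference taken above */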
spin_unlock(&lock->spinlock);
}
/* only updates if this node masters the lockres */
	spin_lock(&res->spinlock);
	if (res->owner == dlm->node_num) {
		/* check the lksb flags for the direction */
		if (lksb->flags & DLM_LKSB_GET_LVB) {
mlog(0, "getting lvb from lockres for %s node\n",
lock->ml.node == dlm->node_num ? "master" : "remote");
memcpy(lksb->lvb, res->lvb, DLM_LVB_LEN);
		}
		/* Do nothing for lvb put requests - they should be done in
		 * place when the lock is downconverted - otherwise we risk
		 * racing gets and puts which could result in old lvb data
		 * being propagated.  We leave the put flag set and clear it
		 * here.  In the future we might want to clear it at the time
		 * the put is actually done.
		 */
}
spin_unlock(&res->spinlock);
/* reset any lvb flags on the lksb */
lksb->flags &= ~(DLM_LKSB_PUT_LVB|DLM_LKSB_GET_LVB);
}
if (locklen > DLM_LOCKID_NAME_MAX) {
ret = DLM_IVBUFLEN;
mlog(ML_ERROR, "Invalid name length (%d) in proxy ast " "handler!\n", locklen); goto leave;
}
if ((flags & (LKM_PUT_LVB|LKM_GET_LVB)) ==
(LKM_PUT_LVB|LKM_GET_LVB)) {
mlog(ML_ERROR, "Both PUT and GET lvb specified, (0x%x)\n",
flags);
		ret = DLM_BADARGS;
		goto leave;
}
	res = dlm_lookup_lockres(dlm, name, locklen);
	if (!res) {
		mlog(0, "Got %sast for unknown lockres! cookie=%u:%llu, "
		     "name=%.*s, node=%u\n", (past->type == DLM_AST ? "" : "b"),
dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
locklen, name, node);
		ret = DLM_IVLOCKID;
		goto leave;
}
/* cannot get a proxy ast message if this node owns it */
BUG_ON(res->owner == dlm->node_num);
mlog(0, "%s: res %.*s\n", dlm->name, res->lockname.len,
res->lockname.name);
	spin_lock(&res->spinlock);
	if (res->state & DLM_LOCK_RES_RECOVERING) {
mlog(0, "Responding with DLM_RECOVERING!\n");
		ret = DLM_RECOVERING;
		goto unlock_out;
	}
	if (res->state & DLM_LOCK_RES_MIGRATING) {
mlog(0, "Responding with DLM_MIGRATING!\n");
		ret = DLM_MIGRATING;
		goto unlock_out;
	}

	/* try convert queue for both ast/bast */
head = &res->converting;
lock = NULL;
	list_for_each_entry(lock, head, list) {
		if (lock->ml.cookie == cookie)
			goto do_ast;
}
	/* if not on convert, try blocked for ast, granted for bast */
	if (past->type == DLM_AST)
		head = &res->blocked;
	else
head = &res->granted;
	list_for_each_entry(lock, head, list) {
		/* if lock is found but unlock is pending ignore the bast */
		if (lock->ml.cookie == cookie) {
			if (lock->unlock_pending)
				break;
			goto do_ast;
}
}
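
	/* lock was not found on any list (or an unlock is pending),
	 * so there is nothing to deliver; respond as normal */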
ret = DLM_NORMAL;
unlock_out:
	spin_unlock(&res->spinlock);
	goto leave;
do_ast:
	ret = DLM_NORMAL;
	if (past->type == DLM_AST) {
		/* do not alter lock refcount.  switching lists. */
list_move_tail(&lock->list, &res->granted);
mlog(0, "%s: res %.*s, lock %u:%llu, Granted type %d => %d\n",
dlm->name, res->lockname.len, res->lockname.name,
dlm_get_lock_cookie_node(be64_to_cpu(cookie)),
dlm_get_lock_cookie_seq(be64_to_cpu(cookie)),
lock->ml.type, lock->ml.convert_type);
if (lock->ml.convert_type != LKM_IVMODE) {
lock->ml.type = lock->ml.convert_type;
lock->ml.convert_type = LKM_IVMODE;
		} else {
			// should already be there....
}
lock->lksb->status = DLM_NORMAL;
		/* if we requested the lvb, fetch it into our lksb now */
		if (flags & LKM_GET_LVB) {
BUG_ON(!(lock->lksb->flags & DLM_LKSB_GET_LVB));
memcpy(lock->lksb->lvb, past->lvb, DLM_LVB_LEN);
}
}
spin_unlock(&res->spinlock);
int dlm_send_proxy_ast_msg(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
			   struct dlm_lock *lock, int msg_type,
			   int blocked_type, int flags)
{
	int ret = 0;
	struct dlm_proxy_ast past;
	struct kvec vec[2];
	size_t veclen = 1;
	int status;
mlog(0, "%s: res %.*s, to %u, type %d, blocked_type %d\n", dlm->name,
res->lockname.len, res->lockname.name, lock->ml.node, msg_type,
blocked_type);