/* * A cluster of MDS (metadata server) daemons is responsible for * managing the file system namespace (the directory hierarchy and * inodes) and for coordinating shared access to storage. Metadata is * partitioned hierarchically across a number of servers, and that * partition varies over time as the cluster adjusts the distribution * in order to balance load. * * The MDS client is primarily responsible for managing synchronous * metadata requests for operations like open, unlink, and so forth. * If there is a MDS failure, we find out about it when we (possibly * request and) receive a new MDS map, and can resubmit affected * requests. * * For the most part, though, we take advantage of a lossless * communications channel to the MDS, and do not need to worry about * timing out or resubmitting requests. * * We maintain a stateful "session" with each MDS we interact with. * Within each session, we send periodic heartbeat messages to ensure * any capabilities or leases we have been issued remain valid. If * the session times out and goes stale, our leases and capabilities * are no longer valid.
*/
/* * parse a normal reply, which may contain a (dir+)dentry and/or a * target inode.
 *
 * NOTE(review): this fragment looks garbled -- after the is_dentry
 * branch it continues with readdir dentry-name decryption code that
 * uses variables never declared here (fname, tname, oname, rde, cl,
 * inode, _name, altname, i, num), presumably spliced in from
 * parse_reply_info_readdir by the extraction.  The out_bad label and
 * the closing brace are also missing.  Verify against the original
 * source before relying on any of this.
*/ staticint parse_reply_info_trace(void **p, void *end, struct ceph_mds_reply_info_parsed *info,
u64 features)
{ int err;
if (info->head->is_dentry) {
/* decode the (dir) inode record that precedes the dentry */
err = parse_reply_info_in(p, end, &info->diri, features); if (err < 0) goto out_bad;
/* * Try to decrypt the dentry names and update them * in the ceph_mds_reply_dir_entry struct.
*/
fname.dir = inode;
fname.name = _name;
fname.name_len = _name_len;
fname.ctext = altname;
fname.ctext_len = altname_len; /* * The _name_len maybe larger than altname_len, such as * when the human readable name length is in range of * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE), * then the copy in ceph_fname_to_usr will corrupt the * data if there has no encryption key. * * Just set the no_copy flag and then if there has no * encryption key the oname.name will be assigned to * _name always.
*/
fname.no_copy = true; if (altname_len == 0) { /* * Set tname to _name, and this will be used * to do the base64_decode in-place. It's * safe because the decoded string should * always be shorter, which is 3/4 of origin * string.
*/
tname.name = _name;
/* * Set oname to _name too, and this will be * used to do the decryption in-place.
*/
oname.name = _name;
oname.len = _name_len;
} else { /* * This will do the decryption only in-place * from altname cryptext directly.
*/
oname.name = altname;
oname.len = altname_len;
}
rde->is_nokey = false;
err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey); if (err) {
pr_err_client(cl, "unable to decode %.*s, got %d\n",
_name_len, _name, err); goto out_bad;
}
/* publish the (possibly decrypted) name for this dir entry */
rde->name = oname.name;
rde->name_len = oname.len;
/* inode */
err = parse_reply_info_in(p, end, &rde->inode, features); if (err < 0) goto out_bad; /* ceph_readdir_prepopulate() will update it */
rde->offset = 0;
i++;
num--;
}
done: /* Skip over any unrecognized fields */
*p = end; return 0;
/* * In async unlink case the kclient won't wait for the first reply * from MDS and just drop all the links and unhash the dentry and then * succeeds immediately. * * For any new create/link/rename,etc requests followed by using the * same file names we must wait for the first reply of the inflight * unlink request, or the MDS possibly will fail these following * requests with -EEXIST if the inflight async unlink request was * delayed for some reasons. * * And the worst case is that for the none async openc request it will * successfully open the file if the CDentry hasn't been unlinked yet, * but later the previous delayed async unlink request will remove the * CDentry. That means the just created file is possibly deleted later * by accident. * * We need to wait for the inflight async unlink requests to finish * when creating new files/directories by using the same file names.
 *
 * NOTE(review): only the local-variable declarations of this function
 * survive here -- the body is missing from this dump.  Do not treat
 * this fragment as the full implementation.
*/ int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{ struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb); struct ceph_client *cl = fsc->client; struct dentry *pdentry = dentry->d_parent; struct dentry *udentry, *found = NULL; struct ceph_dentry_info *di; struct qstr dname;
u32 hash = dentry->d_name.hash; int err;
/* * sessions
*/ constchar *ceph_session_state_name(int s)
{ switch (s) { case CEPH_MDS_SESSION_NEW: return"new"; case CEPH_MDS_SESSION_OPENING: return"opening"; case CEPH_MDS_SESSION_OPEN: return"open"; case CEPH_MDS_SESSION_HUNG: return"hung"; case CEPH_MDS_SESSION_CLOSING: return"closing"; case CEPH_MDS_SESSION_CLOSED: return"closed"; case CEPH_MDS_SESSION_RESTARTING: return"restarting"; case CEPH_MDS_SESSION_RECONNECTING: return"reconnecting"; case CEPH_MDS_SESSION_REJECTED: return"rejected"; default: return"???";
}
}
/* * create+register a new session for given mds. * called under mdsc->mutex.
 *
 * NOTE(review): truncated fragment.  The `goto fail_realloc` target,
 * the session initialization, and the function's closing brace are
 * missing from this dump; only the allocation and the sessions-array
 * growth survive.  Verify against the original source.
*/ staticstruct ceph_mds_session *register_session(struct ceph_mds_client *mdsc, int mds)
{ struct ceph_client *cl = mdsc->fsc->client; struct ceph_mds_session *s;
/* refuse new sessions once I/O has been fenced on this mount */
if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) return ERR_PTR(-EIO);
if (mds >= mdsc->mdsmap->possible_max_rank) return ERR_PTR(-EINVAL);
s = kzalloc(sizeof(*s), GFP_NOFS); if (!s) return ERR_PTR(-ENOMEM);
/* grow the sessions pointer array to the next power of two */
if (mds >= mdsc->max_sessions) { int newmax = 1 << get_count_order(mds + 1); struct ceph_mds_session **sa;
doutc(cl, "realloc to %d\n", newmax);
sa = kcalloc(newmax, sizeof(void *), GFP_NOFS); if (!sa) goto fail_realloc; if (mdsc->sessions) {
memcpy(sa, mdsc->sessions,
mdsc->max_sessions * sizeof(void *));
kfree(mdsc->sessions);
}
mdsc->sessions = sa;
mdsc->max_sessions = newmax;
}
/*
 * Release the request's reference to its MDS session, if any.
 *
 * Caller must hold the last request ref, or mdsc->mutex.
 */
static void put_request_session(struct ceph_mds_request *req)
{
	struct ceph_mds_session *s = req->r_session;

	if (!s)
		return;
	req->r_session = NULL;
	ceph_put_mds_session(s);
}
/*
 * Final teardown of an MDS request, invoked via kref when the last
 * reference is dropped: release dir caps, reply info, messages,
 * pinned inodes/dentries, paths, creds and the caps reservation,
 * then free the request itself.  The release order below matters
 * (e.g. cap refs dropped before the corresponding iput); keep it.
 */
void ceph_mdsc_release_request(struct kref *kref)
{ struct ceph_mds_request *req = container_of(kref, struct ceph_mds_request,
r_kref);
ceph_mdsc_release_dir_caps_async(req);
destroy_reply_info(&req->r_reply_info); if (req->r_request)
ceph_msg_put(req->r_request); if (req->r_reply)
ceph_msg_put(req->r_reply); if (req->r_inode) {
/* drop the CEPH_CAP_PIN taken when the inode was attached */
ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
iput(req->r_inode);
} if (req->r_parent) {
ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
iput(req->r_parent);
}
/* iput() tolerates NULL, so these need no guards */
iput(req->r_target_inode);
iput(req->r_new_inode); if (req->r_dentry)
dput(req->r_dentry); if (req->r_old_dentry)
dput(req->r_old_dentry); if (req->r_old_dentry_dir) { /* * track (and drop pins for) r_old_dentry_dir * separately, since r_old_dentry's d_parent may have * changed between the dir mutex being dropped and * this request being freed.
*/
ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
CEPH_CAP_PIN);
iput(req->r_old_dentry_dir);
}
kfree(req->r_path1);
kfree(req->r_path2);
put_cred(req->r_cred); if (req->r_mnt_idmap)
mnt_idmap_put(req->r_mnt_idmap); if (req->r_pagelist)
ceph_pagelist_release(req->r_pagelist);
kfree(req->r_fscrypt_auth);
kfree(req->r_altname);
put_request_session(req);
ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
/* nobody should still be waiting on a request being destroyed */
WARN_ON_ONCE(!list_empty(&req->r_wait));
kmem_cache_free(ceph_mds_request_cachep, req);
}
/* * lookup session, bump ref if found. * * called under mdsc->mutex.
*/ staticstruct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{ struct ceph_mds_request *req;
req = lookup_request(&mdsc->request_tree, tid); if (req)
ceph_mdsc_get_request(req);
return req;
}
/* * Register an in-flight request, and assign a tid. Link to directory * we are modifying (if any). * * Called under mdsc->mutex.
 *
 * NOTE(review): this fragment appears to fuse __register_request with
 * the tail of __unregister_request -- the r_unsafe_dir iput,
 * complete_all(&req->r_safe_completion) and ceph_mdsc_put_request()
 * at the end are unregister-side operations, and the `dir` parameter
 * is never used here.  The dump likely dropped the lines between the
 * two functions; verify against the original source.
*/ staticvoid __register_request(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, struct inode *dir)
{ struct ceph_client *cl = mdsc->fsc->client; int ret = 0;
req->r_tid = ++mdsc->last_tid; if (req->r_num_caps) {
/* reserve caps up front so the reply handler cannot fail */
ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
req->r_num_caps); if (ret < 0) {
pr_err_client(cl, "%p failed to reserve caps: %d\n",
req, ret); /* set req->r_err to fail early from __do_request */
req->r_err = ret; return;
}
}
doutc(cl, "%p tid %lld\n", req, req->r_tid);
ceph_mdsc_get_request(req);
insert_request(&mdsc->request_tree, req);
req->r_cred = get_current_cred(); if (!req->r_mnt_idmap)
req->r_mnt_idmap = &nop_mnt_idmap;
if (req->r_unsafe_dir) {
iput(req->r_unsafe_dir);
req->r_unsafe_dir = NULL;
}
complete_all(&req->r_safe_completion);
ceph_mdsc_put_request(req);
}
/* * Walk back up the dentry tree until we hit a dentry representing a * non-snapshot inode. We do this using the rcu_read_lock (which must be held * when calling this) to ensure that the objects won't disappear while we're * working with them. Once we hit a candidate dentry, we attempt to take a * reference to it, and return that as the result.
*/ staticstruct inode *get_nonsnap_parent(struct dentry *dentry)
{ struct inode *inode = NULL;
while (dentry && !IS_ROOT(dentry)) {
inode = d_inode_rcu(dentry); if (!inode || ceph_snap(inode) == CEPH_NOSNAP) break;
dentry = dentry->d_parent;
} if (inode)
inode = igrab(inode); return inode;
}
/* * Choose mds to send request to next. If there is a hint set in the * request (e.g., due to a prior forward hint from the mds), use that. * Otherwise, consult frag tree and/or caps to identify the * appropriate mds. If all else fails, choose randomly. * * Called under mdsc->mutex.
 *
 * NOTE(review): truncated fragment.  `ci` is used (ceph_choose_frag,
 * i_ceph_lock, i_auth_cap) but its assignment -- presumably
 * ci = ceph_inode(inode) after the inode selection -- is missing, as
 * are the `random:` label targeted by several gotos and the closing
 * brace.  Verify against the original source.
*/ staticint __choose_mds(struct ceph_mds_client *mdsc, struct ceph_mds_request *req, bool *random)
{ struct inode *inode; struct ceph_inode_info *ci; struct ceph_cap *cap; int mode = req->r_direct_mode; int mds = -1;
u32 hash = req->r_direct_hash; bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags); struct ceph_client *cl = mdsc->fsc->client;
if (random)
*random = false;
/* * is there a specific mds we should try? ignore hint if we have * no session and the mds is not up (active or recovering).
*/ if (req->r_resend_mds >= 0 &&
(__have_session(mdsc, req->r_resend_mds) ||
ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds); return req->r_resend_mds;
}
if (mode == USE_RANDOM_MDS) goto random;
/* pick the inode whose caps/frags will guide mds selection */
inode = NULL; if (req->r_inode) { if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
inode = req->r_inode;
ihold(inode);
} else { /* req->r_dentry is non-null for LSSNAP request */
rcu_read_lock();
inode = get_nonsnap_parent(req->r_dentry);
rcu_read_unlock();
doutc(cl, "using snapdir's parent %p %llx.%llx\n",
inode, ceph_vinop(inode));
}
} elseif (req->r_dentry) { /* ignore race with rename; old or new d_parent is okay */ struct dentry *parent; struct inode *dir;
rcu_read_lock();
parent = READ_ONCE(req->r_dentry->d_parent);
dir = req->r_parent ? : d_inode_rcu(parent);
if (!dir || dir->i_sb != mdsc->fsc->sb) { /* not this fs or parent went negative */
inode = d_inode(req->r_dentry); if (inode)
ihold(inode);
} elseif (ceph_snap(dir) != CEPH_NOSNAP) { /* direct snapped/virtual snapdir requests
* based on parent dir inode */
inode = get_nonsnap_parent(parent);
doutc(cl, "using nonsnap parent %p %llx.%llx\n",
inode, ceph_vinop(inode));
} else { /* dentry target */
inode = d_inode(req->r_dentry); if (!inode || mode == USE_AUTH_MDS) { /* dir + name */
inode = igrab(dir);
hash = ceph_dentry_hash(dir, req->r_dentry);
is_hash = true;
} else {
ihold(inode);
}
}
rcu_read_unlock();
}
/* for hashed dir operations, consult the fragment tree */
if (is_hash && S_ISDIR(inode->i_mode)) { struct ceph_inode_frag frag; int found;
ceph_choose_frag(ci, hash, &frag, &found); if (found) { if (mode == USE_ANY_MDS && frag.ndist > 0) {
u8 r;
/* choose a random replica */
get_random_bytes(&r, 1);
r %= frag.ndist;
mds = frag.dist[r];
doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
inode, ceph_vinop(inode), frag.frag,
mds, (int)r, frag.ndist); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
CEPH_MDS_STATE_ACTIVE &&
!ceph_mdsmap_is_laggy(mdsc->mdsmap, mds)) goto out;
}
/* since this file/dir wasn't known to be * replicated, then we want to look for the
* authoritative mds. */ if (frag.mds >= 0) { /* choose auth mds */
mds = frag.mds;
doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
inode, ceph_vinop(inode), frag.frag, mds); if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
CEPH_MDS_STATE_ACTIVE) { if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
mds)) goto out;
}
}
mode = USE_AUTH_MDS;
}
}
/* fall back to whichever mds holds a cap on the inode */
spin_lock(&ci->i_ceph_lock);
cap = NULL; if (mode == USE_AUTH_MDS)
cap = ci->i_auth_cap; if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node); if (!cap) {
spin_unlock(&ci->i_ceph_lock);
iput(inode); goto random;
}
mds = cap->session->s_mds;
doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
ceph_vinop(inode), mds,
cap == ci->i_auth_cap ? "auth " : "", cap);
spin_unlock(&ci->i_ceph_lock);
out:
iput(inode); return mds;
/*
 * NOTE(review): orphaned fragment -- the enclosing function's header
 * and declarations (msg, h, p, end, cl, op, seq, extra_bytes,
 * metadata, metadata_key_count, i, ret, mdsc) are missing from this
 * dump.  The code builds a CEPH_MSG_CLIENT_SESSION message (v7
 * wire format) with client metadata, supported features, metric spec,
 * flags, mds auth caps and oldest_client_tid.  Verify against the
 * original source.
 */
/* Allocate the message */
msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
GFP_NOFS, false); if (!msg) {
pr_err_client(cl, "ENOMEM creating session open msg\n"); return ERR_PTR(-ENOMEM);
}
p = msg->front.iov_base;
end = p + msg->front.iov_len;
h = p;
h->op = cpu_to_le32(op);
h->seq = cpu_to_le64(seq);
/* * Serialize client metadata into waiting buffer space, using * the format that userspace expects for map<string, string> * * ClientSession messages with metadata are v7
*/
msg->hdr.version = cpu_to_le16(7);
msg->hdr.compat_version = cpu_to_le16(1);
/* The write pointer, following the session_head structure */
p += sizeof(*h);
/* Number of entries in the map */
ceph_encode_32(&p, metadata_key_count);
/* Two length-prefixed strings for each entry in the map */ for (i = 0; metadata[i][0]; ++i) {
size_t const key_len = strlen(metadata[i][0]);
size_t const val_len = strlen(metadata[i][1]);
ceph_encode_32(&p, key_len);
memcpy(p, metadata[i][0], key_len);
p += key_len;
ceph_encode_32(&p, val_len);
memcpy(p, metadata[i][1], val_len);
p += val_len;
}
ret = encode_supported_features(&p, end); if (ret) {
pr_err_client(cl, "encode_supported_features failed!\n");
ceph_msg_put(msg); return ERR_PTR(ret);
}
ret = encode_metric_spec(&p, end); if (ret) {
pr_err_client(cl, "encode_metric_spec failed!\n");
ceph_msg_put(msg); return ERR_PTR(ret);
}
/* version == 5, flags */
ceph_encode_32(&p, 0);
/* version == 6, mds auth caps */
ceph_encode_32(&p, 0);
/* version == 7, oldest_client_tid */
ceph_encode_64(&p, mdsc->oldest_tid);
/* trim the front to what was actually encoded */
msg->front.iov_len = p - msg->front.iov_base;
msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
return msg;
}
/* * send session open request. * * called under mdsc->mutex
 *
 * NOTE(review): truncated fragment -- the message construction/send
 * and the closing brace are missing from this dump.
*/ staticint __open_session(struct ceph_mds_client *mdsc, struct ceph_mds_session *session)
{ struct ceph_msg *msg; int mstate; int mds = session->s_mds;
/* refuse to open sessions once I/O has been fenced */
if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO) return -EIO;
/* wait for mds to go active? */
mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
ceph_mds_state_name(mstate));
session->s_state = CEPH_MDS_SESSION_OPENING;
session->s_renew_requested = jiffies;
/* * open sessions for any export targets for the given mds * * called under mdsc->mutex
 *
 * NOTE(review): this fragment appears to fuse
 * __open_export_target_session with the body of
 * cleanup_session_requests -- from the mutex_lock onward the code
 * drops unsafe requests and zeroes r_attempts, referencing `req`,
 * `p` and `cl` that are never declared here.  The dump likely lost
 * the lines between the two functions; verify against the original.
*/ staticstruct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{ struct ceph_mds_session *session; int ret;
session = __ceph_lookup_mds_session(mdsc, target); if (!session) {
session = register_session(mdsc, target); if (IS_ERR(session)) return session;
} if (session->s_state == CEPH_MDS_SESSION_NEW ||
session->s_state == CEPH_MDS_SESSION_CLOSING) {
ret = __open_session(mdsc, session); if (ret) return ERR_PTR(ret);
}
doutc(cl, "mds%d\n", session->s_mds);
/* drop any unsafe requests still attached to this session */
mutex_lock(&mdsc->mutex); while (!list_empty(&session->s_unsafe)) {
req = list_first_entry(&session->s_unsafe, struct ceph_mds_request, r_unsafe_item);
pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
req->r_tid); if (req->r_target_inode)
mapping_set_error(req->r_target_inode->i_mapping, -EIO); if (req->r_unsafe_dir)
mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
__unregister_request(mdsc, req);
} /* zero r_attempts, so kick_requests() will re-send requests */
p = rb_first(&mdsc->request_tree); while (p) {
req = rb_entry(p, struct ceph_mds_request, r_node);
p = rb_next(p); if (req->r_session &&
req->r_session->s_mds == session->s_mds)
req->r_attempts = 0;
}
mutex_unlock(&mdsc->mutex);
}
/* * Helper to safely iterate over all caps associated with a session, with * special care taken to handle a racing __ceph_remove_cap(). * * Caller must hold session s_mutex.
 *
 * NOTE(review): this fragment appears to fuse ceph_iterate_session_caps
 * with pieces of remove_session_caps_cb / remove_session_caps -- the
 * `return ret;` and closing brace of the iterator are missing, and the
 * code after the final ceph_put_cap references `ci`, `mds`, `sb` and
 * `dispose` that are never declared here.  Verify against the original
 * source.
*/ int ceph_iterate_session_caps(struct ceph_mds_session *session, int (*cb)(struct inode *, int mds, void *), void *arg)
{ struct ceph_client *cl = session->s_mdsc->fsc->client; struct list_head *p; struct ceph_cap *cap; struct inode *inode, *last_inode = NULL; struct ceph_cap *old_cap = NULL; int ret;
doutc(cl, "%p mds%d\n", session, session->s_mds);
spin_lock(&session->s_cap_lock);
p = session->s_caps.next; while (p != &session->s_caps) { int mds;
cap = list_entry(p, struct ceph_cap, session_caps);
/* skip caps whose inode is going away */
inode = igrab(&cap->ci->netfs.inode); if (!inode) {
p = p->next; continue;
}
/* mark our position so a racing removal can find us */
session->s_cap_iterator = cap;
mds = cap->mds;
spin_unlock(&session->s_cap_lock);
/* drop references from the previous iteration, outside the lock */
if (last_inode) {
iput(last_inode);
last_inode = NULL;
} if (old_cap) {
ceph_put_cap(session->s_mdsc, old_cap);
old_cap = NULL;
}
ret = cb(inode, mds, arg);
last_inode = inode;
spin_lock(&session->s_cap_lock);
p = p->next; if (!cap->ci) {
/* cap was removed while we held it as iterator; finish it */
doutc(cl, "finishing cap %p removal\n", cap);
BUG_ON(cap->session != session);
cap->session = NULL;
list_del_init(&cap->session_caps);
session->s_nr_caps--;
atomic64_dec(&session->s_mdsc->metric.total_caps); if (cap->queue_release)
__ceph_queue_cap_release(session, cap); else
old_cap = cap; /* put_cap it w/o locks held */
} if (ret < 0) goto out;
}
ret = 0;
out:
session->s_cap_iterator = NULL;
spin_unlock(&session->s_cap_lock);
iput(last_inode); if (old_cap)
ceph_put_cap(session->s_mdsc, old_cap);
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds); if (cap) {
doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
cap, ci, &ci->netfs.inode);
spin_lock(&session->s_cap_lock); if (session->s_nr_caps > 0) { struct inode *inode; struct ceph_cap *cap, *prev = NULL; struct ceph_vino vino; /* * iterate_session_caps() skips inodes that are being * deleted, we need to wait until deletions are complete. * __wait_on_freeing_inode() is designed for the job, * but it is not exported, so use lookup inode function * to access it.
*/ while (!list_empty(&session->s_caps)) {
cap = list_entry(session->s_caps.next, struct ceph_cap, session_caps); if (cap == prev) break;
prev = cap;
vino = cap->ci->i_vino;
spin_unlock(&session->s_cap_lock);
inode = ceph_find_inode(sb, vino);
iput(inode);
spin_lock(&session->s_cap_lock);
}
}
// drop cap expires and unlock s_cap_lock
detach_cap_releases(session, &dispose);
/* * wake up any threads waiting on this session's caps. if the cap is * old (didn't get renewed on the client reconnect), remove it now. * * caller must hold s_mutex.
 *
 * NOTE(review): truncated fragment -- only the declarations survive;
 * the body and closing brace are missing from this dump.
*/ staticint wake_up_session_cb(struct inode *inode, int mds, void *arg)
{ struct ceph_inode_info *ci = ceph_inode(inode); unsignedlong ev = (unsignedlong)arg;
/* * Send periodic message to MDS renewing all currently held caps. The * ack will reset the expiration for all caps from this session. * * caller holds s_mutex
 *
 * NOTE(review): truncated fragment -- the renew message construction
 * and send, and the closing brace, are missing from this dump.
*/ staticint send_renew_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session)
{ struct ceph_client *cl = mdsc->fsc->client; struct ceph_msg *msg; int state;
/* do not try to renew caps until a recovering mds has reconnected
* with its clients. */
state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds); if (state < CEPH_MDS_STATE_RECONNECT) {
doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
ceph_mds_state_name(state)); return 0;
}
/* * Note new cap ttl, and any transition from stale -> not stale (fresh?). * * Called under session->s_mutex
 *
 * NOTE(review): fused fragment -- after the declarations of
 * renewed_caps() the dump jumps into the tail of a different function
 * (apparently drop_negative_children: `all_negative`, `dentry` and
 * the `out:` label are never declared here).  Verify against the
 * original source.
*/ staticvoid renewed_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int is_renew)
{ struct ceph_client *cl = mdsc->fsc->client; int was_stale; int wake = 0;
if (all_negative)
shrink_dcache_parent(dentry);
out: return all_negative;
}
/* * Trim old(er) caps. * * Because we can't cache an inode without one or more caps, we do * this indirectly: if a cap is unused, we prune its aliases, at which * point the inode will hopefully get dropped to. * * Yes, this is a bit sloppy. Our only real goal here is to respond to * memory pressure from the MDS, though, so it needn't be perfect.
 *
 * Per-cap callback for ceph_iterate_session_caps(); *arg is the
 * remaining number of caps to trim.  Returns -1 to stop iteration
 * once the quota is reached, 0 otherwise.
*/ staticint trim_caps_cb(struct inode *inode, int mds, void *arg)
{ struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb); struct ceph_client *cl = mdsc->fsc->client; int *remaining = arg; struct ceph_inode_info *ci = ceph_inode(inode); int used, wanted, oissued, mine; struct ceph_cap *cap;
if (*remaining <= 0) return -1;
spin_lock(&ci->i_ceph_lock);
cap = __get_cap_for_mds(ci, mds); if (!cap) {
spin_unlock(&ci->i_ceph_lock); return 0;
}
mine = cap->issued | cap->implemented;
used = __ceph_caps_used(ci);
wanted = __ceph_caps_file_wanted(ci);
oissued = __ceph_caps_issued_other(ci, cap);
doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
ceph_cap_string(oissued), ceph_cap_string(used),
/* never drop the auth cap while dirty/flushing/snap/write state exists */
ceph_cap_string(wanted)); if (cap == ci->i_auth_cap) { if (ci->i_dirty_caps || ci->i_flushing_caps ||
!list_empty(&ci->i_cap_snaps)) goto out; if ((used | wanted) & CEPH_CAP_ANY_WR) goto out; /* Note: it's possible that i_filelock_ref becomes non-zero * after dropping auth caps. It doesn't hurt because reply
* of lock mds request will re-add auth caps. */ if (atomic_read(&ci->i_filelock_ref) > 0) goto out;
} /* The inode has cached pages, but it's no longer used.
* we can safely drop it */ if (S_ISREG(inode->i_mode) &&
wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
!(oissued & CEPH_CAP_FILE_CACHE)) {
used = 0;
oissued = 0;
} if ((used | wanted) & ~oissued & mine) goto out; /* we need these caps */
if (oissued) { /* we aren't the only cap.. just remove us */
ceph_remove_cap(mdsc, cap, true);
(*remaining)--;
} else { struct dentry *dentry; /* try dropping referring dentries */
/* must drop i_ceph_lock before touching the dcache */
spin_unlock(&ci->i_ceph_lock);
dentry = d_find_any_alias(inode); if (dentry && drop_negative_children(dentry)) { int count;
dput(dentry);
d_prune_aliases(inode);
/* i_count == 1 means only our igrab ref remains; it will drop */
count = atomic_read(&inode->i_count); if (count == 1)
(*remaining)--;
doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
inode, ceph_vinop(inode), cap, count);
} else {
dput(dentry);
} return 0;
}
out:
spin_unlock(&ci->i_ceph_lock); return 0;
}
/* * Trim session cap count down to some max number.
 *
 * NOTE(review): truncated fragment -- the iteration over session caps
 * (via trim_caps_cb), the remainder of the body and the closing brace
 * are missing from this dump; the text that follows in the file is
 * unrelated extraction residue.
*/ int ceph_trim_caps(struct ceph_mds_client *mdsc, struct ceph_mds_session *session, int max_caps)
{ struct ceph_client *cl = mdsc->fsc->client; int trim_caps = session->s_nr_caps - max_caps;
doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
session->s_nr_caps, max_caps, trim_caps); if (trim_caps > 0) { int remaining = trim_caps;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.