Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Linux/fs/ceph/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 170 kB image not shown  

Quelle  mds_client.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/fs.h>
#include <linux/wait.h>
#include <linux/slab.h>
#include <linux/gfp.h>
#include <linux/sched.h>
#include <linux/debugfs.h>
#include <linux/seq_file.h>
#include <linux/ratelimit.h>
#include <linux/bits.h>
#include <linux/ktime.h>
#include <linux/bitmap.h>
#include <linux/mnt_idmapping.h>

#include "super.h"
#include "mds_client.h"
#include "crypto.h"

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/debugfs.h>

#define RECONNECT_MAX_SIZE (INT_MAX - PAGE_SIZE)

/*
 * A cluster of MDS (metadata server) daemons is responsible for
 * managing the file system namespace (the directory hierarchy and
 * inodes) and for coordinating shared access to storage.  Metadata is
 * partitioning hierarchically across a number of servers, and that
 * partition varies over time as the cluster adjusts the distribution
 * in order to balance load.
 *
 * The MDS client is primarily responsible to managing synchronous
 * metadata requests for operations like open, unlink, and so forth.
 * If there is a MDS failure, we find out about it when we (possibly
 * request and) receive a new MDS map, and can resubmit affected
 * requests.
 *
 * For the most part, though, we take advantage of a lossless
 * communications channel to the MDS, and do not need to worry about
 * timing out or resubmitting requests.
 *
 * We maintain a stateful "session" with each MDS we interact with.
 * Within each session, we sent periodic heartbeat messages to ensure
 * any capabilities or leases we have been issues remain valid.  If
 * the session times out and goes stale, our leases and capabilities
 * are no longer valid.
 */


struct ceph_reconnect_state {
 struct ceph_mds_session *session;
 int nr_caps, nr_realms;
 struct ceph_pagelist *pagelist;
 unsigned msg_version;
 bool allow_multi;
};

static void __wake_requests(struct ceph_mds_client *mdsc,
       struct list_head *head);
static void ceph_cap_release_work(struct work_struct *work);
static void ceph_cap_reclaim_work(struct work_struct *work);

static const struct ceph_connection_operations mds_con_ops;


/*
 * mds reply parsing
 */


static int parse_reply_info_quota(void **p, void *end,
      struct ceph_mds_reply_info_in *info)
{
 u8 struct_v, struct_compat;
 u32 struct_len;

 ceph_decode_8_safe(p, end, struct_v, bad);
 ceph_decode_8_safe(p, end, struct_compat, bad);
 /* struct_v is expected to be >= 1. we only
 * understand encoding with struct_compat == 1. */

 if (!struct_v || struct_compat != 1)
  goto bad;
 ceph_decode_32_safe(p, end, struct_len, bad);
 ceph_decode_need(p, end, struct_len, bad);
 end = *p + struct_len;
 ceph_decode_64_safe(p, end, info->max_bytes, bad);
 ceph_decode_64_safe(p, end, info->max_files, bad);
 *p = end;
 return 0;
bad:
 return -EIO;
}

/*
 * parse individual inode info
 */

static int parse_reply_info_in(void **p, void *end,
          struct ceph_mds_reply_info_in *info,
          u64 features)
{
 int err = 0;
 u8 struct_v = 0;

 if (features == (u64)-1) {
  u32 struct_len;
  u8 struct_compat;
  ceph_decode_8_safe(p, end, struct_v, bad);
  ceph_decode_8_safe(p, end, struct_compat, bad);
  /* struct_v is expected to be >= 1. we only understand
 * encoding with struct_compat == 1. */

  if (!struct_v || struct_compat != 1)
   goto bad;
  ceph_decode_32_safe(p, end, struct_len, bad);
  ceph_decode_need(p, end, struct_len, bad);
  end = *p + struct_len;
 }

 ceph_decode_need(p, end, sizeof(struct ceph_mds_reply_inode), bad);
 info->in = *p;
 *p += sizeof(struct ceph_mds_reply_inode) +
  sizeof(*info->in->fragtree.splits) *
  le32_to_cpu(info->in->fragtree.nsplits);

 ceph_decode_32_safe(p, end, info->symlink_len, bad);
 ceph_decode_need(p, end, info->symlink_len, bad);
 info->symlink = *p;
 *p += info->symlink_len;

 ceph_decode_copy_safe(p, end, &info->dir_layout,
         sizeof(info->dir_layout), bad);
 ceph_decode_32_safe(p, end, info->xattr_len, bad);
 ceph_decode_need(p, end, info->xattr_len, bad);
 info->xattr_data = *p;
 *p += info->xattr_len;

 if (features == (u64)-1) {
  /* inline data */
  ceph_decode_64_safe(p, end, info->inline_version, bad);
  ceph_decode_32_safe(p, end, info->inline_len, bad);
  ceph_decode_need(p, end, info->inline_len, bad);
  info->inline_data = *p;
  *p += info->inline_len;
  /* quota */
  err = parse_reply_info_quota(p, end, info);
  if (err < 0)
   goto out_bad;
  /* pool namespace */
  ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
  if (info->pool_ns_len > 0) {
   ceph_decode_need(p, end, info->pool_ns_len, bad);
   info->pool_ns_data = *p;
   *p += info->pool_ns_len;
  }

  /* btime */
  ceph_decode_need(p, end, sizeof(info->btime), bad);
  ceph_decode_copy(p, &info->btime, sizeof(info->btime));

  /* change attribute */
  ceph_decode_64_safe(p, end, info->change_attr, bad);

  /* dir pin */
  if (struct_v >= 2) {
   ceph_decode_32_safe(p, end, info->dir_pin, bad);
  } else {
   info->dir_pin = -ENODATA;
  }

  /* snapshot birth time, remains zero for v<=2 */
  if (struct_v >= 3) {
   ceph_decode_need(p, end, sizeof(info->snap_btime), bad);
   ceph_decode_copy(p, &info->snap_btime,
      sizeof(info->snap_btime));
  } else {
   memset(&info->snap_btime, 0, sizeof(info->snap_btime));
  }

  /* snapshot count, remains zero for v<=3 */
  if (struct_v >= 4) {
   ceph_decode_64_safe(p, end, info->rsnaps, bad);
  } else {
   info->rsnaps = 0;
  }

  if (struct_v >= 5) {
   u32 alen;

   ceph_decode_32_safe(p, end, alen, bad);

   while (alen--) {
    u32 len;

    /* key */
    ceph_decode_32_safe(p, end, len, bad);
    ceph_decode_skip_n(p, end, len, bad);
    /* value */
    ceph_decode_32_safe(p, end, len, bad);
    ceph_decode_skip_n(p, end, len, bad);
   }
  }

  /* fscrypt flag -- ignore */
  if (struct_v >= 6)
   ceph_decode_skip_8(p, end, bad);

  info->fscrypt_auth = NULL;
  info->fscrypt_auth_len = 0;
  info->fscrypt_file = NULL;
  info->fscrypt_file_len = 0;
  if (struct_v >= 7) {
   ceph_decode_32_safe(p, end, info->fscrypt_auth_len, bad);
   if (info->fscrypt_auth_len) {
    info->fscrypt_auth = kmalloc(info->fscrypt_auth_len,
            GFP_KERNEL);
    if (!info->fscrypt_auth)
     return -ENOMEM;
    ceph_decode_copy_safe(p, end, info->fscrypt_auth,
            info->fscrypt_auth_len, bad);
   }
   ceph_decode_32_safe(p, end, info->fscrypt_file_len, bad);
   if (info->fscrypt_file_len) {
    info->fscrypt_file = kmalloc(info->fscrypt_file_len,
            GFP_KERNEL);
    if (!info->fscrypt_file)
     return -ENOMEM;
    ceph_decode_copy_safe(p, end, info->fscrypt_file,
            info->fscrypt_file_len, bad);
   }
  }
  *p = end;
 } else {
  /* legacy (unversioned) struct */
  if (features & CEPH_FEATURE_MDS_INLINE_DATA) {
   ceph_decode_64_safe(p, end, info->inline_version, bad);
   ceph_decode_32_safe(p, end, info->inline_len, bad);
   ceph_decode_need(p, end, info->inline_len, bad);
   info->inline_data = *p;
   *p += info->inline_len;
  } else
   info->inline_version = CEPH_INLINE_NONE;

  if (features & CEPH_FEATURE_MDS_QUOTA) {
   err = parse_reply_info_quota(p, end, info);
   if (err < 0)
    goto out_bad;
  } else {
   info->max_bytes = 0;
   info->max_files = 0;
  }

  info->pool_ns_len = 0;
  info->pool_ns_data = NULL;
  if (features & CEPH_FEATURE_FS_FILE_LAYOUT_V2) {
   ceph_decode_32_safe(p, end, info->pool_ns_len, bad);
   if (info->pool_ns_len > 0) {
    ceph_decode_need(p, end, info->pool_ns_len, bad);
    info->pool_ns_data = *p;
    *p += info->pool_ns_len;
   }
  }

  if (features & CEPH_FEATURE_FS_BTIME) {
   ceph_decode_need(p, end, sizeof(info->btime), bad);
   ceph_decode_copy(p, &info->btime, sizeof(info->btime));
   ceph_decode_64_safe(p, end, info->change_attr, bad);
  }

  info->dir_pin = -ENODATA;
  /* info->snap_btime and info->rsnaps remain zero */
 }
 return 0;
bad:
 err = -EIO;
out_bad:
 return err;
}

static int parse_reply_info_dir(void **p, void *end,
    struct ceph_mds_reply_dirfrag **dirfrag,
    u64 features)
{
 if (features == (u64)-1) {
  u8 struct_v, struct_compat;
  u32 struct_len;
  ceph_decode_8_safe(p, end, struct_v, bad);
  ceph_decode_8_safe(p, end, struct_compat, bad);
  /* struct_v is expected to be >= 1. we only understand
 * encoding whose struct_compat == 1. */

  if (!struct_v || struct_compat != 1)
   goto bad;
  ceph_decode_32_safe(p, end, struct_len, bad);
  ceph_decode_need(p, end, struct_len, bad);
  end = *p + struct_len;
 }

 ceph_decode_need(p, end, sizeof(**dirfrag), bad);
 *dirfrag = *p;
 *p += sizeof(**dirfrag) + sizeof(u32) * le32_to_cpu((*dirfrag)->ndist);
 if (unlikely(*p > end))
  goto bad;
 if (features == (u64)-1)
  *p = end;
 return 0;
bad:
 return -EIO;
}

static int parse_reply_info_lease(void **p, void *end,
      struct ceph_mds_reply_lease **lease,
      u64 features, u32 *altname_len, u8 **altname)
{
 u8 struct_v;
 u32 struct_len;
 void *lend;

 if (features == (u64)-1) {
  u8 struct_compat;

  ceph_decode_8_safe(p, end, struct_v, bad);
  ceph_decode_8_safe(p, end, struct_compat, bad);

  /* struct_v is expected to be >= 1. we only understand
 * encoding whose struct_compat == 1. */

  if (!struct_v || struct_compat != 1)
   goto bad;

  ceph_decode_32_safe(p, end, struct_len, bad);
 } else {
  struct_len = sizeof(**lease);
  *altname_len = 0;
  *altname = NULL;
 }

 lend = *p + struct_len;
 ceph_decode_need(p, end, struct_len, bad);
 *lease = *p;
 *p += sizeof(**lease);

 if (features == (u64)-1) {
  if (struct_v >= 2) {
   ceph_decode_32_safe(p, end, *altname_len, bad);
   ceph_decode_need(p, end, *altname_len, bad);
   *altname = *p;
   *p += *altname_len;
  } else {
   *altname = NULL;
   *altname_len = 0;
  }
 }
 *p = lend;
 return 0;
bad:
 return -EIO;
}

/*
 * parse a normal reply, which may contain a (dir+)dentry and/or a
 * target inode.
 */

static int parse_reply_info_trace(void **p, void *end,
      struct ceph_mds_reply_info_parsed *info,
      u64 features)
{
 int err;

 if (info->head->is_dentry) {
  err = parse_reply_info_in(p, end, &info->diri, features);
  if (err < 0)
   goto out_bad;

  err = parse_reply_info_dir(p, end, &info->dirfrag, features);
  if (err < 0)
   goto out_bad;

  ceph_decode_32_safe(p, end, info->dname_len, bad);
  ceph_decode_need(p, end, info->dname_len, bad);
  info->dname = *p;
  *p += info->dname_len;

  err = parse_reply_info_lease(p, end, &info->dlease, features,
          &info->altname_len, &info->altname);
  if (err < 0)
   goto out_bad;
 }

 if (info->head->is_target) {
  err = parse_reply_info_in(p, end, &info->targeti, features);
  if (err < 0)
   goto out_bad;
 }

 if (unlikely(*p != end))
  goto bad;
 return 0;

bad:
 err = -EIO;
out_bad:
 pr_err("problem parsing mds trace %d\n", err);
 return err;
}

/*
 * parse readdir results
 */

static int parse_reply_info_readdir(void **p, void *end,
        struct ceph_mds_request *req,
        u64 features)
{
 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
 struct ceph_client *cl = req->r_mdsc->fsc->client;
 u32 num, i = 0;
 int err;

 err = parse_reply_info_dir(p, end, &info->dir_dir, features);
 if (err < 0)
  goto out_bad;

 ceph_decode_need(p, end, sizeof(num) + 2, bad);
 num = ceph_decode_32(p);
 {
  u16 flags = ceph_decode_16(p);
  info->dir_end = !!(flags & CEPH_READDIR_FRAG_END);
  info->dir_complete = !!(flags & CEPH_READDIR_FRAG_COMPLETE);
  info->hash_order = !!(flags & CEPH_READDIR_HASH_ORDER);
  info->offset_hash = !!(flags & CEPH_READDIR_OFFSET_HASH);
 }
 if (num == 0)
  goto done;

 BUG_ON(!info->dir_entries);
 if ((unsigned long)(info->dir_entries + num) >
     (unsigned long)info->dir_entries + info->dir_buf_size) {
  pr_err_client(cl, "dir contents are larger than expected\n");
  WARN_ON(1);
  goto bad;
 }

 info->dir_nr = num;
 while (num) {
  struct inode *inode = d_inode(req->r_dentry);
  struct ceph_inode_info *ci = ceph_inode(inode);
  struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;
  struct fscrypt_str tname = FSTR_INIT(NULL, 0);
  struct fscrypt_str oname = FSTR_INIT(NULL, 0);
  struct ceph_fname fname;
  u32 altname_len, _name_len;
  u8 *altname, *_name;

  /* dentry */
  ceph_decode_32_safe(p, end, _name_len, bad);
  ceph_decode_need(p, end, _name_len, bad);
  _name = *p;
  *p += _name_len;
  doutc(cl, "parsed dir dname '%.*s'\n", _name_len, _name);

  if (info->hash_order)
   rde->raw_hash = ceph_str_hash(ci->i_dir_layout.dl_dir_hash,
            _name, _name_len);

  /* dentry lease */
  err = parse_reply_info_lease(p, end, &rde->lease, features,
          &altname_len, &altname);
  if (err)
   goto out_bad;

  /*
 * Try to dencrypt the dentry names and update them
 * in the ceph_mds_reply_dir_entry struct.
 */

  fname.dir = inode;
  fname.name = _name;
  fname.name_len = _name_len;
  fname.ctext = altname;
  fname.ctext_len = altname_len;
  /*
 * The _name_len maybe larger than altname_len, such as
 * when the human readable name length is in range of
 * (CEPH_NOHASH_NAME_MAX, CEPH_NOHASH_NAME_MAX + SHA256_DIGEST_SIZE),
 * then the copy in ceph_fname_to_usr will corrupt the
 * data if there has no encryption key.
 *
 * Just set the no_copy flag and then if there has no
 * encryption key the oname.name will be assigned to
 * _name always.
 */

  fname.no_copy = true;
  if (altname_len == 0) {
   /*
 * Set tname to _name, and this will be used
 * to do the base64_decode in-place. It's
 * safe because the decoded string should
 * always be shorter, which is 3/4 of origin
 * string.
 */

   tname.name = _name;

   /*
 * Set oname to _name too, and this will be
 * used to do the dencryption in-place.
 */

   oname.name = _name;
   oname.len = _name_len;
  } else {
   /*
 * This will do the decryption only in-place
 * from altname cryptext directly.
 */

   oname.name = altname;
   oname.len = altname_len;
  }
  rde->is_nokey = false;
  err = ceph_fname_to_usr(&fname, &tname, &oname, &rde->is_nokey);
  if (err) {
   pr_err_client(cl, "unable to decode %.*s, got %d\n",
          _name_len, _name, err);
   goto out_bad;
  }
  rde->name = oname.name;
  rde->name_len = oname.len;

  /* inode */
  err = parse_reply_info_in(p, end, &rde->inode, features);
  if (err < 0)
   goto out_bad;
  /* ceph_readdir_prepopulate() will update it */
  rde->offset = 0;
  i++;
  num--;
 }

done:
 /* Skip over any unrecognized fields */
 *p = end;
 return 0;

bad:
 err = -EIO;
out_bad:
 pr_err_client(cl, "problem parsing dir contents %d\n", err);
 return err;
}

/*
 * parse fcntl F_GETLK results
 */

static int parse_reply_info_filelock(void **p, void *end,
         struct ceph_mds_reply_info_parsed *info,
         u64 features)
{
 if (*p + sizeof(*info->filelock_reply) > end)
  goto bad;

 info->filelock_reply = *p;

 /* Skip over any unrecognized fields */
 *p = end;
 return 0;
bad:
 return -EIO;
}


#if BITS_PER_LONG == 64

#define DELEGATED_INO_AVAILABLE  xa_mk_value(1)

static int ceph_parse_deleg_inos(void **p, void *end,
     struct ceph_mds_session *s)
{
 struct ceph_client *cl = s->s_mdsc->fsc->client;
 u32 sets;

 ceph_decode_32_safe(p, end, sets, bad);
 doutc(cl, "got %u sets of delegated inodes\n", sets);
 while (sets--) {
  u64 start, len;

  ceph_decode_64_safe(p, end, start, bad);
  ceph_decode_64_safe(p, end, len, bad);

  /* Don't accept a delegation of system inodes */
  if (start < CEPH_INO_SYSTEM_BASE) {
   pr_warn_ratelimited_client(cl,
    "ignoring reserved inode range delegation (start=0x%llx len=0x%llx)\n",
    start, len);
   continue;
  }
  while (len--) {
   int err = xa_insert(&s->s_delegated_inos, start++,
         DELEGATED_INO_AVAILABLE,
         GFP_KERNEL);
   if (!err) {
    doutc(cl, "added delegated inode 0x%llx\n", start - 1);
   } else if (err == -EBUSY) {
    pr_warn_client(cl,
     "MDS delegated inode 0x%llx more than once.\n",
     start - 1);
   } else {
    return err;
   }
  }
 }
 return 0;
bad:
 return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
 unsigned long ino;
 void *val;

 xa_for_each(&s->s_delegated_inos, ino, val) {
  val = xa_erase(&s->s_delegated_inos, ino);
  if (val == DELEGATED_INO_AVAILABLE)
   return ino;
 }
 return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
 return xa_insert(&s->s_delegated_inos, ino, DELEGATED_INO_AVAILABLE,
    GFP_KERNEL);
}
#else /* BITS_PER_LONG == 64 */
/*
 * FIXME: xarrays can't handle 64-bit indexes on a 32-bit arch. For now, just
 * ignore delegated_inos on 32 bit arch. Maybe eventually add xarrays for top
 * and bottom words?
 */

static int ceph_parse_deleg_inos(void **p, void *end,
     struct ceph_mds_session *s)
{
 u32 sets;

 ceph_decode_32_safe(p, end, sets, bad);
 if (sets)
  ceph_decode_skip_n(p, end, sets * 2 * sizeof(__le64), bad);
 return 0;
bad:
 return -EIO;
}

u64 ceph_get_deleg_ino(struct ceph_mds_session *s)
{
 return 0;
}

int ceph_restore_deleg_ino(struct ceph_mds_session *s, u64 ino)
{
 return 0;
}
#endif /* BITS_PER_LONG == 64 */

/*
 * parse create results
 */

static int parse_reply_info_create(void **p, void *end,
      struct ceph_mds_reply_info_parsed *info,
      u64 features, struct ceph_mds_session *s)
{
 int ret;

 if (features == (u64)-1 ||
     (features & CEPH_FEATURE_REPLY_CREATE_INODE)) {
  if (*p == end) {
   /* Malformed reply? */
   info->has_create_ino = false;
  } else if (test_bit(CEPHFS_FEATURE_DELEG_INO, &s->s_features)) {
   info->has_create_ino = true;
   /* struct_v, struct_compat, and len */
   ceph_decode_skip_n(p, end, 2 + sizeof(u32), bad);
   ceph_decode_64_safe(p, end, info->ino, bad);
   ret = ceph_parse_deleg_inos(p, end, s);
   if (ret)
    return ret;
  } else {
   /* legacy */
   ceph_decode_64_safe(p, end, info->ino, bad);
   info->has_create_ino = true;
  }
 } else {
  if (*p != end)
   goto bad;
 }

 /* Skip over any unrecognized fields */
 *p = end;
 return 0;
bad:
 return -EIO;
}

static int parse_reply_info_getvxattr(void **p, void *end,
          struct ceph_mds_reply_info_parsed *info,
          u64 features)
{
 u32 value_len;

 ceph_decode_skip_8(p, end, bad); /* skip current version: 1 */
 ceph_decode_skip_8(p, end, bad); /* skip first version: 1 */
 ceph_decode_skip_32(p, end, bad); /* skip payload length */

 ceph_decode_32_safe(p, end, value_len, bad);

 if (value_len == end - *p) {
   info->xattr_info.xattr_value = *p;
   info->xattr_info.xattr_value_len = value_len;
   *p = end;
   return value_len;
 }
bad:
 return -EIO;
}

/*
 * parse extra results
 */

static int parse_reply_info_extra(void **p, void *end,
      struct ceph_mds_request *req,
      u64 features, struct ceph_mds_session *s)
{
 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
 u32 op = le32_to_cpu(info->head->op);

 if (op == CEPH_MDS_OP_GETFILELOCK)
  return parse_reply_info_filelock(p, end, info, features);
 else if (op == CEPH_MDS_OP_READDIR || op == CEPH_MDS_OP_LSSNAP)
  return parse_reply_info_readdir(p, end, req, features);
 else if (op == CEPH_MDS_OP_CREATE)
  return parse_reply_info_create(p, end, info, features, s);
 else if (op == CEPH_MDS_OP_GETVXATTR)
  return parse_reply_info_getvxattr(p, end, info, features);
 else
  return -EIO;
}

/*
 * parse entire mds reply
 */

static int parse_reply_info(struct ceph_mds_session *s, struct ceph_msg *msg,
       struct ceph_mds_request *req, u64 features)
{
 struct ceph_mds_reply_info_parsed *info = &req->r_reply_info;
 struct ceph_client *cl = s->s_mdsc->fsc->client;
 void *p, *end;
 u32 len;
 int err;

 info->head = msg->front.iov_base;
 p = msg->front.iov_base + sizeof(struct ceph_mds_reply_head);
 end = p + msg->front.iov_len - sizeof(struct ceph_mds_reply_head);

 /* trace */
 ceph_decode_32_safe(&p, end, len, bad);
 if (len > 0) {
  ceph_decode_need(&p, end, len, bad);
  err = parse_reply_info_trace(&p, p+len, info, features);
  if (err < 0)
   goto out_bad;
 }

 /* extra */
 ceph_decode_32_safe(&p, end, len, bad);
 if (len > 0) {
  ceph_decode_need(&p, end, len, bad);
  err = parse_reply_info_extra(&p, p+len, req, features, s);
  if (err < 0)
   goto out_bad;
 }

 /* snap blob */
 ceph_decode_32_safe(&p, end, len, bad);
 info->snapblob_len = len;
 info->snapblob = p;
 p += len;

 if (p != end)
  goto bad;
 return 0;

bad:
 err = -EIO;
out_bad:
 pr_err_client(cl, "mds parse_reply err %d\n", err);
 ceph_msg_dump(msg);
 return err;
}

static void destroy_reply_info(struct ceph_mds_reply_info_parsed *info)
{
 int i;

 kfree(info->diri.fscrypt_auth);
 kfree(info->diri.fscrypt_file);
 kfree(info->targeti.fscrypt_auth);
 kfree(info->targeti.fscrypt_file);
 if (!info->dir_entries)
  return;

 for (i = 0; i < info->dir_nr; i++) {
  struct ceph_mds_reply_dir_entry *rde = info->dir_entries + i;

  kfree(rde->inode.fscrypt_auth);
  kfree(rde->inode.fscrypt_file);
 }
 free_pages((unsigned long)info->dir_entries, get_order(info->dir_buf_size));
}

/*
 * In async unlink case the kclient won't wait for the first reply
 * from MDS and just drop all the links and unhash the dentry and then
 * succeeds immediately.
 *
 * For any new create/link/rename,etc requests followed by using the
 * same file names we must wait for the first reply of the inflight
 * unlink request, or the MDS possibly will fail these following
 * requests with -EEXIST if the inflight async unlink request was
 * delayed for some reasons.
 *
 * And the worst case is that for the none async openc request it will
 * successfully open the file if the CDentry hasn't been unlinked yet,
 * but later the previous delayed async unlink request will remove the
 * CDentry. That means the just created file is possibly deleted later
 * by accident.
 *
 * We need to wait for the inflight async unlink requests to finish
 * when creating new files/directories by using the same file names.
 */

int ceph_wait_on_conflict_unlink(struct dentry *dentry)
{
 struct ceph_fs_client *fsc = ceph_sb_to_fs_client(dentry->d_sb);
 struct ceph_client *cl = fsc->client;
 struct dentry *pdentry = dentry->d_parent;
 struct dentry *udentry, *found = NULL;
 struct ceph_dentry_info *di;
 struct qstr dname;
 u32 hash = dentry->d_name.hash;
 int err;

 dname.name = dentry->d_name.name;
 dname.len = dentry->d_name.len;

 rcu_read_lock();
 hash_for_each_possible_rcu(fsc->async_unlink_conflict, di,
       hnode, hash) {
  udentry = di->dentry;

  spin_lock(&udentry->d_lock);
  if (udentry->d_name.hash != hash)
   goto next;
  if (unlikely(udentry->d_parent != pdentry))
   goto next;
  if (!hash_hashed(&di->hnode))
   goto next;

  if (!test_bit(CEPH_DENTRY_ASYNC_UNLINK_BIT, &di->flags))
   pr_warn_client(cl, "dentry %p:%pd async unlink bit is not set\n",
           dentry, dentry);

  if (!d_same_name(udentry, pdentry, &dname))
   goto next;

  found = dget_dlock(udentry);
  spin_unlock(&udentry->d_lock);
  break;
next:
  spin_unlock(&udentry->d_lock);
 }
 rcu_read_unlock();

 if (likely(!found))
  return 0;

 doutc(cl, "dentry %p:%pd conflict with old %p:%pd\n", dentry, dentry,
       found, found);

 err = wait_on_bit(&di->flags, CEPH_DENTRY_ASYNC_UNLINK_BIT,
     TASK_KILLABLE);
 dput(found);
 return err;
}


/*
 * sessions
 */

const char *ceph_session_state_name(int s)
{
 switch (s) {
 case CEPH_MDS_SESSION_NEW: return "new";
 case CEPH_MDS_SESSION_OPENING: return "opening";
 case CEPH_MDS_SESSION_OPEN: return "open";
 case CEPH_MDS_SESSION_HUNG: return "hung";
 case CEPH_MDS_SESSION_CLOSING: return "closing";
 case CEPH_MDS_SESSION_CLOSED: return "closed";
 case CEPH_MDS_SESSION_RESTARTING: return "restarting";
 case CEPH_MDS_SESSION_RECONNECTING: return "reconnecting";
 case CEPH_MDS_SESSION_REJECTED: return "rejected";
 defaultreturn "???";
 }
}

struct ceph_mds_session *ceph_get_mds_session(struct ceph_mds_session *s)
{
 if (refcount_inc_not_zero(&s->s_ref))
  return s;
 return NULL;
}

void ceph_put_mds_session(struct ceph_mds_session *s)
{
 if (IS_ERR_OR_NULL(s))
  return;

 if (refcount_dec_and_test(&s->s_ref)) {
  if (s->s_auth.authorizer)
   ceph_auth_destroy_authorizer(s->s_auth.authorizer);
  WARN_ON(mutex_is_locked(&s->s_mutex));
  xa_destroy(&s->s_delegated_inos);
  kfree(s);
 }
}

/*
 * called under mdsc->mutex
 */

struct ceph_mds_session *__ceph_lookup_mds_session(struct ceph_mds_client *mdsc,
         int mds)
{
 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
  return NULL;
 return ceph_get_mds_session(mdsc->sessions[mds]);
}

static bool __have_session(struct ceph_mds_client *mdsc, int mds)
{
 if (mds >= mdsc->max_sessions || !mdsc->sessions[mds])
  return false;
 else
  return true;
}

static int __verify_registered_session(struct ceph_mds_client *mdsc,
           struct ceph_mds_session *s)
{
 if (s->s_mds >= mdsc->max_sessions ||
     mdsc->sessions[s->s_mds] != s)
  return -ENOENT;
 return 0;
}

/*
 * create+register a new session for given mds.
 * called under mdsc->mutex.
 */

static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
       int mds)
{
 struct ceph_client *cl = mdsc->fsc->client;
 struct ceph_mds_session *s;

 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
  return ERR_PTR(-EIO);

 if (mds >= mdsc->mdsmap->possible_max_rank)
  return ERR_PTR(-EINVAL);

 s = kzalloc(sizeof(*s), GFP_NOFS);
 if (!s)
  return ERR_PTR(-ENOMEM);

 if (mds >= mdsc->max_sessions) {
  int newmax = 1 << get_count_order(mds + 1);
  struct ceph_mds_session **sa;

  doutc(cl, "realloc to %d\n", newmax);
  sa = kcalloc(newmax, sizeof(void *), GFP_NOFS);
  if (!sa)
   goto fail_realloc;
  if (mdsc->sessions) {
   memcpy(sa, mdsc->sessions,
          mdsc->max_sessions * sizeof(void *));
   kfree(mdsc->sessions);
  }
  mdsc->sessions = sa;
  mdsc->max_sessions = newmax;
 }

 doutc(cl, "mds%d\n", mds);
 s->s_mdsc = mdsc;
 s->s_mds = mds;
 s->s_state = CEPH_MDS_SESSION_NEW;
 mutex_init(&s->s_mutex);

 ceph_con_init(&s->s_con, s, &mds_con_ops, &mdsc->fsc->client->msgr);

 atomic_set(&s->s_cap_gen, 1);
 s->s_cap_ttl = jiffies - 1;

 spin_lock_init(&s->s_cap_lock);
 INIT_LIST_HEAD(&s->s_caps);
 refcount_set(&s->s_ref, 1);
 INIT_LIST_HEAD(&s->s_waiting);
 INIT_LIST_HEAD(&s->s_unsafe);
 xa_init(&s->s_delegated_inos);
 INIT_LIST_HEAD(&s->s_cap_releases);
 INIT_WORK(&s->s_cap_release_work, ceph_cap_release_work);

 INIT_LIST_HEAD(&s->s_cap_dirty);
 INIT_LIST_HEAD(&s->s_cap_flushing);

 mdsc->sessions[mds] = s;
 atomic_inc(&mdsc->num_sessions);
 refcount_inc(&s->s_ref);  /* one ref to sessions[], one to caller */

 ceph_con_open(&s->s_con, CEPH_ENTITY_TYPE_MDS, mds,
        ceph_mdsmap_get_addr(mdsc->mdsmap, mds));

 return s;

fail_realloc:
 kfree(s);
 return ERR_PTR(-ENOMEM);
}

/*
 * called under mdsc->mutex
 */

static void __unregister_session(struct ceph_mds_client *mdsc,
          struct ceph_mds_session *s)
{
 doutc(mdsc->fsc->client, "mds%d %p\n", s->s_mds, s);
 BUG_ON(mdsc->sessions[s->s_mds] != s);
 mdsc->sessions[s->s_mds] = NULL;
 ceph_con_close(&s->s_con);
 ceph_put_mds_session(s);
 atomic_dec(&mdsc->num_sessions);
}

/*
 * drop session refs in request.
 *
 * should be last request ref, or hold mdsc->mutex
 */

static void put_request_session(struct ceph_mds_request *req)
{
 if (req->r_session) {
  ceph_put_mds_session(req->r_session);
  req->r_session = NULL;
 }
}

void ceph_mdsc_iterate_sessions(struct ceph_mds_client *mdsc,
    void (*cb)(struct ceph_mds_session *),
    bool check_state)
{
 int mds;

 mutex_lock(&mdsc->mutex);
 for (mds = 0; mds < mdsc->max_sessions; ++mds) {
  struct ceph_mds_session *s;

  s = __ceph_lookup_mds_session(mdsc, mds);
  if (!s)
   continue;

  if (check_state && !check_session_state(s)) {
   ceph_put_mds_session(s);
   continue;
  }

  mutex_unlock(&mdsc->mutex);
  cb(s);
  ceph_put_mds_session(s);
  mutex_lock(&mdsc->mutex);
 }
 mutex_unlock(&mdsc->mutex);
}

void ceph_mdsc_release_request(struct kref *kref)
{
 struct ceph_mds_request *req = container_of(kref,
          struct ceph_mds_request,
          r_kref);
 ceph_mdsc_release_dir_caps_async(req);
 destroy_reply_info(&req->r_reply_info);
 if (req->r_request)
  ceph_msg_put(req->r_request);
 if (req->r_reply)
  ceph_msg_put(req->r_reply);
 if (req->r_inode) {
  ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
  iput(req->r_inode);
 }
 if (req->r_parent) {
  ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
  iput(req->r_parent);
 }
 iput(req->r_target_inode);
 iput(req->r_new_inode);
 if (req->r_dentry)
  dput(req->r_dentry);
 if (req->r_old_dentry)
  dput(req->r_old_dentry);
 if (req->r_old_dentry_dir) {
  /*
 * track (and drop pins for) r_old_dentry_dir
 * separately, since r_old_dentry's d_parent may have
 * changed between the dir mutex being dropped and
 * this request being freed.
 */

  ceph_put_cap_refs(ceph_inode(req->r_old_dentry_dir),
      CEPH_CAP_PIN);
  iput(req->r_old_dentry_dir);
 }
 kfree(req->r_path1);
 kfree(req->r_path2);
 put_cred(req->r_cred);
 if (req->r_mnt_idmap)
  mnt_idmap_put(req->r_mnt_idmap);
 if (req->r_pagelist)
  ceph_pagelist_release(req->r_pagelist);
 kfree(req->r_fscrypt_auth);
 kfree(req->r_altname);
 put_request_session(req);
 ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
 WARN_ON_ONCE(!list_empty(&req->r_wait));
 kmem_cache_free(ceph_mds_request_cachep, req);
}

DEFINE_RB_FUNCS(request, struct ceph_mds_request, r_tid, r_node)

/*
 * lookup session, bump ref if found.
 *
 * called under mdsc->mutex.
 */

static struct ceph_mds_request *
lookup_get_request(struct ceph_mds_client *mdsc, u64 tid)
{
 struct ceph_mds_request *req;

 req = lookup_request(&mdsc->request_tree, tid);
 if (req)
  ceph_mdsc_get_request(req);

 return req;
}

/*
 * Register an in-flight request, and assign a tid.  Link to directory
 * are modifying (if any).
 *
 * Called under mdsc->mutex.
 */

static void __register_request(struct ceph_mds_client *mdsc,
          struct ceph_mds_request *req,
          struct inode *dir)
{
 struct ceph_client *cl = mdsc->fsc->client;
 int ret = 0;

 req->r_tid = ++mdsc->last_tid;
 if (req->r_num_caps) {
  ret = ceph_reserve_caps(mdsc, &req->r_caps_reservation,
     req->r_num_caps);
  if (ret < 0) {
   pr_err_client(cl, "%p failed to reserve caps: %d\n",
          req, ret);
   /* set req->r_err to fail early from __do_request */
   req->r_err = ret;
   return;
  }
 }
 doutc(cl, "%p tid %lld\n", req, req->r_tid);
 ceph_mdsc_get_request(req);
 insert_request(&mdsc->request_tree, req);

 req->r_cred = get_current_cred();
 if (!req->r_mnt_idmap)
  req->r_mnt_idmap = &nop_mnt_idmap;

 if (mdsc->oldest_tid == 0 && req->r_op != CEPH_MDS_OP_SETFILELOCK)
  mdsc->oldest_tid = req->r_tid;

 if (dir) {
  struct ceph_inode_info *ci = ceph_inode(dir);

  ihold(dir);
  req->r_unsafe_dir = dir;
  spin_lock(&ci->i_unsafe_lock);
  list_add_tail(&req->r_unsafe_dir_item, &ci->i_unsafe_dirops);
  spin_unlock(&ci->i_unsafe_lock);
 }
}

static void __unregister_request(struct ceph_mds_client *mdsc,
     struct ceph_mds_request *req)
{
 doutc(mdsc->fsc->client, "%p tid %lld\n", req, req->r_tid);

 /* Never leave an unregistered request on an unsafe list! */
 list_del_init(&req->r_unsafe_item);

 if (req->r_tid == mdsc->oldest_tid) {
  struct rb_node *p = rb_next(&req->r_node);
  mdsc->oldest_tid = 0;
  while (p) {
   struct ceph_mds_request *next_req =
    rb_entry(p, struct ceph_mds_request, r_node);
   if (next_req->r_op != CEPH_MDS_OP_SETFILELOCK) {
    mdsc->oldest_tid = next_req->r_tid;
    break;
   }
   p = rb_next(p);
  }
 }

 erase_request(&mdsc->request_tree, req);

 if (req->r_unsafe_dir) {
  struct ceph_inode_info *ci = ceph_inode(req->r_unsafe_dir);
  spin_lock(&ci->i_unsafe_lock);
  list_del_init(&req->r_unsafe_dir_item);
  spin_unlock(&ci->i_unsafe_lock);
 }
 if (req->r_target_inode &&
     test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags)) {
  struct ceph_inode_info *ci = ceph_inode(req->r_target_inode);
  spin_lock(&ci->i_unsafe_lock);
  list_del_init(&req->r_unsafe_target_item);
  spin_unlock(&ci->i_unsafe_lock);
 }

 if (req->r_unsafe_dir) {
  iput(req->r_unsafe_dir);
  req->r_unsafe_dir = NULL;
 }

 complete_all(&req->r_safe_completion);

 ceph_mdsc_put_request(req);
}

/*
 * Walk back up the dentry tree until we hit a dentry representing a
 * non-snapshot inode. We do this using the rcu_read_lock (which must be held
 * when calling this) to ensure that the objects won't disappear while we're
 * working with them. Once we hit a candidate dentry, we attempt to take a
 * reference to it, and return that as the result.
 */

static struct inode *get_nonsnap_parent(struct dentry *dentry)
{
 struct inode *inode = NULL;

 while (dentry && !IS_ROOT(dentry)) {
  inode = d_inode_rcu(dentry);
  if (!inode || ceph_snap(inode) == CEPH_NOSNAP)
   break;
  dentry = dentry->d_parent;
 }
 if (inode)
  inode = igrab(inode);
 return inode;
}

/*
 * Choose mds to send request to next.  If there is a hint set in the
 * request (e.g., due to a prior forward hint from the mds), use that.
 * Otherwise, consult frag tree and/or caps to identify the
 * appropriate mds.  If all else fails, choose randomly.
 *
 * Called under mdsc->mutex.
 */

static int __choose_mds(struct ceph_mds_client *mdsc,
   struct ceph_mds_request *req,
   bool *random)
{
 struct inode *inode;
 struct ceph_inode_info *ci;
 struct ceph_cap *cap;
 int mode = req->r_direct_mode;
 int mds = -1;
 u32 hash = req->r_direct_hash;
 bool is_hash = test_bit(CEPH_MDS_R_DIRECT_IS_HASH, &req->r_req_flags);
 struct ceph_client *cl = mdsc->fsc->client;

 if (random)
  *random = false;

 /*
 * is there a specific mds we should try?  ignore hint if we have
 * no session and the mds is not up (active or recovering).
 */

 if (req->r_resend_mds >= 0 &&
     (__have_session(mdsc, req->r_resend_mds) ||
      ceph_mdsmap_get_state(mdsc->mdsmap, req->r_resend_mds) > 0)) {
  doutc(cl, "using resend_mds mds%d\n", req->r_resend_mds);
  return req->r_resend_mds;
 }

 if (mode == USE_RANDOM_MDS)
  goto random;

 inode = NULL;
 if (req->r_inode) {
  if (ceph_snap(req->r_inode) != CEPH_SNAPDIR) {
   inode = req->r_inode;
   ihold(inode);
  } else {
   /* req->r_dentry is non-null for LSSNAP request */
   rcu_read_lock();
   inode = get_nonsnap_parent(req->r_dentry);
   rcu_read_unlock();
   doutc(cl, "using snapdir's parent %p %llx.%llx\n",
         inode, ceph_vinop(inode));
  }
 } else if (req->r_dentry) {
  /* ignore race with rename; old or new d_parent is okay */
  struct dentry *parent;
  struct inode *dir;

  rcu_read_lock();
  parent = READ_ONCE(req->r_dentry->d_parent);
  dir = req->r_parent ? : d_inode_rcu(parent);

  if (!dir || dir->i_sb != mdsc->fsc->sb) {
   /*  not this fs or parent went negative */
   inode = d_inode(req->r_dentry);
   if (inode)
    ihold(inode);
  } else if (ceph_snap(dir) != CEPH_NOSNAP) {
   /* direct snapped/virtual snapdir requests
 * based on parent dir inode */

   inode = get_nonsnap_parent(parent);
   doutc(cl, "using nonsnap parent %p %llx.%llx\n",
         inode, ceph_vinop(inode));
  } else {
   /* dentry target */
   inode = d_inode(req->r_dentry);
   if (!inode || mode == USE_AUTH_MDS) {
    /* dir + name */
    inode = igrab(dir);
    hash = ceph_dentry_hash(dir, req->r_dentry);
    is_hash = true;
   } else {
    ihold(inode);
   }
  }
  rcu_read_unlock();
 }

 if (!inode)
  goto random;

 doutc(cl, "%p %llx.%llx is_hash=%d (0x%x) mode %d\n", inode,
       ceph_vinop(inode), (int)is_hash, hash, mode);
 ci = ceph_inode(inode);

 if (is_hash && S_ISDIR(inode->i_mode)) {
  struct ceph_inode_frag frag;
  int found;

  ceph_choose_frag(ci, hash, &frag, &found);
  if (found) {
   if (mode == USE_ANY_MDS && frag.ndist > 0) {
    u8 r;

    /* choose a random replica */
    get_random_bytes(&r, 1);
    r %= frag.ndist;
    mds = frag.dist[r];
    doutc(cl, "%p %llx.%llx frag %u mds%d (%d/%d)\n",
          inode, ceph_vinop(inode), frag.frag,
          mds, (int)r, frag.ndist);
    if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
        CEPH_MDS_STATE_ACTIVE &&
        !ceph_mdsmap_is_laggy(mdsc->mdsmap, mds))
     goto out;
   }

   /* since this file/dir wasn't known to be
 * replicated, then we want to look for the
 * authoritative mds. */

   if (frag.mds >= 0) {
    /* choose auth mds */
    mds = frag.mds;
    doutc(cl, "%p %llx.%llx frag %u mds%d (auth)\n",
          inode, ceph_vinop(inode), frag.frag, mds);
    if (ceph_mdsmap_get_state(mdsc->mdsmap, mds) >=
        CEPH_MDS_STATE_ACTIVE) {
     if (!ceph_mdsmap_is_laggy(mdsc->mdsmap,
          mds))
      goto out;
    }
   }
   mode = USE_AUTH_MDS;
  }
 }

 spin_lock(&ci->i_ceph_lock);
 cap = NULL;
 if (mode == USE_AUTH_MDS)
  cap = ci->i_auth_cap;
 if (!cap && !RB_EMPTY_ROOT(&ci->i_caps))
  cap = rb_entry(rb_first(&ci->i_caps), struct ceph_cap, ci_node);
 if (!cap) {
  spin_unlock(&ci->i_ceph_lock);
  iput(inode);
  goto random;
 }
 mds = cap->session->s_mds;
 doutc(cl, "%p %llx.%llx mds%d (%scap %p)\n", inode,
       ceph_vinop(inode), mds,
       cap == ci->i_auth_cap ? "auth " : "", cap);
 spin_unlock(&ci->i_ceph_lock);
out:
 iput(inode);
 return mds;

random:
 if (random)
  *random = true;

 mds = ceph_mdsmap_get_random_mds(mdsc->mdsmap);
 doutc(cl, "chose random mds%d\n", mds);
 return mds;
}


/*
 * session messages
 */

struct ceph_msg *ceph_create_session_msg(u32 op, u64 seq)
{
 struct ceph_msg *msg;
 struct ceph_mds_session_head *h;

 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h), GFP_NOFS,
      false);
 if (!msg) {
  pr_err("ENOMEM creating session %s msg\n",
         ceph_session_op_name(op));
  return NULL;
 }
 h = msg->front.iov_base;
 h->op = cpu_to_le32(op);
 h->seq = cpu_to_le64(seq);

 return msg;
}

static const unsigned char feature_bits[] = CEPHFS_FEATURES_CLIENT_SUPPORTED;
#define FEATURE_BYTES(c) (DIV_ROUND_UP((size_t)feature_bits[c - 1] + 1, 64) * 8)
static int encode_supported_features(void **p, void *end)
{
 static const size_t count = ARRAY_SIZE(feature_bits);

 if (count > 0) {
  size_t i;
  size_t size = FEATURE_BYTES(count);
  unsigned long bit;

  if (WARN_ON_ONCE(*p + 4 + size > end))
   return -ERANGE;

  ceph_encode_32(p, size);
  memset(*p, 0, size);
  for (i = 0; i < count; i++) {
   bit = feature_bits[i];
   ((unsigned char *)(*p))[bit / 8] |= BIT(bit % 8);
  }
  *p += size;
 } else {
  if (WARN_ON_ONCE(*p + 4 > end))
   return -ERANGE;

  ceph_encode_32(p, 0);
 }

 return 0;
}

static const unsigned char metric_bits[] = CEPHFS_METRIC_SPEC_CLIENT_SUPPORTED;
#define METRIC_BYTES(cnt) (DIV_ROUND_UP((size_t)metric_bits[cnt - 1] + 1, 64) * 8)
static int encode_metric_spec(void **p, void *end)
{
 static const size_t count = ARRAY_SIZE(metric_bits);

 /* header */
 if (WARN_ON_ONCE(*p + 2 > end))
  return -ERANGE;

 ceph_encode_8(p, 1); /* version */
 ceph_encode_8(p, 1); /* compat */

 if (count > 0) {
  size_t i;
  size_t size = METRIC_BYTES(count);

  if (WARN_ON_ONCE(*p + 4 + 4 + size > end))
   return -ERANGE;

  /* metric spec info length */
  ceph_encode_32(p, 4 + size);

  /* metric spec */
  ceph_encode_32(p, size);
  memset(*p, 0, size);
  for (i = 0; i < count; i++)
   ((unsigned char *)(*p))[i / 8] |= BIT(metric_bits[i] % 8);
  *p += size;
 } else {
  if (WARN_ON_ONCE(*p + 4 + 4 > end))
   return -ERANGE;

  /* metric spec info length */
  ceph_encode_32(p, 4);
  /* metric spec */
  ceph_encode_32(p, 0);
 }

 return 0;
}

/*
 * session message, specialization for CEPH_SESSION_REQUEST_OPEN
 * to include additional client metadata fields.
 */

static struct ceph_msg *
create_session_full_msg(struct ceph_mds_client *mdsc, int op, u64 seq)
{
 struct ceph_msg *msg;
 struct ceph_mds_session_head *h;
 int i;
 int extra_bytes = 0;
 int metadata_key_count = 0;
 struct ceph_options *opt = mdsc->fsc->client->options;
 struct ceph_mount_options *fsopt = mdsc->fsc->mount_options;
 struct ceph_client *cl = mdsc->fsc->client;
 size_t size, count;
 void *p, *end;
 int ret;

 const char* metadata[][2] = {
  {"hostname", mdsc->nodename},
  {"kernel_version", init_utsname()->release},
  {"entity_id", opt->name ? : ""},
  {"root", fsopt->server_path ? : "/"},
  {NULL, NULL}
 };

 /* Calculate serialized length of metadata */
 extra_bytes = 4;  /* map length */
 for (i = 0; metadata[i][0]; ++i) {
  extra_bytes += 8 + strlen(metadata[i][0]) +
   strlen(metadata[i][1]);
  metadata_key_count++;
 }

 /* supported feature */
 size = 0;
 count = ARRAY_SIZE(feature_bits);
 if (count > 0)
  size = FEATURE_BYTES(count);
 extra_bytes += 4 + size;

 /* metric spec */
 size = 0;
 count = ARRAY_SIZE(metric_bits);
 if (count > 0)
  size = METRIC_BYTES(count);
 extra_bytes += 2 + 4 + 4 + size;

 /* flags, mds auth caps and oldest_client_tid */
 extra_bytes += 4 + 4 + 8;

 /* Allocate the message */
 msg = ceph_msg_new(CEPH_MSG_CLIENT_SESSION, sizeof(*h) + extra_bytes,
      GFP_NOFS, false);
 if (!msg) {
  pr_err_client(cl, "ENOMEM creating session open msg\n");
  return ERR_PTR(-ENOMEM);
 }
 p = msg->front.iov_base;
 end = p + msg->front.iov_len;

 h = p;
 h->op = cpu_to_le32(op);
 h->seq = cpu_to_le64(seq);

 /*
 * Serialize client metadata into waiting buffer space, using
 * the format that userspace expects for map<string, string>
 *
 * ClientSession messages with metadata are v7
 */

 msg->hdr.version = cpu_to_le16(7);
 msg->hdr.compat_version = cpu_to_le16(1);

 /* The write pointer, following the session_head structure */
 p += sizeof(*h);

 /* Number of entries in the map */
 ceph_encode_32(&p, metadata_key_count);

 /* Two length-prefixed strings for each entry in the map */
 for (i = 0; metadata[i][0]; ++i) {
  size_t const key_len = strlen(metadata[i][0]);
  size_t const val_len = strlen(metadata[i][1]);

  ceph_encode_32(&p, key_len);
  memcpy(p, metadata[i][0], key_len);
  p += key_len;
  ceph_encode_32(&p, val_len);
  memcpy(p, metadata[i][1], val_len);
  p += val_len;
 }

 ret = encode_supported_features(&p, end);
 if (ret) {
  pr_err_client(cl, "encode_supported_features failed!\n");
  ceph_msg_put(msg);
  return ERR_PTR(ret);
 }

 ret = encode_metric_spec(&p, end);
 if (ret) {
  pr_err_client(cl, "encode_metric_spec failed!\n");
  ceph_msg_put(msg);
  return ERR_PTR(ret);
 }

 /* version == 5, flags */
 ceph_encode_32(&p, 0);

 /* version == 6, mds auth caps */
 ceph_encode_32(&p, 0);

 /* version == 7, oldest_client_tid */
 ceph_encode_64(&p, mdsc->oldest_tid);

 msg->front.iov_len = p - msg->front.iov_base;
 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

 return msg;
}

/*
 * send session open request.
 *
 * called under mdsc->mutex
 */

static int __open_session(struct ceph_mds_client *mdsc,
     struct ceph_mds_session *session)
{
 struct ceph_msg *msg;
 int mstate;
 int mds = session->s_mds;

 if (READ_ONCE(mdsc->fsc->mount_state) == CEPH_MOUNT_FENCE_IO)
  return -EIO;

 /* wait for mds to go active? */
 mstate = ceph_mdsmap_get_state(mdsc->mdsmap, mds);
 doutc(mdsc->fsc->client, "open_session to mds%d (%s)\n", mds,
       ceph_mds_state_name(mstate));
 session->s_state = CEPH_MDS_SESSION_OPENING;
 session->s_renew_requested = jiffies;

 /* send connect message */
 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_OPEN,
          session->s_seq);
 if (IS_ERR(msg))
  return PTR_ERR(msg);
 ceph_con_send(&session->s_con, msg);
 return 0;
}

/*
 * open sessions for any export targets for the given mds
 *
 * called under mdsc->mutex
 */

static struct ceph_mds_session *
__open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
 struct ceph_mds_session *session;
 int ret;

 session = __ceph_lookup_mds_session(mdsc, target);
 if (!session) {
  session = register_session(mdsc, target);
  if (IS_ERR(session))
   return session;
 }
 if (session->s_state == CEPH_MDS_SESSION_NEW ||
     session->s_state == CEPH_MDS_SESSION_CLOSING) {
  ret = __open_session(mdsc, session);
  if (ret)
   return ERR_PTR(ret);
 }

 return session;
}

struct ceph_mds_session *
ceph_mdsc_open_export_target_session(struct ceph_mds_client *mdsc, int target)
{
 struct ceph_mds_session *session;
 struct ceph_client *cl = mdsc->fsc->client;

 doutc(cl, "to mds%d\n", target);

 mutex_lock(&mdsc->mutex);
 session = __open_export_target_session(mdsc, target);
 mutex_unlock(&mdsc->mutex);

 return session;
}

static void __open_export_target_sessions(struct ceph_mds_client *mdsc,
       struct ceph_mds_session *session)
{
 struct ceph_mds_info *mi;
 struct ceph_mds_session *ts;
 int i, mds = session->s_mds;
 struct ceph_client *cl = mdsc->fsc->client;

 if (mds >= mdsc->mdsmap->possible_max_rank)
  return;

 mi = &mdsc->mdsmap->m_info[mds];
 doutc(cl, "for mds%d (%d targets)\n", session->s_mds,
       mi->num_export_targets);

 for (i = 0; i < mi->num_export_targets; i++) {
  ts = __open_export_target_session(mdsc, mi->export_targets[i]);
  ceph_put_mds_session(ts);
 }
}

/*
 * session caps
 */


static void detach_cap_releases(struct ceph_mds_session *session,
    struct list_head *target)
{
 struct ceph_client *cl = session->s_mdsc->fsc->client;

 lockdep_assert_held(&session->s_cap_lock);

 list_splice_init(&session->s_cap_releases, target);
 session->s_num_cap_releases = 0;
 doutc(cl, "mds%d\n", session->s_mds);
}

static void dispose_cap_releases(struct ceph_mds_client *mdsc,
     struct list_head *dispose)
{
 while (!list_empty(dispose)) {
  struct ceph_cap *cap;
  /* zero out the in-progress message */
  cap = list_first_entry(dispose, struct ceph_cap, session_caps);
  list_del(&cap->session_caps);
  ceph_put_cap(mdsc, cap);
 }
}

static void cleanup_session_requests(struct ceph_mds_client *mdsc,
         struct ceph_mds_session *session)
{
 struct ceph_client *cl = mdsc->fsc->client;
 struct ceph_mds_request *req;
 struct rb_node *p;

 doutc(cl, "mds%d\n", session->s_mds);
 mutex_lock(&mdsc->mutex);
 while (!list_empty(&session->s_unsafe)) {
  req = list_first_entry(&session->s_unsafe,
           struct ceph_mds_request, r_unsafe_item);
  pr_warn_ratelimited_client(cl, " dropping unsafe request %llu\n",
        req->r_tid);
  if (req->r_target_inode)
   mapping_set_error(req->r_target_inode->i_mapping, -EIO);
  if (req->r_unsafe_dir)
   mapping_set_error(req->r_unsafe_dir->i_mapping, -EIO);
  __unregister_request(mdsc, req);
 }
 /* zero r_attempts, so kick_requests() will re-send requests */
 p = rb_first(&mdsc->request_tree);
 while (p) {
  req = rb_entry(p, struct ceph_mds_request, r_node);
  p = rb_next(p);
  if (req->r_session &&
      req->r_session->s_mds == session->s_mds)
   req->r_attempts = 0;
 }
 mutex_unlock(&mdsc->mutex);
}

/*
 * Helper to safely iterate over all caps associated with a session, with
 * special care taken to handle a racing __ceph_remove_cap().
 *
 * Caller must hold session s_mutex.
 */

int ceph_iterate_session_caps(struct ceph_mds_session *session,
         int (*cb)(struct inode *, int mds, void *),
         void *arg)
{
 struct ceph_client *cl = session->s_mdsc->fsc->client;
 struct list_head *p;
 struct ceph_cap *cap;
 struct inode *inode, *last_inode = NULL;
 struct ceph_cap *old_cap = NULL;
 int ret;

 doutc(cl, "%p mds%d\n", session, session->s_mds);
 spin_lock(&session->s_cap_lock);
 p = session->s_caps.next;
 while (p != &session->s_caps) {
  int mds;

  cap = list_entry(p, struct ceph_cap, session_caps);
  inode = igrab(&cap->ci->netfs.inode);
  if (!inode) {
   p = p->next;
   continue;
  }
  session->s_cap_iterator = cap;
  mds = cap->mds;
  spin_unlock(&session->s_cap_lock);

  if (last_inode) {
   iput(last_inode);
   last_inode = NULL;
  }
  if (old_cap) {
   ceph_put_cap(session->s_mdsc, old_cap);
   old_cap = NULL;
  }

  ret = cb(inode, mds, arg);
  last_inode = inode;

  spin_lock(&session->s_cap_lock);
  p = p->next;
  if (!cap->ci) {
   doutc(cl, "finishing cap %p removal\n", cap);
   BUG_ON(cap->session != session);
   cap->session = NULL;
   list_del_init(&cap->session_caps);
   session->s_nr_caps--;
   atomic64_dec(&session->s_mdsc->metric.total_caps);
   if (cap->queue_release)
    __ceph_queue_cap_release(session, cap);
   else
    old_cap = cap;  /* put_cap it w/o locks held */
  }
  if (ret < 0)
   goto out;
 }
 ret = 0;
out:
 session->s_cap_iterator = NULL;
 spin_unlock(&session->s_cap_lock);

 iput(last_inode);
 if (old_cap)
  ceph_put_cap(session->s_mdsc, old_cap);

 return ret;
}

static int remove_session_caps_cb(struct inode *inode, int mds, void *arg)
{
 struct ceph_inode_info *ci = ceph_inode(inode);
 struct ceph_client *cl = ceph_inode_to_client(inode);
 bool invalidate = false;
 struct ceph_cap *cap;
 int iputs = 0;

 spin_lock(&ci->i_ceph_lock);
 cap = __get_cap_for_mds(ci, mds);
 if (cap) {
  doutc(cl, " removing cap %p, ci is %p, inode is %p\n",
        cap, ci, &ci->netfs.inode);

  iputs = ceph_purge_inode_cap(inode, cap, &invalidate);
 }
 spin_unlock(&ci->i_ceph_lock);

 if (cap)
  wake_up_all(&ci->i_cap_wq);
 if (invalidate)
  ceph_queue_invalidate(inode);
 while (iputs--)
  iput(inode);
 return 0;
}

/*
 * caller must hold session s_mutex
 */

static void remove_session_caps(struct ceph_mds_session *session)
{
 struct ceph_fs_client *fsc = session->s_mdsc->fsc;
 struct super_block *sb = fsc->sb;
 LIST_HEAD(dispose);

 doutc(fsc->client, "on %p\n", session);
 ceph_iterate_session_caps(session, remove_session_caps_cb, fsc);

 wake_up_all(&fsc->mdsc->cap_flushing_wq);

 spin_lock(&session->s_cap_lock);
 if (session->s_nr_caps > 0) {
  struct inode *inode;
  struct ceph_cap *cap, *prev = NULL;
  struct ceph_vino vino;
  /*
 * iterate_session_caps() skips inodes that are being
 * deleted, we need to wait until deletions are complete.
 * __wait_on_freeing_inode() is designed for the job,
 * but it is not exported, so use lookup inode function
 * to access it.
 */

  while (!list_empty(&session->s_caps)) {
   cap = list_entry(session->s_caps.next,
      struct ceph_cap, session_caps);
   if (cap == prev)
    break;
   prev = cap;
   vino = cap->ci->i_vino;
   spin_unlock(&session->s_cap_lock);

   inode = ceph_find_inode(sb, vino);
   iput(inode);

   spin_lock(&session->s_cap_lock);
  }
 }

 // drop cap expires and unlock s_cap_lock
 detach_cap_releases(session, &dispose);

 BUG_ON(session->s_nr_caps > 0);
 BUG_ON(!list_empty(&session->s_cap_flushing));
 spin_unlock(&session->s_cap_lock);
 dispose_cap_releases(session->s_mdsc, &dispose);
}

enum {
 RECONNECT,
 RENEWCAPS,
 FORCE_RO,
};

/*
 * wake up any threads waiting on this session's caps.  if the cap is
 * old (didn't get renewed on the client reconnect), remove it now.
 *
 * caller must hold s_mutex.
 */

static int wake_up_session_cb(struct inode *inode, int mds, void *arg)
{
 struct ceph_inode_info *ci = ceph_inode(inode);
 unsigned long ev = (unsigned long)arg;

 if (ev == RECONNECT) {
  spin_lock(&ci->i_ceph_lock);
  ci->i_wanted_max_size = 0;
  ci->i_requested_max_size = 0;
  spin_unlock(&ci->i_ceph_lock);
 } else if (ev == RENEWCAPS) {
  struct ceph_cap *cap;

  spin_lock(&ci->i_ceph_lock);
  cap = __get_cap_for_mds(ci, mds);
  /* mds did not re-issue stale cap */
  if (cap && cap->cap_gen < atomic_read(&cap->session->s_cap_gen))
   cap->issued = cap->implemented = CEPH_CAP_PIN;
  spin_unlock(&ci->i_ceph_lock);
 } else if (ev == FORCE_RO) {
 }
 wake_up_all(&ci->i_cap_wq);
 return 0;
}

static void wake_up_session_caps(struct ceph_mds_session *session, int ev)
{
 struct ceph_client *cl = session->s_mdsc->fsc->client;

 doutc(cl, "session %p mds%d\n", session, session->s_mds);
 ceph_iterate_session_caps(session, wake_up_session_cb,
      (void *)(unsigned long)ev);
}

/*
 * Send periodic message to MDS renewing all currently held caps.  The
 * ack will reset the expiration for all caps from this session.
 *
 * caller holds s_mutex
 */

static int send_renew_caps(struct ceph_mds_client *mdsc,
      struct ceph_mds_session *session)
{
 struct ceph_client *cl = mdsc->fsc->client;
 struct ceph_msg *msg;
 int state;

 if (time_after_eq(jiffies, session->s_cap_ttl) &&
     time_after_eq(session->s_cap_ttl, session->s_renew_requested))
  pr_info_client(cl, "mds%d caps stale\n", session->s_mds);
 session->s_renew_requested = jiffies;

 /* do not try to renew caps until a recovering mds has reconnected
 * with its clients. */

 state = ceph_mdsmap_get_state(mdsc->mdsmap, session->s_mds);
 if (state < CEPH_MDS_STATE_RECONNECT) {
  doutc(cl, "ignoring mds%d (%s)\n", session->s_mds,
        ceph_mds_state_name(state));
  return 0;
 }

 doutc(cl, "to mds%d (%s)\n", session->s_mds,
       ceph_mds_state_name(state));
 msg = create_session_full_msg(mdsc, CEPH_SESSION_REQUEST_RENEWCAPS,
          ++session->s_renew_seq);
 if (IS_ERR(msg))
  return PTR_ERR(msg);
 ceph_con_send(&session->s_con, msg);
 return 0;
}

static int send_flushmsg_ack(struct ceph_mds_client *mdsc,
        struct ceph_mds_session *session, u64 seq)
{
 struct ceph_client *cl = mdsc->fsc->client;
 struct ceph_msg *msg;

 doutc(cl, "to mds%d (%s)s seq %lld\n", session->s_mds,
       ceph_session_state_name(session->s_state), seq);
 msg = ceph_create_session_msg(CEPH_SESSION_FLUSHMSG_ACK, seq);
 if (!msg)
  return -ENOMEM;
 ceph_con_send(&session->s_con, msg);
 return 0;
}


/*
 * Note new cap ttl, and any transition from stale -> not stale (fresh?).
 *
 * Called under session->s_mutex
 */

static void renewed_caps(struct ceph_mds_client *mdsc,
    struct ceph_mds_session *session, int is_renew)
{
 struct ceph_client *cl = mdsc->fsc->client;
 int was_stale;
 int wake = 0;

 spin_lock(&session->s_cap_lock);
 was_stale = is_renew && time_after_eq(jiffies, session->s_cap_ttl);

 session->s_cap_ttl = session->s_renew_requested +
  mdsc->mdsmap->m_session_timeout*HZ;

 if (was_stale) {
  if (time_before(jiffies, session->s_cap_ttl)) {
   pr_info_client(cl, "mds%d caps renewed\n",
           session->s_mds);
   wake = 1;
  } else {
   pr_info_client(cl, "mds%d caps still stale\n",
           session->s_mds);
  }
 }
 doutc(cl, "mds%d ttl now %lu, was %s, now %s\n", session->s_mds,
       session->s_cap_ttl, was_stale ? "stale" : "fresh",
       time_before(jiffies, session->s_cap_ttl) ? "stale" : "fresh");
 spin_unlock(&session->s_cap_lock);

 if (wake)
  wake_up_session_caps(session, RENEWCAPS);
}

/*
 * send a session close request
 */

static int request_close_session(struct ceph_mds_session *session)
{
 struct ceph_client *cl = session->s_mdsc->fsc->client;
 struct ceph_msg *msg;

 doutc(cl, "mds%d state %s seq %lld\n", session->s_mds,
       ceph_session_state_name(session->s_state), session->s_seq);
 msg = ceph_create_session_msg(CEPH_SESSION_REQUEST_CLOSE,
          session->s_seq);
 if (!msg)
  return -ENOMEM;
 ceph_con_send(&session->s_con, msg);
 return 1;
}

/*
 * Called with s_mutex held.
 */

static int __close_session(struct ceph_mds_client *mdsc,
    struct ceph_mds_session *session)
{
 if (session->s_state >= CEPH_MDS_SESSION_CLOSING)
  return 0;
 session->s_state = CEPH_MDS_SESSION_CLOSING;
 return request_close_session(session);
}

static bool drop_negative_children(struct dentry *dentry)
{
 struct dentry *child;
 bool all_negative = true;

 if (!d_is_dir(dentry))
  goto out;

 spin_lock(&dentry->d_lock);
 hlist_for_each_entry(child, &dentry->d_children, d_sib) {
  if (d_really_is_positive(child)) {
   all_negative = false;
   break;
  }
 }
 spin_unlock(&dentry->d_lock);

 if (all_negative)
  shrink_dcache_parent(dentry);
out:
 return all_negative;
}

/*
 * Trim old(er) caps.
 *
 * Because we can't cache an inode without one or more caps, we do
 * this indirectly: if a cap is unused, we prune its aliases, at which
 * point the inode will hopefully get dropped to.
 *
 * Yes, this is a bit sloppy.  Our only real goal here is to respond to
 * memory pressure from the MDS, though, so it needn't be perfect.
 */

static int trim_caps_cb(struct inode *inode, int mds, void *arg)
{
 struct ceph_mds_client *mdsc = ceph_sb_to_mdsc(inode->i_sb);
 struct ceph_client *cl = mdsc->fsc->client;
 int *remaining = arg;
 struct ceph_inode_info *ci = ceph_inode(inode);
 int used, wanted, oissued, mine;
 struct ceph_cap *cap;

 if (*remaining <= 0)
  return -1;

 spin_lock(&ci->i_ceph_lock);
 cap = __get_cap_for_mds(ci, mds);
 if (!cap) {
  spin_unlock(&ci->i_ceph_lock);
  return 0;
 }
 mine = cap->issued | cap->implemented;
 used = __ceph_caps_used(ci);
 wanted = __ceph_caps_file_wanted(ci);
 oissued = __ceph_caps_issued_other(ci, cap);

 doutc(cl, "%p %llx.%llx cap %p mine %s oissued %s used %s wanted %s\n",
       inode, ceph_vinop(inode), cap, ceph_cap_string(mine),
       ceph_cap_string(oissued), ceph_cap_string(used),
       ceph_cap_string(wanted));
 if (cap == ci->i_auth_cap) {
  if (ci->i_dirty_caps || ci->i_flushing_caps ||
      !list_empty(&ci->i_cap_snaps))
   goto out;
  if ((used | wanted) & CEPH_CAP_ANY_WR)
   goto out;
  /* Note: it's possible that i_filelock_ref becomes non-zero
 * after dropping auth caps. It doesn't hurt because reply
 * of lock mds request will re-add auth caps. */

  if (atomic_read(&ci->i_filelock_ref) > 0)
   goto out;
 }
 /* The inode has cached pages, but it's no longer used.
 * we can safely drop it */

 if (S_ISREG(inode->i_mode) &&
     wanted == 0 && used == CEPH_CAP_FILE_CACHE &&
     !(oissued & CEPH_CAP_FILE_CACHE)) {
   used = 0;
   oissued = 0;
 }
 if ((used | wanted) & ~oissued & mine)
  goto out;   /* we need these caps */

 if (oissued) {
  /* we aren't the only cap.. just remove us */
  ceph_remove_cap(mdsc, cap, true);
  (*remaining)--;
 } else {
  struct dentry *dentry;
  /* try dropping referring dentries */
  spin_unlock(&ci->i_ceph_lock);
  dentry = d_find_any_alias(inode);
  if (dentry && drop_negative_children(dentry)) {
   int count;
   dput(dentry);
   d_prune_aliases(inode);
   count = atomic_read(&inode->i_count);
   if (count == 1)
    (*remaining)--;
   doutc(cl, "%p %llx.%llx cap %p pruned, count now %d\n",
         inode, ceph_vinop(inode), cap, count);
  } else {
   dput(dentry);
  }
  return 0;
 }

out:
 spin_unlock(&ci->i_ceph_lock);
 return 0;
}

/*
 * Trim session cap count down to some max number.
 */

int ceph_trim_caps(struct ceph_mds_client *mdsc,
     struct ceph_mds_session *session,
     int max_caps)
{
 struct ceph_client *cl = mdsc->fsc->client;
 int trim_caps = session->s_nr_caps - max_caps;

 doutc(cl, "mds%d start: %d / %d, trim %d\n", session->s_mds,
       session->s_nr_caps, max_caps, trim_caps);
 if (trim_caps > 0) {
  int remaining = trim_caps;

  ceph_iterate_session_caps(session, trim_caps_cb, &remaining);
  doutc(cl, "mds%d done: %d / %d, trimmed %d\n",
        session->s_mds, session->s_nr_caps, max_caps,
        trim_caps - remaining);
 }

 ceph_flush_session_cap_releases(mdsc, session);
 return 0;
}

static int check_caps_flush(struct ceph_mds_client *mdsc,
       u64 want_flush_tid)
{
 struct ceph_client *cl = mdsc->fsc->client;
 int ret = 1;

 spin_lock(&mdsc->cap_dirty_lock);
 if (!list_empty(&mdsc->cap_flush_list)) {
  struct ceph_cap_flush *cf =
   list_first_entry(&mdsc->cap_flush_list,
      struct ceph_cap_flush, g_list);
  if (cf->tid <= want_flush_tid) {
   doutc(cl, "still flushing tid %llu <= %llu\n",
         cf->tid, want_flush_tid);
   ret = 0;
  }
 }
 spin_unlock(&mdsc->cap_dirty_lock);
 return ret;
}

/*
 * flush all dirty inode data to disk.
 *
 * returns true if we've flushed through want_flush_tid
 */

static void wait_caps_flush(struct ceph_mds_client *mdsc,
       u64 want_flush_tid)
{
 struct ceph_client *cl = mdsc->fsc->client;

 doutc(cl, "want %llu\n", want_flush_tid);

 wait_event(mdsc->cap_flushing_wq,
     check_caps_flush(mdsc, want_flush_tid));

 doutc(cl, "ok, flushed thru %llu\n", want_flush_tid);
}

/*
 * called under s_mutex
 */

static void ceph_send_cap_releases(struct ceph_mds_client *mdsc,
       struct ceph_mds_session *session)
{
 struct ceph_client *cl = mdsc->fsc->client;
 struct ceph_msg *msg = NULL;
 struct ceph_mds_cap_release *head;
 struct ceph_mds_cap_item *item;
 struct ceph_osd_client *osdc = &mdsc->fsc->client->osdc;
 struct ceph_cap *cap;
 LIST_HEAD(tmp_list);
 int num_cap_releases;
 __le32 barrier, *cap_barrier;

 down_read(&osdc->lock);
 barrier = cpu_to_le32(osdc->epoch_barrier);
 up_read(&osdc->lock);

 spin_lock(&session->s_cap_lock);
again:
 list_splice_init(&session->s_cap_releases, &tmp_list);
 num_cap_releases = session->s_num_cap_releases;
 session->s_num_cap_releases = 0;
 spin_unlock(&session->s_cap_lock);

 while (!list_empty(&tmp_list)) {
  if (!msg) {
   msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPRELEASE,
     PAGE_SIZE, GFP_NOFS, false);
   if (!msg)
    goto out_err;
   head = msg->front.iov_base;
   head->num = cpu_to_le32(0);
   msg->front.iov_len = sizeof(*head);

   msg->hdr.version = cpu_to_le16(2);
   msg->hdr.compat_version = cpu_to_le16(1);
  }

  cap = list_first_entry(&tmp_list, struct ceph_cap,
     session_caps);
  list_del(&cap->session_caps);
  num_cap_releases--;

  head = msg->front.iov_base;
  put_unaligned_le32(get_unaligned_le32(&head->num) + 1,
       &head->num);
  item = msg->front.iov_base + msg->front.iov_len;
  item->ino = cpu_to_le64(cap->cap_ino);
  item->cap_id = cpu_to_le64(cap->cap_id);
  item->migrate_seq = cpu_to_le32(cap->mseq);
  item->issue_seq = cpu_to_le32(cap->issue_seq);
  msg->front.iov_len += sizeof(*item);

  ceph_put_cap(mdsc, cap);

  if (le32_to_cpu(head->num) == CEPH_CAPS_PER_RELEASE) {
   // Append cap_barrier field
   cap_barrier = msg->front.iov_base + msg->front.iov_len;
   *cap_barrier = barrier;
   msg->front.iov_len += sizeof(*cap_barrier);

   msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
   doutc(cl, "mds%d %p\n", session->s_mds, msg);
   ceph_con_send(&session->s_con, msg);
   msg = NULL;
  }
 }

 BUG_ON(num_cap_releases != 0);

 spin_lock(&session->s_cap_lock);
 if (!list_empty(&session->s_cap_releases))
  goto again;
 spin_unlock(&session->s_cap_lock);

 if (msg) {
  // Append cap_barrier field
  cap_barrier = msg->front.iov_base + msg->front.iov_len;
  *cap_barrier = barrier;
  msg->front.iov_len += sizeof(*cap_barrier);

  msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
  doutc(cl, "mds%d %p\n", session->s_mds, msg);
  ceph_con_send(&session->s_con, msg);
 }
 return;
out_err:
 pr_err_client(cl, "mds%d, failed to allocate message\n",
        session->s_mds);
 spin_lock(&session->s_cap_lock);
 list_splice(&tmp_list, &session->s_cap_releases);
 session->s_num_cap_releases += num_cap_releases;
 spin_unlock(&session->s_cap_lock);
}

static void ceph_cap_release_work(struct work_struct *work)
{
 struct ceph_mds_session *session =
  container_of(work, struct ceph_mds_session, s_cap_release_work);

 mutex_lock(&session->s_mutex);
 if (session->s_state == CEPH_MDS_SESSION_OPEN ||
     session->s_state == CEPH_MDS_SESSION_HUNG)
  ceph_send_cap_releases(session->s_mdsc, session);
 mutex_unlock(&session->s_mutex);
 ceph_put_mds_session(session);
}

void ceph_flush_session_cap_releases(struct ceph_mds_client *mdsc,
               struct ceph_mds_session *session)
{
 struct ceph_client *cl = mdsc->fsc->client;
 if (mdsc->stopping)
  return;

 ceph_get_mds_session(session);
 if (queue_work(mdsc->fsc->cap_wq,
         &session->s_cap_release_work)) {
  doutc(cl, "cap release work queued\n");
 } else {
  ceph_put_mds_session(session);
  doutc(cl, "failed to queue cap release work\n");
 }
}

/*
 * caller holds session->s_cap_lock
 */

void __ceph_queue_cap_release(struct ceph_mds_session *session,
         struct ceph_cap *cap)
{
 list_add_tail(&cap->session_caps, &session->s_cap_releases);
 session->s_num_cap_releases++;

 if (!(session->s_num_cap_releases % CEPH_CAPS_PER_RELEASE))
  ceph_flush_session_cap_releases(session->s_mdsc, session);
}

static void ceph_cap_reclaim_work(struct work_struct *work)
{
 struct ceph_mds_client *mdsc =
  container_of(work, struct ceph_mds_client, cap_reclaim_work);
 int ret = ceph_trim_dentries(mdsc);
 if (ret == -EAGAIN)
  ceph_queue_cap_reclaim_work(mdsc);
}

void ceph_queue_cap_reclaim_work(struct ceph_mds_client *mdsc)
{
 struct ceph_client *cl = mdsc->fsc->client;
 if (mdsc->stopping)
  return;

        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_reclaim_work)) {
                doutc(cl, "caps reclaim work queued\n");
        } else {
                doutc(cl, "failed to queue caps release work\n");
        }
}

void ceph_reclaim_caps_nr(struct ceph_mds_client *mdsc, int nr)
{
 int val;
 if (!nr)
  return;
 val = atomic_add_return(nr, &mdsc->cap_reclaim_pending);
 if ((val % CEPH_CAPS_PER_RELEASE) < nr) {
  atomic_set(&mdsc->cap_reclaim_pending, 0);
  ceph_queue_cap_reclaim_work(mdsc);
 }
}

void ceph_queue_cap_unlink_work(struct ceph_mds_client *mdsc)
{
 struct ceph_client *cl = mdsc->fsc->client;
 if (mdsc->stopping)
  return;

        if (queue_work(mdsc->fsc->cap_wq, &mdsc->cap_unlink_work)) {
                doutc(cl, "caps unlink work queued\n");
        } else {
                doutc(cl, "failed to queue caps unlink work\n");
        }
}

static void ceph_cap_unlink_work(struct work_struct *work)
{
 struct ceph_mds_client *mdsc =
  container_of(work, struct ceph_mds_client, cap_unlink_work);
 struct ceph_client *cl = mdsc->fsc->client;

 doutc(cl, "begin\n");
 spin_lock(&mdsc->cap_delay_lock);
 while (!list_empty(&mdsc->cap_unlink_delay_list)) {
  struct ceph_inode_info *ci;
  struct inode *inode;

  ci = list_first_entry(&mdsc->cap_unlink_delay_list,
          struct ceph_inode_info,
          i_cap_delay_list);
  list_del_init(&ci->i_cap_delay_list);

  inode = igrab(&ci->netfs.inode);
  if (inode) {
   spin_unlock(&mdsc->cap_delay_lock);
   doutc(cl, "on %p %llx.%llx\n", inode,
         ceph_vinop(inode));
   ceph_check_caps(ci, CHECK_CAPS_FLUSH);
   iput(inode);
   spin_lock(&mdsc->cap_delay_lock);
  }
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=95 H=88 G=91

¤ Dauer der Verarbeitung: 0.22 Sekunden  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.