Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Linux/net/ceph/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 152 kB image not shown  

Quelle  osd_client.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/striper.h>

#define OSD_OPREPLY_FRONT_LEN 512

static struct kmem_cache *ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */


static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
   struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
     struct ceph_osd_linger_request *lreq);
static void clear_backoffs(struct ceph_osd *osd);

#if 1
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
 bool wrlocked = true;

 if (unlikely(down_read_trylock(sem))) {
  wrlocked = false;
  up_read(sem);
 }

 return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
 WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
 WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
 struct ceph_osd_client *osdc = osd->o_osdc;

 WARN_ON(!(mutex_is_locked(&osd->lock) &&
    rwsem_is_locked(&osdc->lock)) &&
  !rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
 WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */

static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
   u64 *objnum, u64 *objoff, u64 *objlen)
{
 u64 orig_len = *plen;
 u32 xlen;

 /* object extent? */
 ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
       objoff, &xlen);
 *objlen = xlen;
 if (*objlen < orig_len) {
  *plen = *objlen;
  dout(" skipping last %llu, final file extent %llu~%llu\n",
       orig_len - *plen, off, *plen);
 }

 dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
 return 0;
}

static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
 memset(osd_data, 0, sizeof (*osd_data));
 osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

/*
 * Consumes @pages if @own_pages is true.
 */

static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
   struct page **pages, u64 length, u32 alignment,
   bool pages_from_pool, bool own_pages)
{
 osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
 osd_data->pages = pages;
 osd_data->length = length;
 osd_data->alignment = alignment;
 osd_data->pages_from_pool = pages_from_pool;
 osd_data->own_pages = own_pages;
}

/*
 * Consumes a ref on @pagelist.
 */

static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
   struct ceph_pagelist *pagelist)
{
 osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
 osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
       struct ceph_bio_iter *bio_pos,
       u32 bio_length)
{
 osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
 osd_data->bio_pos = *bio_pos;
 osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
         struct ceph_bvec_iter *bvec_pos,
         u32 num_bvecs)
{
 osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
 osd_data->bvec_pos = *bvec_pos;
 osd_data->num_bvecs = num_bvecs;
}

static void ceph_osd_iter_init(struct ceph_osd_data *osd_data,
          struct iov_iter *iter)
{
 osd_data->type = CEPH_OSD_DATA_TYPE_ITER;
 osd_data->iter = *iter;
}

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
 BUG_ON(which >= osd_req->r_num_ops);

 return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
   unsigned int which)
{
 return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
   unsigned int which, struct page **pages,
   u64 length, u32 alignment,
   bool pages_from_pool, bool own_pages)
{
 struct ceph_osd_data *osd_data;

 osd_data = osd_req_op_raw_data_in(osd_req, which);
 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
    pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
   unsigned int which, struct page **pages,
   u64 length, u32 alignment,
   bool pages_from_pool, bool own_pages)
{
 struct ceph_osd_data *osd_data;

 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
    pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
        unsigned int which,
        struct ceph_bio_iter *bio_pos,
        u32 bio_length)
{
 struct ceph_osd_data *osd_data;

 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
          unsigned int which,
          struct bio_vec *bvecs, u32 num_bvecs,
          u32 bytes)
{
 struct ceph_osd_data *osd_data;
 struct ceph_bvec_iter it = {
  .bvecs = bvecs,
  .iter = { .bi_size = bytes },
 };

 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);

void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
      unsigned int which,
      struct ceph_bvec_iter *bvec_pos)
{
 struct ceph_osd_data *osd_data;

 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);

/**
 * osd_req_op_extent_osd_iter - Set up an operation with an iterator buffer
 * @osd_req: The request to set up
 * @which: Index of the operation in which to set the iter
 * @iter: The buffer iterator
 */

void osd_req_op_extent_osd_iter(struct ceph_osd_request *osd_req,
    unsigned int which, struct iov_iter *iter)
{
 struct ceph_osd_data *osd_data;

 osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
 ceph_osd_iter_init(osd_data, iter);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_iter);

static void osd_req_op_cls_request_info_pagelist(
   struct ceph_osd_request *osd_req,
   unsigned int which, struct ceph_pagelist *pagelist)
{
 struct ceph_osd_data *osd_data;

 osd_data = osd_req_op_data(osd_req, which, cls, request_info);
 ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
   unsigned int which, struct page **pages, u64 length,
   u32 alignment, bool pages_from_pool, bool own_pages)
{
 struct ceph_osd_data *osd_data;

 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
    pages_from_pool, own_pages);
 osd_req->r_ops[which].cls.indata_len += length;
 osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
           unsigned int which,
           struct bio_vec *bvecs, u32 num_bvecs,
           u32 bytes)
{
 struct ceph_osd_data *osd_data;
 struct ceph_bvec_iter it = {
  .bvecs = bvecs,
  .iter = { .bi_size = bytes },
 };

 osd_data = osd_req_op_data(osd_req, which, cls, request_data);
 ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
 osd_req->r_ops[which].cls.indata_len += bytes;
 osd_req->r_ops[which].indata_len += bytes;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
   unsigned int which, struct page **pages, u64 length,
   u32 alignment, bool pages_from_pool, bool own_pages)
{
 struct ceph_osd_data *osd_data;

 osd_data = osd_req_op_data(osd_req, which, cls, response_data);
 ceph_osd_data_pages_init(osd_data, pages, length, alignment,
    pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
 switch (osd_data->type) {
 case CEPH_OSD_DATA_TYPE_NONE:
  return 0;
 case CEPH_OSD_DATA_TYPE_PAGES:
  return osd_data->length;
 case CEPH_OSD_DATA_TYPE_PAGELIST:
  return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
 case CEPH_OSD_DATA_TYPE_BIO:
  return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
 case CEPH_OSD_DATA_TYPE_BVECS:
  return osd_data->bvec_pos.iter.bi_size;
 case CEPH_OSD_DATA_TYPE_ITER:
  return iov_iter_count(&osd_data->iter);
 default:
  WARN(true"unrecognized data type %d\n", (int)osd_data->type);
  return 0;
 }
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
  int num_pages;

  num_pages = calc_pages_for((u64)osd_data->alignment,
      (u64)osd_data->length);
  ceph_release_page_vector(osd_data->pages, num_pages);
 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
  ceph_pagelist_release(osd_data->pagelist);
 }
 ceph_osd_data_init(osd_data);
}

static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
   unsigned int which)
{
 struct ceph_osd_req_op *op;

 BUG_ON(which >= osd_req->r_num_ops);
 op = &osd_req->r_ops[which];

 switch (op->op) {
 case CEPH_OSD_OP_READ:
 case CEPH_OSD_OP_SPARSE_READ:
 case CEPH_OSD_OP_WRITE:
 case CEPH_OSD_OP_WRITEFULL:
  kfree(op->extent.sparse_ext);
  ceph_osd_data_release(&op->extent.osd_data);
  break;
 case CEPH_OSD_OP_CALL:
  ceph_osd_data_release(&op->cls.request_info);
  ceph_osd_data_release(&op->cls.request_data);
  ceph_osd_data_release(&op->cls.response_data);
  break;
 case CEPH_OSD_OP_SETXATTR:
 case CEPH_OSD_OP_CMPXATTR:
  ceph_osd_data_release(&op->xattr.osd_data);
  break;
 case CEPH_OSD_OP_STAT:
  ceph_osd_data_release(&op->raw_data_in);
  break;
 case CEPH_OSD_OP_NOTIFY_ACK:
  ceph_osd_data_release(&op->notify_ack.request_data);
  break;
 case CEPH_OSD_OP_NOTIFY:
  ceph_osd_data_release(&op->notify.request_data);
  ceph_osd_data_release(&op->notify.response_data);
  break;
 case CEPH_OSD_OP_LIST_WATCHERS:
  ceph_osd_data_release(&op->list_watchers.response_data);
  break;
 case CEPH_OSD_OP_COPY_FROM2:
  ceph_osd_data_release(&op->copy_from.osd_data);
  break;
 default:
  break;
 }
}

/*
 * Assumes @t is zero-initialized.
 */

static void target_init(struct ceph_osd_request_target *t)
{
 ceph_oid_init(&t->base_oid);
 ceph_oloc_init(&t->base_oloc);
 ceph_oid_init(&t->target_oid);
 ceph_oloc_init(&t->target_oloc);

 ceph_osds_init(&t->acting);
 ceph_osds_init(&t->up);
 t->size = -1;
 t->min_size = -1;

 t->osd = CEPH_HOMELESS_OSD;
}

static void target_copy(struct ceph_osd_request_target *dest,
   const struct ceph_osd_request_target *src)
{
 ceph_oid_copy(&dest->base_oid, &src->base_oid);
 ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
 ceph_oid_copy(&dest->target_oid, &src->target_oid);
 ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

 dest->pgid = src->pgid; /* struct */
 dest->spgid = src->spgid; /* struct */
 dest->pg_num = src->pg_num;
 dest->pg_num_mask = src->pg_num_mask;
 ceph_osds_copy(&dest->acting, &src->acting);
 ceph_osds_copy(&dest->up, &src->up);
 dest->size = src->size;
 dest->min_size = src->min_size;
 dest->sort_bitwise = src->sort_bitwise;
 dest->recovery_deletes = src->recovery_deletes;

 dest->flags = src->flags;
 dest->used_replica = src->used_replica;
 dest->paused = src->paused;

 dest->epoch = src->epoch;
 dest->last_force_resend = src->last_force_resend;

 dest->osd = src->osd;
}

static void target_destroy(struct ceph_osd_request_target *t)
{
 ceph_oid_destroy(&t->base_oid);
 ceph_oloc_destroy(&t->base_oloc);
 ceph_oid_destroy(&t->target_oid);
 ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */

static void request_release_checks(struct ceph_osd_request *req)
{
 WARN_ON(!RB_EMPTY_NODE(&req->r_node));
 WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
 WARN_ON(!list_empty(&req->r_private_item));
 WARN_ON(req->r_osd);
}

static void ceph_osdc_release_request(struct kref *kref)
{
 struct ceph_osd_request *req = container_of(kref,
         struct ceph_osd_request, r_kref);
 unsigned int which;

 dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
      req->r_request, req->r_reply);
 request_release_checks(req);

 if (req->r_request)
  ceph_msg_put(req->r_request);
 if (req->r_reply)
  ceph_msg_put(req->r_reply);

 for (which = 0; which < req->r_num_ops; which++)
  osd_req_op_data_release(req, which);

 target_destroy(&req->r_t);
 ceph_put_snap_context(req->r_snapc);

 if (req->r_mempool)
  mempool_free(req, req->r_osdc->req_mempool);
 else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
  kmem_cache_free(ceph_osd_request_cache, req);
 else
  kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
 dout("%s %p (was %d)\n", __func__, req,
      kref_read(&req->r_kref));
 kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

void ceph_osdc_put_request(struct ceph_osd_request *req)
{
 if (req) {
  dout("%s %p (was %d)\n", __func__, req,
       kref_read(&req->r_kref));
  kref_put(&req->r_kref, ceph_osdc_release_request);
 }
}
EXPORT_SYMBOL(ceph_osdc_put_request);

static void request_init(struct ceph_osd_request *req)
{
 /* req only, each op is zeroed in osd_req_op_init() */
 memset(req, 0, sizeof(*req));

 kref_init(&req->r_kref);
 init_completion(&req->r_completion);
 RB_CLEAR_NODE(&req->r_node);
 RB_CLEAR_NODE(&req->r_mc_node);
 INIT_LIST_HEAD(&req->r_private_item);

 target_init(&req->r_t);
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
            struct ceph_snap_context *snapc,
            unsigned int num_ops,
            bool use_mempool,
            gfp_t gfp_flags)
{
 struct ceph_osd_request *req;

 if (use_mempool) {
  BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
  req = mempool_alloc(osdc->req_mempool, gfp_flags);
 } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
  req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
 } else {
  BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
  req = kmalloc(struct_size(req, r_ops, num_ops), gfp_flags);
 }
 if (unlikely(!req))
  return NULL;

 request_init(req);
 req->r_osdc = osdc;
 req->r_mempool = use_mempool;
 req->r_num_ops = num_ops;
 req->r_snapid = CEPH_NOSNAP;
 req->r_snapc = ceph_get_snap_context(snapc);

 dout("%s req %p\n", __func__, req);
 return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
 return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}

static int __ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp,
          int num_request_data_items,
          int num_reply_data_items)
{
 struct ceph_osd_client *osdc = req->r_osdc;
 struct ceph_msg *msg;
 int msg_size;

 WARN_ON(req->r_request || req->r_reply);
 WARN_ON(ceph_oid_empty(&req->r_base_oid));
 WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

 /* create request message */
 msg_size = CEPH_ENCODING_START_BLK_LEN +
   CEPH_PGID_ENCODING_LEN + 1; /* spgid */
 msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
 msg_size += CEPH_ENCODING_START_BLK_LEN +
   sizeof(struct ceph_osd_reqid); /* reqid */
 msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
 msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
 msg_size += CEPH_ENCODING_START_BLK_LEN +
   ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
 msg_size += 4 + req->r_base_oid.name_len; /* oid */
 msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
 msg_size += 8; /* snapid */
 msg_size += 8; /* snap_seq */
 msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
 msg_size += 4 + 8; /* retry_attempt, features */

 if (req->r_mempool)
  msg = ceph_msgpool_get(&osdc->msgpool_op, msg_size,
           num_request_data_items);
 else
  msg = ceph_msg_new2(CEPH_MSG_OSD_OP, msg_size,
        num_request_data_items, gfp, true);
 if (!msg)
  return -ENOMEM;

 memset(msg->front.iov_base, 0, msg->front.iov_len);
 req->r_request = msg;

 /* create reply message */
 msg_size = OSD_OPREPLY_FRONT_LEN;
 msg_size += req->r_base_oid.name_len;
 msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

 if (req->r_mempool)
  msg = ceph_msgpool_get(&osdc->msgpool_op_reply, msg_size,
           num_reply_data_items);
 else
  msg = ceph_msg_new2(CEPH_MSG_OSD_OPREPLY, msg_size,
        num_reply_data_items, gfp, true);
 if (!msg)
  return -ENOMEM;

 req->r_reply = msg;

 return 0;
}

static bool osd_req_opcode_valid(u16 opcode)
{
 switch (opcode) {
#define GENERATE_CASE(op, opcode, str) case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
 default:
  return false;
 }
}

static void get_num_data_items(struct ceph_osd_request *req,
          int *num_request_data_items,
          int *num_reply_data_items)
{
 struct ceph_osd_req_op *op;

 *num_request_data_items = 0;
 *num_reply_data_items = 0;

 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
  switch (op->op) {
  /* request */
  case CEPH_OSD_OP_WRITE:
  case CEPH_OSD_OP_WRITEFULL:
  case CEPH_OSD_OP_SETXATTR:
  case CEPH_OSD_OP_CMPXATTR:
  case CEPH_OSD_OP_NOTIFY_ACK:
  case CEPH_OSD_OP_COPY_FROM2:
   *num_request_data_items += 1;
   break;

  /* reply */
  case CEPH_OSD_OP_STAT:
  case CEPH_OSD_OP_READ:
  case CEPH_OSD_OP_SPARSE_READ:
  case CEPH_OSD_OP_LIST_WATCHERS:
   *num_reply_data_items += 1;
   break;

  /* both */
  case CEPH_OSD_OP_NOTIFY:
   *num_request_data_items += 1;
   *num_reply_data_items += 1;
   break;
  case CEPH_OSD_OP_CALL:
   *num_request_data_items += 2;
   *num_reply_data_items += 1;
   break;

  default:
   WARN_ON(!osd_req_opcode_valid(op->op));
   break;
  }
 }
}

/*
 * oid, oloc and OSD op opcode(s) must be filled in before this function
 * is called.
 */

int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
 int num_request_data_items, num_reply_data_items;

 get_num_data_items(req, &num_request_data_items, &num_reply_data_items);
 return __ceph_osdc_alloc_messages(req, gfp, num_request_data_items,
       num_reply_data_items);
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */

struct ceph_osd_req_op *
osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
   u16 opcode, u32 flags)
{
 struct ceph_osd_req_op *op;

 BUG_ON(which >= osd_req->r_num_ops);
 BUG_ON(!osd_req_opcode_valid(opcode));

 op = &osd_req->r_ops[which];
 memset(op, 0, sizeof (*op));
 op->op = opcode;
 op->flags = flags;

 return op;
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
    unsigned int which, u16 opcode,
    u64 offset, u64 length,
    u64 truncate_size, u32 truncate_seq)
{
 struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
           opcode, 0);
 size_t payload_len = 0;

 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
        opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
        opcode != CEPH_OSD_OP_TRUNCATE && opcode != CEPH_OSD_OP_SPARSE_READ);

 op->extent.offset = offset;
 op->extent.length = length;
 op->extent.truncate_size = truncate_size;
 op->extent.truncate_seq = truncate_seq;
 if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
  payload_len += length;

 op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
    unsigned int which, u64 length)
{
 struct ceph_osd_req_op *op;
 u64 previous;

 BUG_ON(which >= osd_req->r_num_ops);
 op = &osd_req->r_ops[which];
 previous = op->extent.length;

 if (length == previous)
  return;  /* Nothing to do */
 BUG_ON(length > previous);

 op->extent.length = length;
 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
  op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
    unsigned int which, u64 offset_inc)
{
 struct ceph_osd_req_op *op, *prev_op;

 BUG_ON(which + 1 >= osd_req->r_num_ops);

 prev_op = &osd_req->r_ops[which];
 op = osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
 /* dup previous one */
 op->indata_len = prev_op->indata_len;
 op->outdata_len = prev_op->outdata_len;
 op->extent = prev_op->extent;
 /* adjust offset */
 op->extent.offset += offset_inc;
 op->extent.length -= offset_inc;

 if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
  op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

int osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
   const char *classconst char *method)
{
 struct ceph_osd_req_op *op;
 struct ceph_pagelist *pagelist;
 size_t payload_len = 0;
 size_t size;
 int ret;

 op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_CALL, 0);

 pagelist = ceph_pagelist_alloc(GFP_NOFS);
 if (!pagelist)
  return -ENOMEM;

 op->cls.class_name = class;
 size = strlen(class);
 BUG_ON(size > (size_t) U8_MAX);
 op->cls.class_len = size;
 ret = ceph_pagelist_append(pagelist, class, size);
 if (ret)
  goto err_pagelist_free;
 payload_len += size;

 op->cls.method_name = method;
 size = strlen(method);
 BUG_ON(size > (size_t) U8_MAX);
 op->cls.method_len = size;
 ret = ceph_pagelist_append(pagelist, method, size);
 if (ret)
  goto err_pagelist_free;
 payload_len += size;

 osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);
 op->indata_len = payload_len;
 return 0;

err_pagelist_free:
 ceph_pagelist_release(pagelist);
 return ret;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
     u16 opcode, const char *name, const void *value,
     size_t size, u8 cmp_op, u8 cmp_mode)
{
 struct ceph_osd_req_op *op = osd_req_op_init(osd_req, which,
           opcode, 0);
 struct ceph_pagelist *pagelist;
 size_t payload_len;
 int ret;

 BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

 pagelist = ceph_pagelist_alloc(GFP_NOFS);
 if (!pagelist)
  return -ENOMEM;

 payload_len = strlen(name);
 op->xattr.name_len = payload_len;
 ret = ceph_pagelist_append(pagelist, name, payload_len);
 if (ret)
  goto err_pagelist_free;

 op->xattr.value_len = size;
 ret = ceph_pagelist_append(pagelist, value, size);
 if (ret)
  goto err_pagelist_free;
 payload_len += size;

 op->xattr.cmp_op = cmp_op;
 op->xattr.cmp_mode = cmp_mode;

 ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
 op->indata_len = payload_len;
 return 0;

err_pagelist_free:
 ceph_pagelist_release(pagelist);
 return ret;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 */

static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
      u8 watch_opcode, u64 cookie, u32 gen)
{
 struct ceph_osd_req_op *op;

 op = osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
 op->watch.cookie = cookie;
 op->watch.op = watch_opcode;
 op->watch.gen = gen;
}

/*
 * prot_ver, timeout and notify payload (may be empty) should already be
 * encoded in @request_pl
 */

static void osd_req_op_notify_init(struct ceph_osd_request *req, int which,
       u64 cookie, struct ceph_pagelist *request_pl)
{
 struct ceph_osd_req_op *op;

 op = osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
 op->notify.cookie = cookie;

 ceph_osd_data_pagelist_init(&op->notify.request_data, request_pl);
 op->indata_len = request_pl->length;
}

/*
 * @flags: CEPH_OSD_OP_ALLOC_HINT_FLAG_*
 */

void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
    unsigned int which,
    u64 expected_object_size,
    u64 expected_write_size,
    u32 flags)
{
 struct ceph_osd_req_op *op;

 op = osd_req_op_init(osd_req, which, CEPH_OSD_OP_SETALLOCHINT, 0);
 op->alloc_hint.expected_object_size = expected_object_size;
 op->alloc_hint.expected_write_size = expected_write_size;
 op->alloc_hint.flags = flags;

 /*
 * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
 * not worth a feature bit.  Set FAILOK per-op flag to make
 * sure older osds don't trip over an unsupported opcode.
 */

 op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
    struct ceph_osd_data *osd_data)
{
 u64 length = ceph_osd_data_length(osd_data);

 if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
  BUG_ON(length > (u64) SIZE_MAX);
  if (length)
   ceph_msg_data_add_pages(msg, osd_data->pages,
     length, osd_data->alignment, false);
 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
  BUG_ON(!length);
  ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
  ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
  ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
 } else if (osd_data->type == CEPH_OSD_DATA_TYPE_ITER) {
  ceph_msg_data_add_iter(msg, &osd_data->iter);
 } else {
  BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
 }
}

static u32 osd_req_encode_op(struct ceph_osd_op *dst,
        const struct ceph_osd_req_op *src)
{
 switch (src->op) {
 case CEPH_OSD_OP_STAT:
  break;
 case CEPH_OSD_OP_READ:
 case CEPH_OSD_OP_SPARSE_READ:
 case CEPH_OSD_OP_WRITE:
 case CEPH_OSD_OP_WRITEFULL:
 case CEPH_OSD_OP_ZERO:
 case CEPH_OSD_OP_TRUNCATE:
  dst->extent.offset = cpu_to_le64(src->extent.offset);
  dst->extent.length = cpu_to_le64(src->extent.length);
  dst->extent.truncate_size =
   cpu_to_le64(src->extent.truncate_size);
  dst->extent.truncate_seq =
   cpu_to_le32(src->extent.truncate_seq);
  break;
 case CEPH_OSD_OP_CALL:
  dst->cls.class_len = src->cls.class_len;
  dst->cls.method_len = src->cls.method_len;
  dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
  break;
 case CEPH_OSD_OP_WATCH:
  dst->watch.cookie = cpu_to_le64(src->watch.cookie);
  dst->watch.ver = cpu_to_le64(0);
  dst->watch.op = src->watch.op;
  dst->watch.gen = cpu_to_le32(src->watch.gen);
  break;
 case CEPH_OSD_OP_NOTIFY_ACK:
  break;
 case CEPH_OSD_OP_NOTIFY:
  dst->notify.cookie = cpu_to_le64(src->notify.cookie);
  break;
 case CEPH_OSD_OP_LIST_WATCHERS:
  break;
 case CEPH_OSD_OP_SETALLOCHINT:
  dst->alloc_hint.expected_object_size =
      cpu_to_le64(src->alloc_hint.expected_object_size);
  dst->alloc_hint.expected_write_size =
      cpu_to_le64(src->alloc_hint.expected_write_size);
  dst->alloc_hint.flags = cpu_to_le32(src->alloc_hint.flags);
  break;
 case CEPH_OSD_OP_SETXATTR:
 case CEPH_OSD_OP_CMPXATTR:
  dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
  dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
  dst->xattr.cmp_op = src->xattr.cmp_op;
  dst->xattr.cmp_mode = src->xattr.cmp_mode;
  break;
 case CEPH_OSD_OP_CREATE:
 case CEPH_OSD_OP_DELETE:
  break;
 case CEPH_OSD_OP_COPY_FROM2:
  dst->copy_from.snapid = cpu_to_le64(src->copy_from.snapid);
  dst->copy_from.src_version =
   cpu_to_le64(src->copy_from.src_version);
  dst->copy_from.flags = src->copy_from.flags;
  dst->copy_from.src_fadvise_flags =
   cpu_to_le32(src->copy_from.src_fadvise_flags);
  break;
 case CEPH_OSD_OP_ASSERT_VER:
  dst->assert_ver.unused = cpu_to_le64(0);
  dst->assert_ver.ver = cpu_to_le64(src->assert_ver.ver);
  break;
 default:
  pr_err("unsupported osd opcode %s\n",
   ceph_osd_op_name(src->op));
  WARN_ON(1);

  return 0;
 }

 dst->op = cpu_to_le16(src->op);
 dst->flags = cpu_to_le32(src->flags);
 dst->payload_len = cpu_to_le32(src->indata_len);

 return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 */

struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
            struct ceph_file_layout *layout,
            struct ceph_vino vino,
            u64 off, u64 *plen,
            unsigned int which, int num_ops,
            int opcode, int flags,
            struct ceph_snap_context *snapc,
            u32 truncate_seq,
            u64 truncate_size,
            bool use_mempool)
{
 struct ceph_osd_request *req;
 u64 objnum = 0;
 u64 objoff = 0;
 u64 objlen = 0;
 int r;

 BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
        opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
        opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE &&
        opcode != CEPH_OSD_OP_SPARSE_READ);

 req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
     GFP_NOFS);
 if (!req) {
  r = -ENOMEM;
  goto fail;
 }

 /* calculate max write size */
 r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
 if (r)
  goto fail;

 if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
  osd_req_op_init(req, which, opcode, 0);
 } else {
  u32 object_size = layout->object_size;
  u32 object_base = off - objoff;
  if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
   if (truncate_size <= object_base) {
    truncate_size = 0;
   } else {
    truncate_size -= object_base;
    if (truncate_size > object_size)
     truncate_size = object_size;
   }
  }
  osd_req_op_extent_init(req, which, opcode, objoff, objlen,
           truncate_size, truncate_seq);
 }

 req->r_base_oloc.pool = layout->pool_id;
 req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
 ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);
 req->r_flags = flags | osdc->client->options->read_from_replica;

 req->r_snapid = vino.snap;
 if (flags & CEPH_OSD_FLAG_WRITE)
  req->r_data_offset = off;

 if (num_ops > 1) {
  int num_req_ops, num_rep_ops;

  /*
 * If this is a multi-op write request, assume that we'll need
 * request ops. If it's a multi-op read then assume we'll need
 * reply ops. Anything else and call it -EINVAL.
 */

  if (flags & CEPH_OSD_FLAG_WRITE) {
   num_req_ops = num_ops;
   num_rep_ops = 0;
  } else if (flags & CEPH_OSD_FLAG_READ) {
   num_req_ops = 0;
   num_rep_ops = num_ops;
  } else {
   r = -EINVAL;
   goto fail;
  }

  r = __ceph_osdc_alloc_messages(req, GFP_NOFS, num_req_ops,
            num_rep_ops);
 } else {
  r = ceph_osdc_alloc_messages(req, GFP_NOFS);
 }
 if (r)
  goto fail;

 return req;

fail:
 ceph_osdc_put_request(req);
 return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);

int __ceph_alloc_sparse_ext_map(struct ceph_osd_req_op *op, int cnt)
{
 WARN_ON(op->op != CEPH_OSD_OP_SPARSE_READ);

 op->extent.sparse_ext_cnt = cnt;
 op->extent.sparse_ext = kmalloc_array(cnt,
           sizeof(*op->extent.sparse_ext),
           GFP_NOFS);
 if (!op->extent.sparse_ext)
  return -ENOMEM;
 return 0;
}
EXPORT_SYMBOL(__ceph_alloc_sparse_ext_map);

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */

DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)

/*
 * Call @fn on each OSD request as long as @fn returns 0.
 */

static void for_each_request(struct ceph_osd_client *osdc,
   int (*fn)(struct ceph_osd_request *req, void *arg),
   void *arg)
{
 struct rb_node *n, *p;

 for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
  struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);

  for (p = rb_first(&osd->o_requests); p; ) {
   struct ceph_osd_request *req =
       rb_entry(p, struct ceph_osd_request, r_node);

   p = rb_next(p);
   if (fn(req, arg))
    return;
  }
 }

 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
  struct ceph_osd_request *req =
      rb_entry(p, struct ceph_osd_request, r_node);

  p = rb_next(p);
  if (fn(req, arg))
   return;
 }
}

static bool osd_homeless(struct ceph_osd *osd)
{
 return osd->o_osd == CEPH_HOMELESS_OSD;
}

static bool osd_registered(struct ceph_osd *osd)
{
 verify_osdc_locked(osd->o_osdc);

 return !RB_EMPTY_NODE(&osd->o_node);
}

/*
 * Assumes @osd is zero-initialized.
 */

static void osd_init(struct ceph_osd *osd)
{
 refcount_set(&osd->o_ref, 1);
 RB_CLEAR_NODE(&osd->o_node);
 spin_lock_init(&osd->o_requests_lock);
 osd->o_requests = RB_ROOT;
 osd->o_linger_requests = RB_ROOT;
 osd->o_backoff_mappings = RB_ROOT;
 osd->o_backoffs_by_id = RB_ROOT;
 INIT_LIST_HEAD(&osd->o_osd_lru);
 INIT_LIST_HEAD(&osd->o_keepalive_item);
 osd->o_incarnation = 1;
 mutex_init(&osd->lock);
}

static void ceph_init_sparse_read(struct ceph_sparse_read *sr)
{
 kfree(sr->sr_extent);
 memset(sr, '\0'sizeof(*sr));
 sr->sr_state = CEPH_SPARSE_READ_HDR;
}

static void osd_cleanup(struct ceph_osd *osd)
{
 WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
 WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
 WARN_ON(!list_empty(&osd->o_osd_lru));
 WARN_ON(!list_empty(&osd->o_keepalive_item));

 ceph_init_sparse_read(&osd->o_sparse_read);

 if (osd->o_auth.authorizer) {
  WARN_ON(osd_homeless(osd));
  ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
 }
}

/*
 * Track open sessions with osds.
 */

static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
 struct ceph_osd *osd;

 WARN_ON(onum == CEPH_HOMELESS_OSD);

 osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
 osd_init(osd);
 osd->o_osdc = osdc;
 osd->o_osd = onum;
 osd->o_sparse_op_idx = -1;

 ceph_init_sparse_read(&osd->o_sparse_read);

 ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

 return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
 if (refcount_inc_not_zero(&osd->o_ref)) {
  dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
       refcount_read(&osd->o_ref));
  return osd;
 } else {
  dout("get_osd %p FAIL\n", osd);
  return NULL;
 }
}

static void put_osd(struct ceph_osd *osd)
{
 dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
      refcount_read(&osd->o_ref) - 1);
 if (refcount_dec_and_test(&osd->o_ref)) {
  osd_cleanup(osd);
  kfree(osd);
 }
}

DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

static void __move_osd_to_lru(struct ceph_osd *osd)
{
 struct ceph_osd_client *osdc = osd->o_osdc;

 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
 BUG_ON(!list_empty(&osd->o_osd_lru));

 spin_lock(&osdc->osd_lru_lock);
 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
 spin_unlock(&osdc->osd_lru_lock);

 osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
 if (RB_EMPTY_ROOT(&osd->o_requests) &&
     RB_EMPTY_ROOT(&osd->o_linger_requests))
  __move_osd_to_lru(osd);
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
 struct ceph_osd_client *osdc = osd->o_osdc;

 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

 spin_lock(&osdc->osd_lru_lock);
 if (!list_empty(&osd->o_osd_lru))
  list_del_init(&osd->o_osd_lru);
 spin_unlock(&osdc->osd_lru_lock);
}

/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 */

static void close_osd(struct ceph_osd *osd)
{
 struct ceph_osd_client *osdc = osd->o_osdc;
 struct rb_node *n;

 verify_osdc_wrlocked(osdc);
 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

 ceph_con_close(&osd->o_con);

 for (n = rb_first(&osd->o_requests); n; ) {
  struct ceph_osd_request *req =
      rb_entry(n, struct ceph_osd_request, r_node);

  n = rb_next(n); /* unlink_request() */

  dout(" reassigning req %p tid %llu\n", req, req->r_tid);
  unlink_request(osd, req);
  link_request(&osdc->homeless_osd, req);
 }
 for (n = rb_first(&osd->o_linger_requests); n; ) {
  struct ceph_osd_linger_request *lreq =
      rb_entry(n, struct ceph_osd_linger_request, node);

  n = rb_next(n); /* unlink_linger() */

  dout(" reassigning lreq %p linger_id %llu\n", lreq,
       lreq->linger_id);
  unlink_linger(osd, lreq);
  link_linger(&osdc->homeless_osd, lreq);
 }
 clear_backoffs(osd);

 __remove_osd_from_lru(osd);
 erase_osd(&osdc->osds, osd);
 put_osd(osd);
}

/*
 * reset osd connect
 */

static int reopen_osd(struct ceph_osd *osd)
{
 struct ceph_entity_addr *peer_addr;

 dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

 if (RB_EMPTY_ROOT(&osd->o_requests) &&
     RB_EMPTY_ROOT(&osd->o_linger_requests)) {
  close_osd(osd);
  return -ENODEV;
 }

 peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
 if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
   !ceph_con_opened(&osd->o_con)) {
  struct rb_node *n;

  dout("osd addr hasn't changed and connection never opened, "
       "letting msgr retry\n");
  /* touch each r_stamp for handle_timeout()'s benfit */
  for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
   struct ceph_osd_request *req =
       rb_entry(n, struct ceph_osd_request, r_node);
   req->r_stamp = jiffies;
  }

  return -EAGAIN;
 }

 ceph_con_close(&osd->o_con);
 ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
 osd->o_incarnation++;

 return 0;
}

static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
       bool wrlocked)
{
 struct ceph_osd *osd;

 if (wrlocked)
  verify_osdc_wrlocked(osdc);
 else
  verify_osdc_locked(osdc);

 if (o != CEPH_HOMELESS_OSD)
  osd = lookup_osd(&osdc->osds, o);
 else
  osd = &osdc->homeless_osd;
 if (!osd) {
  if (!wrlocked)
   return ERR_PTR(-EAGAIN);

  osd = create_osd(osdc, o);
  insert_osd(&osdc->osds, osd);
  ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
         &osdc->osdmap->osd_addr[osd->o_osd]);
 }

 dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
 return osd;
}

/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
 */

static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
 verify_osd_locked(osd);
 WARN_ON(!req->r_tid || req->r_osd);
 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
      req, req->r_tid);

 if (!osd_homeless(osd))
  __remove_osd_from_lru(osd);
 else
  atomic_inc(&osd->o_osdc->num_homeless);

 get_osd(osd);
 spin_lock(&osd->o_requests_lock);
 insert_request(&osd->o_requests, req);
 spin_unlock(&osd->o_requests_lock);
 req->r_osd = osd;
}

static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
 verify_osd_locked(osd);
 WARN_ON(req->r_osd != osd);
 dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
      req, req->r_tid);

 req->r_osd = NULL;
 spin_lock(&osd->o_requests_lock);
 erase_request(&osd->o_requests, req);
 spin_unlock(&osd->o_requests_lock);
 put_osd(osd);

 if (!osd_homeless(osd))
  maybe_move_osd_to_lru(osd);
 else
  atomic_dec(&osd->o_osdc->num_homeless);
}

static bool __pool_full(struct ceph_pg_pool_info *pi)
{
 return pi->flags & CEPH_POOL_FLAG_FULL;
}

static bool have_pool_full(struct ceph_osd_client *osdc)
{
 struct rb_node *n;

 for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
  struct ceph_pg_pool_info *pi =
      rb_entry(n, struct ceph_pg_pool_info, node);

  if (__pool_full(pi))
   return true;
 }

 return false;
}

static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
{
 struct ceph_pg_pool_info *pi;

 pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
 if (!pi)
  return false;

 return __pool_full(pi);
}

/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 */

static bool target_should_be_paused(struct ceph_osd_client *osdc,
        const struct ceph_osd_request_target *t,
        struct ceph_pg_pool_info *pi)
{
 bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
 bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
         ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
         __pool_full(pi);

 WARN_ON(pi->id != t->target_oloc.pool);
 return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
        ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
        (osdc->osdmap->epoch < osdc->epoch_barrier);
}

static int pick_random_replica(const struct ceph_osds *acting)
{
 int i = get_random_u32_below(acting->size);

 dout("%s picked osd%d, primary osd%d\n", __func__,
      acting->osds[i], acting->primary);
 return i;
}

/*
 * Picks the closest replica based on client's location given by
 * crush_location option.  Prefers the primary if the locality is
 * the same.
 */

static int pick_closest_replica(struct ceph_osd_client *osdc,
    const struct ceph_osds *acting)
{
 struct ceph_options *opt = osdc->client->options;
 int best_i, best_locality;
 int i = 0, locality;

 do {
  locality = ceph_get_crush_locality(osdc->osdmap,
         acting->osds[i],
         &opt->crush_locs);
  if (i == 0 ||
      (locality >= 0 && best_locality < 0) ||
      (locality >= 0 && best_locality >= 0 &&
       locality < best_locality)) {
   best_i = i;
   best_locality = locality;
  }
 } while (++i < acting->size);

 dout("%s picked osd%d with locality %d, primary osd%d\n", __func__,
      acting->osds[best_i], best_locality, acting->primary);
 return best_i;
}

enum calc_target_result {
 CALC_TARGET_NO_ACTION = 0,
 CALC_TARGET_NEED_RESEND,
 CALC_TARGET_POOL_DNE,
};

static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
        struct ceph_osd_request_target *t,
        bool any_change)
{
 struct ceph_pg_pool_info *pi;
 struct ceph_pg pgid, last_pgid;
 struct ceph_osds up, acting;
 bool is_read = t->flags & CEPH_OSD_FLAG_READ;
 bool is_write = t->flags & CEPH_OSD_FLAG_WRITE;
 bool force_resend = false;
 bool unpaused = false;
 bool legacy_change = false;
 bool split = false;
 bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
 bool recovery_deletes = ceph_osdmap_flag(osdc,
       CEPH_OSDMAP_RECOVERY_DELETES);
 enum calc_target_result ct_res;

 t->epoch = osdc->osdmap->epoch;
 pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
 if (!pi) {
  t->osd = CEPH_HOMELESS_OSD;
  ct_res = CALC_TARGET_POOL_DNE;
  goto out;
 }

 if (osdc->osdmap->epoch == pi->last_force_request_resend) {
  if (t->last_force_resend < pi->last_force_request_resend) {
   t->last_force_resend = pi->last_force_request_resend;
   force_resend = true;
  } else if (t->last_force_resend == 0) {
   force_resend = true;
  }
 }

 /* apply tiering */
 ceph_oid_copy(&t->target_oid, &t->base_oid);
 ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
 if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
  if (is_read && pi->read_tier >= 0)
   t->target_oloc.pool = pi->read_tier;
  if (is_write && pi->write_tier >= 0)
   t->target_oloc.pool = pi->write_tier;

  pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
  if (!pi) {
   t->osd = CEPH_HOMELESS_OSD;
   ct_res = CALC_TARGET_POOL_DNE;
   goto out;
  }
 }

 __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc, &pgid);
 last_pgid.pool = pgid.pool;
 last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);

 ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
 if (any_change &&
     ceph_is_new_interval(&t->acting,
     &acting,
     &t->up,
     &up,
     t->size,
     pi->size,
     t->min_size,
     pi->min_size,
     t->pg_num,
     pi->pg_num,
     t->sort_bitwise,
     sort_bitwise,
     t->recovery_deletes,
     recovery_deletes,
     &last_pgid))
  force_resend = true;

 if (t->paused && !target_should_be_paused(osdc, t, pi)) {
  t->paused = false;
  unpaused = true;
 }
 legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
   ceph_osds_changed(&t->acting, &acting,
       t->used_replica || any_change);
 if (t->pg_num)
  split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);

 if (legacy_change || force_resend || split) {
  t->pgid = pgid; /* struct */
  ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
  ceph_osds_copy(&t->acting, &acting);
  ceph_osds_copy(&t->up, &up);
  t->size = pi->size;
  t->min_size = pi->min_size;
  t->pg_num = pi->pg_num;
  t->pg_num_mask = pi->pg_num_mask;
  t->sort_bitwise = sort_bitwise;
  t->recovery_deletes = recovery_deletes;

  if ((t->flags & (CEPH_OSD_FLAG_BALANCE_READS |
     CEPH_OSD_FLAG_LOCALIZE_READS)) &&
      !is_write && pi->type == CEPH_POOL_TYPE_REP &&
      acting.size > 1) {
   int pos;

   WARN_ON(!is_read || acting.osds[0] != acting.primary);
   if (t->flags & CEPH_OSD_FLAG_BALANCE_READS) {
    pos = pick_random_replica(&acting);
   } else {
    pos = pick_closest_replica(osdc, &acting);
   }
   t->osd = acting.osds[pos];
   t->used_replica = pos > 0;
  } else {
   t->osd = acting.primary;
   t->used_replica = false;
  }
 }

 if (unpaused || legacy_change || force_resend || split)
  ct_res = CALC_TARGET_NEED_RESEND;
 else
  ct_res = CALC_TARGET_NO_ACTION;

out:
 dout("%s t %p -> %d%d%d%d ct_res %d osd%d\n", __func__, t, unpaused,
      legacy_change, force_resend, split, ct_res, t->osd);
 return ct_res;
}

static struct ceph_spg_mapping *alloc_spg_mapping(void)
{
 struct ceph_spg_mapping *spg;

 spg = kmalloc(sizeof(*spg), GFP_NOIO);
 if (!spg)
  return NULL;

 RB_CLEAR_NODE(&spg->node);
 spg->backoffs = RB_ROOT;
 return spg;
}

static void free_spg_mapping(struct ceph_spg_mapping *spg)
{
 WARN_ON(!RB_EMPTY_NODE(&spg->node));
 WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));

 kfree(spg);
}

/*
 * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
 * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
 * defined only within a specific spgid; it does not pass anything to
 * children on split, or to another primary.
 */

DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
   RB_BYPTR, const struct ceph_spg *, node)

static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
{
 return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
}

static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
       void **pkey, size_t *pkey_len)
{
 if (hoid->key_len) {
  *pkey = hoid->key;
  *pkey_len = hoid->key_len;
 } else {
  *pkey = hoid->oid;
  *pkey_len = hoid->oid_len;
 }
}

static int compare_names(const void *name1, size_t name1_len,
    const void *name2, size_t name2_len)
{
 int ret;

 ret = memcmp(name1, name2, min(name1_len, name2_len));
 if (!ret) {
  if (name1_len < name2_len)
   ret = -1;
  else if (name1_len > name2_len)
   ret = 1;
 }
 return ret;
}

static int hoid_compare(const struct ceph_hobject_id *lhs,
   const struct ceph_hobject_id *rhs)
{
 void *effective_key1, *effective_key2;
 size_t effective_key1_len, effective_key2_len;
 int ret;

 if (lhs->is_max < rhs->is_max)
  return -1;
 if (lhs->is_max > rhs->is_max)
  return 1;

 if (lhs->pool < rhs->pool)
  return -1;
 if (lhs->pool > rhs->pool)
  return 1;

 if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
  return -1;
 if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
  return 1;

 ret = compare_names(lhs->nspace, lhs->nspace_len,
       rhs->nspace, rhs->nspace_len);
 if (ret)
  return ret;

 hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
 hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
 ret = compare_names(effective_key1, effective_key1_len,
       effective_key2, effective_key2_len);
 if (ret)
  return ret;

 ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
 if (ret)
  return ret;

 if (lhs->snapid < rhs->snapid)
  return -1;
 if (lhs->snapid > rhs->snapid)
  return 1;

 return 0;
}

/*
 * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
 * compat stuff here.
 *
 * Assumes @hoid is zero-initialized.
 */

static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
{
 u8 struct_v;
 u32 struct_len;
 int ret;

 ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
      &struct_len);
 if (ret)
  return ret;

 if (struct_v < 4) {
  pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
  goto e_inval;
 }

 hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
      GFP_NOIO);
 if (IS_ERR(hoid->key)) {
  ret = PTR_ERR(hoid->key);
  hoid->key = NULL;
  return ret;
 }

 hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
      GFP_NOIO);
 if (IS_ERR(hoid->oid)) {
  ret = PTR_ERR(hoid->oid);
  hoid->oid = NULL;
  return ret;
 }

 ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
 ceph_decode_32_safe(p, end, hoid->hash, e_inval);
 ceph_decode_8_safe(p, end, hoid->is_max, e_inval);

 hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
         GFP_NOIO);
 if (IS_ERR(hoid->nspace)) {
  ret = PTR_ERR(hoid->nspace);
  hoid->nspace = NULL;
  return ret;
 }

 ceph_decode_64_safe(p, end, hoid->pool, e_inval);

 ceph_hoid_build_hash_cache(hoid);
 return 0;

e_inval:
 return -EINVAL;
}

static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
{
 return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
        4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
}

static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
{
 ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
 ceph_encode_string(p, end, hoid->key, hoid->key_len);
 ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
 ceph_encode_64(p, hoid->snapid);
 ceph_encode_32(p, hoid->hash);
 ceph_encode_8(p, hoid->is_max);
 ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
 ceph_encode_64(p, hoid->pool);
}

static void free_hoid(struct ceph_hobject_id *hoid)
{
 if (hoid) {
  kfree(hoid->key);
  kfree(hoid->oid);
  kfree(hoid->nspace);
  kfree(hoid);
 }
}

static struct ceph_osd_backoff *alloc_backoff(void)
{
 struct ceph_osd_backoff *backoff;

 backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
 if (!backoff)
  return NULL;

 RB_CLEAR_NODE(&backoff->spg_node);
 RB_CLEAR_NODE(&backoff->id_node);
 return backoff;
}

static void free_backoff(struct ceph_osd_backoff *backoff)
{
 WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
 WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));

 free_hoid(backoff->begin);
 free_hoid(backoff->end);
 kfree(backoff);
}

/*
 * Within a specific spgid, backoffs are managed by ->begin hoid.
 */

DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
   RB_BYVAL, spg_node);

static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
         const struct ceph_hobject_id *hoid)
{
 struct rb_node *n = root->rb_node;

 while (n) {
  struct ceph_osd_backoff *cur =
      rb_entry(n, struct ceph_osd_backoff, spg_node);
  int cmp;

  cmp = hoid_compare(hoid, cur->begin);
  if (cmp < 0) {
   n = n->rb_left;
  } else if (cmp > 0) {
   if (hoid_compare(hoid, cur->end) < 0)
    return cur;

   n = n->rb_right;
  } else {
   return cur;
  }
 }

 return NULL;
}

/*
 * Each backoff has a unique id within its OSD session.
 */

DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)

static void clear_backoffs(struct ceph_osd *osd)
{
 while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
  struct ceph_spg_mapping *spg =
      rb_entry(rb_first(&osd->o_backoff_mappings),
        struct ceph_spg_mapping, node);

  while (!RB_EMPTY_ROOT(&spg->backoffs)) {
   struct ceph_osd_backoff *backoff =
       rb_entry(rb_first(&spg->backoffs),
         struct ceph_osd_backoff, spg_node);

   erase_backoff(&spg->backoffs, backoff);
   erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
   free_backoff(backoff);
  }
  erase_spg_mapping(&osd->o_backoff_mappings, spg);
  free_spg_mapping(spg);
 }
}

/*
 * Set up a temporary, non-owning view into @t.
 */

static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
      const struct ceph_osd_request_target *t)
{
 hoid->key = NULL;
 hoid->key_len = 0;
 hoid->oid = t->target_oid.name;
 hoid->oid_len = t->target_oid.name_len;
 hoid->snapid = CEPH_NOSNAP;
 hoid->hash = t->pgid.seed;
 hoid->is_max = false;
 if (t->target_oloc.pool_ns) {
  hoid->nspace = t->target_oloc.pool_ns->str;
  hoid->nspace_len = t->target_oloc.pool_ns->len;
 } else {
  hoid->nspace = NULL;
  hoid->nspace_len = 0;
 }
 hoid->pool = t->target_oloc.pool;
 ceph_hoid_build_hash_cache(hoid);
}

static bool should_plug_request(struct ceph_osd_request *req)
{
 struct ceph_osd *osd = req->r_osd;
 struct ceph_spg_mapping *spg;
 struct ceph_osd_backoff *backoff;
 struct ceph_hobject_id hoid;

 spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
 if (!spg)
  return false;

 hoid_fill_from_target(&hoid, &req->r_t);
 backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
 if (!backoff)
  return false;

 dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
      __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
      backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
 return true;
}

/*
 * Keep get_num_data_items() in sync with this function.
 */

static void setup_request_data(struct ceph_osd_request *req)
{
 struct ceph_msg *request_msg = req->r_request;
 struct ceph_msg *reply_msg = req->r_reply;
 struct ceph_osd_req_op *op;

 if (req->r_request->num_data_items || req->r_reply->num_data_items)
  return;

 WARN_ON(request_msg->data_length || reply_msg->data_length);
 for (op = req->r_ops; op != &req->r_ops[req->r_num_ops]; op++) {
  switch (op->op) {
  /* request */
  case CEPH_OSD_OP_WRITE:
  case CEPH_OSD_OP_WRITEFULL:
   WARN_ON(op->indata_len != op->extent.length);
   ceph_osdc_msg_data_add(request_msg,
            &op->extent.osd_data);
   break;
  case CEPH_OSD_OP_SETXATTR:
  case CEPH_OSD_OP_CMPXATTR:
   WARN_ON(op->indata_len != op->xattr.name_len +
        op->xattr.value_len);
   ceph_osdc_msg_data_add(request_msg,
            &op->xattr.osd_data);
   break;
  case CEPH_OSD_OP_NOTIFY_ACK:
   ceph_osdc_msg_data_add(request_msg,
            &op->notify_ack.request_data);
   break;
  case CEPH_OSD_OP_COPY_FROM2:
   ceph_osdc_msg_data_add(request_msg,
            &op->copy_from.osd_data);
   break;

  /* reply */
  case CEPH_OSD_OP_STAT:
   ceph_osdc_msg_data_add(reply_msg,
            &op->raw_data_in);
   break;
  case CEPH_OSD_OP_READ:
  case CEPH_OSD_OP_SPARSE_READ:
   ceph_osdc_msg_data_add(reply_msg,
            &op->extent.osd_data);
   break;
  case CEPH_OSD_OP_LIST_WATCHERS:
   ceph_osdc_msg_data_add(reply_msg,
            &op->list_watchers.response_data);
   break;

  /* both */
  case CEPH_OSD_OP_CALL:
   WARN_ON(op->indata_len != op->cls.class_len +
        op->cls.method_len +
        op->cls.indata_len);
   ceph_osdc_msg_data_add(request_msg,
            &op->cls.request_info);
   /* optional, can be NONE */
   ceph_osdc_msg_data_add(request_msg,
            &op->cls.request_data);
   /* optional, can be NONE */
   ceph_osdc_msg_data_add(reply_msg,
            &op->cls.response_data);
   break;
  case CEPH_OSD_OP_NOTIFY:
   ceph_osdc_msg_data_add(request_msg,
            &op->notify.request_data);
   ceph_osdc_msg_data_add(reply_msg,
            &op->notify.response_data);
   break;
  }
 }
}

static void encode_pgid(void **p, const struct ceph_pg *pgid)
{
 ceph_encode_8(p, 1);
 ceph_encode_64(p, pgid->pool);
 ceph_encode_32(p, pgid->seed);
 ceph_encode_32(p, -1); /* preferred */
}

static void encode_spgid(void **p, const struct ceph_spg *spgid)
{
 ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
 encode_pgid(p, &spgid->pgid);
 ceph_encode_8(p, spgid->shard);
}

static void encode_oloc(void **p, void *end,
   const struct ceph_object_locator *oloc)
{
 ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
 ceph_encode_64(p, oloc->pool);
 ceph_encode_32(p, -1); /* preferred */
 ceph_encode_32(p, 0);  /* key len */
 if (oloc->pool_ns)
  ceph_encode_string(p, end, oloc->pool_ns->str,
       oloc->pool_ns->len);
 else
  ceph_encode_32(p, 0);
}

static void encode_request_partial(struct ceph_osd_request *req,
       struct ceph_msg *msg)
{
 void *p = msg->front.iov_base;
 void *const end = p + msg->front_alloc_len;
 u32 data_len = 0;
 int i;

 if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
  /* snapshots aren't writeable */
  WARN_ON(req->r_snapid != CEPH_NOSNAP);
 } else {
  WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
   req->r_data_offset || req->r_snapc);
 }

 setup_request_data(req);

 encode_spgid(&p, &req->r_t.spgid); /* actual spg */
 ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
 ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
 ceph_encode_32(&p, req->r_flags);

 /* reqid */
 ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
 memset(p, 0, sizeof(struct ceph_osd_reqid));
 p += sizeof(struct ceph_osd_reqid);

 /* trace */
 memset(p, 0, sizeof(struct ceph_blkin_trace_info));
 p += sizeof(struct ceph_blkin_trace_info);

 ceph_encode_32(&p, 0); /* client_inc, always 0 */
 ceph_encode_timespec64(p, &req->r_mtime);
 p += sizeof(struct ceph_timespec);

 encode_oloc(&p, end, &req->r_t.target_oloc);
 ceph_encode_string(&p, end, req->r_t.target_oid.name,
      req->r_t.target_oid.name_len);

 /* ops, can imply data */
 ceph_encode_16(&p, req->r_num_ops);
 for (i = 0; i < req->r_num_ops; i++) {
  data_len += osd_req_encode_op(p, &req->r_ops[i]);
  p += sizeof(struct ceph_osd_op);
 }

 ceph_encode_64(&p, req->r_snapid); /* snapid */
 if (req->r_snapc) {
  ceph_encode_64(&p, req->r_snapc->seq);
  ceph_encode_32(&p, req->r_snapc->num_snaps);
  for (i = 0; i < req->r_snapc->num_snaps; i++)
   ceph_encode_64(&p, req->r_snapc->snaps[i]);
 } else {
  ceph_encode_64(&p, 0); /* snap_seq */
  ceph_encode_32(&p, 0); /* snaps len */
 }

 ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
 BUG_ON(p > end - 8); /* space for features */

 msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
 /* front_len is finalized in encode_request_finish() */
 msg->front.iov_len = p - msg->front.iov_base;
 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
 msg->hdr.data_len = cpu_to_le32(data_len);
 /*
 * The header "data_off" is a hint to the receiver allowing it
 * to align received data into its buffers such that there's no
 * need to re-copy it before writing it to disk (direct I/O).
 */

 msg->hdr.data_off = cpu_to_le16(req->r_data_offset);

 dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
      req->r_t.target_oid.name, req->r_t.target_oid.name_len);
}

static void encode_request_finish(struct ceph_msg *msg)
{
 void *p = msg->front.iov_base;
 void *const partial_end = p + msg->front.iov_len;
 void *const end = p + msg->front_alloc_len;

 if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
  /* luminous OSD -- encode features and be done */
  p = partial_end;
  ceph_encode_64(&p, msg->con->peer_features);
 } else {
  struct {
   char spgid[CEPH_ENCODING_START_BLK_LEN +
       CEPH_PGID_ENCODING_LEN + 1];
   __le32 hash;
   __le32 epoch;
   __le32 flags;
   char reqid[CEPH_ENCODING_START_BLK_LEN +
       sizeof(struct ceph_osd_reqid)];
   char trace[sizeof(struct ceph_blkin_trace_info)];
   __le32 client_inc;
   struct ceph_timespec mtime;
  } __packed head;
  struct ceph_pg pgid;
  void *oloc, *oid, *tail;
  int oloc_len, oid_len, tail_len;
  int len;

  /*
 * Pre-luminous OSD -- reencode v8 into v4 using @head
 * as a temporary buffer.  Encode the raw PG; the rest
 * is just a matter of moving oloc, oid and tail blobs
 * around.
 */

  memcpy(&head, p, sizeof(head));
  p += sizeof(head);

  oloc = p;
  p += CEPH_ENCODING_START_BLK_LEN;
  pgid.pool = ceph_decode_64(&p);
  p += 4 + 4; /* preferred, key len */
  len = ceph_decode_32(&p);
  p += len;   /* nspace */
  oloc_len = p - oloc;

  oid = p;
  len = ceph_decode_32(&p);
  p += len;
  oid_len = p - oid;

  tail = p;
  tail_len = partial_end - p;

  p = msg->front.iov_base;
  ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
  ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
  ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
  ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));

  /* reassert_version */
  memset(p, 0, sizeof(struct ceph_eversion));
  p += sizeof(struct ceph_eversion);

  BUG_ON(p >= oloc);
  memmove(p, oloc, oloc_len);
  p += oloc_len;

  pgid.seed = le32_to_cpu(head.hash);
  encode_pgid(&p, &pgid); /* raw pg */

  BUG_ON(p >= oid);
  memmove(p, oid, oid_len);
  p += oid_len;

  /* tail -- ops, snapid, snapc, retry_attempt */
  BUG_ON(p >= tail);
  memmove(p, tail, tail_len);
  p += tail_len;

  msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
 }

 BUG_ON(p > end);
 msg->front.iov_len = p - msg->front.iov_base;
 msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);

 dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
      le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
      le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
      le16_to_cpu(msg->hdr.version));
}

/*
 * @req has to be assigned a tid and registered.
 */

static void send_request(struct ceph_osd_request *req)
{
 struct ceph_osd *osd = req->r_osd;

 verify_osd_locked(osd);
 WARN_ON(osd->o_osd != req->r_t.osd);

 /* backoff? */
 if (should_plug_request(req))
  return;

 /*
 * We may have a previously queued request message hanging
 * around.  Cancel it to avoid corrupting the msgr.
 */

 if (req->r_sent)
  ceph_msg_revoke(req->r_request);

 req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
 if (req->r_attempts)
  req->r_flags |= CEPH_OSD_FLAG_RETRY;
 else
  WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);

 encode_request_partial(req, req->r_request);

 dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
      __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
      req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
      req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
      req->r_attempts);

 req->r_t.paused = false;
 req->r_stamp = jiffies;
 req->r_attempts++;

 req->r_sent = osd->o_incarnation;
 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
 ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
}

static void maybe_request_map(struct ceph_osd_client *osdc)
{
 bool continuous = false;

 verify_osdc_locked(osdc);
 WARN_ON(!osdc->osdmap->epoch);

 if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
     ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
     ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
  dout("%s osdc %p continuous\n", __func__, osdc);
  continuous = true;
 } else {
  dout("%s osdc %p onetime\n", __func__, osdc);
 }

 if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
          osdc->osdmap->epoch + 1, continuous))
  ceph_monc_renew_subs(&osdc->client->monc);
}

static void complete_request(struct ceph_osd_request *req, int err);
static void send_map_check(struct ceph_osd_request *req);

static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
{
 struct ceph_osd_client *osdc = req->r_osdc;
 struct ceph_osd *osd;
 enum calc_target_result ct_res;
 int err = 0;
 bool need_send = false;
 bool promoted = false;

 WARN_ON(req->r_tid);
 dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);

again:
 ct_res = calc_target(osdc, &req->r_t, false);
 if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
  goto promote;

 osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
 if (IS_ERR(osd)) {
  WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
  goto promote;
 }

 if (osdc->abort_err) {
  dout("req %p abort_err %d\n", req, osdc->abort_err);
  err = osdc->abort_err;
 } else if (osdc->osdmap->epoch < osdc->epoch_barrier) {
  dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
       osdc->epoch_barrier);
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=87 H=94 G=90

¤ Dauer der Verarbeitung: 0.23 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.