Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/C/Linux/drivers/md/dm-vdo/   (Open Source Betriebssystem Version 6.17.9©)  Datei vom 24.10.2025 mit Größe 98 kB image not shown  

Quelle  block-map.c   Sprache: C

 
// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */


#include "block-map.h"

#include <linux/bio.h>
#include <linux/ratelimit.h>

#include "errors.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "action-manager.h"
#include "admin-state.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "physical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "status-codes.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

/**
 * DOC: Block map eras
 *
 * The block map era, or maximum age, is used as follows:
 *
 * Each block map page, when dirty, records the earliest recovery journal block sequence number of
 * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
 * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
 * according to the sequence number they record.
 *
 * In the current (newest) era, block map pages are not written unless there is cache pressure. In
 * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in
 * this era are issued for write. In all older eras, pages are issued for write immediately.
 */


/*
 * Four coordinates (root, height, page index, slot) stored without padding. The struct is
 * __packed because it is overlaid with a single u64 in union page_key.
 * NOTE(review): presumably the four typedefs together fit in 64 bits so that the u64 view covers
 * the whole descriptor -- confirm against the typedef widths.
 */
struct page_descriptor {
 root_count_t root_index;
 height_t height;
 page_number_t page_index;
 slot_number_t slot;
} __packed;

/* Lets a page_descriptor be read or written as one u64 value. */
union page_key {
 /* The structured view of the key. */
 struct page_descriptor descriptor;
 /* The same bytes viewed as a single integer. */
 u64 key;
};

/*
 * A (zone, generation) pair.
 * NOTE(review): its users are outside this section; by name it is presumably the context handed
 * to a callback which writes a page only if it was not re-dirtied -- confirm at the use site.
 */
struct write_if_not_dirtied_context {
 struct block_map_zone *zone;
 u8 generation;
};

/* One tree_page pointer per tree level (VDO_BLOCK_MAP_TREE_HEIGHT entries). */
struct block_map_tree_segment {
 struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
};

/* A tree is represented as a pointer to its block_map_tree_segment(s). */
struct block_map_tree {
 struct block_map_tree_segment *segments;
};

/*
 * The collection of block map trees. The trees themselves are stored inline at the end of the
 * structure (flexible array member), so the number of trees is fixed when the forest is created.
 */
struct forest {
 /* The block map this forest belongs to. */
 struct block_map *map;
 /* A segment count. NOTE(review): presumably the length of boundaries/pages -- confirm. */
 size_t segments;
 struct boundary *boundaries;
 struct tree_page **pages;
 /* Must remain the last member. */
 struct block_map_tree trees[];
};

/* A (page, slot) position; struct cursor keeps one of these per tree level. */
struct cursor_level {
 page_number_t page_index;
 slot_number_t slot;
};

/* Forward declaration: each cursor points back at the struct cursors which contains it. */
struct cursors;

/*
 * Per-tree traversal state: the tree, a current height, one cursor_level position for every
 * level of the tree, and a pooled vio.
 * NOTE(review): the traversal code is outside this section; field roles beyond their types are
 * inferred from names -- confirm there.
 */
struct cursor {
 /* Embedded waiter, so a cursor can be placed on a vdo wait queue. */
 struct vdo_waiter waiter;
 struct block_map_tree *tree;
 height_t height;
 /* The containing set of cursors. */
 struct cursors *parent;
 struct boundary boundary;
 struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
 struct pooled_vio *vio;
};

/*
 * Shared state for a set of cursors, which are stored inline at the end of the structure
 * (flexible array member).
 */
struct cursors {
 struct block_map_zone *zone;
 struct vio_pool *pool;
 /* Callback of type vdo_entry_callback_fn. NOTE(review): presumably invoked per entry. */
 vdo_entry_callback_fn entry_callback;
 struct vdo_completion *completion;
 /* NOTE(review): presumably the number of roots still being traversed -- confirm. */
 root_count_t active_roots;
 /* Must remain the last member. */
 struct cursor cursors[];
};

/* Sentinel pbn (all bits set) held in page_info.pbn while the info has no page assigned. */
static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;

/*
 * Used to indicate that the page holding the location of a tree root has been "loaded".
 * Note that this has the same numeric value as NO_PAGE.
 */
static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;

/*
 * A block map entry whose mapping state is VDO_MAPPING_STATE_UNMAPPED (masked to its low four
 * bits) and whose pbn fields encode VDO_ZERO_BLOCK: a zero high nibble plus the low 32 bits
 * stored little-endian. Not static, so it is visible to other translation units.
 */
const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
 .mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
 .pbn_high_nibble = 0,
 .pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
};

/*
 * Cache pressure reporting: report_cache_pressure() logs whenever its pressure_report counter is
 * a multiple of LOG_INTERVAL, and wraps that counter back to zero at DISPLAY_INTERVAL.
 */
#define LOG_INTERVAL 4000
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
 * Prevents any compiler shenanigans from affecting other threads reading those stats.
 *
 * Only the store uses WRITE_ONCE(); the read of @value is a plain read, and @value is evaluated
 * twice, so it must be a side-effect-free lvalue.
 */
#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))

/** is_dirty() - Check whether a page info is in the PS_DIRTY state. */
static inline bool is_dirty(const struct page_info *info)
{
 return (PS_DIRTY == info->state);
}

static inline bool is_present(const struct page_info *info)
{
 return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
}

static inline bool is_in_flight(const struct page_info *info)
{
 return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
}

/** is_incoming() - Check whether a page info is in the PS_INCOMING state. */
static inline bool is_incoming(const struct page_info *info)
{
 return (PS_INCOMING == info->state);
}

/** is_outgoing() - Check whether a page info is in the PS_OUTGOING state. */
static inline bool is_outgoing(const struct page_info *info)
{
 return (PS_OUTGOING == info->state);
}

static inline bool is_valid(const struct page_info *info)
{
 return is_present(info) || is_outgoing(info);
}

/**
 * get_page_buffer() - Locate the data buffer belonging to a page info.
 * @info: The page info.
 *
 * The info's position in the cache's infos array selects the VDO_BLOCK_SIZE-sized slot at the
 * same position in the cache's pages buffer.
 *
 * Return: A pointer to the start of that slot.
 */
static char *get_page_buffer(struct page_info *info)
{
 return info->cache->pages + ((info - info->cache->infos) * VDO_BLOCK_SIZE);
}

/**
 * page_completion_from_waiter() - Convert a waiter to the vdo_page_completion which embeds it.
 * @waiter: The waiter to convert; may be NULL.
 *
 * Asserts that the enclosing completion has type VDO_PAGE_COMPLETION.
 *
 * Return: The enclosing page completion, or NULL if @waiter is NULL.
 */
static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
{
 struct vdo_page_completion *page_completion = NULL;

 if (waiter != NULL) {
  page_completion = container_of(waiter, struct vdo_page_completion, waiter);
  vdo_assert_completion_type(&page_completion->completion, VDO_PAGE_COMPLETION);
 }

 return page_completion;
}

/**
 * initialize_info() - Initialize all page info structures and put them on the free list.
 *
 * Return: VDO_SUCCESS or an error.
 */

static int initialize_info(struct vdo_page_cache *cache)
{
 struct page_info *info;

 INIT_LIST_HEAD(&cache->free_list);
 for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
  int result;

  info->cache = cache;
  info->state = PS_FREE;
  info->pbn = NO_PAGE;

  result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
          VIO_PRIORITY_METADATA, info,
          get_page_buffer(info), &info->vio);
  if (result != VDO_SUCCESS)
   return result;

  /* The thread ID should never change. */
  info->vio->completion.callback_thread_id = cache->zone->thread_id;

  INIT_LIST_HEAD(&info->state_entry);
  list_add_tail(&info->state_entry, &cache->free_list);
  INIT_LIST_HEAD(&info->lru_entry);
 }

 return VDO_SUCCESS;
}

/**
 * allocate_cache_components() - Allocate the separately-allocated pieces of a page cache.
 * @cache: The page cache; its page_count must already be set.
 *
 * Allocates, in order, the page info array, the page data buffer (page_count blocks of
 * VDO_BLOCK_SIZE bytes), and the pbn-to-info map, then initializes the infos. Stops at the first
 * failure; the caller is responsible for all clean up on errors.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
{
 /* Widen before multiplying so that the byte count cannot wrap in 32 bits. */
 u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
 int result;

 result = vdo_allocate(cache->page_count, struct page_info, "page infos",
         &cache->infos);
 if (result == VDO_SUCCESS)
  result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages",
          &cache->pages);

 if (result == VDO_SUCCESS)
  result = vdo_int_map_create(cache->page_count, &cache->page_map);

 if (result == VDO_SUCCESS)
  result = initialize_info(cache);

 return result;
}

/**
 * assert_on_cache_thread() - Log an assertion failure unless running on the page cache's thread.
 * @cache: The page cache.
 * @function_name: The name of the calling function, for the log message.
 */
static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
       const char *function_name)
{
 thread_id_t current_thread = vdo_get_callback_thread_id();

 VDO_ASSERT_LOG_ONLY((cache->zone->thread_id == current_thread),
       "%s() must only be called on cache thread %d, not thread %d",
       function_name, cache->zone->thread_id, current_thread);
}

/** assert_io_allowed() - Assert that a page cache may issue I/O. */
static inline void assert_io_allowed(struct vdo_page_cache *cache)
{
 VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
       "VDO page cache may issue I/O");
}

/**
 * report_cache_pressure() - Count a cache pressure event and periodically log it.
 * @cache: The page cache under pressure.
 *
 * The statistic is always incremented. A message is only logged while there are more waiters
 * than pages, and then only once per LOG_INTERVAL such events; the event counter wraps at
 * DISPLAY_INTERVAL.
 */
static void report_cache_pressure(struct vdo_page_cache *cache)
{
 ADD_ONCE(cache->stats.cache_pressure, 1);
 if (cache->waiter_count <= cache->page_count)
  return;

 if ((cache->pressure_report % LOG_INTERVAL) == 0)
  vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);

 cache->pressure_report++;
 if (cache->pressure_report >= DISPLAY_INTERVAL)
  cache->pressure_report = 0;
}

/**
 * get_page_state_name() - Return the name of a page state.
 * @state: The page state to name.
 *
 * If the page state is invalid a static string is returned and the invalid state is logged.
 *
 * Return: A pointer to a static page state name.
 */
static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
{
 int result;
 /*
  * One name per state, indexed by the state's value; the order must match the declaration
  * order of enum vdo_page_buffer_state. The separating commas are essential: adjacent string
  * literals without them are concatenated into a single array element.
  */
 static const char * const state_names[] = {
  "FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
 };

 /* Fail the build if a state is added without a name. */
 BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);

 result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
       "Unknown page_state value %d", state);
 if (result != VDO_SUCCESS)
  return "[UNKNOWN PAGE STATE]";

 return state_names[state];
}

/**
 * update_counter() - Adjust the statistics counter matching a page info's current state.
 * @info: The page info to count.
 * @delta: The delta to apply to the counter.
 *
 * A state with no associated counter is ignored.
 */
static void update_counter(struct page_info *info, s32 delta)
{
 struct block_map_statistics *stats = &info->cache->stats;

 switch (info->state) {
 case PS_FREE:
  ADD_ONCE(stats->free_pages, delta);
  break;

 case PS_INCOMING:
  ADD_ONCE(stats->incoming_pages, delta);
  break;

 case PS_OUTGOING:
  ADD_ONCE(stats->outgoing_pages, delta);
  break;

 case PS_FAILED:
  ADD_ONCE(stats->failed_pages, delta);
  break;

 case PS_RESIDENT:
  ADD_ONCE(stats->clean_pages, delta);
  break;

 case PS_DIRTY:
  ADD_ONCE(stats->dirty_pages, delta);
  break;

 default:
  break;
 }
}

/** update_lru() - Update the lru information for an active page. */
static void update_lru(struct page_info *info)
{
 if (info->cache->lru_list.prev != &info->lru_entry)
  list_move_tail(&info->lru_entry, &info->cache->lru_list);
}

/**
 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
 *                    counters.
 */

static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
{
 if (new_state == info->state)
  return;

 update_counter(info, -1);
 info->state = new_state;
 update_counter(info, 1);

 switch (info->state) {
 case PS_FREE:
 case PS_FAILED:
  list_move_tail(&info->state_entry, &info->cache->free_list);
  return;

 case PS_OUTGOING:
  list_move_tail(&info->state_entry, &info->cache->outgoing_list);
  return;

 case PS_DIRTY:
  return;

 default:
  list_del_init(&info->state_entry);
 }
}

/**
 * set_info_pbn() - Set the pbn for a page info, updating the cache's page map as needed.
 * @info: The page info to update.
 * @pbn: The new page number, or NO_PAGE to disassociate the info from its page.
 *
 * Return: VDO_SUCCESS, an assertion error if both the old and new pbn are real pages, or an
 *         error from adding the new mapping.
 */
static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
{
 struct vdo_page_cache *cache = info->cache;
 int result;

 /* Either the new or the old page number must be NO_PAGE. */
 result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
       "Must free a page before reusing it.");
 if (result != VDO_SUCCESS)
  return result;

 /* Drop the mapping for the page this info used to hold, if any. */
 if (info->pbn != NO_PAGE)
  vdo_int_map_remove(cache->page_map, info->pbn);

 info->pbn = pbn;
 if (pbn == NO_PAGE)
  return VDO_SUCCESS;

 return vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
}

/**
 * reset_page_info() - Reset a page info to represent an unallocated page.
 * @info: The page info; it must not be busy and must have no waiters.
 *
 * If either precondition fails, the info is left untouched. Otherwise the info is moved to the
 * PS_FREE state and taken off the lru list even if clearing its pbn reported an error.
 *
 * Return: VDO_SUCCESS, an assertion error, or the error from clearing the pbn.
 */
static int reset_page_info(struct page_info *info)
{
 int result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");

 if (result == VDO_SUCCESS)
  result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
        "VDO Page must not have waiters");

 if (result != VDO_SUCCESS)
  return result;

 result = set_info_pbn(info, NO_PAGE);
 set_info_state(info, PS_FREE);
 list_del_init(&info->lru_entry);
 return result;
}

/**
 * find_free_page() - Take the first page info off the cache's free list.
 * @cache: The page cache.
 *
 * Return: The page info, now removed from the free list, or NULL if the list is empty.
 */
static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
{
 struct page_info *free_info =
  list_first_entry_or_null(&cache->free_list, struct page_info, state_entry);

 if (free_info == NULL)
  return NULL;

 list_del_init(&free_info->state_entry);
 return free_info;
}

/**
 * find_page() - Find the page info (if any) associated with a given pbn.
 * @cache: The page cache.
 * @pbn: The absolute physical block number of the page.
 *
 * The result of every map lookup, including a miss, is remembered in cache->last_found so that
 * a repeated request for the same page can skip the map.
 *
 * Return: The page info for the page if available, or NULL if not.
 */
static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
       physical_block_number_t pbn)
{
 struct page_info *found = cache->last_found;

 if ((found == NULL) || (found->pbn != pbn)) {
  found = vdo_int_map_get(cache->page_map, pbn);
  cache->last_found = found;
 }

 return found;
}

/**
 * select_lru_page() - Determine which page is least recently used.
 * @cache: The page cache.
 *
 * Walks the lru list from its head and picks the first entry which is neither busy nor in
 * flight. Since a page is moved to the end of the list whenever it is marked busy, it is unlikely
 * (but not impossible) that the entries at the front are busy unless the list is very short.
 *
 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
 *         found. The page can be dirty or resident.
 */
static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
{
 struct page_info *candidate;

 list_for_each_entry(candidate, &cache->lru_list, lru_entry) {
  if ((candidate->busy != 0) || is_in_flight(candidate))
   continue;

  return candidate;
 }

 return NULL;
}

/* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */

/**
 * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
 * @info: The page info representing the result page.
 * @vdo_page_comp: The VDO page completion to complete.
 */

static void complete_with_page(struct page_info *info,
          struct vdo_page_completion *vdo_page_comp)
{
 bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);

 if (!available) {
  vdo_log_error_strerror(VDO_BAD_PAGE,
           "Requested cache page %llu in state %s is not %s",
           (unsigned long long) info->pbn,
           get_page_state_name(info->state),
           vdo_page_comp->writable ? "present" : "valid");
  vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
  return;
 }

 vdo_page_comp->info = info;
 vdo_page_comp->ready = true;
 vdo_finish_completion(&vdo_page_comp->completion);
}

/**
 * complete_waiter_with_error() - Complete a page completion with an error code.
 * @waiter: The page completion, as a waiter.
 * @result_ptr: A pointer to the error code.
 *
 * Implements waiter_callback_fn.
 */

static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
{
 int *result = result_ptr;

 vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
}

/**
 * complete_waiter_with_page() - Hand a page to a waiting page completion.
 * @waiter: The page completion, as a waiter.
 * @page_info: The page info to complete with.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
{
 struct vdo_page_completion *page_completion = page_completion_from_waiter(waiter);

 complete_with_page(page_info, page_completion);
}

/**
 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
 * @info: The page info to hand to every waiter.
 * @waitq: The queue of waiting page completions.
 *
 * The page is also marked as most recently used. Upon completion the waitq will be empty.
 *
 * Return: The number of pages distributed.
 */
static unsigned int distribute_page_over_waitq(struct page_info *info,
            struct vdo_wait_queue *waitq)
{
 size_t waiter_count = vdo_waitq_num_waiters(waitq);

 update_lru(info);

 /*
  * Take one busy reference per pending completion up front, so that the page cannot stop
  * being busy until all of the completions have been processed.
  */
 info->busy += waiter_count;

 vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
 return waiter_count;
}

/**
 * set_persistent_error() - Set a persistent error which all requests will receive in the future.
 * @context: A string describing what triggered the error.
 *
 * Once triggered, all enqueued completions will get this error. Any future requests will result in
 * this error as well.
 */

static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
     int result)
{
 struct page_info *info;
 /* If we're already read-only, there's no need to log. */
 struct vdo *vdo = cache->vdo;

 if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
  vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
           context);
  vdo_enter_read_only_mode(vdo, result);
 }

 assert_on_cache_thread(cache, __func__);

 vdo_waitq_notify_all_waiters(&cache->free_waiters,
         complete_waiter_with_error, &result);
 cache->waiter_count = 0;

 for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
  vdo_waitq_notify_all_waiters(&info->waiting,
          complete_waiter_with_error, &result);
 }
}

/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @completion: The page completion to check.
 * @writable: Whether a writable page is required.
 *
 * The checks are made in order: the completion is ready, it has a page info, the info's pbn
 * matches the completion's, the page is valid, and (only if @writable) the completion is
 * writable.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise the error from the first failed
 *         assertion.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
      bool writable)
{
 int result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");

 if (result != VDO_SUCCESS)
  return result;

 result = VDO_ASSERT(completion->info != NULL,
       "VDO Page Completion must be complete");
 if (result != VDO_SUCCESS)
  return result;

 result = VDO_ASSERT(completion->info->pbn == completion->pbn,
       "VDO Page Completion pbn must be consistent");
 if (result != VDO_SUCCESS)
  return result;

 result = VDO_ASSERT(is_valid(completion->info),
       "VDO Page Completion page must be valid");
 if (result != VDO_SUCCESS)
  return result;

 if (!writable)
  return VDO_SUCCESS;

 return VDO_ASSERT(completion->writable,
     "VDO Page Completion must be writable");
}

static void check_for_drain_complete(struct block_map_zone *zone)
{
 if (vdo_is_state_draining(&zone->state) &&
     (zone->active_lookups == 0) &&
     !vdo_waitq_has_waiters(&zone->flush_waiters) &&
     !is_vio_pool_busy(zone->vio_pool) &&
     (zone->page_cache.outstanding_reads == 0) &&
     (zone->page_cache.outstanding_writes == 0)) {
  vdo_finish_draining_with_result(&zone->state,
      (vdo_is_read_only(zone->block_map->vdo) ?
       VDO_READ_ONLY : VDO_SUCCESS));
 }
}

/**
 * enter_zone_read_only_mode() - Put the VDO into read-only mode on behalf of a zone.
 * @zone: The block map zone which encountered the error.
 * @result: The error which caused the transition.
 *
 * Also discards the zone's flush waiters and checks whether the zone has finished draining.
 */
static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
 struct vdo *vdo = zone->block_map->vdo;

 vdo_enter_read_only_mode(vdo, result);

 /*
  * We are in read-only mode, so we won't ever write any page out.
  * Just take all waiters off the waitq so the zone can drain.
  */
 vdo_waitq_init(&zone->flush_waiters);
 check_for_drain_complete(zone);
}

/**
 * validate_completed_page_or_enter_read_only_mode() - Validate a page completion, entering
 *                                                     read-only mode if it is not valid.
 * @completion: The page completion to check.
 * @writable: Whether a writable page is required.
 *
 * Return: true if the completion is valid; false if it is not, in which case the VDO has been
 *         put into read-only mode.
 */
static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
      bool writable)
{
 int result = validate_completed_page(completion, writable);

 if (result == VDO_SUCCESS)
  return true;

 /*
  * One of the conditions which may have just failed is "completion->info != NULL", so the
  * info must not be dereferenced here. Reach the zone through the completion's own cache
  * pointer instead.
  */
 enter_zone_read_only_mode(completion->cache->zone, result);
 return false;
}

/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio.
 *
 * Registered by launch_page_load() as the error handler when the cache is not rebuilding.
 * Records the I/O error, puts the VDO into read-only mode, marks the page as failed, fails every
 * completion waiting on the page with the read's result, and returns the info to the free state.
 */
static void handle_load_error(struct vdo_completion *completion)
{
 /* Captured in a local so that its address can be handed to the waiter callback below. */
 int result = completion->result;
 struct page_info *info = completion->parent;
 struct vdo_page_cache *cache = info->cache;

 assert_on_cache_thread(cache, __func__);
 vio_record_metadata_io_error(as_vio(completion));
 vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
 ADD_ONCE(cache->stats.failed_reads, 1);
 set_info_state(info, PS_FAILED);
 vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
 /* The waiters were removed above; the return value of the reset is not checked. */
 reset_page_info(info);

 /*
  * Don't decrement until right before calling check_for_drain_complete() to
  * ensure that the above work can't cause the page cache to be freed out from under us.
  */

 cache->outstanding_reads--;
 check_for_drain_complete(cache->zone);
}

/**
 * page_is_loaded() - Callback used when a page has been loaded.
 * @completion: The vio which has loaded the page. Its parent is the page_info.
 *
 * Validates the page which was read against the block map's nonce and the expected pbn. A bad
 * page is logged and the completion is continued with the resulting error code; an invalid page
 * is formatted in place. Otherwise the page becomes resident and is handed to every waiter.
 */
static void page_is_loaded(struct vdo_completion *completion)
{
 struct page_info *info = completion->parent;
 struct vdo_page_cache *cache = info->cache;
 nonce_t nonce = info->cache->zone->block_map->nonce;
 struct block_map_page *page;
 enum block_map_page_validity validity;

 assert_on_cache_thread(cache, __func__);

 page = (struct block_map_page *) get_page_buffer(info);
 validity = vdo_validate_block_map_page(page, nonce, info->pbn);
 if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
  physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
  /* NOTE(review): presumably the logging call returns the code it was given -- confirm. */
  int result = vdo_log_error_strerror(VDO_BAD_PAGE,
          "Expected page %llu but got page %llu instead",
          (unsigned long long) info->pbn,
          (unsigned long long) pbn);

  /* outstanding_reads is deliberately not decremented on this path. */
  vdo_continue_completion(completion, result);
  return;
 }

 /* The final argument (initialized) is false for a page formatted here. */
 if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
  vdo_format_block_map_page(page, nonce, info->pbn, false);

 info->recovery_lock = 0;
 set_info_state(info, PS_RESIDENT);
 distribute_page_over_waitq(info, &info->waiting);

 /*
  * Don't decrement until right before calling check_for_drain_complete() to
  * ensure that the above work can't cause the page cache to be freed out from under us.
  */

 cache->outstanding_reads--;
 check_for_drain_complete(cache->zone);
}

/**
 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
 * @completion: The page load completion.
 *
 * Registered by launch_page_load() as the error handler when the cache is rebuilding. The error
 * is recorded and counted, the page buffer is zeroed, and the load then proceeds through
 * page_is_loaded() as though the read had succeeded.
 */
static void handle_rebuild_read_error(struct vdo_completion *completion)
{
 struct page_info *info = completion->parent;
 struct vdo_page_cache *cache = info->cache;

 assert_on_cache_thread(cache, __func__);

 /*
  * We are doing a read-only rebuild, so treat this as a successful read
  * of an uninitialized page.
  */

 vio_record_metadata_io_error(as_vio(completion));
 ADD_ONCE(cache->stats.failed_reads, 1);
 memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
 /* Reset the completion before reusing it on the success path. */
 vdo_reset_completion(completion);
 page_is_loaded(completion);
}

static void load_cache_page_endio(struct bio *bio)
{
 struct vio *vio = bio->bi_private;
 struct page_info *info = vio->completion.parent;

 continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
}

/**
 * launch_page_load() - Begin the process of loading a page.
 * @info: The page info which will hold the page; it must not be busy.
 * @pbn: The absolute physical block number of the page to read.
 *
 * Associates the info with @pbn, marks it PS_INCOMING, and submits the read. The error handler
 * for the read depends on whether the cache is rebuilding.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check launch_page_load(struct page_info *info,
      physical_block_number_t pbn)
{
 struct vdo_page_cache *cache = info->cache;
 vdo_action_fn error_handler;
 int result;

 assert_io_allowed(cache);

 result = set_info_pbn(info, pbn);
 if (result == VDO_SUCCESS)
  result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");

 if (result != VDO_SUCCESS)
  return result;

 set_info_state(info, PS_INCOMING);
 cache->outstanding_reads++;
 ADD_ONCE(cache->stats.pages_loaded, 1);

 if (cache->rebuilding)
  error_handler = handle_rebuild_read_error;
 else
  error_handler = handle_load_error;

 vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
    error_handler, REQ_OP_READ | REQ_PRIO);
 return VDO_SUCCESS;
}

static void write_pages(struct vdo_completion *completion);

/**
 * handle_flush_error() - Handle errors flushing the layer.
 * @completion: The flush vio; its parent is a page_info.
 *
 * Records the I/O error, sets a persistent cache error, and then proceeds to write_pages().
 */
static void handle_flush_error(struct vdo_completion *completion)
{
 struct page_info *info = completion->parent;
 struct vdo_page_cache *cache = info->cache;

 vio_record_metadata_io_error(as_vio(completion));
 set_persistent_error(cache, "flush failed", completion->result);
 write_pages(completion);
}

static void flush_endio(struct bio *bio)
{
 struct vio *vio = bio->bi_private;
 struct page_info *info = vio->completion.parent;

 continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
}

/** save_pages() - Attempt to save the outgoing pages by first flushing the layer. */
static void save_pages(struct vdo_page_cache *cache)
{
 struct page_info *info;
 struct vio *vio;

 if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
  return;

 assert_io_allowed(cache);

 info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);

 cache->pages_in_flush = cache->pages_to_flush;
 cache->pages_to_flush = 0;
 ADD_ONCE(cache->stats.flush_count, 1);

 vio = info->vio;

 /*
 * We must make sure that the recovery journal entries that changed these pages were
 * successfully persisted, and thus must issue a flush before each batch of pages is
 * written to ensure this.
 */

 vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
}

/**
 * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
 *
 * Once in the list, a page may not be used until it has been written out.
 */

static void schedule_page_save(struct page_info *info)
{
 if (info->busy > 0) {
  info->write_status = WRITE_STATUS_DEFERRED;
  return;
 }

 info->cache->pages_to_flush++;
 info->cache->outstanding_writes++;
 set_info_state(info, PS_OUTGOING);
}

/**
 * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
 *                      pages if another save is not in progress.
 * @info: The page info to save.
 */
static void launch_page_save(struct page_info *info)
{
 struct vdo_page_cache *cache = info->cache;

 schedule_page_save(info);
 save_pages(cache);
}

/**
 * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
 *                           requesting a given page number.
 * @context: A pointer to the pbn of the desired page.
 *
 * Implements waiter_match_fn.
 *
 * Return: true if the page completion is for the desired page number.
 */

static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
{
 physical_block_number_t *pbn = context;

 return (page_completion_from_waiter(waiter)->pbn == *pbn);
}

/**
 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
 *                        any other completions that match it in page number.
 * @info: The page info to reuse.
 *
 * If nothing is waiting for a free page, this only clears (and logs the relief of) any recorded
 * cache pressure. Otherwise the info is reset, all waiters wanting the same page as the oldest
 * waiter are moved onto the info's own wait queue, and a load of that page is launched.
 */
static void allocate_free_page(struct page_info *info)
{
 int result;
 struct vdo_waiter *oldest_waiter;
 physical_block_number_t pbn;
 struct vdo_page_cache *cache = info->cache;

 assert_on_cache_thread(cache, __func__);

 if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
  if (cache->stats.cache_pressure > 0) {
   vdo_log_info("page cache pressure relieved");
   WRITE_ONCE(cache->stats.cache_pressure, 0);
  }

  return;
 }

 result = reset_page_info(info);
 if (result != VDO_SUCCESS) {
  set_persistent_error(cache, "cannot reset page info", result);
  return;
 }

 oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
 pbn = page_completion_from_waiter(oldest_waiter)->pbn;

 /*
  * Remove all entries which match the page number in question and push them onto the page
  * info's waitq.
  */

 vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
        &pbn, &info->waiting);
 /* Those waiters are no longer waiting for a free page. */
 cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);

 result = launch_page_load(info, pbn);
 if (result != VDO_SUCCESS) {
  /* The load could not be started, so fail everyone who was moved onto this info. */
  vdo_waitq_notify_all_waiters(&info->waiting,
          complete_waiter_with_error, &result);
 }
}

/**
 * discard_a_page() - Begin the process of discarding a page.
 * @cache: The page cache.
 *
 * If no page is discardable, cache pressure is reported and nothing else happens. This is an
 * indication that the cache is not big enough.
 *
 * If the selected page is not dirty, immediately allocates the page to the oldest completion
 * waiting for a free page. A dirty page must be written out first, so it is marked as a discard
 * and its save is launched.
 */
static void discard_a_page(struct vdo_page_cache *cache)
{
 struct page_info *info = select_lru_page(cache);

 if (info == NULL) {
  report_cache_pressure(cache);
  return;
 }

 if (!is_dirty(info)) {
  allocate_free_page(info);
  return;
 }

 VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
       "page selected for discard is not in flight");

 /* Counts discards in progress; page_is_written_out() decrements it for a discard write. */
 cache->discard_count++;
 info->write_status = WRITE_STATUS_DISCARD;
 launch_page_save(info);
}

/**
 * discard_page_for_completion() - Helper used to trigger a discard so that the completion can get
 *                                 a different page.
 * @vdo_page_comp: The page completion which needs a free page.
 *
 * The completion is queued as a waiter for a free page before the discard is started.
 */
static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
{
 struct vdo_page_cache *cache = vdo_page_comp->cache;
 struct vdo_waiter *waiter = &vdo_page_comp->waiter;

 cache->waiter_count += 1;
 vdo_waitq_enqueue_waiter(&cache->free_waiters, waiter);
 discard_a_page(cache);
}

/**
 * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
 *                            page.
 * @cache: The page cache.
 *
 * A discard is only started when there are more waiters than discards in progress.
 */
static void discard_page_if_needed(struct vdo_page_cache *cache)
{
 if (cache->waiter_count <= cache->discard_count)
  return;

 discard_a_page(cache);
}

/**
 * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
 * @info: The info structure for the page whose write just completed.
 *
 * Return: true if the page write was a discard.
 */

static bool write_has_finished(struct page_info *info)
{
 bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);

 assert_on_cache_thread(info->cache, __func__);
 info->cache->outstanding_writes--;

 info->write_status = WRITE_STATUS_NORMAL;
 return was_discard;
}

/**
 * handle_page_write_error() - Handler for page write errors.
 * @completion: The page write vio.
 *
 * The page goes back to the dirty state, the failure is counted, and a persistent error is set
 * on the cache (failing all waiters). Unless the failed write was a discard, another discard is
 * started if one is needed.
 */
static void handle_page_write_error(struct vdo_completion *completion)
{
 int result = completion->result;
 struct page_info *info = completion->parent;
 struct vdo_page_cache *cache = info->cache;

 vio_record_metadata_io_error(as_vio(completion));

 /* If we're already read-only, write failures are to be expected. */
 if (result != VDO_READ_ONLY) {
  vdo_log_ratelimit(vdo_log_error,
      "failed to write block map page %llu",
      (unsigned long long) info->pbn);
 }

 /* The page contents were not persisted, so the page is dirty again. */
 set_info_state(info, PS_DIRTY);
 ADD_ONCE(cache->stats.failed_writes, 1);
 set_persistent_error(cache, "cannot write page", result);

 /* write_has_finished() returns true when this write was a discard. */
 if (!write_has_finished(info))
  discard_page_if_needed(cache);

 check_for_drain_complete(cache->zone);
}

static void page_is_written_out(struct vdo_completion *completion);

static void write_cache_page_endio(struct bio *bio)
{
 struct vio *vio = bio->bi_private;
 struct page_info *info = vio->completion.parent;

 continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
}

/**
 * page_is_written_out() - Callback used when a page has been written out.
 * @completion: The vio which wrote the page. Its parent is a page_info.
 *
 * A page whose header is not yet marked initialized is marked and written a second time (with a
 * preflush) before being treated as written. Once the write is really done, the page's recovery
 * journal lock is released, the page becomes resident and is handed to any waiters, and the page
 * is either kept (reclaimed) or given to a completion waiting for a free page.
 */
static void page_is_written_out(struct vdo_completion *completion)
{
 bool was_discard, reclaimed;
 u32 reclamations;
 struct page_info *info = completion->parent;
 struct vdo_page_cache *cache = info->cache;
 struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);

 if (!page->header.initialized) {
  /* The write which just finished carried initialized == false; write the page again. */
  page->header.initialized = true;
  vdo_submit_metadata_vio(info->vio, info->pbn,
     write_cache_page_endio,
     handle_page_write_error,
     REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
  return;
 }

 /* Handle journal updates and torn write protection. */
 vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
           info->recovery_lock,
           VDO_ZONE_TYPE_LOGICAL,
           cache->zone->zone_number);
 info->recovery_lock = 0;
 was_discard = write_has_finished(info);
 /*
  * The page stays in use (is "reclaimed") unless this write was a discard and nobody is
  * holding or waiting for the page. This must be computed before the waiters are notified.
  */
 reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));

 set_info_state(info, PS_RESIDENT);

 reclamations = distribute_page_over_waitq(info, &info->waiting);
 ADD_ONCE(cache->stats.reclaimed, reclamations);

 if (was_discard)
  cache->discard_count--;

 if (reclaimed)
  discard_page_if_needed(cache);
 else
  allocate_free_page(info);

 check_for_drain_complete(cache->zone);
}

/**
 * write_pages() - Write the batch of pages which were covered by the layer flush which just
 *                 completed.
 * @flush_completion: The flush vio.
 *
 * This callback is registered in save_pages(). It is also called directly by
 * handle_flush_error(). Each page of the batch is taken off the outgoing list and written; if
 * the VDO is read-only, the page's completion is failed with VDO_READ_ONLY instead of being
 * submitted. If more pages were queued while the flush was in progress, another flush is
 * started afterwards.
 */
static void write_pages(struct vdo_completion *flush_completion)
{
 struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;

 /*
  * We need to cache these two values on the stack since it is possible for the last
  * page info to cause the page cache to get freed. Hence once we launch the last page,
  * it may be unsafe to dereference the cache.
  */

 bool has_unflushed_pages = (cache->pages_to_flush > 0);
 page_count_t pages_in_flush = cache->pages_in_flush;

 cache->pages_in_flush = 0;
 while (pages_in_flush-- > 0) {
  struct page_info *info =
   list_first_entry(&cache->outgoing_list, struct page_info,
      state_entry);

  list_del_init(&info->state_entry);
  if (vdo_is_read_only(info->cache->vdo)) {
   struct vdo_completion *completion = &info->vio->completion;

   /* Fail the write without issuing any I/O, using the normal write callbacks. */
   vdo_reset_completion(completion);
   completion->callback = page_is_written_out;
   completion->error_handler = handle_page_write_error;
   vdo_fail_completion(completion, VDO_READ_ONLY);
   continue;
  }
  ADD_ONCE(info->cache->stats.pages_saved, 1);
  vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
     handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
 }

 if (has_unflushed_pages) {
  /*
   * If there are unflushed pages, the cache can't have been freed, so this call is
   * safe.
   */

  save_pages(cache);
 }
}

/**
 * vdo_release_page_completion() - Release a VDO Page Completion.
 *
 * The page referenced by this completion (if any) will no longer be held busy by this completion.
 * If a page becomes discardable and there are completions awaiting free pages then a new round of
 * page discarding is started.
 */

void vdo_release_page_completion(struct vdo_completion *completion)
{
 struct page_info *discard_info = NULL;
 struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
 struct vdo_page_cache *cache;

 if (completion->result == VDO_SUCCESS) {
  if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
   return;

  if (--page_completion->info->busy == 0)
   discard_info = page_completion->info;
 }

 VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
       "Page being released after leaving all queues");

 page_completion->info = NULL;
 cache = page_completion->cache;
 assert_on_cache_thread(cache, __func__);

 if (discard_info != NULL) {
  if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
   discard_info->write_status = WRITE_STATUS_NORMAL;
   launch_page_save(discard_info);
  }

  /*
 * if there are excess requests for pages (that have not already started discards)
 * we need to discard some page (which may be this one)
 */

  discard_page_if_needed(cache);
 }
}

/**
 * load_page_for_completion() - Helper function to load a page as described by a VDO Page
 *                              Completion.
 * @info: The page info which will hold the loaded page.
 * @vdo_page_comp: The page completion requesting the page; it waits on @info for the load.
 *
 * If the load cannot be launched, every waiter queued on @info is completed with the error.
 */
static void load_page_for_completion(struct page_info *info,
         struct vdo_page_completion *vdo_page_comp)
{
 int status;

 /* Queue the requester first so that a failed launch reaches it too. */
 vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);

 status = launch_page_load(info, vdo_page_comp->pbn);
 if (status == VDO_SUCCESS)
  return;

 vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &status);
}

/**
 * vdo_get_page() - Initialize a page completion and get a block map page.
 * @page_completion: The vdo_page_completion to initialize.
 * @zone: The block map zone of the desired page.
 * @pbn: The absolute physical block of the desired page.
 * @writable: Whether the page can be modified.
 * @parent: The object to notify when the fetch is complete.
 * @callback: The notification callback.
 * @error_handler: The handler for fetch errors.
 * @requeue: Whether we must requeue when notifying the parent.
 *
 * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
 * by the completion to be loaded from disk. When the callback is invoked, the page will be
 * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
 * when they are done with the page to clear the busy mark.
 *
 * A request for a writable page fails immediately with VDO_READ_ONLY when the vdo is read-only.
 */
void vdo_get_page(struct vdo_page_completion *page_completion,
    struct block_map_zone *zone, physical_block_number_t pbn,
    bool writable, void *parent, vdo_action_fn callback,
    vdo_action_fn error_handler, bool requeue)
{
 struct vdo_page_cache *cache = &zone->page_cache;
 struct vdo_completion *completion = &page_completion->completion;
 struct page_info *info;

 assert_on_cache_thread(cache, __func__);
 VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
       "New page completion was not already on a wait queue");

 /* Reset every field of the page completion before preparing it. */
 *page_completion = (struct vdo_page_completion) {
  .pbn = pbn,
  .writable = writable,
  .cache = cache,
 };

 vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
 vdo_prepare_completion(completion, callback, error_handler,
          cache->zone->thread_id, parent);
 completion->requeue = requeue;

 if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
  vdo_fail_completion(completion, VDO_READ_ONLY);
  return;
 }

 if (page_completion->writable)
  ADD_ONCE(cache->stats.write_count, 1);
 else
  ADD_ONCE(cache->stats.read_count, 1);

 info = find_page(cache, page_completion->pbn);
 if (info != NULL) {
  /* The page is in the cache already. */
  if ((info->write_status == WRITE_STATUS_DEFERRED) ||
      is_incoming(info) ||
      (is_outgoing(info) && page_completion->writable)) {
   /* The page is unusable until it has finished I/O. */
   ADD_ONCE(cache->stats.wait_for_page, 1);
   vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
   return;
  }

  if (is_valid(info)) {
   /* The page is usable. */
   ADD_ONCE(cache->stats.found_in_cache, 1);
   if (!is_present(info))
    ADD_ONCE(cache->stats.read_outgoing, 1);
   update_lru(info);
   info->busy++;
   complete_with_page(info, page_completion);
   return;
  }

  /*
   * Something horrible has gone wrong. The assertion only logs, so execution falls
   * through to fetching the page below.
   */
  VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
 }

 /* The page must be fetched. */
 info = find_free_page(cache);
 if (info != NULL) {
  ADD_ONCE(cache->stats.fetch_required, 1);
  load_page_for_completion(info, page_completion);
  return;
 }

 /* The page must wait for a page to be discarded. */
 ADD_ONCE(cache->stats.discard_required, 1);
 discard_page_for_completion(page_completion);
}

/**
 * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
 * @completion: The vdo_page_completion containing the page.
 *
 * Nothing is done if the completion fails validation as a writable page completion.
 */
void vdo_request_page_write(struct vdo_completion *completion)
{
 struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
 struct page_info *dirty_page;

 if (validate_completed_page_or_enter_read_only_mode(page_completion, true)) {
  dirty_page = page_completion->info;
  set_info_state(dirty_page, PS_DIRTY);
  launch_page_save(dirty_page);
 }
}

/**
 * vdo_get_cached_page() - Get the block map page from a page completion.
 * @completion: A vdo page completion whose callback has been called.
 * @page_ptr: A pointer to hold the page; it is only written on success.
 *
 * Return: VDO_SUCCESS or an error
 */
int vdo_get_cached_page(struct vdo_completion *completion,
   struct block_map_page **page_ptr)
{
 struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
 int status = validate_completed_page(page_completion, true);

 if (status != VDO_SUCCESS)
  return status;

 *page_ptr = (struct block_map_page *) get_page_buffer(page_completion->info);
 return VDO_SUCCESS;
}

/**
 * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
 * @cache: The page cache to invalidate.
 *
 * There must not be any dirty pages in the cache.
 *
 * Return: A success or error code.
 */
int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
{
 struct page_info *info;
 struct page_info *end;

 assert_on_cache_thread(cache, __func__);

 /* Refuse to proceed if any page still holds unwritten changes. */
 end = cache->infos + cache->page_count;
 for (info = cache->infos; info < end; info++) {
  int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");

  if (result != VDO_SUCCESS)
   return result;
 }

 /* Dropping the old map and building an empty one forgets every cached page. */
 vdo_int_map_free(vdo_forget(cache->page_map));
 return vdo_int_map_create(cache->page_count, &cache->page_map);
}

/**
 * get_tree_page_by_index() - Get the tree page for a given height and page index.
 * @forest: The forest holding the trees and the per-segment boundaries.
 * @root_index: The index of the tree to look in.
 * @height: The height of the desired page; level (@height - 1) of each segment is consulted.
 * @page_index: The index of the desired page within that level.
 *
 * Return: The requested page, or NULL if @page_index is not below any segment's boundary.
 */
static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
             root_count_t root_index,
             height_t height,
             page_number_t page_index)
{
 page_number_t segment_start = 0;
 size_t segment;

 for (segment = 0; segment < forest->segments; segment++) {
  page_number_t border = forest->boundaries[segment].levels[height - 1];

  if (page_index >= border) {
   /* Not in this segment; the next one starts where this one ends. */
   segment_start = border;
   continue;
  }

  return &forest->trees[root_index]
    .segments[segment]
    .levels[height - 1][page_index - segment_start];
 }

 return NULL;
}

/* Get the page referred to by the lock's tree slot at its current height. */
static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
           const struct tree_lock *lock)
{
 return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
          lock->height,
          lock->tree_slots[lock->height].page_index);
}

/**
 * vdo_copy_valid_page() - Validate and copy a buffer to a page.
 * @buffer: The raw buffer, interpreted as a block map page.
 * @nonce: The nonce passed to the page validation.
 * @pbn: The physical block number passed to the page validation.
 * @page: The destination; VDO_BLOCK_SIZE bytes are copied to it when the buffer is valid.
 *
 * An error is logged when validation reports the page as VDO_BLOCK_MAP_PAGE_BAD.
 *
 * Return: true if the buffer was valid and has been copied to @page.
 */
bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
    physical_block_number_t pbn,
    struct block_map_page *page)
{
 struct block_map_page *loaded = (struct block_map_page *) buffer;

 switch (vdo_validate_block_map_page(loaded, nonce, pbn)) {
 case VDO_BLOCK_MAP_PAGE_VALID:
  memcpy(page, loaded, VDO_BLOCK_SIZE);
  return true;

 case VDO_BLOCK_MAP_PAGE_BAD:
  vdo_log_error_strerror(VDO_BAD_PAGE,
           "Expected page %llu but got page %llu instead",
           (unsigned long long) pbn,
           (unsigned long long) vdo_get_block_map_page_pbn(loaded));
  return false;

 default:
  /* Any other validity is rejected without logging. */
  return false;
 }
}

/**
 * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
 *                     a cyclic range of values from 0 to (modulus - 1).
 * @lower: The lowest value to accept.
 * @value: The value to check.
 * @upper: The highest value to accept.
 * @modulus: The size of the cyclic space, no more than 2^15.
 *
 * The value and both bounds must be smaller than the modulus. Anything numerically below @lower
 * is treated as having wrapped, and is shifted up by @modulus before comparing.
 *
 * Return: true if the value is in range.
 */
static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
{
 u16 unwrapped_value = (value < lower) ? (value + modulus) : value;
 u16 unwrapped_upper = (upper < lower) ? (upper + modulus) : upper;

 return (unwrapped_value <= unwrapped_upper);
}

/**
 * is_not_older() - Check whether a generation is not strictly older than some other generation in
 *                  the context of a zone's current generation range.
 * @zone: The zone in which to do the comparison.
 * @a: The generation in question.
 * @b: The generation to compare to.
 *
 * Both generations are expected to lie between the zone's oldest and current generations. If
 * either does not, enter_zone_read_only_mode() is called and true is returned.
 *
 * Return: true if generation @a is not strictly older than generation @b in the context of @zone
 */
static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
{
 int result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
         in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
        "generation(s) %u, %u are out of range [%u, %u]",
        a, b, zone->oldest_generation, zone->generation);

 if (result == VDO_SUCCESS) {
  /* @a is not older than @b exactly when it lies between @b and the newest generation. */
  return in_cyclic_range(b, a, zone->generation, 1 << 8);
 }

 enter_zone_read_only_mode(zone, result);
 return true;
}

/*
 * Drop one dirty page from the count for the given generation, then advance the zone's oldest
 * generation past every generation which has no dirty pages, stopping at the current
 * generation. A count which is already zero is reported through enter_zone_read_only_mode().
 */
static void release_generation(struct block_map_zone *zone, u8 generation)
{
 int result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
        "dirty page count underflow for generation %u", generation);

 if (result != VDO_SUCCESS) {
  enter_zone_read_only_mode(zone, result);
  return;
 }

 zone->dirty_page_counts[generation]--;

 while (zone->oldest_generation != zone->generation) {
  if (zone->dirty_page_counts[zone->oldest_generation] != 0)
   break;

  zone->oldest_generation++;
 }
}

/*
 * Move a tree page to a new generation, counting it as dirty in that generation. If the page's
 * waiter was already waiting, its previous generation is released afterwards; in that case a
 * move to the same generation is a no-op.
 */
static void set_generation(struct block_map_zone *zone, struct tree_page *page,
      u8 new_generation)
{
 u8 previous_generation = page->generation;
 bool was_waiting = vdo_waiter_is_waiting(&page->waiter);
 u32 new_count;
 int result;

 if (was_waiting && (previous_generation == new_generation))
  return;

 page->generation = new_generation;
 zone->dirty_page_counts[new_generation]++;
 new_count = zone->dirty_page_counts[new_generation];

 /* A count which wrapped to zero means the counter overflowed. */
 result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
       new_generation);
 if (result != VDO_SUCCESS) {
  enter_zone_read_only_mode(zone, result);
  return;
 }

 if (!was_waiting)
  return;

 release_generation(zone, previous_generation);
}

static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);

/*
 * Implements waiter_callback_fn. The waiter is embedded in a tree_page; the context is passed to
 * write_page() as the pooled vio to write that page with.
 */
static void write_page_callback(struct vdo_waiter *waiter, void *context)
{
 struct tree_page *tree_page = container_of(waiter, struct tree_page, waiter);

 write_page(tree_page, context);
}

/*
 * Request a vio from the zone's vio pool on behalf of a tree page's waiter. The waiter's callback
 * is set to write_page_callback() before the request is made.
 */
static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
{
 waiter->callback = write_page_callback;
 acquire_vio_from_pool(zone->vio_pool, waiter);
}

/*
 * Try to advance the zone's generation by one. The attempt fails, leaving the zone unchanged, if
 * the next generation would collide with the oldest generation.
 *
 * Return: true if the generation was advanced.
 */
static bool attempt_increment(struct block_map_zone *zone)
{
 u8 next_generation = zone->generation + 1;
 bool available = (next_generation != zone->oldest_generation);

 if (available)
  zone->generation = next_generation;

 return available;
}

/*
 * Queue a tree page for writing. If no page is currently the flusher and the generation can be
 * advanced, this page becomes the flusher and requests a vio; otherwise it joins the zone's
 * flush waiters.
 */
static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
{
 if ((zone->flusher != NULL) || !attempt_increment(zone)) {
  vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
  return;
 }

 zone->flusher = page;
 acquire_vio(&page->waiter, zone);
}

/*
 * Waiter callback taking a write_if_not_dirtied_context. A page still in the context's generation
 * requests a vio for writing; a page in any other generation is enqueued again.
 */
static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
{
 struct write_if_not_dirtied_context *write_context = context;
 struct tree_page *page = container_of(waiter, struct tree_page, waiter);

 if (page->generation != write_context->generation) {
  enqueue_page(page, write_context->zone);
  return;
 }

 acquire_vio(waiter, write_context->zone);
}

/* Return a vio to its pool, then check whether the zone's drain is complete. */
static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
{
 return_vio_to_pool(vio);
 check_for_drain_complete(zone);
}

/*
 * Finish the write of a tree page. The completion's parent is the tree page and the pooled vio's
 * context is the zone. The vio is either reused to write another page or returned to the pool.
 *
 * This callback is registered in write_initialized_page().
 */
static void finish_page_write(struct vdo_completion *completion)
{
 bool dirty;
 struct vio *vio = as_vio(completion);
 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
 struct tree_page *page = completion->parent;
 struct block_map_zone *zone = pooled->context;

 /* The recovery journal reference recorded when the write began is no longer needed. */
 vdo_release_recovery_journal_block_reference(zone->block_map->journal,
           page->writing_recovery_lock,
           VDO_ZONE_TYPE_LOGICAL,
           zone->zone_number);

 /* The page is dirty again if its generation changed while the write was in flight. */
 dirty = (page->writing_generation != page->generation);
 release_generation(zone, page->writing_generation);
 page->writing = false;

 if (zone->flusher == page) {
  struct write_if_not_dirtied_context context = {
   .zone = zone,
   .generation = page->writing_generation,
  };

  /* Let every flush waiter either request a vio or be enqueued again. */
  vdo_waitq_notify_all_waiters(&zone->flush_waiters,
          write_page_if_not_dirtied, &context);
  if (dirty && attempt_increment(zone)) {
   /* This page is still the flusher, so write it again with the same vio. */
   write_page(page, pooled);
   return;
  }

  zone->flusher = NULL;
 }

 if (dirty) {
  enqueue_page(page, zone);
 } else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
     attempt_increment(zone)) {
  /* Make the first flush waiter the new flusher and reuse this vio to write it. */
  zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
          struct tree_page, waiter);
  write_page(zone->flusher, pooled);
  return;
 }

 return_to_pool(zone, pooled);
}

static void handle_write_error(struct vdo_completion *completion)
{
 int result = completion->result;
 struct vio *vio = as_vio(completion);
 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
 struct block_map_zone *zone = pooled->context;

 vio_record_metadata_io_error(vio);
 enter_zone_read_only_mode(zone, result);
 return_to_pool(zone, pooled);
}

static void write_page_endio(struct bio *bio);

static void write_initialized_page(struct vdo_completion *completion)
{
 struct vio *vio = as_vio(completion);
 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
 struct block_map_zone *zone = pooled->context;
 struct tree_page *tree_page = completion->parent;
 struct block_map_page *page = (struct block_map_page *) vio->data;
 blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;

 /*
 * Now that we know the page has been written at least once, mark the copy we are writing
 * as initialized.
 */

 page->header.initialized = true;

 if (zone->flusher == tree_page)
  operation |= REQ_PREFLUSH;

 vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
    write_page_endio, handle_write_error,
    operation);
}

static void write_page_endio(struct bio *bio)
{
 struct pooled_vio *vio = bio->bi_private;
 struct block_map_zone *zone = vio->context;
 struct block_map_page *page = (struct block_map_page *) vio->vio.data;

 continue_vio_after_io(&vio->vio,
         (page->header.initialized ?
          finish_page_write : write_initialized_page),
         zone->thread_id);
}

/*
 * Write a tree page using the supplied pooled vio, whose context is the zone. If the page is not
 * the zone's flusher and is_not_older() reports that its generation is not older than the zone's
 * current generation, the page is enqueued again and the vio is returned to the pool instead.
 */
static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
{
 struct vdo_completion *completion = &vio->vio.completion;
 struct block_map_zone *zone = vio->context;
 struct block_map_page *page = vdo_as_block_map_page(tree_page);

 if ((zone->flusher != tree_page) &&
     is_not_older(zone, tree_page->generation, zone->generation)) {
  /*
   * This page was re-dirtied after the last flush was issued, hence we need to do
   * another flush.
   */

  enqueue_page(tree_page, zone);
  return_to_pool(zone, vio);
  return;
 }

 /* Snapshot the page into the vio's buffer; the vio's copy is what gets written. */
 completion->parent = tree_page;
 memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
 completion->callback_thread_id = zone->thread_id;

 /* Record the generation and recovery lock being written, for use by finish_page_write(). */
 tree_page->writing = true;
 tree_page->writing_generation = tree_page->generation;
 tree_page->writing_recovery_lock = tree_page->recovery_lock;

 /* Clear this now so that we know this page is not on any dirty list. */
 tree_page->recovery_lock = 0;

 /*
  * We've already copied the page into the vio which will write it, so if it was not yet
  * initialized, the first write will indicate that (for torn write protection). It is now
  * safe to mark it as initialized in memory since if the write fails, the in memory state
  * will become irrelevant.
  */

 if (page->header.initialized) {
  write_initialized_page(completion);
  return;
 }

 /*
  * The copy in the vio is still marked uninitialized, so write_page_endio() will follow
  * this write with a second one via write_initialized_page().
  */
 page->header.initialized = true;
 vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
    write_page_endio, handle_write_error,
    REQ_OP_WRITE | REQ_PRIO);
}

/* Release a lock on a page which was being loaded or allocated. */
static void release_page_lock(struct data_vio *data_vio, char *what)
{
 struct block_map_zone *zone;
 struct tree_lock *lock_holder;
 struct tree_lock *lock = &data_vio->tree_lock;

 VDO_ASSERT_LOG_ONLY(lock->locked,
       "release of unlocked block map page %s for key %llu in tree %u",
       what, (unsigned long long) lock->key, lock->root_index);

 zone = data_vio->logical.zone->block_map_zone;
 lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
 VDO_ASSERT_LOG_ONLY((lock_holder == lock),
       "block map page %s mismatch for key %llu in tree %u",
       what, (unsigned long long) lock->key, lock->root_index);
 lock->locked = false;
}

static void finish_lookup(struct data_vio *data_vio, int result)
{
 data_vio->tree_lock.height = 0;

 --data_vio->logical.zone->block_map_zone->active_lookups;

 set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
 data_vio->vio.completion.error_handler = handle_data_vio_error;
 continue_data_vio_with_error(data_vio, result);
}

/*
 * Waiter callback which finishes a waiting data_vio's lookup after the lock holder's lookup was
 * aborted. The context points to the holder's result, which is adjusted per waiter:
 * VDO_NO_SPACE becomes VDO_SUCCESS for a non-write, and any other result becomes VDO_READ_ONLY
 * for a write.
 */
static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
{
 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
 int result = *((int *) context);

 if (result == VDO_NO_SPACE) {
  if (!data_vio->write)
   result = VDO_SUCCESS;
 } else if (data_vio->write) {
  result = VDO_READ_ONLY;
 }

 finish_lookup(data_vio, result);
}

static void abort_lookup(struct data_vio *data_vio, int result, char *what)
{
 if (result != VDO_NO_SPACE)
  enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);

 if (data_vio->tree_lock.locked) {
  release_page_lock(data_vio, what);
  vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
          abort_lookup_for_waiter,
          &result);
 }

 finish_lookup(data_vio, result);
}

/* Abort a lookup which failed while loading a page; "load" labels the assertion messages. */
static void abort_load(struct data_vio *data_vio, int result)
{
 abort_lookup(data_vio, result, "load");
}

/*
 * Check whether a mapping read from a tree page is invalid for a tree entry: not a valid
 * location, compressed, mapped to the zero block, or (below the root height) not a physical
 * data block according to the depot.
 */
static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
            const struct data_location *mapping,
            height_t height)
{
 if (!vdo_is_valid_location(mapping))
  return true;

 if (vdo_is_state_compressed(mapping->state))
  return true;

 if (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK))
  return true;

 /* Roots aren't physical data blocks, so we can't check their PBNs. */
 if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
  return false;

 return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
}

static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
static void allocate_block_map_page(struct block_map_zone *zone,
        struct data_vio *data_vio);

static void continue_with_loaded_page(struct data_vio *data_vio,
          struct block_map_page *page)
{
 struct tree_lock *lock = &data_vio->tree_lock;
 struct block_map_tree_slot slot = lock->tree_slots[lock->height];
 struct data_location mapping =
  vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);

 if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
  vdo_log_error_strerror(VDO_BAD_MAPPING,
           "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
           (unsigned long long) mapping.pbn, mapping.state,
           lock->tree_slots[lock->height - 1].page_index,
           lock->height - 1);
  abort_load(data_vio, VDO_BAD_MAPPING);
  return;
 }

 if (!vdo_is_mapped_location(&mapping)) {
  /* The page we need is unallocated */
  allocate_block_map_page(data_vio->logical.zone->block_map_zone,
     data_vio);
  return;
 }

 lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
 if (lock->height == 1) {
  finish_lookup(data_vio, VDO_SUCCESS);
  return;
 }

 /* We know what page we need to load next */
 load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}

/*
 * Waiter callback for data_vios which waited on another data_vio's page load. The context is the
 * loaded page; the waiter's tree lock height is decremented before it continues with that page.
 */
static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
{
 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);

 data_vio->tree_lock.height--;
 continue_with_loaded_page(data_vio, context);
}

static void finish_block_map_page_load(struct vdo_completion *completion)
{
 physical_block_number_t pbn;
 struct tree_page *tree_page;
 struct block_map_page *page;
 nonce_t nonce;
 struct vio *vio = as_vio(completion);
 struct pooled_vio *pooled = vio_as_pooled_vio(vio);
 struct data_vio *data_vio = completion->parent;
 struct block_map_zone *zone = pooled->context;
 struct tree_lock *tree_lock = &data_vio->tree_lock;

 tree_lock->height--;
 pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
 tree_page = get_tree_page(zone, tree_lock);
 page = (struct block_map_page *) tree_page->page_buffer;
 nonce = zone->block_map->nonce;

 if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
  vdo_format_block_map_page(page, nonce, pbn, false);
 return_vio_to_pool(pooled);

 /* Release our claim to the load and wake any waiters */
 release_page_lock(data_vio, "load");
 vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
 continue_with_loaded_page(data_vio, page);
}

static void handle_io_error(struct vdo_completion *completion)
{
 int result = completion->result;
 struct vio *vio = as_vio(completion);
 struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
 struct data_vio *data_vio = completion->parent;

 vio_record_metadata_io_error(vio);
 return_vio_to_pool(pooled);
 abort_load(data_vio, result);
}

/*
 * Bio completion for tree page reads. The bio's private data is the vio, whose completion's
 * parent is the data_vio; the load is finished on that data_vio's logical zone thread.
 */
static void load_page_endio(struct bio *bio)
{
 struct vio *vio = bio->bi_private;
 struct data_vio *data_vio = vio->completion.parent;

 continue_vio_after_io(vio, finish_block_map_page_load,
         data_vio->logical.zone->thread_id);
}

static void load_page(struct vdo_waiter *waiter, void *context)
{
 struct pooled_vio *pooled = context;
 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
 struct tree_lock *lock = &data_vio->tree_lock;
 physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;

 pooled->vio.completion.parent = data_vio;
 vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
    handle_io_error, REQ_OP_READ | REQ_PRIO);
}

/*
 * If the page is already locked, queue up to wait for the lock to be released. If the lock is
 * acquired, @data_vio->tree_lock.locked will be true.
 */

static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
{
 int result;
 struct tree_lock *lock_holder;
 struct tree_lock *lock = &data_vio->tree_lock;
 height_t height = lock->height;
 struct block_map_tree_slot tree_slot = lock->tree_slots[height];
 union page_key key;

 key.descriptor = (struct page_descriptor) {
  .root_index = lock->root_index,
  .height = height,
  .page_index = tree_slot.page_index,
  .slot = tree_slot.block_map_slot.slot,
 };
 lock->key = key.key;

 result = vdo_int_map_put(zone->loading_pages, lock->key,
     lock, false, (void **) &lock_holder);
 if (result != VDO_SUCCESS)
  return result;

 if (lock_holder == NULL) {
  /* We got the lock */
  data_vio->tree_lock.locked = true;
  return VDO_SUCCESS;
 }

 /* Someone else is loading or allocating the page we need */
 vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
 return VDO_SUCCESS;
}

/*
 * Load a block map tree page from disk, for the next level in the data vio tree lock. The read
 * is only started by the data_vio which obtains the page lock.
 */
static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
{
 int result = attempt_page_lock(zone, data_vio);

 if (result != VDO_SUCCESS) {
  abort_load(data_vio, result);
  return;
 }

 if (!data_vio->tree_lock.locked)
  return;

 data_vio->waiter.callback = load_page;
 acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
}

/*
 * Error handler for block map page allocation. Unless the completion had to be requeued to the
 * data_vio's logical zone thread, the lookup is aborted with the completion's result.
 */
static void allocation_failure(struct vdo_completion *completion)
{
 struct data_vio *data_vio = as_data_vio(completion);
 bool requeued = vdo_requeue_completion_if_needed(completion,
        data_vio->logical.zone->thread_id);

 if (!requeued)
  abort_lookup(data_vio, completion->result, "allocation");
}

/*
 * Waiter callback for data_vios which waited on another data_vio's page allocation. The context
 * points to the allocated PBN, which is recorded one level down before the waiter either
 * finishes its lookup or continues allocating.
 */
static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
{
 physical_block_number_t allocated_pbn = *((physical_block_number_t *) context);
 struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
 struct tree_lock *tree_lock = &data_vio->tree_lock;

 tree_lock->height--;
 tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn = allocated_pbn;

 if (tree_lock->height != 0) {
  allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
  return;
 }

 finish_lookup(data_vio, VDO_SUCCESS);
}

/**
 * expire_oldest_list() - Expire the oldest list.
 * @dirty_lists: The dirty lists to update.
 *
 * Moves the tree pages and cache pages of the era at the current offset onto the corresponding
 * expired lists, advances the oldest period, and advances the offset (wrapping at maximum_age).
 */
static void expire_oldest_list(struct dirty_lists *dirty_lists)
{
 block_count_t expiring = dirty_lists->offset;

 dirty_lists->offset++;
 if (dirty_lists->offset == dirty_lists->maximum_age)
  dirty_lists->offset = 0;

 dirty_lists->oldest_period++;

 if (!list_empty(&dirty_lists->eras[expiring][VDO_TREE_PAGE])) {
  list_splice_tail_init(&dirty_lists->eras[expiring][VDO_TREE_PAGE],
          &dirty_lists->expired[VDO_TREE_PAGE]);
 }

 if (!list_empty(&dirty_lists->eras[expiring][VDO_CACHE_PAGE])) {
  list_splice_tail_init(&dirty_lists->eras[expiring][VDO_CACHE_PAGE],
          &dirty_lists->expired[VDO_CACHE_PAGE]);
 }
}


/**
 * update_period() - Update the dirty_lists period if necessary.
 * @dirty: The dirty lists to update.
 * @period: The period which must be covered.
 *
 * Advances next_period until it is beyond @period, expiring the oldest list each time the span
 * from oldest_period to next_period has reached maximum_age.
 */
static void update_period(struct dirty_lists *dirty, sequence_number_t period)
{
 for (; dirty->next_period <= period; dirty->next_period++) {
  if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
   expire_oldest_list(dirty);
 }
}

/**
 * write_expired_elements() - Write out the expired list.
 * @zone: The zone whose expired dirty lists are to be written.
 *
 * Expired tree pages are moved to the zone's current generation and enqueued for writing unless
 * a write of the page is already in progress. Expired cache pages are scheduled for saving, and
 * then save_pages() is called on the zone's page cache.
 */
static void write_expired_elements(struct block_map_zone *zone)
{
 struct tree_page *page, *ttmp;
 struct page_info *info, *ptmp;
 struct list_head *expired;
 /* Every expired tree page is assigned the generation current on entry. */
 u8 generation = zone->generation;

 expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
 list_for_each_entry_safe(page, ttmp, expired, entry) {
  int result;

  list_del_init(&page->entry);

  result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
        "Newly expired page not already waiting to write");
  if (result != VDO_SUCCESS) {
   enter_zone_read_only_mode(zone, result);
   continue;
  }

  set_generation(zone, page, generation);
  /* A page which is currently being written is not enqueued here. */
  if (!page->writing)
   enqueue_page(page, zone);
 }

 expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
 list_for_each_entry_safe(info, ptmp, expired, state_entry) {
  list_del_init(&info->state_entry);
  schedule_page_save(info);
 }

 save_pages(&zone->page_cache);
}

/**
 * add_to_dirty_lists() - Add an element to the dirty lists.
 * @zone: The zone in which we are operating.
 * @entry: The list entry of the element to add.
 * @type: The type of page.
 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
 *              lock.
 *
 * Nothing is done if the period is unchanged, or if the element was already dirty in an earlier
 * period. Otherwise the element is moved to the list for its new period (or straight to the
 * expired list if that period is already older than the oldest period), and the expired
 * elements are written out.
 */
static void add_to_dirty_lists(struct block_map_zone *zone,
          struct list_head *entry,
          enum block_map_page_type type,
          sequence_number_t old_period,
          sequence_number_t new_period)
{
 struct dirty_lists *dirty_lists = zone->dirty_lists;

 if (old_period == new_period)
  return;

 if ((old_period != 0) && (old_period < new_period))
  return;

 if (new_period >= dirty_lists->oldest_period) {
  struct list_head *era;

  update_period(dirty_lists, new_period);
  era = &dirty_lists->eras[new_period % dirty_lists->maximum_age][type];
  list_move_tail(entry, era);
 } else {
  list_move_tail(entry, &dirty_lists->expired[type]);
 }

 write_expired_elements(zone);
}

/*
 * Record the allocation in the tree and wake any waiters now that the write lock has been
 * released.
 *
 * The new PBN is written into the parent tree page at the lock's current height, and that parent
 * is either moved to the zone's current generation (if it is already waiting to be written) or
 * put on a dirty list. The lock then steps down one level, the newly allocated interior page is
 * formatted in memory, and the lookup either finishes or goes on to allocate the next level.
 */

static void finish_block_map_allocation(struct vdo_completion *completion)
{
 physical_block_number_t pbn;
 struct tree_page *tree_page;
 struct block_map_page *page;
 sequence_number_t old_lock;
 struct data_vio *data_vio = as_data_vio(completion);
 struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
 struct tree_lock *tree_lock = &data_vio->tree_lock;
 /* The height on entry; tree_lock->height is decremented below. */
 height_t height = tree_lock->height;

 assert_data_vio_in_logical_zone(data_vio);

 tree_page = get_tree_page(zone, tree_lock);
 pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;

 /* Record the allocation. */
 page = (struct block_map_page *) tree_page->page_buffer;
 /* Saved so the dirty lists can compare the lock held before and after the update. */
 old_lock = tree_page->recovery_lock;
 vdo_update_block_map_page(page, data_vio, pbn,
      VDO_MAPPING_STATE_UNCOMPRESSED,
      &tree_page->recovery_lock);

 if (vdo_waiter_is_waiting(&tree_page->waiter)) {
  /* This page is waiting to be written out. */
  if (zone->flusher != tree_page) {
   /*
    * The outstanding flush won't cover the update we just made,
    * so mark the page as needing another flush.
    */

   set_generation(zone, tree_page, zone->generation);
  }
 } else {
  /* Put the page on a dirty list */
  if (old_lock == 0)
   INIT_LIST_HEAD(&tree_page->entry);
  add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
       old_lock, tree_page->recovery_lock);
 }

 tree_lock->height--;
 if (height > 1) {
  /* Format the interior node we just allocated (in memory). */
  tree_page = get_tree_page(zone, tree_lock);
  vdo_format_block_map_page(tree_page->page_buffer,
       zone->block_map->nonce,
       pbn, false);
 }

 /* Release our claim to the allocation and wake any waiters */
 release_page_lock(data_vio, "allocation");
 vdo_waitq_notify_all_waiters(&tree_lock->waiters,
         continue_allocation_for_waiter, &pbn);
 if (tree_lock->height == 0) {
  finish_lookup(data_vio, VDO_SUCCESS);
  return;
 }

 allocate_block_map_page(zone, data_vio);
}

/*
 * Release the data_vio's allocation lock, then continue with finish_block_map_allocation() as
 * the data_vio's logical callback. Asserts that it is running in the allocated zone.
 */
static void release_block_map_write_lock(struct vdo_completion *completion)
{
 struct data_vio *data_vio = as_data_vio(completion);

 assert_data_vio_in_allocated_zone(data_vio);

 release_data_vio_allocation_lock(data_vio, true);
 launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
}

/*
 * Newly allocated block map pages are set to have MAXIMUM_REFERENCES after they are journaled,
 * to prevent deduplication against the block after we release the write lock on it, but before we
 * write out the page.
 *
 * The reference count change is described by the data_vio's increment_updater; once it has been
 * applied, the completion continues with release_block_map_write_lock().
 */

static void set_block_map_page_reference_count(struct vdo_completion *completion)
{
 struct data_vio *data_vio = as_data_vio(completion);

 assert_data_vio_in_allocated_zone(data_vio);

 completion->callback = release_block_map_write_lock;
 vdo_modify_reference_count(completion, &data_vio->increment_updater);
}

/**
 * journal_block_map_allocation() - Add a recovery journal entry for a block map page allocation.
 * @completion: The data_vio performing the allocation, as a completion.
 *
 * Must be called in the journal zone. The data_vio's allocated zone callback is set to
 * set_block_map_page_reference_count() before the data_vio is handed to the recovery journal.
 */
static void journal_block_map_allocation(struct vdo_completion *completion)
{
 struct data_vio *dvio = as_data_vio(completion);

 assert_data_vio_in_journal_zone(dvio);

 set_data_vio_allocated_zone_callback(dvio, set_block_map_page_reference_count);
 vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, dvio);
}

static void allocate_block(struct vdo_completion *completion)
{
 struct data_vio *data_vio = as_data_vio(completion);
 struct tree_lock *lock = &data_vio->tree_lock;
 physical_block_number_t pbn;

 assert_data_vio_in_allocated_zone(data_vio);

 if (!vdo_allocate_block_in_zone(data_vio))
  return;

 pbn = data_vio->allocation.pbn;
 lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn;
 data_vio->increment_updater = (struct reference_updater) {
  .operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING,
  .increment = true,
  .zpbn = {
   .pbn = pbn,
   .state = VDO_MAPPING_STATE_UNCOMPRESSED,
  },
  .lock = data_vio->allocation.lock,
 };

 launch_data_vio_journal_callback(data_vio, journal_block_map_allocation);
}

static void allocate_block_map_page(struct block_map_zone *zone,
        struct data_vio *data_vio)
{
 int result;

 if (!data_vio->write || data_vio->is_discard) {
--> --------------------

--> maximum size reached

--> --------------------

Messung V0.5
C=95 H=94 G=94

¤ Dauer der Verarbeitung: 0.11 Sekunden  (vorverarbeitet)  ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.