// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright (C) 2002 Sistina Software (UK) Limited.
 * Copyright (C) 2006 Red Hat GmbH
 *
 * This file is released under the GPL.
 *
 * Kcopyd provides a simple interface for copying an area of one
 * block-device to one or more other block-devices, with an asynchronous
 * completion notification.
 */
/*
 *----------------------------------------------------------------
 * Each kcopyd client has its own little pool of preallocated
 * pages for kcopyd io.
 *----------------------------------------------------------------
 */
struct dm_kcopyd_client {
	/* Singly linked chain of free pages, linked through page_list::next. */
	struct page_list *pages;
	unsigned int nr_reserved_pages;	/* pool size we never give back to the system */
	unsigned int nr_free_pages;	/* pages currently sitting on ->pages */
	unsigned int sub_job_size;	/* max sectors a single sub job copies */

	/*
	 * We maintain four lists of jobs:
	 *
	 * i)   jobs waiting for pages
	 * ii)  jobs that have pages, and are waiting for the io to be issued.
	 * iii) jobs that don't need to do any IO and just run a callback
	 * iv)  jobs that have completed.
	 *
	 * All four of these are protected by job_lock.
	 */
	spinlock_t job_lock;
	struct list_head callback_jobs;
	struct list_head complete_jobs;
	struct list_head io_jobs;
	struct list_head pages_jobs;

	/*
	 * NOTE(review): other code in this file also reads kc->job_pool,
	 * kc->io_client, kc->throttle, kc->nr_jobs, kc->destroyq and
	 * kc->kcopyd_work, which are not declared here — this copy of the
	 * struct appears truncated. TODO: restore the missing members from
	 * upstream before building.
	 */
};
staticstruct page_list zero_page_list;
static DEFINE_SPINLOCK(throttle_spinlock);
/*
 * IO/IDLE accounting slowly decays after (1 << ACCOUNT_INTERVAL_SHIFT) period.
 * When total_period >= (1 << ACCOUNT_INTERVAL_SHIFT) the counters are divided
 * by 2.
 */
#define ACCOUNT_INTERVAL_SHIFT SHIFT_HZ

/*
 * Sleep this number of microseconds (100000 us = 100 ms), as the name
 * SLEEP_USEC indicates.
 *
 * The value was decided experimentally.
 * Smaller values seem to cause an increased copy rate above the limit.
 * The reason for this is unknown but possibly due to jiffies rounding errors
 * or read/write cache inside the disk.
 */
#define SLEEP_USEC 100000

/*
 * Maximum number of sleep events. There is a theoretical livelock if more
 * kcopyd clients do work simultaneously which this limit avoids.
 */
#define MAX_SLEEPS 10
/*
 * Add the provided pages to a client's free page list, releasing
 * back to the system any beyond the reserved_pages limit.
 */
static void kcopyd_put_pages(struct dm_kcopyd_client *kc, struct page_list *pl)
{
	struct page_list *next;

	do {
		next = pl->next;

		if (kc->nr_free_pages >= kc->nr_reserved_pages)
			/* Free list already holds the reserve: give it back. */
			free_pl(pl);
		else {
			/* Push onto the head of the client's free list. */
			pl->next = kc->pages;
			kc->pages = pl;
			kc->nr_free_pages++;
		}

		pl = next;
	} while (pl);
}
/*
 * These three functions resize the page pool.
 */
static void drop_pages(struct page_list *pl)
{
	/* Walk the chain, freeing each entry; save the link before it dies. */
	struct page_list *next;

	while (pl) {
		next = pl->next;
		free_pl(pl);
		pl = next;
	}
}
/* * Allocate and reserve nr_pages for the use of a specific client.
*/ staticint client_reserve_pages(struct dm_kcopyd_client *kc, unsignedint nr_pages)
{ unsignedint i; struct page_list *pl = NULL, *next;
for (i = 0; i < nr_pages; i++) {
next = alloc_pl(GFP_KERNEL); if (!next) { if (pl)
drop_pages(pl); return -ENOMEM;
}
next->next = pl;
pl = next;
}
/* *--------------------------------------------------------------- * kcopyd_jobs need to be allocated by the *clients* of kcopyd, * for this reason we use a mempool to prevent the client from * ever having to do io (which could cause a deadlock). *---------------------------------------------------------------
*/ struct kcopyd_job { struct dm_kcopyd_client *kc; struct list_head list; unsignedint flags;
/* * Error state of the job.
*/ int read_err; unsignedlong write_err;
/* * The destinations for the transfer.
*/ unsignedint num_dests; struct dm_io_region dests[DM_KCOPYD_MAX_REGIONS];
struct page_list *pages;
/* * Set this to ensure you are notified when the job has * completed. 'context' is for callback to use.
*/
dm_kcopyd_notify_fn fn; void *context;
/* * These fields are only used if the job has been split * into more manageable parts.
*/ struct mutex lock;
atomic_t sub_jobs;
sector_t progress;
sector_t write_offset;
/* * Functions to push and pop a job onto the head of a given job * list.
*/ staticstruct kcopyd_job *pop_io_job(struct list_head *jobs, struct dm_kcopyd_client *kc)
{ struct kcopyd_job *job;
/* * For I/O jobs, pop any read, any write without sequential write * constraint and sequential writes that are at the right position.
*/
list_for_each_entry(job, jobs, list) { if (job->op == REQ_OP_READ ||
!(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) {
list_del(&job->list); return job;
}
/*
 * These three functions process 1 item from the corresponding
 * job list.
 *
 * They return:
 * < 0: error
 *   0: success
 * > 0: can't process yet.
 */
static int run_complete_job(struct kcopyd_job *job)
{
	/* Snapshot everything we need before the job may be freed below. */
	void *context = job->context;
	int read_err = job->read_err;
	unsigned long write_err = job->write_err;
	dm_kcopyd_notify_fn fn = job->fn;
	struct dm_kcopyd_client *kc = job->kc;

	/* Return data pages to the pool (the zero list is shared, not owned). */
	if (job->pages && job->pages != &zero_page_list)
		kcopyd_put_pages(kc, job->pages);

	/*
	 * If this is the master job, the sub jobs have already
	 * completed so we can free everything.
	 */
	if (job->master_job == job) {
		mutex_destroy(&job->lock);
		mempool_free(job, &kc->job_pool);
	}
	fn(read_err, write_err, context);

	/* Last job out wakes anyone waiting in client destroy. */
	if (atomic_dec_and_test(&kc->nr_jobs))
		wake_up(&kc->destroyq);

	return 0;
}
/*
 * Request io on as many buffer heads as we can currently get for
 * a particular job.
 *
 * Returns the dm_io() result (0 on successfully issued async io),
 * or -EIO when a sequential-write job must be abandoned.
 */
static int run_io_job(struct kcopyd_job *job)
{
	int r;
	struct dm_io_request io_req = {
		.bi_opf = job->op,
		.mem.type = DM_IO_PAGE_LIST,
		.mem.ptr.pl = job->pages,
		.mem.offset = 0,
		.notify.fn = complete_io,
		.notify.context = job,
		.client = job->kc->io_client,
	};

	/*
	 * If we need to write sequentially and some reads or writes failed,
	 * no point in continuing.
	 */
	if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) &&
	    job->master_job->write_err) {
		job->write_err = job->master_job->write_err;
		return -EIO;
	}

	io_job_start(job->kc->throttle);

	if (job->op == REQ_OP_READ)
		r = dm_io(&io_req, 1, &job->source, NULL, IOPRIO_DEFAULT);
	else
		r = dm_io(&io_req, job->num_dests, job->dests, NULL,
			  IOPRIO_DEFAULT);

	return r;
}
/*
 * Run through a list for as long as possible. Returns the count
 * of successful jobs.
 */
static int process_jobs(struct list_head *jobs, struct dm_kcopyd_client *kc,
			int (*fn)(struct kcopyd_job *))
{
	struct kcopyd_job *job;
	int r, count = 0;

	while ((job = pop(jobs, kc))) {
		r = fn(job);

		if (r < 0) {
			/* error this rogue job */
			if (op_is_write(job->op))
				job->write_err = (unsigned long) -1L;
			else
				job->read_err = 1;
			push(&kc->complete_jobs, job);
			wake(kc);
			break;
		}

		if (r > 0) {
			/*
			 * We couldn't service this job ATM, so
			 * push this job back onto the list.
			 */
			push_head(jobs, job);
			break;
		}

		count++;
	}

	return count;
}
/*
 * kcopyd does this every time it's woken up.
 */
staticvoid do_work(struct work_struct *work)
{
	struct dm_kcopyd_client *kc = container_of(work,
				struct dm_kcopyd_client, kcopyd_work);
	struct blk_plug plug;

	/*
	 * The order that these are called is *very* important.
	 * complete jobs can free some pages for pages jobs.
	 * Pages jobs when successful will jump onto the io jobs
	 * list. io jobs call wake when they complete and it all
	 * starts again.
	 */
	/* Fold queued callback-only jobs into the complete list under the lock. */
	spin_lock_irq(&kc->job_lock);
	list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
	spin_unlock_irq(&kc->job_lock);
	/*
	 * NOTE(review): truncated in this copy — the plugged process_jobs()
	 * passes over the complete/pages/io lists (using the declared 'plug')
	 * and the closing brace are missing, and 'staticvoid' lacks a space.
	 * TODO: restore the rest of the body from upstream before building.
	 */
/* * If we are copying a small region we just dispatch a single job * to do the copy, otherwise the io has to be split up into many * jobs.
*/ staticvoid dispatch_job(struct kcopyd_job *job)
{ struct dm_kcopyd_client *kc = job->kc;
/* update the error */ if (read_err)
job->read_err = 1;
if (write_err)
job->write_err |= write_err;
/* * Only dispatch more work if there hasn't been an error.
*/ if ((!job->read_err && !job->write_err) ||
job->flags & BIT(DM_KCOPYD_IGNORE_ERROR)) { /* get the next chunk of work */
progress = job->progress;
count = job->source.count - progress; if (count) { if (count > kc->sub_job_size)
count = kc->sub_job_size;
/* * Queue the completion callback to the kcopyd thread. * * Some callers assume that all the completions are called * from a single thread and don't race with each other. * * We must not call the callback directly here because this * code may not be executing in the thread.
*/
push(&kc->complete_jobs, job);
wake(kc);
}
}
/*
 * Create some sub jobs to share the work between them.
 */
static void split_job(struct kcopyd_job *master_job)
{
	int i;

	atomic_inc(&master_job->kc->nr_jobs);

	atomic_set(&master_job->sub_jobs, SPLIT_COUNT);
	for (i = 0; i < SPLIT_COUNT; i++) {
		/* Sub jobs live in the array slots right after the master. */
		master_job[i + 1].master_job = master_job;
		/* Kick each sub job off via the zero-length completion path. */
		segment_complete(0, 0u, &master_job[i + 1]);
	}
}
/*
 * NOTE(review): the statements below are a fragment of what is presumably
 * the dm_kcopyd_copy()/dm_kcopyd_zero() job-setup path — the enclosing
 * function's signature and surrounding code are missing from this copy.
 * TODO: restore the enclosing function from upstream before building.
 */
/*
 * Allocate an array of jobs consisting of one master job
 * followed by SPLIT_COUNT sub jobs.
 */
job = mempool_alloc(&kc->job_pool, GFP_NOIO);
mutex_init(&job->lock);

/*
 * set up for the read.
 */
job->kc = kc;
job->flags = flags;
job->read_err = 0;
job->write_err = 0;

/*
 * If one of the destination is a host-managed zoned block device,
 * we need to write sequentially. If one of the destination is a
 * host-aware device, then leave it to the caller to choose what to do.
 */
if (!(job->flags & BIT(DM_KCOPYD_WRITE_SEQ))) {
	for (i = 0; i < job->num_dests; i++) {
		if (bdev_is_zoned(dests[i].bdev)) {
			job->flags |= BIT(DM_KCOPYD_WRITE_SEQ);
			break;
		}
	}
}

/*
 * If we need to write sequentially, errors cannot be ignored.
 */
if (job->flags & BIT(DM_KCOPYD_WRITE_SEQ) &&
    job->flags & BIT(DM_KCOPYD_IGNORE_ERROR))
	job->flags &= ~BIT(DM_KCOPYD_IGNORE_ERROR);

/*
 * Use WRITE ZEROES to optimize zeroing if all dests support it.
 */
job->op = REQ_OP_WRITE_ZEROES;
for (i = 0; i < job->num_dests; i++)
	if (!bdev_write_zeroes_sectors(job->dests[i].bdev)) {
		/* At least one destination lacks WRITE ZEROES: fall back. */
		job->op = REQ_OP_WRITE;
		break;
	}
/* NOTE(review): this stray closing brace belongs to the missing enclosing scope. */
}
/*
 * Tear down a kcopyd client once all of its outstanding jobs have drained.
 */
void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
{
	/* Wait for completion of all jobs submitted by this client. */
	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
	/*
	 * NOTE(review): truncated in this copy — the actual teardown
	 * (workqueue, io client, reserved pages, mempool, kfree) and the
	 * closing brace are missing. TODO: restore from upstream.
	 */
/*
 * NOTE(review): extraneous non-code text, evidently appended by the web
 * page this file was extracted from; wrapped in a comment block so it
 * cannot break compilation. English translation of the German original:
 * "The information on this web page was compiled carefully and to the
 * best of our knowledge. However, neither completeness, nor correctness,
 * nor quality of the provided information is guaranteed. Note: the
 * coloured syntax display and the measurement are still experimental."
 */