/*
 * NOTE(review): fragment — the enclosing function's header is not visible
 * in this chunk. Removing the worker from the free list and all_list under
 * workers_lock, then clearing ->worker_private, looks like the teardown
 * path of an exiting worker — confirm against the full file.
 */
raw_spin_lock(&acct->workers_lock); if (test_bit(IO_WORKER_F_FREE, &worker->flags))
hlist_nulls_del_rcu(&worker->nulls_node);
list_del_rcu(&worker->all_list);
raw_spin_unlock(&acct->workers_lock);
io_wq_dec_running(worker); /* * this worker is a goner, clear ->worker_private to avoid any * inc/dec running calls that could happen as part of exit from * touching 'worker'.
*/
current->worker_private = NULL;
/* * If there's work to do, returns true with acct->lock acquired. If not, * returns false with no lock held.
*/ staticinlinebool io_acct_run_queue(struct io_wq_acct *acct)
__acquires(&acct->lock)
{
raw_spin_lock(&acct->lock); if (__io_acct_run_queue(acct)) returntrue;
raw_spin_unlock(&acct->lock); returnfalse;
}
/* * Check head of free list for an available worker. If one isn't available, * caller must create one.
*/ staticbool io_acct_activate_free_worker(struct io_wq_acct *acct)
__must_hold(RCU)
{ struct hlist_nulls_node *n; struct io_worker *worker;
/* * Iterate free_list and see if we can find an idle worker to * activate. If a given worker is on the free_list but in the process * of exiting, keep trying.
*/
hlist_nulls_for_each_entry_rcu(worker, n, &acct->free_list, nulls_node) { if (!io_worker_get(worker)) continue; /* * If the worker is already running, it's either already * starting work or finishing work. In either case, if it does * to go sleep, we'll kick off a new task for this work anyway.
*/
wake_up_process(worker->task);
io_worker_release(worker); returntrue;
}
returnfalse;
}
/*
 * NOTE(review): this span appears to be a splice of two functions — the
 * io_wq_create_worker() header, and the body of the task_work-based
 * worker-creation path (it references 'worker' and 'func', which are not
 * declared by this signature). Recover the missing lines from the full
 * file before modifying; documented in place only.
 */
/* * We need a worker. If we find a free one, we're good. If not, and we're * below the max number of workers, create one.
*/ staticbool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{ /* * Most likely an attempt to queue unbounded work on an io_wq that * wasn't setup with any unbounded workers.
*/ if (unlikely(!acct->max_workers))
pr_warn_once("io-wq is not configured for unbound workers");
/*
 * NOTE(review): splice point — from here on the code operates on an
 * undeclared 'worker'; the intervening lines were lost in extraction.
 */
/* raced with exit, just ignore create call */ if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) goto fail; if (!io_worker_get(worker)) goto fail; /* * create_state manages ownership of create_work/index. We should * only need one entry per worker, as the worker going to sleep * will trigger the condition, and waking will clear it once it * runs the task_work.
*/ if (test_bit(0, &worker->create_state) ||
test_and_set_bit_lock(0, &worker->create_state)) goto fail_release;
atomic_inc(&wq->worker_refs);
init_task_work(&worker->create_work, func); if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) { /* * EXIT may have been set after checking it above, check after * adding the task_work and remove any creation item if it is * now set. wq exit does that too, but we can have added this * work item after we canceled in io_wq_exit_workers().
*/ if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
io_wq_cancel_tw_create(wq);
io_worker_ref_put(wq); returntrue;
}
/* task_work_add() failed: undo the ref and create_state ownership */
io_worker_ref_put(wq);
clear_bit_unlock(0, &worker->create_state);
fail_release:
io_worker_release(worker);
fail:
atomic_dec(&acct->nr_running);
io_worker_ref_put(wq); returnfalse;
}
/* Defer if current and next work are both hashed to the same chain */ staticbool io_wq_hash_defer(struct io_wq_work *work, struct io_wq_acct *acct)
{ unsignedint hash, work_flags; struct io_wq_work *next;
lockdep_assert_held(&acct->lock);
work_flags = atomic_read(&work->flags); if (!__io_wq_is_hashed(work_flags)) returnfalse;
/* should not happen, io_acct_run_queue() said we had work */ if (wq_list_empty(&acct->work_list)) returntrue;
hash = __io_get_work_hash(work_flags);
next = container_of(acct->work_list.first, struct io_wq_work, list);
work_flags = atomic_read(&next->flags); if (!__io_wq_is_hashed(work_flags)) returnfalse; return hash == __io_get_work_hash(work_flags);
}
/*
 * NOTE(review): fragment — no function header visible. The
 * atomic_dec_and_test() on acct->nr_running plus the hash-defer check
 * suggests this is the interior of the "worker stops running" accounting
 * path; the code following the closing brace is not visible here.
 */
if (!test_bit(IO_WORKER_F_UP, &worker->flags)) return;
if (!atomic_dec_and_test(&acct->nr_running)) return; if (!worker->cur_work) return; if (!io_acct_run_queue(acct)) return; if (io_wq_hash_defer(worker->cur_work, acct)) {
/* deferring: drop the lock io_acct_run_queue() handed back to us */
raw_spin_unlock(&acct->lock); return;
}
/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist.
 */
static void __io_worker_busy(struct io_wq_acct *acct, struct io_worker *worker)
{
	/* nothing to do unless the worker is currently marked free */
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_lock(&acct->workers_lock);
	hlist_nulls_del_init_rcu(&worker->nulls_node);
	raw_spin_unlock(&acct->workers_lock);
}
/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq_acct *acct, struct io_worker *worker)
	__must_hold(acct->workers_lock)
{
	/* already on the freelist, nothing to do */
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		return;

	set_bit(IO_WORKER_F_FREE, &worker->flags);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &acct->free_list);
}
/*
 * NOTE(review): fragment — the function header and the loop's opening are
 * not visible in this chunk. This is the interior of the next-work
 * selection loop: unhashed work runs immediately; a hashed chain runs
 * only if its hash bit isn't already set, otherwise the loop skips to the
 * chain tail and remembers the first stalled hash. Confirm against the
 * full file.
 */
work = container_of(node, struct io_wq_work, list);
/* not hashed, can run anytime */
work_flags = atomic_read(&work->flags); if (!__io_wq_is_hashed(work_flags)) {
wq_list_del(&acct->work_list, node, prev); return work;
}
hash = __io_get_work_hash(work_flags); /* all items with this hash lie in [work, tail] */
tail = wq->hash_tail[hash];
/* hashed, can run if not already running */ if (!test_and_set_bit(hash, &wq->hash->map)) {
wq->hash_tail[hash] = NULL;
wq_list_cut(&acct->work_list, &tail->list, prev); return work;
} if (stall_hash == -1U)
stall_hash = hash; /* fast forward to a next hash, for-each will fix up @prev */
node = &tail->list;
}
if (stall_hash != -1U) { bool unstalled;
/* * Set this before dropping the lock to avoid racing with new * work being added and clearing the stalled bit.
*/
set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
raw_spin_unlock(&acct->lock);
unstalled = io_wait_on_hash(wq, stall_hash);
raw_spin_lock(&acct->lock); if (unstalled) {
clear_bit(IO_ACCT_STALLED_BIT, &acct->flags); if (wq_has_sleeper(&wq->hash->wait))
wake_up(&wq->hash->wait);
}
}
/*
 * NOTE(review): this span is a splice of two functions — the opening of
 * io_worker_handle_work() and, from the TASK_INTERRUPTIBLE sleep loop
 * onward, the body of the worker thread main loop (it uses 'last_timeout'
 * and 'exit_mask', which this signature does not declare, and ends with
 * 'return 0' from a void function). Recover the missing lines from the
 * full file before modifying; documented in place only.
 */
/* * Called with acct->lock held, drops it before returning
*/ staticvoid io_worker_handle_work(struct io_wq_acct *acct, struct io_worker *worker)
__releases(&acct->lock)
{ struct io_wq *wq = worker->wq; bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
do { struct io_wq_work *work;
/* * If we got some work, mark us as busy. If we didn't, but * the list isn't empty, it means we stalled on hashed work. * Mark us stalled so we don't keep looking for work when we * can't make progress, any work completion or insertion will * clear the stalled flag.
*/
work = io_get_next_work(acct, wq); if (work) { /* * Make sure cancelation can find this, even before * it becomes the active work. That avoids a window * where the work has been removed from our general * work list, but isn't yet discoverable as the * current work item for this worker.
*/
raw_spin_lock(&worker->lock);
worker->cur_work = work;
raw_spin_unlock(&worker->lock);
}
/*
 * NOTE(review): splice point — the following loop belongs to a
 * different function (the worker thread's main sleep/run loop).
 */
while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) { long ret;
set_current_state(TASK_INTERRUPTIBLE);
/* * If we have work to do, io_acct_run_queue() returns with * the acct->lock held. If not, it will drop it.
*/ while (io_acct_run_queue(acct))
io_worker_handle_work(acct, worker);
raw_spin_lock(&acct->workers_lock); /* * Last sleep timed out. Exit if we're not the last worker, * or if someone modified our affinity.
*/ if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
acct->nr_workers--;
raw_spin_unlock(&acct->workers_lock);
__set_current_state(TASK_RUNNING); break;
}
last_timeout = false;
__io_worker_idle(acct, worker);
raw_spin_unlock(&acct->workers_lock); if (io_run_task_work()) continue;
ret = schedule_timeout(WORKER_IDLE_TIMEOUT); if (signal_pending(current)) { struct ksignal ksig;
if (!get_signal(&ksig)) continue; break;
} if (!ret) {
last_timeout = true;
exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
wq->cpu_mask);
}
}
/* drain remaining work if the wq is shutting down */
if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
io_worker_handle_work(acct, worker);
io_worker_exit(worker); return 0;
}
/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker || !test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	/* already accounted as running, nothing to do */
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}
/*
 * NOTE(review): truncated — only the guard clauses of this function are
 * visible; the body that clears the RUNNING flag and does the dec-running
 * accounting is missing from this chunk. Recover it from the full file.
 */
/* * Called when worker is going to sleep. If there are no workers currently * running and we have work pending, wake up a free one or create a new one.
*/ void io_wq_worker_sleeping(struct task_struct *tsk)
{ struct io_worker *worker = tsk->worker_private;
if (!worker) return; if (!test_bit(IO_WORKER_F_UP, &worker->flags)) return; if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags)) return;
staticinlinebool io_should_retry_thread(struct io_worker *worker, long err)
{ /* * Prevent perpetual task_work retry, if the task (or its group) is * exiting.
*/ if (fatal_signal_pending(current)) returnfalse; if (worker->init_retries++ >= WORKER_INIT_LIMIT) returnfalse;
switch (err) { case -EAGAIN: case -ERESTARTSYS: case -ERESTARTNOINTR: case -ERESTARTNOHAND: returntrue; default: returnfalse;
}
}
static void queue_create_worker_retry(struct io_worker *worker)
{
	unsigned long delay = msecs_to_jiffies(worker->init_retries * 5);

	/*
	 * We only bother retrying because there's a chance that the failure
	 * to create a worker is due to some temporary condition in the
	 * forking task (e.g. outstanding signal); back off a little, scaled
	 * by the number of attempts, to give it time to clear.
	 */
	schedule_delayed_work(&worker->work, delay);
}
/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting. Stops early once the callback returns true,
 * and returns that final callback result.
 */
static bool io_acct_for_each_worker(struct io_wq_acct *acct,
				    bool (*func)(struct io_worker *, void *),
				    void *data)
{
	struct io_worker *worker;
	bool stop = false;

	list_for_each_entry_rcu(worker, &acct->all_list, all_list) {
		/* a worker mid-exit fails the get; skip it */
		if (!io_worker_get(worker))
			continue;
		/* no task if node is/was offline */
		if (worker->task)
			stop = func(worker, data);
		io_worker_release(worker);
		if (stop)
			break;
	}

	return stop;
}
/*
 * NOTE(review): this span is a splice — io_wq_for_each_worker() is cut
 * off after its loop (its return statement is missing), and the remainder
 * belongs to a different function's enqueue/cancel path (it references
 * 'work_flags', 'work', 'did_create', 'acct' and 'match', none of which
 * this signature declares). Recover the missing lines from the full file
 * before modifying; documented in place only.
 */
staticbool io_wq_for_each_worker(struct io_wq *wq, bool (*func)(struct io_worker *, void *), void *data)
{ for (int i = 0; i < IO_WQ_ACCT_NR; i++) { if (!io_acct_for_each_worker(&wq->acct[i], func, data)) returnfalse;
}
/*
 * NOTE(review): splice point — the following lines are from another
 * function (an enqueue path that cancels on exit and creates workers).
 */
/* * If io-wq is exiting for this task, or if the request has explicitly * been marked as one that should not get executed, cancel it here.
*/ if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
(work_flags & IO_WQ_WORK_CANCEL)) {
io_run_cancel(work, wq); return;
}
did_create = io_wq_create_worker(wq, acct); if (likely(did_create)) return;
raw_spin_lock(&acct->workers_lock); if (acct->nr_workers) {
raw_spin_unlock(&acct->workers_lock); return;
}
raw_spin_unlock(&acct->workers_lock);
/* fatal condition, failed to create the first worker */
io_acct_cancel_pending_work(wq, acct, &match);
}
}
/*
 * NOTE(review): this span is a splice — the io_wq_hash_work() header is
 * followed by fragments of the work-cancellation paths (they reference
 * 'worker', 'match' and 'wq', which this signature does not declare, and
 * return IO_WQ_CANCEL_* values from a void function). Recover the missing
 * lines from the full file before modifying; documented in place only.
 */
/* * Work items that hash to the same value will not be done in parallel. * Used to limit concurrent writes, generally hashed by inode.
*/ void io_wq_hash_work(struct io_wq_work *work, void *val)
{ unsignedint bit;
/*
 * NOTE(review): splice point — the following lines belong to the
 * running-work cancel helper and the top-level cancel entry point.
 */
/* * Hold the lock to avoid ->cur_work going out of scope, caller * may dereference the passed in work.
*/
raw_spin_lock(&worker->lock); if (__io_wq_worker_cancel(worker, match, worker->cur_work))
match->nr_running++;
raw_spin_unlock(&worker->lock);
/* * First check pending list, if we're lucky we can just remove it * from there. CANCEL_OK means that the work is returned as-new, * no completion will be posted for it. * * Then check if a free (going busy) or busy worker has the work * currently running. If we find it there, we'll return CANCEL_RUNNING * as an indication that we attempt to signal cancellation. The * completion will run normally in this case. * * Do both of these while holding the acct->workers_lock, to ensure that * we'll find a work item regardless of state.
*/
io_wq_cancel_pending_work(wq, &match); if (match.nr_pending && !match.cancel_all) return IO_WQ_CANCEL_OK;
io_wq_cancel_running_work(wq, &match); if (match.nr_running && !match.cancel_all) return IO_WQ_CANCEL_RUNNING;
if (match.nr_running) return IO_WQ_CANCEL_RUNNING; if (match.nr_pending) return IO_WQ_CANCEL_OK; return IO_WQ_CANCEL_NOTFOUND;
}
/*
 * Restrict the io-wq workers of @tctx to @mask, or reset them to the
 * task's cpuset-allowed CPUs when @mask is NULL.
 *
 * Returns 0 on success, -EINVAL for a missing wq or a mask that is not a
 * subset of the allowed CPUs, -ENOMEM if the scratch cpumask allocation
 * fails.
 */
int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;
	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (!mask) {
		/* no mask given: fall back to everything cpuset allows */
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	} else if (cpumask_subset(mask, allowed_mask)) {
		cpumask_copy(tctx->io_wq->cpu_mask, mask);
	} else {
		ret = -EINVAL;
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}
/*
 * NOTE(review): truncated — only the comment, signature and local
 * declarations of this function are visible; the body is cut off by
 * non-code residue at the end of the chunk. Recover it from the full file.
 */
/* * Set max number of unbounded workers, returns old value. If new_count is 0, * then just return the old value.
*/ int io_wq_max_workers(struct io_wq *wq, int *new_count)
{ struct io_wq_acct *acct; int prev[IO_WQ_ACCT_NR]; int i;
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.