diff --git a/fs/io-wq.c b/fs/io-wq.c index 9b375009a553..33b14b85752b 100644 --- a/fs/io-wq.c +++ b/fs/io-wq.c @@ -27,6 +27,7 @@ enum { IO_WORKER_F_FREE = 4, /* worker on free list */ IO_WORKER_F_EXITING = 8, /* worker exiting */ IO_WORKER_F_FIXED = 16, /* static idle worker */ + IO_WORKER_F_BOUND = 32, /* is doing bounded work */ }; enum { @@ -66,6 +67,17 @@ struct io_wq_nulls_list { #define IO_WQ_HASH_ORDER 5 #endif +struct io_wqe_acct { + unsigned nr_workers; + unsigned max_workers; + atomic_t nr_running; +}; + +enum { + IO_WQ_ACCT_BOUND, + IO_WQ_ACCT_UNBOUND, +}; + /* * Per-node worker thread pool */ @@ -78,9 +90,7 @@ struct io_wqe { } ____cacheline_aligned_in_smp; int node; - unsigned nr_workers; - unsigned max_workers; - atomic_t nr_running; + struct io_wqe_acct acct[2]; struct io_wq_nulls_list free_list; struct io_wq_nulls_list busy_list; @@ -97,6 +107,7 @@ struct io_wq { unsigned nr_wqes; struct task_struct *manager; + struct user_struct *user; struct mm_struct *mm; refcount_t refs; struct completion done; @@ -152,10 +163,29 @@ static bool __io_worker_unuse(struct io_wqe *wqe, struct io_worker *worker) return dropped_lock; } +static inline struct io_wqe_acct *io_work_get_acct(struct io_wqe *wqe, + struct io_wq_work *work) +{ + if (work->flags & IO_WQ_WORK_UNBOUND) + return &wqe->acct[IO_WQ_ACCT_UNBOUND]; + + return &wqe->acct[IO_WQ_ACCT_BOUND]; +} + +static inline struct io_wqe_acct *io_wqe_get_acct(struct io_wqe *wqe, + struct io_worker *worker) +{ + if (worker->flags & IO_WORKER_F_BOUND) + return &wqe->acct[IO_WQ_ACCT_BOUND]; + + return &wqe->acct[IO_WQ_ACCT_UNBOUND]; +} + static void io_worker_exit(struct io_worker *worker) { struct io_wqe *wqe = worker->wqe; - bool all_done = false; + struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); + unsigned nr_workers; /* * If we're not at zero, someone else is holding a brief reference @@ -169,7 +199,9 @@ static void io_worker_exit(struct io_worker *worker) preempt_disable(); current->flags &= ~PF_IO_WORKER; if (worker->flags & IO_WORKER_F_RUNNING) - atomic_dec(&wqe->nr_running); + atomic_dec(&acct->nr_running); + if (!(worker->flags & IO_WORKER_F_BOUND)) + atomic_dec(&wqe->wq->user->processes); worker->flags = 0; preempt_enable(); @@ -179,17 +211,88 @@ static void io_worker_exit(struct io_worker *worker) __release(&wqe->lock); spin_lock_irq(&wqe->lock); } - wqe->nr_workers--; - all_done = !wqe->nr_workers; + acct->nr_workers--; + nr_workers = wqe->acct[IO_WQ_ACCT_BOUND].nr_workers + + wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers; spin_unlock_irq(&wqe->lock); /* all workers gone, wq exit can proceed */ - if (all_done && refcount_dec_and_test(&wqe->wq->refs)) + if (!nr_workers && refcount_dec_and_test(&wqe->wq->refs)) complete(&wqe->wq->done); kfree_rcu(worker, rcu); } +static inline bool io_wqe_run_queue(struct io_wqe *wqe) + __must_hold(wqe->lock) +{ + if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED)) + return true; + return false; +} + +/* + * Check head of free list for an available worker. If one isn't available, + * caller must wake up the wq manager to create one. + */ +static bool io_wqe_activate_free_worker(struct io_wqe *wqe) + __must_hold(RCU) +{ + struct hlist_nulls_node *n; + struct io_worker *worker; + + n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list.head)); + if (is_a_nulls(n)) + return false; + + worker = hlist_nulls_entry(n, struct io_worker, nulls_node); + if (io_worker_get(worker)) { + wake_up(&worker->wait); + io_worker_release(worker); + return true; + } + + return false; +} + +/* + * We need a worker. If we find a free one, we're good. If not, and we're + * below the max number of workers, wake up the manager to create one. + */ +static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct) +{ + bool ret; + + /* + * Most likely an attempt to queue unbounded work on an io_wq that + * wasn't setup with any unbounded workers. + */ + WARN_ON_ONCE(!acct->max_workers); + + rcu_read_lock(); + ret = io_wqe_activate_free_worker(wqe); + rcu_read_unlock(); + + if (!ret && acct->nr_workers < acct->max_workers) + wake_up_process(wqe->wq->manager); +} + +static void io_wqe_inc_running(struct io_wqe *wqe, struct io_worker *worker) +{ + struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); + + atomic_inc(&acct->nr_running); +} + +static void io_wqe_dec_running(struct io_wqe *wqe, struct io_worker *worker) + __must_hold(wqe->lock) +{ + struct io_wqe_acct *acct = io_wqe_get_acct(wqe, worker); + + if (atomic_dec_and_test(&acct->nr_running) && io_wqe_run_queue(wqe)) + io_wqe_wake_worker(wqe, acct); +} + static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker) { allow_kernel_signal(SIGINT); @@ -198,7 +301,7 @@ static void io_worker_start(struct io_wqe *wqe, struct io_worker *worker) worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING); worker->restore_files = current->files; - atomic_inc(&wqe->nr_running); + io_wqe_inc_running(wqe, worker); } /* @@ -209,6 +312,8 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, struct io_wq_work *work) __must_hold(wqe->lock) { + bool worker_bound, work_bound; + if (worker->flags & IO_WORKER_F_FREE) { worker->flags &= ~IO_WORKER_F_FREE; hlist_nulls_del_init_rcu(&worker->nulls_node); @@ -216,6 +321,28 @@ static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker, &wqe->busy_list.head); } worker->cur_work = work; + + /* + * If worker is moving from bound to unbound (or vice versa), then + * ensure we update the running accounting. + */ + worker_bound = (worker->flags & IO_WORKER_F_BOUND) != 0; + work_bound = (work->flags & IO_WQ_WORK_UNBOUND) == 0; + if (worker_bound != work_bound) { + io_wqe_dec_running(wqe, worker); + if (work_bound) { + worker->flags |= IO_WORKER_F_BOUND; + wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers--; + wqe->acct[IO_WQ_ACCT_BOUND].nr_workers++; + atomic_dec(&wqe->wq->user->processes); + } else { + worker->flags &= ~IO_WORKER_F_BOUND; + wqe->acct[IO_WQ_ACCT_UNBOUND].nr_workers++; + wqe->acct[IO_WQ_ACCT_BOUND].nr_workers--; + atomic_inc(&wqe->wq->user->processes); + } + io_wqe_inc_running(wqe, worker); + } } /* @@ -335,14 +462,6 @@ static void io_worker_handle_work(struct io_worker *worker) } while (1); } -static inline bool io_wqe_run_queue(struct io_wqe *wqe) - __must_hold(wqe->lock) -{ - if (!list_empty(&wqe->work_list) && !(wqe->flags & IO_WQE_FLAG_STALLED)) - return true; - return false; -} - static int io_wqe_worker(void *data) { struct io_worker *worker = data; @@ -391,46 +510,6 @@ static int io_wqe_worker(void *data) return 0; } -/* - * Check head of free list for an available worker. If one isn't available, - * caller must wake up the wq manager to create one. - */ -static bool io_wqe_activate_free_worker(struct io_wqe *wqe) - __must_hold(RCU) -{ - struct hlist_nulls_node *n; - struct io_worker *worker; - - n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list.head)); - if (is_a_nulls(n)) - return false; - - worker = hlist_nulls_entry(n, struct io_worker, nulls_node); - if (io_worker_get(worker)) { - wake_up(&worker->wait); - io_worker_release(worker); - return true; - } - - return false; -} - -/* - * We need a worker. If we find a free one, we're good. If not, and we're - * below the max number of workers, wake up the manager to create one. - */ -static void io_wqe_wake_worker(struct io_wqe *wqe) -{ - bool ret; - - rcu_read_lock(); - ret = io_wqe_activate_free_worker(wqe); - rcu_read_unlock(); - - if (!ret && wqe->nr_workers < wqe->max_workers) - wake_up_process(wqe->wq->manager); -} - /* * Called when a worker is scheduled in. Mark us as currently running. */ @@ -444,7 +523,7 @@ void io_wq_worker_running(struct task_struct *tsk) if (worker->flags & IO_WORKER_F_RUNNING) return; worker->flags |= IO_WORKER_F_RUNNING; - atomic_inc(&wqe->nr_running); + io_wqe_inc_running(wqe, worker); } /* @@ -465,13 +544,13 @@ void io_wq_worker_sleeping(struct task_struct *tsk) worker->flags &= ~IO_WORKER_F_RUNNING; spin_lock_irq(&wqe->lock); - if (atomic_dec_and_test(&wqe->nr_running) && io_wqe_run_queue(wqe)) - io_wqe_wake_worker(wqe); + io_wqe_dec_running(wqe, worker); spin_unlock_irq(&wqe->lock); } -static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe) +static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index) { + struct io_wqe_acct *acct =&wqe->acct[index]; struct io_worker *worker; worker = kcalloc_node(1, sizeof(*worker), GFP_KERNEL, wqe->node); @@ -484,7 +563,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe) worker->wqe = wqe; worker->task = kthread_create_on_node(io_wqe_worker, worker, wqe->node, - "io_wqe_worker-%d", wqe->node); + "io_wqe_worker-%d/%d", index, wqe->node); if (IS_ERR(worker->task)) { kfree(worker); return; @@ -493,24 +572,31 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe) spin_lock_irq(&wqe->lock); hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list.head); worker->flags |= IO_WORKER_F_FREE; - if (!wqe->nr_workers) + if (index == IO_WQ_ACCT_BOUND) + worker->flags |= IO_WORKER_F_BOUND; + if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND)) worker->flags |= IO_WORKER_F_FIXED; - wqe->nr_workers++; + acct->nr_workers++; spin_unlock_irq(&wqe->lock); + if (index == IO_WQ_ACCT_UNBOUND) + atomic_inc(&wq->user->processes); + wake_up_process(worker->task); } -static inline bool io_wqe_need_new_worker(struct io_wqe *wqe) +static inline bool io_wqe_need_worker(struct io_wqe *wqe, int index) __must_hold(wqe->lock) { - if (!wqe->nr_workers) - return true; - if (hlist_nulls_empty(&wqe->free_list.head) && - wqe->nr_workers < wqe->max_workers && io_wqe_run_queue(wqe)) - return true; + struct io_wqe_acct *acct = &wqe->acct[index]; - return false; + /* always ensure we have one bounded worker */ + if (index == IO_WQ_ACCT_BOUND && !acct->nr_workers) + return true; + /* if we have available workers or no work, no need */ + if (!hlist_nulls_empty(&wqe->free_list.head) || !io_wqe_run_queue(wqe)) + return false; + return acct->nr_workers < acct->max_workers; } /* @@ -525,13 +611,18 @@ static int io_wq_manager(void *data) for (i = 0; i < wq->nr_wqes; i++) { struct io_wqe *wqe = wq->wqes[i]; - bool fork_worker = false; + bool fork_worker[2] = { false, false }; spin_lock_irq(&wqe->lock); - fork_worker = io_wqe_need_new_worker(wqe); + if (io_wqe_need_worker(wqe, IO_WQ_ACCT_BOUND)) + fork_worker[IO_WQ_ACCT_BOUND] = true; + if (io_wqe_need_worker(wqe, IO_WQ_ACCT_UNBOUND)) + fork_worker[IO_WQ_ACCT_UNBOUND] = true; spin_unlock_irq(&wqe->lock); - if (fork_worker) - create_io_worker(wq, wqe); + if (fork_worker[IO_WQ_ACCT_BOUND]) + create_io_worker(wq, wqe, IO_WQ_ACCT_BOUND); + if (fork_worker[IO_WQ_ACCT_UNBOUND]) + create_io_worker(wq, wqe, IO_WQ_ACCT_UNBOUND); } set_current_state(TASK_INTERRUPTIBLE); schedule_timeout(HZ); @@ -540,17 +631,53 @@ static int io_wq_manager(void *data) return 0; } +static bool io_wq_can_queue(struct io_wqe *wqe, struct io_wqe_acct *acct, + struct io_wq_work *work) +{ + bool free_worker; + + if (!(work->flags & IO_WQ_WORK_UNBOUND)) + return true; + if (atomic_read(&acct->nr_running)) + return true; + + rcu_read_lock(); + free_worker = !hlist_nulls_empty(&wqe->free_list.head); + rcu_read_unlock(); + if (free_worker) + return true; + + if (atomic_read(&wqe->wq->user->processes) >= acct->max_workers && + !(capable(CAP_SYS_RESOURCE) || capable(CAP_SYS_ADMIN))) + return false; + + return true; +} + static void io_wqe_enqueue(struct io_wqe *wqe, struct io_wq_work *work) { + struct io_wqe_acct *acct = io_work_get_acct(wqe, work); unsigned long flags; + /* + * Do early check to see if we need a new unbound worker, and if we do, + * if we're allowed to do so. This isn't 100% accurate as there's a + * gap between this check and incrementing the value, but that's OK. + * It's close enough to not be an issue, fork() has the same delay. + */ + if (unlikely(!io_wq_can_queue(wqe, acct, work))) { + work->flags |= IO_WQ_WORK_CANCEL; + work->func(&work); + return; + } + spin_lock_irqsave(&wqe->lock, flags); list_add_tail(&work->list, &wqe->work_list); wqe->flags &= ~IO_WQE_FLAG_STALLED; spin_unlock_irqrestore(&wqe->lock, flags); - if (!atomic_read(&wqe->nr_running)) - io_wqe_wake_worker(wqe); + if (!atomic_read(&acct->nr_running)) + io_wqe_wake_worker(wqe, acct); } void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work) @@ -828,7 +955,8 @@ void io_wq_flush(struct io_wq *wq) } } -struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm) +struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm, + struct user_struct *user) { int ret = -ENOMEM, i, node; struct io_wq *wq; @@ -844,6 +972,9 @@ struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm) return ERR_PTR(-ENOMEM); } + /* caller must already hold a reference to this */ + wq->user = user; + i = 0; refcount_set(&wq->refs, wq->nr_wqes); for_each_online_node(node) { @@ -854,7 +985,13 @@ struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm) break; wq->wqes[i] = wqe; wqe->node = node; - wqe->max_workers = concurrency; + wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded; + atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0); + if (user) { + wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers = + task_rlimit(current, RLIMIT_NPROC); + } + atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0); wqe->node = node; wqe->wq = wq; spin_lock_init(&wqe->lock); @@ -863,7 +1000,6 @@ struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm) wqe->free_list.nulls = 0; INIT_HLIST_NULLS_HEAD(&wqe->busy_list.head, 1); wqe->busy_list.nulls = 1; - atomic_set(&wqe->nr_running, 0); i++; } diff --git a/fs/io-wq.h b/fs/io-wq.h index 3de192dc73fc..8cb345256f35 100644 --- a/fs/io-wq.h +++ b/fs/io-wq.h @@ -9,6 +9,7 @@ enum { IO_WQ_WORK_HASHED = 4, IO_WQ_WORK_NEEDS_USER = 8, IO_WQ_WORK_NEEDS_FILES = 16, + IO_WQ_WORK_UNBOUND = 32, IO_WQ_HASH_SHIFT = 24, /* upper 8 bits are used for hash key */ }; @@ -33,7 +34,8 @@ struct io_wq_work { (work)->files = NULL; \ } while (0) \ -struct io_wq *io_wq_create(unsigned concurrency, struct mm_struct *mm); +struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm, + struct user_struct *user); void io_wq_destroy(struct io_wq *wq); void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work); diff --git a/fs/io_uring.c b/fs/io_uring.c index 4d89a2f222bf..831bea0fbc75 100644 --- a/fs/io_uring.c +++ b/fs/io_uring.c @@ -3745,7 +3745,7 @@ static int io_sq_offload_start(struct io_ring_ctx *ctx, /* Do QD, or 4 * CPUS, whatever is smallest */ concurrency = min(ctx->sq_entries, 4 * num_online_cpus()); - ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm); + ctx->io_wq = io_wq_create(concurrency, ctx->sqo_mm, NULL); if (IS_ERR(ctx->io_wq)) { ret = PTR_ERR(ctx->io_wq); ctx->io_wq = NULL;