Line data Source code
1 : // SPDX-License-Identifier: GPL-2.0
2 : /*
3 : * Basic worker thread pool for io_uring
4 : *
5 : * Copyright (C) 2019 Jens Axboe
6 : *
7 : */
8 : #include <linux/kernel.h>
9 : #include <linux/init.h>
10 : #include <linux/errno.h>
11 : #include <linux/sched/signal.h>
12 : #include <linux/percpu.h>
13 : #include <linux/slab.h>
14 : #include <linux/rculist_nulls.h>
15 : #include <linux/cpu.h>
16 : #include <linux/task_work.h>
17 : #include <linux/audit.h>
18 : #include <linux/mmu_context.h>
19 : #include <uapi/linux/io_uring.h>
20 :
21 : #include "io-wq.h"
22 : #include "slist.h"
23 : #include "io_uring.h"
24 :
25 : #define WORKER_IDLE_TIMEOUT (5 * HZ)
26 :
27 : enum {
28 : IO_WORKER_F_UP = 1, /* up and active */
29 : IO_WORKER_F_RUNNING = 2, /* account as running */
30 : IO_WORKER_F_FREE = 4, /* worker on free list */
31 : IO_WORKER_F_BOUND = 8, /* is doing bounded work */
32 : };
33 :
34 : enum {
35 : IO_WQ_BIT_EXIT = 0, /* wq exiting */
36 : };
37 :
38 : enum {
39 : IO_ACCT_STALLED_BIT = 0, /* stalled on hash */
40 : };
41 :
42 : /*
43 : * One for each thread in a wq pool
44 : */
45 : struct io_worker {
46 : refcount_t ref;
47 : unsigned flags;
48 : struct hlist_nulls_node nulls_node;
49 : struct list_head all_list;
50 : struct task_struct *task;
51 : struct io_wq *wq;
52 :
53 : struct io_wq_work *cur_work;
54 : struct io_wq_work *next_work;
55 : raw_spinlock_t lock;
56 :
57 : struct completion ref_done;
58 :
59 : unsigned long create_state;
60 : struct callback_head create_work;
61 : int create_index;
62 :
63 : union {
64 : struct rcu_head rcu;
65 : struct work_struct work;
66 : };
67 : };
68 :
69 : #if BITS_PER_LONG == 64
70 : #define IO_WQ_HASH_ORDER 6
71 : #else
72 : #define IO_WQ_HASH_ORDER 5
73 : #endif
74 :
75 : #define IO_WQ_NR_HASH_BUCKETS (1u << IO_WQ_HASH_ORDER)
76 :
77 : struct io_wq_acct {
78 : unsigned nr_workers;
79 : unsigned max_workers;
80 : int index;
81 : atomic_t nr_running;
82 : raw_spinlock_t lock;
83 : struct io_wq_work_list work_list;
84 : unsigned long flags;
85 : };
86 :
87 : enum {
88 : IO_WQ_ACCT_BOUND,
89 : IO_WQ_ACCT_UNBOUND,
90 : IO_WQ_ACCT_NR,
91 : };
92 :
93 : /*
94 : * Per io_wq state
95 : */
96 : struct io_wq {
97 : unsigned long state;
98 :
99 : free_work_fn *free_work;
100 : io_wq_work_fn *do_work;
101 :
102 : struct io_wq_hash *hash;
103 :
104 : atomic_t worker_refs;
105 : struct completion worker_done;
106 :
107 : struct hlist_node cpuhp_node;
108 :
109 : struct task_struct *task;
110 :
111 : struct io_wq_acct acct[IO_WQ_ACCT_NR];
112 :
113 : /* lock protects access to elements below */
114 : raw_spinlock_t lock;
115 :
116 : struct hlist_nulls_head free_list;
117 : struct list_head all_list;
118 :
119 : struct wait_queue_entry wait;
120 :
121 : struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
122 :
123 : cpumask_var_t cpu_mask;
124 : };
125 :
126 : static enum cpuhp_state io_wq_online;
127 :
128 : struct io_cb_cancel_data {
129 : work_cancel_fn *fn;
130 : void *data;
131 : int nr_running;
132 : int nr_pending;
133 : bool cancel_all;
134 : };
135 :
136 : static bool create_io_worker(struct io_wq *wq, int index);
137 : static void io_wq_dec_running(struct io_worker *worker);
138 : static bool io_acct_cancel_pending_work(struct io_wq *wq,
139 : struct io_wq_acct *acct,
140 : struct io_cb_cancel_data *match);
141 : static void create_worker_cb(struct callback_head *cb);
142 : static void io_wq_cancel_tw_create(struct io_wq *wq);
143 :
144 : static bool io_worker_get(struct io_worker *worker)
145 : {
146 0 : return refcount_inc_not_zero(&worker->ref);
147 : }
148 :
149 0 : static void io_worker_release(struct io_worker *worker)
150 : {
151 0 : if (refcount_dec_and_test(&worker->ref))
152 0 : complete(&worker->ref_done);
153 0 : }
154 :
155 : static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
156 : {
157 0 : return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
158 : }
159 :
160 : static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
161 : struct io_wq_work *work)
162 : {
163 0 : return io_get_acct(wq, !(work->flags & IO_WQ_WORK_UNBOUND));
164 : }
165 :
166 : static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
167 : {
168 0 : return io_get_acct(worker->wq, worker->flags & IO_WORKER_F_BOUND);
169 : }
170 :
171 : static void io_worker_ref_put(struct io_wq *wq)
172 : {
173 0 : if (atomic_dec_and_test(&wq->worker_refs))
174 0 : complete(&wq->worker_done);
175 : }
176 :
177 0 : static void io_worker_cancel_cb(struct io_worker *worker)
178 : {
179 0 : struct io_wq_acct *acct = io_wq_get_acct(worker);
180 0 : struct io_wq *wq = worker->wq;
181 :
182 0 : atomic_dec(&acct->nr_running);
183 0 : raw_spin_lock(&wq->lock);
184 0 : acct->nr_workers--;
185 0 : raw_spin_unlock(&wq->lock);
186 0 : io_worker_ref_put(wq);
187 0 : clear_bit_unlock(0, &worker->create_state);
188 0 : io_worker_release(worker);
189 0 : }
190 :
191 0 : static bool io_task_worker_match(struct callback_head *cb, void *data)
192 : {
193 : struct io_worker *worker;
194 :
195 0 : if (cb->func != create_worker_cb)
196 : return false;
197 0 : worker = container_of(cb, struct io_worker, create_work);
198 0 : return worker == data;
199 : }
200 :
201 0 : static void io_worker_exit(struct io_worker *worker)
202 : {
203 0 : struct io_wq *wq = worker->wq;
204 :
205 0 : while (1) {
206 0 : struct callback_head *cb = task_work_cancel_match(wq->task,
207 : io_task_worker_match, worker);
208 :
209 0 : if (!cb)
210 : break;
211 0 : io_worker_cancel_cb(worker);
212 : }
213 :
214 0 : io_worker_release(worker);
215 0 : wait_for_completion(&worker->ref_done);
216 :
217 0 : raw_spin_lock(&wq->lock);
218 0 : if (worker->flags & IO_WORKER_F_FREE)
219 0 : hlist_nulls_del_rcu(&worker->nulls_node);
220 0 : list_del_rcu(&worker->all_list);
221 0 : raw_spin_unlock(&wq->lock);
222 0 : io_wq_dec_running(worker);
223 : /*
224 : * this worker is a goner, clear ->worker_private to avoid any
225 : * inc/dec running calls that could happen as part of exit from
226 : * touching 'worker'.
227 : */
228 0 : current->worker_private = NULL;
229 :
230 0 : kfree_rcu(worker, rcu);
231 0 : io_worker_ref_put(wq);
232 0 : do_exit(0);
233 : }
234 :
235 : static inline bool io_acct_run_queue(struct io_wq_acct *acct)
236 : {
237 0 : bool ret = false;
238 :
239 0 : raw_spin_lock(&acct->lock);
240 0 : if (!wq_list_empty(&acct->work_list) &&
241 0 : !test_bit(IO_ACCT_STALLED_BIT, &acct->flags))
242 0 : ret = true;
243 0 : raw_spin_unlock(&acct->lock);
244 :
245 : return ret;
246 : }
247 :
248 : /*
249 : * Check head of free list for an available worker. If one isn't available,
250 : * caller must create one.
251 : */
252 0 : static bool io_wq_activate_free_worker(struct io_wq *wq,
253 : struct io_wq_acct *acct)
254 : __must_hold(RCU)
255 : {
256 : struct hlist_nulls_node *n;
257 : struct io_worker *worker;
258 :
259 : /*
260 : * Iterate free_list and see if we can find an idle worker to
261 : * activate. If a given worker is on the free_list but in the process
262 : * of exiting, keep trying.
263 : */
264 0 : hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
265 0 : if (!io_worker_get(worker))
266 0 : continue;
267 0 : if (io_wq_get_acct(worker) != acct) {
268 0 : io_worker_release(worker);
269 0 : continue;
270 : }
271 0 : if (wake_up_process(worker->task)) {
272 0 : io_worker_release(worker);
273 0 : return true;
274 : }
275 0 : io_worker_release(worker);
276 : }
277 :
278 : return false;
279 : }
280 :
281 : /*
282 : * We need a worker. If we find a free one, we're good. If not, and we're
283 : * below the max number of workers, create one.
284 : */
285 0 : static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
286 : {
287 : /*
288 : * Most likely an attempt to queue unbounded work on an io_wq that
289 : * wasn't setup with any unbounded workers.
290 : */
291 0 : if (unlikely(!acct->max_workers))
292 0 : pr_warn_once("io-wq is not configured for unbound workers");
293 :
294 0 : raw_spin_lock(&wq->lock);
295 0 : if (acct->nr_workers >= acct->max_workers) {
296 0 : raw_spin_unlock(&wq->lock);
297 0 : return true;
298 : }
299 0 : acct->nr_workers++;
300 0 : raw_spin_unlock(&wq->lock);
301 0 : atomic_inc(&acct->nr_running);
302 0 : atomic_inc(&wq->worker_refs);
303 0 : return create_io_worker(wq, acct->index);
304 : }
305 :
306 : static void io_wq_inc_running(struct io_worker *worker)
307 : {
308 0 : struct io_wq_acct *acct = io_wq_get_acct(worker);
309 :
310 0 : atomic_inc(&acct->nr_running);
311 : }
312 :
313 0 : static void create_worker_cb(struct callback_head *cb)
314 : {
315 : struct io_worker *worker;
316 : struct io_wq *wq;
317 :
318 : struct io_wq_acct *acct;
319 0 : bool do_create = false;
320 :
321 0 : worker = container_of(cb, struct io_worker, create_work);
322 0 : wq = worker->wq;
323 0 : acct = &wq->acct[worker->create_index];
324 0 : raw_spin_lock(&wq->lock);
325 :
326 0 : if (acct->nr_workers < acct->max_workers) {
327 0 : acct->nr_workers++;
328 0 : do_create = true;
329 : }
330 0 : raw_spin_unlock(&wq->lock);
331 0 : if (do_create) {
332 0 : create_io_worker(wq, worker->create_index);
333 : } else {
334 0 : atomic_dec(&acct->nr_running);
335 : io_worker_ref_put(wq);
336 : }
337 0 : clear_bit_unlock(0, &worker->create_state);
338 0 : io_worker_release(worker);
339 0 : }
340 :
341 0 : static bool io_queue_worker_create(struct io_worker *worker,
342 : struct io_wq_acct *acct,
343 : task_work_func_t func)
344 : {
345 0 : struct io_wq *wq = worker->wq;
346 :
347 : /* raced with exit, just ignore create call */
348 0 : if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
349 : goto fail;
350 0 : if (!io_worker_get(worker))
351 : goto fail;
352 : /*
353 : * create_state manages ownership of create_work/index. We should
354 : * only need one entry per worker, as the worker going to sleep
355 : * will trigger the condition, and waking will clear it once it
356 : * runs the task_work.
357 : */
358 0 : if (test_bit(0, &worker->create_state) ||
359 0 : test_and_set_bit_lock(0, &worker->create_state))
360 : goto fail_release;
361 :
362 0 : atomic_inc(&wq->worker_refs);
363 0 : init_task_work(&worker->create_work, func);
364 0 : worker->create_index = acct->index;
365 0 : if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
366 : /*
367 : * EXIT may have been set after checking it above, check after
368 : * adding the task_work and remove any creation item if it is
369 : * now set. wq exit does that too, but we can have added this
370 : * work item after we canceled in io_wq_exit_workers().
371 : */
372 0 : if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
373 0 : io_wq_cancel_tw_create(wq);
374 : io_worker_ref_put(wq);
375 : return true;
376 : }
377 0 : io_worker_ref_put(wq);
378 0 : clear_bit_unlock(0, &worker->create_state);
379 : fail_release:
380 0 : io_worker_release(worker);
381 : fail:
382 0 : atomic_dec(&acct->nr_running);
383 : io_worker_ref_put(wq);
384 : return false;
385 : }
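
The create_state word above is used as a one-shot bit lock around create_work/create_index: bit 0 is taken with test_and_set_bit_lock() and only released once the queued task_work has run (create_worker_cb()/create_worker_cont()) or the setup fails. A minimal sketch of the same acquire/release pattern; the guard variable and try_submit_once() are purely illustrative:

static unsigned long guard;	/* bit 0 protects a single-slot resource */

static void try_submit_once(void)
{
	/* cheap non-atomic test first, then the atomic acquire, as above */
	if (test_bit(0, &guard) || test_and_set_bit_lock(0, &guard))
		return;			/* another path already owns the slot */

	/* ... fill in the slot (create_work/create_index in io-wq) ... */

	clear_bit_unlock(0, &guard);	/* release; in io-wq this happens in the callback */
}
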
386 :
387 0 : static void io_wq_dec_running(struct io_worker *worker)
388 : {
389 0 : struct io_wq_acct *acct = io_wq_get_acct(worker);
390 0 : struct io_wq *wq = worker->wq;
391 :
392 0 : if (!(worker->flags & IO_WORKER_F_UP))
393 : return;
394 :
395 0 : if (!atomic_dec_and_test(&acct->nr_running))
396 : return;
397 0 : if (!io_acct_run_queue(acct))
398 : return;
399 :
400 0 : atomic_inc(&acct->nr_running);
401 0 : atomic_inc(&wq->worker_refs);
402 0 : io_queue_worker_create(worker, acct, create_worker_cb);
403 : }
404 :
405 : /*
406 : * Worker will start processing some work. Move it to the busy list, if
407 : * it's currently on the freelist
408 : */
409 : static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
410 : {
411 0 : if (worker->flags & IO_WORKER_F_FREE) {
412 0 : worker->flags &= ~IO_WORKER_F_FREE;
413 0 : raw_spin_lock(&wq->lock);
414 0 : hlist_nulls_del_init_rcu(&worker->nulls_node);
415 0 : raw_spin_unlock(&wq->lock);
416 : }
417 : }
418 :
419 : /*
420 : * No work, worker going to sleep. Move to freelist.
421 : */
422 : static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
423 : __must_hold(wq->lock)
424 : {
425 0 : if (!(worker->flags & IO_WORKER_F_FREE)) {
426 0 : worker->flags |= IO_WORKER_F_FREE;
427 0 : hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
428 : }
429 : }
430 :
431 : static inline unsigned int io_get_work_hash(struct io_wq_work *work)
432 : {
433 0 : return work->flags >> IO_WQ_HASH_SHIFT;
434 : }
435 :
436 0 : static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
437 : {
438 0 : bool ret = false;
439 :
440 0 : spin_lock_irq(&wq->hash->wait.lock);
441 0 : if (list_empty(&wq->wait.entry)) {
442 0 : __add_wait_queue(&wq->hash->wait, &wq->wait);
443 0 : if (!test_bit(hash, &wq->hash->map)) {
444 0 : __set_current_state(TASK_RUNNING);
445 0 : list_del_init(&wq->wait.entry);
446 0 : ret = true;
447 : }
448 : }
449 0 : spin_unlock_irq(&wq->hash->wait.lock);
450 0 : return ret;
451 : }
452 :
453 0 : static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
454 : struct io_worker *worker)
455 : __must_hold(acct->lock)
456 : {
457 : struct io_wq_work_node *node, *prev;
458 : struct io_wq_work *work, *tail;
459 0 : unsigned int stall_hash = -1U;
460 0 : struct io_wq *wq = worker->wq;
461 :
462 0 : wq_list_for_each(node, prev, &acct->work_list) {
463 : unsigned int hash;
464 :
465 0 : work = container_of(node, struct io_wq_work, list);
466 :
467 : /* not hashed, can run anytime */
468 0 : if (!io_wq_is_hashed(work)) {
469 0 : wq_list_del(&acct->work_list, node, prev);
470 : return work;
471 : }
472 :
473 0 : hash = io_get_work_hash(work);
474 : /* all items with this hash lie in [work, tail] */
475 0 : tail = wq->hash_tail[hash];
476 :
477 : /* hashed, can run if not already running */
478 0 : if (!test_and_set_bit(hash, &wq->hash->map)) {
479 0 : wq->hash_tail[hash] = NULL;
480 0 : wq_list_cut(&acct->work_list, &tail->list, prev);
481 : return work;
482 : }
483 0 : if (stall_hash == -1U)
484 0 : stall_hash = hash;
485 : /* fast forward to a next hash, for-each will fix up @prev */
486 0 : node = &tail->list;
487 : }
488 :
489 0 : if (stall_hash != -1U) {
490 : bool unstalled;
491 :
492 : /*
493 : * Set this before dropping the lock to avoid racing with new
494 : * work being added and clearing the stalled bit.
495 : */
496 0 : set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
497 0 : raw_spin_unlock(&acct->lock);
498 0 : unstalled = io_wait_on_hash(wq, stall_hash);
499 0 : raw_spin_lock(&acct->lock);
500 0 : if (unstalled) {
501 0 : clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
502 0 : if (wq_has_sleeper(&wq->hash->wait))
503 0 : wake_up(&wq->hash->wait);
504 : }
505 : }
506 :
507 : return NULL;
508 : }
509 :
510 : static void io_assign_current_work(struct io_worker *worker,
511 : struct io_wq_work *work)
512 : {
513 0 : if (work) {
514 0 : io_run_task_work();
515 0 : cond_resched();
516 : }
517 :
518 0 : raw_spin_lock(&worker->lock);
519 0 : worker->cur_work = work;
520 0 : worker->next_work = NULL;
521 0 : raw_spin_unlock(&worker->lock);
522 : }
523 :
524 0 : static void io_worker_handle_work(struct io_worker *worker)
525 : {
526 0 : struct io_wq_acct *acct = io_wq_get_acct(worker);
527 0 : struct io_wq *wq = worker->wq;
528 0 : bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);
529 :
530 : do {
531 : struct io_wq_work *work;
532 :
533 : /*
534 : * If we got some work, mark us as busy. If we didn't, but
535 : * the list isn't empty, it means we stalled on hashed work.
536 : * Mark us stalled so we don't keep looking for work when we
537 : * can't make progress, any work completion or insertion will
538 : * clear the stalled flag.
539 : */
540 0 : raw_spin_lock(&acct->lock);
541 0 : work = io_get_next_work(acct, worker);
542 0 : raw_spin_unlock(&acct->lock);
543 0 : if (work) {
544 0 : __io_worker_busy(wq, worker);
545 :
546 : /*
547 : * Make sure cancelation can find this, even before
548 : * it becomes the active work. That avoids a window
549 : * where the work has been removed from our general
550 : * work list, but isn't yet discoverable as the
551 : * current work item for this worker.
552 : */
553 0 : raw_spin_lock(&worker->lock);
554 0 : worker->next_work = work;
555 0 : raw_spin_unlock(&worker->lock);
556 : } else {
557 : break;
558 : }
559 0 : io_assign_current_work(worker, work);
560 0 : __set_current_state(TASK_RUNNING);
561 :
562 : /* handle a whole dependent link */
563 : do {
564 : struct io_wq_work *next_hashed, *linked;
565 0 : unsigned int hash = io_get_work_hash(work);
566 :
567 0 : next_hashed = wq_next_work(work);
568 :
569 0 : if (unlikely(do_kill) && (work->flags & IO_WQ_WORK_UNBOUND))
570 0 : work->flags |= IO_WQ_WORK_CANCEL;
571 0 : wq->do_work(work);
572 0 : io_assign_current_work(worker, NULL);
573 :
574 0 : linked = wq->free_work(work);
575 0 : work = next_hashed;
576 0 : if (!work && linked && !io_wq_is_hashed(linked)) {
577 0 : work = linked;
578 0 : linked = NULL;
579 : }
580 0 : io_assign_current_work(worker, work);
581 0 : if (linked)
582 0 : io_wq_enqueue(wq, linked);
583 :
584 0 : if (hash != -1U && !next_hashed) {
585 : /* serialize hash clear with wake_up() */
586 0 : spin_lock_irq(&wq->hash->wait.lock);
587 0 : clear_bit(hash, &wq->hash->map);
588 0 : clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
589 0 : spin_unlock_irq(&wq->hash->wait.lock);
590 0 : if (wq_has_sleeper(&wq->hash->wait))
591 0 : wake_up(&wq->hash->wait);
592 : }
593 0 : } while (work);
594 : } while (1);
595 0 : }
596 :
597 0 : static int io_wq_worker(void *data)
598 : {
599 0 : struct io_worker *worker = data;
600 0 : struct io_wq_acct *acct = io_wq_get_acct(worker);
601 0 : struct io_wq *wq = worker->wq;
602 0 : bool exit_mask = false, last_timeout = false;
603 : char buf[TASK_COMM_LEN];
604 :
605 0 : worker->flags |= (IO_WORKER_F_UP | IO_WORKER_F_RUNNING);
606 :
607 0 : snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
608 0 : set_task_comm(current, buf);
609 :
610 0 : while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
611 : long ret;
612 :
613 0 : set_current_state(TASK_INTERRUPTIBLE);
614 0 : while (io_acct_run_queue(acct))
615 0 : io_worker_handle_work(worker);
616 :
617 0 : raw_spin_lock(&wq->lock);
618 : /*
619 : * Last sleep timed out. Exit if we're not the last worker,
620 : * or if someone modified our affinity.
621 : */
622 0 : if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
623 0 : acct->nr_workers--;
624 0 : raw_spin_unlock(&wq->lock);
625 0 : __set_current_state(TASK_RUNNING);
626 0 : break;
627 : }
628 0 : last_timeout = false;
629 0 : __io_worker_idle(wq, worker);
630 0 : raw_spin_unlock(&wq->lock);
631 0 : if (io_run_task_work())
632 0 : continue;
633 0 : ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
634 0 : if (signal_pending(current)) {
635 : struct ksignal ksig;
636 :
637 0 : if (!get_signal(&ksig))
638 0 : continue;
639 0 : break;
640 : }
641 0 : if (!ret) {
642 0 : last_timeout = true;
643 0 : exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
644 0 : wq->cpu_mask);
645 : }
646 : }
647 :
648 0 : if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
649 0 : io_worker_handle_work(worker);
650 :
651 0 : io_worker_exit(worker);
652 : return 0;
653 : }
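
For reference, schedule_timeout() returns the jiffies remaining, so the !ret test above means the worker slept the full WORKER_IDLE_TIMEOUT (5 * HZ) without being handed work and becomes an exit candidate on the next loop. A minimal sketch of that idle pattern; idle_wait_example() is a hypothetical name:

static void idle_wait_example(void)
{
	long left;

	set_current_state(TASK_INTERRUPTIBLE);
	left = schedule_timeout(WORKER_IDLE_TIMEOUT);
	if (!left) {
		/* slept the whole five seconds: candidate for exiting */
	} else {
		/* woken early, e.g. by wake_up_process() from the free-list scan */
	}
}
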
654 :
655 : /*
656 : * Called when a worker is scheduled in. Mark us as currently running.
657 : */
658 0 : void io_wq_worker_running(struct task_struct *tsk)
659 : {
660 0 : struct io_worker *worker = tsk->worker_private;
661 :
662 0 : if (!worker)
663 : return;
664 0 : if (!(worker->flags & IO_WORKER_F_UP))
665 : return;
666 0 : if (worker->flags & IO_WORKER_F_RUNNING)
667 : return;
668 0 : worker->flags |= IO_WORKER_F_RUNNING;
669 : io_wq_inc_running(worker);
670 : }
671 :
672 : /*
673 : * Called when worker is going to sleep. If there are no workers currently
674 : * running and we have work pending, wake up a free one or create a new one.
675 : */
676 0 : void io_wq_worker_sleeping(struct task_struct *tsk)
677 : {
678 0 : struct io_worker *worker = tsk->worker_private;
679 :
680 0 : if (!worker)
681 : return;
682 0 : if (!(worker->flags & IO_WORKER_F_UP))
683 : return;
684 0 : if (!(worker->flags & IO_WORKER_F_RUNNING))
685 : return;
686 :
687 0 : worker->flags &= ~IO_WORKER_F_RUNNING;
688 0 : io_wq_dec_running(worker);
689 : }
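
These two hooks are not called from io-wq itself: io_init_new_worker() points tsk->worker_private at the io_worker, and the scheduler core invokes them when a PF_IO_WORKER task blocks or resumes. A simplified sketch of the call sites in kernel/sched/core.c (not the exact code):

static void sched_submit_work(struct task_struct *tsk)
{
	if (tsk->flags & PF_IO_WORKER)
		io_wq_worker_sleeping(tsk);	/* task is about to block */
	/* ... */
}

static void sched_update_worker(struct task_struct *tsk)
{
	if (tsk->flags & PF_IO_WORKER)
		io_wq_worker_running(tsk);	/* task is back on a CPU */
	/* ... */
}
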
690 :
691 0 : static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
692 : struct task_struct *tsk)
693 : {
694 0 : tsk->worker_private = worker;
695 0 : worker->task = tsk;
696 0 : set_cpus_allowed_ptr(tsk, wq->cpu_mask);
697 :
698 0 : raw_spin_lock(&wq->lock);
699 0 : hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
700 0 : list_add_tail_rcu(&worker->all_list, &wq->all_list);
701 0 : worker->flags |= IO_WORKER_F_FREE;
702 0 : raw_spin_unlock(&wq->lock);
703 0 : wake_up_new_task(tsk);
704 0 : }
705 :
706 0 : static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
707 : {
708 0 : return true;
709 : }
710 :
711 0 : static inline bool io_should_retry_thread(long err)
712 : {
713 : /*
714 : * Prevent perpetual task_work retry, if the task (or its group) is
715 : * exiting.
716 : */
717 0 : if (fatal_signal_pending(current))
718 : return false;
719 :
720 0 : switch (err) {
721 : case -EAGAIN:
722 : case -ERESTARTSYS:
723 : case -ERESTARTNOINTR:
724 : case -ERESTARTNOHAND:
725 : return true;
726 : default:
727 0 : return false;
728 : }
729 : }
730 :
731 0 : static void create_worker_cont(struct callback_head *cb)
732 : {
733 : struct io_worker *worker;
734 : struct task_struct *tsk;
735 : struct io_wq *wq;
736 :
737 0 : worker = container_of(cb, struct io_worker, create_work);
738 0 : clear_bit_unlock(0, &worker->create_state);
739 0 : wq = worker->wq;
740 0 : tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
741 0 : if (!IS_ERR(tsk)) {
742 0 : io_init_new_worker(wq, worker, tsk);
743 0 : io_worker_release(worker);
744 0 : return;
745 0 : } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
746 0 : struct io_wq_acct *acct = io_wq_get_acct(worker);
747 :
748 0 : atomic_dec(&acct->nr_running);
749 0 : raw_spin_lock(&wq->lock);
750 0 : acct->nr_workers--;
751 0 : if (!acct->nr_workers) {
752 0 : struct io_cb_cancel_data match = {
753 : .fn = io_wq_work_match_all,
754 : .cancel_all = true,
755 : };
756 :
757 0 : raw_spin_unlock(&wq->lock);
758 0 : while (io_acct_cancel_pending_work(wq, acct, &match))
759 : ;
760 : } else {
761 0 : raw_spin_unlock(&wq->lock);
762 : }
763 0 : io_worker_ref_put(wq);
764 0 : kfree(worker);
765 0 : return;
766 : }
767 :
768 : /* re-create attempts grab a new worker ref, drop the existing one */
769 0 : io_worker_release(worker);
770 0 : schedule_work(&worker->work);
771 : }
772 :
773 0 : static void io_workqueue_create(struct work_struct *work)
774 : {
775 0 : struct io_worker *worker = container_of(work, struct io_worker, work);
776 0 : struct io_wq_acct *acct = io_wq_get_acct(worker);
777 :
778 0 : if (!io_queue_worker_create(worker, acct, create_worker_cont))
779 0 : kfree(worker);
780 0 : }
781 :
782 0 : static bool create_io_worker(struct io_wq *wq, int index)
783 : {
784 0 : struct io_wq_acct *acct = &wq->acct[index];
785 : struct io_worker *worker;
786 : struct task_struct *tsk;
787 :
788 0 : __set_current_state(TASK_RUNNING);
789 :
790 0 : worker = kzalloc(sizeof(*worker), GFP_KERNEL);
791 0 : if (!worker) {
792 : fail:
793 0 : atomic_dec(&acct->nr_running);
794 0 : raw_spin_lock(&wq->lock);
795 0 : acct->nr_workers--;
796 0 : raw_spin_unlock(&wq->lock);
797 : io_worker_ref_put(wq);
798 : return false;
799 : }
800 :
801 0 : refcount_set(&worker->ref, 1);
802 0 : worker->wq = wq;
803 : raw_spin_lock_init(&worker->lock);
804 0 : init_completion(&worker->ref_done);
805 :
806 0 : if (index == IO_WQ_ACCT_BOUND)
807 0 : worker->flags |= IO_WORKER_F_BOUND;
808 :
809 0 : tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
810 0 : if (!IS_ERR(tsk)) {
811 0 : io_init_new_worker(wq, worker, tsk);
812 0 : } else if (!io_should_retry_thread(PTR_ERR(tsk))) {
813 0 : kfree(worker);
814 0 : goto fail;
815 : } else {
816 0 : INIT_WORK(&worker->work, io_workqueue_create);
817 0 : schedule_work(&worker->work);
818 : }
819 :
820 : return true;
821 : }
822 :
823 : /*
824 : * Iterate the passed in list and call the specific function for each
825 : * worker that isn't exiting
826 : */
827 0 : static bool io_wq_for_each_worker(struct io_wq *wq,
828 : bool (*func)(struct io_worker *, void *),
829 : void *data)
830 : {
831 : struct io_worker *worker;
832 0 : bool ret = false;
833 :
834 0 : list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
835 0 : if (io_worker_get(worker)) {
836 : /* no task if node is/was offline */
837 0 : if (worker->task)
838 0 : ret = func(worker, data);
839 0 : io_worker_release(worker);
840 0 : if (ret)
841 : break;
842 : }
843 : }
844 :
845 0 : return ret;
846 : }
847 :
848 0 : static bool io_wq_worker_wake(struct io_worker *worker, void *data)
849 : {
850 0 : __set_notify_signal(worker->task);
851 0 : wake_up_process(worker->task);
852 0 : return false;
853 : }
854 :
855 : static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
856 : {
857 : do {
858 0 : work->flags |= IO_WQ_WORK_CANCEL;
859 0 : wq->do_work(work);
860 0 : work = wq->free_work(work);
861 0 : } while (work);
862 : }
863 :
864 0 : static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
865 : {
866 0 : struct io_wq_acct *acct = io_work_get_acct(wq, work);
867 : unsigned int hash;
868 : struct io_wq_work *tail;
869 :
870 0 : if (!io_wq_is_hashed(work)) {
871 : append:
872 0 : wq_list_add_tail(&work->list, &acct->work_list);
873 : return;
874 : }
875 :
876 0 : hash = io_get_work_hash(work);
877 0 : tail = wq->hash_tail[hash];
878 0 : wq->hash_tail[hash] = work;
879 0 : if (!tail)
880 : goto append;
881 :
882 0 : wq_list_add_after(&work->list, &tail->list, &acct->work_list);
883 : }
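
Because a new hashed item is spliced in directly after hash_tail[hash], items with the same hash stay contiguous on the pending list. That is what lets io_get_next_work() treat [work, tail] as one run: it either dequeues the whole run with wq_list_cut() or skips past tail when that hash is already executing. A worked example (A-E are arbitrary work items):

	pending:            A(hash 3) -> B(hash 3) -> C(unhashed)
	enqueue D(hash 3):  A -> B -> D -> C        (added after hash_tail[3] == B)
	enqueue E(hash 5):  A -> B -> D -> C -> E   (no tail for hash 5 yet, appended)
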
884 :
885 0 : static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
886 : {
887 0 : return work == data;
888 : }
889 :
890 0 : void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
891 : {
892 0 : struct io_wq_acct *acct = io_work_get_acct(wq, work);
893 : struct io_cb_cancel_data match;
894 0 : unsigned work_flags = work->flags;
895 : bool do_create;
896 :
897 : /*
898 : * If io-wq is exiting for this task, or if the request has explicitly
899 : * been marked as one that should not get executed, cancel it here.
900 : */
901 0 : if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
902 0 : (work->flags & IO_WQ_WORK_CANCEL)) {
903 : io_run_cancel(work, wq);
904 0 : return;
905 : }
906 :
907 0 : raw_spin_lock(&acct->lock);
908 0 : io_wq_insert_work(wq, work);
909 0 : clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
910 0 : raw_spin_unlock(&acct->lock);
911 :
912 0 : raw_spin_lock(&wq->lock);
913 : rcu_read_lock();
914 0 : do_create = !io_wq_activate_free_worker(wq, acct);
915 : rcu_read_unlock();
916 :
917 0 : raw_spin_unlock(&wq->lock);
918 :
919 0 : if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
920 0 : !atomic_read(&acct->nr_running))) {
921 : bool did_create;
922 :
923 0 : did_create = io_wq_create_worker(wq, acct);
924 0 : if (likely(did_create))
925 : return;
926 :
927 0 : raw_spin_lock(&wq->lock);
928 0 : if (acct->nr_workers) {
929 0 : raw_spin_unlock(&wq->lock);
930 0 : return;
931 : }
932 0 : raw_spin_unlock(&wq->lock);
933 :
934 : /* fatal condition, failed to create the first worker */
935 0 : match.fn = io_wq_work_match_item,
936 0 : match.data = work,
937 0 : match.cancel_all = false,
938 :
939 0 : io_acct_cancel_pending_work(wq, acct, &match);
940 : }
941 : }
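
For context, callers (in-tree this happens in io_uring.c, e.g. io_queue_iowq()) pick the accounting class and optional hashing before handing the item over. A hedged sketch, with example_queue() and the hash_key choice purely illustrative:

static void example_queue(struct io_wq *wq, struct io_wq_work *work,
			  void *hash_key, bool unbound)
{
	if (unbound)
		work->flags |= IO_WQ_WORK_UNBOUND;	/* use the RLIMIT_NPROC-sized pool */
	if (hash_key)
		io_wq_hash_work(work, hash_key);	/* serialize with same-key work */
	io_wq_enqueue(wq, work);
}
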
942 :
943 : /*
944 : * Work items that hash to the same value will not be done in parallel.
945 : * Used to limit concurrent writes, generally hashed by inode.
946 : */
947 0 : void io_wq_hash_work(struct io_wq_work *work, void *val)
948 : {
949 : unsigned int bit;
950 :
951 0 : bit = hash_ptr(val, IO_WQ_HASH_ORDER);
952 0 : work->flags |= (IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT));
953 0 : }
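
The bucket index ends up in the upper bits of work->flags and is read back by io_get_work_hash(); IO_WQ_HASH_SHIFT and io_wq_is_hashed() come from io-wq.h, and IO_WQ_HASH_ORDER appears to be chosen so the bucket count matches BITS_PER_LONG, letting wq->hash->map be a single-word bitmap. A small sketch of the round trip; hash_pack_example() is hypothetical:

static void hash_pack_example(struct io_wq_work *work, void *key)
{
	unsigned int bit = hash_ptr(key, IO_WQ_HASH_ORDER);	/* 0..63 on 64-bit */

	work->flags |= IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT);

	WARN_ON(!io_wq_is_hashed(work));
	WARN_ON(io_get_work_hash(work) != bit);	/* flags >> IO_WQ_HASH_SHIFT */
}
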
954 :
955 0 : static bool __io_wq_worker_cancel(struct io_worker *worker,
956 : struct io_cb_cancel_data *match,
957 : struct io_wq_work *work)
958 : {
959 0 : if (work && match->fn(work, match->data)) {
960 0 : work->flags |= IO_WQ_WORK_CANCEL;
961 0 : __set_notify_signal(worker->task);
962 : return true;
963 : }
964 :
965 : return false;
966 : }
967 :
968 0 : static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
969 : {
970 0 : struct io_cb_cancel_data *match = data;
971 :
972 : /*
973 : * Hold the lock to avoid ->cur_work going out of scope, caller
974 : * may dereference the passed in work.
975 : */
976 0 : raw_spin_lock(&worker->lock);
977 0 : if (__io_wq_worker_cancel(worker, match, worker->cur_work) ||
978 0 : __io_wq_worker_cancel(worker, match, worker->next_work))
979 0 : match->nr_running++;
980 0 : raw_spin_unlock(&worker->lock);
981 :
982 0 : return match->nr_running && !match->cancel_all;
983 : }
984 :
985 0 : static inline void io_wq_remove_pending(struct io_wq *wq,
986 : struct io_wq_work *work,
987 : struct io_wq_work_node *prev)
988 : {
989 0 : struct io_wq_acct *acct = io_work_get_acct(wq, work);
990 0 : unsigned int hash = io_get_work_hash(work);
991 0 : struct io_wq_work *prev_work = NULL;
992 :
993 0 : if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
994 0 : if (prev)
995 0 : prev_work = container_of(prev, struct io_wq_work, list);
996 0 : if (prev_work && io_get_work_hash(prev_work) == hash)
997 0 : wq->hash_tail[hash] = prev_work;
998 : else
999 0 : wq->hash_tail[hash] = NULL;
1000 : }
1001 0 : wq_list_del(&acct->work_list, &work->list, prev);
1002 0 : }
1003 :
1004 0 : static bool io_acct_cancel_pending_work(struct io_wq *wq,
1005 : struct io_wq_acct *acct,
1006 : struct io_cb_cancel_data *match)
1007 : {
1008 : struct io_wq_work_node *node, *prev;
1009 : struct io_wq_work *work;
1010 :
1011 0 : raw_spin_lock(&acct->lock);
1012 0 : wq_list_for_each(node, prev, &acct->work_list) {
1013 0 : work = container_of(node, struct io_wq_work, list);
1014 0 : if (!match->fn(work, match->data))
1015 0 : continue;
1016 0 : io_wq_remove_pending(wq, work, prev);
1017 0 : raw_spin_unlock(&acct->lock);
1018 0 : io_run_cancel(work, wq);
1019 0 : match->nr_pending++;
1020 : /* not safe to continue after unlock */
1021 : return true;
1022 : }
1023 0 : raw_spin_unlock(&acct->lock);
1024 :
1025 : return false;
1026 : }
1027 :
1028 0 : static void io_wq_cancel_pending_work(struct io_wq *wq,
1029 : struct io_cb_cancel_data *match)
1030 : {
1031 : int i;
1032 : retry:
1033 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1034 0 : struct io_wq_acct *acct = io_get_acct(wq, i == 0);
1035 :
1036 0 : if (io_acct_cancel_pending_work(wq, acct, match)) {
1037 0 : if (match->cancel_all)
1038 : goto retry;
1039 : break;
1040 : }
1041 : }
1042 0 : }
1043 :
1044 : static void io_wq_cancel_running_work(struct io_wq *wq,
1045 : struct io_cb_cancel_data *match)
1046 : {
1047 : rcu_read_lock();
1048 0 : io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
1049 : rcu_read_unlock();
1050 : }
1051 :
1052 0 : enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
1053 : void *data, bool cancel_all)
1054 : {
1055 0 : struct io_cb_cancel_data match = {
1056 : .fn = cancel,
1057 : .data = data,
1058 : .cancel_all = cancel_all,
1059 : };
1060 :
1061 : /*
1062 : * First check pending list, if we're lucky we can just remove it
1063 : * from there. CANCEL_OK means that the work is returned as-new,
1064 : * no completion will be posted for it.
1065 : *
1066 : * Then check if a free (going busy) or busy worker has the work
1067 : * currently running. If we find it there, we'll return CANCEL_RUNNING
1068 : * as an indication that we attempt to signal cancellation. The
1069 : * completion will run normally in this case.
1070 : *
1071 : * Do both of these under the proper locks (acct->lock for the pending
1072 : * list, wq->lock for the worker scan), so we'll find it regardless of state.
1073 : */
1074 0 : io_wq_cancel_pending_work(wq, &match);
1075 0 : if (match.nr_pending && !match.cancel_all)
1076 : return IO_WQ_CANCEL_OK;
1077 :
1078 0 : raw_spin_lock(&wq->lock);
1079 0 : io_wq_cancel_running_work(wq, &match);
1080 0 : raw_spin_unlock(&wq->lock);
1081 0 : if (match.nr_running && !match.cancel_all)
1082 : return IO_WQ_CANCEL_RUNNING;
1083 :
1084 0 : if (match.nr_running)
1085 : return IO_WQ_CANCEL_RUNNING;
1086 0 : if (match.nr_pending)
1087 : return IO_WQ_CANCEL_OK;
1088 0 : return IO_WQ_CANCEL_NOTFOUND;
1089 : }
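
A hedged usage sketch: the matcher has the work_cancel_fn signature from io-wq.h, while match_one_work() and example_cancel() are hypothetical names. The return value tells the caller whether a completion is still coming:

static bool match_one_work(struct io_wq_work *work, void *data)
{
	return work == data;		/* cancel exactly this item */
}

static void example_cancel(struct io_wq *wq, struct io_wq_work *work)
{
	switch (io_wq_cancel_cb(wq, match_one_work, work, false)) {
	case IO_WQ_CANCEL_OK:		/* pulled off the pending list, never ran */
		break;
	case IO_WQ_CANCEL_RUNNING:	/* already executing, cancellation was signalled */
		break;
	case IO_WQ_CANCEL_NOTFOUND:	/* nothing matched in this io_wq */
		break;
	}
}
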
1090 :
1091 0 : static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
1092 : int sync, void *key)
1093 : {
1094 0 : struct io_wq *wq = container_of(wait, struct io_wq, wait);
1095 : int i;
1096 :
1097 0 : list_del_init(&wait->entry);
1098 :
1099 : rcu_read_lock();
1100 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1101 0 : struct io_wq_acct *acct = &wq->acct[i];
1102 :
1103 0 : if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
1104 0 : io_wq_activate_free_worker(wq, acct);
1105 : }
1106 : rcu_read_unlock();
1107 0 : return 1;
1108 : }
1109 :
1110 0 : struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
1111 : {
1112 : int ret, i;
1113 : struct io_wq *wq;
1114 :
1115 0 : if (WARN_ON_ONCE(!data->free_work || !data->do_work))
1116 : return ERR_PTR(-EINVAL);
1117 0 : if (WARN_ON_ONCE(!bounded))
1118 : return ERR_PTR(-EINVAL);
1119 :
1120 0 : wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
1121 0 : if (!wq)
1122 : return ERR_PTR(-ENOMEM);
1123 0 : ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1124 0 : if (ret)
1125 : goto err_wq;
1126 :
1127 0 : refcount_inc(&data->hash->refs);
1128 0 : wq->hash = data->hash;
1129 0 : wq->free_work = data->free_work;
1130 0 : wq->do_work = data->do_work;
1131 :
1132 0 : ret = -ENOMEM;
1133 :
1134 0 : if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
1135 : goto err;
1136 0 : cpumask_copy(wq->cpu_mask, cpu_possible_mask);
1137 0 : wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
1138 0 : wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
1139 0 : task_rlimit(current, RLIMIT_NPROC);
1140 0 : INIT_LIST_HEAD(&wq->wait.entry);
1141 0 : wq->wait.func = io_wq_hash_wake;
1142 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1143 0 : struct io_wq_acct *acct = &wq->acct[i];
1144 :
1145 0 : acct->index = i;
1146 0 : atomic_set(&acct->nr_running, 0);
1147 0 : INIT_WQ_LIST(&acct->work_list);
1148 : raw_spin_lock_init(&acct->lock);
1149 : }
1150 :
1151 : raw_spin_lock_init(&wq->lock);
1152 0 : INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
1153 0 : INIT_LIST_HEAD(&wq->all_list);
1154 :
1155 0 : wq->task = get_task_struct(data->task);
1156 0 : atomic_set(&wq->worker_refs, 1);
1157 0 : init_completion(&wq->worker_done);
1158 0 : return wq;
1159 : err:
1160 : io_wq_put_hash(data->hash);
1161 : cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1162 :
1163 : free_cpumask_var(wq->cpu_mask);
1164 : err_wq:
1165 0 : kfree(wq);
1166 0 : return ERR_PTR(ret);
1167 : }
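
A sketch of typical setup, modelled loosely on io_uring's io_init_wq_offload(); the io_wq_data fields are the ones dereferenced above, while my_free_work/my_do_work and the bounded-pool sizing are illustrative assumptions:

static struct io_wq *example_create(struct io_wq_hash *hash)
{
	struct io_wq_data data = {
		.hash		= hash,			/* shared hashed-work state, refcounted above */
		.task		= current,		/* workers are forked off this task */
		.free_work	= my_free_work,		/* illustrative callbacks */
		.do_work	= my_do_work,
	};

	/* bounded cap chosen by the caller; the unbounded cap defaults
	 * to RLIMIT_NPROC, as set up above */
	return io_wq_create(4 * num_online_cpus(), &data);
}
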
1168 :
1169 0 : static bool io_task_work_match(struct callback_head *cb, void *data)
1170 : {
1171 : struct io_worker *worker;
1172 :
1173 0 : if (cb->func != create_worker_cb && cb->func != create_worker_cont)
1174 : return false;
1175 0 : worker = container_of(cb, struct io_worker, create_work);
1176 0 : return worker->wq == data;
1177 : }
1178 :
1179 0 : void io_wq_exit_start(struct io_wq *wq)
1180 : {
1181 0 : set_bit(IO_WQ_BIT_EXIT, &wq->state);
1182 0 : }
1183 :
1184 0 : static void io_wq_cancel_tw_create(struct io_wq *wq)
1185 : {
1186 : struct callback_head *cb;
1187 :
1188 0 : while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
1189 : struct io_worker *worker;
1190 :
1191 0 : worker = container_of(cb, struct io_worker, create_work);
1192 0 : io_worker_cancel_cb(worker);
1193 : /*
1194 : * Only the worker continuation helper has worker allocated and
1195 : * hence needs freeing.
1196 : */
1197 0 : if (cb->func == create_worker_cont)
1198 0 : kfree(worker);
1199 : }
1200 0 : }
1201 :
1202 0 : static void io_wq_exit_workers(struct io_wq *wq)
1203 : {
1204 0 : if (!wq->task)
1205 : return;
1206 :
1207 0 : io_wq_cancel_tw_create(wq);
1208 :
1209 : rcu_read_lock();
1210 0 : io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
1211 0 : rcu_read_unlock();
1212 0 : io_worker_ref_put(wq);
1213 0 : wait_for_completion(&wq->worker_done);
1214 :
1215 0 : spin_lock_irq(&wq->hash->wait.lock);
1216 0 : list_del_init(&wq->wait.entry);
1217 0 : spin_unlock_irq(&wq->hash->wait.lock);
1218 :
1219 0 : put_task_struct(wq->task);
1220 0 : wq->task = NULL;
1221 : }
1222 :
1223 0 : static void io_wq_destroy(struct io_wq *wq)
1224 : {
1225 0 : struct io_cb_cancel_data match = {
1226 : .fn = io_wq_work_match_all,
1227 : .cancel_all = true,
1228 : };
1229 :
1230 0 : cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
1231 0 : io_wq_cancel_pending_work(wq, &match);
1232 0 : free_cpumask_var(wq->cpu_mask);
1233 0 : io_wq_put_hash(wq->hash);
1234 0 : kfree(wq);
1235 0 : }
1236 :
1237 0 : void io_wq_put_and_exit(struct io_wq *wq)
1238 : {
1239 0 : WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
1240 :
1241 0 : io_wq_exit_workers(wq);
1242 0 : io_wq_destroy(wq);
1243 0 : }
1244 :
1245 : struct online_data {
1246 : unsigned int cpu;
1247 : bool online;
1248 : };
1249 :
1250 0 : static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
1251 : {
1252 0 : struct online_data *od = data;
1253 :
1254 0 : if (od->online)
1255 0 : cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
1256 : else
1257 0 : cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
1258 0 : return false;
1259 : }
1260 :
1261 : static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
1262 : {
1263 0 : struct online_data od = {
1264 : .cpu = cpu,
1265 : .online = online
1266 : };
1267 :
1268 : rcu_read_lock();
1269 0 : io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
1270 : rcu_read_unlock();
1271 : return 0;
1272 : }
1273 :
1274 0 : static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
1275 : {
1276 0 : struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1277 :
1278 0 : return __io_wq_cpu_online(wq, cpu, true);
1279 : }
1280 :
1281 0 : static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
1282 : {
1283 0 : struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
1284 :
1285 0 : return __io_wq_cpu_online(wq, cpu, false);
1286 : }
1287 :
1288 0 : int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
1289 : {
1290 : rcu_read_lock();
1291 0 : if (mask)
1292 0 : cpumask_copy(wq->cpu_mask, mask);
1293 : else
1294 0 : cpumask_copy(wq->cpu_mask, cpu_possible_mask);
1295 : rcu_read_unlock();
1296 :
1297 0 : return 0;
1298 : }
1299 :
1300 : /*
1301 : * Set the max number of bounded/unbounded workers, returning the old values.
1302 : * A zero entry in new_count leaves that limit unchanged and just reports it.
1303 : */
1304 0 : int io_wq_max_workers(struct io_wq *wq, int *new_count)
1305 : {
1306 : struct io_wq_acct *acct;
1307 : int prev[IO_WQ_ACCT_NR];
1308 : int i;
1309 :
1310 : BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
1311 : BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
1312 : BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);
1313 :
1314 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1315 0 : if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
1316 0 : new_count[i] = task_rlimit(current, RLIMIT_NPROC);
1317 : }
1318 :
1319 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++)
1320 0 : prev[i] = 0;
1321 :
1322 : rcu_read_lock();
1323 :
1324 0 : raw_spin_lock(&wq->lock);
1325 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++) {
1326 0 : acct = &wq->acct[i];
1327 0 : prev[i] = max_t(int, acct->max_workers, prev[i]);
1328 0 : if (new_count[i])
1329 0 : acct->max_workers = new_count[i];
1330 : }
1331 0 : raw_spin_unlock(&wq->lock);
1332 : rcu_read_unlock();
1333 :
1334 0 : for (i = 0; i < IO_WQ_ACCT_NR; i++)
1335 0 : new_count[i] = prev[i];
1336 :
1337 0 : return 0;
1338 : }
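
Usage sketch (this helper sits behind io_uring's IORING_REGISTER_IOWQ_MAX_WORKERS op): pass a two-entry array indexed by IO_WQ_BOUND/IO_WQ_UNBOUND, where a zero entry only reads the limit and every entry is overwritten with the previous value. example_tune() is a hypothetical caller:

static void example_tune(struct io_wq *wq)
{
	int counts[IO_WQ_ACCT_NR] = { 0, 16 };	/* keep bounded as-is, cap unbounded at 16 */

	io_wq_max_workers(wq, counts);
	/*
	 * counts[] now holds the limits that were in effect before the call;
	 * the new unbounded cap was clamped to RLIMIT_NPROC if it exceeded it.
	 */
}
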
1339 :
1340 1 : static __init int io_wq_init(void)
1341 : {
1342 : int ret;
1343 :
1344 1 : ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
1345 : io_wq_cpu_online, io_wq_cpu_offline);
1346 1 : if (ret < 0)
1347 : return ret;
1348 1 : io_wq_online = ret;
1349 1 : return 0;
1350 : }
1351 : subsys_initcall(io_wq_init);