io-wq.c

// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3
enum {
	IO_WORKER_F_UP		= 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1,	/* account as running */
	IO_WORKER_F_FREE	= 2,	/* worker on free list */
	IO_WORKER_F_BOUND	= 3,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT	= 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	int create_index;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct work_struct work;
	};
};
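/*
 * Hashed work is spread over IO_WQ_HASH_ORDER bits of the hash value:
 * 64 buckets on 64-bit kernels, 32 on 32-bit ones.
 */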
#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)
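
/*
 * Per-class accounting, one instance for bound and one for unbound work.
 * nr_workers/max_workers are protected by wq->lock, the pending work_list
 * by acct->lock; nr_running is maintained atomically from the worker
 * scheduling hooks further down.
 */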
struct io_wq_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};
/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	/* lock protects access to elements below */
	raw_spinlock_t lock;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;
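
/*
 * Describes one cancelation request: @fn/@data decide whether a given work
 * item matches, @nr_running/@nr_pending count what was found, and
 * @cancel_all selects whether to stop after the first match.
 */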
struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  struct io_wq_work *work)
{
	return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags));
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}
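
/*
 * A queued worker-creation task_work is being torn down before it could run:
 * drop the running/worker accounting and references it was holding.
 */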
static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&wq->lock);
	acct->nr_workers--;
	raw_spin_unlock(&wq->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wq->lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wq->lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}
static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wq_activate_free_worker(struct io_wq *wq,
					struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wq_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it
		 * does go to sleep, we'll kick off a new task for this
		 * work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}
/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wq->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wq->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wq->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct->index);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = &wq->acct[worker->create_index];
	raw_spin_lock(&wq->lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wq->lock);
	if (do_create) {
		create_io_worker(wq, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}
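
/*
 * Queue a task_work item on the io_uring task that, when run, creates a new
 * worker for @acct. Returns false if the request could not be queued (wq
 * exiting, worker going away, or a creation already pending for this
 * worker), in which case the nr_running/worker references the caller took
 * are dropped here.
 */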
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}
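
/*
 * Called when a worker stops running (goes to sleep or exits). If this was
 * the last running worker for its class and there is still pending work,
 * arrange for another worker to be created so the queue keeps making
 * progress.
 */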
static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}
/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&wq->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wq->lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
	__must_hold(wq->lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}
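
/*
 * Pick the next runnable item from acct->work_list. Unhashed work runs
 * right away. A hashed item only runs if its hash bucket isn't already
 * owned by another worker; since items with the same hash are queued
 * contiguously, a busy bucket is skipped in one hop via wq->hash_tail.
 * If only busy buckets remain, mark the acct stalled and register on the
 * hash wait queue to be notified when a bucket frees up.
 */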
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wq *wq = worker->wq;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}
/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(wq, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}
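
/*
 * Main loop for an io-wq worker thread: run pending work for its accounting
 * class, otherwise park on the free list and sleep. A worker whose idle
 * sleep times out exits if it isn't the last one for its class, or if it no
 * longer runs on an allowed CPU; the last worker sticks around so queued
 * work always has someone to run it.
 */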
static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN];

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&wq->lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&wq->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wq, worker);
		raw_spin_unlock(&wq->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
							wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}
/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&wq->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	list_add_tail_rcu(&worker->all_list, &wq->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&wq->lock);
	wake_up_new_task(tsk);
}
static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}
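
/*
 * task_work continuation used when a create_io_thread() attempt failed with
 * a retryable error: try again here. If retrying is no longer worthwhile
 * and this was the last worker for the class, cancel all pending work for
 * that class so it doesn't sit on the queue forever.
 */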
static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		struct io_wq_acct *acct = io_wq_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn		= io_wq_work_match_all,
				.cancel_all	= true,
			};

			raw_spin_unlock(&wq->lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&wq->lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	schedule_work(&worker->work);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker, work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}
static bool create_io_worker(struct io_wq *wq, int index)
{
	struct io_wq_acct *acct = &wq->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wq->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		set_bit(IO_WORKER_F_BOUND, &worker->flags);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_WORK(&worker->work, io_workqueue_create);
		schedule_work(&worker->work);
	}

	return true;
}
/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}
static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}
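
/*
 * Append @work to the pending list for its accounting class. Hashed work is
 * inserted right after the current tail for its hash bucket (tracked in
 * wq->hash_tail) so that all items sharing a hash stay contiguous, which is
 * what io_get_next_work() relies on to dispatch or skip a bucket as a unit.
 */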
static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}
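
/*
 * Queue @work for async execution. If the wq is exiting or the work is
 * already marked for cancelation, run its cancel path inline instead.
 * Otherwise insert it, wake an idle worker if one exists, and create a new
 * worker when none are running (or the work allows extra concurrency). If
 * even the very first worker can't be created, cancel the item again so it
 * isn't left stranded on the list.
 */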
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_item,
		.data		= work,
		.cancel_all	= false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_wq_activate_free_worker(wq, acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wq->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wq->lock);
			return;
		}
		raw_spin_unlock(&wq->lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}
/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}
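
/*
 * Remove a still-pending work item from its list. If it was the tail of a
 * hash bucket, pull wq->hash_tail back to the previous item with the same
 * hash (or clear it) so the bucket bookkeeping stays consistent.
 */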
static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}
static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn		= cancel,
		.data		= data,
		.cancel_all	= cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wq->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	raw_spin_lock(&wq->lock);
	io_wq_cancel_running_work(wq, &match);
	raw_spin_unlock(&wq->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}
static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wq_activate_free_worker(wq, acct);
	}
	rcu_read_unlock();
	return 1;
}
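
/*
 * Allocate and set up an io_wq instance for @data->task. @bounded caps the
 * number of bound workers; unbound workers are capped at RLIMIT_NPROC. The
 * worker CPU mask starts out as the task's cpuset-allowed CPUs, and the wq
 * registers for CPU hotplug notifications so the mask tracks CPUs coming
 * and going.
 */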
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		acct->index = i;
		atomic_set(&acct->nr_running, 0);
		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	raw_spin_lock_init(&wq->lock);
	INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
	INIT_LIST_HEAD(&wq->all_list);

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret)
		goto err;

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}
static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}
static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn		= io_wq_work_match_all,
		.cancel_all	= true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}
struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}
/*
 * Set the max number of workers for each accounting class, returning the
 * old values through new_count. If a new_count entry is 0, that class is
 * left unchanged and just reports its current value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND   != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR      != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	raw_spin_lock(&wq->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wq->lock);
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;

	io_wq_online = ret;
	return 0;
}

subsys_initcall(io_wq_init);