// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
#include <linux/kernel.h>
#include <linux/init.h>
#include <linux/errno.h>
#include <linux/sched/signal.h>
#include <linux/percpu.h>
#include <linux/slab.h>
#include <linux/rculist_nulls.h>
#include <linux/cpu.h>
#include <linux/cpuset.h>
#include <linux/task_work.h>
#include <linux/audit.h>
#include <linux/mmu_context.h>
#include <uapi/linux/io_uring.h>

#include "io-wq.h"
#include "slist.h"
#include "io_uring.h"

#define WORKER_IDLE_TIMEOUT	(5 * HZ)
#define WORKER_INIT_LIMIT	3

enum {
	IO_WORKER_F_UP = 0,	/* up and active */
	IO_WORKER_F_RUNNING = 1,	/* account as running */
	IO_WORKER_F_FREE = 2,	/* worker on free list */
	IO_WORKER_F_BOUND = 3,	/* is doing bounded work */
};

enum {
	IO_WQ_BIT_EXIT = 0,	/* wq exiting */
};

enum {
	IO_ACCT_STALLED_BIT = 0,	/* stalled on hash */
};

/*
 * One for each thread in a wq pool
 */
struct io_worker {
	refcount_t ref;
	int create_index;
	unsigned long flags;
	struct hlist_nulls_node nulls_node;
	struct list_head all_list;
	struct task_struct *task;
	struct io_wq *wq;

	struct io_wq_work *cur_work;
	raw_spinlock_t lock;

	struct completion ref_done;

	unsigned long create_state;
	struct callback_head create_work;
	int init_retries;

	union {
		struct rcu_head rcu;
		struct delayed_work work;
	};
};

#if BITS_PER_LONG == 64
#define IO_WQ_HASH_ORDER	6
#else
#define IO_WQ_HASH_ORDER	5
#endif

#define IO_WQ_NR_HASH_BUCKETS	(1u << IO_WQ_HASH_ORDER)

struct io_wq_acct {
	unsigned nr_workers;
	unsigned max_workers;
	int index;
	atomic_t nr_running;
	raw_spinlock_t lock;
	struct io_wq_work_list work_list;
	unsigned long flags;
};

enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
	IO_WQ_ACCT_NR,
};

/*
 * Per io_wq state
 */
struct io_wq {
	unsigned long state;

	free_work_fn *free_work;
	io_wq_work_fn *do_work;

	struct io_wq_hash *hash;

	atomic_t worker_refs;
	struct completion worker_done;

	struct hlist_node cpuhp_node;

	struct task_struct *task;

	struct io_wq_acct acct[IO_WQ_ACCT_NR];

	/* lock protects access to elements below */
	raw_spinlock_t lock;

	struct hlist_nulls_head free_list;
	struct list_head all_list;

	struct wait_queue_entry wait;

	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];

	cpumask_var_t cpu_mask;
};

static enum cpuhp_state io_wq_online;

struct io_cb_cancel_data {
	work_cancel_fn *fn;
	void *data;
	int nr_running;
	int nr_pending;
	bool cancel_all;
};

static bool create_io_worker(struct io_wq *wq, int index);
static void io_wq_dec_running(struct io_worker *worker);
static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match);
static void create_worker_cb(struct callback_head *cb);
static void io_wq_cancel_tw_create(struct io_wq *wq);

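/*
 * Worker reference counting: io_worker_get() takes a reference unless the
 * worker is already being torn down; dropping the final reference via
 * io_worker_release() completes ->ref_done so io_worker_exit() can finish.
 */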
static bool io_worker_get(struct io_worker *worker)
{
	return refcount_inc_not_zero(&worker->ref);
}

static void io_worker_release(struct io_worker *worker)
{
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
}

static inline struct io_wq_acct *io_get_acct(struct io_wq *wq, bool bound)
{
	return &wq->acct[bound ? IO_WQ_ACCT_BOUND : IO_WQ_ACCT_UNBOUND];
}

static inline struct io_wq_acct *io_work_get_acct(struct io_wq *wq,
						  struct io_wq_work *work)
{
	return io_get_acct(wq, !(atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND));
}

static inline struct io_wq_acct *io_wq_get_acct(struct io_worker *worker)
{
	return io_get_acct(worker->wq, test_bit(IO_WORKER_F_BOUND, &worker->flags));
}

static void io_worker_ref_put(struct io_wq *wq)
{
	if (atomic_dec_and_test(&wq->worker_refs))
		complete(&wq->worker_done);
}

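/*
 * Check whether the io_wq that the current worker belongs to is shutting
 * down. Must only be called from an io-wq worker thread.
 */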
bool io_wq_worker_stopped(void)
{
	struct io_worker *worker = current->worker_private;

	if (WARN_ON_ONCE(!io_wq_current_is_worker()))
		return true;

	return test_bit(IO_WQ_BIT_EXIT, &worker->wq->state);
}

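/*
 * Undo the accounting and state that a queued worker-create request set up:
 * drop the running count, the worker count, the wq reference, and release
 * ownership of the worker's create_state.
 */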
static void io_worker_cancel_cb(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	atomic_dec(&acct->nr_running);
	raw_spin_lock(&wq->lock);
	acct->nr_workers--;
	raw_spin_unlock(&wq->lock);
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

static bool io_task_worker_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker == data;
}

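/*
 * Tear down an exiting worker: cancel any create task_work still queued for
 * it, wait for outstanding references to drop, unhook it from the wq lists,
 * then free it via RCU and exit the thread.
 */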
static void io_worker_exit(struct io_worker *worker)
{
	struct io_wq *wq = worker->wq;

	while (1) {
		struct callback_head *cb = task_work_cancel_match(wq->task,
						io_task_worker_match, worker);

		if (!cb)
			break;
		io_worker_cancel_cb(worker);
	}

	io_worker_release(worker);
	wait_for_completion(&worker->ref_done);

	raw_spin_lock(&wq->lock);
	if (test_bit(IO_WORKER_F_FREE, &worker->flags))
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	raw_spin_unlock(&wq->lock);
	io_wq_dec_running(worker);
	/*
	 * this worker is a goner, clear ->worker_private to avoid any
	 * inc/dec running calls that could happen as part of exit from
	 * touching 'worker'.
	 */
	current->worker_private = NULL;

	kfree_rcu(worker, rcu);
	io_worker_ref_put(wq);
	do_exit(0);
}

static inline bool __io_acct_run_queue(struct io_wq_acct *acct)
{
	return !test_bit(IO_ACCT_STALLED_BIT, &acct->flags) &&
		!wq_list_empty(&acct->work_list);
}

/*
 * If there's work to do, returns true with acct->lock acquired. If not,
 * returns false with no lock held.
 */
static inline bool io_acct_run_queue(struct io_wq_acct *acct)
	__acquires(&acct->lock)
{
	raw_spin_lock(&acct->lock);
	if (__io_acct_run_queue(acct))
		return true;

	raw_spin_unlock(&acct->lock);
	return false;
}

/*
 * Check head of free list for an available worker. If one isn't available,
 * caller must create one.
 */
static bool io_wq_activate_free_worker(struct io_wq *wq,
					struct io_wq_acct *acct)
	__must_hold(RCU)
{
	struct hlist_nulls_node *n;
	struct io_worker *worker;

	/*
	 * Iterate free_list and see if we can find an idle worker to
	 * activate. If a given worker is on the free_list but in the process
	 * of exiting, keep trying.
	 */
	hlist_nulls_for_each_entry_rcu(worker, n, &wq->free_list, nulls_node) {
		if (!io_worker_get(worker))
			continue;
		if (io_wq_get_acct(worker) != acct) {
			io_worker_release(worker);
			continue;
		}
		/*
		 * If the worker is already running, it's either already
		 * starting work or finishing work. In either case, if it does
		 * go to sleep, we'll kick off a new task for this work anyway.
		 */
		wake_up_process(worker->task);
		io_worker_release(worker);
		return true;
	}

	return false;
}

/*
 * We need a worker. If we find a free one, we're good. If not, and we're
 * below the max number of workers, create one.
 */
static bool io_wq_create_worker(struct io_wq *wq, struct io_wq_acct *acct)
{
	/*
	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
	if (unlikely(!acct->max_workers))
		pr_warn_once("io-wq is not configured for unbound workers");

	raw_spin_lock(&wq->lock);
	if (acct->nr_workers >= acct->max_workers) {
		raw_spin_unlock(&wq->lock);
		return true;
	}
	acct->nr_workers++;
	raw_spin_unlock(&wq->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	return create_io_worker(wq, acct->index);
}

static void io_wq_inc_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	atomic_inc(&acct->nr_running);
}

static void create_worker_cb(struct callback_head *cb)
{
	struct io_worker *worker;
	struct io_wq *wq;
	struct io_wq_acct *acct;
	bool do_create = false;

	worker = container_of(cb, struct io_worker, create_work);
	wq = worker->wq;
	acct = &wq->acct[worker->create_index];
	raw_spin_lock(&wq->lock);

	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	raw_spin_unlock(&wq->lock);
	if (do_create) {
		create_io_worker(wq, worker->create_index);
	} else {
		atomic_dec(&acct->nr_running);
		io_worker_ref_put(wq);
	}
	clear_bit_unlock(0, &worker->create_state);
	io_worker_release(worker);
}

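/*
 * Queue task_work on the io_wq owner task to create a new worker. The
 * create_state bit serializes create requests per worker; if queueing fails,
 * the running count and wq reference taken by the caller are dropped and
 * false is returned.
 */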
static bool io_queue_worker_create(struct io_worker *worker,
				   struct io_wq_acct *acct,
				   task_work_func_t func)
{
	struct io_wq *wq = worker->wq;

	/* raced with exit, just ignore create call */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
		goto fail;
	if (!io_worker_get(worker))
		goto fail;
	/*
	 * create_state manages ownership of create_work/index. We should
	 * only need one entry per worker, as the worker going to sleep
	 * will trigger the condition, and waking will clear it once it
	 * runs the task_work.
	 */
	if (test_bit(0, &worker->create_state) ||
	    test_and_set_bit_lock(0, &worker->create_state))
		goto fail_release;

	atomic_inc(&wq->worker_refs);
	init_task_work(&worker->create_work, func);
	worker->create_index = acct->index;
	if (!task_work_add(wq->task, &worker->create_work, TWA_SIGNAL)) {
		/*
		 * EXIT may have been set after checking it above, check after
		 * adding the task_work and remove any creation item if it is
		 * now set. wq exit does that too, but we can have added this
		 * work item after we canceled in io_wq_exit_workers().
		 */
		if (test_bit(IO_WQ_BIT_EXIT, &wq->state))
			io_wq_cancel_tw_create(wq);
		io_worker_ref_put(wq);
		return true;
	}
	io_worker_ref_put(wq);
	clear_bit_unlock(0, &worker->create_state);
fail_release:
	io_worker_release(worker);
fail:
	atomic_dec(&acct->nr_running);
	io_worker_ref_put(wq);
	return false;
}

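/*
 * A worker stops running (goes to sleep). If it was the last running worker
 * for this acct and runnable work is still queued, bump the counts back up
 * and queue task_work to create a replacement worker.
 */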
static void io_wq_dec_running(struct io_worker *worker)
{
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;

	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;

	if (!atomic_dec_and_test(&acct->nr_running))
		return;
	if (!io_acct_run_queue(acct))
		return;

	raw_spin_unlock(&acct->lock);
	atomic_inc(&acct->nr_running);
	atomic_inc(&wq->worker_refs);
	io_queue_worker_create(worker, acct, create_worker_cb);
}

/*
 * Worker will start processing some work. Move it to the busy list, if
 * it's currently on the freelist
 */
static void __io_worker_busy(struct io_wq *wq, struct io_worker *worker)
{
	if (test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		clear_bit(IO_WORKER_F_FREE, &worker->flags);
		raw_spin_lock(&wq->lock);
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		raw_spin_unlock(&wq->lock);
	}
}

/*
 * No work, worker going to sleep. Move to freelist.
 */
static void __io_worker_idle(struct io_wq *wq, struct io_worker *worker)
	__must_hold(wq->lock)
{
	if (!test_bit(IO_WORKER_F_FREE, &worker->flags)) {
		set_bit(IO_WORKER_F_FREE, &worker->flags);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	}
}

static inline unsigned int io_get_work_hash(struct io_wq_work *work)
{
	return atomic_read(&work->flags) >> IO_WQ_HASH_SHIFT;
}

static bool io_wait_on_hash(struct io_wq *wq, unsigned int hash)
{
	bool ret = false;

	spin_lock_irq(&wq->hash->wait.lock);
	if (list_empty(&wq->wait.entry)) {
		__add_wait_queue(&wq->hash->wait, &wq->wait);
		if (!test_bit(hash, &wq->hash->map)) {
			__set_current_state(TASK_RUNNING);
			list_del_init(&wq->wait.entry);
			ret = true;
		}
	}
	spin_unlock_irq(&wq->hash->wait.lock);
	return ret;
}

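/*
 * Pick the next runnable work item off the acct list. Unhashed work can run
 * right away; hashed work only runs if no other worker currently owns the
 * same hash bit. If everything left is hashed and busy, mark the acct
 * stalled and wait on the hash waitqueue.
 */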
static struct io_wq_work *io_get_next_work(struct io_wq_acct *acct,
					   struct io_worker *worker)
	__must_hold(acct->lock)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work, *tail;
	unsigned int stall_hash = -1U;
	struct io_wq *wq = worker->wq;

	wq_list_for_each(node, prev, &acct->work_list) {
		unsigned int hash;

		work = container_of(node, struct io_wq_work, list);

		/* not hashed, can run anytime */
		if (!io_wq_is_hashed(work)) {
			wq_list_del(&acct->work_list, node, prev);
			return work;
		}

		hash = io_get_work_hash(work);
		/* all items with this hash lie in [work, tail] */
		tail = wq->hash_tail[hash];

		/* hashed, can run if not already running */
		if (!test_and_set_bit(hash, &wq->hash->map)) {
			wq->hash_tail[hash] = NULL;
			wq_list_cut(&acct->work_list, &tail->list, prev);
			return work;
		}
		if (stall_hash == -1U)
			stall_hash = hash;
		/* fast forward to a next hash, for-each will fix up @prev */
		node = &tail->list;
	}

	if (stall_hash != -1U) {
		bool unstalled;

		/*
		 * Set this before dropping the lock to avoid racing with new
		 * work being added and clearing the stalled bit.
		 */
		set_bit(IO_ACCT_STALLED_BIT, &acct->flags);
		raw_spin_unlock(&acct->lock);
		unstalled = io_wait_on_hash(wq, stall_hash);
		raw_spin_lock(&acct->lock);
		if (unstalled) {
			clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
			if (wq_has_sleeper(&wq->hash->wait))
				wake_up(&wq->hash->wait);
		}
	}

	return NULL;
}

static void io_assign_current_work(struct io_worker *worker,
				   struct io_wq_work *work)
{
	if (work) {
		io_run_task_work();
		cond_resched();
	}

	raw_spin_lock(&worker->lock);
	worker->cur_work = work;
	raw_spin_unlock(&worker->lock);
}

/*
 * Called with acct->lock held, drops it before returning
 */
static void io_worker_handle_work(struct io_wq_acct *acct,
				  struct io_worker *worker)
	__releases(&acct->lock)
{
	struct io_wq *wq = worker->wq;
	bool do_kill = test_bit(IO_WQ_BIT_EXIT, &wq->state);

	do {
		struct io_wq_work *work;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(acct, worker);
		if (work) {
			/*
			 * Make sure cancelation can find this, even before
			 * it becomes the active work. That avoids a window
			 * where the work has been removed from our general
			 * work list, but isn't yet discoverable as the
			 * current work item for this worker.
			 */
			raw_spin_lock(&worker->lock);
			worker->cur_work = work;
			raw_spin_unlock(&worker->lock);
		}

		raw_spin_unlock(&acct->lock);

		if (!work)
			break;

		__io_worker_busy(wq, worker);

		io_assign_current_work(worker, work);
		__set_current_state(TASK_RUNNING);

		/* handle a whole dependent link */
		do {
			struct io_wq_work *next_hashed, *linked;
			unsigned int hash = io_get_work_hash(work);

			next_hashed = wq_next_work(work);

			if (do_kill &&
			    (atomic_read(&work->flags) & IO_WQ_WORK_UNBOUND))
				atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
			wq->do_work(work);
			io_assign_current_work(worker, NULL);

			linked = wq->free_work(work);
			work = next_hashed;
			if (!work && linked && !io_wq_is_hashed(linked)) {
				work = linked;
				linked = NULL;
			}
			io_assign_current_work(worker, work);
			if (linked)
				io_wq_enqueue(wq, linked);

			if (hash != -1U && !next_hashed) {
				/* serialize hash clear with wake_up() */
				spin_lock_irq(&wq->hash->wait.lock);
				clear_bit(hash, &wq->hash->map);
				clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
				spin_unlock_irq(&wq->hash->wait.lock);
				if (wq_has_sleeper(&wq->hash->wait))
					wake_up(&wq->hash->wait);
			}
		} while (work);

		if (!__io_acct_run_queue(acct))
			break;
		raw_spin_lock(&acct->lock);
	} while (1);
}

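/*
 * Main loop of a worker thread: run queued work while there is any, idle on
 * the free list otherwise, and exit after an idle timeout when this isn't
 * the last worker for its acct or when its allowed CPU mask no longer covers
 * the CPU it last ran on.
 */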
static int io_wq_worker(void *data)
{
	struct io_worker *worker = data;
	struct io_wq_acct *acct = io_wq_get_acct(worker);
	struct io_wq *wq = worker->wq;
	bool exit_mask = false, last_timeout = false;
	char buf[TASK_COMM_LEN];

	set_mask_bits(&worker->flags, 0,
		      BIT(IO_WORKER_F_UP) | BIT(IO_WORKER_F_RUNNING));

	snprintf(buf, sizeof(buf), "iou-wrk-%d", wq->task->pid);
	set_task_comm(current, buf);

	while (!test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
		long ret;

		set_current_state(TASK_INTERRUPTIBLE);

		/*
		 * If we have work to do, io_acct_run_queue() returns with
		 * the acct->lock held. If not, it will drop it.
		 */
		while (io_acct_run_queue(acct))
			io_worker_handle_work(acct, worker);

		raw_spin_lock(&wq->lock);
		/*
		 * Last sleep timed out. Exit if we're not the last worker,
		 * or if someone modified our affinity.
		 */
		if (last_timeout && (exit_mask || acct->nr_workers > 1)) {
			acct->nr_workers--;
			raw_spin_unlock(&wq->lock);
			__set_current_state(TASK_RUNNING);
			break;
		}
		last_timeout = false;
		__io_worker_idle(wq, worker);
		raw_spin_unlock(&wq->lock);
		if (io_run_task_work())
			continue;
		ret = schedule_timeout(WORKER_IDLE_TIMEOUT);
		if (signal_pending(current)) {
			struct ksignal ksig;

			if (!get_signal(&ksig))
				continue;
			break;
		}
		if (!ret) {
			last_timeout = true;
			exit_mask = !cpumask_test_cpu(raw_smp_processor_id(),
						      wq->cpu_mask);
		}
	}

	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) && io_acct_run_queue(acct))
		io_worker_handle_work(acct, worker);

	io_worker_exit(worker);
	return 0;
}

/*
 * Called when a worker is scheduled in. Mark us as currently running.
 */
void io_wq_worker_running(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;
	set_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_inc_running(worker);
}

/*
 * Called when worker is going to sleep. If there are no workers currently
 * running and we have work pending, wake up a free one or create a new one.
 */
void io_wq_worker_sleeping(struct task_struct *tsk)
{
	struct io_worker *worker = tsk->worker_private;

	if (!worker)
		return;
	if (!test_bit(IO_WORKER_F_UP, &worker->flags))
		return;
	if (!test_bit(IO_WORKER_F_RUNNING, &worker->flags))
		return;

	clear_bit(IO_WORKER_F_RUNNING, &worker->flags);
	io_wq_dec_running(worker);
}

static void io_init_new_worker(struct io_wq *wq, struct io_worker *worker,
			       struct task_struct *tsk)
{
	tsk->worker_private = worker;
	worker->task = tsk;
	set_cpus_allowed_ptr(tsk, wq->cpu_mask);

	raw_spin_lock(&wq->lock);
	hlist_nulls_add_head_rcu(&worker->nulls_node, &wq->free_list);
	list_add_tail_rcu(&worker->all_list, &wq->all_list);
	set_bit(IO_WORKER_F_FREE, &worker->flags);
	raw_spin_unlock(&wq->lock);
	wake_up_new_task(tsk);
}

static bool io_wq_work_match_all(struct io_wq_work *work, void *data)
{
	return true;
}

static inline bool io_should_retry_thread(struct io_worker *worker, long err)
{
	/*
	 * Prevent perpetual task_work retry, if the task (or its group) is
	 * exiting.
	 */
	if (fatal_signal_pending(current))
		return false;
	if (worker->init_retries++ >= WORKER_INIT_LIMIT)
		return false;

	switch (err) {
	case -EAGAIN:
	case -ERESTARTSYS:
	case -ERESTARTNOINTR:
	case -ERESTARTNOHAND:
		return true;
	default:
		return false;
	}
}

static void queue_create_worker_retry(struct io_worker *worker)
{
	/*
	 * We only bother retrying because there's a chance that the
	 * failure to create a worker is due to some temporary condition
	 * in the forking task (e.g. outstanding signal); give the task
	 * some time to clear that condition.
	 */
	schedule_delayed_work(&worker->work,
			      msecs_to_jiffies(worker->init_retries * 5));
}

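/*
 * task_work continuation of a worker creation attempt: fork the io thread,
 * and if that fails with an error that isn't worth retrying, unwind the
 * accounting; if no workers remain for this acct, cancel its pending work.
 */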
static void create_worker_cont(struct callback_head *cb)
{
	struct io_worker *worker;
	struct task_struct *tsk;
	struct io_wq *wq;

	worker = container_of(cb, struct io_worker, create_work);
	clear_bit_unlock(0, &worker->create_state);
	wq = worker->wq;
	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
		io_worker_release(worker);
		return;
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		struct io_wq_acct *acct = io_wq_get_acct(worker);

		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		if (!acct->nr_workers) {
			struct io_cb_cancel_data match = {
				.fn = io_wq_work_match_all,
				.cancel_all = true,
			};

			raw_spin_unlock(&wq->lock);
			while (io_acct_cancel_pending_work(wq, acct, &match))
				;
		} else {
			raw_spin_unlock(&wq->lock);
		}
		io_worker_ref_put(wq);
		kfree(worker);
		return;
	}

	/* re-create attempts grab a new worker ref, drop the existing one */
	io_worker_release(worker);
	queue_create_worker_retry(worker);
}

static void io_workqueue_create(struct work_struct *work)
{
	struct io_worker *worker = container_of(work, struct io_worker,
						work.work);
	struct io_wq_acct *acct = io_wq_get_acct(worker);

	if (!io_queue_worker_create(worker, acct, create_worker_cont))
		kfree(worker);
}

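/*
 * Allocate a new io_worker and fork its io thread. On a fork failure the
 * caller's accounting is unwound, unless the error looks transient, in
 * which case the creation is retried from a delayed work item.
 */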
static bool create_io_worker(struct io_wq *wq, int index)
{
	struct io_wq_acct *acct = &wq->acct[index];
	struct io_worker *worker;
	struct task_struct *tsk;

	__set_current_state(TASK_RUNNING);

	worker = kzalloc(sizeof(*worker), GFP_KERNEL);
	if (!worker) {
fail:
		atomic_dec(&acct->nr_running);
		raw_spin_lock(&wq->lock);
		acct->nr_workers--;
		raw_spin_unlock(&wq->lock);
		io_worker_ref_put(wq);
		return false;
	}

	refcount_set(&worker->ref, 1);
	worker->wq = wq;
	raw_spin_lock_init(&worker->lock);
	init_completion(&worker->ref_done);

	if (index == IO_WQ_ACCT_BOUND)
		set_bit(IO_WORKER_F_BOUND, &worker->flags);

	tsk = create_io_thread(io_wq_worker, worker, NUMA_NO_NODE);
	if (!IS_ERR(tsk)) {
		io_init_new_worker(wq, worker, tsk);
	} else if (!io_should_retry_thread(worker, PTR_ERR(tsk))) {
		kfree(worker);
		goto fail;
	} else {
		INIT_DELAYED_WORK(&worker->work, io_workqueue_create);
		queue_create_worker_retry(worker);
	}

	return true;
}

/*
 * Iterate the passed in list and call the specific function for each
 * worker that isn't exiting
 */
static bool io_wq_for_each_worker(struct io_wq *wq,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;
	bool ret = false;

	list_for_each_entry_rcu(worker, &wq->all_list, all_list) {
		if (io_worker_get(worker)) {
			/* no task if node is/was offline */
			if (worker->task)
				ret = func(worker, data);
			io_worker_release(worker);
			if (ret)
				break;
		}
	}

	return ret;
}

static bool io_wq_worker_wake(struct io_worker *worker, void *data)
{
	__set_notify_signal(worker->task);
	wake_up_process(worker->task);
	return false;
}

static void io_run_cancel(struct io_wq_work *work, struct io_wq *wq)
{
	do {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		wq->do_work(work);
		work = wq->free_work(work);
	} while (work);
}

static void io_wq_insert_work(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash;
	struct io_wq_work *tail;

	if (!io_wq_is_hashed(work)) {
append:
		wq_list_add_tail(&work->list, &acct->work_list);
		return;
	}

	hash = io_get_work_hash(work);
	tail = wq->hash_tail[hash];
	wq->hash_tail[hash] = work;
	if (!tail)
		goto append;

	wq_list_add_after(&work->list, &tail->list, &acct->work_list);
}

static bool io_wq_work_match_item(struct io_wq_work *work, void *data)
{
	return work == data;
}

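/*
 * Queue work for async execution: insert it on the bounded or unbounded
 * acct list, then wake an idle worker or create a new one if needed. If
 * io-wq is exiting or the work is already marked for cancellation, it is
 * cancelled instead of being queued.
 */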
void io_wq_enqueue(struct io_wq *wq, struct io_wq_work *work)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int work_flags = atomic_read(&work->flags);
	struct io_cb_cancel_data match = {
		.fn = io_wq_work_match_item,
		.data = work,
		.cancel_all = false,
	};
	bool do_create;

	/*
	 * If io-wq is exiting for this task, or if the request has explicitly
	 * been marked as one that should not get executed, cancel it here.
	 */
	if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
	    (work_flags & IO_WQ_WORK_CANCEL)) {
		io_run_cancel(work, wq);
		return;
	}

	raw_spin_lock(&acct->lock);
	io_wq_insert_work(wq, work);
	clear_bit(IO_ACCT_STALLED_BIT, &acct->flags);
	raw_spin_unlock(&acct->lock);

	rcu_read_lock();
	do_create = !io_wq_activate_free_worker(wq, acct);
	rcu_read_unlock();

	if (do_create && ((work_flags & IO_WQ_WORK_CONCURRENT) ||
	    !atomic_read(&acct->nr_running))) {
		bool did_create;

		did_create = io_wq_create_worker(wq, acct);
		if (likely(did_create))
			return;

		raw_spin_lock(&wq->lock);
		if (acct->nr_workers) {
			raw_spin_unlock(&wq->lock);
			return;
		}
		raw_spin_unlock(&wq->lock);

		/* fatal condition, failed to create the first worker */
		io_acct_cancel_pending_work(wq, acct, &match);
	}
}

/*
 * Work items that hash to the same value will not be done in parallel.
 * Used to limit concurrent writes, generally hashed by inode.
 */
void io_wq_hash_work(struct io_wq_work *work, void *val)
{
	unsigned int bit;

	bit = hash_ptr(val, IO_WQ_HASH_ORDER);
	atomic_or(IO_WQ_WORK_HASHED | (bit << IO_WQ_HASH_SHIFT), &work->flags);
}

static bool __io_wq_worker_cancel(struct io_worker *worker,
				  struct io_cb_cancel_data *match,
				  struct io_wq_work *work)
{
	if (work && match->fn(work, match->data)) {
		atomic_or(IO_WQ_WORK_CANCEL, &work->flags);
		__set_notify_signal(worker->task);
		return true;
	}

	return false;
}

static bool io_wq_worker_cancel(struct io_worker *worker, void *data)
{
	struct io_cb_cancel_data *match = data;

	/*
	 * Hold the lock to avoid ->cur_work going out of scope, caller
	 * may dereference the passed in work.
	 */
	raw_spin_lock(&worker->lock);
	if (__io_wq_worker_cancel(worker, match, worker->cur_work))
		match->nr_running++;
	raw_spin_unlock(&worker->lock);

	return match->nr_running && !match->cancel_all;
}

static inline void io_wq_remove_pending(struct io_wq *wq,
					struct io_wq_work *work,
					struct io_wq_work_node *prev)
{
	struct io_wq_acct *acct = io_work_get_acct(wq, work);
	unsigned int hash = io_get_work_hash(work);
	struct io_wq_work *prev_work = NULL;

	if (io_wq_is_hashed(work) && work == wq->hash_tail[hash]) {
		if (prev)
			prev_work = container_of(prev, struct io_wq_work, list);
		if (prev_work && io_get_work_hash(prev_work) == hash)
			wq->hash_tail[hash] = prev_work;
		else
			wq->hash_tail[hash] = NULL;
	}
	wq_list_del(&acct->work_list, &work->list, prev);
}

static bool io_acct_cancel_pending_work(struct io_wq *wq,
					struct io_wq_acct *acct,
					struct io_cb_cancel_data *match)
{
	struct io_wq_work_node *node, *prev;
	struct io_wq_work *work;

	raw_spin_lock(&acct->lock);
	wq_list_for_each(node, prev, &acct->work_list) {
		work = container_of(node, struct io_wq_work, list);
		if (!match->fn(work, match->data))
			continue;
		io_wq_remove_pending(wq, work, prev);
		raw_spin_unlock(&acct->lock);
		io_run_cancel(work, wq);
		match->nr_pending++;
		/* not safe to continue after unlock */
		return true;
	}
	raw_spin_unlock(&acct->lock);

	return false;
}

static void io_wq_cancel_pending_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	int i;
retry:
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = io_get_acct(wq, i == 0);

		if (io_acct_cancel_pending_work(wq, acct, match)) {
			if (match->cancel_all)
				goto retry;
			break;
		}
	}
}

static void io_wq_cancel_running_work(struct io_wq *wq,
				      struct io_cb_cancel_data *match)
{
	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_cancel, match);
	rcu_read_unlock();
}

enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data, bool cancel_all)
{
	struct io_cb_cancel_data match = {
		.fn = cancel,
		.data = data,
		.cancel_all = cancel_all,
	};

	/*
	 * First check pending list, if we're lucky we can just remove it
	 * from there. CANCEL_OK means that the work is returned as-new,
	 * no completion will be posted for it.
	 *
	 * Then check if a free (going busy) or busy worker has the work
	 * currently running. If we find it there, we'll return CANCEL_RUNNING
	 * as an indication that we attempt to signal cancellation. The
	 * completion will run normally in this case.
	 *
	 * Do both of these while holding the wq->lock, to ensure that
	 * we'll find a work item regardless of state.
	 */
	io_wq_cancel_pending_work(wq, &match);
	if (match.nr_pending && !match.cancel_all)
		return IO_WQ_CANCEL_OK;

	raw_spin_lock(&wq->lock);
	io_wq_cancel_running_work(wq, &match);
	raw_spin_unlock(&wq->lock);
	if (match.nr_running && !match.cancel_all)
		return IO_WQ_CANCEL_RUNNING;

	if (match.nr_running)
		return IO_WQ_CANCEL_RUNNING;
	if (match.nr_pending)
		return IO_WQ_CANCEL_OK;
	return IO_WQ_CANCEL_NOTFOUND;
}

static int io_wq_hash_wake(struct wait_queue_entry *wait, unsigned mode,
			   int sync, void *key)
{
	struct io_wq *wq = container_of(wait, struct io_wq, wait);
	int i;

	list_del_init(&wait->entry);

	rcu_read_lock();
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		if (test_and_clear_bit(IO_ACCT_STALLED_BIT, &acct->flags))
			io_wq_activate_free_worker(wq, acct);
	}
	rcu_read_unlock();
	return 1;
}

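/*
 * Set up a new io_wq for the given task: bounded/unbounded worker limits,
 * per-acct accounting, the hash waitqueue hook and CPU hotplug registration.
 * No workers are created until work is actually queued.
 */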
struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
{
	int ret, i;
	struct io_wq *wq;

	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
		return ERR_PTR(-EINVAL);
	if (WARN_ON_ONCE(!bounded))
		return ERR_PTR(-EINVAL);

	wq = kzalloc(sizeof(struct io_wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	refcount_inc(&data->hash->refs);
	wq->hash = data->hash;
	wq->free_work = data->free_work;
	wq->do_work = data->do_work;

	ret = -ENOMEM;

	if (!alloc_cpumask_var(&wq->cpu_mask, GFP_KERNEL))
		goto err;
	cpuset_cpus_allowed(data->task, wq->cpu_mask);
	wq->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
	wq->acct[IO_WQ_ACCT_UNBOUND].max_workers =
				task_rlimit(current, RLIMIT_NPROC);
	INIT_LIST_HEAD(&wq->wait.entry);
	wq->wait.func = io_wq_hash_wake;
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		struct io_wq_acct *acct = &wq->acct[i];

		acct->index = i;
		atomic_set(&acct->nr_running, 0);
		INIT_WQ_LIST(&acct->work_list);
		raw_spin_lock_init(&acct->lock);
	}

	raw_spin_lock_init(&wq->lock);
	INIT_HLIST_NULLS_HEAD(&wq->free_list, 0);
	INIT_LIST_HEAD(&wq->all_list);

	wq->task = get_task_struct(data->task);
	atomic_set(&wq->worker_refs, 1);
	init_completion(&wq->worker_done);
	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	if (ret) {
		put_task_struct(wq->task);
		goto err;
	}

	return wq;
err:
	io_wq_put_hash(data->hash);
	free_cpumask_var(wq->cpu_mask);
	kfree(wq);
	return ERR_PTR(ret);
}

static bool io_task_work_match(struct callback_head *cb, void *data)
{
	struct io_worker *worker;

	if (cb->func != create_worker_cb && cb->func != create_worker_cont)
		return false;
	worker = container_of(cb, struct io_worker, create_work);
	return worker->wq == data;
}

void io_wq_exit_start(struct io_wq *wq)
{
	set_bit(IO_WQ_BIT_EXIT, &wq->state);
}

static void io_wq_cancel_tw_create(struct io_wq *wq)
{
	struct callback_head *cb;

	while ((cb = task_work_cancel_match(wq->task, io_task_work_match, wq)) != NULL) {
		struct io_worker *worker;

		worker = container_of(cb, struct io_worker, create_work);
		io_worker_cancel_cb(worker);
		/*
		 * Only the worker continuation helper has worker allocated and
		 * hence needs freeing.
		 */
		if (cb->func == create_worker_cont)
			kfree(worker);
	}
}

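/*
 * Called on io_wq teardown: cancel queued create task_work, wake every
 * worker so it notices IO_WQ_BIT_EXIT, then wait for all workers to drop
 * their references before releasing the owner task.
 */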
static void io_wq_exit_workers(struct io_wq *wq)
{
	if (!wq->task)
		return;

	io_wq_cancel_tw_create(wq);

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_wake, NULL);
	rcu_read_unlock();
	io_worker_ref_put(wq);
	wait_for_completion(&wq->worker_done);

	spin_lock_irq(&wq->hash->wait.lock);
	list_del_init(&wq->wait.entry);
	spin_unlock_irq(&wq->hash->wait.lock);

	put_task_struct(wq->task);
	wq->task = NULL;
}

static void io_wq_destroy(struct io_wq *wq)
{
	struct io_cb_cancel_data match = {
		.fn = io_wq_work_match_all,
		.cancel_all = true,
	};

	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
	io_wq_cancel_pending_work(wq, &match);
	free_cpumask_var(wq->cpu_mask);
	io_wq_put_hash(wq->hash);
	kfree(wq);
}

void io_wq_put_and_exit(struct io_wq *wq)
{
	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));

	io_wq_exit_workers(wq);
	io_wq_destroy(wq);
}

struct online_data {
	unsigned int cpu;
	bool online;
};

static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
{
	struct online_data *od = data;

	if (od->online)
		cpumask_set_cpu(od->cpu, worker->wq->cpu_mask);
	else
		cpumask_clear_cpu(od->cpu, worker->wq->cpu_mask);
	return false;
}

static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
{
	struct online_data od = {
		.cpu = cpu,
		.online = online
	};

	rcu_read_lock();
	io_wq_for_each_worker(wq, io_wq_worker_affinity, &od);
	rcu_read_unlock();
	return 0;
}

static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, true);
}

static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
{
	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);

	return __io_wq_cpu_online(wq, cpu, false);
}

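/*
 * Set (or, with a NULL mask, reset) the CPU affinity for a task's io-wq
 * workers. The requested mask must be a subset of the CPUs the owning task
 * is allowed to run on by its cpuset.
 */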
int io_wq_cpu_affinity(struct io_uring_task *tctx, cpumask_var_t mask)
{
	cpumask_var_t allowed_mask;
	int ret = 0;

	if (!tctx || !tctx->io_wq)
		return -EINVAL;

	if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
		return -ENOMEM;

	rcu_read_lock();
	cpuset_cpus_allowed(tctx->io_wq->task, allowed_mask);
	if (mask) {
		if (cpumask_subset(mask, allowed_mask))
			cpumask_copy(tctx->io_wq->cpu_mask, mask);
		else
			ret = -EINVAL;
	} else {
		cpumask_copy(tctx->io_wq->cpu_mask, allowed_mask);
	}
	rcu_read_unlock();

	free_cpumask_var(allowed_mask);
	return ret;
}

/*
 * Set max number of unbounded workers, returns old value. If new_count is 0,
 * then just return the old value.
 */
int io_wq_max_workers(struct io_wq *wq, int *new_count)
{
	struct io_wq_acct *acct;
	int prev[IO_WQ_ACCT_NR];
	int i;

	BUILD_BUG_ON((int) IO_WQ_ACCT_BOUND != (int) IO_WQ_BOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_UNBOUND != (int) IO_WQ_UNBOUND);
	BUILD_BUG_ON((int) IO_WQ_ACCT_NR != 2);

	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		if (new_count[i] > task_rlimit(current, RLIMIT_NPROC))
			new_count[i] = task_rlimit(current, RLIMIT_NPROC);
	}

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		prev[i] = 0;

	rcu_read_lock();

	raw_spin_lock(&wq->lock);
	for (i = 0; i < IO_WQ_ACCT_NR; i++) {
		acct = &wq->acct[i];
		prev[i] = max_t(int, acct->max_workers, prev[i]);
		if (new_count[i])
			acct->max_workers = new_count[i];
	}
	raw_spin_unlock(&wq->lock);
	rcu_read_unlock();

	for (i = 0; i < IO_WQ_ACCT_NR; i++)
		new_count[i] = prev[i];

	return 0;
}

static __init int io_wq_init(void)
{
	int ret;

	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
					io_wq_cpu_online, io_wq_cpu_offline);
	if (ret < 0)
		return ret;

	io_wq_online = ret;
	return 0;
}

subsys_initcall(io_wq_init);