sqpoll.c

// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
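/*
 * For reference, an application opts into this offload by setting
 * IORING_SETUP_SQPOLL at ring creation time. A minimal userspace sketch
 * using the io_uring_setup(2) system call (typically invoked via syscall(2)
 * or liburing; error handling omitted, QUEUE_DEPTH is an application-chosen
 * placeholder):
 *
 *      struct io_uring_params p = { };
 *
 *      p.flags = IORING_SETUP_SQPOLL;
 *      p.sq_thread_idle = 2000;        // idle timeout, in milliseconds
 *      ring_fd = io_uring_setup(QUEUE_DEPTH, &p);
 *
 * sq_thread_idle is in milliseconds and maps to sqd->sq_thread_idle via
 * msecs_to_jiffies() in io_sq_offload_create() below.
 */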
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/sched/cputime.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE     32

enum {
        IO_SQ_THREAD_SHOULD_STOP = 0,
        IO_SQ_THREAD_SHOULD_PARK,
};

void io_sq_thread_unpark(struct io_sq_data *sqd)
        __releases(&sqd->lock)
{
        WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

        /*
         * Do the dance but not conditional clear_bit() because it'd race with
         * other threads incrementing park_pending and setting the bit.
         */
        clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        if (atomic_dec_return(&sqd->park_pending))
                set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        mutex_unlock(&sqd->lock);
}

void io_sq_thread_park(struct io_sq_data *sqd)
        __acquires(&sqd->lock)
{
        struct task_struct *tsk;

        atomic_inc(&sqd->park_pending);
        set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
        mutex_lock(&sqd->lock);

        tsk = sqpoll_task_locked(sqd);
        if (tsk) {
                WARN_ON_ONCE(tsk == current);
                wake_up_process(tsk);
        }
}

void io_sq_thread_stop(struct io_sq_data *sqd)
{
        struct task_struct *tsk;

        WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

        set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
        mutex_lock(&sqd->lock);
        tsk = sqpoll_task_locked(sqd);
        if (tsk) {
                WARN_ON_ONCE(tsk == current);
                wake_up_process(tsk);
        }
        mutex_unlock(&sqd->lock);
        wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
        if (refcount_dec_and_test(&sqd->refs)) {
                WARN_ON_ONCE(atomic_read(&sqd->park_pending));

                io_sq_thread_stop(sqd);
                kfree(sqd);
        }
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
        struct io_ring_ctx *ctx;
        unsigned sq_thread_idle = 0;

        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
        sqd->sq_thread_idle = sq_thread_idle;
}

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
        struct io_sq_data *sqd = ctx->sq_data;

        if (sqd) {
                io_sq_thread_park(sqd);
                list_del_init(&ctx->sqd_list);
                io_sqd_update_thread_idle(sqd);
                io_sq_thread_unpark(sqd);

                io_put_sq_data(sqd);
                ctx->sq_data = NULL;
        }
}

static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
        struct io_ring_ctx *ctx_attach;
        struct io_sq_data *sqd;
        struct fd f;

        f = fdget(p->wq_fd);
        if (!fd_file(f))
                return ERR_PTR(-ENXIO);
        if (!io_is_uring_fops(fd_file(f))) {
                fdput(f);
                return ERR_PTR(-EINVAL);
        }

        ctx_attach = fd_file(f)->private_data;
        sqd = ctx_attach->sq_data;
        if (!sqd) {
                fdput(f);
                return ERR_PTR(-EINVAL);
        }
        if (sqd->task_tgid != current->tgid) {
                fdput(f);
                return ERR_PTR(-EPERM);
        }

        refcount_inc(&sqd->refs);
        fdput(f);
        return sqd;
}

static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
                                         bool *attached)
{
        struct io_sq_data *sqd;

        *attached = false;
        if (p->flags & IORING_SETUP_ATTACH_WQ) {
                sqd = io_attach_sq_data(p);
                if (!IS_ERR(sqd)) {
                        *attached = true;
                        return sqd;
                }
                /* fall through for EPERM case, setup new sqd/task */
                if (PTR_ERR(sqd) != -EPERM)
                        return sqd;
        }

        sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
        if (!sqd)
                return ERR_PTR(-ENOMEM);

        atomic_set(&sqd->park_pending, 0);
        refcount_set(&sqd->refs, 1);
        INIT_LIST_HEAD(&sqd->ctx_list);
        mutex_init(&sqd->lock);
        init_waitqueue_head(&sqd->wait);
        init_completion(&sqd->exited);
        return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
        return READ_ONCE(sqd->state);
}

struct io_sq_time {
        bool started;
        u64 usec;
};

u64 io_sq_cpu_usec(struct task_struct *tsk)
{
        u64 utime, stime;

        task_cputime_adjusted(tsk, &utime, &stime);
        do_div(stime, 1000);
        return stime;
}

static void io_sq_update_worktime(struct io_sq_data *sqd, struct io_sq_time *ist)
{
        if (!ist->started)
                return;
        ist->started = false;
        sqd->work_time += io_sq_cpu_usec(current) - ist->usec;
}

static void io_sq_start_worktime(struct io_sq_time *ist)
{
        if (ist->started)
                return;
        ist->started = true;
        ist->usec = io_sq_cpu_usec(current);
}

static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
                          bool cap_entries, struct io_sq_time *ist)
{
        unsigned int to_submit;
        int ret = 0;

        to_submit = io_sqring_entries(ctx);
        /* if we're handling multiple rings, cap submit size for fairness */
        if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
                to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

        if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
                const struct cred *creds = NULL;

                io_sq_start_worktime(ist);

                if (ctx->sq_creds != current_cred())
                        creds = override_creds(ctx->sq_creds);

                mutex_lock(&ctx->uring_lock);
                if (!wq_list_empty(&ctx->iopoll_list))
                        io_do_iopoll(ctx, true);

                /*
                 * Don't submit if refs are dying, good for io_uring_register(),
                 * but also it is relied upon by io_ring_exit_work()
                 */
                if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
                    !(ctx->flags & IORING_SETUP_R_DISABLED))
                        ret = io_submit_sqes(ctx, to_submit);
                mutex_unlock(&ctx->uring_lock);

                if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
                        wake_up(&ctx->sqo_sq_wait);
                if (creds)
                        revert_creds(creds);
        }

        return ret;
}

static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
        bool did_sig = false;
        struct ksignal ksig;

        if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
            signal_pending(current)) {
                mutex_unlock(&sqd->lock);
                if (signal_pending(current))
                        did_sig = get_signal(&ksig);
                cond_resched();
                mutex_lock(&sqd->lock);
                sqd->sq_cpu = raw_smp_processor_id();
        }
        return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
        struct io_uring_task *tctx = current->io_uring;
        unsigned int count = 0;

        if (*retry_list) {
                *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
                if (count >= max_entries)
                        goto out;
                max_entries -= count;
        }
        *retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
        if (task_work_pending(current))
                task_work_run();
        return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
        struct io_uring_task *tctx = current->io_uring;

        return retry_list || !llist_empty(&tctx->task_list);
}

static int io_sq_thread(void *data)
{
        struct llist_node *retry_list = NULL;
        struct io_sq_data *sqd = data;
        struct io_ring_ctx *ctx;
        unsigned long timeout = 0;
        char buf[TASK_COMM_LEN];
        DEFINE_WAIT(wait);

        /* offload context creation failed, just exit */
        if (!current->io_uring) {
                mutex_lock(&sqd->lock);
                rcu_assign_pointer(sqd->thread, NULL);
                put_task_struct(current);
                mutex_unlock(&sqd->lock);
                goto err_out;
        }

        snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
        set_task_comm(current, buf);

        /* reset to our pid after we've set task_comm, for fdinfo */
        sqd->task_pid = current->pid;

        if (sqd->sq_cpu != -1) {
                set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
        } else {
                set_cpus_allowed_ptr(current, cpu_online_mask);
                sqd->sq_cpu = raw_smp_processor_id();
        }

        /*
         * Force audit context to get setup, in case we do prep side async
         * operations that would trigger an audit call before any issue side
         * audit has been done.
         */
        audit_uring_entry(IORING_OP_NOP);
        audit_uring_exit(true, 0);

        mutex_lock(&sqd->lock);
        while (1) {
                bool cap_entries, sqt_spin = false;
                struct io_sq_time ist = { };

                if (io_sqd_events_pending(sqd) || signal_pending(current)) {
                        if (io_sqd_handle_event(sqd))
                                break;
                        timeout = jiffies + sqd->sq_thread_idle;
                }

                cap_entries = !list_is_singular(&sqd->ctx_list);
                list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
                        int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist);

                        if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
                                sqt_spin = true;
                }
                if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
                        sqt_spin = true;

                list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
                        if (io_napi(ctx)) {
                                io_sq_start_worktime(&ist);
                                io_napi_sqpoll_busy_poll(ctx);
                        }
                }

                io_sq_update_worktime(sqd, &ist);

                if (sqt_spin || !time_after(jiffies, timeout)) {
                        if (sqt_spin)
                                timeout = jiffies + sqd->sq_thread_idle;
                        if (unlikely(need_resched())) {
                                mutex_unlock(&sqd->lock);
                                cond_resched();
                                mutex_lock(&sqd->lock);
                                sqd->sq_cpu = raw_smp_processor_id();
                        }
                        continue;
                }

                prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
                if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
                        bool needs_sched = true;

                        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
                                atomic_or(IORING_SQ_NEED_WAKEUP,
                                          &ctx->rings->sq_flags);
                                if ((ctx->flags & IORING_SETUP_IOPOLL) &&
                                    !wq_list_empty(&ctx->iopoll_list)) {
                                        needs_sched = false;
                                        break;
                                }

                                /*
                                 * Ensure the store of the wakeup flag is not
                                 * reordered with the load of the SQ tail
                                 */
                                smp_mb__after_atomic();

                                if (io_sqring_entries(ctx)) {
                                        needs_sched = false;
                                        break;
                                }
                        }

                        if (needs_sched) {
                                mutex_unlock(&sqd->lock);
                                schedule();
                                mutex_lock(&sqd->lock);
                                sqd->sq_cpu = raw_smp_processor_id();
                        }
                        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                                atomic_andnot(IORING_SQ_NEED_WAKEUP,
                                              &ctx->rings->sq_flags);
                }

                finish_wait(&sqd->wait, &wait);
                timeout = jiffies + sqd->sq_thread_idle;
        }

        if (retry_list)
                io_sq_tw(&retry_list, UINT_MAX);

        io_uring_cancel_generic(true, sqd);

        rcu_assign_pointer(sqd->thread, NULL);
        put_task_struct(current);

        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
                atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
        io_run_task_work();
        mutex_unlock(&sqd->lock);
err_out:
        complete(&sqd->exited);
        do_exit(0);
}
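
/*
 * Userspace counterpart, for reference: once the loop above has decided to
 * sleep, it sets IORING_SQ_NEED_WAKEUP in the SQ ring flags, and the
 * application must then call io_uring_enter(2) with IORING_ENTER_SQ_WAKEUP
 * to kick the thread again. A minimal sketch, assuming sq_flags points at
 * the mmap'ed SQ ring flags word and ring_fd is the ring descriptor:
 *
 *      if (READ_ONCE(*sq_flags) & IORING_SQ_NEED_WAKEUP)
 *              io_uring_enter(ring_fd, to_submit, 0,
 *                             IORING_ENTER_SQ_WAKEUP, NULL);
 *
 * If the flag is set and the application never issues the wakeup, new SQ
 * entries sit unprocessed until something else wakes the thread.
 */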

void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
        DEFINE_WAIT(wait);

        do {
                if (!io_sqring_full(ctx))
                        break;
                prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

                if (!io_sqring_full(ctx))
                        break;
                schedule();
        } while (!signal_pending(current));

        finish_wait(&ctx->sqo_sq_wait, &wait);
}
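
/*
 * Note: io_sqpoll_wait_sq() above backs the IORING_ENTER_SQ_WAIT flag of
 * io_uring_enter(2), letting an application block until the SQ ring has
 * free entries again instead of spinning on a full ring.
 */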

__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
                                struct io_uring_params *p)
{
        int ret;

        /* Retain compatibility with failing for an invalid attach attempt */
        if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
                                IORING_SETUP_ATTACH_WQ) {
                struct fd f;

                f = fdget(p->wq_fd);
                if (!fd_file(f))
                        return -ENXIO;
                if (!io_is_uring_fops(fd_file(f))) {
                        fdput(f);
                        return -EINVAL;
                }
                fdput(f);
        }
        if (ctx->flags & IORING_SETUP_SQPOLL) {
                struct task_struct *tsk;
                struct io_sq_data *sqd;
                bool attached;

                ret = security_uring_sqpoll();
                if (ret)
                        return ret;

                sqd = io_get_sq_data(p, &attached);
                if (IS_ERR(sqd)) {
                        ret = PTR_ERR(sqd);
                        goto err;
                }

                ctx->sq_creds = get_current_cred();
                ctx->sq_data = sqd;
                ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
                if (!ctx->sq_thread_idle)
                        ctx->sq_thread_idle = HZ;

                io_sq_thread_park(sqd);
                list_add(&ctx->sqd_list, &sqd->ctx_list);
                io_sqd_update_thread_idle(sqd);
                /* don't attach to a dying SQPOLL thread, would be racy */
                ret = (attached && !sqd->thread) ? -ENXIO : 0;
                io_sq_thread_unpark(sqd);

                if (ret < 0)
                        goto err;
                if (attached)
                        return 0;

                if (p->flags & IORING_SETUP_SQ_AFF) {
                        cpumask_var_t allowed_mask;
                        int cpu = p->sq_thread_cpu;

                        ret = -EINVAL;
                        if (cpu >= nr_cpu_ids || !cpu_online(cpu))
                                goto err_sqpoll;
                        ret = -ENOMEM;
                        if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
                                goto err_sqpoll;
                        ret = -EINVAL;
                        cpuset_cpus_allowed(current, allowed_mask);
                        if (!cpumask_test_cpu(cpu, allowed_mask)) {
                                free_cpumask_var(allowed_mask);
                                goto err_sqpoll;
                        }
                        free_cpumask_var(allowed_mask);
                        sqd->sq_cpu = cpu;
                } else {
                        sqd->sq_cpu = -1;
                }

                sqd->task_pid = current->pid;
                sqd->task_tgid = current->tgid;
                tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
                if (IS_ERR(tsk)) {
                        ret = PTR_ERR(tsk);
                        goto err_sqpoll;
                }

                mutex_lock(&sqd->lock);
                rcu_assign_pointer(sqd->thread, tsk);
                mutex_unlock(&sqd->lock);

                get_task_struct(tsk);
                ret = io_uring_alloc_task_context(tsk, ctx);
                wake_up_new_task(tsk);
                if (ret)
                        goto err;
        } else if (p->flags & IORING_SETUP_SQ_AFF) {
                /* Can't have SQ_AFF without SQPOLL */
                ret = -EINVAL;
                goto err;
        }

        return 0;
err_sqpoll:
        complete(&ctx->sq_data->exited);
err:
        io_sq_thread_finish(ctx);
        return ret;
}
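
/*
 * For reference, a second ring can share an existing SQPOLL thread rather
 * than spawning its own by passing IORING_SETUP_ATTACH_WQ. A rough userspace
 * sketch (error handling omitted, existing_fd being a ring already created
 * with IORING_SETUP_SQPOLL by the same thread group, QUEUE_DEPTH an
 * application-chosen placeholder):
 *
 *      struct io_uring_params p = { };
 *
 *      p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_ATTACH_WQ;
 *      p.wq_fd = existing_fd;
 *      second_fd = io_uring_setup(QUEUE_DEPTH, &p);
 *
 * io_attach_sq_data() above refuses cross-tgid attaches with -EPERM, and
 * io_get_sq_data() falls back to creating a fresh sqd only in that case.
 */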

__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
                                     cpumask_var_t mask)
{
        struct io_sq_data *sqd = ctx->sq_data;
        int ret = -EINVAL;

        if (sqd) {
                struct task_struct *tsk;

                io_sq_thread_park(sqd);
                /* Don't set affinity for a dying thread */
                tsk = sqpoll_task_locked(sqd);
                if (tsk)
                        ret = io_wq_cpu_affinity(tsk->io_uring, mask);
                io_sq_thread_unpark(sqd);
        }

        return ret;
}