sqpoll.c

// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/cpuset.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
#define IORING_TW_CAP_ENTRIES_VALUE	8

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

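/*
 * Release the park reference taken by io_sq_thread_park() and drop
 * sqd->lock. SHOULD_PARK is only left set if other parkers are still
 * pending, which is why the bit is cleared and then conditionally re-set
 * rather than cleared conditionally.
 */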
void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
}

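/*
 * Ask the SQPOLL thread to park: bump park_pending, set SHOULD_PARK and
 * wake the thread so it notices the request. Returns with sqd->lock held;
 * paired with io_sq_thread_unpark().
 */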
void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	WARN_ON_ONCE(data_race(sqd->thread) == current);

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
}

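/*
 * Ask the SQPOLL thread to exit and wait for it to signal sqd->exited.
 * The WARN_ON_ONCE below documents that this is expected to run at most
 * once per io_sq_data.
 */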
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	WARN_ON_ONCE(sqd->thread == current);
	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

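/*
 * The idle timeout is shared by every ring attached to this sqd, so use
 * the largest sq_thread_idle among the attached contexts.
 */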
static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

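/*
 * IORING_SETUP_ATTACH_WQ: look up the io_sq_data of the ring referred to
 * by p->wq_fd and take a reference on it, provided it belongs to the
 * calling thread group.
 */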
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	struct fd f;

	f = fdget(p->wq_fd);
	if (!fd_file(f))
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(fd_file(f))) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}

	ctx_attach = fd_file(f)->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}
	if (sqd->task_tgid != current->tgid) {
		fdput(f);
		return ERR_PTR(-EPERM);
	}

	refcount_inc(&sqd->refs);
	fdput(f);
	return sqd;
}

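/*
 * Return the io_sq_data to use for this ring: either attach to an
 * existing one (IORING_SETUP_ATTACH_WQ) or allocate and initialise a
 * fresh instance. *attached tells the caller which case happened.
 */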
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

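/*
 * One submission pass for a single ring: reap IOPOLL completions if any
 * are pending, then submit up to 'to_submit' SQEs under the ring's
 * uring_lock, using the credentials the ring was set up with.
 */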
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (to_submit || !wq_list_empty(&ctx->iopoll_list)) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

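/*
 * Handle a pending park request or signal. sqd->lock is dropped so that
 * a parker (which blocks in mutex_lock() inside io_sq_thread_park()) can
 * make progress. Returns true if the thread caught a fatal signal or was
 * asked to stop.
 */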
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		cond_resched();
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			goto out;
		max_entries -= count;
	}
	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
out:
	if (task_work_pending(current))
		task_work_run();
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}

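/*
 * Account the system CPU time consumed since *start, in microseconds,
 * into sqd->work_time.
 */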
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
	struct rusage end;

	getrusage(current, RUSAGE_SELF, &end);
	end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
	end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

	sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}

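/*
 * Main loop of the SQPOLL kernel thread. While any attached ring has work
 * (new SQEs, IOPOLL completions or task_work), keep submitting and refresh
 * the idle timeout; once everything has been idle for sq_thread_idle
 * jiffies, set IORING_SQ_NEED_WAKEUP on the attached rings and sleep until
 * woken (e.g. by io_uring_enter() reacting to the wakeup flag, or by a
 * park/stop request).
 */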
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	struct rusage start;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN];
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring) {
		mutex_lock(&sqd->lock);
		sqd->thread = NULL;
		mutex_unlock(&sqd->lock);
		goto err_out;
	}

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	/*
	 * Force audit context to get setup, in case we do prep side async
	 * operations that would trigger an audit call before any issue side
	 * audit has been done.
	 */
	audit_uring_entry(IORING_OP_NOP);
	audit_uring_exit(true, 0);

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		getrusage(current, RUSAGE_SELF, &start);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
			if (io_napi(ctx))
				io_napi_sqpoll_busy_poll(ctx);

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin) {
				io_sq_update_worktime(sqd, &start);
				timeout = jiffies + sqd->sq_thread_idle;
			}
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	sqd->thread = NULL;
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}

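/*
 * Called by a submitter when the SQ ring is full (the IORING_ENTER_SQ_WAIT
 * path): sleep on sqo_sq_wait until the SQPOLL thread has consumed entries
 * or a signal arrives.
 */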
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

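/*
 * Ring setup path for IORING_SETUP_SQPOLL: find or allocate the io_sq_data,
 * register this ctx with it and, if no existing thread was attached to,
 * spawn the io_sq_thread() worker (pinned to p->sq_thread_cpu when
 * IORING_SETUP_SQ_AFF is set and that CPU is online and allowed by the
 * task's cpuset).
 */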
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	struct task_struct *task_to_put = NULL;
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		struct fd f;

		f = fdget(p->wq_fd);
		if (!fd_file(f))
			return -ENXIO;
		if (!io_is_uring_fops(fd_file(f))) {
			fdput(f);
			return -EINVAL;
		}
		fdput(f);
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			cpumask_var_t allowed_mask;
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			ret = -ENOMEM;
			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
				goto err_sqpoll;
			ret = -EINVAL;
			cpuset_cpus_allowed(current, allowed_mask);
			if (!cpumask_test_cpu(cpu, allowed_mask)) {
				free_cpumask_var(allowed_mask);
				goto err_sqpoll;
			}
			free_cpumask_var(allowed_mask);
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		sqd->thread = tsk;
		task_to_put = get_task_struct(tsk);
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	if (task_to_put)
		put_task_struct(task_to_put);
	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	if (task_to_put)
		put_task_struct(task_to_put);
	return ret;
}

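/*
 * Apply an io-wq CPU affinity mask on behalf of the SQPOLL thread; the
 * thread is parked first so that sqd->thread remains stable while the
 * mask is applied.
 */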
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		if (sqd->thread)
			ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}