// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC recvmsg() implementation
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/net.h>
#include <linux/skbuff.h>
#include <linux/export.h>
#include <linux/sched/signal.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include "ar-internal.h"
/*
 * Post a call for attention by the socket or kernel service.  Further
 * notifications are suppressed by putting recvmsg_link on a dummy queue.
 */
  19. void rxrpc_notify_socket(struct rxrpc_call *call)
  20. {
  21. struct rxrpc_sock *rx;
  22. struct sock *sk;
  23. _enter("%d", call->debug_id);
  24. if (!list_empty(&call->recvmsg_link))
  25. return;
  26. rcu_read_lock();
  27. rx = rcu_dereference(call->socket);
  28. sk = &rx->sk;
  29. if (rx && sk->sk_state < RXRPC_CLOSE) {
  30. if (call->notify_rx) {
  31. spin_lock(&call->notify_lock);
  32. call->notify_rx(sk, call, call->user_call_ID);
  33. spin_unlock(&call->notify_lock);
  34. } else {
  35. spin_lock(&rx->recvmsg_lock);
  36. if (list_empty(&call->recvmsg_link)) {
  37. rxrpc_get_call(call, rxrpc_call_get_notify_socket);
  38. list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
  39. }
  40. spin_unlock(&rx->recvmsg_lock);
  41. if (!sock_flag(sk, SOCK_DEAD)) {
  42. _debug("call %ps", sk->sk_data_ready);
  43. sk->sk_data_ready(sk);
  44. }
  45. }
  46. }
  47. rcu_read_unlock();
  48. _leave("");
  49. }
/*
 * Pass a call terminating message to userspace.
 */
  53. static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
  54. {
  55. u32 tmp = 0;
  56. int ret;
  57. switch (call->completion) {
  58. case RXRPC_CALL_SUCCEEDED:
  59. ret = 0;
  60. if (rxrpc_is_service_call(call))
  61. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
  62. break;
  63. case RXRPC_CALL_REMOTELY_ABORTED:
  64. tmp = call->abort_code;
  65. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  66. break;
  67. case RXRPC_CALL_LOCALLY_ABORTED:
  68. tmp = call->abort_code;
  69. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  70. break;
  71. case RXRPC_CALL_NETWORK_ERROR:
  72. tmp = -call->error;
  73. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
  74. break;
  75. case RXRPC_CALL_LOCAL_ERROR:
  76. tmp = -call->error;
  77. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
  78. break;
  79. default:
  80. pr_err("Invalid terminal call state %u\n", call->completion);
  81. BUG();
  82. break;
  83. }
  84. trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
  85. call->ackr_window - 1,
  86. call->rx_pkt_offset, call->rx_pkt_len, ret);
  87. return ret;
  88. }
/*
 * Discard a packet we've used up and advance the Rx window by one.
 */
static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_serial_t serial;
	rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
	bool last;
	int acked;

	_enter("%d", call->debug_id);

	/* Remove the consumed packet from the head of the receive queue and
	 * note its sequence, serial and whether it was the final DATA packet.
	 */
	skb = skb_dequeue(&call->recvmsg_queue);
	rxrpc_see_skb(skb, rxrpc_skb_see_rotate);
	sp = rxrpc_skb(skb);
	tseq = sp->hdr.seq;
	serial = sp->hdr.serial;
	last = sp->hdr.flags & RXRPC_LAST_PACKET;

	/* Barrier against rxrpc_input_data(). */
	if (after(tseq, call->rx_consumed))
		smp_store_release(&call->rx_consumed, tseq);

	/* The skb is no longer needed once rx_consumed has been published. */
	rxrpc_free_skb(skb, rxrpc_skb_put_rotate);

	trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
			    serial, call->rx_consumed);

	/* Record that recvmsg has now seen everything up to the last packet. */
	if (last)
		set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);

	/* Check to see if there's an ACK that needs sending. */
	acked = atomic_add_return(call->rx_consumed - old_consumed,
				  &call->ackr_nr_consumed);
	/* NOTE(review): the threshold of 8 presumably batches consumed-data
	 * ACK generation — confirm against the ACK transmit side.
	 */
	if (acked > 8 &&
	    !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
		rxrpc_poke_call(call, rxrpc_call_poke_idle);
}
/*
 * Decrypt and verify a DATA packet.
 */
  125. static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
  126. {
  127. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  128. if (sp->flags & RXRPC_RX_VERIFIED)
  129. return 0;
  130. return call->security->verify_packet(call, skb);
  131. }
/*
 * Deliver messages to a call.  This keeps processing packets until the buffer
 * is filled and we find either more DATA (returns 0) or the end of the DATA
 * (returns 1).  If more packets are required, it returns -EAGAIN and if the
 * call has failed it returns -EIO.
 */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
			      struct msghdr *msg, struct iov_iter *iter,
			      size_t len, int flags, size_t *_offset)
{
	struct rxrpc_skb_priv *sp;
	struct sk_buff *skb;
	rxrpc_seq_t seq = 0;
	size_t remain;
	unsigned int rx_pkt_offset, rx_pkt_len;
	int copy, ret = -EAGAIN, ret2;

	/* Resume from where a previous partial read left off in the packet at
	 * the head of the queue.
	 */
	rx_pkt_offset = call->rx_pkt_offset;
	rx_pkt_len = call->rx_pkt_len;

	/* Failed call: nothing more to deliver. */
	if (rxrpc_call_has_failed(call)) {
		seq = call->ackr_window - 1;
		ret = -EIO;
		goto done;
	}

	/* All data already consumed by a previous recvmsg. */
	if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
		seq = call->ackr_window - 1;
		ret = 1;
		goto done;
	}

	/* No one else can be removing stuff from the queue, so we shouldn't
	 * need the Rx lock to walk it.
	 */
	skb = skb_peek(&call->recvmsg_queue);
	while (skb) {
		rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
		sp = rxrpc_skb(skb);
		seq = sp->hdr.seq;

		if (!(flags & MSG_PEEK))
			trace_rxrpc_receive(call, rxrpc_receive_front,
					    sp->hdr.serial, seq);

		if (msg)
			sock_recv_timestamp(msg, sock->sk, skb);

		if (rx_pkt_offset == 0) {
			/* First look at this packet: verify/decrypt it and
			 * pick up the payload offset and length it yields.
			 */
			ret2 = rxrpc_verify_data(call, skb);
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
					     sp->offset, sp->len, ret2);
			if (ret2 < 0) {
				kdebug("verify = %d", ret2);
				ret = ret2;
				goto out;
			}
			rx_pkt_offset = sp->offset;
			rx_pkt_len = sp->len;
		} else {
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
		}

		/* We have to handle short, empty and used-up DATA packets. */
		remain = len - *_offset;
		copy = rx_pkt_len;
		if (copy > remain)
			copy = remain;
		if (copy > 0) {
			ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
						      copy);
			if (ret2 < 0) {
				ret = ret2;
				goto out;
			}

			/* handle piecemeal consumption of data packets */
			rx_pkt_offset += copy;
			rx_pkt_len -= copy;
			*_offset += copy;
		}

		if (rx_pkt_len > 0) {
			/* The caller's buffer filled before the packet was
			 * exhausted; the saved offset/len let us resume here.
			 */
			trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
					     rx_pkt_offset, rx_pkt_len, 0);
			ASSERTCMP(*_offset, ==, len);
			ret = 0;
			break;
		}

		/* The whole packet has been transferred. */
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			ret = 1;
		rx_pkt_offset = 0;
		rx_pkt_len = 0;

		/* Peek the next packet before rotation, as rotation dequeues
		 * and frees the current skb.
		 */
		skb = skb_peek_next(skb, &call->recvmsg_queue);

		if (!(flags & MSG_PEEK))
			rxrpc_rotate_rx_window(call);
	}

out:
	/* Persist the partial-read position unless we were only peeking. */
	if (!(flags & MSG_PEEK)) {
		call->rx_pkt_offset = rx_pkt_offset;
		call->rx_pkt_len = rx_pkt_len;
	}
done:
	trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
			     rx_pkt_offset, rx_pkt_len, ret);
	if (ret == -EAGAIN)
		set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
	return ret;
}
/*
 * Receive a message from an RxRPC socket
 * - we need to be careful about two or more threads calling recvmsg
 *   simultaneously
 */
  238. int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
  239. int flags)
  240. {
  241. struct rxrpc_call *call;
  242. struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
  243. struct list_head *l;
  244. unsigned int call_debug_id = 0;
  245. size_t copied = 0;
  246. long timeo;
  247. int ret;
  248. DEFINE_WAIT(wait);
  249. trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
  250. if (flags & (MSG_OOB | MSG_TRUNC))
  251. return -EOPNOTSUPP;
  252. timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
  253. try_again:
  254. lock_sock(&rx->sk);
  255. /* Return immediately if a client socket has no outstanding calls */
  256. if (RB_EMPTY_ROOT(&rx->calls) &&
  257. list_empty(&rx->recvmsg_q) &&
  258. rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
  259. release_sock(&rx->sk);
  260. return -EAGAIN;
  261. }
  262. if (list_empty(&rx->recvmsg_q)) {
  263. ret = -EWOULDBLOCK;
  264. if (timeo == 0) {
  265. call = NULL;
  266. goto error_no_call;
  267. }
  268. release_sock(&rx->sk);
  269. /* Wait for something to happen */
  270. prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
  271. TASK_INTERRUPTIBLE);
  272. ret = sock_error(&rx->sk);
  273. if (ret)
  274. goto wait_error;
  275. if (list_empty(&rx->recvmsg_q)) {
  276. if (signal_pending(current))
  277. goto wait_interrupted;
  278. trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
  279. timeo = schedule_timeout(timeo);
  280. }
  281. finish_wait(sk_sleep(&rx->sk), &wait);
  282. goto try_again;
  283. }
  284. /* Find the next call and dequeue it if we're not just peeking. If we
  285. * do dequeue it, that comes with a ref that we will need to release.
  286. * We also want to weed out calls that got requeued whilst we were
  287. * shovelling data out.
  288. */
  289. spin_lock(&rx->recvmsg_lock);
  290. l = rx->recvmsg_q.next;
  291. call = list_entry(l, struct rxrpc_call, recvmsg_link);
  292. if (!rxrpc_call_is_complete(call) &&
  293. skb_queue_empty(&call->recvmsg_queue)) {
  294. list_del_init(&call->recvmsg_link);
  295. spin_unlock(&rx->recvmsg_lock);
  296. release_sock(&rx->sk);
  297. trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
  298. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  299. goto try_again;
  300. }
  301. rxrpc_see_call(call, rxrpc_call_see_recvmsg);
  302. if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
  303. rxrpc_see_call(call, rxrpc_call_see_already_released);
  304. list_del_init(&call->recvmsg_link);
  305. spin_unlock_irq(&rx->recvmsg_lock);
  306. release_sock(&rx->sk);
  307. trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
  308. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  309. goto try_again;
  310. }
  311. if (!(flags & MSG_PEEK))
  312. list_del_init(&call->recvmsg_link);
  313. else
  314. rxrpc_get_call(call, rxrpc_call_get_recvmsg);
  315. spin_unlock(&rx->recvmsg_lock);
  316. call_debug_id = call->debug_id;
  317. trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
  318. /* We're going to drop the socket lock, so we need to lock the call
  319. * against interference by sendmsg.
  320. */
  321. if (!mutex_trylock(&call->user_mutex)) {
  322. ret = -EWOULDBLOCK;
  323. if (flags & MSG_DONTWAIT)
  324. goto error_requeue_call;
  325. ret = -ERESTARTSYS;
  326. if (mutex_lock_interruptible(&call->user_mutex) < 0)
  327. goto error_requeue_call;
  328. }
  329. release_sock(&rx->sk);
  330. if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
  331. rxrpc_see_call(call, rxrpc_call_see_already_released);
  332. mutex_unlock(&call->user_mutex);
  333. if (!(flags & MSG_PEEK))
  334. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  335. goto try_again;
  336. }
  337. if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
  338. if (flags & MSG_CMSG_COMPAT) {
  339. unsigned int id32 = call->user_call_ID;
  340. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
  341. sizeof(unsigned int), &id32);
  342. } else {
  343. unsigned long idl = call->user_call_ID;
  344. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
  345. sizeof(unsigned long), &idl);
  346. }
  347. if (ret < 0)
  348. goto error_unlock_call;
  349. }
  350. if (msg->msg_name && call->peer) {
  351. size_t len = sizeof(call->dest_srx);
  352. memcpy(msg->msg_name, &call->dest_srx, len);
  353. msg->msg_namelen = len;
  354. }
  355. ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
  356. flags, &copied);
  357. if (ret == -EAGAIN)
  358. ret = 0;
  359. if (ret == -EIO)
  360. goto call_failed;
  361. if (ret < 0)
  362. goto error_unlock_call;
  363. if (rxrpc_call_is_complete(call) &&
  364. skb_queue_empty(&call->recvmsg_queue))
  365. goto call_complete;
  366. if (rxrpc_call_has_failed(call))
  367. goto call_failed;
  368. if (!skb_queue_empty(&call->recvmsg_queue))
  369. rxrpc_notify_socket(call);
  370. goto not_yet_complete;
  371. call_failed:
  372. rxrpc_purge_queue(&call->recvmsg_queue);
  373. call_complete:
  374. ret = rxrpc_recvmsg_term(call, msg);
  375. if (ret < 0)
  376. goto error_unlock_call;
  377. if (!(flags & MSG_PEEK))
  378. rxrpc_release_call(rx, call);
  379. msg->msg_flags |= MSG_EOR;
  380. ret = 1;
  381. not_yet_complete:
  382. if (ret == 0)
  383. msg->msg_flags |= MSG_MORE;
  384. else
  385. msg->msg_flags &= ~MSG_MORE;
  386. ret = copied;
  387. error_unlock_call:
  388. mutex_unlock(&call->user_mutex);
  389. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  390. trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
  391. return ret;
  392. error_requeue_call:
  393. if (!(flags & MSG_PEEK)) {
  394. spin_lock(&rx->recvmsg_lock);
  395. list_add(&call->recvmsg_link, &rx->recvmsg_q);
  396. spin_unlock(&rx->recvmsg_lock);
  397. trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
  398. } else {
  399. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  400. }
  401. error_no_call:
  402. release_sock(&rx->sk);
  403. error_trace:
  404. trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
  405. return ret;
  406. wait_interrupted:
  407. ret = sock_intr_errno(timeo);
  408. wait_error:
  409. finish_wait(sk_sleep(&rx->sk), &wait);
  410. call = NULL;
  411. goto error_trace;
  412. }
/**
 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 * @sock: The socket that the call exists on
 * @call: The call to send data through
 * @iter: The buffer to receive into
 * @_len: The amount of data we want to receive (decreased on return)
 * @want_more: True if more data is expected to be read
 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 * @_service: Where to store the actual service ID (may be upgraded)
 *
 * Allow a kernel service to receive data and pick up information about the
 * state of a call.  Returns 0 if got what was asked for and there's more
 * available, 1 if we got what was asked for and we're at the end of the data
 * and -EAGAIN if we need more data.
 *
 * Note that we may return -EAGAIN to drain empty packets at the end of the
 * data, even if we've already copied over the requested data.
 *
 * *_abort should also be initialised to 0.
 */
int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
			   struct iov_iter *iter, size_t *_len,
			   bool want_more, u32 *_abort, u16 *_service)
{
	size_t offset = 0;
	int ret;

	_enter("{%d},%zu,%d", call->debug_id, *_len, want_more);

	/* Serialise against sendmsg and userspace recvmsg on this call. */
	mutex_lock(&call->user_mutex);

	ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
	/* Tell the caller how much was actually copied. */
	*_len -= offset;
	if (ret == -EIO)
		goto call_failed;
	if (ret < 0)
		goto out;

	/* We can only reach here with a partially full buffer if we have
	 * reached the end of the data.  We must otherwise have a full buffer
	 * or have been given -EAGAIN.
	 */
	if (ret == 1) {
		if (iov_iter_count(iter) > 0)
			goto short_data;
		if (!want_more)
			goto read_phase_complete;
		ret = 0;
		goto out;
	}

	/* Buffer full but the caller expected this to be the end. */
	if (!want_more)
		goto excess_data;
	goto out;

read_phase_complete:
	ret = 1;
out:
	if (_service)
		*_service = call->dest_srx.srx_service;
	mutex_unlock(&call->user_mutex);
	_leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
	return ret;

short_data:
	/* The data ended before the caller's buffer was filled. */
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EBADMSG);
	ret = -EBADMSG;
	goto out;
excess_data:
	/* More data arrived than the caller said it wanted. */
	trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
			  call->cid, call->call_id, call->rx_consumed,
			  0, -EMSGSIZE);
	ret = -EMSGSIZE;
	goto out;
call_failed:
	/* Pass the abort code and error back; a call that completed
	 * successfully but left unread buffer space means the peer sent less
	 * than was asked for.
	 */
	*_abort = call->abort_code;
	ret = call->error;
	if (call->completion == RXRPC_CALL_SUCCEEDED) {
		ret = 1;
		if (iov_iter_count(iter) > 0)
			ret = -ECONNRESET;
	}
	goto out;
}
EXPORT_SYMBOL(rxrpc_kernel_recv_data);