  1. // SPDX-License-Identifier: GPL-2.0-or-later
  2. /* RxRPC packet reception
  3. *
  4. * Copyright (C) 2007, 2016, 2022 Red Hat, Inc. All Rights Reserved.
  5. * Written by David Howells (dhowells@redhat.com)
  6. */
  7. #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  8. #include "ar-internal.h"
  9. static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
  10. struct sockaddr_rxrpc *peer_srx,
  11. struct sk_buff *skb);
/*
 * handle data received on the local endpoint
 * - may be called in interrupt context
 *
 * [!] Note that as this is called from the encap_rcv hook, the socket is not
 * held locked by the caller and nothing prevents sk_user_data on the UDP from
 * being cleared in the middle of processing this function.
 *
 * Called with the RCU read lock held from the IP layer via UDP.
 */
int rxrpc_encap_rcv(struct sock *udp_sk, struct sk_buff *skb)
{
	struct sk_buff_head *rx_queue;
	struct rxrpc_local *local = rcu_dereference_sk_user_data(udp_sk);
	struct task_struct *io_thread;

	/* sk_user_data may be cleared concurrently (see note above); a NULL
	 * local means the endpoint is going away, so drop the packet.
	 */
	if (unlikely(!local)) {
		kfree_skb(skb);
		return 0;
	}

	/* Read the thread pointer once so that the liveness check and the
	 * wakeup below operate on the same task.  NULL means the I/O thread
	 * has stopped (or hasn't started) and no one would process the skb.
	 */
	io_thread = READ_ONCE(local->io_thread);
	if (!io_thread) {
		kfree_skb(skb);
		return 0;
	}

	/* Stamp an arrival time if the lower layers didn't. */
	if (skb->tstamp == 0)
		skb->tstamp = ktime_get_real();
	skb->mark = RXRPC_SKB_MARK_PACKET;
	rxrpc_new_skb(skb, rxrpc_skb_new_encap_rcv);
	rx_queue = &local->rx_queue;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
	/* Optionally divert to the delay queue, reusing ->tstamp as the time
	 * at which the packet becomes deliverable.
	 */
	if (rxrpc_inject_rx_delay ||
	    !skb_queue_empty(&local->rx_delay_queue)) {
		skb->tstamp = ktime_add_ms(skb->tstamp, rxrpc_inject_rx_delay);
		rx_queue = &local->rx_delay_queue;
	}
#endif

	skb_queue_tail(rx_queue, skb);
	wake_up_process(io_thread);
	return 0;
}
/*
 * Handle an error received on the local endpoint (sk_error_report hook).
 */
void rxrpc_error_report(struct sock *sk)
{
	struct rxrpc_local *local;
	struct sk_buff *skb;

	/* Pin sk_user_data whilst we drain the error queue. */
	rcu_read_lock();
	local = rcu_dereference_sk_user_data(sk);
	if (unlikely(!local)) {
		rcu_read_unlock();
		return;
	}

	/* Transfer every queued error report to the I/O thread's queue,
	 * marking each so the thread treats it as an error rather than a
	 * data packet.
	 */
	while ((skb = skb_dequeue(&sk->sk_error_queue))) {
		skb->mark = RXRPC_SKB_MARK_ERROR;
		rxrpc_new_skb(skb, rxrpc_skb_new_error_report);
		skb_queue_tail(&local->rx_queue, skb);
	}

	rxrpc_wake_up_io_thread(local);
	rcu_read_unlock();
}
  73. /*
  74. * Directly produce an abort from a packet.
  75. */
  76. bool rxrpc_direct_abort(struct sk_buff *skb, enum rxrpc_abort_reason why,
  77. s32 abort_code, int err)
  78. {
  79. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  80. trace_rxrpc_abort(0, why, sp->hdr.cid, sp->hdr.callNumber, sp->hdr.seq,
  81. abort_code, err);
  82. skb->mark = RXRPC_SKB_MARK_REJECT_ABORT;
  83. skb->priority = abort_code;
  84. return false;
  85. }
/* Reject a malformed packet: abort with RX_PROTOCOL_ERROR / -EBADMSG. */
static bool rxrpc_bad_message(struct sk_buff *skb, enum rxrpc_abort_reason why)
{
	return rxrpc_direct_abort(skb, why, RX_PROTOCOL_ERROR, -EBADMSG);
}
  90. #define just_discard true
  91. /*
  92. * Process event packets targeted at a local endpoint.
  93. */
  94. static bool rxrpc_input_version(struct rxrpc_local *local, struct sk_buff *skb)
  95. {
  96. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  97. char v;
  98. _enter("");
  99. rxrpc_see_skb(skb, rxrpc_skb_see_version);
  100. if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header), &v, 1) >= 0) {
  101. if (v == 0)
  102. rxrpc_send_version_request(local, &sp->hdr, skb);
  103. }
  104. return true;
  105. }
/*
 * Extract the wire header from a packet and translate the byte order.
 */
static bool rxrpc_extract_header(struct rxrpc_skb_priv *sp,
				 struct sk_buff *skb)
{
	struct rxrpc_wire_header whdr;
	struct rxrpc_ackpacket ack;

	/* dig out the RxRPC connection details */
	if (skb_copy_bits(skb, 0, &whdr, sizeof(whdr)) < 0)
		return rxrpc_bad_message(skb, rxrpc_badmsg_short_hdr);

	/* Zero the whole private area so that any field not parsed below
	 * reads as zero.
	 */
	memset(sp, 0, sizeof(*sp));
	sp->hdr.epoch = ntohl(whdr.epoch);
	sp->hdr.cid = ntohl(whdr.cid);
	sp->hdr.callNumber = ntohl(whdr.callNumber);
	sp->hdr.seq = ntohl(whdr.seq);
	sp->hdr.serial = ntohl(whdr.serial);
	sp->hdr.flags = whdr.flags;
	sp->hdr.type = whdr.type;
	sp->hdr.userStatus = whdr.userStatus;
	sp->hdr.securityIndex = whdr.securityIndex;
	sp->hdr._rsvd = ntohs(whdr._rsvd);
	sp->hdr.serviceId = ntohs(whdr.serviceId);

	/* ACK packets carry extra fields immediately after the wire header;
	 * pull those out now too so later stages needn't re-parse.
	 */
	if (sp->hdr.type == RXRPC_PACKET_TYPE_ACK) {
		if (skb_copy_bits(skb, sizeof(whdr), &ack, sizeof(ack)) < 0)
			return rxrpc_bad_message(skb, rxrpc_badmsg_short_ack);
		sp->ack.first_ack = ntohl(ack.firstPacket);
		sp->ack.prev_ack = ntohl(ack.previousPacket);
		sp->ack.acked_serial = ntohl(ack.serial);
		sp->ack.reason = ack.reason;
		sp->ack.nr_acks = ack.nAcks;
	}
	return true;
}
  140. /*
  141. * Extract the abort code from an ABORT packet and stash it in skb->priority.
  142. */
  143. static bool rxrpc_extract_abort(struct sk_buff *skb)
  144. {
  145. __be32 wtmp;
  146. if (skb_copy_bits(skb, sizeof(struct rxrpc_wire_header),
  147. &wtmp, sizeof(wtmp)) < 0)
  148. return false;
  149. skb->priority = ntohl(wtmp);
  150. return true;
  151. }
/*
 * Process packets received on the local endpoint.
 *
 * Returns false if the packet should be rejected (the reason is left in
 * skb->mark and skb->priority for the caller to act upon); true otherwise.
 * May replace *_skb if the packet has to be unshared, or NULL it if the
 * unshare fails.
 */
static bool rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **_skb)
{
	struct rxrpc_connection *conn;
	struct sockaddr_rxrpc peer_srx;
	struct rxrpc_skb_priv *sp;
	struct rxrpc_peer *peer = NULL;
	struct sk_buff *skb = *_skb;
	bool ret = false;

	skb_pull(skb, sizeof(struct udphdr));

	sp = rxrpc_skb(skb);

	/* dig out the RxRPC connection details */
	if (!rxrpc_extract_header(sp, skb))
		return just_discard;

	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
		static int lose;
		/* Drop roughly one packet in eight for loss testing. */
		if ((lose++ & 7) == 7) {
			trace_rxrpc_rx_lose(sp);
			return just_discard;
		}
	}

	trace_rxrpc_rx_packet(sp);

	/* Per-type validity checks before routing the packet. */
	switch (sp->hdr.type) {
	case RXRPC_PACKET_TYPE_VERSION:
		if (rxrpc_to_client(sp))
			return just_discard;
		return rxrpc_input_version(local, skb);

	case RXRPC_PACKET_TYPE_BUSY:
		if (rxrpc_to_server(sp))
			return just_discard;
		fallthrough;
	case RXRPC_PACKET_TYPE_ACK:
	case RXRPC_PACKET_TYPE_ACKALL:
		if (sp->hdr.callNumber == 0)
			return rxrpc_bad_message(skb, rxrpc_badmsg_zero_call);
		break;
	case RXRPC_PACKET_TYPE_ABORT:
		if (!rxrpc_extract_abort(skb))
			return just_discard; /* Just discard if malformed */
		break;

	case RXRPC_PACKET_TYPE_DATA:
		if (sp->hdr.callNumber == 0)
			return rxrpc_bad_message(skb, rxrpc_badmsg_zero_call);
		if (sp->hdr.seq == 0)
			return rxrpc_bad_message(skb, rxrpc_badmsg_zero_seq);

		/* Unshare the packet so that it can be modified for in-place
		 * decryption.
		 */
		if (sp->hdr.securityIndex != 0) {
			skb = skb_unshare(skb, GFP_ATOMIC);
			if (!skb) {
				/* On failure skb_unshare() has consumed the
				 * original; tell the caller via *_skb = NULL.
				 */
				rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare_nomem);
				*_skb = NULL;
				return just_discard;
			}

			if (skb != *_skb) {
				/* A copy was made; retarget the caller's
				 * pointer and the local private area.
				 */
				rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare);
				*_skb = skb;
				rxrpc_new_skb(skb, rxrpc_skb_new_unshared);
				sp = rxrpc_skb(skb);
			}
		}
		break;

	case RXRPC_PACKET_TYPE_CHALLENGE:
		if (rxrpc_to_server(sp))
			return just_discard;
		break;
	case RXRPC_PACKET_TYPE_RESPONSE:
		if (rxrpc_to_client(sp))
			return just_discard;
		break;

		/* Packet types 9-11 should just be ignored. */
	case RXRPC_PACKET_TYPE_PARAMS:
	case RXRPC_PACKET_TYPE_10:
	case RXRPC_PACKET_TYPE_11:
		return just_discard;

	default:
		return rxrpc_bad_message(skb, rxrpc_badmsg_unsupported_packet);
	}

	if (sp->hdr.serviceId == 0)
		return rxrpc_bad_message(skb, rxrpc_badmsg_zero_service);

	if (WARN_ON_ONCE(rxrpc_extract_addr_from_skb(&peer_srx, skb) < 0))
		return just_discard; /* Unsupported address type. */

	/* NOTE(review): this condition appears to tolerate an IPv4 source on
	 * an IPv6 local socket (dual-stack case) but reject other family
	 * mismatches — confirm against the address-extraction semantics.
	 */
	if (peer_srx.transport.family != local->srx.transport.family &&
	    (peer_srx.transport.family == AF_INET &&
	     local->srx.transport.family != AF_INET6)) {
		pr_warn_ratelimited("AF_RXRPC: Protocol mismatch %u not %u\n",
				    peer_srx.transport.family,
				    local->srx.transport.family);
		return just_discard; /* Wrong address type. */
	}

	if (rxrpc_to_client(sp)) {
		/* Client-bound packets must match an extant connection; take
		 * a ref on it under the RCU read lock before using it.
		 */
		rcu_read_lock();
		conn = rxrpc_find_client_connection_rcu(local, &peer_srx, skb);
		conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
		rcu_read_unlock();
		if (!conn)
			return rxrpc_protocol_error(skb, rxrpc_eproto_no_client_conn);

		ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
		rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
		return ret;
	}

	/* We need to look up service connections by the full protocol
	 * parameter set.  We look up the peer first as an intermediate step
	 * and then the connection from the peer's tree.
	 */
	rcu_read_lock();

	peer = rxrpc_lookup_peer_rcu(local, &peer_srx);
	if (!peer) {
		/* No such peer yet: this may begin a new incoming call. */
		rcu_read_unlock();
		return rxrpc_new_incoming_call(local, NULL, NULL, &peer_srx, skb);
	}

	conn = rxrpc_find_service_conn_rcu(peer, skb);
	conn = rxrpc_get_connection_maybe(conn, rxrpc_conn_get_call_input);
	if (conn) {
		rcu_read_unlock();
		ret = rxrpc_input_packet_on_conn(conn, &peer_srx, skb);
		rxrpc_put_connection(conn, rxrpc_conn_put_call_input);
		return ret;
	}

	/* No extant connection: hold a ref on the peer across the new-call
	 * setup since we're about to drop the RCU read lock.
	 */
	peer = rxrpc_get_peer_maybe(peer, rxrpc_peer_get_input);
	rcu_read_unlock();

	ret = rxrpc_new_incoming_call(local, peer, NULL, &peer_srx, skb);
	rxrpc_put_peer(peer, rxrpc_peer_put_input);
	return ret;
}
/*
 * Deal with a packet that's associated with an extant connection.
 *
 * Returns the discard/reject decision in the same way as
 * rxrpc_input_packet() (false means reject using skb->mark/priority).
 */
static int rxrpc_input_packet_on_conn(struct rxrpc_connection *conn,
				      struct sockaddr_rxrpc *peer_srx,
				      struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_channel *chan;
	struct rxrpc_call *call = NULL;
	unsigned int channel;
	bool ret;

	/* The security index must match the connection's. */
	if (sp->hdr.securityIndex != conn->security_ix)
		return rxrpc_direct_abort(skb, rxrpc_eproto_wrong_security,
					  RXKADINCONSISTENCY, -EBADMSG);

	if (sp->hdr.serviceId != conn->service_id) {
		int old_id;

		/* A service-ID change is only acceptable whilst the
		 * connection is probing for a service upgrade.
		 */
		if (!test_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags))
			return rxrpc_protocol_error(skb, rxrpc_eproto_reupgrade);

		/* Atomically commit the upgraded service ID; if another
		 * context committed first, the ID it set must match ours.
		 */
		old_id = cmpxchg(&conn->service_id, conn->orig_service_id,
				 sp->hdr.serviceId);
		if (old_id != conn->orig_service_id &&
		    old_id != sp->hdr.serviceId)
			return rxrpc_protocol_error(skb, rxrpc_eproto_bad_upgrade);
	}

	/* Track the highest serial seen on this connection. */
	if (after(sp->hdr.serial, conn->hi_serial))
		conn->hi_serial = sp->hdr.serial;

	/* It's a connection-level packet if the call number is 0. */
	if (sp->hdr.callNumber == 0)
		return rxrpc_input_conn_packet(conn, skb);

	/* Call-bound packets are routed by connection channel. */
	channel = sp->hdr.cid & RXRPC_CHANNELMASK;
	chan = &conn->channels[channel];

	/* Ignore really old calls */
	if (sp->hdr.callNumber < chan->last_call)
		return just_discard;

	if (sp->hdr.callNumber == chan->last_call) {
		/* Packet refers to the channel's most recently completed
		 * call; nothing to do unless the final packet needs resending.
		 */
		if (chan->call ||
		    sp->hdr.type == RXRPC_PACKET_TYPE_ABORT)
			return just_discard;

		/* For the previous service call, if completed successfully, we
		 * discard all further packets.
		 */
		if (rxrpc_conn_is_service(conn) &&
		    chan->last_type == RXRPC_PACKET_TYPE_ACK)
			return just_discard;

		/* But otherwise we need to retransmit the final packet from
		 * data cached in the connection record.
		 */
		if (sp->hdr.type == RXRPC_PACKET_TYPE_DATA)
			trace_rxrpc_rx_data(chan->call_debug_id,
					    sp->hdr.seq,
					    sp->hdr.serial,
					    sp->hdr.flags);
		rxrpc_conn_retransmit_call(conn, skb, channel);
		return just_discard;
	}

	call = rxrpc_try_get_call(chan->call, rxrpc_call_get_input);

	if (sp->hdr.callNumber > chan->call_id) {
		/* A higher call number on the channel implicitly ends the
		 * previous call; a client should never see this happen.
		 */
		if (rxrpc_to_client(sp)) {
			rxrpc_put_call(call, rxrpc_call_put_input);
			return rxrpc_protocol_error(skb,
						    rxrpc_eproto_unexpected_implicit_end);
		}

		if (call) {
			rxrpc_implicit_end_call(call, skb);
			rxrpc_put_call(call, rxrpc_call_put_input);
			call = NULL;
		}
	}

	if (!call) {
		/* A packet for an unknown call is a protocol error on the
		 * client side, but may begin a new call on the service side.
		 */
		if (rxrpc_to_client(sp))
			return rxrpc_protocol_error(skb, rxrpc_eproto_no_client_call);
		return rxrpc_new_incoming_call(conn->local, conn->peer, conn,
					       peer_srx, skb);
	}

	ret = rxrpc_input_call_event(call, skb);
	rxrpc_put_call(call, rxrpc_call_put_input);
	return ret;
}
/*
 * I/O and event handling thread.
 *
 * Services, in priority order: connections wanting attention, expired
 * client-connection reaping, calls wanting attention, new client calls,
 * then received packets/errors.  Sleeps only when all sources are empty,
 * using the prepare-to-sleep/recheck pattern to avoid losing wakeups.
 */
int rxrpc_io_thread(void *data)
{
	struct rxrpc_connection *conn;
	struct sk_buff_head rx_queue;
	struct rxrpc_local *local = data;
	struct rxrpc_call *call;
	struct sk_buff *skb;
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
	ktime_t now;
#endif
	bool should_stop;

	/* Tell the starter that we're up and running. */
	complete(&local->io_thread_ready);

	skb_queue_head_init(&rx_queue);

	set_user_nice(current, MIN_NICE);

	for (;;) {
		rxrpc_inc_stat(local->rxnet, stat_io_loop);

		/* Deal with connections that want immediate attention. */
		conn = list_first_entry_or_null(&local->conn_attend_q,
						struct rxrpc_connection,
						attend_link);
		if (conn) {
			/* The peek above is lockless; removal from the list
			 * is done under local->lock.
			 */
			spin_lock_bh(&local->lock);
			list_del_init(&conn->attend_link);
			spin_unlock_bh(&local->lock);

			rxrpc_input_conn_event(conn, NULL);
			rxrpc_put_connection(conn, rxrpc_conn_put_poke);
			continue;
		}

		if (test_and_clear_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
				       &local->client_conn_flags))
			rxrpc_discard_expired_client_conns(local);

		/* Deal with calls that want immediate attention. */
		if ((call = list_first_entry_or_null(&local->call_attend_q,
						     struct rxrpc_call,
						     attend_link))) {
			spin_lock_bh(&local->lock);
			list_del_init(&call->attend_link);
			spin_unlock_bh(&local->lock);

			trace_rxrpc_call_poked(call);
			rxrpc_input_call_event(call, NULL);
			rxrpc_put_call(call, rxrpc_call_put_poke);
			continue;
		}

		if (!list_empty(&local->new_client_calls))
			rxrpc_connect_client_calls(local);

		/* Process received packets and errors. */
		if ((skb = __skb_dequeue(&rx_queue))) {
			struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
			switch (skb->mark) {
			case RXRPC_SKB_MARK_PACKET:
				skb->priority = 0;
				/* A false return means the packet must be
				 * rejected; the reason has been left in
				 * skb->mark and skb->priority.
				 */
				if (!rxrpc_input_packet(local, &skb))
					rxrpc_reject_packet(local, skb);
				trace_rxrpc_rx_done(skb->mark, skb->priority);
				rxrpc_free_skb(skb, rxrpc_skb_put_input);
				break;
			case RXRPC_SKB_MARK_ERROR:
				rxrpc_input_error(local, skb);
				rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
				break;
			case RXRPC_SKB_MARK_SERVICE_CONN_SECURED:
				rxrpc_input_conn_event(sp->conn, skb);
				rxrpc_put_connection(sp->conn, rxrpc_conn_put_poke);
				rxrpc_free_skb(skb, rxrpc_skb_put_conn_secured);
				break;
			default:
				WARN_ON_ONCE(1);
				rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
				break;
			}
			continue;
		}

		/* Inject a delay into packets if requested. */
#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
		now = ktime_get_real();
		/* Move any delayed packets whose delivery time (->tstamp)
		 * has arrived onto the normal receive queue.
		 */
		while ((skb = skb_peek(&local->rx_delay_queue))) {
			if (ktime_before(now, skb->tstamp))
				break;
			skb = skb_dequeue(&local->rx_delay_queue);
			skb_queue_tail(&local->rx_queue, skb);
		}
#endif

		if (!skb_queue_empty(&local->rx_queue)) {
			/* Splice the shared queue into our private one so the
			 * processing above can dequeue without its lock.
			 */
			spin_lock_irq(&local->rx_queue.lock);
			skb_queue_splice_tail_init(&local->rx_queue, &rx_queue);
			spin_unlock_irq(&local->rx_queue.lock);
			continue;
		}

		/* Prepare to sleep first, then recheck every work source so
		 * that a wakeup racing with the checks above isn't lost.
		 */
		set_current_state(TASK_INTERRUPTIBLE);
		should_stop = kthread_should_stop();
		if (!skb_queue_empty(&local->rx_queue) ||
		    !list_empty(&local->call_attend_q) ||
		    !list_empty(&local->conn_attend_q) ||
		    !list_empty(&local->new_client_calls) ||
		    test_bit(RXRPC_CLIENT_CONN_REAP_TIMER,
			     &local->client_conn_flags)) {
			__set_current_state(TASK_RUNNING);
			continue;
		}

		if (should_stop)
			break;

#ifdef CONFIG_AF_RXRPC_INJECT_RX_DELAY
		/* If delayed packets are pending, only sleep until the
		 * earliest one becomes deliverable.
		 */
		skb = skb_peek(&local->rx_delay_queue);
		if (skb) {
			unsigned long timeout;
			ktime_t tstamp = skb->tstamp;
			ktime_t now = ktime_get_real();
			s64 delay_ns = ktime_to_ns(ktime_sub(tstamp, now));

			if (delay_ns <= 0) {
				__set_current_state(TASK_RUNNING);
				continue;
			}

			timeout = nsecs_to_jiffies(delay_ns);
			timeout = max(timeout, 1UL);
			schedule_timeout(timeout);
			__set_current_state(TASK_RUNNING);
			continue;
		}
#endif

		schedule();
	}

	__set_current_state(TASK_RUNNING);
	rxrpc_see_local(local, rxrpc_local_stop);
	rxrpc_destroy_local(local);
	/* Clearing ->io_thread tells rxrpc_encap_rcv() to stop queuing. */
	WRITE_ONCE(local->io_thread, NULL);
	rxrpc_see_local(local, rxrpc_local_stopped);
	return 0;
}