input.c

// SPDX-License-Identifier: GPL-2.0-or-later
/* Processing of received RxRPC packets
 *
 * Copyright (C) 2020 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include "ar-internal.h"

/* Override priority when generating ACKs for received DATA */
static const u8 rxrpc_ack_priority[RXRPC_ACK__INVALID] = {
	[RXRPC_ACK_IDLE]		= 1,
	[RXRPC_ACK_DELAY]		= 2,
	[RXRPC_ACK_REQUESTED]		= 3,
	[RXRPC_ACK_DUPLICATE]		= 4,
	[RXRPC_ACK_EXCEEDS_WINDOW]	= 5,
	[RXRPC_ACK_NOSPACE]		= 6,
	[RXRPC_ACK_OUT_OF_SEQUENCE]	= 7,
};

static void rxrpc_proto_abort(struct rxrpc_call *call, rxrpc_seq_t seq,
			      enum rxrpc_abort_reason why)
{
	rxrpc_abort_call(call, seq, RX_PROTOCOL_ERROR, -EBADMSG, why);
}

/*
 * Do TCP-style congestion management [RFC 5681].
 */
static void rxrpc_congestion_management(struct rxrpc_call *call,
					struct sk_buff *skb,
					struct rxrpc_ack_summary *summary,
					rxrpc_serial_t acked_serial)
{
	enum rxrpc_congest_change change = rxrpc_cong_no_change;
	unsigned int cumulative_acks = call->cong_cumul_acks;
	unsigned int cwnd = call->cong_cwnd;
	bool resend = false;

	summary->flight_size =
		(call->tx_top - call->acks_hard_ack) - summary->nr_acks;

	if (test_and_clear_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags)) {
		summary->retrans_timeo = true;
		call->cong_ssthresh = max_t(unsigned int,
					    summary->flight_size / 2, 2);
		cwnd = 1;
		if (cwnd >= call->cong_ssthresh &&
		    call->cong_mode == RXRPC_CALL_SLOW_START) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
			cumulative_acks = 0;
		}
	}

	cumulative_acks += summary->nr_new_acks;
	if (cumulative_acks > 255)
		cumulative_acks = 255;

	summary->cwnd = call->cong_cwnd;
	summary->ssthresh = call->cong_ssthresh;
	summary->cumulative_acks = cumulative_acks;
	summary->dup_acks = call->cong_dup_acks;

	switch (call->cong_mode) {
	case RXRPC_CALL_SLOW_START:
		if (summary->saw_nacks)
			goto packet_loss_detected;
		if (summary->cumulative_acks > 0)
			cwnd += 1;
		if (cwnd >= call->cong_ssthresh) {
			call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
			call->cong_tstamp = skb->tstamp;
		}
		goto out;

	case RXRPC_CALL_CONGEST_AVOIDANCE:
		if (summary->saw_nacks)
			goto packet_loss_detected;

		/* We analyse the number of packets that get ACK'd per RTT
		 * period and increase the window if we managed to fill it.
		 */
		if (call->peer->rtt_count == 0)
			goto out;
		if (ktime_before(skb->tstamp,
				 ktime_add_us(call->cong_tstamp,
					      call->peer->srtt_us >> 3)))
			goto out_no_clear_ca;
		change = rxrpc_cong_rtt_window_end;
		call->cong_tstamp = skb->tstamp;
		if (cumulative_acks >= cwnd)
			cwnd++;
		goto out;

	case RXRPC_CALL_PACKET_LOSS:
		if (!summary->saw_nacks)
			goto resume_normality;

		if (summary->new_low_nack) {
			change = rxrpc_cong_new_low_nack;
			call->cong_dup_acks = 1;
			if (call->cong_extra > 1)
				call->cong_extra = 1;
			goto send_extra_data;
		}

		call->cong_dup_acks++;
		if (call->cong_dup_acks < 3)
			goto send_extra_data;

		change = rxrpc_cong_begin_retransmission;
		call->cong_mode = RXRPC_CALL_FAST_RETRANSMIT;
		call->cong_ssthresh = max_t(unsigned int,
					    summary->flight_size / 2, 2);
		cwnd = call->cong_ssthresh + 3;
		call->cong_extra = 0;
		call->cong_dup_acks = 0;
		resend = true;
		goto out;

	case RXRPC_CALL_FAST_RETRANSMIT:
		if (!summary->new_low_nack) {
			if (summary->nr_new_acks == 0)
				cwnd += 1;
			call->cong_dup_acks++;
			if (call->cong_dup_acks == 2) {
				change = rxrpc_cong_retransmit_again;
				call->cong_dup_acks = 0;
				resend = true;
			}
		} else {
			change = rxrpc_cong_progress;
			cwnd = call->cong_ssthresh;
			if (!summary->saw_nacks)
				goto resume_normality;
		}
		goto out;

	default:
		BUG();
		goto out;
	}

resume_normality:
	change = rxrpc_cong_cleared_nacks;
	call->cong_dup_acks = 0;
	call->cong_extra = 0;
	call->cong_tstamp = skb->tstamp;
	if (cwnd < call->cong_ssthresh)
		call->cong_mode = RXRPC_CALL_SLOW_START;
	else
		call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
out:
	cumulative_acks = 0;
out_no_clear_ca:
	if (cwnd >= RXRPC_TX_MAX_WINDOW)
		cwnd = RXRPC_TX_MAX_WINDOW;
	call->cong_cwnd = cwnd;
	call->cong_cumul_acks = cumulative_acks;
	summary->mode = call->cong_mode;
	trace_rxrpc_congest(call, summary, acked_serial, change);
	if (resend)
		rxrpc_resend(call, skb);
	return;

packet_loss_detected:
	change = rxrpc_cong_saw_nack;
	call->cong_mode = RXRPC_CALL_PACKET_LOSS;
	call->cong_dup_acks = 0;
	goto send_extra_data;

send_extra_data:
	/* Send some previously unsent DATA if we have some to advance the ACK
	 * state.
	 */
	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) ||
	    summary->nr_acks != call->tx_top - call->acks_hard_ack) {
		call->cong_extra++;
		wake_up(&call->waitq);
	}
	goto out_no_clear_ca;
}

/*
 * Degrade the congestion window if we haven't transmitted a packet for >1RTT.
 */
void rxrpc_congestion_degrade(struct rxrpc_call *call)
{
	ktime_t rtt, now;

	if (call->cong_mode != RXRPC_CALL_SLOW_START &&
	    call->cong_mode != RXRPC_CALL_CONGEST_AVOIDANCE)
		return;
	if (__rxrpc_call_state(call) == RXRPC_CALL_CLIENT_AWAIT_REPLY)
		return;

	rtt = ns_to_ktime(call->peer->srtt_us * (1000 / 8));
	now = ktime_get_real();
	if (!ktime_before(ktime_add(call->tx_last_sent, rtt), now))
		return;

	trace_rxrpc_reset_cwnd(call, now);
	rxrpc_inc_stat(call->rxnet, stat_tx_data_cwnd_reset);
	call->tx_last_sent = now;
	call->cong_mode = RXRPC_CALL_SLOW_START;
	call->cong_ssthresh = max_t(unsigned int, call->cong_ssthresh,
				    call->cong_cwnd * 3 / 4);
	call->cong_cwnd = max_t(unsigned int, call->cong_cwnd / 2, RXRPC_MIN_CWND);
}

/*
 * Apply a hard ACK by advancing the Tx window.
 */
static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
				   struct rxrpc_ack_summary *summary)
{
	struct rxrpc_txbuf *txb;
	bool rot_last = false;

	list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
		if (before_eq(txb->seq, call->acks_hard_ack))
			continue;
		if (txb->flags & RXRPC_LAST_PACKET) {
			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
			rot_last = true;
		}
		if (txb->seq == to)
			break;
	}

	if (rot_last)
		set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);

	_enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);

	if (call->acks_lowest_nak == call->acks_hard_ack) {
		call->acks_lowest_nak = to;
	} else if (after(to, call->acks_lowest_nak)) {
		summary->new_low_nack = true;
		call->acks_lowest_nak = to;
	}

	smp_store_release(&call->acks_hard_ack, to);

	trace_rxrpc_txqueue(call, (rot_last ?
				   rxrpc_txqueue_rotate_last :
				   rxrpc_txqueue_rotate));
	wake_up(&call->waitq);
	return rot_last;
}

/*
 * End the transmission phase of a call.
 *
 * This occurs when we get an ACKALL packet, the first DATA packet of a reply,
 * or a final ACK packet.
 */
static void rxrpc_end_tx_phase(struct rxrpc_call *call, bool reply_begun,
			       enum rxrpc_abort_reason abort_why)
{
	ASSERT(test_bit(RXRPC_CALL_TX_LAST, &call->flags));

	call->resend_at = KTIME_MAX;
	trace_rxrpc_timer_can(call, rxrpc_timer_trace_resend);

	if (unlikely(call->cong_last_nack)) {
		rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
		call->cong_last_nack = NULL;
	}

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		if (reply_begun) {
			rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_RECV_REPLY);
			trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
			break;
		}

		rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_REPLY);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_await_reply);
		break;

	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		trace_rxrpc_txqueue(call, rxrpc_txqueue_end);
		break;

	default:
		kdebug("end_tx %s", rxrpc_call_states[__rxrpc_call_state(call)]);
		rxrpc_proto_abort(call, call->tx_top, abort_why);
		break;
	}
}

/*
 * Begin the reply reception phase of a call.
 */
static bool rxrpc_receiving_reply(struct rxrpc_call *call)
{
	struct rxrpc_ack_summary summary = { 0 };
	rxrpc_seq_t top = READ_ONCE(call->tx_top);

	if (call->ackr_reason) {
		call->delay_ack_at = KTIME_MAX;
		trace_rxrpc_timer_can(call, rxrpc_timer_trace_delayed_ack);
	}

	if (!test_bit(RXRPC_CALL_TX_LAST, &call->flags)) {
		if (!rxrpc_rotate_tx_window(call, top, &summary)) {
			rxrpc_proto_abort(call, top, rxrpc_eproto_early_reply);
			return false;
		}
	}

	rxrpc_end_tx_phase(call, true, rxrpc_eproto_unexpected_reply);
	return true;
}

/*
 * End the packet reception phase.
 */
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
	rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);

	_enter("%d,%s", call->debug_id, rxrpc_call_states[__rxrpc_call_state(call)]);

	trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_RECV_REPLY:
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
		rxrpc_call_completed(call);
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST:
		rxrpc_set_call_state(call, RXRPC_CALL_SERVER_ACK_REQUEST);
		call->expect_req_by = KTIME_MAX;
		rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_processing_op);
		break;

	default:
		break;
	}
}

static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
					  rxrpc_seq_t window, rxrpc_seq_t wtop)
{
	call->ackr_window = window;
	call->ackr_wtop = wtop;
}

/*
 * Push a DATA packet onto the Rx queue.
 */
static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
				   rxrpc_seq_t window, rxrpc_seq_t wtop,
				   enum rxrpc_receive_trace why)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;

	__skb_queue_tail(&call->recvmsg_queue, skb);
	rxrpc_input_update_ack_window(call, window, wtop);
	trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
	if (last)
		rxrpc_end_rx_phase(call, sp->hdr.serial);
}

/*
 * Process a DATA packet.
 */
static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
				 bool *_notify, rxrpc_serial_t *_ack_serial,
				 int *_ack_reason)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct sk_buff *oos;
	rxrpc_serial_t serial = sp->hdr.serial;
	unsigned int sack = call->ackr_sack_base;
	rxrpc_seq_t window = call->ackr_window;
	rxrpc_seq_t wtop = call->ackr_wtop;
	rxrpc_seq_t wlimit = window + call->rx_winsize - 1;
	rxrpc_seq_t seq = sp->hdr.seq;
	bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
	int ack_reason = -1;

	rxrpc_inc_stat(call->rxnet, stat_rx_data);
	if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_reqack);
	if (sp->hdr.flags & RXRPC_JUMBO_PACKET)
		rxrpc_inc_stat(call->rxnet, stat_rx_data_jumbo);

	if (last) {
		if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    seq + 1 != wtop)
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_different_last);
	} else {
		if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) &&
		    after_eq(seq, wtop)) {
			pr_warn("Packet beyond last: c=%x q=%x window=%x-%x wlimit=%x\n",
				call->debug_id, seq, window, wtop, wlimit);
			return rxrpc_proto_abort(call, seq, rxrpc_eproto_data_after_last);
		}
	}

	if (after(seq, call->rx_highest_seq))
		call->rx_highest_seq = seq;

	trace_rxrpc_rx_data(call->debug_id, seq, serial, sp->hdr.flags);

	if (before(seq, window)) {
		ack_reason = RXRPC_ACK_DUPLICATE;
		goto send_ack;
	}
	if (after(seq, wlimit)) {
		ack_reason = RXRPC_ACK_EXCEEDS_WINDOW;
		goto send_ack;
	}

	/* Queue the packet. */
	if (seq == window) {
		if (sp->hdr.flags & RXRPC_REQUEST_ACK)
			ack_reason = RXRPC_ACK_REQUESTED;
		/* Send an immediate ACK if we fill in a hole */
		else if (!skb_queue_empty(&call->rx_oos_queue))
			ack_reason = RXRPC_ACK_DELAY;

		window++;
		if (after(window, wtop)) {
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_none);
			wtop = window;
		} else {
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_advance);
			sack = (sack + 1) % RXRPC_SACK_SIZE;
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);

		spin_lock(&call->recvmsg_queue.lock);
		rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
		*_notify = true;

		while ((oos = skb_peek(&call->rx_oos_queue))) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, window))
				break;

			__skb_unlink(oos, &call->rx_oos_queue);
			last = osp->hdr.flags & RXRPC_LAST_PACKET;
			seq = osp->hdr.seq;
			call->ackr_sack_table[sack] = 0;
			trace_rxrpc_sack(call, seq, sack, rxrpc_sack_fill);
			sack = (sack + 1) % RXRPC_SACK_SIZE;

			window++;
			rxrpc_input_queue_data(call, oos, window, wtop,
					       rxrpc_receive_queue_oos);
		}

		spin_unlock(&call->recvmsg_queue.lock);

		call->ackr_sack_base = sack;
	} else {
		unsigned int slot;

		ack_reason = RXRPC_ACK_OUT_OF_SEQUENCE;

		slot = seq - window;
		sack = (sack + slot) % RXRPC_SACK_SIZE;

		if (call->ackr_sack_table[sack % RXRPC_SACK_SIZE]) {
			ack_reason = RXRPC_ACK_DUPLICATE;
			goto send_ack;
		}

		call->ackr_sack_table[sack % RXRPC_SACK_SIZE] |= 1;
		trace_rxrpc_sack(call, seq, sack, rxrpc_sack_oos);

		if (after(seq + 1, wtop)) {
			wtop = seq + 1;
			rxrpc_input_update_ack_window(call, window, wtop);
		}

		skb_queue_walk(&call->rx_oos_queue, oos) {
			struct rxrpc_skb_priv *osp = rxrpc_skb(oos);

			if (after(osp->hdr.seq, seq)) {
				rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
				__skb_queue_before(&call->rx_oos_queue, oos, skb);
				goto oos_queued;
			}
		}

		rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos);
		__skb_queue_tail(&call->rx_oos_queue, skb);
	oos_queued:
		trace_rxrpc_receive(call, last ? rxrpc_receive_oos_last : rxrpc_receive_oos,
				    sp->hdr.serial, sp->hdr.seq);
	}

send_ack:
	if (ack_reason >= 0) {
		if (rxrpc_ack_priority[ack_reason] > rxrpc_ack_priority[*_ack_reason]) {
			*_ack_serial = serial;
			*_ack_reason = ack_reason;
		} else if (rxrpc_ack_priority[ack_reason] == rxrpc_ack_priority[*_ack_reason] &&
			   ack_reason == RXRPC_ACK_REQUESTED) {
			*_ack_serial = serial;
			*_ack_reason = ack_reason;
		}
	}
}

/*
 * Split a jumbo packet and file the bits separately.
 */
static bool rxrpc_input_split_jumbo(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_jumbo_header jhdr;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb), *jsp;
	struct sk_buff *jskb;
	rxrpc_serial_t ack_serial = 0;
	unsigned int offset = sizeof(struct rxrpc_wire_header);
	unsigned int len = skb->len - offset;
	bool notify = false;
	int ack_reason = 0;

	while (sp->hdr.flags & RXRPC_JUMBO_PACKET) {
		if (len < RXRPC_JUMBO_SUBPKTLEN)
			goto protocol_error;
		if (sp->hdr.flags & RXRPC_LAST_PACKET)
			goto protocol_error;
		if (skb_copy_bits(skb, offset + RXRPC_JUMBO_DATALEN,
				  &jhdr, sizeof(jhdr)) < 0)
			goto protocol_error;

		jskb = skb_clone(skb, GFP_NOFS);
		if (!jskb) {
			kdebug("couldn't clone");
			return false;
		}
		rxrpc_new_skb(jskb, rxrpc_skb_new_jumbo_subpacket);
		jsp = rxrpc_skb(jskb);
		jsp->offset = offset;
		jsp->len = RXRPC_JUMBO_DATALEN;
		rxrpc_input_data_one(call, jskb, &notify, &ack_serial, &ack_reason);
		rxrpc_free_skb(jskb, rxrpc_skb_put_jumbo_subpacket);

		sp->hdr.flags = jhdr.flags;
		sp->hdr._rsvd = ntohs(jhdr._rsvd);
		sp->hdr.seq++;
		sp->hdr.serial++;
		offset += RXRPC_JUMBO_SUBPKTLEN;
		len -= RXRPC_JUMBO_SUBPKTLEN;
	}

	sp->offset = offset;
	sp->len = len;
	rxrpc_input_data_one(call, skb, &notify, &ack_serial, &ack_reason);

	if (ack_reason > 0) {
		rxrpc_send_ACK(call, ack_reason, ack_serial,
			       rxrpc_propose_ack_input_data);
	} else {
		call->ackr_nr_unacked++;
		rxrpc_propose_delay_ACK(call, sp->hdr.serial,
					rxrpc_propose_ack_input_data);
	}

	if (notify && !test_bit(RXRPC_CALL_CONN_CHALLENGING, &call->flags)) {
		trace_rxrpc_notify_socket(call->debug_id, sp->hdr.serial);
		rxrpc_notify_socket(call);
	}
	return true;

protocol_error:
	return false;
}

/*
 * Process a DATA packet, adding the packet to the Rx ring.  The caller's
 * packet ref must be passed on or discarded.
 */
static void rxrpc_input_data(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	rxrpc_serial_t serial = sp->hdr.serial;
	rxrpc_seq_t seq0 = sp->hdr.seq;

	_enter("{%x,%x,%x},{%u,%x}",
	       call->ackr_window, call->ackr_wtop, call->rx_highest_seq,
	       skb->len, seq0);

	if (__rxrpc_call_is_complete(call))
		return;

	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
		/* Received data implicitly ACKs all of the request
		 * packets we sent when we're acting as a client.
		 */
		if (!rxrpc_receiving_reply(call))
			goto out_notify;
		break;

	case RXRPC_CALL_SERVER_RECV_REQUEST: {
		unsigned long timo = READ_ONCE(call->next_req_timo);

		if (timo) {
			ktime_t delay = ms_to_ktime(timo);

			call->expect_req_by = ktime_add(ktime_get_real(), delay);
			trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_idle);
		}
		break;
	}

	default:
		break;
	}

	if (!rxrpc_input_split_jumbo(call, skb)) {
		rxrpc_proto_abort(call, sp->hdr.seq, rxrpc_badmsg_bad_jumbo);
		goto out_notify;
	}
	return;

out_notify:
	trace_rxrpc_notify_socket(call->debug_id, serial);
	rxrpc_notify_socket(call);
	_leave(" [queued]");
}

/*
 * See if there's a cached RTT probe to complete.
 */
static void rxrpc_complete_rtt_probe(struct rxrpc_call *call,
				     ktime_t resp_time,
				     rxrpc_serial_t acked_serial,
				     rxrpc_serial_t ack_serial,
				     enum rxrpc_rtt_rx_trace type)
{
	rxrpc_serial_t orig_serial;
	unsigned long avail;
	ktime_t sent_at;
	bool matched = false;
	int i;

	avail = READ_ONCE(call->rtt_avail);
	smp_rmb(); /* Read avail bits before accessing data. */

	for (i = 0; i < ARRAY_SIZE(call->rtt_serial); i++) {
		if (!test_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &avail))
			continue;

		sent_at = call->rtt_sent_at[i];
		orig_serial = call->rtt_serial[i];

		if (orig_serial == acked_serial) {
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_mb(); /* Read data before setting avail bit */
			set_bit(i, &call->rtt_avail);
			rxrpc_peer_add_rtt(call, type, i, acked_serial, ack_serial,
					   sent_at, resp_time);
			matched = true;
		}

		/* If a later serial is being acked, then mark this slot as
		 * being available.
		 */
		if (after(acked_serial, orig_serial)) {
			trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_obsolete, i,
					   orig_serial, acked_serial, 0, 0);
			clear_bit(i + RXRPC_CALL_RTT_PEND_SHIFT, &call->rtt_avail);
			smp_wmb();
			set_bit(i, &call->rtt_avail);
		}
	}

	if (!matched)
		trace_rxrpc_rtt_rx(call, rxrpc_rtt_rx_lost, 9, 0, acked_serial, 0, 0);
}

/*
 * Process the extra information that may be appended to an ACK packet
 */
static void rxrpc_input_ack_trailer(struct rxrpc_call *call, struct sk_buff *skb,
				    struct rxrpc_acktrailer *trailer)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_peer *peer;
	unsigned int mtu;
	bool wake = false;
	u32 rwind = ntohl(trailer->rwind);

	if (rwind > RXRPC_TX_MAX_WINDOW)
		rwind = RXRPC_TX_MAX_WINDOW;
	if (call->tx_winsize != rwind) {
		if (rwind > call->tx_winsize)
			wake = true;
		trace_rxrpc_rx_rwind_change(call, sp->hdr.serial, rwind, wake);
		call->tx_winsize = rwind;
	}

	mtu = min(ntohl(trailer->maxMTU), ntohl(trailer->ifMTU));

	peer = call->peer;
	if (mtu < peer->maxdata) {
		spin_lock(&peer->lock);
		peer->maxdata = mtu;
		peer->mtu = mtu + peer->hdrsize;
		spin_unlock(&peer->lock);
	}

	if (wake)
		wake_up(&call->waitq);
}

/*
 * Determine how many nacks from the previous ACK have now been satisfied.
 */
static rxrpc_seq_t rxrpc_input_check_prev_ack(struct rxrpc_call *call,
					      struct rxrpc_ack_summary *summary,
					      rxrpc_seq_t seq)
{
	struct sk_buff *skb = call->cong_last_nack;
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int i, new_acks = 0, retained_nacks = 0;
	rxrpc_seq_t old_seq = sp->ack.first_ack;
	u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	if (after_eq(seq, old_seq + sp->ack.nr_acks)) {
		summary->nr_new_acks += sp->ack.nr_nacks;
		summary->nr_new_acks += seq - (old_seq + sp->ack.nr_acks);
		summary->nr_retained_nacks = 0;
	} else if (seq == old_seq) {
		summary->nr_retained_nacks = sp->ack.nr_nacks;
	} else {
		for (i = 0; i < sp->ack.nr_acks; i++) {
			if (acks[i] == RXRPC_ACK_TYPE_NACK) {
				if (before(old_seq + i, seq))
					new_acks++;
				else
					retained_nacks++;
			}
		}

		summary->nr_new_acks += new_acks;
		summary->nr_retained_nacks = retained_nacks;
	}

	return old_seq + sp->ack.nr_acks;
}

/*
 * Process individual soft ACKs.
 *
 * Each ACK in the array corresponds to one packet and can be either an ACK or
 * a NAK.  If we find an explicitly NAK'd packet we resend immediately;
 * packets that lie beyond the end of the ACK list are scheduled for resend by
 * the timer on the basis that the peer might just not have processed them at
 * the time the ACK was sent.
 */
static void rxrpc_input_soft_acks(struct rxrpc_call *call,
				  struct rxrpc_ack_summary *summary,
				  struct sk_buff *skb,
				  rxrpc_seq_t seq,
				  rxrpc_seq_t since)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned int i, old_nacks = 0;
	rxrpc_seq_t lowest_nak = seq + sp->ack.nr_acks;
	u8 *acks = skb->data + sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	for (i = 0; i < sp->ack.nr_acks; i++) {
		if (acks[i] == RXRPC_ACK_TYPE_ACK) {
			summary->nr_acks++;
			if (after_eq(seq, since))
				summary->nr_new_acks++;
		} else {
			summary->saw_nacks = true;
			if (before(seq, since)) {
				/* Overlap with previous ACK */
				old_nacks++;
			} else {
				summary->nr_new_nacks++;
				sp->ack.nr_nacks++;
			}

			if (before(seq, lowest_nak))
				lowest_nak = seq;
		}
		seq++;
	}

	if (lowest_nak != call->acks_lowest_nak) {
		call->acks_lowest_nak = lowest_nak;
		summary->new_low_nack = true;
	}

	/* We *can* have more nacks than we did - the peer is permitted to drop
	 * packets it has soft-acked and re-request them.  Further, it is
	 * possible for the nack distribution to change whilst the number of
	 * nacks stays the same or goes down.
	 */
	if (old_nacks < summary->nr_retained_nacks)
		summary->nr_new_acks += summary->nr_retained_nacks - old_nacks;
	summary->nr_retained_nacks = old_nacks;
}

/*
 * Return true if the ACK is valid - ie. it doesn't appear to have regressed
 * with respect to the ack state conveyed by preceding ACKs.
 */
static bool rxrpc_is_ack_valid(struct rxrpc_call *call,
			       rxrpc_seq_t first_pkt, rxrpc_seq_t prev_pkt)
{
	rxrpc_seq_t base = READ_ONCE(call->acks_first_seq);

	if (after(first_pkt, base))
		return true; /* The window advanced */

	if (before(first_pkt, base))
		return false; /* firstPacket regressed */

	if (after_eq(prev_pkt, call->acks_prev_seq))
		return true; /* previousPacket hasn't regressed. */

	/* Some rx implementations put a serial number in previousPacket. */
	if (after_eq(prev_pkt, base + call->tx_winsize))
		return false;
	return true;
}

/*
 * Process an ACK packet.
 *
 * ack.firstPacket is the sequence number of the first soft-ACK'd/NAK'd packet
 * in the ACK array.  Anything before that is hard-ACK'd and may be discarded.
 *
 * A hard-ACK means that a packet has been processed and may be discarded; a
 * soft-ACK means that the packet may be discarded and retransmission
 * requested.  A phase is complete when all packets are hard-ACK'd.
 */
static void rxrpc_input_ack(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	struct rxrpc_acktrailer trailer;
	rxrpc_serial_t ack_serial, acked_serial;
	rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt, since;
	int nr_acks, offset, ioffset;

	_enter("");

	offset = sizeof(struct rxrpc_wire_header) + sizeof(struct rxrpc_ackpacket);

	ack_serial	= sp->hdr.serial;
	acked_serial	= sp->ack.acked_serial;
	first_soft_ack	= sp->ack.first_ack;
	prev_pkt	= sp->ack.prev_ack;
	nr_acks		= sp->ack.nr_acks;
	hard_ack	= first_soft_ack - 1;
	summary.ack_reason = (sp->ack.reason < RXRPC_ACK__INVALID ?
			      sp->ack.reason : RXRPC_ACK__INVALID);

	trace_rxrpc_rx_ack(call, ack_serial, acked_serial,
			   first_soft_ack, prev_pkt,
			   summary.ack_reason, nr_acks);
	rxrpc_inc_stat(call->rxnet, stat_rx_acks[summary.ack_reason]);

	if (acked_serial != 0) {
		switch (summary.ack_reason) {
		case RXRPC_ACK_PING_RESPONSE:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_ping_response);
			break;
		case RXRPC_ACK_REQUESTED:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_requested_ack);
			break;
		default:
			rxrpc_complete_rtt_probe(call, skb->tstamp, acked_serial, ack_serial,
						 rxrpc_rtt_rx_other_ack);
			break;
		}
	}

	/* If we get an EXCEEDS_WINDOW ACK from the server, it probably
	 * indicates that the client address changed due to NAT.  The server
	 * lost the call because it switched to a different peer.
	 */
	if (unlikely(summary.ack_reason == RXRPC_ACK_EXCEEDS_WINDOW) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		goto send_response;
	}

	/* If we get an OUT_OF_SEQUENCE ACK from the server, that can also
	 * indicate a change of address.  However, we can retransmit the call
	 * if we still have it buffered to the beginning.
	 */
	if (unlikely(summary.ack_reason == RXRPC_ACK_OUT_OF_SEQUENCE) &&
	    first_soft_ack == 1 &&
	    prev_pkt == 0 &&
	    call->acks_hard_ack == 0 &&
	    rxrpc_is_client_call(call)) {
		rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
					  0, -ENETRESET);
		goto send_response;
	}

	/* Discard any out-of-order or duplicate ACKs (outside lock). */
	if (!rxrpc_is_ack_valid(call, first_soft_ack, prev_pkt)) {
		trace_rxrpc_rx_discard_ack(call->debug_id, ack_serial,
					   first_soft_ack, call->acks_first_seq,
					   prev_pkt, call->acks_prev_seq);
		goto send_response;
	}

	trailer.maxMTU = 0;
	ioffset = offset + nr_acks + 3;
	if (skb->len >= ioffset + sizeof(trailer) &&
	    skb_copy_bits(skb, ioffset, &trailer, sizeof(trailer)) < 0)
		return rxrpc_proto_abort(call, 0, rxrpc_badmsg_short_ack_trailer);

	if (nr_acks > 0)
		skb_condense(skb);

	if (call->cong_last_nack) {
		since = rxrpc_input_check_prev_ack(call, &summary, first_soft_ack);
		rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
		call->cong_last_nack = NULL;
	} else {
		summary.nr_new_acks = first_soft_ack - call->acks_first_seq;
		call->acks_lowest_nak = first_soft_ack + nr_acks;
		since = first_soft_ack;
	}

	call->acks_latest_ts = skb->tstamp;
	call->acks_first_seq = first_soft_ack;
	call->acks_prev_seq = prev_pkt;

	switch (summary.ack_reason) {
	case RXRPC_ACK_PING:
		break;
	default:
		if (acked_serial && after(acked_serial, call->acks_highest_serial))
			call->acks_highest_serial = acked_serial;
		break;
	}

	/* Parse rwind and mtu sizes if provided. */
	if (trailer.maxMTU)
		rxrpc_input_ack_trailer(call, skb, &trailer);

	if (first_soft_ack == 0)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_zero);

	/* Ignore ACKs unless we are or have just been transmitting. */
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_CLIENT_SEND_REQUEST:
	case RXRPC_CALL_CLIENT_AWAIT_REPLY:
	case RXRPC_CALL_SERVER_SEND_REPLY:
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		break;
	default:
		goto send_response;
	}

	if (before(hard_ack, call->acks_hard_ack) ||
	    after(hard_ack, call->tx_top))
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_outside_window);
	if (nr_acks > call->tx_top - hard_ack)
		return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_sack_overflow);

	if (after(hard_ack, call->acks_hard_ack)) {
		if (rxrpc_rotate_tx_window(call, hard_ack, &summary)) {
			rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ack);
			goto send_response;
		}
	}

	if (nr_acks > 0) {
		if (offset > (int)skb->len - nr_acks)
			return rxrpc_proto_abort(call, 0, rxrpc_eproto_ackr_short_sack);
		rxrpc_input_soft_acks(call, &summary, skb, first_soft_ack, since);
		rxrpc_get_skb(skb, rxrpc_skb_get_last_nack);
		call->cong_last_nack = skb;
	}

	if (test_bit(RXRPC_CALL_TX_LAST, &call->flags) &&
	    summary.nr_acks == call->tx_top - hard_ack &&
	    rxrpc_is_client_call(call))
		rxrpc_propose_ping(call, ack_serial,
				   rxrpc_propose_ack_ping_for_lost_reply);

	rxrpc_congestion_management(call, skb, &summary, acked_serial);

send_response:
	if (summary.ack_reason == RXRPC_ACK_PING)
		rxrpc_send_ACK(call, RXRPC_ACK_PING_RESPONSE, ack_serial,
			       rxrpc_propose_ack_respond_to_ping);
	else if (sp->hdr.flags & RXRPC_REQUEST_ACK)
		rxrpc_send_ACK(call, RXRPC_ACK_REQUESTED, ack_serial,
			       rxrpc_propose_ack_respond_to_ack);
}

/*
 * Process an ACKALL packet.
 */
static void rxrpc_input_ackall(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_ack_summary summary = { 0 };

	if (rxrpc_rotate_tx_window(call, call->tx_top, &summary))
		rxrpc_end_tx_phase(call, false, rxrpc_eproto_unexpected_ackall);
}

/*
 * Process an ABORT packet directed at a call.
 */
static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);

	trace_rxrpc_rx_abort(call, sp->hdr.serial, skb->priority);

	rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED,
				  skb->priority, -ECONNABORTED);
}

/*
 * Process an incoming call packet.
 */
void rxrpc_input_call_packet(struct rxrpc_call *call, struct sk_buff *skb)
{
	struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
	unsigned long timo;

	_enter("%p,%p", call, skb);

	if (sp->hdr.serviceId != call->dest_srx.srx_service)
		call->dest_srx.srx_service = sp->hdr.serviceId;
	if ((int)sp->hdr.serial - (int)call->rx_serial > 0)
		call->rx_serial = sp->hdr.serial;
	if (!test_bit(RXRPC_CALL_RX_HEARD, &call->flags))
		set_bit(RXRPC_CALL_RX_HEARD, &call->flags);

	timo = READ_ONCE(call->next_rx_timo);
	if (timo) {
		ktime_t delay = ms_to_ktime(timo);

		call->expect_rx_by = ktime_add(ktime_get_real(), delay);
		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
	}

	switch (sp->hdr.type) {
	case RXRPC_PACKET_TYPE_DATA:
		return rxrpc_input_data(call, skb);

	case RXRPC_PACKET_TYPE_ACK:
		return rxrpc_input_ack(call, skb);

	case RXRPC_PACKET_TYPE_BUSY:
		/* Just ignore BUSY packets from the server; the retry and
		 * lifespan timers will take care of business.  BUSY packets
		 * from the client don't make sense.
		 */
		return;

	case RXRPC_PACKET_TYPE_ABORT:
		return rxrpc_input_abort(call, skb);

	case RXRPC_PACKET_TYPE_ACKALL:
		return rxrpc_input_ackall(call, skb);

	default:
		break;
	}
}

/*
 * Handle a new service call on a channel implicitly completing the preceding
 * call on that channel.  This does not apply to client conns.
 *
 * TODO: If callNumber > call_id + 1, renegotiate security.
 */
void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
{
	switch (__rxrpc_call_state(call)) {
	case RXRPC_CALL_SERVER_AWAIT_ACK:
		rxrpc_call_completed(call);
		fallthrough;
	case RXRPC_CALL_COMPLETE:
		break;
	default:
		rxrpc_abort_call(call, 0, RX_CALL_DEAD, -ESHUTDOWN,
				 rxrpc_eproto_improper_term);
		trace_rxrpc_improper_term(call);
		break;
	}

	rxrpc_input_call_event(call, skb);
}