txbuf.c

// SPDX-License-Identifier: GPL-2.0-or-later
/* RxRPC Tx data buffering.
 *
 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt

#include <linux/slab.h>
#include "ar-internal.h"

static atomic_t rxrpc_txbuf_debug_ids;
atomic_t rxrpc_nr_txbuf;

/*
 * Allocate and partially initialise a data transmission buffer.
 */
struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_size,
                                           size_t data_align, gfp_t gfp)
{
        struct rxrpc_wire_header *whdr;
        struct rxrpc_txbuf *txb;
        size_t total, hoff;
        void *buf;

        txb = kmalloc(sizeof(*txb), gfp);
        if (!txb)
                return NULL;

        /* Pad the front of the allocation so that the data payload, which
         * follows the wire header, lands on the requested alignment.
         */
        hoff = round_up(sizeof(*whdr), data_align) - sizeof(*whdr);
        total = hoff + sizeof(*whdr) + data_size;

        data_align = umax(data_align, L1_CACHE_BYTES);
        mutex_lock(&call->conn->tx_data_alloc_lock);
        buf = page_frag_alloc_align(&call->conn->tx_data_alloc, total, gfp,
                                    data_align);
        mutex_unlock(&call->conn->tx_data_alloc_lock);
        if (!buf) {
                kfree(txb);
                return NULL;
        }

        whdr = buf + hoff;

        INIT_LIST_HEAD(&txb->call_link);
        INIT_LIST_HEAD(&txb->tx_link);
        refcount_set(&txb->ref, 1);
        txb->last_sent = KTIME_MIN;
        txb->call_debug_id = call->debug_id;
        txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids);
        txb->space = data_size;
        txb->len = 0;
        txb->offset = sizeof(*whdr);
        txb->flags = call->conn->out_clientflag;
        txb->ack_why = 0;
        txb->seq = call->tx_prepared + 1;
        txb->serial = 0;
        txb->cksum = 0;
        txb->nr_kvec = 1;
        txb->kvec[0].iov_base = whdr;
        txb->kvec[0].iov_len = sizeof(*whdr);
        whdr->epoch = htonl(call->conn->proto.epoch);
        whdr->cid = htonl(call->cid);
        whdr->callNumber = htonl(call->call_id);
        whdr->seq = htonl(txb->seq);
        whdr->type = RXRPC_PACKET_TYPE_DATA;
        whdr->flags = 0;
        whdr->userStatus = 0;
        whdr->securityIndex = call->security_ix;
        whdr->_rsvd = 0;
        whdr->serviceId = htons(call->dest_srx.srx_service);

        trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1,
                          rxrpc_txbuf_alloc_data);
        atomic_inc(&rxrpc_nr_txbuf);
        return txb;
}
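
/*
 * Illustrative sketch only (not code from this file): a caller on the
 * sendmsg path might allocate a DATA buffer, copy its payload into the
 * space reserved after the wire header, and account for the length:
 *
 *	txb = rxrpc_alloc_data_txbuf(call, len, 1, GFP_NOFS);
 *	if (!txb)
 *		return -ENOMEM;
 *	memcpy(txb->kvec[0].iov_base + txb->offset, data, len);
 *	txb->len = len;
 *
 * The alignment of 1 and the GFP_NOFS flag are arbitrary example values.
 */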

/*
 * Allocate and partially initialise an ACK packet.
 */
struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_size)
{
        struct rxrpc_wire_header *whdr;
        struct rxrpc_acktrailer *trailer;
        struct rxrpc_ackpacket *ack;
        struct rxrpc_txbuf *txb;
        /* ACKs may be generated whilst the RCU read lock is held, in which
         * case we cannot sleep to allocate.
         */
        gfp_t gfp = rcu_read_lock_held() ? GFP_ATOMIC | __GFP_NOWARN : GFP_NOFS;
        void *buf, *buf2 = NULL;
        u8 *filler;

        txb = kmalloc(sizeof(*txb), gfp);
        if (!txb)
                return NULL;

        buf = page_frag_alloc(&call->local->tx_alloc,
                              sizeof(*whdr) + sizeof(*ack) + 1 + 3 + sizeof(*trailer), gfp);
        if (!buf) {
                kfree(txb);
                return NULL;
        }

        if (sack_size) {
                buf2 = page_frag_alloc(&call->local->tx_alloc, sack_size, gfp);
                if (!buf2) {
                        page_frag_free(buf);
                        kfree(txb);
                        return NULL;
                }
        }

        whdr    = buf;
        ack     = buf + sizeof(*whdr);
        filler  = buf + sizeof(*whdr) + sizeof(*ack) + 1;
        trailer = buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;

        INIT_LIST_HEAD(&txb->call_link);
        INIT_LIST_HEAD(&txb->tx_link);
        refcount_set(&txb->ref, 1);
        txb->call_debug_id = call->debug_id;
        txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids);
        txb->space = 0;
        txb->len = sizeof(*whdr) + sizeof(*ack) + 3 + sizeof(*trailer);
        txb->offset = 0;
        txb->flags = call->conn->out_clientflag;
        txb->ack_rwind = 0;
        txb->seq = 0;
        txb->serial = 0;
        txb->cksum = 0;
        txb->nr_kvec = 3;
        txb->kvec[0].iov_base = whdr;
        txb->kvec[0].iov_len = sizeof(*whdr) + sizeof(*ack);
        txb->kvec[1].iov_base = buf2;
        txb->kvec[1].iov_len = sack_size;
        txb->kvec[2].iov_base = filler;
        txb->kvec[2].iov_len = 3 + sizeof(*trailer);
        whdr->epoch = htonl(call->conn->proto.epoch);
        whdr->cid = htonl(call->cid);
        whdr->callNumber = htonl(call->call_id);
        whdr->seq = 0;
        whdr->type = RXRPC_PACKET_TYPE_ACK;
        whdr->flags = 0;
        whdr->userStatus = 0;
        whdr->securityIndex = call->security_ix;
        whdr->_rsvd = 0;
        whdr->serviceId = htons(call->dest_srx.srx_service);

        /* The filler/trailer kvec points into the same page frag as the
         * header, and rxrpc_free_txbuf() will free each kvec separately,
         * so take an extra ref on that page.
         */
        get_page(virt_to_head_page(trailer));

        trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 1,
                          rxrpc_txbuf_alloc_ack);
        atomic_inc(&rxrpc_nr_txbuf);
        return txb;
}
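
/*
 * For reference, the three kvecs initialised above describe one ACK packet
 * laid out as:
 *
 *	kvec[0]: wire header + ack packet   (start of the first page frag)
 *	kvec[1]: soft-ACK table, sack_size bytes (separate frag, may be NULL)
 *	kvec[2]: 3 filler bytes + ack trailer    (tail of the first frag)
 */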

/*
 * Get a ref on a transmission buffer.
 */
void rxrpc_get_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
{
        int r;

        __refcount_inc(&txb->ref, &r);
        trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r + 1, what);
}

/*
 * Note the current ref on a transmission buffer for tracing purposes,
 * without changing it.
 */
void rxrpc_see_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
{
        int r = refcount_read(&txb->ref);

        trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, r, what);
}

/*
 * Free a transmission buffer, releasing each of its page frags.
 */
static void rxrpc_free_txbuf(struct rxrpc_txbuf *txb)
{
        int i;

        trace_rxrpc_txbuf(txb->debug_id, txb->call_debug_id, txb->seq, 0,
                          rxrpc_txbuf_free);
        for (i = 0; i < txb->nr_kvec; i++)
                if (txb->kvec[i].iov_base)
                        page_frag_free(txb->kvec[i].iov_base);
        kfree(txb);
        atomic_dec(&rxrpc_nr_txbuf);
}

/*
 * Drop a ref on a transmission buffer, freeing it when the last ref goes.
 * The fields used by the tracepoint are sampled before the ref is dropped,
 * as the buffer may be freed underneath us.
 */
void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
{
        unsigned int debug_id, call_debug_id;
        rxrpc_seq_t seq;
        bool dead;
        int r;

        if (txb) {
                debug_id = txb->debug_id;
                call_debug_id = txb->call_debug_id;
                seq = txb->seq;
                dead = __refcount_dec_and_test(&txb->ref, &r);
                trace_rxrpc_txbuf(debug_id, call_debug_id, seq, r - 1, what);
                if (dead)
                        rxrpc_free_txbuf(txb);
        }
}
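
/*
 * Lifetime sketch: both allocators above return a txbuf carrying a single
 * ref owned by the caller.  Each additional holder takes a ref with
 * rxrpc_get_txbuf() and drops it with rxrpc_put_txbuf(), quoting a trace
 * enum value that records why; the buffer is freed on the final put, as
 * rxrpc_shrink_call_tx_buffer() below does with rxrpc_txbuf_put_rotated.
 */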

/*
 * Shrink the transmit buffer, rotating out buffers that have been hard-ACK'd.
 */
void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
{
        struct rxrpc_txbuf *txb;
        rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
        bool wake = false;

        _enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);

        while ((txb = list_first_entry_or_null(&call->tx_buffer,
                                               struct rxrpc_txbuf, call_link))) {
                hard_ack = smp_load_acquire(&call->acks_hard_ack);
                if (before(hard_ack, txb->seq))
                        break;

                if (txb->seq != call->tx_bottom + 1)
                        rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
                ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
                smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
                list_del_rcu(&txb->call_link);

                trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);

                rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
                /* Wake anyone waiting for buffer space once enough has been
                 * rotated out of the window.
                 */
                if (after(call->acks_hard_ack, call->tx_bottom + 128))
                        wake = true;
        }

        if (wake)
                wake_up(&call->waitq);
}