rxperf.c
// SPDX-License-Identifier: GPL-2.0-or-later
/* In-kernel rxperf server for testing purposes.
 *
 * Copyright (C) 2022 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 */

#define pr_fmt(fmt) "rxperf: " fmt
#include <linux/module.h>
#include <linux/slab.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#define RXRPC_TRACE_ONLY_DEFINE_ENUMS
#include <trace/events/rxrpc.h>

MODULE_DESCRIPTION("rxperf test server (afs)");
MODULE_AUTHOR("Red Hat, Inc.");
MODULE_LICENSE("GPL");

#define RXPERF_PORT		7009
#define RX_PERF_SERVICE		147
#define RX_PERF_VERSION		3
#define RX_PERF_SEND		0
#define RX_PERF_RECV		1
#define RX_PERF_RPC		3
#define RX_PERF_FILE		4
#define RX_PERF_MAGIC_COOKIE	0x4711
struct rxperf_proto_params {
	__be32		version;
	__be32		type;
	__be32		rsize;
	__be32		wsize;
} __packed;
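
/*
 * A sketch of the wire layout this server expects, as reconstructed from
 * the unmarshalling code below (note that rsize/wsize arrive in the block
 * above, but the sizes actually used are re-sent in per-operation words):
 *
 *	16-byte parameter block of big-endian words:
 *		version (= RX_PERF_VERSION), type (operation ID), rsize, wsize
 *	then, per operation:
 *		SEND: __be32 request size
 *		RECV: __be32 reply size
 *		RPC:  __be32 request size, __be32 reply size
 *	then request-size bytes of bulk data, then the 4-byte magic cookie.
 */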

static const u8 rxperf_magic_cookie[] = { 0x00, 0x00, 0x47, 0x11 };
static const u8 secret[8] = { 0xa7, 0x83, 0x8a, 0xcb, 0xc7, 0x83, 0xec, 0x94 };

enum rxperf_call_state {
	RXPERF_CALL_SV_AWAIT_PARAMS,	/* Server: Awaiting parameter block */
	RXPERF_CALL_SV_AWAIT_REQUEST,	/* Server: Awaiting request data */
	RXPERF_CALL_SV_REPLYING,	/* Server: Replying */
	RXPERF_CALL_SV_AWAIT_ACK,	/* Server: Awaiting final ACK */
	RXPERF_CALL_COMPLETE,		/* Completed or failed */
};
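
/*
 * A call normally walks these states in order:
 *
 *	SV_AWAIT_PARAMS -> SV_AWAIT_REQUEST -> SV_REPLYING
 *		-> SV_AWAIT_ACK -> COMPLETE
 *
 * rxperf_deliver_to_call() drives the receive-side transitions as data
 * arrives, rxperf_notify_end_reply_tx() moves the call to SV_AWAIT_ACK
 * once the reply has been queued, and any error jumps straight to
 * COMPLETE via rxperf_set_call_complete().
 */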

struct rxperf_call {
	struct rxrpc_call	*rxcall;
	struct iov_iter		iter;
	struct kvec		kvec[1];
	struct work_struct	work;
	const char		*type;
	size_t			iov_len;
	size_t			req_len;	/* Size of request blob */
	size_t			reply_len;	/* Size of reply blob */
	unsigned int		debug_id;
	unsigned int		operation_id;
	struct rxperf_proto_params params;
	__be32			tmp[2];
	s32			abort_code;
	enum rxperf_call_state	state;
	short			error;
	unsigned short		unmarshal;
	u16			service_id;
	int (*deliver)(struct rxperf_call *call);
	void (*processor)(struct work_struct *work);
};

static struct socket *rxperf_socket;
static struct key *rxperf_sec_keyring;	/* Ring of security/crypto keys */
static struct workqueue_struct *rxperf_workqueue;
static void rxperf_deliver_to_call(struct work_struct *work);
static int rxperf_deliver_param_block(struct rxperf_call *call);
static int rxperf_deliver_request(struct rxperf_call *call);
static int rxperf_process_call(struct rxperf_call *call);
static void rxperf_charge_preallocation(struct work_struct *work);

static DECLARE_WORK(rxperf_charge_preallocation_work,
		    rxperf_charge_preallocation);

static inline void rxperf_set_call_state(struct rxperf_call *call,
					 enum rxperf_call_state to)
{
	call->state = to;
}

static inline void rxperf_set_call_complete(struct rxperf_call *call,
					    int error, s32 remote_abort)
{
	if (call->state != RXPERF_CALL_COMPLETE) {
		call->abort_code = remote_abort;
		call->error = error;
		call->state = RXPERF_CALL_COMPLETE;
	}
}

static void rxperf_rx_discard_new_call(struct rxrpc_call *rxcall,
				       unsigned long user_call_ID)
{
	kfree((struct rxperf_call *)user_call_ID);
}

static void rxperf_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
			       unsigned long user_call_ID)
{
	queue_work(rxperf_workqueue, &rxperf_charge_preallocation_work);
}

static void rxperf_queue_call_work(struct rxperf_call *call)
{
	queue_work(rxperf_workqueue, &call->work);
}

static void rxperf_notify_rx(struct sock *sk, struct rxrpc_call *rxcall,
			     unsigned long call_user_ID)
{
	struct rxperf_call *call = (struct rxperf_call *)call_user_ID;

	if (call->state != RXPERF_CALL_COMPLETE)
		rxperf_queue_call_work(call);
}

static void rxperf_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
{
	struct rxperf_call *call = (struct rxperf_call *)user_call_ID;

	call->rxcall = rxcall;
}

static void rxperf_notify_end_reply_tx(struct sock *sock,
				       struct rxrpc_call *rxcall,
				       unsigned long call_user_ID)
{
	rxperf_set_call_state((struct rxperf_call *)call_user_ID,
			      RXPERF_CALL_SV_AWAIT_ACK);
}
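
/*
 * How the callbacks above hook together: each preallocated rxperf_call is
 * handed to rxrpc as a user call ID.  rxperf_rx_attach() binds the rxrpc
 * call to it when a client call arrives, rxperf_notify_rx() kicks the
 * workqueue whenever data is received, and rxperf_rx_new_call() tops the
 * preallocation pool back up.  Preallocations still unused when the
 * socket is shut down are freed through rxperf_rx_discard_new_call().
 */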

/*
 * Charge the incoming call preallocation.
 */
static void rxperf_charge_preallocation(struct work_struct *work)
{
	struct rxperf_call *call;

	for (;;) {
		call = kzalloc(sizeof(*call), GFP_KERNEL);
		if (!call)
			break;

		call->type		= "unset";
		call->debug_id		= atomic_inc_return(&rxrpc_debug_id);
		call->deliver		= rxperf_deliver_param_block;
		call->state		= RXPERF_CALL_SV_AWAIT_PARAMS;
		call->service_id	= RX_PERF_SERVICE;
		call->iov_len		= sizeof(call->params);
		call->kvec[0].iov_len	= sizeof(call->params);
		call->kvec[0].iov_base	= &call->params;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		INIT_WORK(&call->work, rxperf_deliver_to_call);

		if (rxrpc_kernel_charge_accept(rxperf_socket,
					       rxperf_notify_rx,
					       rxperf_rx_attach,
					       (unsigned long)call,
					       GFP_KERNEL,
					       call->debug_id) < 0)
			break;
		call = NULL;
	}

	/* Free the last allocation if charging it failed. */
	kfree(call);
}

/*
 * Open an rxrpc socket and bind it to be a server for the rxperf service
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
static int rxperf_open_socket(void)
{
	struct sockaddr_rxrpc srx;
	struct socket *socket;
	int ret;

	ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET6,
			       &socket);
	if (ret < 0)
		goto error_1;

	socket->sk->sk_allocation = GFP_NOFS;

	/* bind the local address to make this a server socket */
	memset(&srx, 0, sizeof(srx));
	srx.srx_family			= AF_RXRPC;
	srx.srx_service			= RX_PERF_SERVICE;
	srx.transport_type		= SOCK_DGRAM;
	srx.transport_len		= sizeof(srx.transport.sin6);
	srx.transport.sin6.sin6_family	= AF_INET6;
	srx.transport.sin6.sin6_port	= htons(RXPERF_PORT);

	ret = rxrpc_sock_set_min_security_level(socket->sk,
						RXRPC_SECURITY_ENCRYPT);
	if (ret < 0)
		goto error_2;

	ret = rxrpc_sock_set_security_keyring(socket->sk, rxperf_sec_keyring);
	if (ret < 0)
		goto error_2;

	ret = kernel_bind(socket, (struct sockaddr *)&srx, sizeof(srx));
	if (ret < 0)
		goto error_2;

	rxrpc_kernel_new_call_notification(socket, rxperf_rx_new_call,
					   rxperf_rx_discard_new_call);

	ret = kernel_listen(socket, INT_MAX);
	if (ret < 0)
		goto error_2;

	rxperf_socket = socket;
	rxperf_charge_preallocation(&rxperf_charge_preallocation_work);
	return 0;

error_2:
	sock_release(socket);
error_1:
	pr_err("Can't set up rxperf socket: %d\n", ret);
	return ret;
}

/*
 * Close the rxrpc socket that rxperf was using.
 */
static void rxperf_close_socket(void)
{
	kernel_listen(rxperf_socket, 0);
	kernel_sock_shutdown(rxperf_socket, SHUT_RDWR);
	flush_workqueue(rxperf_workqueue);
	sock_release(rxperf_socket);
}

/*
 * Log remote abort codes that indicate that we have a protocol disagreement
 * with the peer.
 */
static void rxperf_log_error(struct rxperf_call *call, s32 remote_abort)
{
	static int max = 0;
	const char *msg;
	int m;

	switch (remote_abort) {
	case RX_EOF:		 msg = "unexpected EOF";	break;
	case RXGEN_CC_MARSHAL:	 msg = "client marshalling";	break;
	case RXGEN_CC_UNMARSHAL: msg = "client unmarshalling";	break;
	case RXGEN_SS_MARSHAL:	 msg = "server marshalling";	break;
	case RXGEN_SS_UNMARSHAL: msg = "server unmarshalling";	break;
	case RXGEN_DECODE:	 msg = "opcode decode";		break;
	case RXGEN_SS_XDRFREE:	 msg = "server XDR cleanup";	break;
	case RXGEN_CC_XDRFREE:	 msg = "client XDR cleanup";	break;
	case -32:		 msg = "insufficient data";	break;
	default:
		return;
	}

	/* Only log the first few failures to avoid flooding the log. */
	m = max;
	if (m < 3) {
		max = m + 1;
		pr_info("Peer reported %s failure on %s\n", msg, call->type);
	}
}

/*
 * Deliver messages to a call.
 */
static void rxperf_deliver_to_call(struct work_struct *work)
{
	struct rxperf_call *call = container_of(work, struct rxperf_call, work);
	enum rxperf_call_state state;
	u32 abort_code, remote_abort = 0;
	int ret = 0;

	if (call->state == RXPERF_CALL_COMPLETE)
		return;

	while (state = call->state,
	       state == RXPERF_CALL_SV_AWAIT_PARAMS ||
	       state == RXPERF_CALL_SV_AWAIT_REQUEST ||
	       state == RXPERF_CALL_SV_AWAIT_ACK
	       ) {
		if (state == RXPERF_CALL_SV_AWAIT_ACK) {
			if (!rxrpc_kernel_check_life(rxperf_socket, call->rxcall))
				goto call_complete;
			return;
		}

		ret = call->deliver(call);
		if (ret == 0)
			ret = rxperf_process_call(call);

		switch (ret) {
		case 0:
			continue;
		case -EINPROGRESS:
		case -EAGAIN:
			return;
		case -ECONNABORTED:
			rxperf_log_error(call, call->abort_code);
			goto call_complete;
		case -EOPNOTSUPP:
			abort_code = RXGEN_OPCODE;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret,
						rxperf_abort_op_not_supported);
			goto call_complete;
		case -ENOTSUPP:
			abort_code = RX_USER_ABORT;
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						abort_code, ret,
						rxperf_abort_op_not_supported);
			goto call_complete;
		case -EIO:
			pr_err("Call %u in bad state %u\n",
			       call->debug_id, call->state);
			fallthrough;
		case -ENODATA:
		case -EBADMSG:
		case -EMSGSIZE:
		case -ENOMEM:
		case -EFAULT:
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RXGEN_SS_UNMARSHAL, ret,
						rxperf_abort_unmarshal_error);
			goto call_complete;
		default:
			rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
						RX_CALL_DEAD, ret,
						rxperf_abort_general_error);
			goto call_complete;
		}
	}

call_complete:
	rxperf_set_call_complete(call, ret, remote_abort);
	/* The call may have been requeued */
	rxrpc_kernel_shutdown_call(rxperf_socket, call->rxcall);
	rxrpc_kernel_put_call(rxperf_socket, call->rxcall);
	cancel_work(&call->work);
	kfree(call);
}

/*
 * Extract a piece of data from the received data socket buffers.
 */
static int rxperf_extract_data(struct rxperf_call *call, bool want_more)
{
	u32 remote_abort = 0;
	int ret;

	ret = rxrpc_kernel_recv_data(rxperf_socket, call->rxcall, &call->iter,
				     &call->iov_len, want_more, &remote_abort,
				     &call->service_id);
	pr_debug("Extract i=%zu l=%zu m=%u ret=%d\n",
		 iov_iter_count(&call->iter), call->iov_len, want_more, ret);
	if (ret == 0 || ret == -EAGAIN)
		return ret;

	if (ret == 1) {
		switch (call->state) {
		case RXPERF_CALL_SV_AWAIT_REQUEST:
			rxperf_set_call_state(call, RXPERF_CALL_SV_REPLYING);
			break;
		case RXPERF_CALL_COMPLETE:
			pr_debug("premature completion %d", call->error);
			return call->error;
		default:
			break;
		}
		return 0;
	}

	rxperf_set_call_complete(call, ret, remote_abort);
	return ret;
}
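
/*
 * Note on the return values handled above, as this code relies on them:
 * rxrpc_kernel_recv_data() returns 0 when the iterator was filled and
 * more data remains (the want_more case), 1 when the final piece of data
 * for the call has been received, -EAGAIN when nothing further is
 * available yet, and another negative error (with the remote abort code
 * filled in) on failure - which is why 1 is the point at which the call
 * flips from receiving the request to sending the reply.
 */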

/*
 * Grab the operation ID from the parameter block at the start of an
 * incoming call.
 */
static int rxperf_deliver_param_block(struct rxperf_call *call)
{
	u32 version;
	int ret;

	/* Extract the parameter block */
	ret = rxperf_extract_data(call, true);
	if (ret < 0)
		return ret;

	version			= ntohl(call->params.version);
	call->operation_id	= ntohl(call->params.type);
	call->deliver		= rxperf_deliver_request;

	if (version != RX_PERF_VERSION) {
		pr_info("Version mismatch %x\n", version);
		return -ENOTSUPP;
	}

	switch (call->operation_id) {
	case RX_PERF_SEND:
		call->type = "send";
		call->reply_len = 0;
		call->iov_len = 4;	/* Expect req size */
		break;
	case RX_PERF_RECV:
		call->type = "recv";
		call->req_len = 0;
		call->iov_len = 4;	/* Expect reply size */
		break;
	case RX_PERF_RPC:
		call->type = "rpc";
		call->iov_len = 8;	/* Expect req size and reply size */
		break;
	case RX_PERF_FILE:
		call->type = "file";
		fallthrough;
	default:
		return -EOPNOTSUPP;
	}

	rxperf_set_call_state(call, RXPERF_CALL_SV_AWAIT_REQUEST);
	return call->deliver(call);
}

/*
 * Deliver the request data.
 */
static int rxperf_deliver_request(struct rxperf_call *call)
{
	int ret;

	switch (call->unmarshal) {
	case 0:
		/* Set up to read the per-operation size words. */
		call->kvec[0].iov_len	= call->iov_len;
		call->kvec[0].iov_base	= call->tmp;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		call->unmarshal++;
		fallthrough;
	case 1:
		ret = rxperf_extract_data(call, true);
		if (ret < 0)
			return ret;

		switch (call->operation_id) {
		case RX_PERF_SEND:
			call->type = "send";
			call->req_len	= ntohl(call->tmp[0]);
			call->reply_len	= 0;
			break;
		case RX_PERF_RECV:
			call->type = "recv";
			call->req_len	= 0;
			call->reply_len	= ntohl(call->tmp[0]);
			break;
		case RX_PERF_RPC:
			call->type = "rpc";
			call->req_len	= ntohl(call->tmp[0]);
			call->reply_len	= ntohl(call->tmp[1]);
			break;
		default:
			pr_info("Can't parse extra params\n");
			return -EIO;
		}

		pr_debug("CALL op=%s rq=%zx rp=%zx\n",
			 call->type, call->req_len, call->reply_len);

		/* The request payload itself is unused; just discard it. */
		call->iov_len = call->req_len;
		iov_iter_discard(&call->iter, READ, call->req_len);
		call->unmarshal++;
		fallthrough;
	case 2:
		ret = rxperf_extract_data(call, true);
		if (ret < 0)
			return ret;

		/* Deal with the terminal magic cookie. */
		call->iov_len = 4;
		call->kvec[0].iov_len	= call->iov_len;
		call->kvec[0].iov_base	= call->tmp;
		iov_iter_kvec(&call->iter, READ, call->kvec, 1, call->iov_len);
		call->unmarshal++;
		fallthrough;
	case 3:
		ret = rxperf_extract_data(call, false);
		if (ret < 0)
			return ret;
		call->unmarshal++;
		fallthrough;
	default:
		return 0;
	}
}

/*
 * Process a call for which we've received the request.
 */
static int rxperf_process_call(struct rxperf_call *call)
{
	struct msghdr msg = {};
	struct bio_vec bv;
	struct kvec iov[1];
	ssize_t n;
	size_t reply_len = call->reply_len, len;

	rxrpc_kernel_set_tx_length(rxperf_socket, call->rxcall,
				   reply_len + sizeof(rxperf_magic_cookie));

	/* Send the requested number of reply bytes, a page of zeros at a
	 * time, followed by the terminal magic cookie.
	 */
	while (reply_len > 0) {
		len = min_t(size_t, reply_len, PAGE_SIZE);
		bvec_set_page(&bv, ZERO_PAGE(0), len, 0);
		iov_iter_bvec(&msg.msg_iter, WRITE, &bv, 1, len);
		msg.msg_flags = MSG_MORE;
		n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg,
					   len, rxperf_notify_end_reply_tx);
		if (n < 0)
			return n;
		if (n == 0)
			return -EIO;
		reply_len -= n;
	}

	len = sizeof(rxperf_magic_cookie);
	iov[0].iov_base	= (void *)rxperf_magic_cookie;
	iov[0].iov_len	= len;
	iov_iter_kvec(&msg.msg_iter, WRITE, iov, 1, len);
	msg.msg_flags = 0;
	n = rxrpc_kernel_send_data(rxperf_socket, call->rxcall, &msg, len,
				   rxperf_notify_end_reply_tx);
	if (n >= 0)
		return 0; /* Success */

	if (n == -ENOMEM)
		rxrpc_kernel_abort_call(rxperf_socket, call->rxcall,
					RXGEN_SS_MARSHAL, -ENOMEM,
					rxperf_abort_oom);
	return n;
}
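
/*
 * The reply as transmitted above is thus reply_len zero bytes followed by
 * the 4-byte magic cookie 0x00 0x00 0x47 0x11.  MSG_MORE on each bulk
 * chunk tells rxrpc more data follows, so the reply is only marked
 * complete by the final, un-flagged send that carries the cookie.
 */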

/*
 * Add a key to the security keyring.
 */
static int rxperf_add_key(struct key *keyring)
{
	key_ref_t kref;
	int ret;

	kref = key_create_or_update(make_key_ref(keyring, true),
				    "rxrpc_s",
				    __stringify(RX_PERF_SERVICE) ":2",
				    secret,
				    sizeof(secret),
				    KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH
				    | KEY_USR_VIEW,
				    KEY_ALLOC_NOT_IN_QUOTA);
	if (IS_ERR(kref)) {
		pr_err("Can't allocate rxperf server key: %ld\n", PTR_ERR(kref));
		return PTR_ERR(kref);
	}

	ret = key_link(keyring, key_ref_to_ptr(kref));
	if (ret < 0)
		pr_err("Can't link rxperf server key: %d\n", ret);
	key_ref_put(kref);
	return ret;
}
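
/*
 * An "rxrpc_s" server key is described as "<serviceID>:<securityIndex>",
 * so __stringify(RX_PERF_SERVICE) ":2" yields "147:2" - the rxkad
 * (security index 2) key for service 147.  A client needs the matching
 * eight-byte secret to reach this server; a hypothetical userspace
 * equivalent using keyctl(1) might look like:
 *
 *	keyctl add rxrpc_s 147:2 <8-byte-secret> @s
 *
 * (a sketch only - the secret must be supplied as raw binary payload,
 * not the literal placeholder string above).
 */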

/*
 * Initialise the rxperf server.
 */
static int __init rxperf_init(void)
{
	struct key *keyring;
	int ret = -ENOMEM;

	pr_info("Server registering\n");

	rxperf_workqueue = alloc_workqueue("rxperf", 0, 0);
	if (!rxperf_workqueue)
		goto error_workqueue;

	keyring = keyring_alloc("rxperf_server",
				GLOBAL_ROOT_UID, GLOBAL_ROOT_GID, current_cred(),
				KEY_POS_VIEW | KEY_POS_READ | KEY_POS_SEARCH |
				KEY_POS_WRITE |
				KEY_USR_VIEW | KEY_USR_READ | KEY_USR_SEARCH |
				KEY_USR_WRITE |
				KEY_OTH_VIEW | KEY_OTH_READ | KEY_OTH_SEARCH,
				KEY_ALLOC_NOT_IN_QUOTA,
				NULL, NULL);
	if (IS_ERR(keyring)) {
		pr_err("Can't allocate rxperf server keyring: %ld\n",
		       PTR_ERR(keyring));
		ret = PTR_ERR(keyring);
		goto error_keyring;
	}
	rxperf_sec_keyring = keyring;

	ret = rxperf_add_key(keyring);
	if (ret < 0)
		goto error_key;

	ret = rxperf_open_socket();
	if (ret < 0)
		goto error_socket;
	return 0;

error_socket:
error_key:
	key_put(rxperf_sec_keyring);
error_keyring:
	destroy_workqueue(rxperf_workqueue);
	rcu_barrier();
error_workqueue:
	pr_err("Failed to register: %d\n", ret);
	return ret;
}
late_initcall(rxperf_init); /* Must be called after net/ to create socket */

static void __exit rxperf_exit(void)
{
	pr_info("Server unregistering.\n");

	rxperf_close_socket();
	key_put(rxperf_sec_keyring);
	destroy_workqueue(rxperf_workqueue);
	rcu_barrier();
}
module_exit(rxperf_exit);
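
/*
 * Usage sketch (assumptions flagged): this file is assumed to build as the
 * rxperf module under the kernel's rxrpc test-service config option
 * (CONFIG_RXPERF in net/rxrpc).  Once loaded, the server answers the
 * rxperf protocol on UDP port 7009, service 147, expecting rxkad-secured
 * calls keyed with the eight-byte secret above; a userspace rxperf client
 * (such as the test tool shipped with OpenAFS) pointed at this host and
 * given the same key should be able to drive the send/recv/rpc operations
 * implemented here.
 */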