// SPDX-License-Identifier: GPL-2.0-only
/*
   drbd_req.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
 */

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/drbd.h>
#include "drbd_int.h"
#include "drbd_req.h"

static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size);

static struct drbd_request *drbd_req_new(struct drbd_device *device, struct bio *bio_src)
{
	struct drbd_request *req;

	req = mempool_alloc(&drbd_request_mempool, GFP_NOIO);
	if (!req)
		return NULL;
	memset(req, 0, sizeof(*req));

	req->rq_state = (bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0)
		      | (bio_op(bio_src) == REQ_OP_WRITE_ZEROES ? RQ_ZEROES : 0)
		      | (bio_op(bio_src) == REQ_OP_DISCARD ? RQ_UNMAP : 0);
	req->device = device;
	req->master_bio = bio_src;
	req->epoch = 0;

	drbd_clear_interval(&req->i);
	req->i.sector = bio_src->bi_iter.bi_sector;
	req->i.size = bio_src->bi_iter.bi_size;
	req->i.local = true;
	req->i.waiting = false;

	INIT_LIST_HEAD(&req->tl_requests);
	INIT_LIST_HEAD(&req->w.list);
	INIT_LIST_HEAD(&req->req_pending_master_completion);
	INIT_LIST_HEAD(&req->req_pending_local);

	/* one reference to be put by __drbd_make_request */
	atomic_set(&req->completion_ref, 1);
	/* one kref as long as completion_ref > 0 */
	kref_init(&req->kref);
	return req;
}

static void drbd_remove_request_interval(struct rb_root *root,
					 struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	struct drbd_interval *i = &req->i;

	drbd_remove_interval(root, i);

	/* Wake up any processes waiting for this request to complete. */
	if (i->waiting)
		wake_up(&device->misc_wait);
}

void drbd_req_destroy(struct kref *kref)
{
	struct drbd_request *req = container_of(kref, struct drbd_request, kref);
	struct drbd_device *device = req->device;
	const unsigned s = req->rq_state;

	if ((req->master_bio && !(s & RQ_POSTPONED)) ||
		atomic_read(&req->completion_ref) ||
		(s & RQ_LOCAL_PENDING) ||
		((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
		drbd_err(device, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
				s, atomic_read(&req->completion_ref));
		return;
	}

	/* If called from mod_rq_state (expected normal case) or
	 * drbd_send_and_submit (the less likely normal path), this holds the
	 * req_lock, and req->tl_requests will typically be on ->transfer_log,
	 * though it may be still empty (never added to the transfer log).
	 *
	 * If called from do_retry(), we do NOT hold the req_lock, but we are
	 * still allowed to unconditionally list_del(&req->tl_requests),
	 * because it will be on a local on-stack list only. */
	list_del_init(&req->tl_requests);

	/* finally remove the request from the conflict detection
	 * respective block_id verification interval tree. */
	if (!drbd_interval_empty(&req->i)) {
		struct rb_root *root;

		if (s & RQ_WRITE)
			root = &device->write_requests;
		else
			root = &device->read_requests;
		drbd_remove_request_interval(root, req);
	} else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
		drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
			s, (unsigned long long)req->i.sector, req->i.size);

	/* if it was a write, we may have to set the corresponding
	 * bit(s) out-of-sync first. If it had a local part, we need to
	 * release the reference to the activity log. */
	if (s & RQ_WRITE) {
		/* Set out-of-sync unless both OK flags are set
		 * (local only or remote failed).
		 * Other places where we set out-of-sync:
		 * READ with local io-error */

		/* There is a special case:
		 * we may notice late that IO was suspended,
		 * and postpone, or schedule for retry, a write,
		 * before it even was submitted or sent.
		 * In that case we do not want to touch the bitmap at all.
		 */
		struct drbd_peer_device *peer_device = first_peer_device(device);

		if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
			if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
				drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size);

			if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
				drbd_set_in_sync(peer_device, req->i.sector, req->i.size);
		}

		/* one might be tempted to move the drbd_al_complete_io
		 * to the local io completion callback drbd_request_endio.
		 * but, if this was a mirror write, we may only
		 * drbd_al_complete_io after this is RQ_NET_DONE,
		 * otherwise the extent could be dropped from the al
		 * before it has actually been written on the peer.
		 * if we crash before our peer knows about the request,
		 * but after the extent has been dropped from the al,
		 * we would forget to resync the corresponding extent.
		 */
		if (s & RQ_IN_ACT_LOG) {
			if (get_ldev_if_state(device, D_FAILED)) {
				drbd_al_complete_io(device, &req->i);
				put_ldev(device);
			} else if (drbd_ratelimit()) {
				drbd_warn(device, "Should have called drbd_al_complete_io(, %llu, %u), "
					  "but my Disk seems to have failed :(\n",
					  (unsigned long long) req->i.sector, req->i.size);
			}
		}
	}

	mempool_free(req, &drbd_request_mempool);
}

static void wake_all_senders(struct drbd_connection *connection)
{
	wake_up(&connection->sender_work.q_wait);
}

/* must hold resource->req_lock */
void start_new_tl_epoch(struct drbd_connection *connection)
{
	/* no point closing an epoch, if it is empty, anyways. */
	if (connection->current_tle_writes == 0)
		return;

	connection->current_tle_writes = 0;
	atomic_inc(&connection->current_tle_nr);
	wake_all_senders(connection);
}

void complete_master_bio(struct drbd_device *device,
		struct bio_and_error *m)
{
	if (unlikely(m->error))
		m->bio->bi_status = errno_to_blk_status(m->error);
	bio_endio(m->bio);
	dec_ap_bio(device);
}

/* Helper for __req_mod().
 * Set m->bio to the master bio, if it is fit to be completed,
 * or leave it alone (it is initialized to NULL in __req_mod),
 * if it has already been completed, or cannot be completed yet.
 * If m->bio is set, the error status to be returned is placed in m->error.
 */
static
void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
{
	const unsigned s = req->rq_state;
	struct drbd_device *device = req->device;
	int error, ok;

	/* we must not complete the master bio, while it is
	 * still being processed by _drbd_send_zc_bio (drbd_send_dblock)
	 * not yet acknowledged by the peer
	 * not yet completed by the local io subsystem
	 * these flags may get cleared in any order by
	 * the worker,
	 * the receiver,
	 * the bio_endio completion callbacks.
	 */
	if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
	    (s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
	    (s & RQ_COMPLETION_SUSP)) {
		drbd_err(device, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
		return;
	}

	if (!req->master_bio) {
		drbd_err(device, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
		return;
	}

	/*
	 * figure out whether to report success or failure.
	 *
	 * report success when at least one of the operations succeeded.
	 * or, to put the other way,
	 * only report failure, when both operations failed.
	 *
	 * what to do about the failures is handled elsewhere.
	 * what we need to do here is just: complete the master_bio.
	 *
	 * local completion error, if any, has been stored as ERR_PTR
	 * in private_bio within drbd_request_endio.
	 */
	ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
	error = PTR_ERR(req->private_bio);

	/* Before we can signal completion to the upper layers,
	 * we may need to close the current transfer log epoch.
	 * We are within the request lock, so we can simply compare
	 * the request epoch number with the current transfer log
	 * epoch number. If they match, increase the current_tle_nr,
	 * and reset the transfer log epoch write_cnt.
	 */
	if (op_is_write(bio_op(req->master_bio)) &&
	    req->epoch == atomic_read(&first_peer_device(device)->connection->current_tle_nr))
		start_new_tl_epoch(first_peer_device(device)->connection);

	/* Update disk stats */
	bio_end_io_acct(req->master_bio, req->start_jif);

	/* If READ failed,
	 * have it be pushed back to the retry work queue,
	 * so it will re-enter __drbd_make_request(),
	 * and be re-assigned to a suitable local or remote path,
	 * or failed if we do not have access to good data anymore.
	 *
	 * Unless it was failed early by __drbd_make_request(),
	 * because no path was available, in which case
	 * it was not even added to the transfer_log.
	 *
	 * read-ahead may fail, and will not be retried.
	 *
	 * WRITE should have used all available paths already.
	 */
	if (!ok &&
	    bio_op(req->master_bio) == REQ_OP_READ &&
	    !(req->master_bio->bi_opf & REQ_RAHEAD) &&
	    !list_empty(&req->tl_requests))
		req->rq_state |= RQ_POSTPONED;

	if (!(req->rq_state & RQ_POSTPONED)) {
		m->error = ok ? 0 : (error ?: -EIO);
		m->bio = req->master_bio;
		req->master_bio = NULL;
		/* We leave it in the tree, to be able to verify later
		 * write-acks in protocol != C during resync.
		 * But we mark it as "complete", so it won't be counted as
		 * conflict in a multi-primary setup. */
		req->i.completed = true;
	}

	if (req->i.waiting)
		wake_up(&device->misc_wait);

	/* Either we are about to complete to upper layers,
	 * or we will restart this request.
	 * In either case, the request object will be destroyed soon,
	 * so better remove it from all lists. */
	list_del_init(&req->req_pending_master_completion);
}

/* still holds resource->req_lock */
static void drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
{
	struct drbd_device *device = req->device;

	D_ASSERT(device, m || (req->rq_state & RQ_POSTPONED));

	if (!put)
		return;

	if (!atomic_sub_and_test(put, &req->completion_ref))
		return;

	drbd_req_complete(req, m);

	/* local completion may still come in later,
	 * we need to keep the req object around. */
	if (req->rq_state & RQ_LOCAL_ABORTED)
		return;

	if (req->rq_state & RQ_POSTPONED) {
		/* don't destroy the req object just yet,
		 * but queue it for retry */
		drbd_restart_request(req);
		return;
	}

	kref_put(&req->kref, drbd_req_destroy);
}
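
/* The connection keeps cached pointers to "interesting" requests on the
 * transfer log, maintained by the helpers below:
 *   req_next         - oldest request that is still RQ_NET_QUEUED,
 *   req_ack_pending  - oldest request already RQ_NET_SENT, but still
 *                      RQ_NET_PENDING (waiting for an ack),
 *   req_not_net_done - oldest request already RQ_NET_SENT, but not yet
 *                      RQ_NET_DONE.
 * set_if_null_*() records a request when it enters the respective state;
 * advance_conn_*() walks the transfer log forward to the next candidate
 * (or NULL) once the recorded request leaves that state.  This avoids
 * scanning the whole transfer log whenever e.g. the request timer needs
 * the oldest request in one of these states. */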

static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_next == NULL)
		connection->req_next = req;
}

static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	struct drbd_request *iter = req;
	if (!connection)
		return;
	if (connection->req_next != req)
		return;

	req = NULL;
	list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
		const unsigned int s = iter->rq_state;

		if (s & RQ_NET_QUEUED) {
			req = iter;
			break;
		}
	}
	connection->req_next = req;
}

static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_ack_pending == NULL)
		connection->req_ack_pending = req;
}

static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	struct drbd_request *iter = req;
	if (!connection)
		return;
	if (connection->req_ack_pending != req)
		return;

	req = NULL;
	list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
		const unsigned int s = iter->rq_state;

		if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING)) {
			req = iter;
			break;
		}
	}
	connection->req_ack_pending = req;
}

static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	if (!connection)
		return;
	if (connection->req_not_net_done == NULL)
		connection->req_not_net_done = req;
}

static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
	struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
	struct drbd_request *iter = req;
	if (!connection)
		return;
	if (connection->req_not_net_done != req)
		return;

	req = NULL;
	list_for_each_entry_continue(iter, &connection->transfer_log, tl_requests) {
		const unsigned int s = iter->rq_state;

		if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE)) {
			req = iter;
			break;
		}
	}
	connection->req_not_net_done = req;
}

/* I'd like this to be the only place that manipulates
 * req->completion_ref and req->kref. */
static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
		int clear, int set)
{
	struct drbd_device *device = req->device;
	struct drbd_peer_device *peer_device = first_peer_device(device);
	unsigned s = req->rq_state;
	int c_put = 0;

	if (drbd_suspended(device) && !((s | clear) & RQ_COMPLETION_SUSP))
		set |= RQ_COMPLETION_SUSP;

	/* apply */

	req->rq_state &= ~clear;
	req->rq_state |= set;

	/* no change? */
	if (req->rq_state == s)
		return;

	/* intent: get references */

	kref_get(&req->kref);

	if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
		atomic_inc(&req->completion_ref);

	if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
		inc_ap_pending(device);
		atomic_inc(&req->completion_ref);
	}

	if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
		atomic_inc(&req->completion_ref);
		set_if_null_req_next(peer_device, req);
	}

	if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
		kref_get(&req->kref); /* wait for the DONE */

	if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
		/* potentially already completed in the ack_receiver thread */
		if (!(s & RQ_NET_DONE)) {
			atomic_add(req->i.size >> 9, &device->ap_in_flight);
			set_if_null_req_not_net_done(peer_device, req);
		}
		if (req->rq_state & RQ_NET_PENDING)
			set_if_null_req_ack_pending(peer_device, req);
	}

	if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
		atomic_inc(&req->completion_ref);

	/* progress: put references */

	if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
		++c_put;

	if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
		D_ASSERT(device, req->rq_state & RQ_LOCAL_PENDING);
		++c_put;
	}

	if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
		if (req->rq_state & RQ_LOCAL_ABORTED)
			kref_put(&req->kref, drbd_req_destroy);
		else
			++c_put;
		list_del_init(&req->req_pending_local);
	}

	if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
		dec_ap_pending(device);
		++c_put;
		req->acked_jif = jiffies;
		advance_conn_req_ack_pending(peer_device, req);
	}

	if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
		++c_put;
		advance_conn_req_next(peer_device, req);
	}

	if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
		if (s & RQ_NET_SENT)
			atomic_sub(req->i.size >> 9, &device->ap_in_flight);
		if (s & RQ_EXP_BARR_ACK)
			kref_put(&req->kref, drbd_req_destroy);
		req->net_done_jif = jiffies;

		/* in ahead/behind mode, or just in case,
		 * before we finally destroy this request,
		 * the caching pointers must not reference it anymore */
		advance_conn_req_next(peer_device, req);
		advance_conn_req_ack_pending(peer_device, req);
		advance_conn_req_not_net_done(peer_device, req);
	}

	/* potentially complete and destroy */

	/* If we made progress, retry conflicting peer requests, if any. */
	if (req->i.waiting)
		wake_up(&device->misc_wait);

	drbd_req_put_completion_ref(req, m, c_put);
	kref_put(&req->kref, drbd_req_destroy);
}

static void drbd_report_io_error(struct drbd_device *device, struct drbd_request *req)
{
	if (!drbd_ratelimit())
		return;

	drbd_warn(device, "local %s IO error sector %llu+%u on %pg\n",
			(req->rq_state & RQ_WRITE) ? "WRITE" : "READ",
			(unsigned long long)req->i.sector,
			req->i.size >> 9,
			device->ldev->backing_bdev);
}

/* Helper for HANDED_OVER_TO_NETWORK.
 * Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)?
 * Is it also still "PENDING"?
 * --> If so, clear PENDING and set NET_OK below.
 * If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
 * (and we must not set RQ_NET_OK) */
static inline bool is_pending_write_protocol_A(struct drbd_request *req)
{
	return (req->rq_state &
		   (RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
		== (RQ_WRITE|RQ_NET_PENDING);
}

/* obviously this could be coded as many single functions
 * instead of one huge switch,
 * or by putting the code directly in the respective locations
 * (as it has been before).
 *
 * but having it this way
 * enforces that it is all in this one place, where it is easier to audit,
 * it makes it obvious that whatever "event" "happens" to a request should
 * happen "atomically" within the req_lock,
 * and it enforces that we have to think in a very structured manner
 * about the "events" that may happen to a request during its life time ...
 *
 *
 * peer_device == NULL means local disk
 */
int __req_mod(struct drbd_request *req, enum drbd_req_event what,
		struct drbd_peer_device *peer_device,
		struct bio_and_error *m)
{
	struct drbd_device *const device = req->device;
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	struct net_conf *nc;
	int p, rv = 0;

	if (m)
		m->bio = NULL;

	switch (what) {
	default:
		drbd_err(device, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
		break;

	/* does not happen...
	 * initialization done in drbd_req_new
	case CREATED:
		break;
	*/

	case TO_BE_SENT: /* via network */
		/* reached via __drbd_make_request
		 * and from w_read_retry_remote */
		D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);
		p = nc->wire_protocol;
		rcu_read_unlock();
		req->rq_state |=
			p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
			p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
		mod_rq_state(req, m, 0, RQ_NET_PENDING);
		break;

	case TO_BE_SUBMITTED: /* locally */
		/* reached via __drbd_make_request */
		D_ASSERT(device, !(req->rq_state & RQ_LOCAL_MASK));
		mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
		break;

	case COMPLETED_OK:
		if (req->rq_state & RQ_WRITE)
			device->writ_cnt += req->i.size >> 9;
		else
			device->read_cnt += req->i.size >> 9;

		mod_rq_state(req, m, RQ_LOCAL_PENDING,
				RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
		break;

	case ABORT_DISK_IO:
		mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
		break;

	case WRITE_COMPLETED_WITH_ERROR:
		drbd_report_io_error(device, req);
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
		break;

	case READ_COMPLETED_WITH_ERROR:
		drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size);
		drbd_report_io_error(device, req);
		__drbd_chk_io_error(device, DRBD_READ_ERROR);
		fallthrough;
	case READ_AHEAD_COMPLETED_WITH_ERROR:
		/* it is legal to fail read-ahead, no __drbd_chk_io_error in that case. */
		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
		break;

	case DISCARD_COMPLETED_NOTSUPP:
	case DISCARD_COMPLETED_WITH_ERROR:
		/* I'd rather not detach from local disk just because it
		 * failed a REQ_OP_DISCARD. */
		mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
		break;

	case QUEUE_FOR_NET_READ:
		/* READ, and
		 * no local disk,
		 * or target area marked as invalid,
		 * or just got an io-error. */
		/* from __drbd_make_request
		 * or from bio_endio during read io-error recovery */

		/* So we can verify the handle in the answer packet.
		 * Corresponding drbd_remove_request_interval is in
		 * drbd_req_complete() */
		D_ASSERT(device, drbd_interval_empty(&req->i));
		drbd_insert_interval(&device->read_requests, &req->i);

		set_bit(UNPLUG_REMOTE, &device->flags);

		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
		mod_rq_state(req, m, 0, RQ_NET_QUEUED);
		req->w.cb = w_send_read_req;
		drbd_queue_work(&connection->sender_work,
				&req->w);
		break;

	case QUEUE_FOR_NET_WRITE:
		/* assert something? */
		/* from __drbd_make_request only */

		/* Corresponding drbd_remove_request_interval is in
		 * drbd_req_complete() */
		D_ASSERT(device, drbd_interval_empty(&req->i));
		drbd_insert_interval(&device->write_requests, &req->i);

		/* NOTE
		 * In case the req ended up on the transfer log before being
		 * queued on the worker, it could lead to this request being
		 * missed during cleanup after connection loss.
		 * So we have to do both operations here,
		 * within the same lock that protects the transfer log.
		 *
		 * _req_add_to_epoch(req); this has to be after the
		 * _maybe_start_new_epoch(req); which happened in
		 * __drbd_make_request, because we now may set the bit
		 * again ourselves to close the current epoch.
		 *
		 * Add req to the (now) current epoch (barrier). */

		/* otherwise we may lose an unplug, which may cause some remote
		 * io-scheduler timeout to expire, increasing maximum latency,
		 * hurting performance. */
		set_bit(UNPLUG_REMOTE, &device->flags);

		/* queue work item to send data */
		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
		req->w.cb = w_send_dblock;
		drbd_queue_work(&connection->sender_work,
				&req->w);

		/* close the epoch, in case it outgrew the limit */
		rcu_read_lock();
		nc = rcu_dereference(connection->net_conf);
		p = nc->max_epoch_size;
		rcu_read_unlock();
		if (connection->current_tle_writes >= p)
			start_new_tl_epoch(connection);

		break;

	case QUEUE_FOR_SEND_OOS:
		mod_rq_state(req, m, 0, RQ_NET_QUEUED);
		req->w.cb = w_send_out_of_sync;
		drbd_queue_work(&connection->sender_work,
				&req->w);
		break;

	case READ_RETRY_REMOTE_CANCELED:
	case SEND_CANCELED:
	case SEND_FAILED:
		/* real cleanup will be done from tl_clear. just update flags
		 * so it is no longer marked as on the worker queue */
		mod_rq_state(req, m, RQ_NET_QUEUED, 0);
		break;

	case HANDED_OVER_TO_NETWORK:
		/* assert something? */
		if (is_pending_write_protocol_A(req))
			/* this is what is dangerous about protocol A:
			 * pretend it was successfully written on the peer. */
			mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
						RQ_NET_SENT|RQ_NET_OK);
		else
			mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
		/* It is still not yet RQ_NET_DONE until the
		 * corresponding epoch barrier got acked as well,
		 * so we know what to dirty on connection loss. */
		break;

	case OOS_HANDED_TO_NETWORK:
		/* Was not set PENDING, no longer QUEUED, so is now DONE
		 * as far as this connection is concerned. */
		mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
		break;

	case CONNECTION_LOST_WHILE_PENDING:
		/* transfer log cleanup after connection loss */
		mod_rq_state(req, m,
				RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
				RQ_NET_DONE);
		break;

	case CONFLICT_RESOLVED:
		/* for superseded conflicting writes of multiple primaries,
		 * there is no need to keep anything in the tl, potential
		 * node crashes are covered by the activity log.
		 *
		 * If this request had been marked as RQ_POSTPONED before,
		 * it will actually not be completed, but "restarted",
		 * resubmitted from the retry worker context. */
		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
		break;

	case WRITE_ACKED_BY_PEER_AND_SIS:
		req->rq_state |= RQ_NET_SIS;
		fallthrough;
	case WRITE_ACKED_BY_PEER:
		/* Normal operation protocol C: successfully written on peer.
		 * During resync, even in protocol != C,
		 * we requested an explicit write ack anyways.
		 * Which means we cannot even assert anything here.
		 * Nothing more to do here.
		 * We want to keep the tl in place for all protocols, to cater
		 * for volatile write-back caches on lower level devices. */
		goto ack_common;
	case RECV_ACKED_BY_PEER:
		D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
		/* protocol B; pretends to be successfully written on peer.
		 * see also notes above in HANDED_OVER_TO_NETWORK about
		 * protocol != C */
	ack_common:
		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
		break;

	case POSTPONE_WRITE:
		D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
		/* If this node has already detected the write conflict, the
		 * worker will be waiting on misc_wait. Wake it up once this
		 * request has completed locally.
		 */
		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		req->rq_state |= RQ_POSTPONED;
		if (req->i.waiting)
			wake_up(&device->misc_wait);
		/* Do not clear RQ_NET_PENDING. This request will make further
		 * progress via restart_conflicting_writes() or
		 * fail_postponed_requests(). Hopefully. */
		break;

	case NEG_ACKED:
		mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
		break;

	case FAIL_FROZEN_DISK_IO:
		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
			break;
		mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
		break;

	case RESTART_FROZEN_DISK_IO:
		if (!(req->rq_state & RQ_LOCAL_COMPLETED))
			break;

		mod_rq_state(req, m,
				RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
				RQ_LOCAL_PENDING);

		rv = MR_READ;
		if (bio_data_dir(req->master_bio) == WRITE)
			rv = MR_WRITE;

		get_ldev(device); /* always succeeds in this call path */
		req->w.cb = w_restart_disk_io;
		drbd_queue_work(&connection->sender_work,
				&req->w);
		break;

	case RESEND:
		/* Simply complete (local only) READs. */
		if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
			mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
			break;
		}

		/* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
		   before the connection loss (B&C only); only P_BARRIER_ACK
		   (or the local completion?) was missing when we suspended.
		   Throwing them out of the TL here by pretending we got a BARRIER_ACK.
		   During connection handshake, we ensure that the peer was not rebooted. */
		if (!(req->rq_state & RQ_NET_OK)) {
			/* FIXME could this possibly be a req->dw.cb == w_send_out_of_sync?
			 * in that case we must not set RQ_NET_PENDING. */

			mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
			if (req->w.cb) {
				/* w.cb expected to be w_send_dblock, or w_send_read_req */
				drbd_queue_work(&connection->sender_work,
						&req->w);
				rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
			} /* else: FIXME can this happen? */
			break;
		}
		fallthrough; /* to BARRIER_ACKED */

	case BARRIER_ACKED:
		/* barrier ack for READ requests does not make sense */
		if (!(req->rq_state & RQ_WRITE))
			break;

		if (req->rq_state & RQ_NET_PENDING) {
			/* barrier came in before all requests were acked.
			 * this is bad, because if the connection is lost now,
			 * we won't be able to clean them up... */
			drbd_err(device, "FIXME (BARRIER_ACKED but pending)\n");
		}
		/* Allowed to complete requests, even while suspended.
		 * As this is called for all requests within a matching epoch,
		 * we need to filter, and only set RQ_NET_DONE for those that
		 * have actually been on the wire. */
		mod_rq_state(req, m, RQ_COMPLETION_SUSP,
				(req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
		break;

	case DATA_RECEIVED:
		D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
		mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
		break;

	case QUEUE_AS_DRBD_BARRIER:
		start_new_tl_epoch(connection);
		mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
		break;
	}

	return rv;
}

/* we may do a local read if:
 * - we are consistent (of course),
 * - or we are generally inconsistent,
 *   BUT we are still/already IN SYNC for this area.
 *   since size may be bigger than BM_BLOCK_SIZE,
 *   we may need to check several bits.
 */
static bool drbd_may_do_local_read(struct drbd_device *device, sector_t sector, int size)
{
	unsigned long sbnr, ebnr;
	sector_t esector, nr_sectors;

	if (device->state.disk == D_UP_TO_DATE)
		return true;
	if (device->state.disk != D_INCONSISTENT)
		return false;
	esector = sector + (size >> 9) - 1;
	nr_sectors = get_capacity(device->vdisk);
	D_ASSERT(device, sector < nr_sectors);
	D_ASSERT(device, esector < nr_sectors);

	sbnr = BM_SECT_TO_BIT(sector);
	ebnr = BM_SECT_TO_BIT(esector);

	return drbd_bm_count_bits(device, sbnr, ebnr) == 0;
}
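
/* Decide whether a READ that we could serve locally should nevertheless be
 * shipped to the peer, according to the configured read-balancing policy.
 * For the striping policies, RB_32K_STRIPING .. RB_1M_STRIPING correspond to
 * stripe sizes of 2^15 .. 2^20 bytes; reads go to the peer for every other
 * stripe, depending on which stripe the start sector falls into. */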

static bool remote_due_to_read_balancing(struct drbd_device *device, sector_t sector,
		enum drbd_read_balancing rbm)
{
	int stripe_shift;

	switch (rbm) {
	case RB_CONGESTED_REMOTE:
		return false;
	case RB_LEAST_PENDING:
		return atomic_read(&device->local_cnt) >
			atomic_read(&device->ap_pending_cnt) + atomic_read(&device->rs_pending_cnt);
	case RB_32K_STRIPING: /* stripe_shift = 15 */
	case RB_64K_STRIPING:
	case RB_128K_STRIPING:
	case RB_256K_STRIPING:
	case RB_512K_STRIPING:
	case RB_1M_STRIPING: /* stripe_shift = 20 */
		stripe_shift = (rbm - RB_32K_STRIPING + 15);
		return (sector >> (stripe_shift - 9)) & 1;
	case RB_ROUND_ROBIN:
		return test_and_change_bit(READ_BALANCE_RR, &device->flags);
	case RB_PREFER_REMOTE:
		return true;
	case RB_PREFER_LOCAL:
	default:
		return false;
	}
}

/*
 * complete_conflicting_writes - wait for any conflicting write requests
 *
 * The write_requests tree contains all active write requests which we
 * currently know about. Wait for any requests to complete which conflict with
 * the new one.
 *
 * Only way out: remove the conflicting intervals from the tree.
 */
static void complete_conflicting_writes(struct drbd_request *req)
{
	DEFINE_WAIT(wait);
	struct drbd_device *device = req->device;
	struct drbd_interval *i;
	sector_t sector = req->i.sector;
	int size = req->i.size;

	for (;;) {
		drbd_for_each_overlap(i, &device->write_requests, sector, size) {
			/* Ignore, if already completed to upper layers. */
			if (i->completed)
				continue;
			/* Handle the first found overlap. After the schedule
			 * we have to restart the tree walk. */
			break;
		}
		if (!i)	/* if any */
			break;

		/* Indicate to wake up device->misc_wait on progress. */
		prepare_to_wait(&device->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
		i->waiting = true;
		spin_unlock_irq(&device->resource->req_lock);
		schedule();
		spin_lock_irq(&device->resource->req_lock);
	}
	finish_wait(&device->misc_wait, &wait);
}

/* called within req_lock */
static void maybe_pull_ahead(struct drbd_device *device)
{
	struct drbd_connection *connection = first_peer_device(device)->connection;
	struct net_conf *nc;
	bool congested = false;
	enum drbd_on_congestion on_congestion;

	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	on_congestion = nc ? nc->on_congestion : OC_BLOCK;
	rcu_read_unlock();
	if (on_congestion == OC_BLOCK ||
	    connection->agreed_pro_version < 96)
		return;

	if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
		return; /* nothing to do ... */

	/* If I don't even have good local storage, we can not reasonably try
	 * to pull ahead of the peer. We also need the local reference to make
	 * sure device->act_log is there.
	 */
	if (!get_ldev_if_state(device, D_UP_TO_DATE))
		return;

	if (nc->cong_fill &&
	    atomic_read(&device->ap_in_flight) >= nc->cong_fill) {
		drbd_info(device, "Congestion-fill threshold reached\n");
		congested = true;
	}

	if (device->act_log->used >= nc->cong_extents) {
		drbd_info(device, "Congestion-extents threshold reached\n");
		congested = true;
	}

	if (congested) {
		/* start a new epoch for non-mirrored writes */
		start_new_tl_epoch(first_peer_device(device)->connection);

		if (on_congestion == OC_PULL_AHEAD)
			_drbd_set_state(_NS(device, conn, C_AHEAD), 0, NULL);
		else /*nc->on_congestion == OC_DISCONNECT */
			_drbd_set_state(_NS(device, conn, C_DISCONNECTING), 0, NULL);
	}
	put_ldev(device);
}

/* If this returns false, and req->private_bio is still set,
 * this should be submitted locally.
 *
 * If it returns false, but req->private_bio is not set,
 * we do not have access to good data :(
 *
 * Otherwise, this destroys req->private_bio, if any,
 * and returns true.
 */
static bool do_remote_read(struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	enum drbd_read_balancing rbm;

	if (req->private_bio) {
		if (!drbd_may_do_local_read(device,
					req->i.sector, req->i.size)) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(device);
		}
	}

	if (device->state.pdsk != D_UP_TO_DATE)
		return false;

	if (req->private_bio == NULL)
		return true;

	/* TODO: improve read balancing decisions, take into account drbd
	 * protocol, pending requests etc. */

	rcu_read_lock();
	rbm = rcu_dereference(device->ldev->disk_conf)->read_balancing;
	rcu_read_unlock();

	if (rbm == RB_PREFER_LOCAL && req->private_bio)
		return false; /* submit locally */

	if (remote_due_to_read_balancing(device, req->i.sector, rbm)) {
		if (req->private_bio) {
			bio_put(req->private_bio);
			req->private_bio = NULL;
			put_ldev(device);
		}
		return true;
	}

	return false;
}

bool drbd_should_do_remote(union drbd_dev_state s)
{
	return s.pdsk == D_UP_TO_DATE ||
		(s.pdsk >= D_INCONSISTENT &&
		 s.conn >= C_WF_BITMAP_T &&
		 s.conn < C_AHEAD);
	/* Before proto 96 that was >= CONNECTED instead of >= C_WF_BITMAP_T.
	   That is equivalent since before 96 IO was frozen in the C_WF_BITMAP*
	   states. */
}

static bool drbd_should_send_out_of_sync(union drbd_dev_state s)
{
	return s.conn == C_AHEAD || s.conn == C_WF_BITMAP_S;
	/* pdsk = D_INCONSISTENT as a consequence. Protocol 96 check not necessary
	   since we enter state C_AHEAD only if proto >= 96 */
}

/* returns number of connections (== 1, for drbd 8.4)
 * expected to actually write this data,
 * which does NOT include those that we are L_AHEAD for. */
static int drbd_process_write_request(struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	struct drbd_peer_device *peer_device = first_peer_device(device);
	int remote, send_oos;

	remote = drbd_should_do_remote(device->state);
	send_oos = drbd_should_send_out_of_sync(device->state);

	/* Need to replicate writes. Unless it is an empty flush,
	 * which is better mapped to a DRBD P_BARRIER packet,
	 * also for drbd wire protocol compatibility reasons.
	 * If this was a flush, just start a new epoch.
	 * Unless the current epoch was empty anyways, or we are not currently
	 * replicating, in which case there is no point. */
	if (unlikely(req->i.size == 0)) {
		/* The only size==0 bios we expect are empty flushes. */
		D_ASSERT(device, req->master_bio->bi_opf & REQ_PREFLUSH);
		if (remote)
			_req_mod(req, QUEUE_AS_DRBD_BARRIER, peer_device);
		return remote;
	}

	if (!remote && !send_oos)
		return 0;

	D_ASSERT(device, !(remote && send_oos));

	if (remote) {
		_req_mod(req, TO_BE_SENT, peer_device);
		_req_mod(req, QUEUE_FOR_NET_WRITE, peer_device);
	} else if (drbd_set_out_of_sync(peer_device, req->i.sector, req->i.size))
		_req_mod(req, QUEUE_FOR_SEND_OOS, peer_device);

	return remote;
}
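
/* Discards and write-zeroes are not submitted as a cloned bio; instead
 * drbd_issue_discard_or_zero_out() carries out the operation on the backing
 * device, and the private bio is completed here with the resulting status. */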

static void drbd_process_discard_or_zeroes_req(struct drbd_request *req, int flags)
{
	int err = drbd_issue_discard_or_zero_out(req->device,
				req->i.sector, req->i.size >> 9, flags);

	if (err)
		req->private_bio->bi_status = BLK_STS_IOERR;
	bio_endio(req->private_bio);
}

static void
drbd_submit_req_private_bio(struct drbd_request *req)
{
	struct drbd_device *device = req->device;
	struct bio *bio = req->private_bio;
	unsigned int type;

	if (bio_op(bio) != REQ_OP_READ)
		type = DRBD_FAULT_DT_WR;
	else if (bio->bi_opf & REQ_RAHEAD)
		type = DRBD_FAULT_DT_RA;
	else
		type = DRBD_FAULT_DT_RD;

	/* State may have changed since we grabbed our reference on the
	 * ->ldev member. Double check, and short-circuit to endio.
	 * In case the last activity log transaction failed to get on
	 * stable storage, and this is a WRITE, we may not even submit
	 * this bio. */
	if (get_ldev(device)) {
		if (drbd_insert_fault(device, type))
			bio_io_error(bio);
		else if (bio_op(bio) == REQ_OP_WRITE_ZEROES)
			drbd_process_discard_or_zeroes_req(req, EE_ZEROOUT |
			    ((bio->bi_opf & REQ_NOUNMAP) ? 0 : EE_TRIM));
		else if (bio_op(bio) == REQ_OP_DISCARD)
			drbd_process_discard_or_zeroes_req(req, EE_TRIM);
		else
			submit_bio_noacct(bio);
		put_ldev(device);
	} else
		bio_io_error(bio);
}
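
/* Hand a write over to the submitter work queue: do_submit() will pull it off
 * device->submit.writes, get it into the activity log, and then send and
 * submit it. */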

static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
{
	spin_lock_irq(&device->resource->req_lock);
	list_add_tail(&req->tl_requests, &device->submit.writes);
	list_add_tail(&req->req_pending_master_completion,
			&device->pending_master_completion[1 /* WRITE */]);
	spin_unlock_irq(&device->resource->req_lock);
	queue_work(device->submit.wq, &device->submit.worker);
	/* do_submit() may sleep internally on al_wait, too */
	wake_up(&device->al_wait);
}

/* returns the new drbd_request pointer, if the caller is expected to
 * drbd_send_and_submit() it (to save latency), or NULL if we queued the
 * request on the submitter thread.
 * Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
 */
static struct drbd_request *
drbd_request_prepare(struct drbd_device *device, struct bio *bio)
{
	const int rw = bio_data_dir(bio);
	struct drbd_request *req;

	/* allocate outside of all locks; */
	req = drbd_req_new(device, bio);
	if (!req) {
		dec_ap_bio(device);
		/* only pass the error to the upper layers.
		 * if user cannot handle io errors, that's not our business. */
		drbd_err(device, "could not kmalloc() req\n");
		bio->bi_status = BLK_STS_RESOURCE;
		bio_endio(bio);
		return ERR_PTR(-ENOMEM);
	}

	/* Update disk stats */
	req->start_jif = bio_start_io_acct(req->master_bio);

	if (get_ldev(device)) {
		req->private_bio = bio_alloc_clone(device->ldev->backing_bdev,
						   bio, GFP_NOIO,
						   &drbd_io_bio_set);
		req->private_bio->bi_private = req;
		req->private_bio->bi_end_io = drbd_request_endio;
	}

	/* process discards always from our submitter thread */
	if (bio_op(bio) == REQ_OP_WRITE_ZEROES ||
	    bio_op(bio) == REQ_OP_DISCARD)
		goto queue_for_submitter_thread;

	if (rw == WRITE && req->private_bio && req->i.size
	    && !test_bit(AL_SUSPENDED, &device->flags)) {
		if (!drbd_al_begin_io_fastpath(device, &req->i))
			goto queue_for_submitter_thread;
		req->rq_state |= RQ_IN_ACT_LOG;
		req->in_actlog_jif = jiffies;
	}
	return req;

 queue_for_submitter_thread:
	atomic_inc(&device->ap_actlog_cnt);
	drbd_queue_write(device, req);
	return NULL;
}

/* Require at least one path to current data.
 * We don't want to allow writes on C_STANDALONE D_INCONSISTENT:
 * We would not allow to read what was written,
 * we would not have bumped the data generation uuids,
 * we would cause data divergence for all the wrong reasons.
 *
 * If we don't see at least one D_UP_TO_DATE, we will fail this request,
 * which either returns EIO, or, if OND_SUSPEND_IO is set, suspends IO,
 * and queues for retry later.
 */
static bool may_do_writes(struct drbd_device *device)
{
	const union drbd_dev_state s = device->state;

	return s.disk == D_UP_TO_DATE || s.pdsk == D_UP_TO_DATE;
}
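
/* Per-task block layer plug callback.  While a blk_plug is active we remember
 * the most recent request issued through it; when the plug is flushed,
 * drbd_unplug() marks that request with RQ_UNPLUG and queues a generic
 * unplug, so a P_UNPLUG_REMOTE follows the last request of the batch. */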

struct drbd_plug_cb {
	struct blk_plug_cb cb;
	struct drbd_request *most_recent_req;
	/* do we need more? */
};

static void drbd_unplug(struct blk_plug_cb *cb, bool from_schedule)
{
	struct drbd_plug_cb *plug = container_of(cb, struct drbd_plug_cb, cb);
	struct drbd_resource *resource = plug->cb.data;
	struct drbd_request *req = plug->most_recent_req;

	kfree(cb);
	if (!req)
		return;

	spin_lock_irq(&resource->req_lock);
	/* In case the sender did not process it yet, raise the flag to
	 * have it followed with P_UNPLUG_REMOTE just after. */
	req->rq_state |= RQ_UNPLUG;
	/* but also queue a generic unplug */
	drbd_queue_unplug(req->device);
	kref_put(&req->kref, drbd_req_destroy);
	spin_unlock_irq(&resource->req_lock);
}

static struct drbd_plug_cb* drbd_check_plugged(struct drbd_resource *resource)
{
	/* A lot of text to say
	 * return (struct drbd_plug_cb*)blk_check_plugged(); */
	struct drbd_plug_cb *plug;
	struct blk_plug_cb *cb = blk_check_plugged(drbd_unplug, resource, sizeof(*plug));

	if (cb)
		plug = container_of(cb, struct drbd_plug_cb, cb);
	else
		plug = NULL;
	return plug;
}

static void drbd_update_plug(struct drbd_plug_cb *plug, struct drbd_request *req)
{
	struct drbd_request *tmp = plug->most_recent_req;
	/* Will be sent to some peer.
	 * Remember to tag it with UNPLUG_REMOTE on unplug */
	kref_get(&req->kref);
	plug->most_recent_req = req;
	if (tmp)
		kref_put(&tmp->kref, drbd_req_destroy);
}
  1128. static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
  1129. {
  1130. struct drbd_resource *resource = device->resource;
  1131. struct drbd_peer_device *peer_device = first_peer_device(device);
  1132. const int rw = bio_data_dir(req->master_bio);
  1133. struct bio_and_error m = { NULL, };
  1134. bool no_remote = false;
  1135. bool submit_private_bio = false;
  1136. spin_lock_irq(&resource->req_lock);
  1137. if (rw == WRITE) {
  1138. /* This may temporarily give up the req_lock,
  1139. * but will re-aquire it before it returns here.
  1140. * Needs to be before the check on drbd_suspended() */
  1141. complete_conflicting_writes(req);
  1142. /* no more giving up req_lock from now on! */
  1143. /* check for congestion, and potentially stop sending
  1144. * full data updates, but start sending "dirty bits" only. */
  1145. maybe_pull_ahead(device);
  1146. }
  1147. if (drbd_suspended(device)) {
  1148. /* push back and retry: */
  1149. req->rq_state |= RQ_POSTPONED;
  1150. if (req->private_bio) {
  1151. bio_put(req->private_bio);
  1152. req->private_bio = NULL;
  1153. put_ldev(device);
  1154. }
  1155. goto out;
  1156. }
  1157. /* We fail READ early, if we can not serve it.
  1158. * We must do this before req is registered on any lists.
  1159. * Otherwise, drbd_req_complete() will queue failed READ for retry. */
  1160. if (rw != WRITE) {
  1161. if (!do_remote_read(req) && !req->private_bio)
  1162. goto nodata;
  1163. }
  1164. /* which transfer log epoch does this belong to? */
  1165. req->epoch = atomic_read(&first_peer_device(device)->connection->current_tle_nr);
  1166. /* no point in adding empty flushes to the transfer log,
  1167. * they are mapped to drbd barriers already. */
  1168. if (likely(req->i.size!=0)) {
  1169. if (rw == WRITE)
  1170. first_peer_device(device)->connection->current_tle_writes++;
  1171. list_add_tail(&req->tl_requests, &first_peer_device(device)->connection->transfer_log);
  1172. }
  1173. if (rw == WRITE) {
  1174. if (req->private_bio && !may_do_writes(device)) {
  1175. bio_put(req->private_bio);
  1176. req->private_bio = NULL;
  1177. put_ldev(device);
  1178. goto nodata;
  1179. }
  1180. if (!drbd_process_write_request(req))
  1181. no_remote = true;
  1182. } else {
  1183. /* We either have a private_bio, or we can read from remote.
  1184. * Otherwise we had done the goto nodata above. */
  1185. if (req->private_bio == NULL) {
  1186. _req_mod(req, TO_BE_SENT, peer_device);
  1187. _req_mod(req, QUEUE_FOR_NET_READ, peer_device);
  1188. } else
  1189. no_remote = true;
  1190. }
  1191. if (no_remote == false) {
  1192. struct drbd_plug_cb *plug = drbd_check_plugged(resource);
  1193. if (plug)
  1194. drbd_update_plug(plug, req);
  1195. }
  1196. /* If it took the fast path in drbd_request_prepare, add it here.
  1197. * The slow path has added it already. */
  1198. if (list_empty(&req->req_pending_master_completion))
  1199. list_add_tail(&req->req_pending_master_completion,
  1200. &device->pending_master_completion[rw == WRITE]);
  1201. if (req->private_bio) {
  1202. /* needs to be marked within the same spinlock */
  1203. req->pre_submit_jif = jiffies;
  1204. list_add_tail(&req->req_pending_local,
  1205. &device->pending_completion[rw == WRITE]);
  1206. _req_mod(req, TO_BE_SUBMITTED, NULL);
  1207. /* but we need to give up the spinlock to submit */
  1208. submit_private_bio = true;
  1209. } else if (no_remote) {
  1210. nodata:
  1211. if (drbd_ratelimit())
  1212. drbd_err(device, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
  1213. (unsigned long long)req->i.sector, req->i.size >> 9);
  1214. /* A write may have been queued for send_oos, however.
  1215. * So we can not simply free it, we must go through drbd_req_put_completion_ref() */
  1216. }
out:
        drbd_req_put_completion_ref(req, &m, 1);
        spin_unlock_irq(&resource->req_lock);

        /* Even though above is a kref_put(), this is safe.
         * As long as we still need to submit our private bio,
         * we hold a completion ref, and the request cannot disappear.
         * If however this request did not even have a private bio to submit
         * (e.g. remote read), req may already be invalid now.
         * That's why we cannot check on req->private_bio. */
        if (submit_private_bio)
                drbd_submit_req_private_bio(req);
        if (m.bio)
                complete_master_bio(device, &m);
}
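
/* Entry path from drbd_submit_bio(): prepare a request for this bio and hand
 * it to drbd_send_and_submit().  drbd_request_prepare() presumably returns
 * NULL when it has instead queued the request to the submitter worker (the
 * activity-log "slow path" referenced above), and an ERR_PTR on failure; in
 * either case there is nothing left to do here. */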
void __drbd_make_request(struct drbd_device *device, struct bio *bio)
{
        struct drbd_request *req = drbd_request_prepare(device, bio);

        if (IS_ERR_OR_NULL(req))
                return;
        drbd_send_and_submit(device, req);
}
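
/* Submitter-worker fast path: writes whose activity-log extents are already
 * active can take drbd_al_begin_io_fastpath() and be sent/submitted right
 * away; anything that misses stays on the incoming list for the non-blocking
 * AL transaction handling in do_submit(). */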
static void submit_fast_path(struct drbd_device *device, struct list_head *incoming)
{
        struct blk_plug plug;
        struct drbd_request *req, *tmp;

        blk_start_plug(&plug);
        list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
                const int rw = bio_data_dir(req->master_bio);

                if (rw == WRITE /* rw != WRITE should not even end up here! */
                    && req->private_bio && req->i.size
                    && !test_bit(AL_SUSPENDED, &device->flags)) {
                        if (!drbd_al_begin_io_fastpath(device, &req->i))
                                continue;

                        req->rq_state |= RQ_IN_ACT_LOG;
                        req->in_actlog_jif = jiffies;
                        atomic_dec(&device->ap_actlog_cnt);
                }

                list_del_init(&req->tl_requests);
                drbd_send_and_submit(device, req);
        }
        blk_finish_plug(&plug);
}
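
/* Sort the incoming requests into those whose activity-log extents could be
 * marked active in the currently prepared AL transaction ("pending") and
 * those that have to wait ("later", blocked by resync).  Stop early once the
 * transaction has no more room (presumably what -ENOBUFS indicates).
 * Returns true if at least one request made it onto "pending". */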
static bool prepare_al_transaction_nonblock(struct drbd_device *device,
                                            struct list_head *incoming,
                                            struct list_head *pending,
                                            struct list_head *later)
{
        struct drbd_request *req;
        int wake = 0;
        int err;

        spin_lock_irq(&device->al_lock);
        while ((req = list_first_entry_or_null(incoming, struct drbd_request, tl_requests))) {
                err = drbd_al_begin_io_nonblock(device, &req->i);
                if (err == -ENOBUFS)
                        break;
                if (err == -EBUSY)
                        wake = 1;
                if (err)
                        list_move_tail(&req->tl_requests, later);
                else
                        list_move_tail(&req->tl_requests, pending);
        }
        spin_unlock_irq(&device->al_lock);
        if (wake)
                wake_up(&device->al_wait);
        return !list_empty(pending);
}
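
/* The activity-log transaction covering these requests has been committed by
 * the caller; mark each one as being in the AL and send/submit it, batched
 * under a single block-layer plug. */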
static void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
{
        struct blk_plug plug;
        struct drbd_request *req;

        blk_start_plug(&plug);
        while ((req = list_first_entry_or_null(pending, struct drbd_request, tl_requests))) {
                req->rq_state |= RQ_IN_ACT_LOG;
                req->in_actlog_jif = jiffies;
                atomic_dec(&device->ap_actlog_cnt);
                list_del_init(&req->tl_requests);
                drbd_send_and_submit(device, req);
        }
        blk_finish_plug(&plug);
}
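
/* Submitter worker: drains writes that could not activate their activity-log
 * extent in the submitting context (device->submit.writes), batches them into
 * AL transactions, and submits them.  Runs in process context, so it may
 * sleep on al_wait while the needed extents are held busy by resync or the
 * current transaction is full. */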
void do_submit(struct work_struct *ws)
{
        struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
        LIST_HEAD(incoming);    /* from drbd_make_request() */
        LIST_HEAD(pending);     /* to be submitted after next AL-transaction commit */
        LIST_HEAD(busy);        /* blocked by resync requests */

        /* grab new incoming requests */
        spin_lock_irq(&device->resource->req_lock);
        list_splice_tail_init(&device->submit.writes, &incoming);
        spin_unlock_irq(&device->resource->req_lock);

        for (;;) {
                DEFINE_WAIT(wait);

                /* move used-to-be-busy back to front of incoming */
                list_splice_init(&busy, &incoming);
                submit_fast_path(device, &incoming);
                if (list_empty(&incoming))
                        break;

                for (;;) {
                        prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);

                        list_splice_init(&busy, &incoming);
                        prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
                        if (!list_empty(&pending))
                                break;

                        schedule();

                        /* If all currently "hot" activity log extents are kept busy by
                         * incoming requests, we still must not totally starve new
                         * requests to "cold" extents.
                         * Something left on &incoming means there had not been
                         * enough update slots available, and the activity log
                         * has been marked as "starving".
                         *
                         * Try again now, without looking for new requests,
                         * effectively blocking all new requests until we made
                         * at least _some_ progress with what we currently have.
                         */
                        if (!list_empty(&incoming))
                                continue;

                        /* Nothing moved to pending, but nothing left
                         * on incoming: all moved to busy!
                         * Grab new and iterate. */
                        spin_lock_irq(&device->resource->req_lock);
                        list_splice_tail_init(&device->submit.writes, &incoming);
                        spin_unlock_irq(&device->resource->req_lock);
                }
                finish_wait(&device->al_wait, &wait);

                /* If the transaction was full before all incoming requests
                 * had been processed, skip ahead to the commit, and iterate
                 * without splicing in more incoming requests from upper layers.
                 *
                 * Else, if all incoming have been processed,
                 * they have become either "pending" (to be submitted after
                 * the next transaction commit) or "busy" (blocked by resync).
                 *
                 * Maybe more was queued while we prepared the transaction?
                 * Try to stuff those into this transaction as well.
                 * Be strictly non-blocking here,
                 * we already have something to commit.
                 *
                 * Commit if we don't make any more progress.
                 */
                while (list_empty(&incoming)) {
                        LIST_HEAD(more_pending);
                        LIST_HEAD(more_incoming);
                        bool made_progress;

                        /* It is ok to look outside the lock,
                         * it's only an optimization anyways */
                        if (list_empty(&device->submit.writes))
                                break;

                        spin_lock_irq(&device->resource->req_lock);
                        list_splice_tail_init(&device->submit.writes, &more_incoming);
                        spin_unlock_irq(&device->resource->req_lock);

                        if (list_empty(&more_incoming))
                                break;

                        made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);

                        list_splice_tail_init(&more_pending, &pending);
                        list_splice_tail_init(&more_incoming, &incoming);
                        if (!made_progress)
                                break;
                }

                drbd_al_begin_io_commit(device);
                send_and_submit_pending(device, &pending);
        }
}
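
/* submit_bio entry point for the drbd block device.  A sketch of how this is
 * presumably wired up elsewhere (drbd_main.c; names from memory, not part of
 * this file):
 *
 *      static const struct block_device_operations drbd_ops = {
 *              .owner          = THIS_MODULE,
 *              .submit_bio     = drbd_submit_bio,
 *              .open           = drbd_open,
 *              .release        = drbd_release,
 *      };
 */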
void drbd_submit_bio(struct bio *bio)
{
        struct drbd_device *device = bio->bi_bdev->bd_disk->private_data;

        bio = bio_split_to_limits(bio);
        if (!bio)
                return;

        /*
         * what we "blindly" assume:
         */
        D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512));

        inc_ap_bio(device);
        __drbd_make_request(device, bio);
}
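
/* Has the oldest peer request exceeded the effective network timeout
 * (ko-count * timeout)?  Distinguish a request still waiting for its ACK from
 * one that only waits for the epoch-closing barrier ack, and avoid blaming
 * the peer right after a reconnect, or before we even sent the barrier. */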
static bool net_timeout_reached(struct drbd_request *net_req,
                                struct drbd_connection *connection,
                                unsigned long now, unsigned long ent,
                                unsigned int ko_count, unsigned int timeout)
{
        struct drbd_device *device = net_req->device;

        if (!time_after(now, net_req->pre_send_jif + ent))
                return false;

        if (time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent))
                return false;

        if (net_req->rq_state & RQ_NET_PENDING) {
                drbd_warn(device, "Remote failed to finish a request within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
                          jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
                return true;
        }

        /* We received an ACK already (or are using protocol A),
         * but are waiting for the epoch closing barrier ack.
         * Check if we sent the barrier already.  We should not blame the peer
         * for being unresponsive, if we did not even ask it yet. */
        if (net_req->epoch == connection->send.current_epoch_nr) {
                drbd_warn(device,
                          "We did not send a P_BARRIER for %ums > ko-count (%u) * timeout (%u * 0.1s); drbd kernel thread blocked?\n",
                          jiffies_to_msecs(now - net_req->pre_send_jif), ko_count, timeout);
                return false;
        }
        /* Worst case: we may have been blocked for whatever reason, then
         * suddenly are able to send a lot of requests (and epoch separating
         * barriers) in quick succession.
         * The timestamp of the net_req may be much too old and not correspond
         * to the sending time of the relevant unack'ed barrier packet, so it
         * would trigger a spurious timeout.  The latest barrier packet may
         * have a timestamp too recent to trigger the timeout, so we could
         * miss one.  Right now we don't have a place to conveniently store
         * these timestamps.
         * But in this particular situation, the application requests are
         * still completed to upper layers, so DRBD should still "feel"
         * responsive.  No need yet to kill this connection; it may still
         * recover.  If not, we will eventually have queued enough into the
         * network for us to block.  From that point of view, the timestamp
         * of the last sent barrier packet is relevant enough.
         */
        if (time_after(now, connection->send.last_sent_barrier_jif + ent)) {
                drbd_warn(device, "Remote failed to answer a P_BARRIER (sent at %lu jif; now=%lu jif) within %ums > ko-count (%u) * timeout (%u * 0.1s)\n",
                          connection->send.last_sent_barrier_jif, now,
                          jiffies_to_msecs(now - connection->send.last_sent_barrier_jif), ko_count, timeout);
                return true;
        }
        return false;
}

/* A request is considered timed out, if
 * - we have some effective timeout from the configuration,
 *   with some state restrictions applied,
 * - the oldest request is waiting for a response from the network
 *   or from the local disk, respectively,
 * - the oldest request is in fact older than the effective timeout,
 * - the connection was established (or the disk was attached)
 *   for longer than the timeout already.
 * Note that for 32bit jiffies and very stable connections/disks,
 * we may have a wrap around, which is caught by
 *   !time_in_range(now, last_..._jif, last_..._jif + timeout).
 *
 * Side effect: once per 32bit wrap-around interval, which means every
 * ~198 days with 250 HZ, we have a window where the timeout would need
 * to expire twice (worst case) to become effective.  Good enough.
 */
void request_timer_fn(struct timer_list *t)
{
        struct drbd_device *device = from_timer(device, t, request_timer);
        struct drbd_connection *connection = first_peer_device(device)->connection;
        struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */
        struct net_conf *nc;
        unsigned long oldest_submit_jif;
        unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
        unsigned long now;
        unsigned int ko_count = 0, timeout = 0;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (nc && device->state.conn >= C_WF_REPORT_PARAMS) {
                ko_count = nc->ko_count;
                timeout = nc->timeout;
        }

        if (get_ldev(device)) { /* implicit state.disk >= D_INCONSISTENT */
                dt = rcu_dereference(device->ldev->disk_conf)->disk_timeout * HZ / 10;
                put_ldev(device);
        }
        rcu_read_unlock();

        ent = timeout * HZ/10 * ko_count;
        et = min_not_zero(dt, ent);
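
        /* Both timeouts are configured in units of 0.1 s.  For example, with
         * what are presumably the usual defaults, timeout=60 (6 s) and
         * ko-count=7, ent corresponds to 42 s worth of jiffies; dt is the
         * disk-timeout.  et is the smaller of the two that is actually set. */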
        if (!et)
                return; /* Recurring timer stopped */

        now = jiffies;
        nt = now + et;

        spin_lock_irq(&device->resource->req_lock);
        req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
        req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);

        /* Maybe the oldest request waiting for the peer is in fact still
         * blocking in tcp sendmsg.  That's ok, though: it is handled via the
         * socket send timeout, requesting a ping, and bumping ko-count in
         * we_should_drop_the_connection(). */

        /* Check the oldest request we successfully sent,
         * but which is still waiting for an ACK. */
        req_peer = connection->req_ack_pending;

        /* If we don't have such a request (e.g. protocol A),
         * check the oldest request that is still waiting on its epoch
         * closing barrier ack. */
        if (!req_peer)
                req_peer = connection->req_not_net_done;

        /* evaluate the oldest peer request only in one timer! */
        if (req_peer && req_peer->device != device)
                req_peer = NULL;

        /* do we have something to evaluate? */
        if (req_peer == NULL && req_write == NULL && req_read == NULL)
                goto out;
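
        /* Oldest local submit timestamp: the older of the two pending-local
         * list heads; fall back to "now" if nothing is pending locally,
         * which makes the disk-timeout check below a no-op. */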
        oldest_submit_jif =
                (req_write && req_read)
                ? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
                  ? req_write->pre_submit_jif : req_read->pre_submit_jif )
                : req_write ? req_write->pre_submit_jif
                : req_read ? req_read->pre_submit_jif : now;

        if (ent && req_peer && net_timeout_reached(req_peer, connection, now, ent, ko_count, timeout))
                _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_VERBOSE | CS_HARD);

        if (dt && oldest_submit_jif != now &&
            time_after(now, oldest_submit_jif + dt) &&
            !time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
                drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
                __drbd_chk_io_error(device, DRBD_FORCE_DETACH);
        }

        /* Reschedule timer for the nearest not already expired timeout.
         * Fallback to now + min(effective network timeout, disk timeout). */
        ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
                ? req_peer->pre_send_jif + ent : now + et;
        dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
                ? oldest_submit_jif + dt : now + et;
        nt = time_before(ent, dt) ? ent : dt;
out:
        spin_unlock_irq(&device->resource->req_lock);
        mod_timer(&device->request_timer, nt);
}