md-cluster.c

/*
 * Copyright (C) 2015, SUSE
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2, or (at your option)
 * any later version.
 *
 */

#include <linux/module.h>
#include <linux/kthread.h>
#include <linux/dlm.h>
#include <linux/sched.h>
#include <linux/raid/md_p.h>
#include "md.h"
#include "md-bitmap.h"
#include "md-cluster.h"

#define LVB_SIZE	64
#define NEW_DEV_TIMEOUT	5000
struct dlm_lock_resource {
	dlm_lockspace_t *ls;
	struct dlm_lksb lksb;
	char *name; /* lock name. */
	uint32_t flags; /* flags to pass to dlm_lock() */
	wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
	bool sync_locking_done;
	void (*bast)(void *arg, int mode); /* blocking AST function pointer */
	struct mddev *mddev; /* pointing back to mddev. */
	int mode;
};

struct suspend_info {
	int slot;
	sector_t lo;
	sector_t hi;
	struct list_head list;
};

struct resync_info {
	__le64 lo;
	__le64 hi;
};
/* md_cluster_info flags */
#define MD_CLUSTER_WAITING_FOR_NEWDISK 1
#define MD_CLUSTER_SUSPEND_READ_BALANCING 2
#define MD_CLUSTER_BEGIN_JOIN_CLUSTER 3

/* Lock the send communication. This is done through
 * bit manipulation as opposed to a mutex in order to
 * accommodate lock and hold. See next comment.
 */
#define MD_CLUSTER_SEND_LOCK 4

/* Cluster operations (such as adding a disk) may need to lock the
 * communication channel in order to perform extra operations
 * (update metadata) while no other operation is allowed on the
 * MD. The token needs to be locked and held until the operation
 * completes with md_update_sb(), which eventually releases
 * the lock.
 */
#define MD_CLUSTER_SEND_LOCKED_ALREADY 5

/* We should only receive messages after the node has joined the cluster
 * and set up all the related state such as the bitmap and personality */
#define MD_CLUSTER_ALREADY_IN_CLUSTER 6
#define MD_CLUSTER_PENDING_RECV_EVENT 7
#define MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD 8
struct md_cluster_info {
	struct mddev *mddev; /* the md device which md_cluster_info belongs to */
	/* dlm lock space and resources for clustered raid. */
	dlm_lockspace_t *lockspace;
	int slot_number;
	struct completion completion;
	struct mutex recv_mutex;
	struct dlm_lock_resource *bitmap_lockres;
	struct dlm_lock_resource **other_bitmap_lockres;
	struct dlm_lock_resource *resync_lockres;
	struct list_head suspend_list;
	spinlock_t suspend_lock;
	struct md_thread *recovery_thread;
	unsigned long recovery_map;
	/* communication lock resources */
	struct dlm_lock_resource *ack_lockres;
	struct dlm_lock_resource *message_lockres;
	struct dlm_lock_resource *token_lockres;
	struct dlm_lock_resource *no_new_dev_lockres;
	struct md_thread *recv_thread;
	struct completion newdisk_completion;
	wait_queue_head_t wait;
	unsigned long state;
	/* record the region in RESYNCING message */
	sector_t sync_low;
	sector_t sync_hi;
};
enum msg_type {
	METADATA_UPDATED = 0,
	RESYNCING,
	NEWDISK,
	REMOVE,
	RE_ADD,
	BITMAP_NEEDS_SYNC,
	CHANGE_CAPACITY,
};

struct cluster_msg {
	__le32 type;
	__le32 slot;
	/* TODO: Unionize this for smaller footprint */
	__le64 low;
	__le64 high;
	char uuid[16];
	__le32 raid_slot;
};
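
/*
 * Synchronous wrappers around dlm_lock()/dlm_unlock(): sync_ast() is the
 * completion AST passed to dlm_lock(); it marks the request as done and
 * wakes the waiter sleeping in dlm_lock_sync().
 */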
static void sync_ast(void *arg)
{
	struct dlm_lock_resource *res;

	res = arg;
	res->sync_locking_done = true;
	wake_up(&res->sync_locking);
}

static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;
	wait_event(res->sync_locking, res->sync_locking_done);
	res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}

static int dlm_unlock_sync(struct dlm_lock_resource *res)
{
	return dlm_lock_sync(res, DLM_LOCK_NL);
}
/*
 * A variation of dlm_lock_sync() in which the lock request can be
 * interrupted
 */
static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
				       struct mddev *mddev)
{
	int ret = 0;

	ret = dlm_lock(res->ls, mode, &res->lksb,
			res->flags, res->name, strlen(res->name),
			0, sync_ast, res, res->bast);
	if (ret)
		return ret;

	wait_event(res->sync_locking, res->sync_locking_done
				      || kthread_should_stop()
				      || test_bit(MD_CLOSING, &mddev->flags));
	if (!res->sync_locking_done) {
		/*
		 * the lock request is still on the convert queue when it is
		 * interrupted, and sync_ast could still run, so we need to
		 * cancel the request and reset the completion
		 */
		ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
			&res->lksb, res);
		res->sync_locking_done = false;
		if (unlikely(ret != 0))
			pr_info("failed to cancel previous lock request %s return %d\n",
				res->name, ret);
		return -EPERM;
	} else
		res->sync_locking_done = false;
	if (res->lksb.sb_status == 0)
		res->mode = mode;
	return res->lksb.sb_status;
}
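
/*
 * Allocate a lock resource in the lockspace of this mddev, optionally with
 * an LVB, and acquire it once in NL mode so that later requests can use
 * DLM_LKF_CONVERT.
 */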
static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
		char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
{
	struct dlm_lock_resource *res = NULL;
	int ret, namelen;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
	if (!res)
		return NULL;
	init_waitqueue_head(&res->sync_locking);
	res->sync_locking_done = false;
	res->ls = cinfo->lockspace;
	res->mddev = mddev;
	res->mode = DLM_LOCK_IV;
	namelen = strlen(name);
	res->name = kzalloc(namelen + 1, GFP_KERNEL);
	if (!res->name) {
		pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
		goto out_err;
	}
	strlcpy(res->name, name, namelen + 1);
	if (with_lvb) {
		res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
		if (!res->lksb.sb_lvbptr) {
			pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
			goto out_err;
		}
		res->flags = DLM_LKF_VALBLK;
	}

	if (bastfn)
		res->bast = bastfn;

	res->flags |= DLM_LKF_EXPEDITE;

	ret = dlm_lock_sync(res, DLM_LOCK_NL);
	if (ret) {
		pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
		goto out_err;
	}
	res->flags &= ~DLM_LKF_EXPEDITE;
	res->flags |= DLM_LKF_CONVERT;

	return res;
out_err:
	kfree(res->lksb.sb_lvbptr);
	kfree(res->name);
	kfree(res);
	return NULL;
}
static void lockres_free(struct dlm_lock_resource *res)
{
	int ret = 0;

	if (!res)
		return;

	/*
	 * use FORCEUNLOCK flag, so we can unlock even if the lock is on the
	 * waiting or convert queue
	 */
	ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
		&res->lksb, res);
	if (unlikely(ret != 0))
		pr_err("failed to unlock %s return %d\n", res->name, ret);
	else
		wait_event(res->sync_locking, res->sync_locking_done);

	kfree(res->name);
	kfree(res->lksb.sb_lvbptr);
	kfree(res);
}
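
/*
 * The resync region [lo, hi] owned by a node is kept in the LVB of its
 * bitmap lock resource, so other nodes can read it back with
 * read_resync_info().
 */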
static void add_resync_info(struct dlm_lock_resource *lockres,
			    sector_t lo, sector_t hi)
{
	struct resync_info *ri;

	ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
	ri->lo = cpu_to_le64(lo);
	ri->hi = cpu_to_le64(hi);
}

static struct suspend_info *read_resync_info(struct mddev *mddev, struct dlm_lock_resource *lockres)
{
	struct resync_info ri;
	struct suspend_info *s = NULL;
	sector_t hi = 0;

	dlm_lock_sync(lockres, DLM_LOCK_CR);
	memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
	hi = le64_to_cpu(ri.hi);
	if (hi > 0) {
		s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
		if (!s)
			goto out;
		s->hi = hi;
		s->lo = le64_to_cpu(ri.lo);
	}
	dlm_unlock_sync(lockres);
out:
	return s;
}
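
/*
 * Recovery thread: for every slot set in recovery_map, take the failed
 * node's bitmap lock, merge its bitmap into ours and drop any suspended
 * region it had announced.
 */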
static void recover_bitmaps(struct md_thread *thread)
{
	struct mddev *mddev = thread->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct dlm_lock_resource *bm_lockres;
	char str[64];
	int slot, ret;
	struct suspend_info *s, *tmp;
	sector_t lo, hi;

	while (cinfo->recovery_map) {
		slot = fls64((u64)cinfo->recovery_map) - 1;

		snprintf(str, 64, "bitmap%04d", slot);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize bitmaps\n");
			goto clear_bit;
		}

		ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
		if (ret) {
			pr_err("md-cluster: Could not DLM lock %s: %d\n",
					str, ret);
			goto clear_bit;
		}
		ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
		if (ret) {
			pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
			goto clear_bit;
		}

		/* Clear suspend_area associated with the bitmap */
		spin_lock_irq(&cinfo->suspend_lock);
		list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
			if (slot == s->slot) {
				list_del(&s->list);
				kfree(s);
			}
		spin_unlock_irq(&cinfo->suspend_lock);

		if (hi > 0) {
			if (lo < mddev->recovery_cp)
				mddev->recovery_cp = lo;
			/* wake up thread to continue resync in case resync
			 * is not finished */
			if (mddev->recovery_cp != MaxSector) {
				/*
				 * clear the REMOTE flag since we will launch
				 * resync thread in current node.
				 */
				clear_bit(MD_RESYNCING_REMOTE,
					  &mddev->recovery);
				set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
				md_wakeup_thread(mddev->thread);
			}
		}
clear_bit:
		lockres_free(bm_lockres);
		clear_bit(slot, &cinfo->recovery_map);
	}
}
static void recover_prep(void *arg)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

static void __recover_slot(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	set_bit(slot, &cinfo->recovery_map);
	if (!cinfo->recovery_thread) {
		cinfo->recovery_thread = md_register_thread(recover_bitmaps,
				mddev, "recover");
		if (!cinfo->recovery_thread) {
			pr_warn("md-cluster: Could not create recovery thread\n");
			return;
		}
	}
	md_wakeup_thread(cinfo->recovery_thread);
}
static void recover_slot(void *arg, struct dlm_slot *slot)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
			mddev->bitmap_info.cluster_name,
			slot->nodeid, slot->slot,
			cinfo->slot_number);
	/* subtract one since DLM slot numbers start at one while
	 * cluster-md numbering begins at 0 */
	__recover_slot(mddev, slot->slot - 1);
}

static void recover_done(void *arg, struct dlm_slot *slots,
		int num_slots, int our_slot,
		uint32_t generation)
{
	struct mddev *mddev = arg;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->slot_number = our_slot;
	/* the completion only needs to be completed when a node joins the
	 * cluster; it doesn't need to run during another node's failure */
	if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
		complete(&cinfo->completion);
		clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	}
	clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
}

/* these ops are called when a node joins the cluster, and perform lock
 * recovery if a node failure occurs */
static const struct dlm_lockspace_ops md_ls_ops = {
	.recover_prep = recover_prep,
	.recover_slot = recover_slot,
	.recover_done = recover_done,
};
/*
 * The BAST function for the ack lock resource
 * This function wakes up the receive thread in
 * order to receive and process the message.
 */
static void ack_bast(void *arg, int mode)
{
	struct dlm_lock_resource *res = arg;
	struct md_cluster_info *cinfo = res->mddev->cluster_info;

	if (mode == DLM_LOCK_EX) {
		if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
			md_wakeup_thread(cinfo->recv_thread);
		else
			set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
	}
}

static void __remove_suspend_info(struct md_cluster_info *cinfo, int slot)
{
	struct suspend_info *s, *tmp;

	list_for_each_entry_safe(s, tmp, &cinfo->suspend_list, list)
		if (slot == s->slot) {
			list_del(&s->list);
			kfree(s);
			break;
		}
}
static void remove_suspend_info(struct mddev *mddev, int slot)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	__remove_suspend_info(cinfo, slot);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}

static void process_suspend_info(struct mddev *mddev,
		int slot, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct suspend_info *s;

	if (!hi) {
		/*
		 * clear the REMOTE flag since resync or recovery is finished
		 * in remote node.
		 */
		clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		remove_suspend_info(mddev, slot);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
		return;
	}

	/*
	 * The bitmaps are not the same for different nodes. If RESYNCING is
	 * happening in one node, then the node which received the RESYNCING
	 * message will probably perform a resync of the region [lo, hi]
	 * again, so we could reduce resync time a lot if we can ensure that
	 * the bitmaps among the different nodes match up well.
	 *
	 * sync_low/hi is used to record the region which
	 * arrived in the previous RESYNCING message.
	 *
	 * Call md_bitmap_sync_with_cluster to clear
	 * NEEDED_MASK and set RESYNC_MASK since the
	 * resync thread is running in another node,
	 * so we don't need to do the resync again
	 * with the same section */
	md_bitmap_sync_with_cluster(mddev, cinfo->sync_low, cinfo->sync_hi, lo, hi);
	cinfo->sync_low = lo;
	cinfo->sync_hi = hi;

	s = kzalloc(sizeof(struct suspend_info), GFP_KERNEL);
	if (!s)
		return;
	s->slot = slot;
	s->lo = lo;
	s->hi = hi;
	mddev->pers->quiesce(mddev, 1);
	spin_lock_irq(&cinfo->suspend_lock);
	/* Remove existing entry (if exists) before adding */
	__remove_suspend_info(cinfo, slot);
	list_add(&s->list, &cinfo->suspend_list);
	spin_unlock_irq(&cinfo->suspend_lock);
	mddev->pers->quiesce(mddev, 0);
}
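
/*
 * A NEWDISK message carries the device uuid and raid slot; forward them to
 * userspace via a uevent and wait (with a timeout) for the local node to
 * acknowledge the new device through new_disk_ack().
 */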
static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
{
	char disk_uuid[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;
	char event_name[] = "EVENT=ADD_DEVICE";
	char raid_slot[16];
	char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
	int len;

	len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
	sprintf(disk_uuid + len, "%pU", cmsg->uuid);
	snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
	pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
	init_completion(&cinfo->newdisk_completion);
	set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
	kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
	wait_for_completion_timeout(&cinfo->newdisk_completion,
			NEW_DEV_TIMEOUT);
	clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
}

static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
{
	int got_lock = 0;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	mddev->good_device_nr = le32_to_cpu(msg->raid_slot);

	dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
	wait_event(mddev->thread->wqueue,
		   (got_lock = mddev_trylock(mddev)) ||
		    test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
	md_reload_sb(mddev, mddev->good_device_nr);
	if (got_lock)
		mddev_unlock(mddev);
}

static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev) {
		set_bit(ClusterRemove, &rdev->flags);
		set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
		md_wakeup_thread(mddev->thread);
	} else
		pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}

static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
{
	struct md_rdev *rdev;

	rcu_read_lock();
	rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
	if (rdev && test_bit(Faulty, &rdev->flags))
		clear_bit(Faulty, &rdev->flags);
	else
		pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
			__func__, __LINE__, le32_to_cpu(msg->raid_slot));
	rcu_read_unlock();
}
static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
{
	int ret = 0;

	if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
		"node %d received its own msg\n", le32_to_cpu(msg->slot)))
		return -1;
	switch (le32_to_cpu(msg->type)) {
	case METADATA_UPDATED:
		process_metadata_update(mddev, msg);
		break;
	case CHANGE_CAPACITY:
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk(mddev->gendisk);
		break;
	case RESYNCING:
		set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
		process_suspend_info(mddev, le32_to_cpu(msg->slot),
				     le64_to_cpu(msg->low),
				     le64_to_cpu(msg->high));
		break;
	case NEWDISK:
		process_add_new_disk(mddev, msg);
		break;
	case REMOVE:
		process_remove_disk(mddev, msg);
		break;
	case RE_ADD:
		process_readd_disk(mddev, msg);
		break;
	case BITMAP_NEEDS_SYNC:
		__recover_slot(mddev, le32_to_cpu(msg->slot));
		break;
	default:
		ret = -1;
		pr_warn("%s:%d Received unknown message from %d\n",
			__func__, __LINE__, le32_to_cpu(msg->slot));
	}
	return ret;
}
/*
 * thread for receiving message
 */
static void recv_daemon(struct md_thread *thread)
{
	struct md_cluster_info *cinfo = thread->mddev->cluster_info;
	struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
	struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
	struct cluster_msg msg;
	int ret;

	mutex_lock(&cinfo->recv_mutex);
	/* get CR on Message */
	if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
		pr_err("md/raid1:failed to get CR on MESSAGE\n");
		mutex_unlock(&cinfo->recv_mutex);
		return;
	}

	/* read lvb and wake up thread to process this message_lockres */
	memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
	ret = process_recvd_msg(thread->mddev, &msg);
	if (ret)
		goto out;

	/* release CR on ack_lockres */
	ret = dlm_unlock_sync(ack_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock ack failed return %d\n", ret);
	/* up-convert to PR on message_lockres */
	ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
	if (unlikely(ret != 0))
		pr_info("lock PR on msg failed return %d\n", ret);
	/* get CR on ack_lockres again */
	ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
	if (unlikely(ret != 0))
		pr_info("lock CR on ack failed return %d\n", ret);
out:
	/* release CR on message_lockres */
	ret = dlm_unlock_sync(message_lockres);
	if (unlikely(ret != 0))
		pr_info("unlock msg failed return %d\n", ret);
	mutex_unlock(&cinfo->recv_mutex);
}
/* lock_token()
 * Takes the lock on the TOKEN lock resource so no other
 * node can communicate while the operation is underway.
 */
static int lock_token(struct md_cluster_info *cinfo)
{
	int error;

	error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
				__func__, __LINE__, error);
	} else {
		/* Lock the receive sequence */
		mutex_lock(&cinfo->recv_mutex);
	}
	return error;
}

/* lock_comm()
 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
 */
static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
{
	int rv, set_bit = 0;
	struct mddev *mddev = cinfo->mddev;

	/*
	 * If the resync thread runs after the raid1d thread, then
	 * process_metadata_update could not continue while raid1d holds
	 * reconfig_mutex (and raid1d is blocked since another node already
	 * got EX on Token and is waiting for EX on Ack), so let resync wake
	 * up the thread in case the flag is set.
	 */
	if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				      &cinfo->state)) {
		rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
					      &cinfo->state);
		WARN_ON_ONCE(rv);
		md_wakeup_thread(mddev->thread);
		set_bit = 1;
	}
	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
	rv = lock_token(cinfo);
	if (set_bit)
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return rv;
}
static void unlock_comm(struct md_cluster_info *cinfo)
{
	WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
	mutex_unlock(&cinfo->recv_mutex);
	dlm_unlock_sync(cinfo->token_lockres);
	clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
	wake_up(&cinfo->wait);
}

/* __sendmsg()
 * This function performs the actual sending of the message. This function is
 * usually called after performing the encompassing operation
 * The function:
 * 1. Grabs the message lockresource in EX mode
 * 2. Copies the message to the message LVB
 * 3. Downconverts message lockresource to CW
 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 *    and the other nodes read the message. The thread will wait here until all other
 *    nodes have released ack lock resource.
 * 5. Downconvert ack lockresource to CR
 */
static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
{
	int error;
	int slot = cinfo->slot_number - 1;

	cmsg->slot = cpu_to_le32(slot);
	/* get EX on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
		goto failed_message;
	}

	memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
			sizeof(struct cluster_msg));
	/* down-convert EX to CW on Message */
	error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
				error);
		goto failed_ack;
	}

	/* up-convert CR to EX on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
	if (error) {
		pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
				error);
		goto failed_ack;
	}

	/* down-convert EX to CR on Ack */
	error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
	if (error) {
		pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
				error);
		goto failed_ack;
	}

failed_ack:
	error = dlm_unlock_sync(cinfo->message_lockres);
	if (unlikely(error != 0)) {
		pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
			error);
		/* in case the message can't be released due to some reason */
		goto failed_ack;
	}
failed_message:
	return error;
}

static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
		   bool mddev_locked)
{
	int ret;

	ret = lock_comm(cinfo, mddev_locked);
	if (!ret) {
		ret = __sendmsg(cinfo, cmsg);
		unlock_comm(cinfo);
	}
	return ret;
}
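
/*
 * Called while joining: try each peer's bitmap lock with NOQUEUE. If the
 * lock is busy the peer is resyncing, so record its suspended region;
 * otherwise inspect the bitmap and start recovery if it is dirty.
 */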
static int gather_all_resync_info(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i, ret = 0;
	struct dlm_lock_resource *bm_lockres;
	struct suspend_info *s;
	char str[64];
	sector_t lo, hi;

	for (i = 0; i < total_slots; i++) {
		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres)
			return -ENOMEM;
		if (i == (cinfo->slot_number - 1)) {
			lockres_free(bm_lockres);
			continue;
		}

		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (ret == -EAGAIN) {
			s = read_resync_info(mddev, bm_lockres);
			if (s) {
				pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
						__func__, __LINE__,
						(unsigned long long) s->lo,
						(unsigned long long) s->hi, i);
				spin_lock_irq(&cinfo->suspend_lock);
				s->slot = i;
				list_add(&s->list, &cinfo->suspend_list);
				spin_unlock_irq(&cinfo->suspend_lock);
			}
			ret = 0;
			lockres_free(bm_lockres);
			continue;
		}
		if (ret) {
			lockres_free(bm_lockres);
			goto out;
		}

		/* Read the disk bitmap sb and check if it needs recovery */
		ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
		if (ret) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
			lockres_free(bm_lockres);
			continue;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp)) {
			set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
			mddev->recovery_cp = lo;
			md_check_recovery(mddev);
		}

		lockres_free(bm_lockres);
	}
out:
	return ret;
}
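
/*
 * Join the DLM lockspace named after the array uuid and set up the
 * communication lock resources (token, message, ack, no-new-dev) plus this
 * node's bitmap and resync locks.
 */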
static int join(struct mddev *mddev, int nodes)
{
	struct md_cluster_info *cinfo;
	int ret, ops_rv;
	char str[64];

	cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
	if (!cinfo)
		return -ENOMEM;

	INIT_LIST_HEAD(&cinfo->suspend_list);
	spin_lock_init(&cinfo->suspend_lock);
	init_completion(&cinfo->completion);
	set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
	init_waitqueue_head(&cinfo->wait);
	mutex_init(&cinfo->recv_mutex);

	mddev->cluster_info = cinfo;
	cinfo->mddev = mddev;

	memset(str, 0, 64);
	sprintf(str, "%pU", mddev->uuid);
	ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
				DLM_LSFL_FS, LVB_SIZE,
				&md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
	if (ret)
		goto err;
	wait_for_completion(&cinfo->completion);
	if (nodes < cinfo->slot_number) {
		pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
			cinfo->slot_number, nodes);
		ret = -ERANGE;
		goto err;
	}
	/* Initiate the communication resources */
	ret = -ENOMEM;
	cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
	if (!cinfo->recv_thread) {
		pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
		goto err;
	}
	cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
	if (!cinfo->message_lockres)
		goto err;
	cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
	if (!cinfo->token_lockres)
		goto err;
	cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
	if (!cinfo->no_new_dev_lockres)
		goto err;

	ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
	if (ret) {
		ret = -EAGAIN;
		pr_err("md-cluster: can't join cluster to avoid lock issue\n");
		goto err;
	}
	cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
	if (!cinfo->ack_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	/* get sync CR lock on ACK. */
	if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
				ret);
	dlm_unlock_sync(cinfo->token_lockres);
	/* get sync CR lock on no-new-dev. */
	if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
		pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);

	pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
	snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
	cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
	if (!cinfo->bitmap_lockres) {
		ret = -ENOMEM;
		goto err;
	}
	if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
		pr_err("Failed to get bitmap lock\n");
		ret = -EINVAL;
		goto err;
	}

	cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
	if (!cinfo->resync_lockres) {
		ret = -ENOMEM;
		goto err;
	}

	return 0;
err:
	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	if (cinfo->lockspace)
		dlm_release_lockspace(cinfo->lockspace, 2);
	mddev->cluster_info = NULL;
	kfree(cinfo);
	return ret;
}
static void load_bitmaps(struct mddev *mddev, int total_slots)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	/* load all the node's bitmap info for resync */
	if (gather_all_resync_info(mddev, total_slots))
		pr_err("md-cluster: failed to gather all resync infos\n");
	set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
	/* wake up the recv thread in case something needs to be handled */
	if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
		md_wakeup_thread(cinfo->recv_thread);
}

static void resync_bitmap(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg = {0};
	int err;

	cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
			__func__, __LINE__, err);
}
static void unlock_all_bitmaps(struct mddev *mddev);
static int leave(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!cinfo)
		return 0;

	/* a BITMAP_NEEDS_SYNC message should be sent when a node
	 * is leaving the cluster with a dirty bitmap; also, we can
	 * only deliver it when the dlm connection is available */
	if (cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector)
		resync_bitmap(mddev);

	set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	md_unregister_thread(&cinfo->recovery_thread);
	md_unregister_thread(&cinfo->recv_thread);
	lockres_free(cinfo->message_lockres);
	lockres_free(cinfo->token_lockres);
	lockres_free(cinfo->ack_lockres);
	lockres_free(cinfo->no_new_dev_lockres);
	lockres_free(cinfo->resync_lockres);
	lockres_free(cinfo->bitmap_lockres);
	unlock_all_bitmaps(mddev);
	dlm_release_lockspace(cinfo->lockspace, 2);
	kfree(cinfo);
	return 0;
}

/* slot_number(): Returns the MD slot number to use
 * DLM starts the slot numbers from 1, whereas cluster-md
 * wants the number to be from zero, so we deduct one
 */
static int slot_number(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return cinfo->slot_number - 1;
}
/*
 * Check if the communication is already locked, else lock the communication
 * channel.
 * If it is already locked, token is in EX mode, and hence lock_token()
 * should not be called.
 */
static int metadata_update_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret;

	/*
	 * metadata_update_start is always called with the protection of
	 * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
	 */
	ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
				    &cinfo->state);
	WARN_ON_ONCE(ret);
	md_wakeup_thread(mddev->thread);

	wait_event(cinfo->wait,
		   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
		   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));

	/* If token is already locked, return 0 */
	if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
		clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
		return 0;
	}

	ret = lock_token(cinfo);
	clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
	return ret;
}

static int metadata_update_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	/* Pick up a good active device number to send. */
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		ret = __sendmsg(cinfo, &cmsg);
	} else
		pr_warn("md-cluster: No good device id found to send\n");
	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
	return ret;
}

static void metadata_update_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}
/*
 * return 0 if all the bitmaps have the same sync_size
 */
static int cluster_check_sync_size(struct mddev *mddev)
{
	int i, rv;
	bitmap_super_t *sb;
	unsigned long my_sync_size, sync_size = 0;
	int node_num = mddev->bitmap_info.nodes;
	int current_slot = md_cluster_ops->slot_number(mddev);
	struct bitmap *bitmap = mddev->bitmap;
	char str[64];
	struct dlm_lock_resource *bm_lockres;

	sb = kmap_atomic(bitmap->storage.sb_page);
	my_sync_size = sb->sync_size;
	kunmap_atomic(sb);

	for (i = 0; i < node_num; i++) {
		if (i == current_slot)
			continue;

		bitmap = get_bitmap_from_slot(mddev, i);
		if (IS_ERR(bitmap)) {
			pr_err("can't get bitmap from slot %d\n", i);
			return -1;
		}

		/*
		 * If we can hold the bitmap lock of one node then
		 * the slot is not occupied, update the sb.
		 */
		snprintf(str, 64, "bitmap%04d", i);
		bm_lockres = lockres_init(mddev, str, NULL, 1);
		if (!bm_lockres) {
			pr_err("md-cluster: Cannot initialize %s\n", str);
			md_bitmap_free(bitmap);
			return -1;
		}
		bm_lockres->flags |= DLM_LKF_NOQUEUE;
		rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
		if (!rv)
			md_bitmap_update_sb(bitmap);
		lockres_free(bm_lockres);

		sb = kmap_atomic(bitmap->storage.sb_page);
		if (sync_size == 0)
			sync_size = sb->sync_size;
		else if (sync_size != sb->sync_size) {
			kunmap_atomic(sb);
			md_bitmap_free(bitmap);
			return -1;
		}
		kunmap_atomic(sb);
		md_bitmap_free(bitmap);
	}

	return (my_sync_size == sync_size) ? 0 : -1;
}
/*
 * Updating the size for a clustered raid is a little more complex; we
 * perform it in these steps:
 * 1. hold the token lock and update the superblock in the initiator node.
 * 2. send a METADATA_UPDATED msg to the other nodes.
 * 3. The initiator node then checks each bitmap's sync_size; if all
 *    bitmaps have the same value of sync_size, we can set the capacity and
 *    let the other nodes perform it too. If one node can't update sync_size
 *    accordingly, we need to revert to the previous value.
 */
static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	struct md_rdev *rdev;
	int ret = 0;
	int raid_slot = -1;

	md_update_sb(mddev, 1);
	if (lock_comm(cinfo, 1)) {
		pr_err("%s: lock_comm failed\n", __func__);
		return;
	}

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(METADATA_UPDATED);
	rdev_for_each(rdev, mddev)
		if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
			raid_slot = rdev->desc_nr;
			break;
		}
	if (raid_slot >= 0) {
		cmsg.raid_slot = cpu_to_le32(raid_slot);
		/*
		 * We can only change the capacity once all the nodes can do
		 * it, so we need to wait until the other nodes have received
		 * the msg and handled the change
		 */
		ret = __sendmsg(cinfo, &cmsg);
		if (ret) {
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
			unlock_comm(cinfo);
			return;
		}
	} else {
		pr_err("md-cluster: No good device id found to send\n");
		unlock_comm(cinfo);
		return;
	}

	/*
	 * check the sync_size from the other nodes' bitmaps; if sync_size
	 * has already been updated in the other nodes as expected, send an
	 * empty metadata msg to permit the change of capacity
	 */
	if (cluster_check_sync_size(mddev) == 0) {
		memset(&cmsg, 0, sizeof(cmsg));
		cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
			       __func__, __LINE__);
		set_capacity(mddev->gendisk, mddev->array_sectors);
		revalidate_disk(mddev->gendisk);
	} else {
		/* revert to previous sectors */
		ret = mddev->pers->resize(mddev, old_dev_sectors);
		if (!ret)
			revalidate_disk(mddev->gendisk);
		ret = __sendmsg(cinfo, &cmsg);
		if (ret)
			pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
			       __func__, __LINE__);
	}
	unlock_comm(cinfo);
}
static int resync_start(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
}
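
/*
 * Publish the local resync window: store [lo, hi] in the bitmap lock's LVB
 * and broadcast it to the other nodes in a RESYNCING message.
 */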
static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct resync_info ri;
	struct cluster_msg cmsg = {0};

	/* do not send zero again, if we have sent before */
	if (hi == 0) {
		memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
		if (le64_to_cpu(ri.hi) == 0)
			return 0;
	}

	add_resync_info(cinfo->bitmap_lockres, lo, hi);
	/* Re-acquire the lock to refresh LVB */
	dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
	cmsg.type = cpu_to_le32(RESYNCING);
	cmsg.low = cpu_to_le64(lo);
	cmsg.high = cpu_to_le64(hi);

	/*
	 * mddev_lock is held if resync_info_update is called from
	 * resync_finish (md_reap_sync_thread -> resync_finish)
	 */
	if (lo == 0 && hi == 0)
		return sendmsg(cinfo, &cmsg, 1);
	else
		return sendmsg(cinfo, &cmsg, 0);
}
static int resync_finish(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;

	clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);

	/*
	 * If the resync thread is interrupted we cannot say the resync is
	 * finished; another node will launch a resync thread to continue.
	 */
	if (!test_bit(MD_CLOSING, &mddev->flags))
		ret = resync_info_update(mddev, 0, 0);
	dlm_unlock_sync(cinfo->resync_lockres);
	return ret;
}

static int area_resyncing(struct mddev *mddev, int direction,
		sector_t lo, sector_t hi)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int ret = 0;
	struct suspend_info *s;

	if ((direction == READ) &&
		test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
		return 1;

	spin_lock_irq(&cinfo->suspend_lock);
	if (list_empty(&cinfo->suspend_list))
		goto out;
	list_for_each_entry(s, &cinfo->suspend_list, list)
		if (hi > s->lo && lo < s->hi) {
			ret = 1;
			break;
		}
out:
	spin_unlock_irq(&cinfo->suspend_lock);
	return ret;
}
/* add_new_disk() - initiates a disk add
 * However, if this fails before writing md_update_sb(),
 * add_new_disk_cancel() must be called to release token lock
 */
static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	struct cluster_msg cmsg;
	int ret = 0;
	struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
	char *uuid = sb->device_uuid;

	memset(&cmsg, 0, sizeof(cmsg));
	cmsg.type = cpu_to_le32(NEWDISK);
	memcpy(cmsg.uuid, uuid, 16);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	if (lock_comm(cinfo, 1))
		return -EAGAIN;
	ret = __sendmsg(cinfo, &cmsg);
	if (ret) {
		unlock_comm(cinfo);
		return ret;
	}
	cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
	ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
	cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
	/* Some node does not "see" the device */
	if (ret == -EAGAIN)
		ret = -ENOENT;
	if (ret)
		unlock_comm(cinfo);
	else {
		dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
		/* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
		 * will run soon after add_new_disk, the below path will be
		 * invoked:
		 *   md_wakeup_thread(mddev->thread)
		 *	-> conf->thread (raid1d)
		 *	-> md_check_recovery -> md_update_sb
		 *	-> metadata_update_start/finish
		 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
		 *
		 * For other failure cases, metadata_update_cancel and
		 * add_new_disk_cancel also clear below bit as well.
		 * */
		set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
		wake_up(&cinfo->wait);
	}
	return ret;
}
static void add_new_disk_cancel(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
	unlock_comm(cinfo);
}

static int new_disk_ack(struct mddev *mddev, bool ack)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;

	if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
		pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
		return -EINVAL;
	}

	if (ack)
		dlm_unlock_sync(cinfo->no_new_dev_lockres);
	complete(&cinfo->newdisk_completion);
	return 0;
}

static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
{
	struct cluster_msg cmsg = {0};
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(REMOVE);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	return sendmsg(cinfo, &cmsg, 1);
}
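
/*
 * Try to take PW on every other node's bitmap lock with NOQUEUE. Returns 1
 * only if all of them could be acquired.
 */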
static int lock_all_bitmaps(struct mddev *mddev)
{
	int slot, my_slot, ret, held = 1, i = 0;
	char str[64];
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cinfo->other_bitmap_lockres =
		kcalloc(mddev->bitmap_info.nodes - 1,
			sizeof(struct dlm_lock_resource *), GFP_KERNEL);
	if (!cinfo->other_bitmap_lockres) {
		pr_err("md: can't alloc mem for other bitmap locks\n");
		return 0;
	}

	my_slot = slot_number(mddev);
	for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
		if (slot == my_slot)
			continue;

		memset(str, '\0', 64);
		snprintf(str, 64, "bitmap%04d", slot);
		cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
		if (!cinfo->other_bitmap_lockres[i])
			return -ENOMEM;

		cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
		ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
		if (ret)
			held = -1;
		i++;
	}

	return held;
}

static void unlock_all_bitmaps(struct mddev *mddev)
{
	struct md_cluster_info *cinfo = mddev->cluster_info;
	int i;

	/* release the other nodes' bitmap locks if they exist */
	if (cinfo->other_bitmap_lockres) {
		for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
			if (cinfo->other_bitmap_lockres[i]) {
				lockres_free(cinfo->other_bitmap_lockres[i]);
			}
		}
		kfree(cinfo->other_bitmap_lockres);
		cinfo->other_bitmap_lockres = NULL;
	}
}
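
/*
 * Used when re-adding a device: tell the other nodes via RE_ADD and fold
 * every peer bitmap into the local one so the resync covers all dirty
 * regions.
 */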
static int gather_bitmaps(struct md_rdev *rdev)
{
	int sn, err;
	sector_t lo, hi;
	struct cluster_msg cmsg = {0};
	struct mddev *mddev = rdev->mddev;
	struct md_cluster_info *cinfo = mddev->cluster_info;

	cmsg.type = cpu_to_le32(RE_ADD);
	cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
	err = sendmsg(cinfo, &cmsg, 1);
	if (err)
		goto out;

	for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
		if (sn == (cinfo->slot_number - 1))
			continue;
		err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
		if (err) {
			pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", sn);
			goto out;
		}
		if ((hi > 0) && (lo < mddev->recovery_cp))
			mddev->recovery_cp = lo;
	}
out:
	return err;
}
static struct md_cluster_operations cluster_ops = {
	.join = join,
	.leave = leave,
	.slot_number = slot_number,
	.resync_start = resync_start,
	.resync_finish = resync_finish,
	.resync_info_update = resync_info_update,
	.metadata_update_start = metadata_update_start,
	.metadata_update_finish = metadata_update_finish,
	.metadata_update_cancel = metadata_update_cancel,
	.area_resyncing = area_resyncing,
	.add_new_disk = add_new_disk,
	.add_new_disk_cancel = add_new_disk_cancel,
	.new_disk_ack = new_disk_ack,
	.remove_disk = remove_disk,
	.load_bitmaps = load_bitmaps,
	.gather_bitmaps = gather_bitmaps,
	.lock_all_bitmaps = lock_all_bitmaps,
	.unlock_all_bitmaps = unlock_all_bitmaps,
	.update_size = update_size,
};

static int __init cluster_init(void)
{
	pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
	pr_info("Registering Cluster MD functions\n");
	register_md_cluster_operations(&cluster_ops, THIS_MODULE);
	return 0;
}

static void cluster_exit(void)
{
	unregister_md_cluster_operations();
}

module_init(cluster_init);
module_exit(cluster_exit);
MODULE_AUTHOR("SUSE");
MODULE_LICENSE("GPL");
MODULE_DESCRIPTION("Clustering support for MD");