# update-xfails.py
  1. #!/usr/bin/env python3
  2. import argparse
  3. from collections import defaultdict
  4. import difflib
  5. import os
  6. import re
  7. from glcollate import Collate
  8. from termcolor import colored
  9. from urllib.parse import urlparse
  10. def get_canonical_name(job_name):
  11. return re.split(r" \d+/\d+", job_name)[0]
  12. def get_xfails_file_path(job_name, suffix):
  13. canonical_name = get_canonical_name(job_name)
  14. name = canonical_name.replace(":", "-")
  15. script_dir = os.path.dirname(os.path.abspath(__file__))
  16. return os.path.join(script_dir, f"{name}-{suffix}.txt")
  17. def get_unit_test_name_and_results(unit_test):
  18. if "Artifact results/failures.csv not found" in unit_test or '' == unit_test:
  19. return None, None
  20. unit_test_name, unit_test_result = unit_test.strip().split(",")
  21. return unit_test_name, unit_test_result
  22. def read_file(file_path):
  23. try:
  24. with open(file_path, "r") as file:
  25. f = file.readlines()
  26. if len(f):
  27. f[-1] = f[-1].strip() + "\n"
  28. return f
  29. except FileNotFoundError:
  30. return []
  31. def save_file(content, file_path):
  32. # delete file is content is empty
  33. if not content or not any(content):
  34. if os.path.exists(file_path):
  35. os.remove(file_path)
  36. return
  37. with open(file_path, "w") as file:
  38. file.writelines(content)
  39. def is_test_present_on_file(file_content, unit_test_name):
  40. return any(unit_test_name in line for line in file_content)
  41. def is_unit_test_present_in_other_jobs(unit_test, job_ids):
  42. return all(unit_test in job_ids[job_id] for job_id in job_ids)
  43. def remove_unit_test_if_present(lines, unit_test_name):
  44. if not is_test_present_on_file(lines, unit_test_name):
  45. return
  46. lines[:] = [line for line in lines if unit_test_name not in line]
  47. def add_unit_test_if_not_present(lines, unit_test_name, file_name):
  48. # core_getversion is mandatory
  49. if "core_getversion" in unit_test_name:
  50. print("WARNING: core_getversion should pass, not adding it to", os.path.basename(file_name))
  51. elif all(unit_test_name not in line for line in lines):
  52. lines.append(unit_test_name + "\n")
  53. def update_unit_test_result_in_fails_txt(fails_txt, unit_test):
  54. unit_test_name, unit_test_result = get_unit_test_name_and_results(unit_test)
  55. for i, line in enumerate(fails_txt):
  56. if unit_test_name in line:
  57. _, current_result = get_unit_test_name_and_results(line)
  58. fails_txt[i] = unit_test + "\n"
  59. return
  60. def add_unit_test_or_update_result_to_fails_if_present(fails_txt, unit_test, fails_txt_path):
  61. unit_test_name, _ = get_unit_test_name_and_results(unit_test)
  62. if not is_test_present_on_file(fails_txt, unit_test_name):
  63. add_unit_test_if_not_present(fails_txt, unit_test, fails_txt_path)
  64. # if it is present but not with the same result
  65. elif not is_test_present_on_file(fails_txt, unit_test):
  66. update_unit_test_result_in_fails_txt(fails_txt, unit_test)
  67. def split_unit_test_from_collate(xfails):
  68. for job_name in xfails.keys():
  69. for job_id in xfails[job_name].copy().keys():
  70. if "not found" in xfails[job_name][job_id].content_as_str:
  71. del xfails[job_name][job_id]
  72. continue
  73. xfails[job_name][job_id] = xfails[job_name][job_id].content_as_str.splitlines()
  74. def get_xfails_from_pipeline_url(pipeline_url):
  75. parsed_url = urlparse(pipeline_url)
  76. path_components = parsed_url.path.strip("/").split("/")
  77. namespace = path_components[0]
  78. project = path_components[1]
  79. pipeline_id = path_components[-1]
  80. print("Collating from:", namespace, project, pipeline_id)
  81. xfails = (
  82. Collate(namespace=namespace, project=project)
  83. .from_pipeline(pipeline_id)
  84. .get_artifact("results/failures.csv")
  85. )
  86. split_unit_test_from_collate(xfails)
  87. return xfails
  88. def get_xfails_from_pipeline_urls(pipelines_urls):
  89. xfails = defaultdict(dict)
  90. for url in pipelines_urls:
  91. new_xfails = get_xfails_from_pipeline_url(url)
  92. for key in new_xfails:
  93. xfails[key].update(new_xfails[key])
  94. return xfails
  95. def print_diff(old_content, new_content, file_name):
  96. diff = difflib.unified_diff(old_content, new_content, lineterm="", fromfile=file_name, tofile=file_name)
  97. diff = [colored(line, "green") if line.startswith("+") else
  98. colored(line, "red") if line.startswith("-") else line for line in diff]
  99. print("\n".join(diff[:3]))
  100. print("".join(diff[3:]))
def main(pipelines_urls, only_flakes):
    """Update the per-job *-fails.txt / *-flakes.txt files from pipeline results.

    pipelines_urls: GitLab pipeline URLs whose failures.csv artifacts are collated.
    only_flakes: when True, record every reported failure as a flake and only
    remove it from the fails file, leaving fails classification untouched.
    """
    xfails = get_xfails_from_pipeline_urls(pipelines_urls)
    for job_name in xfails.keys():
        fails_txt_path = get_xfails_file_path(job_name, "fails")
        flakes_txt_path = get_xfails_file_path(job_name, "flakes")
        fails_txt = read_file(fails_txt_path)
        flakes_txt = read_file(flakes_txt_path)
        # Keep originals so files are only rewritten (and diffed) on change.
        fails_txt_original = fails_txt.copy()
        flakes_txt_original = flakes_txt.copy()
        for job_id in xfails[job_name].keys():
            for unit_test in xfails[job_name][job_id]:
                unit_test_name, unit_test_result = get_unit_test_name_and_results(unit_test)
                # Skip placeholder/blank entries.
                if not unit_test_name:
                    continue
                if only_flakes:
                    remove_unit_test_if_present(fails_txt, unit_test_name)
                    add_unit_test_if_not_present(flakes_txt, unit_test_name, flakes_txt_path)
                    continue
                # drop it from flakes if it is present to analyze it again
                remove_unit_test_if_present(flakes_txt, unit_test_name)
                if unit_test_result == "UnexpectedPass":
                    # The test passed here, so it no longer belongs in fails.
                    remove_unit_test_if_present(fails_txt, unit_test_name)
                    # flake result: the pass is not reproduced by every sibling job
                    if not is_unit_test_present_in_other_jobs(unit_test, xfails[job_name]):
                        add_unit_test_if_not_present(flakes_txt, unit_test_name, flakes_txt_path)
                    continue
                # flake result: the failure is not reproduced by every sibling job
                if not is_unit_test_present_in_other_jobs(unit_test, xfails[job_name]):
                    remove_unit_test_if_present(fails_txt, unit_test_name)
                    add_unit_test_if_not_present(flakes_txt, unit_test_name, flakes_txt_path)
                    continue
                # consistent result: record (or re-record) it in the fails file
                add_unit_test_or_update_result_to_fails_if_present(fails_txt, unit_test,
                                                                   fails_txt_path)
        fails_txt.sort()
        flakes_txt.sort()
        if fails_txt != fails_txt_original:
            save_file(fails_txt, fails_txt_path)
            print_diff(fails_txt_original, fails_txt, os.path.basename(fails_txt_path))
        if flakes_txt != flakes_txt_original:
            save_file(flakes_txt, flakes_txt_path)
            print_diff(flakes_txt_original, flakes_txt, os.path.basename(flakes_txt_path))
  143. if __name__ == "__main__":
  144. parser = argparse.ArgumentParser(description="Update xfails from a given pipeline.")
  145. parser.add_argument("pipeline_urls", nargs="+", type=str, help="URLs to the pipelines to analyze the failures.")
  146. parser.add_argument("--only-flakes", action="store_true", help="Treat every detected failure as a flake, edit *-flakes.txt only.")
  147. args = parser.parse_args()
  148. main(args.pipeline_urls, args.only_flakes)
  149. print("Done.")