git.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. # -*- coding: utf-8 -*-
  2. import collections
  3. import datetime
  4. import logging
  5. import os
  6. import re
  7. import subprocess
  8. import tarfile
  9. import tempfile
  10. GitRef = collections.namedtuple(
  11. "VersionRef",
  12. [
  13. "name",
  14. "commit",
  15. "source",
  16. "is_remote",
  17. "refname",
  18. "creatordate",
  19. ],
  20. )
  21. logger = logging.getLogger(__name__)
  22. def get_toplevel_path(cwd=None):
  23. cmd = (
  24. "git",
  25. "rev-parse",
  26. "--show-toplevel",
  27. )
  28. output = subprocess.check_output(cmd, cwd=cwd).decode()
  29. return output.rstrip("\n")
  30. def get_all_refs(gitroot):
  31. cmd = (
  32. "git",
  33. "for-each-ref",
  34. "--format",
  35. "%(objectname)\t%(refname)\t%(creatordate:iso)",
  36. "refs",
  37. )
  38. output = subprocess.check_output(cmd, cwd=gitroot).decode()
  39. for line in output.splitlines():
  40. is_remote = False
  41. fields = line.strip().split("\t")
  42. if len(fields) != 3:
  43. continue
  44. commit = fields[0]
  45. refname = fields[1]
  46. creatordate = datetime.datetime.strptime(
  47. fields[2], "%Y-%m-%d %H:%M:%S %z"
  48. )
  49. # Parse refname
  50. matchobj = re.match(
  51. r"^refs/(heads|tags|remotes/[^/]+)/(\S+)$", refname
  52. )
  53. if not matchobj:
  54. continue
  55. source = matchobj.group(1)
  56. name = matchobj.group(2)
  57. if source.startswith("remotes/"):
  58. is_remote = True
  59. yield GitRef(name, commit, source, is_remote, refname, creatordate)
  60. def get_refs(
  61. gitroot, tag_whitelist, branch_whitelist, remote_whitelist, files=()
  62. ):
  63. for ref in get_all_refs(gitroot):
  64. if ref.source == "tags":
  65. if tag_whitelist is None or not re.match(tag_whitelist, ref.name):
  66. logger.debug(
  67. "Skipping '%s' because tag '%s' doesn't match the "
  68. "whitelist pattern",
  69. ref.refname,
  70. ref.name,
  71. )
  72. continue
  73. elif ref.source == "heads":
  74. if branch_whitelist is None or not re.match(
  75. branch_whitelist, ref.name
  76. ):
  77. logger.debug(
  78. "Skipping '%s' because branch '%s' doesn't match the "
  79. "whitelist pattern",
  80. ref.refname,
  81. ref.name,
  82. )
  83. continue
  84. elif ref.is_remote and remote_whitelist is not None:
  85. remote_name = ref.source.partition("/")[2]
  86. if not re.match(remote_whitelist, remote_name):
  87. logger.debug(
  88. "Skipping '%s' because remote '%s' doesn't match the "
  89. "whitelist pattern",
  90. ref.refname,
  91. remote_name,
  92. )
  93. continue
  94. if branch_whitelist is None or not re.match(
  95. branch_whitelist, ref.name
  96. ):
  97. logger.debug(
  98. "Skipping '%s' because branch '%s' doesn't match the "
  99. "whitelist pattern",
  100. ref.refname,
  101. ref.name,
  102. )
  103. continue
  104. else:
  105. logger.debug(
  106. "Skipping '%s' because its not a branch or tag", ref.refname
  107. )
  108. continue
  109. missing_files = [
  110. filename
  111. for filename in files
  112. if filename != "."
  113. and not file_exists(gitroot, ref.refname, filename)
  114. ]
  115. if missing_files:
  116. logger.debug(
  117. "Skipping '%s' because it lacks required files: %r",
  118. ref.refname,
  119. missing_files,
  120. )
  121. continue
  122. yield ref
  123. def file_exists(gitroot, refname, filename):
  124. if os.sep != "/":
  125. # Git requires / path sep, make sure we use that
  126. filename = filename.replace(os.sep, "/")
  127. cmd = (
  128. "git",
  129. "cat-file",
  130. "-e",
  131. "{}:{}".format(refname, filename),
  132. )
  133. proc = subprocess.run(
  134. cmd, cwd=gitroot, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
  135. )
  136. return proc.returncode == 0
  137. def copy_tree(gitroot, src, dst, reference, sourcepath="."):
  138. with tempfile.SpooledTemporaryFile() as fp:
  139. cmd = (
  140. "git",
  141. "archive",
  142. "--format",
  143. "tar",
  144. reference.commit,
  145. "--",
  146. sourcepath,
  147. )
  148. subprocess.check_call(cmd, cwd=gitroot, stdout=fp)
  149. fp.seek(0)
  150. with tarfile.TarFile(fileobj=fp) as tarfp:
  151. tarfp.extractall(dst)