git.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. # -*- coding: utf-8 -*-
  2. import collections
  3. import datetime
  4. import logging
  5. import os
  6. import re
  7. import subprocess
  8. import tarfile
  9. import tempfile
  10. GitRef = collections.namedtuple(
  11. "VersionRef",
  12. ["name", "commit", "source", "is_remote", "refname", "creatordate",],
  13. )
  14. logger = logging.getLogger(__name__)
  15. def get_toplevel_path(cwd=None):
  16. cmd = (
  17. "git",
  18. "rev-parse",
  19. "--show-toplevel",
  20. )
  21. output = subprocess.check_output(cmd, cwd=cwd).decode()
  22. return output.rstrip("\n")
  23. def get_all_refs(gitroot):
  24. cmd = (
  25. "git",
  26. "for-each-ref",
  27. "--format",
  28. "%(objectname)\t%(refname)\t%(creatordate:iso)",
  29. "refs",
  30. )
  31. output = subprocess.check_output(cmd, cwd=gitroot).decode()
  32. for line in output.splitlines():
  33. is_remote = False
  34. fields = line.strip().split("\t")
  35. if len(fields) != 3:
  36. continue
  37. commit = fields[0]
  38. refname = fields[1]
  39. creatordate = datetime.datetime.strptime(
  40. fields[2], "%Y-%m-%d %H:%M:%S %z"
  41. )
  42. # Parse refname
  43. matchobj = re.match(
  44. r"^refs/(heads|tags|remotes/[^/]+)/(\S+)$", refname
  45. )
  46. if not matchobj:
  47. continue
  48. source = matchobj.group(1)
  49. name = matchobj.group(2)
  50. if source.startswith("remotes/"):
  51. is_remote = True
  52. yield GitRef(name, commit, source, is_remote, refname, creatordate)
  53. def get_refs(
  54. gitroot, tag_whitelist, branch_whitelist, remote_whitelist, files=()
  55. ):
  56. for ref in get_all_refs(gitroot):
  57. if ref.source == "tags":
  58. if tag_whitelist is None or not re.match(tag_whitelist, ref.name):
  59. logger.debug(
  60. "Skipping '%s' because tag '%s' doesn't match the "
  61. "whitelist pattern",
  62. ref.refname,
  63. ref.name,
  64. )
  65. continue
  66. elif ref.source == "heads":
  67. if branch_whitelist is None or not re.match(
  68. branch_whitelist, ref.name
  69. ):
  70. logger.debug(
  71. "Skipping '%s' because branch '%s' doesn't match the "
  72. "whitelist pattern",
  73. ref.refname,
  74. ref.name,
  75. )
  76. continue
  77. elif ref.is_remote and remote_whitelist is not None:
  78. remote_name = ref.source.partition("/")[2]
  79. if not re.match(remote_whitelist, remote_name):
  80. logger.debug(
  81. "Skipping '%s' because remote '%s' doesn't match the "
  82. "whitelist pattern",
  83. ref.refname,
  84. remote_name,
  85. )
  86. continue
  87. if branch_whitelist is None or not re.match(
  88. branch_whitelist, ref.name
  89. ):
  90. logger.debug(
  91. "Skipping '%s' because branch '%s' doesn't match the "
  92. "whitelist pattern",
  93. ref.refname,
  94. ref.name,
  95. )
  96. continue
  97. else:
  98. logger.debug(
  99. "Skipping '%s' because its not a branch or tag", ref.refname
  100. )
  101. continue
  102. missing_files = [
  103. filename
  104. for filename in files
  105. if filename != "."
  106. and not file_exists(gitroot, ref.refname, filename)
  107. ]
  108. if missing_files:
  109. logger.debug(
  110. "Skipping '%s' because it lacks required files: %r",
  111. ref.refname,
  112. missing_files,
  113. )
  114. continue
  115. yield ref
  116. def file_exists(gitroot, refname, filename):
  117. if os.sep != "/":
  118. # Git requires / path sep, make sure we use that
  119. filename = filename.replace(os.sep, "/")
  120. cmd = (
  121. "git",
  122. "cat-file",
  123. "-e",
  124. "{}:{}".format(refname, filename),
  125. )
  126. proc = subprocess.run(
  127. cmd, cwd=gitroot, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
  128. )
  129. return proc.returncode == 0
  130. def copy_tree(gitroot, src, dst, reference, sourcepath="."):
  131. with tempfile.SpooledTemporaryFile() as fp:
  132. cmd = (
  133. "git",
  134. "archive",
  135. "--format",
  136. "tar",
  137. reference.commit,
  138. "--",
  139. sourcepath,
  140. )
  141. subprocess.check_call(cmd, cwd=gitroot, stdout=fp)
  142. fp.seek(0)
  143. with tarfile.TarFile(fileobj=fp) as tarfp:
  144. tarfp.extractall(dst)